fsnamesystem.java
Font size:
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.dfs;

import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;

import java.io.*;
import java.util.*;
import java.util.logging.*;

/***************************************************
 * FSNamesystem does the actual bookkeeping work for the
 * DataNode.
 *
 * It tracks several important tables.
 *
 * 1)  valid fsname --> blocklist  (kept on disk, logged)
 * 2)  Set of all valid blocks (inverted #1)
 * 3)  block --> machinelist (kept in memory, rebuilt dynamically from reports)
 * 4)  machine --> blocklist (inverted #2)
 * 5)  LRU cache of updated-heartbeat machines
 ***************************************************/
class FSNamesystem implements FSConstants {
    public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.fs.FSNamesystem");

    //
    // Stores the correct file name hierarchy
    //
    FSDirectory dir;

    //
    // Stores the block-->datanode(s) map.  Updated only in response
    // to client-sent information.
    //
    TreeMap blocksMap = new TreeMap();

    //
    // Stores the datanode-->block map.  Done by storing a
    // set of datanode info objects, sorted by name.  Updated only in
    // response to client-sent information.
    //
    TreeMap datanodeMap = new TreeMap();

    //
    // Keeps a Vector for every named machine.  The Vector contains
    // blocks that have recently been invalidated and are thought to live
    // on the machine in question.
    //
    TreeMap recentInvalidateSets = new TreeMap();

    //
    // Keeps a TreeSet for every named node.  Each treeset contains
    // a list of the blocks that are "extra" at that location.  We'll
    // eventually remove these extras.
    //
    TreeMap excessReplicateMap = new TreeMap();

    //
    // Keeps track of files that are being created, plus the
    // blocks that make them up.
    //
    TreeMap pendingCreates = new TreeMap();

    //
    // Keeps track of the blocks that are part of those pending creates
    //
    TreeSet pendingCreateBlocks = new TreeSet();

    //
    // Stats on overall usage
    //
    long totalCapacity = 0, totalRemaining = 0;

    //
    Random r = new Random();

    //
    // Stores a set of datanode info objects, sorted by heartbeat.
    // Ordered by last-update time (oldest first); ties are broken by
    // node name so that compare() stays consistent with equals().
    //
    TreeSet heartbeats = new TreeSet(new Comparator() {
        public int compare(Object o1, Object o2) {
            DatanodeInfo d1 = (DatanodeInfo) o1;
            DatanodeInfo d2 = (DatanodeInfo) o2;
            long lu1 = d1.lastUpdate();
            long lu2 = d2.lastUpdate();
            if (lu1 < lu2) {
                return -1;
            } else if (lu1 > lu2) {
                return 1;
            } else {
                return d1.getName().compareTo(d2.getName());
            }
        }
    });

    //
    // Store set of Blocks that need to be replicated 1 or more times.
    // We also store pending replication-orders.
    //
    private TreeSet neededReplications = new TreeSet();
    private TreeSet pendingReplications = new TreeSet();

    //
    // Used for handling lock-leases
    //
    private TreeMap leases = new TreeMap();
    private TreeSet sortedLeases = new TreeSet();

    //
    // Threaded object that checks to see if we have been
    // getting heartbeats from all clients.
    //
    HeartbeatMonitor hbmon = null;
    LeaseMonitor lmon = null;
    Daemon hbthread = null, lmthread = null;
    boolean fsRunning = true;
    long systemStart = 0;
    private Configuration conf;

    //  DESIRED_REPLICATION is how many copies we try to have at all times
    private int desiredReplication;
    //  The maximum number of replicates we should allow for a single block
    private int maxReplication;
    //  How many outgoing replication streams a given node should have at one time
    private int maxReplicationStreams;
    // MIN_REPLICATION is how many copies we need in place or else we disallow the write
    private int minReplication;
    // HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
    private int heartBeatRecheck;

    /**
     * dir is where the filesystem directory state
     * is stored
     */
    public FSNamesystem(File dir, Configuration conf) throws IOException {
        this.dir = new FSDirectory(dir);
        // Start the background daemons immediately: one watches for dead
        // datanodes (missed heartbeats), one expires stale client leases.
        this.hbthread = new Daemon(new HeartbeatMonitor());
        this.lmthread = new Daemon(new LeaseMonitor());
        hbthread.start();
        lmthread.start();
        this.systemStart = System.currentTimeMillis();
        this.conf = conf;

        this.desiredReplication = conf.getInt("dfs.replication", 3);
        this.maxReplication = desiredReplication;
        this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
        this.minReplication = 1;
        this.heartBeatRecheck= 1000;
    }

    /** Close down this filesystem manager.
     * Causes heartbeat and lease daemons to stop; waits briefly for
     * them to finish, but a short timeout returns control back to caller.
     */
    public void close() {
        synchronized (this) {
            fsRunning = false;
        }
        try {
            hbthread.join(3000);
        } catch (InterruptedException ie) {
        } finally {
          // using finally to ensure we also wait for lease daemon
          try {
              lmthread.join(3000);
          } catch (InterruptedException ie) {
          }
        }
    }

    /////////////////////////////////////////////////////////
    //
    // These methods are called by HadoopFS clients
    //
    /////////////////////////////////////////////////////////
    /**
     * The client wants to open the given filename.  Return a
     * list of (block,machineArray) pairs.  The sequence of unique blocks
     * in the list indicates all the blocks that make up the filename.
     *
     * The client should choose one of the machines from the machineArray
     * at random.
     */
    public Object[] open(UTF8 src) {
        Object results[] = null;
        Block blocks[] = dir.getFile(src);
        if (blocks != null) {
            results = new Object[2];
            DatanodeInfo machineSets[][] = new DatanodeInfo[blocks.length][];

            for (int i = 0; i < blocks.length; i++) {
                TreeSet containingNodes = (TreeSet) blocksMap.get(blocks[i]);
                if (containingNodes == null) {
                    // Block is known but no datanode has reported it yet;
                    // return an empty machine list rather than null.
                    machineSets[i] = new DatanodeInfo[0];
                } else {
                    machineSets[i] = new DatanodeInfo[containingNodes.size()];
                    int j = 0;
                    for (Iterator it = containingNodes.iterator(); it.hasNext(); j++) {
                        machineSets[i][j] = (DatanodeInfo) it.next();
                    }
                }
            }

            results[0] = blocks;
            results[1] = machineSets;
        }
        return results;
    }

    /**
     * The client would like to create a new block for the indicated
     * filename.  Return an array that consists of the block, plus a set
     * of machines.  The first on this list should be where the client
     * writes data.  Subsequent items in the list must be provided in
     * the connection to the first datanode.
     * @return Return an array that consists of the block, plus a set
     * of machines, or null if src is invalid for creation (based on
     * {@link FSDirectory#isValidToCreate(UTF8)}.
     */
    public synchronized Object[] startFile(UTF8 src, UTF8 holder, UTF8 clientMachine, boolean overwrite) {
        Object results[] = null;
        if (pendingCreates.get(src) == null) {
            boolean fileValid = dir.isValidToCreate(src);
            if (overwrite && ! fileValid) {
                delete(src);
                fileValid = true;
            }

            if (fileValid) {
                results = new Object[2];

                // Get the array of replication targets
                DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null, clientMachine);
                if (targets.length < this.minReplication) {
                    // Refuse the create outright if we cannot place even the
                    // minimum number of replicas.
                    LOG.warning("Target-length is " + targets.length +
                        ", below MIN_REPLICATION (" + this.minReplication+ ")");
                    return null;
                }

                // Reserve space for this pending file
                pendingCreates.put(src, new Vector());

                // Record (or renew) the lease held by this client on the file.
                synchronized (leases) {
                    Lease lease = (Lease) leases.get(holder);
                    if (lease == null) {
                        lease = new Lease(holder);
                        leases.put(holder, lease);
                        sortedLeases.add(lease);
                    } else {
                        // remove/renew/re-add so sortedLeases re-sorts on the
                        // refreshed timestamp
                        sortedLeases.remove(lease);
                        lease.renew();
                        sortedLeases.add(lease);
                    }
                    lease.startedCreate(src);
                }

                // Create next block
                results[0] = allocateBlock(src);
                results[1] = targets;
            } else { // ! fileValid
                LOG.warning("Cannot start file because it is invalid. src=" + src);
            }
        } else {
            LOG.warning("Cannot start file because pendingCreates is non-null. src=" + src);
        }
        return results;
    }

    /**
     * The client would like to obtain an additional block for the indicated
     * filename (which is being written-to).  Return an array that consists
     * of the block, plus a set of machines.  The first on this list should
     * be where the client writes data.  Subsequent items in the list must
     * be provided in the connection to the first datanode.
     *
     * Make sure the previous blocks have been reported by datanodes and
     * are replicated.  Will return an empty 2-elt array if we want the
     * client to "try again later".
     */
    public synchronized Object[] getAdditionalBlock(UTF8 src, UTF8 clientMachine) {
        Object results[] = null;
        if (dir.getFile(src) == null && pendingCreates.get(src) != null) {
            results = new Object[2];

            //
            // If we fail this, bad things happen!
            //
            if (checkFileProgress(src)) {
                // Get the array of replication targets
                DatanodeInfo targets[] = chooseTargets(this.desiredReplication, null, clientMachine);
                if (targets.length < this.minReplication) {
                    return null;
                }

                // Create next block
                results[0] = allocateBlock(src);
                results[1] = targets;
            }
        }
        return results;
    }

    /**
     * The client would like to let go of the given block
     */
    public synchronized boolean abandonBlock(Block b, UTF8 src) {
        //
        // Remove the block from the pending creates list
        //
        Vector pendingVector = (Vector) pendingCreates.get(src);
        if (pendingVector != null) {
            for (Iterator it = pendingVector.iterator(); it.hasNext(); ) {
                Block cur = (Block) it.next();
                if (cur.compareTo(b) == 0) {
                    pendingCreateBlocks.remove(cur);
                    // Iterator.remove is the only safe way to delete while
                    // iterating the Vector.
                    it.remove();
                    return true;
                }
            }
        }
        return false;
Keyboard shortcut reference
Copy code
Ctrl + C
Search code
Ctrl + F
Full-screen mode
F11
Toggle theme
Ctrl + Shift + D
Show shortcuts
?
Increase font size
Ctrl + =
Decrease font size
Ctrl + -