?? jobinprogress.java
字號:
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.LogFormatter;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.logging.*;

///////////////////////////////////////////////////////
// JobInProgress maintains all the info for keeping
// a Job on the straight and narrow.  It keeps its JobProfile
// and its latest JobStatus, plus a set of tables for
// doing bookkeeping of its Tasks.
///////////////////////////////////////////////////////
class JobInProgress {
    public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.JobInProgress");

    // Descriptive record of the job (user, job id, job file, details URL,
    // job name); built once by the constructor.
    JobProfile profile;
    // Latest status.  Created in state PREP by the constructor and replaced
    // by a RUNNING status at the end of initTasks().
    JobStatus status;
    // Local copies of the job's configuration file and JAR.
    File localJobFile = null;
    File localJarFile = null;

    // One entry per map / reduce task; both stay empty until initTasks() runs.
    TaskInProgress maps[] = new TaskInProgress[0];
    TaskInProgress reduces[] = new TaskInProgress[0];
    // Read from the job configuration; numMapTasks is later overwritten with
    // the actual number of input splits.
    int numMapTasks = 0;
    int numReduceTasks = 0;

    JobTracker jobtracker = null;
    // Maps a TaskInProgress id to the String[][] file-cache hints for its split.
    TreeMap cachedHints = new TreeMap();

    // Wall-clock time (System.currentTimeMillis()) at which this object was created.
    long startTime;
    long finishTime;
    // Set to the job file's path when that file lives under the system
    // directory, i.e. when it may be deleted once the job completes.
    String deleteUponCompletion = null;

    // Job configuration loaded from the local copy of the job file.
    private JobConf conf;
    // True once initTasks() has run to completion.
    boolean tasksInited = false;

    /**
     * Create a JobInProgress with the given job file, plus a handle
     * to the tracker.
*/ public JobInProgress(String jobFile, JobTracker jobtracker, Configuration default_conf) throws IOException { String jobid = "job_" + jobtracker.createUniqueId(); String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid; this.jobtracker = jobtracker; this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP); this.startTime = System.currentTimeMillis(); JobConf default_job_conf = new JobConf(default_conf); this.localJobFile = default_job_conf.getLocalFile(JobTracker.SUBDIR, jobid + ".xml"); this.localJarFile = default_job_conf.getLocalFile(JobTracker.SUBDIR, jobid + ".jar"); FileSystem fs = FileSystem.get(default_conf); fs.copyToLocalFile(new File(jobFile), localJobFile); conf = new JobConf(localJobFile); this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url, conf.getJobName()); String jarFile = conf.getJar(); if (jarFile != null) { fs.copyToLocalFile(new File(jarFile), localJarFile); conf.setJar(localJarFile.getCanonicalPath()); } this.numMapTasks = conf.getNumMapTasks(); this.numReduceTasks = conf.getNumReduceTasks(); // // If a jobFile is in the systemDir, we can delete it (and // its JAR) upon completion // File systemDir = conf.getSystemDir(); if (jobFile.startsWith(systemDir.getPath())) { this.deleteUponCompletion = jobFile; } } /** * Construct the splits, etc. This is invoked from an async * thread so that split-computation doesn't block anyone. 
*/ public void initTasks() throws IOException { if (tasksInited) { return; } // // construct input splits // String jobid = profile.getJobId(); String jobFile = profile.getJobFile(); JobConf jd = new JobConf(localJobFile); FileSystem fs = FileSystem.get(conf); String ifClassName = jd.get("mapred.input.format.class"); InputFormat inputFormat; if (ifClassName != null && localJarFile != null) { try { ClassLoader loader = new URLClassLoader(new URL[]{ localJarFile.toURL() }); Class inputFormatClass = loader.loadClass(ifClassName); inputFormat = (InputFormat)inputFormatClass.newInstance(); } catch (Exception e) { throw new IOException(e.toString()); } } else { inputFormat = jd.getInputFormat(); } FileSplit[] splits = inputFormat.getSplits(fs, jd, numMapTasks); // // sort splits by decreasing length, to reduce job's tail // Arrays.sort(splits, new Comparator() { public int compare(Object a, Object b) { long diff = ((FileSplit)b).getLength() - ((FileSplit)a).getLength(); return diff==0 ? 0 : (diff > 0 ? 1 : -1); } }); // // adjust number of map tasks to actual number of splits // this.numMapTasks = splits.length; // create a map task for each split this.maps = new TaskInProgress[numMapTasks]; for (int i = 0; i < numMapTasks; i++) { maps[i] = new TaskInProgress(jobFile, splits[i], jobtracker, conf, this); } // // Create reduce tasks // this.reduces = new TaskInProgress[numReduceTasks]; for (int i = 0; i < numReduceTasks; i++) { reduces[i] = new TaskInProgress(jobFile, maps, i, jobtracker, conf, this); } // // Obtain some tasktracker-cache information for the map task splits. // for (int i = 0; i < maps.length; i++) { String hints[][] = fs.getFileCacheHints(splits[i].getFile(), splits[i].getStart(), splits[i].getLength()); cachedHints.put(maps[i].getTIPId(), hints); } this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING); tasksInited = true; } /** * This is called by TaskInProgress objects. 
The JobInProgress * prefetches and caches a lot of these hints. If the hint is * not available, then we pass it through to the filesystem. */ String[][] getFileCacheHints(String tipID, File f, long start, long len) throws IOException { String results[][] = (String[][]) cachedHints.get(tipID); if (tipID == null) { FileSystem fs = FileSystem.get(conf); results = fs.getFileCacheHints(f, start, len); cachedHints.put(tipID, results); } return results; } ///////////////////////////////////////////////////// // Accessors for the JobInProgress ///////////////////////////////////////////////////// public JobProfile getProfile() { return profile; } public JobStatus getStatus() { return status; } public long getStartTime() { return startTime; } public long getFinishTime() { return finishTime; } public int desiredMaps() { return numMapTasks; } public int finishedMaps() { int finishedCount = 0; for (int i = 0; i < maps.length; i++) { if (maps[i].isComplete()) { finishedCount++; } } return finishedCount; } public int desiredReduces() { return numReduceTasks; } public int finishedReduces() { int finishedCount = 0; for (int i = 0; i < reduces.length; i++) { if (reduces[i].isComplete()) { finishedCount++; } } return finishedCount; } /** * Return a treeset of completed TaskInProgress objects */ public Vector reportTasksInProgress(boolean shouldBeMap, boolean shouldBeComplete) { Vector results = new Vector(); TaskInProgress tips[] = null; if (shouldBeMap) { tips = maps; } else { tips = reduces; } for (int i = 0; i < tips.length; i++) { if (tips[i].isComplete() == shouldBeComplete) { results.add(tips[i]); } } return results; } //////////////////////////////////////////////////// // Status update methods //////////////////////////////////////////////////// public void updateTaskStatus(TaskInProgress tip, TaskStatus status) { double oldProgress = tip.getProgress(); // save old progress tip.updateStatus(status); // update tip //
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -