?? fetcher.java
字號:
parseTextWriter.append(text);
parseDataWriter.append(parseData);
}
}
} catch (Throwable t) {
LOG.severe("error writing output:" + t.toString());
}
}
}
public Fetcher(NutchFileSystem nfs, String directory, boolean parsing)
throws IOException {
this.parsing = parsing;
// Set up in/out streams
fetchList = new ArrayFile.Reader
(nfs, new File(directory, FetchListEntry.DIR_NAME).toString());
if (this.parsing) {
fetcherWriter = new ArrayFile.Writer
(nfs, new File(directory, FetcherOutput.DIR_NAME).toString(),
FetcherOutput.class);
} else {
fetcherWriter = new ArrayFile.Writer
(nfs, new File(directory, FetcherOutput.DIR_NAME_NP).toString(),
FetcherOutput.class);
}
contentWriter = new ArrayFile.Writer
(nfs, new File(directory, Content.DIR_NAME).toString(), Content.class);
if (this.parsing) {
parseTextWriter = new ArrayFile.Writer(nfs,
new File(directory, ParseText.DIR_NAME).toString(), ParseText.class);
parseDataWriter = new ArrayFile.Writer(nfs,
new File(directory, ParseData.DIR_NAME).toString(), ParseData.class);
}
name = new File(directory).getName();
}
/**
 * Sets the number of fetcher threads that {@link #run()} spawns.
 *
 * @param threadCount number of threads to start
 */
public void setThreadCount(int threadCount) {
  this.threadCount = threadCount;
}
/**
 * Applies the given logging level to this class's logger and to the
 * {@code PluginRepository} and {@code ParserFactory} loggers, then logs the
 * new level at INFO.
 *
 * @param level the level to apply
 */
public static void setLogLevel(Level level) {
  // Keep the fetcher, plugin and parser loggers at the same level.
  LOG.setLevel(level);
  PluginRepository.LOG.setLevel(level);
  ParserFactory.LOG.setLevel(level);

  String message = "logging at " + level;
  LOG.info(message);
}
/**
 * Runs the fetcher: records the start time, spawns {@code threadCount}
 * fetcher threads, polls once per second until they are gone and the
 * counters have stopped changing, then closes the fetch list and all
 * output writers.
 *
 * @throws IOException          declared; presumably raised while closing the
 *                              fetch list or writers
 * @throws InterruptedException if the monitoring sleep is interrupted
 * @throws RuntimeException     if {@code LogFormatter.hasLoggedSevere()}
 *                              reports a SEVERE log entry during monitoring
 */
public void run() throws IOException, InterruptedException {
  start = System.currentTimeMillis();

  // Spawn the workers; each is named THREAD_GROUP_NAME followed by its index.
  for (int t = 0; t < threadCount; t++) {
    new FetcherThread(THREAD_GROUP_NAME + t).start();
  }

  // Stop monitoring once every FetcherThread is gone. Other threads may
  // still be alive (possibly runaway threads started by external libraries
  // from within FetcherThreads); they are ignored because the fetcher
  // threads themselves have finished. To be a little more cautious, we also
  // require that pages, errors and bytes did not change between two
  // consecutive polls, i.e. that no page fetch is still outstanding.
  int lastPages = pages;
  int lastErrors = errors;
  long lastBytes = bytes;

  for (;;) {
    Thread.sleep(1000);

    if (LogFormatter.hasLoggedSevere()) {
      throw new RuntimeException("SEVERE error logged. Exiting fetcher.");
    }

    int activeCount = group.activeCount();
    Thread[] active = new Thread[activeCount];
    group.enumerate(active);

    boolean fetcherThreadAlive = false;
    for (int k = 0; k < active.length; k++) {
      Thread candidate = active[k];
      // A slot is null when the thread went away between the two calls.
      if (candidate != null) {
        if (candidate.getName().startsWith(THREAD_GROUP_NAME)) {
          fetcherThreadAlive = true;
        }
        if (LOG.isLoggable(Level.FINE)) {
          LOG.fine(candidate.toString());
        }
      }
    }

    if (fetcherThreadAlive) {
      continue; // workers still running; poll again
    }

    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine("number of active threads: " + activeCount);
    }

    boolean countersUnchanged =
        pages == lastPages && errors == lastErrors && bytes == lastBytes;
    if (countersUnchanged) {
      break;
    }

    // Counters still moved since the last poll: report and re-sample.
    status();
    lastPages = pages;
    lastErrors = errors;
    lastBytes = bytes;
  }

  // Close the input list and all output writers.
  fetchList.close();
  fetcherWriter.close();
  contentWriter.close();
  if (this.parsing) {
    parseTextWriter.close();
    parseDataWriter.close();
  }
}
/**
 * Snapshot of a fetcher's progress counters. The snapshot time is taken
 * from the system clock when the object is constructed.
 */
public static class FetcherStatus {
  private final String name;
  private final long startTime;
  private final long curTime;
  private final int pageCount;
  private final int errorCount;
  private final long byteCount;

  /**
   * Captures a progress snapshot stamped with the current system time.
   *
   * @param name   short name of the segment being processed
   * @param start  time in milliseconds at which the fetcher was started
   * @param pages  number of pages fetched
   * @param errors number of fetching errors
   * @param bytes  number of bytes fetched
   */
  public FetcherStatus(String name, long start, int pages, int errors, long bytes) {
    this.name = name;
    this.startTime = start;
    this.curTime = System.currentTimeMillis();
    this.pageCount = pages;
    this.errorCount = errors;
    this.byteCount = bytes;
  }

  /** Returns the segment name given at construction. */
  public String getName() {
    return name;
  }

  /** Returns the fetcher start time in milliseconds. */
  public long getStartTime() {
    return startTime;
  }

  /** Returns the time in milliseconds at which this snapshot was created. */
  public long getCurTime() {
    return curTime;
  }

  /** Returns the milliseconds between the start time and this snapshot. */
  public long getElapsedTime() {
    return curTime - startTime;
  }

  /** Returns the number of pages fetched. */
  public int getPageCount() {
    return pageCount;
  }

  /** Returns the number of fetching errors. */
  public int getErrorCount() {
    return errorCount;
  }

  /** Returns the number of bytes fetched. */
  public long getByteCount() {
    return byteCount;
  }

  /** Returns a one-line summary of the segment, counters and elapsed time. */
  public String toString() {
    String counters = pageCount + " pages, "
        + errorCount + " errors, "
        + byteCount + " bytes, ";
    return "status: segment " + name + ", " + counters
        + getElapsedTime() + " ms";
  }
}
/**
 * Builds a {@link FetcherStatus} from the segment name, start time and the
 * current page, error and byte counters.
 *
 * @return a new snapshot of the fetcher's progress
 */
public synchronized FetcherStatus getStatus() {
  FetcherStatus snapshot = new FetcherStatus(name, start, pages, errors, bytes);
  return snapshot;
}
/**
 * Logs the status of the fetcher run at INFO: first the snapshot summary,
 * then the derived rates (pages per second, kilobits per second and bytes
 * per page).
 *
 * A rate whose denominator is zero (no elapsed time, or no pages fetched
 * yet) is reported as 0.0 rather than NaN or Infinity.
 */
public synchronized void status() {
  FetcherStatus status = getStatus();
  LOG.info(status.toString());

  float elapsedSeconds = status.getElapsedTime() / 1000.0f;
  int pageCount = status.getPageCount();
  float byteCount = (float) status.getByteCount();

  // Float division by zero does not throw; it silently yields NaN/Infinity,
  // which would end up in the log. Guard each denominator instead.
  float pagesPerSecond =
      elapsedSeconds > 0 ? pageCount / elapsedSeconds : 0.0f;
  float kilobitsPerSecond =
      elapsedSeconds > 0 ? (byteCount * 8 / 1024) / elapsedSeconds : 0.0f;
  float bytesPerPage =
      pageCount > 0 ? byteCount / pageCount : 0.0f;

  LOG.info("status: "
      + pagesPerSecond + " pages/s, "
      + kilobitsPerSecond + " kb/s, "
      + bytesPerPage + " bytes/page");
}
/**
 * Command-line entry point: parses the options, creates a Fetcher for the
 * given segment directory and runs it.
 *
 * Prints the usage string and exits with status -1 when no arguments are
 * given, when {@code -threads} or {@code -logLevel} has no value, or when
 * the segment directory is missing.
 *
 * @param args command-line arguments, see the usage string
 * @throws Exception anything raised while setting up or running the fetcher
 */
public static void main(String[] args) throws Exception {
  int threadCount = -1;
  String logLevel = "info";
  boolean parsing = true;
  boolean showThreadID = false;
  String directory = null;

  String usage = "Usage: Fetcher (-local | -ndfs <namenode:port>) [-logLevel level] [-noParsing] [-showThreadID] [-threads n] <dir>";

  if (args.length == 0) {
    System.err.println(usage);
    System.exit(-1);
  }

  int i = 0;
  NutchFileSystem nfs = NutchFileSystem.parseArgs(args, i);

  for (; i < args.length; i++) {       // parse command line
    String arg = args[i];
    if (arg == null) {
      // NOTE(review): null entries are presumably arguments already consumed
      // by NutchFileSystem.parseArgs -- confirm against that method.
      continue;
    } else if (arg.equals("-threads") || arg.equals("-logLevel")) {
      // Both options take a value; fail with the usage text instead of an
      // ArrayIndexOutOfBoundsException when it is missing.
      if (i + 1 >= args.length || args[i + 1] == null) {
        System.err.println("Missing value for option " + arg);
        System.err.println(usage);
        System.exit(-1);
      }
      String value = args[++i];
      if (arg.equals("-threads")) {
        threadCount = Integer.parseInt(value);
      } else {
        logLevel = value;
      }
    } else if (arg.equals("-noParsing")) {
      parsing = false;
    } else if (arg.equals("-showThreadID")) {
      showThreadID = true;
    } else {                           // segment directory (required)
      directory = arg;
    }
  }

  // Without a directory the constructor would open files relative to the
  // working directory and then fail with a NullPointerException.
  if (directory == null) {
    System.err.println(usage);
    System.exit(-1);
  }

  try {
    // Construct inside the try so the file system is closed even when
    // opening the segment fails.
    Fetcher fetcher = new Fetcher(nfs, directory, parsing);

    if (threadCount != -1) {           // set threadCount option
      fetcher.setThreadCount(threadCount);
    }

    // Upper-case with a fixed locale: the default-locale variant breaks
    // Level.parse under locales with special casing rules (e.g. Turkish,
    // where "info" does not become "INFO").
    Fetcher.setLogLevel(
        Level.parse(logLevel.toUpperCase(java.util.Locale.ENGLISH)));

    if (showThreadID) {
      LogFormatter.setShowThreadIDs(true);
    }

    fetcher.run();                     // run the Fetcher
  } finally {
    nfs.close();
  }
}
}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -