?? distributedwebdbwriter.java
字號:
/**
 * Streams through a URL-sorted Page db in lock-step with a series of
 * URL-sorted queries, reporting whether each queried link target still
 * exists and has outlinks. Queries must arrive in ascending URL order,
 * since the Page cursor only ever moves forward.
 */
private class TargetTester {
    MapFile.Reader pagedb;
    boolean hasPage = false;
    UTF8 pageURL = null;
    Page page = null;

    /**
     * Opens a forward-only cursor over the given URL-sorted Page db and
     * primes it with the first entry (if any).
     */
    public TargetTester(MapFile.Reader pagedb) throws IOException {
        this.pagedb = pagedb;
        this.pageURL = new UTF8();
        this.page = new Page();
        this.hasPage = pagedb.next(pageURL, page);
    }

    /**
     * Match the given URL against the sorted series of Page URLs.
     * Returns HAS_OUTLINKS or NO_OUTLINKS when a matching Page is found,
     * and LINK_INVALID when the cursor has moved past curURL without a
     * match (i.e. the target page no longer exists in the Page db).
     */
    public int hasOutlinks(UTF8 curURL) throws IOException {
        int cmp = pageURL.compareTo(curURL);
        // Advance until the cursor is at or beyond the queried URL,
        // or until the Page db is exhausted.
        while (hasPage && cmp < 0) {
            hasPage = pagedb.next(pageURL, page);
            if (hasPage) {
                cmp = pageURL.compareTo(curURL);
            }
        }
        if (!hasPage) {
            // Ran off the end of the Page db: report "no outlinks".
            return NO_OUTLINKS;
        }
        if (cmp == 0) {
            // Exact match: report whether the target page has outlinks.
            return (page.getNumOutlinks() > 0) ? HAS_OUTLINKS : NO_OUTLINKS;
        }
        if (cmp > 0) {
            // The cursor passed curURL without a match: the target page
            // has been deleted, probably because we repeatedly failed to
            // fetch the URL. The Link itself should therefore be deleted.
            return LINK_INVALID;
        }
        // Unreachable: the loop only exits with cmp >= 0 while hasPage.
        return NO_OUTLINKS;
    }

    /** Releases the underlying Page db reader. */
    public void close() throws IOException {
        pagedb.close();
    }
}
/**
 * Closes down and merges changes to the URL-driven link
 * table. This does nothing fancy, and propagates nothing
 * to a further stage. There is no next stage!
 *
 * During the merge, every surviving Link is checked against the
 * URL-sorted Page db so its target-has-outlinks flag stays accurate.
 * Corrections and deletions discovered here are not applied in place;
 * they are queued as instructions on futureEdits for a later pass.
 */
private class LinksByURLProcessor extends CloseProcessor {
// Page db, sorted by URL; consulted (via TargetTester) to see whether
// each Link's target page still exists and has outlinks.
MapFile.Reader pageDb;
// Sink for follow-up instructions (ADD_LINK / DEL_SINGLE_LINK) emitted
// during this merge, to be processed by a later stage.
EditSectionGroupWriter futureEdits;
/**
 * @param db          existing links-by-URL table being closed down
 * @param editWriter  destination for this stage's sorted edits
 * @param pageDb      Page db sorted by URL; closed in closeDown()
 * @param futureEdits receives corrective instructions for a later pass
 */
public LinksByURLProcessor(MapFile.Reader db, EditSectionGroupWriter editWriter, MapFile.Reader pageDb, EditSectionGroupWriter futureEdits) {
super(LINKS_BY_URL, db, editWriter, new SequenceFile.Sorter(nfs, new LinkInstruction.UrlComparator(), NullWritable.class), new Link.UrlComparator(), Link.class, NullWritable.class, "LinksByURLPart");
this.pageDb = pageDb;
this.futureEdits = futureEdits;
}
/**
 * Runs the standard close-down merge, then releases the Page db reader.
 * @return whatever the superclass closeDown reports (presumably an item
 *         count -- TODO confirm against CloseProcessor)
 */
public long closeDown(File workingDir, File outputDir) throws IOException {
long result = super.closeDown(workingDir, outputDir);
pageDb.close();
return result;
}
/**
 * Merge the existing db with the edit-stream into a brand-new file.
 * Both inputs are sorted by URL, so this is a three-way merge: db
 * entries, sorted edit instructions, and the Page db cursor held by
 * TargetTester all advance monotonically in URL order.
 */
void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
WritableComparator comparator = new Link.UrlComparator();
// Create the keys and vals we'll use
LinkInstruction editItem = new LinkInstruction();
Link readerItem = new Link();
// Read the first items from both streams
boolean hasEntries = db.next(readerItem, NullWritable.get());
boolean hasEdits = sortedEdits.next(editItem, NullWritable.get());
TargetTester targetTester = new TargetTester(pageDb);
// As long as we have both edits and entries to process,
// we need to interleave them
//
// NOTE(review): if an instruction is neither ADD_LINK nor DEL_LINK,
// neither stream advances and this loop would spin forever -- verify
// those are the only instruction values that can appear here.
while (hasEntries && hasEdits) {
int curInstruction = editItem.getInstruction();
if (curInstruction == ADD_LINK) {
// When we add a link, we may replace a previous
// link with identical URL and MD5 values. Our
// comparator will test both
//
int comparison = comparator.compare(readerItem, editItem.getLink());
if (comparison < 0) {
// Write the readerKey, just passing it along.
// Don't process the edit yet.
int linkTest = targetTester.hasOutlinks(readerItem.getURL());
if (linkTest == LINK_INVALID) {
// Target page is gone: queue a delete instead of copying.
liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
targetOutlinkEdits++;
} else {
boolean oldOutlinkStatus = readerItem.targetHasOutlink();
boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS);
// Do the conditional so we minimize unnecessary
// mod-writes.
if (oldOutlinkStatus != newOutlinkStatus) {
readerItem.setTargetHasOutlink(newOutlinkStatus);
liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
targetOutlinkEdits++;
}
newDb.append(readerItem, NullWritable.get());
itemsWritten++;
}
hasEntries = db.next(readerItem, NullWritable.get());
} else if (comparison == 0) {
// Write the new item, "replacing" the old one.
// We move to the next edit instruction and move
// past the replaced db entry.
Link editLink = editItem.getLink();
int linkTest = targetTester.hasOutlinks(editLink.getURL());
// Delete the edit/readerItem from the other table if it's
// found to be invalid.
if (linkTest == LINK_INVALID) {
liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
} else {
editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
newDb.append(editLink, NullWritable.get());
itemsWritten++;
}
targetOutlinkEdits++;
hasEntries = db.next(readerItem, NullWritable.get());
hasEdits = sortedEdits.next(editItem, NullWritable.get());
} else if (comparison > 0) {
// Write the new item. We stay at the current
// db entry.
Link editLink = editItem.getLink();
int linkTest = targetTester.hasOutlinks(editLink.getURL());
// Delete the edit from the other table if it's invalid
if (linkTest == LINK_INVALID) {
liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
} else {
editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
newDb.append(editLink, NullWritable.get());
itemsWritten++;
}
targetOutlinkEdits++;
hasEdits = sortedEdits.next(editItem, NullWritable.get());
}
} else if (curInstruction == DEL_LINK) {
// When we delete a link, we do it by MD5 and apply
// it to the index first. A single delete instruction
// may remove many items in the db, during the earlier
// processing. However, unlike the index-processing stage,
// here we can expect a new DEL instruction for every
// item that we remove from the db.
//
int comparison = comparator.compare(readerItem, editItem.getLink());
if (comparison < 0) {
// Write readerKey, just passing it along. Don't
// process the edit yet.
int linkTest = targetTester.hasOutlinks(readerItem.getURL());
// Delete the reader item if it's found to be invalid
if (linkTest == LINK_INVALID) {
liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
} else {
// NOTE(review): unlike the ADD_LINK pass-along branch above,
// this always re-emits an ADD_LINK to futureEdits even when
// the outlink flag did not change -- confirm whether the
// unconditional write is intentional.
readerItem.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
newDb.append(readerItem, NullWritable.get());
itemsWritten++;
}
targetOutlinkEdits++;
hasEntries = db.next(readerItem, NullWritable.get());
} else if (comparison == 0) {
// "Delete" the item by passing by the readerKey.
// We want a new entry, as well as the next instruction
// to process.
hasEntries = db.next(readerItem, NullWritable.get());
hasEdits = sortedEdits.next(editItem, NullWritable.get());
} else if (comparison > 0) {
// Ignore, move on to next instruction
hasEdits = sortedEdits.next(editItem, NullWritable.get());
}
}
}
// Now we have only edits. No more preexisting items!
while (! hasEntries && hasEdits) {
int curInstruction = editItem.getInstruction();
if (curInstruction == ADD_LINK) {
//
// Add the item from the edit list.
//
//
// Make sure the outlinks flag is set properly.
//
Link editLink = editItem.getLink();
int linkTest = targetTester.hasOutlinks(editLink.getURL());
if (linkTest == LINK_INVALID) {
liwriter.appendInstructionInfo(futureEdits, editLink, DEL_SINGLE_LINK, NullWritable.get());
} else {
editLink.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
liwriter.appendInstructionInfo(futureEdits, editLink, ADD_LINK, NullWritable.get());
newDb.append(editLink, NullWritable.get());
itemsWritten++;
}
targetOutlinkEdits++;
} else if (curInstruction == DEL_LINK) {
// Ignore operation: the db entry it would delete is already gone.
}
// Move on to next edit
hasEdits = sortedEdits.next(editItem, NullWritable.get());
}
// Now we have only preexisting items. Just copy them
// to the new file, in order.
while (hasEntries && ! hasEdits) {
//
// Simply copy the remaining database items.
//
//
// First, make sure the 'outlinks' flag is set properly.
//
int linkTest = targetTester.hasOutlinks(readerItem.getURL());
if (linkTest == LINK_INVALID) {
// Target page is gone: queue a delete and skip the copy.
liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_SINGLE_LINK, NullWritable.get());
targetOutlinkEdits++;
} else {
boolean oldOutlinkStatus = readerItem.targetHasOutlink();
boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS);
if (oldOutlinkStatus != newOutlinkStatus) {
readerItem.setTargetHasOutlink(newOutlinkStatus);
liwriter.appendInstructionInfo(futureEdits, readerItem, ADD_LINK, NullWritable.get());
targetOutlinkEdits++;
}
// Now copy the object
newDb.append(readerItem, NullWritable.get());
itemsWritten++;
}
// Move on to next
hasEntries = db.next(readerItem, NullWritable.get());
}
targetTester.close();
}
}
/**
* Method useful for the first time we create a distributed db project.
* Basically need to write down the number of dirs we can expect.
*/
public static void createDB(NutchFileSystem nfs, File root, int totalMachines) throws IOException {
//
// Check to see if the db already exists
//
File stdDir = new File(root, "standard");
File machineInfo = new Fi
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -