// distributedwebdbwriter.java (excerpt; begins partway through a class)
// [source-viewer label: "Font size:"]
/**
 * Produces the next de-duplicated page instruction.
 *
 * The buffered instruction in {@code current} is handed to
 * {@code result.set(...)}, so the FIRST instruction seen for a URL is the
 * one returned. Every following instruction whose page URL compares equal
 * to it is then read into {@code current} and discarded. When this returns,
 * {@code current} holds the first instruction for a different URL (if any),
 * and {@code haveCurrent} records whether one was read.
 *
 * NOTE(review): assumes PageInstruction.set copies its argument, since
 * {@code current} is reused as the read buffer -- confirm.
 *
 * @param result receives the first instruction for the next distinct URL
 * @return {@code false} if the input was already exhausted, otherwise {@code true}
 * @throws IOException if reading from {@code edits} fails
 */
public boolean next(PageInstruction result) throws IOException {
    if (haveCurrent) {
        // Remember which URL is being collapsed, then emit its first instruction.
        currentUrl.set(current.getPage().getURL());
        result.set(current); // take the first instruction
        // Drain every following instruction that targets the same URL.
        for (;;) {
            haveCurrent = edits.next(current, NullWritable.get());
            if (!haveCurrent) {
                break;
            }
            if (currentUrl.compareTo(current.getPage().getURL()) != 0) {
                break;
            }
        }
        return true;
    }
    return false;
}
}
/*************************************************
 * Pairs a Link with an integer opcode ("instruction") so that
 * link-table edits can be serialized, sorted and replayed.
 * On the wire it is one opcode byte followed by the Link's own encoding.
 *************************************************/
public static class LinkInstruction implements WritableComparable {
    Link link;
    int instruction;

    /**
     * Creates an empty instruction. The Link is allocated lazily by
     * {@link #set(LinkInstruction)} or {@link #readFields(DataInput)}.
     */
    public LinkInstruction() {
    }

    /**
     * Creates an instruction that references (does not copy) {@code link}.
     */
    public LinkInstruction(Link link, int instruction) {
        set(link, instruction);
    }

    /**
     * Copies another instruction's opcode and Link contents into this one.
     * This instance's own Link object is reused (created on first use), so
     * after the call it is not shared with {@code that}.
     */
    public void set(LinkInstruction that) {
        this.instruction = that.instruction;
        Link target = this.link;
        if (target == null) {
            target = new Link();
            this.link = target;
        }
        target.set(that.link);
    }

    /**
     * Stores the opcode and points this instruction at {@code link}
     * by reference; no copy of the Link is made.
     */
    public void set(Link link, int instruction) {
        this.instruction = instruction;
        this.link = link;
    }

    //
    // WritableComparable
    //

    /**
     * Natural ordering: delegates entirely to the Links' compareTo.
     * The opcode takes no part in the comparison.
     */
    public int compareTo(Object o) {
        LinkInstruction other = (LinkInstruction) o;
        return this.link.compareTo(other.link);
    }

    /** Writes the opcode as a single byte, then the Link. */
    public void write(DataOutput out) throws IOException {
        out.writeByte(instruction);
        link.write(out);
    }

    /**
     * Reads the opcode byte (as a signed byte, via readByte) and then the
     * Link. The existing Link object is reused, or allocated if absent.
     */
    public void readFields(DataInput in) throws IOException {
        instruction = in.readByte();
        if (link == null) {
            link = new Link();
        }
        link.readFields(in);
    }

    /** @return the Link this instruction applies to (not a copy) */
    public Link getLink() {
        return link;
    }

    /** @return the opcode */
    public int getInstruction() {
        return instruction;
    }

    /*******************************************************
     * Orders instructions by their Links' MD5 ordering
     * (Link.md5Compare / Link.MD5Comparator). The opcode is NOT a
     * sort key: the raw comparator skips the leading opcode byte.
     *******************************************************/
    public static class MD5Comparator extends WritableComparator {
        private static final Link.MD5Comparator MD5_COMPARATOR =
            new Link.MD5Comparator();

        public MD5Comparator() {
            super(LinkInstruction.class);
        }

        /** Object comparison: compares only the wrapped Links by MD5. */
        public int compare(WritableComparable a, WritableComparable b) {
            LinkInstruction left = (LinkInstruction) a;
            LinkInstruction right = (LinkInstruction) b;
            return left.link.md5Compare(right.link);
        }

        /**
         * Raw-bytes comparison: steps over the one-byte opcode prefix
         * written by write() and compares the serialized Links.
         */
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            final int opcodeBytes = 1;
            return MD5_COMPARATOR.compare(b1, s1 + opcodeBytes, l1 - opcodeBytes,
                                          b2, s2 + opcodeBytes, l2 - opcodeBytes);
        }
    }

    /*********************************************************
     * Orders instructions by their Links' URL ordering
     * (Link.urlCompare / Link.UrlComparator). The opcode is NOT a
     * sort key: the raw comparator skips the leading opcode byte.
     *********************************************************/
    public static class UrlComparator extends WritableComparator {
        private static final Link.UrlComparator URL_COMPARATOR =
            new Link.UrlComparator();

        public UrlComparator() {
            super(LinkInstruction.class);
        }

        /** Object comparison: compares only the wrapped Links by URL. */
        public int compare(WritableComparable a, WritableComparable b) {
            LinkInstruction left = (LinkInstruction) a;
            LinkInstruction right = (LinkInstruction) b;
            return left.link.urlCompare(right.link);
        }

        /**
         * Raw-bytes comparison: steps over the one-byte opcode prefix
         * written by write() and compares the serialized Links.
         */
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            final int opcodeBytes = 1;
            return URL_COMPARATOR.compare(b1, s1 + opcodeBytes, l1 - opcodeBytes,
                                          b2, s2 + opcodeBytes, l2 - opcodeBytes);
        }
    }
}
/*******************************************************
 * Appends link instructions to an EditSectionGroupWriter using one
 * reusable LinkInstruction, instead of allocating a fresh one per call
 * as in "writer.append(new LinkInstruction(...), val)".
 ********************************************************/
public static class LinkInstructionWriter {
    LinkInstruction li = new LinkInstruction();

    /**
     * Creates a writer that owns its own reusable LinkInstruction.
     */
    public LinkInstructionWriter() {
    }

    /**
     * Loads {@code link} and {@code opcode} into the reusable instruction
     * and appends it, paired with {@code value}, to {@code writer}.
     *
     * The method is synchronized, so concurrent callers on the same
     * instance cannot interleave updates to the shared instruction.
     * NOTE(review): the instruction references {@code link} directly (no
     * copy). This assumes writer.append() has serialized the key before it
     * returns -- confirm against EditSectionGroupWriter.
     *
     * @throws IOException if the append fails
     */
    public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Link link, int opcode, Writable value) throws IOException {
        LinkInstruction reusable = li;
        reusable.set(link, opcode);
        writer.append(reusable, value);
    }
}
/********************************************************
 * Collapses consecutive link instructions whose Links compare equal
 * (Link.compareTo) into one. Within each run of equal Links, the LAST
 * instruction read is the one returned.
 *********************************************************/
class DeduplicatingLinkSequenceReader {
    Link currentKey = new Link();
    LinkInstruction current = new LinkInstruction();
    SequenceFile.Reader edits;
    boolean haveCurrent;

    /**
     * Wraps {@code edits} and reads the first instruction into the
     * one-element look-ahead buffer {@code current}.
     *
     * @throws IOException if the initial read fails
     */
    public DeduplicatingLinkSequenceReader(SequenceFile.Reader edits) throws IOException {
        this.edits = edits;
        this.haveCurrent = edits.next(current, NullWritable.get());
    }

    /**
     * Fills {@code key} with the next de-duplicated instruction.
     *
     * The incoming edits are expected to be sorted first by MD5, then by
     * URL, with MD5-only values preceding MD5+URL values. Because the last
     * instruction of each equal-Link run wins, the later-sorted instruction
     * takes precedence.
     *
     * @param key receives a copy (via LinkInstruction.set) of the last
     *            instruction in the run
     * @return {@code false} if no instructions remain, otherwise {@code true}
     * @throws IOException if reading from {@code edits} fails
     */
    public boolean next(LinkInstruction key) throws IOException {
        if (haveCurrent) {
            currentKey.set(current.getLink());
            boolean sameLink;
            do {
                // Copy before reading: the next read overwrites 'current'.
                key.set(current);
                haveCurrent = edits.next(current, NullWritable.get());
                sameLink = haveCurrent && currentKey.compareTo(current.getLink()) == 0;
            } while (sameLink);
            return true;
        }
        return false;
    }
}
/**************************************************
* The CloseProcessor class is used when we close down
* the webdb. We give it the path, members, and class values
* needed to apply changes to any of our 4 data tables.
*
* This is an abstract class. Each subclass must define
* the exact merge procedure. However, file-handling
* and edit-processing is standardized as much as possible.
*
**************************************************/
private abstract class CloseProcessor {
String basename;
String curDBPart;
MapFile.Reader oldDb;
EditSectionGroupWriter editWriter;
SequenceFile.Sorter sorter;
WritableComparator comparator;
Class keyClass, valueClass;
long itemsWritten = 0;
/**
 * Records the per-table state used when the db is closed.
 *
 * @param basename   name of this table's file inside the output section dir
 * @param oldDb      existing table contents to merge the edits into
 * @param editWriter writer that collected this table's edits; closeDown() closes it
 * @param sorter     sorter applied to the merged edit file
 * @param comparator key comparator for the new MapFile; when null,
 *                   closeDown() builds the MapFile from {@code keyClass} instead
 * @param keyClass   key class for the new MapFile (used when comparator is null)
 * @param valueClass value class for the new MapFile
 * @param curDBPart  db part identifier, stored for use by the rest of the class
 */
CloseProcessor(String basename, MapFile.Reader oldDb, EditSectionGroupWriter editWriter,
               SequenceFile.Sorter sorter, WritableComparator comparator,
               Class keyClass, Class valueClass, String curDBPart) {
    // Which table / part this processor handles.
    this.basename = basename;
    this.curDBPart = curDBPart;
    // Existing data and the edit pipeline.
    this.oldDb = oldDb;
    this.editWriter = editWriter;
    this.sorter = sorter;
    // Typing/ordering for the rebuilt MapFile.
    this.comparator = comparator;
    this.keyClass = keyClass;
    this.valueClass = valueClass;
}
/**
* Perform the shutdown sequence for this Processor.
* There is a lot of file-moving and edit-sorting that
* is common across all the 4 tables.
*
* Returns how many items were written out by this close().
*/
long closeDown(File workingDir, File outputDir) throws IOException {
//
// Done adding edits, so close edit-writer.
//
editWriter.close();
//
// Where the output is going
//
File sectionDir = new File(outputDir, "dbsection." + machineNum);
File newDbFile = new File(sectionDir, basename);
//
// Grab all the edits that we need to process. We build an EditSectionGroupReader
// and aim it at the right location. The ESR will wait until all its
// component Sections are written and completed before returning from
// any method (other than the constructor). So we expect to possibly wait
// inside the call to numEdits().
//
EditSectionGroupReader edits = new EditSectionGroupReader(nfs, basename, machineNum, totalMachines);
int numEdits = edits.numEdits();
// If there are edits, then process them.
if (numEdits != 0) {
File mergedEditsFile = new File(sectionDir, "mergedEdits");
edits.mergeSectionComponents(mergedEditsFile);
File sortedEditsFile = new File(mergedEditsFile.getPath() + ".sorted");
// Sort the edits
long startSort = System.currentTimeMillis();
sorter.sort(mergedEditsFile.getPath(), sortedEditsFile.getPath());
long endSort = System.currentTimeMillis();
LOG.info("Processing " + basename + ": Sorted " + numEdits + " instructions in " + ((endSort - startSort) / 1000.0) + " seconds.");
LOG.info("Processing " + basename + ": Sorted " + (numEdits / ((endSort - startSort) / 1000.0)) + " instructions/second");
// Delete old file
nfs.delete(mergedEditsFile);
// Read the sorted edits. That means read all
// the edits from the local subsection of the
// database. We must merge every machine's
// contribution to the edit-list first (which
// also means waiting until each machine has
// completed that step).
// Read the sorted edits
SequenceFile.Reader sortedEdits = new SequenceFile.Reader(nfs, sortedEditsFile.getPath());
// Create a brand-new output db for the integrated data
MapFile.Writer newDb = (comparator == null) ? new MapFile.Writer(nfs, newDbFile.getPath(), keyClass, valueClass) : new MapFile.Writer(nfs, newDbFile.getPath(), comparator, valueClass);
// Iterate through the edits, and merge changes with existing
// db into the brand-new file
oldDb.reset();
// Merge the edits. We did it!
long startMerge = System.currentTimeMillis();
mergeEdits(oldDb, sortedEdits, newDb);
long endMerge = System.currentTimeMillis();
LOG.info("Processing " + basename + ": Merged to new DB containing " + itemsWritten + " records in " + ((endMerge - startMerge) / 1000.0) + " seconds");
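// [Excerpt truncated here; the remainder of closeDown() and the file is not included.]
// [Source-viewer keyboard shortcuts:
//    Copy code:          Ctrl + C
//    Search code:        Ctrl + F
//    Full-screen mode:   F11
//    Toggle theme:       Ctrl + Shift + D
//    Show shortcuts:     ?
//    Increase font size: Ctrl + =
//    Decrease font size: Ctrl + -]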