?? distributedwebdbwriter.java
字號(hào):
/* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.db;
import java.io.*;
import java.util.*;
import java.util.logging.*;
import java.nio.channels.*;
import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.util.*;
import net.nutch.pagedb.*;
import net.nutch.linkdb.*;
/***************************************************
* This is a wrapper class that allows us to reorder
* write operations to the linkdb and pagedb. It is
* useful only for objects like UpdateDatabaseTool,
* which just does writes.
*
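* NOTE(review): the description below names WebDBWriter and says it
* does nothing in a distributed fashion, yet the class declared here
* is DistributedWebDBWriter. It looks carried over from WebDBWriter;
* confirm and update.
*
* The WebDBWriter is a traditional single-pass database writer.
* It does not cache any instructions to disk (but it does
* in memory, with possible resorting). It certainly does
* nothing in a distributed fashion.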
*
* There are other implementors of IWebDBWriter that do
* all that fancy stuff.
*
* @author Mike Cafarella
*************************************************/
public class DistributedWebDBWriter implements IWebDBWriter {
// Class-wide logger.
// NOTE(review): it is registered under "net.nutch.db.WebDBWriter" rather
// than this class's own name -- confirm whether that is intentional.
static final Logger LOG = LogFormatter.getLogger("net.nutch.db.WebDBWriter");
// Version bytes, all currently 0.
// NOTE(review): presumably format versions for the db and its counter /
// machine-info records; their use is not visible here -- confirm.
static final byte CUR_VERSION = 0;
static final byte OPEN_COUNTER_VERSION = 0;
static final byte CLOSE_COUNTER_VERSION = 0;
static final byte MACHINE_INFO_VERSION = 0;
// Magic numbers.
// NOTE(review): these three fields and LONG_TIMEOUT below are static but
// not final, so they can be reassigned at runtime -- confirm nothing
// relies on that before treating them as true constants.
static int READY_TO_USE = 0xbabecafe;
static int IS_COMPLETE = 0xbabe0000;
static int WRITE_LOCK_INFO = 0xcafe0000;
// Evaluates to 10000. NOTE(review): units are presumably milliseconds -- confirm.
static long LONG_TIMEOUT = 10 * 1000;
// Page opcodes (declared as byte). PageInstruction orders instructions
// for the same page by ascending opcode, so these numeric values also
// decide sort order.
static final byte ADD_PAGE = 0;
static final byte ADD_PAGE_WITH_SCORE = 1;
static final byte ADD_PAGE_IFN_PRESENT = 2;
static final byte DEL_PAGE = 3;
// Link opcodes (declared as int, unlike the byte-sized page opcodes above).
static final int ADD_LINK = 0;
static final int DEL_LINK = 1;
static final int DEL_SINGLE_LINK = 2;
// File and group names.
static final String PAGES_BY_URL = "pagesByURL";
static final String PAGES_BY_MD5 = "pagesByMD5";
static final String LINKS_BY_URL = "linksByURL";
static final String LINKS_BY_MD5 = "linksByMD5";
static final String STATS_FILE = "stats";
static final String META_SHAREGROUP = "metashare";
static final String METAINFO = "metainfo";
// Result codes for page-url comparisons
static final int NO_OUTLINKS = 0;
static final int HAS_OUTLINKS = 1;
static final int LINK_INVALID = 2;
/********************************************
 * PageInstruction pairs an opcode with the Page it applies
 * to and, optionally, a Link.
 *
 * Serialized layout (see write()): one opcode byte, the Page,
 * one flag byte (1 when a Link follows, 0 otherwise), and
 * then the Link when the flag is 1.
 *********************************************/
public static class PageInstruction implements WritableComparable {
    byte opcode;
    boolean hasLink;
    Page page;
    Link link;

    /**
     * Creates an empty instruction; fields are filled in later by
     * one of the set() methods or by readFields().
     */
    public PageInstruction() {}

    /**
     * Creates an instruction over a Page with no Link.
     * Delegates to set(Page, int).
     */
    public PageInstruction(Page page, int opcode) {
        set(page, opcode);
    }

    /**
     * Creates an instruction over a Page with an attached Link.
     * Delegates to set(Page, Link, int).
     */
    public PageInstruction(Page page, Link link, int opcode) {
        set(page, link, opcode);
    }

    /**
     * Copies the state of another PageInstruction into this one.
     * The Page (and the Link, when present) are copied through
     * Page.set()/Link.set() into objects held by this instruction,
     * allocating them first if this instruction has none.
     */
    public void set(PageInstruction that) {
        opcode = that.opcode;

        if (page == null) {
            page = new Page();
        }
        page.set(that.page);

        if (link == null) {
            link = new Link();
        }

        hasLink = that.hasLink;
        if (hasLink) {
            link.set(that.link);
        }
    }

    /**
     * Re-initializes this instruction with a Page and no Link.
     * The Page reference is stored as given (not copied), and the
     * opcode is narrowed to a byte.
     */
    public void set(Page page, int opcode) {
        this.link = null;
        this.hasLink = false;
        this.page = page;
        this.opcode = (byte) opcode;
    }

    /**
     * Re-initializes this instruction with a Page and a Link.
     * Both references are stored as given (not copied), and the
     * opcode is narrowed to a byte.
     */
    public void set(Page page, Link link, int opcode) {
        this.link = link;
        this.hasLink = true;
        this.page = page;
        this.opcode = (byte) opcode;
    }

    //
    // WritableComparable
    //

    /**
     * Orders by Page first; instructions whose Pages compare equal
     * are ordered by ascending opcode. The Link is not consulted.
     */
    public int compareTo(Object o) {
        PageInstruction other = (PageInstruction) o;
        int byPage = page.compareTo(other.page);
        return (byPage != 0) ? byPage : (opcode - other.opcode);
    }

    /**
     * Writes the opcode byte, the Page, the link flag byte and,
     * when a Link is present, the Link.
     */
    public void write(DataOutput out) throws IOException {
        out.writeByte(opcode);
        page.write(out);
        if (hasLink) {
            out.writeByte(1);
            link.write(out);
        } else {
            out.writeByte(0);
        }
    }

    /**
     * Reads the fields in the order write() emits them, reusing
     * the existing Page and Link objects when there are any. A
     * flag byte other than 1 is treated as "no Link".
     */
    public void readFields(DataInput in) throws IOException {
        opcode = in.readByte();

        if (page == null) {
            page = new Page();
        }
        page.readFields(in);

        if (link == null) {
            link = new Link();
        }

        byte linkFlag = in.readByte();
        hasLink = (linkFlag == 1);
        if (hasLink) {
            link.readFields(in);
        }
    }

    /** Returns the Page this instruction operates on. */
    public Page getPage() {
        return page;
    }

    /**
     * Returns the attached Link, or null when this instruction is
     * flagged as having none (even if a Link object is allocated).
     */
    public Link getLink() {
        return hasLink ? link : null;
    }

    /** Returns the opcode, widened to an int. */
    public int getInstruction() {
        return opcode;
    }

    /**
     * Sorts the instruction first by Page, then by opcode,
     * working directly on the serialized bytes.
     */
    public static class PageComparator extends WritableComparator {
        private static final Page.Comparator PAGE_COMPARATOR =
            new Page.Comparator();

        public PageComparator() {
            super(PageInstruction.class);
        }

        /**
         * Optimized comparator. The first byte of each record is
         * the opcode; the remainder is handed to the Page comparator.
         */
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            final int firstOp = b1[s1];
            final int secondOp = b2[s2];
            final int pageOrder =
                PAGE_COMPARATOR.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
            return (pageOrder == 0) ? (firstOp - secondOp) : pageOrder;
        }
    }

    /*****************************************************
     * Sorts the instruction first by url, then by opcode.
     *****************************************************/
    public static class UrlComparator extends WritableComparator {
        private static final Page.UrlComparator PAGE_COMPARATOR =
            new Page.UrlComparator();

        public UrlComparator() {
            super(PageInstruction.class);
        }

        /**
         * Object-level comparison: by the Pages' URLs first, and
         * by ascending opcode when the URLs compare equal.
         */
        public int compare(WritableComparable a, WritableComparable b) {
            PageInstruction left = (PageInstruction) a;
            PageInstruction right = (PageInstruction) b;
            Page leftPage = left.getPage();
            Page rightPage = right.getPage();
            int byUrl = leftPage.getURL().compareTo(rightPage.getURL());
            return (byUrl != 0) ? byUrl : (left.opcode - right.opcode);
        }

        /**
         * Optimized comparator. The first byte of each record is
         * the opcode; the remainder is handed to the Page URL
         * comparator.
         */
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            final int firstOp = b1[s1];
            final int secondOp = b2[s2];
            final int urlOrder =
                PAGE_COMPARATOR.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
            return (urlOrder == 0) ? (firstOp - secondOp) : urlOrder;
        }
    }
}
/********************************************************
 * PageInstructionWriter appends PageInstructions to an
 * EditSectionGroupWriter through one reusable
 * PageInstruction object, rather than building a fresh
 * one per call as "writer.append(new PageInstruction())"
 * would.
 ********************************************************/
public static class PageInstructionWriter {
    PageInstruction pi = new PageInstruction();

    /**
     * Creates a writer holding its own reusable PageInstruction.
     */
    public PageInstructionWriter() {
    }

    /**
     * Loads the reusable PageInstruction with the given Page and
     * opcode (no Link) and appends it, together with val, to the
     * indicated writer. The PageInstruction is kept for later reuse.
     */
    public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Page page, int opcode, Writable val) throws IOException {
        PageInstruction reusable = this.pi;
        reusable.set(page, opcode);
        writer.append(reusable, val);
    }

    /**
     * Loads the reusable PageInstruction with the given Page, Link
     * and opcode and appends it, together with val, to the indicated
     * writer. The PageInstruction is kept for later reuse.
     */
    public synchronized void appendInstructionInfo(EditSectionGroupWriter writer, Page page, Link link, int opcode, Writable val) throws IOException {
        PageInstruction reusable = this.pi;
        reusable.set(page, link, opcode);
        writer.append(reusable, val);
    }
}
/*************************************************************
* Reduce multiple instructions for a given url to the single effective
* instruction. ADD is prioritized highest, then ADD_IFN_PRESENT, and then
* DEL. Not coincidentally, this is opposite the order they're sorted in.
**************************************************************/
private static class DeduplicatingPageSequenceReader {
SequenceFile.Reader edits;
PageInstruction current = new PageInstruction();
UTF8 currentUrl = new UTF8();
boolean haveCurrent;
/**
*/
public DeduplicatingPageSequenceReader(SequenceFile.Reader edits) throws IOException {
this.edits = edits;
this.haveCurrent = edits.next(current, NullWritable.get());
}
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號(hào)
Ctrl + =
減小字號(hào)
Ctrl + -