?? dispatcherjob.java
字號:
/* * Created by IntelliJ IDEA. * User: fsommers * Date: Apr 10, 2002 * Time: 10:00:18 PM * To change template for new class use * Code Style | Class Templates options (Tools | IDE Options). */package primecruncher;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.pipe.*;import net.jxta.peergroup.PeerGroup;import net.jxta.protocol.PipeAdvertisement;import net.jxta.protocol.ModuleSpecAdvertisement;import net.jxta.document.*;import net.jxta.id.IDFactory;import java.util.ArrayList;import java.util.HashMap;import java.util.Enumeration;import java.io.PrintWriter;import java.io.IOException;import java.io.InputStream;/** * An instance of this object is created for each client request. This object * does the following: * <ol> * <li>Extract the high, low numbers of the list of primes to be produced. DONE</li> * <li>Divide the list into subtasks, one subtask corresponding to each peer * specified in the peers array of the constructor. How the work is divided * is handled by an instance of <code>JobPartitioner</code>, which returns * an array of <code>StructuredTextDocument</code> s corresponding to a piece of the list * to be computed by each peer.</li> * <li>Create an input pipe for the results. Add that pipe's advertisement to the * <code>StructuredTextDocument</code>s produced by <code>JobPartitioner</code>.</li> * <li>For each peer, create a Message that contains the * <code>StructuredTextDocument</code>. Assign a message ID to each message.</li> * <li>Send that message to the peer, and mark when the message was sent. * <li>When a response arrives via the pipe, mark the message corresponding to that result * as being answered.</li> * <li>When all messages are answered, sort the sublists to a master list. Create a * result document, and call the <code>ResultListener</code> with that message.</li> * </ul> * REMIND: What to do if not all the requests are anwered before the timeout? */class DispatcherJob implements Runnable, PipeMsgListener, OutputPipeListener, Comparable { private int low = 0; private int high = 0; private String jobID; private PeerInfoBundle peerInfoBundle; private PendingJob pendingJob; private int count = 0; private PeerGroup group; private static PipeService pService = null; private InputPipe inPipe = null; private PipeAdvertisement inPipeAdv = null; /** * Start a new job for a compute request. * * @param request this document contains the HIGH, LOW, and JobID * @param peers the working set of peers * @param listener what to notify when we have all the results in * @param our current peer group */ public DispatcherJob(String jobID, int count, int low, int high, PeerInfoBundle peerInfoBundle, PendingJob job, PeerGroup group) { this.high = high; this.low = low; this.jobID = jobID; this.count = count; this.group = group; this.peerInfoBundle = peerInfoBundle; this.pendingJob = job; pService = group.getPipeService(); } /** * Create a pipe advert for an input pipe so that the other side can get back to us * Open an outpipe into each peer, and send that Message * The peer would then send a message back with the response via the input pipe * When we get an event from the input pipe, we process it, then call call * pending job's gotResult */ public void run() { System.out.println("Starting dispatcher job execution with low=" + low + " and high=" + high); //create input pipe that the other peer will use to send back a result //add ourselves as a listener on that pipe inPipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement( PipeAdvertisement.getAdvertisementType()); final PipeService pService = group.getPipeService(); PipeID pid = IDFactory.newPipeID(group.getPeerGroupID()); inPipeAdv.setPipeID(pid); try { inPipe = pService.createInputPipe(inPipeAdv, this); //open a pipe to the other peer, create a request message, //add this peer's pipe adv to that message, and send that message ModuleSpecAdvertisement adv = peerInfoBundle.getModuleSpecAdvertisement(); PipeAdvertisement otherPipe = adv.getPipeAdvertisement(); pService.createOutputPipe(otherPipe, this); } catch (IOException e) { System.out.println("An IO Exception occured:" + e.getMessage()); e.printStackTrace(); } //pendingJob.gotResult(this, result); } public void outputPipeEvent(OutputPipeEvent event) { System.out.println("Connected to other peer's pipe"); try { Message message = pService.createMessage(); //add the high, low, and the input pipe advert to the message StructuredTextDocument doc = (StructuredTextDocument) inPipeAdv.getDocument(new MimeMediaType("text/xml")); InputStream is = doc.getStream(); MessageElement advElement = message.newMessageElement( ServiceConstants.PIPEADV, new MimeMediaType("text/xml"), is); MessageElement highNum = message.newMessageElement(ServiceConstants.HIGH_INT, new MimeMediaType("text/plain"), new Integer(high).toString().getBytes()); MessageElement lowNum = message.newMessageElement(ServiceConstants.LOW_INT, new MimeMediaType("text/plain"), new Integer(low).toString().getBytes()); MessageElement idEl = message.newMessageElement(ServiceConstants.JOBID, new MimeMediaType("text/plain"), jobID.getBytes()); message.addElement(advElement); message.addElement(highNum); message.addElement(lowNum); message.addElement(idEl); event.getOutputPipe().send(message); System.out.println("Sent the message"); } catch (IOException e) { e.printStackTrace(); } } public int compareTo(Object o) { if (o instanceof DispatcherJob) { DispatcherJob other = (DispatcherJob)o; if (count < other.count) return -1; else if (count > other.count) return 1; } return 0; } public void pipeMsgEvent(PipeMsgEvent event) { //here's where we listen for results coming from other peers System.out.println("Received a message!"); Message mes = event.getMessage(); pendingJob.gotResult(this, new Result(mes)); }}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -