?? dispatcher.java
字號:
/* * Created by IntelliJ IDEA. * User: fsommers * Date: Apr 7, 2002 * Time: 10:26:30 PM * To change template for new class use * Code Style | Class Templates options (Tools | IDE Options). */package primecruncher;import net.jxta.peergroup.PeerGroup;import net.jxta.peergroup.PeerGroupFactory;import net.jxta.discovery.DiscoveryService;import net.jxta.discovery.DiscoveryListener;import net.jxta.discovery.DiscoveryEvent;import net.jxta.pipe.PipeService;import net.jxta.pipe.OutputPipe;import net.jxta.pipe.PipeID;import net.jxta.exception.PeerGroupException;import net.jxta.protocol.DiscoveryResponseMsg;import net.jxta.protocol.ModuleSpecAdvertisement;import net.jxta.protocol.PipeAdvertisement;import net.jxta.protocol.PeerAdvertisement;import net.jxta.document.*;import net.jxta.id.IDFactory;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.codat.Codat;import java.util.*;import java.io.*;import java.net.URL;import java.net.MalformedURLException;import java.net.UnknownServiceException;/** * This class acts as a client-side proxy for the service. 
*/public class Dispatcher implements DiscoveryListener { private static PeerGroup group; private static DiscoveryService discoSvc; private static PipeService pipeSvc; private static Random random; //Keep cached list of peers that offer the service private HashSet peerCache = new HashSet(); private OutputPipe outputPipe; private HashMap jobMap = new HashMap(); public Dispatcher() { startJxta(); random = new Random(); doDiscovery(); } private void doDiscovery() { System.out.println("Starting service discovery..."); /* System.out.println("Searching local cache for " + ServiceConstants.SPEC_NAME + " advertisements"); Enumeration res = null; try { res = discoSvc.getLocalAdvertisements(DiscoveryService.ADV, "Name", ServiceConstants.SPEC_NAME); } catch (IOException e) { System.out.println("IO Exception."); } if (res != null) { while (res.hasMoreElements()) { //this will be a ModuleSpecAdvertisement //adverts.add(res.nextElement()); } } */ System.out.println("Starting remote discovery..."); discoSvc.getRemoteAdvertisements(null, DiscoveryService.ADV, "Name", ServiceConstants.SPEC_NAME, 1, this); } //create request ID. break down list into smaller lists, according to metrics on each //peer in the cache. Create a new dispatcher job for each peer in a new //thread, associate that list with the request ID. //resigter a result listener with each //each dispatcher job will make a call-back to this class, when (a) //it got the results, or (b) if the timeout for that peer has expired //Once all results are received, we call ResultListener and produce the //result document. 
// public void processPrimes(int low, int high, ResultListener listener) { //since pending jobs are hashed against a job id, check if no existing job //has the same id String jobID = null; while (jobID == null || (jobMap.containsKey(jobID))) { jobID = new Long(random.nextLong()).toString(); } System.out.println("Allocated job id: " + jobID); PendingJob pendingJob = new PendingJob(jobID, this, listener); jobMap.put(jobID, pendingJob); //TODO: prune the cache here //create array of all known peers PeerInfoBundle[] peers = (PeerInfoBundle[]) peerCache.toArray(new PeerInfoBundle[peerCache.size()]); //TODO: What to do when there are no peers ready to take the job, //i.e., peers.length == 0 //1. Create an empty PendingJob -- it collects all the DispatcherJobs // when a job is ready, it calls its pendingJob. whenver a such a call //arrives, PendingJob checks if all jobs are complete. If so, it calls //this Dispatcher's jobComplete(). JobComplete() then normalizes the results, //and calls result listener with the answer. A PendingJob is hashed againsta JobID. System.out.println("Peers length is " + peers.length); //we include the highest number int segment = high - low + 1; int mod = segment % peers.length; int perPiece = (segment - mod)/peers.length; int l = low; int h = 0; int count = 1; for (int i=0; i < peers.length; i++) { h = l + perPiece -1; if (i == peers.length - 1) { h += mod; } DispatcherJob job = new DispatcherJob(jobID, count, l, h, peers[i], pendingJob, group); Thread thread = new Thread(job); thread.setName("Dispatcher job thread " + i); pendingJob.addJob(job); thread.start(); l += perPiece; count++; } } /** * All computations for a pending job have completed. We're ready. */ void jobComplete(PendingJob job, Result[] subResults) { //normalize results, we're done, Result finalResult = normalizeResults(subResults); //call result listener with result. 
ResultListener listener = job.getResultListener(); listener.resultEvent(finalResult); jobMap.remove(job.getID()); System.out.println("Removed pending job from job map."); //shell we null out listener? It should destroy all subjobs as well } private Result normalizeResults(Result[] subRes) { //create a string for the result StringBuffer b = new StringBuffer(); for (int i=0; i < subRes.length; i++) { Message mes = subRes[i].getMessage(); MessageElement el = mes.getElement(ServiceConstants.PRIMELIST); b.append(new String(el.getBytesOffset())); } Result finRes = new Result(b.toString()); return finRes; } private void pruneCache() { //do some cache management here - eliminate stale adverts HashSet adCopy = null; synchronized(peerCache) { adCopy = (HashSet)peerCache.clone(); } //do some pruning here long currentTime = System.currentTimeMillis(); Iterator it = adCopy.iterator(); while (it.hasNext()) { ModuleSpecAdvertisement ad = (ModuleSpecAdvertisement)it.next(); if (ad.getLocalExpirationTime() < currentTime + (2 * 60 * 1000)) { adCopy.remove(ad); } } //REMIND: Is this OK? peerCache = adCopy; } private void startJxta() { try { group = PeerGroupFactory.newNetPeerGroup(); discoSvc = group.getDiscoveryService(); pipeSvc = group.getPipeService(); } catch (PeerGroupException e) { System.out.println("Can't create net peer group: " + e.getMessage()); System.exit(-1); } } public void discoveryEvent(DiscoveryEvent event) { System.out.println("DiscoveryEvent called"); DiscoveryResponseMsg mes = event.getResponse(); String padv = mes.getPeerAdv(); PeerAdvertisement peerAdv = null; try { peerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement( new MimeMediaType("text/xml"), new ByteArrayInputStream(padv.getBytes())); } catch (IOException ex) { ex.printStackTrace(); } //get the ModuleSpecAdvs from this ModuleSpecAdvertisement moduleSpecAd = null; Enumeration en = mes.getResponses(); try { //REMIND: can there be many??? 
while (en.hasMoreElements()) { String st = (String)en.nextElement(); moduleSpecAd = (ModuleSpecAdvertisement)AdvertisementFactory.newAdvertisement( new MimeMediaType("text/xml"), new ByteArrayInputStream(st.getBytes())); } } catch (IOException e) { e.printStackTrace(); } if (peerAdv != null && moduleSpecAd != null) { PeerInfoBundle bundle = new PeerInfoBundle(peerAdv, moduleSpecAd); if (!peerCache.contains(bundle)) { peerCache.add(bundle); System.out.println("Discovered peer, added to cache"); } else { System.out.println("Discovered peer, but it's already cached."); } } }}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -