?? stable.java
字號:
log.trace("stable task started"); } void stopStableTask() { // contrary to startStableTask(), we don't need double-checked locking here because this method is not // called frequently synchronized(stable_task_mutex) { if(stable_task != null) { stable_task.stop(); stable_task=null; } } } void startResumeTask(long max_suspend_time) { max_suspend_time=(long)(max_suspend_time * 1.1); // little slack if(max_suspend_time <= 0) max_suspend_time=MAX_SUSPEND_TIME; synchronized(resume_task_mutex) { if(resume_task != null && resume_task.running()) { return; // already running } else { resume_task=new ResumeTask(max_suspend_time); timer.add(resume_task, true); // fixed-rate scheduling } } if(log.isDebugEnabled()) log.debug("resume task started, max_suspend_time=" + max_suspend_time); } void stopResumeTask() { synchronized(resume_task_mutex) { if(resume_task != null) { resume_task.stop(); resume_task=null; } } } void startStabilityTask(Digest d, long delay) { synchronized(stability_mutex) { if(stability_task != null && stability_task.running()) { } else { stability_task=new StabilitySendTask(d, delay); // runs only once timer.add(stability_task, true); } } } void stopStabilityTask() { synchronized(stability_mutex) { if(stability_task != null) { stability_task.stop(); stability_task=null; } } } /** Digest d contains (a) the highest seqnos <em>deliverable</em> for each sender and (b) the highest seqnos <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability message, which results in garbage collection of messages lower than the ones in the stability vector. The maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN for details). */ private void handleStableMessage(Address sender, Digest d) { if(d == null || sender == null) { if(log.isErrorEnabled()) log.error("digest or sender is null"); return; } if(!initialized) { if(trace) log.trace("STABLE message will not be handled as I'm not yet initialized"); return; } if(suspended) { if(trace) log.trace("STABLE message will not be handled as I'm suspended"); return; } if(trace) log.trace(new StringBuffer("received stable msg from ").append(sender).append(": ").append(d.printHighSeqnos())); if(!heard_from.contains(sender)) { // already received gossip from sender; discard it if(trace) log.trace("already received stable msg from " + sender); return; } Digest copy; synchronized(digest) { boolean success=updateLocalDigest(d, sender); if(!success) // we can only remove the sender from heard_from if *all* elements of my digest were updated return; copy=digest.copy(); } boolean was_last=removeFromHeardFromList(sender); if(was_last) { sendStabilityMessage(copy); } } /** * Bcasts a STABLE message of the current digest to all members. Message contains highest seqnos of all members * seen by this member. Highest seqnos are retrieved from the NAKACK layer below. * @param d A <em>copy</em> of this.digest */ private void sendStableMessage(Digest d) { if(suspended) { if(trace) log.trace("will not send STABLE message as I'm suspended"); return; } if(d != null && d.size() > 0) { if(trace) log.trace("sending stable msg " + d.printHighSeqnos()); Message msg=new Message(); // mcast message StableHeader hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d); msg.putHeader(name, hdr); num_gossips++; passDown(new Event(Event.MSG, msg)); } } /** Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs). The reason for waiting a random amount of time is that, in the worst case, all members receive a STABLE_GOSSIP message from the last outstanding member at the same time and would therefore mcast the STABILITY message at the same time too. To avoid this, each member waits random N msecs. If, before N elapses, some other member sent the STABILITY message, we just cancel our own message. If, during waiting for N msecs to send STABILITY message S1, another STABILITY message S2 is to be sent, we just discard S2. @param tmp A copy of te stability digest, so we don't need to copy it again */ void sendStabilityMessage(Digest tmp) { long delay; if(suspended) { if(trace) log.trace("STABILITY message will not be sent as I'm suspended"); return; } // give other members a chance to mcast STABILITY message. if we receive STABILITY by the end of // our random sleep, we will not send the STABILITY msg. this prevents that all mbrs mcast a // STABILITY msg at the same time delay=Util.random(stability_delay); startStabilityTask(tmp, delay); } void handleStabilityMessage(Digest d, Address sender) { if(d == null) { if(log.isErrorEnabled()) log.error("stability digest is null"); return; } if(!initialized) { if(trace) log.trace("STABLE message will not be handled as I'm not yet initialized"); return; } if(suspended) { if(log.isDebugEnabled()) { log.debug("stability message will not be handled as I'm suspended"); } return; } if(trace) log.trace(new StringBuffer("received stability msg from ").append(sender).append(": ").append(d.printHighSeqnos())); stopStabilityTask(); // we won't handle the gossip d, if d's members don't match the membership in my own digest, // this is part of the fix for the NAKACK problem (bugs #943480 and #938584) if(!this.digest.sameSenders(d)) { if(log.isDebugEnabled()) { log.debug("received digest (digest=" + d + ") which does not match my own digest ("+ this.digest + "): ignoring digest and re-initializing own digest"); } return; } resetDigest(mbrs); // pass STABLE event down the stack, so NAKACK can garbage collect old messages passDown(new Event(Event.STABLE, d)); } /* ------------------------------------End of Private Methods ------------------------------------- */ public static class StableHeader extends Header implements Streamable { public static final int STABLE_GOSSIP=1; public static final int STABILITY=2; int type=0; // Digest digest=new Digest(); // used for both STABLE_GOSSIP and STABILITY message Digest stableDigest=null; // changed by Bela April 4 2004 public StableHeader() { } // used for externalizable public StableHeader(int type, Digest digest) { this.type=type; this.stableDigest=digest; } static String type2String(int t) { switch(t) { case STABLE_GOSSIP: return "STABLE_GOSSIP"; case STABILITY: return "STABILITY"; default: return "<unknown>"; } } public String toString() { StringBuffer sb=new StringBuffer(); sb.append('['); sb.append(type2String(type)); sb.append("]: digest is "); sb.append(stableDigest); return sb.toString(); } public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(type); if(stableDigest == null) { out.writeBoolean(false); return; } out.writeBoolean(true); stableDigest.writeExternal(out); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { type=in.readInt(); boolean digest_not_null=in.readBoolean(); if(digest_not_null) { stableDigest=new Digest(); stableDigest.readExternal(in); } } public long size() { long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest if(stableDigest != null) retval+=stableDigest.serializedSize(); return retval; } public void writeTo(DataOutputStream out) throws IOException { out.writeInt(type); Util.writeStreamable(stableDigest, out); } public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { type=in.readInt(); stableDigest=(Digest)Util.readStreamable(Digest.class, in); } } /** Mcast periodic STABLE message. Interval between sends varies. Terminates after num_gossip_runs is 0. However, UP or DOWN messages will reset num_gossip_runs to max_gossip_runs. This has the effect that the stable_send task terminates only after a period of time within which no messages were either sent or received */ private class StableTask implements TimeScheduler.Task { boolean stopped=false; public void stop() { stopped=true; } public boolean running() { // syntactic sugar return !stopped; } public boolean cancelled() { return stopped; } public long nextInterval() { long interval=computeSleepTime(); if(interval <= 0) return 10000; else return interval; } public void run() { if(suspended) { if(trace) log.trace("stable task will not run as suspended=" + suspended); return; } // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest) passDown(new Event(Event.GET_DIGEST_STABLE)); } long computeSleepTime() { return getRandom((mbrs.size() * desired_avg_gossip * 2)); } long getRandom(long range) { return (long)((Math.random() * range) % range); } } /** * Multicasts a STABILITY message. */ private class StabilitySendTask implements TimeScheduler.Task { Digest d=null; boolean stopped=false; long delay=2000; StabilitySendTask(Digest d, long delay) { this.d=d; this.delay=delay; } public boolean running() { return !stopped; } public void stop() { stopped=true; } public boolean cancelled() { return stopped; } /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */ public long nextInterval() { return delay; } public void run() { Message msg; StableHeader hdr; if(suspended) { if(log.isDebugEnabled()) { log.debug("STABILITY message will not be sent as suspended=" + suspended); } stopped=true; return; } if(d != null && !stopped) { msg=new Message(); hdr=new StableHeader(StableHeader.STABILITY, d); msg.putHeader(STABLE.name, hdr); if(trace) log.trace("sending stability msg " + d.printHighSeqnos()); passDown(new Event(Event.MSG, msg)); d=null; } stopped=true; // run only once } } private class ResumeTask implements TimeScheduler.Task { boolean running=true; long max_suspend_time=0; ResumeTask(long max_suspend_time) { this.max_suspend_time=max_suspend_time; } void stop() { running=false; } public boolean running() { return running; } public boolean cancelled() { return running == false; } public long nextInterval() { return max_suspend_time; } public void run() { if(suspended) log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " + "check why this event was not received (or increase max_suspend_time for large state transfers)"); resume(); } }}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -