?? stable.java
字號:
// $Id: STABLE.java,v 1.46 2006/05/17 10:54:38 belaban Exp $package org.jgroups.protocols.pbcast;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.Streamable;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Util;import java.io.*;import java.util.Iterator;import java.util.Map;import java.util.Properties;import java.util.Vector;/** * Computes the broadcast messages that are stable; i.e., have been received by all members. Sends * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that * have been seen by all members.<p> * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group. * A stability vector, which maintains the highest seqno for each member and initially contains no data, * is updated when such a message is received. The entry for a member P is computed set to * min(entry[P], digest[P]). When messages from all members have been received, a stability * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection * in the NAKACK layer).<p> * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous * STABLE messages in the face of no activity.<br/> * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0), * a STABLE task will be started (unless it is already running). * @author Bela Ban */public class STABLE extends Protocol { Address local_addr=null; final Vector mbrs=new Vector(); final Digest digest=new Digest(10); // keeps track of the highest seqnos from all members final Digest latest_local_digest=new Digest(10); // keeps track of the latest digests received from NAKACK final Vector heard_from=new Vector(); // keeps track of who we already heard from (STABLE_GOSSIP msgs) /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */ long desired_avg_gossip=20000; /** delay before we send STABILITY msg (give others a change to send first). This should be set to a very * small number (> 0 !) if <code>max_bytes</code> is used */ long stability_delay=6000; private StabilitySendTask stability_task=null; final Object stability_mutex=new Object(); // to synchronize on stability_task private volatile StableTask stable_task=null; // bcasts periodic STABLE message (added to timer below) final Object stable_task_mutex=new Object(); // to sync on stable_task TimeScheduler timer=null; // to send periodic STABLE msgs (and STABILITY messages) static final String name="STABLE"; /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally * <code>stability_delay</code> should be set to a low number as well */ long max_bytes=0; /** The total number of bytes received from unicast and multicast messages */ long num_bytes_received=0; /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor * handle STABILITY messages */ boolean suspended=false; boolean initialized=false; private ResumeTask resume_task=null; final Object resume_task_mutex=new Object(); /** Number of gossip messages */ int num_gossips=0; private static final long MAX_SUSPEND_TIME=200000; public String getName() { return name; } public long getDesiredAverageGossip() { return desired_avg_gossip; } public void setDesiredAverageGossip(long gossip_interval) { desired_avg_gossip=gossip_interval; } public long getMaxBytes() { return max_bytes; } public void setMaxBytes(long max_bytes) { this.max_bytes=max_bytes; } public int getNumberOfGossipMessages() {return num_gossips;} public void resetStats() { super.resetStats(); num_gossips=0; } public Vector requiredDownServices() { Vector retval=new Vector(); retval.addElement(new Integer(Event.GET_DIGEST_STABLE)); // NAKACK layer return retval; } public boolean setProperties(Properties props) { String str; super.setProperties(props); str=props.getProperty("digest_timeout"); if(str != null) { props.remove("digest_timeout"); log.error("digest_timeout has been deprecated; it will be ignored"); } str=props.getProperty("desired_avg_gossip"); if(str != null) { desired_avg_gossip=Long.parseLong(str); props.remove("desired_avg_gossip"); } str=props.getProperty("stability_delay"); if(str != null) { stability_delay=Long.parseLong(str); props.remove("stability_delay"); } str=props.getProperty("max_gossip_runs"); if(str != null) { props.remove("max_gossip_runs"); log.error("max_gossip_runs has been deprecated and will be ignored"); } str=props.getProperty("max_bytes"); if(str != null) { max_bytes=Long.parseLong(str); props.remove("max_bytes"); } str=props.getProperty("max_suspend_time"); if(str != null) { log.error("max_suspend_time is not supported any longer; please remove it (ignoring it)"); props.remove("max_suspend_time"); } if(props.size() > 0) { log.error("these properties are not recognized: " + props); return false; } return true; } private void suspend(long timeout) { if(!suspended) { suspended=true; if(log.isDebugEnabled()) log.debug("suspending message garbage collection"); } startResumeTask(timeout); // will not start task if already running } private void resume() { resetDigest(mbrs); // start from scratch suspended=false; if(log.isDebugEnabled()) log.debug("resuming message garbage collection"); stopResumeTask(); } public void start() throws Exception { if(stack != null && stack.timer != null) timer=stack.timer; else throw new Exception("timer cannot be retrieved from protocol stack"); if(desired_avg_gossip > 0) startStableTask(); } public void stop() { stopStableTask(); clearDigest(); } public void up(Event evt) { Message msg; StableHeader hdr; int type=evt.getType(); switch(type) { case Event.MSG: msg=(Message)evt.getArg(); // only if message counting is enabled, and only for multicast messages // fixes http://jira.jboss.com/jira/browse/JGRP-233 if(max_bytes > 0) { Address dest=msg.getDest(); if(dest == null || dest.isMulticastAddress()) { num_bytes_received+=(long)Math.max(msg.getLength(), 24); if(num_bytes_received >= max_bytes) { if(trace) { log.trace(new StringBuffer("max_bytes has been reached (").append(max_bytes). append(", bytes received=").append(num_bytes_received).append("): triggers stable msg")); } num_bytes_received=0; // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest) passDown(new Event(Event.GET_DIGEST_STABLE)); } } } hdr=(StableHeader)msg.removeHeader(name); if(hdr == null) break; switch(hdr.type) { case StableHeader.STABLE_GOSSIP: handleStableMessage(msg.getSrc(), hdr.stableDigest); break; case StableHeader.STABILITY: handleStabilityMessage(hdr.stableDigest, msg.getSrc()); break; default: if(log.isErrorEnabled()) log.error("StableHeader type " + hdr.type + " not known"); } return; // don't pass STABLE or STABILITY messages up the stack case Event.GET_DIGEST_STABLE_OK: Digest d=(Digest)evt.getArg(); synchronized(latest_local_digest) { latest_local_digest.replace(d); } if(trace) log.trace("setting latest_local_digest from NAKACK: " + d.printHighSeqnos()); sendStableMessage(d); break; case Event.VIEW_CHANGE: View view=(View)evt.getArg(); handleViewChange(view); break; case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); break; } passUp(evt); } public void down(Event evt) { switch(evt.getType()) { case Event.VIEW_CHANGE: View v=(View)evt.getArg(); handleViewChange(v); break; case Event.SUSPEND_STABLE: long timeout=0; Object t=evt.getArg(); if(t != null && t instanceof Long) timeout=((Long)t).longValue(); suspend(timeout); break; case Event.RESUME_STABLE: resume(); break; } passDown(evt); } public void runMessageGarbageCollection() { Digest copy; synchronized(digest) { copy=digest.copy(); } sendStableMessage(copy); } /* --------------------------------------- Private Methods ---------------------------------------- */ private void handleViewChange(View v) { Vector tmp=v.getMembers(); mbrs.clear(); mbrs.addAll(tmp); adjustSenders(digest, tmp); adjustSenders(latest_local_digest, tmp); resetDigest(tmp); if(!initialized) initialized=true; } /** Digest and members are guaranteed to be non-null */ private static void adjustSenders(Digest d, Vector members) { synchronized(d) { // 1. remove all members from digest who are not in the view Iterator it=d.senders.keySet().iterator(); Address mbr; while(it.hasNext()) { mbr=(Address)it.next(); if(!members.contains(mbr)) it.remove(); } // 2. add members to digest which are in the new view but not in the digest for(int i=0; i < members.size(); i++) { mbr=(Address)members.get(i); if(!d.contains(mbr)) d.add(mbr, -1, -1); } } } private void clearDigest() { synchronized(digest) { digest.clear(); } } /** Update my own digest from a digest received by somebody else. Returns whether the update was successful. * Needs to be called with a lock on digest */ private boolean updateLocalDigest(Digest d, Address sender) { if(d == null || d.size() == 0) return false; if(!initialized) { if(trace) log.trace("STABLE message will not be handled as I'm not yet initialized"); return false; } if(!digest.sameSenders(d)) { if(trace) log.trace(new StringBuffer("received a digest ").append(d.printHighSeqnos()).append(" from "). append(sender).append(" which has different members than mine ("). append(digest.printHighSeqnos()).append("), discarding it and resetting heard_from list")); // to avoid sending incorrect stability/stable msgs, we simply reset our heard_from list, see DESIGN resetDigest(mbrs); return false; } StringBuffer sb=null; if(trace) sb=new StringBuffer("my [").append(local_addr).append("] digest before: ").append(digest). append("\ndigest from ").append(sender).append(": ").append(d); Address mbr; long highest_seqno, my_highest_seqno, new_highest_seqno; long highest_seen_seqno, my_highest_seen_seqno, new_highest_seen_seqno; Map.Entry entry; org.jgroups.protocols.pbcast.Digest.Entry val; for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); mbr=(Address)entry.getKey(); val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue(); highest_seqno=val.high_seqno; highest_seen_seqno=val.high_seqno_seen; // compute the minimum of the highest seqnos deliverable (for garbage collection) my_highest_seqno=digest.highSeqnoAt(mbr); // compute the maximum of the highest seqnos seen (for retransmission of last missing message) my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr); new_highest_seqno=Math.min(my_highest_seqno, highest_seqno); new_highest_seen_seqno=Math.max(my_highest_seen_seqno, highest_seen_seqno); digest.setHighestDeliveredAndSeenSeqnos(mbr, new_highest_seqno, new_highest_seen_seqno); } if(trace) { sb.append("\nmy [").append(local_addr).append("] digest after: ").append(digest).append("\n"); log.trace(sb); } return true; } private void resetDigest(Vector new_members) { if(new_members == null || new_members.size() == 0) return; synchronized(heard_from) { heard_from.clear(); heard_from.addAll(new_members); } Digest copy_of_latest; synchronized(latest_local_digest) { copy_of_latest=latest_local_digest.copy(); } synchronized(digest) { digest.replace(copy_of_latest); if(trace) log.trace("resetting digest from NAKACK: " + copy_of_latest.printHighSeqnos()); } } /** * Removes mbr from heard_from and returns true if this was the last member, otherwise false. * Resets the heard_from list (populates with membership) * @param mbr */ private boolean removeFromHeardFromList(Address mbr) { synchronized(heard_from) { heard_from.remove(mbr); if(heard_from.size() == 0) { resetDigest(this.mbrs); return true; } } return false; } void startStableTask() { // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss // 1 cycle: on the next message or view, we will start the task if(stable_task != null) return; synchronized(stable_task_mutex) { if(stable_task != null && stable_task.running()) { return; // already running } stable_task=new StableTask(); timer.add(stable_task, true); // fixed-rate scheduling } if(trace)
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -