?? stable.java
字號:
// $Id: STABLE.java,v 1.11 2005/08/11 12:43:47 belaban Exp $

package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.stack.RpcProtocol;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;

import java.util.Properties;
import java.util.Vector;

/**
 * Computes the broadcast messages that are stable; i.e., that have been received
 * by all members. Sends STABLE events up the stack when this is the case.
 * Uses a probabilistic scheme to do so, as described in:<br>
 * GSGC: An Efficient Gossip-Style Garbage Collection Scheme for Scalable
 * Reliable Multicast, K. Guo et al., 1997.
 * <p>
 * The only difference is that instead of using counters for an estimation of
 * messages received from each member, we retrieve this actual information
 * from the NAKACK layer (which must be present for the STABLE protocol to
 * work).
 * <p>
 * Note: the <tt>Event.MSG</tt> call path must be as lightweight as
 * possible. It should not request any lock for which there is a high
 * contention and/or long delay.
 * <p>
 * <pre>
 * Changes(igeorg - 2.VI.2001):
 * i. Thread-safety (in RPC calls most notably on the lines of Gianluca
 * Collot's bugfix)
 * ii. All slow calls (RPCs, seqnos requests, etc.) placed outside locks
 * iii. Removed redundant initialization in adaptation to a higher round
 * iv. heard_from[this member] is always set to true on every new round
 * (i.e. on every stability bcast).
 * v. Replaced gossip thread with <tt>TimeScheduler.Task</tt>
 * </pre>
 * <p>
 * [[[ TODO(igeorg - 2.VI.2001)
 * i. Faster stability convergence by better selection of gossip subsets
 * (replace Util.pickSubset()).
 * ii. Special mutex on the <tt>Event.MSG</tt> call path. I.e. replace
 * <tt>synchronized(this)</tt> with e.g. <tt>synchronized(msg_mutex)</tt>.
* ]] TODO */
public class STABLE extends RpcProtocol {

    /** Name of this protocol, as returned by getName() */
    private static final String PROT_NAME="STABLE";

    /** Default subgroup size for gossiping, expressed as percentage over the group's size */
    private static final double SUBSET_SIZE=0.1;

    /** Default max number of msgs to wait for before sending gossip */
    private static final int GOSSIP_MSG_INTERVAL=100;

    /** Default max time to wait before sending gossip (ms) */
    private static final int GOSSIP_INTERVAL=10000;

    // reference fields below rely on the implicit null default
    private Address local_addr;
    private ViewId vid;
    private final Vector mbrs=new Vector(11);

    /** Current gossip round */
    private long round=1;

    /** Highest seqno received for each member (corresponds to membership) */
    private long[] seqnos=new long[0];

    /** Members from which we have received a gossip in the current round */
    private boolean[] heard_from=new boolean[0];

    /** Percentage of members to which gossip is sent (parameterizable by user) */
    private double subset=SUBSET_SIZE;

    /** The gossiping task scheduler */
    private TimeScheduler sched;
    private Task gossip_task;

    /** wait for n messages until sending gossip ... */
    private int max_msgs=GOSSIP_MSG_INTERVAL;

    /** ...
or until max_wait_time has elapsed, whichever comes first */ private long max_wait_time=GOSSIP_INTERVAL; /** Current number of msgs left to be received before sending a gossip */ private long num_msgs=max_msgs; /** mutex for interacting with NAKACK layer (GET_MSGS_RECVD) */ private final Object highest_seqnos_mutex=new Object(); /** Time to wait for a reply from NAKACK layer (GET_MSGS_RECVD) */ private long highest_seqnos_timeout=4000; /** * @return this protocol name */ public String getName() { return (PROT_NAME); } /** * The events expected to be handled from some layer above: * <ul> * <li> * GET_MSGS_RECEIVED: NAKACK layer * </li> * </ul> * @return a list of events expected by to be handled from some layer * above */ public Vector requiredUpServices() { Vector retval=new Vector(1); retval.addElement(new Integer(Event.GET_MSGS_RECEIVED)); return retval; } /** * Set the parameters for this layer. * * <ul> * <li> * <i>subset</i>: the percentage of the group'size to which the * msgs_seen_so_far gossip is sent periodically.</li> * <li> * <i>max_msgs</i>: the max number of msgs to wait for between two * consecutive gossipings.</li> * <li> * <i>max_wait_time</i>: the max time to wait for between two consecutive * gossipings.</li> * <li> * <i>highest_seqno_timeout</i>: time to wait to receive from NAKACK * the array of highest deliverable seqnos * </li> * </ul> * * @param props the list of parameters */ public boolean setProperties(Properties props) { String str; super.setProperties(props); str=props.getProperty("subset"); if(str != null) { subset=Float.parseFloat(str); props.remove("subset"); } str=props.getProperty("max_msgs"); if(str != null) { num_msgs=max_msgs=Integer.parseInt(str); if(max_msgs <= 1) { if(log.isFatalEnabled()) log.fatal("value for 'max_msgs' must be greater than 1 !"); return false; } props.remove("max_msgs"); } str=props.getProperty("max_wait_time"); if(str != null) { max_wait_time=Long.parseLong(str); props.remove("max_wait_time"); } 
str=props.getProperty("highest_seqnos_timeout"); if(str != null) { highest_seqnos_timeout=Long.parseLong(str); props.remove("highest_seqnos_timeout"); } if(props.size() > 0) { log.error("STABLE.setProperties(): these properties are not recognized: " + props); return false; } return true; } /** * Start the layer: * i. Set the gossip task scheduler * ii. Reset the layer's state. * iii. Start the gossiping task */ public void start() throws Exception { TimeScheduler timer; super.start(); timer=stack != null ? stack.timer : null; if(timer == null) throw new Exception("STABLE.start(): timer is null"); sched=timer; // we use only asynchronous method invocations... if(_corr != null) _corr.setDeadlockDetection(false); initialize(); startGossip(); } /** * Stop scheduling the gossip task */ public void stop() { super.stop(); synchronized(this) { if(gossip_task != null) gossip_task.cancel(); gossip_task=null; } } /* ------------------------- Request handler methods ------------------ */ /** * Contains the highest sequence numbers as seen by <code>sender</code> * * @param view_id The view ID in which the gossip was sent. Must be the * same as ours, otherwise it is discarded * * @param gossip_round The round in which the gossip was sent * * @param gossip_seqnos A vector with the highest sequence numbers as * seen by <code>sender</code> * * @param heard The sender's <code>heard_from</code> array. This allows * us to minimize the gossip msgs for a given round as a member does not * have to receive gossip msgs from each member, but members pass gossips * they've received from others on in their own gossips. E.g. when a * member P (of group {P,Q,R}) receives a gossip from R, its own gossip * to Q might be {R,P}. Q, who hasn't received a gossip from R, will not * need to receive it anymore as it is already sent by P. This simple * scheme reduces the number of gossip messages needed. 
* * @param sender The sender of the gossip message (obviously :-)) */ public void gossip(ViewId view_id, long gossip_round, long[] gossip_seqnos, boolean[] heard, Object sender) { Object[] params; MethodCall call; synchronized(this) { if(log.isInfoEnabled()) log.info("sender=" + sender + ", round=" + gossip_round + ", seqnos=" + Util.array2String(gossip_seqnos) + ", heard=" + Util.array2String(heard)); if(vid == null || view_id == null || !vid.equals(view_id)) { if(log.isInfoEnabled()) log.info("view ID s are different (" + vid + " != " + view_id + "). Discarding gossip received"); return; } if(gossip_round < this.round) { if(log.isInfoEnabled()) log.info("received a gossip from a previous round (" + gossip_round + "); my round is " + round + ". Discarding gossip"); return; } if(gossip_seqnos == null || seqnos == null || seqnos.length != gossip_seqnos.length) { if(warn) log.warn("size of seqnos and gossip_seqnos are not equal ! " + "Discarding gossip"); return; } // (1) If round greater than local round: // i. Adjust the local to the received round // // (2) // i. local_seqnos = arrayMin(local_seqnos, gossip_seqnos) // ii. local_heard = arrayMax(local_heard, gossip_heard) // iii. 
If heard from all, bcast our seqnos (stability vector) if(round == gossip_round) { update(sender, gossip_seqnos, heard); } else if(round < gossip_round) { if(log.isInfoEnabled()) log.info("received a gossip from a higher round (" + gossip_round + "); adopting my round (" + round + ") to " + gossip_round); round=gossip_round; set(sender, gossip_seqnos, heard_from); } if(log.isInfoEnabled()) log.info("heard_from=" + Util.array2String(heard_from)); if(!heardFromAll()) return; params=new Object[]{ vid.clone(), new Long(gossip_round), seqnos.clone(), local_addr}; } // synchronized(this) call=new MethodCall("stability", params, new String[] {ViewId.class.getName(), long.class.getName(), long[].class.getName(), Object.class.getName()}); callRemoteMethods(null, call, GroupRequest.GET_NONE, 0); } /** * Contains the highest message sequence numbers (for each member) that * can safely be deleted (because they have been seen by all members). */ public void stability(ViewId view_id, long gossip_round, long[] stability_vector, Object sender) { // i. Proceed to the next round; init the heard from list // ii. Send up the stability vector // iii. get a fresh copy of the highest deliverable seqnos synchronized(this) { if(log.isInfoEnabled()) log.info("sender=" + sender + ", round=" + gossip_round + ", vector=" + Util.array2String(stability_vector) + ')'); if(vid == null || view_id == null || !vid.equals(view_id)) { if(log.isInfoEnabled()) log.info("view ID s are different (" + vid + " != " + view_id + "). Discarding gossip received"); return; } if(round > gossip_round) return; round=gossip_round + 1; for(int i=0; i < heard_from.length; i++) heard_from[i]=false; } heard_from[mbrs.indexOf(local_addr)]=true; passUp(new Event(Event.STABLE, stability_vector)); getHighestSeqnos(); } /* --------------------- End of Request handler methods --------------- */ /** * <b>Callback</b>. Called by superclass when event may be handled. 
* <p>
     * <b>Do not use <code>PassUp</code> in this method as the event is passed
     * up by default by the superclass after this method returns !</b>
     *
     * @param evt the event travelling up the stack
     * @return boolean Defaults to true. If false, event will not be passed
     * up the stack.
     */
    public boolean handleUpEvent(Event evt) {
        final int type=evt.getType();

        if(type == Event.MSG) {
            // for messages, upMsg() decides whether the event goes further up
            return upMsg(evt);
        }
        if(type == Event.SET_LOCAL_ADDRESS) {
            // remember our own address
            local_addr=(Address)evt.getArg();
        }
        return true;
    }


    /**
     * <b>Callback</b>. Called by superclass when event may be handled.
     * <p>
     * <b>Do not use <code>PassDown</code> in this method as the event is
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -