?? stable.java
* passed down by default by the superclass after this method returns !</b> * * @return boolean Defaults to true. If false, event will not be passed * down the stack. */ public boolean handleDownEvent(Event evt) { switch(evt.getType()) { case Event.VIEW_CHANGE: if(!downViewChange(evt)) return (false); break; // does anyone else below needs this msg except STABLE? case Event.GET_MSGS_RECEIVED_OK: if(!downGetMsgsReceived(evt)) return (false); break; } return (true); } /** * The gossip task that runs periodically */ private void gossipRun() { num_msgs=max_msgs; sendGossip(); } /** * <pre> * Reset the state of msg garbage-collection: * i. Reset the table of highest seqnos seen by each member * ii. Reset the tbl of mbrs from which highest seqnos have been recorded * </pre> */ private void initialize() { synchronized(this) { seqnos=new long[mbrs.size()]; for(int i=0; i < seqnos.length; i++) seqnos[i]=-1; heard_from=new boolean[mbrs.size()]; for(int i=0; i < heard_from.length; i++) heard_from[i]=false; } } /** * (1)<br> * Merge this member's table of highest seqnos seen by a each member * with the one received from a gossip by another member. The result is * the element-wise minimum of the input arrays. For each entry:<br> * * <tt>seqno[mbr_i] = min(seqno[mbr_i], gossip_seqno[mbr_i])</tt> * <p> * * (2)<br> * Merge the <tt>heard from</tt> tables of this member and the sender of * the gossip. 
The resulting table is:<br> * * <tt>heard_from[mbr_i] = heard_from[mbr_i] | sender_heard[mbr_i]</tt> * * @param sender the sender of the gossip * @param gossip_seqnos the highest deliverable seqnos of the sender * @param gossip_heard_from the table of members sender has heard from * */ private void update(Object sender, long[] gossip_seqnos, boolean[] gossip_heard_from) { int index; synchronized(this) { index=mbrs.indexOf(sender); if(index < 0) { if(warn) log.warn("sender " + sender + " not found in mbrs !"); return; } for(int i=0; i < gossip_seqnos.length; i++) seqnos[i]=Math.min(seqnos[i], gossip_seqnos[i]); heard_from[index]=true; for(int i=0; i < heard_from.length; i++) heard_from[i]=heard_from[i] | gossip_heard_from[i]; } } /** * Set the seqnos and heard_from arrays to those of the sender. The * method is called when the sender seems to know more than this member. * The situation occurs if either: * <ul> * <li> * sender.heard_from > this.heard_from, i.e. the sender has heard * from more members than we have</li> * <li> * sender.round > this.round, i.e. 
the sender is in a more recent round * than we are</li> * </ul> * * In both cases, this member is assigned the state of the sender */ private void set(Object sender, long[] gossip_seqnos, boolean[] gossip_heard_from) { int index; synchronized(this) { index=mbrs.indexOf(sender); if(index < 0) { if(warn) log.warn("sender " + sender + " not found in mbrs !"); return; } seqnos=gossip_seqnos; heard_from=gossip_heard_from; } } /** * @return true, if we have received the highest deliverable seqnos * directly or indirectly from all members */ private boolean heardFromAll() { synchronized(this) { if(heard_from == null) return false; for(int i=0; i < heard_from.length; i++) if(!heard_from[i]) return false; } return true; } /** * Send our <code>seqnos</code> array to a subset of the membership */ private void sendGossip() { Vector gossip_subset; Object[] params; MethodCall call; synchronized(this) { gossip_subset=Util.pickSubset(mbrs, subset); if(gossip_subset == null || gossip_subset.size() < 1) { if(warn) log.warn("picked empty subset !"); return; } if(log.isInfoEnabled()) log.info("subset=" + gossip_subset + ", round=" + round + ", seqnos=" + Util.array2String(seqnos)); params=new Object[]{ vid.clone(), new Long(round), seqnos.clone(), heard_from.clone(), local_addr}; } call=new MethodCall("gossip", params, new String[] {ViewId.class.getName(), long.class.getName(), long[].class.getName(), boolean[].class.getName(), Object.class.getName()}); for(int i=0; i < gossip_subset.size(); i++) { try { callRemoteMethod((Address)gossip_subset.get(i), call, GroupRequest.GET_NONE, 0); } catch(Exception e) { if(log.isDebugEnabled()) log.debug("exception=" + e); } } } /** * Sends GET_MSGS_RECEIVED to NAKACK layer (above us !) and stores result * in <code>seqnos</code>. In case <code>seqnos</code> does not yet exist * it creates and initializes it. 
*/ private void getHighestSeqnos() { synchronized(highest_seqnos_mutex) { passUp(new Event(Event.GET_MSGS_RECEIVED)); try { highest_seqnos_mutex.wait(highest_seqnos_timeout); } catch(InterruptedException e) { if(log.isErrorEnabled()) log.error("Interrupted while waiting for highest seqnos from NAKACK"); } } } /** * Start scheduling the gossip task */ private void startGossip() { synchronized(this) { if(gossip_task != null) gossip_task.cancel(); gossip_task=new Task(new Times(new long[]{GOSSIP_INTERVAL})); sched.add(gossip_task); } } /** * Received a <tt>MSG</tt> event from a layer below * * A msg received: * If unicast ignore; if multicast and time for gossiping has been * reached, send out a gossip to a subset of the mbrs * * @return true if the event should be forwarded to the layer above */ private boolean upMsg(Event e) { Message msg=(Message)e.getArg(); if(msg.getDest() != null && (!msg.getDest().isMulticastAddress())) return (true); synchronized(this) { --num_msgs; if(num_msgs > 0) return (true); num_msgs=max_msgs; gossip_task.cancel(); gossip_task=new Task(new Times(new long[]{0, GOSSIP_INTERVAL})); sched.add(gossip_task); } return (true); } /** * Received a <tt>VIEW_CHANGE</tt> event from a layer above * * A new view: * i. Set the new mbrs list and the new view ID. * ii. Reset the highest deliverable seqnos seen * * @return true if the event should be forwarded to the layer below */ private boolean downViewChange(Event e) { View v=(View)e.getArg(); Vector new_mbrs=v.getMembers(); /* // Could this ever happen? GMS is always sending non-null value if(new_mbrs == null) { / Trace.println( "STABLE.handleDownEvent()", Trace.ERROR, "Received VIEW_CHANGE event with null mbrs list"); break; } */ synchronized(this) { vid=v.getVid(); mbrs.clear(); mbrs.addAll(new_mbrs); initialize(); } return (true); } /** * Received a <tt>GET_MSGS__RECEIVED_OK</tt> event from a layer above * * Updated list of highest deliverable seqnos: * i. 
Update the local copy of highest deliverable seqnos
     *
     * The new values are copied into <code>seqnos</code> only when the
     * event carries a non-null array of the same length; otherwise the
     * local copy is left untouched. In every case, threads waiting on
     * <code>highest_seqnos_mutex</code> are woken up afterwards.
     *
     * @return true if the event should be forwarded to the layer below
     * (this method always returns true)
     */
    private boolean downGetMsgsReceived(Event e) {
        long[] new_seqnos=(long[])e.getArg();

        try {
            synchronized(this) {
                if(new_seqnos == null)
                    return (true);
                if(new_seqnos.length != seqnos.length) {
                    if(log.isInfoEnabled())
                        log.info("GET_MSGS_RECEIVED: array of highest " +
                                 "seqnos seen so far (received from NAKACK layer) " +
                                 "has a different length (" + new_seqnos.length +
                                 ") from 'seqnos' array (" + seqnos.length + ')');
                    return (true);
                }
                System.arraycopy(new_seqnos, 0, seqnos, 0, seqnos.length);
            }
        }
        finally {
            // Runs on the early returns above as well, so a thread blocked in
            // getHighestSeqnos() is always released.
            synchronized(highest_seqnos_mutex) {
                highest_seqnos_mutex.notifyAll();
            }
        }

        return (true);
    }


    /**
     * Select next interval from list. Once the end of the list is reached,
     * keep returning the last value. It would be sensible that list of
     * times is in increasing order
     */
    private static class Times {
        private int next=0;
        private long[] times;

        /**
         * @param times the intervals to hand out, in order; must not be empty
         * @throws IllegalArgumentException if <code>times</code> is empty
         */
        Times(long[] times) {
            if(times.length == 0)
                throw new IllegalArgumentException("times");
            this.times=times;
        }

        /** @return the next interval, or the last one once the list is exhausted */
        public synchronized long next() {
            if(next >= times.length)
                return (times[times.length - 1]);
            else
                return (times[next++]);
        }

        /** @return the backing array of intervals (not a copy) */
        public long[] times() {
            return (times);
        }

        /** Restart handing out intervals from the first entry. */
        public synchronized void reset() {
            next=0;
        }
    }


    /**
     * The gossiping task
     */
    private class Task implements TimeScheduler.Task {
        private final Times intervals;
        private boolean cancelled=false;

        Task(Times intervals) {
            this.intervals=intervals;
        }

        /**
         * @return the next interval taken from the <code>Times</code> this
         * task was created with
         */
        public long nextInterval() {
            // The body was empty (which does not compile for a long-returning
            // method) and left 'intervals' unused; delegate to it.
            return (intervals.next());
        }

        public boolean cancelled() {
            return (cancelled);
        }

        public void cancel() {
            cancelled=true;
        }

        public void run() {
            gossipRun();
        }
    }
}
?? 快捷鍵說(shuō)明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號(hào)
Ctrl + =
減小字號(hào)
Ctrl + -