?? fc.java.new
字號:
// $Id: FC.java,v 1.51 2006/01/14 14:00:38 belaban Exp $package org.jgroups.protocols;import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;import EDU.oswego.cs.dl.util.concurrent.Sync;import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;import EDU.oswego.cs.dl.util.concurrent.CondVar;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.BoundedList;import org.jgroups.util.Streamable;import org.jgroups.util.Util;import java.io.*;import java.util.*;/** * Simple flow control protocol based on a credit system. Each sender has a number of credits (bytes * to send). When the credits have been exhausted, the sender blocks. Each receiver also keeps track of * how many credits it has received from a sender. When credits for a sender fall below a threshold, * the receiver sends more credits to the sender. Works for both unicast and multicast messages. * <p> * Note that this protocol must be located towards the top of the stack, or all down_threads from JChannel to this * protocol must be set to false ! This is in order to block JChannel.send()/JChannel.down(). * <br/>This is the second simplified implementation of the same model. The algorithm is sketched out in * doc/FlowControl.txt * @author Bela Ban * @version $Revision: 1.51 $ */public class FC extends Protocol { /** HashMap<Address,Long>: keys are members, values are credits left. For each send, the * number of credits is decremented by the message size */ final Map sent=new HashMap(11); // final Map sent=new ConcurrentHashMap(11); /** HashMap<Address,Long>: keys are members, values are credits left (in bytes). * For each receive, the credits for the sender are decremented by the size of the received message. * When the credits are 0, we refill and send a CREDIT message to the sender. Sender blocks until CREDIT * is received after reaching <tt>min_credits</tt> credits. */ final Map received=new ConcurrentReaderHashMap(11); // final Map received=new ConcurrentHashMap(11); /** List of members from whom we expect credits */ final List creditors=new ArrayList(11); /** Max number of bytes to send per receiver until an ack must * be received before continuing sending */ private long max_credits=50000; private Long max_credits_constant; /** Max time (in milliseconds) to block. If credit hasn't been received after max_block_time, we send * a REPLENISHMENT request to the members from which we expect credits. A value <= 0 means to * wait forever. */ private long max_block_time=5000; /** If credits fall below this limit, we send more credits to the sender. (We also send when * credits are exhausted (0 credits left)) */ private double min_threshold=0.25; /** Computed as <tt>max_credits</tt> times <tt>min_theshold</tt>. If explicitly set, this will * override the above computation */ private long min_credits=0; /** Whether FC is still running, this is set to false when the protocol terminates (on stop()) */ private boolean running=true; /** Determines whether or not to block on down(). Set when not enough credit is available to send a message * to all or a single member */ private boolean insufficient_credit=false; /** the lowest credits of any destination (sent_msgs) */ private long lowest_credit=max_credits; /** Lock to be used with the Condvar below */ final Sync lock=new ReentrantLock(); /** Mutex to block on down() */ final CondVar mutex=new CondVar(lock); static final String name="FC"; private long start_blocking=0, stop_blocking=0; private int num_blockings=0; private int num_credit_requests_received=0, num_credit_requests_sent=0; private int num_credit_responses_sent=0, num_credit_responses_received=0; private long total_time_blocking=0; final BoundedList last_blockings=new BoundedList(50); final static FcHeader REPLENISH_HDR=new FcHeader(FcHeader.REPLENISH); final static FcHeader CREDIT_REQUEST_HDR=new FcHeader(FcHeader.CREDIT_REQUEST); public final String getName() { return name; } public void resetStats() { super.resetStats(); num_blockings=0; num_credit_responses_sent=num_credit_responses_received=num_credit_requests_received=num_credit_requests_sent=0; total_time_blocking=0; last_blockings.removeAll(); } public long getMaxCredits() { return max_credits; } public void setMaxCredits(long max_credits) { this.max_credits=max_credits; max_credits_constant=new Long(this.max_credits); } public double getMinThreshold() { return min_threshold; } public void setMinThreshold(double min_threshold) { this.min_threshold=min_threshold; } public long getMinCredits() { return min_credits; } public void setMinCredits(long min_credits) { this.min_credits=min_credits; } public boolean isBlocked() { return insufficient_credit; } public int getNumberOfBlockings() { return num_blockings; } public long getMaxBlockTime() { return max_block_time; } public void setMaxBlockTime(long t) { max_block_time=t; } public long getTotalTimeBlocked() { return total_time_blocking; } public double getAverageTimeBlocked() { return num_blockings == 0? 0.0 : total_time_blocking / (double)num_blockings; } public int getNumberOfCreditRequestsReceived() { return num_credit_requests_received; } public int getNumberOfCreditRequestsSent() { return num_credit_requests_sent; } public int getNumberOfCreditResponsesReceived() { return num_credit_responses_received; } public int getNumberOfCreditResponsesSent() { return num_credit_responses_sent; } public String printSenderCredits() { return printMap(sent); } public String printReceiverCredits() { return printMap(received); } public String printCredits() { StringBuffer sb=new StringBuffer(); sb.append("senders:\n").append(printMap(sent)).append("\n\nreceivers:\n").append(printMap(received)); return sb.toString(); } public Map dumpStats() { Map retval=super.dumpStats(); if(retval == null) retval=new HashMap(); retval.put("senders", printMap(sent)); retval.put("receivers", printMap(received)); retval.put("num_blockings", new Integer(this.num_blockings)); retval.put("avg_time_blocked", new Double(getAverageTimeBlocked())); retval.put("num_replenishments", new Integer(this.num_credit_responses_received)); retval.put("total_time_blocked", new Long(total_time_blocking)); return retval; } public String showLastBlockingTimes() { return last_blockings.toString(); } /** Allows to unblock a blocked sender from an external program, e.g. JMX */ public void unblock() { if(Util.acquire(lock)) { try { if(trace) log.trace("unblocking the sender and replenishing all members, creditors are " + creditors); Map.Entry entry; for(Iterator it=sent.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); entry.setValue(max_credits_constant); } lowest_credit=computeLowestCredit(sent); creditors.clear(); insufficient_credit=false; mutex.broadcast(); } finally { Util.release(lock); } }// synchronized(mutex) {// if(trace)// log.trace("unblocking the sender and replenishing all members, creditors are " + creditors);//// Map.Entry entry;// for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {// entry=(Map.Entry)it.next();// entry.setValue(max_credits_constant);// }//// lowest_credit=computeLowestCredit(sent);// creditors.clear();// insufficient_credit=false;// mutex.notifyAll();// } } public boolean setProperties(Properties props) { String str; boolean min_credits_set=false; super.setProperties(props); str=props.getProperty("max_credits"); if(str != null) { max_credits=Long.parseLong(str); props.remove("max_credits"); } str=props.getProperty("min_threshold"); if(str != null) { min_threshold=Double.parseDouble(str); props.remove("min_threshold"); } str=props.getProperty("min_credits"); if(str != null) { min_credits=Long.parseLong(str); props.remove("min_credits"); min_credits_set=true; } if(!min_credits_set) min_credits=(long)((double)max_credits * min_threshold); str=props.getProperty("max_block_time"); if(str != null) { max_block_time=Long.parseLong(str); props.remove("max_block_time"); } if(props.size() > 0) { log.error("FC.setProperties(): the following properties are not recognized: " + props); return false; } max_credits_constant=new Long(max_credits); return true; } public void start() throws Exception { super.start();// synchronized(mutex) {// running=true;// insufficient_credit=false;// lowest_credit=max_credits;// } lock.acquire(); try { running=true; insufficient_credit=false; lowest_credit=max_credits; } finally { lock.release(); } } public void stop() { super.stop();// synchronized(mutex) {// running=false;// mutex.notifyAll();// } if(Util.acquire(lock)) { try { running=false; mutex.broadcast(); // notify all threads waiting on the mutex that we are done } finally { Util.release(lock); } } } /** * We need to receive view changes concurrent to messages on the down events: a message might blocks, e.g. * because we don't have enough credits to send to member P. However, if member P crashed, we need to unblock ! * @param evt */ protected void receiveDownEvent(Event evt) { if(evt.getType() == Event.VIEW_CHANGE) { View v=(View)evt.getArg(); Vector mbrs=v.getMembers(); handleViewChange(mbrs); } super.receiveDownEvent(evt); } public void down(Event evt) { switch(evt.getType()) { case Event.MSG: handleDownMessage(evt); return; } passDown(evt); // this could potentially use the lower protocol's thread which may block }
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -