?? fc.java.txt
字號:
// $Id: FC.java.txt,v 1.1 2005/08/16 12:58:58 belaban Exp $package org.jgroups.protocols;import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.BoundedList;import org.jgroups.util.CondVar;import org.jgroups.util.Streamable;import java.io.*;import java.util.*;/** * Simple flow control protocol based on a credit system. Each sender has a number of credits (bytes * to send). When the credits have been exhausted, the sender blocks. Each receiver also keeps track of * how many credits it has received from a sender. When credits for a sender fall below a threshold, * the receiver sends more credits to the sender. Works for both unicast and multicast messages. * <p> * Note that this protocol must be located towards the top of the stack, or all down_threads from JChannel to this * protocol must be set to false ! This is in order to block JChannel.send()/JChannel.down(). * @author Bela Ban * @version $Revision: 1.1 $ */public class FC extends Protocol { /** My own address */ Address local_addr=null; /** HashMap<Address,Long>: keys are members, values are credits left. For each send, the * number of credits is decremented by the message size */ final Map sent=new HashMap(11); // final Map sent=new ConcurrentHashMap(11); /** HashMap<Address,Long>: keys are members, values are credits left (in bytes). * For each receive, the credits for the sender are decremented by the size of the received message. * When the credits are 0, we refill and send a CREDIT message to the sender. Sender blocks until CREDIT * is received after reaching <tt>min_credits</tt> credits. */ final Map received=new ConcurrentReaderHashMap(11); // final Map received=new ConcurrentHashMap(11); /** We cache the membership */ final Vector members=new Vector(11); /** List of members from whom we expect credits */ final Vector creditors=new Vector(11); /** Max number of bytes to send per receiver until an ack must * be received before continuing sending */ private long max_credits=50000; /** Max time (in milliseconds) to block. If credit hasn't been received after max_block_time, we send * a REPLENISHMENT request to the members from which we expect credits. A value <= 0 means to * wait forever. */ private long max_block_time=5000; /** If credits fall below this limit, we send more credits to the sender. (We also send when * credits are exhausted (0 credits left)) */ double min_threshold=0.25; /** Computed as <tt>max_credits</tt> times <tt>min_theshold</tt>. If explicitly set, this will * override the above computation */ private long min_credits=0; /** Current blocking. True if blocking, else false */ private final CondVar blocking=new CondVar("blocking", Boolean.FALSE); static final String name="FC"; private long start_blocking=0, stop_blocking=0; private int num_blockings=0, num_replenishments=0, num_credit_requests=0; private long total_time_blocking=0; final BoundedList last_blockings=new BoundedList(50); final static FcHeader REPLENISH_HDR=new FcHeader(FcHeader.REPLENISH); final static FcHeader CREDIT_REQUEST_HDR=new FcHeader(FcHeader.CREDIT_REQUEST); public String getName() { return name; } public void resetStats() { super.resetStats(); num_blockings=num_replenishments=num_credit_requests=0; total_time_blocking=0; last_blockings.removeAll(); } public long getMaxCredits() { return max_credits; } public void setMaxCredits(long max_credits) { this.max_credits=max_credits; } public double getMinThreshold() { return min_threshold; } public void setMinThreshold(double min_threshold) { this.min_threshold=min_threshold; } public long getMinCredits() { return min_credits; } public void setMinCredits(long min_credits) { this.min_credits=min_credits; } public boolean isBlocked() { Object obj=blocking.get(); return obj != null && obj instanceof Boolean && ((Boolean)obj).booleanValue(); } public int getNumberOfBlockings() { return num_blockings; } public long getTotalTimeBlocked() { return total_time_blocking; } public double getAverageTimeBlocked() { return num_blockings == 0? num_blockings : total_time_blocking / num_blockings; } public int getNumberOfReplenishmentsReceived() { return num_replenishments; } public int getNumberOfCreditRequests() { return num_credit_requests; } public String printSenderCredits() { return printMap(sent); } public String printReceiverCredits() { return printMap(received); } public String printCredits() { StringBuffer sb=new StringBuffer(); sb.append("senders:\n").append(printMap(sent)).append("\n\nreceivers:\n").append(printMap(received)); return sb.toString(); } public Map dumpStats() { Map retval=super.dumpStats(); if(retval == null) retval=new HashMap(); retval.put("senders", printMap(sent)); retval.put("receivers", printMap(received)); retval.put("num_blockings", new Integer(this.num_blockings)); retval.put("avg_time_blocked", new Double(getAverageTimeBlocked())); retval.put("num_replenishments", new Integer(this.num_replenishments)); return retval; } public String showLastBlockingTimes() { return last_blockings.toString(); } public void unblock() { unblockSender(); } public boolean setProperties(Properties props) { String str; boolean min_credits_set=false; super.setProperties(props); str=props.getProperty("max_credits"); if(str != null) { max_credits=Long.parseLong(str); props.remove("max_credits"); } str=props.getProperty("min_threshold"); if(str != null) { min_threshold=Double.parseDouble(str); props.remove("min_threshold"); } str=props.getProperty("min_credits"); if(str != null) { min_credits=Long.parseLong(str); props.remove("min_credits"); min_credits_set=true; } if(!min_credits_set) min_credits=(long)((double)max_credits * min_threshold); str=props.getProperty("max_block_time"); if(str != null) { max_block_time=Long.parseLong(str); props.remove("max_block_time"); } if(props.size() > 0) { log.error("FC.setProperties(): the following properties are not recognized: " + props); return false; } return true; } public void stop() { super.stop(); unblock(); } /** * We need to receive view changes concurrent to messages on the down events: a message might blocks, e.g. * because we don't have enough credits to send to member P. However, if member P crashed, we need to unblock ! * @param evt */ protected void receiveDownEvent(Event evt) { if(evt.getType() == Event.VIEW_CHANGE) { View v=(View)evt.getArg(); Vector mbrs=v.getMembers(); handleViewChange(mbrs); } super.receiveDownEvent(evt); } public void down(Event evt) { switch(evt.getType()) { case Event.MSG: handleDownMessage(evt); return; } passDown(evt); // this could potentially use the lower protocol's thread which may block } private synchronized void handleDownMessage(Event evt) { if(Boolean.TRUE.equals(blocking.get())) { // blocked waitUntilEnoughCreditsAvailable(); } else { // not blocked boolean rc; synchronized(sent) { // 'sent' is the same lock as blocking.getLock()... rc=decrMessage((Message)evt.getArg()); if(rc == false) { if(trace) log.trace("blocking due to insufficient credits"); blocking.set(Boolean.TRUE); start_blocking=System.currentTimeMillis(); num_blockings++; } } if(rc == false) { waitUntilEnoughCreditsAvailable(); } } passDown(evt); } public void up(Event evt) { switch(evt.getType()) { case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); break; case Event.VIEW_CHANGE: handleViewChange(((View)evt.getArg()).getMembers()); break; case Event.MSG: Message msg=(Message)evt.getArg(); FcHeader hdr=(FcHeader)msg.removeHeader(name); if(hdr != null) { switch(hdr.type) { case FcHeader.REPLENISH: num_replenishments++; handleCredit(msg.getSrc()); break; case FcHeader.CREDIT_REQUEST: num_credit_requests++; Address sender=msg.getSrc(); if(trace) log.trace("received credit request from " + sender + ": sending credits"); received.put(sender, new Long(max_credits)); sendCredit(sender); break; default: log.error("header type " + hdr.type + " not known"); break; } return; // don't pass message up } else { adjustCredit(msg); } break; } passUp(evt); } private void handleCredit(Address sender) { if(sender == null) return;
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -