?? fc.java.txt
字號:
StringBuffer sb=null; boolean unblock=false; if(trace) { Long old_credit=(Long)sent.get(sender); sb=new StringBuffer(); sb.append("received credit from ").append(sender).append(", old credit was "). append(old_credit).append(", new credits are ").append(max_credits). append(".\nCreditors before are: ").append(creditors); } synchronized(sent) { sent.put(sender, new Long(max_credits)); if(creditors.size() > 0) { // we are blocked because we expect credit from one or more members removeCreditor(sender); if(trace) { sb.append("\nCreditors after removal of ").append(sender).append(" are: ").append(creditors); log.trace(sb.toString()); } if(creditors.size() == 0) { unblock=true; } } else { // no creditors, but still blocking: we need to unblock if(Boolean.TRUE.equals(blocking.get())) unblock=true; } } if(unblock) // moved this outside of the 'sent' synchronized block unblockSender(); } /** * Check whether sender has enough credits left. If not, send him some more * @param msg */ private void adjustCredit(Message msg) { Address src=msg.getSrc(); long size=Math.max(24, msg.getLength()); if(src == null) { if(log.isErrorEnabled()) log.error("src is null"); return; } if(decrementCredit(received, src, size, min_credits) == false) { received.put(src, new Long(max_credits)); if(trace) log.trace("sending replenishment message to " + src); sendCredit(src); } } private void sendCredit(Address dest) { Message msg=new Message(dest, null, null); msg.putHeader(name, REPLENISH_HDR); passDown(new Event(Event.MSG, msg)); } private void sendCreditRequest(final Address dest) { Message msg=new Message(dest, null, null); msg.putHeader(name, CREDIT_REQUEST_HDR); passDown(new Event(Event.MSG, msg)); } /** * Checks whether enough credits are available to send message. If not, blocks until enough credits * are available * @param evt Guaranteed to be a Message * @return */ private void waitUntilEnoughCreditsAvailable() { while(true) { try { blocking.waitUntilWithTimeout(Boolean.FALSE, max_block_time); // waits on 'sent' break; } catch(TimeoutException e) { List tmp=new ArrayList(creditors); if(trace) log.trace("timeout occurred waiting for credits; sending credit request to " + tmp + ", creditors are " + creditors); Address mbr; for(Iterator it=tmp.iterator(); it.hasNext();) { mbr=(Address)it.next(); sendCreditRequest(mbr); } } } } /** * Try to decrement the credits needed for this message and return true if successful, or false otherwise. * For unicast destinations, the credits required are subtracted from the unicast destination member, for * multicast messages the credits are subtracted from all current members in the group. * @param msg * @return false: will block, true: will not block */ private boolean decrMessage(Message msg) { Address dest; long size; boolean success=true; // ****************************************************************************************************** // this method is called by waitUntilEnoughCredits() which syncs on 'sent', so we don't need to sync here // ****************************************************************************************************** if(msg == null) { if(log.isErrorEnabled()) log.error("msg is null"); return true; // don't block ! } dest=msg.getDest(); size=Math.max(24, msg.getLength()); if(dest != null && !dest.isMulticastAddress()) { // unicast destination if(decrementCredit(sent, dest, size, 0)) { return true; } else { addCreditor(dest); return false; } } else { // multicast destination for(Iterator it=members.iterator(); it.hasNext();) { dest=(Address)it.next(); if(decrementCredit(sent, dest, size, 0) == false) { addCreditor(dest); success=false; } } } return success; } /** If message queueing is enabled, sends queued messages and unlocks sender (if successful) */ private void unblockSender() { if(start_blocking > 0) { stop_blocking=System.currentTimeMillis(); long diff=stop_blocking - start_blocking; total_time_blocking+=diff; last_blockings.add(new Long(diff)); stop_blocking=start_blocking=0; if(trace) log.trace("setting blocking=false, blocking time was " + diff + "ms"); } if(trace) log.trace("setting blocking=false"); blocking.set(Boolean.FALSE); } private void addCreditor(Address mbr) { if(mbr != null && !creditors.contains(mbr)) creditors.add(mbr); } private void removeCreditor(Address mbr) { creditors.remove(mbr); } /** * Find the credits associated with <tt>dest</tt> and decrement its credits by credits_required. If the remaining * value is less than or equal to 0, return false, else return true. Note that we will always subtract the credits. * @param map * @param dest * @param credits_required Number of bytes required * @param minimal_credits For the receiver: add minimal credits to check whether credits need to be sent * @return Whether the required credits could successfully be subtracted from the credits left */ private boolean decrementCredit(Map map, Address dest, long credits_required, long minimal_credits) { long credits_left, new_credits_left; Long tmp=(Long)map.get(dest); boolean success; if(tmp == null) return true; credits_left=tmp.longValue(); success=credits_left > (credits_required + minimal_credits); new_credits_left=Math.max(0, credits_left - credits_required); map.put(dest, new Long(new_credits_left)); if(success) { return true; } else { if(trace) { StringBuffer sb=new StringBuffer(); sb.append("not enough credits left for ").append(dest).append(": left=").append(new_credits_left); sb.append(", required+min_credits=").append((credits_required +min_credits)).append(", required="); sb.append(credits_required).append(", min_credits=").append(min_credits); log.trace(sb.toString()); } return false; } } void handleViewChange(Vector mbrs) { Address addr; if(mbrs == null) return; if(trace) log.trace("new membership: " + mbrs); members.clear(); members.addAll(mbrs); synchronized(received) { // add members not in membership to received hashmap (with full credits) for(int i=0; i < mbrs.size(); i++) { addr=(Address) mbrs.elementAt(i); if(!received.containsKey(addr)) received.put(addr, new Long(max_credits)); } // remove members that left for(Iterator it=received.keySet().iterator(); it.hasNext();) { addr=(Address) it.next(); if(!mbrs.contains(addr)) it.remove(); } } boolean unblock=false; synchronized(sent) { // add members not in membership to sent hashmap (with full credits) for(int i=0; i < mbrs.size(); i++) { addr=(Address) mbrs.elementAt(i); if(!sent.containsKey(addr)) sent.put(addr, new Long(max_credits)); } // remove members that left for(Iterator it=sent.keySet().iterator(); it.hasNext();) { addr=(Address)it.next(); if(!mbrs.contains(addr)) it.remove(); // modified the underlying map } // remove all creditors which are not in the new view for(int i=0; i < creditors.size(); i++) { Address creditor=(Address)creditors.elementAt(i); if(!mbrs.contains(creditor)) creditors.remove(creditor); } if(trace) log.trace("creditors are " + creditors); if(creditors.size() == 0) unblock=true; } if(unblock) unblockSender(); } private static String printMap(Map m) { Map.Entry entry; StringBuffer sb=new StringBuffer(); for(Iterator it=m.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); } return sb.toString(); } public static class FcHeader extends Header implements Streamable { public static final byte REPLENISH = 1; public static final byte CREDIT_REQUEST = 2; // the sender of the message is the requester byte type = REPLENISH; public FcHeader() { } public FcHeader(byte type) { this.type=type; } public long size() { return Global.BYTE_SIZE; } public void writeExternal(ObjectOutput out) throws IOException { out.writeByte(type); } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { type=in.readByte(); } public void writeTo(DataOutputStream out) throws IOException { out.writeByte(type); } public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { type=in.readByte(); } public String toString() { switch(type) { case REPLENISH: return "REPLENISH"; case CREDIT_REQUEST: return "CREDIT_REQUEST"; default: return "<invalid type>"; } } }}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -