?? blockingmessenger.java
字號:
super(baseAddress, redirection, origService, origServiceParam); // We tell our super class that we synchronize on the stateMachine object. Althoug it is not obvious, our getState() // method calls the shared messenger getState() method, which synchronizes on the shared messenger's state machine // object. So, that's what we must specify. Logic would dictate that we pass it to super(), but it is not itself // constructed until super() returns. No way around it. setStateLock(stateMachine); } /** * {@inheritDoc} */ public int getState() { return BlockingMessenger.this.getState(); } /** * {@inheritDoc} */ public void resolve() { BlockingMessenger.this.resolve(); } /** * {@inheritDoc} */ public void close() { BlockingMessenger.this.close(); } /** * {@inheritDoc} * * <p/>Address rewritting done here. */ public boolean sendMessageN( Message msg, String service, String serviceParam ) { return BlockingMessenger.this.sendMessageN( msg, effectiveService(service), effectiveParam(service, serviceParam) ); } /** * {@inheritDoc} * * <p/>Address rewritting done here. */ public void sendMessageB( Message msg, String service, String serviceParam ) throws IOException { BlockingMessenger.this.sendMessageB( msg, effectiveService(service), effectiveParam(service, serviceParam )); } /** * {@inheritDoc} * * <p/>We're supposed to return the complete destination, including * service and param specific to that channel. It is not clear, whether * this should include the cross-group mangling, though. For now, let's * say it does not. */ public EndpointAddress getLogicalDestinationAddress() { EndpointAddress rawLogical = getLogicalDestinationImpl(); if ( rawLogical == null) { return null; } return new EndpointAddress (rawLogical, origService, origServiceParam); } // Check if it is worth staying registered public void itemChanged(Object changedObject) { if (! notifyChange()) { if (haveListeners()) { return; } BlockingMessenger.this.unregisterListener(this); if (! haveListeners()) { return; } // Ooops collision. We should not have unregistered. Next time, then. In case of collision, the end result // is always to stay registered. There's no harm in staying registered. BlockingMessenger.this.registerListener(this); } } /** * {@inheritDoc} * * <p/>Always make sure we're registered with the shared messenger. **/ protected void registerListener( SimpleSelectable l ) { BlockingMessenger.this.registerListener(this); super.registerListener(l); } } private void storeCurrent(Message msg, String service, String param) { currentMessage = msg; currentService = service; currentParam = param; currentThrowable = null; } /** Constructor. * * <p/>We start in the CONNECTED state, we pretend to have a queue of size 1, and we can never re-connect. Although this * messenger fully respects the asynchronous semantics, it is saturated as soon as one msg is being send, and if not * saturated, send is actually performed by the invoker thread. So return is not completely immediate. This is a barely * acceptable implementation, but this is also a transition adapter that is bound to disappear one release from now. The main * goal is to get things going until transports are adapted. * * @param homeGroupID the group that this messenger works for. This is the group of the endpoint service or transport * that created this messenger. * @param dest where messages should be addressed to * @param selfDestruct true if this messenger must self close destruct when idle. <b>Warning:</b> If selfDestruct is used, * this messenger will remained referenced for as long as isIdleImpl returns false. **/ public BlockingMessenger( PeerGroupID homeGroupID, EndpointAddress dest, boolean selfDestruct ) { super(dest); this.homeGroupID = homeGroupID; // We tell our superclass that we synchronize our state on the stateMachine object. Logic would dictate that we pass it // to super(), but it is not itself constructed until super() returns. No way around it. setStateLock(stateMachine); /* * Sets up a timer task that will close this messenger if it says to have become idle. It will keep it referenced * until then.<p/> * * As long as this timer task is scheduled, this messenger is not subject to GC. Therefore, its owner, if any, which is strongly * referenced, is not subject to GC either. This avoids prematurely closing open connections just because a destination is * not currently in use, which we would have to do if CanonicalMessengers could be GCed independantly (and which would * force us to use finalizers, too).<p/> * * Such a mechanism is usefull only if this blocking messenger is expensive to make or holds system resources that require * an explicit invocation of the close method. Else, it is better to let it be GC'ed along with any refering canonical * messenger when memory is tight.<p/> * */ // // FIXME - jice@jxta.org 20040413: we trust transports to implement isIdle reasonably, which may be a leap of faith. We // should probably superimpose a time limit of our own. // if (selfDestruct) { selfDestructTask = new TimerTask() { public void run() { try { try { if (isIdleImpl()) { close(); } } finally { cancel(); } } catch (Throwable uncaught ) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable in selfDescructTask. Perpetrator: ", uncaught); } } } }; timer.schedule(selfDestructTask, TimeUtils.AMINUTE, TimeUtils.AMINUTE); } else { selfDestructTask = null; } } /** * Sets an owner for this blocking messenger. Owners are normally canonical messengers. The goal of registering the owner is * to keep that owner reachable as long as this blocking messenger is. Canonical messengers are otherwise softly referenced, * and so, may be deleted whenever memory is tight. * * We do not want to use finalizers or the equivalent reference queue mechanism; so we have no idea when a blocking messenger * is no-longer referenced by any canonical. In addition it may be expensive to make and so we want to keep it for a while * anyway. As a result, instead of keeping a blocking messenger open as long as there is a canonical, we do the opposite: we * keep the canonical (owner, here) as long as the blocking messenger is open (and usually beyond, memory allowing). How long * a blocking messenger will stay around depends upon that messenger's implementation. That may even be left up to the GC, in * the end (if close is not needed AND the messenger is cheap to make). In that case, the owner is likely the only referer, * and so both will have the same lifetime. * * @param owner The object that should be kept referenced at least as long as this one. **/ public void setOwner(Object owner) { this.owner = owner; } /** * A trivial convenience method that transports still depend upon. * The reason it exists is that it used to be non-trivial, when * the group redirection would sometimes be done at this point (the * transports could be asked to send to the non-mangled service and * param, when the application used the implicit defaults). This is * no-longer true: the transport (the blocking messenger) is always * invoked with fully defaulted and mangled service name and param. So * all we have to do is to paste them all together. Eventually blocking * messenger could simply be invoked with an already computed * full destination. **/ protected EndpointAddress getDestAddressToUse( String service, String serviceParam ) { EndpointAddress result = getDestinationAddress(); return new EndpointAddress( result, service, serviceParam ); } /** * A transport may call this to cause an orderly closure of its messengers. **/ protected final void shutdown() { int action; synchronized(stateMachine) { stateMachine.shutdownEvent(); action = eventCalled(); } // We called an event. State may have changed. notifyChange(); performDeferredAction(action); } /** * * {@inheritDoc} * * We overload isClosed because many messengers still use super.isClosed() * as part of their own implementation or don't override it at all. They * expect it to be true only when all is shutdown; not while we're closing * gently. * * FIXME - jice@jxta.org 20040413: transports should get a deeper retrofit eventually. **/ public boolean isClosed() { return ((!lieToOldTransports) && (getState() & TERMINAL) != 0); } /** * {@inheritDoc} * * <p/> getLogicalDestinationAddress() requires resolution (it's the address advertised by the other end). * For a blocking messenger it's easy. We're born resolved. So, just ask the implementor what it is. **/ public final EndpointAddress getLogicalDestinationAddress() { return getLogicalDestinationImpl(); } /** * {@inheritDoc} * * <p/> Some transports historically overload the close method of BlockingMessenger. * The final is there to make sure we know about it. However, there should be no * harm done if the unchanged code is renamed to closeImpl; even if it calls super.close(). * The real problem, however, is transports calling close (was their own, but now it means this one), when * they want to break. It will make things look like someone just called close, but it will not * actually break anything. However, that will cause the state machine to go through the close process. * this will end up calling closeImpl(). That will do. */ public final void close() { int action; synchronized(stateMachine) { stateMachine.closeEvent(); action = eventCalled(); } // We called an event. State may have changed. notifyChange(); performDeferredAction(action); } /** * {@inheritDoc} **/
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -