?? blockingmessenger.java
字號:
/* * * $Id: BlockingMessenger.java,v 1.13 2006/04/18 00:51:27 bondolo Exp $ * * Copyright (c) 2004 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.endpoint;import java.util.Timer;import java.util.TimerTask;import java.io.IOException;import java.io.InterruptedIOException;import org.apache.log4j.Level;import org.apache.log4j.Logger;import net.jxta.endpoint.AbstractMessenger;import net.jxta.endpoint.ChannelMessenger;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.Message;import net.jxta.endpoint.Messenger;import net.jxta.endpoint.MessengerState;import net.jxta.endpoint.OutgoingMessageEvent;import net.jxta.peergroup.PeerGroupID;import net.jxta.util.SimpleSelectable;import net.jxta.impl.util.TimeUtils;import net.jxta.impl.util.TimerThreadNamer;/** * This class is a near-drop-in replacement for the previous BlockingMessenger class. * To subclassers (that is, currently, transports) the only difference is that some * overloaded methods have a different name (class hierarchy reasons made it impossible * to preserve the names without forcing an API change for applications). * * The other difference which is not API visible, is that it implements the * standard MessengerState behaviour and semantics required by the changes in the endpoint framework. * * This the only base messenger class meant to be extended by outside code that is in the impl tree. The * reason being that what it replaces was there already and that new code should not become dependant upon it. * **/public abstract class BlockingMessenger extends AbstractMessenger { /** * Log4J Logger **/ private final static transient Logger LOG = Logger.getLogger(BlockingMessenger.class.getName()); /** * The self destruct timer. * * <p/>When a messenger has become idle, it is closed. As a side effect, it * makes the owning canonical messenger, if any, subject to removal if it is * otherwise unreferenced. **/ private final static transient Timer timer = new Timer( "BlockingMessenger self destruct timer", true ); /* * Actions that we defer to after returning from event methods. In other words, they cannot be done with the lock held, or * they require calling more event methods. Because this messenger can take only one message at a time (saturated while * sending), actions do not cascade much. Start can lead to connect if the sending fails, but, because we always fail to * connect, connect will not lead to start. As a result we can get away with performing deferred actions recursively. That * simplifies the code. */ /** * No action deferred. **/ private static final int ACTION_NONE = 0; /** * Must send the current message. **/ private static final int ACTION_SEND = 1; /** * Must report failure to connect. **/ private static final int ACTION_CONNECT = 2; /** * The outstanding message. **/ private Message currentMessage = null; /** * The serviceName override for that message. **/ private String currentService = null; /** * The serviceParam override for that message. */ private String currentParam = null; /** * The exception that caused that message to not be sent. **/ private Throwable currentThrowable = null; /** * true if we have deliberately closed our one message input queue. **/ private boolean inputClosed = false; /** * Need to know which group this transport lives in, so that we can suppress channel redirection when in the same group. This is currently the norm. **/ private final PeerGroupID homeGroupID; /** * The current deferred action. **/ private int deferredAction = ACTION_NONE; /** * Reference to owning object. This is there so that the owning object is not subject to garbage collection * unless this object here becomes itself unreferenced. That happens when the self destruct timer closed it. **/ private Object owner = null; /** * The timer task watching over our self destruction requirement. **/ private final TimerTask selfDestructTask; /** * State lock and engine. **/ private final BlockingMessengerState stateMachine = new BlockingMessengerState(); /** * legacy artefact: transports need to believe the messenger is not yet closed in order to actually close it. * So we lie to them just while we run their closeImpl method so that they do not see that the messenger is * officially closed. **/ private boolean lieToOldTransports = false; /** * Our statemachine implementation; just connects the standard AbstractMessengerState action methods to * this object. **/ private class BlockingMessengerState extends MessengerState { protected BlockingMessengerState() { super(true); } /* * The required action methods. */ /** * {@inheritDoc} */ protected void connectAction() { deferredAction = ACTION_CONNECT; } /** * {@inheritDoc} */ protected void startAction() { deferredAction = ACTION_SEND; } /** * {@inheritDoc} */ protected void closeInputAction() { // we're synchonized here. (invoked from stateMachine). inputClosed = true; } /** * {@inheritDoc} */ protected void closeOutputAction() { // This will break the cnx; thereby causing a down event if we have a send in progress. // If the cnx does not break before the current message is sent, then the message will be sent successfully, // resulting in an idle event. Either of these events is enough to complete the shutdown process. lieToOldTransports = true; closeImpl(); lieToOldTransports = false; // Disconnect from the timer. if (selfDestructTask != null) { selfDestructTask.cancel(); } } // This is a synchronous action. No synchronization needed: we're already synchronized, here. // There's a subtlety here: we do not clear the current message. We let sendMessageB or sendMessageN // deal with it, so that they can handle the status reporting each in their own way. So basically, all we // do is to set a reason for that message to fail in case we are shutdown from the outside and that message // is not sent yet. As long as there is a current message, it is guaranteed that there is a thread // in charge of reporting its status. It is also guaranteed that when failAll is called, the input is // already closed, and so, we have no obligation of making room for future messages immediately. // All this aggravation is so that we do not have to create one context wrapper for each message just so // that we can associate it with its result. Instead we use our single msg and single status model // throughout. protected void failAllAction() { if (currentMessage == null) { return; } if (currentThrowable == null) { currentThrowable = new IOException("Messenger unexpectedly closed"); } } } /** * The implementation of channel messenger that getChannelMessenger returns: * All it does is address rewritting. Even close() is forwarded to the shared messenger. * The reason is that BlockingMessengers are not really shared; they're transitional * entities used directly by CanonicalMessenger. GetChannel is used only to provide address * rewritting when we pass a blocking messenger directly to incoming messenger listeners...this * practice is to be removed in the future, in favor of making incoming messengers full-featured * async messengers that can be shared. **/ private final class BlockingMessengerChannel extends ChannelMessenger { public BlockingMessengerChannel( EndpointAddress baseAddress, PeerGroupID redirection, String origService, String origServiceParam ) {
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -