?? socketproxy.java
字號:
/***************************************************************************
* *
* SocketProxy.java *
* ------------------- *
* date : 12.08.2004 *
* copyright : (C) 2004-2008 Distributed and *
* Mobile Systems Group *
* Lehrstuhl fuer Praktische Informatik *
* Universitaet Bamberg *
* http://www.uni-bamberg.de/pi/ *
* email : sven.kaffille@uni-bamberg.de *
* karsten.loesing@uni-bamberg.de *
* *
* *
***************************************************************************/
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
* A copy of the license can be found in the license.txt file supplied *
* with this software or at: http://www.gnu.org/copyleft/gpl.html *
* *
***************************************************************************/
package de.uniba.wiai.lspi.chord.com.socket;
import static de.uniba.wiai.lspi.util.logging.Logger.LogLevel.DEBUG;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import de.uniba.wiai.lspi.chord.com.CommunicationException;
import de.uniba.wiai.lspi.chord.com.Endpoint;
import de.uniba.wiai.lspi.chord.com.Entry;
import de.uniba.wiai.lspi.chord.com.Node;
import de.uniba.wiai.lspi.chord.com.Proxy;
import de.uniba.wiai.lspi.chord.com.RefsAndEntries;
import de.uniba.wiai.lspi.chord.data.ID;
import de.uniba.wiai.lspi.chord.data.URL;
import de.uniba.wiai.lspi.util.logging.Logger;
/**
* This is the implementation of {@link Proxy} for the socket protocol. This
* connects to the {@link SocketEndpoint endpoint} of the node it represents by
* means of <code>Sockets</code>.
*
* @author sven
* @version 1.0.5
*/
public final class SocketProxy extends Proxy implements Runnable {
/**
* The logger for instances of this class.
*/
private final static Logger logger = Logger.getLogger(SocketProxy.class);
/**
* Map of existing proxies. Key: {@link String}, Value: {@link SocketProxy}.
* changed on 21.03.2006 by sven. See documentation of method
* {@link #createProxyKey(URL, URL)}
*
*/
private static Map<String, SocketProxy> proxies = new HashMap<String, SocketProxy>();
/**
* The {@link URL}of the node that uses this proxy to connect to the node,
* which is represented by this proxy.
*
*/
private URL urlOfLocalNode = null;
/**
* Counter for requests that have been made by this proxy. Also required to
* create unique identifiers for {@link Request requests}.
*/
private long requestCounter = -1;
/**
* The socket that provides the connection to the node that this is the
* Proxy for. This is transient as a proxy can be transferred over the
* network. After transfer this socket has to be restored by reconnecting to
* the node.
*/
private transient Socket mySocket;
/**
* The {@link ObjectOutputStream}this Proxy writes objects to. This is
* transient as a proxy can be transferred over the network. After transfer
* this stream has to be restored.
*/
private transient ObjectOutputStream out;
/**
* The {@link ObjectInputStream}this Proxy reads objects from. This is
* transient as a proxy can be transferred over the network. After transfer
* this stream has to be restored.
*/
private transient ObjectInputStream in;
/**
* The {@link ObjectInputStream} this Proxy reads objects from. This is
* transient as a proxy can be transferred over the network. After transfer
* this stream has to be restored.
*/
private transient Map<String, Response> responses;
/**
* {@link Map} where threads are put in that are waiting for a repsonse.
* Key: identifier of the request (same as for the response). Value: The
* Thread itself.
*/
private transient Map<String, WaitingThread> waitingThreads;
/**
* This indicates that an exception occured while waiting for responses and
* that the connection to the {@link Node node}, that this is the proxy
* for, could not be reestablished.
*/
private volatile boolean disconnected = false;
/**
* Establishes a connection from <code>urlOfLocalNode</code> to
* <code>url</code>. The connection is represented by the returned
* <code>SocketProxy</code>.
*
* @param url
* The {@link URL} to connect to.
* @param urlOfLocalNode
* {@link URL} of local node that establishes the connection.
* @return <code>SocketProxy</code> representing the established
* connection.
* @throws CommunicationException
* Thrown if establishment of connection to <code>url</code>
* failed.
*/
public static SocketProxy create(URL urlOfLocalNode, URL url)
throws CommunicationException {
synchronized (proxies) {
/*
* added on 21.03.2006 by sven. See documentation of method
* createProxyKey(URL, URL);
*/
String proxyKey = SocketProxy.createProxyKey(urlOfLocalNode, url);
logger.debug("Known proxies " + SocketProxy.proxies.keySet());
if (proxies.containsKey(proxyKey)) {
logger.debug("Returning existing proxy for " + url);
return proxies.get(proxyKey);
} else {
logger.debug("Creating new proxy for " + url);
SocketProxy newProxy = new SocketProxy(url, urlOfLocalNode);
proxies.put(proxyKey, newProxy);
return newProxy;
}
}
}
/**
* Closes all outgoing connections to other peers. Allows the local peer to
* shutdown cleanly.
*
*/
static void shutDownAll() {
Set<String> keys = proxies.keySet();
for (String key : keys) {
proxies.get(key).disconnect();
}
proxies.clear();
}
/**
* Creates a <code>SocketProxy</code> representing the connection from
* <code>urlOfLocalNode</code> to <code>url</code>. The connection is
* established when the first (remote) invocation with help of the
* <code>SocketProxy</code> occurs.
*
* @param url
* The {@link URL} of the remote node.
* @param urlOfLocalNode
* The {@link URL} of local node.
* @param nodeID
* The {@link ID} of the remote node.
* @return SocketProxy
*/
protected static SocketProxy create(URL url, URL urlOfLocalNode, ID nodeID) {
synchronized (proxies) {
/*
* added on 21.03.2006 by sven. See documentation of method
* createProxyKey(String, String);
*/
String proxyKey = SocketProxy.createProxyKey(urlOfLocalNode, url);
logger.debug("Known proxies " + SocketProxy.proxies.keySet());
if (proxies.containsKey(proxyKey)) {
logger.debug("Returning existing proxy for " + url);
return proxies.get(proxyKey);
} else {
logger.debug("Creating new proxy for " + url);
SocketProxy proxy = new SocketProxy(url, urlOfLocalNode, nodeID);
proxies.put(proxyKey, proxy);
return proxy;
}
}
}
/**
* Method that creates a unique key for a SocketProxy to be stored in
* {@link #proxies}.
*
* This is important for the methods {@link #create(URL, URL)},
* {@link #create(URL, URL, ID)}, and {@link #disconnect()}, so that
* socket communication also works when it is used within one JVM.
*
* Added by sven 21.03.2006, as before SocketProxy were stored in
* {@link #proxies} with help of their remote URL as key, so that they were
* a kind of singleton for that URL. But the key has to consist of the URL
* of the local peer, that uses the proxy, and the remote URL as
* SocketProxies must only be (kind of) a singleton per local and remote
* URL.
*
* @param localURL
* @param remoteURL
* @return The key to store the SocketProxy
*/
private static String createProxyKey(URL localURL, URL remoteURL) {
return localURL.toString() + "->" + remoteURL.toString();
}
/**
* Corresponding constructor to factory method {@link #create(URL, URL, ID)}.
*
* @see #create(URL, URL, ID)
* @param url
* @param urlOfLocalNode1
* @param nodeID1
*/
protected SocketProxy(URL url, URL urlOfLocalNode1, ID nodeID1) {
super(url);
if (url == null || urlOfLocalNode1 == null || nodeID1 == null) {
throw new IllegalArgumentException("null");
}
this.urlOfLocalNode = urlOfLocalNode1;
this.nodeID = nodeID1;
}
/**
* Corresponding constructor to factory method {@link #create(URL, URL)}.
*
* @see #create(URL, URL)
* @param url
* @param urlOfLocalNode1
* @throws CommunicationException
*/
private SocketProxy(URL url, URL urlOfLocalNode1)
throws CommunicationException {
super(url);
if (url == null || urlOfLocalNode1 == null) {
throw new IllegalArgumentException(
"URLs must not be null!");
}
this.urlOfLocalNode = urlOfLocalNode1;
this.initializeNodeID();
logger.info("SocketProxy for " + url + " has been created.");
}
/**
* Private method to send requests over the socket. This method is
* synchronized to ensure that no other thread concurrently accesses the
* {@link ObjectOutputStream output stream}<code>out</code> while sending
* {@link Request request}.
*
* @param request
* The {@link Request}to be sent.
* @throws CommunicationException
* while writing to {@link ObjectOutputStream output stream}.
*/
private synchronized void send(Request request)
throws CommunicationException {
try {
logger.debug("Sending request " + request.getReplyWith());
this.out.writeObject(request);
this.out.flush();
this.out.reset();
} catch (IOException e) {
throw new CommunicationException("Could not connect to node "
+ this.nodeURL, e);
}
}
/**
* Private method to create an identifier that enables this to associate a
* {@link Response response}with a {@link Request request}made before.
* This method is synchronized to protect {@link #requestCounter}from race
* conditions.
*
* @param methodIdentifier
* Integer identifying the method this method is called from.
* @return Unique Identifier for the request.
*/
private synchronized String createIdentifier(int methodIdentifier) {
/* Create unique identifier from */
StringBuilder uid = new StringBuilder();
/* Time stamp */
uid.append(System.currentTimeMillis());
uid.append("-");
/* counter and */
uid.append(this.requestCounter++);
/* methodIdentifier */
uid.append("-");
uid.append(methodIdentifier);
return uid.toString();
}
/**
* Called in a method that is delegated to the {@link Node node}, that this
* is the proxy for. This method blocks the thread that calls the particular
* method until a {@link Response response} is received.
*
* @param request
* @return The {@link Response} for <code>request</code>.
* @throws CommunicationException
*/
private Response waitForResponse(Request request)
throws CommunicationException {
String responseIdentifier = request.getReplyWith();
Response response = null;
logger.debug("Trying to wait for response with identifier "
+ responseIdentifier + " for method "
+ MethodConstants.getMethodName(request.getRequestType()));
synchronized (this.responses) {
logger.debug("No of responses " + this.responses.size());
/* Test if we got disconnected while waiting for lock on object */
if (this.disconnected) {
throw new CommunicationException("Connection to remote host "
+ " is broken down. ");
}
/*
* Test if response is already available (Maybe response arrived
* before we reached this point).
*/
response = this.responses.remove(responseIdentifier);
if (response != null) {
return response;
}
/* WAIT FOR RESPONSE */
/* add current thread to map of threads waiting for a response */
WaitingThread wt = new WaitingThread(Thread.currentThread());
this.waitingThreads.put(responseIdentifier, wt);
while (!wt.hasBeenWokenUp()) {
try {
/*
* Wait until notified or time out is reached.
*/
logger.debug("Waiting for response to arrive.");
this.responses.wait();
} catch (InterruptedException e) {
/*
* does not matter as this is intended Thread is interrupted
* if response arrives
*/
}
}
logger.debug("Have been woken up from waiting for response.");
/* remove thread from map of threads waiting for a response */
this.waitingThreads.remove(responseIdentifier);
/* try to get the response if available */
response = this.responses.remove(responseIdentifier);
logger.debug("Response for request with identifier "
+ responseIdentifier + " for method "
+ MethodConstants.getMethodName(request.getRequestType())
+ " received.");
/* if no response availabe */
if (response == null) {
logger.debug("No response received.");
/* we have been disconnected */
if (this.disconnected) {
logger.info("Connection to remote host lost.");
throw new CommunicationException(
"Connection to remote host " + " is broken down. ");
}
/* or time out has elapsed */
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -