// MySocket.java
package connex.core.net;
import net.jxta.socket.JxtaSocket;
import net.jxta.endpoint.Messenger;
import net.jxta.peergroup.PeerGroup;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.protocol.PeerAdvertisement;
import net.jxta.document.StructuredDocument;
import java.io.IOException;
import net.jxta.peer.PeerID;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.socket.JxtaServerSocket;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.document.AdvertisementFactory;
import java.io.InputStream;
import net.jxta.impl.util.pipe.reliable.Defs;
import org.apache.log4j.Level;
import net.jxta.document.StructuredDocumentFactory;
import java.util.Iterator;
import org.apache.log4j.Logger;
import net.jxta.impl.util.pipe.reliable.OutgoingMsgrAdaptor;
import net.jxta.impl.util.pipe.reliable.ReliableInputStream;
import net.jxta.endpoint.StringMessageElement;
import net.jxta.document.MimeMediaType;
import net.jxta.pipe.PipeService;
import net.jxta.document.XMLDocument;
import net.jxta.endpoint.TextDocumentMessageElement;
/**
 * <p>Title: MySocket</p>
 * <p>Description: A {@link JxtaSocket} that also carries peer advertisements
 * in the connection handshake. The open message built by
 * {@link #createOpenMessage} includes the local peer advertisement, and the
 * remote peer advertisement found in the connect response (or handed to the
 * protected constructor) is kept and exposed through
 * {@link #getRemotePeerAdv()}.</p>
 * <p>Copyright: Copyright (c) 2005</p>
 * <p>Company: </p>
 * @author unbekannt
 * @version 1.0
 */
public class MySocket extends JxtaSocket {

    private static final Logger LOG = Logger.getLogger(MySocket.class.getName());

    /** Advertisement of the remote peer; {@code null} until it is known. */
    protected PeerAdvertisement peerAdvt;

    /**
     * Creates a socket around an existing messenger and records the remote
     * peer advertisement supplied by the caller.
     *
     * @param group    group context
     * @param msgr     messenger used to reach the remote side
     * @param newpipe  pipe advertisement passed on to the superclass
     * @param credDoc  credential document passed on to the superclass
     * @param isStream stream flag passed on to the superclass
     * @param peerAdv  advertisement of the remote peer
     * @throws IOException if the superclass constructor fails
     */
    protected MySocket(PeerGroup group,
                       Messenger msgr,
                       PipeAdvertisement newpipe,
                       StructuredDocument credDoc,
                       boolean isStream, PeerAdvertisement peerAdv) throws
            IOException {
        super(group, msgr, newpipe, credDoc, isStream);
        setPeerAdv(peerAdv);
    }

    /**
     * Creates a socket through the superclass constructor. The remote peer
     * advertisement is not known at this point; it is recorded later, when
     * the connect response is processed in {@link #pipeMsgEvent}.
     *
     * @param group   group context
     * @param peerid  peer id passed on to the superclass
     * @param pipeAd  pipe advertisement passed on to the superclass
     * @param timeout timeout passed on to the superclass
     * @param stream  stream flag passed on to the superclass
     * @throws IOException if the superclass constructor fails
     */
    public MySocket(PeerGroup group,
                    PeerID peerid,
                    PipeAdvertisement pipeAd,
                    int timeout,
                    boolean stream) throws IOException {
        super(group, peerid, pipeAd, timeout, stream);
    }

    /**
     * Create a connection request message.
     *
     * The message carries, in the {@code JxtaServerSocket} namespace: the
     * credential document (when one is available), the pipe advertisement,
     * the stream flag and the local peer advertisement.
     *
     * @param group  group context
     * @param pipeAd pipe advertisement
     * @return the Message object, never {@code null}
     * @throws IOException if no credentials are available for a secure pipe,
     *                     or if the message elements cannot be built
     */
    protected Message createOpenMessage(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
        Message msg = new Message();
        PeerAdvertisement peerAdv = group.getPeerAdvertisement();
        if (myCredentialDoc == null) {
            myCredentialDoc = getCredDoc(group);
        }
        if (myCredentialDoc == null && pipeAd.getType().equals(PipeService.UnicastSecureType)) {
            throw new IOException("No credentials established to initiate a secure connection");
        }
        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Requesting connection [isStream] :" + isStream);
        }
        try {
            if (myCredentialDoc != null) {
                msg.addMessageElement(JxtaServerSocket.nameSpace,
                        new TextDocumentMessageElement(JxtaServerSocket.credTag,
                                (XMLDocument) myCredentialDoc, null));
            }
            msg.addMessageElement(JxtaServerSocket.nameSpace,
                    new TextDocumentMessageElement(JxtaServerSocket.reqPipeTag,
                            (XMLDocument) pipeAd.getDocument(MimeMediaType.XMLUTF8), null));
            msg.addMessageElement(JxtaServerSocket.nameSpace,
                    new StringMessageElement(JxtaServerSocket.streamTag,
                            Boolean.toString(isStream),
                            null));
            msg.addMessageElement(JxtaServerSocket.nameSpace,
                    new TextDocumentMessageElement(JxtaServerSocket.remPeerTag,
                            (XMLDocument) peerAdv.getDocument(MimeMediaType.XMLUTF8), null));
            return msg;
        } catch (Exception e) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("error getting element stream", e);
            }
            // Report the failure instead of handing a null message back to
            // the caller; the method already declares IOException.
            if (e instanceof IOException) {
                throw (IOException) e;
            }
            IOException failure = new IOException("Unable to build the connection request message: " + e);
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Static pass-through to {@code lightweightOutputPipe}.
     *
     * @param group         group context
     * @param outputPipeAdv advertisement of the pipe to reach
     * @param peerAdv       advertisement of the peer to reach
     * @return whatever {@code lightweightOutputPipe} returns for these arguments
     */
    protected static Messenger lightweightOutputPipe1(PeerGroup group,
                                                      PipeAdvertisement outputPipeAdv,
                                                      PeerAdvertisement peerAdv) {
        return lightweightOutputPipe(group, outputPipeAdv, peerAdv);
    }

    /**
     * {@inheritDoc}
     *
     * Processing order: connect response (only while not yet bound), close
     * request / acknowledgement, then either datagram data (non-stream mode)
     * or reliable-stream ACK / data blocks (stream mode).
     */
    public void pipeMsgEvent(PipeMsgEvent event) {
        Message message = event.getMessage();
        if (message == null) {
            return;
        }
        if (!bound && !processConnectResponse(message)) {
            return;
        }
        //net.jxta.impl.util.MessageUtil.printMessageStats(message, true);
        processCloseElement(message);
        if (!isStream) {
            processDatagram(message);
            return;
        }
        processStreamMessage(message);
    }

    /**
     * Handles a connect response, if the message contains one.
     *
     * @param message the received message
     * @return {@code false} when handling of this message must stop (the
     *         response lacks the remote peer advertisement, or no back
     *         messenger could be obtained); {@code true} otherwise
     */
    private boolean processConnectResponse(Message message) {
        // look for a remote pipe answer
        MessageElement element =
                message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.remPipeTag);
        if (element == null) {
            return true;
        }
        // connect response
        try {
            InputStream in = element.getStream();
            PipeAdvertisement pa = (PipeAdvertisement)
                    AdvertisementFactory.newAdvertisement(element.getMimeType(), in);

            element = message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.remPeerTag);
            if (element == null) {
                return false;
            }
            in = element.getStream();
            PeerAdvertisement peerAdv = (PeerAdvertisement)
                    AdvertisementFactory.newAdvertisement(element.getMimeType(), in);

            element = message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.credTag);
            if (element != null) {
                in = element.getStream();
                credentialDoc = (StructuredDocument)
                        StructuredDocumentFactory.newStructuredDocument(element.getMimeType(), in);
            }

            element = message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.streamTag);
            if (element != null) {
                isStream = (element.toString().equals("true"));
            }

            msgr = lightweightOutputPipe(group, pa, peerAdv);
            if (msgr == null) {
                // let the connection attempt timeout
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("Unable to obtain a back messenger");
                }
                return false;
            }

            // Remember who answered, so the getRemotePeer* accessors work for
            // sockets created through the public constructor. Done before the
            // waiters are released below.
            setPeerAdv(peerAdv);

            if (isStream) {
                // Create the input stream right away, otherwise
                // the first few messages from remote will be lost, unless
                // we use an intermediate queue.
                // FIXME: it would be even better if we could create the
                // input stream BEFORE having the output pipe resolved, but
                // that would force us to have the MsrgAdaptor block
                // until we can give it the real pipe or msgr... later.
                createRis();
            }
            synchronized (finalLock) {
                waiting = false;
                finalLock.notifyAll();
            }
        } catch (IOException e) {
            if (LOG.isEnabledFor(Level.ERROR)) {
                LOG.error("failed to process response message", e);
            }
        }
        return true;
    }

    /**
     * Handles a close request or a close acknowledgement, if the message
     * contains one.
     *
     * @param message the received message
     */
    private void processCloseElement(Message message) {
        // look for close request
        MessageElement element =
                message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.closeTag);
        if (element == null) {
            return;
        }
        String request = element.toString();
        if (request.equals("close")) {
            try {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("Received a close request");
                }
                closeFromRemote();
            } catch (IOException ie) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("failed during closeFromRemote", ie);
                }
            }
        } else if (request.equals("closeACK")) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Received a close acknowledgement");
            }
            synchronized (closeLock) {
                closeLock.notify();
            }
        }
    }

    /**
     * Non-stream mode: pushes the data element of the message, if any, onto
     * the queue.
     *
     * @param message the received message
     */
    private void processDatagram(Message message) {
        // is there data ?
        MessageElement element =
                message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.dataTag);
        if (element == null) {
            return;
        }
        try {
            queue.push(element, -1);
        } catch (InterruptedException e) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Interrupted", e);
            }
            // Keep the interrupt visible to the code that owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stream mode: hands ACK messages to the output stream and data-block
     * messages to the input stream.
     *
     * @param message the received message
     */
    private void processStreamMessage(Message message) {
        Iterator acks = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);
        if (acks != null && acks.hasNext()) {
            if (ros != null) {
                ros.recv(message);
            }
            return;
        }
        Iterator blocks = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);
        if (blocks != null && blocks.hasNext()) {
            // It can happen that we receive messages for the input stream
            // while we have not finished creating it.
            try {
                synchronized (finalLock) {
                    while (waiting) {
                        finalLock.wait(timeout);
                    }
                }
            } catch (InterruptedException ie) {
                // Stop waiting, but do not lose the interrupt.
                Thread.currentThread().interrupt();
            }
            if (ris != null) {
                ris.recv(message);
            }
        }
    }

    /**
     * Lazily creates the outgoing messenger adaptor and the reliable input
     * stream; objects that already exist are left untouched.
     */
    private void createRis() {
        if (outgoing == null) {
            outgoing = new OutgoingMsgrAdaptor(msgr, retryTimeout);
        }
        if (ris == null) {
            ris = new ReliableInputStream(outgoing, retryTimeout);
        }
    }

    /**
     * Records the advertisement of the remote peer.
     *
     * @param peerAdv advertisement of the remote peer
     */
    private void setPeerAdv(PeerAdvertisement peerAdv) {
        this.peerAdvt = peerAdv;
    }

    /**
     * @return the advertisement of the remote peer, or {@code null} if it is
     *         not known yet
     */
    public PeerAdvertisement getRemotePeerAdv() {
        return this.peerAdvt;
    }

    /**
     * @return the remote peer id as a string, or {@code null} if the remote
     *         peer advertisement is not known yet
     */
    public String getRemotePeerID() {
        PeerAdvertisement remote = this.peerAdvt;
        if (remote == null) {
            return null;
        }
        return remote.getPeerID().toString();
    }

    /**
     * @return the remote peer name, or {@code null} if the remote peer
     *         advertisement is not known yet
     */
    public String getRemotePeerName() {
        PeerAdvertisement remote = this.peerAdvt;
        if (remote == null) {
            return null;
        }
        return remote.getName();
    }
}
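// Code-viewer page residue (not part of the original source): keyboard shortcut help.
//   Copy code:          Ctrl + C
//   Search code:        Ctrl + F
//   Full-screen mode:   F11
//   Switch theme:       Ctrl + Shift + D
//   Show shortcuts:     ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -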