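// p2psocket.java
// (The line "Font size:" that followed here was a control label from the web code viewer, not part of the program.)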
import java.net.*;
import java.io.*;
import net.jxta.pipe.*;
import net.jxta.protocol.*;
import net.jxta.peergroup.*;
import net.jxta.document.*;
import net.jxta.discovery.*;
import net.jxta.exception.*;
import net.jxta.endpoint.*;
import net.jxta.impl.endpoint.*;
/**
 * A socket-like convenience wrapper around one JXTA input pipe and one JXTA
 * output pipe.
 *
 * <p>Typical use, as far as this class shows: construct it with the names of
 * the two pipe advertisements (loaded from {@code adv/<name>.xml}), call
 * {@link #bind()} to create the input pipe and start publishing its
 * advertisement, call {@link #connect()} to obtain the output pipe, then use
 * {@link #send(Message)}.
 *
 * <p>{@code OutputListener} is declared elsewhere in the project. From its use
 * here it offers {@code getOutputPipe()} and is accepted by
 * {@code PipeService.createOutputPipe}. This class implements it itself so it
 * can act as the default output listener.
 *
 * <p>NOTE: several failure paths terminate the JVM via {@code System.exit};
 * each one is called out in the method documentation below.
 */
public class P2PSocket
        extends Thread
        implements OutputListener, PipeMsgListener
{
    /** How many times {@link #bind()} tries to create the input pipe. */
    private static final int BIND_ATTEMPTS = 5;
    /** How many times {@link #connect()} asks for the output pipe. */
    private static final int CONNECT_ATTEMPTS = 10;
    /** Pause between polls for the output pipe, in milliseconds. */
    private static final int RETRY_DELAY_MS = 5 * 1000;
    /** Advertisement lifetime and re-publication period, in milliseconds. */
    private static final int PUBLISH_INTERVAL_MS = 10 * 60 * 1000;

    private String inputPipeName = null;            // name of the input pipe
    private String outputPipeName = null;           // name of the output pipe
    private PipeAdvertisement inputPipeAdv = null;  // input pipe advertisement
    private PipeAdvertisement outputPipeAdv = null; // output pipe advertisement
    private InputPipe ip = null;                    // input pipe
    // Written by outputPipeEvent() and polled by connect()/send(), possibly on
    // different threads, hence volatile.
    private volatile OutputPipe op = null;          // output pipe
    private PipeMsgListener pml = null;             // custom input pipe listener, if any
    private OutputListener opl = null;              // custom output pipe listener, if any
    private PeerGroup pg = null;                    // peer group this socket belongs to
    private PipeService ps = null;                  // pipe service of the peer group
    private DiscoveryService disc = null;           // discovery service of the peer group
    // Guards against calling Thread.start() more than once.
    private boolean publisherStarted = false;

    /**
     * Creates a socket in a newly created net peer group. Pipe names must be
     * supplied later through {@link #setInputPipeName(String)} and
     * {@link #setOutputPipeName(String)}.
     *
     * <p>Terminates the JVM if the peer group cannot be created.
     */
    public P2PSocket()
    {
        this.newPeerGroup();
        ps = pg.getPipeService();
        disc = pg.getDiscoveryService();
    }

    /**
     * Creates a socket in a newly created net peer group and loads both pipe
     * advertisements.
     *
     * <p>Terminates the JVM if the peer group cannot be created or if either
     * advertisement file cannot be read.
     *
     * @param inputPipeName  name of the input pipe; its advertisement is read
     *                       from {@code adv/<inputPipeName>.xml}
     * @param outputPipeName name of the output pipe; its advertisement is read
     *                       from {@code adv/<outputPipeName>.xml}
     */
    public P2PSocket(String inputPipeName, String outputPipeName)
    {
        this();
        this.setInputPipeName(inputPipeName);
        this.setOutputPipeName(outputPipeName);
    }

    /**
     * (1) Binds this peer to its input pipe and starts listening.
     *
     * <p>Tries up to {@value #BIND_ATTEMPTS} times to create the input pipe,
     * passing the listener set with {@link #setInListener(PipeMsgListener)},
     * or this object when none was set. Once the pipe exists, the thread that
     * publishes the input pipe advertisement is started, and only once.
     * Previously the thread was started inside the retry loop, so every retry
     * failed in {@code Thread.start()} before reaching the pipe creation.
     *
     * @return {@code true} if the input pipe exists (including when it was
     *         created by an earlier call), {@code false} if every attempt failed
     */
    public boolean bind()
    {
        if (ip != null)
        {
            // Already bound by an earlier call; do not create a second pipe.
            return true;
        }
        PipeMsgListener listener = (pml != null) ? pml : this;
        for (int attempt = 0; attempt < BIND_ATTEMPTS; attempt++)
        {
            try
            {
                ip = ps.createInputPipe(inputPipeAdv, listener);
            }
            catch (Exception e)
            {
                System.out.println("Error creating input pipe.");
                e.printStackTrace();
            }
            if (ip != null)
            {
                startPublisher();
                return true;
            }
        }
        return false;
    }

    /**
     * Starts the advertisement-publishing thread unless that already happened.
     */
    private void startPublisher()
    {
        if (publisherStarted)
        {
            return;
        }
        publisherStarted = true;
        try
        {
            this.start();
        }
        catch (IllegalThreadStateException alreadyStarted)
        {
            // start() was invoked directly by outside code; the thread is
            // already running (or has run), so there is nothing more to do.
        }
    }

    /**
     * (2) Periodically publishes the input pipe advertisement.
     *
     * <p>The advertisement is published remotely and locally with a lifetime
     * of {@value #PUBLISH_INTERVAL_MS} ms, and re-published after the same
     * interval so that it does not expire. The loop ends when the thread is
     * interrupted. An {@code IOException} while publishing terminates the JVM.
     */
    public void run()
    {
        try
        {
            while (!Thread.currentThread().isInterrupted())
            {
                disc.remotePublish(inputPipeAdv, DiscoveryService.ADV, PUBLISH_INTERVAL_MS);
                disc.publish(inputPipeAdv, DiscoveryService.ADV, PUBLISH_INTERVAL_MS, PUBLISH_INTERVAL_MS);
                Thread.sleep(PUBLISH_INTERVAL_MS);
            }
        }
        catch (InterruptedException ie)
        {
            System.out.println("出錯!"+ie);
        }
        catch (IOException ioe)
        {
            System.out.println("出錯!"+ioe);
            System.exit(-1);
        }
    }

    /**
     * (3) Binds this peer to its output pipe, establishing the connection.
     *
     * <p>Requests the output pipe up to {@value #CONNECT_ATTEMPTS} times,
     * waiting {@value #RETRY_DELAY_MS} ms between polls, and stops as soon as
     * the effective listener reports a pipe. The effective listener is the one
     * set with {@link #setOutListener(OutputListener)}, or this object when
     * none was set. Previously only a custom listener was ever checked, so
     * connecting with the default listener could never succeed.
     *
     * <p>NOTE: if no pipe is obtained, or the wait is interrupted, the JVM is
     * terminated with {@code System.exit(-1)}.
     *
     * @return {@code true} once the output pipe is available
     */
    public boolean connect()
    {
        OutputListener listener = (opl != null) ? opl : this;
        if (listener == this)
        {
            // Forget a pipe left over from an earlier connect() so that it is
            // not mistaken for the result of this attempt.
            op = null;
        }
        for (int attempt = 0; attempt < CONNECT_ATTEMPTS; attempt++)
        {
            try
            {
                System.out.println("Create Outpipe");
                ps.createOutputPipe(outputPipeAdv, listener);
            }
            catch (Exception e)
            {
                System.out.println("Error creating ounput pipe.");
                e.printStackTrace();
            }
            if (listener.getOutputPipe() != null)
            {
                break;
            }
            try
            {
                Thread.sleep(RETRY_DELAY_MS);
            }
            catch (InterruptedException ie)
            {
                System.out.println("出錯!"+ie);
                System.exit(-1);
            }
        }
        OutputPipe resolved = listener.getOutputPipe();
        if (resolved != null)
        {
            this.op = resolved;
            return true;
        }
        System.out.println("通信連接失敗!");
        System.exit(-1);
        return false; // only reached if System.exit() is prevented from exiting
    }

    /**
     * Loads the advertisement for the given output pipe name and connects.
     *
     * @param outputPipeName name of the output pipe; its advertisement is read
     *                       from {@code adv/<outputPipeName>.xml}
     * @return the result of {@link #connect()}
     */
    public boolean connect(String outputPipeName)
    {
        this.setOutputPipeName(outputPipeName);
        return this.connect();
    }

    /**
     * Output pipe callback used when this object is the output listener:
     * stores the pipe carried by the event.
     *
     * @param event event carrying the output pipe
     */
    public void outputPipeEvent(OutputPipeEvent event)
    {
        op = event.getOutputPipe();
    }

    /**
     * (4) Sends a message through the output pipe.
     *
     * <p>If no output pipe is available yet, polls the effective output
     * listener every {@value #RETRY_DELAY_MS} ms until one is. Previously a
     * custom listener was read only once before the wait, so a pipe it
     * obtained later was never noticed. There is no timeout on this wait.
     *
     * <p>NOTE: an interrupt during the wait terminates the JVM.
     *
     * @param mess the message to send
     * @return {@code true} if the message was handed to the pipe,
     *         {@code false} if the pipe threw an {@code IOException}
     */
    public boolean send(Message mess)
    {
        OutputListener listener = (opl != null) ? opl : this;
        OutputPipe pipe = listener.getOutputPipe();
        while (pipe == null)
        {
            try
            {
                Thread.sleep(RETRY_DELAY_MS);
            }
            catch (InterruptedException ie)
            {
                System.out.println("出錯!"+ie);
                System.exit(-1);
            }
            pipe = listener.getOutputPipe();
        }
        op = pipe;
        try
        {
            pipe.send(mess);
            return true;
        }
        catch (IOException ioe)
        {
            System.out.println("發送消息失敗!");
            return false;
        }
    }

    /**
     * Default input pipe callback: does nothing, so incoming messages are
     * discarded unless a listener was set with
     * {@link #setInListener(PipeMsgListener)} before {@link #bind()}.
     *
     * @param event the received-message event (ignored)
     */
    public void pipeMsgEvent(PipeMsgEvent event) {}

    /**
     * (5) Sets the listener for the output pipe. It is used by subsequent
     * calls to {@link #connect()} and {@link #send(Message)}.
     *
     * @param opl the output listener, or {@code null} to use this object
     */
    public void setOutListener(OutputListener opl) { this.opl = opl; }

    /**
     * (6) Sets the listener for the input pipe. It is used by a subsequent
     * call to {@link #bind()}.
     *
     * <p>The parameter used to be named {@code opl}, which turned the
     * assignment into a self-assignment of the field and silently dropped the
     * listener.
     *
     * @param pml the input listener, or {@code null} to use this object
     */
    public void setInListener(PipeMsgListener pml) { this.pml = pml; }

    /**
     * Replaces the peer group and takes the pipe and discovery services from
     * it. Pipes that were already created are not touched.
     *
     * @param pg the peer group to use; must not be {@code null}
     */
    public void setPeerGroup(PeerGroup pg)
    {
        this.pg = pg;
        this.ps = pg.getPipeService();
        this.disc = pg.getDiscoveryService();
    }

    /**
     * Sets the input pipe name and loads its advertisement from
     * {@code adv/<inputPipeName>.xml}. Terminates the JVM if that fails.
     *
     * @param inputPipeName name of the input pipe
     */
    public void setInputPipeName(String inputPipeName)
    {
        this.inputPipeName = inputPipeName;
        inputPipeAdv = createPipeAdvFromFile("adv/"+inputPipeName+".xml");
    }

    /**
     * Sets the output pipe name and loads its advertisement from
     * {@code adv/<outputPipeName>.xml}. Terminates the JVM if that fails.
     *
     * @param outputPipeName name of the output pipe
     */
    public void setOutputPipeName(String outputPipeName)
    {
        this.outputPipeName = outputPipeName;
        outputPipeAdv = createPipeAdvFromFile("adv/"+outputPipeName+".xml");
    }

    /** @return the input pipe, or {@code null} before a successful {@link #bind()} */
    public InputPipe getInputPipe() { return this.ip; }

    /** @return the input pipe name, or {@code null} if none was set */
    public String getInputPipeName() { return this.inputPipeName; }

    /** @return the output pipe currently held by this object, or {@code null} */
    public OutputPipe getOutputPipe() { return this.op; }

    /** @return the output pipe name, or {@code null} if none was set */
    public String getOutputPipeName() { return this.outputPipeName; }

    /**
     * Creates the net peer group and stores it in {@code pg}.
     * Terminates the JVM if the group cannot be created.
     */
    private void newPeerGroup()
    {
        try
        {
            pg = PeerGroupFactory.newNetPeerGroup();
        }
        catch (PeerGroupException e)
        {
            System.out.println("fatal error : group creation failure");
            e.printStackTrace();
            System.exit(-1);
        }
    }

    /**
     * Reads a pipe advertisement from an XML file.
     * Terminates the JVM with status 1 if the file cannot be read or parsed.
     *
     * @param filename path of the advertisement file
     * @return the advertisement read from the file
     */
    private PipeAdvertisement createPipeAdvFromFile(String filename)
    {
        PipeAdvertisement pipeAdv = null;
        try
        {
            FileInputStream is = new FileInputStream(filename);
            try
            {
                pipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(new MimeMediaType("text/xml"), is);
            }
            finally
            {
                // Close the stream even when parsing fails.
                is.close();
            }
        }
        catch (Exception e)
        {
            System.out.println("Error to create PipeAdvertisement from file");
            e.printStackTrace();
            System.exit(1);
        }
        return pipeAdv;
    }
}
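/*
 * Residue from the web code viewer this file was copied from (not part of the program).
 * Keyboard shortcuts:
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Switch theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */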