// jxtamulticastsocketservice.java
// Font size:
package jxtamessenger.service;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.SocketException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import jxtamessenger.ChatWindow;
import jxtamessenger.MainApplicationWindow;
import jxtamessenger.bean.OfflineMessage;
import jxtamessenger.bean.OnlineMessage;
import jxtamessenger.util.MiscUtil;
import jxtamessenger.util.PipeUtil;
import jxtamessenger.util.ThreadPoolUtil;
import jxtamessenger.xml.XmlCreator;
import jxtamessenger.xml.XmlParser;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.PipeService;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.socket.JxtaMulticastSocket;
import org.apache.commons.lang.ClassUtils;
import org.eclipse.jface.viewers.TableViewer;
import org.eclipse.swt.widgets.Display;
/**
 * Presence service built on a {@link JxtaMulticastSocket}.
 *
 * <p>{@link #run()} blocks on the socket, receives datagrams and hands each one to a
 * worker from a cached thread pool. A worker parses the datagram text with
 * {@link XmlParser#getObject} and reacts to two message types:
 * <ul>
 *   <li>{@link OnlineMessage}: the sender is added to the peer table (if not already
 *       present), its chat window (if any) is re-enabled, and this peer's own online
 *       message is sent back unless the sender is this host;</li>
 *   <li>{@link OfflineMessage}: the sender is removed from the peer table and its
 *       chat window (if any) is disabled.</li>
 * </ul>
 * All other payloads are ignored. Every viewer/widget mutation is posted to the SWT
 * UI thread through {@code Display.getDefault().asyncExec(...)}.
 */
public class JxtaMulticastSocketService implements Service {
    private static final Logger LOG = Logger.getLogger(JxtaMulticastSocketService.class.getName());

    /** Fixed pipe ID passed to {@link PipeUtil} when building the socket advertisement. */
    private final static String SOCKETIDSTR = "urn:jxta:uuid-59616261646162614E5047205032503386E7C7AE38954620A595F809548D680304";

    /**
     * Value passed to {@code setSoTimeout}.
     * NOTE(review): for java.net sockets 0 means "block forever"; presumably the same
     * holds for JxtaMulticastSocket - confirm.
     */
    private static final int TIMEOUT = 0;

    /** Size in bytes of the receive buffer allocated for each incoming datagram. */
    private static final int BUFFERSIZE = 16384;

    /** Peer table; entries are also indexed through {@code setData(hostName, msg)}. */
    private final TableViewer viewer;
    private JxtaMulticastSocket mcastSocket;
    private final ExecutorService pool;

    /**
     * Opens the multicast socket in the given peer group and prepares the worker pool.
     * If the socket cannot be created the failure is logged and the JVM exits with
     * status -1.
     *
     * @param pg     peer group the socket is created in
     * @param viewer table viewer that lists the peers currently online
     */
    public JxtaMulticastSocketService(PeerGroup pg, TableViewer viewer) {
        this.viewer = viewer;
        try {
            this.mcastSocket = new JxtaMulticastSocket(pg, getSocketAdvertisement(pg));
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "JxtaMulticastSocket initialize failed!", e);
            System.exit(-1);
        }
        if (this.mcastSocket != null) {
            try {
                this.mcastSocket.setSoTimeout(TIMEOUT);
            } catch (SocketException se) {
                LOG.log(Level.WARNING, "Could not set the multicast socket timeout", se);
            }
        }
        pool = Executors.newCachedThreadPool();
    }

    /**
     * Builds the propagate-pipe advertisement used by the multicast socket.
     *
     * @param pg peer group handed to {@link PipeUtil}
     * @return the advertisement produced by
     *         {@code PipeUtil.getPipeAdvWithoutRemoteDiscovery} for {@link #SOCKETIDSTR}
     */
    public static PipeAdvertisement getSocketAdvertisement(PeerGroup pg) {
        return PipeUtil.getPipeAdvWithoutRemoteDiscovery(pg,
                "JxtaMulticastSocket", PipeService.PropagateType, SOCKETIDSTR, true);
    }

    /**
     * Shuts the worker pool down via {@link ThreadPoolUtil} and then closes the socket.
     * NOTE(review): closing the socket is presumably what makes the blocked
     * {@code receive} in {@link #run()} fail and end the loop - confirm.
     */
    public void shutdownAndAwaitTermination() {
        ThreadPoolUtil.shutdownAndAwaitTermination(pool);
        if (this.mcastSocket != null) {
            this.mcastSocket.close();
        }
    }

    /**
     * Receive loop: reads datagrams until the socket fails or the pool rejects work.
     * An {@link IOException} from the socket shuts the pool down; a runtime exception
     * (e.g. {@code RejectedExecutionException} once the pool is shut down or
     * saturated) simply ends the loop.
     */
    public void run() {
        try {
            for (;;) {
                // A fresh buffer per datagram: handlers read the packet data on pool
                // threads, so a shared buffer could be overwritten by the next
                // receive while an earlier handler is still decoding it.
                byte[] buffer = new byte[BUFFERSIZE];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                mcastSocket.receive(packet);
                pool.execute(new Handler(packet));
            }
        } catch (IOException e) {
            LOG.log(Level.FINE, "Multicast receive loop stopped", e);
            pool.shutdown();
        } catch (RuntimeException e) {
            // Deliberately not rethrown: the loop just ends.
            LOG.log(Level.FINE, "Multicast receive loop stopped", e);
        }
    }

    /** Processes one received datagram on a worker thread. */
    class Handler implements Runnable {
        private final DatagramPacket packet;

        // No lock is needed around the UI updates below: they only run inside
        // asyncExec runnables (which the rest of this class relies on to be on the
        // UI thread), and a per-Handler lock could never exclude another Handler.

        Handler(DatagramPacket packet) {
            this.packet = packet;
        }

        /** Decodes the datagram and dispatches on the parsed message type. */
        public void run() {
            String sw = new String(packet.getData(), 0, packet.getLength());
            LOG.info("sw=" + sw);
            Object obj = XmlParser.getObject(sw);
            // instanceof is null-safe, so a payload that parses to null is ignored
            // instead of throwing a NullPointerException on the worker thread.
            if (obj instanceof OnlineMessage) {
                handleOnline((OnlineMessage) obj);
            } else if (obj instanceof OfflineMessage) {
                handleOffline((OfflineMessage) obj);
            }
            // Any other message type is ignored.
        }

        /** Registers a newly seen peer and answers it with our own online message. */
        private void handleOnline(final OnlineMessage msg) {
            // If the table already holds data for this sender, it has been handled
            // before and our own information does not need to be sent back again.
            if (viewer.getData(msg.getHostName()) != null) {
                return;
            }
            // TODO: add a configuration option for whether to show ourselves in the list.
            Display.getDefault().asyncExec(new Runnable() {
                @SuppressWarnings("unchecked")
                public void run() {
                    ChatWindow chatWindow = (ChatWindow) MainApplicationWindow.chatwin.get(msg.getHostName());
                    if (chatWindow != null) {
                        chatWindow.enableInputAndSend();
                    }
                    // Re-check: another queued update may have added the peer already.
                    if (viewer.getData(msg.getHostName()) == null) {
                        viewer.add(msg);
                        viewer.setData(msg.getHostName(), msg);
                        ((List<Object>) viewer.getInput()).add(msg);
                    }
                }
            });
            // Return our own information to the source node (never to ourselves).
            if (!msg.getHostName().equals(MiscUtil.getHostName())) {
                sendOwnOnlineMessage();
            }
        }

        /** Sends this peer's online message back to the sender of {@link #packet}. */
        private void sendOwnOnlineMessage() {
            try {
                String msgres = XmlCreator.createOnlineMessage();
                // Size the packet by encoded bytes, not by characters; the two differ
                // as soon as the XML contains multi-byte characters.
                byte[] payload = msgres.getBytes();
                DatagramPacket res = new DatagramPacket(payload, payload.length);
                // Address the reply to the originator of the received datagram.
                // NOTE(review): per the JXTA multicast-socket tutorial this should
                // unicast the reply to that peer - confirm.
                res.setAddress(packet.getAddress());
                mcastSocket.send(res);
            } catch (IOException e) {
                LOG.log(Level.WARNING, "Seng back OnlineMsg failed.", e);
            }
        }

        /** Removes a peer that announced it is going offline (ignoring ourselves). */
        private void handleOffline(final OfflineMessage msg) {
            if (msg.getHostName().equals(MiscUtil.getHostName())) {
                return;
            }
            Display.getDefault().asyncExec(new Runnable() {
                @SuppressWarnings("unchecked")
                public void run() {
                    LOG.info(msg.getHostName());
                    ChatWindow chatWindow = (ChatWindow) MainApplicationWindow.chatwin.get(msg.getHostName());
                    if (chatWindow != null) {
                        chatWindow.disableInputAndSend(msg.getUserName());
                    }
                    Object o = viewer.getData(msg.getHostName());
                    if (o != null) {
                        viewer.setData(msg.getHostName(), null);
                        viewer.remove(o);
                        ((List<Object>) viewer.getInput()).remove(o);
                    }
                }
            });
        }
    }
}
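// Keyboard shortcuts
// Copy code
// Ctrl + C
// Search code
// Ctrl + F
// Full-screen mode
// F11
// Toggle theme
// Ctrl + Shift + D
// Show shortcuts
// ?
// Increase font size
// Ctrl + =
// Decrease font size
// Ctrl + -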