// tcpclientimpl.java
// (Residue from the web code viewer this file was copied from: "Font size:")
package com.aceway.vas.commons.tcp.impl;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.log4j.Logger;
import com.aceway.vas.commons.tcp.IClientHandler;
import com.aceway.vas.commons.tcp.TcpClient;
import com.aceway.vas.commons.tcp.common.TypeConvert;
/**
 * Non-blocking TCP client built on java.nio.
 *
 * <p>Every instance owns one {@link SocketChannel}. All instances share a single
 * receive thread ({@link RecvThread}) that multiplexes reads through one
 * {@link Selector} and reports incoming data to each instance's
 * {@link IClientHandler}.
 *
 * <p>Two kinds of frames are handled:
 * <ul>
 * <li>object frames: a 4-byte type tag equal to 1, a 4-byte payload length and a
 * Java-serialized object (this is the layout produced by
 * {@link #send(Serializable)});</li>
 * <li>raw byte frames: anything else; packet boundaries are decided by
 * {@link IClientHandler#slice(byte[])}.</li>
 * </ul>
 *
 * <p>Typical usage: call {@link #setDataHandler(IClientHandler, int)} first, then
 * {@link #connect(String, int)}, then the {@code send} methods.
 */
public class TcpClientImpl implements TcpClient {
    static Logger log = Logger.getLogger(TcpClientImpl.class);

    /** Value of the leading 4-byte type tag that marks a serialized-object frame. */
    private static final int TYPE_OBJECT = 1;

    /** Size of an object-frame header: 4-byte type tag plus 4-byte payload length. */
    private static final int OBJECT_HEADER_SIZE = 8;

    private SocketChannel sc = null;

    // Maximum packet size; also the capacity of the receive buffer.
    private int maxPacketSize = 1024;

    // Callback target for connection and data events.
    private IClientHandler handler = null;

    // Receive buffer; holds bytes that have been read but not yet sliced into packets.
    private ByteBuffer recvBuff = null;

    // The receive thread shared by all instances; created lazily by setDataHandler().
    static RecvThread rt = null;

    // All currently connected instances. Wrapped in a synchronized list because it
    // is modified both by caller threads (connect/disconnect) and by the receive thread.
    static List objs = java.util.Collections.synchronizedList(new ArrayList());

    private SocketChannel getSocketChannel() {
        return this.sc;
    }

    private IClientHandler getClientHandler() {
        return this.handler;
    }

    private int getMaxPackageSize() {
        return this.maxPacketSize;
    }

    /**
     * Connects to the given server, registers this client with the shared receive
     * thread and fires {@code onConnect} on the handler.
     *
     * <p>{@link #setDataHandler(IClientHandler, int)} must have been called first;
     * otherwise there is neither a handler nor a receive thread and the call fails.
     *
     * @param ip   host name or IP address of the server
     * @param port TCP port of the server
     * @return {@code true} when the connection was established and registered,
     *         {@code false} on any failure (the channel is closed again in that case)
     */
    public boolean connect(String ip, int port) {
        if (handler == null || rt == null) {
            log.error("connect() called before setDataHandler(): no handler or receive thread available");
            return false;
        }
        SocketChannel channel = null;
        try {
            InetSocketAddress addr = new InetSocketAddress(ip, port);
            // open(addr) connects in blocking mode; the channel is switched to
            // non-blocking afterwards so that it can be used with the selector.
            channel = SocketChannel.open(addr);
            sc = channel;
            channel.configureBlocking(false);
            // Remember this connection.
            objs.add(this);
            // Hand this instance to the receive thread for OP_READ registration.
            rt.add(this);
            // Fire the connect event.
            handler.onConnect(ip, port);
            return true;
        } catch (Exception e) {
            log.error("連接錯誤, 可能遠程的服務器已經關閉", e);
            // Undo the partial setup so that a failed connect leaks neither the
            // socket nor an entry in objs.
            objs.remove(this);
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException closeEx) {
                    log.warn("Failed to close the channel after a failed connect", closeEx);
                }
            }
            return false;
        }
    }

    /**
     * Closes the channel, removes this client from the list of live connections and
     * fires {@code onDisconnect} on the handler.
     *
     * @return {@code true} when the channel was closed, {@code false} when the client
     *         was never connected or closing the channel failed
     */
    public boolean disconnect() {
        if (this.sc == null) {
            log.warn("disconnect() called on a client that was never connected");
            return false;
        }
        try {
            this.sc.close();
        } catch (Exception ex) {
            log.error("Failed to close the socket channel", ex);
            return false;
        }
        objs.remove(this);
        if (handler != null) {
            handler.onDisconnect();
        }
        return true;
    }

    /**
     * Writes the given bytes to the server and then notifies the handler through
     * {@code onSendedMsg}.
     *
     * <p>A failure while writing closes the channel. A failure inside the
     * notification is only logged: the data is already on the wire at that point,
     * so the connection is kept and the call still reports success.
     *
     * @param bytes the complete frame to send
     * @return 0 when all bytes were written, 1 otherwise
     */
    public synchronized int send(byte[] bytes) {
        if (bytes == null || sc == null) {
            log.error("send() called with no data or before connect()");
            return 1;
        }
        try {
            ByteBuffer bb = ByteBuffer.wrap(bytes);
            while (bb.hasRemaining()) {
                // The channel is non-blocking, so write() may accept 0 bytes while the
                // socket send buffer is full; yield instead of spinning flat out.
                if (sc.write(bb) == 0) {
                    Thread.yield();
                }
            }
        } catch (Exception ex) {
            log.error("Failed to send data, closing the connection", ex);
            try {
                sc.close();
            } catch (IOException e) {
                log.warn("Failed to close the channel after a send error", e);
            }
            return 1;
        }
        notifySent(bytes);
        return 0;
    }

    /**
     * Reports a frame that has just been written to the handler. Object frames
     * (at least an 8-byte header whose type tag is 1) are deserialized again so that
     * the handler receives the object; everything else is reported as raw bytes.
     */
    private void notifySent(byte[] bytes) {
        if (handler == null) {
            return;
        }
        try {
            // The length check comes first so that TypeConvert.byte2int is never asked
            // to read past the end of a short message.
            boolean isObjectFrame = bytes.length >= OBJECT_HEADER_SIZE
                    && TypeConvert.byte2int(bytes, 0) == TYPE_OBJECT;
            if (!isObjectFrame) {
                handler.onSendedMsg(bytes);
            } else {
                byte[] payload = new byte[bytes.length - OBJECT_HEADER_SIZE];
                System.arraycopy(bytes, OBJECT_HEADER_SIZE, payload, 0, payload.length);
                ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload));
                Object o = in.readObject();
                handler.onSendedMsg((Serializable) o);
            }
        } catch (Exception ex) {
            log.error("Data was sent but the onSendedMsg notification failed", ex);
        }
    }

    /**
     * The receive thread shared by all clients. It registers newly connected clients
     * with its selector, reads whatever the server sends and slices the received
     * bytes into packets for the owning client's handler.
     *
     * <p>Declared static so that it is not tied to the client instance that happened
     * to create it: every per-client value (buffer, handler, size limit) is taken
     * from the client currently being processed.
     */
    private static class RecvThread extends Thread {
        Selector selector = null;

        // Clients waiting to be registered with the selector; guarded by its own monitor.
        List list = new ArrayList();

        // Run flag of the receive loop; volatile so that a change is visible across threads.
        private volatile boolean running = true;

        public RecvThread() {
            super("TcpClientImpl-recv");
            try {
                selector = Selector.open();
            } catch (IOException e) {
                log.error("Failed to open the selector for the receive thread", e);
            }
        }

        /**
         * Queues a client for registration and wakes the selector so that the
         * receive loop picks it up immediately.
         */
        public void add(TcpClientImpl client) {
            synchronized (list) {
                list.add(client);
            }
            if (selector != null) {
                selector.wakeup();
            }
        }

        /**
         * Cuts the bytes accumulated in the client's receive buffer into complete
         * packets, delivers each one to the handler and keeps any incomplete tail at
         * the start of the buffer for the next read.
         *
         * @param ci the client whose receive buffer is processed
         * @param dh the handler that slices raw streams and receives the packets
         */
        private void sliceAndDealWithData(TcpClientImpl ci, IClientHandler dh) {
            ByteBuffer buff = ci.recvBuff;
            int dataLen = buff.position();
            int iPos = 0; // start offset of the packet currently being sliced
            while (dataLen - iPos > 4) {
                buff.position(iPos);
                if (buff.getInt(iPos) == TYPE_OBJECT) { // object frame
                    if (dataLen - iPos <= OBJECT_HEADER_SIZE) {
                        break; // header (or first payload byte) not received yet
                    }
                    int packageSize = buff.getInt(iPos + 4);
                    // Reject lengths that are negative (corrupt), above this client's
                    // limit, or too large to ever fit into the buffer together with
                    // the header; such a frame could never be completed.
                    if (packageSize < 0
                            || packageSize > ci.getMaxPackageSize()
                            || packageSize > buff.capacity() - OBJECT_HEADER_SIZE) {
                        log.error("Invalid object frame length " + packageSize
                                + ", discarding the receive buffer");
                        buff.clear();
                        return;
                    }
                    if (dataLen - iPos - OBJECT_HEADER_SIZE < packageSize) {
                        break; // payload not complete yet
                    }
                    buff.position(iPos + OBJECT_HEADER_SIZE);
                    byte[] byteObj = new byte[packageSize];
                    buff.get(byteObj, 0, packageSize);
                    // The frame is consumed whether or not it can be decoded, so that
                    // a bad frame is not retried on every subsequent read.
                    iPos += OBJECT_HEADER_SIZE + packageSize;
                    Object o;
                    try {
                        // NOTE(security): native Java deserialization of data received
                        // from the network; only safe when the peer is trusted.
                        ObjectInputStream in = new ObjectInputStream(
                                new ByteArrayInputStream(byteObj));
                        o = in.readObject();
                    } catch (IOException e) {
                        log.error("接收對象的時候發生錯誤", e);
                        continue;
                    } catch (ClassNotFoundException e) {
                        log.error("接收對象的時候發生錯誤, 對象沒有找到", e);
                        continue;
                    }
                    dh.onReceiveMsg((Serializable) o);
                } else { // raw byte stream
                    byte[] msg = new byte[dataLen - iPos];
                    buff.get(msg, 0, msg.length); // everything received so far
                    int sliceResult = dh.slice(msg);
                    // No complete packet available (or a length larger than the data
                    // at hand): wait for more bytes instead of looping on the same data.
                    if (sliceResult <= 0 || sliceResult > msg.length) {
                        break;
                    }
                    byte[] msgPkt = new byte[sliceResult];
                    System.arraycopy(msg, 0, msgPkt, 0, sliceResult); // one complete packet
                    dh.onReceiveMsg(msgPkt);
                    iPos += sliceResult;
                }
            }
            // Keep the unprocessed tail at the front of the buffer.
            if (dataLen - iPos > 0) {
                buff.position(iPos);
                buff.limit(dataLen);
                buff.compact();
            } else {
                buff.clear();
            }
        }

        /** Registers every queued client with the selector for read events. */
        private void registerPending() {
            Object[] pending;
            synchronized (list) {
                pending = list.toArray();
                list.clear();
            }
            for (int i = 0; i < pending.length; i++) {
                TcpClientImpl client = (TcpClientImpl) pending[i];
                try {
                    client.getSocketChannel().register(selector, SelectionKey.OP_READ, client);
                } catch (ClosedChannelException e) {
                    // The client was closed before it could be registered; skip it
                    // and carry on with the remaining ones.
                    log.warn("Channel was closed before it could be registered", e);
                }
            }
        }

        /**
         * Cancels the key, closes its channel and disconnects the client, which also
         * fires {@code onDisconnect} on the client's handler.
         */
        private void dropClient(SelectionKey skey, TcpClientImpl client) {
            skey.cancel();
            try {
                skey.channel().close();
            } catch (IOException e) {
                log.warn("Failed to close the channel of a dropped client", e);
            }
            if (client != null) {
                objs.remove(client);
                try {
                    client.disconnect();
                } catch (RuntimeException ex) {
                    // A failing onDisconnect callback must not stop the receive thread.
                    log.error("Handler failed while processing the disconnect", ex);
                }
            }
        }

        /** Reads everything currently available for one client and slices it. */
        private void readFrom(SelectionKey skey, TcpClientImpl client) throws IOException {
            SocketChannel channel = (SocketChannel) skey.channel();
            IClientHandler ch = client.getClientHandler();
            ByteBuffer buffer = client.recvBuff;
            buffer.limit(buffer.capacity());
            int read;
            while ((read = channel.read(buffer)) > 0) {
                sliceAndDealWithData(client, ch);
            }
            if (read == -1) {
                // End of stream: the peer closed the connection. Without dropping the
                // key the selector would report it as readable forever.
                log.warn("Connection closed by the remote peer");
                dropClient(skey, client);
            } else if (!buffer.hasRemaining()) {
                // The buffer is full and still contains no complete packet, so no
                // further byte could ever be read; discard the undecodable data.
                log.error("Receive buffer is full without a complete packet, discarding its content");
                buffer.clear();
            }
        }

        /** Handles every key selected by the last select() call. */
        private void processSelectedKeys() {
            Set set = selector.selectedKeys();
            java.util.Iterator it = set.iterator();
            while (it.hasNext()) {
                SelectionKey skey = (SelectionKey) it.next();
                it.remove();
                TcpClientImpl client = (TcpClientImpl) skey.attachment();
                try {
                    if (skey.isReadable()) {
                        readFrom(skey, client);
                    }
                } catch (java.nio.channels.CancelledKeyException ex) {
                    log.warn("Selection key was cancelled while reading", ex);
                    dropClient(skey, client);
                } catch (ClosedSelectorException e1) {
                    dropClient(skey, client);
                    running = false;
                    log.warn("通信鏈路被關閉");
                    log.warn(e1.getMessage());
                } catch (IOException e2) {
                    dropClient(skey, client);
                    log.error("接收消息時發生錯誤, 可能遠程服務器已經被關閉, 也可能本地關閉連接", e2);
                } catch (RuntimeException ex) {
                    // Typically thrown by a handler callback. Only this client is
                    // dropped; the shared thread keeps serving the other clients.
                    log.error("Unexpected error while processing received data", ex);
                    dropClient(skey, client);
                }
            }
        }

        /**
         * Receive loop: register queued clients, wait for readable channels and
         * process them, until the selector fails or is closed.
         */
        public void run() {
            if (selector == null) {
                log.error("Receive thread cannot run because the selector could not be opened");
                running = false;
                return;
            }
            while (running) {
                registerPending();
                int n;
                try {
                    n = selector.select();
                } catch (IOException e) {
                    n = 0;
                    running = false;
                    log.error("Selector failed, stopping the receive thread", e);
                } catch (ClosedSelectorException e) {
                    n = 0;
                    running = false;
                    log.error("Selector was closed, stopping the receive thread", e);
                }
                if (n > 0) {
                    processSelectedKeys();
                }
            }
        }
    }

    /**
     * Installs the event handler, sets the maximum packet size, allocates a receive
     * buffer of that size and makes sure the shared receive thread is running.
     *
     * @param handler       receives connection and data callbacks
     * @param maxPacketSize maximum packet size in bytes; also the receive buffer capacity
     */
    public void setDataHandler(IClientHandler handler, int maxPacketSize) {
        this.handler = handler;
        this.maxPacketSize = maxPacketSize;
        this.recvBuff = ByteBuffer.allocate(maxPacketSize);
        // Guarded so that concurrent callers cannot start two receive threads; a
        // thread that has terminated (e.g. after a selector failure) is replaced.
        synchronized (TcpClientImpl.class) {
            if (rt == null || !rt.isAlive()) {
                rt = new RecvThread();
                rt.start();
            }
        }
    }

    /**
     * Serializes the object, wraps it into an object frame (type tag 1, payload
     * length, payload) and sends it through {@link #send(byte[])}.
     *
     * @param obj the object to send
     * @return 0 on success, 1 when serialization or sending failed
     * @see com.aceway.communication.tcp.TcpClient#send(java.io.Serializable)
     */
    public int send(Serializable obj) {
        try {
            ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteOut);
            out.writeObject(obj);
            // Make sure everything buffered by the object stream reaches byteOut.
            out.flush();
            // Take a single copy of the serialized form.
            byte[] payload = byteOut.toByteArray();
            byte[] bytes = new byte[payload.length + OBJECT_HEADER_SIZE];
            TypeConvert.int2byte(TYPE_OBJECT, bytes, 0);
            TypeConvert.int2byte(payload.length, bytes, 4);
            System.arraycopy(payload, 0, bytes, OBJECT_HEADER_SIZE, payload.length);
            return send(bytes); // kongds modified on 2007-06-20
        } catch (Exception e) {
            log.error("發送失敗", e);
            return 1;
        }
    }
}
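// Keyboard shortcuts (residue from the web code viewer this file was copied from):
//   Copy code:          Ctrl + C
//   Search code:        Ctrl + F
//   Full-screen mode:   F11
//   Toggle theme:       Ctrl + Shift + D
//   Show shortcuts:     ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -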