// iocp.cpp
// (The scraped code-viewer page showed a "Font size:" control here; it is not part of the source.)
#include <process.h>
#include <set>
#include <string>
#include "iocp.h"
using std::set;
using std::pair;
using std::string;
#pragma comment(lib,"wsock32")
#pragma comment(lib,"Ws2_32")
/***************************************
NetIOCPBase
***************************************/
// Operation codes stored in PerIoData::op_type and dispatched by the worker threads.
enum NET_OPTYPE
{
    NOT_SOCKNEW   = 0, // a new socket
    NOT_SOCKCLOSE = 1, // close a socket
    NOT_DATASEND  = 2, // send data
    NOT_DATARECV  = 3, // receive data
    NOT_SHUTDOWN  = 4, // shut the server down
};
// Definition of the static counter NetIOCPBase::ref, starting at zero. The
// NetIOCPBase constructor increments it and the destructor decrements it; they
// use it to decide when WSAStartup / WSACleanup must be called.
int NetIOCPBase::ref=0;
// Shuts down the sending side of socket `s` and then closes the handle.
// The return values of both calls are ignored.
// A loop that drained incoming data with recv() until it returned 0 or
// SOCKET_ERROR used to sit between the two calls; it is disabled.
static void stopTCP(SOCKET s)
{
    ::shutdown(s, SD_SEND);
    ::closesocket(s);
}
// Queues one outgoing block for socket `s` and starts the transfer when the
// socket's channel is idle.
//
//   s      - destination socket; it must have a channel in mmsend_pool.
//   iodata - block to send; iodata->len must be in (0, NET_BLOCK_SIZE].
//            The block is taken over by this function: it is deleted at once
//            when `s` has no channel, otherwise it is queued / handed to aio_send.
//
// The pool mutex is held through a scoped Locker (as in the other functions of
// this file) so that it is released on every exit path, including an exception
// thrown by push_back.
void NetIOCPBase::senddata(SOCKET s,PerIoData *iodata)
{
    assert(iodata->len<=NET_BLOCK_SIZE&&iodata->len>0);
    iodata->sock=s;
    PerIoData *tosend=NULL;
    {
        Locker lock(&mmsend_pool_mutex);
        map<SOCKET,Channel *>::iterator iter=mmsend_pool.find(s);
        if(mmsend_pool.end()==iter)
        {
            // No channel for this socket: drop the block.
            delete iodata;
            return;
        }
        Channel *chan=iter->second;
        chan->sendqueue.push_back(iodata);
        if(chan->busy)
        {
            // A send is already in flight; the worker picks the next queued
            // block when that send completes.
            return;
        }
        // Channel is idle: take the head of the queue and mark the channel busy.
        tosend=chan->sendqueue.front();
        chan->sendqueue.pop_front();
        chan->busy=true;
    }
    // Issue the overlapped send outside the lock.
    aio_send(tosend->sock,tosend);
}
// Closes every registered client connection, then stops and joins all worker
// threads.
//
// Steps:
//   1. Snapshot the sockets of all channels under the pool lock. The channels
//      are deliberately left registered: close() only posts a NOT_SOCKCLOSE
//      request when delete_sendqueue() finds (and frees) the channel, so
//      erasing the channels here first would turn every close() call into a
//      no-op and leave the sockets open.
//   2. Call close() for each socket (outside the lock; close() locks itself).
//   3. Post one NOT_SHUTDOWN packet per worker thread.
//   4. Wait for each worker thread, close its handle, and forget the handles so
//      a later start()/stop() cycle does not wait on stale handles.
void NetIOCPBase::wkshutdown()
{
    set<SOCKET> clients;
    {
        map<SOCKET,Channel *>::iterator iter;
        Locker lock(&mmsend_pool_mutex);
        for_each_iter(mmsend_pool,iter)
        {
            clients.insert(iter->second->sock);
        }
    }
    set<SOCKET>::iterator iter;
    for_each_iter(clients,iter)
    {
        // Frees the channel with its queued blocks and asks a worker to close the socket.
        close(*iter);
    }
    for(int i=0;i<(int)mwthread.size();++i)
    {
        PerIoData *iodata=new PerIoData;
        memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
        iodata->sock=INVALID_SOCKET;
        iodata->op_type=NOT_SHUTDOWN;
        // Byte count 1: the worker treats a zero byte count as a closed connection.
        BOOL ok=PostQueuedCompletionStatus(miocp,1,iodata->sock,(LPOVERLAPPED)iodata);
        assert(ok);
        (void)ok;
    }
    for(int i=0;i<(int)mwthread.size();++i)
    {
        WaitForSingleObject(mwthread[i], INFINITE);
        CloseHandle(mwthread[i]);
    }
    // The handles are closed now; drop them.
    mwthread.clear();
}
void NetIOCPBase::runloop()
{
recv_allnetevents();
NetEvent ne;
while(peek_netevent(&ne))
{
switch(ne.event)
{
case NE_SOCKNEW:
net_open_handler(ne.iodata->sock);
aio_recv(ne.iodata->sock,ne.iodata);
break;
case NE_SOCKCLOSED:
net_closed_handler(ne.iodata->sock);
delete ne.iodata;
break;
case NE_DATARECVED:
net_data_handler(ne.iodata);
delete ne.iodata;
break;
default:
assert(0);
delete ne.iodata;
break;
}
}
}
// Removes the channel registered for socket `s` from mmsend_pool and frees it
// together with every block still waiting in its send queue.
// Returns true when a channel existed and was removed, false otherwise.
// Only the map lookup/erase runs under the pool lock; the frees happen after
// the lock is released.
bool NetIOCPBase::delete_sendqueue(SOCKET s)
{
    Channel *removed=NULL;
    {
        Locker lock(&mmsend_pool_mutex);
        map<SOCKET,Channel *>::iterator pos=mmsend_pool.find(s);
        if(pos!=mmsend_pool.end())
        {
            removed=pos->second;
            mmsend_pool.erase(pos);
        }
    }
    if(removed==NULL)
    {
        return false;
    }
    list<PerIoData *>::iterator pending;
    for_each_iter(removed->sendqueue,pending)
    {
        delete *pending;
    }
    delete removed;
    return true;
}
// Registers a fresh, idle channel for socket `s` in mmsend_pool.
// Returns true when the channel was inserted; returns false (and frees the new
// channel) when the pool already holds an entry for `s`.
bool NetIOCPBase::create_sendqueue(SOCKET s)
{
    Channel *fresh=new Channel;
    fresh->sock=s;
    fresh->busy=false;
    bool inserted=false;
    {
        Locker lock(&mmsend_pool_mutex);
        inserted=mmsend_pool.insert(make_pair(fresh->sock,fresh)).second;
    }
    if(inserted)
    {
        return true;
    }
    delete fresh;
    return false;
}
void NetIOCPBase::worker_func()
{
SOCKET sock;
PerIoData *iodata;
DWORD bytes;
bool running=true;
while (running)
{
bytes=0;
BOOL ret = GetQueuedCompletionStatus(miocp, &bytes, (LPDWORD) & sock,
(LPOVERLAPPED *) & iodata, INFINITE);
if((!ret&&iodata!=NULL) || bytes==0)
{
assert(iodata->sock==sock);
if(delete_sendqueue(iodata->sock))
{
stopTCP(iodata->sock);
NetEvent ne;
ne.event=NE_SOCKCLOSED;
ne.iodata=iodata;
post_netevent(ne);
}
else
{
delete iodata;
}
continue;
}
assert(iodata!=NULL);
assert(iodata->sock==sock);
switch(iodata->op_type)
{
case NOT_SOCKNEW:
{
bool ok=create_sendqueue(iodata->sock);
assert(ok);
NetEvent ne;
ne.event=NE_SOCKNEW;
ne.iodata=iodata;
post_netevent(ne);
}
break;
case NOT_SOCKCLOSE:
{
stopTCP(iodata->sock);
NetEvent ne;
ne.event=NE_SOCKCLOSED;
ne.iodata=iodata;
post_netevent(ne);
}
break;
case NOT_DATASEND:
{
if(bytes<iodata->databuf.len)
{
aio_resend(iodata,bytes);
break;
}
SOCKET s=iodata->sock;
delete iodata;
map<SOCKET,Channel *>::iterator iter;
{
Locker lock(&mmsend_pool_mutex);
iter=mmsend_pool.find(s);
if(iter==mmsend_pool.end())
break;
Channel *chan=iter->second;
if(chan->sendqueue.empty())
{
chan->busy=false;
break;
}
iodata=chan->sendqueue.front();
chan->sendqueue.pop_front();
}
aio_send(iodata->sock,iodata);
}
break;
case NOT_DATARECV:
{
NetEvent ne;
ne.event=NE_DATARECVED;
ne.iodata=iodata;
SOCKET s=iodata->sock;
iodata->len=bytes;
post_netevent(ne);
iodata=new PerIoData;
aio_recv(s,iodata);
}
break;
case NOT_SHUTDOWN:
{
running=false;
delete iodata;
}
break;
default:
assert(0);
break;
}
}
}
// Starts an overlapped send of iodata->data (iodata->len bytes) on socket `s`.
// The PerIoData is tagged NOT_DATASEND so the worker can recognise the completion.
// If WSASend fails with anything other than WSA_IO_PENDING, a zero-byte
// completion packet is posted for the same PerIoData; the worker handles a
// zero byte count as a closed connection.
void NetIOCPBase::aio_send(SOCKET s,PerIoData *iodata)
{
    memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
    iodata->op_type=NOT_DATASEND;
    iodata->sock=s;
    iodata->databuf.len=iodata->len;
    iodata->databuf.buf=iodata->data;

    DWORD sent = 0;
    const DWORD send_flags = 0;
    const int rc = WSASend(iodata->sock, &iodata->databuf, 1, &sent, send_flags,
        (LPWSAOVERLAPPED)iodata, NULL);
    if (rc != SOCKET_ERROR)
    {
        return;
    }
    if (WSAGetLastError() == WSA_IO_PENDING)
    {
        return;
    }
    // The send could not be started: notify the worker so the connection is closed.
    BOOL ok=PostQueuedCompletionStatus(miocp, 0, iodata->sock, (LPOVERLAPPED)iodata);
    assert(ok);
}
// Starts an overlapped receive on socket `s` into iodata->data (whole buffer).
// The PerIoData is tagged NOT_DATARECV so the worker can recognise the completion.
// If WSARecv fails with anything other than WSA_IO_PENDING, a zero-byte
// completion packet is posted for the same PerIoData; the worker handles a
// zero byte count as a closed connection.
void NetIOCPBase::aio_recv(SOCKET s,PerIoData *iodata)
{
    memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
    iodata->op_type=NOT_DATARECV;
    iodata->sock=s;
    iodata->databuf.len=sizeof(iodata->data);
    iodata->databuf.buf=iodata->data;

    DWORD received = 0;
    DWORD recv_flags = 0;
    const int rc = WSARecv(iodata->sock, &iodata->databuf, 1, &received, &recv_flags,
        (LPWSAOVERLAPPED)iodata, NULL);
    if (rc != SOCKET_ERROR)
    {
        return;
    }
    if (WSAGetLastError() == WSA_IO_PENDING)
    {
        return;
    }
    // The receive could not be started: notify the worker so the connection is closed.
    BOOL ok=PostQueuedCompletionStatus(miocp, 0, iodata->sock, (LPOVERLAPPED)iodata);
    assert(ok);
}
// Continues a partially completed send: skips the `lastlen` bytes that the
// previous WSASend already transferred and sends the rest of the same block.
//
//   iodata  - block whose previous send completed with fewer bytes than
//             iodata->databuf.len; its databuf is advanced in place.
//   lastlen - number of bytes the previous send transferred; must be smaller
//             than the remaining length.
//
// The OVERLAPPED is zeroed before it is reused, as aio_send and aio_recv do
// before every request. If WSASend fails with anything other than
// WSA_IO_PENDING, a zero-byte completion packet is posted so the worker closes
// the connection.
void NetIOCPBase::aio_resend(PerIoData *iodata,int lastlen)
{
    assert(lastlen<(int)iodata->databuf.len);
    // The previous operation on this OVERLAPPED has completed; reset it.
    memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
    iodata->databuf.buf+=lastlen;
    iodata->databuf.len-=lastlen;
    DWORD bytes = 0;
    DWORD flags = 0;
    int ret = WSASend(iodata->sock, &iodata->databuf, 1, &bytes, flags,
        (LPWSAOVERLAPPED)iodata, NULL);
    // Send failed: notify the worker so the connection is closed.
    if (ret == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        BOOL ok=PostQueuedCompletionStatus(miocp, 0, iodata->sock, (LPOVERLAPPED)iodata);
        assert(ok);
        (void)ok;
    }
}
// Requests that socket `s` be closed.
// The socket's channel (and its queued blocks) is removed first; only when a
// channel actually existed is a NOT_SOCKCLOSE packet posted to the completion
// port, where a worker thread performs the close. Does nothing when `s` has no
// channel.
void NetIOCPBase::close(SOCKET s)
{
    if(!delete_sendqueue(s))
    {
        return;
    }
    PerIoData *request=new PerIoData;
    memset(&request->overlapped,0,sizeof(OVERLAPPED));
    request->op_type=NOT_SOCKCLOSE;
    request->sock=s;
    // Byte count 1: the worker treats a zero byte count as a closed connection.
    BOOL ok=PostQueuedCompletionStatus(miocp, 1, request->sock, (LPOVERLAPPED)request);
    assert(ok);
}
// Initialises Winsock (only when no other NetIOCPBase currently exists, as
// tracked by `ref`) and creates the I/O completion port used by this object.
//
// Throws std::string("WSAStartup fail!") or
// std::string("CreateIoCompletionPort fail!") on failure. Because the
// destructor does not run for an object whose constructor threw, a failed port
// creation undoes the WSAStartup call made here before throwing.
NetIOCPBase::NetIOCPBase()
{
    const bool first_instance=(ref==0);
    if(first_instance)
    {
        WSADATA data;
        // 0x0202 requests Winsock version 2.2.
        if (WSAStartup(0x0202, &data) != 0)
        {
            throw string("WSAStartup fail!");
        }
    }
    started=false;
    miocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (NULL == miocp)
    {
        if(first_instance)
        {
            // Balance the WSAStartup above; `ref` was not incremented yet.
            WSACleanup();
        }
        throw string("CreateIoCompletionPort fail!");
    }
    ++ref;
}
// Stops the object if it is still running, closes the completion port, and
// calls WSACleanup when this was the last live NetIOCPBase (per `ref`).
// NOTE(review): stop() calls stop_run(); if stop_run is virtual, a call made
// from this destructor will not reach a derived-class override - confirm
// against the class declaration in iocp.h.
NetIOCPBase::~NetIOCPBase()
{
    if(started)
    {
        stop();
        started=false;
    }
    if(NULL!=miocp)
    {
        CloseHandle(miocp);
        miocp=NULL;
    }
    // Drop this instance from the count; the last one out cleans up Winsock.
    if(--ref==0)
    {
        WSACleanup();
    }
}
void NetIOCPBase::start()
{
if(!started)
{
SYSTEM_INFO sys_info;
GetSystemInfo(&sys_info);
for (int i = 0;i < (int)sys_info.dwNumberOfProcessors*2;++i)
{
HANDLE thread = (HANDLE)_beginthreadex(NULL, 0, completion_port_worker_thread, this, 0, NULL);
if (NULL == thread)
throw string("_beginthreadex,completion_port_worker_thread,fail!");
mwthread.push_back(thread);
}
start_run();
started=true;
}
}
// Stops a started object: calls stop_run() first, then wkshutdown() to shut
// down the worker threads, and clears the started flag. No-op when not started.
void NetIOCPBase::stop()
{
    if(!started)
    {
        return;
    }
    stop_run();
    wkshutdown();
    started=false;
}
// Thread entry point passed to _beginthreadex by start().
// `cookie` is the NetIOCPBase instance whose worker_func() this thread runs.
unsigned int __stdcall NetIOCPBase::completion_port_worker_thread(void *cookie)
{
    NetIOCPBase *self=static_cast<NetIOCPBase *>(cookie);
    self->worker_func();
    _endthreadex(0);
    return 0;
}
/***************************************
NetIOCPServer
***************************************/
void NetIOCPServer::accept_func()
{
while(1)
{
SOCKET client = accept(mserver, NULL, NULL);
if (client == INVALID_SOCKET)
{
//如果服務器套接字已經關閉,退出線程
int ret = WSAGetLastError();
if (ret == WSAEINTR ||
ret == WSAENOTSOCK||
ret == WSANOTINITIALISED)
{
break;
}
assert(0);
continue;
}
HANDLE port = CreateIoCompletionPort((HANDLE)client, miocp, (DWORD)client, 0);
//如果不能將套接字關聯到完成端口
if (port == NULL)
{
stopTCP(client);
continue;
}
PerIoData *iodata=new PerIoData;
memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
iodata->sock=client;
iodata->op_type=NOT_SOCKNEW;
BOOL ok=PostQueuedCompletionStatus(miocp,1,iodata->sock,(LPOVERLAPPED)iodata);
assert(ok);
}
}
// Creates the listening socket for `port`: an overlapped TCP socket bound to
// INADDR_ANY and put into the listening state. The accept thread is not
// started here (mathread stays NULL until start_run()).
//
// Throws std::string("WSASocket fail!"), std::string("bind fail!") or
// std::string("listen fail!"). The destructor does not run when this
// constructor throws, so the socket is closed here before bind/listen failures
// are reported.
NetIOCPServer::NetIOCPServer(unsigned short port)
{
    mport=port;
    mathread=NULL;
    mserver= WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (mserver == INVALID_SOCKET)
        throw string("WSASocket fail!");
    sockaddr_in addr;
    // Zero the whole structure so the padding member (sin_zero) is defined.
    memset(&addr,0,sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(mport);
    if (bind(mserver, (sockaddr *)&addr, sizeof(addr)) != 0)
    {
        closesocket(mserver);
        mserver=INVALID_SOCKET;
        throw string("bind fail!");
    }
    if (listen(mserver, SOMAXCONN) != 0)
    {
        closesocket(mserver);
        mserver=INVALID_SOCKET;
        throw string("listen fail!");
    }
}
// Closes the listening socket and joins the accept thread (see stop_run()).
NetIOCPServer::~NetIOCPServer()
{
    this->stop_run();
}
void NetIOCPServer::start_run()
{
mathread=(HANDLE)_beginthreadex(NULL, 0, completion_port_accept_thread, this, 0, NULL);
if(mathread==NULL)
throw string("_beginthreadex,completion_port_accept_thread,fail!");
}
// Closes the listening socket (which makes the accept loop exit) and, when an
// accept thread exists, waits for it and releases its handle.
// Does nothing when the listening socket is already closed.
void NetIOCPServer::stop_run()
{
    if(INVALID_SOCKET==mserver)
    {
        return;
    }
    closesocket(mserver);
    mserver=INVALID_SOCKET;
    if(NULL==mathread)
    {
        return;
    }
    WaitForSingleObject(mathread,INFINITE);
    CloseHandle(mathread);
    mathread=NULL;
}
// Thread entry point passed to _beginthreadex by start_run().
// `cookie` is the NetIOCPServer instance whose accept_func() this thread runs.
unsigned int __stdcall NetIOCPServer::completion_port_accept_thread(void *cookie)
{
    NetIOCPServer *self=static_cast<NetIOCPServer *>(cookie);
    self->accept_func();
    _endthreadex(0);
    return 0;
}
/***************************************
NetIOCPClient
***************************************/
//
// 注意: connect 成功后
// 參數SOCKET返回連接的socket,此時底層尚未完成對該socket的資源分配
// iocp完成資源分配后會在net_open_handler里面將該socket返回
//
bool NetIOCPClient::connect(const char *dotip,unsigned short port,SOCKET *ss)
{
SOCKET s = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (s == INVALID_SOCKET)
{
return false;
}
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(dotip);
addr.sin_port = htons(port);
if (::connect(s, (sockaddr *)&addr, sizeof(addr)) != 0)
{
closesocket(s);
return false;
}
HANDLE hdl = CreateIoCompletionPort((HANDLE)s, miocp, (DWORD)s, 0);
if (hdl == NULL)
{
stopTCP(s);
return false;
}
PerIoData *iodata=new PerIoData;
memset(&iodata->overlapped,0,sizeof(OVERLAPPED));
iodata->sock=s;
iodata->op_type=NOT_SOCKNEW;
BOOL ok=PostQueuedCompletionStatus(miocp, 1, iodata->sock, (LPOVERLAPPED)iodata);
assert(ok);
if(ss)
*ss=s;
return true;
}
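/*
 * (Residue of the web code viewer this file was copied from; not part of iocp.cpp.)
 * Keyboard shortcuts
 *   Copy code:           Ctrl + C
 *   Search code:         Ctrl + F
 *   Full-screen mode:    F11
 *   Switch theme:        Ctrl + Shift + D
 *   Show shortcuts:      ?
 *   Increase font size:  Ctrl + =
 *   Decrease font size:  Ctrl + -
 */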