// socketserver.cpp
// (Code-viewer residue: "Font size:" control label - not part of the original source.)
{
CIOBuffer *pBuffer = Allocate();
pBuffer->SetUserData(IO_Close);
pSocket->AddRef();
m_iocp.PostStatus((ULONG_PTR)pSocket, 0, pBuffer);
}
// Queues a read request for pSocket on the IO completion port so that the
// socket read itself is carried out by one of the IO worker threads.
//
// pSocket - the connection to read from; a reference is added for the
//           duration of the queued request.
// pBuffer - optional buffer to read into. When supplied, a reference is added
//           to it; when 0, a buffer is obtained from Allocate().
void CSocketServer::Read(
   Socket *pSocket,
   CIOBuffer *pBuffer)
{
   if (pBuffer)
   {
      // Caller supplied the buffer: take our own reference to it.
      pBuffer->AddRef();
   }
   else
   {
      // No buffer supplied: get one for this request.
      pBuffer = Allocate();
   }

   // Tag the buffer so the worker thread knows which operation to perform.
   pBuffer->SetUserData(IO_Read_Request);

   pSocket->AddRef();

   m_iocp.PostStatus(reinterpret_cast<ULONG_PTR>(pSocket), 0, pBuffer);
}
// Queues a write of raw bytes for pSocket on the IO completion port so that
// the socket write itself is carried out by one of the IO worker threads.
//
// pSocket      - the connection to write to; a reference is added for the
//                duration of the queued request.
// pData        - the bytes to send; handed to the buffer via AddData().
// dataLength   - number of bytes at pData.
// thenShutdown - posted as the completion packet's byte count; the worker
//                thread shuts down the send side after issuing the write
//                when this value is non-zero.
void CSocketServer::Write(
   Socket *pSocket,
   const char *pData,
   size_t dataLength,
   bool thenShutdown)
{
   CIOBuffer *pRequest = Allocate();

   pRequest->AddData(pData, dataLength);
   pRequest->SetUserData(IO_Write_Request);

   pSocket->AddRef();

   m_iocp.PostStatus(reinterpret_cast<ULONG_PTR>(pSocket), thenShutdown, pRequest);
}
// Queues a write of an existing buffer for pSocket on the IO completion port
// so that the socket write itself is carried out by one of the IO worker
// threads.
//
// pSocket      - the connection to write to; a reference is added for the
//                duration of the queued request.
// pBuffer      - the buffer holding the data to send; a reference is added
//                and its user data is set to IO_Write_Request.
// thenShutdown - posted as the completion packet's byte count; the worker
//                thread shuts down the send side after issuing the write
//                when this value is non-zero.
void CSocketServer::Write(
   Socket *pSocket,
   CIOBuffer *pBuffer,
   bool thenShutdown)
{
   pBuffer->AddRef();
   pBuffer->SetUserData(IO_Write_Request);

   pSocket->AddRef();

   m_iocp.PostStatus(reinterpret_cast<ULONG_PTR>(pSocket), thenShutdown, pBuffer);
}
// Server-level error hook: this implementation simply forwards the message
// to Output().
//
// message - text describing the error.
void CSocketServer::OnError(
   const _tstring &message)
{
   Output(message);
}
///////////////////////////////////////////////////////////////////////////////
// CSocketServer::Socket
///////////////////////////////////////////////////////////////////////////////
// Constructs a Socket wrapper owned by 'server' around an existing socket
// handle. The reference count starts at 1.
//
// server    - the server that this socket belongs to.
// theSocket - the socket handle to wrap; must not be INVALID_SOCKET.
//
// Throws CException if theSocket is INVALID_SOCKET.
CSocketServer::Socket::Socket(
   CSocketServer &server,
   SOCKET theSocket)
   :  m_server(server),
      m_socket(theSocket),
      m_ref(1)
{
   if (INVALID_SOCKET == m_socket)
   {
      throw CException(_T("CSocketServer::Socket::Socket()"), _T("Invalid socket"));
   }
}
// Destructor: performs no work of its own.
CSocketServer::Socket::~Socket()
{
   // Intentionally empty
}
// Associates this Socket object with a new socket handle and resets its
// user data to 0.
//
// theSocket - the socket handle to attach.
//
// Throws CException if a socket handle is already attached.
void CSocketServer::Socket::Attach(
   SOCKET theSocket)
{
   const bool alreadyAttached = (m_socket != INVALID_SOCKET);

   if (alreadyAttached)
   {
      throw CException(_T("CSocketServer::Socket::Attach()"), _T("Socket already attached"));
   }

   m_socket = theSocket;

   SetUserData(0);
}
// Adds one reference to this socket using an interlocked increment.
void CSocketServer::Socket::AddRef()
{
   static_cast<void>(::InterlockedIncrement(&m_ref));
}
// Drops one reference to this socket using an interlocked decrement. When
// the count reaches zero the socket is handed back to the owning server via
// ReleaseSocket().
void CSocketServer::Socket::Release()
{
   const long remaining = ::InterlockedDecrement(&m_ref);

   if (remaining == 0)
   {
      m_server.ReleaseSocket(this);
   }
}
// Calls ::shutdown() on the underlying socket handle, if one is attached.
//
// how - passed straight through to ::shutdown() (SD_RECEIVE, SD_SEND or
//       SD_BOTH); defaults to SD_BOTH.
//
// A failure of ::shutdown() is reported through the server's OnError() and
// is not propagated to the caller.
void CSocketServer::Socket::Shutdown(
   int how /* = SD_BOTH */)
{
   Output(_T("CSocketServer::Socket::Shutdown() ") + ToString(how));

   if (INVALID_SOCKET != m_socket)
   {
      if (0 != ::shutdown(m_socket, how))
      {
         // Report the failure under this method's real name (the message
         // previously named a non-existent "CSocketServer::Server" scope).
         m_server.OnError(_T("CSocketServer::Socket::Shutdown() - ") + GetLastErrorMessage(::WSAGetLastError()));
      }

      Output(_T("shutdown initiated"));
   }
}
// Closes the underlying socket handle if one is attached. The server's list
// manipulation critical section is held for the whole call. After closing,
// the handle is reset to INVALID_SOCKET, the server is notified via
// OnConnectionClosed() and one reference to this socket is released.
// Calling Close() when no handle is attached does nothing.
void CSocketServer::Socket::Close()
{
   CCriticalSection::Owner lock(m_server.m_listManipulationSection);

   if (INVALID_SOCKET == m_socket)
   {
      // Nothing attached, so nothing to close.
      return;
   }

   const int closeResult = ::closesocket(m_socket);

   if (0 != closeResult)
   {
      m_server.OnError(_T("CSocketServer::Socket::Close() - closesocket - ") + GetLastErrorMessage(::WSAGetLastError()));
   }

   m_socket = INVALID_SOCKET;

   m_server.OnConnectionClosed(this);

   Release();
}
void CSocketServer::Socket::AbortiveClose()
{
m_server.PostAbortiveClose(this);
}
// Requests a read on this connection by delegating to the owning server's
// Read().
//
// pBuffer - optional buffer to read into; defaults to 0, in which case the
//           server obtains a buffer itself.
void CSocketServer::Socket::Read(
   CIOBuffer *pBuffer /* = 0 */)
{
   CSocketServer &owner = m_server;

   owner.Read(this, pBuffer);
}
void CSocketServer::Socket::Write(
const char *pData,
size_t dataLength,
bool thenShutdown /* = false */)
{
m_server.Write(this, pData, dataLength, thenShutdown);
}
// Requests a write of an existing buffer on this connection by delegating to
// the owning server's Write().
//
// pBuffer      - the buffer holding the data to send.
// thenShutdown - forwarded to the server; defaults to false.
void CSocketServer::Socket::Write(
   CIOBuffer *pBuffer,
   bool thenShutdown /* = false */)
{
   CSocketServer &owner = m_server;

   owner.Write(this, pBuffer, thenShutdown);
}
///////////////////////////////////////////////////////////////////////////////
// CSocketServer::WorkerThread
///////////////////////////////////////////////////////////////////////////////
// Constructs a worker thread object bound to the given IO completion port.
//
// iocp - the completion port this worker services; stored by reference.
CSocketServer::WorkerThread::WorkerThread(
   CIOCompletionPort &iocp)
   :  m_iocp(iocp)
{
   // Nothing further to do - the member is bound in the initialiser list
}
// Worker thread entry point. Loops pulling completion packets from the IO
// completion port and dispatching on the operation code stored in the
// buffer's user data. The loop ends when a packet with a completion key of 0
// is received, or when an exception escapes the loop body (it is reported via
// OnError()). Always returns 0.
int CSocketServer::WorkerThread::Run()
{
try
{
//lint -e{716} while(1)
while (true)
{
// continually loop to service io completion packets
bool closeSocket = false;
DWORD dwIoSize = 0;
Socket *pSocket = 0;
CIOBuffer *pBuffer = 0;
try
{
// The completion key carries the Socket pointer and the OVERLAPPED pointer
// carries the CIOBuffer (see the PostStatus() calls in CSocketServer).
m_iocp.GetStatus((PDWORD_PTR)&pSocket, &dwIoSize, (OVERLAPPED**)&pBuffer);
}
catch (const CWin32Exception &e)
{
// Only these two errors are treated as "the client went away"; anything
// else is rethrown to the outer handlers, which ends the thread loop.
if (e.GetError() != ERROR_NETNAME_DELETED &&
e.GetError() != WSA_OPERATION_ABORTED)
{
throw;
}
Output(_T("IOCP error - client connection dropped"));
closeSocket = true;
}
if (!pSocket)
{
// A completion key of 0 is posted to the iocp to request us to shut down...
// NOTE(review): this is also reached if GetStatus() threw one of the
// tolerated errors without filling in pSocket - confirm that cannot happen.
break;
}
//lint -e{1933} call to unqualified virtual function
OnBeginProcessing();
if (pBuffer)
{
const IO_Operation operation = static_cast<IO_Operation>(pBuffer->GetUserData());
switch (operation)
{
case IO_Read_Request :
// Issue the actual socket read; references are not released here.
Read(pSocket, pBuffer);
break;
case IO_Read_Completed :
if (0 != dwIoSize)
{
// Record how many bytes arrived, then hand the data to the derived class.
pBuffer->Use(dwIoSize);
DEBUG_ONLY(Output(_T("RX: ") + ToString(pBuffer) + _T("\n") + DumpData(reinterpret_cast<const BYTE*>(pBuffer->GetWSABUF()->buf), dwIoSize, 40)));
//lint -e{1933} call to unqualified virtual function
ReadCompleted(pSocket, pBuffer);
}
else
{
// client connection dropped...
Output(_T("ReadCompleted - 0 bytes - client connection dropped"));
closeSocket = true;
}
// The read is finished: drop one reference on the socket and the buffer.
pSocket->Release();
pBuffer->Release();
break;
case IO_Write_Request :
// Issue the actual socket write; references are not released here.
Write(pSocket, pBuffer);
// For write requests the byte count of the packet carries the
// "thenShutdown" flag posted by CSocketServer::Write().
if (dwIoSize != 0)
{
// final write, now shutdown send side of connection
pSocket->Shutdown(SD_SEND);
}
break;
case IO_Write_Completed :
pBuffer->Use(dwIoSize);
DEBUG_ONLY(Output(_T("TX: ") + ToString(pBuffer) + _T("\n") + DumpData(reinterpret_cast<const BYTE*>(pBuffer->GetWSABUF()->buf), dwIoSize, 40)));
//lint -e{1933} call to unqualified virtual function
WriteCompleted(pSocket, pBuffer);
// The write is finished: drop one reference on the socket and the buffer.
pSocket->Release();
pBuffer->Release();
break;
case IO_Close :
AbortiveClose(pSocket);
pSocket->Release();
pBuffer->Release();
break;
default :
// NOTE(review): neither pSocket nor pBuffer is released on this path -
// confirm whether that is intentional.
//lint -e{1933} call to unqualified virtual function
OnError(_T("CSocketServer::WorkerThread::Run() - Unexpected operation"));
break;
}
}
else
{
// NOTE(review): pSocket is not released on this path - confirm whether a
// reference can be outstanding here.
//lint -e{1933} call to unqualified virtual function
OnError(_T("CSocketServer::WorkerThread::Run() - Unexpected - pBuffer is 0"));
}
if (closeSocket)
{
// NOTE(review): on the zero-byte read path this runs after the
// pSocket->Release() above - presumably another reference keeps the
// socket alive until Close(); verify.
pSocket->Close();
}
//lint -e{1933} call to unqualified virtual function
OnEndProcessing();
}
}
catch(const CException &e)
{
//lint -e{1933} call to unqualified virtual function
OnError(_T("CSocketServer::WorkerThread::Run() - Exception: ") + e.GetWhere() + _T(" - ") + e.GetMessage());
}
catch(...)
{
//lint -e{1933} call to unqualified virtual function
OnError(_T("CSocketServer::WorkerThread::Run() - Unexpected exception"));
}
return 0;
}
void CSocketServer::WorkerThread::InitiateShutdown()
{
m_iocp.PostStatus(0);
}
// Posts a shutdown request and then blocks in Wait().
void CSocketServer::WorkerThread::WaitForShutdownToComplete()
{
   // A shutdown may not have been requested yet, so request one now...
   InitiateShutdown();

   // ...and then wait.
   Wait();
}
// Issues an overlapped WSARecv() on the socket using the supplied buffer.
// The buffer is tagged IO_Read_Completed before the call so that the
// eventual completion packet is dispatched as a completed read.
//
// pSocket - the connection to read from.
// pBuffer - the buffer to read into; also passed as the OVERLAPPED structure.
//
// If WSARecv() fails with anything other than ERROR_IO_PENDING the error is
// logged, the socket is closed for connection aborted/reset/disconnect
// errors, and one reference on both the socket and the buffer is released.
void CSocketServer::WorkerThread::Read(
   Socket *pSocket,
   CIOBuffer *pBuffer) const
{
   pBuffer->SetUserData(IO_Read_Completed);
   pBuffer->SetupRead();

   DWORD bytesReceived = 0;
   DWORD recvFlags = 0;

   const int recvResult = ::WSARecv(
      pSocket->m_socket,
      pBuffer->GetWSABUF(),
      1,
      &bytesReceived,
      &recvFlags,
      pBuffer,
      NULL);

   if (SOCKET_ERROR != recvResult)
   {
      // The call was accepted; the outcome arrives via the completion port.
      return;
   }

   const DWORD lastError = ::WSAGetLastError();

   if (ERROR_IO_PENDING == lastError)
   {
      // The read is in progress; the outcome arrives via the completion port.
      return;
   }

   Output(_T("CSocketServer::Read() - WSARecv: ") + GetLastErrorMessage(lastError));

   switch (lastError)
   {
      case WSAECONNABORTED :
      case WSAECONNRESET :
      case WSAEDISCON :
         pSocket->Close();
      break;

      default :
      break;
   }

   // No completion will be delivered for this request, so give back the
   // references that were taken for it.
   pSocket->Release();
   pBuffer->Release();
}
// Issues an overlapped WSASend() on the socket using the supplied buffer.
// The buffer is tagged IO_Write_Completed before the call so that the
// eventual completion packet is dispatched as a completed write.
//
// pSocket - the connection to write to.
// pBuffer - the buffer holding the data; also passed as the OVERLAPPED
//           structure.
//
// If WSASend() fails with anything other than ERROR_IO_PENDING the error is
// logged, the socket is closed for connection aborted/reset/disconnect
// errors, and one reference on both the socket and the buffer is released.
void CSocketServer::WorkerThread::Write(
   Socket *pSocket,
   CIOBuffer *pBuffer) const
{
   pBuffer->SetUserData(IO_Write_Completed);
   pBuffer->SetupWrite();

   DWORD sendFlags = 0;
   DWORD bytesSent = 0;

   const int sendResult = ::WSASend(
      pSocket->m_socket,
      pBuffer->GetWSABUF(),
      1,
      &bytesSent,
      sendFlags,
      pBuffer,
      NULL);

   if (SOCKET_ERROR != sendResult)
   {
      // The call was accepted; the outcome arrives via the completion port.
      return;
   }

   const DWORD lastError = ::WSAGetLastError();

   if (ERROR_IO_PENDING == lastError)
   {
      // The write is in progress; the outcome arrives via the completion port.
      return;
   }

   Output(_T("CSocketServer::Write() - WSASend: ") + GetLastErrorMessage(lastError));

   switch (lastError)
   {
      case WSAECONNABORTED :
      case WSAECONNRESET :
      case WSAEDISCON :
         pSocket->Close();
      break;

      default :
      break;
   }

   // No completion will be delivered for this request, so give back the
   // references that were taken for it.
   pSocket->Release();
   pBuffer->Release();
}
// Called from Run() when a write completes. This implementation only checks
// that the number of bytes recorded as used in the buffer matches the length
// of the buffer's WSABUF, and reports a mismatch through OnError().
//
// pSocket - unused here.
// pBuffer - the buffer that was written.
void CSocketServer::WorkerThread::WriteCompleted(
   Socket * /*pSocket*/,
   CIOBuffer *pBuffer)
{
   const bool allDataWritten = (pBuffer->GetUsed() == pBuffer->GetWSABUF()->len);

   if (!allDataWritten)
   {
      //lint -e{1933} call to unqualified virtual function
      OnError(_T("CSocketServer::WorkerThread::WriteCompleted - Socket write where not all data was written"));
   }
   //lint -e{818} pointer pBuffer could be declared const (but not in derived classes...)
}
// Forces an abortive close of the connection: enables SO_LINGER with a
// timeout of zero on the socket handle and then closes the socket. A failure
// to set the option is reported through OnError() and the socket is closed
// regardless.
//
// pSocket - the connection to close.
void CSocketServer::WorkerThread::AbortiveClose(
   Socket *pSocket)
{
   LINGER hardClose;

   hardClose.l_onoff = 1;
   hardClose.l_linger = 0;

   const int optionResult = ::setsockopt(
      pSocket->m_socket,
      SOL_SOCKET,
      SO_LINGER,
      reinterpret_cast<char *>(&hardClose),
      sizeof(hardClose));

   if (SOCKET_ERROR == optionResult)
   {
      //lint -e{1933} call to unqualified virtual function
      OnError(_T("CSocketServer::Socket::AbortiveClose() - setsockopt(SO_LINGER) - ") + GetLastErrorMessage(::WSAGetLastError()));
   }

   pSocket->Close();
}
// Worker-thread error hook: this implementation simply forwards the message
// to Output().
//
// message - text describing the error.
void CSocketServer::WorkerThread::OnError(
   const _tstring &message)
{
   Output(message);
}
///////////////////////////////////////////////////////////////////////////////
// Static helper methods
///////////////////////////////////////////////////////////////////////////////
// Resolves the number of worker threads to use.
//
// numThreads - the requested thread count; 0 means "choose for me".
//
// Returns numThreads unchanged when it is non-zero, otherwise twice the
// processor count reported by CSystemInfo.
static size_t CalculateNumberOfThreads(size_t numThreads)
{
   if (numThreads != 0)
   {
      // An explicit count was requested - honour it.
      return numThreads;
   }

   CSystemInfo systemInfo;

   numThreads = systemInfo.dwNumberOfProcessors * 2;

   return numThreads;
}
///////////////////////////////////////////////////////////////////////////////
// Namespace: JetByteTools::Win32
///////////////////////////////////////////////////////////////////////////////
} // End of namespace Win32
} // End of namespace JetByteTools
///////////////////////////////////////////////////////////////////////////////
// Lint options
//
//lint -restore
//
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
// End of file...
///////////////////////////////////////////////////////////////////////////////
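// (Code-viewer residue - keyboard shortcut help, not part of the original source.)
// Copy code:          Ctrl + C
// Search code:        Ctrl + F
// Full-screen mode
// Toggle theme:       Ctrl + Shift + D
// Show shortcuts:     ?
// Increase font size: Ctrl + =
// Decrease font size: Ctrl + -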