?? ctpnet.cpp
字號:
// CCTPReceivedData - Buffer for storing received data and information about it
// CCTPErrorInfo - Buffer for storing error information
// CCTPNet - Class, which implements CTP
// Implementation file
//
// (c) Lev Naumov, CAMEL Laboratory
// E-mail: camellab@mail.ru
// For more information see http://camel.ifmo.ru or
// http://www.codeproject.com/internet/ctp.asp
/////////////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "NetBasic.h"
#include "DebugLog.h"
#include "CTPNet.h"
// Macrodefinitions for handy log building
// Put message mess to output stream log, protecter by critical section cs
#define LOG(log,cs,mess) \
if (log) { \
char ts[22]; \
CSingleLock lock(&cs,TRUE); \
(((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess<<"\n").flush();\
}
// The same, but string representation of ip-address ip can be referenced as
// "addr"
#define LOGA(log,cs,mess,ip) \
if (log) { \
char ts[22],saddr[16]; \
ip.GetString(saddr); \
CSingleLock lock(&cs,TRUE); \
(((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess<<"\n").flush();\
}
// The same, but inserts description of header head between mess1 and mess2.
// Moreover, string representation of ip-address ip can be referenced as "addr"
#define LOGHA(log,cs,mess1,mess2,head,ip) \
if (log) { \
char ts[22],saddr[16]; \
ip.GetString(saddr); \
CSingleLock lock(&cs,TRUE); \
((ostream&)*log)<<CCTPErrorInfo::GetTimeStamp(ts)<<" "<<mess1;\
head->ToStream(*log); \
(((ostream&)*(log))<<mess2<<"\n").flush();\
}
CCTPReceivedData::CCTPReceivedData(unsigned __int16 command, unsigned __int64 size, unsigned long from, char* buf)
{
this->command=command;
this->size=size;
this->from=IPAddr(from);
pBuf=new char[(unsigned int)size];
if (buf) memcpy(pBuf,buf,(unsigned int)size);
}
CCTPErrorInfo::CCTPErrorInfo(unsigned char type,unsigned __int16 command,int code,IPAddr addr)
{
this->type=type;
this->command=command;
this->code=code;
this->addr=addr;
GetTimeStamp(timestamp);
}
char* CCTPErrorInfo::GetTimeStamp(char* s)
{
CHAR date[30]="";
CHAR time[30]="";
_timeb timebuffer;
// Get date/time
_strdate(date);
_ftime(&timebuffer);
_strtime(time);
// Create string
sprintf(s,"%8s %8s.%03d",date,time,timebuffer.millitm);
return s;
}
void CCTPNet::Header::ToStream(ostream& out)
{
if (command&CCTPNet::m_iConfirm) {
out<<"{confirm: "<<(command^CCTPNet::m_iConfirm);
} else {
out<<"{command: "<<command;
}
out<<", id: "<<(unsigned int)id<<", size: "<<(unsigned int)size;
if (amount>1) {
out<<", num: "<<(unsigned int)number<<"("<<(unsigned int)amount<<")";
}
if (options) {
out<<", opt: "<<options;
}
out<<"}";
}
bool CCTPNet::SntCommandInfo::Confirm(unsigned int i)
{
CI[i].bConfirmed=true;
for (unsigned int j=0; j<uCount; j++) {
if (!CI[j].bConfirmed) return false;
}
return true;
}
bool CCTPNet::LargeCommandInfo::GotPart(unsigned int i)
{
received[i]=true;
for (unsigned int j=0; j<uCount; j++) {
if (!received[j]) return false;
}
return true;
}
const unsigned __int8 CCTPNet::OptPing=CCTPNet::Options::DelAfterError|CCTPNet::Options::NoResend|CCTPNet::Options::UniqueCommand;
const unsigned __int16 CCTPNet::m_iConfirm=0x8000;
CCTPNet::CCTPNet(NetReceiver* receiver,unsigned short port,unsigned short servers,Times* times,ostream* log,unsigned __int16 packetdatasize,unsigned short maxdeliverers)
{
// Tuning
m_DefReceiver=receiver;
m_uPort=port;
if (times) m_Times=*times;
m_uPacketDataSize=packetdatasize;
m_pLog=log;
// Initialize random generator
srand((unsigned)time(NULL));
// Initialization
m_bSuspended=true;
m_SntCommands.clear();
m_Sessions.clear();
m_LargeCommands.clear();
m_Receivers.clear();
m_pBuffer=new char[m_uPacketDataSize+GetHeaderSize()];
m_Deliveries.clear();
m_pDeliverTrds.clear();
m_uMaxDeliverers=maxdeliverers;
m_uBusy=0;
CreateSockets();
m_bKill=false;
// Start threads and store handles
for (unsigned int i=0;i<servers;i++) {
m_pServerTrds.push_back(AfxBeginThread(CTPServerFunction,this));
}
m_pDelManTrd=AfxBeginThread(CTPDelManFunction,this);
LOG(m_pLog,m_csLog,"CTP started on port "<<port<<" with "<<servers<<" servers\n");
}
CCTPNet::~CCTPNet()
{
LOG(m_pLog,m_csLog,"CTP is shuting down");
// Terminate threads
m_bKill=true;
DWORD time=GetTickCount();
while ((!m_pDeliverTrds.empty()) || !m_pServerTrds.empty() || m_pDelManTrd) {
Sleep(m_Times.uSleepOnDestroy);
// Kill servers, deliverers and delivery manager if they are busy too long
if (GetTickCount()-time>m_Times.uPeriodDestroy) {
CSingleLock locks(&m_csServerTrds);
LOCK(locks);
for (vector<CWinThread*>::iterator it=m_pServerTrds.begin(); it!=m_pServerTrds.end(); it++) {
LOG(m_pLog,m_csLog,"Server thread with handle "<<(*it)->m_hThread<<" was stopped forcedly");
TerminateThread((*it)->m_hThread,0);
}
m_pServerTrds.clear();
UNLOCK(locks);
CSingleLock lockd(&m_csDeliverTrds);
LOCK(lockd);
for (it=m_pDeliverTrds.begin(); it!=m_pDeliverTrds.end(); it++) {
LOG(m_pLog,m_csLog,"Deliverer thread with handle "<<(*it)->m_hThread<<" was stopped forcedly");
TerminateThread((*it)->m_hThread,0);
}
m_pDeliverTrds.clear();
UNLOCK(lockd);
if (m_pDelManTrd) {
LOG(m_pLog,m_csLog,"Delivery manager thread was stopped forcedly");
TerminateThread(m_pDelManTrd->m_hThread,0);
}
}
}
// Free resources
closesocket(m_SendSocket);
closesocket(m_RecvSocket);
FreeSntCommands();
FreeSessions();
FreeLargeCommands();
FreeDeliveries();
m_Receivers.clear();
delete[] m_pBuffer;
LOG(m_pLog,m_csLog,"CTP stopped");
}
bool CCTPNet::CreateSockets()
{
LOG(m_pLog,m_csLog,"Creating sockets");
CSingleLock lock(&m_csDeliveries);
// Sockets creation
m_SendSocket=socket(AF_INET, SOCK_DGRAM, 0);
m_RecvSocket=socket(AF_INET, SOCK_DGRAM, 0);
if (m_SendSocket==INVALID_SOCKET || m_RecvSocket==INVALID_SOCKET) {
LOCK(lock);
m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(0,0,WSAGetLastError(),IPAddr())));
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Failed to create sockets");
m_bSuspended=true;
return false;
}
// Sockets are to support broadcasting
BOOL broadcast=TRUE;
setsockopt(m_SendSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));
setsockopt(m_RecvSocket,SOL_SOCKET,SO_BROADCAST,(char*)&broadcast,sizeof(broadcast));
// Binding local address with receiving socket
m_Local.sin_family=AF_INET;
m_Local.sin_port=htons(m_uPort);
m_Local.sin_addr.s_addr=htonl(INADDR_ANY);
if (bind(m_RecvSocket,(SOCKADDR*)&m_Local,sizeof(m_Local))==SOCKET_ERROR)
{
LOCK(lock);
m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(1,0,WSAGetLastError(),IPAddr())));
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Failed to bind receiving socket");
m_bSuspended=true;
return false;
}
return true;
}
void CCTPNet::CheckupOptions(Header& header)
{
if (header.amount>1 && header.options&Options::UniqueCommand) header.options^=Options::UniqueCommand;
if (header.options&Options::StartSession) header.options^=Options::StartSession;
}
bool CCTPNet::Send(SmartBuffer& sb, unsigned __int16 command, IPAddr to, unsigned __int8 options, bool storeiffail)
{
// Arrange header
Header head;
head.amount=sb.GetPacketsCount();
head.command=command;
head.messize=sb.GetDataSize();
head.options=options;
CheckupOptions(head);
// Provide data with headers
for (unsigned int i=0;i<head.amount;i++) {
// Fill rest header information
GetNextID(head,to);
head.size=(unsigned __int16)sb.GetPacketSize(i);
head.number=i;
// Put header in the packet
sb.PutHead(&head,i);
}
// Store message to be sent and pointer to it
SntCommandInfo ci(sb,GetTickCount(),to.Solid);
CSingleLock lock(&m_csSntCommands);
LOCK(lock);
m_SntCommands.push_back(ci);
SntCommandInfoList::iterator curentry=--m_SntCommands.end();
UNLOCK(lock);
// Send packets
for (i=0;i<head.amount;i++) {
if (!SendPacket(sb.GetHeadPtr(i),to.Solid)) {
// Store error information
CSingleLock lock(&m_csDeliveries);
LOCK(lock);
m_Deliveries.push_back(Delivery(m_DefReceiver,new CCTPErrorInfo(2,head.command,WSAGetLastError(),IPAddr(to.Solid))));
UNLOCK(lock);
// Delete from sent packets storage if there was an error and storeiffail equals false
if (!storeiffail) {
CSingleLock lock(&m_csSntCommands);
LOCK(lock);
m_SntCommands.erase(curentry);
UNLOCK(lock);
}
// Exit sending function with return value, which shows error
return false;
}
}
// Exit sending function with return value, which shows success
return true;
}
bool CCTPNet::SendPacket(char* buf, unsigned long to)
{
SOCKADDR_IN recip;
recip.sin_family=AF_INET;
recip.sin_port=htons(m_uPort);
recip.sin_addr.s_addr=to;
Header* head=(Header*)buf;
LOGHA(m_pLog,m_csLog,"Send packet "," to "<<saddr,head,IPAddr(to));
return (sendto(m_SendSocket,(const char*)buf,head->size,0,(SOCKADDR*)&recip,sizeof(recip))!=SOCKET_ERROR);
}
void CCTPNet::FreeSntCommands()
{
CSingleLock lock(&m_csSntCommands);
LOCK(lock);
for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();it++) {
it->Free();
}
m_SntCommands.clear();
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Sent commands storage freed");
}
void CCTPNet::FreeSessions()
{
CSingleLock lock(&m_csSessions);
LOCK(lock);
for (SessionsInfo::iterator it=m_Sessions.begin();it!=m_Sessions.end();it++) {
it->second.received.clear();
}
m_Sessions.clear();
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Sessions information storage freed");
}
void CCTPNet::FreeLargeCommands()
{
CSingleLock lock(&m_csLargeCommands);
LOCK(lock);
for (LargeCommandInfoList::iterator it=m_LargeCommands.begin();it!=m_LargeCommands.end();it++) {
it->Free();
if (it->pRD) delete it->pRD;
}
m_LargeCommands.clear();
UNLOCK(lock);
LOG(m_pLog,m_csLog,"Large commands storage freed");
}
void CCTPNet::FreeDeliveries()
{
for (DeliveriesList::iterator it=m_Deliveries.begin();it!=m_Deliveries.end();it++) {
if (it->data) {
if (it->type==DeliveryType::ReceivedData) delete (CCTPReceivedData*)it->data; else
if (it->type==DeliveryType::ErrorInfo) delete (CCTPErrorInfo*)it->data; else
delete it->data;
}
}
m_Deliveries.clear();
LOG(m_pLog,m_csLog,"Deliveries storage freed");
}
bool CCTPNet::SaveRcvPacket(unsigned long from,Header* head)
{
CSingleLock lock(&m_csSessions);
LOCK(lock);
SessionInfo& si=GetSessionInfo(IPAddr(from),head->options&Options::Broadcast?true:false);
if (si.received.empty()) {
// First message from corresponding workstation
si.received.push_back(head->id);
} else {
// Find place among already received messages
if (Less(si.received.back(),head->id)) {
si.received.push_back(head->id);
} else {
for (SessionInfo::RcvList::iterator it=si.received.begin();it!=si.received.end();it++) {
// Already has been recieved
if ((*it)==head->id) {
LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" have been already received",head,IPAddr(from));
return false;
}
if (Less(head->id,*it)) {
// Already has been recieved
if (it==si.received.begin() && si.minwasset) {
LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" have been already received",head,IPAddr(from));
return false;
}
// New one has been recieved
si.received.insert(it,head->id);
break;
}
}
}
// Remove ambigous information
if (si.minwasset) {
while ((si.received.front()+1)==*(++si.received.begin())) {
si.received.pop_front();
}
}
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -