?? ctpnet.cpp
字號(hào):
}
// Session was started with this packet
if (head->options&Options::StartSession) {
si.minwasset=true;
LOGA(m_pLog,m_csLog,"Session starting packet with ID:"<<(unsigned int)head->id<<" have been received from "<<saddr,IPAddr(from));
}
UNLOCK(lock);
return true;
}
void CCTPNet::SendConfirmation(unsigned long to,Header header)
{
// Change header
header.command|=m_iConfirm;
header.messize=0;
header.size=GetHeaderSize();
// Send confirmation command
SendPacket((char*)&header,to);
}
void CCTPNet::ConfirmSntPacket(unsigned long to,Header* header)
{
Header* head;
bool erased=false;
// Is command unique
bool unique=(header->options&Options::UniqueCommand)&&(header->amount<=1);
CSingleLock lock(&m_csSntCommands);
LOCK(lock);
for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();) {
erased=false;
head=(Header*)it->sbBody.GetBufferBegin();
// Find appropriate recipient or accept any if message was broadcasted
if (it->ipTo==to || head->options&Options::Broadcast) {
if (!unique) {
// Not unique command
if (head->id==header->id-header->number) {
if (it->Confirm(header->number)) {
SetTimeout(it->ipTo,head->options&Options::Broadcast?true:false,m_Times.uMultiplier*(GetTickCount()-(it->CI[0].dwTime)));
LOGHA(m_pLog,m_csLog,"Sent commands storage excluded entry ",", sent to "<<saddr,head,IPAddr(to));
it->Free();
m_SntCommands.erase(it);
}
UNLOCK(lock);
return;
}
} else {
// Unique command
if ((head->command^header->command)==m_iConfirm) {
if (it->Confirm(0)) {
SetTimeout(it->ipTo,head->options&Options::Broadcast?true:false,m_Times.uMultiplier*(GetTickCount()-(it->CI[0].dwTime)));
LOGHA(m_pLog,m_csLog,"Sent commands storage excluded entry ",", sent to "<<saddr,head,IPAddr(to));
it->Free();
it=m_SntCommands.erase(it);
erased=true;
if (it==m_SntCommands.end()) break;
}
}
}
}
// Go to next sent packet
if (!erased) it++;
}
UNLOCK(lock);
}
bool CCTPNet::ArrangeLargeCommand(unsigned long from,Header* head)
{
LOGHA(m_pLog,m_csLog,"Packet "," from "<<saddr<<" is a part of large command",head,IPAddr(from));
char* mem=NULL;
CSingleLock lock(&m_csLargeCommands);
LOCK(lock);
// Try to find packets, wich belongs to the same message in received parts
for (LargeCommandInfoList::iterator it=m_LargeCommands.begin();it!=m_LargeCommands.end();it++) {
if (it->id==head->id-head->number && it->pRD->from==from && !it->received[head->number]) {
mem=it->pRD->pBuf;
mem+=head->number*m_uPacketDataSize;
memcpy(mem,m_pBuffer+GetHeaderSize(),head->size-GetHeaderSize());
if (it->GotPart(head->number)) {
LOG(m_pLog,m_csLog,"Large command arranged");
CSingleLock lock(&m_csDeliveries);
LOCK(lock);
m_Deliveries.push_back(Delivery(GetReceiver(head->command,DeliveryType::ReceivedData),it->pRD));
UNLOCK(lock);
it->Free();
m_LargeCommands.erase(it);
return true;
} else return false;
}
}
// Part of the new message received
LargeCommandInfo lpi(head->command,head->messize,from,head->id,head->amount);
mem=lpi.pRD->pBuf;
mem+=head->number*m_uPacketDataSize;
memcpy(mem,m_pBuffer+GetHeaderSize(),head->size-GetHeaderSize());
m_LargeCommands.push_front(lpi);
m_LargeCommands.front().GotPart(head->number);
return false;
}
void CCTPNet::ResendNotConfirmedData()
{
Header* head;
DWORD time=GetTickCount();
bool erased=false;
unsigned int i=0;
// This fleag is used to keep from showing error for all parts of the large
// message
bool wasdead=false;
CSingleLock lock(&m_csSntCommands);
LOCK(lock);
for (SntCommandInfoList::iterator it=m_SntCommands.begin();it!=m_SntCommands.end();) {
erased=false;
head=(Header*)it->sbBody.GetBufferBegin();
for (i=0; i<it->uCount; i++) if (!it->CI[i].bConfirmed) {
// If timeout have expired
if ((unsigned int)(time-it->CI[i].dwLTime)>it->CI[i].uResend*GetTimeout(it->ipTo,head->options&Options::Broadcast?true:false)) {
if (!(head->options&Options::NoResend)) {
LOGHA(m_pLog,m_csLog,"Packet "," is to be resent to "<<saddr,((Header*)it->sbBody.GetHeadPtr(i)),IPAddr(it->ipTo));
SendPacket(it->sbBody.GetHeadPtr(i),it->ipTo);
}
it->CI[i].dwLTime=time;
it->CI[i].IncResend();
// Produce error if dead timeout occuired
if (it->CI[i].IsDeadTimeout() && !wasdead){
CSingleLock lock(&m_csDeliveries);
LOCK(lock);
m_Deliveries.push_back(Delivery(GetReceiver(head->command,DeliveryType::ErrorInfo),new CCTPErrorInfo(4,head->command,WSAGetLastError(),IPAddr(it->ipTo))));
UNLOCK(lock);
wasdead=true;
// Erase if needed
if (head->options&Options::DelAfterError) {
LOGHA(m_pLog,m_csLog,"Command, sent to "<<saddr<<", which includes packet ",", is delete from sent commands storage after generating error delivery",((Header*)it->sbBody.GetHeadPtr(i)),IPAddr(it->ipTo));
m_SntCommands.erase(it);
erased=true;
break;
}
}
}
}
// Go to next element
if (!erased) it++;
}
UNLOCK(lock);
}
void CCTPNet::AddSpecialReceiver(unsigned __int16 command, NetReceiver* receiver, DeliveryType type)
{
if (command==0) return;
for (SpecialReceiversList::iterator it=m_Receivers.begin();it!=m_Receivers.end();) {
if (it->command==command && it->type==type) {
it=m_Receivers.erase(it);
if (it==m_Receivers.end()) break;
} else it++;
}
m_Receivers.push_front(SpecialReceiver(command,receiver,type));
}
void CCTPNet::DeleteSpecialReceiver(NetReceiver* receiver)
{
for (SpecialReceiversList::iterator it=m_Receivers.begin();it!=m_Receivers.end();) {
if (it->receiver==receiver) {
it=m_Receivers.erase(it);
if (it==m_Receivers.end()) break;
} else it++;
}
}
NetReceiver* CCTPNet::GetReceiver(unsigned __int16 command, DeliveryType type)
{
if (command==0) return m_DefReceiver;
for (SpecialReceiversList::iterator it=m_Receivers.begin();it!=m_Receivers.end();it++) {
if (it->command==command && it->type==type) return it->receiver;
}
return m_DefReceiver;
}
CCTPNet::SessionInfo& CCTPNet::GetSessionInfo(IPAddr addr, bool bcast)
{
if (bcast) addr.SetBroadcast();
return m_Sessions.insert(SessionsInfo::value_type(addr.Solid,SessionInfo())).first->second;
}
void CCTPNet::GetNextID(Header& head,IPAddr addr)
{
CSingleLock lock(&m_csSessions);
LOCK(lock);
if (m_Sessions.find(addr.Solid)==m_Sessions.end()) head.options|=Options::StartSession;
SessionInfo& si=GetSessionInfo(addr,head.options&Options::Broadcast?true:false);
head.id=++si.id;
}
unsigned int CCTPNet::GetTimeout(IPAddr addr, bool bcast)
{
CSingleLock lock(&m_csSessions);
LOCK(lock);
SessionInfo& si=GetSessionInfo(addr,bcast);
if (!(si.timeout)) return m_Times.uDefTimeout; else return si.timeout;
}
void CCTPNet::SetTimeout(IPAddr addr, bool bcast, unsigned int timeout)
{
CSingleLock lock(&m_csSessions);
LOCK(lock);
SessionInfo& si=GetSessionInfo(addr,bcast);
if (!(si.timeout)) {
LOGA(m_pLog,m_csLog,"Timeout for session with "<<saddr<<" is considered to be "<<timeout<<" microseconds",addr);
si.timeout=timeout;
}
}
unsigned int CTPServerFunction(void* pNet)
{
CCTPNet* net=(CCTPNet*)pNet;
// Necessary variables
timeval tv;
tv.tv_sec=0;
tv.tv_usec=1;
fd_set fdread;
SOCKADDR_IN sender;
int sendersize=NULL;
CCTPNet::Header* head;
DWORD time=NULL;
DWORD checktime=GetTickCount();
CSingleLock lock(&net->m_csDeliveries);
CSingleLock lockn(&net->m_csNetwork);
for(;;) {
// Wait while suspended
while (net->GetSuspended()) {
// Does killing needed
if (net->m_bKill) {
CSingleLock lock(&net->m_csServerTrds);
LOCK(lock);
net->m_pServerTrds.erase(find(net->m_pServerTrds.begin(),net->m_pServerTrds.end(),AfxGetThread()));
UNLOCK(lock);
LOG(net->m_pLog,net->m_csLog,"Server thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
return (unsigned int)AfxGetThread();
}
// Sleep a little bit
Sleep(net->GetTimes().uSleepSuspended);
}
// Check for received data
time=GetTickCount();
FD_ZERO(&fdread);
FD_SET(net->m_RecvSocket,&fdread);
LOCK(lockn);
if (select(0,&fdread,NULL,NULL,&tv)>0) {
// Receive data
sendersize=sizeof(sender);
int ret=recvfrom(net->m_RecvSocket,net->m_pBuffer,net->GetPacketDataSize()+net->GetHeaderSize(),0,(SOCKADDR*)&sender,&sendersize);
UNLOCK(lockn);
if (ret==SOCKET_ERROR) {
// Error while receiving
LOCK(lock);
net->m_Deliveries.push_back(CCTPNet::Delivery(net->GetDefaultReceiver(),new CCTPErrorInfo(3,0,WSAGetLastError(),IPAddr(sender.sin_addr.S_un.S_addr))));
UNLOCK(lock);
LOG(net->m_pLog,net->m_csLog,"Network receiving error");
} else {
if (ret>=net->GetHeaderSize()) {
head=(CCTPNet::Header*)net->m_pBuffer;
LOGHA(net->m_pLog,net->m_csLog,"Packet "," have been received from "<<saddr,head,IPAddr(sender.sin_addr.S_un.S_addr));
if (net->IsConfirmation(head->command)) {
net->ConfirmSntPacket(sender.sin_addr.S_un.S_addr,head);
} else {
if (net->SaveRcvPacket(sender.sin_addr.S_un.S_addr,head)) {
// New packet was got
if (head->amount>1) {
net->ArrangeLargeCommand(sender.sin_addr.S_un.S_addr,head);
} else {
LOCK(lock);
net->m_Deliveries.push_back(CCTPNet::Delivery(net->GetReceiver(head->command,CCTPNet::DeliveryType::ReceivedData),new CCTPReceivedData(head->command,head->messize,sender.sin_addr.S_un.S_addr,net->m_pBuffer+net->GetHeaderSize())));
UNLOCK(lock);
}
}
// Send confimation
net->SendConfirmation(sender.sin_addr.S_un.S_addr,*head);
}
}
}
} else UNLOCK(lockn);
// Does killing needed
if (net->m_bKill) {
CSingleLock lock(&net->m_csServerTrds);
LOCK(lock);
net->m_pServerTrds.erase(find(net->m_pServerTrds.begin(),net->m_pServerTrds.end(),AfxGetThread()));
UNLOCK(lock);
LOG(net->m_pLog,net->m_csLog,"Server thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
return (unsigned int)AfxGetThread();
}
// Sent packets were not checked long enough
if ((time-checktime)>net->GetTimes().uPeriodCheckResend) {
// Check if resending needed
checktime=GetTickCount();
net->ResendNotConfirmedData();
}
}
}
unsigned int CTPDelManFunction(void* pNet)
{
CCTPNet* net=(CCTPNet*)pNet;
CCTPNet::Delivery del;
CSingleLock lock(&net->m_csDeliverTrds);
for(;;) {
// Does additional delivery threads needed
if (!net->GetSuspended()) {
LOCK(lock);
if (!net->m_Deliveries.empty() && net->m_pDeliverTrds.size()<net->m_uMaxDeliverers && net->m_pDeliverTrds.size()==net->m_uBusy) {
net->m_pDeliverTrds.push_back(AfxBeginThread(CTPDeliverFunction,pNet));
LOG(net->m_pLog,net->m_csLog,"Delivery manager have created deliverer with handle "<<net->m_pDeliverTrds.back()->m_hThread);
}
UNLOCK(lock);
}
// Kill server
if (net->m_bKill) {
net->m_pDelManTrd=NULL;
LOG(net->m_pLog,net->m_csLog,"Delivery manager thread stopped");
return (unsigned int)AfxGetThread();
}
Sleep(net->GetTimes().uSleepDelMan);
}
}
unsigned int CTPDeliverFunction(void* pNet)
{
CCTPNet* net=(CCTPNet*)pNet;
CCTPNet::Delivery del;
// If zero then something was done. If greater then nothing was done
//(greater means doing nothing longer)
bool bNothing=true;
DWORD lastdel=GetTickCount();
CSingleLock lock(&net->m_csDeliveries);
for(;;) {
bNothing=true;
for (;;) {
// Get delivery if exist
LOCK(lock);
if (net->m_Deliveries.empty()) break;
del=net->m_Deliveries.front();
net->m_Deliveries.pop_front();
UNLOCK(lock);
LOG(net->m_pLog,net->m_csLog,"Delivery is beeing processed");
// Deliver delivery
net->m_uBusy++;
switch (del.type) {
case CCTPNet::DeliveryType::ReceivedData:
del.target->OnReceive(del.data);
delete (CCTPReceivedData*)del.data;
break;
case CCTPNet::DeliveryType::ErrorInfo:
del.target->OnError(del.data);
delete (CCTPErrorInfo*)del.data;
break;
default:
if (VALID(del.data)) delete del.data;
}
net->m_uBusy--;
lastdel=GetTickCount();
// Thread is working now
bNothing=false;
// If killing needed then do it after unlocking
if (net->m_bKill) break;
}
if (lock.IsLocked()) UNLOCK(lock);
// Does killing needed (because of request or because of sponging)
if (net->m_bKill || (net->m_Deliveries.size()==0 && GetTickCount()-lastdel>net->GetTimes().uPeriodAutoDest)) {
CSingleLock lock(&net->m_csDeliverTrds);
LOCK(lock);
net->m_pDeliverTrds.erase(find(net->m_pDeliverTrds.begin(),net->m_pDeliverTrds.end(),AfxGetThread()));
UNLOCK(lock);
LOG(net->m_pLog,net->m_csLog,"Deliverer thread with handle "<<(unsigned int)AfxGetThread()<<" stopped");
return (unsigned int)AfxGetThread();
}
// Sleep a little bit
if (bNothing) {
Sleep(net->GetTimes().uSleepNothing);
}
}
}
?? 快捷鍵說(shuō)明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號(hào)
Ctrl + =
減小字號(hào)
Ctrl + -