?? cmppsocket_bcb.cpp
字號:
// 拷貝數(shù)據(jù)到自己的內存區(qū)域,避免在臨界區(qū)中發(fā)送數(shù)據(jù)
memcpy( (void *)window, (void *)cmpp._window, sizeof( window));
// 搜尋并修改到期時間
for( i=0; i<nCMPP_WINDOW_SIZE; i++)
{
// 空位,跳過
if( cmpp._window[i].head.cmdid == 0)
continue;
// 未到發(fā)送時間,跳過
if( cmpp._window[i].t > time( NULL))
continue;
// 設置下一次發(fā)送的時間,當前時間+60秒
cmpp._window[i].t += 60;
}
LeaveCriticalSection( &cmpp._csec_wnd);
// 搜尋并發(fā)送到期的數(shù)據(jù)
for( i=0; i<nCMPP_WINDOW_SIZE; i++)
{
// 空位,跳過
if( window[i].head.cmdid == 0)
continue;
// 未到發(fā)送時間,跳過
if( window[i].t > time( NULL))
continue;
// 發(fā)送
nsize = ntohl( window[i].head.size);
err = cmpp._send( (char *)&window[i], nsize);
// 發(fā)送出錯,結束線程
if( err != nsize)
return 0;
}
// 主線程請求退出
if( cmpp._bexitting)
break;
}
return 0;
}
/******************************************************************************
 * Receive-thread entry point.
 *
 * pdata : pointer to the owning CcmppSocket instance.
 * return: always 0.
 *
 * Each pass waits up to one second for the socket to become readable, reads a
 * package header, then the package body, and dispatches on the command id:
 *   - nCMPP_SUBMIT_RESP      : clears the send-window slot whose sequence id
 *                              matches and releases one count on the window
 *                              semaphore.
 *   - nCMPP_DELIVER          : dumps the delivered message to stdout.
 *   - nCMPP_ACTIVE_TEST_RESP : logged only (debug builds).
 *   - anything else          : logged only (debug builds).
 *
 * Receive errors are logged in debug builds and the loop simply tries again.
 *
 * NOTE(review): _recv issues a single recv(); a short read of the header is
 * treated as an error here rather than being completed.
 ******************************************************************************/
DWORD WINAPI CcmppSocket::thread_recv( LPVOID pdata)
{
    int err, i;
    long prevcount;
    unsigned long total;            // package size announced by the header
    FD_SET fdset;
    TIMEVAL timeout;
    CcmppSocket &cmpp = *( CcmppSocket *)pdata;
    CMPP_PACKAGE pkg;

    memset( (void *)&pkg, 0, sizeof( pkg));
#ifdef _DEBUG
    _dbgeventlst.push_back("Thread Recieve : Thread Begin.");
#endif
    for( ;;)
    {
        // The main thread asked us to stop.  This is checked at the top of
        // the loop so that every `continue` below (timeout, select error,
        // bad package) still passes through it; checking only after a
        // successfully processed package would keep the thread alive forever
        // on an idle or broken connection.
        if( cmpp._bexitting)
            break;

        // Re-arm the descriptor set and the 1-second polling timeout.
        FD_ZERO( &fdset);
        FD_SET( cmpp._soc, &fdset);
        timeout.tv_sec = 1;
        timeout.tv_usec= 0;
        err = select(
            0,
            &fdset,
            NULL,
            NULL,
            &timeout);
        // select failed
        if( err == SOCKET_ERROR)
        {
#ifdef _DEBUG
            sprintf(_dbgtemp,"Thread Recieve : Socket Wait for Data Error.Error Code=%d",WSAGetLastError());
            _dbgeventlst.push_back(_dbgtemp);
#endif
            continue;//break;
        }
        // timed out, nothing to read
        if( err == 0)
            continue;

        // Receive the header first to learn the package size and type.
        err = cmpp._recv( (char *)&pkg.head, sizeof( pkg.head));
        if( err != sizeof( pkg.head))
        {
#ifdef _DEBUG
            sprintf(_dbgtemp,"Thread Recieve : Recieve Header Return Code=%d Needed Length=%d Error=%d.",err,(int)sizeof( pkg.head),WSAGetLastError());
            _dbgeventlst.push_back(_dbgtemp);
#endif
            continue;//break;
        }

        // The announced size comes from the peer: reject anything smaller
        // than a header (the subtraction below would wrap around) or larger
        // than the body buffer (the receive would overrun pkg.data).
        total = ntohl( pkg.head.size);
        if( total < sizeof( pkg.head) || total - sizeof( pkg.head) > sizeof( pkg.data))
        {
#ifdef _DEBUG
            sprintf(_dbgtemp,"Thread Recieve : Invalid Package Size=%lu",total);
            _dbgeventlst.push_back(_dbgtemp);
#endif
            continue;
        }

        // Receive the body.
        err = cmpp._recv( pkg.data, (int)( total - sizeof( pkg.head)));
        if( err == SOCKET_ERROR)
        {
#ifdef _DEBUG
            sprintf(_dbgtemp,"Thread Recieve : Socket Wait for Data Error.Error Code=%d",WSAGetLastError());
            _dbgeventlst.push_back(_dbgtemp);
#endif
            continue;//break;
        }

        switch (ntohl(pkg.head.cmdid))
        {
        //1. CMPP_SUBMIT_RESP
        case nCMPP_SUBMIT_RESP:
            {
                bool slotfreed = false;

                if (err!=sizeof(CMPP_SUBMIT_RESP))
                {
#ifdef _DEBUG
                    sprintf(_dbgtemp,"Thread Recieve : Data Body Length Error.Length=%d",err);
                    _dbgeventlst.push_back(_dbgtemp);
#endif
                    break;
                }
                // Take ownership of the send window.
                EnterCriticalSection( &cmpp._csec_wnd);
                // Drop the pending package with the same sequence id
                // (both ids are compared in network byte order).
                for( i=0; i<nCMPP_WINDOW_SIZE; i++)
                {
                    if( cmpp._window[i].head.cmdid == 0)
                        continue;
                    if( cmpp._window[i].head.seqid == pkg.head.seqid)
                    {
                        cmpp._window[i].head.cmdid = 0;
                        slotfreed = true;
                        break;
                    }
                }
                LeaveCriticalSection( &cmpp._csec_wnd);
                // Hand one window slot back, but only if a slot was really
                // freed: a response with an unknown or already acknowledged
                // sequence id must not inflate the free-slot count.
                if( slotfreed)
                {
                    ReleaseSemaphore(
                        cmpp._hsema_wnd,
                        1,
                        &prevcount);
                }
#ifdef _DEBUG
                sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_SUBMIT_RESP CMD. SeqID=%d",ntohl(pkg.head.seqid));
                _dbgeventlst.push_back(_dbgtemp);
#endif
                break;
            }
        //2. CMPP_DELIVER
        case nCMPP_DELIVER:
            {
                CMPP_DELIVER & _dlv=*(CMPP_DELIVER *) &pkg.data;
                printf("\nRecieve Data.\n");
                // The 20 bytes that follow the message text on the wire are
                // moved into LinkID, then both strings are NUL-terminated.
                // NOTE(review): msglen is taken from the peer unchecked and
                // the sizes of msgcontent / LinkID are not visible here;
                // confirm msgcontent+msglen+20 stays inside the package and
                // that LinkID has room for index 20.
                memcpy((void *)_dlv.LinkID,(void *)(_dlv.msgcontent+_dlv.msglen),20);
                _dlv.msgcontent[_dlv.msglen]=0x0;
                _dlv.LinkID[20]='\0';
                printf(" Size: %d\n",ntohl(pkg.head.size));
                printf(" cmdid: %d\n",ntohl(pkg.head.cmdid));
                printf(" seqid: %d\n",ntohl(pkg.head.seqid));
                printf(" msgid: %LX\n",_dlv.msgid);
                printf(" destnumber: %s\n",_dlv.destnumber);
                printf(" serviceid : %s\n",_dlv.serviceid );
                printf(" tppid: %d\n",_dlv.tppid);
                printf(" tpudhi: %d\n",_dlv.tpudhi);
                printf(" msgfmt: %d\n",_dlv.msgfmt);
                printf(" srcnumber : %s\n",_dlv.srcnumber );
                printf(" srctype;: %d\n",_dlv.srctype);
                printf(" delivery: %d\n",_dlv.delivery);
                printf(" msglen;: %d\n",_dlv.msglen);
                printf(" msgcontent: %s\n",_dlv.msgcontent);
                printf(" LinkID : %s\n",_dlv.LinkID );
#ifdef _DEBUG
                sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_DELIVER CMD. SeqID=%d",ntohl(pkg.head.seqid));
                _dbgeventlst.push_back(_dbgtemp);
#endif
                break;
            }
        //3. CMPP_ACTIVE_TEST_RESP
        case nCMPP_ACTIVE_TEST_RESP:
            {
#ifdef _DEBUG
                sprintf(_dbgtemp,"Thread Recieve : Recieve CMPP_ACTIVE_TEST_RESP CMD. SeqID=%d",ntohl(pkg.head.seqid));
                _dbgeventlst.push_back(_dbgtemp);
#endif
                break;
            }
        // Any other command id: log it.
        default:
            {
#ifdef _DEBUG
                // %lX matches the unsigned long returned by ntohl.
                sprintf(_dbgtemp,"Thread Recieve : Recieve Unknown CMD. CMD=%lX SeqID=%d",ntohl(pkg.head.cmdid),ntohl(pkg.head.seqid));
                _dbgeventlst.push_back(_dbgtemp);
#endif
            }
        }
    }
#ifdef _DEBUG
    _dbgeventlst.push_back("Thread Recieve : Thread End.");
#endif
    return 0;
}
/******************************************************************************
* Data receive thread: fills the receive data window with bytes read from the
* port (disabled - the implementation below is commented out)
******************************************************************************/
/*
DWORD WINAPI CcmppSocket::thread_revwnd(LPVOID LParam)
{
/*
int err=0;
char buff=0;
for(;;)
{
err=WaitForSingleObject(_hsema_rev,1000);
if (err==WAIT_OBJECT_0)
{
err=_recv(&buff,1);
if (err!=SOCKET_ERROR)
{
EnterCriticalSection(_csec_revwnd);
_revbuff[_prevbuff++]=buff;
LeaveCriticalSection(_csec_revwnd);
}
}
if( cmpp._bexitting)
break;
}
}
*/
/******************************************************************************
 * Link-test thread entry point.
 *
 * The loop wakes once per second so that an exit request from the main thread
 * is noticed promptly.  CMPP 2.0 recommends a link-test interval of 180
 * seconds; 30 seconds is used here for compatibility with the various vendor
 * implementations.
 *
 * pdata : pointer to the owning CcmppSocket instance.
 * return: always 0.
 ******************************************************************************/
DWORD WINAPI CcmppSocket::thread_actv( LPVOID pdata)
{
    CcmppSocket &self = *( CcmppSocket *)pdata;
    CMPP_HEAD probe;
    const int period = 30;      // seconds between two link tests
    int ticks = 0;              // seconds elapsed since the last link test

    for( ;;)
    {
        Sleep( 1000);

        // The main thread asked us to stop.
        if( self._bexitting)
            break;

        ticks += 1;
        if( ticks >= period)
        {
            ticks = 0;

            // Build a header-only ACTIVE_TEST package (fields in network
            // byte order) and push it out through the shared send path.
            probe.size = htonl( sizeof( probe));
            probe.cmdid= htonl( nCMPP_ACTIVE_TEST);
            probe.seqid= htonl( self._getseqid());
            self._send( (char *)&probe, sizeof( probe));
        }
    }
    return 0;
}
/**
 * Writes the current local time into buf as "MMDDhhmmss" (month, day, hour,
 * minute, second; two digits each) followed by a terminating NUL.
 *
 * buf   : destination; must have room for at least 11 bytes.
 * return: buf.
 */
char * CcmppSocket::_timestamp( char *buf)
{
    const time_t current = time( NULL);
    const struct tm *parts = localtime( &current);

    sprintf( buf, "%02d%02d%02d%02d%02d",
        parts->tm_mon + 1,      // tm_mon is zero-based
        parts->tm_mday,
        parts->tm_hour,
        parts->tm_min,
        parts->tm_sec);
    return buf;
}
/**
 * Reverses the byte order of a 64-bit value.
 *
 * The swap is unconditional (no host-endianness test), so the result is only
 * a network-to-host conversion on a little-endian host.
 *
 * The work is done on an unsigned copy: shifting bytes into the sign bit of
 * a signed 64-bit accumulator overflows, which is undefined behaviour before
 * C++20.  The produced bit pattern is the same as before.
 *
 * inval : value to convert.
 * return: inval with its eight bytes in reverse order.
 */
__int64 CcmppSocket::_ntoh64( __int64 inval)
{
    unsigned __int64 src = (unsigned __int64)inval;
    unsigned __int64 dst = 0;

    for( int i=0; i<8; i++)
    {
        // Move the lowest remaining source byte to the bottom of dst; it
        // climbs one position with every later iteration.
        dst = ( dst << 8) | ( src & 0xFF);
        src >>= 8;
    }
    return (__int64)dst;
}
/**
 * Host-to-network conversion of a 64-bit value.
 *
 * Delegates to _ntoh64: reversing the byte order is its own inverse, so the
 * same routine serves both directions.
 *
 * inval : value to convert.
 * return: whatever _ntoh64 returns for inval.
 */
__int64 CcmppSocket::_hton64( __int64 inval)
{
    const __int64 swapped = _ntoh64( inval);
    return swapped;
}
/**
 * Hands out the next sequence id.
 *
 * The counter _seqid is incremented and read while holding _csec_seq.
 *
 * return: the value of _seqid after the increment.
 */
int CcmppSocket::_getseqid()
{
    EnterCriticalSection( &_csec_seq);
    _seqid += 1;
    const int issued = _seqid;
    LeaveCriticalSection( &_csec_seq);
    return issued;
}
int CcmppSocket::_send( char *buf, int len)
{
int err;
EnterCriticalSection( &_csec_snd);
err = send( _soc, buf, len, 0);
LeaveCriticalSection( &_csec_snd);
return err;
}
/**
 * Reads up to len bytes from the socket into buf with a single recv() call,
 * made while holding _csec_recv.
 *
 * In debug builds every successfully received chunk is also appended at
 * _revp, which is then advanced by the chunk length.
 * NOTE(review): no bound on _revp is checked here - confirm the capture
 * buffer is large enough for a whole session.
 *
 * buf   : destination buffer.
 * len   : maximum number of bytes to read.
 * return: the value returned by recv().
 */
int CcmppSocket::_recv( char *buf, int len)
{
    EnterCriticalSection( &_csec_recv);
    const int got = recv( _soc, buf, len, 0);
#ifdef _DEBUG
    // Keep a raw copy of the incoming traffic for the debug dump.
    if( got != SOCKET_ERROR)
    {
        memcpy( (void *)_revp, (void *)buf, got);
        _revp += got;
    }
#endif
    LeaveCriticalSection( &_csec_recv);
    return got;
}
int main()
{
#ifdef _DEBUG
_dbgeventlst.push_back("Begin Of Debug.");
#endif
CcmppSocket cmpp;
cmpp.init( "900001", "900001", "127.0.0.1");
CMPP_SUBMIT msg;
memset( (void *)&msg, 0, sizeof( msg));
msg.desttotal = 1;
strcpy( (char *)msg.destnumbers, "13808425257");
msg.msglen = 10;
strcpy( (char *)msg.msgcontent, "0123456789");
strcpy( (char *)msg.reserve, "abcdefgh");
strcpy( (char *)msg.msgsrc, "900001");
msg.pkgnumber=1;
msg.pkgtotal=1;
msg.delivery=1;
for( int i=0; i<10; i++)
{
Sleep(100);
cmpp.Submit( msg);
}
printf("Press Return Continue...");
getchar();
#ifdef _DEBUG
FILE *_revstream;
if ((_revstream = fopen("CMPP3REV.LOG", "wb")) != NULL)
{
fwrite((void *)_revdebug,_revp-_revdebug,1,_revstream);
fclose(_revstream);
}
std::ofstream fileo;
fileo.open("CMPP3EVENT.LOG");
fileo << _dbgeventlst << std::endl;
fileo.close();
#endif
return 0;
}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -