?? cmppdata.c
字號:
/*******************************************************
NAME: cmppdata.c
PURPOSE: China Mobile Peer to Peer Protocol (CMPP) 2.0.
Permanent-connection implementation that submits
data to the Internet Short Message Gateway (ISMG)
or delivers data from the ISMG to the upper layer.
This file implements data management: storing PDUs
into, and fetching them from, the queues.
VERSION: 0.0.1
AUTHOR: Ke Heng Zhong
DATE: 2002/06/17 12:52
MODIFIED: 2000/06/17 21:12
********************************************************/
#include "kevopsall.ext"
#include "cmpp.h"
#include "cmppif.h"
/*
 * if_get_submit_num - report the lt_num() count of the `submits` list
 * belonging to the entity that getEntity() resolves from `vtask`.
 *
 * The list is inspected while submitCS is held.
 *
 * Returns 0 when vtask is NULL, otherwise the lt_num() result.
 */
int if_get_submit_num (VTASK * vtask)
{
    CmppEntity * ent;
    int count;

    if (vtask == NULL)
        return 0;

    /* The CMPP state hangs off the owning entity's `var` field. */
    ent = (CmppEntity *)((VTASK *) getEntity (vtask))->var;

    EnterCriticalSection (&ent->submitCS);
    count = lt_num(ent->submits);
    LeaveCriticalSection (&ent->submitCS);

    return count;
}
/*
 * if_get_submitresp_num - report the lt_num() count of the `submitresps`
 * list belonging to the entity that getEntity() resolves from `vtask`.
 *
 * The list is inspected while respCS is held.
 *
 * Returns 0 when vtask is NULL, otherwise the lt_num() result.
 */
int if_get_submitresp_num (VTASK * vtask)
{
    CmppEntity * ent;
    int count;

    if (vtask == NULL)
        return 0;

    /* The CMPP state hangs off the owning entity's `var` field. */
    ent = (CmppEntity *)((VTASK *) getEntity (vtask))->var;

    EnterCriticalSection (&ent->respCS);
    count = lt_num(ent->submitresps);
    LeaveCriticalSection (&ent->respCS);

    return count;
}
/*
 * if_get_deliver_num - report the lt_num() count of the `delivers` list
 * belonging to the entity that getEntity() resolves from `vtask`.
 *
 * The list is inspected while deliverCS is held.
 *
 * Returns 0 when vtask is NULL, otherwise the lt_num() result.
 */
int if_get_deliver_num (VTASK * vtask)
{
    CmppEntity * ent;
    int count;

    if (vtask == NULL)
        return 0;

    /* The CMPP state hangs off the owning entity's `var` field. */
    ent = (CmppEntity *)((VTASK *) getEntity (vtask))->var;

    EnterCriticalSection (&ent->deliverCS);
    count = lt_num(ent->delivers);
    LeaveCriticalSection (&ent->deliverCS);

    return count;
}
/*
 * get_submit_num - sum of the lt_num() counts of the entity's
 * `unsucc_submits` and `submits` lists.
 *
 * `unsucc_submits` is read without taking any lock; `submits` is read
 * while submitCS is held.
 *
 * Returns 0 when vtask is NULL, otherwise the combined count.
 */
int get_submit_num (VTASK * vtask)
{
    CmppEntity * ent;
    int total;

    if (vtask == NULL)
        return 0;

    ent = (CmppEntity *)((VTASK *) getEntity (vtask))->var;

    /* Unsuccessful submits first, read outside the critical section. */
    total = lt_num(ent->unsucc_submits);

    EnterCriticalSection (&ent->submitCS);
    total += lt_num(ent->submits);
    LeaveCriticalSection (&ent->submitCS);

    return total;
}
/*
 * fetch_submit - remove and return the next submit PDU for the entity
 * that getEntity() resolves from `vtask`.
 *
 * The head of `unsucc_submits` is preferred whenever lt_num() reports
 * that list as non-empty and lt_rm_head() yields a non-NULL item; that
 * list is accessed without a lock.  Otherwise the head of `submits` is
 * removed while submitCS is held.
 *
 * Returns NULL when vtask is NULL, otherwise whatever lt_rm_head()
 * produced (which may itself be NULL).
 */
CmppPdu * fetch_submit (VTASK * vtask)
{
    CmppEntity * ent;
    CmppPdu * pdu = NULL;

    if (vtask == NULL)
        return NULL;

    ent = (CmppEntity *)((VTASK *) getEntity (vtask))->var;

    /* Previously unsuccessful submits take priority. */
    if (lt_num(ent->unsucc_submits) != 0)
        pdu = (CmppPdu *)lt_rm_head(ent->unsucc_submits);

    if (pdu == NULL) {
        EnterCriticalSection (&ent->submitCS);
        pdu = (CmppPdu *)lt_rm_head(ent->submits);
        LeaveCriticalSection (&ent->submitCS);
    }

    return pdu;
}
/*
 * if_store_submit - append a submit PDU to the `submits` list of the
 * entity that getEntity() resolves from `vtask`.
 *
 * vtask:  task handle used to locate the owning entity.
 * submit: PDU handed to lt_append().
 *
 * The append happens while submitCS is held.  In _DEBUG builds a log
 * line is emitted whenever the list length is a multiple of 500; the
 * length is sampled once, inside the critical section, so the value
 * tested and the value logged are the same and are not read while
 * another thread may be modifying the list.
 *
 * Returns 0 after appending, -1 when either argument is NULL.
 */
int if_store_submit (VTASK * vtask, CmppPdu * submit)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int total = 0;
#endif

    if (!vtask || !submit) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->submitCS);
    lt_append(cment->submits, submit);
#ifdef _DEBUG
    total = lt_num(cment->submits);
#endif
    LeaveCriticalSection (&cment->submitCS);

#ifdef _DEBUG
    /* getpid() returns pid_t, so cast it to match the %ld conversion.
     * NOTE(review): pthread_t is an opaque type; passing it to %ld
     * assumes it is a long-sized integer on the target -- confirm. */
    if (total % 500 == 0)
        tolog("if_store_submits, total submits %d, pid=%ld, threadid=%ld.\n",
              total, (long)getpid(), pthread_self());
#endif
    return 0;
}
/*
 * store_unsucc_submit - append a submit PDU to the `unsucc_submits`
 * list of the entity that getEntity() resolves from `vtask`.
 *
 * vtask:  task handle used to locate the owning entity.
 * submit: PDU handed to lt_append().
 *
 * No lock is taken around the list here.  In _DEBUG builds a log line
 * is emitted whenever the list length is a multiple of 500; the length
 * is read once so the tested and logged values agree.
 *
 * Returns 0 after appending, -1 when either argument is NULL.
 */
int store_unsucc_submit (VTASK * vtask, CmppPdu * submit)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int total = 0;
#endif

    if (!vtask || !submit) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    lt_append(cment->unsucc_submits, submit);

#ifdef _DEBUG
    total = lt_num(cment->unsucc_submits);
    /* getpid() returns pid_t, so cast it to match the %ld conversion.
     * NOTE(review): pthread_t is an opaque type; passing it to %ld
     * assumes it is a long-sized integer on the target -- confirm. */
    if (total % 500 == 0)
        tolog("submit unsucc list total number %d, pid=%ld, threadid=%ld\n",
              total, (long)getpid(), pthread_self());
#endif
    return 0;
}
/*
 * recycle_submit_pdu - append a submit PDU to the `recycle_submits`
 * list of the entity that getEntity() resolves from `vtask`.
 *
 * vtask: task handle used to locate the owning entity.
 * pdu:   PDU handed to lt_append().
 *
 * The append happens while recycleCS is held.  In _DEBUG builds a log
 * line is emitted whenever the list length is a multiple of 500; the
 * length is sampled once, inside the critical section, so the value
 * tested and the value logged are the same and are not read while
 * another thread may be modifying the list.
 *
 * Returns 0 after appending, -1 when either argument is NULL.
 */
int recycle_submit_pdu (VTASK * vtask, CmppPdu * pdu)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int total = 0;
#endif

    if (!vtask || !pdu) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recycleCS);
    lt_append(cment->recycle_submits, pdu);
#ifdef _DEBUG
    total = lt_num(cment->recycle_submits);
#endif
    LeaveCriticalSection (&cment->recycleCS);

#ifdef _DEBUG
    /* getpid() returns pid_t, so cast it to match the %ld conversion.
     * NOTE(review): pthread_t is an opaque type; passing it to %ld
     * assumes it is a long-sized integer on the target -- confirm. */
    if (total % 500 == 0)
        tolog("submit recycle list total number %d, pid=%ld, threadid=%ld\n",
              total, (long)getpid(), pthread_self());
#endif
    return 0;
}
/*
 * get_recycle_submit_pdu - remove and return the head of the
 * `recycle_submits` list of the entity that getEntity() resolves
 * from `vtask`.
 *
 * The removal happens while recycleCS is held.  In _DEBUG builds, when
 * lt_rm_head() yields NULL, a diagnostic is appended to "kepanic.txt";
 * if that file cannot be opened the diagnostic is skipped instead of
 * writing through a NULL stream.
 *
 * Returns NULL when vtask is NULL, otherwise the lt_rm_head() result
 * (which may itself be NULL).
 */
CmppPdu * get_recycle_submit_pdu (VTASK * vtask)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;
    CmppPdu * pdu = NULL;

    if (!vtask) return NULL;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recycleCS);
    pdu = (CmppPdu *)lt_rm_head (cment->recycle_submits);
    LeaveCriticalSection (&cment->recycleCS);

#ifdef _DEBUG
    if (!pdu) {
        FILE * fp = fopen("kepanic.txt", "a+");
        /* fopen() returns NULL on failure; never hand that to fprintf. */
        if (fp) {
            /* NOTE(review): `submits` is read here without submitCS,
             * unlike the other accessors in this file -- confirm this
             * is acceptable for a diagnostic. */
            fprintf(fp, "recycle submit pdu number is 0, this result in the allocation of one submit.\n"
                        "using submit pdu number is %d.\n",
                    lt_num(cment->submits));
            fclose(fp);
        }
    }
#endif
    return pdu;
}
/*
 * recycle_submitresp_pdu - append a submit-response PDU to the
 * `recycle_submitresps` list of the entity that getEntity() resolves
 * from `vtask`.
 *
 * vtask: task handle used to locate the owning entity.
 * pdu:   PDU handed to lt_append().
 *
 * The append happens while recyclerespCS is held.  In _DEBUG builds a
 * log line is emitted whenever the list length is a multiple of 500;
 * the length is sampled once, inside the critical section, so the value
 * tested and the value logged are the same and are not read while
 * another thread may be modifying the list.
 *
 * Returns 0 after appending, -1 when either argument is NULL.
 */
int recycle_submitresp_pdu (VTASK * vtask, CmppPdu * pdu)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;
#ifdef _DEBUG
    int total = 0;
#endif

    if (!vtask || !pdu) return -1;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recyclerespCS);
    lt_append(cment->recycle_submitresps, pdu);
#ifdef _DEBUG
    total = lt_num(cment->recycle_submitresps);
#endif
    LeaveCriticalSection (&cment->recyclerespCS);

#ifdef _DEBUG
    /* getpid() returns pid_t, so cast it to match the %ld conversion.
     * NOTE(review): pthread_t is an opaque type; passing it to %ld
     * assumes it is a long-sized integer on the target -- confirm. */
    if (total % 500 == 0)
        tolog("submit_response recycle list total number %d, pid=%ld, threadid=%ld\n",
              total, (long)getpid(), pthread_self());
#endif
    return 0;
}
/*
 * get_recycle_submitresp_pdu - remove and return the head of the
 * `recycle_submitresps` list of the entity that getEntity() resolves
 * from `vtask`.
 *
 * The removal happens while recyclerespCS is held.  In _DEBUG builds,
 * when lt_rm_head() yields NULL, a diagnostic is appended to
 * "kepanic.txt"; if that file cannot be opened the diagnostic is
 * skipped instead of writing through a NULL stream.
 *
 * Returns NULL when vtask is NULL, otherwise the lt_rm_head() result
 * (which may itself be NULL).
 */
CmppPdu * get_recycle_submitresp_pdu (VTASK * vtask)
{
    VTASK * entity = NULL;
    CmppEntity * cment = NULL;
    CmppPdu * pdu = NULL;

    if (!vtask) return NULL;

    entity = (VTASK *) getEntity (vtask);
    cment = (CmppEntity *)entity->var;

    EnterCriticalSection (&cment->recyclerespCS);
    pdu = (CmppPdu *)lt_rm_head (cment->recycle_submitresps);
    LeaveCriticalSection (&cment->recyclerespCS);

#ifdef _DEBUG
    if (!pdu) {
        FILE * fp = fopen("kepanic.txt", "a+");
        /* fopen() returns NULL on failure; never hand that to fprintf. */
        if (fp) {
            /* NOTE(review): `submitresps` is read here without respCS,
             * unlike the other accessors in this file -- confirm this
             * is acceptable for a diagnostic. */
            fprintf(fp, "recycle submitresp pdu number is 0, this result in the allocation of one submitresp.\n"
                        "using submitresp pdu number is %d.\n",
                    lt_num(cment->submitresps));
            fclose(fp);
        }
    }
#endif
    return pdu;
}
int distribute_submit (VTASK * vtask, void * userinfo)
{
VTASK * entity = NULL,
* vtcon = NULL;
ENTITY * mentity = NULL;
CmppEntity * cment = NULL;
CmppCon * cmcon = NULL;
int i, num, j, k;
SubmitPending * item = NULL;
CmppPdu * cmpdu = NULL;
int sendNum = 0, vtsent = 0;
struct tm * curtime;
time_t calendt;
uint32 ufmt = 0;
if (!vtask) return 0;
#ifdef _DEBUG
info("distribute_submit: vtask %s begin.\n", vtask->name);
#endif
mentity = getEntity (vtask);
entity = (VTASK *)mentity;
cment = (CmppEntity *)entity->var;
if (cment->total_avail - cment->consumedwin <= 0)
return 0;
if (get_submit_num(entity) == 0)
return 0;
num = sk_num (mentity->busyConnections);
for (i=0; i<num; i++) {
vtcon = (VTASK *)sk_value(mentity->busyConnections, i);
if (vtcon->state != cmpp_ready)
continue;
cmcon = (CmppCon *)vtcon->var;
#ifdef _DEBUG
info("distribute via %s, availwin=%d, is_mt=%s, totalavail=%d consumedwin=%d\n",
vtcon->name, cmcon->availwin, cmcon->is_mo?"no":"yes",
cment->total_avail, cment->consumedwin);
#endif
if (cmcon->availwin <= 0)
continue;
if (cmcon->is_mo)
continue;
for (j=0, vtsent=0; j<cmcon->sliding_len; j++) {
item = &cmcon->subpdu[j];
if (item->status != SUBMIT_AVAIL)
continue;
item->submit = cmpdu = fetch_submit (vtask);
if (!cmpdu) {
/*cment->total_avail -= sendNum;*/
cment->consumedwin += sendNum;
return 0;
}
/* some default parameters not filled by upper layer,
* these paras are fetched from configuration file.
* now fill them here. */
memcpy(cmpdu->Submit.msg_src, cmcon->sp_id,
sizeof(cmpdu->Submit.msg_src));
time(&calendt);
curtime = localtime(&calendt);
/* construct time format of MSG_ID */
ufmt = 0;
ufmt = ufmt | ((curtime->tm_sec & 0x3f) << 7);
ufmt = ufmt | ((curtime->tm_min & 0x1f) << 13);
ufmt = ufmt | ((curtime->tm_hour & 0x1f) << 18);
ufmt = ufmt | ((curtime->tm_mday & 0x1f) << 23);
ufmt = ufmt | ((curtime->tm_mon & 0x0f) << 28);
for (k=3; k>=0; k--) {
cmpdu->Submit.msg_id[k] = (uint8)(ufmt & 0xff);
ufmt = ufmt >> 8;
}
/*memcpy(&cmpdu->Submit.msg_id[0], (uint8 *)&ufmt, 4);*/
ufmt = cmpdu->sequence_id & 0x0000ffff;
for (k=7; k>=4; k--) {
cmpdu->Submit.msg_id[k] = (uint8)(ufmt & 0xff);
ufmt = ufmt >> 8;
}
/*memcpy(&cmpdu->Submit.msg_id[4], (uint8 *)&ufmt, 4);*/
emptyFrame(item->submitframe);
cmpdu_encode(cmpdu, &item->submitframe);
item->retrynum = 0;
item->status = SUBMIT_PRESEND;
#ifdef _DEBUG
info("distribute procedure: %d bytes submit data to be sent:\n",
frameLength(item->submitframe));
/*printFrame(INFO, item->submitframe, 3);*/
#endif
if (sendTcpCon (cmcon->fd, item->submitframe) < 0) {
error("distribute_submit: CMPP connection %s to "
"ISMG %s:%d crashed while reading length.\n",
vtcon->name, cment->remote_host, cment->remote_port);
vtcon->state = cmpp_null;
checkIdleConnection(vtcon);
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -