?? smgp3.c
字號:
}
}
else
{
INFO("[INFO-OnlyRecv:%u] RECV DATA %d\n", threadsnum, recv_len);
buffersize += recv_len;
}
}
}
// INFO("[INFO-RECV:%u] Buffersize:%d resqnum:%d\n", threadsnum , buffersize, resqnum);
// if( (buffersize >= LEN_SGIP_HEADER ) && (resqnum >0) )
while(buffersize>=LEN_SMGP_HEAD)
{
n = GetMessageHead( socketbuffer, &Body_Length, &Command_ID, &R_Seq);
buffersize = buffersize - LEN_SMGP_HEAD;
INFO("[INFO-OnlyRecv:%u] buffsersize:%d Command_ID:%08x Body_Length:%d \n", threadsnum, buffersize, Command_ID, Body_Length);
memmove(socketbuffer , socketbuffer+LEN_SMGP_HEAD, buffersize);
if( buffersize < Body_Length )
{
usleep(1000*500);
recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
if(recv_len> 0)
buffersize += recv_len;
INFO("[INFO-OnlyRecv:%u] buffersize < Body_Length,restart recv, buffersize=%d\n", threadsnum, buffersize);
}
if(Command_ID == ID_SMGP_LOGIN_RESP) //登陸請求響應(yīng)
{
// Bind Resp
n = RecvLoginResp(socketbuffer, Body_Length, &Status, &Version);
buffersize -= Body_Length;
memmove(socketbuffer, socketbuffer+Body_Length, buffersize );
INFO("[INFO-OnlyRecv:%u] login resp Recv ,chang login status lock\n", threadsnum );
if(Status == 0)
{
INFO("[INFO-OnlyRecv:%u] login resp Recv ,chang login status unlock.\n", threadsnum );
}
else
{
close(Rsocket_fd);
Rsocket_fd = -1;
INFO("[INFO-OnlyRecv:%u] login resp Recv ,Status :%d login err \n", threadsnum, Status);
}
}
else if(Command_ID == ID_SMGP_ACTIVE_TEST )
{
if (ActiveTestResp(Rsocket_fd, R_Seq) != 0) {
INFO("[INFO-OnlyRecv:%u]: ERR:SEND ACTIVE_TEST_RESP FAILED\n", threadsnum);
close(Rsocket_fd);
Rsocket_fd = -1;
}
INFO("[INFO-OnlyRecv:%u]: CONNECTION IS OK. R: 0x%08x\n", threadsnum, R_Seq);
}
else if(Command_ID == ID_SMGP_ACTIVE_TEST_RESP)
{
INFO("[INFO-OnlyRecv:%u]: CONNECTION IS OK. S: 0x%08x NSR(未響應(yīng)): %-4d\n", threadsnum, R_Seq, NeedSubmitResp);
}
else if(Command_ID == ID_SMGP_SUBMIT_RESP)
{
INFO("[INFO-OnlyRecv:%u]:[ERR] only for recving MO or REPORT", threadsnum);
}
else if(Command_ID == ID_SMGP_DELIVER)
{
memset(&deliver, 0, sizeof(deliver));
if (DeliverGet(socketbuffer, &deliver,Body_Length) != 0) {
INFO("[INFO-OnlyRecv:%u][-ERR]: RECV DELIVER FAILED\n", threadsnum);
close(Rsocket_fd);
Rsocket_fd = -1;
}
buffersize -= Body_Length;
if(buffersize < 0)
{
INFO("[INFO-OnlyRecv:%u][-ERR]: buffersize<0, exit\n", threadsnum);
exit(0);
}
memmove(socketbuffer, socketbuffer+Body_Length, buffersize );
//如果接收號碼有118,去掉后再插入recv_q
INFO("[INFO-OnlyRecv:%u]before:DestTermID=%s\n", threadsnum, deliver.DestTermID);
if ( strncmp(deliver.DestTermID,"118",3) == 0 ) {
char dest[21+1];
memset(dest,0,22);
strcpy(dest,deliver.DestTermID);
memset(deliver.DestTermID,0,22);
strcpy(deliver.DestTermID,dest+3);
}
INFO("[INFO-OnlyRecv:%u]after:DestTermID=%s deliver.IsReport:%d\n", threadsnum, deliver.DestTermID,deliver.IsReport);
if (deliver.IsReport == 0)
{
memset(sqlbuf, 0, sizeof(sqlbuf));
mysql_escape_string(sqlbuf, deliver.MsgContent, strlen(deliver.MsgContent)) ;
//INFO("[INFO-RECV:%u] DEBUG LinkID:[%s]\n",threadsnum, LinkID);
memset(LinkID, 0, sizeof(LinkID));
SubmitType=0;
if(unparse_packet_1(deliver.Paramater, deliver.ParamaterLen,LinkID, &SubmitType,&DealResult) !=0 )
{
INFO("[INFO-OnlyRecv:%u][ERR]: unparse_packet_1 ERR %s\n", threadsnum, deliver.Paramater);
break;
}
INFO("[INFO-RECV:%u] SubmitType:[%d] LinkID:[%s]\n",threadsnum,SubmitType, LinkID);
if( SubmitType==13 || SubmitType==15)
{
if( SubmitType==13 )
{
sprintf(query_string, "insert into %s.recv_q values(NULL,'%s','%s',now(),'%s','0','%s','4','0','%s')",
send_q_db,deliver.SrcTermID, sqlbuf, deliver.DestTermID,LinkID,"13" );
}
else if( SubmitType==15 )
{
sprintf(query_string, "insert into %s.recv_q values(NULL,'%s','%s',now(),'%s','0','%s','4','0','%s')",
send_q_db,deliver.SrcTermID, sqlbuf, deliver.DestTermID,LinkID,"15" );
}
INFO("[INFO-OnlyRecv:%u]: SubmitType is 13 or 15:%s", threadsnum, query_string );
ret= mysql_real_query(&recv_mysql, query_string, strlen(query_string));
if ( ret !=0 )
{
INFO("[-ERR]: sql_query: %s\n", mysql_error (&recv_mysql));
exit(-1);
}
} else
if( SubmitType==5 ) //包月扣費結(jié)果通知.
{
//MsgContent:09918061574 ,DXXDG ,20060601,20060630,0,001500
//oper databases; insert smc_result.
char smcBuf[256];
char mobile[21]="";
char service[20]="";
char date1[16]="";
char date2[16]="";
char status[6]="";
char feecode[8]="";
int ret2=1;
memset( smcBuf ,0 ,sizeof(smcBuf));
strcpy( smcBuf ,deliver.MsgContent );
INFO("[INFO-OnlyRecv:%u][BYKF]: SrcTermID:%s ,DestTermID:%s ,LinkID:%s smcBuf=%s\n",
threadsnum, deliver.SrcTermID, deliver.DestTermID ,LinkID ,smcBuf);
ret2=GetSmcNotice( smcBuf , mobile , service ,date1 ,date2 ,status,feecode );
//---------------------------------add by 2006-06-12
sprintf(query_string, "insert into %s.smc_result values(NULL,now(),'%s','%s','%s','%s','%s','%s','')",
send_q_db, mobile ,service ,feecode ,status,date1 ,date2);
INFO("[INFO-OnlyRecv:%u]: SubmitType is 5:%s", threadsnum, query_string );
ret= mysql_real_query(&recv_mysql, query_string, strlen(query_string));
if ( ret !=0 )
{
INFO("[-ERR]: sql_query: %s\n", mysql_error (&recv_mysql));
exit(-1);
}
}
else //MO
{
if(strlen(deliver.SrcTermID) < 8)
{
sprintf(query_string, "insert into %s.recv_q values(NULL,'%s','%s',now(),'%s','0','%s','5','0','%s')",
send_q_db,deliver.SrcTermID, sqlbuf, deliver.DestTermID,LinkID,"0" );
INFO("[INFO-OnlyRecv:%u]: SQL query:%s\n", threadsnum, query_string );
}
else{
sprintf(query_string, "insert into %s.recv_q values(NULL,'%s','%s',now(),'%s','0','%s','0','0','0')",
send_q_db,deliver.SrcTermID, sqlbuf, deliver.DestTermID, LinkID );
INFO("[INFO-OnlyRecv:%u]: MO:%s", threadsnum, query_string );
}
ret= mysql_real_query(&recv_mysql, query_string, strlen(query_string));
if ( ret !=0 )
{
INFO("[-ERR]: sql_query: %s\n", mysql_error (&recv_mysql));
exit(-1);
}
}
}
else { //狀態(tài)報告
char stat[8];
char err[8];
char temp[21];
char MsgID[11];
memset(stat,0,sizeof(stat));
memset(temp,0,sizeof(temp));
memset(MsgID,0,sizeof(MsgID));
memset(err, 0, sizeof(err));
n = GetStatus(deliver.MsgContent, MsgID, stat, err);
BCD2ASCII(temp, MsgID,10);
if (n == 1) { // 取狀態(tài)報告成功
INFO("[INFO-OnlyRecv:%u]: REPORT: stat: %s err: %s msgid: %s\n", threadsnum, stat, err, temp);
sprintf(query_string, "insert into %s.report_q(rq_id, rq_sequence, rq_mobile, rq_state, rq_code, rq_date, rq_provider, rq_flag)"
"values(NULL,'%s','%s','0','%s',now(),'0', '0')", send_q_db, temp, deliver.SrcTermID, err);
INFO("[INFO-OnlyRecv:%u]: REPORT SQL:%s\n", threadsnum, query_string);
ret = mysql_real_query(&recv_mysql, query_string, strlen(query_string));
if ( ret !=0 ) {
INFO("[INFO-OnlyRecv:%u][-ERR]: fsql_query: %s\n", threadsnum, mysql_error (&recv_mysql));
exit(-1);
}
} else {
BCD2ASCII(temp, deliver.MsgID, 10);
INFO("[INFO-OnlyRecv:%u][-ERR]: DELIVER: MsgID: %s SRC: %s DEST: %s ISREPORT: %d content=%s\n",
threadsnum, temp, deliver.SrcTermID, deliver.DestTermID, deliver.IsReport, deliver.MsgContent);
}
}
if (DeliverResp(Rsocket_fd, R_Seq, deliver.MsgID, 0) != 0) {
INFO("[INFO-OnlyRecv:%u][-ERR]: SEND DELIVER_RESP FAILED\n", threadsnum);
close(Rsocket_fd);
Rsocket_fd = -1;
}
}
else
{
// other data
INFO("[INFO-OnlyRecv:%u] Recv unknown data, Command_ID:%08x ,Body_Length :%d \n", threadsnum, Command_ID, Body_Length);
if(Body_Length >0)
{
buffersize -= Body_Length;
memmove(socketbuffer, socketbuffer + Body_Length , buffersize);
}
else
{
close(Rsocket_fd);
Rsocket_fd = -1;
}
}
}
while(Rsocket_fd <=0)
{
Rsocket_fd = ConnectISMG(R_Seq, cn, threadsnum, smg_recv_type);
INFO("[INFO-OnlyRecv:%u] Rsocket_fd :%d\n", threadsnum , Rsocket_fd);
if(Rsocket_fd >0)
{
INFO("[INFO-OnlyRecv:%u] ConnectISMG SUCCESS\n", threadsnum);
}
else
{
recvtime.tv_sec = time(NULL)+5;
recvtime.tv_nsec = 0;
pthread_mutex_lock(&recvlock);
ret = pthread_cond_timedwait(&recvcond, &recvlock,(const struct timespec *)&recvtime);
if(ret !=0 )
{
INFO("[INFO-RECV:%u]: Connect ISMG failed , wait 5 sec .\n", threadsnum);
pthread_mutex_unlock(&recvlock);
}
}
}
if(buffersize ==0 )
{
recvtime.tv_sec = time(NULL)+1;
recvtime.tv_nsec = 0;
pthread_mutex_lock(&recvlock);
ret = pthread_cond_timedwait(&recvcond, &recvlock, (const struct timespec *)&recvtime);
if(ret !=0 )
{
INFO("[INFO-OnlyRecv:%u]: NO data ,wait 1 sec .\n", threadsnum);
pthread_mutex_unlock(&recvlock);
}
}
}
}
void *Recv(int *argu)
{
int n=0;
int Status=0,ret = 0;
int Version = 0;
unsigned char t_MsgID[40],MsgID[40];
SMGP_DELIVER deliver;
int Rsocket_fd=0, recv_len= 0;
char bufferrecv[1024]="";
unsigned int Command_ID=0;
pthread_t threadsnum=0;
unsigned int R_Seq = 0;
unsigned int Body_Length=0;
int cn= 0, connectionnum = 0,i =0;
struct timespec recvtime;
pthread_mutex_t recvlock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t recvcond = PTHREAD_COND_INITIALIZER;
char *socketbuffer = NULL;
int bufferoffset =0, buffersize=0;
RES_Q resqtmp;
struct tm *tp;
time_t timep;
int resqnum =0;
char sqlbuf[1024];
char query_string[1024];
MYSQL recv_mysql;
unsigned char DealResult =0;
unsigned char LinkID[21];
unsigned char SubmitType=0;
threadsnum = pthread_self();
pthread_detach(threadsnum);
cn = argu[1]; // 連接號
INFO("[INFO-RECV:%u] RECV Thread START\n", threadsnum);
/*連接recv_q, report_q所在數(shù)據(jù)庫*/
if(!mysql_init(&recv_mysql))
{
INFO("[INFO-RECV:%u][-ERR} init mysql error \n", threadsnum);
exit(-1);
}
if (!mysql_real_connect(&recv_mysql, send_q_host, send_q_user, send_q_pass, send_q_db, send_q_port, NULL, 0) )
{
INFO("[INFO-RECV:%u][-ERR] sql_connect: %s\n", mysql_error(&recv_mysql));
exit(-1);
}
socketbuffer = malloc(socketbuffersize);
if(socketbuffer == NULL)
{
INFO("[INFO-RECV:%u] malloc error .\n", threadsnum);
socketbuffer = malloc(socketbuffersize);
}
buffersize = 0;
INFO("[INFO-RECV:%u]: Recv thread start!\n", threadsnum);
while(1)
{
//INFO("[INFO-RECV:]1Rsocket_fd = %d\n", Rsocket_fd);
if( HaveData(Rsocket_fd) > 0 )
{
// have data to receive
INFO("[INFO-RECV:] have-data-to-receive\n");
//INFO("[INFO-RECV:]2Rsocket_fd = %d\n", Rsocket_fd);
if( buffersize < socketbuffersize)
{
//INFO("[INFO-RECV:]3Rsocket_fd = %d\n", Rsocket_fd);
//INFO("[INFO-RECV:]socketbuffersize-buffersize-1 = %d\n", socketbuffersize-buffersize-1);
//INFO("[INFO-RECV:]buffersize = %d\n", buffersize);
//INFO("[INFO-RECV:]socketbuffer = %s\n", socketbuffer);
recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
//INFO("[INFO-RECV:]4Rsocket_fd = %d\n", Rsocket_fd);
if(recv_len <= 0)
{
// reconnect
INFO("[INFO-RECV:] errno = %d\n", errno);
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -