cngp.c
字號:
}
else if(Is_Report == 1)
{
char err[4]="";
int status =0;
parse_msgid (Msg_Content + 3, msgid);
//memset( msgid ,0, sizeof(msgid));
//strncpy( msgid,Msg_Content+3,20);
err[0] = *(Msg_Content + 93); //93 err code
err[1] = *(Msg_Content + 94); //93 err code
err[2] = *(Msg_Content + 95); //93 err code
err[3] = '\0';
INFO("Msg_Content=%s\n", Msg_Content ); // 14 = strlen("id:") + msgid length+ space
INFO("Msg_msgid=%s\n", msgid );
INFO("err = %s\n", err);
if (Command_Status != 0)
status = 2;
sprintf (query_string, "insert into %s.report_q (rq_id, rq_sequence,rq_mobile, rq_state, rq_code, rq_date, rq_provider, rq_flag) values(NULL,'%s','%s','%d','%s',now(),'0')", send_q_db, msgid, Src_Term_ID, status, err); //Src_Term_ID is send cellphone code,
INFO("[INFO-RECVMO:%u] Receive report sql: %s\n",query_string,threadsnum);
ii = mysql_real_query (&fsql_mo, query_string, strlen (query_string));
if(ii !=0)
{
INFO("[INFO-RECVMO:%u] ERROR:mysql_real_query :%s\n",threadsnum , mysql_error(&fsql_mo));
}
else
INFO("[INFO-RECVMO:%u] INSERT:mysql_affected_rows:%d\n", threadsnum, mysql_affected_rows(&fsql_mo) );
}
else{
// wrong type
INFO("[INFO-RECVMO:%u] Error Command_ID=>Command_ID=%8x\n", threadsnum, Command_ID);
}
}
else
{
// other data
INFO("[INFO-RECVMO:%u] Recv unknown data, Command_ID:%08x ,Body_Length :%d \n", threadsnum, Command_ID, Body_Length);
if(Body_Length >0)
{
buffersize -= Body_Length;
memmove( socketbuffer, socketbuffer + Body_Length , buffersize );
}
}
}
while( loginstatus == 0 )
{
Rsocket_fd = ConnectISMG(CNGP_IP, recvmoport, LOGIN_NAME, LOGIN_PASSWORD, Sequence_Num, recvmomode, recvmoversion, threadsnum);
INFO("[INFO-RECVMO:%u] ConnectISMG socket :%d\n", threadsnum, Rsocket_fd);
if( Rsocket_fd > 0 )
{
loginstatus = 0;
}
else
{
recvmotime.tv_sec = time(NULL)+5;
recvmotime.tv_nsec = 0;
pthread_mutex_lock(&recvmolock);
ret = pthread_cond_timedwait( &recvmocond, &recvmolock, (const struct timespec *)&recvmotime );
if(ret !=0 )
{
INFO("[INFO-RECVMO:%u]: Connect ISMG failed , wait 5 sec .\n", threadsnum);
pthread_mutex_unlock(&recvmolock);
}
}
}
timeend = time(NULL);
if( loginstatus == 2 && (timeend-times) >= active_time)
{
if( Sequence_Num == 0xffffffff)
Sequence_Num = 1;
else
Sequence_Num ++;
ret= CNGP_Active_Test(Rsocket_fd, Sequence_Num);
INFO("[INFO-RECVMO:%u] Active test send %d\n", threadsnum, ret);
times = time(NULL);
}
}
}
void *Recv_Res(int *argu)
{
int n=0,ii=0;
int ret = 0;
int datatmp=0;
int Rsocket_fd=0, recv_len= 0;
unsigned long Command_ID=0;
pthread_t threadsnum=0;
unsigned long Body_Length=0;
int cn= 0;
struct timespec recvtime;
pthread_mutex_t recvlock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t recvcond = PTHREAD_COND_INITIALIZER;
char *socketbuffer = NULL;
int buffersize=0;
RES_Q resqtmp;
struct tm *tp;
time_t timep;
time_t lastLoginTime;
time_t nowTime;
unsigned long icount=0;
int resqnum =0;
int recvport =0, recvversion =0;
unsigned long Sequence_Num=1;
unsigned long SUB_Sequence_Num=0;
int return_id =0;
char Msg_Content[500]="", query_string[1024],sqlbuf[1024];
char Version;
MYSQL fsql_res;
int recvmode = 0;
unsigned long Command_Status =0;
char Msg_ID[30]="", Recv_Time[25] ="",Src_Term_ID[32] ="", Dest_Term_ID[32] ="", tmp[500]="";
char ContentPackType[20]="",IsmpPackType[20]="",msgid[30]="";
int Is_Report=0,Msg_Length=0;
TLV Congestion_State = { 0x0428, 1, 35 };
char RerverseId[30]="",ErrorCode[32]="";
threadsnum = pthread_self();
pthread_detach(threadsnum);
mysql_thread_init();
recvport = argu[0]; // Port
cn = argu[1]; // 連接號
recvversion = argu[2]; // Version
recvmode = argu[3]; // mode
INFO("[INFO-RECV:%u] RECV Thread START\n", threadsnum);
if(!mysql_init(&fsql_res))
{
INFO("[-ERR:RECV:%u] init error \n", threadsnum);
exit(EXIT_FAILURE);
}
if (!mysql_real_connect(&fsql_res, send_q_host, send_q_user, send_q_pass, send_q_db, send_q_port, NULL, 0) )
{
INFO("[-ERR:RECV:%u] RECV MO sql_connect: %s\n", threadsnum, mysql_error(&fsql_res));
exit(EXIT_FAILURE);
}
socketbuffer = malloc(socketbuffersize);
if(socketbuffer == NULL)
{
INFO("[INFO-RECV:%u] malloc error .\n", threadsnum);
socketbuffer = malloc(socketbuffersize);
}
buffersize = 0;
INFO("[INFO-RECV:%u]: Recv thread start!\n", threadsnum);
while(Rsocket_fd <=0)
{
if ( threads[cn][0] >0 )
{
close(threads[cn][0]);
INFO("[INFO-RECV:%u]: sleep5 ready reconnect....\n", threadsnum);
sleep(5);
}
Rsocket_fd = ConnectISMG(CNGP_IP, recvport, LOGIN_NAME, LOGIN_PASSWORD, Sequence_Num, recvmode, recvversion, threadsnum);
if(Rsocket_fd > 0)
{
time(&lastLoginTime); //保存上次登錄的時間.
pthread_mutex_lock(&cnum_lock[cn]);
threads[cn][0] = Rsocket_fd;
pthread_mutex_unlock(&cnum_lock[cn]);
INFO("[INFO-RECV:%u] ConnectISMG socket :%d\n", threadsnum, Rsocket_fd);
}
else{
recvtime.tv_sec = time(NULL)+5;
recvtime.tv_nsec = 0;
pthread_mutex_lock(&recvlock);
ret = pthread_cond_timedwait(&recvcond, &recvlock, (const struct timespec *)&recvtime);
if(ret !=0 )
{
INFO("[INFO-RECV:%u]: Connect to ISMG Error , wait 5 Sec .\n", threadsnum);
pthread_mutex_unlock(&recvlock);
}
}
}
icount=0;
while(1)
{
icount++;
if( HaveDataTimeout(Rsocket_fd, timeoutu) > 0 )
{
//INFO("[INFO-RECV:%u] ----RecvHaveData--- \n",threadsnum);
// have data to receive
if( buffersize < socketbuffersize)
{
recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
if(recv_len < 0) // <= 0)
{
int fdtmp=0;
// reconnect
INFO("[INFO-RECV:%u] recv thread cannot receive data from socket, reconnect. recv_len:%d\n",threadsnum, recv_len);
INFO("[INFO-RECV:%u] Reset login status lock\n", threadsnum);
fdtmp =threads[cn][0];
pthread_mutex_lock(&cnum_lock[cn]);
threads[cn][1] = 0; // login status
threads[cn][2] = 0; // need submit resp
threads[cn][0] = 0; // socket
pthread_mutex_unlock(&cnum_lock[cn]);
INFO("[INFO-RECV:%u] Reset login status unlock\n", threadsnum);
CNGP_Exit(Rsocket_fd, SUB_Sequence_Num);
close(Rsocket_fd);
close(datatmp);
sleep(5);
INFO("[INFO-RECV:%u] Buffersize == %d\n", threadsnum, buffersize);
if( 0 == buffersize )
{
INFO("[INFO-RECV:%u] Buffersize ==0 , reconnect\n", threadsnum);
Sequence_Num =1;
Rsocket_fd = ConnectISMG(CNGP_IP, recvport, LOGIN_NAME, LOGIN_PASSWORD, Sequence_Num, recvmode, recvversion, threadsnum);
if(Rsocket_fd > 0)
{
time(&lastLoginTime);
pthread_mutex_lock(&cnum_lock[cn]);
threads[cn][0] = Rsocket_fd;
pthread_mutex_unlock(&cnum_lock[cn]);
}
//模擬 response
INFO("[INFO-RECV:%u] simulate resp lock\n", threadsnum );
pthread_mutex_lock(&sendq_lock[cn]);
pthread_mutex_lock(&resq_lock[cn]);
n = simulateres(SENDQ[cn], RESQ[cn]);
pthread_mutex_unlock(&resq_lock[cn]);
pthread_mutex_unlock(&sendq_lock[cn]);
INFO("[INFO-RECV:%u] simulate resp unlock simulate:%d\n", threadsnum ,n);
//修改等待resp數量
INFO("[INFO-RECV:%u] change need sub resp num lock\n", threadsnum);
pthread_mutex_lock(&cnum_lock[cn]);
threads[cn][2] -= n;
pthread_mutex_unlock(&cnum_lock[cn]);
INFO("[INFO-RECV:%u] change need sub resp num lock\n", threadsnum);
}
}
else
{
if( 0== recv_len )
{
if( icount%7==0 )
INFO("[INFO-RECV:%u] RECV DATA %d\n", threadsnum, recv_len);
}else
{
INFO("[INFO-RECV:%u] RECV DATA %d\n", threadsnum, recv_len);
}
buffersize += recv_len;
}
}
}
resqnum = checkresqnum(RESQ[cn]);
while( (buffersize >= LEN_CNGP_HEADER) && (resqnum >0))
{
n = CNGP_HEADER_Recv_mem(socketbuffer, &Body_Length, &Command_ID, &Command_Status, &SUB_Sequence_Num);
buffersize = buffersize - LEN_CNGP_HEADER;
INFO("[INFO-RECV:%u] buffsersize:%d Command_ID:%08x Body_Length:%d SUB_Sequence_Num:%u\n", threadsnum, buffersize, Command_ID, Body_Length, SUB_Sequence_Num);
memmove(socketbuffer , socketbuffer+LEN_CNGP_HEADER, buffersize);
if( (buffersize < Body_Length) && (threads[cn][1] ==2) )
{
recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
if(recv_len> 0){
buffersize += recv_len;
}
}
if(Command_ID == ID_CNGP_ACTIVE_TEST_RESP)
{
// ACTIVE_TEST_RESP
INFO("[INFO-RECV:%u] Receive Active_Test_Resp .\n", threadsnum);
}
else if(Command_ID == ID_CNGP_ACTIVE_TEST)
{
// ACTIVE_TEST
INFO("[INFO-RECV:%u] Receive Active_Test .\n", threadsnum);
CNGP_Active_TestRes(Rsocket_fd, SUB_Sequence_Num);
INFO("[INFO-RECV:%u] Send Active_Test_Resp.\n", threadsnum);
}
else if( Command_ID == ID_CNGP_LOGIN_RESP)
{
CNGP_Receive_LoginRes_mem(socketbuffer, Body_Length, &Version);
buffersize = buffersize - Body_Length;
memmove(socketbuffer, socketbuffer+Body_Length, buffersize);
if( 0 == Command_Status)
{
pthread_mutex_lock(&cnum_lock[cn]);
threads[cn][1] = 2; // login status
threads[cn][2] = 0; // need submit resp
pthread_mutex_unlock(&cnum_lock[cn]);
Rsocket_fd = threads[cn][0];
INFO("[INFO-RECV:%u] connect resp Recv OK,chang login status .\n", threadsnum );
}
else
{
INFO("[INFO-RECV:%u] connect resp Recv ,Status :%d login err .lock\n", threadsnum, Command_Status);
if( threads[cn][0]>0)
close(threads[cn][0]);
close(Rsocket_fd);
buffersize=0;
pthread_mutex_lock(&cnum_lock[cn]);
threads[cn][0] = 0; // socket
threads[cn][1] = 0; // not login
threads[cn][2] = 0; // reset need submit resp
pthread_mutex_unlock(&cnum_lock[cn]);
INFO("[INFO-RECV:%u] connect resp Recv ,login err . unlock\n", threadsnum );
sleep(5);
}
}
else if(Command_ID == ID_CNGP_DELIVER)
{
memset (Msg_ID, 0, 10);
memset (Recv_Time, 0, 20);
memset (Src_Term_ID, 0, 21);
memset (Dest_Term_ID, 0, 21);
memset (Msg_Content, 0, sizeof(Msg_Content) );
memset (tmp, 0, 500);
memset (ContentPackType,0,16);
memset (IsmpPackType,0,16);
CNGP_Receive_Deliver_mem( socketbuffer, Body_Length, Msg_ID, &Is_Report, Recv_Time, Src_Term_ID, Dest_Term_ID, &Msg_Length, Msg_Content);
//buffersize = buffersize - Body_Length;
if( buffersize < Body_Length)
{
int ki=0;
for(ki=0;ki<5;ki++)
{
recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
if( recv_len >0 )
ki=99;
else
usleep(200*1000);
}
if( recv_len > 0)
{
buffersize += recv_len;
CNGP_Receive_Deliver_mem( socketbuffer, Body_Length, Msg_ID, &Is_Report, Recv_Time, Src_Term_ID, Dest_Term_ID, &Msg_Length, Msg_Content);
}
else{
#if 1
INFO("[INFO-RECV:%u] Deliver error packet.\n",threadsnum);
//INFO("[INFO-RECV:%u] error packet content:%s\n",threadsnum,socketbuffer);
memset( socketbuffer,0,socketbuffersize );
buffersize=0;
INFO("[INFO-RECV:%u] not handle error packet.\n",threadsnum);
INFO("[INFO-RECV:%u] simulate resp lock\n", threadsnum );
pthread_mutex_lock(&sendq_lock[cn]);
pthread_mutex_lock(&resq_lock[cn]);
n = simulateres(SENDQ[cn], RESQ[cn]);
pthread_mutex_unlock(&resq_lock[cn]);
pthread_mutex_unlock(&sendq_lock[cn]);
INFO("[INFO-RECV:%u] simulate resp u
快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -