?? itgrecv.cpp
字號:
/* Component of the D-ITG 2.4 Platform
*
*
* copyright : (C) 2004 by Stefano Avallone, Alessio Botta, Donato Emma,
* Salvatore Guadagno, Antonio Pescape'
* DIS Dipartimento di Informatica e Sistemistica
* (Computer Science Department)
* University of Naples "Federico II"
* email: : {stavallo, pescape}@unina.it, {abotta, demma, sguadagno}@napoli.consorzio-cini.it
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*/
#include "../common/ITG.h"
#include "../common/thread.h"
#include "../common/pipes.h"
#include "ITGRecv.h"
#include "data.h"
#ifdef WIN32
#include <math.h>
#endif
#ifdef LINUX_OS
#include <sys/uio.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <wait.h>
#endif
struct addrinfo *globaleLogHost;
int globaleLogSock = 0;
int globaleLogSockSignaling = 0;
int sockSignaling;
int flagTerm = NO_TERMINATE;
int logCheck = 0;
int logRemote = 0;
ofstream out;
char logFile[DIM_LOG_FILE];
BYTE l4ProtoLog = DEFAULT_PROTOCOL_TX_LOG;
manageLogFile memLogFile[MAX_NUM_THREAD];
bool setPriority = false;
char nameProgram[]="ITGRecv";
#ifdef WIN32
const char DEFAULT_LOG_FILE[] = "ITGRecv.log";
HANDLE mutexLog;
int userId = 0;
#endif
#ifdef LINUX_OS
const char DEFAULT_LOG_FILE[] = "/tmp/ITGRecv.log";
uid_t userId;
pthread_mutex_t mutexLog;
#endif
void reportErrorAndExit(char *function, char *program, char *msg)
{
if (flagTerm == NO_TERMINATE)
{
printf("\n** ERROR_TERMINATE **\n");
printf("Function %s aborted caused by %s \n",function,program);
printf("** %s ** \n",msg);
terminate(SIGTERM);
}
else if (flagTerm == ERROR_TERMINATE)
{
printf("\n** ERROR_TERMINATE IN TERMINATE **\n");
printf("Function %s aborted caused by %s \n",function,program);
printf("** %s ** \n",msg);
printf("Finish with error in terminate!\n");
exit(1);
}
else
sleep(INFINITE);
}
void terminate(int sign)
{
if (flagTerm == NO_TERMINATE) {
flagTerm = TERMINATE;
#ifdef DEBUG
printf("** Terminate function ***\n");
#endif
if (logCheck == 1) out.close();
else if (logRemote ==1)
{
signaling signalingLog;
signalingLog.stop = true;
if ( sendto(globaleLogSockSignaling, (char *) &signalingLog, sizeof(signalingLog), 0,
globaleLogHost->ai_addr, globaleLogHost->ai_addrlen) < 0) {
flagTerm = ERROR_TERMINATE;
reportErrorAndExit("terminate","sendto","Cannot send to LogServer info about stopping signaling");
}
#ifdef DEBUG
printf("Signaling Log STOP sent to Log Server \n");
#endif
if ( closeSock(globaleLogSock) < 0) {
flagTerm = ERROR_TERMINATE;
reportErrorAndExit("terminate","closeSock","Cannot close socket logSock");
}
if ( closeSock(globaleLogSockSignaling) < 0) {
flagTerm = ERROR_TERMINATE;
reportErrorAndExit("terminate","closeSock","Cannot close socket logSockSignaling");
}
}
if ( MUTEX_THREAD_RELEASE(mutexLog) < 0) {
flagTerm = ERROR_TERMINATE;
reportErrorAndExit("terminate","MUTEX_THREAD_RELEASE","Cannot release Log Mutex");
}
#ifdef WIN32
if ( WSACleanup() != 0) {
flagTerm = ERROR_TERMINATE;
reportErrorAndExit("terminate","WSACleanup","Cannot clean WSA");
}
#endif
if (sign == SIGINT)
printf("Finish with CRTL-C! \n");
else if (sign == SIGTERM)
printf("Finish requested caused by errors! \n");
}
exit(1);
}
char *allowedLogFile(char logFile[DIM_LOG_FILE])
{
int i = 0;
bool find = true;
while ((i < MAX_NUM_THREAD) && (find == true)) {
if (strcmp(memLogFile[i].logFile, logFile) == 0) {
find = false;
memLogFile[i].num++;
return NULL;
} else
i++;
}
i = 0;
while (memLogFile[i].num != -1)
i++;
memLogFile[i].out.open(logFile, ios::out | ios::binary | ios::trunc);
if (!memLogFile[i].out) {
char* tail = (char *) malloc(sizeof("Error into open this file : ") + sizeof(logFile));
if (tail == NULL)
reportErrorAndExit("allowedLogFile","malloc","Insifficient memory available");
sprintf(tail,"Error into open this file : %s",logFile);
reportErrorAndExit("allowedLogFile","open",tail);
}
memLogFile[i].num = 1;
strcpy(memLogFile[i].logFile, logFile);
return (char *) &memLogFile[i].out;
}
void closeFileLog(ofstream * out)
{
int i = 0;
while (out != (ofstream *) & memLogFile[i].out)
i++;
memLogFile[i].num--;
if (memLogFile[i].num == 0) {
memLogFile[i].num = -1;
strcpy(memLogFile[i].logFile, " ");
(*out).close();
}
}
int sendAck(int signaling, BYTE typeMessage)
{
char msg;
putValue(&msg, (void *) &typeMessage, sizeof(typeMessage));
if ( send(signaling, (char *) &msg, sizeof(msg), 0) < 0)
return -1;
else
return 0;
}
int sendAckFlow(int signaling, BYTE typeMessage, int flowId)
{
char msg[sizeof(BYTE) + sizeof(int)];
char *next;
int sizeMessag = sizeof(BYTE) + sizeof (int);
next = putValue(&msg, (void *) &typeMessage, sizeof(typeMessage));
next = putValue(next, (void *) &flowId, sizeof(int));
if (send(signaling, (char *) &msg, sizeMessag, 0) < 0)
return -1;
else
return 0;
}
void *signalManager(void *param)
{
struct addrinfo logHost;
int logSock = 0;
int logSockSignaling = 0;
paramThread *para;
para = (paramThread *) param;
pthread_t hThr[MAX_NUM_THREAD];
#ifdef LINUX_OS
fd_set activeSet;
timeval timeOutFile;
#endif
memChannel flowIdNum[MAX_NUM_THREAD];
paramThread paraThread[MAX_NUM_THREAD];
BYTE type;
int numFlow = 0;
int newSockSignaling = para->socket;
char buffer[1];
bool uscita = false;
int numDiscovery = 0;
char *fileDescriptor = (char*)&out;
char nameFileLog[DIM_LOG_FILE];
BYTE protocolLog;
#ifdef LINUX_OS
int fd, maxfd;
int rPipe[2];
#endif
#ifdef WIN32
HANDLE rPipe[3];
HANDLE fd, namedPipe;
HANDLE events[2];
DWORD available = 0;
DWORD waited;
unsigned long pending;
#endif
for (int i = 0; i < MAX_NUM_THREAD; i++) {
flowIdNum[i].flowId = -1;
hThr[i] = 0;
paraThread[i].flowId = 0;
paraThread[i].count = 0;
paraThread[i].socket = 0;
paraThread[i].socketClose = 0;
}
if (createNewPipe(rPipe) < 0) {
printf("Error in signal to create a new pipe \n");
}
do {
if ( recv(newSockSignaling, (char *) &buffer, sizeof(BYTE), 0) < 0)
reportErrorAndExit("signalManager","recv - connect","Cannot receive newSockSignaling data");
} while (*(BYTE *) buffer != TSP_CONNECT);
#ifdef DEBUG
printf("Signal Manager : Received TSP_CONNECT(1) message\n");
#endif
if ( sendAck(newSockSignaling, TSP_ACK_CONNECT) < 0)
reportErrorAndExit("signalManager","sendAck","Cannot send connect ack");
#ifdef DEBUG
printf("Signal Manager : Sent TSP_ACK_CONNECT(2) message\n");
#endif
fd = rPipe[0];
#ifdef LINUX_OS
maxfd = max( fd, newSockSignaling);
#endif
#ifdef WIN32
events[0] = WSACreateEvent();
WSAEventSelect(newSockSignaling, events[0], FD_READ);
events[1] = rPipe[1];
namedPipe = rPipe[2];
#endif
if (logRemote == 1){
logSockSignaling = globaleLogSockSignaling;
logSock = globaleLogSock;
copia(globaleLogHost, logHost);
}
while (1) {
type = 0;
#ifdef LINUX_OS
FD_ZERO(&activeSet);
FD_SET(fd, &activeSet);
FD_SET((unsigned int)newSockSignaling, &activeSet);
timeOutFile.tv_sec = TIME_OUT;
timeOutFile.tv_usec = 0;
if (select(FD_SETSIZE, &activeSet, NULL, NULL, &timeOutFile) < 0)
reportErrorAndExit("signalManager","select - type","Invalid file descriptor");
if (FD_ISSET(fd, &activeSet)) {
#endif
#ifdef WIN32
waited=WaitForMultipleObjects(2, (const HANDLE*)events,FALSE, TIME_OUT*1000);
ResetEvent(events[0]);
available = 0;
if (PeekNamedPipe(namedPipe, NULL , 0 , NULL , &available , NULL) == 0) {
reportErrorAndExit("signalManager","PeekNamedPipee","Error in peek named Pipe");
}
#ifdef DEBUG
printf("available: %d\n", available);
#endif
while(available > 0) {
#endif
pipeParser(newSockSignaling,numFlow,rPipe,flowIdNum, paraThread, hThr);
#ifdef WIN32
if (PeekNamedPipe(namedPipe, NULL , 0 , NULL , &available , NULL) == 0) {
reportErrorAndExit("signalManager","PeekNamedPipee","Error in peek named Pipe");
}
#endif
}
#ifdef LINUX_OS
else if (FD_ISSET(newSockSignaling, &activeSet)) {
#endif
#ifdef WIN32
pending = 0;
ioctlsocket(newSockSignaling, FIONREAD, &pending);
#ifdef DEBUG
printf("pending - out: %d\n", pending);
#endif
while ((pending > 0) && (uscita!=true)){
#endif
numDiscovery = 0;
if ( recv(newSockSignaling, (char *) &type, sizeof(type), 0) < 0)
reportErrorAndExit("signalManager","recv - type",
"Cannot receive data on newSockSignaling");
if (type == TSP_SEND_NAME_LOG){
recvNameLog(nameFileLog, newSockSignaling);
if (logCheck != 1){
fileDescriptor = allowedLogFile(nameFileLog);
if (fileDescriptor == NULL) {
if (sendAck(newSockSignaling, TSP_ERR_MSG_4) < 0)
reportErrorAndExit("pipeParser","",
"Cannot send TSP_ERR_MSG_4 message to sender");
#ifdef DEBUG
printf("Signal manager : sent TSP_ERR_MSG_4(22) message\n");
#endif
}
else {
if ( sendAck(newSockSignaling, TSP_ACK_SEND_NAME_LOG) < 0)
reportErrorAndExit("recvNameLog","sendAck - type = TSP_ACK_SEND_NAME_LOG(18)",
"Cannot send Ack on newSockSignaling");
#ifdef DEBUG
printf("Signal manager : sent TSP_ACK_SEND_FLOW_LOG(18) message\n");
#endif
logCheck = 2;
}
}
else {
if ( sendAck(newSockSignaling, TSP_ACK_SEND_NAME_LOG) < 0)
reportErrorAndExit("recvNameLog","sendAck - type = TSP_ACK_SEND_NAME_LOG(18)",
"Cannot send Ack on newSockSignaling");
#ifdef DEBUG
printf("Signal manager : sent TSP_ACK_SEND_FLOW_LOG(18) message\n");
#endif
fileDescriptor = (char*)&out;
}
} else if (type == TSP_SEND_FLOW_LOG) {
recvFlowLog(newSockSignaling, logHost, protocolLog, nameFileLog);
if (logRemote != 1) {
createRemoteLogFile(logHost, nameFileLog, protocolLog, logSockSignaling, logSock);
logRemote = 2;
} else {
logSockSignaling = globaleLogSockSignaling;
logSock = globaleLogSock;
copia(globaleLogHost, logHost);
}
if ( sendAck(newSockSignaling, TSP_ACK_SEND_FLOW_LOG) < 0)
reportErrorAndExit("sendFlowLog","sendAck - type = TSP_ACK_SEND_FLOW_LOG(12)",
"Cannot send Ack on newSockSignaling");
#ifdef DEBUG
printf("Signal manager : sent TSP_ACK_SEND_FLOW_LOG(12) message\n");
#endif
} else if ((type == TSP_RELEASE) || (type == TSP_CLOSED_ERR) || (type == TSP_SENDER_DOWN) || (type == TSP_SEND_FLOW) ||
(type == TSP_CLOSED_FLOW) || (type == TSP_ERR_MSG_1)){
if (typeParser(type, numFlow, newSockSignaling , flowIdNum, paraThread, hThr, rPipe, fileDescriptor, logSock, logSockSignaling, &logHost) == -1) {
uscita = true;
}
}
#ifdef WIN32
ioctlsocket(newSockSignaling, FIONREAD, &pending);
#ifdef DEBUG
printf("pending -in : %d\n", pending);
#endif
#endif
}
#ifdef LINUX_OS
else {
#endif
#ifdef WIN32
if (waited == WAIT_TIMEOUT) {
#endif
type = TSP_DISCOVERY;
if ( send(newSockSignaling, (char *) &type, sizeof(type), 0) < 0)
reportErrorAndExit("signalManager","recv - discovery",
"Cannot receive data on newSockSignaling");
#ifdef DEBUG
printf("Signal manager : Sent TSP_DISCOVERY message\n");
#endif
numDiscovery++;
if (numDiscovery == 2) {
#ifdef DEBUG
printf("Signal manager : Sender is down\n");
#endif
type = TSP_SENDER_DOWN;
uscita = true;
}
}
if (uscita == true) break;
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -