?? ipcsocket.c
字號:
/* $Id: ipcsocket.c,v 1.123 2005/02/11 21:39:35 alan Exp $ */
/*
 * ipcsocket unix domain socket implementation of IPC abstraction.
 *
 * Copyright (c) 2002 Xiaoxiang Liu <xiliu@ncsa.uiuc.edu>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 */
#include <portability.h>
#include <clplumbing/ipc.h>
#include <clplumbing/cl_log.h>
#include <clplumbing/realtime.h>
#include <clplumbing/cl_poll.h>

#include <stddef.h>	/* offsetof(), used by the fallback SUN_LEN below */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>
#include <sched.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/param.h>
#include <sys/uio.h>
#ifdef HAVE_SYS_FILIO_H
#	include <sys/filio.h>
#endif
#ifdef HAVE_SYS_SYSLIMITS_H
#	include <sys/syslimits.h>
#endif
#ifdef HAVE_SYS_CRED_H
#	include <sys/cred.h>
#endif
#ifdef HAVE_SYS_UCRED_H
#	include <sys/ucred.h>
#endif
#include <sys/socket.h>
#include <sys/poll.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>

/* Size of sockaddr_un.sun_path when the platform does not publish it. */
#ifndef UNIX_PATH_MAX
#	define UNIX_PATH_MAX 108
#endif

#define MAX_LISTEN_NUM 10

/*
 * Fallback for platforms whose <sys/un.h> lacks SUN_LEN: the number of
 * bytes of a sockaddr_un actually in use (header plus path string,
 * excluding the terminating NUL).
 */
#ifndef SUN_LEN
#	define SUN_LEN(ptr) ((size_t) (offsetof (struct sockaddr_un, sun_path) + strlen ((ptr)->sun_path)))
#endif

/* Platforms without MSG_NOSIGNAL get a no-op flag value. */
#ifndef MSG_NOSIGNAL
#	define MSG_NOSIGNAL 0
#endif

#ifndef AF_LOCAL
#	define AF_LOCAL AF_UNIX
#endif

/***********************************************************************
 *
 * Determine the IPC
authentication scheme... More machine dependent than * we'd like, but don't know any better way... * ***********************************************************************/#ifdef SO_PEERCRED# define USE_SO_PEERCRED#elif HAVE_GETPEEREID# define USE_GETPEEREID#elif ON_DARWIN/* Darwin has SCM_CREDS but it has been crippled by Apple * - force USE_BINDSTAT_CREDS instead */# define USE_BINDSTAT_CREDS#elif defined(SCM_CREDS)# define USE_SCM_CREDS#else# define USE_DUMMY_CREDS/* This will make it compile, but attempts to authenticate * will fail. This is a stopgap measure ;-) */#endif/* wait connection private data. */struct SOCKET_WAIT_CONN_PRIVATE{ /* the path name wich the connection will be built on. */ char path_name[UNIX_PATH_MAX]; /* the domain socket. */ int s;};/* channel private data. */struct SOCKET_CH_PRIVATE{ /* the path name wich the connection will be built on. */ char path_name[UNIX_PATH_MAX]; /* the domain socket. */ int s; /* the size of expecting data for below buffered message buf_msg */ int remaining_data; /* The address of our peer - used by USE_BINDSTAT_CREDS version of * socket_verify_auth() */ struct sockaddr_un *peer_addr; /* the buf used to save unfinished message */ struct IPC_MESSAGE *buf_msg;};struct IPC_Stats { long nsent; long noutqueued; long send_count; long nreceived; long ninqueued; long recv_count; int last_recv_errno; int last_recv_rc; int last_send_errno; int last_send_rc;};struct IPC_Stats SocketIPCStats = {0,0,0,0};/* unix domain socket implementations of IPC functions. 
*/
/* Wait-connection (listening socket) operations. */
static void socket_destroy_wait_conn(struct IPC_WAIT_CONNECTION * wait_conn);
static int socket_wait_selectfd(struct IPC_WAIT_CONNECTION *wait_conn);
static struct IPC_CHANNEL * socket_accept_connection(struct IPC_WAIT_CONNECTION * wait_conn, struct IPC_AUTH *auth_info);
/* Channel operations; these are the entries of the socket_ops table. */
static void socket_destroy_channel(struct IPC_CHANNEL * ch);
static int socket_initiate_connection(struct IPC_CHANNEL * ch);
static int socket_send(struct IPC_CHANNEL * ch, struct IPC_MESSAGE* message);
static int socket_recv(struct IPC_CHANNEL * ch, struct IPC_MESSAGE** message);
static int socket_resume_io(struct IPC_CHANNEL *ch);
static gboolean socket_is_message_pending(struct IPC_CHANNEL *ch);
static gboolean socket_is_output_pending(struct IPC_CHANNEL *ch);
static int socket_assert_auth(struct IPC_CHANNEL *ch, GHashTable *auth);
static int socket_verify_auth(struct IPC_CHANNEL*ch, struct IPC_AUTH*auth_info);
/* for domain socket, recv_fd = send_fd. */
static int socket_get_recv_fd(struct IPC_CHANNEL *ch);
static int socket_get_send_fd(struct IPC_CHANNEL *ch);
static int socket_set_send_qlen (struct IPC_CHANNEL* ch, int q_len);
static int socket_set_recv_qlen (struct IPC_CHANNEL* ch, int q_len);
/* helper functions. 
*/static int socket_disconnect(struct IPC_CHANNEL* ch);static struct IPC_QUEUE* socket_queue_new(void);static void socket_destroy_queue(struct IPC_QUEUE * q);static struct IPC_MESSAGE* socket_message_new(struct IPC_CHANNEL*ch, int msg_len);void socket_free_message(struct IPC_MESSAGE * msg);struct IPC_WAIT_CONNECTION *socket_wait_conn_new(GHashTable* ch_attrs);struct IPC_CHANNEL* socket_client_channel_new(GHashTable *attrs);struct IPC_CHANNEL* socket_server_channel_new(int sockfd);pid_t socket_get_farside_pid(int sockfd);extern int (*ipc_pollfunc_ptr)(struct pollfd *, nfds_t, int);static int socket_waitin(struct IPC_CHANNEL * ch);static int socket_waitout(struct IPC_CHANNEL * ch);static int socket_resume_io_read(struct IPC_CHANNEL *ch, int*, gboolean read1anyway);static void socket_set_high_flow_callback(IPC_Channel* ch, flow_callback_t callback, void* userdata);static void socket_set_low_flow_callback(IPC_Channel* ch, flow_callback_t callback, void* userdata);static IPC_Message* socket_new_ipcmsg(IPC_Channel* ch, const void* data, int len, void* private);/* socket object of the function table */static struct IPC_OPS socket_ops = { socket_destroy_channel, socket_initiate_connection, socket_verify_auth, socket_assert_auth, socket_send, socket_recv, socket_waitin, socket_waitout, socket_is_message_pending, socket_is_output_pending, socket_resume_io, socket_get_send_fd, socket_get_recv_fd, socket_set_send_qlen, socket_set_recv_qlen, socket_set_high_flow_callback, socket_set_low_flow_callback, socket_new_ipcmsg,};void dump_ipc_info(const IPC_Channel* chan);#undef AUDIT_CHANNELS#ifndef AUDIT_CHANNELS# define CHANAUDIT(ch) /*NOTHING */#else# define CHANAUDIT(ch) socket_chan_audit(ch)# define MAXPID 65535static voidsocket_chan_audit(const struct IPC_CHANNEL* ch){ int badch = FALSE; struct SOCKET_CH_PRIVATE *chp; struct stat b; if ((chp = ch->ch_private) == NULL) { cl_log(LOG_CRIT, "Bad ch_private"); badch = TRUE; } if (ch->ops != &socket_ops) { cl_log(LOG_CRIT, "Bad 
socket_ops"); badch = TRUE; } if (ch->ch_status == IPC_DISCONNECT) { return; } if (!IPC_ISRCONN(ch)) { cl_log(LOG_CRIT, "Bad ch_status [%d]", ch->ch_status); badch = TRUE; } if (ch->farside_pid < 0 || ch->farside_pid > MAXPID) { cl_log(LOG_CRIT, "Bad farside_pid"); badch = TRUE; } if (fstat(chp->s, &b) < 0) { badch = TRUE; }else if ((b.st_mode & S_IFMT) != S_IFSOCK) { cl_log(LOG_CRIT, "channel @ 0x%lx: not a socket" , (unsigned long)ch); badch = TRUE; } if (chp->remaining_data < 0) { cl_log(LOG_CRIT, "Negative remaining_data"); badch = TRUE; } if (chp->remaining_data < 0 || chp->remaining_data > MAXDATASIZE) { cl_log(LOG_CRIT, "Excessive/bad remaining_data"); badch = TRUE; } if (chp->remaining_data && chp->buf_msg == NULL) { cl_log(LOG_CRIT , "inconsistent remaining_data [%ld]/buf_msg[0x%lx]" , (long)chp->remaining_data, (unsigned long)chp->buf_msg); badch = TRUE; } if (chp->remaining_data == 0 && chp->buf_msg != NULL) { cl_log(LOG_CRIT , "inconsistent remaining_data [%ld]/buf_msg[0x%lx] (2)" , (long)chp->remaining_data, (unsigned long)chp->buf_msg); badch = TRUE; } if (ch->send_queue == NULL || ch->recv_queue == NULL) { cl_log(LOG_CRIT, "bad send/recv queue"); badch = TRUE; } if (ch->recv_queue->current_qlen < 0 || ch->recv_queue->current_qlen > ch->recv_queue->max_qlen) { cl_log(LOG_CRIT, "bad recv queue"); badch = TRUE; } if (ch->send_queue->current_qlen < 0 || ch->send_queue->current_qlen > ch->send_queue->max_qlen) { cl_log(LOG_CRIT, "bad send_queue"); badch = TRUE; } if (badch) { cl_log(LOG_CRIT, "Bad channel @ 0x%lx", (unsigned long)ch); dump_ipc_info(ch); abort(); }}#endif#ifdef CHEAT_CHECKSlong SeqNums[32];static longcheat_get_sequence(IPC_Message* msg){ const char header [] = "String-"; size_t header_len = sizeof(header)-1; char * body; if (msg == NULL || msg->msg_len < sizeof(header) || msg->msg_len > sizeof(header) + 10 || strncmp(msg->msg_body, header, header_len) != 0) { return -1L; } body = msg->msg_body; return atol(body+header_len);}static char 
SavedReadBody[32];static char SavedReceivedBody[32];static char SavedQueuedBody[32];static char SavedSentBody[32];#ifndef MIN# define MIN(a,b) (a < b ? a : b)#endifstatic voidsave_body(struct IPC_MESSAGE *msg, char * savearea, size_t length){ int mlen = strnlen(msg->msg_body, MIN(length, msg->msg_len)); memcpy(savearea, msg->msg_body, mlen); savearea[mlen] = EOS;}static voidaudit_readmsgq_msg(gpointer msg, gpointer user_data){ long cheatseq = cheat_get_sequence(msg); if (cheatseq < SeqNums[1] || cheatseq > SeqNums[2]) { cl_log(LOG_ERR , "Read Q Message %ld not in range [%ld:%ld]" , cheatseq, SeqNums[1], SeqNums[2]); }}static void saveandcheck(struct IPC_CHANNEL * ch, struct IPC_MESSAGE* msg, char * savearea, size_t savesize, long* lastseq, const char * text){ long cheatseq = cheat_get_sequence(msg); save_body(msg, savearea, savesize); if (*lastseq != 0 ) { if (cheatseq != *lastseq +1) { int j; cl_log(LOG_ERR , "%s packets out of sequence! %ld versus %ld [pid %d]" , text, cheatseq, *lastseq, (int)getpid()); dump_ipc_info(ch); for (j=0; j < 4; ++j) { cl_log(LOG_DEBUG , "SeqNums[%d] = %ld" , j, SeqNums[j]); } cl_log(LOG_ERR , "SocketIPCStats.nsent = %ld" , SocketIPCStats.nsent); cl_log(LOG_ERR , "SocketIPCStats.noutqueued = %ld" , SocketIPCStats.noutqueued); cl_log(LOG_ERR , "SocketIPCStats.nreceived = %ld" , SocketIPCStats.nreceived); cl_log(LOG_ERR , "SocketIPCStats.ninqueued = %ld" , SocketIPCStats.ninqueued); } } g_list_foreach(ch->recv_queue->queue, audit_readmsgq_msg, NULL); if (cheatseq > 0) { *lastseq = cheatseq; }}# define CHECKFOO(which, ch, msg, area, text) { \ saveandcheck(ch,msg,area,sizeof(area),SeqNums+which,text); \ }#else# define CHECKFOO(which, ch, msg, area, text) /* Nothing */#endifstatic voiddump_msg(struct IPC_MESSAGE *msg, const char * label){#ifdef CHEAT_CHECKS cl_log(LOG_DEBUG, "%s packet (length %d) [%s] %ld pid %d" , label, (int)msg->msg_len, (char*)msg->msg_body , cheat_get_sequence(msg), (int)getpid());#else cl_log(LOG_DEBUG, "%s length %d 
[%s] pid %d" , label, (int)msg->msg_len, (char*)msg->msg_body , (int)getpid());#endif}static voiddump_msgq_msg(gpointer data, gpointer user_data){ dump_msg(data, user_data);}voiddump_ipc_info(const IPC_Channel* chan){ char squeue[] = "Send queue"; char rqueue[] = "Receive queue";#ifdef CHEAT_CHECKS cl_log(LOG_DEBUG, "Saved Last Body read[%s]", SavedReadBody); cl_log(LOG_DEBUG, "Saved Last Body received[%s]", SavedReceivedBody); cl_log(LOG_DEBUG, "Saved Last Body Queued[%s]", SavedQueuedBody); cl_log(LOG_DEBUG, "Saved Last Body Sent[%s]", SavedSentBody);#endif g_list_foreach(chan->send_queue->queue, dump_msgq_msg, squeue); g_list_foreach(chan->recv_queue->queue, dump_msgq_msg, rqueue); CHANAUDIT(chan);}/* destroy socket wait channel */ static void socket_destroy_wait_conn(struct IPC_WAIT_CONNECTION * wait_conn){ struct SOCKET_WAIT_CONN_PRIVATE * wc = wait_conn->ch_private; if (wc != NULL) { close(wc->s); cl_poll_ignore(wc->s); unlink(wc->path_name); g_free(wc); } g_free((void*) wait_conn);}/* return a fd which can be listened on for new connections. */static int socket_wait_selectfd(struct IPC_WAIT_CONNECTION *wait_conn){ struct SOCKET_WAIT_CONN_PRIVATE * wc = wait_conn->ch_private; return (wc == NULL ? -1 : wc->s);}/* socket accept connection. */static struct IPC_CHANNEL* socket_accept_connection(struct IPC_WAIT_CONNECTION * wait_conn, struct IPC_AUTH *auth_info){ /* make peer_addr a pointer so it can be used by the * USE_BINDSTAT_CREDS implementation of socket_verify_auth() */ struct sockaddr_un * peer_addr; struct IPC_CHANNEL * ch = NULL; int sin_size; int s; int new_sock; struct SOCKET_WAIT_CONN_PRIVATE* conn_private; struct SOCKET_CH_PRIVATE * ch_private ; int auth_result = IPC_FAIL; gboolean was_error = FALSE; peer_addr = g_new(struct sockaddr_un, 1); /* get select fd */ s = wait_conn->ops->get_select_fd(wait_conn); if (s < 0) { cl_log(LOG_ERR, "get_select_fd: invalid fd"); g_free(peer_addr); peer_addr = NULL; return NULL; } /* Get client connection. 
*/ sin_size = sizeof(struct sockaddr_un); if ((new_sock = accept(s, (struct sockaddr *)peer_addr, &sin_size)) == -1){ if (errno != EAGAIN && errno != EWOULDBLOCK) { cl_perror("socket_accept_connection: accept"); } was_error = TRUE; }else{ if ((ch = socket_server_channel_new(new_sock)) == NULL) { cl_log(LOG_ERR , "socket_accept_connection:" " Can't create new channel"); was_error = TRUE; }else{ conn_private=(struct SOCKET_WAIT_CONN_PRIVATE*) ( wait_conn->ch_private); ch_private = (struct SOCKET_CH_PRIVATE *)(ch->ch_private); strncpy(ch_private->path_name,conn_private->path_name , sizeof(conn_private->path_name)); ch_private->peer_addr = peer_addr; } } /* Verify the client authorization information. */ if(was_error == FALSE) { auth_result = ch->ops->verify_auth(ch, auth_info); if (auth_result == IPC_OK) { ch->ch_status = IPC_CONNECT; ch->farside_pid = socket_get_farside_pid(new_sock); return ch; } } g_free(peer_addr); peer_addr = NULL; return NULL;}static voidsocket_destroy_channel(struct IPC_CHANNEL * ch){ while (ch->ch_status == IPC_CONNECT
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -