?? ipcsocket.c
字號:
&& ch->send_queue->current_qlen > 0){ if (socket_resume_io(ch) != IPC_OK){ break; } } socket_disconnect(ch); socket_destroy_queue(ch->send_queue); socket_destroy_queue(ch->recv_queue); if (ch->ch_private != NULL) { struct SOCKET_CH_PRIVATE *priv = (struct SOCKET_CH_PRIVATE *) ch->ch_private; if(priv->peer_addr != NULL) { unlink(priv->peer_addr->sun_path); g_free((void*)(priv->peer_addr)); } g_free((void*)(ch->ch_private)); } memset(ch, 0xff, sizeof(*ch)); g_free((void*)ch);}/* * Called by socket_destory(). Disconnect the connection * and set ch_status to IPC_DISCONNECT. * * parameters : * ch (IN) the pointer to the channel. * * return values : * IPC_OK the connection is disconnected successfully. * IPC_FAIL operation fails.*/static intsocket_disconnect(struct IPC_CHANNEL* ch){ struct SOCKET_CH_PRIVATE* conn_info; conn_info = (struct SOCKET_CH_PRIVATE*) ch->ch_private;#if 0 if (ch->ch_status != IPC_DISCONNECT) { cl_log(LOG_INFO, "forced disconnect for fd %d", conn_info->s); }#endif close(conn_info->s); cl_poll_ignore(conn_info->s); conn_info->s = -1; ch->ch_status = IPC_DISCONNECT; return IPC_OK;}static intsocket_check_disc_pending(struct IPC_CHANNEL* ch){ int rc; struct pollfd sockpoll; if (ch->ch_status == IPC_DISCONNECT) { cl_log(LOG_ERR, "check_disc_pending() already disconnected"); return IPC_BROKEN; } if (ch->recv_queue->current_qlen > 0) { return IPC_OK; } sockpoll.fd = ch->ops->get_recv_select_fd(ch); sockpoll.events = POLLIN; rc = ipc_pollfunc_ptr(&sockpoll, 1, 0); if (rc < 0) { cl_log(LOG_INFO , "socket_check_disc_pending() bad poll call"); ch->ch_status = IPC_DISCONNECT; return IPC_BROKEN; } if (sockpoll.revents & POLLHUP) { if (sockpoll.revents & POLLIN) { ch->ch_status = IPC_DISC_PENDING; }else{#if 1 cl_log(LOG_INFO, "HUP without input");#endif ch->ch_status = IPC_DISCONNECT; return IPC_BROKEN; } } if (sockpoll.revents & POLLIN) { int dummy; socket_resume_io_read(ch, &dummy, FALSE); } return IPC_OK;}static int socket_initiate_connection(struct IPC_CHANNEL * ch){ struct SOCKET_CH_PRIVATE* conn_info; struct sockaddr_un peer_addr; /* connector's address information */ conn_info = (struct SOCKET_CH_PRIVATE*) ch->ch_private; /* Prepare the socket */ memset(&peer_addr, 0, sizeof(peer_addr)); peer_addr.sun_family = AF_LOCAL; /* host byte order */ if (strlen(conn_info->path_name) >= sizeof(peer_addr.sun_path)) { return IPC_FAIL; } strncpy(peer_addr.sun_path, conn_info->path_name, sizeof(peer_addr.sun_path)); /* Send connection request */ if (connect(conn_info->s, (struct sockaddr *)&peer_addr , sizeof(struct sockaddr_un)) == -1) { cl_log(LOG_WARNING, "initiate_connection: connect failure: %s", strerror(errno) ); return IPC_FAIL; } ch->ch_status = IPC_CONNECT; ch->farside_pid = socket_get_farside_pid(conn_info->s); return IPC_OK;}static voidsocket_set_high_flow_callback(IPC_Channel* ch, flow_callback_t callback, void* userdata){ ch->high_flow_callback = callback; ch->high_flow_userdata = userdata; }static voidsocket_set_low_flow_callback(IPC_Channel* ch, flow_callback_t callback, void* userdata){ ch->low_flow_callback = callback; ch->low_flow_userdata = userdata; }static voidsocket_check_flow_control(struct IPC_CHANNEL* ch, int orig_qlen, int curr_qlen){ if (!IPC_ISRCONN(ch)) { return; } if (curr_qlen >= ch->high_flow_mark && ch->high_flow_callback){ ch->high_flow_callback(ch, ch->high_flow_userdata); } if (curr_qlen <= ch->low_flow_mark && orig_qlen > ch->low_flow_mark && ch->low_flow_callback){ ch->low_flow_callback(ch, ch->low_flow_userdata); } return; }static int socket_send(struct IPC_CHANNEL * ch, struct IPC_MESSAGE* msg){ int orig_qlen; if (msg->msg_len < 0 || msg->msg_len > MAXDATASIZE) { cl_log(LOG_ERR, "socket_send: " "invalid message"); return IPC_FAIL; } if (ch->ch_status != IPC_CONNECT){ return IPC_FAIL; } ch->ops->resume_io(ch); if ( !ch->should_send_blocking && ch->send_queue->current_qlen >= ch->send_queue->max_qlen) { /*cl_log(LOG_WARNING, "send queue maximum length(%d) exceeded", ch->send_queue->max_qlen );*/ return IPC_FAIL; } while (ch->send_queue->current_qlen >= ch->send_queue->max_qlen){ cl_shortsleep(); ch->ops->resume_io(ch); } /* add the message into the send queue */ CHECKFOO(0,ch, msg, SavedQueuedBody, "queued message"); SocketIPCStats.noutqueued++; ch->send_queue->queue = g_list_append(ch->send_queue->queue, msg); orig_qlen = ch->send_queue->current_qlen++; socket_check_flow_control(ch, orig_qlen, orig_qlen +1 ); /* resume io */ ch->ops->resume_io(ch); return IPC_OK; }static int socket_recv(struct IPC_CHANNEL * ch, struct IPC_MESSAGE** message){ GList *element; int nbytes; int result; socket_resume_io(ch); result = socket_resume_io_read(ch, &nbytes, TRUE); *message = NULL; if (ch->recv_queue->current_qlen == 0) { return result != IPC_OK ? result : IPC_FAIL; /*return IPC_OK;*/ } element = g_list_first(ch->recv_queue->queue); if (element == NULL) { /* Internal accounting error, but correctable */ cl_log(LOG_ERR , "recv failure: qlen (%d) > 0, but no message found." , ch->recv_queue->current_qlen); ch->recv_queue->current_qlen = 0; return IPC_FAIL; } *message = (struct IPC_MESSAGE *) (element->data); CHECKFOO(1,ch, *message, SavedReadBody, "read message"); SocketIPCStats.nreceived++; ch->recv_queue->queue = g_list_remove(ch->recv_queue->queue , element->data); ch->recv_queue->current_qlen--; return IPC_OK;}static intsocket_check_poll(struct IPC_CHANNEL * ch, struct pollfd * sockpoll){ if (ch->ch_status == IPC_DISCONNECT) { return IPC_OK; } if (sockpoll->revents & POLLHUP) { /* If input present, or this is an output-only poll... */ if (sockpoll->revents & POLLIN || (sockpoll-> events & POLLIN) == 0 ) { ch->ch_status = IPC_DISC_PENDING; return IPC_OK; }#if 1 cl_log(LOG_INFO, "socket_check_poll(): HUP without input");#endif ch->ch_status = IPC_DISCONNECT; return IPC_BROKEN; }else if (sockpoll->revents & (POLLNVAL|POLLERR)) { /* Have we already closed the socket? */ if (fcntl(sockpoll->fd, F_GETFL) < 0) { cl_perror("socket_check_poll(pid %d): bad fd [%d]" , (int) getpid(), sockpoll->fd); ch->ch_status = IPC_DISCONNECT; return IPC_OK; } cl_log(LOG_ERR , "revents failure: fd %d, flags 0x%x" , sockpoll->fd, sockpoll->revents); errno = EINVAL; return IPC_FAIL; } return IPC_OK;}static intsocket_waitfor(struct IPC_CHANNEL * ch, gboolean (*finished)(struct IPC_CHANNEL * ch)){ struct pollfd sockpoll; CHANAUDIT(ch); if (finished(ch)) { return IPC_OK; } if (ch->ch_status == IPC_DISCONNECT) { return IPC_BROKEN; } sockpoll.fd = ch->ops->get_recv_select_fd(ch); while (!finished(ch) && IPC_ISRCONN(ch)) { int rc; sockpoll.events = POLLIN; /* Cannot call resume_io after the call to finished() * and before the call to poll because we might * change the state of the thing finished() is * waiting for. * This means that the poll call below would be * not only pointless, but might * make us hang forever waiting for this * event which has already happened */ if (ch->send_queue->current_qlen > 0) { sockpoll.events |= POLLOUT; } rc = ipc_pollfunc_ptr(&sockpoll, 1, -1); if (rc < 0) { return (errno == EINTR ? IPC_INTR : IPC_FAIL); } rc = socket_check_poll(ch, &sockpoll); if (sockpoll.revents & POLLIN) { socket_resume_io(ch); } if (rc != IPC_OK) { CHANAUDIT(ch); return rc; } } CHANAUDIT(ch); return IPC_OK;}static intsocket_waitin(struct IPC_CHANNEL * ch){ return socket_waitfor(ch, ch->ops->is_message_pending);}static gbooleansocket_is_output_flushed(struct IPC_CHANNEL * ch){ return ! ch->ops->is_sending_blocked(ch);}static intsocket_waitout(struct IPC_CHANNEL * ch){ int rc; CHANAUDIT(ch); rc = socket_waitfor(ch, socket_is_output_flushed); if (rc != IPC_OK) { cl_log(LOG_ERR , "socket_waitout failure: rc = %d", rc); }else if (ch->ops->is_sending_blocked(ch)) { cl_log(LOG_ERR, "socket_waitout output still blocked"); } CHANAUDIT(ch); return rc;}static gbooleansocket_is_message_pending(struct IPC_CHANNEL * ch){ ch->ops->resume_io(ch); if (ch->recv_queue->current_qlen > 0) { return TRUE; } return !IPC_ISRCONN(ch);}static gbooleansocket_is_output_pending(struct IPC_CHANNEL * ch){ socket_resume_io(ch); return ch->ch_status == IPC_CONNECT && ch->send_queue->current_qlen > 0;}static int socket_assert_auth(struct IPC_CHANNEL *ch, GHashTable *auth){ cl_log(LOG_ERR , "the assert_auth function for domain socket is not implemented"); return IPC_FAIL;}static intsocket_resume_io_read(struct IPC_CHANNEL *ch, int* nbytes, gboolean read1anyway){ struct SOCKET_CH_PRIVATE* conn_info; int retcode = IPC_OK; struct pollfd sockpoll; int debug_loopcount = 0; int debug_bytecount = 0; int maxqlen = ch->recv_queue->max_qlen; struct ipc_bufpool* pool = ch->pool; int nmsgs = 0; int spaceneeded; *nbytes = 0; CHANAUDIT(ch); conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private; if (ch->ch_status == IPC_DISCONNECT) { return IPC_BROKEN; } if (pool == NULL){ ch->pool = pool = ipc_bufpool_new(0); if (pool == NULL){ cl_log(LOG_ERR, "socket_resume_io_read: " "memory allocation for ipc pool failed"); exit(1); } } if (ipc_bufpool_full(pool, ch, &spaceneeded)){ struct ipc_bufpool* newpool; newpool = ipc_bufpool_new(spaceneeded); if (newpool == NULL){ cl_log(LOG_ERR, "socket_resume_io_read: " "memory allocation for a new ipc pool failed"); exit(1); } ipc_bufpool_partial_copy(newpool, pool); ipc_bufpool_unref(pool); ch->pool = pool = newpool; } if (maxqlen <= 0 && read1anyway) { maxqlen = 1; } if (ch->recv_queue->current_qlen < maxqlen && retcode == IPC_OK) { void * msg_begin; int msg_len; int len; CHANAUDIT(ch); ++debug_loopcount; len = ipc_bufpool_spaceleft(pool); msg_begin = pool->currpos; CHANAUDIT(ch); /* Now try to receive some data */ msg_len = recv(conn_info->s, msg_begin, len, MSG_DONTWAIT); SocketIPCStats.last_recv_rc = msg_len; SocketIPCStats.last_recv_errno = errno; ++SocketIPCStats.recv_count; /* Did we get an error? */ if (msg_len < 0) { switch (errno) { case EAGAIN: if (ch->ch_status==IPC_DISC_PENDING){ ch->ch_status =IPC_DISCONNECT; retcode = IPC_BROKEN; } break; case ECONNREFUSED: case ECONNRESET: retcode= socket_check_disc_pending(ch); break; default: cl_perror("socket_resume_io_read" ": unknown recv error"); ch->ch_status = IPC_DISCONNECT; retcode = IPC_FAIL; break; } }else if (msg_len == 0) { if (ch->ch_status == IPC_DISC_PENDING && ch->recv_queue->current_qlen <= 0) { ch->ch_status = IPC_DISCONNECT; retcode = IPC_FAIL; } }else { /* We read something! */ /* Note that all previous cases break out of the loop */ debug_bytecount += msg_len; *nbytes = msg_len; nmsgs = ipc_bufpool_update(pool, ch, msg_len, ch->recv_queue) ; SocketIPCStats.ninqueued += nmsgs; } } /* Check for errors uncaught by recv() */ /* NOTE: It doesn't seem right we have to do this every time */ /* FIXME?? */ memset(&sockpoll,0, sizeof(struct pollfd)); if ((retcode == IPC_OK) && (sockpoll.fd = conn_info->s) >= 0) { /* Just check for errors, not for data */ sockpoll.events = 0; ipc_pollfunc_ptr(&sockpoll, 1, 0); retcode = socket_check_poll(ch, &sockpoll); } CHANAUDIT(ch); if (retcode != IPC_OK) { return retcode; } return IPC_ISRCONN(ch) ? IPC_OK : IPC_BROKEN;}static intsocket_resume_io_write(struct IPC_CHANNEL *ch, int* nmsg){ int retcode = IPC_OK; struct SOCKET_CH_PRIVATE* conn_info; CHANAUDIT(ch); *nmsg = 0; conn_info = (struct SOCKET_CH_PRIVATE *) ch->ch_private; while (ch->ch_status == IPC_CONNECT && retcode == IPC_OK && ch->send_queue->current_qlen > 0) { GList * element; struct IPC_MESSAGE * msg; struct SOCKET_MSG_HEAD* head; struct IPC_MESSAGE* oldmsg = NULL; int sendrc = 0; struct IPC_MESSAGE* newmsg; char* p; unsigned int bytes_remaining;
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -