?? cmslib_client.c
字號:
/* * cmslib_client.c: SAForum AIS Message Service client library * * Copyright (c) 2004 Intel Corp. * * Author: Zou Yixiong (yixiong.zou@intel.com) * Author: Zhu Yi (yi.zhu@intel.com) * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA * */#include <stdio.h>#include <stdlib.h>#include <unistd.h> /* dup, dup2 */#include <string.h>#include <assert.h>#include <clplumbing/realtime.h>#include <cl_log.h>#include <heartbeat.h>#include <saf/ais.h>#include "cmslib_client.h"#include "cms_client_types.h"#define PIPETRICK_DEBUG 0#ifdef DEBUG_LIBRARY#define dprintf(arg...) fprintf(stderr, ##arg)#else#define dprintf(arg...) {}#endif#define GET_CMS_HANDLE(x) ((x == NULL) ? NULL : \ (__cms_handle_t *)g_hash_table_lookup( \ __cmshandle_hash, x))#define GET_MQ_HANDLE(x) ((x == NULL) ? 
NULL : \ (__cms_handle_t *)g_hash_table_lookup( \ __mqhandle_hash, x))static GHashTable * __cmshandle_hash;static GHashTable * __mqhandle_hash;static GHashTable * __group_tracking_hash;static guint __cmshandle_counter = 0;static gboolean __cmsclient_init_flag = FALSE;static gboolean __notify_acked = TRUE;void cmsclient_hash_init(void);IPC_Channel *cms_channel_conn(void);int enqueue_dispatch_msg(__cms_handle_t * hd, client_header_t * msg);client_header_t * dequeue_dispatch_msg(GList ** queue);int read_and_queue_ipc_msg(__cms_handle_t * handle);int dispatch_msg(__cms_handle_t * handle, client_header_t * msg);int wait_for_msg(__cms_handle_t * handle, size_t msgtype, const SaNameT * name, client_header_t ** msg, SaTimeT timeout);int get_timeout_value(SaTimeT timeout, struct timeval * tv);static intsaname_cmp(const SaNameT s1, const SaNameT s2){ SaUint16T len1, len2; // dprintf("Length of s1: %d, s2: %d\n", s1.length, s2.length); len1 = s1.value[s1.length - 1] ? s1.length : s1.length - 1; len2 = s2.value[s2.length - 1] ? s2.length : s2.length - 1; if (len1 != len2) return len2 - len1; return strncmp(s1.value, s2.value, len1);}static intbad_saname(const SaNameT * name){ int i; if (!name || name->length <= 0 || name->length > SA_MAX_NAME_LENGTH - 1) return TRUE; /* * We don't support '\0' inside a SaNameT.value. 
*/ for (i = 0; i < name->length; i++) if (name->value[i] == '\0') return TRUE; return FALSE;}static char *saname2str(SaNameT name){ char * str; if (name.length <= 0) return NULL; if (name.length > SA_MAX_NAME_LENGTH - 1) name.length = SA_MAX_NAME_LENGTH - 1; if ((str = (char *)ha_malloc(name.length + 1)) == NULL) return NULL; strncpy(str, name.value, name.length); str[name.length] = '\0'; return str;}static intactive_poll(__cms_handle_t * hd){ int fd; if (hd->backup_fd >= 0) { cl_log(LOG_WARNING, "%s: recursion detected", __FUNCTION__); return 1; } if ((fd = hd->ch->ops->get_recv_select_fd(hd->ch)) < 0) { cl_log(LOG_ERR, "%s: get_recv_select_fd failed", __FUNCTION__); return 1; } if ((hd->backup_fd = dup(fd)) == -1) { cl_log(LOG_ERR, "%s: dup2 failed", __FUNCTION__); perror("dup2"); return 1; } close(fd); if (dup2(hd->active_fd, fd) == -1) { cl_log(LOG_ERR, "%s: dup2 failed", __FUNCTION__); perror("dup2"); return 1; }#if PIPETRICK_DEBUG dprintf("acitve_poll for <%p>\n", hd);#endif return 0;}static intrestore_poll(__cms_handle_t * hd){ int fd; if (hd->backup_fd < 0) { cl_log(LOG_WARNING, "%s: recursion detected", __FUNCTION__); return 1; } if ((fd = hd->ch->ops->get_recv_select_fd(hd->ch)) < 0) { cl_log(LOG_ERR, "%s: get_recv_select_fd failed", __FUNCTION__); return 1; } if (dup2(hd->backup_fd, fd) == -1) { cl_log(LOG_ERR, "%s: dup2 failed", __FUNCTION__); return 1; } hd->backup_fd = -1; /* mark as unused */ #if PIPETRICK_DEBUG dprintf("restore_poll for <%p>\n", hd);#endif return 0;}static intcmsclient_message_recv(__cms_handle_t * hd, client_header_t ** data){ int ret; IPC_Message * ipc_msg; if (hd->backup_fd >= 0) restore_poll(hd); ret = hd->ch->ops->recv(hd->ch, &ipc_msg); if (ret != IPC_OK) return ret; *data = ha_malloc(ipc_msg->msg_len); memcpy(*data, ipc_msg->msg_body, ipc_msg->msg_len); ipc_msg->msg_done(ipc_msg); return ret;}static voidcmsclient_message_done(IPC_Message * msg){ char * name; client_header_t * message; message = msg->msg_body; name = 
saname2str(message->name); ha_free(msg->msg_private); ha_free(name);}static intcmsclient_message_send(__cms_handle_t * hd, size_t len, gpointer data){ IPC_Message * msg; if ((msg = ha_malloc(sizeof(IPC_Message) + len)) == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); return FALSE; } if (hd->backup_fd >= 0) restore_poll(hd); msg->msg_body = msg + 1; memcpy(msg->msg_body, data, len); msg->msg_len = len; msg->msg_private = msg; msg->msg_done = cmsclient_message_done; msg->msg_buf = 0; return hd->ch->ops->send(hd->ch, msg);}static gbooleanmsgqueue_remove(gpointer key, gpointer value, gpointer user_data){ __cms_queue_handle_t * qhd = (__cms_queue_handle_t *) value; client_mqueue_close_t cmg; SaNameT * qname; CMS_LIBRARY_TRACE(); qname = &(qhd->queue_name); cmg.header.type = CMS_QUEUE_CLOSE; cmg.header.name = *qname; cmg.handle = qhd->queue_handle; cmg.silent = TRUE; cmsclient_message_send(qhd->cms_handle, sizeof(cmg), &cmg); g_hash_table_remove(__mqhandle_hash, key); ha_free((__cms_queue_handle_t *) qhd); return TRUE;}static gbooleanlibrary_initialized(void){ return __cmsclient_init_flag;}voidcmsclient_hash_init(){ if (library_initialized()) return; __cmshandle_hash = g_hash_table_new(g_int_hash, g_int_equal); __mqhandle_hash = g_hash_table_new(g_int_hash, g_int_equal); __group_tracking_hash = g_hash_table_new(g_str_hash, g_str_equal); __cmsclient_init_flag = TRUE;}/* * This is a blocking wait for a particular type of msg on a particular queue. * Note: memory allocated in this function. caller needs to free(). 
*/intwait_for_msg(__cms_handle_t * handle, size_t msgtype, const SaNameT * queueName, client_header_t ** msg, SaTimeT timeout){ int fd; client_header_t * cms_msg; longclock_t t_start = 0, t_end = 0; if (timeout < 0) return SA_ERR_INVALID_PARAM; if (timeout != SA_TIME_END) { t_start = time_longclock(); t_end = t_start + msto_longclock(timeout/1000); } if (handle->backup_fd >= 0) restore_poll(handle); fd = handle->ch->ops->get_recv_select_fd(handle->ch); dprintf("In %s for message type 0x%x\n", __FUNCTION__, msgtype); while (1) { int ret = -1; struct timeval * tv, to; fd_set rset; FD_ZERO(&rset); tv = NULL; if (timeout != SA_TIME_END) { to.tv_sec = longclockto_ms((t_end - t_start))/1000; to.tv_usec = (((t_end - t_start) - secsto_longclock(to.tv_sec)))/1000; tv = &to; } if (!handle->ch->ops->is_message_pending(handle->ch) && (ret = select(fd + 1, &rset, NULL, NULL, tv)) == -1) { cl_log(LOG_ERR, "%s: select error", __FUNCTION__); return SA_ERR_LIBRARY; } else if (ret == 0) { cl_log(LOG_WARNING, "%s: timeout!", __FUNCTION__); return SA_ERR_TIMEOUT; } if ((ret = cmsclient_message_recv(handle, &cms_msg))!= IPC_OK) { if (ret == IPC_FAIL) { cl_shortsleep(); continue; } cl_log(LOG_ERR, "%s: cmsclient_message_recv failed, " "rc = %d", __FUNCTION__, ret); return SA_ERR_LIBRARY; } if (cms_msg->type & msgtype) { if (!queueName || (queueName && (saname_cmp(cms_msg->name, *queueName) == 0))) { *msg = cms_msg; if (g_list_length(handle->dispatch_queue)) active_poll(handle); return SA_OK; } } enqueue_dispatch_msg(handle, cms_msg); t_start = time_longclock(); }}IPC_Channel *cms_channel_conn(void){ IPC_Channel * ch; GHashTable * attrs; char path[] = IPC_PATH_ATTR; char cms_socket[] = CMS_DOMAIN_SOCKET; int ret; attrs = g_hash_table_new(g_str_hash,g_str_equal); g_hash_table_insert(attrs, path, cms_socket); ch = ipc_channel_constructor(IPC_DOMAIN_SOCKET, attrs); g_hash_table_destroy(attrs); if (ch ) { ret = ch->ops->initiate_connection(ch); if (ret != IPC_OK) { cl_log(LOG_ERR, 
"cms_channel_conn failed, maybe " "you don't have cms server running..."); return NULL; } return ch; } else return NULL;}static intenqueue_dispatch_item(GList **queue, client_header_t * item){ *queue = g_list_append(*queue, item); return SA_OK;}intenqueue_dispatch_msg(__cms_handle_t * hd, client_header_t * msg){ client_message_t * fmsg = (client_message_t *)msg; dprintf("calling enqueue_dispatch_msg ..... \n"); /* * If it is a message, then add it to the msg queue. */ if (msg->type == CMS_MSG_NOTIFY) { dprintf("got a CMS_MSG_NOTIFY msg\n"); __notify_acked = FALSE; } return enqueue_dispatch_item(&(hd->dispatch_queue), (client_header_t *) fmsg);}client_header_t *dequeue_dispatch_msg(GList ** queue){ client_header_t * msg = NULL; GList * head; if (!g_list_length(*queue)) return NULL; head = g_list_first(*queue); *queue = g_list_remove_link(*queue, head); msg = head->data; g_list_free_1(head); return msg;}/** * Read all the ipc msg in the buffer and queue them to * the msg queue or the dispatch queue. */intread_and_queue_ipc_msg(__cms_handle_t * handle){ int ret, count = 0; client_header_t *rcmg; __mqgroup_track_t * track; client_mqgroup_notify_t *nsg, *m; dprintf("b4 the do loop of the read_and_queue_ipc_msg ...\n"); if (handle->backup_fd >= 0) restore_poll(handle); while (handle->ch->ops->is_message_pending(handle->ch)) { ret = cmsclient_message_recv(handle, &rcmg); if (ret == IPC_FAIL) { cl_shortsleep(); cl_log(LOG_WARNING, "%s: cmsclient_message_recv " "failed, rc = %d", __FUNCTION__, ret); break; } switch (rcmg->type) { case CMS_QUEUEGROUP_NOTIFY: /* * prepare the notify buffer */ m = (client_mqgroup_notify_t *)rcmg; m->data = (char *)rcmg + sizeof(client_mqgroup_notify_t); track = g_hash_table_lookup(__group_tracking_hash, (m->group_name).value); if (track == NULL) { /* * This is possible, because TrackStop * may be called before we get here. */ cl_log(LOG_INFO, "No one tracks the group" " [%s] membership now!" 
, m->group_name.value); return TRUE; } track->policy = m->policy; track->buf.numberOfItems = m->number; track->buf.notification = (SaMsgQueueGroupNotificationT *) ha_malloc(m->number * sizeof(SaMsgQueueGroupNotificationT)); memcpy(track->buf.notification, m->data, m->number * sizeof(SaMsgQueueGroupNotificationT)); /* * only enqueue head is enough for us */ dprintf("enqueue group notify msg head\n"); nsg = (client_mqgroup_notify_t *) ha_malloc(sizeof(client_mqgroup_notify_t)); memcpy(nsg, m, sizeof(client_mqgroup_notify_t)); enqueue_dispatch_msg(handle, (client_header_t *)nsg); ha_free(rcmg); break; default: enqueue_dispatch_msg(handle, rcmg); /* TODO: we have a memory leak here need to call the msg_done() */ break; } } return count;}intdispatch_msg(__cms_handle_t * handle, client_header_t * msg) { client_mqueue_open_t * omsg; client_mqgroup_notify_t * nmsg; client_message_ack_t * amsg; __mqgroup_track_t * track; client_message_t * gmsg; char * name; __cms_queue_handle_t * qhd; dprintf("In Function %s..\n", __FUNCTION__); dprintf("handle=<%p> msg->type=<%d>\n", handle, msg->type); if (handle == NULL || msg == NULL) return HA_FAIL; switch (msg->type) { case CMS_QUEUE_OPEN_ASYNC: omsg = (client_mqueue_open_t *) msg; if ((handle->callbacks).saMsgQueueOpenCallback) { if (omsg->header.flag != SA_OK) { omsg->handle = 0; } (handle->callbacks).saMsgQueueOpenCallback( omsg->invocation, &(omsg->handle), omsg->header.flag); } ha_free(omsg); break; case CMS_MSG_NOTIFY: gmsg = (client_message_t *) msg; qhd = g_hash_table_lookup(handle->queue_handle_hash, &(gmsg->handle)); if (handle->callbacks.saMsgMessageReceivedCallback) handle->callbacks.saMsgMessageReceivedCallback( &(qhd->queue_handle)); ha_free(gmsg); break; case CMS_MSG_ACK: amsg = (client_message_ack_t *) msg; if ((handle->callbacks).saMsgMessageDeliveredCallback) { (handle->callbacks).saMsgMessageDeliveredCallback( amsg->invocation, msg->flag); } ha_free(amsg); break; case CMS_QUEUEGROUP_NOTIFY: nmsg = 
(client_mqgroup_notify_t *)msg; name = (char *) ha_malloc(nmsg->group_name.length + 1); if (name == NULL) { cl_log(LOG_ERR, "%s: ha_malloc failed", __FUNCTION__); return FALSE; } dprintf("group name [%s], length [%d]\n" , nmsg->group_name.value, nmsg->group_name.length); strncpy(name, nmsg->group_name.value, nmsg->group_name.length); name[nmsg->group_name.length] = '\0'; dprintf("name = [%s]\n", name); track = g_hash_table_lookup(__group_tracking_hash, name); if (track == NULL) { cl_log(LOG_ERR, "Cannot find track buffer"); return FALSE; } if ((handle->callbacks).saMsgQueueGroupTrackCallback == NULL) return FALSE; (handle->callbacks).saMsgQueueGroupTrackCallback( track->name, &(track->buf), track->policy, track->buf.numberOfItems, SA_OK);
?? 快捷鍵說明
複製代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號(hào)
Ctrl + =
減小字號(hào)
Ctrl + -