?? mbus.c
字號(hào):
mbus_validate(m);
if (!mbus_waiting_ack(m)) {
return;
}
mbus_msg_validate(curr);
gettimeofday(&time, NULL);
/* diff is time in milliseconds that the message has been awaiting an ACK */
diff = ((time.tv_sec * 1000) + (time.tv_usec / 1000)) - ((curr->send_time.tv_sec * 1000) + (curr->send_time.tv_usec / 1000));
if (diff > 10000) {
debug_msg("Reliable mbus message failed!\n");
if (m->err_handler == NULL) {
abort();
}
m->err_handler(curr->seqnum, MBUS_MESSAGE_LOST);
/* if we don't delete this failed message, the error handler
gets triggered every time we call mbus_retransmit */
while (m->waiting_ack->num_cmds > 0) {
m->waiting_ack->num_cmds--;
xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
}
xfree(m->waiting_ack->dest);
xfree(m->waiting_ack);
m->waiting_ack = NULL;
return;
}
/* Note: We only send one retransmission each time, to avoid
* overflowing the receiver with a burst of requests...
*/
if ((diff > 750) && (curr->retransmit_count == 2)) {
resend(m, curr);
return;
}
if ((diff > 500) && (curr->retransmit_count == 1)) {
resend(m, curr);
return;
}
if ((diff > 250) && (curr->retransmit_count == 0)) {
resend(m, curr);
return;
}
}
void mbus_heartbeat(struct mbus *m, int interval)
{
struct timeval curr_time;
char *a = (char *) xmalloc(3);
sprintf(a, "()");
mbus_validate(m);
gettimeofday(&curr_time, NULL);
if (curr_time.tv_sec - m->last_heartbeat.tv_sec >= interval) {
mb_header(++m->seqnum, (int) curr_time.tv_sec, 'U', m->addr, "()", -1);
mb_add_command("mbus.hello", "");
mb_send(m);
m->last_heartbeat = curr_time;
/* Remove dead sources */
remove_inactiv_other_addr(m, curr_time, interval);
}
xfree(a);
}
int mbus_waiting_ack(struct mbus *m)
{
mbus_validate(m);
return m->waiting_ack != NULL;
}
int mbus_sent_all(struct mbus *m)
{
mbus_validate(m);
return (m->cmd_queue == NULL) && (m->waiting_ack == NULL);
}
struct mbus *mbus_init(void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat),
void (*err_handler)(int seqnum, int reason),
char *addr)
{
struct mbus *m;
struct mbus_key k;
struct mbus_parser *mp;
int i;
char *net_addr, *tmp;
uint16_t net_port;
int net_scope;
m = (struct mbus *) xmalloc(sizeof(struct mbus));
if (m == NULL) {
debug_msg("Unable to allocate memory for mbus\n");
return NULL;
}
m->cfg = mbus_create_config();
mbus_lock_config_file(m->cfg);
net_addr = (char *) xmalloc(20);
mbus_get_net_addr(m->cfg, net_addr, &net_port, &net_scope);
m->s = udp_init(net_addr, net_port, net_port, net_scope);
if (m->s == NULL) {
debug_msg("Unable to initialize mbus address\n");
xfree(m);
return NULL;
}
m->seqnum = 0;
m->cmd_handler = cmd_handler;
m->err_handler = err_handler;
m->num_other_addr = 0;
m->max_other_addr = 10;
m->other_addr = (char **) xmalloc(sizeof(char *) * 10);
m->other_hello = (struct timeval **) xmalloc(sizeof(struct timeval *) * 10);
for (i = 0; i < 10; i++) {
m->other_addr[i] = NULL;
m->other_hello[i] = NULL;
}
m->cmd_queue = NULL;
m->waiting_ack = NULL;
m->magic = MBUS_MAGIC;
m->index = 0;
m->index_sent = 0;
mp = mbus_parse_init(xstrdup(addr));
if (!mbus_parse_lst(mp, &tmp)) {
debug_msg("Invalid mbus address\n");
abort();
}
m->addr = xstrdup(tmp);
mbus_parse_done(mp);
ASSERT(m->addr != NULL);
gettimeofday(&(m->last_heartbeat), NULL);
mbus_get_encrkey(m->cfg, &k);
m->encrkey = k.key;
m->encrkeylen = k.key_len;
mbus_get_hashkey(m->cfg, &k);
m->hashkey = k.key;
m->hashkeylen = k.key_len;
mbus_unlock_config_file(m->cfg);
xfree(net_addr);
return m;
}
void mbus_cmd_handler(struct mbus *m, void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat))
{
mbus_validate(m);
m->cmd_handler = cmd_handler;
}
static void mbus_flush_msgs(struct mbus_msg **queue)
{
struct mbus_msg *curr, *next;
int i;
curr = *queue;
while(curr) {
next = curr->next;
xfree(curr->dest);
for(i = 0; i < curr->num_cmds; i++) {
xfree(curr->cmd_list[i]);
xfree(curr->arg_list[i]);
}
xfree(curr);
curr = next;
}
*queue = NULL;
}
void mbus_exit(struct mbus *m)
{
int i;
ASSERT(m != NULL);
mbus_validate(m);
mbus_qmsg(m, "()", "mbus.bye", "", FALSE);
mbus_send(m);
/* FIXME: It should be a fatal error to call mbus_exit() if some messages are still outstanding. */
/* We will need an mbus_flush() call first though, to ensure nothing is waiting. */
mbus_flush_msgs(&m->cmd_queue);
mbus_flush_msgs(&m->waiting_ack);
if (m->encrkey != NULL) {
xfree(m->encrkey);
}
if (m->hashkey != NULL) {
xfree(m->hashkey);
}
udp_exit(m->s);
/* Clean up other_* */
for (i=m->num_other_addr-1; i>=0; i--){
remove_other_addr(m, m->other_addr[i]);
}
xfree(m->addr);
xfree(m->other_addr);
xfree(m->other_hello);
xfree(m->cfg);
xfree(m);
}
void mbus_send(struct mbus *m)
{
/* Send one, or more, messages previosly queued with mbus_qmsg(). */
/* Messages for the same destination are batched together. Stops */
/* when a reliable message is sent, until the ACK is received. */
struct mbus_msg *curr = m->cmd_queue;
int i;
mbus_validate(m);
if (m->waiting_ack != NULL) {
return;
}
while (curr != NULL) {
mbus_msg_validate(curr);
/* It's okay for us to send messages which haven't been marked as complete - */
/* that just means we're sending something which has the potential to have */
/* more data piggybacked. However, if it's not complete it MUST be the last */
/* in the list, or something has been reordered - which is bad. */
if (!curr->complete) {
ASSERT(curr->next == NULL);
}
if (curr->reliable) {
if (!mbus_addr_valid(m, curr->dest)) {
debug_msg("Trying to send reliably to an unknown address...\n");
if (m->err_handler == NULL) {
abort();
}
m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN);
}
if (!mbus_addr_unique(m, curr->dest)) {
debug_msg("Trying to send reliably but address is not unique...\n");
if (m->err_handler == NULL) {
abort();
}
m->err_handler(curr->seqnum, MBUS_DESTINATION_NOT_UNIQUE);
}
}
/* Create the message... */
mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);
for (i = 0; i < curr->num_cmds; i++) {
ASSERT(m->index_sent == (curr->idx_list[i] - 1));
m->index_sent = curr->idx_list[i];
mb_add_command(curr->cmd_list[i], curr->arg_list[i]);
}
mb_send(m);
m->cmd_queue = curr->next;
if (curr->reliable) {
/* Reliable message, wait for the ack... */
gettimeofday(&(curr->send_time), NULL);
m->waiting_ack = curr;
curr->next = NULL;
return;
} else {
while (curr->num_cmds > 0) {
curr->num_cmds--;
xfree(curr->cmd_list[curr->num_cmds]); curr->cmd_list[curr->num_cmds] = NULL;
xfree(curr->arg_list[curr->num_cmds]); curr->arg_list[curr->num_cmds] = NULL;
}
xfree(curr->dest);
xfree(curr);
}
curr = m->cmd_queue;
}
}
void mbus_qmsg(struct mbus *m, const char *dest, const char *cmnd, const char *args, int reliable)
{
/* Queue up a message for sending. The message is not */
/* actually sent until mbus_send() is called. */
struct mbus_msg *curr = m->cmd_queue;
struct mbus_msg *prev = NULL;
int alen = strlen(cmnd) + strlen(args) + 4;
int i;
mbus_validate(m);
while (curr != NULL) {
mbus_msg_validate(curr);
if (!curr->complete) {
/* This message is still open for new commands. It MUST be the last in the */
/* cmd_queue, else commands will be reordered. */
ASSERT(curr->next == NULL);
if (mbus_addr_identical(curr->dest, dest) &&
(curr->num_cmds < MBUS_MAX_QLEN) && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 500))) {
curr->num_cmds++;
curr->reliable |= reliable;
curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd);
curr->arg_list[curr->num_cmds-1] = xstrdup(args);
curr->idx_list[curr->num_cmds-1] = ++(m->index);
curr->message_size += alen;
mbus_msg_validate(curr);
return;
} else {
curr->complete = TRUE;
}
}
prev = curr;
curr = curr->next;
}
/* If we get here, we've not found an open message in the cmd_queue. We */
/* have to create a new message, and add it to the end of the cmd_queue. */
curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg));
curr->magic = MBUS_MSG_MAGIC;
curr->next = NULL;
curr->dest = xstrdup(dest);
curr->retransmit_count = 0;
curr->message_size = alen + 60 + strlen(dest) + strlen(m->addr);
curr->seqnum = ++m->seqnum;
curr->reliable = reliable;
curr->complete = FALSE;
curr->num_cmds = 1;
curr->cmd_list[0] = xstrdup(cmnd);
curr->arg_list[0] = xstrdup(args);
curr->idx_list[curr->num_cmds-1] = ++(m->index);
for (i = 1; i < MBUS_MAX_QLEN; i++) {
curr->cmd_list[i] = NULL;
curr->arg_list[i] = NULL;
}
if (prev == NULL) {
m->cmd_queue = curr;
} else {
prev->next = curr;
}
gettimeofday(&(curr->send_time), NULL);
gettimeofday(&(curr->comp_time), NULL);
mbus_msg_validate(curr);
}
?? 快捷鍵說(shuō)明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號(hào)
Ctrl + =
減小字號(hào)
Ctrl + -