diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-14 17:48:05 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-25 16:10:51 -0700 |
commit | daa0e663ed6ae6d49ed3cc0da3043e3d4e8b2252 (patch) | |
tree | c4f85982841a272f3f1a0b76708755adde52c378 /librabbitmq/amqp_socket.c | |
parent | 3866433fe273eded1e601b3274028684c3e77919 (diff) | |
download | rabbitmq-c-daa0e663ed6ae6d49ed3cc0da3043e3d4e8b2252.tar.gz |
Add partial support for heartbeats in wait_frame
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 77 |
1 files changed, 64 insertions, 13 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index a4ec537..8966cb9 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -355,6 +355,11 @@ static int wait_frame_inner(amqp_connection_state_t state, { uint64_t current_timestamp = 0; uint64_t timeout_timestamp = 0; + uint64_t next_timestamp = 0; + + if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { + return AMQP_STATUS_INVALID_PARAMETER; + } while (1) { int res; @@ -371,6 +376,10 @@ static int wait_frame_inner(amqp_connection_state_t state, state->sock_inbound_offset += res; + if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) { + continue; + } + if (decoded_frame->frame_type != 0) { /* Complete frame was read. Return it. */ return AMQP_STATUS_OK; @@ -380,10 +389,7 @@ static int wait_frame_inner(amqp_connection_state_t state, assert(res != 0); } - if (timeout) { - if (timeout->tv_sec < 0 || timeout->tv_usec < 0) { - return AMQP_STATUS_INVALID_PARAMETER; - } + if (timeout || state->heartbeat > 0) { while (1) { int fd; fd_set read_fd; @@ -402,28 +408,54 @@ static int wait_frame_inner(amqp_connection_state_t state, FD_ZERO(&except_fd); FD_SET(fd, &except_fd); + current_timestamp = amqp_get_monotonic_timestamp(); if (0 == current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + + if (state->heartbeat > 0 && current_timestamp > state->next_send_heartbeat) { + amqp_frame_t heartbeat; + heartbeat.channel = 0; + heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; + + res = amqp_send_frame(state, &heartbeat); + if (AMQP_STATUS_OK != res) { + return res; + } + current_timestamp = amqp_get_monotonic_timestamp(); if (0 == current_timestamp) { return AMQP_STATUS_TIMER_FAILURE; } + } + if (timeout && 0 == timeout_timestamp) { timeout_timestamp = current_timestamp + timeout->tv_sec * AMQP_NS_PER_S + timeout->tv_usec * AMQP_NS_PER_US; - } else { - current_timestamp = amqp_get_monotonic_timestamp(); - if (0 == current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } } - /* TODO: Heartbeat timeout goes here */ - if (current_timestamp > timeout_timestamp) { return AMQP_STATUS_TIMEOUT; } + if (state->heartbeat > 0) { + if (current_timestamp > state->next_recv_heartbeat) { + state->next_recv_heartbeat = current_timestamp; + } + next_timestamp = (state->next_recv_heartbeat < state->next_send_heartbeat ? + state->next_recv_heartbeat : + state->next_send_heartbeat); + if (timeout) { + next_timestamp = (timeout_timestamp < next_timestamp ? + timeout_timestamp : next_timestamp); + } + } else if (timeout) { + next_timestamp = timeout_timestamp; + } else { + amqp_abort("Internal error: both timeout == NULL && state->heartbeat == 0"); + } + ns_until_next_timeout = timeout_timestamp - current_timestamp; memset(&tv, 0, sizeof(struct timeval)); @@ -436,8 +468,19 @@ static int wait_frame_inner(amqp_connection_state_t state, /* socket is ready to be read from */ break; } else if (0 == res) { - /* Timed out - return */ - return AMQP_STATUS_TIMEOUT; + if (next_timestamp == state->next_recv_heartbeat) { + amqp_socket_close(state->socket); + state->socket = NULL; + return AMQP_STATUS_HEARTBEAT_TIMEOUT; + + } else if (next_timestamp == timeout_timestamp) { + return AMQP_STATUS_TIMEOUT; + } else if (next_timestamp == state->next_send_heartbeat) { + /* send heartbeat happens at the beginning of this loop */ + continue; + } else { + amqp_abort("Internal error: unable to determine timeout reason"); + } } else if (errno == EINTR) { /* Try again */ continue; @@ -453,6 +496,14 @@ static int wait_frame_inner(amqp_connection_state_t state, return res; } + if (state->heartbeat > 0) { + uint64_t current_time = amqp_get_monotonic_timestamp(); + if (0 == current_time) { + return AMQP_STATUS_TIMER_FAILURE; + } + state->next_recv_heartbeat = current_time + (2 * state->heartbeat * AMQP_NS_PER_S); + } + state->sock_inbound_limit = res; state->sock_inbound_offset = 0; } |