From daa0e663ed6ae6d49ed3cc0da3043e3d4e8b2252 Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Fri, 14 Jun 2013 17:48:05 -0700 Subject: Add partial support for heartbeats in wait_frame --- librabbitmq/amqp.h | 1 + librabbitmq/amqp_api.c | 3 +- librabbitmq/amqp_connection.c | 18 ++++++++++ librabbitmq/amqp_private.h | 3 ++ librabbitmq/amqp_socket.c | 77 +++++++++++++++++++++++++++++++++++-------- 5 files changed, 88 insertions(+), 14 deletions(-) diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index de2afbd..7f479c8 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -348,6 +348,7 @@ typedef enum amqp_status_enum_ AMQP_STATUS_WRONG_METHOD = -0x000C, AMQP_STATUS_TIMEOUT = -0x000D, AMQP_STATUS_TIMER_FAILURE = -0x000E, + AMQP_STATUS_HEARTBEAT_TIMEOUT = -0x000F, AMQP_STATUS_TCP_ERROR = -0x0100, AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR = -0x0101, diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index e6c9a0d..bdd7d80 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -70,7 +70,8 @@ static const char *base_error_strings[] = { "table too large for buffer", /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */ "unexpected method received", /* AMQP_STATUS_WRONG_METHOD -0x000C */ "request timed out", /* AMQP_STATUS_TIMEOUT -0x000D */ - "system timer has failed" /* AMQP_STATUS_TIMER_FAILED -0x000E */ + "system timer has failed", /* AMQP_STATUS_TIMER_FAILED -0x000E */ + "heartbeat timeout, connection closed"/* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */ }; static const char *tcp_error_strings[] = { diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index ae3edf4..7ba6f42 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -40,6 +40,7 @@ #include "amqp_tcp_socket.h" #include "amqp_private.h" +#include "amqp_timer.h" #include #include #include @@ -133,6 +134,15 @@ int amqp_tune_connection(amqp_connection_state_t state, state->frame_max = frame_max; state->heartbeat = heartbeat; + if (state->heartbeat > 0) { + uint64_t current_time = amqp_get_monotonic_timestamp(); + if (0 == current_time) { + return AMQP_STATUS_TIMER_FAILURE; + } + state->next_send_heartbeat = current_time + (state->heartbeat * AMQP_NS_PER_S); + state->next_recv_heartbeat = current_time + (2 * state->heartbeat * AMQP_NS_PER_S); + } + state->outbound_buffer.len = frame_max; newbuf = realloc(state->outbound_buffer.bytes, frame_max); if (newbuf == NULL) { @@ -491,5 +501,13 @@ int amqp_send_frame(amqp_connection_state_t state, out_frame_len + HEADER_SIZE + FOOTER_SIZE); } + if (state->heartbeat > 0) { + uint64_t current_time = amqp_get_monotonic_timestamp(); + if (0 == current_time) { + return AMQP_STATUS_TIMER_FAILURE; + } + state->next_send_heartbeat = current_time + (state->heartbeat * AMQP_NS_PER_S); + } + return res; } diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index baf96e1..cb7494b 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -157,6 +157,9 @@ struct amqp_connection_state_t_ { amqp_link_t *last_queued_frame; amqp_rpc_reply_t most_recent_api_result; + + uint64_t next_recv_heartbeat; + uint64_t next_send_heartbeat; }; amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel); diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index a4ec537..8966cb9 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -355,6 +355,11 @@ static int wait_frame_inner(amqp_connection_state_t state, { uint64_t current_timestamp = 0; uint64_t timeout_timestamp = 0; + uint64_t next_timestamp = 0; + + if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { + return AMQP_STATUS_INVALID_PARAMETER; + } while (1) { int res; @@ -371,6 +376,10 @@ static int wait_frame_inner(amqp_connection_state_t state, state->sock_inbound_offset += res; + if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) { + continue; + } + if (decoded_frame->frame_type != 0) { /* Complete frame was read. Return it. */ return AMQP_STATUS_OK; @@ -380,10 +389,7 @@ static int wait_frame_inner(amqp_connection_state_t state, assert(res != 0); } - if (timeout) { - if (timeout->tv_sec < 0 || timeout->tv_usec < 0) { - return AMQP_STATUS_INVALID_PARAMETER; - } + if (timeout || state->heartbeat > 0) { while (1) { int fd; fd_set read_fd; @@ -402,28 +408,54 @@ static int wait_frame_inner(amqp_connection_state_t state, FD_ZERO(&except_fd); FD_SET(fd, &except_fd); + current_timestamp = amqp_get_monotonic_timestamp(); if (0 == current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + + if (state->heartbeat > 0 && current_timestamp > state->next_send_heartbeat) { + amqp_frame_t heartbeat; + heartbeat.channel = 0; + heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; + + res = amqp_send_frame(state, &heartbeat); + if (AMQP_STATUS_OK != res) { + return res; + } + current_timestamp = amqp_get_monotonic_timestamp(); if (0 == current_timestamp) { return AMQP_STATUS_TIMER_FAILURE; } + } + if (timeout && 0 == timeout_timestamp) { timeout_timestamp = current_timestamp + timeout->tv_sec * AMQP_NS_PER_S + timeout->tv_usec * AMQP_NS_PER_US; - } else { - current_timestamp = amqp_get_monotonic_timestamp(); - if (0 == current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } } - /* TODO: Heartbeat timeout goes here */ - if (current_timestamp > timeout_timestamp) { return AMQP_STATUS_TIMEOUT; } + if (state->heartbeat > 0) { + if (current_timestamp > state->next_recv_heartbeat) { + state->next_recv_heartbeat = current_timestamp; + } + next_timestamp = (state->next_recv_heartbeat < state->next_send_heartbeat ? + state->next_recv_heartbeat : + state->next_send_heartbeat); + if (timeout) { + next_timestamp = (timeout_timestamp < next_timestamp ? + timeout_timestamp : next_timestamp); + } + } else if (timeout) { + next_timestamp = timeout_timestamp; + } else { + amqp_abort("Internal error: both timeout == NULL && state->heartbeat == 0"); + } + ns_until_next_timeout = timeout_timestamp - current_timestamp; memset(&tv, 0, sizeof(struct timeval)); @@ -436,8 +468,19 @@ static int wait_frame_inner(amqp_connection_state_t state, /* socket is ready to be read from */ break; } else if (0 == res) { - /* Timed out - return */ - return AMQP_STATUS_TIMEOUT; + if (next_timestamp == state->next_recv_heartbeat) { + amqp_socket_close(state->socket); + state->socket = NULL; + return AMQP_STATUS_HEARTBEAT_TIMEOUT; + + } else if (next_timestamp == timeout_timestamp) { + return AMQP_STATUS_TIMEOUT; + } else if (next_timestamp == state->next_send_heartbeat) { + /* send heartbeat happens at the beginning of this loop */ + continue; + } else { + amqp_abort("Internal error: unable to determine timeout reason"); + } } else if (errno == EINTR) { /* Try again */ continue; @@ -453,6 +496,14 @@ static int wait_frame_inner(amqp_connection_state_t state, return res; } + if (state->heartbeat > 0) { + uint64_t current_time = amqp_get_monotonic_timestamp(); + if (0 == current_time) { + return AMQP_STATUS_TIMER_FAILURE; + } + state->next_recv_heartbeat = current_time + (2 * state->heartbeat * AMQP_NS_PER_S); + } + state->sock_inbound_limit = res; state->sock_inbound_offset = 0; } -- cgit v1.2.1