From aca5dc13c7ebce355f490b395ca3a57593348e84 Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Tue, 25 Jun 2013 03:42:31 -0700 Subject: Add support for heartbeats in amqp_basic_publish Check heartbeats when doing basic.publish. Do this by doing a recv on the socket. --- librabbitmq/amqp_api.c | 17 +++ librabbitmq/amqp_connection.c | 4 +- librabbitmq/amqp_private.h | 2 + librabbitmq/amqp_socket.c | 307 ++++++++++++++++++++++++++++-------------- 4 files changed, 228 insertions(+), 102 deletions(-) (limited to 'librabbitmq') diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index bdd7d80..525d92c 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -39,6 +39,7 @@ #endif #include "amqp_private.h" +#include "amqp_timer.h" #include #include #include @@ -186,6 +187,22 @@ int amqp_basic_publish(amqp_connection_state_t state, m.immediate = immediate; m.ticket = 0; + if (state->heartbeat > 0) { + uint64_t current_timestamp = amqp_get_monotonic_timestamp(); + if (0 == current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + + if (current_timestamp > state->next_recv_heartbeat) { + res = amqp_try_recv(state, current_timestamp); + if (AMQP_STATUS_TIMEOUT == res) { + return AMQP_STATUS_HEARTBEAT_TIMEOUT; + } else if (AMQP_STATUS_OK != res) { + return res; + } + } + } + res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m); if (res < 0) { return res; diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 7ba6f42..214dbec 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -139,8 +139,8 @@ int amqp_tune_connection(amqp_connection_state_t state, if (0 == current_time) { return AMQP_STATUS_TIMER_FAILURE; } - state->next_send_heartbeat = current_time + (state->heartbeat * AMQP_NS_PER_S); - state->next_recv_heartbeat = current_time + (2 * state->heartbeat * AMQP_NS_PER_S); + state->next_send_heartbeat = current_time + ((uint64_t)state->heartbeat * AMQP_NS_PER_S); + state->next_recv_heartbeat = current_time + (2 * (uint64_t)state->heartbeat * AMQP_NS_PER_S); } state->outbound_buffer.len = frame_max; diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index cb7494b..14a9881 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -165,6 +165,8 @@ struct amqp_connection_state_t_ { amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel); amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel); +int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time); + static inline void *amqp_offset(void *data, size_t offset) { return (char *)data + offset; diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 8966cb9..580047a 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -349,6 +349,150 @@ amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) return (state->sock_inbound_offset < state->sock_inbound_limit); } +static int consume_one_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame) +{ + int res; + + amqp_bytes_t buffer; + buffer.len = state->sock_inbound_limit - state->sock_inbound_offset; + buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; + + res = amqp_handle_input(state, buffer, decoded_frame); + if (res < 0) { + return res; + } + + state->sock_inbound_offset += res; + + return AMQP_STATUS_OK; +} + + +static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, struct timeval *timeout) +{ + int res; + + if (timeout) { + int fd; + fd_set read_fd; + fd_set except_fd; + + fd = amqp_get_sockfd(state); + if (-1 == fd) { + return AMQP_STATUS_CONNECTION_CLOSED; + } + + while (1) { + FD_ZERO(&read_fd); + FD_SET(fd, &read_fd); + + FD_ZERO(&except_fd); + FD_SET(fd, &except_fd); + + res = select(fd + 1, &read_fd, NULL, &except_fd, timeout); + + if (0 < res) { + break; + } else if (0 == res) { + return AMQP_STATUS_TIMEOUT; + } else if (-1 == res) { + if (EINTR == errno) { + if (timeout) { + uint64_t end_timestamp; + uint64_t time_left; + uint64_t current_timestamp = amqp_get_monotonic_timestamp(); + if (0 == current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + end_timestamp = start + timeout->tv_sec * AMQP_NS_PER_S + + timeout->tv_usec * AMQP_NS_PER_US; + if (current_timestamp > end_timestamp) { + return AMQP_STATUS_TIMEOUT; + } + + time_left = end_timestamp - current_timestamp; + + timeout->tv_sec = time_left / AMQP_NS_PER_S; + timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_US; + } + continue; + } + return AMQP_STATUS_SOCKET_ERROR; + } + } + } + + res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, + state->sock_inbound_buffer.len, 0); + + if (res < 0) { + return res; + } + + state->sock_inbound_limit = res; + state->sock_inbound_offset = 0; + + if (state->heartbeat > 0) { + uint64_t current_time = amqp_get_monotonic_timestamp(); + if (0 == current_time) { + return AMQP_STATUS_TIMER_FAILURE; + } + state->next_recv_heartbeat = current_time + (2 * (uint64_t)state->heartbeat * AMQP_NS_PER_S); + } + + return AMQP_STATUS_OK; +} + +int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time) +{ + struct timeval tv; + + while (amqp_data_in_buffer(state)) { + amqp_frame_t frame; + int res = consume_one_frame(state, &frame); + + if (AMQP_STATUS_OK != res) { + return res; + } + + if (frame.frame_type != 0) { + amqp_pool_t *channel_pool; + amqp_frame_t *frame_copy; + amqp_link_t *link; + + channel_pool = amqp_get_or_create_channel_pool(state, frame.channel); + if (NULL == channel_pool) { + return AMQP_STATUS_NO_MEMORY; + } + + frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); + link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); + + if (frame_copy == NULL || link == NULL) { + return AMQP_STATUS_NO_MEMORY; + } + + *frame_copy = frame; + + link->next = NULL; + link->data = frame_copy; + + if (state->last_queued_frame == NULL) { + state->first_queued_frame = link; + } else { + state->last_queued_frame->next = link; + } + state->last_queued_frame = link; + } + } + + memset(&tv, 0, sizeof(struct timeval)); + tv.tv_sec = 0; + tv.tv_usec = 0; + + return recv_with_timeout(state, current_time, &tv); +} + static int wait_frame_inner(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval *timeout) @@ -356,6 +500,8 @@ static int wait_frame_inner(amqp_connection_state_t state, uint64_t current_timestamp = 0; uint64_t timeout_timestamp = 0; uint64_t next_timestamp = 0; + struct timeval tv; + struct timeval *tvp = NULL; if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { return AMQP_STATUS_INVALID_PARAMETER; @@ -365,18 +511,14 @@ static int wait_frame_inner(amqp_connection_state_t state, int res; while (amqp_data_in_buffer(state)) { - amqp_bytes_t buffer; - buffer.len = state->sock_inbound_limit - state->sock_inbound_offset; - buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; + res = consume_one_frame(state, decoded_frame); - res = amqp_handle_input(state, buffer, decoded_frame); - if (res < 0) { + if (AMQP_STATUS_OK != res) { return res; } - state->sock_inbound_offset += res; - if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) { + amqp_maybe_release_buffers_on_channel(state, 0); continue; } @@ -389,123 +531,88 @@ static int wait_frame_inner(amqp_connection_state_t state, assert(res != 0); } +beginrecv: if (timeout || state->heartbeat > 0) { - while (1) { - int fd; - fd_set read_fd; - fd_set except_fd; - uint64_t ns_until_next_timeout; - struct timeval tv; - - fd = amqp_get_sockfd(state); - if (-1 == fd) { - return AMQP_STATUS_CONNECTION_CLOSED; - } + uint64_t ns_until_next_timeout; - FD_ZERO(&read_fd); - FD_SET(fd, &read_fd); + current_timestamp = amqp_get_monotonic_timestamp(); + if (0 == current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } - FD_ZERO(&except_fd); - FD_SET(fd, &except_fd); + if (state->heartbeat > 0 && current_timestamp > state->next_send_heartbeat) { + amqp_frame_t heartbeat; + heartbeat.channel = 0; + heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; + + res = amqp_send_frame(state, &heartbeat); + if (AMQP_STATUS_OK != res) { + return res; + } current_timestamp = amqp_get_monotonic_timestamp(); if (0 == current_timestamp) { return AMQP_STATUS_TIMER_FAILURE; } + } - if (state->heartbeat > 0 && current_timestamp > state->next_send_heartbeat) { - amqp_frame_t heartbeat; - heartbeat.channel = 0; - heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; - - res = amqp_send_frame(state, &heartbeat); - if (AMQP_STATUS_OK != res) { - return res; - } - - current_timestamp = amqp_get_monotonic_timestamp(); - if (0 == current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } - } - - if (timeout && 0 == timeout_timestamp) { + if (timeout) { + if (0 == timeout_timestamp) { timeout_timestamp = current_timestamp + - timeout->tv_sec * AMQP_NS_PER_S + - timeout->tv_usec * AMQP_NS_PER_US; + timeout->tv_sec * AMQP_NS_PER_S + + timeout->tv_usec * AMQP_NS_PER_US; } if (current_timestamp > timeout_timestamp) { return AMQP_STATUS_TIMEOUT; } + } - if (state->heartbeat > 0) { - if (current_timestamp > state->next_recv_heartbeat) { - state->next_recv_heartbeat = current_timestamp; - } - next_timestamp = (state->next_recv_heartbeat < state->next_send_heartbeat ? - state->next_recv_heartbeat : - state->next_send_heartbeat); - if (timeout) { - next_timestamp = (timeout_timestamp < next_timestamp ? - timeout_timestamp : next_timestamp); - } - } else if (timeout) { - next_timestamp = timeout_timestamp; - } else { - amqp_abort("Internal error: both timeout == NULL && state->heartbeat == 0"); - } - ns_until_next_timeout = timeout_timestamp - current_timestamp; - - memset(&tv, 0, sizeof(struct timeval)); - tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S; - tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US; - - res = select(fd + 1, &read_fd, NULL, &except_fd, &tv); - - if (res > 0) { - /* socket is ready to be read from */ - break; - } else if (0 == res) { - if (next_timestamp == state->next_recv_heartbeat) { - amqp_socket_close(state->socket); - state->socket = NULL; - return AMQP_STATUS_HEARTBEAT_TIMEOUT; - - } else if (next_timestamp == timeout_timestamp) { - return AMQP_STATUS_TIMEOUT; - } else if (next_timestamp == state->next_send_heartbeat) { - /* send heartbeat happens at the beginning of this loop */ - continue; - } else { - amqp_abort("Internal error: unable to determine timeout reason"); - } - } else if (errno == EINTR) { - /* Try again */ - continue; - } else { - return AMQP_STATUS_SOCKET_ERROR; + if (state->heartbeat > 0) { + if (current_timestamp > state->next_recv_heartbeat) { + state->next_recv_heartbeat = current_timestamp; } + next_timestamp = (state->next_recv_heartbeat < state->next_send_heartbeat ? + state->next_recv_heartbeat : + state->next_send_heartbeat); + if (timeout) { + next_timestamp = (timeout_timestamp < next_timestamp ? + timeout_timestamp : next_timestamp); + } + } else if (timeout) { + next_timestamp = timeout_timestamp; + } else { + amqp_abort("Internal error: both timeout == NULL && state->heartbeat == 0"); } - } - res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len, 0); - if (res < 0) { - return res; + ns_until_next_timeout = next_timestamp - current_timestamp; + + memset(&tv, 0, sizeof(struct timeval)); + tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S; + tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US; + + tvp = &tv; } - if (state->heartbeat > 0) { - uint64_t current_time = amqp_get_monotonic_timestamp(); - if (0 == current_time) { - return AMQP_STATUS_TIMER_FAILURE; + res = recv_with_timeout(state, current_timestamp, tvp); + + if (AMQP_STATUS_TIMEOUT == res) { + if (next_timestamp == state->next_recv_heartbeat) { + amqp_socket_close(state->socket); + state->socket = NULL; + return AMQP_STATUS_HEARTBEAT_TIMEOUT; + } else if (next_timestamp == timeout_timestamp) { + return AMQP_STATUS_TIMEOUT; + } else if (next_timestamp == state->next_send_heartbeat) { + /* send heartbeat happens before we do recv_with_timeout */ + goto beginrecv; + } else { + amqp_abort("Internal error: unable to determine timeout reason"); } - state->next_recv_heartbeat = current_time + (2 * state->heartbeat * AMQP_NS_PER_S); + } else if (AMQP_STATUS_OK != res) { + return res; } - - state->sock_inbound_limit = res; - state->sock_inbound_offset = 0; } } -- cgit v1.2.1