summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-06-25 03:42:31 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-06-25 16:10:51 -0700
commitaca5dc13c7ebce355f490b395ca3a57593348e84 (patch)
tree67acfb70c65200737a6e006e8764dd99c4c8743f
parentdaa0e663ed6ae6d49ed3cc0da3043e3d4e8b2252 (diff)
downloadrabbitmq-c-aca5dc13c7ebce355f490b395ca3a57593348e84.tar.gz
Add support for heartbeats in amqp_basic_publish
Check heartbeats when doing basic.publish. Do this by doing a recv on the socket.
-rw-r--r--librabbitmq/amqp_api.c17
-rw-r--r--librabbitmq/amqp_connection.c4
-rw-r--r--librabbitmq/amqp_private.h2
-rw-r--r--librabbitmq/amqp_socket.c307
4 files changed, 228 insertions, 102 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index bdd7d80..525d92c 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -39,6 +39,7 @@
#endif
#include "amqp_private.h"
+#include "amqp_timer.h"
#include <assert.h>
#include <stdarg.h>
#include <stdint.h>
@@ -186,6 +187,22 @@ int amqp_basic_publish(amqp_connection_state_t state,
m.immediate = immediate;
m.ticket = 0;
+ if (state->heartbeat > 0) {
+ uint64_t current_timestamp = amqp_get_monotonic_timestamp();
+ if (0 == current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+
+ if (current_timestamp > state->next_recv_heartbeat) {
+ res = amqp_try_recv(state, current_timestamp);
+ if (AMQP_STATUS_TIMEOUT == res) {
+ return AMQP_STATUS_HEARTBEAT_TIMEOUT;
+ } else if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+ }
+ }
+
res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m);
if (res < 0) {
return res;
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 7ba6f42..214dbec 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -139,8 +139,8 @@ int amqp_tune_connection(amqp_connection_state_t state,
if (0 == current_time) {
return AMQP_STATUS_TIMER_FAILURE;
}
- state->next_send_heartbeat = current_time + (state->heartbeat * AMQP_NS_PER_S);
- state->next_recv_heartbeat = current_time + (2 * state->heartbeat * AMQP_NS_PER_S);
+ state->next_send_heartbeat = current_time + ((uint64_t)state->heartbeat * AMQP_NS_PER_S);
+ state->next_recv_heartbeat = current_time + (2 * (uint64_t)state->heartbeat * AMQP_NS_PER_S);
}
state->outbound_buffer.len = frame_max;
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index cb7494b..14a9881 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -165,6 +165,8 @@ struct amqp_connection_state_t_ {
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel);
amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel);
+int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time);
+
static inline void *amqp_offset(void *data, size_t offset)
{
return (char *)data + offset;
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 8966cb9..580047a 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -349,6 +349,150 @@ amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state)
return (state->sock_inbound_offset < state->sock_inbound_limit);
}
+static int consume_one_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame)
+{
+ int res;
+
+ amqp_bytes_t buffer;
+ buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
+ buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
+
+ res = amqp_handle_input(state, buffer, decoded_frame);
+ if (res < 0) {
+ return res;
+ }
+
+ state->sock_inbound_offset += res;
+
+ return AMQP_STATUS_OK;
+}
+
+
+static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, struct timeval *timeout)
+{
+ int res;
+
+ if (timeout) {
+ int fd;
+ fd_set read_fd;
+ fd_set except_fd;
+
+ fd = amqp_get_sockfd(state);
+ if (-1 == fd) {
+ return AMQP_STATUS_CONNECTION_CLOSED;
+ }
+
+ while (1) {
+ FD_ZERO(&read_fd);
+ FD_SET(fd, &read_fd);
+
+ FD_ZERO(&except_fd);
+ FD_SET(fd, &except_fd);
+
+ res = select(fd + 1, &read_fd, NULL, &except_fd, timeout);
+
+ if (0 < res) {
+ break;
+ } else if (0 == res) {
+ return AMQP_STATUS_TIMEOUT;
+ } else if (-1 == res) {
+ if (EINTR == errno) {
+ if (timeout) {
+ uint64_t end_timestamp;
+ uint64_t time_left;
+ uint64_t current_timestamp = amqp_get_monotonic_timestamp();
+ if (0 == current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ end_timestamp = start + timeout->tv_sec * AMQP_NS_PER_S +
+ timeout->tv_usec * AMQP_NS_PER_US;
+ if (current_timestamp > end_timestamp) {
+ return AMQP_STATUS_TIMEOUT;
+ }
+
+ time_left = end_timestamp - current_timestamp;
+
+ timeout->tv_sec = time_left / AMQP_NS_PER_S;
+ timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_US;
+ }
+ continue;
+ }
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+ }
+ }
+
+ res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len, 0);
+
+ if (res < 0) {
+ return res;
+ }
+
+ state->sock_inbound_limit = res;
+ state->sock_inbound_offset = 0;
+
+ if (state->heartbeat > 0) {
+ uint64_t current_time = amqp_get_monotonic_timestamp();
+ if (0 == current_time) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ state->next_recv_heartbeat = current_time + (2 * (uint64_t)state->heartbeat * AMQP_NS_PER_S);
+ }
+
+ return AMQP_STATUS_OK;
+}
+
+int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time)
+{
+ struct timeval tv;
+
+ while (amqp_data_in_buffer(state)) {
+ amqp_frame_t frame;
+ int res = consume_one_frame(state, &frame);
+
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
+ if (frame.frame_type != 0) {
+ amqp_pool_t *channel_pool;
+ amqp_frame_t *frame_copy;
+ amqp_link_t *link;
+
+ channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
+ if (NULL == channel_pool) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
+ link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
+
+ if (frame_copy == NULL || link == NULL) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ *frame_copy = frame;
+
+ link->next = NULL;
+ link->data = frame_copy;
+
+ if (state->last_queued_frame == NULL) {
+ state->first_queued_frame = link;
+ } else {
+ state->last_queued_frame->next = link;
+ }
+ state->last_queued_frame = link;
+ }
+ }
+
+ memset(&tv, 0, sizeof(struct timeval));
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+
+ return recv_with_timeout(state, current_time, &tv);
+}
+
static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
struct timeval *timeout)
@@ -356,6 +500,8 @@ static int wait_frame_inner(amqp_connection_state_t state,
uint64_t current_timestamp = 0;
uint64_t timeout_timestamp = 0;
uint64_t next_timestamp = 0;
+ struct timeval tv;
+ struct timeval *tvp = NULL;
if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
return AMQP_STATUS_INVALID_PARAMETER;
@@ -365,18 +511,14 @@ static int wait_frame_inner(amqp_connection_state_t state,
int res;
while (amqp_data_in_buffer(state)) {
- amqp_bytes_t buffer;
- buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
- buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
+ res = consume_one_frame(state, decoded_frame);
- res = amqp_handle_input(state, buffer, decoded_frame);
- if (res < 0) {
+ if (AMQP_STATUS_OK != res) {
return res;
}
- state->sock_inbound_offset += res;
-
if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) {
+ amqp_maybe_release_buffers_on_channel(state, 0);
continue;
}
@@ -389,123 +531,88 @@ static int wait_frame_inner(amqp_connection_state_t state,
assert(res != 0);
}
+beginrecv:
if (timeout || state->heartbeat > 0) {
- while (1) {
- int fd;
- fd_set read_fd;
- fd_set except_fd;
- uint64_t ns_until_next_timeout;
- struct timeval tv;
-
- fd = amqp_get_sockfd(state);
- if (-1 == fd) {
- return AMQP_STATUS_CONNECTION_CLOSED;
- }
+ uint64_t ns_until_next_timeout;
- FD_ZERO(&read_fd);
- FD_SET(fd, &read_fd);
+ current_timestamp = amqp_get_monotonic_timestamp();
+ if (0 == current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
- FD_ZERO(&except_fd);
- FD_SET(fd, &except_fd);
+ if (state->heartbeat > 0 && current_timestamp > state->next_send_heartbeat) {
+ amqp_frame_t heartbeat;
+ heartbeat.channel = 0;
+ heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
+
+ res = amqp_send_frame(state, &heartbeat);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
current_timestamp = amqp_get_monotonic_timestamp();
if (0 == current_timestamp) {
return AMQP_STATUS_TIMER_FAILURE;
}
+ }
- if (state->heartbeat > 0 && current_timestamp > state->next_send_heartbeat) {
- amqp_frame_t heartbeat;
- heartbeat.channel = 0;
- heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
-
- res = amqp_send_frame(state, &heartbeat);
- if (AMQP_STATUS_OK != res) {
- return res;
- }
-
- current_timestamp = amqp_get_monotonic_timestamp();
- if (0 == current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
- }
-
- if (timeout && 0 == timeout_timestamp) {
+ if (timeout) {
+ if (0 == timeout_timestamp) {
timeout_timestamp = current_timestamp +
- timeout->tv_sec * AMQP_NS_PER_S +
- timeout->tv_usec * AMQP_NS_PER_US;
+ timeout->tv_sec * AMQP_NS_PER_S +
+ timeout->tv_usec * AMQP_NS_PER_US;
}
if (current_timestamp > timeout_timestamp) {
return AMQP_STATUS_TIMEOUT;
}
+ }
- if (state->heartbeat > 0) {
- if (current_timestamp > state->next_recv_heartbeat) {
- state->next_recv_heartbeat = current_timestamp;
- }
- next_timestamp = (state->next_recv_heartbeat < state->next_send_heartbeat ?
- state->next_recv_heartbeat :
- state->next_send_heartbeat);
- if (timeout) {
- next_timestamp = (timeout_timestamp < next_timestamp ?
- timeout_timestamp : next_timestamp);
- }
- } else if (timeout) {
- next_timestamp = timeout_timestamp;
- } else {
- amqp_abort("Internal error: both timeout == NULL && state->heartbeat == 0");
- }
- ns_until_next_timeout = timeout_timestamp - current_timestamp;
-
- memset(&tv, 0, sizeof(struct timeval));
- tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S;
- tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US;
-
- res = select(fd + 1, &read_fd, NULL, &except_fd, &tv);
-
- if (res > 0) {
- /* socket is ready to be read from */
- break;
- } else if (0 == res) {
- if (next_timestamp == state->next_recv_heartbeat) {
- amqp_socket_close(state->socket);
- state->socket = NULL;
- return AMQP_STATUS_HEARTBEAT_TIMEOUT;
-
- } else if (next_timestamp == timeout_timestamp) {
- return AMQP_STATUS_TIMEOUT;
- } else if (next_timestamp == state->next_send_heartbeat) {
- /* send heartbeat happens at the beginning of this loop */
- continue;
- } else {
- amqp_abort("Internal error: unable to determine timeout reason");
- }
- } else if (errno == EINTR) {
- /* Try again */
- continue;
- } else {
- return AMQP_STATUS_SOCKET_ERROR;
+ if (state->heartbeat > 0) {
+ if (current_timestamp > state->next_recv_heartbeat) {
+ state->next_recv_heartbeat = current_timestamp;
}
+ next_timestamp = (state->next_recv_heartbeat < state->next_send_heartbeat ?
+ state->next_recv_heartbeat :
+ state->next_send_heartbeat);
+ if (timeout) {
+ next_timestamp = (timeout_timestamp < next_timestamp ?
+ timeout_timestamp : next_timestamp);
+ }
+ } else if (timeout) {
+ next_timestamp = timeout_timestamp;
+ } else {
+ amqp_abort("Internal error: both timeout == NULL && state->heartbeat == 0");
}
- }
- res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len, 0);
- if (res < 0) {
- return res;
+ ns_until_next_timeout = next_timestamp - current_timestamp;
+
+ memset(&tv, 0, sizeof(struct timeval));
+ tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S;
+ tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US;
+
+ tvp = &tv;
}
- if (state->heartbeat > 0) {
- uint64_t current_time = amqp_get_monotonic_timestamp();
- if (0 == current_time) {
- return AMQP_STATUS_TIMER_FAILURE;
+ res = recv_with_timeout(state, current_timestamp, tvp);
+
+ if (AMQP_STATUS_TIMEOUT == res) {
+ if (next_timestamp == state->next_recv_heartbeat) {
+ amqp_socket_close(state->socket);
+ state->socket = NULL;
+ return AMQP_STATUS_HEARTBEAT_TIMEOUT;
+ } else if (next_timestamp == timeout_timestamp) {
+ return AMQP_STATUS_TIMEOUT;
+ } else if (next_timestamp == state->next_send_heartbeat) {
+ /* send heartbeat happens before we do recv_with_timeout */
+ goto beginrecv;
+ } else {
+ amqp_abort("Internal error: unable to determine timeout reason");
}
- state->next_recv_heartbeat = current_time + (2 * state->heartbeat * AMQP_NS_PER_S);
+ } else if (AMQP_STATUS_OK != res) {
+ return res;
}
-
- state->sock_inbound_limit = res;
- state->sock_inbound_offset = 0;
}
}