summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-06-14 17:48:05 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-06-25 16:10:51 -0700
commitdaa0e663ed6ae6d49ed3cc0da3043e3d4e8b2252 (patch)
treec4f85982841a272f3f1a0b76708755adde52c378
parent3866433fe273eded1e601b3274028684c3e77919 (diff)
downloadrabbitmq-c-daa0e663ed6ae6d49ed3cc0da3043e3d4e8b2252.tar.gz
Add partial support for heartbeats in wait_frame
-rw-r--r--librabbitmq/amqp.h1
-rw-r--r--librabbitmq/amqp_api.c3
-rw-r--r--librabbitmq/amqp_connection.c18
-rw-r--r--librabbitmq/amqp_private.h3
-rw-r--r--librabbitmq/amqp_socket.c77
5 files changed, 88 insertions, 14 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index de2afbd..7f479c8 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -348,6 +348,7 @@ typedef enum amqp_status_enum_
AMQP_STATUS_WRONG_METHOD = -0x000C,
AMQP_STATUS_TIMEOUT = -0x000D,
AMQP_STATUS_TIMER_FAILURE = -0x000E,
+ AMQP_STATUS_HEARTBEAT_TIMEOUT = -0x000F,
AMQP_STATUS_TCP_ERROR = -0x0100,
AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR = -0x0101,
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index e6c9a0d..bdd7d80 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -70,7 +70,8 @@ static const char *base_error_strings[] = {
"table too large for buffer", /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */
"unexpected method received", /* AMQP_STATUS_WRONG_METHOD -0x000C */
"request timed out", /* AMQP_STATUS_TIMEOUT -0x000D */
- "system timer has failed" /* AMQP_STATUS_TIMER_FAILED -0x000E */
+ "system timer has failed", /* AMQP_STATUS_TIMER_FAILED -0x000E */
+ "heartbeat timeout, connection closed"/* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */
};
static const char *tcp_error_strings[] = {
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index ae3edf4..7ba6f42 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -40,6 +40,7 @@
#include "amqp_tcp_socket.h"
#include "amqp_private.h"
+#include "amqp_timer.h"
#include <assert.h>
#include <errno.h>
#include <stdint.h>
@@ -133,6 +134,15 @@ int amqp_tune_connection(amqp_connection_state_t state,
state->frame_max = frame_max;
state->heartbeat = heartbeat;
+ if (state->heartbeat > 0) {
+ uint64_t current_time = amqp_get_monotonic_timestamp();
+ if (0 == current_time) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ state->next_send_heartbeat = current_time + (state->heartbeat * AMQP_NS_PER_S);
+ state->next_recv_heartbeat = current_time + (2 * state->heartbeat * AMQP_NS_PER_S);
+ }
+
state->outbound_buffer.len = frame_max;
newbuf = realloc(state->outbound_buffer.bytes, frame_max);
if (newbuf == NULL) {
@@ -491,5 +501,13 @@ int amqp_send_frame(amqp_connection_state_t state,
out_frame_len + HEADER_SIZE + FOOTER_SIZE);
}
+ if (state->heartbeat > 0) {
+ uint64_t current_time = amqp_get_monotonic_timestamp();
+ if (0 == current_time) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ state->next_send_heartbeat = current_time + (state->heartbeat * AMQP_NS_PER_S);
+ }
+
return res;
}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index baf96e1..cb7494b 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -157,6 +157,9 @@ struct amqp_connection_state_t_ {
amqp_link_t *last_queued_frame;
amqp_rpc_reply_t most_recent_api_result;
+
+ uint64_t next_recv_heartbeat;
+ uint64_t next_send_heartbeat;
};
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel);
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index a4ec537..8966cb9 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -355,6 +355,11 @@ static int wait_frame_inner(amqp_connection_state_t state,
{
uint64_t current_timestamp = 0;
uint64_t timeout_timestamp = 0;
+ uint64_t next_timestamp = 0;
+
+ if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
while (1) {
int res;
@@ -371,6 +376,10 @@ static int wait_frame_inner(amqp_connection_state_t state,
state->sock_inbound_offset += res;
+ if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) {
+ continue;
+ }
+
if (decoded_frame->frame_type != 0) {
/* Complete frame was read. Return it. */
return AMQP_STATUS_OK;
@@ -380,10 +389,7 @@ static int wait_frame_inner(amqp_connection_state_t state,
assert(res != 0);
}
- if (timeout) {
- if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
- return AMQP_STATUS_INVALID_PARAMETER;
- }
+ if (timeout || state->heartbeat > 0) {
while (1) {
int fd;
fd_set read_fd;
@@ -402,28 +408,54 @@ static int wait_frame_inner(amqp_connection_state_t state,
FD_ZERO(&except_fd);
FD_SET(fd, &except_fd);
+ current_timestamp = amqp_get_monotonic_timestamp();
if (0 == current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+
+ if (state->heartbeat > 0 && current_timestamp > state->next_send_heartbeat) {
+ amqp_frame_t heartbeat;
+ heartbeat.channel = 0;
+ heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
+
+ res = amqp_send_frame(state, &heartbeat);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
current_timestamp = amqp_get_monotonic_timestamp();
if (0 == current_timestamp) {
return AMQP_STATUS_TIMER_FAILURE;
}
+ }
+ if (timeout && 0 == timeout_timestamp) {
timeout_timestamp = current_timestamp +
timeout->tv_sec * AMQP_NS_PER_S +
timeout->tv_usec * AMQP_NS_PER_US;
- } else {
- current_timestamp = amqp_get_monotonic_timestamp();
- if (0 == current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
}
- /* TODO: Heartbeat timeout goes here */
-
if (current_timestamp > timeout_timestamp) {
return AMQP_STATUS_TIMEOUT;
}
+ if (state->heartbeat > 0) {
+ if (current_timestamp > state->next_recv_heartbeat) {
+ state->next_recv_heartbeat = current_timestamp;
+ }
+ next_timestamp = (state->next_recv_heartbeat < state->next_send_heartbeat ?
+ state->next_recv_heartbeat :
+ state->next_send_heartbeat);
+ if (timeout) {
+ next_timestamp = (timeout_timestamp < next_timestamp ?
+ timeout_timestamp : next_timestamp);
+ }
+ } else if (timeout) {
+ next_timestamp = timeout_timestamp;
+ } else {
+ amqp_abort("Internal error: both timeout == NULL && state->heartbeat == 0");
+ }
+
ns_until_next_timeout = timeout_timestamp - current_timestamp;
memset(&tv, 0, sizeof(struct timeval));
@@ -436,8 +468,19 @@ static int wait_frame_inner(amqp_connection_state_t state,
/* socket is ready to be read from */
break;
} else if (0 == res) {
- /* Timed out - return */
- return AMQP_STATUS_TIMEOUT;
+ if (next_timestamp == state->next_recv_heartbeat) {
+ amqp_socket_close(state->socket);
+ state->socket = NULL;
+ return AMQP_STATUS_HEARTBEAT_TIMEOUT;
+
+ } else if (next_timestamp == timeout_timestamp) {
+ return AMQP_STATUS_TIMEOUT;
+ } else if (next_timestamp == state->next_send_heartbeat) {
+ /* send heartbeat happens at the beginning of this loop */
+ continue;
+ } else {
+ amqp_abort("Internal error: unable to determine timeout reason");
+ }
} else if (errno == EINTR) {
/* Try again */
continue;
@@ -453,6 +496,14 @@ static int wait_frame_inner(amqp_connection_state_t state,
return res;
}
+ if (state->heartbeat > 0) {
+ uint64_t current_time = amqp_get_monotonic_timestamp();
+ if (0 == current_time) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ state->next_recv_heartbeat = current_time + (2 * state->heartbeat * AMQP_NS_PER_S);
+ }
+
state->sock_inbound_limit = res;
state->sock_inbound_offset = 0;
}