summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c77
1 files changed, 64 insertions, 13 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index a4ec537..8966cb9 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -355,6 +355,11 @@ static int wait_frame_inner(amqp_connection_state_t state,
{
uint64_t current_timestamp = 0;
uint64_t timeout_timestamp = 0;
+ uint64_t next_timestamp = 0;
+
+ if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
while (1) {
int res;
@@ -371,6 +376,10 @@ static int wait_frame_inner(amqp_connection_state_t state,
state->sock_inbound_offset += res;
+ if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) {
+ continue;
+ }
+
if (decoded_frame->frame_type != 0) {
/* Complete frame was read. Return it. */
return AMQP_STATUS_OK;
@@ -380,10 +389,7 @@ static int wait_frame_inner(amqp_connection_state_t state,
assert(res != 0);
}
- if (timeout) {
- if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
- return AMQP_STATUS_INVALID_PARAMETER;
- }
+ if (timeout || state->heartbeat > 0) {
while (1) {
int fd;
fd_set read_fd;
@@ -402,28 +408,54 @@ static int wait_frame_inner(amqp_connection_state_t state,
FD_ZERO(&except_fd);
FD_SET(fd, &except_fd);
+ current_timestamp = amqp_get_monotonic_timestamp();
if (0 == current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+
+ if (state->heartbeat > 0 && current_timestamp > state->next_send_heartbeat) {
+ amqp_frame_t heartbeat;
+ heartbeat.channel = 0;
+ heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
+
+ res = amqp_send_frame(state, &heartbeat);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
current_timestamp = amqp_get_monotonic_timestamp();
if (0 == current_timestamp) {
return AMQP_STATUS_TIMER_FAILURE;
}
+ }
+ if (timeout && 0 == timeout_timestamp) {
timeout_timestamp = current_timestamp +
timeout->tv_sec * AMQP_NS_PER_S +
timeout->tv_usec * AMQP_NS_PER_US;
- } else {
- current_timestamp = amqp_get_monotonic_timestamp();
- if (0 == current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
}
- /* TODO: Heartbeat timeout goes here */
-
if (current_timestamp > timeout_timestamp) {
return AMQP_STATUS_TIMEOUT;
}
+ if (state->heartbeat > 0) {
+ if (current_timestamp > state->next_recv_heartbeat) {
+ state->next_recv_heartbeat = current_timestamp;
+ }
+ next_timestamp = (state->next_recv_heartbeat < state->next_send_heartbeat ?
+ state->next_recv_heartbeat :
+ state->next_send_heartbeat);
+ if (timeout) {
+ next_timestamp = (timeout_timestamp < next_timestamp ?
+ timeout_timestamp : next_timestamp);
+ }
+ } else if (timeout) {
+ next_timestamp = timeout_timestamp;
+ } else {
+ amqp_abort("Internal error: both timeout == NULL && state->heartbeat == 0");
+ }
+
ns_until_next_timeout = timeout_timestamp - current_timestamp;
memset(&tv, 0, sizeof(struct timeval));
@@ -436,8 +468,19 @@ static int wait_frame_inner(amqp_connection_state_t state,
/* socket is ready to be read from */
break;
} else if (0 == res) {
- /* Timed out - return */
- return AMQP_STATUS_TIMEOUT;
+ if (next_timestamp == state->next_recv_heartbeat) {
+ amqp_socket_close(state->socket);
+ state->socket = NULL;
+ return AMQP_STATUS_HEARTBEAT_TIMEOUT;
+
+ } else if (next_timestamp == timeout_timestamp) {
+ return AMQP_STATUS_TIMEOUT;
+ } else if (next_timestamp == state->next_send_heartbeat) {
+ /* send heartbeat happens at the beginning of this loop */
+ continue;
+ } else {
+ amqp_abort("Internal error: unable to determine timeout reason");
+ }
} else if (errno == EINTR) {
/* Try again */
continue;
@@ -453,6 +496,14 @@ static int wait_frame_inner(amqp_connection_state_t state,
return res;
}
+ if (state->heartbeat > 0) {
+ uint64_t current_time = amqp_get_monotonic_timestamp();
+ if (0 == current_time) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ state->next_recv_heartbeat = current_time + (2 * state->heartbeat * AMQP_NS_PER_S);
+ }
+
state->sock_inbound_limit = res;
state->sock_inbound_offset = 0;
}