diff options
-rw-r--r-- | librabbitmq/amqp_api.c | 2 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 8 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 16 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 10 |
4 files changed, 26 insertions, 10 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 525d92c..2f40681 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -187,7 +187,7 @@ int amqp_basic_publish(amqp_connection_state_t state, m.immediate = immediate; m.ticket = 0; - if (state->heartbeat > 0) { + if (amqp_heartbeat_enabled(state)) { uint64_t current_timestamp = amqp_get_monotonic_timestamp(); if (0 == current_timestamp) { return AMQP_STATUS_TIMER_FAILURE; diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 214dbec..d5c29b0 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -134,13 +134,13 @@ int amqp_tune_connection(amqp_connection_state_t state, state->frame_max = frame_max; state->heartbeat = heartbeat; - if (state->heartbeat > 0) { + if (amqp_heartbeat_enabled(state)) { uint64_t current_time = amqp_get_monotonic_timestamp(); if (0 == current_time) { return AMQP_STATUS_TIMER_FAILURE; } - state->next_send_heartbeat = current_time + ((uint64_t)state->heartbeat * AMQP_NS_PER_S); - state->next_recv_heartbeat = current_time + (2 * (uint64_t)state->heartbeat * AMQP_NS_PER_S); + state->next_send_heartbeat = amqp_calc_next_send_heartbeat(state, current_time); + state->next_recv_heartbeat = amqp_calc_next_recv_heartbeat(state, current_time); } state->outbound_buffer.len = frame_max; @@ -506,7 +506,7 @@ int amqp_send_frame(amqp_connection_state_t state, if (0 == current_time) { return AMQP_STATUS_TIMER_FAILURE; } - state->next_send_heartbeat = current_time + (state->heartbeat * AMQP_NS_PER_S); + state->next_send_heartbeat = amqp_calc_next_send_heartbeat(state, current_time); } return res; diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 14a9881..7fc5b57 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -78,6 +78,7 @@ amqp_ssl_error_string(int err); #endif #include "amqp_socket.h" +#include "amqp_timer.h" /* * Connection states: XXX FIX THIS @@ -165,6 +166,21 @@ struct amqp_connection_state_t_ { amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel); amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel); +static inline amqp_boolean_t amqp_heartbeat_enabled(amqp_connection_state_t state) +{ + return (state->heartbeat > 0); +} + +static inline uint64_t amqp_calc_next_send_heartbeat(amqp_connection_state_t state, uint64_t cur) +{ + return cur + ((uint64_t)state->heartbeat * AMQP_NS_PER_S); +} + +static inline uint64_t amqp_calc_next_recv_heartbeat(amqp_connection_state_t state, uint64_t cur) +{ + return cur + ((uint64_t)state->heartbeat * 2 * AMQP_NS_PER_S); +} + int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time); static inline void *amqp_offset(void *data, size_t offset) diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 580047a..441192a 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -432,12 +432,12 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru state->sock_inbound_limit = res; state->sock_inbound_offset = 0; - if (state->heartbeat > 0) { + if (amqp_heartbeat_enabled(state)) { uint64_t current_time = amqp_get_monotonic_timestamp(); if (0 == current_time) { return AMQP_STATUS_TIMER_FAILURE; } - state->next_recv_heartbeat = current_time + (2 * (uint64_t)state->heartbeat * AMQP_NS_PER_S); + state->next_recv_heartbeat = amqp_calc_next_recv_heartbeat(state, current_time); } return AMQP_STATUS_OK; @@ -532,7 +532,7 @@ static int wait_frame_inner(amqp_connection_state_t state, } beginrecv: - if (timeout || state->heartbeat > 0) { + if (timeout || amqp_heartbeat_enabled(state)) { uint64_t ns_until_next_timeout; current_timestamp = amqp_get_monotonic_timestamp(); @@ -540,7 +540,7 @@ beginrecv: return AMQP_STATUS_TIMER_FAILURE; } - if (state->heartbeat > 0 && current_timestamp > state->next_send_heartbeat) { + if (amqp_heartbeat_enabled(state) && current_timestamp > state->next_send_heartbeat) { amqp_frame_t heartbeat; heartbeat.channel = 0; heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; @@ -569,7 +569,7 @@ beginrecv: } - if (state->heartbeat > 0) { + if (amqp_heartbeat_enabled(state)) { if (current_timestamp > state->next_recv_heartbeat) { state->next_recv_heartbeat = current_timestamp; } |