summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--librabbitmq/amqp_api.c2
-rw-r--r--librabbitmq/amqp_connection.c8
-rw-r--r--librabbitmq/amqp_private.h16
-rw-r--r--librabbitmq/amqp_socket.c10
4 files changed, 26 insertions, 10 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 525d92c..2f40681 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -187,7 +187,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
m.immediate = immediate;
m.ticket = 0;
- if (state->heartbeat > 0) {
+ if (amqp_heartbeat_enabled(state)) {
uint64_t current_timestamp = amqp_get_monotonic_timestamp();
if (0 == current_timestamp) {
return AMQP_STATUS_TIMER_FAILURE;
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 214dbec..d5c29b0 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -134,13 +134,13 @@ int amqp_tune_connection(amqp_connection_state_t state,
state->frame_max = frame_max;
state->heartbeat = heartbeat;
- if (state->heartbeat > 0) {
+ if (amqp_heartbeat_enabled(state)) {
uint64_t current_time = amqp_get_monotonic_timestamp();
if (0 == current_time) {
return AMQP_STATUS_TIMER_FAILURE;
}
- state->next_send_heartbeat = current_time + ((uint64_t)state->heartbeat * AMQP_NS_PER_S);
- state->next_recv_heartbeat = current_time + (2 * (uint64_t)state->heartbeat * AMQP_NS_PER_S);
+ state->next_send_heartbeat = amqp_calc_next_send_heartbeat(state, current_time);
+ state->next_recv_heartbeat = amqp_calc_next_recv_heartbeat(state, current_time);
}
state->outbound_buffer.len = frame_max;
@@ -506,7 +506,7 @@ int amqp_send_frame(amqp_connection_state_t state,
if (0 == current_time) {
return AMQP_STATUS_TIMER_FAILURE;
}
- state->next_send_heartbeat = current_time + (state->heartbeat * AMQP_NS_PER_S);
+ state->next_send_heartbeat = amqp_calc_next_send_heartbeat(state, current_time);
}
return res;
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 14a9881..7fc5b57 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -78,6 +78,7 @@ amqp_ssl_error_string(int err);
#endif
#include "amqp_socket.h"
+#include "amqp_timer.h"
/*
* Connection states: XXX FIX THIS
@@ -165,6 +166,21 @@ struct amqp_connection_state_t_ {
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel);
amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel);
+static inline amqp_boolean_t amqp_heartbeat_enabled(amqp_connection_state_t state)
+{
+ return (state->heartbeat > 0);
+}
+
+static inline uint64_t amqp_calc_next_send_heartbeat(amqp_connection_state_t state, uint64_t cur)
+{
+ return cur + ((uint64_t)state->heartbeat * AMQP_NS_PER_S);
+}
+
+static inline uint64_t amqp_calc_next_recv_heartbeat(amqp_connection_state_t state, uint64_t cur)
+{
+ return cur + ((uint64_t)state->heartbeat * 2 * AMQP_NS_PER_S);
+}
+
int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time);
static inline void *amqp_offset(void *data, size_t offset)
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 580047a..441192a 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -432,12 +432,12 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru
state->sock_inbound_limit = res;
state->sock_inbound_offset = 0;
- if (state->heartbeat > 0) {
+ if (amqp_heartbeat_enabled(state)) {
uint64_t current_time = amqp_get_monotonic_timestamp();
if (0 == current_time) {
return AMQP_STATUS_TIMER_FAILURE;
}
- state->next_recv_heartbeat = current_time + (2 * (uint64_t)state->heartbeat * AMQP_NS_PER_S);
+ state->next_recv_heartbeat = amqp_calc_next_recv_heartbeat(state, current_time);
}
return AMQP_STATUS_OK;
@@ -532,7 +532,7 @@ static int wait_frame_inner(amqp_connection_state_t state,
}
beginrecv:
- if (timeout || state->heartbeat > 0) {
+ if (timeout || amqp_heartbeat_enabled(state)) {
uint64_t ns_until_next_timeout;
current_timestamp = amqp_get_monotonic_timestamp();
@@ -540,7 +540,7 @@ beginrecv:
return AMQP_STATUS_TIMER_FAILURE;
}
- if (state->heartbeat > 0 && current_timestamp > state->next_send_heartbeat) {
+ if (amqp_heartbeat_enabled(state) && current_timestamp > state->next_send_heartbeat) {
amqp_frame_t heartbeat;
heartbeat.channel = 0;
heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
@@ -569,7 +569,7 @@ beginrecv:
}
- if (state->heartbeat > 0) {
+ if (amqp_heartbeat_enabled(state)) {
if (current_timestamp > state->next_recv_heartbeat) {
state->next_recv_heartbeat = current_timestamp;
}