summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-04-21 23:41:42 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-05-03 20:24:02 -0700
commit5498dc66c3be4176c3631ee4b4ec26d6cb4966ae (patch)
treee0b16bc62d0c647bef581faae204cdf4dcf790ce
parenta1326beab0054f35cf607297ae47ed3330086585 (diff)
downloadrabbitmq-c-5498dc66c3be4176c3631ee4b4ec26d6cb4966ae.tar.gz
Refactor heartbeat timeout code to be simpler
Refactor the heartbeat timeout code to hopefully simplify it and hopefully make it less hairy to deal with in the future.
-rw-r--r--librabbitmq/amqp_api.c24
-rw-r--r--librabbitmq/amqp_connection.c33
-rw-r--r--librabbitmq/amqp_private.h23
-rw-r--r--librabbitmq/amqp_socket.c101
-rw-r--r--librabbitmq/amqp_time.c55
-rw-r--r--librabbitmq/amqp_time.h24
6 files changed, 140 insertions, 120 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index ec2f9a1..3088b10 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -191,19 +191,17 @@ int amqp_basic_publish(amqp_connection_state_t state,
m.immediate = immediate;
m.ticket = 0;
- if (amqp_heartbeat_enabled(state)) {
- uint64_t current_timestamp = amqp_get_monotonic_timestamp();
- if (0 == current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
-
- if (current_timestamp > state->next_recv_heartbeat) {
- res = amqp_try_recv(state);
- if (AMQP_STATUS_TIMEOUT == res) {
- return AMQP_STATUS_HEARTBEAT_TIMEOUT;
- } else if (AMQP_STATUS_OK != res) {
- return res;
- }
+ /* TODO(alanxz): this heartbeat check is happening in the wrong place, it
+ * should really be done in amqp_try_send/writev */
+ res = amqp_time_has_past(state->next_recv_heartbeat);
+ if (AMQP_STATUS_TIMER_FAILURE == res) {
+ return res;
+ } else if (AMQP_STATUS_TIMEOUT == res) {
+ res = amqp_try_recv(state);
+ if (AMQP_STATUS_TIMEOUT == res) {
+ return AMQP_STATUS_HEARTBEAT_TIMEOUT;
+ } else if (AMQP_STATUS_OK != res) {
+ return res;
}
}
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 6fb2a62..9175f95 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -139,20 +139,27 @@ int amqp_tune_connection(amqp_connection_state_t state,
int heartbeat)
{
void *newbuf;
+ int res;
ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
state->channel_max = channel_max;
state->frame_max = frame_max;
+
state->heartbeat = heartbeat;
+ if (0 > state->heartbeat) {
+ state->heartbeat = 0;
+ }
- if (amqp_heartbeat_enabled(state)) {
- uint64_t current_time = amqp_get_monotonic_timestamp();
- if (0 == current_time) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
- state->next_send_heartbeat = amqp_calc_next_send_heartbeat(state, current_time);
- state->next_recv_heartbeat = amqp_calc_next_recv_heartbeat(state, current_time);
+ res = amqp_time_s_from_now(&state->next_send_heartbeat,
+ amqp_heartbeat_send(state));
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+ res = amqp_time_s_from_now(&state->next_recv_heartbeat,
+ amqp_heartbeat_recv(state));
+ if (AMQP_STATUS_OK != res) {
+ return res;
}
state->outbound_buffer.len = frame_max;
@@ -527,17 +534,15 @@ int amqp_send_frame(amqp_connection_state_t state,
res = amqp_try_send(state, out_frame,
out_frame_len + HEADER_SIZE + FOOTER_SIZE);
}
-
- if (state->heartbeat > 0) {
- uint64_t current_time = amqp_get_monotonic_timestamp();
- if (0 == current_time) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
- state->next_send_heartbeat = amqp_calc_next_send_heartbeat(state, current_time);
+ if (AMQP_STATUS_OK != res) {
+ return res;
}
+ res = amqp_time_s_from_now(&state->next_send_heartbeat,
+ amqp_heartbeat_send(state));
return res;
}
+
amqp_table_t *
amqp_get_server_properties(amqp_connection_state_t state)
{
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 62c8ff0..9497cc8 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -156,7 +156,13 @@ struct amqp_connection_state_t_ {
int channel_max;
int frame_max;
+
+ /* Heartbeat interval in seconds. If this is <= 0, then heartbeats are not
+ * enabled, and next_recv_heartbeat and next_send_heartbeat are set to
+ * infinite */
int heartbeat;
+ amqp_time_t next_recv_heartbeat;
+ amqp_time_t next_send_heartbeat;
/* buffer for holding frame headers. Allows us to delay allocating
* the raw frame buffer until the type, channel, and size are all known
@@ -180,9 +186,6 @@ struct amqp_connection_state_t_ {
amqp_rpc_reply_t most_recent_api_result;
- uint64_t next_recv_heartbeat;
- uint64_t next_send_heartbeat;
-
amqp_table_t server_properties;
amqp_pool_t properties_pool;
};
@@ -190,19 +193,13 @@ struct amqp_connection_state_t_ {
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel);
amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel);
-static inline amqp_boolean_t amqp_heartbeat_enabled(amqp_connection_state_t state)
-{
- return (state->heartbeat > 0);
-}
-static inline uint64_t amqp_calc_next_send_heartbeat(amqp_connection_state_t state, uint64_t cur)
-{
- return cur + ((uint64_t)state->heartbeat * AMQP_NS_PER_S);
+static inline int amqp_heartbeat_send(amqp_connection_state_t state) {
+ return state->heartbeat;
}
-static inline uint64_t amqp_calc_next_recv_heartbeat(amqp_connection_state_t state, uint64_t cur)
-{
- return cur + ((uint64_t)state->heartbeat * 2 * AMQP_NS_PER_S);
+static inline int amqp_heartbeat_recv(amqp_connection_state_t state) {
+ return 2 * state->heartbeat;
}
int amqp_try_recv(amqp_connection_state_t state);
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index c3a3000..6785187 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -728,14 +728,11 @@ start_recv:
state->sock_inbound_limit = res;
state->sock_inbound_offset = 0;
- if (amqp_heartbeat_enabled(state)) {
- uint64_t current_time = amqp_get_monotonic_timestamp();
- if (0 == current_time) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
- state->next_recv_heartbeat = amqp_calc_next_recv_heartbeat(state, current_time);
+ res = amqp_time_s_from_now(&state->next_recv_heartbeat,
+ amqp_heartbeat_recv(state));
+ if (AMQP_STATUS_OK != res) {
+ return res;
}
-
return AMQP_STATUS_OK;
}
@@ -789,14 +786,13 @@ static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
struct timeval *timeout)
{
- uint64_t current_timestamp = 0;
- uint64_t timeout_timestamp = 0;
- uint64_t next_timestamp = 0;
- struct timeval tv;
amqp_time_t deadline;
+ amqp_time_t timeout_deadline;
+ int res;
- if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
- return AMQP_STATUS_INVALID_PARAMETER;
+ res = amqp_time_from_now(&timeout_deadline, timeout);
+ if (AMQP_STATUS_OK != res) {
+ return res;
}
while (1) {
@@ -821,85 +817,34 @@ static int wait_frame_inner(amqp_connection_state_t state,
}
beginrecv:
- if (timeout || amqp_heartbeat_enabled(state)) {
- uint64_t ns_until_next_timeout;
-
- current_timestamp = amqp_get_monotonic_timestamp();
- if (0 == current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
-
- if (amqp_heartbeat_enabled(state) && current_timestamp > state->next_send_heartbeat) {
- amqp_frame_t heartbeat;
- heartbeat.channel = 0;
- heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
-
- res = amqp_send_frame(state, &heartbeat);
- if (AMQP_STATUS_OK != res) {
- return res;
- }
-
- current_timestamp = amqp_get_monotonic_timestamp();
- if (0 == current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
- }
-
- if (timeout) {
- if (0 == timeout_timestamp) {
- timeout_timestamp = current_timestamp +
- (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
- (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
- }
-
- if (current_timestamp > timeout_timestamp) {
- return AMQP_STATUS_TIMEOUT;
- }
- }
-
- if (amqp_heartbeat_enabled(state)) {
- if (current_timestamp > state->next_recv_heartbeat) {
- state->next_recv_heartbeat = current_timestamp;
- }
- next_timestamp = (state->next_recv_heartbeat < state->next_send_heartbeat ?
- state->next_recv_heartbeat :
- state->next_send_heartbeat);
- if (timeout) {
- next_timestamp = (timeout_timestamp < next_timestamp ?
- timeout_timestamp : next_timestamp);
- }
- } else if (timeout) {
- next_timestamp = timeout_timestamp;
- } else {
- amqp_abort("Internal error: both timeout == NULL && state->heartbeat == 0");
- }
-
- ns_until_next_timeout = next_timestamp - current_timestamp;
-
- memset(&tv, 0, sizeof(struct timeval));
- tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S;
- tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US;
+ res = amqp_time_has_past(state->next_send_heartbeat);
+ if (AMQP_STATUS_TIMER_FAILURE == res) {
+ return res;
+ } else if (AMQP_STATUS_TIMEOUT == res) {
+ amqp_frame_t heartbeat;
+ heartbeat.channel = 0;
+ heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
- /* TODO: refactor the above so that this doesn't require a timer ping */
- res = amqp_time_from_now(&deadline, &tv);
+ res = amqp_send_frame(state, &heartbeat);
if (AMQP_STATUS_OK != res) {
return res;
}
- } else {
- deadline = amqp_time_infinite();
}
+ deadline = amqp_time_first(timeout_deadline,
+ amqp_time_first(state->next_recv_heartbeat,
+ state->next_send_heartbeat));
/* TODO this needs to wait for a _frame_ and not anything written from the
* socket */
res = recv_with_timeout(state, deadline);
if (AMQP_STATUS_TIMEOUT == res) {
- if (next_timestamp == state->next_recv_heartbeat) {
+ if (amqp_time_equal(deadline, state->next_recv_heartbeat)) {
amqp_socket_close(state->socket);
return AMQP_STATUS_HEARTBEAT_TIMEOUT;
- } else if (next_timestamp == timeout_timestamp) {
+ } else if (amqp_time_equal(deadline, timeout_deadline)) {
return AMQP_STATUS_TIMEOUT;
- } else if (next_timestamp == state->next_send_heartbeat) {
+ } else if (amqp_time_equal(deadline, state->next_send_heartbeat)) {
/* send heartbeat happens before we do recv_with_timeout */
goto beginrecv;
} else {
diff --git a/librabbitmq/amqp_time.c b/librabbitmq/amqp_time.c
index c1bd286..f083315 100644
--- a/librabbitmq/amqp_time.c
+++ b/librabbitmq/amqp_time.c
@@ -134,8 +134,31 @@ int amqp_time_from_now(amqp_time_t *time, struct timeval *timeout) {
}
time->time_point_ns = now_ns + delta_ns;
- if (now_ns > time->time_point_ns ||
- delta_ns > time->time_point_ns) {
+ if (now_ns > time->time_point_ns || delta_ns > time->time_point_ns) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+
+ return AMQP_STATUS_OK;
+}
+
+int amqp_time_s_from_now(amqp_time_t *time, int seconds) {
+ uint64_t now_ns;
+ uint64_t delta_ns;
+ assert(NULL != time);
+
+ if (0 >= seconds) {
+ *time = amqp_time_infinite();
+ return AMQP_STATUS_OK;
+ }
+
+ now_ns = amqp_get_monotonic_timestamp();
+ if (0 == now_ns) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+
+ delta_ns = (uint64_t)seconds * AMQP_NS_PER_S;
+ time->time_point_ns = now_ns + delta_ns;
+ if (now_ns > time->time_point_ns || delta_ns > time->time_point_ns) {
return AMQP_STATUS_INVALID_PARAMETER;
}
@@ -180,3 +203,31 @@ int amqp_time_ms_until(amqp_time_t time) {
return left_ms;
}
+
+int amqp_time_has_past(amqp_time_t time) {
+ uint64_t now_ns;
+ if (UINT64_MAX == time.time_point_ns) {
+ return AMQP_STATUS_OK;
+ }
+
+ now_ns = amqp_get_monotonic_timestamp();
+ if (0 == now_ns) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+
+ if (now_ns > time.time_point_ns) {
+ return AMQP_STATUS_TIMEOUT;
+ }
+ return AMQP_STATUS_OK;
+}
+
+amqp_time_t amqp_time_first(amqp_time_t l, amqp_time_t r) {
+ if (l.time_point_ns < r.time_point_ns) {
+ return l;
+ }
+ return r;
+}
+
+int amqp_time_equal(amqp_time_t l, amqp_time_t r) {
+ return l.time_point_ns == r.time_point_ns;
+}
diff --git a/librabbitmq/amqp_time.h b/librabbitmq/amqp_time.h
index fa81384..1bafd50 100644
--- a/librabbitmq/amqp_time.h
+++ b/librabbitmq/amqp_time.h
@@ -74,6 +74,15 @@ uint64_t amqp_get_monotonic_timestamp(void);
*/
int amqp_time_from_now(amqp_time_t *time, struct timeval *timeout);
+/* Get a amqp_time_t that is seconds from now.
+ * If seconds <= 0, then amqp_time_infinite() is created.
+ *
+ * Returns AMQP_STATUS_OK on success.
+ * AMQP_STATUS_TIMER_FAILURE if the underlying call to get the current timestamp
+ * fails.
+ */
+int amqp_time_s_from_now(amqp_time_t *time, int seconds);
+
/* Create an immediate amqp_time_t */
amqp_time_t amqp_time_immediate(void);
@@ -91,4 +100,19 @@ amqp_time_t amqp_time_infinite(void);
*/
int amqp_time_ms_until(amqp_time_t time);
+/* Test whether current time is past the provided time.
+ *
+ * TODO: this isn't a great interface to use. Fix this.
+ *
+ * Return AMQP_STATUS_OK if time has not past
+ * Return AMQP_STATUS_TIMEOUT if time has past
+ * Return AMQP_STATUS_TIMER_FAILURE if the underlying call to get the current
+ * timestamp fails.
+ */
+int amqp_time_has_past(amqp_time_t time);
+
+/* Return the time value that happens first */
+amqp_time_t amqp_time_first(amqp_time_t l, amqp_time_t r);
+
+int amqp_time_equal(amqp_time_t l, amqp_time_t r);
#endif /* AMQP_TIMER_H */