diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2015-04-21 23:41:42 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2015-05-03 20:24:02 -0700 |
commit | 5498dc66c3be4176c3631ee4b4ec26d6cb4966ae (patch) | |
tree | e0b16bc62d0c647bef581faae204cdf4dcf790ce | |
parent | a1326beab0054f35cf607297ae47ed3330086585 (diff) | |
download | rabbitmq-c-5498dc66c3be4176c3631ee4b4ec26d6cb4966ae.tar.gz |
Refactor heartbeat timeout code to be simpler
Refactor the heartbeat timeout code to hopefully simplify it and hopefully make
it less hairy to deal with in the future.
-rw-r--r-- | librabbitmq/amqp_api.c | 24 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 33 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 23 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 101 | ||||
-rw-r--r-- | librabbitmq/amqp_time.c | 55 | ||||
-rw-r--r-- | librabbitmq/amqp_time.h | 24 |
6 files changed, 140 insertions, 120 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index ec2f9a1..3088b10 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -191,19 +191,17 @@ int amqp_basic_publish(amqp_connection_state_t state, m.immediate = immediate; m.ticket = 0; - if (amqp_heartbeat_enabled(state)) { - uint64_t current_timestamp = amqp_get_monotonic_timestamp(); - if (0 == current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } - - if (current_timestamp > state->next_recv_heartbeat) { - res = amqp_try_recv(state); - if (AMQP_STATUS_TIMEOUT == res) { - return AMQP_STATUS_HEARTBEAT_TIMEOUT; - } else if (AMQP_STATUS_OK != res) { - return res; - } + /* TODO(alanxz): this heartbeat check is happening in the wrong place, it + * should really be done in amqp_try_send/writev */ + res = amqp_time_has_past(state->next_recv_heartbeat); + if (AMQP_STATUS_TIMER_FAILURE == res) { + return res; + } else if (AMQP_STATUS_TIMEOUT == res) { + res = amqp_try_recv(state); + if (AMQP_STATUS_TIMEOUT == res) { + return AMQP_STATUS_HEARTBEAT_TIMEOUT; + } else if (AMQP_STATUS_OK != res) { + return res; } } diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 6fb2a62..9175f95 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -139,20 +139,27 @@ int amqp_tune_connection(amqp_connection_state_t state, int heartbeat) { void *newbuf; + int res; ENFORCE_STATE(state, CONNECTION_STATE_IDLE); state->channel_max = channel_max; state->frame_max = frame_max; + state->heartbeat = heartbeat; + if (0 > state->heartbeat) { + state->heartbeat = 0; + } - if (amqp_heartbeat_enabled(state)) { - uint64_t current_time = amqp_get_monotonic_timestamp(); - if (0 == current_time) { - return AMQP_STATUS_TIMER_FAILURE; - } - state->next_send_heartbeat = amqp_calc_next_send_heartbeat(state, current_time); - state->next_recv_heartbeat = amqp_calc_next_recv_heartbeat(state, current_time); + res = amqp_time_s_from_now(&state->next_send_heartbeat, + amqp_heartbeat_send(state)); + if (AMQP_STATUS_OK != res) { + return res; + } + res = amqp_time_s_from_now(&state->next_recv_heartbeat, + amqp_heartbeat_recv(state)); + if (AMQP_STATUS_OK != res) { + return res; } state->outbound_buffer.len = frame_max; @@ -527,17 +534,15 @@ int amqp_send_frame(amqp_connection_state_t state, res = amqp_try_send(state, out_frame, out_frame_len + HEADER_SIZE + FOOTER_SIZE); } - - if (state->heartbeat > 0) { - uint64_t current_time = amqp_get_monotonic_timestamp(); - if (0 == current_time) { - return AMQP_STATUS_TIMER_FAILURE; - } - state->next_send_heartbeat = amqp_calc_next_send_heartbeat(state, current_time); + if (AMQP_STATUS_OK != res) { + return res; } + res = amqp_time_s_from_now(&state->next_send_heartbeat, + amqp_heartbeat_send(state)); return res; } + amqp_table_t * amqp_get_server_properties(amqp_connection_state_t state) { diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 62c8ff0..9497cc8 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -156,7 +156,13 @@ struct amqp_connection_state_t_ { int channel_max; int frame_max; + + /* Heartbeat interval in seconds. If this is <= 0, then heartbeats are not + * enabled, and next_recv_heartbeat and next_send_heartbeat are set to + * infinite */ int heartbeat; + amqp_time_t next_recv_heartbeat; + amqp_time_t next_send_heartbeat; /* buffer for holding frame headers. Allows us to delay allocating * the raw frame buffer until the type, channel, and size are all known @@ -180,9 +186,6 @@ struct amqp_connection_state_t_ { amqp_rpc_reply_t most_recent_api_result; - uint64_t next_recv_heartbeat; - uint64_t next_send_heartbeat; - amqp_table_t server_properties; amqp_pool_t properties_pool; }; @@ -190,19 +193,13 @@ struct amqp_connection_state_t_ { amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel); amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel); -static inline amqp_boolean_t amqp_heartbeat_enabled(amqp_connection_state_t state) -{ - return (state->heartbeat > 0); -} -static inline uint64_t amqp_calc_next_send_heartbeat(amqp_connection_state_t state, uint64_t cur) -{ - return cur + ((uint64_t)state->heartbeat * AMQP_NS_PER_S); +static inline int amqp_heartbeat_send(amqp_connection_state_t state) { + return state->heartbeat; } -static inline uint64_t amqp_calc_next_recv_heartbeat(amqp_connection_state_t state, uint64_t cur) -{ - return cur + ((uint64_t)state->heartbeat * 2 * AMQP_NS_PER_S); +static inline int amqp_heartbeat_recv(amqp_connection_state_t state) { + return 2 * state->heartbeat; } int amqp_try_recv(amqp_connection_state_t state); diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index c3a3000..6785187 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -728,14 +728,11 @@ start_recv: state->sock_inbound_limit = res; state->sock_inbound_offset = 0; - if (amqp_heartbeat_enabled(state)) { - uint64_t current_time = amqp_get_monotonic_timestamp(); - if (0 == current_time) { - return AMQP_STATUS_TIMER_FAILURE; - } - state->next_recv_heartbeat = amqp_calc_next_recv_heartbeat(state, current_time); + res = amqp_time_s_from_now(&state->next_recv_heartbeat, + amqp_heartbeat_recv(state)); + if (AMQP_STATUS_OK != res) { + return res; } - return AMQP_STATUS_OK; } @@ -789,14 +786,13 @@ static int wait_frame_inner(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval *timeout) { - uint64_t current_timestamp = 0; - uint64_t timeout_timestamp = 0; - uint64_t next_timestamp = 0; - struct timeval tv; amqp_time_t deadline; + amqp_time_t timeout_deadline; + int res; - if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { - return AMQP_STATUS_INVALID_PARAMETER; + res = amqp_time_from_now(&timeout_deadline, timeout); + if (AMQP_STATUS_OK != res) { + return res; } while (1) { @@ -821,85 +817,34 @@ static int wait_frame_inner(amqp_connection_state_t state, } beginrecv: - if (timeout || amqp_heartbeat_enabled(state)) { - uint64_t ns_until_next_timeout; - - current_timestamp = amqp_get_monotonic_timestamp(); - if (0 == current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } - - if (amqp_heartbeat_enabled(state) && current_timestamp > state->next_send_heartbeat) { - amqp_frame_t heartbeat; - heartbeat.channel = 0; - heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; - - res = amqp_send_frame(state, &heartbeat); - if (AMQP_STATUS_OK != res) { - return res; - } - - current_timestamp = amqp_get_monotonic_timestamp(); - if (0 == current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } - } - - if (timeout) { - if (0 == timeout_timestamp) { - timeout_timestamp = current_timestamp + - (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + - (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; - } - - if (current_timestamp > timeout_timestamp) { - return AMQP_STATUS_TIMEOUT; - } - } - - if (amqp_heartbeat_enabled(state)) { - if (current_timestamp > state->next_recv_heartbeat) { - state->next_recv_heartbeat = current_timestamp; - } - next_timestamp = (state->next_recv_heartbeat < state->next_send_heartbeat ? - state->next_recv_heartbeat : - state->next_send_heartbeat); - if (timeout) { - next_timestamp = (timeout_timestamp < next_timestamp ? - timeout_timestamp : next_timestamp); - } - } else if (timeout) { - next_timestamp = timeout_timestamp; - } else { - amqp_abort("Internal error: both timeout == NULL && state->heartbeat == 0"); - } - - ns_until_next_timeout = next_timestamp - current_timestamp; - - memset(&tv, 0, sizeof(struct timeval)); - tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S; - tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US; + res = amqp_time_has_past(state->next_send_heartbeat); + if (AMQP_STATUS_TIMER_FAILURE == res) { + return res; + } else if (AMQP_STATUS_TIMEOUT == res) { + amqp_frame_t heartbeat; + heartbeat.channel = 0; + heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; - /* TODO: refactor the above so that this doesn't require a timer ping */ - res = amqp_time_from_now(&deadline, &tv); + res = amqp_send_frame(state, &heartbeat); if (AMQP_STATUS_OK != res) { return res; } - } else { - deadline = amqp_time_infinite(); } + deadline = amqp_time_first(timeout_deadline, + amqp_time_first(state->next_recv_heartbeat, + state->next_send_heartbeat)); /* TODO this needs to wait for a _frame_ and not anything written from the * socket */ res = recv_with_timeout(state, deadline); if (AMQP_STATUS_TIMEOUT == res) { - if (next_timestamp == state->next_recv_heartbeat) { + if (amqp_time_equal(deadline, state->next_recv_heartbeat)) { amqp_socket_close(state->socket); return AMQP_STATUS_HEARTBEAT_TIMEOUT; - } else if (next_timestamp == timeout_timestamp) { + } else if (amqp_time_equal(deadline, timeout_deadline)) { return AMQP_STATUS_TIMEOUT; - } else if (next_timestamp == state->next_send_heartbeat) { + } else if (amqp_time_equal(deadline, state->next_send_heartbeat)) { /* send heartbeat happens before we do recv_with_timeout */ goto beginrecv; } else { diff --git a/librabbitmq/amqp_time.c b/librabbitmq/amqp_time.c index c1bd286..f083315 100644 --- a/librabbitmq/amqp_time.c +++ b/librabbitmq/amqp_time.c @@ -134,8 +134,31 @@ int amqp_time_from_now(amqp_time_t *time, struct timeval *timeout) { } time->time_point_ns = now_ns + delta_ns; - if (now_ns > time->time_point_ns || - delta_ns > time->time_point_ns) { + if (now_ns > time->time_point_ns || delta_ns > time->time_point_ns) { + return AMQP_STATUS_INVALID_PARAMETER; + } + + return AMQP_STATUS_OK; +} + +int amqp_time_s_from_now(amqp_time_t *time, int seconds) { + uint64_t now_ns; + uint64_t delta_ns; + assert(NULL != time); + + if (0 >= seconds) { + *time = amqp_time_infinite(); + return AMQP_STATUS_OK; + } + + now_ns = amqp_get_monotonic_timestamp(); + if (0 == now_ns) { + return AMQP_STATUS_TIMER_FAILURE; + } + + delta_ns = (uint64_t)seconds * AMQP_NS_PER_S; + time->time_point_ns = now_ns + delta_ns; + if (now_ns > time->time_point_ns || delta_ns > time->time_point_ns) { return AMQP_STATUS_INVALID_PARAMETER; } @@ -180,3 +203,31 @@ int amqp_time_ms_until(amqp_time_t time) { return left_ms; } + +int amqp_time_has_past(amqp_time_t time) { + uint64_t now_ns; + if (UINT64_MAX == time.time_point_ns) { + return AMQP_STATUS_OK; + } + + now_ns = amqp_get_monotonic_timestamp(); + if (0 == now_ns) { + return AMQP_STATUS_TIMER_FAILURE; + } + + if (now_ns > time.time_point_ns) { + return AMQP_STATUS_TIMEOUT; + } + return AMQP_STATUS_OK; +} + +amqp_time_t amqp_time_first(amqp_time_t l, amqp_time_t r) { + if (l.time_point_ns < r.time_point_ns) { + return l; + } + return r; +} + +int amqp_time_equal(amqp_time_t l, amqp_time_t r) { + return l.time_point_ns == r.time_point_ns; +} diff --git a/librabbitmq/amqp_time.h b/librabbitmq/amqp_time.h index fa81384..1bafd50 100644 --- a/librabbitmq/amqp_time.h +++ b/librabbitmq/amqp_time.h @@ -74,6 +74,15 @@ uint64_t amqp_get_monotonic_timestamp(void); */ int amqp_time_from_now(amqp_time_t *time, struct timeval *timeout); +/* Get a amqp_time_t that is seconds from now. + * If seconds <= 0, then amqp_time_infinite() is created. + * + * Returns AMQP_STATUS_OK on success. + * AMQP_STATUS_TIMER_FAILURE if the underlying call to get the current timestamp + * fails. + */ +int amqp_time_s_from_now(amqp_time_t *time, int seconds); + /* Create an immediate amqp_time_t */ amqp_time_t amqp_time_immediate(void); @@ -91,4 +100,19 @@ amqp_time_t amqp_time_infinite(void); */ int amqp_time_ms_until(amqp_time_t time); +/* Test whether current time is past the provided time. + * + * TODO: this isn't a great interface to use. Fix this. + * + * Return AMQP_STATUS_OK if time has not past + * Return AMQP_STATUS_TIMEOUT if time has past + * Return AMQP_STATUS_TIMER_FAILURE if the underlying call to get the current + * timestamp fails. + */ +int amqp_time_has_past(amqp_time_t time); + +/* Return the time value that happens first */ +amqp_time_t amqp_time_first(amqp_time_t l, amqp_time_t r); + +int amqp_time_equal(amqp_time_t l, amqp_time_t r); #endif /* AMQP_TIMER_H */ |