From 7f92d532360dd254bf0484085e7a51b812cc578f Mon Sep 17 00:00:00 2001 From: Haster Date: Mon, 10 Oct 2016 01:11:18 +0400 Subject: Lib: add timeout for amqp_login and friends By default the RabbitMQ broker sets a tunable timeout of 10 seconds from socket-open to successful handshake. This introduces a similar login timeout on the client side. If the login does not complete within this timeout, amqp_login and friends will return AMQP_STATUS_TIMEOUT and the connection will be considered dead. Two new functions amqp_set_handshake_timeout and amqp_get_handshake_timeout are introduced to tune this behavior. --- librabbitmq/amqp_connection.c | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) (limited to 'librabbitmq/amqp_connection.c') diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 8e86f78..85a248d 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -59,6 +59,10 @@ #define AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 #endif +#ifndef AMQP_DEFAULT_LOGIN_TIMEOUT_SEC +#define AMQP_DEFAULT_LOGIN_TIMEOUT_SEC 12 +#endif + #define ENFORCE_STATE(statevec, statenum) \ { \ amqp_connection_state_t _check_state = (statevec); \ @@ -101,6 +105,11 @@ amqp_connection_state_t amqp_new_connection(void) init_amqp_pool(&state->properties_pool, 512); + /* Use address of the internal_handshake_timeout object by default. */ + state->internal_handshake_timeout.tv_sec = AMQP_DEFAULT_LOGIN_TIMEOUT_SEC; + state->internal_handshake_timeout.tv_usec = 0; + state->handshake_timeout = &state->internal_handshake_timeout; + return state; out_nomem: @@ -537,14 +546,17 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer, int amqp_send_frame(amqp_connection_state_t state, const amqp_frame_t *frame) { - return amqp_send_frame_inner(state, frame, AMQP_SF_NONE); + return amqp_send_frame_inner(state, frame, AMQP_SF_NONE, + amqp_time_infinite()); } int amqp_send_frame_inner(amqp_connection_state_t state, - const amqp_frame_t *frame, int flags) { + const amqp_frame_t *frame, int flags, + amqp_time_t deadline) { int res; ssize_t sent; amqp_bytes_t encoded; + amqp_time_t next_timeout; /* TODO: if the AMQP_SF_MORE socket optimization can be shown to work * correctly, then this could be un-done so that body-frames are sent as 3 @@ -557,15 +569,22 @@ int amqp_send_frame_inner(amqp_connection_state_t state, } start_send: - sent = amqp_try_send(state, encoded.bytes, encoded.len, - state->next_recv_heartbeat, flags); + + next_timeout = amqp_time_first(deadline, state->next_recv_heartbeat); + + sent = amqp_try_send(state, encoded.bytes, encoded.len, next_timeout, flags); if (0 > sent) { return (int)sent; } - /* A partial send has occurred, because of a heartbeat timeout, try and recv - * something */ + /* A partial send has occurred, because of a heartbeat timeout (so try recv + * something) or common timeout (so return AMQP_STATUS_TIMEOUT) */ if ((ssize_t)encoded.len != sent) { + if (amqp_time_equal(next_timeout, deadline)) { + /* timeout of method was received, so return from method*/ + return AMQP_STATUS_TIMEOUT; + } + res = amqp_try_recv(state); if (AMQP_STATUS_TIMEOUT == res) { -- cgit v1.2.1