diff options
-rw-r--r-- | librabbitmq/amqp.h | 50 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 25 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 31 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 6 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 162 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 3 |
6 files changed, 205 insertions, 72 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 7f031d2..03cf565 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -2443,6 +2443,56 @@ amqp_table_t * AMQP_CALL amqp_get_client_properties(amqp_connection_state_t state); +/** + * Get the login handshake timeout. + * + * amqp_login and amqp_login_with_properties perform the login handshake with + * the broker. This function returns the timeout associated with completing + * this operation from the client side. This value can be set by using the + * amqp_set_handshake_timeout. + * + * Note that the RabbitMQ broker has configurable timeout for completing the + * login handshake, the default is 10 seconds. rabbitmq-c has a default of 12 + * seconds. + * + * \param [in] state the connection object + * \return a struct timeval representing the current login timeout for the state + * object. A NULL value represents an infinite timeout. The memory returned is + * owned by the connection object. + * + * \since v0.9.0 + */ +AMQP_PUBLIC_FUNCTION +struct timeval *AMQP_CALL + amqp_get_handshake_timeout(amqp_connection_state_t state); + +/** + * Set the login handshake timeout. + * + * amqp_login and amqp_login_with_properties perform the login handshake with + * the broker. This function sets the timeout associated with completing this + * operation from the client side. + * + * The timeout must be set before amqp_login or amqp_login_with_properties is + * called to change from the default timeout. + * + * Note that the RabbitMQ broker has a configurable timeout for completing the + * login handshake, the default is 10 seconds. rabbitmq-c has a default of 12 + * seconds. + * + * \param [in] state the connection object + * \param [in] timeout a struct timetval* representing new login timeout for the + * state object. NULL represents an infinite timeout. The value of timeout is + * copied internally, the caller is responsible for ownership of the passed in + * pointer, it does not need to remain valid after this function is called. + * \return AMQP_STATUS_OK on success. + * + * \since v0.9.0 + */ +AMQP_PUBLIC_FUNCTION +int AMQP_CALL amqp_set_handshake_timeout(amqp_connection_state_t state, + struct timeval *timeout); + AMQP_END_DECLS diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index ab60ee4..72bd97a 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -208,7 +208,7 @@ int amqp_basic_publish(amqp_connection_state_t state, } res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m, - AMQP_SF_MORE); + AMQP_SF_MORE, amqp_time_infinite()); if (res < 0) { return res; } @@ -224,7 +224,7 @@ int amqp_basic_publish(amqp_connection_state_t state, f.payload.properties.body_size = body.len; f.payload.properties.decoded = (void *) properties; - res = amqp_send_frame_inner(state, &f, AMQP_SF_MORE); + res = amqp_send_frame_inner(state, &f, AMQP_SF_MORE, amqp_time_infinite()); if (res < 0) { return res; } @@ -250,7 +250,7 @@ int amqp_basic_publish(amqp_connection_state_t state, } body_offset += f.payload.body_fragment.len; - res = amqp_send_frame_inner(state, &f, flagz); + res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite()); if (res < 0) { return res; } @@ -354,3 +354,22 @@ int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel, req.requeue = requeue; return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &req); } + +struct timeval *amqp_get_handshake_timeout(amqp_connection_state_t state) { + return state->handshake_timeout; +} + +int amqp_set_handshake_timeout(amqp_connection_state_t state, + struct timeval *timeout) { + if (timeout) { + if (timeout->tv_sec < 0 || timeout->tv_usec < 0) { + return AMQP_STATUS_INVALID_PARAMETER; + } + state->internal_handshake_timeout = *timeout; + state->handshake_timeout = &state->internal_handshake_timeout; + } else { + state->handshake_timeout = NULL; + } + + return AMQP_STATUS_OK; +} diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 8e86f78..85a248d 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -59,6 +59,10 @@ #define AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 #endif +#ifndef AMQP_DEFAULT_LOGIN_TIMEOUT_SEC +#define AMQP_DEFAULT_LOGIN_TIMEOUT_SEC 12 +#endif + #define ENFORCE_STATE(statevec, statenum) \ { \ amqp_connection_state_t _check_state = (statevec); \ @@ -101,6 +105,11 @@ amqp_connection_state_t amqp_new_connection(void) init_amqp_pool(&state->properties_pool, 512); + /* Use address of the internal_handshake_timeout object by default. */ + state->internal_handshake_timeout.tv_sec = AMQP_DEFAULT_LOGIN_TIMEOUT_SEC; + state->internal_handshake_timeout.tv_usec = 0; + state->handshake_timeout = &state->internal_handshake_timeout; + return state; out_nomem: @@ -537,14 +546,17 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer, int amqp_send_frame(amqp_connection_state_t state, const amqp_frame_t *frame) { - return amqp_send_frame_inner(state, frame, AMQP_SF_NONE); + return amqp_send_frame_inner(state, frame, AMQP_SF_NONE, + amqp_time_infinite()); } int amqp_send_frame_inner(amqp_connection_state_t state, - const amqp_frame_t *frame, int flags) { + const amqp_frame_t *frame, int flags, + amqp_time_t deadline) { int res; ssize_t sent; amqp_bytes_t encoded; + amqp_time_t next_timeout; /* TODO: if the AMQP_SF_MORE socket optimization can be shown to work * correctly, then this could be un-done so that body-frames are sent as 3 @@ -557,15 +569,22 @@ int amqp_send_frame_inner(amqp_connection_state_t state, } start_send: - sent = amqp_try_send(state, encoded.bytes, encoded.len, - state->next_recv_heartbeat, flags); + + next_timeout = amqp_time_first(deadline, state->next_recv_heartbeat); + + sent = amqp_try_send(state, encoded.bytes, encoded.len, next_timeout, flags); if (0 > sent) { return (int)sent; } - /* A partial send has occurred, because of a heartbeat timeout, try and recv - * something */ + /* A partial send has occurred, because of a heartbeat timeout (so try recv + * something) or common timeout (so return AMQP_STATUS_TIMEOUT) */ if ((ssize_t)encoded.len != sent) { + if (amqp_time_equal(next_timeout, deadline)) { + /* timeout of method was received, so return from method*/ + return AMQP_STATUS_TIMEOUT; + } + res = amqp_try_recv(state); if (AMQP_STATUS_TIMEOUT == res) { diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 219b1be..d4f9171 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -193,6 +193,9 @@ struct amqp_connection_state_t_ { amqp_table_t server_properties; amqp_table_t client_properties; amqp_pool_t properties_pool; + + struct timeval *handshake_timeout; + struct timeval internal_handshake_timeout; }; amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel); @@ -388,5 +391,6 @@ static inline amqp_rpc_reply_t amqp_rpc_reply_error(amqp_status_enum status) { } int amqp_send_frame_inner(amqp_connection_state_t state, - const amqp_frame_t *frame, int flags); + const amqp_frame_t *frame, int flags, + amqp_time_t deadline); #endif diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 723dffa..0c7c052 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -541,22 +541,25 @@ int amqp_open_socket_inner(char const *hostname, return sockfd; } -int amqp_send_header(amqp_connection_state_t state) -{ +static int send_header_inner(amqp_connection_state_t state, + amqp_time_t deadline) { ssize_t res; static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0, AMQP_PROTOCOL_VERSION_MAJOR, AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite(), - AMQP_SF_NONE); + res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE); if (sizeof(header) == res) { return AMQP_STATUS_OK; } return (int)res; } +int amqp_send_header(amqp_connection_state_t state) { + return send_header_inner(state, amqp_time_infinite()); +} + static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { amqp_bytes_t res; @@ -793,17 +796,10 @@ int amqp_try_recv(amqp_connection_state_t state) { static int wait_frame_inner(amqp_connection_state_t state, amqp_frame_t *decoded_frame, - struct timeval *timeout) -{ + amqp_time_t timeout_deadline) { amqp_time_t deadline; - amqp_time_t timeout_deadline; int res; - res = amqp_time_from_now(&timeout_deadline, timeout); - if (AMQP_STATUS_OK != res) { - return res; - } - for (;;) { while (amqp_data_in_buffer(state)) { res = consume_one_frame(state, decoded_frame); @@ -949,7 +945,7 @@ int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, } for (;;) { - res = wait_frame_inner(state, decoded_frame, NULL); + res = wait_frame_inner(state, decoded_frame, amqp_time_infinite()); if (AMQP_STATUS_OK != res) { return res; @@ -976,6 +972,13 @@ int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval *timeout) { + amqp_time_t deadline; + + int res = amqp_time_from_now(&deadline, timeout); + if (AMQP_STATUS_OK != res) { + return res; + } + if (state->first_queued_frame != NULL) { amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data; state->first_queued_frame = state->first_queued_frame->next; @@ -985,16 +988,25 @@ int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, *decoded_frame = *f; return AMQP_STATUS_OK; } else { - return wait_frame_inner(state, decoded_frame, timeout); + return wait_frame_inner(state, decoded_frame, deadline); } } static int amqp_simple_wait_method_list(amqp_connection_state_t state, amqp_channel_t expected_channel, amqp_method_number_t *expected_methods, + amqp_time_t deadline, amqp_method_t *output) { amqp_frame_t frame; - int res = amqp_simple_wait_frame(state, &frame); + struct timeval tv; + struct timeval *tvp; + + int res = amqp_time_tv_until(deadline, &tv, &tvp); + if (res != AMQP_STATUS_OK) { + return res; + } + + res = amqp_simple_wait_frame_noblock(state, &frame, tvp); if (AMQP_STATUS_OK != res) { return res; } @@ -1008,32 +1020,41 @@ static int amqp_simple_wait_method_list(amqp_connection_state_t state, return AMQP_STATUS_OK; } +static int simple_wait_method_inner(amqp_connection_state_t state, + amqp_channel_t expected_channel, + amqp_method_number_t expected_method, + amqp_time_t deadline, + amqp_method_t *output) { + amqp_method_number_t expected_methods[] = {expected_method, 0}; + return amqp_simple_wait_method_list(state, expected_channel, expected_methods, + deadline, output); +} + int amqp_simple_wait_method(amqp_connection_state_t state, amqp_channel_t expected_channel, amqp_method_number_t expected_method, amqp_method_t *output) { - amqp_method_number_t expected_methods[] = { 0, 0 }; - expected_methods[0] = expected_method; - return amqp_simple_wait_method_list(state, expected_channel, expected_methods, - output); + return simple_wait_method_inner(state, expected_channel, expected_method, + amqp_time_infinite(), output); } int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t id, void *decoded) { - return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE); + return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE, + amqp_time_infinite()); } int amqp_send_method_inner(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t id, - void *decoded, int flags) { + void *decoded, int flags, amqp_time_t deadline) { amqp_frame_t frame; frame.frame_type = AMQP_FRAME_METHOD; frame.channel = channel; frame.payload.method.id = id; frame.payload.method.decoded = decoded; - return amqp_send_frame_inner(state, &frame, flags); + return amqp_send_frame_inner(state, &frame, flags, deadline); } static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list ) @@ -1047,12 +1068,10 @@ static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_num return 0; } -amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t *expected_reply_ids, - void *decoded_request_method) -{ +static amqp_rpc_reply_t simple_rpc_inner( + amqp_connection_state_t state, amqp_channel_t channel, + amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids, + void *decoded_request_method, amqp_time_t deadline) { int status; amqp_rpc_reply_t result; @@ -1069,7 +1088,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_frame_t frame; retry: - status = wait_frame_inner(state, &frame, NULL); + status = wait_frame_inner(state, &frame, deadline); if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; result.library_error = status; @@ -1138,20 +1157,29 @@ retry: } } -void *amqp_simple_rpc_decoded(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method) -{ +amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t *expected_reply_ids, + void *decoded_request_method) { + return simple_rpc_inner(state, channel, request_id, expected_reply_ids, + decoded_request_method, amqp_time_infinite()); +} + +static void *simple_rpc_decoded_inner(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method, + amqp_time_t deadline) { amqp_method_number_t replies[2]; replies[0] = reply_id; replies[1] = 0; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - request_id, replies, - decoded_request_method); + state->most_recent_api_result = simple_rpc_inner( + state, channel, request_id, replies, decoded_request_method, deadline); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) { return state->most_recent_api_result.reply.decoded; } else { @@ -1159,6 +1187,15 @@ void *amqp_simple_rpc_decoded(amqp_connection_state_t state, } } +void *amqp_simple_rpc_decoded(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method) { + return simple_rpc_decoded_inner(state, channel, request_id, reply_id, + decoded_request_method, amqp_time_infinite()); +} + amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { return state->most_recent_api_result; @@ -1225,15 +1262,10 @@ error_out: return res; } -static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - const amqp_table_t *client_properties, - amqp_sasl_method_enum sasl_method, - va_list vl) -{ +static amqp_rpc_reply_t amqp_login_inner( + amqp_connection_state_t state, char const *vhost, int channel_max, + int frame_max, int heartbeat, const amqp_table_t *client_properties, + struct timeval *timeout, amqp_sasl_method_enum sasl_method, va_list vl) { int res; amqp_method_t method; @@ -1246,6 +1278,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, uint16_t server_heartbeat; amqp_rpc_reply_t result; + amqp_time_t deadline; if (channel_max < 0 || channel_max > UINT16_MAX) { return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); @@ -1262,14 +1295,19 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, } client_heartbeat = (uint16_t)heartbeat; - res = amqp_send_header(state); + res = amqp_time_from_now(&deadline, timeout); if (AMQP_STATUS_OK != res) { goto error_res; } - res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, - &method); - if (res != AMQP_STATUS_OK) { + res = send_header_inner(state, deadline); + if (AMQP_STATUS_OK != res) { + goto error_res; + } + + res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD, + deadline, &method); + if (AMQP_STATUS_OK != res) { goto error_res; } @@ -1354,7 +1392,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, s.response = response_bytes; s.locale = amqp_cstring_bytes("en_US"); - res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s); + res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s, + AMQP_SF_NONE, deadline); if (res < 0) { goto error_res; } @@ -1365,7 +1404,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, { amqp_method_number_t expected[] = { AMQP_CONNECTION_TUNE_METHOD, AMQP_CONNECTION_CLOSE_METHOD, 0 }; - res = amqp_simple_wait_method_list(state, 0, expected, &method); + + res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method); if (AMQP_STATUS_OK != res) { goto error_res; } @@ -1412,7 +1452,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, s.channel_max = client_channel_max; s.heartbeat = client_heartbeat; - res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s); + res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s, + AMQP_SF_NONE, deadline); if (res < 0) { goto error_res; } @@ -1427,11 +1468,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, s.capabilities = amqp_empty_bytes; s.insist = 1; - result = amqp_simple_rpc(state, - 0, - AMQP_CONNECTION_OPEN_METHOD, - replies, - &s); + result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies, + &s, deadline); if (result.reply_type != AMQP_RESPONSE_NORMAL) { goto out; } @@ -1469,7 +1507,8 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, va_start(vl, sasl_method); ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat, - &amqp_empty_table, sasl_method, vl); + &amqp_empty_table, state->handshake_timeout, + sasl_method, vl); va_end(vl); @@ -1491,7 +1530,8 @@ amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state, va_start(vl, sasl_method); ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat, - client_properties, sasl_method, vl); + client_properties, state->handshake_timeout, + sasl_method, vl); va_end(vl); diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index dffeec7..c2ec2a6 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -179,7 +179,8 @@ int amqp_poll(int fd, int event, amqp_time_t deadline); int amqp_send_method_inner(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t id, - void *decoded, int flags); + void *decoded, int flags, amqp_time_t deadline); + int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame); |