diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2017-03-01 21:29:16 -0800 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2017-03-15 23:47:41 -0700 |
commit | baabb2addee3047517e25b4723c48db852255d6e (patch) | |
tree | 6dd153ef7bd47a70360253d1cb378f60841c600d | |
parent | a13b8209c3f7ef8ba1396bab1b4e80b7eb9d372c (diff) | |
download | rabbitmq-c-rpc-timeout.tar.gz |
Lib: add methods to set timeout of AMQP RPCsrpc-timeout
Fixes #403
-rw-r--r-- | librabbitmq/amqp.h | 58 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 18 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 2 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 45 |
4 files changed, 104 insertions, 19 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index c6e20c4..e803019 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -2493,6 +2493,64 @@ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_set_handshake_timeout(amqp_connection_state_t state, struct timeval *timeout); +/** + * Get the RPC timeout + * + * Gets the timeout for any RPC-style AMQP command (e.g., amqp_queue_declare). + * This timeout may be changed at any time by calling \amqp_set_rpc_timeout + * function with a new timeout. The timeout applies individually to each RPC + * that is made. + * + * The default value is NULL, or an infinite timeout. + * + * When an RPC times out, the function will return an error AMQP_STATUS_TIMEOUT, + * and the connection will be closed. + * + *\warning RPC-timeouts are an advanced feature intended to be used to detect + * dead connections quickly when the rabbitmq-c implementation of heartbeats + * does not work. Do not use RPC timeouts unless you understand the implications + * of doing so. + * + * \param [in] state the connection object + * \return a struct timeval representing the current RPC timeout for the state + * object. A NULL value represents an infinite timeout. The memory returned is + * owned by the connection object. + * + * \since v0.9.0 + */ +AMQP_PUBLIC_FUNCTION +struct timeval *AMQP_CALL amqp_get_rpc_timeout(amqp_connection_state_t state); + +/** + * Set the RPC timeout + * + * Sets the timeout for any RPC-style AMQP command (e.g., amqp_queue_declare). + * This timeout may be changed at any time by calling this function with a new + * timeout. The timeout applies individually to each RPC that is made. + * + * The default value is NULL, or an infinite timeout. + * + * When an RPC times out, the function will return an error AMQP_STATUS_TIMEOUT, + * and the connection will be closed. + * + *\warning RPC-timeouts are an advanced feature intended to be used to detect + * dead connections quickly when the rabbitmq-c implementation of heartbeats + * does not work. Do not use RPC timeouts unless you understand the implications + * of doing so. + * + * \param [in] state the connection object + * \param [in] timeout a struct timeval* representing new RPC timeout for the + * state object. NULL represents an infinite timeout. The value of timeout is + * copied internally, the caller is responsible for ownership of the passed + * pointer, it does not need to remain valid after this function is called. + * \return AMQP_STATUS_SUCCESS on success. + * + * \since v0.9.0 + */ +AMQP_PUBLIC_FUNCTION +int AMQP_CALL amqp_set_rpc_timeout(amqp_connection_state_t state, + struct timeval *timeout); + AMQP_END_DECLS diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 72bd97a..cafc2f7 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -373,3 +373,21 @@ int amqp_set_handshake_timeout(amqp_connection_state_t state, return AMQP_STATUS_OK; } + +struct timeval * amqp_get_rpc_timeout(amqp_connection_state_t state) { + return state->rpc_timeout; +} + +int amqp_set_rpc_timeout(amqp_connection_state_t state, + struct timeval *timeout) { + if (timeout) { + if (timeout->tv_sec < 0 || timeout->tv_usec < 0) { + return AMQP_STATUS_INVALID_PARAMETER; + } + state->rpc_timeout = &state->internal_rpc_timeout; + *state->rpc_timeout = *timeout; + } else { + state->rpc_timeout = NULL; + } + return AMQP_STATUS_OK; +} diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index d4f9171..31ce4ea 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -196,6 +196,8 @@ struct amqp_connection_state_t_ { struct timeval *handshake_timeout; struct timeval internal_handshake_timeout; + struct timeval *rpc_timeout; + struct timeval internal_rpc_timeout; }; amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel); diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 22d66de..a63a0b6 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -1088,9 +1088,10 @@ static amqp_rpc_reply_t simple_rpc_inner( retry: status = wait_frame_inner(state, &frame, deadline); if (status < 0) { - result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_error = status; - return result; + if (status == AMQP_STATUS_TIMEOUT) { + amqp_socket_close(state->socket, AMQP_SC_FORCE); + } + return amqp_rpc_reply_error(status); } /* @@ -1156,18 +1157,33 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids, void *decoded_request_method) { + amqp_time_t deadline; + int res; + + res = amqp_time_from_now(&deadline, state->rpc_timeout); + if (res != AMQP_STATUS_OK) { + return amqp_rpc_reply_error(res); + } + return simple_rpc_inner(state, channel, request_id, expected_reply_ids, - decoded_request_method, amqp_time_infinite()); + decoded_request_method, deadline); } -static void *simple_rpc_decoded_inner(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method, - amqp_time_t deadline) { +void *amqp_simple_rpc_decoded(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method) { + amqp_time_t deadline; + int res; amqp_method_number_t replies[2]; + res = amqp_time_from_now(&deadline, state->rpc_timeout); + if (res != AMQP_STATUS_OK) { + state->most_recent_api_result = amqp_rpc_reply_error(res); + return NULL; + } + replies[0] = reply_id; replies[1] = 0; @@ -1181,15 +1197,6 @@ static void *simple_rpc_decoded_inner(amqp_connection_state_t state, } } -void *amqp_simple_rpc_decoded(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method) { - return simple_rpc_decoded_inner(state, channel, request_id, reply_id, - decoded_request_method, amqp_time_infinite()); -} - amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { return state->most_recent_api_result; |