summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2017-03-01 21:29:16 -0800
committerAlan Antonuk <alan.antonuk@gmail.com>2017-03-15 23:47:41 -0700
commitbaabb2addee3047517e25b4723c48db852255d6e (patch)
tree6dd153ef7bd47a70360253d1cb378f60841c600d
parenta13b8209c3f7ef8ba1396bab1b4e80b7eb9d372c (diff)
downloadrabbitmq-c-rpc-timeout.tar.gz
Lib: add methods to set timeout of AMQP RPCsrpc-timeout
Fixes #403
-rw-r--r--librabbitmq/amqp.h58
-rw-r--r--librabbitmq/amqp_api.c18
-rw-r--r--librabbitmq/amqp_private.h2
-rw-r--r--librabbitmq/amqp_socket.c45
4 files changed, 104 insertions, 19 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index c6e20c4..e803019 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -2493,6 +2493,64 @@ AMQP_PUBLIC_FUNCTION
int AMQP_CALL amqp_set_handshake_timeout(amqp_connection_state_t state,
struct timeval *timeout);
+/**
+ * Get the RPC timeout
+ *
+ * Gets the timeout for any RPC-style AMQP command (e.g., amqp_queue_declare).
+ * This timeout may be changed at any time by calling \amqp_set_rpc_timeout
+ * function with a new timeout. The timeout applies individually to each RPC
+ * that is made.
+ *
+ * The default value is NULL, or an infinite timeout.
+ *
+ * When an RPC times out, the function will return an error AMQP_STATUS_TIMEOUT,
+ * and the connection will be closed.
+ *
+ *\warning RPC-timeouts are an advanced feature intended to be used to detect
+ * dead connections quickly when the rabbitmq-c implementation of heartbeats
+ * does not work. Do not use RPC timeouts unless you understand the implications
+ * of doing so.
+ *
+ * \param [in] state the connection object
+ * \return a struct timeval representing the current RPC timeout for the state
+ * object. A NULL value represents an infinite timeout. The memory returned is
+ * owned by the connection object.
+ *
+ * \since v0.9.0
+ */
+AMQP_PUBLIC_FUNCTION
+struct timeval *AMQP_CALL amqp_get_rpc_timeout(amqp_connection_state_t state);
+
+/**
+ * Set the RPC timeout
+ *
+ * Sets the timeout for any RPC-style AMQP command (e.g., amqp_queue_declare).
+ * This timeout may be changed at any time by calling this function with a new
+ * timeout. The timeout applies individually to each RPC that is made.
+ *
+ * The default value is NULL, or an infinite timeout.
+ *
+ * When an RPC times out, the function will return an error AMQP_STATUS_TIMEOUT,
+ * and the connection will be closed.
+ *
+ *\warning RPC-timeouts are an advanced feature intended to be used to detect
+ * dead connections quickly when the rabbitmq-c implementation of heartbeats
+ * does not work. Do not use RPC timeouts unless you understand the implications
+ * of doing so.
+ *
+ * \param [in] state the connection object
+ * \param [in] timeout a struct timeval* representing new RPC timeout for the
+ * state object. NULL represents an infinite timeout. The value of timeout is
+ * copied internally, the caller is responsible for ownership of the passed
+ * pointer, it does not need to remain valid after this function is called.
+ * \return AMQP_STATUS_SUCCESS on success.
+ *
+ * \since v0.9.0
+ */
+AMQP_PUBLIC_FUNCTION
+int AMQP_CALL amqp_set_rpc_timeout(amqp_connection_state_t state,
+ struct timeval *timeout);
+
AMQP_END_DECLS
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 72bd97a..cafc2f7 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -373,3 +373,21 @@ int amqp_set_handshake_timeout(amqp_connection_state_t state,
return AMQP_STATUS_OK;
}
+
+struct timeval * amqp_get_rpc_timeout(amqp_connection_state_t state) {
+ return state->rpc_timeout;
+}
+
+int amqp_set_rpc_timeout(amqp_connection_state_t state,
+ struct timeval *timeout) {
+ if (timeout) {
+ if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+ state->rpc_timeout = &state->internal_rpc_timeout;
+ *state->rpc_timeout = *timeout;
+ } else {
+ state->rpc_timeout = NULL;
+ }
+ return AMQP_STATUS_OK;
+}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index d4f9171..31ce4ea 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -196,6 +196,8 @@ struct amqp_connection_state_t_ {
struct timeval *handshake_timeout;
struct timeval internal_handshake_timeout;
+ struct timeval *rpc_timeout;
+ struct timeval internal_rpc_timeout;
};
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel);
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 22d66de..a63a0b6 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -1088,9 +1088,10 @@ static amqp_rpc_reply_t simple_rpc_inner(
retry:
status = wait_frame_inner(state, &frame, deadline);
if (status < 0) {
- result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_error = status;
- return result;
+ if (status == AMQP_STATUS_TIMEOUT) {
+ amqp_socket_close(state->socket, AMQP_SC_FORCE);
+ }
+ return amqp_rpc_reply_error(status);
}
/*
@@ -1156,18 +1157,33 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_method_number_t request_id,
amqp_method_number_t *expected_reply_ids,
void *decoded_request_method) {
+ amqp_time_t deadline;
+ int res;
+
+ res = amqp_time_from_now(&deadline, state->rpc_timeout);
+ if (res != AMQP_STATUS_OK) {
+ return amqp_rpc_reply_error(res);
+ }
+
return simple_rpc_inner(state, channel, request_id, expected_reply_ids,
- decoded_request_method, amqp_time_infinite());
+ decoded_request_method, deadline);
}
-static void *simple_rpc_decoded_inner(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method,
- amqp_time_t deadline) {
+void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method) {
+ amqp_time_t deadline;
+ int res;
amqp_method_number_t replies[2];
+ res = amqp_time_from_now(&deadline, state->rpc_timeout);
+ if (res != AMQP_STATUS_OK) {
+ state->most_recent_api_result = amqp_rpc_reply_error(res);
+ return NULL;
+ }
+
replies[0] = reply_id;
replies[1] = 0;
@@ -1181,15 +1197,6 @@ static void *simple_rpc_decoded_inner(amqp_connection_state_t state,
}
}
-void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method) {
- return simple_rpc_decoded_inner(state, channel, request_id, reply_id,
- decoded_request_method, amqp_time_infinite());
-}
-
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
{
return state->most_recent_api_result;