summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_connection.c
diff options
context:
space:
mode:
authorHaster <haster2004@yandex.ru>2016-10-10 01:11:18 +0400
committerAlan Antonuk <alan.antonuk@gmail.com>2016-11-27 19:45:30 -0800
commit7f92d532360dd254bf0484085e7a51b812cc578f (patch)
treed94341d4393965ae1753d4e42f02876a419f8f8a /librabbitmq/amqp_connection.c
parent66e2842e4a59b62cccbf0654536976f8450803c2 (diff)
downloadrabbitmq-c-7f92d532360dd254bf0484085e7a51b812cc578f.tar.gz
Lib: add timeout for amqp_login and friendspr383
By default the RabbitMQ broker sets a tunable timeout of 10 seconds from socket-open to successful handshake. This introduces a similar login timeout on the client side. If the login does not complete within this timeout, amqp_login and friends will return AMQP_STATUS_TIMEOUT and the connection will be considered dead. Two new functions amqp_set_handshake_timeout and amqp_get_handshake_timeout are introduced to tune this behavior.
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r--librabbitmq/amqp_connection.c31
1 files changed, 25 insertions, 6 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 8e86f78..85a248d 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -59,6 +59,10 @@
#define AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
#endif
+#ifndef AMQP_DEFAULT_LOGIN_TIMEOUT_SEC
+#define AMQP_DEFAULT_LOGIN_TIMEOUT_SEC 12
+#endif
+
#define ENFORCE_STATE(statevec, statenum) \
{ \
amqp_connection_state_t _check_state = (statevec); \
@@ -101,6 +105,11 @@ amqp_connection_state_t amqp_new_connection(void)
init_amqp_pool(&state->properties_pool, 512);
+ /* Use address of the internal_handshake_timeout object by default. */
+ state->internal_handshake_timeout.tv_sec = AMQP_DEFAULT_LOGIN_TIMEOUT_SEC;
+ state->internal_handshake_timeout.tv_usec = 0;
+ state->handshake_timeout = &state->internal_handshake_timeout;
+
return state;
out_nomem:
@@ -537,14 +546,17 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer,
int amqp_send_frame(amqp_connection_state_t state,
const amqp_frame_t *frame) {
- return amqp_send_frame_inner(state, frame, AMQP_SF_NONE);
+ return amqp_send_frame_inner(state, frame, AMQP_SF_NONE,
+ amqp_time_infinite());
}
int amqp_send_frame_inner(amqp_connection_state_t state,
- const amqp_frame_t *frame, int flags) {
+ const amqp_frame_t *frame, int flags,
+ amqp_time_t deadline) {
int res;
ssize_t sent;
amqp_bytes_t encoded;
+ amqp_time_t next_timeout;
/* TODO: if the AMQP_SF_MORE socket optimization can be shown to work
* correctly, then this could be un-done so that body-frames are sent as 3
@@ -557,15 +569,22 @@ int amqp_send_frame_inner(amqp_connection_state_t state,
}
start_send:
- sent = amqp_try_send(state, encoded.bytes, encoded.len,
- state->next_recv_heartbeat, flags);
+
+ next_timeout = amqp_time_first(deadline, state->next_recv_heartbeat);
+
+ sent = amqp_try_send(state, encoded.bytes, encoded.len, next_timeout, flags);
if (0 > sent) {
return (int)sent;
}
- /* A partial send has occurred, because of a heartbeat timeout, try and recv
- * something */
+ /* A partial send has occurred, because of a heartbeat timeout (so try recv
+ * something) or common timeout (so return AMQP_STATUS_TIMEOUT) */
if ((ssize_t)encoded.len != sent) {
+ if (amqp_time_equal(next_timeout, deadline)) {
+ /* timeout of method was received, so return from method*/
+ return AMQP_STATUS_TIMEOUT;
+ }
+
res = amqp_try_recv(state);
if (AMQP_STATUS_TIMEOUT == res) {