summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--librabbitmq/amqp.h50
-rw-r--r--librabbitmq/amqp_api.c25
-rw-r--r--librabbitmq/amqp_connection.c31
-rw-r--r--librabbitmq/amqp_private.h6
-rw-r--r--librabbitmq/amqp_socket.c162
-rw-r--r--librabbitmq/amqp_socket.h3
6 files changed, 205 insertions, 72 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 7f031d2..03cf565 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -2443,6 +2443,56 @@ amqp_table_t *
AMQP_CALL
amqp_get_client_properties(amqp_connection_state_t state);
+/**
+ * Get the login handshake timeout.
+ *
+ * amqp_login and amqp_login_with_properties perform the login handshake with
+ * the broker. This function returns the timeout associated with completing
+ * this operation from the client side. This value can be set by using the
+ * amqp_set_handshake_timeout.
+ *
+ * Note that the RabbitMQ broker has configurable timeout for completing the
+ * login handshake, the default is 10 seconds. rabbitmq-c has a default of 12
+ * seconds.
+ *
+ * \param [in] state the connection object
+ * \return a struct timeval representing the current login timeout for the state
+ * object. A NULL value represents an infinite timeout. The memory returned is
+ * owned by the connection object.
+ *
+ * \since v0.9.0
+ */
+AMQP_PUBLIC_FUNCTION
+struct timeval *AMQP_CALL
+ amqp_get_handshake_timeout(amqp_connection_state_t state);
+
+/**
+ * Set the login handshake timeout.
+ *
+ * amqp_login and amqp_login_with_properties perform the login handshake with
+ * the broker. This function sets the timeout associated with completing this
+ * operation from the client side.
+ *
+ * The timeout must be set before amqp_login or amqp_login_with_properties is
+ * called to change from the default timeout.
+ *
+ * Note that the RabbitMQ broker has a configurable timeout for completing the
+ * login handshake, the default is 10 seconds. rabbitmq-c has a default of 12
+ * seconds.
+ *
+ * \param [in] state the connection object
+ * \param [in] timeout a struct timetval* representing new login timeout for the
+ * state object. NULL represents an infinite timeout. The value of timeout is
+ * copied internally, the caller is responsible for ownership of the passed in
+ * pointer, it does not need to remain valid after this function is called.
+ * \return AMQP_STATUS_OK on success.
+ *
+ * \since v0.9.0
+ */
+AMQP_PUBLIC_FUNCTION
+int AMQP_CALL amqp_set_handshake_timeout(amqp_connection_state_t state,
+ struct timeval *timeout);
+
AMQP_END_DECLS
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index ab60ee4..72bd97a 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -208,7 +208,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m,
- AMQP_SF_MORE);
+ AMQP_SF_MORE, amqp_time_infinite());
if (res < 0) {
return res;
}
@@ -224,7 +224,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.payload.properties.body_size = body.len;
f.payload.properties.decoded = (void *) properties;
- res = amqp_send_frame_inner(state, &f, AMQP_SF_MORE);
+ res = amqp_send_frame_inner(state, &f, AMQP_SF_MORE, amqp_time_infinite());
if (res < 0) {
return res;
}
@@ -250,7 +250,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
body_offset += f.payload.body_fragment.len;
- res = amqp_send_frame_inner(state, &f, flagz);
+ res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
if (res < 0) {
return res;
}
@@ -354,3 +354,22 @@ int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel,
req.requeue = requeue;
return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &req);
}
+
+struct timeval *amqp_get_handshake_timeout(amqp_connection_state_t state) {
+ return state->handshake_timeout;
+}
+
+int amqp_set_handshake_timeout(amqp_connection_state_t state,
+ struct timeval *timeout) {
+ if (timeout) {
+ if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+ state->internal_handshake_timeout = *timeout;
+ state->handshake_timeout = &state->internal_handshake_timeout;
+ } else {
+ state->handshake_timeout = NULL;
+ }
+
+ return AMQP_STATUS_OK;
+}
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 8e86f78..85a248d 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -59,6 +59,10 @@
#define AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
#endif
+#ifndef AMQP_DEFAULT_LOGIN_TIMEOUT_SEC
+#define AMQP_DEFAULT_LOGIN_TIMEOUT_SEC 12
+#endif
+
#define ENFORCE_STATE(statevec, statenum) \
{ \
amqp_connection_state_t _check_state = (statevec); \
@@ -101,6 +105,11 @@ amqp_connection_state_t amqp_new_connection(void)
init_amqp_pool(&state->properties_pool, 512);
+ /* Use address of the internal_handshake_timeout object by default. */
+ state->internal_handshake_timeout.tv_sec = AMQP_DEFAULT_LOGIN_TIMEOUT_SEC;
+ state->internal_handshake_timeout.tv_usec = 0;
+ state->handshake_timeout = &state->internal_handshake_timeout;
+
return state;
out_nomem:
@@ -537,14 +546,17 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer,
int amqp_send_frame(amqp_connection_state_t state,
const amqp_frame_t *frame) {
- return amqp_send_frame_inner(state, frame, AMQP_SF_NONE);
+ return amqp_send_frame_inner(state, frame, AMQP_SF_NONE,
+ amqp_time_infinite());
}
int amqp_send_frame_inner(amqp_connection_state_t state,
- const amqp_frame_t *frame, int flags) {
+ const amqp_frame_t *frame, int flags,
+ amqp_time_t deadline) {
int res;
ssize_t sent;
amqp_bytes_t encoded;
+ amqp_time_t next_timeout;
/* TODO: if the AMQP_SF_MORE socket optimization can be shown to work
* correctly, then this could be un-done so that body-frames are sent as 3
@@ -557,15 +569,22 @@ int amqp_send_frame_inner(amqp_connection_state_t state,
}
start_send:
- sent = amqp_try_send(state, encoded.bytes, encoded.len,
- state->next_recv_heartbeat, flags);
+
+ next_timeout = amqp_time_first(deadline, state->next_recv_heartbeat);
+
+ sent = amqp_try_send(state, encoded.bytes, encoded.len, next_timeout, flags);
if (0 > sent) {
return (int)sent;
}
- /* A partial send has occurred, because of a heartbeat timeout, try and recv
- * something */
+ /* A partial send has occurred, because of a heartbeat timeout (so try recv
+ * something) or common timeout (so return AMQP_STATUS_TIMEOUT) */
if ((ssize_t)encoded.len != sent) {
+ if (amqp_time_equal(next_timeout, deadline)) {
+ /* timeout of method was received, so return from method*/
+ return AMQP_STATUS_TIMEOUT;
+ }
+
res = amqp_try_recv(state);
if (AMQP_STATUS_TIMEOUT == res) {
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 219b1be..d4f9171 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -193,6 +193,9 @@ struct amqp_connection_state_t_ {
amqp_table_t server_properties;
amqp_table_t client_properties;
amqp_pool_t properties_pool;
+
+ struct timeval *handshake_timeout;
+ struct timeval internal_handshake_timeout;
};
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel);
@@ -388,5 +391,6 @@ static inline amqp_rpc_reply_t amqp_rpc_reply_error(amqp_status_enum status) {
}
int amqp_send_frame_inner(amqp_connection_state_t state,
- const amqp_frame_t *frame, int flags);
+ const amqp_frame_t *frame, int flags,
+ amqp_time_t deadline);
#endif
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 723dffa..0c7c052 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -541,22 +541,25 @@ int amqp_open_socket_inner(char const *hostname,
return sockfd;
}
-int amqp_send_header(amqp_connection_state_t state)
-{
+static int send_header_inner(amqp_connection_state_t state,
+ amqp_time_t deadline) {
ssize_t res;
static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0,
AMQP_PROTOCOL_VERSION_MAJOR,
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite(),
- AMQP_SF_NONE);
+ res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE);
if (sizeof(header) == res) {
return AMQP_STATUS_OK;
}
return (int)res;
}
+int amqp_send_header(amqp_connection_state_t state) {
+ return send_header_inner(state, amqp_time_infinite());
+}
+
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
{
amqp_bytes_t res;
@@ -793,17 +796,10 @@ int amqp_try_recv(amqp_connection_state_t state) {
static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
- struct timeval *timeout)
-{
+ amqp_time_t timeout_deadline) {
amqp_time_t deadline;
- amqp_time_t timeout_deadline;
int res;
- res = amqp_time_from_now(&timeout_deadline, timeout);
- if (AMQP_STATUS_OK != res) {
- return res;
- }
-
for (;;) {
while (amqp_data_in_buffer(state)) {
res = consume_one_frame(state, decoded_frame);
@@ -949,7 +945,7 @@ int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
}
for (;;) {
- res = wait_frame_inner(state, decoded_frame, NULL);
+ res = wait_frame_inner(state, decoded_frame, amqp_time_infinite());
if (AMQP_STATUS_OK != res) {
return res;
@@ -976,6 +972,13 @@ int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
struct timeval *timeout)
{
+ amqp_time_t deadline;
+
+ int res = amqp_time_from_now(&deadline, timeout);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
if (state->first_queued_frame != NULL) {
amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data;
state->first_queued_frame = state->first_queued_frame->next;
@@ -985,16 +988,25 @@ int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
*decoded_frame = *f;
return AMQP_STATUS_OK;
} else {
- return wait_frame_inner(state, decoded_frame, timeout);
+ return wait_frame_inner(state, decoded_frame, deadline);
}
}
static int amqp_simple_wait_method_list(amqp_connection_state_t state,
amqp_channel_t expected_channel,
amqp_method_number_t *expected_methods,
+ amqp_time_t deadline,
amqp_method_t *output) {
amqp_frame_t frame;
- int res = amqp_simple_wait_frame(state, &frame);
+ struct timeval tv;
+ struct timeval *tvp;
+
+ int res = amqp_time_tv_until(deadline, &tv, &tvp);
+ if (res != AMQP_STATUS_OK) {
+ return res;
+ }
+
+ res = amqp_simple_wait_frame_noblock(state, &frame, tvp);
if (AMQP_STATUS_OK != res) {
return res;
}
@@ -1008,32 +1020,41 @@ static int amqp_simple_wait_method_list(amqp_connection_state_t state,
return AMQP_STATUS_OK;
}
+static int simple_wait_method_inner(amqp_connection_state_t state,
+ amqp_channel_t expected_channel,
+ amqp_method_number_t expected_method,
+ amqp_time_t deadline,
+ amqp_method_t *output) {
+ amqp_method_number_t expected_methods[] = {expected_method, 0};
+ return amqp_simple_wait_method_list(state, expected_channel, expected_methods,
+ deadline, output);
+}
+
int amqp_simple_wait_method(amqp_connection_state_t state,
amqp_channel_t expected_channel,
amqp_method_number_t expected_method,
amqp_method_t *output)
{
- amqp_method_number_t expected_methods[] = { 0, 0 };
- expected_methods[0] = expected_method;
- return amqp_simple_wait_method_list(state, expected_channel, expected_methods,
- output);
+ return simple_wait_method_inner(state, expected_channel, expected_method,
+ amqp_time_infinite(), output);
}
int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel,
amqp_method_number_t id, void *decoded) {
- return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE);
+ return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE,
+ amqp_time_infinite());
}
int amqp_send_method_inner(amqp_connection_state_t state,
amqp_channel_t channel, amqp_method_number_t id,
- void *decoded, int flags) {
+ void *decoded, int flags, amqp_time_t deadline) {
amqp_frame_t frame;
frame.frame_type = AMQP_FRAME_METHOD;
frame.channel = channel;
frame.payload.method.id = id;
frame.payload.method.decoded = decoded;
- return amqp_send_frame_inner(state, &frame, flags);
+ return amqp_send_frame_inner(state, &frame, flags, deadline);
}
static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )
@@ -1047,12 +1068,10 @@ static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_num
return 0;
}
-amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t *expected_reply_ids,
- void *decoded_request_method)
-{
+static amqp_rpc_reply_t simple_rpc_inner(
+ amqp_connection_state_t state, amqp_channel_t channel,
+ amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids,
+ void *decoded_request_method, amqp_time_t deadline) {
int status;
amqp_rpc_reply_t result;
@@ -1069,7 +1088,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_frame_t frame;
retry:
- status = wait_frame_inner(state, &frame, NULL);
+ status = wait_frame_inner(state, &frame, deadline);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.library_error = status;
@@ -1138,20 +1157,29 @@ retry:
}
}
-void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method)
-{
+amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t *expected_reply_ids,
+ void *decoded_request_method) {
+ return simple_rpc_inner(state, channel, request_id, expected_reply_ids,
+ decoded_request_method, amqp_time_infinite());
+}
+
+static void *simple_rpc_decoded_inner(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method,
+ amqp_time_t deadline) {
amqp_method_number_t replies[2];
replies[0] = reply_id;
replies[1] = 0;
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- request_id, replies,
- decoded_request_method);
+ state->most_recent_api_result = simple_rpc_inner(
+ state, channel, request_id, replies, decoded_request_method, deadline);
+
if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) {
return state->most_recent_api_result.reply.decoded;
} else {
@@ -1159,6 +1187,15 @@ void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
}
}
+void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method) {
+ return simple_rpc_decoded_inner(state, channel, request_id, reply_id,
+ decoded_request_method, amqp_time_infinite());
+}
+
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
{
return state->most_recent_api_result;
@@ -1225,15 +1262,10 @@ error_out:
return res;
}
-static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- const amqp_table_t *client_properties,
- amqp_sasl_method_enum sasl_method,
- va_list vl)
-{
+static amqp_rpc_reply_t amqp_login_inner(
+ amqp_connection_state_t state, char const *vhost, int channel_max,
+ int frame_max, int heartbeat, const amqp_table_t *client_properties,
+ struct timeval *timeout, amqp_sasl_method_enum sasl_method, va_list vl) {
int res;
amqp_method_t method;
@@ -1246,6 +1278,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
uint16_t server_heartbeat;
amqp_rpc_reply_t result;
+ amqp_time_t deadline;
if (channel_max < 0 || channel_max > UINT16_MAX) {
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
@@ -1262,14 +1295,19 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
}
client_heartbeat = (uint16_t)heartbeat;
- res = amqp_send_header(state);
+ res = amqp_time_from_now(&deadline, timeout);
if (AMQP_STATUS_OK != res) {
goto error_res;
}
- res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
- &method);
- if (res != AMQP_STATUS_OK) {
+ res = send_header_inner(state, deadline);
+ if (AMQP_STATUS_OK != res) {
+ goto error_res;
+ }
+
+ res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD,
+ deadline, &method);
+ if (AMQP_STATUS_OK != res) {
goto error_res;
}
@@ -1354,7 +1392,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
s.response = response_bytes;
s.locale = amqp_cstring_bytes("en_US");
- res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s);
+ res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s,
+ AMQP_SF_NONE, deadline);
if (res < 0) {
goto error_res;
}
@@ -1365,7 +1404,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
{
amqp_method_number_t expected[] = { AMQP_CONNECTION_TUNE_METHOD,
AMQP_CONNECTION_CLOSE_METHOD, 0 };
- res = amqp_simple_wait_method_list(state, 0, expected, &method);
+
+ res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method);
if (AMQP_STATUS_OK != res) {
goto error_res;
}
@@ -1412,7 +1452,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
s.channel_max = client_channel_max;
s.heartbeat = client_heartbeat;
- res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s);
+ res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s,
+ AMQP_SF_NONE, deadline);
if (res < 0) {
goto error_res;
}
@@ -1427,11 +1468,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
s.capabilities = amqp_empty_bytes;
s.insist = 1;
- result = amqp_simple_rpc(state,
- 0,
- AMQP_CONNECTION_OPEN_METHOD,
- replies,
- &s);
+ result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies,
+ &s, deadline);
if (result.reply_type != AMQP_RESPONSE_NORMAL) {
goto out;
}
@@ -1469,7 +1507,8 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
va_start(vl, sasl_method);
ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
- &amqp_empty_table, sasl_method, vl);
+ &amqp_empty_table, state->handshake_timeout,
+ sasl_method, vl);
va_end(vl);
@@ -1491,7 +1530,8 @@ amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state,
va_start(vl, sasl_method);
ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
- client_properties, sasl_method, vl);
+ client_properties, state->handshake_timeout,
+ sasl_method, vl);
va_end(vl);
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index dffeec7..c2ec2a6 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -179,7 +179,8 @@ int amqp_poll(int fd, int event, amqp_time_t deadline);
int amqp_send_method_inner(amqp_connection_state_t state,
amqp_channel_t channel, amqp_method_number_t id,
- void *decoded, int flags);
+ void *decoded, int flags, amqp_time_t deadline);
+
int
amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame);