summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c162
1 files changed, 101 insertions, 61 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 723dffa..0c7c052 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -541,22 +541,25 @@ int amqp_open_socket_inner(char const *hostname,
return sockfd;
}
-int amqp_send_header(amqp_connection_state_t state)
-{
+static int send_header_inner(amqp_connection_state_t state,
+ amqp_time_t deadline) {
ssize_t res;
static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0,
AMQP_PROTOCOL_VERSION_MAJOR,
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite(),
- AMQP_SF_NONE);
+ res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE);
if (sizeof(header) == res) {
return AMQP_STATUS_OK;
}
return (int)res;
}
+int amqp_send_header(amqp_connection_state_t state) {
+ return send_header_inner(state, amqp_time_infinite());
+}
+
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
{
amqp_bytes_t res;
@@ -793,17 +796,10 @@ int amqp_try_recv(amqp_connection_state_t state) {
static int wait_frame_inner(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
- struct timeval *timeout)
-{
+ amqp_time_t timeout_deadline) {
amqp_time_t deadline;
- amqp_time_t timeout_deadline;
int res;
- res = amqp_time_from_now(&timeout_deadline, timeout);
- if (AMQP_STATUS_OK != res) {
- return res;
- }
-
for (;;) {
while (amqp_data_in_buffer(state)) {
res = consume_one_frame(state, decoded_frame);
@@ -949,7 +945,7 @@ int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
}
for (;;) {
- res = wait_frame_inner(state, decoded_frame, NULL);
+ res = wait_frame_inner(state, decoded_frame, amqp_time_infinite());
if (AMQP_STATUS_OK != res) {
return res;
@@ -976,6 +972,13 @@ int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
amqp_frame_t *decoded_frame,
struct timeval *timeout)
{
+ amqp_time_t deadline;
+
+ int res = amqp_time_from_now(&deadline, timeout);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
if (state->first_queued_frame != NULL) {
amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data;
state->first_queued_frame = state->first_queued_frame->next;
@@ -985,16 +988,25 @@ int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
*decoded_frame = *f;
return AMQP_STATUS_OK;
} else {
- return wait_frame_inner(state, decoded_frame, timeout);
+ return wait_frame_inner(state, decoded_frame, deadline);
}
}
static int amqp_simple_wait_method_list(amqp_connection_state_t state,
amqp_channel_t expected_channel,
amqp_method_number_t *expected_methods,
+ amqp_time_t deadline,
amqp_method_t *output) {
amqp_frame_t frame;
- int res = amqp_simple_wait_frame(state, &frame);
+ struct timeval tv;
+ struct timeval *tvp;
+
+ int res = amqp_time_tv_until(deadline, &tv, &tvp);
+ if (res != AMQP_STATUS_OK) {
+ return res;
+ }
+
+ res = amqp_simple_wait_frame_noblock(state, &frame, tvp);
if (AMQP_STATUS_OK != res) {
return res;
}
@@ -1008,32 +1020,41 @@ static int amqp_simple_wait_method_list(amqp_connection_state_t state,
return AMQP_STATUS_OK;
}
+static int simple_wait_method_inner(amqp_connection_state_t state,
+ amqp_channel_t expected_channel,
+ amqp_method_number_t expected_method,
+ amqp_time_t deadline,
+ amqp_method_t *output) {
+ amqp_method_number_t expected_methods[] = {expected_method, 0};
+ return amqp_simple_wait_method_list(state, expected_channel, expected_methods,
+ deadline, output);
+}
+
int amqp_simple_wait_method(amqp_connection_state_t state,
amqp_channel_t expected_channel,
amqp_method_number_t expected_method,
amqp_method_t *output)
{
- amqp_method_number_t expected_methods[] = { 0, 0 };
- expected_methods[0] = expected_method;
- return amqp_simple_wait_method_list(state, expected_channel, expected_methods,
- output);
+ return simple_wait_method_inner(state, expected_channel, expected_method,
+ amqp_time_infinite(), output);
}
int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel,
amqp_method_number_t id, void *decoded) {
- return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE);
+ return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE,
+ amqp_time_infinite());
}
int amqp_send_method_inner(amqp_connection_state_t state,
amqp_channel_t channel, amqp_method_number_t id,
- void *decoded, int flags) {
+ void *decoded, int flags, amqp_time_t deadline) {
amqp_frame_t frame;
frame.frame_type = AMQP_FRAME_METHOD;
frame.channel = channel;
frame.payload.method.id = id;
frame.payload.method.decoded = decoded;
- return amqp_send_frame_inner(state, &frame, flags);
+ return amqp_send_frame_inner(state, &frame, flags, deadline);
}
static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )
@@ -1047,12 +1068,10 @@ static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_num
return 0;
}
-amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t *expected_reply_ids,
- void *decoded_request_method)
-{
+static amqp_rpc_reply_t simple_rpc_inner(
+ amqp_connection_state_t state, amqp_channel_t channel,
+ amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids,
+ void *decoded_request_method, amqp_time_t deadline) {
int status;
amqp_rpc_reply_t result;
@@ -1069,7 +1088,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_frame_t frame;
retry:
- status = wait_frame_inner(state, &frame, NULL);
+ status = wait_frame_inner(state, &frame, deadline);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.library_error = status;
@@ -1138,20 +1157,29 @@ retry:
}
}
-void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method)
-{
+amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t *expected_reply_ids,
+ void *decoded_request_method) {
+ return simple_rpc_inner(state, channel, request_id, expected_reply_ids,
+ decoded_request_method, amqp_time_infinite());
+}
+
+static void *simple_rpc_decoded_inner(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method,
+ amqp_time_t deadline) {
amqp_method_number_t replies[2];
replies[0] = reply_id;
replies[1] = 0;
- state->most_recent_api_result = amqp_simple_rpc(state, channel,
- request_id, replies,
- decoded_request_method);
+ state->most_recent_api_result = simple_rpc_inner(
+ state, channel, request_id, replies, decoded_request_method, deadline);
+
if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) {
return state->most_recent_api_result.reply.decoded;
} else {
@@ -1159,6 +1187,15 @@ void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
}
}
+void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method) {
+ return simple_rpc_decoded_inner(state, channel, request_id, reply_id,
+ decoded_request_method, amqp_time_infinite());
+}
+
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
{
return state->most_recent_api_result;
@@ -1225,15 +1262,10 @@ error_out:
return res;
}
-static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- const amqp_table_t *client_properties,
- amqp_sasl_method_enum sasl_method,
- va_list vl)
-{
+static amqp_rpc_reply_t amqp_login_inner(
+ amqp_connection_state_t state, char const *vhost, int channel_max,
+ int frame_max, int heartbeat, const amqp_table_t *client_properties,
+ struct timeval *timeout, amqp_sasl_method_enum sasl_method, va_list vl) {
int res;
amqp_method_t method;
@@ -1246,6 +1278,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
uint16_t server_heartbeat;
amqp_rpc_reply_t result;
+ amqp_time_t deadline;
if (channel_max < 0 || channel_max > UINT16_MAX) {
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
@@ -1262,14 +1295,19 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
}
client_heartbeat = (uint16_t)heartbeat;
- res = amqp_send_header(state);
+ res = amqp_time_from_now(&deadline, timeout);
if (AMQP_STATUS_OK != res) {
goto error_res;
}
- res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
- &method);
- if (res != AMQP_STATUS_OK) {
+ res = send_header_inner(state, deadline);
+ if (AMQP_STATUS_OK != res) {
+ goto error_res;
+ }
+
+ res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD,
+ deadline, &method);
+ if (AMQP_STATUS_OK != res) {
goto error_res;
}
@@ -1354,7 +1392,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
s.response = response_bytes;
s.locale = amqp_cstring_bytes("en_US");
- res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s);
+ res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s,
+ AMQP_SF_NONE, deadline);
if (res < 0) {
goto error_res;
}
@@ -1365,7 +1404,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
{
amqp_method_number_t expected[] = { AMQP_CONNECTION_TUNE_METHOD,
AMQP_CONNECTION_CLOSE_METHOD, 0 };
- res = amqp_simple_wait_method_list(state, 0, expected, &method);
+
+ res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method);
if (AMQP_STATUS_OK != res) {
goto error_res;
}
@@ -1412,7 +1452,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
s.channel_max = client_channel_max;
s.heartbeat = client_heartbeat;
- res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s);
+ res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s,
+ AMQP_SF_NONE, deadline);
if (res < 0) {
goto error_res;
}
@@ -1427,11 +1468,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
s.capabilities = amqp_empty_bytes;
s.insist = 1;
- result = amqp_simple_rpc(state,
- 0,
- AMQP_CONNECTION_OPEN_METHOD,
- replies,
- &s);
+ result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies,
+ &s, deadline);
if (result.reply_type != AMQP_RESPONSE_NORMAL) {
goto out;
}
@@ -1469,7 +1507,8 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
va_start(vl, sasl_method);
ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
- &amqp_empty_table, sasl_method, vl);
+ &amqp_empty_table, state->handshake_timeout,
+ sasl_method, vl);
va_end(vl);
@@ -1491,7 +1530,8 @@ amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state,
va_start(vl, sasl_method);
ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
- client_properties, sasl_method, vl);
+ client_properties, state->handshake_timeout,
+ sasl_method, vl);
va_end(vl);