diff options
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 162 |
1 files changed, 101 insertions, 61 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 723dffa..0c7c052 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -541,22 +541,25 @@ int amqp_open_socket_inner(char const *hostname, return sockfd; } -int amqp_send_header(amqp_connection_state_t state) -{ +static int send_header_inner(amqp_connection_state_t state, + amqp_time_t deadline) { ssize_t res; static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0, AMQP_PROTOCOL_VERSION_MAJOR, AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite(), - AMQP_SF_NONE); + res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE); if (sizeof(header) == res) { return AMQP_STATUS_OK; } return (int)res; } +int amqp_send_header(amqp_connection_state_t state) { + return send_header_inner(state, amqp_time_infinite()); +} + static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { amqp_bytes_t res; @@ -793,17 +796,10 @@ int amqp_try_recv(amqp_connection_state_t state) { static int wait_frame_inner(amqp_connection_state_t state, amqp_frame_t *decoded_frame, - struct timeval *timeout) -{ + amqp_time_t timeout_deadline) { amqp_time_t deadline; - amqp_time_t timeout_deadline; int res; - res = amqp_time_from_now(&timeout_deadline, timeout); - if (AMQP_STATUS_OK != res) { - return res; - } - for (;;) { while (amqp_data_in_buffer(state)) { res = consume_one_frame(state, decoded_frame); @@ -949,7 +945,7 @@ int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, } for (;;) { - res = wait_frame_inner(state, decoded_frame, NULL); + res = wait_frame_inner(state, decoded_frame, amqp_time_infinite()); if (AMQP_STATUS_OK != res) { return res; @@ -976,6 +972,13 @@ int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, struct timeval *timeout) { + amqp_time_t deadline; + + int res = amqp_time_from_now(&deadline, timeout); + if (AMQP_STATUS_OK != res) { + return res; + } + if (state->first_queued_frame != NULL) { amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data; state->first_queued_frame = state->first_queued_frame->next; @@ -985,16 +988,25 @@ int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, *decoded_frame = *f; return AMQP_STATUS_OK; } else { - return wait_frame_inner(state, decoded_frame, timeout); + return wait_frame_inner(state, decoded_frame, deadline); } } static int amqp_simple_wait_method_list(amqp_connection_state_t state, amqp_channel_t expected_channel, amqp_method_number_t *expected_methods, + amqp_time_t deadline, amqp_method_t *output) { amqp_frame_t frame; - int res = amqp_simple_wait_frame(state, &frame); + struct timeval tv; + struct timeval *tvp; + + int res = amqp_time_tv_until(deadline, &tv, &tvp); + if (res != AMQP_STATUS_OK) { + return res; + } + + res = amqp_simple_wait_frame_noblock(state, &frame, tvp); if (AMQP_STATUS_OK != res) { return res; } @@ -1008,32 +1020,41 @@ static int amqp_simple_wait_method_list(amqp_connection_state_t state, return AMQP_STATUS_OK; } +static int simple_wait_method_inner(amqp_connection_state_t state, + amqp_channel_t expected_channel, + amqp_method_number_t expected_method, + amqp_time_t deadline, + amqp_method_t *output) { + amqp_method_number_t expected_methods[] = {expected_method, 0}; + return amqp_simple_wait_method_list(state, expected_channel, expected_methods, + deadline, output); +} + int amqp_simple_wait_method(amqp_connection_state_t state, amqp_channel_t expected_channel, amqp_method_number_t expected_method, amqp_method_t *output) { - amqp_method_number_t expected_methods[] = { 0, 0 }; - expected_methods[0] = expected_method; - return amqp_simple_wait_method_list(state, expected_channel, expected_methods, - output); + return simple_wait_method_inner(state, expected_channel, expected_method, + amqp_time_infinite(), output); } int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t id, void *decoded) { - return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE); + return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE, + amqp_time_infinite()); } int amqp_send_method_inner(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t id, - void *decoded, int flags) { + void *decoded, int flags, amqp_time_t deadline) { amqp_frame_t frame; frame.frame_type = AMQP_FRAME_METHOD; frame.channel = channel; frame.payload.method.id = id; frame.payload.method.decoded = decoded; - return amqp_send_frame_inner(state, &frame, flags); + return amqp_send_frame_inner(state, &frame, flags, deadline); } static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list ) @@ -1047,12 +1068,10 @@ static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_num return 0; } -amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t *expected_reply_ids, - void *decoded_request_method) -{ +static amqp_rpc_reply_t simple_rpc_inner( + amqp_connection_state_t state, amqp_channel_t channel, + amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids, + void *decoded_request_method, amqp_time_t deadline) { int status; amqp_rpc_reply_t result; @@ -1069,7 +1088,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_frame_t frame; retry: - status = wait_frame_inner(state, &frame, NULL); + status = wait_frame_inner(state, &frame, deadline); if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; result.library_error = status; @@ -1138,20 +1157,29 @@ retry: } } -void *amqp_simple_rpc_decoded(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method) -{ +amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t *expected_reply_ids, + void *decoded_request_method) { + return simple_rpc_inner(state, channel, request_id, expected_reply_ids, + decoded_request_method, amqp_time_infinite()); +} + +static void *simple_rpc_decoded_inner(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method, + amqp_time_t deadline) { amqp_method_number_t replies[2]; replies[0] = reply_id; replies[1] = 0; - state->most_recent_api_result = amqp_simple_rpc(state, channel, - request_id, replies, - decoded_request_method); + state->most_recent_api_result = simple_rpc_inner( + state, channel, request_id, replies, decoded_request_method, deadline); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) { return state->most_recent_api_result.reply.decoded; } else { @@ -1159,6 +1187,15 @@ void *amqp_simple_rpc_decoded(amqp_connection_state_t state, } } +void *amqp_simple_rpc_decoded(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method) { + return simple_rpc_decoded_inner(state, channel, request_id, reply_id, + decoded_request_method, amqp_time_infinite()); +} + amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { return state->most_recent_api_result; @@ -1225,15 +1262,10 @@ error_out: return res; } -static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - const amqp_table_t *client_properties, - amqp_sasl_method_enum sasl_method, - va_list vl) -{ +static amqp_rpc_reply_t amqp_login_inner( + amqp_connection_state_t state, char const *vhost, int channel_max, + int frame_max, int heartbeat, const amqp_table_t *client_properties, + struct timeval *timeout, amqp_sasl_method_enum sasl_method, va_list vl) { int res; amqp_method_t method; @@ -1246,6 +1278,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, uint16_t server_heartbeat; amqp_rpc_reply_t result; + amqp_time_t deadline; if (channel_max < 0 || channel_max > UINT16_MAX) { return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); @@ -1262,14 +1295,19 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, } client_heartbeat = (uint16_t)heartbeat; - res = amqp_send_header(state); + res = amqp_time_from_now(&deadline, timeout); if (AMQP_STATUS_OK != res) { goto error_res; } - res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, - &method); - if (res != AMQP_STATUS_OK) { + res = send_header_inner(state, deadline); + if (AMQP_STATUS_OK != res) { + goto error_res; + } + + res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD, + deadline, &method); + if (AMQP_STATUS_OK != res) { goto error_res; } @@ -1354,7 +1392,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, s.response = response_bytes; s.locale = amqp_cstring_bytes("en_US"); - res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s); + res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s, + AMQP_SF_NONE, deadline); if (res < 0) { goto error_res; } @@ -1365,7 +1404,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, { amqp_method_number_t expected[] = { AMQP_CONNECTION_TUNE_METHOD, AMQP_CONNECTION_CLOSE_METHOD, 0 }; - res = amqp_simple_wait_method_list(state, 0, expected, &method); + + res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method); if (AMQP_STATUS_OK != res) { goto error_res; } @@ -1412,7 +1452,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, s.channel_max = client_channel_max; s.heartbeat = client_heartbeat; - res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s); + res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s, + AMQP_SF_NONE, deadline); if (res < 0) { goto error_res; } @@ -1427,11 +1468,8 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, s.capabilities = amqp_empty_bytes; s.insist = 1; - result = amqp_simple_rpc(state, - 0, - AMQP_CONNECTION_OPEN_METHOD, - replies, - &s); + result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies, + &s, deadline); if (result.reply_type != AMQP_RESPONSE_NORMAL) { goto out; } @@ -1469,7 +1507,8 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, va_start(vl, sasl_method); ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat, - &amqp_empty_table, sasl_method, vl); + &amqp_empty_table, state->handshake_timeout, + sasl_method, vl); va_end(vl); @@ -1491,7 +1530,8 @@ amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state, va_start(vl, sasl_method); ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat, - client_properties, sasl_method, vl); + client_properties, state->handshake_timeout, + sasl_method, vl); va_end(vl); |