diff options
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 45 |
1 files changed, 26 insertions, 19 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 22d66de..a63a0b6 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -1088,9 +1088,10 @@ static amqp_rpc_reply_t simple_rpc_inner( retry: status = wait_frame_inner(state, &frame, deadline); if (status < 0) { - result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_error = status; - return result; + if (status == AMQP_STATUS_TIMEOUT) { + amqp_socket_close(state->socket, AMQP_SC_FORCE); + } + return amqp_rpc_reply_error(status); } /* @@ -1156,18 +1157,33 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids, void *decoded_request_method) { + amqp_time_t deadline; + int res; + + res = amqp_time_from_now(&deadline, state->rpc_timeout); + if (res != AMQP_STATUS_OK) { + return amqp_rpc_reply_error(res); + } + return simple_rpc_inner(state, channel, request_id, expected_reply_ids, - decoded_request_method, amqp_time_infinite()); + decoded_request_method, deadline); } -static void *simple_rpc_decoded_inner(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method, - amqp_time_t deadline) { +void *amqp_simple_rpc_decoded(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method) { + amqp_time_t deadline; + int res; amqp_method_number_t replies[2]; + res = amqp_time_from_now(&deadline, state->rpc_timeout); + if (res != AMQP_STATUS_OK) { + state->most_recent_api_result = amqp_rpc_reply_error(res); + return NULL; + } + replies[0] = reply_id; replies[1] = 0; @@ -1181,15 +1197,6 @@ static void *simple_rpc_decoded_inner(amqp_connection_state_t state, } } -void *amqp_simple_rpc_decoded(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method) { - return simple_rpc_decoded_inner(state, channel, request_id, reply_id, - decoded_request_method, amqp_time_infinite()); -} - amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { return state->most_recent_api_result; |