summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c45
1 files changed, 26 insertions, 19 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 22d66de..a63a0b6 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -1088,9 +1088,10 @@ static amqp_rpc_reply_t simple_rpc_inner(
retry:
status = wait_frame_inner(state, &frame, deadline);
if (status < 0) {
- result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_error = status;
- return result;
+ if (status == AMQP_STATUS_TIMEOUT) {
+ amqp_socket_close(state->socket, AMQP_SC_FORCE);
+ }
+ return amqp_rpc_reply_error(status);
}
/*
@@ -1156,18 +1157,33 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_method_number_t request_id,
amqp_method_number_t *expected_reply_ids,
void *decoded_request_method) {
+ amqp_time_t deadline;
+ int res;
+
+ res = amqp_time_from_now(&deadline, state->rpc_timeout);
+ if (res != AMQP_STATUS_OK) {
+ return amqp_rpc_reply_error(res);
+ }
+
return simple_rpc_inner(state, channel, request_id, expected_reply_ids,
- decoded_request_method, amqp_time_infinite());
+ decoded_request_method, deadline);
}
-static void *simple_rpc_decoded_inner(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method,
- amqp_time_t deadline) {
+void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method) {
+ amqp_time_t deadline;
+ int res;
amqp_method_number_t replies[2];
+ res = amqp_time_from_now(&deadline, state->rpc_timeout);
+ if (res != AMQP_STATUS_OK) {
+ state->most_recent_api_result = amqp_rpc_reply_error(res);
+ return NULL;
+ }
+
replies[0] = reply_id;
replies[1] = 0;
@@ -1181,15 +1197,6 @@ static void *simple_rpc_decoded_inner(amqp_connection_state_t state,
}
}
-void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method) {
- return simple_rpc_decoded_inner(state, channel, request_id, reply_id,
- decoded_request_method, amqp_time_infinite());
-}
-
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
{
return state->most_recent_api_result;