diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-14 14:54:02 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-14 15:53:02 -0700 |
commit | f8cfc728087a484dd71f6a5302a9b43123b0f03d (patch) | |
tree | 3fb17ea3e0a1050b478b460b2f01f1d7079b3b6c | |
parent | 2308069ccad42b892f10f7cb4d5dd68e890408c8 (diff) | |
download | rabbitmq-c-f8cfc728087a484dd71f6a5302a9b43123b0f03d.tar.gz |
Add amqp_simple_wait_frame_noblock() function
Add non-blocking variant of amqp_simple_wait_frame() to assist clients
in writing programs that want non-blocking behavior when consuming
messages from the broker.
-rw-r--r-- | librabbitmq/amqp.h | 10 | ||||
-rw-r--r-- | librabbitmq/amqp_api.c | 4 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 82 |
3 files changed, 92 insertions, 4 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 2020549..3c77a5d 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -152,6 +152,8 @@ typedef _W64 int ssize_t; #include <stddef.h> #include <stdint.h> +struct timeval; + AMQP_BEGIN_DECLS typedef int amqp_boolean_t; @@ -344,6 +346,8 @@ typedef enum amqp_status_enum_ AMQP_STATUS_INVALID_PARAMETER = -0x000A, AMQP_STATUS_TABLE_TOO_BIG = -0x000B, AMQP_STATUS_WRONG_METHOD = -0x000C, + AMQP_STATUS_TIMEOUT = -0x000D, + AMQP_STATUS_TIMER_FAILURE = -0x000E, AMQP_STATUS_TCP_ERROR = -0x0100, AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR = -0x0101, @@ -483,6 +487,12 @@ AMQP_CALL amqp_simple_wait_frame(amqp_connection_state_t state, AMQP_PUBLIC_FUNCTION int +AMQP_CALL amqp_simple_wait_frame_noblock(amqp_connection_state_t state, + amqp_frame_t *decoded_frame, + struct timeval *tv); + +AMQP_PUBLIC_FUNCTION +int AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state, amqp_channel_t expected_channel, amqp_method_number_t expected_method, diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 8e037e1..50666db 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -68,7 +68,9 @@ static const char *base_error_strings[] = { "a socket error occurred", /* AMQP_STATUS_SOCKET_ERROR -0x0009 */ "invalid parameter", /* AMQP_STATUS_INVALID_PARAMETER -0x000A */ "table too large for buffer", /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */ - "unexpected method received" /* AMQP_STATUS_WRONG_METHOD -0x000C */ + "unexpected method received", /* AMQP_STATUS_WRONG_METHOD -0x000C */ + "request timed out", /* AMQP_STATUS_TIMEOUT -0x000D */ + "system timer has failed" /* AMQP_STATUS_TIMER_FAILED -0x000E */ }; static const char *tcp_error_strings[] = { diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 5fd0aa9..d32849a 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -39,6 +39,7 @@ #endif #include "amqp_private.h" +#include "amqp_timer.h" #include <assert.h> #include <stdarg.h> @@ -349,8 +350,12 @@ amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) } static int wait_frame_inner(amqp_connection_state_t state, - amqp_frame_t *decoded_frame) + amqp_frame_t *decoded_frame, + struct timeval *timeout) { + uint64_t current_timestamp = 0; + uint64_t timeout_timestamp = 0; + while (1) { int res; @@ -375,6 +380,70 @@ static int wait_frame_inner(amqp_connection_state_t state, assert(res != 0); } + if (timeout) { + if (timeout->tv_sec < 0 || timeout->tv_usec < 0) { + return AMQP_STATUS_INVALID_PARAMETER; + } + while (1) { + int fd; + fd_set read_fd; + fd_set except_fd; + uint64_t ns_until_next_timeout; + struct timeval tv; + + fd = amqp_get_sockfd(state); + + FD_ZERO(&read_fd); + FD_SET(fd, &read_fd); + + FD_ZERO(&except_fd); + FD_SET(fd, &except_fd); + + if (0 == current_timestamp) { + current_timestamp = amqp_get_monotonic_timestamp(); + if (0 == current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + + timeout_timestamp = current_timestamp + + timeout->tv_sec * AMQP_NS_PER_S + + timeout->tv_usec * AMQP_NS_PER_US; + } else { + current_timestamp = amqp_get_monotonic_timestamp(); + if (0 == current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + } + + /* TODO: Heartbeat timeout goes here */ + + if (current_timestamp > timeout_timestamp) { + return AMQP_STATUS_TIMEOUT; + } + + ns_until_next_timeout = timeout_timestamp - current_timestamp; + + memset(&tv, 0, sizeof(struct timeval)); + tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S; + tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US; + + res = select(fd + 1, &read_fd, NULL, &except_fd, &tv); + + if (res > 0) { + /* socket is ready to be read from */ + break; + } else if (0 == res) { + /* Timed out - return */ + return AMQP_STATUS_TIMEOUT; + } else if (errno == EINTR) { + /* Try again */ + continue; + } else { + return AMQP_STATUS_SOCKET_ERROR; + } + } + } + res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, state->sock_inbound_buffer.len, 0); if (res < 0) { @@ -389,6 +458,13 @@ static int wait_frame_inner(amqp_connection_state_t state, int amqp_simple_wait_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { + return amqp_simple_wait_frame_noblock(state, decoded_frame, NULL); +} + +int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, + amqp_frame_t *decoded_frame, + struct timeval *timeout) +{ if (state->first_queued_frame != NULL) { amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data; state->first_queued_frame = state->first_queued_frame->next; @@ -398,7 +474,7 @@ int amqp_simple_wait_frame(amqp_connection_state_t state, *decoded_frame = *f; return AMQP_STATUS_OK; } else { - return wait_frame_inner(state, decoded_frame); + return wait_frame_inner(state, decoded_frame, timeout); } } @@ -471,7 +547,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_frame_t frame; retry: - status = wait_frame_inner(state, &frame); + status = wait_frame_inner(state, &frame, NULL); if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; result.library_error = status; |