summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-06-14 14:54:02 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-06-14 15:53:02 -0700
commitf8cfc728087a484dd71f6a5302a9b43123b0f03d (patch)
tree3fb17ea3e0a1050b478b460b2f01f1d7079b3b6c
parent2308069ccad42b892f10f7cb4d5dd68e890408c8 (diff)
downloadrabbitmq-c-f8cfc728087a484dd71f6a5302a9b43123b0f03d.tar.gz
Add amqp_simple_wait_frame_noblock() function
Add non-blocking variant of amqp_simple_wait_frame() to assist clients in writing programs that want non-blocking behavior when consuming messages from the broker.
-rw-r--r--librabbitmq/amqp.h10
-rw-r--r--librabbitmq/amqp_api.c4
-rw-r--r--librabbitmq/amqp_socket.c82
3 files changed, 92 insertions, 4 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 2020549..3c77a5d 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -152,6 +152,8 @@ typedef _W64 int ssize_t;
#include <stddef.h>
#include <stdint.h>
+struct timeval;
+
AMQP_BEGIN_DECLS
typedef int amqp_boolean_t;
@@ -344,6 +346,8 @@ typedef enum amqp_status_enum_
AMQP_STATUS_INVALID_PARAMETER = -0x000A,
AMQP_STATUS_TABLE_TOO_BIG = -0x000B,
AMQP_STATUS_WRONG_METHOD = -0x000C,
+ AMQP_STATUS_TIMEOUT = -0x000D,
+ AMQP_STATUS_TIMER_FAILURE = -0x000E,
AMQP_STATUS_TCP_ERROR = -0x0100,
AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR = -0x0101,
@@ -483,6 +487,12 @@ AMQP_CALL amqp_simple_wait_frame(amqp_connection_state_t state,
AMQP_PUBLIC_FUNCTION
int
+AMQP_CALL amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
+ amqp_frame_t *decoded_frame,
+ struct timeval *tv);
+
+AMQP_PUBLIC_FUNCTION
+int
AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state,
amqp_channel_t expected_channel,
amqp_method_number_t expected_method,
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 8e037e1..50666db 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -68,7 +68,9 @@ static const char *base_error_strings[] = {
"a socket error occurred", /* AMQP_STATUS_SOCKET_ERROR -0x0009 */
"invalid parameter", /* AMQP_STATUS_INVALID_PARAMETER -0x000A */
"table too large for buffer", /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */
- "unexpected method received" /* AMQP_STATUS_WRONG_METHOD -0x000C */
+ "unexpected method received", /* AMQP_STATUS_WRONG_METHOD -0x000C */
+ "request timed out", /* AMQP_STATUS_TIMEOUT -0x000D */
+ "system timer has failed" /* AMQP_STATUS_TIMER_FAILED -0x000E */
};
static const char *tcp_error_strings[] = {
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 5fd0aa9..d32849a 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -39,6 +39,7 @@
#endif
#include "amqp_private.h"
+#include "amqp_timer.h"
#include <assert.h>
#include <stdarg.h>
@@ -349,8 +350,12 @@ amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state)
}
static int wait_frame_inner(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame)
+ amqp_frame_t *decoded_frame,
+ struct timeval *timeout)
{
+ uint64_t current_timestamp = 0;
+ uint64_t timeout_timestamp = 0;
+
while (1) {
int res;
@@ -375,6 +380,70 @@ static int wait_frame_inner(amqp_connection_state_t state,
assert(res != 0);
}
+ if (timeout) {
+ if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+ while (1) {
+ int fd;
+ fd_set read_fd;
+ fd_set except_fd;
+ uint64_t ns_until_next_timeout;
+ struct timeval tv;
+
+ fd = amqp_get_sockfd(state);
+
+ FD_ZERO(&read_fd);
+ FD_SET(fd, &read_fd);
+
+ FD_ZERO(&except_fd);
+ FD_SET(fd, &except_fd);
+
+ if (0 == current_timestamp) {
+ current_timestamp = amqp_get_monotonic_timestamp();
+ if (0 == current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+
+ timeout_timestamp = current_timestamp +
+ timeout->tv_sec * AMQP_NS_PER_S +
+ timeout->tv_usec * AMQP_NS_PER_US;
+ } else {
+ current_timestamp = amqp_get_monotonic_timestamp();
+ if (0 == current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ }
+
+ /* TODO: Heartbeat timeout goes here */
+
+ if (current_timestamp > timeout_timestamp) {
+ return AMQP_STATUS_TIMEOUT;
+ }
+
+ ns_until_next_timeout = timeout_timestamp - current_timestamp;
+
+ memset(&tv, 0, sizeof(struct timeval));
+ tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S;
+ tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US;
+
+ res = select(fd + 1, &read_fd, NULL, &except_fd, &tv);
+
+ if (res > 0) {
+ /* socket is ready to be read from */
+ break;
+ } else if (0 == res) {
+ /* Timed out - return */
+ return AMQP_STATUS_TIMEOUT;
+ } else if (errno == EINTR) {
+ /* Try again */
+ continue;
+ } else {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+ }
+ }
+
res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
state->sock_inbound_buffer.len, 0);
if (res < 0) {
@@ -389,6 +458,13 @@ static int wait_frame_inner(amqp_connection_state_t state,
int amqp_simple_wait_frame(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{
+ return amqp_simple_wait_frame_noblock(state, decoded_frame, NULL);
+}
+
+int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
+ amqp_frame_t *decoded_frame,
+ struct timeval *timeout)
+{
if (state->first_queued_frame != NULL) {
amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data;
state->first_queued_frame = state->first_queued_frame->next;
@@ -398,7 +474,7 @@ int amqp_simple_wait_frame(amqp_connection_state_t state,
*decoded_frame = *f;
return AMQP_STATUS_OK;
} else {
- return wait_frame_inner(state, decoded_frame);
+ return wait_frame_inner(state, decoded_frame, timeout);
}
}
@@ -471,7 +547,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_frame_t frame;
retry:
- status = wait_frame_inner(state, &frame);
+ status = wait_frame_inner(state, &frame, NULL);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
result.library_error = status;