From 301e9f111c9b049772a0b6ca1d3b05bc5688531f Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Mon, 6 Apr 2015 23:44:27 -0700 Subject: Add support for non-blocking read in rabbitmq-c --- librabbitmq/amqp_socket.c | 118 +++++++++++++++++++++++----------------------- 1 file changed, 60 insertions(+), 58 deletions(-) diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 15c616a..52a9671 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -282,12 +282,6 @@ int amqp_open_socket_noblock(char const *hostname, AMQP_INIT_TIMER(timer); - if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 || - INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S + - (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) { - return AMQP_STATUS_INVALID_PARAMETER; - } - last_error = amqp_os_socket_init(); if (AMQP_STATUS_OK != last_error) { return last_error; @@ -589,71 +583,79 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru { int res; - if (timeout) { - int fd; + if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 || + INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S + + (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) { + return AMQP_STATUS_INVALID_PARAMETER; + } + +start_recv: + res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, + state->sock_inbound_buffer.len, 0); - fd = amqp_get_sockfd(state); - if (-1 == fd) { - return AMQP_STATUS_CONNECTION_CLOSED; - } + if (res < 0) { + if (AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD == res) { + int fd; - if (INT_MAX < (uint64_t)timeout->tv_sec * AMQP_MS_PER_S + - (uint64_t)timeout->tv_usec / AMQP_US_PER_MS) { - return AMQP_STATUS_INVALID_PARAMETER; - } + fd = amqp_get_sockfd(state); + if (-1 == fd) { + return AMQP_STATUS_CONNECTION_CLOSED; + } - while (1) { - struct pollfd pfd; - int timeout_ms; + while (1) { + struct pollfd pfd; + int timeout_ms; - pfd.fd = fd; - pfd.events = POLLIN; - pfd.revents = 0; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; - timeout_ms = timeout->tv_sec * AMQP_MS_PER_S + - timeout->tv_usec / AMQP_US_PER_MS; + if (timeout) { + timeout_ms = timeout->tv_sec * AMQP_MS_PER_S + + timeout->tv_usec / AMQP_US_PER_MS; + } else { + timeout_ms = -1; + } - res = poll(&pfd, 1, timeout_ms); + res = poll(&pfd, 1, timeout_ms); - if (0 < res) { - break; - } else if (0 == res) { - return AMQP_STATUS_TIMEOUT; - } else if (-1 == res) { - if (EINTR == errno) { - if (timeout) { - uint64_t end_timestamp; - uint64_t time_left; - uint64_t current_timestamp = amqp_get_monotonic_timestamp(); - if (0 == current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } - end_timestamp = start + - (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + - (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; - if (current_timestamp > end_timestamp) { - return AMQP_STATUS_TIMEOUT; - } - - time_left = end_timestamp - current_timestamp; - - timeout->tv_sec = time_left / AMQP_NS_PER_S; - timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_US; + if (0 < res) { + goto start_recv; + } else if (0 == res) { + return AMQP_STATUS_TIMEOUT; + } else if (-1 == res) { + switch (errno) { + case EINTR: + if (timeout) { + uint64_t end_timestamp; + uint64_t time_left; + uint64_t current_timestamp = amqp_get_monotonic_timestamp(); + if (0 == current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + end_timestamp = start + + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + + (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; + if (current_timestamp > end_timestamp) { + return AMQP_STATUS_TIMEOUT; + } + + time_left = end_timestamp - current_timestamp; + + timeout->tv_sec = time_left / AMQP_NS_PER_S; + timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_US; + } + continue; + default: + return AMQP_STATUS_SOCKET_ERROR; } - continue; } - return AMQP_STATUS_SOCKET_ERROR; } + } else { + return res; } } - res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len, 0); - - if (res < 0) { - return res; - } - state->sock_inbound_limit = res; state->sock_inbound_offset = 0; -- cgit v1.2.1