summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-04-06 23:44:27 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-04-19 22:09:32 -0700
commit301e9f111c9b049772a0b6ca1d3b05bc5688531f (patch)
treef24f0c62181a1255913149019ba1c310b20710f3
parent3b775c45f7ba6a3f8b54491f3535d54efe5f89fa (diff)
downloadrabbitmq-c-301e9f111c9b049772a0b6ca1d3b05bc5688531f.tar.gz
Add support for non-blocking read in rabbitmq-c
-rw-r--r--librabbitmq/amqp_socket.c118
1 files changed, 60 insertions, 58 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 15c616a..52a9671 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -282,12 +282,6 @@ int amqp_open_socket_noblock(char const *hostname,
AMQP_INIT_TIMER(timer);
- if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 ||
- INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
- (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) {
- return AMQP_STATUS_INVALID_PARAMETER;
- }
-
last_error = amqp_os_socket_init();
if (AMQP_STATUS_OK != last_error) {
return last_error;
@@ -589,71 +583,79 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru
{
int res;
- if (timeout) {
- int fd;
+ if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 ||
+ INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
+ (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+
+start_recv:
+ res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len, 0);
- fd = amqp_get_sockfd(state);
- if (-1 == fd) {
- return AMQP_STATUS_CONNECTION_CLOSED;
- }
+ if (res < 0) {
+ if (AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD == res) {
+ int fd;
- if (INT_MAX < (uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
- (uint64_t)timeout->tv_usec / AMQP_US_PER_MS) {
- return AMQP_STATUS_INVALID_PARAMETER;
- }
+ fd = amqp_get_sockfd(state);
+ if (-1 == fd) {
+ return AMQP_STATUS_CONNECTION_CLOSED;
+ }
- while (1) {
- struct pollfd pfd;
- int timeout_ms;
+ while (1) {
+ struct pollfd pfd;
+ int timeout_ms;
- pfd.fd = fd;
- pfd.events = POLLIN;
- pfd.revents = 0;
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
- timeout_ms = timeout->tv_sec * AMQP_MS_PER_S +
- timeout->tv_usec / AMQP_US_PER_MS;
+ if (timeout) {
+ timeout_ms = timeout->tv_sec * AMQP_MS_PER_S +
+ timeout->tv_usec / AMQP_US_PER_MS;
+ } else {
+ timeout_ms = -1;
+ }
- res = poll(&pfd, 1, timeout_ms);
+ res = poll(&pfd, 1, timeout_ms);
- if (0 < res) {
- break;
- } else if (0 == res) {
- return AMQP_STATUS_TIMEOUT;
- } else if (-1 == res) {
- if (EINTR == errno) {
- if (timeout) {
- uint64_t end_timestamp;
- uint64_t time_left;
- uint64_t current_timestamp = amqp_get_monotonic_timestamp();
- if (0 == current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
- end_timestamp = start +
- (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
- (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
- if (current_timestamp > end_timestamp) {
- return AMQP_STATUS_TIMEOUT;
- }
-
- time_left = end_timestamp - current_timestamp;
-
- timeout->tv_sec = time_left / AMQP_NS_PER_S;
- timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_US;
+ if (0 < res) {
+ goto start_recv;
+ } else if (0 == res) {
+ return AMQP_STATUS_TIMEOUT;
+ } else if (-1 == res) {
+ switch (errno) {
+ case EINTR:
+ if (timeout) {
+ uint64_t end_timestamp;
+ uint64_t time_left;
+ uint64_t current_timestamp = amqp_get_monotonic_timestamp();
+ if (0 == current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ end_timestamp = start +
+ (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
+ (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
+ if (current_timestamp > end_timestamp) {
+ return AMQP_STATUS_TIMEOUT;
+ }
+
+ time_left = end_timestamp - current_timestamp;
+
+ timeout->tv_sec = time_left / AMQP_NS_PER_S;
+ timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_US;
+ }
+ continue;
+ default:
+ return AMQP_STATUS_SOCKET_ERROR;
}
- continue;
}
- return AMQP_STATUS_SOCKET_ERROR;
}
+ } else {
+ return res;
}
}
- res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len, 0);
-
- if (res < 0) {
- return res;
- }
-
state->sock_inbound_limit = res;
state->sock_inbound_offset = 0;