From c745aca12166b086d58dc9c8ededa4896529e2d1 Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Mon, 13 Apr 2015 23:45:31 -0700 Subject: Refactor read/write poll function usage. --- librabbitmq/amqp_socket.c | 160 +++++++++++++++++++++------------------------- librabbitmq/amqp_socket.h | 6 ++ 2 files changed, 78 insertions(+), 88 deletions(-) diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index e9aa2a7..4b996f3 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -258,14 +258,17 @@ amqp_socket_get_sockfd(amqp_socket_t *self) return self->klass->get_sockfd(self); } -static int poll_for_write(int fd, uint64_t start, struct timeval *timeout) { +static int amqp_poll(int fd, short event, uint64_t start, struct timeval *timeout) { struct pollfd pfd; int res; int timeout_ms; + /* Function should only ever be called with one of these two */ + assert(event == POLLIN || event == POLLOUT); + start_poll: pfd.fd = fd; - pfd.events = POLLOUT; + pfd.events = event; if (timeout) { timeout_ms = @@ -277,7 +280,10 @@ start_poll: res = poll(&pfd, 1, timeout_ms); if (0 < res) { - return AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; + /* TODO: optimize this a bit by returning the AMQP_STATUS_SOCKET_ERROR or + * equivalent when pdf.revent is POLLHUP or POLLERR, so an extra syscall + * doesn't need to be made. */ + return AMQP_STATUS_OK; } else if (0 == res) { return AMQP_STATUS_TIMEOUT; } else { @@ -305,17 +311,27 @@ start_poll: return AMQP_STATUS_SOCKET_ERROR; } } + return AMQP_STATUS_OK; +} + +int amqp_poll_read(int fd, uint64_t start, struct timeval *timeout) { + return amqp_poll(fd, POLLIN, start, timeout); +} + +int amqp_poll_write(int fd, uint64_t start, struct timeval* timeout) { + return amqp_poll(fd, POLLOUT, start, timeout); } ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, int iovcnt) { int i; + int fd; ssize_t res; struct iovec *iov_left = iov; int iovcnt_left = iovcnt; /* TODO(alanxz) this should probably be a parameter */ struct timeval *timeout = NULL; - uint64_t start; + uint64_t start = 0; ssize_t len_left; if (timeout) { start = amqp_get_monotonic_timestamp(); @@ -350,19 +366,23 @@ start_send: } } goto start_send; - } else if (res != AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE) { - return res; } - { - int fd; - fd = amqp_get_sockfd(state); - if (-1 == fd) { - return AMQP_STATUS_SOCKET_CLOSED; - } - res = poll_for_write(fd, start, timeout); - if (AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE == res) { - goto start_send; - } + fd = amqp_get_sockfd(state); + if (-1 == fd) { + return AMQP_STATUS_SOCKET_CLOSED; + } + switch (res) { + default: + return res; + case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: + res = amqp_poll_read(fd, start, timeout); + break; + case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: + res = amqp_poll_write(fd, start, timeout); + break; + } + if (AMQP_STATUS_OK == res) { + goto start_send; } return res; } @@ -370,12 +390,13 @@ start_send: ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, size_t len) { ssize_t res; + int fd; void* buf_left = (void*)buf; /* Assume that len is going to be larger than ssize_t can hold. */ ssize_t len_left = (size_t)len; /* TODO(alanxz) this should probably be a parameter */ struct timeval *timeout = NULL; - uint64_t start; + uint64_t start = 0; if (timeout) { start = amqp_get_monotonic_timestamp(); if (0 == start) { @@ -393,19 +414,23 @@ start_send: return AMQP_STATUS_OK; } goto start_send; - } else if (res != AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE) { - return res; } - { - int fd; - fd = amqp_get_sockfd(state); - if (-1 == fd) { - return AMQP_STATUS_SOCKET_CLOSED; - } - res = poll_for_write(fd, start, timeout); - if (AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE == res) { - goto start_send; - } + fd = amqp_get_sockfd(state); + if (-1 == fd) { + return AMQP_STATUS_SOCKET_CLOSED; + } + switch (res) { + default: + return res; + case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: + res = amqp_poll_read(fd, start, timeout); + break; + case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: + res = amqp_poll_write(fd, start, timeout); + break; + } + if (AMQP_STATUS_OK == res) { + goto start_send; } return res; } @@ -734,6 +759,7 @@ static int consume_one_frame(amqp_connection_state_t state, amqp_frame_t *decode static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, struct timeval *timeout) { int res; + int fd; if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 || INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S + @@ -746,66 +772,24 @@ start_recv: state->sock_inbound_buffer.len, 0); if (res < 0) { - if (AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD == res) { - int fd; - - fd = amqp_get_sockfd(state); - if (-1 == fd) { - return AMQP_STATUS_CONNECTION_CLOSED; - } - - while (1) { - struct pollfd pfd; - int timeout_ms; - - pfd.fd = fd; - pfd.events = POLLIN; - pfd.revents = 0; - - if (timeout) { - timeout_ms = timeout->tv_sec * AMQP_MS_PER_S + - timeout->tv_usec / AMQP_US_PER_MS; - } else { - timeout_ms = -1; - } - - res = poll(&pfd, 1, timeout_ms); - - if (0 < res) { - goto start_recv; - } else if (0 == res) { - return AMQP_STATUS_TIMEOUT; - } else if (-1 == res) { - switch (amqp_os_socket_error()) { - case EINTR: - if (timeout) { - uint64_t end_timestamp; - uint64_t time_left; - uint64_t current_timestamp = amqp_get_monotonic_timestamp(); - if (0 == current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } - end_timestamp = start + - (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + - (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; - if (current_timestamp > end_timestamp) { - return AMQP_STATUS_TIMEOUT; - } - - time_left = end_timestamp - current_timestamp; - - timeout->tv_sec = time_left / AMQP_NS_PER_S; - timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_US; - } - continue; - default: - return AMQP_STATUS_SOCKET_ERROR; - } - } - } - } else { - return res; + fd = amqp_get_sockfd(state); + if (-1 == fd) { + return AMQP_STATUS_CONNECTION_CLOSED; } + switch (res) { + default: + return res; + case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: + res = amqp_poll_read(fd, start, timeout); + break; + case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: + res = amqp_poll_write(fd, start, timeout); + break; + } + if (AMQP_STATUS_OK == res) { + goto start_recv; + } + return res; } state->sock_inbound_limit = res; diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index a1c4040..41617a0 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -185,6 +185,12 @@ amqp_socket_delete(amqp_socket_t *self); int amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout); +int +amqp_poll_read(int fd, uint64_t start, struct timeval *timeout); + +int +amqp_poll_write(int fd, uint64_t start, struct timeval *timeout); + int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame); -- cgit v1.2.1