diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2015-04-19 22:11:32 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2015-04-19 22:11:32 -0700 |
commit | 8d1d5ccf6244c6b290318ef92ec7307e32917464 (patch) | |
tree | 0a3ee0534fcd9e24b4171e4f4b683802af966d44 | |
parent | c745aca12166b086d58dc9c8ededa4896529e2d1 (diff) | |
download | rabbitmq-c-8d1d5ccf6244c6b290318ef92ec7307e32917464.tar.gz |
Simplify the timer/timeout logic.
Instead of passing around a start time and timeout interval, calculate this
early and pass around the timeout value (as a implementation detail), then
calculate against current timestamp. This simplifies the code, gives a more
realistic 'timeout' for the function at the cost of potentially a few more timer
pings.
Conflicts:
librabbitmq/amqp_socket.c
-rw-r--r-- | librabbitmq/amqp_api.c | 2 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 2 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 186 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 10 | ||||
-rw-r--r-- | librabbitmq/amqp_timer.c | 88 | ||||
-rw-r--r-- | librabbitmq/amqp_timer.h | 24 |
6 files changed, 146 insertions, 166 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 79e78c5..c5a999d 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -198,7 +198,7 @@ int amqp_basic_publish(amqp_connection_state_t state, } if (current_timestamp > state->next_recv_heartbeat) { - res = amqp_try_recv(state, current_timestamp); + res = amqp_try_recv(state); if (AMQP_STATUS_TIMEOUT == res) { return AMQP_STATUS_HEARTBEAT_TIMEOUT; } else if (AMQP_STATUS_OK != res) { diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 50905e1..99c45ab 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -205,7 +205,7 @@ static inline uint64_t amqp_calc_next_recv_heartbeat(amqp_connection_state_t sta return cur + ((uint64_t)state->heartbeat * 2 * AMQP_NS_PER_S); } -int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time); +int amqp_try_recv(amqp_connection_state_t state); static inline void *amqp_offset(void *data, size_t offset) { diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 4b996f3..451b67f 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -258,7 +258,7 @@ amqp_socket_get_sockfd(amqp_socket_t *self) return self->klass->get_sockfd(self); } -static int amqp_poll(int fd, short event, uint64_t start, struct timeval *timeout) { +static int amqp_poll(int fd, short event, amqp_timer_t timeout) { struct pollfd pfd; int res; int timeout_ms; @@ -270,11 +270,9 @@ start_poll: pfd.fd = fd; pfd.events = event; - if (timeout) { - timeout_ms = - timeout->tv_sec * AMQP_MS_PER_S + timeout->tv_usec / AMQP_US_PER_MS; - } else { - timeout_ms = -1; + timeout_ms = amqp_timer_ms_left(timeout); + if (-1 > timeout_ms) { + return timeout_ms; } res = poll(&pfd, 1, timeout_ms); @@ -289,23 +287,6 @@ start_poll: } else { switch (amqp_os_socket_error()) { case EINTR: - if (timeout) { - uint64_t end_timestamp; - uint64_t time_left; - uint64_t current_timestamp = amqp_get_monotonic_timestamp(); - if (0 == current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } - end_timestamp = start + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + - (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; - if (current_timestamp > end_timestamp) { - return AMQP_STATUS_TIMEOUT; - } - time_left = end_timestamp - current_timestamp; - - timeout->tv_sec = time_left / AMQP_NS_PER_S; - timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_S; - } goto start_poll; default: return AMQP_STATUS_SOCKET_ERROR; @@ -314,12 +295,12 @@ start_poll: return AMQP_STATUS_OK; } -int amqp_poll_read(int fd, uint64_t start, struct timeval *timeout) { - return amqp_poll(fd, POLLIN, start, timeout); +int amqp_poll_read(int fd, amqp_timer_t timeout) { + return amqp_poll(fd, POLLIN, timeout); } -int amqp_poll_write(int fd, uint64_t start, struct timeval* timeout) { - return amqp_poll(fd, POLLOUT, start, timeout); +int amqp_poll_write(int fd, amqp_timer_t timeout) { + return amqp_poll(fd, POLLOUT, timeout); } ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, @@ -329,15 +310,13 @@ ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, ssize_t res; struct iovec *iov_left = iov; int iovcnt_left = iovcnt; - /* TODO(alanxz) this should probably be a parameter */ - struct timeval *timeout = NULL; - uint64_t start = 0; ssize_t len_left; - if (timeout) { - start = amqp_get_monotonic_timestamp(); - if (0 == start) { - return AMQP_STATUS_TIMER_FAILURE; - } + /* TODO(alanxz) this should probably be a parameter */ + amqp_timer_t timeout; + + res = amqp_timer_start(&timeout, NULL); + if (AMQP_STATUS_OK != res) { + return res; } len_left = 0; @@ -375,10 +354,10 @@ start_send: default: return res; case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: - res = amqp_poll_read(fd, start, timeout); + res = amqp_poll_read(fd, timeout); break; case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: - res = amqp_poll_write(fd, start, timeout); + res = amqp_poll_write(fd, timeout); break; } if (AMQP_STATUS_OK == res) { @@ -395,13 +374,10 @@ ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, /* Assume that len is going to be larger than ssize_t can hold. */ ssize_t len_left = (size_t)len; /* TODO(alanxz) this should probably be a parameter */ - struct timeval *timeout = NULL; - uint64_t start = 0; - if (timeout) { - start = amqp_get_monotonic_timestamp(); - if (0 == start) { - return AMQP_STATUS_TIMER_FAILURE; - } + amqp_timer_t timeout; + res = amqp_timer_start(&timeout, NULL); + if (AMQP_STATUS_OK != res) { + return res; } start_send: @@ -423,10 +399,10 @@ start_send: default: return res; case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: - res = amqp_poll_read(fd, start, timeout); + res = amqp_poll_read(fd, timeout); break; case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: - res = amqp_poll_write(fd, start, timeout); + res = amqp_poll_write(fd, timeout); break; } if (AMQP_STATUS_OK == res) { @@ -439,13 +415,25 @@ int amqp_open_socket(char const *hostname, int portnumber) { - return amqp_open_socket_noblock(hostname, portnumber, NULL); + return amqp_open_socket_inner(hostname, portnumber, + amqp_timer_start_infinite()); } int amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout) { + amqp_timer_t timer; + int res = amqp_timer_start(&timer, timeout); + if (AMQP_STATUS_OK != res) { + return res; + } + return amqp_open_socket_inner(hostname, portnumber, timer); +} + +int amqp_open_socket_inner(char const *hostname, + int portnumber, + amqp_timer_t timer) { struct addrinfo hint; struct addrinfo *address_list; struct addrinfo *addr; @@ -454,10 +442,6 @@ int amqp_open_socket_noblock(char const *hostname, int last_error = AMQP_STATUS_OK; int one = 1; /* for setsockopt */ int res; - int timer_error; - amqp_timer_t timer; - - AMQP_INIT_TIMER(timer); last_error = amqp_os_socket_init(); if (AMQP_STATUS_OK != last_error) { @@ -514,62 +498,18 @@ int amqp_open_socket_noblock(char const *hostname, } if (EINPROGRESS == amqp_os_socket_error()) { - while (1) { - struct pollfd pfd; - int timeout_ms; + last_error = amqp_poll_write(sockfd, timer); + if (AMQP_STATUS_OK == last_error) { + int result; + socklen_t result_len = sizeof(result); - pfd.fd = sockfd; - pfd.events = POLLOUT; - pfd.revents = 0; - - if (timeout) { - timer_error = amqp_timer_update(&timer, timeout); - if (timer_error < 0) { - last_error = timer_error; - break; - } - - timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S + - timer.tv.tv_usec / AMQP_US_PER_MS; + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < + 0 || result != 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; } else { - timeout_ms = -1; - } - /* Win32 requires except_fds to be passed to detect connection - * failure. Other platforms only need write_fds, passing except_fds - * seems to be harmless otherwise - */ - res = poll(&pfd, 1, timeout_ms); - - if (res > 0) { - int result; - socklen_t result_len = sizeof(result); - - if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < - 0) { - last_error = AMQP_STATUS_SOCKET_ERROR; - break; - } - - if (result != 0) { - last_error = AMQP_STATUS_SOCKET_ERROR; - break; - } - last_error = AMQP_STATUS_OK; - break; - } else if (0 == res) { - /* Timed out - return */ - last_error = AMQP_STATUS_TIMEOUT; - break; - } else if (errno == EINTR) { - /* Try again */ - continue; - } else { - /* Error connecting */ - last_error = AMQP_STATUS_SOCKET_ERROR; - break; } - } /* end while(1) loop */ + } if (last_error == AMQP_STATUS_OK || last_error == AMQP_STATUS_TIMEOUT || last_error == AMQP_STATUS_TIMER_FAILURE) { @@ -756,17 +696,10 @@ static int consume_one_frame(amqp_connection_state_t state, amqp_frame_t *decode } -static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, struct timeval *timeout) -{ +static int recv_with_timeout(amqp_connection_state_t state, amqp_timer_t timeout) { int res; int fd; - if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 || - INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S + - (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) { - return AMQP_STATUS_INVALID_PARAMETER; - } - start_recv: res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, state->sock_inbound_buffer.len, 0); @@ -780,10 +713,10 @@ start_recv: default: return res; case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: - res = amqp_poll_read(fd, start, timeout); + res = amqp_poll_read(fd, timeout); break; case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: - res = amqp_poll_write(fd, start, timeout); + res = amqp_poll_write(fd, timeout); break; } if (AMQP_STATUS_OK == res) { @@ -806,9 +739,8 @@ start_recv: return AMQP_STATUS_OK; } -int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time) -{ - struct timeval tv; +int amqp_try_recv(amqp_connection_state_t state) { + amqp_timer_t timeout; while (amqp_data_in_buffer(state)) { amqp_frame_t frame; @@ -848,12 +780,9 @@ int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time) state->last_queued_frame = link; } } + timeout = amqp_timer_start_immediate(); - memset(&tv, 0, sizeof(struct timeval)); - tv.tv_sec = 0; - tv.tv_usec = 0; - - return recv_with_timeout(state, current_time, &tv); + return recv_with_timeout(state, timeout); } static int wait_frame_inner(amqp_connection_state_t state, @@ -864,7 +793,7 @@ static int wait_frame_inner(amqp_connection_state_t state, uint64_t timeout_timestamp = 0; uint64_t next_timestamp = 0; struct timeval tv; - struct timeval *tvp = NULL; + amqp_timer_t timer; if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { return AMQP_STATUS_INVALID_PARAMETER; @@ -928,7 +857,6 @@ beginrecv: } } - if (amqp_heartbeat_enabled(state)) { if (current_timestamp > state->next_recv_heartbeat) { state->next_recv_heartbeat = current_timestamp; @@ -952,10 +880,18 @@ beginrecv: tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S; tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US; - tvp = &tv; + /* TODO: refactor the above so that this doesn't require a timer ping */ + res = amqp_timer_start(&timer, &tv); + if (AMQP_STATUS_OK != res) { + return res; + } + } else { + timer = amqp_timer_start_infinite(); } - res = recv_with_timeout(state, current_timestamp, tvp); + /* TODO this needs to wait for a _frame_ and not anything written from the + * socket */ + res = recv_with_timeout(state, timer); if (AMQP_STATUS_TIMEOUT == res) { if (next_timestamp == state->next_recv_heartbeat) { diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index 41617a0..82ad39b 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -33,6 +33,7 @@ #define AMQP_SOCKET_H #include "amqp_private.h" +#include "amqp_timer.h" AMQP_BEGIN_DECLS @@ -185,11 +186,12 @@ amqp_socket_delete(amqp_socket_t *self); int amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout); -int -amqp_poll_read(int fd, uint64_t start, struct timeval *timeout); +int amqp_open_socket_inner(char const *hostname, int portnumber, + amqp_timer_t timeout); -int -amqp_poll_write(int fd, uint64_t start, struct timeval *timeout); +int amqp_poll_read(int fd, amqp_timer_t timeout); + +int amqp_poll_write(int fd, amqp_timer_t timeout); int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame); diff --git a/librabbitmq/amqp_timer.c b/librabbitmq/amqp_timer.c index 46410dc..c4722bb 100644 --- a/librabbitmq/amqp_timer.c +++ b/librabbitmq/amqp_timer.c @@ -23,6 +23,8 @@ */ #include "amqp.h" #include "amqp_timer.h" +#include <assert.h> +#include <limits.h> #include <string.h> #if (defined(_WIN32) || defined(__WIN32__) || defined(WIN32)) @@ -104,37 +106,77 @@ amqp_get_monotonic_timestamp(void) } #endif /* AMQP_POSIX_TIMER_API */ -int -amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout) -{ - if (0 == timer->current_timestamp) { - timer->current_timestamp = amqp_get_monotonic_timestamp(); +int amqp_timer_start(amqp_timer_t *timer, struct timeval *timeout) { + uint64_t now_ns; + uint64_t delta_ns; - if (0 == timer->current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } + assert(NULL != timer); - timer->timeout_timestamp = timer->current_timestamp + - (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + - (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; + if (NULL == timeout) { + *timer = amqp_timer_start_infinite(); + return AMQP_STATUS_OK; + } + if (0 == timeout->tv_sec && 0 == timeout->tv_usec) { + *timer = amqp_timer_start_immediate(); + return AMQP_STATUS_OK; + } - } else { - timer->current_timestamp = amqp_get_monotonic_timestamp(); + if (timeout->tv_sec < 0 || timeout->tv_usec < 0) { + return AMQP_STATUS_INVALID_PARAMETER; + } - if (0 == timer->current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } + delta_ns = (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + + (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; + + now_ns = amqp_get_monotonic_timestamp(); + if (0 == now_ns) { + return AMQP_STATUS_TIMER_FAILURE; } - if (timer->current_timestamp > timer->timeout_timestamp) { - return AMQP_STATUS_TIMEOUT; + timer->expiration_ns = now_ns + delta_ns; + if (now_ns > timer->expiration_ns || + delta_ns > timer->expiration_ns) { + return AMQP_STATUS_INVALID_PARAMETER; } - timer->ns_until_next_timeout = timer->timeout_timestamp - timer->current_timestamp; + return AMQP_STATUS_OK; +} - memset(&timer->tv, 0, sizeof(struct timeval)); - timer->tv.tv_sec = timer->ns_until_next_timeout / AMQP_NS_PER_S; - timer->tv.tv_usec = (timer->ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US; +amqp_timer_t amqp_timer_start_immediate(void) { + amqp_timer_t timer; + timer.expiration_ns = 0; + return timer; +} - return AMQP_STATUS_OK; +amqp_timer_t amqp_timer_start_infinite(void) { + amqp_timer_t timer; + timer.expiration_ns = UINT64_MAX; + return timer; +} + +int amqp_timer_ms_left(amqp_timer_t timer) { + uint64_t now_ns; + uint64_t delta_ns; + int left_ms; + + if (UINT64_MAX == timer.expiration_ns) { + return -1; + } + if (0 == timer.expiration_ns) { + return 0; + } + + now_ns = amqp_get_monotonic_timestamp(); + if (0 == now_ns) { + return AMQP_STATUS_TIMER_FAILURE; + } + + if (now_ns >= timer.expiration_ns) { + return 0; + } + + delta_ns = timer.expiration_ns - now_ns; + left_ms = delta_ns / AMQP_NS_PER_MS; + + return left_ms; } diff --git a/librabbitmq/amqp_timer.h b/librabbitmq/amqp_timer.h index 3936922..195f70b 100644 --- a/librabbitmq/amqp_timer.h +++ b/librabbitmq/amqp_timer.h @@ -44,24 +44,24 @@ #define AMQP_NS_PER_MS 1000000 #define AMQP_NS_PER_US 1000 -#define AMQP_INIT_TIMER(structure) { \ - structure.current_timestamp = 0; \ - structure.timeout_timestamp = 0; \ -} - typedef struct amqp_timer_t_ { - uint64_t current_timestamp; - uint64_t timeout_timestamp; - uint64_t ns_until_next_timeout; - struct timeval tv; + uint64_t expiration_ns; } amqp_timer_t; /* Gets a monotonic timestamp in ns */ uint64_t amqp_get_monotonic_timestamp(void); -/* Prepare timeout value and modify timer state based on timer state. */ -int -amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout); +/* Start an amqp_timer_t set to expire timeout from now. */ +int amqp_timer_start(amqp_timer_t *timer, struct timeval *timeout); + +amqp_timer_t amqp_timer_start_immediate(void); + +amqp_timer_t amqp_timer_start_infinite(void); + +/* Get the (positive) number of ms left on the timer, or -1 for an infinite + * timer, or AMQP_STATUS_TIMEOUT on time out, or AMQP_STATUS_TIMER_FAILURE on + * failure. */ +int amqp_timer_ms_left(amqp_timer_t timer); #endif /* AMQP_TIMER_H */ |