summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-04-19 22:11:32 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-04-19 22:11:32 -0700
commit8d1d5ccf6244c6b290318ef92ec7307e32917464 (patch)
tree0a3ee0534fcd9e24b4171e4f4b683802af966d44
parentc745aca12166b086d58dc9c8ededa4896529e2d1 (diff)
downloadrabbitmq-c-8d1d5ccf6244c6b290318ef92ec7307e32917464.tar.gz
Simplify the timer/timeout logic.
Instead of passing around a start time and timeout interval, calculate this early and pass around the timeout value (as a implementation detail), then calculate against current timestamp. This simplifies the code, gives a more realistic 'timeout' for the function at the cost of potentially a few more timer pings. Conflicts: librabbitmq/amqp_socket.c
-rw-r--r--librabbitmq/amqp_api.c2
-rw-r--r--librabbitmq/amqp_private.h2
-rw-r--r--librabbitmq/amqp_socket.c186
-rw-r--r--librabbitmq/amqp_socket.h10
-rw-r--r--librabbitmq/amqp_timer.c88
-rw-r--r--librabbitmq/amqp_timer.h24
6 files changed, 146 insertions, 166 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 79e78c5..c5a999d 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -198,7 +198,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
if (current_timestamp > state->next_recv_heartbeat) {
- res = amqp_try_recv(state, current_timestamp);
+ res = amqp_try_recv(state);
if (AMQP_STATUS_TIMEOUT == res) {
return AMQP_STATUS_HEARTBEAT_TIMEOUT;
} else if (AMQP_STATUS_OK != res) {
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 50905e1..99c45ab 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -205,7 +205,7 @@ static inline uint64_t amqp_calc_next_recv_heartbeat(amqp_connection_state_t sta
return cur + ((uint64_t)state->heartbeat * 2 * AMQP_NS_PER_S);
}
-int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time);
+int amqp_try_recv(amqp_connection_state_t state);
static inline void *amqp_offset(void *data, size_t offset)
{
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 4b996f3..451b67f 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -258,7 +258,7 @@ amqp_socket_get_sockfd(amqp_socket_t *self)
return self->klass->get_sockfd(self);
}
-static int amqp_poll(int fd, short event, uint64_t start, struct timeval *timeout) {
+static int amqp_poll(int fd, short event, amqp_timer_t timeout) {
struct pollfd pfd;
int res;
int timeout_ms;
@@ -270,11 +270,9 @@ start_poll:
pfd.fd = fd;
pfd.events = event;
- if (timeout) {
- timeout_ms =
- timeout->tv_sec * AMQP_MS_PER_S + timeout->tv_usec / AMQP_US_PER_MS;
- } else {
- timeout_ms = -1;
+ timeout_ms = amqp_timer_ms_left(timeout);
+ if (-1 > timeout_ms) {
+ return timeout_ms;
}
res = poll(&pfd, 1, timeout_ms);
@@ -289,23 +287,6 @@ start_poll:
} else {
switch (amqp_os_socket_error()) {
case EINTR:
- if (timeout) {
- uint64_t end_timestamp;
- uint64_t time_left;
- uint64_t current_timestamp = amqp_get_monotonic_timestamp();
- if (0 == current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
- end_timestamp = start + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
- (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
- if (current_timestamp > end_timestamp) {
- return AMQP_STATUS_TIMEOUT;
- }
- time_left = end_timestamp - current_timestamp;
-
- timeout->tv_sec = time_left / AMQP_NS_PER_S;
- timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_S;
- }
goto start_poll;
default:
return AMQP_STATUS_SOCKET_ERROR;
@@ -314,12 +295,12 @@ start_poll:
return AMQP_STATUS_OK;
}
-int amqp_poll_read(int fd, uint64_t start, struct timeval *timeout) {
- return amqp_poll(fd, POLLIN, start, timeout);
+int amqp_poll_read(int fd, amqp_timer_t timeout) {
+ return amqp_poll(fd, POLLIN, timeout);
}
-int amqp_poll_write(int fd, uint64_t start, struct timeval* timeout) {
- return amqp_poll(fd, POLLOUT, start, timeout);
+int amqp_poll_write(int fd, amqp_timer_t timeout) {
+ return amqp_poll(fd, POLLOUT, timeout);
}
ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
@@ -329,15 +310,13 @@ ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
ssize_t res;
struct iovec *iov_left = iov;
int iovcnt_left = iovcnt;
- /* TODO(alanxz) this should probably be a parameter */
- struct timeval *timeout = NULL;
- uint64_t start = 0;
ssize_t len_left;
- if (timeout) {
- start = amqp_get_monotonic_timestamp();
- if (0 == start) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
+ /* TODO(alanxz) this should probably be a parameter */
+ amqp_timer_t timeout;
+
+ res = amqp_timer_start(&timeout, NULL);
+ if (AMQP_STATUS_OK != res) {
+ return res;
}
len_left = 0;
@@ -375,10 +354,10 @@ start_send:
default:
return res;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
- res = amqp_poll_read(fd, start, timeout);
+ res = amqp_poll_read(fd, timeout);
break;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
- res = amqp_poll_write(fd, start, timeout);
+ res = amqp_poll_write(fd, timeout);
break;
}
if (AMQP_STATUS_OK == res) {
@@ -395,13 +374,10 @@ ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
/* Assume that len is going to be larger than ssize_t can hold. */
ssize_t len_left = (size_t)len;
/* TODO(alanxz) this should probably be a parameter */
- struct timeval *timeout = NULL;
- uint64_t start = 0;
- if (timeout) {
- start = amqp_get_monotonic_timestamp();
- if (0 == start) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
+ amqp_timer_t timeout;
+ res = amqp_timer_start(&timeout, NULL);
+ if (AMQP_STATUS_OK != res) {
+ return res;
}
start_send:
@@ -423,10 +399,10 @@ start_send:
default:
return res;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
- res = amqp_poll_read(fd, start, timeout);
+ res = amqp_poll_read(fd, timeout);
break;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
- res = amqp_poll_write(fd, start, timeout);
+ res = amqp_poll_write(fd, timeout);
break;
}
if (AMQP_STATUS_OK == res) {
@@ -439,13 +415,25 @@ int
amqp_open_socket(char const *hostname,
int portnumber)
{
- return amqp_open_socket_noblock(hostname, portnumber, NULL);
+ return amqp_open_socket_inner(hostname, portnumber,
+ amqp_timer_start_infinite());
}
int amqp_open_socket_noblock(char const *hostname,
int portnumber,
struct timeval *timeout)
{
+ amqp_timer_t timer;
+ int res = amqp_timer_start(&timer, timeout);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+ return amqp_open_socket_inner(hostname, portnumber, timer);
+}
+
+int amqp_open_socket_inner(char const *hostname,
+ int portnumber,
+ amqp_timer_t timer) {
struct addrinfo hint;
struct addrinfo *address_list;
struct addrinfo *addr;
@@ -454,10 +442,6 @@ int amqp_open_socket_noblock(char const *hostname,
int last_error = AMQP_STATUS_OK;
int one = 1; /* for setsockopt */
int res;
- int timer_error;
- amqp_timer_t timer;
-
- AMQP_INIT_TIMER(timer);
last_error = amqp_os_socket_init();
if (AMQP_STATUS_OK != last_error) {
@@ -514,62 +498,18 @@ int amqp_open_socket_noblock(char const *hostname,
}
if (EINPROGRESS == amqp_os_socket_error()) {
- while (1) {
- struct pollfd pfd;
- int timeout_ms;
+ last_error = amqp_poll_write(sockfd, timer);
+ if (AMQP_STATUS_OK == last_error) {
+ int result;
+ socklen_t result_len = sizeof(result);
- pfd.fd = sockfd;
- pfd.events = POLLOUT;
- pfd.revents = 0;
-
- if (timeout) {
- timer_error = amqp_timer_update(&timer, timeout);
- if (timer_error < 0) {
- last_error = timer_error;
- break;
- }
-
- timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S +
- timer.tv.tv_usec / AMQP_US_PER_MS;
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) <
+ 0 || result != 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
} else {
- timeout_ms = -1;
- }
- /* Win32 requires except_fds to be passed to detect connection
- * failure. Other platforms only need write_fds, passing except_fds
- * seems to be harmless otherwise
- */
- res = poll(&pfd, 1, timeout_ms);
-
- if (res > 0) {
- int result;
- socklen_t result_len = sizeof(result);
-
- if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) <
- 0) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- break;
- }
-
- if (result != 0) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- break;
- }
-
last_error = AMQP_STATUS_OK;
- break;
- } else if (0 == res) {
- /* Timed out - return */
- last_error = AMQP_STATUS_TIMEOUT;
- break;
- } else if (errno == EINTR) {
- /* Try again */
- continue;
- } else {
- /* Error connecting */
- last_error = AMQP_STATUS_SOCKET_ERROR;
- break;
}
- } /* end while(1) loop */
+ }
if (last_error == AMQP_STATUS_OK || last_error == AMQP_STATUS_TIMEOUT ||
last_error == AMQP_STATUS_TIMER_FAILURE) {
@@ -756,17 +696,10 @@ static int consume_one_frame(amqp_connection_state_t state, amqp_frame_t *decode
}
-static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, struct timeval *timeout)
-{
+static int recv_with_timeout(amqp_connection_state_t state, amqp_timer_t timeout) {
int res;
int fd;
- if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 ||
- INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
- (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) {
- return AMQP_STATUS_INVALID_PARAMETER;
- }
-
start_recv:
res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
state->sock_inbound_buffer.len, 0);
@@ -780,10 +713,10 @@ start_recv:
default:
return res;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
- res = amqp_poll_read(fd, start, timeout);
+ res = amqp_poll_read(fd, timeout);
break;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
- res = amqp_poll_write(fd, start, timeout);
+ res = amqp_poll_write(fd, timeout);
break;
}
if (AMQP_STATUS_OK == res) {
@@ -806,9 +739,8 @@ start_recv:
return AMQP_STATUS_OK;
}
-int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time)
-{
- struct timeval tv;
+int amqp_try_recv(amqp_connection_state_t state) {
+ amqp_timer_t timeout;
while (amqp_data_in_buffer(state)) {
amqp_frame_t frame;
@@ -848,12 +780,9 @@ int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time)
state->last_queued_frame = link;
}
}
+ timeout = amqp_timer_start_immediate();
- memset(&tv, 0, sizeof(struct timeval));
- tv.tv_sec = 0;
- tv.tv_usec = 0;
-
- return recv_with_timeout(state, current_time, &tv);
+ return recv_with_timeout(state, timeout);
}
static int wait_frame_inner(amqp_connection_state_t state,
@@ -864,7 +793,7 @@ static int wait_frame_inner(amqp_connection_state_t state,
uint64_t timeout_timestamp = 0;
uint64_t next_timestamp = 0;
struct timeval tv;
- struct timeval *tvp = NULL;
+ amqp_timer_t timer;
if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
return AMQP_STATUS_INVALID_PARAMETER;
@@ -928,7 +857,6 @@ beginrecv:
}
}
-
if (amqp_heartbeat_enabled(state)) {
if (current_timestamp > state->next_recv_heartbeat) {
state->next_recv_heartbeat = current_timestamp;
@@ -952,10 +880,18 @@ beginrecv:
tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S;
tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US;
- tvp = &tv;
+ /* TODO: refactor the above so that this doesn't require a timer ping */
+ res = amqp_timer_start(&timer, &tv);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+ } else {
+ timer = amqp_timer_start_infinite();
}
- res = recv_with_timeout(state, current_timestamp, tvp);
+ /* TODO this needs to wait for a _frame_ and not anything written from the
+ * socket */
+ res = recv_with_timeout(state, timer);
if (AMQP_STATUS_TIMEOUT == res) {
if (next_timestamp == state->next_recv_heartbeat) {
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index 41617a0..82ad39b 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -33,6 +33,7 @@
#define AMQP_SOCKET_H
#include "amqp_private.h"
+#include "amqp_timer.h"
AMQP_BEGIN_DECLS
@@ -185,11 +186,12 @@ amqp_socket_delete(amqp_socket_t *self);
int
amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout);
-int
-amqp_poll_read(int fd, uint64_t start, struct timeval *timeout);
+int amqp_open_socket_inner(char const *hostname, int portnumber,
+ amqp_timer_t timeout);
-int
-amqp_poll_write(int fd, uint64_t start, struct timeval *timeout);
+int amqp_poll_read(int fd, amqp_timer_t timeout);
+
+int amqp_poll_write(int fd, amqp_timer_t timeout);
int
amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame);
diff --git a/librabbitmq/amqp_timer.c b/librabbitmq/amqp_timer.c
index 46410dc..c4722bb 100644
--- a/librabbitmq/amqp_timer.c
+++ b/librabbitmq/amqp_timer.c
@@ -23,6 +23,8 @@
*/
#include "amqp.h"
#include "amqp_timer.h"
+#include <assert.h>
+#include <limits.h>
#include <string.h>
#if (defined(_WIN32) || defined(__WIN32__) || defined(WIN32))
@@ -104,37 +106,77 @@ amqp_get_monotonic_timestamp(void)
}
#endif /* AMQP_POSIX_TIMER_API */
-int
-amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout)
-{
- if (0 == timer->current_timestamp) {
- timer->current_timestamp = amqp_get_monotonic_timestamp();
+int amqp_timer_start(amqp_timer_t *timer, struct timeval *timeout) {
+ uint64_t now_ns;
+ uint64_t delta_ns;
- if (0 == timer->current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
+ assert(NULL != timer);
- timer->timeout_timestamp = timer->current_timestamp +
- (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
- (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
+ if (NULL == timeout) {
+ *timer = amqp_timer_start_infinite();
+ return AMQP_STATUS_OK;
+ }
+ if (0 == timeout->tv_sec && 0 == timeout->tv_usec) {
+ *timer = amqp_timer_start_immediate();
+ return AMQP_STATUS_OK;
+ }
- } else {
- timer->current_timestamp = amqp_get_monotonic_timestamp();
+ if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
- if (0 == timer->current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
+ delta_ns = (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
+ (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
+
+ now_ns = amqp_get_monotonic_timestamp();
+ if (0 == now_ns) {
+ return AMQP_STATUS_TIMER_FAILURE;
}
- if (timer->current_timestamp > timer->timeout_timestamp) {
- return AMQP_STATUS_TIMEOUT;
+ timer->expiration_ns = now_ns + delta_ns;
+ if (now_ns > timer->expiration_ns ||
+ delta_ns > timer->expiration_ns) {
+ return AMQP_STATUS_INVALID_PARAMETER;
}
- timer->ns_until_next_timeout = timer->timeout_timestamp - timer->current_timestamp;
+ return AMQP_STATUS_OK;
+}
- memset(&timer->tv, 0, sizeof(struct timeval));
- timer->tv.tv_sec = timer->ns_until_next_timeout / AMQP_NS_PER_S;
- timer->tv.tv_usec = (timer->ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US;
+amqp_timer_t amqp_timer_start_immediate(void) {
+ amqp_timer_t timer;
+ timer.expiration_ns = 0;
+ return timer;
+}
- return AMQP_STATUS_OK;
+amqp_timer_t amqp_timer_start_infinite(void) {
+ amqp_timer_t timer;
+ timer.expiration_ns = UINT64_MAX;
+ return timer;
+}
+
+int amqp_timer_ms_left(amqp_timer_t timer) {
+ uint64_t now_ns;
+ uint64_t delta_ns;
+ int left_ms;
+
+ if (UINT64_MAX == timer.expiration_ns) {
+ return -1;
+ }
+ if (0 == timer.expiration_ns) {
+ return 0;
+ }
+
+ now_ns = amqp_get_monotonic_timestamp();
+ if (0 == now_ns) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+
+ if (now_ns >= timer.expiration_ns) {
+ return 0;
+ }
+
+ delta_ns = timer.expiration_ns - now_ns;
+ left_ms = delta_ns / AMQP_NS_PER_MS;
+
+ return left_ms;
}
diff --git a/librabbitmq/amqp_timer.h b/librabbitmq/amqp_timer.h
index 3936922..195f70b 100644
--- a/librabbitmq/amqp_timer.h
+++ b/librabbitmq/amqp_timer.h
@@ -44,24 +44,24 @@
#define AMQP_NS_PER_MS 1000000
#define AMQP_NS_PER_US 1000
-#define AMQP_INIT_TIMER(structure) { \
- structure.current_timestamp = 0; \
- structure.timeout_timestamp = 0; \
-}
-
typedef struct amqp_timer_t_ {
- uint64_t current_timestamp;
- uint64_t timeout_timestamp;
- uint64_t ns_until_next_timeout;
- struct timeval tv;
+ uint64_t expiration_ns;
} amqp_timer_t;
/* Gets a monotonic timestamp in ns */
uint64_t
amqp_get_monotonic_timestamp(void);
-/* Prepare timeout value and modify timer state based on timer state. */
-int
-amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout);
+/* Start an amqp_timer_t set to expire timeout from now. */
+int amqp_timer_start(amqp_timer_t *timer, struct timeval *timeout);
+
+amqp_timer_t amqp_timer_start_immediate(void);
+
+amqp_timer_t amqp_timer_start_infinite(void);
+
+/* Get the (positive) number of ms left on the timer, or -1 for an infinite
+ * timer, or AMQP_STATUS_TIMEOUT on time out, or AMQP_STATUS_TIMER_FAILURE on
+ * failure. */
+int amqp_timer_ms_left(amqp_timer_t timer);
#endif /* AMQP_TIMER_H */