summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c186
1 files changed, 61 insertions, 125 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 4b996f3..451b67f 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -258,7 +258,7 @@ amqp_socket_get_sockfd(amqp_socket_t *self)
return self->klass->get_sockfd(self);
}
-static int amqp_poll(int fd, short event, uint64_t start, struct timeval *timeout) {
+static int amqp_poll(int fd, short event, amqp_timer_t timeout) {
struct pollfd pfd;
int res;
int timeout_ms;
@@ -270,11 +270,9 @@ start_poll:
pfd.fd = fd;
pfd.events = event;
- if (timeout) {
- timeout_ms =
- timeout->tv_sec * AMQP_MS_PER_S + timeout->tv_usec / AMQP_US_PER_MS;
- } else {
- timeout_ms = -1;
+ timeout_ms = amqp_timer_ms_left(timeout);
+ if (-1 > timeout_ms) {
+ return timeout_ms;
}
res = poll(&pfd, 1, timeout_ms);
@@ -289,23 +287,6 @@ start_poll:
} else {
switch (amqp_os_socket_error()) {
case EINTR:
- if (timeout) {
- uint64_t end_timestamp;
- uint64_t time_left;
- uint64_t current_timestamp = amqp_get_monotonic_timestamp();
- if (0 == current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
- end_timestamp = start + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
- (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
- if (current_timestamp > end_timestamp) {
- return AMQP_STATUS_TIMEOUT;
- }
- time_left = end_timestamp - current_timestamp;
-
- timeout->tv_sec = time_left / AMQP_NS_PER_S;
- timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_S;
- }
goto start_poll;
default:
return AMQP_STATUS_SOCKET_ERROR;
@@ -314,12 +295,12 @@ start_poll:
return AMQP_STATUS_OK;
}
-int amqp_poll_read(int fd, uint64_t start, struct timeval *timeout) {
- return amqp_poll(fd, POLLIN, start, timeout);
+int amqp_poll_read(int fd, amqp_timer_t timeout) {
+ return amqp_poll(fd, POLLIN, timeout);
}
-int amqp_poll_write(int fd, uint64_t start, struct timeval* timeout) {
- return amqp_poll(fd, POLLOUT, start, timeout);
+int amqp_poll_write(int fd, amqp_timer_t timeout) {
+ return amqp_poll(fd, POLLOUT, timeout);
}
ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
@@ -329,15 +310,13 @@ ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
ssize_t res;
struct iovec *iov_left = iov;
int iovcnt_left = iovcnt;
- /* TODO(alanxz) this should probably be a parameter */
- struct timeval *timeout = NULL;
- uint64_t start = 0;
ssize_t len_left;
- if (timeout) {
- start = amqp_get_monotonic_timestamp();
- if (0 == start) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
+ /* TODO(alanxz) this should probably be a parameter */
+ amqp_timer_t timeout;
+
+ res = amqp_timer_start(&timeout, NULL);
+ if (AMQP_STATUS_OK != res) {
+ return res;
}
len_left = 0;
@@ -375,10 +354,10 @@ start_send:
default:
return res;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
- res = amqp_poll_read(fd, start, timeout);
+ res = amqp_poll_read(fd, timeout);
break;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
- res = amqp_poll_write(fd, start, timeout);
+ res = amqp_poll_write(fd, timeout);
break;
}
if (AMQP_STATUS_OK == res) {
@@ -395,13 +374,10 @@ ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
/* Assume that len is going to be larger than ssize_t can hold. */
ssize_t len_left = (size_t)len;
/* TODO(alanxz) this should probably be a parameter */
- struct timeval *timeout = NULL;
- uint64_t start = 0;
- if (timeout) {
- start = amqp_get_monotonic_timestamp();
- if (0 == start) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
+ amqp_timer_t timeout;
+ res = amqp_timer_start(&timeout, NULL);
+ if (AMQP_STATUS_OK != res) {
+ return res;
}
start_send:
@@ -423,10 +399,10 @@ start_send:
default:
return res;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
- res = amqp_poll_read(fd, start, timeout);
+ res = amqp_poll_read(fd, timeout);
break;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
- res = amqp_poll_write(fd, start, timeout);
+ res = amqp_poll_write(fd, timeout);
break;
}
if (AMQP_STATUS_OK == res) {
@@ -439,13 +415,25 @@ int
amqp_open_socket(char const *hostname,
int portnumber)
{
- return amqp_open_socket_noblock(hostname, portnumber, NULL);
+ return amqp_open_socket_inner(hostname, portnumber,
+ amqp_timer_start_infinite());
}
int amqp_open_socket_noblock(char const *hostname,
int portnumber,
struct timeval *timeout)
{
+ amqp_timer_t timer;
+ int res = amqp_timer_start(&timer, timeout);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+ return amqp_open_socket_inner(hostname, portnumber, timer);
+}
+
+int amqp_open_socket_inner(char const *hostname,
+ int portnumber,
+ amqp_timer_t timer) {
struct addrinfo hint;
struct addrinfo *address_list;
struct addrinfo *addr;
@@ -454,10 +442,6 @@ int amqp_open_socket_noblock(char const *hostname,
int last_error = AMQP_STATUS_OK;
int one = 1; /* for setsockopt */
int res;
- int timer_error;
- amqp_timer_t timer;
-
- AMQP_INIT_TIMER(timer);
last_error = amqp_os_socket_init();
if (AMQP_STATUS_OK != last_error) {
@@ -514,62 +498,18 @@ int amqp_open_socket_noblock(char const *hostname,
}
if (EINPROGRESS == amqp_os_socket_error()) {
- while (1) {
- struct pollfd pfd;
- int timeout_ms;
+ last_error = amqp_poll_write(sockfd, timer);
+ if (AMQP_STATUS_OK == last_error) {
+ int result;
+ socklen_t result_len = sizeof(result);
- pfd.fd = sockfd;
- pfd.events = POLLOUT;
- pfd.revents = 0;
-
- if (timeout) {
- timer_error = amqp_timer_update(&timer, timeout);
- if (timer_error < 0) {
- last_error = timer_error;
- break;
- }
-
- timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S +
- timer.tv.tv_usec / AMQP_US_PER_MS;
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) <
+ 0 || result != 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
} else {
- timeout_ms = -1;
- }
- /* Win32 requires except_fds to be passed to detect connection
- * failure. Other platforms only need write_fds, passing except_fds
- * seems to be harmless otherwise
- */
- res = poll(&pfd, 1, timeout_ms);
-
- if (res > 0) {
- int result;
- socklen_t result_len = sizeof(result);
-
- if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) <
- 0) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- break;
- }
-
- if (result != 0) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- break;
- }
-
last_error = AMQP_STATUS_OK;
- break;
- } else if (0 == res) {
- /* Timed out - return */
- last_error = AMQP_STATUS_TIMEOUT;
- break;
- } else if (errno == EINTR) {
- /* Try again */
- continue;
- } else {
- /* Error connecting */
- last_error = AMQP_STATUS_SOCKET_ERROR;
- break;
}
- } /* end while(1) loop */
+ }
if (last_error == AMQP_STATUS_OK || last_error == AMQP_STATUS_TIMEOUT ||
last_error == AMQP_STATUS_TIMER_FAILURE) {
@@ -756,17 +696,10 @@ static int consume_one_frame(amqp_connection_state_t state, amqp_frame_t *decode
}
-static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, struct timeval *timeout)
-{
+static int recv_with_timeout(amqp_connection_state_t state, amqp_timer_t timeout) {
int res;
int fd;
- if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 ||
- INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
- (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) {
- return AMQP_STATUS_INVALID_PARAMETER;
- }
-
start_recv:
res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
state->sock_inbound_buffer.len, 0);
@@ -780,10 +713,10 @@ start_recv:
default:
return res;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
- res = amqp_poll_read(fd, start, timeout);
+ res = amqp_poll_read(fd, timeout);
break;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
- res = amqp_poll_write(fd, start, timeout);
+ res = amqp_poll_write(fd, timeout);
break;
}
if (AMQP_STATUS_OK == res) {
@@ -806,9 +739,8 @@ start_recv:
return AMQP_STATUS_OK;
}
-int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time)
-{
- struct timeval tv;
+int amqp_try_recv(amqp_connection_state_t state) {
+ amqp_timer_t timeout;
while (amqp_data_in_buffer(state)) {
amqp_frame_t frame;
@@ -848,12 +780,9 @@ int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time)
state->last_queued_frame = link;
}
}
+ timeout = amqp_timer_start_immediate();
- memset(&tv, 0, sizeof(struct timeval));
- tv.tv_sec = 0;
- tv.tv_usec = 0;
-
- return recv_with_timeout(state, current_time, &tv);
+ return recv_with_timeout(state, timeout);
}
static int wait_frame_inner(amqp_connection_state_t state,
@@ -864,7 +793,7 @@ static int wait_frame_inner(amqp_connection_state_t state,
uint64_t timeout_timestamp = 0;
uint64_t next_timestamp = 0;
struct timeval tv;
- struct timeval *tvp = NULL;
+ amqp_timer_t timer;
if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
return AMQP_STATUS_INVALID_PARAMETER;
@@ -928,7 +857,6 @@ beginrecv:
}
}
-
if (amqp_heartbeat_enabled(state)) {
if (current_timestamp > state->next_recv_heartbeat) {
state->next_recv_heartbeat = current_timestamp;
@@ -952,10 +880,18 @@ beginrecv:
tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S;
tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US;
- tvp = &tv;
+ /* TODO: refactor the above so that this doesn't require a timer ping */
+ res = amqp_timer_start(&timer, &tv);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+ } else {
+ timer = amqp_timer_start_infinite();
}
- res = recv_with_timeout(state, current_timestamp, tvp);
+ /* TODO this needs to wait for a _frame_ and not anything written from the
+ * socket */
+ res = recv_with_timeout(state, timer);
if (AMQP_STATUS_TIMEOUT == res) {
if (next_timestamp == state->next_recv_heartbeat) {