diff options
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 186 |
1 files changed, 61 insertions, 125 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 4b996f3..451b67f 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -258,7 +258,7 @@ amqp_socket_get_sockfd(amqp_socket_t *self) return self->klass->get_sockfd(self); } -static int amqp_poll(int fd, short event, uint64_t start, struct timeval *timeout) { +static int amqp_poll(int fd, short event, amqp_timer_t timeout) { struct pollfd pfd; int res; int timeout_ms; @@ -270,11 +270,9 @@ start_poll: pfd.fd = fd; pfd.events = event; - if (timeout) { - timeout_ms = - timeout->tv_sec * AMQP_MS_PER_S + timeout->tv_usec / AMQP_US_PER_MS; - } else { - timeout_ms = -1; + timeout_ms = amqp_timer_ms_left(timeout); + if (-1 > timeout_ms) { + return timeout_ms; } res = poll(&pfd, 1, timeout_ms); @@ -289,23 +287,6 @@ start_poll: } else { switch (amqp_os_socket_error()) { case EINTR: - if (timeout) { - uint64_t end_timestamp; - uint64_t time_left; - uint64_t current_timestamp = amqp_get_monotonic_timestamp(); - if (0 == current_timestamp) { - return AMQP_STATUS_TIMER_FAILURE; - } - end_timestamp = start + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + - (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; - if (current_timestamp > end_timestamp) { - return AMQP_STATUS_TIMEOUT; - } - time_left = end_timestamp - current_timestamp; - - timeout->tv_sec = time_left / AMQP_NS_PER_S; - timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_S; - } goto start_poll; default: return AMQP_STATUS_SOCKET_ERROR; @@ -314,12 +295,12 @@ start_poll: return AMQP_STATUS_OK; } -int amqp_poll_read(int fd, uint64_t start, struct timeval *timeout) { - return amqp_poll(fd, POLLIN, start, timeout); +int amqp_poll_read(int fd, amqp_timer_t timeout) { + return amqp_poll(fd, POLLIN, timeout); } -int amqp_poll_write(int fd, uint64_t start, struct timeval* timeout) { - return amqp_poll(fd, POLLOUT, start, timeout); +int amqp_poll_write(int fd, amqp_timer_t timeout) { + return amqp_poll(fd, POLLOUT, timeout); } ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, @@ -329,15 +310,13 @@ ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, ssize_t res; struct iovec *iov_left = iov; int iovcnt_left = iovcnt; - /* TODO(alanxz) this should probably be a parameter */ - struct timeval *timeout = NULL; - uint64_t start = 0; ssize_t len_left; - if (timeout) { - start = amqp_get_monotonic_timestamp(); - if (0 == start) { - return AMQP_STATUS_TIMER_FAILURE; - } + /* TODO(alanxz) this should probably be a parameter */ + amqp_timer_t timeout; + + res = amqp_timer_start(&timeout, NULL); + if (AMQP_STATUS_OK != res) { + return res; } len_left = 0; @@ -375,10 +354,10 @@ start_send: default: return res; case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: - res = amqp_poll_read(fd, start, timeout); + res = amqp_poll_read(fd, timeout); break; case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: - res = amqp_poll_write(fd, start, timeout); + res = amqp_poll_write(fd, timeout); break; } if (AMQP_STATUS_OK == res) { @@ -395,13 +374,10 @@ ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, /* Assume that len is going to be larger than ssize_t can hold. */ ssize_t len_left = (size_t)len; /* TODO(alanxz) this should probably be a parameter */ - struct timeval *timeout = NULL; - uint64_t start = 0; - if (timeout) { - start = amqp_get_monotonic_timestamp(); - if (0 == start) { - return AMQP_STATUS_TIMER_FAILURE; - } + amqp_timer_t timeout; + res = amqp_timer_start(&timeout, NULL); + if (AMQP_STATUS_OK != res) { + return res; } start_send: @@ -423,10 +399,10 @@ start_send: default: return res; case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: - res = amqp_poll_read(fd, start, timeout); + res = amqp_poll_read(fd, timeout); break; case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: - res = amqp_poll_write(fd, start, timeout); + res = amqp_poll_write(fd, timeout); break; } if (AMQP_STATUS_OK == res) { @@ -439,13 +415,25 @@ int amqp_open_socket(char const *hostname, int portnumber) { - return amqp_open_socket_noblock(hostname, portnumber, NULL); + return amqp_open_socket_inner(hostname, portnumber, + amqp_timer_start_infinite()); } int amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout) { + amqp_timer_t timer; + int res = amqp_timer_start(&timer, timeout); + if (AMQP_STATUS_OK != res) { + return res; + } + return amqp_open_socket_inner(hostname, portnumber, timer); +} + +int amqp_open_socket_inner(char const *hostname, + int portnumber, + amqp_timer_t timer) { struct addrinfo hint; struct addrinfo *address_list; struct addrinfo *addr; @@ -454,10 +442,6 @@ int amqp_open_socket_noblock(char const *hostname, int last_error = AMQP_STATUS_OK; int one = 1; /* for setsockopt */ int res; - int timer_error; - amqp_timer_t timer; - - AMQP_INIT_TIMER(timer); last_error = amqp_os_socket_init(); if (AMQP_STATUS_OK != last_error) { @@ -514,62 +498,18 @@ int amqp_open_socket_noblock(char const *hostname, } if (EINPROGRESS == amqp_os_socket_error()) { - while (1) { - struct pollfd pfd; - int timeout_ms; + last_error = amqp_poll_write(sockfd, timer); + if (AMQP_STATUS_OK == last_error) { + int result; + socklen_t result_len = sizeof(result); - pfd.fd = sockfd; - pfd.events = POLLOUT; - pfd.revents = 0; - - if (timeout) { - timer_error = amqp_timer_update(&timer, timeout); - if (timer_error < 0) { - last_error = timer_error; - break; - } - - timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S + - timer.tv.tv_usec / AMQP_US_PER_MS; + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < + 0 || result != 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; } else { - timeout_ms = -1; - } - /* Win32 requires except_fds to be passed to detect connection - * failure. Other platforms only need write_fds, passing except_fds - * seems to be harmless otherwise - */ - res = poll(&pfd, 1, timeout_ms); - - if (res > 0) { - int result; - socklen_t result_len = sizeof(result); - - if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < - 0) { - last_error = AMQP_STATUS_SOCKET_ERROR; - break; - } - - if (result != 0) { - last_error = AMQP_STATUS_SOCKET_ERROR; - break; - } - last_error = AMQP_STATUS_OK; - break; - } else if (0 == res) { - /* Timed out - return */ - last_error = AMQP_STATUS_TIMEOUT; - break; - } else if (errno == EINTR) { - /* Try again */ - continue; - } else { - /* Error connecting */ - last_error = AMQP_STATUS_SOCKET_ERROR; - break; } - } /* end while(1) loop */ + } if (last_error == AMQP_STATUS_OK || last_error == AMQP_STATUS_TIMEOUT || last_error == AMQP_STATUS_TIMER_FAILURE) { @@ -756,17 +696,10 @@ static int consume_one_frame(amqp_connection_state_t state, amqp_frame_t *decode } -static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, struct timeval *timeout) -{ +static int recv_with_timeout(amqp_connection_state_t state, amqp_timer_t timeout) { int res; int fd; - if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 || - INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S + - (uint64_t)timeout->tv_usec / AMQP_US_PER_MS))) { - return AMQP_STATUS_INVALID_PARAMETER; - } - start_recv: res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, state->sock_inbound_buffer.len, 0); @@ -780,10 +713,10 @@ start_recv: default: return res; case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: - res = amqp_poll_read(fd, start, timeout); + res = amqp_poll_read(fd, timeout); break; case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: - res = amqp_poll_write(fd, start, timeout); + res = amqp_poll_write(fd, timeout); break; } if (AMQP_STATUS_OK == res) { @@ -806,9 +739,8 @@ start_recv: return AMQP_STATUS_OK; } -int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time) -{ - struct timeval tv; +int amqp_try_recv(amqp_connection_state_t state) { + amqp_timer_t timeout; while (amqp_data_in_buffer(state)) { amqp_frame_t frame; @@ -848,12 +780,9 @@ int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time) state->last_queued_frame = link; } } + timeout = amqp_timer_start_immediate(); - memset(&tv, 0, sizeof(struct timeval)); - tv.tv_sec = 0; - tv.tv_usec = 0; - - return recv_with_timeout(state, current_time, &tv); + return recv_with_timeout(state, timeout); } static int wait_frame_inner(amqp_connection_state_t state, @@ -864,7 +793,7 @@ static int wait_frame_inner(amqp_connection_state_t state, uint64_t timeout_timestamp = 0; uint64_t next_timestamp = 0; struct timeval tv; - struct timeval *tvp = NULL; + amqp_timer_t timer; if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { return AMQP_STATUS_INVALID_PARAMETER; @@ -928,7 +857,6 @@ beginrecv: } } - if (amqp_heartbeat_enabled(state)) { if (current_timestamp > state->next_recv_heartbeat) { state->next_recv_heartbeat = current_timestamp; @@ -952,10 +880,18 @@ beginrecv: tv.tv_sec = ns_until_next_timeout / AMQP_NS_PER_S; tv.tv_usec = (ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US; - tvp = &tv; + /* TODO: refactor the above so that this doesn't require a timer ping */ + res = amqp_timer_start(&timer, &tv); + if (AMQP_STATUS_OK != res) { + return res; + } + } else { + timer = amqp_timer_start_infinite(); } - res = recv_with_timeout(state, current_timestamp, tvp); + /* TODO this needs to wait for a _frame_ and not anything written from the + * socket */ + res = recv_with_timeout(state, timer); if (AMQP_STATUS_TIMEOUT == res) { if (next_timestamp == state->next_recv_heartbeat) { |