From 8ed47ad13d174c3051e04aef79a520ebd7e70756 Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Wed, 8 Apr 2015 00:01:08 -0700 Subject: Add support for send/writev in non-blocking mode --- librabbitmq/amqp_connection.c | 6 +- librabbitmq/amqp_socket.c | 162 ++++++++++++++++++++++++++++++++++++++++-- librabbitmq/amqp_socket.h | 6 ++ librabbitmq/amqp_tcp_socket.c | 91 ++++++++---------------- 4 files changed, 197 insertions(+), 68 deletions(-) diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 3bb6dca..649c3c4 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -476,7 +476,7 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_base = &frame_end_byte; iov[2].iov_len = FOOTER_SIZE; - res = amqp_socket_writev(state->socket, iov, 3); + res = amqp_try_writev(state, iov, 3); } else { size_t out_frame_len; amqp_bytes_t encoded; @@ -524,8 +524,8 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_e32(out_frame, 3, out_frame_len); amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); - res = amqp_socket_send(state->socket, out_frame, - out_frame_len + HEADER_SIZE + FOOTER_SIZE); + res = amqp_try_send(state, out_frame, + out_frame_len + HEADER_SIZE + FOOTER_SIZE); } if (state->heartbeat > 0) { diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 52a9671..e9aa2a7 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -258,6 +258,158 @@ amqp_socket_get_sockfd(amqp_socket_t *self) return self->klass->get_sockfd(self); } +static int poll_for_write(int fd, uint64_t start, struct timeval *timeout) { + struct pollfd pfd; + int res; + int timeout_ms; + +start_poll: + pfd.fd = fd; + pfd.events = POLLOUT; + + if (timeout) { + timeout_ms = + timeout->tv_sec * AMQP_MS_PER_S + timeout->tv_usec / AMQP_US_PER_MS; + } else { + timeout_ms = -1; + } + + res = poll(&pfd, 1, timeout_ms); + + if (0 < res) { + return AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; + } else if (0 == res) { + return AMQP_STATUS_TIMEOUT; + } else { + switch (amqp_os_socket_error()) { + case EINTR: + if (timeout) { + uint64_t end_timestamp; + uint64_t time_left; + uint64_t current_timestamp = amqp_get_monotonic_timestamp(); + if (0 == current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + end_timestamp = start + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + + (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; + if (current_timestamp > end_timestamp) { + return AMQP_STATUS_TIMEOUT; + } + time_left = end_timestamp - current_timestamp; + + timeout->tv_sec = time_left / AMQP_NS_PER_S; + timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_S; + } + goto start_poll; + default: + return AMQP_STATUS_SOCKET_ERROR; + } + } +} + +ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, + int iovcnt) { + int i; + ssize_t res; + struct iovec *iov_left = iov; + int iovcnt_left = iovcnt; + /* TODO(alanxz) this should probably be a parameter */ + struct timeval *timeout = NULL; + uint64_t start; + ssize_t len_left; + if (timeout) { + start = amqp_get_monotonic_timestamp(); + if (0 == start) { + return AMQP_STATUS_TIMER_FAILURE; + } + } + + len_left = 0; + for (i = 0; i < iovcnt_left; ++i) { + len_left += iov_left[i].iov_len; + } + +start_send: + res = amqp_socket_writev(state->socket, iov_left, iovcnt_left); + + if (res > 0) { + len_left -= res; + if (0 == len_left) { + return AMQP_STATUS_OK; + } + for (i = 0; i < iovcnt_left; ++i) { + if (res < (ssize_t)iov_left[i].iov_len) { + iov_left[i].iov_base = ((char *)iov_left[i].iov_base) + res; + iov_left[i].iov_len -= res; + + iovcnt_left -= i; + iov_left += i; + break; + } else { + res -= iov_left[i].iov_len; + } + } + goto start_send; + } else if (res != AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE) { + return res; + } + { + int fd; + fd = amqp_get_sockfd(state); + if (-1 == fd) { + return AMQP_STATUS_SOCKET_CLOSED; + } + res = poll_for_write(fd, start, timeout); + if (AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE == res) { + goto start_send; + } + } + return res; +} + +ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, + size_t len) { + ssize_t res; + void* buf_left = (void*)buf; + /* Assume that len is going to be larger than ssize_t can hold. */ + ssize_t len_left = (size_t)len; + /* TODO(alanxz) this should probably be a parameter */ + struct timeval *timeout = NULL; + uint64_t start; + if (timeout) { + start = amqp_get_monotonic_timestamp(); + if (0 == start) { + return AMQP_STATUS_TIMER_FAILURE; + } + } + +start_send: + res = amqp_socket_send(state->socket, buf_left, len_left); + + if (res > 0) { + len_left -= res; + buf_left = (char*)buf_left - res; + if (0 == len_left) { + return AMQP_STATUS_OK; + } + goto start_send; + } else if (res != AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE) { + return res; + } + { + int fd; + fd = amqp_get_sockfd(state); + if (-1 == fd) { + return AMQP_STATUS_SOCKET_CLOSED; + } + res = poll_for_write(fd, start, timeout); + if (AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE == res) { + goto start_send; + } + } + return res; +} + int amqp_open_socket(char const *hostname, int portnumber) @@ -341,9 +493,9 @@ int amqp_open_socket_noblock(char const *hostname, struct pollfd pfd; int timeout_ms; - pfd.fd = sockfd; - pfd.events = POLLOUT; - pfd.revents = 0; + pfd.fd = sockfd; + pfd.events = POLLOUT; + pfd.revents = 0; if (timeout) { timer_error = amqp_timer_update(&timer, timeout); @@ -426,7 +578,7 @@ int amqp_send_header(amqp_connection_state_t state) AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - return amqp_socket_send(state->socket, header, sizeof(header)); + return amqp_try_send(state, header, sizeof(header)); } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) @@ -624,7 +776,7 @@ start_recv: } else if (0 == res) { return AMQP_STATUS_TIMEOUT; } else if (-1 == res) { - switch (errno) { + switch (amqp_os_socket_error()) { case EINTR: if (timeout) { uint64_t end_timestamp; diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index 4b25499..a1c4040 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -108,6 +108,9 @@ amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket); ssize_t amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt); +ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, + int iovcnt); + /** * Send a message from a socket. * @@ -125,6 +128,9 @@ amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt); ssize_t amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len); +ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, + size_t len); + /** * Receive a message from a socket. * diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index 4b597ed..6d4ef45 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -50,8 +50,6 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t res; - const char *buf_left = buf; - ssize_t len_left = len; if (-1 == self->sockfd) { return AMQP_STATUS_SOCKET_CLOSED; @@ -62,24 +60,24 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) #endif start: - res = send(self->sockfd, buf_left, len_left, flags); + res = send(self->sockfd, buf, len, flags); if (res < 0) { self->internal_error = amqp_os_socket_error(); - if (EINTR == self->internal_error) { - goto start; - } else { - res = AMQP_STATUS_SOCKET_ERROR; + switch (self->internal_error) { + case EINTR: + goto start; + case EWOULDBLOCK: +#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK + case EAGAIN: +#endif + res = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; + break; + default: + res = AMQP_STATUS_SOCKET_ERROR; } } else { - if (res == len_left) { - self->internal_error = 0; - res = AMQP_STATUS_OK; - } else { - buf_left += res; - len_left -= res; - goto start; - } + self->internal_error = 0; } return res; @@ -109,27 +107,15 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) == 0) { self->internal_error = 0; - ret = AMQP_STATUS_OK; + ret = res; } else { self->internal_error = WSAGetLastError(); - ret = AMQP_STATUS_SOCKET_ERROR; - } - return ret; - } - -#elif defined(MSG_MORE) - { - int i; - for (i = 0; i < iovcnt - 1; ++i) { - ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, - MSG_MORE); - if (ret != AMQP_STATUS_OK) { - goto exit; + if (WSAEWOULDBLOCK == self->internal_error) { + ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; + } else { + ret = AMQP_STATUS_SOCKET_ERROR; } } - ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, 0); - - exit: return ret; } @@ -146,38 +132,23 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) } start: - ret = writev(self->sockfd, iov_left, iovcnt_left); + ret = writev(self->sockfd, iov, iovcnt); if (ret < 0) { self->internal_error = amqp_os_socket_error(); - if (EINTR == self->internal_error) { - goto start; - } else { - self->internal_error = amqp_os_socket_error(); - ret = AMQP_STATUS_SOCKET_ERROR; - } - } else { - if (ret == len_left) { - self->internal_error = 0; - ret = AMQP_STATUS_OK; - } else { - len_left -= ret; - for (i = 0; i < iovcnt_left; ++i) { - if (ret < (ssize_t)iov_left[i].iov_len) { - iov_left[i].iov_base = ((char *)iov_left[i].iov_base) + ret; - iov_left[i].iov_len -= ret; - - iovcnt_left -= i; - iov_left += i; - break; - } else { - ret -= iov_left[i].iov_len; - } - } - goto start; + switch (self->internal_error) { + case EINTR: + goto start; + case EWOULDBLOCK: +#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK + case EAGAIN: +#endif + ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; + break; + default: + ret = AMQP_STATUS_SOCKET_ERROR; } } - return ret; } @@ -205,7 +176,7 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) bufferp = self->buffer; for (i = 0; i < iovcnt; ++i) { memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); - bufferp += iov[i].iov_len; + bufferp = (char*)bufferp + iov[i].iov_len; } ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0); -- cgit v1.2.1