summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-04-08 00:01:08 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-04-19 22:09:33 -0700
commit8ed47ad13d174c3051e04aef79a520ebd7e70756 (patch)
tree6bb506aa5d64b59e8ff3b5edc36452359c743c60
parent301e9f111c9b049772a0b6ca1d3b05bc5688531f (diff)
downloadrabbitmq-c-8ed47ad13d174c3051e04aef79a520ebd7e70756.tar.gz
Add support for send/writev in non-blocking mode
-rw-r--r--librabbitmq/amqp_connection.c6
-rw-r--r--librabbitmq/amqp_socket.c162
-rw-r--r--librabbitmq/amqp_socket.h6
-rw-r--r--librabbitmq/amqp_tcp_socket.c91
4 files changed, 197 insertions, 68 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 3bb6dca..649c3c4 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -476,7 +476,7 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_base = &frame_end_byte;
iov[2].iov_len = FOOTER_SIZE;
- res = amqp_socket_writev(state->socket, iov, 3);
+ res = amqp_try_writev(state, iov, 3);
} else {
size_t out_frame_len;
amqp_bytes_t encoded;
@@ -524,8 +524,8 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_e32(out_frame, 3, out_frame_len);
amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
- res = amqp_socket_send(state->socket, out_frame,
- out_frame_len + HEADER_SIZE + FOOTER_SIZE);
+ res = amqp_try_send(state, out_frame,
+ out_frame_len + HEADER_SIZE + FOOTER_SIZE);
}
if (state->heartbeat > 0) {
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 52a9671..e9aa2a7 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -258,6 +258,158 @@ amqp_socket_get_sockfd(amqp_socket_t *self)
return self->klass->get_sockfd(self);
}
+static int poll_for_write(int fd, uint64_t start, struct timeval *timeout) {
+ struct pollfd pfd;
+ int res;
+ int timeout_ms;
+
+start_poll:
+ pfd.fd = fd;
+ pfd.events = POLLOUT;
+
+ if (timeout) {
+ timeout_ms =
+ timeout->tv_sec * AMQP_MS_PER_S + timeout->tv_usec / AMQP_US_PER_MS;
+ } else {
+ timeout_ms = -1;
+ }
+
+ res = poll(&pfd, 1, timeout_ms);
+
+ if (0 < res) {
+ return AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
+ } else if (0 == res) {
+ return AMQP_STATUS_TIMEOUT;
+ } else {
+ switch (amqp_os_socket_error()) {
+ case EINTR:
+ if (timeout) {
+ uint64_t end_timestamp;
+ uint64_t time_left;
+ uint64_t current_timestamp = amqp_get_monotonic_timestamp();
+ if (0 == current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ end_timestamp = start + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
+ (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
+ if (current_timestamp > end_timestamp) {
+ return AMQP_STATUS_TIMEOUT;
+ }
+ time_left = end_timestamp - current_timestamp;
+
+ timeout->tv_sec = time_left / AMQP_NS_PER_S;
+ timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_S;
+ }
+ goto start_poll;
+ default:
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+ }
+}
+
+ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
+ int iovcnt) {
+ int i;
+ ssize_t res;
+ struct iovec *iov_left = iov;
+ int iovcnt_left = iovcnt;
+ /* TODO(alanxz) this should probably be a parameter */
+ struct timeval *timeout = NULL;
+ uint64_t start;
+ ssize_t len_left;
+ if (timeout) {
+ start = amqp_get_monotonic_timestamp();
+ if (0 == start) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ }
+
+ len_left = 0;
+ for (i = 0; i < iovcnt_left; ++i) {
+ len_left += iov_left[i].iov_len;
+ }
+
+start_send:
+ res = amqp_socket_writev(state->socket, iov_left, iovcnt_left);
+
+ if (res > 0) {
+ len_left -= res;
+ if (0 == len_left) {
+ return AMQP_STATUS_OK;
+ }
+ for (i = 0; i < iovcnt_left; ++i) {
+ if (res < (ssize_t)iov_left[i].iov_len) {
+ iov_left[i].iov_base = ((char *)iov_left[i].iov_base) + res;
+ iov_left[i].iov_len -= res;
+
+ iovcnt_left -= i;
+ iov_left += i;
+ break;
+ } else {
+ res -= iov_left[i].iov_len;
+ }
+ }
+ goto start_send;
+ } else if (res != AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE) {
+ return res;
+ }
+ {
+ int fd;
+ fd = amqp_get_sockfd(state);
+ if (-1 == fd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
+ }
+ res = poll_for_write(fd, start, timeout);
+ if (AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE == res) {
+ goto start_send;
+ }
+ }
+ return res;
+}
+
+ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
+ size_t len) {
+ ssize_t res;
+ void* buf_left = (void*)buf;
+ /* Assume that len is going to be larger than ssize_t can hold. */
+ ssize_t len_left = (size_t)len;
+ /* TODO(alanxz) this should probably be a parameter */
+ struct timeval *timeout = NULL;
+ uint64_t start;
+ if (timeout) {
+ start = amqp_get_monotonic_timestamp();
+ if (0 == start) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ }
+
+start_send:
+ res = amqp_socket_send(state->socket, buf_left, len_left);
+
+ if (res > 0) {
+ len_left -= res;
+ buf_left = (char*)buf_left - res;
+ if (0 == len_left) {
+ return AMQP_STATUS_OK;
+ }
+ goto start_send;
+ } else if (res != AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE) {
+ return res;
+ }
+ {
+ int fd;
+ fd = amqp_get_sockfd(state);
+ if (-1 == fd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
+ }
+ res = poll_for_write(fd, start, timeout);
+ if (AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE == res) {
+ goto start_send;
+ }
+ }
+ return res;
+}
+
int
amqp_open_socket(char const *hostname,
int portnumber)
@@ -341,9 +493,9 @@ int amqp_open_socket_noblock(char const *hostname,
struct pollfd pfd;
int timeout_ms;
- pfd.fd = sockfd;
- pfd.events = POLLOUT;
- pfd.revents = 0;
+ pfd.fd = sockfd;
+ pfd.events = POLLOUT;
+ pfd.revents = 0;
if (timeout) {
timer_error = amqp_timer_update(&timer, timeout);
@@ -426,7 +578,7 @@ int amqp_send_header(amqp_connection_state_t state)
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- return amqp_socket_send(state->socket, header, sizeof(header));
+ return amqp_try_send(state, header, sizeof(header));
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
@@ -624,7 +776,7 @@ start_recv:
} else if (0 == res) {
return AMQP_STATUS_TIMEOUT;
} else if (-1 == res) {
- switch (errno) {
+ switch (amqp_os_socket_error()) {
case EINTR:
if (timeout) {
uint64_t end_timestamp;
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index 4b25499..a1c4040 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -108,6 +108,9 @@ amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket);
ssize_t
amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt);
+ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
+ int iovcnt);
+
/**
* Send a message from a socket.
*
@@ -125,6 +128,9 @@ amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt);
ssize_t
amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len);
+ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
+ size_t len);
+
/**
* Receive a message from a socket.
*
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index 4b597ed..6d4ef45 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -50,8 +50,6 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t res;
- const char *buf_left = buf;
- ssize_t len_left = len;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
@@ -62,24 +60,24 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
#endif
start:
- res = send(self->sockfd, buf_left, len_left, flags);
+ res = send(self->sockfd, buf, len, flags);
if (res < 0) {
self->internal_error = amqp_os_socket_error();
- if (EINTR == self->internal_error) {
- goto start;
- } else {
- res = AMQP_STATUS_SOCKET_ERROR;
+ switch (self->internal_error) {
+ case EINTR:
+ goto start;
+ case EWOULDBLOCK:
+#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
+ case EAGAIN:
+#endif
+ res = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
+ break;
+ default:
+ res = AMQP_STATUS_SOCKET_ERROR;
}
} else {
- if (res == len_left) {
- self->internal_error = 0;
- res = AMQP_STATUS_OK;
- } else {
- buf_left += res;
- len_left -= res;
- goto start;
- }
+ self->internal_error = 0;
}
return res;
@@ -109,27 +107,15 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) ==
0) {
self->internal_error = 0;
- ret = AMQP_STATUS_OK;
+ ret = res;
} else {
self->internal_error = WSAGetLastError();
- ret = AMQP_STATUS_SOCKET_ERROR;
- }
- return ret;
- }
-
-#elif defined(MSG_MORE)
- {
- int i;
- for (i = 0; i < iovcnt - 1; ++i) {
- ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len,
- MSG_MORE);
- if (ret != AMQP_STATUS_OK) {
- goto exit;
+ if (WSAEWOULDBLOCK == self->internal_error) {
+ ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
+ } else {
+ ret = AMQP_STATUS_SOCKET_ERROR;
}
}
- ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, 0);
-
- exit:
return ret;
}
@@ -146,38 +132,23 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
}
start:
- ret = writev(self->sockfd, iov_left, iovcnt_left);
+ ret = writev(self->sockfd, iov, iovcnt);
if (ret < 0) {
self->internal_error = amqp_os_socket_error();
- if (EINTR == self->internal_error) {
- goto start;
- } else {
- self->internal_error = amqp_os_socket_error();
- ret = AMQP_STATUS_SOCKET_ERROR;
- }
- } else {
- if (ret == len_left) {
- self->internal_error = 0;
- ret = AMQP_STATUS_OK;
- } else {
- len_left -= ret;
- for (i = 0; i < iovcnt_left; ++i) {
- if (ret < (ssize_t)iov_left[i].iov_len) {
- iov_left[i].iov_base = ((char *)iov_left[i].iov_base) + ret;
- iov_left[i].iov_len -= ret;
-
- iovcnt_left -= i;
- iov_left += i;
- break;
- } else {
- ret -= iov_left[i].iov_len;
- }
- }
- goto start;
+ switch (self->internal_error) {
+ case EINTR:
+ goto start;
+ case EWOULDBLOCK:
+#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
+ case EAGAIN:
+#endif
+ ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
+ break;
+ default:
+ ret = AMQP_STATUS_SOCKET_ERROR;
}
}
-
return ret;
}
@@ -205,7 +176,7 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
bufferp = self->buffer;
for (i = 0; i < iovcnt; ++i) {
memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
- bufferp += iov[i].iov_len;
+ bufferp = (char*)bufferp + iov[i].iov_len;
}
ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0);