summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_tcp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_tcp_socket.c')
-rw-r--r--librabbitmq/amqp_tcp_socket.c91
1 files changed, 31 insertions, 60 deletions
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index 4b597ed..6d4ef45 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -50,8 +50,6 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t res;
- const char *buf_left = buf;
- ssize_t len_left = len;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
@@ -62,24 +60,24 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
#endif
start:
- res = send(self->sockfd, buf_left, len_left, flags);
+ res = send(self->sockfd, buf, len, flags);
if (res < 0) {
self->internal_error = amqp_os_socket_error();
- if (EINTR == self->internal_error) {
- goto start;
- } else {
- res = AMQP_STATUS_SOCKET_ERROR;
+ switch (self->internal_error) {
+ case EINTR:
+ goto start;
+ case EWOULDBLOCK:
+#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
+ case EAGAIN:
+#endif
+ res = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
+ break;
+ default:
+ res = AMQP_STATUS_SOCKET_ERROR;
}
} else {
- if (res == len_left) {
- self->internal_error = 0;
- res = AMQP_STATUS_OK;
- } else {
- buf_left += res;
- len_left -= res;
- goto start;
- }
+ self->internal_error = 0;
}
return res;
@@ -109,27 +107,15 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) ==
0) {
self->internal_error = 0;
- ret = AMQP_STATUS_OK;
+ ret = res;
} else {
self->internal_error = WSAGetLastError();
- ret = AMQP_STATUS_SOCKET_ERROR;
- }
- return ret;
- }
-
-#elif defined(MSG_MORE)
- {
- int i;
- for (i = 0; i < iovcnt - 1; ++i) {
- ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len,
- MSG_MORE);
- if (ret != AMQP_STATUS_OK) {
- goto exit;
+ if (WSAEWOULDBLOCK == self->internal_error) {
+ ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
+ } else {
+ ret = AMQP_STATUS_SOCKET_ERROR;
}
}
- ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, 0);
-
- exit:
return ret;
}
@@ -146,38 +132,23 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
}
start:
- ret = writev(self->sockfd, iov_left, iovcnt_left);
+ ret = writev(self->sockfd, iov, iovcnt);
if (ret < 0) {
self->internal_error = amqp_os_socket_error();
- if (EINTR == self->internal_error) {
- goto start;
- } else {
- self->internal_error = amqp_os_socket_error();
- ret = AMQP_STATUS_SOCKET_ERROR;
- }
- } else {
- if (ret == len_left) {
- self->internal_error = 0;
- ret = AMQP_STATUS_OK;
- } else {
- len_left -= ret;
- for (i = 0; i < iovcnt_left; ++i) {
- if (ret < (ssize_t)iov_left[i].iov_len) {
- iov_left[i].iov_base = ((char *)iov_left[i].iov_base) + ret;
- iov_left[i].iov_len -= ret;
-
- iovcnt_left -= i;
- iov_left += i;
- break;
- } else {
- ret -= iov_left[i].iov_len;
- }
- }
- goto start;
+ switch (self->internal_error) {
+ case EINTR:
+ goto start;
+ case EWOULDBLOCK:
+#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
+ case EAGAIN:
+#endif
+ ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
+ break;
+ default:
+ ret = AMQP_STATUS_SOCKET_ERROR;
}
}
-
return ret;
}
@@ -205,7 +176,7 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
bufferp = self->buffer;
for (i = 0; i < iovcnt; ++i) {
memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
- bufferp += iov[i].iov_len;
+ bufferp = (char*)bufferp + iov[i].iov_len;
}
ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0);