diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2015-04-08 00:01:08 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2015-04-19 22:09:33 -0700 |
commit | 8ed47ad13d174c3051e04aef79a520ebd7e70756 (patch) | |
tree | 6bb506aa5d64b59e8ff3b5edc36452359c743c60 /librabbitmq/amqp_tcp_socket.c | |
parent | 301e9f111c9b049772a0b6ca1d3b05bc5688531f (diff) | |
download | rabbitmq-c-8ed47ad13d174c3051e04aef79a520ebd7e70756.tar.gz |
Add support for send/writev in non-blocking mode
Diffstat (limited to 'librabbitmq/amqp_tcp_socket.c')
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 91 |
1 files changed, 31 insertions, 60 deletions
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index 4b597ed..6d4ef45 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -50,8 +50,6 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t res; - const char *buf_left = buf; - ssize_t len_left = len; if (-1 == self->sockfd) { return AMQP_STATUS_SOCKET_CLOSED; @@ -62,24 +60,24 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) #endif start: - res = send(self->sockfd, buf_left, len_left, flags); + res = send(self->sockfd, buf, len, flags); if (res < 0) { self->internal_error = amqp_os_socket_error(); - if (EINTR == self->internal_error) { - goto start; - } else { - res = AMQP_STATUS_SOCKET_ERROR; + switch (self->internal_error) { + case EINTR: + goto start; + case EWOULDBLOCK: +#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK + case EAGAIN: +#endif + res = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; + break; + default: + res = AMQP_STATUS_SOCKET_ERROR; } } else { - if (res == len_left) { - self->internal_error = 0; - res = AMQP_STATUS_OK; - } else { - buf_left += res; - len_left -= res; - goto start; - } + self->internal_error = 0; } return res; @@ -109,27 +107,15 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) == 0) { self->internal_error = 0; - ret = AMQP_STATUS_OK; + ret = res; } else { self->internal_error = WSAGetLastError(); - ret = AMQP_STATUS_SOCKET_ERROR; - } - return ret; - } - -#elif defined(MSG_MORE) - { - int i; - for (i = 0; i < iovcnt - 1; ++i) { - ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, - MSG_MORE); - if (ret != AMQP_STATUS_OK) { - goto exit; + if (WSAEWOULDBLOCK == self->internal_error) { + ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; + } else { + ret = AMQP_STATUS_SOCKET_ERROR; } } - ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, 0); - - exit: return ret; } @@ -146,38 +132,23 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) } start: - ret = writev(self->sockfd, iov_left, iovcnt_left); + ret = writev(self->sockfd, iov, iovcnt); if (ret < 0) { self->internal_error = amqp_os_socket_error(); - if (EINTR == self->internal_error) { - goto start; - } else { - self->internal_error = amqp_os_socket_error(); - ret = AMQP_STATUS_SOCKET_ERROR; - } - } else { - if (ret == len_left) { - self->internal_error = 0; - ret = AMQP_STATUS_OK; - } else { - len_left -= ret; - for (i = 0; i < iovcnt_left; ++i) { - if (ret < (ssize_t)iov_left[i].iov_len) { - iov_left[i].iov_base = ((char *)iov_left[i].iov_base) + ret; - iov_left[i].iov_len -= ret; - - iovcnt_left -= i; - iov_left += i; - break; - } else { - ret -= iov_left[i].iov_len; - } - } - goto start; + switch (self->internal_error) { + case EINTR: + goto start; + case EWOULDBLOCK: +#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK + case EAGAIN: +#endif + ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; + break; + default: + ret = AMQP_STATUS_SOCKET_ERROR; } } - return ret; } @@ -205,7 +176,7 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) bufferp = self->buffer; for (i = 0; i < iovcnt; ++i) { memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); - bufferp += iov[i].iov_len; + bufferp = (char*)bufferp + iov[i].iov_len; } ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0); |