diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-11 10:56:07 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-13 16:40:29 -0700 |
commit | 22e41b8b52bace283c424d9a125656fcb0a41120 (patch) | |
tree | 24a7b782a4f48643e6d7303884d00e32385c98a2 /librabbitmq/amqp_tcp_socket.c | |
parent | 7231d921b3db2f7d62e716976c31abdbf8e0edc5 (diff) | |
download | rabbitmq-c-22e41b8b52bace283c424d9a125656fcb0a41120.tar.gz |
Improve error handling in socket functions
Diffstat (limited to 'librabbitmq/amqp_tcp_socket.c')
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 183 |
1 files changed, 175 insertions, 8 deletions
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index 1a155bb..c699fc5 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -27,33 +27,196 @@ #include "amqp_private.h" #include "amqp_tcp_socket.h" + +#include <errno.h> #include <stdio.h> #include <stdlib.h> struct amqp_tcp_socket_t { const struct amqp_socket_class_t *klass; int sockfd; + void *buffer; + size_t buffer_length; + int internal_error; }; + static ssize_t -amqp_tcp_socket_writev(void *base, const struct iovec *iov, int iovcnt) +amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return amqp_os_socket_writev(self->sockfd, iov, iovcnt); + ssize_t res; + + const char *buf_left = buf; + ssize_t len_left = len; + +#ifdef MSG_NOSIGNAL + flags |= MSG_NOSIGNAL; +#endif + +start: + res = send(self->sockfd, buf, len, flags); + + if (res < 0) { + self->internal_error = amqp_os_socket_error(); + if (EINTR == self->internal_error) { + goto start; + } else { + res = AMQP_STATUS_SOCKET_ERROR; + } + } else { + if (res == len_left) { + self->internal_error = 0; + res = AMQP_STATUS_OK; + } else { + buf_left += res; + len_left -= res; + goto start; + } + } + + return res; } static ssize_t -amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags) +amqp_tcp_socket_send(void *base, const void *buf, size_t len) +{ + return amqp_tcp_socket_send_inner(base, buf, len, 0); +} + +static ssize_t +amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return send(self->sockfd, buf, len, flags); + ssize_t ret; + +#if defined(_WIN32) + DWORD res; + /* Making the assumption here that WSAsend won't do a partial send + * unless an error occured, in which case we're hosed so it doesn't matter */ + if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) == 0) { + self->internal_error = 0; + ret = AMQP_STATUS_OK; + } else { + self->internal_error = WSAGetLastError(); + ret = AMQP_STATUS_SOCKET_ERROR; + } + return ret; + +#elif defined(MSG_MORE) + int i; + for (i = 0; i < iovcnt - 1; ++i) { + ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, MSG_MORE); + if (ret != AMQP_STATUS_OK) { + goto exit; + } + } + ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, 0); + +exit: + return ret; + +#elif defined(SO_NOSIGPIPE) || !defined(MSG_NOSIGNAL) + int i; + ssize_t len_left = 0; + + + struct iovec *iov_left = iov; + int iovcnt_left = iovcnt; + + for (int i = 0; i < iovcnt; ++i) { + len_left += iov[i].iov_len; + } + +start: + ret = writev(self->sockfd, iov_left, iovcnt_left); + + if (ret < 0) { + self->internal_error = amqp_os_socket_error(); + if (EINTR == self->internal_error) { + goto start; + } else { + self->internal_error = amqp_os_socket_error(); + ret = AMQP_STATUS_SOCKET_ERROR; + } + } else { + if (ret == len_left) { + self->internal_error = 0; + ret = AMQP_STATUS_OK; + } else { + len_left -= ret; + for (i = 0; i < iovcnt_left; ++i) { + if (ret < (ssize_t)iov_left[i].iov_len) { + iov_left[i].iov_base = ((char*)iov_left[i].iov_base) + ret; + iov_left[i].iov_len -= ret; + + iovcnt_left -= i; + iov_left += i; + break; + } else { + ret -= iov_left[i].iov_len; + } + } + goto start; + } + } + + return ret; + +#else + int i; + size_t bytes = 0; + void *bufferp; + + for (i = 0; i < iovcnt; ++i) { + bytes += iov[i].iov_len; + } + + if (self->buffer_length < bytes) { + self->buffer = realloc(self->buffer, bytes); + if (NULL == self->buffer) { + self->buffer_length = 0; + self->internal_error = 0; + ret = AMQP_STATUS_NO_MEMORY; + goto exit; + } + self->buffer_length = bytes; + } + + bufferp = self->buffer; + for (i = 0; i < iovcnt; ++i) { + memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); + bufferp += iov[i].iov_len; + } + + ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0); + +exit: + return ret; +#endif } static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return recv(self->sockfd, buf, len, flags); + ssize_t ret; + +start: + ret = recv(self->sockfd, buf, len, flags); + + if (0 > ret) { + self->internal_error = amqp_os_socket_error(); + if (EINTR == self->internal_error) { + goto start; + } else { + ret = AMQP_STATUS_SOCKET_ERROR; + } + } else if (0 == ret) { + ret = AMQP_STATUS_CONNECTION_CLOSED; + } + + return ret; } static int @@ -62,9 +225,11 @@ amqp_tcp_socket_open(void *base, const char *host, int port) struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; self->sockfd = amqp_open_socket(host, port); if (0 > self->sockfd) { - return -1; + int err = self->sockfd; + self->sockfd = -1; + return err; } - return 0; + return AMQP_STATUS_OK; } static int @@ -74,6 +239,7 @@ amqp_tcp_socket_close(void *base) int status = -1; if (self) { status = amqp_os_socket_close(self->sockfd); + free(self->buffer); free(self); } @@ -87,7 +253,8 @@ amqp_tcp_socket_close(void *base) static int amqp_tcp_socket_error(AMQP_UNUSED void *base) { - return amqp_os_socket_error(); + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return self->internal_error; } static int |