summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_tcp_socket.c
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-06-11 10:56:07 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-06-13 16:40:29 -0700
commit22e41b8b52bace283c424d9a125656fcb0a41120 (patch)
tree24a7b782a4f48643e6d7303884d00e32385c98a2 /librabbitmq/amqp_tcp_socket.c
parent7231d921b3db2f7d62e716976c31abdbf8e0edc5 (diff)
downloadrabbitmq-c-22e41b8b52bace283c424d9a125656fcb0a41120.tar.gz
Improve error handling in socket functions
Diffstat (limited to 'librabbitmq/amqp_tcp_socket.c')
-rw-r--r--librabbitmq/amqp_tcp_socket.c183
1 files changed, 175 insertions, 8 deletions
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index 1a155bb..c699fc5 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -27,33 +27,196 @@
#include "amqp_private.h"
#include "amqp_tcp_socket.h"
+
+#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
struct amqp_tcp_socket_t {
const struct amqp_socket_class_t *klass;
int sockfd;
+ void *buffer;
+ size_t buffer_length;
+ int internal_error;
};
+
static ssize_t
-amqp_tcp_socket_writev(void *base, const struct iovec *iov, int iovcnt)
+amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return amqp_os_socket_writev(self->sockfd, iov, iovcnt);
+ ssize_t res;
+
+ const char *buf_left = buf;
+ ssize_t len_left = len;
+
+#ifdef MSG_NOSIGNAL
+ flags |= MSG_NOSIGNAL;
+#endif
+
+start:
+ res = send(self->sockfd, buf, len, flags);
+
+ if (res < 0) {
+ self->internal_error = amqp_os_socket_error();
+ if (EINTR == self->internal_error) {
+ goto start;
+ } else {
+ res = AMQP_STATUS_SOCKET_ERROR;
+ }
+ } else {
+ if (res == len_left) {
+ self->internal_error = 0;
+ res = AMQP_STATUS_OK;
+ } else {
+ buf_left += res;
+ len_left -= res;
+ goto start;
+ }
+ }
+
+ return res;
}
static ssize_t
-amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags)
+amqp_tcp_socket_send(void *base, const void *buf, size_t len)
+{
+ return amqp_tcp_socket_send_inner(base, buf, len, 0);
+}
+
+static ssize_t
+amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return send(self->sockfd, buf, len, flags);
+ ssize_t ret;
+
+#if defined(_WIN32)
+ DWORD res;
+ /* Making the assumption here that WSAsend won't do a partial send
+ * unless an error occured, in which case we're hosed so it doesn't matter */
+ if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) == 0) {
+ self->internal_error = 0;
+ ret = AMQP_STATUS_OK;
+ } else {
+ self->internal_error = WSAGetLastError();
+ ret = AMQP_STATUS_SOCKET_ERROR;
+ }
+ return ret;
+
+#elif defined(MSG_MORE)
+ int i;
+ for (i = 0; i < iovcnt - 1; ++i) {
+ ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, MSG_MORE);
+ if (ret != AMQP_STATUS_OK) {
+ goto exit;
+ }
+ }
+ ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, 0);
+
+exit:
+ return ret;
+
+#elif defined(SO_NOSIGPIPE) || !defined(MSG_NOSIGNAL)
+ int i;
+ ssize_t len_left = 0;
+
+
+ struct iovec *iov_left = iov;
+ int iovcnt_left = iovcnt;
+
+ for (int i = 0; i < iovcnt; ++i) {
+ len_left += iov[i].iov_len;
+ }
+
+start:
+ ret = writev(self->sockfd, iov_left, iovcnt_left);
+
+ if (ret < 0) {
+ self->internal_error = amqp_os_socket_error();
+ if (EINTR == self->internal_error) {
+ goto start;
+ } else {
+ self->internal_error = amqp_os_socket_error();
+ ret = AMQP_STATUS_SOCKET_ERROR;
+ }
+ } else {
+ if (ret == len_left) {
+ self->internal_error = 0;
+ ret = AMQP_STATUS_OK;
+ } else {
+ len_left -= ret;
+ for (i = 0; i < iovcnt_left; ++i) {
+ if (ret < (ssize_t)iov_left[i].iov_len) {
+ iov_left[i].iov_base = ((char*)iov_left[i].iov_base) + ret;
+ iov_left[i].iov_len -= ret;
+
+ iovcnt_left -= i;
+ iov_left += i;
+ break;
+ } else {
+ ret -= iov_left[i].iov_len;
+ }
+ }
+ goto start;
+ }
+ }
+
+ return ret;
+
+#else
+ int i;
+ size_t bytes = 0;
+ void *bufferp;
+
+ for (i = 0; i < iovcnt; ++i) {
+ bytes += iov[i].iov_len;
+ }
+
+ if (self->buffer_length < bytes) {
+ self->buffer = realloc(self->buffer, bytes);
+ if (NULL == self->buffer) {
+ self->buffer_length = 0;
+ self->internal_error = 0;
+ ret = AMQP_STATUS_NO_MEMORY;
+ goto exit;
+ }
+ self->buffer_length = bytes;
+ }
+
+ bufferp = self->buffer;
+ for (i = 0; i < iovcnt; ++i) {
+ memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
+ bufferp += iov[i].iov_len;
+ }
+
+ ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0);
+
+exit:
+ return ret;
+#endif
}
static ssize_t
amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return recv(self->sockfd, buf, len, flags);
+ ssize_t ret;
+
+start:
+ ret = recv(self->sockfd, buf, len, flags);
+
+ if (0 > ret) {
+ self->internal_error = amqp_os_socket_error();
+ if (EINTR == self->internal_error) {
+ goto start;
+ } else {
+ ret = AMQP_STATUS_SOCKET_ERROR;
+ }
+ } else if (0 == ret) {
+ ret = AMQP_STATUS_CONNECTION_CLOSED;
+ }
+
+ return ret;
}
static int
@@ -62,9 +225,11 @@ amqp_tcp_socket_open(void *base, const char *host, int port)
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
self->sockfd = amqp_open_socket(host, port);
if (0 > self->sockfd) {
- return -1;
+ int err = self->sockfd;
+ self->sockfd = -1;
+ return err;
}
- return 0;
+ return AMQP_STATUS_OK;
}
static int
@@ -74,6 +239,7 @@ amqp_tcp_socket_close(void *base)
int status = -1;
if (self) {
status = amqp_os_socket_close(self->sockfd);
+ free(self->buffer);
free(self);
}
@@ -87,7 +253,8 @@ amqp_tcp_socket_close(void *base)
static int
amqp_tcp_socket_error(AMQP_UNUSED void *base)
{
- return amqp_os_socket_error();
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return self->internal_error;
}
static int