summaryrefslogtreecommitdiff
path: root/librabbitmq
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-06-11 10:56:07 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-06-13 16:40:29 -0700
commit22e41b8b52bace283c424d9a125656fcb0a41120 (patch)
tree24a7b782a4f48643e6d7303884d00e32385c98a2 /librabbitmq
parent7231d921b3db2f7d62e716976c31abdbf8e0edc5 (diff)
downloadrabbitmq-c-github-ask-22e41b8b52bace283c424d9a125656fcb0a41120.tar.gz
Improve error handling in socket functions
Diffstat (limited to 'librabbitmq')
-rw-r--r--librabbitmq/amqp_connection.c9
-rw-r--r--librabbitmq/amqp_openssl.c68
-rw-r--r--librabbitmq/amqp_socket.c13
-rw-r--r--librabbitmq/amqp_socket.h27
-rw-r--r--librabbitmq/amqp_tcp_socket.c183
5 files changed, 240 insertions, 60 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 392ed62..f3552e9 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -448,13 +448,8 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_e32(out_frame, 3, out_frame_len);
amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
res = amqp_socket_send(state->socket, out_frame,
- out_frame_len + HEADER_SIZE + FOOTER_SIZE,
- MSG_NOSIGNAL);
+ out_frame_len + HEADER_SIZE + FOOTER_SIZE);
}
- if (res < 0) {
- return amqp_socket_error(state->socket);
- } else {
- return AMQP_STATUS_OK;
- }
+ return res;
}
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index a42b109..9b962b5 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -71,49 +71,55 @@ struct amqp_ssl_socket_t {
static ssize_t
amqp_ssl_socket_send(void *base,
const void *buf,
- size_t len,
- AMQP_UNUSED int flags)
+ size_t len)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- ssize_t sent;
+ ssize_t res;
ERR_clear_error();
- self->last_error = 0;
- sent = SSL_write(self->ssl, buf, len);
- if (0 >= sent) {
- self->last_error = AMQP_STATUS_SSL_ERROR;
- switch (SSL_get_error(self->ssl, sent)) {
- case SSL_ERROR_NONE:
- case SSL_ERROR_ZERO_RETURN:
- case SSL_ERROR_WANT_READ:
- case SSL_ERROR_WANT_WRITE:
- sent = 0;
- break;
+ self->internal_error = 0;
+
+ /* This will only return on error, or once the whole buffer has been
+ * written to the SSL stream. See SSL_MODE_ENABLE_PARTIAL_WRITE */
+ res = SSL_write(self->ssl, buf, len);
+ if (0 >= res) {
+ self->internal_error = SSL_get_error(self->ssl, res);
+ /* TODO: Close connection if it isn't already? */
+ /* TODO: Possibly be more intelligent in reporting WHAT went wrong */
+ switch (self->internal_error) {
+ case SSL_ERROR_ZERO_RETURN:
+ res = AMQP_STATUS_CONNECTION_CLOSED;
+ break;
+ default:
+ res = AMQP_STATUS_SSL_ERROR;
+ break;
}
+ } else {
+ self->internal_error = 0;
+ res = AMQP_STATUS_OK;
}
- return sent;
+
+ return res;
}
static ssize_t
amqp_ssl_socket_writev(void *base,
- const struct iovec *iov,
+ struct iovec *iov,
int iovcnt)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- ssize_t written = -1;
+ ssize_t ret = -1;
char *bufferp;
size_t bytes;
int i;
- self->last_error = 0;
bytes = 0;
for (i = 0; i < iovcnt; ++i) {
bytes += iov[i].iov_len;
}
if (self->length < bytes) {
- free(self->buffer);
- self->buffer = malloc(bytes);
+ self->buffer = realloc(self->buffer, bytes);
if (!self->buffer) {
self->length = 0;
- self->last_error = AMQP_STATUS_NO_MEMORY;
+ ret = AMQP_STATUS_NO_MEMORY;
goto exit;
}
self->length = bytes;
@@ -123,9 +129,9 @@ amqp_ssl_socket_writev(void *base,
memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
bufferp += iov[i].iov_len;
}
- written = amqp_ssl_socket_send(self, self->buffer, bytes, 0);
+ ret = amqp_ssl_socket_send(self, self->buffer, bytes);
exit:
- return written;
+ return ret;
}
static ssize_t
@@ -137,17 +143,21 @@ amqp_ssl_socket_recv(void *base,
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
ssize_t received;
ERR_clear_error();
- self->last_error = 0;
+ self->internal_error = 0;
+
received = SSL_read(self->ssl, buf, len);
if (0 >= received) {
- self->last_error = AMQP_STATUS_SSL_ERROR;
- switch(SSL_get_error(self->ssl, received)) {
- case SSL_ERROR_WANT_READ:
- case SSL_ERROR_WANT_WRITE:
- received = 0;
+ self->internal_error = SSL_get_error(self->ssl, received);
+ switch(self->internal_error) {
+ case SSL_ERROR_ZERO_RETURN:
+ received = AMQP_STATUS_CONNECTION_CLOSED;
+ break;
+ default:
+ received = AMQP_STATUS_SSL_ERROR;
break;
}
}
+
return received;
}
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 9ab9fec..7649c74 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -50,7 +50,7 @@
#include <string.h>
ssize_t
-amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt)
+amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt)
{
assert(self);
assert(self->klass->writev);
@@ -58,11 +58,11 @@ amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt)
}
ssize_t
-amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags)
+amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len)
{
assert(self);
assert(self->klass->send);
- return self->klass->send(self, buf, len, flags);
+ return self->klass->send(self, buf, len);
}
ssize_t
@@ -179,7 +179,7 @@ int amqp_send_header(amqp_connection_state_t state)
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- return amqp_socket_send(state->socket, header, 8, MSG_NOSIGNAL);
+ return amqp_socket_send(state->socket, header, sizeof(header));
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
@@ -496,7 +496,10 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
uint16_t server_heartbeat;
amqp_rpc_reply_t result;
- amqp_send_header(state);
+ res = amqp_send_header(state);
+ if (AMQP_STATUS_OK != res) {
+ goto error_res;
+ }
res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
&method);
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index ef3462b..695befb 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -34,8 +34,8 @@
AMQP_BEGIN_DECLS
/* Socket callbacks. */
-typedef ssize_t (*amqp_socket_writev_fn)(void *, const struct iovec *, int);
-typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t, int);
+typedef ssize_t (*amqp_socket_writev_fn)(void *, struct iovec *, int);
+typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t);
typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int);
typedef int (*amqp_socket_open_fn)(void *, const char *, int);
typedef int (*amqp_socket_close_fn)(void *);
@@ -61,43 +61,48 @@ struct amqp_socket_t_ {
/**
* Write to a socket.
*
- * This function is analagous to writev(2).
+ * This function wraps writev(2) functionality.
+ *
+ * This function will only reutrn on error, or when all of the bytes referred
+ * to in iov have been sent. NOTE: this function may modify the iov struct.
*
* \param [in,out] self A socket object.
* \param [in] iov One or more data vecors.
* \param [in] iovcnt The number of vectors in \e iov.
*
- * \return The number of bytes written, or -1 if an error occurred.
+ * \return AMQP_STATUS_OK on success. amqp_status_enum value otherwise
*/
ssize_t
-amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt);
+amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt);
/**
* Send a message from a socket.
*
- * This function is analagous to send(2).
+ * This function wraps send(2) functionality.
+ *
+ * This function will only return on error, or when all of the bytes in buf
+ * have been sent, or when an error occurs.
*
* \param [in,out] self A socket object.
* \param [in] buf A buffer to read from.
* \param [in] len The number of bytes in \e buf.
- * \param [in] flags Send flags, implementation specific.
*
- * \return The number of bytes sent, or -1 if an error occurred.
+ * \return AMQP_STATUS_OK on success. amqp_status_enum value otherwise
*/
ssize_t
-amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags);
+amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len);
/**
* Receive a message from a socket.
*
- * This function is analagous to recv(2).
+ * This function wraps recv(2) functionality.
*
* \param [in,out] self A socket object.
* \param [out] buf A buffer to write to.
* \param [in] len The number of bytes at \e buf.
* \param [in] flags Receive flags, implementation specific.
*
- * \return The number of bytes received, or -1 if an error occurred.
+ * \return The number of bytes received, or < 0 on error (\ref amqp_status_enum)
*/
ssize_t
amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags);
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index 1a155bb..c699fc5 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -27,33 +27,196 @@
#include "amqp_private.h"
#include "amqp_tcp_socket.h"
+
+#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
struct amqp_tcp_socket_t {
const struct amqp_socket_class_t *klass;
int sockfd;
+ void *buffer;
+ size_t buffer_length;
+ int internal_error;
};
+
static ssize_t
-amqp_tcp_socket_writev(void *base, const struct iovec *iov, int iovcnt)
+amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return amqp_os_socket_writev(self->sockfd, iov, iovcnt);
+ ssize_t res;
+
+ const char *buf_left = buf;
+ ssize_t len_left = len;
+
+#ifdef MSG_NOSIGNAL
+ flags |= MSG_NOSIGNAL;
+#endif
+
+start:
+ res = send(self->sockfd, buf, len, flags);
+
+ if (res < 0) {
+ self->internal_error = amqp_os_socket_error();
+ if (EINTR == self->internal_error) {
+ goto start;
+ } else {
+ res = AMQP_STATUS_SOCKET_ERROR;
+ }
+ } else {
+ if (res == len_left) {
+ self->internal_error = 0;
+ res = AMQP_STATUS_OK;
+ } else {
+ buf_left += res;
+ len_left -= res;
+ goto start;
+ }
+ }
+
+ return res;
}
static ssize_t
-amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags)
+amqp_tcp_socket_send(void *base, const void *buf, size_t len)
+{
+ return amqp_tcp_socket_send_inner(base, buf, len, 0);
+}
+
+static ssize_t
+amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return send(self->sockfd, buf, len, flags);
+ ssize_t ret;
+
+#if defined(_WIN32)
+ DWORD res;
+ /* Making the assumption here that WSAsend won't do a partial send
+ * unless an error occured, in which case we're hosed so it doesn't matter */
+ if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) == 0) {
+ self->internal_error = 0;
+ ret = AMQP_STATUS_OK;
+ } else {
+ self->internal_error = WSAGetLastError();
+ ret = AMQP_STATUS_SOCKET_ERROR;
+ }
+ return ret;
+
+#elif defined(MSG_MORE)
+ int i;
+ for (i = 0; i < iovcnt - 1; ++i) {
+ ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, MSG_MORE);
+ if (ret != AMQP_STATUS_OK) {
+ goto exit;
+ }
+ }
+ ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, 0);
+
+exit:
+ return ret;
+
+#elif defined(SO_NOSIGPIPE) || !defined(MSG_NOSIGNAL)
+ int i;
+ ssize_t len_left = 0;
+
+
+ struct iovec *iov_left = iov;
+ int iovcnt_left = iovcnt;
+
+ for (int i = 0; i < iovcnt; ++i) {
+ len_left += iov[i].iov_len;
+ }
+
+start:
+ ret = writev(self->sockfd, iov_left, iovcnt_left);
+
+ if (ret < 0) {
+ self->internal_error = amqp_os_socket_error();
+ if (EINTR == self->internal_error) {
+ goto start;
+ } else {
+ self->internal_error = amqp_os_socket_error();
+ ret = AMQP_STATUS_SOCKET_ERROR;
+ }
+ } else {
+ if (ret == len_left) {
+ self->internal_error = 0;
+ ret = AMQP_STATUS_OK;
+ } else {
+ len_left -= ret;
+ for (i = 0; i < iovcnt_left; ++i) {
+ if (ret < (ssize_t)iov_left[i].iov_len) {
+ iov_left[i].iov_base = ((char*)iov_left[i].iov_base) + ret;
+ iov_left[i].iov_len -= ret;
+
+ iovcnt_left -= i;
+ iov_left += i;
+ break;
+ } else {
+ ret -= iov_left[i].iov_len;
+ }
+ }
+ goto start;
+ }
+ }
+
+ return ret;
+
+#else
+ int i;
+ size_t bytes = 0;
+ void *bufferp;
+
+ for (i = 0; i < iovcnt; ++i) {
+ bytes += iov[i].iov_len;
+ }
+
+ if (self->buffer_length < bytes) {
+ self->buffer = realloc(self->buffer, bytes);
+ if (NULL == self->buffer) {
+ self->buffer_length = 0;
+ self->internal_error = 0;
+ ret = AMQP_STATUS_NO_MEMORY;
+ goto exit;
+ }
+ self->buffer_length = bytes;
+ }
+
+ bufferp = self->buffer;
+ for (i = 0; i < iovcnt; ++i) {
+ memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
+ bufferp += iov[i].iov_len;
+ }
+
+ ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0);
+
+exit:
+ return ret;
+#endif
}
static ssize_t
amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return recv(self->sockfd, buf, len, flags);
+ ssize_t ret;
+
+start:
+ ret = recv(self->sockfd, buf, len, flags);
+
+ if (0 > ret) {
+ self->internal_error = amqp_os_socket_error();
+ if (EINTR == self->internal_error) {
+ goto start;
+ } else {
+ ret = AMQP_STATUS_SOCKET_ERROR;
+ }
+ } else if (0 == ret) {
+ ret = AMQP_STATUS_CONNECTION_CLOSED;
+ }
+
+ return ret;
}
static int
@@ -62,9 +225,11 @@ amqp_tcp_socket_open(void *base, const char *host, int port)
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
self->sockfd = amqp_open_socket(host, port);
if (0 > self->sockfd) {
- return -1;
+ int err = self->sockfd;
+ self->sockfd = -1;
+ return err;
}
- return 0;
+ return AMQP_STATUS_OK;
}
static int
@@ -74,6 +239,7 @@ amqp_tcp_socket_close(void *base)
int status = -1;
if (self) {
status = amqp_os_socket_close(self->sockfd);
+ free(self->buffer);
free(self);
}
@@ -87,7 +253,8 @@ amqp_tcp_socket_close(void *base)
static int
amqp_tcp_socket_error(AMQP_UNUSED void *base)
{
- return amqp_os_socket_error();
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return self->internal_error;
}
static int