diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-11 10:56:07 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-13 16:40:29 -0700 |
commit | 22e41b8b52bace283c424d9a125656fcb0a41120 (patch) | |
tree | 24a7b782a4f48643e6d7303884d00e32385c98a2 | |
parent | 7231d921b3db2f7d62e716976c31abdbf8e0edc5 (diff) | |
download | rabbitmq-c-github-ask-22e41b8b52bace283c424d9a125656fcb0a41120.tar.gz |
Improve error handling in socket functions
-rw-r--r-- | librabbitmq/amqp_connection.c | 9 | ||||
-rw-r--r-- | librabbitmq/amqp_openssl.c | 68 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 13 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 27 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 183 |
5 files changed, 240 insertions, 60 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 392ed62..f3552e9 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -448,13 +448,8 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_e32(out_frame, 3, out_frame_len); amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); res = amqp_socket_send(state->socket, out_frame, - out_frame_len + HEADER_SIZE + FOOTER_SIZE, - MSG_NOSIGNAL); + out_frame_len + HEADER_SIZE + FOOTER_SIZE); } - if (res < 0) { - return amqp_socket_error(state->socket); - } else { - return AMQP_STATUS_OK; - } + return res; } diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index a42b109..9b962b5 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -71,49 +71,55 @@ struct amqp_ssl_socket_t { static ssize_t amqp_ssl_socket_send(void *base, const void *buf, - size_t len, - AMQP_UNUSED int flags) + size_t len) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - ssize_t sent; + ssize_t res; ERR_clear_error(); - self->last_error = 0; - sent = SSL_write(self->ssl, buf, len); - if (0 >= sent) { - self->last_error = AMQP_STATUS_SSL_ERROR; - switch (SSL_get_error(self->ssl, sent)) { - case SSL_ERROR_NONE: - case SSL_ERROR_ZERO_RETURN: - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - sent = 0; - break; + self->internal_error = 0; + + /* This will only return on error, or once the whole buffer has been + * written to the SSL stream. See SSL_MODE_ENABLE_PARTIAL_WRITE */ + res = SSL_write(self->ssl, buf, len); + if (0 >= res) { + self->internal_error = SSL_get_error(self->ssl, res); + /* TODO: Close connection if it isn't already? */ + /* TODO: Possibly be more intelligent in reporting WHAT went wrong */ + switch (self->internal_error) { + case SSL_ERROR_ZERO_RETURN: + res = AMQP_STATUS_CONNECTION_CLOSED; + break; + default: + res = AMQP_STATUS_SSL_ERROR; + break; } + } else { + self->internal_error = 0; + res = AMQP_STATUS_OK; } - return sent; + + return res; } static ssize_t amqp_ssl_socket_writev(void *base, - const struct iovec *iov, + struct iovec *iov, int iovcnt) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - ssize_t written = -1; + ssize_t ret = -1; char *bufferp; size_t bytes; int i; - self->last_error = 0; bytes = 0; for (i = 0; i < iovcnt; ++i) { bytes += iov[i].iov_len; } if (self->length < bytes) { - free(self->buffer); - self->buffer = malloc(bytes); + self->buffer = realloc(self->buffer, bytes); if (!self->buffer) { self->length = 0; - self->last_error = AMQP_STATUS_NO_MEMORY; + ret = AMQP_STATUS_NO_MEMORY; goto exit; } self->length = bytes; @@ -123,9 +129,9 @@ amqp_ssl_socket_writev(void *base, memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); bufferp += iov[i].iov_len; } - written = amqp_ssl_socket_send(self, self->buffer, bytes, 0); + ret = amqp_ssl_socket_send(self, self->buffer, bytes); exit: - return written; + return ret; } static ssize_t @@ -137,17 +143,21 @@ amqp_ssl_socket_recv(void *base, struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; ssize_t received; ERR_clear_error(); - self->last_error = 0; + self->internal_error = 0; + received = SSL_read(self->ssl, buf, len); if (0 >= received) { - self->last_error = AMQP_STATUS_SSL_ERROR; - switch(SSL_get_error(self->ssl, received)) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - received = 0; + self->internal_error = SSL_get_error(self->ssl, received); + switch(self->internal_error) { + case SSL_ERROR_ZERO_RETURN: + received = AMQP_STATUS_CONNECTION_CLOSED; + break; + default: + received = AMQP_STATUS_SSL_ERROR; break; } } + return received; } diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 9ab9fec..7649c74 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -50,7 +50,7 @@ #include <string.h> ssize_t -amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt) +amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt) { assert(self); assert(self->klass->writev); @@ -58,11 +58,11 @@ amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt) } ssize_t -amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags) +amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len) { assert(self); assert(self->klass->send); - return self->klass->send(self, buf, len, flags); + return self->klass->send(self, buf, len); } ssize_t @@ -179,7 +179,7 @@ int amqp_send_header(amqp_connection_state_t state) AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - return amqp_socket_send(state->socket, header, 8, MSG_NOSIGNAL); + return amqp_socket_send(state->socket, header, sizeof(header)); } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) @@ -496,7 +496,10 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, uint16_t server_heartbeat; amqp_rpc_reply_t result; - amqp_send_header(state); + res = amqp_send_header(state); + if (AMQP_STATUS_OK != res) { + goto error_res; + } res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, &method); diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index ef3462b..695befb 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -34,8 +34,8 @@ AMQP_BEGIN_DECLS /* Socket callbacks. */ -typedef ssize_t (*amqp_socket_writev_fn)(void *, const struct iovec *, int); -typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t, int); +typedef ssize_t (*amqp_socket_writev_fn)(void *, struct iovec *, int); +typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t); typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int); typedef int (*amqp_socket_open_fn)(void *, const char *, int); typedef int (*amqp_socket_close_fn)(void *); @@ -61,43 +61,48 @@ struct amqp_socket_t_ { /** * Write to a socket. * - * This function is analagous to writev(2). + * This function wraps writev(2) functionality. + * + * This function will only reutrn on error, or when all of the bytes referred + * to in iov have been sent. NOTE: this function may modify the iov struct. * * \param [in,out] self A socket object. * \param [in] iov One or more data vecors. * \param [in] iovcnt The number of vectors in \e iov. * - * \return The number of bytes written, or -1 if an error occurred. + * \return AMQP_STATUS_OK on success. amqp_status_enum value otherwise */ ssize_t -amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt); +amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt); /** * Send a message from a socket. * - * This function is analagous to send(2). + * This function wraps send(2) functionality. + * + * This function will only return on error, or when all of the bytes in buf + * have been sent, or when an error occurs. * * \param [in,out] self A socket object. * \param [in] buf A buffer to read from. * \param [in] len The number of bytes in \e buf. - * \param [in] flags Send flags, implementation specific. * - * \return The number of bytes sent, or -1 if an error occurred. + * \return AMQP_STATUS_OK on success. amqp_status_enum value otherwise */ ssize_t -amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags); +amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len); /** * Receive a message from a socket. * - * This function is analagous to recv(2). + * This function wraps recv(2) functionality. * * \param [in,out] self A socket object. * \param [out] buf A buffer to write to. * \param [in] len The number of bytes at \e buf. * \param [in] flags Receive flags, implementation specific. * - * \return The number of bytes received, or -1 if an error occurred. + * \return The number of bytes received, or < 0 on error (\ref amqp_status_enum) */ ssize_t amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags); diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index 1a155bb..c699fc5 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -27,33 +27,196 @@ #include "amqp_private.h" #include "amqp_tcp_socket.h" + +#include <errno.h> #include <stdio.h> #include <stdlib.h> struct amqp_tcp_socket_t { const struct amqp_socket_class_t *klass; int sockfd; + void *buffer; + size_t buffer_length; + int internal_error; }; + static ssize_t -amqp_tcp_socket_writev(void *base, const struct iovec *iov, int iovcnt) +amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return amqp_os_socket_writev(self->sockfd, iov, iovcnt); + ssize_t res; + + const char *buf_left = buf; + ssize_t len_left = len; + +#ifdef MSG_NOSIGNAL + flags |= MSG_NOSIGNAL; +#endif + +start: + res = send(self->sockfd, buf, len, flags); + + if (res < 0) { + self->internal_error = amqp_os_socket_error(); + if (EINTR == self->internal_error) { + goto start; + } else { + res = AMQP_STATUS_SOCKET_ERROR; + } + } else { + if (res == len_left) { + self->internal_error = 0; + res = AMQP_STATUS_OK; + } else { + buf_left += res; + len_left -= res; + goto start; + } + } + + return res; } static ssize_t -amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags) +amqp_tcp_socket_send(void *base, const void *buf, size_t len) +{ + return amqp_tcp_socket_send_inner(base, buf, len, 0); +} + +static ssize_t +amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return send(self->sockfd, buf, len, flags); + ssize_t ret; + +#if defined(_WIN32) + DWORD res; + /* Making the assumption here that WSAsend won't do a partial send + * unless an error occured, in which case we're hosed so it doesn't matter */ + if (WSASend(self->sockfd, (LPWSABUF)iov, iovcnt, &res, 0, NULL, NULL) == 0) { + self->internal_error = 0; + ret = AMQP_STATUS_OK; + } else { + self->internal_error = WSAGetLastError(); + ret = AMQP_STATUS_SOCKET_ERROR; + } + return ret; + +#elif defined(MSG_MORE) + int i; + for (i = 0; i < iovcnt - 1; ++i) { + ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, MSG_MORE); + if (ret != AMQP_STATUS_OK) { + goto exit; + } + } + ret = amqp_tcp_socket_send_inner(self, iov[i].iov_base, iov[i].iov_len, 0); + +exit: + return ret; + +#elif defined(SO_NOSIGPIPE) || !defined(MSG_NOSIGNAL) + int i; + ssize_t len_left = 0; + + + struct iovec *iov_left = iov; + int iovcnt_left = iovcnt; + + for (int i = 0; i < iovcnt; ++i) { + len_left += iov[i].iov_len; + } + +start: + ret = writev(self->sockfd, iov_left, iovcnt_left); + + if (ret < 0) { + self->internal_error = amqp_os_socket_error(); + if (EINTR == self->internal_error) { + goto start; + } else { + self->internal_error = amqp_os_socket_error(); + ret = AMQP_STATUS_SOCKET_ERROR; + } + } else { + if (ret == len_left) { + self->internal_error = 0; + ret = AMQP_STATUS_OK; + } else { + len_left -= ret; + for (i = 0; i < iovcnt_left; ++i) { + if (ret < (ssize_t)iov_left[i].iov_len) { + iov_left[i].iov_base = ((char*)iov_left[i].iov_base) + ret; + iov_left[i].iov_len -= ret; + + iovcnt_left -= i; + iov_left += i; + break; + } else { + ret -= iov_left[i].iov_len; + } + } + goto start; + } + } + + return ret; + +#else + int i; + size_t bytes = 0; + void *bufferp; + + for (i = 0; i < iovcnt; ++i) { + bytes += iov[i].iov_len; + } + + if (self->buffer_length < bytes) { + self->buffer = realloc(self->buffer, bytes); + if (NULL == self->buffer) { + self->buffer_length = 0; + self->internal_error = 0; + ret = AMQP_STATUS_NO_MEMORY; + goto exit; + } + self->buffer_length = bytes; + } + + bufferp = self->buffer; + for (i = 0; i < iovcnt; ++i) { + memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); + bufferp += iov[i].iov_len; + } + + ret = amqp_tcp_socket_send_inner(self, self->buffer, bytes, 0); + +exit: + return ret; +#endif } static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return recv(self->sockfd, buf, len, flags); + ssize_t ret; + +start: + ret = recv(self->sockfd, buf, len, flags); + + if (0 > ret) { + self->internal_error = amqp_os_socket_error(); + if (EINTR == self->internal_error) { + goto start; + } else { + ret = AMQP_STATUS_SOCKET_ERROR; + } + } else if (0 == ret) { + ret = AMQP_STATUS_CONNECTION_CLOSED; + } + + return ret; } static int @@ -62,9 +225,11 @@ amqp_tcp_socket_open(void *base, const char *host, int port) struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; self->sockfd = amqp_open_socket(host, port); if (0 > self->sockfd) { - return -1; + int err = self->sockfd; + self->sockfd = -1; + return err; } - return 0; + return AMQP_STATUS_OK; } static int @@ -74,6 +239,7 @@ amqp_tcp_socket_close(void *base) int status = -1; if (self) { status = amqp_os_socket_close(self->sockfd); + free(self->buffer); free(self); } @@ -87,7 +253,8 @@ amqp_tcp_socket_close(void *base) static int amqp_tcp_socket_error(AMQP_UNUSED void *base) { - return amqp_os_socket_error(); + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return self->internal_error; } static int |