diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2014-12-29 22:42:21 -0800 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2014-12-29 22:42:21 -0800 |
commit | ee54e274fe3c8d41bba0f274b997a587bff2482b (patch) | |
tree | 55756e4752e58112a5a9d61642047394eee2419b | |
parent | 9626dd5cd5f78894f1416a1afd2d624ddd4904ae (diff) | |
download | rabbitmq-c-ee54e274fe3c8d41bba0f274b997a587bff2482b.tar.gz |
Check for double close/open in socket impl
Check to state of socket when doing open/read/write/close to prevent double-open
and double-close issues with the socket implementation.
Fixes #228
-rw-r--r-- | librabbitmq/amqp.h | 4 | ||||
-rw-r--r-- | librabbitmq/amqp_openssl.c | 27 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 25 |
3 files changed, 41 insertions, 15 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 9fcd206..4ea0084 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -698,6 +698,10 @@ typedef enum amqp_status_enum_ heartbeat */ AMQP_STATUS_UNEXPECTED_STATE = -0x0010, /**< Unexpected protocol state */ + AMQP_STATUS_SOCKET_CLOSED = -0x0011, /**< Underlying socket is + closed */ + AMQP_STATUS_SOCKET_INUSE = -0x0012, /**< Underlying socket is + already open */ AMQP_STATUS_TCP_ERROR = -0x0100, /**< A generic TCP error occurred */ diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index 393a84e..85b9ca5 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -117,6 +117,10 @@ amqp_ssl_socket_writev(void *base, char *bufferp; size_t bytes; int i; + if (-1 == self->sockfd) { + return AMQP_STATUS_SOCKET_CLOSED; + } + bytes = 0; for (i = 0; i < iovcnt; ++i) { bytes += iov[i].iov_len; @@ -148,6 +152,9 @@ amqp_ssl_socket_recv(void *base, { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; ssize_t received; + if (-1 == self->sockfd) { + return AMQP_STATUS_SOCKET_CLOSED; + } ERR_clear_error(); self->internal_error = 0; @@ -232,6 +239,9 @@ amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *tim struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; long result; int status; + if (-1 != self->sockfd) { + return AMQP_STATUS_SOCKET_INUSE; + } ERR_clear_error(); self->ssl = SSL_new(self->ctx); @@ -301,19 +311,18 @@ amqp_ssl_socket_close(void *base) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - if (self->ssl) { - SSL_shutdown(self->ssl); - SSL_free(self->ssl); - self->ssl = NULL; + if (-1 == self->sockfd) { + return AMQP_STATUS_SOCKET_CLOSED; } - if (-1 != self->sockfd) { - if (amqp_os_socket_close(self->sockfd)) { - return AMQP_STATUS_SOCKET_ERROR; - } + SSL_shutdown(self->ssl); + SSL_free(self->ssl); + self->ssl = NULL; - self->sockfd = -1; + if (amqp_os_socket_close(self->sockfd)) { + return AMQP_STATUS_SOCKET_ERROR; } + self->sockfd = -1; return AMQP_STATUS_OK; } diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index ed38c06..5bece5b 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -46,10 +46,13 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t res; - const char *buf_left = buf; ssize_t len_left = len; + if (-1 == self->sockfd) { + return AMQP_STATUS_SOCKET_CLOSED; + } + #ifdef MSG_NOSIGNAL flags |= MSG_NOSIGNAL; #endif @@ -89,6 +92,9 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t ret; + if (-1 == self->sockfd) { + return AMQP_STATUS_SOCKET_CLOSED; + } #if defined(_WIN32) DWORD res; @@ -201,6 +207,9 @@ amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t ret; + if (-1 == self->sockfd) { + return AMQP_STATUS_SOCKET_CLOSED; + } start: ret = recv(self->sockfd, buf, len, flags); @@ -223,6 +232,9 @@ static int amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + if (-1 != self->sockfd) { + return AMQP_STATUS_SOCKET_INUSE; + } self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { int err = self->sockfd; @@ -236,13 +248,14 @@ static int amqp_tcp_socket_close(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + if (-1 == self->sockfd) { + return AMQP_STATUS_SOCKET_CLOSED; + } - if (-1 != self->sockfd) { - if (amqp_os_socket_close(self->sockfd)) { - return AMQP_STATUS_SOCKET_ERROR; - } - self->sockfd = -1; + if (amqp_os_socket_close(self->sockfd)) { + return AMQP_STATUS_SOCKET_ERROR; } + self->sockfd = -1; return AMQP_STATUS_OK; } |