summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2014-12-29 22:42:21 -0800
committerAlan Antonuk <alan.antonuk@gmail.com>2014-12-29 22:42:21 -0800
commitee54e274fe3c8d41bba0f274b997a587bff2482b (patch)
tree55756e4752e58112a5a9d61642047394eee2419b
parent9626dd5cd5f78894f1416a1afd2d624ddd4904ae (diff)
downloadrabbitmq-c-ee54e274fe3c8d41bba0f274b997a587bff2482b.tar.gz
Check for double close/open in socket impl
Check to state of socket when doing open/read/write/close to prevent double-open and double-close issues with the socket implementation. Fixes #228
-rw-r--r--librabbitmq/amqp.h4
-rw-r--r--librabbitmq/amqp_openssl.c27
-rw-r--r--librabbitmq/amqp_tcp_socket.c25
3 files changed, 41 insertions, 15 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 9fcd206..4ea0084 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -698,6 +698,10 @@ typedef enum amqp_status_enum_
heartbeat */
AMQP_STATUS_UNEXPECTED_STATE = -0x0010, /**< Unexpected protocol
state */
+ AMQP_STATUS_SOCKET_CLOSED = -0x0011, /**< Underlying socket is
+ closed */
+ AMQP_STATUS_SOCKET_INUSE = -0x0012, /**< Underlying socket is
+ already open */
AMQP_STATUS_TCP_ERROR = -0x0100, /**< A generic TCP error
occurred */
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index 393a84e..85b9ca5 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -117,6 +117,10 @@ amqp_ssl_socket_writev(void *base,
char *bufferp;
size_t bytes;
int i;
+ if (-1 == self->sockfd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
+ }
+
bytes = 0;
for (i = 0; i < iovcnt; ++i) {
bytes += iov[i].iov_len;
@@ -148,6 +152,9 @@ amqp_ssl_socket_recv(void *base,
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
ssize_t received;
+ if (-1 == self->sockfd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
+ }
ERR_clear_error();
self->internal_error = 0;
@@ -232,6 +239,9 @@ amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *tim
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
long result;
int status;
+ if (-1 != self->sockfd) {
+ return AMQP_STATUS_SOCKET_INUSE;
+ }
ERR_clear_error();
self->ssl = SSL_new(self->ctx);
@@ -301,19 +311,18 @@ amqp_ssl_socket_close(void *base)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- if (self->ssl) {
- SSL_shutdown(self->ssl);
- SSL_free(self->ssl);
- self->ssl = NULL;
+ if (-1 == self->sockfd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
}
- if (-1 != self->sockfd) {
- if (amqp_os_socket_close(self->sockfd)) {
- return AMQP_STATUS_SOCKET_ERROR;
- }
+ SSL_shutdown(self->ssl);
+ SSL_free(self->ssl);
+ self->ssl = NULL;
- self->sockfd = -1;
+ if (amqp_os_socket_close(self->sockfd)) {
+ return AMQP_STATUS_SOCKET_ERROR;
}
+ self->sockfd = -1;
return AMQP_STATUS_OK;
}
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index ed38c06..5bece5b 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -46,10 +46,13 @@ amqp_tcp_socket_send_inner(void *base, const void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t res;
-
const char *buf_left = buf;
ssize_t len_left = len;
+ if (-1 == self->sockfd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
+ }
+
#ifdef MSG_NOSIGNAL
flags |= MSG_NOSIGNAL;
#endif
@@ -89,6 +92,9 @@ amqp_tcp_socket_writev(void *base, struct iovec *iov, int iovcnt)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t ret;
+ if (-1 == self->sockfd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
+ }
#if defined(_WIN32)
DWORD res;
@@ -201,6 +207,9 @@ amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t ret;
+ if (-1 == self->sockfd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
+ }
start:
ret = recv(self->sockfd, buf, len, flags);
@@ -223,6 +232,9 @@ static int
amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ if (-1 != self->sockfd) {
+ return AMQP_STATUS_SOCKET_INUSE;
+ }
self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
int err = self->sockfd;
@@ -236,13 +248,14 @@ static int
amqp_tcp_socket_close(void *base)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ if (-1 == self->sockfd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
+ }
- if (-1 != self->sockfd) {
- if (amqp_os_socket_close(self->sockfd)) {
- return AMQP_STATUS_SOCKET_ERROR;
- }
- self->sockfd = -1;
+ if (amqp_os_socket_close(self->sockfd)) {
+ return AMQP_STATUS_SOCKET_ERROR;
}
+ self->sockfd = -1;
return AMQP_STATUS_OK;
}