summaryrefslogtreecommitdiff
path: root/librabbitmq
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-07-03 12:35:27 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-07-03 12:35:27 -0700
commitda9c2c109ad9740177adfc93e5e92cba92c56134 (patch)
treef89f2512aea79d7abb64aa1f7900057eae44f8c4 /librabbitmq
parent448ab68ac6299b73f6ccd697949d2d566d5a1a64 (diff)
downloadrabbitmq-c-github-ask-da9c2c109ad9740177adfc93e5e92cba92c56134.tar.gz
Make connection the socket object owner
Improve the socket interface by making the amqp_connection_state_t object the amqp_socket_t owner, and tie its lifetime to the connection's lifetime. This prevents a class of silly errors where the socket object isn't freed, or the socket object is assigned to two different connection objects
Diffstat (limited to 'librabbitmq')
-rw-r--r--librabbitmq/amqp.h20
-rw-r--r--librabbitmq/amqp_connection.c13
-rw-r--r--librabbitmq/amqp_openssl.c56
-rw-r--r--librabbitmq/amqp_socket.c15
-rw-r--r--librabbitmq/amqp_socket.h26
-rw-r--r--librabbitmq/amqp_ssl_socket.h2
-rw-r--r--librabbitmq/amqp_tcp_socket.c37
-rw-r--r--librabbitmq/amqp_tcp_socket.h2
8 files changed, 119 insertions, 52 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 7f479c8..704e9fc 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -670,22 +670,6 @@ AMQP_CALL
amqp_socket_open(amqp_socket_t *self, const char *host, int port);
/**
- * Close a socket connection and free resources.
- *
- * This function closes a socket connection and releases any resources used by
- * the object. After calling this function the specified socket should no
- * longer be referenced.
- *
- * \param [in,out] self A socket object.
- *
- * \return Zero upon success, non-zero otherwise.
- */
-AMQP_PUBLIC_FUNCTION
-int
-AMQP_CALL
-amqp_socket_close(amqp_socket_t *self);
-
-/**
* Retrieve an error code for the last socket operation.
*
* At the time of writing, this interface is not well supported and is subject
@@ -716,6 +700,10 @@ int
AMQP_CALL
amqp_socket_get_sockfd(amqp_socket_t *self);
+AMQP_PUBLIC_FUNCTION
+amqp_socket_t *
+amqp_get_socket(amqp_connection_state_t state);
+
AMQP_END_DECLS
#include <amqp_framing.h>
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index d5c29b0..df6a462 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -107,20 +107,25 @@ int amqp_get_sockfd(amqp_connection_state_t state)
void amqp_set_sockfd(amqp_connection_state_t state,
int sockfd)
{
- amqp_socket_t *socket = amqp_tcp_socket_new();
+ amqp_socket_t *socket = amqp_tcp_socket_new(state);
if (!socket) {
amqp_abort("%s", strerror(errno));
}
amqp_tcp_socket_set_sockfd(socket, sockfd);
- amqp_set_socket(state, socket);
}
void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket)
{
- amqp_socket_close(state->socket);
+ amqp_socket_delete(state->socket);
state->socket = socket;
}
+amqp_socket_t *
+amqp_get_socket(amqp_connection_state_t state)
+{
+ return state->socket;
+}
+
int amqp_tune_connection(amqp_connection_state_t state,
int channel_max,
int frame_max,
@@ -175,7 +180,7 @@ int amqp_destroy_connection(amqp_connection_state_t state)
free(state->outbound_buffer.bytes);
free(state->sock_inbound_buffer.bytes);
- status = amqp_socket_close(state->socket);
+ amqp_socket_delete(state->socket);
free(state);
}
return status;
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index 0f6c12c..cf0fc5b 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -300,15 +300,22 @@ static int
amqp_ssl_socket_close(void *base)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- if (self) {
+
+ if (self->ssl) {
+ SSL_shutdown(self->ssl);
SSL_free(self->ssl);
- amqp_os_socket_close(self->sockfd);
- SSL_CTX_free(self->ctx);
- free(self->buffer);
- free(self);
+ self->ssl = NULL;
}
- destroy_openssl();
- return 0;
+
+ if (-1 != self->sockfd) {
+ if (amqp_os_socket_close(self->sockfd)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ self->sockfd = -1;
+ }
+
+ return AMQP_STATUS_OK;
}
static int
@@ -331,6 +338,21 @@ amqp_ssl_socket_get_sockfd(void *base)
return self->sockfd;
}
+static void
+amqp_ssl_socket_delete(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+
+ if (self) {
+ amqp_ssl_socket_close(self);
+
+ SSL_CTX_free(self->ctx);
+ free(self->buffer);
+ free(self);
+ }
+ destroy_openssl();
+}
+
static const struct amqp_socket_class_t amqp_ssl_socket_class = {
amqp_ssl_socket_writev, /* writev */
amqp_ssl_socket_send, /* send */
@@ -338,30 +360,38 @@ static const struct amqp_socket_class_t amqp_ssl_socket_class = {
amqp_ssl_socket_open, /* open */
amqp_ssl_socket_close, /* close */
amqp_ssl_socket_error, /* error */
- amqp_ssl_socket_get_sockfd /* get_sockfd */
+ amqp_ssl_socket_get_sockfd, /* get_sockfd */
+ amqp_ssl_socket_delete /* delete */
};
amqp_socket_t *
-amqp_ssl_socket_new(void)
+amqp_ssl_socket_new(amqp_connection_state_t state)
{
struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
int status;
if (!self) {
- goto error;
+ return NULL;
}
+
+ self->sockfd = -1;
+ self->klass = &amqp_ssl_socket_class;
+ self->verify = 1;
+
status = initialize_openssl();
if (status) {
goto error;
}
+
self->ctx = SSL_CTX_new(SSLv23_client_method());
if (!self->ctx) {
goto error;
}
- self->klass = &amqp_ssl_socket_class;
- self->verify = 1;
+
+ amqp_set_socket(state, (amqp_socket_t *)self);
+
return (amqp_socket_t *)self;
error:
- amqp_socket_close((amqp_socket_t *)self);
+ amqp_ssl_socket_delete((amqp_socket_t *)self);
return NULL;
}
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 441192a..575189f 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -188,11 +188,18 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port)
int
amqp_socket_close(amqp_socket_t *self)
{
+ assert(self);
+ assert(self->klass->close);
+ return self->klass->close(self);
+}
+
+void
+amqp_socket_delete(amqp_socket_t *self)
+{
if (self) {
- assert(self->klass->close);
- return self->klass->close(self);
+ assert(self->klass->delete);
+ self->klass->delete(self);
}
- return AMQP_STATUS_OK;
}
int
@@ -600,7 +607,6 @@ beginrecv:
if (AMQP_STATUS_TIMEOUT == res) {
if (next_timestamp == state->next_recv_heartbeat) {
amqp_socket_close(state->socket);
- state->socket = NULL;
return AMQP_STATUS_HEARTBEAT_TIMEOUT;
} else if (next_timestamp == timeout_timestamp) {
return AMQP_STATUS_TIMEOUT;
@@ -654,7 +660,6 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
|| frame.frame_type != AMQP_FRAME_METHOD
|| frame.payload.method.id != expected_method) {
amqp_socket_close(state->socket);
- state->socket = NULL;
return AMQP_STATUS_WRONG_METHOD;
}
*output = frame.payload.method;
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index 52815fe..e0a1b85 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -50,6 +50,7 @@ typedef int (*amqp_socket_open_fn)(void *, const char *, int);
typedef int (*amqp_socket_close_fn)(void *);
typedef int (*amqp_socket_error_fn)(void *);
typedef int (*amqp_socket_get_sockfd_fn)(void *);
+typedef void (*amqp_socket_delete_fn)(void *);
/** V-table for amqp_socket_t */
struct amqp_socket_class_t {
@@ -60,6 +61,7 @@ struct amqp_socket_class_t {
amqp_socket_close_fn close;
amqp_socket_error_fn error;
amqp_socket_get_sockfd_fn get_sockfd;
+ amqp_socket_delete_fn delete;
};
/** Abstract base class for amqp_socket_t */
@@ -127,6 +129,30 @@ amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len);
ssize_t
amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags);
+/**
+ * Close a socket connection and free resources.
+ *
+ * This function closes a socket connection and releases any resources used by
+ * the object. After calling this function the specified socket should no
+ * longer be referenced.
+ *
+ * \param [in,out] self A socket object.
+ *
+ * \return Zero upon success, non-zero otherwise.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_close(amqp_socket_t *self);
+
+/**
+ * Destroy a socket object
+ *
+ * \param [in] self the socket object to delete
+ */
+void
+amqp_socket_delete(amqp_socket_t *self);
+
AMQP_END_DECLS
#endif /* AMQP_SOCKET_H */
diff --git a/librabbitmq/amqp_ssl_socket.h b/librabbitmq/amqp_ssl_socket.h
index 3bfce51..0da70a7 100644
--- a/librabbitmq/amqp_ssl_socket.h
+++ b/librabbitmq/amqp_ssl_socket.h
@@ -42,7 +42,7 @@ AMQP_BEGIN_DECLS
AMQP_PUBLIC_FUNCTION
amqp_socket_t *
AMQP_CALL
-amqp_ssl_socket_new(void);
+amqp_ssl_socket_new(amqp_connection_state_t state);
/**
* Set the CA certificate.
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index e43a596..5eae027 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -236,18 +236,15 @@ static int
amqp_tcp_socket_close(void *base)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- int status = -1;
- if (self) {
- status = amqp_os_socket_close(self->sockfd);
- free(self->buffer);
- free(self);
- }
- if (0 == status) {
- return AMQP_STATUS_OK;
- } else {
- return AMQP_STATUS_SOCKET_ERROR;
+ if (-1 != self->sockfd) {
+ if (amqp_os_socket_close(self->sockfd)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+ self->sockfd = -1;
}
+
+ return AMQP_STATUS_OK;
}
static int
@@ -264,6 +261,18 @@ amqp_tcp_socket_get_sockfd(void *base)
return self->sockfd;
}
+static void
+amqp_tcp_socket_delete(void *base)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+
+ if (self) {
+ amqp_tcp_socket_close(self);
+ free(self->buffer);
+ free(self);
+ }
+}
+
static const struct amqp_socket_class_t amqp_tcp_socket_class = {
amqp_tcp_socket_writev, /* writev */
amqp_tcp_socket_send, /* send */
@@ -271,11 +280,12 @@ static const struct amqp_socket_class_t amqp_tcp_socket_class = {
amqp_tcp_socket_open, /* open */
amqp_tcp_socket_close, /* close */
amqp_tcp_socket_error, /* error */
- amqp_tcp_socket_get_sockfd /* get_sockfd */
+ amqp_tcp_socket_get_sockfd, /* get_sockfd */
+ amqp_tcp_socket_delete /* delete */
};
amqp_socket_t *
-amqp_tcp_socket_new(void)
+amqp_tcp_socket_new(amqp_connection_state_t state)
{
struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
if (!self) {
@@ -283,6 +293,9 @@ amqp_tcp_socket_new(void)
}
self->klass = &amqp_tcp_socket_class;
self->sockfd = -1;
+
+ amqp_set_socket(state, (amqp_socket_t *)self);
+
return (amqp_socket_t *)self;
}
diff --git a/librabbitmq/amqp_tcp_socket.h b/librabbitmq/amqp_tcp_socket.h
index 4c8ba54..2fcd2cb 100644
--- a/librabbitmq/amqp_tcp_socket.h
+++ b/librabbitmq/amqp_tcp_socket.h
@@ -42,7 +42,7 @@ AMQP_BEGIN_DECLS
AMQP_PUBLIC_FUNCTION
amqp_socket_t *
AMQP_CALL
-amqp_tcp_socket_new(void);
+amqp_tcp_socket_new(amqp_connection_state_t state);
/**
* Assign an open file descriptor to a socket object.