summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-07-03 12:35:27 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-07-03 12:35:27 -0700
commitda9c2c109ad9740177adfc93e5e92cba92c56134 (patch)
treef89f2512aea79d7abb64aa1f7900057eae44f8c4
parent448ab68ac6299b73f6ccd697949d2d566d5a1a64 (diff)
downloadrabbitmq-c-da9c2c109ad9740177adfc93e5e92cba92c56134.tar.gz
Make connection the socket object owner
Improve the socket interface by making the amqp_connection_state_t object the amqp_socket_t owner, and tie its lifetime to the connection's lifetime. This prevents a class of silly errors where the socket object isn't freed, or the socket object is assigned to two different connection objects
-rw-r--r--examples/amqp_bind.c3
-rw-r--r--examples/amqp_consumer.c3
-rw-r--r--examples/amqp_exchange_declare.c3
-rw-r--r--examples/amqp_listen.c3
-rw-r--r--examples/amqp_listenq.c3
-rw-r--r--examples/amqp_producer.c3
-rw-r--r--examples/amqp_rpc_sendstring_client.c3
-rw-r--r--examples/amqp_sendstring.c3
-rw-r--r--examples/amqp_unbind.c3
-rw-r--r--examples/amqps_bind.c3
-rw-r--r--examples/amqps_consumer.c3
-rw-r--r--examples/amqps_exchange_declare.c3
-rw-r--r--examples/amqps_listen.c3
-rw-r--r--examples/amqps_listenq.c3
-rw-r--r--examples/amqps_producer.c3
-rw-r--r--examples/amqps_sendstring.c3
-rw-r--r--examples/amqps_unbind.c3
-rw-r--r--librabbitmq/amqp.h20
-rw-r--r--librabbitmq/amqp_connection.c13
-rw-r--r--librabbitmq/amqp_openssl.c56
-rw-r--r--librabbitmq/amqp_socket.c15
-rw-r--r--librabbitmq/amqp_socket.h26
-rw-r--r--librabbitmq/amqp_ssl_socket.h2
-rw-r--r--librabbitmq/amqp_tcp_socket.c37
-rw-r--r--librabbitmq/amqp_tcp_socket.h2
-rw-r--r--tools/common.c5
26 files changed, 138 insertions, 89 deletions
diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c
index 765e746..de1e0a5 100644
--- a/examples/amqp_bind.c
+++ b/examples/amqp_bind.c
@@ -68,7 +68,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
@@ -78,7 +78,6 @@ int main(int argc, char const *const *argv)
die("opening TCP socket");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index 72bf654..21a5b48 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -146,7 +146,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
@@ -156,7 +156,6 @@ int main(int argc, char const *const *argv)
die("opening TCP socket");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c
index 55860e5..9a20a62 100644
--- a/examples/amqp_exchange_declare.c
+++ b/examples/amqp_exchange_declare.c
@@ -66,7 +66,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
@@ -76,7 +76,6 @@ int main(int argc, char const *const *argv)
die("opening TCP socket");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index 9385c17..bf5b716 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -70,7 +70,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
@@ -80,7 +80,6 @@ int main(int argc, char const *const *argv)
die("opening TCP socket");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c
index 54c1189..e76cdb1 100644
--- a/examples/amqp_listenq.c
+++ b/examples/amqp_listenq.c
@@ -66,7 +66,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
@@ -76,7 +76,6 @@ int main(int argc, char const *const *argv)
die("opening TCP socket");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c
index efa1a20..948d8f6 100644
--- a/examples/amqp_producer.c
+++ b/examples/amqp_producer.c
@@ -130,7 +130,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
@@ -140,7 +140,6 @@ int main(int argc, char const *const *argv)
die("opening TCP socket");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_rpc_sendstring_client.c b/examples/amqp_rpc_sendstring_client.c
index 6688195..84e7fdd 100644
--- a/examples/amqp_rpc_sendstring_client.c
+++ b/examples/amqp_rpc_sendstring_client.c
@@ -75,7 +75,7 @@ int main(int argc, char *argv[])
conn = amqp_new_connection();
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
@@ -85,7 +85,6 @@ int main(int argc, char *argv[])
die("opening TCP socket");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c
index 0b64024..bc48054 100644
--- a/examples/amqp_sendstring.c
+++ b/examples/amqp_sendstring.c
@@ -68,7 +68,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
@@ -78,7 +78,6 @@ int main(int argc, char const *const *argv)
die("opening TCP socket");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c
index 7948d0b..1ca3e83 100644
--- a/examples/amqp_unbind.c
+++ b/examples/amqp_unbind.c
@@ -68,7 +68,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
@@ -78,7 +78,6 @@ int main(int argc, char const *const *argv)
die("opening TCP socket");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_bind.c b/examples/amqps_bind.c
index fbde025..35c845f 100644
--- a/examples/amqps_bind.c
+++ b/examples/amqps_bind.c
@@ -71,7 +71,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_ssl_socket_new();
+ socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
@@ -95,7 +95,6 @@ int main(int argc, char const *const *argv)
die("opening SSL/TLS connection");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c
index 137457f..fff6677 100644
--- a/examples/amqps_consumer.c
+++ b/examples/amqps_consumer.c
@@ -148,7 +148,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_ssl_socket_new();
+ socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
@@ -172,7 +172,6 @@ int main(int argc, char const *const *argv)
die("opening SSL/TLS connection");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_exchange_declare.c b/examples/amqps_exchange_declare.c
index bae2f57..85a29aa 100644
--- a/examples/amqps_exchange_declare.c
+++ b/examples/amqps_exchange_declare.c
@@ -69,7 +69,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_ssl_socket_new();
+ socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
@@ -93,7 +93,6 @@ int main(int argc, char const *const *argv)
die("opening SSL/TLS connection");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c
index 0e45162..a5eb692 100644
--- a/examples/amqps_listen.c
+++ b/examples/amqps_listen.c
@@ -73,7 +73,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_ssl_socket_new();
+ socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
@@ -97,7 +97,6 @@ int main(int argc, char const *const *argv)
die("opening SSL/TLS connection");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c
index 321c6a3..0210d88 100644
--- a/examples/amqps_listenq.c
+++ b/examples/amqps_listenq.c
@@ -69,7 +69,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_ssl_socket_new();
+ socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
@@ -93,7 +93,6 @@ int main(int argc, char const *const *argv)
die("opening SSL/TLS connection");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_producer.c b/examples/amqps_producer.c
index f8f6dc6..25f850b 100644
--- a/examples/amqps_producer.c
+++ b/examples/amqps_producer.c
@@ -133,7 +133,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_ssl_socket_new();
+ socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
@@ -157,7 +157,6 @@ int main(int argc, char const *const *argv)
die("opening SSL/TLS connection");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_sendstring.c b/examples/amqps_sendstring.c
index 7465ef2..fe3ac67 100644
--- a/examples/amqps_sendstring.c
+++ b/examples/amqps_sendstring.c
@@ -71,7 +71,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_ssl_socket_new();
+ socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
@@ -95,7 +95,6 @@ int main(int argc, char const *const *argv)
die("opening SSL/TLS connection");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_unbind.c b/examples/amqps_unbind.c
index bae017d..7f4737e 100644
--- a/examples/amqps_unbind.c
+++ b/examples/amqps_unbind.c
@@ -71,7 +71,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_ssl_socket_new();
+ socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
@@ -95,7 +95,6 @@ int main(int argc, char const *const *argv)
die("opening SSL/TLS connection");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 7f479c8..704e9fc 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -670,22 +670,6 @@ AMQP_CALL
amqp_socket_open(amqp_socket_t *self, const char *host, int port);
/**
- * Close a socket connection and free resources.
- *
- * This function closes a socket connection and releases any resources used by
- * the object. After calling this function the specified socket should no
- * longer be referenced.
- *
- * \param [in,out] self A socket object.
- *
- * \return Zero upon success, non-zero otherwise.
- */
-AMQP_PUBLIC_FUNCTION
-int
-AMQP_CALL
-amqp_socket_close(amqp_socket_t *self);
-
-/**
* Retrieve an error code for the last socket operation.
*
* At the time of writing, this interface is not well supported and is subject
@@ -716,6 +700,10 @@ int
AMQP_CALL
amqp_socket_get_sockfd(amqp_socket_t *self);
+AMQP_PUBLIC_FUNCTION
+amqp_socket_t *
+amqp_get_socket(amqp_connection_state_t state);
+
AMQP_END_DECLS
#include <amqp_framing.h>
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index d5c29b0..df6a462 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -107,20 +107,25 @@ int amqp_get_sockfd(amqp_connection_state_t state)
void amqp_set_sockfd(amqp_connection_state_t state,
int sockfd)
{
- amqp_socket_t *socket = amqp_tcp_socket_new();
+ amqp_socket_t *socket = amqp_tcp_socket_new(state);
if (!socket) {
amqp_abort("%s", strerror(errno));
}
amqp_tcp_socket_set_sockfd(socket, sockfd);
- amqp_set_socket(state, socket);
}
void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket)
{
- amqp_socket_close(state->socket);
+ amqp_socket_delete(state->socket);
state->socket = socket;
}
+amqp_socket_t *
+amqp_get_socket(amqp_connection_state_t state)
+{
+ return state->socket;
+}
+
int amqp_tune_connection(amqp_connection_state_t state,
int channel_max,
int frame_max,
@@ -175,7 +180,7 @@ int amqp_destroy_connection(amqp_connection_state_t state)
free(state->outbound_buffer.bytes);
free(state->sock_inbound_buffer.bytes);
- status = amqp_socket_close(state->socket);
+ amqp_socket_delete(state->socket);
free(state);
}
return status;
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index 0f6c12c..cf0fc5b 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -300,15 +300,22 @@ static int
amqp_ssl_socket_close(void *base)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- if (self) {
+
+ if (self->ssl) {
+ SSL_shutdown(self->ssl);
SSL_free(self->ssl);
- amqp_os_socket_close(self->sockfd);
- SSL_CTX_free(self->ctx);
- free(self->buffer);
- free(self);
+ self->ssl = NULL;
}
- destroy_openssl();
- return 0;
+
+ if (-1 != self->sockfd) {
+ if (amqp_os_socket_close(self->sockfd)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ self->sockfd = -1;
+ }
+
+ return AMQP_STATUS_OK;
}
static int
@@ -331,6 +338,21 @@ amqp_ssl_socket_get_sockfd(void *base)
return self->sockfd;
}
+static void
+amqp_ssl_socket_delete(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+
+ if (self) {
+ amqp_ssl_socket_close(self);
+
+ SSL_CTX_free(self->ctx);
+ free(self->buffer);
+ free(self);
+ }
+ destroy_openssl();
+}
+
static const struct amqp_socket_class_t amqp_ssl_socket_class = {
amqp_ssl_socket_writev, /* writev */
amqp_ssl_socket_send, /* send */
@@ -338,30 +360,38 @@ static const struct amqp_socket_class_t amqp_ssl_socket_class = {
amqp_ssl_socket_open, /* open */
amqp_ssl_socket_close, /* close */
amqp_ssl_socket_error, /* error */
- amqp_ssl_socket_get_sockfd /* get_sockfd */
+ amqp_ssl_socket_get_sockfd, /* get_sockfd */
+ amqp_ssl_socket_delete /* delete */
};
amqp_socket_t *
-amqp_ssl_socket_new(void)
+amqp_ssl_socket_new(amqp_connection_state_t state)
{
struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
int status;
if (!self) {
- goto error;
+ return NULL;
}
+
+ self->sockfd = -1;
+ self->klass = &amqp_ssl_socket_class;
+ self->verify = 1;
+
status = initialize_openssl();
if (status) {
goto error;
}
+
self->ctx = SSL_CTX_new(SSLv23_client_method());
if (!self->ctx) {
goto error;
}
- self->klass = &amqp_ssl_socket_class;
- self->verify = 1;
+
+ amqp_set_socket(state, (amqp_socket_t *)self);
+
return (amqp_socket_t *)self;
error:
- amqp_socket_close((amqp_socket_t *)self);
+ amqp_ssl_socket_delete((amqp_socket_t *)self);
return NULL;
}
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 441192a..575189f 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -188,11 +188,18 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port)
int
amqp_socket_close(amqp_socket_t *self)
{
+ assert(self);
+ assert(self->klass->close);
+ return self->klass->close(self);
+}
+
+void
+amqp_socket_delete(amqp_socket_t *self)
+{
if (self) {
- assert(self->klass->close);
- return self->klass->close(self);
+ assert(self->klass->delete);
+ self->klass->delete(self);
}
- return AMQP_STATUS_OK;
}
int
@@ -600,7 +607,6 @@ beginrecv:
if (AMQP_STATUS_TIMEOUT == res) {
if (next_timestamp == state->next_recv_heartbeat) {
amqp_socket_close(state->socket);
- state->socket = NULL;
return AMQP_STATUS_HEARTBEAT_TIMEOUT;
} else if (next_timestamp == timeout_timestamp) {
return AMQP_STATUS_TIMEOUT;
@@ -654,7 +660,6 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
|| frame.frame_type != AMQP_FRAME_METHOD
|| frame.payload.method.id != expected_method) {
amqp_socket_close(state->socket);
- state->socket = NULL;
return AMQP_STATUS_WRONG_METHOD;
}
*output = frame.payload.method;
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index 52815fe..e0a1b85 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -50,6 +50,7 @@ typedef int (*amqp_socket_open_fn)(void *, const char *, int);
typedef int (*amqp_socket_close_fn)(void *);
typedef int (*amqp_socket_error_fn)(void *);
typedef int (*amqp_socket_get_sockfd_fn)(void *);
+typedef void (*amqp_socket_delete_fn)(void *);
/** V-table for amqp_socket_t */
struct amqp_socket_class_t {
@@ -60,6 +61,7 @@ struct amqp_socket_class_t {
amqp_socket_close_fn close;
amqp_socket_error_fn error;
amqp_socket_get_sockfd_fn get_sockfd;
+ amqp_socket_delete_fn delete;
};
/** Abstract base class for amqp_socket_t */
@@ -127,6 +129,30 @@ amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len);
ssize_t
amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags);
+/**
+ * Close a socket connection and free resources.
+ *
+ * This function closes a socket connection and releases any resources used by
+ * the object. After calling this function the specified socket should no
+ * longer be referenced.
+ *
+ * \param [in,out] self A socket object.
+ *
+ * \return Zero upon success, non-zero otherwise.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_close(amqp_socket_t *self);
+
+/**
+ * Destroy a socket object
+ *
+ * \param [in] self the socket object to delete
+ */
+void
+amqp_socket_delete(amqp_socket_t *self);
+
AMQP_END_DECLS
#endif /* AMQP_SOCKET_H */
diff --git a/librabbitmq/amqp_ssl_socket.h b/librabbitmq/amqp_ssl_socket.h
index 3bfce51..0da70a7 100644
--- a/librabbitmq/amqp_ssl_socket.h
+++ b/librabbitmq/amqp_ssl_socket.h
@@ -42,7 +42,7 @@ AMQP_BEGIN_DECLS
AMQP_PUBLIC_FUNCTION
amqp_socket_t *
AMQP_CALL
-amqp_ssl_socket_new(void);
+amqp_ssl_socket_new(amqp_connection_state_t state);
/**
* Set the CA certificate.
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index e43a596..5eae027 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -236,18 +236,15 @@ static int
amqp_tcp_socket_close(void *base)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- int status = -1;
- if (self) {
- status = amqp_os_socket_close(self->sockfd);
- free(self->buffer);
- free(self);
- }
- if (0 == status) {
- return AMQP_STATUS_OK;
- } else {
- return AMQP_STATUS_SOCKET_ERROR;
+ if (-1 != self->sockfd) {
+ if (amqp_os_socket_close(self->sockfd)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+ self->sockfd = -1;
}
+
+ return AMQP_STATUS_OK;
}
static int
@@ -264,6 +261,18 @@ amqp_tcp_socket_get_sockfd(void *base)
return self->sockfd;
}
+static void
+amqp_tcp_socket_delete(void *base)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+
+ if (self) {
+ amqp_tcp_socket_close(self);
+ free(self->buffer);
+ free(self);
+ }
+}
+
static const struct amqp_socket_class_t amqp_tcp_socket_class = {
amqp_tcp_socket_writev, /* writev */
amqp_tcp_socket_send, /* send */
@@ -271,11 +280,12 @@ static const struct amqp_socket_class_t amqp_tcp_socket_class = {
amqp_tcp_socket_open, /* open */
amqp_tcp_socket_close, /* close */
amqp_tcp_socket_error, /* error */
- amqp_tcp_socket_get_sockfd /* get_sockfd */
+ amqp_tcp_socket_get_sockfd, /* get_sockfd */
+ amqp_tcp_socket_delete /* delete */
};
amqp_socket_t *
-amqp_tcp_socket_new(void)
+amqp_tcp_socket_new(amqp_connection_state_t state)
{
struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
if (!self) {
@@ -283,6 +293,9 @@ amqp_tcp_socket_new(void)
}
self->klass = &amqp_tcp_socket_class;
self->sockfd = -1;
+
+ amqp_set_socket(state, (amqp_socket_t *)self);
+
return (amqp_socket_t *)self;
}
diff --git a/librabbitmq/amqp_tcp_socket.h b/librabbitmq/amqp_tcp_socket.h
index 4c8ba54..2fcd2cb 100644
--- a/librabbitmq/amqp_tcp_socket.h
+++ b/librabbitmq/amqp_tcp_socket.h
@@ -42,7 +42,7 @@ AMQP_BEGIN_DECLS
AMQP_PUBLIC_FUNCTION
amqp_socket_t *
AMQP_CALL
-amqp_tcp_socket_new(void);
+amqp_tcp_socket_new(amqp_connection_state_t state);
/**
* Assign an open file descriptor to a socket object.
diff --git a/tools/common.c b/tools/common.c
index a624105..06b41a9 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -332,7 +332,7 @@ amqp_connection_state_t make_connection(void)
conn = amqp_new_connection();
if (ci.ssl) {
#ifdef WITH_SSL
- socket = amqp_ssl_socket_new();
+ socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
@@ -346,7 +346,7 @@ amqp_connection_state_t make_connection(void)
die("librabbitmq was not built with SSL/TLS support");
#endif
} else {
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket (out of memory)");
}
@@ -355,7 +355,6 @@ amqp_connection_state_t make_connection(void)
if (status) {
die("opening socket to %s:%d", ci.host, ci.port);
}
- amqp_set_socket(conn, socket);
die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN,
ci.user, ci.password),