diff options
-rw-r--r-- | examples/amqp_bind.c | 3 | ||||
-rw-r--r-- | examples/amqp_consumer.c | 3 | ||||
-rw-r--r-- | examples/amqp_exchange_declare.c | 3 | ||||
-rw-r--r-- | examples/amqp_listen.c | 3 | ||||
-rw-r--r-- | examples/amqp_listenq.c | 3 | ||||
-rw-r--r-- | examples/amqp_producer.c | 3 | ||||
-rw-r--r-- | examples/amqp_rpc_sendstring_client.c | 3 | ||||
-rw-r--r-- | examples/amqp_sendstring.c | 3 | ||||
-rw-r--r-- | examples/amqp_unbind.c | 3 | ||||
-rw-r--r-- | examples/amqps_bind.c | 3 | ||||
-rw-r--r-- | examples/amqps_consumer.c | 3 | ||||
-rw-r--r-- | examples/amqps_exchange_declare.c | 3 | ||||
-rw-r--r-- | examples/amqps_listen.c | 3 | ||||
-rw-r--r-- | examples/amqps_listenq.c | 3 | ||||
-rw-r--r-- | examples/amqps_producer.c | 3 | ||||
-rw-r--r-- | examples/amqps_sendstring.c | 3 | ||||
-rw-r--r-- | examples/amqps_unbind.c | 3 | ||||
-rw-r--r-- | librabbitmq/amqp.h | 20 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 13 | ||||
-rw-r--r-- | librabbitmq/amqp_openssl.c | 56 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 15 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 26 | ||||
-rw-r--r-- | librabbitmq/amqp_ssl_socket.h | 2 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 37 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.h | 2 | ||||
-rw-r--r-- | tools/common.c | 5 |
26 files changed, 138 insertions, 89 deletions
diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c index 765e746..de1e0a5 100644 --- a/examples/amqp_bind.c +++ b/examples/amqp_bind.c @@ -68,7 +68,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_tcp_socket_new(); + socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket"); } @@ -78,7 +78,6 @@ int main(int argc, char const *const *argv) die("opening TCP socket"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c index 72bf654..21a5b48 100644 --- a/examples/amqp_consumer.c +++ b/examples/amqp_consumer.c @@ -146,7 +146,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_tcp_socket_new(); + socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket"); } @@ -156,7 +156,6 @@ int main(int argc, char const *const *argv) die("opening TCP socket"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c index 55860e5..9a20a62 100644 --- a/examples/amqp_exchange_declare.c +++ b/examples/amqp_exchange_declare.c @@ -66,7 +66,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_tcp_socket_new(); + socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket"); } @@ -76,7 +76,6 @@ int main(int argc, char const *const *argv) die("opening TCP socket"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c index 9385c17..bf5b716 100644 --- a/examples/amqp_listen.c +++ b/examples/amqp_listen.c @@ -70,7 +70,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_tcp_socket_new(); + socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket"); } @@ -80,7 +80,6 @@ int main(int argc, char const *const *argv) die("opening TCP socket"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c index 54c1189..e76cdb1 100644 --- a/examples/amqp_listenq.c +++ b/examples/amqp_listenq.c @@ -66,7 +66,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_tcp_socket_new(); + socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket"); } @@ -76,7 +76,6 @@ int main(int argc, char const *const *argv) die("opening TCP socket"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c index efa1a20..948d8f6 100644 --- a/examples/amqp_producer.c +++ b/examples/amqp_producer.c @@ -130,7 +130,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_tcp_socket_new(); + socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket"); } @@ -140,7 +140,6 @@ int main(int argc, char const *const *argv) die("opening TCP socket"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_rpc_sendstring_client.c b/examples/amqp_rpc_sendstring_client.c index 6688195..84e7fdd 100644 --- a/examples/amqp_rpc_sendstring_client.c +++ b/examples/amqp_rpc_sendstring_client.c @@ -75,7 +75,7 @@ int main(int argc, char *argv[]) conn = amqp_new_connection(); - socket = amqp_tcp_socket_new(); + socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket"); } @@ -85,7 +85,6 @@ int main(int argc, char *argv[]) die("opening TCP socket"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index 0b64024..bc48054 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -68,7 +68,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_tcp_socket_new(); + socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket"); } @@ -78,7 +78,6 @@ int main(int argc, char const *const *argv) die("opening TCP socket"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c index 7948d0b..1ca3e83 100644 --- a/examples/amqp_unbind.c +++ b/examples/amqp_unbind.c @@ -68,7 +68,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_tcp_socket_new(); + socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket"); } @@ -78,7 +78,6 @@ int main(int argc, char const *const *argv) die("opening TCP socket"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_bind.c b/examples/amqps_bind.c index fbde025..35c845f 100644 --- a/examples/amqps_bind.c +++ b/examples/amqps_bind.c @@ -71,7 +71,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_ssl_socket_new(); + socket = amqp_ssl_socket_new(conn); if (!socket) { die("creating SSL/TLS socket"); } @@ -95,7 +95,6 @@ int main(int argc, char const *const *argv) die("opening SSL/TLS connection"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c index 137457f..fff6677 100644 --- a/examples/amqps_consumer.c +++ b/examples/amqps_consumer.c @@ -148,7 +148,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_ssl_socket_new(); + socket = amqp_ssl_socket_new(conn); if (!socket) { die("creating SSL/TLS socket"); } @@ -172,7 +172,6 @@ int main(int argc, char const *const *argv) die("opening SSL/TLS connection"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_exchange_declare.c b/examples/amqps_exchange_declare.c index bae2f57..85a29aa 100644 --- a/examples/amqps_exchange_declare.c +++ b/examples/amqps_exchange_declare.c @@ -69,7 +69,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_ssl_socket_new(); + socket = amqp_ssl_socket_new(conn); if (!socket) { die("creating SSL/TLS socket"); } @@ -93,7 +93,6 @@ int main(int argc, char const *const *argv) die("opening SSL/TLS connection"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c index 0e45162..a5eb692 100644 --- a/examples/amqps_listen.c +++ b/examples/amqps_listen.c @@ -73,7 +73,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_ssl_socket_new(); + socket = amqp_ssl_socket_new(conn); if (!socket) { die("creating SSL/TLS socket"); } @@ -97,7 +97,6 @@ int main(int argc, char const *const *argv) die("opening SSL/TLS connection"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c index 321c6a3..0210d88 100644 --- a/examples/amqps_listenq.c +++ b/examples/amqps_listenq.c @@ -69,7 +69,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_ssl_socket_new(); + socket = amqp_ssl_socket_new(conn); if (!socket) { die("creating SSL/TLS socket"); } @@ -93,7 +93,6 @@ int main(int argc, char const *const *argv) die("opening SSL/TLS connection"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_producer.c b/examples/amqps_producer.c index f8f6dc6..25f850b 100644 --- a/examples/amqps_producer.c +++ b/examples/amqps_producer.c @@ -133,7 +133,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_ssl_socket_new(); + socket = amqp_ssl_socket_new(conn); if (!socket) { die("creating SSL/TLS socket"); } @@ -157,7 +157,6 @@ int main(int argc, char const *const *argv) die("opening SSL/TLS connection"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_sendstring.c b/examples/amqps_sendstring.c index 7465ef2..fe3ac67 100644 --- a/examples/amqps_sendstring.c +++ b/examples/amqps_sendstring.c @@ -71,7 +71,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_ssl_socket_new(); + socket = amqp_ssl_socket_new(conn); if (!socket) { die("creating SSL/TLS socket"); } @@ -95,7 +95,6 @@ int main(int argc, char const *const *argv) die("opening SSL/TLS connection"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_unbind.c b/examples/amqps_unbind.c index bae017d..7f4737e 100644 --- a/examples/amqps_unbind.c +++ b/examples/amqps_unbind.c @@ -71,7 +71,7 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - socket = amqp_ssl_socket_new(); + socket = amqp_ssl_socket_new(conn); if (!socket) { die("creating SSL/TLS socket"); } @@ -95,7 +95,6 @@ int main(int argc, char const *const *argv) die("opening SSL/TLS connection"); } - amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 7f479c8..704e9fc 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -670,22 +670,6 @@ AMQP_CALL amqp_socket_open(amqp_socket_t *self, const char *host, int port); /** - * Close a socket connection and free resources. - * - * This function closes a socket connection and releases any resources used by - * the object. After calling this function the specified socket should no - * longer be referenced. - * - * \param [in,out] self A socket object. - * - * \return Zero upon success, non-zero otherwise. - */ -AMQP_PUBLIC_FUNCTION -int -AMQP_CALL -amqp_socket_close(amqp_socket_t *self); - -/** * Retrieve an error code for the last socket operation. * * At the time of writing, this interface is not well supported and is subject @@ -716,6 +700,10 @@ int AMQP_CALL amqp_socket_get_sockfd(amqp_socket_t *self); +AMQP_PUBLIC_FUNCTION +amqp_socket_t * +amqp_get_socket(amqp_connection_state_t state); + AMQP_END_DECLS #include <amqp_framing.h> diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index d5c29b0..df6a462 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -107,20 +107,25 @@ int amqp_get_sockfd(amqp_connection_state_t state) void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { - amqp_socket_t *socket = amqp_tcp_socket_new(); + amqp_socket_t *socket = amqp_tcp_socket_new(state); if (!socket) { amqp_abort("%s", strerror(errno)); } amqp_tcp_socket_set_sockfd(socket, sockfd); - amqp_set_socket(state, socket); } void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket) { - amqp_socket_close(state->socket); + amqp_socket_delete(state->socket); state->socket = socket; } +amqp_socket_t * +amqp_get_socket(amqp_connection_state_t state) +{ + return state->socket; +} + int amqp_tune_connection(amqp_connection_state_t state, int channel_max, int frame_max, @@ -175,7 +180,7 @@ int amqp_destroy_connection(amqp_connection_state_t state) free(state->outbound_buffer.bytes); free(state->sock_inbound_buffer.bytes); - status = amqp_socket_close(state->socket); + amqp_socket_delete(state->socket); free(state); } return status; diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index 0f6c12c..cf0fc5b 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -300,15 +300,22 @@ static int amqp_ssl_socket_close(void *base) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - if (self) { + + if (self->ssl) { + SSL_shutdown(self->ssl); SSL_free(self->ssl); - amqp_os_socket_close(self->sockfd); - SSL_CTX_free(self->ctx); - free(self->buffer); - free(self); + self->ssl = NULL; } - destroy_openssl(); - return 0; + + if (-1 != self->sockfd) { + if (amqp_os_socket_close(self->sockfd)) { + return AMQP_STATUS_SOCKET_ERROR; + } + + self->sockfd = -1; + } + + return AMQP_STATUS_OK; } static int @@ -331,6 +338,21 @@ amqp_ssl_socket_get_sockfd(void *base) return self->sockfd; } +static void +amqp_ssl_socket_delete(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + + if (self) { + amqp_ssl_socket_close(self); + + SSL_CTX_free(self->ctx); + free(self->buffer); + free(self); + } + destroy_openssl(); +} + static const struct amqp_socket_class_t amqp_ssl_socket_class = { amqp_ssl_socket_writev, /* writev */ amqp_ssl_socket_send, /* send */ @@ -338,30 +360,38 @@ static const struct amqp_socket_class_t amqp_ssl_socket_class = { amqp_ssl_socket_open, /* open */ amqp_ssl_socket_close, /* close */ amqp_ssl_socket_error, /* error */ - amqp_ssl_socket_get_sockfd /* get_sockfd */ + amqp_ssl_socket_get_sockfd, /* get_sockfd */ + amqp_ssl_socket_delete /* delete */ }; amqp_socket_t * -amqp_ssl_socket_new(void) +amqp_ssl_socket_new(amqp_connection_state_t state) { struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); int status; if (!self) { - goto error; + return NULL; } + + self->sockfd = -1; + self->klass = &amqp_ssl_socket_class; + self->verify = 1; + status = initialize_openssl(); if (status) { goto error; } + self->ctx = SSL_CTX_new(SSLv23_client_method()); if (!self->ctx) { goto error; } - self->klass = &amqp_ssl_socket_class; - self->verify = 1; + + amqp_set_socket(state, (amqp_socket_t *)self); + return (amqp_socket_t *)self; error: - amqp_socket_close((amqp_socket_t *)self); + amqp_ssl_socket_delete((amqp_socket_t *)self); return NULL; } diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 441192a..575189f 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -188,11 +188,18 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port) int amqp_socket_close(amqp_socket_t *self) { + assert(self); + assert(self->klass->close); + return self->klass->close(self); +} + +void +amqp_socket_delete(amqp_socket_t *self) +{ if (self) { - assert(self->klass->close); - return self->klass->close(self); + assert(self->klass->delete); + self->klass->delete(self); } - return AMQP_STATUS_OK; } int @@ -600,7 +607,6 @@ beginrecv: if (AMQP_STATUS_TIMEOUT == res) { if (next_timestamp == state->next_recv_heartbeat) { amqp_socket_close(state->socket); - state->socket = NULL; return AMQP_STATUS_HEARTBEAT_TIMEOUT; } else if (next_timestamp == timeout_timestamp) { return AMQP_STATUS_TIMEOUT; @@ -654,7 +660,6 @@ int amqp_simple_wait_method(amqp_connection_state_t state, || frame.frame_type != AMQP_FRAME_METHOD || frame.payload.method.id != expected_method) { amqp_socket_close(state->socket); - state->socket = NULL; return AMQP_STATUS_WRONG_METHOD; } *output = frame.payload.method; diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index 52815fe..e0a1b85 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -50,6 +50,7 @@ typedef int (*amqp_socket_open_fn)(void *, const char *, int); typedef int (*amqp_socket_close_fn)(void *); typedef int (*amqp_socket_error_fn)(void *); typedef int (*amqp_socket_get_sockfd_fn)(void *); +typedef void (*amqp_socket_delete_fn)(void *); /** V-table for amqp_socket_t */ struct amqp_socket_class_t { @@ -60,6 +61,7 @@ struct amqp_socket_class_t { amqp_socket_close_fn close; amqp_socket_error_fn error; amqp_socket_get_sockfd_fn get_sockfd; + amqp_socket_delete_fn delete; }; /** Abstract base class for amqp_socket_t */ @@ -127,6 +129,30 @@ amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len); ssize_t amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags); +/** + * Close a socket connection and free resources. + * + * This function closes a socket connection and releases any resources used by + * the object. After calling this function the specified socket should no + * longer be referenced. + * + * \param [in,out] self A socket object. + * + * \return Zero upon success, non-zero otherwise. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_close(amqp_socket_t *self); + +/** + * Destroy a socket object + * + * \param [in] self the socket object to delete + */ +void +amqp_socket_delete(amqp_socket_t *self); + AMQP_END_DECLS #endif /* AMQP_SOCKET_H */ diff --git a/librabbitmq/amqp_ssl_socket.h b/librabbitmq/amqp_ssl_socket.h index 3bfce51..0da70a7 100644 --- a/librabbitmq/amqp_ssl_socket.h +++ b/librabbitmq/amqp_ssl_socket.h @@ -42,7 +42,7 @@ AMQP_BEGIN_DECLS AMQP_PUBLIC_FUNCTION amqp_socket_t * AMQP_CALL -amqp_ssl_socket_new(void); +amqp_ssl_socket_new(amqp_connection_state_t state); /** * Set the CA certificate. diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index e43a596..5eae027 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -236,18 +236,15 @@ static int amqp_tcp_socket_close(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - int status = -1; - if (self) { - status = amqp_os_socket_close(self->sockfd); - free(self->buffer); - free(self); - } - if (0 == status) { - return AMQP_STATUS_OK; - } else { - return AMQP_STATUS_SOCKET_ERROR; + if (-1 != self->sockfd) { + if (amqp_os_socket_close(self->sockfd)) { + return AMQP_STATUS_SOCKET_ERROR; + } + self->sockfd = -1; } + + return AMQP_STATUS_OK; } static int @@ -264,6 +261,18 @@ amqp_tcp_socket_get_sockfd(void *base) return self->sockfd; } +static void +amqp_tcp_socket_delete(void *base) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + + if (self) { + amqp_tcp_socket_close(self); + free(self->buffer); + free(self); + } +} + static const struct amqp_socket_class_t amqp_tcp_socket_class = { amqp_tcp_socket_writev, /* writev */ amqp_tcp_socket_send, /* send */ @@ -271,11 +280,12 @@ static const struct amqp_socket_class_t amqp_tcp_socket_class = { amqp_tcp_socket_open, /* open */ amqp_tcp_socket_close, /* close */ amqp_tcp_socket_error, /* error */ - amqp_tcp_socket_get_sockfd /* get_sockfd */ + amqp_tcp_socket_get_sockfd, /* get_sockfd */ + amqp_tcp_socket_delete /* delete */ }; amqp_socket_t * -amqp_tcp_socket_new(void) +amqp_tcp_socket_new(amqp_connection_state_t state) { struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); if (!self) { @@ -283,6 +293,9 @@ amqp_tcp_socket_new(void) } self->klass = &amqp_tcp_socket_class; self->sockfd = -1; + + amqp_set_socket(state, (amqp_socket_t *)self); + return (amqp_socket_t *)self; } diff --git a/librabbitmq/amqp_tcp_socket.h b/librabbitmq/amqp_tcp_socket.h index 4c8ba54..2fcd2cb 100644 --- a/librabbitmq/amqp_tcp_socket.h +++ b/librabbitmq/amqp_tcp_socket.h @@ -42,7 +42,7 @@ AMQP_BEGIN_DECLS AMQP_PUBLIC_FUNCTION amqp_socket_t * AMQP_CALL -amqp_tcp_socket_new(void); +amqp_tcp_socket_new(amqp_connection_state_t state); /** * Assign an open file descriptor to a socket object. diff --git a/tools/common.c b/tools/common.c index a624105..06b41a9 100644 --- a/tools/common.c +++ b/tools/common.c @@ -332,7 +332,7 @@ amqp_connection_state_t make_connection(void) conn = amqp_new_connection(); if (ci.ssl) { #ifdef WITH_SSL - socket = amqp_ssl_socket_new(); + socket = amqp_ssl_socket_new(conn); if (!socket) { die("creating SSL/TLS socket"); } @@ -346,7 +346,7 @@ amqp_connection_state_t make_connection(void) die("librabbitmq was not built with SSL/TLS support"); #endif } else { - socket = amqp_tcp_socket_new(); + socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket (out of memory)"); } @@ -355,7 +355,6 @@ amqp_connection_state_t make_connection(void) if (status) { die("opening socket to %s:%d", ci.host, ci.port); } - amqp_set_socket(conn, socket); die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, ci.user, ci.password), |