diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-07-03 12:35:27 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-07-03 12:35:27 -0700 |
commit | da9c2c109ad9740177adfc93e5e92cba92c56134 (patch) | |
tree | f89f2512aea79d7abb64aa1f7900057eae44f8c4 /librabbitmq | |
parent | 448ab68ac6299b73f6ccd697949d2d566d5a1a64 (diff) | |
download | rabbitmq-c-github-ask-da9c2c109ad9740177adfc93e5e92cba92c56134.tar.gz |
Make connection the socket object owner
Improve the socket interface by making the amqp_connection_state_t
object the amqp_socket_t owner, and tie its lifetime to the connection's
lifetime. This prevents a class of silly errors where the socket object
isn't freed, or the socket object is assigned to two different
connection objects
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/amqp.h | 20 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 13 | ||||
-rw-r--r-- | librabbitmq/amqp_openssl.c | 56 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 15 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 26 | ||||
-rw-r--r-- | librabbitmq/amqp_ssl_socket.h | 2 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 37 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.h | 2 |
8 files changed, 119 insertions, 52 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 7f479c8..704e9fc 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -670,22 +670,6 @@ AMQP_CALL amqp_socket_open(amqp_socket_t *self, const char *host, int port); /** - * Close a socket connection and free resources. - * - * This function closes a socket connection and releases any resources used by - * the object. After calling this function the specified socket should no - * longer be referenced. - * - * \param [in,out] self A socket object. - * - * \return Zero upon success, non-zero otherwise. - */ -AMQP_PUBLIC_FUNCTION -int -AMQP_CALL -amqp_socket_close(amqp_socket_t *self); - -/** * Retrieve an error code for the last socket operation. * * At the time of writing, this interface is not well supported and is subject @@ -716,6 +700,10 @@ int AMQP_CALL amqp_socket_get_sockfd(amqp_socket_t *self); +AMQP_PUBLIC_FUNCTION +amqp_socket_t * +amqp_get_socket(amqp_connection_state_t state); + AMQP_END_DECLS #include <amqp_framing.h> diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index d5c29b0..df6a462 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -107,20 +107,25 @@ int amqp_get_sockfd(amqp_connection_state_t state) void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { - amqp_socket_t *socket = amqp_tcp_socket_new(); + amqp_socket_t *socket = amqp_tcp_socket_new(state); if (!socket) { amqp_abort("%s", strerror(errno)); } amqp_tcp_socket_set_sockfd(socket, sockfd); - amqp_set_socket(state, socket); } void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket) { - amqp_socket_close(state->socket); + amqp_socket_delete(state->socket); state->socket = socket; } +amqp_socket_t * +amqp_get_socket(amqp_connection_state_t state) +{ + return state->socket; +} + int amqp_tune_connection(amqp_connection_state_t state, int channel_max, int frame_max, @@ -175,7 +180,7 @@ int amqp_destroy_connection(amqp_connection_state_t state) free(state->outbound_buffer.bytes); free(state->sock_inbound_buffer.bytes); - status = amqp_socket_close(state->socket); + amqp_socket_delete(state->socket); free(state); } return status; diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index 0f6c12c..cf0fc5b 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -300,15 +300,22 @@ static int amqp_ssl_socket_close(void *base) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - if (self) { + + if (self->ssl) { + SSL_shutdown(self->ssl); SSL_free(self->ssl); - amqp_os_socket_close(self->sockfd); - SSL_CTX_free(self->ctx); - free(self->buffer); - free(self); + self->ssl = NULL; } - destroy_openssl(); - return 0; + + if (-1 != self->sockfd) { + if (amqp_os_socket_close(self->sockfd)) { + return AMQP_STATUS_SOCKET_ERROR; + } + + self->sockfd = -1; + } + + return AMQP_STATUS_OK; } static int @@ -331,6 +338,21 @@ amqp_ssl_socket_get_sockfd(void *base) return self->sockfd; } +static void +amqp_ssl_socket_delete(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + + if (self) { + amqp_ssl_socket_close(self); + + SSL_CTX_free(self->ctx); + free(self->buffer); + free(self); + } + destroy_openssl(); +} + static const struct amqp_socket_class_t amqp_ssl_socket_class = { amqp_ssl_socket_writev, /* writev */ amqp_ssl_socket_send, /* send */ @@ -338,30 +360,38 @@ static const struct amqp_socket_class_t amqp_ssl_socket_class = { amqp_ssl_socket_open, /* open */ amqp_ssl_socket_close, /* close */ amqp_ssl_socket_error, /* error */ - amqp_ssl_socket_get_sockfd /* get_sockfd */ + amqp_ssl_socket_get_sockfd, /* get_sockfd */ + amqp_ssl_socket_delete /* delete */ }; amqp_socket_t * -amqp_ssl_socket_new(void) +amqp_ssl_socket_new(amqp_connection_state_t state) { struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); int status; if (!self) { - goto error; + return NULL; } + + self->sockfd = -1; + self->klass = &amqp_ssl_socket_class; + self->verify = 1; + status = initialize_openssl(); if (status) { goto error; } + self->ctx = SSL_CTX_new(SSLv23_client_method()); if (!self->ctx) { goto error; } - self->klass = &amqp_ssl_socket_class; - self->verify = 1; + + amqp_set_socket(state, (amqp_socket_t *)self); + return (amqp_socket_t *)self; error: - amqp_socket_close((amqp_socket_t *)self); + amqp_ssl_socket_delete((amqp_socket_t *)self); return NULL; } diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 441192a..575189f 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -188,11 +188,18 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port) int amqp_socket_close(amqp_socket_t *self) { + assert(self); + assert(self->klass->close); + return self->klass->close(self); +} + +void +amqp_socket_delete(amqp_socket_t *self) +{ if (self) { - assert(self->klass->close); - return self->klass->close(self); + assert(self->klass->delete); + self->klass->delete(self); } - return AMQP_STATUS_OK; } int @@ -600,7 +607,6 @@ beginrecv: if (AMQP_STATUS_TIMEOUT == res) { if (next_timestamp == state->next_recv_heartbeat) { amqp_socket_close(state->socket); - state->socket = NULL; return AMQP_STATUS_HEARTBEAT_TIMEOUT; } else if (next_timestamp == timeout_timestamp) { return AMQP_STATUS_TIMEOUT; @@ -654,7 +660,6 @@ int amqp_simple_wait_method(amqp_connection_state_t state, || frame.frame_type != AMQP_FRAME_METHOD || frame.payload.method.id != expected_method) { amqp_socket_close(state->socket); - state->socket = NULL; return AMQP_STATUS_WRONG_METHOD; } *output = frame.payload.method; diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index 52815fe..e0a1b85 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -50,6 +50,7 @@ typedef int (*amqp_socket_open_fn)(void *, const char *, int); typedef int (*amqp_socket_close_fn)(void *); typedef int (*amqp_socket_error_fn)(void *); typedef int (*amqp_socket_get_sockfd_fn)(void *); +typedef void (*amqp_socket_delete_fn)(void *); /** V-table for amqp_socket_t */ struct amqp_socket_class_t { @@ -60,6 +61,7 @@ struct amqp_socket_class_t { amqp_socket_close_fn close; amqp_socket_error_fn error; amqp_socket_get_sockfd_fn get_sockfd; + amqp_socket_delete_fn delete; }; /** Abstract base class for amqp_socket_t */ @@ -127,6 +129,30 @@ amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len); ssize_t amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags); +/** + * Close a socket connection and free resources. + * + * This function closes a socket connection and releases any resources used by + * the object. After calling this function the specified socket should no + * longer be referenced. + * + * \param [in,out] self A socket object. + * + * \return Zero upon success, non-zero otherwise. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_close(amqp_socket_t *self); + +/** + * Destroy a socket object + * + * \param [in] self the socket object to delete + */ +void +amqp_socket_delete(amqp_socket_t *self); + AMQP_END_DECLS #endif /* AMQP_SOCKET_H */ diff --git a/librabbitmq/amqp_ssl_socket.h b/librabbitmq/amqp_ssl_socket.h index 3bfce51..0da70a7 100644 --- a/librabbitmq/amqp_ssl_socket.h +++ b/librabbitmq/amqp_ssl_socket.h @@ -42,7 +42,7 @@ AMQP_BEGIN_DECLS AMQP_PUBLIC_FUNCTION amqp_socket_t * AMQP_CALL -amqp_ssl_socket_new(void); +amqp_ssl_socket_new(amqp_connection_state_t state); /** * Set the CA certificate. diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index e43a596..5eae027 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -236,18 +236,15 @@ static int amqp_tcp_socket_close(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - int status = -1; - if (self) { - status = amqp_os_socket_close(self->sockfd); - free(self->buffer); - free(self); - } - if (0 == status) { - return AMQP_STATUS_OK; - } else { - return AMQP_STATUS_SOCKET_ERROR; + if (-1 != self->sockfd) { + if (amqp_os_socket_close(self->sockfd)) { + return AMQP_STATUS_SOCKET_ERROR; + } + self->sockfd = -1; } + + return AMQP_STATUS_OK; } static int @@ -264,6 +261,18 @@ amqp_tcp_socket_get_sockfd(void *base) return self->sockfd; } +static void +amqp_tcp_socket_delete(void *base) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + + if (self) { + amqp_tcp_socket_close(self); + free(self->buffer); + free(self); + } +} + static const struct amqp_socket_class_t amqp_tcp_socket_class = { amqp_tcp_socket_writev, /* writev */ amqp_tcp_socket_send, /* send */ @@ -271,11 +280,12 @@ static const struct amqp_socket_class_t amqp_tcp_socket_class = { amqp_tcp_socket_open, /* open */ amqp_tcp_socket_close, /* close */ amqp_tcp_socket_error, /* error */ - amqp_tcp_socket_get_sockfd /* get_sockfd */ + amqp_tcp_socket_get_sockfd, /* get_sockfd */ + amqp_tcp_socket_delete /* delete */ }; amqp_socket_t * -amqp_tcp_socket_new(void) +amqp_tcp_socket_new(amqp_connection_state_t state) { struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); if (!self) { @@ -283,6 +293,9 @@ amqp_tcp_socket_new(void) } self->klass = &amqp_tcp_socket_class; self->sockfd = -1; + + amqp_set_socket(state, (amqp_socket_t *)self); + return (amqp_socket_t *)self; } diff --git a/librabbitmq/amqp_tcp_socket.h b/librabbitmq/amqp_tcp_socket.h index 4c8ba54..2fcd2cb 100644 --- a/librabbitmq/amqp_tcp_socket.h +++ b/librabbitmq/amqp_tcp_socket.h @@ -42,7 +42,7 @@ AMQP_BEGIN_DECLS AMQP_PUBLIC_FUNCTION amqp_socket_t * AMQP_CALL -amqp_tcp_socket_new(void); +amqp_tcp_socket_new(amqp_connection_state_t state); /** * Assign an open file descriptor to a socket object. |