diff options
-rw-r--r-- | Makefile.am | 5 | ||||
-rw-r--r-- | examples/amqps_bind.c | 37 | ||||
-rw-r--r-- | examples/amqps_consumer.c | 38 | ||||
-rw-r--r-- | examples/amqps_exchange_declare.c | 37 | ||||
-rw-r--r-- | examples/amqps_listen.c | 37 | ||||
-rw-r--r-- | examples/amqps_listenq.c | 37 | ||||
-rw-r--r-- | examples/amqps_producer.c | 37 | ||||
-rw-r--r-- | examples/amqps_sendstring.c | 37 | ||||
-rw-r--r-- | examples/amqps_unbind.c | 37 | ||||
-rw-r--r-- | examples/utils.c | 12 | ||||
-rw-r--r-- | examples/utils.h | 1 | ||||
-rw-r--r-- | librabbitmq/CMakeLists.txt | 2 | ||||
-rw-r--r-- | librabbitmq/amqp-openssl.c | 284 | ||||
-rw-r--r-- | librabbitmq/amqp-socket.h | 61 | ||||
-rw-r--r-- | librabbitmq/amqp-ssl-socket.h (renamed from librabbitmq/amqp-ssl.h) | 84 | ||||
-rw-r--r-- | librabbitmq/amqp-tcp-socket.c | 143 | ||||
-rw-r--r-- | librabbitmq/amqp-tcp-socket.h | 55 | ||||
-rw-r--r-- | librabbitmq/amqp.h | 53 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 61 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 10 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 53 | ||||
-rw-r--r-- | librabbitmq/unix/socket.c | 8 | ||||
-rw-r--r-- | librabbitmq/unix/socket.h | 10 | ||||
-rw-r--r-- | tools/common.c | 55 |
24 files changed, 859 insertions, 335 deletions
diff --git a/Makefile.am b/Makefile.am index e8c0e2a..f25c09c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -12,6 +12,8 @@ endif #REGENERATE_AMQP_FRAMING lib_LTLIBRARIES = librabbitmq/librabbitmq.la librabbitmq_librabbitmq_la_SOURCES = \ + librabbitmq/amqp-socket.h \ + librabbitmq/amqp-tcp-socket.c \ librabbitmq/amqp_api.c \ librabbitmq/amqp_connection.c \ librabbitmq/amqp_mem.c \ @@ -67,9 +69,10 @@ endif include_HEADERS = \ $(top_srcdir)/librabbitmq/amqp.h + $(top_builddir)/librabbitmq/amqp-tcp-socket.h \ if SSL -include_HEADERS += librabbitmq/amqp-ssl.h +include_HEADERS += librabbitmq/amqp-ssl-socket.h endif if REGENERATE_AMQP_FRAMING diff --git a/examples/amqps_bind.c b/examples/amqps_bind.c index 1e2c84f..3255e70 100644 --- a/examples/amqps_bind.c +++ b/examples/amqps_bind.c @@ -35,19 +35,18 @@ #include <string.h> #include <stdint.h> -#include <amqp-ssl.h> +#include <amqp-ssl-socket.h> #include <amqp_framing.h> #include "utils.h" int main(int argc, char const * const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *bindingkey; char const *queue; - - int sockfd; + amqp_socket_t *socket; amqp_connection_state_t conn; if (argc < 6) { @@ -64,11 +63,31 @@ int main(int argc, char const * const *argv) { conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, - argc > 6 ? argv[6] : NULL, - argc > 8 ? argv[7] : NULL, - argc > 8 ? argv[8] : NULL), - "Opening SSL/TLS socket"); + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 6) { + status = amqp_ssl_socket_set_cacert(socket, argv[6]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 8) { + status = amqp_ssl_socket_set_key(socket, argv[7], argv[8]); + if (status) { + die("setting client key/cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c index 31d251d..cb47dff 100644 --- a/examples/amqps_consumer.c +++ b/examples/amqps_consumer.c @@ -35,7 +35,7 @@ #include <string.h> #include <stdint.h> -#include <amqp-ssl.h> +#include <amqp-ssl-socket.h> #include <amqp_framing.h> #include <assert.h> @@ -115,13 +115,11 @@ static void run(amqp_connection_state_t conn) int main(int argc, char const * const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *bindingkey; - - int sockfd; + amqp_socket_t *socket; amqp_connection_state_t conn; - amqp_bytes_t queuename; if (argc < 3) { @@ -137,11 +135,31 @@ int main(int argc, char const * const *argv) { conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, - argc > 3 ? argv[3] : NULL, - argc > 5 ? argv[4] : NULL, - argc > 5 ? argv[5] : NULL), - "Opening SSL/TLS socket"); + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 3) { + status = amqp_ssl_socket_set_cacert(socket, argv[3]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 4) { + status = amqp_ssl_socket_set_key(socket, argv[5], argv[5]); + if (status) { + die("setting client key/cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_exchange_declare.c b/examples/amqps_exchange_declare.c index 6e81087..1310235 100644 --- a/examples/amqps_exchange_declare.c +++ b/examples/amqps_exchange_declare.c @@ -35,18 +35,17 @@ #include <string.h> #include <stdint.h> -#include <amqp-ssl.h> +#include <amqp-ssl-socket.h> #include <amqp_framing.h> #include "utils.h" int main(int argc, char const * const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *exchangetype; - - int sockfd; + amqp_socket_t *socket; amqp_connection_state_t conn; if (argc < 5) { @@ -62,11 +61,31 @@ int main(int argc, char const * const *argv) { conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, - argc > 5 ? argv[5] : NULL, - argc > 7 ? argv[6] : NULL, - argc > 7 ? argv[7] : NULL), - "Opening SSL/TLS socket"); + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 5) { + status = amqp_ssl_socket_set_cacert(socket, argv[5]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 7) { + status = amqp_ssl_socket_set_key(socket, argv[6], argv[7]); + if (status) { + die("setting client key/cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c index 306bbaa..c6344bd 100644 --- a/examples/amqps_listen.c +++ b/examples/amqps_listen.c @@ -35,7 +35,7 @@ #include <string.h> #include <stdint.h> -#include <amqp-ssl.h> +#include <amqp-ssl-socket.h> #include <amqp_framing.h> #include <assert.h> @@ -44,11 +44,10 @@ int main(int argc, char const * const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *bindingkey; - - int sockfd; + amqp_socket_t *socket; amqp_connection_state_t conn; amqp_bytes_t queuename; @@ -66,11 +65,31 @@ int main(int argc, char const * const *argv) { conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, - argc > 5 ? argv[5] : NULL, - argc > 7 ? argv[6] : NULL, - argc > 7 ? argv[7] : NULL), - "Opening socket"); + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 5) { + status = amqp_ssl_socket_set_cacert(socket, argv[5]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 7) { + status = amqp_ssl_socket_set_key(socket, argv[6], argv[7]); + if (status) { + die("setting client key/cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c index 90b608a..0a9c687 100644 --- a/examples/amqps_listenq.c +++ b/examples/amqps_listenq.c @@ -35,7 +35,7 @@ #include <string.h> #include <stdint.h> -#include <amqp-ssl.h> +#include <amqp-ssl-socket.h> #include <amqp_framing.h> #include <assert.h> @@ -44,10 +44,9 @@ int main(int argc, char const * const *argv) { char const *hostname; - int port; + int port, status; char const *queuename; - - int sockfd; + amqp_socket_t *socket; amqp_connection_state_t conn; if (argc < 4) { @@ -62,11 +61,31 @@ int main(int argc, char const * const *argv) { conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, - argc > 4 ? argv[4] : NULL, - argc > 6 ? argv[5] : NULL, - argc > 6 ? argv[6] : NULL), - "Opening socket"); + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 4) { + status = amqp_ssl_socket_set_cacert(socket, argv[4]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 6) { + status = amqp_ssl_socket_set_key(socket, argv[5], argv[6]); + if (status) { + die("setting client key/cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_producer.c b/examples/amqps_producer.c index 6e00f29..43649ad 100644 --- a/examples/amqps_producer.c +++ b/examples/amqps_producer.c @@ -35,7 +35,7 @@ #include <string.h> #include <stdint.h> -#include <amqp-ssl.h> +#include <amqp-ssl-socket.h> #include <amqp_framing.h> #include "utils.h" @@ -106,11 +106,10 @@ static void send_batch(amqp_connection_state_t conn, int main(int argc, char const * const *argv) { char const *hostname; - int port; + int port, status; int rate_limit; int message_count; - - int sockfd; + amqp_socket_t *socket; amqp_connection_state_t conn; if (argc < 5) { @@ -126,11 +125,31 @@ int main(int argc, char const * const *argv) { conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, - argc > 5 ? argv[5] : NULL, - argc > 7 ? argv[6] : NULL, - argc > 7 ? argv[7] : NULL), - "Opening SSL/TLS socket"); + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 5) { + status = amqp_ssl_socket_set_cacert(socket, argv[5]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 7) { + status = amqp_ssl_socket_set_key(socket, argv[6], argv[7]); + if (status) { + die("setting client key/cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_sendstring.c b/examples/amqps_sendstring.c index 2315982..a888da8 100644 --- a/examples/amqps_sendstring.c +++ b/examples/amqps_sendstring.c @@ -35,19 +35,18 @@ #include <string.h> #include <stdint.h> -#include <amqp-ssl.h> +#include <amqp-ssl-socket.h> #include <amqp_framing.h> #include "utils.h" int main(int argc, char const * const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *routingkey; char const *messagebody; - - int sockfd; + amqp_socket_t *socket; amqp_connection_state_t conn; if (argc < 6) { @@ -64,11 +63,31 @@ int main(int argc, char const * const *argv) { conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, - argc > 6 ? argv[6] : NULL, - argc > 8 ? argv[7] : NULL, - argc > 8 ? argv[8] : NULL), - "Opening socket"); + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 6) { + status = amqp_ssl_socket_set_cacert(socket, argv[6]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 8) { + status = amqp_ssl_socket_set_key(socket, argv[7], argv[8]); + if (status) { + die("setting client key/cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_unbind.c b/examples/amqps_unbind.c index e429c6d..e9f4317 100644 --- a/examples/amqps_unbind.c +++ b/examples/amqps_unbind.c @@ -35,19 +35,18 @@ #include <string.h> #include <stdint.h> -#include <amqp-ssl.h> +#include <amqp-ssl-socket.h> #include <amqp_framing.h> #include "utils.h" int main(int argc, char const * const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *bindingkey; char const *queue; - - int sockfd; + amqp_socket_t *socket; amqp_connection_state_t conn; if (argc < 6) { @@ -64,11 +63,31 @@ int main(int argc, char const * const *argv) { conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port, - argc > 6 ? argv[6] : NULL, - argc > 8 ? argv[7] : NULL, - argc > 8 ? argv[8] : NULL), - "Opening socket"); + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 6) { + status = amqp_ssl_socket_set_cacert(socket, argv[6]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 8) { + status = amqp_ssl_socket_set_key(socket, argv[7], argv[8]); + if (status) { + die("setting client key/cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/utils.c b/examples/utils.c index 609c354..0830738 100644 --- a/examples/utils.c +++ b/examples/utils.c @@ -34,6 +34,7 @@ * ***** END LICENSE BLOCK ***** */ +#include <stdarg.h> #include <stdlib.h> #include <stdio.h> #include <string.h> @@ -45,8 +46,17 @@ #include "utils.h" -void die_on_error(int x, char const *context) +void die(const char *fmt, ...) { + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(1); +} + +void die_on_error(int x, char const *context) { if (x < 0) { char *errstr = amqp_error_string(-x); fprintf(stderr, "%s: %s\n", context, errstr); diff --git a/examples/utils.h b/examples/utils.h index 2e7b15f..dea86da 100644 --- a/examples/utils.h +++ b/examples/utils.h @@ -37,6 +37,7 @@ * ***** END LICENSE BLOCK ***** */ +void die(const char *fmt, ...); extern void die_on_error(int x, char const *context); extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context); diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt index 1f78f4b..977b1fe 100644 --- a/librabbitmq/CMakeLists.txt +++ b/librabbitmq/CMakeLists.txt @@ -119,7 +119,7 @@ set(RABBITMQ_SOURCES ${AMQP_FRAMING_C_PATH} amqp_api.c amqp.h amqp_connection.c amqp_mem.c amqp_private.h amqp_socket.c amqp_table.c - amqp_url.c + amqp_url.c amqp-socket.h amqp-tcp-socket.c amqp-tcp-socket.h ${SOCKET_IMPL}/socket.h ${SOCKET_IMPL}/socket.c ${AMQP_SSL_SRCS} ) diff --git a/librabbitmq/amqp-openssl.c b/librabbitmq/amqp-openssl.c index 531a2ff..16d81e0 100644 --- a/librabbitmq/amqp-openssl.c +++ b/librabbitmq/amqp-openssl.c @@ -24,7 +24,7 @@ #include "config.h" #endif -#include "amqp-ssl.h" +#include "amqp-ssl-socket.h" #include "amqp_private.h" #include "threads.h" #include <ctype.h> @@ -54,22 +54,23 @@ static pthread_mutex_t openssl_init_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t *amqp_openssl_lockarray = NULL; #endif /* ENABLE_THREAD_SAFETY */ -struct amqp_ssl_socket_context { +struct amqp_ssl_socket_t { + amqp_socket_t base; BIO *bio; SSL_CTX *ctx; char *buffer; size_t length; + amqp_boolean_t verify; }; static ssize_t -amqp_ssl_socket_send(AMQP_UNUSED int sockfd, +amqp_ssl_socket_send(void *base, const void *buf, size_t len, - AMQP_UNUSED int flags, - void *user_data) + AMQP_UNUSED int flags) { + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; ssize_t sent; - struct amqp_ssl_socket_context *self = user_data; ERR_clear_error(); sent = BIO_write(self->bio, buf, len); if (0 > sent) { @@ -90,12 +91,11 @@ amqp_ssl_socket_send(AMQP_UNUSED int sockfd, } static ssize_t -amqp_ssl_socket_writev(AMQP_UNUSED int sockfd, +amqp_ssl_socket_writev(void *base, const struct iovec *iov, - int iovcnt, - void *user_data) + int iovcnt) { - struct amqp_ssl_socket_context *self = user_data; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; ssize_t written = -1; char *bufferp; size_t bytes; @@ -118,19 +118,18 @@ amqp_ssl_socket_writev(AMQP_UNUSED int sockfd, memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); bufferp += iov[i].iov_len; } - written = amqp_ssl_socket_send(sockfd, self->buffer, bytes, 0, self); + written = amqp_ssl_socket_send(self, self->buffer, bytes, 0); exit: return written; } static ssize_t -amqp_ssl_socket_recv(AMQP_UNUSED int sockfd, +amqp_ssl_socket_recv(void *base, void *buf, size_t len, - AMQP_UNUSED int flags, - void *user_data) + AMQP_UNUSED int flags) { - struct amqp_ssl_socket_context *self = user_data; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; ssize_t received; ERR_clear_error(); received = BIO_read(self->bio, buf, len); @@ -150,157 +149,204 @@ amqp_ssl_socket_recv(AMQP_UNUSED int sockfd, } static int -amqp_ssl_socket_close(int sockfd, - void *user_data) +amqp_ssl_socket_verify(void *base, const char *host) { - struct amqp_ssl_socket_context *self = user_data; - if (self) { - BIO_free_all(self->bio); - SSL_CTX_free(self->ctx); - free(self->buffer); - free(self); - } - destroy_openssl(); - return 0 > sockfd ? -1 : 0; -} - -static int -amqp_ssl_socket_error(AMQP_UNUSED void *user_data) -{ - return -1; -} - -int -amqp_open_ssl_socket(amqp_connection_state_t state, - const char *host, - int port, - const char *cacert, - const char *key, - const char *cert) -{ - SSL *ssl; - X509 *peer; - long result; - X509_NAME *name; - X509_NAME_ENTRY *entry; - ASN1_STRING *entry_string; - struct amqp_ssl_socket_context *self; - int sockfd, status, pos, utf8_length; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; unsigned char *utf8_value = NULL, *cp, ch; - initialize_openssl(); - self = calloc(1, sizeof(*self)); - if (!self) { - goto error; - } - self->ctx = SSL_CTX_new(SSLv23_client_method()); - if (!self->ctx) { - goto error; - } - status = SSL_CTX_load_verify_locations(self->ctx, cacert, NULL); - if (1 != status) { - goto error; - } - if (key && cert) { - status = SSL_CTX_use_PrivateKey_file(self->ctx, key, - SSL_FILETYPE_PEM); - if (1 != status) { - goto error; - } - status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); - if (1 != status) { - goto error; - } - } - self->bio = BIO_new_ssl_connect(self->ctx); - if (!self->bio) { - goto error; - } + ASN1_STRING *entry_string; + X509_NAME_ENTRY *entry; + int pos, utf8_length; + X509_NAME *name; + X509 *peer; + SSL *ssl; BIO_get_ssl(self->bio, &ssl); - SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY); - BIO_set_conn_hostname(self->bio, host); - BIO_set_conn_int_port(self->bio, &port); - status = BIO_do_connect(self->bio); - if (1 != status) { - goto error; - } - result = SSL_get_verify_result(ssl); - if (X509_V_OK != result) { - goto error; - } peer = SSL_get_peer_certificate(ssl); if (!peer) { - goto error; + return -1; } name = X509_get_subject_name(peer); if (!name) { - goto error; + return -1; } pos = X509_NAME_get_index_by_NID(name, NID_commonName, -1); if (0 > pos) { - goto error; + return -1; } entry = X509_NAME_get_entry(name, pos); if (!entry) { - goto error; + return -1; } entry_string = X509_NAME_ENTRY_get_data(entry); if (!entry_string) { - goto error; + return -1; } utf8_length = ASN1_STRING_to_UTF8(&utf8_value, entry_string); if (0 > utf8_length) { - goto error; + return -1; } while (utf8_length > 0 && utf8_value[utf8_length - 1] == 0) { --utf8_length; } if (utf8_length >= 256) { - goto error; + return -1; } if ((size_t)utf8_length != strlen((char *)utf8_value)) { - goto error; + return -1; } for (cp = utf8_value; (ch = *cp) != '\0'; ++cp) { if (isascii(ch) && !isprint(ch)) { - goto error; + return -1; } } #ifdef _MSC_VER #define strcasecmp _stricmp #endif if (strcasecmp(host, (char *)utf8_value)) { - goto error; + return -1; } #ifdef _MSC_VER #undef strcasecmp #endif - sockfd = BIO_get_fd(self->bio, NULL); - amqp_set_sockfd_full(state, sockfd, - amqp_ssl_socket_writev, - amqp_ssl_socket_send, - amqp_ssl_socket_recv, - amqp_ssl_socket_close, - amqp_ssl_socket_error, - self); -exit: - OPENSSL_free(utf8_value); - return sockfd; + return 0; +} + +static int +amqp_ssl_socket_open(void *base, const char *host, int port) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + long result; + int status; + SSL *ssl; + self->bio = BIO_new_ssl_connect(self->ctx); + if (!self->bio) { + return -1; + } + BIO_get_ssl(self->bio, &ssl); + SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY); + BIO_set_conn_hostname(self->bio, host); + BIO_set_conn_int_port(self->bio, &port); + status = BIO_do_connect(self->bio); + if (1 != status) { + return -1; + } + result = SSL_get_verify_result(ssl); + if (X509_V_OK != result) { + return -1; + } + if (self->verify) { + int status = amqp_ssl_socket_verify(self, host); + if (status) { + return -1; + } + } + return 0; +} + +static int +amqp_ssl_socket_close(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + if (self) { + BIO_free_all(self->bio); + SSL_CTX_free(self->ctx); + free(self->buffer); + free(self); + } + destroy_openssl(); + return 0; +} + +static int +amqp_ssl_socket_error(AMQP_UNUSED void *base) +{ + return -1; +} + +static int +amqp_ssl_socket_get_sockfd(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return BIO_get_fd(self->bio, NULL); +} + +amqp_socket_t * +amqp_ssl_socket_new(void) +{ + struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); + int status; + if (!self) { + goto error; + } + status = initialize_openssl(); + if (status) { + goto error; + } + self->ctx = SSL_CTX_new(SSLv23_client_method()); + if (!self->ctx) { + goto error; + } + self->base.writev = amqp_ssl_socket_writev; + self->base.send = amqp_ssl_socket_send; + self->base.recv = amqp_ssl_socket_recv; + self->base.open = amqp_ssl_socket_open; + self->base.close = amqp_ssl_socket_close; + self->base.error = amqp_ssl_socket_error; + self->base.get_sockfd = amqp_ssl_socket_get_sockfd; + self->verify = 1; + return (amqp_socket_t *)self; error: - OPENSSL_free(utf8_value); - amqp_ssl_socket_close(-1, self); - sockfd = -1; - goto exit; + amqp_socket_close((amqp_socket_t *)self); + return NULL; +} + +int +amqp_ssl_socket_set_cacert(amqp_socket_t *base, + const char *cacert) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + int status = SSL_CTX_load_verify_locations(self->ctx, cacert, NULL); + if (1 != status) { + return -1; + } + return 0; +} + +int +amqp_ssl_socket_set_key(amqp_socket_t *base, + const char *key, + const char *cert) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + if (key && cert) { + int status = SSL_CTX_use_PrivateKey_file(self->ctx, key, + SSL_FILETYPE_PEM); + if (1 != status) { + return -1; + } + status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); + if (1 != status) { + return -1; + } + return 0; + } + return -1; } void -amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize) +amqp_ssl_socket_set_verify(amqp_socket_t *base, + amqp_boolean_t verify) { - if (!openssl_initialized) - { - do_initialize_openssl = do_initialize; - } + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + self->verify = verify; } +void +amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize) +{ + if (!openssl_initialized) { + do_initialize_openssl = do_initialize; + } +} #ifdef ENABLE_THREAD_SAFETY unsigned long diff --git a/librabbitmq/amqp-socket.h b/librabbitmq/amqp-socket.h new file mode 100644 index 0000000..1c21da0 --- /dev/null +++ b/librabbitmq/amqp-socket.h @@ -0,0 +1,61 @@ +/* + * Copyright 2012 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef AMQP_SOCKET_H +#define AMQP_SOCKET_H + +#include "amqp.h" +#include "socket.h" + +AMQP_BEGIN_DECLS + +/* Socket callbacks. */ +typedef ssize_t (*amqp_socket_writev_fn)(void *, const struct iovec *, int); +typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t, int); +typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int); +typedef int (*amqp_socket_open_fn)(void *, const char *, int); +typedef int (*amqp_socket_close_fn)(void *); +typedef int (*amqp_socket_error_fn)(void *); +typedef int (*amqp_socket_get_sockfd_fn)(void *); + +struct amqp_socket_t_ { + amqp_socket_writev_fn writev; + amqp_socket_send_fn send; + amqp_socket_recv_fn recv; + amqp_socket_open_fn open; + amqp_socket_close_fn close; + amqp_socket_error_fn error; + amqp_socket_get_sockfd_fn get_sockfd; +}; + +ssize_t +amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt); + +ssize_t +amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags); + +ssize_t +amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags); + +AMQP_END_DECLS + +#endif /* AMQP_SOCKET_H */ diff --git a/librabbitmq/amqp-ssl.h b/librabbitmq/amqp-ssl-socket.h index 0a1ef56..bea5b75 100644 --- a/librabbitmq/amqp-ssl.h +++ b/librabbitmq/amqp-ssl-socket.h @@ -31,45 +31,71 @@ #include <amqp.h> /** - * Open an SSL connection to an AMQP broker. + * Create a new SSL/TLS socket object. * - * This function will setup an AMQP connection state object for SSL/TLS - * communication. The caller of this function should not use the returned - * file descriptor as input to amqp_set_sockfd() or amqp_set_sockfd_full(), - * or directly for network I/O. + * \return A new socket object or NULL if an error occurred. + */ +AMQP_PUBLIC_FUNCTION +amqp_socket_t * +AMQP_CALL +amqp_ssl_socket_new(void); + +/** + * Set the CA certificate. + * + * \param [in,out] self An SSL/TLS socket object. + * \param [in] cacert Path to the CA cert file in PEM format. * - * If a client key or certificate file is provide then they should both be - * provided. + * \return Zero if successful, -1 otherwise. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_ssl_socket_set_cacert(amqp_socket_t *self, + const char *cacert); + +/** + * Set the client key & certificate. * - * \param [in,out] state An AMQP connection state object. - * \param [in] host The name of the host to connect to. - * \param [in] port The port to connect on. - * \param [in] caert Path the CA cert file in PEM format. - * \param [in] key Path to the client key in PEM format. (may be NULL) - * \param [in] cert Path to the client cert in PEM format. (may be NULL) + * \param [in,out] self An SSL/TLS socket object. + * \param [in] key Path to the client key in PEM format. + * \param [in] cert Path to the client certificate in PEM foramt. * - * \return A socket file-descriptor (-1 if an error occurred). + * \return Zero if successful, false otherwise. */ AMQP_PUBLIC_FUNCTION int -amqp_open_ssl_socket(amqp_connection_state_t state, - const char *host, - int port, - const char *cacert, - const char *key, - const char *cert); +AMQP_CALL +amqp_ssl_socket_set_key(amqp_socket_t *self, + const char *key, + const char *cert); /** - * Sets whether rabbitmq-c initializes the underlying SSL library + * Enable or disable peer verification. + * + * If peer verification is enabled then the common name in the server + * certificate must match the server name. * - * For SSL libraries that require a one-time initialization across + * \param [in,out] self An SSL/TLS socket object. + * \param [in] verify Enable or disable peer verification. + */ +AMQP_PUBLIC_FUNCTION +void +AMQP_CALL +amqp_ssl_socket_set_verify(amqp_socket_t *self, + amqp_boolean_t verify); + +/** + * Sets whether rabbitmq-c initializes the underlying SSL library. + * + * For SSL libraries that require a one-time initialization across * a whole program (e.g., OpenSSL) this sets whether or not rabbitmq-c - * will initialize the SSL library when the first call to + * will initialize the SSL library when the first call to * amqp_open_ssl_socket() is made. You should call this function with * do_init = 0 if the underlying SSL library is intialized somewhere else - * the program. + * the program. * - * Failing to initialize or double initialization of the SSL library will + * Failing to initialize or double initialization of the SSL library will * result in undefined behavior * * By default rabbitmq-c will initialize the underlying SSL library @@ -77,12 +103,14 @@ amqp_open_ssl_socket(amqp_connection_state_t state, * NOTE: calling this function after the first socket has been opened with * amqp_open_ssl_socket() will not have any effect. * - * \param [in] do_initalize, if 0 rabbitmq-c will not initialize the SSL library, - * otherwise rabbitmq-c will initialize the SSL library - * + * \param [in] do_initalize If 0 rabbitmq-c will not initialize the SSL + * library, otherwise rabbitmq-c will initialize the + * SL library + * */ AMQP_PUBLIC_FUNCTION void +AMQP_CALL amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize); #endif /* AMQP_SSL_H */ diff --git a/librabbitmq/amqp-tcp-socket.c b/librabbitmq/amqp-tcp-socket.c new file mode 100644 index 0000000..10430a9 --- /dev/null +++ b/librabbitmq/amqp-tcp-socket.c @@ -0,0 +1,143 @@ +/* + * Copyright 2012 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "amqp_private.h" +#include "amqp-tcp-socket.h" +#include <stdlib.h> + +struct amqp_tcp_socket_t { + amqp_socket_t base; + int sockfd; +}; + +static ssize_t +amqp_tcp_socket_writev(void *base, const struct iovec *iov, int iovcnt) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return amqp_os_socket_writev(self->sockfd, iov, iovcnt); +} + +static ssize_t +amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return send(self->sockfd, buf, len, flags); +} + +static ssize_t +amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return recv(self->sockfd, buf, len, flags); +} + +static int +amqp_tcp_socket_open(void *base, const char *host, int port) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + struct sockaddr_in addr; + int status, one = 1; + struct hostent *he; + status = amqp_socket_init(); + if (status) { + return status; + } + he = gethostbyname(host); + if (!he) { + return -ERROR_GETHOSTBYNAME_FAILED; + } + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = *(uint32_t *)he->h_addr_list[0]; + self->sockfd = amqp_socket_socket(PF_INET, SOCK_STREAM, 0); + if (-1 == self->sockfd) { + return -amqp_os_socket_error(); + } + status = amqp_socket_setsockopt(self->sockfd, IPPROTO_TCP, TCP_NODELAY, + &one, sizeof(one)); + if (0 > status) { + status = -amqp_os_socket_error(); + amqp_os_socket_close(self->sockfd); + return status; + } + status = connect(self->sockfd, (struct sockaddr *)&addr, sizeof(addr)); + if (0 > status) { + status = -amqp_os_socket_error(); + amqp_os_socket_close(self->sockfd); + return status; + } + return 0; +} + +static int +amqp_tcp_socket_close(void *base) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + int status = -1; + if (self) { + status = amqp_os_socket_close(self->sockfd); + free(self); + } + return status; +} + +static int +amqp_tcp_socket_error(AMQP_UNUSED void *base) +{ + return amqp_os_socket_error(); +} + +static int +amqp_tcp_socket_get_sockfd(void *base) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return self->sockfd; +} + +amqp_socket_t * +amqp_tcp_socket_new(void) +{ + struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); + if (!self) { + return NULL; + } + self->base.writev = amqp_tcp_socket_writev; + self->base.send = amqp_tcp_socket_send; + self->base.recv = amqp_tcp_socket_recv; + self->base.open = amqp_tcp_socket_open; + self->base.close = amqp_tcp_socket_close; + self->base.error = amqp_tcp_socket_error; + self->base.get_sockfd = amqp_tcp_socket_get_sockfd; + self->sockfd = -1; + return (amqp_socket_t *)self; +} + +void +amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + self->sockfd = sockfd; +} diff --git a/librabbitmq/amqp-tcp-socket.h b/librabbitmq/amqp-tcp-socket.h new file mode 100644 index 0000000..6e0afa0 --- /dev/null +++ b/librabbitmq/amqp-tcp-socket.h @@ -0,0 +1,55 @@ +/* + * Copyright 2012 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef AMQP_TCP_SOCKET_H +#define AMQP_TCP_SOCKET_H + +#include <amqp.h> + +AMQP_BEGIN_DECLS + +/** + * Create a new TCP socket. + * + * \return A new socket object or NULL if an error occurred. + */ +AMQP_PUBLIC_FUNCTION +amqp_socket_t * +AMQP_CALL +amqp_tcp_socket_new(void); + +/** + * Assign an open file descriptor to a socket object. + * + * This function must not be used in conjunction with amqp_socket_open(). + * + * \param [in,out] self A TCP socket object. + * \param [in] sockfd An open socket descriptor. + */ +AMQP_PUBLIC_FUNCTION +void +AMQP_CALL +amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd); + +AMQP_END_DECLS + +#endif /* AMQP_TCP_SOCKET_H */ diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 0efbc74..734094b 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -52,7 +52,6 @@ */ #if defined(_WIN32) && defined(_MSC_VER) -struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern @@ -67,7 +66,6 @@ struct iovec; # define AMQP_CALL __cdecl #elif defined(_WIN32) && defined(__BORLANDC__) -struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern @@ -82,7 +80,6 @@ struct iovec; # define AMQP_CALL __cdecl #elif defined(_WIN32) && defined(__MINGW32__) -struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) @@ -97,7 +94,6 @@ struct iovec; # define AMQP_CALL __cdecl #elif defined(_WIN32) && defined(__CYGWIN__) -struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) @@ -124,6 +120,13 @@ struct iovec; # define AMQP_CALL #endif +#if __GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1) +# define AMQP_DEPRECATED \ + __attribute__ ((__deprecated__)) +#else +# define AMQP_DEPRECATED +#endif + /* Define ssize_t on Win32/64 platforms See: http://lists.cs.uiuc.edu/pipermail/llvmdev/2010-April/030649.html for details */ @@ -321,12 +324,7 @@ typedef enum amqp_sasl_method_enum_ { /* Opaque struct. */ typedef struct amqp_connection_state_t_ *amqp_connection_state_t; -/* Socket callbacks. */ -typedef ssize_t (*amqp_socket_writev_fn)(int, const struct iovec *, int, void *); -typedef ssize_t (*amqp_socket_send_fn)(int, const void *, size_t, int, void *); -typedef ssize_t (*amqp_socket_recv_fn)(int, void *, size_t, int, void *); -typedef int (*amqp_socket_close_fn)(int, void *); -typedef int (*amqp_socket_error_fn)(void *); +typedef struct amqp_socket_t_ amqp_socket_t; AMQP_PUBLIC_FUNCTION char const * @@ -389,17 +387,12 @@ AMQP_CALL amqp_get_sockfd(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION void -AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd); +AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd) + AMQP_DEPRECATED; AMQP_PUBLIC_FUNCTION void -amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd, - amqp_socket_writev_fn writev_fn, - amqp_socket_send_fn send_fn, - amqp_socket_recv_fn recv_fn, - amqp_socket_close_fn close_fn, - amqp_socket_error_fn error_fn, - void *user_data); +AMQP_CALL amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket); AMQP_PUBLIC_FUNCTION int @@ -444,7 +437,8 @@ AMQP_CALL amqp_table_entry_cmp(void const *entry1, void const *entry2); AMQP_PUBLIC_FUNCTION int -AMQP_CALL amqp_open_socket(char const *hostname, int portnumber); +AMQP_CALL amqp_open_socket(char const *hostname, int portnumber) + AMQP_DEPRECATED; AMQP_PUBLIC_FUNCTION int @@ -599,6 +593,27 @@ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed); +/* socket API */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_open(amqp_socket_t *self, const char *host, int port); + +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_close(amqp_socket_t *self); + +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_error(amqp_socket_t *self); + +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_get_sockfd(amqp_socket_t *self); + AMQP_END_DECLS #include <amqp_framing.h> diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 6468da7..4666100 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -38,8 +38,10 @@ #include "config.h" #endif +#include "amqp-tcp-socket.h" #include "amqp_private.h" #include <assert.h> +#include <errno.h> #include <stdint.h> #include <stdio.h> #include <stdlib.h> @@ -59,18 +61,6 @@ _check_state->state); \ } -static ssize_t amqp_socket_send(int sockfd, const void *buf, size_t len, - int flags, AMQP_UNUSED void *user_data) -{ - return send(sockfd, buf, len, flags); -} - -static ssize_t amqp_socket_recv(int sockfd, void *buf, size_t len, int flags, - AMQP_UNUSED void *user_data) -{ - return recv(sockfd, buf, len, flags); -} - amqp_connection_state_t amqp_new_connection(void) { int res; @@ -101,7 +91,6 @@ amqp_connection_state_t amqp_new_connection(void) is also the minimum frame size */ state->target_size = 8; - state->sockfd = -1; state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE; state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE); if (state->sock_inbound_buffer.bytes == NULL) { @@ -120,36 +109,24 @@ out_nomem: int amqp_get_sockfd(amqp_connection_state_t state) { - return state->sockfd; + return amqp_socket_get_sockfd(state->socket); } void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { - state->sockfd = sockfd; - state->writev = amqp_socket_writev; - state->send = amqp_socket_send; - state->recv = amqp_socket_recv; - state->close = amqp_socket_close; - state->error = amqp_socket_error; - state->user_data = NULL; + amqp_socket_t *socket = amqp_tcp_socket_new(); + if (!socket) { + amqp_abort("%s", strerror(errno)); + } + amqp_tcp_socket_set_sockfd(socket, sockfd); + amqp_set_socket(state, socket); } -void amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd, - amqp_socket_writev_fn writev_fn, - amqp_socket_send_fn send_fn, - amqp_socket_recv_fn recv_fn, - amqp_socket_close_fn close_fn, - amqp_socket_error_fn error_fn, - void *user_data) +void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket) { - state->sockfd = sockfd; - state->writev = writev_fn; - state->send = send_fn; - state->recv = recv_fn; - state->close = close_fn; - state->error = error_fn; - state->user_data = user_data; + amqp_socket_close(state->socket); + state->socket = socket; } int amqp_tune_connection(amqp_connection_state_t state, @@ -193,8 +170,8 @@ int amqp_destroy_connection(amqp_connection_state_t state) empty_amqp_pool(&state->decoding_pool); free(state->outbound_buffer.bytes); free(state->sock_inbound_buffer.bytes); - if (state->sockfd >= 0 && state->close(state->sockfd, state->user_data) < 0) - status = -state->error(state->user_data); + if (amqp_socket_close(state->socket) < 0) + status = -amqp_socket_error(state->socket); free(state); } return status; @@ -425,7 +402,7 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_base = &frame_end_byte; iov[2].iov_len = FOOTER_SIZE; - res = state->writev(state->sockfd, iov, 3, state->user_data); + res = amqp_socket_writev(state->socket, iov, 3); } else { size_t out_frame_len; amqp_bytes_t encoded; @@ -473,13 +450,13 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_e32(out_frame, 3, out_frame_len); amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); - res = state->send(state->sockfd, out_frame, - out_frame_len + HEADER_SIZE + FOOTER_SIZE, - MSG_NOSIGNAL, state->user_data); + res = amqp_socket_send(state->socket, out_frame, + out_frame_len + HEADER_SIZE + FOOTER_SIZE, + MSG_NOSIGNAL); } if (res < 0) { - return -state->error(state->user_data); + return -amqp_socket_error(state->socket); } else { return 0; } diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 7c77af8..d32f664 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -92,7 +92,7 @@ char * amqp_os_error_string(int err); -#include "socket.h" +#include "amqp-socket.h" /* * Connection states: XXX FIX THIS @@ -150,13 +150,7 @@ struct amqp_connection_state_t_ { amqp_bytes_t outbound_buffer; - int sockfd; - amqp_socket_writev_fn writev; - amqp_socket_send_fn send; - amqp_socket_recv_fn recv; - amqp_socket_close_fn close; - amqp_socket_error_fn error; - void *user_data; + amqp_socket_t *socket; amqp_bytes_t sock_inbound_buffer; size_t sock_inbound_offset; diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index ab1674d..d7be798 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -49,6 +49,51 @@ #include <stdlib.h> #include <string.h> +ssize_t +amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt) +{ + return self->writev(self, iov, iovcnt); +} + +ssize_t +amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags) +{ + return self->send(self, buf, len, flags); +} + +ssize_t +amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags) +{ + return self->recv(self, buf, len, flags); +} + +int +amqp_socket_open(amqp_socket_t *self, const char *host, int port) +{ + return self->open(self, host, port); +} + +int +amqp_socket_close(amqp_socket_t *self) +{ + if (self) { + return self->close(self); + } + return 0; +} + +int +amqp_socket_error(amqp_socket_t *self) +{ + return self->error(self); +} + +int +amqp_socket_get_sockfd(amqp_socket_t *self) +{ + return self->get_sockfd(self); +} + int amqp_open_socket(char const *hostname, int portnumber) { @@ -120,7 +165,7 @@ int amqp_send_header(amqp_connection_state_t state) AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - return state->send(state->sockfd, (void *)header, 8, 0, state->user_data); + return amqp_socket_send(state->socket, header, 8, 0); } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) @@ -217,13 +262,13 @@ static int wait_frame_inner(amqp_connection_state_t state, assert(res != 0); } - res = state->recv(state->sockfd, state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len, 0, state->user_data); + res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, + state->sock_inbound_buffer.len, 0); if (res <= 0) { if (res == 0) { return -ERROR_CONNECTION_CLOSED; } else { - return -state->error(state->user_data); + return -amqp_socket_error(state->socket); } } diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c index 1f2dff8..af1015d 100644 --- a/librabbitmq/unix/socket.c +++ b/librabbitmq/unix/socket.c @@ -82,20 +82,20 @@ amqp_os_error_string(int err) } int -amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data) +amqp_os_socket_close(int sockfd) { return close(sockfd); } ssize_t -amqp_socket_writev(int sockfd, const struct iovec *iov, - int iovcnt, AMQP_UNUSED void *user_data) +amqp_os_socket_writev(int sockfd, const struct iovec *iov, + int iovcnt) { return writev(sockfd, iov, iovcnt); } int -amqp_socket_error(AMQP_UNUSED void *user_data) +amqp_os_socket_error(void) { return errno | ERROR_CATEGORY_OS; } diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h index af5cde8..34de1a2 100644 --- a/librabbitmq/unix/socket.h +++ b/librabbitmq/unix/socket.h @@ -52,17 +52,13 @@ int amqp_socket_socket(int domain, int type, int proto); int -amqp_socket_error(void *user_data); +amqp_os_socket_error(void); int -amqp_socket_socket(int domain, int type, int proto); - -int -amqp_socket_close(int sockfd, void *user_data); +amqp_os_socket_close(int sockfd); ssize_t -amqp_socket_writev(int sockfd, const struct iovec *iov, int iovcnt, - void *user_data); +amqp_os_socket_writev(int sockfd, const struct iovec *iov, int iovcnt); #define amqp_socket_setsockopt setsockopt diff --git a/tools/common.c b/tools/common.c index 77bcb61..9738ba6 100644 --- a/tools/common.c +++ b/tools/common.c @@ -39,7 +39,10 @@ #endif #include "common.h" -#include <amqp-ssl.h> +#ifdef WITH_SSL +#include <amqp-ssl-socket.h> +#endif +#include <amqp-tcp-socket.h> #include <errno.h> #include <fcntl.h> #include <stdarg.h> @@ -220,8 +223,6 @@ struct poptOption connect_options[] = { static void init_connection_info(struct amqp_connection_info *ci) { - struct amqp_connection_info defaults; - ci->user = NULL; ci->password = NULL; ci->host = NULL; @@ -229,6 +230,8 @@ static void init_connection_info(struct amqp_connection_info *ci) ci->vhost = NULL; ci->user = NULL; + amqp_default_connection_info(ci); + if (amqp_url) die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), "Parsing URL '%s'", amqp_url); @@ -312,30 +315,12 @@ static void init_connection_info(struct amqp_connection_info *ci) ci->vhost = amqp_vhost; } - -#if WITH_SSL - if (amqp_ssl) { - ci->ssl = true; - } -#endif - - amqp_default_connection_info(&defaults); - - if (!ci->user) - ci->user = defaults.user; - if (!ci->password) - ci->password = defaults.password; - if (!ci->host) - ci->host = defaults.host; - if (ci->port < 0) - ci->port = defaults.port; - if (!ci->vhost) - ci->vhost = defaults.vhost; } amqp_connection_state_t make_connection(void) { - int s; + int status; + amqp_socket_t *socket; struct amqp_connection_info ci; amqp_connection_state_t conn; @@ -343,16 +328,30 @@ amqp_connection_state_t make_connection(void) conn = amqp_new_connection(); if (ci.ssl) { #ifdef WITH_SSL - s = amqp_open_ssl_socket(conn, ci.host, ci.port, amqp_cacert, - amqp_key, amqp_cert); + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + if (amqp_cacert) { + amqp_ssl_socket_set_cacert(socket, amqp_cacert); + } + if (amqp_key && amqp_cert) { + amqp_ssl_socket_set_key(socket, amqp_key, amqp_cert); + } #else die("librabbitmq was not built with SSL/TLS support"); #endif } else { - s = amqp_open_socket(ci.host, ci.port); - amqp_set_sockfd(conn, s); + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket (out of memory)"); + } + } + status = amqp_socket_open(socket, ci.host, ci.port); + if (status) { + die("opening socket to %s:%d", ci.host, ci.port); } - die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port); + amqp_set_socket(conn, socket); die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, ci.user, ci.password), |