From eff3e939a4200752b76a79585d1bc2d77afeca26 Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Tue, 29 May 2012 16:43:21 -0400 Subject: Adding support for OpenSSL in multithreaded applications --- CMakeLists.txt | 3 ++ librabbitmq/CMakeLists.txt | 6 ++- librabbitmq/amqp-openssl.c | 102 ++++++++++++++++++++++++++++++++++++++++++--- librabbitmq/unix/threads.h | 6 +++ 4 files changed, 111 insertions(+), 6 deletions(-) create mode 100644 librabbitmq/unix/threads.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 1ae0b7b..721043a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -70,12 +70,15 @@ if (POPT_FOUND AND XmlTo_FOUND) set(DO_DOCS ON) endif() +find_package(Threads) + option(BUILD_SHARED_LIBS "Build rabbitmq-c as a shared library" ON) option(BUILD_EXAMPLES "Build Examples" ON) option(BUILD_TOOLS "Build Tools (requires POPT Library)" ${POPT_FOUND}) option(BUILD_TOOLS_DOCS "Build man pages for Tools (requires xmlto)" ${DO_DOCS}) option(BUILD_TESTS "Build tests (run tests with make test)" ON) option(ENABLE_SSL_SUPPORT "Enable SSL support" ON) +option(ENABLE_THREAD_SAFETY "Enable thread safety when using OpenSSL" ${Threads_FOUND}) set(SSL_ENGINE "OpenSSL" CACHE STRING "SSL Backend to use, valid options: OpenSSL, cyaSSL, GnuTLS, PolarSSL") mark_as_advanced(SSL_ENGINE) diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt index 958b641..0fb8026 100644 --- a/librabbitmq/CMakeLists.txt +++ b/librabbitmq/CMakeLists.txt @@ -97,6 +97,10 @@ if (ENABLE_SSL_SUPPORT) else() message(FATAL_ERROR "Unknown SSL_ENGINE ${SSL_ENGINE}") endif() + + if (ENABLE_THREAD_SAFETY) + add_definitions(-DENABLE_THREAD_SAFETY) + endif() endif() set(RABBITMQ_SOURCES @@ -118,7 +122,7 @@ endif() add_library(rabbitmq ${RABBITMQ_SOURCES}) if (ENABLE_SSL_SUPPORT) - target_link_libraries(rabbitmq ${AMQP_SSL_LIBS}) + target_link_libraries(rabbitmq ${AMQP_SSL_LIBS} ${CMAKE_THREAD_LIBS_INIT}) endif() if(WIN32) diff --git a/librabbitmq/amqp-openssl.c b/librabbitmq/amqp-openssl.c index d136b70..965f6e1 100644 --- a/librabbitmq/amqp-openssl.c +++ b/librabbitmq/amqp-openssl.c @@ -26,8 +26,10 @@ #include "amqp-ssl.h" #include "amqp_private.h" +#include "threads.h" #include #include +#include #include #include #include @@ -35,9 +37,17 @@ static int initialize_openssl(); static int destroy_openssl(); -int open_connections = 0; -amqp_boolean_t do_initialize_openssl = 1; -amqp_boolean_t openssl_initialized = 0; +static int open_ssl_connections = 0; +static amqp_boolean_t do_initialize_openssl = 1; +static amqp_boolean_t openssl_initialized = 0; + +#ifdef ENABLE_THREAD_SAFETY +static unsigned long amqp_ssl_threadid_callback(void); +static void amqp_ssl_locking_callback(int mode, int n, const char *file, int line); + +static pthread_mutex_t openssl_init_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t *amqp_openssl_lockarray = NULL; +#endif /* ENABLE_THREAD_SAFETY */ struct amqp_ssl_socket_context { BIO *bio; @@ -286,11 +296,68 @@ amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize) } } + +#ifdef ENABLE_THREAD_SAFETY +unsigned long +amqp_ssl_threadid_callback(void) +{ + return (unsigned long)pthread_self(); +} + +void +amqp_ssl_locking_callback(int mode, int n, const char *file, int line) +{ + if (mode & CRYPTO_LOCK) + { + if (pthread_mutex_lock(&amqp_openssl_lockarray[n])) + amqp_abort("Runtime error: Failure in trying to lock OpenSSL mutex"); + } + else + { + if (pthread_mutex_unlock(&amqp_openssl_lockarray[n])) + amqp_abort("Runtime error: Failure in trying to unlock OpenSSL mutex"); + } +} +#endif /* ENABLE_THREAD_SAFETY */ + static int initialize_openssl() { +#ifdef ENABLE_THREAD_SAFETY + if (pthread_mutex_lock(&openssl_init_mutex)) + return -1; +#endif /* ENABLE_THREAD_SAFETY */ if (do_initialize_openssl) { +#ifdef ENABLE_THREAD_SAFETY + if (NULL == amqp_openssl_lockarray) + { + int i = 0; + amqp_openssl_lockarray = calloc(CRYPTO_num_locks(), sizeof(pthread_mutex_t)); + if (!amqp_openssl_lockarray) + { + pthread_mutex_unlock(&openssl_init_mutex); + return -1; + } + for (int i = 0; i < CRYPTO_num_locks(); ++i) + { + if (pthread_mutex_init(&amqp_openssl_lockarray[i], NULL)) + { + free(amqp_openssl_lockarray); + amqp_openssl_lockarray = NULL; + pthread_mutex_unlock(&openssl_init_mutex); + return -1; + } + } + } + + if (0 == open_ssl_connections) + { + CRYPTO_set_id_callback(amqp_ssl_threadid_callback); + CRYPTO_set_locking_callback(amqp_ssl_locking_callback); + } +#endif /* ENABLE_THREAD_SAFETY */ + if (!openssl_initialized) { OPENSSL_config(NULL); @@ -302,12 +369,37 @@ initialize_openssl() } } - ++open_connections; + ++open_ssl_connections; + +#ifdef ENABLE_THREAD_SAFETY + pthread_mutex_unlock(&openssl_init_mutex); +#endif /* ENABLE_THREAD_SAFETY */ return 0; } static int destroy_openssl() { - --open_connections; +#ifdef ENABLE_THREAD_SAFETY + if (pthread_mutex_lock(&openssl_init_mutex)) + return -1; +#endif /* ENABLE_THREAD_SAFETY */ + + if (open_ssl_connections > 0) + --open_ssl_connections; + +#ifdef ENABLE_THREAD_SAFETY + if (0 == open_ssl_connections && do_initialize_openssl) + { + /* Unsetting these allows the rabbitmq-c library to be unloaded + * safely. We do leak the amqp_openssl_lockarray. Which is only + * an issue if you repeatedly unload and load the library + */ + CRYPTO_set_locking_callback(NULL); + CRYPTO_set_id_callback(NULL); + } + + pthread_mutex_unlock(&openssl_init_mutex); +#endif /* ENABLE_THREAD_SAFETY */ + return 0; } diff --git a/librabbitmq/unix/threads.h b/librabbitmq/unix/threads.h new file mode 100644 index 0000000..4b92b4a --- /dev/null +++ b/librabbitmq/unix/threads.h @@ -0,0 +1,6 @@ +#ifndef AMQP_THREADS_H +#define AMQP_THREADS_H + +#include + +#endif /* AMQP_THREADS_H */ -- cgit v1.2.1