summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <aega@med.umich.edu>2012-05-29 16:43:21 -0400
committerAlan Antonuk <aega@med.umich.edu>2012-05-29 16:51:13 -0400
commiteff3e939a4200752b76a79585d1bc2d77afeca26 (patch)
tree0d46e5e2cdba3f07006b4c4c245a50d6ea231514
parenteb467fe75ca2de3b3f6fe7e1502166b0916eea2a (diff)
downloadrabbitmq-c-github-ask-eff3e939a4200752b76a79585d1bc2d77afeca26.tar.gz
Adding support for OpenSSL in multithreaded applications
-rw-r--r--CMakeLists.txt3
-rw-r--r--librabbitmq/CMakeLists.txt6
-rw-r--r--librabbitmq/amqp-openssl.c102
-rw-r--r--librabbitmq/unix/threads.h6
4 files changed, 111 insertions, 6 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1ae0b7b..721043a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -70,12 +70,15 @@ if (POPT_FOUND AND XmlTo_FOUND)
set(DO_DOCS ON)
endif()
+find_package(Threads)
+
option(BUILD_SHARED_LIBS "Build rabbitmq-c as a shared library" ON)
option(BUILD_EXAMPLES "Build Examples" ON)
option(BUILD_TOOLS "Build Tools (requires POPT Library)" ${POPT_FOUND})
option(BUILD_TOOLS_DOCS "Build man pages for Tools (requires xmlto)" ${DO_DOCS})
option(BUILD_TESTS "Build tests (run tests with make test)" ON)
option(ENABLE_SSL_SUPPORT "Enable SSL support" ON)
+option(ENABLE_THREAD_SAFETY "Enable thread safety when using OpenSSL" ${Threads_FOUND})
set(SSL_ENGINE "OpenSSL" CACHE STRING "SSL Backend to use, valid options: OpenSSL, cyaSSL, GnuTLS, PolarSSL")
mark_as_advanced(SSL_ENGINE)
diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt
index 958b641..0fb8026 100644
--- a/librabbitmq/CMakeLists.txt
+++ b/librabbitmq/CMakeLists.txt
@@ -97,6 +97,10 @@ if (ENABLE_SSL_SUPPORT)
else()
message(FATAL_ERROR "Unknown SSL_ENGINE ${SSL_ENGINE}")
endif()
+
+ if (ENABLE_THREAD_SAFETY)
+ add_definitions(-DENABLE_THREAD_SAFETY)
+ endif()
endif()
set(RABBITMQ_SOURCES
@@ -118,7 +122,7 @@ endif()
add_library(rabbitmq ${RABBITMQ_SOURCES})
if (ENABLE_SSL_SUPPORT)
- target_link_libraries(rabbitmq ${AMQP_SSL_LIBS})
+ target_link_libraries(rabbitmq ${AMQP_SSL_LIBS} ${CMAKE_THREAD_LIBS_INIT})
endif()
if(WIN32)
diff --git a/librabbitmq/amqp-openssl.c b/librabbitmq/amqp-openssl.c
index d136b70..965f6e1 100644
--- a/librabbitmq/amqp-openssl.c
+++ b/librabbitmq/amqp-openssl.c
@@ -26,8 +26,10 @@
#include "amqp-ssl.h"
#include "amqp_private.h"
+#include "threads.h"
#include <ctype.h>
#include <openssl/bio.h>
+#include <openssl/conf.h>
#include <openssl/err.h>
#include <openssl/ssl.h>
#include <stdlib.h>
@@ -35,9 +37,17 @@
static int initialize_openssl();
static int destroy_openssl();
-int open_connections = 0;
-amqp_boolean_t do_initialize_openssl = 1;
-amqp_boolean_t openssl_initialized = 0;
+static int open_ssl_connections = 0;
+static amqp_boolean_t do_initialize_openssl = 1;
+static amqp_boolean_t openssl_initialized = 0;
+
+#ifdef ENABLE_THREAD_SAFETY
+static unsigned long amqp_ssl_threadid_callback(void);
+static void amqp_ssl_locking_callback(int mode, int n, const char *file, int line);
+
+static pthread_mutex_t openssl_init_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t *amqp_openssl_lockarray = NULL;
+#endif /* ENABLE_THREAD_SAFETY */
struct amqp_ssl_socket_context {
BIO *bio;
@@ -286,11 +296,68 @@ amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize)
}
}
+
+#ifdef ENABLE_THREAD_SAFETY
+unsigned long
+amqp_ssl_threadid_callback(void)
+{
+ return (unsigned long)pthread_self();
+}
+
+void
+amqp_ssl_locking_callback(int mode, int n, const char *file, int line)
+{
+ if (mode & CRYPTO_LOCK)
+ {
+ if (pthread_mutex_lock(&amqp_openssl_lockarray[n]))
+ amqp_abort("Runtime error: Failure in trying to lock OpenSSL mutex");
+ }
+ else
+ {
+ if (pthread_mutex_unlock(&amqp_openssl_lockarray[n]))
+ amqp_abort("Runtime error: Failure in trying to unlock OpenSSL mutex");
+ }
+}
+#endif /* ENABLE_THREAD_SAFETY */
+
static int
initialize_openssl()
{
+#ifdef ENABLE_THREAD_SAFETY
+ if (pthread_mutex_lock(&openssl_init_mutex))
+ return -1;
+#endif /* ENABLE_THREAD_SAFETY */
if (do_initialize_openssl)
{
+#ifdef ENABLE_THREAD_SAFETY
+ if (NULL == amqp_openssl_lockarray)
+ {
+ int i = 0;
+ amqp_openssl_lockarray = calloc(CRYPTO_num_locks(), sizeof(pthread_mutex_t));
+ if (!amqp_openssl_lockarray)
+ {
+ pthread_mutex_unlock(&openssl_init_mutex);
+ return -1;
+ }
+ for (int i = 0; i < CRYPTO_num_locks(); ++i)
+ {
+ if (pthread_mutex_init(&amqp_openssl_lockarray[i], NULL))
+ {
+ free(amqp_openssl_lockarray);
+ amqp_openssl_lockarray = NULL;
+ pthread_mutex_unlock(&openssl_init_mutex);
+ return -1;
+ }
+ }
+ }
+
+ if (0 == open_ssl_connections)
+ {
+ CRYPTO_set_id_callback(amqp_ssl_threadid_callback);
+ CRYPTO_set_locking_callback(amqp_ssl_locking_callback);
+ }
+#endif /* ENABLE_THREAD_SAFETY */
+
if (!openssl_initialized)
{
OPENSSL_config(NULL);
@@ -302,12 +369,37 @@ initialize_openssl()
}
}
- ++open_connections;
+ ++open_ssl_connections;
+
+#ifdef ENABLE_THREAD_SAFETY
+ pthread_mutex_unlock(&openssl_init_mutex);
+#endif /* ENABLE_THREAD_SAFETY */
return 0;
}
static int
destroy_openssl()
{
- --open_connections;
+#ifdef ENABLE_THREAD_SAFETY
+ if (pthread_mutex_lock(&openssl_init_mutex))
+ return -1;
+#endif /* ENABLE_THREAD_SAFETY */
+
+ if (open_ssl_connections > 0)
+ --open_ssl_connections;
+
+#ifdef ENABLE_THREAD_SAFETY
+ if (0 == open_ssl_connections && do_initialize_openssl)
+ {
+ /* Unsetting these allows the rabbitmq-c library to be unloaded
+ * safely. We do leak the amqp_openssl_lockarray. Which is only
+ * an issue if you repeatedly unload and load the library
+ */
+ CRYPTO_set_locking_callback(NULL);
+ CRYPTO_set_id_callback(NULL);
+ }
+
+ pthread_mutex_unlock(&openssl_init_mutex);
+#endif /* ENABLE_THREAD_SAFETY */
+ return 0;
}
diff --git a/librabbitmq/unix/threads.h b/librabbitmq/unix/threads.h
new file mode 100644
index 0000000..4b92b4a
--- /dev/null
+++ b/librabbitmq/unix/threads.h
@@ -0,0 +1,6 @@
+#ifndef AMQP_THREADS_H
+#define AMQP_THREADS_H
+
+#include <pthread.h>
+
+#endif /* AMQP_THREADS_H */