diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-04-17 15:11:56 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-04-17 15:11:56 -0700 |
commit | 473c8659a8fd4111de8e1c93c45b4261defc63e5 (patch) | |
tree | b8e9533efacf676086a1cc59d503114224261cd3 | |
parent | 21b124e2fd2f1c343fb37b708f393d1b9580cfad (diff) | |
parent | adbaf5f18d5f2480f5b01ea352d6c8ab5b17697d (diff) | |
download | rabbitmq-c-github-ask-473c8659a8fd4111de8e1c93c45b4261defc63e5.tar.gz |
Merge branch 'ssl-plumbing'
Fixes #17
Fixes #14
52 files changed, 4355 insertions, 200 deletions
@@ -14,6 +14,7 @@ /config.status /config.sub /configure +/cscope.* /depcomp /install-sh /libtool @@ -32,7 +33,16 @@ examples/amqp_producer examples/amqp_rpc_sendstring_client examples/amqp_sendstring examples/amqp_unbind +examples/amqps_bind +examples/amqps_consumer +examples/amqps_exchange_declare +examples/amqps_listen +examples/amqps_listenq +examples/amqps_producer +examples/amqps_sendstring +examples/amqps_unbind librabbitmq.pc +test-driver tests/test_parse_url tests/test_tables tools/amqp-consume diff --git a/.travis.yml b/.travis.yml index 8daba29..7dd5503 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,12 +10,17 @@ compiler: # Settings to try env: + # OpenSSL - PRE_CONFIGURE=true CONFIGURE="cmake .. -DCMAKE_INSTALL_PREFIX=../_install" BUILD_INSTALL="cmake --build . --target install" + # gnutls + #- PRE_CONFIGURE=true CONFIGURE="cmake .. -DSSL_ENGINE=GnuTLS -DCMAKE_INSTALL_PREFIX=../_install" BUILD_INSTALL="cmake --build . --target install" + # PolarSSL + #- PRE_CONFIGURE=true CONFIGURE="cmake .. -DSSL_ENGINE=PolarSSL -DCMAKE_INSTALL_PREFIX=../_install" BUILD_INSTALL="cmake --build . --target install" - PRE_CONFIGURE="autoreconf -i" CONFIGURE="../configure --prefix=`pwd`/_install" BUILD_INSTALL="make install" - + # Make sure CMake is installed install: - - sudo apt-get install cmake libpopt-dev + - sudo apt-get install cmake libpopt-dev libpolarssl-dev libgnutls-dev # Run the Build script script: diff --git a/CMakeLists.txt b/CMakeLists.txt index 55d56e1..91d0812 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -207,6 +207,8 @@ if (POPT_FOUND AND XmlTo_FOUND) set(DO_DOCS ON) endif() +find_package(Threads) + option(BUILD_SHARED_LIBS "Build rabbitmq-c as a shared library" ON) option(BUILD_STATIC_LIBS "Build rabbitmq-c as a static library" OFF) @@ -214,6 +216,29 @@ option(BUILD_EXAMPLES "Build Examples" ON) option(BUILD_TOOLS "Build Tools (requires POPT Library)" ${POPT_FOUND}) option(BUILD_TOOLS_DOCS "Build man pages for Tools (requires xmlto)" ${DO_DOCS}) option(BUILD_TESTS "Build tests (run tests with make test)" ON) +option(ENABLE_SSL_SUPPORT "Enable SSL support" ON) +option(ENABLE_THREAD_SAFETY "Enable thread safety when using OpenSSL" ${Threads_FOUND}) + +set(SSL_ENGINE "OpenSSL" CACHE STRING "SSL Backend to use, valid options: OpenSSL, cyaSSL, GnuTLS, PolarSSL") +mark_as_advanced(SSL_ENGINE) + +if (ENABLE_SSL_SUPPORT) + if (SSL_ENGINE STREQUAL "OpenSSL") + find_package(OpenSSL 0.9.8 REQUIRED) + + elseif (SSL_ENGINE STREQUAL "cyaSSL") + find_package(cyaSSL REQUIRED) + + elseif (SSL_ENGINE STREQUAL "GnuTLS") + find_package(GnuTLS REQUIRED) + + elseif (SSL_ENGINE STREQUAL "PolarSSL") + find_package(PolarSSL REQUIRED) + + else() + message(FATAL_ERROR "Unsupported SSL_ENGINE ${SSL_ENGINE}, valid engines: OpenSSL, cyaSSL, GnuTLS, or PolarSSL") + endif() +endif() if (NOT BUILD_SHARED_LIBS AND NOT BUILD_STATIC_LIBS) message(FATAL_ERROR "One or both of BUILD_SHARED_LIBS or BUILD_STATIC_LIBS must be set to ON to build") diff --git a/Makefile.am b/Makefile.am index d7bbe67..d8b4554 100644 --- a/Makefile.am +++ b/Makefile.am @@ -12,6 +12,8 @@ endif #REGENERATE_AMQP_FRAMING lib_LTLIBRARIES = librabbitmq/librabbitmq.la librabbitmq_librabbitmq_la_SOURCES = \ + librabbitmq/amqp-socket.h \ + librabbitmq/amqp_tcp_socket.c \ librabbitmq/amqp_api.c \ librabbitmq/amqp_connection.c \ librabbitmq/amqp_mem.c \ @@ -26,28 +28,54 @@ else librabbitmq_librabbitmq_la_SOURCES += librabbitmq/amqp_framing.c endif +if SSL_CYASSL +librabbitmq_librabbitmq_la_SOURCES += librabbitmq/amqp_cyassl.c +endif + +if SSL_GNUTLS +librabbitmq_librabbitmq_la_SOURCES += librabbitmq/amqp_gnutls.c +endif + +if SSL_OPENSSL +librabbitmq_librabbitmq_la_SOURCES += librabbitmq/amqp_openssl.c +endif + +if SSL_POLARSSL +librabbitmq_librabbitmq_la_SOURCES += librabbitmq/amqp_polarssl.c +endif + +librabbitmq_librabbitmq_la_CFLAGS = \ + -I$(top_srcdir)/librabbitmq \ + $(SSL_CFLAGS) \ + $(AM_CFLAGS) librabbitmq_librabbitmq_la_LDFLAGS = \ -version-info $(LT_CURRENT):$(LT_REVISION):$(LT_AGE) \ - $(NO_UNDEFINED) -librabbitmq_librabbitmq_la_CFLAGS = $(AM_CFLAGS) - + $(NO_UNDEFINED) \ + $(SSL_LIBS) if OS_UNIX librabbitmq_librabbitmq_la_SOURCES += librabbitmq/unix/socket.c librabbitmq_librabbitmq_la_SOURCES += librabbitmq/unix/socket.h +librabbitmq_librabbitmq_la_SOURCES += librabbitmq/unix/threads.h librabbitmq_librabbitmq_la_CFLAGS += -I$(top_srcdir)/librabbitmq/unix endif if OS_WIN32 librabbitmq_librabbitmq_la_SOURCES += librabbitmq/win32/socket.c librabbitmq_librabbitmq_la_SOURCES += librabbitmq/win32/socket.h +librabbitmq_librabbitmq_la_SOURCES += librabbitmq/win32/threads.h librabbitmq_librabbitmq_la_CFLAGS += -I$(top_srcdir)/librabbitmq/win32 librabbitmq_librabbitmq_la_CFLAGS += -I$(top_srcdir)/librabbitmq/win32/msinttypes endif include_HEADERS = \ $(top_srcdir)/librabbitmq/amqp.h + $(top_builddir)/librabbitmq/amqp_tcp_socket.h + +if SSL +include_HEADERS += librabbitmq/amqp_ssl_socket.h +endif if REGENERATE_AMQP_FRAMING @@ -106,7 +134,10 @@ tests_test_tables_LDADD = librabbitmq/librabbitmq.la tests_test_parse_url_SOURCES = tests/test_parse_url.c tests_test_parse_url_LDADD = librabbitmq/librabbitmq.la -noinst_LTLIBRARIES = examples/libutils.la +noinst_LTLIBRARIES = + +if EXAMPLES +noinst_LTLIBRARIES += examples/libutils.la examples_libutils_la_SOURCES = \ examples/utils.c \ @@ -173,11 +204,65 @@ examples_amqp_listenq_LDADD = \ examples/libutils.la \ librabbitmq/librabbitmq.la -examples_amqp_rpc_sendstring_client_SOURCES = examples/amqp_rpc_sendstring_client.c +examples_amqp_rpc_sendstring_client_SOURCES = \ + examples/amqp_rpc_sendstring_client.c examples_amqp_rpc_sendstring_client_LDADD = \ examples/libutils.la \ librabbitmq/librabbitmq.la +if SSL +noinst_PROGRAMS += \ + examples/amqps_bind \ + examples/amqps_consumer \ + examples/amqps_exchange_declare \ + examples/amqps_listen \ + examples/amqps_listenq \ + examples/amqps_producer \ + examples/amqps_sendstring \ + examples/amqps_unbind + +examples_amqps_bind_SOURCES = examples/amqps_bind.c +examples_amqps_bind_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la + +examples_amqps_consumer_SOURCES = examples/amqps_consumer.c +examples_amqps_consumer_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la + +examples_amqps_exchange_declare_SOURCES = examples/amqps_exchange_declare.c +examples_amqps_exchange_declare_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la + +examples_amqps_listen_SOURCES = examples/amqps_listen.c +examples_amqps_listen_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la + +examples_amqps_listenq_SOURCES = examples/amqps_listenq.c +examples_amqps_listenq_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la + +examples_amqps_producer_SOURCES = examples/amqps_producer.c +examples_amqps_producer_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la + +examples_amqps_sendstring_SOURCES = examples/amqps_sendstring.c +examples_amqps_sendstring_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la + +examples_amqps_unbind_SOURCES = examples/amqps_unbind.c +examples_amqps_unbind_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la +endif +endif + if TOOLS noinst_LTLIBRARIES += tools/libcommon.la diff --git a/cmake/FindPolarSSL.cmake b/cmake/FindPolarSSL.cmake new file mode 100644 index 0000000..b9d04ef --- /dev/null +++ b/cmake/FindPolarSSL.cmake @@ -0,0 +1,23 @@ +# - Try to find the PolarSSL SSL Library +# The module will set the following variables +# +# POLARSSL_FOUND - System has popt +# POLARSSL_INCLUDE_DIR - The popt include directory +# POLARSSL_LIBRARIES - The libraries needed to use popt + +# Find the include directories +FIND_PATH(POLARSSL_INCLUDE_DIR + NAMES polarssl/ssl.h + DOC "Path containing the polarssl/ssl.h include file" + ) + +FIND_LIBRARY(POLARSSL_LIBRARIES + NAMES polarssl + DOC "polarssl library path" + ) + +include(FindPackageHandleStandardArgs) + +FIND_PACKAGE_HANDLE_STANDARD_ARGS(POLARSSL + REQUIRED_VARS POLARSSL_INCLUDE_DIR POLARSSL_LIBRARIES + ) diff --git a/cmake/FindcyaSSL.cmake b/cmake/FindcyaSSL.cmake new file mode 100644 index 0000000..06bac00 --- /dev/null +++ b/cmake/FindcyaSSL.cmake @@ -0,0 +1,23 @@ +# - Try to find the cyaSSL SSL Library +# The module will set the following variables +# +# CYASSL_FOUND - System has popt +# CYASSL_INCLUDE_DIR - The popt include directory +# CYASSL_LIBRARIES - The libraries needed to use popt + +# Find the include directories +FIND_PATH(CYASSL_INCLUDE_DIR + NAMES cyassl/ssl.h + DOC "Path containing the cyassl/ssl.h include file" + ) + +FIND_LIBRARY(CYASSL_LIBRARIES + NAMES cyassl + DOC "cyassl library path" + ) + +include(FindPackageHandleStandardArgs) + +FIND_PACKAGE_HANDLE_STANDARD_ARGS(CYASSL + REQUIRED_VARS CYASSL_INCLUDE_DIR CYASSL_LIBRARIES + ) diff --git a/configure.ac b/configure.ac index 5bafa6d..c4b058b 100644 --- a/configure.ac +++ b/configure.ac @@ -34,6 +34,7 @@ m4_ifdef([AC_PROG_CC_C99], [AC_PROG_CC_C99], [AC_MSG_WARN([Attempt c99 workaround for old versions of autoconf]) AC_PROG_CC AX_TRY_CFLAGS([-std=c99], [AX_CFLAGS([-std=c99])])]) +PKG_PROG_PKG_CONFIG([0.17]) # Environment setup AC_CANONICAL_HOST @@ -68,6 +69,7 @@ AS_CASE([$host], [os_unix=yes]) AM_CONDITIONAL([OS_UNIX], [test "x$os_unix" = xyes]) AM_CONDITIONAL([OS_WIN32], [test "x$os_win32" = xyes]) +AC_DEFINE([ENABLE_THREAD_SAFETY], [1], [Define to 1 to enable thread safety]) # Extra Win32 setup AS_IF([test "x$os_win32" = xyes], @@ -127,6 +129,35 @@ AS_IF([test "x$enable_regen_amqp_framing" = "xyes"], [HAVE_PYTHON3=no]) AM_CONDITIONAL([PYTHON3], [test "x$HAVE_PYTHON3" = "xyes"]) +# Configure SSL/TLS +AC_ARG_WITH([ssl], + [AS_HELP_STRING([--with-ssl=@<:@cyassl/gnutls/no/openssl/polarssl/yes@:>@], + [enable SSL/TLS support @<:@default=openssl@:>@])], + [AS_CASE([$withval], + [yes], [with_ssl=openssl], + [*], [with_ssl=$withval])], + [with_ssl=openssl]) + +AS_IF([test "x$with_ssl" = "xcyassl"], + [PKG_CHECK_MODULES([SSL], [libcyassl],, [with_ssl=no])], + [test "x$with_ssl" = "xgnutls"], + [PKG_CHECK_MODULES([SSL], [gnutls],, [with_ssl=no])], + [test "x$with_ssl" = "xopenssl"], + [PKG_CHECK_MODULES([SSL], [openssl >= 0.9.8],, [with_ssl=no])], + [test "x$with_ssl" = "xpolarssl"], + [AX_LIB_POLARSSL([SSL_CFLAGS=$POLARSSL_CFLAGS + SSL_LIBS=$POLARSSL_LIBS], + [with_ssl=no])], + [test "x$with_ssl" = "xno"],, + [AC_MSG_ERROR([unknown SSL/TLS implementation: $with_ssl])]) +AM_CONDITIONAL([SSL_CYASSL], [test "x$with_ssl" = "xcyassl"]) +AM_CONDITIONAL([SSL_GNUTLS], [test "x$with_ssl" = "xgnutls"]) +AM_CONDITIONAL([SSL_OPENSSL], [test "x$with_ssl" = "xopenssl"]) +AM_CONDITIONAL([SSL_POLARSSL], [test "x$with_ssl" = "xpolarssl"]) +AM_CONDITIONAL([SSL], [test "x$with_ssl" != "xno"]) +AS_IF([test "x$with_ssl" != "xno"], + [AC_DEFINE([WITH_SSL], [1], [Define to 1 if SSL/TLS is enabled.])]) + # Configure AMQP command-line tools AC_ARG_ENABLE([tools], [AS_HELP_STRING([--enable-tools], @@ -152,6 +183,13 @@ AS_IF([test "x$enable_docs" != "xno"], [enable_docs=no])]) AM_CONDITIONAL([DOCS], [test "x$enable_docs" = "xyes"]) +# Configure examples +AC_ARG_ENABLE([examples], + [AS_HELP_STRING([--enable-examples], + [build example code @<:@auto@:>@])],, + [enable_examples=yes]) +AM_CONDITIONAL([EXAMPLES], [test "x$enable_examples" = "xyes"]) + AC_CONFIG_HEADERS([config.h]) AC_CONFIG_FILES([ librabbitmq.pc @@ -162,6 +200,8 @@ AC_MSG_RESULT([ $PACKAGE_NAME build options: Host: $host Version: $VERSION + SSL/TLS: $with_ssl Tools: $enable_tools Documentation: $enable_docs + Examples: $enable_examples ]) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5a610cb..a184cf6 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -17,7 +17,7 @@ add_executable(amqp_sendstring amqp_sendstring.c ${COMMON_SRCS}) target_link_libraries(amqp_sendstring ${RMQ_LIBRARY_TARGET}) add_executable(amqp_rpc_sendstring_client amqp_rpc_sendstring_client.c ${COMMON_SRCS}) -target_link_libraries(amqp_rpc_sendstring_client rabbitmq) +target_link_libraries(amqp_rpc_sendstring_client ${RMQ_LIBRARY_TARGET}) add_executable(amqp_exchange_declare amqp_exchange_declare.c ${COMMON_SRCS}) target_link_libraries(amqp_exchange_declare ${RMQ_LIBRARY_TARGET}) @@ -39,3 +39,29 @@ target_link_libraries(amqp_bind ${RMQ_LIBRARY_TARGET}) add_executable(amqp_listenq amqp_listenq.c ${COMMON_SRCS}) target_link_libraries(amqp_listenq ${RMQ_LIBRARY_TARGET}) + +if (ENABLE_SSL_SUPPORT) +add_executable(amqps_sendstring amqps_sendstring.c ${COMMON_SRCS}) +target_link_libraries(amqps_sendstring ${RMQ_LIBRARY_TARGET}) + +add_executable(amqps_exchange_declare amqps_exchange_declare.c ${COMMON_SRCS}) +target_link_libraries(amqps_exchange_declare ${RMQ_LIBRARY_TARGET}) + +add_executable(amqps_listen amqps_listen.c ${COMMON_SRCS}) +target_link_libraries(amqps_listen ${RMQ_LIBRARY_TARGET}) + +add_executable(amqps_producer amqps_producer.c ${COMMON_SRCS}) +target_link_libraries(amqps_producer ${RMQ_LIBRARY_TARGET}) + +add_executable(amqps_consumer amqps_consumer.c ${COMMON_SRCS}) +target_link_libraries(amqps_consumer ${RMQ_LIBRARY_TARGET}) + +add_executable(amqps_unbind amqps_unbind.c ${COMMON_SRCS}) +target_link_libraries(amqps_unbind ${RMQ_LIBRARY_TARGET}) + +add_executable(amqps_bind amqps_bind.c ${COMMON_SRCS}) +target_link_libraries(amqps_bind ${RMQ_LIBRARY_TARGET}) + +add_executable(amqps_listenq amqps_listenq.c ${COMMON_SRCS}) +target_link_libraries(amqps_listenq ${RMQ_LIBRARY_TARGET}) +endif (ENABLE_SSL_SUPPORT) diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c index b778d25..765e746 100644 --- a/examples/amqp_bind.c +++ b/examples/amqp_bind.c @@ -39,6 +39,7 @@ #include <string.h> #include <stdint.h> +#include <amqp_tcp_socket.h> #include <amqp.h> #include <amqp_framing.h> @@ -47,12 +48,11 @@ int main(int argc, char const *const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *bindingkey; char const *queue; - - int sockfd; + amqp_socket_t *socket = NULL; amqp_connection_state_t conn; if (argc < 6) { @@ -68,8 +68,17 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); - amqp_set_sockfd(conn, sockfd); + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket"); + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening TCP socket"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c index 878bf29..72bf654 100644 --- a/examples/amqp_consumer.c +++ b/examples/amqp_consumer.c @@ -39,6 +39,7 @@ #include <string.h> #include <stdint.h> +#include <amqp_tcp_socket.h> #include <amqp.h> #include <amqp_framing.h> @@ -125,11 +126,10 @@ static void run(amqp_connection_state_t conn) int main(int argc, char const *const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *bindingkey; - - int sockfd; + amqp_socket_t *socket = NULL; amqp_connection_state_t conn; amqp_bytes_t queuename; @@ -146,8 +146,17 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); - amqp_set_sockfd(conn, sockfd); + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket"); + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening TCP socket"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c index a12319b..55860e5 100644 --- a/examples/amqp_exchange_declare.c +++ b/examples/amqp_exchange_declare.c @@ -39,6 +39,7 @@ #include <string.h> #include <stdint.h> +#include <amqp_tcp_socket.h> #include <amqp.h> #include <amqp_framing.h> @@ -47,11 +48,10 @@ int main(int argc, char const *const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *exchangetype; - - int sockfd; + amqp_socket_t *socket = NULL; amqp_connection_state_t conn; if (argc < 5) { @@ -66,8 +66,17 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); - amqp_set_sockfd(conn, sockfd); + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket"); + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening TCP socket"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c index 1e8fa79..9385c17 100644 --- a/examples/amqp_listen.c +++ b/examples/amqp_listen.c @@ -39,6 +39,7 @@ #include <string.h> #include <stdint.h> +#include <amqp_tcp_socket.h> #include <amqp.h> #include <amqp_framing.h> @@ -49,11 +50,10 @@ int main(int argc, char const *const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *bindingkey; - - int sockfd; + amqp_socket_t *socket = NULL; amqp_connection_state_t conn; amqp_bytes_t queuename; @@ -70,8 +70,17 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); - amqp_set_sockfd(conn, sockfd); + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket"); + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening TCP socket"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c index 8d9c3e3..54c1189 100644 --- a/examples/amqp_listenq.c +++ b/examples/amqp_listenq.c @@ -39,6 +39,7 @@ #include <string.h> #include <stdint.h> +#include <amqp_tcp_socket.h> #include <amqp.h> #include <amqp_framing.h> @@ -49,10 +50,9 @@ int main(int argc, char const *const *argv) { char const *hostname; - int port; + int port, status; char const *queuename; - - int sockfd; + amqp_socket_t *socket = NULL; amqp_connection_state_t conn; if (argc < 4) { @@ -66,8 +66,17 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); - amqp_set_sockfd(conn, sockfd); + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket"); + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening TCP socket"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c index 229756b..efa1a20 100644 --- a/examples/amqp_producer.c +++ b/examples/amqp_producer.c @@ -39,6 +39,7 @@ #include <string.h> #include <stdint.h> +#include <amqp_tcp_socket.h> #include <amqp.h> #include <amqp_framing.h> @@ -111,11 +112,10 @@ static void send_batch(amqp_connection_state_t conn, int main(int argc, char const *const *argv) { char const *hostname; - int port; + int port, status; int rate_limit; int message_count; - - int sockfd; + amqp_socket_t *socket = NULL; amqp_connection_state_t conn; if (argc < 5) { @@ -130,8 +130,17 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); - amqp_set_sockfd(conn, sockfd); + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket"); + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening TCP socket"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_rpc_sendstring_client.c b/examples/amqp_rpc_sendstring_client.c index 3357524..6688195 100644 --- a/examples/amqp_rpc_sendstring_client.c +++ b/examples/amqp_rpc_sendstring_client.c @@ -39,6 +39,7 @@ #include <string.h> #include <stdint.h> +#include <amqp_tcp_socket.h> #include <amqp.h> #include <amqp_framing.h> @@ -49,12 +50,11 @@ int main(int argc, char *argv[]) { char const *hostname; - int port; + int port, status; char const *exchange; char const *routingkey; char const *messagebody; - - int sockfd; + amqp_socket_t *socket = NULL; amqp_connection_state_t conn; amqp_bytes_t reply_to_queue; @@ -75,8 +75,17 @@ int main(int argc, char *argv[]) conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); - amqp_set_sockfd(conn, sockfd); + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket"); + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening TCP socket"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index 424bd68..0b64024 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -39,6 +39,7 @@ #include <string.h> #include <stdint.h> +#include <amqp_tcp_socket.h> #include <amqp.h> #include <amqp_framing.h> @@ -47,12 +48,11 @@ int main(int argc, char const *const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *routingkey; char const *messagebody; - - int sockfd; + amqp_socket_t *socket = NULL; amqp_connection_state_t conn; if (argc < 6) { @@ -68,8 +68,17 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); - amqp_set_sockfd(conn, sockfd); + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket"); + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening TCP socket"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c index 77ac724..7948d0b 100644 --- a/examples/amqp_unbind.c +++ b/examples/amqp_unbind.c @@ -39,6 +39,7 @@ #include <string.h> #include <stdint.h> +#include <amqp_tcp_socket.h> #include <amqp.h> #include <amqp_framing.h> @@ -47,12 +48,11 @@ int main(int argc, char const *const *argv) { char const *hostname; - int port; + int port, status; char const *exchange; char const *bindingkey; char const *queue; - - int sockfd; + amqp_socket_t *socket = NULL; amqp_connection_state_t conn; if (argc < 6) { @@ -68,8 +68,17 @@ int main(int argc, char const *const *argv) conn = amqp_new_connection(); - die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); - amqp_set_sockfd(conn, sockfd); + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket"); + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening TCP socket"); + } + + amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); diff --git a/examples/amqps_bind.c b/examples/amqps_bind.c new file mode 100644 index 0000000..fbde025 --- /dev/null +++ b/examples/amqps_bind.c @@ -0,0 +1,115 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Alan Antonuk. All Rights Reserved. + * + * Portions created by Mike Steinert are Copyright (c) 2012-2013 + * Mike Steinert. All Rights Reserved. + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <stdint.h> +#include <amqp_ssl_socket.h> +#include <amqp_framing.h> + +#include "utils.h" + +int main(int argc, char const *const *argv) +{ + char const *hostname; + int port, status; + char const *exchange; + char const *bindingkey; + char const *queue; + amqp_socket_t *socket; + amqp_connection_state_t conn; + + if (argc < 6) { + fprintf(stderr, "Usage: amqps_bind host port exchange bindingkey queue " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + bindingkey = argv[4]; + queue = argv[5]; + + conn = amqp_new_connection(); + + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 6) { + status = amqp_ssl_socket_set_cacert(socket, argv[6]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 8) { + status = amqp_ssl_socket_set_key(socket, argv[8], argv[7]); + if (status) { + die("setting client cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + amqp_queue_bind(conn, 1, + amqp_cstring_bytes(queue), + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(bindingkey), + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c new file mode 100644 index 0000000..137457f --- /dev/null +++ b/examples/amqps_consumer.c @@ -0,0 +1,206 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Alan Antonuk. All Rights Reserved. + * + * Portions created by Mike Steinert are Copyright (c) 2012-2013 + * Mike Steinert. All Rights Reserved. + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <stdint.h> +#include <amqp_ssl_socket.h> +#include <amqp_framing.h> + +#include <assert.h> + +#include "utils.h" + +#define SUMMARY_EVERY_US 1000000 + +static void run(amqp_connection_state_t conn) +{ + uint64_t start_time = now_microseconds(); + int received = 0; + int previous_received = 0; + uint64_t previous_report_time = start_time; + uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; + + amqp_frame_t frame; + int result; + size_t body_received; + size_t body_target; + + uint64_t now; + + while (1) { + now = now_microseconds(); + if (now > next_summary_time) { + int countOverInterval = received - previous_received; + double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); + printf("%d ms: Received %d - %d since last report (%d Hz)\n", + (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); + + previous_received = received; + previous_report_time = now; + next_summary_time += SUMMARY_EVERY_US; + } + + amqp_maybe_release_buffers(conn); + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + return; + } + + if (frame.frame_type != AMQP_FRAME_METHOD) { + continue; + } + + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { + continue; + } + + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + return; + } + + if (frame.frame_type != AMQP_FRAME_HEADER) { + fprintf(stderr, "Expected header!"); + abort(); + } + + body_target = frame.payload.properties.body_size; + body_received = 0; + + while (body_received < body_target) { + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + return; + } + + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } + + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); + } + + received++; + } +} + +int main(int argc, char const *const *argv) +{ + char const *hostname; + int port, status; + char const *exchange; + char const *bindingkey; + amqp_socket_t *socket; + amqp_connection_state_t conn; + amqp_bytes_t queuename; + + if (argc < 3) { + fprintf(stderr, "Usage: amqps_consumer host port " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = "amq.direct"; /* argv[3]; */ + bindingkey = "test queue"; /* argv[4]; */ + + conn = amqp_new_connection(); + + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 3) { + status = amqp_ssl_socket_set_cacert(socket, argv[3]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 5) { + status = amqp_ssl_socket_set_key(socket, argv[5], argv[4]); + if (status) { + die("setting client key"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + { + amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); + queuename = amqp_bytes_malloc_dup(r->queue); + if (queuename.bytes == NULL) { + fprintf(stderr, "Out of memory while copying queue name"); + return 1; + } + } + + amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); + + amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); + + run(conn); + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + + return 0; +} diff --git a/examples/amqps_exchange_declare.c b/examples/amqps_exchange_declare.c new file mode 100644 index 0000000..bae2f57 --- /dev/null +++ b/examples/amqps_exchange_declare.c @@ -0,0 +1,110 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Alan Antonuk. All Rights Reserved. + * + * Portions created by Mike Steinert are Copyright (c) 2012-2013 + * Mike Steinert. All Rights Reserved. + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <stdint.h> +#include <amqp_ssl_socket.h> +#include <amqp_framing.h> + +#include "utils.h" + +int main(int argc, char const *const *argv) +{ + char const *hostname; + int port, status; + char const *exchange; + char const *exchangetype; + amqp_socket_t *socket; + amqp_connection_state_t conn; + + if (argc < 5) { + fprintf(stderr, "Usage: amqps_exchange_declare host port exchange " + "exchangetype [cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + exchangetype = argv[4]; + + conn = amqp_new_connection(); + + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 5) { + status = amqp_ssl_socket_set_cacert(socket, argv[5]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 7) { + status = amqp_ssl_socket_set_key(socket, argv[7], argv[6]); + if (status) { + die("setting client key/cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype), + 0, 0, amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c new file mode 100644 index 0000000..0e45162 --- /dev/null +++ b/examples/amqps_listen.c @@ -0,0 +1,207 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Alan Antonuk. All Rights Reserved. + * + * Portions created by Mike Steinert are Copyright (c) 2012-2013 + * Mike Steinert. All Rights Reserved. + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <stdint.h> +#include <amqp_ssl_socket.h> +#include <amqp_framing.h> + +#include <assert.h> + +#include "utils.h" + +int main(int argc, char const *const *argv) +{ + char const *hostname; + int port, status; + char const *exchange; + char const *bindingkey; + amqp_socket_t *socket; + amqp_connection_state_t conn; + + amqp_bytes_t queuename; + + if (argc < 5) { + fprintf(stderr, "Usage: amqps_listen host port exchange bindingkey " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + bindingkey = argv[4]; + + conn = amqp_new_connection(); + + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 5) { + status = amqp_ssl_socket_set_cacert(socket, argv[5]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 7) { + status = amqp_ssl_socket_set_key(socket, argv[7], argv[6]); + if (status) { + die("setting client cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + { + amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); + queuename = amqp_bytes_malloc_dup(r->queue); + if (queuename.bytes == NULL) { + fprintf(stderr, "Out of memory while copying queue name"); + return 1; + } + } + + amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); + + amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); + + { + amqp_frame_t frame; + int result; + + amqp_basic_deliver_t *d; + amqp_basic_properties_t *p; + size_t body_target; + size_t body_received; + + while (1) { + amqp_maybe_release_buffers(conn); + result = amqp_simple_wait_frame(conn, &frame); + printf("Result %d\n", result); + if (result < 0) { + break; + } + + printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); + if (frame.frame_type != AMQP_FRAME_METHOD) { + continue; + } + + printf("Method %s\n", amqp_method_name(frame.payload.method.id)); + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { + continue; + } + + d = (amqp_basic_deliver_t *) frame.payload.method.decoded; + printf("Delivery %u, exchange %.*s routingkey %.*s\n", + (unsigned) d->delivery_tag, + (int) d->exchange.len, (char *) d->exchange.bytes, + (int) d->routing_key.len, (char *) d->routing_key.bytes); + + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + break; + } + + if (frame.frame_type != AMQP_FRAME_HEADER) { + fprintf(stderr, "Expected header!"); + abort(); + } + p = (amqp_basic_properties_t *) frame.payload.properties.decoded; + if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + printf("Content-type: %.*s\n", + (int) p->content_type.len, (char *) p->content_type.bytes); + } + printf("----\n"); + + body_target = frame.payload.properties.body_size; + body_received = 0; + + while (body_received < body_target) { + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + break; + } + + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } + + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); + + amqp_dump(frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); + } + + if (body_received != body_target) { + /* Can only happen when amqp_simple_wait_frame returns <= 0 */ + /* We break here to close the connection */ + break; + } + } + } + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + + return 0; +} diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c new file mode 100644 index 0000000..321c6a3 --- /dev/null +++ b/examples/amqps_listenq.c @@ -0,0 +1,190 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Alan Antonuk. All Rights Reserved. + * + * Portions created by Mike Steinert are Copyright (c) 2012-2013 + * Mike Steinert. All Rights Reserved. + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <stdint.h> +#include <amqp_ssl_socket.h> +#include <amqp_framing.h> + +#include <assert.h> + +#include "utils.h" + +int main(int argc, char const *const *argv) +{ + char const *hostname; + int port, status; + char const *queuename; + amqp_socket_t *socket; + amqp_connection_state_t conn; + + if (argc < 4) { + fprintf(stderr, "Usage: amqps_listenq host port queuename " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + queuename = argv[3]; + + conn = amqp_new_connection(); + + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 4) { + status = amqp_ssl_socket_set_cacert(socket, argv[4]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 6) { + status = amqp_ssl_socket_set_key(socket, argv[6], argv[5]); + if (status) { + die("setting client cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename), amqp_empty_bytes, 0, 0, 0, amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); + + { + amqp_frame_t frame; + int result; + + amqp_basic_deliver_t *d; + amqp_basic_properties_t *p; + size_t body_target; + size_t body_received; + + while (1) { + amqp_maybe_release_buffers(conn); + result = amqp_simple_wait_frame(conn, &frame); + printf("Result %d\n", result); + if (result < 0) { + break; + } + + printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); + if (frame.frame_type != AMQP_FRAME_METHOD) { + continue; + } + + printf("Method %s\n", amqp_method_name(frame.payload.method.id)); + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { + continue; + } + + d = (amqp_basic_deliver_t *) frame.payload.method.decoded; + printf("Delivery %u, exchange %.*s routingkey %.*s\n", + (unsigned) d->delivery_tag, + (int) d->exchange.len, (char *) d->exchange.bytes, + (int) d->routing_key.len, (char *) d->routing_key.bytes); + + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + break; + } + + if (frame.frame_type != AMQP_FRAME_HEADER) { + fprintf(stderr, "Expected header!"); + abort(); + } + p = (amqp_basic_properties_t *) frame.payload.properties.decoded; + if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + printf("Content-type: %.*s\n", + (int) p->content_type.len, (char *) p->content_type.bytes); + } + printf("----\n"); + + body_target = frame.payload.properties.body_size; + body_received = 0; + + while (body_received < body_target) { + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + break; + } + + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } + + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); + + amqp_dump(frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); + } + + if (body_received != body_target) { + /* Can only happen when amqp_simple_wait_frame returns <= 0 */ + /* We break here to close the connection */ + break; + } + + amqp_basic_ack(conn, 1, d->delivery_tag, 0); + } + } + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + + return 0; +} diff --git a/examples/amqps_producer.c b/examples/amqps_producer.c new file mode 100644 index 0000000..f8f6dc6 --- /dev/null +++ b/examples/amqps_producer.c @@ -0,0 +1,172 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Alan Antonuk. All Rights Reserved. + * + * Portions created by Mike Steinert are Copyright (c) 2012-2013 + * Mike Steinert. All Rights Reserved. + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <stdint.h> +#include <amqp_ssl_socket.h> +#include <amqp_framing.h> + +#include "utils.h" + +#define SUMMARY_EVERY_US 1000000 + +static void send_batch(amqp_connection_state_t conn, + char const *queue_name, + int rate_limit, + int message_count) +{ + uint64_t start_time = now_microseconds(); + int i; + int sent = 0; + int previous_sent = 0; + uint64_t previous_report_time = start_time; + uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; + + char message[256]; + amqp_bytes_t message_bytes; + + for (i = 0; i < (int)sizeof(message); i++) { + message[i] = i & 0xff; + } + + message_bytes.len = sizeof(message); + message_bytes.bytes = message; + + for (i = 0; i < message_count; i++) { + uint64_t now = now_microseconds(); + + die_on_error(amqp_basic_publish(conn, + 1, + amqp_cstring_bytes("amq.direct"), + amqp_cstring_bytes(queue_name), + 0, + 0, + NULL, + message_bytes), + "Publishing"); + sent++; + if (now > next_summary_time) { + int countOverInterval = sent - previous_sent; + double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); + printf("%d ms: Sent %d - %d since last report (%d Hz)\n", + (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); + + previous_sent = sent; + previous_report_time = now; + next_summary_time += SUMMARY_EVERY_US; + } + + while (((i * 1000000.0) / (now - start_time)) > rate_limit) { + microsleep(2000); + now = now_microseconds(); + } + } + + { + uint64_t stop_time = now_microseconds(); + int total_delta = stop_time - start_time; + + printf("PRODUCER - Message count: %d\n", message_count); + printf("Total time, milliseconds: %d\n", total_delta / 1000); + printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0))); + } +} + +int main(int argc, char const *const *argv) +{ + char const *hostname; + int port, status; + int rate_limit; + int message_count; + amqp_socket_t *socket; + amqp_connection_state_t conn; + + if (argc < 5) { + fprintf(stderr, "Usage: amqps_producer host port rate_limit message_count " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + rate_limit = atoi(argv[3]); + message_count = atoi(argv[4]); + + conn = amqp_new_connection(); + + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 5) { + status = amqp_ssl_socket_set_cacert(socket, argv[5]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 7) { + status = amqp_ssl_socket_set_key(socket, argv[7], argv[6]); + if (status) { + die("setting client cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + send_batch(conn, "test queue", rate_limit, message_count); + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} diff --git a/examples/amqps_sendstring.c b/examples/amqps_sendstring.c new file mode 100644 index 0000000..7465ef2 --- /dev/null +++ b/examples/amqps_sendstring.c @@ -0,0 +1,124 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Alan Antonuk. All Rights Reserved. + * + * Portions created by Mike Steinert are Copyright (c) 2012-2013 + * Mike Steinert. All Rights Reserved. + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <stdint.h> +#include <amqp_ssl_socket.h> +#include <amqp_framing.h> + +#include "utils.h" + +int main(int argc, char const *const *argv) +{ + char const *hostname; + int port, status; + char const *exchange; + char const *routingkey; + char const *messagebody; + amqp_socket_t *socket; + amqp_connection_state_t conn; + + if (argc < 6) { + fprintf(stderr, "Usage: amqps_sendstring host port exchange routingkey " + "messagebody [cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + routingkey = argv[4]; + messagebody = argv[5]; + + conn = amqp_new_connection(); + + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 6) { + status = amqp_ssl_socket_set_cacert(socket, argv[6]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 8) { + status = amqp_ssl_socket_set_key(socket, argv[8], argv[7]); + if (status) { + die("setting client cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + { + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = 2; /* persistent delivery mode */ + die_on_error(amqp_basic_publish(conn, + 1, + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(routingkey), + 0, + 0, + &props, + amqp_cstring_bytes(messagebody)), + "Publishing"); + } + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} diff --git a/examples/amqps_unbind.c b/examples/amqps_unbind.c new file mode 100644 index 0000000..bae017d --- /dev/null +++ b/examples/amqps_unbind.c @@ -0,0 +1,115 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Alan Antonuk. All Rights Reserved. + * + * Portions created by Mike Steinert are Copyright (c) 2012-2013 + * Mike Steinert. All Rights Reserved. + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <stdint.h> +#include <amqp_ssl_socket.h> +#include <amqp_framing.h> + +#include "utils.h" + +int main(int argc, char const *const *argv) +{ + char const *hostname; + int port, status; + char const *exchange; + char const *bindingkey; + char const *queue; + amqp_socket_t *socket; + amqp_connection_state_t conn; + + if (argc < 6) { + fprintf(stderr, "Usage: amqps_unbind host port exchange bindingkey queue " + "[cacert.pem [key.pem cert.pem]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + bindingkey = argv[4]; + queue = argv[5]; + + conn = amqp_new_connection(); + + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 6) { + status = amqp_ssl_socket_set_cacert(socket, argv[6]); + if (status) { + die("setting CA certificate"); + } + } + + if (argc > 8) { + status = amqp_ssl_socket_set_key(socket, argv[8], argv[7]); + if (status) { + die("setting client cert"); + } + } + + status = amqp_socket_open(socket, hostname, port); + if (status) { + die("opening SSL/TLS connection"); + } + + amqp_set_socket(conn, socket); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + amqp_queue_unbind(conn, 1, + amqp_cstring_bytes(queue), + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(bindingkey), + amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + return 0; +} diff --git a/examples/utils.c b/examples/utils.c index 609c354..4b00470 100644 --- a/examples/utils.c +++ b/examples/utils.c @@ -34,6 +34,7 @@ * ***** END LICENSE BLOCK ***** */ +#include <stdarg.h> #include <stdlib.h> #include <stdio.h> #include <string.h> @@ -45,6 +46,16 @@ #include "utils.h" +void die(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(1); +} + void die_on_error(int x, char const *context) { if (x < 0) { diff --git a/examples/utils.h b/examples/utils.h index 2e7b15f..dea86da 100644 --- a/examples/utils.h +++ b/examples/utils.h @@ -37,6 +37,7 @@ * ***** END LICENSE BLOCK ***** */ +void die(const char *fmt, ...); extern void die_on_error(int x, char const *context); extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context); diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt index a9996cf..52e2682 100644 --- a/librabbitmq/CMakeLists.txt +++ b/librabbitmq/CMakeLists.txt @@ -76,13 +76,51 @@ set(LIBRABBITMQ_INCLUDE_DIRS add_definitions(-DHAVE_CONFIG_H) +if (ENABLE_SSL_SUPPORT) + add_definitions(-DWITH_SSL=1) + + if (SSL_ENGINE STREQUAL "OpenSSL") + set(AMQP_SSL_SRCS amqp_ssl_socket.h amqp_openssl.c) + include_directories(${OPENSSL_INCLUDE_DIR}) + set(AMQP_SSL_LIBS ${OPENSSL_LIBRARIES}) + + elseif (SSL_ENGINE STREQUAL "cyaSSL") + set(AMQP_SSL_SRCS amqp_ssl_socket.h amqp_cyassl.c) + include_directories(${CYASSL_INCLUDE_DIR}) + set(AMQP_SSL_LIBS ${CYASSL_LIBRARIES}) + + elseif (SSL_ENGINE STREQUAL "GnuTLS") + set(AMQP_SSL_SRCS amqp_ssl_socket.h amqp_gnutls.c) + include_directories(${GNUTLS_INCLUDE_DIR}) + add_definitions(${GNUTLS_DEFINITIONS}) + set(AMQP_SSL_LIBS ${GNUTLS_LIBRARIES}) + + elseif (SSL_ENGINE STREQUAL "PolarSSL") + set(AMQP_SSL_SRCS amqp_ssl_socket.h amqp_polarssl.c) + include_directories(${POLARSSL_INCLUDE_DIR}) + set(AMQP_SSL_LIBS ${POLARSSL_LIBRARIES}) + + else() + message(FATAL_ERROR "Unknown SSL_ENGINE ${SSL_ENGINE}") + endif() + + if (ENABLE_THREAD_SAFETY) + add_definitions(-DENABLE_THREAD_SAFETY) + if (WIN32) + set(AMQP_SSL_SRCS ${AMQP_SSL_SRCS} win32/threads.h win32/threads.c) + else() + set(AMQP_SSL_SRCS ${AMQP_SSL_SRCS} unix/threads.h) + endif() + endif() +endif() + set(RABBITMQ_SOURCES ${AMQP_FRAMING_H_PATH} ${AMQP_FRAMING_C_PATH} - amqp_api.c amqp.h - amqp_connection.c amqp_mem.c amqp_private.h amqp_socket.c amqp_table.c - amqp_url.c + amqp_api.c amqp.h amqp_connection.c amqp_mem.c amqp_private.h amqp_socket.c + amqp_table.c amqp_url.c amqp_socket.h amqp_tcp_socket.c amqp_tcp_socket.h ${SOCKET_IMPL}/socket.h ${SOCKET_IMPL}/socket.c + ${AMQP_SSL_SRCS} ) add_definitions(-DAMQP_BUILD) @@ -92,6 +130,10 @@ include(InstallMacros) if (BUILD_SHARED_LIBS) add_library(rabbitmq SHARED ${RABBITMQ_SOURCES}) + if (ENABLE_SSL_SUPPORT) + target_link_libraries(rabbitmq ${AMQP_SSL_LIBS} ${CMAKE_THREAD_LIBS_INIT}) + endif() + if (WIN32) set_target_properties(rabbitmq PROPERTIES VERSION ${RMQ_VERSION} OUTPUT_NAME rabbitmq.${RMQ_SOVERSION}) else (WIN32) @@ -141,4 +183,3 @@ install(FILES amqp.h ${AMQP_FRAMING_H_PATH} ${STDINT_H_INSTALL_FILE} ) set(RMQ_LIBRARY_TARGET ${RMQ_LIBRARY_TARGET} PARENT_SCOPE) - diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 184ef51..36c1411 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -108,6 +108,7 @@ # define AMQP_CALL __cdecl #elif defined(__GNUC__) && __GNUC__ >= 4 +# include <sys/uio.h> # define AMQP_PUBLIC_FUNCTION \ __attribute__ ((visibility ("default"))) # define AMQP_PUBLIC_VARIABLE \ @@ -119,6 +120,35 @@ # define AMQP_CALL #endif +#if __GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1) +# define AMQP_DEPRECATED(function) \ + function __attribute__ ((__deprecated__)) +#elif defined(_MSC_VER) +# define AMQP_DEPRECATED(function) \ + __declspec(deprecated) function +#else +# define AMQP_DEPRECATED(function) +#endif + +/* Define ssize_t on Win32/64 platforms + See: http://lists.cs.uiuc.edu/pipermail/llvmdev/2010-April/030649.html for details + */ +#if !defined(_W64) +#if !defined(__midl) && (defined(_X86_) || defined(_M_IX86)) && _MSC_VER >= 1300 +#define _W64 __w64 +#else +#define _W64 +#endif +#endif + +#ifdef _MSC_VER +#ifdef _WIN64 +typedef __int64 ssize_t; +#else +typedef _W64 int ssize_t; +#endif +#endif + #include <stddef.h> #include <stdint.h> @@ -297,6 +327,8 @@ typedef enum amqp_sasl_method_enum_ { /* Opaque struct. */ typedef struct amqp_connection_state_t_ *amqp_connection_state_t; +typedef struct amqp_socket_t_ amqp_socket_t; + AMQP_PUBLIC_FUNCTION char const * AMQP_CALL amqp_version(void); @@ -356,9 +388,15 @@ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_get_sockfd(amqp_connection_state_t state); +AMQP_DEPRECATED( + AMQP_PUBLIC_FUNCTION + void + AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd) +); + AMQP_PUBLIC_FUNCTION void -AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd); +AMQP_CALL amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket); AMQP_PUBLIC_FUNCTION int @@ -474,8 +512,8 @@ AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost, AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_login_with_properties(amqp_connection_state_t state, char const *vhost, - int channel_max, int frame_max, int heartbeat, - const amqp_table_t *properties, amqp_sasl_method_enum sasl_method, ...); + int channel_max, int frame_max, int heartbeat, + const amqp_table_t *properties, amqp_sasl_method_enum sasl_method, ...); struct amqp_basic_properties_t_; @@ -547,6 +585,7 @@ struct amqp_connection_info { char *host; char *vhost; int port; + amqp_boolean_t ssl; }; AMQP_PUBLIC_FUNCTION @@ -557,6 +596,74 @@ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed); +/* socket API */ + +/** + * Open a socket connection. + * + * This function opens a socket connection returned from amqp_tcp_socket_new() + * or amqp_ssl_socket_new(). This function should be called after setting + * socket options and prior to assigning the socket to an AMQP connection with + * amqp_set_socket(). + * + * \param [in,out] self A socket object. + * \param [in] host Connect to this host. + * \param [in] port Connect on this remote port. + * + * \return Zero upon success, non-zero otherwise. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_open(amqp_socket_t *self, const char *host, int port); + +/** + * Close a socket connection and free resources. + * + * This function closes a socket connection and releases any resources used by + * the object. After calling this function the specified socket should no + * longer be referenced. + * + * \param [in,out] self A socket object. + * + * \return Zero upon success, non-zero otherwise. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_close(amqp_socket_t *self); + +/** + * Retrieve an error code for the last socket operation. + * + * At the time of writing, this interface is not well supported and is subject + * to changes! + * + * \param [in,out] self A socket object. + * + * \return Zero upon success, an opaque error code otherwise + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_error(amqp_socket_t *self); + +/** + * Get the socket descriptor in use by a socket object. + * + * Retrieve the underlying socket descriptor. This function can be used to + * perform low-level socket operations that aren't supported by the socket + * interface. Use with caution! + * + * \param [in,out] self A socket object. + * + * \return The underlying socket descriptor. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_get_sockfd(amqp_socket_t *self); + AMQP_END_DECLS #include <amqp_framing.h> diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 4a84bd3..6bc8397 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -75,6 +75,11 @@ char *amqp_error_string(int err) case ERROR_CATEGORY_OS: return amqp_os_error_string(err); +#ifdef WITH_SSL + case ERROR_CATEGORY_SSL: + return amqp_ssl_error_string(err); +#endif + default: str = "(undefined error category)"; } diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index b3196b1..2d50ad3 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -38,8 +38,10 @@ #include "config.h" #endif +#include "amqp_tcp_socket.h" #include "amqp_private.h" #include <assert.h> +#include <errno.h> #include <stdint.h> #include <stdio.h> #include <stdlib.h> @@ -89,7 +91,6 @@ amqp_connection_state_t amqp_new_connection(void) is also the minimum frame size */ state->target_size = 8; - state->sockfd = -1; state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE; state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE); if (state->sock_inbound_buffer.bytes == NULL) { @@ -108,13 +109,24 @@ out_nomem: int amqp_get_sockfd(amqp_connection_state_t state) { - return state->sockfd; + return state->socket ? amqp_socket_get_sockfd(state->socket) : -1; } void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { - state->sockfd = sockfd; + amqp_socket_t *socket = amqp_tcp_socket_new(); + if (!socket) { + amqp_abort("%s", strerror(errno)); + } + amqp_tcp_socket_set_sockfd(socket, sockfd); + amqp_set_socket(state, socket); +} + +void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket) +{ + amqp_socket_close(state->socket); + state->socket = socket; } int amqp_tune_connection(amqp_connection_state_t state, @@ -152,19 +164,18 @@ int amqp_get_channel_max(amqp_connection_state_t state) int amqp_destroy_connection(amqp_connection_state_t state) { - int s = state->sockfd; - - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); - free(state->outbound_buffer.bytes); - free(state->sock_inbound_buffer.bytes); - free(state); - - if (s >= 0 && amqp_socket_close(s) < 0) { - return -amqp_socket_error(); - } else { - return 0; + int status = 0; + if (state) { + empty_amqp_pool(&state->frame_pool); + empty_amqp_pool(&state->decoding_pool); + free(state->outbound_buffer.bytes); + free(state->sock_inbound_buffer.bytes); + if (amqp_socket_close(state->socket) < 0) { + status = -amqp_socket_error(state->socket); + } + free(state); } + return status; } static void return_to_idle(amqp_connection_state_t state) @@ -392,7 +403,7 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_base = &frame_end_byte; iov[2].iov_len = FOOTER_SIZE; - res = amqp_socket_writev(state->sockfd, iov, 3); + res = amqp_socket_writev(state->socket, iov, 3); } else { size_t out_frame_len; amqp_bytes_t encoded; @@ -440,12 +451,13 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_e32(out_frame, 3, out_frame_len); amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); - res = send(state->sockfd, out_frame, - out_frame_len + HEADER_SIZE + FOOTER_SIZE, MSG_NOSIGNAL); + res = amqp_socket_send(state->socket, out_frame, + out_frame_len + HEADER_SIZE + FOOTER_SIZE, + MSG_NOSIGNAL); } if (res < 0) { - return -amqp_socket_error(); + return -amqp_socket_error(state->socket); } else { return 0; } diff --git a/librabbitmq/amqp_cyassl.c b/librabbitmq/amqp_cyassl.c new file mode 100644 index 0000000..9657449 --- /dev/null +++ b/librabbitmq/amqp_cyassl.c @@ -0,0 +1,270 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "amqp_ssl_socket.h" +#include "amqp_private.h" +#include <cyassl/ssl.h> +#include <stdlib.h> +#include <string.h> + +#ifndef AMQP_USE_UNTESTED_SSL_BACKEND +# error This SSL backend is alpha quality and likely contains errors.\ + -DAMQP_USE_UNTESTED_SSL_BACKEND to use this backend +#endif + +struct amqp_ssl_socket_t { + const struct amqp_socket_class_t *klass; + CYASSL_CTX *ctx; + CYASSL *ssl; + int sockfd; + char *buffer; + size_t length; + int last_error; +}; + +static ssize_t +amqp_ssl_socket_send(void *base, + const void *buf, + size_t len, + AMQP_UNUSED int flags) +{ + int status; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + + self->last_error = 0; + status = CyaSSL_write(self->ssl, buf, len); + if (status <= 0) { + self->last_error = ERROR_CATEGORY_SSL; + } + + return status; +} + +static ssize_t +amqp_ssl_socket_writev(void *base, + const struct iovec *iov, + int iovcnt) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t written = -1; + char *bufferp; + size_t bytes; + int i; + self->last_error = 0; + bytes = 0; + for (i = 0; i < iovcnt; ++i) { + bytes += iov[i].iov_len; + } + if (self->length < bytes) { + free(self->buffer); + self->buffer = malloc(bytes); + if (!self->buffer) { + self->length = 0; + self->last_error = ERROR_NO_MEMORY; + goto exit; + } + self->length = bytes; + } + bufferp = self->buffer; + for (i = 0; i < iovcnt; ++i) { + memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); + bufferp += iov[i].iov_len; + } + written = amqp_ssl_socket_send(self, self->buffer, bytes, 0); +exit: + return written; +} + +static ssize_t +amqp_ssl_socket_recv(void *base, + void *buf, + size_t len, + AMQP_UNUSED int flags) +{ + int status; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + + self->last_error = 0; + status = CyaSSL_read(self->ssl, buf, len); + if (status <= 0) { + self->last_error = ERROR_CATEGORY_SSL; + } + + return status; +} + +static int +amqp_ssl_socket_get_sockfd(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->sockfd; +} + +static int +amqp_ssl_socket_close(void *base) +{ + int status = -1; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + if (self->sockfd >= 0) { + status = amqp_os_socket_close(self->sockfd); + } + if (self) { + CyaSSL_free(self->ssl); + CyaSSL_CTX_free(self->ctx); + free(self->buffer); + free(self); + } + return status; +} + +static int +amqp_ssl_socket_error(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->last_error; +} + +char * +amqp_ssl_error_string(AMQP_UNUSED int err) +{ + return strdup("A ssl socket error occurred."); +} + +static int +amqp_ssl_socket_open(void *base, const char *host, int port) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + int status; + self->last_error = 0; + + self->ssl = CyaSSL_new(self->ctx); + if (NULL == self->ssl) { + self->last_error = ERROR_CATEGORY_SSL; + return -1; + } + + self->sockfd = amqp_open_socket(host, port); + if (0 > self->sockfd) { + self->last_error = - self->sockfd; + return -1; + } + CyaSSL_set_fd(self->ssl, self->sockfd); + status = CyaSSL_connect(self->ssl); + if (SSL_SUCCESS != status) { + self->last_error = ERROR_CATEGORY_SSL; + return -1; + } + return 0; +} + +static const struct amqp_socket_class_t amqp_ssl_socket_class = { + amqp_ssl_socket_writev, /* writev */ + amqp_ssl_socket_send, /* send */ + amqp_ssl_socket_recv, /* recv */ + amqp_ssl_socket_open, /* open */ + amqp_ssl_socket_close, /* close */ + amqp_ssl_socket_error, /* error */ + amqp_ssl_socket_get_sockfd /* get_sockfd */ +}; + +amqp_socket_t * +amqp_ssl_socket_new(void) +{ + struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); + if (!self) { + goto error; + } + CyaSSL_Init(); + self->ctx = CyaSSL_CTX_new(CyaSSLv23_client_method()); + if (!self->ctx) { + goto error; + } + self->klass = &amqp_ssl_socket_class; + return (amqp_socket_t *)self; +error: + amqp_socket_close((amqp_socket_t *)self); + return NULL; +} + +int +amqp_ssl_socket_set_cacert(amqp_socket_t *base, + const char *cacert) +{ + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = CyaSSL_CTX_load_verify_locations(self->ctx, cacert, NULL); + if (SSL_SUCCESS != status) { + return -1; + } + return 0; +} + +int +amqp_ssl_socket_set_key(amqp_socket_t *base, + const char *cert, + const char *key) +{ + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = CyaSSL_CTX_use_PrivateKey_file(self->ctx, key, + SSL_FILETYPE_PEM); + if (SSL_SUCCESS != status) { + return -1; + } + status = CyaSSL_CTX_use_certificate_chain_file(self->ctx, cert); + return 0; +} + +int +amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base, + AMQP_UNUSED const char *cert, + AMQP_UNUSED const void *key, + AMQP_UNUSED size_t n) +{ + amqp_abort("%s is not implemented for CyaSSL", __func__); + return -1; +} + +void +amqp_ssl_socket_set_verify(AMQP_UNUSED amqp_socket_t *base, + AMQP_UNUSED amqp_boolean_t verify) +{ + /* noop for CyaSSL */ +} + +void +amqp_set_initialize_ssl_library(AMQP_UNUSED amqp_boolean_t do_initialize) +{ +} diff --git a/librabbitmq/amqp_gnutls.c b/librabbitmq/amqp_gnutls.c new file mode 100644 index 0000000..f05fcb8 --- /dev/null +++ b/librabbitmq/amqp_gnutls.c @@ -0,0 +1,362 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "amqp_ssl_socket.h" +#include "amqp_private.h" +#include <gnutls/gnutls.h> +#include <gnutls/x509.h> +#include <stdlib.h> +#include <string.h> + +#ifndef AMQP_USE_UNTESTED_SSL_BACKEND +# error This SSL backend is alpha quality and likely contains errors.\ + -DAMQP_USE_UNTESTED_SSL_BACKEND to use this backend +#endif + +struct amqp_ssl_socket_t { + const struct amqp_socket_class_t *klass; + gnutls_session_t session; + gnutls_certificate_credentials_t credentials; + int sockfd; + char *host; + char *buffer; + size_t length; + int last_error; +}; + +static ssize_t +amqp_ssl_socket_send(void *base, + const void *buf, + size_t len, + AMQP_UNUSED int flags) +{ + ssize_t status; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + + self->last_error = 0; + status = gnutls_record_send(self->session, buf, len); + if (status < 0) { + self->last_error = ERROR_CATEGORY_SSL; + } + return status; +} + +static ssize_t +amqp_ssl_socket_writev(void *base, + const struct iovec *iov, + int iovcnt) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t written = -1; + char *bufferp; + size_t bytes; + int i; + self->last_error = 0; + bytes = 0; + for (i = 0; i < iovcnt; ++i) { + bytes += iov[i].iov_len; + } + if (self->length < bytes) { + free(self->buffer); + self->buffer = malloc(bytes); + if (!self->buffer) { + self->length = 0; + self->last_error = ERROR_NO_MEMORY; + goto exit; + } + self->length = 0; + } + bufferp = self->buffer; + for (i = 0; i < iovcnt; ++i) { + memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); + bufferp += iov[i].iov_len; + } + written = amqp_ssl_socket_send(self, self->buffer, bytes, 0); +exit: + return written; +} + +static ssize_t +amqp_ssl_socket_recv(void *base, + void *buf, + size_t len, + AMQP_UNUSED int flags) +{ + ssize_t status; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + + self->last_error = 0; + status = gnutls_record_recv(self->session, buf, len); + if (status < 0) { + self->last_error = ERROR_CATEGORY_SSL; + } + + return status; +} + +static int +amqp_ssl_socket_open(void *base, const char *host, int port) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + int status; + self->last_error = 0; + + free(self->host); + self->host = strdup(host); + if (NULL == self->host) { + self->last_error = ERROR_NO_MEMORY; + return -1; + } + + self->sockfd = amqp_open_socket(host, port); + if (0 > self->sockfd) { + self->last_error = -self->sockfd; + return -1; + } + gnutls_transport_set_ptr(self->session, + (gnutls_transport_ptr_t)self->sockfd); + do { + status = gnutls_handshake(self->session); + } while (status < 0 && !gnutls_error_is_fatal(status)); + + if (gnutls_error_is_fatal(status)) { + self->last_error = ERROR_CATEGORY_SSL; + } + + return status; +} + +static int +amqp_ssl_socket_close(void *base) +{ + int status = -1; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + if (self->sockfd >= 0) { + status = amqp_os_socket_close(self->sockfd); + } + if (self) { + gnutls_deinit(self->session); + gnutls_certificate_free_credentials(self->credentials); + free(self->host); + free(self->buffer); + free(self); + } + return status; +} + +static int +amqp_ssl_socket_error(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->last_error; +} + +char * +amqp_ssl_error_string(AMQP_UNUSED int err) +{ + return strdup("A SSL error occurred"); +} + +static int +amqp_ssl_socket_get_sockfd(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->sockfd; +} + +static int +amqp_ssl_verify(gnutls_session_t session) +{ + int ret; + unsigned int status, size; + const gnutls_datum_t *list; + gnutls_x509_crt_t cert = NULL; + struct amqp_ssl_socket_t *self = gnutls_session_get_ptr(session); + ret = gnutls_certificate_verify_peers2(session, &status); + if (0 > ret) { + goto error; + } + if (status & GNUTLS_CERT_INVALID) { + goto error; + } + if (status & GNUTLS_CERT_SIGNER_NOT_FOUND) { + goto error; + } + if (status & GNUTLS_CERT_REVOKED) { + goto error; + } + if (status & GNUTLS_CERT_EXPIRED) { + goto error; + } + if (status & GNUTLS_CERT_NOT_ACTIVATED) { + goto error; + } + if (gnutls_certificate_type_get(session) != GNUTLS_CRT_X509) { + goto error; + } + if (gnutls_x509_crt_init(&cert) < 0) { + goto error; + } + list = gnutls_certificate_get_peers(session, &size); + if (!list) { + goto error; + } + ret = gnutls_x509_crt_import(cert, &list[0], GNUTLS_X509_FMT_DER); + if (0 > ret) { + goto error; + } + if (!gnutls_x509_crt_check_hostname(cert, self->host)) { + goto error; + } + gnutls_x509_crt_deinit(cert); + return 0; +error: + if (cert) { + gnutls_x509_crt_deinit (cert); + } + return GNUTLS_E_CERTIFICATE_ERROR; +} + +static const struct amqp_socket_class_t amqp_ssl_socket_class = { + amqp_ssl_socket_writev, /* writev */ + amqp_ssl_socket_send, /* send */ + amqp_ssl_socket_recv, /* recv */ + amqp_ssl_socket_open, /* open */ + amqp_ssl_socket_close, /* close */ + amqp_ssl_socket_error, /* error */ + amqp_ssl_socket_get_sockfd /* get_sockfd */ +}; + +amqp_socket_t * +amqp_ssl_socket_new(void) +{ + struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); + const char *error; + int status; + if (!self) { + goto error; + } + gnutls_global_init(); + status = gnutls_init(&self->session, GNUTLS_CLIENT); + if (GNUTLS_E_SUCCESS != status) { + goto error; + } + status = gnutls_certificate_allocate_credentials(&self->credentials); + if (GNUTLS_E_SUCCESS != status) { + goto error; + } + gnutls_certificate_set_verify_function(self->credentials, + amqp_ssl_verify); + status = gnutls_credentials_set(self->session, GNUTLS_CRD_CERTIFICATE, + self->credentials); + if (GNUTLS_E_SUCCESS != status) { + goto error; + } + gnutls_session_set_ptr(self->session, self); + status = gnutls_priority_set_direct(self->session, "NORMAL", &error); + if (GNUTLS_E_SUCCESS != status) { + goto error; + } + self->klass = &amqp_ssl_socket_class; + return (amqp_socket_t *)self; +error: + amqp_socket_close((amqp_socket_t *)self); + return NULL; +} + +int +amqp_ssl_socket_set_cacert(amqp_socket_t *base, + const char *cacert) +{ + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = gnutls_certificate_set_x509_trust_file(self->credentials, + cacert, + GNUTLS_X509_FMT_PEM); + if (0 > status) { + return -1; + } + return 0; +} + +int +amqp_ssl_socket_set_key(amqp_socket_t *base, + const char *cert, + const char *key) +{ + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = gnutls_certificate_set_x509_key_file(self->credentials, + cert, + key, + GNUTLS_X509_FMT_PEM); + if (0 > status) { + return -1; + } + return 0; +} + +int +amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base, + AMQP_UNUSED const char *cert, + AMQP_UNUSED const void *key, + AMQP_UNUSED size_t n) +{ + amqp_abort("%s is not implemented for GnuTLS", __func__); + return -1; +} + +void +amqp_ssl_socket_set_verify(amqp_socket_t *base, + amqp_boolean_t verify) +{ + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + if (verify) { + gnutls_certificate_set_verify_function(self->credentials, + amqp_ssl_verify); + } else { + gnutls_certificate_set_verify_function(self->credentials, + NULL); + } +} + +void +amqp_set_initialize_ssl_library(AMQP_UNUSED amqp_boolean_t do_initialize) +{ +} diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c new file mode 100644 index 0000000..301a10d --- /dev/null +++ b/librabbitmq/amqp_openssl.c @@ -0,0 +1,573 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "amqp_ssl_socket.h" +#include "amqp_private.h" +#include "threads.h" +#include <ctype.h> +#include <openssl/conf.h> +#include <openssl/err.h> +#include <openssl/ssl.h> +#include <stdlib.h> +#include <string.h> + +#include "socket.h" + +static int initialize_openssl(void); +static int destroy_openssl(void); + +static int open_ssl_connections = 0; +static amqp_boolean_t do_initialize_openssl = 1; +static amqp_boolean_t openssl_initialized = 0; + +#ifdef ENABLE_THREAD_SAFETY +static unsigned long amqp_ssl_threadid_callback(void); +static void amqp_ssl_locking_callback(int mode, int n, const char *file, int line); + +#ifdef _WIN32 +static long win32_create_mutex = 0; +static pthread_mutex_t openssl_init_mutex = NULL; +#else +static pthread_mutex_t openssl_init_mutex = PTHREAD_MUTEX_INITIALIZER; +#endif +static pthread_mutex_t *amqp_openssl_lockarray = NULL; +#endif /* ENABLE_THREAD_SAFETY */ + +struct amqp_ssl_socket_t { + const struct amqp_socket_class_t *klass; + SSL_CTX *ctx; + int sockfd; + SSL *ssl; + char *buffer; + size_t length; + amqp_boolean_t verify; + int last_error; +}; + +static ssize_t +amqp_ssl_socket_send(void *base, + const void *buf, + size_t len, + AMQP_UNUSED int flags) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t sent; + ERR_clear_error(); + self->last_error = 0; + sent = SSL_write(self->ssl, buf, len); + if (0 > sent) { + self->last_error = ERROR_CATEGORY_SSL; + switch (SSL_get_error(self->ssl, sent)) { + case SSL_ERROR_NONE: + case SSL_ERROR_ZERO_RETURN: + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + sent = 0; + break; + } + } + return sent; +} + +static ssize_t +amqp_ssl_socket_writev(void *base, + const struct iovec *iov, + int iovcnt) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t written = -1; + char *bufferp; + size_t bytes; + int i; + self->last_error = 0; + bytes = 0; + for (i = 0; i < iovcnt; ++i) { + bytes += iov[i].iov_len; + } + if (self->length < bytes) { + free(self->buffer); + self->buffer = malloc(bytes); + if (!self->buffer) { + self->length = 0; + self->last_error = ERROR_NO_MEMORY; + goto exit; + } + self->length = bytes; + } + bufferp = self->buffer; + for (i = 0; i < iovcnt; ++i) { + memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); + bufferp += iov[i].iov_len; + } + written = amqp_ssl_socket_send(self, self->buffer, bytes, 0); +exit: + return written; +} + +static ssize_t +amqp_ssl_socket_recv(void *base, + void *buf, + size_t len, + AMQP_UNUSED int flags) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t received; + ERR_clear_error(); + self->last_error = 0; + received = SSL_read(self->ssl, buf, len); + if (0 > received) { + self->last_error = ERROR_CATEGORY_SSL; + switch(SSL_get_error(self->ssl, received)) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + received = 0; + break; + } + } + return received; +} + +static int +amqp_ssl_socket_verify(void *base, const char *host) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + unsigned char *utf8_value = NULL, *cp, ch; + int pos, utf8_length, status = 0; + ASN1_STRING *entry_string; + X509_NAME_ENTRY *entry; + X509_NAME *name; + X509 *peer; + peer = SSL_get_peer_certificate(self->ssl); + if (!peer) { + goto error; + } + name = X509_get_subject_name(peer); + if (!name) { + goto error; + } + pos = X509_NAME_get_index_by_NID(name, NID_commonName, -1); + if (0 > pos) { + goto error; + } + entry = X509_NAME_get_entry(name, pos); + if (!entry) { + goto error; + } + entry_string = X509_NAME_ENTRY_get_data(entry); + if (!entry_string) { + goto error; + } + utf8_length = ASN1_STRING_to_UTF8(&utf8_value, entry_string); + if (0 > utf8_length) { + goto error; + } + while (utf8_length > 0 && utf8_value[utf8_length - 1] == 0) { + --utf8_length; + } + if (utf8_length >= 256) { + goto error; + } + if ((size_t)utf8_length != strlen((char *)utf8_value)) { + goto error; + } + for (cp = utf8_value; (ch = *cp) != '\0'; ++cp) { + if (isascii(ch) && !isprint(ch)) { + goto error; + } + } +#ifdef _MSC_VER +#define strcasecmp _stricmp +#endif + if (strcasecmp(host, (char *)utf8_value)) { + goto error; + } +#ifdef _MSC_VER +#undef strcasecmp +#endif +exit: + OPENSSL_free(utf8_value); + return status; +error: + status = -1; + goto exit; +} + +static int +amqp_ssl_socket_open(void *base, const char *host, int port) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + long result; + int status; + self->last_error = 0; + self->ssl = SSL_new(self->ctx); + if (!self->ssl) { + self->last_error = ERROR_CATEGORY_SSL; + return -1; + } + SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY); + self->sockfd = amqp_open_socket(host, port); + if (0 > self->sockfd) { + self->last_error = -self->sockfd; + return -1; + } + status = SSL_set_fd(self->ssl, self->sockfd); + if (!status) { + self->last_error = ERROR_CATEGORY_SSL; + return -1; + } + status = SSL_connect(self->ssl); + if (!status) { + self->last_error = ERROR_CATEGORY_SSL; + return -1; + } + result = SSL_get_verify_result(self->ssl); + if (X509_V_OK != result) { + self->last_error = ERROR_CATEGORY_SSL; + return -1; + } + if (self->verify) { + int status = amqp_ssl_socket_verify(self, host); + if (status) { + self->last_error = ERROR_CATEGORY_SSL; + return -1; + } + } + return 0; +} + +static int +amqp_ssl_socket_close(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + if (self) { + SSL_free(self->ssl); + amqp_os_socket_close(self->sockfd); + SSL_CTX_free(self->ctx); + free(self->buffer); + free(self); + } + destroy_openssl(); + return 0; +} + +static int +amqp_ssl_socket_error(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->last_error; +} + +char * +amqp_ssl_error_string(AMQP_UNUSED int err) +{ + return strdup("A ssl socket error occurred."); +} + +static int +amqp_ssl_socket_get_sockfd(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->sockfd; +} + +static const struct amqp_socket_class_t amqp_ssl_socket_class = { + amqp_ssl_socket_writev, /* writev */ + amqp_ssl_socket_send, /* send */ + amqp_ssl_socket_recv, /* recv */ + amqp_ssl_socket_open, /* open */ + amqp_ssl_socket_close, /* close */ + amqp_ssl_socket_error, /* error */ + amqp_ssl_socket_get_sockfd /* get_sockfd */ +}; + +amqp_socket_t * +amqp_ssl_socket_new(void) +{ + struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); + int status; + if (!self) { + goto error; + } + status = initialize_openssl(); + if (status) { + goto error; + } + self->ctx = SSL_CTX_new(SSLv23_client_method()); + if (!self->ctx) { + goto error; + } + self->klass = &amqp_ssl_socket_class; + self->verify = 1; + return (amqp_socket_t *)self; +error: + amqp_socket_close((amqp_socket_t *)self); + return NULL; +} + +int +amqp_ssl_socket_set_cacert(amqp_socket_t *base, + const char *cacert) +{ + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = SSL_CTX_load_verify_locations(self->ctx, cacert, NULL); + if (1 != status) { + return -1; + } + return 0; +} + +int +amqp_ssl_socket_set_key(amqp_socket_t *base, + const char *cert, + const char *key) +{ + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); + if (1 != status) { + return -1; + } + status = SSL_CTX_use_PrivateKey_file(self->ctx, key, + SSL_FILETYPE_PEM); + if (1 != status) { + return -1; + } + return 0; +} + +static int +password_cb(AMQP_UNUSED char *buffer, + AMQP_UNUSED int length, + AMQP_UNUSED int rwflag, + AMQP_UNUSED void *user_data) +{ + amqp_abort("rabbitmq-c does not support password protected keys"); + return 0; +} + +int +amqp_ssl_socket_set_key_buffer(amqp_socket_t *base, + const char *cert, + const void *key, + size_t n) +{ + int status = 0; + BIO *buf = NULL; + RSA *rsa = NULL; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); + if (1 != status) { + return -1; + } + buf = BIO_new_mem_buf((void *)key, n); + if (!buf) { + goto error; + } + rsa = PEM_read_bio_RSAPrivateKey(buf, NULL, password_cb, NULL); + if (!rsa) { + goto error; + } + status = SSL_CTX_use_RSAPrivateKey(self->ctx, rsa); + if (1 != status) { + goto error; + } +exit: + BIO_vfree(buf); + RSA_free(rsa); + return status; +error: + status = -1; + goto exit; +} + +int +amqp_ssl_socket_set_cert(amqp_socket_t *base, + const char *cert) +{ + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); + if (1 != status) { + return -1; + } + return 0; +} + +void +amqp_ssl_socket_set_verify(amqp_socket_t *base, + amqp_boolean_t verify) +{ + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + self->verify = verify; +} + +void +amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize) +{ + if (!openssl_initialized) { + do_initialize_openssl = do_initialize; + } +} + +#ifdef ENABLE_THREAD_SAFETY +unsigned long +amqp_ssl_threadid_callback(void) +{ + return (unsigned long)pthread_self(); +} + +void +amqp_ssl_locking_callback(int mode, int n, + AMQP_UNUSED const char *file, + AMQP_UNUSED int line) +{ + if (mode & CRYPTO_LOCK) { + if (pthread_mutex_lock(&amqp_openssl_lockarray[n])) { + amqp_abort("Runtime error: Failure in trying to lock OpenSSL mutex"); + } + } else { + if (pthread_mutex_unlock(&amqp_openssl_lockarray[n])) { + amqp_abort("Runtime error: Failure in trying to unlock OpenSSL mutex"); + } + } +} +#endif /* ENABLE_THREAD_SAFETY */ + +static int +initialize_openssl(void) +{ +#ifdef _WIN32 + /* No such thing as PTHREAD_INITIALIZE_MUTEX macro on Win32, so we use this */ + if (NULL == openssl_init_mutex) { + while (InterlockedExchange(&win32_create_mutex, 1) == 1) + /* Loop, someone else is holding this lock */ ; + + if (NULL == openssl_init_mutex) { + if (pthread_mutex_init(&openssl_init_mutex, NULL)) { + return -1; + } + } + InterlockedExchange(&win32_create_mutex, 0); + } +#endif /* _WIN32 */ + +#ifdef ENABLE_THREAD_SAFETY + if (pthread_mutex_lock(&openssl_init_mutex)) { + return -1; + } +#endif /* ENABLE_THREAD_SAFETY */ + if (do_initialize_openssl) { +#ifdef ENABLE_THREAD_SAFETY + if (NULL == amqp_openssl_lockarray) { + int i = 0; + amqp_openssl_lockarray = calloc(CRYPTO_num_locks(), sizeof(pthread_mutex_t)); + if (!amqp_openssl_lockarray) { + pthread_mutex_unlock(&openssl_init_mutex); + return -1; + } + for (i = 0; i < CRYPTO_num_locks(); ++i) { + if (pthread_mutex_init(&amqp_openssl_lockarray[i], NULL)) { + free(amqp_openssl_lockarray); + amqp_openssl_lockarray = NULL; + pthread_mutex_unlock(&openssl_init_mutex); + return -1; + } + } + } + + if (0 == open_ssl_connections) { + CRYPTO_set_id_callback(amqp_ssl_threadid_callback); + CRYPTO_set_locking_callback(amqp_ssl_locking_callback); + } +#endif /* ENABLE_THREAD_SAFETY */ + + if (!openssl_initialized) { + OPENSSL_config(NULL); + + SSL_library_init(); + SSL_load_error_strings(); + + openssl_initialized = 1; + } + } + + ++open_ssl_connections; + +#ifdef ENABLE_THREAD_SAFETY + pthread_mutex_unlock(&openssl_init_mutex); +#endif /* ENABLE_THREAD_SAFETY */ + return 0; +} + +static int +destroy_openssl(void) +{ +#ifdef ENABLE_THREAD_SAFETY + if (pthread_mutex_lock(&openssl_init_mutex)) { + return -1; + } +#endif /* ENABLE_THREAD_SAFETY */ + + if (open_ssl_connections > 0) { + --open_ssl_connections; + } + +#ifdef ENABLE_THREAD_SAFETY + if (0 == open_ssl_connections && do_initialize_openssl) { + /* Unsetting these allows the rabbitmq-c library to be unloaded + * safely. We do leak the amqp_openssl_lockarray. Which is only + * an issue if you repeatedly unload and load the library + */ + CRYPTO_set_locking_callback(NULL); + CRYPTO_set_id_callback(NULL); + } + + pthread_mutex_unlock(&openssl_init_mutex); +#endif /* ENABLE_THREAD_SAFETY */ + return 0; +} diff --git a/librabbitmq/amqp_polarssl.c b/librabbitmq/amqp_polarssl.c new file mode 100644 index 0000000..56c45c6 --- /dev/null +++ b/librabbitmq/amqp_polarssl.c @@ -0,0 +1,354 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "amqp_ssl_socket.h" +#include "amqp_private.h" +#include <polarssl/ctr_drbg.h> +#include <polarssl/entropy.h> +#include <polarssl/net.h> +#include <polarssl/ssl.h> +#include <polarssl/version.h> +#include <stdlib.h> +#include <string.h> + +#ifndef AMQP_USE_UNTESTED_SSL_BACKEND +# error This SSL backend is alpha quality and likely contains errors.\ + -DAMQP_USE_UNTESTED_SSL_BACKEND to use this backend +#endif + +struct amqp_ssl_socket_t { + const struct amqp_socket_class_t *klass; + int sockfd; + entropy_context *entropy; + ctr_drbg_context *ctr_drbg; + x509_cert *cacert; + rsa_context *key; + x509_cert *cert; + ssl_context *ssl; + ssl_session *session; + char *buffer; + size_t length; + int last_error; +}; + +static ssize_t +amqp_ssl_socket_send(void *base, + const void *buf, + size_t len, + AMQP_UNUSED int flags) +{ + ssize_t status; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + + self->last_error = 0; + status = ssl_write(self->ssl, buf, len); + if (status < 0) { + self->last_error = ERROR_CATEGORY_SSL; + } + + return status; +} + +static ssize_t +amqp_ssl_socket_writev(void *base, + const struct iovec *iov, + int iovcnt) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t written = -1; + char *bufferp; + size_t bytes; + int i; + self->last_error = 0; + bytes = 0; + for (i = 0; i < iovcnt; ++i) { + bytes += iov[i].iov_len; + } + if (self->length < bytes) { + free(self->buffer); + self->buffer = malloc(bytes); + if (!self->buffer) { + self->length = 0; + self->last_error = ERROR_NO_MEMORY; + goto exit; + } + self->length = bytes; + } + bufferp = self->buffer; + for (i = 0; i < iovcnt; ++i) { + memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); + bufferp += iov[i].iov_len; + } + written = amqp_ssl_socket_send(self, (const unsigned char *)self->buffer, + bytes, 0); +exit: + return written; +} + +static ssize_t +amqp_ssl_socket_recv(void *base, + void *buf, + size_t len, + AMQP_UNUSED int flags) +{ + ssize_t status; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + + self->last_error = 0; + status = ssl_read(self->ssl, buf, len); + if (status < 0) { + self->last_error = ERROR_CATEGORY_SSL; + } + + return status; +} + +static int +amqp_ssl_socket_open(void *base, const char *host, int port) +{ + int status; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + self->last_error = 0; + + status = net_connect(&self->sockfd, host, port); + if (status) { + /* This isn't quite right. We should probably translate between + * POLARSSL_ERR_* to our internal error codes + */ + self->last_error = ERROR_CATEGORY_SSL; + return -1; + } + if (self->cacert) { + ssl_set_ca_chain(self->ssl, self->cacert, NULL, host); + } + ssl_set_bio(self->ssl, net_recv, &self->sockfd, + net_send, &self->sockfd); + if (self->key && self->cert) { + ssl_set_own_cert(self->ssl, self->cert, self->key); + } + while (0 != (status = ssl_handshake(self->ssl))) { + switch (status) { + case POLARSSL_ERR_NET_WANT_READ: + case POLARSSL_ERR_NET_WANT_WRITE: + continue; + default: + self->last_error = ERROR_CATEGORY_SSL; + break; + } + } + return status; +} + +static int +amqp_ssl_socket_close(void *base) +{ + int status = -1; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + if (self) { + free(self->entropy); + free(self->ctr_drbg); + x509_free(self->cacert); + free(self->cacert); + rsa_free(self->key); + free(self->key); + x509_free(self->cert); + free(self->cert); + ssl_free(self->ssl); + free(self->ssl); + free(self->session); + free(self->buffer); + if (self->sockfd >= 0) { + net_close(self->sockfd); + status = 0; + } + free(self); + } + return status; +} + +static int +amqp_ssl_socket_error(AMQP_UNUSED void *user_data) +{ + return ERROR_CATEGORY_SSL; +} + +char * +amqp_ssl_error_string(AMQP_UNUSED int err) +{ + return strdup("A SSL socket error occurred"); +} + +static int +amqp_ssl_socket_get_sockfd(void *base) +{ + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->sockfd; +} + +static const struct amqp_socket_class_t amqp_ssl_socket_class = { + amqp_ssl_socket_writev, /* writev */ + amqp_ssl_socket_send, /* send */ + amqp_ssl_socket_recv, /* recv */ + amqp_ssl_socket_open, /* open */ + amqp_ssl_socket_close, /* close */ + amqp_ssl_socket_error, /* error */ + amqp_ssl_socket_get_sockfd /* get_sockfd */ +}; + +amqp_socket_t * +amqp_ssl_socket_new(void) +{ + struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); + int status; + if (!self) { + goto error; + } + self->entropy = calloc(1, sizeof(*self->entropy)); + if (!self->entropy) { + goto error; + } + self->sockfd = -1; + entropy_init(self->entropy); + self->ctr_drbg = calloc(1, sizeof(*self->ctr_drbg)); + if (!self->ctr_drbg) { + goto error; + } + status = ctr_drbg_init(self->ctr_drbg, entropy_func, self->entropy, + NULL, 0); + if (status) { + goto error; + } + self->ssl = calloc(1, sizeof(*self->ssl)); + if (!self->ssl) { + goto error; + } + status = ssl_init(self->ssl); + if (status) { + goto error; + } + ssl_set_endpoint(self->ssl, SSL_IS_CLIENT); + ssl_set_rng(self->ssl, ctr_drbg_random, self->ctr_drbg); + ssl_set_ciphersuites(self->ssl, ssl_default_ciphersuites); + ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED); + self->session = calloc(1, sizeof(*self->session)); + if (!self->session) { + goto error; + } +#if POLARSSL_VERSION_NUMBER >= 0x01020000 + ssl_set_session(self->ssl, self->session); +#else + ssl_set_session(self->ssl, 0, 0, self->session); +#endif + + self->klass = &amqp_ssl_socket_class; + return (amqp_socket_t *)self; +error: + amqp_socket_close((amqp_socket_t *)self); + return NULL; +} + +int +amqp_ssl_socket_set_cacert(amqp_socket_t *base, + const char *cacert) +{ + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + self->cacert = calloc(1, sizeof(*self->cacert)); + if (!self->cacert) { + return -1; + } + status = x509parse_crtfile(self->cacert, cacert); + if (status) { + return -1; + } + return 0; +} + +int +amqp_ssl_socket_set_key(amqp_socket_t *base, + const char *cert, + const char *key) +{ + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + self->key = calloc(1, sizeof(*self->key)); + if (!self->key) { + return -1; + } + status = x509parse_keyfile(self->key, key, NULL); + if (status) { + return -1; + } + self->cert = calloc(1, sizeof(*self->cert)); + if (!self->cert) { + return -1; + } + status = x509parse_crtfile(self->cert, cert); + if (status) { + return -1; + } + return 0; +} + +int +amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base, + AMQP_UNUSED const char *cert, + AMQP_UNUSED const void *key, + AMQP_UNUSED size_t n) +{ + amqp_abort("%s is not implemented for PolarSSL", __func__); + return -1; +} + +void +amqp_ssl_socket_set_verify(amqp_socket_t *base, + amqp_boolean_t verify) +{ + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + if (verify) { + ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED); + } else { + ssl_set_authmode(self->ssl, SSL_VERIFY_NONE); + } +} + +void +amqp_set_initialize_ssl_library(AMQP_UNUSED amqp_boolean_t do_initialize) +{ +} diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 68239d1..a33205e 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -45,16 +45,20 @@ #include "amqp_framing.h" #include <string.h> +#ifdef HAVE_ARPA_INET_H +#include <arpa/inet.h> +#endif + /* Error numbering: Because of differences in error numbering on * different platforms, we want to keep error numbers opaque for * client code. Internally, we encode the category of an error * (i.e. where its number comes from) in the top bits of the number * (assuming that an int has at least 32 bits). */ -#define ERROR_CATEGORY_MASK (1 << 29) - #define ERROR_CATEGORY_CLIENT (0 << 29) /* librabbitmq error codes */ #define ERROR_CATEGORY_OS (1 << 29) /* OS-specific error codes */ +#define ERROR_CATEGORY_SSL (1 << 28) /* SSL-specific error codes */ +#define ERROR_CATEGORY_MASK (ERROR_CATEGORY_OS | ERROR_CATEGORY_SSL) /* librabbitmq error codes */ #define ERROR_NO_MEMORY 1 @@ -71,8 +75,11 @@ #if __GNUC__ > 2 | (__GNUC__ == 2 && __GNUC_MINOR__ > 4) #define AMQP_NORETURN \ __attribute__ ((__noreturn__)) +#define AMQP_UNUSED \ + __attribute__ ((__unused__)) #else #define AMQP_NORETURN +#define AMQP_UNUSED #endif #if __GNUC__ >= 4 @@ -85,7 +92,12 @@ char * amqp_os_error_string(int err); -#include "socket.h" +#ifdef WITH_SSL +char * +amqp_ssl_error_string(int err); +#endif + +#include "amqp_socket.h" /* * Connection states: XXX FIX THIS @@ -143,7 +155,8 @@ struct amqp_connection_state_t_ { amqp_bytes_t outbound_buffer; - int sockfd; + amqp_socket_t *socket; + amqp_bytes_t sock_inbound_buffer; size_t sock_inbound_offset; size_t sock_inbound_limit; diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 22cc2c5..ecb7de5 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -39,12 +39,73 @@ #endif #include "amqp_private.h" -#include <stdlib.h> + +#include "socket.h" + +#include <assert.h> +#include <stdarg.h> +#include <stdint.h> #include <stdio.h> +#include <stdlib.h> #include <string.h> -#include <stdint.h> -#include <stdarg.h> -#include <assert.h> + +ssize_t +amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt) +{ + assert(self); + assert(self->klass->writev); + return self->klass->writev(self, iov, iovcnt); +} + +ssize_t +amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags) +{ + assert(self); + assert(self->klass->send); + return self->klass->send(self, buf, len, flags); +} + +ssize_t +amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags) +{ + assert(self); + assert(self->klass->recv); + return self->klass->recv(self, buf, len, flags); +} + +int +amqp_socket_open(amqp_socket_t *self, const char *host, int port) +{ + assert(self); + assert(self->klass->open); + return self->klass->open(self, host, port); +} + +int +amqp_socket_close(amqp_socket_t *self) +{ + if (self) { + assert(self->klass->close); + return self->klass->close(self); + } + return 0; +} + +int +amqp_socket_error(amqp_socket_t *self) +{ + assert(self); + assert(self->klass->error); + return self->klass->error(self); +} + +int +amqp_socket_get_sockfd(amqp_socket_t *self) +{ + assert(self); + assert(self->klass->get_sockfd); + return self->klass->get_sockfd(self); +} int amqp_open_socket(char const *hostname, int portnumber) @@ -57,8 +118,9 @@ int amqp_open_socket(char const *hostname, int last_error = 0; int one = 1; /* for setsockopt */ - if (0 != (last_error = amqp_socket_init())) { - return last_error; + last_error = amqp_socket_init(); + if (0 != last_error) { + return -last_error; } memset(&hint, 0, sizeof(hint)); @@ -81,20 +143,20 @@ int amqp_open_socket(char const *hostname, */ sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); if (-1 == sockfd) { - last_error = -amqp_socket_error(); + last_error = -amqp_os_socket_error(); continue; } #ifdef DISABLE_SIGPIPE_WITH_SETSOCKOPT if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { - last_error = -amqp_socket_error(); - amqp_socket_close(sockfd); + last_error = -amqp_os_socket_error(); + amqp_os_socket_close(sockfd); continue; } #endif /* DISABLE_SIGPIPE_WITH_SETSOCKOPT */ if (0 != amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { - last_error = -amqp_socket_error(); - amqp_socket_close(sockfd); + last_error = -amqp_os_socket_error(); + amqp_os_socket_close(sockfd); continue; } else { last_error = 0; @@ -117,7 +179,7 @@ int amqp_send_header(amqp_connection_state_t state) AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - return send(state->sockfd, (void *)header, 8, MSG_NOSIGNAL); + return amqp_socket_send(state->socket, header, 8, MSG_NOSIGNAL); } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) @@ -214,13 +276,13 @@ static int wait_frame_inner(amqp_connection_state_t state, assert(res != 0); } - res = recv(state->sockfd, state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len, 0); + res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, + state->sock_inbound_buffer.len, 0); if (res <= 0) { if (res == 0) { return -ERROR_CONNECTION_CLOSED; } else { - return -amqp_socket_error(); + return -amqp_socket_error(state->socket); } } @@ -342,14 +404,14 @@ retry: */ if (!((frame.frame_type == AMQP_FRAME_METHOD) && ( - ((frame.channel == channel) - && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids) - || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) - || - ((frame.channel == 0) - && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) - ) - )) { + ((frame.channel == channel) + && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids) + || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) + || + ((frame.channel == 0) + && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) + ) + )) { amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t)); amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t)); @@ -431,13 +493,13 @@ static int amqp_table_contains_entry(const amqp_table_t *table, } static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - const amqp_table_t *client_properties, - amqp_sasl_method_enum sasl_method, - va_list vl) + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + const amqp_table_t *client_properties, + amqp_sasl_method_enum sasl_method, + va_list vl) { int res; amqp_method_t method; @@ -472,7 +534,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, amqp_table_t default_table; amqp_connection_start_ok_t s; amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, - sasl_method, vl); + sasl_method, vl); if (response_bytes.bytes == NULL) { res = -ERROR_NO_MEMORY; @@ -508,7 +570,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, amqp_table_entry_t *current_entry; s.client_properties.entries = amqp_pool_alloc(&state->decoding_pool, - sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries)); + sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries)); if (NULL == s.client_properties.entries) { res = -ERROR_NO_MEMORY; goto error_res; @@ -643,13 +705,13 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, } amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - const amqp_table_t *client_properties, - amqp_sasl_method_enum sasl_method, - ...) + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + const amqp_table_t *client_properties, + amqp_sasl_method_enum sasl_method, + ...) { va_list vl; diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h new file mode 100644 index 0000000..ef3462b --- /dev/null +++ b/librabbitmq/amqp_socket.h @@ -0,0 +1,107 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +/** + * An abstract socket interface. + */ + +#ifndef AMQP_SOCKET_H +#define AMQP_SOCKET_H + +#include "amqp.h" +#include "socket.h" + +AMQP_BEGIN_DECLS + +/* Socket callbacks. */ +typedef ssize_t (*amqp_socket_writev_fn)(void *, const struct iovec *, int); +typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t, int); +typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int); +typedef int (*amqp_socket_open_fn)(void *, const char *, int); +typedef int (*amqp_socket_close_fn)(void *); +typedef int (*amqp_socket_error_fn)(void *); +typedef int (*amqp_socket_get_sockfd_fn)(void *); + +/** V-table for amqp_socket_t */ +struct amqp_socket_class_t { + amqp_socket_writev_fn writev; + amqp_socket_send_fn send; + amqp_socket_recv_fn recv; + amqp_socket_open_fn open; + amqp_socket_close_fn close; + amqp_socket_error_fn error; + amqp_socket_get_sockfd_fn get_sockfd; +}; + +/** Abstract base class for amqp_socket_t */ +struct amqp_socket_t_ { + const struct amqp_socket_class_t *klass; +}; + +/** + * Write to a socket. + * + * This function is analagous to writev(2). + * + * \param [in,out] self A socket object. + * \param [in] iov One or more data vecors. + * \param [in] iovcnt The number of vectors in \e iov. + * + * \return The number of bytes written, or -1 if an error occurred. + */ +ssize_t +amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt); + +/** + * Send a message from a socket. + * + * This function is analagous to send(2). + * + * \param [in,out] self A socket object. + * \param [in] buf A buffer to read from. + * \param [in] len The number of bytes in \e buf. + * \param [in] flags Send flags, implementation specific. + * + * \return The number of bytes sent, or -1 if an error occurred. + */ +ssize_t +amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags); + +/** + * Receive a message from a socket. + * + * This function is analagous to recv(2). + * + * \param [in,out] self A socket object. + * \param [out] buf A buffer to write to. + * \param [in] len The number of bytes at \e buf. + * \param [in] flags Receive flags, implementation specific. + * + * \return The number of bytes received, or -1 if an error occurred. + */ +ssize_t +amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags); + +AMQP_END_DECLS + +#endif /* AMQP_SOCKET_H */ diff --git a/librabbitmq/amqp_ssl_socket.h b/librabbitmq/amqp_ssl_socket.h new file mode 100644 index 0000000..3bfce51 --- /dev/null +++ b/librabbitmq/amqp_ssl_socket.h @@ -0,0 +1,141 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +/** + * An SSL socket connection. + */ + +#ifndef AMQP_SSL_H +#define AMQP_SSL_H + +#include <amqp.h> + +AMQP_BEGIN_DECLS + +/** + * Create a new SSL/TLS socket object. + * + * Call amqp_socket_close() to release socket resources. + * + * \return A new socket object or NULL if an error occurred. + */ +AMQP_PUBLIC_FUNCTION +amqp_socket_t * +AMQP_CALL +amqp_ssl_socket_new(void); + +/** + * Set the CA certificate. + * + * \param [in,out] self An SSL/TLS socket object. + * \param [in] cacert Path to the CA cert file in PEM format. + * + * \return Zero if successful, -1 otherwise. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_ssl_socket_set_cacert(amqp_socket_t *self, + const char *cacert); + +/** + * Set the client key. + * + * \param [in,out] self An SSL/TLS socket object. + * \param [in] cert Path to the client certificate in PEM foramt. + * \param [in] key Path to the client key in PEM format. + * + * \return Zero if successful, -1 otherwise. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_ssl_socket_set_key(amqp_socket_t *self, + const char *cert, + const char *key); + +/** + * Set the client key from a buffer. + * + * \param [in,out] self An SSL/TLS socket object. + * \param [in] cert Path to the client certificate in PEM foramt. + * \param [in] key A buffer containing client key in PEM format. + * \param [in] n The length of the buffer. + * + * \return Zero if successful, -1 otherwise. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_ssl_socket_set_key_buffer(amqp_socket_t *self, + const char *cert, + const void *key, + size_t n); + +/** + * Enable or disable peer verification. + * + * If peer verification is enabled then the common name in the server + * certificate must match the server name. Peer verification is enabled by + * default. + * + * \param [in,out] self An SSL/TLS socket object. + * \param [in] verify Enable or disable peer verification. + */ +AMQP_PUBLIC_FUNCTION +void +AMQP_CALL +amqp_ssl_socket_set_verify(amqp_socket_t *self, + amqp_boolean_t verify); + +/** + * Sets whether rabbitmq-c initializes the underlying SSL library. + * + * For SSL libraries that require a one-time initialization across + * a whole program (e.g., OpenSSL) this sets whether or not rabbitmq-c + * will initialize the SSL library when the first call to + * amqp_open_ssl_socket() is made. You should call this function with + * do_init = 0 if the underlying SSL library is intialized somewhere else + * the program. + * + * Failing to initialize or double initialization of the SSL library will + * result in undefined behavior + * + * By default rabbitmq-c will initialize the underlying SSL library + * + * NOTE: calling this function after the first socket has been opened with + * amqp_open_ssl_socket() will not have any effect. + * + * \param [in] do_initalize If 0 rabbitmq-c will not initialize the SSL + * library, otherwise rabbitmq-c will initialize the + * SL library + * + */ +AMQP_PUBLIC_FUNCTION +void +AMQP_CALL +amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize); + +AMQP_END_DECLS + +#endif /* AMQP_SSL_H */ diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c new file mode 100644 index 0000000..2eb366f --- /dev/null +++ b/librabbitmq/amqp_tcp_socket.c @@ -0,0 +1,126 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "amqp_private.h" +#include "amqp_tcp_socket.h" +#include <stdio.h> +#include <stdlib.h> + +struct amqp_tcp_socket_t { + const struct amqp_socket_class_t *klass; + int sockfd; +}; + +static ssize_t +amqp_tcp_socket_writev(void *base, const struct iovec *iov, int iovcnt) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return amqp_os_socket_writev(self->sockfd, iov, iovcnt); +} + +static ssize_t +amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return send(self->sockfd, buf, len, flags); +} + +static ssize_t +amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return recv(self->sockfd, buf, len, flags); +} + +static int +amqp_tcp_socket_open(void *base, const char *host, int port) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + self->sockfd = amqp_open_socket(host, port); + if (0 > self->sockfd) { + return -1; + } + return 0; +} + +static int +amqp_tcp_socket_close(void *base) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + int status = -1; + if (self) { + status = amqp_os_socket_close(self->sockfd); + free(self); + } + return status; +} + +static int +amqp_tcp_socket_error(AMQP_UNUSED void *base) +{ + return amqp_os_socket_error(); +} + +static int +amqp_tcp_socket_get_sockfd(void *base) +{ + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return self->sockfd; +} + +static const struct amqp_socket_class_t amqp_tcp_socket_class = { + amqp_tcp_socket_writev, /* writev */ + amqp_tcp_socket_send, /* send */ + amqp_tcp_socket_recv, /* recv */ + amqp_tcp_socket_open, /* open */ + amqp_tcp_socket_close, /* close */ + amqp_tcp_socket_error, /* error */ + amqp_tcp_socket_get_sockfd /* get_sockfd */ +}; + +amqp_socket_t * +amqp_tcp_socket_new(void) +{ + struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); + if (!self) { + return NULL; + } + self->klass = &amqp_tcp_socket_class; + self->sockfd = -1; + return (amqp_socket_t *)self; +} + +void +amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) +{ + struct amqp_tcp_socket_t *self; + if (base->klass != &amqp_tcp_socket_class) { + amqp_abort("<%p> is not of type amqp_tcp_socket_t", base); + } + self = (struct amqp_tcp_socket_t *)base; + self->sockfd = sockfd; +} diff --git a/librabbitmq/amqp_tcp_socket.h b/librabbitmq/amqp_tcp_socket.h new file mode 100644 index 0000000..4c8ba54 --- /dev/null +++ b/librabbitmq/amqp_tcp_socket.h @@ -0,0 +1,64 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +/** + * A TCP socket connection. + */ + +#ifndef AMQP_TCP_SOCKET_H +#define AMQP_TCP_SOCKET_H + +#include <amqp.h> + +AMQP_BEGIN_DECLS + +/** + * Create a new TCP socket. + * + * Call amqp_socket_close() to release socket resources. + * + * \return A new socket object or NULL if an error occurred. + */ +AMQP_PUBLIC_FUNCTION +amqp_socket_t * +AMQP_CALL +amqp_tcp_socket_new(void); + +/** + * Assign an open file descriptor to a socket object. + * + * This function must not be used in conjunction with amqp_socket_open(), i.e. + * the socket connection should already be open(2) when this function is + * called. + * + * \param [in,out] self A TCP socket object. + * \param [in] sockfd An open socket descriptor. + */ +AMQP_PUBLIC_FUNCTION +void +AMQP_CALL +amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd); + +AMQP_END_DECLS + +#endif /* AMQP_TCP_SOCKET_H */ diff --git a/librabbitmq/amqp_url.c b/librabbitmq/amqp_url.c index 277b5a6..b200adc 100644 --- a/librabbitmq/amqp_url.c +++ b/librabbitmq/amqp_url.c @@ -52,6 +52,7 @@ void amqp_default_connection_info(struct amqp_connection_info *ci) ci->host = "localhost"; ci->port = 5672; ci->vhost = "/"; + ci->ssl = 0; } /* Scan for the next delimiter, handling percent-encodings on the way. */ @@ -116,11 +117,16 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed) char *port = NULL; /* check the prefix */ - if (strncmp(url, "amqp://", 7)) { + if (!strncmp(url, "amqp://", 7)) { + /* do nothing */ + } else if (!strncmp(url, "amqps://", 8)) { + parsed->port = 5671; + parsed->ssl = 1; + } else { goto out; } - host = start = url += 7; + host = start = url += (parsed->ssl ? 8 : 7); delim = find_delim(&url, 1); if (delim == ':') { diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c index 38064f1..4615480 100644 --- a/librabbitmq/unix/socket.c +++ b/librabbitmq/unix/socket.c @@ -40,10 +40,12 @@ #include "amqp_private.h" #include "socket.h" +#include <errno.h> #include <fcntl.h> #include <stdint.h> #include <stdlib.h> #include <string.h> +#include <unistd.h> int amqp_socket_init(void) @@ -52,12 +54,7 @@ amqp_socket_init(void) } int -amqp_socket_error(void) -{ - return errno | ERROR_CATEGORY_OS; -} - -int amqp_socket_socket(int domain, int type, int proto) +amqp_socket_socket(int domain, int type, int proto) { int flags; @@ -79,7 +76,27 @@ int amqp_socket_socket(int domain, int type, int proto) return s; } -char *amqp_os_error_string(int err) +char * +amqp_os_error_string(int err) { return strdup(strerror(err)); } + +int +amqp_os_socket_close(int sockfd) +{ + return close(sockfd); +} + +ssize_t +amqp_os_socket_writev(int sockfd, const struct iovec *iov, + int iovcnt) +{ + return writev(sockfd, iov, iovcnt); +} + +int +amqp_os_socket_error(void) +{ + return errno | ERROR_CATEGORY_OS; +} diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h index 1382ca3..34de1a2 100644 --- a/librabbitmq/unix/socket.h +++ b/librabbitmq/unix/socket.h @@ -44,7 +44,6 @@ #include <sys/socket.h> #include <netdb.h> #include <sys/uio.h> -#include <unistd.h> int amqp_socket_init(void); @@ -53,11 +52,15 @@ int amqp_socket_socket(int domain, int type, int proto); int -amqp_socket_error(void); +amqp_os_socket_error(void); + +int +amqp_os_socket_close(int sockfd); + +ssize_t +amqp_os_socket_writev(int sockfd, const struct iovec *iov, int iovcnt); #define amqp_socket_setsockopt setsockopt -#define amqp_socket_close close -#define amqp_socket_writev writev #if defined(SO_NOSIGPIPE) && !defined(MSG_NOSIGNAL) # define DISABLE_SIGPIPE_WITH_SETSOCKOPT diff --git a/librabbitmq/unix/threads.h b/librabbitmq/unix/threads.h new file mode 100644 index 0000000..c2d80a6 --- /dev/null +++ b/librabbitmq/unix/threads.h @@ -0,0 +1,29 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef AMQP_THREADS_H +#define AMQP_THREADS_H + +#include <pthread.h> + +#endif /* AMQP_THREADS_H */ diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c index a5d9454..82051fe 100644 --- a/librabbitmq/win32/socket.c +++ b/librabbitmq/win32/socket.c @@ -49,13 +49,14 @@ static int called_wsastartup; -int amqp_socket_init(void) +int +amqp_socket_init(void) { if (!called_wsastartup) { WSADATA data; int res = WSAStartup(0x0202, &data); if (res) { - return -res; + return (ERROR_CATEGORY_OS | res); } called_wsastartup = 1; @@ -64,7 +65,8 @@ int amqp_socket_init(void) return 0; } -char *amqp_os_error_string(int err) +char * +amqp_os_error_string(int err) { char *msg, *copy; @@ -91,7 +93,13 @@ amqp_socket_setsockopt(int sock, int level, int optname, } int -amqp_socket_writev(int sock, struct iovec *iov, int nvecs) +amqp_os_socket_close(int sockfd) +{ + return closesocket(sockfd); +} + +ssize_t +amqp_os_socket_writev(int sock, struct iovec *iov, int nvecs) { DWORD ret; if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) { @@ -102,7 +110,7 @@ amqp_socket_writev(int sock, struct iovec *iov, int nvecs) } int -amqp_socket_error(void) +amqp_os_socket_error(void) { return WSAGetLastError() | ERROR_CATEGORY_OS; } diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h index cea361d..cddbee5 100644 --- a/librabbitmq/win32/socket.h +++ b/librabbitmq/win32/socket.h @@ -37,6 +37,7 @@ * ***** END LICENSE BLOCK ***** */ +#include "amqp_private.h" #include <winsock2.h> #include <WS2tcpip.h> @@ -50,17 +51,23 @@ int amqp_socket_init(void); #define amqp_socket_socket socket -#define amqp_socket_close closesocket + +char * +amqp_os_error_string(int err); int -amqp_socket_setsockopt(int sock, int level, int optname, const void *optval, - size_t optlen); +amqp_socket_setsockopt(int sock, int level, int optname, + const void *optval, size_t optlen); int -amqp_socket_writev(int sock, struct iovec *iov, int nvecs); +amqp_os_socket_close(int sockfd); + + +ssize_t +amqp_os_socket_writev(int sock, struct iovec *iov, int nvecs); int -amqp_socket_error(void); +amqp_os_socket_error(void); #ifndef MSG_NOSIGNAL # define MSG_NOSIGNAL 0x0 diff --git a/librabbitmq/win32/threads.c b/librabbitmq/win32/threads.c new file mode 100644 index 0000000..edf2d2d --- /dev/null +++ b/librabbitmq/win32/threads.c @@ -0,0 +1,63 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include "threads.h" + +DWORD +pthread_self(void) +{ + return GetCurrentThreadId(); +} + +int +pthread_mutex_init(pthread_mutex_t *mutex, void *attr) +{ + *mutex = malloc(sizeof(CRITICAL_SECTION)); + if (!*mutex) { + return 1; + } + InitializeCriticalSection(*mutex); + return 0; +} + +int +pthread_mutex_lock(pthread_mutex_t *mutex) +{ + if (!*mutex) { + return 1; + } + + EnterCriticalSection(*mutex); + return 0; +} + +int +pthread_mutex_unlock(pthread_mutex_t *mutex) +{ + if (!*mutex) { + return 1; + } + + LeaveCriticalSection(*mutex); + return 0; +} diff --git a/librabbitmq/win32/threads.h b/librabbitmq/win32/threads.h new file mode 100644 index 0000000..668b2a3 --- /dev/null +++ b/librabbitmq/win32/threads.h @@ -0,0 +1,37 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * Copyright 2012-2013 Michael Steinert + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef AMQP_THREAD_H +#define AMQP_THREAD_H + +#include <Windows.h> + +typedef CRITICAL_SECTION *pthread_mutex_t; +typedef int pthread_once_t; + +DWORD pthread_self(void); + +int pthread_mutex_init(pthread_mutex_t *, void *attr); +int pthread_mutex_lock(pthread_mutex_t *); +int pthread_mutex_unlock(pthread_mutex_t *); +#endif /* AMQP_THREAD_H */ diff --git a/m4/polarssl.m4 b/m4/polarssl.m4 new file mode 100644 index 0000000..2c87bbd --- /dev/null +++ b/m4/polarssl.m4 @@ -0,0 +1,75 @@ +# polarssl.m4 - Check for PolarSSL +# +# Copyright 2012 Michael Steinert +# +# Permission is hereby granted, free of charge, to any person obtaining +# a copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, +# distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so, subject to +# the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +# SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +# THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +#serial 1 + +# _AX_LIB_POLARSSL +# ---------------- +# Check for the PolarSSL library and header file. If found the cache variable +# ax_cv_have_polarssl will be set to yes. +AC_DEFUN([_AX_LIB_POLARSSL], +[dnl +ax_cv_have_polarssl=no +_ax_polarssl_h=no +_ax_polarssl_lib=no +AC_ARG_VAR([POLARSSL_CFLAGS], + [C compiler flags for PolarSSL, overriding defaults]) +AC_ARG_VAR([POLARSSL_LIBS], [linker flags for PolarSSL, overriding defaults]) +AC_CHECK_HEADERS([polarssl/ssl.h], + [_ax_polarssl_h=yes],, + [$POLARSSL_CFLAGS]) +AS_IF([test "x$POLARSSL_LIBS" = "x"], + [AC_CHECK_LIB([polarssl], [entropy_init], + [POLARSSL_LIBS=-lpolarssl + _ax_polarssl_lib=yes])], + [_ax_polarssl_cflags=$CFLAGS + CFLAGS="$POLARSSL_CFLAGS $CFLAGS" + _ax_polarssl_ldflags=$LDFLAGS + LDFLAGS="$POLARSSL_LIBS $LDFLAGS" + AC_MSG_CHECKING([for libpolarssl]) + AC_TRY_LINK([#include <polarssl/entropy.h>], + [entropy_init(NULL)], + [AC_MSG_RESULT([$POLARSSL_LIBS]) + _ax_polarssl_lib=yes], + [AC_MSG_RESULT([no])]) + CFLAGS=$_ax_polarssl_cflags + LDFLAGS=$_ax_polarssl_ldflags]) +AS_IF([test "x$_ax_polarssl_h" = "xyes" && \ + test "x$_ax_polarssl_lib" = "xyes"], + [ax_cv_have_polarssl=yes]) +])dnl + +# AX_LIB_POLARSSL([ACTION-IF-TRUE], [ACTION-IF-FALSE]) +# ------------------------------------------------ +# Check if PolarSSL is installed. If found the variable ax_have_polarssl will +# be set to yes. +# ACTION-IF-TRUE: commands to execute if PolarSSL is installed +# ACTION-IF-FALSE: commands to execute if PoloarSSL is not installed +AC_DEFUN([AX_LIB_POLARSSL], +[dnl +AC_CACHE_VAL([ax_cv_have_polarssl], [_AX_LIB_POLARSSL]) +ax_have_polarssl=$ax_cv_have_polarssl +AS_IF([test "x$ax_have_polarssl" = "xyes"], + [AC_DEFINE([HAVE_POLARSSL], [1], [Define to 1 if PolarSSL is available.]) + $1], [$2]) +])dnl diff --git a/tests/test_parse_url.c b/tests/test_parse_url.c index b816cd8..da3e681 100644 --- a/tests/test_parse_url.c +++ b/tests/test_parse_url.c @@ -112,61 +112,142 @@ int main(void) /* From the spec */ parse_success("amqp://user:pass@host:10000/vhost", "user", "pass", "host", 10000, "vhost"); + parse_success("amqps://user:pass@host:10000/vhost", "user", "pass", + "host", 10000, "vhost"); + parse_success("amqp://user%61:%61pass@ho%61st:10000/v%2fhost", "usera", "apass", "hoast", 10000, "v/host"); + parse_success("amqps://user%61:%61pass@ho%61st:10000/v%2fhost", + "usera", "apass", "hoast", 10000, "v/host"); + parse_success("amqp://", "guest", "guest", "localhost", 5672, "/"); + parse_success("amqps://", "guest", "guest", "localhost", 5671, "/"); + parse_success("amqp://:@/", "", "", "localhost", 5672, ""); + parse_success("amqps://:@/", "", "", "localhost", 5671, ""); + parse_success("amqp://user@", "user", "guest", "localhost", 5672, "/"); + parse_success("amqps://user@", "user", "guest", "localhost", 5671, "/"); + parse_success("amqp://user:pass@", "user", "pass", "localhost", 5672, "/"); + parse_success("amqps://user:pass@", "user", "pass", + "localhost", 5671, "/"); + parse_success("amqp://host", "guest", "guest", "host", 5672, "/"); + parse_success("amqps://host", "guest", "guest", "host", 5671, "/"); + parse_success("amqp://:10000", "guest", "guest", "localhost", 10000, "/"); + parse_success("amqps://:10000", "guest", "guest", "localhost", 10000, + "/"); + parse_success("amqp:///vhost", "guest", "guest", "localhost", 5672, "vhost"); + parse_success("amqps:///vhost", "guest", "guest", "localhost", 5671, + "vhost"); + parse_success("amqp://host/", "guest", "guest", "host", 5672, ""); + parse_success("amqps://host/", "guest", "guest", "host", 5671, ""); + parse_success("amqp://host/%2f", "guest", "guest", "host", 5672, "/"); + parse_success("amqps://host/%2f", "guest", "guest", "host", 5671, "/"); + parse_success("amqp://[::1]", "guest", "guest", "::1", 5672, "/"); + parse_success("amqps://[::1]", "guest", "guest", "::1", 5671, "/"); /* Various other success cases */ parse_success("amqp://host:100", "guest", "guest", "host", 100, "/"); + parse_success("amqps://host:100", "guest", "guest", "host", 100, "/"); + parse_success("amqp://[::1]:100", "guest", "guest", "::1", 100, "/"); + parse_success("amqps://[::1]:100", "guest", "guest", "::1", 100, "/"); parse_success("amqp://host/blah", "guest", "guest", "host", 5672, "blah"); + parse_success("amqps://host/blah", "guest", "guest", + "host", 5671, "blah"); + parse_success("amqp://host:100/blah", "guest", "guest", "host", 100, "blah"); + parse_success("amqps://host:100/blah", "guest", "guest", + "host", 100, "blah"); + parse_success("amqp://:100/blah", "guest", "guest", "localhost", 100, "blah"); + parse_success("amqps://:100/blah", "guest", "guest", + "localhost", 100, "blah"); + parse_success("amqp://[::1]/blah", "guest", "guest", "::1", 5672, "blah"); + parse_success("amqps://[::1]/blah", "guest", "guest", + "::1", 5671, "blah"); + parse_success("amqp://[::1]:100/blah", "guest", "guest", "::1", 100, "blah"); + parse_success("amqps://[::1]:100/blah", "guest", "guest", + "::1", 100, "blah"); parse_success("amqp://user:pass@host", "user", "pass", "host", 5672, "/"); + parse_success("amqps://user:pass@host", "user", "pass", + "host", 5671, "/"); + parse_success("amqp://user:pass@host:100", "user", "pass", "host", 100, "/"); + parse_success("amqps://user:pass@host:100", "user", "pass", + "host", 100, "/"); + parse_success("amqp://user:pass@:100", "user", "pass", "localhost", 100, "/"); + parse_success("amqps://user:pass@:100", "user", "pass", + "localhost", 100, "/"); + parse_success("amqp://user:pass@[::1]", "user", "pass", "::1", 5672, "/"); + parse_success("amqps://user:pass@[::1]", "user", "pass", + "::1", 5671, "/"); + parse_success("amqp://user:pass@[::1]:100", "user", "pass", "::1", 100, "/"); + parse_success("amqps://user:pass@[::1]:100", "user", "pass", + "::1", 100, "/"); /* Various failure cases */ parse_fail("http://www.rabbitmq.com"); + parse_fail("amqp://foo:bar:baz"); + parse_fail("amqps://foo:bar:baz"); + + parse_fail("amqp://foo[::1]"); + parse_fail("amqps://foo[::1]"); + parse_fail("amqp://foo[::1]"); + parse_fail("amqps://foo[::1]"); + parse_fail("amqp://foo:[::1]"); + parse_fail("amqps://foo:[::1]"); + parse_fail("amqp://[::1]foo"); + parse_fail("amqps://[::1]foo"); + parse_fail("amqp://foo:1000xyz"); + parse_fail("amqps://foo:1000xyz"); + parse_fail("amqp://foo:1000000"); + parse_fail("amqps://foo:1000000"); + parse_fail("amqp://foo/bar/baz"); + parse_fail("amqps://foo/bar/baz"); parse_fail("amqp://foo%1"); + parse_fail("amqps://foo%1"); + parse_fail("amqp://foo%1x"); + parse_fail("amqps://foo%1x"); + parse_fail("amqp://foo%xy"); + parse_fail("amqps://foo%xy"); return 0; } diff --git a/tools/common.c b/tools/common.c index 7659521..0c61d16 100644 --- a/tools/common.c +++ b/tools/common.c @@ -38,18 +38,18 @@ #include "config.h" #endif -/* needed for asnprintf */ -#define _GNU_SOURCE +#include "common.h" +#ifdef WITH_SSL +#include <amqp_ssl_socket.h> +#endif +#include <amqp_tcp_socket.h> +#include <errno.h> +#include <fcntl.h> +#include <stdarg.h> #include <stdio.h> #include <stdlib.h> -#include <stdarg.h> #include <string.h> - #include <unistd.h> -#include <fcntl.h> -#include <errno.h> - -#include "common.h" #ifdef WINDOWS #include "compat.h" @@ -175,6 +175,12 @@ static int amqp_port = -1; static char *amqp_vhost; static char *amqp_username; static char *amqp_password; +#ifdef WITH_SSL +static int amqp_ssl = 0; +static char *amqp_cacert = "/etc/ssl/certs/cacert.pem"; +static char *amqp_key = NULL; +static char *amqp_cert = NULL; +#endif /* WITH_SSL */ const char *connect_options_title = "Connection options"; struct poptOption connect_options[] = { @@ -202,13 +208,29 @@ struct poptOption connect_options[] = { "password", 0, POPT_ARG_STRING, &amqp_password, 0, "the password to login with", "password" }, +#ifdef WITH_SSL + { + "ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0, + "connect over SSL/TLS", NULL + }, + { + "cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0, + "path to the CA certificate file", "cacert.pem" + }, + { + "key", 0, POPT_ARG_STRING, &amqp_key, 0, + "path to the client private key file", "key.pem" + }, + { + "cert", 0, POPT_ARG_STRING, &amqp_cert, 0, + "path to the client certificate file", "cert.pem" + }, +#endif /* WITH_SSL */ { NULL, '\0', 0, NULL, 0, NULL, NULL } }; static void init_connection_info(struct amqp_connection_info *ci) { - struct amqp_connection_info defaults; - ci->user = NULL; ci->password = NULL; ci->host = NULL; @@ -216,17 +238,17 @@ static void init_connection_info(struct amqp_connection_info *ci) ci->vhost = NULL; ci->user = NULL; - if (amqp_url) { + amqp_default_connection_info(ci); + + if (amqp_url) die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), "Parsing URL '%s'", amqp_url); - } if (amqp_server) { char *colon; - if (ci->host) { + if (ci->host) die("both --server and --url options specify" " server host"); - } /* parse the server string into a hostname and a port */ colon = strchr(amqp_server, ':'); @@ -246,104 +268,105 @@ static void init_connection_info(struct amqp_connection_info *ci) memcpy(ci->host, amqp_server, host_len); ci->host[host_len] = 0; - if (ci->port >= 0) { + if (ci->port >= 0) die("both --server and --url options specify" " server port"); - } - if (amqp_port >= 0) { + if (amqp_port >= 0) die("both --server and --port options specify" " server port"); - } ci->port = strtol(colon+1, &port_end, 10); if (ci->port < 0 || ci->port > 65535 || port_end == colon+1 - || *port_end != 0) { + || *port_end != 0) die("bad server port number in '%s'", amqp_server); - } } + +#if WITH_SSL + if (amqp_ssl && !ci->ssl) { + die("the --ssl option specifies an SSL connection" + " but the --server option does not"); + } +#endif } if (amqp_port >= 0) { - if (ci->port >= 0) { + if (ci->port >= 0) die("both --port and --url options specify" " server port"); - } ci->port = amqp_port; } if (amqp_username) { - if (ci->user) { + if (ci->user) die("both --username and --url options specify" " AMQP username"); - } ci->user = amqp_username; } if (amqp_password) { - if (ci->password) { + if (ci->password) die("both --password and --url options specify" " AMQP password"); - } ci->password = amqp_password; } if (amqp_vhost) { - if (ci->vhost) { + if (ci->vhost) die("both --vhost and --url options specify" " AMQP vhost"); - } ci->vhost = amqp_vhost; } - - amqp_default_connection_info(&defaults); - - if (!ci->user) { - ci->user = defaults.user; - } - if (!ci->password) { - ci->password = defaults.password; - } - if (!ci->host) { - ci->host = defaults.host; - } - if (ci->port < 0) { - ci->port = defaults.port; - } - if (!ci->vhost) { - ci->vhost = defaults.vhost; - } } amqp_connection_state_t make_connection(void) { - int s; + int status; + amqp_socket_t *socket = NULL; struct amqp_connection_info ci; amqp_connection_state_t conn; init_connection_info(&ci); - - s = amqp_open_socket(ci.host, ci.port); - die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port); - conn = amqp_new_connection(); - amqp_set_sockfd(conn, s); - + if (ci.ssl) { +#ifdef WITH_SSL + socket = amqp_ssl_socket_new(); + if (!socket) { + die("creating SSL/TLS socket"); + } + if (amqp_cacert) { + amqp_ssl_socket_set_cacert(socket, amqp_cacert); + } + if (amqp_key) { + amqp_ssl_socket_set_key(socket, amqp_cert, amqp_key); + } +#else + die("librabbitmq was not built with SSL/TLS support"); +#endif + } else { + socket = amqp_tcp_socket_new(); + if (!socket) { + die("creating TCP socket (out of memory)"); + } + } + status = amqp_socket_open(socket, ci.host, ci.port); + if (status) { + die("opening socket to %s:%d", ci.host, ci.port); + } + amqp_set_socket(conn, socket); die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, ci.user, ci.password), "logging in to AMQP server"); - if (!amqp_channel_open(conn, 1)) { die_rpc(amqp_get_rpc_reply(conn), "opening channel"); } - return conn; } |