summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-04-17 15:11:56 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-04-17 15:11:56 -0700
commit473c8659a8fd4111de8e1c93c45b4261defc63e5 (patch)
treeb8e9533efacf676086a1cc59d503114224261cd3
parent21b124e2fd2f1c343fb37b708f393d1b9580cfad (diff)
parentadbaf5f18d5f2480f5b01ea352d6c8ab5b17697d (diff)
downloadrabbitmq-c-github-ask-473c8659a8fd4111de8e1c93c45b4261defc63e5.tar.gz
Merge branch 'ssl-plumbing'
Fixes #17 Fixes #14
-rw-r--r--.gitignore10
-rw-r--r--.travis.yml9
-rw-r--r--CMakeLists.txt25
-rw-r--r--Makefile.am95
-rw-r--r--cmake/FindPolarSSL.cmake23
-rw-r--r--cmake/FindcyaSSL.cmake23
-rw-r--r--configure.ac40
-rw-r--r--examples/CMakeLists.txt28
-rw-r--r--examples/amqp_bind.c19
-rw-r--r--examples/amqp_consumer.c19
-rw-r--r--examples/amqp_exchange_declare.c19
-rw-r--r--examples/amqp_listen.c19
-rw-r--r--examples/amqp_listenq.c19
-rw-r--r--examples/amqp_producer.c19
-rw-r--r--examples/amqp_rpc_sendstring_client.c19
-rw-r--r--examples/amqp_sendstring.c19
-rw-r--r--examples/amqp_unbind.c19
-rw-r--r--examples/amqps_bind.c115
-rw-r--r--examples/amqps_consumer.c206
-rw-r--r--examples/amqps_exchange_declare.c110
-rw-r--r--examples/amqps_listen.c207
-rw-r--r--examples/amqps_listenq.c190
-rw-r--r--examples/amqps_producer.c172
-rw-r--r--examples/amqps_sendstring.c124
-rw-r--r--examples/amqps_unbind.c115
-rw-r--r--examples/utils.c11
-rw-r--r--examples/utils.h1
-rw-r--r--librabbitmq/CMakeLists.txt49
-rw-r--r--librabbitmq/amqp.h113
-rw-r--r--librabbitmq/amqp_api.c5
-rw-r--r--librabbitmq/amqp_connection.c50
-rw-r--r--librabbitmq/amqp_cyassl.c270
-rw-r--r--librabbitmq/amqp_gnutls.c362
-rw-r--r--librabbitmq/amqp_openssl.c573
-rw-r--r--librabbitmq/amqp_polarssl.c354
-rw-r--r--librabbitmq/amqp_private.h21
-rw-r--r--librabbitmq/amqp_socket.c140
-rw-r--r--librabbitmq/amqp_socket.h107
-rw-r--r--librabbitmq/amqp_ssl_socket.h141
-rw-r--r--librabbitmq/amqp_tcp_socket.c126
-rw-r--r--librabbitmq/amqp_tcp_socket.h64
-rw-r--r--librabbitmq/amqp_url.c10
-rw-r--r--librabbitmq/unix/socket.c31
-rw-r--r--librabbitmq/unix/socket.h11
-rw-r--r--librabbitmq/unix/threads.h29
-rw-r--r--librabbitmq/win32/socket.c18
-rw-r--r--librabbitmq/win32/socket.h17
-rw-r--r--librabbitmq/win32/threads.c63
-rw-r--r--librabbitmq/win32/threads.h37
-rw-r--r--m4/polarssl.m475
-rw-r--r--tests/test_parse_url.c81
-rw-r--r--tools/common.c133
52 files changed, 4355 insertions, 200 deletions
diff --git a/.gitignore b/.gitignore
index 8008bfe..797169a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,7 @@
/config.status
/config.sub
/configure
+/cscope.*
/depcomp
/install-sh
/libtool
@@ -32,7 +33,16 @@ examples/amqp_producer
examples/amqp_rpc_sendstring_client
examples/amqp_sendstring
examples/amqp_unbind
+examples/amqps_bind
+examples/amqps_consumer
+examples/amqps_exchange_declare
+examples/amqps_listen
+examples/amqps_listenq
+examples/amqps_producer
+examples/amqps_sendstring
+examples/amqps_unbind
librabbitmq.pc
+test-driver
tests/test_parse_url
tests/test_tables
tools/amqp-consume
diff --git a/.travis.yml b/.travis.yml
index 8daba29..7dd5503 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -10,12 +10,17 @@ compiler:
# Settings to try
env:
+ # OpenSSL
- PRE_CONFIGURE=true CONFIGURE="cmake .. -DCMAKE_INSTALL_PREFIX=../_install" BUILD_INSTALL="cmake --build . --target install"
+ # gnutls
+ #- PRE_CONFIGURE=true CONFIGURE="cmake .. -DSSL_ENGINE=GnuTLS -DCMAKE_INSTALL_PREFIX=../_install" BUILD_INSTALL="cmake --build . --target install"
+ # PolarSSL
+ #- PRE_CONFIGURE=true CONFIGURE="cmake .. -DSSL_ENGINE=PolarSSL -DCMAKE_INSTALL_PREFIX=../_install" BUILD_INSTALL="cmake --build . --target install"
- PRE_CONFIGURE="autoreconf -i" CONFIGURE="../configure --prefix=`pwd`/_install" BUILD_INSTALL="make install"
-
+
# Make sure CMake is installed
install:
- - sudo apt-get install cmake libpopt-dev
+ - sudo apt-get install cmake libpopt-dev libpolarssl-dev libgnutls-dev
# Run the Build script
script:
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 55d56e1..91d0812 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -207,6 +207,8 @@ if (POPT_FOUND AND XmlTo_FOUND)
set(DO_DOCS ON)
endif()
+find_package(Threads)
+
option(BUILD_SHARED_LIBS "Build rabbitmq-c as a shared library" ON)
option(BUILD_STATIC_LIBS "Build rabbitmq-c as a static library" OFF)
@@ -214,6 +216,29 @@ option(BUILD_EXAMPLES "Build Examples" ON)
option(BUILD_TOOLS "Build Tools (requires POPT Library)" ${POPT_FOUND})
option(BUILD_TOOLS_DOCS "Build man pages for Tools (requires xmlto)" ${DO_DOCS})
option(BUILD_TESTS "Build tests (run tests with make test)" ON)
+option(ENABLE_SSL_SUPPORT "Enable SSL support" ON)
+option(ENABLE_THREAD_SAFETY "Enable thread safety when using OpenSSL" ${Threads_FOUND})
+
+set(SSL_ENGINE "OpenSSL" CACHE STRING "SSL Backend to use, valid options: OpenSSL, cyaSSL, GnuTLS, PolarSSL")
+mark_as_advanced(SSL_ENGINE)
+
+if (ENABLE_SSL_SUPPORT)
+ if (SSL_ENGINE STREQUAL "OpenSSL")
+ find_package(OpenSSL 0.9.8 REQUIRED)
+
+ elseif (SSL_ENGINE STREQUAL "cyaSSL")
+ find_package(cyaSSL REQUIRED)
+
+ elseif (SSL_ENGINE STREQUAL "GnuTLS")
+ find_package(GnuTLS REQUIRED)
+
+ elseif (SSL_ENGINE STREQUAL "PolarSSL")
+ find_package(PolarSSL REQUIRED)
+
+ else()
+ message(FATAL_ERROR "Unsupported SSL_ENGINE ${SSL_ENGINE}, valid engines: OpenSSL, cyaSSL, GnuTLS, or PolarSSL")
+ endif()
+endif()
if (NOT BUILD_SHARED_LIBS AND NOT BUILD_STATIC_LIBS)
message(FATAL_ERROR "One or both of BUILD_SHARED_LIBS or BUILD_STATIC_LIBS must be set to ON to build")
diff --git a/Makefile.am b/Makefile.am
index d7bbe67..d8b4554 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -12,6 +12,8 @@ endif #REGENERATE_AMQP_FRAMING
lib_LTLIBRARIES = librabbitmq/librabbitmq.la
librabbitmq_librabbitmq_la_SOURCES = \
+ librabbitmq/amqp-socket.h \
+ librabbitmq/amqp_tcp_socket.c \
librabbitmq/amqp_api.c \
librabbitmq/amqp_connection.c \
librabbitmq/amqp_mem.c \
@@ -26,28 +28,54 @@ else
librabbitmq_librabbitmq_la_SOURCES += librabbitmq/amqp_framing.c
endif
+if SSL_CYASSL
+librabbitmq_librabbitmq_la_SOURCES += librabbitmq/amqp_cyassl.c
+endif
+
+if SSL_GNUTLS
+librabbitmq_librabbitmq_la_SOURCES += librabbitmq/amqp_gnutls.c
+endif
+
+if SSL_OPENSSL
+librabbitmq_librabbitmq_la_SOURCES += librabbitmq/amqp_openssl.c
+endif
+
+if SSL_POLARSSL
+librabbitmq_librabbitmq_la_SOURCES += librabbitmq/amqp_polarssl.c
+endif
+
+librabbitmq_librabbitmq_la_CFLAGS = \
+ -I$(top_srcdir)/librabbitmq \
+ $(SSL_CFLAGS) \
+ $(AM_CFLAGS)
librabbitmq_librabbitmq_la_LDFLAGS = \
-version-info $(LT_CURRENT):$(LT_REVISION):$(LT_AGE) \
- $(NO_UNDEFINED)
-librabbitmq_librabbitmq_la_CFLAGS = $(AM_CFLAGS)
-
+ $(NO_UNDEFINED) \
+ $(SSL_LIBS)
if OS_UNIX
librabbitmq_librabbitmq_la_SOURCES += librabbitmq/unix/socket.c
librabbitmq_librabbitmq_la_SOURCES += librabbitmq/unix/socket.h
+librabbitmq_librabbitmq_la_SOURCES += librabbitmq/unix/threads.h
librabbitmq_librabbitmq_la_CFLAGS += -I$(top_srcdir)/librabbitmq/unix
endif
if OS_WIN32
librabbitmq_librabbitmq_la_SOURCES += librabbitmq/win32/socket.c
librabbitmq_librabbitmq_la_SOURCES += librabbitmq/win32/socket.h
+librabbitmq_librabbitmq_la_SOURCES += librabbitmq/win32/threads.h
librabbitmq_librabbitmq_la_CFLAGS += -I$(top_srcdir)/librabbitmq/win32
librabbitmq_librabbitmq_la_CFLAGS += -I$(top_srcdir)/librabbitmq/win32/msinttypes
endif
include_HEADERS = \
$(top_srcdir)/librabbitmq/amqp.h
+ $(top_builddir)/librabbitmq/amqp_tcp_socket.h
+
+if SSL
+include_HEADERS += librabbitmq/amqp_ssl_socket.h
+endif
if REGENERATE_AMQP_FRAMING
@@ -106,7 +134,10 @@ tests_test_tables_LDADD = librabbitmq/librabbitmq.la
tests_test_parse_url_SOURCES = tests/test_parse_url.c
tests_test_parse_url_LDADD = librabbitmq/librabbitmq.la
-noinst_LTLIBRARIES = examples/libutils.la
+noinst_LTLIBRARIES =
+
+if EXAMPLES
+noinst_LTLIBRARIES += examples/libutils.la
examples_libutils_la_SOURCES = \
examples/utils.c \
@@ -173,11 +204,65 @@ examples_amqp_listenq_LDADD = \
examples/libutils.la \
librabbitmq/librabbitmq.la
-examples_amqp_rpc_sendstring_client_SOURCES = examples/amqp_rpc_sendstring_client.c
+examples_amqp_rpc_sendstring_client_SOURCES = \
+ examples/amqp_rpc_sendstring_client.c
examples_amqp_rpc_sendstring_client_LDADD = \
examples/libutils.la \
librabbitmq/librabbitmq.la
+if SSL
+noinst_PROGRAMS += \
+ examples/amqps_bind \
+ examples/amqps_consumer \
+ examples/amqps_exchange_declare \
+ examples/amqps_listen \
+ examples/amqps_listenq \
+ examples/amqps_producer \
+ examples/amqps_sendstring \
+ examples/amqps_unbind
+
+examples_amqps_bind_SOURCES = examples/amqps_bind.c
+examples_amqps_bind_LDADD = \
+ examples/libutils.la \
+ librabbitmq/librabbitmq.la
+
+examples_amqps_consumer_SOURCES = examples/amqps_consumer.c
+examples_amqps_consumer_LDADD = \
+ examples/libutils.la \
+ librabbitmq/librabbitmq.la
+
+examples_amqps_exchange_declare_SOURCES = examples/amqps_exchange_declare.c
+examples_amqps_exchange_declare_LDADD = \
+ examples/libutils.la \
+ librabbitmq/librabbitmq.la
+
+examples_amqps_listen_SOURCES = examples/amqps_listen.c
+examples_amqps_listen_LDADD = \
+ examples/libutils.la \
+ librabbitmq/librabbitmq.la
+
+examples_amqps_listenq_SOURCES = examples/amqps_listenq.c
+examples_amqps_listenq_LDADD = \
+ examples/libutils.la \
+ librabbitmq/librabbitmq.la
+
+examples_amqps_producer_SOURCES = examples/amqps_producer.c
+examples_amqps_producer_LDADD = \
+ examples/libutils.la \
+ librabbitmq/librabbitmq.la
+
+examples_amqps_sendstring_SOURCES = examples/amqps_sendstring.c
+examples_amqps_sendstring_LDADD = \
+ examples/libutils.la \
+ librabbitmq/librabbitmq.la
+
+examples_amqps_unbind_SOURCES = examples/amqps_unbind.c
+examples_amqps_unbind_LDADD = \
+ examples/libutils.la \
+ librabbitmq/librabbitmq.la
+endif
+endif
+
if TOOLS
noinst_LTLIBRARIES += tools/libcommon.la
diff --git a/cmake/FindPolarSSL.cmake b/cmake/FindPolarSSL.cmake
new file mode 100644
index 0000000..b9d04ef
--- /dev/null
+++ b/cmake/FindPolarSSL.cmake
@@ -0,0 +1,23 @@
+# - Try to find the PolarSSL SSL Library
+# The module will set the following variables
+#
+# POLARSSL_FOUND - System has popt
+# POLARSSL_INCLUDE_DIR - The popt include directory
+# POLARSSL_LIBRARIES - The libraries needed to use popt
+
+# Find the include directories
+FIND_PATH(POLARSSL_INCLUDE_DIR
+ NAMES polarssl/ssl.h
+ DOC "Path containing the polarssl/ssl.h include file"
+ )
+
+FIND_LIBRARY(POLARSSL_LIBRARIES
+ NAMES polarssl
+ DOC "polarssl library path"
+ )
+
+include(FindPackageHandleStandardArgs)
+
+FIND_PACKAGE_HANDLE_STANDARD_ARGS(POLARSSL
+ REQUIRED_VARS POLARSSL_INCLUDE_DIR POLARSSL_LIBRARIES
+ )
diff --git a/cmake/FindcyaSSL.cmake b/cmake/FindcyaSSL.cmake
new file mode 100644
index 0000000..06bac00
--- /dev/null
+++ b/cmake/FindcyaSSL.cmake
@@ -0,0 +1,23 @@
+# - Try to find the cyaSSL SSL Library
+# The module will set the following variables
+#
+# CYASSL_FOUND - System has popt
+# CYASSL_INCLUDE_DIR - The popt include directory
+# CYASSL_LIBRARIES - The libraries needed to use popt
+
+# Find the include directories
+FIND_PATH(CYASSL_INCLUDE_DIR
+ NAMES cyassl/ssl.h
+ DOC "Path containing the cyassl/ssl.h include file"
+ )
+
+FIND_LIBRARY(CYASSL_LIBRARIES
+ NAMES cyassl
+ DOC "cyassl library path"
+ )
+
+include(FindPackageHandleStandardArgs)
+
+FIND_PACKAGE_HANDLE_STANDARD_ARGS(CYASSL
+ REQUIRED_VARS CYASSL_INCLUDE_DIR CYASSL_LIBRARIES
+ )
diff --git a/configure.ac b/configure.ac
index 5bafa6d..c4b058b 100644
--- a/configure.ac
+++ b/configure.ac
@@ -34,6 +34,7 @@ m4_ifdef([AC_PROG_CC_C99], [AC_PROG_CC_C99],
[AC_MSG_WARN([Attempt c99 workaround for old versions of autoconf])
AC_PROG_CC
AX_TRY_CFLAGS([-std=c99], [AX_CFLAGS([-std=c99])])])
+PKG_PROG_PKG_CONFIG([0.17])
# Environment setup
AC_CANONICAL_HOST
@@ -68,6 +69,7 @@ AS_CASE([$host],
[os_unix=yes])
AM_CONDITIONAL([OS_UNIX], [test "x$os_unix" = xyes])
AM_CONDITIONAL([OS_WIN32], [test "x$os_win32" = xyes])
+AC_DEFINE([ENABLE_THREAD_SAFETY], [1], [Define to 1 to enable thread safety])
# Extra Win32 setup
AS_IF([test "x$os_win32" = xyes],
@@ -127,6 +129,35 @@ AS_IF([test "x$enable_regen_amqp_framing" = "xyes"],
[HAVE_PYTHON3=no])
AM_CONDITIONAL([PYTHON3], [test "x$HAVE_PYTHON3" = "xyes"])
+# Configure SSL/TLS
+AC_ARG_WITH([ssl],
+ [AS_HELP_STRING([--with-ssl=@<:@cyassl/gnutls/no/openssl/polarssl/yes@:>@],
+ [enable SSL/TLS support @<:@default=openssl@:>@])],
+ [AS_CASE([$withval],
+ [yes], [with_ssl=openssl],
+ [*], [with_ssl=$withval])],
+ [with_ssl=openssl])
+
+AS_IF([test "x$with_ssl" = "xcyassl"],
+ [PKG_CHECK_MODULES([SSL], [libcyassl],, [with_ssl=no])],
+ [test "x$with_ssl" = "xgnutls"],
+ [PKG_CHECK_MODULES([SSL], [gnutls],, [with_ssl=no])],
+ [test "x$with_ssl" = "xopenssl"],
+ [PKG_CHECK_MODULES([SSL], [openssl >= 0.9.8],, [with_ssl=no])],
+ [test "x$with_ssl" = "xpolarssl"],
+ [AX_LIB_POLARSSL([SSL_CFLAGS=$POLARSSL_CFLAGS
+ SSL_LIBS=$POLARSSL_LIBS],
+ [with_ssl=no])],
+ [test "x$with_ssl" = "xno"],,
+ [AC_MSG_ERROR([unknown SSL/TLS implementation: $with_ssl])])
+AM_CONDITIONAL([SSL_CYASSL], [test "x$with_ssl" = "xcyassl"])
+AM_CONDITIONAL([SSL_GNUTLS], [test "x$with_ssl" = "xgnutls"])
+AM_CONDITIONAL([SSL_OPENSSL], [test "x$with_ssl" = "xopenssl"])
+AM_CONDITIONAL([SSL_POLARSSL], [test "x$with_ssl" = "xpolarssl"])
+AM_CONDITIONAL([SSL], [test "x$with_ssl" != "xno"])
+AS_IF([test "x$with_ssl" != "xno"],
+ [AC_DEFINE([WITH_SSL], [1], [Define to 1 if SSL/TLS is enabled.])])
+
# Configure AMQP command-line tools
AC_ARG_ENABLE([tools],
[AS_HELP_STRING([--enable-tools],
@@ -152,6 +183,13 @@ AS_IF([test "x$enable_docs" != "xno"],
[enable_docs=no])])
AM_CONDITIONAL([DOCS], [test "x$enable_docs" = "xyes"])
+# Configure examples
+AC_ARG_ENABLE([examples],
+ [AS_HELP_STRING([--enable-examples],
+ [build example code @<:@auto@:>@])],,
+ [enable_examples=yes])
+AM_CONDITIONAL([EXAMPLES], [test "x$enable_examples" = "xyes"])
+
AC_CONFIG_HEADERS([config.h])
AC_CONFIG_FILES([
librabbitmq.pc
@@ -162,6 +200,8 @@ AC_MSG_RESULT([
$PACKAGE_NAME build options:
Host: $host
Version: $VERSION
+ SSL/TLS: $with_ssl
Tools: $enable_tools
Documentation: $enable_docs
+ Examples: $enable_examples
])
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 5a610cb..a184cf6 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -17,7 +17,7 @@ add_executable(amqp_sendstring amqp_sendstring.c ${COMMON_SRCS})
target_link_libraries(amqp_sendstring ${RMQ_LIBRARY_TARGET})
add_executable(amqp_rpc_sendstring_client amqp_rpc_sendstring_client.c ${COMMON_SRCS})
-target_link_libraries(amqp_rpc_sendstring_client rabbitmq)
+target_link_libraries(amqp_rpc_sendstring_client ${RMQ_LIBRARY_TARGET})
add_executable(amqp_exchange_declare amqp_exchange_declare.c ${COMMON_SRCS})
target_link_libraries(amqp_exchange_declare ${RMQ_LIBRARY_TARGET})
@@ -39,3 +39,29 @@ target_link_libraries(amqp_bind ${RMQ_LIBRARY_TARGET})
add_executable(amqp_listenq amqp_listenq.c ${COMMON_SRCS})
target_link_libraries(amqp_listenq ${RMQ_LIBRARY_TARGET})
+
+if (ENABLE_SSL_SUPPORT)
+add_executable(amqps_sendstring amqps_sendstring.c ${COMMON_SRCS})
+target_link_libraries(amqps_sendstring ${RMQ_LIBRARY_TARGET})
+
+add_executable(amqps_exchange_declare amqps_exchange_declare.c ${COMMON_SRCS})
+target_link_libraries(amqps_exchange_declare ${RMQ_LIBRARY_TARGET})
+
+add_executable(amqps_listen amqps_listen.c ${COMMON_SRCS})
+target_link_libraries(amqps_listen ${RMQ_LIBRARY_TARGET})
+
+add_executable(amqps_producer amqps_producer.c ${COMMON_SRCS})
+target_link_libraries(amqps_producer ${RMQ_LIBRARY_TARGET})
+
+add_executable(amqps_consumer amqps_consumer.c ${COMMON_SRCS})
+target_link_libraries(amqps_consumer ${RMQ_LIBRARY_TARGET})
+
+add_executable(amqps_unbind amqps_unbind.c ${COMMON_SRCS})
+target_link_libraries(amqps_unbind ${RMQ_LIBRARY_TARGET})
+
+add_executable(amqps_bind amqps_bind.c ${COMMON_SRCS})
+target_link_libraries(amqps_bind ${RMQ_LIBRARY_TARGET})
+
+add_executable(amqps_listenq amqps_listenq.c ${COMMON_SRCS})
+target_link_libraries(amqps_listenq ${RMQ_LIBRARY_TARGET})
+endif (ENABLE_SSL_SUPPORT)
diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c
index b778d25..765e746 100644
--- a/examples/amqp_bind.c
+++ b/examples/amqp_bind.c
@@ -39,6 +39,7 @@
#include <string.h>
#include <stdint.h>
+#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
@@ -47,12 +48,11 @@
int main(int argc, char const *const *argv)
{
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *bindingkey;
char const *queue;
-
- int sockfd;
+ amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 6) {
@@ -68,8 +68,17 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
- amqp_set_sockfd(conn, sockfd);
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket");
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening TCP socket");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index 878bf29..72bf654 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -39,6 +39,7 @@
#include <string.h>
#include <stdint.h>
+#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
@@ -125,11 +126,10 @@ static void run(amqp_connection_state_t conn)
int main(int argc, char const *const *argv)
{
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *bindingkey;
-
- int sockfd;
+ amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
amqp_bytes_t queuename;
@@ -146,8 +146,17 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
- amqp_set_sockfd(conn, sockfd);
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket");
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening TCP socket");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c
index a12319b..55860e5 100644
--- a/examples/amqp_exchange_declare.c
+++ b/examples/amqp_exchange_declare.c
@@ -39,6 +39,7 @@
#include <string.h>
#include <stdint.h>
+#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
@@ -47,11 +48,10 @@
int main(int argc, char const *const *argv)
{
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *exchangetype;
-
- int sockfd;
+ amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 5) {
@@ -66,8 +66,17 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
- amqp_set_sockfd(conn, sockfd);
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket");
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening TCP socket");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index 1e8fa79..9385c17 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -39,6 +39,7 @@
#include <string.h>
#include <stdint.h>
+#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
@@ -49,11 +50,10 @@
int main(int argc, char const *const *argv)
{
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *bindingkey;
-
- int sockfd;
+ amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
amqp_bytes_t queuename;
@@ -70,8 +70,17 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
- amqp_set_sockfd(conn, sockfd);
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket");
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening TCP socket");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c
index 8d9c3e3..54c1189 100644
--- a/examples/amqp_listenq.c
+++ b/examples/amqp_listenq.c
@@ -39,6 +39,7 @@
#include <string.h>
#include <stdint.h>
+#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
@@ -49,10 +50,9 @@
int main(int argc, char const *const *argv)
{
char const *hostname;
- int port;
+ int port, status;
char const *queuename;
-
- int sockfd;
+ amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 4) {
@@ -66,8 +66,17 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
- amqp_set_sockfd(conn, sockfd);
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket");
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening TCP socket");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c
index 229756b..efa1a20 100644
--- a/examples/amqp_producer.c
+++ b/examples/amqp_producer.c
@@ -39,6 +39,7 @@
#include <string.h>
#include <stdint.h>
+#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
@@ -111,11 +112,10 @@ static void send_batch(amqp_connection_state_t conn,
int main(int argc, char const *const *argv)
{
char const *hostname;
- int port;
+ int port, status;
int rate_limit;
int message_count;
-
- int sockfd;
+ amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 5) {
@@ -130,8 +130,17 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
- amqp_set_sockfd(conn, sockfd);
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket");
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening TCP socket");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_rpc_sendstring_client.c b/examples/amqp_rpc_sendstring_client.c
index 3357524..6688195 100644
--- a/examples/amqp_rpc_sendstring_client.c
+++ b/examples/amqp_rpc_sendstring_client.c
@@ -39,6 +39,7 @@
#include <string.h>
#include <stdint.h>
+#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
@@ -49,12 +50,11 @@
int main(int argc, char *argv[])
{
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *routingkey;
char const *messagebody;
-
- int sockfd;
+ amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
amqp_bytes_t reply_to_queue;
@@ -75,8 +75,17 @@ int main(int argc, char *argv[])
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
- amqp_set_sockfd(conn, sockfd);
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket");
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening TCP socket");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c
index 424bd68..0b64024 100644
--- a/examples/amqp_sendstring.c
+++ b/examples/amqp_sendstring.c
@@ -39,6 +39,7 @@
#include <string.h>
#include <stdint.h>
+#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
@@ -47,12 +48,11 @@
int main(int argc, char const *const *argv)
{
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *routingkey;
char const *messagebody;
-
- int sockfd;
+ amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 6) {
@@ -68,8 +68,17 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
- amqp_set_sockfd(conn, sockfd);
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket");
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening TCP socket");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c
index 77ac724..7948d0b 100644
--- a/examples/amqp_unbind.c
+++ b/examples/amqp_unbind.c
@@ -39,6 +39,7 @@
#include <string.h>
#include <stdint.h>
+#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
@@ -47,12 +48,11 @@
int main(int argc, char const *const *argv)
{
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *bindingkey;
char const *queue;
-
- int sockfd;
+ amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 6) {
@@ -68,8 +68,17 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
- amqp_set_sockfd(conn, sockfd);
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket");
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening TCP socket");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_bind.c b/examples/amqps_bind.c
new file mode 100644
index 0000000..fbde025
--- /dev/null
+++ b/examples/amqps_bind.c
@@ -0,0 +1,115 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Portions created by Mike Steinert are Copyright (c) 2012-2013
+ * Mike Steinert. All Rights Reserved.
+ *
+ * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
+ * All Rights Reserved.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
+ * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <stdint.h>
+#include <amqp_ssl_socket.h>
+#include <amqp_framing.h>
+
+#include "utils.h"
+
+int main(int argc, char const *const *argv)
+{
+ char const *hostname;
+ int port, status;
+ char const *exchange;
+ char const *bindingkey;
+ char const *queue;
+ amqp_socket_t *socket;
+ amqp_connection_state_t conn;
+
+ if (argc < 6) {
+ fprintf(stderr, "Usage: amqps_bind host port exchange bindingkey queue "
+ "[cacert.pem [key.pem cert.pem]]\n");
+ return 1;
+ }
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+ exchange = argv[3];
+ bindingkey = argv[4];
+ queue = argv[5];
+
+ conn = amqp_new_connection();
+
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 6) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[6]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 8) {
+ status = amqp_ssl_socket_set_key(socket, argv[8], argv[7]);
+ if (status) {
+ die("setting client cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ "Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
+
+ amqp_queue_bind(conn, 1,
+ amqp_cstring_bytes(queue),
+ amqp_cstring_bytes(exchange),
+ amqp_cstring_bytes(bindingkey),
+ amqp_empty_table);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
+
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
+ return 0;
+}
diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c
new file mode 100644
index 0000000..137457f
--- /dev/null
+++ b/examples/amqps_consumer.c
@@ -0,0 +1,206 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Portions created by Mike Steinert are Copyright (c) 2012-2013
+ * Mike Steinert. All Rights Reserved.
+ *
+ * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
+ * All Rights Reserved.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
+ * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <stdint.h>
+#include <amqp_ssl_socket.h>
+#include <amqp_framing.h>
+
+#include <assert.h>
+
+#include "utils.h"
+
+#define SUMMARY_EVERY_US 1000000
+
+static void run(amqp_connection_state_t conn)
+{
+ uint64_t start_time = now_microseconds();
+ int received = 0;
+ int previous_received = 0;
+ uint64_t previous_report_time = start_time;
+ uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
+
+ amqp_frame_t frame;
+ int result;
+ size_t body_received;
+ size_t body_target;
+
+ uint64_t now;
+
+ while (1) {
+ now = now_microseconds();
+ if (now > next_summary_time) {
+ int countOverInterval = received - previous_received;
+ double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
+ printf("%d ms: Received %d - %d since last report (%d Hz)\n",
+ (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
+
+ previous_received = received;
+ previous_report_time = now;
+ next_summary_time += SUMMARY_EVERY_US;
+ }
+
+ amqp_maybe_release_buffers(conn);
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ return;
+ }
+
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
+ continue;
+ }
+
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ continue;
+ }
+
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ return;
+ }
+
+ if (frame.frame_type != AMQP_FRAME_HEADER) {
+ fprintf(stderr, "Expected header!");
+ abort();
+ }
+
+ body_target = frame.payload.properties.body_size;
+ body_received = 0;
+
+ while (body_received < body_target) {
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ return;
+ }
+
+ if (frame.frame_type != AMQP_FRAME_BODY) {
+ fprintf(stderr, "Expected body!");
+ abort();
+ }
+
+ body_received += frame.payload.body_fragment.len;
+ assert(body_received <= body_target);
+ }
+
+ received++;
+ }
+}
+
+int main(int argc, char const *const *argv)
+{
+ char const *hostname;
+ int port, status;
+ char const *exchange;
+ char const *bindingkey;
+ amqp_socket_t *socket;
+ amqp_connection_state_t conn;
+ amqp_bytes_t queuename;
+
+ if (argc < 3) {
+ fprintf(stderr, "Usage: amqps_consumer host port "
+ "[cacert.pem [key.pem cert.pem]]\n");
+ return 1;
+ }
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+ exchange = "amq.direct"; /* argv[3]; */
+ bindingkey = "test queue"; /* argv[4]; */
+
+ conn = amqp_new_connection();
+
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 3) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[3]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 5) {
+ status = amqp_ssl_socket_set_key(socket, argv[5], argv[4]);
+ if (status) {
+ die("setting client key");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ "Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
+
+ {
+ amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
+ amqp_empty_table);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
+ queuename = amqp_bytes_malloc_dup(r->queue);
+ if (queuename.bytes == NULL) {
+ fprintf(stderr, "Out of memory while copying queue name");
+ return 1;
+ }
+ }
+
+ amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
+ amqp_empty_table);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
+
+ amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
+
+ run(conn);
+
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
+
+ return 0;
+}
diff --git a/examples/amqps_exchange_declare.c b/examples/amqps_exchange_declare.c
new file mode 100644
index 0000000..bae2f57
--- /dev/null
+++ b/examples/amqps_exchange_declare.c
@@ -0,0 +1,110 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Portions created by Mike Steinert are Copyright (c) 2012-2013
+ * Mike Steinert. All Rights Reserved.
+ *
+ * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
+ * All Rights Reserved.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
+ * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <stdint.h>
+#include <amqp_ssl_socket.h>
+#include <amqp_framing.h>
+
+#include "utils.h"
+
+int main(int argc, char const *const *argv)
+{
+ char const *hostname;
+ int port, status;
+ char const *exchange;
+ char const *exchangetype;
+ amqp_socket_t *socket;
+ amqp_connection_state_t conn;
+
+ if (argc < 5) {
+ fprintf(stderr, "Usage: amqps_exchange_declare host port exchange "
+ "exchangetype [cacert.pem [key.pem cert.pem]]\n");
+ return 1;
+ }
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+ exchange = argv[3];
+ exchangetype = argv[4];
+
+ conn = amqp_new_connection();
+
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 5) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[5]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 7) {
+ status = amqp_ssl_socket_set_key(socket, argv[7], argv[6]);
+ if (status) {
+ die("setting client key/cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ "Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
+
+ amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype),
+ 0, 0, amqp_empty_table);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");
+
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
+ return 0;
+}
diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c
new file mode 100644
index 0000000..0e45162
--- /dev/null
+++ b/examples/amqps_listen.c
@@ -0,0 +1,207 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Portions created by Mike Steinert are Copyright (c) 2012-2013
+ * Mike Steinert. All Rights Reserved.
+ *
+ * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
+ * All Rights Reserved.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
+ * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <stdint.h>
+#include <amqp_ssl_socket.h>
+#include <amqp_framing.h>
+
+#include <assert.h>
+
+#include "utils.h"
+
+int main(int argc, char const *const *argv)
+{
+ char const *hostname;
+ int port, status;
+ char const *exchange;
+ char const *bindingkey;
+ amqp_socket_t *socket;
+ amqp_connection_state_t conn;
+
+ amqp_bytes_t queuename;
+
+ if (argc < 5) {
+ fprintf(stderr, "Usage: amqps_listen host port exchange bindingkey "
+ "[cacert.pem [key.pem cert.pem]]\n");
+ return 1;
+ }
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+ exchange = argv[3];
+ bindingkey = argv[4];
+
+ conn = amqp_new_connection();
+
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 5) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[5]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 7) {
+ status = amqp_ssl_socket_set_key(socket, argv[7], argv[6]);
+ if (status) {
+ die("setting client cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ "Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
+
+ {
+ amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
+ amqp_empty_table);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
+ queuename = amqp_bytes_malloc_dup(r->queue);
+ if (queuename.bytes == NULL) {
+ fprintf(stderr, "Out of memory while copying queue name");
+ return 1;
+ }
+ }
+
+ amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
+ amqp_empty_table);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
+
+ amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
+
+ {
+ amqp_frame_t frame;
+ int result;
+
+ amqp_basic_deliver_t *d;
+ amqp_basic_properties_t *p;
+ size_t body_target;
+ size_t body_received;
+
+ while (1) {
+ amqp_maybe_release_buffers(conn);
+ result = amqp_simple_wait_frame(conn, &frame);
+ printf("Result %d\n", result);
+ if (result < 0) {
+ break;
+ }
+
+ printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
+ continue;
+ }
+
+ printf("Method %s\n", amqp_method_name(frame.payload.method.id));
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ continue;
+ }
+
+ d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
+ printf("Delivery %u, exchange %.*s routingkey %.*s\n",
+ (unsigned) d->delivery_tag,
+ (int) d->exchange.len, (char *) d->exchange.bytes,
+ (int) d->routing_key.len, (char *) d->routing_key.bytes);
+
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ break;
+ }
+
+ if (frame.frame_type != AMQP_FRAME_HEADER) {
+ fprintf(stderr, "Expected header!");
+ abort();
+ }
+ p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
+ if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ printf("Content-type: %.*s\n",
+ (int) p->content_type.len, (char *) p->content_type.bytes);
+ }
+ printf("----\n");
+
+ body_target = frame.payload.properties.body_size;
+ body_received = 0;
+
+ while (body_received < body_target) {
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ break;
+ }
+
+ if (frame.frame_type != AMQP_FRAME_BODY) {
+ fprintf(stderr, "Expected body!");
+ abort();
+ }
+
+ body_received += frame.payload.body_fragment.len;
+ assert(body_received <= body_target);
+
+ amqp_dump(frame.payload.body_fragment.bytes,
+ frame.payload.body_fragment.len);
+ }
+
+ if (body_received != body_target) {
+ /* Can only happen when amqp_simple_wait_frame returns <= 0 */
+ /* We break here to close the connection */
+ break;
+ }
+ }
+ }
+
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
+
+ return 0;
+}
diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c
new file mode 100644
index 0000000..321c6a3
--- /dev/null
+++ b/examples/amqps_listenq.c
@@ -0,0 +1,190 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Portions created by Mike Steinert are Copyright (c) 2012-2013
+ * Mike Steinert. All Rights Reserved.
+ *
+ * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
+ * All Rights Reserved.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
+ * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <stdint.h>
+#include <amqp_ssl_socket.h>
+#include <amqp_framing.h>
+
+#include <assert.h>
+
+#include "utils.h"
+
+int main(int argc, char const *const *argv)
+{
+ char const *hostname;
+ int port, status;
+ char const *queuename;
+ amqp_socket_t *socket;
+ amqp_connection_state_t conn;
+
+ if (argc < 4) {
+ fprintf(stderr, "Usage: amqps_listenq host port queuename "
+ "[cacert.pem [key.pem cert.pem]]\n");
+ return 1;
+ }
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+ queuename = argv[3];
+
+ conn = amqp_new_connection();
+
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 4) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[4]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 6) {
+ status = amqp_ssl_socket_set_key(socket, argv[6], argv[5]);
+ if (status) {
+ die("setting client cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ "Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
+
+ amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
+
+ {
+ amqp_frame_t frame;
+ int result;
+
+ amqp_basic_deliver_t *d;
+ amqp_basic_properties_t *p;
+ size_t body_target;
+ size_t body_received;
+
+ while (1) {
+ amqp_maybe_release_buffers(conn);
+ result = amqp_simple_wait_frame(conn, &frame);
+ printf("Result %d\n", result);
+ if (result < 0) {
+ break;
+ }
+
+ printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
+ continue;
+ }
+
+ printf("Method %s\n", amqp_method_name(frame.payload.method.id));
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ continue;
+ }
+
+ d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
+ printf("Delivery %u, exchange %.*s routingkey %.*s\n",
+ (unsigned) d->delivery_tag,
+ (int) d->exchange.len, (char *) d->exchange.bytes,
+ (int) d->routing_key.len, (char *) d->routing_key.bytes);
+
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ break;
+ }
+
+ if (frame.frame_type != AMQP_FRAME_HEADER) {
+ fprintf(stderr, "Expected header!");
+ abort();
+ }
+ p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
+ if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ printf("Content-type: %.*s\n",
+ (int) p->content_type.len, (char *) p->content_type.bytes);
+ }
+ printf("----\n");
+
+ body_target = frame.payload.properties.body_size;
+ body_received = 0;
+
+ while (body_received < body_target) {
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ break;
+ }
+
+ if (frame.frame_type != AMQP_FRAME_BODY) {
+ fprintf(stderr, "Expected body!");
+ abort();
+ }
+
+ body_received += frame.payload.body_fragment.len;
+ assert(body_received <= body_target);
+
+ amqp_dump(frame.payload.body_fragment.bytes,
+ frame.payload.body_fragment.len);
+ }
+
+ if (body_received != body_target) {
+ /* Can only happen when amqp_simple_wait_frame returns <= 0 */
+ /* We break here to close the connection */
+ break;
+ }
+
+ amqp_basic_ack(conn, 1, d->delivery_tag, 0);
+ }
+ }
+
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
+
+ return 0;
+}
diff --git a/examples/amqps_producer.c b/examples/amqps_producer.c
new file mode 100644
index 0000000..f8f6dc6
--- /dev/null
+++ b/examples/amqps_producer.c
@@ -0,0 +1,172 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Portions created by Mike Steinert are Copyright (c) 2012-2013
+ * Mike Steinert. All Rights Reserved.
+ *
+ * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
+ * All Rights Reserved.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
+ * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <stdint.h>
+#include <amqp_ssl_socket.h>
+#include <amqp_framing.h>
+
+#include "utils.h"
+
+#define SUMMARY_EVERY_US 1000000
+
+static void send_batch(amqp_connection_state_t conn,
+ char const *queue_name,
+ int rate_limit,
+ int message_count)
+{
+ uint64_t start_time = now_microseconds();
+ int i;
+ int sent = 0;
+ int previous_sent = 0;
+ uint64_t previous_report_time = start_time;
+ uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
+
+ char message[256];
+ amqp_bytes_t message_bytes;
+
+ for (i = 0; i < (int)sizeof(message); i++) {
+ message[i] = i & 0xff;
+ }
+
+ message_bytes.len = sizeof(message);
+ message_bytes.bytes = message;
+
+ for (i = 0; i < message_count; i++) {
+ uint64_t now = now_microseconds();
+
+ die_on_error(amqp_basic_publish(conn,
+ 1,
+ amqp_cstring_bytes("amq.direct"),
+ amqp_cstring_bytes(queue_name),
+ 0,
+ 0,
+ NULL,
+ message_bytes),
+ "Publishing");
+ sent++;
+ if (now > next_summary_time) {
+ int countOverInterval = sent - previous_sent;
+ double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
+ printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
+ (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
+
+ previous_sent = sent;
+ previous_report_time = now;
+ next_summary_time += SUMMARY_EVERY_US;
+ }
+
+ while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
+ microsleep(2000);
+ now = now_microseconds();
+ }
+ }
+
+ {
+ uint64_t stop_time = now_microseconds();
+ int total_delta = stop_time - start_time;
+
+ printf("PRODUCER - Message count: %d\n", message_count);
+ printf("Total time, milliseconds: %d\n", total_delta / 1000);
+ printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
+ }
+}
+
+int main(int argc, char const *const *argv)
+{
+ char const *hostname;
+ int port, status;
+ int rate_limit;
+ int message_count;
+ amqp_socket_t *socket;
+ amqp_connection_state_t conn;
+
+ if (argc < 5) {
+ fprintf(stderr, "Usage: amqps_producer host port rate_limit message_count "
+ "[cacert.pem [key.pem cert.pem]]\n");
+ return 1;
+ }
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+ rate_limit = atoi(argv[3]);
+ message_count = atoi(argv[4]);
+
+ conn = amqp_new_connection();
+
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 5) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[5]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 7) {
+ status = amqp_ssl_socket_set_key(socket, argv[7], argv[6]);
+ if (status) {
+ die("setting client cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ "Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
+
+ send_batch(conn, "test queue", rate_limit, message_count);
+
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
+ return 0;
+}
diff --git a/examples/amqps_sendstring.c b/examples/amqps_sendstring.c
new file mode 100644
index 0000000..7465ef2
--- /dev/null
+++ b/examples/amqps_sendstring.c
@@ -0,0 +1,124 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Portions created by Mike Steinert are Copyright (c) 2012-2013
+ * Mike Steinert. All Rights Reserved.
+ *
+ * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
+ * All Rights Reserved.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
+ * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <stdint.h>
+#include <amqp_ssl_socket.h>
+#include <amqp_framing.h>
+
+#include "utils.h"
+
+int main(int argc, char const *const *argv)
+{
+ char const *hostname;
+ int port, status;
+ char const *exchange;
+ char const *routingkey;
+ char const *messagebody;
+ amqp_socket_t *socket;
+ amqp_connection_state_t conn;
+
+ if (argc < 6) {
+ fprintf(stderr, "Usage: amqps_sendstring host port exchange routingkey "
+ "messagebody [cacert.pem [key.pem cert.pem]]\n");
+ return 1;
+ }
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+ exchange = argv[3];
+ routingkey = argv[4];
+ messagebody = argv[5];
+
+ conn = amqp_new_connection();
+
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 6) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[6]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 8) {
+ status = amqp_ssl_socket_set_key(socket, argv[8], argv[7]);
+ if (status) {
+ die("setting client cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ "Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
+
+ {
+ amqp_basic_properties_t props;
+ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
+ props.content_type = amqp_cstring_bytes("text/plain");
+ props.delivery_mode = 2; /* persistent delivery mode */
+ die_on_error(amqp_basic_publish(conn,
+ 1,
+ amqp_cstring_bytes(exchange),
+ amqp_cstring_bytes(routingkey),
+ 0,
+ 0,
+ &props,
+ amqp_cstring_bytes(messagebody)),
+ "Publishing");
+ }
+
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
+ return 0;
+}
diff --git a/examples/amqps_unbind.c b/examples/amqps_unbind.c
new file mode 100644
index 0000000..bae017d
--- /dev/null
+++ b/examples/amqps_unbind.c
@@ -0,0 +1,115 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Portions created by Mike Steinert are Copyright (c) 2012-2013
+ * Mike Steinert. All Rights Reserved.
+ *
+ * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
+ * All Rights Reserved.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
+ * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <stdint.h>
+#include <amqp_ssl_socket.h>
+#include <amqp_framing.h>
+
+#include "utils.h"
+
+int main(int argc, char const *const *argv)
+{
+ char const *hostname;
+ int port, status;
+ char const *exchange;
+ char const *bindingkey;
+ char const *queue;
+ amqp_socket_t *socket;
+ amqp_connection_state_t conn;
+
+ if (argc < 6) {
+ fprintf(stderr, "Usage: amqps_unbind host port exchange bindingkey queue "
+ "[cacert.pem [key.pem cert.pem]]\n");
+ return 1;
+ }
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+ exchange = argv[3];
+ bindingkey = argv[4];
+ queue = argv[5];
+
+ conn = amqp_new_connection();
+
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 6) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[6]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 8) {
+ status = amqp_ssl_socket_set_key(socket, argv[8], argv[7]);
+ if (status) {
+ die("setting client cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ "Logging in");
+ amqp_channel_open(conn, 1);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
+
+ amqp_queue_unbind(conn, 1,
+ amqp_cstring_bytes(queue),
+ amqp_cstring_bytes(exchange),
+ amqp_cstring_bytes(bindingkey),
+ amqp_empty_table);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
+
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
+ return 0;
+}
diff --git a/examples/utils.c b/examples/utils.c
index 609c354..4b00470 100644
--- a/examples/utils.c
+++ b/examples/utils.c
@@ -34,6 +34,7 @@
* ***** END LICENSE BLOCK *****
*/
+#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -45,6 +46,16 @@
#include "utils.h"
+void die(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(1);
+}
+
void die_on_error(int x, char const *context)
{
if (x < 0) {
diff --git a/examples/utils.h b/examples/utils.h
index 2e7b15f..dea86da 100644
--- a/examples/utils.h
+++ b/examples/utils.h
@@ -37,6 +37,7 @@
* ***** END LICENSE BLOCK *****
*/
+void die(const char *fmt, ...);
extern void die_on_error(int x, char const *context);
extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context);
diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt
index a9996cf..52e2682 100644
--- a/librabbitmq/CMakeLists.txt
+++ b/librabbitmq/CMakeLists.txt
@@ -76,13 +76,51 @@ set(LIBRABBITMQ_INCLUDE_DIRS
add_definitions(-DHAVE_CONFIG_H)
+if (ENABLE_SSL_SUPPORT)
+ add_definitions(-DWITH_SSL=1)
+
+ if (SSL_ENGINE STREQUAL "OpenSSL")
+ set(AMQP_SSL_SRCS amqp_ssl_socket.h amqp_openssl.c)
+ include_directories(${OPENSSL_INCLUDE_DIR})
+ set(AMQP_SSL_LIBS ${OPENSSL_LIBRARIES})
+
+ elseif (SSL_ENGINE STREQUAL "cyaSSL")
+ set(AMQP_SSL_SRCS amqp_ssl_socket.h amqp_cyassl.c)
+ include_directories(${CYASSL_INCLUDE_DIR})
+ set(AMQP_SSL_LIBS ${CYASSL_LIBRARIES})
+
+ elseif (SSL_ENGINE STREQUAL "GnuTLS")
+ set(AMQP_SSL_SRCS amqp_ssl_socket.h amqp_gnutls.c)
+ include_directories(${GNUTLS_INCLUDE_DIR})
+ add_definitions(${GNUTLS_DEFINITIONS})
+ set(AMQP_SSL_LIBS ${GNUTLS_LIBRARIES})
+
+ elseif (SSL_ENGINE STREQUAL "PolarSSL")
+ set(AMQP_SSL_SRCS amqp_ssl_socket.h amqp_polarssl.c)
+ include_directories(${POLARSSL_INCLUDE_DIR})
+ set(AMQP_SSL_LIBS ${POLARSSL_LIBRARIES})
+
+ else()
+ message(FATAL_ERROR "Unknown SSL_ENGINE ${SSL_ENGINE}")
+ endif()
+
+ if (ENABLE_THREAD_SAFETY)
+ add_definitions(-DENABLE_THREAD_SAFETY)
+ if (WIN32)
+ set(AMQP_SSL_SRCS ${AMQP_SSL_SRCS} win32/threads.h win32/threads.c)
+ else()
+ set(AMQP_SSL_SRCS ${AMQP_SSL_SRCS} unix/threads.h)
+ endif()
+ endif()
+endif()
+
set(RABBITMQ_SOURCES
${AMQP_FRAMING_H_PATH}
${AMQP_FRAMING_C_PATH}
- amqp_api.c amqp.h
- amqp_connection.c amqp_mem.c amqp_private.h amqp_socket.c amqp_table.c
- amqp_url.c
+ amqp_api.c amqp.h amqp_connection.c amqp_mem.c amqp_private.h amqp_socket.c
+ amqp_table.c amqp_url.c amqp_socket.h amqp_tcp_socket.c amqp_tcp_socket.h
${SOCKET_IMPL}/socket.h ${SOCKET_IMPL}/socket.c
+ ${AMQP_SSL_SRCS}
)
add_definitions(-DAMQP_BUILD)
@@ -92,6 +130,10 @@ include(InstallMacros)
if (BUILD_SHARED_LIBS)
add_library(rabbitmq SHARED ${RABBITMQ_SOURCES})
+ if (ENABLE_SSL_SUPPORT)
+ target_link_libraries(rabbitmq ${AMQP_SSL_LIBS} ${CMAKE_THREAD_LIBS_INIT})
+ endif()
+
if (WIN32)
set_target_properties(rabbitmq PROPERTIES VERSION ${RMQ_VERSION} OUTPUT_NAME rabbitmq.${RMQ_SOVERSION})
else (WIN32)
@@ -141,4 +183,3 @@ install(FILES amqp.h ${AMQP_FRAMING_H_PATH} ${STDINT_H_INSTALL_FILE}
)
set(RMQ_LIBRARY_TARGET ${RMQ_LIBRARY_TARGET} PARENT_SCOPE)
-
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 184ef51..36c1411 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -108,6 +108,7 @@
# define AMQP_CALL __cdecl
#elif defined(__GNUC__) && __GNUC__ >= 4
+# include <sys/uio.h>
# define AMQP_PUBLIC_FUNCTION \
__attribute__ ((visibility ("default")))
# define AMQP_PUBLIC_VARIABLE \
@@ -119,6 +120,35 @@
# define AMQP_CALL
#endif
+#if __GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1)
+# define AMQP_DEPRECATED(function) \
+ function __attribute__ ((__deprecated__))
+#elif defined(_MSC_VER)
+# define AMQP_DEPRECATED(function) \
+ __declspec(deprecated) function
+#else
+# define AMQP_DEPRECATED(function)
+#endif
+
+/* Define ssize_t on Win32/64 platforms
+ See: http://lists.cs.uiuc.edu/pipermail/llvmdev/2010-April/030649.html for details
+ */
+#if !defined(_W64)
+#if !defined(__midl) && (defined(_X86_) || defined(_M_IX86)) && _MSC_VER >= 1300
+#define _W64 __w64
+#else
+#define _W64
+#endif
+#endif
+
+#ifdef _MSC_VER
+#ifdef _WIN64
+typedef __int64 ssize_t;
+#else
+typedef _W64 int ssize_t;
+#endif
+#endif
+
#include <stddef.h>
#include <stdint.h>
@@ -297,6 +327,8 @@ typedef enum amqp_sasl_method_enum_ {
/* Opaque struct. */
typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
+typedef struct amqp_socket_t_ amqp_socket_t;
+
AMQP_PUBLIC_FUNCTION
char const *
AMQP_CALL amqp_version(void);
@@ -356,9 +388,15 @@ AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_get_sockfd(amqp_connection_state_t state);
+AMQP_DEPRECATED(
+ AMQP_PUBLIC_FUNCTION
+ void
+ AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd)
+);
+
AMQP_PUBLIC_FUNCTION
void
-AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd);
+AMQP_CALL amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket);
AMQP_PUBLIC_FUNCTION
int
@@ -474,8 +512,8 @@ AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost,
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_login_with_properties(amqp_connection_state_t state, char const *vhost,
- int channel_max, int frame_max, int heartbeat,
- const amqp_table_t *properties, amqp_sasl_method_enum sasl_method, ...);
+ int channel_max, int frame_max, int heartbeat,
+ const amqp_table_t *properties, amqp_sasl_method_enum sasl_method, ...);
struct amqp_basic_properties_t_;
@@ -547,6 +585,7 @@ struct amqp_connection_info {
char *host;
char *vhost;
int port;
+ amqp_boolean_t ssl;
};
AMQP_PUBLIC_FUNCTION
@@ -557,6 +596,74 @@ AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed);
+/* socket API */
+
+/**
+ * Open a socket connection.
+ *
+ * This function opens a socket connection returned from amqp_tcp_socket_new()
+ * or amqp_ssl_socket_new(). This function should be called after setting
+ * socket options and prior to assigning the socket to an AMQP connection with
+ * amqp_set_socket().
+ *
+ * \param [in,out] self A socket object.
+ * \param [in] host Connect to this host.
+ * \param [in] port Connect on this remote port.
+ *
+ * \return Zero upon success, non-zero otherwise.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_open(amqp_socket_t *self, const char *host, int port);
+
+/**
+ * Close a socket connection and free resources.
+ *
+ * This function closes a socket connection and releases any resources used by
+ * the object. After calling this function the specified socket should no
+ * longer be referenced.
+ *
+ * \param [in,out] self A socket object.
+ *
+ * \return Zero upon success, non-zero otherwise.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_close(amqp_socket_t *self);
+
+/**
+ * Retrieve an error code for the last socket operation.
+ *
+ * At the time of writing, this interface is not well supported and is subject
+ * to changes!
+ *
+ * \param [in,out] self A socket object.
+ *
+ * \return Zero upon success, an opaque error code otherwise
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_error(amqp_socket_t *self);
+
+/**
+ * Get the socket descriptor in use by a socket object.
+ *
+ * Retrieve the underlying socket descriptor. This function can be used to
+ * perform low-level socket operations that aren't supported by the socket
+ * interface. Use with caution!
+ *
+ * \param [in,out] self A socket object.
+ *
+ * \return The underlying socket descriptor.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_get_sockfd(amqp_socket_t *self);
+
AMQP_END_DECLS
#include <amqp_framing.h>
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 4a84bd3..6bc8397 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -75,6 +75,11 @@ char *amqp_error_string(int err)
case ERROR_CATEGORY_OS:
return amqp_os_error_string(err);
+#ifdef WITH_SSL
+ case ERROR_CATEGORY_SSL:
+ return amqp_ssl_error_string(err);
+#endif
+
default:
str = "(undefined error category)";
}
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index b3196b1..2d50ad3 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -38,8 +38,10 @@
#include "config.h"
#endif
+#include "amqp_tcp_socket.h"
#include "amqp_private.h"
#include <assert.h>
+#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
@@ -89,7 +91,6 @@ amqp_connection_state_t amqp_new_connection(void)
is also the minimum frame size */
state->target_size = 8;
- state->sockfd = -1;
state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
if (state->sock_inbound_buffer.bytes == NULL) {
@@ -108,13 +109,24 @@ out_nomem:
int amqp_get_sockfd(amqp_connection_state_t state)
{
- return state->sockfd;
+ return state->socket ? amqp_socket_get_sockfd(state->socket) : -1;
}
void amqp_set_sockfd(amqp_connection_state_t state,
int sockfd)
{
- state->sockfd = sockfd;
+ amqp_socket_t *socket = amqp_tcp_socket_new();
+ if (!socket) {
+ amqp_abort("%s", strerror(errno));
+ }
+ amqp_tcp_socket_set_sockfd(socket, sockfd);
+ amqp_set_socket(state, socket);
+}
+
+void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket)
+{
+ amqp_socket_close(state->socket);
+ state->socket = socket;
}
int amqp_tune_connection(amqp_connection_state_t state,
@@ -152,19 +164,18 @@ int amqp_get_channel_max(amqp_connection_state_t state)
int amqp_destroy_connection(amqp_connection_state_t state)
{
- int s = state->sockfd;
-
- empty_amqp_pool(&state->frame_pool);
- empty_amqp_pool(&state->decoding_pool);
- free(state->outbound_buffer.bytes);
- free(state->sock_inbound_buffer.bytes);
- free(state);
-
- if (s >= 0 && amqp_socket_close(s) < 0) {
- return -amqp_socket_error();
- } else {
- return 0;
+ int status = 0;
+ if (state) {
+ empty_amqp_pool(&state->frame_pool);
+ empty_amqp_pool(&state->decoding_pool);
+ free(state->outbound_buffer.bytes);
+ free(state->sock_inbound_buffer.bytes);
+ if (amqp_socket_close(state->socket) < 0) {
+ status = -amqp_socket_error(state->socket);
+ }
+ free(state);
}
+ return status;
}
static void return_to_idle(amqp_connection_state_t state)
@@ -392,7 +403,7 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_base = &frame_end_byte;
iov[2].iov_len = FOOTER_SIZE;
- res = amqp_socket_writev(state->sockfd, iov, 3);
+ res = amqp_socket_writev(state->socket, iov, 3);
} else {
size_t out_frame_len;
amqp_bytes_t encoded;
@@ -440,12 +451,13 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_e32(out_frame, 3, out_frame_len);
amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
- res = send(state->sockfd, out_frame,
- out_frame_len + HEADER_SIZE + FOOTER_SIZE, MSG_NOSIGNAL);
+ res = amqp_socket_send(state->socket, out_frame,
+ out_frame_len + HEADER_SIZE + FOOTER_SIZE,
+ MSG_NOSIGNAL);
}
if (res < 0) {
- return -amqp_socket_error();
+ return -amqp_socket_error(state->socket);
} else {
return 0;
}
diff --git a/librabbitmq/amqp_cyassl.c b/librabbitmq/amqp_cyassl.c
new file mode 100644
index 0000000..9657449
--- /dev/null
+++ b/librabbitmq/amqp_cyassl.c
@@ -0,0 +1,270 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "amqp_ssl_socket.h"
+#include "amqp_private.h"
+#include <cyassl/ssl.h>
+#include <stdlib.h>
+#include <string.h>
+
+#ifndef AMQP_USE_UNTESTED_SSL_BACKEND
+# error This SSL backend is alpha quality and likely contains errors.\
+ -DAMQP_USE_UNTESTED_SSL_BACKEND to use this backend
+#endif
+
+struct amqp_ssl_socket_t {
+ const struct amqp_socket_class_t *klass;
+ CYASSL_CTX *ctx;
+ CYASSL *ssl;
+ int sockfd;
+ char *buffer;
+ size_t length;
+ int last_error;
+};
+
+static ssize_t
+amqp_ssl_socket_send(void *base,
+ const void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
+{
+ int status;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+
+ self->last_error = 0;
+ status = CyaSSL_write(self->ssl, buf, len);
+ if (status <= 0) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ }
+
+ return status;
+}
+
+static ssize_t
+amqp_ssl_socket_writev(void *base,
+ const struct iovec *iov,
+ int iovcnt)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t written = -1;
+ char *bufferp;
+ size_t bytes;
+ int i;
+ self->last_error = 0;
+ bytes = 0;
+ for (i = 0; i < iovcnt; ++i) {
+ bytes += iov[i].iov_len;
+ }
+ if (self->length < bytes) {
+ free(self->buffer);
+ self->buffer = malloc(bytes);
+ if (!self->buffer) {
+ self->length = 0;
+ self->last_error = ERROR_NO_MEMORY;
+ goto exit;
+ }
+ self->length = bytes;
+ }
+ bufferp = self->buffer;
+ for (i = 0; i < iovcnt; ++i) {
+ memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
+ bufferp += iov[i].iov_len;
+ }
+ written = amqp_ssl_socket_send(self, self->buffer, bytes, 0);
+exit:
+ return written;
+}
+
+static ssize_t
+amqp_ssl_socket_recv(void *base,
+ void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
+{
+ int status;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+
+ self->last_error = 0;
+ status = CyaSSL_read(self->ssl, buf, len);
+ if (status <= 0) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ }
+
+ return status;
+}
+
+static int
+amqp_ssl_socket_get_sockfd(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->sockfd;
+}
+
+static int
+amqp_ssl_socket_close(void *base)
+{
+ int status = -1;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ if (self->sockfd >= 0) {
+ status = amqp_os_socket_close(self->sockfd);
+ }
+ if (self) {
+ CyaSSL_free(self->ssl);
+ CyaSSL_CTX_free(self->ctx);
+ free(self->buffer);
+ free(self);
+ }
+ return status;
+}
+
+static int
+amqp_ssl_socket_error(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->last_error;
+}
+
+char *
+amqp_ssl_error_string(AMQP_UNUSED int err)
+{
+ return strdup("A ssl socket error occurred.");
+}
+
+static int
+amqp_ssl_socket_open(void *base, const char *host, int port)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ int status;
+ self->last_error = 0;
+
+ self->ssl = CyaSSL_new(self->ctx);
+ if (NULL == self->ssl) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ return -1;
+ }
+
+ self->sockfd = amqp_open_socket(host, port);
+ if (0 > self->sockfd) {
+ self->last_error = - self->sockfd;
+ return -1;
+ }
+ CyaSSL_set_fd(self->ssl, self->sockfd);
+ status = CyaSSL_connect(self->ssl);
+ if (SSL_SUCCESS != status) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ return -1;
+ }
+ return 0;
+}
+
+static const struct amqp_socket_class_t amqp_ssl_socket_class = {
+ amqp_ssl_socket_writev, /* writev */
+ amqp_ssl_socket_send, /* send */
+ amqp_ssl_socket_recv, /* recv */
+ amqp_ssl_socket_open, /* open */
+ amqp_ssl_socket_close, /* close */
+ amqp_ssl_socket_error, /* error */
+ amqp_ssl_socket_get_sockfd /* get_sockfd */
+};
+
+amqp_socket_t *
+amqp_ssl_socket_new(void)
+{
+ struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
+ if (!self) {
+ goto error;
+ }
+ CyaSSL_Init();
+ self->ctx = CyaSSL_CTX_new(CyaSSLv23_client_method());
+ if (!self->ctx) {
+ goto error;
+ }
+ self->klass = &amqp_ssl_socket_class;
+ return (amqp_socket_t *)self;
+error:
+ amqp_socket_close((amqp_socket_t *)self);
+ return NULL;
+}
+
+int
+amqp_ssl_socket_set_cacert(amqp_socket_t *base,
+ const char *cacert)
+{
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = CyaSSL_CTX_load_verify_locations(self->ctx, cacert, NULL);
+ if (SSL_SUCCESS != status) {
+ return -1;
+ }
+ return 0;
+}
+
+int
+amqp_ssl_socket_set_key(amqp_socket_t *base,
+ const char *cert,
+ const char *key)
+{
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = CyaSSL_CTX_use_PrivateKey_file(self->ctx, key,
+ SSL_FILETYPE_PEM);
+ if (SSL_SUCCESS != status) {
+ return -1;
+ }
+ status = CyaSSL_CTX_use_certificate_chain_file(self->ctx, cert);
+ return 0;
+}
+
+int
+amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base,
+ AMQP_UNUSED const char *cert,
+ AMQP_UNUSED const void *key,
+ AMQP_UNUSED size_t n)
+{
+ amqp_abort("%s is not implemented for CyaSSL", __func__);
+ return -1;
+}
+
+void
+amqp_ssl_socket_set_verify(AMQP_UNUSED amqp_socket_t *base,
+ AMQP_UNUSED amqp_boolean_t verify)
+{
+ /* noop for CyaSSL */
+}
+
+void
+amqp_set_initialize_ssl_library(AMQP_UNUSED amqp_boolean_t do_initialize)
+{
+}
diff --git a/librabbitmq/amqp_gnutls.c b/librabbitmq/amqp_gnutls.c
new file mode 100644
index 0000000..f05fcb8
--- /dev/null
+++ b/librabbitmq/amqp_gnutls.c
@@ -0,0 +1,362 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "amqp_ssl_socket.h"
+#include "amqp_private.h"
+#include <gnutls/gnutls.h>
+#include <gnutls/x509.h>
+#include <stdlib.h>
+#include <string.h>
+
+#ifndef AMQP_USE_UNTESTED_SSL_BACKEND
+# error This SSL backend is alpha quality and likely contains errors.\
+ -DAMQP_USE_UNTESTED_SSL_BACKEND to use this backend
+#endif
+
+struct amqp_ssl_socket_t {
+ const struct amqp_socket_class_t *klass;
+ gnutls_session_t session;
+ gnutls_certificate_credentials_t credentials;
+ int sockfd;
+ char *host;
+ char *buffer;
+ size_t length;
+ int last_error;
+};
+
+static ssize_t
+amqp_ssl_socket_send(void *base,
+ const void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
+{
+ ssize_t status;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+
+ self->last_error = 0;
+ status = gnutls_record_send(self->session, buf, len);
+ if (status < 0) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ }
+ return status;
+}
+
+static ssize_t
+amqp_ssl_socket_writev(void *base,
+ const struct iovec *iov,
+ int iovcnt)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t written = -1;
+ char *bufferp;
+ size_t bytes;
+ int i;
+ self->last_error = 0;
+ bytes = 0;
+ for (i = 0; i < iovcnt; ++i) {
+ bytes += iov[i].iov_len;
+ }
+ if (self->length < bytes) {
+ free(self->buffer);
+ self->buffer = malloc(bytes);
+ if (!self->buffer) {
+ self->length = 0;
+ self->last_error = ERROR_NO_MEMORY;
+ goto exit;
+ }
+ self->length = 0;
+ }
+ bufferp = self->buffer;
+ for (i = 0; i < iovcnt; ++i) {
+ memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
+ bufferp += iov[i].iov_len;
+ }
+ written = amqp_ssl_socket_send(self, self->buffer, bytes, 0);
+exit:
+ return written;
+}
+
+static ssize_t
+amqp_ssl_socket_recv(void *base,
+ void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
+{
+ ssize_t status;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+
+ self->last_error = 0;
+ status = gnutls_record_recv(self->session, buf, len);
+ if (status < 0) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ }
+
+ return status;
+}
+
+static int
+amqp_ssl_socket_open(void *base, const char *host, int port)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ int status;
+ self->last_error = 0;
+
+ free(self->host);
+ self->host = strdup(host);
+ if (NULL == self->host) {
+ self->last_error = ERROR_NO_MEMORY;
+ return -1;
+ }
+
+ self->sockfd = amqp_open_socket(host, port);
+ if (0 > self->sockfd) {
+ self->last_error = -self->sockfd;
+ return -1;
+ }
+ gnutls_transport_set_ptr(self->session,
+ (gnutls_transport_ptr_t)self->sockfd);
+ do {
+ status = gnutls_handshake(self->session);
+ } while (status < 0 && !gnutls_error_is_fatal(status));
+
+ if (gnutls_error_is_fatal(status)) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ }
+
+ return status;
+}
+
+static int
+amqp_ssl_socket_close(void *base)
+{
+ int status = -1;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ if (self->sockfd >= 0) {
+ status = amqp_os_socket_close(self->sockfd);
+ }
+ if (self) {
+ gnutls_deinit(self->session);
+ gnutls_certificate_free_credentials(self->credentials);
+ free(self->host);
+ free(self->buffer);
+ free(self);
+ }
+ return status;
+}
+
+static int
+amqp_ssl_socket_error(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->last_error;
+}
+
+char *
+amqp_ssl_error_string(AMQP_UNUSED int err)
+{
+ return strdup("A SSL error occurred");
+}
+
+static int
+amqp_ssl_socket_get_sockfd(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->sockfd;
+}
+
+static int
+amqp_ssl_verify(gnutls_session_t session)
+{
+ int ret;
+ unsigned int status, size;
+ const gnutls_datum_t *list;
+ gnutls_x509_crt_t cert = NULL;
+ struct amqp_ssl_socket_t *self = gnutls_session_get_ptr(session);
+ ret = gnutls_certificate_verify_peers2(session, &status);
+ if (0 > ret) {
+ goto error;
+ }
+ if (status & GNUTLS_CERT_INVALID) {
+ goto error;
+ }
+ if (status & GNUTLS_CERT_SIGNER_NOT_FOUND) {
+ goto error;
+ }
+ if (status & GNUTLS_CERT_REVOKED) {
+ goto error;
+ }
+ if (status & GNUTLS_CERT_EXPIRED) {
+ goto error;
+ }
+ if (status & GNUTLS_CERT_NOT_ACTIVATED) {
+ goto error;
+ }
+ if (gnutls_certificate_type_get(session) != GNUTLS_CRT_X509) {
+ goto error;
+ }
+ if (gnutls_x509_crt_init(&cert) < 0) {
+ goto error;
+ }
+ list = gnutls_certificate_get_peers(session, &size);
+ if (!list) {
+ goto error;
+ }
+ ret = gnutls_x509_crt_import(cert, &list[0], GNUTLS_X509_FMT_DER);
+ if (0 > ret) {
+ goto error;
+ }
+ if (!gnutls_x509_crt_check_hostname(cert, self->host)) {
+ goto error;
+ }
+ gnutls_x509_crt_deinit(cert);
+ return 0;
+error:
+ if (cert) {
+ gnutls_x509_crt_deinit (cert);
+ }
+ return GNUTLS_E_CERTIFICATE_ERROR;
+}
+
+static const struct amqp_socket_class_t amqp_ssl_socket_class = {
+ amqp_ssl_socket_writev, /* writev */
+ amqp_ssl_socket_send, /* send */
+ amqp_ssl_socket_recv, /* recv */
+ amqp_ssl_socket_open, /* open */
+ amqp_ssl_socket_close, /* close */
+ amqp_ssl_socket_error, /* error */
+ amqp_ssl_socket_get_sockfd /* get_sockfd */
+};
+
+amqp_socket_t *
+amqp_ssl_socket_new(void)
+{
+ struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
+ const char *error;
+ int status;
+ if (!self) {
+ goto error;
+ }
+ gnutls_global_init();
+ status = gnutls_init(&self->session, GNUTLS_CLIENT);
+ if (GNUTLS_E_SUCCESS != status) {
+ goto error;
+ }
+ status = gnutls_certificate_allocate_credentials(&self->credentials);
+ if (GNUTLS_E_SUCCESS != status) {
+ goto error;
+ }
+ gnutls_certificate_set_verify_function(self->credentials,
+ amqp_ssl_verify);
+ status = gnutls_credentials_set(self->session, GNUTLS_CRD_CERTIFICATE,
+ self->credentials);
+ if (GNUTLS_E_SUCCESS != status) {
+ goto error;
+ }
+ gnutls_session_set_ptr(self->session, self);
+ status = gnutls_priority_set_direct(self->session, "NORMAL", &error);
+ if (GNUTLS_E_SUCCESS != status) {
+ goto error;
+ }
+ self->klass = &amqp_ssl_socket_class;
+ return (amqp_socket_t *)self;
+error:
+ amqp_socket_close((amqp_socket_t *)self);
+ return NULL;
+}
+
+int
+amqp_ssl_socket_set_cacert(amqp_socket_t *base,
+ const char *cacert)
+{
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = gnutls_certificate_set_x509_trust_file(self->credentials,
+ cacert,
+ GNUTLS_X509_FMT_PEM);
+ if (0 > status) {
+ return -1;
+ }
+ return 0;
+}
+
+int
+amqp_ssl_socket_set_key(amqp_socket_t *base,
+ const char *cert,
+ const char *key)
+{
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = gnutls_certificate_set_x509_key_file(self->credentials,
+ cert,
+ key,
+ GNUTLS_X509_FMT_PEM);
+ if (0 > status) {
+ return -1;
+ }
+ return 0;
+}
+
+int
+amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base,
+ AMQP_UNUSED const char *cert,
+ AMQP_UNUSED const void *key,
+ AMQP_UNUSED size_t n)
+{
+ amqp_abort("%s is not implemented for GnuTLS", __func__);
+ return -1;
+}
+
+void
+amqp_ssl_socket_set_verify(amqp_socket_t *base,
+ amqp_boolean_t verify)
+{
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ if (verify) {
+ gnutls_certificate_set_verify_function(self->credentials,
+ amqp_ssl_verify);
+ } else {
+ gnutls_certificate_set_verify_function(self->credentials,
+ NULL);
+ }
+}
+
+void
+amqp_set_initialize_ssl_library(AMQP_UNUSED amqp_boolean_t do_initialize)
+{
+}
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
new file mode 100644
index 0000000..301a10d
--- /dev/null
+++ b/librabbitmq/amqp_openssl.c
@@ -0,0 +1,573 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "amqp_ssl_socket.h"
+#include "amqp_private.h"
+#include "threads.h"
+#include <ctype.h>
+#include <openssl/conf.h>
+#include <openssl/err.h>
+#include <openssl/ssl.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "socket.h"
+
+static int initialize_openssl(void);
+static int destroy_openssl(void);
+
+static int open_ssl_connections = 0;
+static amqp_boolean_t do_initialize_openssl = 1;
+static amqp_boolean_t openssl_initialized = 0;
+
+#ifdef ENABLE_THREAD_SAFETY
+static unsigned long amqp_ssl_threadid_callback(void);
+static void amqp_ssl_locking_callback(int mode, int n, const char *file, int line);
+
+#ifdef _WIN32
+static long win32_create_mutex = 0;
+static pthread_mutex_t openssl_init_mutex = NULL;
+#else
+static pthread_mutex_t openssl_init_mutex = PTHREAD_MUTEX_INITIALIZER;
+#endif
+static pthread_mutex_t *amqp_openssl_lockarray = NULL;
+#endif /* ENABLE_THREAD_SAFETY */
+
+struct amqp_ssl_socket_t {
+ const struct amqp_socket_class_t *klass;
+ SSL_CTX *ctx;
+ int sockfd;
+ SSL *ssl;
+ char *buffer;
+ size_t length;
+ amqp_boolean_t verify;
+ int last_error;
+};
+
+static ssize_t
+amqp_ssl_socket_send(void *base,
+ const void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t sent;
+ ERR_clear_error();
+ self->last_error = 0;
+ sent = SSL_write(self->ssl, buf, len);
+ if (0 > sent) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ switch (SSL_get_error(self->ssl, sent)) {
+ case SSL_ERROR_NONE:
+ case SSL_ERROR_ZERO_RETURN:
+ case SSL_ERROR_WANT_READ:
+ case SSL_ERROR_WANT_WRITE:
+ sent = 0;
+ break;
+ }
+ }
+ return sent;
+}
+
+static ssize_t
+amqp_ssl_socket_writev(void *base,
+ const struct iovec *iov,
+ int iovcnt)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t written = -1;
+ char *bufferp;
+ size_t bytes;
+ int i;
+ self->last_error = 0;
+ bytes = 0;
+ for (i = 0; i < iovcnt; ++i) {
+ bytes += iov[i].iov_len;
+ }
+ if (self->length < bytes) {
+ free(self->buffer);
+ self->buffer = malloc(bytes);
+ if (!self->buffer) {
+ self->length = 0;
+ self->last_error = ERROR_NO_MEMORY;
+ goto exit;
+ }
+ self->length = bytes;
+ }
+ bufferp = self->buffer;
+ for (i = 0; i < iovcnt; ++i) {
+ memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
+ bufferp += iov[i].iov_len;
+ }
+ written = amqp_ssl_socket_send(self, self->buffer, bytes, 0);
+exit:
+ return written;
+}
+
+static ssize_t
+amqp_ssl_socket_recv(void *base,
+ void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t received;
+ ERR_clear_error();
+ self->last_error = 0;
+ received = SSL_read(self->ssl, buf, len);
+ if (0 > received) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ switch(SSL_get_error(self->ssl, received)) {
+ case SSL_ERROR_WANT_READ:
+ case SSL_ERROR_WANT_WRITE:
+ received = 0;
+ break;
+ }
+ }
+ return received;
+}
+
+static int
+amqp_ssl_socket_verify(void *base, const char *host)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ unsigned char *utf8_value = NULL, *cp, ch;
+ int pos, utf8_length, status = 0;
+ ASN1_STRING *entry_string;
+ X509_NAME_ENTRY *entry;
+ X509_NAME *name;
+ X509 *peer;
+ peer = SSL_get_peer_certificate(self->ssl);
+ if (!peer) {
+ goto error;
+ }
+ name = X509_get_subject_name(peer);
+ if (!name) {
+ goto error;
+ }
+ pos = X509_NAME_get_index_by_NID(name, NID_commonName, -1);
+ if (0 > pos) {
+ goto error;
+ }
+ entry = X509_NAME_get_entry(name, pos);
+ if (!entry) {
+ goto error;
+ }
+ entry_string = X509_NAME_ENTRY_get_data(entry);
+ if (!entry_string) {
+ goto error;
+ }
+ utf8_length = ASN1_STRING_to_UTF8(&utf8_value, entry_string);
+ if (0 > utf8_length) {
+ goto error;
+ }
+ while (utf8_length > 0 && utf8_value[utf8_length - 1] == 0) {
+ --utf8_length;
+ }
+ if (utf8_length >= 256) {
+ goto error;
+ }
+ if ((size_t)utf8_length != strlen((char *)utf8_value)) {
+ goto error;
+ }
+ for (cp = utf8_value; (ch = *cp) != '\0'; ++cp) {
+ if (isascii(ch) && !isprint(ch)) {
+ goto error;
+ }
+ }
+#ifdef _MSC_VER
+#define strcasecmp _stricmp
+#endif
+ if (strcasecmp(host, (char *)utf8_value)) {
+ goto error;
+ }
+#ifdef _MSC_VER
+#undef strcasecmp
+#endif
+exit:
+ OPENSSL_free(utf8_value);
+ return status;
+error:
+ status = -1;
+ goto exit;
+}
+
+static int
+amqp_ssl_socket_open(void *base, const char *host, int port)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ long result;
+ int status;
+ self->last_error = 0;
+ self->ssl = SSL_new(self->ctx);
+ if (!self->ssl) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ return -1;
+ }
+ SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY);
+ self->sockfd = amqp_open_socket(host, port);
+ if (0 > self->sockfd) {
+ self->last_error = -self->sockfd;
+ return -1;
+ }
+ status = SSL_set_fd(self->ssl, self->sockfd);
+ if (!status) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ return -1;
+ }
+ status = SSL_connect(self->ssl);
+ if (!status) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ return -1;
+ }
+ result = SSL_get_verify_result(self->ssl);
+ if (X509_V_OK != result) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ return -1;
+ }
+ if (self->verify) {
+ int status = amqp_ssl_socket_verify(self, host);
+ if (status) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ return -1;
+ }
+ }
+ return 0;
+}
+
+static int
+amqp_ssl_socket_close(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ if (self) {
+ SSL_free(self->ssl);
+ amqp_os_socket_close(self->sockfd);
+ SSL_CTX_free(self->ctx);
+ free(self->buffer);
+ free(self);
+ }
+ destroy_openssl();
+ return 0;
+}
+
+static int
+amqp_ssl_socket_error(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->last_error;
+}
+
+char *
+amqp_ssl_error_string(AMQP_UNUSED int err)
+{
+ return strdup("A ssl socket error occurred.");
+}
+
+static int
+amqp_ssl_socket_get_sockfd(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->sockfd;
+}
+
+static const struct amqp_socket_class_t amqp_ssl_socket_class = {
+ amqp_ssl_socket_writev, /* writev */
+ amqp_ssl_socket_send, /* send */
+ amqp_ssl_socket_recv, /* recv */
+ amqp_ssl_socket_open, /* open */
+ amqp_ssl_socket_close, /* close */
+ amqp_ssl_socket_error, /* error */
+ amqp_ssl_socket_get_sockfd /* get_sockfd */
+};
+
+amqp_socket_t *
+amqp_ssl_socket_new(void)
+{
+ struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
+ int status;
+ if (!self) {
+ goto error;
+ }
+ status = initialize_openssl();
+ if (status) {
+ goto error;
+ }
+ self->ctx = SSL_CTX_new(SSLv23_client_method());
+ if (!self->ctx) {
+ goto error;
+ }
+ self->klass = &amqp_ssl_socket_class;
+ self->verify = 1;
+ return (amqp_socket_t *)self;
+error:
+ amqp_socket_close((amqp_socket_t *)self);
+ return NULL;
+}
+
+int
+amqp_ssl_socket_set_cacert(amqp_socket_t *base,
+ const char *cacert)
+{
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = SSL_CTX_load_verify_locations(self->ctx, cacert, NULL);
+ if (1 != status) {
+ return -1;
+ }
+ return 0;
+}
+
+int
+amqp_ssl_socket_set_key(amqp_socket_t *base,
+ const char *cert,
+ const char *key)
+{
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
+ if (1 != status) {
+ return -1;
+ }
+ status = SSL_CTX_use_PrivateKey_file(self->ctx, key,
+ SSL_FILETYPE_PEM);
+ if (1 != status) {
+ return -1;
+ }
+ return 0;
+}
+
+static int
+password_cb(AMQP_UNUSED char *buffer,
+ AMQP_UNUSED int length,
+ AMQP_UNUSED int rwflag,
+ AMQP_UNUSED void *user_data)
+{
+ amqp_abort("rabbitmq-c does not support password protected keys");
+ return 0;
+}
+
+int
+amqp_ssl_socket_set_key_buffer(amqp_socket_t *base,
+ const char *cert,
+ const void *key,
+ size_t n)
+{
+ int status = 0;
+ BIO *buf = NULL;
+ RSA *rsa = NULL;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
+ if (1 != status) {
+ return -1;
+ }
+ buf = BIO_new_mem_buf((void *)key, n);
+ if (!buf) {
+ goto error;
+ }
+ rsa = PEM_read_bio_RSAPrivateKey(buf, NULL, password_cb, NULL);
+ if (!rsa) {
+ goto error;
+ }
+ status = SSL_CTX_use_RSAPrivateKey(self->ctx, rsa);
+ if (1 != status) {
+ goto error;
+ }
+exit:
+ BIO_vfree(buf);
+ RSA_free(rsa);
+ return status;
+error:
+ status = -1;
+ goto exit;
+}
+
+int
+amqp_ssl_socket_set_cert(amqp_socket_t *base,
+ const char *cert)
+{
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
+ if (1 != status) {
+ return -1;
+ }
+ return 0;
+}
+
+void
+amqp_ssl_socket_set_verify(amqp_socket_t *base,
+ amqp_boolean_t verify)
+{
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ self->verify = verify;
+}
+
+void
+amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize)
+{
+ if (!openssl_initialized) {
+ do_initialize_openssl = do_initialize;
+ }
+}
+
+#ifdef ENABLE_THREAD_SAFETY
+unsigned long
+amqp_ssl_threadid_callback(void)
+{
+ return (unsigned long)pthread_self();
+}
+
+void
+amqp_ssl_locking_callback(int mode, int n,
+ AMQP_UNUSED const char *file,
+ AMQP_UNUSED int line)
+{
+ if (mode & CRYPTO_LOCK) {
+ if (pthread_mutex_lock(&amqp_openssl_lockarray[n])) {
+ amqp_abort("Runtime error: Failure in trying to lock OpenSSL mutex");
+ }
+ } else {
+ if (pthread_mutex_unlock(&amqp_openssl_lockarray[n])) {
+ amqp_abort("Runtime error: Failure in trying to unlock OpenSSL mutex");
+ }
+ }
+}
+#endif /* ENABLE_THREAD_SAFETY */
+
+static int
+initialize_openssl(void)
+{
+#ifdef _WIN32
+ /* No such thing as PTHREAD_INITIALIZE_MUTEX macro on Win32, so we use this */
+ if (NULL == openssl_init_mutex) {
+ while (InterlockedExchange(&win32_create_mutex, 1) == 1)
+ /* Loop, someone else is holding this lock */ ;
+
+ if (NULL == openssl_init_mutex) {
+ if (pthread_mutex_init(&openssl_init_mutex, NULL)) {
+ return -1;
+ }
+ }
+ InterlockedExchange(&win32_create_mutex, 0);
+ }
+#endif /* _WIN32 */
+
+#ifdef ENABLE_THREAD_SAFETY
+ if (pthread_mutex_lock(&openssl_init_mutex)) {
+ return -1;
+ }
+#endif /* ENABLE_THREAD_SAFETY */
+ if (do_initialize_openssl) {
+#ifdef ENABLE_THREAD_SAFETY
+ if (NULL == amqp_openssl_lockarray) {
+ int i = 0;
+ amqp_openssl_lockarray = calloc(CRYPTO_num_locks(), sizeof(pthread_mutex_t));
+ if (!amqp_openssl_lockarray) {
+ pthread_mutex_unlock(&openssl_init_mutex);
+ return -1;
+ }
+ for (i = 0; i < CRYPTO_num_locks(); ++i) {
+ if (pthread_mutex_init(&amqp_openssl_lockarray[i], NULL)) {
+ free(amqp_openssl_lockarray);
+ amqp_openssl_lockarray = NULL;
+ pthread_mutex_unlock(&openssl_init_mutex);
+ return -1;
+ }
+ }
+ }
+
+ if (0 == open_ssl_connections) {
+ CRYPTO_set_id_callback(amqp_ssl_threadid_callback);
+ CRYPTO_set_locking_callback(amqp_ssl_locking_callback);
+ }
+#endif /* ENABLE_THREAD_SAFETY */
+
+ if (!openssl_initialized) {
+ OPENSSL_config(NULL);
+
+ SSL_library_init();
+ SSL_load_error_strings();
+
+ openssl_initialized = 1;
+ }
+ }
+
+ ++open_ssl_connections;
+
+#ifdef ENABLE_THREAD_SAFETY
+ pthread_mutex_unlock(&openssl_init_mutex);
+#endif /* ENABLE_THREAD_SAFETY */
+ return 0;
+}
+
+static int
+destroy_openssl(void)
+{
+#ifdef ENABLE_THREAD_SAFETY
+ if (pthread_mutex_lock(&openssl_init_mutex)) {
+ return -1;
+ }
+#endif /* ENABLE_THREAD_SAFETY */
+
+ if (open_ssl_connections > 0) {
+ --open_ssl_connections;
+ }
+
+#ifdef ENABLE_THREAD_SAFETY
+ if (0 == open_ssl_connections && do_initialize_openssl) {
+ /* Unsetting these allows the rabbitmq-c library to be unloaded
+ * safely. We do leak the amqp_openssl_lockarray. Which is only
+ * an issue if you repeatedly unload and load the library
+ */
+ CRYPTO_set_locking_callback(NULL);
+ CRYPTO_set_id_callback(NULL);
+ }
+
+ pthread_mutex_unlock(&openssl_init_mutex);
+#endif /* ENABLE_THREAD_SAFETY */
+ return 0;
+}
diff --git a/librabbitmq/amqp_polarssl.c b/librabbitmq/amqp_polarssl.c
new file mode 100644
index 0000000..56c45c6
--- /dev/null
+++ b/librabbitmq/amqp_polarssl.c
@@ -0,0 +1,354 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "amqp_ssl_socket.h"
+#include "amqp_private.h"
+#include <polarssl/ctr_drbg.h>
+#include <polarssl/entropy.h>
+#include <polarssl/net.h>
+#include <polarssl/ssl.h>
+#include <polarssl/version.h>
+#include <stdlib.h>
+#include <string.h>
+
+#ifndef AMQP_USE_UNTESTED_SSL_BACKEND
+# error This SSL backend is alpha quality and likely contains errors.\
+ -DAMQP_USE_UNTESTED_SSL_BACKEND to use this backend
+#endif
+
+struct amqp_ssl_socket_t {
+ const struct amqp_socket_class_t *klass;
+ int sockfd;
+ entropy_context *entropy;
+ ctr_drbg_context *ctr_drbg;
+ x509_cert *cacert;
+ rsa_context *key;
+ x509_cert *cert;
+ ssl_context *ssl;
+ ssl_session *session;
+ char *buffer;
+ size_t length;
+ int last_error;
+};
+
+static ssize_t
+amqp_ssl_socket_send(void *base,
+ const void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
+{
+ ssize_t status;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+
+ self->last_error = 0;
+ status = ssl_write(self->ssl, buf, len);
+ if (status < 0) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ }
+
+ return status;
+}
+
+static ssize_t
+amqp_ssl_socket_writev(void *base,
+ const struct iovec *iov,
+ int iovcnt)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t written = -1;
+ char *bufferp;
+ size_t bytes;
+ int i;
+ self->last_error = 0;
+ bytes = 0;
+ for (i = 0; i < iovcnt; ++i) {
+ bytes += iov[i].iov_len;
+ }
+ if (self->length < bytes) {
+ free(self->buffer);
+ self->buffer = malloc(bytes);
+ if (!self->buffer) {
+ self->length = 0;
+ self->last_error = ERROR_NO_MEMORY;
+ goto exit;
+ }
+ self->length = bytes;
+ }
+ bufferp = self->buffer;
+ for (i = 0; i < iovcnt; ++i) {
+ memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
+ bufferp += iov[i].iov_len;
+ }
+ written = amqp_ssl_socket_send(self, (const unsigned char *)self->buffer,
+ bytes, 0);
+exit:
+ return written;
+}
+
+static ssize_t
+amqp_ssl_socket_recv(void *base,
+ void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
+{
+ ssize_t status;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+
+ self->last_error = 0;
+ status = ssl_read(self->ssl, buf, len);
+ if (status < 0) {
+ self->last_error = ERROR_CATEGORY_SSL;
+ }
+
+ return status;
+}
+
+static int
+amqp_ssl_socket_open(void *base, const char *host, int port)
+{
+ int status;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ self->last_error = 0;
+
+ status = net_connect(&self->sockfd, host, port);
+ if (status) {
+ /* This isn't quite right. We should probably translate between
+ * POLARSSL_ERR_* to our internal error codes
+ */
+ self->last_error = ERROR_CATEGORY_SSL;
+ return -1;
+ }
+ if (self->cacert) {
+ ssl_set_ca_chain(self->ssl, self->cacert, NULL, host);
+ }
+ ssl_set_bio(self->ssl, net_recv, &self->sockfd,
+ net_send, &self->sockfd);
+ if (self->key && self->cert) {
+ ssl_set_own_cert(self->ssl, self->cert, self->key);
+ }
+ while (0 != (status = ssl_handshake(self->ssl))) {
+ switch (status) {
+ case POLARSSL_ERR_NET_WANT_READ:
+ case POLARSSL_ERR_NET_WANT_WRITE:
+ continue;
+ default:
+ self->last_error = ERROR_CATEGORY_SSL;
+ break;
+ }
+ }
+ return status;
+}
+
+static int
+amqp_ssl_socket_close(void *base)
+{
+ int status = -1;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ if (self) {
+ free(self->entropy);
+ free(self->ctr_drbg);
+ x509_free(self->cacert);
+ free(self->cacert);
+ rsa_free(self->key);
+ free(self->key);
+ x509_free(self->cert);
+ free(self->cert);
+ ssl_free(self->ssl);
+ free(self->ssl);
+ free(self->session);
+ free(self->buffer);
+ if (self->sockfd >= 0) {
+ net_close(self->sockfd);
+ status = 0;
+ }
+ free(self);
+ }
+ return status;
+}
+
+static int
+amqp_ssl_socket_error(AMQP_UNUSED void *user_data)
+{
+ return ERROR_CATEGORY_SSL;
+}
+
+char *
+amqp_ssl_error_string(AMQP_UNUSED int err)
+{
+ return strdup("A SSL socket error occurred");
+}
+
+static int
+amqp_ssl_socket_get_sockfd(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->sockfd;
+}
+
+static const struct amqp_socket_class_t amqp_ssl_socket_class = {
+ amqp_ssl_socket_writev, /* writev */
+ amqp_ssl_socket_send, /* send */
+ amqp_ssl_socket_recv, /* recv */
+ amqp_ssl_socket_open, /* open */
+ amqp_ssl_socket_close, /* close */
+ amqp_ssl_socket_error, /* error */
+ amqp_ssl_socket_get_sockfd /* get_sockfd */
+};
+
+amqp_socket_t *
+amqp_ssl_socket_new(void)
+{
+ struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
+ int status;
+ if (!self) {
+ goto error;
+ }
+ self->entropy = calloc(1, sizeof(*self->entropy));
+ if (!self->entropy) {
+ goto error;
+ }
+ self->sockfd = -1;
+ entropy_init(self->entropy);
+ self->ctr_drbg = calloc(1, sizeof(*self->ctr_drbg));
+ if (!self->ctr_drbg) {
+ goto error;
+ }
+ status = ctr_drbg_init(self->ctr_drbg, entropy_func, self->entropy,
+ NULL, 0);
+ if (status) {
+ goto error;
+ }
+ self->ssl = calloc(1, sizeof(*self->ssl));
+ if (!self->ssl) {
+ goto error;
+ }
+ status = ssl_init(self->ssl);
+ if (status) {
+ goto error;
+ }
+ ssl_set_endpoint(self->ssl, SSL_IS_CLIENT);
+ ssl_set_rng(self->ssl, ctr_drbg_random, self->ctr_drbg);
+ ssl_set_ciphersuites(self->ssl, ssl_default_ciphersuites);
+ ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED);
+ self->session = calloc(1, sizeof(*self->session));
+ if (!self->session) {
+ goto error;
+ }
+#if POLARSSL_VERSION_NUMBER >= 0x01020000
+ ssl_set_session(self->ssl, self->session);
+#else
+ ssl_set_session(self->ssl, 0, 0, self->session);
+#endif
+
+ self->klass = &amqp_ssl_socket_class;
+ return (amqp_socket_t *)self;
+error:
+ amqp_socket_close((amqp_socket_t *)self);
+ return NULL;
+}
+
+int
+amqp_ssl_socket_set_cacert(amqp_socket_t *base,
+ const char *cacert)
+{
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ self->cacert = calloc(1, sizeof(*self->cacert));
+ if (!self->cacert) {
+ return -1;
+ }
+ status = x509parse_crtfile(self->cacert, cacert);
+ if (status) {
+ return -1;
+ }
+ return 0;
+}
+
+int
+amqp_ssl_socket_set_key(amqp_socket_t *base,
+ const char *cert,
+ const char *key)
+{
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ self->key = calloc(1, sizeof(*self->key));
+ if (!self->key) {
+ return -1;
+ }
+ status = x509parse_keyfile(self->key, key, NULL);
+ if (status) {
+ return -1;
+ }
+ self->cert = calloc(1, sizeof(*self->cert));
+ if (!self->cert) {
+ return -1;
+ }
+ status = x509parse_crtfile(self->cert, cert);
+ if (status) {
+ return -1;
+ }
+ return 0;
+}
+
+int
+amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base,
+ AMQP_UNUSED const char *cert,
+ AMQP_UNUSED const void *key,
+ AMQP_UNUSED size_t n)
+{
+ amqp_abort("%s is not implemented for PolarSSL", __func__);
+ return -1;
+}
+
+void
+amqp_ssl_socket_set_verify(amqp_socket_t *base,
+ amqp_boolean_t verify)
+{
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ if (verify) {
+ ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED);
+ } else {
+ ssl_set_authmode(self->ssl, SSL_VERIFY_NONE);
+ }
+}
+
+void
+amqp_set_initialize_ssl_library(AMQP_UNUSED amqp_boolean_t do_initialize)
+{
+}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 68239d1..a33205e 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -45,16 +45,20 @@
#include "amqp_framing.h"
#include <string.h>
+#ifdef HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+
/* Error numbering: Because of differences in error numbering on
* different platforms, we want to keep error numbers opaque for
* client code. Internally, we encode the category of an error
* (i.e. where its number comes from) in the top bits of the number
* (assuming that an int has at least 32 bits).
*/
-#define ERROR_CATEGORY_MASK (1 << 29)
-
#define ERROR_CATEGORY_CLIENT (0 << 29) /* librabbitmq error codes */
#define ERROR_CATEGORY_OS (1 << 29) /* OS-specific error codes */
+#define ERROR_CATEGORY_SSL (1 << 28) /* SSL-specific error codes */
+#define ERROR_CATEGORY_MASK (ERROR_CATEGORY_OS | ERROR_CATEGORY_SSL)
/* librabbitmq error codes */
#define ERROR_NO_MEMORY 1
@@ -71,8 +75,11 @@
#if __GNUC__ > 2 | (__GNUC__ == 2 && __GNUC_MINOR__ > 4)
#define AMQP_NORETURN \
__attribute__ ((__noreturn__))
+#define AMQP_UNUSED \
+ __attribute__ ((__unused__))
#else
#define AMQP_NORETURN
+#define AMQP_UNUSED
#endif
#if __GNUC__ >= 4
@@ -85,7 +92,12 @@
char *
amqp_os_error_string(int err);
-#include "socket.h"
+#ifdef WITH_SSL
+char *
+amqp_ssl_error_string(int err);
+#endif
+
+#include "amqp_socket.h"
/*
* Connection states: XXX FIX THIS
@@ -143,7 +155,8 @@ struct amqp_connection_state_t_ {
amqp_bytes_t outbound_buffer;
- int sockfd;
+ amqp_socket_t *socket;
+
amqp_bytes_t sock_inbound_buffer;
size_t sock_inbound_offset;
size_t sock_inbound_limit;
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 22cc2c5..ecb7de5 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -39,12 +39,73 @@
#endif
#include "amqp_private.h"
-#include <stdlib.h>
+
+#include "socket.h"
+
+#include <assert.h>
+#include <stdarg.h>
+#include <stdint.h>
#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
-#include <stdint.h>
-#include <stdarg.h>
-#include <assert.h>
+
+ssize_t
+amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt)
+{
+ assert(self);
+ assert(self->klass->writev);
+ return self->klass->writev(self, iov, iovcnt);
+}
+
+ssize_t
+amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags)
+{
+ assert(self);
+ assert(self->klass->send);
+ return self->klass->send(self, buf, len, flags);
+}
+
+ssize_t
+amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags)
+{
+ assert(self);
+ assert(self->klass->recv);
+ return self->klass->recv(self, buf, len, flags);
+}
+
+int
+amqp_socket_open(amqp_socket_t *self, const char *host, int port)
+{
+ assert(self);
+ assert(self->klass->open);
+ return self->klass->open(self, host, port);
+}
+
+int
+amqp_socket_close(amqp_socket_t *self)
+{
+ if (self) {
+ assert(self->klass->close);
+ return self->klass->close(self);
+ }
+ return 0;
+}
+
+int
+amqp_socket_error(amqp_socket_t *self)
+{
+ assert(self);
+ assert(self->klass->error);
+ return self->klass->error(self);
+}
+
+int
+amqp_socket_get_sockfd(amqp_socket_t *self)
+{
+ assert(self);
+ assert(self->klass->get_sockfd);
+ return self->klass->get_sockfd(self);
+}
int amqp_open_socket(char const *hostname,
int portnumber)
@@ -57,8 +118,9 @@ int amqp_open_socket(char const *hostname,
int last_error = 0;
int one = 1; /* for setsockopt */
- if (0 != (last_error = amqp_socket_init())) {
- return last_error;
+ last_error = amqp_socket_init();
+ if (0 != last_error) {
+ return -last_error;
}
memset(&hint, 0, sizeof(hint));
@@ -81,20 +143,20 @@ int amqp_open_socket(char const *hostname,
*/
sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (-1 == sockfd) {
- last_error = -amqp_socket_error();
+ last_error = -amqp_os_socket_error();
continue;
}
#ifdef DISABLE_SIGPIPE_WITH_SETSOCKOPT
if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
- last_error = -amqp_socket_error();
- amqp_socket_close(sockfd);
+ last_error = -amqp_os_socket_error();
+ amqp_os_socket_close(sockfd);
continue;
}
#endif /* DISABLE_SIGPIPE_WITH_SETSOCKOPT */
if (0 != amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))
|| 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
- last_error = -amqp_socket_error();
- amqp_socket_close(sockfd);
+ last_error = -amqp_os_socket_error();
+ amqp_os_socket_close(sockfd);
continue;
} else {
last_error = 0;
@@ -117,7 +179,7 @@ int amqp_send_header(amqp_connection_state_t state)
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- return send(state->sockfd, (void *)header, 8, MSG_NOSIGNAL);
+ return amqp_socket_send(state->socket, header, 8, MSG_NOSIGNAL);
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
@@ -214,13 +276,13 @@ static int wait_frame_inner(amqp_connection_state_t state,
assert(res != 0);
}
- res = recv(state->sockfd, state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len, 0);
+ res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len, 0);
if (res <= 0) {
if (res == 0) {
return -ERROR_CONNECTION_CLOSED;
} else {
- return -amqp_socket_error();
+ return -amqp_socket_error(state->socket);
}
}
@@ -342,14 +404,14 @@ retry:
*/
if (!((frame.frame_type == AMQP_FRAME_METHOD)
&& (
- ((frame.channel == channel)
- && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)
- || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
- ||
- ((frame.channel == 0)
- && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))
- )
- )) {
+ ((frame.channel == channel)
+ && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)
+ || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
+ ||
+ ((frame.channel == 0)
+ && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))
+ )
+ )) {
amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t));
amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t));
@@ -431,13 +493,13 @@ static int amqp_table_contains_entry(const amqp_table_t *table,
}
static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- const amqp_table_t *client_properties,
- amqp_sasl_method_enum sasl_method,
- va_list vl)
+ char const *vhost,
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ const amqp_table_t *client_properties,
+ amqp_sasl_method_enum sasl_method,
+ va_list vl)
{
int res;
amqp_method_t method;
@@ -472,7 +534,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
amqp_table_t default_table;
amqp_connection_start_ok_t s;
amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
- sasl_method, vl);
+ sasl_method, vl);
if (response_bytes.bytes == NULL) {
res = -ERROR_NO_MEMORY;
@@ -508,7 +570,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
amqp_table_entry_t *current_entry;
s.client_properties.entries = amqp_pool_alloc(&state->decoding_pool,
- sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries));
+ sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries));
if (NULL == s.client_properties.entries) {
res = -ERROR_NO_MEMORY;
goto error_res;
@@ -643,13 +705,13 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
}
amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- const amqp_table_t *client_properties,
- amqp_sasl_method_enum sasl_method,
- ...)
+ char const *vhost,
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ const amqp_table_t *client_properties,
+ amqp_sasl_method_enum sasl_method,
+ ...)
{
va_list vl;
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
new file mode 100644
index 0000000..ef3462b
--- /dev/null
+++ b/librabbitmq/amqp_socket.h
@@ -0,0 +1,107 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+/**
+ * An abstract socket interface.
+ */
+
+#ifndef AMQP_SOCKET_H
+#define AMQP_SOCKET_H
+
+#include "amqp.h"
+#include "socket.h"
+
+AMQP_BEGIN_DECLS
+
+/* Socket callbacks. */
+typedef ssize_t (*amqp_socket_writev_fn)(void *, const struct iovec *, int);
+typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t, int);
+typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int);
+typedef int (*amqp_socket_open_fn)(void *, const char *, int);
+typedef int (*amqp_socket_close_fn)(void *);
+typedef int (*amqp_socket_error_fn)(void *);
+typedef int (*amqp_socket_get_sockfd_fn)(void *);
+
+/** V-table for amqp_socket_t */
+struct amqp_socket_class_t {
+ amqp_socket_writev_fn writev;
+ amqp_socket_send_fn send;
+ amqp_socket_recv_fn recv;
+ amqp_socket_open_fn open;
+ amqp_socket_close_fn close;
+ amqp_socket_error_fn error;
+ amqp_socket_get_sockfd_fn get_sockfd;
+};
+
+/** Abstract base class for amqp_socket_t */
+struct amqp_socket_t_ {
+ const struct amqp_socket_class_t *klass;
+};
+
+/**
+ * Write to a socket.
+ *
+ * This function is analagous to writev(2).
+ *
+ * \param [in,out] self A socket object.
+ * \param [in] iov One or more data vecors.
+ * \param [in] iovcnt The number of vectors in \e iov.
+ *
+ * \return The number of bytes written, or -1 if an error occurred.
+ */
+ssize_t
+amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt);
+
+/**
+ * Send a message from a socket.
+ *
+ * This function is analagous to send(2).
+ *
+ * \param [in,out] self A socket object.
+ * \param [in] buf A buffer to read from.
+ * \param [in] len The number of bytes in \e buf.
+ * \param [in] flags Send flags, implementation specific.
+ *
+ * \return The number of bytes sent, or -1 if an error occurred.
+ */
+ssize_t
+amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags);
+
+/**
+ * Receive a message from a socket.
+ *
+ * This function is analagous to recv(2).
+ *
+ * \param [in,out] self A socket object.
+ * \param [out] buf A buffer to write to.
+ * \param [in] len The number of bytes at \e buf.
+ * \param [in] flags Receive flags, implementation specific.
+ *
+ * \return The number of bytes received, or -1 if an error occurred.
+ */
+ssize_t
+amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags);
+
+AMQP_END_DECLS
+
+#endif /* AMQP_SOCKET_H */
diff --git a/librabbitmq/amqp_ssl_socket.h b/librabbitmq/amqp_ssl_socket.h
new file mode 100644
index 0000000..3bfce51
--- /dev/null
+++ b/librabbitmq/amqp_ssl_socket.h
@@ -0,0 +1,141 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+/**
+ * An SSL socket connection.
+ */
+
+#ifndef AMQP_SSL_H
+#define AMQP_SSL_H
+
+#include <amqp.h>
+
+AMQP_BEGIN_DECLS
+
+/**
+ * Create a new SSL/TLS socket object.
+ *
+ * Call amqp_socket_close() to release socket resources.
+ *
+ * \return A new socket object or NULL if an error occurred.
+ */
+AMQP_PUBLIC_FUNCTION
+amqp_socket_t *
+AMQP_CALL
+amqp_ssl_socket_new(void);
+
+/**
+ * Set the CA certificate.
+ *
+ * \param [in,out] self An SSL/TLS socket object.
+ * \param [in] cacert Path to the CA cert file in PEM format.
+ *
+ * \return Zero if successful, -1 otherwise.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_ssl_socket_set_cacert(amqp_socket_t *self,
+ const char *cacert);
+
+/**
+ * Set the client key.
+ *
+ * \param [in,out] self An SSL/TLS socket object.
+ * \param [in] cert Path to the client certificate in PEM foramt.
+ * \param [in] key Path to the client key in PEM format.
+ *
+ * \return Zero if successful, -1 otherwise.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_ssl_socket_set_key(amqp_socket_t *self,
+ const char *cert,
+ const char *key);
+
+/**
+ * Set the client key from a buffer.
+ *
+ * \param [in,out] self An SSL/TLS socket object.
+ * \param [in] cert Path to the client certificate in PEM foramt.
+ * \param [in] key A buffer containing client key in PEM format.
+ * \param [in] n The length of the buffer.
+ *
+ * \return Zero if successful, -1 otherwise.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_ssl_socket_set_key_buffer(amqp_socket_t *self,
+ const char *cert,
+ const void *key,
+ size_t n);
+
+/**
+ * Enable or disable peer verification.
+ *
+ * If peer verification is enabled then the common name in the server
+ * certificate must match the server name. Peer verification is enabled by
+ * default.
+ *
+ * \param [in,out] self An SSL/TLS socket object.
+ * \param [in] verify Enable or disable peer verification.
+ */
+AMQP_PUBLIC_FUNCTION
+void
+AMQP_CALL
+amqp_ssl_socket_set_verify(amqp_socket_t *self,
+ amqp_boolean_t verify);
+
+/**
+ * Sets whether rabbitmq-c initializes the underlying SSL library.
+ *
+ * For SSL libraries that require a one-time initialization across
+ * a whole program (e.g., OpenSSL) this sets whether or not rabbitmq-c
+ * will initialize the SSL library when the first call to
+ * amqp_open_ssl_socket() is made. You should call this function with
+ * do_init = 0 if the underlying SSL library is intialized somewhere else
+ * the program.
+ *
+ * Failing to initialize or double initialization of the SSL library will
+ * result in undefined behavior
+ *
+ * By default rabbitmq-c will initialize the underlying SSL library
+ *
+ * NOTE: calling this function after the first socket has been opened with
+ * amqp_open_ssl_socket() will not have any effect.
+ *
+ * \param [in] do_initalize If 0 rabbitmq-c will not initialize the SSL
+ * library, otherwise rabbitmq-c will initialize the
+ * SL library
+ *
+ */
+AMQP_PUBLIC_FUNCTION
+void
+AMQP_CALL
+amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize);
+
+AMQP_END_DECLS
+
+#endif /* AMQP_SSL_H */
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
new file mode 100644
index 0000000..2eb366f
--- /dev/null
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -0,0 +1,126 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "amqp_private.h"
+#include "amqp_tcp_socket.h"
+#include <stdio.h>
+#include <stdlib.h>
+
+struct amqp_tcp_socket_t {
+ const struct amqp_socket_class_t *klass;
+ int sockfd;
+};
+
+static ssize_t
+amqp_tcp_socket_writev(void *base, const struct iovec *iov, int iovcnt)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return amqp_os_socket_writev(self->sockfd, iov, iovcnt);
+}
+
+static ssize_t
+amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return send(self->sockfd, buf, len, flags);
+}
+
+static ssize_t
+amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return recv(self->sockfd, buf, len, flags);
+}
+
+static int
+amqp_tcp_socket_open(void *base, const char *host, int port)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ self->sockfd = amqp_open_socket(host, port);
+ if (0 > self->sockfd) {
+ return -1;
+ }
+ return 0;
+}
+
+static int
+amqp_tcp_socket_close(void *base)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ int status = -1;
+ if (self) {
+ status = amqp_os_socket_close(self->sockfd);
+ free(self);
+ }
+ return status;
+}
+
+static int
+amqp_tcp_socket_error(AMQP_UNUSED void *base)
+{
+ return amqp_os_socket_error();
+}
+
+static int
+amqp_tcp_socket_get_sockfd(void *base)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return self->sockfd;
+}
+
+static const struct amqp_socket_class_t amqp_tcp_socket_class = {
+ amqp_tcp_socket_writev, /* writev */
+ amqp_tcp_socket_send, /* send */
+ amqp_tcp_socket_recv, /* recv */
+ amqp_tcp_socket_open, /* open */
+ amqp_tcp_socket_close, /* close */
+ amqp_tcp_socket_error, /* error */
+ amqp_tcp_socket_get_sockfd /* get_sockfd */
+};
+
+amqp_socket_t *
+amqp_tcp_socket_new(void)
+{
+ struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
+ if (!self) {
+ return NULL;
+ }
+ self->klass = &amqp_tcp_socket_class;
+ self->sockfd = -1;
+ return (amqp_socket_t *)self;
+}
+
+void
+amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd)
+{
+ struct amqp_tcp_socket_t *self;
+ if (base->klass != &amqp_tcp_socket_class) {
+ amqp_abort("<%p> is not of type amqp_tcp_socket_t", base);
+ }
+ self = (struct amqp_tcp_socket_t *)base;
+ self->sockfd = sockfd;
+}
diff --git a/librabbitmq/amqp_tcp_socket.h b/librabbitmq/amqp_tcp_socket.h
new file mode 100644
index 0000000..4c8ba54
--- /dev/null
+++ b/librabbitmq/amqp_tcp_socket.h
@@ -0,0 +1,64 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+/**
+ * A TCP socket connection.
+ */
+
+#ifndef AMQP_TCP_SOCKET_H
+#define AMQP_TCP_SOCKET_H
+
+#include <amqp.h>
+
+AMQP_BEGIN_DECLS
+
+/**
+ * Create a new TCP socket.
+ *
+ * Call amqp_socket_close() to release socket resources.
+ *
+ * \return A new socket object or NULL if an error occurred.
+ */
+AMQP_PUBLIC_FUNCTION
+amqp_socket_t *
+AMQP_CALL
+amqp_tcp_socket_new(void);
+
+/**
+ * Assign an open file descriptor to a socket object.
+ *
+ * This function must not be used in conjunction with amqp_socket_open(), i.e.
+ * the socket connection should already be open(2) when this function is
+ * called.
+ *
+ * \param [in,out] self A TCP socket object.
+ * \param [in] sockfd An open socket descriptor.
+ */
+AMQP_PUBLIC_FUNCTION
+void
+AMQP_CALL
+amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd);
+
+AMQP_END_DECLS
+
+#endif /* AMQP_TCP_SOCKET_H */
diff --git a/librabbitmq/amqp_url.c b/librabbitmq/amqp_url.c
index 277b5a6..b200adc 100644
--- a/librabbitmq/amqp_url.c
+++ b/librabbitmq/amqp_url.c
@@ -52,6 +52,7 @@ void amqp_default_connection_info(struct amqp_connection_info *ci)
ci->host = "localhost";
ci->port = 5672;
ci->vhost = "/";
+ ci->ssl = 0;
}
/* Scan for the next delimiter, handling percent-encodings on the way. */
@@ -116,11 +117,16 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed)
char *port = NULL;
/* check the prefix */
- if (strncmp(url, "amqp://", 7)) {
+ if (!strncmp(url, "amqp://", 7)) {
+ /* do nothing */
+ } else if (!strncmp(url, "amqps://", 8)) {
+ parsed->port = 5671;
+ parsed->ssl = 1;
+ } else {
goto out;
}
- host = start = url += 7;
+ host = start = url += (parsed->ssl ? 8 : 7);
delim = find_delim(&url, 1);
if (delim == ':') {
diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c
index 38064f1..4615480 100644
--- a/librabbitmq/unix/socket.c
+++ b/librabbitmq/unix/socket.c
@@ -40,10 +40,12 @@
#include "amqp_private.h"
#include "socket.h"
+#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
+#include <unistd.h>
int
amqp_socket_init(void)
@@ -52,12 +54,7 @@ amqp_socket_init(void)
}
int
-amqp_socket_error(void)
-{
- return errno | ERROR_CATEGORY_OS;
-}
-
-int amqp_socket_socket(int domain, int type, int proto)
+amqp_socket_socket(int domain, int type, int proto)
{
int flags;
@@ -79,7 +76,27 @@ int amqp_socket_socket(int domain, int type, int proto)
return s;
}
-char *amqp_os_error_string(int err)
+char *
+amqp_os_error_string(int err)
{
return strdup(strerror(err));
}
+
+int
+amqp_os_socket_close(int sockfd)
+{
+ return close(sockfd);
+}
+
+ssize_t
+amqp_os_socket_writev(int sockfd, const struct iovec *iov,
+ int iovcnt)
+{
+ return writev(sockfd, iov, iovcnt);
+}
+
+int
+amqp_os_socket_error(void)
+{
+ return errno | ERROR_CATEGORY_OS;
+}
diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h
index 1382ca3..34de1a2 100644
--- a/librabbitmq/unix/socket.h
+++ b/librabbitmq/unix/socket.h
@@ -44,7 +44,6 @@
#include <sys/socket.h>
#include <netdb.h>
#include <sys/uio.h>
-#include <unistd.h>
int
amqp_socket_init(void);
@@ -53,11 +52,15 @@ int
amqp_socket_socket(int domain, int type, int proto);
int
-amqp_socket_error(void);
+amqp_os_socket_error(void);
+
+int
+amqp_os_socket_close(int sockfd);
+
+ssize_t
+amqp_os_socket_writev(int sockfd, const struct iovec *iov, int iovcnt);
#define amqp_socket_setsockopt setsockopt
-#define amqp_socket_close close
-#define amqp_socket_writev writev
#if defined(SO_NOSIGPIPE) && !defined(MSG_NOSIGNAL)
# define DISABLE_SIGPIPE_WITH_SETSOCKOPT
diff --git a/librabbitmq/unix/threads.h b/librabbitmq/unix/threads.h
new file mode 100644
index 0000000..c2d80a6
--- /dev/null
+++ b/librabbitmq/unix/threads.h
@@ -0,0 +1,29 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef AMQP_THREADS_H
+#define AMQP_THREADS_H
+
+#include <pthread.h>
+
+#endif /* AMQP_THREADS_H */
diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c
index a5d9454..82051fe 100644
--- a/librabbitmq/win32/socket.c
+++ b/librabbitmq/win32/socket.c
@@ -49,13 +49,14 @@
static int called_wsastartup;
-int amqp_socket_init(void)
+int
+amqp_socket_init(void)
{
if (!called_wsastartup) {
WSADATA data;
int res = WSAStartup(0x0202, &data);
if (res) {
- return -res;
+ return (ERROR_CATEGORY_OS | res);
}
called_wsastartup = 1;
@@ -64,7 +65,8 @@ int amqp_socket_init(void)
return 0;
}
-char *amqp_os_error_string(int err)
+char *
+amqp_os_error_string(int err)
{
char *msg, *copy;
@@ -91,7 +93,13 @@ amqp_socket_setsockopt(int sock, int level, int optname,
}
int
-amqp_socket_writev(int sock, struct iovec *iov, int nvecs)
+amqp_os_socket_close(int sockfd)
+{
+ return closesocket(sockfd);
+}
+
+ssize_t
+amqp_os_socket_writev(int sock, struct iovec *iov, int nvecs)
{
DWORD ret;
if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) {
@@ -102,7 +110,7 @@ amqp_socket_writev(int sock, struct iovec *iov, int nvecs)
}
int
-amqp_socket_error(void)
+amqp_os_socket_error(void)
{
return WSAGetLastError() | ERROR_CATEGORY_OS;
}
diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h
index cea361d..cddbee5 100644
--- a/librabbitmq/win32/socket.h
+++ b/librabbitmq/win32/socket.h
@@ -37,6 +37,7 @@
* ***** END LICENSE BLOCK *****
*/
+#include "amqp_private.h"
#include <winsock2.h>
#include <WS2tcpip.h>
@@ -50,17 +51,23 @@ int
amqp_socket_init(void);
#define amqp_socket_socket socket
-#define amqp_socket_close closesocket
+
+char *
+amqp_os_error_string(int err);
int
-amqp_socket_setsockopt(int sock, int level, int optname, const void *optval,
- size_t optlen);
+amqp_socket_setsockopt(int sock, int level, int optname,
+ const void *optval, size_t optlen);
int
-amqp_socket_writev(int sock, struct iovec *iov, int nvecs);
+amqp_os_socket_close(int sockfd);
+
+
+ssize_t
+amqp_os_socket_writev(int sock, struct iovec *iov, int nvecs);
int
-amqp_socket_error(void);
+amqp_os_socket_error(void);
#ifndef MSG_NOSIGNAL
# define MSG_NOSIGNAL 0x0
diff --git a/librabbitmq/win32/threads.c b/librabbitmq/win32/threads.c
new file mode 100644
index 0000000..edf2d2d
--- /dev/null
+++ b/librabbitmq/win32/threads.c
@@ -0,0 +1,63 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#include "threads.h"
+
+DWORD
+pthread_self(void)
+{
+ return GetCurrentThreadId();
+}
+
+int
+pthread_mutex_init(pthread_mutex_t *mutex, void *attr)
+{
+ *mutex = malloc(sizeof(CRITICAL_SECTION));
+ if (!*mutex) {
+ return 1;
+ }
+ InitializeCriticalSection(*mutex);
+ return 0;
+}
+
+int
+pthread_mutex_lock(pthread_mutex_t *mutex)
+{
+ if (!*mutex) {
+ return 1;
+ }
+
+ EnterCriticalSection(*mutex);
+ return 0;
+}
+
+int
+pthread_mutex_unlock(pthread_mutex_t *mutex)
+{
+ if (!*mutex) {
+ return 1;
+ }
+
+ LeaveCriticalSection(*mutex);
+ return 0;
+}
diff --git a/librabbitmq/win32/threads.h b/librabbitmq/win32/threads.h
new file mode 100644
index 0000000..668b2a3
--- /dev/null
+++ b/librabbitmq/win32/threads.h
@@ -0,0 +1,37 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * Copyright 2012-2013 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef AMQP_THREAD_H
+#define AMQP_THREAD_H
+
+#include <Windows.h>
+
+typedef CRITICAL_SECTION *pthread_mutex_t;
+typedef int pthread_once_t;
+
+DWORD pthread_self(void);
+
+int pthread_mutex_init(pthread_mutex_t *, void *attr);
+int pthread_mutex_lock(pthread_mutex_t *);
+int pthread_mutex_unlock(pthread_mutex_t *);
+#endif /* AMQP_THREAD_H */
diff --git a/m4/polarssl.m4 b/m4/polarssl.m4
new file mode 100644
index 0000000..2c87bbd
--- /dev/null
+++ b/m4/polarssl.m4
@@ -0,0 +1,75 @@
+# polarssl.m4 - Check for PolarSSL
+#
+# Copyright 2012 Michael Steinert
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR
+# THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+#serial 1
+
+# _AX_LIB_POLARSSL
+# ----------------
+# Check for the PolarSSL library and header file. If found the cache variable
+# ax_cv_have_polarssl will be set to yes.
+AC_DEFUN([_AX_LIB_POLARSSL],
+[dnl
+ax_cv_have_polarssl=no
+_ax_polarssl_h=no
+_ax_polarssl_lib=no
+AC_ARG_VAR([POLARSSL_CFLAGS],
+ [C compiler flags for PolarSSL, overriding defaults])
+AC_ARG_VAR([POLARSSL_LIBS], [linker flags for PolarSSL, overriding defaults])
+AC_CHECK_HEADERS([polarssl/ssl.h],
+ [_ax_polarssl_h=yes],,
+ [$POLARSSL_CFLAGS])
+AS_IF([test "x$POLARSSL_LIBS" = "x"],
+ [AC_CHECK_LIB([polarssl], [entropy_init],
+ [POLARSSL_LIBS=-lpolarssl
+ _ax_polarssl_lib=yes])],
+ [_ax_polarssl_cflags=$CFLAGS
+ CFLAGS="$POLARSSL_CFLAGS $CFLAGS"
+ _ax_polarssl_ldflags=$LDFLAGS
+ LDFLAGS="$POLARSSL_LIBS $LDFLAGS"
+ AC_MSG_CHECKING([for libpolarssl])
+ AC_TRY_LINK([#include <polarssl/entropy.h>],
+ [entropy_init(NULL)],
+ [AC_MSG_RESULT([$POLARSSL_LIBS])
+ _ax_polarssl_lib=yes],
+ [AC_MSG_RESULT([no])])
+ CFLAGS=$_ax_polarssl_cflags
+ LDFLAGS=$_ax_polarssl_ldflags])
+AS_IF([test "x$_ax_polarssl_h" = "xyes" && \
+ test "x$_ax_polarssl_lib" = "xyes"],
+ [ax_cv_have_polarssl=yes])
+])dnl
+
+# AX_LIB_POLARSSL([ACTION-IF-TRUE], [ACTION-IF-FALSE])
+# ------------------------------------------------
+# Check if PolarSSL is installed. If found the variable ax_have_polarssl will
+# be set to yes.
+# ACTION-IF-TRUE: commands to execute if PolarSSL is installed
+# ACTION-IF-FALSE: commands to execute if PoloarSSL is not installed
+AC_DEFUN([AX_LIB_POLARSSL],
+[dnl
+AC_CACHE_VAL([ax_cv_have_polarssl], [_AX_LIB_POLARSSL])
+ax_have_polarssl=$ax_cv_have_polarssl
+AS_IF([test "x$ax_have_polarssl" = "xyes"],
+ [AC_DEFINE([HAVE_POLARSSL], [1], [Define to 1 if PolarSSL is available.])
+ $1], [$2])
+])dnl
diff --git a/tests/test_parse_url.c b/tests/test_parse_url.c
index b816cd8..da3e681 100644
--- a/tests/test_parse_url.c
+++ b/tests/test_parse_url.c
@@ -112,61 +112,142 @@ int main(void)
/* From the spec */
parse_success("amqp://user:pass@host:10000/vhost", "user", "pass",
"host", 10000, "vhost");
+ parse_success("amqps://user:pass@host:10000/vhost", "user", "pass",
+ "host", 10000, "vhost");
+
parse_success("amqp://user%61:%61pass@ho%61st:10000/v%2fhost",
"usera", "apass", "hoast", 10000, "v/host");
+ parse_success("amqps://user%61:%61pass@ho%61st:10000/v%2fhost",
+ "usera", "apass", "hoast", 10000, "v/host");
+
parse_success("amqp://", "guest", "guest", "localhost", 5672, "/");
+ parse_success("amqps://", "guest", "guest", "localhost", 5671, "/");
+
parse_success("amqp://:@/", "", "", "localhost", 5672, "");
+ parse_success("amqps://:@/", "", "", "localhost", 5671, "");
+
parse_success("amqp://user@", "user", "guest", "localhost", 5672, "/");
+ parse_success("amqps://user@", "user", "guest", "localhost", 5671, "/");
+
parse_success("amqp://user:pass@", "user", "pass",
"localhost", 5672, "/");
+ parse_success("amqps://user:pass@", "user", "pass",
+ "localhost", 5671, "/");
+
parse_success("amqp://host", "guest", "guest", "host", 5672, "/");
+ parse_success("amqps://host", "guest", "guest", "host", 5671, "/");
+
parse_success("amqp://:10000", "guest", "guest", "localhost", 10000,
"/");
+ parse_success("amqps://:10000", "guest", "guest", "localhost", 10000,
+ "/");
+
parse_success("amqp:///vhost", "guest", "guest", "localhost", 5672,
"vhost");
+ parse_success("amqps:///vhost", "guest", "guest", "localhost", 5671,
+ "vhost");
+
parse_success("amqp://host/", "guest", "guest", "host", 5672, "");
+ parse_success("amqps://host/", "guest", "guest", "host", 5671, "");
+
parse_success("amqp://host/%2f", "guest", "guest", "host", 5672, "/");
+ parse_success("amqps://host/%2f", "guest", "guest", "host", 5671, "/");
+
parse_success("amqp://[::1]", "guest", "guest", "::1", 5672, "/");
+ parse_success("amqps://[::1]", "guest", "guest", "::1", 5671, "/");
/* Various other success cases */
parse_success("amqp://host:100", "guest", "guest", "host", 100, "/");
+ parse_success("amqps://host:100", "guest", "guest", "host", 100, "/");
+
parse_success("amqp://[::1]:100", "guest", "guest", "::1", 100, "/");
+ parse_success("amqps://[::1]:100", "guest", "guest", "::1", 100, "/");
parse_success("amqp://host/blah", "guest", "guest",
"host", 5672, "blah");
+ parse_success("amqps://host/blah", "guest", "guest",
+ "host", 5671, "blah");
+
parse_success("amqp://host:100/blah", "guest", "guest",
"host", 100, "blah");
+ parse_success("amqps://host:100/blah", "guest", "guest",
+ "host", 100, "blah");
+
parse_success("amqp://:100/blah", "guest", "guest",
"localhost", 100, "blah");
+ parse_success("amqps://:100/blah", "guest", "guest",
+ "localhost", 100, "blah");
+
parse_success("amqp://[::1]/blah", "guest", "guest",
"::1", 5672, "blah");
+ parse_success("amqps://[::1]/blah", "guest", "guest",
+ "::1", 5671, "blah");
+
parse_success("amqp://[::1]:100/blah", "guest", "guest",
"::1", 100, "blah");
+ parse_success("amqps://[::1]:100/blah", "guest", "guest",
+ "::1", 100, "blah");
parse_success("amqp://user:pass@host", "user", "pass",
"host", 5672, "/");
+ parse_success("amqps://user:pass@host", "user", "pass",
+ "host", 5671, "/");
+
parse_success("amqp://user:pass@host:100", "user", "pass",
"host", 100, "/");
+ parse_success("amqps://user:pass@host:100", "user", "pass",
+ "host", 100, "/");
+
parse_success("amqp://user:pass@:100", "user", "pass",
"localhost", 100, "/");
+ parse_success("amqps://user:pass@:100", "user", "pass",
+ "localhost", 100, "/");
+
parse_success("amqp://user:pass@[::1]", "user", "pass",
"::1", 5672, "/");
+ parse_success("amqps://user:pass@[::1]", "user", "pass",
+ "::1", 5671, "/");
+
parse_success("amqp://user:pass@[::1]:100", "user", "pass",
"::1", 100, "/");
+ parse_success("amqps://user:pass@[::1]:100", "user", "pass",
+ "::1", 100, "/");
/* Various failure cases */
parse_fail("http://www.rabbitmq.com");
+
parse_fail("amqp://foo:bar:baz");
+ parse_fail("amqps://foo:bar:baz");
+
+ parse_fail("amqp://foo[::1]");
+ parse_fail("amqps://foo[::1]");
+
parse_fail("amqp://foo[::1]");
+ parse_fail("amqps://foo[::1]");
+
parse_fail("amqp://foo:[::1]");
+ parse_fail("amqps://foo:[::1]");
+
parse_fail("amqp://[::1]foo");
+ parse_fail("amqps://[::1]foo");
+
parse_fail("amqp://foo:1000xyz");
+ parse_fail("amqps://foo:1000xyz");
+
parse_fail("amqp://foo:1000000");
+ parse_fail("amqps://foo:1000000");
+
parse_fail("amqp://foo/bar/baz");
+ parse_fail("amqps://foo/bar/baz");
parse_fail("amqp://foo%1");
+ parse_fail("amqps://foo%1");
+
parse_fail("amqp://foo%1x");
+ parse_fail("amqps://foo%1x");
+
parse_fail("amqp://foo%xy");
+ parse_fail("amqps://foo%xy");
return 0;
}
diff --git a/tools/common.c b/tools/common.c
index 7659521..0c61d16 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -38,18 +38,18 @@
#include "config.h"
#endif
-/* needed for asnprintf */
-#define _GNU_SOURCE
+#include "common.h"
+#ifdef WITH_SSL
+#include <amqp_ssl_socket.h>
+#endif
+#include <amqp_tcp_socket.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
-#include <stdarg.h>
#include <string.h>
-
#include <unistd.h>
-#include <fcntl.h>
-#include <errno.h>
-
-#include "common.h"
#ifdef WINDOWS
#include "compat.h"
@@ -175,6 +175,12 @@ static int amqp_port = -1;
static char *amqp_vhost;
static char *amqp_username;
static char *amqp_password;
+#ifdef WITH_SSL
+static int amqp_ssl = 0;
+static char *amqp_cacert = "/etc/ssl/certs/cacert.pem";
+static char *amqp_key = NULL;
+static char *amqp_cert = NULL;
+#endif /* WITH_SSL */
const char *connect_options_title = "Connection options";
struct poptOption connect_options[] = {
@@ -202,13 +208,29 @@ struct poptOption connect_options[] = {
"password", 0, POPT_ARG_STRING, &amqp_password, 0,
"the password to login with", "password"
},
+#ifdef WITH_SSL
+ {
+ "ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0,
+ "connect over SSL/TLS", NULL
+ },
+ {
+ "cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0,
+ "path to the CA certificate file", "cacert.pem"
+ },
+ {
+ "key", 0, POPT_ARG_STRING, &amqp_key, 0,
+ "path to the client private key file", "key.pem"
+ },
+ {
+ "cert", 0, POPT_ARG_STRING, &amqp_cert, 0,
+ "path to the client certificate file", "cert.pem"
+ },
+#endif /* WITH_SSL */
{ NULL, '\0', 0, NULL, 0, NULL, NULL }
};
static void init_connection_info(struct amqp_connection_info *ci)
{
- struct amqp_connection_info defaults;
-
ci->user = NULL;
ci->password = NULL;
ci->host = NULL;
@@ -216,17 +238,17 @@ static void init_connection_info(struct amqp_connection_info *ci)
ci->vhost = NULL;
ci->user = NULL;
- if (amqp_url) {
+ amqp_default_connection_info(ci);
+
+ if (amqp_url)
die_amqp_error(amqp_parse_url(strdup(amqp_url), ci),
"Parsing URL '%s'", amqp_url);
- }
if (amqp_server) {
char *colon;
- if (ci->host) {
+ if (ci->host)
die("both --server and --url options specify"
" server host");
- }
/* parse the server string into a hostname and a port */
colon = strchr(amqp_server, ':');
@@ -246,104 +268,105 @@ static void init_connection_info(struct amqp_connection_info *ci)
memcpy(ci->host, amqp_server, host_len);
ci->host[host_len] = 0;
- if (ci->port >= 0) {
+ if (ci->port >= 0)
die("both --server and --url options specify"
" server port");
- }
- if (amqp_port >= 0) {
+ if (amqp_port >= 0)
die("both --server and --port options specify"
" server port");
- }
ci->port = strtol(colon+1, &port_end, 10);
if (ci->port < 0
|| ci->port > 65535
|| port_end == colon+1
- || *port_end != 0) {
+ || *port_end != 0)
die("bad server port number in '%s'",
amqp_server);
- }
}
+
+#if WITH_SSL
+ if (amqp_ssl && !ci->ssl) {
+ die("the --ssl option specifies an SSL connection"
+ " but the --server option does not");
+ }
+#endif
}
if (amqp_port >= 0) {
- if (ci->port >= 0) {
+ if (ci->port >= 0)
die("both --port and --url options specify"
" server port");
- }
ci->port = amqp_port;
}
if (amqp_username) {
- if (ci->user) {
+ if (ci->user)
die("both --username and --url options specify"
" AMQP username");
- }
ci->user = amqp_username;
}
if (amqp_password) {
- if (ci->password) {
+ if (ci->password)
die("both --password and --url options specify"
" AMQP password");
- }
ci->password = amqp_password;
}
if (amqp_vhost) {
- if (ci->vhost) {
+ if (ci->vhost)
die("both --vhost and --url options specify"
" AMQP vhost");
- }
ci->vhost = amqp_vhost;
}
-
- amqp_default_connection_info(&defaults);
-
- if (!ci->user) {
- ci->user = defaults.user;
- }
- if (!ci->password) {
- ci->password = defaults.password;
- }
- if (!ci->host) {
- ci->host = defaults.host;
- }
- if (ci->port < 0) {
- ci->port = defaults.port;
- }
- if (!ci->vhost) {
- ci->vhost = defaults.vhost;
- }
}
amqp_connection_state_t make_connection(void)
{
- int s;
+ int status;
+ amqp_socket_t *socket = NULL;
struct amqp_connection_info ci;
amqp_connection_state_t conn;
init_connection_info(&ci);
-
- s = amqp_open_socket(ci.host, ci.port);
- die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port);
-
conn = amqp_new_connection();
- amqp_set_sockfd(conn, s);
-
+ if (ci.ssl) {
+#ifdef WITH_SSL
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+ if (amqp_cacert) {
+ amqp_ssl_socket_set_cacert(socket, amqp_cacert);
+ }
+ if (amqp_key) {
+ amqp_ssl_socket_set_key(socket, amqp_cert, amqp_key);
+ }
+#else
+ die("librabbitmq was not built with SSL/TLS support");
+#endif
+ } else {
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket (out of memory)");
+ }
+ }
+ status = amqp_socket_open(socket, ci.host, ci.port);
+ if (status) {
+ die("opening socket to %s:%d", ci.host, ci.port);
+ }
+ amqp_set_socket(conn, socket);
die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN,
ci.user, ci.password),
"logging in to AMQP server");
-
if (!amqp_channel_open(conn, 1)) {
die_rpc(amqp_get_rpc_reply(conn), "opening channel");
}
-
return conn;
}