summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Steinert <mike.steinert@gmail.com>2012-06-08 15:38:22 -0600
committerAlan Antonuk <alan.antonuk@gmail.com>2013-04-09 15:48:03 -0700
commit0f022fce389543404f86824ed6c5720d2502cd09 (patch)
tree6e39adb0a9268d32e6873a48723f3d84c1316e87
parent7240f48af34aea8412473f29e93f6443f79230db (diff)
downloadrabbitmq-c-0f022fce389543404f86824ed6c5720d2502cd09.tar.gz
Propose new socket API
The general idea is to have a non-instantiable socket base class. Connection-specific sub-classes provide a constructor and methods for modifying connection parameters. `amqp_socket_close()` is the destructor. Signed-off-by: Michael Steinert <mike.steinert@gmail.com>
-rw-r--r--Makefile.am5
-rw-r--r--examples/amqps_bind.c37
-rw-r--r--examples/amqps_consumer.c38
-rw-r--r--examples/amqps_exchange_declare.c37
-rw-r--r--examples/amqps_listen.c37
-rw-r--r--examples/amqps_listenq.c37
-rw-r--r--examples/amqps_producer.c37
-rw-r--r--examples/amqps_sendstring.c37
-rw-r--r--examples/amqps_unbind.c37
-rw-r--r--examples/utils.c12
-rw-r--r--examples/utils.h1
-rw-r--r--librabbitmq/CMakeLists.txt2
-rw-r--r--librabbitmq/amqp-openssl.c284
-rw-r--r--librabbitmq/amqp-socket.h61
-rw-r--r--librabbitmq/amqp-ssl-socket.h (renamed from librabbitmq/amqp-ssl.h)84
-rw-r--r--librabbitmq/amqp-tcp-socket.c143
-rw-r--r--librabbitmq/amqp-tcp-socket.h55
-rw-r--r--librabbitmq/amqp.h53
-rw-r--r--librabbitmq/amqp_connection.c61
-rw-r--r--librabbitmq/amqp_private.h10
-rw-r--r--librabbitmq/amqp_socket.c53
-rw-r--r--librabbitmq/unix/socket.c8
-rw-r--r--librabbitmq/unix/socket.h10
-rw-r--r--tools/common.c55
24 files changed, 859 insertions, 335 deletions
diff --git a/Makefile.am b/Makefile.am
index e8c0e2a..f25c09c 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -12,6 +12,8 @@ endif #REGENERATE_AMQP_FRAMING
lib_LTLIBRARIES = librabbitmq/librabbitmq.la
librabbitmq_librabbitmq_la_SOURCES = \
+ librabbitmq/amqp-socket.h \
+ librabbitmq/amqp-tcp-socket.c \
librabbitmq/amqp_api.c \
librabbitmq/amqp_connection.c \
librabbitmq/amqp_mem.c \
@@ -67,9 +69,10 @@ endif
include_HEADERS = \
$(top_srcdir)/librabbitmq/amqp.h
+ $(top_builddir)/librabbitmq/amqp-tcp-socket.h \
if SSL
-include_HEADERS += librabbitmq/amqp-ssl.h
+include_HEADERS += librabbitmq/amqp-ssl-socket.h
endif
if REGENERATE_AMQP_FRAMING
diff --git a/examples/amqps_bind.c b/examples/amqps_bind.c
index 1e2c84f..3255e70 100644
--- a/examples/amqps_bind.c
+++ b/examples/amqps_bind.c
@@ -35,19 +35,18 @@
#include <string.h>
#include <stdint.h>
-#include <amqp-ssl.h>
+#include <amqp-ssl-socket.h>
#include <amqp_framing.h>
#include "utils.h"
int main(int argc, char const * const *argv) {
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *bindingkey;
char const *queue;
-
- int sockfd;
+ amqp_socket_t *socket;
amqp_connection_state_t conn;
if (argc < 6) {
@@ -64,11 +63,31 @@ int main(int argc, char const * const *argv) {
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port,
- argc > 6 ? argv[6] : NULL,
- argc > 8 ? argv[7] : NULL,
- argc > 8 ? argv[8] : NULL),
- "Opening SSL/TLS socket");
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 6) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[6]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 8) {
+ status = amqp_ssl_socket_set_key(socket, argv[7], argv[8]);
+ if (status) {
+ die("setting client key/cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c
index 31d251d..cb47dff 100644
--- a/examples/amqps_consumer.c
+++ b/examples/amqps_consumer.c
@@ -35,7 +35,7 @@
#include <string.h>
#include <stdint.h>
-#include <amqp-ssl.h>
+#include <amqp-ssl-socket.h>
#include <amqp_framing.h>
#include <assert.h>
@@ -115,13 +115,11 @@ static void run(amqp_connection_state_t conn)
int main(int argc, char const * const *argv) {
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *bindingkey;
-
- int sockfd;
+ amqp_socket_t *socket;
amqp_connection_state_t conn;
-
amqp_bytes_t queuename;
if (argc < 3) {
@@ -137,11 +135,31 @@ int main(int argc, char const * const *argv) {
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port,
- argc > 3 ? argv[3] : NULL,
- argc > 5 ? argv[4] : NULL,
- argc > 5 ? argv[5] : NULL),
- "Opening SSL/TLS socket");
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 3) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[3]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 4) {
+ status = amqp_ssl_socket_set_key(socket, argv[5], argv[5]);
+ if (status) {
+ die("setting client key/cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_exchange_declare.c b/examples/amqps_exchange_declare.c
index 6e81087..1310235 100644
--- a/examples/amqps_exchange_declare.c
+++ b/examples/amqps_exchange_declare.c
@@ -35,18 +35,17 @@
#include <string.h>
#include <stdint.h>
-#include <amqp-ssl.h>
+#include <amqp-ssl-socket.h>
#include <amqp_framing.h>
#include "utils.h"
int main(int argc, char const * const *argv) {
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *exchangetype;
-
- int sockfd;
+ amqp_socket_t *socket;
amqp_connection_state_t conn;
if (argc < 5) {
@@ -62,11 +61,31 @@ int main(int argc, char const * const *argv) {
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port,
- argc > 5 ? argv[5] : NULL,
- argc > 7 ? argv[6] : NULL,
- argc > 7 ? argv[7] : NULL),
- "Opening SSL/TLS socket");
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 5) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[5]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 7) {
+ status = amqp_ssl_socket_set_key(socket, argv[6], argv[7]);
+ if (status) {
+ die("setting client key/cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c
index 306bbaa..c6344bd 100644
--- a/examples/amqps_listen.c
+++ b/examples/amqps_listen.c
@@ -35,7 +35,7 @@
#include <string.h>
#include <stdint.h>
-#include <amqp-ssl.h>
+#include <amqp-ssl-socket.h>
#include <amqp_framing.h>
#include <assert.h>
@@ -44,11 +44,10 @@
int main(int argc, char const * const *argv) {
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *bindingkey;
-
- int sockfd;
+ amqp_socket_t *socket;
amqp_connection_state_t conn;
amqp_bytes_t queuename;
@@ -66,11 +65,31 @@ int main(int argc, char const * const *argv) {
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port,
- argc > 5 ? argv[5] : NULL,
- argc > 7 ? argv[6] : NULL,
- argc > 7 ? argv[7] : NULL),
- "Opening socket");
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 5) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[5]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 7) {
+ status = amqp_ssl_socket_set_key(socket, argv[6], argv[7]);
+ if (status) {
+ die("setting client key/cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c
index 90b608a..0a9c687 100644
--- a/examples/amqps_listenq.c
+++ b/examples/amqps_listenq.c
@@ -35,7 +35,7 @@
#include <string.h>
#include <stdint.h>
-#include <amqp-ssl.h>
+#include <amqp-ssl-socket.h>
#include <amqp_framing.h>
#include <assert.h>
@@ -44,10 +44,9 @@
int main(int argc, char const * const *argv) {
char const *hostname;
- int port;
+ int port, status;
char const *queuename;
-
- int sockfd;
+ amqp_socket_t *socket;
amqp_connection_state_t conn;
if (argc < 4) {
@@ -62,11 +61,31 @@ int main(int argc, char const * const *argv) {
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port,
- argc > 4 ? argv[4] : NULL,
- argc > 6 ? argv[5] : NULL,
- argc > 6 ? argv[6] : NULL),
- "Opening socket");
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 4) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[4]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 6) {
+ status = amqp_ssl_socket_set_key(socket, argv[5], argv[6]);
+ if (status) {
+ die("setting client key/cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_producer.c b/examples/amqps_producer.c
index 6e00f29..43649ad 100644
--- a/examples/amqps_producer.c
+++ b/examples/amqps_producer.c
@@ -35,7 +35,7 @@
#include <string.h>
#include <stdint.h>
-#include <amqp-ssl.h>
+#include <amqp-ssl-socket.h>
#include <amqp_framing.h>
#include "utils.h"
@@ -106,11 +106,10 @@ static void send_batch(amqp_connection_state_t conn,
int main(int argc, char const * const *argv) {
char const *hostname;
- int port;
+ int port, status;
int rate_limit;
int message_count;
-
- int sockfd;
+ amqp_socket_t *socket;
amqp_connection_state_t conn;
if (argc < 5) {
@@ -126,11 +125,31 @@ int main(int argc, char const * const *argv) {
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port,
- argc > 5 ? argv[5] : NULL,
- argc > 7 ? argv[6] : NULL,
- argc > 7 ? argv[7] : NULL),
- "Opening SSL/TLS socket");
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 5) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[5]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 7) {
+ status = amqp_ssl_socket_set_key(socket, argv[6], argv[7]);
+ if (status) {
+ die("setting client key/cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_sendstring.c b/examples/amqps_sendstring.c
index 2315982..a888da8 100644
--- a/examples/amqps_sendstring.c
+++ b/examples/amqps_sendstring.c
@@ -35,19 +35,18 @@
#include <string.h>
#include <stdint.h>
-#include <amqp-ssl.h>
+#include <amqp-ssl-socket.h>
#include <amqp_framing.h>
#include "utils.h"
int main(int argc, char const * const *argv) {
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *routingkey;
char const *messagebody;
-
- int sockfd;
+ amqp_socket_t *socket;
amqp_connection_state_t conn;
if (argc < 6) {
@@ -64,11 +63,31 @@ int main(int argc, char const * const *argv) {
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port,
- argc > 6 ? argv[6] : NULL,
- argc > 8 ? argv[7] : NULL,
- argc > 8 ? argv[8] : NULL),
- "Opening socket");
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 6) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[6]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 8) {
+ status = amqp_ssl_socket_set_key(socket, argv[7], argv[8]);
+ if (status) {
+ die("setting client key/cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/amqps_unbind.c b/examples/amqps_unbind.c
index e429c6d..e9f4317 100644
--- a/examples/amqps_unbind.c
+++ b/examples/amqps_unbind.c
@@ -35,19 +35,18 @@
#include <string.h>
#include <stdint.h>
-#include <amqp-ssl.h>
+#include <amqp-ssl-socket.h>
#include <amqp_framing.h>
#include "utils.h"
int main(int argc, char const * const *argv) {
char const *hostname;
- int port;
+ int port, status;
char const *exchange;
char const *bindingkey;
char const *queue;
-
- int sockfd;
+ amqp_socket_t *socket;
amqp_connection_state_t conn;
if (argc < 6) {
@@ -64,11 +63,31 @@ int main(int argc, char const * const *argv) {
conn = amqp_new_connection();
- die_on_error(sockfd = amqp_open_ssl_socket(conn, hostname, port,
- argc > 6 ? argv[6] : NULL,
- argc > 8 ? argv[7] : NULL,
- argc > 8 ? argv[8] : NULL),
- "Opening socket");
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 6) {
+ status = amqp_ssl_socket_set_cacert(socket, argv[6]);
+ if (status) {
+ die("setting CA certificate");
+ }
+ }
+
+ if (argc > 8) {
+ status = amqp_ssl_socket_set_key(socket, argv[7], argv[8]);
+ if (status) {
+ die("setting client key/cert");
+ }
+ }
+
+ status = amqp_socket_open(socket, hostname, port);
+ if (status) {
+ die("opening SSL/TLS connection");
+ }
+
+ amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
diff --git a/examples/utils.c b/examples/utils.c
index 609c354..0830738 100644
--- a/examples/utils.c
+++ b/examples/utils.c
@@ -34,6 +34,7 @@
* ***** END LICENSE BLOCK *****
*/
+#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -45,8 +46,17 @@
#include "utils.h"
-void die_on_error(int x, char const *context)
+void die(const char *fmt, ...)
{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(1);
+}
+
+void die_on_error(int x, char const *context) {
if (x < 0) {
char *errstr = amqp_error_string(-x);
fprintf(stderr, "%s: %s\n", context, errstr);
diff --git a/examples/utils.h b/examples/utils.h
index 2e7b15f..dea86da 100644
--- a/examples/utils.h
+++ b/examples/utils.h
@@ -37,6 +37,7 @@
* ***** END LICENSE BLOCK *****
*/
+void die(const char *fmt, ...);
extern void die_on_error(int x, char const *context);
extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context);
diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt
index 1f78f4b..977b1fe 100644
--- a/librabbitmq/CMakeLists.txt
+++ b/librabbitmq/CMakeLists.txt
@@ -119,7 +119,7 @@ set(RABBITMQ_SOURCES
${AMQP_FRAMING_C_PATH}
amqp_api.c amqp.h
amqp_connection.c amqp_mem.c amqp_private.h amqp_socket.c amqp_table.c
- amqp_url.c
+ amqp_url.c amqp-socket.h amqp-tcp-socket.c amqp-tcp-socket.h
${SOCKET_IMPL}/socket.h ${SOCKET_IMPL}/socket.c
${AMQP_SSL_SRCS}
)
diff --git a/librabbitmq/amqp-openssl.c b/librabbitmq/amqp-openssl.c
index 531a2ff..16d81e0 100644
--- a/librabbitmq/amqp-openssl.c
+++ b/librabbitmq/amqp-openssl.c
@@ -24,7 +24,7 @@
#include "config.h"
#endif
-#include "amqp-ssl.h"
+#include "amqp-ssl-socket.h"
#include "amqp_private.h"
#include "threads.h"
#include <ctype.h>
@@ -54,22 +54,23 @@ static pthread_mutex_t openssl_init_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t *amqp_openssl_lockarray = NULL;
#endif /* ENABLE_THREAD_SAFETY */
-struct amqp_ssl_socket_context {
+struct amqp_ssl_socket_t {
+ amqp_socket_t base;
BIO *bio;
SSL_CTX *ctx;
char *buffer;
size_t length;
+ amqp_boolean_t verify;
};
static ssize_t
-amqp_ssl_socket_send(AMQP_UNUSED int sockfd,
+amqp_ssl_socket_send(void *base,
const void *buf,
size_t len,
- AMQP_UNUSED int flags,
- void *user_data)
+ AMQP_UNUSED int flags)
{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
ssize_t sent;
- struct amqp_ssl_socket_context *self = user_data;
ERR_clear_error();
sent = BIO_write(self->bio, buf, len);
if (0 > sent) {
@@ -90,12 +91,11 @@ amqp_ssl_socket_send(AMQP_UNUSED int sockfd,
}
static ssize_t
-amqp_ssl_socket_writev(AMQP_UNUSED int sockfd,
+amqp_ssl_socket_writev(void *base,
const struct iovec *iov,
- int iovcnt,
- void *user_data)
+ int iovcnt)
{
- struct amqp_ssl_socket_context *self = user_data;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
ssize_t written = -1;
char *bufferp;
size_t bytes;
@@ -118,19 +118,18 @@ amqp_ssl_socket_writev(AMQP_UNUSED int sockfd,
memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
bufferp += iov[i].iov_len;
}
- written = amqp_ssl_socket_send(sockfd, self->buffer, bytes, 0, self);
+ written = amqp_ssl_socket_send(self, self->buffer, bytes, 0);
exit:
return written;
}
static ssize_t
-amqp_ssl_socket_recv(AMQP_UNUSED int sockfd,
+amqp_ssl_socket_recv(void *base,
void *buf,
size_t len,
- AMQP_UNUSED int flags,
- void *user_data)
+ AMQP_UNUSED int flags)
{
- struct amqp_ssl_socket_context *self = user_data;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
ssize_t received;
ERR_clear_error();
received = BIO_read(self->bio, buf, len);
@@ -150,157 +149,204 @@ amqp_ssl_socket_recv(AMQP_UNUSED int sockfd,
}
static int
-amqp_ssl_socket_close(int sockfd,
- void *user_data)
+amqp_ssl_socket_verify(void *base, const char *host)
{
- struct amqp_ssl_socket_context *self = user_data;
- if (self) {
- BIO_free_all(self->bio);
- SSL_CTX_free(self->ctx);
- free(self->buffer);
- free(self);
- }
- destroy_openssl();
- return 0 > sockfd ? -1 : 0;
-}
-
-static int
-amqp_ssl_socket_error(AMQP_UNUSED void *user_data)
-{
- return -1;
-}
-
-int
-amqp_open_ssl_socket(amqp_connection_state_t state,
- const char *host,
- int port,
- const char *cacert,
- const char *key,
- const char *cert)
-{
- SSL *ssl;
- X509 *peer;
- long result;
- X509_NAME *name;
- X509_NAME_ENTRY *entry;
- ASN1_STRING *entry_string;
- struct amqp_ssl_socket_context *self;
- int sockfd, status, pos, utf8_length;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
unsigned char *utf8_value = NULL, *cp, ch;
- initialize_openssl();
- self = calloc(1, sizeof(*self));
- if (!self) {
- goto error;
- }
- self->ctx = SSL_CTX_new(SSLv23_client_method());
- if (!self->ctx) {
- goto error;
- }
- status = SSL_CTX_load_verify_locations(self->ctx, cacert, NULL);
- if (1 != status) {
- goto error;
- }
- if (key && cert) {
- status = SSL_CTX_use_PrivateKey_file(self->ctx, key,
- SSL_FILETYPE_PEM);
- if (1 != status) {
- goto error;
- }
- status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
- if (1 != status) {
- goto error;
- }
- }
- self->bio = BIO_new_ssl_connect(self->ctx);
- if (!self->bio) {
- goto error;
- }
+ ASN1_STRING *entry_string;
+ X509_NAME_ENTRY *entry;
+ int pos, utf8_length;
+ X509_NAME *name;
+ X509 *peer;
+ SSL *ssl;
BIO_get_ssl(self->bio, &ssl);
- SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);
- BIO_set_conn_hostname(self->bio, host);
- BIO_set_conn_int_port(self->bio, &port);
- status = BIO_do_connect(self->bio);
- if (1 != status) {
- goto error;
- }
- result = SSL_get_verify_result(ssl);
- if (X509_V_OK != result) {
- goto error;
- }
peer = SSL_get_peer_certificate(ssl);
if (!peer) {
- goto error;
+ return -1;
}
name = X509_get_subject_name(peer);
if (!name) {
- goto error;
+ return -1;
}
pos = X509_NAME_get_index_by_NID(name, NID_commonName, -1);
if (0 > pos) {
- goto error;
+ return -1;
}
entry = X509_NAME_get_entry(name, pos);
if (!entry) {
- goto error;
+ return -1;
}
entry_string = X509_NAME_ENTRY_get_data(entry);
if (!entry_string) {
- goto error;
+ return -1;
}
utf8_length = ASN1_STRING_to_UTF8(&utf8_value, entry_string);
if (0 > utf8_length) {
- goto error;
+ return -1;
}
while (utf8_length > 0 && utf8_value[utf8_length - 1] == 0) {
--utf8_length;
}
if (utf8_length >= 256) {
- goto error;
+ return -1;
}
if ((size_t)utf8_length != strlen((char *)utf8_value)) {
- goto error;
+ return -1;
}
for (cp = utf8_value; (ch = *cp) != '\0'; ++cp) {
if (isascii(ch) && !isprint(ch)) {
- goto error;
+ return -1;
}
}
#ifdef _MSC_VER
#define strcasecmp _stricmp
#endif
if (strcasecmp(host, (char *)utf8_value)) {
- goto error;
+ return -1;
}
#ifdef _MSC_VER
#undef strcasecmp
#endif
- sockfd = BIO_get_fd(self->bio, NULL);
- amqp_set_sockfd_full(state, sockfd,
- amqp_ssl_socket_writev,
- amqp_ssl_socket_send,
- amqp_ssl_socket_recv,
- amqp_ssl_socket_close,
- amqp_ssl_socket_error,
- self);
-exit:
- OPENSSL_free(utf8_value);
- return sockfd;
+ return 0;
+}
+
+static int
+amqp_ssl_socket_open(void *base, const char *host, int port)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ long result;
+ int status;
+ SSL *ssl;
+ self->bio = BIO_new_ssl_connect(self->ctx);
+ if (!self->bio) {
+ return -1;
+ }
+ BIO_get_ssl(self->bio, &ssl);
+ SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);
+ BIO_set_conn_hostname(self->bio, host);
+ BIO_set_conn_int_port(self->bio, &port);
+ status = BIO_do_connect(self->bio);
+ if (1 != status) {
+ return -1;
+ }
+ result = SSL_get_verify_result(ssl);
+ if (X509_V_OK != result) {
+ return -1;
+ }
+ if (self->verify) {
+ int status = amqp_ssl_socket_verify(self, host);
+ if (status) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+static int
+amqp_ssl_socket_close(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ if (self) {
+ BIO_free_all(self->bio);
+ SSL_CTX_free(self->ctx);
+ free(self->buffer);
+ free(self);
+ }
+ destroy_openssl();
+ return 0;
+}
+
+static int
+amqp_ssl_socket_error(AMQP_UNUSED void *base)
+{
+ return -1;
+}
+
+static int
+amqp_ssl_socket_get_sockfd(void *base)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return BIO_get_fd(self->bio, NULL);
+}
+
+amqp_socket_t *
+amqp_ssl_socket_new(void)
+{
+ struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
+ int status;
+ if (!self) {
+ goto error;
+ }
+ status = initialize_openssl();
+ if (status) {
+ goto error;
+ }
+ self->ctx = SSL_CTX_new(SSLv23_client_method());
+ if (!self->ctx) {
+ goto error;
+ }
+ self->base.writev = amqp_ssl_socket_writev;
+ self->base.send = amqp_ssl_socket_send;
+ self->base.recv = amqp_ssl_socket_recv;
+ self->base.open = amqp_ssl_socket_open;
+ self->base.close = amqp_ssl_socket_close;
+ self->base.error = amqp_ssl_socket_error;
+ self->base.get_sockfd = amqp_ssl_socket_get_sockfd;
+ self->verify = 1;
+ return (amqp_socket_t *)self;
error:
- OPENSSL_free(utf8_value);
- amqp_ssl_socket_close(-1, self);
- sockfd = -1;
- goto exit;
+ amqp_socket_close((amqp_socket_t *)self);
+ return NULL;
+}
+
+int
+amqp_ssl_socket_set_cacert(amqp_socket_t *base,
+ const char *cacert)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ int status = SSL_CTX_load_verify_locations(self->ctx, cacert, NULL);
+ if (1 != status) {
+ return -1;
+ }
+ return 0;
+}
+
+int
+amqp_ssl_socket_set_key(amqp_socket_t *base,
+ const char *key,
+ const char *cert)
+{
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ if (key && cert) {
+ int status = SSL_CTX_use_PrivateKey_file(self->ctx, key,
+ SSL_FILETYPE_PEM);
+ if (1 != status) {
+ return -1;
+ }
+ status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
+ if (1 != status) {
+ return -1;
+ }
+ return 0;
+ }
+ return -1;
}
void
-amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize)
+amqp_ssl_socket_set_verify(amqp_socket_t *base,
+ amqp_boolean_t verify)
{
- if (!openssl_initialized)
- {
- do_initialize_openssl = do_initialize;
- }
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ self->verify = verify;
}
+void
+amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize)
+{
+ if (!openssl_initialized) {
+ do_initialize_openssl = do_initialize;
+ }
+}
#ifdef ENABLE_THREAD_SAFETY
unsigned long
diff --git a/librabbitmq/amqp-socket.h b/librabbitmq/amqp-socket.h
new file mode 100644
index 0000000..1c21da0
--- /dev/null
+++ b/librabbitmq/amqp-socket.h
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2012 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef AMQP_SOCKET_H
+#define AMQP_SOCKET_H
+
+#include "amqp.h"
+#include "socket.h"
+
+AMQP_BEGIN_DECLS
+
+/* Socket callbacks. */
+typedef ssize_t (*amqp_socket_writev_fn)(void *, const struct iovec *, int);
+typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t, int);
+typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int);
+typedef int (*amqp_socket_open_fn)(void *, const char *, int);
+typedef int (*amqp_socket_close_fn)(void *);
+typedef int (*amqp_socket_error_fn)(void *);
+typedef int (*amqp_socket_get_sockfd_fn)(void *);
+
+struct amqp_socket_t_ {
+ amqp_socket_writev_fn writev;
+ amqp_socket_send_fn send;
+ amqp_socket_recv_fn recv;
+ amqp_socket_open_fn open;
+ amqp_socket_close_fn close;
+ amqp_socket_error_fn error;
+ amqp_socket_get_sockfd_fn get_sockfd;
+};
+
+ssize_t
+amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt);
+
+ssize_t
+amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags);
+
+ssize_t
+amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags);
+
+AMQP_END_DECLS
+
+#endif /* AMQP_SOCKET_H */
diff --git a/librabbitmq/amqp-ssl.h b/librabbitmq/amqp-ssl-socket.h
index 0a1ef56..bea5b75 100644
--- a/librabbitmq/amqp-ssl.h
+++ b/librabbitmq/amqp-ssl-socket.h
@@ -31,45 +31,71 @@
#include <amqp.h>
/**
- * Open an SSL connection to an AMQP broker.
+ * Create a new SSL/TLS socket object.
*
- * This function will setup an AMQP connection state object for SSL/TLS
- * communication. The caller of this function should not use the returned
- * file descriptor as input to amqp_set_sockfd() or amqp_set_sockfd_full(),
- * or directly for network I/O.
+ * \return A new socket object or NULL if an error occurred.
+ */
+AMQP_PUBLIC_FUNCTION
+amqp_socket_t *
+AMQP_CALL
+amqp_ssl_socket_new(void);
+
+/**
+ * Set the CA certificate.
+ *
+ * \param [in,out] self An SSL/TLS socket object.
+ * \param [in] cacert Path to the CA cert file in PEM format.
*
- * If a client key or certificate file is provide then they should both be
- * provided.
+ * \return Zero if successful, -1 otherwise.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_ssl_socket_set_cacert(amqp_socket_t *self,
+ const char *cacert);
+
+/**
+ * Set the client key & certificate.
*
- * \param [in,out] state An AMQP connection state object.
- * \param [in] host The name of the host to connect to.
- * \param [in] port The port to connect on.
- * \param [in] caert Path the CA cert file in PEM format.
- * \param [in] key Path to the client key in PEM format. (may be NULL)
- * \param [in] cert Path to the client cert in PEM format. (may be NULL)
+ * \param [in,out] self An SSL/TLS socket object.
+ * \param [in] key Path to the client key in PEM format.
+ * \param [in] cert Path to the client certificate in PEM foramt.
*
- * \return A socket file-descriptor (-1 if an error occurred).
+ * \return Zero if successful, false otherwise.
*/
AMQP_PUBLIC_FUNCTION
int
-amqp_open_ssl_socket(amqp_connection_state_t state,
- const char *host,
- int port,
- const char *cacert,
- const char *key,
- const char *cert);
+AMQP_CALL
+amqp_ssl_socket_set_key(amqp_socket_t *self,
+ const char *key,
+ const char *cert);
/**
- * Sets whether rabbitmq-c initializes the underlying SSL library
+ * Enable or disable peer verification.
+ *
+ * If peer verification is enabled then the common name in the server
+ * certificate must match the server name.
*
- * For SSL libraries that require a one-time initialization across
+ * \param [in,out] self An SSL/TLS socket object.
+ * \param [in] verify Enable or disable peer verification.
+ */
+AMQP_PUBLIC_FUNCTION
+void
+AMQP_CALL
+amqp_ssl_socket_set_verify(amqp_socket_t *self,
+ amqp_boolean_t verify);
+
+/**
+ * Sets whether rabbitmq-c initializes the underlying SSL library.
+ *
+ * For SSL libraries that require a one-time initialization across
* a whole program (e.g., OpenSSL) this sets whether or not rabbitmq-c
- * will initialize the SSL library when the first call to
+ * will initialize the SSL library when the first call to
* amqp_open_ssl_socket() is made. You should call this function with
* do_init = 0 if the underlying SSL library is intialized somewhere else
- * the program.
+ * the program.
*
- * Failing to initialize or double initialization of the SSL library will
+ * Failing to initialize or double initialization of the SSL library will
* result in undefined behavior
*
* By default rabbitmq-c will initialize the underlying SSL library
@@ -77,12 +103,14 @@ amqp_open_ssl_socket(amqp_connection_state_t state,
* NOTE: calling this function after the first socket has been opened with
* amqp_open_ssl_socket() will not have any effect.
*
- * \param [in] do_initalize, if 0 rabbitmq-c will not initialize the SSL library,
- * otherwise rabbitmq-c will initialize the SSL library
- *
+ * \param [in] do_initalize If 0 rabbitmq-c will not initialize the SSL
+ * library, otherwise rabbitmq-c will initialize the
+ * SL library
+ *
*/
AMQP_PUBLIC_FUNCTION
void
+AMQP_CALL
amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize);
#endif /* AMQP_SSL_H */
diff --git a/librabbitmq/amqp-tcp-socket.c b/librabbitmq/amqp-tcp-socket.c
new file mode 100644
index 0000000..10430a9
--- /dev/null
+++ b/librabbitmq/amqp-tcp-socket.c
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2012 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "amqp_private.h"
+#include "amqp-tcp-socket.h"
+#include <stdlib.h>
+
+struct amqp_tcp_socket_t {
+ amqp_socket_t base;
+ int sockfd;
+};
+
+static ssize_t
+amqp_tcp_socket_writev(void *base, const struct iovec *iov, int iovcnt)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return amqp_os_socket_writev(self->sockfd, iov, iovcnt);
+}
+
+static ssize_t
+amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return send(self->sockfd, buf, len, flags);
+}
+
+static ssize_t
+amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return recv(self->sockfd, buf, len, flags);
+}
+
+static int
+amqp_tcp_socket_open(void *base, const char *host, int port)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ struct sockaddr_in addr;
+ int status, one = 1;
+ struct hostent *he;
+ status = amqp_socket_init();
+ if (status) {
+ return status;
+ }
+ he = gethostbyname(host);
+ if (!he) {
+ return -ERROR_GETHOSTBYNAME_FAILED;
+ }
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = *(uint32_t *)he->h_addr_list[0];
+ self->sockfd = amqp_socket_socket(PF_INET, SOCK_STREAM, 0);
+ if (-1 == self->sockfd) {
+ return -amqp_os_socket_error();
+ }
+ status = amqp_socket_setsockopt(self->sockfd, IPPROTO_TCP, TCP_NODELAY,
+ &one, sizeof(one));
+ if (0 > status) {
+ status = -amqp_os_socket_error();
+ amqp_os_socket_close(self->sockfd);
+ return status;
+ }
+ status = connect(self->sockfd, (struct sockaddr *)&addr, sizeof(addr));
+ if (0 > status) {
+ status = -amqp_os_socket_error();
+ amqp_os_socket_close(self->sockfd);
+ return status;
+ }
+ return 0;
+}
+
+static int
+amqp_tcp_socket_close(void *base)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ int status = -1;
+ if (self) {
+ status = amqp_os_socket_close(self->sockfd);
+ free(self);
+ }
+ return status;
+}
+
+static int
+amqp_tcp_socket_error(AMQP_UNUSED void *base)
+{
+ return amqp_os_socket_error();
+}
+
+static int
+amqp_tcp_socket_get_sockfd(void *base)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return self->sockfd;
+}
+
+amqp_socket_t *
+amqp_tcp_socket_new(void)
+{
+ struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
+ if (!self) {
+ return NULL;
+ }
+ self->base.writev = amqp_tcp_socket_writev;
+ self->base.send = amqp_tcp_socket_send;
+ self->base.recv = amqp_tcp_socket_recv;
+ self->base.open = amqp_tcp_socket_open;
+ self->base.close = amqp_tcp_socket_close;
+ self->base.error = amqp_tcp_socket_error;
+ self->base.get_sockfd = amqp_tcp_socket_get_sockfd;
+ self->sockfd = -1;
+ return (amqp_socket_t *)self;
+}
+
+void
+amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd)
+{
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ self->sockfd = sockfd;
+}
diff --git a/librabbitmq/amqp-tcp-socket.h b/librabbitmq/amqp-tcp-socket.h
new file mode 100644
index 0000000..6e0afa0
--- /dev/null
+++ b/librabbitmq/amqp-tcp-socket.h
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2012 Michael Steinert
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef AMQP_TCP_SOCKET_H
+#define AMQP_TCP_SOCKET_H
+
+#include <amqp.h>
+
+AMQP_BEGIN_DECLS
+
+/**
+ * Create a new TCP socket.
+ *
+ * \return A new socket object or NULL if an error occurred.
+ */
+AMQP_PUBLIC_FUNCTION
+amqp_socket_t *
+AMQP_CALL
+amqp_tcp_socket_new(void);
+
+/**
+ * Assign an open file descriptor to a socket object.
+ *
+ * This function must not be used in conjunction with amqp_socket_open().
+ *
+ * \param [in,out] self A TCP socket object.
+ * \param [in] sockfd An open socket descriptor.
+ */
+AMQP_PUBLIC_FUNCTION
+void
+AMQP_CALL
+amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd);
+
+AMQP_END_DECLS
+
+#endif /* AMQP_TCP_SOCKET_H */
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 0efbc74..734094b 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -52,7 +52,6 @@
*/
#if defined(_WIN32) && defined(_MSC_VER)
-struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern
@@ -67,7 +66,6 @@ struct iovec;
# define AMQP_CALL __cdecl
#elif defined(_WIN32) && defined(__BORLANDC__)
-struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern
@@ -82,7 +80,6 @@ struct iovec;
# define AMQP_CALL __cdecl
#elif defined(_WIN32) && defined(__MINGW32__)
-struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport)
@@ -97,7 +94,6 @@ struct iovec;
# define AMQP_CALL __cdecl
#elif defined(_WIN32) && defined(__CYGWIN__)
-struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport)
@@ -124,6 +120,13 @@ struct iovec;
# define AMQP_CALL
#endif
+#if __GNUC__ > 3 || (__GNUC__ == 3 && __GNUC_MINOR__ >= 1)
+# define AMQP_DEPRECATED \
+ __attribute__ ((__deprecated__))
+#else
+# define AMQP_DEPRECATED
+#endif
+
/* Define ssize_t on Win32/64 platforms
See: http://lists.cs.uiuc.edu/pipermail/llvmdev/2010-April/030649.html for details
*/
@@ -321,12 +324,7 @@ typedef enum amqp_sasl_method_enum_ {
/* Opaque struct. */
typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
-/* Socket callbacks. */
-typedef ssize_t (*amqp_socket_writev_fn)(int, const struct iovec *, int, void *);
-typedef ssize_t (*amqp_socket_send_fn)(int, const void *, size_t, int, void *);
-typedef ssize_t (*amqp_socket_recv_fn)(int, void *, size_t, int, void *);
-typedef int (*amqp_socket_close_fn)(int, void *);
-typedef int (*amqp_socket_error_fn)(void *);
+typedef struct amqp_socket_t_ amqp_socket_t;
AMQP_PUBLIC_FUNCTION
char const *
@@ -389,17 +387,12 @@ AMQP_CALL amqp_get_sockfd(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
void
-AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd);
+AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd)
+ AMQP_DEPRECATED;
AMQP_PUBLIC_FUNCTION
void
-amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd,
- amqp_socket_writev_fn writev_fn,
- amqp_socket_send_fn send_fn,
- amqp_socket_recv_fn recv_fn,
- amqp_socket_close_fn close_fn,
- amqp_socket_error_fn error_fn,
- void *user_data);
+AMQP_CALL amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket);
AMQP_PUBLIC_FUNCTION
int
@@ -444,7 +437,8 @@ AMQP_CALL amqp_table_entry_cmp(void const *entry1, void const *entry2);
AMQP_PUBLIC_FUNCTION
int
-AMQP_CALL amqp_open_socket(char const *hostname, int portnumber);
+AMQP_CALL amqp_open_socket(char const *hostname, int portnumber)
+ AMQP_DEPRECATED;
AMQP_PUBLIC_FUNCTION
int
@@ -599,6 +593,27 @@ AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed);
+/* socket API */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_open(amqp_socket_t *self, const char *host, int port);
+
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_close(amqp_socket_t *self);
+
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_error(amqp_socket_t *self);
+
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_get_sockfd(amqp_socket_t *self);
+
AMQP_END_DECLS
#include <amqp_framing.h>
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 6468da7..4666100 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -38,8 +38,10 @@
#include "config.h"
#endif
+#include "amqp-tcp-socket.h"
#include "amqp_private.h"
#include <assert.h>
+#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
@@ -59,18 +61,6 @@
_check_state->state); \
}
-static ssize_t amqp_socket_send(int sockfd, const void *buf, size_t len,
- int flags, AMQP_UNUSED void *user_data)
-{
- return send(sockfd, buf, len, flags);
-}
-
-static ssize_t amqp_socket_recv(int sockfd, void *buf, size_t len, int flags,
- AMQP_UNUSED void *user_data)
-{
- return recv(sockfd, buf, len, flags);
-}
-
amqp_connection_state_t amqp_new_connection(void)
{
int res;
@@ -101,7 +91,6 @@ amqp_connection_state_t amqp_new_connection(void)
is also the minimum frame size */
state->target_size = 8;
- state->sockfd = -1;
state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
if (state->sock_inbound_buffer.bytes == NULL) {
@@ -120,36 +109,24 @@ out_nomem:
int amqp_get_sockfd(amqp_connection_state_t state)
{
- return state->sockfd;
+ return amqp_socket_get_sockfd(state->socket);
}
void amqp_set_sockfd(amqp_connection_state_t state,
int sockfd)
{
- state->sockfd = sockfd;
- state->writev = amqp_socket_writev;
- state->send = amqp_socket_send;
- state->recv = amqp_socket_recv;
- state->close = amqp_socket_close;
- state->error = amqp_socket_error;
- state->user_data = NULL;
+ amqp_socket_t *socket = amqp_tcp_socket_new();
+ if (!socket) {
+ amqp_abort("%s", strerror(errno));
+ }
+ amqp_tcp_socket_set_sockfd(socket, sockfd);
+ amqp_set_socket(state, socket);
}
-void amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd,
- amqp_socket_writev_fn writev_fn,
- amqp_socket_send_fn send_fn,
- amqp_socket_recv_fn recv_fn,
- amqp_socket_close_fn close_fn,
- amqp_socket_error_fn error_fn,
- void *user_data)
+void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket)
{
- state->sockfd = sockfd;
- state->writev = writev_fn;
- state->send = send_fn;
- state->recv = recv_fn;
- state->close = close_fn;
- state->error = error_fn;
- state->user_data = user_data;
+ amqp_socket_close(state->socket);
+ state->socket = socket;
}
int amqp_tune_connection(amqp_connection_state_t state,
@@ -193,8 +170,8 @@ int amqp_destroy_connection(amqp_connection_state_t state)
empty_amqp_pool(&state->decoding_pool);
free(state->outbound_buffer.bytes);
free(state->sock_inbound_buffer.bytes);
- if (state->sockfd >= 0 && state->close(state->sockfd, state->user_data) < 0)
- status = -state->error(state->user_data);
+ if (amqp_socket_close(state->socket) < 0)
+ status = -amqp_socket_error(state->socket);
free(state);
}
return status;
@@ -425,7 +402,7 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_base = &frame_end_byte;
iov[2].iov_len = FOOTER_SIZE;
- res = state->writev(state->sockfd, iov, 3, state->user_data);
+ res = amqp_socket_writev(state->socket, iov, 3);
} else {
size_t out_frame_len;
amqp_bytes_t encoded;
@@ -473,13 +450,13 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_e32(out_frame, 3, out_frame_len);
amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
- res = state->send(state->sockfd, out_frame,
- out_frame_len + HEADER_SIZE + FOOTER_SIZE,
- MSG_NOSIGNAL, state->user_data);
+ res = amqp_socket_send(state->socket, out_frame,
+ out_frame_len + HEADER_SIZE + FOOTER_SIZE,
+ MSG_NOSIGNAL);
}
if (res < 0) {
- return -state->error(state->user_data);
+ return -amqp_socket_error(state->socket);
} else {
return 0;
}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 7c77af8..d32f664 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -92,7 +92,7 @@
char *
amqp_os_error_string(int err);
-#include "socket.h"
+#include "amqp-socket.h"
/*
* Connection states: XXX FIX THIS
@@ -150,13 +150,7 @@ struct amqp_connection_state_t_ {
amqp_bytes_t outbound_buffer;
- int sockfd;
- amqp_socket_writev_fn writev;
- amqp_socket_send_fn send;
- amqp_socket_recv_fn recv;
- amqp_socket_close_fn close;
- amqp_socket_error_fn error;
- void *user_data;
+ amqp_socket_t *socket;
amqp_bytes_t sock_inbound_buffer;
size_t sock_inbound_offset;
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index ab1674d..d7be798 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -49,6 +49,51 @@
#include <stdlib.h>
#include <string.h>
+ssize_t
+amqp_socket_writev(amqp_socket_t *self, const struct iovec *iov, int iovcnt)
+{
+ return self->writev(self, iov, iovcnt);
+}
+
+ssize_t
+amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags)
+{
+ return self->send(self, buf, len, flags);
+}
+
+ssize_t
+amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags)
+{
+ return self->recv(self, buf, len, flags);
+}
+
+int
+amqp_socket_open(amqp_socket_t *self, const char *host, int port)
+{
+ return self->open(self, host, port);
+}
+
+int
+amqp_socket_close(amqp_socket_t *self)
+{
+ if (self) {
+ return self->close(self);
+ }
+ return 0;
+}
+
+int
+amqp_socket_error(amqp_socket_t *self)
+{
+ return self->error(self);
+}
+
+int
+amqp_socket_get_sockfd(amqp_socket_t *self)
+{
+ return self->get_sockfd(self);
+}
+
int amqp_open_socket(char const *hostname,
int portnumber)
{
@@ -120,7 +165,7 @@ int amqp_send_header(amqp_connection_state_t state)
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- return state->send(state->sockfd, (void *)header, 8, 0, state->user_data);
+ return amqp_socket_send(state->socket, header, 8, 0);
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
@@ -217,13 +262,13 @@ static int wait_frame_inner(amqp_connection_state_t state,
assert(res != 0);
}
- res = state->recv(state->sockfd, state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len, 0, state->user_data);
+ res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len, 0);
if (res <= 0) {
if (res == 0) {
return -ERROR_CONNECTION_CLOSED;
} else {
- return -state->error(state->user_data);
+ return -amqp_socket_error(state->socket);
}
}
diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c
index 1f2dff8..af1015d 100644
--- a/librabbitmq/unix/socket.c
+++ b/librabbitmq/unix/socket.c
@@ -82,20 +82,20 @@ amqp_os_error_string(int err)
}
int
-amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data)
+amqp_os_socket_close(int sockfd)
{
return close(sockfd);
}
ssize_t
-amqp_socket_writev(int sockfd, const struct iovec *iov,
- int iovcnt, AMQP_UNUSED void *user_data)
+amqp_os_socket_writev(int sockfd, const struct iovec *iov,
+ int iovcnt)
{
return writev(sockfd, iov, iovcnt);
}
int
-amqp_socket_error(AMQP_UNUSED void *user_data)
+amqp_os_socket_error(void)
{
return errno | ERROR_CATEGORY_OS;
}
diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h
index af5cde8..34de1a2 100644
--- a/librabbitmq/unix/socket.h
+++ b/librabbitmq/unix/socket.h
@@ -52,17 +52,13 @@ int
amqp_socket_socket(int domain, int type, int proto);
int
-amqp_socket_error(void *user_data);
+amqp_os_socket_error(void);
int
-amqp_socket_socket(int domain, int type, int proto);
-
-int
-amqp_socket_close(int sockfd, void *user_data);
+amqp_os_socket_close(int sockfd);
ssize_t
-amqp_socket_writev(int sockfd, const struct iovec *iov, int iovcnt,
- void *user_data);
+amqp_os_socket_writev(int sockfd, const struct iovec *iov, int iovcnt);
#define amqp_socket_setsockopt setsockopt
diff --git a/tools/common.c b/tools/common.c
index 77bcb61..9738ba6 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -39,7 +39,10 @@
#endif
#include "common.h"
-#include <amqp-ssl.h>
+#ifdef WITH_SSL
+#include <amqp-ssl-socket.h>
+#endif
+#include <amqp-tcp-socket.h>
#include <errno.h>
#include <fcntl.h>
#include <stdarg.h>
@@ -220,8 +223,6 @@ struct poptOption connect_options[] = {
static void init_connection_info(struct amqp_connection_info *ci)
{
- struct amqp_connection_info defaults;
-
ci->user = NULL;
ci->password = NULL;
ci->host = NULL;
@@ -229,6 +230,8 @@ static void init_connection_info(struct amqp_connection_info *ci)
ci->vhost = NULL;
ci->user = NULL;
+ amqp_default_connection_info(ci);
+
if (amqp_url)
die_amqp_error(amqp_parse_url(strdup(amqp_url), ci),
"Parsing URL '%s'", amqp_url);
@@ -312,30 +315,12 @@ static void init_connection_info(struct amqp_connection_info *ci)
ci->vhost = amqp_vhost;
}
-
-#if WITH_SSL
- if (amqp_ssl) {
- ci->ssl = true;
- }
-#endif
-
- amqp_default_connection_info(&defaults);
-
- if (!ci->user)
- ci->user = defaults.user;
- if (!ci->password)
- ci->password = defaults.password;
- if (!ci->host)
- ci->host = defaults.host;
- if (ci->port < 0)
- ci->port = defaults.port;
- if (!ci->vhost)
- ci->vhost = defaults.vhost;
}
amqp_connection_state_t make_connection(void)
{
- int s;
+ int status;
+ amqp_socket_t *socket;
struct amqp_connection_info ci;
amqp_connection_state_t conn;
@@ -343,16 +328,30 @@ amqp_connection_state_t make_connection(void)
conn = amqp_new_connection();
if (ci.ssl) {
#ifdef WITH_SSL
- s = amqp_open_ssl_socket(conn, ci.host, ci.port, amqp_cacert,
- amqp_key, amqp_cert);
+ socket = amqp_ssl_socket_new();
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+ if (amqp_cacert) {
+ amqp_ssl_socket_set_cacert(socket, amqp_cacert);
+ }
+ if (amqp_key && amqp_cert) {
+ amqp_ssl_socket_set_key(socket, amqp_key, amqp_cert);
+ }
#else
die("librabbitmq was not built with SSL/TLS support");
#endif
} else {
- s = amqp_open_socket(ci.host, ci.port);
- amqp_set_sockfd(conn, s);
+ socket = amqp_tcp_socket_new();
+ if (!socket) {
+ die("creating TCP socket (out of memory)");
+ }
+ }
+ status = amqp_socket_open(socket, ci.host, ci.port);
+ if (status) {
+ die("opening socket to %s:%d", ci.host, ci.port);
}
- die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port);
+ amqp_set_socket(conn, socket);
die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN,
ci.user, ci.password),