summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Steinert <mike.steinert@gmail.com>2012-05-10 13:28:17 -0600
committerAlan Antonuk <alan.antonuk@gmail.com>2013-04-09 15:18:39 -0700
commit795c1240c9fb09c42bcdc45d5a8d44e6a406ee9c (patch)
treece8b060d7ee08435a6c7b668f56ccf5b7b460b69
parent21b124e2fd2f1c343fb37b708f393d1b9580cfad (diff)
downloadrabbitmq-c-github-ask-795c1240c9fb09c42bcdc45d5a8d44e6a406ee9c.tar.gz
Add plumbing for SSL/TLS support
This change abstracts out the networking functions so that the user can provide an SSL/TLS implementation. Callback functions replace `writev()`, `send()`, and `recv()` (there is also a callback for error reporting). The default interface remains unchanged. If the user wants to create a SSL/TLS connection they first negotiate the connection and then use the new function `amqp_set_sockfd_full()` to provide the networking implementation for their SSL/TLS library. The user may provide an optional pointer to data that is passed through to the networking functions. Signed-off-by: Michael Steinert <mike.steinert@gmail.com>
-rw-r--r--librabbitmq/amqp.h22
-rw-r--r--librabbitmq/amqp_connection.c66
-rw-r--r--librabbitmq/amqp_private.h11
-rw-r--r--librabbitmq/amqp_socket.c20
-rw-r--r--librabbitmq/unix/socket.c30
-rw-r--r--librabbitmq/unix/socket.h15
-rw-r--r--librabbitmq/win32/socket.c17
-rw-r--r--librabbitmq/win32/socket.h4
-rw-r--r--tools/common.c1
9 files changed, 145 insertions, 41 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 184ef51..d1bfe4e 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -52,6 +52,7 @@
*/
#if defined(_WIN32) && defined(_MSC_VER)
+struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern
@@ -66,6 +67,7 @@
# define AMQP_CALL __cdecl
#elif defined(_WIN32) && defined(__BORLANDC__)
+struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern
@@ -80,6 +82,7 @@
# define AMQP_CALL __cdecl
#elif defined(_WIN32) && defined(__MINGW32__)
+struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport)
@@ -94,6 +97,7 @@
# define AMQP_CALL __cdecl
#elif defined(_WIN32) && defined(__CYGWIN__)
+struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport)
@@ -108,6 +112,7 @@
# define AMQP_CALL __cdecl
#elif defined(__GNUC__) && __GNUC__ >= 4
+# include <sys/uio.h>
# define AMQP_PUBLIC_FUNCTION \
__attribute__ ((visibility ("default")))
# define AMQP_PUBLIC_VARIABLE \
@@ -297,6 +302,13 @@ typedef enum amqp_sasl_method_enum_ {
/* Opaque struct. */
typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
+/* Socket callbacks. */
+typedef ssize_t (*amqp_socket_writev_fn)(int, const struct iovec *, int, void *);
+typedef ssize_t (*amqp_socket_send_fn)(int, const void *, size_t, int, void *);
+typedef ssize_t (*amqp_socket_recv_fn)(int, void *, size_t, int, void *);
+typedef int (*amqp_socket_close_fn)(int, void *);
+typedef int (*amqp_socket_error_fn)(void *);
+
AMQP_PUBLIC_FUNCTION
char const *
AMQP_CALL amqp_version(void);
@@ -361,6 +373,16 @@ void
AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd);
AMQP_PUBLIC_FUNCTION
+void
+amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd,
+ amqp_socket_writev_fn writev_fn,
+ amqp_socket_send_fn send_fn,
+ amqp_socket_recv_fn recv_fn,
+ amqp_socket_close_fn close_fn,
+ amqp_socket_error_fn error_fn,
+ void *user_data);
+
+AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_tune_connection(amqp_connection_state_t state,
int channel_max,
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index b3196b1..6468da7 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -59,6 +59,18 @@
_check_state->state); \
}
+static ssize_t amqp_socket_send(int sockfd, const void *buf, size_t len,
+ int flags, AMQP_UNUSED void *user_data)
+{
+ return send(sockfd, buf, len, flags);
+}
+
+static ssize_t amqp_socket_recv(int sockfd, void *buf, size_t len, int flags,
+ AMQP_UNUSED void *user_data)
+{
+ return recv(sockfd, buf, len, flags);
+}
+
amqp_connection_state_t amqp_new_connection(void)
{
int res;
@@ -115,6 +127,29 @@ void amqp_set_sockfd(amqp_connection_state_t state,
int sockfd)
{
state->sockfd = sockfd;
+ state->writev = amqp_socket_writev;
+ state->send = amqp_socket_send;
+ state->recv = amqp_socket_recv;
+ state->close = amqp_socket_close;
+ state->error = amqp_socket_error;
+ state->user_data = NULL;
+}
+
+void amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd,
+ amqp_socket_writev_fn writev_fn,
+ amqp_socket_send_fn send_fn,
+ amqp_socket_recv_fn recv_fn,
+ amqp_socket_close_fn close_fn,
+ amqp_socket_error_fn error_fn,
+ void *user_data)
+{
+ state->sockfd = sockfd;
+ state->writev = writev_fn;
+ state->send = send_fn;
+ state->recv = recv_fn;
+ state->close = close_fn;
+ state->error = error_fn;
+ state->user_data = user_data;
}
int amqp_tune_connection(amqp_connection_state_t state,
@@ -152,19 +187,17 @@ int amqp_get_channel_max(amqp_connection_state_t state)
int amqp_destroy_connection(amqp_connection_state_t state)
{
- int s = state->sockfd;
-
- empty_amqp_pool(&state->frame_pool);
- empty_amqp_pool(&state->decoding_pool);
- free(state->outbound_buffer.bytes);
- free(state->sock_inbound_buffer.bytes);
- free(state);
-
- if (s >= 0 && amqp_socket_close(s) < 0) {
- return -amqp_socket_error();
- } else {
- return 0;
+ int status = 0;
+ if (state) {
+ empty_amqp_pool(&state->frame_pool);
+ empty_amqp_pool(&state->decoding_pool);
+ free(state->outbound_buffer.bytes);
+ free(state->sock_inbound_buffer.bytes);
+ if (state->sockfd >= 0 && state->close(state->sockfd, state->user_data) < 0)
+ status = -state->error(state->user_data);
+ free(state);
}
+ return status;
}
static void return_to_idle(amqp_connection_state_t state)
@@ -392,7 +425,7 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_base = &frame_end_byte;
iov[2].iov_len = FOOTER_SIZE;
- res = amqp_socket_writev(state->sockfd, iov, 3);
+ res = state->writev(state->sockfd, iov, 3, state->user_data);
} else {
size_t out_frame_len;
amqp_bytes_t encoded;
@@ -440,12 +473,13 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_e32(out_frame, 3, out_frame_len);
amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
- res = send(state->sockfd, out_frame,
- out_frame_len + HEADER_SIZE + FOOTER_SIZE, MSG_NOSIGNAL);
+ res = state->send(state->sockfd, out_frame,
+ out_frame_len + HEADER_SIZE + FOOTER_SIZE,
+ MSG_NOSIGNAL, state->user_data);
}
if (res < 0) {
- return -amqp_socket_error();
+ return -state->error(state->user_data);
} else {
return 0;
}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 68239d1..b28d7bb 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -43,6 +43,7 @@
#include "amqp.h"
#include "amqp_framing.h"
+#include <arpa/inet.h>
#include <string.h>
/* Error numbering: Because of differences in error numbering on
@@ -71,8 +72,11 @@
#if __GNUC__ > 2 | (__GNUC__ == 2 && __GNUC_MINOR__ > 4)
#define AMQP_NORETURN \
__attribute__ ((__noreturn__))
+#define AMQP_UNUSED \
+ __attribute__ ((__unused__))
#else
#define AMQP_NORETURN
+#define AMQP_UNUSED
#endif
#if __GNUC__ >= 4
@@ -144,6 +148,13 @@ struct amqp_connection_state_t_ {
amqp_bytes_t outbound_buffer;
int sockfd;
+ amqp_socket_writev_fn writev;
+ amqp_socket_send_fn send;
+ amqp_socket_recv_fn recv;
+ amqp_socket_close_fn close;
+ amqp_socket_error_fn error;
+ void *user_data;
+
amqp_bytes_t sock_inbound_buffer;
size_t sock_inbound_offset;
size_t sock_inbound_limit;
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 22cc2c5..f57a512 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -39,12 +39,16 @@
#endif
#include "amqp_private.h"
-#include <stdlib.h>
+#include <assert.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <stdarg.h>
+#include <stdint.h>
#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
-#include <stdint.h>
-#include <stdarg.h>
-#include <assert.h>
+#include <sys/socket.h>
int amqp_open_socket(char const *hostname,
int portnumber)
@@ -117,7 +121,7 @@ int amqp_send_header(amqp_connection_state_t state)
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- return send(state->sockfd, (void *)header, 8, MSG_NOSIGNAL);
+ return state->send(state->sockfd, (void *)header, 8, 0, state->user_data);
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
@@ -214,13 +218,13 @@ static int wait_frame_inner(amqp_connection_state_t state,
assert(res != 0);
}
- res = recv(state->sockfd, state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len, 0);
+ res = state->recv(state->sockfd, state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len, 0, state->user_data);
if (res <= 0) {
if (res == 0) {
return -ERROR_CONNECTION_CLOSED;
} else {
- return -amqp_socket_error();
+ return -state->error(state->user_data);
}
}
diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c
index 38064f1..f6414d2 100644
--- a/librabbitmq/unix/socket.c
+++ b/librabbitmq/unix/socket.c
@@ -40,6 +40,7 @@
#include "amqp_private.h"
#include "socket.h"
+#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdlib.h>
@@ -52,12 +53,7 @@ amqp_socket_init(void)
}
int
-amqp_socket_error(void)
-{
- return errno | ERROR_CATEGORY_OS;
-}
-
-int amqp_socket_socket(int domain, int type, int proto)
+amqp_socket_socket(int domain, int type, int proto)
{
int flags;
@@ -79,7 +75,27 @@ int amqp_socket_socket(int domain, int type, int proto)
return s;
}
-char *amqp_os_error_string(int err)
+char *
+amqp_os_error_string(int err)
{
return strdup(strerror(err));
}
+
+int
+amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data)
+{
+ return close(sockfd);
+}
+
+int
+amqp_socket_writev(int sockfd, const struct iovec *iov,
+ int iovcnt, AMQP_UNUSED void *user_data)
+{
+ return writev(sockfd, iov, iovcnt);
+}
+
+int
+amqp_socket_error(AMQP_UNUSED void *user_data)
+{
+ return errno | ERROR_CATEGORY_OS;
+}
diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h
index 1382ca3..9c37930 100644
--- a/librabbitmq/unix/socket.h
+++ b/librabbitmq/unix/socket.h
@@ -44,7 +44,6 @@
#include <sys/socket.h>
#include <netdb.h>
#include <sys/uio.h>
-#include <unistd.h>
int
amqp_socket_init(void);
@@ -53,11 +52,19 @@ int
amqp_socket_socket(int domain, int type, int proto);
int
-amqp_socket_error(void);
+amqp_socket_error(void *user_data);
+
+int
+amqp_socket_socket(int domain, int type, int proto);
+
+int
+amqp_socket_close(int sockfd, void *user_data);
+
+int
+amqp_socket_writev(int sockfd, const struct iovec *iov, int iovcnt,
+ void *user_data);
#define amqp_socket_setsockopt setsockopt
-#define amqp_socket_close close
-#define amqp_socket_writev writev
#if defined(SO_NOSIGPIPE) && !defined(MSG_NOSIGNAL)
# define DISABLE_SIGPIPE_WITH_SETSOCKOPT
diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c
index a5d9454..7c6c230 100644
--- a/librabbitmq/win32/socket.c
+++ b/librabbitmq/win32/socket.c
@@ -49,7 +49,8 @@
static int called_wsastartup;
-int amqp_socket_init(void)
+int
+amqp_socket_init(void)
{
if (!called_wsastartup) {
WSADATA data;
@@ -64,7 +65,8 @@ int amqp_socket_init(void)
return 0;
}
-char *amqp_os_error_string(int err)
+char *
+amqp_os_error_string(int err)
{
char *msg, *copy;
@@ -91,7 +93,14 @@ amqp_socket_setsockopt(int sock, int level, int optname,
}
int
-amqp_socket_writev(int sock, struct iovec *iov, int nvecs)
+amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data)
+{
+ return closesocket(sockfd);
+}
+
+int
+amqp_socket_writev(int sock, struct iovec *iov, int nvecs,
+ AMQP_UNUSED void *user_data)
{
DWORD ret;
if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) {
@@ -102,7 +111,7 @@ amqp_socket_writev(int sock, struct iovec *iov, int nvecs)
}
int
-amqp_socket_error(void)
+amqp_socket_error(AMQP_UNUSED void *user_data)
{
return WSAGetLastError() | ERROR_CATEGORY_OS;
}
diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h
index cea361d..a47532e 100644
--- a/librabbitmq/win32/socket.h
+++ b/librabbitmq/win32/socket.h
@@ -37,6 +37,7 @@
* ***** END LICENSE BLOCK *****
*/
+#include "amqp_private.h"
#include <winsock2.h>
#include <WS2tcpip.h>
@@ -50,7 +51,8 @@ int
amqp_socket_init(void);
#define amqp_socket_socket socket
-#define amqp_socket_close closesocket
+int
+amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data);
int
amqp_socket_setsockopt(int sock, int level, int optname, const void *optval,
diff --git a/tools/common.c b/tools/common.c
index 7659521..46e0a26 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -39,7 +39,6 @@
#endif
/* needed for asnprintf */
-#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>