summaryrefslogtreecommitdiff
path: root/librabbitmq
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq')
-rw-r--r--librabbitmq/amqp.h95
-rw-r--r--librabbitmq/amqp_connection.c67
-rw-r--r--librabbitmq/amqp_private.h11
-rw-r--r--librabbitmq/amqp_socket.c28
-rw-r--r--librabbitmq/unix/socket.c30
-rw-r--r--librabbitmq/unix/socket.h21
-rw-r--r--librabbitmq/win32/socket.c17
-rw-r--r--librabbitmq/win32/socket.h4
8 files changed, 189 insertions, 84 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index c8d0127..287a55f 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -48,6 +48,7 @@
*/
#if defined(_WIN32) && defined(_MSC_VER)
+struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern
@@ -62,6 +63,7 @@
# define AMQP_CALL __cdecl
#elif defined(_WIN32) && defined(__BORLANDC__)
+struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern
@@ -76,6 +78,7 @@
# define AMQP_CALL __cdecl
#elif defined(_WIN32) && defined(__MINGW32__)
+struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport)
@@ -90,6 +93,7 @@
# define AMQP_CALL __cdecl
#elif defined(_WIN32) && defined(__CYGWIN__)
+struct iovec;
# if defined(AMQP_BUILD) && !defined(AMQP_STATIC)
# define AMQP_PUBLIC_FUNCTION __declspec(dllexport)
# define AMQP_PUBLIC_VARIABLE __declspec(dllexport)
@@ -104,6 +108,7 @@
# define AMQP_CALL __cdecl
#elif defined(__GNUC__) && __GNUC__ >= 4
+# include <sys/uio.h>
# define AMQP_PUBLIC_FUNCTION \
__attribute__ ((visibility ("default")))
# define AMQP_PUBLIC_VARIABLE \
@@ -293,6 +298,13 @@ typedef enum amqp_sasl_method_enum_ {
/* Opaque struct. */
typedef struct amqp_connection_state_t_ *amqp_connection_state_t;
+/* Socket callbacks. */
+typedef ssize_t (*amqp_socket_writev_fn)(int, const struct iovec *, int, void *);
+typedef ssize_t (*amqp_socket_send_fn)(int, const void *, size_t, int, void *);
+typedef ssize_t (*amqp_socket_recv_fn)(int, void *, size_t, int, void *);
+typedef int (*amqp_socket_close_fn)(int, void *);
+typedef int (*amqp_socket_error_fn)(void *);
+
AMQP_PUBLIC_FUNCTION
char const *
AMQP_CALL amqp_version(void);
@@ -357,6 +369,16 @@ void
AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd);
AMQP_PUBLIC_FUNCTION
+void
+amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd,
+ amqp_socket_writev_fn writev_fn,
+ amqp_socket_send_fn send_fn,
+ amqp_socket_recv_fn recv_fn,
+ amqp_socket_close_fn close_fn,
+ amqp_socket_error_fn error_fn,
+ void *user_data);
+
+AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_tune_connection(amqp_connection_state_t state,
int channel_max,
@@ -374,8 +396,8 @@ AMQP_CALL amqp_destroy_connection(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_handle_input(amqp_connection_state_t state,
- amqp_bytes_t received_data,
- amqp_frame_t *decoded_frame);
+ amqp_bytes_t received_data,
+ amqp_frame_t *decoded_frame);
AMQP_PUBLIC_FUNCTION
amqp_boolean_t
@@ -412,37 +434,37 @@ AMQP_CALL amqp_frames_enqueued(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_simple_wait_frame(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame);
+ amqp_frame_t *decoded_frame);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state,
- amqp_channel_t expected_channel,
- amqp_method_number_t expected_method,
- amqp_method_t *output);
+ amqp_channel_t expected_channel,
+ amqp_method_number_t expected_method,
+ amqp_method_t *output);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_send_method(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t id,
- void *decoded);
+ amqp_channel_t channel,
+ amqp_method_number_t id,
+ void *decoded);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_simple_rpc(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t *expected_reply_ids,
- void *decoded_request_method);
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t *expected_reply_ids,
+ void *decoded_request_method);
AMQP_PUBLIC_FUNCTION
void *
AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method);
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method);
/*
* The API methods corresponding to most synchronous AMQP methods
@@ -463,24 +485,27 @@ AMQP_CALL amqp_get_rpc_reply(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
-AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost,
- int channel_max, int frame_max, int heartbeat,
- amqp_sasl_method_enum sasl_method, ...);
+AMQP_CALL
+amqp_login(amqp_connection_state_t state, char const *vhost,
+ int channel_max, int frame_max, int heartbeat,
+ amqp_sasl_method_enum sasl_method, ...);
struct amqp_basic_properties_t_;
AMQP_PUBLIC_FUNCTION
int
-AMQP_CALL amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
- amqp_bytes_t exchange, amqp_bytes_t routing_key,
- amqp_boolean_t mandatory, amqp_boolean_t immediate,
- struct amqp_basic_properties_t_ const *properties,
- amqp_bytes_t body);
+AMQP_CALL
+amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
+ amqp_bytes_t exchange, amqp_bytes_t routing_key,
+ amqp_boolean_t mandatory, amqp_boolean_t immediate,
+ struct amqp_basic_properties_t_ const *properties,
+ amqp_bytes_t body);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
-AMQP_CALL amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel,
- int code);
+AMQP_CALL amqp_channel_close(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ int code);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
@@ -489,17 +514,19 @@ AMQP_CALL amqp_connection_close(amqp_connection_state_t state, int code);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,
- uint64_t delivery_tag, amqp_boolean_t multiple);
+ uint64_t delivery_tag, amqp_boolean_t multiple);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel,
- amqp_bytes_t queue, amqp_boolean_t no_ack);
+ amqp_bytes_t queue, amqp_boolean_t no_ack);
AMQP_PUBLIC_FUNCTION
int
-AMQP_CALL amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel,
- uint64_t delivery_tag, amqp_boolean_t requeue);
+AMQP_CALL amqp_basic_reject(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ uint64_t delivery_tag,
+ amqp_boolean_t requeue);
/*
* Can be used to see if there is data still in the buffer, if so
@@ -525,7 +552,7 @@ AMQP_CALL amqp_error_string(int err);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_decode_table(amqp_bytes_t encoded, amqp_pool_t *pool,
- amqp_table_t *output, size_t *offset);
+ amqp_table_t *output, size_t *offset);
AMQP_PUBLIC_FUNCTION
int
@@ -540,11 +567,11 @@ struct amqp_connection_info {
};
AMQP_PUBLIC_FUNCTION
-void
+void
AMQP_CALL amqp_default_connection_info(struct amqp_connection_info *parsed);
AMQP_PUBLIC_FUNCTION
-int
+int
AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed);
AMQP_END_DECLS
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 561d496..20b56b8 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -55,6 +55,18 @@
_check_state->state); \
}
+static ssize_t amqp_socket_send(int sockfd, const void *buf, size_t len,
+ int flags, AMQP_UNUSED void *user_data)
+{
+ return send(sockfd, buf, len, flags);
+}
+
+static ssize_t amqp_socket_recv(int sockfd, void *buf, size_t len, int flags,
+ AMQP_UNUSED void *user_data)
+{
+ return recv(sockfd, buf, len, flags);
+}
+
amqp_connection_state_t amqp_new_connection(void) {
int res;
amqp_connection_state_t state =
@@ -105,6 +117,29 @@ void amqp_set_sockfd(amqp_connection_state_t state,
int sockfd)
{
state->sockfd = sockfd;
+ state->writev = amqp_socket_writev;
+ state->send = amqp_socket_send;
+ state->recv = amqp_socket_recv;
+ state->close = amqp_socket_close;
+ state->error = amqp_socket_error;
+ state->user_data = NULL;
+}
+
+void amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd,
+ amqp_socket_writev_fn writev_fn,
+ amqp_socket_send_fn send_fn,
+ amqp_socket_recv_fn recv_fn,
+ amqp_socket_close_fn close_fn,
+ amqp_socket_error_fn error_fn,
+ void *user_data)
+{
+ state->sockfd = sockfd;
+ state->writev = writev_fn;
+ state->send = send_fn;
+ state->recv = recv_fn;
+ state->close = close_fn;
+ state->error = error_fn;
+ state->user_data = user_data;
}
int amqp_tune_connection(amqp_connection_state_t state,
@@ -140,18 +175,17 @@ int amqp_get_channel_max(amqp_connection_state_t state) {
}
int amqp_destroy_connection(amqp_connection_state_t state) {
- int s = state->sockfd;
-
- empty_amqp_pool(&state->frame_pool);
- empty_amqp_pool(&state->decoding_pool);
- free(state->outbound_buffer.bytes);
- free(state->sock_inbound_buffer.bytes);
- free(state);
-
- if (s >= 0 && amqp_socket_close(s) < 0)
- return -amqp_socket_error();
- else
- return 0;
+ int status = 0;
+ if (state) {
+ empty_amqp_pool(&state->frame_pool);
+ empty_amqp_pool(&state->decoding_pool);
+ free(state->outbound_buffer.bytes);
+ free(state->sock_inbound_buffer.bytes);
+ if (state->sockfd >= 0 && state->close(state->sockfd, state->user_data) < 0)
+ status = -state->error(state->user_data);
+ free(state);
+ }
+ return status;
}
static void return_to_idle(amqp_connection_state_t state) {
@@ -365,7 +399,7 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_base = &frame_end_byte;
iov[2].iov_len = FOOTER_SIZE;
- res = amqp_socket_writev(state->sockfd, iov, 3);
+ res = state->writev(state->sockfd, iov, 3, state->user_data);
}
else {
size_t out_frame_len;
@@ -412,12 +446,13 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_e32(out_frame, 3, out_frame_len);
amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
- res = send(state->sockfd, out_frame,
- out_frame_len + HEADER_SIZE + FOOTER_SIZE, 0);
+ res = state->send(state->sockfd, out_frame,
+ out_frame_len + HEADER_SIZE + FOOTER_SIZE,
+ 0, state->user_data);
}
if (res < 0)
- return -amqp_socket_error();
+ return -state->error(state->user_data);
else
return 0;
}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index bbee792..192cbba 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -39,6 +39,7 @@
#include "amqp.h"
#include "amqp_framing.h"
+#include <arpa/inet.h>
#include <string.h>
/* Error numbering: Because of differences in error numbering on
@@ -67,8 +68,11 @@
#if __GNUC__ > 2 | (__GNUC__ == 2 && __GNUC_MINOR__ > 4)
#define AMQP_NORETURN \
__attribute__ ((__noreturn__))
+#define AMQP_UNUSED \
+ __attribute__ ((__unused__))
#else
#define AMQP_NORETURN
+#define AMQP_UNUSED
#endif
#if __GNUC__ >= 4
@@ -140,6 +144,13 @@ struct amqp_connection_state_t_ {
amqp_bytes_t outbound_buffer;
int sockfd;
+ amqp_socket_writev_fn writev;
+ amqp_socket_send_fn send;
+ amqp_socket_recv_fn recv;
+ amqp_socket_close_fn close;
+ amqp_socket_error_fn error;
+ void *user_data;
+
amqp_bytes_t sock_inbound_buffer;
size_t sock_inbound_offset;
size_t sock_inbound_limit;
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 6b9486c..2025549 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -35,12 +35,16 @@
#endif
#include "amqp_private.h"
-#include <stdlib.h>
+#include <assert.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <stdarg.h>
+#include <stdint.h>
#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
-#include <stdint.h>
-#include <stdarg.h>
-#include <assert.h>
+#include <sys/socket.h>
int amqp_open_socket(char const *hostname,
int portnumber)
@@ -62,16 +66,16 @@ int amqp_open_socket(char const *hostname,
addr.sin_port = htons(portnumber);
addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0];
- sockfd = socket(PF_INET, SOCK_STREAM, 0);
+ sockfd = amqp_socket_socket(PF_INET, SOCK_STREAM, 0);
if (sockfd == -1)
- return -amqp_socket_error();
+ return -amqp_socket_error(NULL);
if (amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one,
sizeof(one)) < 0
|| connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
{
- res = -amqp_socket_error();
- amqp_socket_close(sockfd);
+ res = -amqp_socket_error(NULL);
+ amqp_socket_close(sockfd, NULL);
return res;
}
@@ -83,7 +87,7 @@ int amqp_send_header(amqp_connection_state_t state) {
AMQP_PROTOCOL_VERSION_MAJOR,
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION };
- return send(state->sockfd, (void *)header, 8, 0);
+ return state->send(state->sockfd, (void *)header, 8, 0, state->user_data);
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
@@ -173,13 +177,13 @@ static int wait_frame_inner(amqp_connection_state_t state,
assert(res != 0);
}
- res = recv(state->sockfd, state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len, 0);
+ res = state->recv(state->sockfd, state->sock_inbound_buffer.bytes,
+ state->sock_inbound_buffer.len, 0, state->user_data);
if (res <= 0) {
if (res == 0)
return -ERROR_CONNECTION_CLOSED;
else
- return -amqp_socket_error();
+ return -state->error(state->user_data);
}
state->sock_inbound_limit = res;
diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c
index cb8a2b9..0a15f2b 100644
--- a/librabbitmq/unix/socket.c
+++ b/librabbitmq/unix/socket.c
@@ -36,6 +36,7 @@
#include "amqp_private.h"
#include "socket.h"
+#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdlib.h>
@@ -51,12 +52,7 @@ amqp_socket_init(void)
}
int
-amqp_socket_error(void)
-{
- return errno | ERROR_CATEGORY_OS;
-}
-
-int amqp_socket_socket(int domain, int type, int proto)
+amqp_socket_socket(int domain, int type, int proto)
{
int flags;
@@ -77,7 +73,27 @@ int amqp_socket_socket(int domain, int type, int proto)
return s;
}
-char *amqp_os_error_string(int err)
+char *
+amqp_os_error_string(int err)
{
return strdup(strerror(err));
}
+
+int
+amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data)
+{
+ return close(sockfd);
+}
+
+int
+amqp_socket_writev(int sockfd, const struct iovec *iov,
+ int iovcnt, AMQP_UNUSED void *user_data)
+{
+ return writev(sockfd, iov, iovcnt);
+}
+
+int
+amqp_socket_error(AMQP_UNUSED void *user_data)
+{
+ return errno | ERROR_CATEGORY_OS;
+}
diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h
index ff6fa73..051cc72 100644
--- a/librabbitmq/unix/socket.h
+++ b/librabbitmq/unix/socket.h
@@ -33,14 +33,7 @@
* ***** END LICENSE BLOCK *****
*/
-#include <errno.h>
-#include <netdb.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <sys/socket.h>
-#include <sys/types.h>
#include <sys/uio.h>
-#include <unistd.h>
int
amqp_socket_init(void);
@@ -49,10 +42,18 @@ int
amqp_socket_socket(int domain, int type, int proto);
int
-amqp_socket_error(void);
+amqp_socket_error(void *user_data);
+
+int
+amqp_socket_socket(int domain, int type, int proto);
+
+int
+amqp_socket_close(int sockfd, void *user_data);
+
+int
+amqp_socket_writev(int sockfd, const struct iovec *iov, int iovcnt,
+ void *user_data);
#define amqp_socket_setsockopt setsockopt
-#define amqp_socket_close close
-#define amqp_socket_writev writev
#endif
diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c
index 43b919b..06db54e 100644
--- a/librabbitmq/win32/socket.c
+++ b/librabbitmq/win32/socket.c
@@ -45,7 +45,8 @@
static int called_wsastartup;
-int amqp_socket_init(void)
+int
+amqp_socket_init(void)
{
if (!called_wsastartup) {
WSADATA data;
@@ -59,7 +60,8 @@ int amqp_socket_init(void)
return 0;
}
-char *amqp_os_error_string(int err)
+char *
+amqp_os_error_string(int err)
{
char *msg, *copy;
@@ -85,7 +87,14 @@ amqp_socket_setsockopt(int sock, int level, int optname,
}
int
-amqp_socket_writev(int sock, struct iovec *iov, int nvecs)
+amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data)
+{
+ return closesocket(sockfd);
+}
+
+int
+amqp_socket_writev(int sock, struct iovec *iov, int nvecs,
+ AMQP_UNUSED void *user_data)
{
DWORD ret;
if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0)
@@ -95,7 +104,7 @@ amqp_socket_writev(int sock, struct iovec *iov, int nvecs)
}
int
-amqp_socket_error(void)
+amqp_socket_error(AMQP_UNUSED void *user_data)
{
return WSAGetLastError() | ERROR_CATEGORY_OS;
}
diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h
index 4572410..4353658 100644
--- a/librabbitmq/win32/socket.h
+++ b/librabbitmq/win32/socket.h
@@ -33,6 +33,7 @@
* ***** END LICENSE BLOCK *****
*/
+#include "amqp_private.h"
#include <winsock2.h>
/* same as WSABUF */
@@ -45,7 +46,8 @@ int
amqp_socket_init(void);
#define amqp_socket_socket socket
-#define amqp_socket_close closesocket
+int
+amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data);
int
amqp_socket_setsockopt(int sock, int level, int optname, const void *optval,