diff options
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/amqp.h | 95 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 67 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 11 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 28 | ||||
-rw-r--r-- | librabbitmq/unix/socket.c | 30 | ||||
-rw-r--r-- | librabbitmq/unix/socket.h | 21 | ||||
-rw-r--r-- | librabbitmq/win32/socket.c | 17 | ||||
-rw-r--r-- | librabbitmq/win32/socket.h | 4 |
8 files changed, 189 insertions, 84 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index c8d0127..287a55f 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -48,6 +48,7 @@ */ #if defined(_WIN32) && defined(_MSC_VER) +struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern @@ -62,6 +63,7 @@ # define AMQP_CALL __cdecl #elif defined(_WIN32) && defined(__BORLANDC__) +struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern @@ -76,6 +78,7 @@ # define AMQP_CALL __cdecl #elif defined(_WIN32) && defined(__MINGW32__) +struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) @@ -90,6 +93,7 @@ # define AMQP_CALL __cdecl #elif defined(_WIN32) && defined(__CYGWIN__) +struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) @@ -104,6 +108,7 @@ # define AMQP_CALL __cdecl #elif defined(__GNUC__) && __GNUC__ >= 4 +# include <sys/uio.h> # define AMQP_PUBLIC_FUNCTION \ __attribute__ ((visibility ("default"))) # define AMQP_PUBLIC_VARIABLE \ @@ -293,6 +298,13 @@ typedef enum amqp_sasl_method_enum_ { /* Opaque struct. */ typedef struct amqp_connection_state_t_ *amqp_connection_state_t; +/* Socket callbacks. */ +typedef ssize_t (*amqp_socket_writev_fn)(int, const struct iovec *, int, void *); +typedef ssize_t (*amqp_socket_send_fn)(int, const void *, size_t, int, void *); +typedef ssize_t (*amqp_socket_recv_fn)(int, void *, size_t, int, void *); +typedef int (*amqp_socket_close_fn)(int, void *); +typedef int (*amqp_socket_error_fn)(void *); + AMQP_PUBLIC_FUNCTION char const * AMQP_CALL amqp_version(void); @@ -357,6 +369,16 @@ void AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd); AMQP_PUBLIC_FUNCTION +void +amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd, + amqp_socket_writev_fn writev_fn, + amqp_socket_send_fn send_fn, + amqp_socket_recv_fn recv_fn, + amqp_socket_close_fn close_fn, + amqp_socket_error_fn error_fn, + void *user_data); + +AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_tune_connection(amqp_connection_state_t state, int channel_max, @@ -374,8 +396,8 @@ AMQP_CALL amqp_destroy_connection(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_handle_input(amqp_connection_state_t state, - amqp_bytes_t received_data, - amqp_frame_t *decoded_frame); + amqp_bytes_t received_data, + amqp_frame_t *decoded_frame); AMQP_PUBLIC_FUNCTION amqp_boolean_t @@ -412,37 +434,37 @@ AMQP_CALL amqp_frames_enqueued(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_simple_wait_frame(amqp_connection_state_t state, - amqp_frame_t *decoded_frame); + amqp_frame_t *decoded_frame); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state, - amqp_channel_t expected_channel, - amqp_method_number_t expected_method, - amqp_method_t *output); + amqp_channel_t expected_channel, + amqp_method_number_t expected_method, + amqp_method_t *output); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_send_method(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t id, - void *decoded); + amqp_channel_t channel, + amqp_method_number_t id, + void *decoded); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_simple_rpc(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t *expected_reply_ids, - void *decoded_request_method); + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t *expected_reply_ids, + void *decoded_request_method); AMQP_PUBLIC_FUNCTION void * AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method); + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method); /* * The API methods corresponding to most synchronous AMQP methods @@ -463,24 +485,27 @@ AMQP_CALL amqp_get_rpc_reply(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t -AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost, - int channel_max, int frame_max, int heartbeat, - amqp_sasl_method_enum sasl_method, ...); +AMQP_CALL +amqp_login(amqp_connection_state_t state, char const *vhost, + int channel_max, int frame_max, int heartbeat, + amqp_sasl_method_enum sasl_method, ...); struct amqp_basic_properties_t_; AMQP_PUBLIC_FUNCTION int -AMQP_CALL amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, - amqp_bytes_t exchange, amqp_bytes_t routing_key, - amqp_boolean_t mandatory, amqp_boolean_t immediate, - struct amqp_basic_properties_t_ const *properties, - amqp_bytes_t body); +AMQP_CALL +amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, + amqp_bytes_t exchange, amqp_bytes_t routing_key, + amqp_boolean_t mandatory, amqp_boolean_t immediate, + struct amqp_basic_properties_t_ const *properties, + amqp_bytes_t body); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t -AMQP_CALL amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel, - int code); +AMQP_CALL amqp_channel_close(amqp_connection_state_t state, + amqp_channel_t channel, + int code); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t @@ -489,17 +514,19 @@ AMQP_CALL amqp_connection_close(amqp_connection_state_t state, int code); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, - uint64_t delivery_tag, amqp_boolean_t multiple); + uint64_t delivery_tag, amqp_boolean_t multiple); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel, - amqp_bytes_t queue, amqp_boolean_t no_ack); + amqp_bytes_t queue, amqp_boolean_t no_ack); AMQP_PUBLIC_FUNCTION int -AMQP_CALL amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel, - uint64_t delivery_tag, amqp_boolean_t requeue); +AMQP_CALL amqp_basic_reject(amqp_connection_state_t state, + amqp_channel_t channel, + uint64_t delivery_tag, + amqp_boolean_t requeue); /* * Can be used to see if there is data still in the buffer, if so @@ -525,7 +552,7 @@ AMQP_CALL amqp_error_string(int err); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_decode_table(amqp_bytes_t encoded, amqp_pool_t *pool, - amqp_table_t *output, size_t *offset); + amqp_table_t *output, size_t *offset); AMQP_PUBLIC_FUNCTION int @@ -540,11 +567,11 @@ struct amqp_connection_info { }; AMQP_PUBLIC_FUNCTION -void +void AMQP_CALL amqp_default_connection_info(struct amqp_connection_info *parsed); AMQP_PUBLIC_FUNCTION -int +int AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed); AMQP_END_DECLS diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 561d496..20b56b8 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -55,6 +55,18 @@ _check_state->state); \ } +static ssize_t amqp_socket_send(int sockfd, const void *buf, size_t len, + int flags, AMQP_UNUSED void *user_data) +{ + return send(sockfd, buf, len, flags); +} + +static ssize_t amqp_socket_recv(int sockfd, void *buf, size_t len, int flags, + AMQP_UNUSED void *user_data) +{ + return recv(sockfd, buf, len, flags); +} + amqp_connection_state_t amqp_new_connection(void) { int res; amqp_connection_state_t state = @@ -105,6 +117,29 @@ void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { state->sockfd = sockfd; + state->writev = amqp_socket_writev; + state->send = amqp_socket_send; + state->recv = amqp_socket_recv; + state->close = amqp_socket_close; + state->error = amqp_socket_error; + state->user_data = NULL; +} + +void amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd, + amqp_socket_writev_fn writev_fn, + amqp_socket_send_fn send_fn, + amqp_socket_recv_fn recv_fn, + amqp_socket_close_fn close_fn, + amqp_socket_error_fn error_fn, + void *user_data) +{ + state->sockfd = sockfd; + state->writev = writev_fn; + state->send = send_fn; + state->recv = recv_fn; + state->close = close_fn; + state->error = error_fn; + state->user_data = user_data; } int amqp_tune_connection(amqp_connection_state_t state, @@ -140,18 +175,17 @@ int amqp_get_channel_max(amqp_connection_state_t state) { } int amqp_destroy_connection(amqp_connection_state_t state) { - int s = state->sockfd; - - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); - free(state->outbound_buffer.bytes); - free(state->sock_inbound_buffer.bytes); - free(state); - - if (s >= 0 && amqp_socket_close(s) < 0) - return -amqp_socket_error(); - else - return 0; + int status = 0; + if (state) { + empty_amqp_pool(&state->frame_pool); + empty_amqp_pool(&state->decoding_pool); + free(state->outbound_buffer.bytes); + free(state->sock_inbound_buffer.bytes); + if (state->sockfd >= 0 && state->close(state->sockfd, state->user_data) < 0) + status = -state->error(state->user_data); + free(state); + } + return status; } static void return_to_idle(amqp_connection_state_t state) { @@ -365,7 +399,7 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_base = &frame_end_byte; iov[2].iov_len = FOOTER_SIZE; - res = amqp_socket_writev(state->sockfd, iov, 3); + res = state->writev(state->sockfd, iov, 3, state->user_data); } else { size_t out_frame_len; @@ -412,12 +446,13 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_e32(out_frame, 3, out_frame_len); amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); - res = send(state->sockfd, out_frame, - out_frame_len + HEADER_SIZE + FOOTER_SIZE, 0); + res = state->send(state->sockfd, out_frame, + out_frame_len + HEADER_SIZE + FOOTER_SIZE, + 0, state->user_data); } if (res < 0) - return -amqp_socket_error(); + return -state->error(state->user_data); else return 0; } diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index bbee792..192cbba 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -39,6 +39,7 @@ #include "amqp.h" #include "amqp_framing.h" +#include <arpa/inet.h> #include <string.h> /* Error numbering: Because of differences in error numbering on @@ -67,8 +68,11 @@ #if __GNUC__ > 2 | (__GNUC__ == 2 && __GNUC_MINOR__ > 4) #define AMQP_NORETURN \ __attribute__ ((__noreturn__)) +#define AMQP_UNUSED \ + __attribute__ ((__unused__)) #else #define AMQP_NORETURN +#define AMQP_UNUSED #endif #if __GNUC__ >= 4 @@ -140,6 +144,13 @@ struct amqp_connection_state_t_ { amqp_bytes_t outbound_buffer; int sockfd; + amqp_socket_writev_fn writev; + amqp_socket_send_fn send; + amqp_socket_recv_fn recv; + amqp_socket_close_fn close; + amqp_socket_error_fn error; + void *user_data; + amqp_bytes_t sock_inbound_buffer; size_t sock_inbound_offset; size_t sock_inbound_limit; diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 6b9486c..2025549 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -35,12 +35,16 @@ #endif #include "amqp_private.h" -#include <stdlib.h> +#include <assert.h> +#include <netdb.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <stdarg.h> +#include <stdint.h> #include <stdio.h> +#include <stdlib.h> #include <string.h> -#include <stdint.h> -#include <stdarg.h> -#include <assert.h> +#include <sys/socket.h> int amqp_open_socket(char const *hostname, int portnumber) @@ -62,16 +66,16 @@ int amqp_open_socket(char const *hostname, addr.sin_port = htons(portnumber); addr.sin_addr.s_addr = * (uint32_t *) he->h_addr_list[0]; - sockfd = socket(PF_INET, SOCK_STREAM, 0); + sockfd = amqp_socket_socket(PF_INET, SOCK_STREAM, 0); if (sockfd == -1) - return -amqp_socket_error(); + return -amqp_socket_error(NULL); if (amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) < 0 || connect(sockfd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { - res = -amqp_socket_error(); - amqp_socket_close(sockfd); + res = -amqp_socket_error(NULL); + amqp_socket_close(sockfd, NULL); return res; } @@ -83,7 +87,7 @@ int amqp_send_header(amqp_connection_state_t state) { AMQP_PROTOCOL_VERSION_MAJOR, AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - return send(state->sockfd, (void *)header, 8, 0); + return state->send(state->sockfd, (void *)header, 8, 0, state->user_data); } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { @@ -173,13 +177,13 @@ static int wait_frame_inner(amqp_connection_state_t state, assert(res != 0); } - res = recv(state->sockfd, state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len, 0); + res = state->recv(state->sockfd, state->sock_inbound_buffer.bytes, + state->sock_inbound_buffer.len, 0, state->user_data); if (res <= 0) { if (res == 0) return -ERROR_CONNECTION_CLOSED; else - return -amqp_socket_error(); + return -state->error(state->user_data); } state->sock_inbound_limit = res; diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c index cb8a2b9..0a15f2b 100644 --- a/librabbitmq/unix/socket.c +++ b/librabbitmq/unix/socket.c @@ -36,6 +36,7 @@ #include "amqp_private.h" #include "socket.h" +#include <errno.h> #include <fcntl.h> #include <stdint.h> #include <stdlib.h> @@ -51,12 +52,7 @@ amqp_socket_init(void) } int -amqp_socket_error(void) -{ - return errno | ERROR_CATEGORY_OS; -} - -int amqp_socket_socket(int domain, int type, int proto) +amqp_socket_socket(int domain, int type, int proto) { int flags; @@ -77,7 +73,27 @@ int amqp_socket_socket(int domain, int type, int proto) return s; } -char *amqp_os_error_string(int err) +char * +amqp_os_error_string(int err) { return strdup(strerror(err)); } + +int +amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data) +{ + return close(sockfd); +} + +int +amqp_socket_writev(int sockfd, const struct iovec *iov, + int iovcnt, AMQP_UNUSED void *user_data) +{ + return writev(sockfd, iov, iovcnt); +} + +int +amqp_socket_error(AMQP_UNUSED void *user_data) +{ + return errno | ERROR_CATEGORY_OS; +} diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h index ff6fa73..051cc72 100644 --- a/librabbitmq/unix/socket.h +++ b/librabbitmq/unix/socket.h @@ -33,14 +33,7 @@ * ***** END LICENSE BLOCK ***** */ -#include <errno.h> -#include <netdb.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <sys/socket.h> -#include <sys/types.h> #include <sys/uio.h> -#include <unistd.h> int amqp_socket_init(void); @@ -49,10 +42,18 @@ int amqp_socket_socket(int domain, int type, int proto); int -amqp_socket_error(void); +amqp_socket_error(void *user_data); + +int +amqp_socket_socket(int domain, int type, int proto); + +int +amqp_socket_close(int sockfd, void *user_data); + +int +amqp_socket_writev(int sockfd, const struct iovec *iov, int iovcnt, + void *user_data); #define amqp_socket_setsockopt setsockopt -#define amqp_socket_close close -#define amqp_socket_writev writev #endif diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c index 43b919b..06db54e 100644 --- a/librabbitmq/win32/socket.c +++ b/librabbitmq/win32/socket.c @@ -45,7 +45,8 @@ static int called_wsastartup; -int amqp_socket_init(void) +int +amqp_socket_init(void) { if (!called_wsastartup) { WSADATA data; @@ -59,7 +60,8 @@ int amqp_socket_init(void) return 0; } -char *amqp_os_error_string(int err) +char * +amqp_os_error_string(int err) { char *msg, *copy; @@ -85,7 +87,14 @@ amqp_socket_setsockopt(int sock, int level, int optname, } int -amqp_socket_writev(int sock, struct iovec *iov, int nvecs) +amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data) +{ + return closesocket(sockfd); +} + +int +amqp_socket_writev(int sock, struct iovec *iov, int nvecs, + AMQP_UNUSED void *user_data) { DWORD ret; if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) @@ -95,7 +104,7 @@ amqp_socket_writev(int sock, struct iovec *iov, int nvecs) } int -amqp_socket_error(void) +amqp_socket_error(AMQP_UNUSED void *user_data) { return WSAGetLastError() | ERROR_CATEGORY_OS; } diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h index 4572410..4353658 100644 --- a/librabbitmq/win32/socket.h +++ b/librabbitmq/win32/socket.h @@ -33,6 +33,7 @@ * ***** END LICENSE BLOCK ***** */ +#include "amqp_private.h" #include <winsock2.h> /* same as WSABUF */ @@ -45,7 +46,8 @@ int amqp_socket_init(void); #define amqp_socket_socket socket -#define amqp_socket_close closesocket +int +amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data); int amqp_socket_setsockopt(int sock, int level, int optname, const void *optval, |