diff options
-rw-r--r-- | librabbitmq/amqp.h | 22 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 66 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 11 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 20 | ||||
-rw-r--r-- | librabbitmq/unix/socket.c | 30 | ||||
-rw-r--r-- | librabbitmq/unix/socket.h | 15 | ||||
-rw-r--r-- | librabbitmq/win32/socket.c | 17 | ||||
-rw-r--r-- | librabbitmq/win32/socket.h | 4 | ||||
-rw-r--r-- | tools/common.c | 1 |
9 files changed, 145 insertions, 41 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 184ef51..d1bfe4e 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -52,6 +52,7 @@ */ #if defined(_WIN32) && defined(_MSC_VER) +struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern @@ -66,6 +67,7 @@ # define AMQP_CALL __cdecl #elif defined(_WIN32) && defined(__BORLANDC__) +struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) extern @@ -80,6 +82,7 @@ # define AMQP_CALL __cdecl #elif defined(_WIN32) && defined(__MINGW32__) +struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) @@ -94,6 +97,7 @@ # define AMQP_CALL __cdecl #elif defined(_WIN32) && defined(__CYGWIN__) +struct iovec; # if defined(AMQP_BUILD) && !defined(AMQP_STATIC) # define AMQP_PUBLIC_FUNCTION __declspec(dllexport) # define AMQP_PUBLIC_VARIABLE __declspec(dllexport) @@ -108,6 +112,7 @@ # define AMQP_CALL __cdecl #elif defined(__GNUC__) && __GNUC__ >= 4 +# include <sys/uio.h> # define AMQP_PUBLIC_FUNCTION \ __attribute__ ((visibility ("default"))) # define AMQP_PUBLIC_VARIABLE \ @@ -297,6 +302,13 @@ typedef enum amqp_sasl_method_enum_ { /* Opaque struct. */ typedef struct amqp_connection_state_t_ *amqp_connection_state_t; +/* Socket callbacks. */ +typedef ssize_t (*amqp_socket_writev_fn)(int, const struct iovec *, int, void *); +typedef ssize_t (*amqp_socket_send_fn)(int, const void *, size_t, int, void *); +typedef ssize_t (*amqp_socket_recv_fn)(int, void *, size_t, int, void *); +typedef int (*amqp_socket_close_fn)(int, void *); +typedef int (*amqp_socket_error_fn)(void *); + AMQP_PUBLIC_FUNCTION char const * AMQP_CALL amqp_version(void); @@ -361,6 +373,16 @@ void AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd); AMQP_PUBLIC_FUNCTION +void +amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd, + amqp_socket_writev_fn writev_fn, + amqp_socket_send_fn send_fn, + amqp_socket_recv_fn recv_fn, + amqp_socket_close_fn close_fn, + amqp_socket_error_fn error_fn, + void *user_data); + +AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_tune_connection(amqp_connection_state_t state, int channel_max, diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index b3196b1..6468da7 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -59,6 +59,18 @@ _check_state->state); \ } +static ssize_t amqp_socket_send(int sockfd, const void *buf, size_t len, + int flags, AMQP_UNUSED void *user_data) +{ + return send(sockfd, buf, len, flags); +} + +static ssize_t amqp_socket_recv(int sockfd, void *buf, size_t len, int flags, + AMQP_UNUSED void *user_data) +{ + return recv(sockfd, buf, len, flags); +} + amqp_connection_state_t amqp_new_connection(void) { int res; @@ -115,6 +127,29 @@ void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { state->sockfd = sockfd; + state->writev = amqp_socket_writev; + state->send = amqp_socket_send; + state->recv = amqp_socket_recv; + state->close = amqp_socket_close; + state->error = amqp_socket_error; + state->user_data = NULL; +} + +void amqp_set_sockfd_full(amqp_connection_state_t state, int sockfd, + amqp_socket_writev_fn writev_fn, + amqp_socket_send_fn send_fn, + amqp_socket_recv_fn recv_fn, + amqp_socket_close_fn close_fn, + amqp_socket_error_fn error_fn, + void *user_data) +{ + state->sockfd = sockfd; + state->writev = writev_fn; + state->send = send_fn; + state->recv = recv_fn; + state->close = close_fn; + state->error = error_fn; + state->user_data = user_data; } int amqp_tune_connection(amqp_connection_state_t state, @@ -152,19 +187,17 @@ int amqp_get_channel_max(amqp_connection_state_t state) int amqp_destroy_connection(amqp_connection_state_t state) { - int s = state->sockfd; - - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); - free(state->outbound_buffer.bytes); - free(state->sock_inbound_buffer.bytes); - free(state); - - if (s >= 0 && amqp_socket_close(s) < 0) { - return -amqp_socket_error(); - } else { - return 0; + int status = 0; + if (state) { + empty_amqp_pool(&state->frame_pool); + empty_amqp_pool(&state->decoding_pool); + free(state->outbound_buffer.bytes); + free(state->sock_inbound_buffer.bytes); + if (state->sockfd >= 0 && state->close(state->sockfd, state->user_data) < 0) + status = -state->error(state->user_data); + free(state); } + return status; } static void return_to_idle(amqp_connection_state_t state) @@ -392,7 +425,7 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_base = &frame_end_byte; iov[2].iov_len = FOOTER_SIZE; - res = amqp_socket_writev(state->sockfd, iov, 3); + res = state->writev(state->sockfd, iov, 3, state->user_data); } else { size_t out_frame_len; amqp_bytes_t encoded; @@ -440,12 +473,13 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_e32(out_frame, 3, out_frame_len); amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); - res = send(state->sockfd, out_frame, - out_frame_len + HEADER_SIZE + FOOTER_SIZE, MSG_NOSIGNAL); + res = state->send(state->sockfd, out_frame, + out_frame_len + HEADER_SIZE + FOOTER_SIZE, + MSG_NOSIGNAL, state->user_data); } if (res < 0) { - return -amqp_socket_error(); + return -state->error(state->user_data); } else { return 0; } diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 68239d1..b28d7bb 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -43,6 +43,7 @@ #include "amqp.h" #include "amqp_framing.h" +#include <arpa/inet.h> #include <string.h> /* Error numbering: Because of differences in error numbering on @@ -71,8 +72,11 @@ #if __GNUC__ > 2 | (__GNUC__ == 2 && __GNUC_MINOR__ > 4) #define AMQP_NORETURN \ __attribute__ ((__noreturn__)) +#define AMQP_UNUSED \ + __attribute__ ((__unused__)) #else #define AMQP_NORETURN +#define AMQP_UNUSED #endif #if __GNUC__ >= 4 @@ -144,6 +148,13 @@ struct amqp_connection_state_t_ { amqp_bytes_t outbound_buffer; int sockfd; + amqp_socket_writev_fn writev; + amqp_socket_send_fn send; + amqp_socket_recv_fn recv; + amqp_socket_close_fn close; + amqp_socket_error_fn error; + void *user_data; + amqp_bytes_t sock_inbound_buffer; size_t sock_inbound_offset; size_t sock_inbound_limit; diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 22cc2c5..f57a512 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -39,12 +39,16 @@ #endif #include "amqp_private.h" -#include <stdlib.h> +#include <assert.h> +#include <netdb.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <stdarg.h> +#include <stdint.h> #include <stdio.h> +#include <stdlib.h> #include <string.h> -#include <stdint.h> -#include <stdarg.h> -#include <assert.h> +#include <sys/socket.h> int amqp_open_socket(char const *hostname, int portnumber) @@ -117,7 +121,7 @@ int amqp_send_header(amqp_connection_state_t state) AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - return send(state->sockfd, (void *)header, 8, MSG_NOSIGNAL); + return state->send(state->sockfd, (void *)header, 8, 0, state->user_data); } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) @@ -214,13 +218,13 @@ static int wait_frame_inner(amqp_connection_state_t state, assert(res != 0); } - res = recv(state->sockfd, state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len, 0); + res = state->recv(state->sockfd, state->sock_inbound_buffer.bytes, + state->sock_inbound_buffer.len, 0, state->user_data); if (res <= 0) { if (res == 0) { return -ERROR_CONNECTION_CLOSED; } else { - return -amqp_socket_error(); + return -state->error(state->user_data); } } diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c index 38064f1..f6414d2 100644 --- a/librabbitmq/unix/socket.c +++ b/librabbitmq/unix/socket.c @@ -40,6 +40,7 @@ #include "amqp_private.h" #include "socket.h" +#include <errno.h> #include <fcntl.h> #include <stdint.h> #include <stdlib.h> @@ -52,12 +53,7 @@ amqp_socket_init(void) } int -amqp_socket_error(void) -{ - return errno | ERROR_CATEGORY_OS; -} - -int amqp_socket_socket(int domain, int type, int proto) +amqp_socket_socket(int domain, int type, int proto) { int flags; @@ -79,7 +75,27 @@ int amqp_socket_socket(int domain, int type, int proto) return s; } -char *amqp_os_error_string(int err) +char * +amqp_os_error_string(int err) { return strdup(strerror(err)); } + +int +amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data) +{ + return close(sockfd); +} + +int +amqp_socket_writev(int sockfd, const struct iovec *iov, + int iovcnt, AMQP_UNUSED void *user_data) +{ + return writev(sockfd, iov, iovcnt); +} + +int +amqp_socket_error(AMQP_UNUSED void *user_data) +{ + return errno | ERROR_CATEGORY_OS; +} diff --git a/librabbitmq/unix/socket.h b/librabbitmq/unix/socket.h index 1382ca3..9c37930 100644 --- a/librabbitmq/unix/socket.h +++ b/librabbitmq/unix/socket.h @@ -44,7 +44,6 @@ #include <sys/socket.h> #include <netdb.h> #include <sys/uio.h> -#include <unistd.h> int amqp_socket_init(void); @@ -53,11 +52,19 @@ int amqp_socket_socket(int domain, int type, int proto); int -amqp_socket_error(void); +amqp_socket_error(void *user_data); + +int +amqp_socket_socket(int domain, int type, int proto); + +int +amqp_socket_close(int sockfd, void *user_data); + +int +amqp_socket_writev(int sockfd, const struct iovec *iov, int iovcnt, + void *user_data); #define amqp_socket_setsockopt setsockopt -#define amqp_socket_close close -#define amqp_socket_writev writev #if defined(SO_NOSIGPIPE) && !defined(MSG_NOSIGNAL) # define DISABLE_SIGPIPE_WITH_SETSOCKOPT diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c index a5d9454..7c6c230 100644 --- a/librabbitmq/win32/socket.c +++ b/librabbitmq/win32/socket.c @@ -49,7 +49,8 @@ static int called_wsastartup; -int amqp_socket_init(void) +int +amqp_socket_init(void) { if (!called_wsastartup) { WSADATA data; @@ -64,7 +65,8 @@ int amqp_socket_init(void) return 0; } -char *amqp_os_error_string(int err) +char * +amqp_os_error_string(int err) { char *msg, *copy; @@ -91,7 +93,14 @@ amqp_socket_setsockopt(int sock, int level, int optname, } int -amqp_socket_writev(int sock, struct iovec *iov, int nvecs) +amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data) +{ + return closesocket(sockfd); +} + +int +amqp_socket_writev(int sock, struct iovec *iov, int nvecs, + AMQP_UNUSED void *user_data) { DWORD ret; if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) { @@ -102,7 +111,7 @@ amqp_socket_writev(int sock, struct iovec *iov, int nvecs) } int -amqp_socket_error(void) +amqp_socket_error(AMQP_UNUSED void *user_data) { return WSAGetLastError() | ERROR_CATEGORY_OS; } diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h index cea361d..a47532e 100644 --- a/librabbitmq/win32/socket.h +++ b/librabbitmq/win32/socket.h @@ -37,6 +37,7 @@ * ***** END LICENSE BLOCK ***** */ +#include "amqp_private.h" #include <winsock2.h> #include <WS2tcpip.h> @@ -50,7 +51,8 @@ int amqp_socket_init(void); #define amqp_socket_socket socket -#define amqp_socket_close closesocket +int +amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data); int amqp_socket_setsockopt(int sock, int level, int optname, const void *optval, diff --git a/tools/common.c b/tools/common.c index 7659521..46e0a26 100644 --- a/tools/common.c +++ b/tools/common.c @@ -39,7 +39,6 @@ #endif /* needed for asnprintf */ -#define _GNU_SOURCE #include <stdio.h> #include <stdlib.h> #include <stdarg.h> |