diff options
Diffstat (limited to 'librabbitmq/amqp_tcp_socket.c')
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 84 |
1 files changed, 36 insertions, 48 deletions
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index f0b63b3..12e02cd 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -29,14 +29,14 @@ #include <errno.h> #if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__))) -# ifndef WIN32_LEAN_AND_MEAN -# define WIN32_LEAN_AND_MEAN -# endif -# include <winsock2.h> +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif +#include <winsock2.h> #else -# include <sys/socket.h> -# include <netinet/in.h> -# include <netinet/tcp.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <sys/socket.h> #endif #include <stdio.h> #include <stdlib.h> @@ -48,10 +48,8 @@ struct amqp_tcp_socket_t { int state; }; - -static ssize_t -amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags) -{ +static ssize_t amqp_tcp_socket_send(void *base, const void *buf, size_t len, + int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t res; int flagz = 0; @@ -68,8 +66,8 @@ amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags) if (flags & AMQP_SF_MORE) { flagz |= MSG_MORE; } - /* Cygwin defines TCP_NOPUSH, but trying to use it will return not - * implemented. Disable it here. */ +/* Cygwin defines TCP_NOPUSH, but trying to use it will return not + * implemented. Disable it here. */ #elif defined(TCP_NOPUSH) && !defined(__CYGWIN__) if (flags & AMQP_SF_MORE && !(self->state & AMQP_SF_MORE)) { int one = 1; @@ -80,14 +78,15 @@ amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags) } self->state |= AMQP_SF_MORE; } else if (!(flags & AMQP_SF_MORE) && self->state & AMQP_SF_MORE) { - int zero = 0; - res = setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &zero, sizeof(&zero)); - if (0 != res) { - self->internal_error = res; - res = AMQP_STATUS_SOCKET_ERROR; - } else { - self->state &= ~AMQP_SF_MORE; - } + int zero = 0; + res = + setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &zero, sizeof(&zero)); + if (0 != res) { + self->internal_error = res; + res = AMQP_STATUS_SOCKET_ERROR; + } else { + self->state &= ~AMQP_SF_MORE; + } } #endif @@ -123,9 +122,8 @@ start: return res; } -static ssize_t -amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags) -{ +static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len, + int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t ret; if (-1 == self->sockfd) { @@ -164,9 +162,8 @@ start: return ret; } -static int -amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout) -{ +static int amqp_tcp_socket_open(void *base, const char *host, int port, + struct timeval *timeout) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; if (-1 != self->sockfd) { return AMQP_STATUS_SOCKET_INUSE; @@ -180,9 +177,8 @@ amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *tim return AMQP_STATUS_OK; } -static int -amqp_tcp_socket_close(void *base, AMQP_UNUSED amqp_socket_close_enum force) -{ +static int amqp_tcp_socket_close(void *base, + AMQP_UNUSED amqp_socket_close_enum force) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; if (-1 == self->sockfd) { return AMQP_STATUS_SOCKET_CLOSED; @@ -196,16 +192,12 @@ amqp_tcp_socket_close(void *base, AMQP_UNUSED amqp_socket_close_enum force) return AMQP_STATUS_OK; } -static int -amqp_tcp_socket_get_sockfd(void *base) -{ +static int amqp_tcp_socket_get_sockfd(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; return self->sockfd; } -static void -amqp_tcp_socket_delete(void *base) -{ +static void amqp_tcp_socket_delete(void *base) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; if (self) { @@ -215,17 +207,15 @@ amqp_tcp_socket_delete(void *base) } static const struct amqp_socket_class_t amqp_tcp_socket_class = { - amqp_tcp_socket_send, /* send */ - amqp_tcp_socket_recv, /* recv */ - amqp_tcp_socket_open, /* open */ - amqp_tcp_socket_close, /* close */ - amqp_tcp_socket_get_sockfd, /* get_sockfd */ - amqp_tcp_socket_delete /* delete */ + amqp_tcp_socket_send, /* send */ + amqp_tcp_socket_recv, /* recv */ + amqp_tcp_socket_open, /* open */ + amqp_tcp_socket_close, /* close */ + amqp_tcp_socket_get_sockfd, /* get_sockfd */ + amqp_tcp_socket_delete /* delete */ }; -amqp_socket_t * -amqp_tcp_socket_new(amqp_connection_state_t state) -{ +amqp_socket_t *amqp_tcp_socket_new(amqp_connection_state_t state) { struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); if (!self) { return NULL; @@ -238,9 +228,7 @@ amqp_tcp_socket_new(amqp_connection_state_t state) return (amqp_socket_t *)self; } -void -amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) -{ +void amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) { struct amqp_tcp_socket_t *self; if (base->klass != &amqp_tcp_socket_class) { amqp_abort("<%p> is not of type amqp_tcp_socket_t", base); |