summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_tcp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_tcp_socket.c')
-rw-r--r--librabbitmq/amqp_tcp_socket.c84
1 files changed, 36 insertions, 48 deletions
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index f0b63b3..12e02cd 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -29,14 +29,14 @@
#include <errno.h>
#if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
-# ifndef WIN32_LEAN_AND_MEAN
-# define WIN32_LEAN_AND_MEAN
-# endif
-# include <winsock2.h>
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <winsock2.h>
#else
-# include <sys/socket.h>
-# include <netinet/in.h>
-# include <netinet/tcp.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
#endif
#include <stdio.h>
#include <stdlib.h>
@@ -48,10 +48,8 @@ struct amqp_tcp_socket_t {
int state;
};
-
-static ssize_t
-amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags)
-{
+static ssize_t amqp_tcp_socket_send(void *base, const void *buf, size_t len,
+ int flags) {
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t res;
int flagz = 0;
@@ -68,8 +66,8 @@ amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags)
if (flags & AMQP_SF_MORE) {
flagz |= MSG_MORE;
}
- /* Cygwin defines TCP_NOPUSH, but trying to use it will return not
- * implemented. Disable it here. */
+/* Cygwin defines TCP_NOPUSH, but trying to use it will return not
+ * implemented. Disable it here. */
#elif defined(TCP_NOPUSH) && !defined(__CYGWIN__)
if (flags & AMQP_SF_MORE && !(self->state & AMQP_SF_MORE)) {
int one = 1;
@@ -80,14 +78,15 @@ amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags)
}
self->state |= AMQP_SF_MORE;
} else if (!(flags & AMQP_SF_MORE) && self->state & AMQP_SF_MORE) {
- int zero = 0;
- res = setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &zero, sizeof(&zero));
- if (0 != res) {
- self->internal_error = res;
- res = AMQP_STATUS_SOCKET_ERROR;
- } else {
- self->state &= ~AMQP_SF_MORE;
- }
+ int zero = 0;
+ res =
+ setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &zero, sizeof(&zero));
+ if (0 != res) {
+ self->internal_error = res;
+ res = AMQP_STATUS_SOCKET_ERROR;
+ } else {
+ self->state &= ~AMQP_SF_MORE;
+ }
}
#endif
@@ -123,9 +122,8 @@ start:
return res;
}
-static ssize_t
-amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags)
-{
+static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len,
+ int flags) {
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t ret;
if (-1 == self->sockfd) {
@@ -164,9 +162,8 @@ start:
return ret;
}
-static int
-amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout)
-{
+static int amqp_tcp_socket_open(void *base, const char *host, int port,
+ struct timeval *timeout) {
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
if (-1 != self->sockfd) {
return AMQP_STATUS_SOCKET_INUSE;
@@ -180,9 +177,8 @@ amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *tim
return AMQP_STATUS_OK;
}
-static int
-amqp_tcp_socket_close(void *base, AMQP_UNUSED amqp_socket_close_enum force)
-{
+static int amqp_tcp_socket_close(void *base,
+ AMQP_UNUSED amqp_socket_close_enum force) {
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
@@ -196,16 +192,12 @@ amqp_tcp_socket_close(void *base, AMQP_UNUSED amqp_socket_close_enum force)
return AMQP_STATUS_OK;
}
-static int
-amqp_tcp_socket_get_sockfd(void *base)
-{
+static int amqp_tcp_socket_get_sockfd(void *base) {
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
return self->sockfd;
}
-static void
-amqp_tcp_socket_delete(void *base)
-{
+static void amqp_tcp_socket_delete(void *base) {
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
if (self) {
@@ -215,17 +207,15 @@ amqp_tcp_socket_delete(void *base)
}
static const struct amqp_socket_class_t amqp_tcp_socket_class = {
- amqp_tcp_socket_send, /* send */
- amqp_tcp_socket_recv, /* recv */
- amqp_tcp_socket_open, /* open */
- amqp_tcp_socket_close, /* close */
- amqp_tcp_socket_get_sockfd, /* get_sockfd */
- amqp_tcp_socket_delete /* delete */
+ amqp_tcp_socket_send, /* send */
+ amqp_tcp_socket_recv, /* recv */
+ amqp_tcp_socket_open, /* open */
+ amqp_tcp_socket_close, /* close */
+ amqp_tcp_socket_get_sockfd, /* get_sockfd */
+ amqp_tcp_socket_delete /* delete */
};
-amqp_socket_t *
-amqp_tcp_socket_new(amqp_connection_state_t state)
-{
+amqp_socket_t *amqp_tcp_socket_new(amqp_connection_state_t state) {
struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
if (!self) {
return NULL;
@@ -238,9 +228,7 @@ amqp_tcp_socket_new(amqp_connection_state_t state)
return (amqp_socket_t *)self;
}
-void
-amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd)
-{
+void amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) {
struct amqp_tcp_socket_t *self;
if (base->klass != &amqp_tcp_socket_class) {
amqp_abort("<%p> is not of type amqp_tcp_socket_t", base);