diff options
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 23 |
1 files changed, 23 insertions, 0 deletions
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index e363b90..490caad 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -29,6 +29,9 @@ #include "amqp_tcp_socket.h" #include <errno.h> +#ifndef _WIN32 +# include <netinet/tcp.h> +#endif #include <stdio.h> #include <stdlib.h> @@ -38,6 +41,7 @@ struct amqp_tcp_socket_t { void *buffer; size_t buffer_length; int internal_error; + int state; }; @@ -60,6 +64,25 @@ amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags) if (flags & AMQP_SF_MORE) { flagz |= MSG_MORE; } +#elif defined(TCP_NOPUSH) + if (flags & AMQP_SF_MORE && !(self->state & AMQP_SF_MORE)) { + int one = 1; + res = setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &one, sizeof(one)); + if (0 != res) { + self->internal_error = res; + return AMQP_STATUS_SOCKET_ERROR; + } + self->state |= AMQP_SF_MORE; + } else if (!(flags & AMQP_SF_MORE) && self->state & AMQP_SF_MORE) { + int zero = 0; + res = setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &zero, sizeof(&zero)); + if (0 != res) { + self->internal_error = res; + res = AMQP_STATUS_SOCKET_ERROR; + } else { + self->state &= ~AMQP_SF_MORE; + } + } #endif start: |