diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2015-05-23 23:56:51 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2015-05-31 22:57:38 -0700 |
commit | 2bc1f9b1b03c217512ce7026f76976cdcb8cc17e (patch) | |
tree | ec9d45fc68e1f1dedb570b1694df34738c8cbb04 | |
parent | ed363ad6db549e9d3fa57f7ca761032eecb2a1a1 (diff) | |
download | rabbitmq-c-2bc1f9b1b03c217512ce7026f76976cdcb8cc17e.tar.gz |
lib: use MSG_MORE on Linux for basic.publish
As a performance optimization in the send path, add an AMQP_SF_MORE flag
indicating that more data is intended to be sent, and that packets shouldn't be
sent out on the wire unless there is a full packet's worth of data available.
Use this to specify MSG_MORE to send() on Linux.
-rw-r--r-- | librabbitmq/amqp_api.c | 10 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 14 | ||||
-rw-r--r-- | librabbitmq/amqp_openssl.c | 7 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 2 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 26 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 15 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 16 |
7 files changed, 60 insertions, 30 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 7f29eee..83d18fa 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -212,7 +212,8 @@ int amqp_basic_publish(amqp_connection_state_t state, } } - res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m); + res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m, + AMQP_SF_MORE); if (res < 0) { return res; } @@ -228,7 +229,7 @@ int amqp_basic_publish(amqp_connection_state_t state, f.payload.properties.body_size = body.len; f.payload.properties.decoded = (void *) properties; - res = amqp_send_frame(state, &f); + res = amqp_send_frame_inner(state, &f, AMQP_SF_MORE); if (res < 0) { return res; } @@ -236,6 +237,7 @@ int amqp_basic_publish(amqp_connection_state_t state, body_offset = 0; while (body_offset < body.len) { size_t remaining = body.len - body_offset; + int flagz; if (remaining == 0) { break; @@ -246,12 +248,14 @@ int amqp_basic_publish(amqp_connection_state_t state, f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset); if (remaining >= usable_body_payload_size) { f.payload.body_fragment.len = usable_body_payload_size; + flagz = AMQP_SF_MORE; } else { f.payload.body_fragment.len = remaining; + flagz = AMQP_SF_NONE; } body_offset += f.payload.body_fragment.len; - res = amqp_send_frame(state, &f); + res = amqp_send_frame_inner(state, &f, flagz); if (res < 0) { return res; } diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index d2a0d41..2b8c86a 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -533,13 +533,21 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer, } int amqp_send_frame(amqp_connection_state_t state, - const amqp_frame_t *frame) -{ + const amqp_frame_t *frame) { + return amqp_send_frame_inner(state, frame, AMQP_SF_NONE); +} +int amqp_send_frame_inner(amqp_connection_state_t state, + const amqp_frame_t *frame, int flags) { int res; ssize_t sent; amqp_bytes_t encoded; + /* TODO: if the AMQP_SF_MORE socket optimization can be shown to work + * correctly, then this could be un-done so that body-frames are sent as 3 + * send calls, getting rid of the copy of the body content, some testing + * would need to be done to see if this would actually a win for performance. + * */ res = amqp_frame_to_bytes(frame, state->outbound_buffer, &encoded); if (AMQP_STATUS_OK != res) { return res; @@ -547,7 +555,7 @@ int amqp_send_frame(amqp_connection_state_t state, start_send: sent = amqp_try_send(state, encoded.bytes, encoded.len, - state->next_recv_heartbeat); + state->next_recv_heartbeat, flags); if (0 > sent) { return (int)sent; } diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index c3c6294..76c0d56 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -76,11 +76,8 @@ struct amqp_ssl_socket_t { int internal_error; }; -static ssize_t -amqp_ssl_socket_send(void *base, - const void *buf, - size_t len) -{ +static ssize_t amqp_ssl_socket_send(void *base, const void *buf, size_t len, + int flags) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; ssize_t res; if (-1 == self->sockfd) { diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 81c0b84..f803330 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -369,4 +369,6 @@ amqp_abort(const char *fmt, ...); int amqp_bytes_equal(amqp_bytes_t r, amqp_bytes_t l); +int amqp_send_frame_inner(amqp_connection_state_t state, + const amqp_frame_t *frame, int flags); #endif diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index e33adf3..964782e 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -201,11 +201,11 @@ amqp_os_socket_close(int sockfd) } ssize_t -amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len) +amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags) { assert(self); assert(self->klass->send); - return self->klass->send(self, buf, len); + return self->klass->send(self, buf, len, flags); } ssize_t @@ -320,14 +320,14 @@ static ssize_t do_poll(amqp_connection_state_t state, ssize_t res, } ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, - size_t len, amqp_time_t deadline) { + size_t len, amqp_time_t deadline, int flags) { ssize_t res; void* buf_left = (void*)buf; /* Assume that len is not going to be larger than ssize_t can hold. */ ssize_t len_left = (size_t)len; start_send: - res = amqp_socket_send(state->socket, buf_left, len_left); + res = amqp_socket_send(state->socket, buf_left, len_left, flags); if (res > 0) { len_left -= res; @@ -493,7 +493,8 @@ int amqp_send_header(amqp_connection_state_t state) AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite()); + res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite(), + AMQP_SF_NONE); if (sizeof(header) == res) { return AMQP_STATUS_OK; } @@ -966,18 +967,21 @@ int amqp_simple_wait_method(amqp_connection_state_t state, output); } -int amqp_send_method(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t id, - void *decoded) -{ +int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel, + amqp_method_number_t id, void *decoded) { + return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE); +} + +int amqp_send_method_inner(amqp_connection_state_t state, + amqp_channel_t channel, amqp_method_number_t id, + void *decoded, int flags) { amqp_frame_t frame; frame.frame_type = AMQP_FRAME_METHOD; frame.channel = channel; frame.payload.method.id = id; frame.payload.method.decoded = decoded; - return amqp_send_frame(state, &frame); + return amqp_send_frame_inner(state, &frame, flags); } static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list ) diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index fee7d7b..45c2f1b 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -37,6 +37,11 @@ AMQP_BEGIN_DECLS +typedef enum { + AMQP_SF_NONE = 0, + AMQP_SF_MORE = 1 +} amqp_socket_flag_enum; + int amqp_os_socket_error(void); @@ -44,7 +49,7 @@ int amqp_os_socket_close(int sockfd); /* Socket callbacks. */ -typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t); +typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t, int); typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int); typedef int (*amqp_socket_open_fn)(void *, const char *, int, struct timeval *); typedef int (*amqp_socket_close_fn)(void *); @@ -91,14 +96,15 @@ amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket); * \param [in,out] self A socket object. * \param [in] buf A buffer to read from. * \param [in] len The number of bytes in \e buf. + * \param [in] * * \return AMQP_STATUS_OK on success. amqp_status_enum value otherwise */ ssize_t -amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len); +amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags); ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, - size_t len, amqp_time_t deadline); + size_t len, amqp_time_t deadline, int flags); /** * Receive a message from a socket. @@ -163,6 +169,9 @@ int amqp_poll_read(int fd, amqp_time_t deadline); /* Wait up to deadline for fd to become writeable */ int amqp_poll_write(int fd, amqp_time_t deadline); +int amqp_send_method_inner(amqp_connection_state_t state, + amqp_channel_t channel, amqp_method_number_t id, + void *decoded, int flags); int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame); diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index e14cf38..e363b90 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -42,25 +42,31 @@ struct amqp_tcp_socket_t { static ssize_t -amqp_tcp_socket_send(void *base, const void *buf, size_t len) +amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; ssize_t res; - int flags = 0; + int flagz = 0; if (-1 == self->sockfd) { return AMQP_STATUS_SOCKET_CLOSED; } #ifdef MSG_NOSIGNAL - flags |= MSG_NOSIGNAL; + flagz |= MSG_NOSIGNAL; +#endif + +#if defined(MSG_MORE) + if (flags & AMQP_SF_MORE) { + flagz |= MSG_MORE; + } #endif start: #ifdef _WIN32 - res = send(self->sockfd, buf, (int)len, flags); + res = send(self->sockfd, buf, (int)len, flagz); #else - res = send(self->sockfd, buf, len, flags); + res = send(self->sockfd, buf, len, flagz); #endif if (res < 0) { |