summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-05-23 23:56:51 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-05-31 22:57:38 -0700
commit2bc1f9b1b03c217512ce7026f76976cdcb8cc17e (patch)
treeec9d45fc68e1f1dedb570b1694df34738c8cbb04
parented363ad6db549e9d3fa57f7ca761032eecb2a1a1 (diff)
downloadrabbitmq-c-2bc1f9b1b03c217512ce7026f76976cdcb8cc17e.tar.gz
lib: use MSG_MORE on Linux for basic.publish
As a performance optimization in the send path, add an AMQP_SF_MORE flag indicating that more data is intended to be sent, and that packets shouldn't be sent out on the wire unless there is a full packet's worth of data available. Use this to specify MSG_MORE to send() on Linux.
-rw-r--r--librabbitmq/amqp_api.c10
-rw-r--r--librabbitmq/amqp_connection.c14
-rw-r--r--librabbitmq/amqp_openssl.c7
-rw-r--r--librabbitmq/amqp_private.h2
-rw-r--r--librabbitmq/amqp_socket.c26
-rw-r--r--librabbitmq/amqp_socket.h15
-rw-r--r--librabbitmq/amqp_tcp_socket.c16
7 files changed, 60 insertions, 30 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 7f29eee..83d18fa 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -212,7 +212,8 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
}
- res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m);
+ res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m,
+ AMQP_SF_MORE);
if (res < 0) {
return res;
}
@@ -228,7 +229,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.payload.properties.body_size = body.len;
f.payload.properties.decoded = (void *) properties;
- res = amqp_send_frame(state, &f);
+ res = amqp_send_frame_inner(state, &f, AMQP_SF_MORE);
if (res < 0) {
return res;
}
@@ -236,6 +237,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
body_offset = 0;
while (body_offset < body.len) {
size_t remaining = body.len - body_offset;
+ int flagz;
if (remaining == 0) {
break;
@@ -246,12 +248,14 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset);
if (remaining >= usable_body_payload_size) {
f.payload.body_fragment.len = usable_body_payload_size;
+ flagz = AMQP_SF_MORE;
} else {
f.payload.body_fragment.len = remaining;
+ flagz = AMQP_SF_NONE;
}
body_offset += f.payload.body_fragment.len;
- res = amqp_send_frame(state, &f);
+ res = amqp_send_frame_inner(state, &f, flagz);
if (res < 0) {
return res;
}
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index d2a0d41..2b8c86a 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -533,13 +533,21 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer,
}
int amqp_send_frame(amqp_connection_state_t state,
- const amqp_frame_t *frame)
-{
+ const amqp_frame_t *frame) {
+ return amqp_send_frame_inner(state, frame, AMQP_SF_NONE);
+}
+int amqp_send_frame_inner(amqp_connection_state_t state,
+ const amqp_frame_t *frame, int flags) {
int res;
ssize_t sent;
amqp_bytes_t encoded;
+ /* TODO: if the AMQP_SF_MORE socket optimization can be shown to work
+ * correctly, then this could be un-done so that body-frames are sent as 3
+ * send calls, getting rid of the copy of the body content, some testing
+ * would need to be done to see if this would actually a win for performance.
+ * */
res = amqp_frame_to_bytes(frame, state->outbound_buffer, &encoded);
if (AMQP_STATUS_OK != res) {
return res;
@@ -547,7 +555,7 @@ int amqp_send_frame(amqp_connection_state_t state,
start_send:
sent = amqp_try_send(state, encoded.bytes, encoded.len,
- state->next_recv_heartbeat);
+ state->next_recv_heartbeat, flags);
if (0 > sent) {
return (int)sent;
}
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index c3c6294..76c0d56 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -76,11 +76,8 @@ struct amqp_ssl_socket_t {
int internal_error;
};
-static ssize_t
-amqp_ssl_socket_send(void *base,
- const void *buf,
- size_t len)
-{
+static ssize_t amqp_ssl_socket_send(void *base, const void *buf, size_t len,
+ int flags) {
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
ssize_t res;
if (-1 == self->sockfd) {
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 81c0b84..f803330 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -369,4 +369,6 @@ amqp_abort(const char *fmt, ...);
int amqp_bytes_equal(amqp_bytes_t r, amqp_bytes_t l);
+int amqp_send_frame_inner(amqp_connection_state_t state,
+ const amqp_frame_t *frame, int flags);
#endif
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index e33adf3..964782e 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -201,11 +201,11 @@ amqp_os_socket_close(int sockfd)
}
ssize_t
-amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len)
+amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags)
{
assert(self);
assert(self->klass->send);
- return self->klass->send(self, buf, len);
+ return self->klass->send(self, buf, len, flags);
}
ssize_t
@@ -320,14 +320,14 @@ static ssize_t do_poll(amqp_connection_state_t state, ssize_t res,
}
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
- size_t len, amqp_time_t deadline) {
+ size_t len, amqp_time_t deadline, int flags) {
ssize_t res;
void* buf_left = (void*)buf;
/* Assume that len is not going to be larger than ssize_t can hold. */
ssize_t len_left = (size_t)len;
start_send:
- res = amqp_socket_send(state->socket, buf_left, len_left);
+ res = amqp_socket_send(state->socket, buf_left, len_left, flags);
if (res > 0) {
len_left -= res;
@@ -493,7 +493,8 @@ int amqp_send_header(amqp_connection_state_t state)
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite());
+ res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite(),
+ AMQP_SF_NONE);
if (sizeof(header) == res) {
return AMQP_STATUS_OK;
}
@@ -966,18 +967,21 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
output);
}
-int amqp_send_method(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t id,
- void *decoded)
-{
+int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel,
+ amqp_method_number_t id, void *decoded) {
+ return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE);
+}
+
+int amqp_send_method_inner(amqp_connection_state_t state,
+ amqp_channel_t channel, amqp_method_number_t id,
+ void *decoded, int flags) {
amqp_frame_t frame;
frame.frame_type = AMQP_FRAME_METHOD;
frame.channel = channel;
frame.payload.method.id = id;
frame.payload.method.decoded = decoded;
- return amqp_send_frame(state, &frame);
+ return amqp_send_frame_inner(state, &frame, flags);
}
static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index fee7d7b..45c2f1b 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -37,6 +37,11 @@
AMQP_BEGIN_DECLS
+typedef enum {
+ AMQP_SF_NONE = 0,
+ AMQP_SF_MORE = 1
+} amqp_socket_flag_enum;
+
int
amqp_os_socket_error(void);
@@ -44,7 +49,7 @@ int
amqp_os_socket_close(int sockfd);
/* Socket callbacks. */
-typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t);
+typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t, int);
typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int);
typedef int (*amqp_socket_open_fn)(void *, const char *, int, struct timeval *);
typedef int (*amqp_socket_close_fn)(void *);
@@ -91,14 +96,15 @@ amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket);
* \param [in,out] self A socket object.
* \param [in] buf A buffer to read from.
* \param [in] len The number of bytes in \e buf.
+ * \param [in]
*
* \return AMQP_STATUS_OK on success. amqp_status_enum value otherwise
*/
ssize_t
-amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len);
+amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags);
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
- size_t len, amqp_time_t deadline);
+ size_t len, amqp_time_t deadline, int flags);
/**
* Receive a message from a socket.
@@ -163,6 +169,9 @@ int amqp_poll_read(int fd, amqp_time_t deadline);
/* Wait up to deadline for fd to become writeable */
int amqp_poll_write(int fd, amqp_time_t deadline);
+int amqp_send_method_inner(amqp_connection_state_t state,
+ amqp_channel_t channel, amqp_method_number_t id,
+ void *decoded, int flags);
int
amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame);
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index e14cf38..e363b90 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -42,25 +42,31 @@ struct amqp_tcp_socket_t {
static ssize_t
-amqp_tcp_socket_send(void *base, const void *buf, size_t len)
+amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
ssize_t res;
- int flags = 0;
+ int flagz = 0;
if (-1 == self->sockfd) {
return AMQP_STATUS_SOCKET_CLOSED;
}
#ifdef MSG_NOSIGNAL
- flags |= MSG_NOSIGNAL;
+ flagz |= MSG_NOSIGNAL;
+#endif
+
+#if defined(MSG_MORE)
+ if (flags & AMQP_SF_MORE) {
+ flagz |= MSG_MORE;
+ }
#endif
start:
#ifdef _WIN32
- res = send(self->sockfd, buf, (int)len, flags);
+ res = send(self->sockfd, buf, (int)len, flagz);
#else
- res = send(self->sockfd, buf, len, flags);
+ res = send(self->sockfd, buf, len, flagz);
#endif
if (res < 0) {