summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-05-03 21:57:28 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-05-03 21:57:28 -0700
commit61fc4e190c847621688088912c240d7965ab02cd (patch)
tree0cd88f4d627557cb845ef38ff5e8965fa9b5f40a
parent43b6fc5ea0f36866dd79e9697e9e6a0135362e10 (diff)
downloadrabbitmq-c-timer_refactor.tar.gz
Check for heartbeats when in blocking send.timer_refactor
Check for recv heartbeats when blocking on sending to the socket. A blocked send can indicate that the broker is applying backpressure to publishers, it could also be that the TCP connection is dead, checking for recv heartbeats can give an earlier indication that the connection is broken.
-rw-r--r--librabbitmq/amqp_connection.c25
-rw-r--r--librabbitmq/amqp_socket.c12
2 files changed, 32 insertions, 5 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 7168bb5..54040ea 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -533,6 +533,7 @@ int amqp_send_frame(amqp_connection_state_t state,
{
int res;
+ ssize_t sent;
amqp_bytes_t encoded;
res = amqp_frame_to_bytes(frame, state->outbound_buffer, &encoded);
@@ -540,9 +541,27 @@ int amqp_send_frame(amqp_connection_state_t state,
return res;
}
- res = amqp_try_send(state, encoded.bytes, encoded.len, amqp_time_infinite());
- if (AMQP_STATUS_OK != res) {
- return res;
+start_send:
+ sent = amqp_try_send(state, encoded.bytes, encoded.len,
+ state->next_recv_heartbeat);
+ if (0 > sent) {
+ return (int)sent;
+ }
+
+ /* A partial send has occurred, because of a heartbeat timeout, try and recv
+ * something */
+ if ((ssize_t)encoded.len != sent) {
+ res = amqp_try_recv(state);
+
+ if (AMQP_STATUS_TIMEOUT == res) {
+ return AMQP_STATUS_HEARTBEAT_TIMEOUT;
+ } else if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
+ encoded.bytes = (uint8_t*)encoded.bytes + sent;
+ encoded.len -= sent;
+ goto start_send;
}
res = amqp_time_s_from_now(&state->next_send_heartbeat,
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index d69f778..2688580 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -326,7 +326,7 @@ start_send:
len_left -= res;
buf_left = (char*)buf_left + res;
if (0 == len_left) {
- return AMQP_STATUS_OK;
+ return (ssize_t)len;
}
goto start_send;
}
@@ -334,6 +334,9 @@ start_send:
if (AMQP_STATUS_OK == res) {
goto start_send;
}
+ if (AMQP_STATUS_TIMEOUT == res) {
+ return (ssize_t)len - len_left;
+ }
return res;
}
@@ -464,12 +467,17 @@ int amqp_open_socket_inner(char const *hostname,
int amqp_send_header(amqp_connection_state_t state)
{
+ int res;
static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0,
AMQP_PROTOCOL_VERSION_MAJOR,
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- return amqp_try_send(state, header, sizeof(header), amqp_time_infinite());
+ res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite());
+ if (sizeof(header) == res) {
+ return AMQP_STATUS_OK;
+ }
+ return res;
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)