summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_connection.c
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-05-03 21:57:28 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-05-03 21:57:28 -0700
commit61fc4e190c847621688088912c240d7965ab02cd (patch)
tree0cd88f4d627557cb845ef38ff5e8965fa9b5f40a /librabbitmq/amqp_connection.c
parent43b6fc5ea0f36866dd79e9697e9e6a0135362e10 (diff)
downloadrabbitmq-c-timer_refactor.tar.gz
Check for heartbeats when in blocking send.timer_refactor
Check for recv heartbeats when blocking on sending to the socket. A blocked send can indicate that the broker is applying backpressure to publishers, it could also be that the TCP connection is dead, checking for recv heartbeats can give an earlier indication that the connection is broken.
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r--librabbitmq/amqp_connection.c25
1 files changed, 22 insertions, 3 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 7168bb5..54040ea 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -533,6 +533,7 @@ int amqp_send_frame(amqp_connection_state_t state,
{
int res;
+ ssize_t sent;
amqp_bytes_t encoded;
res = amqp_frame_to_bytes(frame, state->outbound_buffer, &encoded);
@@ -540,9 +541,27 @@ int amqp_send_frame(amqp_connection_state_t state,
return res;
}
- res = amqp_try_send(state, encoded.bytes, encoded.len, amqp_time_infinite());
- if (AMQP_STATUS_OK != res) {
- return res;
+start_send:
+ sent = amqp_try_send(state, encoded.bytes, encoded.len,
+ state->next_recv_heartbeat);
+ if (0 > sent) {
+ return (int)sent;
+ }
+
+ /* A partial send has occurred, because of a heartbeat timeout, try and recv
+ * something */
+ if ((ssize_t)encoded.len != sent) {
+ res = amqp_try_recv(state);
+
+ if (AMQP_STATUS_TIMEOUT == res) {
+ return AMQP_STATUS_HEARTBEAT_TIMEOUT;
+ } else if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
+ encoded.bytes = (uint8_t*)encoded.bytes + sent;
+ encoded.len -= sent;
+ goto start_send;
}
res = amqp_time_s_from_now(&state->next_send_heartbeat,