diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2015-05-03 21:57:28 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2015-05-03 21:57:28 -0700 |
commit | 61fc4e190c847621688088912c240d7965ab02cd (patch) | |
tree | 0cd88f4d627557cb845ef38ff5e8965fa9b5f40a /librabbitmq/amqp_connection.c | |
parent | 43b6fc5ea0f36866dd79e9697e9e6a0135362e10 (diff) | |
download | rabbitmq-c-timer_refactor.tar.gz |
Check for heartbeats when in blocking send.timer_refactor
Check for recv heartbeats when blocking on sending to the socket. A blocked send
can indicate that the broker is applying backpressure to publishers, it could
also be that the TCP connection is dead, checking for recv heartbeats can give
an earlier indication that the connection is broken.
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r-- | librabbitmq/amqp_connection.c | 25 |
1 files changed, 22 insertions, 3 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 7168bb5..54040ea 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -533,6 +533,7 @@ int amqp_send_frame(amqp_connection_state_t state, { int res; + ssize_t sent; amqp_bytes_t encoded; res = amqp_frame_to_bytes(frame, state->outbound_buffer, &encoded); @@ -540,9 +541,27 @@ int amqp_send_frame(amqp_connection_state_t state, return res; } - res = amqp_try_send(state, encoded.bytes, encoded.len, amqp_time_infinite()); - if (AMQP_STATUS_OK != res) { - return res; +start_send: + sent = amqp_try_send(state, encoded.bytes, encoded.len, + state->next_recv_heartbeat); + if (0 > sent) { + return (int)sent; + } + + /* A partial send has occurred, because of a heartbeat timeout, try and recv + * something */ + if ((ssize_t)encoded.len != sent) { + res = amqp_try_recv(state); + + if (AMQP_STATUS_TIMEOUT == res) { + return AMQP_STATUS_HEARTBEAT_TIMEOUT; + } else if (AMQP_STATUS_OK != res) { + return res; + } + + encoded.bytes = (uint8_t*)encoded.bytes + sent; + encoded.len -= sent; + goto start_send; } res = amqp_time_s_from_now(&state->next_send_heartbeat, |