summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r--librabbitmq/amqp_connection.c25
1 files changed, 22 insertions, 3 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 7168bb5..54040ea 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -533,6 +533,7 @@ int amqp_send_frame(amqp_connection_state_t state,
{
int res;
+ ssize_t sent;
amqp_bytes_t encoded;
res = amqp_frame_to_bytes(frame, state->outbound_buffer, &encoded);
@@ -540,9 +541,27 @@ int amqp_send_frame(amqp_connection_state_t state,
return res;
}
- res = amqp_try_send(state, encoded.bytes, encoded.len, amqp_time_infinite());
- if (AMQP_STATUS_OK != res) {
- return res;
+start_send:
+ sent = amqp_try_send(state, encoded.bytes, encoded.len,
+ state->next_recv_heartbeat);
+ if (0 > sent) {
+ return (int)sent;
+ }
+
+ /* A partial send has occurred, because of a heartbeat timeout, try and recv
+ * something */
+ if ((ssize_t)encoded.len != sent) {
+ res = amqp_try_recv(state);
+
+ if (AMQP_STATUS_TIMEOUT == res) {
+ return AMQP_STATUS_HEARTBEAT_TIMEOUT;
+ } else if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
+ encoded.bytes = (uint8_t*)encoded.bytes + sent;
+ encoded.len -= sent;
+ goto start_send;
}
res = amqp_time_s_from_now(&state->next_send_heartbeat,