summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--librabbitmq/amqp_connection.c25
-rw-r--r--librabbitmq/amqp_socket.c12
2 files changed, 32 insertions, 5 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 7168bb5..54040ea 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -533,6 +533,7 @@ int amqp_send_frame(amqp_connection_state_t state,
{
int res;
+ ssize_t sent;
amqp_bytes_t encoded;
res = amqp_frame_to_bytes(frame, state->outbound_buffer, &encoded);
@@ -540,9 +541,27 @@ int amqp_send_frame(amqp_connection_state_t state,
return res;
}
- res = amqp_try_send(state, encoded.bytes, encoded.len, amqp_time_infinite());
- if (AMQP_STATUS_OK != res) {
- return res;
+start_send:
+ sent = amqp_try_send(state, encoded.bytes, encoded.len,
+ state->next_recv_heartbeat);
+ if (0 > sent) {
+ return (int)sent;
+ }
+
+ /* A partial send has occurred, because of a heartbeat timeout, try and recv
+ * something */
+ if ((ssize_t)encoded.len != sent) {
+ res = amqp_try_recv(state);
+
+ if (AMQP_STATUS_TIMEOUT == res) {
+ return AMQP_STATUS_HEARTBEAT_TIMEOUT;
+ } else if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
+ encoded.bytes = (uint8_t*)encoded.bytes + sent;
+ encoded.len -= sent;
+ goto start_send;
}
res = amqp_time_s_from_now(&state->next_send_heartbeat,
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index d69f778..2688580 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -326,7 +326,7 @@ start_send:
len_left -= res;
buf_left = (char*)buf_left + res;
if (0 == len_left) {
- return AMQP_STATUS_OK;
+ return (ssize_t)len;
}
goto start_send;
}
@@ -334,6 +334,9 @@ start_send:
if (AMQP_STATUS_OK == res) {
goto start_send;
}
+ if (AMQP_STATUS_TIMEOUT == res) {
+ return (ssize_t)len - len_left;
+ }
return res;
}
@@ -464,12 +467,17 @@ int amqp_open_socket_inner(char const *hostname,
int amqp_send_header(amqp_connection_state_t state)
{
+ int res;
static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0,
AMQP_PROTOCOL_VERSION_MAJOR,
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- return amqp_try_send(state, header, sizeof(header), amqp_time_infinite());
+ res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite());
+ if (sizeof(header) == res) {
+ return AMQP_STATUS_OK;
+ }
+ return res;
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)