diff options
-rw-r--r-- | librabbitmq/amqp_connection.c | 25 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 12 |
2 files changed, 32 insertions, 5 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 7168bb5..54040ea 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -533,6 +533,7 @@ int amqp_send_frame(amqp_connection_state_t state, { int res; + ssize_t sent; amqp_bytes_t encoded; res = amqp_frame_to_bytes(frame, state->outbound_buffer, &encoded); @@ -540,9 +541,27 @@ int amqp_send_frame(amqp_connection_state_t state, return res; } - res = amqp_try_send(state, encoded.bytes, encoded.len, amqp_time_infinite()); - if (AMQP_STATUS_OK != res) { - return res; +start_send: + sent = amqp_try_send(state, encoded.bytes, encoded.len, + state->next_recv_heartbeat); + if (0 > sent) { + return (int)sent; + } + + /* A partial send has occurred, because of a heartbeat timeout, try and recv + * something */ + if ((ssize_t)encoded.len != sent) { + res = amqp_try_recv(state); + + if (AMQP_STATUS_TIMEOUT == res) { + return AMQP_STATUS_HEARTBEAT_TIMEOUT; + } else if (AMQP_STATUS_OK != res) { + return res; + } + + encoded.bytes = (uint8_t*)encoded.bytes + sent; + encoded.len -= sent; + goto start_send; } res = amqp_time_s_from_now(&state->next_send_heartbeat, diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index d69f778..2688580 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -326,7 +326,7 @@ start_send: len_left -= res; buf_left = (char*)buf_left + res; if (0 == len_left) { - return AMQP_STATUS_OK; + return (ssize_t)len; } goto start_send; } @@ -334,6 +334,9 @@ start_send: if (AMQP_STATUS_OK == res) { goto start_send; } + if (AMQP_STATUS_TIMEOUT == res) { + return (ssize_t)len - len_left; + } return res; } @@ -464,12 +467,17 @@ int amqp_open_socket_inner(char const *hostname, int amqp_send_header(amqp_connection_state_t state) { + int res; static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0, AMQP_PROTOCOL_VERSION_MAJOR, AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - return amqp_try_send(state, header, sizeof(header), amqp_time_infinite()); + res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite()); + if (sizeof(header) == res) { + return AMQP_STATUS_OK; + } + return res; } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) |