diff options
-rw-r--r-- | librabbitmq/amqp_connection.c | 5 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 19 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 4 |
3 files changed, 8 insertions, 20 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 9175f95..5760502 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -483,7 +483,7 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_base = &frame_end_byte; iov[2].iov_len = FOOTER_SIZE; - res = amqp_try_writev(state, iov, 3); + res = amqp_try_writev(state, iov, 3, amqp_time_infinite()); } else { size_t out_frame_len; amqp_bytes_t encoded; @@ -532,7 +532,8 @@ int amqp_send_frame(amqp_connection_state_t state, amqp_e32(out_frame, 3, out_frame_len); amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END); res = amqp_try_send(state, out_frame, - out_frame_len + HEADER_SIZE + FOOTER_SIZE); + out_frame_len + HEADER_SIZE + FOOTER_SIZE, + amqp_time_infinite()); } if (AMQP_STATUS_OK != res) { return res; diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 6785187..957b7d2 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -304,20 +304,13 @@ int amqp_poll_write(int fd, amqp_time_t deadline) { } ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, - int iovcnt) { + int iovcnt, amqp_time_t deadline) { int i; int fd; ssize_t res; struct iovec *iov_left = iov; int iovcnt_left = iovcnt; ssize_t len_left; - /* TODO(alanxz) this should probably be a parameter */ - amqp_time_t deadline; - - res = amqp_time_from_now(&deadline, NULL); - if (AMQP_STATUS_OK != res) { - return res; - } len_left = 0; for (i = 0; i < iovcnt_left; ++i) { @@ -367,18 +360,12 @@ start_send: } ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, - size_t len) { + size_t len, amqp_time_t deadline) { ssize_t res; int fd; void* buf_left = (void*)buf; /* Assume that len is going to be larger than ssize_t can hold. */ ssize_t len_left = (size_t)len; - /* TODO(alanxz) this should probably be a parameter */ - amqp_time_t deadline; - res = amqp_time_from_now(&deadline, NULL); - if (AMQP_STATUS_OK != res) { - return res; - } start_send: res = amqp_socket_send(state->socket, buf_left, len_left); @@ -543,7 +530,7 @@ int amqp_send_header(amqp_connection_state_t state) AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - return amqp_try_send(state, header, sizeof(header)); + return amqp_try_send(state, header, sizeof(header), amqp_time_infinite()); } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index a1983aa..813189c 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -110,7 +110,7 @@ ssize_t amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt); ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov, - int iovcnt); + int iovcnt, amqp_time_t deadline); /** * Send a message from a socket. @@ -130,7 +130,7 @@ ssize_t amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len); ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, - size_t len); + size_t len, amqp_time_t deadline); /** * Receive a message from a socket. |