summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--librabbitmq/amqp_connection.c5
-rw-r--r--librabbitmq/amqp_socket.c19
-rw-r--r--librabbitmq/amqp_socket.h4
3 files changed, 8 insertions, 20 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 9175f95..5760502 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -483,7 +483,7 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_base = &frame_end_byte;
iov[2].iov_len = FOOTER_SIZE;
- res = amqp_try_writev(state, iov, 3);
+ res = amqp_try_writev(state, iov, 3, amqp_time_infinite());
} else {
size_t out_frame_len;
amqp_bytes_t encoded;
@@ -532,7 +532,8 @@ int amqp_send_frame(amqp_connection_state_t state,
amqp_e32(out_frame, 3, out_frame_len);
amqp_e8(out_frame, out_frame_len + HEADER_SIZE, AMQP_FRAME_END);
res = amqp_try_send(state, out_frame,
- out_frame_len + HEADER_SIZE + FOOTER_SIZE);
+ out_frame_len + HEADER_SIZE + FOOTER_SIZE,
+ amqp_time_infinite());
}
if (AMQP_STATUS_OK != res) {
return res;
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 6785187..957b7d2 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -304,20 +304,13 @@ int amqp_poll_write(int fd, amqp_time_t deadline) {
}
ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
- int iovcnt) {
+ int iovcnt, amqp_time_t deadline) {
int i;
int fd;
ssize_t res;
struct iovec *iov_left = iov;
int iovcnt_left = iovcnt;
ssize_t len_left;
- /* TODO(alanxz) this should probably be a parameter */
- amqp_time_t deadline;
-
- res = amqp_time_from_now(&deadline, NULL);
- if (AMQP_STATUS_OK != res) {
- return res;
- }
len_left = 0;
for (i = 0; i < iovcnt_left; ++i) {
@@ -367,18 +360,12 @@ start_send:
}
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
- size_t len) {
+ size_t len, amqp_time_t deadline) {
ssize_t res;
int fd;
void* buf_left = (void*)buf;
/* Assume that len is going to be larger than ssize_t can hold. */
ssize_t len_left = (size_t)len;
- /* TODO(alanxz) this should probably be a parameter */
- amqp_time_t deadline;
- res = amqp_time_from_now(&deadline, NULL);
- if (AMQP_STATUS_OK != res) {
- return res;
- }
start_send:
res = amqp_socket_send(state->socket, buf_left, len_left);
@@ -543,7 +530,7 @@ int amqp_send_header(amqp_connection_state_t state)
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- return amqp_try_send(state, header, sizeof(header));
+ return amqp_try_send(state, header, sizeof(header), amqp_time_infinite());
}
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index a1983aa..813189c 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -110,7 +110,7 @@ ssize_t
amqp_socket_writev(amqp_socket_t *self, struct iovec *iov, int iovcnt);
ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
- int iovcnt);
+ int iovcnt, amqp_time_t deadline);
/**
* Send a message from a socket.
@@ -130,7 +130,7 @@ ssize_t
amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len);
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
- size_t len);
+ size_t len, amqp_time_t deadline);
/**
* Receive a message from a socket.