diff options
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 26 |
1 files changed, 15 insertions, 11 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index e33adf3..964782e 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -201,11 +201,11 @@ amqp_os_socket_close(int sockfd) } ssize_t -amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len) +amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags) { assert(self); assert(self->klass->send); - return self->klass->send(self, buf, len); + return self->klass->send(self, buf, len, flags); } ssize_t @@ -320,14 +320,14 @@ static ssize_t do_poll(amqp_connection_state_t state, ssize_t res, } ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, - size_t len, amqp_time_t deadline) { + size_t len, amqp_time_t deadline, int flags) { ssize_t res; void* buf_left = (void*)buf; /* Assume that len is not going to be larger than ssize_t can hold. */ ssize_t len_left = (size_t)len; start_send: - res = amqp_socket_send(state->socket, buf_left, len_left); + res = amqp_socket_send(state->socket, buf_left, len_left, flags); if (res > 0) { len_left -= res; @@ -493,7 +493,8 @@ int amqp_send_header(amqp_connection_state_t state) AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION }; - res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite()); + res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite(), + AMQP_SF_NONE); if (sizeof(header) == res) { return AMQP_STATUS_OK; } @@ -966,18 +967,21 @@ int amqp_simple_wait_method(amqp_connection_state_t state, output); } -int amqp_send_method(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t id, - void *decoded) -{ +int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel, + amqp_method_number_t id, void *decoded) { + return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE); +} + +int amqp_send_method_inner(amqp_connection_state_t state, + amqp_channel_t channel, amqp_method_number_t id, + void *decoded, int flags) { amqp_frame_t frame; frame.frame_type = AMQP_FRAME_METHOD; frame.channel = channel; frame.payload.method.id = id; frame.payload.method.decoded = decoded; - return amqp_send_frame(state, &frame); + return amqp_send_frame_inner(state, &frame, flags); } static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list ) |