summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c26
1 files changed, 15 insertions, 11 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index e33adf3..964782e 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -201,11 +201,11 @@ amqp_os_socket_close(int sockfd)
}
ssize_t
-amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len)
+amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags)
{
assert(self);
assert(self->klass->send);
- return self->klass->send(self, buf, len);
+ return self->klass->send(self, buf, len, flags);
}
ssize_t
@@ -320,14 +320,14 @@ static ssize_t do_poll(amqp_connection_state_t state, ssize_t res,
}
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
- size_t len, amqp_time_t deadline) {
+ size_t len, amqp_time_t deadline, int flags) {
ssize_t res;
void* buf_left = (void*)buf;
/* Assume that len is not going to be larger than ssize_t can hold. */
ssize_t len_left = (size_t)len;
start_send:
- res = amqp_socket_send(state->socket, buf_left, len_left);
+ res = amqp_socket_send(state->socket, buf_left, len_left, flags);
if (res > 0) {
len_left -= res;
@@ -493,7 +493,8 @@ int amqp_send_header(amqp_connection_state_t state)
AMQP_PROTOCOL_VERSION_MINOR,
AMQP_PROTOCOL_VERSION_REVISION
};
- res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite());
+ res = amqp_try_send(state, header, sizeof(header), amqp_time_infinite(),
+ AMQP_SF_NONE);
if (sizeof(header) == res) {
return AMQP_STATUS_OK;
}
@@ -966,18 +967,21 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
output);
}
-int amqp_send_method(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t id,
- void *decoded)
-{
+int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel,
+ amqp_method_number_t id, void *decoded) {
+ return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE);
+}
+
+int amqp_send_method_inner(amqp_connection_state_t state,
+ amqp_channel_t channel, amqp_method_number_t id,
+ void *decoded, int flags) {
amqp_frame_t frame;
frame.frame_type = AMQP_FRAME_METHOD;
frame.channel = channel;
frame.payload.method.id = id;
frame.payload.method.decoded = decoded;
- return amqp_send_frame(state, &frame);
+ return amqp_send_frame_inner(state, &frame, flags);
}
static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )