summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_api.c')
-rw-r--r--librabbitmq/amqp_api.c10
1 files changed, 7 insertions, 3 deletions
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 7f29eee..83d18fa 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -212,7 +212,8 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
}
- res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m);
+ res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m,
+ AMQP_SF_MORE);
if (res < 0) {
return res;
}
@@ -228,7 +229,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.payload.properties.body_size = body.len;
f.payload.properties.decoded = (void *) properties;
- res = amqp_send_frame(state, &f);
+ res = amqp_send_frame_inner(state, &f, AMQP_SF_MORE);
if (res < 0) {
return res;
}
@@ -236,6 +237,7 @@ int amqp_basic_publish(amqp_connection_state_t state,
body_offset = 0;
while (body_offset < body.len) {
size_t remaining = body.len - body_offset;
+ int flagz;
if (remaining == 0) {
break;
@@ -246,12 +248,14 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset);
if (remaining >= usable_body_payload_size) {
f.payload.body_fragment.len = usable_body_payload_size;
+ flagz = AMQP_SF_MORE;
} else {
f.payload.body_fragment.len = remaining;
+ flagz = AMQP_SF_NONE;
}
body_offset += f.payload.body_fragment.len;
- res = amqp_send_frame(state, &f);
+ res = amqp_send_frame_inner(state, &f, flagz);
if (res < 0) {
return res;
}