diff options
-rw-r--r-- | examples/amqp_listen.c | 10 | ||||
-rw-r--r-- | examples/amqp_sendstring.c | 22 |
2 files changed, 31 insertions, 1 deletions
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c index b452cb5..6273b06 100644 --- a/examples/amqp_listen.c +++ b/examples/amqp_listen.c @@ -100,7 +100,10 @@ int main(int argc, char const *const *argv) amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); - amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + amqp_basic_qos(conn, 1, 0, 1, 0); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Setting QoS"); + + amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); { @@ -130,6 +133,11 @@ int main(int argc, char const *const *argv) amqp_dump(envelope.message.body.bytes, envelope.message.body.len); + status = amqp_basic_ack(conn, 1, envelope.delivery_tag, 0); + if (AMQP_STATUS_OK != status) { + die("failed to acknowledge message"); + } + amqp_destroy_envelope(&envelope); } } diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index bc48054..bd3f591 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -34,6 +34,7 @@ * ***** END LICENSE BLOCK ***** */ +#include <inttypes.h> #include <stdlib.h> #include <stdio.h> #include <string.h> @@ -84,6 +85,12 @@ int main(int argc, char const *const *argv) die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { + amqp_confirm_select_ok_t* confirm = amqp_confirm_select(conn, 1); + if (confirm == NULL) { + die_on_amqp_error(amqp_get_rpc_reply(conn), "enabling confirms"); + } + } + { amqp_basic_properties_t props; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes("text/plain"); @@ -98,6 +105,21 @@ int main(int argc, char const *const *argv) amqp_cstring_bytes(messagebody)), "Publishing"); } + { + amqp_frame_t ack_frame; + amqp_basic_ack_t *ack_method; + status = amqp_simple_wait_frame(conn, &ack_frame); + if (status != AMQP_STATUS_OK) { + die_on_error(status, "Waiting for ack"); + } + if (ack_frame.channel != 1 || ack_frame.frame_type != AMQP_FRAME_METHOD || + ack_frame.payload.method.id != AMQP_BASIC_ACK_METHOD) { + die("received unexpected method"); + } + ack_method = (amqp_basic_ack_t *)ack_frame.payload.method.decoded; + printf("Ack received for envelope: %" PRIu64 " multiple %d\n", + ack_method->delivery_tag, ack_method->multiple); + } die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); |