diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2016-10-14 21:06:05 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2016-10-14 21:06:05 -0700 |
commit | b77ecb92ad18e6381eebcdceaef9f55a246a07f1 (patch) | |
tree | 6c91f13b08c3b8180c5cd4ea7e1883a18aee84ad | |
parent | d7fcefe56f526b1a996746433220ddf0b9566789 (diff) | |
download | rabbitmq-c-acks_examples.tar.gz |
Add publish-confirms, and ack examplesacks_examples
Modify amqp_sendstring to demonstrate how to use publisher confirms.
Modify amqp_listen to explicitly acknowledge delivered message (instead
of using no_ack on the consumer).
-rw-r--r-- | examples/amqp_listen.c | 10 | ||||
-rw-r--r-- | examples/amqp_sendstring.c | 22 |
2 files changed, 31 insertions, 1 deletions
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c index b452cb5..6273b06 100644 --- a/examples/amqp_listen.c +++ b/examples/amqp_listen.c @@ -100,7 +100,10 @@ int main(int argc, char const *const *argv) amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); - amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + amqp_basic_qos(conn, 1, 0, 1, 0); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Setting QoS"); + + amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); { @@ -130,6 +133,11 @@ int main(int argc, char const *const *argv) amqp_dump(envelope.message.body.bytes, envelope.message.body.len); + status = amqp_basic_ack(conn, 1, envelope.delivery_tag, 0); + if (AMQP_STATUS_OK != status) { + die("failed to acknowledge message"); + } + amqp_destroy_envelope(&envelope); } } diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index bc48054..bd3f591 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -34,6 +34,7 @@ * ***** END LICENSE BLOCK ***** */ +#include <inttypes.h> #include <stdlib.h> #include <stdio.h> #include <string.h> @@ -84,6 +85,12 @@ int main(int argc, char const *const *argv) die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { + amqp_confirm_select_ok_t* confirm = amqp_confirm_select(conn, 1); + if (confirm == NULL) { + die_on_amqp_error(amqp_get_rpc_reply(conn), "enabling confirms"); + } + } + { amqp_basic_properties_t props; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props.content_type = amqp_cstring_bytes("text/plain"); @@ -98,6 +105,21 @@ int main(int argc, char const *const *argv) amqp_cstring_bytes(messagebody)), "Publishing"); } + { + amqp_frame_t ack_frame; + amqp_basic_ack_t *ack_method; + status = amqp_simple_wait_frame(conn, &ack_frame); + if (status != AMQP_STATUS_OK) { + die_on_error(status, "Waiting for ack"); + } + if (ack_frame.channel != 1 || ack_frame.frame_type != AMQP_FRAME_METHOD || + ack_frame.payload.method.id != AMQP_BASIC_ACK_METHOD) { + die("received unexpected method"); + } + ack_method = (amqp_basic_ack_t *)ack_frame.payload.method.decoded; + printf("Ack received for envelope: %" PRIu64 " multiple %d\n", + ack_method->delivery_tag, ack_method->multiple); + } die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); |