summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2016-10-14 21:06:05 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2016-10-14 21:06:05 -0700
commitb77ecb92ad18e6381eebcdceaef9f55a246a07f1 (patch)
tree6c91f13b08c3b8180c5cd4ea7e1883a18aee84ad
parentd7fcefe56f526b1a996746433220ddf0b9566789 (diff)
downloadrabbitmq-c-acks_examples.tar.gz
Add publish-confirms, and ack examplesacks_examples
Modify amqp_sendstring to demonstrate how to use publisher confirms. Modify amqp_listen to explicitly acknowledge delivered message (instead of using no_ack on the consumer).
-rw-r--r--examples/amqp_listen.c10
-rw-r--r--examples/amqp_sendstring.c22
2 files changed, 31 insertions, 1 deletions
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index b452cb5..6273b06 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -100,7 +100,10 @@ int main(int argc, char const *const *argv)
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
- amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
+ amqp_basic_qos(conn, 1, 0, 1, 0);
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "Setting QoS");
+
+ amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
@@ -130,6 +133,11 @@ int main(int argc, char const *const *argv)
amqp_dump(envelope.message.body.bytes, envelope.message.body.len);
+ status = amqp_basic_ack(conn, 1, envelope.delivery_tag, 0);
+ if (AMQP_STATUS_OK != status) {
+ die("failed to acknowledge message");
+ }
+
amqp_destroy_envelope(&envelope);
}
}
diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c
index bc48054..bd3f591 100644
--- a/examples/amqp_sendstring.c
+++ b/examples/amqp_sendstring.c
@@ -34,6 +34,7 @@
* ***** END LICENSE BLOCK *****
*/
+#include <inttypes.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -84,6 +85,12 @@ int main(int argc, char const *const *argv)
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
+ amqp_confirm_select_ok_t* confirm = amqp_confirm_select(conn, 1);
+ if (confirm == NULL) {
+ die_on_amqp_error(amqp_get_rpc_reply(conn), "enabling confirms");
+ }
+ }
+ {
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
@@ -98,6 +105,21 @@ int main(int argc, char const *const *argv)
amqp_cstring_bytes(messagebody)),
"Publishing");
}
+ {
+ amqp_frame_t ack_frame;
+ amqp_basic_ack_t *ack_method;
+ status = amqp_simple_wait_frame(conn, &ack_frame);
+ if (status != AMQP_STATUS_OK) {
+ die_on_error(status, "Waiting for ack");
+ }
+ if (ack_frame.channel != 1 || ack_frame.frame_type != AMQP_FRAME_METHOD ||
+ ack_frame.payload.method.id != AMQP_BASIC_ACK_METHOD) {
+ die("received unexpected method");
+ }
+ ack_method = (amqp_basic_ack_t *)ack_frame.payload.method.decoded;
+ printf("Ack received for envelope: %" PRIu64 " multiple %d\n",
+ ack_method->delivery_tag, ack_method->multiple);
+ }
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");