summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-07-30 15:55:42 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-07-30 15:55:42 -0700
commiteff8134876282f2ba5a48b800109e300bcb3474f (patch)
treeaed67c35197763e680225f7130fca09b94af883b
parent7a451a42c93ba819c18bbbdf189a4e5fbaf64054 (diff)
downloadrabbitmq-c-consumer_cancel_example.tar.gz
Add consumer-cancel notify to amqp_consumer.cconsumer_cancel_example
Add consumer cancel notification support to amqp_consumer.c
-rw-r--r--examples/amqp_consumer.c33
1 files changed, 31 insertions, 2 deletions
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index 62da0ca..21ea464 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -95,6 +95,10 @@ static void run(amqp_connection_state_t conn)
*/
break;
+ case AMQP_BASIC_CANCEL_METHOD:
+ printf("Consumer cancelled!\n");
+
+ return;
case AMQP_BASIC_RETURN_METHOD:
/* if a published message couldn't be routed and the mandatory flag was set
* this is what would be returned. The message then needs to be read.
@@ -175,8 +179,33 @@ int main(int argc, char const *const *argv)
die("opening TCP socket");
}
- die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ {
+ amqp_table_entry_t consumer_cancel_capability;
+
+ amqp_table_entry_t capabilities_entry;
+ amqp_table_t *capabilities;
+
+ amqp_table_t properties;
+
+ consumer_cancel_capability.key = amqp_cstring_bytes("consumer_cancel_notify");
+ consumer_cancel_capability.value.kind = AMQP_FIELD_KIND_BOOLEAN;
+ consumer_cancel_capability.value.value.boolean = 1;
+
+ capabilities_entry.key = amqp_cstring_bytes("capabilities");
+ capabilities_entry.value.kind = AMQP_FIELD_KIND_TABLE;
+ capabilities = &capabilities_entry.value.value.table;
+
+ capabilities->num_entries = 1;
+ capabilities->entries = &consumer_cancel_capability;
+
+ properties.num_entries = 1;
+ properties.entries = &capabilities_entry;
+
+ die_on_amqp_error(amqp_login_with_properties(conn, "/", 0, 131072, 0,
+ &properties, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ "Logging in");
+
+ }
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");