From 89071d2ff9f6757402e219c4b8e6cf0c580a7b02 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Wed, 9 Nov 2011 14:36:20 +0000 Subject: Add an option to limit how many messages are consumed --- tools/consume.c | 17 ++++++++++++++--- tools/doc/consume.xml | 10 ++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) (limited to 'tools') diff --git a/tools/consume.c b/tools/consume.c index 3a19d78..9d8a7f0 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -117,13 +117,20 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, } static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, - int no_ack, const char * const *argv) + int no_ack, int count, const char * const *argv) { + int i; + + /* If there is a limit, set the qos to match */ + if (count > 0 && count <= 65535 + && !amqp_basic_qos(conn, 1, 0, count, 0)) + die_rpc(amqp_get_rpc_reply(conn), "basic.qos"); + if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, 0, amqp_empty_table)) die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); - for (;;) { + for (i = 0; count < 0 || i < count; i++) { amqp_frame_t frame; struct pipeline pl; uint64_t delivery_tag; @@ -160,6 +167,7 @@ int main(int argc, const char **argv) char *routing_key = NULL; int declare = 0; int no_ack = 0; + int count = -1; amqp_bytes_t queue_bytes; struct poptOption options[] = { @@ -174,6 +182,9 @@ int main(int argc, const char **argv) "declare an exclusive queue", NULL}, {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, "consume in no-ack mode", NULL}, + {"count", 'c', POPT_ARG_INT, &count, 0, + "stop consuming after this many messages are consumed", + "limit"}, POPT_AUTOHELP { NULL, 0, 0, NULL, 0 } }; @@ -190,7 +201,7 @@ int main(int argc, const char **argv) conn = make_connection(); queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare); - do_consume(conn, queue_bytes, no_ack, cmd_argv); + do_consume(conn, queue_bytes, no_ack, count, cmd_argv); close_connection(conn); return 0; diff --git a/tools/doc/consume.xml b/tools/doc/consume.xml index a912e19..b5f40d7 100644 --- a/tools/doc/consume.xml +++ b/tools/doc/consume.xml @@ -135,6 +135,16 @@ + + + =limit + + + Stop consuming after the given number of + messages have been received. + + + -- cgit v1.2.1