summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorDavid Wragg <david@rabbitmq.com>2011-11-09 14:36:20 +0000
committerDavid Wragg <david@rabbitmq.com>2011-11-09 14:36:20 +0000
commit89071d2ff9f6757402e219c4b8e6cf0c580a7b02 (patch)
tree9646dcb6d81349f731f5570c5d38d42a24bc135d /tools
parent3bd58ca342079a85e355faa09af1b7d3c20f243f (diff)
downloadrabbitmq-c-github-ask-89071d2ff9f6757402e219c4b8e6cf0c580a7b02.tar.gz
Add an option to limit how many messages are consumed
Diffstat (limited to 'tools')
-rw-r--r--tools/consume.c17
-rw-r--r--tools/doc/consume.xml10
2 files changed, 24 insertions, 3 deletions
diff --git a/tools/consume.c b/tools/consume.c
index 3a19d78..9d8a7f0 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -117,13 +117,20 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
}
static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
- int no_ack, const char * const *argv)
+ int no_ack, int count, const char * const *argv)
{
+ int i;
+
+ /* If there is a limit, set the qos to match */
+ if (count > 0 && count <= 65535
+ && !amqp_basic_qos(conn, 1, 0, count, 0))
+ die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
+
if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack,
0, amqp_empty_table))
die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
- for (;;) {
+ for (i = 0; count < 0 || i < count; i++) {
amqp_frame_t frame;
struct pipeline pl;
uint64_t delivery_tag;
@@ -160,6 +167,7 @@ int main(int argc, const char **argv)
char *routing_key = NULL;
int declare = 0;
int no_ack = 0;
+ int count = -1;
amqp_bytes_t queue_bytes;
struct poptOption options[] = {
@@ -174,6 +182,9 @@ int main(int argc, const char **argv)
"declare an exclusive queue", NULL},
{"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
"consume in no-ack mode", NULL},
+ {"count", 'c', POPT_ARG_INT, &count, 0,
+ "stop consuming after this many messages are consumed",
+ "limit"},
POPT_AUTOHELP
{ NULL, 0, 0, NULL, 0 }
};
@@ -190,7 +201,7 @@ int main(int argc, const char **argv)
conn = make_connection();
queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare);
- do_consume(conn, queue_bytes, no_ack, cmd_argv);
+ do_consume(conn, queue_bytes, no_ack, count, cmd_argv);
close_connection(conn);
return 0;
diff --git a/tools/doc/consume.xml b/tools/doc/consume.xml
index a912e19..b5f40d7 100644
--- a/tools/doc/consume.xml
+++ b/tools/doc/consume.xml
@@ -135,6 +135,16 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>-c</option></term>
+ <term><option>--count</option>=<replaceable class="parameter">limit</replaceable></term>
+ <listitem>
+ <para>
+ Stop consuming after the given number of
+ messages have been received.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect1>