summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntonio Terceiro <terceiro@softwarelivre.org>2014-08-30 19:32:13 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2014-09-03 07:55:35 -0700
commit7188e5dce20e56e43314ed3f9c61c8717f961eff (patch)
tree92abde2eeaf2a1ba3fae72a9d54d6bf730ad896a
parent8e0600a65b5d8563ae4091b34307fc03e1000f77 (diff)
downloadrabbitmq-c-github-ask-7188e5dce20e56e43314ed3f9c61c8717f961eff.tar.gz
amqp-consume: support consuming N messages at a time
If you have a single consumer C₁ and 10 messages are published, all 10 will be streamed to that one customer. Assume each message takes a few minutes to be handled. If a second consumer C₂ comes up before C₁ is able to process its first message, it will stay idle until new messages are published, while C₁ will still have to process the other 9 messages after finishing with the first one. If both consumers were started with `--messages 1`, C₁ would only fetch a single message, and start handling it; C₂ would start and already receive the second message .
-rw-r--r--tools/consume.c27
-rw-r--r--tools/doc/amqp-consume.xml21
2 files changed, 45 insertions, 3 deletions
diff --git a/tools/consume.c b/tools/consume.c
index 5b7777e..4a0c75d 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -141,17 +141,32 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
return queue_bytes;
}
+#define AMQP_CONSUME_MAX_PREFETCH_COUNT 65535
+
static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
- int no_ack, int count, const char *const *argv)
+ int no_ack, int count, int prefetch_count,
+ const char *const *argv)
{
int i;
/* If there is a limit, set the qos to match */
- if (count > 0 && count <= 65535
+ if (count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT
&& !amqp_basic_qos(conn, 1, 0, count, 0)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
}
+ /* if there is a maximum number of messages to be received at a time, set the
+ * qos to match */
+ if (prefetch_count > 0 && prefetch_count <= AMQP_CONSUME_MAX_PREFETCH_COUNT) {
+ /* the maximum number of messages to be received at a time must be less
+ * than the global maximum number of messages. */
+ if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT && prefetch_count >= count)) {
+ if (!amqp_basic_qos(conn, 1, 0, prefetch_count, 0)) {
+ die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
+ }
+ }
+ }
+
if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack,
0, amqp_empty_table)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
@@ -197,6 +212,7 @@ int main(int argc, const char **argv)
int exclusive = 0;
int no_ack = 0;
int count = -1;
+ int prefetch_count = -1;
amqp_bytes_t queue_bytes;
struct poptOption options[] = {
@@ -230,6 +246,11 @@ int main(int argc, const char **argv)
"stop consuming after this many messages are consumed",
"limit"
},
+ {
+ "prefetch-count", 'p', POPT_ARG_INT, &prefetch_count, 0,
+ "receive only this many message at a time from the server",
+ "limit"
+ },
POPT_AUTOHELP
{ NULL, '\0', 0, NULL, 0, NULL, NULL }
};
@@ -246,7 +267,7 @@ int main(int argc, const char **argv)
conn = make_connection();
queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare, exclusive);
- do_consume(conn, queue_bytes, no_ack, count, cmd_argv);
+ do_consume(conn, queue_bytes, no_ack, count, prefetch_count, cmd_argv);
close_connection(conn);
return 0;
diff --git a/tools/doc/amqp-consume.xml b/tools/doc/amqp-consume.xml
index f6f51ba..9ee12e8 100644
--- a/tools/doc/amqp-consume.xml
+++ b/tools/doc/amqp-consume.xml
@@ -160,6 +160,27 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>-p</option></term>
+ <term><option>--prefetch-count</option>=<replaceable class="parameter">limit</replaceable></term>
+ <listitem>
+ <para>
+ Request the server to only send
+ <replaceable class="parameter">limit</replaceable>
+ messages at a time.
+ </para>
+ <para>
+ If any value was passed to <option>--count</option>,
+ the value passed to <option>--prefetch-count</option>
+ should be smaller than that, or otherwise it will be
+ ignored.
+ </para>
+ <para>
+ If <option>-A</option>/<option>--no-ack</option> is
+ passed, this option has no effect.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect1>