From 7188e5dce20e56e43314ed3f9c61c8717f961eff Mon Sep 17 00:00:00 2001 From: Antonio Terceiro Date: Sat, 30 Aug 2014 19:32:13 -0700 Subject: amqp-consume: support consuming N messages at a time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If you have a single consumer C₁ and 10 messages are published, all 10 will be streamed to that one customer. Assume each message takes a few minutes to be handled. If a second consumer C₂ comes up before C₁ is able to process its first message, it will stay idle until new messages are published, while C₁ will still have to process the other 9 messages after finishing with the first one. If both consumers were started with `--messages 1`, C₁ would only fetch a single message, and start handling it; C₂ would start and already receive the second message . --- tools/consume.c | 27 ++++++++++++++++++++++++--- tools/doc/amqp-consume.xml | 21 +++++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/tools/consume.c b/tools/consume.c index 5b7777e..4a0c75d 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -141,17 +141,32 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, return queue_bytes; } +#define AMQP_CONSUME_MAX_PREFETCH_COUNT 65535 + static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, - int no_ack, int count, const char *const *argv) + int no_ack, int count, int prefetch_count, + const char *const *argv) { int i; /* If there is a limit, set the qos to match */ - if (count > 0 && count <= 65535 + if (count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT && !amqp_basic_qos(conn, 1, 0, count, 0)) { die_rpc(amqp_get_rpc_reply(conn), "basic.qos"); } + /* if there is a maximum number of messages to be received at a time, set the + * qos to match */ + if (prefetch_count > 0 && prefetch_count <= AMQP_CONSUME_MAX_PREFETCH_COUNT) { + /* the maximum number of messages to be received at a time must be less + * than the global maximum number of messages. */ + if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT && prefetch_count >= count)) { + if (!amqp_basic_qos(conn, 1, 0, prefetch_count, 0)) { + die_rpc(amqp_get_rpc_reply(conn), "basic.qos"); + } + } + } + if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, 0, amqp_empty_table)) { die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); @@ -197,6 +212,7 @@ int main(int argc, const char **argv) int exclusive = 0; int no_ack = 0; int count = -1; + int prefetch_count = -1; amqp_bytes_t queue_bytes; struct poptOption options[] = { @@ -230,6 +246,11 @@ int main(int argc, const char **argv) "stop consuming after this many messages are consumed", "limit" }, + { + "prefetch-count", 'p', POPT_ARG_INT, &prefetch_count, 0, + "receive only this many message at a time from the server", + "limit" + }, POPT_AUTOHELP { NULL, '\0', 0, NULL, 0, NULL, NULL } }; @@ -246,7 +267,7 @@ int main(int argc, const char **argv) conn = make_connection(); queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare, exclusive); - do_consume(conn, queue_bytes, no_ack, count, cmd_argv); + do_consume(conn, queue_bytes, no_ack, count, prefetch_count, cmd_argv); close_connection(conn); return 0; diff --git a/tools/doc/amqp-consume.xml b/tools/doc/amqp-consume.xml index f6f51ba..9ee12e8 100644 --- a/tools/doc/amqp-consume.xml +++ b/tools/doc/amqp-consume.xml @@ -160,6 +160,27 @@ + + + =limit + + + Request the server to only send + limit + messages at a time. + + + If any value was passed to , + the value passed to + should be smaller than that, or otherwise it will be + ignored. + + + If / is + passed, this option has no effect. + + + -- cgit v1.2.1