summaryrefslogtreecommitdiff
path: root/tools/consume.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/consume.c')
-rw-r--r--tools/consume.c136
1 files changed, 55 insertions, 81 deletions
diff --git a/tools/consume.c b/tools/consume.c
index 485a0cf..dbc164a 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -49,8 +49,7 @@
/* Convert a amqp_bytes_t to an escaped string form for printing. We
use the same escaping conventions as rabbitmqctl. */
-static char *stringify_bytes(amqp_bytes_t bytes)
-{
+static char *stringify_bytes(amqp_bytes_t bytes) {
/* We will need up to 4 chars per byte, plus the terminating 0 */
char *res = malloc(bytes.len * 4 + 1);
uint8_t *data = bytes.bytes;
@@ -72,11 +71,9 @@ static char *stringify_bytes(amqp_bytes_t bytes)
return res;
}
-static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
- char *queue, char *exchange,
- char *routing_key, int declare,
- int exclusive)
-{
+static amqp_bytes_t setup_queue(amqp_connection_state_t conn, char *queue,
+ char *exchange, char *routing_key, int declare,
+ int exclusive) {
amqp_bytes_t queue_bytes = cstring_bytes(queue);
char *routing_key_rest;
@@ -84,19 +81,19 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
char *routing_tmp;
int routing_key_count = 0;
- /* if an exchange name wasn't provided, check that we don't
- have options that require it. */
+ /* if an exchange name wasn't provided, check that we don't have options that
+ * require it. */
if (!exchange && routing_key) {
- fprintf(stderr, "--routing-key option requires an exchange"
- " name to be provided with --exchange\n");
+ fprintf(stderr,
+ "--routing-key option requires an exchange name to be provided "
+ "with --exchange\n");
exit(1);
}
if (!queue || exchange || declare || exclusive) {
/* Declare the queue as auto-delete. */
- amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1,
- queue_bytes, 0, 0, exclusive, 1,
- amqp_empty_table);
+ amqp_queue_declare_ok_t *res = amqp_queue_declare(
+ conn, 1, queue_bytes, 0, 0, exclusive, 1, amqp_empty_table);
if (!res) {
die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
}
@@ -106,8 +103,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
char *sq;
queue_bytes = amqp_bytes_malloc_dup(res->queue);
sq = stringify_bytes(queue_bytes);
- fprintf(stderr, "Server provided queue name: %s\n",
- sq);
+ fprintf(stderr, "Server provided queue name: %s\n", sq);
free(sq);
}
@@ -115,17 +111,17 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
if (exchange) {
amqp_bytes_t eb = amqp_cstring_bytes(exchange);
- routing_tmp = strdup( routing_key );
- if ( NULL == routing_tmp ) {
- fprintf(stderr, "could not allocate memory to parse routing key\n" );
+ routing_tmp = strdup(routing_key);
+ if (NULL == routing_tmp) {
+ fprintf(stderr, "could not allocate memory to parse routing key\n");
exit(1);
}
- for (
- routing_key_token = strtok_r( routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest )
- ; NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1
- ; routing_key_token = strtok_r( NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest )
- ) {
+ for (routing_key_token =
+ strtok_r(routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest);
+ NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1;
+ routing_key_token =
+ strtok_r(NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest)) {
if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
cstring_bytes(routing_key_token),
@@ -133,7 +129,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
}
}
- free( routing_tmp );
+ free(routing_tmp);
}
}
@@ -144,13 +140,12 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
int no_ack, int count, int prefetch_count,
- const char *const *argv)
-{
+ const char *const *argv) {
int i;
/* If there is a limit, set the qos to match */
- if (count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT
- && !amqp_basic_qos(conn, 1, 0, count, 0)) {
+ if (count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT &&
+ !amqp_basic_qos(conn, 1, 0, count, 0)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
}
@@ -159,15 +154,16 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
if (prefetch_count > 0 && prefetch_count <= AMQP_CONSUME_MAX_PREFETCH_COUNT) {
/* the maximum number of messages to be received at a time must be less
* than the global maximum number of messages. */
- if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT && prefetch_count >= count)) {
+ if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT &&
+ prefetch_count >= count)) {
if (!amqp_basic_qos(conn, 1, 0, prefetch_count, 0)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
}
}
}
- if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack,
- 0, amqp_empty_table)) {
+ if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, 0,
+ amqp_empty_table)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
}
@@ -179,8 +175,8 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
int res = amqp_simple_wait_frame(conn, &frame);
die_amqp_error(res, "waiting for header frame");
- if (frame.frame_type != AMQP_FRAME_METHOD
- || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ if (frame.frame_type != AMQP_FRAME_METHOD ||
+ frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
continue;
}
@@ -191,16 +187,13 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
copy_body(conn, pl.infd);
if (finish_pipeline(&pl) && !no_ack)
- die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag,
- 0),
- "basic.ack");
+ die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, 0), "basic.ack");
amqp_maybe_release_buffers(conn);
}
}
-int main(int argc, const char **argv)
-{
+int main(int argc, const char **argv) {
poptContext opts;
amqp_connection_state_t conn;
const char *const *cmd_argv;
@@ -215,47 +208,27 @@ int main(int argc, const char **argv)
amqp_bytes_t queue_bytes;
struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {
- "queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue to consume from", "queue"
- },
- {
- "exchange", 'e', POPT_ARG_STRING, &exchange, 0,
- "bind the queue to this exchange", "exchange"
- },
- {
- "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
- "the routing key to bind with", "routing key"
- },
- {
- "declare", 'd', POPT_ARG_NONE, &declare, 0,
- "declare an exclusive queue (deprecated, use --exclusive instead)", NULL
- },
- {
- "exclusive", 'x', POPT_ARG_NONE, &exclusive, 0,
- "declare the queue as exclusive", NULL
- },
- {
- "no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
- "consume in no-ack mode", NULL
- },
- {
- "count", 'c', POPT_ARG_INT, &count, 0,
- "stop consuming after this many messages are consumed",
- "limit"
- },
- {
- "prefetch-count", 'p', POPT_ARG_INT, &prefetch_count, 0,
- "receive only this many message at a time from the server",
- "limit"
- },
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
-
- opts = process_options(argc, argv, options,
- "[OPTIONS]... <command> <args>");
+ INCLUDE_OPTIONS(connect_options),
+ {"queue", 'q', POPT_ARG_STRING, &queue, 0, "the queue to consume from",
+ "queue"},
+ {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "bind the queue to this exchange", "exchange"},
+ {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to bind with", "routing key"},
+ {"declare", 'd', POPT_ARG_NONE, &declare, 0,
+ "declare an exclusive queue (deprecated, use --exclusive instead)",
+ NULL},
+ {"exclusive", 'x', POPT_ARG_NONE, &exclusive, 0,
+ "declare the queue as exclusive", NULL},
+ {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, "consume in no-ack mode",
+ NULL},
+ {"count", 'c', POPT_ARG_INT, &count, 0,
+ "stop consuming after this many messages are consumed", "limit"},
+ {"prefetch-count", 'p', POPT_ARG_INT, &prefetch_count, 0,
+ "receive only this many message at a time from the server", "limit"},
+ POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
+
+ opts = process_options(argc, argv, options, "[OPTIONS]... <command> <args>");
cmd_argv = poptGetArgs(opts);
if (!cmd_argv || !cmd_argv[0]) {
@@ -265,7 +238,8 @@ int main(int argc, const char **argv)
}
conn = make_connection();
- queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare, exclusive);
+ queue_bytes =
+ setup_queue(conn, queue, exchange, routing_key, declare, exclusive);
do_consume(conn, queue_bytes, no_ack, count, prefetch_count, cmd_argv);
close_connection(conn);
return 0;