summaryrefslogtreecommitdiff
path: root/tools/consume.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/consume.c')
-rw-r--r--tools/consume.c90
1 files changed, 36 insertions, 54 deletions
diff --git a/tools/consume.c b/tools/consume.c
index 146b0a7..34037d9 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -84,61 +84,45 @@ static char *stringify_bytes(amqp_bytes_t bytes)
static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
char *queue, char *exchange,
- char *exchange_type, char *routing_key)
+ char *routing_key, int declare)
{
- amqp_bytes_t queue_bytes;
- amqp_queue_declare_ok_t *res;
+ amqp_bytes_t queue_bytes = cstring_bytes(queue);
/* if an exchange name wasn't provided, check that we don't
have options that require it. */
- if (!exchange) {
- char *opt = NULL;
- if (routing_key)
- opt = "--routing-key";
- else if (exchange_type)
- opt = "--exchange-type";
-
- if (opt) {
- fprintf(stderr,
- "%s option requires an exchange name to be "
- "provided with --exchange\n", opt);
- exit(1);
- }
+ if (!exchange && routing_key) {
+ fprintf(stderr, "--routing-key option requires an exchange"
+ " name to be provided with --exchange\n");
+ exit(1);
}
- /* Declare the queue as auto-delete. If the queue already
- exists, this won't have any effect. */
- queue_bytes = cstring_bytes(queue);
- res = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1,
- AMQP_EMPTY_TABLE);
- if (!res)
- die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
-
- if (!queue) {
- /* the server should have provided a queue name */
- char *sq;
- queue_bytes = amqp_bytes_malloc_dup(res->queue);
- sq = stringify_bytes(queue_bytes);
- fprintf(stderr, "Server provided queue name: %s\n", sq);
- free(sq);
- }
+ if (!queue || exchange || declare) {
+ /* Declare the queue as auto-delete. */
+ amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1,
+ queue_bytes, 0, 0, 1, 1,
+ AMQP_EMPTY_TABLE);
+ if (!res)
+ die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
+
+ if (!queue) {
+ /* the server should have provided a queue name */
+ char *sq;
+ queue_bytes = amqp_bytes_malloc_dup(res->queue);
+ sq = stringify_bytes(queue_bytes);
+ fprintf(stderr, "Server provided queue name: %s\n",
+ sq);
+ free(sq);
+ }
- /* Bind to an exchange if requested */
- if (exchange) {
- amqp_bytes_t eb = amqp_cstring_bytes(exchange);
-
- if (exchange_type) {
- /* we should create the exchange */
- if (!amqp_exchange_declare(conn, 1, eb,
- amqp_cstring_bytes(exchange_type),
- 0, 0, AMQP_EMPTY_TABLE))
- die_rpc(amqp_get_rpc_reply(conn), "exchange.declare");
+ /* Bind to an exchange if requested */
+ if (exchange) {
+ amqp_bytes_t eb = amqp_cstring_bytes(exchange);
+ if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
+ cstring_bytes(routing_key),
+ AMQP_EMPTY_TABLE))
+ die_rpc(amqp_get_rpc_reply(conn),
+ "queue.bind");
}
-
- if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
- cstring_bytes(routing_key),
- AMQP_EMPTY_TABLE))
- die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
}
return queue_bytes;
@@ -181,13 +165,13 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
int main(int argc, const char **argv)
{
poptContext opts;
- int no_ack;
amqp_connection_state_t conn;
const char * const *cmd_argv;
char *queue = NULL;
char *exchange = NULL;
- char *exchange_type = NULL;
char *routing_key = NULL;
+ int declare = 0;
+ int no_ack = 0;
amqp_bytes_t queue_bytes;
struct poptOption options[] = {
@@ -196,11 +180,10 @@ int main(int argc, const char **argv)
"the queue to consume from", "queue"},
{"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
"bind the queue to this exchange", "exchange"},
- {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0,
- "create auto-delete exchange of this type for binding",
- "type"},
{"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
"the routing key to bind with", "routing key"},
+ {"declare", 'd', POPT_ARG_NONE, &declare, 0,
+ "declare an exclusive queue", NULL},
{"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
"consume in no-ack mode", NULL},
POPT_AUTOHELP
@@ -218,8 +201,7 @@ int main(int argc, const char **argv)
}
conn = make_connection();
- queue_bytes = setup_queue(conn, queue, exchange, exchange_type,
- routing_key);
+ queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare);
do_consume(conn, queue_bytes, no_ack, cmd_argv);
close_connection(conn);
return 0;