diff options
Diffstat (limited to 'tools/consume.c')
-rw-r--r-- | tools/consume.c | 90 |
1 files changed, 36 insertions, 54 deletions
diff --git a/tools/consume.c b/tools/consume.c index 146b0a7..34037d9 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -84,61 +84,45 @@ static char *stringify_bytes(amqp_bytes_t bytes) static amqp_bytes_t setup_queue(amqp_connection_state_t conn, char *queue, char *exchange, - char *exchange_type, char *routing_key) + char *routing_key, int declare) { - amqp_bytes_t queue_bytes; - amqp_queue_declare_ok_t *res; + amqp_bytes_t queue_bytes = cstring_bytes(queue); /* if an exchange name wasn't provided, check that we don't have options that require it. */ - if (!exchange) { - char *opt = NULL; - if (routing_key) - opt = "--routing-key"; - else if (exchange_type) - opt = "--exchange-type"; - - if (opt) { - fprintf(stderr, - "%s option requires an exchange name to be " - "provided with --exchange\n", opt); - exit(1); - } + if (!exchange && routing_key) { + fprintf(stderr, "--routing-key option requires an exchange" + " name to be provided with --exchange\n"); + exit(1); } - /* Declare the queue as auto-delete. If the queue already - exists, this won't have any effect. */ - queue_bytes = cstring_bytes(queue); - res = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1, - AMQP_EMPTY_TABLE); - if (!res) - die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); - - if (!queue) { - /* the server should have provided a queue name */ - char *sq; - queue_bytes = amqp_bytes_malloc_dup(res->queue); - sq = stringify_bytes(queue_bytes); - fprintf(stderr, "Server provided queue name: %s\n", sq); - free(sq); - } + if (!queue || exchange || declare) { + /* Declare the queue as auto-delete. */ + amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1, + queue_bytes, 0, 0, 1, 1, + AMQP_EMPTY_TABLE); + if (!res) + die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + + if (!queue) { + /* the server should have provided a queue name */ + char *sq; + queue_bytes = amqp_bytes_malloc_dup(res->queue); + sq = stringify_bytes(queue_bytes); + fprintf(stderr, "Server provided queue name: %s\n", + sq); + free(sq); + } - /* Bind to an exchange if requested */ - if (exchange) { - amqp_bytes_t eb = amqp_cstring_bytes(exchange); - - if (exchange_type) { - /* we should create the exchange */ - if (!amqp_exchange_declare(conn, 1, eb, - amqp_cstring_bytes(exchange_type), - 0, 0, AMQP_EMPTY_TABLE)) - die_rpc(amqp_get_rpc_reply(conn), "exchange.declare"); + /* Bind to an exchange if requested */ + if (exchange) { + amqp_bytes_t eb = amqp_cstring_bytes(exchange); + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key), + AMQP_EMPTY_TABLE)) + die_rpc(amqp_get_rpc_reply(conn), + "queue.bind"); } - - if (!amqp_queue_bind(conn, 1, queue_bytes, eb, - cstring_bytes(routing_key), - AMQP_EMPTY_TABLE)) - die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); } return queue_bytes; @@ -181,13 +165,13 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, int main(int argc, const char **argv) { poptContext opts; - int no_ack; amqp_connection_state_t conn; const char * const *cmd_argv; char *queue = NULL; char *exchange = NULL; - char *exchange_type = NULL; char *routing_key = NULL; + int declare = 0; + int no_ack = 0; amqp_bytes_t queue_bytes; struct poptOption options[] = { @@ -196,11 +180,10 @@ int main(int argc, const char **argv) "the queue to consume from", "queue"}, {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, "bind the queue to this exchange", "exchange"}, - {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0, - "create auto-delete exchange of this type for binding", - "type"}, {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, "the routing key to bind with", "routing key"}, + {"declare", 'd', POPT_ARG_NONE, &declare, 0, + "declare an exclusive queue", NULL}, {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, "consume in no-ack mode", NULL}, POPT_AUTOHELP @@ -218,8 +201,7 @@ int main(int argc, const char **argv) } conn = make_connection(); - queue_bytes = setup_queue(conn, queue, exchange, exchange_type, - routing_key); + queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare); do_consume(conn, queue_bytes, no_ack, cmd_argv); close_connection(conn); return 0; |