diff options
Diffstat (limited to 'tools/consume.c')
-rw-r--r-- | tools/consume.c | 34 |
1 files changed, 29 insertions, 5 deletions
diff --git a/tools/consume.c b/tools/consume.c index f4af4e2..9075302 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -40,10 +40,14 @@ #include <stdio.h> #include <stdlib.h> +#include <string.h> #include "common.h" #include "process.h" +#define MAX_LISTEN_KEYS 1024 +#define LISTEN_KEYS_DELIMITER "," + /* Convert a amqp_bytes_t to an escaped string form for printing. We use the same escaping conventions as rabbitmqctl. */ static char *stringify_bytes(amqp_bytes_t bytes) @@ -75,6 +79,11 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, { amqp_bytes_t queue_bytes = cstring_bytes(queue); + char *routing_key_rest; + char *routing_key_token; + char *routing_tmp; + int routing_key_count = 0; + /* if an exchange name wasn't provided, check that we don't have options that require it. */ if (!exchange && routing_key) { @@ -105,11 +114,26 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, /* Bind to an exchange if requested */ if (exchange) { amqp_bytes_t eb = amqp_cstring_bytes(exchange); - if (!amqp_queue_bind(conn, 1, queue_bytes, eb, - cstring_bytes(routing_key), - amqp_empty_table)) - die_rpc(amqp_get_rpc_reply(conn), - "queue.bind"); + + routing_tmp = strdup( routing_key ); + if ( NULL == routing_tmp ) { + fprintf(stderr, "could not allocate memory to parse routing key\n" ); + exit(1); + } + + for ( + routing_key_token = strtok_r( routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest ) + ; NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1 + ; routing_key_token = strtok_r( NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest ) + ) { + + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key_token), + amqp_empty_table)) { + die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); + } + } + free( routing_tmp ); } } |