summaryrefslogtreecommitdiff
path: root/tools/consume.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/consume.c')
-rw-r--r--tools/consume.c34
1 files changed, 29 insertions, 5 deletions
diff --git a/tools/consume.c b/tools/consume.c
index f4af4e2..9075302 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -40,10 +40,14 @@
#include <stdio.h>
#include <stdlib.h>
+#include <string.h>
#include "common.h"
#include "process.h"
+#define MAX_LISTEN_KEYS 1024
+#define LISTEN_KEYS_DELIMITER ","
+
/* Convert a amqp_bytes_t to an escaped string form for printing. We
use the same escaping conventions as rabbitmqctl. */
static char *stringify_bytes(amqp_bytes_t bytes)
@@ -75,6 +79,11 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
{
amqp_bytes_t queue_bytes = cstring_bytes(queue);
+ char *routing_key_rest;
+ char *routing_key_token;
+ char *routing_tmp;
+ int routing_key_count = 0;
+
/* if an exchange name wasn't provided, check that we don't
have options that require it. */
if (!exchange && routing_key) {
@@ -105,11 +114,26 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
/* Bind to an exchange if requested */
if (exchange) {
amqp_bytes_t eb = amqp_cstring_bytes(exchange);
- if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
- cstring_bytes(routing_key),
- amqp_empty_table))
- die_rpc(amqp_get_rpc_reply(conn),
- "queue.bind");
+
+ routing_tmp = strdup( routing_key );
+ if ( NULL == routing_tmp ) {
+ fprintf(stderr, "could not allocate memory to parse routing key\n" );
+ exit(1);
+ }
+
+ for (
+ routing_key_token = strtok_r( routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest )
+ ; NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1
+ ; routing_key_token = strtok_r( NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest )
+ ) {
+
+ if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
+ cstring_bytes(routing_key_token),
+ amqp_empty_table)) {
+ die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
+ }
+ }
+ free( routing_tmp );
}
}