diff options
author | Brian Hammond <brianin3d@yahoo.com> | 2013-03-25 15:00:15 -0400 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-26 10:53:13 -0700 |
commit | e6c256d96bb7b9dd3888a97a0bf30feb33c814ee (patch) | |
tree | a26abbfbdc86726831004186f6d7d0597900285d | |
parent | ee0a716ae8ccd6b1305c06b8a6211e8e95ab375b (diff) | |
download | rabbitmq-c-e6c256d96bb7b9dd3888a97a0bf30feb33c814ee.tar.gz |
listen to multiple routing keys separated by commas
-rw-r--r-- | tools/consume.c | 34 | ||||
-rw-r--r-- | tools/publish.c | 31 |
2 files changed, 58 insertions, 7 deletions
diff --git a/tools/consume.c b/tools/consume.c index f4af4e2..9075302 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -40,10 +40,14 @@ #include <stdio.h> #include <stdlib.h> +#include <string.h> #include "common.h" #include "process.h" +#define MAX_LISTEN_KEYS 1024 +#define LISTEN_KEYS_DELIMITER "," + /* Convert a amqp_bytes_t to an escaped string form for printing. We use the same escaping conventions as rabbitmqctl. */ static char *stringify_bytes(amqp_bytes_t bytes) @@ -75,6 +79,11 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, { amqp_bytes_t queue_bytes = cstring_bytes(queue); + char *routing_key_rest; + char *routing_key_token; + char *routing_tmp; + int routing_key_count = 0; + /* if an exchange name wasn't provided, check that we don't have options that require it. */ if (!exchange && routing_key) { @@ -105,11 +114,26 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, /* Bind to an exchange if requested */ if (exchange) { amqp_bytes_t eb = amqp_cstring_bytes(exchange); - if (!amqp_queue_bind(conn, 1, queue_bytes, eb, - cstring_bytes(routing_key), - amqp_empty_table)) - die_rpc(amqp_get_rpc_reply(conn), - "queue.bind"); + + routing_tmp = strdup( routing_key ); + if ( NULL == routing_tmp ) { + fprintf(stderr, "could not allocate memory to parse routing key\n" ); + exit(1); + } + + for ( + routing_key_token = strtok_r( routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest ) + ; NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1 + ; routing_key_token = strtok_r( NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest ) + ) { + + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key_token), + amqp_empty_table)) { + die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); + } + } + free( routing_tmp ); } } diff --git a/tools/publish.c b/tools/publish.c index f9e4380..4fe43cb 100644 --- a/tools/publish.c +++ b/tools/publish.c @@ -44,6 +44,8 @@ #include "common.h" +#define MAX_LINE_LENGTH 1024 * 32 + static void do_publish(amqp_connection_state_t conn, char *exchange, char *routing_key, amqp_basic_properties_t *props, amqp_bytes_t body) @@ -62,10 +64,12 @@ int main(int argc, const char **argv) char *routing_key = NULL; char *content_type = NULL; char *content_encoding = NULL; + char *reply_to = NULL; char *body = NULL; amqp_basic_properties_t props; amqp_bytes_t body_bytes; int delivery = 1; /* non-persistent by default */ + int line_buffered = 0; struct poptOption options[] = { INCLUDE_OPTIONS(connect_options), @@ -86,6 +90,14 @@ int main(int argc, const char **argv) "the content-type for the message", "content type" }, { + "reply-to", 't', POPT_ARG_STRING, &reply_to, 0, + "the replyTo to use for the message", "reply to" + }, + { + "line-buffered", 'l', POPT_ARG_VAL, &line_buffered, 2, + "treat each line from standard in as a separate message", NULL + }, + { "content-encoding", 'E', POPT_ARG_STRING, &content_encoding, 0, "the content-encoding for the message", "content encoding" @@ -120,15 +132,30 @@ int main(int argc, const char **argv) props.content_encoding = amqp_cstring_bytes(content_encoding); } + if (reply_to) { + props._flags |= AMQP_BASIC_REPLY_TO_FLAG; + props.reply_to = amqp_cstring_bytes(reply_to); + } + conn = make_connection(); if (body) { body_bytes = amqp_cstring_bytes(body); } else { - body_bytes = read_all(0); + if ( line_buffered ) { + body_bytes.bytes = ( char * ) malloc( MAX_LINE_LENGTH ); + while ( fgets( body_bytes.bytes, MAX_LINE_LENGTH, stdin ) ) { + body_bytes.len = strlen( body_bytes.bytes ); + do_publish(conn, exchange, routing_key, &props, body_bytes); + } + } else { + body_bytes = read_all(0); + } } - do_publish(conn, exchange, routing_key, &props, body_bytes); + if ( !line_buffered ) { + do_publish(conn, exchange, routing_key, &props, body_bytes); + } if (!body) { free(body_bytes.bytes); |