From e6c256d96bb7b9dd3888a97a0bf30feb33c814ee Mon Sep 17 00:00:00 2001 From: Brian Hammond Date: Mon, 25 Mar 2013 15:00:15 -0400 Subject: listen to multiple routing keys separated by commas --- tools/consume.c | 34 +++++++++++++++++++++++++++++----- tools/publish.c | 31 +++++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 7 deletions(-) (limited to 'tools') diff --git a/tools/consume.c b/tools/consume.c index f4af4e2..9075302 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -40,10 +40,14 @@ #include #include +#include #include "common.h" #include "process.h" +#define MAX_LISTEN_KEYS 1024 +#define LISTEN_KEYS_DELIMITER "," + /* Convert a amqp_bytes_t to an escaped string form for printing. We use the same escaping conventions as rabbitmqctl. */ static char *stringify_bytes(amqp_bytes_t bytes) @@ -75,6 +79,11 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, { amqp_bytes_t queue_bytes = cstring_bytes(queue); + char *routing_key_rest; + char *routing_key_token; + char *routing_tmp; + int routing_key_count = 0; + /* if an exchange name wasn't provided, check that we don't have options that require it. */ if (!exchange && routing_key) { @@ -105,11 +114,26 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, /* Bind to an exchange if requested */ if (exchange) { amqp_bytes_t eb = amqp_cstring_bytes(exchange); - if (!amqp_queue_bind(conn, 1, queue_bytes, eb, - cstring_bytes(routing_key), - amqp_empty_table)) - die_rpc(amqp_get_rpc_reply(conn), - "queue.bind"); + + routing_tmp = strdup( routing_key ); + if ( NULL == routing_tmp ) { + fprintf(stderr, "could not allocate memory to parse routing key\n" ); + exit(1); + } + + for ( + routing_key_token = strtok_r( routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest ) + ; NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1 + ; routing_key_token = strtok_r( NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest ) + ) { + + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key_token), + amqp_empty_table)) { + die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); + } + } + free( routing_tmp ); } } diff --git a/tools/publish.c b/tools/publish.c index f9e4380..4fe43cb 100644 --- a/tools/publish.c +++ b/tools/publish.c @@ -44,6 +44,8 @@ #include "common.h" +#define MAX_LINE_LENGTH 1024 * 32 + static void do_publish(amqp_connection_state_t conn, char *exchange, char *routing_key, amqp_basic_properties_t *props, amqp_bytes_t body) @@ -62,10 +64,12 @@ int main(int argc, const char **argv) char *routing_key = NULL; char *content_type = NULL; char *content_encoding = NULL; + char *reply_to = NULL; char *body = NULL; amqp_basic_properties_t props; amqp_bytes_t body_bytes; int delivery = 1; /* non-persistent by default */ + int line_buffered = 0; struct poptOption options[] = { INCLUDE_OPTIONS(connect_options), @@ -85,6 +89,14 @@ int main(int argc, const char **argv) "content-type", 'C', POPT_ARG_STRING, &content_type, 0, "the content-type for the message", "content type" }, + { + "reply-to", 't', POPT_ARG_STRING, &reply_to, 0, + "the replyTo to use for the message", "reply to" + }, + { + "line-buffered", 'l', POPT_ARG_VAL, &line_buffered, 2, + "treat each line from standard in as a separate message", NULL + }, { "content-encoding", 'E', POPT_ARG_STRING, &content_encoding, 0, @@ -120,15 +132,30 @@ int main(int argc, const char **argv) props.content_encoding = amqp_cstring_bytes(content_encoding); } + if (reply_to) { + props._flags |= AMQP_BASIC_REPLY_TO_FLAG; + props.reply_to = amqp_cstring_bytes(reply_to); + } + conn = make_connection(); if (body) { body_bytes = amqp_cstring_bytes(body); } else { - body_bytes = read_all(0); + if ( line_buffered ) { + body_bytes.bytes = ( char * ) malloc( MAX_LINE_LENGTH ); + while ( fgets( body_bytes.bytes, MAX_LINE_LENGTH, stdin ) ) { + body_bytes.len = strlen( body_bytes.bytes ); + do_publish(conn, exchange, routing_key, &props, body_bytes); + } + } else { + body_bytes = read_all(0); + } } - do_publish(conn, exchange, routing_key, &props, body_bytes); + if ( !line_buffered ) { + do_publish(conn, exchange, routing_key, &props, body_bytes); + } if (!body) { free(body_bytes.bytes); -- cgit v1.2.1