summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorBrian Hammond <brianin3d@yahoo.com>2013-03-25 15:00:15 -0400
committerAlan Antonuk <alan.antonuk@gmail.com>2013-06-26 10:53:13 -0700
commite6c256d96bb7b9dd3888a97a0bf30feb33c814ee (patch)
treea26abbfbdc86726831004186f6d7d0597900285d /tools
parentee0a716ae8ccd6b1305c06b8a6211e8e95ab375b (diff)
downloadrabbitmq-c-github-ask-e6c256d96bb7b9dd3888a97a0bf30feb33c814ee.tar.gz
listen to multiple routing keys separated by commas
Diffstat (limited to 'tools')
-rw-r--r--tools/consume.c34
-rw-r--r--tools/publish.c31
2 files changed, 58 insertions, 7 deletions
diff --git a/tools/consume.c b/tools/consume.c
index f4af4e2..9075302 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -40,10 +40,14 @@
#include <stdio.h>
#include <stdlib.h>
+#include <string.h>
#include "common.h"
#include "process.h"
+#define MAX_LISTEN_KEYS 1024
+#define LISTEN_KEYS_DELIMITER ","
+
/* Convert a amqp_bytes_t to an escaped string form for printing. We
use the same escaping conventions as rabbitmqctl. */
static char *stringify_bytes(amqp_bytes_t bytes)
@@ -75,6 +79,11 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
{
amqp_bytes_t queue_bytes = cstring_bytes(queue);
+ char *routing_key_rest;
+ char *routing_key_token;
+ char *routing_tmp;
+ int routing_key_count = 0;
+
/* if an exchange name wasn't provided, check that we don't
have options that require it. */
if (!exchange && routing_key) {
@@ -105,11 +114,26 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
/* Bind to an exchange if requested */
if (exchange) {
amqp_bytes_t eb = amqp_cstring_bytes(exchange);
- if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
- cstring_bytes(routing_key),
- amqp_empty_table))
- die_rpc(amqp_get_rpc_reply(conn),
- "queue.bind");
+
+ routing_tmp = strdup( routing_key );
+ if ( NULL == routing_tmp ) {
+ fprintf(stderr, "could not allocate memory to parse routing key\n" );
+ exit(1);
+ }
+
+ for (
+ routing_key_token = strtok_r( routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest )
+ ; NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1
+ ; routing_key_token = strtok_r( NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest )
+ ) {
+
+ if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
+ cstring_bytes(routing_key_token),
+ amqp_empty_table)) {
+ die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
+ }
+ }
+ free( routing_tmp );
}
}
diff --git a/tools/publish.c b/tools/publish.c
index f9e4380..4fe43cb 100644
--- a/tools/publish.c
+++ b/tools/publish.c
@@ -44,6 +44,8 @@
#include "common.h"
+#define MAX_LINE_LENGTH 1024 * 32
+
static void do_publish(amqp_connection_state_t conn,
char *exchange, char *routing_key,
amqp_basic_properties_t *props, amqp_bytes_t body)
@@ -62,10 +64,12 @@ int main(int argc, const char **argv)
char *routing_key = NULL;
char *content_type = NULL;
char *content_encoding = NULL;
+ char *reply_to = NULL;
char *body = NULL;
amqp_basic_properties_t props;
amqp_bytes_t body_bytes;
int delivery = 1; /* non-persistent by default */
+ int line_buffered = 0;
struct poptOption options[] = {
INCLUDE_OPTIONS(connect_options),
@@ -86,6 +90,14 @@ int main(int argc, const char **argv)
"the content-type for the message", "content type"
},
{
+ "reply-to", 't', POPT_ARG_STRING, &reply_to, 0,
+ "the replyTo to use for the message", "reply to"
+ },
+ {
+ "line-buffered", 'l', POPT_ARG_VAL, &line_buffered, 2,
+ "treat each line from standard in as a separate message", NULL
+ },
+ {
"content-encoding", 'E', POPT_ARG_STRING,
&content_encoding, 0,
"the content-encoding for the message", "content encoding"
@@ -120,15 +132,30 @@ int main(int argc, const char **argv)
props.content_encoding = amqp_cstring_bytes(content_encoding);
}
+ if (reply_to) {
+ props._flags |= AMQP_BASIC_REPLY_TO_FLAG;
+ props.reply_to = amqp_cstring_bytes(reply_to);
+ }
+
conn = make_connection();
if (body) {
body_bytes = amqp_cstring_bytes(body);
} else {
- body_bytes = read_all(0);
+ if ( line_buffered ) {
+ body_bytes.bytes = ( char * ) malloc( MAX_LINE_LENGTH );
+ while ( fgets( body_bytes.bytes, MAX_LINE_LENGTH, stdin ) ) {
+ body_bytes.len = strlen( body_bytes.bytes );
+ do_publish(conn, exchange, routing_key, &props, body_bytes);
+ }
+ } else {
+ body_bytes = read_all(0);
+ }
}
- do_publish(conn, exchange, routing_key, &props, body_bytes);
+ if ( !line_buffered ) {
+ do_publish(conn, exchange, routing_key, &props, body_bytes);
+ }
if (!body) {
free(body_bytes.bytes);