summaryrefslogtreecommitdiff
path: root/tools/common_consume.c
blob: 85b9069179880f73230f60d5b115fea510f3196b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include "config.h"

#include <stdio.h>
#include <stdlib.h>

#include <popt.h>

#include "common.h"

static char *queue;
static char *exchange;
static char *exchange_type;
static char *routing_key;

const char *consume_queue_options_title = "Source queue options";
struct poptOption consume_queue_options[] = {
	{"queue", 'q', POPT_ARG_STRING, &queue, 0,
	 "the queue to consume from", "queue"},
	{"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
	 "bind the queue to this exchange", "exchange"},
	{"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0,
	 "create auto-delete exchange of this type for binding", "type"},
	{"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
	 "the routing key to bind with", "routing key"},
	{ NULL, 0, 0, NULL, 0 }
};

/* Convert a amqp_bytes_t to an escaped string form for printing.  We
   use the same escaping conventions as rabbitmqctl. */
static char *stringify_bytes(amqp_bytes_t bytes)
{
	/* W will need up to 4 chars per byte, plus the terminating 0 */
	char *res = malloc(bytes.len * 4 + 1);
	uint8_t *data = bytes.bytes;
	char *p = res;
	size_t i;
	
	for (i = 0; i < bytes.len; i++) {
		if (data[i] >= 32 && data[i] != 127) {
			*p++ = data[i];
		}
		else {
			*p++ = '\\';
			*p++ = '0' + (data[i] >> 6);
			*p++ = '0' + (data[i] >> 3 & 0x7); 
			*p++ = '0' + (data[i] & 0x7);
		}
	}

	*p = 0;
	return res;
}

amqp_bytes_t setup_queue(amqp_connection_state_t conn)
{
	/* if an exchange name wasn't provided, check that we don't
	   have options that require it. */
	if (!exchange) {
		char *opt = NULL;
		if (routing_key)
			opt = "--routing-key";
		else if (exchange_type)
			opt = "--exchange-type";

		if (opt) {
			fprintf(stderr,
				"%s option requires an exchange name to be "
				"provided with --exchange\n", opt);
			exit(1);
		}
	}

	/* Declare the queue.  If the queue already exists, this won't have
	   any effect. */
	amqp_bytes_t queue_bytes = cstring_bytes(queue);
	amqp_queue_declare_ok_t *res
		= amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1,
				     AMQP_EMPTY_TABLE);
	if (!res)
		die_rpc(amqp_get_rpc_reply(conn), "queue.declare");

	if (!queue) {
		// the server should have provided a queue name
		char *sq;
		queue_bytes = amqp_bytes_malloc_dup(res->queue);
		sq = stringify_bytes(queue_bytes);
		fprintf(stderr, "Server provided queue name: %s\n", sq);
		free(sq);
	}

	/* Bind to an exchange if requested */
	if (exchange) {
		amqp_bytes_t eb = amqp_cstring_bytes(exchange);
		
		if (exchange_type) {
			// we should create the exchange
			if (!amqp_exchange_declare(conn, 1, eb, 
					     amqp_cstring_bytes(exchange_type),
					     0, 0, 1, AMQP_EMPTY_TABLE))
			die_rpc(amqp_get_rpc_reply(conn), "exchange.declare");
		}
		
		if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
				     cstring_bytes(routing_key),
				     AMQP_EMPTY_TABLE))
			die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
	}
	
	return queue_bytes;
}