summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorDavid Wragg <dpw@lshift.net>2010-02-20 22:58:40 +0000
committerDavid Wragg <dpw@lshift.net>2010-02-20 22:58:40 +0000
commitc90336ef0b6c8507a2f409db7e33dd1844b25517 (patch)
treefe58dc22a21ac75341dd1064fb48ba60a5c2e70c /tools
parent55ac202750859482c4319addb8c54368b2369455 (diff)
downloadrabbitmq-c-github-ask-c90336ef0b6c8507a2f409db7e33dd1844b25517.tar.gz
Command line AMQP tools based on rabbitmq-c
Diffstat (limited to 'tools')
-rw-r--r--tools/Makefile.am12
-rw-r--r--tools/common.c364
-rw-r--r--tools/common.h43
-rw-r--r--tools/common_consume.c110
-rw-r--r--tools/common_consume.h4
-rw-r--r--tools/consume.c77
-rw-r--r--tools/get.c41
-rw-r--r--tools/publish.c90
8 files changed, 741 insertions, 0 deletions
diff --git a/tools/Makefile.am b/tools/Makefile.am
new file mode 100644
index 0000000..f67d46b
--- /dev/null
+++ b/tools/Makefile.am
@@ -0,0 +1,12 @@
+bin_PROGRAMS = amqp-publish amqp-get amqp-consume
+
+AM_CFLAGS = -I$(top_srcdir)/librabbitmq
+AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la
+
+LDADD=$(LIBPOPT)
+
+noinst_HEADERS = common.h
+
+amqp_publish_SOURCES = publish.c common.c
+amqp_get_SOURCES = get.c common.c common_consume.c
+amqp_consume_SOURCES = consume.c common.c common_consume.c
diff --git a/tools/common.c b/tools/common.c
new file mode 100644
index 0000000..7baab8c
--- /dev/null
+++ b/tools/common.c
@@ -0,0 +1,364 @@
+#include "config.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <spawn.h>
+#include <sys/wait.h>
+
+#include <popt.h>
+
+#include "common.h"
+
+extern char **environ;
+
+void die(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(1);
+}
+
+void die_errno(int err, const char *fmt, ...)
+{
+ if (err == 0)
+ return;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", strerror(err));
+ exit(1);
+}
+
+char *amqp_server_exception_string(amqp_rpc_reply_t r)
+{
+ int res;
+ char *s;
+
+ switch (r.reply.id) {
+ case AMQP_CONNECTION_CLOSE_METHOD: {
+ amqp_connection_close_t *m
+ = (amqp_connection_close_t *)r.reply.decoded;
+ res = asprintf(&s, "server connection error %d, message: %.*s",
+ m->reply_code,
+ (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
+
+ case AMQP_CHANNEL_CLOSE_METHOD: {
+ amqp_channel_close_t *m
+ = (amqp_channel_close_t *)r.reply.decoded;
+ res = asprintf(&s, "server channel error %d, message: %.*s",
+ m->reply_code,
+ (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
+
+ default:
+ res = asprintf(&s, "unknown server error, method id 0x%08X",
+ r.reply.id);
+ break;
+ }
+
+ return res >= 0 ? s : NULL;
+}
+
+char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
+{
+ const char *s;
+
+ switch (r.reply_type) {
+ case AMQP_RESPONSE_NORMAL:
+ s = "normal response";
+ break;
+
+ case AMQP_RESPONSE_NONE:
+ s = "missing RPC reply type";
+ break;
+
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ if (r.library_errno)
+ s = strerror(r.library_errno);
+ else
+ s = "end of stream";
+
+ break;
+
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ return amqp_server_exception_string(r);
+
+ default:
+ abort();
+ }
+
+ return strdup(s);
+}
+
+void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
+{
+ if (r.reply_type == AMQP_RESPONSE_NORMAL)
+ return;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r));
+ exit(1);
+}
+
+void set_cloexec(int fd)
+{
+ int flags;
+
+ flags = fcntl(fd, F_GETFD);
+ if (flags == -1
+ || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1)
+ die_errno(errno, "set_cloexec");
+}
+
+static char *amqp_server = "localhost";
+static char *amqp_vhost = "/";
+static char *amqp_username = "guest";
+static char *amqp_password = "guest";
+
+const char *connect_options_title = "Connection options";
+struct poptOption connect_options[] = {
+ {"server", 's', POPT_ARG_STRING, &amqp_server, 0,
+ "the AMQP server to connect to", "server"},
+ {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
+ "the vhost to use when connecting", "vhost"},
+ {"username", 0, POPT_ARG_STRING, &amqp_username, 0,
+ "the username to login with", "username"},
+ {"password", 0, POPT_ARG_STRING, &amqp_password, 0,
+ "the password to login with", "password"},
+ { NULL, 0, 0, NULL, 0 }
+};
+
+amqp_connection_state_t make_connection(void)
+{
+ int s;
+ amqp_connection_state_t conn;
+ char *host = amqp_server;
+ int port = 0;
+
+ /* parse the server string into a hostname and a port */
+ char *colon = strchr(amqp_server, ':');
+ if (colon) {
+ char *port_end;
+ size_t host_len = colon - amqp_server;
+ host = malloc(host_len + 1);
+ memcpy(host, amqp_server, host_len);
+ host[host_len] = 0;
+
+ port = strtol(colon+1, &port_end, 10);
+ if (port < 0
+ || port > 65535
+ || port_end == colon+1
+ || *port_end != 0)
+ die("bad server port number in %s", amqp_server);
+ }
+
+ s = amqp_open_socket(host, port ? port : 5672);
+ if (s < 0) {
+ if (s == -ENOENT)
+ die("unknown host %s", host);
+ else
+ die_errno(-s, "opening socket to %s", amqp_server);
+ }
+
+ set_cloexec(s);
+
+ conn = amqp_new_connection();
+ amqp_set_sockfd(conn, s);
+
+ die_rpc(amqp_login(conn, amqp_vhost, 0, 131072, 0,
+ AMQP_SASL_METHOD_PLAIN,
+ amqp_username, amqp_password),
+ "logging in to AMQP server");
+
+ if (!amqp_channel_open(conn, 1))
+ die_rpc(amqp_get_rpc_reply(conn), "opening channel");
+
+ return conn;
+}
+
+void close_connection(amqp_connection_state_t conn)
+{
+ int s = amqp_get_sockfd(conn);
+
+ die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
+ "closing channel");
+ die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
+ "closing connection");
+ amqp_destroy_connection(conn);
+
+ if (close(s) < 0)
+ die_errno(errno, "closing socket");
+}
+
+amqp_bytes_t read_all(int fd)
+{
+ size_t space = 4096;
+ amqp_bytes_t bytes;
+
+ bytes.bytes = malloc(space);
+ bytes.len = 0;
+
+ for (;;) {
+ ssize_t res = read(fd, bytes.bytes+bytes.len, space-bytes.len);
+ if (res == 0)
+ break;
+
+ if (res < 0) {
+ if (errno == EINTR)
+ continue;
+
+ die_errno(errno, "reading");
+ }
+
+ bytes.len += res;
+ if (bytes.len == space) {
+ space *= 2;
+ bytes.bytes = realloc(bytes.bytes, space);
+ }
+ }
+
+ return bytes;
+}
+
+void write_all(int fd, amqp_bytes_t data)
+{
+ while (data.len > 0) {
+ ssize_t res = write(fd, data.bytes, data.len);
+ if (res < 0)
+ die_errno(errno, "write");
+
+ data.len -= res;
+ data.bytes += res;
+ }
+}
+
+void copy_body(amqp_connection_state_t conn, int fd)
+{
+ size_t body_remaining;
+ amqp_frame_t frame;
+
+ int res = amqp_simple_wait_frame(conn, &frame);
+ if (res < 0)
+ die_errno(-res, "waiting for header frame");
+ if (frame.frame_type != AMQP_FRAME_HEADER)
+ die("expected header, got frame type 0x%X",
+ frame.frame_type);
+
+ body_remaining = frame.payload.properties.body_size;
+ while (body_remaining) {
+ res = amqp_simple_wait_frame(conn, &frame);
+ if (res < 0)
+ die_errno(-res, "waiting for body frame");
+ if (frame.frame_type != AMQP_FRAME_BODY)
+ die("expected body, got frame type 0x%X",
+ frame.frame_type);
+
+ write_all(fd, frame.payload.body_fragment);
+ body_remaining -= frame.payload.body_fragment.len;
+ }
+}
+
+void pipeline(const char * const *argv, struct pipeline *pl)
+{
+ posix_spawn_file_actions_t file_acts;
+
+ int pipefds[2];
+ if (pipe(pipefds))
+ die_errno(errno, "pipe");
+
+ die_errno(posix_spawn_file_actions_init(&file_acts),
+ "posix_spawn_file_actions_init");
+ die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
+ "posix_spawn_file_actions_adddup2");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
+ "posix_spawn_file_actions_addclose");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
+ "posix_spawn_file_actions_addclose");
+
+ die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
+ (char * const *)argv, environ),
+ "posix_spawnp: %s", argv[0]);
+
+ die_errno(posix_spawn_file_actions_destroy(&file_acts),
+ "posix_spawn_file_actions_destroy");
+
+ if (close(pipefds[0]))
+ die_errno(errno, "close");
+
+ pl->infd = pipefds[1];
+}
+
+int finish_pipeline(struct pipeline *pl)
+{
+ int status;
+
+ if (close(pl->infd))
+ die_errno(errno, "close");
+ if (waitpid(pl->pid, &status, 0) < 0)
+ die_errno(errno, "waitpid");
+ return WIFEXITED(status) && WEXITSTATUS(status) == 0;
+}
+
+poptContext process_options(int argc, const char **argv,
+ struct poptOption *options,
+ const char *help)
+{
+ int c;
+ poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
+
+ if (!help)
+ poptSetOtherOptionHelp(opts, "[OPTIONS]... <command> <args>");
+
+ while ((c = poptGetNextOpt(opts)) >= 0) {
+ // no options require explicit handling
+ }
+
+ if (c < -1) {
+ fprintf(stderr, "%s: %s\n",
+ poptBadOption(opts, POPT_BADOPTION_NOALIAS),
+ poptStrerror(c));
+ poptPrintUsage(opts, stderr, 0);
+ exit(1);
+ }
+
+ return opts;
+}
+
+void process_all_options(int argc, const char **argv,
+ struct poptOption *options)
+{
+ poptContext opts = process_options(argc, argv, options, NULL);
+ const char *opt = poptPeekArg(opts);
+
+ if (opt) {
+ fprintf(stderr, "unexpected operand: %s\n", opt);
+ poptPrintUsage(opts, stderr, 0);
+ exit(1);
+ }
+
+ poptFreeContext(opts);
+}
+
+amqp_bytes_t cstring_bytes(const char *str)
+{
+ return str ? amqp_cstring_bytes(str) : AMQP_EMPTY_BYTES;
+}
diff --git a/tools/common.h b/tools/common.h
new file mode 100644
index 0000000..b54afcb
--- /dev/null
+++ b/tools/common.h
@@ -0,0 +1,43 @@
+#include <stdint.h>
+
+#include <amqp.h>
+#include <amqp_framing.h>
+
+extern char *amqp_server_exception_string(amqp_rpc_reply_t r);
+extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r);
+
+extern void die(const char *fmt, ...)
+ __attribute__ ((format (printf, 1, 2)));
+extern void die_errno(int err, const char *fmt, ...)
+ __attribute__ ((format (printf, 2, 3)));
+extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
+ __attribute__ ((format (printf, 2, 3)));
+
+extern const char *connect_options_title;
+extern struct poptOption connect_options[];
+extern amqp_connection_state_t make_connection(void);
+extern void close_connection(amqp_connection_state_t conn);
+
+extern amqp_bytes_t read_all(int fd);
+extern void write_all(int fd, amqp_bytes_t data);
+
+extern void copy_body(amqp_connection_state_t conn, int fd);
+
+struct pipeline {
+ int pid;
+ int infd;
+};
+
+extern void pipeline(const char * const *argv, struct pipeline *pl);
+extern int finish_pipeline(struct pipeline *pl);
+
+#define INCLUDE_OPTIONS(options) \
+ {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL}
+
+extern poptContext process_options(int argc, const char **argv,
+ struct poptOption *options,
+ const char *help);
+extern void process_all_options(int argc, const char **argv,
+ struct poptOption *options);
+
+extern amqp_bytes_t cstring_bytes(const char *str);
diff --git a/tools/common_consume.c b/tools/common_consume.c
new file mode 100644
index 0000000..85b9069
--- /dev/null
+++ b/tools/common_consume.c
@@ -0,0 +1,110 @@
+#include "config.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <popt.h>
+
+#include "common.h"
+
+static char *queue;
+static char *exchange;
+static char *exchange_type;
+static char *routing_key;
+
+const char *consume_queue_options_title = "Source queue options";
+struct poptOption consume_queue_options[] = {
+ {"queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue to consume from", "queue"},
+ {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "bind the queue to this exchange", "exchange"},
+ {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0,
+ "create auto-delete exchange of this type for binding", "type"},
+ {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to bind with", "routing key"},
+ { NULL, 0, 0, NULL, 0 }
+};
+
+/* Convert a amqp_bytes_t to an escaped string form for printing. We
+ use the same escaping conventions as rabbitmqctl. */
+static char *stringify_bytes(amqp_bytes_t bytes)
+{
+ /* W will need up to 4 chars per byte, plus the terminating 0 */
+ char *res = malloc(bytes.len * 4 + 1);
+ uint8_t *data = bytes.bytes;
+ char *p = res;
+ size_t i;
+
+ for (i = 0; i < bytes.len; i++) {
+ if (data[i] >= 32 && data[i] != 127) {
+ *p++ = data[i];
+ }
+ else {
+ *p++ = '\\';
+ *p++ = '0' + (data[i] >> 6);
+ *p++ = '0' + (data[i] >> 3 & 0x7);
+ *p++ = '0' + (data[i] & 0x7);
+ }
+ }
+
+ *p = 0;
+ return res;
+}
+
+amqp_bytes_t setup_queue(amqp_connection_state_t conn)
+{
+ /* if an exchange name wasn't provided, check that we don't
+ have options that require it. */
+ if (!exchange) {
+ char *opt = NULL;
+ if (routing_key)
+ opt = "--routing-key";
+ else if (exchange_type)
+ opt = "--exchange-type";
+
+ if (opt) {
+ fprintf(stderr,
+ "%s option requires an exchange name to be "
+ "provided with --exchange\n", opt);
+ exit(1);
+ }
+ }
+
+ /* Declare the queue. If the queue already exists, this won't have
+ any effect. */
+ amqp_bytes_t queue_bytes = cstring_bytes(queue);
+ amqp_queue_declare_ok_t *res
+ = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1,
+ AMQP_EMPTY_TABLE);
+ if (!res)
+ die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
+
+ if (!queue) {
+ // the server should have provided a queue name
+ char *sq;
+ queue_bytes = amqp_bytes_malloc_dup(res->queue);
+ sq = stringify_bytes(queue_bytes);
+ fprintf(stderr, "Server provided queue name: %s\n", sq);
+ free(sq);
+ }
+
+ /* Bind to an exchange if requested */
+ if (exchange) {
+ amqp_bytes_t eb = amqp_cstring_bytes(exchange);
+
+ if (exchange_type) {
+ // we should create the exchange
+ if (!amqp_exchange_declare(conn, 1, eb,
+ amqp_cstring_bytes(exchange_type),
+ 0, 0, 1, AMQP_EMPTY_TABLE))
+ die_rpc(amqp_get_rpc_reply(conn), "exchange.declare");
+ }
+
+ if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
+ cstring_bytes(routing_key),
+ AMQP_EMPTY_TABLE))
+ die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
+ }
+
+ return queue_bytes;
+}
diff --git a/tools/common_consume.h b/tools/common_consume.h
new file mode 100644
index 0000000..e5786be
--- /dev/null
+++ b/tools/common_consume.h
@@ -0,0 +1,4 @@
+extern const char *consume_queue_options_title;
+extern struct poptOption consume_queue_options[];
+extern amqp_bytes_t setup_queue(amqp_connection_state_t conn);
+
diff --git a/tools/consume.c b/tools/consume.c
new file mode 100644
index 0000000..6ca00e0
--- /dev/null
+++ b/tools/consume.c
@@ -0,0 +1,77 @@
+#include "config.h"
+
+#include <stdio.h>
+
+#include <popt.h>
+
+#include "common.h"
+#include "common_consume.h"
+
+static void do_consume(amqp_connection_state_t conn, int no_ack,
+ const char * const *argv)
+{
+ if (!amqp_basic_consume(conn, 1, setup_queue(conn),
+ AMQP_EMPTY_BYTES, 0, no_ack, 0))
+ die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
+
+ for (;;) {
+ amqp_frame_t frame;
+ struct pipeline pl;
+ uint64_t delivery_tag;
+ int res = amqp_simple_wait_frame(conn, &frame);
+ if (res < 0)
+ die_errno(-res, "waiting for header frame");
+
+ if (frame.frame_type != AMQP_FRAME_METHOD
+ || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
+ continue;
+
+ amqp_basic_deliver_t *deliver
+ = (amqp_basic_deliver_t *)frame.payload.method.decoded;
+ delivery_tag = deliver->delivery_tag;
+
+ pipeline(argv, &pl);
+ copy_body(conn, pl.infd);
+
+ if (finish_pipeline(&pl) && !no_ack)
+ die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0),
+ "basic.ack");
+
+ amqp_maybe_release_buffers(conn);
+ }
+}
+
+int main(int argc, const char **argv)
+{
+ poptContext opts;
+ int no_ack;
+ amqp_connection_state_t conn;
+ const char * const *cmd_argv;
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ INCLUDE_OPTIONS(consume_queue_options),
+ {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
+ "consume in no-ack mode", NULL},
+ POPT_AUTOHELP
+ { NULL, 0, 0, NULL, 0 }
+ };
+
+ opts = process_options(argc, argv, options,
+ "[OPTIONS]... <command> <args>");
+
+ cmd_argv = poptGetArgs(opts);
+ if (!cmd_argv[0]) {
+ fprintf(stderr, "consuming command not specified");
+ goto error;
+ }
+
+ conn = make_connection();
+ do_consume(conn, no_ack, cmd_argv);
+ close_connection(conn);
+ return 0;
+
+error:
+ poptFreeContext(opts);
+ return 1;
+}
diff --git a/tools/get.c b/tools/get.c
new file mode 100644
index 0000000..6efccd1
--- /dev/null
+++ b/tools/get.c
@@ -0,0 +1,41 @@
+#include "config.h"
+
+#include <stdio.h>
+
+#include <popt.h>
+
+#include "common.h"
+#include "common_consume.h"
+
+static int do_get(amqp_connection_state_t conn)
+{
+ amqp_rpc_reply_t r
+ = amqp_basic_get(conn, 1, setup_queue(conn), 1);
+ die_rpc(r, "basic.get");
+
+ if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD)
+ return 0;
+
+ copy_body(conn, 1);
+ return 1;
+}
+
+int main(int argc, const char **argv)
+{
+ amqp_connection_state_t conn;
+ int got_something;
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ INCLUDE_OPTIONS(consume_queue_options),
+ POPT_AUTOHELP
+ { NULL, 0, 0, NULL, 0 }
+ };
+
+ process_all_options(argc, argv, options);
+
+ conn = make_connection();
+ got_something = do_get(conn);
+ close_connection(conn);
+ return got_something ? 0 : 2;
+}
diff --git a/tools/publish.c b/tools/publish.c
new file mode 100644
index 0000000..08a78b7
--- /dev/null
+++ b/tools/publish.c
@@ -0,0 +1,90 @@
+#include "config.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <popt.h>
+
+#include "common.h"
+
+static void do_publish(amqp_connection_state_t conn,
+ char *exchange, char *routing_key,
+ amqp_basic_properties_t *props, amqp_bytes_t body)
+{
+ int res = amqp_basic_publish(conn, 1,
+ cstring_bytes(exchange),
+ cstring_bytes(routing_key),
+ 0, 0, props, body);
+ if (res != 0)
+ die_errno(-res, "basic.publish");
+}
+
+int main(int argc, const char **argv)
+{
+ amqp_connection_state_t conn;
+ char *exchange = NULL;
+ char *routing_key = NULL;
+ char *content_type = NULL;
+ char *content_encoding = NULL;
+ char *body = NULL;
+ amqp_basic_properties_t props;
+ amqp_bytes_t body_bytes;
+ int delivery = 1; /* non-persistent by default */
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "the exchange to publish to", "exchange"},
+ {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to publish with", "routing key"},
+ {"persistent", 'p', POPT_ARG_VAL, &delivery, 2,
+ "use the persistent delivery mode", NULL},
+ {"content-type", 'C', POPT_ARG_STRING, &content_type, 0,
+ "the content-type for the message", "content type"},
+ {"content-encoding", 'E', POPT_ARG_STRING,
+ &content_encoding, 0,
+ "the content-encoding for the message", "content encoding"},
+ {"body", 'b', POPT_ARG_STRING, &body, 0,
+ "specify the message body", "body"},
+ POPT_AUTOHELP
+ { NULL, 0, 0, NULL, 0 }
+ };
+
+ process_all_options(argc, argv, options);
+
+ if (!exchange && !routing_key) {
+ fprintf(stderr,
+ "neither exchange nor routing key specified\n");
+ return 1;
+ }
+
+ memset(&props, 0, sizeof props);
+ props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
+ props.delivery_mode = 2; // persistent delivery mode
+
+ if (content_type) {
+ props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
+ props.content_type = amqp_cstring_bytes(content_type);
+ }
+
+ if (content_encoding) {
+ props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
+ props.content_encoding = amqp_cstring_bytes(content_encoding);
+ }
+
+ conn = make_connection();
+
+ if (body)
+ body_bytes = amqp_cstring_bytes(body);
+ else
+ body_bytes = read_all(0);
+
+ do_publish(conn, exchange, routing_key, &props, body_bytes);
+
+ if (!body)
+ free(body_bytes.bytes);
+
+ close_connection(conn);
+ return 0;
+}