From c90336ef0b6c8507a2f409db7e33dd1844b25517 Mon Sep 17 00:00:00 2001 From: David Wragg Date: Sat, 20 Feb 2010 22:58:40 +0000 Subject: Command line AMQP tools based on rabbitmq-c --- .hgignore | 5 +- Makefile.am | 10 +- configure.ac | 20 +++ tools/Makefile.am | 12 ++ tools/common.c | 364 +++++++++++++++++++++++++++++++++++++++++++++++++ tools/common.h | 43 ++++++ tools/common_consume.c | 110 +++++++++++++++ tools/common_consume.h | 4 + tools/consume.c | 77 +++++++++++ tools/get.c | 41 ++++++ tools/publish.c | 90 ++++++++++++ 11 files changed, 773 insertions(+), 3 deletions(-) create mode 100644 tools/Makefile.am create mode 100644 tools/common.c create mode 100644 tools/common.h create mode 100644 tools/common_consume.c create mode 100644 tools/common_consume.h create mode 100644 tools/consume.c create mode 100644 tools/get.c create mode 100644 tools/publish.c diff --git a/.hgignore b/.hgignore index 01ba313..3719f47 100644 --- a/.hgignore +++ b/.hgignore @@ -18,7 +18,7 @@ ^librabbitmq/amqp_framing\.[ch]$ -^(|librabbitmq/|tests/|examples/)Makefile(\.in)?$ +^(|librabbitmq/|tests/|examples/|tools/)Makefile(\.in)?$ ^tests/test_tables$ ^examples/amqp_sendstring$ ^examples/amqp_exchange_declare$ @@ -28,3 +28,6 @@ ^examples/amqp_unbind$ ^examples/amqp_bind$ ^examples/amqp_listenq$ +^tools/amqp-publish$ +^tools/amqp-get$ +^tools/amqp-consume$ diff --git a/Makefile.am b/Makefile.am index 3153ad9..c5ff6e7 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,7 +1,13 @@ -SUBDIRS=librabbitmq tests examples +if TOOLS +TOOLS_SUBDIR=tools +else +TOOLS_SUBDIR= +endif + +SUBDIRS=librabbitmq tests examples $(TOOLS_SUBDIR) squeakyclean: maintainer-clean - rm -f Makefile.in librabbitmq/Makefile.in tests/Makefile.in examples/Makefile.in + rm -f Makefile.in librabbitmq/Makefile.in tests/Makefile.in examples/Makefile.in tools/Makefile.in rm -f aclocal.m4 rm -f config.guess config.h.in* config.sub configure rm -f depcomp install-sh ltmain.sh missing diff --git a/configure.ac b/configure.ac index cb835e8..4b653ba 100644 --- a/configure.ac +++ b/configure.ac @@ -5,6 +5,7 @@ AM_INIT_AUTOMAKE AC_CONFIG_HEADER([config.h]) dnl Program checks +AC_GNU_SOURCE AC_PROG_CC dnl Library checks @@ -63,9 +64,28 @@ AC_SUBST(AMQP_CODEGEN_DIR) AC_SUBST(AMQP_SPEC_JSON_PATH) AC_SUBST(PYTHON) +AC_ARG_WITH([popt], + [AS_HELP_STRING([--with-popt], [use the popt library. Needed for tools])], + [], + [with_popt=check]) + +LIBPOPT= +AS_IF([test "x$with_popt" != xno], + [AC_CHECK_LIB([popt], [poptGetContext], + [AC_SUBST([LIBPOPT], ["-lpopt"]) + AC_DEFINE([HAVE_LIBPOPT], [1], [Define if you have libpopt]) + ], + [if test "x$with_popt" != xcheck; then + AC_MSG_FAILURE([--with-popt was given, but test for libpopt failed]) + fi + ])]) + +AM_CONDITIONAL(TOOLS, test "x$LIBPOPT" != "x") + AC_OUTPUT( Makefile librabbitmq/Makefile tests/Makefile examples/Makefile +tools/Makefile ) diff --git a/tools/Makefile.am b/tools/Makefile.am new file mode 100644 index 0000000..f67d46b --- /dev/null +++ b/tools/Makefile.am @@ -0,0 +1,12 @@ +bin_PROGRAMS = amqp-publish amqp-get amqp-consume + +AM_CFLAGS = -I$(top_srcdir)/librabbitmq +AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la + +LDADD=$(LIBPOPT) + +noinst_HEADERS = common.h + +amqp_publish_SOURCES = publish.c common.c +amqp_get_SOURCES = get.c common.c common_consume.c +amqp_consume_SOURCES = consume.c common.c common_consume.c diff --git a/tools/common.c b/tools/common.c new file mode 100644 index 0000000..7baab8c --- /dev/null +++ b/tools/common.c @@ -0,0 +1,364 @@ +#include "config.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include "common.h" + +extern char **environ; + +void die(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(1); +} + +void die_errno(int err, const char *fmt, ...) +{ + if (err == 0) + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", strerror(err)); + exit(1); +} + +char *amqp_server_exception_string(amqp_rpc_reply_t r) +{ + int res; + char *s; + + switch (r.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m + = (amqp_connection_close_t *)r.reply.decoded; + res = asprintf(&s, "server connection error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m + = (amqp_channel_close_t *)r.reply.decoded; + res = asprintf(&s, "server channel error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + default: + res = asprintf(&s, "unknown server error, method id 0x%08X", + r.reply.id); + break; + } + + return res >= 0 ? s : NULL; +} + +char *amqp_rpc_reply_string(amqp_rpc_reply_t r) +{ + const char *s; + + switch (r.reply_type) { + case AMQP_RESPONSE_NORMAL: + s = "normal response"; + break; + + case AMQP_RESPONSE_NONE: + s = "missing RPC reply type"; + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + if (r.library_errno) + s = strerror(r.library_errno); + else + s = "end of stream"; + + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + return amqp_server_exception_string(r); + + default: + abort(); + } + + return strdup(s); +} + +void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) +{ + if (r.reply_type == AMQP_RESPONSE_NORMAL) + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r)); + exit(1); +} + +void set_cloexec(int fd) +{ + int flags; + + flags = fcntl(fd, F_GETFD); + if (flags == -1 + || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) + die_errno(errno, "set_cloexec"); +} + +static char *amqp_server = "localhost"; +static char *amqp_vhost = "/"; +static char *amqp_username = "guest"; +static char *amqp_password = "guest"; + +const char *connect_options_title = "Connection options"; +struct poptOption connect_options[] = { + {"server", 's', POPT_ARG_STRING, &amqp_server, 0, + "the AMQP server to connect to", "server"}, + {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, + "the vhost to use when connecting", "vhost"}, + {"username", 0, POPT_ARG_STRING, &amqp_username, 0, + "the username to login with", "username"}, + {"password", 0, POPT_ARG_STRING, &amqp_password, 0, + "the password to login with", "password"}, + { NULL, 0, 0, NULL, 0 } +}; + +amqp_connection_state_t make_connection(void) +{ + int s; + amqp_connection_state_t conn; + char *host = amqp_server; + int port = 0; + + /* parse the server string into a hostname and a port */ + char *colon = strchr(amqp_server, ':'); + if (colon) { + char *port_end; + size_t host_len = colon - amqp_server; + host = malloc(host_len + 1); + memcpy(host, amqp_server, host_len); + host[host_len] = 0; + + port = strtol(colon+1, &port_end, 10); + if (port < 0 + || port > 65535 + || port_end == colon+1 + || *port_end != 0) + die("bad server port number in %s", amqp_server); + } + + s = amqp_open_socket(host, port ? port : 5672); + if (s < 0) { + if (s == -ENOENT) + die("unknown host %s", host); + else + die_errno(-s, "opening socket to %s", amqp_server); + } + + set_cloexec(s); + + conn = amqp_new_connection(); + amqp_set_sockfd(conn, s); + + die_rpc(amqp_login(conn, amqp_vhost, 0, 131072, 0, + AMQP_SASL_METHOD_PLAIN, + amqp_username, amqp_password), + "logging in to AMQP server"); + + if (!amqp_channel_open(conn, 1)) + die_rpc(amqp_get_rpc_reply(conn), "opening channel"); + + return conn; +} + +void close_connection(amqp_connection_state_t conn) +{ + int s = amqp_get_sockfd(conn); + + die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), + "closing channel"); + die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), + "closing connection"); + amqp_destroy_connection(conn); + + if (close(s) < 0) + die_errno(errno, "closing socket"); +} + +amqp_bytes_t read_all(int fd) +{ + size_t space = 4096; + amqp_bytes_t bytes; + + bytes.bytes = malloc(space); + bytes.len = 0; + + for (;;) { + ssize_t res = read(fd, bytes.bytes+bytes.len, space-bytes.len); + if (res == 0) + break; + + if (res < 0) { + if (errno == EINTR) + continue; + + die_errno(errno, "reading"); + } + + bytes.len += res; + if (bytes.len == space) { + space *= 2; + bytes.bytes = realloc(bytes.bytes, space); + } + } + + return bytes; +} + +void write_all(int fd, amqp_bytes_t data) +{ + while (data.len > 0) { + ssize_t res = write(fd, data.bytes, data.len); + if (res < 0) + die_errno(errno, "write"); + + data.len -= res; + data.bytes += res; + } +} + +void copy_body(amqp_connection_state_t conn, int fd) +{ + size_t body_remaining; + amqp_frame_t frame; + + int res = amqp_simple_wait_frame(conn, &frame); + if (res < 0) + die_errno(-res, "waiting for header frame"); + if (frame.frame_type != AMQP_FRAME_HEADER) + die("expected header, got frame type 0x%X", + frame.frame_type); + + body_remaining = frame.payload.properties.body_size; + while (body_remaining) { + res = amqp_simple_wait_frame(conn, &frame); + if (res < 0) + die_errno(-res, "waiting for body frame"); + if (frame.frame_type != AMQP_FRAME_BODY) + die("expected body, got frame type 0x%X", + frame.frame_type); + + write_all(fd, frame.payload.body_fragment); + body_remaining -= frame.payload.body_fragment.len; + } +} + +void pipeline(const char * const *argv, struct pipeline *pl) +{ + posix_spawn_file_actions_t file_acts; + + int pipefds[2]; + if (pipe(pipefds)) + die_errno(errno, "pipe"); + + die_errno(posix_spawn_file_actions_init(&file_acts), + "posix_spawn_file_actions_init"); + die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), + "posix_spawn_file_actions_adddup2"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), + "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), + "posix_spawn_file_actions_addclose"); + + die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, + (char * const *)argv, environ), + "posix_spawnp: %s", argv[0]); + + die_errno(posix_spawn_file_actions_destroy(&file_acts), + "posix_spawn_file_actions_destroy"); + + if (close(pipefds[0])) + die_errno(errno, "close"); + + pl->infd = pipefds[1]; +} + +int finish_pipeline(struct pipeline *pl) +{ + int status; + + if (close(pl->infd)) + die_errno(errno, "close"); + if (waitpid(pl->pid, &status, 0) < 0) + die_errno(errno, "waitpid"); + return WIFEXITED(status) && WEXITSTATUS(status) == 0; +} + +poptContext process_options(int argc, const char **argv, + struct poptOption *options, + const char *help) +{ + int c; + poptContext opts = poptGetContext(NULL, argc, argv, options, 0); + + if (!help) + poptSetOtherOptionHelp(opts, "[OPTIONS]... "); + + while ((c = poptGetNextOpt(opts)) >= 0) { + // no options require explicit handling + } + + if (c < -1) { + fprintf(stderr, "%s: %s\n", + poptBadOption(opts, POPT_BADOPTION_NOALIAS), + poptStrerror(c)); + poptPrintUsage(opts, stderr, 0); + exit(1); + } + + return opts; +} + +void process_all_options(int argc, const char **argv, + struct poptOption *options) +{ + poptContext opts = process_options(argc, argv, options, NULL); + const char *opt = poptPeekArg(opts); + + if (opt) { + fprintf(stderr, "unexpected operand: %s\n", opt); + poptPrintUsage(opts, stderr, 0); + exit(1); + } + + poptFreeContext(opts); +} + +amqp_bytes_t cstring_bytes(const char *str) +{ + return str ? amqp_cstring_bytes(str) : AMQP_EMPTY_BYTES; +} diff --git a/tools/common.h b/tools/common.h new file mode 100644 index 0000000..b54afcb --- /dev/null +++ b/tools/common.h @@ -0,0 +1,43 @@ +#include + +#include +#include + +extern char *amqp_server_exception_string(amqp_rpc_reply_t r); +extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r); + +extern void die(const char *fmt, ...) + __attribute__ ((format (printf, 1, 2))); +extern void die_errno(int err, const char *fmt, ...) + __attribute__ ((format (printf, 2, 3))); +extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) + __attribute__ ((format (printf, 2, 3))); + +extern const char *connect_options_title; +extern struct poptOption connect_options[]; +extern amqp_connection_state_t make_connection(void); +extern void close_connection(amqp_connection_state_t conn); + +extern amqp_bytes_t read_all(int fd); +extern void write_all(int fd, amqp_bytes_t data); + +extern void copy_body(amqp_connection_state_t conn, int fd); + +struct pipeline { + int pid; + int infd; +}; + +extern void pipeline(const char * const *argv, struct pipeline *pl); +extern int finish_pipeline(struct pipeline *pl); + +#define INCLUDE_OPTIONS(options) \ + {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL} + +extern poptContext process_options(int argc, const char **argv, + struct poptOption *options, + const char *help); +extern void process_all_options(int argc, const char **argv, + struct poptOption *options); + +extern amqp_bytes_t cstring_bytes(const char *str); diff --git a/tools/common_consume.c b/tools/common_consume.c new file mode 100644 index 0000000..85b9069 --- /dev/null +++ b/tools/common_consume.c @@ -0,0 +1,110 @@ +#include "config.h" + +#include +#include + +#include + +#include "common.h" + +static char *queue; +static char *exchange; +static char *exchange_type; +static char *routing_key; + +const char *consume_queue_options_title = "Source queue options"; +struct poptOption consume_queue_options[] = { + {"queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue to consume from", "queue"}, + {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "bind the queue to this exchange", "exchange"}, + {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0, + "create auto-delete exchange of this type for binding", "type"}, + {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to bind with", "routing key"}, + { NULL, 0, 0, NULL, 0 } +}; + +/* Convert a amqp_bytes_t to an escaped string form for printing. We + use the same escaping conventions as rabbitmqctl. */ +static char *stringify_bytes(amqp_bytes_t bytes) +{ + /* W will need up to 4 chars per byte, plus the terminating 0 */ + char *res = malloc(bytes.len * 4 + 1); + uint8_t *data = bytes.bytes; + char *p = res; + size_t i; + + for (i = 0; i < bytes.len; i++) { + if (data[i] >= 32 && data[i] != 127) { + *p++ = data[i]; + } + else { + *p++ = '\\'; + *p++ = '0' + (data[i] >> 6); + *p++ = '0' + (data[i] >> 3 & 0x7); + *p++ = '0' + (data[i] & 0x7); + } + } + + *p = 0; + return res; +} + +amqp_bytes_t setup_queue(amqp_connection_state_t conn) +{ + /* if an exchange name wasn't provided, check that we don't + have options that require it. */ + if (!exchange) { + char *opt = NULL; + if (routing_key) + opt = "--routing-key"; + else if (exchange_type) + opt = "--exchange-type"; + + if (opt) { + fprintf(stderr, + "%s option requires an exchange name to be " + "provided with --exchange\n", opt); + exit(1); + } + } + + /* Declare the queue. If the queue already exists, this won't have + any effect. */ + amqp_bytes_t queue_bytes = cstring_bytes(queue); + amqp_queue_declare_ok_t *res + = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1, + AMQP_EMPTY_TABLE); + if (!res) + die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + + if (!queue) { + // the server should have provided a queue name + char *sq; + queue_bytes = amqp_bytes_malloc_dup(res->queue); + sq = stringify_bytes(queue_bytes); + fprintf(stderr, "Server provided queue name: %s\n", sq); + free(sq); + } + + /* Bind to an exchange if requested */ + if (exchange) { + amqp_bytes_t eb = amqp_cstring_bytes(exchange); + + if (exchange_type) { + // we should create the exchange + if (!amqp_exchange_declare(conn, 1, eb, + amqp_cstring_bytes(exchange_type), + 0, 0, 1, AMQP_EMPTY_TABLE)) + die_rpc(amqp_get_rpc_reply(conn), "exchange.declare"); + } + + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key), + AMQP_EMPTY_TABLE)) + die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); + } + + return queue_bytes; +} diff --git a/tools/common_consume.h b/tools/common_consume.h new file mode 100644 index 0000000..e5786be --- /dev/null +++ b/tools/common_consume.h @@ -0,0 +1,4 @@ +extern const char *consume_queue_options_title; +extern struct poptOption consume_queue_options[]; +extern amqp_bytes_t setup_queue(amqp_connection_state_t conn); + diff --git a/tools/consume.c b/tools/consume.c new file mode 100644 index 0000000..6ca00e0 --- /dev/null +++ b/tools/consume.c @@ -0,0 +1,77 @@ +#include "config.h" + +#include + +#include + +#include "common.h" +#include "common_consume.h" + +static void do_consume(amqp_connection_state_t conn, int no_ack, + const char * const *argv) +{ + if (!amqp_basic_consume(conn, 1, setup_queue(conn), + AMQP_EMPTY_BYTES, 0, no_ack, 0)) + die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); + + for (;;) { + amqp_frame_t frame; + struct pipeline pl; + uint64_t delivery_tag; + int res = amqp_simple_wait_frame(conn, &frame); + if (res < 0) + die_errno(-res, "waiting for header frame"); + + if (frame.frame_type != AMQP_FRAME_METHOD + || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + continue; + + amqp_basic_deliver_t *deliver + = (amqp_basic_deliver_t *)frame.payload.method.decoded; + delivery_tag = deliver->delivery_tag; + + pipeline(argv, &pl); + copy_body(conn, pl.infd); + + if (finish_pipeline(&pl) && !no_ack) + die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0), + "basic.ack"); + + amqp_maybe_release_buffers(conn); + } +} + +int main(int argc, const char **argv) +{ + poptContext opts; + int no_ack; + amqp_connection_state_t conn; + const char * const *cmd_argv; + + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + INCLUDE_OPTIONS(consume_queue_options), + {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, + "consume in no-ack mode", NULL}, + POPT_AUTOHELP + { NULL, 0, 0, NULL, 0 } + }; + + opts = process_options(argc, argv, options, + "[OPTIONS]... "); + + cmd_argv = poptGetArgs(opts); + if (!cmd_argv[0]) { + fprintf(stderr, "consuming command not specified"); + goto error; + } + + conn = make_connection(); + do_consume(conn, no_ack, cmd_argv); + close_connection(conn); + return 0; + +error: + poptFreeContext(opts); + return 1; +} diff --git a/tools/get.c b/tools/get.c new file mode 100644 index 0000000..6efccd1 --- /dev/null +++ b/tools/get.c @@ -0,0 +1,41 @@ +#include "config.h" + +#include + +#include + +#include "common.h" +#include "common_consume.h" + +static int do_get(amqp_connection_state_t conn) +{ + amqp_rpc_reply_t r + = amqp_basic_get(conn, 1, setup_queue(conn), 1); + die_rpc(r, "basic.get"); + + if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) + return 0; + + copy_body(conn, 1); + return 1; +} + +int main(int argc, const char **argv) +{ + amqp_connection_state_t conn; + int got_something; + + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + INCLUDE_OPTIONS(consume_queue_options), + POPT_AUTOHELP + { NULL, 0, 0, NULL, 0 } + }; + + process_all_options(argc, argv, options); + + conn = make_connection(); + got_something = do_get(conn); + close_connection(conn); + return got_something ? 0 : 2; +} diff --git a/tools/publish.c b/tools/publish.c new file mode 100644 index 0000000..08a78b7 --- /dev/null +++ b/tools/publish.c @@ -0,0 +1,90 @@ +#include "config.h" + +#include +#include +#include + +#include + +#include "common.h" + +static void do_publish(amqp_connection_state_t conn, + char *exchange, char *routing_key, + amqp_basic_properties_t *props, amqp_bytes_t body) +{ + int res = amqp_basic_publish(conn, 1, + cstring_bytes(exchange), + cstring_bytes(routing_key), + 0, 0, props, body); + if (res != 0) + die_errno(-res, "basic.publish"); +} + +int main(int argc, const char **argv) +{ + amqp_connection_state_t conn; + char *exchange = NULL; + char *routing_key = NULL; + char *content_type = NULL; + char *content_encoding = NULL; + char *body = NULL; + amqp_basic_properties_t props; + amqp_bytes_t body_bytes; + int delivery = 1; /* non-persistent by default */ + + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "the exchange to publish to", "exchange"}, + {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to publish with", "routing key"}, + {"persistent", 'p', POPT_ARG_VAL, &delivery, 2, + "use the persistent delivery mode", NULL}, + {"content-type", 'C', POPT_ARG_STRING, &content_type, 0, + "the content-type for the message", "content type"}, + {"content-encoding", 'E', POPT_ARG_STRING, + &content_encoding, 0, + "the content-encoding for the message", "content encoding"}, + {"body", 'b', POPT_ARG_STRING, &body, 0, + "specify the message body", "body"}, + POPT_AUTOHELP + { NULL, 0, 0, NULL, 0 } + }; + + process_all_options(argc, argv, options); + + if (!exchange && !routing_key) { + fprintf(stderr, + "neither exchange nor routing key specified\n"); + return 1; + } + + memset(&props, 0, sizeof props); + props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG; + props.delivery_mode = 2; // persistent delivery mode + + if (content_type) { + props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; + props.content_type = amqp_cstring_bytes(content_type); + } + + if (content_encoding) { + props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG; + props.content_encoding = amqp_cstring_bytes(content_encoding); + } + + conn = make_connection(); + + if (body) + body_bytes = amqp_cstring_bytes(body); + else + body_bytes = read_all(0); + + do_publish(conn, exchange, routing_key, &props, body_bytes); + + if (!body) + free(body_bytes.bytes); + + close_connection(conn); + return 0; +} -- cgit v1.2.1