summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.hgignore5
-rw-r--r--Makefile.am10
-rw-r--r--configure.ac27
-rw-r--r--tools/Makefile.am12
-rw-r--r--tools/common.c413
-rw-r--r--tools/common.h93
-rw-r--r--tools/common_consume.c160
-rw-r--r--tools/common_consume.h54
-rw-r--r--tools/consume.c128
-rw-r--r--tools/get.c91
-rw-r--r--tools/publish.c140
11 files changed, 1130 insertions, 3 deletions
diff --git a/.hgignore b/.hgignore
index 01ba313..3719f47 100644
--- a/.hgignore
+++ b/.hgignore
@@ -18,7 +18,7 @@
^librabbitmq/amqp_framing\.[ch]$
-^(|librabbitmq/|tests/|examples/)Makefile(\.in)?$
+^(|librabbitmq/|tests/|examples/|tools/)Makefile(\.in)?$
^tests/test_tables$
^examples/amqp_sendstring$
^examples/amqp_exchange_declare$
@@ -28,3 +28,6 @@
^examples/amqp_unbind$
^examples/amqp_bind$
^examples/amqp_listenq$
+^tools/amqp-publish$
+^tools/amqp-get$
+^tools/amqp-consume$
diff --git a/Makefile.am b/Makefile.am
index 3153ad9..c5ff6e7 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -1,7 +1,13 @@
-SUBDIRS=librabbitmq tests examples
+if TOOLS
+TOOLS_SUBDIR=tools
+else
+TOOLS_SUBDIR=
+endif
+
+SUBDIRS=librabbitmq tests examples $(TOOLS_SUBDIR)
squeakyclean: maintainer-clean
- rm -f Makefile.in librabbitmq/Makefile.in tests/Makefile.in examples/Makefile.in
+ rm -f Makefile.in librabbitmq/Makefile.in tests/Makefile.in examples/Makefile.in tools/Makefile.in
rm -f aclocal.m4
rm -f config.guess config.h.in* config.sub configure
rm -f depcomp install-sh ltmain.sh missing
diff --git a/configure.ac b/configure.ac
index cb835e8..30b7d05 100644
--- a/configure.ac
+++ b/configure.ac
@@ -5,6 +5,7 @@ AM_INIT_AUTOMAKE
AC_CONFIG_HEADER([config.h])
dnl Program checks
+AC_GNU_SOURCE
AC_PROG_CC
dnl Library checks
@@ -63,9 +64,35 @@ AC_SUBST(AMQP_CODEGEN_DIR)
AC_SUBST(AMQP_SPEC_JSON_PATH)
AC_SUBST(PYTHON)
+# Check for libpopt, which we need to build the tools
+
+AC_ARG_WITH([popt],
+ [AS_HELP_STRING([--with-popt], [use the popt library. Needed for tools])],
+ [],
+ [with_popt=check])
+
+LIBPOPT=
+AS_IF([test "x$with_popt" != xno],
+ [AC_CHECK_LIB([popt], [poptGetContext],
+ [AC_SUBST([LIBPOPT], ["-lpopt"])
+ AC_DEFINE([HAVE_LIBPOPT], [1], [Define if you have libpopt])
+ ],
+ [if test "x$with_popt" != xcheck; then
+ AC_MSG_FAILURE([--with-popt was given, but test for libpopt failed])
+ fi
+ ])])
+
+AS_IF([test "x$LIBPOPT" != "x"],
+ [AC_CHECK_HEADER([popt.h], [],
+ [AC_MSG_FAILURE([You have libpopt, but could not find the popt.h header])])
+ ])
+
+AM_CONDITIONAL(TOOLS, test "x$LIBPOPT" != "x")
+
AC_OUTPUT(
Makefile
librabbitmq/Makefile
tests/Makefile
examples/Makefile
+tools/Makefile
)
diff --git a/tools/Makefile.am b/tools/Makefile.am
new file mode 100644
index 0000000..f67d46b
--- /dev/null
+++ b/tools/Makefile.am
@@ -0,0 +1,12 @@
+bin_PROGRAMS = amqp-publish amqp-get amqp-consume
+
+AM_CFLAGS = -I$(top_srcdir)/librabbitmq
+AM_LDFLAGS = $(top_builddir)/librabbitmq/librabbitmq.la
+
+LDADD=$(LIBPOPT)
+
+noinst_HEADERS = common.h
+
+amqp_publish_SOURCES = publish.c common.c
+amqp_get_SOURCES = get.c common.c common_consume.c
+amqp_consume_SOURCES = consume.c common.c common_consume.c
diff --git a/tools/common.c b/tools/common.c
new file mode 100644
index 0000000..0772738
--- /dev/null
+++ b/tools/common.c
@@ -0,0 +1,413 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2009 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2009 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include "config.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <spawn.h>
+#include <sys/wait.h>
+
+#include <popt.h>
+
+#include "common.h"
+
+extern char **environ;
+
+void die(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(1);
+}
+
+void die_errno(int err, const char *fmt, ...)
+{
+ if (err == 0)
+ return;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", strerror(err));
+ exit(1);
+}
+
+char *amqp_server_exception_string(amqp_rpc_reply_t r)
+{
+ int res;
+ char *s;
+
+ switch (r.reply.id) {
+ case AMQP_CONNECTION_CLOSE_METHOD: {
+ amqp_connection_close_t *m
+ = (amqp_connection_close_t *)r.reply.decoded;
+ res = asprintf(&s, "server connection error %d, message: %.*s",
+ m->reply_code,
+ (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
+
+ case AMQP_CHANNEL_CLOSE_METHOD: {
+ amqp_channel_close_t *m
+ = (amqp_channel_close_t *)r.reply.decoded;
+ res = asprintf(&s, "server channel error %d, message: %.*s",
+ m->reply_code,
+ (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
+
+ default:
+ res = asprintf(&s, "unknown server error, method id 0x%08X",
+ r.reply.id);
+ break;
+ }
+
+ return res >= 0 ? s : NULL;
+}
+
+char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
+{
+ const char *s;
+
+ switch (r.reply_type) {
+ case AMQP_RESPONSE_NORMAL:
+ s = "normal response";
+ break;
+
+ case AMQP_RESPONSE_NONE:
+ s = "missing RPC reply type";
+ break;
+
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ if (r.library_errno)
+ s = strerror(r.library_errno);
+ else
+ s = "end of stream";
+
+ break;
+
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ return amqp_server_exception_string(r);
+
+ default:
+ abort();
+ }
+
+ return strdup(s);
+}
+
+void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
+{
+ if (r.reply_type == AMQP_RESPONSE_NORMAL)
+ return;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r));
+ exit(1);
+}
+
+void set_cloexec(int fd)
+{
+ int flags;
+
+ flags = fcntl(fd, F_GETFD);
+ if (flags == -1
+ || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1)
+ die_errno(errno, "set_cloexec");
+}
+
+static char *amqp_server = "localhost";
+static char *amqp_vhost = "/";
+static char *amqp_username = "guest";
+static char *amqp_password = "guest";
+
+const char *connect_options_title = "Connection options";
+struct poptOption connect_options[] = {
+ {"server", 's', POPT_ARG_STRING, &amqp_server, 0,
+ "the AMQP server to connect to", "server"},
+ {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
+ "the vhost to use when connecting", "vhost"},
+ {"username", 0, POPT_ARG_STRING, &amqp_username, 0,
+ "the username to login with", "username"},
+ {"password", 0, POPT_ARG_STRING, &amqp_password, 0,
+ "the password to login with", "password"},
+ { NULL, 0, 0, NULL, 0 }
+};
+
+amqp_connection_state_t make_connection(void)
+{
+ int s;
+ amqp_connection_state_t conn;
+ char *host = amqp_server;
+ int port = 0;
+
+ /* parse the server string into a hostname and a port */
+ char *colon = strchr(amqp_server, ':');
+ if (colon) {
+ char *port_end;
+ size_t host_len = colon - amqp_server;
+ host = malloc(host_len + 1);
+ memcpy(host, amqp_server, host_len);
+ host[host_len] = 0;
+
+ port = strtol(colon+1, &port_end, 10);
+ if (port < 0
+ || port > 65535
+ || port_end == colon+1
+ || *port_end != 0)
+ die("bad server port number in %s", amqp_server);
+ }
+
+ s = amqp_open_socket(host, port ? port : 5672);
+ if (s < 0) {
+ if (s == -ENOENT)
+ die("unknown host %s", host);
+ else
+ die_errno(-s, "opening socket to %s", amqp_server);
+ }
+
+ set_cloexec(s);
+
+ conn = amqp_new_connection();
+ amqp_set_sockfd(conn, s);
+
+ die_rpc(amqp_login(conn, amqp_vhost, 0, 131072, 0,
+ AMQP_SASL_METHOD_PLAIN,
+ amqp_username, amqp_password),
+ "logging in to AMQP server");
+
+ if (!amqp_channel_open(conn, 1))
+ die_rpc(amqp_get_rpc_reply(conn), "opening channel");
+
+ return conn;
+}
+
+void close_connection(amqp_connection_state_t conn)
+{
+ int s = amqp_get_sockfd(conn);
+
+ die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
+ "closing channel");
+ die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
+ "closing connection");
+ amqp_destroy_connection(conn);
+
+ if (close(s) < 0)
+ die_errno(errno, "closing socket");
+}
+
+amqp_bytes_t read_all(int fd)
+{
+ size_t space = 4096;
+ amqp_bytes_t bytes;
+
+ bytes.bytes = malloc(space);
+ bytes.len = 0;
+
+ for (;;) {
+ ssize_t res = read(fd, bytes.bytes+bytes.len, space-bytes.len);
+ if (res == 0)
+ break;
+
+ if (res < 0) {
+ if (errno == EINTR)
+ continue;
+
+ die_errno(errno, "reading");
+ }
+
+ bytes.len += res;
+ if (bytes.len == space) {
+ space *= 2;
+ bytes.bytes = realloc(bytes.bytes, space);
+ }
+ }
+
+ return bytes;
+}
+
+void write_all(int fd, amqp_bytes_t data)
+{
+ while (data.len > 0) {
+ ssize_t res = write(fd, data.bytes, data.len);
+ if (res < 0)
+ die_errno(errno, "write");
+
+ data.len -= res;
+ data.bytes += res;
+ }
+}
+
+void copy_body(amqp_connection_state_t conn, int fd)
+{
+ size_t body_remaining;
+ amqp_frame_t frame;
+
+ int res = amqp_simple_wait_frame(conn, &frame);
+ if (res < 0)
+ die_errno(-res, "waiting for header frame");
+ if (frame.frame_type != AMQP_FRAME_HEADER)
+ die("expected header, got frame type 0x%X",
+ frame.frame_type);
+
+ body_remaining = frame.payload.properties.body_size;
+ while (body_remaining) {
+ res = amqp_simple_wait_frame(conn, &frame);
+ if (res < 0)
+ die_errno(-res, "waiting for body frame");
+ if (frame.frame_type != AMQP_FRAME_BODY)
+ die("expected body, got frame type 0x%X",
+ frame.frame_type);
+
+ write_all(fd, frame.payload.body_fragment);
+ body_remaining -= frame.payload.body_fragment.len;
+ }
+}
+
+void pipeline(const char * const *argv, struct pipeline *pl)
+{
+ posix_spawn_file_actions_t file_acts;
+
+ int pipefds[2];
+ if (pipe(pipefds))
+ die_errno(errno, "pipe");
+
+ die_errno(posix_spawn_file_actions_init(&file_acts),
+ "posix_spawn_file_actions_init");
+ die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
+ "posix_spawn_file_actions_adddup2");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
+ "posix_spawn_file_actions_addclose");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
+ "posix_spawn_file_actions_addclose");
+
+ die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
+ (char * const *)argv, environ),
+ "posix_spawnp: %s", argv[0]);
+
+ die_errno(posix_spawn_file_actions_destroy(&file_acts),
+ "posix_spawn_file_actions_destroy");
+
+ if (close(pipefds[0]))
+ die_errno(errno, "close");
+
+ pl->infd = pipefds[1];
+}
+
+int finish_pipeline(struct pipeline *pl)
+{
+ int status;
+
+ if (close(pl->infd))
+ die_errno(errno, "close");
+ if (waitpid(pl->pid, &status, 0) < 0)
+ die_errno(errno, "waitpid");
+ return WIFEXITED(status) && WEXITSTATUS(status) == 0;
+}
+
+poptContext process_options(int argc, const char **argv,
+ struct poptOption *options,
+ const char *help)
+{
+ int c;
+ poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
+ poptSetOtherOptionHelp(opts, help);
+
+ while ((c = poptGetNextOpt(opts)) >= 0) {
+ // no options require explicit handling
+ }
+
+ if (c < -1) {
+ fprintf(stderr, "%s: %s\n",
+ poptBadOption(opts, POPT_BADOPTION_NOALIAS),
+ poptStrerror(c));
+ poptPrintUsage(opts, stderr, 0);
+ exit(1);
+ }
+
+ return opts;
+}
+
+void process_all_options(int argc, const char **argv,
+ struct poptOption *options)
+{
+ poptContext opts = process_options(argc, argv, options,
+ "[OPTIONS]...");
+ const char *opt = poptPeekArg(opts);
+
+ if (opt) {
+ fprintf(stderr, "unexpected operand: %s\n", opt);
+ poptPrintUsage(opts, stderr, 0);
+ exit(1);
+ }
+
+ poptFreeContext(opts);
+}
+
+amqp_bytes_t cstring_bytes(const char *str)
+{
+ return str ? amqp_cstring_bytes(str) : AMQP_EMPTY_BYTES;
+}
diff --git a/tools/common.h b/tools/common.h
new file mode 100644
index 0000000..09a9242
--- /dev/null
+++ b/tools/common.h
@@ -0,0 +1,93 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2009 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2009 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdint.h>
+
+#include <amqp.h>
+#include <amqp_framing.h>
+
+extern char *amqp_server_exception_string(amqp_rpc_reply_t r);
+extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r);
+
+extern void die(const char *fmt, ...)
+ __attribute__ ((format (printf, 1, 2)));
+extern void die_errno(int err, const char *fmt, ...)
+ __attribute__ ((format (printf, 2, 3)));
+extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
+ __attribute__ ((format (printf, 2, 3)));
+
+extern const char *connect_options_title;
+extern struct poptOption connect_options[];
+extern amqp_connection_state_t make_connection(void);
+extern void close_connection(amqp_connection_state_t conn);
+
+extern amqp_bytes_t read_all(int fd);
+extern void write_all(int fd, amqp_bytes_t data);
+
+extern void copy_body(amqp_connection_state_t conn, int fd);
+
+struct pipeline {
+ int pid;
+ int infd;
+};
+
+extern void pipeline(const char * const *argv, struct pipeline *pl);
+extern int finish_pipeline(struct pipeline *pl);
+
+#define INCLUDE_OPTIONS(options) \
+ {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL}
+
+extern poptContext process_options(int argc, const char **argv,
+ struct poptOption *options,
+ const char *help);
+extern void process_all_options(int argc, const char **argv,
+ struct poptOption *options);
+
+extern amqp_bytes_t cstring_bytes(const char *str);
diff --git a/tools/common_consume.c b/tools/common_consume.c
new file mode 100644
index 0000000..318fa2c
--- /dev/null
+++ b/tools/common_consume.c
@@ -0,0 +1,160 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2009 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2009 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include "config.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <popt.h>
+
+#include "common.h"
+
+static char *queue;
+static char *exchange;
+static char *exchange_type;
+static char *routing_key;
+
+const char *consume_queue_options_title = "Source queue options";
+struct poptOption consume_queue_options[] = {
+ {"queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue to consume from", "queue"},
+ {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "bind the queue to this exchange", "exchange"},
+ {"exchange-type", 't', POPT_ARG_STRING, &exchange_type, 0,
+ "create auto-delete exchange of this type for binding", "type"},
+ {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to bind with", "routing key"},
+ { NULL, 0, 0, NULL, 0 }
+};
+
+/* Convert a amqp_bytes_t to an escaped string form for printing. We
+ use the same escaping conventions as rabbitmqctl. */
+static char *stringify_bytes(amqp_bytes_t bytes)
+{
+ /* W will need up to 4 chars per byte, plus the terminating 0 */
+ char *res = malloc(bytes.len * 4 + 1);
+ uint8_t *data = bytes.bytes;
+ char *p = res;
+ size_t i;
+
+ for (i = 0; i < bytes.len; i++) {
+ if (data[i] >= 32 && data[i] != 127) {
+ *p++ = data[i];
+ }
+ else {
+ *p++ = '\\';
+ *p++ = '0' + (data[i] >> 6);
+ *p++ = '0' + (data[i] >> 3 & 0x7);
+ *p++ = '0' + (data[i] & 0x7);
+ }
+ }
+
+ *p = 0;
+ return res;
+}
+
+amqp_bytes_t setup_queue(amqp_connection_state_t conn)
+{
+ /* if an exchange name wasn't provided, check that we don't
+ have options that require it. */
+ if (!exchange) {
+ char *opt = NULL;
+ if (routing_key)
+ opt = "--routing-key";
+ else if (exchange_type)
+ opt = "--exchange-type";
+
+ if (opt) {
+ fprintf(stderr,
+ "%s option requires an exchange name to be "
+ "provided with --exchange\n", opt);
+ exit(1);
+ }
+ }
+
+ /* Declare the queue. If the queue already exists, this won't have
+ any effect. */
+ amqp_bytes_t queue_bytes = cstring_bytes(queue);
+ amqp_queue_declare_ok_t *res
+ = amqp_queue_declare(conn, 1, queue_bytes, 0, 0, 0, 1,
+ AMQP_EMPTY_TABLE);
+ if (!res)
+ die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
+
+ if (!queue) {
+ // the server should have provided a queue name
+ char *sq;
+ queue_bytes = amqp_bytes_malloc_dup(res->queue);
+ sq = stringify_bytes(queue_bytes);
+ fprintf(stderr, "Server provided queue name: %s\n", sq);
+ free(sq);
+ }
+
+ /* Bind to an exchange if requested */
+ if (exchange) {
+ amqp_bytes_t eb = amqp_cstring_bytes(exchange);
+
+ if (exchange_type) {
+ // we should create the exchange
+ if (!amqp_exchange_declare(conn, 1, eb,
+ amqp_cstring_bytes(exchange_type),
+ 0, 0, 1, AMQP_EMPTY_TABLE))
+ die_rpc(amqp_get_rpc_reply(conn), "exchange.declare");
+ }
+
+ if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
+ cstring_bytes(routing_key),
+ AMQP_EMPTY_TABLE))
+ die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
+ }
+
+ return queue_bytes;
+}
diff --git a/tools/common_consume.h b/tools/common_consume.h
new file mode 100644
index 0000000..703f4bd
--- /dev/null
+++ b/tools/common_consume.h
@@ -0,0 +1,54 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2009 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2009 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+extern const char *consume_queue_options_title;
+extern struct poptOption consume_queue_options[];
+extern amqp_bytes_t setup_queue(amqp_connection_state_t conn);
+
diff --git a/tools/consume.c b/tools/consume.c
new file mode 100644
index 0000000..7ede926
--- /dev/null
+++ b/tools/consume.c
@@ -0,0 +1,128 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2009 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2009 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include "config.h"
+
+#include <stdio.h>
+
+#include <popt.h>
+
+#include "common.h"
+#include "common_consume.h"
+
+static void do_consume(amqp_connection_state_t conn, int no_ack,
+ const char * const *argv)
+{
+ if (!amqp_basic_consume(conn, 1, setup_queue(conn),
+ AMQP_EMPTY_BYTES, 0, no_ack, 0))
+ die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
+
+ for (;;) {
+ amqp_frame_t frame;
+ struct pipeline pl;
+ uint64_t delivery_tag;
+ int res = amqp_simple_wait_frame(conn, &frame);
+ if (res < 0)
+ die_errno(-res, "waiting for header frame");
+
+ if (frame.frame_type != AMQP_FRAME_METHOD
+ || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
+ continue;
+
+ amqp_basic_deliver_t *deliver
+ = (amqp_basic_deliver_t *)frame.payload.method.decoded;
+ delivery_tag = deliver->delivery_tag;
+
+ pipeline(argv, &pl);
+ copy_body(conn, pl.infd);
+
+ if (finish_pipeline(&pl) && !no_ack)
+ die_errno(-amqp_basic_ack(conn, 1, delivery_tag, 0),
+ "basic.ack");
+
+ amqp_maybe_release_buffers(conn);
+ }
+}
+
+int main(int argc, const char **argv)
+{
+ poptContext opts;
+ int no_ack;
+ amqp_connection_state_t conn;
+ const char * const *cmd_argv;
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ INCLUDE_OPTIONS(consume_queue_options),
+ {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
+ "consume in no-ack mode", NULL},
+ POPT_AUTOHELP
+ { NULL, 0, 0, NULL, 0 }
+ };
+
+ opts = process_options(argc, argv, options,
+ "[OPTIONS]... <command> <args>");
+
+ cmd_argv = poptGetArgs(opts);
+ if (!cmd_argv || !cmd_argv[0]) {
+ fprintf(stderr, "consuming command not specified\n");
+ poptPrintUsage(opts, stderr, 0);
+ goto error;
+ }
+
+ conn = make_connection();
+ do_consume(conn, no_ack, cmd_argv);
+ close_connection(conn);
+ return 0;
+
+error:
+ poptFreeContext(opts);
+ return 1;
+}
diff --git a/tools/get.c b/tools/get.c
new file mode 100644
index 0000000..b433e2b
--- /dev/null
+++ b/tools/get.c
@@ -0,0 +1,91 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2009 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2009 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include "config.h"
+
+#include <stdio.h>
+
+#include <popt.h>
+
+#include "common.h"
+#include "common_consume.h"
+
+static int do_get(amqp_connection_state_t conn)
+{
+ amqp_rpc_reply_t r
+ = amqp_basic_get(conn, 1, setup_queue(conn), 1);
+ die_rpc(r, "basic.get");
+
+ if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD)
+ return 0;
+
+ copy_body(conn, 1);
+ return 1;
+}
+
+int main(int argc, const char **argv)
+{
+ amqp_connection_state_t conn;
+ int got_something;
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ INCLUDE_OPTIONS(consume_queue_options),
+ POPT_AUTOHELP
+ { NULL, 0, 0, NULL, 0 }
+ };
+
+ process_all_options(argc, argv, options);
+
+ conn = make_connection();
+ got_something = do_get(conn);
+ close_connection(conn);
+ return got_something ? 0 : 2;
+}
diff --git a/tools/publish.c b/tools/publish.c
new file mode 100644
index 0000000..a5b86b2
--- /dev/null
+++ b/tools/publish.c
@@ -0,0 +1,140 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2009 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2009 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include "config.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <popt.h>
+
+#include "common.h"
+
+static void do_publish(amqp_connection_state_t conn,
+ char *exchange, char *routing_key,
+ amqp_basic_properties_t *props, amqp_bytes_t body)
+{
+ int res = amqp_basic_publish(conn, 1,
+ cstring_bytes(exchange),
+ cstring_bytes(routing_key),
+ 0, 0, props, body);
+ if (res != 0)
+ die_errno(-res, "basic.publish");
+}
+
+int main(int argc, const char **argv)
+{
+ amqp_connection_state_t conn;
+ char *exchange = NULL;
+ char *routing_key = NULL;
+ char *content_type = NULL;
+ char *content_encoding = NULL;
+ char *body = NULL;
+ amqp_basic_properties_t props;
+ amqp_bytes_t body_bytes;
+ int delivery = 1; /* non-persistent by default */
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "the exchange to publish to", "exchange"},
+ {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to publish with", "routing key"},
+ {"persistent", 'p', POPT_ARG_VAL, &delivery, 2,
+ "use the persistent delivery mode", NULL},
+ {"content-type", 'C', POPT_ARG_STRING, &content_type, 0,
+ "the content-type for the message", "content type"},
+ {"content-encoding", 'E', POPT_ARG_STRING,
+ &content_encoding, 0,
+ "the content-encoding for the message", "content encoding"},
+ {"body", 'b', POPT_ARG_STRING, &body, 0,
+ "specify the message body", "body"},
+ POPT_AUTOHELP
+ { NULL, 0, 0, NULL, 0 }
+ };
+
+ process_all_options(argc, argv, options);
+
+ if (!exchange && !routing_key) {
+ fprintf(stderr,
+ "neither exchange nor routing key specified\n");
+ return 1;
+ }
+
+ memset(&props, 0, sizeof props);
+ props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
+ props.delivery_mode = 2; // persistent delivery mode
+
+ if (content_type) {
+ props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
+ props.content_type = amqp_cstring_bytes(content_type);
+ }
+
+ if (content_encoding) {
+ props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
+ props.content_encoding = amqp_cstring_bytes(content_encoding);
+ }
+
+ conn = make_connection();
+
+ if (body)
+ body_bytes = amqp_cstring_bytes(body);
+ else
+ body_bytes = read_all(0);
+
+ do_publish(conn, exchange, routing_key, &props, body_bytes);
+
+ if (!body)
+ free(body_bytes.bytes);
+
+ close_connection(conn);
+ return 0;
+}