summaryrefslogtreecommitdiff
path: root/tools/common.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/common.c')
-rw-r--r--tools/common.c413
1 files changed, 413 insertions, 0 deletions
diff --git a/tools/common.c b/tools/common.c
new file mode 100644
index 0000000..0772738
--- /dev/null
+++ b/tools/common.c
@@ -0,0 +1,413 @@
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MPL 1.1/GPL 2.0
+ *
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and
+ * limitations under the License.
+ *
+ * The Original Code is librabbitmq.
+ *
+ * The Initial Developers of the Original Code are LShift Ltd, Cohesive
+ * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions
+ * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive
+ * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright
+ * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and
+ * Rabbit Technologies Ltd.
+ *
+ * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+ * Ltd. Portions created by Cohesive Financial Technologies LLC are
+ * Copyright (C) 2007-2009 Cohesive Financial Technologies
+ * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C)
+ * 2007-2009 Rabbit Technologies Ltd.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010
+ * LShift Ltd and Tony Garnock-Jones.
+ *
+ * All Rights Reserved.
+ *
+ * Contributor(s): ______________________________________.
+ *
+ * Alternatively, the contents of this file may be used under the terms
+ * of the GNU General Public License Version 2 or later (the "GPL"), in
+ * which case the provisions of the GPL are applicable instead of those
+ * above. If you wish to allow use of your version of this file only
+ * under the terms of the GPL, and not to allow others to use your
+ * version of this file under the terms of the MPL, indicate your
+ * decision by deleting the provisions above and replace them with the
+ * notice and other provisions required by the GPL. If you do not
+ * delete the provisions above, a recipient may use your version of
+ * this file under the terms of any one of the MPL or the GPL.
+ *
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include "config.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <spawn.h>
+#include <sys/wait.h>
+
+#include <popt.h>
+
+#include "common.h"
+
+extern char **environ;
+
+void die(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(1);
+}
+
+void die_errno(int err, const char *fmt, ...)
+{
+ if (err == 0)
+ return;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", strerror(err));
+ exit(1);
+}
+
+char *amqp_server_exception_string(amqp_rpc_reply_t r)
+{
+ int res;
+ char *s;
+
+ switch (r.reply.id) {
+ case AMQP_CONNECTION_CLOSE_METHOD: {
+ amqp_connection_close_t *m
+ = (amqp_connection_close_t *)r.reply.decoded;
+ res = asprintf(&s, "server connection error %d, message: %.*s",
+ m->reply_code,
+ (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
+
+ case AMQP_CHANNEL_CLOSE_METHOD: {
+ amqp_channel_close_t *m
+ = (amqp_channel_close_t *)r.reply.decoded;
+ res = asprintf(&s, "server channel error %d, message: %.*s",
+ m->reply_code,
+ (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
+
+ default:
+ res = asprintf(&s, "unknown server error, method id 0x%08X",
+ r.reply.id);
+ break;
+ }
+
+ return res >= 0 ? s : NULL;
+}
+
+char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
+{
+ const char *s;
+
+ switch (r.reply_type) {
+ case AMQP_RESPONSE_NORMAL:
+ s = "normal response";
+ break;
+
+ case AMQP_RESPONSE_NONE:
+ s = "missing RPC reply type";
+ break;
+
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ if (r.library_errno)
+ s = strerror(r.library_errno);
+ else
+ s = "end of stream";
+
+ break;
+
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ return amqp_server_exception_string(r);
+
+ default:
+ abort();
+ }
+
+ return strdup(s);
+}
+
+void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
+{
+ if (r.reply_type == AMQP_RESPONSE_NORMAL)
+ return;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r));
+ exit(1);
+}
+
+void set_cloexec(int fd)
+{
+ int flags;
+
+ flags = fcntl(fd, F_GETFD);
+ if (flags == -1
+ || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1)
+ die_errno(errno, "set_cloexec");
+}
+
+static char *amqp_server = "localhost";
+static char *amqp_vhost = "/";
+static char *amqp_username = "guest";
+static char *amqp_password = "guest";
+
+const char *connect_options_title = "Connection options";
+struct poptOption connect_options[] = {
+ {"server", 's', POPT_ARG_STRING, &amqp_server, 0,
+ "the AMQP server to connect to", "server"},
+ {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
+ "the vhost to use when connecting", "vhost"},
+ {"username", 0, POPT_ARG_STRING, &amqp_username, 0,
+ "the username to login with", "username"},
+ {"password", 0, POPT_ARG_STRING, &amqp_password, 0,
+ "the password to login with", "password"},
+ { NULL, 0, 0, NULL, 0 }
+};
+
+amqp_connection_state_t make_connection(void)
+{
+ int s;
+ amqp_connection_state_t conn;
+ char *host = amqp_server;
+ int port = 0;
+
+ /* parse the server string into a hostname and a port */
+ char *colon = strchr(amqp_server, ':');
+ if (colon) {
+ char *port_end;
+ size_t host_len = colon - amqp_server;
+ host = malloc(host_len + 1);
+ memcpy(host, amqp_server, host_len);
+ host[host_len] = 0;
+
+ port = strtol(colon+1, &port_end, 10);
+ if (port < 0
+ || port > 65535
+ || port_end == colon+1
+ || *port_end != 0)
+ die("bad server port number in %s", amqp_server);
+ }
+
+ s = amqp_open_socket(host, port ? port : 5672);
+ if (s < 0) {
+ if (s == -ENOENT)
+ die("unknown host %s", host);
+ else
+ die_errno(-s, "opening socket to %s", amqp_server);
+ }
+
+ set_cloexec(s);
+
+ conn = amqp_new_connection();
+ amqp_set_sockfd(conn, s);
+
+ die_rpc(amqp_login(conn, amqp_vhost, 0, 131072, 0,
+ AMQP_SASL_METHOD_PLAIN,
+ amqp_username, amqp_password),
+ "logging in to AMQP server");
+
+ if (!amqp_channel_open(conn, 1))
+ die_rpc(amqp_get_rpc_reply(conn), "opening channel");
+
+ return conn;
+}
+
+void close_connection(amqp_connection_state_t conn)
+{
+ int s = amqp_get_sockfd(conn);
+
+ die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
+ "closing channel");
+ die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
+ "closing connection");
+ amqp_destroy_connection(conn);
+
+ if (close(s) < 0)
+ die_errno(errno, "closing socket");
+}
+
+amqp_bytes_t read_all(int fd)
+{
+ size_t space = 4096;
+ amqp_bytes_t bytes;
+
+ bytes.bytes = malloc(space);
+ bytes.len = 0;
+
+ for (;;) {
+ ssize_t res = read(fd, bytes.bytes+bytes.len, space-bytes.len);
+ if (res == 0)
+ break;
+
+ if (res < 0) {
+ if (errno == EINTR)
+ continue;
+
+ die_errno(errno, "reading");
+ }
+
+ bytes.len += res;
+ if (bytes.len == space) {
+ space *= 2;
+ bytes.bytes = realloc(bytes.bytes, space);
+ }
+ }
+
+ return bytes;
+}
+
+void write_all(int fd, amqp_bytes_t data)
+{
+ while (data.len > 0) {
+ ssize_t res = write(fd, data.bytes, data.len);
+ if (res < 0)
+ die_errno(errno, "write");
+
+ data.len -= res;
+ data.bytes += res;
+ }
+}
+
+void copy_body(amqp_connection_state_t conn, int fd)
+{
+ size_t body_remaining;
+ amqp_frame_t frame;
+
+ int res = amqp_simple_wait_frame(conn, &frame);
+ if (res < 0)
+ die_errno(-res, "waiting for header frame");
+ if (frame.frame_type != AMQP_FRAME_HEADER)
+ die("expected header, got frame type 0x%X",
+ frame.frame_type);
+
+ body_remaining = frame.payload.properties.body_size;
+ while (body_remaining) {
+ res = amqp_simple_wait_frame(conn, &frame);
+ if (res < 0)
+ die_errno(-res, "waiting for body frame");
+ if (frame.frame_type != AMQP_FRAME_BODY)
+ die("expected body, got frame type 0x%X",
+ frame.frame_type);
+
+ write_all(fd, frame.payload.body_fragment);
+ body_remaining -= frame.payload.body_fragment.len;
+ }
+}
+
+void pipeline(const char * const *argv, struct pipeline *pl)
+{
+ posix_spawn_file_actions_t file_acts;
+
+ int pipefds[2];
+ if (pipe(pipefds))
+ die_errno(errno, "pipe");
+
+ die_errno(posix_spawn_file_actions_init(&file_acts),
+ "posix_spawn_file_actions_init");
+ die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
+ "posix_spawn_file_actions_adddup2");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
+ "posix_spawn_file_actions_addclose");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
+ "posix_spawn_file_actions_addclose");
+
+ die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
+ (char * const *)argv, environ),
+ "posix_spawnp: %s", argv[0]);
+
+ die_errno(posix_spawn_file_actions_destroy(&file_acts),
+ "posix_spawn_file_actions_destroy");
+
+ if (close(pipefds[0]))
+ die_errno(errno, "close");
+
+ pl->infd = pipefds[1];
+}
+
+int finish_pipeline(struct pipeline *pl)
+{
+ int status;
+
+ if (close(pl->infd))
+ die_errno(errno, "close");
+ if (waitpid(pl->pid, &status, 0) < 0)
+ die_errno(errno, "waitpid");
+ return WIFEXITED(status) && WEXITSTATUS(status) == 0;
+}
+
+poptContext process_options(int argc, const char **argv,
+ struct poptOption *options,
+ const char *help)
+{
+ int c;
+ poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
+ poptSetOtherOptionHelp(opts, help);
+
+ while ((c = poptGetNextOpt(opts)) >= 0) {
+ // no options require explicit handling
+ }
+
+ if (c < -1) {
+ fprintf(stderr, "%s: %s\n",
+ poptBadOption(opts, POPT_BADOPTION_NOALIAS),
+ poptStrerror(c));
+ poptPrintUsage(opts, stderr, 0);
+ exit(1);
+ }
+
+ return opts;
+}
+
+void process_all_options(int argc, const char **argv,
+ struct poptOption *options)
+{
+ poptContext opts = process_options(argc, argv, options,
+ "[OPTIONS]...");
+ const char *opt = poptPeekArg(opts);
+
+ if (opt) {
+ fprintf(stderr, "unexpected operand: %s\n", opt);
+ poptPrintUsage(opts, stderr, 0);
+ exit(1);
+ }
+
+ poptFreeContext(opts);
+}
+
+amqp_bytes_t cstring_bytes(const char *str)
+{
+ return str ? amqp_cstring_bytes(str) : AMQP_EMPTY_BYTES;
+}