diff options
Diffstat (limited to 'tools/common.c')
-rw-r--r-- | tools/common.c | 413 |
1 files changed, 413 insertions, 0 deletions
diff --git a/tools/common.c b/tools/common.c new file mode 100644 index 0000000..0772738 --- /dev/null +++ b/tools/common.c @@ -0,0 +1,413 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0 + * + * The contents of this file are subject to the Mozilla Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and + * limitations under the License. + * + * The Original Code is librabbitmq. + * + * The Initial Developers of the Original Code are LShift Ltd, Cohesive + * Financial Technologies LLC, and Rabbit Technologies Ltd. Portions + * created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, Cohesive + * Financial Technologies LLC, or Rabbit Technologies Ltd are Copyright + * (C) 2007-2008 LShift Ltd, Cohesive Financial Technologies LLC, and + * Rabbit Technologies Ltd. + * + * Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift + * Ltd. Portions created by Cohesive Financial Technologies LLC are + * Copyright (C) 2007-2009 Cohesive Financial Technologies + * LLC. Portions created by Rabbit Technologies Ltd are Copyright (C) + * 2007-2009 Rabbit Technologies Ltd. + * + * Portions created by Tony Garnock-Jones are Copyright (C) 2009-2010 + * LShift Ltd and Tony Garnock-Jones. + * + * All Rights Reserved. + * + * Contributor(s): ______________________________________. + * + * Alternatively, the contents of this file may be used under the terms + * of the GNU General Public License Version 2 or later (the "GPL"), in + * which case the provisions of the GPL are applicable instead of those + * above. If you wish to allow use of your version of this file only + * under the terms of the GPL, and not to allow others to use your + * version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the + * notice and other provisions required by the GPL. If you do not + * delete the provisions above, a recipient may use your version of + * this file under the terms of any one of the MPL or the GPL. + * + * ***** END LICENSE BLOCK ***** + */ + +#include "config.h" + +#include <stdio.h> +#include <stdlib.h> +#include <stdarg.h> +#include <string.h> + +#include <unistd.h> +#include <fcntl.h> +#include <errno.h> +#include <spawn.h> +#include <sys/wait.h> + +#include <popt.h> + +#include "common.h" + +extern char **environ; + +void die(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(1); +} + +void die_errno(int err, const char *fmt, ...) +{ + if (err == 0) + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", strerror(err)); + exit(1); +} + +char *amqp_server_exception_string(amqp_rpc_reply_t r) +{ + int res; + char *s; + + switch (r.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m + = (amqp_connection_close_t *)r.reply.decoded; + res = asprintf(&s, "server connection error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m + = (amqp_channel_close_t *)r.reply.decoded; + res = asprintf(&s, "server channel error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + default: + res = asprintf(&s, "unknown server error, method id 0x%08X", + r.reply.id); + break; + } + + return res >= 0 ? s : NULL; +} + +char *amqp_rpc_reply_string(amqp_rpc_reply_t r) +{ + const char *s; + + switch (r.reply_type) { + case AMQP_RESPONSE_NORMAL: + s = "normal response"; + break; + + case AMQP_RESPONSE_NONE: + s = "missing RPC reply type"; + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + if (r.library_errno) + s = strerror(r.library_errno); + else + s = "end of stream"; + + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + return amqp_server_exception_string(r); + + default: + abort(); + } + + return strdup(s); +} + +void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) +{ + if (r.reply_type == AMQP_RESPONSE_NORMAL) + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r)); + exit(1); +} + +void set_cloexec(int fd) +{ + int flags; + + flags = fcntl(fd, F_GETFD); + if (flags == -1 + || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) + die_errno(errno, "set_cloexec"); +} + +static char *amqp_server = "localhost"; +static char *amqp_vhost = "/"; +static char *amqp_username = "guest"; +static char *amqp_password = "guest"; + +const char *connect_options_title = "Connection options"; +struct poptOption connect_options[] = { + {"server", 's', POPT_ARG_STRING, &amqp_server, 0, + "the AMQP server to connect to", "server"}, + {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, + "the vhost to use when connecting", "vhost"}, + {"username", 0, POPT_ARG_STRING, &amqp_username, 0, + "the username to login with", "username"}, + {"password", 0, POPT_ARG_STRING, &amqp_password, 0, + "the password to login with", "password"}, + { NULL, 0, 0, NULL, 0 } +}; + +amqp_connection_state_t make_connection(void) +{ + int s; + amqp_connection_state_t conn; + char *host = amqp_server; + int port = 0; + + /* parse the server string into a hostname and a port */ + char *colon = strchr(amqp_server, ':'); + if (colon) { + char *port_end; + size_t host_len = colon - amqp_server; + host = malloc(host_len + 1); + memcpy(host, amqp_server, host_len); + host[host_len] = 0; + + port = strtol(colon+1, &port_end, 10); + if (port < 0 + || port > 65535 + || port_end == colon+1 + || *port_end != 0) + die("bad server port number in %s", amqp_server); + } + + s = amqp_open_socket(host, port ? port : 5672); + if (s < 0) { + if (s == -ENOENT) + die("unknown host %s", host); + else + die_errno(-s, "opening socket to %s", amqp_server); + } + + set_cloexec(s); + + conn = amqp_new_connection(); + amqp_set_sockfd(conn, s); + + die_rpc(amqp_login(conn, amqp_vhost, 0, 131072, 0, + AMQP_SASL_METHOD_PLAIN, + amqp_username, amqp_password), + "logging in to AMQP server"); + + if (!amqp_channel_open(conn, 1)) + die_rpc(amqp_get_rpc_reply(conn), "opening channel"); + + return conn; +} + +void close_connection(amqp_connection_state_t conn) +{ + int s = amqp_get_sockfd(conn); + + die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), + "closing channel"); + die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), + "closing connection"); + amqp_destroy_connection(conn); + + if (close(s) < 0) + die_errno(errno, "closing socket"); +} + +amqp_bytes_t read_all(int fd) +{ + size_t space = 4096; + amqp_bytes_t bytes; + + bytes.bytes = malloc(space); + bytes.len = 0; + + for (;;) { + ssize_t res = read(fd, bytes.bytes+bytes.len, space-bytes.len); + if (res == 0) + break; + + if (res < 0) { + if (errno == EINTR) + continue; + + die_errno(errno, "reading"); + } + + bytes.len += res; + if (bytes.len == space) { + space *= 2; + bytes.bytes = realloc(bytes.bytes, space); + } + } + + return bytes; +} + +void write_all(int fd, amqp_bytes_t data) +{ + while (data.len > 0) { + ssize_t res = write(fd, data.bytes, data.len); + if (res < 0) + die_errno(errno, "write"); + + data.len -= res; + data.bytes += res; + } +} + +void copy_body(amqp_connection_state_t conn, int fd) +{ + size_t body_remaining; + amqp_frame_t frame; + + int res = amqp_simple_wait_frame(conn, &frame); + if (res < 0) + die_errno(-res, "waiting for header frame"); + if (frame.frame_type != AMQP_FRAME_HEADER) + die("expected header, got frame type 0x%X", + frame.frame_type); + + body_remaining = frame.payload.properties.body_size; + while (body_remaining) { + res = amqp_simple_wait_frame(conn, &frame); + if (res < 0) + die_errno(-res, "waiting for body frame"); + if (frame.frame_type != AMQP_FRAME_BODY) + die("expected body, got frame type 0x%X", + frame.frame_type); + + write_all(fd, frame.payload.body_fragment); + body_remaining -= frame.payload.body_fragment.len; + } +} + +void pipeline(const char * const *argv, struct pipeline *pl) +{ + posix_spawn_file_actions_t file_acts; + + int pipefds[2]; + if (pipe(pipefds)) + die_errno(errno, "pipe"); + + die_errno(posix_spawn_file_actions_init(&file_acts), + "posix_spawn_file_actions_init"); + die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), + "posix_spawn_file_actions_adddup2"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), + "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), + "posix_spawn_file_actions_addclose"); + + die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, + (char * const *)argv, environ), + "posix_spawnp: %s", argv[0]); + + die_errno(posix_spawn_file_actions_destroy(&file_acts), + "posix_spawn_file_actions_destroy"); + + if (close(pipefds[0])) + die_errno(errno, "close"); + + pl->infd = pipefds[1]; +} + +int finish_pipeline(struct pipeline *pl) +{ + int status; + + if (close(pl->infd)) + die_errno(errno, "close"); + if (waitpid(pl->pid, &status, 0) < 0) + die_errno(errno, "waitpid"); + return WIFEXITED(status) && WEXITSTATUS(status) == 0; +} + +poptContext process_options(int argc, const char **argv, + struct poptOption *options, + const char *help) +{ + int c; + poptContext opts = poptGetContext(NULL, argc, argv, options, 0); + poptSetOtherOptionHelp(opts, help); + + while ((c = poptGetNextOpt(opts)) >= 0) { + // no options require explicit handling + } + + if (c < -1) { + fprintf(stderr, "%s: %s\n", + poptBadOption(opts, POPT_BADOPTION_NOALIAS), + poptStrerror(c)); + poptPrintUsage(opts, stderr, 0); + exit(1); + } + + return opts; +} + +void process_all_options(int argc, const char **argv, + struct poptOption *options) +{ + poptContext opts = process_options(argc, argv, options, + "[OPTIONS]..."); + const char *opt = poptPeekArg(opts); + + if (opt) { + fprintf(stderr, "unexpected operand: %s\n", opt); + poptPrintUsage(opts, stderr, 0); + exit(1); + } + + poptFreeContext(opts); +} + +amqp_bytes_t cstring_bytes(const char *str) +{ + return str ? amqp_cstring_bytes(str) : AMQP_EMPTY_BYTES; +} |