diff options
author | David Wragg <dpw@lshift.net> | 2010-02-20 22:58:40 +0000 |
---|---|---|
committer | David Wragg <dpw@lshift.net> | 2010-02-20 22:58:40 +0000 |
commit | c90336ef0b6c8507a2f409db7e33dd1844b25517 (patch) | |
tree | fe58dc22a21ac75341dd1064fb48ba60a5c2e70c /tools/common.c | |
parent | 55ac202750859482c4319addb8c54368b2369455 (diff) | |
download | rabbitmq-c-github-ask-c90336ef0b6c8507a2f409db7e33dd1844b25517.tar.gz |
Command line AMQP tools based on rabbitmq-c
Diffstat (limited to 'tools/common.c')
-rw-r--r-- | tools/common.c | 364 |
1 files changed, 364 insertions, 0 deletions
diff --git a/tools/common.c b/tools/common.c new file mode 100644 index 0000000..7baab8c --- /dev/null +++ b/tools/common.c @@ -0,0 +1,364 @@ +#include "config.h" + +#include <stdio.h> +#include <stdlib.h> +#include <stdarg.h> +#include <string.h> + +#include <unistd.h> +#include <fcntl.h> +#include <errno.h> +#include <spawn.h> +#include <sys/wait.h> + +#include <popt.h> + +#include "common.h" + +extern char **environ; + +void die(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(1); +} + +void die_errno(int err, const char *fmt, ...) +{ + if (err == 0) + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", strerror(err)); + exit(1); +} + +char *amqp_server_exception_string(amqp_rpc_reply_t r) +{ + int res; + char *s; + + switch (r.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m + = (amqp_connection_close_t *)r.reply.decoded; + res = asprintf(&s, "server connection error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m + = (amqp_channel_close_t *)r.reply.decoded; + res = asprintf(&s, "server channel error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + default: + res = asprintf(&s, "unknown server error, method id 0x%08X", + r.reply.id); + break; + } + + return res >= 0 ? s : NULL; +} + +char *amqp_rpc_reply_string(amqp_rpc_reply_t r) +{ + const char *s; + + switch (r.reply_type) { + case AMQP_RESPONSE_NORMAL: + s = "normal response"; + break; + + case AMQP_RESPONSE_NONE: + s = "missing RPC reply type"; + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + if (r.library_errno) + s = strerror(r.library_errno); + else + s = "end of stream"; + + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + return amqp_server_exception_string(r); + + default: + abort(); + } + + return strdup(s); +} + +void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) +{ + if (r.reply_type == AMQP_RESPONSE_NORMAL) + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r)); + exit(1); +} + +void set_cloexec(int fd) +{ + int flags; + + flags = fcntl(fd, F_GETFD); + if (flags == -1 + || fcntl(fd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) + die_errno(errno, "set_cloexec"); +} + +static char *amqp_server = "localhost"; +static char *amqp_vhost = "/"; +static char *amqp_username = "guest"; +static char *amqp_password = "guest"; + +const char *connect_options_title = "Connection options"; +struct poptOption connect_options[] = { + {"server", 's', POPT_ARG_STRING, &amqp_server, 0, + "the AMQP server to connect to", "server"}, + {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, + "the vhost to use when connecting", "vhost"}, + {"username", 0, POPT_ARG_STRING, &amqp_username, 0, + "the username to login with", "username"}, + {"password", 0, POPT_ARG_STRING, &amqp_password, 0, + "the password to login with", "password"}, + { NULL, 0, 0, NULL, 0 } +}; + +amqp_connection_state_t make_connection(void) +{ + int s; + amqp_connection_state_t conn; + char *host = amqp_server; + int port = 0; + + /* parse the server string into a hostname and a port */ + char *colon = strchr(amqp_server, ':'); + if (colon) { + char *port_end; + size_t host_len = colon - amqp_server; + host = malloc(host_len + 1); + memcpy(host, amqp_server, host_len); + host[host_len] = 0; + + port = strtol(colon+1, &port_end, 10); + if (port < 0 + || port > 65535 + || port_end == colon+1 + || *port_end != 0) + die("bad server port number in %s", amqp_server); + } + + s = amqp_open_socket(host, port ? port : 5672); + if (s < 0) { + if (s == -ENOENT) + die("unknown host %s", host); + else + die_errno(-s, "opening socket to %s", amqp_server); + } + + set_cloexec(s); + + conn = amqp_new_connection(); + amqp_set_sockfd(conn, s); + + die_rpc(amqp_login(conn, amqp_vhost, 0, 131072, 0, + AMQP_SASL_METHOD_PLAIN, + amqp_username, amqp_password), + "logging in to AMQP server"); + + if (!amqp_channel_open(conn, 1)) + die_rpc(amqp_get_rpc_reply(conn), "opening channel"); + + return conn; +} + +void close_connection(amqp_connection_state_t conn) +{ + int s = amqp_get_sockfd(conn); + + die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), + "closing channel"); + die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), + "closing connection"); + amqp_destroy_connection(conn); + + if (close(s) < 0) + die_errno(errno, "closing socket"); +} + +amqp_bytes_t read_all(int fd) +{ + size_t space = 4096; + amqp_bytes_t bytes; + + bytes.bytes = malloc(space); + bytes.len = 0; + + for (;;) { + ssize_t res = read(fd, bytes.bytes+bytes.len, space-bytes.len); + if (res == 0) + break; + + if (res < 0) { + if (errno == EINTR) + continue; + + die_errno(errno, "reading"); + } + + bytes.len += res; + if (bytes.len == space) { + space *= 2; + bytes.bytes = realloc(bytes.bytes, space); + } + } + + return bytes; +} + +void write_all(int fd, amqp_bytes_t data) +{ + while (data.len > 0) { + ssize_t res = write(fd, data.bytes, data.len); + if (res < 0) + die_errno(errno, "write"); + + data.len -= res; + data.bytes += res; + } +} + +void copy_body(amqp_connection_state_t conn, int fd) +{ + size_t body_remaining; + amqp_frame_t frame; + + int res = amqp_simple_wait_frame(conn, &frame); + if (res < 0) + die_errno(-res, "waiting for header frame"); + if (frame.frame_type != AMQP_FRAME_HEADER) + die("expected header, got frame type 0x%X", + frame.frame_type); + + body_remaining = frame.payload.properties.body_size; + while (body_remaining) { + res = amqp_simple_wait_frame(conn, &frame); + if (res < 0) + die_errno(-res, "waiting for body frame"); + if (frame.frame_type != AMQP_FRAME_BODY) + die("expected body, got frame type 0x%X", + frame.frame_type); + + write_all(fd, frame.payload.body_fragment); + body_remaining -= frame.payload.body_fragment.len; + } +} + +void pipeline(const char * const *argv, struct pipeline *pl) +{ + posix_spawn_file_actions_t file_acts; + + int pipefds[2]; + if (pipe(pipefds)) + die_errno(errno, "pipe"); + + die_errno(posix_spawn_file_actions_init(&file_acts), + "posix_spawn_file_actions_init"); + die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), + "posix_spawn_file_actions_adddup2"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), + "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), + "posix_spawn_file_actions_addclose"); + + die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, + (char * const *)argv, environ), + "posix_spawnp: %s", argv[0]); + + die_errno(posix_spawn_file_actions_destroy(&file_acts), + "posix_spawn_file_actions_destroy"); + + if (close(pipefds[0])) + die_errno(errno, "close"); + + pl->infd = pipefds[1]; +} + +int finish_pipeline(struct pipeline *pl) +{ + int status; + + if (close(pl->infd)) + die_errno(errno, "close"); + if (waitpid(pl->pid, &status, 0) < 0) + die_errno(errno, "waitpid"); + return WIFEXITED(status) && WEXITSTATUS(status) == 0; +} + +poptContext process_options(int argc, const char **argv, + struct poptOption *options, + const char *help) +{ + int c; + poptContext opts = poptGetContext(NULL, argc, argv, options, 0); + + if (!help) + poptSetOtherOptionHelp(opts, "[OPTIONS]... <command> <args>"); + + while ((c = poptGetNextOpt(opts)) >= 0) { + // no options require explicit handling + } + + if (c < -1) { + fprintf(stderr, "%s: %s\n", + poptBadOption(opts, POPT_BADOPTION_NOALIAS), + poptStrerror(c)); + poptPrintUsage(opts, stderr, 0); + exit(1); + } + + return opts; +} + +void process_all_options(int argc, const char **argv, + struct poptOption *options) +{ + poptContext opts = process_options(argc, argv, options, NULL); + const char *opt = poptPeekArg(opts); + + if (opt) { + fprintf(stderr, "unexpected operand: %s\n", opt); + poptPrintUsage(opts, stderr, 0); + exit(1); + } + + poptFreeContext(opts); +} + +amqp_bytes_t cstring_bytes(const char *str) +{ + return str ? amqp_cstring_bytes(str) : AMQP_EMPTY_BYTES; +} |