// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors. // SPDX-License-Identifier: mit #include "common.h" #ifdef WITH_SSL #include #endif #include #include #include #include #include #include #include #include #ifdef WINDOWS #include "compat.h" #endif void die(const char *fmt, ...) { va_list ap; va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); fprintf(stderr, "\n"); exit(1); } void die_errno(int err, const char *fmt, ...) { va_list ap; if (err == 0) { return; } va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); fprintf(stderr, ": %s\n", strerror(err)); exit(1); } void die_amqp_error(int err, const char *fmt, ...) { va_list ap; if (err >= 0) { return; } va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); fprintf(stderr, ": %s\n", amqp_error_string2(err)); exit(1); } const char *amqp_server_exception_string(amqp_rpc_reply_t r) { int res; static char s[512]; switch (r.reply.id) { case AMQP_CONNECTION_CLOSE_METHOD: { amqp_connection_close_t *m = (amqp_connection_close_t *)r.reply.decoded; res = snprintf(s, sizeof(s), "server connection error %d, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes); break; } case AMQP_CHANNEL_CLOSE_METHOD: { amqp_channel_close_t *m = (amqp_channel_close_t *)r.reply.decoded; res = snprintf(s, sizeof(s), "server channel error %d, message: %.*s", m->reply_code, (int)m->reply_text.len, (char *)m->reply_text.bytes); break; } default: res = snprintf(s, sizeof(s), "unknown server error, method id 0x%08X", r.reply.id); break; } return res >= 0 ? s : NULL; } const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) { switch (r.reply_type) { case AMQP_RESPONSE_NORMAL: return "normal response"; case AMQP_RESPONSE_NONE: return "missing RPC reply type"; case AMQP_RESPONSE_LIBRARY_EXCEPTION: return amqp_error_string2(r.library_error); case AMQP_RESPONSE_SERVER_EXCEPTION: return amqp_server_exception_string(r); default: abort(); } } void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) { va_list ap; if (r.reply_type == AMQP_RESPONSE_NORMAL) { return; } va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r)); exit(1); } static char *amqp_url; static char *amqp_server; static int amqp_port = -1; static char *amqp_vhost; static char *amqp_username; static char *amqp_password; static int amqp_heartbeat = 0; #ifdef WITH_SSL static int amqp_ssl = 0; static char *amqp_cacert = "/etc/ssl/certs/cacert.pem"; static char *amqp_key = NULL; static char *amqp_cert = NULL; #endif /* WITH_SSL */ const char *connect_options_title = "Connection options"; struct poptOption connect_options[] = { {"url", 'u', POPT_ARG_STRING, &amqp_url, 0, "the AMQP URL to connect to", "amqp://..."}, {"server", 's', POPT_ARG_STRING, &amqp_server, 0, "the AMQP server to connect to", "hostname"}, {"port", 0, POPT_ARG_INT, &amqp_port, 0, "the port to connect on", "port"}, {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, "the vhost to use when connecting", "vhost"}, {"username", 0, POPT_ARG_STRING, &amqp_username, 0, "the username to login with", "username"}, {"password", 0, POPT_ARG_STRING, &amqp_password, 0, "the password to login with", "password"}, {"heartbeat", 0, POPT_ARG_INT, &amqp_heartbeat, 0, "heartbeat interval, set to 0 to disable", "heartbeat"}, #ifdef WITH_SSL {"ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0, "connect over SSL/TLS", NULL}, {"cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0, "path to the CA certificate file", "cacert.pem"}, {"key", 0, POPT_ARG_STRING, &amqp_key, 0, "path to the client private key file", "key.pem"}, {"cert", 0, POPT_ARG_STRING, &amqp_cert, 0, "path to the client certificate file", "cert.pem"}, #endif /* WITH_SSL */ {NULL, '\0', 0, NULL, 0, NULL, NULL}}; static void init_connection_info(struct amqp_connection_info *ci) { ci->user = NULL; ci->password = NULL; ci->host = NULL; ci->port = -1; ci->vhost = NULL; ci->user = NULL; amqp_default_connection_info(ci); if (amqp_url) die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), "Parsing URL '%s'", amqp_url); if (amqp_server) { char *colon; if (amqp_url) { die("--server and --url options cannot be used at the same time"); } /* parse the server string into a hostname and a port */ colon = strchr(amqp_server, ':'); if (colon) { char *port_end; size_t host_len; /* Deprecate specifying the port number with the --server option, because it is not ipv6 friendly. --url now allows connection options to be specified concisely. */ fprintf(stderr, "Specifying the port number with --server is deprecated\n"); host_len = colon - amqp_server; ci->host = malloc(host_len + 1); memcpy(ci->host, amqp_server, host_len); ci->host[host_len] = 0; if (amqp_port >= 0) { die("both --server and --port options specify server port"); } ci->port = strtol(colon + 1, &port_end, 10); if (ci->port < 0 || ci->port > 65535 || port_end == colon + 1 || *port_end != 0) die("bad server port number in '%s'", amqp_server); } else { ci->host = amqp_server; ci->port = 5672; #if WITH_SSL if (amqp_ssl) { ci->port = 5671; } #endif } } #if WITH_SSL if (amqp_ssl && !ci->ssl) { if (amqp_url) { die("the --ssl option specifies an SSL connection" " but the --url option does not"); } else { ci->ssl = 1; } } #endif if (amqp_port >= 0) { if (amqp_url) { die("--port and --url options cannot be used at the same time"); } ci->port = amqp_port; } if (amqp_username) { if (amqp_url) { die("--username and --url options cannot be used at the same time"); } ci->user = amqp_username; } if (amqp_password) { if (amqp_url) { die("--password and --url options cannot be used at the same time"); } ci->password = amqp_password; } if (amqp_vhost) { if (amqp_url) { die("--vhost and --url options cannot be used at the same time"); } ci->vhost = amqp_vhost; } if (amqp_heartbeat < 0) { die("--heartbeat must be a positive value"); } } amqp_connection_state_t make_connection(void) { int status; amqp_socket_t *socket = NULL; struct amqp_connection_info ci; amqp_connection_state_t conn; init_connection_info(&ci); conn = amqp_new_connection(); if (ci.ssl) { #ifdef WITH_SSL socket = amqp_ssl_socket_new(conn); if (!socket) { die("creating SSL/TLS socket"); } if (amqp_cacert) { amqp_ssl_socket_set_cacert(socket, amqp_cacert); } if (amqp_key) { amqp_ssl_socket_set_key(socket, amqp_cert, amqp_key); } #else die("librabbitmq was not built with SSL/TLS support"); #endif } else { socket = amqp_tcp_socket_new(conn); if (!socket) { die("creating TCP socket (out of memory)"); } } status = amqp_socket_open(socket, ci.host, ci.port); if (status) { die_amqp_error(status, "opening socket to %s:%d", ci.host, ci.port); } die_rpc(amqp_login(conn, ci.vhost, 0, 131072, amqp_heartbeat, AMQP_SASL_METHOD_PLAIN, ci.user, ci.password), "logging in to AMQP server"); if (!amqp_channel_open(conn, 1)) { die_rpc(amqp_get_rpc_reply(conn), "opening channel"); } return conn; } void close_connection(amqp_connection_state_t conn) { int res; die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "closing channel"); die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "closing connection"); res = amqp_destroy_connection(conn); die_amqp_error(res, "closing connection"); } amqp_bytes_t read_all(int fd) { size_t space = 4096; amqp_bytes_t bytes; bytes.bytes = malloc(space); bytes.len = 0; for (;;) { ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, space - bytes.len); if (res == 0) { break; } if (res < 0) { if (errno == EINTR) { continue; } die_errno(errno, "reading"); } bytes.len += res; if (bytes.len == space) { space *= 2; bytes.bytes = realloc(bytes.bytes, space); } } return bytes; } void write_all(int fd, amqp_bytes_t data) { while (data.len > 0) { ssize_t res = write(fd, data.bytes, data.len); if (res < 0) { die_errno(errno, "write"); } data.len -= res; data.bytes = (char *)data.bytes + res; } } void copy_body(amqp_connection_state_t conn, int fd) { size_t body_remaining; amqp_frame_t frame; int res = amqp_simple_wait_frame(conn, &frame); die_amqp_error(res, "waiting for header frame"); if (frame.frame_type != AMQP_FRAME_HEADER) { die("expected header, got frame type 0x%X", frame.frame_type); } body_remaining = frame.payload.properties.body_size; while (body_remaining) { res = amqp_simple_wait_frame(conn, &frame); die_amqp_error(res, "waiting for body frame"); if (frame.frame_type != AMQP_FRAME_BODY) { die("expected body, got frame type 0x%X", frame.frame_type); } write_all(fd, frame.payload.body_fragment); body_remaining -= frame.payload.body_fragment.len; } } poptContext process_options(int argc, const char **argv, struct poptOption *options, const char *help) { int c; poptContext opts = poptGetContext(NULL, argc, argv, options, 0); poptSetOtherOptionHelp(opts, help); while ((c = poptGetNextOpt(opts)) >= 0) { /* no options require explicit handling */ } if (c < -1) { fprintf(stderr, "%s: %s\n", poptBadOption(opts, POPT_BADOPTION_NOALIAS), poptStrerror(c)); poptPrintUsage(opts, stderr, 0); exit(1); } return opts; } void process_all_options(int argc, const char **argv, struct poptOption *options) { poptContext opts = process_options(argc, argv, options, "[OPTIONS]..."); const char *opt = poptPeekArg(opts); if (opt) { fprintf(stderr, "unexpected operand: %s\n", opt); poptPrintUsage(opts, stderr, 0); exit(1); } poptFreeContext(opts); } amqp_bytes_t cstring_bytes(const char *str) { return str ? amqp_cstring_bytes(str) : amqp_empty_bytes; }