From ad2b116059e22d393b7e44ad54f345a3fb4e267b Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Mon, 8 Apr 2013 14:52:53 -0700 Subject: Formatted source code with astyle utilty --- tools/common.c | 627 ++++++++++++++++++++++++++------------------------ tools/common.h | 16 +- tools/consume.c | 307 ++++++++++++------------ tools/declare_queue.c | 69 +++--- tools/delete_queue.c | 72 +++--- tools/get.c | 55 ++--- tools/publish.c | 156 +++++++------ tools/unix/process.c | 56 ++--- tools/unix/process.h | 4 +- tools/win32/compat.c | 27 +-- tools/win32/process.c | 327 +++++++++++++------------- tools/win32/process.h | 4 +- 12 files changed, 910 insertions(+), 810 deletions(-) (limited to 'tools') diff --git a/tools/common.c b/tools/common.c index c5b2a77..bead858 100644 --- a/tools/common.c +++ b/tools/common.c @@ -54,113 +54,116 @@ void die(const char *fmt, ...) { - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, "\n"); - exit(1); + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(1); } void die_errno(int err, const char *fmt, ...) { - va_list ap; + va_list ap; - if (err == 0) - return; + if (err == 0) { + return; + } - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, ": %s\n", strerror(err)); - exit(1); + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", strerror(err)); + exit(1); } void die_amqp_error(int err, const char *fmt, ...) { - va_list ap; - char *errstr; - - if (err >= 0) - return; - - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, ": %s\n", errstr = amqp_error_string(-err)); - free(errstr); - exit(1); + va_list ap; + char *errstr; + + if (err >= 0) { + return; + } + + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", errstr = amqp_error_string(-err)); + free(errstr); + exit(1); } char *amqp_server_exception_string(amqp_rpc_reply_t r) { - int res; - char *s; - - switch (r.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t *m - = (amqp_connection_close_t *)r.reply.decoded; - res = asprintf(&s, "server connection error %d, message: %.*s", - m->reply_code, - (int)m->reply_text.len, - (char *)m->reply_text.bytes); - break; - } - - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t *m - = (amqp_channel_close_t *)r.reply.decoded; - res = asprintf(&s, "server channel error %d, message: %.*s", - m->reply_code, - (int)m->reply_text.len, - (char *)m->reply_text.bytes); - break; - } - - default: - res = asprintf(&s, "unknown server error, method id 0x%08X", - r.reply.id); - break; - } - - return res >= 0 ? s : NULL; + int res; + char *s; + + switch (r.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m + = (amqp_connection_close_t *)r.reply.decoded; + res = asprintf(&s, "server connection error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m + = (amqp_channel_close_t *)r.reply.decoded; + res = asprintf(&s, "server channel error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + default: + res = asprintf(&s, "unknown server error, method id 0x%08X", + r.reply.id); + break; + } + + return res >= 0 ? s : NULL; } char *amqp_rpc_reply_string(amqp_rpc_reply_t r) { - switch (r.reply_type) { - case AMQP_RESPONSE_NORMAL: - return strdup("normal response"); + switch (r.reply_type) { + case AMQP_RESPONSE_NORMAL: + return strdup("normal response"); - case AMQP_RESPONSE_NONE: - return strdup("missing RPC reply type"); + case AMQP_RESPONSE_NONE: + return strdup("missing RPC reply type"); - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - return amqp_error_string(r.library_error); + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + return amqp_error_string(r.library_error); - case AMQP_RESPONSE_SERVER_EXCEPTION: - return amqp_server_exception_string(r); + case AMQP_RESPONSE_SERVER_EXCEPTION: + return amqp_server_exception_string(r); - default: - abort(); - } + default: + abort(); + } } void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) { - va_list ap; - char *errstr; - - if (r.reply_type == AMQP_RESPONSE_NORMAL) - return; - - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r)); - free(errstr); - exit(1); + va_list ap; + char *errstr; + + if (r.reply_type == AMQP_RESPONSE_NORMAL) { + return; + } + + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r)); + free(errstr); + exit(1); } static char *amqp_url; @@ -172,267 +175,299 @@ static char *amqp_password; const char *connect_options_title = "Connection options"; struct poptOption connect_options[] = { - {"url", 'u', POPT_ARG_STRING, &amqp_url, 0, - "the AMQP URL to connect to", "amqp://..."}, - {"server", 's', POPT_ARG_STRING, &amqp_server, 0, - "the AMQP server to connect to", "hostname"}, - {"port", 0, POPT_ARG_INT, &amqp_port, 0, - "the port to connect on", "port" }, - {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, - "the vhost to use when connecting", "vhost"}, - {"username", 0, POPT_ARG_STRING, &amqp_username, 0, - "the username to login with", "username"}, - {"password", 0, POPT_ARG_STRING, &amqp_password, 0, - "the password to login with", "password"}, - { NULL, '\0', 0, NULL, 0, NULL, NULL } + { + "url", 'u', POPT_ARG_STRING, &amqp_url, 0, + "the AMQP URL to connect to", "amqp://..." + }, + { + "server", 's', POPT_ARG_STRING, &amqp_server, 0, + "the AMQP server to connect to", "hostname" + }, + { + "port", 0, POPT_ARG_INT, &amqp_port, 0, + "the port to connect on", "port" + }, + { + "vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, + "the vhost to use when connecting", "vhost" + }, + { + "username", 0, POPT_ARG_STRING, &amqp_username, 0, + "the username to login with", "username" + }, + { + "password", 0, POPT_ARG_STRING, &amqp_password, 0, + "the password to login with", "password" + }, + { NULL, '\0', 0, NULL, 0, NULL, NULL } }; static void init_connection_info(struct amqp_connection_info *ci) { - struct amqp_connection_info defaults; + struct amqp_connection_info defaults; - ci->user = NULL; - ci->password = NULL; - ci->host = NULL; - ci->port = -1; - ci->vhost = NULL; - ci->user = NULL; + ci->user = NULL; + ci->password = NULL; + ci->host = NULL; + ci->port = -1; + ci->vhost = NULL; + ci->user = NULL; - if (amqp_url) - die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), - "Parsing URL '%s'", amqp_url); + if (amqp_url) { + die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), + "Parsing URL '%s'", amqp_url); + } - if (amqp_server) { + if (amqp_server) { char *colon; - if (ci->host) - die("both --server and --url options specify" - " server host"); - - /* parse the server string into a hostname and a port */ - colon = strchr(amqp_server, ':'); - if (colon) { - char *port_end; - size_t host_len; - - /* Deprecate specifying the port number with the - --server option, because it is not ipv6 friendly. - --url now allows connection options to be - specificied concisely. */ - fprintf(stderr, "Specifying the port number with" - " --server is deprecated\n"); - - host_len = colon - amqp_server; - ci->host = malloc(host_len + 1); - memcpy(ci->host, amqp_server, host_len); - ci->host[host_len] = 0; - - if (ci->port >= 0) - die("both --server and --url options specify" - " server port"); - if (amqp_port >= 0) - die("both --server and --port options specify" - " server port"); - - ci->port = strtol(colon+1, &port_end, 10); - if (ci->port < 0 - || ci->port > 65535 - || port_end == colon+1 - || *port_end != 0) - die("bad server port number in '%s'", - amqp_server); - } - } - - if (amqp_port >= 0) { - if (ci->port >= 0) - die("both --port and --url options specify" - " server port"); - - ci->port = amqp_port; - } - - if (amqp_username) { - if (ci->user) - die("both --username and --url options specify" - " AMQP username"); - - ci->user = amqp_username; - } - - if (amqp_password) { - if (ci->password) - die("both --password and --url options specify" - " AMQP password"); - - ci->password = amqp_password; - } - - if (amqp_vhost) { - if (ci->vhost) - die("both --vhost and --url options specify" - " AMQP vhost"); - - ci->vhost = amqp_vhost; - } - - amqp_default_connection_info(&defaults); - - if (!ci->user) - ci->user = defaults.user; - if (!ci->password) - ci->password = defaults.password; - if (!ci->host) - ci->host = defaults.host; - if (ci->port < 0) - ci->port = defaults.port; - if (!ci->vhost) - ci->vhost = defaults.vhost; + if (ci->host) { + die("both --server and --url options specify" + " server host"); + } + + /* parse the server string into a hostname and a port */ + colon = strchr(amqp_server, ':'); + if (colon) { + char *port_end; + size_t host_len; + + /* Deprecate specifying the port number with the + --server option, because it is not ipv6 friendly. + --url now allows connection options to be + specificied concisely. */ + fprintf(stderr, "Specifying the port number with" + " --server is deprecated\n"); + + host_len = colon - amqp_server; + ci->host = malloc(host_len + 1); + memcpy(ci->host, amqp_server, host_len); + ci->host[host_len] = 0; + + if (ci->port >= 0) { + die("both --server and --url options specify" + " server port"); + } + if (amqp_port >= 0) { + die("both --server and --port options specify" + " server port"); + } + + ci->port = strtol(colon+1, &port_end, 10); + if (ci->port < 0 + || ci->port > 65535 + || port_end == colon+1 + || *port_end != 0) { + die("bad server port number in '%s'", + amqp_server); + } + } + } + + if (amqp_port >= 0) { + if (ci->port >= 0) { + die("both --port and --url options specify" + " server port"); + } + + ci->port = amqp_port; + } + + if (amqp_username) { + if (ci->user) { + die("both --username and --url options specify" + " AMQP username"); + } + + ci->user = amqp_username; + } + + if (amqp_password) { + if (ci->password) { + die("both --password and --url options specify" + " AMQP password"); + } + + ci->password = amqp_password; + } + + if (amqp_vhost) { + if (ci->vhost) { + die("both --vhost and --url options specify" + " AMQP vhost"); + } + + ci->vhost = amqp_vhost; + } + + amqp_default_connection_info(&defaults); + + if (!ci->user) { + ci->user = defaults.user; + } + if (!ci->password) { + ci->password = defaults.password; + } + if (!ci->host) { + ci->host = defaults.host; + } + if (ci->port < 0) { + ci->port = defaults.port; + } + if (!ci->vhost) { + ci->vhost = defaults.vhost; + } } amqp_connection_state_t make_connection(void) { - int s; - struct amqp_connection_info ci; - amqp_connection_state_t conn; + int s; + struct amqp_connection_info ci; + amqp_connection_state_t conn; - init_connection_info(&ci); + init_connection_info(&ci); - s = amqp_open_socket(ci.host, ci.port); - die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port); + s = amqp_open_socket(ci.host, ci.port); + die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port); - conn = amqp_new_connection(); - amqp_set_sockfd(conn, s); + conn = amqp_new_connection(); + amqp_set_sockfd(conn, s); - die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0, - AMQP_SASL_METHOD_PLAIN, - ci.user, ci.password), - "logging in to AMQP server"); + die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0, + AMQP_SASL_METHOD_PLAIN, + ci.user, ci.password), + "logging in to AMQP server"); - if (!amqp_channel_open(conn, 1)) - die_rpc(amqp_get_rpc_reply(conn), "opening channel"); + if (!amqp_channel_open(conn, 1)) { + die_rpc(amqp_get_rpc_reply(conn), "opening channel"); + } - return conn; + return conn; } void close_connection(amqp_connection_state_t conn) { - int res; - die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), - "closing channel"); - die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), - "closing connection"); - - res = amqp_destroy_connection(conn); - die_amqp_error(res, "closing connection"); + int res; + die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), + "closing channel"); + die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), + "closing connection"); + + res = amqp_destroy_connection(conn); + die_amqp_error(res, "closing connection"); } amqp_bytes_t read_all(int fd) { - size_t space = 4096; - amqp_bytes_t bytes; - - bytes.bytes = malloc(space); - bytes.len = 0; - - for (;;) { - ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, - space-bytes.len); - if (res == 0) - break; - - if (res < 0) { - if (errno == EINTR) - continue; - - die_errno(errno, "reading"); - } - - bytes.len += res; - if (bytes.len == space) { - space *= 2; - bytes.bytes = realloc(bytes.bytes, space); - } - } - - return bytes; + size_t space = 4096; + amqp_bytes_t bytes; + + bytes.bytes = malloc(space); + bytes.len = 0; + + for (;;) { + ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, + space-bytes.len); + if (res == 0) { + break; + } + + if (res < 0) { + if (errno == EINTR) { + continue; + } + + die_errno(errno, "reading"); + } + + bytes.len += res; + if (bytes.len == space) { + space *= 2; + bytes.bytes = realloc(bytes.bytes, space); + } + } + + return bytes; } void write_all(int fd, amqp_bytes_t data) { - while (data.len > 0) { - ssize_t res = write(fd, data.bytes, data.len); - if (res < 0) - die_errno(errno, "write"); - - data.len -= res; - data.bytes = (char *)data.bytes + res; - } + while (data.len > 0) { + ssize_t res = write(fd, data.bytes, data.len); + if (res < 0) { + die_errno(errno, "write"); + } + + data.len -= res; + data.bytes = (char *)data.bytes + res; + } } void copy_body(amqp_connection_state_t conn, int fd) { - size_t body_remaining; - amqp_frame_t frame; - - int res = amqp_simple_wait_frame(conn, &frame); - die_amqp_error(res, "waiting for header frame"); - if (frame.frame_type != AMQP_FRAME_HEADER) - die("expected header, got frame type 0x%X", - frame.frame_type); - - body_remaining = frame.payload.properties.body_size; - while (body_remaining) { - res = amqp_simple_wait_frame(conn, &frame); - die_amqp_error(res, "waiting for body frame"); - if (frame.frame_type != AMQP_FRAME_BODY) - die("expected body, got frame type 0x%X", - frame.frame_type); - - write_all(fd, frame.payload.body_fragment); - body_remaining -= frame.payload.body_fragment.len; - } + size_t body_remaining; + amqp_frame_t frame; + + int res = amqp_simple_wait_frame(conn, &frame); + die_amqp_error(res, "waiting for header frame"); + if (frame.frame_type != AMQP_FRAME_HEADER) { + die("expected header, got frame type 0x%X", + frame.frame_type); + } + + body_remaining = frame.payload.properties.body_size; + while (body_remaining) { + res = amqp_simple_wait_frame(conn, &frame); + die_amqp_error(res, "waiting for body frame"); + if (frame.frame_type != AMQP_FRAME_BODY) { + die("expected body, got frame type 0x%X", + frame.frame_type); + } + + write_all(fd, frame.payload.body_fragment); + body_remaining -= frame.payload.body_fragment.len; + } } poptContext process_options(int argc, const char **argv, - struct poptOption *options, - const char *help) + struct poptOption *options, + const char *help) { - int c; - poptContext opts = poptGetContext(NULL, argc, argv, options, 0); - poptSetOtherOptionHelp(opts, help); - - while ((c = poptGetNextOpt(opts)) >= 0) { - /* no options require explicit handling */ - } - - if (c < -1) { - fprintf(stderr, "%s: %s\n", - poptBadOption(opts, POPT_BADOPTION_NOALIAS), - poptStrerror(c)); - poptPrintUsage(opts, stderr, 0); - exit(1); - } - - return opts; + int c; + poptContext opts = poptGetContext(NULL, argc, argv, options, 0); + poptSetOtherOptionHelp(opts, help); + + while ((c = poptGetNextOpt(opts)) >= 0) { + /* no options require explicit handling */ + } + + if (c < -1) { + fprintf(stderr, "%s: %s\n", + poptBadOption(opts, POPT_BADOPTION_NOALIAS), + poptStrerror(c)); + poptPrintUsage(opts, stderr, 0); + exit(1); + } + + return opts; } void process_all_options(int argc, const char **argv, - struct poptOption *options) + struct poptOption *options) { - poptContext opts = process_options(argc, argv, options, - "[OPTIONS]..."); - const char *opt = poptPeekArg(opts); + poptContext opts = process_options(argc, argv, options, + "[OPTIONS]..."); + const char *opt = poptPeekArg(opts); - if (opt) { - fprintf(stderr, "unexpected operand: %s\n", opt); - poptPrintUsage(opts, stderr, 0); - exit(1); - } + if (opt) { + fprintf(stderr, "unexpected operand: %s\n", opt); + poptPrintUsage(opts, stderr, 0); + exit(1); + } - poptFreeContext(opts); + poptFreeContext(opts); } amqp_bytes_t cstring_bytes(const char *str) { - return str ? amqp_cstring_bytes(str) : amqp_empty_bytes; + return str ? amqp_cstring_bytes(str) : amqp_empty_bytes; } diff --git a/tools/common.h b/tools/common.h index 0a9af9d..77979ee 100644 --- a/tools/common.h +++ b/tools/common.h @@ -42,13 +42,13 @@ extern char *amqp_server_exception_string(amqp_rpc_reply_t r); extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r); extern void die(const char *fmt, ...) - __attribute__ ((format (printf, 1, 2))); +__attribute__ ((format (printf, 1, 2))); extern void die_errno(int err, const char *fmt, ...) - __attribute__ ((format (printf, 2, 3))); +__attribute__ ((format (printf, 2, 3))); extern void die_amqp_error(int err, const char *fmt, ...) - __attribute__ ((format (printf, 2, 3))); +__attribute__ ((format (printf, 2, 3))); extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) - __attribute__ ((format (printf, 2, 3))); +__attribute__ ((format (printf, 2, 3))); extern const char *connect_options_title; extern struct poptOption connect_options[]; @@ -61,12 +61,12 @@ extern void write_all(int fd, amqp_bytes_t data); extern void copy_body(amqp_connection_state_t conn, int fd); #define INCLUDE_OPTIONS(options) \ - {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL} + {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL} extern poptContext process_options(int argc, const char **argv, - struct poptOption *options, - const char *help); + struct poptOption *options, + const char *help); extern void process_all_options(int argc, const char **argv, - struct poptOption *options); + struct poptOption *options); extern amqp_bytes_t cstring_bytes(const char *str); diff --git a/tools/consume.c b/tools/consume.c index d6ddfa8..c52311e 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -45,164 +45,179 @@ use the same escaping conventions as rabbitmqctl. */ static char *stringify_bytes(amqp_bytes_t bytes) { - /* We will need up to 4 chars per byte, plus the terminating 0 */ - char *res = malloc(bytes.len * 4 + 1); - uint8_t *data = bytes.bytes; - char *p = res; - size_t i; - - for (i = 0; i < bytes.len; i++) { - if (data[i] >= 32 && data[i] != 127) { - *p++ = data[i]; - } - else { - *p++ = '\\'; - *p++ = '0' + (data[i] >> 6); - *p++ = '0' + (data[i] >> 3 & 0x7); - *p++ = '0' + (data[i] & 0x7); - } - } - - *p = 0; - return res; + /* We will need up to 4 chars per byte, plus the terminating 0 */ + char *res = malloc(bytes.len * 4 + 1); + uint8_t *data = bytes.bytes; + char *p = res; + size_t i; + + for (i = 0; i < bytes.len; i++) { + if (data[i] >= 32 && data[i] != 127) { + *p++ = data[i]; + } else { + *p++ = '\\'; + *p++ = '0' + (data[i] >> 6); + *p++ = '0' + (data[i] >> 3 & 0x7); + *p++ = '0' + (data[i] & 0x7); + } + } + + *p = 0; + return res; } static amqp_bytes_t setup_queue(amqp_connection_state_t conn, - char *queue, char *exchange, - char *routing_key, int declare) + char *queue, char *exchange, + char *routing_key, int declare) { - amqp_bytes_t queue_bytes = cstring_bytes(queue); - - /* if an exchange name wasn't provided, check that we don't - have options that require it. */ - if (!exchange && routing_key) { - fprintf(stderr, "--routing-key option requires an exchange" - " name to be provided with --exchange\n"); - exit(1); - } - - if (!queue || exchange || declare) { - /* Declare the queue as auto-delete. */ - amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1, - queue_bytes, 0, 0, 1, 1, - amqp_empty_table); - if (!res) - die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); - - if (!queue) { - /* the server should have provided a queue name */ - char *sq; - queue_bytes = amqp_bytes_malloc_dup(res->queue); - sq = stringify_bytes(queue_bytes); - fprintf(stderr, "Server provided queue name: %s\n", - sq); - free(sq); - } - - /* Bind to an exchange if requested */ - if (exchange) { - amqp_bytes_t eb = amqp_cstring_bytes(exchange); - if (!amqp_queue_bind(conn, 1, queue_bytes, eb, - cstring_bytes(routing_key), - amqp_empty_table)) - die_rpc(amqp_get_rpc_reply(conn), - "queue.bind"); - } - } - - return queue_bytes; + amqp_bytes_t queue_bytes = cstring_bytes(queue); + + /* if an exchange name wasn't provided, check that we don't + have options that require it. */ + if (!exchange && routing_key) { + fprintf(stderr, "--routing-key option requires an exchange" + " name to be provided with --exchange\n"); + exit(1); + } + + if (!queue || exchange || declare) { + /* Declare the queue as auto-delete. */ + amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1, + queue_bytes, 0, 0, 1, 1, + amqp_empty_table); + if (!res) { + die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + } + + if (!queue) { + /* the server should have provided a queue name */ + char *sq; + queue_bytes = amqp_bytes_malloc_dup(res->queue); + sq = stringify_bytes(queue_bytes); + fprintf(stderr, "Server provided queue name: %s\n", + sq); + free(sq); + } + + /* Bind to an exchange if requested */ + if (exchange) { + amqp_bytes_t eb = amqp_cstring_bytes(exchange); + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key), + amqp_empty_table)) + die_rpc(amqp_get_rpc_reply(conn), + "queue.bind"); + } + } + + return queue_bytes; } static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, - int no_ack, int count, const char * const *argv) + int no_ack, int count, const char *const *argv) { - int i; - - /* If there is a limit, set the qos to match */ - if (count > 0 && count <= 65535 - && !amqp_basic_qos(conn, 1, 0, count, 0)) - die_rpc(amqp_get_rpc_reply(conn), "basic.qos"); - - if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, - 0, amqp_empty_table)) - die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); - - for (i = 0; count < 0 || i < count; i++) { - amqp_frame_t frame; - struct pipeline pl; - uint64_t delivery_tag; - int res = amqp_simple_wait_frame(conn, &frame); - die_amqp_error(res, "waiting for header frame"); - - if (frame.frame_type != AMQP_FRAME_METHOD - || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) - continue; - - amqp_basic_deliver_t *deliver - = (amqp_basic_deliver_t *)frame.payload.method.decoded; - delivery_tag = deliver->delivery_tag; - - pipeline(argv, &pl); - copy_body(conn, pl.infd); - - if (finish_pipeline(&pl) && !no_ack) - die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, - 0), - "basic.ack"); - - amqp_maybe_release_buffers(conn); - } + int i; + + /* If there is a limit, set the qos to match */ + if (count > 0 && count <= 65535 + && !amqp_basic_qos(conn, 1, 0, count, 0)) { + die_rpc(amqp_get_rpc_reply(conn), "basic.qos"); + } + + if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, + 0, amqp_empty_table)) { + die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); + } + + for (i = 0; count < 0 || i < count; i++) { + amqp_frame_t frame; + struct pipeline pl; + uint64_t delivery_tag; + int res = amqp_simple_wait_frame(conn, &frame); + die_amqp_error(res, "waiting for header frame"); + + if (frame.frame_type != AMQP_FRAME_METHOD + || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { + continue; + } + + amqp_basic_deliver_t *deliver + = (amqp_basic_deliver_t *)frame.payload.method.decoded; + delivery_tag = deliver->delivery_tag; + + pipeline(argv, &pl); + copy_body(conn, pl.infd); + + if (finish_pipeline(&pl) && !no_ack) + die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, + 0), + "basic.ack"); + + amqp_maybe_release_buffers(conn); + } } int main(int argc, const char **argv) { - poptContext opts; - amqp_connection_state_t conn; - const char * const *cmd_argv; - char *queue = NULL; - char *exchange = NULL; - char *routing_key = NULL; - int declare = 0; - int no_ack = 0; - int count = -1; - amqp_bytes_t queue_bytes; - - struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - {"queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue to consume from", "queue"}, - {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, - "bind the queue to this exchange", "exchange"}, - {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, - "the routing key to bind with", "routing key"}, - {"declare", 'd', POPT_ARG_NONE, &declare, 0, - "declare an exclusive queue", NULL}, - {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, - "consume in no-ack mode", NULL}, - {"count", 'c', POPT_ARG_INT, &count, 0, - "stop consuming after this many messages are consumed", - "limit"}, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; - - opts = process_options(argc, argv, options, - "[OPTIONS]... "); - - cmd_argv = poptGetArgs(opts); - if (!cmd_argv || !cmd_argv[0]) { - fprintf(stderr, "consuming command not specified\n"); - poptPrintUsage(opts, stderr, 0); - goto error; - } - - conn = make_connection(); - queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare); - do_consume(conn, queue_bytes, no_ack, count, cmd_argv); - close_connection(conn); - return 0; + poptContext opts; + amqp_connection_state_t conn; + const char *const *cmd_argv; + char *queue = NULL; + char *exchange = NULL; + char *routing_key = NULL; + int declare = 0; + int no_ack = 0; + int count = -1; + amqp_bytes_t queue_bytes; + + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + { + "queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue to consume from", "queue" + }, + { + "exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "bind the queue to this exchange", "exchange" + }, + { + "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to bind with", "routing key" + }, + { + "declare", 'd', POPT_ARG_NONE, &declare, 0, + "declare an exclusive queue", NULL + }, + { + "no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, + "consume in no-ack mode", NULL + }, + { + "count", 'c', POPT_ARG_INT, &count, 0, + "stop consuming after this many messages are consumed", + "limit" + }, + POPT_AUTOHELP + { NULL, '\0', 0, NULL, 0, NULL, NULL } + }; + + opts = process_options(argc, argv, options, + "[OPTIONS]... "); + + cmd_argv = poptGetArgs(opts); + if (!cmd_argv || !cmd_argv[0]) { + fprintf(stderr, "consuming command not specified\n"); + poptPrintUsage(opts, stderr, 0); + goto error; + } + + conn = make_connection(); + queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare); + do_consume(conn, queue_bytes, no_ack, count, cmd_argv); + close_connection(conn); + return 0; error: - poptFreeContext(opts); - return 1; + poptFreeContext(opts); + return 1; } diff --git a/tools/declare_queue.c b/tools/declare_queue.c index 26c3a68..25a94b1 100644 --- a/tools/declare_queue.c +++ b/tools/declare_queue.c @@ -44,41 +44,46 @@ int main(int argc, const char **argv) { - amqp_connection_state_t conn; - char *queue = NULL; - int durable = 0; + amqp_connection_state_t conn; + char *queue = NULL; + int durable = 0; - struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - {"queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue name to declare, or the empty string", "queue"}, - {"durable", 'd', POPT_ARG_VAL, &durable, 1, - "declare a durable queue", NULL}, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + { + "queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue name to declare, or the empty string", "queue" + }, + { + "durable", 'd', POPT_ARG_VAL, &durable, 1, + "declare a durable queue", NULL + }, + POPT_AUTOHELP + { NULL, '\0', 0, NULL, 0, NULL, NULL } + }; - process_all_options(argc, argv, options); + process_all_options(argc, argv, options); - if (queue == NULL) { - fprintf(stderr, "queue name not specified\n"); - return 1; - } + if (queue == NULL) { + fprintf(stderr, "queue name not specified\n"); + return 1; + } - conn = make_connection(); - { - amqp_queue_declare_ok_t *reply = amqp_queue_declare(conn, 1, - cstring_bytes(queue), - 0, - durable, - 0, - 0, - amqp_empty_table); - if (reply == NULL) - die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + conn = make_connection(); + { + amqp_queue_declare_ok_t *reply = amqp_queue_declare(conn, 1, + cstring_bytes(queue), + 0, + durable, + 0, + 0, + amqp_empty_table); + if (reply == NULL) { + die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + } - printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes); - } - close_connection(conn); - return 0; + printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes); + } + close_connection(conn); + return 0; } diff --git a/tools/delete_queue.c b/tools/delete_queue.c index cb92f7b..4ebbd94 100644 --- a/tools/delete_queue.c +++ b/tools/delete_queue.c @@ -44,41 +44,47 @@ int main(int argc, const char **argv) { - amqp_connection_state_t conn; - char *queue = NULL; - int if_unused = 0; - int if_empty = 0; + amqp_connection_state_t conn; + char *queue = NULL; + int if_unused = 0; + int if_empty = 0; - struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - {"queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue name to delete", "queue"}, - {"if-unused", 'u', POPT_ARG_VAL, &if_unused, 1, - "do not delete unless queue is unused", NULL}, - {"if-empty", 'e', POPT_ARG_VAL, &if_empty, 1, - "do not delete unless queue is empty", NULL}, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + { + "queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue name to delete", "queue" + }, + { + "if-unused", 'u', POPT_ARG_VAL, &if_unused, 1, + "do not delete unless queue is unused", NULL + }, + { + "if-empty", 'e', POPT_ARG_VAL, &if_empty, 1, + "do not delete unless queue is empty", NULL + }, + POPT_AUTOHELP + { NULL, '\0', 0, NULL, 0, NULL, NULL } + }; - process_all_options(argc, argv, options); + process_all_options(argc, argv, options); - if (queue == NULL || *queue == '\0') { - fprintf(stderr, "queue name not specified\n"); - return 1; - } + if (queue == NULL || *queue == '\0') { + fprintf(stderr, "queue name not specified\n"); + return 1; + } - conn = make_connection(); - { - amqp_queue_delete_ok_t *reply = amqp_queue_delete(conn, 1, - cstring_bytes(queue), - if_unused, - if_empty); - if (reply == NULL) { - die_rpc(amqp_get_rpc_reply(conn), "queue.delete"); - } - printf("%u\n", reply->message_count); - } - close_connection(conn); - return 0; + conn = make_connection(); + { + amqp_queue_delete_ok_t *reply = amqp_queue_delete(conn, 1, + cstring_bytes(queue), + if_unused, + if_empty); + if (reply == NULL) { + die_rpc(amqp_get_rpc_reply(conn), "queue.delete"); + } + printf("%u\n", reply->message_count); + } + close_connection(conn); + return 0; } diff --git a/tools/get.c b/tools/get.c index 888e069..bbcde72 100644 --- a/tools/get.c +++ b/tools/get.c @@ -41,40 +41,43 @@ static int do_get(amqp_connection_state_t conn, char *queue) { - amqp_rpc_reply_t r - = amqp_basic_get(conn, 1, cstring_bytes(queue), 1); - die_rpc(r, "basic.get"); + amqp_rpc_reply_t r + = amqp_basic_get(conn, 1, cstring_bytes(queue), 1); + die_rpc(r, "basic.get"); - if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) - return 0; + if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) { + return 0; + } - copy_body(conn, 1); - return 1; + copy_body(conn, 1); + return 1; } int main(int argc, const char **argv) { - amqp_connection_state_t conn; - char *queue = NULL; - int got_something; + amqp_connection_state_t conn; + char *queue = NULL; + int got_something; - struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - {"queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue to consume from", "queue"}, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + { + "queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue to consume from", "queue" + }, + POPT_AUTOHELP + { NULL, '\0', 0, NULL, 0, NULL, NULL } + }; - process_all_options(argc, argv, options); + process_all_options(argc, argv, options); - if (!queue) { - fprintf(stderr, "queue not specified\n"); - return 1; - } + if (!queue) { + fprintf(stderr, "queue not specified\n"); + return 1; + } - conn = make_connection(); - got_something = do_get(conn, queue); - close_connection(conn); - return got_something ? 0 : 2; + conn = make_connection(); + got_something = do_get(conn, queue); + close_connection(conn); + return got_something ? 0 : 2; } diff --git a/tools/publish.c b/tools/publish.c index 5ebc191..617b055 100644 --- a/tools/publish.c +++ b/tools/publish.c @@ -43,80 +43,94 @@ static void do_publish(amqp_connection_state_t conn, char *exchange, char *routing_key, - amqp_basic_properties_t *props, amqp_bytes_t body) + amqp_basic_properties_t *props, amqp_bytes_t body) { - int res = amqp_basic_publish(conn, 1, - cstring_bytes(exchange), - cstring_bytes(routing_key), - 0, 0, props, body); - die_amqp_error(res, "basic.publish"); + int res = amqp_basic_publish(conn, 1, + cstring_bytes(exchange), + cstring_bytes(routing_key), + 0, 0, props, body); + die_amqp_error(res, "basic.publish"); } int main(int argc, const char **argv) { - amqp_connection_state_t conn; - char *exchange = NULL; - char *routing_key = NULL; - char *content_type = NULL; - char *content_encoding = NULL; - char *body = NULL; - amqp_basic_properties_t props; - amqp_bytes_t body_bytes; - int delivery = 1; /* non-persistent by default */ - - struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, - "the exchange to publish to", "exchange"}, - {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, - "the routing key to publish with", "routing key"}, - {"persistent", 'p', POPT_ARG_VAL, &delivery, 2, - "use the persistent delivery mode", NULL}, - {"content-type", 'C', POPT_ARG_STRING, &content_type, 0, - "the content-type for the message", "content type"}, - {"content-encoding", 'E', POPT_ARG_STRING, - &content_encoding, 0, - "the content-encoding for the message", "content encoding"}, - {"body", 'b', POPT_ARG_STRING, &body, 0, - "specify the message body", "body"}, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; - - process_all_options(argc, argv, options); - - if (!exchange && !routing_key) { - fprintf(stderr, - "neither exchange nor routing key specified\n"); - return 1; - } - - memset(&props, 0, sizeof props); - props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG; - props.delivery_mode = 2; /* persistent delivery mode */ - - if (content_type) { - props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; - props.content_type = amqp_cstring_bytes(content_type); - } - - if (content_encoding) { - props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG; - props.content_encoding = amqp_cstring_bytes(content_encoding); - } - - conn = make_connection(); - - if (body) - body_bytes = amqp_cstring_bytes(body); - else - body_bytes = read_all(0); - - do_publish(conn, exchange, routing_key, &props, body_bytes); - - if (!body) - free(body_bytes.bytes); - - close_connection(conn); - return 0; + amqp_connection_state_t conn; + char *exchange = NULL; + char *routing_key = NULL; + char *content_type = NULL; + char *content_encoding = NULL; + char *body = NULL; + amqp_basic_properties_t props; + amqp_bytes_t body_bytes; + int delivery = 1; /* non-persistent by default */ + + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + { + "exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "the exchange to publish to", "exchange" + }, + { + "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to publish with", "routing key" + }, + { + "persistent", 'p', POPT_ARG_VAL, &delivery, 2, + "use the persistent delivery mode", NULL + }, + { + "content-type", 'C', POPT_ARG_STRING, &content_type, 0, + "the content-type for the message", "content type" + }, + { + "content-encoding", 'E', POPT_ARG_STRING, + &content_encoding, 0, + "the content-encoding for the message", "content encoding" + }, + { + "body", 'b', POPT_ARG_STRING, &body, 0, + "specify the message body", "body" + }, + POPT_AUTOHELP + { NULL, '\0', 0, NULL, 0, NULL, NULL } + }; + + process_all_options(argc, argv, options); + + if (!exchange && !routing_key) { + fprintf(stderr, + "neither exchange nor routing key specified\n"); + return 1; + } + + memset(&props, 0, sizeof props); + props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG; + props.delivery_mode = 2; /* persistent delivery mode */ + + if (content_type) { + props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; + props.content_type = amqp_cstring_bytes(content_type); + } + + if (content_encoding) { + props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG; + props.content_encoding = amqp_cstring_bytes(content_encoding); + } + + conn = make_connection(); + + if (body) { + body_bytes = amqp_cstring_bytes(body); + } else { + body_bytes = read_all(0); + } + + do_publish(conn, exchange, routing_key, &props, body_bytes); + + if (!body) { + free(body_bytes.bytes); + } + + close_connection(conn); + return 0; } diff --git a/tools/unix/process.c b/tools/unix/process.c index a714928..249fba3 100644 --- a/tools/unix/process.c +++ b/tools/unix/process.c @@ -47,41 +47,45 @@ extern char **environ; void pipeline(const char *const *argv, struct pipeline *pl) { - posix_spawn_file_actions_t file_acts; + posix_spawn_file_actions_t file_acts; - int pipefds[2]; - if (pipe(pipefds)) - die_errno(errno, "pipe"); + int pipefds[2]; + if (pipe(pipefds)) { + die_errno(errno, "pipe"); + } - die_errno(posix_spawn_file_actions_init(&file_acts), - "posix_spawn_file_actions_init"); - die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), - "posix_spawn_file_actions_adddup2"); - die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), - "posix_spawn_file_actions_addclose"); - die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), - "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_init(&file_acts), + "posix_spawn_file_actions_init"); + die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), + "posix_spawn_file_actions_adddup2"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), + "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), + "posix_spawn_file_actions_addclose"); - die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, - (char * const *)argv, environ), - "posix_spawnp: %s", argv[0]); + die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, + (char * const *)argv, environ), + "posix_spawnp: %s", argv[0]); - die_errno(posix_spawn_file_actions_destroy(&file_acts), - "posix_spawn_file_actions_destroy"); + die_errno(posix_spawn_file_actions_destroy(&file_acts), + "posix_spawn_file_actions_destroy"); - if (close(pipefds[0])) - die_errno(errno, "close"); + if (close(pipefds[0])) { + die_errno(errno, "close"); + } - pl->infd = pipefds[1]; + pl->infd = pipefds[1]; } int finish_pipeline(struct pipeline *pl) { - int status; + int status; - if (close(pl->infd)) - die_errno(errno, "close"); - if (waitpid(pl->pid, &status, 0) < 0) - die_errno(errno, "waitpid"); - return WIFEXITED(status) && WEXITSTATUS(status) == 0; + if (close(pl->infd)) { + die_errno(errno, "close"); + } + if (waitpid(pl->pid, &status, 0) < 0) { + die_errno(errno, "waitpid"); + } + return WIFEXITED(status) && WEXITSTATUS(status) == 0; } diff --git a/tools/unix/process.h b/tools/unix/process.h index 0aad292..a211f27 100644 --- a/tools/unix/process.h +++ b/tools/unix/process.h @@ -32,8 +32,8 @@ */ struct pipeline { - int pid; - int infd; + int pid; + int infd; }; extern void pipeline(const char *const *argv, struct pipeline *pl); diff --git a/tools/win32/compat.c b/tools/win32/compat.c index cbac8e6..7c7d97e 100644 --- a/tools/win32/compat.c +++ b/tools/win32/compat.c @@ -43,21 +43,22 @@ int asprintf(char **strp, const char *fmt, ...) { - va_list ap; - int len; + va_list ap; + int len; - va_start(ap, fmt); - len = _vscprintf(fmt, ap); - va_end(ap); + va_start(ap, fmt); + len = _vscprintf(fmt, ap); + va_end(ap); - *strp = malloc(len+1); - if (!*strp) - return -1; + *strp = malloc(len+1); + if (!*strp) { + return -1; + } - va_start(ap, fmt); - _vsnprintf(*strp, len+1, fmt, ap); - va_end(ap); + va_start(ap, fmt); + _vsnprintf(*strp, len+1, fmt, ap); + va_end(ap); - (*strp)[len] = 0; - return len; + (*strp)[len] = 0; + return len; } diff --git a/tools/win32/process.c b/tools/win32/process.c index 699f60f..4d5270b 100644 --- a/tools/win32/process.c +++ b/tools/win32/process.c @@ -44,171 +44,188 @@ void die_windows_error(const char *fmt, ...) { - char *msg; - - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - - if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM - | FORMAT_MESSAGE_ALLOCATE_BUFFER, - NULL, GetLastError(), - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - (LPSTR)&msg, 0, NULL)) - msg = "(failed to retrieve Windows error message)"; - - fprintf(stderr, ": %s\n", msg); - exit(1); + char *msg; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + + if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM + | FORMAT_MESSAGE_ALLOCATE_BUFFER, + NULL, GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL)) { + msg = "(failed to retrieve Windows error message)"; + } + + fprintf(stderr, ": %s\n", msg); + exit(1); } static char *make_command_line(const char *const *argv) { - int i; - size_t len = 1; /* initial quotes */ - char *buf; - char *dest; - - /* calculate the length of the required buffer, making worst - case assumptions for simplicity */ - for (i = 0;;) { - /* each character could need escaping */ - len += strlen(argv[i]) * 2; - - if (!argv[++i]) - break; - - len += 3; /* quotes, space, quotes */ - } - - len += 2; /* final quotes and the terminating zero */ - - dest = buf = malloc(len); - if (!buf) - die("allocating memory for subprocess command line"); - - /* Here we perform the inverse of the CommandLineToArgvW - function. Note that its rules are slightly crazy: A - sequence of backslashes only act to escape if followed by - double quotes. A sequence of backslashes not followed by - double quotes is untouched. */ - - for (i = 0;;) { - const char *src = argv[i]; - int backslashes = 0; - - *dest++ = '\"'; - - for (;;) { - switch (*src) { - case 0: - goto done; - - case '\"': - for (; backslashes; backslashes--) - *dest++ = '\\'; - - *dest++ = '\\'; - *dest++ = '\"'; - break; - - case '\\': - backslashes++; - *dest++ = '\\'; - break; - - default: - backslashes = 0; - *dest++ = *src; - break; - } - - src++; - } - done: - for (; backslashes; backslashes--) - *dest++ = '\\'; - - *dest++ = '\"'; - - if (!argv[++i]) - break; - - *dest++ = ' '; - } - - *dest++ = 0; - return buf; + int i; + size_t len = 1; /* initial quotes */ + char *buf; + char *dest; + + /* calculate the length of the required buffer, making worst + case assumptions for simplicity */ + for (i = 0;;) { + /* each character could need escaping */ + len += strlen(argv[i]) * 2; + + if (!argv[++i]) { + break; + } + + len += 3; /* quotes, space, quotes */ + } + + len += 2; /* final quotes and the terminating zero */ + + dest = buf = malloc(len); + if (!buf) { + die("allocating memory for subprocess command line"); + } + + /* Here we perform the inverse of the CommandLineToArgvW + function. Note that its rules are slightly crazy: A + sequence of backslashes only act to escape if followed by + double quotes. A sequence of backslashes not followed by + double quotes is untouched. */ + + for (i = 0;;) { + const char *src = argv[i]; + int backslashes = 0; + + *dest++ = '\"'; + + for (;;) { + switch (*src) { + case 0: + goto done; + + case '\"': + for (; backslashes; backslashes--) { + *dest++ = '\\'; + } + + *dest++ = '\\'; + *dest++ = '\"'; + break; + + case '\\': + backslashes++; + *dest++ = '\\'; + break; + + default: + backslashes = 0; + *dest++ = *src; + break; + } + + src++; + } +done: + for (; backslashes; backslashes--) { + *dest++ = '\\'; + } + + *dest++ = '\"'; + + if (!argv[++i]) { + break; + } + + *dest++ = ' '; + } + + *dest++ = 0; + return buf; } void pipeline(const char *const *argv, struct pipeline *pl) { - HANDLE in_read_handle, in_write_handle; - SECURITY_ATTRIBUTES sec_attr; - PROCESS_INFORMATION proc_info; - STARTUPINFO start_info; - char *cmdline = make_command_line(argv); - - sec_attr.nLength = sizeof sec_attr; - sec_attr.bInheritHandle = TRUE; - sec_attr.lpSecurityDescriptor = NULL; - - if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0)) - die_windows_error("CreatePipe"); - - if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0)) - die_windows_error("SetHandleInformation"); - - /* when in Rome... */ - ZeroMemory(&proc_info, sizeof proc_info); - ZeroMemory(&start_info, sizeof start_info); - - start_info.cb = sizeof start_info; - start_info.dwFlags |= STARTF_USESTDHANDLES; - - if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) - == INVALID_HANDLE_VALUE - || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) - == INVALID_HANDLE_VALUE) - die_windows_error("GetStdHandle"); - - start_info.hStdInput = in_read_handle; - - if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, - NULL, NULL, &start_info, &proc_info)) - die_windows_error("CreateProcess"); - - free(cmdline); - - if (!CloseHandle(proc_info.hThread)) - die_windows_error("CloseHandle for thread"); - if (!CloseHandle(in_read_handle)) - die_windows_error("CloseHandle"); - - pl->proc_handle = proc_info.hProcess; - pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0); + HANDLE in_read_handle, in_write_handle; + SECURITY_ATTRIBUTES sec_attr; + PROCESS_INFORMATION proc_info; + STARTUPINFO start_info; + char *cmdline = make_command_line(argv); + + sec_attr.nLength = sizeof sec_attr; + sec_attr.bInheritHandle = TRUE; + sec_attr.lpSecurityDescriptor = NULL; + + if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0)) { + die_windows_error("CreatePipe"); + } + + if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0)) { + die_windows_error("SetHandleInformation"); + } + + /* when in Rome... */ + ZeroMemory(&proc_info, sizeof proc_info); + ZeroMemory(&start_info, sizeof start_info); + + start_info.cb = sizeof start_info; + start_info.dwFlags |= STARTF_USESTDHANDLES; + + if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) + == INVALID_HANDLE_VALUE + || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) + == INVALID_HANDLE_VALUE) { + die_windows_error("GetStdHandle"); + } + + start_info.hStdInput = in_read_handle; + + if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, + NULL, NULL, &start_info, &proc_info)) { + die_windows_error("CreateProcess"); + } + + free(cmdline); + + if (!CloseHandle(proc_info.hThread)) { + die_windows_error("CloseHandle for thread"); + } + if (!CloseHandle(in_read_handle)) { + die_windows_error("CloseHandle"); + } + + pl->proc_handle = proc_info.hProcess; + pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0); } int finish_pipeline(struct pipeline *pl) { - DWORD code; - - if (close(pl->infd)) - die_errno(errno, "close"); - - for (;;) { - if (!GetExitCodeProcess(pl->proc_handle, &code)) - die_windows_error("GetExitCodeProcess"); - if (code != STILL_ACTIVE) - break; - - if (WaitForSingleObject(pl->proc_handle, INFINITE) - == WAIT_FAILED) - die_windows_error("WaitForSingleObject"); - } - - if (!CloseHandle(pl->proc_handle)) - die_windows_error("CloseHandle for process"); - - return code; + DWORD code; + + if (close(pl->infd)) { + die_errno(errno, "close"); + } + + for (;;) { + if (!GetExitCodeProcess(pl->proc_handle, &code)) { + die_windows_error("GetExitCodeProcess"); + } + if (code != STILL_ACTIVE) { + break; + } + + if (WaitForSingleObject(pl->proc_handle, INFINITE) + == WAIT_FAILED) { + die_windows_error("WaitForSingleObject"); + } + } + + if (!CloseHandle(pl->proc_handle)) { + die_windows_error("CloseHandle for process"); + } + + return code; } diff --git a/tools/win32/process.h b/tools/win32/process.h index 1429ad2..88f2c7a 100644 --- a/tools/win32/process.h +++ b/tools/win32/process.h @@ -34,8 +34,8 @@ #include struct pipeline { - HANDLE proc_handle; - int infd; + HANDLE proc_handle; + int infd; }; extern void pipeline(const char *const *argv, struct pipeline *pl); -- cgit v1.2.1