summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-04-08 14:52:53 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-04-08 14:52:53 -0700
commitad2b116059e22d393b7e44ad54f345a3fb4e267b (patch)
treecfd98876757f85d9fe7bfe56d179bd5fb63a8ff0 /tools
parentf3074030723b840690458983b63c746549aa3bd5 (diff)
downloadrabbitmq-c-github-ask-ad2b116059e22d393b7e44ad54f345a3fb4e267b.tar.gz
Formatted source code with astyle utilty
Diffstat (limited to 'tools')
-rw-r--r--tools/common.c627
-rw-r--r--tools/common.h16
-rw-r--r--tools/consume.c307
-rw-r--r--tools/declare_queue.c69
-rw-r--r--tools/delete_queue.c72
-rw-r--r--tools/get.c55
-rw-r--r--tools/publish.c156
-rw-r--r--tools/unix/process.c56
-rw-r--r--tools/unix/process.h4
-rw-r--r--tools/win32/compat.c27
-rw-r--r--tools/win32/process.c327
-rw-r--r--tools/win32/process.h4
12 files changed, 910 insertions, 810 deletions
diff --git a/tools/common.c b/tools/common.c
index c5b2a77..bead858 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -54,113 +54,116 @@
void die(const char *fmt, ...)
{
- va_list ap;
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, "\n");
- exit(1);
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(1);
}
void die_errno(int err, const char *fmt, ...)
{
- va_list ap;
+ va_list ap;
- if (err == 0)
- return;
+ if (err == 0) {
+ return;
+ }
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, ": %s\n", strerror(err));
- exit(1);
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", strerror(err));
+ exit(1);
}
void die_amqp_error(int err, const char *fmt, ...)
{
- va_list ap;
- char *errstr;
-
- if (err >= 0)
- return;
-
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, ": %s\n", errstr = amqp_error_string(-err));
- free(errstr);
- exit(1);
+ va_list ap;
+ char *errstr;
+
+ if (err >= 0) {
+ return;
+ }
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", errstr = amqp_error_string(-err));
+ free(errstr);
+ exit(1);
}
char *amqp_server_exception_string(amqp_rpc_reply_t r)
{
- int res;
- char *s;
-
- switch (r.reply.id) {
- case AMQP_CONNECTION_CLOSE_METHOD: {
- amqp_connection_close_t *m
- = (amqp_connection_close_t *)r.reply.decoded;
- res = asprintf(&s, "server connection error %d, message: %.*s",
- m->reply_code,
- (int)m->reply_text.len,
- (char *)m->reply_text.bytes);
- break;
- }
-
- case AMQP_CHANNEL_CLOSE_METHOD: {
- amqp_channel_close_t *m
- = (amqp_channel_close_t *)r.reply.decoded;
- res = asprintf(&s, "server channel error %d, message: %.*s",
- m->reply_code,
- (int)m->reply_text.len,
- (char *)m->reply_text.bytes);
- break;
- }
-
- default:
- res = asprintf(&s, "unknown server error, method id 0x%08X",
- r.reply.id);
- break;
- }
-
- return res >= 0 ? s : NULL;
+ int res;
+ char *s;
+
+ switch (r.reply.id) {
+ case AMQP_CONNECTION_CLOSE_METHOD: {
+ amqp_connection_close_t *m
+ = (amqp_connection_close_t *)r.reply.decoded;
+ res = asprintf(&s, "server connection error %d, message: %.*s",
+ m->reply_code,
+ (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
+
+ case AMQP_CHANNEL_CLOSE_METHOD: {
+ amqp_channel_close_t *m
+ = (amqp_channel_close_t *)r.reply.decoded;
+ res = asprintf(&s, "server channel error %d, message: %.*s",
+ m->reply_code,
+ (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
+
+ default:
+ res = asprintf(&s, "unknown server error, method id 0x%08X",
+ r.reply.id);
+ break;
+ }
+
+ return res >= 0 ? s : NULL;
}
char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
{
- switch (r.reply_type) {
- case AMQP_RESPONSE_NORMAL:
- return strdup("normal response");
+ switch (r.reply_type) {
+ case AMQP_RESPONSE_NORMAL:
+ return strdup("normal response");
- case AMQP_RESPONSE_NONE:
- return strdup("missing RPC reply type");
+ case AMQP_RESPONSE_NONE:
+ return strdup("missing RPC reply type");
- case AMQP_RESPONSE_LIBRARY_EXCEPTION:
- return amqp_error_string(r.library_error);
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ return amqp_error_string(r.library_error);
- case AMQP_RESPONSE_SERVER_EXCEPTION:
- return amqp_server_exception_string(r);
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ return amqp_server_exception_string(r);
- default:
- abort();
- }
+ default:
+ abort();
+ }
}
void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
{
- va_list ap;
- char *errstr;
-
- if (r.reply_type == AMQP_RESPONSE_NORMAL)
- return;
-
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r));
- free(errstr);
- exit(1);
+ va_list ap;
+ char *errstr;
+
+ if (r.reply_type == AMQP_RESPONSE_NORMAL) {
+ return;
+ }
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r));
+ free(errstr);
+ exit(1);
}
static char *amqp_url;
@@ -172,267 +175,299 @@ static char *amqp_password;
const char *connect_options_title = "Connection options";
struct poptOption connect_options[] = {
- {"url", 'u', POPT_ARG_STRING, &amqp_url, 0,
- "the AMQP URL to connect to", "amqp://..."},
- {"server", 's', POPT_ARG_STRING, &amqp_server, 0,
- "the AMQP server to connect to", "hostname"},
- {"port", 0, POPT_ARG_INT, &amqp_port, 0,
- "the port to connect on", "port" },
- {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
- "the vhost to use when connecting", "vhost"},
- {"username", 0, POPT_ARG_STRING, &amqp_username, 0,
- "the username to login with", "username"},
- {"password", 0, POPT_ARG_STRING, &amqp_password, 0,
- "the password to login with", "password"},
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ {
+ "url", 'u', POPT_ARG_STRING, &amqp_url, 0,
+ "the AMQP URL to connect to", "amqp://..."
+ },
+ {
+ "server", 's', POPT_ARG_STRING, &amqp_server, 0,
+ "the AMQP server to connect to", "hostname"
+ },
+ {
+ "port", 0, POPT_ARG_INT, &amqp_port, 0,
+ "the port to connect on", "port"
+ },
+ {
+ "vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
+ "the vhost to use when connecting", "vhost"
+ },
+ {
+ "username", 0, POPT_ARG_STRING, &amqp_username, 0,
+ "the username to login with", "username"
+ },
+ {
+ "password", 0, POPT_ARG_STRING, &amqp_password, 0,
+ "the password to login with", "password"
+ },
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
};
static void init_connection_info(struct amqp_connection_info *ci)
{
- struct amqp_connection_info defaults;
+ struct amqp_connection_info defaults;
- ci->user = NULL;
- ci->password = NULL;
- ci->host = NULL;
- ci->port = -1;
- ci->vhost = NULL;
- ci->user = NULL;
+ ci->user = NULL;
+ ci->password = NULL;
+ ci->host = NULL;
+ ci->port = -1;
+ ci->vhost = NULL;
+ ci->user = NULL;
- if (amqp_url)
- die_amqp_error(amqp_parse_url(strdup(amqp_url), ci),
- "Parsing URL '%s'", amqp_url);
+ if (amqp_url) {
+ die_amqp_error(amqp_parse_url(strdup(amqp_url), ci),
+ "Parsing URL '%s'", amqp_url);
+ }
- if (amqp_server) {
+ if (amqp_server) {
char *colon;
- if (ci->host)
- die("both --server and --url options specify"
- " server host");
-
- /* parse the server string into a hostname and a port */
- colon = strchr(amqp_server, ':');
- if (colon) {
- char *port_end;
- size_t host_len;
-
- /* Deprecate specifying the port number with the
- --server option, because it is not ipv6 friendly.
- --url now allows connection options to be
- specificied concisely. */
- fprintf(stderr, "Specifying the port number with"
- " --server is deprecated\n");
-
- host_len = colon - amqp_server;
- ci->host = malloc(host_len + 1);
- memcpy(ci->host, amqp_server, host_len);
- ci->host[host_len] = 0;
-
- if (ci->port >= 0)
- die("both --server and --url options specify"
- " server port");
- if (amqp_port >= 0)
- die("both --server and --port options specify"
- " server port");
-
- ci->port = strtol(colon+1, &port_end, 10);
- if (ci->port < 0
- || ci->port > 65535
- || port_end == colon+1
- || *port_end != 0)
- die("bad server port number in '%s'",
- amqp_server);
- }
- }
-
- if (amqp_port >= 0) {
- if (ci->port >= 0)
- die("both --port and --url options specify"
- " server port");
-
- ci->port = amqp_port;
- }
-
- if (amqp_username) {
- if (ci->user)
- die("both --username and --url options specify"
- " AMQP username");
-
- ci->user = amqp_username;
- }
-
- if (amqp_password) {
- if (ci->password)
- die("both --password and --url options specify"
- " AMQP password");
-
- ci->password = amqp_password;
- }
-
- if (amqp_vhost) {
- if (ci->vhost)
- die("both --vhost and --url options specify"
- " AMQP vhost");
-
- ci->vhost = amqp_vhost;
- }
-
- amqp_default_connection_info(&defaults);
-
- if (!ci->user)
- ci->user = defaults.user;
- if (!ci->password)
- ci->password = defaults.password;
- if (!ci->host)
- ci->host = defaults.host;
- if (ci->port < 0)
- ci->port = defaults.port;
- if (!ci->vhost)
- ci->vhost = defaults.vhost;
+ if (ci->host) {
+ die("both --server and --url options specify"
+ " server host");
+ }
+
+ /* parse the server string into a hostname and a port */
+ colon = strchr(amqp_server, ':');
+ if (colon) {
+ char *port_end;
+ size_t host_len;
+
+ /* Deprecate specifying the port number with the
+ --server option, because it is not ipv6 friendly.
+ --url now allows connection options to be
+ specificied concisely. */
+ fprintf(stderr, "Specifying the port number with"
+ " --server is deprecated\n");
+
+ host_len = colon - amqp_server;
+ ci->host = malloc(host_len + 1);
+ memcpy(ci->host, amqp_server, host_len);
+ ci->host[host_len] = 0;
+
+ if (ci->port >= 0) {
+ die("both --server and --url options specify"
+ " server port");
+ }
+ if (amqp_port >= 0) {
+ die("both --server and --port options specify"
+ " server port");
+ }
+
+ ci->port = strtol(colon+1, &port_end, 10);
+ if (ci->port < 0
+ || ci->port > 65535
+ || port_end == colon+1
+ || *port_end != 0) {
+ die("bad server port number in '%s'",
+ amqp_server);
+ }
+ }
+ }
+
+ if (amqp_port >= 0) {
+ if (ci->port >= 0) {
+ die("both --port and --url options specify"
+ " server port");
+ }
+
+ ci->port = amqp_port;
+ }
+
+ if (amqp_username) {
+ if (ci->user) {
+ die("both --username and --url options specify"
+ " AMQP username");
+ }
+
+ ci->user = amqp_username;
+ }
+
+ if (amqp_password) {
+ if (ci->password) {
+ die("both --password and --url options specify"
+ " AMQP password");
+ }
+
+ ci->password = amqp_password;
+ }
+
+ if (amqp_vhost) {
+ if (ci->vhost) {
+ die("both --vhost and --url options specify"
+ " AMQP vhost");
+ }
+
+ ci->vhost = amqp_vhost;
+ }
+
+ amqp_default_connection_info(&defaults);
+
+ if (!ci->user) {
+ ci->user = defaults.user;
+ }
+ if (!ci->password) {
+ ci->password = defaults.password;
+ }
+ if (!ci->host) {
+ ci->host = defaults.host;
+ }
+ if (ci->port < 0) {
+ ci->port = defaults.port;
+ }
+ if (!ci->vhost) {
+ ci->vhost = defaults.vhost;
+ }
}
amqp_connection_state_t make_connection(void)
{
- int s;
- struct amqp_connection_info ci;
- amqp_connection_state_t conn;
+ int s;
+ struct amqp_connection_info ci;
+ amqp_connection_state_t conn;
- init_connection_info(&ci);
+ init_connection_info(&ci);
- s = amqp_open_socket(ci.host, ci.port);
- die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port);
+ s = amqp_open_socket(ci.host, ci.port);
+ die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port);
- conn = amqp_new_connection();
- amqp_set_sockfd(conn, s);
+ conn = amqp_new_connection();
+ amqp_set_sockfd(conn, s);
- die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0,
- AMQP_SASL_METHOD_PLAIN,
- ci.user, ci.password),
- "logging in to AMQP server");
+ die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0,
+ AMQP_SASL_METHOD_PLAIN,
+ ci.user, ci.password),
+ "logging in to AMQP server");
- if (!amqp_channel_open(conn, 1))
- die_rpc(amqp_get_rpc_reply(conn), "opening channel");
+ if (!amqp_channel_open(conn, 1)) {
+ die_rpc(amqp_get_rpc_reply(conn), "opening channel");
+ }
- return conn;
+ return conn;
}
void close_connection(amqp_connection_state_t conn)
{
- int res;
- die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
- "closing channel");
- die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
- "closing connection");
-
- res = amqp_destroy_connection(conn);
- die_amqp_error(res, "closing connection");
+ int res;
+ die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
+ "closing channel");
+ die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
+ "closing connection");
+
+ res = amqp_destroy_connection(conn);
+ die_amqp_error(res, "closing connection");
}
amqp_bytes_t read_all(int fd)
{
- size_t space = 4096;
- amqp_bytes_t bytes;
-
- bytes.bytes = malloc(space);
- bytes.len = 0;
-
- for (;;) {
- ssize_t res = read(fd, (char *)bytes.bytes + bytes.len,
- space-bytes.len);
- if (res == 0)
- break;
-
- if (res < 0) {
- if (errno == EINTR)
- continue;
-
- die_errno(errno, "reading");
- }
-
- bytes.len += res;
- if (bytes.len == space) {
- space *= 2;
- bytes.bytes = realloc(bytes.bytes, space);
- }
- }
-
- return bytes;
+ size_t space = 4096;
+ amqp_bytes_t bytes;
+
+ bytes.bytes = malloc(space);
+ bytes.len = 0;
+
+ for (;;) {
+ ssize_t res = read(fd, (char *)bytes.bytes + bytes.len,
+ space-bytes.len);
+ if (res == 0) {
+ break;
+ }
+
+ if (res < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+
+ die_errno(errno, "reading");
+ }
+
+ bytes.len += res;
+ if (bytes.len == space) {
+ space *= 2;
+ bytes.bytes = realloc(bytes.bytes, space);
+ }
+ }
+
+ return bytes;
}
void write_all(int fd, amqp_bytes_t data)
{
- while (data.len > 0) {
- ssize_t res = write(fd, data.bytes, data.len);
- if (res < 0)
- die_errno(errno, "write");
-
- data.len -= res;
- data.bytes = (char *)data.bytes + res;
- }
+ while (data.len > 0) {
+ ssize_t res = write(fd, data.bytes, data.len);
+ if (res < 0) {
+ die_errno(errno, "write");
+ }
+
+ data.len -= res;
+ data.bytes = (char *)data.bytes + res;
+ }
}
void copy_body(amqp_connection_state_t conn, int fd)
{
- size_t body_remaining;
- amqp_frame_t frame;
-
- int res = amqp_simple_wait_frame(conn, &frame);
- die_amqp_error(res, "waiting for header frame");
- if (frame.frame_type != AMQP_FRAME_HEADER)
- die("expected header, got frame type 0x%X",
- frame.frame_type);
-
- body_remaining = frame.payload.properties.body_size;
- while (body_remaining) {
- res = amqp_simple_wait_frame(conn, &frame);
- die_amqp_error(res, "waiting for body frame");
- if (frame.frame_type != AMQP_FRAME_BODY)
- die("expected body, got frame type 0x%X",
- frame.frame_type);
-
- write_all(fd, frame.payload.body_fragment);
- body_remaining -= frame.payload.body_fragment.len;
- }
+ size_t body_remaining;
+ amqp_frame_t frame;
+
+ int res = amqp_simple_wait_frame(conn, &frame);
+ die_amqp_error(res, "waiting for header frame");
+ if (frame.frame_type != AMQP_FRAME_HEADER) {
+ die("expected header, got frame type 0x%X",
+ frame.frame_type);
+ }
+
+ body_remaining = frame.payload.properties.body_size;
+ while (body_remaining) {
+ res = amqp_simple_wait_frame(conn, &frame);
+ die_amqp_error(res, "waiting for body frame");
+ if (frame.frame_type != AMQP_FRAME_BODY) {
+ die("expected body, got frame type 0x%X",
+ frame.frame_type);
+ }
+
+ write_all(fd, frame.payload.body_fragment);
+ body_remaining -= frame.payload.body_fragment.len;
+ }
}
poptContext process_options(int argc, const char **argv,
- struct poptOption *options,
- const char *help)
+ struct poptOption *options,
+ const char *help)
{
- int c;
- poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
- poptSetOtherOptionHelp(opts, help);
-
- while ((c = poptGetNextOpt(opts)) >= 0) {
- /* no options require explicit handling */
- }
-
- if (c < -1) {
- fprintf(stderr, "%s: %s\n",
- poptBadOption(opts, POPT_BADOPTION_NOALIAS),
- poptStrerror(c));
- poptPrintUsage(opts, stderr, 0);
- exit(1);
- }
-
- return opts;
+ int c;
+ poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
+ poptSetOtherOptionHelp(opts, help);
+
+ while ((c = poptGetNextOpt(opts)) >= 0) {
+ /* no options require explicit handling */
+ }
+
+ if (c < -1) {
+ fprintf(stderr, "%s: %s\n",
+ poptBadOption(opts, POPT_BADOPTION_NOALIAS),
+ poptStrerror(c));
+ poptPrintUsage(opts, stderr, 0);
+ exit(1);
+ }
+
+ return opts;
}
void process_all_options(int argc, const char **argv,
- struct poptOption *options)
+ struct poptOption *options)
{
- poptContext opts = process_options(argc, argv, options,
- "[OPTIONS]...");
- const char *opt = poptPeekArg(opts);
+ poptContext opts = process_options(argc, argv, options,
+ "[OPTIONS]...");
+ const char *opt = poptPeekArg(opts);
- if (opt) {
- fprintf(stderr, "unexpected operand: %s\n", opt);
- poptPrintUsage(opts, stderr, 0);
- exit(1);
- }
+ if (opt) {
+ fprintf(stderr, "unexpected operand: %s\n", opt);
+ poptPrintUsage(opts, stderr, 0);
+ exit(1);
+ }
- poptFreeContext(opts);
+ poptFreeContext(opts);
}
amqp_bytes_t cstring_bytes(const char *str)
{
- return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
+ return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
}
diff --git a/tools/common.h b/tools/common.h
index 0a9af9d..77979ee 100644
--- a/tools/common.h
+++ b/tools/common.h
@@ -42,13 +42,13 @@ extern char *amqp_server_exception_string(amqp_rpc_reply_t r);
extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r);
extern void die(const char *fmt, ...)
- __attribute__ ((format (printf, 1, 2)));
+__attribute__ ((format (printf, 1, 2)));
extern void die_errno(int err, const char *fmt, ...)
- __attribute__ ((format (printf, 2, 3)));
+__attribute__ ((format (printf, 2, 3)));
extern void die_amqp_error(int err, const char *fmt, ...)
- __attribute__ ((format (printf, 2, 3)));
+__attribute__ ((format (printf, 2, 3)));
extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
- __attribute__ ((format (printf, 2, 3)));
+__attribute__ ((format (printf, 2, 3)));
extern const char *connect_options_title;
extern struct poptOption connect_options[];
@@ -61,12 +61,12 @@ extern void write_all(int fd, amqp_bytes_t data);
extern void copy_body(amqp_connection_state_t conn, int fd);
#define INCLUDE_OPTIONS(options) \
- {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL}
+ {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL}
extern poptContext process_options(int argc, const char **argv,
- struct poptOption *options,
- const char *help);
+ struct poptOption *options,
+ const char *help);
extern void process_all_options(int argc, const char **argv,
- struct poptOption *options);
+ struct poptOption *options);
extern amqp_bytes_t cstring_bytes(const char *str);
diff --git a/tools/consume.c b/tools/consume.c
index d6ddfa8..c52311e 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -45,164 +45,179 @@
use the same escaping conventions as rabbitmqctl. */
static char *stringify_bytes(amqp_bytes_t bytes)
{
- /* We will need up to 4 chars per byte, plus the terminating 0 */
- char *res = malloc(bytes.len * 4 + 1);
- uint8_t *data = bytes.bytes;
- char *p = res;
- size_t i;
-
- for (i = 0; i < bytes.len; i++) {
- if (data[i] >= 32 && data[i] != 127) {
- *p++ = data[i];
- }
- else {
- *p++ = '\\';
- *p++ = '0' + (data[i] >> 6);
- *p++ = '0' + (data[i] >> 3 & 0x7);
- *p++ = '0' + (data[i] & 0x7);
- }
- }
-
- *p = 0;
- return res;
+ /* We will need up to 4 chars per byte, plus the terminating 0 */
+ char *res = malloc(bytes.len * 4 + 1);
+ uint8_t *data = bytes.bytes;
+ char *p = res;
+ size_t i;
+
+ for (i = 0; i < bytes.len; i++) {
+ if (data[i] >= 32 && data[i] != 127) {
+ *p++ = data[i];
+ } else {
+ *p++ = '\\';
+ *p++ = '0' + (data[i] >> 6);
+ *p++ = '0' + (data[i] >> 3 & 0x7);
+ *p++ = '0' + (data[i] & 0x7);
+ }
+ }
+
+ *p = 0;
+ return res;
}
static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
- char *queue, char *exchange,
- char *routing_key, int declare)
+ char *queue, char *exchange,
+ char *routing_key, int declare)
{
- amqp_bytes_t queue_bytes = cstring_bytes(queue);
-
- /* if an exchange name wasn't provided, check that we don't
- have options that require it. */
- if (!exchange && routing_key) {
- fprintf(stderr, "--routing-key option requires an exchange"
- " name to be provided with --exchange\n");
- exit(1);
- }
-
- if (!queue || exchange || declare) {
- /* Declare the queue as auto-delete. */
- amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1,
- queue_bytes, 0, 0, 1, 1,
- amqp_empty_table);
- if (!res)
- die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
-
- if (!queue) {
- /* the server should have provided a queue name */
- char *sq;
- queue_bytes = amqp_bytes_malloc_dup(res->queue);
- sq = stringify_bytes(queue_bytes);
- fprintf(stderr, "Server provided queue name: %s\n",
- sq);
- free(sq);
- }
-
- /* Bind to an exchange if requested */
- if (exchange) {
- amqp_bytes_t eb = amqp_cstring_bytes(exchange);
- if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
- cstring_bytes(routing_key),
- amqp_empty_table))
- die_rpc(amqp_get_rpc_reply(conn),
- "queue.bind");
- }
- }
-
- return queue_bytes;
+ amqp_bytes_t queue_bytes = cstring_bytes(queue);
+
+ /* if an exchange name wasn't provided, check that we don't
+ have options that require it. */
+ if (!exchange && routing_key) {
+ fprintf(stderr, "--routing-key option requires an exchange"
+ " name to be provided with --exchange\n");
+ exit(1);
+ }
+
+ if (!queue || exchange || declare) {
+ /* Declare the queue as auto-delete. */
+ amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1,
+ queue_bytes, 0, 0, 1, 1,
+ amqp_empty_table);
+ if (!res) {
+ die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
+ }
+
+ if (!queue) {
+ /* the server should have provided a queue name */
+ char *sq;
+ queue_bytes = amqp_bytes_malloc_dup(res->queue);
+ sq = stringify_bytes(queue_bytes);
+ fprintf(stderr, "Server provided queue name: %s\n",
+ sq);
+ free(sq);
+ }
+
+ /* Bind to an exchange if requested */
+ if (exchange) {
+ amqp_bytes_t eb = amqp_cstring_bytes(exchange);
+ if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
+ cstring_bytes(routing_key),
+ amqp_empty_table))
+ die_rpc(amqp_get_rpc_reply(conn),
+ "queue.bind");
+ }
+ }
+
+ return queue_bytes;
}
static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
- int no_ack, int count, const char * const *argv)
+ int no_ack, int count, const char *const *argv)
{
- int i;
-
- /* If there is a limit, set the qos to match */
- if (count > 0 && count <= 65535
- && !amqp_basic_qos(conn, 1, 0, count, 0))
- die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
-
- if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack,
- 0, amqp_empty_table))
- die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
-
- for (i = 0; count < 0 || i < count; i++) {
- amqp_frame_t frame;
- struct pipeline pl;
- uint64_t delivery_tag;
- int res = amqp_simple_wait_frame(conn, &frame);
- die_amqp_error(res, "waiting for header frame");
-
- if (frame.frame_type != AMQP_FRAME_METHOD
- || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
- continue;
-
- amqp_basic_deliver_t *deliver
- = (amqp_basic_deliver_t *)frame.payload.method.decoded;
- delivery_tag = deliver->delivery_tag;
-
- pipeline(argv, &pl);
- copy_body(conn, pl.infd);
-
- if (finish_pipeline(&pl) && !no_ack)
- die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag,
- 0),
- "basic.ack");
-
- amqp_maybe_release_buffers(conn);
- }
+ int i;
+
+ /* If there is a limit, set the qos to match */
+ if (count > 0 && count <= 65535
+ && !amqp_basic_qos(conn, 1, 0, count, 0)) {
+ die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
+ }
+
+ if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack,
+ 0, amqp_empty_table)) {
+ die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
+ }
+
+ for (i = 0; count < 0 || i < count; i++) {
+ amqp_frame_t frame;
+ struct pipeline pl;
+ uint64_t delivery_tag;
+ int res = amqp_simple_wait_frame(conn, &frame);
+ die_amqp_error(res, "waiting for header frame");
+
+ if (frame.frame_type != AMQP_FRAME_METHOD
+ || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ continue;
+ }
+
+ amqp_basic_deliver_t *deliver
+ = (amqp_basic_deliver_t *)frame.payload.method.decoded;
+ delivery_tag = deliver->delivery_tag;
+
+ pipeline(argv, &pl);
+ copy_body(conn, pl.infd);
+
+ if (finish_pipeline(&pl) && !no_ack)
+ die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag,
+ 0),
+ "basic.ack");
+
+ amqp_maybe_release_buffers(conn);
+ }
}
int main(int argc, const char **argv)
{
- poptContext opts;
- amqp_connection_state_t conn;
- const char * const *cmd_argv;
- char *queue = NULL;
- char *exchange = NULL;
- char *routing_key = NULL;
- int declare = 0;
- int no_ack = 0;
- int count = -1;
- amqp_bytes_t queue_bytes;
-
- struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {"queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue to consume from", "queue"},
- {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
- "bind the queue to this exchange", "exchange"},
- {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
- "the routing key to bind with", "routing key"},
- {"declare", 'd', POPT_ARG_NONE, &declare, 0,
- "declare an exclusive queue", NULL},
- {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
- "consume in no-ack mode", NULL},
- {"count", 'c', POPT_ARG_INT, &count, 0,
- "stop consuming after this many messages are consumed",
- "limit"},
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
-
- opts = process_options(argc, argv, options,
- "[OPTIONS]... <command> <args>");
-
- cmd_argv = poptGetArgs(opts);
- if (!cmd_argv || !cmd_argv[0]) {
- fprintf(stderr, "consuming command not specified\n");
- poptPrintUsage(opts, stderr, 0);
- goto error;
- }
-
- conn = make_connection();
- queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare);
- do_consume(conn, queue_bytes, no_ack, count, cmd_argv);
- close_connection(conn);
- return 0;
+ poptContext opts;
+ amqp_connection_state_t conn;
+ const char *const *cmd_argv;
+ char *queue = NULL;
+ char *exchange = NULL;
+ char *routing_key = NULL;
+ int declare = 0;
+ int no_ack = 0;
+ int count = -1;
+ amqp_bytes_t queue_bytes;
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {
+ "queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue to consume from", "queue"
+ },
+ {
+ "exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "bind the queue to this exchange", "exchange"
+ },
+ {
+ "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to bind with", "routing key"
+ },
+ {
+ "declare", 'd', POPT_ARG_NONE, &declare, 0,
+ "declare an exclusive queue", NULL
+ },
+ {
+ "no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
+ "consume in no-ack mode", NULL
+ },
+ {
+ "count", 'c', POPT_ARG_INT, &count, 0,
+ "stop consuming after this many messages are consumed",
+ "limit"
+ },
+ POPT_AUTOHELP
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ };
+
+ opts = process_options(argc, argv, options,
+ "[OPTIONS]... <command> <args>");
+
+ cmd_argv = poptGetArgs(opts);
+ if (!cmd_argv || !cmd_argv[0]) {
+ fprintf(stderr, "consuming command not specified\n");
+ poptPrintUsage(opts, stderr, 0);
+ goto error;
+ }
+
+ conn = make_connection();
+ queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare);
+ do_consume(conn, queue_bytes, no_ack, count, cmd_argv);
+ close_connection(conn);
+ return 0;
error:
- poptFreeContext(opts);
- return 1;
+ poptFreeContext(opts);
+ return 1;
}
diff --git a/tools/declare_queue.c b/tools/declare_queue.c
index 26c3a68..25a94b1 100644
--- a/tools/declare_queue.c
+++ b/tools/declare_queue.c
@@ -44,41 +44,46 @@
int main(int argc, const char **argv)
{
- amqp_connection_state_t conn;
- char *queue = NULL;
- int durable = 0;
+ amqp_connection_state_t conn;
+ char *queue = NULL;
+ int durable = 0;
- struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {"queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue name to declare, or the empty string", "queue"},
- {"durable", 'd', POPT_ARG_VAL, &durable, 1,
- "declare a durable queue", NULL},
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {
+ "queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue name to declare, or the empty string", "queue"
+ },
+ {
+ "durable", 'd', POPT_ARG_VAL, &durable, 1,
+ "declare a durable queue", NULL
+ },
+ POPT_AUTOHELP
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ };
- process_all_options(argc, argv, options);
+ process_all_options(argc, argv, options);
- if (queue == NULL) {
- fprintf(stderr, "queue name not specified\n");
- return 1;
- }
+ if (queue == NULL) {
+ fprintf(stderr, "queue name not specified\n");
+ return 1;
+ }
- conn = make_connection();
- {
- amqp_queue_declare_ok_t *reply = amqp_queue_declare(conn, 1,
- cstring_bytes(queue),
- 0,
- durable,
- 0,
- 0,
- amqp_empty_table);
- if (reply == NULL)
- die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
+ conn = make_connection();
+ {
+ amqp_queue_declare_ok_t *reply = amqp_queue_declare(conn, 1,
+ cstring_bytes(queue),
+ 0,
+ durable,
+ 0,
+ 0,
+ amqp_empty_table);
+ if (reply == NULL) {
+ die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
+ }
- printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes);
- }
- close_connection(conn);
- return 0;
+ printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes);
+ }
+ close_connection(conn);
+ return 0;
}
diff --git a/tools/delete_queue.c b/tools/delete_queue.c
index cb92f7b..4ebbd94 100644
--- a/tools/delete_queue.c
+++ b/tools/delete_queue.c
@@ -44,41 +44,47 @@
int main(int argc, const char **argv)
{
- amqp_connection_state_t conn;
- char *queue = NULL;
- int if_unused = 0;
- int if_empty = 0;
+ amqp_connection_state_t conn;
+ char *queue = NULL;
+ int if_unused = 0;
+ int if_empty = 0;
- struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {"queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue name to delete", "queue"},
- {"if-unused", 'u', POPT_ARG_VAL, &if_unused, 1,
- "do not delete unless queue is unused", NULL},
- {"if-empty", 'e', POPT_ARG_VAL, &if_empty, 1,
- "do not delete unless queue is empty", NULL},
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {
+ "queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue name to delete", "queue"
+ },
+ {
+ "if-unused", 'u', POPT_ARG_VAL, &if_unused, 1,
+ "do not delete unless queue is unused", NULL
+ },
+ {
+ "if-empty", 'e', POPT_ARG_VAL, &if_empty, 1,
+ "do not delete unless queue is empty", NULL
+ },
+ POPT_AUTOHELP
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ };
- process_all_options(argc, argv, options);
+ process_all_options(argc, argv, options);
- if (queue == NULL || *queue == '\0') {
- fprintf(stderr, "queue name not specified\n");
- return 1;
- }
+ if (queue == NULL || *queue == '\0') {
+ fprintf(stderr, "queue name not specified\n");
+ return 1;
+ }
- conn = make_connection();
- {
- amqp_queue_delete_ok_t *reply = amqp_queue_delete(conn, 1,
- cstring_bytes(queue),
- if_unused,
- if_empty);
- if (reply == NULL) {
- die_rpc(amqp_get_rpc_reply(conn), "queue.delete");
- }
- printf("%u\n", reply->message_count);
- }
- close_connection(conn);
- return 0;
+ conn = make_connection();
+ {
+ amqp_queue_delete_ok_t *reply = amqp_queue_delete(conn, 1,
+ cstring_bytes(queue),
+ if_unused,
+ if_empty);
+ if (reply == NULL) {
+ die_rpc(amqp_get_rpc_reply(conn), "queue.delete");
+ }
+ printf("%u\n", reply->message_count);
+ }
+ close_connection(conn);
+ return 0;
}
diff --git a/tools/get.c b/tools/get.c
index 888e069..bbcde72 100644
--- a/tools/get.c
+++ b/tools/get.c
@@ -41,40 +41,43 @@
static int do_get(amqp_connection_state_t conn, char *queue)
{
- amqp_rpc_reply_t r
- = amqp_basic_get(conn, 1, cstring_bytes(queue), 1);
- die_rpc(r, "basic.get");
+ amqp_rpc_reply_t r
+ = amqp_basic_get(conn, 1, cstring_bytes(queue), 1);
+ die_rpc(r, "basic.get");
- if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD)
- return 0;
+ if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) {
+ return 0;
+ }
- copy_body(conn, 1);
- return 1;
+ copy_body(conn, 1);
+ return 1;
}
int main(int argc, const char **argv)
{
- amqp_connection_state_t conn;
- char *queue = NULL;
- int got_something;
+ amqp_connection_state_t conn;
+ char *queue = NULL;
+ int got_something;
- struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {"queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue to consume from", "queue"},
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {
+ "queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue to consume from", "queue"
+ },
+ POPT_AUTOHELP
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ };
- process_all_options(argc, argv, options);
+ process_all_options(argc, argv, options);
- if (!queue) {
- fprintf(stderr, "queue not specified\n");
- return 1;
- }
+ if (!queue) {
+ fprintf(stderr, "queue not specified\n");
+ return 1;
+ }
- conn = make_connection();
- got_something = do_get(conn, queue);
- close_connection(conn);
- return got_something ? 0 : 2;
+ conn = make_connection();
+ got_something = do_get(conn, queue);
+ close_connection(conn);
+ return got_something ? 0 : 2;
}
diff --git a/tools/publish.c b/tools/publish.c
index 5ebc191..617b055 100644
--- a/tools/publish.c
+++ b/tools/publish.c
@@ -43,80 +43,94 @@
static void do_publish(amqp_connection_state_t conn,
char *exchange, char *routing_key,
- amqp_basic_properties_t *props, amqp_bytes_t body)
+ amqp_basic_properties_t *props, amqp_bytes_t body)
{
- int res = amqp_basic_publish(conn, 1,
- cstring_bytes(exchange),
- cstring_bytes(routing_key),
- 0, 0, props, body);
- die_amqp_error(res, "basic.publish");
+ int res = amqp_basic_publish(conn, 1,
+ cstring_bytes(exchange),
+ cstring_bytes(routing_key),
+ 0, 0, props, body);
+ die_amqp_error(res, "basic.publish");
}
int main(int argc, const char **argv)
{
- amqp_connection_state_t conn;
- char *exchange = NULL;
- char *routing_key = NULL;
- char *content_type = NULL;
- char *content_encoding = NULL;
- char *body = NULL;
- amqp_basic_properties_t props;
- amqp_bytes_t body_bytes;
- int delivery = 1; /* non-persistent by default */
-
- struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
- "the exchange to publish to", "exchange"},
- {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
- "the routing key to publish with", "routing key"},
- {"persistent", 'p', POPT_ARG_VAL, &delivery, 2,
- "use the persistent delivery mode", NULL},
- {"content-type", 'C', POPT_ARG_STRING, &content_type, 0,
- "the content-type for the message", "content type"},
- {"content-encoding", 'E', POPT_ARG_STRING,
- &content_encoding, 0,
- "the content-encoding for the message", "content encoding"},
- {"body", 'b', POPT_ARG_STRING, &body, 0,
- "specify the message body", "body"},
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
-
- process_all_options(argc, argv, options);
-
- if (!exchange && !routing_key) {
- fprintf(stderr,
- "neither exchange nor routing key specified\n");
- return 1;
- }
-
- memset(&props, 0, sizeof props);
- props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
- props.delivery_mode = 2; /* persistent delivery mode */
-
- if (content_type) {
- props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
- props.content_type = amqp_cstring_bytes(content_type);
- }
-
- if (content_encoding) {
- props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
- props.content_encoding = amqp_cstring_bytes(content_encoding);
- }
-
- conn = make_connection();
-
- if (body)
- body_bytes = amqp_cstring_bytes(body);
- else
- body_bytes = read_all(0);
-
- do_publish(conn, exchange, routing_key, &props, body_bytes);
-
- if (!body)
- free(body_bytes.bytes);
-
- close_connection(conn);
- return 0;
+ amqp_connection_state_t conn;
+ char *exchange = NULL;
+ char *routing_key = NULL;
+ char *content_type = NULL;
+ char *content_encoding = NULL;
+ char *body = NULL;
+ amqp_basic_properties_t props;
+ amqp_bytes_t body_bytes;
+ int delivery = 1; /* non-persistent by default */
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {
+ "exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "the exchange to publish to", "exchange"
+ },
+ {
+ "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to publish with", "routing key"
+ },
+ {
+ "persistent", 'p', POPT_ARG_VAL, &delivery, 2,
+ "use the persistent delivery mode", NULL
+ },
+ {
+ "content-type", 'C', POPT_ARG_STRING, &content_type, 0,
+ "the content-type for the message", "content type"
+ },
+ {
+ "content-encoding", 'E', POPT_ARG_STRING,
+ &content_encoding, 0,
+ "the content-encoding for the message", "content encoding"
+ },
+ {
+ "body", 'b', POPT_ARG_STRING, &body, 0,
+ "specify the message body", "body"
+ },
+ POPT_AUTOHELP
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ };
+
+ process_all_options(argc, argv, options);
+
+ if (!exchange && !routing_key) {
+ fprintf(stderr,
+ "neither exchange nor routing key specified\n");
+ return 1;
+ }
+
+ memset(&props, 0, sizeof props);
+ props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
+ props.delivery_mode = 2; /* persistent delivery mode */
+
+ if (content_type) {
+ props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
+ props.content_type = amqp_cstring_bytes(content_type);
+ }
+
+ if (content_encoding) {
+ props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
+ props.content_encoding = amqp_cstring_bytes(content_encoding);
+ }
+
+ conn = make_connection();
+
+ if (body) {
+ body_bytes = amqp_cstring_bytes(body);
+ } else {
+ body_bytes = read_all(0);
+ }
+
+ do_publish(conn, exchange, routing_key, &props, body_bytes);
+
+ if (!body) {
+ free(body_bytes.bytes);
+ }
+
+ close_connection(conn);
+ return 0;
}
diff --git a/tools/unix/process.c b/tools/unix/process.c
index a714928..249fba3 100644
--- a/tools/unix/process.c
+++ b/tools/unix/process.c
@@ -47,41 +47,45 @@ extern char **environ;
void pipeline(const char *const *argv, struct pipeline *pl)
{
- posix_spawn_file_actions_t file_acts;
+ posix_spawn_file_actions_t file_acts;
- int pipefds[2];
- if (pipe(pipefds))
- die_errno(errno, "pipe");
+ int pipefds[2];
+ if (pipe(pipefds)) {
+ die_errno(errno, "pipe");
+ }
- die_errno(posix_spawn_file_actions_init(&file_acts),
- "posix_spawn_file_actions_init");
- die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
- "posix_spawn_file_actions_adddup2");
- die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
- "posix_spawn_file_actions_addclose");
- die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
- "posix_spawn_file_actions_addclose");
+ die_errno(posix_spawn_file_actions_init(&file_acts),
+ "posix_spawn_file_actions_init");
+ die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
+ "posix_spawn_file_actions_adddup2");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
+ "posix_spawn_file_actions_addclose");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
+ "posix_spawn_file_actions_addclose");
- die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
- (char * const *)argv, environ),
- "posix_spawnp: %s", argv[0]);
+ die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
+ (char * const *)argv, environ),
+ "posix_spawnp: %s", argv[0]);
- die_errno(posix_spawn_file_actions_destroy(&file_acts),
- "posix_spawn_file_actions_destroy");
+ die_errno(posix_spawn_file_actions_destroy(&file_acts),
+ "posix_spawn_file_actions_destroy");
- if (close(pipefds[0]))
- die_errno(errno, "close");
+ if (close(pipefds[0])) {
+ die_errno(errno, "close");
+ }
- pl->infd = pipefds[1];
+ pl->infd = pipefds[1];
}
int finish_pipeline(struct pipeline *pl)
{
- int status;
+ int status;
- if (close(pl->infd))
- die_errno(errno, "close");
- if (waitpid(pl->pid, &status, 0) < 0)
- die_errno(errno, "waitpid");
- return WIFEXITED(status) && WEXITSTATUS(status) == 0;
+ if (close(pl->infd)) {
+ die_errno(errno, "close");
+ }
+ if (waitpid(pl->pid, &status, 0) < 0) {
+ die_errno(errno, "waitpid");
+ }
+ return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
diff --git a/tools/unix/process.h b/tools/unix/process.h
index 0aad292..a211f27 100644
--- a/tools/unix/process.h
+++ b/tools/unix/process.h
@@ -32,8 +32,8 @@
*/
struct pipeline {
- int pid;
- int infd;
+ int pid;
+ int infd;
};
extern void pipeline(const char *const *argv, struct pipeline *pl);
diff --git a/tools/win32/compat.c b/tools/win32/compat.c
index cbac8e6..7c7d97e 100644
--- a/tools/win32/compat.c
+++ b/tools/win32/compat.c
@@ -43,21 +43,22 @@
int asprintf(char **strp, const char *fmt, ...)
{
- va_list ap;
- int len;
+ va_list ap;
+ int len;
- va_start(ap, fmt);
- len = _vscprintf(fmt, ap);
- va_end(ap);
+ va_start(ap, fmt);
+ len = _vscprintf(fmt, ap);
+ va_end(ap);
- *strp = malloc(len+1);
- if (!*strp)
- return -1;
+ *strp = malloc(len+1);
+ if (!*strp) {
+ return -1;
+ }
- va_start(ap, fmt);
- _vsnprintf(*strp, len+1, fmt, ap);
- va_end(ap);
+ va_start(ap, fmt);
+ _vsnprintf(*strp, len+1, fmt, ap);
+ va_end(ap);
- (*strp)[len] = 0;
- return len;
+ (*strp)[len] = 0;
+ return len;
}
diff --git a/tools/win32/process.c b/tools/win32/process.c
index 699f60f..4d5270b 100644
--- a/tools/win32/process.c
+++ b/tools/win32/process.c
@@ -44,171 +44,188 @@
void die_windows_error(const char *fmt, ...)
{
- char *msg;
-
- va_list ap;
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
-
- if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM
- | FORMAT_MESSAGE_ALLOCATE_BUFFER,
- NULL, GetLastError(),
- MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
- (LPSTR)&msg, 0, NULL))
- msg = "(failed to retrieve Windows error message)";
-
- fprintf(stderr, ": %s\n", msg);
- exit(1);
+ char *msg;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+
+ if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_ALLOCATE_BUFFER,
+ NULL, GetLastError(),
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPSTR)&msg, 0, NULL)) {
+ msg = "(failed to retrieve Windows error message)";
+ }
+
+ fprintf(stderr, ": %s\n", msg);
+ exit(1);
}
static char *make_command_line(const char *const *argv)
{
- int i;
- size_t len = 1; /* initial quotes */
- char *buf;
- char *dest;
-
- /* calculate the length of the required buffer, making worst
- case assumptions for simplicity */
- for (i = 0;;) {
- /* each character could need escaping */
- len += strlen(argv[i]) * 2;
-
- if (!argv[++i])
- break;
-
- len += 3; /* quotes, space, quotes */
- }
-
- len += 2; /* final quotes and the terminating zero */
-
- dest = buf = malloc(len);
- if (!buf)
- die("allocating memory for subprocess command line");
-
- /* Here we perform the inverse of the CommandLineToArgvW
- function. Note that its rules are slightly crazy: A
- sequence of backslashes only act to escape if followed by
- double quotes. A sequence of backslashes not followed by
- double quotes is untouched. */
-
- for (i = 0;;) {
- const char *src = argv[i];
- int backslashes = 0;
-
- *dest++ = '\"';
-
- for (;;) {
- switch (*src) {
- case 0:
- goto done;
-
- case '\"':
- for (; backslashes; backslashes--)
- *dest++ = '\\';
-
- *dest++ = '\\';
- *dest++ = '\"';
- break;
-
- case '\\':
- backslashes++;
- *dest++ = '\\';
- break;
-
- default:
- backslashes = 0;
- *dest++ = *src;
- break;
- }
-
- src++;
- }
- done:
- for (; backslashes; backslashes--)
- *dest++ = '\\';
-
- *dest++ = '\"';
-
- if (!argv[++i])
- break;
-
- *dest++ = ' ';
- }
-
- *dest++ = 0;
- return buf;
+ int i;
+ size_t len = 1; /* initial quotes */
+ char *buf;
+ char *dest;
+
+ /* calculate the length of the required buffer, making worst
+ case assumptions for simplicity */
+ for (i = 0;;) {
+ /* each character could need escaping */
+ len += strlen(argv[i]) * 2;
+
+ if (!argv[++i]) {
+ break;
+ }
+
+ len += 3; /* quotes, space, quotes */
+ }
+
+ len += 2; /* final quotes and the terminating zero */
+
+ dest = buf = malloc(len);
+ if (!buf) {
+ die("allocating memory for subprocess command line");
+ }
+
+ /* Here we perform the inverse of the CommandLineToArgvW
+ function. Note that its rules are slightly crazy: A
+ sequence of backslashes only act to escape if followed by
+ double quotes. A sequence of backslashes not followed by
+ double quotes is untouched. */
+
+ for (i = 0;;) {
+ const char *src = argv[i];
+ int backslashes = 0;
+
+ *dest++ = '\"';
+
+ for (;;) {
+ switch (*src) {
+ case 0:
+ goto done;
+
+ case '\"':
+ for (; backslashes; backslashes--) {
+ *dest++ = '\\';
+ }
+
+ *dest++ = '\\';
+ *dest++ = '\"';
+ break;
+
+ case '\\':
+ backslashes++;
+ *dest++ = '\\';
+ break;
+
+ default:
+ backslashes = 0;
+ *dest++ = *src;
+ break;
+ }
+
+ src++;
+ }
+done:
+ for (; backslashes; backslashes--) {
+ *dest++ = '\\';
+ }
+
+ *dest++ = '\"';
+
+ if (!argv[++i]) {
+ break;
+ }
+
+ *dest++ = ' ';
+ }
+
+ *dest++ = 0;
+ return buf;
}
void pipeline(const char *const *argv, struct pipeline *pl)
{
- HANDLE in_read_handle, in_write_handle;
- SECURITY_ATTRIBUTES sec_attr;
- PROCESS_INFORMATION proc_info;
- STARTUPINFO start_info;
- char *cmdline = make_command_line(argv);
-
- sec_attr.nLength = sizeof sec_attr;
- sec_attr.bInheritHandle = TRUE;
- sec_attr.lpSecurityDescriptor = NULL;
-
- if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0))
- die_windows_error("CreatePipe");
-
- if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0))
- die_windows_error("SetHandleInformation");
-
- /* when in Rome... */
- ZeroMemory(&proc_info, sizeof proc_info);
- ZeroMemory(&start_info, sizeof start_info);
-
- start_info.cb = sizeof start_info;
- start_info.dwFlags |= STARTF_USESTDHANDLES;
-
- if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE))
- == INVALID_HANDLE_VALUE
- || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE))
- == INVALID_HANDLE_VALUE)
- die_windows_error("GetStdHandle");
-
- start_info.hStdInput = in_read_handle;
-
- if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0,
- NULL, NULL, &start_info, &proc_info))
- die_windows_error("CreateProcess");
-
- free(cmdline);
-
- if (!CloseHandle(proc_info.hThread))
- die_windows_error("CloseHandle for thread");
- if (!CloseHandle(in_read_handle))
- die_windows_error("CloseHandle");
-
- pl->proc_handle = proc_info.hProcess;
- pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0);
+ HANDLE in_read_handle, in_write_handle;
+ SECURITY_ATTRIBUTES sec_attr;
+ PROCESS_INFORMATION proc_info;
+ STARTUPINFO start_info;
+ char *cmdline = make_command_line(argv);
+
+ sec_attr.nLength = sizeof sec_attr;
+ sec_attr.bInheritHandle = TRUE;
+ sec_attr.lpSecurityDescriptor = NULL;
+
+ if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0)) {
+ die_windows_error("CreatePipe");
+ }
+
+ if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0)) {
+ die_windows_error("SetHandleInformation");
+ }
+
+ /* when in Rome... */
+ ZeroMemory(&proc_info, sizeof proc_info);
+ ZeroMemory(&start_info, sizeof start_info);
+
+ start_info.cb = sizeof start_info;
+ start_info.dwFlags |= STARTF_USESTDHANDLES;
+
+ if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE))
+ == INVALID_HANDLE_VALUE
+ || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE))
+ == INVALID_HANDLE_VALUE) {
+ die_windows_error("GetStdHandle");
+ }
+
+ start_info.hStdInput = in_read_handle;
+
+ if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0,
+ NULL, NULL, &start_info, &proc_info)) {
+ die_windows_error("CreateProcess");
+ }
+
+ free(cmdline);
+
+ if (!CloseHandle(proc_info.hThread)) {
+ die_windows_error("CloseHandle for thread");
+ }
+ if (!CloseHandle(in_read_handle)) {
+ die_windows_error("CloseHandle");
+ }
+
+ pl->proc_handle = proc_info.hProcess;
+ pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0);
}
int finish_pipeline(struct pipeline *pl)
{
- DWORD code;
-
- if (close(pl->infd))
- die_errno(errno, "close");
-
- for (;;) {
- if (!GetExitCodeProcess(pl->proc_handle, &code))
- die_windows_error("GetExitCodeProcess");
- if (code != STILL_ACTIVE)
- break;
-
- if (WaitForSingleObject(pl->proc_handle, INFINITE)
- == WAIT_FAILED)
- die_windows_error("WaitForSingleObject");
- }
-
- if (!CloseHandle(pl->proc_handle))
- die_windows_error("CloseHandle for process");
-
- return code;
+ DWORD code;
+
+ if (close(pl->infd)) {
+ die_errno(errno, "close");
+ }
+
+ for (;;) {
+ if (!GetExitCodeProcess(pl->proc_handle, &code)) {
+ die_windows_error("GetExitCodeProcess");
+ }
+ if (code != STILL_ACTIVE) {
+ break;
+ }
+
+ if (WaitForSingleObject(pl->proc_handle, INFINITE)
+ == WAIT_FAILED) {
+ die_windows_error("WaitForSingleObject");
+ }
+ }
+
+ if (!CloseHandle(pl->proc_handle)) {
+ die_windows_error("CloseHandle for process");
+ }
+
+ return code;
}
diff --git a/tools/win32/process.h b/tools/win32/process.h
index 1429ad2..88f2c7a 100644
--- a/tools/win32/process.h
+++ b/tools/win32/process.h
@@ -34,8 +34,8 @@
#include <windef.h>
struct pipeline {
- HANDLE proc_handle;
- int infd;
+ HANDLE proc_handle;
+ int infd;
};
extern void pipeline(const char *const *argv, struct pipeline *pl);