summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2017-12-04 23:23:22 -0800
committerAlan Antonuk <alan.antonuk@gmail.com>2017-12-05 00:19:29 -0800
commitafaab64a3f4c6977d66811cb7235431a085de0b2 (patch)
treee59fbd4ca9cbb8fcb0649329bc2b3c87b0ed3191 /tools
parent67048550b9ac3957fb29b7f9e7bf4b8ee3e9bc73 (diff)
downloadrabbitmq-c-afaab64a3f4c6977d66811cb7235431a085de0b2.tar.gz
Format code with clang-format
Diffstat (limited to 'tools')
-rw-r--r--tools/common.c213
-rw-r--r--tools/common.h11
-rw-r--r--tools/consume.c136
-rw-r--r--tools/declare_queue.c30
-rw-r--r--tools/delete_queue.c33
-rw-r--r--tools/get.c21
-rw-r--r--tools/publish.c98
-rw-r--r--tools/unix/process.c10
-rw-r--r--tools/win32/compat.c9
-rw-r--r--tools/win32/process.c74
10 files changed, 245 insertions, 390 deletions
diff --git a/tools/common.c b/tools/common.c
index f305378..13839a8 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -54,8 +54,7 @@
#include "compat.h"
#endif
-void die(const char *fmt, ...)
-{
+void die(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
@@ -64,8 +63,7 @@ void die(const char *fmt, ...)
exit(1);
}
-void die_errno(int err, const char *fmt, ...)
-{
+void die_errno(int err, const char *fmt, ...) {
va_list ap;
if (err == 0) {
@@ -79,8 +77,7 @@ void die_errno(int err, const char *fmt, ...)
exit(1);
}
-void die_amqp_error(int err, const char *fmt, ...)
-{
+void die_amqp_error(int err, const char *fmt, ...) {
va_list ap;
if (err >= 0) {
@@ -94,63 +91,56 @@ void die_amqp_error(int err, const char *fmt, ...)
exit(1);
}
-const char *amqp_server_exception_string(amqp_rpc_reply_t r)
-{
+const char *amqp_server_exception_string(amqp_rpc_reply_t r) {
int res;
static char s[512];
switch (r.reply.id) {
- case AMQP_CONNECTION_CLOSE_METHOD: {
- amqp_connection_close_t *m
- = (amqp_connection_close_t *)r.reply.decoded;
- res = snprintf(s, sizeof(s), "server connection error %d, message: %.*s",
- m->reply_code,
- (int)m->reply_text.len,
- (char *)m->reply_text.bytes);
- break;
- }
+ case AMQP_CONNECTION_CLOSE_METHOD: {
+ amqp_connection_close_t *m = (amqp_connection_close_t *)r.reply.decoded;
+ res = snprintf(s, sizeof(s), "server connection error %d, message: %.*s",
+ m->reply_code, (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
- case AMQP_CHANNEL_CLOSE_METHOD: {
- amqp_channel_close_t *m
- = (amqp_channel_close_t *)r.reply.decoded;
- res = snprintf(s, sizeof(s), "server channel error %d, message: %.*s",
- m->reply_code,
- (int)m->reply_text.len,
- (char *)m->reply_text.bytes);
- break;
- }
+ case AMQP_CHANNEL_CLOSE_METHOD: {
+ amqp_channel_close_t *m = (amqp_channel_close_t *)r.reply.decoded;
+ res = snprintf(s, sizeof(s), "server channel error %d, message: %.*s",
+ m->reply_code, (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
- default:
- res = snprintf(s, sizeof(s), "unknown server error, method id 0x%08X",
- r.reply.id);
- break;
+ default:
+ res = snprintf(s, sizeof(s), "unknown server error, method id 0x%08X",
+ r.reply.id);
+ break;
}
return res >= 0 ? s : NULL;
}
-const char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
-{
+const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) {
switch (r.reply_type) {
- case AMQP_RESPONSE_NORMAL:
- return "normal response";
+ case AMQP_RESPONSE_NORMAL:
+ return "normal response";
- case AMQP_RESPONSE_NONE:
- return "missing RPC reply type";
+ case AMQP_RESPONSE_NONE:
+ return "missing RPC reply type";
- case AMQP_RESPONSE_LIBRARY_EXCEPTION:
- return amqp_error_string2(r.library_error);
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ return amqp_error_string2(r.library_error);
- case AMQP_RESPONSE_SERVER_EXCEPTION:
- return amqp_server_exception_string(r);
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ return amqp_server_exception_string(r);
- default:
- abort();
+ default:
+ abort();
}
}
-void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
-{
+void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) {
va_list ap;
if (r.reply_type == AMQP_RESPONSE_NORMAL) {
@@ -180,57 +170,31 @@ static char *amqp_cert = NULL;
const char *connect_options_title = "Connection options";
struct poptOption connect_options[] = {
- {
- "url", 'u', POPT_ARG_STRING, &amqp_url, 0,
- "the AMQP URL to connect to", "amqp://..."
- },
- {
- "server", 's', POPT_ARG_STRING, &amqp_server, 0,
- "the AMQP server to connect to", "hostname"
- },
- {
- "port", 0, POPT_ARG_INT, &amqp_port, 0,
- "the port to connect on", "port"
- },
- {
- "vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
- "the vhost to use when connecting", "vhost"
- },
- {
- "username", 0, POPT_ARG_STRING, &amqp_username, 0,
- "the username to login with", "username"
- },
- {
- "password", 0, POPT_ARG_STRING, &amqp_password, 0,
- "the password to login with", "password"
- },
- {
- "heartbeat", 0, POPT_ARG_INT, &amqp_heartbeat, 0,
- "heartbeat interval, set to 0 to disable", "heartbeat"
- },
+ {"url", 'u', POPT_ARG_STRING, &amqp_url, 0, "the AMQP URL to connect to",
+ "amqp://..."},
+ {"server", 's', POPT_ARG_STRING, &amqp_server, 0,
+ "the AMQP server to connect to", "hostname"},
+ {"port", 0, POPT_ARG_INT, &amqp_port, 0, "the port to connect on", "port"},
+ {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
+ "the vhost to use when connecting", "vhost"},
+ {"username", 0, POPT_ARG_STRING, &amqp_username, 0,
+ "the username to login with", "username"},
+ {"password", 0, POPT_ARG_STRING, &amqp_password, 0,
+ "the password to login with", "password"},
+ {"heartbeat", 0, POPT_ARG_INT, &amqp_heartbeat, 0,
+ "heartbeat interval, set to 0 to disable", "heartbeat"},
#ifdef WITH_SSL
- {
- "ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0,
- "connect over SSL/TLS", NULL
- },
- {
- "cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0,
- "path to the CA certificate file", "cacert.pem"
- },
- {
- "key", 0, POPT_ARG_STRING, &amqp_key, 0,
- "path to the client private key file", "key.pem"
- },
- {
- "cert", 0, POPT_ARG_STRING, &amqp_cert, 0,
- "path to the client certificate file", "cert.pem"
- },
+ {"ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0, "connect over SSL/TLS", NULL},
+ {"cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0,
+ "path to the CA certificate file", "cacert.pem"},
+ {"key", 0, POPT_ARG_STRING, &amqp_key, 0,
+ "path to the client private key file", "key.pem"},
+ {"cert", 0, POPT_ARG_STRING, &amqp_cert, 0,
+ "path to the client certificate file", "cert.pem"},
#endif /* WITH_SSL */
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
-};
+ {NULL, '\0', 0, NULL, 0, NULL, NULL}};
-static void init_connection_info(struct amqp_connection_info *ci)
-{
+static void init_connection_info(struct amqp_connection_info *ci) {
ci->user = NULL;
ci->password = NULL;
ci->host = NULL;
@@ -241,8 +205,8 @@ static void init_connection_info(struct amqp_connection_info *ci)
amqp_default_connection_info(ci);
if (amqp_url)
- die_amqp_error(amqp_parse_url(strdup(amqp_url), ci),
- "Parsing URL '%s'", amqp_url);
+ die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), "Parsing URL '%s'",
+ amqp_url);
if (amqp_server) {
char *colon;
@@ -260,8 +224,8 @@ static void init_connection_info(struct amqp_connection_info *ci)
--server option, because it is not ipv6 friendly.
--url now allows connection options to be
specified concisely. */
- fprintf(stderr, "Specifying the port number with"
- " --server is deprecated\n");
+ fprintf(stderr,
+ "Specifying the port number with --server is deprecated\n");
host_len = colon - amqp_server;
ci->host = malloc(host_len + 1);
@@ -272,13 +236,10 @@ static void init_connection_info(struct amqp_connection_info *ci)
die("both --server and --port options specify server port");
}
- ci->port = strtol(colon+1, &port_end, 10);
- if (ci->port < 0
- || ci->port > 65535
- || port_end == colon+1
- || *port_end != 0)
- die("bad server port number in '%s'",
- amqp_server);
+ ci->port = strtol(colon + 1, &port_end, 10);
+ if (ci->port < 0 || ci->port > 65535 || port_end == colon + 1 ||
+ *port_end != 0)
+ die("bad server port number in '%s'", amqp_server);
}
#if WITH_SSL
@@ -326,8 +287,7 @@ static void init_connection_info(struct amqp_connection_info *ci)
}
}
-amqp_connection_state_t make_connection(void)
-{
+amqp_connection_state_t make_connection(void) {
int status;
amqp_socket_t *socket = NULL;
struct amqp_connection_info ci;
@@ -361,8 +321,7 @@ amqp_connection_state_t make_connection(void)
die("opening socket to %s:%d", ci.host, ci.port);
}
die_rpc(amqp_login(conn, ci.vhost, 0, 131072, amqp_heartbeat,
- AMQP_SASL_METHOD_PLAIN,
- ci.user, ci.password),
+ AMQP_SASL_METHOD_PLAIN, ci.user, ci.password),
"logging in to AMQP server");
if (!amqp_channel_open(conn, 1)) {
die_rpc(amqp_get_rpc_reply(conn), "opening channel");
@@ -370,11 +329,9 @@ amqp_connection_state_t make_connection(void)
return conn;
}
-void close_connection(amqp_connection_state_t conn)
-{
+void close_connection(amqp_connection_state_t conn) {
int res;
- die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
- "closing channel");
+ die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "closing channel");
die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"closing connection");
@@ -382,8 +339,7 @@ void close_connection(amqp_connection_state_t conn)
die_amqp_error(res, "closing connection");
}
-amqp_bytes_t read_all(int fd)
-{
+amqp_bytes_t read_all(int fd) {
size_t space = 4096;
amqp_bytes_t bytes;
@@ -391,8 +347,7 @@ amqp_bytes_t read_all(int fd)
bytes.len = 0;
for (;;) {
- ssize_t res = read(fd, (char *)bytes.bytes + bytes.len,
- space-bytes.len);
+ ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, space - bytes.len);
if (res == 0) {
break;
}
@@ -415,8 +370,7 @@ amqp_bytes_t read_all(int fd)
return bytes;
}
-void write_all(int fd, amqp_bytes_t data)
-{
+void write_all(int fd, amqp_bytes_t data) {
while (data.len > 0) {
ssize_t res = write(fd, data.bytes, data.len);
if (res < 0) {
@@ -428,16 +382,14 @@ void write_all(int fd, amqp_bytes_t data)
}
}
-void copy_body(amqp_connection_state_t conn, int fd)
-{
+void copy_body(amqp_connection_state_t conn, int fd) {
size_t body_remaining;
amqp_frame_t frame;
int res = amqp_simple_wait_frame(conn, &frame);
die_amqp_error(res, "waiting for header frame");
if (frame.frame_type != AMQP_FRAME_HEADER) {
- die("expected header, got frame type 0x%X",
- frame.frame_type);
+ die("expected header, got frame type 0x%X", frame.frame_type);
}
body_remaining = frame.payload.properties.body_size;
@@ -445,8 +397,7 @@ void copy_body(amqp_connection_state_t conn, int fd)
res = amqp_simple_wait_frame(conn, &frame);
die_amqp_error(res, "waiting for body frame");
if (frame.frame_type != AMQP_FRAME_BODY) {
- die("expected body, got frame type 0x%X",
- frame.frame_type);
+ die("expected body, got frame type 0x%X", frame.frame_type);
}
write_all(fd, frame.payload.body_fragment);
@@ -455,9 +406,7 @@ void copy_body(amqp_connection_state_t conn, int fd)
}
poptContext process_options(int argc, const char **argv,
- struct poptOption *options,
- const char *help)
-{
+ struct poptOption *options, const char *help) {
int c;
poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
poptSetOtherOptionHelp(opts, help);
@@ -467,8 +416,7 @@ poptContext process_options(int argc, const char **argv,
}
if (c < -1) {
- fprintf(stderr, "%s: %s\n",
- poptBadOption(opts, POPT_BADOPTION_NOALIAS),
+ fprintf(stderr, "%s: %s\n", poptBadOption(opts, POPT_BADOPTION_NOALIAS),
poptStrerror(c));
poptPrintUsage(opts, stderr, 0);
exit(1);
@@ -478,10 +426,8 @@ poptContext process_options(int argc, const char **argv,
}
void process_all_options(int argc, const char **argv,
- struct poptOption *options)
-{
- poptContext opts = process_options(argc, argv, options,
- "[OPTIONS]...");
+ struct poptOption *options) {
+ poptContext opts = process_options(argc, argv, options, "[OPTIONS]...");
const char *opt = poptPeekArg(opts);
if (opt) {
@@ -493,7 +439,6 @@ void process_all_options(int argc, const char **argv,
poptFreeContext(opts);
}
-amqp_bytes_t cstring_bytes(const char *str)
-{
+amqp_bytes_t cstring_bytes(const char *str) {
return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
}
diff --git a/tools/common.h b/tools/common.h
index 642d1b3..36b5153 100644
--- a/tools/common.h
+++ b/tools/common.h
@@ -43,14 +43,13 @@
extern const char *amqp_server_exception_string(amqp_rpc_reply_t r);
extern const char *amqp_rpc_reply_string(amqp_rpc_reply_t r);
-extern void die(const char *fmt, ...)
-__attribute__ ((format (printf, 1, 2)));
+extern void die(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
extern void die_errno(int err, const char *fmt, ...)
-__attribute__ ((format (printf, 2, 3)));
+ __attribute__((format(printf, 2, 3)));
extern void die_amqp_error(int err, const char *fmt, ...)
-__attribute__ ((format (printf, 2, 3)));
+ __attribute__((format(printf, 2, 3)));
extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
-__attribute__ ((format (printf, 2, 3)));
+ __attribute__((format(printf, 2, 3)));
extern const char *connect_options_title;
extern struct poptOption connect_options[];
@@ -63,7 +62,7 @@ extern void write_all(int fd, amqp_bytes_t data);
extern void copy_body(amqp_connection_state_t conn, int fd);
#define INCLUDE_OPTIONS(options) \
- {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL}
+ { NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options##_title, NULL }
extern poptContext process_options(int argc, const char **argv,
struct poptOption *options,
diff --git a/tools/consume.c b/tools/consume.c
index 485a0cf..dbc164a 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -49,8 +49,7 @@
/* Convert a amqp_bytes_t to an escaped string form for printing. We
use the same escaping conventions as rabbitmqctl. */
-static char *stringify_bytes(amqp_bytes_t bytes)
-{
+static char *stringify_bytes(amqp_bytes_t bytes) {
/* We will need up to 4 chars per byte, plus the terminating 0 */
char *res = malloc(bytes.len * 4 + 1);
uint8_t *data = bytes.bytes;
@@ -72,11 +71,9 @@ static char *stringify_bytes(amqp_bytes_t bytes)
return res;
}
-static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
- char *queue, char *exchange,
- char *routing_key, int declare,
- int exclusive)
-{
+static amqp_bytes_t setup_queue(amqp_connection_state_t conn, char *queue,
+ char *exchange, char *routing_key, int declare,
+ int exclusive) {
amqp_bytes_t queue_bytes = cstring_bytes(queue);
char *routing_key_rest;
@@ -84,19 +81,19 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
char *routing_tmp;
int routing_key_count = 0;
- /* if an exchange name wasn't provided, check that we don't
- have options that require it. */
+ /* if an exchange name wasn't provided, check that we don't have options that
+ * require it. */
if (!exchange && routing_key) {
- fprintf(stderr, "--routing-key option requires an exchange"
- " name to be provided with --exchange\n");
+ fprintf(stderr,
+ "--routing-key option requires an exchange name to be provided "
+ "with --exchange\n");
exit(1);
}
if (!queue || exchange || declare || exclusive) {
/* Declare the queue as auto-delete. */
- amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1,
- queue_bytes, 0, 0, exclusive, 1,
- amqp_empty_table);
+ amqp_queue_declare_ok_t *res = amqp_queue_declare(
+ conn, 1, queue_bytes, 0, 0, exclusive, 1, amqp_empty_table);
if (!res) {
die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
}
@@ -106,8 +103,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
char *sq;
queue_bytes = amqp_bytes_malloc_dup(res->queue);
sq = stringify_bytes(queue_bytes);
- fprintf(stderr, "Server provided queue name: %s\n",
- sq);
+ fprintf(stderr, "Server provided queue name: %s\n", sq);
free(sq);
}
@@ -115,17 +111,17 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
if (exchange) {
amqp_bytes_t eb = amqp_cstring_bytes(exchange);
- routing_tmp = strdup( routing_key );
- if ( NULL == routing_tmp ) {
- fprintf(stderr, "could not allocate memory to parse routing key\n" );
+ routing_tmp = strdup(routing_key);
+ if (NULL == routing_tmp) {
+ fprintf(stderr, "could not allocate memory to parse routing key\n");
exit(1);
}
- for (
- routing_key_token = strtok_r( routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest )
- ; NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1
- ; routing_key_token = strtok_r( NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest )
- ) {
+ for (routing_key_token =
+ strtok_r(routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest);
+ NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1;
+ routing_key_token =
+ strtok_r(NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest)) {
if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
cstring_bytes(routing_key_token),
@@ -133,7 +129,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
die_rpc(amqp_get_rpc_reply(conn), "queue.bind");
}
}
- free( routing_tmp );
+ free(routing_tmp);
}
}
@@ -144,13 +140,12 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
int no_ack, int count, int prefetch_count,
- const char *const *argv)
-{
+ const char *const *argv) {
int i;
/* If there is a limit, set the qos to match */
- if (count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT
- && !amqp_basic_qos(conn, 1, 0, count, 0)) {
+ if (count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT &&
+ !amqp_basic_qos(conn, 1, 0, count, 0)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
}
@@ -159,15 +154,16 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
if (prefetch_count > 0 && prefetch_count <= AMQP_CONSUME_MAX_PREFETCH_COUNT) {
/* the maximum number of messages to be received at a time must be less
* than the global maximum number of messages. */
- if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT && prefetch_count >= count)) {
+ if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT &&
+ prefetch_count >= count)) {
if (!amqp_basic_qos(conn, 1, 0, prefetch_count, 0)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
}
}
}
- if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack,
- 0, amqp_empty_table)) {
+ if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, 0,
+ amqp_empty_table)) {
die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
}
@@ -179,8 +175,8 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
int res = amqp_simple_wait_frame(conn, &frame);
die_amqp_error(res, "waiting for header frame");
- if (frame.frame_type != AMQP_FRAME_METHOD
- || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ if (frame.frame_type != AMQP_FRAME_METHOD ||
+ frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
continue;
}
@@ -191,16 +187,13 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
copy_body(conn, pl.infd);
if (finish_pipeline(&pl) && !no_ack)
- die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag,
- 0),
- "basic.ack");
+ die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, 0), "basic.ack");
amqp_maybe_release_buffers(conn);
}
}
-int main(int argc, const char **argv)
-{
+int main(int argc, const char **argv) {
poptContext opts;
amqp_connection_state_t conn;
const char *const *cmd_argv;
@@ -215,47 +208,27 @@ int main(int argc, const char **argv)
amqp_bytes_t queue_bytes;
struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {
- "queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue to consume from", "queue"
- },
- {
- "exchange", 'e', POPT_ARG_STRING, &exchange, 0,
- "bind the queue to this exchange", "exchange"
- },
- {
- "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
- "the routing key to bind with", "routing key"
- },
- {
- "declare", 'd', POPT_ARG_NONE, &declare, 0,
- "declare an exclusive queue (deprecated, use --exclusive instead)", NULL
- },
- {
- "exclusive", 'x', POPT_ARG_NONE, &exclusive, 0,
- "declare the queue as exclusive", NULL
- },
- {
- "no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
- "consume in no-ack mode", NULL
- },
- {
- "count", 'c', POPT_ARG_INT, &count, 0,
- "stop consuming after this many messages are consumed",
- "limit"
- },
- {
- "prefetch-count", 'p', POPT_ARG_INT, &prefetch_count, 0,
- "receive only this many message at a time from the server",
- "limit"
- },
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
-
- opts = process_options(argc, argv, options,
- "[OPTIONS]... <command> <args>");
+ INCLUDE_OPTIONS(connect_options),
+ {"queue", 'q', POPT_ARG_STRING, &queue, 0, "the queue to consume from",
+ "queue"},
+ {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "bind the queue to this exchange", "exchange"},
+ {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to bind with", "routing key"},
+ {"declare", 'd', POPT_ARG_NONE, &declare, 0,
+ "declare an exclusive queue (deprecated, use --exclusive instead)",
+ NULL},
+ {"exclusive", 'x', POPT_ARG_NONE, &exclusive, 0,
+ "declare the queue as exclusive", NULL},
+ {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, "consume in no-ack mode",
+ NULL},
+ {"count", 'c', POPT_ARG_INT, &count, 0,
+ "stop consuming after this many messages are consumed", "limit"},
+ {"prefetch-count", 'p', POPT_ARG_INT, &prefetch_count, 0,
+ "receive only this many message at a time from the server", "limit"},
+ POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
+
+ opts = process_options(argc, argv, options, "[OPTIONS]... <command> <args>");
cmd_argv = poptGetArgs(opts);
if (!cmd_argv || !cmd_argv[0]) {
@@ -265,7 +238,8 @@ int main(int argc, const char **argv)
}
conn = make_connection();
- queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare, exclusive);
+ queue_bytes =
+ setup_queue(conn, queue, exchange, routing_key, declare, exclusive);
do_consume(conn, queue_bytes, no_ack, count, prefetch_count, cmd_argv);
close_connection(conn);
return 0;
diff --git a/tools/declare_queue.c b/tools/declare_queue.c
index 15f0b2a..0b98580 100644
--- a/tools/declare_queue.c
+++ b/tools/declare_queue.c
@@ -44,25 +44,18 @@
#include "common.h"
-int main(int argc, const char **argv)
-{
+int main(int argc, const char **argv) {
amqp_connection_state_t conn;
static char *queue = NULL;
static int durable = 0;
struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {
- "queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue name to declare, or the empty string", "queue"
- },
- {
- "durable", 'd', POPT_ARG_VAL, &durable, 1,
- "declare a durable queue", NULL
- },
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
+ INCLUDE_OPTIONS(connect_options),
+ {"queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue name to declare, or the empty string", "queue"},
+ {"durable", 'd', POPT_ARG_VAL, &durable, 1, "declare a durable queue",
+ NULL},
+ POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
process_all_options(argc, argv, options);
@@ -73,13 +66,8 @@ int main(int argc, const char **argv)
conn = make_connection();
{
- amqp_queue_declare_ok_t *reply = amqp_queue_declare(conn, 1,
- cstring_bytes(queue),
- 0,
- durable,
- 0,
- 0,
- amqp_empty_table);
+ amqp_queue_declare_ok_t *reply = amqp_queue_declare(
+ conn, 1, cstring_bytes(queue), 0, durable, 0, 0, amqp_empty_table);
if (reply == NULL) {
die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
}
diff --git a/tools/delete_queue.c b/tools/delete_queue.c
index 6ac0130..f9d01ab 100644
--- a/tools/delete_queue.c
+++ b/tools/delete_queue.c
@@ -44,30 +44,21 @@
#include "common.h"
-int main(int argc, const char **argv)
-{
+int main(int argc, const char **argv) {
amqp_connection_state_t conn;
static char *queue = NULL;
static int if_unused = 0;
static int if_empty = 0;
struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {
- "queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue name to delete", "queue"
- },
- {
- "if-unused", 'u', POPT_ARG_VAL, &if_unused, 1,
- "do not delete unless queue is unused", NULL
- },
- {
- "if-empty", 'e', POPT_ARG_VAL, &if_empty, 1,
- "do not delete unless queue is empty", NULL
- },
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
+ INCLUDE_OPTIONS(connect_options),
+ {"queue", 'q', POPT_ARG_STRING, &queue, 0, "the queue name to delete",
+ "queue"},
+ {"if-unused", 'u', POPT_ARG_VAL, &if_unused, 1,
+ "do not delete unless queue is unused", NULL},
+ {"if-empty", 'e', POPT_ARG_VAL, &if_empty, 1,
+ "do not delete unless queue is empty", NULL},
+ POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
process_all_options(argc, argv, options);
@@ -78,10 +69,8 @@ int main(int argc, const char **argv)
conn = make_connection();
{
- amqp_queue_delete_ok_t *reply = amqp_queue_delete(conn, 1,
- cstring_bytes(queue),
- if_unused,
- if_empty);
+ amqp_queue_delete_ok_t *reply =
+ amqp_queue_delete(conn, 1, cstring_bytes(queue), if_unused, if_empty);
if (reply == NULL) {
die_rpc(amqp_get_rpc_reply(conn), "queue.delete");
}
diff --git a/tools/get.c b/tools/get.c
index 598a014..f418e2f 100644
--- a/tools/get.c
+++ b/tools/get.c
@@ -41,10 +41,8 @@
#include "common.h"
-static int do_get(amqp_connection_state_t conn, char *queue)
-{
- amqp_rpc_reply_t r
- = amqp_basic_get(conn, 1, cstring_bytes(queue), 1);
+static int do_get(amqp_connection_state_t conn, char *queue) {
+ amqp_rpc_reply_t r = amqp_basic_get(conn, 1, cstring_bytes(queue), 1);
die_rpc(r, "basic.get");
if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) {
@@ -55,21 +53,16 @@ static int do_get(amqp_connection_state_t conn, char *queue)
return 1;
}
-int main(int argc, const char **argv)
-{
+int main(int argc, const char **argv) {
amqp_connection_state_t conn;
static char *queue = NULL;
int got_something;
struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {
- "queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue to consume from", "queue"
- },
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
+ INCLUDE_OPTIONS(connect_options),
+ {"queue", 'q', POPT_ARG_STRING, &queue, 0, "the queue to consume from",
+ "queue"},
+ POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
process_all_options(argc, argv, options);
diff --git a/tools/publish.c b/tools/publish.c
index 881004d..b2a2a1e 100644
--- a/tools/publish.c
+++ b/tools/publish.c
@@ -45,19 +45,15 @@
#define MAX_LINE_LENGTH 1024 * 32
-static void do_publish(amqp_connection_state_t conn,
- char *exchange, char *routing_key,
- amqp_basic_properties_t *props, amqp_bytes_t body)
-{
- int res = amqp_basic_publish(conn, 1,
- cstring_bytes(exchange),
- cstring_bytes(routing_key),
- 0, 0, props, body);
+static void do_publish(amqp_connection_state_t conn, char *exchange,
+ char *routing_key, amqp_basic_properties_t *props,
+ amqp_bytes_t body) {
+ int res = amqp_basic_publish(conn, 1, cstring_bytes(exchange),
+ cstring_bytes(routing_key), 0, 0, props, body);
die_amqp_error(res, "basic.publish");
}
-int main(int argc, const char **argv)
-{
+int main(int argc, const char **argv) {
amqp_connection_state_t conn;
static char *exchange = NULL;
static char *routing_key = NULL;
@@ -73,53 +69,32 @@ int main(int argc, const char **argv)
static char **pos;
struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {
- "exchange", 'e', POPT_ARG_STRING, &exchange, 0,
- "the exchange to publish to", "exchange"
- },
- {
- "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
- "the routing key to publish with", "routing key"
- },
- {
- "persistent", 'p', POPT_ARG_VAL, &delivery, 2,
- "use the persistent delivery mode", NULL
- },
- {
- "content-type", 'C', POPT_ARG_STRING, &content_type, 0,
- "the content-type for the message", "content type"
- },
- {
- "reply-to", 't', POPT_ARG_STRING, &reply_to, 0,
- "the replyTo to use for the message", "reply to"
- },
- {
- "line-buffered", 'l', POPT_ARG_VAL, &line_buffered, 2,
- "treat each line from standard in as a separate message", NULL
- },
- {
- "content-encoding", 'E', POPT_ARG_STRING,
- &content_encoding, 0,
- "the content-encoding for the message", "content encoding"
- },
- {
- "header", 'H', POPT_ARG_ARGV, &headers, 0,
- "set a message header (may be specified multiple times)", "\"key: value\""
- },
- {
- "body", 'b', POPT_ARG_STRING, &body, 0,
- "specify the message body", "body"
- },
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
+ INCLUDE_OPTIONS(connect_options),
+ {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "the exchange to publish to", "exchange"},
+ {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to publish with", "routing key"},
+ {"persistent", 'p', POPT_ARG_VAL, &delivery, 2,
+ "use the persistent delivery mode", NULL},
+ {"content-type", 'C', POPT_ARG_STRING, &content_type, 0,
+ "the content-type for the message", "content type"},
+ {"reply-to", 't', POPT_ARG_STRING, &reply_to, 0,
+ "the replyTo to use for the message", "reply to"},
+ {"line-buffered", 'l', POPT_ARG_VAL, &line_buffered, 2,
+ "treat each line from standard in as a separate message", NULL},
+ {"content-encoding", 'E', POPT_ARG_STRING, &content_encoding, 0,
+ "the content-encoding for the message", "content encoding"},
+ {"header", 'H', POPT_ARG_ARGV, &headers, 0,
+ "set a message header (may be specified multiple times)",
+ "\"key: value\""},
+ {"body", 'b', POPT_ARG_STRING, &body, 0, "specify the message body",
+ "body"},
+ POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}};
process_all_options(argc, argv, options);
if (!exchange && !routing_key) {
- fprintf(stderr,
- "neither exchange nor routing key specified\n");
+ fprintf(stderr, "neither exchange nor routing key specified\n");
return 1;
}
@@ -162,9 +137,10 @@ int main(int argc, const char **argv)
table->entries[i].value.kind = AMQP_FIELD_KIND_UTF8;
table->entries[i].value.value.bytes = amqp_cstring_bytes(colon);
i++;
- }
- else {
- fprintf(stderr, "Ignored header definition missing ':' delimiter in \"%s\"\n", *pos);
+ } else {
+ fprintf(stderr,
+ "Ignored header definition missing ':' delimiter in \"%s\"\n",
+ *pos);
}
}
props._flags |= AMQP_BASIC_HEADERS_FLAG;
@@ -176,10 +152,10 @@ int main(int argc, const char **argv)
if (body) {
body_bytes = amqp_cstring_bytes(body);
} else {
- if ( line_buffered ) {
- body_bytes.bytes = ( char * ) malloc( MAX_LINE_LENGTH );
- while ( fgets( body_bytes.bytes, MAX_LINE_LENGTH, stdin ) ) {
- body_bytes.len = strlen( body_bytes.bytes );
+ if (line_buffered) {
+ body_bytes.bytes = (char *)malloc(MAX_LINE_LENGTH);
+ while (fgets(body_bytes.bytes, MAX_LINE_LENGTH, stdin)) {
+ body_bytes.len = strlen(body_bytes.bytes);
do_publish(conn, exchange, routing_key, &props, body_bytes);
}
} else {
@@ -187,7 +163,7 @@ int main(int argc, const char **argv)
}
}
- if ( !line_buffered ) {
+ if (!line_buffered) {
do_publish(conn, exchange, routing_key, &props, body_bytes);
}
diff --git a/tools/unix/process.c b/tools/unix/process.c
index 987b49c..596f40a 100644
--- a/tools/unix/process.c
+++ b/tools/unix/process.c
@@ -37,18 +37,17 @@
#include "config.h"
#endif
-#include <unistd.h>
#include <errno.h>
#include <spawn.h>
#include <sys/wait.h>
+#include <unistd.h>
#include "common.h"
#include "process.h"
extern char **environ;
-void pipeline(const char *const *argv, struct pipeline *pl)
-{
+void pipeline(const char *const *argv, struct pipeline *pl) {
posix_spawn_file_actions_t file_acts;
int pipefds[2];
@@ -66,7 +65,7 @@ void pipeline(const char *const *argv, struct pipeline *pl)
"posix_spawn_file_actions_addclose");
die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
- (char * const *)argv, environ),
+ (char *const *)argv, environ),
"posix_spawnp: %s", argv[0]);
die_errno(posix_spawn_file_actions_destroy(&file_acts),
@@ -79,8 +78,7 @@ void pipeline(const char *const *argv, struct pipeline *pl)
pl->infd = pipefds[1];
}
-int finish_pipeline(struct pipeline *pl)
-{
+int finish_pipeline(struct pipeline *pl) {
int status;
if (close(pl->infd)) {
diff --git a/tools/win32/compat.c b/tools/win32/compat.c
index f9aecd6..10828a6 100644
--- a/tools/win32/compat.c
+++ b/tools/win32/compat.c
@@ -37,14 +37,13 @@
#include "config.h"
#endif
-#include <stdio.h>
#include <stdarg.h>
+#include <stdio.h>
#include <stdlib.h>
#include "compat.h"
-int asprintf(char **strp, const char *fmt, ...)
-{
+int asprintf(char **strp, const char *fmt, ...) {
va_list ap;
int len;
@@ -52,13 +51,13 @@ int asprintf(char **strp, const char *fmt, ...)
len = _vscprintf(fmt, ap);
va_end(ap);
- *strp = malloc(len+1);
+ *strp = malloc(len + 1);
if (!*strp) {
return -1;
}
va_start(ap, fmt);
- _vsnprintf(*strp, len+1, fmt, ap);
+ _vsnprintf(*strp, len + 1, fmt, ap);
va_end(ap);
(*strp)[len] = 0;
diff --git a/tools/win32/process.c b/tools/win32/process.c
index 640d8df..fbb68f0 100644
--- a/tools/win32/process.c
+++ b/tools/win32/process.c
@@ -37,15 +37,14 @@
#include "config.h"
#endif
-#include <stdio.h>
#include <io.h>
+#include <stdio.h>
#include <windows.h>
#include "common.h"
#include "process.h"
-void die_windows_error(const char *fmt, ...)
-{
+void die_windows_error(const char *fmt, ...) {
char *msg;
va_list ap;
@@ -53,11 +52,10 @@ void die_windows_error(const char *fmt, ...)
vfprintf(stderr, fmt, ap);
va_end(ap);
- if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM
- | FORMAT_MESSAGE_ALLOCATE_BUFFER,
- NULL, GetLastError(),
- MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
- (LPSTR)&msg, 0, NULL)) {
+ if (!FormatMessage(
+ FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER, NULL,
+ GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPSTR)&msg, 0, NULL)) {
msg = "(failed to retrieve Windows error message)";
}
@@ -65,8 +63,7 @@ void die_windows_error(const char *fmt, ...)
exit(1);
}
-static char *make_command_line(const char *const *argv)
-{
+static char *make_command_line(const char *const *argv) {
int i;
size_t len = 1; /* initial quotes */
char *buf;
@@ -106,32 +103,32 @@ static char *make_command_line(const char *const *argv)
for (;;) {
switch (*src) {
- case 0:
- goto done;
+ case 0:
+ goto done;
- case '\"':
- for (; backslashes; backslashes--) {
- *dest++ = '\\';
- }
+ case '\"':
+ for (; backslashes; backslashes--) {
+ *dest++ = '\\';
+ }
- *dest++ = '\\';
- *dest++ = '\"';
- break;
+ *dest++ = '\\';
+ *dest++ = '\"';
+ break;
- case '\\':
- backslashes++;
- *dest++ = '\\';
- break;
+ case '\\':
+ backslashes++;
+ *dest++ = '\\';
+ break;
- default:
- backslashes = 0;
- *dest++ = *src;
- break;
+ default:
+ backslashes = 0;
+ *dest++ = *src;
+ break;
}
src++;
}
-done:
+ done:
for (; backslashes; backslashes--) {
*dest++ = '\\';
}
@@ -149,8 +146,7 @@ done:
return buf;
}
-void pipeline(const char *const *argv, struct pipeline *pl)
-{
+void pipeline(const char *const *argv, struct pipeline *pl) {
HANDLE in_read_handle, in_write_handle;
SECURITY_ATTRIBUTES sec_attr;
PROCESS_INFORMATION proc_info;
@@ -176,17 +172,17 @@ void pipeline(const char *const *argv, struct pipeline *pl)
start_info.cb = sizeof start_info;
start_info.dwFlags |= STARTF_USESTDHANDLES;
- if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE))
- == INVALID_HANDLE_VALUE
- || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE))
- == INVALID_HANDLE_VALUE) {
+ if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) ==
+ INVALID_HANDLE_VALUE ||
+ (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) ==
+ INVALID_HANDLE_VALUE) {
die_windows_error("GetStdHandle");
}
start_info.hStdInput = in_read_handle;
- if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0,
- NULL, NULL, &start_info, &proc_info)) {
+ if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, NULL, NULL,
+ &start_info, &proc_info)) {
die_windows_error("CreateProcess");
}
@@ -203,8 +199,7 @@ void pipeline(const char *const *argv, struct pipeline *pl)
pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0);
}
-int finish_pipeline(struct pipeline *pl)
-{
+int finish_pipeline(struct pipeline *pl) {
DWORD code;
if (close(pl->infd)) {
@@ -219,8 +214,7 @@ int finish_pipeline(struct pipeline *pl)
break;
}
- if (WaitForSingleObject(pl->proc_handle, INFINITE)
- == WAIT_FAILED) {
+ if (WaitForSingleObject(pl->proc_handle, INFINITE) == WAIT_FAILED) {
die_windows_error("WaitForSingleObject");
}
}