From afaab64a3f4c6977d66811cb7235431a085de0b2 Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Mon, 4 Dec 2017 23:23:22 -0800 Subject: Format code with clang-format --- tools/common.c | 213 +++++++++++++++++++------------------------------- tools/common.h | 11 ++- tools/consume.c | 136 +++++++++++++------------------- tools/declare_queue.c | 30 +++---- tools/delete_queue.c | 33 +++----- tools/get.c | 21 ++--- tools/publish.c | 98 +++++++++-------------- tools/unix/process.c | 10 +-- tools/win32/compat.c | 9 +-- tools/win32/process.c | 74 ++++++++---------- 10 files changed, 245 insertions(+), 390 deletions(-) (limited to 'tools') diff --git a/tools/common.c b/tools/common.c index f305378..13839a8 100644 --- a/tools/common.c +++ b/tools/common.c @@ -54,8 +54,7 @@ #include "compat.h" #endif -void die(const char *fmt, ...) -{ +void die(const char *fmt, ...) { va_list ap; va_start(ap, fmt); vfprintf(stderr, fmt, ap); @@ -64,8 +63,7 @@ void die(const char *fmt, ...) exit(1); } -void die_errno(int err, const char *fmt, ...) -{ +void die_errno(int err, const char *fmt, ...) { va_list ap; if (err == 0) { @@ -79,8 +77,7 @@ void die_errno(int err, const char *fmt, ...) exit(1); } -void die_amqp_error(int err, const char *fmt, ...) -{ +void die_amqp_error(int err, const char *fmt, ...) { va_list ap; if (err >= 0) { @@ -94,63 +91,56 @@ void die_amqp_error(int err, const char *fmt, ...) exit(1); } -const char *amqp_server_exception_string(amqp_rpc_reply_t r) -{ +const char *amqp_server_exception_string(amqp_rpc_reply_t r) { int res; static char s[512]; switch (r.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t *m - = (amqp_connection_close_t *)r.reply.decoded; - res = snprintf(s, sizeof(s), "server connection error %d, message: %.*s", - m->reply_code, - (int)m->reply_text.len, - (char *)m->reply_text.bytes); - break; - } + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m = (amqp_connection_close_t *)r.reply.decoded; + res = snprintf(s, sizeof(s), "server connection error %d, message: %.*s", + m->reply_code, (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t *m - = (amqp_channel_close_t *)r.reply.decoded; - res = snprintf(s, sizeof(s), "server channel error %d, message: %.*s", - m->reply_code, - (int)m->reply_text.len, - (char *)m->reply_text.bytes); - break; - } + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m = (amqp_channel_close_t *)r.reply.decoded; + res = snprintf(s, sizeof(s), "server channel error %d, message: %.*s", + m->reply_code, (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } - default: - res = snprintf(s, sizeof(s), "unknown server error, method id 0x%08X", - r.reply.id); - break; + default: + res = snprintf(s, sizeof(s), "unknown server error, method id 0x%08X", + r.reply.id); + break; } return res >= 0 ? s : NULL; } -const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) -{ +const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) { switch (r.reply_type) { - case AMQP_RESPONSE_NORMAL: - return "normal response"; + case AMQP_RESPONSE_NORMAL: + return "normal response"; - case AMQP_RESPONSE_NONE: - return "missing RPC reply type"; + case AMQP_RESPONSE_NONE: + return "missing RPC reply type"; - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - return amqp_error_string2(r.library_error); + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + return amqp_error_string2(r.library_error); - case AMQP_RESPONSE_SERVER_EXCEPTION: - return amqp_server_exception_string(r); + case AMQP_RESPONSE_SERVER_EXCEPTION: + return amqp_server_exception_string(r); - default: - abort(); + default: + abort(); } } -void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) -{ +void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) { va_list ap; if (r.reply_type == AMQP_RESPONSE_NORMAL) { @@ -180,57 +170,31 @@ static char *amqp_cert = NULL; const char *connect_options_title = "Connection options"; struct poptOption connect_options[] = { - { - "url", 'u', POPT_ARG_STRING, &amqp_url, 0, - "the AMQP URL to connect to", "amqp://..." - }, - { - "server", 's', POPT_ARG_STRING, &amqp_server, 0, - "the AMQP server to connect to", "hostname" - }, - { - "port", 0, POPT_ARG_INT, &amqp_port, 0, - "the port to connect on", "port" - }, - { - "vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, - "the vhost to use when connecting", "vhost" - }, - { - "username", 0, POPT_ARG_STRING, &amqp_username, 0, - "the username to login with", "username" - }, - { - "password", 0, POPT_ARG_STRING, &amqp_password, 0, - "the password to login with", "password" - }, - { - "heartbeat", 0, POPT_ARG_INT, &amqp_heartbeat, 0, - "heartbeat interval, set to 0 to disable", "heartbeat" - }, + {"url", 'u', POPT_ARG_STRING, &amqp_url, 0, "the AMQP URL to connect to", + "amqp://..."}, + {"server", 's', POPT_ARG_STRING, &amqp_server, 0, + "the AMQP server to connect to", "hostname"}, + {"port", 0, POPT_ARG_INT, &amqp_port, 0, "the port to connect on", "port"}, + {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, + "the vhost to use when connecting", "vhost"}, + {"username", 0, POPT_ARG_STRING, &amqp_username, 0, + "the username to login with", "username"}, + {"password", 0, POPT_ARG_STRING, &amqp_password, 0, + "the password to login with", "password"}, + {"heartbeat", 0, POPT_ARG_INT, &amqp_heartbeat, 0, + "heartbeat interval, set to 0 to disable", "heartbeat"}, #ifdef WITH_SSL - { - "ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0, - "connect over SSL/TLS", NULL - }, - { - "cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0, - "path to the CA certificate file", "cacert.pem" - }, - { - "key", 0, POPT_ARG_STRING, &amqp_key, 0, - "path to the client private key file", "key.pem" - }, - { - "cert", 0, POPT_ARG_STRING, &amqp_cert, 0, - "path to the client certificate file", "cert.pem" - }, + {"ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0, "connect over SSL/TLS", NULL}, + {"cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0, + "path to the CA certificate file", "cacert.pem"}, + {"key", 0, POPT_ARG_STRING, &amqp_key, 0, + "path to the client private key file", "key.pem"}, + {"cert", 0, POPT_ARG_STRING, &amqp_cert, 0, + "path to the client certificate file", "cert.pem"}, #endif /* WITH_SSL */ - { NULL, '\0', 0, NULL, 0, NULL, NULL } -}; + {NULL, '\0', 0, NULL, 0, NULL, NULL}}; -static void init_connection_info(struct amqp_connection_info *ci) -{ +static void init_connection_info(struct amqp_connection_info *ci) { ci->user = NULL; ci->password = NULL; ci->host = NULL; @@ -241,8 +205,8 @@ static void init_connection_info(struct amqp_connection_info *ci) amqp_default_connection_info(ci); if (amqp_url) - die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), - "Parsing URL '%s'", amqp_url); + die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), "Parsing URL '%s'", + amqp_url); if (amqp_server) { char *colon; @@ -260,8 +224,8 @@ static void init_connection_info(struct amqp_connection_info *ci) --server option, because it is not ipv6 friendly. --url now allows connection options to be specified concisely. */ - fprintf(stderr, "Specifying the port number with" - " --server is deprecated\n"); + fprintf(stderr, + "Specifying the port number with --server is deprecated\n"); host_len = colon - amqp_server; ci->host = malloc(host_len + 1); @@ -272,13 +236,10 @@ static void init_connection_info(struct amqp_connection_info *ci) die("both --server and --port options specify server port"); } - ci->port = strtol(colon+1, &port_end, 10); - if (ci->port < 0 - || ci->port > 65535 - || port_end == colon+1 - || *port_end != 0) - die("bad server port number in '%s'", - amqp_server); + ci->port = strtol(colon + 1, &port_end, 10); + if (ci->port < 0 || ci->port > 65535 || port_end == colon + 1 || + *port_end != 0) + die("bad server port number in '%s'", amqp_server); } #if WITH_SSL @@ -326,8 +287,7 @@ static void init_connection_info(struct amqp_connection_info *ci) } } -amqp_connection_state_t make_connection(void) -{ +amqp_connection_state_t make_connection(void) { int status; amqp_socket_t *socket = NULL; struct amqp_connection_info ci; @@ -361,8 +321,7 @@ amqp_connection_state_t make_connection(void) die("opening socket to %s:%d", ci.host, ci.port); } die_rpc(amqp_login(conn, ci.vhost, 0, 131072, amqp_heartbeat, - AMQP_SASL_METHOD_PLAIN, - ci.user, ci.password), + AMQP_SASL_METHOD_PLAIN, ci.user, ci.password), "logging in to AMQP server"); if (!amqp_channel_open(conn, 1)) { die_rpc(amqp_get_rpc_reply(conn), "opening channel"); @@ -370,11 +329,9 @@ amqp_connection_state_t make_connection(void) return conn; } -void close_connection(amqp_connection_state_t conn) -{ +void close_connection(amqp_connection_state_t conn) { int res; - die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), - "closing channel"); + die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "closing channel"); die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "closing connection"); @@ -382,8 +339,7 @@ void close_connection(amqp_connection_state_t conn) die_amqp_error(res, "closing connection"); } -amqp_bytes_t read_all(int fd) -{ +amqp_bytes_t read_all(int fd) { size_t space = 4096; amqp_bytes_t bytes; @@ -391,8 +347,7 @@ amqp_bytes_t read_all(int fd) bytes.len = 0; for (;;) { - ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, - space-bytes.len); + ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, space - bytes.len); if (res == 0) { break; } @@ -415,8 +370,7 @@ amqp_bytes_t read_all(int fd) return bytes; } -void write_all(int fd, amqp_bytes_t data) -{ +void write_all(int fd, amqp_bytes_t data) { while (data.len > 0) { ssize_t res = write(fd, data.bytes, data.len); if (res < 0) { @@ -428,16 +382,14 @@ void write_all(int fd, amqp_bytes_t data) } } -void copy_body(amqp_connection_state_t conn, int fd) -{ +void copy_body(amqp_connection_state_t conn, int fd) { size_t body_remaining; amqp_frame_t frame; int res = amqp_simple_wait_frame(conn, &frame); die_amqp_error(res, "waiting for header frame"); if (frame.frame_type != AMQP_FRAME_HEADER) { - die("expected header, got frame type 0x%X", - frame.frame_type); + die("expected header, got frame type 0x%X", frame.frame_type); } body_remaining = frame.payload.properties.body_size; @@ -445,8 +397,7 @@ void copy_body(amqp_connection_state_t conn, int fd) res = amqp_simple_wait_frame(conn, &frame); die_amqp_error(res, "waiting for body frame"); if (frame.frame_type != AMQP_FRAME_BODY) { - die("expected body, got frame type 0x%X", - frame.frame_type); + die("expected body, got frame type 0x%X", frame.frame_type); } write_all(fd, frame.payload.body_fragment); @@ -455,9 +406,7 @@ void copy_body(amqp_connection_state_t conn, int fd) } poptContext process_options(int argc, const char **argv, - struct poptOption *options, - const char *help) -{ + struct poptOption *options, const char *help) { int c; poptContext opts = poptGetContext(NULL, argc, argv, options, 0); poptSetOtherOptionHelp(opts, help); @@ -467,8 +416,7 @@ poptContext process_options(int argc, const char **argv, } if (c < -1) { - fprintf(stderr, "%s: %s\n", - poptBadOption(opts, POPT_BADOPTION_NOALIAS), + fprintf(stderr, "%s: %s\n", poptBadOption(opts, POPT_BADOPTION_NOALIAS), poptStrerror(c)); poptPrintUsage(opts, stderr, 0); exit(1); @@ -478,10 +426,8 @@ poptContext process_options(int argc, const char **argv, } void process_all_options(int argc, const char **argv, - struct poptOption *options) -{ - poptContext opts = process_options(argc, argv, options, - "[OPTIONS]..."); + struct poptOption *options) { + poptContext opts = process_options(argc, argv, options, "[OPTIONS]..."); const char *opt = poptPeekArg(opts); if (opt) { @@ -493,7 +439,6 @@ void process_all_options(int argc, const char **argv, poptFreeContext(opts); } -amqp_bytes_t cstring_bytes(const char *str) -{ +amqp_bytes_t cstring_bytes(const char *str) { return str ? amqp_cstring_bytes(str) : amqp_empty_bytes; } diff --git a/tools/common.h b/tools/common.h index 642d1b3..36b5153 100644 --- a/tools/common.h +++ b/tools/common.h @@ -43,14 +43,13 @@ extern const char *amqp_server_exception_string(amqp_rpc_reply_t r); extern const char *amqp_rpc_reply_string(amqp_rpc_reply_t r); -extern void die(const char *fmt, ...) -__attribute__ ((format (printf, 1, 2))); +extern void die(const char *fmt, ...) __attribute__((format(printf, 1, 2))); extern void die_errno(int err, const char *fmt, ...) -__attribute__ ((format (printf, 2, 3))); + __attribute__((format(printf, 2, 3))); extern void die_amqp_error(int err, const char *fmt, ...) -__attribute__ ((format (printf, 2, 3))); + __attribute__((format(printf, 2, 3))); extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) -__attribute__ ((format (printf, 2, 3))); + __attribute__((format(printf, 2, 3))); extern const char *connect_options_title; extern struct poptOption connect_options[]; @@ -63,7 +62,7 @@ extern void write_all(int fd, amqp_bytes_t data); extern void copy_body(amqp_connection_state_t conn, int fd); #define INCLUDE_OPTIONS(options) \ - {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL} + { NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options##_title, NULL } extern poptContext process_options(int argc, const char **argv, struct poptOption *options, diff --git a/tools/consume.c b/tools/consume.c index 485a0cf..dbc164a 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -49,8 +49,7 @@ /* Convert a amqp_bytes_t to an escaped string form for printing. We use the same escaping conventions as rabbitmqctl. */ -static char *stringify_bytes(amqp_bytes_t bytes) -{ +static char *stringify_bytes(amqp_bytes_t bytes) { /* We will need up to 4 chars per byte, plus the terminating 0 */ char *res = malloc(bytes.len * 4 + 1); uint8_t *data = bytes.bytes; @@ -72,11 +71,9 @@ static char *stringify_bytes(amqp_bytes_t bytes) return res; } -static amqp_bytes_t setup_queue(amqp_connection_state_t conn, - char *queue, char *exchange, - char *routing_key, int declare, - int exclusive) -{ +static amqp_bytes_t setup_queue(amqp_connection_state_t conn, char *queue, + char *exchange, char *routing_key, int declare, + int exclusive) { amqp_bytes_t queue_bytes = cstring_bytes(queue); char *routing_key_rest; @@ -84,19 +81,19 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, char *routing_tmp; int routing_key_count = 0; - /* if an exchange name wasn't provided, check that we don't - have options that require it. */ + /* if an exchange name wasn't provided, check that we don't have options that + * require it. */ if (!exchange && routing_key) { - fprintf(stderr, "--routing-key option requires an exchange" - " name to be provided with --exchange\n"); + fprintf(stderr, + "--routing-key option requires an exchange name to be provided " + "with --exchange\n"); exit(1); } if (!queue || exchange || declare || exclusive) { /* Declare the queue as auto-delete. */ - amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1, - queue_bytes, 0, 0, exclusive, 1, - amqp_empty_table); + amqp_queue_declare_ok_t *res = amqp_queue_declare( + conn, 1, queue_bytes, 0, 0, exclusive, 1, amqp_empty_table); if (!res) { die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); } @@ -106,8 +103,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, char *sq; queue_bytes = amqp_bytes_malloc_dup(res->queue); sq = stringify_bytes(queue_bytes); - fprintf(stderr, "Server provided queue name: %s\n", - sq); + fprintf(stderr, "Server provided queue name: %s\n", sq); free(sq); } @@ -115,17 +111,17 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, if (exchange) { amqp_bytes_t eb = amqp_cstring_bytes(exchange); - routing_tmp = strdup( routing_key ); - if ( NULL == routing_tmp ) { - fprintf(stderr, "could not allocate memory to parse routing key\n" ); + routing_tmp = strdup(routing_key); + if (NULL == routing_tmp) { + fprintf(stderr, "could not allocate memory to parse routing key\n"); exit(1); } - for ( - routing_key_token = strtok_r( routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest ) - ; NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1 - ; routing_key_token = strtok_r( NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest ) - ) { + for (routing_key_token = + strtok_r(routing_tmp, LISTEN_KEYS_DELIMITER, &routing_key_rest); + NULL != routing_key_token && routing_key_count < MAX_LISTEN_KEYS - 1; + routing_key_token = + strtok_r(NULL, LISTEN_KEYS_DELIMITER, &routing_key_rest)) { if (!amqp_queue_bind(conn, 1, queue_bytes, eb, cstring_bytes(routing_key_token), @@ -133,7 +129,7 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, die_rpc(amqp_get_rpc_reply(conn), "queue.bind"); } } - free( routing_tmp ); + free(routing_tmp); } } @@ -144,13 +140,12 @@ static amqp_bytes_t setup_queue(amqp_connection_state_t conn, static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, int no_ack, int count, int prefetch_count, - const char *const *argv) -{ + const char *const *argv) { int i; /* If there is a limit, set the qos to match */ - if (count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT - && !amqp_basic_qos(conn, 1, 0, count, 0)) { + if (count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT && + !amqp_basic_qos(conn, 1, 0, count, 0)) { die_rpc(amqp_get_rpc_reply(conn), "basic.qos"); } @@ -159,15 +154,16 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, if (prefetch_count > 0 && prefetch_count <= AMQP_CONSUME_MAX_PREFETCH_COUNT) { /* the maximum number of messages to be received at a time must be less * than the global maximum number of messages. */ - if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT && prefetch_count >= count)) { + if (!(count > 0 && count <= AMQP_CONSUME_MAX_PREFETCH_COUNT && + prefetch_count >= count)) { if (!amqp_basic_qos(conn, 1, 0, prefetch_count, 0)) { die_rpc(amqp_get_rpc_reply(conn), "basic.qos"); } } } - if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, - 0, amqp_empty_table)) { + if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, 0, + amqp_empty_table)) { die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); } @@ -179,8 +175,8 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, int res = amqp_simple_wait_frame(conn, &frame); die_amqp_error(res, "waiting for header frame"); - if (frame.frame_type != AMQP_FRAME_METHOD - || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { + if (frame.frame_type != AMQP_FRAME_METHOD || + frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { continue; } @@ -191,16 +187,13 @@ static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, copy_body(conn, pl.infd); if (finish_pipeline(&pl) && !no_ack) - die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, - 0), - "basic.ack"); + die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, 0), "basic.ack"); amqp_maybe_release_buffers(conn); } } -int main(int argc, const char **argv) -{ +int main(int argc, const char **argv) { poptContext opts; amqp_connection_state_t conn; const char *const *cmd_argv; @@ -215,47 +208,27 @@ int main(int argc, const char **argv) amqp_bytes_t queue_bytes; struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - { - "queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue to consume from", "queue" - }, - { - "exchange", 'e', POPT_ARG_STRING, &exchange, 0, - "bind the queue to this exchange", "exchange" - }, - { - "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, - "the routing key to bind with", "routing key" - }, - { - "declare", 'd', POPT_ARG_NONE, &declare, 0, - "declare an exclusive queue (deprecated, use --exclusive instead)", NULL - }, - { - "exclusive", 'x', POPT_ARG_NONE, &exclusive, 0, - "declare the queue as exclusive", NULL - }, - { - "no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, - "consume in no-ack mode", NULL - }, - { - "count", 'c', POPT_ARG_INT, &count, 0, - "stop consuming after this many messages are consumed", - "limit" - }, - { - "prefetch-count", 'p', POPT_ARG_INT, &prefetch_count, 0, - "receive only this many message at a time from the server", - "limit" - }, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; - - opts = process_options(argc, argv, options, - "[OPTIONS]... "); + INCLUDE_OPTIONS(connect_options), + {"queue", 'q', POPT_ARG_STRING, &queue, 0, "the queue to consume from", + "queue"}, + {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "bind the queue to this exchange", "exchange"}, + {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to bind with", "routing key"}, + {"declare", 'd', POPT_ARG_NONE, &declare, 0, + "declare an exclusive queue (deprecated, use --exclusive instead)", + NULL}, + {"exclusive", 'x', POPT_ARG_NONE, &exclusive, 0, + "declare the queue as exclusive", NULL}, + {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, "consume in no-ack mode", + NULL}, + {"count", 'c', POPT_ARG_INT, &count, 0, + "stop consuming after this many messages are consumed", "limit"}, + {"prefetch-count", 'p', POPT_ARG_INT, &prefetch_count, 0, + "receive only this many message at a time from the server", "limit"}, + POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}}; + + opts = process_options(argc, argv, options, "[OPTIONS]... "); cmd_argv = poptGetArgs(opts); if (!cmd_argv || !cmd_argv[0]) { @@ -265,7 +238,8 @@ int main(int argc, const char **argv) } conn = make_connection(); - queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare, exclusive); + queue_bytes = + setup_queue(conn, queue, exchange, routing_key, declare, exclusive); do_consume(conn, queue_bytes, no_ack, count, prefetch_count, cmd_argv); close_connection(conn); return 0; diff --git a/tools/declare_queue.c b/tools/declare_queue.c index 15f0b2a..0b98580 100644 --- a/tools/declare_queue.c +++ b/tools/declare_queue.c @@ -44,25 +44,18 @@ #include "common.h" -int main(int argc, const char **argv) -{ +int main(int argc, const char **argv) { amqp_connection_state_t conn; static char *queue = NULL; static int durable = 0; struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - { - "queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue name to declare, or the empty string", "queue" - }, - { - "durable", 'd', POPT_ARG_VAL, &durable, 1, - "declare a durable queue", NULL - }, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; + INCLUDE_OPTIONS(connect_options), + {"queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue name to declare, or the empty string", "queue"}, + {"durable", 'd', POPT_ARG_VAL, &durable, 1, "declare a durable queue", + NULL}, + POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}}; process_all_options(argc, argv, options); @@ -73,13 +66,8 @@ int main(int argc, const char **argv) conn = make_connection(); { - amqp_queue_declare_ok_t *reply = amqp_queue_declare(conn, 1, - cstring_bytes(queue), - 0, - durable, - 0, - 0, - amqp_empty_table); + amqp_queue_declare_ok_t *reply = amqp_queue_declare( + conn, 1, cstring_bytes(queue), 0, durable, 0, 0, amqp_empty_table); if (reply == NULL) { die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); } diff --git a/tools/delete_queue.c b/tools/delete_queue.c index 6ac0130..f9d01ab 100644 --- a/tools/delete_queue.c +++ b/tools/delete_queue.c @@ -44,30 +44,21 @@ #include "common.h" -int main(int argc, const char **argv) -{ +int main(int argc, const char **argv) { amqp_connection_state_t conn; static char *queue = NULL; static int if_unused = 0; static int if_empty = 0; struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - { - "queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue name to delete", "queue" - }, - { - "if-unused", 'u', POPT_ARG_VAL, &if_unused, 1, - "do not delete unless queue is unused", NULL - }, - { - "if-empty", 'e', POPT_ARG_VAL, &if_empty, 1, - "do not delete unless queue is empty", NULL - }, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; + INCLUDE_OPTIONS(connect_options), + {"queue", 'q', POPT_ARG_STRING, &queue, 0, "the queue name to delete", + "queue"}, + {"if-unused", 'u', POPT_ARG_VAL, &if_unused, 1, + "do not delete unless queue is unused", NULL}, + {"if-empty", 'e', POPT_ARG_VAL, &if_empty, 1, + "do not delete unless queue is empty", NULL}, + POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}}; process_all_options(argc, argv, options); @@ -78,10 +69,8 @@ int main(int argc, const char **argv) conn = make_connection(); { - amqp_queue_delete_ok_t *reply = amqp_queue_delete(conn, 1, - cstring_bytes(queue), - if_unused, - if_empty); + amqp_queue_delete_ok_t *reply = + amqp_queue_delete(conn, 1, cstring_bytes(queue), if_unused, if_empty); if (reply == NULL) { die_rpc(amqp_get_rpc_reply(conn), "queue.delete"); } diff --git a/tools/get.c b/tools/get.c index 598a014..f418e2f 100644 --- a/tools/get.c +++ b/tools/get.c @@ -41,10 +41,8 @@ #include "common.h" -static int do_get(amqp_connection_state_t conn, char *queue) -{ - amqp_rpc_reply_t r - = amqp_basic_get(conn, 1, cstring_bytes(queue), 1); +static int do_get(amqp_connection_state_t conn, char *queue) { + amqp_rpc_reply_t r = amqp_basic_get(conn, 1, cstring_bytes(queue), 1); die_rpc(r, "basic.get"); if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) { @@ -55,21 +53,16 @@ static int do_get(amqp_connection_state_t conn, char *queue) return 1; } -int main(int argc, const char **argv) -{ +int main(int argc, const char **argv) { amqp_connection_state_t conn; static char *queue = NULL; int got_something; struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - { - "queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue to consume from", "queue" - }, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; + INCLUDE_OPTIONS(connect_options), + {"queue", 'q', POPT_ARG_STRING, &queue, 0, "the queue to consume from", + "queue"}, + POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}}; process_all_options(argc, argv, options); diff --git a/tools/publish.c b/tools/publish.c index 881004d..b2a2a1e 100644 --- a/tools/publish.c +++ b/tools/publish.c @@ -45,19 +45,15 @@ #define MAX_LINE_LENGTH 1024 * 32 -static void do_publish(amqp_connection_state_t conn, - char *exchange, char *routing_key, - amqp_basic_properties_t *props, amqp_bytes_t body) -{ - int res = amqp_basic_publish(conn, 1, - cstring_bytes(exchange), - cstring_bytes(routing_key), - 0, 0, props, body); +static void do_publish(amqp_connection_state_t conn, char *exchange, + char *routing_key, amqp_basic_properties_t *props, + amqp_bytes_t body) { + int res = amqp_basic_publish(conn, 1, cstring_bytes(exchange), + cstring_bytes(routing_key), 0, 0, props, body); die_amqp_error(res, "basic.publish"); } -int main(int argc, const char **argv) -{ +int main(int argc, const char **argv) { amqp_connection_state_t conn; static char *exchange = NULL; static char *routing_key = NULL; @@ -73,53 +69,32 @@ int main(int argc, const char **argv) static char **pos; struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - { - "exchange", 'e', POPT_ARG_STRING, &exchange, 0, - "the exchange to publish to", "exchange" - }, - { - "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, - "the routing key to publish with", "routing key" - }, - { - "persistent", 'p', POPT_ARG_VAL, &delivery, 2, - "use the persistent delivery mode", NULL - }, - { - "content-type", 'C', POPT_ARG_STRING, &content_type, 0, - "the content-type for the message", "content type" - }, - { - "reply-to", 't', POPT_ARG_STRING, &reply_to, 0, - "the replyTo to use for the message", "reply to" - }, - { - "line-buffered", 'l', POPT_ARG_VAL, &line_buffered, 2, - "treat each line from standard in as a separate message", NULL - }, - { - "content-encoding", 'E', POPT_ARG_STRING, - &content_encoding, 0, - "the content-encoding for the message", "content encoding" - }, - { - "header", 'H', POPT_ARG_ARGV, &headers, 0, - "set a message header (may be specified multiple times)", "\"key: value\"" - }, - { - "body", 'b', POPT_ARG_STRING, &body, 0, - "specify the message body", "body" - }, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; + INCLUDE_OPTIONS(connect_options), + {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "the exchange to publish to", "exchange"}, + {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to publish with", "routing key"}, + {"persistent", 'p', POPT_ARG_VAL, &delivery, 2, + "use the persistent delivery mode", NULL}, + {"content-type", 'C', POPT_ARG_STRING, &content_type, 0, + "the content-type for the message", "content type"}, + {"reply-to", 't', POPT_ARG_STRING, &reply_to, 0, + "the replyTo to use for the message", "reply to"}, + {"line-buffered", 'l', POPT_ARG_VAL, &line_buffered, 2, + "treat each line from standard in as a separate message", NULL}, + {"content-encoding", 'E', POPT_ARG_STRING, &content_encoding, 0, + "the content-encoding for the message", "content encoding"}, + {"header", 'H', POPT_ARG_ARGV, &headers, 0, + "set a message header (may be specified multiple times)", + "\"key: value\""}, + {"body", 'b', POPT_ARG_STRING, &body, 0, "specify the message body", + "body"}, + POPT_AUTOHELP{NULL, '\0', 0, NULL, 0, NULL, NULL}}; process_all_options(argc, argv, options); if (!exchange && !routing_key) { - fprintf(stderr, - "neither exchange nor routing key specified\n"); + fprintf(stderr, "neither exchange nor routing key specified\n"); return 1; } @@ -162,9 +137,10 @@ int main(int argc, const char **argv) table->entries[i].value.kind = AMQP_FIELD_KIND_UTF8; table->entries[i].value.value.bytes = amqp_cstring_bytes(colon); i++; - } - else { - fprintf(stderr, "Ignored header definition missing ':' delimiter in \"%s\"\n", *pos); + } else { + fprintf(stderr, + "Ignored header definition missing ':' delimiter in \"%s\"\n", + *pos); } } props._flags |= AMQP_BASIC_HEADERS_FLAG; @@ -176,10 +152,10 @@ int main(int argc, const char **argv) if (body) { body_bytes = amqp_cstring_bytes(body); } else { - if ( line_buffered ) { - body_bytes.bytes = ( char * ) malloc( MAX_LINE_LENGTH ); - while ( fgets( body_bytes.bytes, MAX_LINE_LENGTH, stdin ) ) { - body_bytes.len = strlen( body_bytes.bytes ); + if (line_buffered) { + body_bytes.bytes = (char *)malloc(MAX_LINE_LENGTH); + while (fgets(body_bytes.bytes, MAX_LINE_LENGTH, stdin)) { + body_bytes.len = strlen(body_bytes.bytes); do_publish(conn, exchange, routing_key, &props, body_bytes); } } else { @@ -187,7 +163,7 @@ int main(int argc, const char **argv) } } - if ( !line_buffered ) { + if (!line_buffered) { do_publish(conn, exchange, routing_key, &props, body_bytes); } diff --git a/tools/unix/process.c b/tools/unix/process.c index 987b49c..596f40a 100644 --- a/tools/unix/process.c +++ b/tools/unix/process.c @@ -37,18 +37,17 @@ #include "config.h" #endif -#include #include #include #include +#include #include "common.h" #include "process.h" extern char **environ; -void pipeline(const char *const *argv, struct pipeline *pl) -{ +void pipeline(const char *const *argv, struct pipeline *pl) { posix_spawn_file_actions_t file_acts; int pipefds[2]; @@ -66,7 +65,7 @@ void pipeline(const char *const *argv, struct pipeline *pl) "posix_spawn_file_actions_addclose"); die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, - (char * const *)argv, environ), + (char *const *)argv, environ), "posix_spawnp: %s", argv[0]); die_errno(posix_spawn_file_actions_destroy(&file_acts), @@ -79,8 +78,7 @@ void pipeline(const char *const *argv, struct pipeline *pl) pl->infd = pipefds[1]; } -int finish_pipeline(struct pipeline *pl) -{ +int finish_pipeline(struct pipeline *pl) { int status; if (close(pl->infd)) { diff --git a/tools/win32/compat.c b/tools/win32/compat.c index f9aecd6..10828a6 100644 --- a/tools/win32/compat.c +++ b/tools/win32/compat.c @@ -37,14 +37,13 @@ #include "config.h" #endif -#include #include +#include #include #include "compat.h" -int asprintf(char **strp, const char *fmt, ...) -{ +int asprintf(char **strp, const char *fmt, ...) { va_list ap; int len; @@ -52,13 +51,13 @@ int asprintf(char **strp, const char *fmt, ...) len = _vscprintf(fmt, ap); va_end(ap); - *strp = malloc(len+1); + *strp = malloc(len + 1); if (!*strp) { return -1; } va_start(ap, fmt); - _vsnprintf(*strp, len+1, fmt, ap); + _vsnprintf(*strp, len + 1, fmt, ap); va_end(ap); (*strp)[len] = 0; diff --git a/tools/win32/process.c b/tools/win32/process.c index 640d8df..fbb68f0 100644 --- a/tools/win32/process.c +++ b/tools/win32/process.c @@ -37,15 +37,14 @@ #include "config.h" #endif -#include #include +#include #include #include "common.h" #include "process.h" -void die_windows_error(const char *fmt, ...) -{ +void die_windows_error(const char *fmt, ...) { char *msg; va_list ap; @@ -53,11 +52,10 @@ void die_windows_error(const char *fmt, ...) vfprintf(stderr, fmt, ap); va_end(ap); - if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM - | FORMAT_MESSAGE_ALLOCATE_BUFFER, - NULL, GetLastError(), - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - (LPSTR)&msg, 0, NULL)) { + if (!FormatMessage( + FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER, NULL, + GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL)) { msg = "(failed to retrieve Windows error message)"; } @@ -65,8 +63,7 @@ void die_windows_error(const char *fmt, ...) exit(1); } -static char *make_command_line(const char *const *argv) -{ +static char *make_command_line(const char *const *argv) { int i; size_t len = 1; /* initial quotes */ char *buf; @@ -106,32 +103,32 @@ static char *make_command_line(const char *const *argv) for (;;) { switch (*src) { - case 0: - goto done; + case 0: + goto done; - case '\"': - for (; backslashes; backslashes--) { - *dest++ = '\\'; - } + case '\"': + for (; backslashes; backslashes--) { + *dest++ = '\\'; + } - *dest++ = '\\'; - *dest++ = '\"'; - break; + *dest++ = '\\'; + *dest++ = '\"'; + break; - case '\\': - backslashes++; - *dest++ = '\\'; - break; + case '\\': + backslashes++; + *dest++ = '\\'; + break; - default: - backslashes = 0; - *dest++ = *src; - break; + default: + backslashes = 0; + *dest++ = *src; + break; } src++; } -done: + done: for (; backslashes; backslashes--) { *dest++ = '\\'; } @@ -149,8 +146,7 @@ done: return buf; } -void pipeline(const char *const *argv, struct pipeline *pl) -{ +void pipeline(const char *const *argv, struct pipeline *pl) { HANDLE in_read_handle, in_write_handle; SECURITY_ATTRIBUTES sec_attr; PROCESS_INFORMATION proc_info; @@ -176,17 +172,17 @@ void pipeline(const char *const *argv, struct pipeline *pl) start_info.cb = sizeof start_info; start_info.dwFlags |= STARTF_USESTDHANDLES; - if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) - == INVALID_HANDLE_VALUE - || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) - == INVALID_HANDLE_VALUE) { + if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) == + INVALID_HANDLE_VALUE || + (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) == + INVALID_HANDLE_VALUE) { die_windows_error("GetStdHandle"); } start_info.hStdInput = in_read_handle; - if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, - NULL, NULL, &start_info, &proc_info)) { + if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, NULL, NULL, + &start_info, &proc_info)) { die_windows_error("CreateProcess"); } @@ -203,8 +199,7 @@ void pipeline(const char *const *argv, struct pipeline *pl) pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0); } -int finish_pipeline(struct pipeline *pl) -{ +int finish_pipeline(struct pipeline *pl) { DWORD code; if (close(pl->infd)) { @@ -219,8 +214,7 @@ int finish_pipeline(struct pipeline *pl) break; } - if (WaitForSingleObject(pl->proc_handle, INFINITE) - == WAIT_FAILED) { + if (WaitForSingleObject(pl->proc_handle, INFINITE) == WAIT_FAILED) { die_windows_error("WaitForSingleObject"); } } -- cgit v1.2.1