From ad2b116059e22d393b7e44ad54f345a3fb4e267b Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Mon, 8 Apr 2013 14:52:53 -0700 Subject: Formatted source code with astyle utilty --- .astyle | 4 + examples/amqp_bind.c | 13 +- examples/amqp_consumer.c | 32 +- examples/amqp_exchange_declare.c | 7 +- examples/amqp_listen.c | 72 ++-- examples/amqp_listenq.c | 68 ++-- examples/amqp_producer.c | 29 +- examples/amqp_rpc_sendstring_client.c | 18 +- examples/amqp_sendstring.c | 21 +- examples/amqp_unbind.c | 13 +- examples/utils.c | 119 ++++--- examples/win32/platform_utils.c | 2 +- librabbitmq/amqp.h | 98 +++--- librabbitmq/amqp_api.c | 86 ++--- librabbitmq/amqp_connection.c | 147 ++++---- librabbitmq/amqp_mem.c | 48 ++- librabbitmq/amqp_private.h | 160 +++++---- librabbitmq/amqp_socket.c | 294 ++++++++-------- librabbitmq/amqp_table.c | 143 ++++---- librabbitmq/amqp_url.c | 298 ++++++++-------- librabbitmq/unix/socket.c | 35 +- librabbitmq/win32/socket.c | 59 ++-- librabbitmq/win32/socket.h | 6 +- tests/test_parse_url.c | 214 ++++++------ tests/test_tables.c | 60 ++-- tools/common.c | 627 ++++++++++++++++++---------------- tools/common.h | 16 +- tools/consume.c | 307 +++++++++-------- tools/declare_queue.c | 69 ++-- tools/delete_queue.c | 72 ++-- tools/get.c | 55 +-- tools/publish.c | 156 +++++---- tools/unix/process.c | 56 +-- tools/unix/process.h | 4 +- tools/win32/compat.c | 27 +- tools/win32/process.c | 327 +++++++++--------- tools/win32/process.h | 4 +- 37 files changed, 2014 insertions(+), 1752 deletions(-) create mode 100644 .astyle diff --git a/.astyle b/.astyle new file mode 100644 index 0000000..11156b1 --- /dev/null +++ b/.astyle @@ -0,0 +1,4 @@ +--style=1tbs +--indent=spaces=2 +--indent-preprocessor +--align-pointer=name diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c index 7864871..3c9a863 100644 --- a/examples/amqp_bind.c +++ b/examples/amqp_bind.c @@ -41,7 +41,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port; char const *exchange; @@ -67,15 +68,15 @@ int main(int argc, char const * const *argv) { die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_queue_bind(conn, 1, - amqp_cstring_bytes(queue), - amqp_cstring_bytes(exchange), - amqp_cstring_bytes(bindingkey), - amqp_empty_table); + amqp_cstring_bytes(queue), + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(bindingkey), + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c index c1c0873..aa63346 100644 --- a/examples/amqp_consumer.c +++ b/examples/amqp_consumer.c @@ -66,7 +66,7 @@ static void run(amqp_connection_state_t conn) int countOverInterval = received - previous_received; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); printf("%d ms: Received %d - %d since last report (%d Hz)\n", - (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); + (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); previous_received = received; previous_report_time = now; @@ -75,18 +75,22 @@ static void run(amqp_connection_state_t conn) amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) + if (result < 0) { return; + } - if (frame.frame_type != AMQP_FRAME_METHOD) + if (frame.frame_type != AMQP_FRAME_METHOD) { continue; + } - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { continue; + } result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) + if (result < 0) { return; + } if (frame.frame_type != AMQP_FRAME_HEADER) { fprintf(stderr, "Expected header!"); @@ -98,12 +102,13 @@ static void run(amqp_connection_state_t conn) while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) - return; + if (result < 0) { + return; + } if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); + fprintf(stderr, "Expected body!"); + abort(); } body_received += frame.payload.body_fragment.len; @@ -114,7 +119,8 @@ static void run(amqp_connection_state_t conn) } } -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port; char const *exchange; @@ -140,13 +146,13 @@ int main(int argc, char const * const *argv) { die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, - amqp_empty_table); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { @@ -156,7 +162,7 @@ int main(int argc, char const * const *argv) { } amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), - amqp_empty_table); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c index 74f7465..749109c 100644 --- a/examples/amqp_exchange_declare.c +++ b/examples/amqp_exchange_declare.c @@ -41,7 +41,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port; char const *exchange; @@ -65,12 +66,12 @@ int main(int argc, char const * const *argv) { die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype), - 0, 0, amqp_empty_table); + 0, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c index a589fa5..0347375 100644 --- a/examples/amqp_listen.c +++ b/examples/amqp_listen.c @@ -43,7 +43,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port; char const *exchange; @@ -69,13 +70,13 @@ int main(int argc, char const * const *argv) { die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, - amqp_empty_table); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { @@ -85,7 +86,7 @@ int main(int argc, char const * const *argv) { } amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), - amqp_empty_table); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); @@ -104,35 +105,39 @@ int main(int argc, char const * const *argv) { amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); printf("Result %d\n", result); - if (result < 0) - break; + if (result < 0) { + break; + } printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) - continue; + if (frame.frame_type != AMQP_FRAME_METHOD) { + continue; + } printf("Method %s\n", amqp_method_name(frame.payload.method.id)); - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) - continue; + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { + continue; + } d = (amqp_basic_deliver_t *) frame.payload.method.decoded; printf("Delivery %u, exchange %.*s routingkey %.*s\n", - (unsigned) d->delivery_tag, - (int) d->exchange.len, (char *) d->exchange.bytes, - (int) d->routing_key.len, (char *) d->routing_key.bytes); + (unsigned) d->delivery_tag, + (int) d->exchange.len, (char *) d->exchange.bytes, + (int) d->routing_key.len, (char *) d->routing_key.bytes); result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) - break; + if (result < 0) { + break; + } if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); + fprintf(stderr, "Expected header!"); + abort(); } p = (amqp_basic_properties_t *) frame.payload.properties.decoded; if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { - printf("Content-type: %.*s\n", - (int) p->content_type.len, (char *) p->content_type.bytes); + printf("Content-type: %.*s\n", + (int) p->content_type.len, (char *) p->content_type.bytes); } printf("----\n"); @@ -140,26 +145,27 @@ int main(int argc, char const * const *argv) { body_received = 0; while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) - break; + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + break; + } - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); - } + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); - amqp_dump(frame.payload.body_fragment.bytes, - frame.payload.body_fragment.len); + amqp_dump(frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); } if (body_received != body_target) { - /* Can only happen when amqp_simple_wait_frame returns <= 0 */ - /* We break here to close the connection */ - break; + /* Can only happen when amqp_simple_wait_frame returns <= 0 */ + /* We break here to close the connection */ + break; } } } diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c index 8af0f08..762831d 100644 --- a/examples/amqp_listenq.c +++ b/examples/amqp_listenq.c @@ -43,7 +43,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port; char const *queuename; @@ -65,7 +66,7 @@ int main(int argc, char const * const *argv) { die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); @@ -85,35 +86,39 @@ int main(int argc, char const * const *argv) { amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); printf("Result %d\n", result); - if (result < 0) - break; + if (result < 0) { + break; + } printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) - continue; + if (frame.frame_type != AMQP_FRAME_METHOD) { + continue; + } printf("Method %s\n", amqp_method_name(frame.payload.method.id)); - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) - continue; + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { + continue; + } d = (amqp_basic_deliver_t *) frame.payload.method.decoded; printf("Delivery %u, exchange %.*s routingkey %.*s\n", - (unsigned) d->delivery_tag, - (int) d->exchange.len, (char *) d->exchange.bytes, - (int) d->routing_key.len, (char *) d->routing_key.bytes); + (unsigned) d->delivery_tag, + (int) d->exchange.len, (char *) d->exchange.bytes, + (int) d->routing_key.len, (char *) d->routing_key.bytes); result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) - break; + if (result < 0) { + break; + } if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); + fprintf(stderr, "Expected header!"); + abort(); } p = (amqp_basic_properties_t *) frame.payload.properties.decoded; if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { - printf("Content-type: %.*s\n", - (int) p->content_type.len, (char *) p->content_type.bytes); + printf("Content-type: %.*s\n", + (int) p->content_type.len, (char *) p->content_type.bytes); } printf("----\n"); @@ -121,26 +126,27 @@ int main(int argc, char const * const *argv) { body_received = 0; while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) - break; + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + break; + } - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); - } + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); - amqp_dump(frame.payload.body_fragment.bytes, - frame.payload.body_fragment.len); + amqp_dump(frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); } if (body_received != body_target) { - /* Can only happen when amqp_simple_wait_frame returns <= 0 */ - /* We break here to close the connection */ - break; + /* Can only happen when amqp_simple_wait_frame returns <= 0 */ + /* We break here to close the connection */ + break; } amqp_basic_ack(conn, 1, d->delivery_tag, 0); diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c index d85b944..4a1cd68 100644 --- a/examples/amqp_producer.c +++ b/examples/amqp_producer.c @@ -44,9 +44,9 @@ #define SUMMARY_EVERY_US 1000000 static void send_batch(amqp_connection_state_t conn, - char const *queue_name, - int rate_limit, - int message_count) + char const *queue_name, + int rate_limit, + int message_count) { uint64_t start_time = now_microseconds(); int i; @@ -69,20 +69,20 @@ static void send_batch(amqp_connection_state_t conn, uint64_t now = now_microseconds(); die_on_error(amqp_basic_publish(conn, - 1, - amqp_cstring_bytes("amq.direct"), - amqp_cstring_bytes(queue_name), - 0, - 0, - NULL, - message_bytes), - "Publishing"); + 1, + amqp_cstring_bytes("amq.direct"), + amqp_cstring_bytes(queue_name), + 0, + 0, + NULL, + message_bytes), + "Publishing"); sent++; if (now > next_summary_time) { int countOverInterval = sent - previous_sent; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); printf("%d ms: Sent %d - %d since last report (%d Hz)\n", - (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); + (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); previous_sent = sent; previous_report_time = now; @@ -105,7 +105,8 @@ static void send_batch(amqp_connection_state_t conn, } } -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port; int rate_limit; @@ -129,7 +130,7 @@ int main(int argc, char const * const *argv) { die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); diff --git a/examples/amqp_rpc_sendstring_client.c b/examples/amqp_rpc_sendstring_client.c index 04c073e..7914e2c 100644 --- a/examples/amqp_rpc_sendstring_client.c +++ b/examples/amqp_rpc_sendstring_client.c @@ -43,7 +43,8 @@ #include "utils.h" -int main(int argc, char* argv[]) { +int main(int argc, char *argv[]) +{ char const *hostname; int port; char const *exchange; @@ -152,16 +153,19 @@ int main(int argc, char* argv[]) { amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); printf("Result: %d\n", result); - if (result < 0) + if (result < 0) { break; + } printf("Frame type: %d channel: %d\n", frame.frame_type, frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) + if (frame.frame_type != AMQP_FRAME_METHOD) { continue; + } printf("Method: %s\n", amqp_method_name(frame.payload.method.id)); - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { continue; + } d = (amqp_basic_deliver_t *) frame.payload.method.decoded; printf("Delivery: %u exchange: %.*s routingkey: %.*s\n", @@ -170,8 +174,9 @@ int main(int argc, char* argv[]) { (int) d->routing_key.len, (char *) d->routing_key.bytes); result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) + if (result < 0) { break; + } if (frame.frame_type != AMQP_FRAME_HEADER) { fprintf(stderr, "Expected header!"); @@ -189,8 +194,9 @@ int main(int argc, char* argv[]) { while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) + if (result < 0) { break; + } if (frame.frame_type != AMQP_FRAME_BODY) { fprintf(stderr, "Expected body!"); diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c index e108ea7..f1456c9 100644 --- a/examples/amqp_sendstring.c +++ b/examples/amqp_sendstring.c @@ -41,7 +41,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port; char const *exchange; @@ -67,7 +68,7 @@ int main(int argc, char const * const *argv) { die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); @@ -77,14 +78,14 @@ int main(int argc, char const * const *argv) { props.content_type = amqp_cstring_bytes("text/plain"); props.delivery_mode = 2; /* persistent delivery mode */ die_on_error(amqp_basic_publish(conn, - 1, - amqp_cstring_bytes(exchange), - amqp_cstring_bytes(routingkey), - 0, - 0, - &props, - amqp_cstring_bytes(messagebody)), - "Publishing"); + 1, + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(routingkey), + 0, + 0, + &props, + amqp_cstring_bytes(messagebody)), + "Publishing"); } die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c index 546b356..011f561 100644 --- a/examples/amqp_unbind.c +++ b/examples/amqp_unbind.c @@ -41,7 +41,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port; char const *exchange; @@ -67,15 +68,15 @@ int main(int argc, char const * const *argv) { die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); amqp_set_sockfd(conn, sockfd); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_queue_unbind(conn, 1, - amqp_cstring_bytes(queue), - amqp_cstring_bytes(exchange), - amqp_cstring_bytes(bindingkey), - amqp_empty_table); + amqp_cstring_bytes(queue), + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(bindingkey), + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/utils.c b/examples/utils.c index 691f14a..328a5e5 100644 --- a/examples/utils.c +++ b/examples/utils.c @@ -42,7 +42,8 @@ #include "utils.h" -void die_on_error(int x, char const *context) { +void die_on_error(int x, char const *context) +{ if (x < 0) { char *errstr = amqp_error_string(-x); fprintf(stderr, "%s: %s\n", context, errstr); @@ -51,85 +52,93 @@ void die_on_error(int x, char const *context) { } } -void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) { +void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) +{ switch (x.reply_type) { - case AMQP_RESPONSE_NORMAL: - return; - - case AMQP_RESPONSE_NONE: - fprintf(stderr, "%s: missing RPC reply type!\n", context); + case AMQP_RESPONSE_NORMAL: + return; + + case AMQP_RESPONSE_NONE: + fprintf(stderr, "%s: missing RPC reply type!\n", context); + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error)); + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (x.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; + fprintf(stderr, "%s: server connection error %d, message: %.*s\n", + context, + m->reply_code, + (int) m->reply_text.len, (char *) m->reply_text.bytes); break; - - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error)); + } + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; + fprintf(stderr, "%s: server channel error %d, message: %.*s\n", + context, + m->reply_code, + (int) m->reply_text.len, (char *) m->reply_text.bytes); break; - - case AMQP_RESPONSE_SERVER_EXCEPTION: - switch (x.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; - fprintf(stderr, "%s: server connection error %d, message: %.*s\n", - context, - m->reply_code, - (int) m->reply_text.len, (char *) m->reply_text.bytes); - break; - } - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; - fprintf(stderr, "%s: server channel error %d, message: %.*s\n", - context, - m->reply_code, - (int) m->reply_text.len, (char *) m->reply_text.bytes); - break; - } - default: - fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id); - break; - } + } + default: + fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id); break; + } + break; } exit(1); } -static void dump_row(long count, int numinrow, int *chs) { +static void dump_row(long count, int numinrow, int *chs) +{ int i; printf("%08lX:", count - numinrow); if (numinrow > 0) { for (i = 0; i < numinrow; i++) { - if (i == 8) - printf(" :"); + if (i == 8) { + printf(" :"); + } printf(" %02X", chs[i]); } for (i = numinrow; i < 16; i++) { - if (i == 8) - printf(" :"); + if (i == 8) { + printf(" :"); + } printf(" "); } printf(" "); for (i = 0; i < numinrow; i++) { - if (isprint(chs[i])) - printf("%c", chs[i]); - else - printf("."); + if (isprint(chs[i])) { + printf("%c", chs[i]); + } else { + printf("."); + } } } printf("\n"); } -static int rows_eq(int *a, int *b) { +static int rows_eq(int *a, int *b) +{ int i; for (i=0; i<16; i++) - if (a[i] != b[i]) + if (a[i] != b[i]) { return 0; + } return 1; } -void amqp_dump(void const *buffer, size_t len) { +void amqp_dump(void const *buffer, size_t len) +{ unsigned char *buf = (unsigned char *) buffer; long count = 0; int numinrow = 0; @@ -145,17 +154,18 @@ void amqp_dump(void const *buffer, size_t len) { int i; if (rows_eq(oldchs, chs)) { - if (!showed_dots) { - showed_dots = 1; - printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n"); - } + if (!showed_dots) { + showed_dots = 1; + printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n"); + } } else { - showed_dots = 0; - dump_row(count, numinrow, chs); + showed_dots = 0; + dump_row(count, numinrow, chs); } - for (i=0; i<16; i++) - oldchs[i] = chs[i]; + for (i=0; i<16; i++) { + oldchs[i] = chs[i]; + } numinrow = 0; } @@ -166,6 +176,7 @@ void amqp_dump(void const *buffer, size_t len) { dump_row(count, numinrow, chs); - if (numinrow != 0) + if (numinrow != 0) { printf("%08lX:\n", count); + } } diff --git a/examples/win32/platform_utils.c b/examples/win32/platform_utils.c index c7807f9..dff6243 100644 --- a/examples/win32/platform_utils.c +++ b/examples/win32/platform_utils.c @@ -40,7 +40,7 @@ uint64_t now_microseconds(void) FILETIME ft; GetSystemTimeAsFileTime(&ft); return (((uint64_t)ft.dwHighDateTime << 32) | (uint64_t)ft.dwLowDateTime) - / 10; + / 10; } void microsleep(int usec) diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 5de39d5..c379125 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -152,28 +152,28 @@ typedef struct amqp_array_t_ { t t Boolean b b Signed 8-bit B Unsigned 8-bit - U s Signed 16-bit (A1) + U s Signed 16-bit (A1) u Unsigned 16-bit - I I I Signed 32-bit - i Unsigned 32-bit - L l Signed 64-bit (B) - l Unsigned 64-bit - f f 32-bit float - d d 64-bit float - D D D Decimal - s Short string (A2) - S S S Long string - A Nested Array - T T T Timestamp (u64) - F F F Nested Table - V V V Void - x Byte array + I I I Signed 32-bit + i Unsigned 32-bit + L l Signed 64-bit (B) + l Unsigned 64-bit + f f 32-bit float + d d 64-bit float + D D D Decimal + s Short string (A2) + S S S Long string + A Nested Array + T T T Timestamp (u64) + F F F Nested Table + V V V Void + x Byte array Remarks: A1, A2: Notice how the types **CONFLICT** here. In Qpid and Rabbit, 's' means a signed 16-bit integer; in 0-9-1, it means a - short string. + short string. B: Notice how the signednesses **CONFLICT** here. In Qpid and Rabbit, 'l' means a signed 64-bit integer; in 0-9-1, it means an unsigned @@ -360,9 +360,9 @@ AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_tune_connection(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat); + int channel_max, + int frame_max, + int heartbeat); AMQP_PUBLIC_FUNCTION int @@ -375,8 +375,8 @@ AMQP_CALL amqp_destroy_connection(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_handle_input(amqp_connection_state_t state, - amqp_bytes_t received_data, - amqp_frame_t *decoded_frame); + amqp_bytes_t received_data, + amqp_frame_t *decoded_frame); AMQP_PUBLIC_FUNCTION amqp_boolean_t @@ -413,37 +413,37 @@ AMQP_CALL amqp_frames_enqueued(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_simple_wait_frame(amqp_connection_state_t state, - amqp_frame_t *decoded_frame); + amqp_frame_t *decoded_frame); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state, - amqp_channel_t expected_channel, - amqp_method_number_t expected_method, - amqp_method_t *output); + amqp_channel_t expected_channel, + amqp_method_number_t expected_method, + amqp_method_t *output); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_send_method(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t id, - void *decoded); + amqp_channel_t channel, + amqp_method_number_t id, + void *decoded); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_simple_rpc(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t *expected_reply_ids, - void *decoded_request_method); + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t *expected_reply_ids, + void *decoded_request_method); AMQP_PUBLIC_FUNCTION void * AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method); + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method); /* * The API methods corresponding to most synchronous AMQP methods @@ -465,23 +465,23 @@ AMQP_CALL amqp_get_rpc_reply(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost, - int channel_max, int frame_max, int heartbeat, - amqp_sasl_method_enum sasl_method, ...); + int channel_max, int frame_max, int heartbeat, + amqp_sasl_method_enum sasl_method, ...); struct amqp_basic_properties_t_; AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, - amqp_bytes_t exchange, amqp_bytes_t routing_key, - amqp_boolean_t mandatory, amqp_boolean_t immediate, - struct amqp_basic_properties_t_ const *properties, - amqp_bytes_t body); + amqp_bytes_t exchange, amqp_bytes_t routing_key, + amqp_boolean_t mandatory, amqp_boolean_t immediate, + struct amqp_basic_properties_t_ const *properties, + amqp_bytes_t body); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel, - int code); + int code); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t @@ -490,17 +490,17 @@ AMQP_CALL amqp_connection_close(amqp_connection_state_t state, int code); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, - uint64_t delivery_tag, amqp_boolean_t multiple); + uint64_t delivery_tag, amqp_boolean_t multiple); AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel, - amqp_bytes_t queue, amqp_boolean_t no_ack); + amqp_bytes_t queue, amqp_boolean_t no_ack); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel, - uint64_t delivery_tag, amqp_boolean_t requeue); + uint64_t delivery_tag, amqp_boolean_t requeue); /* * Can be used to see if there is data still in the buffer, if so @@ -526,7 +526,7 @@ AMQP_CALL amqp_error_string(int err); AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_decode_table(amqp_bytes_t encoded, amqp_pool_t *pool, - amqp_table_t *output, size_t *offset); + amqp_table_t *output, size_t *offset); AMQP_PUBLIC_FUNCTION int @@ -541,11 +541,11 @@ struct amqp_connection_info { }; AMQP_PUBLIC_FUNCTION -void +void AMQP_CALL amqp_default_connection_info(struct amqp_connection_info *parsed); AMQP_PUBLIC_FUNCTION -int +int AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed); AMQP_END_DECLS diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c index 6faabbb..aacaf91 100644 --- a/librabbitmq/amqp_api.c +++ b/librabbitmq/amqp_api.c @@ -62,10 +62,11 @@ char *amqp_error_string(int err) switch (category) { case ERROR_CATEGORY_CLIENT: - if (err < 1 || err > ERROR_MAX) + if (err < 1 || err > ERROR_MAX) { str = "(undefined librabbitmq error)"; - else + } else { str = client_error_strings[err - 1]; + } break; case ERROR_CATEGORY_OS: @@ -80,31 +81,31 @@ char *amqp_error_string(int err) void amqp_abort(const char *fmt, ...) { - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fputc('\n', stderr); - abort(); + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fputc('\n', stderr); + abort(); } const amqp_bytes_t amqp_empty_bytes = { 0, NULL }; const amqp_table_t amqp_empty_table = { 0, NULL }; const amqp_array_t amqp_empty_array = { 0, NULL }; -#define RPC_REPLY(replytype) \ - (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \ - ? (replytype *) state->most_recent_api_result.reply.decoded \ +#define RPC_REPLY(replytype)\ + (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL\ + ? (replytype *) state->most_recent_api_result.reply.decoded\ : NULL) int amqp_basic_publish(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t exchange, - amqp_bytes_t routing_key, - amqp_boolean_t mandatory, - amqp_boolean_t immediate, - amqp_basic_properties_t const *properties, - amqp_bytes_t body) + amqp_channel_t channel, + amqp_bytes_t exchange, + amqp_bytes_t routing_key, + amqp_boolean_t mandatory, + amqp_boolean_t immediate, + amqp_basic_properties_t const *properties, + amqp_bytes_t body) { amqp_frame_t f; size_t body_offset; @@ -121,8 +122,9 @@ int amqp_basic_publish(amqp_connection_state_t state, m.ticket = 0; res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m); - if (res < 0) + if (res < 0) { return res; + } if (properties == NULL) { memset(&default_properties, 0, sizeof(default_properties)); @@ -136,15 +138,17 @@ int amqp_basic_publish(amqp_connection_state_t state, f.payload.properties.decoded = (void *) properties; res = amqp_send_frame(state, &f); - if (res < 0) + if (res < 0) { return res; + } body_offset = 0; while (body_offset < body.len) { size_t remaining = body.len - body_offset; - if (remaining == 0) + if (remaining == 0) { break; + } f.frame_type = AMQP_FRAME_BODY; f.channel = channel; @@ -157,16 +161,17 @@ int amqp_basic_publish(amqp_connection_state_t state, body_offset += f.payload.body_fragment.len; res = amqp_send_frame(state, &f); - if (res < 0) + if (res < 0) { return res; + } } return 0; } amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, - amqp_channel_t channel, - int code) + amqp_channel_t channel, + int code) { char codestr[13]; amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0}; @@ -179,11 +184,11 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, req.method_id = 0; return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, - replies, &req); + replies, &req); } amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, - int code) + int code) { char codestr[13]; amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0}; @@ -196,13 +201,13 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, req.method_id = 0; return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, - replies, &req); + replies, &req); } int amqp_basic_ack(amqp_connection_state_t state, - amqp_channel_t channel, - uint64_t delivery_tag, - amqp_boolean_t multiple) + amqp_channel_t channel, + uint64_t delivery_tag, + amqp_boolean_t multiple) { amqp_basic_ack_t m; m.delivery_tag = delivery_tag; @@ -211,28 +216,29 @@ int amqp_basic_ack(amqp_connection_state_t state, } amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_bytes_t queue, - amqp_boolean_t no_ack) + amqp_channel_t channel, + amqp_bytes_t queue, + amqp_boolean_t no_ack) { amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD, - AMQP_BASIC_GET_EMPTY_METHOD, - 0 }; + AMQP_BASIC_GET_EMPTY_METHOD, + 0 + }; amqp_basic_get_t req; req.ticket = 0; req.queue = queue; req.no_ack = no_ack; state->most_recent_api_result = amqp_simple_rpc(state, channel, - AMQP_BASIC_GET_METHOD, - replies, &req); + AMQP_BASIC_GET_METHOD, + replies, &req); return state->most_recent_api_result; } int amqp_basic_reject(amqp_connection_state_t state, - amqp_channel_t channel, - uint64_t delivery_tag, - amqp_boolean_t requeue) + amqp_channel_t channel, + uint64_t delivery_tag, + amqp_boolean_t requeue) { amqp_basic_reject_t req; req.delivery_tag = delivery_tag; diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 421c745..be7b1a8 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -46,36 +46,40 @@ #define INITIAL_DECODING_POOL_PAGE_SIZE 131072 #define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072 -#define ENFORCE_STATE(statevec, statenum) \ - { \ - amqp_connection_state_t _check_state = (statevec); \ - size_t _wanted_state = (statenum); \ - if (_check_state->state != _wanted_state) \ +#define ENFORCE_STATE(statevec, statenum) \ + { \ + amqp_connection_state_t _check_state = (statevec); \ + size_t _wanted_state = (statenum); \ + if (_check_state->state != _wanted_state) \ amqp_abort("Programming error: invalid AMQP connection state: expected %d, got %d", \ - _wanted_state, \ - _check_state->state); \ + _wanted_state, \ + _check_state->state); \ } -amqp_connection_state_t amqp_new_connection(void) { +amqp_connection_state_t amqp_new_connection(void) +{ int res; amqp_connection_state_t state = (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); - if (state == NULL) + if (state == NULL) { return NULL; + } init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE); init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE); res = amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0); - if (-ERROR_NO_MEMORY == res) + if (-ERROR_NO_MEMORY == res) { return NULL; - else if (0 != res) + } else if (0 != res) { goto out_nomem; + } state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len); - if (state->inbound_buffer.bytes == NULL) + if (state->inbound_buffer.bytes == NULL) { goto out_nomem; + } state->state = CONNECTION_STATE_INITIAL; /* the server protocol version response is 8 bytes, which conveniently @@ -85,12 +89,13 @@ amqp_connection_state_t amqp_new_connection(void) { state->sockfd = -1; state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE; state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE); - if (state->sock_inbound_buffer.bytes == NULL) + if (state->sock_inbound_buffer.bytes == NULL) { goto out_nomem; + } return state; - out_nomem: +out_nomem: free(state->sock_inbound_buffer.bytes); empty_amqp_pool(&state->frame_pool); empty_amqp_pool(&state->decoding_pool); @@ -98,20 +103,21 @@ amqp_connection_state_t amqp_new_connection(void) { return NULL; } -int amqp_get_sockfd(amqp_connection_state_t state) { +int amqp_get_sockfd(amqp_connection_state_t state) +{ return state->sockfd; } void amqp_set_sockfd(amqp_connection_state_t state, - int sockfd) + int sockfd) { state->sockfd = sockfd; } int amqp_tune_connection(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat) + int channel_max, + int frame_max, + int heartbeat) { void *newbuf; @@ -136,11 +142,13 @@ int amqp_tune_connection(amqp_connection_state_t state, return 0; } -int amqp_get_channel_max(amqp_connection_state_t state) { +int amqp_get_channel_max(amqp_connection_state_t state) +{ return state->channel_max; } -int amqp_destroy_connection(amqp_connection_state_t state) { +int amqp_destroy_connection(amqp_connection_state_t state) +{ int s = state->sockfd; empty_amqp_pool(&state->frame_pool); @@ -149,13 +157,15 @@ int amqp_destroy_connection(amqp_connection_state_t state) { free(state->sock_inbound_buffer.bytes); free(state); - if (s >= 0 && amqp_socket_close(s) < 0) + if (s >= 0 && amqp_socket_close(s) < 0) { return -amqp_socket_error(); - else + } else { return 0; + } } -static void return_to_idle(amqp_connection_state_t state) { +static void return_to_idle(amqp_connection_state_t state) +{ state->inbound_buffer.bytes = NULL; state->inbound_offset = 0; state->target_size = HEADER_SIZE; @@ -163,15 +173,16 @@ static void return_to_idle(amqp_connection_state_t state) { } static size_t consume_data(amqp_connection_state_t state, - amqp_bytes_t *received_data) + amqp_bytes_t *received_data) { /* how much data is available and will fit? */ size_t bytes_consumed = state->target_size - state->inbound_offset; - if (received_data->len < bytes_consumed) + if (received_data->len < bytes_consumed) { bytes_consumed = received_data->len; + } memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset), - received_data->bytes, bytes_consumed); + received_data->bytes, bytes_consumed); state->inbound_offset += bytes_consumed; received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed); received_data->len -= bytes_consumed; @@ -180,8 +191,8 @@ static size_t consume_data(amqp_connection_state_t state, } int amqp_handle_input(amqp_connection_state_t state, - amqp_bytes_t received_data, - amqp_frame_t *decoded_frame) + amqp_bytes_t received_data, + amqp_frame_t *decoded_frame) { size_t bytes_consumed; void *raw_frame; @@ -190,17 +201,20 @@ int amqp_handle_input(amqp_connection_state_t state, or a complete, ignored frame was read. */ decoded_frame->frame_type = 0; - if (received_data.len == 0) + if (received_data.len == 0) { return 0; + } if (state->state == CONNECTION_STATE_IDLE) { state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, - state->inbound_buffer.len); + state->inbound_buffer.len); if (state->inbound_buffer.bytes == NULL) /* state->inbound_buffer.len is always nonzero, because it - corresponds to frame_max, which is not permitted to be less - than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ + corresponds to frame_max, which is not permitted to be less + than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ + { return -ERROR_NO_MEMORY; + } state->state = CONNECTION_STATE_HEADER; } @@ -209,8 +223,9 @@ int amqp_handle_input(amqp_connection_state_t state, /* do we have target_size data yet? if not, return with the expectation that more will arrive */ - if (state->inbound_offset < state->target_size) + if (state->inbound_offset < state->target_size) { return bytes_consumed; + } raw_frame = state->inbound_buffer.bytes; @@ -222,13 +237,13 @@ int amqp_handle_input(amqp_connection_state_t state, decoded_frame->channel = 0; decoded_frame->payload.protocol_header.transport_high - = amqp_d8(raw_frame, 4); + = amqp_d8(raw_frame, 4); decoded_frame->payload.protocol_header.transport_low - = amqp_d8(raw_frame, 5); + = amqp_d8(raw_frame, 5); decoded_frame->payload.protocol_header.protocol_version_major - = amqp_d8(raw_frame, 6); + = amqp_d8(raw_frame, 6); decoded_frame->payload.protocol_header.protocol_version_minor - = amqp_d8(raw_frame, 7); + = amqp_d8(raw_frame, 7); return_to_idle(state); return bytes_consumed; @@ -240,15 +255,16 @@ int amqp_handle_input(amqp_connection_state_t state, case CONNECTION_STATE_HEADER: /* frame length is 3 bytes in */ state->target_size - = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE; + = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE; state->state = CONNECTION_STATE_BODY; bytes_consumed += consume_data(state, &received_data); /* do we have target_size data yet? if not, return with the expectation that more will arrive */ - if (state->inbound_offset < state->target_size) + if (state->inbound_offset < state->target_size) { return bytes_consumed; + } /* fall through to process body */ @@ -257,8 +273,9 @@ int amqp_handle_input(amqp_connection_state_t state, int res; /* Check frame end marker (footer) */ - if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) + if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) { return -ERROR_BAD_AMQP_DATA; + } decoded_frame->frame_type = amqp_d8(raw_frame, 0); decoded_frame->channel = amqp_d16(raw_frame, 1); @@ -270,19 +287,20 @@ int amqp_handle_input(amqp_connection_state_t state, encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE; res = amqp_decode_method(decoded_frame->payload.method.id, - &state->decoding_pool, encoded, - &decoded_frame->payload.method.decoded); - if (res < 0) - return res; + &state->decoding_pool, encoded, + &decoded_frame->payload.method.decoded); + if (res < 0) { + return res; + } break; case AMQP_FRAME_HEADER: decoded_frame->payload.properties.class_id - = amqp_d16(raw_frame, HEADER_SIZE); + = amqp_d16(raw_frame, HEADER_SIZE); /* unused 2-byte weight field goes here */ decoded_frame->payload.properties.body_size - = amqp_d64(raw_frame, HEADER_SIZE + 4); + = amqp_d64(raw_frame, HEADER_SIZE + 4); encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12); encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE; decoded_frame->payload.properties.raw = encoded; @@ -290,16 +308,17 @@ int amqp_handle_input(amqp_connection_state_t state, res = amqp_decode_properties(decoded_frame->payload.properties.class_id, &state->decoding_pool, encoded, &decoded_frame->payload.properties.decoded); - if (res < 0) + if (res < 0) { return res; + } break; case AMQP_FRAME_BODY: decoded_frame->payload.body_fragment.len - = state->target_size - HEADER_SIZE - FOOTER_SIZE; + = state->target_size - HEADER_SIZE - FOOTER_SIZE; decoded_frame->payload.body_fragment.bytes - = amqp_offset(raw_frame, HEADER_SIZE); + = amqp_offset(raw_frame, HEADER_SIZE); break; case AMQP_FRAME_HEARTBEAT: @@ -321,28 +340,32 @@ int amqp_handle_input(amqp_connection_state_t state, } } -amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) { +amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) +{ return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL); } -void amqp_release_buffers(amqp_connection_state_t state) { +void amqp_release_buffers(amqp_connection_state_t state) +{ ENFORCE_STATE(state, CONNECTION_STATE_IDLE); - if (state->first_queued_frame) + if (state->first_queued_frame) { amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued"); + } recycle_amqp_pool(&state->frame_pool); recycle_amqp_pool(&state->decoding_pool); } -void amqp_maybe_release_buffers(amqp_connection_state_t state) { +void amqp_maybe_release_buffers(amqp_connection_state_t state) +{ if (amqp_release_buffers_ok(state)) { amqp_release_buffers(state); } } int amqp_send_frame(amqp_connection_state_t state, - const amqp_frame_t *frame) + const amqp_frame_t *frame) { void *out_frame = state->outbound_buffer.bytes; int res; @@ -367,8 +390,7 @@ int amqp_send_frame(amqp_connection_state_t state, iov[2].iov_len = FOOTER_SIZE; res = amqp_socket_writev(state->sockfd, iov, 3); - } - else { + } else { size_t out_frame_len; amqp_bytes_t encoded; @@ -381,8 +403,9 @@ int amqp_send_frame(amqp_connection_state_t state, res = amqp_encode_method(frame->payload.method.id, frame->payload.method.decoded, encoded); - if (res < 0) + if (res < 0) { return res; + } out_frame_len = res + 4; break; @@ -397,8 +420,9 @@ int amqp_send_frame(amqp_connection_state_t state, res = amqp_encode_properties(frame->payload.properties.class_id, frame->payload.properties.decoded, encoded); - if (res < 0) + if (res < 0) { return res; + } out_frame_len = res + 12; break; @@ -417,8 +441,9 @@ int amqp_send_frame(amqp_connection_state_t state, out_frame_len + HEADER_SIZE + FOOTER_SIZE, MSG_NOSIGNAL); } - if (res < 0) + if (res < 0) { return -amqp_socket_error(); - else + } else { return 0; + } } diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c index 6915763..0f49df2 100644 --- a/librabbitmq/amqp_mem.c +++ b/librabbitmq/amqp_mem.c @@ -43,11 +43,13 @@ #include #include -char const *amqp_version(void) { +char const *amqp_version(void) +{ return VERSION; /* defined in config.h */ } -void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) { +void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) +{ pool->pagesize = pagesize ? pagesize : 4096; pool->pages.num_blocks = 0; @@ -61,7 +63,8 @@ void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) { pool->alloc_used = 0; } -static void empty_blocklist(amqp_pool_blocklist_t *x) { +static void empty_blocklist(amqp_pool_blocklist_t *x) +{ int i; for (i = 0; i < x->num_blocks; i++) { @@ -74,30 +77,35 @@ static void empty_blocklist(amqp_pool_blocklist_t *x) { x->blocklist = NULL; } -void recycle_amqp_pool(amqp_pool_t *pool) { +void recycle_amqp_pool(amqp_pool_t *pool) +{ empty_blocklist(&pool->large_blocks); pool->next_page = 0; pool->alloc_block = NULL; pool->alloc_used = 0; } -void empty_amqp_pool(amqp_pool_t *pool) { +void empty_amqp_pool(amqp_pool_t *pool) +{ recycle_amqp_pool(pool); empty_blocklist(&pool->pages); } /* Returns 1 on success, 0 on failure */ -static int record_pool_block(amqp_pool_blocklist_t *x, void *block) { +static int record_pool_block(amqp_pool_blocklist_t *x, void *block) +{ size_t blocklistlength = sizeof(void *) * (x->num_blocks + 1); if (x->blocklist == NULL) { x->blocklist = malloc(blocklistlength); - if (x->blocklist == NULL) + if (x->blocklist == NULL) { return 0; + } } else { void *newbl = realloc(x->blocklist, blocklistlength); - if (newbl == NULL) + if (newbl == NULL) { return 0; + } x->blocklist = newbl; } @@ -106,7 +114,8 @@ static int record_pool_block(amqp_pool_blocklist_t *x, void *block) { return 1; } -void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { +void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) +{ if (amount == 0) { return NULL; } @@ -118,8 +127,9 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { if (result == NULL) { return NULL; } - if (!record_pool_block(&pool->large_blocks, result)) + if (!record_pool_block(&pool->large_blocks, result)) { return NULL; + } return result; } @@ -138,8 +148,9 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { if (pool->alloc_block == NULL) { return NULL; } - if (!record_pool_block(&pool->pages, pool->alloc_block)) + if (!record_pool_block(&pool->pages, pool->alloc_block)) { return NULL; + } pool->next_page = pool->pages.num_blocks; } else { pool->alloc_block = pool->pages.blocklist[pool->next_page]; @@ -151,19 +162,22 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) { return pool->alloc_block; } -void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output) { +void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output) +{ output->len = amount; output->bytes = amqp_pool_alloc(pool, amount); } -amqp_bytes_t amqp_cstring_bytes(char const *cstr) { +amqp_bytes_t amqp_cstring_bytes(char const *cstr) +{ amqp_bytes_t result; result.len = strlen(cstr); result.bytes = (void *) cstr; return result; } -amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) { +amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) +{ amqp_bytes_t result; result.len = src.len; result.bytes = malloc(src.len); @@ -173,13 +187,15 @@ amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) { return result; } -amqp_bytes_t amqp_bytes_malloc(size_t amount) { +amqp_bytes_t amqp_bytes_malloc(size_t amount) +{ amqp_bytes_t result; result.len = amount; result.bytes = malloc(amount); /* will return NULL if it fails */ return result; } -void amqp_bytes_free(amqp_bytes_t bytes) { +void amqp_bytes_free(amqp_bytes_t bytes) +{ free(bytes.bytes); } diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index 725e3c5..191155e 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -159,53 +159,53 @@ static inline void *amqp_offset(void *data, size_t offset) /* This macro defines the encoding and decoding functions associated with a simple type. */ -#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \ - \ -static inline void amqp_e##bits(void *data, size_t offset, \ - uint##bits##_t val) \ -{ \ - /* The AMQP data might be unaligned. So we encode and then copy the \ - result into place. */ \ - uint##bits##_t res = htonx(val); \ - memcpy(amqp_offset(data, offset), &res, bits/8); \ -} \ - \ -static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \ -{ \ - /* The AMQP data might be unaligned. So we copy the source value \ - into a variable and then decode it. */ \ - uint##bits##_t val; \ - memcpy(&val, amqp_offset(data, offset), bits/8); \ - return ntohx(val); \ -} \ - \ -static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \ - uint##bits##_t input) \ - \ -{ \ - size_t o = *offset; \ - if ((*offset = o + bits / 8) <= encoded.len) { \ - amqp_e##bits(encoded.bytes, o, input); \ - return 1; \ - } \ - else { \ - return 0; \ - } \ -} \ - \ -static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \ - uint##bits##_t *output) \ - \ -{ \ - size_t o = *offset; \ - if ((*offset = o + bits / 8) <= encoded.len) { \ - *output = amqp_d##bits(encoded.bytes, o); \ - return 1; \ - } \ - else { \ - return 0; \ - } \ -} +#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \ + \ + static inline void amqp_e##bits(void *data, size_t offset, \ + uint##bits##_t val) \ + { \ + /* The AMQP data might be unaligned. So we encode and then copy the \ + result into place. */ \ + uint##bits##_t res = htonx(val); \ + memcpy(amqp_offset(data, offset), &res, bits/8); \ + } \ + \ + static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \ + { \ + /* The AMQP data might be unaligned. So we copy the source value \ + into a variable and then decode it. */ \ + uint##bits##_t val; \ + memcpy(&val, amqp_offset(data, offset), bits/8); \ + return ntohx(val); \ + } \ + \ + static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \ + uint##bits##_t input) \ + \ + { \ + size_t o = *offset; \ + if ((*offset = o + bits / 8) <= encoded.len) { \ + amqp_e##bits(encoded.bytes, o, input); \ + return 1; \ + } \ + else { \ + return 0; \ + } \ + } \ + \ + static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \ + uint##bits##_t *output) \ + \ + { \ + size_t o = *offset; \ + if ((*offset = o + bits / 8) <= encoded.len) { \ + *output = amqp_d##bits(encoded.bytes, o); \ + return 1; \ + } \ + else { \ + return 0; \ + } \ + } /* Determine byte order */ #if defined(__GLIBC__) @@ -215,7 +215,7 @@ static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \ # elif (__BYTE_ORDER == __BIG_ENDIAN) # define AMQP_BIG_ENDIAN # else - /* Don't define anything */ +/* Don't define anything */ # endif #elif defined(_BIG_ENDIAN) && !defined(_LITTLE_ENDIAN) || \ defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__) @@ -235,40 +235,40 @@ static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \ defined(__i386__) || defined(_M_IX86) # define AMQP_LITTLE_ENDIAN #else - /* Don't define anything */ +/* Don't define anything */ #endif #if defined(AMQP_LITTLE_ENDIAN) -#define DECLARE_XTOXLL(func) \ -static inline uint64_t func##ll(uint64_t val) \ -{ \ - union { \ - uint64_t whole; \ - uint32_t halves[2]; \ - } u; \ - uint32_t t; \ - u.whole = val; \ - t = u.halves[0]; \ - u.halves[0] = func##l(u.halves[1]); \ - u.halves[1] = func##l(t); \ - return u.whole; \ -} +#define DECLARE_XTOXLL(func) \ + static inline uint64_t func##ll(uint64_t val) \ + { \ + union { \ + uint64_t whole; \ + uint32_t halves[2]; \ + } u; \ + uint32_t t; \ + u.whole = val; \ + t = u.halves[0]; \ + u.halves[0] = func##l(u.halves[1]); \ + u.halves[1] = func##l(t); \ + return u.whole; \ + } #elif defined(AMQP_BIG_ENDIAN) -#define DECLARE_XTOXLL(func) \ -static inline uint64_t func##ll(uint64_t val) \ -{ \ - union { \ - uint64_t whole; \ - uint32_t halves[2]; \ - } u; \ - u.whole = val; \ - u.halves[0] = func##l(u.halves[0]); \ - u.halves[1] = func##l(u.halves[1]); \ - return u.whole; \ -} +#define DECLARE_XTOXLL(func) \ + static inline uint64_t func##ll(uint64_t val) \ + { \ + union { \ + uint64_t whole; \ + uint32_t halves[2]; \ + } u; \ + u.whole = val; \ + u.halves[0] = func##l(u.halves[0]); \ + u.halves[1] = func##l(u.halves[1]); \ + return u.whole; \ + } #else # error Endianness not known @@ -285,28 +285,26 @@ DECLARE_CODEC_BASE_TYPE(32, htonl, ntohl) DECLARE_CODEC_BASE_TYPE(64, htonll, ntohll) static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset, - amqp_bytes_t input) + amqp_bytes_t input) { size_t o = *offset; if ((*offset = o + input.len) <= encoded.len) { memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len); return 1; - } - else { + } else { return 0; } } static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset, - amqp_bytes_t *output, size_t len) + amqp_bytes_t *output, size_t len) { size_t o = *offset; if ((*offset = o + len) <= encoded.len) { output->bytes = amqp_offset(encoded.bytes, o); output->len = len; return 1; - } - else { + } else { return 0; } } diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index c323792..8454d45 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -44,7 +44,7 @@ #include int amqp_open_socket(char const *hostname, - int portnumber) + int portnumber) { struct addrinfo hint; struct addrinfo *address_list; @@ -54,8 +54,9 @@ int amqp_open_socket(char const *hostname, int last_error = 0; int one = 1; /* for setsockopt */ - if (0 != (last_error = amqp_socket_init())) + if (0 != (last_error = amqp_socket_init())) { return last_error; + } memset(&hint, 0, sizeof(hint)); hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */ @@ -66,63 +67,58 @@ int amqp_open_socket(char const *hostname, last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list); - if (last_error != 0) - { + if (last_error != 0) { return -ERROR_GETHOSTBYNAME_FAILED; } - for (addr = address_list; addr; addr = addr->ai_next) - { + for (addr = address_list; addr; addr = addr->ai_next) { /* This cast is to squash warnings on Win64, see: http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64 */ sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); - if (-1 == sockfd) - { + if (-1 == sockfd) { last_error = -amqp_socket_error(); continue; } #ifdef DISABLE_SIGPIPE_WITH_SETSOCKOPT - if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) - { + if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { last_error = -amqp_socket_error(); amqp_socket_close(sockfd); continue; } #endif /* DISABLE_SIGPIPE_WITH_SETSOCKOPT */ if (0 != amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) - || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) - { + || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { last_error = -amqp_socket_error(); amqp_socket_close(sockfd); continue; - } - else - { + } else { last_error = 0; break; } } freeaddrinfo(address_list); - if (last_error != 0) - { + if (last_error != 0) { return last_error; } return sockfd; } -int amqp_send_header(amqp_connection_state_t state) { +int amqp_send_header(amqp_connection_state_t state) +{ static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0, - AMQP_PROTOCOL_VERSION_MAJOR, - AMQP_PROTOCOL_VERSION_MINOR, - AMQP_PROTOCOL_VERSION_REVISION }; + AMQP_PROTOCOL_VERSION_MAJOR, + AMQP_PROTOCOL_VERSION_MINOR, + AMQP_PROTOCOL_VERSION_REVISION + }; return send(state->sockfd, (void *)header, 8, MSG_NOSIGNAL); } -static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { +static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) +{ amqp_bytes_t res; switch (method) { @@ -139,40 +135,43 @@ static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { } static amqp_bytes_t sasl_response(amqp_pool_t *pool, - amqp_sasl_method_enum method, - va_list args) + amqp_sasl_method_enum method, + va_list args) { amqp_bytes_t response; switch (method) { - case AMQP_SASL_METHOD_PLAIN: { - char *username = va_arg(args, char *); - size_t username_len = strlen(username); - char *password = va_arg(args, char *); - size_t password_len = strlen(password); - char *response_buf; - - amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response); - if (response.bytes == NULL) - /* We never request a zero-length block, because of the +2 - above, so a NULL here really is ENOMEM. */ - return response; - - response_buf = response.bytes; - response_buf[0] = 0; - memcpy(response_buf + 1, username, username_len); - response_buf[username_len + 1] = 0; - memcpy(response_buf + username_len + 2, password, password_len); - break; + case AMQP_SASL_METHOD_PLAIN: { + char *username = va_arg(args, char *); + size_t username_len = strlen(username); + char *password = va_arg(args, char *); + size_t password_len = strlen(password); + char *response_buf; + + amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response); + if (response.bytes == NULL) + /* We never request a zero-length block, because of the +2 + above, so a NULL here really is ENOMEM. */ + { + return response; } - default: - amqp_abort("Invalid SASL method: %d", (int) method); + + response_buf = response.bytes; + response_buf[0] = 0; + memcpy(response_buf + 1, username, username_len); + response_buf[username_len + 1] = 0; + memcpy(response_buf + username_len + 2, password, password_len); + break; + } + default: + amqp_abort("Invalid SASL method: %d", (int) method); } return response; } -amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) { +amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) +{ return (state->first_queued_frame != NULL); } @@ -180,12 +179,13 @@ amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) { * Check to see if we have data in our buffer. If this returns 1, we * will avoid an immediate blocking read in amqp_simple_wait_frame. */ -amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) { +amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) +{ return (state->sock_inbound_offset < state->sock_inbound_limit); } static int wait_frame_inner(amqp_connection_state_t state, - amqp_frame_t *decoded_frame) + amqp_frame_t *decoded_frame) { while (1) { int res; @@ -196,26 +196,29 @@ static int wait_frame_inner(amqp_connection_state_t state, buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; res = amqp_handle_input(state, buffer, decoded_frame); - if (res < 0) - return res; + if (res < 0) { + return res; + } state->sock_inbound_offset += res; - if (decoded_frame->frame_type != 0) - /* Complete frame was read. Return it. */ - return 0; + if (decoded_frame->frame_type != 0) { + /* Complete frame was read. Return it. */ + return 0; + } /* Incomplete or ignored frame. Keep processing input. */ assert(res != 0); } res = recv(state->sockfd, state->sock_inbound_buffer.bytes, - state->sock_inbound_buffer.len, 0); + state->sock_inbound_buffer.len, 0); if (res <= 0) { - if (res == 0) - return -ERROR_CONNECTION_CLOSED; - else - return -amqp_socket_error(); + if (res == 0) { + return -ERROR_CONNECTION_CLOSED; + } else { + return -amqp_socket_error(); + } } state->sock_inbound_limit = res; @@ -224,7 +227,7 @@ static int wait_frame_inner(amqp_connection_state_t state, } int amqp_simple_wait_frame(amqp_connection_state_t state, - amqp_frame_t *decoded_frame) + amqp_frame_t *decoded_frame) { if (state->first_queued_frame != NULL) { amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data; @@ -240,38 +243,42 @@ int amqp_simple_wait_frame(amqp_connection_state_t state, } int amqp_simple_wait_method(amqp_connection_state_t state, - amqp_channel_t expected_channel, - amqp_method_number_t expected_method, - amqp_method_t *output) + amqp_channel_t expected_channel, + amqp_method_number_t expected_method, + amqp_method_t *output) { amqp_frame_t frame; int res = amqp_simple_wait_frame(state, &frame); - if (res < 0) + if (res < 0) { return res; + } - if (frame.channel != expected_channel) + if (frame.channel != expected_channel) { amqp_abort("Expected 0x%08X method frame on channel %d, got frame on channel %d", - expected_method, - expected_channel, - frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) + expected_method, + expected_channel, + frame.channel); + } + if (frame.frame_type != AMQP_FRAME_METHOD) { amqp_abort("Expected 0x%08X method frame on channel %d, got frame type %d", - expected_method, - expected_channel, - frame.frame_type); - if (frame.payload.method.id != expected_method) + expected_method, + expected_channel, + frame.frame_type); + } + if (frame.payload.method.id != expected_method) { amqp_abort("Expected method ID 0x%08X on channel %d, got ID 0x%08X", - expected_method, - expected_channel, - frame.payload.method.id); + expected_method, + expected_channel, + frame.payload.method.id); + } *output = frame.payload.method; return 0; } int amqp_send_method(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t id, - void *decoded) + amqp_channel_t channel, + amqp_method_number_t id, + void *decoded) { amqp_frame_t frame; @@ -285,17 +292,19 @@ int amqp_send_method(amqp_connection_state_t state, static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list ) { while ( *list != 0 ) { - if ( *list == expected ) return 1; + if ( *list == expected ) { + return 1; + } list++; } return 0; } amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t *expected_reply_ids, - void *decoded_request_method) + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t *expected_reply_ids, + void *decoded_request_method) { int status; amqp_rpc_reply_t result; @@ -312,7 +321,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, { amqp_frame_t frame; - retry: +retry: status = wait_frame_inner(state, &frame); if (status < 0) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; @@ -328,21 +337,23 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, * - on the channel we want, and a channel.close frame, or * - on channel zero, and a connection.close frame. */ - if (!( (frame.frame_type == AMQP_FRAME_METHOD) && - ( ((frame.channel == channel) && - ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) || - (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) - || - ((frame.channel == 0) && - (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) ) )) - { + if (!((frame.frame_type == AMQP_FRAME_METHOD) + && ( + ((frame.channel == channel) + && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids) + || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) + || + ((frame.channel == 0) + && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) + ) + )) { amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t)); amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t)); if (frame_copy == NULL || link == NULL) { - result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - result.library_error = ERROR_NO_MEMORY; - return result; + result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + result.library_error = ERROR_NO_MEMORY; + return result; } *frame_copy = frame; @@ -351,9 +362,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, link->data = frame_copy; if (state->last_queued_frame == NULL) { - state->first_queued_frame = link; + state->first_queued_frame = link; } else { - state->last_queued_frame->next = link; + state->last_queued_frame->next = link; } state->last_queued_frame = link; @@ -361,8 +372,8 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, } result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) - ? AMQP_RESPONSE_NORMAL - : AMQP_RESPONSE_SERVER_EXCEPTION; + ? AMQP_RESPONSE_NORMAL + : AMQP_RESPONSE_SERVER_EXCEPTION; result.reply = frame.payload.method; return result; @@ -370,10 +381,10 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, } void *amqp_simple_rpc_decoded(amqp_connection_state_t state, - amqp_channel_t channel, - amqp_method_number_t request_id, - amqp_method_number_t reply_id, - void *decoded_request_method) + amqp_channel_t channel, + amqp_method_number_t request_id, + amqp_method_number_t reply_id, + void *decoded_request_method) { amqp_method_number_t replies[2]; @@ -381,12 +392,13 @@ void *amqp_simple_rpc_decoded(amqp_connection_state_t state, replies[1] = 0; state->most_recent_api_result = amqp_simple_rpc(state, channel, - request_id, replies, - decoded_request_method); - if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) + request_id, replies, + decoded_request_method); + if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) { return state->most_recent_api_result.reply.decoded; - else + } else { return NULL; + } } amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) @@ -396,11 +408,11 @@ amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) static int amqp_login_inner(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat, - amqp_sasl_method_enum sasl_method, - va_list vl) + int channel_max, + int frame_max, + int heartbeat, + amqp_sasl_method_enum sasl_method, + va_list vl) { int res; amqp_method_t method; @@ -411,14 +423,15 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_send_header(state); res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD, - &method); - if (res < 0) + &method); + if (res < 0) { return res; + } { amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded; if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || - (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { + (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { return -ERROR_INCOMPATIBLE_AMQP_VERSION; } @@ -431,10 +444,11 @@ static int amqp_login_inner(amqp_connection_state_t state, amqp_table_entry_t properties[2]; amqp_connection_start_ok_t s; amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, - sasl_method, vl); + sasl_method, vl); - if (response_bytes.bytes == NULL) + if (response_bytes.bytes == NULL) { return -ERROR_NO_MEMORY; + } properties[0].key = amqp_cstring_bytes("product"); properties[0].value.kind = AMQP_FIELD_KIND_UTF8; @@ -454,16 +468,18 @@ static int amqp_login_inner(amqp_connection_state_t state, s.locale.len = 5; res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s); - if (res < 0) + if (res < 0) { return res; + } } amqp_release_buffers(state); res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD, - &method); - if (res < 0) + &method); + if (res < 0) { return res; + } { amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded; @@ -472,18 +488,22 @@ static int amqp_login_inner(amqp_connection_state_t state, server_heartbeat = s->heartbeat; } - if (server_channel_max != 0 && server_channel_max < channel_max) + if (server_channel_max != 0 && server_channel_max < channel_max) { channel_max = server_channel_max; + } - if (server_frame_max != 0 && server_frame_max < frame_max) + if (server_frame_max != 0 && server_frame_max < frame_max) { frame_max = server_frame_max; + } - if (server_heartbeat != 0 && server_heartbeat < heartbeat) + if (server_heartbeat != 0 && server_heartbeat < heartbeat) { heartbeat = server_heartbeat; + } res = amqp_tune_connection(state, channel_max, frame_max, heartbeat); - if (res < 0) + if (res < 0) { return res; + } { amqp_connection_tune_ok_t s; @@ -492,8 +512,9 @@ static int amqp_login_inner(amqp_connection_state_t state, s.heartbeat = heartbeat; res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s); - if (res < 0) + if (res < 0) { return res; + } } amqp_release_buffers(state); @@ -502,12 +523,12 @@ static int amqp_login_inner(amqp_connection_state_t state, } amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - amqp_sasl_method_enum sasl_method, - ...) + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + amqp_sasl_method_enum sasl_method, + ...) { va_list vl; amqp_rpc_reply_t result; @@ -533,12 +554,13 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, s.insist = 1; result = amqp_simple_rpc(state, - 0, - AMQP_CONNECTION_OPEN_METHOD, - (amqp_method_number_t *) &replies, - &s); - if (result.reply_type != AMQP_RESPONSE_NORMAL) + 0, + AMQP_CONNECTION_OPEN_METHOD, + (amqp_method_number_t *) &replies, + &s); + if (result.reply_type != AMQP_RESPONSE_NORMAL) { return result; + } } amqp_maybe_release_buffers(state); diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c index d82c946..d9383d6 100644 --- a/librabbitmq/amqp_table.c +++ b/librabbitmq/amqp_table.c @@ -46,20 +46,20 @@ #define INITIAL_TABLE_SIZE 16 static int amqp_decode_field_value(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_field_value_t *entry, - size_t *offset); + amqp_pool_t *pool, + amqp_field_value_t *entry, + size_t *offset); static int amqp_encode_field_value(amqp_bytes_t encoded, - amqp_field_value_t *entry, - size_t *offset); + amqp_field_value_t *entry, + size_t *offset); /*---------------------------------------------------------------------------*/ static int amqp_decode_array(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_array_t *output, - size_t *offset) + amqp_pool_t *pool, + amqp_array_t *output, + size_t *offset) { uint32_t arraysize; int num_entries = 0; @@ -68,12 +68,14 @@ static int amqp_decode_array(amqp_bytes_t encoded, size_t limit; int res; - if (!amqp_decode_32(encoded, offset, &arraysize)) + if (!amqp_decode_32(encoded, offset, &arraysize)) { return -ERROR_BAD_AMQP_DATA; + } entries = malloc(allocated_entries * sizeof(amqp_field_value_t)); - if (entries == NULL) + if (entries == NULL) { return -ERROR_NO_MEMORY; + } limit = *offset + arraysize; while (*offset < limit) { @@ -82,16 +84,18 @@ static int amqp_decode_array(amqp_bytes_t encoded, allocated_entries = allocated_entries * 2; newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t)); res = -ERROR_NO_MEMORY; - if (newentries == NULL) - goto out; + if (newentries == NULL) { + goto out; + } entries = newentries; } res = amqp_decode_field_value(encoded, pool, &entries[num_entries], - offset); - if (res < 0) + offset); + if (res < 0) { goto out; + } num_entries++; } @@ -100,21 +104,22 @@ static int amqp_decode_array(amqp_bytes_t encoded, output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_field_value_t)); res = -ERROR_NO_MEMORY; /* NULL is legitimate if we requested a zero-length block. */ - if (output->entries == NULL && num_entries > 0) + if (output->entries == NULL && num_entries > 0) { goto out; + } memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t)); res = 0; - out: +out: free(entries); return res; } int amqp_decode_table(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_table_t *output, - size_t *offset) + amqp_pool_t *pool, + amqp_table_t *output, + size_t *offset) { uint32_t tablesize; int num_entries = 0; @@ -123,40 +128,46 @@ int amqp_decode_table(amqp_bytes_t encoded, size_t limit; int res; - if (!amqp_decode_32(encoded, offset, &tablesize)) + if (!amqp_decode_32(encoded, offset, &tablesize)) { return -ERROR_BAD_AMQP_DATA; + } entries = malloc(allocated_entries * sizeof(amqp_table_entry_t)); - if (entries == NULL) + if (entries == NULL) { return -ERROR_NO_MEMORY; + } limit = *offset + tablesize; while (*offset < limit) { uint8_t keylen; res = -ERROR_BAD_AMQP_DATA; - if (!amqp_decode_8(encoded, offset, &keylen)) + if (!amqp_decode_8(encoded, offset, &keylen)) { goto out; + } if (num_entries >= allocated_entries) { void *newentries; allocated_entries = allocated_entries * 2; newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t)); res = -ERROR_NO_MEMORY; - if (newentries == NULL) - goto out; + if (newentries == NULL) { + goto out; + } entries = newentries; } res = -ERROR_BAD_AMQP_DATA; - if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen)) + if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen)) { goto out; + } res = amqp_decode_field_value(encoded, pool, &entries[num_entries].value, - offset); - if (res < 0) + offset); + if (res < 0) { goto out; + } num_entries++; } @@ -165,26 +176,28 @@ int amqp_decode_table(amqp_bytes_t encoded, output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_table_entry_t)); res = -ERROR_NO_MEMORY; /* NULL is legitimate if we requested a zero-length block. */ - if (output->entries == NULL && num_entries > 0) + if (output->entries == NULL && num_entries > 0) { goto out; + } memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t)); res = 0; - out: +out: free(entries); return res; } static int amqp_decode_field_value(amqp_bytes_t encoded, - amqp_pool_t *pool, - amqp_field_value_t *entry, - size_t *offset) + amqp_pool_t *pool, + amqp_field_value_t *entry, + size_t *offset) { int res = -ERROR_BAD_AMQP_DATA; - if (!amqp_decode_8(encoded, offset, &entry->kind)) + if (!amqp_decode_8(encoded, offset, &entry->kind)) { goto out; + } #define TRIVIAL_FIELD_DECODER(bits) if (!amqp_decode_##bits(encoded, offset, &entry->value.u##bits)) goto out; break #define SIMPLE_FIELD_DECODER(bits, dest, how) { uint##bits##_t val; if (!amqp_decode_##bits(encoded, offset, &val)) goto out; entry->value.dest = how; } break @@ -223,8 +236,9 @@ static int amqp_decode_field_value(amqp_bytes_t encoded, case AMQP_FIELD_KIND_DECIMAL: if (!amqp_decode_8(encoded, offset, &entry->value.decimal.decimals) - || !amqp_decode_32(encoded, offset, &entry->value.decimal.value)) + || !amqp_decode_32(encoded, offset, &entry->value.decimal.value)) { goto out; + } break; case AMQP_FIELD_KIND_UTF8: @@ -234,8 +248,9 @@ static int amqp_decode_field_value(amqp_bytes_t encoded, case AMQP_FIELD_KIND_BYTES: { uint32_t len; if (!amqp_decode_32(encoded, offset, &len) - || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len)) + || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len)) { goto out; + } break; } @@ -259,15 +274,15 @@ static int amqp_decode_field_value(amqp_bytes_t encoded, res = 0; - out: +out: return res; } /*---------------------------------------------------------------------------*/ static int amqp_encode_array(amqp_bytes_t encoded, - amqp_array_t *input, - size_t *offset) + amqp_array_t *input, + size_t *offset) { size_t start = *offset; int i, res; @@ -276,22 +291,24 @@ static int amqp_encode_array(amqp_bytes_t encoded, for (i = 0; i < input->num_entries; i++) { res = amqp_encode_field_value(encoded, &input->entries[i], offset); - if (res < 0) + if (res < 0) { goto out; + } } - if (amqp_encode_32(encoded, &start, *offset - start - 4)) + if (amqp_encode_32(encoded, &start, *offset - start - 4)) { res = 0; - else + } else { res = -ERROR_BAD_AMQP_DATA; + } - out: +out: return res; } int amqp_encode_table(amqp_bytes_t encoded, - amqp_table_t *input, - size_t *offset) + amqp_table_t *input, + size_t *offset) { size_t start = *offset; int i, res; @@ -300,35 +317,40 @@ int amqp_encode_table(amqp_bytes_t encoded, for (i = 0; i < input->num_entries; i++) { res = amqp_encode_8(encoded, offset, input->entries[i].key.len); - if (res < 0) + if (res < 0) { goto out; + } res = amqp_encode_bytes(encoded, offset, input->entries[i].key); - if (res < 0) + if (res < 0) { goto out; + } res = amqp_encode_field_value(encoded, &input->entries[i].value, offset); - if (res < 0) + if (res < 0) { goto out; + } } - if (amqp_encode_32(encoded, &start, *offset - start - 4)) + if (amqp_encode_32(encoded, &start, *offset - start - 4)) { res = 0; - else + } else { res = -ERROR_BAD_AMQP_DATA; + } - out: +out: return res; } static int amqp_encode_field_value(amqp_bytes_t encoded, - amqp_field_value_t *entry, - size_t *offset) + amqp_field_value_t *entry, + size_t *offset) { int res = -ERROR_BAD_AMQP_DATA; - if (!amqp_encode_8(encoded, offset, entry->kind)) + if (!amqp_encode_8(encoded, offset, entry->kind)) { goto out; + } #define FIELD_ENCODER(bits, val) if (!amqp_encode_##bits(encoded, offset, val)) goto out; break @@ -366,8 +388,9 @@ static int amqp_encode_field_value(amqp_bytes_t encoded, case AMQP_FIELD_KIND_DECIMAL: if (!amqp_encode_8(encoded, offset, entry->value.decimal.decimals) - || !amqp_encode_32(encoded, offset, entry->value.decimal.value)) + || !amqp_encode_32(encoded, offset, entry->value.decimal.value)) { goto out; + } break; case AMQP_FIELD_KIND_UTF8: @@ -376,8 +399,9 @@ static int amqp_encode_field_value(amqp_bytes_t encoded, /* fall through */ case AMQP_FIELD_KIND_BYTES: if (!amqp_encode_32(encoded, offset, entry->value.bytes.len) - || !amqp_encode_bytes(encoded, offset, entry->value.bytes)) + || !amqp_encode_bytes(encoded, offset, entry->value.bytes)) { goto out; + } break; case AMQP_FIELD_KIND_ARRAY: @@ -400,13 +424,14 @@ static int amqp_encode_field_value(amqp_bytes_t encoded, res = 0; - out: +out: return res; } /*---------------------------------------------------------------------------*/ -int amqp_table_entry_cmp(void const *entry1, void const *entry2) { +int amqp_table_entry_cmp(void const *entry1, void const *entry2) +{ amqp_table_entry_t const *p1 = (amqp_table_entry_t const *) entry1; amqp_table_entry_t const *p2 = (amqp_table_entry_t const *) entry2; @@ -414,7 +439,9 @@ int amqp_table_entry_cmp(void const *entry1, void const *entry2) { size_t minlen; minlen = p1->key.len; - if (p2->key.len < minlen) minlen = p2->key.len; + if (p2->key.len < minlen) { + minlen = p2->key.len; + } d = memcmp(p1->key.bytes, p2->key.bytes, minlen); if (d != 0) { diff --git a/librabbitmq/amqp_url.c b/librabbitmq/amqp_url.c index 367376d..d3e6724 100644 --- a/librabbitmq/amqp_url.c +++ b/librabbitmq/amqp_url.c @@ -43,159 +43,167 @@ void amqp_default_connection_info(struct amqp_connection_info *ci) { - /* Apply defaults */ - ci->user = "guest"; - ci->password = "guest"; - ci->host = "localhost"; - ci->port = 5672; - ci->vhost = "/"; + /* Apply defaults */ + ci->user = "guest"; + ci->password = "guest"; + ci->host = "localhost"; + ci->port = 5672; + ci->vhost = "/"; } /* Scan for the next delimiter, handling percent-encodings on the way. */ static char find_delim(char **pp, int colon_and_at_sign_are_delims) { - char *from = *pp; - char *to = from; - - for (;;) { - char ch = *from++; - - switch (ch) { - case ':': - case '@': - if (!colon_and_at_sign_are_delims) { - *to++ = ch; - break; - } - - /* fall through */ - case 0: - case '/': - case '?': - case '#': - case '[': - case ']': - *to = 0; - *pp = from; - return ch; - - case '%': { - unsigned int val; - int chars; - int res = sscanf(from, "%2x%n", &val, &chars); - - if (res == EOF || res < 1 || chars != 2) - /* Return a surprising delimiter to - force an error. */ - return '%'; - - *to++ = val; - from += 2; - break; - } - - default: - *to++ = ch; - break; - } - } + char *from = *pp; + char *to = from; + + for (;;) { + char ch = *from++; + + switch (ch) { + case ':': + case '@': + if (!colon_and_at_sign_are_delims) { + *to++ = ch; + break; + } + + /* fall through */ + case 0: + case '/': + case '?': + case '#': + case '[': + case ']': + *to = 0; + *pp = from; + return ch; + + case '%': { + unsigned int val; + int chars; + int res = sscanf(from, "%2x%n", &val, &chars); + + if (res == EOF || res < 1 || chars != 2) + /* Return a surprising delimiter to + force an error. */ + { + return '%'; + } + + *to++ = val; + from += 2; + break; + } + + default: + *to++ = ch; + break; + } + } } /* Parse an AMQP URL into its component parts. */ int amqp_parse_url(char *url, struct amqp_connection_info *parsed) { - int res = -ERROR_BAD_AMQP_URL; - char delim; - char *start; - char *host; - char *port = NULL; - - /* check the prefix */ - if (strncmp(url, "amqp://", 7)) - goto out; - - host = start = url += 7; - delim = find_delim(&url, 1); - - if (delim == ':') { - /* The colon could be introducing the port or the - password part of the userinfo. We don't know yet, - so stash the preceding component. */ - port = start = url; - delim = find_delim(&url, 1); - } - - if (delim == '@') { - /* What might have been the host and port were in fact - the username and password */ - parsed->user = host; - if (port) - parsed->password = port; - - port = NULL; - host = start = url; - delim = find_delim(&url, 1); - } - - if (delim == '[') { - /* IPv6 address. The bracket should be the first - character in the host. */ - if (host != start || *host != 0) - goto out; - - start = url; - delim = find_delim(&url, 0); - - if (delim != ']') - goto out; - - parsed->host = start; - start = url; - delim = find_delim(&url, 1); - - /* Closing bracket should be the last character in the - host. */ - if (*start != 0) - goto out; - } - else { - /* If we haven't seen the host yet, this is it. */ - if (*host != 0) - parsed->host = host; - } - - if (delim == ':') { - port = start = url; - delim = find_delim(&url, 1); - } - - if (port) { - char *end; - long portnum = strtol(port, &end, 10); - - if (port == end || *end != 0 || portnum < 0 || portnum > 65535) - goto out; - - parsed->port = portnum; - } - - if (delim == '/') { - start = url; - delim = find_delim(&url, 1); - - if (delim != 0) - goto out; - - parsed->vhost = start; - res = 0; - } - else if (delim == 0) { - res = 0; - } - - /* Any other delimiter is bad, and we will return - ERROR_BAD_AMQP_URL. */ - - out: - return res; + int res = -ERROR_BAD_AMQP_URL; + char delim; + char *start; + char *host; + char *port = NULL; + + /* check the prefix */ + if (strncmp(url, "amqp://", 7)) { + goto out; + } + + host = start = url += 7; + delim = find_delim(&url, 1); + + if (delim == ':') { + /* The colon could be introducing the port or the + password part of the userinfo. We don't know yet, + so stash the preceding component. */ + port = start = url; + delim = find_delim(&url, 1); + } + + if (delim == '@') { + /* What might have been the host and port were in fact + the username and password */ + parsed->user = host; + if (port) { + parsed->password = port; + } + + port = NULL; + host = start = url; + delim = find_delim(&url, 1); + } + + if (delim == '[') { + /* IPv6 address. The bracket should be the first + character in the host. */ + if (host != start || *host != 0) { + goto out; + } + + start = url; + delim = find_delim(&url, 0); + + if (delim != ']') { + goto out; + } + + parsed->host = start; + start = url; + delim = find_delim(&url, 1); + + /* Closing bracket should be the last character in the + host. */ + if (*start != 0) { + goto out; + } + } else { + /* If we haven't seen the host yet, this is it. */ + if (*host != 0) { + parsed->host = host; + } + } + + if (delim == ':') { + port = start = url; + delim = find_delim(&url, 1); + } + + if (port) { + char *end; + long portnum = strtol(port, &end, 10); + + if (port == end || *end != 0 || portnum < 0 || portnum > 65535) { + goto out; + } + + parsed->port = portnum; + } + + if (delim == '/') { + start = url; + delim = find_delim(&url, 1); + + if (delim != 0) { + goto out; + } + + parsed->vhost = start; + res = 0; + } else if (delim == 0) { + res = 0; + } + + /* Any other delimiter is bad, and we will return + ERROR_BAD_AMQP_URL. */ + +out: + return res; } diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c index 69bb036..8c8ba6b 100644 --- a/librabbitmq/unix/socket.c +++ b/librabbitmq/unix/socket.c @@ -45,37 +45,38 @@ int amqp_socket_init(void) { - return 0; + return 0; } int amqp_socket_error(void) { - return errno | ERROR_CATEGORY_OS; + return errno | ERROR_CATEGORY_OS; } int amqp_socket_socket(int domain, int type, int proto) { - int flags; + int flags; - int s = socket(domain, type, proto); - if (s < 0) - return s; + int s = socket(domain, type, proto); + if (s < 0) { + return s; + } - /* Always enable CLOEXEC on the socket */ - flags = fcntl(s, F_GETFD); - if (flags == -1 - || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { - int e = errno; - close(s); - errno = e; - return -1; - } + /* Always enable CLOEXEC on the socket */ + flags = fcntl(s, F_GETFD); + if (flags == -1 + || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { + int e = errno; + close(s); + errno = e; + return -1; + } - return s; + return s; } char *amqp_os_error_string(int err) { - return strdup(strerror(err)); + return strdup(strerror(err)); } diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c index 9919b6b..f3c8fc7 100644 --- a/librabbitmq/win32/socket.c +++ b/librabbitmq/win32/socket.c @@ -48,55 +48,58 @@ static int called_wsastartup; int amqp_socket_init(void) { - if (!called_wsastartup) { - WSADATA data; - int res = WSAStartup(0x0202, &data); - if (res) - return -res; + if (!called_wsastartup) { + WSADATA data; + int res = WSAStartup(0x0202, &data); + if (res) { + return -res; + } - called_wsastartup = 1; - } + called_wsastartup = 1; + } - return 0; + return 0; } char *amqp_os_error_string(int err) { - char *msg, *copy; + char *msg, *copy; - if (!FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM - | FORMAT_MESSAGE_ALLOCATE_BUFFER, - NULL, err, - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - (LPSTR)&msg, 0, NULL)) - return strdup("(error retrieving Windows error message)"); + if (!FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM + | FORMAT_MESSAGE_ALLOCATE_BUFFER, + NULL, err, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL)) { + return strdup("(error retrieving Windows error message)"); + } - copy = strdup(msg); - LocalFree(msg); - return copy; + copy = strdup(msg); + LocalFree(msg); + return copy; } int amqp_socket_setsockopt(int sock, int level, int optname, - const void *optval, size_t optlen) + const void *optval, size_t optlen) { - /* the winsock setsockopt function has its 4th argument as a - const char * */ - return setsockopt(sock, level, optname, (const char *)optval, optlen); + /* the winsock setsockopt function has its 4th argument as a + const char * */ + return setsockopt(sock, level, optname, (const char *)optval, optlen); } int amqp_socket_writev(int sock, struct iovec *iov, int nvecs) { - DWORD ret; - if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) - return ret; - else - return -1; + DWORD ret; + if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) { + return ret; + } else { + return -1; + } } int amqp_socket_error(void) { - return WSAGetLastError() | ERROR_CATEGORY_OS; + return WSAGetLastError() | ERROR_CATEGORY_OS; } diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h index 9351b1f..43e2a13 100644 --- a/librabbitmq/win32/socket.h +++ b/librabbitmq/win32/socket.h @@ -39,8 +39,8 @@ /* same as WSABUF */ struct iovec { - u_long iov_len; - void *iov_base; + u_long iov_len; + void *iov_base; }; int @@ -51,7 +51,7 @@ amqp_socket_init(void); int amqp_socket_setsockopt(int sock, int level, int optname, const void *optval, - size_t optlen); + size_t optlen); int amqp_socket_writev(int sock, struct iovec *iov, int nvecs); diff --git a/tests/test_parse_url.c b/tests/test_parse_url.c index 2e7d1c7..585514d 100644 --- a/tests/test_parse_url.c +++ b/tests/test_parse_url.c @@ -43,127 +43,127 @@ static void match_string(const char *what, const char *expect, const char *got) { - if (strcmp(got, expect)) { - fprintf(stderr, "Expected %s '%s', got '%s'\n", - what, expect, got); - abort(); - } + if (strcmp(got, expect)) { + fprintf(stderr, "Expected %s '%s', got '%s'\n", + what, expect, got); + abort(); + } } static void match_int(const char *what, int expect, int got) { - if (got != expect) { - fprintf(stderr, "Expected %s '%d', got '%d'\n", - what, expect, got); - abort(); - } + if (got != expect) { + fprintf(stderr, "Expected %s '%d', got '%d'\n", + what, expect, got); + abort(); + } } static void parse_success(const char *url, - const char *user, - const char *password, - const char *host, - int port, - const char *vhost) + const char *user, + const char *password, + const char *host, + int port, + const char *vhost) { - char *s = strdup(url); - struct amqp_connection_info ci; - int res; - - amqp_default_connection_info(&ci); - res = amqp_parse_url(s, &ci); - if (res) { - fprintf(stderr, - "Expected to successfully parse URL, but didn't: %s (%s)\n", - url, amqp_error_string(-res)); - abort(); - } - - match_string("user", user, ci.user); - match_string("password", password, ci.password); - match_string("host", host, ci.host); - match_int("port", port, ci.port); - match_string("vhost", vhost, ci.vhost); - - free(s); + char *s = strdup(url); + struct amqp_connection_info ci; + int res; + + amqp_default_connection_info(&ci); + res = amqp_parse_url(s, &ci); + if (res) { + fprintf(stderr, + "Expected to successfully parse URL, but didn't: %s (%s)\n", + url, amqp_error_string(-res)); + abort(); + } + + match_string("user", user, ci.user); + match_string("password", password, ci.password); + match_string("host", host, ci.host); + match_int("port", port, ci.port); + match_string("vhost", vhost, ci.vhost); + + free(s); } static void parse_fail(const char *url) { - char *s = strdup(url); - struct amqp_connection_info ci; - - amqp_default_connection_info(&ci); - if (amqp_parse_url(s, &ci) >= 0) { - fprintf(stderr, - "Expected to fail parsing URL, but didn't: %s\n", - url); - abort(); - } - - free(s); + char *s = strdup(url); + struct amqp_connection_info ci; + + amqp_default_connection_info(&ci); + if (amqp_parse_url(s, &ci) >= 0) { + fprintf(stderr, + "Expected to fail parsing URL, but didn't: %s\n", + url); + abort(); + } + + free(s); } int main(void) { - /* From the spec */ - parse_success("amqp://user:pass@host:10000/vhost", "user", "pass", - "host", 10000, "vhost"); - parse_success("amqp://user%61:%61pass@ho%61st:10000/v%2fhost", - "usera", "apass", "hoast", 10000, "v/host"); - parse_success("amqp://", "guest", "guest", "localhost", 5672, "/"); - parse_success("amqp://:@/", "", "", "localhost", 5672, ""); - parse_success("amqp://user@", "user", "guest", "localhost", 5672, "/"); - parse_success("amqp://user:pass@", "user", "pass", - "localhost", 5672, "/"); - parse_success("amqp://host", "guest", "guest", "host", 5672, "/"); - parse_success("amqp://:10000", "guest", "guest", "localhost", 10000, - "/"); - parse_success("amqp:///vhost", "guest", "guest", "localhost", 5672, - "vhost"); - parse_success("amqp://host/", "guest", "guest", "host", 5672, ""); - parse_success("amqp://host/%2f", "guest", "guest", "host", 5672, "/"); - parse_success("amqp://[::1]", "guest", "guest", "::1", 5672, "/"); - - /* Various other success cases */ - parse_success("amqp://host:100", "guest", "guest", "host", 100, "/"); - parse_success("amqp://[::1]:100", "guest", "guest", "::1", 100, "/"); - - parse_success("amqp://host/blah", "guest", "guest", - "host", 5672, "blah"); - parse_success("amqp://host:100/blah", "guest", "guest", - "host", 100, "blah"); - parse_success("amqp://:100/blah", "guest", "guest", - "localhost", 100, "blah"); - parse_success("amqp://[::1]/blah", "guest", "guest", - "::1", 5672, "blah"); - parse_success("amqp://[::1]:100/blah", "guest", "guest", - "::1", 100, "blah"); - - parse_success("amqp://user:pass@host", "user", "pass", - "host", 5672, "/"); - parse_success("amqp://user:pass@host:100", "user", "pass", - "host", 100, "/"); - parse_success("amqp://user:pass@:100", "user", "pass", - "localhost", 100, "/"); - parse_success("amqp://user:pass@[::1]", "user", "pass", - "::1", 5672, "/"); - parse_success("amqp://user:pass@[::1]:100", "user", "pass", - "::1", 100, "/"); - - /* Various failure cases */ - parse_fail("http://www.rabbitmq.com"); - parse_fail("amqp://foo:bar:baz"); - parse_fail("amqp://foo[::1]"); - parse_fail("amqp://foo:[::1]"); - parse_fail("amqp://[::1]foo"); - parse_fail("amqp://foo:1000xyz"); - parse_fail("amqp://foo:1000000"); - parse_fail("amqp://foo/bar/baz"); - - parse_fail("amqp://foo%1"); - parse_fail("amqp://foo%1x"); - parse_fail("amqp://foo%xy"); - - return 0; + /* From the spec */ + parse_success("amqp://user:pass@host:10000/vhost", "user", "pass", + "host", 10000, "vhost"); + parse_success("amqp://user%61:%61pass@ho%61st:10000/v%2fhost", + "usera", "apass", "hoast", 10000, "v/host"); + parse_success("amqp://", "guest", "guest", "localhost", 5672, "/"); + parse_success("amqp://:@/", "", "", "localhost", 5672, ""); + parse_success("amqp://user@", "user", "guest", "localhost", 5672, "/"); + parse_success("amqp://user:pass@", "user", "pass", + "localhost", 5672, "/"); + parse_success("amqp://host", "guest", "guest", "host", 5672, "/"); + parse_success("amqp://:10000", "guest", "guest", "localhost", 10000, + "/"); + parse_success("amqp:///vhost", "guest", "guest", "localhost", 5672, + "vhost"); + parse_success("amqp://host/", "guest", "guest", "host", 5672, ""); + parse_success("amqp://host/%2f", "guest", "guest", "host", 5672, "/"); + parse_success("amqp://[::1]", "guest", "guest", "::1", 5672, "/"); + + /* Various other success cases */ + parse_success("amqp://host:100", "guest", "guest", "host", 100, "/"); + parse_success("amqp://[::1]:100", "guest", "guest", "::1", 100, "/"); + + parse_success("amqp://host/blah", "guest", "guest", + "host", 5672, "blah"); + parse_success("amqp://host:100/blah", "guest", "guest", + "host", 100, "blah"); + parse_success("amqp://:100/blah", "guest", "guest", + "localhost", 100, "blah"); + parse_success("amqp://[::1]/blah", "guest", "guest", + "::1", 5672, "blah"); + parse_success("amqp://[::1]:100/blah", "guest", "guest", + "::1", 100, "blah"); + + parse_success("amqp://user:pass@host", "user", "pass", + "host", 5672, "/"); + parse_success("amqp://user:pass@host:100", "user", "pass", + "host", 100, "/"); + parse_success("amqp://user:pass@:100", "user", "pass", + "localhost", 100, "/"); + parse_success("amqp://user:pass@[::1]", "user", "pass", + "::1", 5672, "/"); + parse_success("amqp://user:pass@[::1]:100", "user", "pass", + "::1", 100, "/"); + + /* Various failure cases */ + parse_fail("http://www.rabbitmq.com"); + parse_fail("amqp://foo:bar:baz"); + parse_fail("amqp://foo[::1]"); + parse_fail("amqp://foo:[::1]"); + parse_fail("amqp://[::1]foo"); + parse_fail("amqp://foo:1000xyz"); + parse_fail("amqp://foo:1000000"); + parse_fail("amqp://foo/bar/baz"); + + parse_fail("amqp://foo%1"); + parse_fail("amqp://foo%1x"); + parse_fail("amqp://foo%xy"); + + return 0; } diff --git a/tests/test_tables.c b/tests/test_tables.c index 404a6d8..21e1c01 100644 --- a/tests/test_tables.c +++ b/tests/test_tables.c @@ -48,20 +48,21 @@ void die(const char *fmt, ...) { - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, "\n"); - abort(); + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + abort(); } static void dump_indent(int indent, FILE *out) { int i; - for (i = 0; i < indent; i++) + for (i = 0; i < indent; i++) { fputc(' ', out); + } } static void dump_value(int indent, amqp_field_value_t v, FILE *out) @@ -114,26 +115,28 @@ static void dump_value(int indent, amqp_field_value_t v, FILE *out) case AMQP_FIELD_KIND_DECIMAL: fprintf(out, " %d:::%u\n", v.value.decimal.decimals, - v.value.decimal.value); + v.value.decimal.value); break; case AMQP_FIELD_KIND_UTF8: fprintf(out, " %.*s\n", (int)v.value.bytes.len, - (char *)v.value.bytes.bytes); + (char *)v.value.bytes.bytes); break; case AMQP_FIELD_KIND_BYTES: fputc(' ', out); - for (i = 0; i < (int)v.value.bytes.len; i++) + for (i = 0; i < (int)v.value.bytes.len; i++) { fprintf(out, "%02x", ((char *) v.value.bytes.bytes)[i]); + } fputc('\n', out); break; case AMQP_FIELD_KIND_ARRAY: fputc('\n', out); - for (i = 0; i < v.value.array.num_entries; i++) + for (i = 0; i < v.value.array.num_entries; i++) { dump_value(indent + 2, v.value.array.entries[i], out); + } break; @@ -146,8 +149,8 @@ static void dump_value(int indent, amqp_field_value_t v, FILE *out) for (i = 0; i < v.value.table.num_entries; i++) { dump_indent(indent + 2, out); fprintf(out, "%.*s ->\n", - (int)v.value.table.entries[i].key.len, - (char *)v.value.table.entries[i].key.bytes); + (int)v.value.table.entries[i].key.len, + (char *)v.value.table.entries[i].key.bytes); dump_value(indent + 4, v.value.table.entries[i].value, out); } @@ -361,9 +364,10 @@ static void test_table_codec(FILE *out) decoding_bytes.bytes = pre_encoded_table; result = amqp_decode_table(decoding_bytes, &pool, &decoded, - &decoding_offset); - if (result < 0) + &decoding_offset); + if (result < 0) { die("Table decoding failed: %s", amqp_error_string(-result)); + } fprintf(out, "BBBBBBBBBB\n"); @@ -386,16 +390,18 @@ static void test_table_codec(FILE *out) encoding_result.bytes = &encoding_buffer[0]; result = amqp_encode_table(encoding_result, &table, &offset); - if (result < 0) + if (result < 0) { die("Table encoding failed: %s", amqp_error_string(-result)); + } if (offset != sizeof(pre_encoded_table)) die("Offset should be %ld, was %ld", (long)sizeof(pre_encoded_table), - (long)offset); + (long)offset); result = memcmp(pre_encoded_table, encoding_buffer, offset); - if (result != 0) + if (result != 0) { die("Table encoding differed", result); + } } empty_amqp_pool(&pool); @@ -417,12 +423,14 @@ static int compare_files(FILE *f1_in, FILE *f2_in) size_t f2_got = fread(f2_buf, 1, CHUNK_SIZE, f2_in); res = memcmp(f1_buf, f2_buf, f1_got < f2_got ? f1_got : f2_got); - if (res) + if (res) { break; + } if (f1_got < CHUNK_SIZE || f2_got < CHUNK_SIZE) { - if (f1_got != f2_got) + if (f1_got != f2_got) { res = (f1_got < f2_got ? -1 : 1); + } break; } } @@ -439,24 +447,28 @@ int main(void) char *expected_path; out = tmpfile(); - if (out == NULL) + if (out == NULL) { die("failed to create temporary file: %s", strerror(errno)); + } test_table_codec(out); fprintf(out, "----------\n"); test_dump_value(out); - if (srcdir == NULL) + if (srcdir == NULL) { srcdir = "."; + } expected_path = malloc(strlen(srcdir) + strlen(expected_file_name) + 2); sprintf(expected_path, "%s/%s", srcdir, expected_file_name); expected = fopen(expected_path, "r"); - if (!expected) + if (!expected) { die("failed to open %s: %s", expected_path, strerror(errno)); + } - if (compare_files(expected, out)) + if (compare_files(expected, out)) { die("output file did not have expected contents"); + } fclose(out); fclose(expected); diff --git a/tools/common.c b/tools/common.c index c5b2a77..bead858 100644 --- a/tools/common.c +++ b/tools/common.c @@ -54,113 +54,116 @@ void die(const char *fmt, ...) { - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, "\n"); - exit(1); + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, "\n"); + exit(1); } void die_errno(int err, const char *fmt, ...) { - va_list ap; + va_list ap; - if (err == 0) - return; + if (err == 0) { + return; + } - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, ": %s\n", strerror(err)); - exit(1); + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", strerror(err)); + exit(1); } void die_amqp_error(int err, const char *fmt, ...) { - va_list ap; - char *errstr; - - if (err >= 0) - return; - - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, ": %s\n", errstr = amqp_error_string(-err)); - free(errstr); - exit(1); + va_list ap; + char *errstr; + + if (err >= 0) { + return; + } + + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", errstr = amqp_error_string(-err)); + free(errstr); + exit(1); } char *amqp_server_exception_string(amqp_rpc_reply_t r) { - int res; - char *s; - - switch (r.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t *m - = (amqp_connection_close_t *)r.reply.decoded; - res = asprintf(&s, "server connection error %d, message: %.*s", - m->reply_code, - (int)m->reply_text.len, - (char *)m->reply_text.bytes); - break; - } - - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t *m - = (amqp_channel_close_t *)r.reply.decoded; - res = asprintf(&s, "server channel error %d, message: %.*s", - m->reply_code, - (int)m->reply_text.len, - (char *)m->reply_text.bytes); - break; - } - - default: - res = asprintf(&s, "unknown server error, method id 0x%08X", - r.reply.id); - break; - } - - return res >= 0 ? s : NULL; + int res; + char *s; + + switch (r.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m + = (amqp_connection_close_t *)r.reply.decoded; + res = asprintf(&s, "server connection error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m + = (amqp_channel_close_t *)r.reply.decoded; + res = asprintf(&s, "server channel error %d, message: %.*s", + m->reply_code, + (int)m->reply_text.len, + (char *)m->reply_text.bytes); + break; + } + + default: + res = asprintf(&s, "unknown server error, method id 0x%08X", + r.reply.id); + break; + } + + return res >= 0 ? s : NULL; } char *amqp_rpc_reply_string(amqp_rpc_reply_t r) { - switch (r.reply_type) { - case AMQP_RESPONSE_NORMAL: - return strdup("normal response"); + switch (r.reply_type) { + case AMQP_RESPONSE_NORMAL: + return strdup("normal response"); - case AMQP_RESPONSE_NONE: - return strdup("missing RPC reply type"); + case AMQP_RESPONSE_NONE: + return strdup("missing RPC reply type"); - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - return amqp_error_string(r.library_error); + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + return amqp_error_string(r.library_error); - case AMQP_RESPONSE_SERVER_EXCEPTION: - return amqp_server_exception_string(r); + case AMQP_RESPONSE_SERVER_EXCEPTION: + return amqp_server_exception_string(r); - default: - abort(); - } + default: + abort(); + } } void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) { - va_list ap; - char *errstr; - - if (r.reply_type == AMQP_RESPONSE_NORMAL) - return; - - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r)); - free(errstr); - exit(1); + va_list ap; + char *errstr; + + if (r.reply_type == AMQP_RESPONSE_NORMAL) { + return; + } + + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r)); + free(errstr); + exit(1); } static char *amqp_url; @@ -172,267 +175,299 @@ static char *amqp_password; const char *connect_options_title = "Connection options"; struct poptOption connect_options[] = { - {"url", 'u', POPT_ARG_STRING, &amqp_url, 0, - "the AMQP URL to connect to", "amqp://..."}, - {"server", 's', POPT_ARG_STRING, &amqp_server, 0, - "the AMQP server to connect to", "hostname"}, - {"port", 0, POPT_ARG_INT, &amqp_port, 0, - "the port to connect on", "port" }, - {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, - "the vhost to use when connecting", "vhost"}, - {"username", 0, POPT_ARG_STRING, &amqp_username, 0, - "the username to login with", "username"}, - {"password", 0, POPT_ARG_STRING, &amqp_password, 0, - "the password to login with", "password"}, - { NULL, '\0', 0, NULL, 0, NULL, NULL } + { + "url", 'u', POPT_ARG_STRING, &amqp_url, 0, + "the AMQP URL to connect to", "amqp://..." + }, + { + "server", 's', POPT_ARG_STRING, &amqp_server, 0, + "the AMQP server to connect to", "hostname" + }, + { + "port", 0, POPT_ARG_INT, &amqp_port, 0, + "the port to connect on", "port" + }, + { + "vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0, + "the vhost to use when connecting", "vhost" + }, + { + "username", 0, POPT_ARG_STRING, &amqp_username, 0, + "the username to login with", "username" + }, + { + "password", 0, POPT_ARG_STRING, &amqp_password, 0, + "the password to login with", "password" + }, + { NULL, '\0', 0, NULL, 0, NULL, NULL } }; static void init_connection_info(struct amqp_connection_info *ci) { - struct amqp_connection_info defaults; + struct amqp_connection_info defaults; - ci->user = NULL; - ci->password = NULL; - ci->host = NULL; - ci->port = -1; - ci->vhost = NULL; - ci->user = NULL; + ci->user = NULL; + ci->password = NULL; + ci->host = NULL; + ci->port = -1; + ci->vhost = NULL; + ci->user = NULL; - if (amqp_url) - die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), - "Parsing URL '%s'", amqp_url); + if (amqp_url) { + die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), + "Parsing URL '%s'", amqp_url); + } - if (amqp_server) { + if (amqp_server) { char *colon; - if (ci->host) - die("both --server and --url options specify" - " server host"); - - /* parse the server string into a hostname and a port */ - colon = strchr(amqp_server, ':'); - if (colon) { - char *port_end; - size_t host_len; - - /* Deprecate specifying the port number with the - --server option, because it is not ipv6 friendly. - --url now allows connection options to be - specificied concisely. */ - fprintf(stderr, "Specifying the port number with" - " --server is deprecated\n"); - - host_len = colon - amqp_server; - ci->host = malloc(host_len + 1); - memcpy(ci->host, amqp_server, host_len); - ci->host[host_len] = 0; - - if (ci->port >= 0) - die("both --server and --url options specify" - " server port"); - if (amqp_port >= 0) - die("both --server and --port options specify" - " server port"); - - ci->port = strtol(colon+1, &port_end, 10); - if (ci->port < 0 - || ci->port > 65535 - || port_end == colon+1 - || *port_end != 0) - die("bad server port number in '%s'", - amqp_server); - } - } - - if (amqp_port >= 0) { - if (ci->port >= 0) - die("both --port and --url options specify" - " server port"); - - ci->port = amqp_port; - } - - if (amqp_username) { - if (ci->user) - die("both --username and --url options specify" - " AMQP username"); - - ci->user = amqp_username; - } - - if (amqp_password) { - if (ci->password) - die("both --password and --url options specify" - " AMQP password"); - - ci->password = amqp_password; - } - - if (amqp_vhost) { - if (ci->vhost) - die("both --vhost and --url options specify" - " AMQP vhost"); - - ci->vhost = amqp_vhost; - } - - amqp_default_connection_info(&defaults); - - if (!ci->user) - ci->user = defaults.user; - if (!ci->password) - ci->password = defaults.password; - if (!ci->host) - ci->host = defaults.host; - if (ci->port < 0) - ci->port = defaults.port; - if (!ci->vhost) - ci->vhost = defaults.vhost; + if (ci->host) { + die("both --server and --url options specify" + " server host"); + } + + /* parse the server string into a hostname and a port */ + colon = strchr(amqp_server, ':'); + if (colon) { + char *port_end; + size_t host_len; + + /* Deprecate specifying the port number with the + --server option, because it is not ipv6 friendly. + --url now allows connection options to be + specificied concisely. */ + fprintf(stderr, "Specifying the port number with" + " --server is deprecated\n"); + + host_len = colon - amqp_server; + ci->host = malloc(host_len + 1); + memcpy(ci->host, amqp_server, host_len); + ci->host[host_len] = 0; + + if (ci->port >= 0) { + die("both --server and --url options specify" + " server port"); + } + if (amqp_port >= 0) { + die("both --server and --port options specify" + " server port"); + } + + ci->port = strtol(colon+1, &port_end, 10); + if (ci->port < 0 + || ci->port > 65535 + || port_end == colon+1 + || *port_end != 0) { + die("bad server port number in '%s'", + amqp_server); + } + } + } + + if (amqp_port >= 0) { + if (ci->port >= 0) { + die("both --port and --url options specify" + " server port"); + } + + ci->port = amqp_port; + } + + if (amqp_username) { + if (ci->user) { + die("both --username and --url options specify" + " AMQP username"); + } + + ci->user = amqp_username; + } + + if (amqp_password) { + if (ci->password) { + die("both --password and --url options specify" + " AMQP password"); + } + + ci->password = amqp_password; + } + + if (amqp_vhost) { + if (ci->vhost) { + die("both --vhost and --url options specify" + " AMQP vhost"); + } + + ci->vhost = amqp_vhost; + } + + amqp_default_connection_info(&defaults); + + if (!ci->user) { + ci->user = defaults.user; + } + if (!ci->password) { + ci->password = defaults.password; + } + if (!ci->host) { + ci->host = defaults.host; + } + if (ci->port < 0) { + ci->port = defaults.port; + } + if (!ci->vhost) { + ci->vhost = defaults.vhost; + } } amqp_connection_state_t make_connection(void) { - int s; - struct amqp_connection_info ci; - amqp_connection_state_t conn; + int s; + struct amqp_connection_info ci; + amqp_connection_state_t conn; - init_connection_info(&ci); + init_connection_info(&ci); - s = amqp_open_socket(ci.host, ci.port); - die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port); + s = amqp_open_socket(ci.host, ci.port); + die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port); - conn = amqp_new_connection(); - amqp_set_sockfd(conn, s); + conn = amqp_new_connection(); + amqp_set_sockfd(conn, s); - die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0, - AMQP_SASL_METHOD_PLAIN, - ci.user, ci.password), - "logging in to AMQP server"); + die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0, + AMQP_SASL_METHOD_PLAIN, + ci.user, ci.password), + "logging in to AMQP server"); - if (!amqp_channel_open(conn, 1)) - die_rpc(amqp_get_rpc_reply(conn), "opening channel"); + if (!amqp_channel_open(conn, 1)) { + die_rpc(amqp_get_rpc_reply(conn), "opening channel"); + } - return conn; + return conn; } void close_connection(amqp_connection_state_t conn) { - int res; - die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), - "closing channel"); - die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), - "closing connection"); - - res = amqp_destroy_connection(conn); - die_amqp_error(res, "closing connection"); + int res; + die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), + "closing channel"); + die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), + "closing connection"); + + res = amqp_destroy_connection(conn); + die_amqp_error(res, "closing connection"); } amqp_bytes_t read_all(int fd) { - size_t space = 4096; - amqp_bytes_t bytes; - - bytes.bytes = malloc(space); - bytes.len = 0; - - for (;;) { - ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, - space-bytes.len); - if (res == 0) - break; - - if (res < 0) { - if (errno == EINTR) - continue; - - die_errno(errno, "reading"); - } - - bytes.len += res; - if (bytes.len == space) { - space *= 2; - bytes.bytes = realloc(bytes.bytes, space); - } - } - - return bytes; + size_t space = 4096; + amqp_bytes_t bytes; + + bytes.bytes = malloc(space); + bytes.len = 0; + + for (;;) { + ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, + space-bytes.len); + if (res == 0) { + break; + } + + if (res < 0) { + if (errno == EINTR) { + continue; + } + + die_errno(errno, "reading"); + } + + bytes.len += res; + if (bytes.len == space) { + space *= 2; + bytes.bytes = realloc(bytes.bytes, space); + } + } + + return bytes; } void write_all(int fd, amqp_bytes_t data) { - while (data.len > 0) { - ssize_t res = write(fd, data.bytes, data.len); - if (res < 0) - die_errno(errno, "write"); - - data.len -= res; - data.bytes = (char *)data.bytes + res; - } + while (data.len > 0) { + ssize_t res = write(fd, data.bytes, data.len); + if (res < 0) { + die_errno(errno, "write"); + } + + data.len -= res; + data.bytes = (char *)data.bytes + res; + } } void copy_body(amqp_connection_state_t conn, int fd) { - size_t body_remaining; - amqp_frame_t frame; - - int res = amqp_simple_wait_frame(conn, &frame); - die_amqp_error(res, "waiting for header frame"); - if (frame.frame_type != AMQP_FRAME_HEADER) - die("expected header, got frame type 0x%X", - frame.frame_type); - - body_remaining = frame.payload.properties.body_size; - while (body_remaining) { - res = amqp_simple_wait_frame(conn, &frame); - die_amqp_error(res, "waiting for body frame"); - if (frame.frame_type != AMQP_FRAME_BODY) - die("expected body, got frame type 0x%X", - frame.frame_type); - - write_all(fd, frame.payload.body_fragment); - body_remaining -= frame.payload.body_fragment.len; - } + size_t body_remaining; + amqp_frame_t frame; + + int res = amqp_simple_wait_frame(conn, &frame); + die_amqp_error(res, "waiting for header frame"); + if (frame.frame_type != AMQP_FRAME_HEADER) { + die("expected header, got frame type 0x%X", + frame.frame_type); + } + + body_remaining = frame.payload.properties.body_size; + while (body_remaining) { + res = amqp_simple_wait_frame(conn, &frame); + die_amqp_error(res, "waiting for body frame"); + if (frame.frame_type != AMQP_FRAME_BODY) { + die("expected body, got frame type 0x%X", + frame.frame_type); + } + + write_all(fd, frame.payload.body_fragment); + body_remaining -= frame.payload.body_fragment.len; + } } poptContext process_options(int argc, const char **argv, - struct poptOption *options, - const char *help) + struct poptOption *options, + const char *help) { - int c; - poptContext opts = poptGetContext(NULL, argc, argv, options, 0); - poptSetOtherOptionHelp(opts, help); - - while ((c = poptGetNextOpt(opts)) >= 0) { - /* no options require explicit handling */ - } - - if (c < -1) { - fprintf(stderr, "%s: %s\n", - poptBadOption(opts, POPT_BADOPTION_NOALIAS), - poptStrerror(c)); - poptPrintUsage(opts, stderr, 0); - exit(1); - } - - return opts; + int c; + poptContext opts = poptGetContext(NULL, argc, argv, options, 0); + poptSetOtherOptionHelp(opts, help); + + while ((c = poptGetNextOpt(opts)) >= 0) { + /* no options require explicit handling */ + } + + if (c < -1) { + fprintf(stderr, "%s: %s\n", + poptBadOption(opts, POPT_BADOPTION_NOALIAS), + poptStrerror(c)); + poptPrintUsage(opts, stderr, 0); + exit(1); + } + + return opts; } void process_all_options(int argc, const char **argv, - struct poptOption *options) + struct poptOption *options) { - poptContext opts = process_options(argc, argv, options, - "[OPTIONS]..."); - const char *opt = poptPeekArg(opts); + poptContext opts = process_options(argc, argv, options, + "[OPTIONS]..."); + const char *opt = poptPeekArg(opts); - if (opt) { - fprintf(stderr, "unexpected operand: %s\n", opt); - poptPrintUsage(opts, stderr, 0); - exit(1); - } + if (opt) { + fprintf(stderr, "unexpected operand: %s\n", opt); + poptPrintUsage(opts, stderr, 0); + exit(1); + } - poptFreeContext(opts); + poptFreeContext(opts); } amqp_bytes_t cstring_bytes(const char *str) { - return str ? amqp_cstring_bytes(str) : amqp_empty_bytes; + return str ? amqp_cstring_bytes(str) : amqp_empty_bytes; } diff --git a/tools/common.h b/tools/common.h index 0a9af9d..77979ee 100644 --- a/tools/common.h +++ b/tools/common.h @@ -42,13 +42,13 @@ extern char *amqp_server_exception_string(amqp_rpc_reply_t r); extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r); extern void die(const char *fmt, ...) - __attribute__ ((format (printf, 1, 2))); +__attribute__ ((format (printf, 1, 2))); extern void die_errno(int err, const char *fmt, ...) - __attribute__ ((format (printf, 2, 3))); +__attribute__ ((format (printf, 2, 3))); extern void die_amqp_error(int err, const char *fmt, ...) - __attribute__ ((format (printf, 2, 3))); +__attribute__ ((format (printf, 2, 3))); extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) - __attribute__ ((format (printf, 2, 3))); +__attribute__ ((format (printf, 2, 3))); extern const char *connect_options_title; extern struct poptOption connect_options[]; @@ -61,12 +61,12 @@ extern void write_all(int fd, amqp_bytes_t data); extern void copy_body(amqp_connection_state_t conn, int fd); #define INCLUDE_OPTIONS(options) \ - {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL} + {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL} extern poptContext process_options(int argc, const char **argv, - struct poptOption *options, - const char *help); + struct poptOption *options, + const char *help); extern void process_all_options(int argc, const char **argv, - struct poptOption *options); + struct poptOption *options); extern amqp_bytes_t cstring_bytes(const char *str); diff --git a/tools/consume.c b/tools/consume.c index d6ddfa8..c52311e 100644 --- a/tools/consume.c +++ b/tools/consume.c @@ -45,164 +45,179 @@ use the same escaping conventions as rabbitmqctl. */ static char *stringify_bytes(amqp_bytes_t bytes) { - /* We will need up to 4 chars per byte, plus the terminating 0 */ - char *res = malloc(bytes.len * 4 + 1); - uint8_t *data = bytes.bytes; - char *p = res; - size_t i; - - for (i = 0; i < bytes.len; i++) { - if (data[i] >= 32 && data[i] != 127) { - *p++ = data[i]; - } - else { - *p++ = '\\'; - *p++ = '0' + (data[i] >> 6); - *p++ = '0' + (data[i] >> 3 & 0x7); - *p++ = '0' + (data[i] & 0x7); - } - } - - *p = 0; - return res; + /* We will need up to 4 chars per byte, plus the terminating 0 */ + char *res = malloc(bytes.len * 4 + 1); + uint8_t *data = bytes.bytes; + char *p = res; + size_t i; + + for (i = 0; i < bytes.len; i++) { + if (data[i] >= 32 && data[i] != 127) { + *p++ = data[i]; + } else { + *p++ = '\\'; + *p++ = '0' + (data[i] >> 6); + *p++ = '0' + (data[i] >> 3 & 0x7); + *p++ = '0' + (data[i] & 0x7); + } + } + + *p = 0; + return res; } static amqp_bytes_t setup_queue(amqp_connection_state_t conn, - char *queue, char *exchange, - char *routing_key, int declare) + char *queue, char *exchange, + char *routing_key, int declare) { - amqp_bytes_t queue_bytes = cstring_bytes(queue); - - /* if an exchange name wasn't provided, check that we don't - have options that require it. */ - if (!exchange && routing_key) { - fprintf(stderr, "--routing-key option requires an exchange" - " name to be provided with --exchange\n"); - exit(1); - } - - if (!queue || exchange || declare) { - /* Declare the queue as auto-delete. */ - amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1, - queue_bytes, 0, 0, 1, 1, - amqp_empty_table); - if (!res) - die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); - - if (!queue) { - /* the server should have provided a queue name */ - char *sq; - queue_bytes = amqp_bytes_malloc_dup(res->queue); - sq = stringify_bytes(queue_bytes); - fprintf(stderr, "Server provided queue name: %s\n", - sq); - free(sq); - } - - /* Bind to an exchange if requested */ - if (exchange) { - amqp_bytes_t eb = amqp_cstring_bytes(exchange); - if (!amqp_queue_bind(conn, 1, queue_bytes, eb, - cstring_bytes(routing_key), - amqp_empty_table)) - die_rpc(amqp_get_rpc_reply(conn), - "queue.bind"); - } - } - - return queue_bytes; + amqp_bytes_t queue_bytes = cstring_bytes(queue); + + /* if an exchange name wasn't provided, check that we don't + have options that require it. */ + if (!exchange && routing_key) { + fprintf(stderr, "--routing-key option requires an exchange" + " name to be provided with --exchange\n"); + exit(1); + } + + if (!queue || exchange || declare) { + /* Declare the queue as auto-delete. */ + amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1, + queue_bytes, 0, 0, 1, 1, + amqp_empty_table); + if (!res) { + die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + } + + if (!queue) { + /* the server should have provided a queue name */ + char *sq; + queue_bytes = amqp_bytes_malloc_dup(res->queue); + sq = stringify_bytes(queue_bytes); + fprintf(stderr, "Server provided queue name: %s\n", + sq); + free(sq); + } + + /* Bind to an exchange if requested */ + if (exchange) { + amqp_bytes_t eb = amqp_cstring_bytes(exchange); + if (!amqp_queue_bind(conn, 1, queue_bytes, eb, + cstring_bytes(routing_key), + amqp_empty_table)) + die_rpc(amqp_get_rpc_reply(conn), + "queue.bind"); + } + } + + return queue_bytes; } static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue, - int no_ack, int count, const char * const *argv) + int no_ack, int count, const char *const *argv) { - int i; - - /* If there is a limit, set the qos to match */ - if (count > 0 && count <= 65535 - && !amqp_basic_qos(conn, 1, 0, count, 0)) - die_rpc(amqp_get_rpc_reply(conn), "basic.qos"); - - if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, - 0, amqp_empty_table)) - die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); - - for (i = 0; count < 0 || i < count; i++) { - amqp_frame_t frame; - struct pipeline pl; - uint64_t delivery_tag; - int res = amqp_simple_wait_frame(conn, &frame); - die_amqp_error(res, "waiting for header frame"); - - if (frame.frame_type != AMQP_FRAME_METHOD - || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) - continue; - - amqp_basic_deliver_t *deliver - = (amqp_basic_deliver_t *)frame.payload.method.decoded; - delivery_tag = deliver->delivery_tag; - - pipeline(argv, &pl); - copy_body(conn, pl.infd); - - if (finish_pipeline(&pl) && !no_ack) - die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, - 0), - "basic.ack"); - - amqp_maybe_release_buffers(conn); - } + int i; + + /* If there is a limit, set the qos to match */ + if (count > 0 && count <= 65535 + && !amqp_basic_qos(conn, 1, 0, count, 0)) { + die_rpc(amqp_get_rpc_reply(conn), "basic.qos"); + } + + if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack, + 0, amqp_empty_table)) { + die_rpc(amqp_get_rpc_reply(conn), "basic.consume"); + } + + for (i = 0; count < 0 || i < count; i++) { + amqp_frame_t frame; + struct pipeline pl; + uint64_t delivery_tag; + int res = amqp_simple_wait_frame(conn, &frame); + die_amqp_error(res, "waiting for header frame"); + + if (frame.frame_type != AMQP_FRAME_METHOD + || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { + continue; + } + + amqp_basic_deliver_t *deliver + = (amqp_basic_deliver_t *)frame.payload.method.decoded; + delivery_tag = deliver->delivery_tag; + + pipeline(argv, &pl); + copy_body(conn, pl.infd); + + if (finish_pipeline(&pl) && !no_ack) + die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag, + 0), + "basic.ack"); + + amqp_maybe_release_buffers(conn); + } } int main(int argc, const char **argv) { - poptContext opts; - amqp_connection_state_t conn; - const char * const *cmd_argv; - char *queue = NULL; - char *exchange = NULL; - char *routing_key = NULL; - int declare = 0; - int no_ack = 0; - int count = -1; - amqp_bytes_t queue_bytes; - - struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - {"queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue to consume from", "queue"}, - {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, - "bind the queue to this exchange", "exchange"}, - {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, - "the routing key to bind with", "routing key"}, - {"declare", 'd', POPT_ARG_NONE, &declare, 0, - "declare an exclusive queue", NULL}, - {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, - "consume in no-ack mode", NULL}, - {"count", 'c', POPT_ARG_INT, &count, 0, - "stop consuming after this many messages are consumed", - "limit"}, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; - - opts = process_options(argc, argv, options, - "[OPTIONS]... "); - - cmd_argv = poptGetArgs(opts); - if (!cmd_argv || !cmd_argv[0]) { - fprintf(stderr, "consuming command not specified\n"); - poptPrintUsage(opts, stderr, 0); - goto error; - } - - conn = make_connection(); - queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare); - do_consume(conn, queue_bytes, no_ack, count, cmd_argv); - close_connection(conn); - return 0; + poptContext opts; + amqp_connection_state_t conn; + const char *const *cmd_argv; + char *queue = NULL; + char *exchange = NULL; + char *routing_key = NULL; + int declare = 0; + int no_ack = 0; + int count = -1; + amqp_bytes_t queue_bytes; + + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + { + "queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue to consume from", "queue" + }, + { + "exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "bind the queue to this exchange", "exchange" + }, + { + "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to bind with", "routing key" + }, + { + "declare", 'd', POPT_ARG_NONE, &declare, 0, + "declare an exclusive queue", NULL + }, + { + "no-ack", 'A', POPT_ARG_NONE, &no_ack, 0, + "consume in no-ack mode", NULL + }, + { + "count", 'c', POPT_ARG_INT, &count, 0, + "stop consuming after this many messages are consumed", + "limit" + }, + POPT_AUTOHELP + { NULL, '\0', 0, NULL, 0, NULL, NULL } + }; + + opts = process_options(argc, argv, options, + "[OPTIONS]... "); + + cmd_argv = poptGetArgs(opts); + if (!cmd_argv || !cmd_argv[0]) { + fprintf(stderr, "consuming command not specified\n"); + poptPrintUsage(opts, stderr, 0); + goto error; + } + + conn = make_connection(); + queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare); + do_consume(conn, queue_bytes, no_ack, count, cmd_argv); + close_connection(conn); + return 0; error: - poptFreeContext(opts); - return 1; + poptFreeContext(opts); + return 1; } diff --git a/tools/declare_queue.c b/tools/declare_queue.c index 26c3a68..25a94b1 100644 --- a/tools/declare_queue.c +++ b/tools/declare_queue.c @@ -44,41 +44,46 @@ int main(int argc, const char **argv) { - amqp_connection_state_t conn; - char *queue = NULL; - int durable = 0; + amqp_connection_state_t conn; + char *queue = NULL; + int durable = 0; - struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - {"queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue name to declare, or the empty string", "queue"}, - {"durable", 'd', POPT_ARG_VAL, &durable, 1, - "declare a durable queue", NULL}, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + { + "queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue name to declare, or the empty string", "queue" + }, + { + "durable", 'd', POPT_ARG_VAL, &durable, 1, + "declare a durable queue", NULL + }, + POPT_AUTOHELP + { NULL, '\0', 0, NULL, 0, NULL, NULL } + }; - process_all_options(argc, argv, options); + process_all_options(argc, argv, options); - if (queue == NULL) { - fprintf(stderr, "queue name not specified\n"); - return 1; - } + if (queue == NULL) { + fprintf(stderr, "queue name not specified\n"); + return 1; + } - conn = make_connection(); - { - amqp_queue_declare_ok_t *reply = amqp_queue_declare(conn, 1, - cstring_bytes(queue), - 0, - durable, - 0, - 0, - amqp_empty_table); - if (reply == NULL) - die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + conn = make_connection(); + { + amqp_queue_declare_ok_t *reply = amqp_queue_declare(conn, 1, + cstring_bytes(queue), + 0, + durable, + 0, + 0, + amqp_empty_table); + if (reply == NULL) { + die_rpc(amqp_get_rpc_reply(conn), "queue.declare"); + } - printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes); - } - close_connection(conn); - return 0; + printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes); + } + close_connection(conn); + return 0; } diff --git a/tools/delete_queue.c b/tools/delete_queue.c index cb92f7b..4ebbd94 100644 --- a/tools/delete_queue.c +++ b/tools/delete_queue.c @@ -44,41 +44,47 @@ int main(int argc, const char **argv) { - amqp_connection_state_t conn; - char *queue = NULL; - int if_unused = 0; - int if_empty = 0; + amqp_connection_state_t conn; + char *queue = NULL; + int if_unused = 0; + int if_empty = 0; - struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - {"queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue name to delete", "queue"}, - {"if-unused", 'u', POPT_ARG_VAL, &if_unused, 1, - "do not delete unless queue is unused", NULL}, - {"if-empty", 'e', POPT_ARG_VAL, &if_empty, 1, - "do not delete unless queue is empty", NULL}, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + { + "queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue name to delete", "queue" + }, + { + "if-unused", 'u', POPT_ARG_VAL, &if_unused, 1, + "do not delete unless queue is unused", NULL + }, + { + "if-empty", 'e', POPT_ARG_VAL, &if_empty, 1, + "do not delete unless queue is empty", NULL + }, + POPT_AUTOHELP + { NULL, '\0', 0, NULL, 0, NULL, NULL } + }; - process_all_options(argc, argv, options); + process_all_options(argc, argv, options); - if (queue == NULL || *queue == '\0') { - fprintf(stderr, "queue name not specified\n"); - return 1; - } + if (queue == NULL || *queue == '\0') { + fprintf(stderr, "queue name not specified\n"); + return 1; + } - conn = make_connection(); - { - amqp_queue_delete_ok_t *reply = amqp_queue_delete(conn, 1, - cstring_bytes(queue), - if_unused, - if_empty); - if (reply == NULL) { - die_rpc(amqp_get_rpc_reply(conn), "queue.delete"); - } - printf("%u\n", reply->message_count); - } - close_connection(conn); - return 0; + conn = make_connection(); + { + amqp_queue_delete_ok_t *reply = amqp_queue_delete(conn, 1, + cstring_bytes(queue), + if_unused, + if_empty); + if (reply == NULL) { + die_rpc(amqp_get_rpc_reply(conn), "queue.delete"); + } + printf("%u\n", reply->message_count); + } + close_connection(conn); + return 0; } diff --git a/tools/get.c b/tools/get.c index 888e069..bbcde72 100644 --- a/tools/get.c +++ b/tools/get.c @@ -41,40 +41,43 @@ static int do_get(amqp_connection_state_t conn, char *queue) { - amqp_rpc_reply_t r - = amqp_basic_get(conn, 1, cstring_bytes(queue), 1); - die_rpc(r, "basic.get"); + amqp_rpc_reply_t r + = amqp_basic_get(conn, 1, cstring_bytes(queue), 1); + die_rpc(r, "basic.get"); - if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) - return 0; + if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) { + return 0; + } - copy_body(conn, 1); - return 1; + copy_body(conn, 1); + return 1; } int main(int argc, const char **argv) { - amqp_connection_state_t conn; - char *queue = NULL; - int got_something; + amqp_connection_state_t conn; + char *queue = NULL; + int got_something; - struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - {"queue", 'q', POPT_ARG_STRING, &queue, 0, - "the queue to consume from", "queue"}, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + { + "queue", 'q', POPT_ARG_STRING, &queue, 0, + "the queue to consume from", "queue" + }, + POPT_AUTOHELP + { NULL, '\0', 0, NULL, 0, NULL, NULL } + }; - process_all_options(argc, argv, options); + process_all_options(argc, argv, options); - if (!queue) { - fprintf(stderr, "queue not specified\n"); - return 1; - } + if (!queue) { + fprintf(stderr, "queue not specified\n"); + return 1; + } - conn = make_connection(); - got_something = do_get(conn, queue); - close_connection(conn); - return got_something ? 0 : 2; + conn = make_connection(); + got_something = do_get(conn, queue); + close_connection(conn); + return got_something ? 0 : 2; } diff --git a/tools/publish.c b/tools/publish.c index 5ebc191..617b055 100644 --- a/tools/publish.c +++ b/tools/publish.c @@ -43,80 +43,94 @@ static void do_publish(amqp_connection_state_t conn, char *exchange, char *routing_key, - amqp_basic_properties_t *props, amqp_bytes_t body) + amqp_basic_properties_t *props, amqp_bytes_t body) { - int res = amqp_basic_publish(conn, 1, - cstring_bytes(exchange), - cstring_bytes(routing_key), - 0, 0, props, body); - die_amqp_error(res, "basic.publish"); + int res = amqp_basic_publish(conn, 1, + cstring_bytes(exchange), + cstring_bytes(routing_key), + 0, 0, props, body); + die_amqp_error(res, "basic.publish"); } int main(int argc, const char **argv) { - amqp_connection_state_t conn; - char *exchange = NULL; - char *routing_key = NULL; - char *content_type = NULL; - char *content_encoding = NULL; - char *body = NULL; - amqp_basic_properties_t props; - amqp_bytes_t body_bytes; - int delivery = 1; /* non-persistent by default */ - - struct poptOption options[] = { - INCLUDE_OPTIONS(connect_options), - {"exchange", 'e', POPT_ARG_STRING, &exchange, 0, - "the exchange to publish to", "exchange"}, - {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, - "the routing key to publish with", "routing key"}, - {"persistent", 'p', POPT_ARG_VAL, &delivery, 2, - "use the persistent delivery mode", NULL}, - {"content-type", 'C', POPT_ARG_STRING, &content_type, 0, - "the content-type for the message", "content type"}, - {"content-encoding", 'E', POPT_ARG_STRING, - &content_encoding, 0, - "the content-encoding for the message", "content encoding"}, - {"body", 'b', POPT_ARG_STRING, &body, 0, - "specify the message body", "body"}, - POPT_AUTOHELP - { NULL, '\0', 0, NULL, 0, NULL, NULL } - }; - - process_all_options(argc, argv, options); - - if (!exchange && !routing_key) { - fprintf(stderr, - "neither exchange nor routing key specified\n"); - return 1; - } - - memset(&props, 0, sizeof props); - props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG; - props.delivery_mode = 2; /* persistent delivery mode */ - - if (content_type) { - props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; - props.content_type = amqp_cstring_bytes(content_type); - } - - if (content_encoding) { - props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG; - props.content_encoding = amqp_cstring_bytes(content_encoding); - } - - conn = make_connection(); - - if (body) - body_bytes = amqp_cstring_bytes(body); - else - body_bytes = read_all(0); - - do_publish(conn, exchange, routing_key, &props, body_bytes); - - if (!body) - free(body_bytes.bytes); - - close_connection(conn); - return 0; + amqp_connection_state_t conn; + char *exchange = NULL; + char *routing_key = NULL; + char *content_type = NULL; + char *content_encoding = NULL; + char *body = NULL; + amqp_basic_properties_t props; + amqp_bytes_t body_bytes; + int delivery = 1; /* non-persistent by default */ + + struct poptOption options[] = { + INCLUDE_OPTIONS(connect_options), + { + "exchange", 'e', POPT_ARG_STRING, &exchange, 0, + "the exchange to publish to", "exchange" + }, + { + "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0, + "the routing key to publish with", "routing key" + }, + { + "persistent", 'p', POPT_ARG_VAL, &delivery, 2, + "use the persistent delivery mode", NULL + }, + { + "content-type", 'C', POPT_ARG_STRING, &content_type, 0, + "the content-type for the message", "content type" + }, + { + "content-encoding", 'E', POPT_ARG_STRING, + &content_encoding, 0, + "the content-encoding for the message", "content encoding" + }, + { + "body", 'b', POPT_ARG_STRING, &body, 0, + "specify the message body", "body" + }, + POPT_AUTOHELP + { NULL, '\0', 0, NULL, 0, NULL, NULL } + }; + + process_all_options(argc, argv, options); + + if (!exchange && !routing_key) { + fprintf(stderr, + "neither exchange nor routing key specified\n"); + return 1; + } + + memset(&props, 0, sizeof props); + props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG; + props.delivery_mode = 2; /* persistent delivery mode */ + + if (content_type) { + props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; + props.content_type = amqp_cstring_bytes(content_type); + } + + if (content_encoding) { + props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG; + props.content_encoding = amqp_cstring_bytes(content_encoding); + } + + conn = make_connection(); + + if (body) { + body_bytes = amqp_cstring_bytes(body); + } else { + body_bytes = read_all(0); + } + + do_publish(conn, exchange, routing_key, &props, body_bytes); + + if (!body) { + free(body_bytes.bytes); + } + + close_connection(conn); + return 0; } diff --git a/tools/unix/process.c b/tools/unix/process.c index a714928..249fba3 100644 --- a/tools/unix/process.c +++ b/tools/unix/process.c @@ -47,41 +47,45 @@ extern char **environ; void pipeline(const char *const *argv, struct pipeline *pl) { - posix_spawn_file_actions_t file_acts; + posix_spawn_file_actions_t file_acts; - int pipefds[2]; - if (pipe(pipefds)) - die_errno(errno, "pipe"); + int pipefds[2]; + if (pipe(pipefds)) { + die_errno(errno, "pipe"); + } - die_errno(posix_spawn_file_actions_init(&file_acts), - "posix_spawn_file_actions_init"); - die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), - "posix_spawn_file_actions_adddup2"); - die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), - "posix_spawn_file_actions_addclose"); - die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), - "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_init(&file_acts), + "posix_spawn_file_actions_init"); + die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0), + "posix_spawn_file_actions_adddup2"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]), + "posix_spawn_file_actions_addclose"); + die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]), + "posix_spawn_file_actions_addclose"); - die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, - (char * const *)argv, environ), - "posix_spawnp: %s", argv[0]); + die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL, + (char * const *)argv, environ), + "posix_spawnp: %s", argv[0]); - die_errno(posix_spawn_file_actions_destroy(&file_acts), - "posix_spawn_file_actions_destroy"); + die_errno(posix_spawn_file_actions_destroy(&file_acts), + "posix_spawn_file_actions_destroy"); - if (close(pipefds[0])) - die_errno(errno, "close"); + if (close(pipefds[0])) { + die_errno(errno, "close"); + } - pl->infd = pipefds[1]; + pl->infd = pipefds[1]; } int finish_pipeline(struct pipeline *pl) { - int status; + int status; - if (close(pl->infd)) - die_errno(errno, "close"); - if (waitpid(pl->pid, &status, 0) < 0) - die_errno(errno, "waitpid"); - return WIFEXITED(status) && WEXITSTATUS(status) == 0; + if (close(pl->infd)) { + die_errno(errno, "close"); + } + if (waitpid(pl->pid, &status, 0) < 0) { + die_errno(errno, "waitpid"); + } + return WIFEXITED(status) && WEXITSTATUS(status) == 0; } diff --git a/tools/unix/process.h b/tools/unix/process.h index 0aad292..a211f27 100644 --- a/tools/unix/process.h +++ b/tools/unix/process.h @@ -32,8 +32,8 @@ */ struct pipeline { - int pid; - int infd; + int pid; + int infd; }; extern void pipeline(const char *const *argv, struct pipeline *pl); diff --git a/tools/win32/compat.c b/tools/win32/compat.c index cbac8e6..7c7d97e 100644 --- a/tools/win32/compat.c +++ b/tools/win32/compat.c @@ -43,21 +43,22 @@ int asprintf(char **strp, const char *fmt, ...) { - va_list ap; - int len; + va_list ap; + int len; - va_start(ap, fmt); - len = _vscprintf(fmt, ap); - va_end(ap); + va_start(ap, fmt); + len = _vscprintf(fmt, ap); + va_end(ap); - *strp = malloc(len+1); - if (!*strp) - return -1; + *strp = malloc(len+1); + if (!*strp) { + return -1; + } - va_start(ap, fmt); - _vsnprintf(*strp, len+1, fmt, ap); - va_end(ap); + va_start(ap, fmt); + _vsnprintf(*strp, len+1, fmt, ap); + va_end(ap); - (*strp)[len] = 0; - return len; + (*strp)[len] = 0; + return len; } diff --git a/tools/win32/process.c b/tools/win32/process.c index 699f60f..4d5270b 100644 --- a/tools/win32/process.c +++ b/tools/win32/process.c @@ -44,171 +44,188 @@ void die_windows_error(const char *fmt, ...) { - char *msg; - - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - - if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM - | FORMAT_MESSAGE_ALLOCATE_BUFFER, - NULL, GetLastError(), - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - (LPSTR)&msg, 0, NULL)) - msg = "(failed to retrieve Windows error message)"; - - fprintf(stderr, ": %s\n", msg); - exit(1); + char *msg; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + + if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM + | FORMAT_MESSAGE_ALLOCATE_BUFFER, + NULL, GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR)&msg, 0, NULL)) { + msg = "(failed to retrieve Windows error message)"; + } + + fprintf(stderr, ": %s\n", msg); + exit(1); } static char *make_command_line(const char *const *argv) { - int i; - size_t len = 1; /* initial quotes */ - char *buf; - char *dest; - - /* calculate the length of the required buffer, making worst - case assumptions for simplicity */ - for (i = 0;;) { - /* each character could need escaping */ - len += strlen(argv[i]) * 2; - - if (!argv[++i]) - break; - - len += 3; /* quotes, space, quotes */ - } - - len += 2; /* final quotes and the terminating zero */ - - dest = buf = malloc(len); - if (!buf) - die("allocating memory for subprocess command line"); - - /* Here we perform the inverse of the CommandLineToArgvW - function. Note that its rules are slightly crazy: A - sequence of backslashes only act to escape if followed by - double quotes. A sequence of backslashes not followed by - double quotes is untouched. */ - - for (i = 0;;) { - const char *src = argv[i]; - int backslashes = 0; - - *dest++ = '\"'; - - for (;;) { - switch (*src) { - case 0: - goto done; - - case '\"': - for (; backslashes; backslashes--) - *dest++ = '\\'; - - *dest++ = '\\'; - *dest++ = '\"'; - break; - - case '\\': - backslashes++; - *dest++ = '\\'; - break; - - default: - backslashes = 0; - *dest++ = *src; - break; - } - - src++; - } - done: - for (; backslashes; backslashes--) - *dest++ = '\\'; - - *dest++ = '\"'; - - if (!argv[++i]) - break; - - *dest++ = ' '; - } - - *dest++ = 0; - return buf; + int i; + size_t len = 1; /* initial quotes */ + char *buf; + char *dest; + + /* calculate the length of the required buffer, making worst + case assumptions for simplicity */ + for (i = 0;;) { + /* each character could need escaping */ + len += strlen(argv[i]) * 2; + + if (!argv[++i]) { + break; + } + + len += 3; /* quotes, space, quotes */ + } + + len += 2; /* final quotes and the terminating zero */ + + dest = buf = malloc(len); + if (!buf) { + die("allocating memory for subprocess command line"); + } + + /* Here we perform the inverse of the CommandLineToArgvW + function. Note that its rules are slightly crazy: A + sequence of backslashes only act to escape if followed by + double quotes. A sequence of backslashes not followed by + double quotes is untouched. */ + + for (i = 0;;) { + const char *src = argv[i]; + int backslashes = 0; + + *dest++ = '\"'; + + for (;;) { + switch (*src) { + case 0: + goto done; + + case '\"': + for (; backslashes; backslashes--) { + *dest++ = '\\'; + } + + *dest++ = '\\'; + *dest++ = '\"'; + break; + + case '\\': + backslashes++; + *dest++ = '\\'; + break; + + default: + backslashes = 0; + *dest++ = *src; + break; + } + + src++; + } +done: + for (; backslashes; backslashes--) { + *dest++ = '\\'; + } + + *dest++ = '\"'; + + if (!argv[++i]) { + break; + } + + *dest++ = ' '; + } + + *dest++ = 0; + return buf; } void pipeline(const char *const *argv, struct pipeline *pl) { - HANDLE in_read_handle, in_write_handle; - SECURITY_ATTRIBUTES sec_attr; - PROCESS_INFORMATION proc_info; - STARTUPINFO start_info; - char *cmdline = make_command_line(argv); - - sec_attr.nLength = sizeof sec_attr; - sec_attr.bInheritHandle = TRUE; - sec_attr.lpSecurityDescriptor = NULL; - - if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0)) - die_windows_error("CreatePipe"); - - if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0)) - die_windows_error("SetHandleInformation"); - - /* when in Rome... */ - ZeroMemory(&proc_info, sizeof proc_info); - ZeroMemory(&start_info, sizeof start_info); - - start_info.cb = sizeof start_info; - start_info.dwFlags |= STARTF_USESTDHANDLES; - - if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) - == INVALID_HANDLE_VALUE - || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) - == INVALID_HANDLE_VALUE) - die_windows_error("GetStdHandle"); - - start_info.hStdInput = in_read_handle; - - if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, - NULL, NULL, &start_info, &proc_info)) - die_windows_error("CreateProcess"); - - free(cmdline); - - if (!CloseHandle(proc_info.hThread)) - die_windows_error("CloseHandle for thread"); - if (!CloseHandle(in_read_handle)) - die_windows_error("CloseHandle"); - - pl->proc_handle = proc_info.hProcess; - pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0); + HANDLE in_read_handle, in_write_handle; + SECURITY_ATTRIBUTES sec_attr; + PROCESS_INFORMATION proc_info; + STARTUPINFO start_info; + char *cmdline = make_command_line(argv); + + sec_attr.nLength = sizeof sec_attr; + sec_attr.bInheritHandle = TRUE; + sec_attr.lpSecurityDescriptor = NULL; + + if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0)) { + die_windows_error("CreatePipe"); + } + + if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0)) { + die_windows_error("SetHandleInformation"); + } + + /* when in Rome... */ + ZeroMemory(&proc_info, sizeof proc_info); + ZeroMemory(&start_info, sizeof start_info); + + start_info.cb = sizeof start_info; + start_info.dwFlags |= STARTF_USESTDHANDLES; + + if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE)) + == INVALID_HANDLE_VALUE + || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE)) + == INVALID_HANDLE_VALUE) { + die_windows_error("GetStdHandle"); + } + + start_info.hStdInput = in_read_handle; + + if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0, + NULL, NULL, &start_info, &proc_info)) { + die_windows_error("CreateProcess"); + } + + free(cmdline); + + if (!CloseHandle(proc_info.hThread)) { + die_windows_error("CloseHandle for thread"); + } + if (!CloseHandle(in_read_handle)) { + die_windows_error("CloseHandle"); + } + + pl->proc_handle = proc_info.hProcess; + pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0); } int finish_pipeline(struct pipeline *pl) { - DWORD code; - - if (close(pl->infd)) - die_errno(errno, "close"); - - for (;;) { - if (!GetExitCodeProcess(pl->proc_handle, &code)) - die_windows_error("GetExitCodeProcess"); - if (code != STILL_ACTIVE) - break; - - if (WaitForSingleObject(pl->proc_handle, INFINITE) - == WAIT_FAILED) - die_windows_error("WaitForSingleObject"); - } - - if (!CloseHandle(pl->proc_handle)) - die_windows_error("CloseHandle for process"); - - return code; + DWORD code; + + if (close(pl->infd)) { + die_errno(errno, "close"); + } + + for (;;) { + if (!GetExitCodeProcess(pl->proc_handle, &code)) { + die_windows_error("GetExitCodeProcess"); + } + if (code != STILL_ACTIVE) { + break; + } + + if (WaitForSingleObject(pl->proc_handle, INFINITE) + == WAIT_FAILED) { + die_windows_error("WaitForSingleObject"); + } + } + + if (!CloseHandle(pl->proc_handle)) { + die_windows_error("CloseHandle for process"); + } + + return code; } diff --git a/tools/win32/process.h b/tools/win32/process.h index 1429ad2..88f2c7a 100644 --- a/tools/win32/process.h +++ b/tools/win32/process.h @@ -34,8 +34,8 @@ #include struct pipeline { - HANDLE proc_handle; - int infd; + HANDLE proc_handle; + int infd; }; extern void pipeline(const char *const *argv, struct pipeline *pl); -- cgit v1.2.1