diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-04-11 10:53:59 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-04-17 09:56:29 -0700 |
commit | a2a91f532b7e648b2468acad39ae7169ac020d0a (patch) | |
tree | 78010b44a0e00011cc17787d144abc7dc358c560 | |
parent | 317a56cfffb05e72713bfa8a8e7236b7c39ddb8d (diff) | |
download | rabbitmq-c-a2a91f532b7e648b2468acad39ae7169ac020d0a.tar.gz |
Fix code formatting to match the rest of the lib
-rw-r--r-- | examples/amqps_bind.c | 15 | ||||
-rw-r--r-- | examples/amqps_consumer.c | 34 | ||||
-rw-r--r-- | examples/amqps_exchange_declare.c | 9 | ||||
-rw-r--r-- | examples/amqps_listen.c | 74 | ||||
-rw-r--r-- | examples/amqps_listenq.c | 70 | ||||
-rw-r--r-- | examples/amqps_producer.c | 31 | ||||
-rw-r--r-- | examples/amqps_sendstring.c | 23 | ||||
-rw-r--r-- | examples/amqps_unbind.c | 15 | ||||
-rw-r--r-- | examples/utils.c | 3 | ||||
-rw-r--r-- | librabbitmq/amqp.h | 10 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 3 | ||||
-rw-r--r-- | librabbitmq/amqp_cyassl.c | 244 | ||||
-rw-r--r-- | librabbitmq/amqp_gnutls.c | 406 | ||||
-rw-r--r-- | librabbitmq/amqp_openssl.c | 613 | ||||
-rw-r--r-- | librabbitmq/amqp_polarssl.c | 400 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 57 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 16 | ||||
-rw-r--r-- | librabbitmq/amqp_ssl_socket.h | 14 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 88 | ||||
-rw-r--r-- | librabbitmq/amqp_url.c | 27 | ||||
-rw-r--r-- | librabbitmq/unix/socket.c | 8 | ||||
-rw-r--r-- | librabbitmq/win32/socket.c | 4 | ||||
-rw-r--r-- | librabbitmq/win32/socket.h | 2 | ||||
-rw-r--r-- | librabbitmq/win32/threads.c | 9 | ||||
-rw-r--r-- | tests/test_parse_url.c | 60 | ||||
-rw-r--r-- | tools/common.c | 37 |
26 files changed, 1152 insertions, 1120 deletions
diff --git a/examples/amqps_bind.c b/examples/amqps_bind.c index 8724ad7..53c5256 100644 --- a/examples/amqps_bind.c +++ b/examples/amqps_bind.c @@ -41,7 +41,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port, status; char const *exchange; @@ -52,7 +53,7 @@ int main(int argc, char const * const *argv) { if (argc < 6) { fprintf(stderr, "Usage: amqps_bind host port exchange bindingkey queue " - "[cacert.pem [key.pem cert.pem]]\n"); + "[cacert.pem [key.pem cert.pem]]\n"); return 1; } @@ -90,15 +91,15 @@ int main(int argc, char const * const *argv) { amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_queue_bind(conn, 1, - amqp_cstring_bytes(queue), - amqp_cstring_bytes(exchange), - amqp_cstring_bytes(bindingkey), - amqp_empty_table); + amqp_cstring_bytes(queue), + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(bindingkey), + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c index d935c90..c02168a 100644 --- a/examples/amqps_consumer.c +++ b/examples/amqps_consumer.c @@ -66,7 +66,7 @@ static void run(amqp_connection_state_t conn) int countOverInterval = received - previous_received; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); printf("%d ms: Received %d - %d since last report (%d Hz)\n", - (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); + (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); previous_received = received; previous_report_time = now; @@ -75,18 +75,22 @@ static void run(amqp_connection_state_t conn) amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) + if (result < 0) { return; + } - if (frame.frame_type != AMQP_FRAME_METHOD) + if (frame.frame_type != AMQP_FRAME_METHOD) { continue; + } - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { continue; + } result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) + if (result < 0) { return; + } if (frame.frame_type != AMQP_FRAME_HEADER) { fprintf(stderr, "Expected header!"); @@ -98,12 +102,13 @@ static void run(amqp_connection_state_t conn) while (body_received < body_target) { result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) - return; + if (result < 0) { + return; + } if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); + fprintf(stderr, "Expected body!"); + abort(); } body_received += frame.payload.body_fragment.len; @@ -114,7 +119,8 @@ static void run(amqp_connection_state_t conn) } } -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port, status; char const *exchange; @@ -125,7 +131,7 @@ int main(int argc, char const * const *argv) { if (argc < 3) { fprintf(stderr, "Usage: amqps_consumer host port " - "[cacert.pem [key.pem cert.pem]]\n"); + "[cacert.pem [key.pem cert.pem]]\n"); return 1; } @@ -162,13 +168,13 @@ int main(int argc, char const * const *argv) { amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, - amqp_empty_table); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { @@ -178,7 +184,7 @@ int main(int argc, char const * const *argv) { } amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), - amqp_empty_table); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); diff --git a/examples/amqps_exchange_declare.c b/examples/amqps_exchange_declare.c index c9f2e6c..199c0c5 100644 --- a/examples/amqps_exchange_declare.c +++ b/examples/amqps_exchange_declare.c @@ -41,7 +41,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port, status; char const *exchange; @@ -51,7 +52,7 @@ int main(int argc, char const * const *argv) { if (argc < 5) { fprintf(stderr, "Usage: amqps_exchange_declare host port exchange " - "exchangetype [cacert.pem [key.pem cert.pem]]\n"); + "exchangetype [cacert.pem [key.pem cert.pem]]\n"); return 1; } @@ -88,12 +89,12 @@ int main(int argc, char const * const *argv) { amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype), - 0, 0, amqp_empty_table); + 0, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c index 9be9acb..728a343 100644 --- a/examples/amqps_listen.c +++ b/examples/amqps_listen.c @@ -43,7 +43,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port, status; char const *exchange; @@ -55,7 +56,7 @@ int main(int argc, char const * const *argv) { if (argc < 5) { fprintf(stderr, "Usage: amqps_listen host port exchange bindingkey " - "[cacert.pem [key.pem cert.pem]]\n"); + "[cacert.pem [key.pem cert.pem]]\n"); return 1; } @@ -92,13 +93,13 @@ int main(int argc, char const * const *argv) { amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, - amqp_empty_table); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { @@ -108,7 +109,7 @@ int main(int argc, char const * const *argv) { } amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), - amqp_empty_table); + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); @@ -127,35 +128,39 @@ int main(int argc, char const * const *argv) { amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); printf("Result %d\n", result); - if (result < 0) - break; + if (result < 0) { + break; + } printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) - continue; + if (frame.frame_type != AMQP_FRAME_METHOD) { + continue; + } printf("Method %s\n", amqp_method_name(frame.payload.method.id)); - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) - continue; + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { + continue; + } d = (amqp_basic_deliver_t *) frame.payload.method.decoded; printf("Delivery %u, exchange %.*s routingkey %.*s\n", - (unsigned) d->delivery_tag, - (int) d->exchange.len, (char *) d->exchange.bytes, - (int) d->routing_key.len, (char *) d->routing_key.bytes); + (unsigned) d->delivery_tag, + (int) d->exchange.len, (char *) d->exchange.bytes, + (int) d->routing_key.len, (char *) d->routing_key.bytes); result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) - break; + if (result < 0) { + break; + } if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); + fprintf(stderr, "Expected header!"); + abort(); } p = (amqp_basic_properties_t *) frame.payload.properties.decoded; if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { - printf("Content-type: %.*s\n", - (int) p->content_type.len, (char *) p->content_type.bytes); + printf("Content-type: %.*s\n", + (int) p->content_type.len, (char *) p->content_type.bytes); } printf("----\n"); @@ -163,26 +168,27 @@ int main(int argc, char const * const *argv) { body_received = 0; while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) - break; + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + break; + } - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); - } + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); - amqp_dump(frame.payload.body_fragment.bytes, - frame.payload.body_fragment.len); + amqp_dump(frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); } if (body_received != body_target) { - /* Can only happen when amqp_simple_wait_frame returns <= 0 */ - /* We break here to close the connection */ - break; + /* Can only happen when amqp_simple_wait_frame returns <= 0 */ + /* We break here to close the connection */ + break; } } } diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c index 3296ffe..ff28e24 100644 --- a/examples/amqps_listenq.c +++ b/examples/amqps_listenq.c @@ -43,7 +43,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port, status; char const *queuename; @@ -52,7 +53,7 @@ int main(int argc, char const * const *argv) { if (argc < 4) { fprintf(stderr, "Usage: amqps_listenq host port queuename " - "[cacert.pem [key.pem cert.pem]]\n"); + "[cacert.pem [key.pem cert.pem]]\n"); return 1; } @@ -88,7 +89,7 @@ int main(int argc, char const * const *argv) { amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); @@ -108,35 +109,39 @@ int main(int argc, char const * const *argv) { amqp_maybe_release_buffers(conn); result = amqp_simple_wait_frame(conn, &frame); printf("Result %d\n", result); - if (result < 0) - break; + if (result < 0) { + break; + } printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) - continue; + if (frame.frame_type != AMQP_FRAME_METHOD) { + continue; + } printf("Method %s\n", amqp_method_name(frame.payload.method.id)); - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) - continue; + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { + continue; + } d = (amqp_basic_deliver_t *) frame.payload.method.decoded; printf("Delivery %u, exchange %.*s routingkey %.*s\n", - (unsigned) d->delivery_tag, - (int) d->exchange.len, (char *) d->exchange.bytes, - (int) d->routing_key.len, (char *) d->routing_key.bytes); + (unsigned) d->delivery_tag, + (int) d->exchange.len, (char *) d->exchange.bytes, + (int) d->routing_key.len, (char *) d->routing_key.bytes); result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) - break; + if (result < 0) { + break; + } if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); + fprintf(stderr, "Expected header!"); + abort(); } p = (amqp_basic_properties_t *) frame.payload.properties.decoded; if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { - printf("Content-type: %.*s\n", - (int) p->content_type.len, (char *) p->content_type.bytes); + printf("Content-type: %.*s\n", + (int) p->content_type.len, (char *) p->content_type.bytes); } printf("----\n"); @@ -144,26 +149,27 @@ int main(int argc, char const * const *argv) { body_received = 0; while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) - break; + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) { + break; + } - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); - } + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); - amqp_dump(frame.payload.body_fragment.bytes, - frame.payload.body_fragment.len); + amqp_dump(frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); } if (body_received != body_target) { - /* Can only happen when amqp_simple_wait_frame returns <= 0 */ - /* We break here to close the connection */ - break; + /* Can only happen when amqp_simple_wait_frame returns <= 0 */ + /* We break here to close the connection */ + break; } amqp_basic_ack(conn, 1, d->delivery_tag, 0); diff --git a/examples/amqps_producer.c b/examples/amqps_producer.c index 04f5ae8..c67f8ca 100644 --- a/examples/amqps_producer.c +++ b/examples/amqps_producer.c @@ -44,9 +44,9 @@ #define SUMMARY_EVERY_US 1000000 static void send_batch(amqp_connection_state_t conn, - char const *queue_name, - int rate_limit, - int message_count) + char const *queue_name, + int rate_limit, + int message_count) { uint64_t start_time = now_microseconds(); int i; @@ -69,20 +69,20 @@ static void send_batch(amqp_connection_state_t conn, uint64_t now = now_microseconds(); die_on_error(amqp_basic_publish(conn, - 1, - amqp_cstring_bytes("amq.direct"), - amqp_cstring_bytes(queue_name), - 0, - 0, - NULL, - message_bytes), - "Publishing"); + 1, + amqp_cstring_bytes("amq.direct"), + amqp_cstring_bytes(queue_name), + 0, + 0, + NULL, + message_bytes), + "Publishing"); sent++; if (now > next_summary_time) { int countOverInterval = sent - previous_sent; double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); printf("%d ms: Sent %d - %d since last report (%d Hz)\n", - (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); + (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate); previous_sent = sent; previous_report_time = now; @@ -105,7 +105,8 @@ static void send_batch(amqp_connection_state_t conn, } } -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port, status; int rate_limit; @@ -115,7 +116,7 @@ int main(int argc, char const * const *argv) { if (argc < 5) { fprintf(stderr, "Usage: amqps_producer host port rate_limit message_count " - "[cacert.pem [key.pem cert.pem]]\n"); + "[cacert.pem [key.pem cert.pem]]\n"); return 1; } @@ -152,7 +153,7 @@ int main(int argc, char const * const *argv) { amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); diff --git a/examples/amqps_sendstring.c b/examples/amqps_sendstring.c index 2325778..989e73a 100644 --- a/examples/amqps_sendstring.c +++ b/examples/amqps_sendstring.c @@ -41,7 +41,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port, status; char const *exchange; @@ -52,7 +53,7 @@ int main(int argc, char const * const *argv) { if (argc < 6) { fprintf(stderr, "Usage: amqps_sendstring host port exchange routingkey " - "messagebody [cacert.pem [key.pem cert.pem]]\n"); + "messagebody [cacert.pem [key.pem cert.pem]]\n"); return 1; } @@ -90,7 +91,7 @@ int main(int argc, char const * const *argv) { amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); @@ -100,14 +101,14 @@ int main(int argc, char const * const *argv) { props.content_type = amqp_cstring_bytes("text/plain"); props.delivery_mode = 2; /* persistent delivery mode */ die_on_error(amqp_basic_publish(conn, - 1, - amqp_cstring_bytes(exchange), - amqp_cstring_bytes(routingkey), - 0, - 0, - &props, - amqp_cstring_bytes(messagebody)), - "Publishing"); + 1, + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(routingkey), + 0, + 0, + &props, + amqp_cstring_bytes(messagebody)), + "Publishing"); } die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/amqps_unbind.c b/examples/amqps_unbind.c index a74992a..dfe0f1a 100644 --- a/examples/amqps_unbind.c +++ b/examples/amqps_unbind.c @@ -41,7 +41,8 @@ #include "utils.h" -int main(int argc, char const * const *argv) { +int main(int argc, char const *const *argv) +{ char const *hostname; int port, status; char const *exchange; @@ -52,7 +53,7 @@ int main(int argc, char const * const *argv) { if (argc < 6) { fprintf(stderr, "Usage: amqps_unbind host port exchange bindingkey queue " - "[cacert.pem [key.pem cert.pem]]\n"); + "[cacert.pem [key.pem cert.pem]]\n"); return 1; } @@ -90,15 +91,15 @@ int main(int argc, char const * const *argv) { amqp_set_socket(conn, socket); die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), - "Logging in"); + "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); amqp_queue_unbind(conn, 1, - amqp_cstring_bytes(queue), - amqp_cstring_bytes(exchange), - amqp_cstring_bytes(bindingkey), - amqp_empty_table); + amqp_cstring_bytes(queue), + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(bindingkey), + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); diff --git a/examples/utils.c b/examples/utils.c index 0830738..4b00470 100644 --- a/examples/utils.c +++ b/examples/utils.c @@ -56,7 +56,8 @@ void die(const char *fmt, ...) exit(1); } -void die_on_error(int x, char const *context) { +void die_on_error(int x, char const *context) +{ if (x < 0) { char *errstr = amqp_error_string(-x); fprintf(stderr, "%s: %s\n", context, errstr); diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 4336fcd..5167f2e 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -389,9 +389,9 @@ int AMQP_CALL amqp_get_sockfd(amqp_connection_state_t state); AMQP_DEPRECATED( -AMQP_PUBLIC_FUNCTION -void -AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd) + AMQP_PUBLIC_FUNCTION + void + AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd) ); AMQP_PUBLIC_FUNCTION @@ -512,8 +512,8 @@ AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost, AMQP_PUBLIC_FUNCTION amqp_rpc_reply_t AMQP_CALL amqp_login_with_properties(amqp_connection_state_t state, char const *vhost, - int channel_max, int frame_max, int heartbeat, - const amqp_table_t *properties, amqp_sasl_method_enum sasl_method, ...); + int channel_max, int frame_max, int heartbeat, + const amqp_table_t *properties, amqp_sasl_method_enum sasl_method, ...); struct amqp_basic_properties_t_; diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 677038b..2d50ad3 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -170,8 +170,9 @@ int amqp_destroy_connection(amqp_connection_state_t state) empty_amqp_pool(&state->decoding_pool); free(state->outbound_buffer.bytes); free(state->sock_inbound_buffer.bytes); - if (amqp_socket_close(state->socket) < 0) + if (amqp_socket_close(state->socket) < 0) { status = -amqp_socket_error(state->socket); + } free(state); } return status; diff --git a/librabbitmq/amqp_cyassl.c b/librabbitmq/amqp_cyassl.c index 9e2dcfe..5bcfe6d 100644 --- a/librabbitmq/amqp_cyassl.c +++ b/librabbitmq/amqp_cyassl.c @@ -31,193 +31,193 @@ #include <stdlib.h> struct amqp_ssl_socket_t { - CYASSL_CTX *ctx; - CYASSL *ssl; - int sockfd; - char *buffer; - size_t length; + CYASSL_CTX *ctx; + CYASSL *ssl; + int sockfd; + char *buffer; + size_t length; }; static ssize_t amqp_ssl_socket_send(void *base, - const void *buf, - size_t len, - AMQP_UNUSED int flags) + const void *buf, + size_t len, + AMQP_UNUSED int flags) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return CyaSSL_write(self->ssl, buf, len); + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return CyaSSL_write(self->ssl, buf, len); } static ssize_t amqp_ssl_socket_writev(void *base, - const struct iovec *iov, - int iovcnt) + const struct iovec *iov, + int iovcnt) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - ssize_t written = -1; - char *bufferp; - size_t bytes; - int i; - bytes = 0; - for (i = 0; i < iovcnt; ++i) { - bytes += iov[i].iov_len; - } - if (self->length < bytes) { - free(self->buffer); - self->buffer = malloc(bytes); - if (!self->buffer) { - self->length = 0; - goto exit; - } - self->length = bytes; - } - bufferp = self->buffer; - for (i = 0; i < iovcnt; ++i) { - memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); - bufferp += iov[i].iov_len; - } - written = CyaSSL_write(self->ssl, self->buffer, bytes); + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t written = -1; + char *bufferp; + size_t bytes; + int i; + bytes = 0; + for (i = 0; i < iovcnt; ++i) { + bytes += iov[i].iov_len; + } + if (self->length < bytes) { + free(self->buffer); + self->buffer = malloc(bytes); + if (!self->buffer) { + self->length = 0; + goto exit; + } + self->length = bytes; + } + bufferp = self->buffer; + for (i = 0; i < iovcnt; ++i) { + memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); + bufferp += iov[i].iov_len; + } + written = CyaSSL_write(self->ssl, self->buffer, bytes); exit: - return written; + return written; } static ssize_t amqp_ssl_socket_recv(void *base, - void *buf, - size_t len, - AMQP_UNUSED int flags) + void *buf, + size_t len, + AMQP_UNUSED int flags) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return CyaSSL_read(self->ssl, buf, len); + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return CyaSSL_read(self->ssl, buf, len); } static int amqp_ssl_socket_get_sockfd(void *base) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return self->sockfd; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->sockfd; } static int amqp_ssl_socket_close(void *base) { - int status = -1; - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - if (self->sockfd >= 0) { - status = amqp_os_socket_close(self->sockfd); - } - if (self) { - CyaSSL_free(self->ssl); - CyaSSL_CTX_free(self->ctx); - free(self->buffer); - free(self); - } - return status; + int status = -1; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + if (self->sockfd >= 0) { + status = amqp_os_socket_close(self->sockfd); + } + if (self) { + CyaSSL_free(self->ssl); + CyaSSL_CTX_free(self->ctx); + free(self->buffer); + free(self); + } + return status; } static int amqp_ssl_socket_error(AMQP_UNUSED void *user_data) { - return -1; + return -1; } static int amqp_ssl_socket_open(void *base, const char *host, int port) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - int status; - self->sockfd = amqp_open_socket(host, port); - if (0 > self->sockfd) { - return -1; - } - CyaSSL_set_fd(self->ssl, self->sockfd); - status = CyaSSL_connect(self->ssl); - if (SSL_SUCCESS != status) { - return -1; - } - return 0; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + int status; + self->sockfd = amqp_open_socket(host, port); + if (0 > self->sockfd) { + return -1; + } + CyaSSL_set_fd(self->ssl, self->sockfd); + status = CyaSSL_connect(self->ssl); + if (SSL_SUCCESS != status) { + return -1; + } + return 0; } static const struct amqp_socket_class_t amqp_ssl_socket_class = { - amqp_ssl_socket_writev, /* writev */ - amqp_ssl_socket_send, /* send */ - amqp_ssl_socket_recv, /* recv */ - amqp_ssl_socket_open, /* open */ - amqp_ssl_socket_close, /* close */ - amqp_ssl_socket_error, /* error */ - amqp_ssl_socket_get_sockfd /* get_sockfd */ + amqp_ssl_socket_writev, /* writev */ + amqp_ssl_socket_send, /* send */ + amqp_ssl_socket_recv, /* recv */ + amqp_ssl_socket_open, /* open */ + amqp_ssl_socket_close, /* close */ + amqp_ssl_socket_error, /* error */ + amqp_ssl_socket_get_sockfd /* get_sockfd */ }; amqp_socket_t * amqp_ssl_socket_new(void) { - struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); - if (!self) { - goto error; - } - CyaSSL_Init(); - self->ctx = CyaSSL_CTX_new(CyaSSLv23_client_method()); - if (!self->ctx) { - goto error; - } - return (amqp_socket_t *)self; + struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); + if (!self) { + goto error; + } + CyaSSL_Init(); + self->ctx = CyaSSL_CTX_new(CyaSSLv23_client_method()); + if (!self->ctx) { + goto error; + } + return (amqp_socket_t *)self; error: - amqp_socket_close((amqp_socket_t *)self); - return NULL; + amqp_socket_close((amqp_socket_t *)self); + return NULL; } int amqp_ssl_socket_set_cacert(amqp_socket_t *base, - const char *cacert) + const char *cacert) { - int status; - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - status = CyaSSL_CTX_load_verify_locations(self->ctx, cacert, NULL); - if (SSL_SUCCESS != status) { - return -1; - } - return 0; + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = CyaSSL_CTX_load_verify_locations(self->ctx, cacert, NULL); + if (SSL_SUCCESS != status) { + return -1; + } + return 0; } int amqp_ssl_socket_set_key(amqp_socket_t *base, - const char *cert, - const char *key) + const char *cert, + const char *key) { - int status; - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - status = CyaSSL_CTX_use_PrivateKey_file(self->ctx, key, - SSL_FILETYPE_PEM); - if (SSL_SUCCESS != status) { - return -1; - } - status = CyaSSL_CTX_use_certificate_chain_file(self->ctx, cert); - return 0; + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = CyaSSL_CTX_use_PrivateKey_file(self->ctx, key, + SSL_FILETYPE_PEM); + if (SSL_SUCCESS != status) { + return -1; + } + status = CyaSSL_CTX_use_certificate_chain_file(self->ctx, cert); + return 0; } int amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base, - AMQP_UNUSED const char *cert, - AMQP_UNUSED const void *key, - AMQP_UNUSED size_t n) + AMQP_UNUSED const char *cert, + AMQP_UNUSED const void *key, + AMQP_UNUSED size_t n) { - amqp_abort("%s is not implemented for CyaSSL", __func__); - return -1; + amqp_abort("%s is not implemented for CyaSSL", __func__); + return -1; } void amqp_ssl_socket_set_verify(AMQP_UNUSED amqp_socket_t *base, - AMQP_UNUSED amqp_boolean_t verify) + AMQP_UNUSED amqp_boolean_t verify) { - /* noop for CyaSSL */ + /* noop for CyaSSL */ } void diff --git a/librabbitmq/amqp_gnutls.c b/librabbitmq/amqp_gnutls.c index 1db7a0b..64b602c 100644 --- a/librabbitmq/amqp_gnutls.c +++ b/librabbitmq/amqp_gnutls.c @@ -32,280 +32,280 @@ #include <stdlib.h> struct amqp_ssl_socket_t { - gnutls_session_t session; - gnutls_certificate_credentials_t credentials; - int sockfd; - char *host; - char *buffer; - size_t length; + gnutls_session_t session; + gnutls_certificate_credentials_t credentials; + int sockfd; + char *host; + char *buffer; + size_t length; }; static ssize_t amqp_ssl_socket_send(void *base, - const void *buf, - size_t len, - AMQP_UNUSED int flags) + const void *buf, + size_t len, + AMQP_UNUSED int flags) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return gnutls_record_send(self->session, buf, len); + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return gnutls_record_send(self->session, buf, len); } static ssize_t amqp_ssl_socket_writev(void *base, - const struct iovec *iov, - int iovcnt) + const struct iovec *iov, + int iovcnt) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - ssize_t written = -1; - char *bufferp; - size_t bytes; - int i; - bytes = 0; - for (i = 0; i < iovcnt; ++i) { - bytes += iov[i].iov_len; - } - if (self->length < bytes) { - free(self->buffer); - self->buffer = malloc(bytes); - if (!self->buffer) { - self->length = 0; - goto exit; - } - self->length = 0; - } - bufferp = self->buffer; - for (i = 0; i < iovcnt; ++i) { - memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); - bufferp += iov[i].iov_len; - } - written = gnutls_record_send(self->session, self->buffer, bytes); + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t written = -1; + char *bufferp; + size_t bytes; + int i; + bytes = 0; + for (i = 0; i < iovcnt; ++i) { + bytes += iov[i].iov_len; + } + if (self->length < bytes) { + free(self->buffer); + self->buffer = malloc(bytes); + if (!self->buffer) { + self->length = 0; + goto exit; + } + self->length = 0; + } + bufferp = self->buffer; + for (i = 0; i < iovcnt; ++i) { + memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); + bufferp += iov[i].iov_len; + } + written = gnutls_record_send(self->session, self->buffer, bytes); exit: - return written; + return written; } static ssize_t amqp_ssl_socket_recv(void *base, - void *buf, - size_t len, - AMQP_UNUSED int flags) + void *buf, + size_t len, + AMQP_UNUSED int flags) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return gnutls_record_recv(self->session, buf, len); + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return gnutls_record_recv(self->session, buf, len); } static int amqp_ssl_socket_open(void *base, const char *host, int port) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - int status; - self->sockfd = amqp_open_socket(host, port); - if (0 > self->sockfd) { - return -1; - } - gnutls_transport_set_ptr(self->session, - (gnutls_transport_ptr_t)self->sockfd); - do { - status = gnutls_handshake(self->session); - } while (status < 0 && !gnutls_error_is_fatal(status)); - return status; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + int status; + self->sockfd = amqp_open_socket(host, port); + if (0 > self->sockfd) { + return -1; + } + gnutls_transport_set_ptr(self->session, + (gnutls_transport_ptr_t)self->sockfd); + do { + status = gnutls_handshake(self->session); + } while (status < 0 && !gnutls_error_is_fatal(status)); + return status; } static int amqp_ssl_socket_close(void *base) { - int status = -1; - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - if (self->sockfd >= 0) { - status = amqp_os_socket_close(self->sockfd); - } - if (self) { - gnutls_deinit(self->session); - gnutls_certificate_free_credentials(self->credentials); - free(self->host); - free(self->buffer); - free(self); - } - return status; + int status = -1; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + if (self->sockfd >= 0) { + status = amqp_os_socket_close(self->sockfd); + } + if (self) { + gnutls_deinit(self->session); + gnutls_certificate_free_credentials(self->credentials); + free(self->host); + free(self->buffer); + free(self); + } + return status; } static int amqp_ssl_socket_error(AMQP_UNUSED void *user_data) { - return -1; + return -1; } static int amqp_ssl_socket_get_sockfd(void *base) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return self->sockfd; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->sockfd; } static int amqp_ssl_verify(gnutls_session_t session) { - int ret; - unsigned int status, size; - const gnutls_datum_t *list; - gnutls_x509_crt_t cert = NULL; - struct amqp_ssl_socket_t *self = gnutls_session_get_ptr(session); - ret = gnutls_certificate_verify_peers2(session, &status); - if (0 > ret) { - goto error; - } - if (status & GNUTLS_CERT_INVALID) { - goto error; - } - if (status & GNUTLS_CERT_SIGNER_NOT_FOUND) { - goto error; - } - if (status & GNUTLS_CERT_REVOKED) { - goto error; - } - if (status & GNUTLS_CERT_EXPIRED) { - goto error; - } - if (status & GNUTLS_CERT_NOT_ACTIVATED) { - goto error; - } - if (gnutls_certificate_type_get(session) != GNUTLS_CRT_X509) { - goto error; - } - if (gnutls_x509_crt_init(&cert) < 0) { - goto error; - } - list = gnutls_certificate_get_peers(session, &size); - if (!list) { - goto error; - } - ret = gnutls_x509_crt_import(cert, &list[0], GNUTLS_X509_FMT_DER); - if (0 > ret) { - goto error; - } - if (!gnutls_x509_crt_check_hostname(cert, self->host)) { - goto error; - } - gnutls_x509_crt_deinit(cert); - return 0; + int ret; + unsigned int status, size; + const gnutls_datum_t *list; + gnutls_x509_crt_t cert = NULL; + struct amqp_ssl_socket_t *self = gnutls_session_get_ptr(session); + ret = gnutls_certificate_verify_peers2(session, &status); + if (0 > ret) { + goto error; + } + if (status & GNUTLS_CERT_INVALID) { + goto error; + } + if (status & GNUTLS_CERT_SIGNER_NOT_FOUND) { + goto error; + } + if (status & GNUTLS_CERT_REVOKED) { + goto error; + } + if (status & GNUTLS_CERT_EXPIRED) { + goto error; + } + if (status & GNUTLS_CERT_NOT_ACTIVATED) { + goto error; + } + if (gnutls_certificate_type_get(session) != GNUTLS_CRT_X509) { + goto error; + } + if (gnutls_x509_crt_init(&cert) < 0) { + goto error; + } + list = gnutls_certificate_get_peers(session, &size); + if (!list) { + goto error; + } + ret = gnutls_x509_crt_import(cert, &list[0], GNUTLS_X509_FMT_DER); + if (0 > ret) { + goto error; + } + if (!gnutls_x509_crt_check_hostname(cert, self->host)) { + goto error; + } + gnutls_x509_crt_deinit(cert); + return 0; error: - if (cert) { - gnutls_x509_crt_deinit (cert); - } - return GNUTLS_E_CERTIFICATE_ERROR; + if (cert) { + gnutls_x509_crt_deinit (cert); + } + return GNUTLS_E_CERTIFICATE_ERROR; } static const struct amqp_socket_class_t amqp_ssl_socket_class = { - amqp_ssl_socket_writev, /* writev */ - amqp_ssl_socket_send, /* send */ - amqp_ssl_socket_recv, /* recv */ - amqp_ssl_socket_open, /* open */ - amqp_ssl_socket_close, /* close */ - amqp_ssl_socket_error, /* error */ - amqp_ssl_socket_get_sockfd /* get_sockfd */ + amqp_ssl_socket_writev, /* writev */ + amqp_ssl_socket_send, /* send */ + amqp_ssl_socket_recv, /* recv */ + amqp_ssl_socket_open, /* open */ + amqp_ssl_socket_close, /* close */ + amqp_ssl_socket_error, /* error */ + amqp_ssl_socket_get_sockfd /* get_sockfd */ }; amqp_socket_t * amqp_ssl_socket_new(void) { - struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); - const char *error; - int status; - if (!self) { - goto error; - } - gnutls_global_init(); - status = gnutls_init(&self->session, GNUTLS_CLIENT); - if (GNUTLS_E_SUCCESS != status) { - goto error; - } - status = gnutls_certificate_allocate_credentials(&self->credentials); - if (GNUTLS_E_SUCCESS != status) { - goto error; - } - gnutls_certificate_set_verify_function(self->credentials, - amqp_ssl_verify); - status = gnutls_credentials_set(self->session, GNUTLS_CRD_CERTIFICATE, - self->credentials); - if (GNUTLS_E_SUCCESS != status) { - goto error; - } - gnutls_session_set_ptr(self->session, self); - status = gnutls_priority_set_direct(self->session, "NORMAL", &error); - if (GNUTLS_E_SUCCESS != status) { - goto error; - } - return (amqp_socket_t *)self; + struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); + const char *error; + int status; + if (!self) { + goto error; + } + gnutls_global_init(); + status = gnutls_init(&self->session, GNUTLS_CLIENT); + if (GNUTLS_E_SUCCESS != status) { + goto error; + } + status = gnutls_certificate_allocate_credentials(&self->credentials); + if (GNUTLS_E_SUCCESS != status) { + goto error; + } + gnutls_certificate_set_verify_function(self->credentials, + amqp_ssl_verify); + status = gnutls_credentials_set(self->session, GNUTLS_CRD_CERTIFICATE, + self->credentials); + if (GNUTLS_E_SUCCESS != status) { + goto error; + } + gnutls_session_set_ptr(self->session, self); + status = gnutls_priority_set_direct(self->session, "NORMAL", &error); + if (GNUTLS_E_SUCCESS != status) { + goto error; + } + return (amqp_socket_t *)self; error: - amqp_socket_close((amqp_socket_t *)self); - return NULL; + amqp_socket_close((amqp_socket_t *)self); + return NULL; } int amqp_ssl_socket_set_cacert(amqp_socket_t *base, - const char *cacert) + const char *cacert) { - int status; - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - status = gnutls_certificate_set_x509_trust_file(self->credentials, - cacert, - GNUTLS_X509_FMT_PEM); - if (0 > status) { - return -1; - } - return 0; + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = gnutls_certificate_set_x509_trust_file(self->credentials, + cacert, + GNUTLS_X509_FMT_PEM); + if (0 > status) { + return -1; + } + return 0; } int amqp_ssl_socket_set_key(amqp_socket_t *base, - const char *cert, - const char *key) + const char *cert, + const char *key) { - int status; - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - status = gnutls_certificate_set_x509_key_file(self->credentials, - cert, - key, - GNUTLS_X509_FMT_PEM); - if (0 > status) { - return -1; - } - return 0; + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = gnutls_certificate_set_x509_key_file(self->credentials, + cert, + key, + GNUTLS_X509_FMT_PEM); + if (0 > status) { + return -1; + } + return 0; } int amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base, - AMQP_UNUSED const char *cert, - AMQP_UNUSED const void *key, - AMQP_UNUSED size_t n) + AMQP_UNUSED const char *cert, + AMQP_UNUSED const void *key, + AMQP_UNUSED size_t n) { - amqp_abort("%s is not implemented for GnuTLS", __func__); - return -1; + amqp_abort("%s is not implemented for GnuTLS", __func__); + return -1; } void amqp_ssl_socket_set_verify(amqp_socket_t *base, - amqp_boolean_t verify) + amqp_boolean_t verify) { - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - if (verify) { - gnutls_certificate_set_verify_function(self->credentials, - amqp_ssl_verify); - } else { - gnutls_certificate_set_verify_function(self->credentials, - NULL); - } + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + if (verify) { + gnutls_certificate_set_verify_function(self->credentials, + amqp_ssl_verify); + } else { + gnutls_certificate_set_verify_function(self->credentials, + NULL); + } } void diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index d84fab7..748f249 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -57,378 +57,378 @@ static pthread_mutex_t *amqp_openssl_lockarray = NULL; #endif /* ENABLE_THREAD_SAFETY */ struct amqp_ssl_socket_t { - const struct amqp_socket_class_t *klass; - SSL_CTX *ctx; - int sockfd; - SSL *ssl; - char *buffer; - size_t length; - amqp_boolean_t verify; + const struct amqp_socket_class_t *klass; + SSL_CTX *ctx; + int sockfd; + SSL *ssl; + char *buffer; + size_t length; + amqp_boolean_t verify; }; static ssize_t amqp_ssl_socket_send(void *base, - const void *buf, - size_t len, - AMQP_UNUSED int flags) + const void *buf, + size_t len, + AMQP_UNUSED int flags) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - ssize_t sent; - ERR_clear_error(); - sent = SSL_write(self->ssl, buf, len); - if (0 > sent) { - switch (SSL_get_error(self->ssl, sent)) { - case SSL_ERROR_NONE: - case SSL_ERROR_ZERO_RETURN: - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - sent = 0; - break; - } - } - return sent; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t sent; + ERR_clear_error(); + sent = SSL_write(self->ssl, buf, len); + if (0 > sent) { + switch (SSL_get_error(self->ssl, sent)) { + case SSL_ERROR_NONE: + case SSL_ERROR_ZERO_RETURN: + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + sent = 0; + break; + } + } + return sent; } static ssize_t amqp_ssl_socket_writev(void *base, - const struct iovec *iov, - int iovcnt) + const struct iovec *iov, + int iovcnt) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - ssize_t written = -1; - char *bufferp; - size_t bytes; - int i; - bytes = 0; - for (i = 0; i < iovcnt; ++i) { - bytes += iov[i].iov_len; - } - if (self->length < bytes) { - free(self->buffer); - self->buffer = malloc(bytes); - if (!self->buffer) { - self->length = 0; - goto exit; - } - self->length = bytes; - } - bufferp = self->buffer; - for (i = 0; i < iovcnt; ++i) { - memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); - bufferp += iov[i].iov_len; - } - written = amqp_ssl_socket_send(self, self->buffer, bytes, 0); + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t written = -1; + char *bufferp; + size_t bytes; + int i; + bytes = 0; + for (i = 0; i < iovcnt; ++i) { + bytes += iov[i].iov_len; + } + if (self->length < bytes) { + free(self->buffer); + self->buffer = malloc(bytes); + if (!self->buffer) { + self->length = 0; + goto exit; + } + self->length = bytes; + } + bufferp = self->buffer; + for (i = 0; i < iovcnt; ++i) { + memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); + bufferp += iov[i].iov_len; + } + written = amqp_ssl_socket_send(self, self->buffer, bytes, 0); exit: - return written; + return written; } static ssize_t amqp_ssl_socket_recv(void *base, - void *buf, - size_t len, - AMQP_UNUSED int flags) + void *buf, + size_t len, + AMQP_UNUSED int flags) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - ssize_t received; - ERR_clear_error(); - received = SSL_read(self->ssl, buf, len); - if (0 > received) { - switch(SSL_get_error(self->ssl, received)) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - received = 0; - break; - } - } - return received; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t received; + ERR_clear_error(); + received = SSL_read(self->ssl, buf, len); + if (0 > received) { + switch(SSL_get_error(self->ssl, received)) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + received = 0; + break; + } + } + return received; } static int amqp_ssl_socket_verify(void *base, const char *host) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - unsigned char *utf8_value = NULL, *cp, ch; - int pos, utf8_length, status = 0; - ASN1_STRING *entry_string; - X509_NAME_ENTRY *entry; - X509_NAME *name; - X509 *peer; - peer = SSL_get_peer_certificate(self->ssl); - if (!peer) { - goto error; - } - name = X509_get_subject_name(peer); - if (!name) { - goto error; - } - pos = X509_NAME_get_index_by_NID(name, NID_commonName, -1); - if (0 > pos) { - goto error; - } - entry = X509_NAME_get_entry(name, pos); - if (!entry) { - goto error; - } - entry_string = X509_NAME_ENTRY_get_data(entry); - if (!entry_string) { - goto error; - } - utf8_length = ASN1_STRING_to_UTF8(&utf8_value, entry_string); - if (0 > utf8_length) { - goto error; - } - while (utf8_length > 0 && utf8_value[utf8_length - 1] == 0) { - --utf8_length; - } - if (utf8_length >= 256) { - goto error; - } - if ((size_t)utf8_length != strlen((char *)utf8_value)) { - goto error; - } - for (cp = utf8_value; (ch = *cp) != '\0'; ++cp) { - if (isascii(ch) && !isprint(ch)) { - goto error; - } - } + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + unsigned char *utf8_value = NULL, *cp, ch; + int pos, utf8_length, status = 0; + ASN1_STRING *entry_string; + X509_NAME_ENTRY *entry; + X509_NAME *name; + X509 *peer; + peer = SSL_get_peer_certificate(self->ssl); + if (!peer) { + goto error; + } + name = X509_get_subject_name(peer); + if (!name) { + goto error; + } + pos = X509_NAME_get_index_by_NID(name, NID_commonName, -1); + if (0 > pos) { + goto error; + } + entry = X509_NAME_get_entry(name, pos); + if (!entry) { + goto error; + } + entry_string = X509_NAME_ENTRY_get_data(entry); + if (!entry_string) { + goto error; + } + utf8_length = ASN1_STRING_to_UTF8(&utf8_value, entry_string); + if (0 > utf8_length) { + goto error; + } + while (utf8_length > 0 && utf8_value[utf8_length - 1] == 0) { + --utf8_length; + } + if (utf8_length >= 256) { + goto error; + } + if ((size_t)utf8_length != strlen((char *)utf8_value)) { + goto error; + } + for (cp = utf8_value; (ch = *cp) != '\0'; ++cp) { + if (isascii(ch) && !isprint(ch)) { + goto error; + } + } #ifdef _MSC_VER #define strcasecmp _stricmp #endif - if (strcasecmp(host, (char *)utf8_value)) { - goto error; - } + if (strcasecmp(host, (char *)utf8_value)) { + goto error; + } #ifdef _MSC_VER #undef strcasecmp #endif exit: - OPENSSL_free(utf8_value); - return status; + OPENSSL_free(utf8_value); + return status; error: - status = -1; - goto exit; + status = -1; + goto exit; } static int amqp_ssl_socket_open(void *base, const char *host, int port) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - long result; - int status; - self->ssl = SSL_new(self->ctx); - if (!self->ssl) { - return -1; - } - SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY); - self->sockfd = amqp_open_socket(host, port); - if (0 > self->sockfd) { - return -1; - } - status = SSL_set_fd(self->ssl, self->sockfd); - if (!status) { - return -1; - } - status = SSL_connect(self->ssl); - if (!status) { - return -1; - } - result = SSL_get_verify_result(self->ssl); - if (X509_V_OK != result) { - return -1; - } - if (self->verify) { - int status = amqp_ssl_socket_verify(self, host); - if (status) { - return -1; - } - } - return 0; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + long result; + int status; + self->ssl = SSL_new(self->ctx); + if (!self->ssl) { + return -1; + } + SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY); + self->sockfd = amqp_open_socket(host, port); + if (0 > self->sockfd) { + return -1; + } + status = SSL_set_fd(self->ssl, self->sockfd); + if (!status) { + return -1; + } + status = SSL_connect(self->ssl); + if (!status) { + return -1; + } + result = SSL_get_verify_result(self->ssl); + if (X509_V_OK != result) { + return -1; + } + if (self->verify) { + int status = amqp_ssl_socket_verify(self, host); + if (status) { + return -1; + } + } + return 0; } static int amqp_ssl_socket_close(void *base) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - if (self) { - SSL_free(self->ssl); - amqp_os_socket_close(self->sockfd); - SSL_CTX_free(self->ctx); - free(self->buffer); - free(self); - } - destroy_openssl(); - return 0; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + if (self) { + SSL_free(self->ssl); + amqp_os_socket_close(self->sockfd); + SSL_CTX_free(self->ctx); + free(self->buffer); + free(self); + } + destroy_openssl(); + return 0; } static int amqp_ssl_socket_error(AMQP_UNUSED void *base) { - return -1; + return -1; } static int amqp_ssl_socket_get_sockfd(void *base) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return self->sockfd; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->sockfd; } static const struct amqp_socket_class_t amqp_ssl_socket_class = { - amqp_ssl_socket_writev, /* writev */ - amqp_ssl_socket_send, /* send */ - amqp_ssl_socket_recv, /* recv */ - amqp_ssl_socket_open, /* open */ - amqp_ssl_socket_close, /* close */ - amqp_ssl_socket_error, /* error */ - amqp_ssl_socket_get_sockfd /* get_sockfd */ + amqp_ssl_socket_writev, /* writev */ + amqp_ssl_socket_send, /* send */ + amqp_ssl_socket_recv, /* recv */ + amqp_ssl_socket_open, /* open */ + amqp_ssl_socket_close, /* close */ + amqp_ssl_socket_error, /* error */ + amqp_ssl_socket_get_sockfd /* get_sockfd */ }; amqp_socket_t * amqp_ssl_socket_new(void) { - struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); - int status; - if (!self) { - goto error; - } - status = initialize_openssl(); - if (status) { - goto error; - } - self->ctx = SSL_CTX_new(SSLv23_client_method()); - if (!self->ctx) { - goto error; - } - self->klass = &amqp_ssl_socket_class; - self->verify = 1; - return (amqp_socket_t *)self; + struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); + int status; + if (!self) { + goto error; + } + status = initialize_openssl(); + if (status) { + goto error; + } + self->ctx = SSL_CTX_new(SSLv23_client_method()); + if (!self->ctx) { + goto error; + } + self->klass = &amqp_ssl_socket_class; + self->verify = 1; + return (amqp_socket_t *)self; error: - amqp_socket_close((amqp_socket_t *)self); - return NULL; + amqp_socket_close((amqp_socket_t *)self); + return NULL; } int amqp_ssl_socket_set_cacert(amqp_socket_t *base, - const char *cacert) + const char *cacert) { - int status; - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - status = SSL_CTX_load_verify_locations(self->ctx, cacert, NULL); - if (1 != status) { - return -1; - } - return 0; + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = SSL_CTX_load_verify_locations(self->ctx, cacert, NULL); + if (1 != status) { + return -1; + } + return 0; } int amqp_ssl_socket_set_key(amqp_socket_t *base, - const char *cert, - const char *key) + const char *cert, + const char *key) { - int status; - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); - if (1 != status) { - return -1; - } - status = SSL_CTX_use_PrivateKey_file(self->ctx, key, - SSL_FILETYPE_PEM); - if (1 != status) { - return -1; - } - return 0; + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); + if (1 != status) { + return -1; + } + status = SSL_CTX_use_PrivateKey_file(self->ctx, key, + SSL_FILETYPE_PEM); + if (1 != status) { + return -1; + } + return 0; } static int password_cb(AMQP_UNUSED char *buffer, - AMQP_UNUSED int length, - AMQP_UNUSED int rwflag, - AMQP_UNUSED void *user_data) + AMQP_UNUSED int length, + AMQP_UNUSED int rwflag, + AMQP_UNUSED void *user_data) { - amqp_abort("rabbitmq-c does not support password protected keys"); - return 0; + amqp_abort("rabbitmq-c does not support password protected keys"); + return 0; } int amqp_ssl_socket_set_key_buffer(amqp_socket_t *base, - const char *cert, - const void *key, - size_t n) + const char *cert, + const void *key, + size_t n) { - int status = 0; - BIO *buf = NULL; - RSA *rsa = NULL; - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); - if (1 != status) { - return -1; - } - buf = BIO_new_mem_buf((void *)key, n); - if (!buf) { - goto error; - } - rsa = PEM_read_bio_RSAPrivateKey(buf, NULL, password_cb, NULL); - if (!rsa) { - goto error; - } - status = SSL_CTX_use_RSAPrivateKey(self->ctx, rsa); - if (1 != status) { - goto error; - } + int status = 0; + BIO *buf = NULL; + RSA *rsa = NULL; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); + if (1 != status) { + return -1; + } + buf = BIO_new_mem_buf((void *)key, n); + if (!buf) { + goto error; + } + rsa = PEM_read_bio_RSAPrivateKey(buf, NULL, password_cb, NULL); + if (!rsa) { + goto error; + } + status = SSL_CTX_use_RSAPrivateKey(self->ctx, rsa); + if (1 != status) { + goto error; + } exit: - BIO_vfree(buf); - RSA_free(rsa); - return status; + BIO_vfree(buf); + RSA_free(rsa); + return status; error: - status = -1; - goto exit; + status = -1; + goto exit; } int amqp_ssl_socket_set_cert(amqp_socket_t *base, - const char *cert) + const char *cert) { - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - int status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); - if (1 != status) { - return -1; - } - return 0; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + int status = SSL_CTX_use_certificate_chain_file(self->ctx, cert); + if (1 != status) { + return -1; + } + return 0; } void amqp_ssl_socket_set_verify(amqp_socket_t *base, - amqp_boolean_t verify) + amqp_boolean_t verify) { - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - self->verify = verify; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + self->verify = verify; } void amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize) { - if (!openssl_initialized) { - do_initialize_openssl = do_initialize; - } + if (!openssl_initialized) { + do_initialize_openssl = do_initialize; + } } #ifdef ENABLE_THREAD_SAFETY @@ -440,18 +440,17 @@ amqp_ssl_threadid_callback(void) void amqp_ssl_locking_callback(int mode, int n, - AMQP_UNUSED const char *file, - AMQP_UNUSED int line) + AMQP_UNUSED const char *file, + AMQP_UNUSED int line) { - if (mode & CRYPTO_LOCK) - { - if (pthread_mutex_lock(&amqp_openssl_lockarray[n])) + if (mode & CRYPTO_LOCK) { + if (pthread_mutex_lock(&amqp_openssl_lockarray[n])) { amqp_abort("Runtime error: Failure in trying to lock OpenSSL mutex"); - } - else - { - if (pthread_mutex_unlock(&amqp_openssl_lockarray[n])) + } + } else { + if (pthread_mutex_unlock(&amqp_openssl_lockarray[n])) { amqp_abort("Runtime error: Failure in trying to unlock OpenSSL mutex"); + } } } #endif /* ENABLE_THREAD_SAFETY */ @@ -461,40 +460,35 @@ initialize_openssl(void) { #ifdef _WIN32 /* No such thing as PTHREAD_INITIALIZE_MUTEX macro on Win32, so we use this */ - if (NULL == openssl_init_mutex) - { + if (NULL == openssl_init_mutex) { while (InterlockedExchange(&win32_create_mutex, 1) == 1) /* Loop, someone else is holding this lock */ ; - if (NULL == openssl_init_mutex) - { - if (pthread_mutex_init(&openssl_init_mutex, NULL)) + if (NULL == openssl_init_mutex) { + if (pthread_mutex_init(&openssl_init_mutex, NULL)) { return -1; + } } InterlockedExchange(&win32_create_mutex, 0); } #endif /* _WIN32 */ #ifdef ENABLE_THREAD_SAFETY - if (pthread_mutex_lock(&openssl_init_mutex)) + if (pthread_mutex_lock(&openssl_init_mutex)) { return -1; + } #endif /* ENABLE_THREAD_SAFETY */ - if (do_initialize_openssl) - { + if (do_initialize_openssl) { #ifdef ENABLE_THREAD_SAFETY - if (NULL == amqp_openssl_lockarray) - { + if (NULL == amqp_openssl_lockarray) { int i = 0; amqp_openssl_lockarray = calloc(CRYPTO_num_locks(), sizeof(pthread_mutex_t)); - if (!amqp_openssl_lockarray) - { + if (!amqp_openssl_lockarray) { pthread_mutex_unlock(&openssl_init_mutex); return -1; } - for (i = 0; i < CRYPTO_num_locks(); ++i) - { - if (pthread_mutex_init(&amqp_openssl_lockarray[i], NULL)) - { + for (i = 0; i < CRYPTO_num_locks(); ++i) { + if (pthread_mutex_init(&amqp_openssl_lockarray[i], NULL)) { free(amqp_openssl_lockarray); amqp_openssl_lockarray = NULL; pthread_mutex_unlock(&openssl_init_mutex); @@ -503,15 +497,13 @@ initialize_openssl(void) } } - if (0 == open_ssl_connections) - { + if (0 == open_ssl_connections) { CRYPTO_set_id_callback(amqp_ssl_threadid_callback); CRYPTO_set_locking_callback(amqp_ssl_locking_callback); } #endif /* ENABLE_THREAD_SAFETY */ - if (!openssl_initialized) - { + if (!openssl_initialized) { OPENSSL_config(NULL); SSL_library_init(); @@ -533,16 +525,17 @@ static int destroy_openssl(void) { #ifdef ENABLE_THREAD_SAFETY - if (pthread_mutex_lock(&openssl_init_mutex)) + if (pthread_mutex_lock(&openssl_init_mutex)) { return -1; + } #endif /* ENABLE_THREAD_SAFETY */ - if (open_ssl_connections > 0) + if (open_ssl_connections > 0) { --open_ssl_connections; + } #ifdef ENABLE_THREAD_SAFETY - if (0 == open_ssl_connections && do_initialize_openssl) - { + if (0 == open_ssl_connections && do_initialize_openssl) { /* Unsetting these allows the rabbitmq-c library to be unloaded * safely. We do leak the amqp_openssl_lockarray. Which is only * an issue if you repeatedly unload and load the library diff --git a/librabbitmq/amqp_polarssl.c b/librabbitmq/amqp_polarssl.c index 027ee47..b89f602 100644 --- a/librabbitmq/amqp_polarssl.c +++ b/librabbitmq/amqp_polarssl.c @@ -34,271 +34,271 @@ #include <stdlib.h> struct amqp_ssl_socket_t { - int sockfd; - entropy_context *entropy; - ctr_drbg_context *ctr_drbg; - x509_cert *cacert; - rsa_context *key; - x509_cert *cert; - ssl_context *ssl; - ssl_session *session; - char *buffer; - size_t length; + int sockfd; + entropy_context *entropy; + ctr_drbg_context *ctr_drbg; + x509_cert *cacert; + rsa_context *key; + x509_cert *cert; + ssl_context *ssl; + ssl_session *session; + char *buffer; + size_t length; }; static ssize_t amqp_ssl_socket_send(void *base, - const void *buf, - size_t len, - AMQP_UNUSED int flags) + const void *buf, + size_t len, + AMQP_UNUSED int flags) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return ssl_write(self->ssl, buf, len); + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return ssl_write(self->ssl, buf, len); } static ssize_t amqp_ssl_socket_writev(void *base, - const struct iovec *iov, - int iovcnt) + const struct iovec *iov, + int iovcnt) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - ssize_t written = -1; - char *bufferp; - size_t bytes; - int i; - bytes = 0; - for (i = 0; i < iovcnt; ++i) { - bytes += iov[i].iov_len; - } - if (self->length < bytes) { - free(self->buffer); - self->buffer = malloc(bytes); - if (!self->buffer) { - self->length = 0; - goto exit; - } - self->length = bytes; - } - bufferp = self->buffer; - for (i = 0; i < iovcnt; ++i) { - memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); - bufferp += iov[i].iov_len; - } - written = ssl_write(self->ssl, (const unsigned char *)self->buffer, - bytes); + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + ssize_t written = -1; + char *bufferp; + size_t bytes; + int i; + bytes = 0; + for (i = 0; i < iovcnt; ++i) { + bytes += iov[i].iov_len; + } + if (self->length < bytes) { + free(self->buffer); + self->buffer = malloc(bytes); + if (!self->buffer) { + self->length = 0; + goto exit; + } + self->length = bytes; + } + bufferp = self->buffer; + for (i = 0; i < iovcnt; ++i) { + memcpy(bufferp, iov[i].iov_base, iov[i].iov_len); + bufferp += iov[i].iov_len; + } + written = ssl_write(self->ssl, (const unsigned char *)self->buffer, + bytes); exit: - return written; + return written; } static ssize_t amqp_ssl_socket_recv(void *base, - void *buf, - size_t len, - AMQP_UNUSED int flags) + void *buf, + size_t len, + AMQP_UNUSED int flags) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return ssl_read(self->ssl, buf, len); + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return ssl_read(self->ssl, buf, len); } static int amqp_ssl_socket_open(void *base, const char *host, int port) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - int status = net_connect(&self->sockfd, host, port); - if (status) { - return -1; - } - if (self->cacert) { - ssl_set_ca_chain(self->ssl, self->cacert, NULL, host); - } - ssl_set_bio(self->ssl, net_recv, &self->sockfd, - net_send, &self->sockfd); - if (self->key && self->cert) { - ssl_set_own_cert(self->ssl, self->cert, self->key); - } - while (0 != (status = ssl_handshake(self->ssl))) { - switch (status) { - case POLARSSL_ERR_NET_WANT_READ: - case POLARSSL_ERR_NET_WANT_WRITE: - continue; - default: - break; - } - } - return status; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + int status = net_connect(&self->sockfd, host, port); + if (status) { + return -1; + } + if (self->cacert) { + ssl_set_ca_chain(self->ssl, self->cacert, NULL, host); + } + ssl_set_bio(self->ssl, net_recv, &self->sockfd, + net_send, &self->sockfd); + if (self->key && self->cert) { + ssl_set_own_cert(self->ssl, self->cert, self->key); + } + while (0 != (status = ssl_handshake(self->ssl))) { + switch (status) { + case POLARSSL_ERR_NET_WANT_READ: + case POLARSSL_ERR_NET_WANT_WRITE: + continue; + default: + break; + } + } + return status; } static int amqp_ssl_socket_close(void *base) { - int status = -1; - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - if (self) { - free(self->entropy); - free(self->ctr_drbg); - x509_free(self->cacert); - free(self->cacert); - rsa_free(self->key); - free(self->key); - x509_free(self->cert); - free(self->cert); - ssl_free(self->ssl); - free(self->ssl); - free(self->session); - free(self->buffer); - if (self->sockfd >= 0) { - net_close(self->sockfd); - status = 0; - } - free(self); - } - return status; + int status = -1; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + if (self) { + free(self->entropy); + free(self->ctr_drbg); + x509_free(self->cacert); + free(self->cacert); + rsa_free(self->key); + free(self->key); + x509_free(self->cert); + free(self->cert); + ssl_free(self->ssl); + free(self->ssl); + free(self->session); + free(self->buffer); + if (self->sockfd >= 0) { + net_close(self->sockfd); + status = 0; + } + free(self); + } + return status; } static int amqp_ssl_socket_error(AMQP_UNUSED void *user_data) { - return -1; + return -1; } static int amqp_ssl_socket_get_sockfd(void *base) { - struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; - return self->sockfd; + struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; + return self->sockfd; } static const struct amqp_socket_class_t amqp_ssl_socket_class = { - amqp_ssl_socket_writev, /* writev */ - amqp_ssl_socket_send, /* send */ - amqp_ssl_socket_recv, /* recv */ - amqp_ssl_socket_open, /* open */ - amqp_ssl_socket_close, /* close */ - amqp_ssl_socket_error, /* error */ - amqp_ssl_socket_get_sockfd /* get_sockfd */ + amqp_ssl_socket_writev, /* writev */ + amqp_ssl_socket_send, /* send */ + amqp_ssl_socket_recv, /* recv */ + amqp_ssl_socket_open, /* open */ + amqp_ssl_socket_close, /* close */ + amqp_ssl_socket_error, /* error */ + amqp_ssl_socket_get_sockfd /* get_sockfd */ }; amqp_socket_t * amqp_ssl_socket_new(void) { - struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); - int status; - if (!self) { - goto error; - } - self->entropy = calloc(1, sizeof(*self->entropy)); - if (!self->entropy) { - goto error; - } - self->sockfd = -1; - entropy_init(self->entropy); - self->ctr_drbg = calloc(1, sizeof(*self->ctr_drbg)); - if (!self->ctr_drbg) { - goto error; - } - status = ctr_drbg_init(self->ctr_drbg, entropy_func, self->entropy, - NULL, 0); - if (status) { - goto error; - } - self->ssl = calloc(1, sizeof(*self->ssl)); - if (!self->ssl) { - goto error; - } - status = ssl_init(self->ssl); - if (status) { - goto error; - } - ssl_set_endpoint(self->ssl, SSL_IS_CLIENT); - ssl_set_rng(self->ssl, ctr_drbg_random, self->ctr_drbg); - ssl_set_ciphersuites(self->ssl, ssl_default_ciphersuites); - ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED); - self->session = calloc(1, sizeof(*self->session)); - if (!self->session) { - goto error; - } - ssl_set_session(self->ssl, 0, 0, self->session); - return (amqp_socket_t *)self; + struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self)); + int status; + if (!self) { + goto error; + } + self->entropy = calloc(1, sizeof(*self->entropy)); + if (!self->entropy) { + goto error; + } + self->sockfd = -1; + entropy_init(self->entropy); + self->ctr_drbg = calloc(1, sizeof(*self->ctr_drbg)); + if (!self->ctr_drbg) { + goto error; + } + status = ctr_drbg_init(self->ctr_drbg, entropy_func, self->entropy, + NULL, 0); + if (status) { + goto error; + } + self->ssl = calloc(1, sizeof(*self->ssl)); + if (!self->ssl) { + goto error; + } + status = ssl_init(self->ssl); + if (status) { + goto error; + } + ssl_set_endpoint(self->ssl, SSL_IS_CLIENT); + ssl_set_rng(self->ssl, ctr_drbg_random, self->ctr_drbg); + ssl_set_ciphersuites(self->ssl, ssl_default_ciphersuites); + ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED); + self->session = calloc(1, sizeof(*self->session)); + if (!self->session) { + goto error; + } + ssl_set_session(self->ssl, 0, 0, self->session); + return (amqp_socket_t *)self; error: - amqp_socket_close((amqp_socket_t *)self); - return NULL; + amqp_socket_close((amqp_socket_t *)self); + return NULL; } int amqp_ssl_socket_set_cacert(amqp_socket_t *base, - const char *cacert) + const char *cacert) { - int status; - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - self->cacert = calloc(1, sizeof(*self->cacert)); - if (!self->cacert) { - return -1; - } - status = x509parse_crtfile(self->cacert, cacert); - if (status) { - return -1; - } - return 0; + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + self->cacert = calloc(1, sizeof(*self->cacert)); + if (!self->cacert) { + return -1; + } + status = x509parse_crtfile(self->cacert, cacert); + if (status) { + return -1; + } + return 0; } int amqp_ssl_socket_set_key(amqp_socket_t *base, - const char *cert, - const char *key) + const char *cert, + const char *key) { - int status; - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - self->key = calloc(1, sizeof(*self->key)); - if (!self->key) { - return -1; - } - status = x509parse_keyfile(self->key, key, NULL); - if (status) { - return -1; - } - self->cert = calloc(1, sizeof(*self->cert)); - if (!self->cert) { - return -1; - } - status = x509parse_crtfile(self->cert, cert); - if (status) { - return -1; - } - return 0; + int status; + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + self->key = calloc(1, sizeof(*self->key)); + if (!self->key) { + return -1; + } + status = x509parse_keyfile(self->key, key, NULL); + if (status) { + return -1; + } + self->cert = calloc(1, sizeof(*self->cert)); + if (!self->cert) { + return -1; + } + status = x509parse_crtfile(self->cert, cert); + if (status) { + return -1; + } + return 0; } int amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base, - AMQP_UNUSED const char *cert, - AMQP_UNUSED const void *key, - AMQP_UNUSED size_t n) + AMQP_UNUSED const char *cert, + AMQP_UNUSED const void *key, + AMQP_UNUSED size_t n) { - amqp_abort("%s is not implemented for PolarSSL", __func__); - return -1; + amqp_abort("%s is not implemented for PolarSSL", __func__); + return -1; } void amqp_ssl_socket_set_verify(amqp_socket_t *base, - amqp_boolean_t verify) + amqp_boolean_t verify) { - struct amqp_ssl_socket_t *self; - if (base->klass != &amqp_ssl_socket_class) { - amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); - } - self = (struct amqp_ssl_socket_t *)base; - if (verify) { - ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED); - } else { - ssl_set_authmode(self->ssl, SSL_VERIFY_NONE); - } + struct amqp_ssl_socket_t *self; + if (base->klass != &amqp_ssl_socket_class) { + amqp_abort("<%p> is not of type amqp_ssl_socket_t", base); + } + self = (struct amqp_ssl_socket_t *)base; + if (verify) { + ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED); + } else { + ssl_set_authmode(self->ssl, SSL_VERIFY_NONE); + } } void diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 559be47..130d19a 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -141,22 +141,19 @@ int amqp_open_socket(char const *hostname, http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64 */ sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); - if (-1 == sockfd) - { + if (-1 == sockfd) { last_error = -amqp_os_socket_error(); continue; } #ifdef DISABLE_SIGPIPE_WITH_SETSOCKOPT - if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) - { + if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { last_error = -amqp_os_socket_error(); amqp_os_socket_close(sockfd); continue; } #endif /* DISABLE_SIGPIPE_WITH_SETSOCKOPT */ if (0 != amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) - || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) - { + || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { last_error = -amqp_os_socket_error(); amqp_os_socket_close(sockfd); continue; @@ -406,14 +403,14 @@ retry: */ if (!((frame.frame_type == AMQP_FRAME_METHOD) && ( - ((frame.channel == channel) - && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids) - || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) - || - ((frame.channel == 0) - && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) - ) - )) { + ((frame.channel == channel) + && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids) + || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) + || + ((frame.channel == 0) + && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) + ) + )) { amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t)); amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t)); @@ -495,13 +492,13 @@ static int amqp_table_contains_entry(const amqp_table_t *table, } static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - const amqp_table_t *client_properties, - amqp_sasl_method_enum sasl_method, - va_list vl) + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + const amqp_table_t *client_properties, + amqp_sasl_method_enum sasl_method, + va_list vl) { int res; amqp_method_t method; @@ -536,7 +533,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, amqp_table_t default_table; amqp_connection_start_ok_t s; amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, - sasl_method, vl); + sasl_method, vl); if (response_bytes.bytes == NULL) { res = -ERROR_NO_MEMORY; @@ -572,7 +569,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, amqp_table_entry_t *current_entry; s.client_properties.entries = amqp_pool_alloc(&state->decoding_pool, - sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries)); + sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries)); if (NULL == s.client_properties.entries) { res = -ERROR_NO_MEMORY; goto error_res; @@ -707,13 +704,13 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, } amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state, - char const *vhost, - int channel_max, - int frame_max, - int heartbeat, - const amqp_table_t *client_properties, - amqp_sasl_method_enum sasl_method, - ...) + char const *vhost, + int channel_max, + int frame_max, + int heartbeat, + const amqp_table_t *client_properties, + amqp_sasl_method_enum sasl_method, + ...) { va_list vl; diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index 5a6dfde..63c5e8d 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -44,18 +44,18 @@ typedef int (*amqp_socket_get_sockfd_fn)(void *); /** V-table for amqp_socket_t */ struct amqp_socket_class_t { - amqp_socket_writev_fn writev; - amqp_socket_send_fn send; - amqp_socket_recv_fn recv; - amqp_socket_open_fn open; - amqp_socket_close_fn close; - amqp_socket_error_fn error; - amqp_socket_get_sockfd_fn get_sockfd; + amqp_socket_writev_fn writev; + amqp_socket_send_fn send; + amqp_socket_recv_fn recv; + amqp_socket_open_fn open; + amqp_socket_close_fn close; + amqp_socket_error_fn error; + amqp_socket_get_sockfd_fn get_sockfd; }; /** Abstract base class for amqp_socket_t */ struct amqp_socket_t_ { - const struct amqp_socket_class_t *klass; + const struct amqp_socket_class_t *klass; }; /** diff --git a/librabbitmq/amqp_ssl_socket.h b/librabbitmq/amqp_ssl_socket.h index fe61cd3..6348667 100644 --- a/librabbitmq/amqp_ssl_socket.h +++ b/librabbitmq/amqp_ssl_socket.h @@ -56,7 +56,7 @@ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_ssl_socket_set_cacert(amqp_socket_t *self, - const char *cacert); + const char *cacert); /** * Set the client key. @@ -71,8 +71,8 @@ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_ssl_socket_set_key(amqp_socket_t *self, - const char *cert, - const char *key); + const char *cert, + const char *key); /** * Set the client key from a buffer. @@ -88,9 +88,9 @@ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_ssl_socket_set_key_buffer(amqp_socket_t *self, - const char *cert, - const void *key, - size_t n); + const char *cert, + const void *key, + size_t n); /** * Enable or disable peer verification. @@ -106,7 +106,7 @@ AMQP_PUBLIC_FUNCTION void AMQP_CALL amqp_ssl_socket_set_verify(amqp_socket_t *self, - amqp_boolean_t verify); + amqp_boolean_t verify); /** * Sets whether rabbitmq-c initializes the underlying SSL library. diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index cd3f6ce..f65a6ce 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -31,96 +31,96 @@ #include <stdlib.h> struct amqp_tcp_socket_t { - const struct amqp_socket_class_t *klass; - int sockfd; + const struct amqp_socket_class_t *klass; + int sockfd; }; static ssize_t amqp_tcp_socket_writev(void *base, const struct iovec *iov, int iovcnt) { - struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return amqp_os_socket_writev(self->sockfd, iov, iovcnt); + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return amqp_os_socket_writev(self->sockfd, iov, iovcnt); } static ssize_t amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags) { - struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return send(self->sockfd, buf, len, flags); + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return send(self->sockfd, buf, len, flags); } static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags) { - struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return recv(self->sockfd, buf, len, flags); + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return recv(self->sockfd, buf, len, flags); } static int amqp_tcp_socket_open(void *base, const char *host, int port) { - struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - self->sockfd = amqp_open_socket(host, port); - if (0 > self->sockfd) { - return -1; - } - return 0; + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + self->sockfd = amqp_open_socket(host, port); + if (0 > self->sockfd) { + return -1; + } + return 0; } static int amqp_tcp_socket_close(void *base) { - struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - int status = -1; - if (self) { - status = amqp_os_socket_close(self->sockfd); - free(self); - } - return status; + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + int status = -1; + if (self) { + status = amqp_os_socket_close(self->sockfd); + free(self); + } + return status; } static int amqp_tcp_socket_error(AMQP_UNUSED void *base) { - return amqp_os_socket_error(); + return amqp_os_socket_error(); } static int amqp_tcp_socket_get_sockfd(void *base) { - struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - return self->sockfd; + struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; + return self->sockfd; } static const struct amqp_socket_class_t amqp_tcp_socket_class = { - amqp_tcp_socket_writev, /* writev */ - amqp_tcp_socket_send, /* send */ - amqp_tcp_socket_recv, /* recv */ - amqp_tcp_socket_open, /* open */ - amqp_tcp_socket_close, /* close */ - amqp_tcp_socket_error, /* error */ - amqp_tcp_socket_get_sockfd /* get_sockfd */ + amqp_tcp_socket_writev, /* writev */ + amqp_tcp_socket_send, /* send */ + amqp_tcp_socket_recv, /* recv */ + amqp_tcp_socket_open, /* open */ + amqp_tcp_socket_close, /* close */ + amqp_tcp_socket_error, /* error */ + amqp_tcp_socket_get_sockfd /* get_sockfd */ }; amqp_socket_t * amqp_tcp_socket_new(void) { - struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); - if (!self) { - return NULL; - } - self->klass = &amqp_tcp_socket_class; - self->sockfd = -1; - return (amqp_socket_t *)self; + struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); + if (!self) { + return NULL; + } + self->klass = &amqp_tcp_socket_class; + self->sockfd = -1; + return (amqp_socket_t *)self; } void amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) { - struct amqp_tcp_socket_t *self; - if (base->klass != &amqp_tcp_socket_class) { - amqp_abort("<%p> is not of type amqp_tcp_socket_t", base); - } - self = (struct amqp_tcp_socket_t *)base; - self->sockfd = sockfd; + struct amqp_tcp_socket_t *self; + if (base->klass != &amqp_tcp_socket_class) { + amqp_abort("<%p> is not of type amqp_tcp_socket_t", base); + } + self = (struct amqp_tcp_socket_t *)base; + self->sockfd = sockfd; } diff --git a/librabbitmq/amqp_url.c b/librabbitmq/amqp_url.c index b15675c..b200adc 100644 --- a/librabbitmq/amqp_url.c +++ b/librabbitmq/amqp_url.c @@ -141,8 +141,9 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed) /* What might have been the host and port were in fact the username and password */ parsed->user = host; - if (port) + if (port) { parsed->password = port; + } port = NULL; host = start = url; @@ -152,14 +153,16 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed) if (delim == '[') { /* IPv6 address. The bracket should be the first character in the host. */ - if (host != start || *host != 0) + if (host != start || *host != 0) { goto out; + } start = url; delim = find_delim(&url, 0); - if (delim != ']') + if (delim != ']') { goto out; + } parsed->host = start; start = url; @@ -167,13 +170,14 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed) /* Closing bracket should be the last character in the host. */ - if (*start != 0) + if (*start != 0) { goto out; - } - else { + } + } else { /* If we haven't seen the host yet, this is it. */ - if (*host != 0) + if (*host != 0) { parsed->host = host; + } } if (delim == ':') { @@ -185,8 +189,9 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed) char *end; long portnum = strtol(port, &end, 10); - if (port == end || *end != 0 || portnum < 0 || portnum > 65535) + if (port == end || *end != 0 || portnum < 0 || portnum > 65535) { goto out; + } parsed->port = portnum; } @@ -195,13 +200,13 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed) start = url; delim = find_delim(&url, 1); - if (delim != 0) + if (delim != 0) { goto out; + } parsed->vhost = start; res = 0; - } - else if (delim == 0) { + } else if (delim == 0) { res = 0; } diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c index e6621cc..4615480 100644 --- a/librabbitmq/unix/socket.c +++ b/librabbitmq/unix/socket.c @@ -85,18 +85,18 @@ amqp_os_error_string(int err) int amqp_os_socket_close(int sockfd) { - return close(sockfd); + return close(sockfd); } ssize_t amqp_os_socket_writev(int sockfd, const struct iovec *iov, - int iovcnt) + int iovcnt) { - return writev(sockfd, iov, iovcnt); + return writev(sockfd, iov, iovcnt); } int amqp_os_socket_error(void) { - return errno | ERROR_CATEGORY_OS; + return errno | ERROR_CATEGORY_OS; } diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c index 815530f..fa2b0f7 100644 --- a/librabbitmq/win32/socket.c +++ b/librabbitmq/win32/socket.c @@ -95,12 +95,12 @@ amqp_socket_setsockopt(int sock, int level, int optname, int amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data) { - return closesocket(sockfd); + return closesocket(sockfd); } ssize_t amqp_socket_writev(int sock, struct iovec *iov, int nvecs, - AMQP_UNUSED void *user_data) + AMQP_UNUSED void *user_data) { DWORD ret; if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) { diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h index db40a97..58e1a99 100644 --- a/librabbitmq/win32/socket.h +++ b/librabbitmq/win32/socket.h @@ -60,7 +60,7 @@ amqp_socket_setsockopt(int sock, int level, int optname, const void *optval, size_t optlen); ssize_t -amqp_socket_writev(int sock, struct iovec *iov, int nvecs, AMQP_UNUSED void* user_data); +amqp_socket_writev(int sock, struct iovec *iov, int nvecs, AMQP_UNUSED void *user_data); int amqp_socket_error(AMQP_UNUSED void *user_data); diff --git a/librabbitmq/win32/threads.c b/librabbitmq/win32/threads.c index 0501408..1559c5f 100644 --- a/librabbitmq/win32/threads.c +++ b/librabbitmq/win32/threads.c @@ -11,8 +11,9 @@ int pthread_mutex_init(pthread_mutex_t *mutex, void *attr) { *mutex = malloc(sizeof(CRITICAL_SECTION)); - if (!*mutex) + if (!*mutex) { return 1; + } InitializeCriticalSection(*mutex); return 0; } @@ -20,8 +21,9 @@ pthread_mutex_init(pthread_mutex_t *mutex, void *attr) int pthread_mutex_lock(pthread_mutex_t *mutex) { - if (!*mutex) + if (!*mutex) { return 1; + } EnterCriticalSection(*mutex); return 0; @@ -30,8 +32,9 @@ pthread_mutex_lock(pthread_mutex_t *mutex) int pthread_mutex_unlock(pthread_mutex_t *mutex) { - if (!*mutex) + if (!*mutex) { return 1; + } LeaveCriticalSection(*mutex); return 0; diff --git a/tests/test_parse_url.c b/tests/test_parse_url.c index 241cf8c..da3e681 100644 --- a/tests/test_parse_url.c +++ b/tests/test_parse_url.c @@ -111,14 +111,14 @@ int main(void) { /* From the spec */ parse_success("amqp://user:pass@host:10000/vhost", "user", "pass", - "host", 10000, "vhost"); + "host", 10000, "vhost"); parse_success("amqps://user:pass@host:10000/vhost", "user", "pass", - "host", 10000, "vhost"); + "host", 10000, "vhost"); parse_success("amqp://user%61:%61pass@ho%61st:10000/v%2fhost", - "usera", "apass", "hoast", 10000, "v/host"); + "usera", "apass", "hoast", 10000, "v/host"); parse_success("amqps://user%61:%61pass@ho%61st:10000/v%2fhost", - "usera", "apass", "hoast", 10000, "v/host"); + "usera", "apass", "hoast", 10000, "v/host"); parse_success("amqp://", "guest", "guest", "localhost", 5672, "/"); parse_success("amqps://", "guest", "guest", "localhost", 5671, "/"); @@ -130,22 +130,22 @@ int main(void) parse_success("amqps://user@", "user", "guest", "localhost", 5671, "/"); parse_success("amqp://user:pass@", "user", "pass", - "localhost", 5672, "/"); + "localhost", 5672, "/"); parse_success("amqps://user:pass@", "user", "pass", - "localhost", 5671, "/"); + "localhost", 5671, "/"); parse_success("amqp://host", "guest", "guest", "host", 5672, "/"); parse_success("amqps://host", "guest", "guest", "host", 5671, "/"); parse_success("amqp://:10000", "guest", "guest", "localhost", 10000, - "/"); + "/"); parse_success("amqps://:10000", "guest", "guest", "localhost", 10000, - "/"); + "/"); parse_success("amqp:///vhost", "guest", "guest", "localhost", 5672, - "vhost"); + "vhost"); parse_success("amqps:///vhost", "guest", "guest", "localhost", 5671, - "vhost"); + "vhost"); parse_success("amqp://host/", "guest", "guest", "host", 5672, ""); parse_success("amqps://host/", "guest", "guest", "host", 5671, ""); @@ -164,54 +164,54 @@ int main(void) parse_success("amqps://[::1]:100", "guest", "guest", "::1", 100, "/"); parse_success("amqp://host/blah", "guest", "guest", - "host", 5672, "blah"); + "host", 5672, "blah"); parse_success("amqps://host/blah", "guest", "guest", - "host", 5671, "blah"); + "host", 5671, "blah"); parse_success("amqp://host:100/blah", "guest", "guest", - "host", 100, "blah"); + "host", 100, "blah"); parse_success("amqps://host:100/blah", "guest", "guest", - "host", 100, "blah"); + "host", 100, "blah"); parse_success("amqp://:100/blah", "guest", "guest", - "localhost", 100, "blah"); + "localhost", 100, "blah"); parse_success("amqps://:100/blah", "guest", "guest", - "localhost", 100, "blah"); + "localhost", 100, "blah"); parse_success("amqp://[::1]/blah", "guest", "guest", - "::1", 5672, "blah"); + "::1", 5672, "blah"); parse_success("amqps://[::1]/blah", "guest", "guest", - "::1", 5671, "blah"); + "::1", 5671, "blah"); parse_success("amqp://[::1]:100/blah", "guest", "guest", - "::1", 100, "blah"); + "::1", 100, "blah"); parse_success("amqps://[::1]:100/blah", "guest", "guest", - "::1", 100, "blah"); + "::1", 100, "blah"); parse_success("amqp://user:pass@host", "user", "pass", - "host", 5672, "/"); + "host", 5672, "/"); parse_success("amqps://user:pass@host", "user", "pass", - "host", 5671, "/"); + "host", 5671, "/"); parse_success("amqp://user:pass@host:100", "user", "pass", - "host", 100, "/"); + "host", 100, "/"); parse_success("amqps://user:pass@host:100", "user", "pass", - "host", 100, "/"); + "host", 100, "/"); parse_success("amqp://user:pass@:100", "user", "pass", - "localhost", 100, "/"); + "localhost", 100, "/"); parse_success("amqps://user:pass@:100", "user", "pass", - "localhost", 100, "/"); + "localhost", 100, "/"); parse_success("amqp://user:pass@[::1]", "user", "pass", - "::1", 5672, "/"); + "::1", 5672, "/"); parse_success("amqps://user:pass@[::1]", "user", "pass", - "::1", 5671, "/"); + "::1", 5671, "/"); parse_success("amqp://user:pass@[::1]:100", "user", "pass", - "::1", 100, "/"); + "::1", 100, "/"); parse_success("amqps://user:pass@[::1]:100", "user", "pass", - "::1", 100, "/"); + "::1", 100, "/"); /* Various failure cases */ parse_fail("http://www.rabbitmq.com"); diff --git a/tools/common.c b/tools/common.c index c1844ae..0c61d16 100644 --- a/tools/common.c +++ b/tools/common.c @@ -209,14 +209,22 @@ struct poptOption connect_options[] = { "the password to login with", "password" }, #ifdef WITH_SSL - {"ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0, - "connect over SSL/TLS", NULL}, - {"cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0, - "path to the CA certificate file", "cacert.pem"}, - {"key", 0, POPT_ARG_STRING, &amqp_key, 0, - "path to the client private key file", "key.pem"}, - {"cert", 0, POPT_ARG_STRING, &amqp_cert, 0, - "path to the client certificate file", "cert.pem"}, + { + "ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0, + "connect over SSL/TLS", NULL + }, + { + "cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0, + "path to the CA certificate file", "cacert.pem" + }, + { + "key", 0, POPT_ARG_STRING, &amqp_key, 0, + "path to the client private key file", "key.pem" + }, + { + "cert", 0, POPT_ARG_STRING, &amqp_cert, 0, + "path to the client certificate file", "cert.pem" + }, #endif /* WITH_SSL */ { NULL, '\0', 0, NULL, 0, NULL, NULL } }; @@ -234,7 +242,7 @@ static void init_connection_info(struct amqp_connection_info *ci) if (amqp_url) die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), - "Parsing URL '%s'", amqp_url); + "Parsing URL '%s'", amqp_url); if (amqp_server) { char *colon; @@ -253,7 +261,7 @@ static void init_connection_info(struct amqp_connection_info *ci) --url now allows connection options to be specificied concisely. */ fprintf(stderr, "Specifying the port number with" - " --server is deprecated\n"); + " --server is deprecated\n"); host_len = colon - amqp_server; ci->host = malloc(host_len + 1); @@ -353,11 +361,12 @@ amqp_connection_state_t make_connection(void) } amqp_set_socket(conn, socket); die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0, - AMQP_SASL_METHOD_PLAIN, - ci.user, ci.password), - "logging in to AMQP server"); - if (!amqp_channel_open(conn, 1)) + AMQP_SASL_METHOD_PLAIN, + ci.user, ci.password), + "logging in to AMQP server"); + if (!amqp_channel_open(conn, 1)) { die_rpc(amqp_get_rpc_reply(conn), "opening channel"); + } return conn; } |