summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/amqps_bind.c15
-rw-r--r--examples/amqps_consumer.c34
-rw-r--r--examples/amqps_exchange_declare.c9
-rw-r--r--examples/amqps_listen.c74
-rw-r--r--examples/amqps_listenq.c70
-rw-r--r--examples/amqps_producer.c31
-rw-r--r--examples/amqps_sendstring.c23
-rw-r--r--examples/amqps_unbind.c15
-rw-r--r--examples/utils.c3
-rw-r--r--librabbitmq/amqp.h10
-rw-r--r--librabbitmq/amqp_connection.c3
-rw-r--r--librabbitmq/amqp_cyassl.c244
-rw-r--r--librabbitmq/amqp_gnutls.c406
-rw-r--r--librabbitmq/amqp_openssl.c613
-rw-r--r--librabbitmq/amqp_polarssl.c400
-rw-r--r--librabbitmq/amqp_socket.c57
-rw-r--r--librabbitmq/amqp_socket.h16
-rw-r--r--librabbitmq/amqp_ssl_socket.h14
-rw-r--r--librabbitmq/amqp_tcp_socket.c88
-rw-r--r--librabbitmq/amqp_url.c27
-rw-r--r--librabbitmq/unix/socket.c8
-rw-r--r--librabbitmq/win32/socket.c4
-rw-r--r--librabbitmq/win32/socket.h2
-rw-r--r--librabbitmq/win32/threads.c9
-rw-r--r--tests/test_parse_url.c60
-rw-r--r--tools/common.c37
26 files changed, 1152 insertions, 1120 deletions
diff --git a/examples/amqps_bind.c b/examples/amqps_bind.c
index 8724ad7..53c5256 100644
--- a/examples/amqps_bind.c
+++ b/examples/amqps_bind.c
@@ -41,7 +41,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port, status;
char const *exchange;
@@ -52,7 +53,7 @@ int main(int argc, char const * const *argv) {
if (argc < 6) {
fprintf(stderr, "Usage: amqps_bind host port exchange bindingkey queue "
- "[cacert.pem [key.pem cert.pem]]\n");
+ "[cacert.pem [key.pem cert.pem]]\n");
return 1;
}
@@ -90,15 +91,15 @@ int main(int argc, char const * const *argv) {
amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_queue_bind(conn, 1,
- amqp_cstring_bytes(queue),
- amqp_cstring_bytes(exchange),
- amqp_cstring_bytes(bindingkey),
- amqp_empty_table);
+ amqp_cstring_bytes(queue),
+ amqp_cstring_bytes(exchange),
+ amqp_cstring_bytes(bindingkey),
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c
index d935c90..c02168a 100644
--- a/examples/amqps_consumer.c
+++ b/examples/amqps_consumer.c
@@ -66,7 +66,7 @@ static void run(amqp_connection_state_t conn)
int countOverInterval = received - previous_received;
double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Received %d - %d since last report (%d Hz)\n",
- (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
+ (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
previous_received = received;
previous_report_time = now;
@@ -75,18 +75,22 @@ static void run(amqp_connection_state_t conn)
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
+ if (result < 0) {
return;
+ }
- if (frame.frame_type != AMQP_FRAME_METHOD)
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
continue;
+ }
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
continue;
+ }
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
+ if (result < 0) {
return;
+ }
if (frame.frame_type != AMQP_FRAME_HEADER) {
fprintf(stderr, "Expected header!");
@@ -98,12 +102,13 @@ static void run(amqp_connection_state_t conn)
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
- return;
+ if (result < 0) {
+ return;
+ }
if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
+ fprintf(stderr, "Expected body!");
+ abort();
}
body_received += frame.payload.body_fragment.len;
@@ -114,7 +119,8 @@ static void run(amqp_connection_state_t conn)
}
}
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port, status;
char const *exchange;
@@ -125,7 +131,7 @@ int main(int argc, char const * const *argv) {
if (argc < 3) {
fprintf(stderr, "Usage: amqps_consumer host port "
- "[cacert.pem [key.pem cert.pem]]\n");
+ "[cacert.pem [key.pem cert.pem]]\n");
return 1;
}
@@ -162,13 +168,13 @@ int main(int argc, char const * const *argv) {
amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
- amqp_empty_table);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
@@ -178,7 +184,7 @@ int main(int argc, char const * const *argv) {
}
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
- amqp_empty_table);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
diff --git a/examples/amqps_exchange_declare.c b/examples/amqps_exchange_declare.c
index c9f2e6c..199c0c5 100644
--- a/examples/amqps_exchange_declare.c
+++ b/examples/amqps_exchange_declare.c
@@ -41,7 +41,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port, status;
char const *exchange;
@@ -51,7 +52,7 @@ int main(int argc, char const * const *argv) {
if (argc < 5) {
fprintf(stderr, "Usage: amqps_exchange_declare host port exchange "
- "exchangetype [cacert.pem [key.pem cert.pem]]\n");
+ "exchangetype [cacert.pem [key.pem cert.pem]]\n");
return 1;
}
@@ -88,12 +89,12 @@ int main(int argc, char const * const *argv) {
amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype),
- 0, 0, amqp_empty_table);
+ 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c
index 9be9acb..728a343 100644
--- a/examples/amqps_listen.c
+++ b/examples/amqps_listen.c
@@ -43,7 +43,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port, status;
char const *exchange;
@@ -55,7 +56,7 @@ int main(int argc, char const * const *argv) {
if (argc < 5) {
fprintf(stderr, "Usage: amqps_listen host port exchange bindingkey "
- "[cacert.pem [key.pem cert.pem]]\n");
+ "[cacert.pem [key.pem cert.pem]]\n");
return 1;
}
@@ -92,13 +93,13 @@ int main(int argc, char const * const *argv) {
amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
- amqp_empty_table);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
@@ -108,7 +109,7 @@ int main(int argc, char const * const *argv) {
}
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
- amqp_empty_table);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
@@ -127,35 +128,39 @@ int main(int argc, char const * const *argv) {
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
printf("Result %d\n", result);
- if (result < 0)
- break;
+ if (result < 0) {
+ break;
+ }
printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD)
- continue;
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
+ continue;
+ }
printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
- continue;
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ continue;
+ }
d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
+ (unsigned) d->delivery_tag,
+ (int) d->exchange.len, (char *) d->exchange.bytes,
+ (int) d->routing_key.len, (char *) d->routing_key.bytes);
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
- break;
+ if (result < 0) {
+ break;
+ }
if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
+ fprintf(stderr, "Expected header!");
+ abort();
}
p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
- printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
+ printf("Content-type: %.*s\n",
+ (int) p->content_type.len, (char *) p->content_type.bytes);
}
printf("----\n");
@@ -163,26 +168,27 @@ int main(int argc, char const * const *argv) {
body_received = 0;
while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
- break;
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ break;
+ }
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
+ if (frame.frame_type != AMQP_FRAME_BODY) {
+ fprintf(stderr, "Expected body!");
+ abort();
+ }
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
+ body_received += frame.payload.body_fragment.len;
+ assert(body_received <= body_target);
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
+ amqp_dump(frame.payload.body_fragment.bytes,
+ frame.payload.body_fragment.len);
}
if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
+ /* Can only happen when amqp_simple_wait_frame returns <= 0 */
+ /* We break here to close the connection */
+ break;
}
}
}
diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c
index 3296ffe..ff28e24 100644
--- a/examples/amqps_listenq.c
+++ b/examples/amqps_listenq.c
@@ -43,7 +43,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port, status;
char const *queuename;
@@ -52,7 +53,7 @@ int main(int argc, char const * const *argv) {
if (argc < 4) {
fprintf(stderr, "Usage: amqps_listenq host port queuename "
- "[cacert.pem [key.pem cert.pem]]\n");
+ "[cacert.pem [key.pem cert.pem]]\n");
return 1;
}
@@ -88,7 +89,7 @@ int main(int argc, char const * const *argv) {
amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
@@ -108,35 +109,39 @@ int main(int argc, char const * const *argv) {
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
printf("Result %d\n", result);
- if (result < 0)
- break;
+ if (result < 0) {
+ break;
+ }
printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD)
- continue;
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
+ continue;
+ }
printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
- continue;
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ continue;
+ }
d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
+ (unsigned) d->delivery_tag,
+ (int) d->exchange.len, (char *) d->exchange.bytes,
+ (int) d->routing_key.len, (char *) d->routing_key.bytes);
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
- break;
+ if (result < 0) {
+ break;
+ }
if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
+ fprintf(stderr, "Expected header!");
+ abort();
}
p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
- printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
+ printf("Content-type: %.*s\n",
+ (int) p->content_type.len, (char *) p->content_type.bytes);
}
printf("----\n");
@@ -144,26 +149,27 @@ int main(int argc, char const * const *argv) {
body_received = 0;
while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
- break;
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ break;
+ }
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
+ if (frame.frame_type != AMQP_FRAME_BODY) {
+ fprintf(stderr, "Expected body!");
+ abort();
+ }
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
+ body_received += frame.payload.body_fragment.len;
+ assert(body_received <= body_target);
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
+ amqp_dump(frame.payload.body_fragment.bytes,
+ frame.payload.body_fragment.len);
}
if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
+ /* Can only happen when amqp_simple_wait_frame returns <= 0 */
+ /* We break here to close the connection */
+ break;
}
amqp_basic_ack(conn, 1, d->delivery_tag, 0);
diff --git a/examples/amqps_producer.c b/examples/amqps_producer.c
index 04f5ae8..c67f8ca 100644
--- a/examples/amqps_producer.c
+++ b/examples/amqps_producer.c
@@ -44,9 +44,9 @@
#define SUMMARY_EVERY_US 1000000
static void send_batch(amqp_connection_state_t conn,
- char const *queue_name,
- int rate_limit,
- int message_count)
+ char const *queue_name,
+ int rate_limit,
+ int message_count)
{
uint64_t start_time = now_microseconds();
int i;
@@ -69,20 +69,20 @@ static void send_batch(amqp_connection_state_t conn,
uint64_t now = now_microseconds();
die_on_error(amqp_basic_publish(conn,
- 1,
- amqp_cstring_bytes("amq.direct"),
- amqp_cstring_bytes(queue_name),
- 0,
- 0,
- NULL,
- message_bytes),
- "Publishing");
+ 1,
+ amqp_cstring_bytes("amq.direct"),
+ amqp_cstring_bytes(queue_name),
+ 0,
+ 0,
+ NULL,
+ message_bytes),
+ "Publishing");
sent++;
if (now > next_summary_time) {
int countOverInterval = sent - previous_sent;
double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
- (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
+ (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
previous_sent = sent;
previous_report_time = now;
@@ -105,7 +105,8 @@ static void send_batch(amqp_connection_state_t conn,
}
}
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port, status;
int rate_limit;
@@ -115,7 +116,7 @@ int main(int argc, char const * const *argv) {
if (argc < 5) {
fprintf(stderr, "Usage: amqps_producer host port rate_limit message_count "
- "[cacert.pem [key.pem cert.pem]]\n");
+ "[cacert.pem [key.pem cert.pem]]\n");
return 1;
}
@@ -152,7 +153,7 @@ int main(int argc, char const * const *argv) {
amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
diff --git a/examples/amqps_sendstring.c b/examples/amqps_sendstring.c
index 2325778..989e73a 100644
--- a/examples/amqps_sendstring.c
+++ b/examples/amqps_sendstring.c
@@ -41,7 +41,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port, status;
char const *exchange;
@@ -52,7 +53,7 @@ int main(int argc, char const * const *argv) {
if (argc < 6) {
fprintf(stderr, "Usage: amqps_sendstring host port exchange routingkey "
- "messagebody [cacert.pem [key.pem cert.pem]]\n");
+ "messagebody [cacert.pem [key.pem cert.pem]]\n");
return 1;
}
@@ -90,7 +91,7 @@ int main(int argc, char const * const *argv) {
amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
@@ -100,14 +101,14 @@ int main(int argc, char const * const *argv) {
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
die_on_error(amqp_basic_publish(conn,
- 1,
- amqp_cstring_bytes(exchange),
- amqp_cstring_bytes(routingkey),
- 0,
- 0,
- &props,
- amqp_cstring_bytes(messagebody)),
- "Publishing");
+ 1,
+ amqp_cstring_bytes(exchange),
+ amqp_cstring_bytes(routingkey),
+ 0,
+ 0,
+ &props,
+ amqp_cstring_bytes(messagebody)),
+ "Publishing");
}
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/amqps_unbind.c b/examples/amqps_unbind.c
index a74992a..dfe0f1a 100644
--- a/examples/amqps_unbind.c
+++ b/examples/amqps_unbind.c
@@ -41,7 +41,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port, status;
char const *exchange;
@@ -52,7 +53,7 @@ int main(int argc, char const * const *argv) {
if (argc < 6) {
fprintf(stderr, "Usage: amqps_unbind host port exchange bindingkey queue "
- "[cacert.pem [key.pem cert.pem]]\n");
+ "[cacert.pem [key.pem cert.pem]]\n");
return 1;
}
@@ -90,15 +91,15 @@ int main(int argc, char const * const *argv) {
amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_queue_unbind(conn, 1,
- amqp_cstring_bytes(queue),
- amqp_cstring_bytes(exchange),
- amqp_cstring_bytes(bindingkey),
- amqp_empty_table);
+ amqp_cstring_bytes(queue),
+ amqp_cstring_bytes(exchange),
+ amqp_cstring_bytes(bindingkey),
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/utils.c b/examples/utils.c
index 0830738..4b00470 100644
--- a/examples/utils.c
+++ b/examples/utils.c
@@ -56,7 +56,8 @@ void die(const char *fmt, ...)
exit(1);
}
-void die_on_error(int x, char const *context) {
+void die_on_error(int x, char const *context)
+{
if (x < 0) {
char *errstr = amqp_error_string(-x);
fprintf(stderr, "%s: %s\n", context, errstr);
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 4336fcd..5167f2e 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -389,9 +389,9 @@ int
AMQP_CALL amqp_get_sockfd(amqp_connection_state_t state);
AMQP_DEPRECATED(
-AMQP_PUBLIC_FUNCTION
-void
-AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd)
+ AMQP_PUBLIC_FUNCTION
+ void
+ AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd)
);
AMQP_PUBLIC_FUNCTION
@@ -512,8 +512,8 @@ AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost,
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_login_with_properties(amqp_connection_state_t state, char const *vhost,
- int channel_max, int frame_max, int heartbeat,
- const amqp_table_t *properties, amqp_sasl_method_enum sasl_method, ...);
+ int channel_max, int frame_max, int heartbeat,
+ const amqp_table_t *properties, amqp_sasl_method_enum sasl_method, ...);
struct amqp_basic_properties_t_;
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 677038b..2d50ad3 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -170,8 +170,9 @@ int amqp_destroy_connection(amqp_connection_state_t state)
empty_amqp_pool(&state->decoding_pool);
free(state->outbound_buffer.bytes);
free(state->sock_inbound_buffer.bytes);
- if (amqp_socket_close(state->socket) < 0)
+ if (amqp_socket_close(state->socket) < 0) {
status = -amqp_socket_error(state->socket);
+ }
free(state);
}
return status;
diff --git a/librabbitmq/amqp_cyassl.c b/librabbitmq/amqp_cyassl.c
index 9e2dcfe..5bcfe6d 100644
--- a/librabbitmq/amqp_cyassl.c
+++ b/librabbitmq/amqp_cyassl.c
@@ -31,193 +31,193 @@
#include <stdlib.h>
struct amqp_ssl_socket_t {
- CYASSL_CTX *ctx;
- CYASSL *ssl;
- int sockfd;
- char *buffer;
- size_t length;
+ CYASSL_CTX *ctx;
+ CYASSL *ssl;
+ int sockfd;
+ char *buffer;
+ size_t length;
};
static ssize_t
amqp_ssl_socket_send(void *base,
- const void *buf,
- size_t len,
- AMQP_UNUSED int flags)
+ const void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return CyaSSL_write(self->ssl, buf, len);
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return CyaSSL_write(self->ssl, buf, len);
}
static ssize_t
amqp_ssl_socket_writev(void *base,
- const struct iovec *iov,
- int iovcnt)
+ const struct iovec *iov,
+ int iovcnt)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- ssize_t written = -1;
- char *bufferp;
- size_t bytes;
- int i;
- bytes = 0;
- for (i = 0; i < iovcnt; ++i) {
- bytes += iov[i].iov_len;
- }
- if (self->length < bytes) {
- free(self->buffer);
- self->buffer = malloc(bytes);
- if (!self->buffer) {
- self->length = 0;
- goto exit;
- }
- self->length = bytes;
- }
- bufferp = self->buffer;
- for (i = 0; i < iovcnt; ++i) {
- memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
- bufferp += iov[i].iov_len;
- }
- written = CyaSSL_write(self->ssl, self->buffer, bytes);
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t written = -1;
+ char *bufferp;
+ size_t bytes;
+ int i;
+ bytes = 0;
+ for (i = 0; i < iovcnt; ++i) {
+ bytes += iov[i].iov_len;
+ }
+ if (self->length < bytes) {
+ free(self->buffer);
+ self->buffer = malloc(bytes);
+ if (!self->buffer) {
+ self->length = 0;
+ goto exit;
+ }
+ self->length = bytes;
+ }
+ bufferp = self->buffer;
+ for (i = 0; i < iovcnt; ++i) {
+ memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
+ bufferp += iov[i].iov_len;
+ }
+ written = CyaSSL_write(self->ssl, self->buffer, bytes);
exit:
- return written;
+ return written;
}
static ssize_t
amqp_ssl_socket_recv(void *base,
- void *buf,
- size_t len,
- AMQP_UNUSED int flags)
+ void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return CyaSSL_read(self->ssl, buf, len);
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return CyaSSL_read(self->ssl, buf, len);
}
static int
amqp_ssl_socket_get_sockfd(void *base)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return self->sockfd;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->sockfd;
}
static int
amqp_ssl_socket_close(void *base)
{
- int status = -1;
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- if (self->sockfd >= 0) {
- status = amqp_os_socket_close(self->sockfd);
- }
- if (self) {
- CyaSSL_free(self->ssl);
- CyaSSL_CTX_free(self->ctx);
- free(self->buffer);
- free(self);
- }
- return status;
+ int status = -1;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ if (self->sockfd >= 0) {
+ status = amqp_os_socket_close(self->sockfd);
+ }
+ if (self) {
+ CyaSSL_free(self->ssl);
+ CyaSSL_CTX_free(self->ctx);
+ free(self->buffer);
+ free(self);
+ }
+ return status;
}
static int
amqp_ssl_socket_error(AMQP_UNUSED void *user_data)
{
- return -1;
+ return -1;
}
static int
amqp_ssl_socket_open(void *base, const char *host, int port)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- int status;
- self->sockfd = amqp_open_socket(host, port);
- if (0 > self->sockfd) {
- return -1;
- }
- CyaSSL_set_fd(self->ssl, self->sockfd);
- status = CyaSSL_connect(self->ssl);
- if (SSL_SUCCESS != status) {
- return -1;
- }
- return 0;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ int status;
+ self->sockfd = amqp_open_socket(host, port);
+ if (0 > self->sockfd) {
+ return -1;
+ }
+ CyaSSL_set_fd(self->ssl, self->sockfd);
+ status = CyaSSL_connect(self->ssl);
+ if (SSL_SUCCESS != status) {
+ return -1;
+ }
+ return 0;
}
static const struct amqp_socket_class_t amqp_ssl_socket_class = {
- amqp_ssl_socket_writev, /* writev */
- amqp_ssl_socket_send, /* send */
- amqp_ssl_socket_recv, /* recv */
- amqp_ssl_socket_open, /* open */
- amqp_ssl_socket_close, /* close */
- amqp_ssl_socket_error, /* error */
- amqp_ssl_socket_get_sockfd /* get_sockfd */
+ amqp_ssl_socket_writev, /* writev */
+ amqp_ssl_socket_send, /* send */
+ amqp_ssl_socket_recv, /* recv */
+ amqp_ssl_socket_open, /* open */
+ amqp_ssl_socket_close, /* close */
+ amqp_ssl_socket_error, /* error */
+ amqp_ssl_socket_get_sockfd /* get_sockfd */
};
amqp_socket_t *
amqp_ssl_socket_new(void)
{
- struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
- if (!self) {
- goto error;
- }
- CyaSSL_Init();
- self->ctx = CyaSSL_CTX_new(CyaSSLv23_client_method());
- if (!self->ctx) {
- goto error;
- }
- return (amqp_socket_t *)self;
+ struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
+ if (!self) {
+ goto error;
+ }
+ CyaSSL_Init();
+ self->ctx = CyaSSL_CTX_new(CyaSSLv23_client_method());
+ if (!self->ctx) {
+ goto error;
+ }
+ return (amqp_socket_t *)self;
error:
- amqp_socket_close((amqp_socket_t *)self);
- return NULL;
+ amqp_socket_close((amqp_socket_t *)self);
+ return NULL;
}
int
amqp_ssl_socket_set_cacert(amqp_socket_t *base,
- const char *cacert)
+ const char *cacert)
{
- int status;
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- status = CyaSSL_CTX_load_verify_locations(self->ctx, cacert, NULL);
- if (SSL_SUCCESS != status) {
- return -1;
- }
- return 0;
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = CyaSSL_CTX_load_verify_locations(self->ctx, cacert, NULL);
+ if (SSL_SUCCESS != status) {
+ return -1;
+ }
+ return 0;
}
int
amqp_ssl_socket_set_key(amqp_socket_t *base,
- const char *cert,
- const char *key)
+ const char *cert,
+ const char *key)
{
- int status;
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- status = CyaSSL_CTX_use_PrivateKey_file(self->ctx, key,
- SSL_FILETYPE_PEM);
- if (SSL_SUCCESS != status) {
- return -1;
- }
- status = CyaSSL_CTX_use_certificate_chain_file(self->ctx, cert);
- return 0;
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = CyaSSL_CTX_use_PrivateKey_file(self->ctx, key,
+ SSL_FILETYPE_PEM);
+ if (SSL_SUCCESS != status) {
+ return -1;
+ }
+ status = CyaSSL_CTX_use_certificate_chain_file(self->ctx, cert);
+ return 0;
}
int
amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base,
- AMQP_UNUSED const char *cert,
- AMQP_UNUSED const void *key,
- AMQP_UNUSED size_t n)
+ AMQP_UNUSED const char *cert,
+ AMQP_UNUSED const void *key,
+ AMQP_UNUSED size_t n)
{
- amqp_abort("%s is not implemented for CyaSSL", __func__);
- return -1;
+ amqp_abort("%s is not implemented for CyaSSL", __func__);
+ return -1;
}
void
amqp_ssl_socket_set_verify(AMQP_UNUSED amqp_socket_t *base,
- AMQP_UNUSED amqp_boolean_t verify)
+ AMQP_UNUSED amqp_boolean_t verify)
{
- /* noop for CyaSSL */
+ /* noop for CyaSSL */
}
void
diff --git a/librabbitmq/amqp_gnutls.c b/librabbitmq/amqp_gnutls.c
index 1db7a0b..64b602c 100644
--- a/librabbitmq/amqp_gnutls.c
+++ b/librabbitmq/amqp_gnutls.c
@@ -32,280 +32,280 @@
#include <stdlib.h>
struct amqp_ssl_socket_t {
- gnutls_session_t session;
- gnutls_certificate_credentials_t credentials;
- int sockfd;
- char *host;
- char *buffer;
- size_t length;
+ gnutls_session_t session;
+ gnutls_certificate_credentials_t credentials;
+ int sockfd;
+ char *host;
+ char *buffer;
+ size_t length;
};
static ssize_t
amqp_ssl_socket_send(void *base,
- const void *buf,
- size_t len,
- AMQP_UNUSED int flags)
+ const void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return gnutls_record_send(self->session, buf, len);
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return gnutls_record_send(self->session, buf, len);
}
static ssize_t
amqp_ssl_socket_writev(void *base,
- const struct iovec *iov,
- int iovcnt)
+ const struct iovec *iov,
+ int iovcnt)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- ssize_t written = -1;
- char *bufferp;
- size_t bytes;
- int i;
- bytes = 0;
- for (i = 0; i < iovcnt; ++i) {
- bytes += iov[i].iov_len;
- }
- if (self->length < bytes) {
- free(self->buffer);
- self->buffer = malloc(bytes);
- if (!self->buffer) {
- self->length = 0;
- goto exit;
- }
- self->length = 0;
- }
- bufferp = self->buffer;
- for (i = 0; i < iovcnt; ++i) {
- memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
- bufferp += iov[i].iov_len;
- }
- written = gnutls_record_send(self->session, self->buffer, bytes);
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t written = -1;
+ char *bufferp;
+ size_t bytes;
+ int i;
+ bytes = 0;
+ for (i = 0; i < iovcnt; ++i) {
+ bytes += iov[i].iov_len;
+ }
+ if (self->length < bytes) {
+ free(self->buffer);
+ self->buffer = malloc(bytes);
+ if (!self->buffer) {
+ self->length = 0;
+ goto exit;
+ }
+ self->length = 0;
+ }
+ bufferp = self->buffer;
+ for (i = 0; i < iovcnt; ++i) {
+ memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
+ bufferp += iov[i].iov_len;
+ }
+ written = gnutls_record_send(self->session, self->buffer, bytes);
exit:
- return written;
+ return written;
}
static ssize_t
amqp_ssl_socket_recv(void *base,
- void *buf,
- size_t len,
- AMQP_UNUSED int flags)
+ void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return gnutls_record_recv(self->session, buf, len);
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return gnutls_record_recv(self->session, buf, len);
}
static int
amqp_ssl_socket_open(void *base, const char *host, int port)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- int status;
- self->sockfd = amqp_open_socket(host, port);
- if (0 > self->sockfd) {
- return -1;
- }
- gnutls_transport_set_ptr(self->session,
- (gnutls_transport_ptr_t)self->sockfd);
- do {
- status = gnutls_handshake(self->session);
- } while (status < 0 && !gnutls_error_is_fatal(status));
- return status;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ int status;
+ self->sockfd = amqp_open_socket(host, port);
+ if (0 > self->sockfd) {
+ return -1;
+ }
+ gnutls_transport_set_ptr(self->session,
+ (gnutls_transport_ptr_t)self->sockfd);
+ do {
+ status = gnutls_handshake(self->session);
+ } while (status < 0 && !gnutls_error_is_fatal(status));
+ return status;
}
static int
amqp_ssl_socket_close(void *base)
{
- int status = -1;
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- if (self->sockfd >= 0) {
- status = amqp_os_socket_close(self->sockfd);
- }
- if (self) {
- gnutls_deinit(self->session);
- gnutls_certificate_free_credentials(self->credentials);
- free(self->host);
- free(self->buffer);
- free(self);
- }
- return status;
+ int status = -1;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ if (self->sockfd >= 0) {
+ status = amqp_os_socket_close(self->sockfd);
+ }
+ if (self) {
+ gnutls_deinit(self->session);
+ gnutls_certificate_free_credentials(self->credentials);
+ free(self->host);
+ free(self->buffer);
+ free(self);
+ }
+ return status;
}
static int
amqp_ssl_socket_error(AMQP_UNUSED void *user_data)
{
- return -1;
+ return -1;
}
static int
amqp_ssl_socket_get_sockfd(void *base)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return self->sockfd;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->sockfd;
}
static int
amqp_ssl_verify(gnutls_session_t session)
{
- int ret;
- unsigned int status, size;
- const gnutls_datum_t *list;
- gnutls_x509_crt_t cert = NULL;
- struct amqp_ssl_socket_t *self = gnutls_session_get_ptr(session);
- ret = gnutls_certificate_verify_peers2(session, &status);
- if (0 > ret) {
- goto error;
- }
- if (status & GNUTLS_CERT_INVALID) {
- goto error;
- }
- if (status & GNUTLS_CERT_SIGNER_NOT_FOUND) {
- goto error;
- }
- if (status & GNUTLS_CERT_REVOKED) {
- goto error;
- }
- if (status & GNUTLS_CERT_EXPIRED) {
- goto error;
- }
- if (status & GNUTLS_CERT_NOT_ACTIVATED) {
- goto error;
- }
- if (gnutls_certificate_type_get(session) != GNUTLS_CRT_X509) {
- goto error;
- }
- if (gnutls_x509_crt_init(&cert) < 0) {
- goto error;
- }
- list = gnutls_certificate_get_peers(session, &size);
- if (!list) {
- goto error;
- }
- ret = gnutls_x509_crt_import(cert, &list[0], GNUTLS_X509_FMT_DER);
- if (0 > ret) {
- goto error;
- }
- if (!gnutls_x509_crt_check_hostname(cert, self->host)) {
- goto error;
- }
- gnutls_x509_crt_deinit(cert);
- return 0;
+ int ret;
+ unsigned int status, size;
+ const gnutls_datum_t *list;
+ gnutls_x509_crt_t cert = NULL;
+ struct amqp_ssl_socket_t *self = gnutls_session_get_ptr(session);
+ ret = gnutls_certificate_verify_peers2(session, &status);
+ if (0 > ret) {
+ goto error;
+ }
+ if (status & GNUTLS_CERT_INVALID) {
+ goto error;
+ }
+ if (status & GNUTLS_CERT_SIGNER_NOT_FOUND) {
+ goto error;
+ }
+ if (status & GNUTLS_CERT_REVOKED) {
+ goto error;
+ }
+ if (status & GNUTLS_CERT_EXPIRED) {
+ goto error;
+ }
+ if (status & GNUTLS_CERT_NOT_ACTIVATED) {
+ goto error;
+ }
+ if (gnutls_certificate_type_get(session) != GNUTLS_CRT_X509) {
+ goto error;
+ }
+ if (gnutls_x509_crt_init(&cert) < 0) {
+ goto error;
+ }
+ list = gnutls_certificate_get_peers(session, &size);
+ if (!list) {
+ goto error;
+ }
+ ret = gnutls_x509_crt_import(cert, &list[0], GNUTLS_X509_FMT_DER);
+ if (0 > ret) {
+ goto error;
+ }
+ if (!gnutls_x509_crt_check_hostname(cert, self->host)) {
+ goto error;
+ }
+ gnutls_x509_crt_deinit(cert);
+ return 0;
error:
- if (cert) {
- gnutls_x509_crt_deinit (cert);
- }
- return GNUTLS_E_CERTIFICATE_ERROR;
+ if (cert) {
+ gnutls_x509_crt_deinit (cert);
+ }
+ return GNUTLS_E_CERTIFICATE_ERROR;
}
static const struct amqp_socket_class_t amqp_ssl_socket_class = {
- amqp_ssl_socket_writev, /* writev */
- amqp_ssl_socket_send, /* send */
- amqp_ssl_socket_recv, /* recv */
- amqp_ssl_socket_open, /* open */
- amqp_ssl_socket_close, /* close */
- amqp_ssl_socket_error, /* error */
- amqp_ssl_socket_get_sockfd /* get_sockfd */
+ amqp_ssl_socket_writev, /* writev */
+ amqp_ssl_socket_send, /* send */
+ amqp_ssl_socket_recv, /* recv */
+ amqp_ssl_socket_open, /* open */
+ amqp_ssl_socket_close, /* close */
+ amqp_ssl_socket_error, /* error */
+ amqp_ssl_socket_get_sockfd /* get_sockfd */
};
amqp_socket_t *
amqp_ssl_socket_new(void)
{
- struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
- const char *error;
- int status;
- if (!self) {
- goto error;
- }
- gnutls_global_init();
- status = gnutls_init(&self->session, GNUTLS_CLIENT);
- if (GNUTLS_E_SUCCESS != status) {
- goto error;
- }
- status = gnutls_certificate_allocate_credentials(&self->credentials);
- if (GNUTLS_E_SUCCESS != status) {
- goto error;
- }
- gnutls_certificate_set_verify_function(self->credentials,
- amqp_ssl_verify);
- status = gnutls_credentials_set(self->session, GNUTLS_CRD_CERTIFICATE,
- self->credentials);
- if (GNUTLS_E_SUCCESS != status) {
- goto error;
- }
- gnutls_session_set_ptr(self->session, self);
- status = gnutls_priority_set_direct(self->session, "NORMAL", &error);
- if (GNUTLS_E_SUCCESS != status) {
- goto error;
- }
- return (amqp_socket_t *)self;
+ struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
+ const char *error;
+ int status;
+ if (!self) {
+ goto error;
+ }
+ gnutls_global_init();
+ status = gnutls_init(&self->session, GNUTLS_CLIENT);
+ if (GNUTLS_E_SUCCESS != status) {
+ goto error;
+ }
+ status = gnutls_certificate_allocate_credentials(&self->credentials);
+ if (GNUTLS_E_SUCCESS != status) {
+ goto error;
+ }
+ gnutls_certificate_set_verify_function(self->credentials,
+ amqp_ssl_verify);
+ status = gnutls_credentials_set(self->session, GNUTLS_CRD_CERTIFICATE,
+ self->credentials);
+ if (GNUTLS_E_SUCCESS != status) {
+ goto error;
+ }
+ gnutls_session_set_ptr(self->session, self);
+ status = gnutls_priority_set_direct(self->session, "NORMAL", &error);
+ if (GNUTLS_E_SUCCESS != status) {
+ goto error;
+ }
+ return (amqp_socket_t *)self;
error:
- amqp_socket_close((amqp_socket_t *)self);
- return NULL;
+ amqp_socket_close((amqp_socket_t *)self);
+ return NULL;
}
int
amqp_ssl_socket_set_cacert(amqp_socket_t *base,
- const char *cacert)
+ const char *cacert)
{
- int status;
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- status = gnutls_certificate_set_x509_trust_file(self->credentials,
- cacert,
- GNUTLS_X509_FMT_PEM);
- if (0 > status) {
- return -1;
- }
- return 0;
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = gnutls_certificate_set_x509_trust_file(self->credentials,
+ cacert,
+ GNUTLS_X509_FMT_PEM);
+ if (0 > status) {
+ return -1;
+ }
+ return 0;
}
int
amqp_ssl_socket_set_key(amqp_socket_t *base,
- const char *cert,
- const char *key)
+ const char *cert,
+ const char *key)
{
- int status;
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- status = gnutls_certificate_set_x509_key_file(self->credentials,
- cert,
- key,
- GNUTLS_X509_FMT_PEM);
- if (0 > status) {
- return -1;
- }
- return 0;
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = gnutls_certificate_set_x509_key_file(self->credentials,
+ cert,
+ key,
+ GNUTLS_X509_FMT_PEM);
+ if (0 > status) {
+ return -1;
+ }
+ return 0;
}
int
amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base,
- AMQP_UNUSED const char *cert,
- AMQP_UNUSED const void *key,
- AMQP_UNUSED size_t n)
+ AMQP_UNUSED const char *cert,
+ AMQP_UNUSED const void *key,
+ AMQP_UNUSED size_t n)
{
- amqp_abort("%s is not implemented for GnuTLS", __func__);
- return -1;
+ amqp_abort("%s is not implemented for GnuTLS", __func__);
+ return -1;
}
void
amqp_ssl_socket_set_verify(amqp_socket_t *base,
- amqp_boolean_t verify)
+ amqp_boolean_t verify)
{
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- if (verify) {
- gnutls_certificate_set_verify_function(self->credentials,
- amqp_ssl_verify);
- } else {
- gnutls_certificate_set_verify_function(self->credentials,
- NULL);
- }
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ if (verify) {
+ gnutls_certificate_set_verify_function(self->credentials,
+ amqp_ssl_verify);
+ } else {
+ gnutls_certificate_set_verify_function(self->credentials,
+ NULL);
+ }
}
void
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index d84fab7..748f249 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -57,378 +57,378 @@ static pthread_mutex_t *amqp_openssl_lockarray = NULL;
#endif /* ENABLE_THREAD_SAFETY */
struct amqp_ssl_socket_t {
- const struct amqp_socket_class_t *klass;
- SSL_CTX *ctx;
- int sockfd;
- SSL *ssl;
- char *buffer;
- size_t length;
- amqp_boolean_t verify;
+ const struct amqp_socket_class_t *klass;
+ SSL_CTX *ctx;
+ int sockfd;
+ SSL *ssl;
+ char *buffer;
+ size_t length;
+ amqp_boolean_t verify;
};
static ssize_t
amqp_ssl_socket_send(void *base,
- const void *buf,
- size_t len,
- AMQP_UNUSED int flags)
+ const void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- ssize_t sent;
- ERR_clear_error();
- sent = SSL_write(self->ssl, buf, len);
- if (0 > sent) {
- switch (SSL_get_error(self->ssl, sent)) {
- case SSL_ERROR_NONE:
- case SSL_ERROR_ZERO_RETURN:
- case SSL_ERROR_WANT_READ:
- case SSL_ERROR_WANT_WRITE:
- sent = 0;
- break;
- }
- }
- return sent;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t sent;
+ ERR_clear_error();
+ sent = SSL_write(self->ssl, buf, len);
+ if (0 > sent) {
+ switch (SSL_get_error(self->ssl, sent)) {
+ case SSL_ERROR_NONE:
+ case SSL_ERROR_ZERO_RETURN:
+ case SSL_ERROR_WANT_READ:
+ case SSL_ERROR_WANT_WRITE:
+ sent = 0;
+ break;
+ }
+ }
+ return sent;
}
static ssize_t
amqp_ssl_socket_writev(void *base,
- const struct iovec *iov,
- int iovcnt)
+ const struct iovec *iov,
+ int iovcnt)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- ssize_t written = -1;
- char *bufferp;
- size_t bytes;
- int i;
- bytes = 0;
- for (i = 0; i < iovcnt; ++i) {
- bytes += iov[i].iov_len;
- }
- if (self->length < bytes) {
- free(self->buffer);
- self->buffer = malloc(bytes);
- if (!self->buffer) {
- self->length = 0;
- goto exit;
- }
- self->length = bytes;
- }
- bufferp = self->buffer;
- for (i = 0; i < iovcnt; ++i) {
- memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
- bufferp += iov[i].iov_len;
- }
- written = amqp_ssl_socket_send(self, self->buffer, bytes, 0);
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t written = -1;
+ char *bufferp;
+ size_t bytes;
+ int i;
+ bytes = 0;
+ for (i = 0; i < iovcnt; ++i) {
+ bytes += iov[i].iov_len;
+ }
+ if (self->length < bytes) {
+ free(self->buffer);
+ self->buffer = malloc(bytes);
+ if (!self->buffer) {
+ self->length = 0;
+ goto exit;
+ }
+ self->length = bytes;
+ }
+ bufferp = self->buffer;
+ for (i = 0; i < iovcnt; ++i) {
+ memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
+ bufferp += iov[i].iov_len;
+ }
+ written = amqp_ssl_socket_send(self, self->buffer, bytes, 0);
exit:
- return written;
+ return written;
}
static ssize_t
amqp_ssl_socket_recv(void *base,
- void *buf,
- size_t len,
- AMQP_UNUSED int flags)
+ void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- ssize_t received;
- ERR_clear_error();
- received = SSL_read(self->ssl, buf, len);
- if (0 > received) {
- switch(SSL_get_error(self->ssl, received)) {
- case SSL_ERROR_WANT_READ:
- case SSL_ERROR_WANT_WRITE:
- received = 0;
- break;
- }
- }
- return received;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t received;
+ ERR_clear_error();
+ received = SSL_read(self->ssl, buf, len);
+ if (0 > received) {
+ switch(SSL_get_error(self->ssl, received)) {
+ case SSL_ERROR_WANT_READ:
+ case SSL_ERROR_WANT_WRITE:
+ received = 0;
+ break;
+ }
+ }
+ return received;
}
static int
amqp_ssl_socket_verify(void *base, const char *host)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- unsigned char *utf8_value = NULL, *cp, ch;
- int pos, utf8_length, status = 0;
- ASN1_STRING *entry_string;
- X509_NAME_ENTRY *entry;
- X509_NAME *name;
- X509 *peer;
- peer = SSL_get_peer_certificate(self->ssl);
- if (!peer) {
- goto error;
- }
- name = X509_get_subject_name(peer);
- if (!name) {
- goto error;
- }
- pos = X509_NAME_get_index_by_NID(name, NID_commonName, -1);
- if (0 > pos) {
- goto error;
- }
- entry = X509_NAME_get_entry(name, pos);
- if (!entry) {
- goto error;
- }
- entry_string = X509_NAME_ENTRY_get_data(entry);
- if (!entry_string) {
- goto error;
- }
- utf8_length = ASN1_STRING_to_UTF8(&utf8_value, entry_string);
- if (0 > utf8_length) {
- goto error;
- }
- while (utf8_length > 0 && utf8_value[utf8_length - 1] == 0) {
- --utf8_length;
- }
- if (utf8_length >= 256) {
- goto error;
- }
- if ((size_t)utf8_length != strlen((char *)utf8_value)) {
- goto error;
- }
- for (cp = utf8_value; (ch = *cp) != '\0'; ++cp) {
- if (isascii(ch) && !isprint(ch)) {
- goto error;
- }
- }
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ unsigned char *utf8_value = NULL, *cp, ch;
+ int pos, utf8_length, status = 0;
+ ASN1_STRING *entry_string;
+ X509_NAME_ENTRY *entry;
+ X509_NAME *name;
+ X509 *peer;
+ peer = SSL_get_peer_certificate(self->ssl);
+ if (!peer) {
+ goto error;
+ }
+ name = X509_get_subject_name(peer);
+ if (!name) {
+ goto error;
+ }
+ pos = X509_NAME_get_index_by_NID(name, NID_commonName, -1);
+ if (0 > pos) {
+ goto error;
+ }
+ entry = X509_NAME_get_entry(name, pos);
+ if (!entry) {
+ goto error;
+ }
+ entry_string = X509_NAME_ENTRY_get_data(entry);
+ if (!entry_string) {
+ goto error;
+ }
+ utf8_length = ASN1_STRING_to_UTF8(&utf8_value, entry_string);
+ if (0 > utf8_length) {
+ goto error;
+ }
+ while (utf8_length > 0 && utf8_value[utf8_length - 1] == 0) {
+ --utf8_length;
+ }
+ if (utf8_length >= 256) {
+ goto error;
+ }
+ if ((size_t)utf8_length != strlen((char *)utf8_value)) {
+ goto error;
+ }
+ for (cp = utf8_value; (ch = *cp) != '\0'; ++cp) {
+ if (isascii(ch) && !isprint(ch)) {
+ goto error;
+ }
+ }
#ifdef _MSC_VER
#define strcasecmp _stricmp
#endif
- if (strcasecmp(host, (char *)utf8_value)) {
- goto error;
- }
+ if (strcasecmp(host, (char *)utf8_value)) {
+ goto error;
+ }
#ifdef _MSC_VER
#undef strcasecmp
#endif
exit:
- OPENSSL_free(utf8_value);
- return status;
+ OPENSSL_free(utf8_value);
+ return status;
error:
- status = -1;
- goto exit;
+ status = -1;
+ goto exit;
}
static int
amqp_ssl_socket_open(void *base, const char *host, int port)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- long result;
- int status;
- self->ssl = SSL_new(self->ctx);
- if (!self->ssl) {
- return -1;
- }
- SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY);
- self->sockfd = amqp_open_socket(host, port);
- if (0 > self->sockfd) {
- return -1;
- }
- status = SSL_set_fd(self->ssl, self->sockfd);
- if (!status) {
- return -1;
- }
- status = SSL_connect(self->ssl);
- if (!status) {
- return -1;
- }
- result = SSL_get_verify_result(self->ssl);
- if (X509_V_OK != result) {
- return -1;
- }
- if (self->verify) {
- int status = amqp_ssl_socket_verify(self, host);
- if (status) {
- return -1;
- }
- }
- return 0;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ long result;
+ int status;
+ self->ssl = SSL_new(self->ctx);
+ if (!self->ssl) {
+ return -1;
+ }
+ SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY);
+ self->sockfd = amqp_open_socket(host, port);
+ if (0 > self->sockfd) {
+ return -1;
+ }
+ status = SSL_set_fd(self->ssl, self->sockfd);
+ if (!status) {
+ return -1;
+ }
+ status = SSL_connect(self->ssl);
+ if (!status) {
+ return -1;
+ }
+ result = SSL_get_verify_result(self->ssl);
+ if (X509_V_OK != result) {
+ return -1;
+ }
+ if (self->verify) {
+ int status = amqp_ssl_socket_verify(self, host);
+ if (status) {
+ return -1;
+ }
+ }
+ return 0;
}
static int
amqp_ssl_socket_close(void *base)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- if (self) {
- SSL_free(self->ssl);
- amqp_os_socket_close(self->sockfd);
- SSL_CTX_free(self->ctx);
- free(self->buffer);
- free(self);
- }
- destroy_openssl();
- return 0;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ if (self) {
+ SSL_free(self->ssl);
+ amqp_os_socket_close(self->sockfd);
+ SSL_CTX_free(self->ctx);
+ free(self->buffer);
+ free(self);
+ }
+ destroy_openssl();
+ return 0;
}
static int
amqp_ssl_socket_error(AMQP_UNUSED void *base)
{
- return -1;
+ return -1;
}
static int
amqp_ssl_socket_get_sockfd(void *base)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return self->sockfd;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->sockfd;
}
static const struct amqp_socket_class_t amqp_ssl_socket_class = {
- amqp_ssl_socket_writev, /* writev */
- amqp_ssl_socket_send, /* send */
- amqp_ssl_socket_recv, /* recv */
- amqp_ssl_socket_open, /* open */
- amqp_ssl_socket_close, /* close */
- amqp_ssl_socket_error, /* error */
- amqp_ssl_socket_get_sockfd /* get_sockfd */
+ amqp_ssl_socket_writev, /* writev */
+ amqp_ssl_socket_send, /* send */
+ amqp_ssl_socket_recv, /* recv */
+ amqp_ssl_socket_open, /* open */
+ amqp_ssl_socket_close, /* close */
+ amqp_ssl_socket_error, /* error */
+ amqp_ssl_socket_get_sockfd /* get_sockfd */
};
amqp_socket_t *
amqp_ssl_socket_new(void)
{
- struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
- int status;
- if (!self) {
- goto error;
- }
- status = initialize_openssl();
- if (status) {
- goto error;
- }
- self->ctx = SSL_CTX_new(SSLv23_client_method());
- if (!self->ctx) {
- goto error;
- }
- self->klass = &amqp_ssl_socket_class;
- self->verify = 1;
- return (amqp_socket_t *)self;
+ struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
+ int status;
+ if (!self) {
+ goto error;
+ }
+ status = initialize_openssl();
+ if (status) {
+ goto error;
+ }
+ self->ctx = SSL_CTX_new(SSLv23_client_method());
+ if (!self->ctx) {
+ goto error;
+ }
+ self->klass = &amqp_ssl_socket_class;
+ self->verify = 1;
+ return (amqp_socket_t *)self;
error:
- amqp_socket_close((amqp_socket_t *)self);
- return NULL;
+ amqp_socket_close((amqp_socket_t *)self);
+ return NULL;
}
int
amqp_ssl_socket_set_cacert(amqp_socket_t *base,
- const char *cacert)
+ const char *cacert)
{
- int status;
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- status = SSL_CTX_load_verify_locations(self->ctx, cacert, NULL);
- if (1 != status) {
- return -1;
- }
- return 0;
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = SSL_CTX_load_verify_locations(self->ctx, cacert, NULL);
+ if (1 != status) {
+ return -1;
+ }
+ return 0;
}
int
amqp_ssl_socket_set_key(amqp_socket_t *base,
- const char *cert,
- const char *key)
+ const char *cert,
+ const char *key)
{
- int status;
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
- if (1 != status) {
- return -1;
- }
- status = SSL_CTX_use_PrivateKey_file(self->ctx, key,
- SSL_FILETYPE_PEM);
- if (1 != status) {
- return -1;
- }
- return 0;
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
+ if (1 != status) {
+ return -1;
+ }
+ status = SSL_CTX_use_PrivateKey_file(self->ctx, key,
+ SSL_FILETYPE_PEM);
+ if (1 != status) {
+ return -1;
+ }
+ return 0;
}
static int
password_cb(AMQP_UNUSED char *buffer,
- AMQP_UNUSED int length,
- AMQP_UNUSED int rwflag,
- AMQP_UNUSED void *user_data)
+ AMQP_UNUSED int length,
+ AMQP_UNUSED int rwflag,
+ AMQP_UNUSED void *user_data)
{
- amqp_abort("rabbitmq-c does not support password protected keys");
- return 0;
+ amqp_abort("rabbitmq-c does not support password protected keys");
+ return 0;
}
int
amqp_ssl_socket_set_key_buffer(amqp_socket_t *base,
- const char *cert,
- const void *key,
- size_t n)
+ const char *cert,
+ const void *key,
+ size_t n)
{
- int status = 0;
- BIO *buf = NULL;
- RSA *rsa = NULL;
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
- if (1 != status) {
- return -1;
- }
- buf = BIO_new_mem_buf((void *)key, n);
- if (!buf) {
- goto error;
- }
- rsa = PEM_read_bio_RSAPrivateKey(buf, NULL, password_cb, NULL);
- if (!rsa) {
- goto error;
- }
- status = SSL_CTX_use_RSAPrivateKey(self->ctx, rsa);
- if (1 != status) {
- goto error;
- }
+ int status = 0;
+ BIO *buf = NULL;
+ RSA *rsa = NULL;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
+ if (1 != status) {
+ return -1;
+ }
+ buf = BIO_new_mem_buf((void *)key, n);
+ if (!buf) {
+ goto error;
+ }
+ rsa = PEM_read_bio_RSAPrivateKey(buf, NULL, password_cb, NULL);
+ if (!rsa) {
+ goto error;
+ }
+ status = SSL_CTX_use_RSAPrivateKey(self->ctx, rsa);
+ if (1 != status) {
+ goto error;
+ }
exit:
- BIO_vfree(buf);
- RSA_free(rsa);
- return status;
+ BIO_vfree(buf);
+ RSA_free(rsa);
+ return status;
error:
- status = -1;
- goto exit;
+ status = -1;
+ goto exit;
}
int
amqp_ssl_socket_set_cert(amqp_socket_t *base,
- const char *cert)
+ const char *cert)
{
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- int status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
- if (1 != status) {
- return -1;
- }
- return 0;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ int status = SSL_CTX_use_certificate_chain_file(self->ctx, cert);
+ if (1 != status) {
+ return -1;
+ }
+ return 0;
}
void
amqp_ssl_socket_set_verify(amqp_socket_t *base,
- amqp_boolean_t verify)
+ amqp_boolean_t verify)
{
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- self->verify = verify;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ self->verify = verify;
}
void
amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize)
{
- if (!openssl_initialized) {
- do_initialize_openssl = do_initialize;
- }
+ if (!openssl_initialized) {
+ do_initialize_openssl = do_initialize;
+ }
}
#ifdef ENABLE_THREAD_SAFETY
@@ -440,18 +440,17 @@ amqp_ssl_threadid_callback(void)
void
amqp_ssl_locking_callback(int mode, int n,
- AMQP_UNUSED const char *file,
- AMQP_UNUSED int line)
+ AMQP_UNUSED const char *file,
+ AMQP_UNUSED int line)
{
- if (mode & CRYPTO_LOCK)
- {
- if (pthread_mutex_lock(&amqp_openssl_lockarray[n]))
+ if (mode & CRYPTO_LOCK) {
+ if (pthread_mutex_lock(&amqp_openssl_lockarray[n])) {
amqp_abort("Runtime error: Failure in trying to lock OpenSSL mutex");
- }
- else
- {
- if (pthread_mutex_unlock(&amqp_openssl_lockarray[n]))
+ }
+ } else {
+ if (pthread_mutex_unlock(&amqp_openssl_lockarray[n])) {
amqp_abort("Runtime error: Failure in trying to unlock OpenSSL mutex");
+ }
}
}
#endif /* ENABLE_THREAD_SAFETY */
@@ -461,40 +460,35 @@ initialize_openssl(void)
{
#ifdef _WIN32
/* No such thing as PTHREAD_INITIALIZE_MUTEX macro on Win32, so we use this */
- if (NULL == openssl_init_mutex)
- {
+ if (NULL == openssl_init_mutex) {
while (InterlockedExchange(&win32_create_mutex, 1) == 1)
/* Loop, someone else is holding this lock */ ;
- if (NULL == openssl_init_mutex)
- {
- if (pthread_mutex_init(&openssl_init_mutex, NULL))
+ if (NULL == openssl_init_mutex) {
+ if (pthread_mutex_init(&openssl_init_mutex, NULL)) {
return -1;
+ }
}
InterlockedExchange(&win32_create_mutex, 0);
}
#endif /* _WIN32 */
#ifdef ENABLE_THREAD_SAFETY
- if (pthread_mutex_lock(&openssl_init_mutex))
+ if (pthread_mutex_lock(&openssl_init_mutex)) {
return -1;
+ }
#endif /* ENABLE_THREAD_SAFETY */
- if (do_initialize_openssl)
- {
+ if (do_initialize_openssl) {
#ifdef ENABLE_THREAD_SAFETY
- if (NULL == amqp_openssl_lockarray)
- {
+ if (NULL == amqp_openssl_lockarray) {
int i = 0;
amqp_openssl_lockarray = calloc(CRYPTO_num_locks(), sizeof(pthread_mutex_t));
- if (!amqp_openssl_lockarray)
- {
+ if (!amqp_openssl_lockarray) {
pthread_mutex_unlock(&openssl_init_mutex);
return -1;
}
- for (i = 0; i < CRYPTO_num_locks(); ++i)
- {
- if (pthread_mutex_init(&amqp_openssl_lockarray[i], NULL))
- {
+ for (i = 0; i < CRYPTO_num_locks(); ++i) {
+ if (pthread_mutex_init(&amqp_openssl_lockarray[i], NULL)) {
free(amqp_openssl_lockarray);
amqp_openssl_lockarray = NULL;
pthread_mutex_unlock(&openssl_init_mutex);
@@ -503,15 +497,13 @@ initialize_openssl(void)
}
}
- if (0 == open_ssl_connections)
- {
+ if (0 == open_ssl_connections) {
CRYPTO_set_id_callback(amqp_ssl_threadid_callback);
CRYPTO_set_locking_callback(amqp_ssl_locking_callback);
}
#endif /* ENABLE_THREAD_SAFETY */
- if (!openssl_initialized)
- {
+ if (!openssl_initialized) {
OPENSSL_config(NULL);
SSL_library_init();
@@ -533,16 +525,17 @@ static int
destroy_openssl(void)
{
#ifdef ENABLE_THREAD_SAFETY
- if (pthread_mutex_lock(&openssl_init_mutex))
+ if (pthread_mutex_lock(&openssl_init_mutex)) {
return -1;
+ }
#endif /* ENABLE_THREAD_SAFETY */
- if (open_ssl_connections > 0)
+ if (open_ssl_connections > 0) {
--open_ssl_connections;
+ }
#ifdef ENABLE_THREAD_SAFETY
- if (0 == open_ssl_connections && do_initialize_openssl)
- {
+ if (0 == open_ssl_connections && do_initialize_openssl) {
/* Unsetting these allows the rabbitmq-c library to be unloaded
* safely. We do leak the amqp_openssl_lockarray. Which is only
* an issue if you repeatedly unload and load the library
diff --git a/librabbitmq/amqp_polarssl.c b/librabbitmq/amqp_polarssl.c
index 027ee47..b89f602 100644
--- a/librabbitmq/amqp_polarssl.c
+++ b/librabbitmq/amqp_polarssl.c
@@ -34,271 +34,271 @@
#include <stdlib.h>
struct amqp_ssl_socket_t {
- int sockfd;
- entropy_context *entropy;
- ctr_drbg_context *ctr_drbg;
- x509_cert *cacert;
- rsa_context *key;
- x509_cert *cert;
- ssl_context *ssl;
- ssl_session *session;
- char *buffer;
- size_t length;
+ int sockfd;
+ entropy_context *entropy;
+ ctr_drbg_context *ctr_drbg;
+ x509_cert *cacert;
+ rsa_context *key;
+ x509_cert *cert;
+ ssl_context *ssl;
+ ssl_session *session;
+ char *buffer;
+ size_t length;
};
static ssize_t
amqp_ssl_socket_send(void *base,
- const void *buf,
- size_t len,
- AMQP_UNUSED int flags)
+ const void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return ssl_write(self->ssl, buf, len);
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return ssl_write(self->ssl, buf, len);
}
static ssize_t
amqp_ssl_socket_writev(void *base,
- const struct iovec *iov,
- int iovcnt)
+ const struct iovec *iov,
+ int iovcnt)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- ssize_t written = -1;
- char *bufferp;
- size_t bytes;
- int i;
- bytes = 0;
- for (i = 0; i < iovcnt; ++i) {
- bytes += iov[i].iov_len;
- }
- if (self->length < bytes) {
- free(self->buffer);
- self->buffer = malloc(bytes);
- if (!self->buffer) {
- self->length = 0;
- goto exit;
- }
- self->length = bytes;
- }
- bufferp = self->buffer;
- for (i = 0; i < iovcnt; ++i) {
- memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
- bufferp += iov[i].iov_len;
- }
- written = ssl_write(self->ssl, (const unsigned char *)self->buffer,
- bytes);
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ ssize_t written = -1;
+ char *bufferp;
+ size_t bytes;
+ int i;
+ bytes = 0;
+ for (i = 0; i < iovcnt; ++i) {
+ bytes += iov[i].iov_len;
+ }
+ if (self->length < bytes) {
+ free(self->buffer);
+ self->buffer = malloc(bytes);
+ if (!self->buffer) {
+ self->length = 0;
+ goto exit;
+ }
+ self->length = bytes;
+ }
+ bufferp = self->buffer;
+ for (i = 0; i < iovcnt; ++i) {
+ memcpy(bufferp, iov[i].iov_base, iov[i].iov_len);
+ bufferp += iov[i].iov_len;
+ }
+ written = ssl_write(self->ssl, (const unsigned char *)self->buffer,
+ bytes);
exit:
- return written;
+ return written;
}
static ssize_t
amqp_ssl_socket_recv(void *base,
- void *buf,
- size_t len,
- AMQP_UNUSED int flags)
+ void *buf,
+ size_t len,
+ AMQP_UNUSED int flags)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return ssl_read(self->ssl, buf, len);
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return ssl_read(self->ssl, buf, len);
}
static int
amqp_ssl_socket_open(void *base, const char *host, int port)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- int status = net_connect(&self->sockfd, host, port);
- if (status) {
- return -1;
- }
- if (self->cacert) {
- ssl_set_ca_chain(self->ssl, self->cacert, NULL, host);
- }
- ssl_set_bio(self->ssl, net_recv, &self->sockfd,
- net_send, &self->sockfd);
- if (self->key && self->cert) {
- ssl_set_own_cert(self->ssl, self->cert, self->key);
- }
- while (0 != (status = ssl_handshake(self->ssl))) {
- switch (status) {
- case POLARSSL_ERR_NET_WANT_READ:
- case POLARSSL_ERR_NET_WANT_WRITE:
- continue;
- default:
- break;
- }
- }
- return status;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ int status = net_connect(&self->sockfd, host, port);
+ if (status) {
+ return -1;
+ }
+ if (self->cacert) {
+ ssl_set_ca_chain(self->ssl, self->cacert, NULL, host);
+ }
+ ssl_set_bio(self->ssl, net_recv, &self->sockfd,
+ net_send, &self->sockfd);
+ if (self->key && self->cert) {
+ ssl_set_own_cert(self->ssl, self->cert, self->key);
+ }
+ while (0 != (status = ssl_handshake(self->ssl))) {
+ switch (status) {
+ case POLARSSL_ERR_NET_WANT_READ:
+ case POLARSSL_ERR_NET_WANT_WRITE:
+ continue;
+ default:
+ break;
+ }
+ }
+ return status;
}
static int
amqp_ssl_socket_close(void *base)
{
- int status = -1;
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- if (self) {
- free(self->entropy);
- free(self->ctr_drbg);
- x509_free(self->cacert);
- free(self->cacert);
- rsa_free(self->key);
- free(self->key);
- x509_free(self->cert);
- free(self->cert);
- ssl_free(self->ssl);
- free(self->ssl);
- free(self->session);
- free(self->buffer);
- if (self->sockfd >= 0) {
- net_close(self->sockfd);
- status = 0;
- }
- free(self);
- }
- return status;
+ int status = -1;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ if (self) {
+ free(self->entropy);
+ free(self->ctr_drbg);
+ x509_free(self->cacert);
+ free(self->cacert);
+ rsa_free(self->key);
+ free(self->key);
+ x509_free(self->cert);
+ free(self->cert);
+ ssl_free(self->ssl);
+ free(self->ssl);
+ free(self->session);
+ free(self->buffer);
+ if (self->sockfd >= 0) {
+ net_close(self->sockfd);
+ status = 0;
+ }
+ free(self);
+ }
+ return status;
}
static int
amqp_ssl_socket_error(AMQP_UNUSED void *user_data)
{
- return -1;
+ return -1;
}
static int
amqp_ssl_socket_get_sockfd(void *base)
{
- struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
- return self->sockfd;
+ struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
+ return self->sockfd;
}
static const struct amqp_socket_class_t amqp_ssl_socket_class = {
- amqp_ssl_socket_writev, /* writev */
- amqp_ssl_socket_send, /* send */
- amqp_ssl_socket_recv, /* recv */
- amqp_ssl_socket_open, /* open */
- amqp_ssl_socket_close, /* close */
- amqp_ssl_socket_error, /* error */
- amqp_ssl_socket_get_sockfd /* get_sockfd */
+ amqp_ssl_socket_writev, /* writev */
+ amqp_ssl_socket_send, /* send */
+ amqp_ssl_socket_recv, /* recv */
+ amqp_ssl_socket_open, /* open */
+ amqp_ssl_socket_close, /* close */
+ amqp_ssl_socket_error, /* error */
+ amqp_ssl_socket_get_sockfd /* get_sockfd */
};
amqp_socket_t *
amqp_ssl_socket_new(void)
{
- struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
- int status;
- if (!self) {
- goto error;
- }
- self->entropy = calloc(1, sizeof(*self->entropy));
- if (!self->entropy) {
- goto error;
- }
- self->sockfd = -1;
- entropy_init(self->entropy);
- self->ctr_drbg = calloc(1, sizeof(*self->ctr_drbg));
- if (!self->ctr_drbg) {
- goto error;
- }
- status = ctr_drbg_init(self->ctr_drbg, entropy_func, self->entropy,
- NULL, 0);
- if (status) {
- goto error;
- }
- self->ssl = calloc(1, sizeof(*self->ssl));
- if (!self->ssl) {
- goto error;
- }
- status = ssl_init(self->ssl);
- if (status) {
- goto error;
- }
- ssl_set_endpoint(self->ssl, SSL_IS_CLIENT);
- ssl_set_rng(self->ssl, ctr_drbg_random, self->ctr_drbg);
- ssl_set_ciphersuites(self->ssl, ssl_default_ciphersuites);
- ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED);
- self->session = calloc(1, sizeof(*self->session));
- if (!self->session) {
- goto error;
- }
- ssl_set_session(self->ssl, 0, 0, self->session);
- return (amqp_socket_t *)self;
+ struct amqp_ssl_socket_t *self = calloc(1, sizeof(*self));
+ int status;
+ if (!self) {
+ goto error;
+ }
+ self->entropy = calloc(1, sizeof(*self->entropy));
+ if (!self->entropy) {
+ goto error;
+ }
+ self->sockfd = -1;
+ entropy_init(self->entropy);
+ self->ctr_drbg = calloc(1, sizeof(*self->ctr_drbg));
+ if (!self->ctr_drbg) {
+ goto error;
+ }
+ status = ctr_drbg_init(self->ctr_drbg, entropy_func, self->entropy,
+ NULL, 0);
+ if (status) {
+ goto error;
+ }
+ self->ssl = calloc(1, sizeof(*self->ssl));
+ if (!self->ssl) {
+ goto error;
+ }
+ status = ssl_init(self->ssl);
+ if (status) {
+ goto error;
+ }
+ ssl_set_endpoint(self->ssl, SSL_IS_CLIENT);
+ ssl_set_rng(self->ssl, ctr_drbg_random, self->ctr_drbg);
+ ssl_set_ciphersuites(self->ssl, ssl_default_ciphersuites);
+ ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED);
+ self->session = calloc(1, sizeof(*self->session));
+ if (!self->session) {
+ goto error;
+ }
+ ssl_set_session(self->ssl, 0, 0, self->session);
+ return (amqp_socket_t *)self;
error:
- amqp_socket_close((amqp_socket_t *)self);
- return NULL;
+ amqp_socket_close((amqp_socket_t *)self);
+ return NULL;
}
int
amqp_ssl_socket_set_cacert(amqp_socket_t *base,
- const char *cacert)
+ const char *cacert)
{
- int status;
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- self->cacert = calloc(1, sizeof(*self->cacert));
- if (!self->cacert) {
- return -1;
- }
- status = x509parse_crtfile(self->cacert, cacert);
- if (status) {
- return -1;
- }
- return 0;
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ self->cacert = calloc(1, sizeof(*self->cacert));
+ if (!self->cacert) {
+ return -1;
+ }
+ status = x509parse_crtfile(self->cacert, cacert);
+ if (status) {
+ return -1;
+ }
+ return 0;
}
int
amqp_ssl_socket_set_key(amqp_socket_t *base,
- const char *cert,
- const char *key)
+ const char *cert,
+ const char *key)
{
- int status;
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- self->key = calloc(1, sizeof(*self->key));
- if (!self->key) {
- return -1;
- }
- status = x509parse_keyfile(self->key, key, NULL);
- if (status) {
- return -1;
- }
- self->cert = calloc(1, sizeof(*self->cert));
- if (!self->cert) {
- return -1;
- }
- status = x509parse_crtfile(self->cert, cert);
- if (status) {
- return -1;
- }
- return 0;
+ int status;
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ self->key = calloc(1, sizeof(*self->key));
+ if (!self->key) {
+ return -1;
+ }
+ status = x509parse_keyfile(self->key, key, NULL);
+ if (status) {
+ return -1;
+ }
+ self->cert = calloc(1, sizeof(*self->cert));
+ if (!self->cert) {
+ return -1;
+ }
+ status = x509parse_crtfile(self->cert, cert);
+ if (status) {
+ return -1;
+ }
+ return 0;
}
int
amqp_ssl_socket_set_key_buffer(AMQP_UNUSED amqp_socket_t *base,
- AMQP_UNUSED const char *cert,
- AMQP_UNUSED const void *key,
- AMQP_UNUSED size_t n)
+ AMQP_UNUSED const char *cert,
+ AMQP_UNUSED const void *key,
+ AMQP_UNUSED size_t n)
{
- amqp_abort("%s is not implemented for PolarSSL", __func__);
- return -1;
+ amqp_abort("%s is not implemented for PolarSSL", __func__);
+ return -1;
}
void
amqp_ssl_socket_set_verify(amqp_socket_t *base,
- amqp_boolean_t verify)
+ amqp_boolean_t verify)
{
- struct amqp_ssl_socket_t *self;
- if (base->klass != &amqp_ssl_socket_class) {
- amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
- }
- self = (struct amqp_ssl_socket_t *)base;
- if (verify) {
- ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED);
- } else {
- ssl_set_authmode(self->ssl, SSL_VERIFY_NONE);
- }
+ struct amqp_ssl_socket_t *self;
+ if (base->klass != &amqp_ssl_socket_class) {
+ amqp_abort("<%p> is not of type amqp_ssl_socket_t", base);
+ }
+ self = (struct amqp_ssl_socket_t *)base;
+ if (verify) {
+ ssl_set_authmode(self->ssl, SSL_VERIFY_REQUIRED);
+ } else {
+ ssl_set_authmode(self->ssl, SSL_VERIFY_NONE);
+ }
}
void
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 559be47..130d19a 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -141,22 +141,19 @@ int amqp_open_socket(char const *hostname,
http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64
*/
sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
- if (-1 == sockfd)
- {
+ if (-1 == sockfd) {
last_error = -amqp_os_socket_error();
continue;
}
#ifdef DISABLE_SIGPIPE_WITH_SETSOCKOPT
- if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)))
- {
+ if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
last_error = -amqp_os_socket_error();
amqp_os_socket_close(sockfd);
continue;
}
#endif /* DISABLE_SIGPIPE_WITH_SETSOCKOPT */
if (0 != amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))
- || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen))
- {
+ || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
last_error = -amqp_os_socket_error();
amqp_os_socket_close(sockfd);
continue;
@@ -406,14 +403,14 @@ retry:
*/
if (!((frame.frame_type == AMQP_FRAME_METHOD)
&& (
- ((frame.channel == channel)
- && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)
- || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
- ||
- ((frame.channel == 0)
- && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))
- )
- )) {
+ ((frame.channel == channel)
+ && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)
+ || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
+ ||
+ ((frame.channel == 0)
+ && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))
+ )
+ )) {
amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t));
amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t));
@@ -495,13 +492,13 @@ static int amqp_table_contains_entry(const amqp_table_t *table,
}
static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- const amqp_table_t *client_properties,
- amqp_sasl_method_enum sasl_method,
- va_list vl)
+ char const *vhost,
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ const amqp_table_t *client_properties,
+ amqp_sasl_method_enum sasl_method,
+ va_list vl)
{
int res;
amqp_method_t method;
@@ -536,7 +533,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
amqp_table_t default_table;
amqp_connection_start_ok_t s;
amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
- sasl_method, vl);
+ sasl_method, vl);
if (response_bytes.bytes == NULL) {
res = -ERROR_NO_MEMORY;
@@ -572,7 +569,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
amqp_table_entry_t *current_entry;
s.client_properties.entries = amqp_pool_alloc(&state->decoding_pool,
- sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries));
+ sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries));
if (NULL == s.client_properties.entries) {
res = -ERROR_NO_MEMORY;
goto error_res;
@@ -707,13 +704,13 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
}
amqp_rpc_reply_t amqp_login_with_properties(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- const amqp_table_t *client_properties,
- amqp_sasl_method_enum sasl_method,
- ...)
+ char const *vhost,
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ const amqp_table_t *client_properties,
+ amqp_sasl_method_enum sasl_method,
+ ...)
{
va_list vl;
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index 5a6dfde..63c5e8d 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -44,18 +44,18 @@ typedef int (*amqp_socket_get_sockfd_fn)(void *);
/** V-table for amqp_socket_t */
struct amqp_socket_class_t {
- amqp_socket_writev_fn writev;
- amqp_socket_send_fn send;
- amqp_socket_recv_fn recv;
- amqp_socket_open_fn open;
- amqp_socket_close_fn close;
- amqp_socket_error_fn error;
- amqp_socket_get_sockfd_fn get_sockfd;
+ amqp_socket_writev_fn writev;
+ amqp_socket_send_fn send;
+ amqp_socket_recv_fn recv;
+ amqp_socket_open_fn open;
+ amqp_socket_close_fn close;
+ amqp_socket_error_fn error;
+ amqp_socket_get_sockfd_fn get_sockfd;
};
/** Abstract base class for amqp_socket_t */
struct amqp_socket_t_ {
- const struct amqp_socket_class_t *klass;
+ const struct amqp_socket_class_t *klass;
};
/**
diff --git a/librabbitmq/amqp_ssl_socket.h b/librabbitmq/amqp_ssl_socket.h
index fe61cd3..6348667 100644
--- a/librabbitmq/amqp_ssl_socket.h
+++ b/librabbitmq/amqp_ssl_socket.h
@@ -56,7 +56,7 @@ AMQP_PUBLIC_FUNCTION
int
AMQP_CALL
amqp_ssl_socket_set_cacert(amqp_socket_t *self,
- const char *cacert);
+ const char *cacert);
/**
* Set the client key.
@@ -71,8 +71,8 @@ AMQP_PUBLIC_FUNCTION
int
AMQP_CALL
amqp_ssl_socket_set_key(amqp_socket_t *self,
- const char *cert,
- const char *key);
+ const char *cert,
+ const char *key);
/**
* Set the client key from a buffer.
@@ -88,9 +88,9 @@ AMQP_PUBLIC_FUNCTION
int
AMQP_CALL
amqp_ssl_socket_set_key_buffer(amqp_socket_t *self,
- const char *cert,
- const void *key,
- size_t n);
+ const char *cert,
+ const void *key,
+ size_t n);
/**
* Enable or disable peer verification.
@@ -106,7 +106,7 @@ AMQP_PUBLIC_FUNCTION
void
AMQP_CALL
amqp_ssl_socket_set_verify(amqp_socket_t *self,
- amqp_boolean_t verify);
+ amqp_boolean_t verify);
/**
* Sets whether rabbitmq-c initializes the underlying SSL library.
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index cd3f6ce..f65a6ce 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -31,96 +31,96 @@
#include <stdlib.h>
struct amqp_tcp_socket_t {
- const struct amqp_socket_class_t *klass;
- int sockfd;
+ const struct amqp_socket_class_t *klass;
+ int sockfd;
};
static ssize_t
amqp_tcp_socket_writev(void *base, const struct iovec *iov, int iovcnt)
{
- struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return amqp_os_socket_writev(self->sockfd, iov, iovcnt);
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return amqp_os_socket_writev(self->sockfd, iov, iovcnt);
}
static ssize_t
amqp_tcp_socket_send(void *base, const void *buf, size_t len, int flags)
{
- struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return send(self->sockfd, buf, len, flags);
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return send(self->sockfd, buf, len, flags);
}
static ssize_t
amqp_tcp_socket_recv(void *base, void *buf, size_t len, int flags)
{
- struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return recv(self->sockfd, buf, len, flags);
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return recv(self->sockfd, buf, len, flags);
}
static int
amqp_tcp_socket_open(void *base, const char *host, int port)
{
- struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- self->sockfd = amqp_open_socket(host, port);
- if (0 > self->sockfd) {
- return -1;
- }
- return 0;
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ self->sockfd = amqp_open_socket(host, port);
+ if (0 > self->sockfd) {
+ return -1;
+ }
+ return 0;
}
static int
amqp_tcp_socket_close(void *base)
{
- struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- int status = -1;
- if (self) {
- status = amqp_os_socket_close(self->sockfd);
- free(self);
- }
- return status;
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ int status = -1;
+ if (self) {
+ status = amqp_os_socket_close(self->sockfd);
+ free(self);
+ }
+ return status;
}
static int
amqp_tcp_socket_error(AMQP_UNUSED void *base)
{
- return amqp_os_socket_error();
+ return amqp_os_socket_error();
}
static int
amqp_tcp_socket_get_sockfd(void *base)
{
- struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- return self->sockfd;
+ struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
+ return self->sockfd;
}
static const struct amqp_socket_class_t amqp_tcp_socket_class = {
- amqp_tcp_socket_writev, /* writev */
- amqp_tcp_socket_send, /* send */
- amqp_tcp_socket_recv, /* recv */
- amqp_tcp_socket_open, /* open */
- amqp_tcp_socket_close, /* close */
- amqp_tcp_socket_error, /* error */
- amqp_tcp_socket_get_sockfd /* get_sockfd */
+ amqp_tcp_socket_writev, /* writev */
+ amqp_tcp_socket_send, /* send */
+ amqp_tcp_socket_recv, /* recv */
+ amqp_tcp_socket_open, /* open */
+ amqp_tcp_socket_close, /* close */
+ amqp_tcp_socket_error, /* error */
+ amqp_tcp_socket_get_sockfd /* get_sockfd */
};
amqp_socket_t *
amqp_tcp_socket_new(void)
{
- struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
- if (!self) {
- return NULL;
- }
- self->klass = &amqp_tcp_socket_class;
- self->sockfd = -1;
- return (amqp_socket_t *)self;
+ struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
+ if (!self) {
+ return NULL;
+ }
+ self->klass = &amqp_tcp_socket_class;
+ self->sockfd = -1;
+ return (amqp_socket_t *)self;
}
void
amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd)
{
- struct amqp_tcp_socket_t *self;
- if (base->klass != &amqp_tcp_socket_class) {
- amqp_abort("<%p> is not of type amqp_tcp_socket_t", base);
- }
- self = (struct amqp_tcp_socket_t *)base;
- self->sockfd = sockfd;
+ struct amqp_tcp_socket_t *self;
+ if (base->klass != &amqp_tcp_socket_class) {
+ amqp_abort("<%p> is not of type amqp_tcp_socket_t", base);
+ }
+ self = (struct amqp_tcp_socket_t *)base;
+ self->sockfd = sockfd;
}
diff --git a/librabbitmq/amqp_url.c b/librabbitmq/amqp_url.c
index b15675c..b200adc 100644
--- a/librabbitmq/amqp_url.c
+++ b/librabbitmq/amqp_url.c
@@ -141,8 +141,9 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed)
/* What might have been the host and port were in fact
the username and password */
parsed->user = host;
- if (port)
+ if (port) {
parsed->password = port;
+ }
port = NULL;
host = start = url;
@@ -152,14 +153,16 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed)
if (delim == '[') {
/* IPv6 address. The bracket should be the first
character in the host. */
- if (host != start || *host != 0)
+ if (host != start || *host != 0) {
goto out;
+ }
start = url;
delim = find_delim(&url, 0);
- if (delim != ']')
+ if (delim != ']') {
goto out;
+ }
parsed->host = start;
start = url;
@@ -167,13 +170,14 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed)
/* Closing bracket should be the last character in the
host. */
- if (*start != 0)
+ if (*start != 0) {
goto out;
- }
- else {
+ }
+ } else {
/* If we haven't seen the host yet, this is it. */
- if (*host != 0)
+ if (*host != 0) {
parsed->host = host;
+ }
}
if (delim == ':') {
@@ -185,8 +189,9 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed)
char *end;
long portnum = strtol(port, &end, 10);
- if (port == end || *end != 0 || portnum < 0 || portnum > 65535)
+ if (port == end || *end != 0 || portnum < 0 || portnum > 65535) {
goto out;
+ }
parsed->port = portnum;
}
@@ -195,13 +200,13 @@ int amqp_parse_url(char *url, struct amqp_connection_info *parsed)
start = url;
delim = find_delim(&url, 1);
- if (delim != 0)
+ if (delim != 0) {
goto out;
+ }
parsed->vhost = start;
res = 0;
- }
- else if (delim == 0) {
+ } else if (delim == 0) {
res = 0;
}
diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c
index e6621cc..4615480 100644
--- a/librabbitmq/unix/socket.c
+++ b/librabbitmq/unix/socket.c
@@ -85,18 +85,18 @@ amqp_os_error_string(int err)
int
amqp_os_socket_close(int sockfd)
{
- return close(sockfd);
+ return close(sockfd);
}
ssize_t
amqp_os_socket_writev(int sockfd, const struct iovec *iov,
- int iovcnt)
+ int iovcnt)
{
- return writev(sockfd, iov, iovcnt);
+ return writev(sockfd, iov, iovcnt);
}
int
amqp_os_socket_error(void)
{
- return errno | ERROR_CATEGORY_OS;
+ return errno | ERROR_CATEGORY_OS;
}
diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c
index 815530f..fa2b0f7 100644
--- a/librabbitmq/win32/socket.c
+++ b/librabbitmq/win32/socket.c
@@ -95,12 +95,12 @@ amqp_socket_setsockopt(int sock, int level, int optname,
int
amqp_socket_close(int sockfd, AMQP_UNUSED void *user_data)
{
- return closesocket(sockfd);
+ return closesocket(sockfd);
}
ssize_t
amqp_socket_writev(int sock, struct iovec *iov, int nvecs,
- AMQP_UNUSED void *user_data)
+ AMQP_UNUSED void *user_data)
{
DWORD ret;
if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) {
diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h
index db40a97..58e1a99 100644
--- a/librabbitmq/win32/socket.h
+++ b/librabbitmq/win32/socket.h
@@ -60,7 +60,7 @@ amqp_socket_setsockopt(int sock, int level, int optname, const void *optval,
size_t optlen);
ssize_t
-amqp_socket_writev(int sock, struct iovec *iov, int nvecs, AMQP_UNUSED void* user_data);
+amqp_socket_writev(int sock, struct iovec *iov, int nvecs, AMQP_UNUSED void *user_data);
int
amqp_socket_error(AMQP_UNUSED void *user_data);
diff --git a/librabbitmq/win32/threads.c b/librabbitmq/win32/threads.c
index 0501408..1559c5f 100644
--- a/librabbitmq/win32/threads.c
+++ b/librabbitmq/win32/threads.c
@@ -11,8 +11,9 @@ int
pthread_mutex_init(pthread_mutex_t *mutex, void *attr)
{
*mutex = malloc(sizeof(CRITICAL_SECTION));
- if (!*mutex)
+ if (!*mutex) {
return 1;
+ }
InitializeCriticalSection(*mutex);
return 0;
}
@@ -20,8 +21,9 @@ pthread_mutex_init(pthread_mutex_t *mutex, void *attr)
int
pthread_mutex_lock(pthread_mutex_t *mutex)
{
- if (!*mutex)
+ if (!*mutex) {
return 1;
+ }
EnterCriticalSection(*mutex);
return 0;
@@ -30,8 +32,9 @@ pthread_mutex_lock(pthread_mutex_t *mutex)
int
pthread_mutex_unlock(pthread_mutex_t *mutex)
{
- if (!*mutex)
+ if (!*mutex) {
return 1;
+ }
LeaveCriticalSection(*mutex);
return 0;
diff --git a/tests/test_parse_url.c b/tests/test_parse_url.c
index 241cf8c..da3e681 100644
--- a/tests/test_parse_url.c
+++ b/tests/test_parse_url.c
@@ -111,14 +111,14 @@ int main(void)
{
/* From the spec */
parse_success("amqp://user:pass@host:10000/vhost", "user", "pass",
- "host", 10000, "vhost");
+ "host", 10000, "vhost");
parse_success("amqps://user:pass@host:10000/vhost", "user", "pass",
- "host", 10000, "vhost");
+ "host", 10000, "vhost");
parse_success("amqp://user%61:%61pass@ho%61st:10000/v%2fhost",
- "usera", "apass", "hoast", 10000, "v/host");
+ "usera", "apass", "hoast", 10000, "v/host");
parse_success("amqps://user%61:%61pass@ho%61st:10000/v%2fhost",
- "usera", "apass", "hoast", 10000, "v/host");
+ "usera", "apass", "hoast", 10000, "v/host");
parse_success("amqp://", "guest", "guest", "localhost", 5672, "/");
parse_success("amqps://", "guest", "guest", "localhost", 5671, "/");
@@ -130,22 +130,22 @@ int main(void)
parse_success("amqps://user@", "user", "guest", "localhost", 5671, "/");
parse_success("amqp://user:pass@", "user", "pass",
- "localhost", 5672, "/");
+ "localhost", 5672, "/");
parse_success("amqps://user:pass@", "user", "pass",
- "localhost", 5671, "/");
+ "localhost", 5671, "/");
parse_success("amqp://host", "guest", "guest", "host", 5672, "/");
parse_success("amqps://host", "guest", "guest", "host", 5671, "/");
parse_success("amqp://:10000", "guest", "guest", "localhost", 10000,
- "/");
+ "/");
parse_success("amqps://:10000", "guest", "guest", "localhost", 10000,
- "/");
+ "/");
parse_success("amqp:///vhost", "guest", "guest", "localhost", 5672,
- "vhost");
+ "vhost");
parse_success("amqps:///vhost", "guest", "guest", "localhost", 5671,
- "vhost");
+ "vhost");
parse_success("amqp://host/", "guest", "guest", "host", 5672, "");
parse_success("amqps://host/", "guest", "guest", "host", 5671, "");
@@ -164,54 +164,54 @@ int main(void)
parse_success("amqps://[::1]:100", "guest", "guest", "::1", 100, "/");
parse_success("amqp://host/blah", "guest", "guest",
- "host", 5672, "blah");
+ "host", 5672, "blah");
parse_success("amqps://host/blah", "guest", "guest",
- "host", 5671, "blah");
+ "host", 5671, "blah");
parse_success("amqp://host:100/blah", "guest", "guest",
- "host", 100, "blah");
+ "host", 100, "blah");
parse_success("amqps://host:100/blah", "guest", "guest",
- "host", 100, "blah");
+ "host", 100, "blah");
parse_success("amqp://:100/blah", "guest", "guest",
- "localhost", 100, "blah");
+ "localhost", 100, "blah");
parse_success("amqps://:100/blah", "guest", "guest",
- "localhost", 100, "blah");
+ "localhost", 100, "blah");
parse_success("amqp://[::1]/blah", "guest", "guest",
- "::1", 5672, "blah");
+ "::1", 5672, "blah");
parse_success("amqps://[::1]/blah", "guest", "guest",
- "::1", 5671, "blah");
+ "::1", 5671, "blah");
parse_success("amqp://[::1]:100/blah", "guest", "guest",
- "::1", 100, "blah");
+ "::1", 100, "blah");
parse_success("amqps://[::1]:100/blah", "guest", "guest",
- "::1", 100, "blah");
+ "::1", 100, "blah");
parse_success("amqp://user:pass@host", "user", "pass",
- "host", 5672, "/");
+ "host", 5672, "/");
parse_success("amqps://user:pass@host", "user", "pass",
- "host", 5671, "/");
+ "host", 5671, "/");
parse_success("amqp://user:pass@host:100", "user", "pass",
- "host", 100, "/");
+ "host", 100, "/");
parse_success("amqps://user:pass@host:100", "user", "pass",
- "host", 100, "/");
+ "host", 100, "/");
parse_success("amqp://user:pass@:100", "user", "pass",
- "localhost", 100, "/");
+ "localhost", 100, "/");
parse_success("amqps://user:pass@:100", "user", "pass",
- "localhost", 100, "/");
+ "localhost", 100, "/");
parse_success("amqp://user:pass@[::1]", "user", "pass",
- "::1", 5672, "/");
+ "::1", 5672, "/");
parse_success("amqps://user:pass@[::1]", "user", "pass",
- "::1", 5671, "/");
+ "::1", 5671, "/");
parse_success("amqp://user:pass@[::1]:100", "user", "pass",
- "::1", 100, "/");
+ "::1", 100, "/");
parse_success("amqps://user:pass@[::1]:100", "user", "pass",
- "::1", 100, "/");
+ "::1", 100, "/");
/* Various failure cases */
parse_fail("http://www.rabbitmq.com");
diff --git a/tools/common.c b/tools/common.c
index c1844ae..0c61d16 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -209,14 +209,22 @@ struct poptOption connect_options[] = {
"the password to login with", "password"
},
#ifdef WITH_SSL
- {"ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0,
- "connect over SSL/TLS", NULL},
- {"cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0,
- "path to the CA certificate file", "cacert.pem"},
- {"key", 0, POPT_ARG_STRING, &amqp_key, 0,
- "path to the client private key file", "key.pem"},
- {"cert", 0, POPT_ARG_STRING, &amqp_cert, 0,
- "path to the client certificate file", "cert.pem"},
+ {
+ "ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0,
+ "connect over SSL/TLS", NULL
+ },
+ {
+ "cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0,
+ "path to the CA certificate file", "cacert.pem"
+ },
+ {
+ "key", 0, POPT_ARG_STRING, &amqp_key, 0,
+ "path to the client private key file", "key.pem"
+ },
+ {
+ "cert", 0, POPT_ARG_STRING, &amqp_cert, 0,
+ "path to the client certificate file", "cert.pem"
+ },
#endif /* WITH_SSL */
{ NULL, '\0', 0, NULL, 0, NULL, NULL }
};
@@ -234,7 +242,7 @@ static void init_connection_info(struct amqp_connection_info *ci)
if (amqp_url)
die_amqp_error(amqp_parse_url(strdup(amqp_url), ci),
- "Parsing URL '%s'", amqp_url);
+ "Parsing URL '%s'", amqp_url);
if (amqp_server) {
char *colon;
@@ -253,7 +261,7 @@ static void init_connection_info(struct amqp_connection_info *ci)
--url now allows connection options to be
specificied concisely. */
fprintf(stderr, "Specifying the port number with"
- " --server is deprecated\n");
+ " --server is deprecated\n");
host_len = colon - amqp_server;
ci->host = malloc(host_len + 1);
@@ -353,11 +361,12 @@ amqp_connection_state_t make_connection(void)
}
amqp_set_socket(conn, socket);
die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0,
- AMQP_SASL_METHOD_PLAIN,
- ci.user, ci.password),
- "logging in to AMQP server");
- if (!amqp_channel_open(conn, 1))
+ AMQP_SASL_METHOD_PLAIN,
+ ci.user, ci.password),
+ "logging in to AMQP server");
+ if (!amqp_channel_open(conn, 1)) {
die_rpc(amqp_get_rpc_reply(conn), "opening channel");
+ }
return conn;
}