summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-04-08 14:52:53 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-04-08 14:52:53 -0700
commitad2b116059e22d393b7e44ad54f345a3fb4e267b (patch)
treecfd98876757f85d9fe7bfe56d179bd5fb63a8ff0
parentf3074030723b840690458983b63c746549aa3bd5 (diff)
downloadrabbitmq-c-github-ask-ad2b116059e22d393b7e44ad54f345a3fb4e267b.tar.gz
Formatted source code with astyle utilty
-rw-r--r--.astyle4
-rw-r--r--examples/amqp_bind.c13
-rw-r--r--examples/amqp_consumer.c32
-rw-r--r--examples/amqp_exchange_declare.c7
-rw-r--r--examples/amqp_listen.c72
-rw-r--r--examples/amqp_listenq.c68
-rw-r--r--examples/amqp_producer.c29
-rw-r--r--examples/amqp_rpc_sendstring_client.c18
-rw-r--r--examples/amqp_sendstring.c21
-rw-r--r--examples/amqp_unbind.c13
-rw-r--r--examples/utils.c119
-rw-r--r--examples/win32/platform_utils.c2
-rw-r--r--librabbitmq/amqp.h98
-rw-r--r--librabbitmq/amqp_api.c86
-rw-r--r--librabbitmq/amqp_connection.c147
-rw-r--r--librabbitmq/amqp_mem.c48
-rw-r--r--librabbitmq/amqp_private.h160
-rw-r--r--librabbitmq/amqp_socket.c294
-rw-r--r--librabbitmq/amqp_table.c143
-rw-r--r--librabbitmq/amqp_url.c298
-rw-r--r--librabbitmq/unix/socket.c35
-rw-r--r--librabbitmq/win32/socket.c59
-rw-r--r--librabbitmq/win32/socket.h6
-rw-r--r--tests/test_parse_url.c214
-rw-r--r--tests/test_tables.c60
-rw-r--r--tools/common.c627
-rw-r--r--tools/common.h16
-rw-r--r--tools/consume.c307
-rw-r--r--tools/declare_queue.c69
-rw-r--r--tools/delete_queue.c72
-rw-r--r--tools/get.c55
-rw-r--r--tools/publish.c156
-rw-r--r--tools/unix/process.c56
-rw-r--r--tools/unix/process.h4
-rw-r--r--tools/win32/compat.c27
-rw-r--r--tools/win32/process.c327
-rw-r--r--tools/win32/process.h4
37 files changed, 2014 insertions, 1752 deletions
diff --git a/.astyle b/.astyle
new file mode 100644
index 0000000..11156b1
--- /dev/null
+++ b/.astyle
@@ -0,0 +1,4 @@
+--style=1tbs
+--indent=spaces=2
+--indent-preprocessor
+--align-pointer=name
diff --git a/examples/amqp_bind.c b/examples/amqp_bind.c
index 7864871..3c9a863 100644
--- a/examples/amqp_bind.c
+++ b/examples/amqp_bind.c
@@ -41,7 +41,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port;
char const *exchange;
@@ -67,15 +68,15 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_queue_bind(conn, 1,
- amqp_cstring_bytes(queue),
- amqp_cstring_bytes(exchange),
- amqp_cstring_bytes(bindingkey),
- amqp_empty_table);
+ amqp_cstring_bytes(queue),
+ amqp_cstring_bytes(exchange),
+ amqp_cstring_bytes(bindingkey),
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index c1c0873..aa63346 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -66,7 +66,7 @@ static void run(amqp_connection_state_t conn)
int countOverInterval = received - previous_received;
double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Received %d - %d since last report (%d Hz)\n",
- (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
+ (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
previous_received = received;
previous_report_time = now;
@@ -75,18 +75,22 @@ static void run(amqp_connection_state_t conn)
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
+ if (result < 0) {
return;
+ }
- if (frame.frame_type != AMQP_FRAME_METHOD)
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
continue;
+ }
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
continue;
+ }
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
+ if (result < 0) {
return;
+ }
if (frame.frame_type != AMQP_FRAME_HEADER) {
fprintf(stderr, "Expected header!");
@@ -98,12 +102,13 @@ static void run(amqp_connection_state_t conn)
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
- return;
+ if (result < 0) {
+ return;
+ }
if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
+ fprintf(stderr, "Expected body!");
+ abort();
}
body_received += frame.payload.body_fragment.len;
@@ -114,7 +119,8 @@ static void run(amqp_connection_state_t conn)
}
}
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port;
char const *exchange;
@@ -140,13 +146,13 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
- amqp_empty_table);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
@@ -156,7 +162,7 @@ int main(int argc, char const * const *argv) {
}
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
- amqp_empty_table);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c
index 74f7465..749109c 100644
--- a/examples/amqp_exchange_declare.c
+++ b/examples/amqp_exchange_declare.c
@@ -41,7 +41,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port;
char const *exchange;
@@ -65,12 +66,12 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype),
- 0, 0, amqp_empty_table);
+ 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index a589fa5..0347375 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -43,7 +43,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port;
char const *exchange;
@@ -69,13 +70,13 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
- amqp_empty_table);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
@@ -85,7 +86,7 @@ int main(int argc, char const * const *argv) {
}
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
- amqp_empty_table);
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
@@ -104,35 +105,39 @@ int main(int argc, char const * const *argv) {
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
printf("Result %d\n", result);
- if (result < 0)
- break;
+ if (result < 0) {
+ break;
+ }
printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD)
- continue;
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
+ continue;
+ }
printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
- continue;
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ continue;
+ }
d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
+ (unsigned) d->delivery_tag,
+ (int) d->exchange.len, (char *) d->exchange.bytes,
+ (int) d->routing_key.len, (char *) d->routing_key.bytes);
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
- break;
+ if (result < 0) {
+ break;
+ }
if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
+ fprintf(stderr, "Expected header!");
+ abort();
}
p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
- printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
+ printf("Content-type: %.*s\n",
+ (int) p->content_type.len, (char *) p->content_type.bytes);
}
printf("----\n");
@@ -140,26 +145,27 @@ int main(int argc, char const * const *argv) {
body_received = 0;
while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
- break;
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ break;
+ }
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
+ if (frame.frame_type != AMQP_FRAME_BODY) {
+ fprintf(stderr, "Expected body!");
+ abort();
+ }
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
+ body_received += frame.payload.body_fragment.len;
+ assert(body_received <= body_target);
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
+ amqp_dump(frame.payload.body_fragment.bytes,
+ frame.payload.body_fragment.len);
}
if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
+ /* Can only happen when amqp_simple_wait_frame returns <= 0 */
+ /* We break here to close the connection */
+ break;
}
}
}
diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c
index 8af0f08..762831d 100644
--- a/examples/amqp_listenq.c
+++ b/examples/amqp_listenq.c
@@ -43,7 +43,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port;
char const *queuename;
@@ -65,7 +66,7 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
@@ -85,35 +86,39 @@ int main(int argc, char const * const *argv) {
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
printf("Result %d\n", result);
- if (result < 0)
- break;
+ if (result < 0) {
+ break;
+ }
printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD)
- continue;
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
+ continue;
+ }
printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
- continue;
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ continue;
+ }
d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
+ (unsigned) d->delivery_tag,
+ (int) d->exchange.len, (char *) d->exchange.bytes,
+ (int) d->routing_key.len, (char *) d->routing_key.bytes);
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
- break;
+ if (result < 0) {
+ break;
+ }
if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
+ fprintf(stderr, "Expected header!");
+ abort();
}
p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
- printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
+ printf("Content-type: %.*s\n",
+ (int) p->content_type.len, (char *) p->content_type.bytes);
}
printf("----\n");
@@ -121,26 +126,27 @@ int main(int argc, char const * const *argv) {
body_received = 0;
while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
- break;
+ result = amqp_simple_wait_frame(conn, &frame);
+ if (result < 0) {
+ break;
+ }
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
+ if (frame.frame_type != AMQP_FRAME_BODY) {
+ fprintf(stderr, "Expected body!");
+ abort();
+ }
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
+ body_received += frame.payload.body_fragment.len;
+ assert(body_received <= body_target);
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
+ amqp_dump(frame.payload.body_fragment.bytes,
+ frame.payload.body_fragment.len);
}
if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
+ /* Can only happen when amqp_simple_wait_frame returns <= 0 */
+ /* We break here to close the connection */
+ break;
}
amqp_basic_ack(conn, 1, d->delivery_tag, 0);
diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c
index d85b944..4a1cd68 100644
--- a/examples/amqp_producer.c
+++ b/examples/amqp_producer.c
@@ -44,9 +44,9 @@
#define SUMMARY_EVERY_US 1000000
static void send_batch(amqp_connection_state_t conn,
- char const *queue_name,
- int rate_limit,
- int message_count)
+ char const *queue_name,
+ int rate_limit,
+ int message_count)
{
uint64_t start_time = now_microseconds();
int i;
@@ -69,20 +69,20 @@ static void send_batch(amqp_connection_state_t conn,
uint64_t now = now_microseconds();
die_on_error(amqp_basic_publish(conn,
- 1,
- amqp_cstring_bytes("amq.direct"),
- amqp_cstring_bytes(queue_name),
- 0,
- 0,
- NULL,
- message_bytes),
- "Publishing");
+ 1,
+ amqp_cstring_bytes("amq.direct"),
+ amqp_cstring_bytes(queue_name),
+ 0,
+ 0,
+ NULL,
+ message_bytes),
+ "Publishing");
sent++;
if (now > next_summary_time) {
int countOverInterval = sent - previous_sent;
double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
- (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
+ (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
previous_sent = sent;
previous_report_time = now;
@@ -105,7 +105,8 @@ static void send_batch(amqp_connection_state_t conn,
}
}
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port;
int rate_limit;
@@ -129,7 +130,7 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
diff --git a/examples/amqp_rpc_sendstring_client.c b/examples/amqp_rpc_sendstring_client.c
index 04c073e..7914e2c 100644
--- a/examples/amqp_rpc_sendstring_client.c
+++ b/examples/amqp_rpc_sendstring_client.c
@@ -43,7 +43,8 @@
#include "utils.h"
-int main(int argc, char* argv[]) {
+int main(int argc, char *argv[])
+{
char const *hostname;
int port;
char const *exchange;
@@ -152,16 +153,19 @@ int main(int argc, char* argv[]) {
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
printf("Result: %d\n", result);
- if (result < 0)
+ if (result < 0) {
break;
+ }
printf("Frame type: %d channel: %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD)
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
continue;
+ }
printf("Method: %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
continue;
+ }
d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery: %u exchange: %.*s routingkey: %.*s\n",
@@ -170,8 +174,9 @@ int main(int argc, char* argv[]) {
(int) d->routing_key.len, (char *) d->routing_key.bytes);
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
+ if (result < 0) {
break;
+ }
if (frame.frame_type != AMQP_FRAME_HEADER) {
fprintf(stderr, "Expected header!");
@@ -189,8 +194,9 @@ int main(int argc, char* argv[]) {
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0)
+ if (result < 0) {
break;
+ }
if (frame.frame_type != AMQP_FRAME_BODY) {
fprintf(stderr, "Expected body!");
diff --git a/examples/amqp_sendstring.c b/examples/amqp_sendstring.c
index e108ea7..f1456c9 100644
--- a/examples/amqp_sendstring.c
+++ b/examples/amqp_sendstring.c
@@ -41,7 +41,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port;
char const *exchange;
@@ -67,7 +68,7 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
@@ -77,14 +78,14 @@ int main(int argc, char const * const *argv) {
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
die_on_error(amqp_basic_publish(conn,
- 1,
- amqp_cstring_bytes(exchange),
- amqp_cstring_bytes(routingkey),
- 0,
- 0,
- &props,
- amqp_cstring_bytes(messagebody)),
- "Publishing");
+ 1,
+ amqp_cstring_bytes(exchange),
+ amqp_cstring_bytes(routingkey),
+ 0,
+ 0,
+ &props,
+ amqp_cstring_bytes(messagebody)),
+ "Publishing");
}
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/amqp_unbind.c b/examples/amqp_unbind.c
index 546b356..011f561 100644
--- a/examples/amqp_unbind.c
+++ b/examples/amqp_unbind.c
@@ -41,7 +41,8 @@
#include "utils.h"
-int main(int argc, char const * const *argv) {
+int main(int argc, char const *const *argv)
+{
char const *hostname;
int port;
char const *exchange;
@@ -67,15 +68,15 @@ int main(int argc, char const * const *argv) {
die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");
amqp_set_sockfd(conn, sockfd);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
- "Logging in");
+ "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_queue_unbind(conn, 1,
- amqp_cstring_bytes(queue),
- amqp_cstring_bytes(exchange),
- amqp_cstring_bytes(bindingkey),
- amqp_empty_table);
+ amqp_cstring_bytes(queue),
+ amqp_cstring_bytes(exchange),
+ amqp_cstring_bytes(bindingkey),
+ amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
diff --git a/examples/utils.c b/examples/utils.c
index 691f14a..328a5e5 100644
--- a/examples/utils.c
+++ b/examples/utils.c
@@ -42,7 +42,8 @@
#include "utils.h"
-void die_on_error(int x, char const *context) {
+void die_on_error(int x, char const *context)
+{
if (x < 0) {
char *errstr = amqp_error_string(-x);
fprintf(stderr, "%s: %s\n", context, errstr);
@@ -51,85 +52,93 @@ void die_on_error(int x, char const *context) {
}
}
-void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
+void die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
+{
switch (x.reply_type) {
- case AMQP_RESPONSE_NORMAL:
- return;
-
- case AMQP_RESPONSE_NONE:
- fprintf(stderr, "%s: missing RPC reply type!\n", context);
+ case AMQP_RESPONSE_NORMAL:
+ return;
+
+ case AMQP_RESPONSE_NONE:
+ fprintf(stderr, "%s: missing RPC reply type!\n", context);
+ break;
+
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
+ break;
+
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ switch (x.reply.id) {
+ case AMQP_CONNECTION_CLOSE_METHOD: {
+ amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
+ fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
+ context,
+ m->reply_code,
+ (int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
-
- case AMQP_RESPONSE_LIBRARY_EXCEPTION:
- fprintf(stderr, "%s: %s\n", context, amqp_error_string(x.library_error));
+ }
+ case AMQP_CHANNEL_CLOSE_METHOD: {
+ amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
+ fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
+ context,
+ m->reply_code,
+ (int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
-
- case AMQP_RESPONSE_SERVER_EXCEPTION:
- switch (x.reply.id) {
- case AMQP_CONNECTION_CLOSE_METHOD: {
- amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
- fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
- context,
- m->reply_code,
- (int) m->reply_text.len, (char *) m->reply_text.bytes);
- break;
- }
- case AMQP_CHANNEL_CLOSE_METHOD: {
- amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
- fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
- context,
- m->reply_code,
- (int) m->reply_text.len, (char *) m->reply_text.bytes);
- break;
- }
- default:
- fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
- break;
- }
+ }
+ default:
+ fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
break;
+ }
+ break;
}
exit(1);
}
-static void dump_row(long count, int numinrow, int *chs) {
+static void dump_row(long count, int numinrow, int *chs)
+{
int i;
printf("%08lX:", count - numinrow);
if (numinrow > 0) {
for (i = 0; i < numinrow; i++) {
- if (i == 8)
- printf(" :");
+ if (i == 8) {
+ printf(" :");
+ }
printf(" %02X", chs[i]);
}
for (i = numinrow; i < 16; i++) {
- if (i == 8)
- printf(" :");
+ if (i == 8) {
+ printf(" :");
+ }
printf(" ");
}
printf(" ");
for (i = 0; i < numinrow; i++) {
- if (isprint(chs[i]))
- printf("%c", chs[i]);
- else
- printf(".");
+ if (isprint(chs[i])) {
+ printf("%c", chs[i]);
+ } else {
+ printf(".");
+ }
}
}
printf("\n");
}
-static int rows_eq(int *a, int *b) {
+static int rows_eq(int *a, int *b)
+{
int i;
for (i=0; i<16; i++)
- if (a[i] != b[i])
+ if (a[i] != b[i]) {
return 0;
+ }
return 1;
}
-void amqp_dump(void const *buffer, size_t len) {
+void amqp_dump(void const *buffer, size_t len)
+{
unsigned char *buf = (unsigned char *) buffer;
long count = 0;
int numinrow = 0;
@@ -145,17 +154,18 @@ void amqp_dump(void const *buffer, size_t len) {
int i;
if (rows_eq(oldchs, chs)) {
- if (!showed_dots) {
- showed_dots = 1;
- printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
- }
+ if (!showed_dots) {
+ showed_dots = 1;
+ printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
+ }
} else {
- showed_dots = 0;
- dump_row(count, numinrow, chs);
+ showed_dots = 0;
+ dump_row(count, numinrow, chs);
}
- for (i=0; i<16; i++)
- oldchs[i] = chs[i];
+ for (i=0; i<16; i++) {
+ oldchs[i] = chs[i];
+ }
numinrow = 0;
}
@@ -166,6 +176,7 @@ void amqp_dump(void const *buffer, size_t len) {
dump_row(count, numinrow, chs);
- if (numinrow != 0)
+ if (numinrow != 0) {
printf("%08lX:\n", count);
+ }
}
diff --git a/examples/win32/platform_utils.c b/examples/win32/platform_utils.c
index c7807f9..dff6243 100644
--- a/examples/win32/platform_utils.c
+++ b/examples/win32/platform_utils.c
@@ -40,7 +40,7 @@ uint64_t now_microseconds(void)
FILETIME ft;
GetSystemTimeAsFileTime(&ft);
return (((uint64_t)ft.dwHighDateTime << 32) | (uint64_t)ft.dwLowDateTime)
- / 10;
+ / 10;
}
void microsleep(int usec)
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 5de39d5..c379125 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -152,28 +152,28 @@ typedef struct amqp_array_t_ {
t t Boolean
b b Signed 8-bit
B Unsigned 8-bit
- U s Signed 16-bit (A1)
+ U s Signed 16-bit (A1)
u Unsigned 16-bit
- I I I Signed 32-bit
- i Unsigned 32-bit
- L l Signed 64-bit (B)
- l Unsigned 64-bit
- f f 32-bit float
- d d 64-bit float
- D D D Decimal
- s Short string (A2)
- S S S Long string
- A Nested Array
- T T T Timestamp (u64)
- F F F Nested Table
- V V V Void
- x Byte array
+ I I I Signed 32-bit
+ i Unsigned 32-bit
+ L l Signed 64-bit (B)
+ l Unsigned 64-bit
+ f f 32-bit float
+ d d 64-bit float
+ D D D Decimal
+ s Short string (A2)
+ S S S Long string
+ A Nested Array
+ T T T Timestamp (u64)
+ F F F Nested Table
+ V V V Void
+ x Byte array
Remarks:
A1, A2: Notice how the types **CONFLICT** here. In Qpid and Rabbit,
's' means a signed 16-bit integer; in 0-9-1, it means a
- short string.
+ short string.
B: Notice how the signednesses **CONFLICT** here. In Qpid and Rabbit,
'l' means a signed 64-bit integer; in 0-9-1, it means an unsigned
@@ -360,9 +360,9 @@ AMQP_CALL amqp_set_sockfd(amqp_connection_state_t state, int sockfd);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_tune_connection(amqp_connection_state_t state,
- int channel_max,
- int frame_max,
- int heartbeat);
+ int channel_max,
+ int frame_max,
+ int heartbeat);
AMQP_PUBLIC_FUNCTION
int
@@ -375,8 +375,8 @@ AMQP_CALL amqp_destroy_connection(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_handle_input(amqp_connection_state_t state,
- amqp_bytes_t received_data,
- amqp_frame_t *decoded_frame);
+ amqp_bytes_t received_data,
+ amqp_frame_t *decoded_frame);
AMQP_PUBLIC_FUNCTION
amqp_boolean_t
@@ -413,37 +413,37 @@ AMQP_CALL amqp_frames_enqueued(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_simple_wait_frame(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame);
+ amqp_frame_t *decoded_frame);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_simple_wait_method(amqp_connection_state_t state,
- amqp_channel_t expected_channel,
- amqp_method_number_t expected_method,
- amqp_method_t *output);
+ amqp_channel_t expected_channel,
+ amqp_method_number_t expected_method,
+ amqp_method_t *output);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_send_method(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t id,
- void *decoded);
+ amqp_channel_t channel,
+ amqp_method_number_t id,
+ void *decoded);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_simple_rpc(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t *expected_reply_ids,
- void *decoded_request_method);
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t *expected_reply_ids,
+ void *decoded_request_method);
AMQP_PUBLIC_FUNCTION
void *
AMQP_CALL amqp_simple_rpc_decoded(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method);
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method);
/*
* The API methods corresponding to most synchronous AMQP methods
@@ -465,23 +465,23 @@ AMQP_CALL amqp_get_rpc_reply(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_login(amqp_connection_state_t state, char const *vhost,
- int channel_max, int frame_max, int heartbeat,
- amqp_sasl_method_enum sasl_method, ...);
+ int channel_max, int frame_max, int heartbeat,
+ amqp_sasl_method_enum sasl_method, ...);
struct amqp_basic_properties_t_;
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
- amqp_bytes_t exchange, amqp_bytes_t routing_key,
- amqp_boolean_t mandatory, amqp_boolean_t immediate,
- struct amqp_basic_properties_t_ const *properties,
- amqp_bytes_t body);
+ amqp_bytes_t exchange, amqp_bytes_t routing_key,
+ amqp_boolean_t mandatory, amqp_boolean_t immediate,
+ struct amqp_basic_properties_t_ const *properties,
+ amqp_bytes_t body);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_channel_close(amqp_connection_state_t state, amqp_channel_t channel,
- int code);
+ int code);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
@@ -490,17 +490,17 @@ AMQP_CALL amqp_connection_close(amqp_connection_state_t state, int code);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,
- uint64_t delivery_tag, amqp_boolean_t multiple);
+ uint64_t delivery_tag, amqp_boolean_t multiple);
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t
AMQP_CALL amqp_basic_get(amqp_connection_state_t state, amqp_channel_t channel,
- amqp_bytes_t queue, amqp_boolean_t no_ack);
+ amqp_bytes_t queue, amqp_boolean_t no_ack);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel,
- uint64_t delivery_tag, amqp_boolean_t requeue);
+ uint64_t delivery_tag, amqp_boolean_t requeue);
/*
* Can be used to see if there is data still in the buffer, if so
@@ -526,7 +526,7 @@ AMQP_CALL amqp_error_string(int err);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_decode_table(amqp_bytes_t encoded, amqp_pool_t *pool,
- amqp_table_t *output, size_t *offset);
+ amqp_table_t *output, size_t *offset);
AMQP_PUBLIC_FUNCTION
int
@@ -541,11 +541,11 @@ struct amqp_connection_info {
};
AMQP_PUBLIC_FUNCTION
-void
+void
AMQP_CALL amqp_default_connection_info(struct amqp_connection_info *parsed);
AMQP_PUBLIC_FUNCTION
-int
+int
AMQP_CALL amqp_parse_url(char *url, struct amqp_connection_info *parsed);
AMQP_END_DECLS
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index 6faabbb..aacaf91 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -62,10 +62,11 @@ char *amqp_error_string(int err)
switch (category) {
case ERROR_CATEGORY_CLIENT:
- if (err < 1 || err > ERROR_MAX)
+ if (err < 1 || err > ERROR_MAX) {
str = "(undefined librabbitmq error)";
- else
+ } else {
str = client_error_strings[err - 1];
+ }
break;
case ERROR_CATEGORY_OS:
@@ -80,31 +81,31 @@ char *amqp_error_string(int err)
void amqp_abort(const char *fmt, ...)
{
- va_list ap;
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fputc('\n', stderr);
- abort();
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fputc('\n', stderr);
+ abort();
}
const amqp_bytes_t amqp_empty_bytes = { 0, NULL };
const amqp_table_t amqp_empty_table = { 0, NULL };
const amqp_array_t amqp_empty_array = { 0, NULL };
-#define RPC_REPLY(replytype) \
- (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL \
- ? (replytype *) state->most_recent_api_result.reply.decoded \
+#define RPC_REPLY(replytype)\
+ (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL\
+ ? (replytype *) state->most_recent_api_result.reply.decoded\
: NULL)
int amqp_basic_publish(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t exchange,
- amqp_bytes_t routing_key,
- amqp_boolean_t mandatory,
- amqp_boolean_t immediate,
- amqp_basic_properties_t const *properties,
- amqp_bytes_t body)
+ amqp_channel_t channel,
+ amqp_bytes_t exchange,
+ amqp_bytes_t routing_key,
+ amqp_boolean_t mandatory,
+ amqp_boolean_t immediate,
+ amqp_basic_properties_t const *properties,
+ amqp_bytes_t body)
{
amqp_frame_t f;
size_t body_offset;
@@ -121,8 +122,9 @@ int amqp_basic_publish(amqp_connection_state_t state,
m.ticket = 0;
res = amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
if (properties == NULL) {
memset(&default_properties, 0, sizeof(default_properties));
@@ -136,15 +138,17 @@ int amqp_basic_publish(amqp_connection_state_t state,
f.payload.properties.decoded = (void *) properties;
res = amqp_send_frame(state, &f);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
body_offset = 0;
while (body_offset < body.len) {
size_t remaining = body.len - body_offset;
- if (remaining == 0)
+ if (remaining == 0) {
break;
+ }
f.frame_type = AMQP_FRAME_BODY;
f.channel = channel;
@@ -157,16 +161,17 @@ int amqp_basic_publish(amqp_connection_state_t state,
body_offset += f.payload.body_fragment.len;
res = amqp_send_frame(state, &f);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
}
return 0;
}
amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
- amqp_channel_t channel,
- int code)
+ amqp_channel_t channel,
+ int code)
{
char codestr[13];
amqp_method_number_t replies[2] = { AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
@@ -179,11 +184,11 @@ amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
req.method_id = 0;
return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD,
- replies, &req);
+ replies, &req);
}
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
- int code)
+ int code)
{
char codestr[13];
amqp_method_number_t replies[2] = { AMQP_CONNECTION_CLOSE_OK_METHOD, 0};
@@ -196,13 +201,13 @@ amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
req.method_id = 0;
return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD,
- replies, &req);
+ replies, &req);
}
int amqp_basic_ack(amqp_connection_state_t state,
- amqp_channel_t channel,
- uint64_t delivery_tag,
- amqp_boolean_t multiple)
+ amqp_channel_t channel,
+ uint64_t delivery_tag,
+ amqp_boolean_t multiple)
{
amqp_basic_ack_t m;
m.delivery_tag = delivery_tag;
@@ -211,28 +216,29 @@ int amqp_basic_ack(amqp_connection_state_t state,
}
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_bytes_t queue,
- amqp_boolean_t no_ack)
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t no_ack)
{
amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD,
- AMQP_BASIC_GET_EMPTY_METHOD,
- 0 };
+ AMQP_BASIC_GET_EMPTY_METHOD,
+ 0
+ };
amqp_basic_get_t req;
req.ticket = 0;
req.queue = queue;
req.no_ack = no_ack;
state->most_recent_api_result = amqp_simple_rpc(state, channel,
- AMQP_BASIC_GET_METHOD,
- replies, &req);
+ AMQP_BASIC_GET_METHOD,
+ replies, &req);
return state->most_recent_api_result;
}
int amqp_basic_reject(amqp_connection_state_t state,
- amqp_channel_t channel,
- uint64_t delivery_tag,
- amqp_boolean_t requeue)
+ amqp_channel_t channel,
+ uint64_t delivery_tag,
+ amqp_boolean_t requeue)
{
amqp_basic_reject_t req;
req.delivery_tag = delivery_tag;
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 421c745..be7b1a8 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -46,36 +46,40 @@
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
-#define ENFORCE_STATE(statevec, statenum) \
- { \
- amqp_connection_state_t _check_state = (statevec); \
- size_t _wanted_state = (statenum); \
- if (_check_state->state != _wanted_state) \
+#define ENFORCE_STATE(statevec, statenum) \
+ { \
+ amqp_connection_state_t _check_state = (statevec); \
+ size_t _wanted_state = (statenum); \
+ if (_check_state->state != _wanted_state) \
amqp_abort("Programming error: invalid AMQP connection state: expected %d, got %d", \
- _wanted_state, \
- _check_state->state); \
+ _wanted_state, \
+ _check_state->state); \
}
-amqp_connection_state_t amqp_new_connection(void) {
+amqp_connection_state_t amqp_new_connection(void)
+{
int res;
amqp_connection_state_t state =
(amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
- if (state == NULL)
+ if (state == NULL) {
return NULL;
+ }
init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
res = amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0);
- if (-ERROR_NO_MEMORY == res)
+ if (-ERROR_NO_MEMORY == res) {
return NULL;
- else if (0 != res)
+ } else if (0 != res) {
goto out_nomem;
+ }
state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
- if (state->inbound_buffer.bytes == NULL)
+ if (state->inbound_buffer.bytes == NULL) {
goto out_nomem;
+ }
state->state = CONNECTION_STATE_INITIAL;
/* the server protocol version response is 8 bytes, which conveniently
@@ -85,12 +89,13 @@ amqp_connection_state_t amqp_new_connection(void) {
state->sockfd = -1;
state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
- if (state->sock_inbound_buffer.bytes == NULL)
+ if (state->sock_inbound_buffer.bytes == NULL) {
goto out_nomem;
+ }
return state;
- out_nomem:
+out_nomem:
free(state->sock_inbound_buffer.bytes);
empty_amqp_pool(&state->frame_pool);
empty_amqp_pool(&state->decoding_pool);
@@ -98,20 +103,21 @@ amqp_connection_state_t amqp_new_connection(void) {
return NULL;
}
-int amqp_get_sockfd(amqp_connection_state_t state) {
+int amqp_get_sockfd(amqp_connection_state_t state)
+{
return state->sockfd;
}
void amqp_set_sockfd(amqp_connection_state_t state,
- int sockfd)
+ int sockfd)
{
state->sockfd = sockfd;
}
int amqp_tune_connection(amqp_connection_state_t state,
- int channel_max,
- int frame_max,
- int heartbeat)
+ int channel_max,
+ int frame_max,
+ int heartbeat)
{
void *newbuf;
@@ -136,11 +142,13 @@ int amqp_tune_connection(amqp_connection_state_t state,
return 0;
}
-int amqp_get_channel_max(amqp_connection_state_t state) {
+int amqp_get_channel_max(amqp_connection_state_t state)
+{
return state->channel_max;
}
-int amqp_destroy_connection(amqp_connection_state_t state) {
+int amqp_destroy_connection(amqp_connection_state_t state)
+{
int s = state->sockfd;
empty_amqp_pool(&state->frame_pool);
@@ -149,13 +157,15 @@ int amqp_destroy_connection(amqp_connection_state_t state) {
free(state->sock_inbound_buffer.bytes);
free(state);
- if (s >= 0 && amqp_socket_close(s) < 0)
+ if (s >= 0 && amqp_socket_close(s) < 0) {
return -amqp_socket_error();
- else
+ } else {
return 0;
+ }
}
-static void return_to_idle(amqp_connection_state_t state) {
+static void return_to_idle(amqp_connection_state_t state)
+{
state->inbound_buffer.bytes = NULL;
state->inbound_offset = 0;
state->target_size = HEADER_SIZE;
@@ -163,15 +173,16 @@ static void return_to_idle(amqp_connection_state_t state) {
}
static size_t consume_data(amqp_connection_state_t state,
- amqp_bytes_t *received_data)
+ amqp_bytes_t *received_data)
{
/* how much data is available and will fit? */
size_t bytes_consumed = state->target_size - state->inbound_offset;
- if (received_data->len < bytes_consumed)
+ if (received_data->len < bytes_consumed) {
bytes_consumed = received_data->len;
+ }
memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset),
- received_data->bytes, bytes_consumed);
+ received_data->bytes, bytes_consumed);
state->inbound_offset += bytes_consumed;
received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed);
received_data->len -= bytes_consumed;
@@ -180,8 +191,8 @@ static size_t consume_data(amqp_connection_state_t state,
}
int amqp_handle_input(amqp_connection_state_t state,
- amqp_bytes_t received_data,
- amqp_frame_t *decoded_frame)
+ amqp_bytes_t received_data,
+ amqp_frame_t *decoded_frame)
{
size_t bytes_consumed;
void *raw_frame;
@@ -190,17 +201,20 @@ int amqp_handle_input(amqp_connection_state_t state,
or a complete, ignored frame was read. */
decoded_frame->frame_type = 0;
- if (received_data.len == 0)
+ if (received_data.len == 0) {
return 0;
+ }
if (state->state == CONNECTION_STATE_IDLE) {
state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool,
- state->inbound_buffer.len);
+ state->inbound_buffer.len);
if (state->inbound_buffer.bytes == NULL)
/* state->inbound_buffer.len is always nonzero, because it
- corresponds to frame_max, which is not permitted to be less
- than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
+ corresponds to frame_max, which is not permitted to be less
+ than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
+ {
return -ERROR_NO_MEMORY;
+ }
state->state = CONNECTION_STATE_HEADER;
}
@@ -209,8 +223,9 @@ int amqp_handle_input(amqp_connection_state_t state,
/* do we have target_size data yet? if not, return with the
expectation that more will arrive */
- if (state->inbound_offset < state->target_size)
+ if (state->inbound_offset < state->target_size) {
return bytes_consumed;
+ }
raw_frame = state->inbound_buffer.bytes;
@@ -222,13 +237,13 @@ int amqp_handle_input(amqp_connection_state_t state,
decoded_frame->channel = 0;
decoded_frame->payload.protocol_header.transport_high
- = amqp_d8(raw_frame, 4);
+ = amqp_d8(raw_frame, 4);
decoded_frame->payload.protocol_header.transport_low
- = amqp_d8(raw_frame, 5);
+ = amqp_d8(raw_frame, 5);
decoded_frame->payload.protocol_header.protocol_version_major
- = amqp_d8(raw_frame, 6);
+ = amqp_d8(raw_frame, 6);
decoded_frame->payload.protocol_header.protocol_version_minor
- = amqp_d8(raw_frame, 7);
+ = amqp_d8(raw_frame, 7);
return_to_idle(state);
return bytes_consumed;
@@ -240,15 +255,16 @@ int amqp_handle_input(amqp_connection_state_t state,
case CONNECTION_STATE_HEADER:
/* frame length is 3 bytes in */
state->target_size
- = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE;
+ = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE;
state->state = CONNECTION_STATE_BODY;
bytes_consumed += consume_data(state, &received_data);
/* do we have target_size data yet? if not, return with the
expectation that more will arrive */
- if (state->inbound_offset < state->target_size)
+ if (state->inbound_offset < state->target_size) {
return bytes_consumed;
+ }
/* fall through to process body */
@@ -257,8 +273,9 @@ int amqp_handle_input(amqp_connection_state_t state,
int res;
/* Check frame end marker (footer) */
- if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END)
+ if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) {
return -ERROR_BAD_AMQP_DATA;
+ }
decoded_frame->frame_type = amqp_d8(raw_frame, 0);
decoded_frame->channel = amqp_d16(raw_frame, 1);
@@ -270,19 +287,20 @@ int amqp_handle_input(amqp_connection_state_t state,
encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE;
res = amqp_decode_method(decoded_frame->payload.method.id,
- &state->decoding_pool, encoded,
- &decoded_frame->payload.method.decoded);
- if (res < 0)
- return res;
+ &state->decoding_pool, encoded,
+ &decoded_frame->payload.method.decoded);
+ if (res < 0) {
+ return res;
+ }
break;
case AMQP_FRAME_HEADER:
decoded_frame->payload.properties.class_id
- = amqp_d16(raw_frame, HEADER_SIZE);
+ = amqp_d16(raw_frame, HEADER_SIZE);
/* unused 2-byte weight field goes here */
decoded_frame->payload.properties.body_size
- = amqp_d64(raw_frame, HEADER_SIZE + 4);
+ = amqp_d64(raw_frame, HEADER_SIZE + 4);
encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12);
encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE;
decoded_frame->payload.properties.raw = encoded;
@@ -290,16 +308,17 @@ int amqp_handle_input(amqp_connection_state_t state,
res = amqp_decode_properties(decoded_frame->payload.properties.class_id,
&state->decoding_pool, encoded,
&decoded_frame->payload.properties.decoded);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
break;
case AMQP_FRAME_BODY:
decoded_frame->payload.body_fragment.len
- = state->target_size - HEADER_SIZE - FOOTER_SIZE;
+ = state->target_size - HEADER_SIZE - FOOTER_SIZE;
decoded_frame->payload.body_fragment.bytes
- = amqp_offset(raw_frame, HEADER_SIZE);
+ = amqp_offset(raw_frame, HEADER_SIZE);
break;
case AMQP_FRAME_HEARTBEAT:
@@ -321,28 +340,32 @@ int amqp_handle_input(amqp_connection_state_t state,
}
}
-amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
+amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state)
+{
return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
}
-void amqp_release_buffers(amqp_connection_state_t state) {
+void amqp_release_buffers(amqp_connection_state_t state)
+{
ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
- if (state->first_queued_frame)
+ if (state->first_queued_frame) {
amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued");
+ }
recycle_amqp_pool(&state->frame_pool);
recycle_amqp_pool(&state->decoding_pool);
}
-void amqp_maybe_release_buffers(amqp_connection_state_t state) {
+void amqp_maybe_release_buffers(amqp_connection_state_t state)
+{
if (amqp_release_buffers_ok(state)) {
amqp_release_buffers(state);
}
}
int amqp_send_frame(amqp_connection_state_t state,
- const amqp_frame_t *frame)
+ const amqp_frame_t *frame)
{
void *out_frame = state->outbound_buffer.bytes;
int res;
@@ -367,8 +390,7 @@ int amqp_send_frame(amqp_connection_state_t state,
iov[2].iov_len = FOOTER_SIZE;
res = amqp_socket_writev(state->sockfd, iov, 3);
- }
- else {
+ } else {
size_t out_frame_len;
amqp_bytes_t encoded;
@@ -381,8 +403,9 @@ int amqp_send_frame(amqp_connection_state_t state,
res = amqp_encode_method(frame->payload.method.id,
frame->payload.method.decoded, encoded);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
out_frame_len = res + 4;
break;
@@ -397,8 +420,9 @@ int amqp_send_frame(amqp_connection_state_t state,
res = amqp_encode_properties(frame->payload.properties.class_id,
frame->payload.properties.decoded, encoded);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
out_frame_len = res + 12;
break;
@@ -417,8 +441,9 @@ int amqp_send_frame(amqp_connection_state_t state,
out_frame_len + HEADER_SIZE + FOOTER_SIZE, MSG_NOSIGNAL);
}
- if (res < 0)
+ if (res < 0) {
return -amqp_socket_error();
- else
+ } else {
return 0;
+ }
}
diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c
index 6915763..0f49df2 100644
--- a/librabbitmq/amqp_mem.c
+++ b/librabbitmq/amqp_mem.c
@@ -43,11 +43,13 @@
#include <string.h>
#include <sys/types.h>
-char const *amqp_version(void) {
+char const *amqp_version(void)
+{
return VERSION; /* defined in config.h */
}
-void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) {
+void init_amqp_pool(amqp_pool_t *pool, size_t pagesize)
+{
pool->pagesize = pagesize ? pagesize : 4096;
pool->pages.num_blocks = 0;
@@ -61,7 +63,8 @@ void init_amqp_pool(amqp_pool_t *pool, size_t pagesize) {
pool->alloc_used = 0;
}
-static void empty_blocklist(amqp_pool_blocklist_t *x) {
+static void empty_blocklist(amqp_pool_blocklist_t *x)
+{
int i;
for (i = 0; i < x->num_blocks; i++) {
@@ -74,30 +77,35 @@ static void empty_blocklist(amqp_pool_blocklist_t *x) {
x->blocklist = NULL;
}
-void recycle_amqp_pool(amqp_pool_t *pool) {
+void recycle_amqp_pool(amqp_pool_t *pool)
+{
empty_blocklist(&pool->large_blocks);
pool->next_page = 0;
pool->alloc_block = NULL;
pool->alloc_used = 0;
}
-void empty_amqp_pool(amqp_pool_t *pool) {
+void empty_amqp_pool(amqp_pool_t *pool)
+{
recycle_amqp_pool(pool);
empty_blocklist(&pool->pages);
}
/* Returns 1 on success, 0 on failure */
-static int record_pool_block(amqp_pool_blocklist_t *x, void *block) {
+static int record_pool_block(amqp_pool_blocklist_t *x, void *block)
+{
size_t blocklistlength = sizeof(void *) * (x->num_blocks + 1);
if (x->blocklist == NULL) {
x->blocklist = malloc(blocklistlength);
- if (x->blocklist == NULL)
+ if (x->blocklist == NULL) {
return 0;
+ }
} else {
void *newbl = realloc(x->blocklist, blocklistlength);
- if (newbl == NULL)
+ if (newbl == NULL) {
return 0;
+ }
x->blocklist = newbl;
}
@@ -106,7 +114,8 @@ static int record_pool_block(amqp_pool_blocklist_t *x, void *block) {
return 1;
}
-void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
+void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount)
+{
if (amount == 0) {
return NULL;
}
@@ -118,8 +127,9 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
if (result == NULL) {
return NULL;
}
- if (!record_pool_block(&pool->large_blocks, result))
+ if (!record_pool_block(&pool->large_blocks, result)) {
return NULL;
+ }
return result;
}
@@ -138,8 +148,9 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
if (pool->alloc_block == NULL) {
return NULL;
}
- if (!record_pool_block(&pool->pages, pool->alloc_block))
+ if (!record_pool_block(&pool->pages, pool->alloc_block)) {
return NULL;
+ }
pool->next_page = pool->pages.num_blocks;
} else {
pool->alloc_block = pool->pages.blocklist[pool->next_page];
@@ -151,19 +162,22 @@ void *amqp_pool_alloc(amqp_pool_t *pool, size_t amount) {
return pool->alloc_block;
}
-void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output) {
+void amqp_pool_alloc_bytes(amqp_pool_t *pool, size_t amount, amqp_bytes_t *output)
+{
output->len = amount;
output->bytes = amqp_pool_alloc(pool, amount);
}
-amqp_bytes_t amqp_cstring_bytes(char const *cstr) {
+amqp_bytes_t amqp_cstring_bytes(char const *cstr)
+{
amqp_bytes_t result;
result.len = strlen(cstr);
result.bytes = (void *) cstr;
return result;
}
-amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) {
+amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src)
+{
amqp_bytes_t result;
result.len = src.len;
result.bytes = malloc(src.len);
@@ -173,13 +187,15 @@ amqp_bytes_t amqp_bytes_malloc_dup(amqp_bytes_t src) {
return result;
}
-amqp_bytes_t amqp_bytes_malloc(size_t amount) {
+amqp_bytes_t amqp_bytes_malloc(size_t amount)
+{
amqp_bytes_t result;
result.len = amount;
result.bytes = malloc(amount); /* will return NULL if it fails */
return result;
}
-void amqp_bytes_free(amqp_bytes_t bytes) {
+void amqp_bytes_free(amqp_bytes_t bytes)
+{
free(bytes.bytes);
}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index 725e3c5..191155e 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -159,53 +159,53 @@ static inline void *amqp_offset(void *data, size_t offset)
/* This macro defines the encoding and decoding functions associated with a
simple type. */
-#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \
- \
-static inline void amqp_e##bits(void *data, size_t offset, \
- uint##bits##_t val) \
-{ \
- /* The AMQP data might be unaligned. So we encode and then copy the \
- result into place. */ \
- uint##bits##_t res = htonx(val); \
- memcpy(amqp_offset(data, offset), &res, bits/8); \
-} \
- \
-static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \
-{ \
- /* The AMQP data might be unaligned. So we copy the source value \
- into a variable and then decode it. */ \
- uint##bits##_t val; \
- memcpy(&val, amqp_offset(data, offset), bits/8); \
- return ntohx(val); \
-} \
- \
-static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \
- uint##bits##_t input) \
- \
-{ \
- size_t o = *offset; \
- if ((*offset = o + bits / 8) <= encoded.len) { \
- amqp_e##bits(encoded.bytes, o, input); \
- return 1; \
- } \
- else { \
- return 0; \
- } \
-} \
- \
-static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
- uint##bits##_t *output) \
- \
-{ \
- size_t o = *offset; \
- if ((*offset = o + bits / 8) <= encoded.len) { \
- *output = amqp_d##bits(encoded.bytes, o); \
- return 1; \
- } \
- else { \
- return 0; \
- } \
-}
+#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \
+ \
+ static inline void amqp_e##bits(void *data, size_t offset, \
+ uint##bits##_t val) \
+ { \
+ /* The AMQP data might be unaligned. So we encode and then copy the \
+ result into place. */ \
+ uint##bits##_t res = htonx(val); \
+ memcpy(amqp_offset(data, offset), &res, bits/8); \
+ } \
+ \
+ static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \
+ { \
+ /* The AMQP data might be unaligned. So we copy the source value \
+ into a variable and then decode it. */ \
+ uint##bits##_t val; \
+ memcpy(&val, amqp_offset(data, offset), bits/8); \
+ return ntohx(val); \
+ } \
+ \
+ static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \
+ uint##bits##_t input) \
+ \
+ { \
+ size_t o = *offset; \
+ if ((*offset = o + bits / 8) <= encoded.len) { \
+ amqp_e##bits(encoded.bytes, o, input); \
+ return 1; \
+ } \
+ else { \
+ return 0; \
+ } \
+ } \
+ \
+ static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
+ uint##bits##_t *output) \
+ \
+ { \
+ size_t o = *offset; \
+ if ((*offset = o + bits / 8) <= encoded.len) { \
+ *output = amqp_d##bits(encoded.bytes, o); \
+ return 1; \
+ } \
+ else { \
+ return 0; \
+ } \
+ }
/* Determine byte order */
#if defined(__GLIBC__)
@@ -215,7 +215,7 @@ static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
# elif (__BYTE_ORDER == __BIG_ENDIAN)
# define AMQP_BIG_ENDIAN
# else
- /* Don't define anything */
+/* Don't define anything */
# endif
#elif defined(_BIG_ENDIAN) && !defined(_LITTLE_ENDIAN) || \
defined(__BIG_ENDIAN__) && !defined(__LITTLE_ENDIAN__)
@@ -235,40 +235,40 @@ static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
defined(__i386__) || defined(_M_IX86)
# define AMQP_LITTLE_ENDIAN
#else
- /* Don't define anything */
+/* Don't define anything */
#endif
#if defined(AMQP_LITTLE_ENDIAN)
-#define DECLARE_XTOXLL(func) \
-static inline uint64_t func##ll(uint64_t val) \
-{ \
- union { \
- uint64_t whole; \
- uint32_t halves[2]; \
- } u; \
- uint32_t t; \
- u.whole = val; \
- t = u.halves[0]; \
- u.halves[0] = func##l(u.halves[1]); \
- u.halves[1] = func##l(t); \
- return u.whole; \
-}
+#define DECLARE_XTOXLL(func) \
+ static inline uint64_t func##ll(uint64_t val) \
+ { \
+ union { \
+ uint64_t whole; \
+ uint32_t halves[2]; \
+ } u; \
+ uint32_t t; \
+ u.whole = val; \
+ t = u.halves[0]; \
+ u.halves[0] = func##l(u.halves[1]); \
+ u.halves[1] = func##l(t); \
+ return u.whole; \
+ }
#elif defined(AMQP_BIG_ENDIAN)
-#define DECLARE_XTOXLL(func) \
-static inline uint64_t func##ll(uint64_t val) \
-{ \
- union { \
- uint64_t whole; \
- uint32_t halves[2]; \
- } u; \
- u.whole = val; \
- u.halves[0] = func##l(u.halves[0]); \
- u.halves[1] = func##l(u.halves[1]); \
- return u.whole; \
-}
+#define DECLARE_XTOXLL(func) \
+ static inline uint64_t func##ll(uint64_t val) \
+ { \
+ union { \
+ uint64_t whole; \
+ uint32_t halves[2]; \
+ } u; \
+ u.whole = val; \
+ u.halves[0] = func##l(u.halves[0]); \
+ u.halves[1] = func##l(u.halves[1]); \
+ return u.whole; \
+ }
#else
# error Endianness not known
@@ -285,28 +285,26 @@ DECLARE_CODEC_BASE_TYPE(32, htonl, ntohl)
DECLARE_CODEC_BASE_TYPE(64, htonll, ntohll)
static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset,
- amqp_bytes_t input)
+ amqp_bytes_t input)
{
size_t o = *offset;
if ((*offset = o + input.len) <= encoded.len) {
memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len);
return 1;
- }
- else {
+ } else {
return 0;
}
}
static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset,
- amqp_bytes_t *output, size_t len)
+ amqp_bytes_t *output, size_t len)
{
size_t o = *offset;
if ((*offset = o + len) <= encoded.len) {
output->bytes = amqp_offset(encoded.bytes, o);
output->len = len;
return 1;
- }
- else {
+ } else {
return 0;
}
}
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index c323792..8454d45 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -44,7 +44,7 @@
#include <assert.h>
int amqp_open_socket(char const *hostname,
- int portnumber)
+ int portnumber)
{
struct addrinfo hint;
struct addrinfo *address_list;
@@ -54,8 +54,9 @@ int amqp_open_socket(char const *hostname,
int last_error = 0;
int one = 1; /* for setsockopt */
- if (0 != (last_error = amqp_socket_init()))
+ if (0 != (last_error = amqp_socket_init())) {
return last_error;
+ }
memset(&hint, 0, sizeof(hint));
hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */
@@ -66,63 +67,58 @@ int amqp_open_socket(char const *hostname,
last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list);
- if (last_error != 0)
- {
+ if (last_error != 0) {
return -ERROR_GETHOSTBYNAME_FAILED;
}
- for (addr = address_list; addr; addr = addr->ai_next)
- {
+ for (addr = address_list; addr; addr = addr->ai_next) {
/*
This cast is to squash warnings on Win64, see:
http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64
*/
sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
- if (-1 == sockfd)
- {
+ if (-1 == sockfd) {
last_error = -amqp_socket_error();
continue;
}
#ifdef DISABLE_SIGPIPE_WITH_SETSOCKOPT
- if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)))
- {
+ if (0 != amqp_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
last_error = -amqp_socket_error();
amqp_socket_close(sockfd);
continue;
}
#endif /* DISABLE_SIGPIPE_WITH_SETSOCKOPT */
if (0 != amqp_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))
- || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen))
- {
+ || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
last_error = -amqp_socket_error();
amqp_socket_close(sockfd);
continue;
- }
- else
- {
+ } else {
last_error = 0;
break;
}
}
freeaddrinfo(address_list);
- if (last_error != 0)
- {
+ if (last_error != 0) {
return last_error;
}
return sockfd;
}
-int amqp_send_header(amqp_connection_state_t state) {
+int amqp_send_header(amqp_connection_state_t state)
+{
static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0,
- AMQP_PROTOCOL_VERSION_MAJOR,
- AMQP_PROTOCOL_VERSION_MINOR,
- AMQP_PROTOCOL_VERSION_REVISION };
+ AMQP_PROTOCOL_VERSION_MAJOR,
+ AMQP_PROTOCOL_VERSION_MINOR,
+ AMQP_PROTOCOL_VERSION_REVISION
+ };
return send(state->sockfd, (void *)header, 8, MSG_NOSIGNAL);
}
-static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
+static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method)
+{
amqp_bytes_t res;
switch (method) {
@@ -139,40 +135,43 @@ static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
}
static amqp_bytes_t sasl_response(amqp_pool_t *pool,
- amqp_sasl_method_enum method,
- va_list args)
+ amqp_sasl_method_enum method,
+ va_list args)
{
amqp_bytes_t response;
switch (method) {
- case AMQP_SASL_METHOD_PLAIN: {
- char *username = va_arg(args, char *);
- size_t username_len = strlen(username);
- char *password = va_arg(args, char *);
- size_t password_len = strlen(password);
- char *response_buf;
-
- amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response);
- if (response.bytes == NULL)
- /* We never request a zero-length block, because of the +2
- above, so a NULL here really is ENOMEM. */
- return response;
-
- response_buf = response.bytes;
- response_buf[0] = 0;
- memcpy(response_buf + 1, username, username_len);
- response_buf[username_len + 1] = 0;
- memcpy(response_buf + username_len + 2, password, password_len);
- break;
+ case AMQP_SASL_METHOD_PLAIN: {
+ char *username = va_arg(args, char *);
+ size_t username_len = strlen(username);
+ char *password = va_arg(args, char *);
+ size_t password_len = strlen(password);
+ char *response_buf;
+
+ amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response);
+ if (response.bytes == NULL)
+ /* We never request a zero-length block, because of the +2
+ above, so a NULL here really is ENOMEM. */
+ {
+ return response;
}
- default:
- amqp_abort("Invalid SASL method: %d", (int) method);
+
+ response_buf = response.bytes;
+ response_buf[0] = 0;
+ memcpy(response_buf + 1, username, username_len);
+ response_buf[username_len + 1] = 0;
+ memcpy(response_buf + username_len + 2, password, password_len);
+ break;
+ }
+ default:
+ amqp_abort("Invalid SASL method: %d", (int) method);
}
return response;
}
-amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
+amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state)
+{
return (state->first_queued_frame != NULL);
}
@@ -180,12 +179,13 @@ amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
* Check to see if we have data in our buffer. If this returns 1, we
* will avoid an immediate blocking read in amqp_simple_wait_frame.
*/
-amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) {
+amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state)
+{
return (state->sock_inbound_offset < state->sock_inbound_limit);
}
static int wait_frame_inner(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame)
+ amqp_frame_t *decoded_frame)
{
while (1) {
int res;
@@ -196,26 +196,29 @@ static int wait_frame_inner(amqp_connection_state_t state,
buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
res = amqp_handle_input(state, buffer, decoded_frame);
- if (res < 0)
- return res;
+ if (res < 0) {
+ return res;
+ }
state->sock_inbound_offset += res;
- if (decoded_frame->frame_type != 0)
- /* Complete frame was read. Return it. */
- return 0;
+ if (decoded_frame->frame_type != 0) {
+ /* Complete frame was read. Return it. */
+ return 0;
+ }
/* Incomplete or ignored frame. Keep processing input. */
assert(res != 0);
}
res = recv(state->sockfd, state->sock_inbound_buffer.bytes,
- state->sock_inbound_buffer.len, 0);
+ state->sock_inbound_buffer.len, 0);
if (res <= 0) {
- if (res == 0)
- return -ERROR_CONNECTION_CLOSED;
- else
- return -amqp_socket_error();
+ if (res == 0) {
+ return -ERROR_CONNECTION_CLOSED;
+ } else {
+ return -amqp_socket_error();
+ }
}
state->sock_inbound_limit = res;
@@ -224,7 +227,7 @@ static int wait_frame_inner(amqp_connection_state_t state,
}
int amqp_simple_wait_frame(amqp_connection_state_t state,
- amqp_frame_t *decoded_frame)
+ amqp_frame_t *decoded_frame)
{
if (state->first_queued_frame != NULL) {
amqp_frame_t *f = (amqp_frame_t *) state->first_queued_frame->data;
@@ -240,38 +243,42 @@ int amqp_simple_wait_frame(amqp_connection_state_t state,
}
int amqp_simple_wait_method(amqp_connection_state_t state,
- amqp_channel_t expected_channel,
- amqp_method_number_t expected_method,
- amqp_method_t *output)
+ amqp_channel_t expected_channel,
+ amqp_method_number_t expected_method,
+ amqp_method_t *output)
{
amqp_frame_t frame;
int res = amqp_simple_wait_frame(state, &frame);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
- if (frame.channel != expected_channel)
+ if (frame.channel != expected_channel) {
amqp_abort("Expected 0x%08X method frame on channel %d, got frame on channel %d",
- expected_method,
- expected_channel,
- frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD)
+ expected_method,
+ expected_channel,
+ frame.channel);
+ }
+ if (frame.frame_type != AMQP_FRAME_METHOD) {
amqp_abort("Expected 0x%08X method frame on channel %d, got frame type %d",
- expected_method,
- expected_channel,
- frame.frame_type);
- if (frame.payload.method.id != expected_method)
+ expected_method,
+ expected_channel,
+ frame.frame_type);
+ }
+ if (frame.payload.method.id != expected_method) {
amqp_abort("Expected method ID 0x%08X on channel %d, got ID 0x%08X",
- expected_method,
- expected_channel,
- frame.payload.method.id);
+ expected_method,
+ expected_channel,
+ frame.payload.method.id);
+ }
*output = frame.payload.method;
return 0;
}
int amqp_send_method(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t id,
- void *decoded)
+ amqp_channel_t channel,
+ amqp_method_number_t id,
+ void *decoded)
{
amqp_frame_t frame;
@@ -285,17 +292,19 @@ int amqp_send_method(amqp_connection_state_t state,
static int amqp_id_in_reply_list( amqp_method_number_t expected, amqp_method_number_t *list )
{
while ( *list != 0 ) {
- if ( *list == expected ) return 1;
+ if ( *list == expected ) {
+ return 1;
+ }
list++;
}
return 0;
}
amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t *expected_reply_ids,
- void *decoded_request_method)
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t *expected_reply_ids,
+ void *decoded_request_method)
{
int status;
amqp_rpc_reply_t result;
@@ -312,7 +321,7 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
{
amqp_frame_t frame;
- retry:
+retry:
status = wait_frame_inner(state, &frame);
if (status < 0) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
@@ -328,21 +337,23 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
* - on the channel we want, and a channel.close frame, or
* - on channel zero, and a connection.close frame.
*/
- if (!( (frame.frame_type == AMQP_FRAME_METHOD) &&
- ( ((frame.channel == channel) &&
- ((amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) ||
- (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
- ||
- ((frame.channel == 0) &&
- (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) ) ))
- {
+ if (!((frame.frame_type == AMQP_FRAME_METHOD)
+ && (
+ ((frame.channel == channel)
+ && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)
+ || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD)))
+ ||
+ ((frame.channel == 0)
+ && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))
+ )
+ )) {
amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t));
amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t));
if (frame_copy == NULL || link == NULL) {
- result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
- result.library_error = ERROR_NO_MEMORY;
- return result;
+ result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ result.library_error = ERROR_NO_MEMORY;
+ return result;
}
*frame_copy = frame;
@@ -351,9 +362,9 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
link->data = frame_copy;
if (state->last_queued_frame == NULL) {
- state->first_queued_frame = link;
+ state->first_queued_frame = link;
} else {
- state->last_queued_frame->next = link;
+ state->last_queued_frame->next = link;
}
state->last_queued_frame = link;
@@ -361,8 +372,8 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
}
result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids))
- ? AMQP_RESPONSE_NORMAL
- : AMQP_RESPONSE_SERVER_EXCEPTION;
+ ? AMQP_RESPONSE_NORMAL
+ : AMQP_RESPONSE_SERVER_EXCEPTION;
result.reply = frame.payload.method;
return result;
@@ -370,10 +381,10 @@ amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
}
void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
- amqp_channel_t channel,
- amqp_method_number_t request_id,
- amqp_method_number_t reply_id,
- void *decoded_request_method)
+ amqp_channel_t channel,
+ amqp_method_number_t request_id,
+ amqp_method_number_t reply_id,
+ void *decoded_request_method)
{
amqp_method_number_t replies[2];
@@ -381,12 +392,13 @@ void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
replies[1] = 0;
state->most_recent_api_result = amqp_simple_rpc(state, channel,
- request_id, replies,
- decoded_request_method);
- if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL)
+ request_id, replies,
+ decoded_request_method);
+ if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) {
return state->most_recent_api_result.reply.decoded;
- else
+ } else {
return NULL;
+ }
}
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
@@ -396,11 +408,11 @@ amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state)
static int amqp_login_inner(amqp_connection_state_t state,
- int channel_max,
- int frame_max,
- int heartbeat,
- amqp_sasl_method_enum sasl_method,
- va_list vl)
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ amqp_sasl_method_enum sasl_method,
+ va_list vl)
{
int res;
amqp_method_t method;
@@ -411,14 +423,15 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_send_header(state);
res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_START_METHOD,
- &method);
- if (res < 0)
+ &method);
+ if (res < 0) {
return res;
+ }
{
amqp_connection_start_t *s = (amqp_connection_start_t *) method.decoded;
if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
- (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
+ (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
return -ERROR_INCOMPATIBLE_AMQP_VERSION;
}
@@ -431,10 +444,11 @@ static int amqp_login_inner(amqp_connection_state_t state,
amqp_table_entry_t properties[2];
amqp_connection_start_ok_t s;
amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
- sasl_method, vl);
+ sasl_method, vl);
- if (response_bytes.bytes == NULL)
+ if (response_bytes.bytes == NULL) {
return -ERROR_NO_MEMORY;
+ }
properties[0].key = amqp_cstring_bytes("product");
properties[0].value.kind = AMQP_FIELD_KIND_UTF8;
@@ -454,16 +468,18 @@ static int amqp_login_inner(amqp_connection_state_t state,
s.locale.len = 5;
res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
}
amqp_release_buffers(state);
res = amqp_simple_wait_method(state, 0, AMQP_CONNECTION_TUNE_METHOD,
- &method);
- if (res < 0)
+ &method);
+ if (res < 0) {
return res;
+ }
{
amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded;
@@ -472,18 +488,22 @@ static int amqp_login_inner(amqp_connection_state_t state,
server_heartbeat = s->heartbeat;
}
- if (server_channel_max != 0 && server_channel_max < channel_max)
+ if (server_channel_max != 0 && server_channel_max < channel_max) {
channel_max = server_channel_max;
+ }
- if (server_frame_max != 0 && server_frame_max < frame_max)
+ if (server_frame_max != 0 && server_frame_max < frame_max) {
frame_max = server_frame_max;
+ }
- if (server_heartbeat != 0 && server_heartbeat < heartbeat)
+ if (server_heartbeat != 0 && server_heartbeat < heartbeat) {
heartbeat = server_heartbeat;
+ }
res = amqp_tune_connection(state, channel_max, frame_max, heartbeat);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
{
amqp_connection_tune_ok_t s;
@@ -492,8 +512,9 @@ static int amqp_login_inner(amqp_connection_state_t state,
s.heartbeat = heartbeat;
res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s);
- if (res < 0)
+ if (res < 0) {
return res;
+ }
}
amqp_release_buffers(state);
@@ -502,12 +523,12 @@ static int amqp_login_inner(amqp_connection_state_t state,
}
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
- char const *vhost,
- int channel_max,
- int frame_max,
- int heartbeat,
- amqp_sasl_method_enum sasl_method,
- ...)
+ char const *vhost,
+ int channel_max,
+ int frame_max,
+ int heartbeat,
+ amqp_sasl_method_enum sasl_method,
+ ...)
{
va_list vl;
amqp_rpc_reply_t result;
@@ -533,12 +554,13 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
s.insist = 1;
result = amqp_simple_rpc(state,
- 0,
- AMQP_CONNECTION_OPEN_METHOD,
- (amqp_method_number_t *) &replies,
- &s);
- if (result.reply_type != AMQP_RESPONSE_NORMAL)
+ 0,
+ AMQP_CONNECTION_OPEN_METHOD,
+ (amqp_method_number_t *) &replies,
+ &s);
+ if (result.reply_type != AMQP_RESPONSE_NORMAL) {
return result;
+ }
}
amqp_maybe_release_buffers(state);
diff --git a/librabbitmq/amqp_table.c b/librabbitmq/amqp_table.c
index d82c946..d9383d6 100644
--- a/librabbitmq/amqp_table.c
+++ b/librabbitmq/amqp_table.c
@@ -46,20 +46,20 @@
#define INITIAL_TABLE_SIZE 16
static int amqp_decode_field_value(amqp_bytes_t encoded,
- amqp_pool_t *pool,
- amqp_field_value_t *entry,
- size_t *offset);
+ amqp_pool_t *pool,
+ amqp_field_value_t *entry,
+ size_t *offset);
static int amqp_encode_field_value(amqp_bytes_t encoded,
- amqp_field_value_t *entry,
- size_t *offset);
+ amqp_field_value_t *entry,
+ size_t *offset);
/*---------------------------------------------------------------------------*/
static int amqp_decode_array(amqp_bytes_t encoded,
- amqp_pool_t *pool,
- amqp_array_t *output,
- size_t *offset)
+ amqp_pool_t *pool,
+ amqp_array_t *output,
+ size_t *offset)
{
uint32_t arraysize;
int num_entries = 0;
@@ -68,12 +68,14 @@ static int amqp_decode_array(amqp_bytes_t encoded,
size_t limit;
int res;
- if (!amqp_decode_32(encoded, offset, &arraysize))
+ if (!amqp_decode_32(encoded, offset, &arraysize)) {
return -ERROR_BAD_AMQP_DATA;
+ }
entries = malloc(allocated_entries * sizeof(amqp_field_value_t));
- if (entries == NULL)
+ if (entries == NULL) {
return -ERROR_NO_MEMORY;
+ }
limit = *offset + arraysize;
while (*offset < limit) {
@@ -82,16 +84,18 @@ static int amqp_decode_array(amqp_bytes_t encoded,
allocated_entries = allocated_entries * 2;
newentries = realloc(entries, allocated_entries * sizeof(amqp_field_value_t));
res = -ERROR_NO_MEMORY;
- if (newentries == NULL)
- goto out;
+ if (newentries == NULL) {
+ goto out;
+ }
entries = newentries;
}
res = amqp_decode_field_value(encoded, pool, &entries[num_entries],
- offset);
- if (res < 0)
+ offset);
+ if (res < 0) {
goto out;
+ }
num_entries++;
}
@@ -100,21 +104,22 @@ static int amqp_decode_array(amqp_bytes_t encoded,
output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_field_value_t));
res = -ERROR_NO_MEMORY;
/* NULL is legitimate if we requested a zero-length block. */
- if (output->entries == NULL && num_entries > 0)
+ if (output->entries == NULL && num_entries > 0) {
goto out;
+ }
memcpy(output->entries, entries, num_entries * sizeof(amqp_field_value_t));
res = 0;
- out:
+out:
free(entries);
return res;
}
int amqp_decode_table(amqp_bytes_t encoded,
- amqp_pool_t *pool,
- amqp_table_t *output,
- size_t *offset)
+ amqp_pool_t *pool,
+ amqp_table_t *output,
+ size_t *offset)
{
uint32_t tablesize;
int num_entries = 0;
@@ -123,40 +128,46 @@ int amqp_decode_table(amqp_bytes_t encoded,
size_t limit;
int res;
- if (!amqp_decode_32(encoded, offset, &tablesize))
+ if (!amqp_decode_32(encoded, offset, &tablesize)) {
return -ERROR_BAD_AMQP_DATA;
+ }
entries = malloc(allocated_entries * sizeof(amqp_table_entry_t));
- if (entries == NULL)
+ if (entries == NULL) {
return -ERROR_NO_MEMORY;
+ }
limit = *offset + tablesize;
while (*offset < limit) {
uint8_t keylen;
res = -ERROR_BAD_AMQP_DATA;
- if (!amqp_decode_8(encoded, offset, &keylen))
+ if (!amqp_decode_8(encoded, offset, &keylen)) {
goto out;
+ }
if (num_entries >= allocated_entries) {
void *newentries;
allocated_entries = allocated_entries * 2;
newentries = realloc(entries, allocated_entries * sizeof(amqp_table_entry_t));
res = -ERROR_NO_MEMORY;
- if (newentries == NULL)
- goto out;
+ if (newentries == NULL) {
+ goto out;
+ }
entries = newentries;
}
res = -ERROR_BAD_AMQP_DATA;
- if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen))
+ if (!amqp_decode_bytes(encoded, offset, &entries[num_entries].key, keylen)) {
goto out;
+ }
res = amqp_decode_field_value(encoded, pool, &entries[num_entries].value,
- offset);
- if (res < 0)
+ offset);
+ if (res < 0) {
goto out;
+ }
num_entries++;
}
@@ -165,26 +176,28 @@ int amqp_decode_table(amqp_bytes_t encoded,
output->entries = amqp_pool_alloc(pool, num_entries * sizeof(amqp_table_entry_t));
res = -ERROR_NO_MEMORY;
/* NULL is legitimate if we requested a zero-length block. */
- if (output->entries == NULL && num_entries > 0)
+ if (output->entries == NULL && num_entries > 0) {
goto out;
+ }
memcpy(output->entries, entries, num_entries * sizeof(amqp_table_entry_t));
res = 0;
- out:
+out:
free(entries);
return res;
}
static int amqp_decode_field_value(amqp_bytes_t encoded,
- amqp_pool_t *pool,
- amqp_field_value_t *entry,
- size_t *offset)
+ amqp_pool_t *pool,
+ amqp_field_value_t *entry,
+ size_t *offset)
{
int res = -ERROR_BAD_AMQP_DATA;
- if (!amqp_decode_8(encoded, offset, &entry->kind))
+ if (!amqp_decode_8(encoded, offset, &entry->kind)) {
goto out;
+ }
#define TRIVIAL_FIELD_DECODER(bits) if (!amqp_decode_##bits(encoded, offset, &entry->value.u##bits)) goto out; break
#define SIMPLE_FIELD_DECODER(bits, dest, how) { uint##bits##_t val; if (!amqp_decode_##bits(encoded, offset, &val)) goto out; entry->value.dest = how; } break
@@ -223,8 +236,9 @@ static int amqp_decode_field_value(amqp_bytes_t encoded,
case AMQP_FIELD_KIND_DECIMAL:
if (!amqp_decode_8(encoded, offset, &entry->value.decimal.decimals)
- || !amqp_decode_32(encoded, offset, &entry->value.decimal.value))
+ || !amqp_decode_32(encoded, offset, &entry->value.decimal.value)) {
goto out;
+ }
break;
case AMQP_FIELD_KIND_UTF8:
@@ -234,8 +248,9 @@ static int amqp_decode_field_value(amqp_bytes_t encoded,
case AMQP_FIELD_KIND_BYTES: {
uint32_t len;
if (!amqp_decode_32(encoded, offset, &len)
- || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len))
+ || !amqp_decode_bytes(encoded, offset, &entry->value.bytes, len)) {
goto out;
+ }
break;
}
@@ -259,15 +274,15 @@ static int amqp_decode_field_value(amqp_bytes_t encoded,
res = 0;
- out:
+out:
return res;
}
/*---------------------------------------------------------------------------*/
static int amqp_encode_array(amqp_bytes_t encoded,
- amqp_array_t *input,
- size_t *offset)
+ amqp_array_t *input,
+ size_t *offset)
{
size_t start = *offset;
int i, res;
@@ -276,22 +291,24 @@ static int amqp_encode_array(amqp_bytes_t encoded,
for (i = 0; i < input->num_entries; i++) {
res = amqp_encode_field_value(encoded, &input->entries[i], offset);
- if (res < 0)
+ if (res < 0) {
goto out;
+ }
}
- if (amqp_encode_32(encoded, &start, *offset - start - 4))
+ if (amqp_encode_32(encoded, &start, *offset - start - 4)) {
res = 0;
- else
+ } else {
res = -ERROR_BAD_AMQP_DATA;
+ }
- out:
+out:
return res;
}
int amqp_encode_table(amqp_bytes_t encoded,
- amqp_table_t *input,
- size_t *offset)
+ amqp_table_t *input,
+ size_t *offset)
{
size_t start = *offset;
int i, res;
@@ -300,35 +317,40 @@ int amqp_encode_table(amqp_bytes_t encoded,
for (i = 0; i < input->num_entries; i++) {
res = amqp_encode_8(encoded, offset, input->entries[i].key.len);
- if (res < 0)
+ if (res < 0) {
goto out;
+ }
res = amqp_encode_bytes(encoded, offset, input->entries[i].key);
- if (res < 0)
+ if (res < 0) {
goto out;
+ }
res = amqp_encode_field_value(encoded, &input->entries[i].value, offset);
- if (res < 0)
+ if (res < 0) {
goto out;
+ }
}
- if (amqp_encode_32(encoded, &start, *offset - start - 4))
+ if (amqp_encode_32(encoded, &start, *offset - start - 4)) {
res = 0;
- else
+ } else {
res = -ERROR_BAD_AMQP_DATA;
+ }
- out:
+out:
return res;
}
static int amqp_encode_field_value(amqp_bytes_t encoded,
- amqp_field_value_t *entry,
- size_t *offset)
+ amqp_field_value_t *entry,
+ size_t *offset)
{
int res = -ERROR_BAD_AMQP_DATA;
- if (!amqp_encode_8(encoded, offset, entry->kind))
+ if (!amqp_encode_8(encoded, offset, entry->kind)) {
goto out;
+ }
#define FIELD_ENCODER(bits, val) if (!amqp_encode_##bits(encoded, offset, val)) goto out; break
@@ -366,8 +388,9 @@ static int amqp_encode_field_value(amqp_bytes_t encoded,
case AMQP_FIELD_KIND_DECIMAL:
if (!amqp_encode_8(encoded, offset, entry->value.decimal.decimals)
- || !amqp_encode_32(encoded, offset, entry->value.decimal.value))
+ || !amqp_encode_32(encoded, offset, entry->value.decimal.value)) {
goto out;
+ }
break;
case AMQP_FIELD_KIND_UTF8:
@@ -376,8 +399,9 @@ static int amqp_encode_field_value(amqp_bytes_t encoded,
/* fall through */
case AMQP_FIELD_KIND_BYTES:
if (!amqp_encode_32(encoded, offset, entry->value.bytes.len)
- || !amqp_encode_bytes(encoded, offset, entry->value.bytes))
+ || !amqp_encode_bytes(encoded, offset, entry->value.bytes)) {
goto out;
+ }
break;
case AMQP_FIELD_KIND_ARRAY:
@@ -400,13 +424,14 @@ static int amqp_encode_field_value(amqp_bytes_t encoded,
res = 0;
- out:
+out:
return res;
}
/*---------------------------------------------------------------------------*/
-int amqp_table_entry_cmp(void const *entry1, void const *entry2) {
+int amqp_table_entry_cmp(void const *entry1, void const *entry2)
+{
amqp_table_entry_t const *p1 = (amqp_table_entry_t const *) entry1;
amqp_table_entry_t const *p2 = (amqp_table_entry_t const *) entry2;
@@ -414,7 +439,9 @@ int amqp_table_entry_cmp(void const *entry1, void const *entry2) {
size_t minlen;
minlen = p1->key.len;
- if (p2->key.len < minlen) minlen = p2->key.len;
+ if (p2->key.len < minlen) {
+ minlen = p2->key.len;
+ }
d = memcmp(p1->key.bytes, p2->key.bytes, minlen);
if (d != 0) {
diff --git a/librabbitmq/amqp_url.c b/librabbitmq/amqp_url.c
index 367376d..d3e6724 100644
--- a/librabbitmq/amqp_url.c
+++ b/librabbitmq/amqp_url.c
@@ -43,159 +43,167 @@
void amqp_default_connection_info(struct amqp_connection_info *ci)
{
- /* Apply defaults */
- ci->user = "guest";
- ci->password = "guest";
- ci->host = "localhost";
- ci->port = 5672;
- ci->vhost = "/";
+ /* Apply defaults */
+ ci->user = "guest";
+ ci->password = "guest";
+ ci->host = "localhost";
+ ci->port = 5672;
+ ci->vhost = "/";
}
/* Scan for the next delimiter, handling percent-encodings on the way. */
static char find_delim(char **pp, int colon_and_at_sign_are_delims)
{
- char *from = *pp;
- char *to = from;
-
- for (;;) {
- char ch = *from++;
-
- switch (ch) {
- case ':':
- case '@':
- if (!colon_and_at_sign_are_delims) {
- *to++ = ch;
- break;
- }
-
- /* fall through */
- case 0:
- case '/':
- case '?':
- case '#':
- case '[':
- case ']':
- *to = 0;
- *pp = from;
- return ch;
-
- case '%': {
- unsigned int val;
- int chars;
- int res = sscanf(from, "%2x%n", &val, &chars);
-
- if (res == EOF || res < 1 || chars != 2)
- /* Return a surprising delimiter to
- force an error. */
- return '%';
-
- *to++ = val;
- from += 2;
- break;
- }
-
- default:
- *to++ = ch;
- break;
- }
- }
+ char *from = *pp;
+ char *to = from;
+
+ for (;;) {
+ char ch = *from++;
+
+ switch (ch) {
+ case ':':
+ case '@':
+ if (!colon_and_at_sign_are_delims) {
+ *to++ = ch;
+ break;
+ }
+
+ /* fall through */
+ case 0:
+ case '/':
+ case '?':
+ case '#':
+ case '[':
+ case ']':
+ *to = 0;
+ *pp = from;
+ return ch;
+
+ case '%': {
+ unsigned int val;
+ int chars;
+ int res = sscanf(from, "%2x%n", &val, &chars);
+
+ if (res == EOF || res < 1 || chars != 2)
+ /* Return a surprising delimiter to
+ force an error. */
+ {
+ return '%';
+ }
+
+ *to++ = val;
+ from += 2;
+ break;
+ }
+
+ default:
+ *to++ = ch;
+ break;
+ }
+ }
}
/* Parse an AMQP URL into its component parts. */
int amqp_parse_url(char *url, struct amqp_connection_info *parsed)
{
- int res = -ERROR_BAD_AMQP_URL;
- char delim;
- char *start;
- char *host;
- char *port = NULL;
-
- /* check the prefix */
- if (strncmp(url, "amqp://", 7))
- goto out;
-
- host = start = url += 7;
- delim = find_delim(&url, 1);
-
- if (delim == ':') {
- /* The colon could be introducing the port or the
- password part of the userinfo. We don't know yet,
- so stash the preceding component. */
- port = start = url;
- delim = find_delim(&url, 1);
- }
-
- if (delim == '@') {
- /* What might have been the host and port were in fact
- the username and password */
- parsed->user = host;
- if (port)
- parsed->password = port;
-
- port = NULL;
- host = start = url;
- delim = find_delim(&url, 1);
- }
-
- if (delim == '[') {
- /* IPv6 address. The bracket should be the first
- character in the host. */
- if (host != start || *host != 0)
- goto out;
-
- start = url;
- delim = find_delim(&url, 0);
-
- if (delim != ']')
- goto out;
-
- parsed->host = start;
- start = url;
- delim = find_delim(&url, 1);
-
- /* Closing bracket should be the last character in the
- host. */
- if (*start != 0)
- goto out;
- }
- else {
- /* If we haven't seen the host yet, this is it. */
- if (*host != 0)
- parsed->host = host;
- }
-
- if (delim == ':') {
- port = start = url;
- delim = find_delim(&url, 1);
- }
-
- if (port) {
- char *end;
- long portnum = strtol(port, &end, 10);
-
- if (port == end || *end != 0 || portnum < 0 || portnum > 65535)
- goto out;
-
- parsed->port = portnum;
- }
-
- if (delim == '/') {
- start = url;
- delim = find_delim(&url, 1);
-
- if (delim != 0)
- goto out;
-
- parsed->vhost = start;
- res = 0;
- }
- else if (delim == 0) {
- res = 0;
- }
-
- /* Any other delimiter is bad, and we will return
- ERROR_BAD_AMQP_URL. */
-
- out:
- return res;
+ int res = -ERROR_BAD_AMQP_URL;
+ char delim;
+ char *start;
+ char *host;
+ char *port = NULL;
+
+ /* check the prefix */
+ if (strncmp(url, "amqp://", 7)) {
+ goto out;
+ }
+
+ host = start = url += 7;
+ delim = find_delim(&url, 1);
+
+ if (delim == ':') {
+ /* The colon could be introducing the port or the
+ password part of the userinfo. We don't know yet,
+ so stash the preceding component. */
+ port = start = url;
+ delim = find_delim(&url, 1);
+ }
+
+ if (delim == '@') {
+ /* What might have been the host and port were in fact
+ the username and password */
+ parsed->user = host;
+ if (port) {
+ parsed->password = port;
+ }
+
+ port = NULL;
+ host = start = url;
+ delim = find_delim(&url, 1);
+ }
+
+ if (delim == '[') {
+ /* IPv6 address. The bracket should be the first
+ character in the host. */
+ if (host != start || *host != 0) {
+ goto out;
+ }
+
+ start = url;
+ delim = find_delim(&url, 0);
+
+ if (delim != ']') {
+ goto out;
+ }
+
+ parsed->host = start;
+ start = url;
+ delim = find_delim(&url, 1);
+
+ /* Closing bracket should be the last character in the
+ host. */
+ if (*start != 0) {
+ goto out;
+ }
+ } else {
+ /* If we haven't seen the host yet, this is it. */
+ if (*host != 0) {
+ parsed->host = host;
+ }
+ }
+
+ if (delim == ':') {
+ port = start = url;
+ delim = find_delim(&url, 1);
+ }
+
+ if (port) {
+ char *end;
+ long portnum = strtol(port, &end, 10);
+
+ if (port == end || *end != 0 || portnum < 0 || portnum > 65535) {
+ goto out;
+ }
+
+ parsed->port = portnum;
+ }
+
+ if (delim == '/') {
+ start = url;
+ delim = find_delim(&url, 1);
+
+ if (delim != 0) {
+ goto out;
+ }
+
+ parsed->vhost = start;
+ res = 0;
+ } else if (delim == 0) {
+ res = 0;
+ }
+
+ /* Any other delimiter is bad, and we will return
+ ERROR_BAD_AMQP_URL. */
+
+out:
+ return res;
}
diff --git a/librabbitmq/unix/socket.c b/librabbitmq/unix/socket.c
index 69bb036..8c8ba6b 100644
--- a/librabbitmq/unix/socket.c
+++ b/librabbitmq/unix/socket.c
@@ -45,37 +45,38 @@
int
amqp_socket_init(void)
{
- return 0;
+ return 0;
}
int
amqp_socket_error(void)
{
- return errno | ERROR_CATEGORY_OS;
+ return errno | ERROR_CATEGORY_OS;
}
int amqp_socket_socket(int domain, int type, int proto)
{
- int flags;
+ int flags;
- int s = socket(domain, type, proto);
- if (s < 0)
- return s;
+ int s = socket(domain, type, proto);
+ if (s < 0) {
+ return s;
+ }
- /* Always enable CLOEXEC on the socket */
- flags = fcntl(s, F_GETFD);
- if (flags == -1
- || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
- int e = errno;
- close(s);
- errno = e;
- return -1;
- }
+ /* Always enable CLOEXEC on the socket */
+ flags = fcntl(s, F_GETFD);
+ if (flags == -1
+ || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
+ int e = errno;
+ close(s);
+ errno = e;
+ return -1;
+ }
- return s;
+ return s;
}
char *amqp_os_error_string(int err)
{
- return strdup(strerror(err));
+ return strdup(strerror(err));
}
diff --git a/librabbitmq/win32/socket.c b/librabbitmq/win32/socket.c
index 9919b6b..f3c8fc7 100644
--- a/librabbitmq/win32/socket.c
+++ b/librabbitmq/win32/socket.c
@@ -48,55 +48,58 @@ static int called_wsastartup;
int amqp_socket_init(void)
{
- if (!called_wsastartup) {
- WSADATA data;
- int res = WSAStartup(0x0202, &data);
- if (res)
- return -res;
+ if (!called_wsastartup) {
+ WSADATA data;
+ int res = WSAStartup(0x0202, &data);
+ if (res) {
+ return -res;
+ }
- called_wsastartup = 1;
- }
+ called_wsastartup = 1;
+ }
- return 0;
+ return 0;
}
char *amqp_os_error_string(int err)
{
- char *msg, *copy;
+ char *msg, *copy;
- if (!FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM
- | FORMAT_MESSAGE_ALLOCATE_BUFFER,
- NULL, err,
- MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
- (LPSTR)&msg, 0, NULL))
- return strdup("(error retrieving Windows error message)");
+ if (!FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_ALLOCATE_BUFFER,
+ NULL, err,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPSTR)&msg, 0, NULL)) {
+ return strdup("(error retrieving Windows error message)");
+ }
- copy = strdup(msg);
- LocalFree(msg);
- return copy;
+ copy = strdup(msg);
+ LocalFree(msg);
+ return copy;
}
int
amqp_socket_setsockopt(int sock, int level, int optname,
- const void *optval, size_t optlen)
+ const void *optval, size_t optlen)
{
- /* the winsock setsockopt function has its 4th argument as a
- const char * */
- return setsockopt(sock, level, optname, (const char *)optval, optlen);
+ /* the winsock setsockopt function has its 4th argument as a
+ const char * */
+ return setsockopt(sock, level, optname, (const char *)optval, optlen);
}
int
amqp_socket_writev(int sock, struct iovec *iov, int nvecs)
{
- DWORD ret;
- if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0)
- return ret;
- else
- return -1;
+ DWORD ret;
+ if (WSASend(sock, (LPWSABUF)iov, nvecs, &ret, 0, NULL, NULL) == 0) {
+ return ret;
+ } else {
+ return -1;
+ }
}
int
amqp_socket_error(void)
{
- return WSAGetLastError() | ERROR_CATEGORY_OS;
+ return WSAGetLastError() | ERROR_CATEGORY_OS;
}
diff --git a/librabbitmq/win32/socket.h b/librabbitmq/win32/socket.h
index 9351b1f..43e2a13 100644
--- a/librabbitmq/win32/socket.h
+++ b/librabbitmq/win32/socket.h
@@ -39,8 +39,8 @@
/* same as WSABUF */
struct iovec {
- u_long iov_len;
- void *iov_base;
+ u_long iov_len;
+ void *iov_base;
};
int
@@ -51,7 +51,7 @@ amqp_socket_init(void);
int
amqp_socket_setsockopt(int sock, int level, int optname, const void *optval,
- size_t optlen);
+ size_t optlen);
int
amqp_socket_writev(int sock, struct iovec *iov, int nvecs);
diff --git a/tests/test_parse_url.c b/tests/test_parse_url.c
index 2e7d1c7..585514d 100644
--- a/tests/test_parse_url.c
+++ b/tests/test_parse_url.c
@@ -43,127 +43,127 @@
static void match_string(const char *what, const char *expect, const char *got)
{
- if (strcmp(got, expect)) {
- fprintf(stderr, "Expected %s '%s', got '%s'\n",
- what, expect, got);
- abort();
- }
+ if (strcmp(got, expect)) {
+ fprintf(stderr, "Expected %s '%s', got '%s'\n",
+ what, expect, got);
+ abort();
+ }
}
static void match_int(const char *what, int expect, int got)
{
- if (got != expect) {
- fprintf(stderr, "Expected %s '%d', got '%d'\n",
- what, expect, got);
- abort();
- }
+ if (got != expect) {
+ fprintf(stderr, "Expected %s '%d', got '%d'\n",
+ what, expect, got);
+ abort();
+ }
}
static void parse_success(const char *url,
- const char *user,
- const char *password,
- const char *host,
- int port,
- const char *vhost)
+ const char *user,
+ const char *password,
+ const char *host,
+ int port,
+ const char *vhost)
{
- char *s = strdup(url);
- struct amqp_connection_info ci;
- int res;
-
- amqp_default_connection_info(&ci);
- res = amqp_parse_url(s, &ci);
- if (res) {
- fprintf(stderr,
- "Expected to successfully parse URL, but didn't: %s (%s)\n",
- url, amqp_error_string(-res));
- abort();
- }
-
- match_string("user", user, ci.user);
- match_string("password", password, ci.password);
- match_string("host", host, ci.host);
- match_int("port", port, ci.port);
- match_string("vhost", vhost, ci.vhost);
-
- free(s);
+ char *s = strdup(url);
+ struct amqp_connection_info ci;
+ int res;
+
+ amqp_default_connection_info(&ci);
+ res = amqp_parse_url(s, &ci);
+ if (res) {
+ fprintf(stderr,
+ "Expected to successfully parse URL, but didn't: %s (%s)\n",
+ url, amqp_error_string(-res));
+ abort();
+ }
+
+ match_string("user", user, ci.user);
+ match_string("password", password, ci.password);
+ match_string("host", host, ci.host);
+ match_int("port", port, ci.port);
+ match_string("vhost", vhost, ci.vhost);
+
+ free(s);
}
static void parse_fail(const char *url)
{
- char *s = strdup(url);
- struct amqp_connection_info ci;
-
- amqp_default_connection_info(&ci);
- if (amqp_parse_url(s, &ci) >= 0) {
- fprintf(stderr,
- "Expected to fail parsing URL, but didn't: %s\n",
- url);
- abort();
- }
-
- free(s);
+ char *s = strdup(url);
+ struct amqp_connection_info ci;
+
+ amqp_default_connection_info(&ci);
+ if (amqp_parse_url(s, &ci) >= 0) {
+ fprintf(stderr,
+ "Expected to fail parsing URL, but didn't: %s\n",
+ url);
+ abort();
+ }
+
+ free(s);
}
int main(void)
{
- /* From the spec */
- parse_success("amqp://user:pass@host:10000/vhost", "user", "pass",
- "host", 10000, "vhost");
- parse_success("amqp://user%61:%61pass@ho%61st:10000/v%2fhost",
- "usera", "apass", "hoast", 10000, "v/host");
- parse_success("amqp://", "guest", "guest", "localhost", 5672, "/");
- parse_success("amqp://:@/", "", "", "localhost", 5672, "");
- parse_success("amqp://user@", "user", "guest", "localhost", 5672, "/");
- parse_success("amqp://user:pass@", "user", "pass",
- "localhost", 5672, "/");
- parse_success("amqp://host", "guest", "guest", "host", 5672, "/");
- parse_success("amqp://:10000", "guest", "guest", "localhost", 10000,
- "/");
- parse_success("amqp:///vhost", "guest", "guest", "localhost", 5672,
- "vhost");
- parse_success("amqp://host/", "guest", "guest", "host", 5672, "");
- parse_success("amqp://host/%2f", "guest", "guest", "host", 5672, "/");
- parse_success("amqp://[::1]", "guest", "guest", "::1", 5672, "/");
-
- /* Various other success cases */
- parse_success("amqp://host:100", "guest", "guest", "host", 100, "/");
- parse_success("amqp://[::1]:100", "guest", "guest", "::1", 100, "/");
-
- parse_success("amqp://host/blah", "guest", "guest",
- "host", 5672, "blah");
- parse_success("amqp://host:100/blah", "guest", "guest",
- "host", 100, "blah");
- parse_success("amqp://:100/blah", "guest", "guest",
- "localhost", 100, "blah");
- parse_success("amqp://[::1]/blah", "guest", "guest",
- "::1", 5672, "blah");
- parse_success("amqp://[::1]:100/blah", "guest", "guest",
- "::1", 100, "blah");
-
- parse_success("amqp://user:pass@host", "user", "pass",
- "host", 5672, "/");
- parse_success("amqp://user:pass@host:100", "user", "pass",
- "host", 100, "/");
- parse_success("amqp://user:pass@:100", "user", "pass",
- "localhost", 100, "/");
- parse_success("amqp://user:pass@[::1]", "user", "pass",
- "::1", 5672, "/");
- parse_success("amqp://user:pass@[::1]:100", "user", "pass",
- "::1", 100, "/");
-
- /* Various failure cases */
- parse_fail("http://www.rabbitmq.com");
- parse_fail("amqp://foo:bar:baz");
- parse_fail("amqp://foo[::1]");
- parse_fail("amqp://foo:[::1]");
- parse_fail("amqp://[::1]foo");
- parse_fail("amqp://foo:1000xyz");
- parse_fail("amqp://foo:1000000");
- parse_fail("amqp://foo/bar/baz");
-
- parse_fail("amqp://foo%1");
- parse_fail("amqp://foo%1x");
- parse_fail("amqp://foo%xy");
-
- return 0;
+ /* From the spec */
+ parse_success("amqp://user:pass@host:10000/vhost", "user", "pass",
+ "host", 10000, "vhost");
+ parse_success("amqp://user%61:%61pass@ho%61st:10000/v%2fhost",
+ "usera", "apass", "hoast", 10000, "v/host");
+ parse_success("amqp://", "guest", "guest", "localhost", 5672, "/");
+ parse_success("amqp://:@/", "", "", "localhost", 5672, "");
+ parse_success("amqp://user@", "user", "guest", "localhost", 5672, "/");
+ parse_success("amqp://user:pass@", "user", "pass",
+ "localhost", 5672, "/");
+ parse_success("amqp://host", "guest", "guest", "host", 5672, "/");
+ parse_success("amqp://:10000", "guest", "guest", "localhost", 10000,
+ "/");
+ parse_success("amqp:///vhost", "guest", "guest", "localhost", 5672,
+ "vhost");
+ parse_success("amqp://host/", "guest", "guest", "host", 5672, "");
+ parse_success("amqp://host/%2f", "guest", "guest", "host", 5672, "/");
+ parse_success("amqp://[::1]", "guest", "guest", "::1", 5672, "/");
+
+ /* Various other success cases */
+ parse_success("amqp://host:100", "guest", "guest", "host", 100, "/");
+ parse_success("amqp://[::1]:100", "guest", "guest", "::1", 100, "/");
+
+ parse_success("amqp://host/blah", "guest", "guest",
+ "host", 5672, "blah");
+ parse_success("amqp://host:100/blah", "guest", "guest",
+ "host", 100, "blah");
+ parse_success("amqp://:100/blah", "guest", "guest",
+ "localhost", 100, "blah");
+ parse_success("amqp://[::1]/blah", "guest", "guest",
+ "::1", 5672, "blah");
+ parse_success("amqp://[::1]:100/blah", "guest", "guest",
+ "::1", 100, "blah");
+
+ parse_success("amqp://user:pass@host", "user", "pass",
+ "host", 5672, "/");
+ parse_success("amqp://user:pass@host:100", "user", "pass",
+ "host", 100, "/");
+ parse_success("amqp://user:pass@:100", "user", "pass",
+ "localhost", 100, "/");
+ parse_success("amqp://user:pass@[::1]", "user", "pass",
+ "::1", 5672, "/");
+ parse_success("amqp://user:pass@[::1]:100", "user", "pass",
+ "::1", 100, "/");
+
+ /* Various failure cases */
+ parse_fail("http://www.rabbitmq.com");
+ parse_fail("amqp://foo:bar:baz");
+ parse_fail("amqp://foo[::1]");
+ parse_fail("amqp://foo:[::1]");
+ parse_fail("amqp://[::1]foo");
+ parse_fail("amqp://foo:1000xyz");
+ parse_fail("amqp://foo:1000000");
+ parse_fail("amqp://foo/bar/baz");
+
+ parse_fail("amqp://foo%1");
+ parse_fail("amqp://foo%1x");
+ parse_fail("amqp://foo%xy");
+
+ return 0;
}
diff --git a/tests/test_tables.c b/tests/test_tables.c
index 404a6d8..21e1c01 100644
--- a/tests/test_tables.c
+++ b/tests/test_tables.c
@@ -48,20 +48,21 @@
void die(const char *fmt, ...)
{
- va_list ap;
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, "\n");
- abort();
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ abort();
}
static void dump_indent(int indent, FILE *out)
{
int i;
- for (i = 0; i < indent; i++)
+ for (i = 0; i < indent; i++) {
fputc(' ', out);
+ }
}
static void dump_value(int indent, amqp_field_value_t v, FILE *out)
@@ -114,26 +115,28 @@ static void dump_value(int indent, amqp_field_value_t v, FILE *out)
case AMQP_FIELD_KIND_DECIMAL:
fprintf(out, " %d:::%u\n", v.value.decimal.decimals,
- v.value.decimal.value);
+ v.value.decimal.value);
break;
case AMQP_FIELD_KIND_UTF8:
fprintf(out, " %.*s\n", (int)v.value.bytes.len,
- (char *)v.value.bytes.bytes);
+ (char *)v.value.bytes.bytes);
break;
case AMQP_FIELD_KIND_BYTES:
fputc(' ', out);
- for (i = 0; i < (int)v.value.bytes.len; i++)
+ for (i = 0; i < (int)v.value.bytes.len; i++) {
fprintf(out, "%02x", ((char *) v.value.bytes.bytes)[i]);
+ }
fputc('\n', out);
break;
case AMQP_FIELD_KIND_ARRAY:
fputc('\n', out);
- for (i = 0; i < v.value.array.num_entries; i++)
+ for (i = 0; i < v.value.array.num_entries; i++) {
dump_value(indent + 2, v.value.array.entries[i], out);
+ }
break;
@@ -146,8 +149,8 @@ static void dump_value(int indent, amqp_field_value_t v, FILE *out)
for (i = 0; i < v.value.table.num_entries; i++) {
dump_indent(indent + 2, out);
fprintf(out, "%.*s ->\n",
- (int)v.value.table.entries[i].key.len,
- (char *)v.value.table.entries[i].key.bytes);
+ (int)v.value.table.entries[i].key.len,
+ (char *)v.value.table.entries[i].key.bytes);
dump_value(indent + 4, v.value.table.entries[i].value, out);
}
@@ -361,9 +364,10 @@ static void test_table_codec(FILE *out)
decoding_bytes.bytes = pre_encoded_table;
result = amqp_decode_table(decoding_bytes, &pool, &decoded,
- &decoding_offset);
- if (result < 0)
+ &decoding_offset);
+ if (result < 0) {
die("Table decoding failed: %s", amqp_error_string(-result));
+ }
fprintf(out, "BBBBBBBBBB\n");
@@ -386,16 +390,18 @@ static void test_table_codec(FILE *out)
encoding_result.bytes = &encoding_buffer[0];
result = amqp_encode_table(encoding_result, &table, &offset);
- if (result < 0)
+ if (result < 0) {
die("Table encoding failed: %s", amqp_error_string(-result));
+ }
if (offset != sizeof(pre_encoded_table))
die("Offset should be %ld, was %ld", (long)sizeof(pre_encoded_table),
- (long)offset);
+ (long)offset);
result = memcmp(pre_encoded_table, encoding_buffer, offset);
- if (result != 0)
+ if (result != 0) {
die("Table encoding differed", result);
+ }
}
empty_amqp_pool(&pool);
@@ -417,12 +423,14 @@ static int compare_files(FILE *f1_in, FILE *f2_in)
size_t f2_got = fread(f2_buf, 1, CHUNK_SIZE, f2_in);
res = memcmp(f1_buf, f2_buf, f1_got < f2_got ? f1_got : f2_got);
- if (res)
+ if (res) {
break;
+ }
if (f1_got < CHUNK_SIZE || f2_got < CHUNK_SIZE) {
- if (f1_got != f2_got)
+ if (f1_got != f2_got) {
res = (f1_got < f2_got ? -1 : 1);
+ }
break;
}
}
@@ -439,24 +447,28 @@ int main(void)
char *expected_path;
out = tmpfile();
- if (out == NULL)
+ if (out == NULL) {
die("failed to create temporary file: %s", strerror(errno));
+ }
test_table_codec(out);
fprintf(out, "----------\n");
test_dump_value(out);
- if (srcdir == NULL)
+ if (srcdir == NULL) {
srcdir = ".";
+ }
expected_path = malloc(strlen(srcdir) + strlen(expected_file_name) + 2);
sprintf(expected_path, "%s/%s", srcdir, expected_file_name);
expected = fopen(expected_path, "r");
- if (!expected)
+ if (!expected) {
die("failed to open %s: %s", expected_path, strerror(errno));
+ }
- if (compare_files(expected, out))
+ if (compare_files(expected, out)) {
die("output file did not have expected contents");
+ }
fclose(out);
fclose(expected);
diff --git a/tools/common.c b/tools/common.c
index c5b2a77..bead858 100644
--- a/tools/common.c
+++ b/tools/common.c
@@ -54,113 +54,116 @@
void die(const char *fmt, ...)
{
- va_list ap;
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, "\n");
- exit(1);
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, "\n");
+ exit(1);
}
void die_errno(int err, const char *fmt, ...)
{
- va_list ap;
+ va_list ap;
- if (err == 0)
- return;
+ if (err == 0) {
+ return;
+ }
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, ": %s\n", strerror(err));
- exit(1);
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", strerror(err));
+ exit(1);
}
void die_amqp_error(int err, const char *fmt, ...)
{
- va_list ap;
- char *errstr;
-
- if (err >= 0)
- return;
-
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, ": %s\n", errstr = amqp_error_string(-err));
- free(errstr);
- exit(1);
+ va_list ap;
+ char *errstr;
+
+ if (err >= 0) {
+ return;
+ }
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", errstr = amqp_error_string(-err));
+ free(errstr);
+ exit(1);
}
char *amqp_server_exception_string(amqp_rpc_reply_t r)
{
- int res;
- char *s;
-
- switch (r.reply.id) {
- case AMQP_CONNECTION_CLOSE_METHOD: {
- amqp_connection_close_t *m
- = (amqp_connection_close_t *)r.reply.decoded;
- res = asprintf(&s, "server connection error %d, message: %.*s",
- m->reply_code,
- (int)m->reply_text.len,
- (char *)m->reply_text.bytes);
- break;
- }
-
- case AMQP_CHANNEL_CLOSE_METHOD: {
- amqp_channel_close_t *m
- = (amqp_channel_close_t *)r.reply.decoded;
- res = asprintf(&s, "server channel error %d, message: %.*s",
- m->reply_code,
- (int)m->reply_text.len,
- (char *)m->reply_text.bytes);
- break;
- }
-
- default:
- res = asprintf(&s, "unknown server error, method id 0x%08X",
- r.reply.id);
- break;
- }
-
- return res >= 0 ? s : NULL;
+ int res;
+ char *s;
+
+ switch (r.reply.id) {
+ case AMQP_CONNECTION_CLOSE_METHOD: {
+ amqp_connection_close_t *m
+ = (amqp_connection_close_t *)r.reply.decoded;
+ res = asprintf(&s, "server connection error %d, message: %.*s",
+ m->reply_code,
+ (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
+
+ case AMQP_CHANNEL_CLOSE_METHOD: {
+ amqp_channel_close_t *m
+ = (amqp_channel_close_t *)r.reply.decoded;
+ res = asprintf(&s, "server channel error %d, message: %.*s",
+ m->reply_code,
+ (int)m->reply_text.len,
+ (char *)m->reply_text.bytes);
+ break;
+ }
+
+ default:
+ res = asprintf(&s, "unknown server error, method id 0x%08X",
+ r.reply.id);
+ break;
+ }
+
+ return res >= 0 ? s : NULL;
}
char *amqp_rpc_reply_string(amqp_rpc_reply_t r)
{
- switch (r.reply_type) {
- case AMQP_RESPONSE_NORMAL:
- return strdup("normal response");
+ switch (r.reply_type) {
+ case AMQP_RESPONSE_NORMAL:
+ return strdup("normal response");
- case AMQP_RESPONSE_NONE:
- return strdup("missing RPC reply type");
+ case AMQP_RESPONSE_NONE:
+ return strdup("missing RPC reply type");
- case AMQP_RESPONSE_LIBRARY_EXCEPTION:
- return amqp_error_string(r.library_error);
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ return amqp_error_string(r.library_error);
- case AMQP_RESPONSE_SERVER_EXCEPTION:
- return amqp_server_exception_string(r);
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ return amqp_server_exception_string(r);
- default:
- abort();
- }
+ default:
+ abort();
+ }
}
void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
{
- va_list ap;
- char *errstr;
-
- if (r.reply_type == AMQP_RESPONSE_NORMAL)
- return;
-
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
- fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r));
- free(errstr);
- exit(1);
+ va_list ap;
+ char *errstr;
+
+ if (r.reply_type == AMQP_RESPONSE_NORMAL) {
+ return;
+ }
+
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fprintf(stderr, ": %s\n", errstr = amqp_rpc_reply_string(r));
+ free(errstr);
+ exit(1);
}
static char *amqp_url;
@@ -172,267 +175,299 @@ static char *amqp_password;
const char *connect_options_title = "Connection options";
struct poptOption connect_options[] = {
- {"url", 'u', POPT_ARG_STRING, &amqp_url, 0,
- "the AMQP URL to connect to", "amqp://..."},
- {"server", 's', POPT_ARG_STRING, &amqp_server, 0,
- "the AMQP server to connect to", "hostname"},
- {"port", 0, POPT_ARG_INT, &amqp_port, 0,
- "the port to connect on", "port" },
- {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
- "the vhost to use when connecting", "vhost"},
- {"username", 0, POPT_ARG_STRING, &amqp_username, 0,
- "the username to login with", "username"},
- {"password", 0, POPT_ARG_STRING, &amqp_password, 0,
- "the password to login with", "password"},
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ {
+ "url", 'u', POPT_ARG_STRING, &amqp_url, 0,
+ "the AMQP URL to connect to", "amqp://..."
+ },
+ {
+ "server", 's', POPT_ARG_STRING, &amqp_server, 0,
+ "the AMQP server to connect to", "hostname"
+ },
+ {
+ "port", 0, POPT_ARG_INT, &amqp_port, 0,
+ "the port to connect on", "port"
+ },
+ {
+ "vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
+ "the vhost to use when connecting", "vhost"
+ },
+ {
+ "username", 0, POPT_ARG_STRING, &amqp_username, 0,
+ "the username to login with", "username"
+ },
+ {
+ "password", 0, POPT_ARG_STRING, &amqp_password, 0,
+ "the password to login with", "password"
+ },
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
};
static void init_connection_info(struct amqp_connection_info *ci)
{
- struct amqp_connection_info defaults;
+ struct amqp_connection_info defaults;
- ci->user = NULL;
- ci->password = NULL;
- ci->host = NULL;
- ci->port = -1;
- ci->vhost = NULL;
- ci->user = NULL;
+ ci->user = NULL;
+ ci->password = NULL;
+ ci->host = NULL;
+ ci->port = -1;
+ ci->vhost = NULL;
+ ci->user = NULL;
- if (amqp_url)
- die_amqp_error(amqp_parse_url(strdup(amqp_url), ci),
- "Parsing URL '%s'", amqp_url);
+ if (amqp_url) {
+ die_amqp_error(amqp_parse_url(strdup(amqp_url), ci),
+ "Parsing URL '%s'", amqp_url);
+ }
- if (amqp_server) {
+ if (amqp_server) {
char *colon;
- if (ci->host)
- die("both --server and --url options specify"
- " server host");
-
- /* parse the server string into a hostname and a port */
- colon = strchr(amqp_server, ':');
- if (colon) {
- char *port_end;
- size_t host_len;
-
- /* Deprecate specifying the port number with the
- --server option, because it is not ipv6 friendly.
- --url now allows connection options to be
- specificied concisely. */
- fprintf(stderr, "Specifying the port number with"
- " --server is deprecated\n");
-
- host_len = colon - amqp_server;
- ci->host = malloc(host_len + 1);
- memcpy(ci->host, amqp_server, host_len);
- ci->host[host_len] = 0;
-
- if (ci->port >= 0)
- die("both --server and --url options specify"
- " server port");
- if (amqp_port >= 0)
- die("both --server and --port options specify"
- " server port");
-
- ci->port = strtol(colon+1, &port_end, 10);
- if (ci->port < 0
- || ci->port > 65535
- || port_end == colon+1
- || *port_end != 0)
- die("bad server port number in '%s'",
- amqp_server);
- }
- }
-
- if (amqp_port >= 0) {
- if (ci->port >= 0)
- die("both --port and --url options specify"
- " server port");
-
- ci->port = amqp_port;
- }
-
- if (amqp_username) {
- if (ci->user)
- die("both --username and --url options specify"
- " AMQP username");
-
- ci->user = amqp_username;
- }
-
- if (amqp_password) {
- if (ci->password)
- die("both --password and --url options specify"
- " AMQP password");
-
- ci->password = amqp_password;
- }
-
- if (amqp_vhost) {
- if (ci->vhost)
- die("both --vhost and --url options specify"
- " AMQP vhost");
-
- ci->vhost = amqp_vhost;
- }
-
- amqp_default_connection_info(&defaults);
-
- if (!ci->user)
- ci->user = defaults.user;
- if (!ci->password)
- ci->password = defaults.password;
- if (!ci->host)
- ci->host = defaults.host;
- if (ci->port < 0)
- ci->port = defaults.port;
- if (!ci->vhost)
- ci->vhost = defaults.vhost;
+ if (ci->host) {
+ die("both --server and --url options specify"
+ " server host");
+ }
+
+ /* parse the server string into a hostname and a port */
+ colon = strchr(amqp_server, ':');
+ if (colon) {
+ char *port_end;
+ size_t host_len;
+
+ /* Deprecate specifying the port number with the
+ --server option, because it is not ipv6 friendly.
+ --url now allows connection options to be
+ specificied concisely. */
+ fprintf(stderr, "Specifying the port number with"
+ " --server is deprecated\n");
+
+ host_len = colon - amqp_server;
+ ci->host = malloc(host_len + 1);
+ memcpy(ci->host, amqp_server, host_len);
+ ci->host[host_len] = 0;
+
+ if (ci->port >= 0) {
+ die("both --server and --url options specify"
+ " server port");
+ }
+ if (amqp_port >= 0) {
+ die("both --server and --port options specify"
+ " server port");
+ }
+
+ ci->port = strtol(colon+1, &port_end, 10);
+ if (ci->port < 0
+ || ci->port > 65535
+ || port_end == colon+1
+ || *port_end != 0) {
+ die("bad server port number in '%s'",
+ amqp_server);
+ }
+ }
+ }
+
+ if (amqp_port >= 0) {
+ if (ci->port >= 0) {
+ die("both --port and --url options specify"
+ " server port");
+ }
+
+ ci->port = amqp_port;
+ }
+
+ if (amqp_username) {
+ if (ci->user) {
+ die("both --username and --url options specify"
+ " AMQP username");
+ }
+
+ ci->user = amqp_username;
+ }
+
+ if (amqp_password) {
+ if (ci->password) {
+ die("both --password and --url options specify"
+ " AMQP password");
+ }
+
+ ci->password = amqp_password;
+ }
+
+ if (amqp_vhost) {
+ if (ci->vhost) {
+ die("both --vhost and --url options specify"
+ " AMQP vhost");
+ }
+
+ ci->vhost = amqp_vhost;
+ }
+
+ amqp_default_connection_info(&defaults);
+
+ if (!ci->user) {
+ ci->user = defaults.user;
+ }
+ if (!ci->password) {
+ ci->password = defaults.password;
+ }
+ if (!ci->host) {
+ ci->host = defaults.host;
+ }
+ if (ci->port < 0) {
+ ci->port = defaults.port;
+ }
+ if (!ci->vhost) {
+ ci->vhost = defaults.vhost;
+ }
}
amqp_connection_state_t make_connection(void)
{
- int s;
- struct amqp_connection_info ci;
- amqp_connection_state_t conn;
+ int s;
+ struct amqp_connection_info ci;
+ amqp_connection_state_t conn;
- init_connection_info(&ci);
+ init_connection_info(&ci);
- s = amqp_open_socket(ci.host, ci.port);
- die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port);
+ s = amqp_open_socket(ci.host, ci.port);
+ die_amqp_error(s, "opening socket to %s:%d", ci.host, ci.port);
- conn = amqp_new_connection();
- amqp_set_sockfd(conn, s);
+ conn = amqp_new_connection();
+ amqp_set_sockfd(conn, s);
- die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0,
- AMQP_SASL_METHOD_PLAIN,
- ci.user, ci.password),
- "logging in to AMQP server");
+ die_rpc(amqp_login(conn, ci.vhost, 0, 131072, 0,
+ AMQP_SASL_METHOD_PLAIN,
+ ci.user, ci.password),
+ "logging in to AMQP server");
- if (!amqp_channel_open(conn, 1))
- die_rpc(amqp_get_rpc_reply(conn), "opening channel");
+ if (!amqp_channel_open(conn, 1)) {
+ die_rpc(amqp_get_rpc_reply(conn), "opening channel");
+ }
- return conn;
+ return conn;
}
void close_connection(amqp_connection_state_t conn)
{
- int res;
- die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
- "closing channel");
- die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
- "closing connection");
-
- res = amqp_destroy_connection(conn);
- die_amqp_error(res, "closing connection");
+ int res;
+ die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
+ "closing channel");
+ die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
+ "closing connection");
+
+ res = amqp_destroy_connection(conn);
+ die_amqp_error(res, "closing connection");
}
amqp_bytes_t read_all(int fd)
{
- size_t space = 4096;
- amqp_bytes_t bytes;
-
- bytes.bytes = malloc(space);
- bytes.len = 0;
-
- for (;;) {
- ssize_t res = read(fd, (char *)bytes.bytes + bytes.len,
- space-bytes.len);
- if (res == 0)
- break;
-
- if (res < 0) {
- if (errno == EINTR)
- continue;
-
- die_errno(errno, "reading");
- }
-
- bytes.len += res;
- if (bytes.len == space) {
- space *= 2;
- bytes.bytes = realloc(bytes.bytes, space);
- }
- }
-
- return bytes;
+ size_t space = 4096;
+ amqp_bytes_t bytes;
+
+ bytes.bytes = malloc(space);
+ bytes.len = 0;
+
+ for (;;) {
+ ssize_t res = read(fd, (char *)bytes.bytes + bytes.len,
+ space-bytes.len);
+ if (res == 0) {
+ break;
+ }
+
+ if (res < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+
+ die_errno(errno, "reading");
+ }
+
+ bytes.len += res;
+ if (bytes.len == space) {
+ space *= 2;
+ bytes.bytes = realloc(bytes.bytes, space);
+ }
+ }
+
+ return bytes;
}
void write_all(int fd, amqp_bytes_t data)
{
- while (data.len > 0) {
- ssize_t res = write(fd, data.bytes, data.len);
- if (res < 0)
- die_errno(errno, "write");
-
- data.len -= res;
- data.bytes = (char *)data.bytes + res;
- }
+ while (data.len > 0) {
+ ssize_t res = write(fd, data.bytes, data.len);
+ if (res < 0) {
+ die_errno(errno, "write");
+ }
+
+ data.len -= res;
+ data.bytes = (char *)data.bytes + res;
+ }
}
void copy_body(amqp_connection_state_t conn, int fd)
{
- size_t body_remaining;
- amqp_frame_t frame;
-
- int res = amqp_simple_wait_frame(conn, &frame);
- die_amqp_error(res, "waiting for header frame");
- if (frame.frame_type != AMQP_FRAME_HEADER)
- die("expected header, got frame type 0x%X",
- frame.frame_type);
-
- body_remaining = frame.payload.properties.body_size;
- while (body_remaining) {
- res = amqp_simple_wait_frame(conn, &frame);
- die_amqp_error(res, "waiting for body frame");
- if (frame.frame_type != AMQP_FRAME_BODY)
- die("expected body, got frame type 0x%X",
- frame.frame_type);
-
- write_all(fd, frame.payload.body_fragment);
- body_remaining -= frame.payload.body_fragment.len;
- }
+ size_t body_remaining;
+ amqp_frame_t frame;
+
+ int res = amqp_simple_wait_frame(conn, &frame);
+ die_amqp_error(res, "waiting for header frame");
+ if (frame.frame_type != AMQP_FRAME_HEADER) {
+ die("expected header, got frame type 0x%X",
+ frame.frame_type);
+ }
+
+ body_remaining = frame.payload.properties.body_size;
+ while (body_remaining) {
+ res = amqp_simple_wait_frame(conn, &frame);
+ die_amqp_error(res, "waiting for body frame");
+ if (frame.frame_type != AMQP_FRAME_BODY) {
+ die("expected body, got frame type 0x%X",
+ frame.frame_type);
+ }
+
+ write_all(fd, frame.payload.body_fragment);
+ body_remaining -= frame.payload.body_fragment.len;
+ }
}
poptContext process_options(int argc, const char **argv,
- struct poptOption *options,
- const char *help)
+ struct poptOption *options,
+ const char *help)
{
- int c;
- poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
- poptSetOtherOptionHelp(opts, help);
-
- while ((c = poptGetNextOpt(opts)) >= 0) {
- /* no options require explicit handling */
- }
-
- if (c < -1) {
- fprintf(stderr, "%s: %s\n",
- poptBadOption(opts, POPT_BADOPTION_NOALIAS),
- poptStrerror(c));
- poptPrintUsage(opts, stderr, 0);
- exit(1);
- }
-
- return opts;
+ int c;
+ poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
+ poptSetOtherOptionHelp(opts, help);
+
+ while ((c = poptGetNextOpt(opts)) >= 0) {
+ /* no options require explicit handling */
+ }
+
+ if (c < -1) {
+ fprintf(stderr, "%s: %s\n",
+ poptBadOption(opts, POPT_BADOPTION_NOALIAS),
+ poptStrerror(c));
+ poptPrintUsage(opts, stderr, 0);
+ exit(1);
+ }
+
+ return opts;
}
void process_all_options(int argc, const char **argv,
- struct poptOption *options)
+ struct poptOption *options)
{
- poptContext opts = process_options(argc, argv, options,
- "[OPTIONS]...");
- const char *opt = poptPeekArg(opts);
+ poptContext opts = process_options(argc, argv, options,
+ "[OPTIONS]...");
+ const char *opt = poptPeekArg(opts);
- if (opt) {
- fprintf(stderr, "unexpected operand: %s\n", opt);
- poptPrintUsage(opts, stderr, 0);
- exit(1);
- }
+ if (opt) {
+ fprintf(stderr, "unexpected operand: %s\n", opt);
+ poptPrintUsage(opts, stderr, 0);
+ exit(1);
+ }
- poptFreeContext(opts);
+ poptFreeContext(opts);
}
amqp_bytes_t cstring_bytes(const char *str)
{
- return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
+ return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
}
diff --git a/tools/common.h b/tools/common.h
index 0a9af9d..77979ee 100644
--- a/tools/common.h
+++ b/tools/common.h
@@ -42,13 +42,13 @@ extern char *amqp_server_exception_string(amqp_rpc_reply_t r);
extern char *amqp_rpc_reply_string(amqp_rpc_reply_t r);
extern void die(const char *fmt, ...)
- __attribute__ ((format (printf, 1, 2)));
+__attribute__ ((format (printf, 1, 2)));
extern void die_errno(int err, const char *fmt, ...)
- __attribute__ ((format (printf, 2, 3)));
+__attribute__ ((format (printf, 2, 3)));
extern void die_amqp_error(int err, const char *fmt, ...)
- __attribute__ ((format (printf, 2, 3)));
+__attribute__ ((format (printf, 2, 3)));
extern void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...)
- __attribute__ ((format (printf, 2, 3)));
+__attribute__ ((format (printf, 2, 3)));
extern const char *connect_options_title;
extern struct poptOption connect_options[];
@@ -61,12 +61,12 @@ extern void write_all(int fd, amqp_bytes_t data);
extern void copy_body(amqp_connection_state_t conn, int fd);
#define INCLUDE_OPTIONS(options) \
- {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL}
+ {NULL, 0, POPT_ARG_INCLUDE_TABLE, options, 0, options ## _title, NULL}
extern poptContext process_options(int argc, const char **argv,
- struct poptOption *options,
- const char *help);
+ struct poptOption *options,
+ const char *help);
extern void process_all_options(int argc, const char **argv,
- struct poptOption *options);
+ struct poptOption *options);
extern amqp_bytes_t cstring_bytes(const char *str);
diff --git a/tools/consume.c b/tools/consume.c
index d6ddfa8..c52311e 100644
--- a/tools/consume.c
+++ b/tools/consume.c
@@ -45,164 +45,179 @@
use the same escaping conventions as rabbitmqctl. */
static char *stringify_bytes(amqp_bytes_t bytes)
{
- /* We will need up to 4 chars per byte, plus the terminating 0 */
- char *res = malloc(bytes.len * 4 + 1);
- uint8_t *data = bytes.bytes;
- char *p = res;
- size_t i;
-
- for (i = 0; i < bytes.len; i++) {
- if (data[i] >= 32 && data[i] != 127) {
- *p++ = data[i];
- }
- else {
- *p++ = '\\';
- *p++ = '0' + (data[i] >> 6);
- *p++ = '0' + (data[i] >> 3 & 0x7);
- *p++ = '0' + (data[i] & 0x7);
- }
- }
-
- *p = 0;
- return res;
+ /* We will need up to 4 chars per byte, plus the terminating 0 */
+ char *res = malloc(bytes.len * 4 + 1);
+ uint8_t *data = bytes.bytes;
+ char *p = res;
+ size_t i;
+
+ for (i = 0; i < bytes.len; i++) {
+ if (data[i] >= 32 && data[i] != 127) {
+ *p++ = data[i];
+ } else {
+ *p++ = '\\';
+ *p++ = '0' + (data[i] >> 6);
+ *p++ = '0' + (data[i] >> 3 & 0x7);
+ *p++ = '0' + (data[i] & 0x7);
+ }
+ }
+
+ *p = 0;
+ return res;
}
static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
- char *queue, char *exchange,
- char *routing_key, int declare)
+ char *queue, char *exchange,
+ char *routing_key, int declare)
{
- amqp_bytes_t queue_bytes = cstring_bytes(queue);
-
- /* if an exchange name wasn't provided, check that we don't
- have options that require it. */
- if (!exchange && routing_key) {
- fprintf(stderr, "--routing-key option requires an exchange"
- " name to be provided with --exchange\n");
- exit(1);
- }
-
- if (!queue || exchange || declare) {
- /* Declare the queue as auto-delete. */
- amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1,
- queue_bytes, 0, 0, 1, 1,
- amqp_empty_table);
- if (!res)
- die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
-
- if (!queue) {
- /* the server should have provided a queue name */
- char *sq;
- queue_bytes = amqp_bytes_malloc_dup(res->queue);
- sq = stringify_bytes(queue_bytes);
- fprintf(stderr, "Server provided queue name: %s\n",
- sq);
- free(sq);
- }
-
- /* Bind to an exchange if requested */
- if (exchange) {
- amqp_bytes_t eb = amqp_cstring_bytes(exchange);
- if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
- cstring_bytes(routing_key),
- amqp_empty_table))
- die_rpc(amqp_get_rpc_reply(conn),
- "queue.bind");
- }
- }
-
- return queue_bytes;
+ amqp_bytes_t queue_bytes = cstring_bytes(queue);
+
+ /* if an exchange name wasn't provided, check that we don't
+ have options that require it. */
+ if (!exchange && routing_key) {
+ fprintf(stderr, "--routing-key option requires an exchange"
+ " name to be provided with --exchange\n");
+ exit(1);
+ }
+
+ if (!queue || exchange || declare) {
+ /* Declare the queue as auto-delete. */
+ amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1,
+ queue_bytes, 0, 0, 1, 1,
+ amqp_empty_table);
+ if (!res) {
+ die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
+ }
+
+ if (!queue) {
+ /* the server should have provided a queue name */
+ char *sq;
+ queue_bytes = amqp_bytes_malloc_dup(res->queue);
+ sq = stringify_bytes(queue_bytes);
+ fprintf(stderr, "Server provided queue name: %s\n",
+ sq);
+ free(sq);
+ }
+
+ /* Bind to an exchange if requested */
+ if (exchange) {
+ amqp_bytes_t eb = amqp_cstring_bytes(exchange);
+ if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
+ cstring_bytes(routing_key),
+ amqp_empty_table))
+ die_rpc(amqp_get_rpc_reply(conn),
+ "queue.bind");
+ }
+ }
+
+ return queue_bytes;
}
static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
- int no_ack, int count, const char * const *argv)
+ int no_ack, int count, const char *const *argv)
{
- int i;
-
- /* If there is a limit, set the qos to match */
- if (count > 0 && count <= 65535
- && !amqp_basic_qos(conn, 1, 0, count, 0))
- die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
-
- if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack,
- 0, amqp_empty_table))
- die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
-
- for (i = 0; count < 0 || i < count; i++) {
- amqp_frame_t frame;
- struct pipeline pl;
- uint64_t delivery_tag;
- int res = amqp_simple_wait_frame(conn, &frame);
- die_amqp_error(res, "waiting for header frame");
-
- if (frame.frame_type != AMQP_FRAME_METHOD
- || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
- continue;
-
- amqp_basic_deliver_t *deliver
- = (amqp_basic_deliver_t *)frame.payload.method.decoded;
- delivery_tag = deliver->delivery_tag;
-
- pipeline(argv, &pl);
- copy_body(conn, pl.infd);
-
- if (finish_pipeline(&pl) && !no_ack)
- die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag,
- 0),
- "basic.ack");
-
- amqp_maybe_release_buffers(conn);
- }
+ int i;
+
+ /* If there is a limit, set the qos to match */
+ if (count > 0 && count <= 65535
+ && !amqp_basic_qos(conn, 1, 0, count, 0)) {
+ die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
+ }
+
+ if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack,
+ 0, amqp_empty_table)) {
+ die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
+ }
+
+ for (i = 0; count < 0 || i < count; i++) {
+ amqp_frame_t frame;
+ struct pipeline pl;
+ uint64_t delivery_tag;
+ int res = amqp_simple_wait_frame(conn, &frame);
+ die_amqp_error(res, "waiting for header frame");
+
+ if (frame.frame_type != AMQP_FRAME_METHOD
+ || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
+ continue;
+ }
+
+ amqp_basic_deliver_t *deliver
+ = (amqp_basic_deliver_t *)frame.payload.method.decoded;
+ delivery_tag = deliver->delivery_tag;
+
+ pipeline(argv, &pl);
+ copy_body(conn, pl.infd);
+
+ if (finish_pipeline(&pl) && !no_ack)
+ die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag,
+ 0),
+ "basic.ack");
+
+ amqp_maybe_release_buffers(conn);
+ }
}
int main(int argc, const char **argv)
{
- poptContext opts;
- amqp_connection_state_t conn;
- const char * const *cmd_argv;
- char *queue = NULL;
- char *exchange = NULL;
- char *routing_key = NULL;
- int declare = 0;
- int no_ack = 0;
- int count = -1;
- amqp_bytes_t queue_bytes;
-
- struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {"queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue to consume from", "queue"},
- {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
- "bind the queue to this exchange", "exchange"},
- {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
- "the routing key to bind with", "routing key"},
- {"declare", 'd', POPT_ARG_NONE, &declare, 0,
- "declare an exclusive queue", NULL},
- {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
- "consume in no-ack mode", NULL},
- {"count", 'c', POPT_ARG_INT, &count, 0,
- "stop consuming after this many messages are consumed",
- "limit"},
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
-
- opts = process_options(argc, argv, options,
- "[OPTIONS]... <command> <args>");
-
- cmd_argv = poptGetArgs(opts);
- if (!cmd_argv || !cmd_argv[0]) {
- fprintf(stderr, "consuming command not specified\n");
- poptPrintUsage(opts, stderr, 0);
- goto error;
- }
-
- conn = make_connection();
- queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare);
- do_consume(conn, queue_bytes, no_ack, count, cmd_argv);
- close_connection(conn);
- return 0;
+ poptContext opts;
+ amqp_connection_state_t conn;
+ const char *const *cmd_argv;
+ char *queue = NULL;
+ char *exchange = NULL;
+ char *routing_key = NULL;
+ int declare = 0;
+ int no_ack = 0;
+ int count = -1;
+ amqp_bytes_t queue_bytes;
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {
+ "queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue to consume from", "queue"
+ },
+ {
+ "exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "bind the queue to this exchange", "exchange"
+ },
+ {
+ "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to bind with", "routing key"
+ },
+ {
+ "declare", 'd', POPT_ARG_NONE, &declare, 0,
+ "declare an exclusive queue", NULL
+ },
+ {
+ "no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
+ "consume in no-ack mode", NULL
+ },
+ {
+ "count", 'c', POPT_ARG_INT, &count, 0,
+ "stop consuming after this many messages are consumed",
+ "limit"
+ },
+ POPT_AUTOHELP
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ };
+
+ opts = process_options(argc, argv, options,
+ "[OPTIONS]... <command> <args>");
+
+ cmd_argv = poptGetArgs(opts);
+ if (!cmd_argv || !cmd_argv[0]) {
+ fprintf(stderr, "consuming command not specified\n");
+ poptPrintUsage(opts, stderr, 0);
+ goto error;
+ }
+
+ conn = make_connection();
+ queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare);
+ do_consume(conn, queue_bytes, no_ack, count, cmd_argv);
+ close_connection(conn);
+ return 0;
error:
- poptFreeContext(opts);
- return 1;
+ poptFreeContext(opts);
+ return 1;
}
diff --git a/tools/declare_queue.c b/tools/declare_queue.c
index 26c3a68..25a94b1 100644
--- a/tools/declare_queue.c
+++ b/tools/declare_queue.c
@@ -44,41 +44,46 @@
int main(int argc, const char **argv)
{
- amqp_connection_state_t conn;
- char *queue = NULL;
- int durable = 0;
+ amqp_connection_state_t conn;
+ char *queue = NULL;
+ int durable = 0;
- struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {"queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue name to declare, or the empty string", "queue"},
- {"durable", 'd', POPT_ARG_VAL, &durable, 1,
- "declare a durable queue", NULL},
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {
+ "queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue name to declare, or the empty string", "queue"
+ },
+ {
+ "durable", 'd', POPT_ARG_VAL, &durable, 1,
+ "declare a durable queue", NULL
+ },
+ POPT_AUTOHELP
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ };
- process_all_options(argc, argv, options);
+ process_all_options(argc, argv, options);
- if (queue == NULL) {
- fprintf(stderr, "queue name not specified\n");
- return 1;
- }
+ if (queue == NULL) {
+ fprintf(stderr, "queue name not specified\n");
+ return 1;
+ }
- conn = make_connection();
- {
- amqp_queue_declare_ok_t *reply = amqp_queue_declare(conn, 1,
- cstring_bytes(queue),
- 0,
- durable,
- 0,
- 0,
- amqp_empty_table);
- if (reply == NULL)
- die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
+ conn = make_connection();
+ {
+ amqp_queue_declare_ok_t *reply = amqp_queue_declare(conn, 1,
+ cstring_bytes(queue),
+ 0,
+ durable,
+ 0,
+ 0,
+ amqp_empty_table);
+ if (reply == NULL) {
+ die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
+ }
- printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes);
- }
- close_connection(conn);
- return 0;
+ printf("%.*s\n", (int)reply->queue.len, (char *)reply->queue.bytes);
+ }
+ close_connection(conn);
+ return 0;
}
diff --git a/tools/delete_queue.c b/tools/delete_queue.c
index cb92f7b..4ebbd94 100644
--- a/tools/delete_queue.c
+++ b/tools/delete_queue.c
@@ -44,41 +44,47 @@
int main(int argc, const char **argv)
{
- amqp_connection_state_t conn;
- char *queue = NULL;
- int if_unused = 0;
- int if_empty = 0;
+ amqp_connection_state_t conn;
+ char *queue = NULL;
+ int if_unused = 0;
+ int if_empty = 0;
- struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {"queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue name to delete", "queue"},
- {"if-unused", 'u', POPT_ARG_VAL, &if_unused, 1,
- "do not delete unless queue is unused", NULL},
- {"if-empty", 'e', POPT_ARG_VAL, &if_empty, 1,
- "do not delete unless queue is empty", NULL},
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {
+ "queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue name to delete", "queue"
+ },
+ {
+ "if-unused", 'u', POPT_ARG_VAL, &if_unused, 1,
+ "do not delete unless queue is unused", NULL
+ },
+ {
+ "if-empty", 'e', POPT_ARG_VAL, &if_empty, 1,
+ "do not delete unless queue is empty", NULL
+ },
+ POPT_AUTOHELP
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ };
- process_all_options(argc, argv, options);
+ process_all_options(argc, argv, options);
- if (queue == NULL || *queue == '\0') {
- fprintf(stderr, "queue name not specified\n");
- return 1;
- }
+ if (queue == NULL || *queue == '\0') {
+ fprintf(stderr, "queue name not specified\n");
+ return 1;
+ }
- conn = make_connection();
- {
- amqp_queue_delete_ok_t *reply = amqp_queue_delete(conn, 1,
- cstring_bytes(queue),
- if_unused,
- if_empty);
- if (reply == NULL) {
- die_rpc(amqp_get_rpc_reply(conn), "queue.delete");
- }
- printf("%u\n", reply->message_count);
- }
- close_connection(conn);
- return 0;
+ conn = make_connection();
+ {
+ amqp_queue_delete_ok_t *reply = amqp_queue_delete(conn, 1,
+ cstring_bytes(queue),
+ if_unused,
+ if_empty);
+ if (reply == NULL) {
+ die_rpc(amqp_get_rpc_reply(conn), "queue.delete");
+ }
+ printf("%u\n", reply->message_count);
+ }
+ close_connection(conn);
+ return 0;
}
diff --git a/tools/get.c b/tools/get.c
index 888e069..bbcde72 100644
--- a/tools/get.c
+++ b/tools/get.c
@@ -41,40 +41,43 @@
static int do_get(amqp_connection_state_t conn, char *queue)
{
- amqp_rpc_reply_t r
- = amqp_basic_get(conn, 1, cstring_bytes(queue), 1);
- die_rpc(r, "basic.get");
+ amqp_rpc_reply_t r
+ = amqp_basic_get(conn, 1, cstring_bytes(queue), 1);
+ die_rpc(r, "basic.get");
- if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD)
- return 0;
+ if (r.reply.id == AMQP_BASIC_GET_EMPTY_METHOD) {
+ return 0;
+ }
- copy_body(conn, 1);
- return 1;
+ copy_body(conn, 1);
+ return 1;
}
int main(int argc, const char **argv)
{
- amqp_connection_state_t conn;
- char *queue = NULL;
- int got_something;
+ amqp_connection_state_t conn;
+ char *queue = NULL;
+ int got_something;
- struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {"queue", 'q', POPT_ARG_STRING, &queue, 0,
- "the queue to consume from", "queue"},
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {
+ "queue", 'q', POPT_ARG_STRING, &queue, 0,
+ "the queue to consume from", "queue"
+ },
+ POPT_AUTOHELP
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ };
- process_all_options(argc, argv, options);
+ process_all_options(argc, argv, options);
- if (!queue) {
- fprintf(stderr, "queue not specified\n");
- return 1;
- }
+ if (!queue) {
+ fprintf(stderr, "queue not specified\n");
+ return 1;
+ }
- conn = make_connection();
- got_something = do_get(conn, queue);
- close_connection(conn);
- return got_something ? 0 : 2;
+ conn = make_connection();
+ got_something = do_get(conn, queue);
+ close_connection(conn);
+ return got_something ? 0 : 2;
}
diff --git a/tools/publish.c b/tools/publish.c
index 5ebc191..617b055 100644
--- a/tools/publish.c
+++ b/tools/publish.c
@@ -43,80 +43,94 @@
static void do_publish(amqp_connection_state_t conn,
char *exchange, char *routing_key,
- amqp_basic_properties_t *props, amqp_bytes_t body)
+ amqp_basic_properties_t *props, amqp_bytes_t body)
{
- int res = amqp_basic_publish(conn, 1,
- cstring_bytes(exchange),
- cstring_bytes(routing_key),
- 0, 0, props, body);
- die_amqp_error(res, "basic.publish");
+ int res = amqp_basic_publish(conn, 1,
+ cstring_bytes(exchange),
+ cstring_bytes(routing_key),
+ 0, 0, props, body);
+ die_amqp_error(res, "basic.publish");
}
int main(int argc, const char **argv)
{
- amqp_connection_state_t conn;
- char *exchange = NULL;
- char *routing_key = NULL;
- char *content_type = NULL;
- char *content_encoding = NULL;
- char *body = NULL;
- amqp_basic_properties_t props;
- amqp_bytes_t body_bytes;
- int delivery = 1; /* non-persistent by default */
-
- struct poptOption options[] = {
- INCLUDE_OPTIONS(connect_options),
- {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
- "the exchange to publish to", "exchange"},
- {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
- "the routing key to publish with", "routing key"},
- {"persistent", 'p', POPT_ARG_VAL, &delivery, 2,
- "use the persistent delivery mode", NULL},
- {"content-type", 'C', POPT_ARG_STRING, &content_type, 0,
- "the content-type for the message", "content type"},
- {"content-encoding", 'E', POPT_ARG_STRING,
- &content_encoding, 0,
- "the content-encoding for the message", "content encoding"},
- {"body", 'b', POPT_ARG_STRING, &body, 0,
- "specify the message body", "body"},
- POPT_AUTOHELP
- { NULL, '\0', 0, NULL, 0, NULL, NULL }
- };
-
- process_all_options(argc, argv, options);
-
- if (!exchange && !routing_key) {
- fprintf(stderr,
- "neither exchange nor routing key specified\n");
- return 1;
- }
-
- memset(&props, 0, sizeof props);
- props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
- props.delivery_mode = 2; /* persistent delivery mode */
-
- if (content_type) {
- props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
- props.content_type = amqp_cstring_bytes(content_type);
- }
-
- if (content_encoding) {
- props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
- props.content_encoding = amqp_cstring_bytes(content_encoding);
- }
-
- conn = make_connection();
-
- if (body)
- body_bytes = amqp_cstring_bytes(body);
- else
- body_bytes = read_all(0);
-
- do_publish(conn, exchange, routing_key, &props, body_bytes);
-
- if (!body)
- free(body_bytes.bytes);
-
- close_connection(conn);
- return 0;
+ amqp_connection_state_t conn;
+ char *exchange = NULL;
+ char *routing_key = NULL;
+ char *content_type = NULL;
+ char *content_encoding = NULL;
+ char *body = NULL;
+ amqp_basic_properties_t props;
+ amqp_bytes_t body_bytes;
+ int delivery = 1; /* non-persistent by default */
+
+ struct poptOption options[] = {
+ INCLUDE_OPTIONS(connect_options),
+ {
+ "exchange", 'e', POPT_ARG_STRING, &exchange, 0,
+ "the exchange to publish to", "exchange"
+ },
+ {
+ "routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
+ "the routing key to publish with", "routing key"
+ },
+ {
+ "persistent", 'p', POPT_ARG_VAL, &delivery, 2,
+ "use the persistent delivery mode", NULL
+ },
+ {
+ "content-type", 'C', POPT_ARG_STRING, &content_type, 0,
+ "the content-type for the message", "content type"
+ },
+ {
+ "content-encoding", 'E', POPT_ARG_STRING,
+ &content_encoding, 0,
+ "the content-encoding for the message", "content encoding"
+ },
+ {
+ "body", 'b', POPT_ARG_STRING, &body, 0,
+ "specify the message body", "body"
+ },
+ POPT_AUTOHELP
+ { NULL, '\0', 0, NULL, 0, NULL, NULL }
+ };
+
+ process_all_options(argc, argv, options);
+
+ if (!exchange && !routing_key) {
+ fprintf(stderr,
+ "neither exchange nor routing key specified\n");
+ return 1;
+ }
+
+ memset(&props, 0, sizeof props);
+ props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
+ props.delivery_mode = 2; /* persistent delivery mode */
+
+ if (content_type) {
+ props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
+ props.content_type = amqp_cstring_bytes(content_type);
+ }
+
+ if (content_encoding) {
+ props._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
+ props.content_encoding = amqp_cstring_bytes(content_encoding);
+ }
+
+ conn = make_connection();
+
+ if (body) {
+ body_bytes = amqp_cstring_bytes(body);
+ } else {
+ body_bytes = read_all(0);
+ }
+
+ do_publish(conn, exchange, routing_key, &props, body_bytes);
+
+ if (!body) {
+ free(body_bytes.bytes);
+ }
+
+ close_connection(conn);
+ return 0;
}
diff --git a/tools/unix/process.c b/tools/unix/process.c
index a714928..249fba3 100644
--- a/tools/unix/process.c
+++ b/tools/unix/process.c
@@ -47,41 +47,45 @@ extern char **environ;
void pipeline(const char *const *argv, struct pipeline *pl)
{
- posix_spawn_file_actions_t file_acts;
+ posix_spawn_file_actions_t file_acts;
- int pipefds[2];
- if (pipe(pipefds))
- die_errno(errno, "pipe");
+ int pipefds[2];
+ if (pipe(pipefds)) {
+ die_errno(errno, "pipe");
+ }
- die_errno(posix_spawn_file_actions_init(&file_acts),
- "posix_spawn_file_actions_init");
- die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
- "posix_spawn_file_actions_adddup2");
- die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
- "posix_spawn_file_actions_addclose");
- die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
- "posix_spawn_file_actions_addclose");
+ die_errno(posix_spawn_file_actions_init(&file_acts),
+ "posix_spawn_file_actions_init");
+ die_errno(posix_spawn_file_actions_adddup2(&file_acts, pipefds[0], 0),
+ "posix_spawn_file_actions_adddup2");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[0]),
+ "posix_spawn_file_actions_addclose");
+ die_errno(posix_spawn_file_actions_addclose(&file_acts, pipefds[1]),
+ "posix_spawn_file_actions_addclose");
- die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
- (char * const *)argv, environ),
- "posix_spawnp: %s", argv[0]);
+ die_errno(posix_spawnp(&pl->pid, argv[0], &file_acts, NULL,
+ (char * const *)argv, environ),
+ "posix_spawnp: %s", argv[0]);
- die_errno(posix_spawn_file_actions_destroy(&file_acts),
- "posix_spawn_file_actions_destroy");
+ die_errno(posix_spawn_file_actions_destroy(&file_acts),
+ "posix_spawn_file_actions_destroy");
- if (close(pipefds[0]))
- die_errno(errno, "close");
+ if (close(pipefds[0])) {
+ die_errno(errno, "close");
+ }
- pl->infd = pipefds[1];
+ pl->infd = pipefds[1];
}
int finish_pipeline(struct pipeline *pl)
{
- int status;
+ int status;
- if (close(pl->infd))
- die_errno(errno, "close");
- if (waitpid(pl->pid, &status, 0) < 0)
- die_errno(errno, "waitpid");
- return WIFEXITED(status) && WEXITSTATUS(status) == 0;
+ if (close(pl->infd)) {
+ die_errno(errno, "close");
+ }
+ if (waitpid(pl->pid, &status, 0) < 0) {
+ die_errno(errno, "waitpid");
+ }
+ return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
diff --git a/tools/unix/process.h b/tools/unix/process.h
index 0aad292..a211f27 100644
--- a/tools/unix/process.h
+++ b/tools/unix/process.h
@@ -32,8 +32,8 @@
*/
struct pipeline {
- int pid;
- int infd;
+ int pid;
+ int infd;
};
extern void pipeline(const char *const *argv, struct pipeline *pl);
diff --git a/tools/win32/compat.c b/tools/win32/compat.c
index cbac8e6..7c7d97e 100644
--- a/tools/win32/compat.c
+++ b/tools/win32/compat.c
@@ -43,21 +43,22 @@
int asprintf(char **strp, const char *fmt, ...)
{
- va_list ap;
- int len;
+ va_list ap;
+ int len;
- va_start(ap, fmt);
- len = _vscprintf(fmt, ap);
- va_end(ap);
+ va_start(ap, fmt);
+ len = _vscprintf(fmt, ap);
+ va_end(ap);
- *strp = malloc(len+1);
- if (!*strp)
- return -1;
+ *strp = malloc(len+1);
+ if (!*strp) {
+ return -1;
+ }
- va_start(ap, fmt);
- _vsnprintf(*strp, len+1, fmt, ap);
- va_end(ap);
+ va_start(ap, fmt);
+ _vsnprintf(*strp, len+1, fmt, ap);
+ va_end(ap);
- (*strp)[len] = 0;
- return len;
+ (*strp)[len] = 0;
+ return len;
}
diff --git a/tools/win32/process.c b/tools/win32/process.c
index 699f60f..4d5270b 100644
--- a/tools/win32/process.c
+++ b/tools/win32/process.c
@@ -44,171 +44,188 @@
void die_windows_error(const char *fmt, ...)
{
- char *msg;
-
- va_list ap;
- va_start(ap, fmt);
- vfprintf(stderr, fmt, ap);
- va_end(ap);
-
- if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM
- | FORMAT_MESSAGE_ALLOCATE_BUFFER,
- NULL, GetLastError(),
- MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
- (LPSTR)&msg, 0, NULL))
- msg = "(failed to retrieve Windows error message)";
-
- fprintf(stderr, ": %s\n", msg);
- exit(1);
+ char *msg;
+
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+
+ if (!FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_ALLOCATE_BUFFER,
+ NULL, GetLastError(),
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPSTR)&msg, 0, NULL)) {
+ msg = "(failed to retrieve Windows error message)";
+ }
+
+ fprintf(stderr, ": %s\n", msg);
+ exit(1);
}
static char *make_command_line(const char *const *argv)
{
- int i;
- size_t len = 1; /* initial quotes */
- char *buf;
- char *dest;
-
- /* calculate the length of the required buffer, making worst
- case assumptions for simplicity */
- for (i = 0;;) {
- /* each character could need escaping */
- len += strlen(argv[i]) * 2;
-
- if (!argv[++i])
- break;
-
- len += 3; /* quotes, space, quotes */
- }
-
- len += 2; /* final quotes and the terminating zero */
-
- dest = buf = malloc(len);
- if (!buf)
- die("allocating memory for subprocess command line");
-
- /* Here we perform the inverse of the CommandLineToArgvW
- function. Note that its rules are slightly crazy: A
- sequence of backslashes only act to escape if followed by
- double quotes. A sequence of backslashes not followed by
- double quotes is untouched. */
-
- for (i = 0;;) {
- const char *src = argv[i];
- int backslashes = 0;
-
- *dest++ = '\"';
-
- for (;;) {
- switch (*src) {
- case 0:
- goto done;
-
- case '\"':
- for (; backslashes; backslashes--)
- *dest++ = '\\';
-
- *dest++ = '\\';
- *dest++ = '\"';
- break;
-
- case '\\':
- backslashes++;
- *dest++ = '\\';
- break;
-
- default:
- backslashes = 0;
- *dest++ = *src;
- break;
- }
-
- src++;
- }
- done:
- for (; backslashes; backslashes--)
- *dest++ = '\\';
-
- *dest++ = '\"';
-
- if (!argv[++i])
- break;
-
- *dest++ = ' ';
- }
-
- *dest++ = 0;
- return buf;
+ int i;
+ size_t len = 1; /* initial quotes */
+ char *buf;
+ char *dest;
+
+ /* calculate the length of the required buffer, making worst
+ case assumptions for simplicity */
+ for (i = 0;;) {
+ /* each character could need escaping */
+ len += strlen(argv[i]) * 2;
+
+ if (!argv[++i]) {
+ break;
+ }
+
+ len += 3; /* quotes, space, quotes */
+ }
+
+ len += 2; /* final quotes and the terminating zero */
+
+ dest = buf = malloc(len);
+ if (!buf) {
+ die("allocating memory for subprocess command line");
+ }
+
+ /* Here we perform the inverse of the CommandLineToArgvW
+ function. Note that its rules are slightly crazy: A
+ sequence of backslashes only act to escape if followed by
+ double quotes. A sequence of backslashes not followed by
+ double quotes is untouched. */
+
+ for (i = 0;;) {
+ const char *src = argv[i];
+ int backslashes = 0;
+
+ *dest++ = '\"';
+
+ for (;;) {
+ switch (*src) {
+ case 0:
+ goto done;
+
+ case '\"':
+ for (; backslashes; backslashes--) {
+ *dest++ = '\\';
+ }
+
+ *dest++ = '\\';
+ *dest++ = '\"';
+ break;
+
+ case '\\':
+ backslashes++;
+ *dest++ = '\\';
+ break;
+
+ default:
+ backslashes = 0;
+ *dest++ = *src;
+ break;
+ }
+
+ src++;
+ }
+done:
+ for (; backslashes; backslashes--) {
+ *dest++ = '\\';
+ }
+
+ *dest++ = '\"';
+
+ if (!argv[++i]) {
+ break;
+ }
+
+ *dest++ = ' ';
+ }
+
+ *dest++ = 0;
+ return buf;
}
void pipeline(const char *const *argv, struct pipeline *pl)
{
- HANDLE in_read_handle, in_write_handle;
- SECURITY_ATTRIBUTES sec_attr;
- PROCESS_INFORMATION proc_info;
- STARTUPINFO start_info;
- char *cmdline = make_command_line(argv);
-
- sec_attr.nLength = sizeof sec_attr;
- sec_attr.bInheritHandle = TRUE;
- sec_attr.lpSecurityDescriptor = NULL;
-
- if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0))
- die_windows_error("CreatePipe");
-
- if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0))
- die_windows_error("SetHandleInformation");
-
- /* when in Rome... */
- ZeroMemory(&proc_info, sizeof proc_info);
- ZeroMemory(&start_info, sizeof start_info);
-
- start_info.cb = sizeof start_info;
- start_info.dwFlags |= STARTF_USESTDHANDLES;
-
- if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE))
- == INVALID_HANDLE_VALUE
- || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE))
- == INVALID_HANDLE_VALUE)
- die_windows_error("GetStdHandle");
-
- start_info.hStdInput = in_read_handle;
-
- if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0,
- NULL, NULL, &start_info, &proc_info))
- die_windows_error("CreateProcess");
-
- free(cmdline);
-
- if (!CloseHandle(proc_info.hThread))
- die_windows_error("CloseHandle for thread");
- if (!CloseHandle(in_read_handle))
- die_windows_error("CloseHandle");
-
- pl->proc_handle = proc_info.hProcess;
- pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0);
+ HANDLE in_read_handle, in_write_handle;
+ SECURITY_ATTRIBUTES sec_attr;
+ PROCESS_INFORMATION proc_info;
+ STARTUPINFO start_info;
+ char *cmdline = make_command_line(argv);
+
+ sec_attr.nLength = sizeof sec_attr;
+ sec_attr.bInheritHandle = TRUE;
+ sec_attr.lpSecurityDescriptor = NULL;
+
+ if (!CreatePipe(&in_read_handle, &in_write_handle, &sec_attr, 0)) {
+ die_windows_error("CreatePipe");
+ }
+
+ if (!SetHandleInformation(in_write_handle, HANDLE_FLAG_INHERIT, 0)) {
+ die_windows_error("SetHandleInformation");
+ }
+
+ /* when in Rome... */
+ ZeroMemory(&proc_info, sizeof proc_info);
+ ZeroMemory(&start_info, sizeof start_info);
+
+ start_info.cb = sizeof start_info;
+ start_info.dwFlags |= STARTF_USESTDHANDLES;
+
+ if ((start_info.hStdError = GetStdHandle(STD_ERROR_HANDLE))
+ == INVALID_HANDLE_VALUE
+ || (start_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE))
+ == INVALID_HANDLE_VALUE) {
+ die_windows_error("GetStdHandle");
+ }
+
+ start_info.hStdInput = in_read_handle;
+
+ if (!CreateProcess(NULL, cmdline, NULL, NULL, TRUE, 0,
+ NULL, NULL, &start_info, &proc_info)) {
+ die_windows_error("CreateProcess");
+ }
+
+ free(cmdline);
+
+ if (!CloseHandle(proc_info.hThread)) {
+ die_windows_error("CloseHandle for thread");
+ }
+ if (!CloseHandle(in_read_handle)) {
+ die_windows_error("CloseHandle");
+ }
+
+ pl->proc_handle = proc_info.hProcess;
+ pl->infd = _open_osfhandle((intptr_t)in_write_handle, 0);
}
int finish_pipeline(struct pipeline *pl)
{
- DWORD code;
-
- if (close(pl->infd))
- die_errno(errno, "close");
-
- for (;;) {
- if (!GetExitCodeProcess(pl->proc_handle, &code))
- die_windows_error("GetExitCodeProcess");
- if (code != STILL_ACTIVE)
- break;
-
- if (WaitForSingleObject(pl->proc_handle, INFINITE)
- == WAIT_FAILED)
- die_windows_error("WaitForSingleObject");
- }
-
- if (!CloseHandle(pl->proc_handle))
- die_windows_error("CloseHandle for process");
-
- return code;
+ DWORD code;
+
+ if (close(pl->infd)) {
+ die_errno(errno, "close");
+ }
+
+ for (;;) {
+ if (!GetExitCodeProcess(pl->proc_handle, &code)) {
+ die_windows_error("GetExitCodeProcess");
+ }
+ if (code != STILL_ACTIVE) {
+ break;
+ }
+
+ if (WaitForSingleObject(pl->proc_handle, INFINITE)
+ == WAIT_FAILED) {
+ die_windows_error("WaitForSingleObject");
+ }
+ }
+
+ if (!CloseHandle(pl->proc_handle)) {
+ die_windows_error("CloseHandle for process");
+ }
+
+ return code;
}
diff --git a/tools/win32/process.h b/tools/win32/process.h
index 1429ad2..88f2c7a 100644
--- a/tools/win32/process.h
+++ b/tools/win32/process.h
@@ -34,8 +34,8 @@
#include <windef.h>
struct pipeline {
- HANDLE proc_handle;
- int infd;
+ HANDLE proc_handle;
+ int infd;
};
extern void pipeline(const char *const *argv, struct pipeline *pl);