diff options
Diffstat (limited to 'examples/amqp_consumer.c')
-rw-r--r-- | examples/amqp_consumer.c | 65 |
1 files changed, 36 insertions, 29 deletions
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c index 189f874..93c7a21 100644 --- a/examples/amqp_consumer.c +++ b/examples/amqp_consumer.c @@ -34,8 +34,8 @@ */ #include <stdint.h> -#include <stdlib.h> #include <stdio.h> +#include <stdlib.h> #include <string.h> #include <amqp.h> @@ -47,8 +47,7 @@ #define SUMMARY_EVERY_US 1000000 -static void run(amqp_connection_state_t conn) -{ +static void run(amqp_connection_state_t conn) { uint64_t start_time = now_microseconds(); int received = 0; int previous_received = 0; @@ -66,9 +65,11 @@ static void run(amqp_connection_state_t conn) now = now_microseconds(); if (now > next_summary_time) { int countOverInterval = received - previous_received; - double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0); + double intervalRate = + countOverInterval / ((now - previous_report_time) / 1000000.0); printf("%d ms: Received %d - %d since last report (%d Hz)\n", - (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate); + (int)(now - start_time) / 1000, received, countOverInterval, + (int)intervalRate); previous_received = received; previous_report_time = now; @@ -88,14 +89,14 @@ static void run(amqp_connection_state_t conn) if (AMQP_FRAME_METHOD == frame.frame_type) { switch (frame.payload.method.id) { case AMQP_BASIC_ACK_METHOD: - /* if we've turned publisher confirms on, and we've published a message - * here is a message being confirmed + /* if we've turned publisher confirms on, and we've published a + * message here is a message being confirmed. */ - break; case AMQP_BASIC_RETURN_METHOD: - /* if a published message couldn't be routed and the mandatory flag was set - * this is what would be returned. The message then needs to be read. + /* if a published message couldn't be routed and the mandatory + * flag was set this is what would be returned. The message then + * needs to be read. */ { amqp_message_t message; @@ -110,25 +111,28 @@ static void run(amqp_connection_state_t conn) break; case AMQP_CHANNEL_CLOSE_METHOD: - /* a channel.close method happens when a channel exception occurs, this - * can happen by publishing to an exchange that doesn't exist for example + /* a channel.close method happens when a channel exception occurs, + * this can happen by publishing to an exchange that doesn't exist + * for example. * - * In this case you would need to open another channel redeclare any queues - * that were declared auto-delete, and restart any consumers that were attached - * to the previous channel + * In this case you would need to open another channel redeclare + * any queues that were declared auto-delete, and restart any + * consumers that were attached to the previous channel. */ return; case AMQP_CONNECTION_CLOSE_METHOD: - /* a connection.close method happens when a connection exception occurs, - * this can happen by trying to use a channel that isn't open for example. + /* a connection.close method happens when a connection exception + * occurs, this can happen by trying to use a channel that isn't + * open for example. * * In this case the whole connection must be restarted. */ return; default: - fprintf(stderr ,"An unexpected method was received %u\n", frame.payload.method.id); + fprintf(stderr, "An unexpected method was received %u\n", + frame.payload.method.id); return; } } @@ -142,8 +146,7 @@ static void run(amqp_connection_state_t conn) } } -int main(int argc, char const *const *argv) -{ +int main(int argc, char const *const *argv) { char const *hostname; int port, status; char const *exchange; @@ -160,7 +163,7 @@ int main(int argc, char const *const *argv) hostname = argv[1]; port = atoi(argv[2]); - exchange = "amq.direct"; /* argv[3]; */ + exchange = "amq.direct"; /* argv[3]; */ bindingkey = "test queue"; /* argv[4]; */ conn = amqp_new_connection(); @@ -175,14 +178,15 @@ int main(int argc, char const *const *argv) die("opening TCP socket"); } - die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, + "guest", "guest"), "Logging in"); amqp_channel_open(conn, 1); die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); { - amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, - amqp_empty_table); + amqp_queue_declare_ok_t *r = amqp_queue_declare( + conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); queuename = amqp_bytes_malloc_dup(r->queue); if (queuename.bytes == NULL) { @@ -191,17 +195,20 @@ int main(int argc, char const *const *argv) } } - amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), - amqp_empty_table); + amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), + amqp_cstring_bytes(bindingkey), amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); - amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, + amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); run(conn); - die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); - die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), + "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), + "Closing connection"); die_on_error(amqp_destroy_connection(conn), "Ending connection"); return 0; |