summaryrefslogtreecommitdiff
path: root/examples/amqps_producer.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/amqps_producer.c')
-rw-r--r--examples/amqps_producer.c42
1 files changed, 20 insertions, 22 deletions
diff --git a/examples/amqps_producer.c b/examples/amqps_producer.c
index 66bd181..120c21b 100644
--- a/examples/amqps_producer.c
+++ b/examples/amqps_producer.c
@@ -37,8 +37,8 @@
*/
#include <stdint.h>
-#include <stdlib.h>
#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
#include <amqp.h>
@@ -48,11 +48,8 @@
#define SUMMARY_EVERY_US 1000000
-static void send_batch(amqp_connection_state_t conn,
- char const *queue_name,
- int rate_limit,
- int message_count)
-{
+static void send_batch(amqp_connection_state_t conn, char const *queue_name,
+ int rate_limit, int message_count) {
uint64_t start_time = now_microseconds();
int i;
int sent = 0;
@@ -73,21 +70,18 @@ static void send_batch(amqp_connection_state_t conn,
for (i = 0; i < message_count; i++) {
uint64_t now = now_microseconds();
- die_on_error(amqp_basic_publish(conn,
- 1,
- amqp_cstring_bytes("amq.direct"),
- amqp_cstring_bytes(queue_name),
- 0,
- 0,
- NULL,
+ die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes("amq.direct"),
+ amqp_cstring_bytes(queue_name), 0, 0, NULL,
message_bytes),
"Publishing");
sent++;
if (now > next_summary_time) {
int countOverInterval = sent - previous_sent;
- double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
+ double intervalRate =
+ countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
- (int)(now - start_time) / 1000, sent, countOverInterval, (int) intervalRate);
+ (int)(now - start_time) / 1000, sent, countOverInterval,
+ (int)intervalRate);
previous_sent = sent;
previous_report_time = now;
@@ -106,12 +100,12 @@ static void send_batch(amqp_connection_state_t conn,
printf("PRODUCER - Message count: %d\n", message_count);
printf("Total time, milliseconds: %d\n", total_delta / 1000);
- printf("Overall messages-per-second: %g\n", (message_count / (total_delta / 1000000.0)));
+ printf("Overall messages-per-second: %g\n",
+ (message_count / (total_delta / 1000000.0)));
}
}
-int main(int argc, char const *const *argv)
-{
+int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
int rate_limit;
@@ -120,7 +114,8 @@ int main(int argc, char const *const *argv)
amqp_connection_state_t conn;
if (argc < 5) {
- fprintf(stderr, "Usage: amqps_producer host port rate_limit message_count "
+ fprintf(stderr,
+ "Usage: amqps_producer host port rate_limit message_count "
"[cacert.pem [verifypeer] [verifyhostname] [key.pem cert.pem]]\n");
return 1;
}
@@ -168,15 +163,18 @@ int main(int argc, char const *const *argv)
die("opening SSL/TLS connection");
}
- die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
+ "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
send_batch(conn, "test queue", rate_limit, message_count);
- die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
- die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
+ "Closing channel");
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
+ "Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}