summaryrefslogtreecommitdiff
path: root/examples/amqp_producer.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/amqp_producer.c')
-rw-r--r--examples/amqp_producer.c23
1 files changed, 13 insertions, 10 deletions
diff --git a/examples/amqp_producer.c b/examples/amqp_producer.c
index b83e030..77c3e92 100644
--- a/examples/amqp_producer.c
+++ b/examples/amqp_producer.c
@@ -56,9 +56,7 @@
#include <amqp.h>
#include <amqp_framing.h>
-#include <unistd.h>
-
-#include "example_utils.h"
+#include "utils.h"
#define SUMMARY_EVERY_US 1000000
@@ -67,21 +65,26 @@ static void send_batch(amqp_connection_state_t conn,
int rate_limit,
int message_count)
{
- long long start_time = now_microseconds();
+ uint64_t start_time = now_microseconds();
int i;
int sent = 0;
int previous_sent = 0;
- long long previous_report_time = start_time;
- long long next_summary_time = start_time + SUMMARY_EVERY_US;
+ uint64_t previous_report_time = start_time;
+ uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
char message[256];
+ amqp_bytes_t message_bytes;
for (i = 0; i < sizeof(message); i++) {
message[i] = i & 0xff;
}
+ message_bytes.len = sizeof(message);
+ message_bytes.bytes = message;
+
for (i = 0; i < message_count; i++) {
- long long now = now_microseconds();
+ uint64_t now = now_microseconds();
+
die_on_error(amqp_basic_publish(conn,
1,
amqp_cstring_bytes("amq.direct"),
@@ -89,7 +92,7 @@ static void send_batch(amqp_connection_state_t conn,
0,
0,
NULL,
- (amqp_bytes_t) {.len = sizeof(message), .bytes = message}),
+ message_bytes),
"Publishing");
sent++;
if (now > next_summary_time) {
@@ -104,13 +107,13 @@ static void send_batch(amqp_connection_state_t conn,
}
while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
- usleep(2000);
+ microsleep(2000);
now = now_microseconds();
}
}
{
- long long stop_time = now_microseconds();
+ uint64_t stop_time = now_microseconds();
int total_delta = stop_time - start_time;
printf("PRODUCER - Message count: %d\n", message_count);