summaryrefslogtreecommitdiff
path: root/examples/amqp_consumer.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/amqp_consumer.c')
-rw-r--r--examples/amqp_consumer.c21
1 files changed, 12 insertions, 9 deletions
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index a8ed9e3..b1754f5 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -51,7 +51,6 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
-#include <errno.h>
#include <stdint.h>
#include <amqp.h>
@@ -84,8 +83,8 @@ static void run(amqp_connection_state_t conn)
if (now > next_summary_time) {
int countOverInterval = received - previous_received;
double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);
- printf("%lld ms: Received %d - %d since last report (%d Hz)\n",
- (now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
+ printf("%d ms: Received %d - %d since last report (%d Hz)\n",
+ (int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);
previous_received = received;
previous_report_time = now;
@@ -94,7 +93,8 @@ static void run(amqp_connection_state_t conn)
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
- if (result <= 0) return;
+ if (result < 0)
+ return;
if (frame.frame_type != AMQP_FRAME_METHOD)
continue;
@@ -103,7 +103,9 @@ static void run(amqp_connection_state_t conn)
continue;
result = amqp_simple_wait_frame(conn, &frame);
- if (result <= 0) return;
+ if (result < 0)
+ return;
+
if (frame.frame_type != AMQP_FRAME_HEADER) {
fprintf(stderr, "Expected header!");
abort();
@@ -114,7 +116,8 @@ static void run(amqp_connection_state_t conn)
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
- if (result <= 0) return;
+ if (result < 0)
+ return;
if (frame.frame_type != AMQP_FRAME_BODY) {
fprintf(stderr, "Expected body!");
@@ -165,7 +168,8 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
- die_on_error(-ENOMEM, "Copying queue name");
+ fprintf(stderr, "Out of memory while copying queue name");
+ return 1;
}
}
@@ -180,8 +184,7 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
- amqp_destroy_connection(conn);
- die_on_error(close(sockfd), "Closing socket");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}