summaryrefslogtreecommitdiff
path: root/examples/amqps_consumer.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/amqps_consumer.c')
-rw-r--r--examples/amqps_consumer.c99
1 files changed, 59 insertions, 40 deletions
diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c
index fff6677..d4cd294 100644
--- a/examples/amqps_consumer.c
+++ b/examples/amqps_consumer.c
@@ -60,13 +60,13 @@ static void run(amqp_connection_state_t conn)
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
amqp_frame_t frame;
- int result;
- size_t body_received;
- size_t body_target;
uint64_t now;
while (1) {
+ amqp_rpc_reply_t ret;
+ amqp_envelope_t envelope;
+
now = now_microseconds();
if (now > next_summary_time) {
int countOverInterval = received - previous_received;
@@ -80,45 +80,64 @@ static void run(amqp_connection_state_t conn)
}
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
-
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
- }
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
+ ret = amqp_consume_message(conn, &envelope, NULL, 0);
+
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&
+ AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {
+ if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
+ return;
+ }
+
+ if (AMQP_FRAME_METHOD == frame.frame_type) {
+ switch (frame.payload.method.id) {
+ case AMQP_BASIC_ACK_METHOD:
+ /* if we've turned publisher confirms on, and we've published a message
+ * here is a message being confirmed
+ */
+
+ break;
+ case AMQP_BASIC_RETURN_METHOD:
+ /* if a published message couldn't be routed and the mandatory flag was set
+ * this is what would be returned. The message then needs to be read.
+ */
+ {
+ amqp_message_t message;
+ ret = amqp_read_message(conn, frame.channel, &message, 0);
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ return;
+ }
+
+ amqp_destroy_message(&message);
+ }
+
+ break;
+
+ case AMQP_CHANNEL_CLOSE_METHOD:
+ /* a channel.close method happens when a channel exception occurs, this
+ * can happen by publishing to an exchange that doesn't exist for example
+ *
+ * In this case you would need to open another channel redeclare any queues
+ * that were declared auto-delete, and restart any consumers that were attached
+ * to the previous channel
+ */
+ return;
+
+ case AMQP_CONNECTION_CLOSE_METHOD:
+ /* a connection.close method happens when a connection exception occurs,
+ * this can happen by trying to use a channel that isn't open for example.
+ *
+ * In this case the whole connection must be restarted.
+ */
+ return;
+
+ default:
+ fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
+ return;
+ }
+ }
}
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
}
received++;