diff options
Diffstat (limited to 'examples/amqps_consumer.c')
-rw-r--r-- | examples/amqps_consumer.c | 99 |
1 files changed, 59 insertions, 40 deletions
diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c index fff6677..d4cd294 100644 --- a/examples/amqps_consumer.c +++ b/examples/amqps_consumer.c @@ -60,13 +60,13 @@ static void run(amqp_connection_state_t conn) uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; amqp_frame_t frame; - int result; - size_t body_received; - size_t body_target; uint64_t now; while (1) { + amqp_rpc_reply_t ret; + amqp_envelope_t envelope; + now = now_microseconds(); if (now > next_summary_time) { int countOverInterval = received - previous_received; @@ -80,45 +80,64 @@ static void run(amqp_connection_state_t conn) } amqp_maybe_release_buffers(conn); - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - return; - } - - if (frame.frame_type != AMQP_FRAME_METHOD) { - continue; - } - - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { - continue; - } - - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - return; - } - - if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); - } - - body_target = frame.payload.properties.body_size; - body_received = 0; - - while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - return; - } - - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); + ret = amqp_consume_message(conn, &envelope, NULL, 0); + + if (AMQP_RESPONSE_NORMAL != ret.reply_type) { + if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type && + AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) { + if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) { + return; + } + + if (AMQP_FRAME_METHOD == frame.frame_type) { + switch (frame.payload.method.id) { + case AMQP_BASIC_ACK_METHOD: + /* if we've turned publisher confirms on, and we've published a message + * here is a message being confirmed + */ + + break; + case AMQP_BASIC_RETURN_METHOD: + /* if a published message couldn't be routed and the mandatory flag was set + * this is what would be returned. The message then needs to be read. + */ + { + amqp_message_t message; + ret = amqp_read_message(conn, frame.channel, &message, 0); + if (AMQP_RESPONSE_NORMAL != ret.reply_type) { + return; + } + + amqp_destroy_message(&message); + } + + break; + + case AMQP_CHANNEL_CLOSE_METHOD: + /* a channel.close method happens when a channel exception occurs, this + * can happen by publishing to an exchange that doesn't exist for example + * + * In this case you would need to open another channel redeclare any queues + * that were declared auto-delete, and restart any consumers that were attached + * to the previous channel + */ + return; + + case AMQP_CONNECTION_CLOSE_METHOD: + /* a connection.close method happens when a connection exception occurs, + * this can happen by trying to use a channel that isn't open for example. + * + * In this case the whole connection must be restarted. + */ + return; + + default: + fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id); + return; + } + } } - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); } received++; |