diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-08-20 10:45:30 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-08-20 10:45:30 -0700 |
commit | e7626673f7834aa59858347196c5f549759ccec3 (patch) | |
tree | 861465c7c5c4117af6423dbea41b662bac06eeb5 | |
parent | 5dc23cc52a147d3a9db3fced2a3ab5019fb1c0c3 (diff) | |
download | rabbitmq-c-double_ack_example.tar.gz |
Example of double-ack'ing a messagedouble_ack_example
-rw-r--r-- | examples/amqp_consumer.c | 32 |
1 files changed, 30 insertions, 2 deletions
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c index 62da0ca..09b5486 100644 --- a/examples/amqp_consumer.c +++ b/examples/amqp_consumer.c @@ -64,6 +64,7 @@ static void run(amqp_connection_state_t conn) while (1) { amqp_rpc_reply_t ret; amqp_envelope_t envelope; + int res; now = now_microseconds(); if (now > next_summary_time) { @@ -119,6 +120,13 @@ static void run(amqp_connection_state_t conn) * that were declared auto-delete, and restart any consumers that were attached * to the previous channel */ + { + amqp_channel_close_t *channel_closed = (amqp_channel_close_t*)frame.payload.method.decoded; + + fprintf(stderr, "Channel closed: Reason: %.*s Method: %s\n", + (int)channel_closed->reply_text.len, (char*)channel_closed->reply_text.bytes, + amqp_method_name(channel_closed->class_id << 16 | channel_closed->method_id)); + } return; case AMQP_CONNECTION_CLOSE_METHOD: @@ -131,15 +139,35 @@ static void run(amqp_connection_state_t conn) default: fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id); - return; + goto err_out; } } } } + res = amqp_basic_ack(conn, envelope.channel, envelope.delivery_tag, 0); + if (AMQP_STATUS_OK != res) { + fprintf(stderr, "Failed: %s\n", amqp_error_string2(res)); + goto err_out; + } + + res = amqp_basic_ack(conn, envelope.channel, envelope.delivery_tag, 0); + if (AMQP_STATUS_OK != res) { + fprintf(stderr, "Failed: %s\n", amqp_error_string2(res)); + goto err_out; + } + + amqp_destroy_envelope(&envelope); + received++; + continue; + +err_out: + amqp_destroy_envelope(&envelope); + break; } + } int main(int argc, char const *const *argv) @@ -195,7 +223,7 @@ int main(int argc, char const *const *argv) amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue"); - amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0, amqp_empty_table); die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); run(conn); |