summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-08-20 10:45:30 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-08-20 10:45:30 -0700
commite7626673f7834aa59858347196c5f549759ccec3 (patch)
tree861465c7c5c4117af6423dbea41b662bac06eeb5
parent5dc23cc52a147d3a9db3fced2a3ab5019fb1c0c3 (diff)
downloadrabbitmq-c-double_ack_example.tar.gz
Example of double-ack'ing a messagedouble_ack_example
-rw-r--r--examples/amqp_consumer.c32
1 files changed, 30 insertions, 2 deletions
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index 62da0ca..09b5486 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -64,6 +64,7 @@ static void run(amqp_connection_state_t conn)
while (1) {
amqp_rpc_reply_t ret;
amqp_envelope_t envelope;
+ int res;
now = now_microseconds();
if (now > next_summary_time) {
@@ -119,6 +120,13 @@ static void run(amqp_connection_state_t conn)
* that were declared auto-delete, and restart any consumers that were attached
* to the previous channel
*/
+ {
+ amqp_channel_close_t *channel_closed = (amqp_channel_close_t*)frame.payload.method.decoded;
+
+ fprintf(stderr, "Channel closed: Reason: %.*s Method: %s\n",
+ (int)channel_closed->reply_text.len, (char*)channel_closed->reply_text.bytes,
+ amqp_method_name(channel_closed->class_id << 16 | channel_closed->method_id));
+ }
return;
case AMQP_CONNECTION_CLOSE_METHOD:
@@ -131,15 +139,35 @@ static void run(amqp_connection_state_t conn)
default:
fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
- return;
+ goto err_out;
}
}
}
}
+ res = amqp_basic_ack(conn, envelope.channel, envelope.delivery_tag, 0);
+ if (AMQP_STATUS_OK != res) {
+ fprintf(stderr, "Failed: %s\n", amqp_error_string2(res));
+ goto err_out;
+ }
+
+ res = amqp_basic_ack(conn, envelope.channel, envelope.delivery_tag, 0);
+ if (AMQP_STATUS_OK != res) {
+ fprintf(stderr, "Failed: %s\n", amqp_error_string2(res));
+ goto err_out;
+ }
+
+ amqp_destroy_envelope(&envelope);
+
received++;
+ continue;
+
+err_out:
+ amqp_destroy_envelope(&envelope);
+ break;
}
+
}
int main(int argc, char const *const *argv)
@@ -195,7 +223,7 @@ int main(int argc, char const *const *argv)
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
- amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
+ amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
run(conn);