From 89b3be6ef4138452869b82324c582a1aff72c6c8 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 24 May 2013 11:37:15 +0100 Subject: Make sure we update the federation state when a consumer becomes inactive or is cancelled. --- src/rabbit_amqqueue_process.erl | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) (limited to 'src/rabbit_amqqueue_process.erl') diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2dc85d63..71dbb9de 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -440,12 +440,14 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), + notify_federation(State), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), + notify_federation(State), {false, State}; {continue, Limiter} -> AC1 = queue:in(E, State#q.active_consumers), @@ -523,15 +525,22 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State = #q{q = Q}) -> - {IsEmpty1, State1} = deliver_msgs_to_consumers( +run_message_queue(State) -> + {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, is_empty(State), State), - case IsEmpty1 andalso active_unfederated(State1#q.active_consumers) of + notify_federation(State1), + State1. + +notify_federation(#q{q = Q, + active_consumers = ActiveConsumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> + IsEmpty = BQ:is_empty(BQS), + case IsEmpty andalso active_unfederated(ActiveConsumers) of true -> rabbit_federation_queue:run(Q); false -> rabbit_federation_queue:stop(Q) - end, - State1. + end. active_unfederated(Cs) -> case queue:out(Cs) of @@ -1194,6 +1203,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, active_consumers = remove_consumer( ChPid, ConsumerTag, State#q.active_consumers)}, + notify_federation(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) -- cgit v1.2.1