diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-05-24 11:37:15 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-05-24 11:37:15 +0100 |
commit | 89b3be6ef4138452869b82324c582a1aff72c6c8 (patch) | |
tree | a405512c6ca16f81c4aa6a2a4ee857b247f04b5a /src/rabbit_amqqueue_process.erl | |
parent | 2a245a183664fb656c54ce3f22e655dc790186d3 (diff) | |
download | rabbitmq-server-89b3be6ef4138452869b82324c582a1aff72c6c8.tar.gz |
Make sure we update the federation state when a consumer becomes inactive or is cancelled.
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2dc85d63..71dbb9de 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -440,12 +440,14 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), + notify_federation(State), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), + notify_federation(State), {false, State}; {continue, Limiter} -> AC1 = queue:in(E, State#q.active_consumers), @@ -523,15 +525,22 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State = #q{q = Q}) -> - {IsEmpty1, State1} = deliver_msgs_to_consumers( +run_message_queue(State) -> + {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, is_empty(State), State), - case IsEmpty1 andalso active_unfederated(State1#q.active_consumers) of + notify_federation(State1), + State1. + +notify_federation(#q{q = Q, + active_consumers = ActiveConsumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> + IsEmpty = BQ:is_empty(BQS), + case IsEmpty andalso active_unfederated(ActiveConsumers) of true -> rabbit_federation_queue:run(Q); false -> rabbit_federation_queue:stop(Q) - end, - State1. + end. active_unfederated(Cs) -> case queue:out(Cs) of @@ -1194,6 +1203,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, active_consumers = remove_consumer( ChPid, ConsumerTag, State#q.active_consumers)}, + notify_federation(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) |