diff options
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 |
1 files changed, 19 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fb60a043..0530d650 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -644,25 +644,26 @@ remove_consumers(ChPid, Queue, QName) -> possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of - not_found -> + not_found -> State; + C -> possibly_unblock(State, Update(C)) + end. + +possibly_unblock(State, C = #cr{limiter = Limiter}) -> + IsChBlocked = is_ch_blocked(C), + case lists:partition( + fun({_ChPid, #consumer{tag = CTag}}) -> + IsChBlocked orelse + rabbit_limiter:is_consumer_blocked(Limiter, CTag) + end, queue:to_list(C#cr.blocked_consumers)) of + {_, []} -> + update_ch_record(C), State; - C -> - C1 = #cr{limiter = Limiter} = Update(C), - {Blocked, Unblocked} = - lists:partition( - fun({_ChPid, #consumer{tag = CTag}}) -> - is_ch_blocked(C1) orelse - rabbit_limiter:is_consumer_blocked(Limiter, CTag) - end, queue:to_list(C1#cr.blocked_consumers)), - case Unblocked of - [] -> update_ch_record(C1), - State; - _ -> update_ch_record( - C1#cr{blocked_consumers = queue:from_list(Blocked)}), - AC1 = queue:join(State#q.active_consumers, - queue:from_list(Unblocked)), - run_message_queue(State#q{active_consumers = AC1}) - end + {Blocked, Unblocked} -> + BlockedQ = queue:from_list(Blocked), + UnblockedQ = queue:from_list(Unblocked), + update_ch_record(C#cr{blocked_consumers = BlockedQ}), + AC1 = queue:join(State#q.active_consumers, UnblockedQ), + run_message_queue(State#q{active_consumers = AC1}) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; |