summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-20 19:42:27 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-20 19:42:27 +0000
commitd011068eabb9861e49e3c1323a9bf1a8465a2b30 (patch)
treeb83545284379a02610370ccc06b8a412701bc41d
parentadf8c37c62b490fb42db906a318fe957ac39d299 (diff)
downloadrabbitmq-server-d011068eabb9861e49e3c1323a9bf1a8465a2b30.tar.gz
refactor possibly_unblock
-rw-r--r--src/rabbit_amqqueue_process.erl37
1 files changed, 19 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fb60a043..0530d650 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -644,25 +644,26 @@ remove_consumers(ChPid, Queue, QName) ->
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
- not_found ->
+ not_found -> State;
+ C -> possibly_unblock(State, Update(C))
+ end.
+
+possibly_unblock(State, C = #cr{limiter = Limiter}) ->
+ IsChBlocked = is_ch_blocked(C),
+ case lists:partition(
+ fun({_ChPid, #consumer{tag = CTag}}) ->
+ IsChBlocked orelse
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
+ end, queue:to_list(C#cr.blocked_consumers)) of
+ {_, []} ->
+ update_ch_record(C),
State;
- C ->
- C1 = #cr{limiter = Limiter} = Update(C),
- {Blocked, Unblocked} =
- lists:partition(
- fun({_ChPid, #consumer{tag = CTag}}) ->
- is_ch_blocked(C1) orelse
- rabbit_limiter:is_consumer_blocked(Limiter, CTag)
- end, queue:to_list(C1#cr.blocked_consumers)),
- case Unblocked of
- [] -> update_ch_record(C1),
- State;
- _ -> update_ch_record(
- C1#cr{blocked_consumers = queue:from_list(Blocked)}),
- AC1 = queue:join(State#q.active_consumers,
- queue:from_list(Unblocked)),
- run_message_queue(State#q{active_consumers = AC1})
- end
+ {Blocked, Unblocked} ->
+ BlockedQ = queue:from_list(Blocked),
+ UnblockedQ = queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ}),
+ AC1 = queue:join(State#q.active_consumers, UnblockedQ),
+ run_message_queue(State#q{active_consumers = AC1})
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;