summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-03-21 17:01:54 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-03-21 17:01:54 +0000
commit42d7a9385fd422e618124666369adadc7b9ac430 (patch)
treea6b505dd224664bbbd366e9b8136a91595fa07f0
parentc5b7be1527eb01264b4037e18c70bb3589e5e5fc (diff)
downloadrabbitmq-server-42d7a9385fd422e618124666369adadc7b9ac430.tar.gz
re-introduce state-transition optimisation for possibly_unblock
-rw-r--r--src/rabbit_amqqueue_process.erl50
1 files changed, 27 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c6a8bf2f..e24568bb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -644,28 +644,28 @@ remove_consumers(ChPid, Queue, QName) ->
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
not_found -> State;
- C -> possibly_unblock(State, Update(C))
+ C -> C1 = Update(C),
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ State;
+ true -> unblock(State, C1)
+ end
end.
-possibly_unblock(State, C = #cr{limiter = Limiter}) ->
- case is_ch_blocked(C) of
- true -> update_ch_record(C),
- State;
- false -> case lists:partition(
- fun({_ChPid, #consumer{tag = CTag}}) ->
- rabbit_limiter:is_consumer_blocked(
- Limiter, CTag)
- end, queue:to_list(C#cr.blocked_consumers)) of
- {_, []} ->
- update_ch_record(C),
- State;
- {Blocked, Unblocked} ->
- BlockedQ = queue:from_list(Blocked),
- UnblockedQ = queue:from_list(Unblocked),
- update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- AC1 = queue:join(State#q.active_consumers, UnblockedQ),
- run_message_queue(State#q{active_consumers = AC1})
- end
+unblock(State, C = #cr{limiter = Limiter}) ->
+ case lists:partition(
+ fun({_ChPid, #consumer{tag = CTag}}) ->
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
+ end, queue:to_list(C#cr.blocked_consumers)) of
+ {_, []} ->
+ update_ch_record(C),
+ State;
+ {Blocked, Unblocked} ->
+ BlockedQ = queue:from_list(Blocked),
+ UnblockedQ = queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ}),
+ AC1 = queue:join(State#q.active_consumers, UnblockedQ),
+ run_message_queue(State#q{active_consumers = AC1})
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -1389,13 +1389,17 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
backing_queue_state = BQS}) ->
Len = BQ:len(BQS),
rabbit_channel:send_credit_reply(ChPid, Len),
- C = #cr{limiter = Lim} = lookup_ch(ChPid),
- C1 = C#cr{limiter = rabbit_limiter:credit(Lim, CTag, Credit, Drain)},
+ C = #cr{limiter = Limiter} = lookup_ch(ChPid),
+ C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)},
noreply(case Drain andalso Len == 0 of
true -> update_ch_record(C1),
send_drained(C1),
State;
- false -> possibly_unblock(State, C1)
+ false -> case is_ch_blocked(C1) of
+ true -> update_ch_record(C1),
+ State;
+ false -> unblock(State, C1)
+ end
end);
handle_cast(wake_up, State) ->