diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-21 17:01:54 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-21 17:01:54 +0000 |
commit | 42d7a9385fd422e618124666369adadc7b9ac430 (patch) | |
tree | a6b505dd224664bbbd366e9b8136a91595fa07f0 | |
parent | c5b7be1527eb01264b4037e18c70bb3589e5e5fc (diff) | |
download | rabbitmq-server-42d7a9385fd422e618124666369adadc7b9ac430.tar.gz |
re-introduce state-transition optimisation for possibly_unblock
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 50 |
1 files changed, 27 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c6a8bf2f..e24568bb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -644,28 +644,28 @@ remove_consumers(ChPid, Queue, QName) -> possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of not_found -> State; - C -> possibly_unblock(State, Update(C)) + C -> C1 = Update(C), + case is_ch_blocked(C) andalso not is_ch_blocked(C1) of + false -> update_ch_record(C1), + State; + true -> unblock(State, C1) + end end. -possibly_unblock(State, C = #cr{limiter = Limiter}) -> - case is_ch_blocked(C) of - true -> update_ch_record(C), - State; - false -> case lists:partition( - fun({_ChPid, #consumer{tag = CTag}}) -> - rabbit_limiter:is_consumer_blocked( - Limiter, CTag) - end, queue:to_list(C#cr.blocked_consumers)) of - {_, []} -> - update_ch_record(C), - State; - {Blocked, Unblocked} -> - BlockedQ = queue:from_list(Blocked), - UnblockedQ = queue:from_list(Unblocked), - update_ch_record(C#cr{blocked_consumers = BlockedQ}), - AC1 = queue:join(State#q.active_consumers, UnblockedQ), - run_message_queue(State#q{active_consumers = AC1}) - end +unblock(State, C = #cr{limiter = Limiter}) -> + case lists:partition( + fun({_ChPid, #consumer{tag = CTag}}) -> + rabbit_limiter:is_consumer_blocked(Limiter, CTag) + end, queue:to_list(C#cr.blocked_consumers)) of + {_, []} -> + update_ch_record(C), + State; + {Blocked, Unblocked} -> + BlockedQ = queue:from_list(Blocked), + UnblockedQ = queue:from_list(Unblocked), + update_ch_record(C#cr{blocked_consumers = BlockedQ}), + AC1 = queue:join(State#q.active_consumers, UnblockedQ), + run_message_queue(State#q{active_consumers = AC1}) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -1389,13 +1389,17 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, backing_queue_state = BQS}) -> Len = BQ:len(BQS), rabbit_channel:send_credit_reply(ChPid, Len), - C = #cr{limiter = Lim} = lookup_ch(ChPid), - C1 = C#cr{limiter = rabbit_limiter:credit(Lim, CTag, Credit, Drain)}, + C = #cr{limiter = Limiter} = lookup_ch(ChPid), + C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)}, noreply(case Drain andalso Len == 0 of true -> update_ch_record(C1), send_drained(C1), State; - false -> possibly_unblock(State, C1) + false -> case is_ch_blocked(C1) of + true -> update_ch_record(C1), + State; + false -> unblock(State, C1) + end end); handle_cast(wake_up, State) -> |