diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-10 14:49:44 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-10 14:49:44 +0000 |
commit | 3d5b0a0e50844e15822fd767768b4c11ceb7d8df (patch) | |
tree | 5b3b78757b7c6afcf191d6c4690592065dc7e917 | |
parent | 97a842805979bdea82dcd88e3f043c5b19052e3c (diff) | |
download | rabbitmq-server-bug25954.tar.gz |
make draining part of r_limiter:creditbug25954
which makes the contract more obvious and is also more efficient since
we no longer trawl through all consumers in order to find drainable
ones.
-rw-r--r-- | src/rabbit_limiter.erl | 24 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 39 |
2 files changed, 34 insertions, 29 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d5cfbce6..ffe055d6 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -69,7 +69,7 @@ %% about - and channel.flow blocking - that's what block/1, %% unblock/1 and is_blocked/1 are for. They also tell the limiter %% queue state (via the queue) about consumer credit changes - -%% that's what credit/4 is for. +%% that's what credit/5 is for. %% %% 2. Queues also tell the limiter queue state about the queue %% becoming empty (via drained/1) and consumers leaving (via @@ -126,7 +126,7 @@ get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, + is_suspended/1, is_consumer_blocked/2, credit/5, drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -168,8 +168,8 @@ -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). --spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean()) - -> qstate()). +-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), + boolean()) -> {boolean(), qstate()}). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -276,8 +276,12 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> {value, #credit{}} -> true end. -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> - Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) -> + {Res, Cr} = case IsEmpty andalso Drain of + true -> {true, make_credit(0, false)}; + false -> {false, make_credit(Credit, Drain)} + end, + {Res, Limiter#qstate{credits = gb_trees:enter(CTag, Cr, Credits)}}. drained(Limiter = #qstate{credits = Credits}) -> {CTagCredits, Credits2} = @@ -303,6 +307,10 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> %% state for us (#qstate.credits), and maintain a fiction that the %% limiter is making the decisions... +make_credit(Credit, Drain) -> + %% Using up all credit implies no need to send a 'drained' event + #credit{credit = Credit, drain = Drain andalso Credit > 0}. + decrement_credit(CTag, Credits) -> case gb_trees:lookup(CTag, Credits) of {value, #credit{credit = Credit, drain = Drain}} -> @@ -312,9 +320,7 @@ decrement_credit(CTag, Credits) -> end. update_credit(CTag, Credit, Drain, Credits) -> - %% Using up all credit implies no need to send a 'drained' event - Drain1 = Drain andalso Credit > 0, - gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits). + gb_trees:update(CTag, make_credit(Credit, Drain), Credits). %%---------------------------------------------------------------------------- %% gen_server callbacks diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 0a823366..908e4783 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -128,16 +128,11 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, true -> rabbit_limiter:activate(Limiter); false -> Limiter end, - Limiter2 = case CreditArgs of - none -> Limiter1; - {Crd, Drain} -> rabbit_limiter:credit( - Limiter1, ConsumerTag, Crd, Drain) - end, - C1 = C#cr{consumer_count = Count + 1, - limiter = Limiter2}, - update_ch_record(case IsEmpty of - true -> send_drained(C1); - false -> C1 + C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, + update_ch_record(case CreditArgs of + none -> C1; + {Crd, Drain} -> credit_and_drain( + C1, ConsumerTag, Crd, Drain, IsEmpty) end), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck, @@ -311,19 +306,14 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> not_found -> unchanged; #cr{limiter = Limiter} = C -> - C1 = C#cr{limiter = rabbit_limiter:credit( - Limiter, CTag, Credit, Drain)}, - C2 = #cr{limiter = Limiter1} = - case Drain andalso IsEmpty of - true -> send_drained(C1); - false -> C1 - end, - case is_ch_blocked(C2) orelse + C1 = #cr{limiter = Limiter1} = + credit_and_drain(C, CTag, Credit, Drain, IsEmpty), + case is_ch_blocked(C1) orelse (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of - true -> update_ch_record(C2), + true -> update_ch_record(C1), unchanged; - false -> unblock(C2, State) + false -> unblock(C1, State) end end. @@ -391,6 +381,15 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> C#cr{limiter = Limiter2} end. +credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, + CTag, Credit, Drain, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of + {true, Limiter1} -> rabbit_channel:send_drained(ChPid, + [{CTag, Credit}]), + C#cr{limiter = Limiter1}; + {false, Limiter1} -> C#cr{limiter = Limiter1} + end. + tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList]. add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) -> |