summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-10 14:49:44 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-10 14:49:44 +0000
commit3d5b0a0e50844e15822fd767768b4c11ceb7d8df (patch)
tree5b3b78757b7c6afcf191d6c4690592065dc7e917
parent97a842805979bdea82dcd88e3f043c5b19052e3c (diff)
downloadrabbitmq-server-bug25954.tar.gz
make draining part of r_limiter:creditbug25954
which makes the contract more obvious and is also more efficient since we no longer trawl through all consumers in order to find drainable ones.
-rw-r--r--src/rabbit_limiter.erl24
-rw-r--r--src/rabbit_queue_consumers.erl39
2 files changed, 34 insertions, 29 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d5cfbce6..ffe055d6 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -69,7 +69,7 @@
%% about - and channel.flow blocking - that's what block/1,
%% unblock/1 and is_blocked/1 are for. They also tell the limiter
%% queue state (via the queue) about consumer credit changes -
-%% that's what credit/4 is for.
+%% that's what credit/5 is for.
%%
%% 2. Queues also tell the limiter queue state about the queue
%% becoming empty (via drained/1) and consumers leaving (via
@@ -126,7 +126,7 @@
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/4, drained/1,
+ is_suspended/1, is_consumer_blocked/2, credit/5, drained/1,
forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -168,8 +168,8 @@
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
--spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean())
- -> qstate()).
+-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
+ boolean()) -> {boolean(), qstate()}).
-spec(drained/1 :: (qstate())
-> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
@@ -276,8 +276,12 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
{value, #credit{}} -> true
end.
-credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) ->
- Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) ->
+ {Res, Cr} = case IsEmpty andalso Drain of
+ true -> {true, make_credit(0, false)};
+ false -> {false, make_credit(Credit, Drain)}
+ end,
+ {Res, Limiter#qstate{credits = gb_trees:enter(CTag, Cr, Credits)}}.
drained(Limiter = #qstate{credits = Credits}) ->
{CTagCredits, Credits2} =
@@ -303,6 +307,10 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
%% state for us (#qstate.credits), and maintain a fiction that the
%% limiter is making the decisions...
+make_credit(Credit, Drain) ->
+ %% Using up all credit implies no need to send a 'drained' event
+ #credit{credit = Credit, drain = Drain andalso Credit > 0}.
+
decrement_credit(CTag, Credits) ->
case gb_trees:lookup(CTag, Credits) of
{value, #credit{credit = Credit, drain = Drain}} ->
@@ -312,9 +320,7 @@ decrement_credit(CTag, Credits) ->
end.
update_credit(CTag, Credit, Drain, Credits) ->
- %% Using up all credit implies no need to send a 'drained' event
- Drain1 = Drain andalso Credit > 0,
- gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits).
+ gb_trees:update(CTag, make_credit(Credit, Drain), Credits).
%%----------------------------------------------------------------------------
%% gen_server callbacks
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 0a823366..908e4783 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -128,16 +128,11 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
true -> rabbit_limiter:activate(Limiter);
false -> Limiter
end,
- Limiter2 = case CreditArgs of
- none -> Limiter1;
- {Crd, Drain} -> rabbit_limiter:credit(
- Limiter1, ConsumerTag, Crd, Drain)
- end,
- C1 = C#cr{consumer_count = Count + 1,
- limiter = Limiter2},
- update_ch_record(case IsEmpty of
- true -> send_drained(C1);
- false -> C1
+ C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
+ update_ch_record(case CreditArgs of
+ none -> C1;
+ {Crd, Drain} -> credit_and_drain(
+ C1, ConsumerTag, Crd, Drain, IsEmpty)
end),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck,
@@ -311,19 +306,14 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
not_found ->
unchanged;
#cr{limiter = Limiter} = C ->
- C1 = C#cr{limiter = rabbit_limiter:credit(
- Limiter, CTag, Credit, Drain)},
- C2 = #cr{limiter = Limiter1} =
- case Drain andalso IsEmpty of
- true -> send_drained(C1);
- false -> C1
- end,
- case is_ch_blocked(C2) orelse
+ C1 = #cr{limiter = Limiter1} =
+ credit_and_drain(C, CTag, Credit, Drain, IsEmpty),
+ case is_ch_blocked(C1) orelse
(not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
- true -> update_ch_record(C2),
+ true -> update_ch_record(C1),
unchanged;
- false -> unblock(C2, State)
+ false -> unblock(C1, State)
end
end.
@@ -391,6 +381,15 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
C#cr{limiter = Limiter2}
end.
+credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
+ CTag, Credit, Drain, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of
+ {true, Limiter1} -> rabbit_channel:send_drained(ChPid,
+ [{CTag, Credit}]),
+ C#cr{limiter = Limiter1};
+ {false, Limiter1} -> C#cr{limiter = Limiter1}
+ end.
+
tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList].
add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->