summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-07 12:06:33 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-07 12:06:33 +0000
commitd2e7aeed688ee6f1491799bb11d7fd3c955b7ae8 (patch)
tree8901fc29424a968477f4ed53830669fd2eba4d38
parentcaf6ab4e9c4327be98ed1a452f3d6c7eced9b278 (diff)
downloadrabbitmq-server-d2e7aeed688ee6f1491799bb11d7fd3c955b7ae8.tar.gz
Fix handling of credit arriving with drain=true and an empty queue - we need to not really add the credit (since it is "used up" immediately and thus not unblock if needed).
-rw-r--r--src/rabbit_limiter.erl14
-rw-r--r--src/rabbit_queue_consumers.erl8
2 files changed, 13 insertions, 9 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 22da465b..c5e704b6 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -126,7 +126,7 @@
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/4, drained/1,
+ is_suspended/1, is_consumer_blocked/2, credit/5, drained/1,
forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -168,8 +168,8 @@
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
--spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean())
- -> qstate()).
+-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
+ boolean()) -> qstate()).
-spec(drained/1 :: (qstate())
-> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
@@ -276,8 +276,12 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
none -> false
end.
-credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) ->
- Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, IsEmpty, Drain) ->
+ Credit1 = case Drain andalso IsEmpty of
+ true -> 0;
+ false -> Credit
+ end,
+ Limiter#qstate{credits = update_credit(CTag, Credit1, Drain, Credits)}.
drained(Limiter = #qstate{credits = Credits}) ->
{CTagCredits, Credits2} =
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index ea0ab6da..702091dc 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -121,7 +121,7 @@ unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
- Drained, State = #state{consumers = Consumers}) ->
+ IsEmpty, State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -131,11 +131,11 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
Limiter2 = case CreditArgs of
none -> Limiter1;
{Crd, Drain} -> rabbit_limiter:credit(
- Limiter1, ConsumerTag, Crd, Drain)
+ Limiter1, ConsumerTag, Crd, IsEmpty, Drain)
end,
C1 = C#cr{consumer_count = Count + 1,
limiter = Limiter2},
- update_ch_record(case Drained of
+ update_ch_record(case IsEmpty of
true -> send_drained(C1);
false -> C1
end),
@@ -313,7 +313,7 @@ activate_limit_fun() ->
credit_fun(IsEmpty, Credit, Drain, CTag) ->
fun (C = #cr{limiter = Limiter}) ->
C1 = C#cr{limiter = rabbit_limiter:credit(
- Limiter, CTag, Credit, Drain)},
+ Limiter, CTag, Credit, IsEmpty, Drain)},
case Drain andalso IsEmpty of
true -> send_drained(C1);
false -> C1