diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 12:06:33 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 12:06:33 +0000 |
commit | d2e7aeed688ee6f1491799bb11d7fd3c955b7ae8 (patch) | |
tree | 8901fc29424a968477f4ed53830669fd2eba4d38 | |
parent | caf6ab4e9c4327be98ed1a452f3d6c7eced9b278 (diff) | |
download | rabbitmq-server-d2e7aeed688ee6f1491799bb11d7fd3c955b7ae8.tar.gz |
Fix handling of credit arriving with drain=true and an empty queue - we need to not really add the credit (since it is "used up" immediately and thus not unblock if needed).
-rw-r--r-- | src/rabbit_limiter.erl | 14 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 8 |
2 files changed, 13 insertions, 9 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 22da465b..c5e704b6 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -126,7 +126,7 @@ get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, + is_suspended/1, is_consumer_blocked/2, credit/5, drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -168,8 +168,8 @@ -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). --spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean()) - -> qstate()). +-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), + boolean()) -> qstate()). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -276,8 +276,12 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> none -> false end. -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> - Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, IsEmpty, Drain) -> + Credit1 = case Drain andalso IsEmpty of + true -> 0; + false -> Credit + end, + Limiter#qstate{credits = update_credit(CTag, Credit1, Drain, Credits)}. drained(Limiter = #qstate{credits = Credits}) -> {CTagCredits, Credits2} = diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index ea0ab6da..702091dc 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -121,7 +121,7 @@ unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, - Drained, State = #state{consumers = Consumers}) -> + IsEmpty, State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -131,11 +131,11 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, Limiter2 = case CreditArgs of none -> Limiter1; {Crd, Drain} -> rabbit_limiter:credit( - Limiter1, ConsumerTag, Crd, Drain) + Limiter1, ConsumerTag, Crd, IsEmpty, Drain) end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter2}, - update_ch_record(case Drained of + update_ch_record(case IsEmpty of true -> send_drained(C1); false -> C1 end), @@ -313,7 +313,7 @@ activate_limit_fun() -> credit_fun(IsEmpty, Credit, Drain, CTag) -> fun (C = #cr{limiter = Limiter}) -> C1 = C#cr{limiter = rabbit_limiter:credit( - Limiter, CTag, Credit, Drain)}, + Limiter, CTag, Credit, IsEmpty, Drain)}, case Drain andalso IsEmpty of true -> send_drained(C1); false -> C1 |