summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-15 13:54:24 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-15 13:54:24 +0000
commit66fc0579894163b93a392c791648914442b91419 (patch)
treeba70e1af1ac162af0f923a1ea8a2a68475362d95 /src/rabbit_queue_consumers.erl
parentc7a01e7829f4dcb72610c69de4be2e78e3480a99 (diff)
downloadrabbitmq-server-66fc0579894163b93a392c791648914442b91419.tar.gz
Unify rabbit_limiter:credit/5 and rabbit_limiter:set_consumer_prefetch/4.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl24
1 files changed, 11 insertions, 13 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index f864a5ba..ca500d48 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -131,14 +131,12 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
update_ch_record(
case parse_credit_args(Args) of
- none ->
- C1;
- {credit, Credit, Drain} ->
- credit_and_drain(C1, ConsumerTag, Credit, Drain, IsEmpty);
- {prefetch, P} ->
- Limiter2 = rabbit_limiter:set_consumer_prefetch(
- Limiter1, ConsumerTag, NoAck, P),
- C1#cr{limiter = Limiter2}
+ none -> C1;
+ {0, _Drain, auto} -> C1;
+ {Credit, _Drain, auto} when NoAck -> C1;
+ {Credit, Drain, Mode} -> credit_and_drain(
+ C1, ConsumerTag, Credit,
+ Drain, Mode, IsEmpty)
end),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck,
@@ -149,11 +147,11 @@ parse_credit_args(Args) ->
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, C}, {bool, D}} -> {credit, C, D};
+ {{long, C}, {bool, D}} -> {C, D, manual};
_ -> none
end;
undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
- {_, Prefetch} -> {prefetch, Prefetch};
+ {_, Prefetch} -> {Prefetch, false, auto};
_ -> none
end
end.
@@ -332,7 +330,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
unchanged;
#cr{limiter = Limiter} = C ->
C1 = #cr{limiter = Limiter1} =
- credit_and_drain(C, CTag, Credit, Drain, IsEmpty),
+ credit_and_drain(C, CTag, Credit, Drain, manual, IsEmpty),
case is_ch_blocked(C1) orelse
(not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
@@ -407,8 +405,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
end.
credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
- CTag, Credit, Drain, IsEmpty) ->
- case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of
+ CTag, Credit, Drain, Mode, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, Mode, IsEmpty) of
{true, Limiter1} -> rabbit_channel:send_drained(ChPid,
[{CTag, Credit}]),
C#cr{limiter = Limiter1};