diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 13:54:24 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 13:54:24 +0000 |
commit | 66fc0579894163b93a392c791648914442b91419 (patch) | |
tree | ba70e1af1ac162af0f923a1ea8a2a68475362d95 /src/rabbit_queue_consumers.erl | |
parent | c7a01e7829f4dcb72610c69de4be2e78e3480a99 (diff) | |
download | rabbitmq-server-66fc0579894163b93a392c791648914442b91419.tar.gz |
Unify rabbit_limiter:credit/5 and rabbit_limiter:set_consumer_prefetch/4.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 24 |
1 files changed, 11 insertions, 13 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index f864a5ba..ca500d48 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -131,14 +131,12 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, update_ch_record( case parse_credit_args(Args) of - none -> - C1; - {credit, Credit, Drain} -> - credit_and_drain(C1, ConsumerTag, Credit, Drain, IsEmpty); - {prefetch, P} -> - Limiter2 = rabbit_limiter:set_consumer_prefetch( - Limiter1, ConsumerTag, NoAck, P), - C1#cr{limiter = Limiter2} + none -> C1; + {0, _Drain, auto} -> C1; + {Credit, _Drain, auto} when NoAck -> C1; + {Credit, Drain, Mode} -> credit_and_drain( + C1, ConsumerTag, Credit, + Drain, Mode, IsEmpty) end), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck, @@ -149,11 +147,11 @@ parse_credit_args(Args) -> case rabbit_misc:table_lookup(Args, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, C}, {bool, D}} -> {credit, C, D}; + {{long, C}, {bool, D}} -> {C, D, manual}; _ -> none end; undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of - {_, Prefetch} -> {prefetch, Prefetch}; + {_, Prefetch} -> {Prefetch, false, auto}; _ -> none end end. @@ -332,7 +330,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> unchanged; #cr{limiter = Limiter} = C -> C1 = #cr{limiter = Limiter1} = - credit_and_drain(C, CTag, Credit, Drain, IsEmpty), + credit_and_drain(C, CTag, Credit, Drain, manual, IsEmpty), case is_ch_blocked(C1) orelse (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of @@ -407,8 +405,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> end. credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, - CTag, Credit, Drain, IsEmpty) -> - case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of + CTag, Credit, Drain, Mode, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, Mode, IsEmpty) of {true, Limiter1} -> rabbit_channel:send_drained(ChPid, [{CTag, Credit}]), C#cr{limiter = Limiter1}; |