diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 14:14:00 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 14:14:00 +0000 |
commit | 7459680308c6e3660e6c5fdbca9dc988b4924f28 (patch) | |
tree | 2584a0ff320ac777e13afa8b602f65414154f1bf /src/rabbit_queue_consumers.erl | |
parent | 66fc0579894163b93a392c791648914442b91419 (diff) | |
download | rabbitmq-server-7459680308c6e3660e6c5fdbca9dc988b4924f28.tar.gz |
We can't have drain=true and mode=auto so let's unify to a single mode: manual/drain/auto.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 28 |
1 files changed, 15 insertions, 13 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index ca500d48..6024889c 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -120,7 +120,7 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), @@ -131,14 +131,13 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, update_ch_record( case parse_credit_args(Args) of - none -> C1; - {0, _Drain, auto} -> C1; - {Credit, _Drain, auto} when NoAck -> C1; - {Credit, Drain, Mode} -> credit_and_drain( - C1, ConsumerTag, Credit, - Drain, Mode, IsEmpty) + none -> C1; + {0, auto} -> C1; + {_Credit, auto} when NoAck -> C1; + {Credit, Mode} -> credit_and_drain( + C1, CTag, Credit, Mode, IsEmpty) end), - Consumer = #consumer{tag = ConsumerTag, + Consumer = #consumer{tag = CTag, ack_required = not NoAck, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. @@ -147,11 +146,11 @@ parse_credit_args(Args) -> case rabbit_misc:table_lookup(Args, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, C}, {bool, D}} -> {C, D, manual}; + {{long, C}, {bool, D}} -> {C, drain_mode(D)}; _ -> none end; undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of - {_, Prefetch} -> {Prefetch, false, auto}; + {_, Prefetch} -> {Prefetch, auto}; _ -> none end end. @@ -330,7 +329,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> unchanged; #cr{limiter = Limiter} = C -> C1 = #cr{limiter = Limiter1} = - credit_and_drain(C, CTag, Credit, Drain, manual, IsEmpty), + credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty), case is_ch_blocked(C1) orelse (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of @@ -340,6 +339,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> end end. +drain_mode(true) -> drain; +drain_mode(false) -> manual. + utilisation(#state{use = {active, Since, Avg}}) -> use_avg(now_micros() - Since, 0, Avg); utilisation(#state{use = {inactive, Since, Active, Avg}}) -> @@ -405,8 +407,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> end. credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, - CTag, Credit, Drain, Mode, IsEmpty) -> - case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, Mode, IsEmpty) of + CTag, Credit, Mode, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of {true, Limiter1} -> rabbit_channel:send_drained(ChPid, [{CTag, Credit}]), C#cr{limiter = Limiter1}; |