summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-15 14:14:00 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-15 14:14:00 +0000
commit7459680308c6e3660e6c5fdbca9dc988b4924f28 (patch)
tree2584a0ff320ac777e13afa8b602f65414154f1bf /src/rabbit_queue_consumers.erl
parent66fc0579894163b93a392c791648914442b91419 (diff)
downloadrabbitmq-server-7459680308c6e3660e6c5fdbca9dc988b4924f28.tar.gz
We can't have drain=true and mode=auto so let's unify to a single mode: manual/drain/auto.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl28
1 files changed, 15 insertions, 13 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index ca500d48..6024889c 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -120,7 +120,7 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
@@ -131,14 +131,13 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
update_ch_record(
case parse_credit_args(Args) of
- none -> C1;
- {0, _Drain, auto} -> C1;
- {Credit, _Drain, auto} when NoAck -> C1;
- {Credit, Drain, Mode} -> credit_and_drain(
- C1, ConsumerTag, Credit,
- Drain, Mode, IsEmpty)
+ none -> C1;
+ {0, auto} -> C1;
+ {_Credit, auto} when NoAck -> C1;
+ {Credit, Mode} -> credit_and_drain(
+ C1, CTag, Credit, Mode, IsEmpty)
end),
- Consumer = #consumer{tag = ConsumerTag,
+ Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
@@ -147,11 +146,11 @@ parse_credit_args(Args) ->
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, C}, {bool, D}} -> {C, D, manual};
+ {{long, C}, {bool, D}} -> {C, drain_mode(D)};
_ -> none
end;
undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
- {_, Prefetch} -> {Prefetch, false, auto};
+ {_, Prefetch} -> {Prefetch, auto};
_ -> none
end
end.
@@ -330,7 +329,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
unchanged;
#cr{limiter = Limiter} = C ->
C1 = #cr{limiter = Limiter1} =
- credit_and_drain(C, CTag, Credit, Drain, manual, IsEmpty),
+ credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty),
case is_ch_blocked(C1) orelse
(not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
@@ -340,6 +339,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
end
end.
+drain_mode(true) -> drain;
+drain_mode(false) -> manual.
+
utilisation(#state{use = {active, Since, Avg}}) ->
use_avg(now_micros() - Since, 0, Avg);
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
@@ -405,8 +407,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
end.
credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
- CTag, Credit, Drain, Mode, IsEmpty) ->
- case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, Mode, IsEmpty) of
+ CTag, Credit, Mode, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of
{true, Limiter1} -> rabbit_channel:send_drained(ChPid,
[{CTag, Credit}]),
C#cr{limiter = Limiter1};