summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-02-03 14:23:45 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-02-03 14:23:45 +0000
commitff8d9580356dcef266924d0d70a845465cae97b2 (patch)
tree323f199daae092409bd70b7701cf7aeaaabede59 /src/rabbit_queue_consumers.erl
parent9458aae390d88ba90fe9bb11c433e40c84b48bbc (diff)
parent855d8926f66a5162d0bc65540f7ead603f2e4386 (diff)
downloadrabbitmq-server-ff8d9580356dcef266924d0d70a845465cae97b2.tar.gz
Merge default
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl52
1 files changed, 34 insertions, 18 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index c9540da8..017a4f9c 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,7 +18,7 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
- send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/4,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
credit/6, utilisation/1]).
@@ -82,7 +82,8 @@
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
--spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
+-spec subtract_acks([ack()], rabbit_types:ctag(), ch(), state()) ->
+ 'not_found' | 'unchanged' | {'unblocked', state()}.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
@@ -130,11 +131,14 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
false -> Limiter
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
- update_ch_record(case parse_credit_args(Args) of
- none -> C1;
- {Crd, Drain} -> credit_and_drain(
- C1, CTag, Crd, Drain, IsEmpty)
- end),
+ update_ch_record(
+ case parse_credit_args(Args) of
+ none -> C1;
+ {0, auto} -> C1;
+ {_Credit, auto} when NoAck -> C1;
+ {Credit, Mode} -> credit_and_drain(
+ C1, CTag, Credit, Mode, IsEmpty)
+ end),
Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
args = Args},
@@ -238,14 +242,20 @@ record_ack(ChPid, LimiterPid, AckTag) ->
update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
ok.
-subtract_acks(ChPid, AckTags) ->
+subtract_acks(AckTags, CTag, ChPid, State) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(
- C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
- ok
+ C = #cr{acktags = ChAckTags, limiter = Lim} ->
+ {Unblocked, Lim2} =
+ rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)),
+ AckTags2 = subtract_acks(AckTags, [], ChAckTags),
+ C2 = C#cr{acktags = AckTags2, limiter = Lim2},
+ case Unblocked of
+ true -> unblock(C2, State);
+ false -> update_ch_record(C2),
+ unchanged
+ end
end.
subtract_acks([], [], AckQ) ->
@@ -308,7 +318,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
unchanged;
#cr{limiter = Limiter} = C ->
C1 = #cr{limiter = Limiter1} =
- credit_and_drain(C, CTag, Credit, Drain, IsEmpty),
+ credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty),
case is_ch_blocked(C1) orelse
(not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
@@ -318,6 +328,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
end
end.
+drain_mode(true) -> drain;
+drain_mode(false) -> manual.
+
utilisation(#state{use = {active, Since, Avg}}) ->
use_avg(now_micros() - Since, 0, Avg);
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
@@ -329,10 +342,13 @@ parse_credit_args(Args) ->
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
- _ -> none
+ {{long, C}, {bool, D}} -> {C, drain_mode(D)};
+ _ -> none
end;
- undefined -> none
+ undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
+ {_, Prefetch} -> {Prefetch, auto};
+ _ -> none
+ end
end.
lookup_ch(ChPid) ->
@@ -393,8 +409,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
end.
credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
- CTag, Credit, Drain, IsEmpty) ->
- case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of
+ CTag, Credit, Mode, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of
{true, Limiter1} -> rabbit_channel:send_drained(ChPid,
[{CTag, Credit}]),
C#cr{limiter = Limiter1};