diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-02-03 14:23:45 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-02-03 14:23:45 +0000 |
commit | ff8d9580356dcef266924d0d70a845465cae97b2 (patch) | |
tree | 323f199daae092409bd70b7701cf7aeaaabede59 /src/rabbit_queue_consumers.erl | |
parent | 9458aae390d88ba90fe9bb11c433e40c84b48bbc (diff) | |
parent | 855d8926f66a5162d0bc65540f7ead603f2e4386 (diff) | |
download | rabbitmq-server-ff8d9580356dcef266924d0d70a845465cae97b2.tar.gz |
Merge default
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 52 |
1 files changed, 34 insertions, 18 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index c9540da8..017a4f9c 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,7 +18,7 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/2, + send_drained/0, deliver/3, record_ack/3, subtract_acks/4, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit/6, utilisation/1]). @@ -82,7 +82,8 @@ {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. --spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. +-spec subtract_acks([ack()], rabbit_types:ctag(), ch(), state()) -> + 'not_found' | 'unchanged' | {'unblocked', state()}. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). @@ -130,11 +131,14 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, false -> Limiter end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, - update_ch_record(case parse_credit_args(Args) of - none -> C1; - {Crd, Drain} -> credit_and_drain( - C1, CTag, Crd, Drain, IsEmpty) - end), + update_ch_record( + case parse_credit_args(Args) of + none -> C1; + {0, auto} -> C1; + {_Credit, auto} when NoAck -> C1; + {Credit, Mode} -> credit_and_drain( + C1, CTag, Credit, Mode, IsEmpty) + end), Consumer = #consumer{tag = CTag, ack_required = not NoAck, args = Args}, @@ -238,14 +242,20 @@ record_ack(ChPid, LimiterPid, AckTag) -> update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), ok. -subtract_acks(ChPid, AckTags) -> +subtract_acks(AckTags, CTag, ChPid, State) -> case lookup_ch(ChPid) of not_found -> not_found; - C = #cr{acktags = ChAckTags} -> - update_ch_record( - C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), - ok + C = #cr{acktags = ChAckTags, limiter = Lim} -> + {Unblocked, Lim2} = + rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)), + AckTags2 = subtract_acks(AckTags, [], ChAckTags), + C2 = C#cr{acktags = AckTags2, limiter = Lim2}, + case Unblocked of + true -> unblock(C2, State); + false -> update_ch_record(C2), + unchanged + end end. subtract_acks([], [], AckQ) -> @@ -308,7 +318,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> unchanged; #cr{limiter = Limiter} = C -> C1 = #cr{limiter = Limiter1} = - credit_and_drain(C, CTag, Credit, Drain, IsEmpty), + credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty), case is_ch_blocked(C1) orelse (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of @@ -318,6 +328,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> end end. +drain_mode(true) -> drain; +drain_mode(false) -> manual. + utilisation(#state{use = {active, Since, Avg}}) -> use_avg(now_micros() - Since, 0, Avg); utilisation(#state{use = {inactive, Since, Active, Avg}}) -> @@ -329,10 +342,13 @@ parse_credit_args(Args) -> case rabbit_misc:table_lookup(Args, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; - _ -> none + {{long, C}, {bool, D}} -> {C, drain_mode(D)}; + _ -> none end; - undefined -> none + undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of + {_, Prefetch} -> {Prefetch, auto}; + _ -> none + end end. lookup_ch(ChPid) -> @@ -393,8 +409,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> end. credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, - CTag, Credit, Drain, IsEmpty) -> - case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of + CTag, Credit, Mode, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of {true, Limiter1} -> rabbit_channel:send_drained(ChPid, [{CTag, Credit}]), C#cr{limiter = Limiter1}; |