diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-04 10:53:01 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-04 10:53:01 +0000 |
commit | 75715c5369f5ec5a71423cb2f701e5ec9ad6bf24 (patch) | |
tree | 3d2b9648fd2876fbce607c184c5c8dd843793935 /src/rabbit_queue_consumers.erl | |
parent | 99c1e201362e3642e00f5a9052b982c77ac59f50 (diff) | |
download | rabbitmq-server-75715c5369f5ec5a71423cb2f701e5ec9ad6bf24.tar.gz |
Change the semantics of the basic.qos global flag to switch between per-consumer and per-channel prefetch.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index ca050f14..2086c856 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -17,7 +17,7 @@ -module(rabbit_queue_consumers). -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, - unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, + unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, send_drained/0, deliver/3, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, @@ -70,7 +70,8 @@ -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), - rabbit_framing:amqp_table(), boolean(), state()) -> state(). + non_neg_integer(), rabbit_framing:amqp_table(), boolean(), state()) + -> state(). -spec remove(ch(), rabbit_types:ctag(), state()) -> 'not_found' | state(). -spec erase_ch(ch(), state()) -> @@ -122,8 +123,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, - State = #state{consumers = Consumers}) -> +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args, + IsEmpty, State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -132,7 +133,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, update_ch_record( - case parse_credit_args(Args) of + case parse_credit_args(ConsumerPrefetchCount, Args) of none -> C1; {0, auto} -> C1; {_Credit, auto} when NoAck -> C1; @@ -347,16 +348,16 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) -> %%---------------------------------------------------------------------------- -parse_credit_args(Args) -> +parse_credit_args(Default, Args) -> case rabbit_misc:table_lookup(Args, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of {{long, C}, {bool, D}} -> {C, drain_mode(D)}; - _ -> none + _ -> {Default, auto} end; undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of {_, Prefetch} -> {Prefetch, auto}; - _ -> none + _ -> {Default, auto} end end. |