summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-04 10:53:01 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-04 10:53:01 +0000
commit75715c5369f5ec5a71423cb2f701e5ec9ad6bf24 (patch)
tree3d2b9648fd2876fbce607c184c5c8dd843793935 /src/rabbit_queue_consumers.erl
parent99c1e201362e3642e00f5a9052b982c77ac59f50 (diff)
downloadrabbitmq-server-75715c5369f5ec5a71423cb2f701e5ec9ad6bf24.tar.gz
Change the semantics of the basic.qos global flag to switch between per-consumer and per-channel prefetch.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl17
1 files changed, 9 insertions, 8 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index ca050f14..2086c856 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -17,7 +17,7 @@
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
- unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
+ unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
@@ -70,7 +70,8 @@
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- rabbit_framing:amqp_table(), boolean(), state()) -> state().
+ non_neg_integer(), rabbit_framing:amqp_table(), boolean(), state())
+ -> state().
-spec remove(ch(), rabbit_types:ctag(), state()) ->
'not_found' | state().
-spec erase_ch(ch(), state()) ->
@@ -122,8 +123,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
- State = #state{consumers = Consumers}) ->
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args,
+ IsEmpty, State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -132,7 +133,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
update_ch_record(
- case parse_credit_args(Args) of
+ case parse_credit_args(ConsumerPrefetchCount, Args) of
none -> C1;
{0, auto} -> C1;
{_Credit, auto} when NoAck -> C1;
@@ -347,16 +348,16 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
%%----------------------------------------------------------------------------
-parse_credit_args(Args) ->
+parse_credit_args(Default, Args) ->
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
{{long, C}, {bool, D}} -> {C, drain_mode(D)};
- _ -> none
+ _ -> {Default, auto}
end;
undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
{_, Prefetch} -> {Prefetch, auto};
- _ -> none
+ _ -> {Default, auto}
end
end.