diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-06 17:19:14 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-06 17:19:14 +0000 |
commit | 3c896df33888e606ee470ea27db0eb0ec296a412 (patch) | |
tree | 29d2936acc06bb6622b4111e07bb48c66cf45ad8 | |
parent | 4e08c02d50bd45b7805cf8b95c71de3d74213f7b (diff) | |
parent | 59040ec0c0518cd08f58c3700e256da4756695c5 (diff) | |
download | rabbitmq-server-3c896df33888e606ee470ea27db0eb0ec296a412.tar.gz |
Merge bug25957
-rw-r--r-- | docs/rabbitmqctl.1.xml | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 28 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 60 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 31 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 86 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 96 |
6 files changed, 196 insertions, 116 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index a7e42503..01b024a2 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1574,7 +1574,11 @@ </varlistentry> <varlistentry> <term>prefetch_count</term> - <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem> + <listitem><para>QoS prefetch limit for new consumers, 0 if unlimited.</para></listitem> + </varlistentry> + <varlistentry> + <term>global_prefetch_count</term> + <listitem><para>QoS prefetch limit for the entire channel, 0 if unlimited.</para></listitem> </varlistentry> </variablelist> <para> @@ -1604,8 +1608,9 @@ and is managed, the consumer tag which uniquely identifies the subscription within a channel, a boolean indicating whether acknowledgements are expected for - messages delivered to this consumer, and any arguments for this - consumer. + messages delivered to this consumer, an integer indicating + the prefetch limit (with 0 meaning 'none'), and any arguments + for this consumer. </para> </listitem> </varlistentry> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 019cebe6..9aed28d4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). +-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). @@ -114,11 +114,12 @@ -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]). + non_neg_integer(), rabbit_framing:amqp_table()}]). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) - -> [{name(), pid(), rabbit_types:ctag(), boolean()}]). + -> [{name(), pid(), rabbit_types:ctag(), boolean(), + non_neg_integer(), rabbit_framing:amqp_table()}]). -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). @@ -149,9 +150,10 @@ {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> 'ok'). --spec(basic_consume/9 :: +-spec(basic_consume/10 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any()) + non_neg_integer(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -184,7 +186,8 @@ %%---------------------------------------------------------------------------- -define(CONSUMER_INFO_KEYS, - [queue_name, channel_pid, consumer_tag, ack_required, arguments]). + [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, + arguments]). recover() -> %% Clear out remnants of old incarnation, in case we restarted @@ -531,9 +534,10 @@ consumers_all(VHostPath) -> lists:append( map(VHostPath, fun (Q) -> - [lists:zip(ConsumerInfoKeys, - [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) || - {ChPid, CTag, AckRequired, Args} <- consumers(Q)] + [lists:zip( + ConsumerInfoKeys, + [Q#amqqueue.name, ChPid, CTag, AckRequired, Prefetch, Args]) || + {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)] end)). stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). @@ -578,10 +582,12 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid, - LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) -> + LimiterActive, ConsumerPrefetchCount, ConsumerTag, + ExclusiveConsume, Args, OkMsg) -> ok = check_consume_arguments(QName, Args), delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, Args, OkMsg}). + ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, + Args, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ebcd0536..d415b358 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -315,7 +315,7 @@ terminate_shutdown(Fun, State) -> QName = qname(State), notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName) || - {Ch, CTag, _, _} <- + {Ch, CTag, _, _, _} <- rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -656,10 +656,12 @@ backing_queue_timeout(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:timeout(BQS)}. -subtract_acks(ChPid, AckTags, State, Fun) -> - case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of - not_found -> State; - ok -> Fun(State) +subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) -> + case rabbit_queue_consumers:subtract_acks(ChPid, AckTags, Consumers) of + not_found -> State; + unchanged -> Fun(State); + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, Fun(State1)) end. message_properties(Message, Confirm, #q{ttl = TTL}) -> @@ -824,14 +826,16 @@ emit_stats(State, Extra) -> not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). -emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) -> +emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, + PrefetchCount, Args, Ref) -> rabbit_event:notify(consumer_created, - [{consumer_tag, CTag}, - {exclusive, Exclusive}, - {ack_required, AckRequired}, - {channel, ChPid}, - {queue, QName}, - {arguments, Args}], + [{consumer_tag, CTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, QName}, + {prefetch_count, PrefetchCount}, + {arguments, Args}], Ref). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> @@ -844,13 +848,13 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> prioritise_call(Msg, _From, _Len, State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - stat -> 7; - {basic_consume, _, _, _, _, _, _, _, _} -> consumer_bias(State); - {basic_cancel, _, _, _} -> consumer_bias(State); - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + stat -> 7; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State); + {basic_cancel, _, _, _} -> consumer_bias(State); + _ -> 0 end. prioritise_cast(Msg, _Len, State) -> @@ -959,7 +963,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, Args, OkMsg}, + PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg}, _From, State = #q{consumers = Consumers, exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of @@ -967,7 +971,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok -> Consumers1 = rabbit_queue_consumers:add( ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, - Args, is_empty(State), Consumers), + PrefetchCount, Args, is_empty(State), + Consumers), ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder @@ -977,7 +982,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), Args, none), + not NoAck, qname(State1), + PrefetchCount, Args, none), notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1066,11 +1072,13 @@ handle_call({force_event_refresh, Ref}, _From, AllConsumers = rabbit_queue_consumers:all(Consumers), case Exclusive of none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName, Args, Ref) || - {Ch, CTag, AckRequired, Args} <- AllConsumers]; - {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers, + Ch, CTag, false, AckRequired, QName, Prefetch, + Args, Ref) || + {Ch, CTag, AckRequired, Prefetch, Args} + <- AllConsumers]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers, emit_consumer_created( - Ch, CTag, true, AckRequired, QName, Args, Ref) + Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) end, reply(ok, State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7907c96c..2a6b01f7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,8 @@ queue_names, queue_monitors, consumer_mapping, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, - unconfirmed, confirmed, mandatory, capabilities, trace_state}). + unconfirmed, confirmed, mandatory, capabilities, trace_state, + consumer_prefetch}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -52,6 +53,7 @@ messages_uncommitted, acks_uncommitted, prefetch_count, + global_prefetch_count, state]). -define(CREATION_EVENT_KEYS, @@ -216,7 +218,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, confirmed = [], mandatory = dtree:empty(), capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost)}, + trace_state = rabbit_trace:init(VHost), + consumer_prefetch = 0}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -752,9 +755,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{conn_pid = ConnPid, - limiter = Limiter, - consumer_mapping = ConsumerMapping}) -> + _, State = #ch{conn_pid = ConnPid, + limiter = Limiter, + consumer_prefetch = ConsumerPrefetchCount, + consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = qbin_to_resource(QueueNameBin, State), @@ -776,6 +780,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), + ConsumerPrefetchCount, ActualConsumerTag, ExclusiveConsume, Args, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), @@ -842,19 +847,22 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, end end; -handle_method(#'basic.qos'{global = true}, _, _State) -> - rabbit_misc:protocol_error(not_implemented, "global=true", []); - handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = 0}, +handle_method(#'basic.qos'{global = false, + prefetch_count = PrefetchCount}, _, State) -> + {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}}; + +handle_method(#'basic.qos'{global = true, + prefetch_count = 0}, _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, +handle_method(#'basic.qos'{global = true, + prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> %% TODO queue:len(UAMQ) is not strictly right since that counts %% unacked messages from basic.get too. Pretty obscure though. @@ -1603,7 +1611,8 @@ i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; i(state, #ch{state = running}) -> credit_flow:state(); i(state, #ch{state = State}) -> State; -i(prefetch_count, #ch{limiter = Limiter}) -> +i(prefetch_count, #ch{consumer_prefetch = C}) -> C; +i(global_prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d37b356c..5776fc3f 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -17,7 +17,8 @@ %% The purpose of the limiter is to stem the flow of messages from %% queues to channels, in order to act upon various protocol-level %% flow control mechanisms, specifically AMQP 0-9-1's basic.qos -%% prefetch_count and AMQP 1.0's link (aka consumer) credit mechanism. +%% prefetch_count, our consumer prefetch extension, and AMQP 1.0's +%% link (aka consumer) credit mechanism. %% %% Each channel has an associated limiter process, created with %% start_link/1, which it passes to queues on consumer creation with @@ -54,11 +55,15 @@ %% inactive. In practice it is rare for that to happen, though we %% could optimise this case in the future. %% -%% In addition, the consumer credit bookkeeping is local to queues, so -%% it is not necessary to store information about it in the limiter -%% process. But for abstraction we hide it from the queue behind the -%% limiter API, and it therefore becomes part of the queue local -%% state. +%% Consumer credit (for AMQP 1.0) and per-consumer prefetch (for AMQP +%% 0-9-1) are treated as essentially the same thing, but with the +%% exception that per-consumer prefetch gets an auto-topup when +%% acknowledgments come in. +%% +%% The bookkeeping for this is local to queues, so it is not necessary +%% to store information about it in the limiter process. But for +%% abstraction we hide it from the queue behind the limiter API, and +%% it therefore becomes part of the queue local state. %% %% The interactions with the limiter are as follows: %% @@ -66,7 +71,8 @@ %% that's what the limit_prefetch/3, unlimit_prefetch/1, %% get_prefetch_limit/1 API functions are about. They also tell the %% limiter queue state (via the queue) about consumer credit -%% changes - that's what credit/5 is for. +%% changes and message acknowledgement - that's what credit/5 and +%% ack_from_queue/3 are for. %% %% 2. Queues also tell the limiter queue state about the queue %% becoming empty (via drained/1) and consumers leaving (via @@ -123,8 +129,8 @@ get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/5, drained/1, - forget_consumer/2]). + is_suspended/1, is_consumer_blocked/2, credit/5, ack_from_queue/3, + drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/4]). @@ -141,6 +147,8 @@ -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). +-type(credit_mode() :: 'manual' | 'drain' | 'auto'). + -spec(start_link/1 :: (rabbit_types:proc_name()) -> rabbit_types:ok_pid_or_error()). -spec(new/1 :: (pid()) -> lstate()). @@ -161,8 +169,10 @@ -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). --spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), - boolean()) -> {boolean(), qstate()}). +-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), + credit_mode(), boolean()) -> {boolean(), qstate()}). +-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer()) + -> {boolean(), qstate()}). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -179,7 +189,7 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. --record(credit, {credit = 0, drain = false}). +-record(credit, {credit = 0, mode}). %%---------------------------------------------------------------------------- %% API @@ -256,19 +266,32 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> {value, #credit{}} -> true end. -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) -> - {Res, Cr} = case IsEmpty andalso Drain of - true -> {true, make_credit(0, false)}; - false -> {false, make_credit(Credit, Drain)} - end, - {Res, Limiter#qstate{credits = gb_trees:enter(CTag, Cr, Credits)}}. +credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) -> + {Res, Cr} = + case IsEmpty andalso Mode =:= drain of + true -> {true, #credit{credit = 0, mode = manual}}; + false -> {false, #credit{credit = Crd, mode = Mode}} + end, + {Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}. + +ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> + {Credits1, Unblocked} = + case gb_trees:lookup(CTag, Credits) of + {value, C = #credit{mode = auto, credit = C0}} -> + {update_credit(CTag, C#credit{credit = C0 + Credit}, Credits), + C0 =:= 0 andalso Credit =/= 0}; + _ -> + {Credits, false} + end, + {Unblocked, Limiter#qstate{credits = Credits1}}. drained(Limiter = #qstate{credits = Credits}) -> + Drain = fun(C) -> C#credit{credit = 0, mode = manual} end, {CTagCredits, Credits2} = rabbit_misc:gb_trees_fold( - fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) -> - {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)}; - (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) -> + fun (CTag, C = #credit{credit = Crd, mode = drain}, {Acc, Creds0}) -> + {[{CTag, Crd} | Acc], update_credit(CTag, Drain(C), Creds0)}; + (_CTag, #credit{credit = _Crd, mode = _Mode}, {Acc, Creds0}) -> {Acc, Creds0} end, {[], Credits}, Credits), {CTagCredits, Limiter#qstate{credits = Credits2}}. @@ -287,20 +310,25 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> %% state for us (#qstate.credits), and maintain a fiction that the %% limiter is making the decisions... -make_credit(Credit, Drain) -> - %% Using up all credit implies no need to send a 'drained' event - #credit{credit = Credit, drain = Drain andalso Credit > 0}. - decrement_credit(CTag, Credits) -> case gb_trees:lookup(CTag, Credits) of - {value, #credit{credit = Credit, drain = Drain}} -> - update_credit(CTag, Credit - 1, Drain, Credits); + {value, C = #credit{credit = Credit}} -> + update_credit(CTag, C#credit{credit = Credit - 1}, Credits); none -> Credits end. -update_credit(CTag, Credit, Drain, Credits) -> - gb_trees:update(CTag, make_credit(Credit, Drain), Credits). +enter_credit(CTag, C, Credits) -> + gb_trees:enter(CTag, ensure_credit_invariant(C), Credits). + +update_credit(CTag, C, Credits) -> + gb_trees:update(CTag, ensure_credit_invariant(C), Credits). + +ensure_credit_invariant(C = #credit{credit = 0, mode = drain}) -> + %% Using up all credit implies no need to send a 'drained' event + C#credit{mode = manual}; +ensure_credit_invariant(C) -> + C. %%---------------------------------------------------------------------------- %% gen_server callbacks diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 177cb9e8..7ba5d25e 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -17,8 +17,8 @@ -module(rabbit_queue_consumers). -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, - unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/2, + unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, + send_drained/0, deliver/3, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit/6, utilisation/1]). @@ -32,7 +32,7 @@ -record(state, {consumers, use}). --record(consumer, {tag, ack_required, args}). +-record(consumer, {tag, ack_required, prefetch, args}). %% These are held in our process dictionary -record(cr, {ch_pid, @@ -66,11 +66,12 @@ -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. -spec inactive(state()) -> boolean(). -spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]. + non_neg_integer(), rabbit_framing:amqp_table()}]. -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), - rabbit_framing:amqp_table(), boolean(), state()) -> state(). + non_neg_integer(), rabbit_framing:amqp_table(), boolean(), state()) + -> state(). -spec remove(ch(), rabbit_types:ctag(), state()) -> 'not_found' | state(). -spec erase_ch(ch(), state()) -> @@ -82,7 +83,8 @@ {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. --spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. +-spec subtract_acks(ch(), [ack()], state()) -> + 'not_found' | 'unchanged' | {'unblocked', state()}. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). @@ -112,8 +114,9 @@ all(#state{consumers = Consumers}) -> consumers(Consumers, Acc) -> priority_queue:fold( fun ({ChPid, Consumer}, _P, Acc1) -> - #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, - [{ChPid, CTag, Ack, Args} | Acc1] + #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch, + args = Args} = Consumer, + [{ChPid, CTag, Ack, Prefetch, Args} | Acc1] end, Acc, Consumers). count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). @@ -121,7 +124,7 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), @@ -130,13 +133,16 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, false -> Limiter end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, - update_ch_record(case parse_credit_args(Args) of - none -> C1; - {Crd, Drain} -> credit_and_drain( - C1, CTag, Crd, Drain, IsEmpty) - end), + update_ch_record( + case parse_credit_args(Prefetch, Args) of + {0, auto} -> C1; + {_Credit, auto} when NoAck -> C1; + {Credit, Mode} -> credit_and_drain( + C1, CTag, Credit, Mode, IsEmpty) + end), Consumer = #consumer{tag = CTag, ack_required = not NoAck, + prefetch = Prefetch, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. @@ -169,7 +175,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> blocked_consumers = BlockedQ} -> AllConsumers = priority_queue:join(Consumers, BlockedQ), ok = erase_ch_record(C), - {queue:to_list(ChAckTags), + {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)], tags(priority_queue:to_list(AllConsumers)), State#state{consumers = remove_consumers(ChPid, Consumers)}} end. @@ -226,7 +232,7 @@ deliver_to_consumer(FetchFun, rabbit_channel:deliver(ChPid, CTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of - true -> queue:in(AckTag, ChAckTags); + true -> queue:in({AckTag, CTag}, ChAckTags); false -> ChAckTags end, update_ch_record(C#cr{acktags = ChAckTags1, @@ -235,27 +241,42 @@ deliver_to_consumer(FetchFun, record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), - update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), + update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}), ok. -subtract_acks(ChPid, AckTags) -> +subtract_acks(ChPid, AckTags, State) -> case lookup_ch(ChPid) of not_found -> not_found; - C = #cr{acktags = ChAckTags} -> - update_ch_record( - C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), - ok + C = #cr{acktags = ChAckTags, limiter = Lim} -> + {CTagCounts, AckTags2} = subtract_acks( + AckTags, [], orddict:new(), ChAckTags), + {Unblocked, Lim2} = + orddict:fold( + fun (CTag, Count, {UnblockedN, LimN}) -> + {Unblocked1, LimN1} = + rabbit_limiter:ack_from_queue(LimN, CTag, Count), + {UnblockedN orelse Unblocked1, LimN1} + end, {false, Lim}, CTagCounts), + C2 = C#cr{acktags = AckTags2, limiter = Lim2}, + case Unblocked of + true -> unblock(C2, State); + false -> update_ch_record(C2), + unchanged + end end. -subtract_acks([], [], AckQ) -> - AckQ; -subtract_acks([], Prefix, AckQ) -> - queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); -subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> +subtract_acks([], [], CTagCounts, AckQ) -> + {CTagCounts, AckQ}; +subtract_acks([], Prefix, CTagCounts, AckQ) -> + {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)}; +subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> case queue:out(AckQ) of - {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); - {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + {{value, {T, CTag}}, QTail} -> + subtract_acks(TL, Prefix, + orddict:update_counter(CTag, 1, CTagCounts), QTail); + {{value, V}, QTail} -> + subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail) end. possibly_unblock(Update, ChPid, State) -> @@ -308,7 +329,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> unchanged; #cr{limiter = Limiter} = C -> C1 = #cr{limiter = Limiter1} = - credit_and_drain(C, CTag, Credit, Drain, IsEmpty), + credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty), case is_ch_blocked(C1) orelse (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of @@ -318,6 +339,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> end end. +drain_mode(true) -> drain; +drain_mode(false) -> manual. + utilisation(#state{use = {active, Since, Avg}}) -> use_avg(now_micros() - Since, 0, Avg); utilisation(#state{use = {inactive, Since, Active, Avg}}) -> @@ -325,14 +349,14 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) -> %%---------------------------------------------------------------------------- -parse_credit_args(Args) -> +parse_credit_args(Default, Args) -> case rabbit_misc:table_lookup(Args, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; - _ -> none + {{long, C}, {bool, D}} -> {C, drain_mode(D)}; + _ -> {Default, auto} end; - undefined -> none + undefined -> {Default, auto} end. lookup_ch(ChPid) -> @@ -393,8 +417,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> end. credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, - CTag, Credit, Drain, IsEmpty) -> - case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of + CTag, Credit, Mode, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of {true, Limiter1} -> rabbit_channel:send_drained(ChPid, [{CTag, Credit}]), C#cr{limiter = Limiter1}; |