diff options
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 407 |
1 files changed, 288 insertions, 119 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 74d36aba..d9f1170e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -14,42 +14,165 @@ %% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% +%% The purpose of the limiter is to stem the flow of messages from +%% queues to channels, in order to act upon various protocol-level +%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos +%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer) +%% credit mechanism. +%% +%% Each channel has an associated limiter process, created with +%% start_link/1, which it passes to queues on consumer creation with +%% rabbit_amqqueue:basic_consume/9, and rabbit_amqqueue:basic_get/4. +%% The latter isn't strictly necessary, since basic.get is not +%% subject to limiting, but it means that whenever a queue knows about +%% a channel, it also knows about its limiter, which is less fiddly. +%% +%% The limiter process holds state that is, in effect, shared between +%% the channel and all queues from which the channel is +%% consuming. Essentially all these queues are competing for access to +%% a single, limited resource - the ability to deliver messages via +%% the channel - and it is the job of the limiter process to mediate +%% that access. +%% +%% The limiter process is separate from the channel process for two +%% reasons: separation of concerns, and efficiency. Channels can get +%% very busy, particularly if they are also dealing with publishes. +%% With a separate limiter process all the aforementioned access +%% mediation can take place without touching the channel. +%% +%% For efficiency, both the channel and the queues keep some local +%% state, initialised from the limiter pid with new/1 and client/1, +%% respectively. In particular this allows them to avoid any +%% interaction with the limiter process when it is 'inactive', i.e. no +%% protocol-level flow control is taking place. 
+%% +%% This optimisation does come at the cost of some complexity though: +%% when a limiter becomes active, the channel needs to inform all its +%% consumer queues of this change in status. It does this by invoking +%% rabbit_amqqueue:activate_limit_all/2. Note that there is no inverse +%% transition, i.e. once a queue has been told about an active +%% limiter, it is not subsequently told when that limiter becomes +%% inactive. In practice it is rare for that to happen, though we +%% could optimise this case in the future. +%% +%% In addition, the consumer credit bookkeeping is local to queues, so +%% it is not necessary to store information about it in the limiter +%% process. But for abstraction we hide it from the queue behind the +%% limiter API, and it therefore becomes part of the queue local +%% state. +%% +%% The interactions with the limiter are as follows: +%% +%% 1. Channels tell the limiter about basic.qos prefetch counts - +%% that's what the limit_prefetch/3, unlimit_prefetch/1, +%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are +%% about - and channel.flow blocking - that's what block/1, +%% unblock/1 and is_blocked/1 are for. They also tell the limiter +%% queue state (via the queue) about consumer credit changes - +%% that's what credit/4 is for. +%% +%% 2. Queues also tell the limiter queue state about the queue +%% becoming empty (via drained/1) and consumers leaving (via +%% forget_consumer/2). +%% +%% 3. Queues register with the limiter - this happens as part of +%% activate/1. +%% +%% 4. The limiter process maintains an internal counter of 'messages +%% sent but not yet acknowledged', called the 'volume'. +%% +%% 5. Queues ask the limiter for permission (with can_send/3) whenever +%% they want to deliver a message to a channel. The limiter checks +%% whether a) the channel isn't blocked by channel.flow, b) the +%% volume has not yet reached the prefetch limit, and c) whether +%% the consumer has enough credit. 
If so it increments the volume +%% and tells the queue to proceed. Otherwise it marks the queue as +%% requiring notification (see below) and tells the queue not to +%% proceed. +%% +%% 6. A queue that has been told to proceed (by the return value of +%% can_send/3) sends the message to the channel. Conversely, a +%% queue that has been told not to proceed, will not attempt to +%% deliver that message, or any future messages, to the +%% channel. This is accomplished by can_send/3 capturing the +%% outcome in the local state, where it can be accessed with +%% is_suspended/1. +%% +%% 7. When a channel receives an ack it tells the limiter (via ack/2) +%% how many messages were ack'ed. The limiter process decrements +%% the volume and if it falls below the prefetch_count then it +%% notifies (through rabbit_amqqueue:resume/2) all the queues +%% requiring notification, i.e. all those that had a can_send/3 +%% request denied. +%% +%% 8. Upon receipt of such a notification, queues resume delivery to +%% the channel, i.e. they will once again start asking limiter, as +%% described in (5). +%% +%% 9. When a queue has no more consumers associated with a particular +%% channel, it deactivates use of the limiter with deactivate/1, +%% which alters the local state such that no further interactions +%% with the limiter process take place until a subsequent +%% activate/1. + -module(rabbit_limiter). -behaviour(gen_server2). +-export([start_link/0]). +%% channel API +-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1, + is_prefetch_limited/1, is_blocked/1, is_active/1, + get_prefetch_limit/1, ack/2, pid/1]). +%% queue API +-export([client/1, activate/1, can_send/3, resume/1, deactivate/1, + is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, + forget_consumer/2]). +%% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/4]). 
--export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, - disable/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1, is_blocked/1]). %%---------------------------------------------------------------------------- --record(token, {pid, enabled}). +-record(lstate, {pid, prefetch_limited, blocked}). +-record(qstate, {pid, state, credits}). -ifdef(use_specs). --export_type([token/0]). - --opaque(token() :: #token{}). +-type(lstate() :: #lstate{pid :: pid(), + prefetch_limited :: boolean(), + blocked :: boolean()}). +-type(qstate() :: #qstate{pid :: pid(), + state :: 'dormant' | 'active' | 'suspended'}). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(make_token/0 :: () -> token()). --spec(make_token/1 :: ('undefined' | pid()) -> token()). --spec(is_enabled/1 :: (token()) -> boolean()). --spec(enable/2 :: (token(), non_neg_integer()) -> token()). --spec(disable/1 :: (token()) -> token()). --spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). --spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). --spec(register/2 :: (token(), pid()) -> 'ok'). --spec(unregister/2 :: (token(), pid()) -> 'ok'). --spec(get_limit/1 :: (token()) -> non_neg_integer()). --spec(block/1 :: (token()) -> 'ok'). --spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). --spec(is_blocked/1 :: (token()) -> boolean()). +-spec(new/1 :: (pid()) -> lstate()). + +-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer()) + -> lstate()). +-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()). +-spec(block/1 :: (lstate()) -> lstate()). +-spec(unblock/1 :: (lstate()) -> lstate()). +-spec(is_prefetch_limited/1 :: (lstate()) -> boolean()). +-spec(is_blocked/1 :: (lstate()) -> boolean()). +-spec(is_active/1 :: (lstate()) -> boolean()). +-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()). 
+-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok'). +-spec(pid/1 :: (lstate()) -> pid()). + +-spec(client/1 :: (pid()) -> qstate()). +-spec(activate/1 :: (qstate()) -> qstate()). +-spec(can_send/3 :: (qstate(), boolean(), rabbit_types:ctag()) -> + {'continue' | 'suspend', qstate()}). +-spec(resume/1 :: (qstate()) -> qstate()). +-spec(deactivate/1 :: (qstate()) -> qstate()). +-spec(is_suspended/1 :: (qstate()) -> boolean()). +-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). +-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean()) + -> qstate()). +-spec(drained/1 :: (qstate()) + -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). +-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). -endif. @@ -64,120 +187,181 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. +-record(credit, {credit = 0, drain = false}). + %%---------------------------------------------------------------------------- %% API %%---------------------------------------------------------------------------- start_link() -> gen_server2:start_link(?MODULE, [], []). -make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, enabled = false}. +new(Pid) -> + %% this is a 'call' to ensure that it is invoked at most once. + ok = gen_server:call(Pid, {new, self()}), + #lstate{pid = Pid, prefetch_limited = false, blocked = false}. -is_enabled(#token{enabled = Enabled}) -> Enabled. +limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> + ok = gen_server:call(L#lstate.pid, + {limit_prefetch, PrefetchCount, UnackedCount}), + L#lstate{prefetch_limited = true}. -enable(#token{pid = Pid} = Token, Volume) -> - gen_server2:call(Pid, {enable, Token, self(), Volume}, infinity). +unlimit_prefetch(L) -> + ok = gen_server:call(L#lstate.pid, unlimit_prefetch), + L#lstate{prefetch_limited = false}. 
-disable(#token{pid = Pid} = Token) -> - gen_server2:call(Pid, {disable, Token}, infinity). +block(L) -> + ok = gen_server:call(L#lstate.pid, block), + L#lstate{blocked = true}. -limit(Limiter, PrefetchCount) -> - maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok). +unblock(L) -> + ok = gen_server:call(L#lstate.pid, unblock), + L#lstate{blocked = false}. -%% Ask the limiter whether the queue can deliver a message without -%% breaching a limit. Note that we don't use maybe_call here in order -%% to avoid always going through with_exit_handler/2, even when the -%% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> - rabbit_misc:with_exit_handler( - fun () -> true end, - fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) - end); -can_send(_, _, _) -> - true. +is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited. + +is_blocked(#lstate{blocked = Blocked}) -> Blocked. + +is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L). + +get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0; +get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit). -%% Let the limiter know that the channel has received some acks from a -%% consumer -ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). +ack(#lstate{prefetch_limited = false}, _AckCount) -> ok; +ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}). -register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). +pid(#lstate{pid = Pid}) -> Pid. -unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}). +client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}. -get_limit(Limiter) -> +activate(L = #qstate{state = dormant}) -> + ok = gen_server:cast(L#qstate.pid, {register, self()}), + L#qstate{state = active}; +activate(L) -> L. 
+ +can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, + AckRequired, CTag) -> + case is_consumer_blocked(L, CTag) of + false -> case (State =/= active orelse + safe_call(Pid, {can_send, self(), AckRequired}, true)) of + true -> {continue, L#qstate{ + credits = record_send_q(CTag, Credits)}}; + false -> {suspend, L#qstate{state = suspended}} + end; + true -> {suspend, L} + end. + +safe_call(Pid, Msg, ExitValue) -> rabbit_misc:with_exit_handler( - fun () -> 0 end, - fun () -> maybe_call(Limiter, get_limit, 0) end). + fun () -> ExitValue end, + fun () -> gen_server2:call(Pid, Msg, infinity) end). + +resume(L) -> L#qstate{state = active}. -block(Limiter) -> - maybe_call(Limiter, block, ok). +deactivate(L = #qstate{state = dormant}) -> L; +deactivate(L) -> + ok = gen_server:cast(L#qstate.pid, {unregister, self()}), + L#qstate{state = dormant}. + +is_suspended(#qstate{state = suspended}) -> true; +is_suspended(#qstate{}) -> false. + +is_consumer_blocked(#qstate{credits = Credits}, CTag) -> + case gb_trees:lookup(CTag, Credits) of + {value, #credit{credit = C}} when C > 0 -> false; + {value, #credit{}} -> true; + none -> false + end. -unblock(Limiter) -> - maybe_call(Limiter, {unblock, Limiter}, ok). +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> + Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. -is_blocked(Limiter) -> - maybe_call(Limiter, is_blocked, false). +drained(Limiter = #qstate{credits = Credits}) -> + {CTagCredits, Credits2} = + rabbit_misc:gb_trees_fold( + fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) -> + {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)}; + (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) -> + {Acc, Creds0} + end, {[], Credits}, Credits), + {CTagCredits, Limiter#qstate{credits = Credits2}}. + +forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> + Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}. 
+ +%%---------------------------------------------------------------------------- +%% Queue-local code +%%---------------------------------------------------------------------------- + +%% We want to do all the AMQP 1.0-ish link level credit calculations +%% in the queue (to do them elsewhere introduces a ton of +%% races). However, it's a big chunk of code that is conceptually very +%% linked to the limiter concept. So we get the queue to hold a bit of +%% state for us (#qstate.credits), and maintain a fiction that the +%% limiter is making the decisions... + +record_send_q(CTag, Credits) -> + case gb_trees:lookup(CTag, Credits) of + {value, #credit{credit = Credit, drain = Drain}} -> + update_credit(CTag, Credit - 1, Drain, Credits); + none -> + Credits + end. + +update_credit(CTag, Credit, Drain, Credits) -> + %% Using up all credit implies no need to send a 'drained' event + Drain1 = Drain andalso Credit > 0, + gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits). %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([]) -> - {ok, #lim{}}. +init([]) -> {ok, #lim{}}. + +prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9; +prioritise_call(_Msg, _From, _Len, _State) -> 0. -prioritise_call(get_limit, _From, _Len, _State) -> 9; -prioritise_call(_Msg, _From, _Len, _State) -> 0. 
+handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) -> + {reply, ok, State#lim{ch_pid = ChPid}}; + +handle_call({limit_prefetch, PrefetchCount, UnackedCount}, _From, State) -> + %% assertion + true = State#lim.prefetch_count == 0 orelse + State#lim.volume == UnackedCount, + {reply, ok, maybe_notify(State, State#lim{prefetch_count = PrefetchCount, + volume = UnackedCount})}; + +handle_call(unlimit_prefetch, _From, State) -> + {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0, + volume = 0})}; + +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State) -> + {reply, ok, maybe_notify(State, State#lim{blocked = false})}; + +handle_call(get_prefetch_limit, _From, + State = #lim{prefetch_count = PrefetchCount}) -> + {reply, PrefetchCount, State}; handle_call({can_send, QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, limit_queue(QPid, State)}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> - case limit_reached(State) of + case prefetch_limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; true -> Volume end}} - end; - -handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> - {reply, PrefetchCount, State}; - -handle_call({limit, PrefetchCount, Token}, _From, State) -> - case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of - {cont, State1} -> - {reply, ok, State1}; - {stop, State1} -> - {reply, {disabled, Token#token{enabled = false}}, State1} - end; - -handle_call(block, _From, State) -> - {reply, ok, State#lim{blocked = true}}; - -handle_call({unblock, Token}, _From, State) -> - case maybe_notify(State, State#lim{blocked = false}) of - {cont, State1} -> - {reply, ok, State1}; - {stop, State1} -> - {reply, {disabled, Token#token{enabled = false}}, State1} - end; - -handle_call(is_blocked, 
_From, State) -> - {reply, blocked(State), State}; - -handle_call({enable, Token, Channel, Volume}, _From, State) -> - {reply, Token#token{enabled = true}, - State#lim{ch_pid = Channel, volume = Volume}}; -handle_call({disable, Token}, _From, State) -> - {reply, Token#token{enabled = false}, State}. + end. handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), - {noreply, State1}; + {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -199,27 +383,13 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse blocked(OldState)) andalso - not (limit_reached(NewState) orelse blocked(NewState)) of - true -> NewState1 = notify_queues(NewState), - {case NewState1#lim.prefetch_count of - 0 -> stop; - _ -> cont - end, NewState1}; - false -> {cont, NewState} + case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso + not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of + true -> notify_queues(NewState); + false -> NewState end. -maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) -> - gen_server2:call(Pid, Call, infinity); -maybe_call(_, _Call, Default) -> - Default. - -maybe_cast(#token{pid = Pid, enabled = true}, Cast) -> - gen_server2:cast(Pid, Cast); -maybe_cast(_, _Call) -> - ok. - -limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> +prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. blocked(#lim{blocked = Blocked}) -> Blocked. @@ -231,10 +401,9 @@ remember_queue(QPid, State = #lim{queues = Queues}) -> true -> State end. 
-forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> +forget_queue(QPid, State = #lim{queues = Queues}) -> case orddict:find(QPid, Queues) of {ok, {MRef, _}} -> true = erlang:demonitor(MRef), - ok = rabbit_amqqueue:unblock(QPid, ChPid), State#lim{queues = orddict:erase(QPid, Queues)}; error -> State end. @@ -251,13 +420,13 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> end, {[], Queues}, Queues), case length(QList) of 0 -> ok; - 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case + 1 -> ok = rabbit_amqqueue:resume(hd(QList), ChPid); %% common case L -> %% We randomly vary the position of queues in the list, %% thus ensuring that each queue has an equal chance of %% being notified first. {L1, L2} = lists:split(random:uniform(L), QList), - [[ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L3] + [[ok = rabbit_amqqueue:resume(Q, ChPid) || Q <- L3] || L3 <- [L2, L1]], ok end, |