diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-03-27 14:22:34 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-03-27 14:22:34 +0000 |
commit | 5531872ad348f30afc9a43af8cacdc1420cc4f14 (patch) | |
tree | b0960d062ac909c8240e9771301301f1fc2de98f | |
parent | ededc010f092f61aca0336abeadffa27c46703ee (diff) | |
parent | 810b5f620066304fe85cb3029e7544f984ffe806 (diff) | |
download | rabbitmq-server-5531872ad348f30afc9a43af8cacdc1420cc4f14.tar.gz |
Merged bug25491 into default
-rw-r--r-- | src/file_handle_cache.erl | 4 | ||||
-rw-r--r-- | src/gen_server2.erl | 38 | ||||
-rw-r--r-- | src/gm.erl | 8 | ||||
-rw-r--r-- | src/priority_queue.erl | 70 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 42 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 218 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 161 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 36 | ||||
-rw-r--r-- | src/rabbit_exchange_decorator.erl | 12 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 409 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 10 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 10 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 6 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 23 | ||||
-rw-r--r-- | src/rabbit_registry.erl | 39 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 31 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 2 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 6 |
19 files changed, 732 insertions, 397 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index d2d4d295..406add8a 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -152,7 +152,7 @@ -export([ulimit/0]). -export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). + handle_info/2, terminate/2, code_change/3, prioritise_cast/3]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). @@ -848,7 +848,7 @@ init([AlarmSet, AlarmClear]) -> alarm_set = AlarmSet, alarm_clear = AlarmClear }}. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {release, _, _} -> 5; _ -> 0 diff --git a/src/gen_server2.erl b/src/gen_server2.erl index c82327a2..9109febd 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -16,12 +16,15 @@ %% The original code could reorder messages when communicating with a %% process on a remote node that was not currently connected. %% -%% 4) The callback module can optionally implement prioritise_call/3, -%% prioritise_cast/2 and prioritise_info/2. These functions take -%% Message, From and State or just Message and State and return a -%% single integer representing the priority attached to the message. -%% Messages with higher priorities are processed before requests with -%% lower priorities. The default priority is 0. +%% 4) The callback module can optionally implement prioritise_call/4, +%% prioritise_cast/3 and prioritise_info/3. These functions take +%% Message, From, Length and State or just Message, Length and State +%% (where Length is the current number of messages waiting to be +%% processed) and return a single integer representing the priority +%% attached to the message, or 'drop' to ignore it (for +%% prioritise_cast/3 and prioritise_info/3 only). Messages with +%% higher priorities are processed before requests with lower +%% priorities. The default priority is 0. %% %% 5) The callback module can optionally implement %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be @@ -649,6 +652,9 @@ in({system, _From, _Req} = Input, GS2State) -> in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) -> in(Input, F(Input, GS2State), GS2State). +in(_Input, drop, GS2State) -> + GS2State; + in(Input, Priority, GS2State = #gs2_state { queue = Queue }) -> GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }. @@ -1148,27 +1154,33 @@ whereis_name(Name) -> end. find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> - PCall = function_exported_or_default(Mod, 'prioritise_call', 3, + PCall = function_exported_or_default(Mod, 'prioritise_call', 4, fun (_Msg, _From, _State) -> 0 end), - PCast = function_exported_or_default(Mod, 'prioritise_cast', 2, + PCast = function_exported_or_default(Mod, 'prioritise_cast', 3, fun (_Msg, _State) -> 0 end), - PInfo = function_exported_or_default(Mod, 'prioritise_info', 2, + PInfo = function_exported_or_default(Mod, 'prioritise_info', 3, fun (_Msg, _State) -> 0 end), GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }. function_exported_or_default(Mod, Fun, Arity, Default) -> case erlang:function_exported(Mod, Fun, Arity) of true -> case Arity of - 2 -> fun (Msg, GS2State = #gs2_state { state = State }) -> - case catch Mod:Fun(Msg, State) of + 3 -> fun (Msg, GS2State = #gs2_state { queue = Queue, + state = State }) -> + Length = priority_queue:len(Queue), + case catch Mod:Fun(Msg, Length, State) of + drop -> + drop; Res when is_integer(Res) -> Res; Err -> handle_common_termination(Err, Msg, GS2State) end end; - 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) -> - case catch Mod:Fun(Msg, From, State) of + 4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue, + state = State }) -> + Length = priority_queue:len(Queue), + case catch Mod:Fun(Msg, From, Length, State) of Res when is_integer(Res) -> Res; Err -> @@ -380,7 +380,7 @@ confirmed_broadcast/2, info/1, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_info/2]). + code_change/3, prioritise_info/3]). -ifndef(use_specs). -export([behaviour_info/1]). @@ -721,12 +721,12 @@ terminate(Reason, State = #state { module = Module, code_change(_OldVsn, State, _Extra) -> {ok, State}. -prioritise_info(flush, _State) -> +prioritise_info(flush, _Len, _State) -> 1; -prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, +prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, #state { members_state = MS }) when MS /= undefined -> 1; -prioritise_info(_, _State) -> +prioritise_info(_, _Len, _State) -> 0. diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 02a0a1df..0dc19819 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -69,9 +69,9 @@ %%---------------------------------------------------------------------------- new() -> - {queue, [], []}. + {queue, [], [], 0}. -is_queue({queue, R, F}) when is_list(R), is_list(F) -> +is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) -> true; is_queue({pqueue, Queues}) when is_list(Queues) -> lists:all(fun ({infinity, Q}) -> is_queue(Q); @@ -80,17 +80,17 @@ is_queue({pqueue, Queues}) when is_list(Queues) -> is_queue(_) -> false. -is_empty({queue, [], []}) -> +is_empty({queue, [], [], 0}) -> true; is_empty(_) -> false. -len({queue, R, F}) when is_list(R), is_list(F) -> - length(R) + length(F); +len({queue, _R, _F, L}) -> + L; len({pqueue, Queues}) -> lists:sum([len(Q) || {_, Q} <- Queues]). -to_list({queue, In, Out}) when is_list(In), is_list(Out) -> +to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) -> [{0, V} || V <- Out ++ lists:reverse(In, [])]; to_list({pqueue, Queues}) -> [{maybe_negate_priority(P), V} || {P, Q} <- Queues, @@ -99,13 +99,13 @@ to_list({pqueue, Queues}) -> in(Item, Q) -> in(Item, 0, Q). -in(X, 0, {queue, [_] = In, []}) -> - {queue, [X], In}; -in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) -> - {queue, [X|In], Out}; -in(X, Priority, _Q = {queue, [], []}) -> +in(X, 0, {queue, [_] = In, [], 1}) -> + {queue, [X], In, 2}; +in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) -> + {queue, [X|In], Out, Len + 1}; +in(X, Priority, _Q = {queue, [], [], 0}) -> in(X, Priority, {pqueue, []}); -in(X, Priority, Q = {queue, _, _}) -> +in(X, Priority, Q = {queue, _, _, _}) -> in(X, Priority, {pqueue, [{0, Q}]}); in(X, Priority, {pqueue, Queues}) -> P = maybe_negate_priority(Priority), @@ -113,33 +113,33 @@ in(X, Priority, {pqueue, Queues}) -> {value, {_, Q}} -> lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); false when P == infinity -> - [{P, {queue, [X], []}} | Queues]; + [{P, {queue, [X], [], 1}} | Queues]; false -> case Queues of [{infinity, InfQueue} | Queues1] -> [{infinity, InfQueue} | - lists:keysort(1, [{P, {queue, [X], []}} | Queues1])]; + lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues1])]; _ -> - lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues]) end end}. -out({queue, [], []} = Q) -> +out({queue, [], [], 0} = Q) -> {empty, Q}; -out({queue, [V], []}) -> - {{value, V}, {queue, [], []}}; -out({queue, [Y|In], []}) -> +out({queue, [V], [], 1}) -> + {{value, V}, {queue, [], [], 0}}; +out({queue, [Y|In], [], Len}) -> [V|Out] = lists:reverse(In, []), - {{value, V}, {queue, [Y], Out}}; -out({queue, In, [V]}) when is_list(In) -> - {{value,V}, r2f(In)}; -out({queue, In,[V|Out]}) when is_list(In) -> - {{value, V}, {queue, In, Out}}; + {{value, V}, {queue, [Y], Out}, Len - 1}; +out({queue, In, [V], Len}) when is_list(In) -> + {{value,V}, r2f(In, Len - 1)}; +out({queue, In,[V|Out], Len}) when is_list(In) -> + {{value, V}, {queue, In, Out, Len - 1}}; out({pqueue, [{P, Q} | Queues]}) -> {R, Q1} = out(Q), NewQ = case is_empty(Q1) of true -> case Queues of - [] -> {queue, [], []}; + [] -> {queue, [], [], 0}; [{0, OnlyQ}] -> OnlyQ; [_|_] -> {pqueue, Queues} end; @@ -147,13 +147,13 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. -join(A, {queue, [], []}) -> +join(A, {queue, [], [], 0}) -> A; -join({queue, [], []}, B) -> +join({queue, [], [], 0}, B) -> B; -join({queue, AIn, AOut}, {queue, BIn, BOut}) -> - {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; -join(A = {queue, _, _}, {pqueue, BPQ}) -> +join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut), ALen + BLen}; +join(A = {queue, _, _, _}, {pqueue, BPQ}) -> {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ), Post1 = case Post of @@ -162,7 +162,7 @@ join(A = {queue, _, _}, {pqueue, BPQ}) -> _ -> [ {0, A} | Post ] end, {pqueue, Pre ++ Post1}; -join({pqueue, APQ}, B = {queue, _, _}) -> +join({pqueue, APQ}, B = {queue, _, _, _}) -> {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ), Post1 = case Post of @@ -185,10 +185,10 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity -> merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> merge(As, Bs, [ {PB, B} | Acc ]). -r2f([]) -> {queue, [], []}; -r2f([_] = R) -> {queue, [], R}; -r2f([X,Y]) -> {queue, [X], [Y]}; -r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}. +r2f([], 0) -> {queue, [], [], 0}; +r2f([_] = R, 1) -> {queue, [], R, 1}; +r2f([X,Y], 2) -> {queue, [X], [Y], 2}; +r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}. maybe_negate_priority(infinity) -> infinity; maybe_negate_priority(P) -> -P. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 82ac74fa..8c00c85c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,9 +26,9 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). --export([notify_down_all/2, limit_all/3]). +-export([basic_get/4, basic_consume/9, basic_cancel/4]). +-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). +-export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, @@ -144,19 +144,20 @@ -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). --spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> - ok_or_errors()). --spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> +-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). +-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/7 :: - (rabbit_types:amqqueue(), boolean(), pid(), - rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any()) +-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean()) -> 'ok'). +-spec(basic_consume/9 :: + (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), + rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). --spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(resume/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | @@ -538,16 +539,19 @@ notify_down_all(QPids, ChPid) -> Bads1 -> {error, Bads1} end. -limit_all(QPids, ChPid, Limiter) -> - delegate:cast(QPids, {limit, ChPid, Limiter}). +activate_limit_all(QPids, ChPid) -> + delegate:cast(QPids, {activate_limit, ChPid}). -basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate:call(QPid, {basic_get, ChPid, NoAck}). +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate:call(QPid, {basic_consume, NoAck, ChPid, - Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). +basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> + delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). + +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> + delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). @@ -569,7 +573,7 @@ notify_sent_queue_down(QPid) -> erase({consumer_credit_to, QPid}), ok. -unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}). +resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 18b641d4..b016c4d2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -29,8 +29,8 @@ -export([init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Queue's state -record(q, {q, @@ -66,9 +66,12 @@ monitor_ref, acktags, consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason blocked_consumers, + %% The limiter itself limiter, - is_limit_active, + %% Internal flow control for queue -> writer unsent_message_count}). %%---------------------------------------------------------------------------- @@ -165,6 +168,8 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName}, fun (BQS) -> BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete doesn't return 'ok'. + rabbit_event:if_enabled(State, #q.stats_timer, + fun() -> emit_stats(State) end), rabbit_amqqueue:internal_delete(QName), BQS1 end, State). @@ -362,17 +367,17 @@ lookup_ch(ChPid) -> C -> C end. -ch_record(ChPid) -> +ch_record(ChPid, LimiterPid) -> Key = {ch, ChPid}, case get(Key) of undefined -> MonitorRef = erlang:monitor(process, ChPid), + Limiter = rabbit_limiter:client(LimiterPid), C = #cr{ch_pid = ChPid, monitor_ref = MonitorRef, acktags = queue:new(), consumer_count = 0, blocked_consumers = queue:new(), - is_limit_active = false, - limiter = rabbit_limiter:make_token(), + limiter = Limiter, unsent_message_count = 0}, put(Key, C), C; @@ -392,37 +397,32 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C), ok. -erase_ch_record(#cr{ch_pid = ChPid, - limiter = Limiter, - monitor_ref = MonitorRef}) -> - ok = rabbit_limiter:unregister(Limiter, self()), +erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), ok. -update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> - ok = rabbit_limiter:register(Limiter, self()), - update_ch_record(C#cr{consumer_count = 1}); -update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) -> - ok = rabbit_limiter:unregister(Limiter, self()), - update_ch_record(C#cr{consumer_count = 0, - limiter = rabbit_limiter:make_token()}); -update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> - update_ch_record(C#cr{consumer_count = Count + Delta}). - all_ch_record() -> [C || {{ch, _}, C} <- get()]. block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). -is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> - Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. +is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> + Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). -ch_record_state_transition(OldCR, NewCR) -> - case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of - {true, false} -> unblock; - {false, true} -> block; - {_, _} -> ok +maybe_send_drained(WasEmpty, State) -> + case (not WasEmpty) andalso is_empty(State) of + true -> [send_drained(C) || C <- all_ch_record()]; + false -> ok + end, + State. + +send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> + case rabbit_limiter:drained(Limiter) of + {[], Limiter} -> ok; + {CTagCredit, Limiter2} -> rabbit_channel:send_drained( + ChPid, CTagCredit), + update_ch_record(C#cr{limiter = Limiter2}) end. deliver_msgs_to_consumers(_DeliverFun, true, State) -> @@ -440,18 +440,21 @@ deliver_msgs_to_consumers(DeliverFun, false, end. deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> - C = ch_record(ChPid), + C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), {false, State}; - false -> case rabbit_limiter:can_send(C#cr.limiter, self(), - Consumer#consumer.ack_required) of - false -> block_consumer(C#cr{is_limit_active = true}, E), - {false, State}; - true -> AC1 = queue:in(E, State#q.active_consumers), - deliver_msg_to_consumer( - DeliverFun, Consumer, C, - State#q{active_consumers = AC1}) + false -> case rabbit_limiter:can_send(C#cr.limiter, + Consumer#consumer.ack_required, + Consumer#consumer.tag) of + {suspend, Limiter} -> + block_consumer(C#cr{limiter = Limiter}, E), + {false, State}; + {continue, Limiter} -> + AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Consumer, C#cr{limiter = Limiter}, + State#q{active_consumers = AC1}) end end. @@ -602,14 +605,16 @@ maybe_drop_head(State = #q{max_length = MaxLen, requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + WasEmpty = BQ:is_empty(BQS), {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}), - run_message_queue(drop_expired_msgs(State1)). + run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}. + State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), + {Result, maybe_send_drained(Result =:= empty, State1)}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, @@ -638,20 +643,29 @@ remove_consumers(ChPid, Queue, QName) -> possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of - not_found -> + not_found -> State; + C -> C1 = Update(C), + case is_ch_blocked(C) andalso not is_ch_blocked(C1) of + false -> update_ch_record(C1), + State; + true -> unblock(State, C1) + end + end. + +unblock(State, C = #cr{limiter = Limiter}) -> + case lists:partition( + fun({_ChPid, #consumer{tag = CTag}}) -> + rabbit_limiter:is_consumer_blocked(Limiter, CTag) + end, queue:to_list(C#cr.blocked_consumers)) of + {_, []} -> + update_ch_record(C), State; - C -> - C1 = Update(C), - case ch_record_state_transition(C, C1) of - ok -> update_ch_record(C1), - State; - unblock -> #cr{blocked_consumers = Consumers} = C1, - update_ch_record( - C1#cr{blocked_consumers = queue:new()}), - AC1 = queue:join(State#q.active_consumers, - Consumers), - run_message_queue(State#q{active_consumers = AC1}) - end + {Blocked, Unblocked} -> + BlockedQ = queue:from_list(Blocked), + UnblockedQ = queue:from_list(Unblocked), + update_ch_record(C#cr{blocked_consumers = BlockedQ}), + AC1 = queue:join(State#q.active_consumers, UnblockedQ), + run_message_queue(State#q{active_consumers = AC1}) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -748,6 +762,11 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> T -> now_micros() + T * 1000 end. +%% Logically this function should invoke maybe_send_drained/2. +%% However, that is expensive. Since some frequent callers of +%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot +%% possibly cause the queue to become empty, we push the +%% responsibility to the callers. So be cautious when adding new ones. drop_expired_msgs(State) -> case is_empty(State) of true -> State; @@ -1025,7 +1044,7 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {info, _Items} -> 9; @@ -1034,7 +1053,7 @@ prioritise_call(Msg, _From, _State) -> _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; @@ -1043,7 +1062,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> +prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; update_ram_duration -> 8; @@ -1108,7 +1127,7 @@ handle_call({notify_down, ChPid}, From, State) -> {stop, State1} -> stop(From, ok, State1) end; -handle_call({basic_get, ChPid, NoAck}, _From, +handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), @@ -1118,7 +1137,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, {{Message, IsDelivered, AckTag}, State2} -> State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + true -> C = #cr{acktags = ChAckTags} = + ch_record(ChPid, LimiterPid), ChAckTags1 = queue:in(AckTag, ChAckTags), update_ch_record(C#cr{acktags = ChAckTags1}), State2; @@ -1128,15 +1148,30 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, BQ:len(BQS), Msg}, State3) end; -handle_call({basic_consume, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg}, +handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = ch_record(ChPid), - update_consumer_count(C#cr{limiter = Limiter}, +1), + C = #cr{consumer_count = Count, + limiter = Limiter} = ch_record(ChPid, LimiterPid), + Limiter1 = case LimiterActive of + true -> rabbit_limiter:activate(Limiter); + false -> Limiter + end, + Limiter2 = case CreditArgs of + none -> Limiter1; + {Crd, Drain} -> rabbit_limiter:credit( + Limiter1, ConsumerTag, Crd, Drain) + end, + C1 = update_ch_record(C#cr{consumer_count = Count + 1, + limiter = Limiter2}), + case is_empty(State) of + true -> send_drained(C1); + false -> ok + end, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1157,10 +1192,19 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, case lookup_ch(ChPid) of not_found -> reply(ok, State); - C = #cr{blocked_consumers = Blocked} -> + C = #cr{consumer_count = Count, + limiter = Limiter, + blocked_consumers = Blocked} -> emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), - update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), + Limiter1 = case Count of + 1 -> rabbit_limiter:deactivate(Limiter); + _ -> Limiter + end, + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), + update_ch_record(C#cr{consumer_count = Count - 1, + limiter = Limiter2, + blocked_consumers = Blocked1}), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, ConsumerTag} -> none; @@ -1193,7 +1237,8 @@ handle_call({delete, IfUnused, IfEmpty}, From, handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), - reply({ok, Count}, State#q{backing_queue_state = BQS1}); + State1 = State#q{backing_queue_state = BQS1}, + reply({ok, Count}, maybe_send_drained(Count =:= 0, State1)); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1288,10 +1333,12 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> handle_cast(delete_immediately, State) -> stop(State); -handle_cast({unblock, ChPid}, State) -> +handle_cast({resume, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, - fun (C) -> C#cr{is_limit_active = false} end)); + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:resume(Limiter)} + end)); handle_cast({notify_sent, ChPid, Credit}, State) -> noreply( @@ -1300,21 +1347,12 @@ handle_cast({notify_sent, ChPid, Credit}, State) -> C#cr{unsent_message_count = Count - Credit} end)); -handle_cast({limit, ChPid, Limiter}, State) -> +handle_cast({activate_limit, ChPid}, State) -> noreply( - possibly_unblock( - State, ChPid, - fun (C = #cr{consumer_count = ConsumerCount, - limiter = OldLimiter, - is_limit_active = OldLimited}) -> - case (ConsumerCount =/= 0 andalso - not rabbit_limiter:is_enabled(OldLimiter)) of - true -> ok = rabbit_limiter:register(Limiter, self()); - false -> ok - end, - Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter), - C#cr{limiter = Limiter, is_limit_active = Limited} - end)); + possibly_unblock(State, ChPid, + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:activate(Limiter)} + end)); handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), @@ -1346,6 +1384,24 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, noreply(State#q{backing_queue = BQ1, backing_queue_state = BQS1}); +handle_cast({credit, ChPid, CTag, Credit, Drain}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + Len = BQ:len(BQS), + rabbit_channel:send_credit_reply(ChPid, Len), + C = #cr{limiter = Limiter} = lookup_ch(ChPid), + C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)}, + noreply(case Drain andalso Len == 0 of + true -> update_ch_record(C1), + send_drained(C1), + State; + false -> case is_ch_blocked(C1) of + true -> update_ch_record(C1), + State; + false -> unblock(State, C1) + end + end); + handle_cast(wake_up, State) -> noreply(State). @@ -1365,7 +1421,9 @@ handle_info(maybe_expire, State) -> end; handle_info(drop_expired, State) -> - noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined})); + WasEmpty = is_empty(State), + State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), + noreply(maybe_send_drained(WasEmpty, State1)); handle_info(emit_stats, State) -> emit_stats(State), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 792a06c9..52c6140e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,14 +21,15 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2, + flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Internal -export([list_local/0]). @@ -81,8 +82,8 @@ -spec(start_link/11 :: (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), - rabbit_framing:amqp_table(), - pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()). + rabbit_framing:amqp_table(), pid(), pid()) -> + rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -94,6 +95,9 @@ -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). +-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}]) + -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). @@ -138,6 +142,12 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). +send_credit_reply(Pid, Len) -> + gen_server2:cast(Pid, {send_credit_reply, Len}). + +send_drained(Pid, CTagCredit) -> + gen_server2:cast(Pid, {send_drained, CTagCredit}). + flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). @@ -180,7 +190,7 @@ force_event_refresh() -> %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, - Capabilities, CollectorPid, Limiter]) -> + Capabilities, CollectorPid, LimiterPid]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, @@ -190,7 +200,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, writer_pid = WriterPid, conn_pid = ConnPid, conn_name = ConnName, - limiter = Limiter, + limiter = rabbit_limiter:new(LimiterPid), tx = none, next_tag = 1, unacked_message_q = queue:new(), @@ -217,20 +227,20 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, {ok, State1, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {info, _Items} -> 9; _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {confirm, _MsgSeqNos, _QPid} -> 5; _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of emit_stats -> 7; _ -> 0 @@ -315,6 +325,18 @@ handle_cast({deliver, ConsumerTag, AckRequired, Content), noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); +handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_ok'{available = Len}), + noreply(State); + +handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> + [ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, + credit_drained = CreditDrained}) + || {ConsumerTag, CreditDrained} <- CTagCredit], + noreply(State); + handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); @@ -372,6 +394,8 @@ terminate(Reason, State) -> _ -> ok end, pg_local:leave(rabbit_channels, self()), + rabbit_event:if_enabled(State, #ch.stats_timer, + fun() -> emit_stats(State) end), rabbit_event:notify(channel_closed, [{pid, self()}]). code_change(_OldVsn, State, _Extra) -> @@ -676,12 +700,15 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, conn_pid = ConnPid, + limiter = Limiter, next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, - fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get( + Q, self(), NoAck, rabbit_limiter:pid(Limiter)) + end) of {ok, MessageCount, Msg = {QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -706,7 +733,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_local = _, % FIXME: implement no_ack = NoAck, exclusive = ExclusiveConsume, - nowait = NoWait}, + nowait = NoWait, + arguments = Arguments}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, consumer_mapping = ConsumerMapping}) -> @@ -728,8 +756,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> {rabbit_amqqueue:basic_consume( - Q, NoAck, self(), Limiter, + Q, NoAck, self(), + rabbit_limiter:pid(Limiter), + rabbit_limiter:is_active(Limiter), ActualConsumerTag, ExclusiveConsume, + parse_credit_args(Arguments), ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -803,19 +834,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, +handle_method(#'basic.qos'{prefetch_count = 0}, _, State = #ch{limiter = Limiter}) -> - Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of - {false, 0} -> Limiter; - {false, _} -> enable_limiter(State); - {_, _} -> Limiter - end, - Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of - ok -> Limiter1; - {disabled, Limiter2} -> ok = limit_queues(Limiter2, State), - Limiter2 - end, - {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}}; + Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; + +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, + State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> + Limiter1 = rabbit_limiter:limit_prefetch(Limiter, + PrefetchCount, queue:len(UAMQ)), + {reply, #'basic.qos_ok'{}, + maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, @@ -1078,25 +1107,45 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter = Limiter}) -> - Limiter2 = case rabbit_limiter:unblock(Limiter) of - ok -> Limiter; - {disabled, Limiter1} -> ok = limit_queues(Limiter1, State), - Limiter1 - end, - {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}}; + Limiter1 = rabbit_limiter:unblock(Limiter), + {reply, #'channel.flow_ok'{active = true}, + maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; handle_method(#'channel.flow'{active = false}, _, State = #ch{consumer_mapping = Consumers, limiter = Limiter}) -> - Limiter1 = case rabbit_limiter:is_enabled(Limiter) of - true -> Limiter; - false -> enable_limiter(State) - end, - State1 = State#ch{limiter = Limiter1}, - ok = rabbit_limiter:block(Limiter1), - QPids = consumer_queues(Consumers), - ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})}; + case rabbit_limiter:is_blocked(Limiter) of + true -> {noreply, maybe_send_flow_ok(State)}; + false -> Limiter1 = rabbit_limiter:block(Limiter), + State1 = maybe_limit_queues(Limiter, Limiter1, + State#ch{limiter = Limiter1}), + %% The semantics of channel.flow{active=false} + %% require that no messages are delivered after the + %% channel.flow_ok has been sent. We accomplish that + %% by "flushing" all messages in flight from the + %% consumer queues to us. To do this we tell all the + %% queues to invoke rabbit_channel:flushed/2, which + %% will send us a {flushed, ...} message that appears + %% *after* all the {deliver, ...} messages. We keep + %% track of all the QPids thus asked, and once all of + %% them have responded (or died) we send the + %% channel.flow_ok. + QPids = consumer_queues(Consumers), + ok = rabbit_amqqueue:flush_all(QPids, self()), + {noreply, maybe_send_flow_ok( + State1#ch{blocking = sets:from_list(QPids)})} + end; + +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + drain = Drain}, _, + State = #ch{consumer_mapping = Consumers}) -> + case dict:find(CTag, Consumers) of + {ok, Q} -> ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain), + {noreply, State}; + error -> precondition_failed("unknown consumer tag '~s'", [CTag]) + end; handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -1164,6 +1213,16 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. +parse_credit_args(Arguments) -> + case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {boolean, Drain}} -> {Credit, Drain}; + _ -> none + end; + undefined -> none + end. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, @@ -1332,14 +1391,14 @@ foreach_per_queue(F, UAL) -> end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). -enable_limiter(State = #ch{unacked_message_q = UAMQ, - limiter = Limiter}) -> - Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)), - ok = limit_queues(Limiter1, State), - Limiter1. - -limit_queues(Limiter, #ch{consumer_mapping = Consumers}) -> - rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter). +maybe_limit_queues(OldLimiter, NewLimiter, State) -> + case ((not rabbit_limiter:is_active(OldLimiter)) andalso + rabbit_limiter:is_active(NewLimiter)) of + true -> Queues = consumer_queues(State#ch.consumer_mapping), + rabbit_amqqueue:activate_limit_all(Queues, self()); + false -> ok + end, + State. consumer_queues(Consumers) -> lists:usort([QPid || @@ -1350,7 +1409,9 @@ consumer_queues(Consumers) -> %% messages sent in a response to a basic.get (identified by their %% 'none' consumer tag) notify_limiter(Limiter, Acked) -> - case rabbit_limiter:is_enabled(Limiter) of + %% optimisation: avoid the potentially expensive 'foldl' in the + %% common case. + case rabbit_limiter:is_prefetch_limited(Limiter) of false -> ok; true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 @@ -1515,7 +1576,7 @@ i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; i(prefetch_count, #ch{limiter = Limiter}) -> - rabbit_limiter:get_limit(Limiter); + rabbit_limiter:get_prefetch_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> rabbit_limiter:is_blocked(Limiter); i(Item, _) -> diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 8ea44a81..a0c7624b 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -58,7 +58,7 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, ReaderPid, ConnName, Protocol, User, VHost, Capabilities, Collector, - rabbit_limiter:make_token(LimiterPid)]}, + LimiterPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; @@ -72,7 +72,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, {channel, {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector, - rabbit_limiter:make_token(LimiterPid)]}, + LimiterPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 5f4fb9ec..9e98448d 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -326,34 +326,34 @@ route(#exchange{name = #resource{virtual_host = VHost, %% Optimisation [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; {Decorators, _} -> - QNames = route1(Delivery, {[X], XName, []}), - lists:usort(decorate_route(Decorators, X, Delivery, QNames)) + lists:usort(route1(Delivery, Decorators, {[X], XName, []})) end. -decorate_route([], _X, _Delivery, QNames) -> +route1(_, _, {[], _, QNames}) -> QNames; -decorate_route(Decorators, X, Delivery, QNames) -> - QNames ++ - lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]). - -route1(_, {[], _, QNames}) -> - QNames; -route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> - DstNames = process_alternate( - X, ((type_to_module(Type)):route(X, Delivery))), - route1(Delivery, +route1(Delivery, Decorators, + {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> + ExchangeDests = (type_to_module(Type)):route(X, Delivery), + DecorateDests = process_decorators(X, Decorators, Delivery), + AlternateDests = process_alternate(X, ExchangeDests), + route1(Delivery, Decorators, lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames}, - DstNames)). + AlternateDests ++ DecorateDests ++ ExchangeDests)). -process_alternate(#exchange{arguments = []}, Results) -> %% optimisation - Results; +process_alternate(#exchange{arguments = []}, _Results) -> %% optimisation + []; process_alternate(#exchange{name = XName, arguments = Args}, []) -> case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of undefined -> []; AName -> [AName] end; -process_alternate(_X, Results) -> - Results. +process_alternate(_X, _Results) -> + []. + +process_decorators(_, [], _) -> %% optimisation + []; +process_decorators(X, Decorators, Delivery) -> + lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]). process_route(#resource{kind = exchange} = XName, {_WorkList, XName, _QNames} = Acc) -> diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 8f17adfc..040b55db 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -57,12 +57,10 @@ -callback remove_bindings(serial(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. -%% called after exchange routing -%% return value is a list of queues to be added to the list of -%% destination queues. decorators must register separately for -%% this callback using exchange_decorator_route. --callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> - [rabbit_amqqueue:name()]. +%% Decorators can optionally implement route/2 which allows additional +%% destinations to be added to the routing decision. +%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> +%% [rabbit_amqqueue:name() | rabbit_exchange:name()]. -else. @@ -70,7 +68,7 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, - {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, {route, 2}]; + {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 8a7d14fe..d9f1170e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -14,42 +14,165 @@ %% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% +%% The purpose of the limiter is to stem the flow of messages from +%% queues to channels, in order to act upon various protocol-level +%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos +%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer) +%% credit mechanism. +%% +%% Each channel has an associated limiter process, created with +%% start_link/1, which it passes to queues on consumer creation with +%% rabbit_amqqueue:basic_consume/9, and rabbit_amqqueue:basic_get/4. +%% The latter isn't strictly necessary, since basic.get is not +%% subject to limiting, but it means that whenever a queue knows about +%% a channel, it also knows about its limiter, which is less fiddly. +%% +%% The limiter process holds state that is, in effect, shared between +%% the channel and all queues from which the channel is +%% consuming. Essentially all these queues are competing for access to +%% a single, limited resource - the ability to deliver messages via +%% the channel - and it is the job of the limiter process to mediate +%% that access. +%% +%% The limiter process is separate from the channel process for two +%% reasons: separation of concerns, and efficiency. Channels can get +%% very busy, particularly if they are also dealing with publishes. +%% With a separate limiter process all the aforementioned access +%% mediation can take place without touching the channel. +%% +%% For efficiency, both the channel and the queues keep some local +%% state, initialised from the limiter pid with new/1 and client/1, +%% respectively. In particular this allows them to avoid any +%% interaction with the limiter process when it is 'inactive', i.e. no +%% protocol-level flow control is taking place. +%% +%% This optimisation does come at the cost of some complexity though: +%% when a limiter becomes active, the channel needs to inform all its +%% consumer queues of this change in status. It does this by invoking +%% rabbit_amqqueue:activate_limit_all/2. Note that there is no inverse +%% transition, i.e. once a queue has been told about an active +%% limiter, it is not subsequently told when that limiter becomes +%% inactive. In practice it is rare for that to happen, though we +%% could optimise this case in the future. +%% +%% In addition, the consumer credit bookkeeping is local to queues, so +%% it is not necessary to store information about it in the limiter +%% process. But for abstraction we hide it from the queue behind the +%% limiter API, and it therefore becomes part of the queue local +%% state. +%% +%% The interactions with the limiter are as follows: +%% +%% 1. Channels tell the limiter about basic.qos prefetch counts - +%% that's what the limit_prefetch/3, unlimit_prefetch/1, +%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are +%% about - and channel.flow blocking - that's what block/1, +%% unblock/1 and is_blocked/1 are for. They also tell the limiter +%% queue state (via the queue) about consumer credit changes - +%% that's what credit/4 is for. +%% +%% 2. Queues also tell the limiter queue state about the queue +%% becoming empty (via drained/1) and consumers leaving (via +%% forget_consumer/2). +%% +%% 3. Queues register with the limiter - this happens as part of +%% activate/1. +%% +%% 4. The limiter process maintains an internal counter of 'messages +%% sent but not yet acknowledged', called the 'volume'. +%% +%% 5. Queues ask the limiter for permission (with can_send/3) whenever +%% they want to deliver a message to a channel. The limiter checks +%% whether a) the channel isn't blocked by channel.flow, b) the +%% volume has not yet reached the prefetch limit, and c) whether +%% the consumer has enough credit. If so it increments the volume +%% and tells the queue to proceed. Otherwise it marks the queue as +%% requiring notification (see below) and tells the queue not to +%% proceed. +%% +%% 6. A queue that has been told to proceed (by the return value of +%% can_send/3) sends the message to the channel. Conversely, a +%% queue that has been told not to proceed, will not attempt to +%% deliver that message, or any future messages, to the +%% channel. This is accomplished by can_send/3 capturing the +%% outcome in the local state, where it can be accessed with +%% is_suspended/1. +%% +%% 7. When a channel receives an ack it tells the limiter (via ack/2) +%% how many messages were ack'ed. The limiter process decrements +%% the volume and if it falls below the prefetch_count then it +%% notifies (through rabbit_amqqueue:resume/2) all the queues +%% requiring notification, i.e. all those that had a can_send/3 +%% request denied. +%% +%% 8. Upon receipt of such a notification, queues resume delivery to +%% the channel, i.e. they will once again start asking limiter, as +%% described in (5). +%% +%% 9. When a queue has no more consumers associated with a particular +%% channel, it deactivates use of the limiter with deactivate/1, +%% which alters the local state such that no further interactions +%% with the limiter process take place until a subsequent +%% activate/1. + -module(rabbit_limiter). -behaviour(gen_server2). +-export([start_link/0]). +%% channel API +-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1, + is_prefetch_limited/1, is_blocked/1, is_active/1, + get_prefetch_limit/1, ack/2, pid/1]). +%% queue API +-export([client/1, activate/1, can_send/3, resume/1, deactivate/1, + is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, + forget_consumer/2]). +%% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, prioritise_call/3]). --export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, - disable/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1, is_blocked/1]). + handle_info/2, prioritise_call/4]). %%---------------------------------------------------------------------------- --record(token, {pid, enabled}). +-record(lstate, {pid, prefetch_limited, blocked}). +-record(qstate, {pid, state, credits}). -ifdef(use_specs). --export_type([token/0]). - --opaque(token() :: #token{}). +-type(lstate() :: #lstate{pid :: pid(), + prefetch_limited :: boolean(), + blocked :: boolean()}). +-type(qstate() :: #qstate{pid :: pid(), + state :: 'dormant' | 'active' | 'suspended'}). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(make_token/0 :: () -> token()). --spec(make_token/1 :: ('undefined' | pid()) -> token()). --spec(is_enabled/1 :: (token()) -> boolean()). --spec(enable/2 :: (token(), non_neg_integer()) -> token()). --spec(disable/1 :: (token()) -> token()). --spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). --spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). --spec(register/2 :: (token(), pid()) -> 'ok'). --spec(unregister/2 :: (token(), pid()) -> 'ok'). --spec(get_limit/1 :: (token()) -> non_neg_integer()). --spec(block/1 :: (token()) -> 'ok'). --spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). --spec(is_blocked/1 :: (token()) -> boolean()). +-spec(new/1 :: (pid()) -> lstate()). + +-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer()) + -> lstate()). +-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()). +-spec(block/1 :: (lstate()) -> lstate()). +-spec(unblock/1 :: (lstate()) -> lstate()). +-spec(is_prefetch_limited/1 :: (lstate()) -> boolean()). +-spec(is_blocked/1 :: (lstate()) -> boolean()). +-spec(is_active/1 :: (lstate()) -> boolean()). +-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()). +-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok'). +-spec(pid/1 :: (lstate()) -> pid()). + +-spec(client/1 :: (pid()) -> qstate()). +-spec(activate/1 :: (qstate()) -> qstate()). +-spec(can_send/3 :: (qstate(), boolean(), rabbit_types:ctag()) -> + {'continue' | 'suspend', qstate()}). +-spec(resume/1 :: (qstate()) -> qstate()). +-spec(deactivate/1 :: (qstate()) -> qstate()). +-spec(is_suspended/1 :: (qstate()) -> boolean()). +-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). +-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean()) + -> qstate()). +-spec(drained/1 :: (qstate()) + -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). +-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). -endif. @@ -64,120 +187,181 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. +-record(credit, {credit = 0, drain = false}). + %%---------------------------------------------------------------------------- %% API %%---------------------------------------------------------------------------- start_link() -> gen_server2:start_link(?MODULE, [], []). -make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, enabled = false}. +new(Pid) -> + %% this a 'call' to ensure that it is invoked at most once. + ok = gen_server:call(Pid, {new, self()}), + #lstate{pid = Pid, prefetch_limited = false, blocked = false}. -is_enabled(#token{enabled = Enabled}) -> Enabled. +limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> + ok = gen_server:call(L#lstate.pid, + {limit_prefetch, PrefetchCount, UnackedCount}), + L#lstate{prefetch_limited = true}. -enable(#token{pid = Pid} = Token, Volume) -> - gen_server2:call(Pid, {enable, Token, self(), Volume}, infinity). +unlimit_prefetch(L) -> + ok = gen_server:call(L#lstate.pid, unlimit_prefetch), + L#lstate{prefetch_limited = false}. -disable(#token{pid = Pid} = Token) -> - gen_server2:call(Pid, {disable, Token}, infinity). +block(L) -> + ok = gen_server:call(L#lstate.pid, block), + L#lstate{blocked = true}. -limit(Limiter, PrefetchCount) -> - maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok). +unblock(L) -> + ok = gen_server:call(L#lstate.pid, unblock), + L#lstate{blocked = false}. -%% Ask the limiter whether the queue can deliver a message without -%% breaching a limit. Note that we don't use maybe_call here in order -%% to avoid always going through with_exit_handler/2, even when the -%% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> - rabbit_misc:with_exit_handler( - fun () -> true end, - fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) - end); -can_send(_, _, _) -> - true. +is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited. + +is_blocked(#lstate{blocked = Blocked}) -> Blocked. + +is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L). + +get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0; +get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit). -%% Let the limiter know that the channel has received some acks from a -%% consumer -ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). +ack(#lstate{prefetch_limited = false}, _AckCount) -> ok; +ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}). -register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). +pid(#lstate{pid = Pid}) -> Pid. -unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}). +client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}. -get_limit(Limiter) -> +activate(L = #qstate{state = dormant}) -> + ok = gen_server:cast(L#qstate.pid, {register, self()}), + L#qstate{state = active}; +activate(L) -> L. + +can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, + AckRequired, CTag) -> + case is_consumer_blocked(L, CTag) of + false -> case (State =/= active orelse + safe_call(Pid, {can_send, self(), AckRequired}, true)) of + true -> {continue, L#qstate{ + credits = record_send_q(CTag, Credits)}}; + false -> {suspend, L#qstate{state = suspended}} + end; + true -> {suspend, L} + end. + +safe_call(Pid, Msg, ExitValue) -> rabbit_misc:with_exit_handler( - fun () -> 0 end, - fun () -> maybe_call(Limiter, get_limit, 0) end). + fun () -> ExitValue end, + fun () -> gen_server2:call(Pid, Msg, infinity) end). + +resume(L) -> L#qstate{state = active}. -block(Limiter) -> - maybe_call(Limiter, block, ok). +deactivate(L = #qstate{state = dormant}) -> L; +deactivate(L) -> + ok = gen_server:cast(L#qstate.pid, {unregister, self()}), + L#qstate{state = dormant}. + +is_suspended(#qstate{state = suspended}) -> true; +is_suspended(#qstate{}) -> false. + +is_consumer_blocked(#qstate{credits = Credits}, CTag) -> + case gb_trees:lookup(CTag, Credits) of + {value, #credit{credit = C}} when C > 0 -> false; + {value, #credit{}} -> true; + none -> false + end. -unblock(Limiter) -> - maybe_call(Limiter, {unblock, Limiter}, ok). +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> + Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. -is_blocked(Limiter) -> - maybe_call(Limiter, is_blocked, false). +drained(Limiter = #qstate{credits = Credits}) -> + {CTagCredits, Credits2} = + rabbit_misc:gb_trees_fold( + fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) -> + {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)}; + (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) -> + {Acc, Creds0} + end, {[], Credits}, Credits), + {CTagCredits, Limiter#qstate{credits = Credits2}}. + +forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> + Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}. + +%%---------------------------------------------------------------------------- +%% Queue-local code +%%---------------------------------------------------------------------------- + +%% We want to do all the AMQP 1.0-ish link level credit calculations +%% in the queue (to do them elsewhere introduces a ton of +%% races). However, it's a big chunk of code that is conceptually very +%% linked to the limiter concept. So we get the queue to hold a bit of +%% state for us (#qstate.credits), and maintain a fiction that the +%% limiter is making the decisions... + +record_send_q(CTag, Credits) -> + case gb_trees:lookup(CTag, Credits) of + {value, #credit{credit = Credit, drain = Drain}} -> + update_credit(CTag, Credit - 1, Drain, Credits); + none -> + Credits + end. + +update_credit(CTag, Credit, Drain, Credits) -> + %% Using up all credit implies no need to send a 'drained' event + Drain1 = Drain andalso Credit > 0, + gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits). %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([]) -> - {ok, #lim{}}. +init([]) -> {ok, #lim{}}. + +prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9; +prioritise_call(_Msg, _From, _Len, _State) -> 0. -prioritise_call(get_limit, _From, _State) -> 9; -prioritise_call(_Msg, _From, _State) -> 0. +handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) -> + {reply, ok, State#lim{ch_pid = ChPid}}; + +handle_call({limit_prefetch, PrefetchCount, UnackedCount}, _From, State) -> + %% assertion + true = State#lim.prefetch_count == 0 orelse + State#lim.volume == UnackedCount, + {reply, ok, maybe_notify(State, State#lim{prefetch_count = PrefetchCount, + volume = UnackedCount})}; + +handle_call(unlimit_prefetch, _From, State) -> + {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0, + volume = 0})}; + +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State) -> + {reply, ok, maybe_notify(State, State#lim{blocked = false})}; + +handle_call(get_prefetch_limit, _From, + State = #lim{prefetch_count = PrefetchCount}) -> + {reply, PrefetchCount, State}; handle_call({can_send, QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, limit_queue(QPid, State)}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> - case limit_reached(State) of + case prefetch_limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; true -> Volume end}} - end; - -handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> - {reply, PrefetchCount, State}; - -handle_call({limit, PrefetchCount, Token}, _From, State) -> - case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of - {cont, State1} -> - {reply, ok, State1}; - {stop, State1} -> - {reply, {disabled, Token#token{enabled = false}}, State1} - end; - -handle_call(block, _From, State) -> - {reply, ok, State#lim{blocked = true}}; - -handle_call({unblock, Token}, _From, State) -> - case maybe_notify(State, State#lim{blocked = false}) of - {cont, State1} -> - {reply, ok, State1}; - {stop, State1} -> - {reply, {disabled, Token#token{enabled = false}}, State1} - end; - -handle_call(is_blocked, _From, State) -> - {reply, blocked(State), State}; - -handle_call({enable, Token, Channel, Volume}, _From, State) -> - {reply, Token#token{enabled = true}, - State#lim{ch_pid = Channel, volume = Volume}}; -handle_call({disable, Token}, _From, State) -> - {reply, Token#token{enabled = false}, State}. + end. handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), - {noreply, State1}; + {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -199,27 +383,13 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse blocked(OldState)) andalso - not (limit_reached(NewState) orelse blocked(NewState)) of - true -> NewState1 = notify_queues(NewState), - {case NewState1#lim.prefetch_count of - 0 -> stop; - _ -> cont - end, NewState1}; - false -> {cont, NewState} + case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso + not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of + true -> notify_queues(NewState); + false -> NewState end. -maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) -> - gen_server2:call(Pid, Call, infinity); -maybe_call(_, _Call, Default) -> - Default. - -maybe_cast(#token{pid = Pid, enabled = true}, Cast) -> - gen_server2:cast(Pid, Cast); -maybe_cast(_, _Call) -> - ok. - -limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> +prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. blocked(#lim{blocked = Blocked}) -> Blocked. @@ -231,10 +401,9 @@ remember_queue(QPid, State = #lim{queues = Queues}) -> true -> State end. -forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> +forget_queue(QPid, State = #lim{queues = Queues}) -> case orddict:find(QPid, Queues) of {ok, {MRef, _}} -> true = erlang:demonitor(MRef), - ok = rabbit_amqqueue:unblock(QPid, ChPid), State#lim{queues = orddict:erase(QPid, Queues)}; error -> State end. @@ -251,13 +420,13 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> end, {[], Queues}, Queues), case length(QList) of 0 -> ok; - 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case + 1 -> ok = rabbit_amqqueue:resume(hd(QList), ChPid); %% common case L -> %% We randomly vary the position of queues in the list, %% thus ensuring that each queue has an equal chance of %% being notified first. {L1, L2} = lists:split(random:uniform(L), QList), - [[ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L3] + [[ok = rabbit_amqqueue:resume(Q, ChPid) || Q <- L3] || L3 <- [L2, L1]], ok end, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b435e0f3..22edfcb6 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -27,8 +27,8 @@ -export([start_link/1, set_maximum_since_use/2, info/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + code_change/3, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -323,14 +323,14 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, BQS3 = BQ:handle_pre_hibernate(BQS2), {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {gm_deaths, _Deaths} -> 5; _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; @@ -339,7 +339,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of update_ram_duration -> 8; sync_timeout -> 6; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 13b40a48..2344b1b2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -29,8 +29,8 @@ -export([transform_dir/3, force_recovery/2]). %% upgrade -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_call/3, prioritise_cast/2, - prioritise_info/2, format_message_queue/2]). + code_change/3, prioritise_call/4, prioritise_cast/3, + prioritise_info/3, format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -738,7 +738,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of successfully_recovered_state -> 7; {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7; @@ -746,7 +746,7 @@ prioritise_call(Msg, _From, _State) -> _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; @@ -755,7 +755,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of sync -> 8; _ -> 0 diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 3881de23..0dd7a7cc 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -23,7 +23,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/2]). + terminate/2, code_change/3, prioritise_cast/3]). -record(state, { pending_no_readers, @@ -79,8 +79,8 @@ init([MsgStoreState]) -> msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; -prioritise_cast(_Msg, _State) -> 0. +prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast(_Msg, _Len, _State) -> 0. handle_call(stop, _From, State) -> {stop, normal, ok, State}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index aaaa179a..61fac0e2 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -306,8 +306,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> closed when State#v1.connection_state =:= closed -> ok; closed -> + maybe_emit_stats(State), throw(connection_closed_abruptly); {error, Reason} -> + maybe_emit_stats(State), throw({inet_error, Reason}); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, @@ -338,23 +340,28 @@ handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> %% ordinary error case. However, since this termination is %% initiated by our parent it is probably more important to exit %% quickly. + maybe_emit_stats(State), exit(Reason); -handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) -> +handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) -> + maybe_emit_stats(State), throw(E); handle_other({channel_exit, Channel, Reason}, State) -> handle_exception(State, Channel, Reason); handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) -> handle_dependent_exit(ChPid, Reason, State); -handle_other(terminate_connection, _State) -> +handle_other(terminate_connection, State) -> + maybe_emit_stats(State), stop; handle_other(handshake_timeout, State) when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) -> State; handle_other(handshake_timeout, State) -> + maybe_emit_stats(State), throw({handshake_timeout, State#v1.callback}); handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> State; -handle_other(heartbeat_timeout, #v1{connection_state = S}) -> +handle_other(heartbeat_timeout, State = #v1{connection_state = S}) -> + maybe_emit_stats(State), throw({heartbeat_timeout, S}); handle_other({'$gen_call', From, {shutdown, Explanation}}, State) -> {ForceTermination, NewState} = terminate(Explanation, State), @@ -386,8 +393,9 @@ handle_other(emit_stats, State) -> handle_other({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), control_throttle(State); -handle_other(Other, _State) -> +handle_other(Other, State) -> %% internal error -> something worth dying for + maybe_emit_stats(State), exit({unexpected_message, Other}). switch_callback(State, Callback, Length) -> @@ -850,8 +858,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), - rabbit_event:if_enabled(State1, #v1.stats_timer, - fun() -> emit_stats(State1) end), + maybe_emit_stats(State1), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), @@ -1010,6 +1017,10 @@ cert_info(F, #v1{sock = Sock}) -> {ok, Cert} -> list_to_binary(F(Cert)) end. +maybe_emit_stats(State) -> + rabbit_event:if_enabled(State, #v1.stats_timer, + fun() -> emit_stats(State) end). + emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 3514e780..acdc2cff 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -84,12 +84,34 @@ internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> internal_register(Class, TypeName, ModuleName) when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) -> ok = sanity_check_module(class_module(Class), ModuleName), - true = ets:insert(?ETS_NAME, - {{Class, internal_binary_to_type(TypeName)}, ModuleName}), + RegArg = {{Class, internal_binary_to_type(TypeName)}, ModuleName}, + true = ets:insert(?ETS_NAME, RegArg), + conditional_register(RegArg), ok. internal_unregister(Class, TypeName) -> - true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}), + UnregArg = {Class, internal_binary_to_type(TypeName)}, + conditional_unregister(UnregArg), + true = ets:delete(?ETS_NAME, UnregArg), + ok. + +%% register exchange decorator route callback only when implemented, +%% in order to avoid unnecessary decorator calls on the fast +%% publishing path +conditional_register({{exchange_decorator, Type}, ModuleName}) -> + case erlang:function_exported(ModuleName, route, 2) of + true -> true = ets:insert(?ETS_NAME, + {{exchange_decorator_route, Type}, + ModuleName}); + false -> ok + end; +conditional_register(_) -> + ok. + +conditional_unregister({exchange_decorator, Type}) -> + true = ets:delete(?ETS_NAME, {exchange_decorator_route, Type}), + ok; +conditional_unregister(_) -> ok. sanity_check_module(ClassModule, Module) -> @@ -104,12 +126,11 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism; -class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator; -class_module(exchange_decorator_route) -> rabbit_exchange_decorator; -class_module(policy_validator) -> rabbit_policy_validator. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(policy_validator) -> rabbit_policy_validator. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1188c554..e7b69879 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1100,20 +1100,14 @@ test_policy_validation() -> test_server_status() -> %% create a few things so there is some useful information to list - Writer = spawn(fun test_writer/0), - {ok, Ch} = rabbit_channel:start_link( - 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1, - user(<<"user">>), <<"/">>, [], self(), - rabbit_limiter:make_token(self())), + {_Writer, Limiter, Ch} = test_channel(), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], - ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, rabbit_limiter:make_token(), - <<"ctag">>, true, undefined), + Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), @@ -1191,8 +1185,6 @@ find_listener() -> N =:= node()], {H, P}. -test_writer() -> test_writer(none). - test_writer(Pid) -> receive {'$gen_call', From, flush} -> gen_server:reply(From, ok), @@ -1202,13 +1194,17 @@ test_writer(Pid) -> shutdown -> ok end. -test_spawn() -> +test_channel() -> Me = self(), Writer = spawn(fun () -> test_writer(Me) end), + {ok, Limiter} = rabbit_limiter:start_link(), {ok, Ch} = rabbit_channel:start_link( 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1, - user(<<"guest">>), <<"/">>, [], Me, - rabbit_limiter:make_token(self())), + user(<<"guest">>), <<"/">>, [], Me, Limiter), + {Writer, Limiter, Ch}. + +test_spawn() -> + {Writer, _Limiter, Ch} = test_channel(), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok) @@ -1580,7 +1576,7 @@ control_action(Command, Node, Args, Opts) -> info_action(Command, Args, CheckVHost) -> ok = control_action(Command, []), - if CheckVHost -> ok = control_action(Command, []); + if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]); true -> ok end, ok = control_action(Command, lists:map(fun atom_to_list/1, Args)), @@ -2722,12 +2718,13 @@ test_queue_recover() -> end, rabbit_amqqueue:stop(), rabbit_amqqueue:start(rabbit_amqqueue:recover()), + {ok, Limiter} = rabbit_limiter:start_link(), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = - rabbit_amqqueue:basic_get(Q1, self(), false), + rabbit_amqqueue:basic_get(Q1, self(), false, Limiter), exit(QPid1, shutdown), VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1}, VQ2} = @@ -2748,9 +2745,11 @@ test_variable_queue_delete_msg_store_files_callback() -> rabbit_amqqueue:set_ram_duration_target(QPid, 0), + {ok, Limiter} = rabbit_limiter:start_link(), + CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = - rabbit_amqqueue:basic_get(Q, self(), true), + rabbit_amqqueue:basic_get(Q, self(), true, Limiter), {ok, CountMinusOne} = rabbit_amqqueue:purge(Q), %% give the queue a second to receive the close_fds callback msg diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index d0f39221..2858cf58 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -70,6 +70,7 @@ add(VHostPath) -> {<<"amq.rabbitmq.trace">>, topic}]], ok end), + rabbit_event:notify(vhost_created, info(VHostPath)), R. delete(VHostPath) -> @@ -87,6 +88,7 @@ delete(VHostPath) -> with(VHostPath, fun () -> ok = internal_delete(VHostPath) end)), + ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), R. internal_delete(VHostPath) -> diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 56e4b7b3..22b223d2 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -23,7 +23,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/2]). + terminate/2, code_change/3, prioritise_cast/3]). %%---------------------------------------------------------------------------- @@ -73,8 +73,8 @@ init([WId]) -> {ok, WId, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; -prioritise_cast(_Msg, _State) -> 0. +prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast(_Msg, _Len, _State) -> 0. handle_call({submit, Fun}, From, WId) -> gen_server2:reply(From, run(Fun)), |