diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-10-25 18:26:18 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-10-25 18:26:18 +0100 |
commit | 3777b5c641be257d62d9a4c1a7736bb6aed1c436 (patch) | |
tree | 5bafeeef9548fa1a7208a0969fdf36a9aaa531be /src | |
parent | 5eef275a9a465662082de22d9196b0a834d77c28 (diff) | |
parent | bb516c47652a81dba4a18583004c743eef26b079 (diff) | |
download | rabbitmq-server-3777b5c641be257d62d9a4c1a7736bb6aed1c436.tar.gz |
Merge in stable
Diffstat (limited to 'src')
46 files changed, 2182 insertions, 1167 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 4e1dcd2e..0331ca01 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,23 +18,32 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]). +-export([start_link/1, invoke_no_result/2, invoke/2, + monitor/2, demonitor/1, call/2, cast/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-record(state, {node, monitors, name}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([monitor_ref/0]). + +-type(monitor_ref() :: reference() | {atom(), pid()}). +-type(fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}). + -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). --spec(invoke/2 :: - ( pid(), fun ((pid()) -> A)) -> A; - ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], - [{pid(), term()}]}). --spec(invoke_no_result/2 :: - (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(invoke/2 :: ( pid(), fun_or_mfa(A)) -> A; + ([pid()], fun_or_mfa(A)) -> {[{pid(), A}], + [{pid(), term()}]}). +-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa(any())) -> 'ok'). +-spec(monitor/2 :: ('process', pid()) -> monitor_ref()). +-spec(demonitor/1 :: (monitor_ref()) -> 'true'). + -spec(call/2 :: ( pid(), any()) -> any(); ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}). @@ -50,26 +59,27 @@ %%---------------------------------------------------------------------------- start_link(Num) -> - gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []). + Name = delegate_name(Num), + gen_server2:start_link({local, Name}, ?MODULE, [Name], []). -invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - Fun(Pid); -invoke(Pid, Fun) when is_pid(Pid) -> - case invoke([Pid], Fun) of +invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + apply1(FunOrMFA, Pid); +invoke(Pid, FunOrMFA) when is_pid(Pid) -> + case invoke([Pid], FunOrMFA) of {[{Pid, Result}], []} -> Result; {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; -invoke([], _Fun) -> %% optimisation +invoke([], _FunOrMFA) -> %% optimisation {[], []}; -invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - case safe_invoke(Pid, Fun) of +invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + case safe_invoke(Pid, FunOrMFA) of {ok, _, Result} -> {[{Pid, Result}], []}; {error, _, Error} -> {[], [{Pid, Error}]} end; -invoke(Pids, Fun) when is_list(Pids) -> +invoke(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), %% The use of multi_call is only safe because the timeout is %% infinity, and thus there is no process spawned in order to do @@ -78,45 +88,58 @@ invoke(Pids, Fun) when is_list(Pids) -> case orddict:fetch_keys(Grouped) of [] -> {[], []}; RemoteNodes -> gen_server2:multi_call( - RemoteNodes, delegate(RemoteNodes), - {invoke, Fun, Grouped}, infinity) + RemoteNodes, delegate(self(), RemoteNodes), + {invoke, FunOrMFA, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, Pid <- orddict:fetch(BadNode, Grouped)], - ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) | [Results || {_Node, Results} <- Replies]]), lists:foldl( fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - safe_invoke(Pid, Fun), %% we don't care about any error +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, FunOrMFA), %% we don't care about any error ok; -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result([Pid], Fun); +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) -> + invoke_no_result([Pid], FunOrMFA); -invoke_no_result([], _Fun) -> %% optimisation +invoke_no_result([], _FunOrMFA) -> %% optimisation ok; -invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - safe_invoke(Pid, Fun), %% must not die +invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + safe_invoke(Pid, FunOrMFA), %% must not die ok; -invoke_no_result(Pids, Fun) when is_list(Pids) -> +invoke_no_result(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; - RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), - {invoke, Fun, Grouped}) + RemoteNodes -> gen_server2:abcast( + RemoteNodes, delegate(self(), RemoteNodes), + {invoke, FunOrMFA, Grouped}) end, - safe_invoke(LocalPids, Fun), %% must not die + safe_invoke(LocalPids, FunOrMFA), %% must not die ok. +monitor(process, Pid) when node(Pid) =:= node() -> + erlang:monitor(process, Pid); +monitor(process, Pid) -> + Name = delegate(Pid, [node(Pid)]), + gen_server2:cast(Name, {monitor, self(), Pid}), + {Name, Pid}. + +demonitor(Ref) when is_reference(Ref) -> + erlang:demonitor(Ref); +demonitor({Name, Pid}) -> + gen_server2:cast(Name, {demonitor, self(), Pid}). + call(PidOrPids, Msg) -> - invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). + invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}). cast(PidOrPids, Msg) -> - invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end). + invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}). %%---------------------------------------------------------------------------- @@ -134,43 +157,88 @@ group_pids_by_node(Pids) -> delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). -delegate(RemoteNodes) -> +delegate(Pid, RemoteNodes) -> case get(delegate) of undefined -> Name = delegate_name( - erlang:phash2(self(), + erlang:phash2(Pid, delegate_sup:count(RemoteNodes))), put(delegate, Name), Name; Name -> Name end. -safe_invoke(Pids, Fun) when is_list(Pids) -> - [safe_invoke(Pid, Fun) || Pid <- Pids]; -safe_invoke(Pid, Fun) when is_pid(Pid) -> +safe_invoke(Pids, FunOrMFA) when is_list(Pids) -> + [safe_invoke(Pid, FunOrMFA) || Pid <- Pids]; +safe_invoke(Pid, FunOrMFA) when is_pid(Pid) -> try - {ok, Pid, Fun(Pid)} + {ok, Pid, apply1(FunOrMFA, Pid)} catch Class:Reason -> {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. +apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]); +apply1(Fun, Arg) -> Fun(Arg). + %%---------------------------------------------------------------------------- -init([]) -> - {ok, node(), hibernate, +init([Name]) -> + {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({invoke, Fun, Grouped}, _From, Node) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}. - -handle_cast({invoke, Fun, Grouped}, Node) -> - safe_invoke(orddict:fetch(Node, Grouped), Fun), - {noreply, Node, hibernate}. - -handle_info(_Info, Node) -> - {noreply, Node, hibernate}. +handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State, + hibernate}. + +handle_cast({monitor, MonitoringPid, Pid}, + State = #state{monitors = Monitors}) -> + Monitors1 = case dict:find(Pid, Monitors) of + {ok, {Ref, Pids}} -> + Pids1 = gb_sets:add_element(MonitoringPid, Pids), + dict:store(Pid, {Ref, Pids1}, Monitors); + error -> + Ref = erlang:monitor(process, Pid), + Pids = gb_sets:singleton(MonitoringPid), + dict:store(Pid, {Ref, Pids}, Monitors) + end, + {noreply, State#state{monitors = Monitors1}, hibernate}; + +handle_cast({demonitor, MonitoringPid, Pid}, + State = #state{monitors = Monitors}) -> + Monitors1 = case dict:find(Pid, Monitors) of + {ok, {Ref, Pids}} -> + Pids1 = gb_sets:del_element(MonitoringPid, Pids), + case gb_sets:is_empty(Pids1) of + true -> erlang:demonitor(Ref), + dict:erase(Pid, Monitors); + false -> dict:store(Pid, {Ref, Pids1}, Monitors) + end; + error -> + Monitors + end, + {noreply, State#state{monitors = Monitors1}, hibernate}; + +handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) -> + safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), + {noreply, State, hibernate}. + +handle_info({'DOWN', Ref, process, Pid, Info}, + State = #state{monitors = Monitors, name = Name}) -> + {noreply, + case dict:find(Pid, Monitors) of + {ok, {Ref, Pids}} -> + Msg = {'DOWN', {Name, Pid}, process, Pid, Info}, + gb_sets:fold(fun (MonitoringPid, _) -> MonitoringPid ! Msg end, + none, Pids), + State#state{monitors = dict:erase(Pid, Monitors)}; + error -> + State + end, hibernate}; + +handle_info(_Info, State) -> + {noreply, State, hibernate}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, Node, _Extra) -> - {ok, Node}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 3b16c53a..d5f51db0 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -212,9 +212,8 @@ start_link0(Prefix, Group, Init) -> init(Mod, Args) -> case Mod:init(Args) of {ok, {{Bad, _, _}, _ChildSpecs}} when - Bad =:= simple_one_for_one orelse - Bad =:= simple_one_for_one_terminate -> erlang:error(badarg); - Init -> Init + Bad =:= simple_one_for_one -> erlang:error(badarg); + Init -> Init end. start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}). diff --git a/src/pmon.erl b/src/pmon.erl index b9db66fb..86308167 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -16,22 +16,26 @@ -module(pmon). --export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, - monitored/1, is_empty/1]). +-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2, + is_monitored/2, erase/2, monitored/1, is_empty/1]). -compile({no_auto_import, [monitor/2]}). +-record(state, {dict, module}). + -ifdef(use_specs). %%---------------------------------------------------------------------------- -export_type([?MODULE/0]). --opaque(?MODULE() :: dict()). +-opaque(?MODULE() :: #state{dict :: dict(), + module :: atom()}). -type(item() :: pid() | {atom(), node()}). -spec(new/0 :: () -> ?MODULE()). +-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()). -spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()). -spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()). -spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()). @@ -42,29 +46,33 @@ -endif. -new() -> dict:new(). +new() -> new(erlang). + +new(Module) -> #state{dict = dict:new(), + module = Module}. -monitor(Item, M) -> +monitor(Item, S = #state{dict = M, module = Module}) -> case dict:is_key(Item, M) of - true -> M; - false -> dict:store(Item, erlang:monitor(process, Item), M) + true -> S; + false -> S#state{dict = dict:store( + Item, Module:monitor(process, Item), M)} end. -monitor_all([], M) -> M; %% optimisation -monitor_all([Item], M) -> monitor(Item, M); %% optimisation -monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items). +monitor_all([], S) -> S; %% optimisation +monitor_all([Item], S) -> monitor(Item, S); %% optimisation +monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items). -demonitor(Item, M) -> +demonitor(Item, S = #state{dict = M, module = Module}) -> case dict:find(Item, M) of - {ok, MRef} -> erlang:demonitor(MRef), - dict:erase(Item, M); + {ok, MRef} -> Module:demonitor(MRef), + S#state{dict = dict:erase(Item, M)}; error -> M end. -is_monitored(Item, M) -> dict:is_key(Item, M). +is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M). -erase(Item, M) -> dict:erase(Item, M). +erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}. -monitored(M) -> dict:fetch_keys(M). +monitored(#state{dict = M}) -> dict:fetch_keys(M). -is_empty(M) -> dict:size(M) == 0. +is_empty(#state{dict = M}) -> dict:size(M) == 0. diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 6995c3be..c76c0d33 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -40,8 +40,8 @@ -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, - out/1, join/2]). +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1, + in/2, in/3, out/1, out_p/1, join/2, filter/2, fold/3, highest/1]). %%---------------------------------------------------------------------------- @@ -59,10 +59,16 @@ -spec(is_empty/1 :: (pqueue()) -> boolean()). -spec(len/1 :: (pqueue()) -> non_neg_integer()). -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). +-spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()). -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). -spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). +-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()). +-spec(fold/3 :: + (fun ((any(), priority(), A) -> A), A, pqueue()) -> A). +-spec(highest/1 :: (pqueue()) -> priority() | 'empty'). -endif. @@ -96,6 +102,9 @@ to_list({pqueue, Queues}) -> [{maybe_negate_priority(P), V} || {P, Q} <- Queues, {0, V} <- to_list(Q)]. +from_list(L) -> + lists:foldl(fun ({P, E}, Q) -> in(E, P, Q) end, new(), L). + in(Item, Q) -> in(Item, 0, Q). @@ -147,6 +156,14 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); +out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). + +add_p(R, P) -> case R of + {empty, Q} -> {empty, Q}; + {{value, V}, Q} -> {{value, V, P}, Q} + end. + join(A, {queue, [], [], 0}) -> A; join({queue, [], [], 0}, B) -> @@ -185,6 +202,22 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity -> merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> merge(As, Bs, [ {PB, B} | Acc ]). +filter(Pred, Q) -> fold(fun(V, P, Acc) -> + case Pred(V) of + true -> in(V, P, Acc); + false -> Acc + end + end, new(), Q). + +fold(Fun, Init, Q) -> case out_p(Q) of + {empty, _Q} -> Init; + {{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1) + end. + +highest({queue, [], [], 0}) -> empty; +highest({queue, _, _, _}) -> 0; +highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P). + r2f([], 0) -> {queue, [], [], 0}; r2f([_] = R, 1) -> {queue, [], R, 1}; r2f([X,Y], 2) -> {queue, [X], [Y], 2}; diff --git a/src/rabbit.erl b/src/rabbit.erl index cb9e6376..1b7fe6da 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -588,6 +588,7 @@ boot_delegate() -> rabbit_sup:start_supervisor_child(delegate_sup, [Count]). recover() -> + rabbit_policy:recover(), Qs = rabbit_amqqueue:recover(), ok = rabbit_binding:recover(rabbit_exchange:recover(), [QName || #amqqueue{name = QName} <- Qs]), diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 90be4f80..d54c2a8d 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -74,9 +74,7 @@ check_vhost_access(User = #user{ username = Username, true -> Module:check_vhost_access(User, VHostPath) end end, - "~s failed checking vhost access to ~s for ~s: ~p~n", - [Module, VHostPath, Username], - "access to vhost '~s' refused for user '~s'", + Module, "access to vhost '~s' refused for user '~s'", [VHostPath, Username]). check_resource_access(User, R = #resource{kind = exchange, name = <<"">>}, @@ -87,15 +85,14 @@ check_resource_access(User = #user{username = Username, auth_backend = Module}, Resource, Permission) -> check_access( fun() -> Module:check_resource_access(User, Resource, Permission) end, - "~s failed checking resource access to ~p for ~s: ~p~n", - [Module, Resource, Username], - "access to ~s refused for user '~s'", + Module, "access to ~s refused for user '~s'", [rabbit_misc:rs(Resource), Username]). -check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) -> +check_access(Fun, Module, ErrStr, ErrArgs) -> Allow = case Fun() of - {error, _} = E -> - rabbit_log:error(ErrStr, ErrArgs ++ [E]), + {error, E} -> + rabbit_log:error(ErrStr ++ " by ~s: ~p~n", + ErrArgs ++ [Module, E]), false; Else -> Else @@ -104,5 +101,5 @@ check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) -> true -> ok; false -> - rabbit_misc:protocol_error(access_refused, RefStr, RefArgs) + rabbit_misc:protocol_error(access_refused, ErrStr, ErrArgs) end. diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 6607c4f6..cd1d125b 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -37,7 +37,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). +-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]). -spec(set_alarm/1 :: (any()) -> 'ok'). -spec(clear_alarm/1 :: (any()) -> 'ok'). -spec(on_node_up/1 :: (node()) -> 'ok'). @@ -93,8 +93,8 @@ init([]) -> alarmed_nodes = dict:new(), alarms = []}}. -handle_call({register, Pid, AlertMFA}, State) -> - {ok, 0 < dict:size(State#alarms.alarmed_nodes), +handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) -> + {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])), internal_register(Pid, AlertMFA, State)}; handle_call(get_alarms, State = #alarms{alarms = Alarms}) -> @@ -104,11 +104,20 @@ handle_call(_Request, State) -> {ok, not_understood, State}. handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> - handle_set_alarm(Alarm, State#alarms{alarms = [Alarm|Alarms]}); + case lists:member(Alarm, Alarms) of + true -> {ok, State}; + false -> UpdatedAlarms = lists:usort([Alarm|Alarms]), + handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms}) + end; handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> - handle_clear_alarm(Alarm, State#alarms{alarms = lists:keydelete(Alarm, 1, - Alarms)}); + case lists:keymember(Alarm, 1, Alarms) of + true -> handle_clear_alarm( + Alarm, State#alarms{alarms = lists:keydelete( + Alarm, 1, Alarms)}); + false -> {ok, State} + + end; handle_event({node_up, Node}, State) -> %% Must do this via notify and not call to avoid possible deadlock. @@ -118,7 +127,7 @@ handle_event({node_up, Node}, State) -> {ok, State}; handle_event({node_down, Node}, State) -> - {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)}; + {ok, maybe_alert(fun dict_unappend_all/3, Node, [], false, State)}; handle_event({register, Pid, AlertMFA}, State) -> {ok, internal_register(Pid, AlertMFA, State)}; @@ -141,45 +150,36 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +dict_append(Key, Val, Dict) -> + L = case dict:find(Key, Dict) of + {ok, V} -> V; + error -> [] + end, + dict:store(Key, lists:usort([Val|L]), Dict). + dict_unappend_all(Key, _Val, Dict) -> dict:erase(Key, Dict). dict_unappend(Key, Val, Dict) -> - case lists:delete(Val, dict:fetch(Key, Dict)) of + L = case dict:find(Key, Dict) of + {ok, V} -> V; + error -> [] + end, + + case lists:delete(Val, L) of [] -> dict:erase(Key, Dict); X -> dict:store(Key, X, Dict) end. -count_dict_values(Val, Dict) -> - dict:fold(fun (_Node, List, Count) -> - Count + case lists:member(Val, List) of - true -> 1; - false -> 0 - end - end, 0, Dict). - -maybe_alert(UpdateFun, Node, Source, +maybe_alert(UpdateFun, Node, Source, Alert, State = #alarms{alarmed_nodes = AN, alertees = Alertees}) -> AN1 = UpdateFun(Node, Source, AN), - BeforeSz = count_dict_values(Source, AN), - AfterSz = count_dict_values(Source, AN1), - - %% If we have changed our alarm state, inform the remotes. - IsLocal = Node =:= node(), - if IsLocal andalso BeforeSz < AfterSz -> - ok = alert_remote(true, Alertees, Source); - IsLocal andalso BeforeSz > AfterSz -> - ok = alert_remote(false, Alertees, Source); - true -> - ok - end, - %% If the overall alarm state has changed, inform the locals. - case {dict:size(AN), dict:size(AN1)} of - {0, 1} -> ok = alert_local(true, Alertees, Source); - {1, 0} -> ok = alert_local(false, Alertees, Source); - {_, _} -> ok + case node() of + Node -> ok = alert_remote(Alert, Alertees, Source); + _ -> ok end, + ok = alert_local(Alert, Alertees, Source), State#alarms{alarmed_nodes = AN1}. alert_local(Alert, Alertees, Source) -> @@ -214,7 +214,7 @@ handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", [Source, Node]), - {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; + {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}; handle_set_alarm({file_descriptor_limit, []}, State) -> rabbit_log:warning( "file descriptor limit alarm set.~n~n" @@ -229,7 +229,7 @@ handle_set_alarm(Alarm, State) -> handle_clear_alarm({resource_limit, Source, Node}, State) -> rabbit_log:warning("~s resource limit alarm cleared on node ~p~n", [Source, Node]), - {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)}; + {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}; handle_clear_alarm(file_descriptor_limit, State) -> rabbit_log:warning("file descriptor limit alarm cleared~n"), {ok, State}; diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 32feac30..8a84c9f4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -24,9 +24,9 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([force_event_refresh/0, wake_up/1]). +-export([force_event_refresh/0, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/9, basic_cancel/4]). +-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). @@ -79,7 +79,8 @@ -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). -spec(update/2 :: (name(), - fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok'). + fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) + -> 'not_found' | rabbit_types:amqqueue()). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found'); @@ -111,7 +112,7 @@ -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(force_event_refresh/0 :: () -> 'ok'). --spec(wake_up/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean()}]). @@ -149,12 +150,13 @@ {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> 'ok'). --spec(basic_consume/9 :: +-spec(basic_consume/10 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any()) + rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). +-spec(notify_decorators/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(resume/2 :: (pid(), pid()) -> 'ok'). @@ -184,7 +186,7 @@ %%---------------------------------------------------------------------------- -define(CONSUMER_INFO_KEYS, - [queue_name, channel_pid, consumer_tag, ack_required]). + [queue_name, channel_pid, consumer_tag, ack_required, arguments]). recover() -> %% Clear out remnants of old incarnation, in case we restarted @@ -278,9 +280,10 @@ update(Name, Fun) -> case Durable of true -> ok = mnesia:write(rabbit_durable_queue, Q1, write); _ -> ok - end; + end, + Q1; [] -> - ok + not_found end. store_queue(Q = #amqqueue{durable = true}) -> @@ -294,11 +297,15 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(Q1, Q2) -> +policy_changed(Q1 = #amqqueue{decorators = Decorators1}, + Q2 = #amqqueue{decorators = Decorators2}) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), + D1 = rabbit_queue_decorator:select(Decorators1), + D2 = rabbit_queue_decorator:select(Decorators2), + [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)], %% Make sure we emit a stats event even if nothing %% mirroring-related has changed - the policy may have changed anyway. - wake_up(Q1). + notify_policy_changed(Q1). start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), @@ -393,9 +400,15 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, - [Key || {Key, _Fun} <- args()]). + [Key || {Key, _Fun} <- declare_args()]). check_declare_arguments(QueueName, Args) -> + check_arguments(QueueName, Args, declare_args()). + +check_consume_arguments(QueueName, Args) -> + check_arguments(QueueName, Args, consume_args()). + +check_arguments(QueueName, Args, Validators) -> [case rabbit_misc:table_lookup(Args, Key) of undefined -> ok; TypeVal -> case Fun(TypeVal, Args) of @@ -406,15 +419,17 @@ check_declare_arguments(QueueName, Args) -> [Key, rabbit_misc:rs(QueueName), Error]) end - end || {Key, Fun} <- args()], + end || {Key, Fun} <- Validators], ok. -args() -> +declare_args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, {<<"x-max-length">>, fun check_max_length_arg/2}]. +consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}]. + check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; @@ -492,7 +507,8 @@ force_event_refresh(QNames) -> force_event_refresh(Failed) end. -wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up). +notify_policy_changed(#amqqueue{pid = QPid}) -> + gen_server2:cast(QPid, policy_changed). consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers). @@ -504,8 +520,8 @@ consumers_all(VHostPath) -> map(VHostPath, fun (Q) -> [lists:zip(ConsumerInfoKeys, - [Q#amqqueue.name, ChPid, ConsumerTag, AckRequired]) || - {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] + [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) || + {ChPid, CTag, AckRequired, Args} <- consumers(Q)] end)). stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). @@ -549,14 +565,20 @@ credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> +basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, + LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) -> + ok = check_consume_arguments(QName, OtherArgs), delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). + ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, + OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). +notify_decorators(#amqqueue{pid = QPid}) -> + delegate:cast(QPid, notify_decorators). + notify_sent(QPid, ChPid) -> Key = {consumer_credit_to, QPid}, put(Key, case get(Key) of @@ -620,8 +642,7 @@ forget_all_durable(Node) -> internal_delete1(Name)) || #amqqueue{name = Name, pid = Pid} = Q <- Qs, node(Pid) =:= Node, - rabbit_policy:get(<<"ha-mode">>, Q) - =:= {error, not_found}], + rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined], ok end), ok. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1baef6d8..4ff30ce0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,10 +52,11 @@ dlx, dlx_routing_key, max_length, + args_policy_version, status }). --record(consumer, {tag, ack_required}). +-record(consumer, {tag, ack_required, args}). %% These are held in our process dictionary -record(cr, {ch_pid, @@ -136,19 +137,22 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, senders = Senders, msg_id_to_channel = MTC}, - State2 = process_args(State1), - lists:foldl(fun (Delivery, StateN) -> - deliver_or_enqueue(Delivery, true, StateN) - end, State2, Deliveries). + State2 = process_args_policy(State1), + State3 = lists:foldl(fun (Delivery, StateN) -> + deliver_or_enqueue(Delivery, true, StateN) + end, State2, Deliveries), + notify_decorators(startup, [], State3), + State3. init_state(Q) -> State = #q{q = Q, exclusive_consumer = none, has_had_consumers = false, - active_consumers = queue:new(), - senders = pmon:new(), + active_consumers = priority_queue:new(), + senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), - status = running}, + status = running, + args_policy_version = 0}, rabbit_event:init_stats_timer(State, #q.stats_timer). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> @@ -196,8 +200,10 @@ declare(Recover, From, State = #q{q = Q, BQ = backing_queue_module(Q1), BQS = bq_init(BQ, Q, Recover), recovery_barrier(Recover), - State1 = process_args(State#q{backing_queue = BQ, - backing_queue_state = BQS}), + State1 = process_args_policy( + State#q{backing_queue = BQ, + backing_queue_state = BQS}), + notify_decorators(startup, [], State), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -222,6 +228,27 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. +notify_decorators(Event, Props, State) when Event =:= startup; + Event =:= shutdown -> + decorator_callback(qname(State), Event, Props); + +notify_decorators(Event, Props, State = #q{active_consumers = ACs, + backing_queue = BQ, + backing_queue_state = BQS}) -> + decorator_callback( + qname(State), notify, + [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)}, + {is_empty, BQ:is_empty(BQS)} | Props]]). + +decorator_callback(QName, F, A) -> + %% Look up again in case policy and hence decorators have changed + case rabbit_amqqueue:lookup(QName) of + {ok, Q = #amqqueue{decorators = Ds}} -> + [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)]; + {error, not_found} -> + ok + end. + bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover =/= new, @@ -238,32 +265,51 @@ recovery_barrier(BarrierPid) -> {'DOWN', MRef, process, _, _} -> ok end. -process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> - drop_expired_msgs( - lists:foldl( - fun({Arg, Fun}, State1) -> - case rabbit_misc:table_lookup(Arguments, Arg) of - {_Type, Val} -> Fun(Val, State1); - undefined -> State1 - end - end, State, - [{<<"x-expires">>, fun init_expires/2}, - {<<"x-dead-letter-exchange">>, fun init_dlx/2}, - {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, - {<<"x-message-ttl">>, fun init_ttl/2}, - {<<"x-max-length">>, fun init_max_length/2}])). - -init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). - -init_ttl(TTL, State) -> State#q{ttl = TTL}. +process_args_policy(State = #q{q = Q, + args_policy_version = N}) -> + ArgsTable = + [{<<"expires">>, fun res_min/2, fun init_exp/2}, + {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2}, + {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, + {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, + {<<"max-length">>, fun res_min/2, fun init_max_length/2}], + drop_expired_msgs( + lists:foldl(fun({Name, Resolve, Fun}, StateN) -> + Fun(args_policy_lookup(Name, Resolve, Q), StateN) + end, State#q{args_policy_version = N + 1}, ArgsTable)). + +args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) -> + AName = <<"x-", Name/binary>>, + case {rabbit_policy:get(Name, Q), rabbit_misc:table_lookup(Args, AName)} of + {undefined, undefined} -> undefined; + {undefined, {_Type, Val}} -> Val; + {Val, undefined} -> Val; + {PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal) + end. + +res_arg(_PolVal, ArgVal) -> ArgVal. +res_min(PolVal, ArgVal) -> erlang:min(PolVal, ArgVal). + +%% In both these we init with the undefined variant first to stop any +%% existing timer, then start a new one which may fire after a +%% different time. +init_exp(undefined, State) -> stop_expiry_timer(State#q{expires = undefined}); +init_exp(Expires, State) -> State1 = init_exp(undefined, State), + ensure_expiry_timer(State1#q{expires = Expires}). +init_ttl(undefined, State) -> stop_ttl_timer(State#q{ttl = undefined}); +init_ttl(TTL, State) -> (init_ttl(undefined, State))#q{ttl = TTL}. + +init_dlx(undefined, State) -> + State#q{dlx = undefined}; init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}. -init_dlx_routing_key(RoutingKey, State) -> - State#q{dlx_routing_key = RoutingKey}. +init_dlx_rkey(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. -init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. +init_max_length(MaxLen, State) -> + {_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}), + State1. terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = @@ -276,6 +322,7 @@ terminate_shutdown(Fun, State) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), + notify_decorators(shutdown, [], State), [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _} <- consumers(State1)], State1#q{backing_queue_state = Fun(BQS)} @@ -324,11 +371,12 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #q.rate_timer_ref). %% configured period. ensure_expiry_timer(State = #q{expires = undefined}) -> State; -ensure_expiry_timer(State = #q{expires = Expires}) -> +ensure_expiry_timer(State = #q{expires = Expires, + args_policy_version = Version}) -> case is_unused(State) of true -> NewState = stop_expiry_timer(State), rabbit_misc:ensure_timer(NewState, #q.expiry_timer_ref, - Expires, maybe_expire); + Expires, {maybe_expire, Version}); false -> State end. @@ -336,12 +384,13 @@ stop_expiry_timer(State) -> rabbit_misc:stop_timer(State, #q.expiry_timer_ref). ensure_ttl_timer(undefined, State) -> State; -ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined, + args_policy_version = Version}) -> After = (case Expiry - now_micros() of V when V > 0 -> V + 999; %% always fire later _ -> 0 end) div 1000, - TRef = erlang:send_after(After, self(), drop_expired), + TRef = erlang:send_after(After, self(), {drop_expired, Version}), State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ttl_timer_expiry = TExpiry}) @@ -359,7 +408,7 @@ ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). assert_invariant(State = #q{active_consumers = AC}) -> - true = (queue:is_empty(AC) orelse is_empty(State)). + true = (priority_queue:is_empty(AC) orelse is_empty(State)). is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). @@ -378,7 +427,7 @@ ch_record(ChPid, LimiterPid) -> monitor_ref = MonitorRef, acktags = queue:new(), consumer_count = 0, - blocked_consumers = queue:new(), + blocked_consumers = priority_queue:new(), limiter = Limiter, unsent_message_count = 0}, put(Key, C), @@ -406,15 +455,18 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> - update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). +block_consumer(C = #cr{blocked_consumers = Blocked}, + {_ChPid, #consumer{tag = CTag}} = QEntry, State) -> + update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}), + notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State). is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). maybe_send_drained(WasEmpty, State) -> case (not WasEmpty) andalso is_empty(State) of - true -> [send_drained(C) || C <- all_ch_record()]; + true -> notify_decorators(queue_empty, [], State), + [send_drained(C) || C <- all_ch_record()]; false -> ok end, State. @@ -431,42 +483,43 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> - case queue:out(ActiveConsumers) of + case priority_queue:out_p(ActiveConsumers) of {empty, _} -> {false, State}; - {{value, QEntry}, Tail} -> + {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( - DeliverFun, QEntry, + DeliverFun, QEntry, Priority, State#q{active_consumers = Tail}), deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of - true -> block_consumer(C, E), + true -> block_consumer(C, E, State), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> - block_consumer(C#cr{limiter = Limiter}, E), + block_consumer(C#cr{limiter = Limiter}, E, State), {false, State}; {continue, Limiter} -> - AC1 = queue:in(E, State#q.active_consumers), - deliver_msg_to_consumer( + AC1 = priority_queue:in(E, Priority, + State#q.active_consumers), + deliver_msg_to_consumer0( DeliverFun, Consumer, C#cr{limiter = Limiter}, State#q{active_consumers = AC1}) end end. -deliver_msg_to_consumer(DeliverFun, - #consumer{tag = ConsumerTag, - ack_required = AckRequired}, - C = #cr{ch_pid = ChPid, - acktags = ChAckTags, - unsent_message_count = Count}, - State = #q{q = #amqqueue{name = QName}}) -> +deliver_msg_to_consumer0(DeliverFun, + #consumer{tag = ConsumerTag, + ack_required = AckRequired}, + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + unsent_message_count = Count}, + State = #q{q = #amqqueue{name = QName}}) -> {{Message, IsDelivered, AckTag}, Stop, State1} = DeliverFun(AckRequired, State), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, @@ -534,6 +587,13 @@ run_message_queue(State) -> is_empty(State), State), State1. +add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) -> + Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of + {_, P} -> P; + _ -> 0 + end, + priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers). + attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -630,17 +690,17 @@ requeue(AckTags, ChPid, State) -> fun (State1) -> requeue_and_run(AckTags, State1) end). remove_consumer(ChPid, ConsumerTag, Queue) -> - queue:filter(fun ({CP, #consumer{tag = CTag}}) -> - (CP /= ChPid) or (CTag /= ConsumerTag) - end, Queue). + priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> + (CP /= ChPid) or (CTag /= ConsumerTag) + end, Queue). remove_consumers(ChPid, Queue, QName) -> - queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> - emit_consumer_deleted(ChPid, CTag, QName), - false; - (_) -> - true - end, Queue). + priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> + emit_consumer_deleted(ChPid, CTag, QName), + false; + (_) -> + true + end, Queue). possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -655,18 +715,22 @@ possibly_unblock(State, ChPid, Update) -> unblock(State, C = #cr{limiter = Limiter}) -> case lists:partition( - fun({_ChPid, #consumer{tag = CTag}}) -> + fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) - end, queue:to_list(C#cr.blocked_consumers)) of + end, priority_queue:to_list(C#cr.blocked_consumers)) of {_, []} -> update_ch_record(C), State; {Blocked, Unblocked} -> - BlockedQ = queue:from_list(Blocked), - UnblockedQ = queue:from_list(Unblocked), + BlockedQ = priority_queue:from_list(Blocked), + UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), - AC1 = queue:join(State#q.active_consumers, UnblockedQ), - run_message_queue(State#q{active_consumers = AC1}) + AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), + State1 = State#q{active_consumers = AC1}, + [notify_decorators( + consumer_unblocked, [{consumer_tag, CTag}], State1) || + {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], + run_message_queue(State1) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -951,8 +1015,7 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> ExclusiveOwner; -i(policy, #q{q = #amqqueue{name = Name}}) -> - {ok, Q} = rabbit_amqqueue:lookup(Name), +i(policy, #q{q = Q}) -> case rabbit_policy:name(Q) of none -> ''; Policy -> Policy @@ -1003,9 +1066,10 @@ consumers(#q{active_consumers = ActiveConsumers}) -> consumers(ActiveConsumers, []), all_ch_record()). consumers(Consumers, Acc) -> - rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = CTag, ack_required = AckRequired}}, Acc1) -> - [{ChPid, CTag, AckRequired} | Acc1] + priority_queue:fold( + fun ({ChPid, Consumer}, _P, Acc1) -> + #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, + [{ChPid, CTag, Ack, Args} | Acc1] end, Acc, Consumers). emit_stats(State) -> @@ -1014,13 +1078,14 @@ emit_stats(State) -> emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). -emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) -> +emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) -> rabbit_event:notify(consumer_created, - [{consumer_tag, ConsumerTag}, + [{consumer_tag, CTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, {channel, ChPid}, - {queue, QName}]). + {queue, QName}, + {arguments, Args}]). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> rabbit_event:notify(consumer_deleted, @@ -1052,8 +1117,8 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; update_ram_duration -> 8; - maybe_expire -> 8; - drop_expired -> 8; + {maybe_expire, _Version} -> 8; + {drop_expired, _Version} -> 8; emit_stats -> 7; sync_timeout -> 6; _ -> 0 @@ -1063,6 +1128,10 @@ handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); +%% You used to be able to declare an exclusive durable queue. Sadly we +%% need to still tidy up after that case, there could be the remnants +%% of one left over from an upgrade. So that's why we don't enforce +%% Recover = false here. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> case rabbit_misc:is_process_alive(Owner) of @@ -1129,7 +1198,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, + ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> @@ -1152,8 +1221,9 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, true -> send_drained(C1); false -> ok end, - Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck}, + Consumer = #consumer{tag = ConsumerTag, + ack_required = not NoAck, + args = OtherArgs}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder end, @@ -1161,9 +1231,12 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1)), - AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers), - reply(ok, run_message_queue(State1#q{active_consumers = AC1})) + not NoAck, qname(State1), OtherArgs), + AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers), + State2 = State1#q{active_consumers = AC1}, + notify_decorators( + basic_consume, [{consumer_tag, ConsumerTag}], State2), + reply(ok, run_message_queue(State2)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, @@ -1193,6 +1266,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, active_consumers = remove_consumer( ChPid, ConsumerTag, State#q.active_consumers)}, + notify_decorators( + basic_cancel, [{consumer_tag, ConsumerTag}], State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) @@ -1259,10 +1334,11 @@ handle_call(force_event_refresh, _From, QName = qname(State), case Exclusive of none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName) || - {Ch, CTag, AckRequired} <- consumers(State)]; - {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), - emit_consumer_created(Ch, CTag, true, AckRequired, QName) + Ch, CTag, false, AckRequired, QName, Args) || + {Ch, CTag, AckRequired, Args} <- consumers(State)]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = consumers(State), + emit_consumer_created( + Ch, CTag, true, AckRequired, QName, Args) end, reply(ok, State). @@ -1369,20 +1445,37 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, end end); -handle_cast(wake_up, State) -> - noreply(State). +handle_cast(notify_decorators, State) -> + notify_decorators(refresh, [], State), + noreply(State); + +handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> + %% We depend on the #q.q field being up to date at least WRT + %% policy (but not slave pids) in various places, so when it + %% changes we go and read it from Mnesia again. + %% + %% This also has the side effect of waking us up so we emit a + %% stats event - so event consumers see the changed policy. + {ok, Q} = rabbit_amqqueue:lookup(Name), + noreply(process_args_policy(State#q{q = Q})). -handle_info(maybe_expire, State) -> +handle_info({maybe_expire, Vsn}, State = #q{args_policy_version = Vsn}) -> case is_unused(State) of true -> stop(State); false -> noreply(State#q{expiry_timer_ref = undefined}) end; -handle_info(drop_expired, State) -> +handle_info({maybe_expire, _Vsn}, State) -> + noreply(State); + +handle_info({drop_expired, Vsn}, State = #q{args_policy_version = Vsn}) -> WasEmpty = is_empty(State), State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), noreply(maybe_send_drained(WasEmpty, State1)); +handle_info({drop_expired, _Vsn}, State) -> + noreply(State); + handle_info(emit_stats, State) -> emit_stats(State), %% Don't call noreply/1, we don't want to set timers diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 0515e82e..74ae59da 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -47,6 +47,6 @@ start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). init([]) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, + {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index a7e8fb36..5ab22e75 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -31,9 +31,8 @@ %% SASL PLAIN, as used by the Qpid Java client and our clients. Also, %% apparently, by OpenAMQ. -%% TODO: once the minimum erlang becomes R13B03, reimplement this -%% using the binary module - that makes use of BIFs to do binary -%% matching and will thus be much faster. +%% TODO: reimplement this using the binary module? - that makes use of +%% BIFs to do binary matching and will thus be much faster. description() -> [{description, <<"SASL PLAIN authentication mechanism">>}]. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 5ff96c23..11e6bd38 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -152,7 +152,7 @@ exists(Binding) -> binding_action( Binding, fun (_Src, _Dst, B) -> rabbit_misc:const(mnesia:read({rabbit_route, B}) /= []) - end). + end, fun not_found_or_absent_errs/1). add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). @@ -177,7 +177,7 @@ add(Binding, InnerFun) -> {error, _} = Err -> rabbit_misc:const(Err) end - end). + end, fun not_found_or_absent_errs/1). add(Src, Dst, B) -> [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], @@ -200,14 +200,15 @@ remove(Binding, InnerFun) -> binding_action( Binding, fun (Src, Dst, B) -> - case mnesia:read(rabbit_route, B, write) of - [] -> rabbit_misc:const({error, binding_not_found}); - [_] -> case InnerFun(Src, Dst) of - ok -> remove(Src, Dst, B); - {error, _} = Err -> rabbit_misc:const(Err) - end + case mnesia:read(rabbit_route, B, write) =:= [] andalso + mnesia:read(rabbit_durable_route, B, write) =/= [] of + true -> rabbit_misc:const({error, binding_not_found}); + false -> case InnerFun(Src, Dst) of + ok -> remove(Src, Dst, B); + {error, _} = Err -> rabbit_misc:const(Err) + end end - end). + end, fun absent_errs_only/1). remove(Src, Dst, B) -> ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), @@ -308,13 +309,13 @@ durable(#amqqueue{durable = D}) -> D. binding_action(Binding = #binding{source = SrcName, destination = DstName, - args = Arguments}, Fun) -> + args = Arguments}, Fun, ErrFun) -> call_with_source_and_destination( SrcName, DstName, fun (Src, Dst) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), Fun(Src, Dst, Binding#binding{args = SortedArgs}) - end). + end, ErrFun). delete_object(Tab, Record, LockKind) -> %% this 'guarded' delete prevents unnecessary writes to the mnesia @@ -339,13 +340,9 @@ sync_transient_route(Route, Fun) -> ok = Fun(rabbit_route, Route, write), ok = Fun(rabbit_reverse_route, reverse_route(Route), write). -call_with_source_and_destination(SrcName, DstName, Fun) -> +call_with_source_and_destination(SrcName, DstName, Fun, ErrFun) -> SrcTable = table_for_resource(SrcName), DstTable = table_for_resource(DstName), - ErrFun = fun (Names) -> - Errs = [not_found_or_absent(Name) || Name <- Names], - rabbit_misc:const({error, {resources_missing, Errs}}) - end, rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case {mnesia:read({SrcTable, SrcName}), @@ -357,6 +354,18 @@ call_with_source_and_destination(SrcName, DstName, Fun) -> end end). +not_found_or_absent_errs(Names) -> + Errs = [not_found_or_absent(Name) || Name <- Names], + rabbit_misc:const({error, {resources_missing, Errs}}). + +absent_errs_only(Names) -> + Errs = [E || Name <- Names, + {absent, _Q} = E <- [not_found_or_absent(Name)]], + rabbit_misc:const(case Errs of + [] -> ok; + _ -> {error, {resources_missing, Errs}} + end). + table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; table_for_resource(#resource{kind = queue}) -> rabbit_queue. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6c04f4cd..dc37959b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -734,7 +734,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait, - arguments = Arguments}, + arguments = Args}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, consumer_mapping = ConsumerMapping}) -> @@ -755,12 +755,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> + {CreditArgs, OtherArgs} = parse_credit_args(Args), {rabbit_amqqueue:basic_consume( Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), ActualConsumerTag, ExclusiveConsume, - parse_credit_args(Arguments), + CreditArgs, OtherArgs, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -934,7 +935,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> - rabbit_misc:not_found(ExchangeName); + return_ok(State, NoWait, #'exchange.delete_ok'{}); {error, in_use} -> precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]); ok -> @@ -961,7 +962,7 @@ handle_method(#'exchange.unbind'{destination = DestinationNameBin, handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, - durable = Durable, + durable = DurableDeclare, exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, @@ -973,6 +974,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, true -> ConnPid; false -> none end, + Durable = DurableDeclare andalso not ExclusiveDeclare, ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen"); @@ -1052,9 +1054,15 @@ handle_method(#'queue.delete'{queue = QueueNameBin, _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ConnPid, - fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> + rabbit_amqqueue:check_exclusive_access(Q, ConnPid), + rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) + end, + fun (not_found) -> {ok, 0}; + ({absent, Q}) -> rabbit_misc:absent(Q) + end) of {error, in_use} -> precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); {error, not_empty} -> @@ -1246,12 +1254,12 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> parse_credit_args(Arguments) -> case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of - {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), - rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {boolean, Drain}} -> {Credit, Drain}; - _ -> none - end; - undefined -> none + {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; + _ -> none + end, lists:keydelete(<<"x-credit">>, 1, Arguments)}; + undefined -> {none, Arguments} end. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 1d9ba48b..e2c255db 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -43,6 +43,6 @@ start_channel(Pid, Args) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{channel_sup, {rabbit_channel_sup, start_link, []}, temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index d6536e16..843bb615 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -49,8 +49,9 @@ start_link_worker(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, {Callback, worker}). init({M,F,A}) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}; init({{M,F,A}, worker}) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{client, {M,F,A}, temporary, ?MAX_WAIT, worker, [M]}]}}. + diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_connection_helper_sup.erl index a9381f20..e51615e8 100644 --- a/src/rabbit_intermediate_sup.erl +++ b/src/rabbit_connection_helper_sup.erl @@ -11,21 +11,27 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2013-2013 GoPivotal, Inc. All rights reserved. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. %% --module(rabbit_intermediate_sup). +-module(rabbit_connection_helper_sup). -behaviour(supervisor2). -export([start_link/0]). +-export([start_channel_sup_sup/1, + start_queue_collector/1]). -export([init/1]). +-include("rabbit.hrl"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- @@ -33,7 +39,20 @@ start_link() -> supervisor2:start_link(?MODULE, []). +start_channel_sup_sup(SupPid) -> + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}). + +start_queue_collector(SupPid) -> + supervisor2:start_child( + SupPid, + {collector, {rabbit_queue_collector, start_link, []}, + intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}). + %%---------------------------------------------------------------------------- init([]) -> {ok, {{one_for_one, 10, 10}, []}}. + diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index c1fa17aa..9ed5dc77 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -37,27 +37,25 @@ start_link() -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), - {ok, Collector} = - supervisor2:start_child( - SupPid, - {collector, {rabbit_queue_collector, start_link, []}, - intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), %% We need to get channels in the hierarchy here so they get shut %% down after the reader, so the reader gets a chance to terminate %% them cleanly. But for 1.0 readers we can't start the real %% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) - %% so we add another supervisor into the hierarchy. - {ok, ChannelSup3Pid} = + %% + %% This supervisor also acts as an intermediary for heartbeaters and + %% the queue collector process, since these must not be siblings of the + %% reader due to the potential for deadlock if they are added/restarted + %% whilst the supervision tree is shutting down. + {ok, HelperSup} = supervisor2:start_child( SupPid, - {channel_sup3, {rabbit_intermediate_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}), + {helper_sup, {rabbit_connection_helper_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_connection_helper_sup]}), {ok, ReaderPid} = supervisor2:start_child( SupPid, - {reader, {rabbit_reader, start_link, - [ChannelSup3Pid, Collector, - rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, + {reader, {rabbit_reader, start_link, [HelperSup]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 0b666a36..6f36f99d 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -17,7 +17,8 @@ -module(rabbit_control_main). -include("rabbit.hrl"). --export([start/0, stop/0, action/5, sync_queue/1, cancel_sync_queue/1]). +-export([start/0, stop/0, parse_arguments/2, action/5, + sync_queue/1, cancel_sync_queue/1]). -define(RPC_TIMEOUT, infinity). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -25,12 +26,16 @@ -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). +-define(PRIORITY_OPT, "--priority"). +-define(APPLY_TO_OPT, "--apply-to"). -define(RAM_OPT, "--ram"). -define(OFFLINE_OPT, "--offline"). -define(QUIET_DEF, {?QUIET_OPT, flag}). -define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}). +-define(PRIORITY_DEF, {?PRIORITY_OPT, {option, "0"}}). +-define(APPLY_TO_DEF, {?APPLY_TO_OPT, {option, "all"}}). -define(RAM_DEF, {?RAM_OPT, flag}). -define(OFFLINE_DEF, {?OFFLINE_OPT, flag}). @@ -72,7 +77,7 @@ {clear_parameter, [?VHOST_DEF]}, {list_parameters, [?VHOST_DEF]}, - {set_policy, [?VHOST_DEF]}, + {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, {clear_policy, [?VHOST_DEF]}, {list_policies, [?VHOST_DEF]}, @@ -127,19 +132,13 @@ start() -> {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {Command, Opts, Args} = - case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS(NodeStr), - init:get_plain_arguments()) - of + case parse_arguments(init:get_plain_arguments(), NodeStr) of {ok, Res} -> Res; no_command -> print_error("could not recognise command", []), usage() end, - Opts1 = [case K of - ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; - _ -> {K, V} - end || {K, V} <- Opts], - Quiet = proplists:get_bool(?QUIET_OPT, Opts1), - Node = proplists:get_value(?NODE_OPT, Opts1), + Quiet = proplists:get_bool(?QUIET_OPT, Opts), + Node = proplists:get_value(?NODE_OPT, Opts), Inform = case Quiet of true -> fun (_Format, _Args1) -> ok end; false -> fun (Format, Args1) -> @@ -230,6 +229,19 @@ usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), rabbit_misc:quit(1). +parse_arguments(CmdLine, NodeStr) -> + case rabbit_misc:parse_arguments( + ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of + {ok, {Cmd, Opts0, Args}} -> + Opts = [case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; + _ -> {K, V} + end || {K, V} <- Opts0], + {ok, {Cmd, Opts, Args}}; + E -> + E + end. + %%---------------------------------------------------------------------------- action(stop, Node, Args, _Opts, Inform) -> @@ -484,16 +496,15 @@ action(list_parameters, Node, [], Opts, Inform) -> rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]), rabbit_runtime_parameters:info_keys()); -action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform) - when Prio == [] orelse length(Prio) == 1 -> - Msg = "Setting policy ~p for pattern ~p to ~p", - {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined}; - [P] -> {Msg ++ " with priority ~s", P} - end, +action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) -> + Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p", VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform(InformMsg, [Key, Pattern, Defn] ++ Prio), - rpc_call(Node, rabbit_policy, parse_set, - [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]); + PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts), + ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)), + Inform(Msg, [Key, Pattern, Defn, PriorityArg]), + rpc_call( + Node, rabbit_policy, parse_set, + [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]); action(clear_policy, Node, [Key], Opts, Inform) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 9002514f..5a004792 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -35,8 +35,10 @@ {rabbit_types:username(), rabbit_types:password()}), rabbit_types:vhost(), rabbit_types:protocol(), pid(), rabbit_event:event_props()) -> - {'ok', {rabbit_types:user(), - rabbit_framing:amqp_table()}}). + rabbit_types:ok_or_error2( + {rabbit_types:user(), rabbit_framing:amqp_table()}, + 'broker_not_found_on_node' | + {'auth_failure', string()} | 'access_refused')). -spec(start_channel/9 :: (rabbit_channel:channel_number(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), @@ -76,21 +78,24 @@ connect(User = #user{}, VHost, Protocol, Pid, Infos) -> end; connect({Username, Password}, VHost, Protocol, Pid, Infos) -> - connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid, - Infos); + connect0(fun () -> rabbit_access_control:check_user_pass_login( + Username, Password) end, + VHost, Protocol, Pid, Infos); connect(Username, VHost, Protocol, Pid, Infos) -> - connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos). + connect0(fun () -> rabbit_access_control:check_user_login( + Username, []) end, + VHost, Protocol, Pid, Infos). -connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) -> +connect0(AuthFun, VHost, Protocol, Pid, Infos) -> case rabbit:is_running() of - true -> - case rabbit_access_control:FunctionName(U, P) of - {ok, User} -> connect(User, VHost, Protocol, Pid, Infos); - {refused, _M, _A} -> {error, auth_failure} - end; - false -> - {error, broker_not_found_on_node} + true -> case AuthFun() of + {ok, User} -> + connect(User, VHost, Protocol, Pid, Infos); + {refused, _M, _A} -> + {error, {auth_failure, "Refused"}} + end; + false -> {error, broker_not_found_on_node} end. diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 5aaa1b2d..f153641e 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -23,16 +23,22 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0, - set_check_interval/1, get_disk_free/0]). +-export([get_disk_free_limit/0, set_disk_free_limit/1, + get_min_check_interval/0, set_min_check_interval/1, + get_max_check_interval/0, set_max_check_interval/1, + get_disk_free/0]). -define(SERVER, ?MODULE). --define(DEFAULT_DISK_CHECK_INTERVAL, 10000). +-define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100). +-define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000). +%% 250MB/s i.e. 250kB/ms +-define(FAST_RATE, (250 * 1000)). -record(state, {dir, limit, actual, - timeout, + min_interval, + max_interval, timer, alarmed }). @@ -45,8 +51,10 @@ -spec(start_link/1 :: (disk_free_limit()) -> rabbit_types:ok_pid_or_error()). -spec(get_disk_free_limit/0 :: () -> integer()). -spec(set_disk_free_limit/1 :: (disk_free_limit()) -> 'ok'). --spec(get_check_interval/0 :: () -> integer()). --spec(set_check_interval/1 :: (integer()) -> 'ok'). +-spec(get_min_check_interval/0 :: () -> integer()). +-spec(set_min_check_interval/1 :: (integer()) -> 'ok'). +-spec(get_max_check_interval/0 :: () -> integer()). +-spec(set_max_check_interval/1 :: (integer()) -> 'ok'). -spec(get_disk_free/0 :: () -> (integer() | 'unknown')). -endif. @@ -61,11 +69,17 @@ get_disk_free_limit() -> set_disk_free_limit(Limit) -> gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity). -get_check_interval() -> - gen_server:call(?MODULE, get_check_interval, infinity). +get_min_check_interval() -> + gen_server:call(?MODULE, get_min_check_interval, infinity). -set_check_interval(Interval) -> - gen_server:call(?MODULE, {set_check_interval, Interval}, infinity). +set_min_check_interval(Interval) -> + gen_server:call(?MODULE, {set_min_check_interval, Interval}, infinity). + +get_max_check_interval() -> + gen_server:call(?MODULE, get_max_check_interval, infinity). + +set_max_check_interval(Interval) -> + gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity). get_disk_free() -> gen_server:call(?MODULE, get_disk_free, infinity). @@ -78,16 +92,15 @@ start_link(Args) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). init([Limit]) -> - TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL), Dir = dir(), - State = #state { dir = Dir, - timeout = ?DEFAULT_DISK_CHECK_INTERVAL, - timer = TRef, - alarmed = false}, + State = #state{dir = Dir, + min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL, + max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL, + alarmed = false}, case {catch get_disk_free(Dir), vm_memory_monitor:get_total_memory()} of {N1, N2} when is_integer(N1), is_integer(N2) -> - {ok, set_disk_limits(State, Limit)}; + {ok, start_timer(set_disk_limits(State, Limit))}; Err -> rabbit_log:info("Disabling disk free space monitoring " "on unsupported platform: ~p~n", [Err]), @@ -100,12 +113,17 @@ handle_call(get_disk_free_limit, _From, State) -> handle_call({set_disk_free_limit, Limit}, _From, State) -> {reply, ok, set_disk_limits(State, Limit)}; -handle_call(get_check_interval, _From, State) -> - {reply, State#state.timeout, State}; +handle_call(get_min_check_interval, _From, State) -> + {reply, State#state.min_interval, State}; + +handle_call(get_max_check_interval, _From, State) -> + {reply, State#state.max_interval, State}; + +handle_call({set_min_check_interval, MinInterval}, _From, State) -> + {reply, ok, State#state{min_interval = MinInterval}}; -handle_call({set_check_interval, Timeout}, _From, State) -> - {ok, cancel} = timer:cancel(State#state.timer), - {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; +handle_call({set_max_check_interval, MaxInterval}, _From, State) -> + {reply, ok, State#state{max_interval = MaxInterval}}; handle_call(get_disk_free, _From, State = #state { actual = Actual }) -> {reply, Actual, State}; @@ -117,7 +135,7 @@ handle_cast(_Request, State) -> {noreply, State}. handle_info(update, State) -> - {noreply, internal_update(State)}; + {noreply, start_timer(internal_update(State))}; handle_info(_Info, State) -> {noreply, State}. @@ -193,6 +211,15 @@ emit_update_info(StateStr, CurrentFree, Limit) -> "Disk free space ~s. Free bytes:~p Limit:~p~n", [StateStr, CurrentFree, Limit]). -start_timer(Timeout) -> - {ok, TRef} = timer:send_interval(Timeout, update), - TRef. +start_timer(State) -> + State#state{timer = erlang:send_after(interval(State), self(), update)}. + +interval(#state{alarmed = true, + max_interval = MaxInterval}) -> + MaxInterval; +interval(#state{limit = Limit, + actual = Actual, + min_interval = MinInterval, + max_interval = MaxInterval}) -> + IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE, + trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))). diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 4d3ddc79..a713d76b 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -141,8 +141,6 @@ notify_if(true, Type, Props) -> notify(Type, Props); notify_if(false, _Type, _Props) -> ok. notify(Type, Props) -> - %% TODO: switch to os:timestamp() when we drop support for - %% Erlang/OTP < R13B01 gen_event:notify(?MODULE, #event{type = Type, props = Props, - timestamp = now()}). + timestamp = os:timestamp()}). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 49952a4d..fc131519 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -347,12 +347,11 @@ route1(Delivery, Decorators, lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames}, AlternateDests ++ DecorateDests ++ ExchangeDests)). -process_alternate(#exchange{arguments = []}, _Results) -> %% optimisation - []; -process_alternate(#exchange{name = XName, arguments = Args}, []) -> - case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of +process_alternate(X = #exchange{name = XName}, []) -> + case rabbit_policy:get_arg( + <<"alternate-exchange">>, <<"alternate-exchange">>, X) of undefined -> []; - AName -> [AName] + AName -> [rabbit_misc:r(XName, exchange, AName)] end; process_alternate(_X, _Results) -> []. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index baec9c29..c841560e 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -103,17 +103,15 @@ headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], AllMatch, AnyMatch, MatchKind) when PK == DK -> {AllMatch1, AnyMatch1} = - if + case rabbit_misc:type_class(PT) == rabbit_misc:type_class(DT) of %% It's not properly specified, but a "no value" in a %% pattern field is supposed to mean simple presence of %% the corresponding data field. I've interpreted that to %% mean a type of "void" for the pattern field. - PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. - PT =/= DT -> {false, AnyMatch}; - PV == DV -> {AllMatch, true}; - true -> {false, AnyMatch} + _ when PT == void -> {AllMatch, true}; + false -> {false, AnyMatch}; + _ when PV == DV -> {AllMatch, true}; + _ -> {false, AnyMatch} end, headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index df9baed9..ca67254b 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -16,8 +16,11 @@ -module(rabbit_heartbeat). +-export([start/6]). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). + pause_monitor/1, resume_monitor/1]). + +-export([system_continue/3, system_terminate/4, system_code_change/4]). -include("rabbit.hrl"). @@ -26,16 +29,15 @@ -ifdef(use_specs). -export_type([heartbeaters/0]). --export_type([start_heartbeat_fun/0]). -type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). -type(heartbeat_callback() :: fun (() -> any())). --type(start_heartbeat_fun() :: - fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), - non_neg_integer(), heartbeat_callback()) -> - no_return())). +-spec(start/6 :: + (pid(), rabbit_net:socket(), + non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> heartbeaters()). -spec(start_heartbeat_sender/3 :: (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> @@ -44,17 +46,28 @@ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). --spec(start_heartbeat_fun/1 :: - (pid()) -> start_heartbeat_fun()). - - -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). +-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). +-spec(system_continue/3 :: (_,_,{_, _}) -> any()). +-spec(system_terminate/4 :: (_,_,_,_) -> none()). + -endif. %%---------------------------------------------------------------------------- +start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + {ok, Sender} = + start_heartbeater(SendTimeoutSec, SupPid, Sock, + SendFun, heartbeat_sender, + start_heartbeat_sender), + {ok, Receiver} = + start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + ReceiveFun, heartbeat_receiver, + start_heartbeat_receiver), + {Sender, Receiver}. + start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case @@ -69,25 +82,21 @@ start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> ReceiveFun(), stop end}). -start_heartbeat_fun(SupPid) -> - fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> - {ok, Sender} = - start_heartbeater(SendTimeoutSec, SupPid, Sock, - SendFun, heartbeat_sender, - start_heartbeat_sender), - {ok, Receiver} = - start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, - ReceiveFun, heartbeat_receiver, - start_heartbeat_receiver), - {Sender, Receiver} - end. - pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. resume_monitor({_Sender, none}) -> ok; resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok. +system_continue(_Parent, Deb, {Params, State}) -> + heartbeater(Params, Deb, State). + +system_terminate(Reason, _Parent, _Deb, _State) -> + exit(Reason). + +system_code_change(Misc, _Module, _OldVsn, _Extra) -> + {ok, Misc}. + %%---------------------------------------------------------------------------- start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> {ok, none}; @@ -98,17 +107,27 @@ start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> - {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. + Deb = sys:debug_options([]), + {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}. heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, - {StatVal, SameCount}) -> - Recurse = fun (V) -> heartbeater(Params, V) end, + Deb, {StatVal, SameCount} = State) -> + Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end, + System = fun (From, Req) -> + sys:handle_system_msg( + Req, From, self(), ?MODULE, Deb, {Params, State}) + end, receive - pause -> receive - resume -> Recurse({0, 0}); - Other -> exit({unexpected_message, Other}) - end; - Other -> exit({unexpected_message, Other}) + pause -> + receive + resume -> Recurse({0, 0}); + {system, From, Req} -> System(From, Req); + Other -> exit({unexpected_message, Other}) + end; + {system, From, Req} -> + System(From, Req); + Other -> + exit({unexpected_message, Other}) after TimeoutMillisec -> case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 495f6fdd..4bd1a575 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -25,7 +25,7 @@ -behaviour(gen_server2). -export([start_link/0, register/2, deregister/1, - report_ram_duration/2, stop/0]). + report_ram_duration/2, stop/0, conserve_resources/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,24 +36,14 @@ queue_durations, %% ets #process queue_duration_sum, %% sum of all queue_durations queue_duration_count, %% number of elements in sum - desired_duration %% the desired queue duration + desired_duration, %% the desired queue duration + disk_alarm %% disable paging, disk alarm has fired }). -define(SERVER, ?MODULE). -define(DEFAULT_UPDATE_INTERVAL, 2500). -define(TABLE_NAME, ?MODULE). -%% Because we have a feedback loop here, we need to ensure that we -%% have some space for when the queues don't quite respond as fast as -%% we would like, or when there is buffering going on in other parts -%% of the system. In short, we aim to stay some distance away from -%% when the memory alarms will go off, which cause backpressure (of -%% some sort) on producers. Note that all other Thresholds are -%% relative to this scaling. --define(MEMORY_LIMIT_SCALING, 0.4). - --define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this - %% If all queues are pushed to disk (duration 0), then the sum of %% their reported lengths will be 0. If memory then becomes available, %% unless we manually intervene, the sum will remain 0, and the queues @@ -97,6 +87,11 @@ report_ram_duration(Pid, QueueDuration) -> stop() -> gen_server2:cast(?SERVER, stop). +conserve_resources(Pid, disk, Conserve) -> + gen_server2:cast(Pid, {disk_alarm, Conserve}); +conserve_resources(_Pid, _Source, _Conserve) -> + ok. + %%---------------------------------------------------------------------------- %% Gen_server callbacks %%---------------------------------------------------------------------------- @@ -105,13 +100,14 @@ init([]) -> {ok, TRef} = timer:send_interval(?DEFAULT_UPDATE_INTERVAL, update), Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]), - + Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), {ok, internal_update( #state { timer = TRef, queue_durations = Ets, queue_duration_sum = 0.0, queue_duration_count = 0, - desired_duration = infinity })}. + desired_duration = infinity, + disk_alarm = lists:member(disk, Alarms)})}. handle_call({report_ram_duration, Pid, QueueDuration}, From, State = #state { queue_duration_sum = Sum, @@ -148,6 +144,12 @@ handle_call({register, Pid, MFA}, _From, handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast({disk_alarm, Alarm}, State = #state{disk_alarm = Alarm}) -> + {noreply, State}; + +handle_cast({disk_alarm, Alarm}, State) -> + {noreply, internal_update(State#state{disk_alarm = Alarm})}; + handle_cast({deregister, Pid}, State) -> {noreply, internal_deregister(Pid, true, State)}; @@ -203,55 +205,65 @@ internal_deregister(Pid, Demonitor, queue_duration_count = Count1 } end. -internal_update(State = #state { queue_durations = Durations, - desired_duration = DesiredDurationAvg, - queue_duration_sum = Sum, - queue_duration_count = Count }) -> - MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(), +internal_update(State = #state{queue_durations = Durations, + desired_duration = DesiredDurationAvg, + disk_alarm = DiskAlarm}) -> + DesiredDurationAvg1 = desired_duration_average(State), + ShouldInform = should_inform_predicate(DiskAlarm), + case ShouldInform(DesiredDurationAvg, DesiredDurationAvg1) of + true -> inform_queues(ShouldInform, DesiredDurationAvg1, Durations); + false -> ok + end, + State#state{desired_duration = DesiredDurationAvg1}. + +desired_duration_average(#state{disk_alarm = true}) -> + infinity; +desired_duration_average(#state{disk_alarm = false, + queue_duration_sum = Sum, + queue_duration_count = Count}) -> + {ok, LimitThreshold} = + application:get_env(rabbit, vm_memory_high_watermark_paging_ratio), + MemoryLimit = vm_memory_monitor:get_memory_limit(), MemoryRatio = case MemoryLimit > 0.0 of true -> erlang:memory(total) / MemoryLimit; false -> infinity end, - DesiredDurationAvg1 = - if MemoryRatio =:= infinity -> - 0.0; - MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 -> - infinity; - MemoryRatio < ?SUM_INC_THRESHOLD -> - ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; - true -> - (Sum / Count) / MemoryRatio - end, - State1 = State #state { desired_duration = DesiredDurationAvg1 }, - - %% only inform queues immediately if the desired duration has - %% decreased - case DesiredDurationAvg1 == infinity orelse - (DesiredDurationAvg /= infinity andalso - DesiredDurationAvg1 >= DesiredDurationAvg) of - true -> - ok; - false -> - true = - ets:foldl( - fun (Proc = #process { reported = QueueDuration, - sent = PrevSendDuration, - callback = {M, F, A} }, true) -> - case should_send(QueueDuration, PrevSendDuration, - DesiredDurationAvg1) of - true -> ok = erlang:apply( - M, F, A ++ [DesiredDurationAvg1]), - ets:insert( - Durations, - Proc #process { - sent = DesiredDurationAvg1}); - false -> true - end - end, true, Durations) - end, - State1. + if MemoryRatio =:= infinity -> + 0.0; + MemoryRatio < LimitThreshold orelse Count == 0 -> + infinity; + MemoryRatio < ?SUM_INC_THRESHOLD -> + ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; + true -> + (Sum / Count) / MemoryRatio + end. -should_send(infinity, infinity, _) -> true; -should_send(infinity, D, DD) -> DD < D; -should_send(D, infinity, DD) -> DD < D; -should_send(D1, D2, DD) -> DD < lists:min([D1, D2]). +inform_queues(ShouldInform, DesiredDurationAvg, Durations) -> + true = + ets:foldl( + fun (Proc = #process{reported = QueueDuration, + sent = PrevSendDuration, + callback = {M, F, A}}, true) -> + case ShouldInform(PrevSendDuration, DesiredDurationAvg) + andalso ShouldInform(QueueDuration, DesiredDurationAvg) of + true -> ok = erlang:apply( + M, F, A ++ [DesiredDurationAvg]), + ets:insert( + Durations, + Proc#process{sent = DesiredDurationAvg}); + false -> true + end + end, true, Durations). + +%% In normal use, we only inform queues immediately if the desired +%% duration has decreased, we want to ensure timely paging. +should_inform_predicate(false) -> fun greater_than/2; +%% When the disk alarm has gone off though, we want to inform queues +%% immediately if the desired duration has *increased* - we want to +%% ensure timely stopping paging. +should_inform_predicate(true) -> fun (D1, D2) -> greater_than(D2, D1) end. + +greater_than(infinity, infinity) -> false; +greater_than(infinity, _D2) -> true; +greater_than(_D1, infinity) -> false; +greater_than(D1, D2) -> D1 > D2. diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index ab466c3c..a0e8bcc6 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -222,20 +222,19 @@ %% sender_death message to all the slaves, saying the sender has %% died. Once the slaves receive the sender_death message, they know %% that they're not going to receive any more instructions from the gm -%% regarding that sender, thus they throw away any publications from -%% the sender pending publication instructions. However, it is -%% possible that the coordinator receives the DOWN and communicates -%% that to the master before the master has finished receiving and -%% processing publishes from the sender. This turns out not to be a -%% problem: the sender has actually died, and so will not need to -%% receive confirms or other feedback, and should further messages be -%% "received" from the sender, the master will ask the coordinator to -%% set up a new monitor, and will continue to process the messages -%% normally. Slaves may thus receive publishes via gm from previously -%% declared "dead" senders, but again, this is fine: should the slave -%% have just thrown out the message it had received directly from the -%% sender (due to receiving a sender_death message via gm), it will be -%% able to cope with the publication purely from the master via gm. +%% regarding that sender. However, it is possible that the coordinator +%% receives the DOWN and communicates that to the master before the +%% master has finished receiving and processing publishes from the +%% sender. This turns out not to be a problem: the sender has actually +%% died, and so will not need to receive confirms or other feedback, +%% and should further messages be "received" from the sender, the +%% master will ask the coordinator to set up a new monitor, and +%% will continue to process the messages normally. Slaves may thus +%% receive publishes via gm from previously declared "dead" senders, +%% but again, this is fine: should the slave have just thrown out the +%% message it had received directly from the sender (due to receiving +%% a sender_death message via gm), it will be able to cope with the +%% publication purely from the master via gm. %% %% When a slave receives a DOWN message for a sender, if it has not %% received the sender_death message from the master via gm already, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 4ea1d984..8ad7c62f 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -204,8 +204,6 @@ start_child(Name, MirrorNode, Q) -> report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> - rabbit_event:notify(queue_mirror_deaths, [{name, QueueName}, - {pids, DeadPids}]), rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n", [rabbit_misc:rs(QueueName), case IsMaster of @@ -223,7 +221,7 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids, Q1 = Q#amqqueue{sync_slave_pids = SSPids1}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event - rabbit_amqqueue:wake_up(Q1), + rabbit_amqqueue:notify_policy_changed(Q1), Q1. %%---------------------------------------------------------------------------- @@ -239,28 +237,32 @@ suggested_queue_nodes(Q) -> %% This variant exists so we can pull a call to %% rabbit_mnesia:cluster_nodes(running) out of a loop or %% transaction or both. -suggested_queue_nodes(Q, All) -> +suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, All) -> {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, - Params = policy(<<"ha-params">>, Q), - case module(Q) of - {ok, M} -> M:suggested_queue_nodes(Params, MNode, SNodes, SSNodes, All); - _ -> {MNode, []} + case Owner of + none -> Params = policy(<<"ha-params">>, Q), + case module(Q) of + {ok, M} -> M:suggested_queue_nodes( + Params, MNode, SNodes, SSNodes, All); + _ -> {MNode, []} + end; + _ -> {MNode, []} end. policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of - {ok, P} -> P; - _ -> none + undefined -> none; + P -> P end. module(#amqqueue{} = Q) -> case rabbit_policy:get(<<"ha-mode">>, Q) of - {ok, Mode} -> module(Mode); - _ -> not_mirrored + undefined -> not_mirrored; + Mode -> module(Mode) end; module(Mode) when is_binary(Mode) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index a5d1f68e..b1a86493 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -41,15 +41,13 @@ %%---------------------------------------------------------------------------- --define(CREATION_EVENT_KEYS, +-define(INFO_KEYS, [pid, name, master_pid, is_synchronised ]). --define(INFO_KEYS, ?CREATION_EVENT_KEYS). - -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(DEATH_TIMEOUT, 20000). %% 20 seconds @@ -61,7 +59,7 @@ sync_timer_ref, rate_timer_ref, - sender_queues, %% :: Pid -> {Q Msg, Set MsgId} + sender_queues, %% :: Pid -> {Q Msg, Set MsgId, ChState} msg_id_ack, %% :: MsgId -> AckTag msg_id_status, @@ -120,12 +118,10 @@ init(Q = #amqqueue { name = QName }) -> msg_id_ack = dict:new(), msg_id_status = dict:new(), - known_senders = pmon:new(), + known_senders = pmon:new(delegate), depth_delta = undefined }, - rabbit_event:notify(queue_slave_created, - infos(?CREATION_EVENT_KEYS, State)), ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), {ok, State, hibernate, @@ -284,7 +280,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, noreply(State); handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> - noreply(local_sender_death(ChPid, State)); + local_sender_death(ChPid, State), + noreply(maybe_forget_sender(ChPid, down_from_ch, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -574,10 +571,15 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), - Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), - Delivery <- queue:to_list(PubQ)], + Deliveries = [Delivery || + {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), + Delivery <- queue:to_list(PubQ)], + AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], + KS1 = lists:foldl(fun (ChPid0, KS0) -> + pmon:demonitor(ChPid0, KS0) + end, KS, AwaitGmDown), rabbit_amqqueue_process:init_with_backing_queue_state( - Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS, + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). noreply(State) -> @@ -617,7 +619,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref). ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> State #state { known_senders = pmon:monitor(ChPid, KS) }. -local_sender_death(ChPid, State = #state { known_senders = KS }) -> +local_sender_death(ChPid, #state { known_senders = KS }) -> %% The channel will be monitored iff we have received a delivery %% from it but not heard about its death from the master. So if it %% is monitored we need to point the death out to the master (see @@ -625,8 +627,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) -> ok = case pmon:is_monitored(ChPid, KS) of false -> ok; true -> confirm_sender_death(ChPid) - end, - State. + end. confirm_sender_death(Pid) -> %% We have to deal with the possibility that we'll be promoted to @@ -655,12 +656,38 @@ confirm_sender_death(Pid) -> State end, %% Note that we do not remove our knowledge of this ChPid until we - %% get the sender_death from GM. + %% get the sender_death from GM as well as a DOWN notification. {ok, _TRef} = timer:apply_after( ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue, [self(), rabbit_mirror_queue_master, Fun]), ok. +forget_sender(_, running) -> false; +forget_sender(Down1, Down2) when Down1 =/= Down2 -> true. + +%% Record and process lifetime events from channels. Forget all about a channel +%% only when down notifications are received from both the channel and from gm. +maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ, + msg_id_status = MS, + known_senders = KS }) -> + case dict:find(ChPid, SQ) of + error -> + State; + {ok, {MQ, PendCh, ChStateRecord}} -> + case forget_sender(ChState, ChStateRecord) of + true -> + credit_flow:peer_down(ChPid), + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = lists:foldl( + fun dict:erase/2, + MS, sets:to_list(PendCh)), + known_senders = pmon:demonitor(ChPid, KS) }; + false -> + SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ), + State #state { sender_queues = SQ1 } + end + end. + maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, sender = ChPid }, @@ -669,9 +696,9 @@ maybe_enqueue_message( %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of error -> - {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ), MQ1 = queue:in(Delivery, MQ), - SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), + SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ), State1 #state { sender_queues = SQ1 }; {ok, Status} -> MS1 = send_or_record_confirm( @@ -683,7 +710,7 @@ maybe_enqueue_message( get_sender_queue(ChPid, SQ) -> case dict:find(ChPid, SQ) of - error -> {queue:new(), sets:new()}; + error -> {queue:new(), sets:new(), running}; {ok, Val} -> Val end. @@ -691,19 +718,20 @@ remove_from_pending_ch(MsgId, ChPid, SQ) -> case dict:find(ChPid, SQ) of error -> SQ; - {ok, {MQ, PendingCh}} -> - dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) + {ok, {MQ, PendingCh, ChState}} -> + dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState}, + SQ) end. publish_or_discard(Status, ChPid, MsgId, State = #state { sender_queues = SQ, msg_id_status = MS }) -> %% We really are going to do the publish/discard right now, even %% though we may not have seen it directly from the channel. But - %% we cannot issues confirms until the latter has happened. So we + %% we cannot issue confirms until the latter has happened. So we %% need to keep track of the MsgId and its confirmation status in %% the meantime. State1 = ensure_monitoring(ChPid, State), - {MQ, PendingCh} = get_sender_queue(ChPid, SQ), + {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ), {MQ1, PendingCh1, MS1} = case queue:out(MQ) of {empty, _MQ2} -> @@ -723,7 +751,7 @@ publish_or_discard(Status, ChPid, MsgId, %% expecting any confirms from us. {MQ, PendingCh, MS} end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ), State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. @@ -784,25 +812,13 @@ process_instruction({requeue, MsgIds}, {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; process_instruction({sender_death, ChPid}, - State = #state { sender_queues = SQ, - msg_id_status = MS, - known_senders = KS }) -> + State = #state { known_senders = KS }) -> %% The channel will be monitored iff we have received a message %% from it. In this case we just want to avoid doing work if we %% never got any messages. {ok, case pmon:is_monitored(ChPid, KS) of false -> State; - true -> MS1 = case dict:find(ChPid, SQ) of - error -> - MS; - {ok, {_MQ, PendingCh}} -> - lists:foldl(fun dict:erase/2, MS, - sets:to_list(PendingCh)) - end, - credit_flow:peer_down(ChPid), - State #state { sender_queues = dict:erase(ChPid, SQ), - msg_id_status = MS1, - known_senders = pmon:demonitor(ChPid, KS) } + true -> maybe_forget_sender(ChPid, down_from_gm, State) end}; process_instruction({depth, Depth}, State = #state { backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index 43dbb4e9..6fba99db 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -31,7 +31,7 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). init([]) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, + {ok, {{simple_one_for_one, 10, 10}, [{rabbit_mirror_queue_slave, {rabbit_mirror_queue_slave, start_link, []}, temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index bca9d5ce..00c4eaf3 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -21,7 +21,8 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, quit/1, protocol_error/3, protocol_error/4, protocol_error/1]). --export([not_found/1, absent/1, assert_args_equivalence/4]). +-export([not_found/1, absent/1]). +-export([type_class/1, assert_args_equivalence/4]). -export([dirty_read/1]). -export([table_lookup/2, set_table_value/4]). -export([r/3, r/2, r_arg/4, rs/1]). @@ -75,9 +76,6 @@ R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal; R =:= shutdown). -%% This is dictated by `erlang:send_after' on which we depend to implement TTL. --define(MAX_EXPIRY_TIMER, 4294967295). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -120,6 +118,7 @@ (rabbit_types:amqp_error()) -> channel_or_connection_exit()). -spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()). -spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()). +-spec(type_class/1 :: (rabbit_framing:amqp_field_type()) -> atom()). -spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(), rabbit_types:r(any()), [binary()]) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 0ed6c7b2..3a8fae7f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -396,7 +396,7 @@ cluster_status(WhichNodes) -> node_info() -> {erlang:system_info(otp_release), rabbit_misc:version(), - delegate_beam_hash(), cluster_status_from_mnesia()}. + cluster_status_from_mnesia()}. node_type() -> {_AllNodes, DiscNodes, _RunningNodes} = @@ -460,10 +460,11 @@ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) -> %% about the cluster case NodeType of ram -> start_mnesia(), - change_extra_db_nodes(ClusterNodes, false), - rabbit_table:wait_for_replicated(); + change_extra_db_nodes(ClusterNodes, false); disc -> ok end, + %% ...and all nodes will need to wait for tables + rabbit_table:wait_for_replicated(), ok. init_db_with_mnesia(ClusterNodes, NodeType, @@ -570,16 +571,16 @@ check_cluster_consistency(Node) -> case rpc:call(Node, rabbit_mnesia, node_info, []) of {badrpc, _Reason} -> {error, not_found}; - {_OTP, _Rabbit, _Hash, {error, _}} -> + {_OTP, _Rabbit, {error, _}} -> {error, not_found}; - {_OTP, Rabbit, _Status} -> - %% pre-2013/04 format implies version mismatch - version_error("Rabbit", rabbit_misc:version(), Rabbit); - {OTP, Rabbit, Hash, {ok, Status}} -> - case check_consistency(OTP, Rabbit, Hash, Node, Status) of + {OTP, Rabbit, {ok, Status}} -> + case check_consistency(OTP, Rabbit, Node, Status) of {error, _} = E -> E; {ok, Res} -> {ok, Res} - end + end; + {_OTP, Rabbit, _Hash, _Status} -> + %% delegate hash checking implies version mismatch + version_error("Rabbit", rabbit_misc:version(), Rabbit) end. %%-------------------------------------------------------------------- @@ -743,17 +744,15 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> Nodes end. -check_consistency(OTP, Rabbit, Hash) -> +check_consistency(OTP, Rabbit) -> rabbit_misc:sequence_error( [check_otp_consistency(OTP), - check_rabbit_consistency(Rabbit), - check_beam_compatibility(Hash)]). + check_rabbit_consistency(Rabbit)]). -check_consistency(OTP, Rabbit, Hash, Node, Status) -> +check_consistency(OTP, Rabbit, Node, Status) -> rabbit_misc:sequence_error( [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit), - check_beam_compatibility(Hash), check_nodes_consistency(Node, Status)]). check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> @@ -784,31 +783,11 @@ version_error(Name, This, Remote) -> check_otp_consistency(Remote) -> check_version_consistency(erlang:system_info(otp_release), Remote, "OTP"). -%% Unlike the rest of 3.0.x, 3.0.0 is not compatible. This can be -%% removed after 3.1.0 is released. -check_rabbit_consistency("3.0.0") -> - version_error("Rabbit", rabbit_misc:version(), "3.0.0"); - check_rabbit_consistency(Remote) -> check_version_consistency( rabbit_misc:version(), Remote, "Rabbit", fun rabbit_misc:version_minor_equivalent/2). -check_beam_compatibility(RemoteHash) -> - case RemoteHash == delegate_beam_hash() of - true -> ok; - false -> {error, {incompatible_bytecode, - "Incompatible Erlang bytecode found on nodes"}} - end. - -%% The delegate module sends functions across the cluster; if it is -%% out of sync (say due to mixed compilers), we will get badfun -%% exceptions when trying to do so. Let's detect that at startup. -delegate_beam_hash() -> - {delegate, Obj, _} = code:get_object_code(delegate), - {ok, {delegate, Hash}} = beam_lib:md5(Obj), - Hash. - %% This is fairly tricky. We want to know if the node is in the state %% that a `reset' would leave it in. We cannot simply check if the %% mnesia tables aren't there because restarted RAM nodes won't have @@ -834,12 +813,13 @@ find_good_node([]) -> none; find_good_node([Node | Nodes]) -> case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _Reason} -> find_good_node(Nodes); - {_OTP, _Rabbit, _} -> find_good_node(Nodes); - {OTP, Rabbit, Hash, _} -> case check_consistency(OTP, Rabbit, Hash) of - {error, _} -> find_good_node(Nodes); - ok -> {ok, Node} - end + {badrpc, _Reason} -> find_good_node(Nodes); + %% old delegate hash check + {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes); + {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of + {error, _} -> find_good_node(Nodes); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 46cfabe3..91be4dcb 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -149,14 +149,22 @@ ensure_ssl() -> ok = app_utils:start_applications(SslAppsConfig), {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options), - % unknown_ca errors are silently ignored prior to R14B unless we - % supply this verify_fun - remove when at least R14B is required - case proplists:get_value(verify, SslOptsConfig, verify_none) of - verify_none -> SslOptsConfig; - verify_peer -> [{verify_fun, fun([]) -> true; - ([_|_]) -> false - end} - | SslOptsConfig] + case rabbit_misc:pget(verify_fun, SslOptsConfig) of + {Module, Function} -> + rabbit_misc:pset(verify_fun, + fun (ErrorList) -> + Module:Function(ErrorList) + end, SslOptsConfig); + undefined -> + % unknown_ca errors are silently ignored prior to R14B unless we + % supply this verify_fun - remove when at least R14B is required + case proplists:get_value(verify, SslOptsConfig, verify_none) of + verify_none -> SslOptsConfig; + verify_peer -> [{verify_fun, fun([]) -> true; + ([_|_]) -> false + end} + | SslOptsConfig] + end end. ssl_transform_fun(SslOpts) -> diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 6355f935..948d2ab0 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -121,7 +121,6 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> fmt_missing("dependencies", MissingDeps)}) end, write_enabled_plugins(PluginsFile, NewEnabled), - maybe_warn_mochiweb(NewImplicitlyEnabled), case NewEnabled -- ImplicitlyEnabled of [] -> io:format("Plugin configuration unchanged.~n"); _ -> print_list("The following plugins have been enabled:", @@ -263,25 +262,6 @@ write_enabled_plugins(PluginsFile, Plugins) -> PluginsFile, Reason}}) end. -maybe_warn_mochiweb(Enabled) -> - V = erlang:system_info(otp_release), - case lists:member(mochiweb, Enabled) andalso V < "R13B01" of - true -> - Stars = string:copies("*", 80), - io:format("~n~n~s~n" - " Warning: Mochiweb enabled and Erlang version ~s " - "detected.~n" - " Enabling plugins that depend on Mochiweb is not " - "supported on this Erlang~n" - " version. At least R13B01 is required.~n~n" - " RabbitMQ will not start successfully in this " - "configuration. You *must*~n" - " disable the Mochiweb plugin, or upgrade Erlang.~n" - "~s~n~n~n", [Stars, V, Stars]); - false -> - ok - end. - report_change() -> io:format("Plugin configuration has changed. " "Restart RabbitMQ for changes to take effect.~n"). diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl new file mode 100644 index 00000000..c4a37e7a --- /dev/null +++ b/src/rabbit_policies.erl @@ -0,0 +1,81 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_policies). +-behaviour(rabbit_policy_validator). + +-include("rabbit.hrl"). + +-export([register/0, validate_policy/1]). + +-rabbit_boot_step({?MODULE, + [{description, "internal policies"}, + {mfa, {rabbit_policies, register, []}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + +register() -> + [rabbit_registry:register(Class, Name, ?MODULE) || + {Class, Name} <- [{policy_validator, <<"alternate-exchange">>}, + {policy_validator, <<"dead-letter-exchange">>}, + {policy_validator, <<"dead-letter-routing-key">>}, + {policy_validator, <<"message-ttl">>}, + {policy_validator, <<"expires">>}, + {policy_validator, <<"max-length">>}]], + ok. + +validate_policy(Terms) -> + lists:foldl(fun ({Key, Value}, ok) -> validate_policy0(Key, Value); + (_, Error) -> Error + end, ok, Terms). + +validate_policy0(<<"alternate-exchange">>, Value) + when is_binary(Value) -> + ok; +validate_policy0(<<"alternate-exchange">>, Value) -> + {error, "~p is not a valid alternate exchange name", [Value]}; + +validate_policy0(<<"dead-letter-exchange">>, Value) + when is_binary(Value) -> + ok; +validate_policy0(<<"dead-letter-exchange">>, Value) -> + {error, "~p is not a valid dead letter exchange name", [Value]}; + +validate_policy0(<<"dead-letter-routing-key">>, Value) + when is_binary(Value) -> + ok; +validate_policy0(<<"dead-letter-routing-key">>, Value) -> + {error, "~p is not a valid dead letter routing key", [Value]}; + +validate_policy0(<<"message-ttl">>, Value) + when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER -> + ok; +validate_policy0(<<"message-ttl">>, Value) -> + {error, "~p is not a valid message TTL", [Value]}; + +validate_policy0(<<"expires">>, Value) + when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER -> + ok; +validate_policy0(<<"expires">>, Value) -> + {error, "~p is not a valid queue expiry", [Value]}; + +validate_policy0(<<"max-length">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"max-length">>, Value) -> + {error, "~p is not a valid maximum length", [Value]}. + + diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 924737f6..db6d042b 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -25,9 +25,10 @@ -import(rabbit_misc, [pget/2]). -export([register/0]). --export([name/1, get/2, set/1]). +-export([invalidate/0, recover/0]). +-export([name/1, get/2, get_arg/3, set/1]). -export([validate/4, notify/4, notify_clear/3]). --export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1, +-export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1, list_formatted/1, info_keys/0]). -rabbit_boot_step({?MODULE, @@ -45,55 +46,106 @@ name(#exchange{policy = Policy}) -> name0(Policy). name0(undefined) -> none; name0(Policy) -> pget(name, Policy). -set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; +set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set( + Q#amqqueue{policy = set0(Name)}); set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set( X#exchange{policy = set0(Name)}). set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). +set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)}; +set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set( + X#exchange{policy = match(Name, Ps)}). + get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); %% Caution - SLOW. get(Name, EntityName = #resource{virtual_host = VHost}) -> get0(Name, match(EntityName, list(VHost))). -get0(_Name, undefined) -> {error, not_found}; +get0(_Name, undefined) -> undefined; get0(Name, List) -> case pget(definition, List) of - undefined -> {error, not_found}; - Policy -> case pget(Name, Policy) of - undefined -> {error, not_found}; - Value -> {ok, Value} - end + undefined -> undefined; + Policy -> pget(Name, Policy) end. +%% Many heads for optimisation +get_arg(_AName, _PName, #exchange{arguments = [], policy = undefined}) -> + undefined; +get_arg(_AName, PName, X = #exchange{arguments = []}) -> + get(PName, X); +get_arg(AName, PName, X = #exchange{arguments = Args}) -> + case rabbit_misc:table_lookup(Args, AName) of + undefined -> get(PName, X); + {_Type, Arg} -> Arg + end. + +%%---------------------------------------------------------------------------- + +%% Gets called during upgrades - therefore must not assume anything about the +%% state of Mnesia +invalidate() -> + rabbit_file:write_file(invalid_file(), <<"">>). + +recover() -> + case rabbit_file:is_file(invalid_file()) of + true -> recover0(), + rabbit_file:delete(invalid_file()); + false -> ok + end. + +%% To get here we have to have just completed an Mnesia upgrade - i.e. we are +%% the first node starting. So we can rewrite the whole database. Note that +%% recovery has not yet happened; we must work with the rabbit_durable_<thing> +%% variants. +recover0() -> + Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}), + Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}), + Policies = list(), + [rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:write(rabbit_durable_exchange, set(X, Policies), write) + end) || X <- Xs], + [rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:write(rabbit_durable_queue, set(Q, Policies), write) + end) || Q <- Qs], + ok. + +invalid_file() -> + filename:join(rabbit_mnesia:dir(), "policies_are_invalid"). + %%---------------------------------------------------------------------------- -parse_set(VHost, Name, Pattern, Definition, undefined) -> - parse_set0(VHost, Name, Pattern, Definition, 0); -parse_set(VHost, Name, Pattern, Definition, Priority) -> +parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> try list_to_integer(Priority) of - Num -> parse_set0(VHost, Name, Pattern, Definition, Num) + Num -> parse_set0(VHost, Name, Pattern, Definition, Num, ApplyTo) catch error:badarg -> {error, "~p priority must be a number", [Priority]} end. -parse_set0(VHost, Name, Pattern, Defn, Priority) -> +parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) -> case rabbit_misc:json_decode(Defn) of {ok, JSON} -> set0(VHost, Name, [{<<"pattern">>, list_to_binary(Pattern)}, {<<"definition">>, rabbit_misc:json_to_term(JSON)}, - {<<"priority">>, Priority}]); + {<<"priority">>, Priority}, + {<<"apply-to">>, ApplyTo}]); error -> {error_string, "JSON decoding error"} end. -set(VHost, Name, Pattern, Definition, Priority) -> +set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> PolicyProps = [{<<"pattern">>, Pattern}, {<<"definition">>, Definition}, {<<"priority">>, case Priority of undefined -> 0; _ -> Priority + end}, + {<<"apply-to">>, case ApplyTo of + undefined -> <<"all">>; + _ -> ApplyTo end}], set0(VHost, Name, PolicyProps). @@ -130,6 +182,7 @@ p(Parameter, DefnFun) -> [{vhost, pget(vhost, Parameter)}, {name, pget(name, Parameter)}, {pattern, pget(<<"pattern">>, Value)}, + {'apply-to', pget(<<"apply-to">>, Value)}, {definition, DefnFun(pget(<<"definition">>, Value))}, {priority, pget(<<"priority">>, Value)}]. @@ -139,7 +192,7 @@ format(Term) -> ident(X) -> X. -info_keys() -> [vhost, name, pattern, definition, priority]. +info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority]. %%---------------------------------------------------------------------------- @@ -211,9 +264,14 @@ update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> case match(QName, Policies) of OldPolicy -> no_change; - NewPolicy -> rabbit_amqqueue:update( - QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end), - {Q, Q#amqqueue{policy = NewPolicy}} + NewPolicy -> case rabbit_amqqueue:update( + QName, fun(Q1) -> + rabbit_queue_decorator:set( + Q1#amqqueue{policy = NewPolicy}) + end) of + #amqqueue{} = Q1 -> {Q, Q1}; + not_found -> {Q, Q } + end end. notify(no_change)-> @@ -229,8 +287,16 @@ match(Name, Policies) -> [Policy | _Rest] -> Policy end. -matches(#resource{name = Name}, Policy) -> - match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]). +matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) -> + matches_type(Kind, pget('apply-to', Policy)) andalso + match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso + VHost =:= pget(vhost, Policy). + +matches_type(exchange, <<"exchanges">>) -> true; +matches_type(queue, <<"queues">>) -> true; +matches_type(exchange, <<"all">>) -> true; +matches_type(queue, <<"all">>) -> true; +matches_type(_, _) -> false. sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). @@ -239,6 +305,7 @@ sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). policy_validation() -> [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory}, {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, + {<<"apply-to">>, fun apply_to_validation/2, optional}, {<<"definition">>, fun validation/2, mandatory}]. validation(_Name, []) -> @@ -284,3 +351,10 @@ a2b(A) -> list_to_binary(atom_to_list(A)). dups(L) -> L -- lists:usort(L). is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]). + +apply_to_validation(_Name, <<"all">>) -> ok; +apply_to_validation(_Name, <<"exchanges">>) -> ok; +apply_to_validation(_Name, <<"queues">>) -> ok; +apply_to_validation(_Name, Term) -> + {error, "apply-to '~s' unrecognised; should be 'queues', 'exchanges' " + "or 'all'", [Term]}. diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl new file mode 100644 index 00000000..8f6375a5 --- /dev/null +++ b/src/rabbit_queue_decorator.erl @@ -0,0 +1,48 @@ +-module(rabbit_queue_decorator). + +-include("rabbit.hrl"). + +-export([select/1, set/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(notify_event() :: 'consumer_blocked' | + 'consumer_unblocked' | + 'queue_empty' | + 'basic_consume' | + 'basic_cancel' | + 'refresh'). + +-callback startup(rabbit_types:amqqueue()) -> 'ok'. + +-callback shutdown(rabbit_types:amqqueue()) -> 'ok'. + +-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> + 'ok'. + +-callback active_for(rabbit_types:amqqueue()) -> boolean(). + +-callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'. + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, + {active_for, 1}, {notify, 3}]; +behaviour_info(_Other) -> + undefined. + +-endif. + +%%---------------------------------------------------------------------------- + +select(Modules) -> + [M || M <- Modules, code:which(M) =/= non_existing]. + +set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}. + +list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)]. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0908fb73..f69d8355 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -23,7 +23,7 @@ -export([scan/3]). --export([add_queue_ttl/0]). +-export([add_queue_ttl/0, avoid_zeroes/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -123,9 +123,9 @@ -define(REL_SEQ_BITS, 14). -define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))). -%% seq only is binary 00 followed by 14 bits of rel seq id +%% seq only is binary 01 followed by 14 bits of rel seq id %% (range: 0 - 16383) --define(REL_SEQ_ONLY_PREFIX, 00). +-define(REL_SEQ_ONLY_PREFIX, 01). -define(REL_SEQ_ONLY_PREFIX_BITS, 2). -define(REL_SEQ_ONLY_RECORD_BYTES, 2). @@ -171,6 +171,7 @@ %%---------------------------------------------------------------------------- -rabbit_upgrade({add_queue_ttl, local, []}). +-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). -ifdef(use_specs). @@ -715,7 +716,11 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of - {ok, Bin} -> + %% Journal entry composed only of zeroes was probably + %% produced during a dirty shutdown so stop reading + {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} -> + State; + {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} -> {MsgId, MsgProps} = parse_pub_record_body(Bin), IsPersistent = case Prefix of ?PUB_PERSIST_JPREFIX -> true; @@ -1068,6 +1073,21 @@ add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, add_queue_ttl_segment(_) -> stop. +avoid_zeroes() -> + foreach_queue_index({none, fun avoid_zeroes_segment/1}). + +avoid_zeroes_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>, Rest}; +avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +avoid_zeroes_segment(_) -> + stop. + %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> @@ -1092,7 +1112,9 @@ transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> || Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)], ok = gatherer:finish(Gatherer). -transform_file(Path, Fun) -> +transform_file(_Path, none) -> + ok; +transform_file(Path, Fun) when is_function(Fun)-> PathTmp = Path ++ ".upgrade", case rabbit_file:file_size(Path) of 0 -> ok; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9b6039d1..e00732fd 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,12 +18,12 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/1, +-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/4, mainloop/2, recvloop/2]). +-export([init/2, mainloop/2, recvloop/2]). -export([conserve_resources/3, server_properties/1]). @@ -36,16 +36,16 @@ %%-------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, - connection_state, queue_collector, heartbeater, stats_timer, - ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun, - buf, buf_len, throttle}). + connection_state, helper_sup, queue_collector, heartbeater, + stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, vhost, client_properties, capabilities, auth_mechanism, auth_state}). --record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}). +-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at, + blocked_sent}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, @@ -73,8 +73,7 @@ -ifdef(use_specs). --spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> - rabbit_types:ok(pid())). +-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). @@ -85,11 +84,9 @@ rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) - -> no_return()). --spec(start_connection/7 :: - (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), - rabbit_net:socket(), +-spec(init/2 :: (pid(), pid()) -> no_return()). +-spec(start_connection/5 :: + (pid(), pid(), any(), rabbit_net:socket(), fun ((rabbit_net:socket()) -> rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). @@ -103,24 +100,21 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid, - Collector, StartHeartbeatFun])}. +start_link(HelperSup) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) -> +init(Parent, HelperSup) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection( - Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock, - SockTransform) + start_connection(Parent, HelperSup, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> - ?MODULE:mainloop(Deb, State#v1{parent = Parent}). + mainloop(Deb, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -142,8 +136,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_resources(Pid, _Source, Conserve) -> - Pid ! {conserve_resources, Conserve}, +conserve_resources(Pid, Source, Conserve) -> + Pid ! {conserve_resources, Source, Conserve}, ok. server_properties(Protocol) -> @@ -175,10 +169,13 @@ server_properties(Protocol) -> NormalizedConfigServerProps). server_capabilities(rabbit_framing_amqp_0_9_1) -> - [{<<"publisher_confirms">>, bool, true}, - {<<"exchange_exchange_bindings">>, bool, true}, - {<<"basic.nack">>, bool, true}, - {<<"consumer_cancel_notify">>, bool, true}]; + [{<<"publisher_confirms">>, bool, true}, + {<<"exchange_exchange_bindings">>, bool, true}, + {<<"basic.nack">>, bool, true}, + {<<"consumer_cancel_notify">>, bool, true}, + {<<"connection.blocked">>, bool, true}, + {<<"consumer_priorities">>, bool, true}, + {<<"authentication_failure_close">>, bool, true}]; server_capabilities(_) -> []. @@ -201,8 +198,7 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, - Sock, SockTransform) -> +start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = case rabbit_net:connection_string(Sock, inbound) of {ok, Str} -> Str; @@ -238,17 +234,17 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, recv_len = 0, pending_recv = false, connection_state = pre_init, - queue_collector = Collector, + queue_collector = undefined, %% started on tune-ok + helper_sup = HelperSup, heartbeater = none, - ch_sup3_pid = ChSup3Pid, channel_sup_sup_pid = none, - start_heartbeat_fun = StartHeartbeatFun, buf = [], buf_len = 0, throttle = #throttle{ - conserve_resources = false, - last_blocked_by = none, - last_blocked_at = never}}, + alarmed_by = [], + last_blocked_by = none, + last_blocked_at = never, + blocked_sent = false}}, try run({?MODULE, recvloop, [Deb, switch_callback(rabbit_event:init_stats_timer( @@ -321,9 +317,14 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end end. -handle_other({conserve_resources, Conserve}, - State = #v1{throttle = Throttle}) -> - Throttle1 = Throttle#throttle{conserve_resources = Conserve}, +handle_other({conserve_resources, Source, Conserve}, + State = #v1{throttle = Throttle = + #throttle{alarmed_by = CR}}) -> + CR1 = case Conserve of + true -> lists:usort([Source | CR]); + false -> CR -- [Source] + end, + Throttle1 = Throttle#throttle{alarmed_by = CR1}, control_throttle(State#v1{throttle = Throttle1}); handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), @@ -409,30 +410,61 @@ terminate(_Explanation, State) -> {force, State}. control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> - case {CS, (Throttle#throttle.conserve_resources orelse - credit_flow:blocked())} of + IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse + credit_flow:blocked()), + case {CS, IsThrottled} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), - State#v1{connection_state = running}; + maybe_send_unblocked(State), + State#v1{connection_state = running, + throttle = Throttle#throttle{ + blocked_sent = false}}; {blocked, true} -> State#v1{throttle = update_last_blocked_by( Throttle)}; {_, _} -> State end. -maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) -> +maybe_block(State = #v1{connection_state = blocking, + throttle = Throttle}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), + Sent = maybe_send_blocked(State), State#v1{connection_state = blocked, throttle = update_last_blocked_by( - Throttle#throttle{last_blocked_at = erlang:now()})}; + Throttle#throttle{last_blocked_at = erlang:now(), + blocked_sent = Sent})}; maybe_block(State) -> State. -update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) -> - Throttle#throttle{last_blocked_by = resource}; -update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) -> - Throttle#throttle{last_blocked_by = flow}. +maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) -> + false; +maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, + connection = #connection{ + protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> + case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of + {bool, true} -> + RStr = string:join([atom_to_list(A) || A <- CR], " & "), + Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])), + ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, + Protocol), + true; + _ -> + false + end. + +maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) -> + ok; +maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol}, + sock = Sock}) -> + ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol). + +update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) -> + Throttle#throttle{last_blocked_by = flow}; +update_last_blocked_by(Throttle) -> + Throttle#throttle{last_blocked_by = resource}. %%-------------------------------------------------------------------------- %% error handling / termination @@ -810,8 +842,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, - sock = Sock, - start_heartbeat_fun = SHF}) -> + helper_sup = SupPid, + sock = Sock}) -> ServerFrameMax = server_frame_max(), if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE -> rabbit_misc:protocol_error( @@ -822,16 +854,20 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ServerFrameMax]); true -> + {ok, Collector} = + rabbit_connection_helper_sup:start_queue_collector(SupPid), Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, - ClientHeartbeat, ReceiveFun), + Heartbeater = + rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, + SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, frame_max = FrameMax}, + queue_collector = Collector, heartbeater = Heartbeater} end; @@ -840,19 +876,16 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, - ch_sup3_pid = ChSup3Pid, + helper_sup = SupPid, sock = Sock, throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), - Throttle1 = Throttle#throttle{conserve_resources = Conserve}, + Throttle1 = Throttle#throttle{alarmed_by = Conserve}, {ok, ChannelSupSupPid} = - supervisor2:start_child( - ChSup3Pid, - {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), + rabbit_connection_helper_sup:start_channel_sup_sup(SupPid), State1 = control_throttle( State#v1{connection_state = running, connection = NewConnection, @@ -925,14 +958,29 @@ auth_mechanisms_binary(Sock) -> auth_phase(Response, State = #v1{connection = Connection = #connection{protocol = Protocol, + capabilities = Capabilities, auth_mechanism = {Name, AuthMechanism}, auth_state = AuthState}, sock = Sock}) -> case AuthMechanism:handle_response(Response, AuthState) of {refused, Msg, Args} -> - rabbit_misc:protocol_error( - access_refused, "~s login refused: ~s", - [Name, io_lib:format(Msg, Args)]); + AmqpError = rabbit_misc:amqp_error( + access_refused, "~s login refused: ~s", + [Name, io_lib:format(Msg, Args)], none), + case rabbit_misc:table_lookup(Capabilities, + <<"authentication_failure_close">>) of + {bool, true} -> + SafeMsg = io_lib:format( + "Login was refused using authentication " + "mechanism ~s. For details see the broker " + "logfile.", [Name]), + AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg}, + {0, CloseMethod} = rabbit_binary_generator:map_exception( + 0, AmqpError1, Protocol), + ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); + _ -> ok + end, + rabbit_misc:protocol_error(AmqpError); {protocol_error, Msg, Args} -> rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> @@ -1050,10 +1098,7 @@ pack_for_1_0(#v1{parent = Parent, sock = Sock, recv_len = RecvLen, pending_recv = PendingRecv, - queue_collector = QueueCollector, - ch_sup3_pid = ChSup3Pid, - start_heartbeat_fun = SHF, + helper_sup = SupPid, buf = Buf, buf_len = BufLen}) -> - {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF, - Buf, BufLen}. + {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index f933e4e9..3014aeb7 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -130,6 +130,7 @@ class_module(exchange) -> rabbit_exchange_type; class_module(auth_mechanism) -> rabbit_auth_mechanism; class_module(runtime_parameter) -> rabbit_runtime_parameter; class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(queue_decorator) -> rabbit_queue_decorator; class_module(policy_validator) -> rabbit_policy_validator; class_module(ha_mode) -> rabbit_mirror_queue_mode. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 30cf9114..76421d1a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -60,6 +60,7 @@ all_tests() -> passed = test_user_management(), passed = test_runtime_parameters(), passed = test_policy_validation(), + passed = test_policy_opts_validation(), passed = test_ha_policy_validation(), passed = test_server_status(), passed = test_amqp_connection_refusal(), @@ -1083,29 +1084,57 @@ test_runtime_parameters() -> test_policy_validation() -> rabbit_runtime_parameters_test:register_policy_validator(), - SetPol = - fun (Key, Val) -> - control_action( - set_policy, - ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])]) - end, + SetPol = fun (Key, Val) -> + control_action_opts( + ["set_policy", "name", ".*", + rabbit_misc:format("{\"~s\":~p}", [Key, Val])]) + end, - ok = SetPol("testeven", []), - ok = SetPol("testeven", [1, 2]), - ok = SetPol("testeven", [1, 2, 3, 4]), - ok = SetPol("testpos", [2, 5, 5678]), + ok = SetPol("testeven", []), + ok = SetPol("testeven", [1, 2]), + ok = SetPol("testeven", [1, 2, 3, 4]), + ok = SetPol("testpos", [2, 5, 5678]), - {error_string, _} = SetPol("testpos", [-1, 0, 1]), - {error_string, _} = SetPol("testeven", [ 1, 2, 3]), + error = SetPol("testpos", [-1, 0, 1]), + error = SetPol("testeven", [ 1, 2, 3]), ok = control_action(clear_policy, ["name"]), rabbit_runtime_parameters_test:unregister_policy_validator(), passed. +test_policy_opts_validation() -> + Set = fun (Extra) -> control_action_opts( + ["set_policy", "name", ".*", "{\"ha-mode\":\"all\"}" + | Extra]) end, + OK = fun (Extra) -> ok = Set(Extra) end, + Fail = fun (Extra) -> error = Set(Extra) end, + + OK ([]), + + OK (["--priority", "0"]), + OK (["--priority", "3"]), + Fail(["--priority", "banana"]), + Fail(["--priority"]), + + OK (["--apply-to", "all"]), + OK (["--apply-to", "queues"]), + Fail(["--apply-to", "bananas"]), + Fail(["--apply-to"]), + + OK (["--priority", "3", "--apply-to", "queues"]), + Fail(["--priority", "banana", "--apply-to", "queues"]), + Fail(["--priority", "3", "--apply-to", "bananas"]), + + Fail(["--offline"]), + + ok = control_action(clear_policy, ["name"]), + passed. + test_ha_policy_validation() -> - Set = fun (JSON) -> control_action(set_policy, ["name", ".*", JSON]) end, + Set = fun (JSON) -> control_action_opts( + ["set_policy", "name", ".*", JSON]) end, OK = fun (JSON) -> ok = Set(JSON) end, - Fail = fun (JSON) -> {error_string, _} = Set(JSON) end, + Fail = fun (JSON) -> error = Set(JSON) end, OK ("{\"ha-mode\":\"all\"}"), Fail("{\"ha-mode\":\"made_up\"}"), @@ -1139,7 +1168,7 @@ test_server_status() -> rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined), + Q, true, Ch, Limiter, false, <<"ctag">>, true, none, [], undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), @@ -1611,6 +1640,18 @@ control_action(Command, Node, Args, Opts) -> Other end. +control_action_opts(Raw) -> + NodeStr = atom_to_list(node()), + case rabbit_control_main:parse_arguments(Raw, NodeStr) of + {ok, {Cmd, Opts, Args}} -> + case control_action(Cmd, node(), Args, Opts) of + ok -> ok; + _ -> error + end; + _ -> + error + end. + info_action(Command, Args, CheckVHost) -> ok = control_action(Command, []), if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]); diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 1613838c..6f95ef60 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -44,6 +44,8 @@ -rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). -rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). -rabbit_upgrade({exchange_decorators, mnesia, [policy]}). +-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). +-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). %% ------------------------------------------------------------------- @@ -70,6 +72,8 @@ -spec(no_mirror_nodes/0 :: () -> 'ok'). -spec(gm_pids/0 :: () -> 'ok'). -spec(exchange_decorators/0 :: () -> 'ok'). +-spec(policy_apply_to/0 :: () -> 'ok'). +-spec(queue_decorators/0 :: () -> 'ok'). -endif. @@ -299,6 +303,42 @@ exchange_decorators(Table) -> [name, type, durable, auto_delete, internal, arguments, scratches, policy, decorators]). +policy_apply_to() -> + transform( + rabbit_runtime_parameters, + fun ({runtime_parameters, Key = {_VHost, <<"policy">>, _Name}, Value}) -> + ApplyTo = apply_to(proplists:get_value(<<"definition">>, Value)), + {runtime_parameters, Key, [{<<"apply-to">>, ApplyTo} | Value]}; + ({runtime_parameters, Key, Value}) -> + {runtime_parameters, Key, Value} + end, + [key, value]), + rabbit_policy:invalidate(), + ok. + +apply_to(Def) -> + case [proplists:get_value(K, Def) || + K <- [<<"federation-upstream-set">>, <<"ha-mode">>]] of + [undefined, undefined] -> <<"all">>; + [_, undefined] -> <<"exchanges">>; + [undefined, _] -> <<"queues">>; + [_, _] -> <<"all">> + end. + +queue_decorators() -> + ok = queue_decorators(rabbit_queue), + ok = queue_decorators(rabbit_durable_queue). + +queue_decorators(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, Policy, GmPids}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, Policy, GmPids, []} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, policy, gm_pids, decorators]). %%-------------------------------------------------------------------- diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index c0b1f8e4..34dd3d3b 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -19,14 +19,18 @@ -include("rabbit_framing.hrl"). -export([start/5, start_link/5, start/6, start_link/6]). + +-export([system_continue/3, system_terminate/4, system_code_change/4]). + -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5, + send_command_flow/2, send_command_flow/3, flush/1]). -export([internal_send_command/4, internal_send_command/6]). %% internal --export([mainloop/1, mainloop1/1]). +-export([mainloop/2, mainloop1/2]). -record(wstate, {sock, channel, frame_max, protocol, reader, stats_timer, pending}). @@ -53,6 +57,11 @@ (rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) -> rabbit_types:ok(pid())). + +-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). +-spec(system_continue/3 :: (_,_,#wstate{}) -> any()). +-spec(system_terminate/4 :: (_,_,_,_) -> none()). + -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: @@ -70,6 +79,11 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(send_command_flow/2 :: + (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). +-spec(send_command_flow/3 :: + (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) + -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), @@ -94,12 +108,14 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - {ok, proc_lib:spawn(?MODULE, mainloop, [State])}. + Deb = sys:debug_options([]), + {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}. start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}. + Deb = sys:debug_options([]), + {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}. initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> (case ReaderWantsStats of @@ -113,32 +129,54 @@ initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> pending = []}, #wstate.stats_timer). -mainloop(State) -> +system_continue(Parent, Deb, State) -> + mainloop(Deb, State#wstate{reader = Parent}). + +system_terminate(Reason, _Parent, _Deb, _State) -> + exit(Reason). + +system_code_change(Misc, _Module, _OldVsn, _Extra) -> + {ok, Misc}. + +mainloop(Deb, State) -> try - mainloop1(State) + mainloop1(Deb, State) catch exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State, ReaderPid ! {channel_exit, Channel, Error} end, done. -mainloop1(State = #wstate{pending = []}) -> +mainloop1(Deb, State = #wstate{pending = []}) -> receive - Message -> ?MODULE:mainloop1(handle_message(Message, State)) + Message -> {Deb1, State1} = handle_message(Deb, Message, State), + ?MODULE:mainloop1(Deb1, State1) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [State]) + erlang:hibernate(?MODULE, mainloop, [Deb, State]) end; -mainloop1(State) -> +mainloop1(Deb, State) -> receive - Message -> ?MODULE:mainloop1(handle_message(Message, State)) + Message -> {Deb1, State1} = handle_message(Deb, Message, State), + ?MODULE:mainloop1(Deb1, State1) after 0 -> - ?MODULE:mainloop1(internal_flush(State)) + ?MODULE:mainloop1(Deb, internal_flush(State)) end. +handle_message(Deb, {system, From, Req}, State = #wstate{reader = Parent}) -> + sys:handle_system_msg(Req, From, Parent, ?MODULE, Deb, State); +handle_message(Deb, Message, State) -> + {Deb, handle_message(Message, State)}. + handle_message({send_command, MethodRecord}, State) -> internal_send_command_async(MethodRecord, State); handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); +handle_message({send_command_flow, MethodRecord, Sender}, State) -> + credit_flow:ack(Sender), + internal_send_command_async(MethodRecord, State); +handle_message({send_command_flow, MethodRecord, Content, Sender}, State) -> + credit_flow:ack(Sender), + internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> State1 = internal_flush( internal_send_command_async(MethodRecord, State)), @@ -186,6 +224,16 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. +send_command_flow(W, MethodRecord) -> + credit_flow:send(W), + W ! {send_command_flow, MethodRecord, self()}, + ok. + +send_command_flow(W, MethodRecord, Content) -> + credit_flow:send(W), + W ! {send_command_flow, MethodRecord, Content, self()}, + ok. + send_command_sync(W, MethodRecord) -> call(W, {send_command_sync, MethodRecord}). diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 23bfe7f1..db4c388a 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -1,14 +1,18 @@ -%% This file is a copy of supervisor.erl from the R13B-3 Erlang/OTP +%% This file is a copy of supervisor.erl from the R16B Erlang/OTP %% distribution, with the following modifications: %% %% 1) the module name is supervisor2 %% -%% 2) there is a new strategy called -%% simple_one_for_one_terminate. This is exactly the same as for -%% simple_one_for_one, except that children *are* explicitly -%% terminated as per the shutdown component of the child_spec. +%% 2) a find_child/2 utility function has been added %% -%% 3) child specifications can contain, as the restart type, a tuple +%% 3) Added an 'intrinsic' restart type. Like the transient type, this +%% type means the child should only be restarted if the child exits +%% abnormally. Unlike the transient type, if the child exits +%% normally, the supervisor itself also exits normally. If the +%% child is a supervisor and it exits normally (i.e. with reason of +%% 'shutdown') then the child's parent also exits normally. +%% +%% 4) child specifications can contain, as the restart type, a tuple %% {permanent, Delay} | {transient, Delay} | {intrinsic, Delay} %% where Delay >= 0 (see point (4) below for intrinsic). The delay, %% in seconds, indicates what should happen if a child, upon being @@ -41,13 +45,6 @@ %% perspective it's a normal exit, whilst from supervisor's %% perspective, it's an abnormal exit. %% -%% 4) Added an 'intrinsic' restart type. Like the transient type, this -%% type means the child should only be restarted if the child exits -%% abnormally. Unlike the transient type, if the child exits -%% normally, the supervisor itself also exits normally. If the -%% child is a supervisor and it exits normally (i.e. with reason of -%% 'shutdown') then the child's parent also exits normally. -%% %% 5) normal, and {shutdown, _} exit reasons are all treated the same %% (i.e. are regarded as normal exits) %% @@ -55,7 +52,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% Copyright Ericsson AB 1996-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -75,61 +72,34 @@ -behaviour(gen_server). %% External exports --export([start_link/2,start_link/3, +-export([start_link/2, start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, - which_children/1, find_child/2, - check_childspecs/1]). + which_children/1, count_children/1, + find_child/2, check_childspecs/1]). %% Internal exports --export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]). --export([handle_cast/2]). - --define(DICT, dict). - --record(state, {name, - strategy, - children = [], - dynamics = ?DICT:new(), - intensity, - period, - restarts = [], - module, - args}). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). +-export([try_again_restart/3]). --record(child, {pid = undefined, % pid is undefined when child is not running - name, - mfa, - restart_type, - shutdown, - child_type, - modules = []}). - --define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse - State#state.strategy =:= simple_one_for_one_terminate). --define(is_terminate_simple(State), - State#state.strategy =:= simple_one_for_one_terminate). - --ifdef(use_specs). - -%%-------------------------------------------------------------------------- -%% Types %%-------------------------------------------------------------------------- - +-ifdef(use_specs). -export_type([child_spec/0, startchild_ret/0, strategy/0, sup_name/0]). +-endif. +%%-------------------------------------------------------------------------- --type child() :: 'undefined' | pid(). +-ifdef(use_specs). +-type child() :: 'undefined' | pid(). -type child_id() :: term(). --type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}. --type modules() :: [module()] | 'dynamic'. --type delay() :: non_neg_integer(). --type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic' - | {'permanent', delay()} | {'transient', delay()} - | {'intrinsic', delay()}. +-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}. +-type modules() :: [module()] | 'dynamic'. +-type delay() :: non_neg_integer(). +-type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic' | {'permanent', delay()} | {'transient', delay()} | {'intrinsic', delay()}. -type shutdown() :: 'brutal_kill' | timeout(). --type worker() :: 'worker' | 'supervisor'. +-type worker() :: 'worker' | 'supervisor'. -type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. --type sup_ref() :: (Name :: atom()) +-type sup_ref() :: (Name :: atom()) | {Name :: atom(), Node :: node()} | {'global', Name :: atom()} | pid(). @@ -140,40 +110,110 @@ Type :: worker(), Modules :: modules()}. - -type strategy() :: 'one_for_all' | 'one_for_one' - | 'rest_for_one' | 'simple_one_for_one' - | 'simple_one_for_one_terminate'. - --type child_rec() :: #child{pid :: child() | {restarting,pid()} | [pid()], - name :: child_id(), - mfa :: mfargs(), - restart_type :: restart(), - shutdown :: shutdown(), - child_type :: worker(), - modules :: modules()}. - --type state() :: #state{strategy :: strategy(), - children :: [child_rec()], - dynamics :: ?DICT(), - intensity :: non_neg_integer(), - period :: pos_integer()}. + | 'rest_for_one' | 'simple_one_for_one'. +-endif. %%-------------------------------------------------------------------------- -%% Callback behaviour -%%-------------------------------------------------------------------------- +-ifdef(use_specs). +-record(child, {% pid is undefined when child is not running + pid = undefined :: child() | {restarting,pid()} | [pid()], + name :: child_id(), + mfargs :: mfargs(), + restart_type :: restart(), + shutdown :: shutdown(), + child_type :: worker(), + modules = [] :: modules()}). +-type child_rec() :: #child{}. +-else. +-record(child, { + pid = undefined, + name, + mfargs, + restart_type, + shutdown, + child_type, + modules = []}). +-endif. + +-define(DICT, dict). +-define(SETS, sets). +-define(SET, set). + +-ifdef(use_specs). +-record(state, {name, + strategy :: strategy(), + children = [] :: [child_rec()], + dynamics :: ?DICT() | ?SET(), + intensity :: non_neg_integer(), + period :: pos_integer(), + restarts = [], + module, + args}). +-type state() :: #state{}. +-else. +-record(state, {name, + strategy, + children = [], + dynamics, + intensity, + period, + restarts = [], + module, + args}). +-endif. + +-define(is_simple(State), State#state.strategy =:= simple_one_for_one). +-define(is_permanent(R), ((R =:= permanent) orelse + (is_tuple(R) andalso + tuple_size(R) == 2 andalso + element(1, R) =:= permanent))). +-define(is_explicit_restart(R), + R == {shutdown, restart}). + +-ifdef(use_specs). -callback init(Args :: term()) -> {ok, {{RestartStrategy :: strategy(), - MaxR :: non_neg_integer(), - MaxT :: non_neg_integer()}, + MaxR :: non_neg_integer(), + MaxT :: non_neg_integer()}, [ChildSpec :: child_spec()]}} | ignore. +-endif. +-define(restarting(_Pid_), {restarting,_Pid_}). -%%-------------------------------------------------------------------------- -%% Specs -%%-------------------------------------------------------------------------- +%%% --------------------------------------------------- +%%% This is a general process supervisor built upon gen_server.erl. +%%% Servers/processes should/could also be built using gen_server.erl. +%%% SupName = {local, atom()} | {global, atom()}. +%%% --------------------------------------------------- +-ifdef(use_specs). +-type startlink_err() :: {'already_started', pid()} + | {'shutdown', term()} + | term(). +-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. + +-spec start_link(Module, Args) -> startlink_ret() when + Module :: module(), + Args :: term(). +-endif. +start_link(Mod, Args) -> + gen_server:start_link(?MODULE, {self, Mod, Args}, []). + +-ifdef(use_specs). +-spec start_link(SupName, Module, Args) -> startlink_ret() when + SupName :: sup_name(), + Module :: module(), + Args :: term(). +-endif. +start_link(SupName, Mod, Args) -> + gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []). + +%%% --------------------------------------------------- +%%% Interface functions. +%%% --------------------------------------------------- +-ifdef(use_specs). -type startchild_err() :: 'already_present' | {'already_started', Child :: child()} | term(). -type startchild_ret() :: {'ok', Child :: child()} @@ -183,91 +223,30 @@ -spec start_child(SupRef, ChildSpec) -> startchild_ret() when SupRef :: sup_ref(), ChildSpec :: child_spec() | (List :: [term()]). +-endif. +start_child(Supervisor, ChildSpec) -> + call(Supervisor, {start_child, ChildSpec}). +-ifdef(use_specs). -spec restart_child(SupRef, Id) -> Result when SupRef :: sup_ref(), Id :: child_id(), Result :: {'ok', Child :: child()} | {'ok', Child :: child(), Info :: term()} | {'error', Error}, - Error :: 'running' | 'not_found' | 'simple_one_for_one' | term(). + Error :: 'running' | 'restarting' | 'not_found' | 'simple_one_for_one' | + term(). +-endif. +restart_child(Supervisor, Name) -> + call(Supervisor, {restart_child, Name}). +-ifdef(use_specs). -spec delete_child(SupRef, Id) -> Result when SupRef :: sup_ref(), Id :: child_id(), Result :: 'ok' | {'error', Error}, - Error :: 'running' | 'not_found' | 'simple_one_for_one'. - --spec terminate_child(SupRef, Id) -> Result when - SupRef :: sup_ref(), - Id :: pid() | child_id(), - Result :: 'ok' | {'error', Error}, - Error :: 'not_found' | 'simple_one_for_one'. - --spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when - SupRef :: sup_ref(), - Id :: child_id() | 'undefined', - Child :: child(), - Type :: worker(), - Modules :: modules(). - --spec check_childspecs(ChildSpecs) -> Result when - ChildSpecs :: [child_spec()], - Result :: 'ok' | {'error', Error :: term()}. - --type init_sup_name() :: sup_name() | 'self'. - --type stop_rsn() :: 'shutdown' | {'bad_return', {module(),'init', term()}} - | {'bad_start_spec', term()} | {'start_spec', term()} - | {'supervisor_data', term()}. - --spec init({init_sup_name(), module(), [term()]}) -> - {'ok', state()} | 'ignore' | {'stop', stop_rsn()}. - --type call() :: 'which_children'. --spec handle_call(call(), term(), state()) -> {'reply', term(), state()}. - --spec handle_cast('null', state()) -> {'noreply', state()}. - --spec handle_info(term(), state()) -> - {'noreply', state()} | {'stop', 'shutdown', state()}. - --spec terminate(term(), state()) -> 'ok'. - --spec code_change(term(), state(), term()) -> - {'ok', state()} | {'error', term()}. - --else. - --export([behaviour_info/1]). - -behaviour_info(callbacks) -> - [{init,1}]; -behaviour_info(_Other) -> - undefined. - + Error :: 'running' | 'restarting' | 'not_found' | 'simple_one_for_one'. -endif. - -%%% --------------------------------------------------- -%%% This is a general process supervisor built upon gen_server.erl. -%%% Servers/processes should/could also be built using gen_server.erl. -%%% SupName = {local, atom()} | {global, atom()}. -%%% --------------------------------------------------- -start_link(Mod, Args) -> - gen_server:start_link(?MODULE, {self, Mod, Args}, []). - -start_link(SupName, Mod, Args) -> - gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []). - -%%% --------------------------------------------------- -%%% Interface functions. -%%% --------------------------------------------------- -start_child(Supervisor, ChildSpec) -> - call(Supervisor, {start_child, ChildSpec}). - -restart_child(Supervisor, Name) -> - call(Supervisor, {restart_child, Name}). - delete_child(Supervisor, Name) -> call(Supervisor, {delete_child, Name}). @@ -277,12 +256,44 @@ delete_child(Supervisor, Name) -> %% Note that the child is *always* terminated in some %% way (maybe killed). %%----------------------------------------------------------------- +-ifdef(use_specs). +-spec terminate_child(SupRef, Id) -> Result when + SupRef :: sup_ref(), + Id :: pid() | child_id(), + Result :: 'ok' | {'error', Error}, + Error :: 'not_found' | 'simple_one_for_one'. +-endif. terminate_child(Supervisor, Name) -> call(Supervisor, {terminate_child, Name}). +-ifdef(use_specs). +-spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when + SupRef :: sup_ref(), + Id :: child_id() | undefined, + Child :: child() | 'restarting', + Type :: worker(), + Modules :: modules(). +-endif. which_children(Supervisor) -> call(Supervisor, which_children). +-ifdef(use_specs). +-spec count_children(SupRef) -> PropListOfCounts when + SupRef :: sup_ref(), + PropListOfCounts :: [Count], + Count :: {specs, ChildSpecCount :: non_neg_integer()} + | {active, ActiveProcessCount :: non_neg_integer()} + | {supervisors, ChildSupervisorCount :: non_neg_integer()} + |{workers, ChildWorkerCount :: non_neg_integer()}. +-endif. +count_children(Supervisor) -> + call(Supervisor, count_children). + +-ifdef(use_specs). +-spec find_child(Supervisor, Name) -> [pid()] when + Supervisor :: sup_ref(), + Name :: child_id(). +-endif. find_child(Supervisor, Name) -> [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor), Name1 =:= Name]. @@ -290,6 +301,11 @@ find_child(Supervisor, Name) -> call(Supervisor, Req) -> gen_server:call(Supervisor, Req, infinity). +-ifdef(use_specs). +-spec check_childspecs(ChildSpecs) -> Result when + ChildSpecs :: [child_spec()], + Result :: 'ok' | {'error', Error :: term()}. +-endif. check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> case check_startspec(ChildSpecs) of {ok, _} -> ok; @@ -297,11 +313,37 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> end; check_childspecs(X) -> {error, {badarg, X}}. +%%%----------------------------------------------------------------- +%%% Called by timer:apply_after from restart/2 +-ifdef(use_specs). +-spec try_again_restart(SupRef, Child, Reason) -> ok when + SupRef :: sup_ref(), + Child :: child_id() | pid(), + Reason :: term(). +-endif. +try_again_restart(Supervisor, Child, Reason) -> + cast(Supervisor, {try_again_restart, Child, Reason}). + +cast(Supervisor, Req) -> + gen_server:cast(Supervisor, Req). + %%% --------------------------------------------------- -%%% +%%% %%% Initialize the supervisor. -%%% +%%% %%% --------------------------------------------------- +-ifdef(use_specs). +-type init_sup_name() :: sup_name() | 'self'. + +-type stop_rsn() :: {'shutdown', term()} + | {'bad_return', {module(),'init', term()}} + | {'bad_start_spec', term()} + | {'start_spec', term()} + | {'supervisor_data', term()}. + +-spec init({init_sup_name(), module(), [term()]}) -> + {'ok', state()} | 'ignore' | {'stop', stop_rsn()}. +-endif. init({SupName, Mod, Args}) -> process_flag(trap_exit, true), case Mod:init(Args) of @@ -327,9 +369,9 @@ init_children(State, StartSpec) -> case start_children(Children, SupName) of {ok, NChildren} -> {ok, State#state{children = NChildren}}; - {error, NChildren} -> + {error, NChildren, Reason} -> terminate_children(NChildren, SupName), - {stop, shutdown} + {stop, {shutdown, Reason}} end; Error -> {stop, {start_spec, Error}} @@ -347,32 +389,35 @@ init_dynamic(_State, StartSpec) -> %%----------------------------------------------------------------- %% Func: start_children/2 -%% Args: Children = [#child] in start order -%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} +%% Args: Children = [child_rec()] in start order +%% SupName = {local, atom()} | {global, atom()} | {pid(), Mod} %% Purpose: Start all children. The new list contains #child's %% with pids. -%% Returns: {ok, NChildren} | {error, NChildren} -%% NChildren = [#child] in termination order (reversed +%% Returns: {ok, NChildren} | {error, NChildren, Reason} +%% NChildren = [child_rec()] in termination order (reversed %% start order) %%----------------------------------------------------------------- start_children(Children, SupName) -> start_children(Children, [], SupName). start_children([Child|Chs], NChildren, SupName) -> case do_start_child(SupName, Child) of + {ok, undefined} when Child#child.restart_type =:= temporary -> + start_children(Chs, NChildren, SupName); {ok, Pid} -> start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName); {ok, Pid, _Extra} -> start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName); {error, Reason} -> report_error(start_error, Reason, Child, SupName), - {error, lists:reverse(Chs) ++ [Child | NChildren]} + {error, lists:reverse(Chs) ++ [Child | NChildren], + {failed_to_start_child,Child#child.name,Reason}} end; start_children([], NChildren, _SupName) -> {ok, NChildren}. do_start_child(SupName, Child) -> - #child{mfa = {M, F, A}} = Child, - case catch apply(M, F, A) of + #child{mfargs = {M, F, Args}} = Child, + case catch apply(M, F, Args) of {ok, Pid} when is_pid(Pid) -> NChild = Child#child{pid = Pid}, report_progress(NChild, SupName), @@ -401,35 +446,54 @@ do_start_child_i(M, F, A) -> {error, What} end. - %%% --------------------------------------------------- -%%% +%%% %%% Callback functions. -%%% +%%% %%% --------------------------------------------------- +-ifdef(use_specs). +-type call() :: 'which_children' | 'count_children' | {_, _}. % XXX: refine +-spec handle_call(call(), term(), state()) -> {'reply', term(), state()}. +-endif. handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) -> - #child{mfa = {M, F, A}} = hd(State#state.children), + Child = hd(State#state.children), + #child{mfargs = {M, F, A}} = Child, Args = A ++ EArgs, case do_start_child_i(M, F, Args) of - {ok, undefined} -> - {reply, {ok, undefined}, State}; + {ok, undefined} when Child#child.restart_type =:= temporary -> + {reply, {ok, undefined}, State}; {ok, Pid} -> - NState = State#state{dynamics = - ?DICT:store(Pid, Args, State#state.dynamics)}, + NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State), {reply, {ok, Pid}, NState}; {ok, Pid, Extra} -> - NState = State#state{dynamics = - ?DICT:store(Pid, Args, State#state.dynamics)}, + NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State), {reply, {ok, Pid, Extra}, NState}; What -> {reply, What, State} end; -%%% The requests terminate_child, delete_child and restart_child are -%%% invalid for simple_one_for_one and simple_one_for_one_terminate -%%% supervisors. +%% terminate_child for simple_one_for_one can only be done with pid +handle_call({terminate_child, Name}, _From, State) when not is_pid(Name), + ?is_simple(State) -> + {reply, {error, simple_one_for_one}, State}; + +handle_call({terminate_child, Name}, _From, State) -> + case get_child(Name, State, ?is_simple(State)) of + {value, Child} -> + case do_terminate(Child, State#state.name) of + #child{restart_type=RT} when RT=:=temporary; ?is_simple(State) -> + {reply, ok, state_del_child(Child, State)}; + NChild -> + {reply, ok, replace_child(NChild, State)} + end; + false -> + {reply, {error, not_found}, State} + end; + +%%% The requests delete_child and restart_child are invalid for +%%% simple_one_for_one supervisors. handle_call({_Req, _Data}, _From, State) when ?is_simple(State) -> - {reply, {error, State#state.strategy}, State}; + {reply, {error, simple_one_for_one}, State}; handle_call({start_child, ChildSpec}, _From, State) -> case check_childspec(ChildSpec) of @@ -453,6 +517,8 @@ handle_call({restart_child, Name}, _From, State) -> Error -> {reply, Error, State} end; + {value, #child{pid=?restarting(_)}} -> + {reply, {error, restarting}, State}; {value, _} -> {reply, {error, running}, State}; _ -> @@ -464,60 +530,146 @@ handle_call({delete_child, Name}, _From, State) -> {value, Child} when Child#child.pid =:= undefined -> NState = remove_child(Child, State), {reply, ok, NState}; + {value, #child{pid=?restarting(_)}} -> + {reply, {error, restarting}, State}; {value, _} -> {reply, {error, running}, State}; _ -> {reply, {error, not_found}, State} end; -handle_call({terminate_child, Name}, _From, State) -> - case get_child(Name, State) of - {value, Child} -> - NChild = do_terminate(Child, State#state.name), - {reply, ok, replace_child(NChild, State)}; - _ -> - {reply, {error, not_found}, State} - end; +handle_call(which_children, _From, #state{children = [#child{restart_type = temporary, + child_type = CT, + modules = Mods}]} = + State) when ?is_simple(State) -> + Reply = lists:map(fun(Pid) -> {undefined, Pid, CT, Mods} end, + ?SETS:to_list(dynamics_db(temporary, State#state.dynamics))), + {reply, Reply, State}; -handle_call(which_children, _From, State) when ?is_simple(State) -> - [#child{child_type = CT, modules = Mods}] = State#state.children, - Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end, - ?DICT:to_list(State#state.dynamics)), +handle_call(which_children, _From, #state{children = [#child{restart_type = RType, + child_type = CT, + modules = Mods}]} = + State) when ?is_simple(State) -> + Reply = lists:map(fun({?restarting(_),_}) -> {undefined,restarting,CT,Mods}; + ({Pid, _}) -> {undefined, Pid, CT, Mods} end, + ?DICT:to_list(dynamics_db(RType, State#state.dynamics))), {reply, Reply, State}; handle_call(which_children, _From, State) -> Resp = - lists:map(fun (#child{pid = Pid, name = Name, + lists:map(fun(#child{pid = ?restarting(_), name = Name, + child_type = ChildType, modules = Mods}) -> + {Name, restarting, ChildType, Mods}; + (#child{pid = Pid, name = Name, child_type = ChildType, modules = Mods}) -> - {Name, Pid, ChildType, Mods} + {Name, Pid, ChildType, Mods} end, State#state.children), - {reply, Resp, State}. + {reply, Resp, State}; -%%% Hopefully cause a function-clause as there is no API function -%%% that utilizes cast. -handle_cast(null, State) -> - error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", - []), - {noreply, State}. +handle_call(count_children, _From, #state{children = [#child{restart_type = temporary, + child_type = CT}]} = State) + when ?is_simple(State) -> + {Active, Count} = + ?SETS:fold(fun(Pid, {Alive, Tot}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true ->{Alive+1, Tot +1}; + false -> + {Alive, Tot + 1} + end + end, {0, 0}, dynamics_db(temporary, State#state.dynamics)), + Reply = case CT of + supervisor -> [{specs, 1}, {active, Active}, + {supervisors, Count}, {workers, 0}]; + worker -> [{specs, 1}, {active, Active}, + {supervisors, 0}, {workers, Count}] + end, + {reply, Reply, State}; -handle_info({delayed_restart, {RestartType, Reason, Child}}, State) +handle_call(count_children, _From, #state{children = [#child{restart_type = RType, + child_type = CT}]} = State) when ?is_simple(State) -> - {ok, NState} = do_restart(RestartType, Reason, Child, State), - {noreply, NState}; -handle_info({delayed_restart, {RestartType, Reason, Child}}, State) -> - case get_child(Child#child.name, State) of - {value, Child1} -> - {ok, NState} = do_restart(RestartType, Reason, Child1, State), - {noreply, NState}; - _ -> + {Active, Count} = + ?DICT:fold(fun(Pid, _Val, {Alive, Tot}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true -> + {Alive+1, Tot +1}; + false -> + {Alive, Tot + 1} + end + end, {0, 0}, dynamics_db(RType, State#state.dynamics)), + Reply = case CT of + supervisor -> [{specs, 1}, {active, Active}, + {supervisors, Count}, {workers, 0}]; + worker -> [{specs, 1}, {active, Active}, + {supervisors, 0}, {workers, Count}] + end, + {reply, Reply, State}; + +handle_call(count_children, _From, State) -> + %% Specs and children are together on the children list... + {Specs, Active, Supers, Workers} = + lists:foldl(fun(Child, Counts) -> + count_child(Child, Counts) + end, {0,0,0,0}, State#state.children), + + %% Reformat counts to a property list. + Reply = [{specs, Specs}, {active, Active}, + {supervisors, Supers}, {workers, Workers}], + {reply, Reply, State}. + + +count_child(#child{pid = Pid, child_type = worker}, + {Specs, Active, Supers, Workers}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true -> {Specs+1, Active+1, Supers, Workers+1}; + false -> {Specs+1, Active, Supers, Workers+1} + end; +count_child(#child{pid = Pid, child_type = supervisor}, + {Specs, Active, Supers, Workers}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true -> {Specs+1, Active+1, Supers+1, Workers}; + false -> {Specs+1, Active, Supers+1, Workers} + end. + + +%%% If a restart attempt failed, this message is sent via +%%% timer:apply_after(0,...) in order to give gen_server the chance to +%%% check it's inbox before trying again. +-ifdef(use_specs). +-spec handle_cast({try_again_restart, child_id() | pid(), term()}, state()) -> + {'noreply', state()} | {stop, shutdown, state()}. +-endif. +handle_cast({try_again_restart,Pid,Reason}, #state{children=[Child]}=State) + when ?is_simple(State) -> + RT = Child#child.restart_type, + RPid = restarting(Pid), + case dynamic_child_args(RPid, dynamics_db(RT, State#state.dynamics)) of + {ok, Args} -> + {M, F, _} = Child#child.mfargs, + NChild = Child#child{pid = RPid, mfargs = {M, F, Args}}, + try_restart(Child#child.restart_type, Reason, NChild, State); + error -> {noreply, State} end; +handle_cast({try_again_restart,Name,Reason}, State) -> + %% we still support >= R12-B3 in which lists:keyfind/3 doesn't exist + case lists:keysearch(Name,#child.name,State#state.children) of + {value, Child = #child{pid=?restarting(_), restart_type=RestartType}} -> + try_restart(RestartType, Reason, Child, State); + _ -> + {noreply,State} + end. + %% %% Take care of terminated children. %% +-ifdef(use_specs). +-spec handle_info(term(), state()) -> + {'noreply', state()} | {'stop', 'shutdown', state()}. +-endif. handle_info({'EXIT', Pid, Reason}, State) -> case restart_child(Pid, Reason, State) of {ok, State1} -> @@ -526,20 +678,34 @@ handle_info({'EXIT', Pid, Reason}, State) -> {stop, shutdown, State1} end; +handle_info({delayed_restart, {RestartType, Reason, Child}}, State) + when ?is_simple(State) -> + try_restart(RestartType, Reason, Child, State); +handle_info({delayed_restart, {RestartType, Reason, Child}}, State) -> + case get_child(Child#child.name, State) of + {value, Child1} -> + try_restart(RestartType, Reason, Child1, State); + _What -> + {noreply, State} + end; + handle_info(Msg, State) -> - error_logger:error_msg("Supervisor received unexpected message: ~p~n", + error_logger:error_msg("Supervisor received unexpected message: ~p~n", [Msg]), {noreply, State}. + %% %% Terminate this server. %% -terminate(_Reason, State) when ?is_terminate_simple(State) -> - terminate_simple_children( - hd(State#state.children), State#state.dynamics, State#state.name), - ok; +-ifdef(use_specs). +-spec terminate(term(), state()) -> 'ok'. +-endif. +terminate(_Reason, #state{children=[Child]} = State) when ?is_simple(State) -> + terminate_dynamic_children(Child, dynamics_db(Child#child.restart_type, + State#state.dynamics), + State#state.name); terminate(_Reason, State) -> - terminate_children(State#state.children, State#state.name), - ok. + terminate_children(State#state.children, State#state.name). %% %% Change code for the supervisor. @@ -550,6 +716,10 @@ terminate(_Reason, State) -> %% NOTE: This requires that the init function of the call-back module %% does not have any side effects. %% +-ifdef(use_specs). +-spec code_change(term(), state(), term()) -> + {'ok', state()} | {'error', term()}. +-endif. code_change(_, State, _) -> case (State#state.module):init(State#state.args) of {ok, {SupFlags, StartSpec}} -> @@ -577,14 +747,13 @@ check_flags({Strategy, MaxIntensity, Period}) -> check_flags(What) -> {bad_flags, What}. -update_childspec(State, StartSpec) when ?is_simple(State) -> +update_childspec(State, StartSpec) when ?is_simple(State) -> case check_startspec(StartSpec) of {ok, [Child]} -> {ok, State#state{children = [Child]}}; Error -> {error, Error} end; - update_childspec(State, StartSpec) -> case check_startspec(StartSpec) of {ok, Children} -> @@ -603,11 +772,11 @@ update_childspec1([Child|OldC], Children, KeepOld) -> update_childspec1(OldC, Children, [Child|KeepOld]) end; update_childspec1([], Children, KeepOld) -> - % Return them in (keeped) reverse start order. + %% Return them in (kept) reverse start order. lists:reverse(Children ++ KeepOld). update_chsp(OldCh, Children) -> - case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name -> + case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name -> Ch#child{pid = OldCh#child.pid}; (Ch) -> Ch @@ -618,7 +787,7 @@ update_chsp(OldCh, Children) -> NewC -> {ok, NewC} end. - + %%% --------------------------------------------------- %%% Start a new child. %%% --------------------------------------------------- @@ -627,20 +796,16 @@ handle_start_child(Child, State) -> case get_child(Child#child.name, State) of false -> case do_start_child(State#state.name, Child) of + {ok, undefined} when Child#child.restart_type =:= temporary -> + {{ok, undefined}, State}; {ok, Pid} -> - Children = State#state.children, - {{ok, Pid}, - State#state{children = - [Child#child{pid = Pid}|Children]}}; + {{ok, Pid}, save_child(Child#child{pid = Pid}, State)}; {ok, Pid, Extra} -> - Children = State#state.children, - {{ok, Pid, Extra}, - State#state{children = - [Child#child{pid = Pid}|Children]}}; + {{ok, Pid, Extra}, save_child(Child#child{pid = Pid}, State)}; {error, What} -> {{error, {What, Child}}, State} end; - {value, OldChild} when OldChild#child.pid =/= undefined -> + {value, OldChild} when is_pid(OldChild#child.pid) -> {{error, {already_started, OldChild#child.pid}}, State}; {value, _OldChild} -> {{error, already_present}, State} @@ -648,105 +813,145 @@ handle_start_child(Child, State) -> %%% --------------------------------------------------- %%% Restart. A process has terminated. -%%% Returns: {ok, #state} | {shutdown, #state} +%%% Returns: {ok, state()} | {shutdown, state()} %%% --------------------------------------------------- -restart_child(Pid, Reason, State) when ?is_simple(State) -> - case ?DICT:find(Pid, State#state.dynamics) of +restart_child(Pid, Reason, #state{children = [Child]} = State) when ?is_simple(State) -> + RestartType = Child#child.restart_type, + case dynamic_child_args(Pid, dynamics_db(RestartType, State#state.dynamics)) of {ok, Args} -> - [Child] = State#state.children, - RestartType = Child#child.restart_type, - {M, F, _} = Child#child.mfa, - NChild = Child#child{pid = Pid, mfa = {M, F, Args}}, + {M, F, _} = Child#child.mfargs, + NChild = Child#child{pid = Pid, mfargs = {M, F, Args}}, do_restart(RestartType, Reason, NChild, State); error -> - {ok, State} + {ok, State} end; + restart_child(Pid, Reason, State) -> Children = State#state.children, + %% we still support >= R12-B3 in which lists:keyfind/3 doesn't exist case lists:keysearch(Pid, #child.pid, Children) of - {value, Child} -> - RestartType = Child#child.restart_type, + {value, #child{restart_type = RestartType} = Child} -> do_restart(RestartType, Reason, Child, State); - _ -> + false -> {ok, State} end. -do_restart({permanent = RestartType, Delay}, Reason, Child, State) -> - do_restart_delay({RestartType, Delay}, Reason, Child, State); -do_restart(permanent, Reason, Child, State) -> - report_error(child_terminated, Reason, Child, State#state.name), - restart(Child, State); -do_restart(Type, normal, Child, State) -> - del_child_and_maybe_shutdown(Type, Child, State); -do_restart({RestartType, Delay}, {shutdown, restart} = Reason, Child, State) - when RestartType =:= transient orelse RestartType =:= intrinsic -> - do_restart_delay({RestartType, Delay}, Reason, Child, State); -do_restart(Type, {shutdown, _}, Child, State) -> - del_child_and_maybe_shutdown(Type, Child, State); -do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) -> - del_child_and_maybe_shutdown(Type, Child, State); -do_restart({RestartType, Delay}, Reason, Child, State) - when RestartType =:= transient orelse RestartType =:= intrinsic -> - do_restart_delay({RestartType, Delay}, Reason, Child, State); -do_restart(Type, Reason, Child, State) when Type =:= transient orelse - Type =:= intrinsic -> - report_error(child_terminated, Reason, Child, State#state.name), +try_restart(RestartType, Reason, Child, State) -> + case handle_restart(RestartType, Reason, Child, State) of + {ok, NState} -> {noreply, NState}; + {shutdown, State2} -> {stop, shutdown, State2} + end. + +do_restart(RestartType, Reason, Child, State) -> + maybe_report_error(RestartType, Reason, Child, State), + handle_restart(RestartType, Reason, Child, State). + +maybe_report_error(permanent, Reason, Child, State) -> + report_child_termination(Reason, Child, State); +maybe_report_error({permanent, _}, Reason, Child, State) -> + report_child_termination(Reason, Child, State); +maybe_report_error(_Type, Reason, Child, State) -> + case is_abnormal_termination(Reason) of + true -> report_child_termination(Reason, Child, State); + false -> ok + end. + +report_child_termination(Reason, Child, State) -> + report_error(child_terminated, Reason, Child, State#state.name). + +handle_restart(permanent, _Reason, Child, State) -> restart(Child, State); -do_restart(temporary, Reason, Child, State) -> - report_error(child_terminated, Reason, Child, State#state.name), - NState = state_del_child(Child, State), - {ok, NState}. +handle_restart(transient, Reason, Child, State) -> + restart_if_explicit_or_abnormal(fun restart/2, + fun delete_child_and_continue/2, + Reason, Child, State); +handle_restart(intrinsic, Reason, Child, State) -> + restart_if_explicit_or_abnormal(fun restart/2, + fun delete_child_and_stop/2, + Reason, Child, State); +handle_restart(temporary, _Reason, Child, State) -> + delete_child_and_continue(Child, State); +handle_restart({permanent, _Delay}=Restart, Reason, Child, State) -> + do_restart_delay(Restart, Reason, Child, State); +handle_restart({transient, _Delay}=Restart, Reason, Child, State) -> + restart_if_explicit_or_abnormal(defer_to_restart_delay(Restart, Reason), + fun delete_child_and_continue/2, + Reason, Child, State); +handle_restart({intrinsic, _Delay}=Restart, Reason, Child, State) -> + restart_if_explicit_or_abnormal(defer_to_restart_delay(Restart, Reason), + fun delete_child_and_stop/2, + Reason, Child, State). + +restart_if_explicit_or_abnormal(RestartHow, Otherwise, Reason, Child, State) -> + case ?is_explicit_restart(Reason) orelse is_abnormal_termination(Reason) of + true -> RestartHow(Child, State); + false -> Otherwise(Child, State) + end. + +defer_to_restart_delay(Restart, Reason) -> + fun(Child, State) -> do_restart_delay(Restart, Reason, Child, State) end. + +delete_child_and_continue(Child, State) -> + {ok, state_del_child(Child, State)}. + +delete_child_and_stop(Child, State) -> + {shutdown, state_del_child(Child, State)}. + +is_abnormal_termination(normal) -> false; +is_abnormal_termination(shutdown) -> false; +is_abnormal_termination({shutdown, _}) -> false; +is_abnormal_termination(_Other) -> true. do_restart_delay({RestartType, Delay}, Reason, Child, State) -> - case restart1(Child, State) of + case add_restart(State) of {ok, NState} -> - {ok, NState}; - {terminate, NState} -> + maybe_restart(NState#state.strategy, Child, NState); + {terminate, _NState} -> + %% we've reached the max restart intensity, but the + %% add_restart will have added to the restarts + %% field. Given we don't want to die here, we need to go + %% back to the old restarts field otherwise we'll never + %% attempt to restart later, which is why we ignore + %% NState for this clause. _TRef = erlang:send_after(trunc(Delay*1000), self(), {delayed_restart, {{RestartType, Delay}, Reason, Child}}), - {ok, state_del_child(Child, NState)} + {ok, state_del_child(Child, State)} end. -del_child_and_maybe_shutdown(intrinsic, Child, State) -> - {shutdown, state_del_child(Child, State)}; -del_child_and_maybe_shutdown({intrinsic, _Delay}, Child, State) -> - {shutdown, state_del_child(Child, State)}; -del_child_and_maybe_shutdown(_, Child, State) -> - {ok, state_del_child(Child, State)}. - restart(Child, State) -> case add_restart(State) of {ok, NState} -> - restart(NState#state.strategy, Child, NState, fun restart/2); + maybe_restart(NState#state.strategy, Child, NState); {terminate, NState} -> report_error(shutdown, reached_max_restart_intensity, Child, State#state.name), - {shutdown, state_del_child(Child, NState)} + {shutdown, remove_child(Child, NState)} end. -restart1(Child, State) -> - case add_restart(State) of - {ok, NState} -> - restart(NState#state.strategy, Child, NState, fun restart1/2); - {terminate, _NState} -> - %% we've reached the max restart intensity, but the - %% add_restart will have added to the restarts - %% field. Given we don't want to die here, we need to go - %% back to the old restarts field otherwise we'll never - %% attempt to restart later. - {terminate, State} +maybe_restart(Strategy, Child, State) -> + case restart(Strategy, Child, State) of + {try_again, Reason, NState2} -> + %% Leaving control back to gen_server before + %% trying again. This way other incoming requsts + %% for the supervisor can be handled - e.g. a + %% shutdown request for the supervisor or the + %% child. + Id = if ?is_simple(State) -> Child#child.pid; + true -> Child#child.name + end, + timer:apply_after(0,?MODULE,try_again_restart,[self(),Id,Reason]), + {ok,NState2}; + Other -> + Other end. -restart(Strategy, Child, State, Restart) - when Strategy =:= simple_one_for_one orelse - Strategy =:= simple_one_for_one_terminate -> - #child{mfa = {M, F, A}} = Child, - Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics), +restart(simple_one_for_one, Child, State) -> + #child{pid = OldPid, mfargs = {M, F, A}} = Child, + Dynamics = ?DICT:erase(OldPid, dynamics_db(Child#child.restart_type, + State#state.dynamics)), case do_start_child_i(M, F, A) of - {ok, undefined} -> - {ok, State}; {ok, Pid} -> NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)}, {ok, NState}; @@ -754,10 +959,13 @@ restart(Strategy, Child, State, Restart) NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)}, {ok, NState}; {error, Error} -> + NState = State#state{dynamics = ?DICT:store(restarting(OldPid), A, + Dynamics)}, report_error(start_error, Error, Child, State#state.name), - Restart(Child, State) + {try_again, Error, NState} end; -restart(one_for_one, Child, State, Restart) -> +restart(one_for_one, Child, State) -> + OldPid = Child#child.pid, case do_start_child(State#state.name, Child) of {ok, Pid} -> NState = replace_child(Child#child{pid = Pid}, State), @@ -766,139 +974,83 @@ restart(one_for_one, Child, State, Restart) -> NState = replace_child(Child#child{pid = Pid}, State), {ok, NState}; {error, Reason} -> + NState = replace_child(Child#child{pid = restarting(OldPid)}, State), report_error(start_error, Reason, Child, State#state.name), - Restart(Child, State) + {try_again, Reason, NState} end; -restart(rest_for_one, Child, State, Restart) -> +restart(rest_for_one, Child, State) -> {ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children), ChAfter2 = terminate_children(ChAfter, State#state.name), case start_children(ChAfter2, State#state.name) of {ok, ChAfter3} -> {ok, State#state{children = ChAfter3 ++ ChBefore}}; - {error, ChAfter3} -> - Restart(Child, State#state{children = ChAfter3 ++ ChBefore}) + {error, ChAfter3, Reason} -> + NChild = Child#child{pid=restarting(Child#child.pid)}, + NState = State#state{children = ChAfter3 ++ ChBefore}, + {try_again, Reason, replace_child(NChild,NState)} end; -restart(one_for_all, Child, State, Restart) -> +restart(one_for_all, Child, State) -> Children1 = del_child(Child#child.pid, State#state.children), Children2 = terminate_children(Children1, State#state.name), case start_children(Children2, State#state.name) of {ok, NChs} -> {ok, State#state{children = NChs}}; - {error, NChs} -> - Restart(Child, State#state{children = NChs}) + {error, NChs, Reason} -> + NChild = Child#child{pid=restarting(Child#child.pid)}, + NState = State#state{children = NChs}, + {try_again, Reason, replace_child(NChild,NState)} end. +restarting(Pid) when is_pid(Pid) -> ?restarting(Pid); +restarting(RPid) -> RPid. + %%----------------------------------------------------------------- %% Func: terminate_children/2 -%% Args: Children = [#child] in termination order +%% Args: Children = [child_rec()] in termination order %% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} -%% Returns: NChildren = [#child] in +%% Returns: NChildren = [child_rec()] in %% startup order (reversed termination order) %%----------------------------------------------------------------- terminate_children(Children, SupName) -> terminate_children(Children, SupName, []). +%% Temporary children should not be restarted and thus should +%% be skipped when building the list of terminated children, although +%% we do want them to be shut down as many functions from this module +%% use this function to just clear everything. +terminate_children([Child = #child{restart_type=temporary} | Children], SupName, Res) -> + do_terminate(Child, SupName), + terminate_children(Children, SupName, Res); terminate_children([Child | Children], SupName, Res) -> NChild = do_terminate(Child, SupName), terminate_children(Children, SupName, [NChild | Res]); terminate_children([], _SupName, Res) -> Res. -terminate_simple_children(Child, Dynamics, SupName) -> - Pids = dict:fold(fun (Pid, _Args, Pids) -> - erlang:monitor(process, Pid), - unlink(Pid), - exit(Pid, child_exit_reason(Child)), - [Pid | Pids] - end, [], Dynamics), - TimeoutMsg = {timeout, make_ref()}, - TRef = timeout_start(Child, TimeoutMsg), - {Replies, Timedout} = - lists:foldl( - fun (_Pid, {Replies, Timedout}) -> - {Pid1, Reason1, Timedout1} = - receive - TimeoutMsg -> - Remaining = Pids -- [P || {P, _} <- Replies], - [exit(P, kill) || P <- Remaining], - receive - {'DOWN', _MRef, process, Pid, Reason} -> - {Pid, Reason, true} - end; - {'DOWN', _MRef, process, Pid, Reason} -> - {Pid, Reason, Timedout} - end, - {[{Pid1, child_res(Child, Reason1, Timedout1)} | Replies], - Timedout1} - end, {[], false}, Pids), - timeout_stop(Child, TRef, TimeoutMsg, Timedout), - ReportError = shutdown_error_reporter(SupName), - Report = fun(_, ok) -> ok; - (Pid, {error, R}) -> ReportError(R, Child#child{pid = Pid}) - end, - [receive - {'EXIT', Pid, Reason} -> - Report(Pid, child_res(Child, Reason, Timedout)) - after - 0 -> Report(Pid, Reply) - end || {Pid, Reply} <- Replies], - ok. - -child_exit_reason(#child{shutdown = brutal_kill}) -> kill; -child_exit_reason(#child{}) -> shutdown. - -child_res(#child{shutdown=brutal_kill}, killed, false) -> ok; -child_res(#child{}, shutdown, false) -> ok; -child_res(#child{restart_type=permanent}, normal, false) -> {error, normal}; -child_res(#child{restart_type={permanent,_}},normal, false) -> {error, normal}; -child_res(#child{}, normal, false) -> ok; -child_res(#child{}, R, _) -> {error, R}. - -timeout_start(#child{shutdown = Time}, Msg) when is_integer(Time) -> - erlang:send_after(Time, self(), Msg); -timeout_start(#child{}, _Msg) -> - ok. - -timeout_stop(#child{shutdown = Time}, TRef, Msg, false) when is_integer(Time) -> - erlang:cancel_timer(TRef), - receive - Msg -> ok - after - 0 -> ok - end; -timeout_stop(#child{}, _TRef, _Msg, _Timedout) -> - ok. - -do_terminate(Child, SupName) when Child#child.pid =/= undefined -> - ReportError = shutdown_error_reporter(SupName), +do_terminate(Child, SupName) when is_pid(Child#child.pid) -> case shutdown(Child#child.pid, Child#child.shutdown) of ok -> ok; - {error, normal} -> - case Child#child.restart_type of - permanent -> ReportError(normal, Child); - {permanent, _Delay} -> ReportError(normal, Child); - _ -> ok - end; + {error, normal} when not ?is_permanent(Child#child.restart_type) -> + ok; {error, OtherReason} -> - ReportError(OtherReason, Child) + report_error(shutdown_error, OtherReason, Child, SupName) end, Child#child{pid = undefined}; do_terminate(Child, _SupName) -> - Child. + Child#child{pid = undefined}. %%----------------------------------------------------------------- -%% Shutdowns a child. We must check the EXIT value +%% Shutdowns a child. We must check the EXIT value %% of the child, because it might have died with another reason than -%% the wanted. In that case we want to report the error. We put a -%% monitor on the child an check for the 'DOWN' message instead of -%% checking for the 'EXIT' message, because if we check the 'EXIT' -%% message a "naughty" child, who does unlink(Sup), could hang the -%% supervisor. +%% the wanted. In that case we want to report the error. We put a +%% monitor on the child an check for the 'DOWN' message instead of +%% checking for the 'EXIT' message, because if we check the 'EXIT' +%% message a "naughty" child, who does unlink(Sup), could hang the +%% supervisor. %% Returns: ok | {error, OtherReason} (this should be reported) %%----------------------------------------------------------------- shutdown(Pid, brutal_kill) -> - case monitor_child(Pid) of ok -> exit(Pid, kill), @@ -908,16 +1060,14 @@ shutdown(Pid, brutal_kill) -> {'DOWN', _MRef, process, Pid, OtherReason} -> {error, OtherReason} end; - {error, Reason} -> + {error, Reason} -> {error, Reason} end; - shutdown(Pid, Time) -> - case monitor_child(Pid) of ok -> exit(Pid, shutdown), %% Try to shutdown gracefully - receive + receive {'DOWN', _MRef, process, Pid, shutdown} -> ok; {'DOWN', _MRef, process, Pid, OtherReason} -> @@ -929,14 +1079,14 @@ shutdown(Pid, Time) -> {error, OtherReason} end end; - {error, Reason} -> + {error, Reason} -> {error, Reason} end. %% Help function to shutdown/2 switches from link to monitor approach monitor_child(Pid) -> - - %% Do the monitor operation first so that if the child dies + + %% Do the monitor operation first so that if the child dies %% before the monitoring is done causing a 'DOWN'-message with %% reason noproc, we will get the real reason in the 'EXIT'-message %% unless a naughty child has already done unlink... @@ -946,34 +1096,177 @@ monitor_child(Pid) -> receive %% If the child dies before the unlik we must empty %% the mail-box of the 'EXIT'-message and the 'DOWN'-message. - {'EXIT', Pid, Reason} -> - receive + {'EXIT', Pid, Reason} -> + receive {'DOWN', _, process, Pid, _} -> {error, Reason} end - after 0 -> + after 0 -> %% If a naughty child did unlink and the child dies before - %% monitor the result will be that shutdown/2 receives a + %% monitor the result will be that shutdown/2 receives a %% 'DOWN'-message with reason noproc. %% If the child should die after the unlink there %% will be a 'DOWN'-message with a correct reason - %% that will be handled in shutdown/2. - ok + %% that will be handled in shutdown/2. + ok end. %%----------------------------------------------------------------- +%% Func: terminate_dynamic_children/3 +%% Args: Child = child_rec() +%% Dynamics = ?DICT() | ?SET() +%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} +%% Returns: ok +%% +%% +%% Shutdown all dynamic children. This happens when the supervisor is +%% stopped. Because the supervisor can have millions of dynamic children, we +%% can have an significative overhead here. +%%----------------------------------------------------------------- +terminate_dynamic_children(Child, Dynamics, SupName) -> + {Pids, EStack0} = monitor_dynamic_children(Child, Dynamics), + Sz = ?SETS:size(Pids), + EStack = case Child#child.shutdown of + brutal_kill -> + ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), + wait_dynamic_children(Child, Pids, Sz, undefined, EStack0); + infinity -> + ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), + wait_dynamic_children(Child, Pids, Sz, undefined, EStack0); + Time -> + ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), + TRef = erlang:start_timer(Time, self(), kill), + wait_dynamic_children(Child, Pids, Sz, TRef, EStack0) + end, + %% Unroll stacked errors and report them + ?DICT:fold(fun(Reason, Ls, _) -> + report_error(shutdown_error, Reason, + Child#child{pid=Ls}, SupName) + end, ok, EStack). + + +monitor_dynamic_children(#child{restart_type=temporary}, Dynamics) -> + ?SETS:fold(fun(P, {Pids, EStack}) -> + case monitor_child(P) of + ok -> + {?SETS:add_element(P, Pids), EStack}; + {error, normal} -> + {Pids, EStack}; + {error, Reason} -> + {Pids, ?DICT:append(Reason, P, EStack)} + end + end, {?SETS:new(), ?DICT:new()}, Dynamics); +monitor_dynamic_children(#child{restart_type=RType}, Dynamics) -> + ?DICT:fold(fun(P, _, {Pids, EStack}) when is_pid(P) -> + case monitor_child(P) of + ok -> + {?SETS:add_element(P, Pids), EStack}; + {error, normal} when not ?is_permanent(RType) -> + {Pids, EStack}; + {error, Reason} -> + {Pids, ?DICT:append(Reason, P, EStack)} + end; + (?restarting(_), _, {Pids, EStack}) -> + {Pids, EStack} + end, {?SETS:new(), ?DICT:new()}, Dynamics). + +wait_dynamic_children(_Child, _Pids, 0, undefined, EStack) -> + EStack; +wait_dynamic_children(_Child, _Pids, 0, TRef, EStack) -> + %% If the timer has expired before its cancellation, we must empty the + %% mail-box of the 'timeout'-message. + erlang:cancel_timer(TRef), + receive + {timeout, TRef, kill} -> + EStack + after 0 -> + EStack + end; +wait_dynamic_children(#child{shutdown=brutal_kill} = Child, Pids, Sz, + TRef, EStack) -> + receive + {'DOWN', _MRef, process, Pid, killed} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, EStack); + + {'DOWN', _MRef, process, Pid, Reason} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, ?DICT:append(Reason, Pid, EStack)) + end; +wait_dynamic_children(#child{restart_type=RType} = Child, Pids, Sz, + TRef, EStack) -> + receive + {'DOWN', _MRef, process, Pid, shutdown} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, EStack); + + {'DOWN', _MRef, process, Pid, normal} when not ?is_permanent(RType) -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, EStack); + + {'DOWN', _MRef, process, Pid, Reason} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, ?DICT:append(Reason, Pid, EStack)); + + {timeout, TRef, kill} -> + ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), + wait_dynamic_children(Child, Pids, Sz-1, undefined, EStack) + end. + +%%----------------------------------------------------------------- %% Child/State manipulating functions. %%----------------------------------------------------------------- -state_del_child(#child{pid = Pid}, State) when ?is_simple(State) -> - NDynamics = ?DICT:erase(Pid, State#state.dynamics), + +%% Note we do not want to save the parameter list for temporary processes as +%% they will not be restarted, and hence we do not need this information. +%% Especially for dynamic children to simple_one_for_one supervisors +%% it could become very costly as it is not uncommon to spawn +%% very many such processes. +save_child(#child{restart_type = temporary, + mfargs = {M, F, _}} = Child, #state{children = Children} = State) -> + State#state{children = [Child#child{mfargs = {M, F, undefined}} |Children]}; +save_child(Child, #state{children = Children} = State) -> + State#state{children = [Child |Children]}. + +save_dynamic_child(temporary, Pid, _, #state{dynamics = Dynamics} = State) -> + State#state{dynamics = ?SETS:add_element(Pid, dynamics_db(temporary, Dynamics))}; +save_dynamic_child(RestartType, Pid, Args, #state{dynamics = Dynamics} = State) -> + State#state{dynamics = ?DICT:store(Pid, Args, dynamics_db(RestartType, Dynamics))}. + +dynamics_db(temporary, undefined) -> + ?SETS:new(); +dynamics_db(_, undefined) -> + ?DICT:new(); +dynamics_db(_,Dynamics) -> + Dynamics. + +dynamic_child_args(Pid, Dynamics) -> + case ?SETS:is_set(Dynamics) of + true -> + {ok, undefined}; + false -> + ?DICT:find(Pid, Dynamics) + end. + +state_del_child(#child{pid = Pid, restart_type = temporary}, State) when ?is_simple(State) -> + NDynamics = ?SETS:del_element(Pid, dynamics_db(temporary, State#state.dynamics)), + State#state{dynamics = NDynamics}; +state_del_child(#child{pid = Pid, restart_type = RType}, State) when ?is_simple(State) -> + NDynamics = ?DICT:erase(Pid, dynamics_db(RType, State#state.dynamics)), State#state{dynamics = NDynamics}; state_del_child(Child, State) -> NChildren = del_child(Child#child.name, State#state.children), State#state{children = NChildren}. +del_child(Name, [Ch=#child{pid = ?restarting(_)}|_]=Chs) when Ch#child.name =:= Name -> + Chs; +del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name, Ch#child.restart_type =:= temporary -> + Chs; del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name -> [Ch#child{pid = undefined} | Chs]; +del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid, Ch#child.restart_type =:= temporary -> + Chs; del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid -> [Ch#child{pid = undefined} | Chs]; del_child(Name, [Ch|Chs]) -> @@ -996,7 +1289,38 @@ split_child(_, [], After) -> {lists:reverse(After), []}. get_child(Name, State) -> + get_child(Name, State, false). +get_child(Pid, State, AllowPid) when AllowPid, is_pid(Pid) -> + get_dynamic_child(Pid, State); +get_child(Name, State, _) -> lists:keysearch(Name, #child.name, State#state.children). + +get_dynamic_child(Pid, #state{children=[Child], dynamics=Dynamics}) -> + DynamicsDb = dynamics_db(Child#child.restart_type, Dynamics), + case is_dynamic_pid(Pid, DynamicsDb) of + true -> + {value, Child#child{pid=Pid}}; + false -> + RPid = restarting(Pid), + case is_dynamic_pid(RPid, DynamicsDb) of + true -> + {value, Child#child{pid=RPid}}; + false -> + case erlang:is_process_alive(Pid) of + true -> false; + false -> {value, Child} + end + end + end. + +is_dynamic_pid(Pid, Dynamics) -> + case ?SETS:is_set(Dynamics) of + true -> + ?SETS:is_element(Pid, Dynamics); + false -> + ?DICT:is_key(Pid, Dynamics) + end. + replace_child(Child, State) -> Chs = do_replace_child(Child, State#state.children), State#state{children = Chs}. @@ -1016,12 +1340,12 @@ remove_child(Child, State) -> %% Type = {Strategy, MaxIntensity, Period} %% Strategy = one_for_one | one_for_all | simple_one_for_one | %% rest_for_one -%% MaxIntensity = integer() -%% Period = integer() +%% MaxIntensity = integer() >= 0 +%% Period = integer() > 0 %% Mod :== atom() -%% Arsg :== term() +%% Args :== term() %% Purpose: Check that Type is of correct type (!) -%% Returns: {ok, #state} | Error +%% Returns: {ok, state()} | Error %%----------------------------------------------------------------- init_state(SupName, Type, Mod, Args) -> case catch init_state1(SupName, Type, Mod, Args) of @@ -1036,46 +1360,45 @@ init_state1(SupName, {Strategy, MaxIntensity, Period}, Mod, Args) -> validIntensity(MaxIntensity), validPeriod(Period), {ok, #state{name = supname(SupName,Mod), - strategy = Strategy, - intensity = MaxIntensity, - period = Period, - module = Mod, - args = Args}}; + strategy = Strategy, + intensity = MaxIntensity, + period = Period, + module = Mod, + args = Args}}; init_state1(_SupName, Type, _, _) -> {invalid_type, Type}. -validStrategy(simple_one_for_one_terminate) -> true; -validStrategy(simple_one_for_one) -> true; -validStrategy(one_for_one) -> true; -validStrategy(one_for_all) -> true; -validStrategy(rest_for_one) -> true; -validStrategy(What) -> throw({invalid_strategy, What}). +validStrategy(simple_one_for_one) -> true; +validStrategy(one_for_one) -> true; +validStrategy(one_for_all) -> true; +validStrategy(rest_for_one) -> true; +validStrategy(What) -> throw({invalid_strategy, What}). validIntensity(Max) when is_integer(Max), Max >= 0 -> true; -validIntensity(What) -> throw({invalid_intensity, What}). +validIntensity(What) -> throw({invalid_intensity, What}). validPeriod(Period) when is_integer(Period), Period > 0 -> true; validPeriod(What) -> throw({invalid_period, What}). -supname(self,Mod) -> {self(),Mod}; -supname(N,_) -> N. +supname(self, Mod) -> {self(), Mod}; +supname(N, _) -> N. %%% ------------------------------------------------------ %%% Check that the children start specification is valid. %%% Shall be a six (6) tuple %%% {Name, Func, RestartType, Shutdown, ChildType, Modules} %%% where Name is an atom -%%% Func is {Mod, Fun, Args} == {atom, atom, list} +%%% Func is {Mod, Fun, Args} == {atom(), atom(), list()} %%% RestartType is permanent | temporary | transient | %%% intrinsic | {permanent, Delay} | %%% {transient, Delay} | {intrinsic, Delay} %% where Delay >= 0 -%%% Shutdown = integer() | infinity | brutal_kill +%%% Shutdown = integer() > 0 | infinity | brutal_kill %%% ChildType = supervisor | worker %%% Modules = [atom()] | dynamic -%%% Returns: {ok, [#child]} | Error +%%% Returns: {ok, [child_rec()]} | Error %%% ------------------------------------------------------ check_startspec(Children) -> check_startspec(Children, []). @@ -1103,7 +1426,7 @@ check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods) -> validChildType(ChildType), validShutdown(Shutdown, ChildType), validMods(Mods), - {ok, #child{name = Name, mfa = Func, restart_type = RestartType, + {ok, #child{name = Name, mfargs = Func, restart_type = RestartType, shutdown = Shutdown, child_type = ChildType, modules = Mods}}. validChildType(supervisor) -> true; @@ -1112,8 +1435,8 @@ validChildType(What) -> throw({invalid_child_type, What}). validName(_Name) -> true. -validFunc({M, F, A}) when is_atom(M), - is_atom(F), +validFunc({M, F, A}) when is_atom(M), + is_atom(F), is_list(A) -> true; validFunc(Func) -> throw({invalid_mfa, Func}). @@ -1131,15 +1454,15 @@ validDelay(Delay) when is_number(Delay), Delay >= 0 -> true; validDelay(What) -> throw({invalid_delay, What}). -validShutdown(Shutdown, _) +validShutdown(Shutdown, _) when is_integer(Shutdown), Shutdown > 0 -> true; -validShutdown(infinity, supervisor) -> true; +validShutdown(infinity, _) -> true; validShutdown(brutal_kill, _) -> true; validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}). validMods(dynamic) -> true; validMods(Mods) when is_list(Mods) -> - lists:foreach(fun (Mod) -> + lists:foreach(fun(Mod) -> if is_atom(Mod) -> ok; true -> throw({invalid_module, Mod}) @@ -1157,7 +1480,7 @@ validMods(Mods) -> throw({invalid_modules, Mods}). %%% Returns: {ok, State'} | {terminate, State'} %%% ------------------------------------------------------ -add_restart(State) -> +add_restart(State) -> I = State#state.intensity, P = State#state.period, R = State#state.restarts, @@ -1213,15 +1536,18 @@ report_error(Error, Reason, Child, SupName) -> {offender, extract_child(Child)}], error_logger:error_report(supervisor_report, ErrorMsg). -shutdown_error_reporter(SupName) -> - fun(Reason, Child) -> - report_error(shutdown_error, Reason, Child, SupName) - end. +extract_child(Child) when is_list(Child#child.pid) -> + [{nb_children, length(Child#child.pid)}, + {name, Child#child.name}, + {mfargs, Child#child.mfargs}, + {restart_type, Child#child.restart_type}, + {shutdown, Child#child.shutdown}, + {child_type, Child#child.child_type}]; extract_child(Child) -> [{pid, Child#child.pid}, {name, Child#child.name}, - {mfa, Child#child.mfa}, + {mfargs, Child#child.mfargs}, {restart_type, Child#child.restart_type}, {shutdown, Child#child.shutdown}, {child_type, Child#child.child_type}]. diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl index a841b1f0..5a47e309 100644 --- a/src/supervisor2_tests.erl +++ b/src/supervisor2_tests.erl @@ -65,6 +65,6 @@ init([Timeout]) -> [{local, ?MODULE}, ?MODULE, []]}, transient, Timeout, supervisor, [?MODULE]}]}}; init([]) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{test_worker, {?MODULE, start_link, []}, temporary, 1000, worker, [?MODULE]}]}}. diff --git a/src/test_sup.erl b/src/test_sup.erl index 51ff7b4e..da325f1e 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -34,7 +34,7 @@ %%---------------------------------------------------------------------------- test_supervisor_delayed_restart() -> - passed = with_sup(simple_one_for_one_terminate, + passed = with_sup(simple_one_for_one, fun (SupPid) -> {ok, _ChildPid} = supervisor2:start_child(SupPid, []), diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index d60b7fec..a07f6c65 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -263,29 +263,11 @@ get_total_memory({unix,openbsd}) -> sysctl("hw.usermem"); get_total_memory({win32,_OSname}) -> - %% Due to the Erlang print format bug, on Windows boxes the memory - %% size is broken. For example Windows 7 64 bit with 4Gigs of RAM - %% we get negative memory size: - %% > os_mon_sysinfo:get_mem_info(). - %% ["76 -1658880 1016913920 -1 -1021628416 2147352576 2134794240\n"] - %% Due to this bug, we don't actually know anything. Even if the - %% number is postive we can't be sure if it's correct. This only - %% affects us on os_mon versions prior to 2.2.1. - case application:get_key(os_mon, vsn) of - undefined -> - unknown; - {ok, Version} -> - case rabbit_misc:version_compare(Version, "2.2.1", lt) of - true -> %% os_mon is < 2.2.1, so we know nothing - unknown; - false -> - [Result|_] = os_mon_sysinfo:get_mem_info(), - {ok, [_MemLoad, TotPhys, _AvailPhys, - _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} = - io_lib:fread("~d~d~d~d~d~d~d", Result), - TotPhys - end - end; + [Result|_] = os_mon_sysinfo:get_mem_info(), + {ok, [_MemLoad, TotPhys, _AvailPhys, _TotPage, _AvailPage, _TotV, _AvailV], + _RestStr} = + io_lib:fread("~d~d~d~d~d~d~d", Result), + TotPhys; get_total_memory({unix, linux}) -> File = read_proc_file("/proc/meminfo"), |