diff options
author | Tim Watson <watson.timothy@gmail.com> | 2012-10-29 15:14:10 +0000 |
---|---|---|
committer | Tim Watson <watson.timothy@gmail.com> | 2012-10-29 15:14:10 +0000 |
commit | d8a163f5185bcb3849143f2af8e479d848bc0b93 (patch) | |
tree | 74ccd06c4c7a321f749ff478bbb020f41fc51eeb | |
parent | 8282fc8c337d9c08bd044af9055dd42f185d2651 (diff) | |
download | rabbitmq-server-d8a163f5185bcb3849143f2af8e479d848bc0b93.tar.gz |
no more simple_one_for_one_terminate; include support for dynamic_db
-rw-r--r-- | src/mirrored_supervisor.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel_sup_sup.erl | 2 | ||||
-rw-r--r-- | src/rabbit_client_sup.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave_sup.erl | 2 | ||||
-rw-r--r-- | src/supervisor2.erl | 246 | ||||
-rw-r--r-- | src/supervisor2_tests.erl | 2 | ||||
-rw-r--r-- | src/test_sup.erl | 2 |
8 files changed, 204 insertions, 59 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 24c3ebd0..e070d520 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -212,9 +212,8 @@ start_link0(Prefix, Group, Init) -> init(Mod, Args) -> case Mod:init(Args) of {ok, {{Bad, _, _}, _ChildSpecs}} when - Bad =:= simple_one_for_one orelse - Bad =:= simple_one_for_one_terminate -> erlang:error(badarg); - Init -> Init + Bad =:= simple_one_for_one -> erlang:error(badarg); + Init -> Init end. start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}). diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index a4305e5f..7586fe46 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -47,6 +47,6 @@ start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). init([]) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, + {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 995c41fb..29cd8787 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -43,6 +43,6 @@ start_channel(Pid, Args) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{channel_sup, {rabbit_channel_sup, start_link, []}, temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index c508f1b9..b4db97b1 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -44,5 +44,5 @@ start_link(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}. diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index a2034876..a20c50ef 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -31,7 +31,7 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). init([]) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, + {ok, {{simple_one_for_one, 10, 10}, [{rabbit_mirror_queue_slave, {rabbit_mirror_queue_slave, start_link, []}, temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}. diff --git a/src/supervisor2.erl b/src/supervisor2.erl index eae2f298..2256b98e 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -3,12 +3,7 @@ %% %% 1) the module name is supervisor2 %% -%% 2) there is a new strategy called -%% simple_one_for_one_terminate. This is exactly the same as for -%% simple_one_for_one, except that children *are* explicitly -%% terminated as per the shutdown component of the child_spec. -%% -%% 3) child specifications can contain, as the restart type, a tuple +%% 2) child specifications can contain, as the restart type, a tuple %% {permanent, Delay} | {transient, Delay} | {intrinsic, Delay} %% where Delay >= 0 (see point (4) below for intrinsic). The delay, %% in seconds, indicates what should happen if a child, upon being @@ -41,14 +36,14 @@ %% perspective it's a normal exit, whilst from supervisor's %% perspective, it's an abnormal exit. %% -%% 4) Added an 'intrinsic' restart type. Like the transient type, this +%% 3) Added an 'intrinsic' restart type. Like the transient type, this %% type means the child should only be restarted if the child exits %% abnormally. Unlike the transient type, if the child exits %% normally, the supervisor itself also exits normally. If the %% child is a supervisor and it exits normally (i.e. with reason of %% 'shutdown') then the child's parent also exits normally. %% -%% 5) normal, and {shutdown, _} exit reasons are all treated the same +%% 4) normal, and {shutdown, _} exit reasons are all treated the same %% (i.e. are regarded as normal exits) %% %% All modifications are (C) 2010-2012 VMware, Inc. @@ -91,7 +86,7 @@ %%-------------------------------------------------------------------------- --type child() :: 'undefined' | pid(). +-type child() :: 'undefined' | pid(). -type child_id() :: term(). -type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}. -type modules() :: [module()] | 'dynamic'. @@ -112,7 +107,7 @@ Modules :: modules()}. -type strategy() :: 'one_for_all' | 'one_for_one' - | 'rest_for_one' | 'simple_one_for_one' | 'simple_one_for_one_terminate'. + | 'rest_for_one' | 'simple_one_for_one'. %%-------------------------------------------------------------------------- @@ -132,17 +127,16 @@ -record(state, {name, strategy :: strategy(), - children = [] :: [child_rec()], - dynamics = ?DICT:new() :: ?DICT(), - intensity :: non_neg_integer(), - period :: pos_integer(), + children = [] :: [child_rec()], + dynamics :: ?DICT() | ?SET(), + intensity :: non_neg_integer(), + period :: pos_integer(), restarts = [], module, args}). -type state() :: #state{}. --define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse State#state.strategy =:= simple_one_for_one_terminate). --define(is_terminate_simple(State), State#state.strategy =:= simple_one_for_one_terminate). +-define(is_simple(State), State#state.strategy =:= simple_one_for_one). -callback init(Args :: term()) -> {ok, {{RestartStrategy :: strategy(), @@ -166,7 +160,7 @@ Module :: module(), Args :: term(). start_link(Mod, Args) -> - gen_server:start_link(supervisor, {self, Mod, Args}, []). + gen_server:start_link(supervisor2, {self, Mod, Args}, []). -spec start_link(SupName, Module, Args) -> startlink_ret() when SupName :: sup_name(), @@ -398,10 +392,10 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) -> {ok, undefined} -> {reply, {ok, undefined}, State}; {ok, Pid} -> - NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)}, + NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State), {reply, {ok, Pid}, NState}; {ok, Pid, Extra} -> - NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)}, + NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State), {reply, {ok, Pid, Extra}, NState}; What -> {reply, What, State} @@ -422,7 +416,7 @@ handle_call({terminate_child, Name}, _From, State) -> end; %%% The requests delete_child and restart_child are invalid for -%%% simple_one_for_one and simple_one_for_one_terminate supervisors. +%%% simple_one_for_one supervisors. handle_call({_Req, _Data}, _From, State) when ?is_simple(State) -> {reply, {error, State#state.strategy}, State}; @@ -465,10 +459,20 @@ handle_call({delete_child, Name}, _From, State) -> {reply, {error, not_found}, State} end; -handle_call(which_children, _From, State) when ?is_simple(State) -> - [#child{child_type = CT, modules = Mods}] = State#state.children, +handle_call(which_children, _From, #state{children = [#child{restart_type = temporary, + child_type = CT, + modules = Mods}]} = + State) when ?is_simple(State) -> + Reply = lists:map(fun(Pid) -> {undefined, Pid, CT, Mods} end, + ?SETS:to_list(dynamics_db(temporary, State#state.dynamics))), + {reply, Reply, State}; + +handle_call(which_children, _From, #state{children = [#child{restart_type = RType, + child_type = CT, + modules = Mods}]} = + State) when ?is_simple(State) -> Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end, - ?DICT:to_list(State#state.dynamics)), + ?DICT:to_list(dynamics_db(RType, State#state.dynamics))), {reply, Reply, State}; handle_call(which_children, _From, State) -> @@ -589,13 +593,12 @@ handle_info(Msg, State) -> %% -spec terminate(term(), state()) -> 'ok'. -terminate(_Reason, State) when ?is_terminate_simple(State) -> - terminate_simple_children( - hd(State#state.children), State#state.dynamics, State#state.name), - ok; +terminate(_Reason, #state{children=[Child]} = State) when ?is_simple(State) -> + terminate_dynamic_children(Child, dynamics_db(Child#child.restart_type, + State#state.dynamics), + State#state.name); terminate(_Reason, State) -> - terminate_children(State#state.children, State#state.name), - ok. + terminate_children(State#state.children, State#state.name). %% %% Change code for the supervisor. @@ -686,11 +689,9 @@ handle_start_child(Child, State) -> false -> case do_start_child(State#state.name, Child) of {ok, Pid} -> - Children = State#state.children, - {{ok, Pid}, State#state{children = [Child#child{pid = Pid}|Children]}}; + {{ok, Pid}, save_child(Child#child{pid = Pid}, State)}; {ok, Pid, Extra} -> - Children = State#state.children, - {{ok, Pid, Extra}, State#state{children = [Child#child{pid = Pid}|Children]}}; + {{ok, Pid, Extra}, save_child(Child#child{pid = Pid}, State)}; {error, What} -> {{error, {What, Child}}, State} end; @@ -705,16 +706,15 @@ handle_start_child(Child, State) -> %%% Returns: {ok, state()} | {shutdown, state()} %%% --------------------------------------------------- -restart_child(Pid, Reason, State) when ?is_simple(State) -> - case ?DICT:find(Pid, State#state.dynamics) of +restart_child(Pid, Reason, #state{children = [Child]} = State) when ?is_simple(State) -> + RestartType = Child#child.restart_type, + case dynamic_child_args(Pid, dynamics_db(RestartType, State#state.dynamics)) of {ok, Args} -> - [Child] = State#state.children, - RestartType = Child#child.restart_type, {M, F, _} = Child#child.mfa, NChild = Child#child{pid = Pid, mfa = {M, F, Args}}, do_restart(RestartType, Reason, NChild, State); error -> - {ok, State} + {ok, State} end; restart_child(Pid, Reason, State) -> Children = State#state.children, @@ -793,9 +793,7 @@ restart1(Child, State) -> {terminate, State} end. -restart(Strategy, Child, State, Restart) - when Strategy =:= simple_one_for_one orelse - Strategy =:= simple_one_for_one_terminate -> +restart(simple_one_for_one, Child, State, Restart) -> #child{mfa = {M, F, A}} = Child, Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics), case do_start_child_i(M, F, A) of @@ -859,12 +857,7 @@ terminate_children([], _SupName, Res) -> Res. terminate_simple_children(Child, Dynamics, SupName) -> - Pids = dict:fold(fun (Pid, _Args, Pids) -> - erlang:monitor(process, Pid), - unlink(Pid), - exit(Pid, child_exit_reason(Child)), - [Pid | Pids] - end, [], Dynamics), + Pids = monitor_children(Child, Dynamics), TimeoutMsg = {timeout, make_ref()}, TRef = timeout_start(Child, TimeoutMsg), {Replies, Timedout} = @@ -898,6 +891,21 @@ terminate_simple_children(Child, Dynamics, SupName) -> end || {Pid, Reply} <- Replies], ok. +monitor_children(Child=#child{restart_type=temporary}, Dynamics) -> + ?SETS:fold(fun (Pid, _Args, Pids) -> + erlang:monitor(process, Pid), + unlink(Pid), + exit(Pid, child_exit_reason(Child)), + [Pid | Pids] + end, [], Dynamics); +monitor_children(Child, Dynamics) -> + dict:fold(fun (Pid, _Args, Pids) -> + erlang:monitor(process, Pid), + unlink(Pid), + exit(Pid, child_exit_reason(Child)), + [Pid | Pids] + end, [], Dynamics). + child_exit_reason(#child{shutdown = brutal_kill}) -> kill; child_exit_reason(#child{}) -> shutdown. @@ -1017,10 +1025,148 @@ monitor_child(Pid) -> %%----------------------------------------------------------------- +%% Func: terminate_dynamic_children/3 +%% Args: Child = child_rec() +%% Dynamics = ?DICT() | ?SET() +%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} +%% Returns: ok +%% +%% +%% Shutdown all dynamic children. This happens when the supervisor is +%% stopped. Because the supervisor can have millions of dynamic children, we +%% can have an significative overhead here. +%%----------------------------------------------------------------- +terminate_dynamic_children(Child, Dynamics, SupName) -> + {Pids, EStack0} = monitor_dynamic_children(Child, Dynamics), + Sz = ?SETS:size(Pids), + EStack = case Child#child.shutdown of + brutal_kill -> + ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), + wait_dynamic_children(Child, Pids, Sz, undefined, EStack0); + infinity -> + ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), + wait_dynamic_children(Child, Pids, Sz, undefined, EStack0); + Time -> + ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), + TRef = erlang:start_timer(Time, self(), kill), + wait_dynamic_children(Child, Pids, Sz, TRef, EStack0) + end, + %% Unroll stacked errors and report them + ?DICT:fold(fun(Reason, Ls, _) -> + report_error(shutdown_error, Reason, + Child#child{pid=Ls}, SupName) + end, ok, EStack). + + +monitor_dynamic_children(#child{restart_type=temporary}, Dynamics) -> + ?SETS:fold(fun(P, {Pids, EStack}) -> + case monitor_child(P) of + ok -> + {?SETS:add_element(P, Pids), EStack}; + {error, normal} -> + {Pids, EStack}; + {error, Reason} -> + {Pids, ?DICT:append(Reason, P, EStack)} + end + end, {?SETS:new(), ?DICT:new()}, Dynamics); +monitor_dynamic_children(#child{restart_type=RType}, Dynamics) -> + ?DICT:fold(fun(P, _, {Pids, EStack}) when is_pid(P) -> + case monitor_child(P) of + ok -> + {?SETS:add_element(P, Pids), EStack}; + {error, normal} when RType =/= permanent -> + {Pids, EStack}; + {error, Reason} -> + {Pids, ?DICT:append(Reason, P, EStack)} + end; + (?restarting(_), _, {Pids, EStack}) -> + {Pids, EStack} + end, {?SETS:new(), ?DICT:new()}, Dynamics). + + +wait_dynamic_children(_Child, _Pids, 0, undefined, EStack) -> + EStack; +wait_dynamic_children(_Child, _Pids, 0, TRef, EStack) -> + %% If the timer has expired before its cancellation, we must empty the + %% mail-box of the 'timeout'-message. + erlang:cancel_timer(TRef), + receive + {timeout, TRef, kill} -> + EStack + after 0 -> + EStack + end; +wait_dynamic_children(#child{shutdown=brutal_kill} = Child, Pids, Sz, + TRef, EStack) -> + receive + {'DOWN', _MRef, process, Pid, killed} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, EStack); + + {'DOWN', _MRef, process, Pid, Reason} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, ?DICT:append(Reason, Pid, EStack)) + end; +wait_dynamic_children(#child{restart_type=RType} = Child, Pids, Sz, + TRef, EStack) -> + receive + {'DOWN', _MRef, process, Pid, shutdown} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, EStack); + + {'DOWN', _MRef, process, Pid, normal} when RType =/= permanent -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, EStack); + + {'DOWN', _MRef, process, Pid, Reason} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, ?DICT:append(Reason, Pid, EStack)); + + {timeout, TRef, kill} -> + ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), + wait_dynamic_children(Child, Pids, Sz-1, undefined, EStack) + end. + +%%----------------------------------------------------------------- %% Child/State manipulating functions. %%----------------------------------------------------------------- -state_del_child(#child{pid = Pid}, State) when ?is_simple(State) -> - NDynamics = ?DICT:erase(Pid, State#state.dynamics), + +%% Note we do not want to save the parameter list for temporary processes as +%% they will not be restarted, and hence we do not need this information. +%% Especially for dynamic children to simple_one_for_one supervisors +%% it could become very costly as it is not uncommon to spawn +%% very many such processes. +save_child(#child{restart_type = temporary, + mfa = {M, F, _}} = Child, #state{children = Children} = State) -> + State#state{children = [Child#child{mfa = {M, F, undefined}} |Children]}; +save_child(Child, #state{children = Children} = State) -> + State#state{children = [Child |Children]}. + +save_dynamic_child(temporary, Pid, _, #state{dynamics = Dynamics} = State) -> + State#state{dynamics = ?SETS:add_element(Pid, dynamics_db(temporary, Dynamics))}; +save_dynamic_child(RestartType, Pid, Args, #state{dynamics = Dynamics} = State) -> + State#state{dynamics = ?DICT:store(Pid, Args, dynamics_db(RestartType, Dynamics))}. + +dynamics_db(temporary, undefined) -> + ?SETS:new(); +dynamics_db(_, undefined) -> + ?DICT:new(); +dynamics_db(_,Dynamics) -> + Dynamics. + +dynamic_child_args(Pid, Dynamics) -> + case ?SETS:is_set(Dynamics) of + true -> + {ok, undefined}; + false -> + ?DICT:find(Pid, Dynamics) + end. + +state_del_child(#child{pid = Pid, restart_type = temporary}, State) when ?is_simple(State) -> + NDynamics = ?SETS:del_element(Pid, dynamics_db(temporary, State#state.dynamics)), + State#state{dynamics = NDynamics}; +state_del_child(#child{pid = Pid, restart_type = RType}, State) when ?is_simple(State) -> + NDynamics = ?DICT:erase(Pid, dynamics_db(RType, State#state.dynamics)), State#state{dynamics = NDynamics}; state_del_child(Child, State) -> NChildren = del_child(Child#child.name, State#state.children), @@ -1051,6 +1197,7 @@ split_child(_, [], After) -> get_child(Name, State) -> lists:keysearch(Name, #child.name, State#state.children). + replace_child(Child, State) -> Chs = do_replace_child(Child, State#state.children), State#state{children = Chs}. @@ -1098,7 +1245,6 @@ init_state1(SupName, {Strategy, MaxIntensity, Period}, Mod, Args) -> init_state1(_SupName, Type, _, _) -> {invalid_type, Type}. -validStrategy(simple_one_for_one_terminate) -> true; validStrategy(simple_one_for_one) -> true; validStrategy(one_for_one) -> true; validStrategy(one_for_all) -> true; diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl index e42ded7b..ff1a7f3c 100644 --- a/src/supervisor2_tests.erl +++ b/src/supervisor2_tests.erl @@ -65,6 +65,6 @@ init([Timeout]) -> [{local, ?MODULE}, ?MODULE, []]}, transient, Timeout, supervisor, [?MODULE]}]}}; init([]) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{test_worker, {?MODULE, start_link, []}, temporary, 1000, worker, [?MODULE]}]}}. diff --git a/src/test_sup.erl b/src/test_sup.erl index 7f4b5049..6a56e64a 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -34,7 +34,7 @@ %%---------------------------------------------------------------------------- test_supervisor_delayed_restart() -> - passed = with_sup(simple_one_for_one_terminate, + passed = with_sup(simple_one_for_one, fun (SupPid) -> {ok, _ChildPid} = supervisor2:start_child(SupPid, []), |