summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <watson.timothy@gmail.com>2012-10-29 15:14:10 +0000
committerTim Watson <watson.timothy@gmail.com>2012-10-29 15:14:10 +0000
commitd8a163f5185bcb3849143f2af8e479d848bc0b93 (patch)
tree74ccd06c4c7a321f749ff478bbb020f41fc51eeb
parent8282fc8c337d9c08bd044af9055dd42f185d2651 (diff)
downloadrabbitmq-server-d8a163f5185bcb3849143f2af8e479d848bc0b93.tar.gz
no more simple_one_for_one_terminate; include support for dynamic_db
-rw-r--r--src/mirrored_supervisor.erl5
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_client_sup.erl2
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/supervisor2.erl246
-rw-r--r--src/supervisor2_tests.erl2
-rw-r--r--src/test_sup.erl2
8 files changed, 204 insertions, 59 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 24c3ebd0..e070d520 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -212,9 +212,8 @@ start_link0(Prefix, Group, Init) ->
init(Mod, Args) ->
case Mod:init(Args) of
{ok, {{Bad, _, _}, _ChildSpecs}} when
- Bad =:= simple_one_for_one orelse
- Bad =:= simple_one_for_one_terminate -> erlang:error(badarg);
- Init -> Init
+ Bad =:= simple_one_for_one -> erlang:error(badarg);
+ Init -> Init
end.
start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}).
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index a4305e5f..7586fe46 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -47,6 +47,6 @@ start_child(Node, Args) ->
supervisor2:start_child({?SERVER, Node}, Args).
init([]) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
+ {ok, {{simple_one_for_one, 10, 10},
[{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []},
temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}.
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 995c41fb..29cd8787 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -43,6 +43,6 @@ start_channel(Pid, Args) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{channel_sup, {rabbit_channel_sup, start_link, []},
temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index c508f1b9..b4db97b1 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -44,5 +44,5 @@ start_link(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}.
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index a2034876..a20c50ef 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -31,7 +31,7 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args).
init([]) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
+ {ok, {{simple_one_for_one, 10, 10},
[{rabbit_mirror_queue_slave,
{rabbit_mirror_queue_slave, start_link, []},
temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index eae2f298..2256b98e 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -3,12 +3,7 @@
%%
%% 1) the module name is supervisor2
%%
-%% 2) there is a new strategy called
-%% simple_one_for_one_terminate. This is exactly the same as for
-%% simple_one_for_one, except that children *are* explicitly
-%% terminated as per the shutdown component of the child_spec.
-%%
-%% 3) child specifications can contain, as the restart type, a tuple
+%% 2) child specifications can contain, as the restart type, a tuple
%% {permanent, Delay} | {transient, Delay} | {intrinsic, Delay}
%% where Delay >= 0 (see point (4) below for intrinsic). The delay,
%% in seconds, indicates what should happen if a child, upon being
@@ -41,14 +36,14 @@
%% perspective it's a normal exit, whilst from supervisor's
%% perspective, it's an abnormal exit.
%%
-%% 4) Added an 'intrinsic' restart type. Like the transient type, this
+%% 3) Added an 'intrinsic' restart type. Like the transient type, this
%% type means the child should only be restarted if the child exits
%% abnormally. Unlike the transient type, if the child exits
%% normally, the supervisor itself also exits normally. If the
%% child is a supervisor and it exits normally (i.e. with reason of
%% 'shutdown') then the child's parent also exits normally.
%%
-%% 5) normal, and {shutdown, _} exit reasons are all treated the same
+%% 4) normal, and {shutdown, _} exit reasons are all treated the same
%% (i.e. are regarded as normal exits)
%%
%% All modifications are (C) 2010-2012 VMware, Inc.
@@ -91,7 +86,7 @@
%%--------------------------------------------------------------------------
--type child() :: 'undefined' | pid().
+-type child() :: 'undefined' | pid().
-type child_id() :: term().
-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}.
-type modules() :: [module()] | 'dynamic'.
@@ -112,7 +107,7 @@
Modules :: modules()}.
-type strategy() :: 'one_for_all' | 'one_for_one'
- | 'rest_for_one' | 'simple_one_for_one' | 'simple_one_for_one_terminate'.
+ | 'rest_for_one' | 'simple_one_for_one'.
%%--------------------------------------------------------------------------
@@ -132,17 +127,16 @@
-record(state, {name,
strategy :: strategy(),
- children = [] :: [child_rec()],
- dynamics = ?DICT:new() :: ?DICT(),
- intensity :: non_neg_integer(),
- period :: pos_integer(),
+ children = [] :: [child_rec()],
+ dynamics :: ?DICT() | ?SET(),
+ intensity :: non_neg_integer(),
+ period :: pos_integer(),
restarts = [],
module,
args}).
-type state() :: #state{}.
--define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse State#state.strategy =:= simple_one_for_one_terminate).
--define(is_terminate_simple(State), State#state.strategy =:= simple_one_for_one_terminate).
+-define(is_simple(State), State#state.strategy =:= simple_one_for_one).
-callback init(Args :: term()) ->
{ok, {{RestartStrategy :: strategy(),
@@ -166,7 +160,7 @@
Module :: module(),
Args :: term().
start_link(Mod, Args) ->
- gen_server:start_link(supervisor, {self, Mod, Args}, []).
+ gen_server:start_link(supervisor2, {self, Mod, Args}, []).
-spec start_link(SupName, Module, Args) -> startlink_ret() when
SupName :: sup_name(),
@@ -398,10 +392,10 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
{ok, undefined} ->
{reply, {ok, undefined}, State};
{ok, Pid} ->
- NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)},
+ NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State),
{reply, {ok, Pid}, NState};
{ok, Pid, Extra} ->
- NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)},
+ NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State),
{reply, {ok, Pid, Extra}, NState};
What ->
{reply, What, State}
@@ -422,7 +416,7 @@ handle_call({terminate_child, Name}, _From, State) ->
end;
%%% The requests delete_child and restart_child are invalid for
-%%% simple_one_for_one and simple_one_for_one_terminate supervisors.
+%%% simple_one_for_one supervisors.
handle_call({_Req, _Data}, _From, State) when ?is_simple(State) ->
{reply, {error, State#state.strategy}, State};
@@ -465,10 +459,20 @@ handle_call({delete_child, Name}, _From, State) ->
{reply, {error, not_found}, State}
end;
-handle_call(which_children, _From, State) when ?is_simple(State) ->
- [#child{child_type = CT, modules = Mods}] = State#state.children,
+handle_call(which_children, _From, #state{children = [#child{restart_type = temporary,
+ child_type = CT,
+ modules = Mods}]} =
+ State) when ?is_simple(State) ->
+ Reply = lists:map(fun(Pid) -> {undefined, Pid, CT, Mods} end,
+ ?SETS:to_list(dynamics_db(temporary, State#state.dynamics))),
+ {reply, Reply, State};
+
+handle_call(which_children, _From, #state{children = [#child{restart_type = RType,
+ child_type = CT,
+ modules = Mods}]} =
+ State) when ?is_simple(State) ->
Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end,
- ?DICT:to_list(State#state.dynamics)),
+ ?DICT:to_list(dynamics_db(RType, State#state.dynamics))),
{reply, Reply, State};
handle_call(which_children, _From, State) ->
@@ -589,13 +593,12 @@ handle_info(Msg, State) ->
%%
-spec terminate(term(), state()) -> 'ok'.
-terminate(_Reason, State) when ?is_terminate_simple(State) ->
- terminate_simple_children(
- hd(State#state.children), State#state.dynamics, State#state.name),
- ok;
+terminate(_Reason, #state{children=[Child]} = State) when ?is_simple(State) ->
+ terminate_dynamic_children(Child, dynamics_db(Child#child.restart_type,
+ State#state.dynamics),
+ State#state.name);
terminate(_Reason, State) ->
- terminate_children(State#state.children, State#state.name),
- ok.
+ terminate_children(State#state.children, State#state.name).
%%
%% Change code for the supervisor.
@@ -686,11 +689,9 @@ handle_start_child(Child, State) ->
false ->
case do_start_child(State#state.name, Child) of
{ok, Pid} ->
- Children = State#state.children,
- {{ok, Pid}, State#state{children = [Child#child{pid = Pid}|Children]}};
+ {{ok, Pid}, save_child(Child#child{pid = Pid}, State)};
{ok, Pid, Extra} ->
- Children = State#state.children,
- {{ok, Pid, Extra}, State#state{children = [Child#child{pid = Pid}|Children]}};
+ {{ok, Pid, Extra}, save_child(Child#child{pid = Pid}, State)};
{error, What} ->
{{error, {What, Child}}, State}
end;
@@ -705,16 +706,15 @@ handle_start_child(Child, State) ->
%%% Returns: {ok, state()} | {shutdown, state()}
%%% ---------------------------------------------------
-restart_child(Pid, Reason, State) when ?is_simple(State) ->
- case ?DICT:find(Pid, State#state.dynamics) of
+restart_child(Pid, Reason, #state{children = [Child]} = State) when ?is_simple(State) ->
+ RestartType = Child#child.restart_type,
+ case dynamic_child_args(Pid, dynamics_db(RestartType, State#state.dynamics)) of
{ok, Args} ->
- [Child] = State#state.children,
- RestartType = Child#child.restart_type,
{M, F, _} = Child#child.mfa,
NChild = Child#child{pid = Pid, mfa = {M, F, Args}},
do_restart(RestartType, Reason, NChild, State);
error ->
- {ok, State}
+ {ok, State}
end;
restart_child(Pid, Reason, State) ->
Children = State#state.children,
@@ -793,9 +793,7 @@ restart1(Child, State) ->
{terminate, State}
end.
-restart(Strategy, Child, State, Restart)
- when Strategy =:= simple_one_for_one orelse
- Strategy =:= simple_one_for_one_terminate ->
+restart(simple_one_for_one, Child, State, Restart) ->
#child{mfa = {M, F, A}} = Child,
Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics),
case do_start_child_i(M, F, A) of
@@ -859,12 +857,7 @@ terminate_children([], _SupName, Res) ->
Res.
terminate_simple_children(Child, Dynamics, SupName) ->
- Pids = dict:fold(fun (Pid, _Args, Pids) ->
- erlang:monitor(process, Pid),
- unlink(Pid),
- exit(Pid, child_exit_reason(Child)),
- [Pid | Pids]
- end, [], Dynamics),
+ Pids = monitor_children(Child, Dynamics),
TimeoutMsg = {timeout, make_ref()},
TRef = timeout_start(Child, TimeoutMsg),
{Replies, Timedout} =
@@ -898,6 +891,21 @@ terminate_simple_children(Child, Dynamics, SupName) ->
end || {Pid, Reply} <- Replies],
ok.
+monitor_children(Child=#child{restart_type=temporary}, Dynamics) ->
+ ?SETS:fold(fun (Pid, _Args, Pids) ->
+ erlang:monitor(process, Pid),
+ unlink(Pid),
+ exit(Pid, child_exit_reason(Child)),
+ [Pid | Pids]
+ end, [], Dynamics);
+monitor_children(Child, Dynamics) ->
+ dict:fold(fun (Pid, _Args, Pids) ->
+ erlang:monitor(process, Pid),
+ unlink(Pid),
+ exit(Pid, child_exit_reason(Child)),
+ [Pid | Pids]
+ end, [], Dynamics).
+
child_exit_reason(#child{shutdown = brutal_kill}) -> kill;
child_exit_reason(#child{}) -> shutdown.
@@ -1017,10 +1025,148 @@ monitor_child(Pid) ->
%%-----------------------------------------------------------------
+%% Func: terminate_dynamic_children/3
+%% Args: Child = child_rec()
+%% Dynamics = ?DICT() | ?SET()
+%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
+%% Returns: ok
+%%
+%%
+%% Shutdown all dynamic children. This happens when the supervisor is
+%% stopped. Because the supervisor can have millions of dynamic children, we
+%% can have an significative overhead here.
+%%-----------------------------------------------------------------
+terminate_dynamic_children(Child, Dynamics, SupName) ->
+ {Pids, EStack0} = monitor_dynamic_children(Child, Dynamics),
+ Sz = ?SETS:size(Pids),
+ EStack = case Child#child.shutdown of
+ brutal_kill ->
+ ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
+ wait_dynamic_children(Child, Pids, Sz, undefined, EStack0);
+ infinity ->
+ ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
+ wait_dynamic_children(Child, Pids, Sz, undefined, EStack0);
+ Time ->
+ ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
+ TRef = erlang:start_timer(Time, self(), kill),
+ wait_dynamic_children(Child, Pids, Sz, TRef, EStack0)
+ end,
+ %% Unroll stacked errors and report them
+ ?DICT:fold(fun(Reason, Ls, _) ->
+ report_error(shutdown_error, Reason,
+ Child#child{pid=Ls}, SupName)
+ end, ok, EStack).
+
+
+monitor_dynamic_children(#child{restart_type=temporary}, Dynamics) ->
+ ?SETS:fold(fun(P, {Pids, EStack}) ->
+ case monitor_child(P) of
+ ok ->
+ {?SETS:add_element(P, Pids), EStack};
+ {error, normal} ->
+ {Pids, EStack};
+ {error, Reason} ->
+ {Pids, ?DICT:append(Reason, P, EStack)}
+ end
+ end, {?SETS:new(), ?DICT:new()}, Dynamics);
+monitor_dynamic_children(#child{restart_type=RType}, Dynamics) ->
+ ?DICT:fold(fun(P, _, {Pids, EStack}) when is_pid(P) ->
+ case monitor_child(P) of
+ ok ->
+ {?SETS:add_element(P, Pids), EStack};
+ {error, normal} when RType =/= permanent ->
+ {Pids, EStack};
+ {error, Reason} ->
+ {Pids, ?DICT:append(Reason, P, EStack)}
+ end;
+ (?restarting(_), _, {Pids, EStack}) ->
+ {Pids, EStack}
+ end, {?SETS:new(), ?DICT:new()}, Dynamics).
+
+
+wait_dynamic_children(_Child, _Pids, 0, undefined, EStack) ->
+ EStack;
+wait_dynamic_children(_Child, _Pids, 0, TRef, EStack) ->
+ %% If the timer has expired before its cancellation, we must empty the
+ %% mail-box of the 'timeout'-message.
+ erlang:cancel_timer(TRef),
+ receive
+ {timeout, TRef, kill} ->
+ EStack
+ after 0 ->
+ EStack
+ end;
+wait_dynamic_children(#child{shutdown=brutal_kill} = Child, Pids, Sz,
+ TRef, EStack) ->
+ receive
+ {'DOWN', _MRef, process, Pid, killed} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, EStack);
+
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, ?DICT:append(Reason, Pid, EStack))
+ end;
+wait_dynamic_children(#child{restart_type=RType} = Child, Pids, Sz,
+ TRef, EStack) ->
+ receive
+ {'DOWN', _MRef, process, Pid, shutdown} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, EStack);
+
+ {'DOWN', _MRef, process, Pid, normal} when RType =/= permanent ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, EStack);
+
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, ?DICT:append(Reason, Pid, EStack));
+
+ {timeout, TRef, kill} ->
+ ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
+ wait_dynamic_children(Child, Pids, Sz-1, undefined, EStack)
+ end.
+
+%%-----------------------------------------------------------------
%% Child/State manipulating functions.
%%-----------------------------------------------------------------
-state_del_child(#child{pid = Pid}, State) when ?is_simple(State) ->
- NDynamics = ?DICT:erase(Pid, State#state.dynamics),
+
+%% Note we do not want to save the parameter list for temporary processes as
+%% they will not be restarted, and hence we do not need this information.
+%% Especially for dynamic children to simple_one_for_one supervisors
+%% it could become very costly as it is not uncommon to spawn
+%% very many such processes.
+save_child(#child{restart_type = temporary,
+ mfa = {M, F, _}} = Child, #state{children = Children} = State) ->
+ State#state{children = [Child#child{mfa = {M, F, undefined}} |Children]};
+save_child(Child, #state{children = Children} = State) ->
+ State#state{children = [Child |Children]}.
+
+save_dynamic_child(temporary, Pid, _, #state{dynamics = Dynamics} = State) ->
+ State#state{dynamics = ?SETS:add_element(Pid, dynamics_db(temporary, Dynamics))};
+save_dynamic_child(RestartType, Pid, Args, #state{dynamics = Dynamics} = State) ->
+ State#state{dynamics = ?DICT:store(Pid, Args, dynamics_db(RestartType, Dynamics))}.
+
+dynamics_db(temporary, undefined) ->
+ ?SETS:new();
+dynamics_db(_, undefined) ->
+ ?DICT:new();
+dynamics_db(_,Dynamics) ->
+ Dynamics.
+
+dynamic_child_args(Pid, Dynamics) ->
+ case ?SETS:is_set(Dynamics) of
+ true ->
+ {ok, undefined};
+ false ->
+ ?DICT:find(Pid, Dynamics)
+ end.
+
+state_del_child(#child{pid = Pid, restart_type = temporary}, State) when ?is_simple(State) ->
+ NDynamics = ?SETS:del_element(Pid, dynamics_db(temporary, State#state.dynamics)),
+ State#state{dynamics = NDynamics};
+state_del_child(#child{pid = Pid, restart_type = RType}, State) when ?is_simple(State) ->
+ NDynamics = ?DICT:erase(Pid, dynamics_db(RType, State#state.dynamics)),
State#state{dynamics = NDynamics};
state_del_child(Child, State) ->
NChildren = del_child(Child#child.name, State#state.children),
@@ -1051,6 +1197,7 @@ split_child(_, [], After) ->
get_child(Name, State) ->
lists:keysearch(Name, #child.name, State#state.children).
+
replace_child(Child, State) ->
Chs = do_replace_child(Child, State#state.children),
State#state{children = Chs}.
@@ -1098,7 +1245,6 @@ init_state1(SupName, {Strategy, MaxIntensity, Period}, Mod, Args) ->
init_state1(_SupName, Type, _, _) ->
{invalid_type, Type}.
-validStrategy(simple_one_for_one_terminate) -> true;
validStrategy(simple_one_for_one) -> true;
validStrategy(one_for_one) -> true;
validStrategy(one_for_all) -> true;
diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl
index e42ded7b..ff1a7f3c 100644
--- a/src/supervisor2_tests.erl
+++ b/src/supervisor2_tests.erl
@@ -65,6 +65,6 @@ init([Timeout]) ->
[{local, ?MODULE}, ?MODULE, []]},
transient, Timeout, supervisor, [?MODULE]}]}};
init([]) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{test_worker, {?MODULE, start_link, []},
temporary, 1000, worker, [?MODULE]}]}}.
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 7f4b5049..6a56e64a 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -34,7 +34,7 @@
%%----------------------------------------------------------------------------
test_supervisor_delayed_restart() ->
- passed = with_sup(simple_one_for_one_terminate,
+ passed = with_sup(simple_one_for_one,
fun (SupPid) ->
{ok, _ChildPid} =
supervisor2:start_child(SupPid, []),