summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-04-16 17:40:58 +0100
committerMatthias Radestock <matthias@lshift.net>2010-04-16 17:40:58 +0100
commitc8c21956bfe685eb9f9b57b955c2ce91d86b4d71 (patch)
tree5a0a12a59e53fe438a8bee90fc48e2f4274dd41f
parent39fdd3039cf23a323df9136f94c4266cc602922f (diff)
parentd8314cdeb215f33d25092d37903be91d94ad3459 (diff)
downloadrabbitmq-server-c8c21956bfe685eb9f9b57b955c2ce91d86b4d71.tar.gz
merge default into bug22616
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_amqqueue_sup.erl12
-rw-r--r--src/rabbit_memory_monitor.erl293
-rw-r--r--src/supervisor2.erl917
4 files changed, 1224 insertions, 5 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 259ac040..733a7a88 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -91,6 +91,13 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
+-rabbit_boot_step({rabbit_memory_monitor,
+ [{description, "memory moniter"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_memory_monitor]}},
+ {requires, rabbit_alarm},
+ {enables, core_initialized}]}).
+
-rabbit_boot_step({rabbit_router,
[{description, "cluster router"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index dbd65780..97d6cef9 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -31,21 +31,23 @@
-module(rabbit_amqqueue_sup).
--behaviour(supervisor).
+-behaviour(supervisor2).
-export([start_link/0, start_child/1]).
-export([init/1]).
+-include("rabbit.hrl").
+
-define(SERVER, ?MODULE).
start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+ supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
start_child(Args) ->
- supervisor:start_child(?SERVER, Args).
+ supervisor2:start_child(?SERVER, Args).
init([]) ->
- {ok, {{simple_one_for_one, 10, 10},
+ {ok, {{simple_one_for_one_terminate, 10, 10},
[{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []},
- temporary, brutal_kill, worker, [rabbit_amqqueue_process]}]}}.
+ temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}.
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
new file mode 100644
index 00000000..91e97ffe
--- /dev/null
+++ b/src/rabbit_memory_monitor.erl
@@ -0,0 +1,293 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+
+%% This module handles the node-wide memory statistics.
+%% It receives statistics from all queues, counts the desired
+%% queue length (in seconds), and sends this information back to
+%% queues.
+
+-module(rabbit_memory_monitor).
+
+-behaviour(gen_server2).
+
+-export([start_link/0, update/0, register/2, deregister/1,
+ report_ram_duration/2, stop/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(process, {pid, reported, sent, callback, monitor}).
+
+-record(state, {timer, %% 'internal_update' timer
+ queue_durations, %% ets #process
+ queue_duration_sum, %% sum of all queue_durations
+ queue_duration_count, %% number of elements in sum
+ memory_limit, %% how much memory we intend to use
+ desired_duration %% the desired queue duration
+ }).
+
+-define(SERVER, ?MODULE).
+-define(DEFAULT_UPDATE_INTERVAL, 2500).
+-define(TABLE_NAME, ?MODULE).
+
+%% Because we have a feedback loop here, we need to ensure that we
+%% have some space for when the queues don't quite respond as fast as
+%% we would like, or when there is buffering going on in other parts
+%% of the system. In short, we aim to stay some distance away from
+%% when the memory alarms will go off, which cause channel.flow.
+%% Note that all other Thresholds are relative to this scaling.
+-define(MEMORY_LIMIT_SCALING, 0.4).
+
+-define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this
+
+%% If all queues are pushed to disk (duration 0), then the sum of
+%% their reported lengths will be 0. If memory then becomes available,
+%% unless we manually intervene, the sum will remain 0, and the queues
+%% will never get a non-zero duration. Thus when the mem use is <
+%% SUM_INC_THRESHOLD, increase the sum artificially by SUM_INC_AMOUNT.
+-define(SUM_INC_THRESHOLD, 0.95).
+-define(SUM_INC_AMOUNT, 1.0).
+
+%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use.
+-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).
+
+-define(EPSILON, 0.000001). %% less than this and we clamp to 0
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
+-spec(update/0 :: () -> 'ok').
+-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
+-spec(deregister/1 :: (pid()) -> 'ok').
+-spec(report_ram_duration/2 :: (pid(), float() | 'infinity') -> number()).
+-spec(stop/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+update() ->
+ gen_server2:cast(?SERVER, update).
+
+register(Pid, MFA = {_M, _F, _A}) ->
+ gen_server2:call(?SERVER, {register, Pid, MFA}, infinity).
+
+deregister(Pid) ->
+ gen_server2:cast(?SERVER, {deregister, Pid}).
+
+report_ram_duration(Pid, QueueDuration) ->
+ gen_server2:call(?SERVER,
+ {report_ram_duration, Pid, QueueDuration}, infinity).
+
+stop() ->
+ gen_server2:cast(?SERVER, stop).
+
+%%----------------------------------------------------------------------------
+%% Gen_server callbacks
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ MemoryLimit = trunc(?MEMORY_LIMIT_SCALING *
+ (try
+ vm_memory_monitor:get_memory_limit()
+ catch
+ exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM
+ end)),
+
+ {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
+ ?SERVER, update, []),
+
+ Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),
+
+ {ok, internal_update(
+ #state { timer = TRef,
+ queue_durations = Ets,
+ queue_duration_sum = 0.0,
+ queue_duration_count = 0,
+ memory_limit = MemoryLimit,
+ desired_duration = infinity })}.
+
+handle_call({report_ram_duration, Pid, QueueDuration}, From,
+ State = #state { queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations,
+ desired_duration = SendDuration }) ->
+
+ [Proc = #process { reported = PrevQueueDuration }] =
+ ets:lookup(Durations, Pid),
+
+ gen_server2:reply(From, SendDuration),
+
+ {Sum1, Count1} =
+ case {PrevQueueDuration, QueueDuration} of
+ {infinity, infinity} -> {Sum, Count};
+ {infinity, _} -> {Sum + QueueDuration, Count + 1};
+ {_, infinity} -> {Sum - PrevQueueDuration, Count - 1};
+ {_, _} -> {Sum - PrevQueueDuration + QueueDuration,
+ Count}
+ end,
+ true = ets:insert(Durations, Proc #process { reported = QueueDuration,
+ sent = SendDuration }),
+ {noreply, State #state { queue_duration_sum = zero_clamp(Sum1),
+ queue_duration_count = Count1 }};
+
+handle_call({register, Pid, MFA}, _From,
+ State = #state { queue_durations = Durations }) ->
+ MRef = erlang:monitor(process, Pid),
+ true = ets:insert(Durations, #process { pid = Pid, reported = infinity,
+ sent = infinity, callback = MFA,
+ monitor = MRef }),
+ {reply, ok, State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_cast({deregister, Pid}, State) ->
+ {noreply, internal_deregister(Pid, true, State)};
+
+handle_cast(stop, State) ->
+ {stop, normal, State};
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
+ {noreply, internal_deregister(Pid, false, State)};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #state { timer = TRef }) ->
+ timer:cancel(TRef),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%%----------------------------------------------------------------------------
+%% Internal functions
+%%----------------------------------------------------------------------------
+
+zero_clamp(Sum) ->
+ case Sum < ?EPSILON of
+ true -> 0.0;
+ false -> Sum
+ end.
+
+internal_deregister(Pid, Demonitor,
+ State = #state { queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations }) ->
+ case ets:lookup(Durations, Pid) of
+ [] -> State;
+ [#process { reported = PrevQueueDuration, monitor = MRef }] ->
+ true = case Demonitor of
+ true -> erlang:demonitor(MRef);
+ false -> true
+ end,
+ {Sum1, Count1} =
+ case PrevQueueDuration of
+ infinity -> {Sum, Count};
+ _ -> {zero_clamp(Sum - PrevQueueDuration),
+ Count - 1}
+ end,
+ true = ets:delete(Durations, Pid),
+ State #state { queue_duration_sum = Sum1,
+ queue_duration_count = Count1 }
+ end.
+
+internal_update(State = #state { memory_limit = Limit,
+ queue_durations = Durations,
+ desired_duration = DesiredDurationAvg,
+ queue_duration_sum = Sum,
+ queue_duration_count = Count }) ->
+ MemoryRatio = erlang:memory(total) / Limit,
+ DesiredDurationAvg1 =
+ case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of
+ true ->
+ infinity;
+ false ->
+ Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of
+ true -> Sum + ?SUM_INC_AMOUNT;
+ false -> Sum
+ end,
+ (Sum1 / Count) / MemoryRatio
+ end,
+ State1 = State #state { desired_duration = DesiredDurationAvg1 },
+
+ %% only inform queues immediately if the desired duration has
+ %% decreased
+ case DesiredDurationAvg1 == infinity orelse
+ (DesiredDurationAvg /= infinity andalso
+ DesiredDurationAvg1 >= DesiredDurationAvg) of
+ true ->
+ ok;
+ false ->
+ true =
+ ets:foldl(
+ fun (Proc = #process { reported = QueueDuration,
+ sent = PrevSendDuration,
+ callback = {M, F, A} }, true) ->
+ case (case {QueueDuration, PrevSendDuration} of
+ {infinity, infinity} ->
+ true;
+ {infinity, D} ->
+ DesiredDurationAvg1 < D;
+ {D, infinity} ->
+ DesiredDurationAvg1 < D;
+ {D1, D2} ->
+ DesiredDurationAvg1 <
+ lists:min([D1,D2])
+ end) of
+ true ->
+ ok = erlang:apply(
+ M, F, A ++ [DesiredDurationAvg1]),
+ ets:insert(
+ Durations,
+ Proc #process {sent = DesiredDurationAvg1});
+ false ->
+ true
+ end
+ end, true, Durations)
+ end,
+ State1.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
new file mode 100644
index 00000000..55753512
--- /dev/null
+++ b/src/supervisor2.erl
@@ -0,0 +1,917 @@
+%% This file is a copy of supervisor.erl from the R13B-3 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) the module name is supervisor2
+%%
+%% 2) there is a new strategy called
+%% simple_one_for_one_terminate. This is exactly the same as for
+%% simple_one_for_one, except that children *are* explicitly
+%% terminated as per the shutdown component of the child_spec.
+%%
+%% All modifications are (C) 2010 LShift Ltd.
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(supervisor2).
+
+-behaviour(gen_server).
+
+%% External exports
+-export([start_link/2,start_link/3,
+ start_child/2, restart_child/2,
+ delete_child/2, terminate_child/2,
+ which_children/1,
+ check_childspecs/1]).
+
+-export([behaviour_info/1]).
+
+%% Internal exports
+-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]).
+-export([handle_cast/2]).
+
+-define(DICT, dict).
+
+-record(state, {name,
+ strategy,
+ children = [],
+ dynamics = ?DICT:new(),
+ intensity,
+ period,
+ restarts = [],
+ module,
+ args}).
+
+-record(child, {pid = undefined, % pid is undefined when child is not running
+ name,
+ mfa,
+ restart_type,
+ shutdown,
+ child_type,
+ modules = []}).
+
+-define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse
+ State#state.strategy =:= simple_one_for_one_terminate).
+-define(is_terminate_simple(State),
+ State#state.strategy =:= simple_one_for_one_terminate).
+
+behaviour_info(callbacks) ->
+ [{init,1}];
+behaviour_info(_Other) ->
+ undefined.
+
+%%% ---------------------------------------------------
+%%% This is a general process supervisor built upon gen_server.erl.
+%%% Servers/processes should/could also be built using gen_server.erl.
+%%% SupName = {local, atom()} | {global, atom()}.
+%%% ---------------------------------------------------
+start_link(Mod, Args) ->
+ gen_server:start_link(?MODULE, {self, Mod, Args}, []).
+
+start_link(SupName, Mod, Args) ->
+ gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
+
+%%% ---------------------------------------------------
+%%% Interface functions.
+%%% ---------------------------------------------------
+start_child(Supervisor, ChildSpec) ->
+ call(Supervisor, {start_child, ChildSpec}).
+
+restart_child(Supervisor, Name) ->
+ call(Supervisor, {restart_child, Name}).
+
+delete_child(Supervisor, Name) ->
+ call(Supervisor, {delete_child, Name}).
+
+%%-----------------------------------------------------------------
+%% Func: terminate_child/2
+%% Returns: ok | {error, Reason}
+%% Note that the child is *always* terminated in some
+%% way (maybe killed).
+%%-----------------------------------------------------------------
+terminate_child(Supervisor, Name) ->
+ call(Supervisor, {terminate_child, Name}).
+
+which_children(Supervisor) ->
+ call(Supervisor, which_children).
+
+call(Supervisor, Req) ->
+ gen_server:call(Supervisor, Req, infinity).
+
+check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
+ case check_startspec(ChildSpecs) of
+ {ok, _} -> ok;
+ Error -> {error, Error}
+ end;
+check_childspecs(X) -> {error, {badarg, X}}.
+
+%%% ---------------------------------------------------
+%%%
+%%% Initialize the supervisor.
+%%%
+%%% ---------------------------------------------------
+init({SupName, Mod, Args}) ->
+ process_flag(trap_exit, true),
+ case Mod:init(Args) of
+ {ok, {SupFlags, StartSpec}} ->
+ case init_state(SupName, SupFlags, Mod, Args) of
+ {ok, State} when ?is_simple(State) ->
+ init_dynamic(State, StartSpec);
+ {ok, State} ->
+ init_children(State, StartSpec);
+ Error ->
+ {stop, {supervisor_data, Error}}
+ end;
+ ignore ->
+ ignore;
+ Error ->
+ {stop, {bad_return, {Mod, init, Error}}}
+ end.
+
+init_children(State, StartSpec) ->
+ SupName = State#state.name,
+ case check_startspec(StartSpec) of
+ {ok, Children} ->
+ case start_children(Children, SupName) of
+ {ok, NChildren} ->
+ {ok, State#state{children = NChildren}};
+ {error, NChildren} ->
+ terminate_children(NChildren, SupName),
+ {stop, shutdown}
+ end;
+ Error ->
+ {stop, {start_spec, Error}}
+ end.
+
+init_dynamic(State, [StartSpec]) ->
+ case check_startspec([StartSpec]) of
+ {ok, Children} ->
+ {ok, State#state{children = Children}};
+ Error ->
+ {stop, {start_spec, Error}}
+ end;
+init_dynamic(_State, StartSpec) ->
+ {stop, {bad_start_spec, StartSpec}}.
+
+%%-----------------------------------------------------------------
+%% Func: start_children/2
+%% Args: Children = [#child] in start order
+%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
+%% Purpose: Start all children. The new list contains #child's
+%% with pids.
+%% Returns: {ok, NChildren} | {error, NChildren}
+%% NChildren = [#child] in termination order (reversed
+%% start order)
+%%-----------------------------------------------------------------
+start_children(Children, SupName) -> start_children(Children, [], SupName).
+
+start_children([Child|Chs], NChildren, SupName) ->
+ case do_start_child(SupName, Child) of
+ {ok, Pid} ->
+ start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName);
+ {ok, Pid, _Extra} ->
+ start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName);
+ {error, Reason} ->
+ report_error(start_error, Reason, Child, SupName),
+ {error, lists:reverse(Chs) ++ [Child | NChildren]}
+ end;
+start_children([], NChildren, _SupName) ->
+ {ok, NChildren}.
+
+do_start_child(SupName, Child) ->
+ #child{mfa = {M, F, A}} = Child,
+ case catch apply(M, F, A) of
+ {ok, Pid} when is_pid(Pid) ->
+ NChild = Child#child{pid = Pid},
+ report_progress(NChild, SupName),
+ {ok, Pid};
+ {ok, Pid, Extra} when is_pid(Pid) ->
+ NChild = Child#child{pid = Pid},
+ report_progress(NChild, SupName),
+ {ok, Pid, Extra};
+ ignore ->
+ {ok, undefined};
+ {error, What} -> {error, What};
+ What -> {error, What}
+ end.
+
+do_start_child_i(M, F, A) ->
+ case catch apply(M, F, A) of
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid};
+ {ok, Pid, Extra} when is_pid(Pid) ->
+ {ok, Pid, Extra};
+ ignore ->
+ {ok, undefined};
+ {error, Error} ->
+ {error, Error};
+ What ->
+ {error, What}
+ end.
+
+
+%%% ---------------------------------------------------
+%%%
+%%% Callback functions.
+%%%
+%%% ---------------------------------------------------
+handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
+ #child{mfa = {M, F, A}} = hd(State#state.children),
+ Args = A ++ EArgs,
+ case do_start_child_i(M, F, Args) of
+ {ok, Pid} ->
+ NState = State#state{dynamics =
+ ?DICT:store(Pid, Args, State#state.dynamics)},
+ {reply, {ok, Pid}, NState};
+ {ok, Pid, Extra} ->
+ NState = State#state{dynamics =
+ ?DICT:store(Pid, Args, State#state.dynamics)},
+ {reply, {ok, Pid, Extra}, NState};
+ What ->
+ {reply, What, State}
+ end;
+
+%%% The requests terminate_child, delete_child and restart_child are
+%%% invalid for simple_one_for_one and simple_one_for_one_terminate
+%%% supervisors.
+handle_call({_Req, _Data}, _From, State) when ?is_simple(State) ->
+ {reply, {error, State#state.strategy}, State};
+
+handle_call({start_child, ChildSpec}, _From, State) ->
+ case check_childspec(ChildSpec) of
+ {ok, Child} ->
+ {Resp, NState} = handle_start_child(Child, State),
+ {reply, Resp, NState};
+ What ->
+ {reply, {error, What}, State}
+ end;
+
+handle_call({restart_child, Name}, _From, State) ->
+ case get_child(Name, State) of
+ {value, Child} when Child#child.pid =:= undefined ->
+ case do_start_child(State#state.name, Child) of
+ {ok, Pid} ->
+ NState = replace_child(Child#child{pid = Pid}, State),
+ {reply, {ok, Pid}, NState};
+ {ok, Pid, Extra} ->
+ NState = replace_child(Child#child{pid = Pid}, State),
+ {reply, {ok, Pid, Extra}, NState};
+ Error ->
+ {reply, Error, State}
+ end;
+ {value, _} ->
+ {reply, {error, running}, State};
+ _ ->
+ {reply, {error, not_found}, State}
+ end;
+
+handle_call({delete_child, Name}, _From, State) ->
+ case get_child(Name, State) of
+ {value, Child} when Child#child.pid =:= undefined ->
+ NState = remove_child(Child, State),
+ {reply, ok, NState};
+ {value, _} ->
+ {reply, {error, running}, State};
+ _ ->
+ {reply, {error, not_found}, State}
+ end;
+
+handle_call({terminate_child, Name}, _From, State) ->
+ case get_child(Name, State) of
+ {value, Child} ->
+ NChild = do_terminate(Child, State#state.name),
+ {reply, ok, replace_child(NChild, State)};
+ _ ->
+ {reply, {error, not_found}, State}
+ end;
+
+handle_call(which_children, _From, State) when ?is_simple(State) ->
+ [#child{child_type = CT, modules = Mods}] = State#state.children,
+ Reply = lists:map(fun({Pid, _}) -> {undefined, Pid, CT, Mods} end,
+ ?DICT:to_list(State#state.dynamics)),
+ {reply, Reply, State};
+
+handle_call(which_children, _From, State) ->
+ Resp =
+ lists:map(fun(#child{pid = Pid, name = Name,
+ child_type = ChildType, modules = Mods}) ->
+ {Name, Pid, ChildType, Mods}
+ end,
+ State#state.children),
+ {reply, Resp, State}.
+
+
+%%% Hopefully cause a function-clause as there is no API function
+%%% that utilizes cast.
+handle_cast(null, State) ->
+ error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
+ []),
+
+ {noreply, State}.
+
+%%
+%% Take care of terminated children.
+%%
+handle_info({'EXIT', Pid, Reason}, State) ->
+ case restart_child(Pid, Reason, State) of
+ {ok, State1} ->
+ {noreply, State1};
+ {shutdown, State1} ->
+ {stop, shutdown, State1}
+ end;
+
+handle_info(Msg, State) ->
+ error_logger:error_msg("Supervisor received unexpected message: ~p~n",
+ [Msg]),
+ {noreply, State}.
+%%
+%% Terminate this server.
+%%
+terminate(_Reason, State) when ?is_terminate_simple(State) ->
+ terminate_simple_children(
+ hd(State#state.children), State#state.dynamics, State#state.name),
+ ok;
+terminate(_Reason, State) ->
+ terminate_children(State#state.children, State#state.name),
+ ok.
+
+%%
+%% Change code for the supervisor.
+%% Call the new call-back module and fetch the new start specification.
+%% Combine the new spec. with the old. If the new start spec. is
+%% not valid the code change will not succeed.
+%% Use the old Args as argument to Module:init/1.
+%% NOTE: This requires that the init function of the call-back module
+%% does not have any side effects.
+%%
+code_change(_, State, _) ->
+ case (State#state.module):init(State#state.args) of
+ {ok, {SupFlags, StartSpec}} ->
+ case catch check_flags(SupFlags) of
+ ok ->
+ {Strategy, MaxIntensity, Period} = SupFlags,
+ update_childspec(State#state{strategy = Strategy,
+ intensity = MaxIntensity,
+ period = Period},
+ StartSpec);
+ Error ->
+ {error, Error}
+ end;
+ ignore ->
+ {ok, State};
+ Error ->
+ Error
+ end.
+
+check_flags({Strategy, MaxIntensity, Period}) ->
+ validStrategy(Strategy),
+ validIntensity(MaxIntensity),
+ validPeriod(Period),
+ ok;
+check_flags(What) ->
+ {bad_flags, What}.
+
+update_childspec(State, StartSpec) when ?is_simple(State) ->
+ case check_startspec(StartSpec) of
+ {ok, [Child]} ->
+ {ok, State#state{children = [Child]}};
+ Error ->
+ {error, Error}
+ end;
+
+update_childspec(State, StartSpec) ->
+ case check_startspec(StartSpec) of
+ {ok, Children} ->
+ OldC = State#state.children, % In reverse start order !
+ NewC = update_childspec1(OldC, Children, []),
+ {ok, State#state{children = NewC}};
+ Error ->
+ {error, Error}
+ end.
+
+update_childspec1([Child|OldC], Children, KeepOld) ->
+ case update_chsp(Child, Children) of
+ {ok,NewChildren} ->
+ update_childspec1(OldC, NewChildren, KeepOld);
+ false ->
+ update_childspec1(OldC, Children, [Child|KeepOld])
+ end;
+update_childspec1([], Children, KeepOld) ->
+ % Return them in (keeped) reverse start order.
+ lists:reverse(Children ++ KeepOld).
+
+update_chsp(OldCh, Children) ->
+ case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name ->
+ Ch#child{pid = OldCh#child.pid};
+ (Ch) ->
+ Ch
+ end,
+ Children) of
+ Children ->
+ false; % OldCh not found in new spec.
+ NewC ->
+ {ok, NewC}
+ end.
+
+%%% ---------------------------------------------------
+%%% Start a new child.
+%%% ---------------------------------------------------
+
+handle_start_child(Child, State) ->
+ case get_child(Child#child.name, State) of
+ false ->
+ case do_start_child(State#state.name, Child) of
+ {ok, Pid} ->
+ Children = State#state.children,
+ {{ok, Pid},
+ State#state{children =
+ [Child#child{pid = Pid}|Children]}};
+ {ok, Pid, Extra} ->
+ Children = State#state.children,
+ {{ok, Pid, Extra},
+ State#state{children =
+ [Child#child{pid = Pid}|Children]}};
+ {error, What} ->
+ {{error, {What, Child}}, State}
+ end;
+ {value, OldChild} when OldChild#child.pid =/= undefined ->
+ {{error, {already_started, OldChild#child.pid}}, State};
+ {value, _OldChild} ->
+ {{error, already_present}, State}
+ end.
+
+%%% ---------------------------------------------------
+%%% Restart. A process has terminated.
+%%% Returns: {ok, #state} | {shutdown, #state}
+%%% ---------------------------------------------------
+
+restart_child(Pid, Reason, State) when ?is_simple(State) ->
+ case ?DICT:find(Pid, State#state.dynamics) of
+ {ok, Args} ->
+ [Child] = State#state.children,
+ RestartType = Child#child.restart_type,
+ {M, F, _} = Child#child.mfa,
+ NChild = Child#child{pid = Pid, mfa = {M, F, Args}},
+ do_restart(RestartType, Reason, NChild, State);
+ error ->
+ {ok, State}
+ end;
+restart_child(Pid, Reason, State) ->
+ Children = State#state.children,
+ case lists:keysearch(Pid, #child.pid, Children) of
+ {value, Child} ->
+ RestartType = Child#child.restart_type,
+ do_restart(RestartType, Reason, Child, State);
+ _ ->
+ {ok, State}
+ end.
+
+do_restart(permanent, Reason, Child, State) ->
+ report_error(child_terminated, Reason, Child, State#state.name),
+ restart(Child, State);
+do_restart(_, normal, Child, State) ->
+ NState = state_del_child(Child, State),
+ {ok, NState};
+do_restart(_, shutdown, Child, State) ->
+ NState = state_del_child(Child, State),
+ {ok, NState};
+do_restart(transient, Reason, Child, State) ->
+ report_error(child_terminated, Reason, Child, State#state.name),
+ restart(Child, State);
+do_restart(temporary, Reason, Child, State) ->
+ report_error(child_terminated, Reason, Child, State#state.name),
+ NState = state_del_child(Child, State),
+ {ok, NState}.
+
+restart(Child, State) ->
+ case add_restart(State) of
+ {ok, NState} ->
+ restart(NState#state.strategy, Child, NState);
+ {terminate, NState} ->
+ report_error(shutdown, reached_max_restart_intensity,
+ Child, State#state.name),
+ {shutdown, remove_child(Child, NState)}
+ end.
+
+restart(Strategy, Child, State)
+ when Strategy =:= simple_one_for_one orelse
+ Strategy =:= simple_one_for_one_terminate ->
+ #child{mfa = {M, F, A}} = Child,
+ Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics),
+ case do_start_child_i(M, F, A) of
+ {ok, Pid} ->
+ NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
+ {ok, NState};
+ {ok, Pid, _Extra} ->
+ NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
+ {ok, NState};
+ {error, Error} ->
+ report_error(start_error, Error, Child, State#state.name),
+ restart(Child, State)
+ end;
+restart(one_for_one, Child, State) ->
+ case do_start_child(State#state.name, Child) of
+ {ok, Pid} ->
+ NState = replace_child(Child#child{pid = Pid}, State),
+ {ok, NState};
+ {ok, Pid, _Extra} ->
+ NState = replace_child(Child#child{pid = Pid}, State),
+ {ok, NState};
+ {error, Reason} ->
+ report_error(start_error, Reason, Child, State#state.name),
+ restart(Child, State)
+ end;
+restart(rest_for_one, Child, State) ->
+ {ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children),
+ ChAfter2 = terminate_children(ChAfter, State#state.name),
+ case start_children(ChAfter2, State#state.name) of
+ {ok, ChAfter3} ->
+ {ok, State#state{children = ChAfter3 ++ ChBefore}};
+ {error, ChAfter3} ->
+ restart(Child, State#state{children = ChAfter3 ++ ChBefore})
+ end;
+restart(one_for_all, Child, State) ->
+ Children1 = del_child(Child#child.pid, State#state.children),
+ Children2 = terminate_children(Children1, State#state.name),
+ case start_children(Children2, State#state.name) of
+ {ok, NChs} ->
+ {ok, State#state{children = NChs}};
+ {error, NChs} ->
+ restart(Child, State#state{children = NChs})
+ end.
+
+%%-----------------------------------------------------------------
+%% Func: terminate_children/2
+%% Args: Children = [#child] in termination order
+%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
+%% Returns: NChildren = [#child] in
+%% startup order (reversed termination order)
+%%-----------------------------------------------------------------
+terminate_children(Children, SupName) ->
+ terminate_children(Children, SupName, []).
+
+terminate_children([Child | Children], SupName, Res) ->
+ NChild = do_terminate(Child, SupName),
+ terminate_children(Children, SupName, [NChild | Res]);
+terminate_children([], _SupName, Res) ->
+ Res.
+
+terminate_simple_children(Child, Dynamics, SupName) ->
+ dict:fold(fun (Pid, _Args, _Any) ->
+ do_terminate(Child#child{pid = Pid}, SupName)
+ end, ok, Dynamics),
+ ok.
+
+do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
+ case shutdown(Child#child.pid,
+ Child#child.shutdown) of
+ ok ->
+ Child#child{pid = undefined};
+ {error, OtherReason} ->
+ report_error(shutdown_error, OtherReason, Child, SupName),
+ Child#child{pid = undefined}
+ end;
+do_terminate(Child, _SupName) ->
+ Child.
+
+%%-----------------------------------------------------------------
+%% Shutdowns a child. We must check the EXIT value
+%% of the child, because it might have died with another reason than
+%% the wanted. In that case we want to report the error. We put a
+%% monitor on the child an check for the 'DOWN' message instead of
+%% checking for the 'EXIT' message, because if we check the 'EXIT'
+%% message a "naughty" child, who does unlink(Sup), could hang the
+%% supervisor.
+%% Returns: ok | {error, OtherReason} (this should be reported)
+%%-----------------------------------------------------------------
+shutdown(Pid, brutal_kill) ->
+
+ case monitor_child(Pid) of
+ ok ->
+ exit(Pid, kill),
+ receive
+ {'DOWN', _MRef, process, Pid, killed} ->
+ ok;
+ {'DOWN', _MRef, process, Pid, OtherReason} ->
+ {error, OtherReason}
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end;
+
+shutdown(Pid, Time) ->
+
+ case monitor_child(Pid) of
+ ok ->
+ exit(Pid, shutdown), %% Try to shutdown gracefully
+ receive
+ {'DOWN', _MRef, process, Pid, shutdown} ->
+ ok;
+ {'DOWN', _MRef, process, Pid, OtherReason} ->
+ {error, OtherReason}
+ after Time ->
+ exit(Pid, kill), %% Force termination.
+ receive
+ {'DOWN', _MRef, process, Pid, OtherReason} ->
+ {error, OtherReason}
+ end
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+%% Help function to shutdown/2 switches from link to monitor approach
+monitor_child(Pid) ->
+
+ %% Do the monitor operation first so that if the child dies
+ %% before the monitoring is done causing a 'DOWN'-message with
+ %% reason noproc, we will get the real reason in the 'EXIT'-message
+ %% unless a naughty child has already done unlink...
+ erlang:monitor(process, Pid),
+ unlink(Pid),
+
+ receive
+ %% If the child dies before the unlik we must empty
+ %% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
+ {'EXIT', Pid, Reason} ->
+ receive
+ {'DOWN', _, process, Pid, _} ->
+ {error, Reason}
+ end
+ after 0 ->
+ %% If a naughty child did unlink and the child dies before
+ %% monitor the result will be that shutdown/2 receives a
+ %% 'DOWN'-message with reason noproc.
+ %% If the child should die after the unlink there
+ %% will be a 'DOWN'-message with a correct reason
+ %% that will be handled in shutdown/2.
+ ok
+ end.
+
+
+%%-----------------------------------------------------------------
+%% Child/State manipulating functions.
+%%-----------------------------------------------------------------
+state_del_child(#child{pid = Pid}, State) when ?is_simple(State) ->
+ NDynamics = ?DICT:erase(Pid, State#state.dynamics),
+ State#state{dynamics = NDynamics};
+state_del_child(Child, State) ->
+ NChildren = del_child(Child#child.name, State#state.children),
+ State#state{children = NChildren}.
+
+del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name ->
+ [Ch#child{pid = undefined} | Chs];
+del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid ->
+ [Ch#child{pid = undefined} | Chs];
+del_child(Name, [Ch|Chs]) ->
+ [Ch|del_child(Name, Chs)];
+del_child(_, []) ->
+ [].
+
+%% Chs = [S4, S3, Ch, S1, S0]
+%% Ret: {[S4, S3, Ch], [S1, S0]}
+split_child(Name, Chs) ->
+ split_child(Name, Chs, []).
+
+split_child(Name, [Ch|Chs], After) when Ch#child.name =:= Name ->
+ {lists:reverse([Ch#child{pid = undefined} | After]), Chs};
+split_child(Pid, [Ch|Chs], After) when Ch#child.pid =:= Pid ->
+ {lists:reverse([Ch#child{pid = undefined} | After]), Chs};
+split_child(Name, [Ch|Chs], After) ->
+ split_child(Name, Chs, [Ch | After]);
+split_child(_, [], After) ->
+ {lists:reverse(After), []}.
+
+get_child(Name, State) ->
+ lists:keysearch(Name, #child.name, State#state.children).
+replace_child(Child, State) ->
+ Chs = do_replace_child(Child, State#state.children),
+ State#state{children = Chs}.
+
+do_replace_child(Child, [Ch|Chs]) when Ch#child.name =:= Child#child.name ->
+ [Child | Chs];
+do_replace_child(Child, [Ch|Chs]) ->
+ [Ch|do_replace_child(Child, Chs)].
+
+remove_child(Child, State) ->
+ Chs = lists:keydelete(Child#child.name, #child.name, State#state.children),
+ State#state{children = Chs}.
+
+%%-----------------------------------------------------------------
+%% Func: init_state/4
+%% Args: SupName = {local, atom()} | {global, atom()} | self
+%% Type = {Strategy, MaxIntensity, Period}
+%% Strategy = one_for_one | one_for_all | simple_one_for_one |
+%% rest_for_one
+%% MaxIntensity = integer()
+%% Period = integer()
+%% Mod :== atom()
+%% Arsg :== term()
+%% Purpose: Check that Type is of correct type (!)
+%% Returns: {ok, #state} | Error
+%%-----------------------------------------------------------------
+init_state(SupName, Type, Mod, Args) ->
+ case catch init_state1(SupName, Type, Mod, Args) of
+ {ok, State} ->
+ {ok, State};
+ Error ->
+ Error
+ end.
+
+init_state1(SupName, {Strategy, MaxIntensity, Period}, Mod, Args) ->
+ validStrategy(Strategy),
+ validIntensity(MaxIntensity),
+ validPeriod(Period),
+ {ok, #state{name = supname(SupName,Mod),
+ strategy = Strategy,
+ intensity = MaxIntensity,
+ period = Period,
+ module = Mod,
+ args = Args}};
+init_state1(_SupName, Type, _, _) ->
+ {invalid_type, Type}.
+
+validStrategy(simple_one_for_one_terminate) -> true;
+validStrategy(simple_one_for_one) -> true;
+validStrategy(one_for_one) -> true;
+validStrategy(one_for_all) -> true;
+validStrategy(rest_for_one) -> true;
+validStrategy(What) -> throw({invalid_strategy, What}).
+
+validIntensity(Max) when is_integer(Max),
+ Max >= 0 -> true;
+validIntensity(What) -> throw({invalid_intensity, What}).
+
+validPeriod(Period) when is_integer(Period),
+ Period > 0 -> true;
+validPeriod(What) -> throw({invalid_period, What}).
+
+supname(self,Mod) -> {self(),Mod};
+supname(N,_) -> N.
+
+%%% ------------------------------------------------------
+%%% Check that the children start specification is valid.
+%%% Shall be a six (6) tuple
+%%% {Name, Func, RestartType, Shutdown, ChildType, Modules}
+%%% where Name is an atom
+%%% Func is {Mod, Fun, Args} == {atom, atom, list}
+%%% RestartType is permanent | temporary | transient
+%%% Shutdown = integer() | infinity | brutal_kill
+%%% ChildType = supervisor | worker
+%%% Modules = [atom()] | dynamic
+%%% Returns: {ok, [#child]} | Error
+%%% ------------------------------------------------------
+
+check_startspec(Children) -> check_startspec(Children, []).
+
+check_startspec([ChildSpec|T], Res) ->
+ case check_childspec(ChildSpec) of
+ {ok, Child} ->
+ case lists:keymember(Child#child.name, #child.name, Res) of
+ true -> {duplicate_child_name, Child#child.name};
+ false -> check_startspec(T, [Child | Res])
+ end;
+ Error -> Error
+ end;
+check_startspec([], Res) ->
+ {ok, lists:reverse(Res)}.
+
+check_childspec({Name, Func, RestartType, Shutdown, ChildType, Mods}) ->
+ catch check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods);
+check_childspec(X) -> {invalid_child_spec, X}.
+
+check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods) ->
+ validName(Name),
+ validFunc(Func),
+ validRestartType(RestartType),
+ validChildType(ChildType),
+ validShutdown(Shutdown, ChildType),
+ validMods(Mods),
+ {ok, #child{name = Name, mfa = Func, restart_type = RestartType,
+ shutdown = Shutdown, child_type = ChildType, modules = Mods}}.
+
+validChildType(supervisor) -> true;
+validChildType(worker) -> true;
+validChildType(What) -> throw({invalid_child_type, What}).
+
+validName(_Name) -> true.
+
+validFunc({M, F, A}) when is_atom(M),
+ is_atom(F),
+ is_list(A) -> true;
+validFunc(Func) -> throw({invalid_mfa, Func}).
+
+validRestartType(permanent) -> true;
+validRestartType(temporary) -> true;
+validRestartType(transient) -> true;
+validRestartType(RestartType) -> throw({invalid_restart_type, RestartType}).
+
+validShutdown(Shutdown, _)
+ when is_integer(Shutdown), Shutdown > 0 -> true;
+validShutdown(infinity, supervisor) -> true;
+validShutdown(brutal_kill, _) -> true;
+validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}).
+
+validMods(dynamic) -> true;
+validMods(Mods) when is_list(Mods) ->
+ lists:foreach(fun(Mod) ->
+ if
+ is_atom(Mod) -> ok;
+ true -> throw({invalid_module, Mod})
+ end
+ end,
+ Mods);
+validMods(Mods) -> throw({invalid_modules, Mods}).
+
+%%% ------------------------------------------------------
+%%% Add a new restart and calculate if the max restart
+%%% intensity has been reached (in that case the supervisor
+%%% shall terminate).
+%%% All restarts accured inside the period amount of seconds
+%%% are kept in the #state.restarts list.
+%%% Returns: {ok, State'} | {terminate, State'}
+%%% ------------------------------------------------------
+
+add_restart(State) ->
+ I = State#state.intensity,
+ P = State#state.period,
+ R = State#state.restarts,
+ Now = erlang:now(),
+ R1 = add_restart([Now|R], Now, P),
+ State1 = State#state{restarts = R1},
+ case length(R1) of
+ CurI when CurI =< I ->
+ {ok, State1};
+ _ ->
+ {terminate, State1}
+ end.
+
+add_restart([R|Restarts], Now, Period) ->
+ case inPeriod(R, Now, Period) of
+ true ->
+ [R|add_restart(Restarts, Now, Period)];
+ _ ->
+ []
+ end;
+add_restart([], _, _) ->
+ [].
+
+inPeriod(Time, Now, Period) ->
+ case difference(Time, Now) of
+ T when T > Period ->
+ false;
+ _ ->
+ true
+ end.
+
+%%
+%% Time = {MegaSecs, Secs, MicroSecs} (NOTE: MicroSecs is ignored)
+%% Calculate the time elapsed in seconds between two timestamps.
+%% If MegaSecs is equal just subtract Secs.
+%% Else calculate the Mega difference and add the Secs difference,
+%% note that Secs difference can be negative, e.g.
+%% {827, 999999, 676} diff {828, 1, 653753} == > 2 secs.
+%%
+difference({TimeM, TimeS, _}, {CurM, CurS, _}) when CurM > TimeM ->
+ ((CurM - TimeM) * 1000000) + (CurS - TimeS);
+difference({_, TimeS, _}, {_, CurS, _}) ->
+ CurS - TimeS.
+
+%%% ------------------------------------------------------
+%%% Error and progress reporting.
+%%% ------------------------------------------------------
+
+report_error(Error, Reason, Child, SupName) ->
+ ErrorMsg = [{supervisor, SupName},
+ {errorContext, Error},
+ {reason, Reason},
+ {offender, extract_child(Child)}],
+ error_logger:error_report(supervisor_report, ErrorMsg).
+
+
+extract_child(Child) ->
+ [{pid, Child#child.pid},
+ {name, Child#child.name},
+ {mfa, Child#child.mfa},
+ {restart_type, Child#child.restart_type},
+ {shutdown, Child#child.shutdown},
+ {child_type, Child#child.child_type}].
+
+report_progress(Child, SupName) ->
+ Progress = [{supervisor, SupName},
+ {started, extract_child(Child)}],
+ error_logger:info_report(progress, Progress).