summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-05-03 16:07:07 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-05-03 16:07:07 +0100
commitcac2ed51f2007a585e159e6fa6196a7bd9a5056a (patch)
treeecb9959aa777cb3cb247db4dae21969d6c56a9f3
parentfecdf8e522bffd744be255d5f12ec808db0a0fa7 (diff)
downloadrabbitmq-server-cac2ed51f2007a585e159e6fa6196a7bd9a5056a.tar.gz
Switch to monitoring the overall supervisor.
-rw-r--r--src/mirrored_supervisor.erl72
1 files changed, 38 insertions, 34 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 4fe93981..135279a9 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -226,7 +226,10 @@ count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2).
check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs).
call(Sup, Msg) ->
- ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity).
+ ?GEN_SERVER:call(mirroring(Sup), Msg, infinity).
+
+cast(Sup, Msg) ->
+ ?GEN_SERVER:cast(mirroring(Sup), Msg).
find_call(Sup, Id, Msg) ->
Group = call(Sup, group),
@@ -237,7 +240,7 @@ find_call(Sup, Id, Msg) ->
%% immediately after the tx - we can't be 100% here. So we may as
%% well dirty_select.
case mnesia:dirty_select(?TABLE, [{MatchHead, [], ['$1']}]) of
- [Mirror] -> ?GEN_SERVER:call(Mirror, Msg, infinity);
+ [Mirror] -> call(Mirror, Msg);
[] -> {error, not_found}
end.
@@ -246,13 +249,16 @@ fold(FunAtom, Sup, AggFun) ->
lists:foldl(AggFun, [],
[apply(?SUPERVISOR, FunAtom, [D]) ||
M <- ?PG2:get_members(Group),
- D <- [?GEN_SERVER:call(M, delegate_supervisor, infinity)]]).
+ D <- [delegate(M)]]).
child(Sup, Id) ->
[Pid] = [Pid || {Id1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup),
Id1 =:= Id],
Pid.
+delegate(Sup) -> child(Sup, delegate).
+mirroring(Sup) -> child(Sup, mirroring).
+
%%----------------------------------------------------------------------------
start_internal(Group, ChildSpecs) ->
@@ -288,28 +294,29 @@ handle_call({init, Overall}, _From,
initial_childspecs = ChildSpecs}) ->
process_flag(trap_exit, true),
?PG2:create(Group),
- ok = ?PG2:join(Group, self()),
- Rest = ?PG2:get_members(Group) -- [self()],
+ ok = ?PG2:join(Group, Overall),
+ Rest = ?PG2:get_members(Group) -- [Overall],
case Rest of
[] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end);
_ -> ok
end,
[begin
- ?GEN_SERVER:cast(Pid, {ensure_monitoring, self()}),
+ ?GEN_SERVER:cast(mirroring(Pid), {ensure_monitoring, Overall}),
erlang:monitor(process, Pid)
end || Pid <- Rest],
- Delegate = child(Overall, delegate),
+ Delegate = delegate(Overall),
erlang:monitor(process, Delegate),
State1 = State#state{overall = Overall, delegate = Delegate},
- case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
+ case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of
[] -> {reply, ok, State1};
Errors -> {stop, {shutdown, Errors}, State1}
end;
handle_call({start_child, ChildSpec}, _From,
- State = #state{delegate = Delegate,
+ State = #state{overall = Overall,
+ delegate = Delegate,
group = Group}) ->
- {reply, case maybe_start(Group, Delegate, ChildSpec) of
+ {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of
already_in_mnesia -> {error, already_present};
{already_in_mnesia, Pid} -> {error, {already_started, Pid}};
Else -> Else
@@ -322,9 +329,6 @@ handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) ->
{reply, apply(?SUPERVISOR, F, [Delegate | A]), State};
-handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) ->
- {reply, Delegate, State};
-
handle_call(group, _From, State = #state{group = Group}) ->
{reply, Group, State};
@@ -357,15 +361,16 @@ handle_info({'DOWN', _Ref, process, Pid, Reason},
{stop, Reason, State};
handle_info({'DOWN', _Ref, process, Pid, _Reason},
- State = #state{delegate = Delegate, group = Group}) ->
+ State = #state{delegate = Delegate, group = Group,
+ overall = O}) ->
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
- Self = self(),
R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [Self | _] -> {atomic, ChildSpecs} =
- mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
- _ -> []
+ [O | _] -> {atomic, ChildSpecs} =
+ mnesia:transaction(
+ fun() -> update_all(O, Pid) end),
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
+ _ -> []
end,
case errors(R) of
[] -> {noreply, State};
@@ -384,13 +389,12 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
tell_all_peers_to_die(Group, Reason) ->
- [?GEN_SERVER:cast(P, {die, Reason}) ||
+ [cast(P, {die, Reason}) ||
P <- ?PG2:get_members(Group) -- [self()]].
-maybe_start(Group, Delegate, ChildSpec) ->
- case mnesia:transaction(fun() ->
- check_start(Group, Delegate, ChildSpec)
- end) of
+maybe_start(Group, Overall, Delegate, ChildSpec) ->
+ case mnesia:transaction(
+ fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of
{atomic, start} -> start(Delegate, ChildSpec);
{atomic, undefined} -> already_in_mnesia;
{atomic, Pid} -> {already_in_mnesia, Pid};
@@ -398,16 +402,16 @@ maybe_start(Group, Delegate, ChildSpec) ->
{aborted, E} -> {error, E}
end.
-check_start(Group, Delegate, ChildSpec) ->
+check_start(Group, Overall, Delegate, ChildSpec) ->
case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of
- [] -> write(Group, ChildSpec),
+ [] -> write(Group, Overall, ChildSpec),
start;
[S] -> #mirrored_sup_childspec{key = {Group, Id},
mirroring_pid = Pid} = S,
- case self() of
+ case Overall of
Pid -> child(Delegate, Id);
_ -> case supervisor(Pid) of
- dead -> write(Group, ChildSpec),
+ dead -> write(Group, Overall, ChildSpec),
start;
Delegate0 -> child(Delegate0, Id)
end
@@ -417,12 +421,12 @@ check_start(Group, Delegate, ChildSpec) ->
supervisor(Pid) ->
with_exit_handler(
fun() -> dead end,
- fun() -> gen_server:call(Pid, delegate_supervisor, infinity) end).
+ fun() -> delegate(Pid) end).
-write(Group, ChildSpec) ->
+write(Group, Overall, ChildSpec) ->
ok = mnesia:write(
#mirrored_sup_childspec{key = {Group, id(ChildSpec)},
- mirroring_pid = self(),
+ mirroring_pid = Overall,
childspec = ChildSpec}),
ChildSpec.
@@ -448,12 +452,12 @@ check_stop(Group, Delegate, Id) ->
id({Id, _, _, _, _, _}) -> Id.
-update_all(OldPid) ->
- MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid,
+update_all(Overall, OldOverall) ->
+ MatchHead = #mirrored_sup_childspec{mirroring_pid = OldOverall,
key = '$1',
childspec = '$2',
_ = '_'},
- [write(Group, C) ||
+ [write(Group, Overall, C) ||
[{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])].
delete_all(Group) ->