diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-05-03 16:07:07 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-05-03 16:07:07 +0100 |
commit | cac2ed51f2007a585e159e6fa6196a7bd9a5056a (patch) | |
tree | ecb9959aa777cb3cb247db4dae21969d6c56a9f3 | |
parent | fecdf8e522bffd744be255d5f12ec808db0a0fa7 (diff) | |
download | rabbitmq-server-cac2ed51f2007a585e159e6fa6196a7bd9a5056a.tar.gz |
Switch to monitoring the overall supervisor.
-rw-r--r-- | src/mirrored_supervisor.erl | 72 |
1 files changed, 38 insertions, 34 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 4fe93981..135279a9 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -226,7 +226,10 @@ count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2). check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs). call(Sup, Msg) -> - ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity). + ?GEN_SERVER:call(mirroring(Sup), Msg, infinity). + +cast(Sup, Msg) -> + ?GEN_SERVER:cast(mirroring(Sup), Msg). find_call(Sup, Id, Msg) -> Group = call(Sup, group), @@ -237,7 +240,7 @@ find_call(Sup, Id, Msg) -> %% immediately after the tx - we can't be 100% here. So we may as %% well dirty_select. case mnesia:dirty_select(?TABLE, [{MatchHead, [], ['$1']}]) of - [Mirror] -> ?GEN_SERVER:call(Mirror, Msg, infinity); + [Mirror] -> call(Mirror, Msg); [] -> {error, not_found} end. @@ -246,13 +249,16 @@ fold(FunAtom, Sup, AggFun) -> lists:foldl(AggFun, [], [apply(?SUPERVISOR, FunAtom, [D]) || M <- ?PG2:get_members(Group), - D <- [?GEN_SERVER:call(M, delegate_supervisor, infinity)]]). + D <- [delegate(M)]]). child(Sup, Id) -> [Pid] = [Pid || {Id1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup), Id1 =:= Id], Pid. +delegate(Sup) -> child(Sup, delegate). +mirroring(Sup) -> child(Sup, mirroring). + %%---------------------------------------------------------------------------- start_internal(Group, ChildSpecs) -> @@ -288,28 +294,29 @@ handle_call({init, Overall}, _From, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), ?PG2:create(Group), - ok = ?PG2:join(Group, self()), - Rest = ?PG2:get_members(Group) -- [self()], + ok = ?PG2:join(Group, Overall), + Rest = ?PG2:get_members(Group) -- [Overall], case Rest of [] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end); _ -> ok end, [begin - ?GEN_SERVER:cast(Pid, {ensure_monitoring, self()}), + ?GEN_SERVER:cast(mirroring(Pid), {ensure_monitoring, Overall}), erlang:monitor(process, Pid) end || Pid <- Rest], - Delegate = child(Overall, delegate), + Delegate = delegate(Overall), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of + case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of [] -> {reply, ok, State1}; Errors -> {stop, {shutdown, Errors}, State1} end; handle_call({start_child, ChildSpec}, _From, - State = #state{delegate = Delegate, + State = #state{overall = Overall, + delegate = Delegate, group = Group}) -> - {reply, case maybe_start(Group, Delegate, ChildSpec) of + {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of already_in_mnesia -> {error, already_present}; {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; Else -> Else @@ -322,9 +329,6 @@ handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) -> {reply, apply(?SUPERVISOR, F, [Delegate | A]), State}; -handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) -> - {reply, Delegate, State}; - handle_call(group, _From, State = #state{group = Group}) -> {reply, Group, State}; @@ -357,15 +361,16 @@ handle_info({'DOWN', _Ref, process, Pid, Reason}, {stop, Reason, State}; handle_info({'DOWN', _Ref, process, Pid, _Reason}, - State = #state{delegate = Delegate, group = Group}) -> + State = #state{delegate = Delegate, group = Group, + overall = O}) -> %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. - Self = self(), R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [Self | _] -> {atomic, ChildSpecs} = - mnesia:transaction(fun() -> update_all(Pid) end), - [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; - _ -> [] + [O | _] -> {atomic, ChildSpecs} = + mnesia:transaction( + fun() -> update_all(O, Pid) end), + [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; + _ -> [] end, case errors(R) of [] -> {noreply, State}; @@ -384,13 +389,12 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- tell_all_peers_to_die(Group, Reason) -> - [?GEN_SERVER:cast(P, {die, Reason}) || + [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]]. -maybe_start(Group, Delegate, ChildSpec) -> - case mnesia:transaction(fun() -> - check_start(Group, Delegate, ChildSpec) - end) of +maybe_start(Group, Overall, Delegate, ChildSpec) -> + case mnesia:transaction( + fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of {atomic, start} -> start(Delegate, ChildSpec); {atomic, undefined} -> already_in_mnesia; {atomic, Pid} -> {already_in_mnesia, Pid}; @@ -398,16 +402,16 @@ maybe_start(Group, Delegate, ChildSpec) -> {aborted, E} -> {error, E} end. -check_start(Group, Delegate, ChildSpec) -> +check_start(Group, Overall, Delegate, ChildSpec) -> case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of - [] -> write(Group, ChildSpec), + [] -> write(Group, Overall, ChildSpec), start; [S] -> #mirrored_sup_childspec{key = {Group, Id}, mirroring_pid = Pid} = S, - case self() of + case Overall of Pid -> child(Delegate, Id); _ -> case supervisor(Pid) of - dead -> write(Group, ChildSpec), + dead -> write(Group, Overall, ChildSpec), start; Delegate0 -> child(Delegate0, Id) end @@ -417,12 +421,12 @@ check_start(Group, Delegate, ChildSpec) -> supervisor(Pid) -> with_exit_handler( fun() -> dead end, - fun() -> gen_server:call(Pid, delegate_supervisor, infinity) end). + fun() -> delegate(Pid) end). -write(Group, ChildSpec) -> +write(Group, Overall, ChildSpec) -> ok = mnesia:write( #mirrored_sup_childspec{key = {Group, id(ChildSpec)}, - mirroring_pid = self(), + mirroring_pid = Overall, childspec = ChildSpec}), ChildSpec. @@ -448,12 +452,12 @@ check_stop(Group, Delegate, Id) -> id({Id, _, _, _, _, _}) -> Id. -update_all(OldPid) -> - MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid, +update_all(Overall, OldOverall) -> + MatchHead = #mirrored_sup_childspec{mirroring_pid = OldOverall, key = '$1', childspec = '$2', _ = '_'}, - [write(Group, C) || + [write(Group, Overall, C) || [{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])]. delete_all(Group) -> |