diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-02-06 15:36:06 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-02-06 15:36:06 +0000 |
commit | 86f42d904a8a9b6cfdc65379e471d48f5ad714f6 (patch) | |
tree | fe0209001c40490e38ab51db84de8aa7c5b9c3ac | |
parent | 7456792ec74cdf72afa7ddc54935b22e933f4316 (diff) | |
download | rabbitmq-server-86f42d904a8a9b6cfdc65379e471d48f5ad714f6.tar.gz |
Allow specifying a tx-fun.
-rw-r--r-- | src/mirrored_supervisor.erl | 103 |
1 files changed, 59 insertions, 44 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index d5f51db0..6c5c09df 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -57,10 +57,10 @@ %% This is basically the same as for supervisor, except that: %% %% 1) start_link(Module, Args) becomes -%% start_link(Group, Module, Args). +%% start_link(Group, TxFun, Module, Args). %% %% 2) start_link({local, Name}, Module, Args) becomes -%% start_link({local, Name}, Group, Module, Args). +%% start_link({local, Name}, Group, TxFun, Module, Args). %% %% 3) start_link({global, Name}, Module, Args) is not available. %% @@ -115,7 +115,7 @@ {attributes, record_info(fields, mirrored_sup_childspec)}]}). -define(TABLE_MATCH, {match, #mirrored_sup_childspec{ _ = '_' }}). --export([start_link/3, start_link/4, +-export([start_link/4, start_link/5, start_child/2, restart_child/2, delete_child/2, terminate_child/2, which_children/1, count_children/1, check_childspecs/1]). @@ -126,7 +126,7 @@ -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3, handle_cast/2]). --export([start_internal/2]). +-export([start_internal/3]). -export([create_tables/0, table_definitions/0]). -record(mirrored_sup_childspec, {key, mirroring_pid, childspec}). @@ -134,6 +134,7 @@ -record(state, {overall, delegate, group, + tx_fun, initial_childspecs}). %%---------------------------------------------------------------------------- @@ -160,19 +161,25 @@ -type group_name() :: any(). --spec start_link(GroupName, Module, Args) -> startlink_ret() when +-type(tx_fun() :: fun((fun(() -> any())) -> any())). + +-spec start_link(GroupName, TxFun, Module, Args) -> startlink_ret() when GroupName :: group_name(), + TxFun :: tx_fun(), Module :: module(), Args :: term(). --spec start_link(SupName, GroupName, Module, Args) -> startlink_ret() when +-spec start_link(SupName, GroupName, TxFun, Module, Args) -> + startlink_ret() when SupName :: supervisor2:sup_name(), GroupName :: group_name(), + TxFun :: tx_fun(), Module :: module(), Args :: term(). --spec start_internal(Group, ChildSpecs) -> Result when +-spec start_internal(Group, TxFun, ChildSpecs) -> Result when Group :: group_name(), + TxFun :: tx_fun(), ChildSpecs :: [supervisor2:child_spec()], Result :: {'ok', pid()} | {'error', term()}. @@ -190,18 +197,18 @@ behaviour_info(_Other) -> undefined. %%---------------------------------------------------------------------------- -start_link(Group, Mod, Args) -> - start_link0([], Group, init(Mod, Args)). +start_link(Group, TxFun, Mod, Args) -> + start_link0([], Group, TxFun, init(Mod, Args)). -start_link({local, SupName}, Group, Mod, Args) -> - start_link0([{local, SupName}], Group, init(Mod, Args)); +start_link({local, SupName}, Group, TxFun, Mod, Args) -> + start_link0([{local, SupName}], Group, TxFun, init(Mod, Args)); -start_link({global, _SupName}, _Group, _Mod, _Args) -> +start_link({global, _SupName}, _Group, _TxFun, _Mod, _Args) -> erlang:error(badarg). -start_link0(Prefix, Group, Init) -> +start_link0(Prefix, Group, TxFun, Init) -> case apply(?SUPERVISOR, start_link, - Prefix ++ [?MODULE, {overall, Group, Init}]) of + Prefix ++ [?MODULE, {overall, Group, TxFun, Init}]) of {ok, Pid} -> case catch call(Pid, {init, Pid}) of ok -> {ok, Pid}; E -> E @@ -257,14 +264,14 @@ mirroring(Sup) -> child(Sup, mirroring). %%---------------------------------------------------------------------------- -start_internal(Group, ChildSpecs) -> - ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, ChildSpecs}, +start_internal(Group, TxFun, ChildSpecs) -> + ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, TxFun, ChildSpecs}, [{timeout, infinity}]). %%---------------------------------------------------------------------------- -init({overall, _Group, ignore}) -> ignore; -init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> +init({overall, _Group, _TxFun, ignore}) -> ignore; +init({overall, Group, TxFun, {ok, {Restart, ChildSpecs}}}) -> %% Important: Delegate MUST start before Mirroring so that when we %% shut down from above it shuts down last, so Mirroring does not %% see it die. @@ -273,27 +280,30 @@ init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> {ok, {{one_for_all, 0, 1}, [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]}, temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, - {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]}, + {mirroring, {?MODULE, start_internal, [Group, TxFun, ChildSpecs]}, permanent, 16#ffffffff, worker, [?MODULE]}]}}; init({delegate, Restart}) -> {ok, {Restart, []}}; -init({mirroring, Group, ChildSpecs}) -> - {ok, #state{group = Group, initial_childspecs = ChildSpecs}}. +init({mirroring, Group, TxFun, ChildSpecs}) -> + {ok, #state{group = Group, + tx_fun = TxFun, + initial_childspecs = ChildSpecs}}. handle_call({init, Overall}, _From, State = #state{overall = undefined, delegate = undefined, group = Group, + tx_fun = TxFun, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), ?PG2:create(Group), ok = ?PG2:join(Group, Overall), Rest = ?PG2:get_members(Group) -- [Overall], case Rest of - [] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end); + [] -> TxFun(fun() -> delete_all(Group) end); _ -> ok end, [begin @@ -303,7 +313,8 @@ handle_call({init, Overall}, _From, Delegate = delegate(Overall), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of + case errors([maybe_start(Group, TxFun, Overall, Delegate, S) + || S <- ChildSpecs]) of [] -> {reply, ok, State1}; Errors -> {stop, {shutdown, Errors}, State1} end; @@ -311,16 +322,18 @@ handle_call({init, Overall}, _From, handle_call({start_child, ChildSpec}, _From, State = #state{overall = Overall, delegate = Delegate, - group = Group}) -> - {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of + group = Group, + tx_fun = TxFun}) -> + {reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of already_in_mnesia -> {error, already_present}; {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; Else -> Else end, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, - group = Group}) -> - {reply, stop(Group, Delegate, Id), State}; + group = Group, + tx_fun = TxFun}) -> + {reply, stop(Group, TxFun, Delegate, Id), State}; handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) -> {reply, apply(?SUPERVISOR, F, [Delegate | A]), State}; @@ -357,14 +370,15 @@ handle_info({'DOWN', _Ref, process, Pid, Reason}, {stop, Reason, State}; handle_info({'DOWN', _Ref, process, Pid, _Reason}, - State = #state{delegate = Delegate, group = Group, - overall = O}) -> + State = #state{delegate = Delegate, + group = Group, + tx_fun = TxFun, + overall = O}) -> %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [O | _] -> {atomic, ChildSpecs} = - mnesia:transaction( - fun() -> update_all(O, Pid) end), + [O | _] -> ChildSpecs = + TxFun(fun() -> update_all(O, Pid) end), [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; _ -> [] end, @@ -387,14 +401,14 @@ code_change(_OldVsn, State, _Extra) -> tell_all_peers_to_die(Group, Reason) -> [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]]. -maybe_start(Group, Overall, Delegate, ChildSpec) -> - case mnesia:transaction( - fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of - {atomic, start} -> start(Delegate, ChildSpec); - {atomic, undefined} -> already_in_mnesia; - {atomic, Pid} -> {already_in_mnesia, Pid}; +maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) -> + try TxFun(fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of + start -> start(Delegate, ChildSpec); + undefined -> already_in_mnesia; + Pid -> {already_in_mnesia, Pid} + catch %% If we are torn down while in the transaction... - {aborted, E} -> {error, E} + {error, E} -> {error, E} end. check_start(Group, Overall, Delegate, ChildSpec) -> @@ -429,11 +443,12 @@ delete(Group, Id) -> start(Delegate, ChildSpec) -> apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]). -stop(Group, Delegate, Id) -> - case mnesia:transaction(fun() -> check_stop(Group, Delegate, Id) end) of - {atomic, deleted} -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); - {atomic, running} -> {error, running}; - {aborted, E} -> {error, E} +stop(Group, TxFun, Delegate, Id) -> + try TxFun(fun() -> check_stop(Group, Delegate, Id) end) of + deleted -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); + running -> {error, running} + catch + {error, E} -> {error, E} end. check_stop(Group, Delegate, Id) -> |