summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-02-06 15:36:06 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-02-06 15:36:06 +0000
commit86f42d904a8a9b6cfdc65379e471d48f5ad714f6 (patch)
treefe0209001c40490e38ab51db84de8aa7c5b9c3ac
parent7456792ec74cdf72afa7ddc54935b22e933f4316 (diff)
downloadrabbitmq-server-86f42d904a8a9b6cfdc65379e471d48f5ad714f6.tar.gz
Allow specifying a tx-fun.
-rw-r--r--src/mirrored_supervisor.erl103
1 files changed, 59 insertions, 44 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index d5f51db0..6c5c09df 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -57,10 +57,10 @@
%% This is basically the same as for supervisor, except that:
%%
%% 1) start_link(Module, Args) becomes
-%% start_link(Group, Module, Args).
+%% start_link(Group, TxFun, Module, Args).
%%
%% 2) start_link({local, Name}, Module, Args) becomes
-%% start_link({local, Name}, Group, Module, Args).
+%% start_link({local, Name}, Group, TxFun, Module, Args).
%%
%% 3) start_link({global, Name}, Module, Args) is not available.
%%
@@ -115,7 +115,7 @@
{attributes, record_info(fields, mirrored_sup_childspec)}]}).
-define(TABLE_MATCH, {match, #mirrored_sup_childspec{ _ = '_' }}).
--export([start_link/3, start_link/4,
+-export([start_link/4, start_link/5,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
which_children/1, count_children/1, check_childspecs/1]).
@@ -126,7 +126,7 @@
-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3,
handle_cast/2]).
--export([start_internal/2]).
+-export([start_internal/3]).
-export([create_tables/0, table_definitions/0]).
-record(mirrored_sup_childspec, {key, mirroring_pid, childspec}).
@@ -134,6 +134,7 @@
-record(state, {overall,
delegate,
group,
+ tx_fun,
initial_childspecs}).
%%----------------------------------------------------------------------------
@@ -160,19 +161,25 @@
-type group_name() :: any().
--spec start_link(GroupName, Module, Args) -> startlink_ret() when
+-type(tx_fun() :: fun((fun(() -> any())) -> any())).
+
+-spec start_link(GroupName, TxFun, Module, Args) -> startlink_ret() when
GroupName :: group_name(),
+ TxFun :: tx_fun(),
Module :: module(),
Args :: term().
--spec start_link(SupName, GroupName, Module, Args) -> startlink_ret() when
+-spec start_link(SupName, GroupName, TxFun, Module, Args) ->
+ startlink_ret() when
SupName :: supervisor2:sup_name(),
GroupName :: group_name(),
+ TxFun :: tx_fun(),
Module :: module(),
Args :: term().
--spec start_internal(Group, ChildSpecs) -> Result when
+-spec start_internal(Group, TxFun, ChildSpecs) -> Result when
Group :: group_name(),
+ TxFun :: tx_fun(),
ChildSpecs :: [supervisor2:child_spec()],
Result :: {'ok', pid()} | {'error', term()}.
@@ -190,18 +197,18 @@ behaviour_info(_Other) -> undefined.
%%----------------------------------------------------------------------------
-start_link(Group, Mod, Args) ->
- start_link0([], Group, init(Mod, Args)).
+start_link(Group, TxFun, Mod, Args) ->
+ start_link0([], Group, TxFun, init(Mod, Args)).
-start_link({local, SupName}, Group, Mod, Args) ->
- start_link0([{local, SupName}], Group, init(Mod, Args));
+start_link({local, SupName}, Group, TxFun, Mod, Args) ->
+ start_link0([{local, SupName}], Group, TxFun, init(Mod, Args));
-start_link({global, _SupName}, _Group, _Mod, _Args) ->
+start_link({global, _SupName}, _Group, _TxFun, _Mod, _Args) ->
erlang:error(badarg).
-start_link0(Prefix, Group, Init) ->
+start_link0(Prefix, Group, TxFun, Init) ->
case apply(?SUPERVISOR, start_link,
- Prefix ++ [?MODULE, {overall, Group, Init}]) of
+ Prefix ++ [?MODULE, {overall, Group, TxFun, Init}]) of
{ok, Pid} -> case catch call(Pid, {init, Pid}) of
ok -> {ok, Pid};
E -> E
@@ -257,14 +264,14 @@ mirroring(Sup) -> child(Sup, mirroring).
%%----------------------------------------------------------------------------
-start_internal(Group, ChildSpecs) ->
- ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, ChildSpecs},
+start_internal(Group, TxFun, ChildSpecs) ->
+ ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, TxFun, ChildSpecs},
[{timeout, infinity}]).
%%----------------------------------------------------------------------------
-init({overall, _Group, ignore}) -> ignore;
-init({overall, Group, {ok, {Restart, ChildSpecs}}}) ->
+init({overall, _Group, _TxFun, ignore}) -> ignore;
+init({overall, Group, TxFun, {ok, {Restart, ChildSpecs}}}) ->
%% Important: Delegate MUST start before Mirroring so that when we
%% shut down from above it shuts down last, so Mirroring does not
%% see it die.
@@ -273,27 +280,30 @@ init({overall, Group, {ok, {Restart, ChildSpecs}}}) ->
{ok, {{one_for_all, 0, 1},
[{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]},
temporary, 16#ffffffff, supervisor, [?SUPERVISOR]},
- {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]},
+ {mirroring, {?MODULE, start_internal, [Group, TxFun, ChildSpecs]},
permanent, 16#ffffffff, worker, [?MODULE]}]}};
init({delegate, Restart}) ->
{ok, {Restart, []}};
-init({mirroring, Group, ChildSpecs}) ->
- {ok, #state{group = Group, initial_childspecs = ChildSpecs}}.
+init({mirroring, Group, TxFun, ChildSpecs}) ->
+ {ok, #state{group = Group,
+ tx_fun = TxFun,
+ initial_childspecs = ChildSpecs}}.
handle_call({init, Overall}, _From,
State = #state{overall = undefined,
delegate = undefined,
group = Group,
+ tx_fun = TxFun,
initial_childspecs = ChildSpecs}) ->
process_flag(trap_exit, true),
?PG2:create(Group),
ok = ?PG2:join(Group, Overall),
Rest = ?PG2:get_members(Group) -- [Overall],
case Rest of
- [] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end);
+ [] -> TxFun(fun() -> delete_all(Group) end);
_ -> ok
end,
[begin
@@ -303,7 +313,8 @@ handle_call({init, Overall}, _From,
Delegate = delegate(Overall),
erlang:monitor(process, Delegate),
State1 = State#state{overall = Overall, delegate = Delegate},
- case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of
+ case errors([maybe_start(Group, TxFun, Overall, Delegate, S)
+ || S <- ChildSpecs]) of
[] -> {reply, ok, State1};
Errors -> {stop, {shutdown, Errors}, State1}
end;
@@ -311,16 +322,18 @@ handle_call({init, Overall}, _From,
handle_call({start_child, ChildSpec}, _From,
State = #state{overall = Overall,
delegate = Delegate,
- group = Group}) ->
- {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of
+ group = Group,
+ tx_fun = TxFun}) ->
+ {reply, case maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) of
already_in_mnesia -> {error, already_present};
{already_in_mnesia, Pid} -> {error, {already_started, Pid}};
Else -> Else
end, State};
handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
- group = Group}) ->
- {reply, stop(Group, Delegate, Id), State};
+ group = Group,
+ tx_fun = TxFun}) ->
+ {reply, stop(Group, TxFun, Delegate, Id), State};
handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) ->
{reply, apply(?SUPERVISOR, F, [Delegate | A]), State};
@@ -357,14 +370,15 @@ handle_info({'DOWN', _Ref, process, Pid, Reason},
{stop, Reason, State};
handle_info({'DOWN', _Ref, process, Pid, _Reason},
- State = #state{delegate = Delegate, group = Group,
- overall = O}) ->
+ State = #state{delegate = Delegate,
+ group = Group,
+ tx_fun = TxFun,
+ overall = O}) ->
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [O | _] -> {atomic, ChildSpecs} =
- mnesia:transaction(
- fun() -> update_all(O, Pid) end),
+ [O | _] -> ChildSpecs =
+ TxFun(fun() -> update_all(O, Pid) end),
[start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
_ -> []
end,
@@ -387,14 +401,14 @@ code_change(_OldVsn, State, _Extra) ->
tell_all_peers_to_die(Group, Reason) ->
[cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]].
-maybe_start(Group, Overall, Delegate, ChildSpec) ->
- case mnesia:transaction(
- fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of
- {atomic, start} -> start(Delegate, ChildSpec);
- {atomic, undefined} -> already_in_mnesia;
- {atomic, Pid} -> {already_in_mnesia, Pid};
+maybe_start(Group, TxFun, Overall, Delegate, ChildSpec) ->
+ try TxFun(fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of
+ start -> start(Delegate, ChildSpec);
+ undefined -> already_in_mnesia;
+ Pid -> {already_in_mnesia, Pid}
+ catch
%% If we are torn down while in the transaction...
- {aborted, E} -> {error, E}
+ {error, E} -> {error, E}
end.
check_start(Group, Overall, Delegate, ChildSpec) ->
@@ -429,11 +443,12 @@ delete(Group, Id) ->
start(Delegate, ChildSpec) ->
apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]).
-stop(Group, Delegate, Id) ->
- case mnesia:transaction(fun() -> check_stop(Group, Delegate, Id) end) of
- {atomic, deleted} -> apply(?SUPERVISOR, delete_child, [Delegate, Id]);
- {atomic, running} -> {error, running};
- {aborted, E} -> {error, E}
+stop(Group, TxFun, Delegate, Id) ->
+ try TxFun(fun() -> check_stop(Group, Delegate, Id) end) of
+ deleted -> apply(?SUPERVISOR, delete_child, [Delegate, Id]);
+ running -> {error, running}
+ catch
+ {error, E} -> {error, E}
end.
check_stop(Group, Delegate, Id) ->