diff options
author | Tim Watson <tim@rabbitmq.com> | 2012-10-17 16:25:40 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2012-10-17 16:25:40 +0100 |
commit | 0a744726491bec6d2c0d0c75434bcc2c3c264bcd (patch) | |
tree | 555e31eff31ff2c9feda108d312858820ac707d4 | |
parent | 80d879c17eb4e46cf94d158cc8c86787211a4cb0 (diff) | |
download | rabbitmq-server-0a744726491bec6d2c0d0c75434bcc2c3c264bcd.tar.gz |
do not couple gm with rabbit_misc
-rw-r--r-- | src/gm.erl | 115 | ||||
-rw-r--r-- | src/gm_soak_test.erl | 4 | ||||
-rw-r--r-- | src/gm_speed_test.erl | 3 | ||||
-rw-r--r-- | src/gm_tests.erl | 10 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 |
6 files changed, 81 insertions, 59 deletions
@@ -376,7 +376,7 @@ -behaviour(gen_server2). --export([create_tables/0, start_link/3, leave/1, broadcast/2, +-export([create_tables/0, start_link/4, leave/1, broadcast/2, confirmed_broadcast/2, info/1, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -408,7 +408,8 @@ callback_args, confirms, broadcast_buffer, - broadcast_timer + broadcast_timer, + txn_executor }). -record(gm_group, { name, version, members }). @@ -428,9 +429,10 @@ -export_type([group_name/0]). -type(group_name() :: any()). +-type(txn_fun() :: fun((fun(() -> any())) -> any())). -spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}). --spec(start_link/3 :: (group_name(), atom(), any()) -> +-spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) -> rabbit_types:ok_pid_or_error()). -spec(leave/1 :: (pid()) -> 'ok'). -spec(broadcast/2 :: (pid(), any()) -> 'ok'). @@ -507,8 +509,8 @@ table_definitions() -> {Name, Attributes} = ?TABLE, [{Name, [?TABLE_MATCH | Attributes]}]. -start_link(GroupName, Module, Args) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args], []). +start_link(GroupName, Module, TxnFun, Args) -> + gen_server2:start_link(?MODULE, [GroupName, Module, TxnFun, Args], []). leave(Server) -> gen_server2:cast(Server, leave). @@ -523,13 +525,13 @@ info(Server) -> gen_server2:call(Server, info, infinity). forget_group(GroupName) -> - ok = rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:delete({?GROUP_TABLE, GroupName}) - end), + {atomic, ok} = mnesia:sync_transaction( + fun () -> + mnesia:delete({?GROUP_TABLE, GroupName}) + end), ok. -init([GroupName, Module, Args]) -> +init([GroupName, Module, TxnFun, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), @@ -545,7 +547,8 @@ init([GroupName, Module, Args]) -> callback_args = Args, confirms = queue:new(), broadcast_buffer = [], - broadcast_timer = undefined }, hibernate, + broadcast_timer = undefined, + txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -585,7 +588,8 @@ handle_call({add_on_right, NewMember}, _From, view = View, members_state = MembersState, module = Module, - callback_args = Args }) -> + callback_args = Args, + txn_executor = TxnFun }) -> {MembersState1, Group} = record_new_member_in_group( GroupName, Self, NewMember, @@ -596,7 +600,7 @@ handle_call({add_on_right, NewMember}, _From, {catchup, Self, prepare_members_state(MembersState1)}), MembersState1 - end), + end, TxnFun), View2 = group_to_view(Group), State1 = check_neighbours(State #state { view = View2, members_state = MembersState1 }), @@ -642,8 +646,9 @@ handle_cast(join, State = #state { self = Self, group_name = GroupName, members_state = undefined, module = Module, - callback_args = Args }) -> - View = join_group(Self, GroupName), + callback_args = Args, + txn_executor = TxnFun }) -> + View = join_group(Self, GroupName, TxnFun), MembersState = case alive_view_members(View) of [Self] -> blank_member_state(); @@ -670,7 +675,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason}, view = View, module = Module, callback_args = Args, - confirms = Confirms }) -> + confirms = Confirms, + txn_executor = TxnFun }) -> Member = case {Left, Right} of {{Member1, MRef}, _} -> Member1; {_, {Member1, MRef}} -> Member1; @@ -683,7 +689,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason}, noreply(State); _ -> View1 = - group_to_view(record_dead_member_in_group(Member, GroupName)), + group_to_view(record_dead_member_in_group(Member, + GroupName, TxnFun)), {Result, State2} = case alive_view_members(View1) of [Self] -> @@ -985,14 +992,15 @@ ensure_alive_suffix1(MembersQ) -> %% View modification %% --------------------------------------------------------------------------- -join_group(Self, GroupName) -> - join_group(Self, GroupName, read_group(GroupName)). +join_group(Self, GroupName, TxnFun) -> + join_group(Self, GroupName, read_group(GroupName), TxnFun). -join_group(Self, GroupName, {error, not_found}) -> - join_group(Self, GroupName, prune_or_create_group(Self, GroupName)); -join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) -> +join_group(Self, GroupName, {error, not_found}, TxnFun) -> + join_group(Self, GroupName, + prune_or_create_group(Self, GroupName, TxnFun), TxnFun); +join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) -> group_to_view(Group); -join_group(Self, GroupName, #gm_group { members = Members } = Group) -> +join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> case lists:member(Self, Members) of true -> group_to_view(Group); @@ -1000,20 +1008,23 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) -> case lists:filter(fun is_member_alive/1, Members) of [] -> join_group(Self, GroupName, - prune_or_create_group(Self, GroupName)); + prune_or_create_group(Self, GroupName, TxnFun)); Alive -> Left = lists:nth(random:uniform(length(Alive)), Alive), Handler = fun () -> join_group( Self, GroupName, - record_dead_member_in_group(Left, GroupName)) + record_dead_member_in_group(Left, + GroupName, + TxnFun), + TxnFun) end, try case gen_server2:call( get_pid(Left), {add_on_right, Self}, infinity) of {ok, Group1} -> group_to_view(Group1); - not_ready -> join_group(Self, GroupName) + not_ready -> join_group(Self, GroupName, TxnFun) end catch exit:{R, _} @@ -1032,29 +1043,28 @@ read_group(GroupName) -> [Group] -> Group end. -prune_or_create_group(Self, GroupName) -> - Group = - rabbit_misc:execute_mnesia_transaction( - fun () -> GroupNew = #gm_group { name = GroupName, - members = [Self], - version = ?VERSION_START }, - case mnesia:read({?GROUP_TABLE, GroupName}) of - [] -> - mnesia:write(GroupNew), - GroupNew; - [Group1 = #gm_group { members = Members }] -> - case lists:any(fun is_member_alive/1, Members) of - true -> Group1; - false -> mnesia:write(GroupNew), - GroupNew - end - end - end), +prune_or_create_group(Self, GroupName, TxnFun) -> + Group = TxnFun( + fun () -> GroupNew = #gm_group { name = GroupName, + members = [Self], + version = ?VERSION_START }, + case mnesia:read({?GROUP_TABLE, GroupName}) of + [] -> + mnesia:write(GroupNew), + GroupNew; + [Group1 = #gm_group { members = Members }] -> + case lists:any(fun is_member_alive/1, Members) of + true -> Group1; + false -> mnesia:write(GroupNew), + GroupNew + end + end + end), Group. -record_dead_member_in_group(Member, GroupName) -> +record_dead_member_in_group(Member, GroupName, TxnFun) -> Group = - rabbit_misc:execute_mnesia_transaction( + TxnFun( fun () -> [Group1 = #gm_group { members = Members, version = Ver }] = mnesia:read({?GROUP_TABLE, GroupName}), case lists:splitwith( @@ -1071,9 +1081,9 @@ record_dead_member_in_group(Member, GroupName) -> end), Group. -record_new_member_in_group(GroupName, Left, NewMember, Fun) -> +record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) -> {Result, Group} = - rabbit_misc:execute_mnesia_transaction( + TxnFun( fun () -> [#gm_group { members = Members, version = Ver } = Group1] = mnesia:read({?GROUP_TABLE, GroupName}), @@ -1088,10 +1098,10 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) -> end), {Result, Group}. -erase_members_in_group(Members, GroupName) -> +erase_members_in_group(Members, GroupName, TxnFun) -> DeadMembers = [{dead, Id} || Id <- Members], Group = - rabbit_misc:execute_mnesia_transaction( + TxnFun( fun () -> [Group1 = #gm_group { members = [_|_] = Members1, version = Ver }] = @@ -1112,7 +1122,8 @@ maybe_erase_aliases(State = #state { self = Self, view = View0, members_state = MembersState, module = Module, - callback_args = Args }, View) -> + callback_args = Args, + txn_executor = TxnFun }, View) -> #view_member { aliases = Aliases } = fetch_view_member(Self, View), {Erasable, MembersState1} = ?SETS:fold( @@ -1129,7 +1140,7 @@ maybe_erase_aliases(State = #state { self = Self, case Erasable of [] -> {ok, State1 #state { view = View }}; _ -> View1 = group_to_view( - erase_members_in_group(Erasable, GroupName)), + erase_members_in_group(Erasable, GroupName, TxnFun)), {callback_view_changed(Args, Module, View0, View1), check_neighbours(State1 #state { view = View1 })} end. diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index 57217541..5fbfc223 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -105,7 +105,9 @@ spawn_member() -> random:seed(MegaSecs, Secs, MicroSecs), %% start up delay of no more than 10 seconds timer:sleep(random:uniform(10000)), - {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []), + {ok, Pid} = gm:start_link( + ?MODULE, ?MODULE, [], + fun rabbit_misc:execute_mnesia_transaction/1), Start = random:uniform(10000), send_loop(Pid, Start, Start + random:uniform(10000)), gm:leave(Pid), diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl index dad75bd4..84d4ab2f 100644 --- a/src/gm_speed_test.erl +++ b/src/gm_speed_test.erl @@ -44,7 +44,8 @@ terminate(Owner, _Reason) -> %% other wile_e_coyote(Time, WriteUnit) -> - {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()), + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), receive joined -> ok end, timer:sleep(1000), %% wait for all to join timer:send_after(Time, stop), diff --git a/src/gm_tests.erl b/src/gm_tests.erl index 0a2d4204..a9c0ba90 100644 --- a/src/gm_tests.erl +++ b/src/gm_tests.erl @@ -76,7 +76,9 @@ test_confirmed_broadcast() -> test_member_death() -> with_two_members( fun (Pid, Pid2) -> - {ok, Pid3} = gm:start_link(?MODULE, ?MODULE, self()), + {ok, Pid3} = gm:start_link( + ?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), passed = receive_joined(Pid3, [Pid, Pid2, Pid3], timeout_joining_gm_group_3), passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1), @@ -128,10 +130,12 @@ test_broadcast_fun(Fun) -> with_two_members(Fun) -> ok = gm:create_tables(), - {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()), + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1), - {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self()), + {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2), passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2), diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 16690693..bcb6192a 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -326,7 +326,10 @@ ensure_monitoring(CPid, Pids) -> init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> GM1 = case GM of undefined -> - {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), + {ok, GM2} = gm:start_link( + QueueName, ?MODULE, + [self()], + fun rabbit_misc:execute_mnesia_transaction/1), receive {joined, GM2, _Members} -> ok end, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3d8bd8b4..d0efe37a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -99,7 +99,8 @@ init(Q = #amqqueue { name = QName }) -> %% above. %% process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QName, ?MODULE, [self()]), + {ok, GM} = gm:start_link(QName, ?MODULE, [self()], + fun rabbit_misc:execute_mnesia_transaction/1), receive {joined, GM} -> ok end, Self = self(), Node = node(), |