summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-10-17 16:25:40 +0100
committerTim Watson <tim@rabbitmq.com>2012-10-17 16:25:40 +0100
commit0a744726491bec6d2c0d0c75434bcc2c3c264bcd (patch)
tree555e31eff31ff2c9feda108d312858820ac707d4
parent80d879c17eb4e46cf94d158cc8c86787211a4cb0 (diff)
downloadrabbitmq-server-0a744726491bec6d2c0d0c75434bcc2c3c264bcd.tar.gz
do not couple gm with rabbit_misc
-rw-r--r--src/gm.erl115
-rw-r--r--src/gm_soak_test.erl4
-rw-r--r--src/gm_speed_test.erl3
-rw-r--r--src/gm_tests.erl10
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl5
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
6 files changed, 81 insertions, 59 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 285bb927..43cca495 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -376,7 +376,7 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/3, leave/1, broadcast/2,
+-export([create_tables/0, start_link/4, leave/1, broadcast/2,
confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -408,7 +408,8 @@
callback_args,
confirms,
broadcast_buffer,
- broadcast_timer
+ broadcast_timer,
+ txn_executor
}).
-record(gm_group, { name, version, members }).
@@ -428,9 +429,10 @@
-export_type([group_name/0]).
-type(group_name() :: any()).
+-type(txn_fun() :: fun((fun(() -> any())) -> any())).
-spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
--spec(start_link/3 :: (group_name(), atom(), any()) ->
+-spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
@@ -507,8 +509,8 @@ table_definitions() ->
{Name, Attributes} = ?TABLE,
[{Name, [?TABLE_MATCH | Attributes]}].
-start_link(GroupName, Module, Args) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args], []).
+start_link(GroupName, Module, TxnFun, Args) ->
+ gen_server2:start_link(?MODULE, [GroupName, Module, TxnFun, Args], []).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -523,13 +525,13 @@ info(Server) ->
gen_server2:call(Server, info, infinity).
forget_group(GroupName) ->
- ok = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:delete({?GROUP_TABLE, GroupName})
- end),
+ {atomic, ok} = mnesia:sync_transaction(
+ fun () ->
+ mnesia:delete({?GROUP_TABLE, GroupName})
+ end),
ok.
-init([GroupName, Module, Args]) ->
+init([GroupName, Module, TxnFun, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
@@ -545,7 +547,8 @@ init([GroupName, Module, Args]) ->
callback_args = Args,
confirms = queue:new(),
broadcast_buffer = [],
- broadcast_timer = undefined }, hibernate,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -585,7 +588,8 @@ handle_call({add_on_right, NewMember}, _From,
view = View,
members_state = MembersState,
module = Module,
- callback_args = Args }) ->
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
{MembersState1, Group} =
record_new_member_in_group(
GroupName, Self, NewMember,
@@ -596,7 +600,7 @@ handle_call({add_on_right, NewMember}, _From,
{catchup, Self,
prepare_members_state(MembersState1)}),
MembersState1
- end),
+ end, TxnFun),
View2 = group_to_view(Group),
State1 = check_neighbours(State #state { view = View2,
members_state = MembersState1 }),
@@ -642,8 +646,9 @@ handle_cast(join, State = #state { self = Self,
group_name = GroupName,
members_state = undefined,
module = Module,
- callback_args = Args }) ->
- View = join_group(Self, GroupName),
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
+ View = join_group(Self, GroupName, TxnFun),
MembersState =
case alive_view_members(View) of
[Self] -> blank_member_state();
@@ -670,7 +675,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
view = View,
module = Module,
callback_args = Args,
- confirms = Confirms }) ->
+ confirms = Confirms,
+ txn_executor = TxnFun }) ->
Member = case {Left, Right} of
{{Member1, MRef}, _} -> Member1;
{_, {Member1, MRef}} -> Member1;
@@ -683,7 +689,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
noreply(State);
_ ->
View1 =
- group_to_view(record_dead_member_in_group(Member, GroupName)),
+ group_to_view(record_dead_member_in_group(Member,
+ GroupName, TxnFun)),
{Result, State2} =
case alive_view_members(View1) of
[Self] ->
@@ -985,14 +992,15 @@ ensure_alive_suffix1(MembersQ) ->
%% View modification
%% ---------------------------------------------------------------------------
-join_group(Self, GroupName) ->
- join_group(Self, GroupName, read_group(GroupName)).
+join_group(Self, GroupName, TxnFun) ->
+ join_group(Self, GroupName, read_group(GroupName), TxnFun).
-join_group(Self, GroupName, {error, not_found}) ->
- join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
-join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) ->
+join_group(Self, GroupName, {error, not_found}, TxnFun) ->
+ join_group(Self, GroupName,
+ prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
+join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
group_to_view(Group);
-join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
+join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
case lists:member(Self, Members) of
true ->
group_to_view(Group);
@@ -1000,20 +1008,23 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
case lists:filter(fun is_member_alive/1, Members) of
[] ->
join_group(Self, GroupName,
- prune_or_create_group(Self, GroupName));
+ prune_or_create_group(Self, GroupName, TxnFun));
Alive ->
Left = lists:nth(random:uniform(length(Alive)), Alive),
Handler =
fun () ->
join_group(
Self, GroupName,
- record_dead_member_in_group(Left, GroupName))
+ record_dead_member_in_group(Left,
+ GroupName,
+ TxnFun),
+ TxnFun)
end,
try
case gen_server2:call(
get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
- not_ready -> join_group(Self, GroupName)
+ not_ready -> join_group(Self, GroupName, TxnFun)
end
catch
exit:{R, _}
@@ -1032,29 +1043,28 @@ read_group(GroupName) ->
[Group] -> Group
end.
-prune_or_create_group(Self, GroupName) ->
- Group =
- rabbit_misc:execute_mnesia_transaction(
- fun () -> GroupNew = #gm_group { name = GroupName,
- members = [Self],
- version = ?VERSION_START },
- case mnesia:read({?GROUP_TABLE, GroupName}) of
- [] ->
- mnesia:write(GroupNew),
- GroupNew;
- [Group1 = #gm_group { members = Members }] ->
- case lists:any(fun is_member_alive/1, Members) of
- true -> Group1;
- false -> mnesia:write(GroupNew),
- GroupNew
- end
- end
- end),
+prune_or_create_group(Self, GroupName, TxnFun) ->
+ Group = TxnFun(
+ fun () -> GroupNew = #gm_group { name = GroupName,
+ members = [Self],
+ version = ?VERSION_START },
+ case mnesia:read({?GROUP_TABLE, GroupName}) of
+ [] ->
+ mnesia:write(GroupNew),
+ GroupNew;
+ [Group1 = #gm_group { members = Members }] ->
+ case lists:any(fun is_member_alive/1, Members) of
+ true -> Group1;
+ false -> mnesia:write(GroupNew),
+ GroupNew
+ end
+ end
+ end),
Group.
-record_dead_member_in_group(Member, GroupName) ->
+record_dead_member_in_group(Member, GroupName, TxnFun) ->
Group =
- rabbit_misc:execute_mnesia_transaction(
+ TxnFun(
fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
mnesia:read({?GROUP_TABLE, GroupName}),
case lists:splitwith(
@@ -1071,9 +1081,9 @@ record_dead_member_in_group(Member, GroupName) ->
end),
Group.
-record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
+record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) ->
{Result, Group} =
- rabbit_misc:execute_mnesia_transaction(
+ TxnFun(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
mnesia:read({?GROUP_TABLE, GroupName}),
@@ -1088,10 +1098,10 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
end),
{Result, Group}.
-erase_members_in_group(Members, GroupName) ->
+erase_members_in_group(Members, GroupName, TxnFun) ->
DeadMembers = [{dead, Id} || Id <- Members],
Group =
- rabbit_misc:execute_mnesia_transaction(
+ TxnFun(
fun () ->
[Group1 = #gm_group { members = [_|_] = Members1,
version = Ver }] =
@@ -1112,7 +1122,8 @@ maybe_erase_aliases(State = #state { self = Self,
view = View0,
members_state = MembersState,
module = Module,
- callback_args = Args }, View) ->
+ callback_args = Args,
+ txn_executor = TxnFun }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
@@ -1129,7 +1140,7 @@ maybe_erase_aliases(State = #state { self = Self,
case Erasable of
[] -> {ok, State1 #state { view = View }};
_ -> View1 = group_to_view(
- erase_members_in_group(Erasable, GroupName)),
+ erase_members_in_group(Erasable, GroupName, TxnFun)),
{callback_view_changed(Args, Module, View0, View1),
check_neighbours(State1 #state { view = View1 })}
end.
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 57217541..5fbfc223 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -105,7 +105,9 @@ spawn_member() ->
random:seed(MegaSecs, Secs, MicroSecs),
%% start up delay of no more than 10 seconds
timer:sleep(random:uniform(10000)),
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
+ {ok, Pid} = gm:start_link(
+ ?MODULE, ?MODULE, [],
+ fun rabbit_misc:execute_mnesia_transaction/1),
Start = random:uniform(10000),
send_loop(Pid, Start, Start + random:uniform(10000)),
gm:leave(Pid),
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index dad75bd4..84d4ab2f 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -44,7 +44,8 @@ terminate(Owner, _Reason) ->
%% other
wile_e_coyote(Time, WriteUnit) ->
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive joined -> ok end,
timer:sleep(1000), %% wait for all to join
timer:send_after(Time, stop),
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index 0a2d4204..a9c0ba90 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -76,7 +76,9 @@ test_confirmed_broadcast() ->
test_member_death() ->
with_two_members(
fun (Pid, Pid2) ->
- {ok, Pid3} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid3} = gm:start_link(
+ ?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid3, [Pid, Pid2, Pid3],
timeout_joining_gm_group_3),
passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1),
@@ -128,10 +130,12 @@ test_broadcast_fun(Fun) ->
with_two_members(Fun) ->
ok = gm:create_tables(),
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1),
- {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2),
passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2),
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 16690693..bcb6192a 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -326,7 +326,10 @@ ensure_monitoring(CPid, Pids) ->
init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
GM1 = case GM of
undefined ->
- {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
+ {ok, GM2} = gm:start_link(
+ QueueName, ?MODULE,
+ [self()],
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive {joined, GM2, _Members} ->
ok
end,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 3d8bd8b4..d0efe37a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -99,7 +99,8 @@ init(Q = #amqqueue { name = QName }) ->
%% above.
%%
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QName, ?MODULE, [self()]),
+ {ok, GM} = gm:start_link(QName, ?MODULE, [self()],
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive {joined, GM} -> ok end,
Self = self(),
Node = node(),