summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-04-23 18:27:55 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2014-04-23 18:27:55 +0100
commitdbd5a1f0e252aca4eb22173772bec3d5b827fc17 (patch)
tree9aaeaa7e8bf07fe4d90735165eee5b9a5d9ac19f
parentc2482805716da071f0b71a5003ec86636b181c6f (diff)
parent074d731e6e978855006aa6d924e568db98e9d775 (diff)
downloadrabbitmq-server-dbd5a1f0e252aca4eb22173772bec3d5b827fc17.tar.gz
merge stable into bug26084
-rw-r--r--src/gm.erl219
1 files changed, 97 insertions, 122 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 0b0ada21..acc94447 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -509,9 +509,9 @@ create_tables([]) ->
ok;
create_tables([{Table, Attributes} | Tables]) ->
case mnesia:create_table(Table, Attributes) of
- {atomic, ok} -> create_tables(Tables);
- {aborted, {already_exists, gm_group}} -> create_tables(Tables);
- Err -> Err
+ {atomic, ok} -> create_tables(Tables);
+ {aborted, {already_exists, Table}} -> create_tables(Tables);
+ Err -> Err
end.
table_definitions() ->
@@ -605,43 +605,27 @@ handle_call({add_on_right, _NewMember}, _From,
handle_call({add_on_right, NewMember}, _From,
State = #state { self = Self,
group_name = GroupName,
- view = View,
members_state = MembersState,
- module = Module,
- callback_args = Args,
txn_executor = TxnFun }) ->
- {MembersState1, Group} =
- record_new_member_in_group(
- GroupName, Self, NewMember,
- fun (Group1) ->
- View1 = group_to_view(Group1),
- MembersState1 = remove_erased_members(MembersState, View1),
- ok = send_right(NewMember, View1,
- {catchup, Self,
- prepare_members_state(MembersState1)}),
- MembersState1
- end, TxnFun),
- View2 = group_to_view(Group),
- State1 = check_neighbours(State #state { view = View2,
- members_state = MembersState1 }),
- Result = callback_view_changed(Args, Module, View, View2),
+ Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun),
+ View1 = group_to_view(Group),
+ MembersState1 = remove_erased_members(MembersState, View1),
+ ok = send_right(NewMember, View1,
+ {catchup, Self, prepare_members_state(MembersState1)}),
+ {Result, State1} = change_view(View1, State #state {
+ members_state = MembersState1 }),
handle_callback_result({Result, {ok, Group}, State1}).
-
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
members_state = MembersState,
- group_name = GroupName,
- module = Module,
- callback_args = Args }) ->
+ group_name = GroupName }) ->
{Result, State1} =
case needs_view_update(ReqVer, View) of
- true -> View1 = group_to_view(read_group(GroupName)),
+ true -> View1 = group_to_view(dirty_read_group(GroupName)),
MemberState1 = remove_erased_members(MembersState, View1),
- {callback_view_changed(Args, Module, View, View1),
- check_neighbours(
- State #state { view = View1,
- members_state = MemberState1 })};
+ change_view(View1, State #state {
+ members_state = MemberState1 });
false -> {ok, State}
end,
handle_callback_result(
@@ -725,15 +709,16 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
_ ->
View1 = group_to_view(record_dead_member_in_group(
Member, GroupName, TxnFun)),
- {Result, State1} = maybe_erase_aliases(State, View1),
handle_callback_result(
- {Result,
- case alive_view_members(View1) of
- [Self] -> State1 #state {
- members_state = blank_member_state(),
- confirms = purge_confirms(Confirms) };
- _ -> State1
- end})
+ case alive_view_members(View1) of
+ [Self] ->
+ {Result, State1} = maybe_erase_aliases(State, View1),
+ {Result, State1 #state {
+ members_state = blank_member_state(),
+ confirms = purge_confirms(Confirms) }};
+ _ ->
+ change_view(View1, State)
+ end)
end.
@@ -1037,7 +1022,7 @@ ensure_alive_suffix1(MembersQ) ->
%% ---------------------------------------------------------------------------
join_group(Self, GroupName, TxnFun) ->
- join_group(Self, GroupName, read_group(GroupName), TxnFun).
+ join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun).
join_group(Self, GroupName, {error, not_found}, TxnFun) ->
join_group(Self, GroupName,
@@ -1080,93 +1065,82 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
end
end.
-read_group(GroupName) ->
+dirty_read_group(GroupName) ->
case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
[] -> {error, not_found};
[Group] -> Group
end.
+read_group(GroupName) ->
+ case mnesia:read({?GROUP_TABLE, GroupName}) of
+ [] -> {error, not_found};
+ [Group] -> Group
+ end.
+
+write_group(Group) -> mnesia:write(?GROUP_TABLE, Group, write), Group.
+
prune_or_create_group(Self, GroupName, TxnFun) ->
- Group = TxnFun(
- fun () ->
- GroupNew = #gm_group { name = GroupName,
- members = [Self],
- version = get_version(Self) },
- case mnesia:read({?GROUP_TABLE, GroupName}) of
- [] ->
- mnesia:write(GroupNew),
- GroupNew;
- [Group1 = #gm_group { members = Members }] ->
- case lists:any(fun is_member_alive/1, Members) of
- true -> Group1;
- false -> mnesia:write(GroupNew),
- GroupNew
- end
+ TxnFun(
+ fun () ->
+ GroupNew = #gm_group { name = GroupName,
+ members = [Self],
+ version = get_version(Self) },
+ case read_group(GroupName) of
+ {error, not_found} ->
+ write_group(GroupNew);
+ Group = #gm_group { members = Members } ->
+ case lists:any(fun is_member_alive/1, Members) of
+ true -> Group;
+ false -> write_group(GroupNew)
end
- end),
- Group.
+ end
+ end).
record_dead_member_in_group(Member, GroupName, TxnFun) ->
- Group =
- TxnFun(
- fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
- mnesia:read({?GROUP_TABLE, GroupName}),
- case lists:splitwith(
- fun (Member1) -> Member1 =/= Member end, Members) of
- {_Members1, []} -> %% not found - already recorded dead
- Group1;
- {Members1, [Member | Members2]} ->
- Members3 = Members1 ++ [{dead, Member} | Members2],
- Group2 = Group1 #gm_group { members = Members3,
- version = Ver + 1 },
- mnesia:write(Group2),
- Group2
- end
- end),
- Group.
-
-record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) ->
- {Result, Group} =
- TxnFun(
- fun () ->
- [#gm_group { members = Members, version = Ver } = Group1] =
- mnesia:read({?GROUP_TABLE, GroupName}),
- {Prefix, [Left | Suffix]} =
- lists:splitwith(fun (M) -> M =/= Left end, Members),
- Members1 = Prefix ++ [Left, NewMember | Suffix],
- Group2 = Group1 #gm_group { members = Members1,
- version = Ver + 1 },
- Result = Fun(Group2),
- mnesia:write(Group2),
- {Result, Group2}
- end),
- {Result, Group}.
+ TxnFun(
+ fun () ->
+ Group = #gm_group { members = Members, version = Ver } =
+ read_group(GroupName),
+ case lists:splitwith(
+ fun (Member1) -> Member1 =/= Member end, Members) of
+ {_Members1, []} -> %% not found - already recorded dead
+ Group;
+ {Members1, [Member | Members2]} ->
+ Members3 = Members1 ++ [{dead, Member} | Members2],
+ write_group(Group #gm_group { members = Members3,
+ version = Ver + 1 })
+ end
+ end).
+
+record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
+ TxnFun(
+ fun () ->
+ Group = #gm_group { members = Members, version = Ver } =
+ read_group(GroupName),
+ {Prefix, [Left | Suffix]} =
+ lists:splitwith(fun (M) -> M =/= Left end, Members),
+ write_group(Group #gm_group {
+ members = Prefix ++ [Left, NewMember | Suffix],
+ version = Ver + 1 })
+ end).
erase_members_in_group(Members, GroupName, TxnFun) ->
DeadMembers = [{dead, Id} || Id <- Members],
- Group =
- TxnFun(
- fun () ->
- [Group1 = #gm_group { members = [_|_] = Members1,
- version = Ver }] =
- mnesia:read({?GROUP_TABLE, GroupName}),
- case Members1 -- DeadMembers of
- Members1 -> Group1;
- Members2 -> Group2 =
- Group1 #gm_group { members = Members2,
- version = Ver + 1 },
- mnesia:write(Group2),
- Group2
- end
- end),
- Group.
+ TxnFun(
+ fun () ->
+ Group = #gm_group { members = [_|_] = Members1, version = Ver } =
+ read_group(GroupName),
+ case Members1 -- DeadMembers of
+ Members1 -> Group;
+ Members2 -> write_group(
+ Group #gm_group { members = Members2,
+ version = Ver + 1 })
+ end
+ end).
maybe_erase_aliases(State = #state { self = Self,
group_name = GroupName,
- view = View0,
members_state = MembersState,
- module = Module,
- callback_args = Args,
txn_executor = TxnFun }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
@@ -1185,9 +1159,7 @@ maybe_erase_aliases(State = #state { self = Self,
_ -> group_to_view(
erase_members_in_group(Erasable, GroupName, TxnFun))
end,
- State1 = State #state { members_state = MembersState1, view = View1 },
- {callback_view_changed(Args, Module, View0, View1),
- check_neighbours(State1)}.
+ change_view(View1, State #state { members_state = MembersState1 }).
can_erase_view_member(Self, Self, _LA, _LP) -> false;
can_erase_view_member(_Self, _Id, N, N) -> true;
@@ -1321,7 +1293,7 @@ prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
make_member(GroupName) ->
- {case read_group(GroupName) of
+ {case dirty_read_group(GroupName) of
#gm_group { version = Version } -> Version;
{error, not_found} -> ?VERSION_START
end, self()}.
@@ -1385,16 +1357,19 @@ callback(Args, Module, Activity) ->
{stop, _Reason} = Error -> Error
end.
-callback_view_changed(Args, Module, OldView, NewView) ->
- OldMembers = all_known_members(OldView),
- NewMembers = all_known_members(NewView),
+change_view(View, State = #state { view = View0,
+ module = Module,
+ callback_args = Args }) ->
+ OldMembers = all_known_members(View0),
+ NewMembers = all_known_members(View),
Births = NewMembers -- OldMembers,
Deaths = OldMembers -- NewMembers,
- case {Births, Deaths} of
- {[], []} -> ok;
- _ -> Module:members_changed(
- Args, get_pids(Births), get_pids(Deaths))
- end.
+ Result = case {Births, Deaths} of
+ {[], []} -> ok;
+ _ -> Module:members_changed(
+ Args, get_pids(Births), get_pids(Deaths))
+ end,
+ {Result, check_neighbours(State #state { view = View })}.
handle_callback_result({Result, State}) ->
if_callback_success(