diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-04-23 18:27:55 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-04-23 18:27:55 +0100 |
commit | dbd5a1f0e252aca4eb22173772bec3d5b827fc17 (patch) | |
tree | 9aaeaa7e8bf07fe4d90735165eee5b9a5d9ac19f | |
parent | c2482805716da071f0b71a5003ec86636b181c6f (diff) | |
parent | 074d731e6e978855006aa6d924e568db98e9d775 (diff) | |
download | rabbitmq-server-dbd5a1f0e252aca4eb22173772bec3d5b827fc17.tar.gz |
merge stable into bug26084
-rw-r--r-- | src/gm.erl | 219 |
1 files changed, 97 insertions, 122 deletions
@@ -509,9 +509,9 @@ create_tables([]) -> ok; create_tables([{Table, Attributes} | Tables]) -> case mnesia:create_table(Table, Attributes) of - {atomic, ok} -> create_tables(Tables); - {aborted, {already_exists, gm_group}} -> create_tables(Tables); - Err -> Err + {atomic, ok} -> create_tables(Tables); + {aborted, {already_exists, Table}} -> create_tables(Tables); + Err -> Err end. table_definitions() -> @@ -605,43 +605,27 @@ handle_call({add_on_right, _NewMember}, _From, handle_call({add_on_right, NewMember}, _From, State = #state { self = Self, group_name = GroupName, - view = View, members_state = MembersState, - module = Module, - callback_args = Args, txn_executor = TxnFun }) -> - {MembersState1, Group} = - record_new_member_in_group( - GroupName, Self, NewMember, - fun (Group1) -> - View1 = group_to_view(Group1), - MembersState1 = remove_erased_members(MembersState, View1), - ok = send_right(NewMember, View1, - {catchup, Self, - prepare_members_state(MembersState1)}), - MembersState1 - end, TxnFun), - View2 = group_to_view(Group), - State1 = check_neighbours(State #state { view = View2, - members_state = MembersState1 }), - Result = callback_view_changed(Args, Module, View, View2), + Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun), + View1 = group_to_view(Group), + MembersState1 = remove_erased_members(MembersState, View1), + ok = send_right(NewMember, View1, + {catchup, Self, prepare_members_state(MembersState1)}), + {Result, State1} = change_view(View1, State #state { + members_state = MembersState1 }), handle_callback_result({Result, {ok, Group}, State1}). - handle_cast({?TAG, ReqVer, Msg}, State = #state { view = View, members_state = MembersState, - group_name = GroupName, - module = Module, - callback_args = Args }) -> + group_name = GroupName }) -> {Result, State1} = case needs_view_update(ReqVer, View) of - true -> View1 = group_to_view(read_group(GroupName)), + true -> View1 = group_to_view(dirty_read_group(GroupName)), MemberState1 = remove_erased_members(MembersState, View1), - {callback_view_changed(Args, Module, View, View1), - check_neighbours( - State #state { view = View1, - members_state = MemberState1 })}; + change_view(View1, State #state { + members_state = MemberState1 }); false -> {ok, State} end, handle_callback_result( @@ -725,15 +709,16 @@ handle_info({'DOWN', MRef, process, _Pid, Reason}, _ -> View1 = group_to_view(record_dead_member_in_group( Member, GroupName, TxnFun)), - {Result, State1} = maybe_erase_aliases(State, View1), handle_callback_result( - {Result, - case alive_view_members(View1) of - [Self] -> State1 #state { - members_state = blank_member_state(), - confirms = purge_confirms(Confirms) }; - _ -> State1 - end}) + case alive_view_members(View1) of + [Self] -> + {Result, State1} = maybe_erase_aliases(State, View1), + {Result, State1 #state { + members_state = blank_member_state(), + confirms = purge_confirms(Confirms) }}; + _ -> + change_view(View1, State) + end) end. @@ -1037,7 +1022,7 @@ ensure_alive_suffix1(MembersQ) -> %% --------------------------------------------------------------------------- join_group(Self, GroupName, TxnFun) -> - join_group(Self, GroupName, read_group(GroupName), TxnFun). + join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun). join_group(Self, GroupName, {error, not_found}, TxnFun) -> join_group(Self, GroupName, @@ -1080,93 +1065,82 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> end end. -read_group(GroupName) -> +dirty_read_group(GroupName) -> case mnesia:dirty_read(?GROUP_TABLE, GroupName) of [] -> {error, not_found}; [Group] -> Group end. +read_group(GroupName) -> + case mnesia:read({?GROUP_TABLE, GroupName}) of + [] -> {error, not_found}; + [Group] -> Group + end. + +write_group(Group) -> mnesia:write(?GROUP_TABLE, Group, write), Group. + prune_or_create_group(Self, GroupName, TxnFun) -> - Group = TxnFun( - fun () -> - GroupNew = #gm_group { name = GroupName, - members = [Self], - version = get_version(Self) }, - case mnesia:read({?GROUP_TABLE, GroupName}) of - [] -> - mnesia:write(GroupNew), - GroupNew; - [Group1 = #gm_group { members = Members }] -> - case lists:any(fun is_member_alive/1, Members) of - true -> Group1; - false -> mnesia:write(GroupNew), - GroupNew - end + TxnFun( + fun () -> + GroupNew = #gm_group { name = GroupName, + members = [Self], + version = get_version(Self) }, + case read_group(GroupName) of + {error, not_found} -> + write_group(GroupNew); + Group = #gm_group { members = Members } -> + case lists:any(fun is_member_alive/1, Members) of + true -> Group; + false -> write_group(GroupNew) end - end), - Group. + end + end). record_dead_member_in_group(Member, GroupName, TxnFun) -> - Group = - TxnFun( - fun () -> [Group1 = #gm_group { members = Members, version = Ver }] = - mnesia:read({?GROUP_TABLE, GroupName}), - case lists:splitwith( - fun (Member1) -> Member1 =/= Member end, Members) of - {_Members1, []} -> %% not found - already recorded dead - Group1; - {Members1, [Member | Members2]} -> - Members3 = Members1 ++ [{dead, Member} | Members2], - Group2 = Group1 #gm_group { members = Members3, - version = Ver + 1 }, - mnesia:write(Group2), - Group2 - end - end), - Group. - -record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) -> - {Result, Group} = - TxnFun( - fun () -> - [#gm_group { members = Members, version = Ver } = Group1] = - mnesia:read({?GROUP_TABLE, GroupName}), - {Prefix, [Left | Suffix]} = - lists:splitwith(fun (M) -> M =/= Left end, Members), - Members1 = Prefix ++ [Left, NewMember | Suffix], - Group2 = Group1 #gm_group { members = Members1, - version = Ver + 1 }, - Result = Fun(Group2), - mnesia:write(Group2), - {Result, Group2} - end), - {Result, Group}. + TxnFun( + fun () -> + Group = #gm_group { members = Members, version = Ver } = + read_group(GroupName), + case lists:splitwith( + fun (Member1) -> Member1 =/= Member end, Members) of + {_Members1, []} -> %% not found - already recorded dead + Group; + {Members1, [Member | Members2]} -> + Members3 = Members1 ++ [{dead, Member} | Members2], + write_group(Group #gm_group { members = Members3, + version = Ver + 1 }) + end + end). + +record_new_member_in_group(NewMember, Left, GroupName, TxnFun) -> + TxnFun( + fun () -> + Group = #gm_group { members = Members, version = Ver } = + read_group(GroupName), + {Prefix, [Left | Suffix]} = + lists:splitwith(fun (M) -> M =/= Left end, Members), + write_group(Group #gm_group { + members = Prefix ++ [Left, NewMember | Suffix], + version = Ver + 1 }) + end). erase_members_in_group(Members, GroupName, TxnFun) -> DeadMembers = [{dead, Id} || Id <- Members], - Group = - TxnFun( - fun () -> - [Group1 = #gm_group { members = [_|_] = Members1, - version = Ver }] = - mnesia:read({?GROUP_TABLE, GroupName}), - case Members1 -- DeadMembers of - Members1 -> Group1; - Members2 -> Group2 = - Group1 #gm_group { members = Members2, - version = Ver + 1 }, - mnesia:write(Group2), - Group2 - end - end), - Group. + TxnFun( + fun () -> + Group = #gm_group { members = [_|_] = Members1, version = Ver } = + read_group(GroupName), + case Members1 -- DeadMembers of + Members1 -> Group; + Members2 -> write_group( + Group #gm_group { members = Members2, + version = Ver + 1 }) + end + end). maybe_erase_aliases(State = #state { self = Self, group_name = GroupName, - view = View0, members_state = MembersState, - module = Module, - callback_args = Args, txn_executor = TxnFun }, View) -> #view_member { aliases = Aliases } = fetch_view_member(Self, View), {Erasable, MembersState1} @@ -1185,9 +1159,7 @@ maybe_erase_aliases(State = #state { self = Self, _ -> group_to_view( erase_members_in_group(Erasable, GroupName, TxnFun)) end, - State1 = State #state { members_state = MembersState1, view = View1 }, - {callback_view_changed(Args, Module, View0, View1), - check_neighbours(State1)}. + change_view(View1, State #state { members_state = MembersState1 }). can_erase_view_member(Self, Self, _LA, _LP) -> false; can_erase_view_member(_Self, _Id, N, N) -> true; @@ -1321,7 +1293,7 @@ prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). make_member(GroupName) -> - {case read_group(GroupName) of + {case dirty_read_group(GroupName) of #gm_group { version = Version } -> Version; {error, not_found} -> ?VERSION_START end, self()}. @@ -1385,16 +1357,19 @@ callback(Args, Module, Activity) -> {stop, _Reason} = Error -> Error end. -callback_view_changed(Args, Module, OldView, NewView) -> - OldMembers = all_known_members(OldView), - NewMembers = all_known_members(NewView), +change_view(View, State = #state { view = View0, + module = Module, + callback_args = Args }) -> + OldMembers = all_known_members(View0), + NewMembers = all_known_members(View), Births = NewMembers -- OldMembers, Deaths = OldMembers -- NewMembers, - case {Births, Deaths} of - {[], []} -> ok; - _ -> Module:members_changed( - Args, get_pids(Births), get_pids(Deaths)) - end. + Result = case {Births, Deaths} of + {[], []} -> ok; + _ -> Module:members_changed( + Args, get_pids(Births), get_pids(Deaths)) + end, + {Result, check_neighbours(State #state { view = View })}. handle_callback_result({Result, State}) -> if_callback_success( |