diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-12-09 15:41:59 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-12-09 15:41:59 +0000 |
commit | 8042f5f31ada2fdbbbcd8cd0d63db413a6fea200 (patch) | |
tree | c8136adb0677e4903ff3a07a71fd82a007b80cb7 | |
parent | 7c5faf36be6e4f4b21f96a6341a8c7debf6b3912 (diff) | |
download | rabbitmq-server-8042f5f31ada2fdbbbcd8cd0d63db413a6fea200.tar.gz |
Extend member structure to prevent pid collisions
-rw-r--r-- | src/gm.erl | 36 |
1 files changed, 22 insertions, 14 deletions
@@ -515,8 +515,8 @@ group_members(Server) -> init([GroupName, Module, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), + Self = {rabbit_guid:guid(), self()}, gen_server2:cast(self(), join), - Self = self(), {ok, #state { self = Self, left = {Self, undefined}, right = {Self, undefined}, @@ -541,7 +541,8 @@ handle_call({confirmed_broadcast, Msg}, _From, right = {Self, undefined}, module = Module, callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State}); + handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), + ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> internal_broadcast(Msg, From, State); @@ -604,7 +605,8 @@ handle_cast({broadcast, Msg}, right = {Self, undefined}, module = Module, callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, Self, Msg), State}); + handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), + State}); handle_cast({broadcast, Msg}, State) -> internal_broadcast(Msg, none, State); @@ -623,7 +625,7 @@ handle_cast(join, State = #state { self = Self, State1 = check_neighbours(State #state { view = View, members_state = MembersState }), handle_callback_result( - {Module:joined(Args, all_known_members(View)), State1}); + {Module:joined(Args, get_pids(all_known_members(View))), State1}); handle_cast(leave, State) -> {stop, normal, State}. @@ -817,7 +819,7 @@ internal_broadcast(Msg, From, State = #state { self = Self, confirms = Confirms, callback_args = Args, broadcast_buffer = Buffer }) -> - Result = Module:handle_msg(Args, Self, Msg), + Result = Module:handle_msg(Args, get_pid(Self), Msg), Buffer1 = [{PubCount, Msg} | Buffer], Confirms1 = case From of none -> Confirms; @@ -979,7 +981,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) -> end, try case gen_server2:call( - Left, {add_on_right, Self}, infinity) of + get_pid(Left), {add_on_right, Self}, infinity) of {ok, Group1} -> group_to_view(Group1); not_ready -> join_group(Self, GroupName) end @@ -1114,24 +1116,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false. ensure_neighbour(_Ver, Self, {Self, undefined}, Self) -> {Self, undefined}; ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> - ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}), + ok = gen_server2:cast(get_pid(RealNeighbour), + {?TAG, Ver, check_neighbours}), {RealNeighbour, maybe_monitor(RealNeighbour, Self)}; ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> {RealNeighbour, MRef}; ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> true = erlang:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, - ok = gen_server2:cast(RealNeighbour, Msg), + ok = gen_server2:cast(get_pid(RealNeighbour), Msg), ok = case Neighbour of Self -> ok; - _ -> gen_server2:cast(Neighbour, Msg) + _ -> gen_server2:cast(get_pid(Neighbour), Msg) end, {Neighbour, maybe_monitor(Neighbour, Self)}. maybe_monitor(Self, Self) -> undefined; maybe_monitor(Other, _Self) -> - erlang:monitor(process, Other). + erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1238,6 +1241,9 @@ prepare_members_state(MembersState) -> build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). +get_pid({_Guid, Pid}) -> Pid. + +get_pids(Ids) -> [Pid || {_Guid, Pid} <- Ids]. %% --------------------------------------------------------------------------- %% Activity assembly @@ -1262,13 +1268,13 @@ maybe_send_activity(Activity, #state { self = Self, send_right(Right, View, {activity, Self, Activity}). send_right(Right, View, Msg) -> - ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}). + ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}). callback(Args, Module, Activity) -> lists:foldl( fun ({Id, Pubs, _Acks}, ok) -> lists:foldl(fun ({_PubNum, Pub}, ok) -> - Module:handle_msg(Args, Id, Pub); + Module:handle_msg(Args, get_pid(Id), Pub); (_, Error) -> Error end, ok, Pubs); @@ -1283,7 +1289,8 @@ callback_view_changed(Args, Module, OldView, NewView) -> Deaths = OldMembers -- NewMembers, case {Births, Deaths} of {[], []} -> ok; - _ -> Module:members_changed(Args, Births, Deaths) + _ -> Module:members_changed(Args, get_pids(Births), + get_pids(Deaths)) end. handle_callback_result({Result, State}) -> @@ -1333,7 +1340,8 @@ maybe_confirm(_Self, _Id, Confirms, _PubNums) -> Confirms. purge_confirms(Confirms) -> - [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)], + [gen_server2:reply(From, ok) || + {_PubNum, From} <- queue:to_list(Confirms)], queue:new(). |