summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-12-09 15:41:59 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-12-09 15:41:59 +0000
commit8042f5f31ada2fdbbbcd8cd0d63db413a6fea200 (patch)
treec8136adb0677e4903ff3a07a71fd82a007b80cb7
parent7c5faf36be6e4f4b21f96a6341a8c7debf6b3912 (diff)
downloadrabbitmq-server-8042f5f31ada2fdbbbcd8cd0d63db413a6fea200.tar.gz
Extend member structure to prevent pid collisions
-rw-r--r--src/gm.erl36
1 files changed, 22 insertions, 14 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 8c838a70..17aca830 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -515,8 +515,8 @@ group_members(Server) ->
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
+ Self = {rabbit_guid:guid(), self()},
gen_server2:cast(self(), join),
- Self = self(),
{ok, #state { self = Self,
left = {Self, undefined},
right = {Self, undefined},
@@ -541,7 +541,8 @@ handle_call({confirmed_broadcast, Msg}, _From,
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
- handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State});
+ handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
+ ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
@@ -604,7 +605,8 @@ handle_cast({broadcast, Msg},
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
- handle_callback_result({Module:handle_msg(Args, Self, Msg), State});
+ handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
+ State});
handle_cast({broadcast, Msg}, State) ->
internal_broadcast(Msg, none, State);
@@ -623,7 +625,7 @@ handle_cast(join, State = #state { self = Self,
State1 = check_neighbours(State #state { view = View,
members_state = MembersState }),
handle_callback_result(
- {Module:joined(Args, all_known_members(View)), State1});
+ {Module:joined(Args, get_pids(all_known_members(View))), State1});
handle_cast(leave, State) ->
{stop, normal, State}.
@@ -817,7 +819,7 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
- Result = Module:handle_msg(Args, Self, Msg),
+ Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
@@ -979,7 +981,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
end,
try
case gen_server2:call(
- Left, {add_on_right, Self}, infinity) of
+ get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
not_ready -> join_group(Self, GroupName)
end
@@ -1114,24 +1116,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
{Self, undefined};
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
- ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
+ ok = gen_server2:cast(get_pid(RealNeighbour),
+ {?TAG, Ver, check_neighbours}),
{RealNeighbour, maybe_monitor(RealNeighbour, Self)};
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
true = erlang:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
- ok = gen_server2:cast(RealNeighbour, Msg),
+ ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
ok = case Neighbour of
Self -> ok;
- _ -> gen_server2:cast(Neighbour, Msg)
+ _ -> gen_server2:cast(get_pid(Neighbour), Msg)
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
maybe_monitor(Self, Self) ->
undefined;
maybe_monitor(Other, _Self) ->
- erlang:monitor(process, Other).
+ erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1238,6 +1241,9 @@ prepare_members_state(MembersState) ->
build_members_state(MembersStateList) ->
?DICT:from_list(MembersStateList).
+get_pid({_Guid, Pid}) -> Pid.
+
+get_pids(Ids) -> [Pid || {_Guid, Pid} <- Ids].
%% ---------------------------------------------------------------------------
%% Activity assembly
@@ -1262,13 +1268,13 @@ maybe_send_activity(Activity, #state { self = Self,
send_right(Right, View, {activity, Self, Activity}).
send_right(Right, View, Msg) ->
- ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}).
+ ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
callback(Args, Module, Activity) ->
lists:foldl(
fun ({Id, Pubs, _Acks}, ok) ->
lists:foldl(fun ({_PubNum, Pub}, ok) ->
- Module:handle_msg(Args, Id, Pub);
+ Module:handle_msg(Args, get_pid(Id), Pub);
(_, Error) ->
Error
end, ok, Pubs);
@@ -1283,7 +1289,8 @@ callback_view_changed(Args, Module, OldView, NewView) ->
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> ok;
- _ -> Module:members_changed(Args, Births, Deaths)
+ _ -> Module:members_changed(Args, get_pids(Births),
+ get_pids(Deaths))
end.
handle_callback_result({Result, State}) ->
@@ -1333,7 +1340,8 @@ maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
Confirms.
purge_confirms(Confirms) ->
- [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
+ [gen_server2:reply(From, ok) ||
+ {_PubNum, From} <- queue:to_list(Confirms)],
queue:new().