summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-08-15 12:49:32 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-08-15 12:49:32 +0100
commit5e69009198ab230c314aa6c2678accd55cf6ba6e (patch)
tree17579840855592111c316bad4e2a587bffe7059f
parent8909bee026f3ee6b7f20828ce59d5a30de1919fd (diff)
downloadrabbitmq-server-5e69009198ab230c314aa6c2678accd55cf6ba6e.tar.gz
Store monitors correctly, with a two level dictionary, mapping MonitoredPid -> MonitoringPid -> Ref and MonitoredPid -> Ref -> MonitoringPid.
-rw-r--r--src/delegate.erl67
1 files changed, 57 insertions, 10 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 5277e59f..44a909dc 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -136,7 +136,7 @@ demonitor(Ref) -> ?MODULE:demonitor(Ref, []).
demonitor(Ref, Options) when is_reference(Ref) ->
erlang:demonitor(Ref, Options);
demonitor({Name, Pid}, Options) ->
- gen_server2:cast(Name, {demonitor, Pid, Options}).
+ gen_server2:cast(Name, {demonitor, self(), Pid, Options}).
call(PidOrPids, Msg) ->
invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}).
@@ -185,7 +185,7 @@ apply1(Fun, Arg) -> Fun(Arg).
%%----------------------------------------------------------------------------
init([Name]) ->
- {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
+ {ok, #state{node = node(), monitors = ddict_new(), name = Name}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) ->
@@ -195,15 +195,16 @@ handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) ->
handle_cast({monitor, Type, WantsMonitor, Pid},
State = #state{monitors = Monitors}) ->
Ref = erlang:monitor(Type, Pid),
- Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors),
+ Monitors1 = ddict_store(Pid, WantsMonitor, Ref, Monitors),
{noreply, State#state{monitors = Monitors1}, hibernate};
-handle_cast({demonitor, Pid, Options},
+handle_cast({demonitor, WantsMonitor, Pid, Options},
State = #state{monitors = Monitors}) ->
- {noreply, case dict:find(Pid, Monitors) of
- {ok, {_WantsMonitor, Ref}} ->
+ {noreply, case ddict_find_f(Pid, WantsMonitor, Monitors) of
+ {ok, Ref} ->
erlang:demonitor(Ref, Options),
- State#state{monitors = dict:erase(Pid, Monitors)};
+ State#state{monitors = ddict_erase_f(
+ Pid, WantsMonitor, Monitors)};
error ->
State
end, hibernate};
@@ -214,10 +215,10 @@ handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) ->
handle_info({'DOWN', Ref, process, Pid, Info},
State = #state{monitors = Monitors, name = Name}) ->
- {noreply, case dict:find(Pid, Monitors) of
- {ok, {WantsMonitor, Ref}} ->
+ {noreply, case ddict_find_r(Pid, Ref, Monitors) of
+ {ok, WantsMonitor} ->
WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info},
- State#state{monitors = dict:erase(Pid, Monitors)};
+ State#state{monitors = ddict_erase_r(Pid, Ref, Monitors)};
error ->
State
end, hibernate};
@@ -230,3 +231,49 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+%%----------------------------------------------------------------------------
+
+ddict_new() -> dict:new().
+
+ddict_store(Key, Val1, Val2, Dict) ->
+ V = case dict:find(Key, Dict) of
+ {ok, {DictF, DictR}} -> {dict:store(Val1, Val2, DictF),
+ dict:store(Val2, Val1, DictR)};
+ error -> {dict:new(), dict:new()}
+ end,
+ dict:store(Key, V, Dict).
+
+ddict_find_f(Key, Val1, Dict) -> ddict_find(Key, Val1, Dict, fun select_f/1).
+ddict_find_r(Key, Val2, Dict) -> ddict_find(Key, Val2, Dict, fun select_r/1).
+
+ddict_find(Key, ValX, Dict, Select) ->
+ case dict:find(Key, Dict) of
+ {ok, Dicts} -> {DictX, _} = Select(Dicts),
+ dict:find(ValX, DictX);
+ error -> error
+ end.
+
+ddict_erase_f(Key, Val1, Dict) -> ddict_erase(Key, Val1, Dict, fun select_f/1).
+ddict_erase_r(Key, Val2, Dict) -> ddict_erase(Key, Val2, Dict, fun select_r/1).
+
+ddict_erase(Key, ValX, Dict, Select) ->
+ case dict:find(Key, Dict) of
+ {ok, Dicts} ->
+ {DictX, DictY} = Select(Dicts),
+ Dicts1 = {D, _} =
+ case dict:find(ValX, DictX) of
+ {ok, ValY} -> Select({dict:erase(ValX, DictX),
+ dict:erase(ValY, DictY)});
+ error -> Dicts
+ end,
+ case dict:size(D) of
+ 0 -> dict:erase(Key, Dict);
+ _ -> dict:store(Key, Dicts1, Dict)
+ end;
+ error ->
+ Dict
+ end.
+
+select_f({A, B}) -> {A, B}.
+select_r({A, B}) -> {B, A}.