diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-08-15 12:49:32 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-08-15 12:49:32 +0100 |
commit | 5e69009198ab230c314aa6c2678accd55cf6ba6e (patch) | |
tree | 17579840855592111c316bad4e2a587bffe7059f | |
parent | 8909bee026f3ee6b7f20828ce59d5a30de1919fd (diff) | |
download | rabbitmq-server-5e69009198ab230c314aa6c2678accd55cf6ba6e.tar.gz |
Store monitors correctly, with a two level dictionary, mapping MonitoredPid -> MonitoringPid -> Ref and MonitoredPid -> Ref -> MonitoringPid.
-rw-r--r-- | src/delegate.erl | 67 |
1 files changed, 57 insertions, 10 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 5277e59f..44a909dc 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -136,7 +136,7 @@ demonitor(Ref) -> ?MODULE:demonitor(Ref, []). demonitor(Ref, Options) when is_reference(Ref) -> erlang:demonitor(Ref, Options); demonitor({Name, Pid}, Options) -> - gen_server2:cast(Name, {demonitor, Pid, Options}). + gen_server2:cast(Name, {demonitor, self(), Pid, Options}). call(PidOrPids, Msg) -> invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}). @@ -185,7 +185,7 @@ apply1(Fun, Arg) -> Fun(Arg). %%---------------------------------------------------------------------------- init([Name]) -> - {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, + {ok, #state{node = node(), monitors = ddict_new(), name = Name}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) -> @@ -195,15 +195,16 @@ handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) -> handle_cast({monitor, Type, WantsMonitor, Pid}, State = #state{monitors = Monitors}) -> Ref = erlang:monitor(Type, Pid), - Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors), + Monitors1 = ddict_store(Pid, WantsMonitor, Ref, Monitors), {noreply, State#state{monitors = Monitors1}, hibernate}; -handle_cast({demonitor, Pid, Options}, +handle_cast({demonitor, WantsMonitor, Pid, Options}, State = #state{monitors = Monitors}) -> - {noreply, case dict:find(Pid, Monitors) of - {ok, {_WantsMonitor, Ref}} -> + {noreply, case ddict_find_f(Pid, WantsMonitor, Monitors) of + {ok, Ref} -> erlang:demonitor(Ref, Options), - State#state{monitors = dict:erase(Pid, Monitors)}; + State#state{monitors = ddict_erase_f( + Pid, WantsMonitor, Monitors)}; error -> State end, hibernate}; @@ -214,10 +215,10 @@ handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) -> handle_info({'DOWN', Ref, process, Pid, Info}, State = #state{monitors = Monitors, name = Name}) -> - {noreply, case dict:find(Pid, Monitors) of - {ok, {WantsMonitor, Ref}} -> + {noreply, case ddict_find_r(Pid, Ref, Monitors) of + {ok, WantsMonitor} -> WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info}, - State#state{monitors = dict:erase(Pid, Monitors)}; + State#state{monitors = ddict_erase_r(Pid, Ref, Monitors)}; error -> State end, hibernate}; @@ -230,3 +231,49 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%%---------------------------------------------------------------------------- + +ddict_new() -> dict:new(). + +ddict_store(Key, Val1, Val2, Dict) -> + V = case dict:find(Key, Dict) of + {ok, {DictF, DictR}} -> {dict:store(Val1, Val2, DictF), + dict:store(Val2, Val1, DictR)}; + error -> {dict:new(), dict:new()} + end, + dict:store(Key, V, Dict). + +ddict_find_f(Key, Val1, Dict) -> ddict_find(Key, Val1, Dict, fun select_f/1). +ddict_find_r(Key, Val2, Dict) -> ddict_find(Key, Val2, Dict, fun select_r/1). + +ddict_find(Key, ValX, Dict, Select) -> + case dict:find(Key, Dict) of + {ok, Dicts} -> {DictX, _} = Select(Dicts), + dict:find(ValX, DictX); + error -> error + end. + +ddict_erase_f(Key, Val1, Dict) -> ddict_erase(Key, Val1, Dict, fun select_f/1). +ddict_erase_r(Key, Val2, Dict) -> ddict_erase(Key, Val2, Dict, fun select_r/1). + +ddict_erase(Key, ValX, Dict, Select) -> + case dict:find(Key, Dict) of + {ok, Dicts} -> + {DictX, DictY} = Select(Dicts), + Dicts1 = {D, _} = + case dict:find(ValX, DictX) of + {ok, ValY} -> Select({dict:erase(ValX, DictX), + dict:erase(ValY, DictY)}); + error -> Dicts + end, + case dict:size(D) of + 0 -> dict:erase(Key, Dict); + _ -> dict:store(Key, Dicts1, Dict) + end; + error -> + Dict + end. + +select_f({A, B}) -> {A, B}. +select_r({A, B}) -> {B, A}. |