diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-07-23 14:33:17 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-07-23 14:33:17 +0100 |
commit | c433f342d1a61b5a188bb4de449127a8ddb5433f (patch) | |
tree | 1ba7a26a3011e4c51f7de73486a3e49d8370fe11 | |
parent | 3de186a95a8c81cd83b9b4c7e5a7aa5cfbc12f5f (diff) | |
download | rabbitmq-server-c433f342d1a61b5a188bb4de449127a8ddb5433f.tar.gz |
Make monitoring via delegates async. This has the downside that you can't monitor the same pid more than once from the same process, but that is enforced by pmon anyway which is the only client of this code. The upside is that cross-cluster basic.get doesn't deadlock...
-rw-r--r-- | src/delegate.erl | 51 |
1 files changed, 29 insertions, 22 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index dad2dd3c..30ee33b6 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -24,7 +24,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {node, monitors}). +-record(state, {node, monitors, name}). %%---------------------------------------------------------------------------- @@ -32,7 +32,7 @@ -export_type([monitor_ref/0]). --type(monitor_ref() :: reference() | {atom(), reference()}). +-type(monitor_ref() :: reference() | {atom(), pid()}). -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). @@ -61,7 +61,8 @@ %%---------------------------------------------------------------------------- start_link(Num) -> - gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []). + Name = delegate_name(Num), + gen_server2:start_link({local, Name}, ?MODULE, [Name], []). invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> Fun(Pid); @@ -128,14 +129,15 @@ monitor(Type, Pid) when node(Pid) =:= node() -> erlang:monitor(Type, Pid); monitor(Type, Pid) -> Name = delegate(Pid, [node(Pid)]), - {Name, gen_server2:call(Name, {monitor, Type, self(), Pid}, infinity)}. + gen_server2:cast(Name, {monitor, Type, self(), Pid}), + {Name, Pid}. demonitor(Ref) -> ?MODULE:demonitor(Ref, []). demonitor(Ref, Options) when is_reference(Ref) -> erlang:demonitor(Ref, Options); -demonitor({Name, Ref}, Options) -> - gen_server2:call(Name, {demonitor, Ref, Options}, infinity). +demonitor({Name, Pid}, Options) -> + gen_server2:cast(Name, {demonitor, Pid, Options}). call(PidOrPids, Msg) -> invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). @@ -180,34 +182,39 @@ safe_invoke(Pid, Fun) when is_pid(Pid) -> %%---------------------------------------------------------------------------- -init([]) -> - {ok, #state{node = node(), monitors = dict:new()}, hibernate, +init([Name]) -> + {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}; + {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}. -handle_call({monitor, Type, WantsMonitor, ToMonitor}, _From, +handle_cast({monitor, Type, WantsMonitor, Pid}, State = #state{monitors = Monitors}) -> - Ref = erlang:monitor(Type, ToMonitor), - State1 = State#state{monitors = dict:store(Ref, WantsMonitor, Monitors)}, - {reply, Ref, State1, hibernate}; + Ref = erlang:monitor(Type, Pid), + Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors), + {noreply, State#state{monitors = Monitors1}, hibernate}; -handle_call({demonitor, Ref, Options}, _From, +handle_cast({demonitor, Pid, Options}, State = #state{monitors = Monitors}) -> - State1 = State#state{monitors = dict:erase(Ref, Monitors)}, - {reply, erlang:demonitor(Ref, Options), State1, hibernate}. + {noreply, case dict:find(Pid, Monitors) of + {ok, {_WantsMonitor, Ref}} -> + erlang:demonitor(Ref, Options), + State#state{monitors = dict:erase(Pid, Monitors)}; + error -> + State + end, hibernate}; handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) -> safe_invoke(orddict:fetch(Node, Grouped), Fun), {noreply, State, hibernate}. -handle_info({'DOWN', Ref, process, Object, Info}, - State = #state{monitors = Monitors}) -> - {noreply, case dict:find(Ref, Monitors) of - {ok, WantsMonitor} -> - WantsMonitor ! {'DOWN', Ref, process, Object, Info}, - State#state{monitors = dict:erase(Ref, Monitors)}; +handle_info({'DOWN', Ref, process, Pid, Info}, + State = #state{monitors = Monitors, name = Name}) -> + {noreply, case dict:find(Pid, Monitors) of + {ok, {WantsMonitor, Ref}} -> + WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info}, + State#state{monitors = dict:erase(Pid, Monitors)}; error -> State end, hibernate}; |