summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-23 14:33:17 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-23 14:33:17 +0100
commitc433f342d1a61b5a188bb4de449127a8ddb5433f (patch)
tree1ba7a26a3011e4c51f7de73486a3e49d8370fe11
parent3de186a95a8c81cd83b9b4c7e5a7aa5cfbc12f5f (diff)
downloadrabbitmq-server-c433f342d1a61b5a188bb4de449127a8ddb5433f.tar.gz
Make monitoring via delegates async. This has the downside that you can't monitor the same pid more than once from the same process, but that is enforced by pmon anyway which is the only client of this code. The upside is that cross-cluster basic.get doesn't deadlock...
-rw-r--r--src/delegate.erl51
1 files changed, 29 insertions, 22 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index dad2dd3c..30ee33b6 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -24,7 +24,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {node, monitors}).
+-record(state, {node, monitors, name}).
%%----------------------------------------------------------------------------
@@ -32,7 +32,7 @@
-export_type([monitor_ref/0]).
--type(monitor_ref() :: reference() | {atom(), reference()}).
+-type(monitor_ref() :: reference() | {atom(), pid()}).
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
@@ -61,7 +61,8 @@
%%----------------------------------------------------------------------------
start_link(Num) ->
- gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+ Name = delegate_name(Num),
+ gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
Fun(Pid);
@@ -128,14 +129,15 @@ monitor(Type, Pid) when node(Pid) =:= node() ->
erlang:monitor(Type, Pid);
monitor(Type, Pid) ->
Name = delegate(Pid, [node(Pid)]),
- {Name, gen_server2:call(Name, {monitor, Type, self(), Pid}, infinity)}.
+ gen_server2:cast(Name, {monitor, Type, self(), Pid}),
+ {Name, Pid}.
demonitor(Ref) -> ?MODULE:demonitor(Ref, []).
demonitor(Ref, Options) when is_reference(Ref) ->
erlang:demonitor(Ref, Options);
-demonitor({Name, Ref}, Options) ->
- gen_server2:call(Name, {demonitor, Ref, Options}, infinity).
+demonitor({Name, Pid}, Options) ->
+ gen_server2:cast(Name, {demonitor, Pid, Options}).
call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
@@ -180,34 +182,39 @@ safe_invoke(Pid, Fun) when is_pid(Pid) ->
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, #state{node = node(), monitors = dict:new()}, hibernate,
+init([Name]) ->
+ {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate};
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}.
-handle_call({monitor, Type, WantsMonitor, ToMonitor}, _From,
+handle_cast({monitor, Type, WantsMonitor, Pid},
State = #state{monitors = Monitors}) ->
- Ref = erlang:monitor(Type, ToMonitor),
- State1 = State#state{monitors = dict:store(Ref, WantsMonitor, Monitors)},
- {reply, Ref, State1, hibernate};
+ Ref = erlang:monitor(Type, Pid),
+ Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors),
+ {noreply, State#state{monitors = Monitors1}, hibernate};
-handle_call({demonitor, Ref, Options}, _From,
+handle_cast({demonitor, Pid, Options},
State = #state{monitors = Monitors}) ->
- State1 = State#state{monitors = dict:erase(Ref, Monitors)},
- {reply, erlang:demonitor(Ref, Options), State1, hibernate}.
+ {noreply, case dict:find(Pid, Monitors) of
+ {ok, {_WantsMonitor, Ref}} ->
+ erlang:demonitor(Ref, Options),
+ State#state{monitors = dict:erase(Pid, Monitors)};
+ error ->
+ State
+ end, hibernate};
handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
safe_invoke(orddict:fetch(Node, Grouped), Fun),
{noreply, State, hibernate}.
-handle_info({'DOWN', Ref, process, Object, Info},
- State = #state{monitors = Monitors}) ->
- {noreply, case dict:find(Ref, Monitors) of
- {ok, WantsMonitor} ->
- WantsMonitor ! {'DOWN', Ref, process, Object, Info},
- State#state{monitors = dict:erase(Ref, Monitors)};
+handle_info({'DOWN', Ref, process, Pid, Info},
+ State = #state{monitors = Monitors, name = Name}) ->
+ {noreply, case dict:find(Pid, Monitors) of
+ {ok, {WantsMonitor, Ref}} ->
+ WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info},
+ State#state{monitors = dict:erase(Pid, Monitors)};
error ->
State
end, hibernate};