summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-06-26 12:59:33 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-06-26 12:59:33 +0100
commit09400b8b088aebe3265fcafc5314e77c3d416625 (patch)
tree42b83a1cdac95687b1b6e0d0f168738015922a20
parentcf7d449b3df465a77921af7a50e52ee4e9d1c77c (diff)
downloadrabbitmq-server-09400b8b088aebe3265fcafc5314e77c3d416625.tar.gz
Do the monitoring on the correct node(!) and remove all that nonsense about node-wide monitoring.
-rw-r--r--src/delegate.erl8
-rw-r--r--src/dmon.erl43
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
3 files changed, 18 insertions, 40 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index a205f2f1..475b087f 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -119,15 +119,19 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
safe_invoke(LocalPids, Fun), %% must not die
ok.
+monitor(Pid) when node(Pid) =:= node() ->
+ erlang:monitor(process, Pid);
monitor(Pid) ->
Node = node(Pid),
Name = delegate(Pid, [Node]),
- gen_server2:call({Name, Node}, {monitor, self(), Pid}, infinity).
+ gen_server2:call(Name, {monitor, self(), Pid}, infinity).
+demonitor(Pid, Ref) when node(Pid) =:= node() ->
+ erlang:demonitor(Ref, [flush]);
demonitor(Pid, Ref) ->
Node = node(Pid),
Name = delegate(Pid, [Node]),
- gen_server2:call({Name, Node}, {demonitor, Ref}, infinity).
+ gen_server2:call(Name, {demonitor, Ref}, infinity).
call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
diff --git a/src/dmon.erl b/src/dmon.erl
index 3f89c83a..dfb420c3 100644
--- a/src/dmon.erl
+++ b/src/dmon.erl
@@ -16,8 +16,8 @@
-module(dmon).
--export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2,
- monitored/1, monitored/2, is_empty/1]).
+-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
+ monitored/1, is_empty/1]).
-compile({no_auto_import, [monitor/2]}).
@@ -36,8 +36,8 @@
-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
-spec(is_monitored/2 :: (item(), ?MODULE()) -> boolean()).
+-spec(erase/2 :: (item(), ?MODULE()) -> ?MODULE()).
-spec(monitored/1 :: (?MODULE()) -> [item()]).
--spec(monitored/2 :: (node(), ?MODULE()) -> [item()]).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-endif.
@@ -45,14 +45,9 @@
new() -> dict:new().
monitor(Item, M) ->
- N = case dict:find(node(Item), M) of
- {ok, N0} -> N0;
- error -> dict:new()
- end,
- case dict:is_key(Item, N) of
+ case dict:is_key(Item, M) of
true -> M;
- false -> N2 = dict:store(Item, delegate:monitor(Item), N),
- dict:store(node(Item), N2, M)
+ false -> dict:store(Item, delegate:monitor(Item), M)
end.
monitor_all([], M) -> M; %% optimisation
@@ -60,30 +55,16 @@ monitor_all([Item], M) -> monitor(Item, M); %% optimisation
monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
demonitor(Item, M) ->
- Node = node(Item),
- case dict:find(Node, M) of
- {ok, N} -> case dict:find(Item, N) of
- {ok, MRef} -> delegate:demonitor(Item, MRef),
- N2 = dict:erase(Item, N),
- case dict:size(N2) of
- 0 -> erlang:monitor_node(Node, false),
- dict:erase(Node, M);
- _ -> dict:store(Node, N2, M)
- end;
- error -> M
- end;
- error -> M
+ case dict:find(Item, M) of
+ {ok, MRef} -> delegate:demonitor(Item, MRef),
+ dict:erase(Item, M);
+ error -> M
end.
-is_monitored(Item, M) -> dict:is_key(node(Item), M) andalso
- dict:is_key(Item, dict:fetch(node(Item), M)).
+is_monitored(Item, M) -> dict:is_key(Item, M).
-monitored(M) -> lists:flatten([dict:fetch_keys(dict:fetch(Node, M)) ||
- Node <- dict:fetch_keys(M)]).
+erase(Item, M) -> dict:erase(Item, M).
-monitored(Node, M) -> case dict:find(Node, M) of
- {ok, N} -> dict:fetch_keys(N);
- error -> []
- end.
+monitored(M) -> dict:fetch_keys(M).
is_empty(M) -> dict:size(M) == 0.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b4c01440..ac3e0df0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -276,10 +276,6 @@ handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
local_sender_death(ChPid, State),
noreply(State);
-handle_info({node_down, Node}, State) ->
- local_sender_node_death(Node, State),
- noreply(State);
-
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -619,9 +615,6 @@ local_sender_death(ChPid, #state { known_senders = KS }) ->
true -> confirm_sender_death(ChPid)
end.
-local_sender_node_death(Node, State = #state { known_senders = KS }) ->
- [local_sender_death(ChPid, State) || ChPid <- dmon:monitored(Node, KS)].
-
confirm_sender_death(Pid) ->
%% We have to deal with the possibility that we'll be promoted to
%% master before this thing gets run. Consequently we set the