diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-06-26 12:59:33 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-06-26 12:59:33 +0100 |
commit | 09400b8b088aebe3265fcafc5314e77c3d416625 (patch) | |
tree | 42b83a1cdac95687b1b6e0d0f168738015922a20 | |
parent | cf7d449b3df465a77921af7a50e52ee4e9d1c77c (diff) | |
download | rabbitmq-server-09400b8b088aebe3265fcafc5314e77c3d416625.tar.gz |
Do the monitoring on the correct node(!) and remove all that nonsense about node-wide monitoring.
-rw-r--r-- | src/delegate.erl | 8 | ||||
-rw-r--r-- | src/dmon.erl | 43 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 |
3 files changed, 18 insertions, 40 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index a205f2f1..475b087f 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -119,15 +119,19 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> safe_invoke(LocalPids, Fun), %% must not die ok. +monitor(Pid) when node(Pid) =:= node() -> + erlang:monitor(process, Pid); monitor(Pid) -> Node = node(Pid), Name = delegate(Pid, [Node]), - gen_server2:call({Name, Node}, {monitor, self(), Pid}, infinity). + gen_server2:call(Name, {monitor, self(), Pid}, infinity). +demonitor(Pid, Ref) when node(Pid) =:= node() -> + erlang:demonitor(Ref, [flush]); demonitor(Pid, Ref) -> Node = node(Pid), Name = delegate(Pid, [Node]), - gen_server2:call({Name, Node}, {demonitor, Ref}, infinity). + gen_server2:call(Name, {demonitor, Ref}, infinity). call(PidOrPids, Msg) -> invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). diff --git a/src/dmon.erl b/src/dmon.erl index 3f89c83a..dfb420c3 100644 --- a/src/dmon.erl +++ b/src/dmon.erl @@ -16,8 +16,8 @@ -module(dmon). --export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, - monitored/1, monitored/2, is_empty/1]). +-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, + monitored/1, is_empty/1]). -compile({no_auto_import, [monitor/2]}). @@ -36,8 +36,8 @@ -spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()). -spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()). -spec(is_monitored/2 :: (item(), ?MODULE()) -> boolean()). +-spec(erase/2 :: (item(), ?MODULE()) -> ?MODULE()). -spec(monitored/1 :: (?MODULE()) -> [item()]). --spec(monitored/2 :: (node(), ?MODULE()) -> [item()]). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -endif. @@ -45,14 +45,9 @@ new() -> dict:new(). monitor(Item, M) -> - N = case dict:find(node(Item), M) of - {ok, N0} -> N0; - error -> dict:new() - end, - case dict:is_key(Item, N) of + case dict:is_key(Item, M) of true -> M; - false -> N2 = dict:store(Item, delegate:monitor(Item), N), - dict:store(node(Item), N2, M) + false -> dict:store(Item, delegate:monitor(Item), M) end. monitor_all([], M) -> M; %% optimisation @@ -60,30 +55,16 @@ monitor_all([Item], M) -> monitor(Item, M); %% optimisation monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items). demonitor(Item, M) -> - Node = node(Item), - case dict:find(Node, M) of - {ok, N} -> case dict:find(Item, N) of - {ok, MRef} -> delegate:demonitor(Item, MRef), - N2 = dict:erase(Item, N), - case dict:size(N2) of - 0 -> erlang:monitor_node(Node, false), - dict:erase(Node, M); - _ -> dict:store(Node, N2, M) - end; - error -> M - end; - error -> M + case dict:find(Item, M) of + {ok, MRef} -> delegate:demonitor(Item, MRef), + dict:erase(Item, M); + error -> M end. -is_monitored(Item, M) -> dict:is_key(node(Item), M) andalso - dict:is_key(Item, dict:fetch(node(Item), M)). +is_monitored(Item, M) -> dict:is_key(Item, M). -monitored(M) -> lists:flatten([dict:fetch_keys(dict:fetch(Node, M)) || - Node <- dict:fetch_keys(M)]). +erase(Item, M) -> dict:erase(Item, M). -monitored(Node, M) -> case dict:find(Node, M) of - {ok, N} -> dict:fetch_keys(N); - error -> [] - end. +monitored(M) -> dict:fetch_keys(M). is_empty(M) -> dict:size(M) == 0. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b4c01440..ac3e0df0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -276,10 +276,6 @@ handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> local_sender_death(ChPid, State), noreply(State); -handle_info({node_down, Node}, State) -> - local_sender_node_death(Node, State), - noreply(State); - handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -619,9 +615,6 @@ local_sender_death(ChPid, #state { known_senders = KS }) -> true -> confirm_sender_death(ChPid) end. -local_sender_node_death(Node, State = #state { known_senders = KS }) -> - [local_sender_death(ChPid, State) || ChPid <- dmon:monitored(Node, KS)]. - confirm_sender_death(Pid) -> %% We have to deal with the possibility that we'll be promoted to %% master before this thing gets run. Consequently we set the |