diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-06-25 17:43:57 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-06-25 17:43:57 +0100 |
commit | cf7d449b3df465a77921af7a50e52ee4e9d1c77c (patch) | |
tree | 2e8bf09689272d14e6fa5deff86e80dd7efc76c3 | |
parent | dfe2579cfe2dfb454cfa03536dea77e92c7c2e4f (diff) | |
download | rabbitmq-server-cf7d449b3df465a77921af7a50e52ee4e9d1c77c.tar.gz |
A bit more faff, to deal with genuinely dying nodes.
-rw-r--r-- | src/dmon.erl | 43 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 15 |
2 files changed, 42 insertions, 16 deletions
diff --git a/src/dmon.erl b/src/dmon.erl index dfb420c3..3f89c83a 100644 --- a/src/dmon.erl +++ b/src/dmon.erl @@ -16,8 +16,8 @@ -module(dmon). --export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, - monitored/1, is_empty/1]). +-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, + monitored/1, monitored/2, is_empty/1]). -compile({no_auto_import, [monitor/2]}). @@ -36,8 +36,8 @@ -spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()). -spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()). -spec(is_monitored/2 :: (item(), ?MODULE()) -> boolean()). --spec(erase/2 :: (item(), ?MODULE()) -> ?MODULE()). -spec(monitored/1 :: (?MODULE()) -> [item()]). +-spec(monitored/2 :: (node(), ?MODULE()) -> [item()]). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -endif. @@ -45,9 +45,14 @@ new() -> dict:new(). monitor(Item, M) -> - case dict:is_key(Item, M) of + N = case dict:find(node(Item), M) of + {ok, N0} -> N0; + error -> dict:new() + end, + case dict:is_key(Item, N) of true -> M; - false -> dict:store(Item, delegate:monitor(Item), M) + false -> N2 = dict:store(Item, delegate:monitor(Item), N), + dict:store(node(Item), N2, M) end. monitor_all([], M) -> M; %% optimisation @@ -55,16 +60,30 @@ monitor_all([Item], M) -> monitor(Item, M); %% optimisation monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items). demonitor(Item, M) -> - case dict:find(Item, M) of - {ok, MRef} -> delegate:demonitor(Item, MRef), - dict:erase(Item, M); - error -> M + Node = node(Item), + case dict:find(Node, M) of + {ok, N} -> case dict:find(Item, N) of + {ok, MRef} -> delegate:demonitor(Item, MRef), + N2 = dict:erase(Item, N), + case dict:size(N2) of + 0 -> erlang:monitor_node(Node, false), + dict:erase(Node, M); + _ -> dict:store(Node, N2, M) + end; + error -> M + end; + error -> M end. -is_monitored(Item, M) -> dict:is_key(Item, M). +is_monitored(Item, M) -> dict:is_key(node(Item), M) andalso + dict:is_key(Item, dict:fetch(node(Item), M)). -erase(Item, M) -> dict:erase(Item, M). +monitored(M) -> lists:flatten([dict:fetch_keys(dict:fetch(Node, M)) || + Node <- dict:fetch_keys(M)]). -monitored(M) -> dict:fetch_keys(M). +monitored(Node, M) -> case dict:find(Node, M) of + {ok, N} -> dict:fetch_keys(N); + error -> [] + end. is_empty(M) -> dict:size(M) == 0. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ca9418c6..b4c01440 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -273,7 +273,12 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, noreply(State); handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> - noreply(local_sender_death(ChPid, State)); + local_sender_death(ChPid, State), + noreply(State); + +handle_info({node_down, Node}, State) -> + local_sender_node_death(Node, State), + noreply(State); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -604,7 +609,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref). ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> State #state { known_senders = dmon:monitor(ChPid, KS) }. -local_sender_death(ChPid, State = #state { known_senders = KS }) -> +local_sender_death(ChPid, #state { known_senders = KS }) -> %% The channel will be monitored iff we have received a delivery %% from it but not heard about its death from the master. So if it %% is monitored we need to point the death out to the master (see @@ -612,8 +617,10 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) -> ok = case dmon:is_monitored(ChPid, KS) of false -> ok; true -> confirm_sender_death(ChPid) - end, - State. + end. + +local_sender_node_death(Node, State = #state { known_senders = KS }) -> + [local_sender_death(ChPid, State) || ChPid <- dmon:monitored(Node, KS)]. confirm_sender_death(Pid) -> %% We have to deal with the possibility that we'll be promoted to |