diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-06-25 16:35:24 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-06-25 16:35:24 +0100 |
commit | dfe2579cfe2dfb454cfa03536dea77e92c7c2e4f (patch) | |
tree | 28396d00365c59b1d57b6c5a70fe6421ead1b68a | |
parent | b1eb07923baf6892b542a6a7448a71022ddcdf9f (diff) | |
download | rabbitmq-server-dfe2579cfe2dfb454cfa03536dea77e92c7c2e4f.tar.gz |
Delegate monitoring, with a fairly glaring hole.
-rw-r--r-- | src/delegate.erl | 66 | ||||
-rw-r--r-- | src/dmon.erl | 70 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 |
3 files changed, 128 insertions, 22 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index e833b819..a205f2f1 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,11 +18,14 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]). +-export([start_link/1, invoke_no_result/2, invoke/2, monitor/1, demonitor/2, + call/2, cast/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-record(state, {node, monitors}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -35,6 +38,9 @@ [{pid(), term()}]}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(monitor/1 :: (pid()) -> reference()). +-spec(demonitor/2 :: (pid(), reference()) -> 'true'). + -spec(call/2 :: ( pid(), any()) -> any(); ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}). @@ -78,7 +84,7 @@ invoke(Pids, Fun) when is_list(Pids) -> case orddict:fetch_keys(Grouped) of [] -> {[], []}; RemoteNodes -> gen_server2:multi_call( - RemoteNodes, delegate(RemoteNodes), + RemoteNodes, delegate(self(), RemoteNodes), {invoke, Fun, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || @@ -106,12 +112,23 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; - RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), - {invoke, Fun, Grouped}) + RemoteNodes -> gen_server2:abcast( + RemoteNodes, delegate(self(), RemoteNodes), + {invoke, Fun, Grouped}) end, safe_invoke(LocalPids, Fun), %% must not die ok. +monitor(Pid) -> + Node = node(Pid), + Name = delegate(Pid, [Node]), + gen_server2:call({Name, Node}, {monitor, self(), Pid}, infinity). + +demonitor(Pid, Ref) -> + Node = node(Pid), + Name = delegate(Pid, [Node]), + gen_server2:call({Name, Node}, {demonitor, Ref}, infinity). + call(PidOrPids, Msg) -> invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). @@ -134,10 +151,10 @@ group_pids_by_node(Pids) -> delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). -delegate(RemoteNodes) -> +delegate(Pid, RemoteNodes) -> case get(delegate) of undefined -> Name = delegate_name( - erlang:phash2(self(), + erlang:phash2(Pid, delegate_sup:count(RemoteNodes))), put(delegate, Name), Name; @@ -156,21 +173,40 @@ safe_invoke(Pid, Fun) when is_pid(Pid) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, node(), hibernate, + {ok, #state{node = node(), monitors = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({invoke, Fun, Grouped}, _From, Node) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}. +handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}; -handle_cast({invoke, Fun, Grouped}, Node) -> +handle_call({monitor, WantsMonitor, ToMonitor}, _From, + State = #state{monitors = Monitors}) -> + Ref = erlang:monitor(process, ToMonitor), + State1 = State#state{monitors = dict:store(Ref, WantsMonitor, Monitors)}, + {reply, Ref, State1, hibernate}; + +handle_call({demonitor, Ref}, _From, State = #state{monitors = Monitors}) -> + %% We need to ensure we don't then respond to a 'DOWN' that's + %% currently in our mailbox - if we did then our client might then + %% get a 'DOWN' after demonitor() returns. + State1 = State#state{monitors = dict:erase(Ref, Monitors)}, + {reply, erlang:demonitor(Ref, [flush]), State1, hibernate}. + +handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) -> safe_invoke(orddict:fetch(Node, Grouped), Fun), - {noreply, Node, hibernate}. + {noreply, State, hibernate}. + +handle_info({'DOWN', Ref, process, Object, Info}, + State = #state{monitors = Monitors}) -> + WantsMonitor = dict:fetch(Ref, Monitors), + WantsMonitor ! {'DOWN', Ref, process, Object, Info}, + {noreply, State#state{monitors = dict:erase(Ref, Monitors)}, hibernate}; -handle_info(_Info, Node) -> - {noreply, Node, hibernate}. +handle_info(_Info, State) -> + {noreply, State, hibernate}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, Node, _Extra) -> - {ok, Node}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/dmon.erl b/src/dmon.erl new file mode 100644 index 00000000..dfb420c3 --- /dev/null +++ b/src/dmon.erl @@ -0,0 +1,70 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. +%% + +-module(dmon). + +-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, + monitored/1, is_empty/1]). + +-compile({no_auto_import, [monitor/2]}). + +-ifdef(use_specs). + +%%---------------------------------------------------------------------------- + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: dict()). + +-type(item() :: pid() | {atom(), node()}). + +-spec(new/0 :: () -> ?MODULE()). +-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()). +-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()). +-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()). +-spec(is_monitored/2 :: (item(), ?MODULE()) -> boolean()). +-spec(erase/2 :: (item(), ?MODULE()) -> ?MODULE()). +-spec(monitored/1 :: (?MODULE()) -> [item()]). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). + +-endif. + +new() -> dict:new(). + +monitor(Item, M) -> + case dict:is_key(Item, M) of + true -> M; + false -> dict:store(Item, delegate:monitor(Item), M) + end. + +monitor_all([], M) -> M; %% optimisation +monitor_all([Item], M) -> monitor(Item, M); %% optimisation +monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items). + +demonitor(Item, M) -> + case dict:find(Item, M) of + {ok, MRef} -> delegate:demonitor(Item, MRef), + dict:erase(Item, M); + error -> M + end. + +is_monitored(Item, M) -> dict:is_key(Item, M). + +erase(Item, M) -> dict:erase(Item, M). + +monitored(M) -> dict:fetch_keys(M). + +is_empty(M) -> dict:size(M) == 0. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 964b0eb4..ca9418c6 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -120,7 +120,7 @@ init(Q = #amqqueue { name = QName }) -> msg_id_ack = dict:new(), msg_id_status = dict:new(), - known_senders = pmon:new(), + known_senders = dmon:new(), depth_delta = undefined }, @@ -488,7 +488,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - MPids = pmon:monitored(KS), + MPids = dmon:monitored(KS), ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids), %% We find all the messages that we've received from channels but @@ -602,14 +602,14 @@ ensure_rate_timer(State) -> stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref). ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> - State #state { known_senders = pmon:monitor(ChPid, KS) }. + State #state { known_senders = dmon:monitor(ChPid, KS) }. local_sender_death(ChPid, State = #state { known_senders = KS }) -> %% The channel will be monitored iff we have received a delivery %% from it but not heard about its death from the master. So if it %% is monitored we need to point the death out to the master (see %% essay). - ok = case pmon:is_monitored(ChPid, KS) of + ok = case dmon:is_monitored(ChPid, KS) of false -> ok; true -> confirm_sender_death(ChPid) end, @@ -628,7 +628,7 @@ confirm_sender_death(Pid) -> %% See comment in local_sender_death/2; we might have %% received a sender_death in the meanwhile so check %% again. - ok = case pmon:is_monitored(Pid, KS) of + ok = case dmon:is_monitored(Pid, KS) of false -> ok; true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), confirm_sender_death(Pid) @@ -776,7 +776,7 @@ process_instruction({sender_death, ChPid}, %% The channel will be monitored iff we have received a message %% from it. In this case we just want to avoid doing work if we %% never got any messages. - {ok, case pmon:is_monitored(ChPid, KS) of + {ok, case dmon:is_monitored(ChPid, KS) of false -> State; true -> MS1 = case dict:find(ChPid, SQ) of error -> @@ -788,7 +788,7 @@ process_instruction({sender_death, ChPid}, credit_flow:peer_down(ChPid), State #state { sender_queues = dict:erase(ChPid, SQ), msg_id_status = MS1, - known_senders = pmon:demonitor(ChPid, KS) } + known_senders = dmon:demonitor(ChPid, KS) } end}; process_instruction({depth, Depth}, State = #state { backing_queue = BQ, |