diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-07-02 14:20:29 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-07-02 14:20:29 +0100 |
commit | e4c7f6bceec82208e65ac86366e9a67817b90f75 (patch) | |
tree | 0dbccdbef95268f5f0cac6ac9677fd2d980ebf34 | |
parent | 09400b8b088aebe3265fcafc5314e77c3d416625 (diff) | |
download | rabbitmq-server-e4c7f6bceec82208e65ac86366e9a67817b90f75.tar.gz |
Make the delegate monitoring API a drop in replacement for the built in one, and thus parameterise pmon and remove dmon.
-rw-r--r-- | src/delegate.erl | 48 | ||||
-rw-r--r-- | src/dmon.erl | 70 | ||||
-rw-r--r-- | src/pmon.erl | 53 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 |
4 files changed, 68 insertions, 117 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 475b087f..03086a59 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,8 +18,8 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, monitor/1, demonitor/2, - call/2, cast/2]). +-export([start_link/1, invoke_no_result/2, invoke/2, monitor/2, + demonitor/1, demonitor/2, call/2, cast/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -30,6 +30,10 @@ -ifdef(use_specs). +-export_type([monitor_ref/0]). + +-type(monitor_ref() :: reference() | {atom(), reference()}). + -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). -spec(invoke/2 :: @@ -38,8 +42,9 @@ [{pid(), term()}]}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). --spec(monitor/1 :: (pid()) -> reference()). --spec(demonitor/2 :: (pid(), reference()) -> 'true'). +-spec(monitor/2 :: (any(), pid()) -> monitor_ref()). +-spec(demonitor/1 :: (monitor_ref()) -> 'true'). +-spec(demonitor/2 :: (monitor_ref(), [any()]) -> 'true'). -spec(call/2 :: ( pid(), any()) -> any(); @@ -119,19 +124,18 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> safe_invoke(LocalPids, Fun), %% must not die ok. -monitor(Pid) when node(Pid) =:= node() -> - erlang:monitor(process, Pid); -monitor(Pid) -> - Node = node(Pid), - Name = delegate(Pid, [Node]), - gen_server2:call(Name, {monitor, self(), Pid}, infinity). +monitor(Type, Pid) when node(Pid) =:= node() -> + erlang:monitor(Type, Pid); +monitor(Type, Pid) -> + Name = delegate(Pid, [node(Pid)]), + {Name, gen_server2:call(Name, {monitor, Type, self(), Pid}, infinity)}. -demonitor(Pid, Ref) when node(Pid) =:= node() -> - erlang:demonitor(Ref, [flush]); -demonitor(Pid, Ref) -> - Node = node(Pid), - Name = delegate(Pid, [Node]), - gen_server2:call(Name, {demonitor, Ref}, infinity). +demonitor(Ref) -> ?MODULE:demonitor(Ref, []). + +demonitor(Ref, Options) when is_reference(Ref) -> + erlang:demonitor(Ref, Options); +demonitor({Name, Ref}, Options) -> + gen_server2:call(Name, {demonitor, Ref, Options}, infinity). call(PidOrPids, Msg) -> invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). @@ -183,18 +187,16 @@ init([]) -> handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) -> {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}; -handle_call({monitor, WantsMonitor, ToMonitor}, _From, +handle_call({monitor, Type, WantsMonitor, ToMonitor}, _From, State = #state{monitors = Monitors}) -> - Ref = erlang:monitor(process, ToMonitor), + Ref = erlang:monitor(Type, ToMonitor), State1 = State#state{monitors = dict:store(Ref, WantsMonitor, Monitors)}, {reply, Ref, State1, hibernate}; -handle_call({demonitor, Ref}, _From, State = #state{monitors = Monitors}) -> - %% We need to ensure we don't then respond to a 'DOWN' that's - %% currently in our mailbox - if we did then our client might then - %% get a 'DOWN' after demonitor() returns. +handle_call({demonitor, Ref, Options}, _From, + State = #state{monitors = Monitors}) -> State1 = State#state{monitors = dict:erase(Ref, Monitors)}, - {reply, erlang:demonitor(Ref, [flush]), State1, hibernate}. + {reply, erlang:demonitor(Ref, Options), State1, hibernate}. handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) -> safe_invoke(orddict:fetch(Node, Grouped), Fun), diff --git a/src/dmon.erl b/src/dmon.erl deleted file mode 100644 index dfb420c3..00000000 --- a/src/dmon.erl +++ /dev/null @@ -1,70 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. -%% - --module(dmon). - --export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, - monitored/1, is_empty/1]). - --compile({no_auto_import, [monitor/2]}). - --ifdef(use_specs). - -%%---------------------------------------------------------------------------- - --export_type([?MODULE/0]). - --opaque(?MODULE() :: dict()). - --type(item() :: pid() | {atom(), node()}). - --spec(new/0 :: () -> ?MODULE()). --spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()). --spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()). --spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()). --spec(is_monitored/2 :: (item(), ?MODULE()) -> boolean()). --spec(erase/2 :: (item(), ?MODULE()) -> ?MODULE()). --spec(monitored/1 :: (?MODULE()) -> [item()]). --spec(is_empty/1 :: (?MODULE()) -> boolean()). - --endif. - -new() -> dict:new(). - -monitor(Item, M) -> - case dict:is_key(Item, M) of - true -> M; - false -> dict:store(Item, delegate:monitor(Item), M) - end. - -monitor_all([], M) -> M; %% optimisation -monitor_all([Item], M) -> monitor(Item, M); %% optimisation -monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items). - -demonitor(Item, M) -> - case dict:find(Item, M) of - {ok, MRef} -> delegate:demonitor(Item, MRef), - dict:erase(Item, M); - error -> M - end. - -is_monitored(Item, M) -> dict:is_key(Item, M). - -erase(Item, M) -> dict:erase(Item, M). - -monitored(M) -> dict:fetch_keys(M). - -is_empty(M) -> dict:size(M) == 0. diff --git a/src/pmon.erl b/src/pmon.erl index ed32b8b2..136f6b90 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -16,22 +16,31 @@ -module(pmon). --export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, - monitored/1, is_empty/1]). +-export([new/0, new/1, new/3, monitor/2, monitor_all/2, demonitor/2, + is_monitored/2, erase/2, monitored/1, is_empty/1]). -compile({no_auto_import, [monitor/2]}). +-record(state, {dict, monitor, demonitor1, demonitor2}). + -ifdef(use_specs). %%---------------------------------------------------------------------------- -export_type([?MODULE/0]). --opaque(?MODULE() :: dict()). +-opaque(?MODULE() :: #state{dict :: dict(), + monitor :: fun((atom(), any()) -> any()), + demonitor1 :: fun((any()) -> 'true'), + demonitor2 :: fun((any(), [any()]) -> 'true')}). -type(item() :: pid() | {atom(), node()}). -spec(new/0 :: () -> ?MODULE()). +-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()). +-spec(new/3 :: (fun((atom(), any()) -> any()), + fun((any()) -> 'true'), + fun((any(), [any()]) -> 'true')) -> ?MODULE()). -spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()). -spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()). -spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()). @@ -42,29 +51,39 @@ -endif. -new() -> dict:new(). +new() -> new(erlang). + +new(erlang) -> new(fun erlang:monitor/2, + fun erlang:demonitor/1, fun erlang:demonitor/2); +new(delegate) -> new(fun delegate:monitor/2, + fun delegate:demonitor/1, fun delegate:demonitor/2). + +new(Monitor, Demonitor1, Demonitor2) -> #state{dict = dict:new(), + monitor = Monitor, + demonitor1 = Demonitor1, + demonitor2 = Demonitor2}. -monitor(Item, M) -> +monitor(Item, S = #state{dict = M, monitor = Monitor}) -> case dict:is_key(Item, M) of - true -> M; - false -> dict:store(Item, erlang:monitor(process, Item), M) + true -> S; + false -> S#state{dict = dict:store(Item, Monitor(process, Item), M)} end. -monitor_all([], M) -> M; %% optimisation -monitor_all([Item], M) -> monitor(Item, M); %% optimisation -monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items). +monitor_all([], S) -> S; %% optimisation +monitor_all([Item], S) -> monitor(Item, S); %% optimisation +monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items). -demonitor(Item, M) -> +demonitor(Item, S = #state{dict = M, demonitor1 = Demonitor1}) -> case dict:find(Item, M) of - {ok, MRef} -> erlang:demonitor(MRef), - dict:erase(Item, M); + {ok, MRef} -> Demonitor1(MRef), + S#state{dict = dict:erase(Item, M)}; error -> M end. -is_monitored(Item, M) -> dict:is_key(Item, M). +is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M). -erase(Item, M) -> dict:erase(Item, M). +erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}. -monitored(M) -> dict:fetch_keys(M). +monitored(#state{dict = M}) -> dict:fetch_keys(M). -is_empty(M) -> dict:size(M) == 0. +is_empty(#state{dict = M}) -> dict:size(M) == 0. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ac3e0df0..88b0f005 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -120,7 +120,7 @@ init(Q = #amqqueue { name = QName }) -> msg_id_ack = dict:new(), msg_id_status = dict:new(), - known_senders = dmon:new(), + known_senders = pmon:new(delegate), depth_delta = undefined }, @@ -489,7 +489,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - MPids = dmon:monitored(KS), + MPids = pmon:monitored(KS), ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids), %% We find all the messages that we've received from channels but @@ -603,14 +603,14 @@ ensure_rate_timer(State) -> stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref). ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> - State #state { known_senders = dmon:monitor(ChPid, KS) }. + State #state { known_senders = pmon:monitor(ChPid, KS) }. local_sender_death(ChPid, #state { known_senders = KS }) -> %% The channel will be monitored iff we have received a delivery %% from it but not heard about its death from the master. So if it %% is monitored we need to point the death out to the master (see %% essay). - ok = case dmon:is_monitored(ChPid, KS) of + ok = case pmon:is_monitored(ChPid, KS) of false -> ok; true -> confirm_sender_death(ChPid) end. @@ -628,7 +628,7 @@ confirm_sender_death(Pid) -> %% See comment in local_sender_death/2; we might have %% received a sender_death in the meanwhile so check %% again. - ok = case dmon:is_monitored(Pid, KS) of + ok = case pmon:is_monitored(Pid, KS) of false -> ok; true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), confirm_sender_death(Pid) @@ -776,7 +776,7 @@ process_instruction({sender_death, ChPid}, %% The channel will be monitored iff we have received a message %% from it. In this case we just want to avoid doing work if we %% never got any messages. - {ok, case dmon:is_monitored(ChPid, KS) of + {ok, case pmon:is_monitored(ChPid, KS) of false -> State; true -> MS1 = case dict:find(ChPid, SQ) of error -> @@ -788,7 +788,7 @@ process_instruction({sender_death, ChPid}, credit_flow:peer_down(ChPid), State #state { sender_queues = dict:erase(ChPid, SQ), msg_id_status = MS1, - known_senders = dmon:demonitor(ChPid, KS) } + known_senders = pmon:demonitor(ChPid, KS) } end}; process_instruction({depth, Depth}, State = #state { backing_queue = BQ, |