summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-02 14:20:29 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-02 14:20:29 +0100
commite4c7f6bceec82208e65ac86366e9a67817b90f75 (patch)
tree0dbccdbef95268f5f0cac6ac9677fd2d980ebf34
parent09400b8b088aebe3265fcafc5314e77c3d416625 (diff)
downloadrabbitmq-server-e4c7f6bceec82208e65ac86366e9a67817b90f75.tar.gz
Make the delegate monitoring API a drop in replacement for the built in one, and thus parameterise pmon and remove dmon.
-rw-r--r--src/delegate.erl48
-rw-r--r--src/dmon.erl70
-rw-r--r--src/pmon.erl53
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
4 files changed, 68 insertions, 117 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 475b087f..03086a59 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,8 +18,8 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, monitor/1, demonitor/2,
- call/2, cast/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2, monitor/2,
+ demonitor/1, demonitor/2, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -30,6 +30,10 @@
-ifdef(use_specs).
+-export_type([monitor_ref/0]).
+
+-type(monitor_ref() :: reference() | {atom(), reference()}).
+
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
-spec(invoke/2 ::
@@ -38,8 +42,9 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
--spec(monitor/1 :: (pid()) -> reference()).
--spec(demonitor/2 :: (pid(), reference()) -> 'true').
+-spec(monitor/2 :: (any(), pid()) -> monitor_ref()).
+-spec(demonitor/1 :: (monitor_ref()) -> 'true').
+-spec(demonitor/2 :: (monitor_ref(), [any()]) -> 'true').
-spec(call/2 ::
( pid(), any()) -> any();
@@ -119,19 +124,18 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
safe_invoke(LocalPids, Fun), %% must not die
ok.
-monitor(Pid) when node(Pid) =:= node() ->
- erlang:monitor(process, Pid);
-monitor(Pid) ->
- Node = node(Pid),
- Name = delegate(Pid, [Node]),
- gen_server2:call(Name, {monitor, self(), Pid}, infinity).
+monitor(Type, Pid) when node(Pid) =:= node() ->
+ erlang:monitor(Type, Pid);
+monitor(Type, Pid) ->
+ Name = delegate(Pid, [node(Pid)]),
+ {Name, gen_server2:call(Name, {monitor, Type, self(), Pid}, infinity)}.
-demonitor(Pid, Ref) when node(Pid) =:= node() ->
- erlang:demonitor(Ref, [flush]);
-demonitor(Pid, Ref) ->
- Node = node(Pid),
- Name = delegate(Pid, [Node]),
- gen_server2:call(Name, {demonitor, Ref}, infinity).
+demonitor(Ref) -> ?MODULE:demonitor(Ref, []).
+
+demonitor(Ref, Options) when is_reference(Ref) ->
+ erlang:demonitor(Ref, Options);
+demonitor({Name, Ref}, Options) ->
+ gen_server2:call(Name, {demonitor, Ref, Options}, infinity).
call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
@@ -183,18 +187,16 @@ init([]) ->
handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
{reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate};
-handle_call({monitor, WantsMonitor, ToMonitor}, _From,
+handle_call({monitor, Type, WantsMonitor, ToMonitor}, _From,
State = #state{monitors = Monitors}) ->
- Ref = erlang:monitor(process, ToMonitor),
+ Ref = erlang:monitor(Type, ToMonitor),
State1 = State#state{monitors = dict:store(Ref, WantsMonitor, Monitors)},
{reply, Ref, State1, hibernate};
-handle_call({demonitor, Ref}, _From, State = #state{monitors = Monitors}) ->
- %% We need to ensure we don't then respond to a 'DOWN' that's
- %% currently in our mailbox - if we did then our client might then
- %% get a 'DOWN' after demonitor() returns.
+handle_call({demonitor, Ref, Options}, _From,
+ State = #state{monitors = Monitors}) ->
State1 = State#state{monitors = dict:erase(Ref, Monitors)},
- {reply, erlang:demonitor(Ref, [flush]), State1, hibernate}.
+ {reply, erlang:demonitor(Ref, Options), State1, hibernate}.
handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
safe_invoke(orddict:fetch(Node, Grouped), Fun),
diff --git a/src/dmon.erl b/src/dmon.erl
deleted file mode 100644
index dfb420c3..00000000
--- a/src/dmon.erl
+++ /dev/null
@@ -1,70 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
-%%
-
--module(dmon).
-
--export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
- monitored/1, is_empty/1]).
-
--compile({no_auto_import, [monitor/2]}).
-
--ifdef(use_specs).
-
-%%----------------------------------------------------------------------------
-
--export_type([?MODULE/0]).
-
--opaque(?MODULE() :: dict()).
-
--type(item() :: pid() | {atom(), node()}).
-
--spec(new/0 :: () -> ?MODULE()).
--spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
--spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
--spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
--spec(is_monitored/2 :: (item(), ?MODULE()) -> boolean()).
--spec(erase/2 :: (item(), ?MODULE()) -> ?MODULE()).
--spec(monitored/1 :: (?MODULE()) -> [item()]).
--spec(is_empty/1 :: (?MODULE()) -> boolean()).
-
--endif.
-
-new() -> dict:new().
-
-monitor(Item, M) ->
- case dict:is_key(Item, M) of
- true -> M;
- false -> dict:store(Item, delegate:monitor(Item), M)
- end.
-
-monitor_all([], M) -> M; %% optimisation
-monitor_all([Item], M) -> monitor(Item, M); %% optimisation
-monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
-
-demonitor(Item, M) ->
- case dict:find(Item, M) of
- {ok, MRef} -> delegate:demonitor(Item, MRef),
- dict:erase(Item, M);
- error -> M
- end.
-
-is_monitored(Item, M) -> dict:is_key(Item, M).
-
-erase(Item, M) -> dict:erase(Item, M).
-
-monitored(M) -> dict:fetch_keys(M).
-
-is_empty(M) -> dict:size(M) == 0.
diff --git a/src/pmon.erl b/src/pmon.erl
index ed32b8b2..136f6b90 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -16,22 +16,31 @@
-module(pmon).
--export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
- monitored/1, is_empty/1]).
+-export([new/0, new/1, new/3, monitor/2, monitor_all/2, demonitor/2,
+ is_monitored/2, erase/2, monitored/1, is_empty/1]).
-compile({no_auto_import, [monitor/2]}).
+-record(state, {dict, monitor, demonitor1, demonitor2}).
+
-ifdef(use_specs).
%%----------------------------------------------------------------------------
-export_type([?MODULE/0]).
--opaque(?MODULE() :: dict()).
+-opaque(?MODULE() :: #state{dict :: dict(),
+ monitor :: fun((atom(), any()) -> any()),
+ demonitor1 :: fun((any()) -> 'true'),
+ demonitor2 :: fun((any(), [any()]) -> 'true')}).
-type(item() :: pid() | {atom(), node()}).
-spec(new/0 :: () -> ?MODULE()).
+-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()).
+-spec(new/3 :: (fun((atom(), any()) -> any()),
+ fun((any()) -> 'true'),
+ fun((any(), [any()]) -> 'true')) -> ?MODULE()).
-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
@@ -42,29 +51,39 @@
-endif.
-new() -> dict:new().
+new() -> new(erlang).
+
+new(erlang) -> new(fun erlang:monitor/2,
+ fun erlang:demonitor/1, fun erlang:demonitor/2);
+new(delegate) -> new(fun delegate:monitor/2,
+ fun delegate:demonitor/1, fun delegate:demonitor/2).
+
+new(Monitor, Demonitor1, Demonitor2) -> #state{dict = dict:new(),
+ monitor = Monitor,
+ demonitor1 = Demonitor1,
+ demonitor2 = Demonitor2}.
-monitor(Item, M) ->
+monitor(Item, S = #state{dict = M, monitor = Monitor}) ->
case dict:is_key(Item, M) of
- true -> M;
- false -> dict:store(Item, erlang:monitor(process, Item), M)
+ true -> S;
+ false -> S#state{dict = dict:store(Item, Monitor(process, Item), M)}
end.
-monitor_all([], M) -> M; %% optimisation
-monitor_all([Item], M) -> monitor(Item, M); %% optimisation
-monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
+monitor_all([], S) -> S; %% optimisation
+monitor_all([Item], S) -> monitor(Item, S); %% optimisation
+monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items).
-demonitor(Item, M) ->
+demonitor(Item, S = #state{dict = M, demonitor1 = Demonitor1}) ->
case dict:find(Item, M) of
- {ok, MRef} -> erlang:demonitor(MRef),
- dict:erase(Item, M);
+ {ok, MRef} -> Demonitor1(MRef),
+ S#state{dict = dict:erase(Item, M)};
error -> M
end.
-is_monitored(Item, M) -> dict:is_key(Item, M).
+is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M).
-erase(Item, M) -> dict:erase(Item, M).
+erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}.
-monitored(M) -> dict:fetch_keys(M).
+monitored(#state{dict = M}) -> dict:fetch_keys(M).
-is_empty(M) -> dict:size(M) == 0.
+is_empty(#state{dict = M}) -> dict:size(M) == 0.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ac3e0df0..88b0f005 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -120,7 +120,7 @@ init(Q = #amqqueue { name = QName }) ->
msg_id_ack = dict:new(),
msg_id_status = dict:new(),
- known_senders = dmon:new(),
+ known_senders = pmon:new(delegate),
depth_delta = undefined
},
@@ -489,7 +489,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
- MPids = dmon:monitored(KS),
+ MPids = pmon:monitored(KS),
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),
%% We find all the messages that we've received from channels but
@@ -603,14 +603,14 @@ ensure_rate_timer(State) ->
stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
- State #state { known_senders = dmon:monitor(ChPid, KS) }.
+ State #state { known_senders = pmon:monitor(ChPid, KS) }.
local_sender_death(ChPid, #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a delivery
%% from it but not heard about its death from the master. So if it
%% is monitored we need to point the death out to the master (see
%% essay).
- ok = case dmon:is_monitored(ChPid, KS) of
+ ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> confirm_sender_death(ChPid)
end.
@@ -628,7 +628,7 @@ confirm_sender_death(Pid) ->
%% See comment in local_sender_death/2; we might have
%% received a sender_death in the meanwhile so check
%% again.
- ok = case dmon:is_monitored(Pid, KS) of
+ ok = case pmon:is_monitored(Pid, KS) of
false -> ok;
true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
confirm_sender_death(Pid)
@@ -776,7 +776,7 @@ process_instruction({sender_death, ChPid},
%% The channel will be monitored iff we have received a message
%% from it. In this case we just want to avoid doing work if we
%% never got any messages.
- {ok, case dmon:is_monitored(ChPid, KS) of
+ {ok, case pmon:is_monitored(ChPid, KS) of
false -> State;
true -> MS1 = case dict:find(ChPid, SQ) of
error ->
@@ -788,7 +788,7 @@ process_instruction({sender_death, ChPid},
credit_flow:peer_down(ChPid),
State #state { sender_queues = dict:erase(ChPid, SQ),
msg_id_status = MS1,
- known_senders = dmon:demonitor(ChPid, KS) }
+ known_senders = pmon:demonitor(ChPid, KS) }
end};
process_instruction({depth, Depth},
State = #state { backing_queue = BQ,