summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-06-25 16:35:24 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-06-25 16:35:24 +0100
commitdfe2579cfe2dfb454cfa03536dea77e92c7c2e4f (patch)
tree28396d00365c59b1d57b6c5a70fe6421ead1b68a
parentb1eb07923baf6892b542a6a7448a71022ddcdf9f (diff)
downloadrabbitmq-server-dfe2579cfe2dfb454cfa03536dea77e92c7c2e4f.tar.gz
Delegate monitoring, with a fairly glaring hole.
-rw-r--r--src/delegate.erl66
-rw-r--r--src/dmon.erl70
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
3 files changed, 128 insertions, 22 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index e833b819..a205f2f1 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,11 +18,14 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2, monitor/1, demonitor/2,
+ call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-record(state, {node, monitors}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -35,6 +38,9 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(monitor/1 :: (pid()) -> reference()).
+-spec(demonitor/2 :: (pid(), reference()) -> 'true').
+
-spec(call/2 ::
( pid(), any()) -> any();
([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
@@ -78,7 +84,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
RemoteNodes -> gen_server2:multi_call(
- RemoteNodes, delegate(RemoteNodes),
+ RemoteNodes, delegate(self(), RemoteNodes),
{invoke, Fun, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
@@ -106,12 +112,23 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
- {invoke, Fun, Grouped})
+ RemoteNodes -> gen_server2:abcast(
+ RemoteNodes, delegate(self(), RemoteNodes),
+ {invoke, Fun, Grouped})
end,
safe_invoke(LocalPids, Fun), %% must not die
ok.
+monitor(Pid) ->
+ Node = node(Pid),
+ Name = delegate(Pid, [Node]),
+ gen_server2:call({Name, Node}, {monitor, self(), Pid}, infinity).
+
+demonitor(Pid, Ref) ->
+ Node = node(Pid),
+ Name = delegate(Pid, [Node]),
+ gen_server2:call({Name, Node}, {demonitor, Ref}, infinity).
+
call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
@@ -134,10 +151,10 @@ group_pids_by_node(Pids) ->
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate(RemoteNodes) ->
+delegate(Pid, RemoteNodes) ->
case get(delegate) of
undefined -> Name = delegate_name(
- erlang:phash2(self(),
+ erlang:phash2(Pid,
delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
@@ -156,21 +173,40 @@ safe_invoke(Pid, Fun) when is_pid(Pid) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, node(), hibernate,
+ {ok, #state{node = node(), monitors = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({invoke, Fun, Grouped}, _From, Node) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
+handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate};
-handle_cast({invoke, Fun, Grouped}, Node) ->
+handle_call({monitor, WantsMonitor, ToMonitor}, _From,
+ State = #state{monitors = Monitors}) ->
+ Ref = erlang:monitor(process, ToMonitor),
+ State1 = State#state{monitors = dict:store(Ref, WantsMonitor, Monitors)},
+ {reply, Ref, State1, hibernate};
+
+handle_call({demonitor, Ref}, _From, State = #state{monitors = Monitors}) ->
+ %% We need to ensure we don't then respond to a 'DOWN' that's
+ %% currently in our mailbox - if we did then our client might then
+ %% get a 'DOWN' after demonitor() returns.
+ State1 = State#state{monitors = dict:erase(Ref, Monitors)},
+ {reply, erlang:demonitor(Ref, [flush]), State1, hibernate}.
+
+handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
safe_invoke(orddict:fetch(Node, Grouped), Fun),
- {noreply, Node, hibernate}.
+ {noreply, State, hibernate}.
+
+handle_info({'DOWN', Ref, process, Object, Info},
+ State = #state{monitors = Monitors}) ->
+ WantsMonitor = dict:fetch(Ref, Monitors),
+ WantsMonitor ! {'DOWN', Ref, process, Object, Info},
+ {noreply, State#state{monitors = dict:erase(Ref, Monitors)}, hibernate};
-handle_info(_Info, Node) ->
- {noreply, Node, hibernate}.
+handle_info(_Info, State) ->
+ {noreply, State, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, Node, _Extra) ->
- {ok, Node}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/dmon.erl b/src/dmon.erl
new file mode 100644
index 00000000..dfb420c3
--- /dev/null
+++ b/src/dmon.erl
@@ -0,0 +1,70 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(dmon).
+
+-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
+ monitored/1, is_empty/1]).
+
+-compile({no_auto_import, [monitor/2]}).
+
+-ifdef(use_specs).
+
+%%----------------------------------------------------------------------------
+
+-export_type([?MODULE/0]).
+
+-opaque(?MODULE() :: dict()).
+
+-type(item() :: pid() | {atom(), node()}).
+
+-spec(new/0 :: () -> ?MODULE()).
+-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
+-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
+-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
+-spec(is_monitored/2 :: (item(), ?MODULE()) -> boolean()).
+-spec(erase/2 :: (item(), ?MODULE()) -> ?MODULE()).
+-spec(monitored/1 :: (?MODULE()) -> [item()]).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+
+-endif.
+
+new() -> dict:new().
+
+monitor(Item, M) ->
+ case dict:is_key(Item, M) of
+ true -> M;
+ false -> dict:store(Item, delegate:monitor(Item), M)
+ end.
+
+monitor_all([], M) -> M; %% optimisation
+monitor_all([Item], M) -> monitor(Item, M); %% optimisation
+monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
+
+demonitor(Item, M) ->
+ case dict:find(Item, M) of
+ {ok, MRef} -> delegate:demonitor(Item, MRef),
+ dict:erase(Item, M);
+ error -> M
+ end.
+
+is_monitored(Item, M) -> dict:is_key(Item, M).
+
+erase(Item, M) -> dict:erase(Item, M).
+
+monitored(M) -> dict:fetch_keys(M).
+
+is_empty(M) -> dict:size(M) == 0.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 964b0eb4..ca9418c6 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -120,7 +120,7 @@ init(Q = #amqqueue { name = QName }) ->
msg_id_ack = dict:new(),
msg_id_status = dict:new(),
- known_senders = pmon:new(),
+ known_senders = dmon:new(),
depth_delta = undefined
},
@@ -488,7 +488,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
- MPids = pmon:monitored(KS),
+ MPids = dmon:monitored(KS),
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),
%% We find all the messages that we've received from channels but
@@ -602,14 +602,14 @@ ensure_rate_timer(State) ->
stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
- State #state { known_senders = pmon:monitor(ChPid, KS) }.
+ State #state { known_senders = dmon:monitor(ChPid, KS) }.
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a delivery
%% from it but not heard about its death from the master. So if it
%% is monitored we need to point the death out to the master (see
%% essay).
- ok = case pmon:is_monitored(ChPid, KS) of
+ ok = case dmon:is_monitored(ChPid, KS) of
false -> ok;
true -> confirm_sender_death(ChPid)
end,
@@ -628,7 +628,7 @@ confirm_sender_death(Pid) ->
%% See comment in local_sender_death/2; we might have
%% received a sender_death in the meanwhile so check
%% again.
- ok = case pmon:is_monitored(Pid, KS) of
+ ok = case dmon:is_monitored(Pid, KS) of
false -> ok;
true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
confirm_sender_death(Pid)
@@ -776,7 +776,7 @@ process_instruction({sender_death, ChPid},
%% The channel will be monitored iff we have received a message
%% from it. In this case we just want to avoid doing work if we
%% never got any messages.
- {ok, case pmon:is_monitored(ChPid, KS) of
+ {ok, case dmon:is_monitored(ChPid, KS) of
false -> State;
true -> MS1 = case dict:find(ChPid, SQ) of
error ->
@@ -788,7 +788,7 @@ process_instruction({sender_death, ChPid},
credit_flow:peer_down(ChPid),
State #state { sender_queues = dict:erase(ChPid, SQ),
msg_id_status = MS1,
- known_senders = pmon:demonitor(ChPid, KS) }
+ known_senders = dmon:demonitor(ChPid, KS) }
end};
process_instruction({depth, Depth},
State = #state { backing_queue = BQ,