summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-04 14:56:48 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-04 14:56:48 +0100
commit3de186a95a8c81cd83b9b4c7e5a7aa5cfbc12f5f (patch)
treece623b195def168fdbf165c873a4575cbd689324
parentc70ac2c72afad9035bd7290208a5da6cb0595fb7 (diff)
downloadrabbitmq-server-3de186a95a8c81cd83b9b4c7e5a7aa5cfbc12f5f.tar.gz
Get amqqueu_process to montor via delegate too, tighten delegate specs, simplify pmon parameterisation.
-rw-r--r--src/delegate.erl4
-rw-r--r--src/pmon.erl33
-rw-r--r--src/rabbit_amqqueue_process.erl2
3 files changed, 14 insertions, 25 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 460a4899..dad2dd3c 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -42,9 +42,9 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
--spec(monitor/2 :: (any(), pid()) -> monitor_ref()).
+-spec(monitor/2 :: ('process', pid()) -> monitor_ref()).
-spec(demonitor/1 :: (monitor_ref()) -> 'true').
--spec(demonitor/2 :: (monitor_ref(), [any()]) -> 'true').
+-spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true').
-spec(call/2 ::
( pid(), any()) -> any();
diff --git a/src/pmon.erl b/src/pmon.erl
index 136f6b90..1e31eb60 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -16,12 +16,12 @@
-module(pmon).
--export([new/0, new/1, new/3, monitor/2, monitor_all/2, demonitor/2,
+-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2,
is_monitored/2, erase/2, monitored/1, is_empty/1]).
-compile({no_auto_import, [monitor/2]}).
--record(state, {dict, monitor, demonitor1, demonitor2}).
+-record(state, {dict, module}).
-ifdef(use_specs).
@@ -29,18 +29,13 @@
-export_type([?MODULE/0]).
--opaque(?MODULE() :: #state{dict :: dict(),
- monitor :: fun((atom(), any()) -> any()),
- demonitor1 :: fun((any()) -> 'true'),
- demonitor2 :: fun((any(), [any()]) -> 'true')}).
+-opaque(?MODULE() :: #state{dict :: dict(),
+ module :: atom()}).
-type(item() :: pid() | {atom(), node()}).
-spec(new/0 :: () -> ?MODULE()).
-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()).
--spec(new/3 :: (fun((atom(), any()) -> any()),
- fun((any()) -> 'true'),
- fun((any(), [any()]) -> 'true')) -> ?MODULE()).
-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
@@ -53,29 +48,23 @@
new() -> new(erlang).
-new(erlang) -> new(fun erlang:monitor/2,
- fun erlang:demonitor/1, fun erlang:demonitor/2);
-new(delegate) -> new(fun delegate:monitor/2,
- fun delegate:demonitor/1, fun delegate:demonitor/2).
+new(Module) -> #state{dict = dict:new(),
+ module = Module}.
-new(Monitor, Demonitor1, Demonitor2) -> #state{dict = dict:new(),
- monitor = Monitor,
- demonitor1 = Demonitor1,
- demonitor2 = Demonitor2}.
-
-monitor(Item, S = #state{dict = M, monitor = Monitor}) ->
+monitor(Item, S = #state{dict = M, module = Module}) ->
case dict:is_key(Item, M) of
true -> S;
- false -> S#state{dict = dict:store(Item, Monitor(process, Item), M)}
+ false -> S#state{dict = dict:store(
+ Item, Module:monitor(process, Item), M)}
end.
monitor_all([], S) -> S; %% optimisation
monitor_all([Item], S) -> monitor(Item, S); %% optimisation
monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items).
-demonitor(Item, S = #state{dict = M, demonitor1 = Demonitor1}) ->
+demonitor(Item, S = #state{dict = M, module = Module}) ->
case dict:find(Item, M) of
- {ok, MRef} -> Demonitor1(MRef),
+ {ok, MRef} -> Module:demonitor(MRef),
S#state{dict = dict:erase(Item, M)};
error -> M
end.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c790a12d..c5045609 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -146,7 +146,7 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
active_consumers = queue:new(),
- senders = pmon:new(),
+ senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running},
rabbit_event:init_stats_timer(State, #q.stats_timer).