diff options
-rw-r--r-- | src/delegate.erl | 4 | ||||
-rw-r--r-- | src/pmon.erl | 33 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 |
3 files changed, 14 insertions, 25 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 460a4899..dad2dd3c 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -42,9 +42,9 @@ [{pid(), term()}]}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). --spec(monitor/2 :: (any(), pid()) -> monitor_ref()). +-spec(monitor/2 :: ('process', pid()) -> monitor_ref()). -spec(demonitor/1 :: (monitor_ref()) -> 'true'). --spec(demonitor/2 :: (monitor_ref(), [any()]) -> 'true'). +-spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true'). -spec(call/2 :: ( pid(), any()) -> any(); diff --git a/src/pmon.erl b/src/pmon.erl index 136f6b90..1e31eb60 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -16,12 +16,12 @@ -module(pmon). --export([new/0, new/1, new/3, monitor/2, monitor_all/2, demonitor/2, +-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, monitored/1, is_empty/1]). -compile({no_auto_import, [monitor/2]}). --record(state, {dict, monitor, demonitor1, demonitor2}). +-record(state, {dict, module}). -ifdef(use_specs). @@ -29,18 +29,13 @@ -export_type([?MODULE/0]). --opaque(?MODULE() :: #state{dict :: dict(), - monitor :: fun((atom(), any()) -> any()), - demonitor1 :: fun((any()) -> 'true'), - demonitor2 :: fun((any(), [any()]) -> 'true')}). +-opaque(?MODULE() :: #state{dict :: dict(), + module :: atom()}). -type(item() :: pid() | {atom(), node()}). -spec(new/0 :: () -> ?MODULE()). -spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()). --spec(new/3 :: (fun((atom(), any()) -> any()), - fun((any()) -> 'true'), - fun((any(), [any()]) -> 'true')) -> ?MODULE()). -spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()). -spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()). -spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()). @@ -53,29 +48,23 @@ new() -> new(erlang). -new(erlang) -> new(fun erlang:monitor/2, - fun erlang:demonitor/1, fun erlang:demonitor/2); -new(delegate) -> new(fun delegate:monitor/2, - fun delegate:demonitor/1, fun delegate:demonitor/2). +new(Module) -> #state{dict = dict:new(), + module = Module}. -new(Monitor, Demonitor1, Demonitor2) -> #state{dict = dict:new(), - monitor = Monitor, - demonitor1 = Demonitor1, - demonitor2 = Demonitor2}. - -monitor(Item, S = #state{dict = M, monitor = Monitor}) -> +monitor(Item, S = #state{dict = M, module = Module}) -> case dict:is_key(Item, M) of true -> S; - false -> S#state{dict = dict:store(Item, Monitor(process, Item), M)} + false -> S#state{dict = dict:store( + Item, Module:monitor(process, Item), M)} end. monitor_all([], S) -> S; %% optimisation monitor_all([Item], S) -> monitor(Item, S); %% optimisation monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items). -demonitor(Item, S = #state{dict = M, demonitor1 = Demonitor1}) -> +demonitor(Item, S = #state{dict = M, module = Module}) -> case dict:find(Item, M) of - {ok, MRef} -> Demonitor1(MRef), + {ok, MRef} -> Module:demonitor(MRef), S#state{dict = dict:erase(Item, M)}; error -> M end. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c790a12d..c5045609 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -146,7 +146,7 @@ init_state(Q) -> exclusive_consumer = none, has_had_consumers = false, active_consumers = queue:new(), - senders = pmon:new(), + senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), status = running}, rabbit_event:init_stats_timer(State, #q.stats_timer). |