summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-12-18 18:25:26 +0000
committerBen Hood <0x6e6562@gmail.com>2008-12-18 18:25:26 +0000
commit7498967e007fdf8693aad6dd636e57aa5bf6d531 (patch)
tree5b805688d3a205b1689f64edc55aa85f4ce9818f
parentad32ba9a6e7f63beafdf997df473cf9b279c6209 (diff)
downloadrabbitmq-server-7498967e007fdf8693aad6dd636e57aa5bf6d531.tar.gz
Put some monitors in
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_limiter.erl43
2 files changed, 26 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d26b0bb4..702a8aee 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -102,9 +102,6 @@ init(Q) ->
round_robin = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
- %% Inform all limiters that we're dying
- [ rabbit_limiter:unregister_queue(LimiterPid, self())
- || #cr{limiter_pid = LimiterPid} <- all_ch_record()],
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 20f54359..6cc170f9 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -37,7 +37,6 @@
handle_info/2]).
-export([start_link/1]).
-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]).
--export([unregister_queue/2]).
%%----------------------------------------------------------------------------
@@ -46,7 +45,6 @@
-spec(set_prefetch_count/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(can_send/2 :: (pid(), pid()) -> bool()).
-spec(decrement_capacity/2 :: (pid(), non_neg_integer()) -> 'ok').
--spec(unregister_queue/2 :: (pid(), pid()) -> 'ok').
-endif.
@@ -54,7 +52,7 @@
-record(lim, {prefetch_count = 0,
ch_pid,
- queues = sets:new(),
+ queues = dict:new(),
in_use = 0}).
%---------------------------------------------------------------------------
@@ -78,11 +76,6 @@ can_send(LimiterPid, QPid) ->
% and hence can reduce the in-use-by-that queue capcity information
decrement_capacity(LimiterPid, Magnitude) ->
gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}).
-
-% This is called to tell the limiter that the queue is probably dead and
-% it should be forgotten about
-unregister_queue(LimiterPid, QPid) ->
- gen_server:cast(LimiterPid, {unregister_queue, QPid}).
%---------------------------------------------------------------------------
% gen_server callbacks
@@ -93,9 +86,8 @@ init([ChPid]) ->
% This queuries the limiter to ask if it is possible to send a message without
% breaching a limit for this queue process
-handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse,
- queues = Queues}) ->
- NewState = State#lim{queues = sets:add_element(QPid, Queues)},
+handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) ->
+ NewState = monitor_queue(QPid, State),
case limit_reached(NewState) of
true -> {reply, false, NewState};
false ->
@@ -109,14 +101,14 @@ handle_cast({prefetch_count, PrefetchCount},
State = #lim{prefetch_count = CurrentLimit})
when PrefetchCount > CurrentLimit ->
notify_queues(State),
- {noreply, State#lim{prefetch_count = PrefetchCount,
- queues = sets:new(),
+ NewState = demonitor_all(State),
+ {noreply, NewState#lim{prefetch_count = PrefetchCount,
in_use = 0}};
% Removes the queue process from the set of monitored queues
-handle_cast({unregister_queue, QPid}, State= #lim{queues = Queues}) ->
+handle_cast({unregister_queue, QPid}, State = #lim{}) ->
NewState = decrement_in_use(1, State),
- {noreply, NewState#lim{queues = sets:del_element(QPid, Queues)}};
+ {noreply, demonitor_queue(QPid, NewState)};
% Default setter of the prefetch count
handle_cast({prefetch_count, PrefetchCount}, State) ->
@@ -131,7 +123,8 @@ handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) ->
if
ShouldNotify ->
notify_queues(State),
- {noreply, State#lim{queues = sets:new(), in_use = InUse - Magnitude}};
+ NextState = demonitor_all(State),
+ {noreply, NextState#lim{in_use = InUse - Magnitude}};
true ->
{noreply, NewState}
end.
@@ -149,6 +142,22 @@ code_change(_, State, _) ->
% Internal plumbing
%---------------------------------------------------------------------------
+% Starts to monitor a particular queue
+monitor_queue(QPid, State = #lim{queues = Queues}) ->
+ MonitorRef = erlang:monitor(process, QPid),
+ State#lim{queues = dict:store(QPid, MonitorRef, Queues)}.
+
+% Stops monitoring a particular queue
+demonitor_queue(QPid, State = #lim{queues = Queues}) ->
+ MonitorRef = dict:fetch(QPid, Queues),
+ true = erlang:demonitor(MonitorRef),
+ State#lim{queues = dict:erase(QPid, Queues)}.
+
+% Stops monitoring all queues
+demonitor_all(State = #lim{queues = Queues}) ->
+ dict:map(fun(_, Ref) -> true = erlang:demonitor(Ref) end, Queues),
+ State#lim{queues = dict:new()}.
+
% Reduces the in-use-count of the queue by a specific magnitude
decrement_in_use(_, State = #lim{in_use = 0}) ->
State#lim{in_use = 0};
@@ -158,7 +167,7 @@ decrement_in_use(Magnitude, State = #lim{in_use = InUse}) ->
% Unblocks every queue that this limiter knows about
notify_queues(#lim{ch_pid = ChPid, queues = Queues}) ->
- sets:fold(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, [], Queues).
+ dict:map(fun(Q, _) -> rabbit_amqqueue:unblock(Q, ChPid) end, Queues).
% A prefetch limit of zero means unlimited
limit_reached(#lim{prefetch_count = 0}) ->