diff options
author | Ben Hood <0x6e6562@gmail.com> | 2008-12-18 18:25:26 +0000 |
---|---|---|
committer | Ben Hood <0x6e6562@gmail.com> | 2008-12-18 18:25:26 +0000 |
commit | 7498967e007fdf8693aad6dd636e57aa5bf6d531 (patch) | |
tree | 5b805688d3a205b1689f64edc55aa85f4ce9818f | |
parent | ad32ba9a6e7f63beafdf997df473cf9b279c6209 (diff) | |
download | rabbitmq-server-7498967e007fdf8693aad6dd636e57aa5bf6d531.tar.gz |
Put some monitors in
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 43 |
2 files changed, 26 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d26b0bb4..702a8aee 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -102,9 +102,6 @@ init(Q) -> round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> - %% Inform all limiters that we're dying - [ rabbit_limiter:unregister_queue(LimiterPid, self()) - || #cr{limiter_pid = LimiterPid} <- all_ch_record()], %% FIXME: How do we cancel active subscriptions? QName = qname(State), lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 20f54359..6cc170f9 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,6 @@ handle_info/2]). -export([start_link/1]). -export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). --export([unregister_queue/2]). %%---------------------------------------------------------------------------- @@ -46,7 +45,6 @@ -spec(set_prefetch_count/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(can_send/2 :: (pid(), pid()) -> bool()). -spec(decrement_capacity/2 :: (pid(), non_neg_integer()) -> 'ok'). --spec(unregister_queue/2 :: (pid(), pid()) -> 'ok'). -endif. @@ -54,7 +52,7 @@ -record(lim, {prefetch_count = 0, ch_pid, - queues = sets:new(), + queues = dict:new(), in_use = 0}). %--------------------------------------------------------------------------- @@ -78,11 +76,6 @@ can_send(LimiterPid, QPid) -> % and hence can reduce the in-use-by-that queue capcity information decrement_capacity(LimiterPid, Magnitude) -> gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}). - -% This is called to tell the limiter that the queue is probably dead and -% it should be forgotten about -unregister_queue(LimiterPid, QPid) -> - gen_server:cast(LimiterPid, {unregister_queue, QPid}). %--------------------------------------------------------------------------- % gen_server callbacks @@ -93,9 +86,8 @@ init([ChPid]) -> % This queuries the limiter to ask if it is possible to send a message without % breaching a limit for this queue process -handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse, - queues = Queues}) -> - NewState = State#lim{queues = sets:add_element(QPid, Queues)}, +handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> + NewState = monitor_queue(QPid, State), case limit_reached(NewState) of true -> {reply, false, NewState}; false -> @@ -109,14 +101,14 @@ handle_cast({prefetch_count, PrefetchCount}, State = #lim{prefetch_count = CurrentLimit}) when PrefetchCount > CurrentLimit -> notify_queues(State), - {noreply, State#lim{prefetch_count = PrefetchCount, - queues = sets:new(), + NewState = demonitor_all(State), + {noreply, NewState#lim{prefetch_count = PrefetchCount, in_use = 0}}; % Removes the queue process from the set of monitored queues -handle_cast({unregister_queue, QPid}, State= #lim{queues = Queues}) -> +handle_cast({unregister_queue, QPid}, State = #lim{}) -> NewState = decrement_in_use(1, State), - {noreply, NewState#lim{queues = sets:del_element(QPid, Queues)}}; + {noreply, demonitor_queue(QPid, NewState)}; % Default setter of the prefetch count handle_cast({prefetch_count, PrefetchCount}, State) -> @@ -131,7 +123,8 @@ handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> if ShouldNotify -> notify_queues(State), - {noreply, State#lim{queues = sets:new(), in_use = InUse - Magnitude}}; + NextState = demonitor_all(State), + {noreply, NextState#lim{in_use = InUse - Magnitude}}; true -> {noreply, NewState} end. @@ -149,6 +142,22 @@ code_change(_, State, _) -> % Internal plumbing %--------------------------------------------------------------------------- +% Starts to monitor a particular queue +monitor_queue(QPid, State = #lim{queues = Queues}) -> + MonitorRef = erlang:monitor(process, QPid), + State#lim{queues = dict:store(QPid, MonitorRef, Queues)}. + +% Stops monitoring a particular queue +demonitor_queue(QPid, State = #lim{queues = Queues}) -> + MonitorRef = dict:fetch(QPid, Queues), + true = erlang:demonitor(MonitorRef), + State#lim{queues = dict:erase(QPid, Queues)}. + +% Stops monitoring all queues +demonitor_all(State = #lim{queues = Queues}) -> + dict:map(fun(_, Ref) -> true = erlang:demonitor(Ref) end, Queues), + State#lim{queues = dict:new()}. + % Reduces the in-use-count of the queue by a specific magnitude decrement_in_use(_, State = #lim{in_use = 0}) -> State#lim{in_use = 0}; @@ -158,7 +167,7 @@ decrement_in_use(Magnitude, State = #lim{in_use = InUse}) -> % Unblocks every queue that this limiter knows about notify_queues(#lim{ch_pid = ChPid, queues = Queues}) -> - sets:fold(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, [], Queues). + dict:map(fun(Q, _) -> rabbit_amqqueue:unblock(Q, ChPid) end, Queues). % A prefetch limit of zero means unlimited limit_reached(#lim{prefetch_count = 0}) -> |