diff options
author | Ben Hood <0x6e6562@gmail.com> | 2008-12-18 18:46:05 +0000 |
---|---|---|
committer | Ben Hood <0x6e6562@gmail.com> | 2008-12-18 18:46:05 +0000 |
commit | 742fcd66dd6b4a56a9481ba17f4d052389cc8386 (patch) | |
tree | c5580af58cc4f2050c04daac3c451ff21ab49a86 | |
parent | 6c25cdcea5b0ab405d3ab160446784221533e802 (diff) | |
download | rabbitmq-server-742fcd66dd6b4a56a9481ba17f4d052389cc8386.tar.gz |
Added handler for monitor notifications
-rw-r--r-- | src/rabbit_limiter.erl | 23 |
1 files changed, 6 insertions, 17 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 0d938580..9f23724e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -95,20 +95,13 @@ handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> end. % When the new limit is larger than the existing limit, -% notify all queues and forget about queues with an in-use -% capcity of zero +% notify all queues and forget about all queues handle_cast({prefetch_count, PrefetchCount}, State = #lim{prefetch_count = CurrentLimit}) when PrefetchCount > CurrentLimit -> notify_queues(State), NewState = demonitor_all(State), - {noreply, NewState#lim{prefetch_count = PrefetchCount, - in_use = 0}}; - -% Removes the queue process from the set of monitored queues -handle_cast({unregister_queue, QPid}, State = #lim{}) -> - NewState = decrement_in_use(1, State), - {noreply, demonitor_queue(QPid, NewState)}; + {noreply, NewState#lim{prefetch_count = PrefetchCount, in_use = 0}}; % Default setter of the prefetch count handle_cast({prefetch_count, PrefetchCount}, State) -> @@ -129,8 +122,10 @@ handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> {noreply, NewState} end. -handle_info(_, State) -> - {noreply, State}. +%% This is received when a queue dies +handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, + State = #lim{queues = Queues}) -> + {noreply, State#lim{queues = dict:erase(QPid, Queues)}}. terminate(_, _) -> ok. @@ -150,12 +145,6 @@ monitor_queue(QPid, State = #lim{queues = Queues}) -> true -> State end. -% Stops monitoring a particular queue -demonitor_queue(QPid, State = #lim{queues = Queues}) -> - MonitorRef = dict:fetch(QPid, Queues), - true = erlang:demonitor(MonitorRef), - State#lim{queues = dict:erase(QPid, Queues)}. - % Stops monitoring all queues demonitor_all(State = #lim{queues = Queues}) -> dict:map(fun(_, Ref) -> true = erlang:demonitor(Ref) end, Queues), |