summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-12-18 18:46:05 +0000
committerBen Hood <0x6e6562@gmail.com>2008-12-18 18:46:05 +0000
commit742fcd66dd6b4a56a9481ba17f4d052389cc8386 (patch)
treec5580af58cc4f2050c04daac3c451ff21ab49a86
parent6c25cdcea5b0ab405d3ab160446784221533e802 (diff)
downloadrabbitmq-server-742fcd66dd6b4a56a9481ba17f4d052389cc8386.tar.gz
Added handler for monitor notifications
-rw-r--r--src/rabbit_limiter.erl23
1 files changed, 6 insertions, 17 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 0d938580..9f23724e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -95,20 +95,13 @@ handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) ->
end.
% When the new limit is larger than the existing limit,
-% notify all queues and forget about queues with an in-use
-% capcity of zero
+% notify all queues and forget about all queues
handle_cast({prefetch_count, PrefetchCount},
State = #lim{prefetch_count = CurrentLimit})
when PrefetchCount > CurrentLimit ->
notify_queues(State),
NewState = demonitor_all(State),
- {noreply, NewState#lim{prefetch_count = PrefetchCount,
- in_use = 0}};
-
-% Removes the queue process from the set of monitored queues
-handle_cast({unregister_queue, QPid}, State = #lim{}) ->
- NewState = decrement_in_use(1, State),
- {noreply, demonitor_queue(QPid, NewState)};
+ {noreply, NewState#lim{prefetch_count = PrefetchCount, in_use = 0}};
% Default setter of the prefetch count
handle_cast({prefetch_count, PrefetchCount}, State) ->
@@ -129,8 +122,10 @@ handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) ->
{noreply, NewState}
end.
-handle_info(_, State) ->
- {noreply, State}.
+%% This is received when a queue dies
+handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info},
+ State = #lim{queues = Queues}) ->
+ {noreply, State#lim{queues = dict:erase(QPid, Queues)}}.
terminate(_, _) ->
ok.
@@ -150,12 +145,6 @@ monitor_queue(QPid, State = #lim{queues = Queues}) ->
true -> State
end.
-% Stops monitoring a particular queue
-demonitor_queue(QPid, State = #lim{queues = Queues}) ->
- MonitorRef = dict:fetch(QPid, Queues),
- true = erlang:demonitor(MonitorRef),
- State#lim{queues = dict:erase(QPid, Queues)}.
-
% Stops monitoring all queues
demonitor_all(State = #lim{queues = Queues}) ->
dict:map(fun(_, Ref) -> true = erlang:demonitor(Ref) end, Queues),