diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-12-18 20:58:51 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-12-18 20:58:51 +0000 |
commit | c2c4195b7d9c1261e0e7a775ae4e9e5782d85f39 (patch) | |
tree | f954d13783e990f811d8e59a66bce8d73681fa36 /src/rabbit_limiter.erl | |
parent | 39478fac4b45ab2f710f68f66f5e656f903fda0f (diff) | |
download | rabbitmq-server-c2c4195b7d9c1261e0e7a775ae4e9e5782d85f39.tar.gz |
lots of tweaks and fixes
- remove superfluous (or wrong) comments
- notification and demonitoring always go together
- don't change the in_use count when limit is altered
- fix the limit_reached condition
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 68 |
1 files changed, 20 insertions, 48 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index b939b4bb..e5e54563 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -84,45 +84,26 @@ decrement_capacity(LimiterPid, Magnitude) -> init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -% This queries the limiter to ask if it is possible to send a message -% without breaching a limit for this queue process handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> - NewState = monitor_queue(QPid, State), - case limit_reached(NewState) of - true -> {reply, false, NewState}; - false -> - {reply, true, NewState#lim{in_use = InUse + 1}} + case limit_reached(State) of + true -> {reply, false, remember_queue(QPid, State)}; + false -> {reply, true, State#lim{in_use = InUse + 1}} end. -% When the new limit is larger than the existing limit, -% notify all queues and forget about all queues handle_cast({prefetch_count, PrefetchCount}, - State = #lim{prefetch_count = CurrentLimit}) - when PrefetchCount > CurrentLimit -> - notify_queues(State), - NewState = demonitor_all(State), - {noreply, NewState#lim{prefetch_count = PrefetchCount, in_use = 0}}; - -% Default setter of the prefetch count -handle_cast({prefetch_count, PrefetchCount}, State) -> - {noreply, State#lim{prefetch_count = PrefetchCount}}; - -% This is an asynchronous ack from a queue that it has received an ack from -% a queue. This allows the limiter to update the the in-use-by-that queue -% capacity infromation. -handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> + State = #lim{prefetch_count = CurrentLimit}) -> + NewState = State#lim{prefetch_count = PrefetchCount}, + {noreply, if PrefetchCount > CurrentLimit -> forget_queues(NewState); + true -> NewState + end}; + +handle_cast({decrement_capacity, Magnitude}, State) -> NewState = decrement_in_use(Magnitude, State), ShouldNotify = limit_reached(State) and not(limit_reached(NewState)), - if - ShouldNotify -> - notify_queues(State), - NextState = demonitor_all(State), - {noreply, NextState#lim{in_use = InUse - Magnitude}}; - true -> - {noreply, NewState} - end. + {noreply, if ShouldNotify -> forget_queues(NewState); + true -> NewState + end}. -%% This is received when a queue dies handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State = #lim{queues = Queues}) -> {noreply, State#lim{queues = dict:erase(QPid, Queues)}}. @@ -137,35 +118,26 @@ code_change(_, State, _) -> % Internal plumbing %--------------------------------------------------------------------------- -% Starts to monitor a particular queue -monitor_queue(QPid, State = #lim{queues = Queues}) -> +remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MonitorRef = erlang:monitor(process, QPid), State#lim{queues = dict:store(QPid, MonitorRef, Queues)}; true -> State end. -% Stops monitoring all queues -demonitor_all(State = #lim{queues = Queues}) -> - dict:map(fun(_, Ref) -> true = erlang:demonitor(Ref) end, Queues), +forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> + ok = dict:fold(fun(Q, Ref, ok) -> + true = erlang:demonitor(Ref), + rabbit_amqqueue:unblock(Q, ChPid) + end, ok, Queues), State#lim{queues = dict:new()}. -% Reduces the in-use-count of the queue by a specific magnitude decrement_in_use(_, State = #lim{in_use = 0}) -> State#lim{in_use = 0}; - decrement_in_use(Magnitude, State = #lim{in_use = InUse}) -> State#lim{in_use = InUse - Magnitude}. -% Unblocks every queue that this limiter knows about -notify_queues(#lim{ch_pid = ChPid, queues = Queues}) -> - dict:map(fun(Q, _) -> rabbit_amqqueue:unblock(Q, ChPid) end, Queues). - -% A prefetch limit of zero means unlimited limit_reached(#lim{prefetch_count = 0}) -> false; - -% Works out whether the limit is breached for the current limiter state limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> - InUse == Limit. - + InUse >= Limit. |