summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-12-18 20:58:51 +0000
committerMatthias Radestock <matthias@lshift.net>2008-12-18 20:58:51 +0000
commitc2c4195b7d9c1261e0e7a775ae4e9e5782d85f39 (patch)
treef954d13783e990f811d8e59a66bce8d73681fa36
parent39478fac4b45ab2f710f68f66f5e656f903fda0f (diff)
downloadrabbitmq-server-c2c4195b7d9c1261e0e7a775ae4e9e5782d85f39.tar.gz
lots of tweaks and fixes
- remove superfluous (or wrong) comments - notification and demonitoring always go together - don't change the in_use count when limit is altered - fix the limit_reached condition
-rw-r--r--src/rabbit_limiter.erl68
1 files changed, 20 insertions, 48 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index b939b4bb..e5e54563 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -84,45 +84,26 @@ decrement_capacity(LimiterPid, Magnitude) ->
init([ChPid]) ->
{ok, #lim{ch_pid = ChPid} }.
-% This queries the limiter to ask if it is possible to send a message
-% without breaching a limit for this queue process
handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) ->
- NewState = monitor_queue(QPid, State),
- case limit_reached(NewState) of
- true -> {reply, false, NewState};
- false ->
- {reply, true, NewState#lim{in_use = InUse + 1}}
+ case limit_reached(State) of
+ true -> {reply, false, remember_queue(QPid, State)};
+ false -> {reply, true, State#lim{in_use = InUse + 1}}
end.
-% When the new limit is larger than the existing limit,
-% notify all queues and forget about all queues
handle_cast({prefetch_count, PrefetchCount},
- State = #lim{prefetch_count = CurrentLimit})
- when PrefetchCount > CurrentLimit ->
- notify_queues(State),
- NewState = demonitor_all(State),
- {noreply, NewState#lim{prefetch_count = PrefetchCount, in_use = 0}};
-
-% Default setter of the prefetch count
-handle_cast({prefetch_count, PrefetchCount}, State) ->
- {noreply, State#lim{prefetch_count = PrefetchCount}};
-
-% This is an asynchronous ack from a queue that it has received an ack from
-% a queue. This allows the limiter to update the the in-use-by-that queue
-% capacity infromation.
-handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) ->
+ State = #lim{prefetch_count = CurrentLimit}) ->
+ NewState = State#lim{prefetch_count = PrefetchCount},
+ {noreply, if PrefetchCount > CurrentLimit -> forget_queues(NewState);
+ true -> NewState
+ end};
+
+handle_cast({decrement_capacity, Magnitude}, State) ->
NewState = decrement_in_use(Magnitude, State),
ShouldNotify = limit_reached(State) and not(limit_reached(NewState)),
- if
- ShouldNotify ->
- notify_queues(State),
- NextState = demonitor_all(State),
- {noreply, NextState#lim{in_use = InUse - Magnitude}};
- true ->
- {noreply, NewState}
- end.
+ {noreply, if ShouldNotify -> forget_queues(NewState);
+ true -> NewState
+ end}.
-%% This is received when a queue dies
handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info},
State = #lim{queues = Queues}) ->
{noreply, State#lim{queues = dict:erase(QPid, Queues)}}.
@@ -137,35 +118,26 @@ code_change(_, State, _) ->
% Internal plumbing
%---------------------------------------------------------------------------
-% Starts to monitor a particular queue
-monitor_queue(QPid, State = #lim{queues = Queues}) ->
+remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of
false -> MonitorRef = erlang:monitor(process, QPid),
State#lim{queues = dict:store(QPid, MonitorRef, Queues)};
true -> State
end.
-% Stops monitoring all queues
-demonitor_all(State = #lim{queues = Queues}) ->
- dict:map(fun(_, Ref) -> true = erlang:demonitor(Ref) end, Queues),
+forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
+ ok = dict:fold(fun(Q, Ref, ok) ->
+ true = erlang:demonitor(Ref),
+ rabbit_amqqueue:unblock(Q, ChPid)
+ end, ok, Queues),
State#lim{queues = dict:new()}.
-% Reduces the in-use-count of the queue by a specific magnitude
decrement_in_use(_, State = #lim{in_use = 0}) ->
State#lim{in_use = 0};
-
decrement_in_use(Magnitude, State = #lim{in_use = InUse}) ->
State#lim{in_use = InUse - Magnitude}.
-% Unblocks every queue that this limiter knows about
-notify_queues(#lim{ch_pid = ChPid, queues = Queues}) ->
- dict:map(fun(Q, _) -> rabbit_amqqueue:unblock(Q, ChPid) end, Queues).
-
-% A prefetch limit of zero means unlimited
limit_reached(#lim{prefetch_count = 0}) ->
false;
-
-% Works out whether the limit is breached for the current limiter state
limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) ->
- InUse == Limit.
-
+ InUse >= Limit.