diff options
authorMatthias Radestock <>2008-12-25 19:46:49 +0000
committerMatthias Radestock <>2008-12-25 19:46:49 +0000
commit4882f8c367da4a7f608d1b7be5ab941a4f592441 (patch)
parent7de6e196f14ca8d83815b7e8e023535dda98647e (diff)
optimisation: only notify queues that have had can_send requests rejected
1 files changed, 22 insertions, 18 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 3e09bb37..38bf4cd4 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -104,13 +104,9 @@ unregister(LimiterPid, QPid) -> gen_server:cast(LimiterPid, {unregister, QPid}).
init([ChPid]) ->
{ok, #lim{ch_pid = ChPid} }.
-handle_call({can_send, _QPid}, _From, State = #lim{in_use = InUse}) ->
+handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) ->
case limit_reached(State) of
- true ->
- %% TODO: keep track of the fact that the specific QPid has
- %% had a can_send request rejected, so we can restrict the
- %% notifications to these QPids only.
- {reply, false, State};
+ true -> {reply, false, limit_queue(QPid, State)};
false -> {reply, true, State#lim{in_use = InUse + 1}}
@@ -147,32 +143,39 @@ code_change(_, State, _) ->
maybe_notify(OldState, NewState) ->
case limit_reached(OldState) andalso not(limit_reached(NewState)) of
- true -> ok = notify_queues(NewState#lim.ch_pid, NewState#lim.queues);
- false -> ok
- end,
- NewState.
+ true -> notify_queues(NewState);
+ false -> NewState
+ end.
limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) ->
Limit =/= 0 andalso InUse >= Limit.
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of
- false -> MonitorRef = erlang:monitor(process, QPid),
- State#lim{queues = dict:store(QPid, MonitorRef, Queues)};
+ false -> MRef = erlang:monitor(process, QPid),
+ State#lim{queues = dict:store(QPid, {MRef, false}, Queues)};
true -> State
forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
case dict:find(QPid, Queues) of
- {ok, MonitorRef} ->
- true = erlang:demonitor(MonitorRef),
+ {ok, {MRef, _}} ->
+ true = erlang:demonitor(MRef),
ok = rabbit_amqqueue:unblock(QPid, ChPid),
State#lim{queues = dict:erase(QPid, Queues)};
error -> State
-notify_queues(ChPid, Queues) ->
- QList = dict:to_list(Queues),
+limit_queue(QPid, State = #lim{queues = Queues}) ->
+ UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
+ State#lim{queues = dict:update(QPid, UpdateFun, Queues)}.
+notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
+ {QList, NewQueues} =
+ dict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
+ (QPid, {MRef, true}, {L, D}) ->
+ {[QPid | L], dict:store(QPid, {MRef, false}, D)}
+ end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
L ->
@@ -180,6 +183,7 @@ notify_queues(ChPid, Queues) ->
%% appears in the list, thus ensuring that each queue has
%% an equal chance of being notified first.
{L1, L2} = lists:split(random:uniform(L), QList),
- [ok = rabbit_amqqueue:unblock(Q, ChPid) || {Q, _} <- L2 ++ L1],
+ [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1],
- end.
+ end,
+ State#lim{queues = NewQueues}.