diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-12-25 19:46:49 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-12-25 19:46:49 +0000 |
commit | 4882f8c367da4a7f608d1b7be5ab941a4f592441 (patch) | |
tree | 9f059ef90d50b88287c183b5b14081bf7ce044e0 | |
parent | 7de6e196f14ca8d83815b7e8e023535dda98647e (diff) | |
download | rabbitmq-server-4882f8c367da4a7f608d1b7be5ab941a4f592441.tar.gz |
optimisation: only notify queues that have had can_send requests rejected
-rw-r--r-- | src/rabbit_limiter.erl | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3e09bb37..38bf4cd4 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -104,13 +104,9 @@ unregister(LimiterPid, QPid) -> gen_server:cast(LimiterPid, {unregister, QPid}). init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, _QPid}, _From, State = #lim{in_use = InUse}) -> +handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> case limit_reached(State) of - true -> - %% TODO: keep track of the fact that the specific QPid has - %% had a can_send request rejected, so we can restrict the - %% notifications to these QPids only. - {reply, false, State}; + true -> {reply, false, limit_queue(QPid, State)}; false -> {reply, true, State#lim{in_use = InUse + 1}} end. @@ -147,32 +143,39 @@ code_change(_, State, _) -> maybe_notify(OldState, NewState) -> case limit_reached(OldState) andalso not(limit_reached(NewState)) of - true -> ok = notify_queues(NewState#lim.ch_pid, NewState#lim.queues); - false -> ok - end, - NewState. + true -> notify_queues(NewState); + false -> NewState + end. limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> Limit =/= 0 andalso InUse >= Limit. remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of - false -> MonitorRef = erlang:monitor(process, QPid), - State#lim{queues = dict:store(QPid, MonitorRef, Queues)}; + false -> MRef = erlang:monitor(process, QPid), + State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; true -> State end. forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> case dict:find(QPid, Queues) of - {ok, MonitorRef} -> - true = erlang:demonitor(MonitorRef), + {ok, {MRef, _}} -> + true = erlang:demonitor(MRef), ok = rabbit_amqqueue:unblock(QPid, ChPid), State#lim{queues = dict:erase(QPid, Queues)}; error -> State end. -notify_queues(ChPid, Queues) -> - QList = dict:to_list(Queues), +limit_queue(QPid, State = #lim{queues = Queues}) -> + UpdateFun = fun ({MRef, _}) -> {MRef, true} end, + State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. + +notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> + {QList, NewQueues} = + dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + (QPid, {MRef, true}, {L, D}) -> + {[QPid | L], dict:store(QPid, {MRef, false}, D)} + end, {[], Queues}, Queues), case length(QList) of 0 -> ok; L -> @@ -180,6 +183,7 @@ notify_queues(ChPid, Queues) -> %% appears in the list, thus ensuring that each queue has %% an equal chance of being notified first. {L1, L2} = lists:split(random:uniform(L), QList), - [ok = rabbit_amqqueue:unblock(Q, ChPid) || {Q, _} <- L2 ++ L1], + [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], ok - end. + end, + State#lim{queues = NewQueues}. |