diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-12-23 16:00:34 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-12-23 16:00:34 +0000 |
commit | 62874e89b1cd969e12ffc288fb43f46f1b72e05d (patch) | |
tree | aa7c7c4c84a5963653b2a3cb348b4ab23e9daa6b | |
parent | 0d160f405915259c1b7aa64a8f8c1ecf07978434 (diff) | |
download | rabbitmq-server-62874e89b1cd969e12ffc288fb43f46f1b72e05d.tar.gz |
make limiter keep track of all queues with subscriptions
This is more efficient since it avoids the repeated (de)monitoring and
updates to the limiter state.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 58 |
2 files changed, 52 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 53b569b4..c01f08df 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -634,6 +634,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, C1 = C#cr{consumers = [Consumer | Consumers], limiter_pid = LimiterPid}, store_ch_record(C1), + if Consumers == [] -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, State1 = State#q{has_had_consumers = true, exclusive_consumer = if @@ -653,12 +658,17 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumers = Consumers} -> + C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, Consumers), C1 = C#cr{consumers = NewConsumers}, store_ch_record(C1), + if NewConsumers == [] -> + ok = rabbit_limiter:unregister(LimiterPid, self()); + true -> + ok + end, ok = maybe_send_reply(ChPid, OkMsg), case check_auto_delete( State#q{exclusive_consumer = cancel_holder(ChPid, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 12632625..f1a45415 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -36,7 +36,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1]). --export([limit/2, can_send/2, ack/2]). +-export([limit/2, can_send/2, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -45,6 +45,8 @@ -spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(can_send/2 :: (pid(), pid()) -> bool()). -spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(register/2 :: (pid(), pid()) -> 'ok'). +-spec(unregister/2 :: (pid(), pid()) -> 'ok'). -endif. @@ -76,6 +78,12 @@ can_send(LimiterPid, QPid) -> ack(LimiterPid, Count) -> gen_server:cast(LimiterPid, {ack, Count}). +register(LimiterPid, QPid) -> + gen_server:cast(LimiterPid, {register, QPid}). + +unregister(LimiterPid, QPid) -> + gen_server:cast(LimiterPid, {unregister, QPid}). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -83,9 +91,13 @@ ack(LimiterPid, Count) -> init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> +handle_call({can_send, _QPid}, _From, State = #lim{in_use = InUse}) -> case limit_reached(State) of - true -> {reply, false, remember_queue(QPid, State)}; + true -> + %% TODO: keep track of the fact that the specific QPid has + %% had a can_send request rejected, so we can restrict the + %% notifications to these QPids only. + {reply, false, State}; false -> {reply, true, State#lim{in_use = InUse + 1}} end. @@ -96,11 +108,16 @@ handle_cast({ack, Count}, State = #lim{in_use = InUse}) -> NewInUse = if InUse == 0 -> 0; true -> InUse - Count end, - {noreply, maybe_notify(State, State#lim{in_use = NewInUse})}. + {noreply, maybe_notify(State, State#lim{in_use = NewInUse})}; + +handle_cast({register, QPid}, State) -> + {noreply, remember_queue(QPid, State)}; -handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, - State = #lim{queues = Queues}) -> - {noreply, State#lim{queues = dict:erase(QPid, Queues)}}. +handle_cast({unregister, QPid}, State) -> + {noreply, forget_queue(QPid, State)}. + +handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State) -> + {noreply, forget_queue(QPid, State)}. terminate(_, _) -> ok. @@ -114,9 +131,10 @@ code_change(_, State, _) -> maybe_notify(OldState, NewState) -> case limit_reached(OldState) andalso not(limit_reached(NewState)) of - true -> forget_queues(NewState); - false -> NewState - end. + true -> ok = notify_queues(NewState#lim.ch_pid, NewState#lim.queues); + false -> ok + end, + NewState. limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> Limit =/= 0 andalso InUse >= Limit. @@ -128,7 +146,16 @@ remember_queue(QPid, State = #lim{queues = Queues}) -> true -> State end. -forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> +forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> + case dict:find(QPid, Queues) of + {ok, MonitorRef} -> + true = erlang:demonitor(MonitorRef), + ok = rabbit_amqqueue:unblock(QPid, ChPid), + State#lim{queues = dict:erase(QPid, Queues)}; + error -> State + end. + +notify_queues(ChPid, Queues) -> QList = dict:to_list(Queues), case length(QList) of 0 -> ok; @@ -137,9 +164,6 @@ forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> %% appears in the list, thus ensuring that each queue has %% an equal chance of being notified first. {L1, L2} = lists:split(random:uniform(L), QList), - [begin - true = erlang:demonitor(Ref), - ok = rabbit_amqqueue:unblock(Q, ChPid) - end || {Q, Ref} <- L2 ++ L1] - end, - State#lim{queues = dict:new()}. + [ok = rabbit_amqqueue:unblock(Q, ChPid) || {Q, _} <- L2 ++ L1], + ok + end. |