summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-12-23 16:00:34 +0000
committerMatthias Radestock <matthias@lshift.net>2008-12-23 16:00:34 +0000
commit62874e89b1cd969e12ffc288fb43f46f1b72e05d (patch)
treeaa7c7c4c84a5963653b2a3cb348b4ab23e9daa6b
parent0d160f405915259c1b7aa64a8f8c1ecf07978434 (diff)
downloadrabbitmq-server-62874e89b1cd969e12ffc288fb43f46f1b72e05d.tar.gz
make limiter keep track of all queues with subscriptions
This is more efficient since it avoids the repeated (de)monitoring and updates to the limiter state.
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_limiter.erl58
2 files changed, 52 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 53b569b4..c01f08df 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -634,6 +634,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
C1 = C#cr{consumers = [Consumer | Consumers],
limiter_pid = LimiterPid},
store_ch_record(C1),
+ if Consumers == [] ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer =
if
@@ -653,12 +658,17 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumers = Consumers} ->
+ C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
Consumers),
C1 = C#cr{consumers = NewConsumers},
store_ch_record(C1),
+ if NewConsumers == [] ->
+ ok = rabbit_limiter:unregister(LimiterPid, self());
+ true ->
+ ok
+ end,
ok = maybe_send_reply(ChPid, OkMsg),
case check_auto_delete(
State#q{exclusive_consumer = cancel_holder(ChPid,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 12632625..f1a45415 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -36,7 +36,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1]).
--export([limit/2, can_send/2, ack/2]).
+-export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
%%----------------------------------------------------------------------------
@@ -45,6 +45,8 @@
-spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(can_send/2 :: (pid(), pid()) -> bool()).
-spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(register/2 :: (pid(), pid()) -> 'ok').
+-spec(unregister/2 :: (pid(), pid()) -> 'ok').
-endif.
@@ -76,6 +78,12 @@ can_send(LimiterPid, QPid) ->
ack(LimiterPid, Count) ->
gen_server:cast(LimiterPid, {ack, Count}).
+register(LimiterPid, QPid) ->
+ gen_server:cast(LimiterPid, {register, QPid}).
+
+unregister(LimiterPid, QPid) ->
+ gen_server:cast(LimiterPid, {unregister, QPid}).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -83,9 +91,13 @@ ack(LimiterPid, Count) ->
init([ChPid]) ->
{ok, #lim{ch_pid = ChPid} }.
-handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) ->
+handle_call({can_send, _QPid}, _From, State = #lim{in_use = InUse}) ->
case limit_reached(State) of
- true -> {reply, false, remember_queue(QPid, State)};
+ true ->
+ %% TODO: keep track of the fact that the specific QPid has
+ %% had a can_send request rejected, so we can restrict the
+ %% notifications to these QPids only.
+ {reply, false, State};
false -> {reply, true, State#lim{in_use = InUse + 1}}
end.
@@ -96,11 +108,16 @@ handle_cast({ack, Count}, State = #lim{in_use = InUse}) ->
NewInUse = if InUse == 0 -> 0;
true -> InUse - Count
end,
- {noreply, maybe_notify(State, State#lim{in_use = NewInUse})}.
+ {noreply, maybe_notify(State, State#lim{in_use = NewInUse})};
+
+handle_cast({register, QPid}, State) ->
+ {noreply, remember_queue(QPid, State)};
-handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info},
- State = #lim{queues = Queues}) ->
- {noreply, State#lim{queues = dict:erase(QPid, Queues)}}.
+handle_cast({unregister, QPid}, State) ->
+ {noreply, forget_queue(QPid, State)}.
+
+handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State) ->
+ {noreply, forget_queue(QPid, State)}.
terminate(_, _) ->
ok.
@@ -114,9 +131,10 @@ code_change(_, State, _) ->
maybe_notify(OldState, NewState) ->
case limit_reached(OldState) andalso not(limit_reached(NewState)) of
- true -> forget_queues(NewState);
- false -> NewState
- end.
+ true -> ok = notify_queues(NewState#lim.ch_pid, NewState#lim.queues);
+ false -> ok
+ end,
+ NewState.
limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) ->
Limit =/= 0 andalso InUse >= Limit.
@@ -128,7 +146,16 @@ remember_queue(QPid, State = #lim{queues = Queues}) ->
true -> State
end.
-forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
+forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
+ case dict:find(QPid, Queues) of
+ {ok, MonitorRef} ->
+ true = erlang:demonitor(MonitorRef),
+ ok = rabbit_amqqueue:unblock(QPid, ChPid),
+ State#lim{queues = dict:erase(QPid, Queues)};
+ error -> State
+ end.
+
+notify_queues(ChPid, Queues) ->
QList = dict:to_list(Queues),
case length(QList) of
0 -> ok;
@@ -137,9 +164,6 @@ forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
%% appears in the list, thus ensuring that each queue has
%% an equal chance of being notified first.
{L1, L2} = lists:split(random:uniform(L), QList),
- [begin
- true = erlang:demonitor(Ref),
- ok = rabbit_amqqueue:unblock(Q, ChPid)
- end || {Q, Ref} <- L2 ++ L1]
- end,
- State#lim{queues = dict:new()}.
+ [ok = rabbit_amqqueue:unblock(Q, ChPid) || {Q, _} <- L2 ++ L1],
+ ok
+ end.