diff options
author | Ben Hood <0x6e6562@gmail.com> | 2008-11-24 00:52:28 +0000 |
---|---|---|
committer | Ben Hood <0x6e6562@gmail.com> | 2008-11-24 00:52:28 +0000 |
commit | 1b8c0ff6a1f6a305945afacdb3e5d223ae1221f9 (patch) | |
tree | ed964a155eb58ebf2d01c8ab31769ce3b3855f36 | |
parent | 3410d55dcbb1ad78ec2d7ca600bb0bda4c6cb502 (diff) | |
download | rabbitmq-server-1b8c0ff6a1f6a305945afacdb3e5d223ae1221f9.tar.gz |
Now the channel sends the ack directly to the limiter instead of via the queue
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 1 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 3 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 18 |
3 files changed, 12 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c0f48ad1..b4d0d52d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -666,7 +666,6 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> not_found -> noreply(State); C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} -> - rabbit_limiter:decrement_capacity(LimiterPid), {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), case Txn of diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c6108489..4abc3494 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -271,6 +271,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, + limiter = Limiter, next_tag = NextDeliveryTag, unacked_message_q = UAMQ}) -> if DeliveryTag >= NextDeliveryTag -> @@ -279,6 +280,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + % CC the limiter on the number of acks that have been received + rabbit_limiter:decrement_capacity(Limiter, queue:len(Acked)), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of none -> State#ch{unacked_message_q = Remaining}; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index b83af0c9..4e130ea0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -10,7 +10,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1]). --export([set_prefetch_count/2, can_send/2, decrement_capacity/1]). +-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). -record(lim, {prefetch_count = 0, ch_pid, @@ -36,8 +36,8 @@ can_send(LimiterPid, QPid) -> % Lets the limiter know that a queue has received an ack from a consumer % and hence can reduce the in-use-by-that queue capcity information -decrement_capacity(LimiterPid) -> - gen_server:cast(LimiterPid, decrement_capacity). +decrement_capacity(LimiterPid, Magnitude) -> + gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}). %--------------------------------------------------------------------------- % gen_server callbacks @@ -75,8 +75,8 @@ handle_cast({prefetch_count, PrefetchCount}, State) -> % This is an asynchronous ack from a queue that it has received an ack from % a queue. This allows the limiter to update the the in-use-by-that queue % capacity infromation. -handle_cast(decrement_capacity, State = #lim{in_use = InUse}) -> - NewState = decrement_in_use(State), +handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> + NewState = decrement_in_use(Magnitude, State), ShouldNotify = limit_reached(State) and not(limit_reached(NewState)), if ShouldNotify -> @@ -99,12 +99,12 @@ code_change(_, State, _) -> % Internal plumbing %--------------------------------------------------------------------------- -% Reduces the in-use-count of the queue by one -decrement_in_use(State = #lim{in_use = 0}) -> +% Reduces the in-use-count of the queue by a specific magnitude +decrement_in_use(_, State = #lim{in_use = 0}) -> State#lim{in_use = 0}; -decrement_in_use(State = #lim{in_use = InUse}) -> - State#lim{in_use = InUse - 1}. +decrement_in_use(Magnitude, State = #lim{in_use = InUse}) -> + State#lim{in_use = InUse - Magnitude}. % Unblocks every queue that this limiter knows about notify_queues(#lim{ch_pid = ChPid, queues = Queues}) -> |