summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-11-24 00:52:28 +0000
committerBen Hood <0x6e6562@gmail.com>2008-11-24 00:52:28 +0000
commit1b8c0ff6a1f6a305945afacdb3e5d223ae1221f9 (patch)
treeed964a155eb58ebf2d01c8ab31769ce3b3855f36
parent3410d55dcbb1ad78ec2d7ca600bb0bda4c6cb502 (diff)
downloadrabbitmq-server-1b8c0ff6a1f6a305945afacdb3e5d223ae1221f9.tar.gz
Now the channel sends the ack directly to the limiter instead of via the queue
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_channel.erl3
-rw-r--r--src/rabbit_limiter.erl18
3 files changed, 12 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c0f48ad1..b4d0d52d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -666,7 +666,6 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
not_found ->
noreply(State);
C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} ->
- rabbit_limiter:decrement_capacity(LimiterPid),
{Acked, Remaining} = collect_messages(MsgIds, UAM),
persist_acks(Txn, qname(State), Acked),
case Txn of
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c6108489..4abc3494 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -271,6 +271,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
+ limiter = Limiter,
next_tag = NextDeliveryTag,
unacked_message_q = UAMQ}) ->
if DeliveryTag >= NextDeliveryTag ->
@@ -279,6 +280,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
true -> ok
end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ % CC the limiter on the number of acks that have been received
+ rabbit_limiter:decrement_capacity(Limiter, queue:len(Acked)),
Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
{noreply, case TxnKey of
none -> State#ch{unacked_message_q = Remaining};
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index b83af0c9..4e130ea0 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -10,7 +10,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1]).
--export([set_prefetch_count/2, can_send/2, decrement_capacity/1]).
+-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]).
-record(lim, {prefetch_count = 0,
ch_pid,
@@ -36,8 +36,8 @@ can_send(LimiterPid, QPid) ->
% Lets the limiter know that a queue has received an ack from a consumer
% and hence can reduce the in-use-by-that queue capcity information
-decrement_capacity(LimiterPid) ->
- gen_server:cast(LimiterPid, decrement_capacity).
+decrement_capacity(LimiterPid, Magnitude) ->
+ gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}).
%---------------------------------------------------------------------------
% gen_server callbacks
@@ -75,8 +75,8 @@ handle_cast({prefetch_count, PrefetchCount}, State) ->
% This is an asynchronous ack from a queue that it has received an ack from
% a queue. This allows the limiter to update the the in-use-by-that queue
% capacity infromation.
-handle_cast(decrement_capacity, State = #lim{in_use = InUse}) ->
- NewState = decrement_in_use(State),
+handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) ->
+ NewState = decrement_in_use(Magnitude, State),
ShouldNotify = limit_reached(State) and not(limit_reached(NewState)),
if
ShouldNotify ->
@@ -99,12 +99,12 @@ code_change(_, State, _) ->
% Internal plumbing
%---------------------------------------------------------------------------
-% Reduces the in-use-count of the queue by one
-decrement_in_use(State = #lim{in_use = 0}) ->
+% Reduces the in-use-count of the queue by a specific magnitude
+decrement_in_use(_, State = #lim{in_use = 0}) ->
State#lim{in_use = 0};
-decrement_in_use(State = #lim{in_use = InUse}) ->
- State#lim{in_use = InUse - 1}.
+decrement_in_use(Magnitude, State = #lim{in_use = InUse}) ->
+ State#lim{in_use = InUse - Magnitude}.
% Unblocks every queue that this limiter knows about
notify_queues(#lim{ch_pid = ChPid, queues = Queues}) ->