summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-02-12 13:30:30 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-02-12 13:30:30 +0000
commit6f4290c5279bb1ac8c954bda6a1a97af9d437ec0 (patch)
tree8e0d0aa802694e68d04eb0788b06ede743d43190
parent79fd234457ccaeababfe1a2ce30d4d3eb979eb7f (diff)
downloadrabbitmq-server-6f4290c5279bb1ac8c954bda6a1a97af9d437ec0.tar.gz
Move rabbit_channel:send_drained/2 invocations into the queue module, and make sure we send drained for all consumers in drain mode.
-rw-r--r--src/rabbit_amqqueue_process.erl43
-rw-r--r--src/rabbit_channel.erl17
-rw-r--r--src/rabbit_limiter.erl65
3 files changed, 67 insertions, 58 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 88d13290..f5648eca 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -405,6 +405,21 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
+maybe_send_drained(#q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:len(BQS) of
+ 0 -> [maybe_send_drained(C) || C <- all_ch_record()];
+ _ -> ok
+ end;
+maybe_send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+ case rabbit_limiter:drained(Limiter) of
+ {[], Limiter} ->
+ ok;
+ {CTagCredit, Limiter2} ->
+ rabbit_channel:send_drained(ChPid, CTagCredit),
+ update_ch_record(C#cr{limiter = Limiter2})
+ end.
+
update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
ok = rabbit_limiter:register(Limiter, self()),
update_ch_record(C#cr{consumer_count = 1});
@@ -437,9 +452,7 @@ deliver_msgs_to_consumers(DeliverFun, false,
deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
true ->
@@ -449,8 +462,7 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer},
#cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C,
#consumer{tag = CTag} = Consumer,
case rabbit_limiter:can_send(
- Limiter, self(), Consumer#consumer.ack_required,
- ChPid, CTag, BQ:len(BQS)) of
+ Limiter, self(), Consumer#consumer.ack_required, CTag) of
consumer_blocked ->
block_consumer(C#cr{blocked_ctags = [CTag | BCTags]}, E),
{false, State};
@@ -483,6 +495,7 @@ deliver_msg_to_consumer(DeliverFun, NewLimiter,
update_ch_record(C#cr{acktags = ChAckTags1,
limiter = NewLimiter,
unsent_message_count = Count + 1}),
+ maybe_send_drained(State1),
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
@@ -1098,9 +1111,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
- _From, State = #q{exclusive_consumer = Holder,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ _From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
@@ -1111,8 +1122,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Limiter;
{Credit, Drain} ->
rabbit_limiter:initial_credit(
- Limiter, ChPid, ConsumerTag, Credit, Drain,
- BQ:len(BQS))
+ Limiter, ConsumerTag, Credit, Drain)
end,
C1 = update_consumer_count(C#cr{limiter = Limiter2}, +1),
Consumer = #consumer{tag = ConsumerTag,
@@ -1132,6 +1142,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
AC1 = queue:in(E, State1#q.active_consumers),
run_message_queue(State1#q{active_consumers = AC1})
end,
+ maybe_send_drained(State2),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State2)),
reply(ok, State2)
@@ -1342,11 +1353,13 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
backing_queue_state = BQS}) ->
#cr{limiter = Lim,
blocked_ctags = BCTags} = ch_record(ChPid),
- {Unblock, Lim2} = rabbit_limiter:credit(
- Lim, ChPid, CTag, Credit, Drain, BQ:len(BQS)),
- noreply(possibly_unblock(
- State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock,
- limiter = Lim2} end));
+ {Unblock, Lim2} = rabbit_limiter:credit(Lim, CTag, Credit, Drain),
+ rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)),
+ State1 = possibly_unblock(
+ State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock,
+ limiter = Lim2} end),
+ maybe_send_drained(State1),
+ noreply(State1);
handle_cast(wake_up, State) ->
noreply(State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c1eb126c..aed25344 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,7 +21,7 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, send_credit_reply/2, send_drained/3,
+-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2,
flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
@@ -96,7 +96,7 @@
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
--spec(send_drained/3 :: (pid(), rabbit_types:ctag(), non_neg_integer())
+-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
@@ -145,8 +145,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
send_credit_reply(Pid, Len) ->
gen_server2:cast(Pid, {send_credit_reply, Len}).
-send_drained(Pid, ConsumerTag, CreditDrained) ->
- gen_server2:cast(Pid, {send_drained, ConsumerTag, CreditDrained}).
+send_drained(Pid, CTagCredit) ->
+ gen_server2:cast(Pid, {send_drained, CTagCredit}).
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
@@ -330,11 +330,12 @@ handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
WriterPid, #'basic.credit_ok'{available = Len}),
noreply(State);
-handle_cast({send_drained, ConsumerTag, CreditDrained},
+handle_cast({send_drained, CTagCredit},
State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag,
- credit_drained = CreditDrained}),
+ [ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag,
+ credit_drained = CreditDrained})
+ || {ConsumerTag, CreditDrained} <- CTagCredit],
noreply(State);
handle_cast(force_event_refresh, State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 46b465bc..1ee5448e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -22,9 +22,10 @@
handle_info/2, prioritise_call/3]).
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
disable/1]).
--export([limit/2, can_send/6, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/4, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
--export([initial_credit/6, credit/6, forget_consumer/2, copy_queue_state/2]).
+-export([initial_credit/4, credit/4, drained/1, forget_consumer/2,
+ copy_queue_state/2]).
-import(rabbit_misc, [serial_add/2, serial_diff/2]).
@@ -45,8 +46,7 @@
-spec(enable/2 :: (token(), non_neg_integer()) -> token()).
-spec(disable/1 :: (token()) -> token()).
-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_send/6 :: (token(), pid(), boolean(), pid(), rabbit_types:ctag(),
- non_neg_integer())
+-spec(can_send/4 :: (token(), pid(), boolean(), rabbit_types:ctag())
-> token() | 'consumer_blocked' | 'channel_blocked').
-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (token(), pid()) -> 'ok').
@@ -55,12 +55,12 @@
-spec(block/1 :: (token()) -> 'ok').
-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
-spec(is_blocked/1 :: (token()) -> boolean()).
--spec(initial_credit/6 :: (token(), pid(), rabbit_types:ctag(),
- non_neg_integer(), boolean(), non_neg_integer())
- -> token()).
--spec(credit/6 :: (token(), pid(), rabbit_types:ctag(),
- non_neg_integer(), boolean(), non_neg_integer())
+-spec(initial_credit/4 :: (token(), rabbit_types:ctag(),
+ non_neg_integer(), boolean()) -> token()).
+-spec(credit/4 :: (token(), rabbit_types:ctag(), non_neg_integer(), boolean())
-> {[rabbit_types:ctag()], token()}).
+-spec(drained/1 :: (token())
+ -> {[{rabbit_types:ctag(), non_neg_integer()}], token()}).
-spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()).
-spec(copy_queue_state/2 :: (token(), token()) -> token()).
@@ -105,7 +105,7 @@ limit(Limiter, PrefetchCount) ->
%% to avoid always going through with_exit_handler/2, even when the
%% limiter is disabled.
can_send(Token = #token{pid = Pid, enabled = Enabled, q_state = Credits},
- QPid, AckReq, ChPid, CTag, Len) ->
+ QPid, AckReq, CTag) ->
ConsAllows = case dict:find(CTag, Credits) of
{ok, #credit{credit = C}} when C > 0 -> true;
{ok, #credit{}} -> false;
@@ -113,8 +113,7 @@ can_send(Token = #token{pid = Pid, enabled = Enabled, q_state = Credits},
end,
case ConsAllows of
true -> case not Enabled orelse call_can_send(Pid, QPid, AckReq) of
- true -> Credits2 = record_send_q(
- CTag, Len, ChPid, Credits),
+ true -> Credits2 = record_send_q(CTag, Credits),
Token#token{q_state = Credits2};
false -> channel_blocked
end;
@@ -150,19 +149,24 @@ unblock(Limiter) ->
is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
-initial_credit(Limiter = #token{q_state = Credits},
- ChPid, CTag, Credit, Drain, Len) ->
- {[], Credits2} = update_credit(
- CTag, Len, ChPid, Credit, Drain, Credits),
+initial_credit(Limiter = #token{q_state = Credits}, CTag, Credit, Drain) ->
+ {[], Credits2} = update_credit(CTag, Credit, Drain, Credits),
Limiter#token{q_state = Credits2}.
-credit(Limiter = #token{q_state = Credits},
- ChPid, CTag, Credit, Drain, Len) ->
- {Unblock, Credits2} = update_credit(
- CTag, Len, ChPid, Credit, Drain, Credits),
- rabbit_channel:send_credit_reply(ChPid, Len),
+credit(Limiter = #token{q_state = Credits}, CTag, Credit, Drain) ->
+ {Unblock, Credits2} = update_credit(CTag, Credit, Drain, Credits),
{Unblock, Limiter#token{q_state = Credits2}}.
+drained(Limiter = #token{q_state = Credits}) ->
+ {CTagCredits, Credits2} =
+ dict:fold(
+ fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) ->
+ {[{CTag, C} | Acc], write_credit(CTag, 0, false, Creds0)};
+ (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) ->
+ {Acc, Creds0}
+ end, {[], Credits}, Credits),
+ {CTagCredits, Limiter#token{q_state = Credits2}}.
+
forget_consumer(Limiter = #token{q_state = Credits}, CTag) ->
Limiter#token{q_state = dict:erase(CTag, Credits)}.
@@ -179,19 +183,17 @@ copy_queue_state(#token{q_state = Credits}, Token) ->
%% we get the queue to hold a bit of state for us (#token.q_state), and
%% maintain a fiction that the limiter is making the decisions...
-record_send_q(CTag, Len, ChPid, Credits) ->
+record_send_q(CTag, Credits) ->
case dict:find(CTag, Credits) of
{ok, #credit{credit = Credit, drain = Drain}} ->
- NewCredit = maybe_drain(Len - 1, Drain, CTag, ChPid, Credit - 1),
- write_credit(CTag, NewCredit, Drain, Credits);
+ write_credit(CTag, Credit, Drain, Credits);
error ->
Credits
end.
-update_credit(CTag, Len, ChPid, Credit, Drain, Credits) ->
- NewCredit = maybe_drain(Len, Drain, CTag, ChPid, Credit),
- NewCredits = write_credit(CTag, NewCredit, Drain, Credits),
- case NewCredit > 0 of
+update_credit(CTag, Credit, Drain, Credits) ->
+ NewCredits = write_credit(CTag, Credit, Drain, Credits),
+ case Credit > 0 of
true -> {[CTag], NewCredits};
false -> {[], NewCredits}
end.
@@ -199,13 +201,6 @@ update_credit(CTag, Len, ChPid, Credit, Drain, Credits) ->
write_credit(CTag, Credit, Drain, Credits) ->
dict:store(CTag, #credit{credit = Credit, drain = Drain}, Credits).
-maybe_drain(0, true, CTag, ChPid, Credit) ->
- rabbit_channel:send_drained(ChPid, CTag, Credit),
- 0; %% Magic reduction to 0
-
-maybe_drain(_, _, _, _, Credit) ->
- Credit.
-
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------