diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-02-12 13:30:30 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-02-12 13:30:30 +0000 |
commit | 6f4290c5279bb1ac8c954bda6a1a97af9d437ec0 (patch) | |
tree | 8e0d0aa802694e68d04eb0788b06ede743d43190 | |
parent | 79fd234457ccaeababfe1a2ce30d4d3eb979eb7f (diff) | |
download | rabbitmq-server-6f4290c5279bb1ac8c954bda6a1a97af9d437ec0.tar.gz |
Move rabbit_channel:send_drained/2 invocations into the queue module, and make sure we send drained for all consumers in drain mode.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 43 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 17 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 65 |
3 files changed, 67 insertions, 58 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 88d13290..f5648eca 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -405,6 +405,21 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. +maybe_send_drained(#q{backing_queue = BQ, + backing_queue_state = BQS}) -> + case BQ:len(BQS) of + 0 -> [maybe_send_drained(C) || C <- all_ch_record()]; + _ -> ok + end; +maybe_send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> + case rabbit_limiter:drained(Limiter) of + {[], Limiter} -> + ok; + {CTagCredit, Limiter2} -> + rabbit_channel:send_drained(ChPid, CTagCredit), + update_ch_record(C#cr{limiter = Limiter2}) + end. + update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> ok = rabbit_limiter:register(Limiter, self()), update_ch_record(C#cr{consumer_count = 1}); @@ -437,9 +452,7 @@ deliver_msgs_to_consumers(DeliverFun, false, deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = ch_record(ChPid), case is_ch_blocked(C) of true -> @@ -449,8 +462,7 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, #cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C, #consumer{tag = CTag} = Consumer, case rabbit_limiter:can_send( - Limiter, self(), Consumer#consumer.ack_required, - ChPid, CTag, BQ:len(BQS)) of + Limiter, self(), Consumer#consumer.ack_required, CTag) of consumer_blocked -> block_consumer(C#cr{blocked_ctags = [CTag | BCTags]}, E), {false, State}; @@ -483,6 +495,7 @@ deliver_msg_to_consumer(DeliverFun, NewLimiter, update_ch_record(C#cr{acktags = ChAckTags1, limiter = NewLimiter, unsent_message_count = Count + 1}), + maybe_send_drained(State1), {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> @@ -1098,9 +1111,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, - _From, State = #q{exclusive_consumer = Holder, - backing_queue = BQ, - backing_queue_state = BQS}) -> + _From, State = #q{exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); @@ -1111,8 +1122,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, Limiter; {Credit, Drain} -> rabbit_limiter:initial_credit( - Limiter, ChPid, ConsumerTag, Credit, Drain, - BQ:len(BQS)) + Limiter, ConsumerTag, Credit, Drain) end, C1 = update_consumer_count(C#cr{limiter = Limiter2}, +1), Consumer = #consumer{tag = ConsumerTag, @@ -1132,6 +1142,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, AC1 = queue:in(E, State1#q.active_consumers), run_message_queue(State1#q{active_consumers = AC1}) end, + maybe_send_drained(State2), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State2)), reply(ok, State2) @@ -1342,11 +1353,13 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, backing_queue_state = BQS}) -> #cr{limiter = Lim, blocked_ctags = BCTags} = ch_record(ChPid), - {Unblock, Lim2} = rabbit_limiter:credit( - Lim, ChPid, CTag, Credit, Drain, BQ:len(BQS)), - noreply(possibly_unblock( - State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock, - limiter = Lim2} end)); + {Unblock, Lim2} = rabbit_limiter:credit(Lim, CTag, Credit, Drain), + rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)), + State1 = possibly_unblock( + State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock, + limiter = Lim2} end), + maybe_send_drained(State1), + noreply(State1); handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c1eb126c..aed25344 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,7 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, send_credit_reply/2, send_drained/3, +-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). @@ -96,7 +96,7 @@ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). --spec(send_drained/3 :: (pid(), rabbit_types:ctag(), non_neg_integer()) +-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). @@ -145,8 +145,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> send_credit_reply(Pid, Len) -> gen_server2:cast(Pid, {send_credit_reply, Len}). -send_drained(Pid, ConsumerTag, CreditDrained) -> - gen_server2:cast(Pid, {send_drained, ConsumerTag, CreditDrained}). +send_drained(Pid, CTagCredit) -> + gen_server2:cast(Pid, {send_drained, CTagCredit}). flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). @@ -330,11 +330,12 @@ handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> WriterPid, #'basic.credit_ok'{available = Len}), noreply(State); -handle_cast({send_drained, ConsumerTag, CreditDrained}, +handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, - credit_drained = CreditDrained}), + [ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, + credit_drained = CreditDrained}) + || {ConsumerTag, CreditDrained} <- CTagCredit], noreply(State); handle_cast(force_event_refresh, State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 46b465bc..1ee5448e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -22,9 +22,10 @@ handle_info/2, prioritise_call/3]). -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_send/6, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/4, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). --export([initial_credit/6, credit/6, forget_consumer/2, copy_queue_state/2]). +-export([initial_credit/4, credit/4, drained/1, forget_consumer/2, + copy_queue_state/2]). -import(rabbit_misc, [serial_add/2, serial_diff/2]). @@ -45,8 +46,7 @@ -spec(enable/2 :: (token(), non_neg_integer()) -> token()). -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/6 :: (token(), pid(), boolean(), pid(), rabbit_types:ctag(), - non_neg_integer()) +-spec(can_send/4 :: (token(), pid(), boolean(), rabbit_types:ctag()) -> token() | 'consumer_blocked' | 'channel_blocked'). -spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (token(), pid()) -> 'ok'). @@ -55,12 +55,12 @@ -spec(block/1 :: (token()) -> 'ok'). -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). --spec(initial_credit/6 :: (token(), pid(), rabbit_types:ctag(), - non_neg_integer(), boolean(), non_neg_integer()) - -> token()). --spec(credit/6 :: (token(), pid(), rabbit_types:ctag(), - non_neg_integer(), boolean(), non_neg_integer()) +-spec(initial_credit/4 :: (token(), rabbit_types:ctag(), + non_neg_integer(), boolean()) -> token()). +-spec(credit/4 :: (token(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> {[rabbit_types:ctag()], token()}). +-spec(drained/1 :: (token()) + -> {[{rabbit_types:ctag(), non_neg_integer()}], token()}). -spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()). -spec(copy_queue_state/2 :: (token(), token()) -> token()). @@ -105,7 +105,7 @@ limit(Limiter, PrefetchCount) -> %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. can_send(Token = #token{pid = Pid, enabled = Enabled, q_state = Credits}, - QPid, AckReq, ChPid, CTag, Len) -> + QPid, AckReq, CTag) -> ConsAllows = case dict:find(CTag, Credits) of {ok, #credit{credit = C}} when C > 0 -> true; {ok, #credit{}} -> false; @@ -113,8 +113,7 @@ can_send(Token = #token{pid = Pid, enabled = Enabled, q_state = Credits}, end, case ConsAllows of true -> case not Enabled orelse call_can_send(Pid, QPid, AckReq) of - true -> Credits2 = record_send_q( - CTag, Len, ChPid, Credits), + true -> Credits2 = record_send_q(CTag, Credits), Token#token{q_state = Credits2}; false -> channel_blocked end; @@ -150,19 +149,24 @@ unblock(Limiter) -> is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). -initial_credit(Limiter = #token{q_state = Credits}, - ChPid, CTag, Credit, Drain, Len) -> - {[], Credits2} = update_credit( - CTag, Len, ChPid, Credit, Drain, Credits), +initial_credit(Limiter = #token{q_state = Credits}, CTag, Credit, Drain) -> + {[], Credits2} = update_credit(CTag, Credit, Drain, Credits), Limiter#token{q_state = Credits2}. -credit(Limiter = #token{q_state = Credits}, - ChPid, CTag, Credit, Drain, Len) -> - {Unblock, Credits2} = update_credit( - CTag, Len, ChPid, Credit, Drain, Credits), - rabbit_channel:send_credit_reply(ChPid, Len), +credit(Limiter = #token{q_state = Credits}, CTag, Credit, Drain) -> + {Unblock, Credits2} = update_credit(CTag, Credit, Drain, Credits), {Unblock, Limiter#token{q_state = Credits2}}. +drained(Limiter = #token{q_state = Credits}) -> + {CTagCredits, Credits2} = + dict:fold( + fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) -> + {[{CTag, C} | Acc], write_credit(CTag, 0, false, Creds0)}; + (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) -> + {Acc, Creds0} + end, {[], Credits}, Credits), + {CTagCredits, Limiter#token{q_state = Credits2}}. + forget_consumer(Limiter = #token{q_state = Credits}, CTag) -> Limiter#token{q_state = dict:erase(CTag, Credits)}. @@ -179,19 +183,17 @@ copy_queue_state(#token{q_state = Credits}, Token) -> %% we get the queue to hold a bit of state for us (#token.q_state), and %% maintain a fiction that the limiter is making the decisions... -record_send_q(CTag, Len, ChPid, Credits) -> +record_send_q(CTag, Credits) -> case dict:find(CTag, Credits) of {ok, #credit{credit = Credit, drain = Drain}} -> - NewCredit = maybe_drain(Len - 1, Drain, CTag, ChPid, Credit - 1), - write_credit(CTag, NewCredit, Drain, Credits); + write_credit(CTag, Credit, Drain, Credits); error -> Credits end. -update_credit(CTag, Len, ChPid, Credit, Drain, Credits) -> - NewCredit = maybe_drain(Len, Drain, CTag, ChPid, Credit), - NewCredits = write_credit(CTag, NewCredit, Drain, Credits), - case NewCredit > 0 of +update_credit(CTag, Credit, Drain, Credits) -> + NewCredits = write_credit(CTag, Credit, Drain, Credits), + case Credit > 0 of true -> {[CTag], NewCredits}; false -> {[], NewCredits} end. @@ -199,13 +201,6 @@ update_credit(CTag, Len, ChPid, Credit, Drain, Credits) -> write_credit(CTag, Credit, Drain, Credits) -> dict:store(CTag, #credit{credit = Credit, drain = Drain}, Credits). -maybe_drain(0, true, CTag, ChPid, Credit) -> - rabbit_channel:send_drained(ChPid, CTag, Credit), - 0; %% Magic reduction to 0 - -maybe_drain(_, _, _, _, Credit) -> - Credit. - %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- |