diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-22 19:01:16 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-22 19:01:16 +0000 |
commit | bb754c38ec9b0fb562036b39a9762a905158227c (patch) | |
tree | 2274b3f3d254145d5bdd3ada7479390d436cbe32 | |
parent | 4dbd221c2a2486676d7ee031c9b4b1ce8d5355aa (diff) | |
parent | 7aeba22413273a33eb3693f4d216d6d17425019d (diff) | |
download | rabbitmq-server-bb754c38ec9b0fb562036b39a9762a905158227c.tar.gz |
merge bug23749 into default
-rw-r--r-- | src/rabbit_amqqueue.erl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 111 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 49 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 138 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
5 files changed, 255 insertions, 62 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3f0a7f9c..8c00c85c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,9 +26,9 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/8, basic_cancel/4]). +-export([basic_get/4, basic_consume/9, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). --export([notify_down_all/2, activate_limit_all/2]). +-export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, @@ -147,9 +147,11 @@ -spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/8 :: +-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean()) -> 'ok'). +-spec(basic_consume/9 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), any()) + rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -540,13 +542,16 @@ notify_down_all(QPids, ChPid) -> activate_limit_all(QPids, ChPid) -> delegate:cast(QPids, {activate_limit, ChPid}). +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). + basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, OkMsg) -> + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, OkMsg}). + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index efe8efc4..e24568bb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -66,8 +66,12 @@ monitor_ref, acktags, consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason blocked_consumers, + %% The limiter itself limiter, + %% Internal flow control for queue -> writer unsent_message_count}). %%---------------------------------------------------------------------------- @@ -406,6 +410,21 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). +maybe_send_drained(WasEmpty, State) -> + case (not WasEmpty) andalso is_empty(State) of + true -> [send_drained(C) || C <- all_ch_record()]; + false -> ok + end, + State. + +send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> + case rabbit_limiter:drained(Limiter) of + {[], Limiter} -> ok; + {CTagCredit, Limiter2} -> rabbit_channel:send_drained( + ChPid, CTagCredit), + update_ch_record(C#cr{limiter = Limiter2}) + end. + deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, @@ -426,7 +445,8 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> true -> block_consumer(C, E), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, - Consumer#consumer.ack_required) of + Consumer#consumer.ack_required, + Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), {false, State}; @@ -585,14 +605,16 @@ maybe_drop_head(State = #q{max_length = MaxLen, requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + WasEmpty = BQ:is_empty(BQS), {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}), - run_message_queue(drop_expired_msgs(State1)). + run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}. + State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), + {Result, maybe_send_drained(Result =:= empty, State1)}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, @@ -621,20 +643,29 @@ remove_consumers(ChPid, Queue, QName) -> possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of - not_found -> + not_found -> State; + C -> C1 = Update(C), + case is_ch_blocked(C) andalso not is_ch_blocked(C1) of + false -> update_ch_record(C1), + State; + true -> unblock(State, C1) + end + end. + +unblock(State, C = #cr{limiter = Limiter}) -> + case lists:partition( + fun({_ChPid, #consumer{tag = CTag}}) -> + rabbit_limiter:is_consumer_blocked(Limiter, CTag) + end, queue:to_list(C#cr.blocked_consumers)) of + {_, []} -> + update_ch_record(C), State; - C -> - C1 = Update(C), - case is_ch_blocked(C) andalso not is_ch_blocked(C1) of - false -> update_ch_record(C1), - State; - true -> #cr{blocked_consumers = Consumers} = C1, - update_ch_record( - C1#cr{blocked_consumers = queue:new()}), - AC1 = queue:join(State#q.active_consumers, - Consumers), - run_message_queue(State#q{active_consumers = AC1}) - end + {Blocked, Unblocked} -> + BlockedQ = queue:from_list(Blocked), + UnblockedQ = queue:from_list(Unblocked), + update_ch_record(C#cr{blocked_consumers = BlockedQ}), + AC1 = queue:join(State#q.active_consumers, UnblockedQ), + run_message_queue(State#q{active_consumers = AC1}) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -731,6 +762,11 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> T -> now_micros() + T * 1000 end. +%% Logically this function should invoke maybe_send_drained/2. +%% However, that is expensive. Since some frequent callers of +%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot +%% possibly cause the queue to become empty, we push the +%% responsibility to the callers. So be cautious when adding new ones. drop_expired_msgs(State) -> case is_empty(State) of true -> State; @@ -1113,7 +1149,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, OkMsg}, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> @@ -1125,8 +1161,17 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, true -> rabbit_limiter:activate(Limiter); false -> Limiter end, - update_ch_record(C#cr{consumer_count = Count + 1, - limiter = Limiter1}), + Limiter2 = case CreditArgs of + none -> Limiter1; + {Crd, Drain} -> rabbit_limiter:credit( + Limiter1, ConsumerTag, Crd, Drain) + end, + C1 = update_ch_record(C#cr{consumer_count = Count + 1, + limiter = Limiter2}), + case is_empty(State) of + true -> send_drained(C1); + false -> ok + end, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1156,8 +1201,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, 1 -> rabbit_limiter:deactivate(Limiter); _ -> Limiter end, + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), update_ch_record(C#cr{consumer_count = Count - 1, - limiter = Limiter1, + limiter = Limiter2, blocked_consumers = Blocked1}), State1 = State#q{ exclusive_consumer = case Holder of @@ -1191,7 +1237,8 @@ handle_call({delete, IfUnused, IfEmpty}, From, handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), - reply({ok, Count}, State#q{backing_queue_state = BQS1}); + State1 = State#q{backing_queue_state = BQS1}, + reply({ok, Count}, maybe_send_drained(Count =:= 0, State1)); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1337,6 +1384,24 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, noreply(State#q{backing_queue = BQ1, backing_queue_state = BQS1}); +handle_cast({credit, ChPid, CTag, Credit, Drain}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + Len = BQ:len(BQS), + rabbit_channel:send_credit_reply(ChPid, Len), + C = #cr{limiter = Limiter} = lookup_ch(ChPid), + C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)}, + noreply(case Drain andalso Len == 0 of + true -> update_ch_record(C1), + send_drained(C1), + State; + false -> case is_ch_blocked(C1) of + true -> update_ch_record(C1), + State; + false -> unblock(State, C1) + end + end); + handle_cast(wake_up, State) -> noreply(State). @@ -1356,7 +1421,9 @@ handle_info(maybe_expire, State) -> end; handle_info(drop_expired, State) -> - noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined})); + WasEmpty = is_empty(State), + State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), + noreply(maybe_send_drained(WasEmpty, State1)); handle_info(emit_stats, State) -> emit_stats(State), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e9f69b62..39bd375a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,8 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2, + flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). @@ -94,6 +95,9 @@ -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). +-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}]) + -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). @@ -138,6 +142,12 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). +send_credit_reply(Pid, Len) -> + gen_server2:cast(Pid, {send_credit_reply, Len}). + +send_drained(Pid, CTagCredit) -> + gen_server2:cast(Pid, {send_drained, CTagCredit}). + flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). @@ -315,6 +325,18 @@ handle_cast({deliver, ConsumerTag, AckRequired, Content), noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); +handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_ok'{available = Len}), + noreply(State); + +handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> + [ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, + credit_drained = CreditDrained}) + || {ConsumerTag, CreditDrained} <- CTagCredit], + noreply(State); + handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); @@ -711,7 +733,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_local = _, % FIXME: implement no_ack = NoAck, exclusive = ExclusiveConsume, - nowait = NoWait}, + nowait = NoWait, + arguments = Arguments}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, consumer_mapping = ConsumerMapping}) -> @@ -737,6 +760,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), ActualConsumerTag, ExclusiveConsume, + parse_credit_args(Arguments), ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -1112,6 +1136,17 @@ handle_method(#'channel.flow'{active = false}, _, State1#ch{blocking = sets:from_list(QPids)})} end; +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + drain = Drain}, _, + State = #ch{consumer_mapping = Consumers}) -> + case dict:find(CTag, Consumers) of + {ok, Q} -> ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain), + {noreply, State}; + error -> precondition_failed("unknown consumer tag '~s'", [CTag]) + end; + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). @@ -1178,6 +1213,16 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. +parse_credit_args(Arguments) -> + case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {boolean, Drain}} -> {Credit, Drain}; + _ -> none + end; + undefined -> none + end. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 430c2716..3a279940 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -16,17 +16,18 @@ %% The purpose of the limiter is to stem the flow of messages from %% queues to channels, in order to act upon various protocol-level -%% flow control mechanisms, specifically AMQP's basic.qos -%% prefetch_count and channel.flow. +%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos +%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer) +%% credit mechanism. %% %% Each channel has an associated limiter process, created with %% start_link/1, which it passes to queues on consumer creation with -%% rabbit_amqqueue:basic_consume/8, and rabbit_amqqueue:basic_get/4. +%% rabbit_amqqueue:basic_consume/9, and rabbit_amqqueue:basic_get/4. %% The latter isn't strictly necessary, since basic.get is not %% subject to limiting, but it means that whenever a queue knows about %% a channel, it also knows about its limiter, which is less fiddly. %% -%% Th limiter process holds state that is, in effect, shared between +%% The limiter process holds state that is, in effect, shared between %% the channel and all queues from which the channel is %% consuming. Essentially all these queues are competing for access to %% a single, limited resource - the ability to deliver messages via @@ -54,33 +55,46 @@ %% inactive. In practice it is rare for that to happen, though we %% could optimise this case in the future. %% +%% In addition, the consumer credit bookkeeping is local to queues, so +%% it is not necessary to store information about it in the limiter +%% process. But for abstraction we hide it from the queue behind the +%% limiter API, and it therefore becomes part of the queue local +%% state. +%% %% The interactions with the limiter are as follows: %% %% 1. Channels tell the limiter about basic.qos prefetch counts - %% that's what the limit_prefetch/3, unlimit_prefetch/1, %% is_prefetch_limited/1, get_prefetch_limit/1 API functions are %% about - and channel.flow blocking - that's what block/1, -%% unblock/1 and is_blocked/1 are for. +%% unblock/1 and is_blocked/1 are for. They also tell the limiter +%% queue state (via the queue) about consumer credit changes - +%% that's what credit/4 is for. +%% +%% 2. Queues also tell the limiter queue state about the queue +%% becoming empty (via drained/1) and consumers leaving (via +%% forget_consumer/2). %% -%% 2. Queues register with the limiter - this happens as part of +%% 3. Queues register with the limiter - this happens as part of %% activate/1. %% %% 4. The limiter process maintains an internal counter of 'messages %% sent but not yet acknowledged', called the 'volume'. %% -%% 5. Queues ask the limiter for permission (with can_send/2) whenever +%% 5. Queues ask the limiter for permission (with can_send/3) whenever %% they want to deliver a message to a channel. The limiter checks -%% whether a) the channel isn't blocked by channel.flow, and b) the -%% volume has not yet reached the prefetch limit. If so it -%% increments the volume and tells the queue to proceed. Otherwise -%% it marks the queue as requiring notification (see below) and -%% tells the queue not to proceed. +%% whether a) the channel isn't blocked by channel.flow, b) the +%% volume has not yet reached the prefetch limit, and c) whether +%% the consumer has enough credit. If so it increments the volume +%% and tells the queue to proceed. Otherwise it marks the queue as +%% requiring notification (see below) and tells the queue not to +%% proceed. %% -%% 6. A queue that has told to proceed (by the return value of -%% can_send/2) sends the message to the channel. Conversely, a +%% 6. A queue that has been told to proceed (by the return value of +%% can_send/3) sends the message to the channel. Conversely, a %% queue that has been told not to proceed, will not attempt to %% deliver that message, or any future messages, to the -%% channel. This is accomplished by can_send/2 capturing the +%% channel. This is accomplished by can_send/3 capturing the %% outcome in the local state, where it can be accessed with %% is_suspended/1. %% @@ -88,14 +102,14 @@ %% how many messages were ack'ed. The limiter process decrements %% the volume and if it falls below the prefetch_count then it %% notifies (through rabbit_amqqueue:resume/2) all the queues -%% requiring notification, i.e. all those that had a can_send/2 +%% requiring notification, i.e. all those that had a can_send/3 %% request denied. %% %% 8. Upon receipt of such a notification, queues resume delivery to %% the channel, i.e. they will once again start asking limiter, as %% described in (5). %% -%% 9. When a queues has no more consumers associated with a particular +%% 9. When a queue has no more consumers associated with a particular %% channel, it deactivates use of the limiter with deactivate/1, %% which alters the local state such that no further interactions %% with the limiter process take place until a subsequent @@ -111,8 +125,9 @@ is_prefetch_limited/1, is_blocked/1, is_active/1, get_prefetch_limit/1, ack/2, pid/1]). %% queue API --export([client/1, activate/1, can_send/2, resume/1, deactivate/1, - is_suspended/1]). +-export([client/1, activate/1, can_send/3, resume/1, deactivate/1, + is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, + forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). @@ -120,7 +135,7 @@ %%---------------------------------------------------------------------------- -record(lstate, {pid, prefetch_limited, blocked}). --record(qstate, {pid, state}). +-record(qstate, {pid, state, credits}). -ifdef(use_specs). @@ -147,11 +162,17 @@ -spec(client/1 :: (pid()) -> qstate()). -spec(activate/1 :: (qstate()) -> qstate()). --spec(can_send/2 :: (qstate(), boolean()) -> +-spec(can_send/3 :: (qstate(), boolean(), rabbit_types:ctag()) -> {'continue' | 'suspend', qstate()}). -spec(resume/1 :: (qstate()) -> qstate()). -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). +-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). +-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean()) + -> qstate()). +-spec(drained/1 :: (qstate()) + -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). +-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). -endif. @@ -166,6 +187,8 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. +-record(credit, {credit = 0, drain = false}). + %%---------------------------------------------------------------------------- %% API %%---------------------------------------------------------------------------- @@ -208,23 +231,29 @@ ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}). pid(#lstate{pid = Pid}) -> Pid. -client(Pid) -> #qstate{pid = Pid, state = dormant}. +client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}. activate(L = #qstate{state = dormant}) -> ok = gen_server:cast(L#qstate.pid, {register, self()}), L#qstate{state = active}; activate(L) -> L. -can_send(L = #qstate{state = active}, AckRequired) -> +can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, + AckRequired, CTag) -> + case is_consumer_blocked(L, CTag) of + false -> case (State =/= active orelse + safe_call(Pid, {can_send, self(), AckRequired}, true)) of + true -> {continue, L#qstate{ + credits = record_send_q(CTag, Credits)}}; + false -> {suspend, L#qstate{state = suspended}} + end; + true -> {suspend, L} + end. + +safe_call(Pid, Msg, ExitValue) -> rabbit_misc:with_exit_handler( - fun () -> {continue, L} end, - fun () -> Msg = {can_send, self(), AckRequired}, - case gen_server2:call(L#qstate.pid, Msg, infinity) of - true -> {continue, L}; - false -> {suspend, L#qstate{state = suspended}} - end - end); -can_send(L, _AckRequired) -> {continue, L}. + fun () -> ExitValue end, + fun () -> gen_server2:call(Pid, Msg, infinity) end). resume(L) -> L#qstate{state = active}. @@ -236,6 +265,53 @@ deactivate(L) -> is_suspended(#qstate{state = suspended}) -> true; is_suspended(#qstate{}) -> false. +is_consumer_blocked(#qstate{credits = Credits}, CTag) -> + case gb_trees:lookup(CTag, Credits) of + {value, #credit{credit = C}} when C > 0 -> false; + {value, #credit{}} -> true; + none -> false + end. + +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> + Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. + +drained(Limiter = #qstate{credits = Credits}) -> + {CTagCredits, Credits2} = + rabbit_misc:gb_trees_fold( + fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) -> + {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)}; + (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) -> + {Acc, Creds0} + end, {[], Credits}, Credits), + {CTagCredits, Limiter#qstate{credits = Credits2}}. + +forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> + Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}. + +%%---------------------------------------------------------------------------- +%% Queue-local code +%%---------------------------------------------------------------------------- + +%% We want to do all the AMQP 1.0-ish link level credit calculations +%% in the queue (to do them elsewhere introduces a ton of +%% races). However, it's a big chunk of code that is conceptually very +%% linked to the limiter concept. So we get the queue to hold a bit of +%% state for us (#qstate.credits), and maintain a fiction that the +%% limiter is making the decisions... + +record_send_q(CTag, Credits) -> + case gb_trees:lookup(CTag, Credits) of + {value, #credit{credit = Credit, drain = Drain}} -> + update_credit(CTag, Credit - 1, Drain, Credits); + none -> + Credits + end. + +update_credit(CTag, Credit, Drain, Credits) -> + %% Using up all credit implies no need to send a 'drained' event + Drain1 = Drain andalso Credit > 0, + gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b2c80364..e7b69879 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1107,7 +1107,7 @@ test_server_status() -> rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, Limiter, false, <<"ctag">>, true, undefined), + Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), |