diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-07 14:04:49 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-07 14:04:49 +0000 |
commit | 4ec9137027455dd34ddf1cdeb758c1eee0428ad1 (patch) | |
tree | 360e040b56c1fd8a3a136aceba6b499e73c36bbf | |
parent | 2fdee97e0f1cad03faf1827eb6f43ec78c1d3e1f (diff) | |
download | rabbitmq-server-4ec9137027455dd34ddf1cdeb758c1eee0428ad1.tar.gz |
credit_flow-like throttling of queue->writer message flow
Previously we effectively had a credit_spec of {100,1}, i.e. the queue
would send up to 100 messages to a consumer channel/writer, and the
writer would 'ack' them individually. That is horrendeously
inefficient:
- when draining a queue, after the queue had sent 100 messages it
would block the consumer, unblock when the notify_sent 'ack' came in,
send another message to the channel/queue, block again. So a vast
amount of work per message
- in a cluster, the notify_sent 'acks' effectively doubled the
cross-cluster traffic
We now use a scheme much like credit_flow. Except we cannot *actually*
use credit_flow because
- rather than wanting to know whether a sender is lacking credit for
*any* receiver, as indicated by credit_flow:blocked(), we need to know
*which* receiver we are lacking credit for.
- (lack of) credit from receiver should *not* propagate to senders,
i.e. sender and receiver credits are completely decoupled. Instead the
queue should, er, queue messages when receivers cannot keep up.
While we could modify credit_flow to accomodate the above, the changes
would be quite unpleasant and not actually reduce the amount of code
vs implementing a more specialised scheme.
The downside is that the contract for using
rabbit_amqqueue:notify_sent becomes somewhat mysterious. In
particular it sets up a monitor for queues in the caller, and expects
the caller to invoke rabbit_amqqueue:notify_sent_queue_down when a
'DOWN' message is received.
-rw-r--r-- | src/rabbit_amqqueue.erl | 21 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 3 |
3 files changed, 26 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fb4540a3..01695cd0 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -25,7 +25,7 @@ -export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, unblock/2, flush_all/2]). +-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([store_queue/1]). @@ -40,6 +40,8 @@ -define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). +-define(MORE_CONSUMER_CREDIT_AFTER, 50). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -137,6 +139,7 @@ -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: @@ -461,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + Key = {consumer_credit_to, QPid}, + put(Key, case get(Key) of + 1 -> gen_server2:cast( + QPid, {notify_sent, ChPid, + ?MORE_CONSUMER_CREDIT_AFTER}), + ?MORE_CONSUMER_CREDIT_AFTER; + undefined -> erlang:monitor(process, QPid), + ?MORE_CONSUMER_CREDIT_AFTER - 1; + C0 -> C0 - 1 + end), + ok. + +notify_sent_queue_down(QPid) -> + erase({consumer_credit_to, QPid}), + ok. unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c4921510..2b4b7418 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 100). +-define(UNSENT_MESSAGE_LIMIT, 200). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). @@ -830,7 +830,7 @@ prioritise_cast(Msg, _State) -> {set_maximum_since_use, _Age} -> 8; {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; + {notify_sent, _ChPid, _Credit} -> 7; {unblock, _ChPid} -> 7; {run_backing_queue, _Mod, _Fun} -> 6; _ -> 0 @@ -1064,11 +1064,11 @@ handle_cast({unblock, ChPid}, State) -> possibly_unblock(State, ChPid, fun (C) -> C#cr{is_limit_active = false} end)); -handle_cast({notify_sent, ChPid}, State) -> +handle_cast({notify_sent, ChPid, Credit}, State) -> noreply( possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> - C#cr{unsent_message_count = Count - 1} + C#cr{unsent_message_count = Count - Credit} end)); handle_cast({limit, ChPid, Limiter}, State) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 269128df..dc74b2f5 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, ok = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), State; +handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> + rabbit_amqqueue:notify_sent_queue_down(QPid), + State; handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> |