summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-02-07 14:04:49 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-02-07 14:04:49 +0000
commit4ec9137027455dd34ddf1cdeb758c1eee0428ad1 (patch)
tree360e040b56c1fd8a3a136aceba6b499e73c36bbf
parent2fdee97e0f1cad03faf1827eb6f43ec78c1d3e1f (diff)
downloadrabbitmq-server-4ec9137027455dd34ddf1cdeb758c1eee0428ad1.tar.gz
credit_flow-like throttling of queue->writer message flow
Previously we effectively had a credit_spec of {100,1}, i.e. the queue would send up to 100 messages to a consumer channel/writer, and the writer would 'ack' them individually. That is horrendeously inefficient: - when draining a queue, after the queue had sent 100 messages it would block the consumer, unblock when the notify_sent 'ack' came in, send another message to the channel/queue, block again. So a vast amount of work per message - in a cluster, the notify_sent 'acks' effectively doubled the cross-cluster traffic We now use a scheme much like credit_flow. Except we cannot *actually* use credit_flow because - rather than wanting to know whether a sender is lacking credit for *any* receiver, as indicated by credit_flow:blocked(), we need to know *which* receiver we are lacking credit for. - (lack of) credit from receiver should *not* propagate to senders, i.e. sender and receiver credits are completely decoupled. Instead the queue should, er, queue messages when receivers cannot keep up. While we could modify credit_flow to accomodate the above, the changes would be quite unpleasant and not actually reduce the amount of code vs implementing a more specialised scheme. The downside is that the contract for using rabbit_amqqueue:notify_sent becomes somewhat mysterious. In particular it sets up a monitor for queues in the caller, and expects the caller to invoke rabbit_amqqueue:notify_sent_queue_down when a 'DOWN' message is received.
-rw-r--r--src/rabbit_amqqueue.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_writer.erl3
3 files changed, 26 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fb4540a3..01695cd0 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -25,7 +25,7 @@
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, unblock/2, flush_all/2]).
+-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([store_queue/1]).
@@ -40,6 +40,8 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
+-define(MORE_CONSUMER_CREDIT_AFTER, 50).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -137,6 +139,7 @@
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
@@ -461,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ Key = {consumer_credit_to, QPid},
+ put(Key, case get(Key) of
+ 1 -> gen_server2:cast(
+ QPid, {notify_sent, ChPid,
+ ?MORE_CONSUMER_CREDIT_AFTER}),
+ ?MORE_CONSUMER_CREDIT_AFTER;
+ undefined -> erlang:monitor(process, QPid),
+ ?MORE_CONSUMER_CREDIT_AFTER - 1;
+ C0 -> C0 - 1
+ end),
+ ok.
+
+notify_sent_queue_down(QPid) ->
+ erase({consumer_credit_to, QPid}),
+ ok.
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c4921510..2b4b7418 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
+-define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -830,7 +830,7 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{ack, _AckTags, _ChPid} -> 7;
{reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
+ {notify_sent, _ChPid, _Credit} -> 7;
{unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
@@ -1064,11 +1064,11 @@ handle_cast({unblock, ChPid}, State) ->
possibly_unblock(State, ChPid,
fun (C) -> C#cr{is_limit_active = false} end));
-handle_cast({notify_sent, ChPid}, State) ->
+handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
possibly_unblock(State, ChPid,
fun (C = #cr{unsent_message_count = Count}) ->
- C#cr{unsent_message_count = Count - 1}
+ C#cr{unsent_message_count = Count - Credit}
end));
handle_cast({limit, ChPid, Limiter}, State) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 269128df..dc74b2f5 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
+handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ rabbit_amqqueue:notify_sent_queue_down(QPid),
+ State;
handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->