diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-02-09 14:26:42 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-02-09 14:26:42 +0000 |
commit | 54250f72aac333157ff6a4497ab69682a3bc27fb (patch) | |
tree | fe16c26d1e3da8e99370641b54bf4601cd47433d | |
parent | e19af4d7f8036f4317b956221010491899d7302d (diff) | |
parent | 2ee78027411176487c1dd121e4583ddb0d493e3d (diff) | |
download | rabbitmq-server-54250f72aac333157ff6a4497ab69682a3bc27fb.tar.gz |
Merge bug24668 (make consumer flow control more like credit_flow; improve performance along the way)
-rw-r--r-- | src/rabbit_amqqueue.erl | 21 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 3 |
3 files changed, 26 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fb4540a3..a7dfd535 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -25,7 +25,7 @@ -export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, unblock/2, flush_all/2]). +-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([store_queue/1]). @@ -40,6 +40,8 @@ -define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). +-define(MORE_CONSUMER_CREDIT_AFTER, 50). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -137,6 +139,7 @@ -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: @@ -461,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + Key = {consumer_credit_to, QPid}, + put(Key, case get(Key) of + 1 -> gen_server2:cast( + QPid, {notify_sent, ChPid, + ?MORE_CONSUMER_CREDIT_AFTER}), + ?MORE_CONSUMER_CREDIT_AFTER; + undefined -> erlang:monitor(process, QPid), + ?MORE_CONSUMER_CREDIT_AFTER - 1; + C -> C - 1 + end), + ok. + +notify_sent_queue_down(QPid) -> + erase({consumer_credit_to, QPid}), + ok. unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5629b836..b3a620fa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 100). +-define(UNSENT_MESSAGE_LIMIT, 200). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). @@ -823,7 +823,7 @@ prioritise_cast(Msg, _State) -> {set_maximum_since_use, _Age} -> 8; {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; + {notify_sent, _ChPid, _Credit} -> 7; {unblock, _ChPid} -> 7; {run_backing_queue, _Mod, _Fun} -> 6; _ -> 0 @@ -1057,11 +1057,11 @@ handle_cast({unblock, ChPid}, State) -> possibly_unblock(State, ChPid, fun (C) -> C#cr{is_limit_active = false} end)); -handle_cast({notify_sent, ChPid}, State) -> +handle_cast({notify_sent, ChPid, Credit}, State) -> noreply( possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> - C#cr{unsent_message_count = Count - 1} + C#cr{unsent_message_count = Count - Credit} end)); handle_cast({limit, ChPid, Limiter}, State) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 269128df..dc74b2f5 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, ok = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), State; +handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> + rabbit_amqqueue:notify_sent_queue_down(QPid), + State; handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> |