From d65b067aa9759edd4faab55aad4aaca53690b075 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 10 Jan 2013 18:01:16 +0000 Subject: Second attempt at moving credit into the queue. This time we pretend the limiter is still doing the work. While testing this I note that the credit calculation is crazy when testing with Proton. But it was just as bad before, so let's commit. --- src/rabbit_amqqueue.erl | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'src/rabbit_amqqueue.erl') diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 94150f1c..a337c722 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,6 +32,7 @@ -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]). +-export([inform_limiter/3]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -175,6 +176,7 @@ -spec(stop_mirroring/1 :: (pid()) -> 'ok'). -spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(inform_limiter/3 :: (pid(), pid(), any()) -> 'ok'). -endif. @@ -604,6 +606,9 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). +inform_limiter(ChPid, QPid, Msg) -> + delegate:cast(QPid, {inform_limiter, ChPid, Msg}). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = -- cgit v1.2.1 From 2ad15707cb8ccf7c192fb619b4bdb5032fd40b9c Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 15 Jan 2013 18:49:13 +0000 Subject: cosmetic --- src/rabbit_amqqueue.erl | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'src/rabbit_amqqueue.erl') diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1715e848..1f2d653e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -28,12 +28,11 @@ -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). --export([notify_down_all/2, limit_all/3]). +-export([notify_down_all/2, limit_all/3, inform_limiter/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). --export([inform_limiter/3]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -146,6 +145,7 @@ -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). +-spec(inform_limiter/3 :: (pid(), pid(), any()) -> 'ok'). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/7 :: @@ -178,7 +178,6 @@ -spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). -spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}). --spec(inform_limiter/3 :: (pid(), pid(), any()) -> 'ok'). -endif. @@ -535,6 +534,9 @@ notify_down_all(QPids, ChPid) -> limit_all(QPids, ChPid, Limiter) -> delegate:cast(QPids, {limit, ChPid, Limiter}). +inform_limiter(ChPid, QPid, Msg) -> + delegate:cast(QPid, {inform_limiter, ChPid, Msg}). + basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate:call(QPid, {basic_get, ChPid, NoAck}). @@ -609,9 +611,6 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). -inform_limiter(ChPid, QPid, Msg) -> - delegate:cast(QPid, {inform_limiter, ChPid, Msg}). - on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = -- cgit v1.2.1 From 097182274da2cd2d5a37ac35f6add7d06963ba76 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 15 Jan 2013 18:50:56 +0000 Subject: API consistency --- src/rabbit_amqqueue.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/rabbit_amqqueue.erl') diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1f2d653e..788ec558 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -145,7 +145,7 @@ -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). --spec(inform_limiter/3 :: (pid(), pid(), any()) -> 'ok'). +-spec(inform_limiter/3 :: (rabbit_types:amqqueue(), pid(), any()) -> 'ok'). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/7 :: @@ -534,7 +534,7 @@ notify_down_all(QPids, ChPid) -> limit_all(QPids, ChPid, Limiter) -> delegate:cast(QPids, {limit, ChPid, Limiter}). -inform_limiter(ChPid, QPid, Msg) -> +inform_limiter(#amqqueue{pid = QPid}, ChPid, Msg) -> delegate:cast(QPid, {inform_limiter, ChPid, Msg}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> -- cgit v1.2.1 From f3025ce7562d1f511baef3dbfb7231651b9e3509 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 29 Jan 2013 14:50:16 +0000 Subject: inform_limiter -> credit. --- src/rabbit_amqqueue.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'src/rabbit_amqqueue.erl') diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index cb7e961d..b9d41c25 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -28,7 +28,7 @@ -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). --export([notify_down_all/2, limit_all/3, inform_limiter/3]). +-export([notify_down_all/2, limit_all/3, credit/6]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, @@ -145,7 +145,8 @@ -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). --spec(inform_limiter/3 :: (rabbit_types:amqqueue(), pid(), any()) -> 'ok'). +-spec(credit/6 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean(), boolean()) -> 'ok'). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/7 :: @@ -533,8 +534,8 @@ notify_down_all(QPids, ChPid) -> limit_all(QPids, ChPid, Limiter) -> delegate:cast(QPids, {limit, ChPid, Limiter}). -inform_limiter(#amqqueue{pid = QPid}, ChPid, Msg) -> - delegate:cast(QPid, {inform_limiter, ChPid, Msg}). +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain, Reply) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain, Reply}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate:call(QPid, {basic_get, ChPid, NoAck}). -- cgit v1.2.1 From 39ac6385e1e3b0d215349ec3dc3476eda9432c7d Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 29 Jan 2013 16:06:46 +0000 Subject: rabbit_limiter:initial_credit/6. --- src/rabbit_amqqueue.erl | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) (limited to 'src/rabbit_amqqueue.erl') diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b9d41c25..3673d06e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,9 +26,9 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). --export([notify_down_all/2, limit_all/3, credit/6]). +-export([notify_down_all/2, limit_all/3, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, @@ -145,13 +145,14 @@ -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). --spec(credit/6 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), - non_neg_integer(), boolean(), boolean()) -> 'ok'). +-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean()) -> 'ok'). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/7 :: +-spec(basic_consume/8 :: (rabbit_types:amqqueue(), boolean(), pid(), - rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any()) + rabbit_limiter:token(), rabbit_types:ctag(), boolean(), + {non_neg_integer(), boolean()} | 'none', any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -534,16 +535,16 @@ notify_down_all(QPids, ChPid) -> limit_all(QPids, ChPid, Limiter) -> delegate:cast(QPids, {limit, ChPid, Limiter}). -credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain, Reply) -> - delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain, Reply}). +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate:call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate:call(QPid, {basic_consume, NoAck, ChPid, - Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> + delegate:call(QPid, {basic_consume, NoAck, ChPid, Limiter, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). -- cgit v1.2.1