summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortomyouyou <yxszyn@163.com>2021-11-27 20:30:04 +0800
committerGitHub <noreply@github.com>2021-11-27 20:30:04 +0800
commit50160a0232450735f97bea22d5310c0a470e61c2 (patch)
tree8ebad81b17a48490c70593b30ab065351e6ba901
parent45f69f88296b92acbec209de8b8c02d4e7893667 (diff)
downloadrabbitmq-server-git-50160a0232450735f97bea22d5310c0a470e61c2.tar.gz
To deactivate limit of all QPids when the limiter has been changed from 'limit' to 'unlimit'.
Otherwise, the QPids will still to ask limiter whether it can be sent before delivering. This will degrade performance, especially when the limiter and QPid are on different nodes. When 'can_send' is deactivated, the test results are as follows: id: test-100147-150, time: 400.016s, sent: 17654 msg/s, returned: 0 msg/s, confirmed: 17658 msg/s, nacked: 0 msg/s, received: 17663 msg/s, min/median/75th/95th/99th consumer latency: 1775/5899/6486/7369/8440 μs, confirm latency: 2171/5581/6127/7026/7911 μs test stopped (Reached time limit) id: test-100147-150, sending rate avg: 17630 msg/s id: test-100147-150, receiving rate avg: 17630 msg/s When limiter and QPid are on the same node and 'can_send' is activated, the test results are as follows: id: test-095229-474, time: 400.015s, sent: 13246 msg/s, returned: 0 msg/s, confirmed: 13247 msg/s, nacked: 0 msg/s, received: 13245 msg/s, min/median/75th/95th/99th consumer latency: 3777/7316/8345/10447/11392 μs, confirm latency: 4074/7308/8257/10336/11341 μs test stopped (Reached time limit) id: test-095229-474, sending rate avg: 13317 msg/s id: test-095229-474, receiving rate avg: 13317 msg/s we have seen, for the message rate, the test showed a 24% drop.
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl9
-rw-r--r--deps/rabbit/src/rabbit_amqqueue_process.erl4
-rw-r--r--deps/rabbit/src/rabbit_channel.erl10
-rw-r--r--deps/rabbit/src/rabbit_queue_consumers.erl9
4 files changed, 32 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index a10b30e0a5..91f4a3f130 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -64,6 +64,8 @@
-export([check_max_age/1]).
-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]).
+-export([deactivate_limit_all/2]).
+
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2,
@@ -1618,6 +1620,13 @@ activate_limit_all(QRefs, ChPid) ->
delegate:invoke_no_result(QPids, {gen_server2, cast,
[{activate_limit, ChPid}]}).
+-spec deactivate_limit_all(qpids(), pid()) -> ok.
+
+deactivate_limit_all(QRefs, ChPid) ->
+ QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
+ delegate:invoke_no_result(QPids, {gen_server2, cast,
+ [{deactivate_limit, ChPid}]}).
+
-spec credit(amqqueue:amqqueue(),
rabbit_types:ctag(),
non_neg_integer(),
diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl
index d8b29d1d32..a9d673d65d 100644
--- a/deps/rabbit/src/rabbit_amqqueue_process.erl
+++ b/deps/rabbit/src/rabbit_amqqueue_process.erl
@@ -1582,6 +1582,10 @@ handle_cast({activate_limit, ChPid}, State) ->
noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(),
ChPid, State));
+handle_cast({deactivate_limit, ChPid}, State) ->
+ noreply(possibly_unblock(rabbit_queue_consumers:deactivate_limit_fun(),
+ ChPid, State));
+
handle_cast({set_ram_duration_target, Duration},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 37e8fd21ae..7fee29f2c3 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1546,6 +1546,11 @@ handle_method(#'basic.qos'{global = false,
limiter = Limiter}) ->
%% Ensures that if default was set, it's overridden
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ case rabbit_limiter:is_active(Limiter) of
+ true -> rabbit_amqqueue:deactivate_limit_all(
+ classic_consumer_queue_pids(State#ch.consumer_mapping), self());
+ false -> ok
+ end,
{reply, #'basic.qos_ok'{}, State#ch{cfg = Cfg#conf{consumer_prefetch = PrefetchCount},
limiter = Limiter1}};
@@ -1553,6 +1558,11 @@ handle_method(#'basic.qos'{global = true,
prefetch_count = 0},
_, State = #ch{limiter = Limiter}) ->
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ case rabbit_limiter:is_active(Limiter) of
+ true -> rabbit_amqqueue:deactivate_limit_all(
+ classic_consumer_queue_pids(State#ch.consumer_mapping), self());
+ false -> ok
+ end,
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
handle_method(#'basic.qos'{global = true,
diff --git a/deps/rabbit/src/rabbit_queue_consumers.erl b/deps/rabbit/src/rabbit_queue_consumers.erl
index 83538bd5f5..39c759aa5e 100644
--- a/deps/rabbit/src/rabbit_queue_consumers.erl
+++ b/deps/rabbit/src/rabbit_queue_consumers.erl
@@ -15,6 +15,8 @@
credit/6, utilisation/1, capacity/1, is_same/3, get_consumer/1, get/3,
consumer_tag/1, get_infos/1]).
+-export([deactivate_limit_fun/0]).
+
%%----------------------------------------------------------------------------
-define(QUEUE, lqueue).
@@ -385,6 +387,13 @@ activate_limit_fun() ->
C#cr{limiter = rabbit_limiter:activate(Limiter)}
end.
+-spec deactivate_limit_fun() -> cr_fun().
+
+deactivate_limit_fun() ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:deactivate(Limiter)}
+ end.
+
-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
state()) -> 'unchanged' | {'unblocked', state()}.