diff options
author | tomyouyou <yxszyn@163.com> | 2021-11-27 20:30:04 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-27 20:30:04 +0800 |
commit | 50160a0232450735f97bea22d5310c0a470e61c2 (patch) | |
tree | 8ebad81b17a48490c70593b30ab065351e6ba901 | |
parent | 45f69f88296b92acbec209de8b8c02d4e7893667 (diff) | |
download | rabbitmq-server-git-50160a0232450735f97bea22d5310c0a470e61c2.tar.gz |
To deactivate limit of all QPids when the limiter has been changed from 'limit' to 'unlimit'.
Otherwise, the QPids will still to ask limiter whether it can be sent before delivering.
This will degrade performance, especially when the limiter and QPid are on different nodes.
When 'can_send' is deactivated, the test results are as follows:
id: test-100147-150, time: 400.016s, sent: 17654 msg/s, returned: 0 msg/s, confirmed: 17658 msg/s, nacked: 0 msg/s, received: 17663 msg/s, min/median/75th/95th/99th consumer latency: 1775/5899/6486/7369/8440 μs, confirm latency: 2171/5581/6127/7026/7911 μs
test stopped (Reached time limit)
id: test-100147-150, sending rate avg: 17630 msg/s
id: test-100147-150, receiving rate avg: 17630 msg/s
When limiter and QPid are on the same node and 'can_send' is activated, the test results are as follows:
id: test-095229-474, time: 400.015s, sent: 13246 msg/s, returned: 0 msg/s, confirmed: 13247 msg/s, nacked: 0 msg/s, received: 13245 msg/s, min/median/75th/95th/99th consumer latency: 3777/7316/8345/10447/11392 μs, confirm latency: 4074/7308/8257/10336/11341 μs
test stopped (Reached time limit)
id: test-095229-474, sending rate avg: 13317 msg/s
id: test-095229-474, receiving rate avg: 13317 msg/s
we have seen, for the message rate, the test showed a 24% drop.
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 10 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_consumers.erl | 9 |
4 files changed, 32 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index a10b30e0a5..91f4a3f130 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -64,6 +64,8 @@ -export([check_max_age/1]). -export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]). +-export([deactivate_limit_all/2]). + %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2, @@ -1618,6 +1620,13 @@ activate_limit_all(QRefs, ChPid) -> delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}). +-spec deactivate_limit_all(qpids(), pid()) -> ok. + +deactivate_limit_all(QRefs, ChPid) -> + QPids = [P || P <- QRefs, ?IS_CLASSIC(P)], + delegate:invoke_no_result(QPids, {gen_server2, cast, + [{deactivate_limit, ChPid}]}). + -spec credit(amqqueue:amqqueue(), rabbit_types:ctag(), non_neg_integer(), diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index d8b29d1d32..a9d673d65d 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -1582,6 +1582,10 @@ handle_cast({activate_limit, ChPid}, State) -> noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(), ChPid, State)); +handle_cast({deactivate_limit, ChPid}, State) -> + noreply(possibly_unblock(rabbit_queue_consumers:deactivate_limit_fun(), + ChPid, State)); + handle_cast({set_ram_duration_target, Duration}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 37e8fd21ae..7fee29f2c3 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1546,6 +1546,11 @@ handle_method(#'basic.qos'{global = false, limiter = Limiter}) -> %% Ensures that if default was set, it's overridden Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + case rabbit_limiter:is_active(Limiter) of + true -> rabbit_amqqueue:deactivate_limit_all( + classic_consumer_queue_pids(State#ch.consumer_mapping), self()); + false -> ok + end, {reply, #'basic.qos_ok'{}, State#ch{cfg = Cfg#conf{consumer_prefetch = PrefetchCount}, limiter = Limiter1}}; @@ -1553,6 +1558,11 @@ handle_method(#'basic.qos'{global = true, prefetch_count = 0}, _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + case rabbit_limiter:is_active(Limiter) of + true -> rabbit_amqqueue:deactivate_limit_all( + classic_consumer_queue_pids(State#ch.consumer_mapping), self()); + false -> ok + end, {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; handle_method(#'basic.qos'{global = true, diff --git a/deps/rabbit/src/rabbit_queue_consumers.erl b/deps/rabbit/src/rabbit_queue_consumers.erl index 83538bd5f5..39c759aa5e 100644 --- a/deps/rabbit/src/rabbit_queue_consumers.erl +++ b/deps/rabbit/src/rabbit_queue_consumers.erl @@ -15,6 +15,8 @@ credit/6, utilisation/1, capacity/1, is_same/3, get_consumer/1, get/3, consumer_tag/1, get_infos/1]). +-export([deactivate_limit_fun/0]). + %%---------------------------------------------------------------------------- -define(QUEUE, lqueue). @@ -385,6 +387,13 @@ activate_limit_fun() -> C#cr{limiter = rabbit_limiter:activate(Limiter)} end. +-spec deactivate_limit_fun() -> cr_fun(). + +deactivate_limit_fun() -> + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:deactivate(Limiter)} + end. + -spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), state()) -> 'unchanged' | {'unblocked', state()}. |