diff options
author | Michael Klishin <klishinm@vmware.com> | 2022-10-28 23:23:32 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-28 23:23:32 +0400 |
commit | f802ba29efa2dcf72f1dfb7e64dc5525b3bb3191 (patch) | |
tree | dc1aaffe637f82827491602335fda816f3432381 | |
parent | 329db668337b64e6a80a136980b95964b245c091 (diff) | |
parent | 1a5a29a19c4661d9267b67b3c2619de04dfaa657 (diff) | |
download | rabbitmq-server-git-f802ba29efa2dcf72f1dfb7e64dc5525b3bb3191.tar.gz |
Merge pull request #6276 from rabbitmq/mergify/bp/v3.11.x/pr-6183
QQ: don't try to contact non-connected nodes for stats (backport #6183)
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 95 | ||||
-rw-r--r-- | deps/rabbit/test/quorum_queue_SUITE.erl | 2 |
2 files changed, 65 insertions, 32 deletions
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 0831289963..bd4c2e877d 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -203,8 +203,8 @@ start_cluster(Q) -> ?SNAPSHOT_INTERVAL), RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout, SnapshotInterval) || ServerId <- members(NewQ)], - Timeout = erpc_timeout(Leader, ?START_CLUSTER_RPC_TIMEOUT), - try erpc:call(Leader, ra, start_cluster, [?RA_SYSTEM, RaConfs], Timeout) of + try erpc_call(Leader, ra, start_cluster, [?RA_SYSTEM, RaConfs], + ?START_CLUSTER_RPC_TIMEOUT) of {ok, _, _} -> %% ensure the latest config is evaluated properly %% even when running the machine version from 0 @@ -285,17 +285,23 @@ single_active_consumer_on(Q) -> _ -> false end. -update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) -> - local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer, - [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args]). +update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, + Prefetch, Active, ActivityStatus, Args) -> + catch local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer, + [QName, ChPid, ConsumerTag, Exclusive, + AckRequired, Prefetch, Active, + ActivityStatus, Args]). -update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) -> - catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired, - QName, Prefetch, Active, ActivityStatus, Args). +update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, + Active, ActivityStatus, Args) -> + catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, + AckRequired, + QName, Prefetch, Active, + ActivityStatus, Args). cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> - local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, - [QName, ChPid, ConsumerTag]). + catch local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, + [QName, ChPid, ConsumerTag]). cancel_consumer(QName, ChPid, ConsumerTag) -> catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), @@ -309,7 +315,7 @@ local_or_remote_handler(ChPid, Module, Function, Args) -> false -> %% this could potentially block for a while if the node is %% in disconnected state or tcp buffers are full - rpc:cast(Node, Module, Function, Args) + erpc:cast(Node, Module, Function, Args) end. become_leader(QName, Name) -> @@ -329,8 +335,8 @@ become_leader(QName, Name) -> case rabbit_amqqueue:lookup(QName) of {ok, Q0} when ?is_amqqueue(Q0) -> Nodes = get_nodes(Q0), - [rpc:call(Node, ?MODULE, rpc_delete_metrics, - [QName], ?RPC_TIMEOUT) + [_ = erpc_call(Node, ?MODULE, rpc_delete_metrics, + [QName], ?RPC_TIMEOUT) || Node <- Nodes, Node =/= node()]; _ -> ok @@ -676,8 +682,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> end, notify_decorators(QName, shutdown), ok = delete_queue_data(QName, ActingUser), - rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], - ?RPC_TIMEOUT), + _ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], + ?RPC_TIMEOUT), {ok, ReadyMsgs}; {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; @@ -971,9 +977,8 @@ cleanup_data_dir() -> end || Q <- rabbit_amqqueue:list_by_type(?MODULE), lists:member(node(), get_nodes(Q))], - NoQQClusters = rabbit_ra_registry:list_not_quorum_clusters(), Registered = ra_directory:list_registered(?RA_SYSTEM), - Running = Names ++ NoQQClusters, + Running = Names, _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, not lists:member(Name, Running)], ok. @@ -1436,9 +1441,11 @@ i(memory, Q) when ?is_amqqueue(Q) -> i(state, Q) when ?is_amqqueue(Q) -> {Name, Node} = amqqueue:get_pid(Q), %% Check against the leader or last known leader - case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of - {badrpc, _} -> down; - State -> State + case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of + {error, _} -> + down; + State -> + State end; i(local_state, Q) when ?is_amqqueue(Q) -> {Name, _} = amqqueue:get_pid(Q), @@ -1457,7 +1464,7 @@ i(online, Q) -> online(Q); i(leader, Q) -> leader(Q); i(open_files, Q) when ?is_amqqueue(Q) -> {Name, _} = amqqueue:get_pid(Q), - Nodes = get_nodes(Q), + Nodes = get_connected_nodes(Q), {Data, _} = rpc:multicall(Nodes, ?MODULE, open_files, [Name]), lists:flatten(Data); i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) -> @@ -1559,7 +1566,7 @@ peek(_Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}. online(Q) when ?is_amqqueue(Q) -> - Nodes = get_nodes(Q), + Nodes = get_connected_nodes(Q), {Name, _} = amqqueue:get_pid(Q), [Node || Node <- Nodes, is_process_alive(Name, Node)]. @@ -1568,7 +1575,10 @@ format(Q) when ?is_amqqueue(Q) -> [{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}]. is_process_alive(Name, Node) -> - erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)). + %% don't attempt rpc if node is not already connected + %% as this function is used for metrics and stats and the additional + %% latency isn't warranted + erlang:is_pid(erpc_call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)). -spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer(). @@ -1626,6 +1636,10 @@ get_nodes(Q) when ?is_amqqueue(Q) -> #{nodes := Nodes} = amqqueue:get_type_state(Q), Nodes. +get_connected_nodes(Q) when ?is_amqqueue(Q) -> + ErlangNodes = [node() | nodes()], + [N || N <- get_nodes(Q), lists:member(N, ErlangNodes)]. + update_type_state(Q, Fun) when ?is_amqqueue(Q) -> Ts = amqqueue:get_type_state(Q), amqqueue:set_type_state(Q, Fun(Ts)). @@ -1691,14 +1705,6 @@ prepare_content(Content) -> %% rabbit_fifo can directly parse it without having to decode again. Content. -erpc_timeout(Node, _) - when Node =:= node() -> - %% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned: - %% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121 - infinity; -erpc_timeout(_, Timeout) -> - Timeout. - ets_lookup_element(Tbl, Key, Pos, Default) -> try ets:lookup_element(Tbl, Key, Pos) of V -> V @@ -1706,3 +1712,30 @@ ets_lookup_element(Tbl, Key, Pos, Default) -> _:badarg -> Default end. + +erpc_call(Node, M, F, A, Timeout) + when is_integer(Timeout) andalso Node == node() -> + %% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned: + %% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121 + try erpc:call(Node, M, F, A, Timeout) of + Result -> + Result + catch + error:Err -> + {error, Err} + end; +erpc_call(Node, M, F, A, Timeout) -> + case lists:member(Node, nodes()) of + true -> + try erpc:call(Node, M, F, A, Timeout) of + Result -> + Result + catch + error:Err -> + {error, Err} + end; + false -> + {error, noconnection} + end. + + diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 035398bd56..6dd29c5312 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1705,7 +1705,7 @@ add_member(Config) -> ok = rabbit_control_helper:command(stop_app, Server1), ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), rabbit_control_helper:command(start_app, Server1), - ?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member, + ?assertEqual(ok, rpc:call(Server1, rabbit_quorum_queue, add_member, [<<"/">>, QQ, Server1, 5000])), Info = rpc:call(Server0, rabbit_quorum_queue, infos, [rabbit_misc:r(<<"/">>, queue, QQ)]), |