summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Michael Klishin <klishinm@vmware.com> 2022-10-28 23:23:32 +0400
committer GitHub <noreply@github.com> 2022-10-28 23:23:32 +0400
commit f802ba29efa2dcf72f1dfb7e64dc5525b3bb3191 (patch)
tree dc1aaffe637f82827491602335fda816f3432381
parent 329db668337b64e6a80a136980b95964b245c091 (diff)
parent 1a5a29a19c4661d9267b67b3c2619de04dfaa657 (diff)
download rabbitmq-server-git-f802ba29efa2dcf72f1dfb7e64dc5525b3bb3191.tar.gz
Merge pull request #6276 from rabbitmq/mergify/bp/v3.11.x/pr-6183
QQ: don't try to contact non-connected nodes for stats (backport #6183)
-rw-r--r-- deps/rabbit/src/rabbit_quorum_queue.erl 95
-rw-r--r-- deps/rabbit/test/quorum_queue_SUITE.erl 2
2 files changed, 65 insertions, 32 deletions
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 0831289963..bd4c2e877d 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -203,8 +203,8 @@ start_cluster(Q) ->
?SNAPSHOT_INTERVAL),
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout, SnapshotInterval)
|| ServerId <- members(NewQ)],
- Timeout = erpc_timeout(Leader, ?START_CLUSTER_RPC_TIMEOUT),
- try erpc:call(Leader, ra, start_cluster, [?RA_SYSTEM, RaConfs], Timeout) of
+ try erpc_call(Leader, ra, start_cluster, [?RA_SYSTEM, RaConfs],
+ ?START_CLUSTER_RPC_TIMEOUT) of
{ok, _, _} ->
%% ensure the latest config is evaluated properly
%% even when running the machine version from 0
@@ -285,17 +285,23 @@ single_active_consumer_on(Q) ->
_ -> false
end.
-update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
- local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
- [QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args]).
+%% Hands a consumer update to rabbit_quorum_queue:update_consumer/9 through
+%% local_or_remote_handler/4, keyed on the channel pid. The call is wrapped
+%% in `catch', so an exception raised while dispatching is returned as a
+%% term instead of propagating to the caller.
+update_consumer_handler(QName, {ConsumerTag, ChPid}, Exclusive, AckRequired,
+                        Prefetch, Active, ActivityStatus, Args) ->
+    HandlerArgs = [QName, ChPid, ConsumerTag, Exclusive, AckRequired,
+                   Prefetch, Active, ActivityStatus, Args],
+    catch local_or_remote_handler(ChPid, rabbit_quorum_queue, update_consumer,
+                                  HandlerArgs).
-update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Active, ActivityStatus, Args) ->
- catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive, AckRequired,
- QName, Prefetch, Active, ActivityStatus, Args).
+%% Reports an updated consumer to rabbit_core_metrics:consumer_updated/9.
+%% Note the argument order differs from this function's own: QName is passed
+%% after AckRequired. The call is best-effort: it is wrapped in `catch', so a
+%% failure inside the metrics call is returned as a term rather than raised.
+update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch,
+ Active, ActivityStatus, Args) ->
+ catch rabbit_core_metrics:consumer_updated(ChPid, ConsumerTag, Exclusive,
+ AckRequired,
+ QName, Prefetch, Active,
+ ActivityStatus, Args).
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
- local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
- [QName, ChPid, ConsumerTag]).
+ catch local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
+ [QName, ChPid, ConsumerTag]).
cancel_consumer(QName, ChPid, ConsumerTag) ->
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
@@ -309,7 +315,7 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
false ->
%% this could potentially block for a while if the node is
%% in disconnected state or tcp buffers are full
- rpc:cast(Node, Module, Function, Args)
+ erpc:cast(Node, Module, Function, Args)
end.
become_leader(QName, Name) ->
@@ -329,8 +335,8 @@ become_leader(QName, Name) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q0} when ?is_amqqueue(Q0) ->
Nodes = get_nodes(Q0),
- [rpc:call(Node, ?MODULE, rpc_delete_metrics,
- [QName], ?RPC_TIMEOUT)
+ [_ = erpc_call(Node, ?MODULE, rpc_delete_metrics,
+ [QName], ?RPC_TIMEOUT)
|| Node <- Nodes, Node =/= node()];
_ ->
ok
@@ -676,8 +682,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(QName, ActingUser),
- rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
- ?RPC_TIMEOUT),
+ _ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
+ ?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
@@ -971,9 +977,8 @@ cleanup_data_dir() ->
end
|| Q <- rabbit_amqqueue:list_by_type(?MODULE),
lists:member(node(), get_nodes(Q))],
- NoQQClusters = rabbit_ra_registry:list_not_quorum_clusters(),
Registered = ra_directory:list_registered(?RA_SYSTEM),
- Running = Names ++ NoQQClusters,
+ Running = Names,
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
not lists:member(Name, Running)],
ok.
@@ -1436,9 +1441,11 @@ i(memory, Q) when ?is_amqqueue(Q) ->
i(state, Q) when ?is_amqqueue(Q) ->
{Name, Node} = amqqueue:get_pid(Q),
%% Check against the leader or last known leader
- case rpc:call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
- {badrpc, _} -> down;
- State -> State
+ case erpc_call(Node, ?MODULE, cluster_state, [Name], ?RPC_TIMEOUT) of
+ {error, _} ->
+ down;
+ State ->
+ State
end;
i(local_state, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
@@ -1457,7 +1464,7 @@ i(online, Q) -> online(Q);
i(leader, Q) -> leader(Q);
i(open_files, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
- Nodes = get_nodes(Q),
+ Nodes = get_connected_nodes(Q),
{Data, _} = rpc:multicall(Nodes, ?MODULE, open_files, [Name]),
lists:flatten(Data);
i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) ->
@@ -1559,7 +1566,7 @@ peek(_Pos, Q) when ?is_amqqueue(Q) andalso ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported}.
online(Q) when ?is_amqqueue(Q) ->
- Nodes = get_nodes(Q),
+ Nodes = get_connected_nodes(Q),
{Name, _} = amqqueue:get_pid(Q),
[Node || Node <- Nodes, is_process_alive(Name, Node)].
@@ -1568,7 +1575,10 @@ format(Q) when ?is_amqqueue(Q) ->
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
is_process_alive(Name, Node) ->
- erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)).
+ %% don't attempt rpc if node is not already connected
+ %% as this function is used for metrics and stats and the additional
+ %% latency isn't warranted
+ erlang:is_pid(erpc_call(Node, erlang, whereis, [Name], ?RPC_TIMEOUT)).
-spec quorum_messages(rabbit_amqqueue:name()) -> non_neg_integer().
@@ -1626,6 +1636,10 @@ get_nodes(Q) when ?is_amqqueue(Q) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes.
+%% Returns the queue members reported by get_nodes/1 that are either the
+%% local node or currently listed by nodes(), preserving get_nodes/1 order.
+get_connected_nodes(Q) when ?is_amqqueue(Q) ->
+    Reachable = [node() | nodes()],
+    lists:filter(fun(Member) -> lists:member(Member, Reachable) end,
+                 get_nodes(Q)).
+
%% Applies Fun to the queue's current type state and stores the outcome with
%% amqqueue:set_type_state/2, returning whatever that call returns.
update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
    amqqueue:set_type_state(Q, Fun(amqqueue:get_type_state(Q))).
@@ -1691,14 +1705,6 @@ prepare_content(Content) ->
%% rabbit_fifo can directly parse it without having to decode again.
Content.
-erpc_timeout(Node, _)
- when Node =:= node() ->
- %% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
- %% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
- infinity;
-erpc_timeout(_, Timeout) ->
- Timeout.
-
ets_lookup_element(Tbl, Key, Pos, Default) ->
try ets:lookup_element(Tbl, Key, Pos) of
V -> V
@@ -1706,3 +1712,30 @@ ets_lookup_element(Tbl, Key, Pos, Default) ->
_:badarg ->
Default
end.
+
+%% Wrapper around erpc:call/5 that reports failures as values instead of
+%% raising.
+%%
+%% Returns whatever M:F(A) returns on success. Any `error'-class exception
+%% raised by erpc:call/5 (e.g. {erpc, timeout}, {erpc, noconnection} or
+%% {exception, Reason, Stack}) is returned as {error, Err}; throws and exits
+%% are not intercepted. A remote Node that is not in nodes() yields
+%% {error, noconnection} without any connection attempt being made.
+erpc_call(Node, M, F, A, _Timeout)
+  when Node =:= node() ->
+    %% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
+    %% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
+    %% The caller's timeout is therefore deliberately not applied to local
+    %% calls. This clause must also accept a non-integer timeout: nodes()
+    %% never contains node(), so the clause below would otherwise report the
+    %% local node as {error, noconnection}.
+    try erpc:call(Node, M, F, A, infinity) of
+        Result ->
+            Result
+    catch
+        error:Err ->
+            {error, Err}
+    end;
+erpc_call(Node, M, F, A, Timeout) ->
+    %% Only call nodes that are already connected, so that callers never
+    %% wait on a connection attempt to an unreachable node.
+    case lists:member(Node, nodes()) of
+        true ->
+            try erpc:call(Node, M, F, A, Timeout) of
+                Result ->
+                    Result
+            catch
+                error:Err ->
+                    {error, Err}
+            end;
+        false ->
+            {error, noconnection}
+    end.
+
+
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 035398bd56..6dd29c5312 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -1705,7 +1705,7 @@ add_member(Config) ->
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
- ?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
+ ?assertEqual(ok, rpc:call(Server1, rabbit_quorum_queue, add_member,
[<<"/">>, QQ, Server1, 5000])),
Info = rpc:call(Server0, rabbit_quorum_queue, infos,
[rabbit_misc:r(<<"/">>, queue, QQ)]),