From 34b04bf9ef8ec934a640892ce30732bb5604e324 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Wed, 23 Nov 2022 13:56:42 +0100 Subject: Get counters from stream replicas first (#6442) Previously we queried the nodes in the alphabetical order of the node names. That means that the writer could be queried first, which could lead to confusing output where replicas are ahead of the writer (they have a higher offset). This change makes sure that we query the writer last. (cherry picked from commit d3b01942fa08fffb620d318ff9a3cb6c40922003) --- deps/rabbit/src/rabbit_stream_queue.erl | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index c8c5f57566..bcbd592789 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -654,14 +654,24 @@ status(Vhost, QueueName) -> get_key(Key, Cnt) -> {Key, maps:get(Key, Cnt, undefined)}. +-spec is_writer({pid() | undefined, writer | replica}) -> boolean(). +is_writer({_, writer}) -> true; +is_writer(_Member) -> false. + get_counters(Q) -> #{name := StreamId} = amqqueue:get_type_state(Q), {ok, Members} = rabbit_stream_coordinator:members(StreamId), + %% split members to query the writer last + %% this minimizes the risk of confusing output where replicas are ahead of the writer + Writer = maps:keys(maps:filter(fun (_, M) -> is_writer(M) end, Members)), + Replicas = maps:keys(maps:filter(fun (_, M) -> not is_writer(M) end, Members)), QName = amqqueue:get_name(Q), - Counters = [begin + Counters0 = [begin safe_get_overview(Node, QName) - end || Node <- maps:keys(Members)], - lists:filter(fun (X) -> X =/= undefined end, Counters). + end || Node <- lists:append(Replicas, Writer)], + Counters1 = lists:filter(fun (X) -> X =/= undefined end, Counters0), + %% sort again in the original order (by node) + lists:sort(fun ({_, M1}, {_, M2}) -> maps:get(node, M1) < maps:get(node, M2) end, Counters1). safe_get_overview(Node, QName) -> case rpc:call(Node, ?MODULE, get_overview, [QName]) of -- cgit v1.2.1