summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_stream_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_stream_queue.erl')
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl16
1 files changed, 13 insertions, 3 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index c8c5f57566..bcbd592789 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -654,14 +654,24 @@ status(Vhost, QueueName) ->
get_key(Key, Cnt) ->
{Key, maps:get(Key, Cnt, undefined)}.
+-spec is_writer({pid() | undefined, writer | replica}) -> boolean().
+is_writer({_, writer}) -> true;
+is_writer(_Member) -> false.
+
get_counters(Q) ->
#{name := StreamId} = amqqueue:get_type_state(Q),
{ok, Members} = rabbit_stream_coordinator:members(StreamId),
+ %% split members to query the writer last
+ %% this minimizes the risk of confusing output where replicas are ahead of the writer
+ Writer = maps:keys(maps:filter(fun (_, M) -> is_writer(M) end, Members)),
+ Replicas = maps:keys(maps:filter(fun (_, M) -> not is_writer(M) end, Members)),
QName = amqqueue:get_name(Q),
- Counters = [begin
+ Counters0 = [begin
safe_get_overview(Node, QName)
- end || Node <- maps:keys(Members)],
- lists:filter(fun (X) -> X =/= undefined end, Counters).
+ end || Node <- lists:append(Replicas, Writer)],
+ Counters1 = lists:filter(fun (X) -> X =/= undefined end, Counters0),
+ %% sort again in the original order (by node)
+ lists:sort(fun ({_, M1}, {_, M2}) -> maps:get(node, M1) < maps:get(node, M2) end, Counters1).
safe_get_overview(Node, QName) ->
case rpc:call(Node, ?MODULE, get_overview, [QName]) of