diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_stream_queue.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 16 |
1 files changed, 13 insertions, 3 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index c8c5f57566..bcbd592789 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -654,14 +654,24 @@ status(Vhost, QueueName) -> get_key(Key, Cnt) -> {Key, maps:get(Key, Cnt, undefined)}. +-spec is_writer({pid() | undefined, writer | replica}) -> boolean(). +is_writer({_, writer}) -> true; +is_writer(_Member) -> false. + get_counters(Q) -> #{name := StreamId} = amqqueue:get_type_state(Q), {ok, Members} = rabbit_stream_coordinator:members(StreamId), + %% split members to query the writer last + %% this minimizes the risk of confusing output where replicas are ahead of the writer + Writer = maps:keys(maps:filter(fun (_, M) -> is_writer(M) end, Members)), + Replicas = maps:keys(maps:filter(fun (_, M) -> not is_writer(M) end, Members)), QName = amqqueue:get_name(Q), - Counters = [begin + Counters0 = [begin safe_get_overview(Node, QName) - end || Node <- maps:keys(Members)], - lists:filter(fun (X) -> X =/= undefined end, Counters). + end || Node <- lists:append(Replicas, Writer)], + Counters1 = lists:filter(fun (X) -> X =/= undefined end, Counters0), + %% sort again in the original order (by node) + lists:sort(fun ({_, M1}, {_, M2}) -> maps:get(node, M1) < maps:get(node, M2) end, Counters1). safe_get_overview(Node, QName) -> case rpc:call(Node, ?MODULE, get_overview, [QName]) of |