summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_quorum_queue.erl10
-rw-r--r--test/quorum_queue_SUITE.erl21
2 files changed, 27 insertions, 4 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 5515bfd4cb..867af6cb75 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -252,14 +252,15 @@ handle_tick(QName,
0 -> 0;
_ -> rabbit_fifo:usage(Name)
end,
- Infos = [{consumers, C}, {consumer_utilisation, Util},
+ Infos = [{consumers, C},
+ {consumer_utilisation, Util},
{message_bytes_ready, MsgBytesReady},
{message_bytes_unacknowledged, MsgBytesUnack},
{message_bytes, MsgBytesReady + MsgBytesUnack},
{message_bytes_persistent, MsgBytesReady + MsgBytesUnack},
{messages_persistent, M}
- | infos(QName)],
+ | infos(QName, ?STATISTICS_KEYS -- [consumers])],
rabbit_core_metrics:queue_stats(QName, Infos),
rabbit_event:notify(queue_stats,
Infos ++ [{name, QName},
@@ -582,9 +583,12 @@ info(Q) ->
-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
infos(QName) ->
+ infos(QName, ?STATISTICS_KEYS).
+
+infos(QName, Keys) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
- info(Q, ?STATISTICS_KEYS);
+ info(Q, Keys);
{error, not_found} ->
[]
end.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index aa617ce4bc..0894c1fb17 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -127,7 +127,8 @@ all_tests() ->
queue_length_in_memory_bytes_limit_subscribe,
queue_length_in_memory_bytes_limit,
queue_length_in_memory_purge,
- in_memory
+ in_memory,
+ consumer_metrics
].
memory_tests() ->
@@ -2244,6 +2245,24 @@ in_memory(Config) ->
?assertEqual([{0, 0}],
dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+consumer_metrics(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ subscribe(Ch1, QQ, false),
+
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ timer:sleep(5000),
+ QNameRes = rabbit_misc:r(<<"/">>, queue, QQ),
+ [{_, PropList, _}] = rpc:call(Leader, ets, lookup, [queue_metrics, QNameRes]),
+ ?assertMatch([{consumers, 1}], lists:filter(fun({Key, _}) ->
+ Key == consumers
+ end, PropList)).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->