summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2021-11-30 15:04:00 +0100
committerdcorbacho <dparracorbacho@piotal.io>2021-11-30 15:09:30 +0100
commit5e9664f9e720bc283aac87c03e79bec54b89440e (patch)
tree2305f951356115ed0f9fd20fce4566e93885d909
parentc9109e0dc1cf6e846a9156f089d9a4857f9be767 (diff)
downloadrabbitmq-server-git-5e9664f9e720bc283aac87c03e79bec54b89440e.tar.gz
Query total number of messages on stream leader on queue.declarestream-declare-size
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl14
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl32
2 files changed, 44 insertions, 2 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index cc6700b42f..65f21a101c 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -158,7 +158,19 @@ policy_changed(Q) ->
ok.
stat(Q) ->
- {ok, i(messages, Q), 0}.
+ QName = amqqueue:get_name(Q),
+ Conf = amqqueue:get_type_state(Q),
+ case maps:get(leader_node, Conf) of
+ Node when Node =/= node() ->
+ case rpc:call(Node, ?MODULE, info, [Q, [messages]]) of
+ {badrpc, _} ->
+ {ok, 0, 0};
+ [{messages, Messages}] ->
+ {ok, Messages, 0}
+ end;
+ _ ->
+ {ok, i(messages, Q), 0}
+ end.
consume(Q, #{prefetch_count := 0}, _)
when ?amqqueue_is_stream(Q) ->
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 6896d76f58..12aadff011 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -11,6 +11,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile(nowarn_export_all).
-compile(export_all).
@@ -48,7 +49,8 @@ groups() ->
leader_failover_dedupe,
add_replicas,
publish_coordinator_unavailable,
- leader_locator_policy]},
+ leader_locator_policy,
+ queue_size_on_declare]},
{cluster_size_3_1, [], [shrink_coordinator_cluster]},
{cluster_size_3_2, [], [recover,
declare_with_node_down]},
@@ -1799,6 +1801,34 @@ leader_locator_policy(Config) ->
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+queue_size_on_declare(Config) ->
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
+
+ %% Metrics update is not synchronous, wait until metrics are updated on the leader node.
+ %% Afterwards, all replicas will get the right size as they have to query the writer node
+ ?awaitMatch({'queue.declare_ok', Q, 100, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
+ 60000),
+ amqp_channel:close(Ch1),
+
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
+ ?assertEqual({'queue.declare_ok', Q, 100, 0},
+ declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ amqp_channel:close(Ch2),
+
+ Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server3),
+ ?assertEqual({'queue.declare_ok', Q, 100, 0},
+ declare(Ch3, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ amqp_channel:close(Ch3),
+
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+
repeat_until(_, 0) ->
ct:fail("Condition did not materialize in the expected amount of attempts");
repeat_until(Fun, N) ->