summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-11-30 19:53:25 +0300
committerGitHub <noreply@github.com>2021-11-30 19:53:25 +0300
commit3be3569fd705b9f032698f27eb114c0ad6912e42 (patch)
tree1c4ce48b9a195794b98d4b4c50427430f792bad3
parent6a3a9ecc95bcbb5d02ba6e972f7bf47a3a0a7711 (diff)
parentbe525c2ebe2652c9b2a779ecdb0c97a049a47790 (diff)
downloadrabbitmq-server-git-3be3569fd705b9f032698f27eb114c0ad6912e42.tar.gz
Merge pull request #3814 from rabbitmq/stream-declare-size
Query total number of messages on stream leader on queue.declare
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl13
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl32
2 files changed, 43 insertions, 2 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index cc6700b42f..dee8740a29 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -158,7 +158,18 @@ policy_changed(Q) ->
ok.
stat(Q) ->
- {ok, i(messages, Q), 0}.
+ Conf = amqqueue:get_type_state(Q),
+ case maps:get(leader_node, Conf) of
+ Node when Node =/= node() ->
+ case rpc:call(Node, ?MODULE, info, [Q, [messages]]) of
+ {badrpc, _} ->
+ {ok, 0, 0};
+ [{messages, Messages}] ->
+ {ok, Messages, 0}
+ end;
+ _ ->
+ {ok, i(messages, Q), 0}
+ end.
consume(Q, #{prefetch_count := 0}, _)
when ?amqqueue_is_stream(Q) ->
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 6896d76f58..12aadff011 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -11,6 +11,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile(nowarn_export_all).
-compile(export_all).
@@ -48,7 +49,8 @@ groups() ->
leader_failover_dedupe,
add_replicas,
publish_coordinator_unavailable,
- leader_locator_policy]},
+ leader_locator_policy,
+ queue_size_on_declare]},
{cluster_size_3_1, [], [shrink_coordinator_cluster]},
{cluster_size_3_2, [], [recover,
declare_with_node_down]},
@@ -1799,6 +1801,34 @@ leader_locator_policy(Config) ->
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+queue_size_on_declare(Config) ->
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
+
+ %% Metrics update is not synchronous, wait until metrics are updated on the leader node.
+ %% Afterwards, all replicas will get the right size as they have to query the writer node
+ ?awaitMatch({'queue.declare_ok', Q, 100, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
+ 60000),
+ amqp_channel:close(Ch1),
+
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
+ ?assertEqual({'queue.declare_ok', Q, 100, 0},
+ declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ amqp_channel:close(Ch2),
+
+ Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server3),
+ ?assertEqual({'queue.declare_ok', Q, 100, 0},
+ declare(Ch3, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ amqp_channel:close(Ch3),
+
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+
repeat_until(_, 0) ->
ct:fail("Condition did not materialize in the expected amount of attempts");
repeat_until(Fun, N) ->