diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-15 15:35:56 +0000 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-16 12:32:33 +0000 |
commit | bc7b339e7ac87c43f7b42557444c1b5a20cf0263 (patch) | |
tree | edcc8c44ab6165119ef1b920ff7436a2cef8ec45 | |
parent | 7e4a33708b1c3b05c4282136b4eb81113628e5a8 (diff) | |
download | rabbitmq-server-git-bc7b339e7ac87c43f7b42557444c1b5a20cf0263.tar.gz |
Stream coordinator: only update amqqueue record if stream id matchesstream-mnesia-update
From the coordinator's POV each stream has a unique id consisting of the
vhost, queuename and a high resolution timestamp even if several stream ids
relate to the same queue record.
When performing the mnesia update the coordinator now checks that the current stream id
matches that of the update_mnesia action and does not change the queue record if
the stream id is not the same.
This should avoid "old" incarnations of a stream queue updating newer ones
with incorrect information.
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 13 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 19 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 16 |
3 files changed, 38 insertions, 10 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 54a0e7be09..c07b172b43 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -817,7 +817,18 @@ phase_update_mnesia(StreamId, Args, #{reference := QName, rabbit_log:debug("~s: running mnesia update for ~s: ~W", [?MODULE, StreamId, Conf, 10]), Fun = fun (Q) -> - amqqueue:set_type_state(amqqueue:set_pid(Q, LeaderPid), Conf) + case amqqueue:get_type_state(Q) of + #{name := S} when S == StreamId -> + %% the stream id matches so we can update the + %% amqqueue record + amqqueue:set_type_state( + amqqueue:set_pid(Q, LeaderPid), Conf); + _ -> + %% if the stream id isn't a match this is a stale + %% update from a previous stream incarnation for the + %% same queue name and we ignore it + Q + end end, try rabbit_misc:execute_mnesia_transaction( fun() -> diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index ed11b5083f..cc6700b42f 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -757,15 +757,16 @@ make_stream_conf(Node, Q) -> R =/= undefined end, [{max_bytes, MaxBytes}, {max_age, MaxAge}]), - add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes, #{reference => QName, - name => Name, - retention => Retention, - nodes => [Node | Replicas], - leader_locator_strategy => LeaderLocator, - leader_node => Node, - replica_nodes => Replicas, - event_formatter => Formatter, - epoch => 1}). + add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes, + #{reference => QName, + name => Name, + retention => Retention, + nodes => [Node | Replicas], + leader_locator_strategy => LeaderLocator, + leader_node => Node, + replica_nodes => Replicas, + event_formatter => Formatter, + epoch => 1}). select_stream_nodes(Size, All) when length(All) =< Size -> All; diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 879b1a3288..6896d76f58 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -61,6 +61,7 @@ groups() -> initial_cluster_size_two, initial_cluster_size_one_policy, leader_locator_client_local, + declare_delete_same_stream, leader_locator_random, leader_locator_least_leaders]}, {cluster_size_3_parallel_2, [parallel], all_tests()}, @@ -1645,6 +1646,21 @@ initial_cluster_size_one_policy(Config) -> ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"cluster-size">>), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +declare_delete_same_stream(Config) -> + Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Q = ?config(queue_name, Config), + + [begin + Ch = rabbit_ct_client_helpers:open_channel(Config, S), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + rabbit_ct_client_helpers:close_channel(Ch) + end || _ <- lists:seq(1, 20), S <- Servers], + + ok. + leader_locator_client_local(Config) -> [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |