diff options
author | Michael Klishin <klishinm@vmware.com> | 2021-11-16 16:53:54 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-16 16:53:54 +0300 |
commit | f47de0091030d352e24d089cf82bbad7f7fcde39 (patch) | |
tree | e1caa340f33c1933c316f5b33296852210f6521c | |
parent | e5a4907a60f7da479a349f6289eab1ebd42c00b4 (diff) | |
parent | bc7b339e7ac87c43f7b42557444c1b5a20cf0263 (diff) | |
download | rabbitmq-server-git-f47de0091030d352e24d089cf82bbad7f7fcde39.tar.gz |
Merge pull request #3731 from rabbitmq/stream-mnesia-update
Stream coordinator: only update amqqueue record if stream id matches
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 13 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 19 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 16 |
3 files changed, 38 insertions, 10 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 54a0e7be09..c07b172b43 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -817,7 +817,18 @@ phase_update_mnesia(StreamId, Args, #{reference := QName, rabbit_log:debug("~s: running mnesia update for ~s: ~W", [?MODULE, StreamId, Conf, 10]), Fun = fun (Q) -> - amqqueue:set_type_state(amqqueue:set_pid(Q, LeaderPid), Conf) + case amqqueue:get_type_state(Q) of + #{name := S} when S == StreamId -> + %% the stream id matches so we can update the + %% amqqueue record + amqqueue:set_type_state( + amqqueue:set_pid(Q, LeaderPid), Conf); + _ -> + %% if the stream id isn't a match this is a stale + %% update from a previous stream incarnation for the + %% same queue name and we ignore it + Q + end end, try rabbit_misc:execute_mnesia_transaction( fun() -> diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index ed11b5083f..cc6700b42f 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -757,15 +757,16 @@ make_stream_conf(Node, Q) -> R =/= undefined end, [{max_bytes, MaxBytes}, {max_age, MaxAge}]), - add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes, #{reference => QName, - name => Name, - retention => Retention, - nodes => [Node | Replicas], - leader_locator_strategy => LeaderLocator, - leader_node => Node, - replica_nodes => Replicas, - event_formatter => Formatter, - epoch => 1}). + add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes, + #{reference => QName, + name => Name, + retention => Retention, + nodes => [Node | Replicas], + leader_locator_strategy => LeaderLocator, + leader_node => Node, + replica_nodes => Replicas, + event_formatter => Formatter, + epoch => 1}). select_stream_nodes(Size, All) when length(All) =< Size -> All; diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 879b1a3288..6896d76f58 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -61,6 +61,7 @@ groups() -> initial_cluster_size_two, initial_cluster_size_one_policy, leader_locator_client_local, + declare_delete_same_stream, leader_locator_random, leader_locator_least_leaders]}, {cluster_size_3_parallel_2, [parallel], all_tests()}, @@ -1645,6 +1646,21 @@ initial_cluster_size_one_policy(Config) -> ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"cluster-size">>), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +declare_delete_same_stream(Config) -> + Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Q = ?config(queue_name, Config), + + [begin + Ch = rabbit_ct_client_helpers:open_channel(Config, S), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + rabbit_ct_client_helpers:close_channel(Ch) + end || _ <- lists:seq(1, 20), S <- Servers], + + ok. + leader_locator_client_local(Config) -> [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |