summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-11-15 15:35:56 +0000
committerKarl Nilsson <kjnilsson@gmail.com>2021-11-16 12:32:33 +0000
commitbc7b339e7ac87c43f7b42557444c1b5a20cf0263 (patch)
treeedcc8c44ab6165119ef1b920ff7436a2cef8ec45
parent7e4a33708b1c3b05c4282136b4eb81113628e5a8 (diff)
downloadrabbitmq-server-git-bc7b339e7ac87c43f7b42557444c1b5a20cf0263.tar.gz
Stream coordinator: only update amqqueue record if stream id matchesstream-mnesia-update
From the coordinator's POV each stream has a unique id consisting of the vhost, queuename and a high resolution timestamp even if several stream ids relate to the same queue record. When performing the mnesia update the coordinator now checks that the current stream id matches that of the update_mnesia action and does not change the queue record if the stream id is not the same. This should avoid "old" incarnations of a stream queue updating newer ones with incorrect information.
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl13
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl19
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl16
3 files changed, 38 insertions, 10 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 54a0e7be09..c07b172b43 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -817,7 +817,18 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
rabbit_log:debug("~s: running mnesia update for ~s: ~W",
[?MODULE, StreamId, Conf, 10]),
Fun = fun (Q) ->
- amqqueue:set_type_state(amqqueue:set_pid(Q, LeaderPid), Conf)
+ case amqqueue:get_type_state(Q) of
+ #{name := S} when S == StreamId ->
+ %% the stream id matches so we can update the
+ %% amqqueue record
+ amqqueue:set_type_state(
+ amqqueue:set_pid(Q, LeaderPid), Conf);
+ _ ->
+ %% if the stream id isn't a match this is a stale
+ %% update from a previous stream incarnation for the
+ %% same queue name and we ignore it
+ Q
+ end
end,
try rabbit_misc:execute_mnesia_transaction(
fun() ->
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index ed11b5083f..cc6700b42f 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -757,15 +757,16 @@ make_stream_conf(Node, Q) ->
R =/= undefined
end, [{max_bytes, MaxBytes},
{max_age, MaxAge}]),
- add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes, #{reference => QName,
- name => Name,
- retention => Retention,
- nodes => [Node | Replicas],
- leader_locator_strategy => LeaderLocator,
- leader_node => Node,
- replica_nodes => Replicas,
- event_formatter => Formatter,
- epoch => 1}).
+ add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes,
+ #{reference => QName,
+ name => Name,
+ retention => Retention,
+ nodes => [Node | Replicas],
+ leader_locator_strategy => LeaderLocator,
+ leader_node => Node,
+ replica_nodes => Replicas,
+ event_formatter => Formatter,
+ epoch => 1}).
select_stream_nodes(Size, All) when length(All) =< Size ->
All;
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 879b1a3288..6896d76f58 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -61,6 +61,7 @@ groups() ->
initial_cluster_size_two,
initial_cluster_size_one_policy,
leader_locator_client_local,
+ declare_delete_same_stream,
leader_locator_random,
leader_locator_least_leaders]},
{cluster_size_3_parallel_2, [parallel], all_tests()},
@@ -1645,6 +1646,21 @@ initial_cluster_size_one_policy(Config) ->
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"cluster-size">>),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+declare_delete_same_stream(Config) ->
+ Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Q = ?config(queue_name, Config),
+
+ [begin
+ Ch = rabbit_ct_client_helpers:open_channel(Config, S),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+ rabbit_ct_client_helpers:close_channel(Ch)
+ end || _ <- lists:seq(1, 20), S <- Servers],
+
+ ok.
+
leader_locator_client_local(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),