summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-11-16 16:53:54 +0300
committerGitHub <noreply@github.com>2021-11-16 16:53:54 +0300
commitf47de0091030d352e24d089cf82bbad7f7fcde39 (patch)
treee1caa340f33c1933c316f5b33296852210f6521c
parente5a4907a60f7da479a349f6289eab1ebd42c00b4 (diff)
parentbc7b339e7ac87c43f7b42557444c1b5a20cf0263 (diff)
downloadrabbitmq-server-git-f47de0091030d352e24d089cf82bbad7f7fcde39.tar.gz
Merge pull request #3731 from rabbitmq/stream-mnesia-update
Stream coordinator: only update amqqueue record if stream id matches
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl13
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl19
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl16
3 files changed, 38 insertions, 10 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 54a0e7be09..c07b172b43 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -817,7 +817,18 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
rabbit_log:debug("~s: running mnesia update for ~s: ~W",
[?MODULE, StreamId, Conf, 10]),
Fun = fun (Q) ->
- amqqueue:set_type_state(amqqueue:set_pid(Q, LeaderPid), Conf)
+ case amqqueue:get_type_state(Q) of
+ #{name := S} when S == StreamId ->
+ %% the stream id matches so we can update the
+ %% amqqueue record
+ amqqueue:set_type_state(
+ amqqueue:set_pid(Q, LeaderPid), Conf);
+ _ ->
+ %% if the stream id isn't a match this is a stale
+ %% update from a previous stream incarnation for the
+ %% same queue name and we ignore it
+ Q
+ end
end,
try rabbit_misc:execute_mnesia_transaction(
fun() ->
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index ed11b5083f..cc6700b42f 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -757,15 +757,16 @@ make_stream_conf(Node, Q) ->
R =/= undefined
end, [{max_bytes, MaxBytes},
{max_age, MaxAge}]),
- add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes, #{reference => QName,
- name => Name,
- retention => Retention,
- nodes => [Node | Replicas],
- leader_locator_strategy => LeaderLocator,
- leader_node => Node,
- replica_nodes => Replicas,
- event_formatter => Formatter,
- epoch => 1}).
+ add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes,
+ #{reference => QName,
+ name => Name,
+ retention => Retention,
+ nodes => [Node | Replicas],
+ leader_locator_strategy => LeaderLocator,
+ leader_node => Node,
+ replica_nodes => Replicas,
+ event_formatter => Formatter,
+ epoch => 1}).
select_stream_nodes(Size, All) when length(All) =< Size ->
All;
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 879b1a3288..6896d76f58 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -61,6 +61,7 @@ groups() ->
initial_cluster_size_two,
initial_cluster_size_one_policy,
leader_locator_client_local,
+ declare_delete_same_stream,
leader_locator_random,
leader_locator_least_leaders]},
{cluster_size_3_parallel_2, [parallel], all_tests()},
@@ -1645,6 +1646,21 @@ initial_cluster_size_one_policy(Config) ->
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"cluster-size">>),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
+declare_delete_same_stream(Config) ->
+ Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Q = ?config(queue_name, Config),
+
+ [begin
+ Ch = rabbit_ct_client_helpers:open_channel(Config, S),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+ rabbit_ct_client_helpers:close_channel(Ch)
+ end || _ <- lists:seq(1, 20), S <- Servers],
+
+ ok.
+
leader_locator_client_local(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),