summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-11-18 15:37:56 +0300
committerGitHub <noreply@github.com>2021-11-18 15:37:56 +0300
commit665d100e39538db1c5b90d4486a8ebf275353d35 (patch)
tree6b7d74789fbe2b10a5d4b53298e7322332a8085a
parentb9c7aa87090a70a0f250f0c3544240b7dee8df0f (diff)
parent8542b54f7e7a881555b3b1e87d95a84022bcd90c (diff)
downloadrabbitmq-server-git-665d100e39538db1c5b90d4486a8ebf275353d35.tar.gz
Merge pull request #3752 from rabbitmq/stream-flakes
Stream flake fixes
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl22
1 files changed, 19 insertions, 3 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 7e8069d866..5c23fbb51d 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -825,7 +825,10 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
%% amqqueue record
amqqueue:set_type_state(
amqqueue:set_pid(Q, LeaderPid), Conf);
- _ ->
+ Ts ->
+ S = maps:get(name, Ts, undefined),
+ rabbit_log:debug("~s: refusing mnesia update for stale stream id ~s, current ~s",
+ [?MODULE, StreamId, S]),
%% if the stream id isn't a match this is a stale
%% update from a previous stream incarnation for the
%% same queue name and we ignore it
@@ -837,10 +840,23 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
rabbit_amqqueue:update(QName, Fun)
end) of
not_found ->
+ rabbit_log:debug("~s: resource for stream id ~s not found, "
+ "recovering from rabbit_durable_queue",
+ [?MODULE, StreamId]),
%% This can happen during recovery
+ %% we need to re-initialise the queue record
+ %% if the stream id is a match
[Q] = mnesia:dirty_read(rabbit_durable_queue, QName),
- %% TODO: what is the possible return type here?
- _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
+ case amqqueue:get_type_state(Q) of
+ #{name := S} when S == StreamId ->
+ rabbit_log:debug("~s: initializing queue record for stream id ~s",
+ [?MODULE, StreamId]),
+ _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
+ ok;
+ _ ->
+ ok
+ end,
+
send_self_command({mnesia_updated, StreamId, Args});
_ ->
send_self_command({mnesia_updated, StreamId, Args})