diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-17 10:56:44 +0000 |
---|---|---|
committer | mergify-bot <noreply@mergify.io> | 2021-11-18 12:38:42 +0000 |
commit | dacbfc86533e251e4fa0f29197517bf7a4160e2f (patch) | |
tree | 6008d1de7a00f5c406c356971870ad85f865bacb | |
parent | 92fcf4cb2530d3886a0765f17deb7d015effd711 (diff) | |
download | rabbitmq-server-git-dacbfc86533e251e4fa0f29197517bf7a4160e2f.tar.gz |
Stream coordinator: only initialise queue record if stream id matchesmergify/bp/v3.9.x/pr-3752
(cherry picked from commit 8542b54f7e7a881555b3b1e87d95a84022bcd90c)
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 22 |
1 files changed, 19 insertions, 3 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 5ab71b4e1a..64efcf6930 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -823,7 +823,10 @@ phase_update_mnesia(StreamId, Args, #{reference := QName, %% amqqueue record amqqueue:set_type_state( amqqueue:set_pid(Q, LeaderPid), Conf); - _ -> + Ts -> + S = maps:get(name, Ts, undefined), + rabbit_log:debug("~s: refusing mnesia update for stale stream id ~s, current ~s", + [?MODULE, StreamId, S]), %% if the stream id isn't a match this is a stale %% update from a previous stream incarnation for the %% same queue name and we ignore it @@ -835,10 +838,23 @@ phase_update_mnesia(StreamId, Args, #{reference := QName, rabbit_amqqueue:update(QName, Fun) end) of not_found -> + rabbit_log:debug("~s: resource for stream id ~s not found, " + "recovering from rabbit_durable_queue", + [?MODULE, StreamId]), %% This can happen during recovery + %% we need to re-initialise the queue record + %% if the stream id is a match [Q] = mnesia:dirty_read(rabbit_durable_queue, QName), - %% TODO: what is the possible return type here? - _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)), + case amqqueue:get_type_state(Q) of + #{name := S} when S == StreamId -> + rabbit_log:debug("~s: initializing queue record for stream id ~s", + [?MODULE, StreamId]), + _ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)), + ok; + _ -> + ok + end, + send_self_command({mnesia_updated, StreamId, Args}); _ -> send_self_command({mnesia_updated, StreamId, Args}) |