diff options
author | Loïc Hoguin <lhoguin@vmware.com> | 2022-10-21 10:29:59 +0200 |
---|---|---|
committer | Loïc Hoguin <lhoguin@vmware.com> | 2022-10-21 10:29:59 +0200 |
commit | f854076aa3d5e68dde626c1720a47c33904ad5d9 (patch) | |
tree | 2ca005e57a149e7d69926e5ffe26a8acf955259c | |
parent | b8e5c107d7c1cfae38ed0825b49c145c823f4eba (diff) | |
download | rabbitmq-server-git-f854076aa3d5e68dde626c1720a47c33904ad5d9.tar.gz |
Fix ack crashes when using CMQs with v2
The problem comes from the fact that CQv2 does not store the
MsgId in its index when storing in the v2 store as it is not
necessary... or so I thought. It turns out that CMQ slaves
need it to ack, and CMQs need it for sync. This patch ensures
that a correct MsgId is returned in all cases. The only downside
is that when consuming long queues the slaves will need to read
the messages from disk where they didn't before. This only impacts
"embedded" v2 messages.
-rw-r--r-- | deps/rabbit/src/rabbit_variable_queue.erl | 28 |
1 files changed, 19 insertions, 9 deletions
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index 210bdbbf13..caa4c974dd 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -745,8 +745,8 @@ fetch(AckRequired, State) -> {{value, MsgStatus}, State1} -> %% it is possible that the message wasn't read from disk %% at this point, so read it in. - {Msg, State2} = read_msg(MsgStatus, State1), - {AckTag, State3} = remove(AckRequired, MsgStatus, State2), + {Msg = #basic_message{id = MsgId}, State2} = read_msg(MsgStatus, State1), + {AckTag, State3} = remove(AckRequired, MsgStatus#msg_status{msg_id = MsgId}, State2), {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)} end. @@ -755,8 +755,17 @@ drop(AckRequired, State) -> {empty, State1} -> {empty, a(State1)}; {{value, MsgStatus}, State1} -> - {AckTag, State2} = remove(AckRequired, MsgStatus, State1), - {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} + %% This is necessary for CMQs so that slaves can drop the message. + %% @todo Remove this once CMQs are gone. + {MsgStatus1, State3} = case {AckRequired, MsgStatus} of + {true, #msg_status{msg_id = undefined}} -> + {#basic_message{id = MsgId}, State2} = read_msg(MsgStatus, State1), + {MsgStatus#msg_status{msg_id = MsgId}, State2}; + _ -> + {MsgStatus, State1} + end, + {AckTag, State4} = remove(AckRequired, MsgStatus1, State3), + {{MsgStatus1#msg_status.msg_id, AckTag}, a(State4)} end. %% Duplicated from rabbit_backing_queue @@ -1881,9 +1890,9 @@ in_r(MsgStatus = #msg_status { msg = undefined }, State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) -> case ?QUEUE:is_empty(Q4) of true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; - false -> {Msg, State1 = #vqstate { q4 = Q4a }} = + false -> {Msg = #basic_message{id = MsgId}, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - MsgStatus1 = MsgStatus#msg_status{msg = Msg}, + MsgStatus1 = MsgStatus#msg_status{msg_id = MsgId, msg = Msg}, stats(ready0, {MsgStatus, MsgStatus1}, 0, State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) end; @@ -2134,9 +2143,10 @@ process_queue_entries1( #msg_status { seq_id = SeqId } = MsgStatus, Fun, {NextDeliverSeqId, FetchAcc, State}) -> - {Msg, State1} = read_msg(MsgStatus, State), + {Msg = #basic_message{id = MsgId}, State1} = read_msg(MsgStatus, State), State2 = record_pending_ack( MsgStatus #msg_status { + msg_id = MsgId, is_delivered = true }, State1), {next_deliver_seq_id(SeqId, NextDeliverSeqId), Fun(Msg, SeqId, FetchAcc), @@ -2776,8 +2786,8 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> - {Msg, State1} = read_msg(MsgStatus, State), - MsgStatus1 = MsgStatus#msg_status { msg = Msg }, + {Msg=#basic_message{id=MsgId}, State1} = read_msg(MsgStatus, State), + MsgStatus1 = MsgStatus#msg_status { msg_id = MsgId, msg = Msg }, {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, 0, State1)}; publish_alpha(MsgStatus, State) -> {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, 0, State)}. |