summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoïc Hoguin <lhoguin@vmware.com>2022-10-21 10:29:59 +0200
committerLoïc Hoguin <lhoguin@vmware.com>2022-10-21 10:29:59 +0200
commitf854076aa3d5e68dde626c1720a47c33904ad5d9 (patch)
tree2ca005e57a149e7d69926e5ffe26a8acf955259c
parentb8e5c107d7c1cfae38ed0825b49c145c823f4eba (diff)
downloadrabbitmq-server-git-f854076aa3d5e68dde626c1720a47c33904ad5d9.tar.gz
Fix ack crashes when using CMQs with v2
The problem comes from the fact that CQv2 does not store the MsgId in its index when storing in the v2 store as it is not necessary... or so I thought. It turns out that CMQ slaves need it to ack, and CMQs need it for sync. This patch ensures that a correct MsgId is returned in all cases. The only downside is that when consuming long queues the slaves will need to read the messages from disk where they didn't before. This only impacts "embedded" v2 messages.
-rw-r--r--deps/rabbit/src/rabbit_variable_queue.erl28
1 files changed, 19 insertions, 9 deletions
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
index 210bdbbf13..caa4c974dd 100644
--- a/deps/rabbit/src/rabbit_variable_queue.erl
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -745,8 +745,8 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {Msg, State2} = read_msg(MsgStatus, State1),
- {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
+ {Msg = #basic_message{id = MsgId}, State2} = read_msg(MsgStatus, State1),
+ {AckTag, State3} = remove(AckRequired, MsgStatus#msg_status{msg_id = MsgId}, State2),
{{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
end.
@@ -755,8 +755,17 @@ drop(AckRequired, State) ->
{empty, State1} ->
{empty, a(State1)};
{{value, MsgStatus}, State1} ->
- {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
- {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
+ %% This is necessary for CMQs so that slaves can drop the message.
+ %% @todo Remove this once CMQs are gone.
+ {MsgStatus1, State3} = case {AckRequired, MsgStatus} of
+ {true, #msg_status{msg_id = undefined}} ->
+ {#basic_message{id = MsgId}, State2} = read_msg(MsgStatus, State1),
+ {MsgStatus#msg_status{msg_id = MsgId}, State2};
+ _ ->
+ {MsgStatus, State1}
+ end,
+ {AckTag, State4} = remove(AckRequired, MsgStatus1, State3),
+ {{MsgStatus1#msg_status.msg_id, AckTag}, a(State4)}
end.
%% Duplicated from rabbit_backing_queue
@@ -1881,9 +1890,9 @@ in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
- false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
+ false -> {Msg = #basic_message{id = MsgId}, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- MsgStatus1 = MsgStatus#msg_status{msg = Msg},
+ MsgStatus1 = MsgStatus#msg_status{msg_id = MsgId, msg = Msg},
stats(ready0, {MsgStatus, MsgStatus1}, 0,
State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
end;
@@ -2134,9 +2143,10 @@ process_queue_entries1(
#msg_status { seq_id = SeqId } = MsgStatus,
Fun,
{NextDeliverSeqId, FetchAcc, State}) ->
- {Msg, State1} = read_msg(MsgStatus, State),
+ {Msg = #basic_message{id = MsgId}, State1} = read_msg(MsgStatus, State),
State2 = record_pending_ack(
MsgStatus #msg_status {
+ msg_id = MsgId,
is_delivered = true }, State1),
{next_deliver_seq_id(SeqId, NextDeliverSeqId),
Fun(Msg, SeqId, FetchAcc),
@@ -2776,8 +2786,8 @@ msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- {Msg, State1} = read_msg(MsgStatus, State),
- MsgStatus1 = MsgStatus#msg_status { msg = Msg },
+ {Msg=#basic_message{id=MsgId}, State1} = read_msg(MsgStatus, State),
+ MsgStatus1 = MsgStatus#msg_status { msg_id = MsgId, msg = Msg },
{MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, 0, State1)};
publish_alpha(MsgStatus, State) ->
{MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, 0, State)}.