summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2021-08-10 01:33:32 +0300
committermergify-bot <noreply@mergify.io>2021-08-09 23:31:33 +0000
commite63d89a09d82730021aacf551e387fa4240f6245 (patch)
tree0bcc34c3b1c2c151547b0d6e71fc60817e049785
parentec344ef72458973decdf5c8979c65abd7bbea364 (diff)
downloadrabbitmq-server-git-e63d89a09d82730021aacf551e387fa4240f6245.tar.gz
Revert "CQ: don't deliver right before acking in the index"
This reverts commit 3ef858746ceba1e27167264351d8450f5fc780b4. This change does not handle expired messages safely. See https://github.com/rabbitmq/rabbitmq-server/discussions/3272. (cherry picked from commit 36ad3a5b3db6aa2cb4ecd0500f653001c9adda9e)
-rw-r--r--deps/rabbit/src/rabbit_queue_index.erl4
-rw-r--r--deps/rabbit/src/rabbit_variable_queue.erl11
2 files changed, 8 insertions, 7 deletions
diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl
index 98185dbd82..12d39fce25 100644
--- a/deps/rabbit/src/rabbit_queue_index.erl
+++ b/deps/rabbit/src/rabbit_queue_index.erl
@@ -841,9 +841,9 @@ action_to_entry(RelSeq, Action, JEntries) ->
end};
({Pub, no_del, no_ack}) when Action == del ->
{set, {Pub, del, no_ack}};
- ({no_pub, _del, no_ack}) when Action == ack ->
+ ({no_pub, del, no_ack}) when Action == ack ->
{set, {no_pub, del, ack}};
- ({?PUB, _del, no_ack}) when Action == ack ->
+ ({?PUB, del, no_ack}) when Action == ack ->
{reset, none}
end.
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
index f3cbe56288..8f27652d16 100644
--- a/deps/rabbit/src/rabbit_variable_queue.erl
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -1774,21 +1774,22 @@ purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { mode = Mode }) ->
remove_queue_entries(Q, DelsAndAcksFun,
State = #vqstate{msg_store_clients = MSCState}) ->
- {MsgIdsByStore, Acks, State1} =
+ {MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {maps:new(), [], State}, Q),
+ {maps:new(), [], [], State}, Q),
remove_msgs_by_id(MsgIdsByStore, MSCState),
- DelsAndAcksFun([], Acks, State1).
+ DelsAndAcksFun(Delivers, Acks, State1).
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId,
+ #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
msg_in_store = MsgInStore, index_on_disk = IndexOnDisk,
is_persistent = IsPersistent} = MsgStatus,
- {MsgIdsByStore, Acks, State}) ->
+ {MsgIdsByStore, Delivers, Acks, State}) ->
{case MsgInStore of
true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
+ cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks),
stats({-1, 0}, {MsgStatus, none}, 0, State)}.