author     Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>  2021-10-27 11:39:50 +0200
committer  mergify-bot <noreply@mergify.io>                     2021-11-03 23:29:04 +0000
commit     cff78c159db15d56b887a9622587eebf3d66c432 (patch)
tree       5aa2ee4ac33a42e2d6324ea20605a474a60c7ffb
parent     d2c5c63dbbea72e4168569021de8fb723fb3c2f1 (diff)
rabbit_msg_store: Accept duplicate writes from the same queue  (mergify/bp/v3.9.x/pr-3615)
The situation is this one: if for some reason (e.g. a bug) a queue has the same message referenced twice (or more) in its index, and this message is eligible for the message store (as opposed to being entirely stored in the queue index), the queue will send it multiple times to the message store.

Until now, the code of the message store had two assertions:
* one verified in the context of the queue process, to make sure there were not two writes or two removes in a row;
* one verified in the context of the message store process, doing exactly the same.

Consider the following order of events:
1. the queue sends the first copy to the message store
2. the message store handles the first copy
3. the queue sends the second copy to the message store
4. the message store handles the second copy

In this scenario, none of the assertions are triggered and the message goes through the message store as if it were coming from different queues (i.e. a fan-out use case).

Now consider this order of events:
1. the queue sends the first copy to the message store
2. the queue sends the second copy to the message store
3. the message store handles the first copy

This time, the code hits both assertions, leading to the crash of the queue, the crash of the message store and, as a consequence, the crash of the entire vhost.

In the case of two consecutive writes, those assertions are useless because the message store already knows how to handle multiple copies of the same message. However, the consequences are catastrophic: a single queue with a duplicate message could take down an entire vhost.

This patch relaxes the assertion in the case of two consecutive writes. Now, both scenarios described above behave consistently: the copies coming from the same queue are handled like copies coming from distinct queues, and the message store and the vhost continue to work as usual.

Note that this patch doesn't address why the queue ended up with multiple copies in the first place! That underlying bug is still lurking somewhere. The user who saw this problem had duplicate messages in a dead-letter queue. Perhaps something related to the lack of publisher confirms between the initial queue and the dead-letter one?

(cherry picked from commit 49c733be3a77d94d6a62f587d006699248c83255)
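For illustration, here is a minimal, self-contained Erlang sketch of the "flying" counter arithmetic described above. It is not the rabbit_msg_store API: the module, function and atom names (flying_sketch, client_write, demo, send, send_duplicate, cancelled) are made up, the return atoms stand in for decisions that the real code spreads across the queue-side and store-side checks, and ets:update_counter/4 with a default object replaces the insert_new/try-catch dance in the actual module. It only shows how a write bumps the per-{MsgId, ClientRef} entry by +1, so a duplicate write from the same queue drives it to 2, the value that used to trip the assertion and is now tolerated.

%% flying_sketch.erl -- illustrative only, not part of rabbit_msg_store.
-module(flying_sketch).
-export([demo/0]).

%% Same sign convention as the patch below: a write adds +1 to the
%% in-flight counter (a remove would add -1).
-define(WRITE, 1).

%% Queue-side bookkeeping: bump the per-{MsgId, ClientRef} counter and
%% decide what to do with this write request.
client_write(FlyingEts, Key) ->
    case ets:update_counter(FlyingEts, Key, {2, ?WRITE}, {Key, 0}) of
        0             -> cancelled;       %% a pending remove balanced this write out
        ?WRITE        -> send;            %% normal, single write
        N when N >= 2 -> send_duplicate;  %% duplicate from the same queue: tolerate it
        N             -> throw({bad_flying_counter, N, Key})
    end.

demo() ->
    FlyingEts = ets:new(flying, [set]),
    Key = {<<"msg-1">>, make_ref()},                 %% {MsgId, ClientRef}
    send           = client_write(FlyingEts, Key),   %% first copy: counter = 1
    send_duplicate = client_write(FlyingEts, Key),   %% second copy: counter = 2, no crash
    ets:lookup(FlyingEts, Key).                      %% [{Key, 2}]

Running flying_sketch:demo() returns [{Key, 2}]: both calls succeed and the counter simply reaches 2 instead of raising, which mirrors what the patch below does in client_update_flying and update_flying by treating the second copy like a copy from another queue (the fan-out path the store already handles).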
-rw-r--r--  deps/rabbit/src/rabbit_msg_store.erl | 15
1 file changed, 15 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl
index aa7fbcffc1..3e4c70e5b2 100644
--- a/deps/rabbit/src/rabbit_msg_store.erl
+++ b/deps/rabbit/src/rabbit_msg_store.erl
@@ -686,6 +686,13 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
         false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) of
                      0    -> ok;
                      Diff -> ok;
+                     Err when Err >= 2 ->
+                         %% The message must be referenced twice in the queue
+                         %% index. There is a bug somewhere, but we don't want
+                         %% to take down anything just because of this. Let's
+                         %% process the message as if the copies were in
+                         %% different queues (fan-out).
+                         ok;
                      Err  -> throw({bad_flying_ets_update, Diff, Err, Key})
                  catch error:badarg ->
                          %% this is guaranteed to succeed since the
@@ -1082,6 +1089,14 @@ update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
                         process;
         [{_, 0}]     -> true = ets:delete_object(FlyingEts, {Key, 0}),
                         ignore;
+        [{_, Err}] when Err >= 2 ->
+            %% The message must be referenced twice in the queue index. There
+            %% is a bug somewhere, but we don't want to take down anything
+            %% just because of this. Let's process the message as if the
+            %% copies were in different queues (fan-out).
+            ets:update_counter(FlyingEts, Key, {2, Diff}),
+            true = ets:delete_object(FlyingEts, {Key, 0}),
+            process;
         [{_, Err}]   -> throw({bad_flying_ets_record, Diff, Err, Key})
     end.
 %% [1] We can get here, for example, in the following scenario: There