author      Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com>   2021-10-27 11:39:50 +0200
committer   mergify-bot <noreply@mergify.io>                      2021-11-03 23:29:04 +0000
commit      cff78c159db15d56b887a9622587eebf3d66c432 (patch)
tree        5aa2ee4ac33a42e2d6324ea20605a474a60c7ffb
parent      d2c5c63dbbea72e4168569021de8fb723fb3c2f1 (diff)
download    rabbitmq-server-git-cff78c159db15d56b887a9622587eebf3d66c432.tar.gz
rabbit_msg_store: Accept duplicate writes from the same queue (branch: mergify/bp/v3.9.x/pr-3615)
The situation is this: if for some reason (e.g. a bug) a queue references
the same message twice (or more) in its index, and that message is
eligible for the message store (as opposed to being stored entirely in
the queue index), the queue will send it to the message store multiple
times.
Until now, the message store code had two assertions:
* one checked in the context of the queue process, to make sure there
were not two writes or two removes in a row;
* one checked in the context of the message store process, doing
exactly the same.
Consider the following order of events:
1. the queue sends the first copy to the message store
2. the message store handles the first copy
3. the queue sends the second copy to the message store
4. the message store handles the second copy
In this scenario, neither assertion is triggered and the message goes
through the message store as if the copies were coming from different
queues (i.e. a fan-out use case).
Now consider this order of events:
1. the queue sends the first copy to the message store
2. the queue sends the second copy to the message store
3. the message store handles the first copy
This time, the code hits both assertions, leading to the crash of the
queue, the crash of the message store and, as a consequence, the crash
of the entire vhost.
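The second interleaving can be sketched with a toy per-key counter standing in for the flying_ets table (a simplified Python model for illustration; the names here are made up, not RabbitMQ's actual API):

```python
# Toy model of the flying_ets bookkeeping (illustrative only; not the
# real RabbitMQ API). Each queue bumps a per-{msg, queue} counter by +1
# for every write it sends; the pre-patch assertion only accepted the
# counter landing on 0 or 1.
flying = {}

def client_write(key):
    """Queue side: record one in-flight write, with the old strict check."""
    flying[key] = flying.get(key, 0) + 1
    if flying[key] not in (0, 1):
        # Pre-patch behaviour: this crashed the queue (and, in turn,
        # the message store and the whole vhost).
        raise RuntimeError("bad_flying_ets_update: %d" % flying[key])

key = ("msg-1", "queue-A")
client_write(key)        # first copy: counter is 1, accepted
try:
    client_write(key)    # second copy sent before the store caught up
except RuntimeError as exc:
    print(exc)           # counter hit 2: the old assertion fires
```

The point of the model: nothing about a counter value of 2 is unrecoverable, yet the strict check turns it into a crash.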
In the case of two consecutive writes, those assertions are useless
because the message store already knows how to handle multiple copies of
the same message. However, the consequences are catastrophic: a single
queue with a duplicate message could take down an entire vhost.
This patch relaxes the assertions in the case of two consecutive writes.
Now, both scenarios described above behave consistently: copies from the
same queue are handled like any other copies, and the message store and
the vhost continue to work as usual.
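The relaxed store-side check can be sketched as follows (a Python rendering of the update_flying clauses for illustration only; this is not the Erlang code, and the dict stands in for the ETS table):

```python
def update_flying(flying, key, diff):
    # Sketch of the patched store-side update_flying logic (illustrative
    # Python, not RabbitMQ's Erlang). 'diff' is the store's delta: -1
    # when it handles a write request.
    if key not in flying:
        return "ignore"                # request was cancelled out earlier
    count = flying[key]
    if count == diff:
        return "ignore"
    if count == -diff:                 # exactly one write in flight
        flying.pop(key)                # counter would reach 0: clean up
        return "process"
    if count == 0:
        flying.pop(key)
        return "ignore"
    if count >= 2:                     # NEW: duplicate writes from the
        flying[key] = count + diff     # same queue; absorb one copy and
        if flying[key] == 0:           # process it as if it were fan-out
            flying.pop(key)
        return "process"
    raise RuntimeError("bad_flying_ets_record: %d" % count)

# Two copies of the same message from one queue: both get processed,
# and nothing crashes.
flying = {("msg-1", "queue-A"): 2}
print(update_flying(flying, ("msg-1", "queue-A"), -1))  # process
print(update_flying(flying, ("msg-1", "queue-A"), -1))  # process
```

With the new `count >= 2` branch, a duplicate write degrades gracefully into the already-supported fan-out path instead of hitting the catch-all error clause.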
Note that this patch doesn't cover the reason why there were multiple
copies in the queue in the first place! So the initial cause is still
lurking somewhere. The user who reported this problem had duplicate
messages in a dead-letter queue; perhaps something related to the lack
of publisher confirms between the initial queue and the dead-letter one?
(cherry picked from commit 49c733be3a77d94d6a62f587d006699248c83255)
-rw-r--r--   deps/rabbit/src/rabbit_msg_store.erl | 15
1 file changed, 15 insertions(+), 0 deletions(-)
diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl
index aa7fbcffc1..3e4c70e5b2 100644
--- a/deps/rabbit/src/rabbit_msg_store.erl
+++ b/deps/rabbit/src/rabbit_msg_store.erl
@@ -686,6 +686,13 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
         false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) of
                      0    -> ok;
                      Diff -> ok;
+                     Err when Err >= 2 ->
+                         %% The message must be referenced twice in the queue
+                         %% index. There is a bug somewhere, but we don't want
+                         %% to take down anything just because of this. Let's
+                         %% process the message as if the copies were in
+                         %% different queues (fan-out).
+                         ok;
                      Err  -> throw({bad_flying_ets_update, Diff, Err, Key})
                  catch error:badarg ->
                          %% this is guaranteed to succeed since the
@@ -1082,6 +1089,14 @@ update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
                      process;
         [{_, 0}]  -> true = ets:delete_object(FlyingEts, {Key, 0}),
                      ignore;
+        [{_, Err}] when Err >= 2 ->
+            %% The message must be referenced twice in the queue index. There
+            %% is a bug somewhere, but we don't want to take down anything
+            %% just because of this. Let's process the message as if the
+            %% copies were in different queues (fan-out).
+            ets:update_counter(FlyingEts, Key, {2, Diff}),
+            true = ets:delete_object(FlyingEts, {Key, 0}),
+            process;
         [{_, Err}] -> throw({bad_flying_ets_record, Diff, Err, Key})
     end.
 %% [1] We can get here, for example, in the following scenario: There