summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-11-04 02:28:29 +0300
committerGitHub <noreply@github.com>2021-11-04 02:28:29 +0300
commite544b518005dc8c13c1f550cf0f0a084fa43f969 (patch)
treec07c04569bd185e199fbafb85f90e8931233d1df
parent65fe1daeab130a1272e61645f59434df39859c2b (diff)
parent49c733be3a77d94d6a62f587d006699248c83255 (diff)
downloadrabbitmq-server-git-e544b518005dc8c13c1f550cf0f0a084fa43f969.tar.gz
Merge pull request #3615 from rabbitmq/relax-msg-store-assertion-around-dup-writes-from-same-queue
rabbit_msg_store: Accept duplicate writes from the same queue
-rw-r--r--deps/rabbit/src/rabbit_msg_store.erl15
1 files changed, 15 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl
index aa7fbcffc1..3e4c70e5b2 100644
--- a/deps/rabbit/src/rabbit_msg_store.erl
+++ b/deps/rabbit/src/rabbit_msg_store.erl
@@ -686,6 +686,13 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) of
0 -> ok;
Diff -> ok;
+ Err when Err >= 2 ->
+ %% The message must be referenced twice in the queue
+ %% index. There is a bug somewhere, but we don't want
+ %% to take down anything just because of this. Let's
+ %% process the message as if the copies were in
+ %% different queues (fan-out).
+ ok;
Err -> throw({bad_flying_ets_update, Diff, Err, Key})
catch error:badarg ->
%% this is guaranteed to succeed since the
@@ -1082,6 +1089,14 @@ update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
process;
[{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}),
ignore;
+ [{_, Err}] when Err >= 2 ->
+ %% The message must be referenced twice in the queue index. There
+ %% is a bug somewhere, but we don't want to take down anything
+ %% just because of this. Let's process the message as if the
+ %% copies were in different queues (fan-out).
+ ets:update_counter(FlyingEts, Key, {2, Diff}),
+ true = ets:delete_object(FlyingEts, {Key, 0}),
+ process;
[{_, Err}] -> throw({bad_flying_ets_record, Diff, Err, Key})
end.
%% [1] We can get here, for example, in the following scenario: There