summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-11-12 17:27:34 +0000
committermergify-bot <noreply@mergify.io>2021-11-13 01:28:53 +0000
commit8c80d6cecebae6a336a443385d0375feb87ec514 (patch)
tree75f4c8a112f42c733097f56dea99ec06bbab165e
parent1c690a337ef851b284dd4e1cce7714203593beb0 (diff)
downloadrabbitmq-server-git-mergify/bp/v3.9.x/pr-3716.tar.gz
Stream coordinator: reset reply_to for delete_stream commandmergify/bp/v3.9.x/pr-3716
So that a reply is sent to the caller immediately after the command has been processed as intended. Previously it was possible if reply_to was already set that a reply never was sent to the caller and the caller times out. This should improve some flakyness in the rabbit_stream_queue suite as well. Strictly this is a change that introduces indeterminism in the coordinator state machine as during an upgrade different members may run different code for this command. But as this state only affects side effects (replies) and the state for the streams affected will shortly be removed this is very unlikely to cause any real issues. (cherry picked from commit 7e4a33708b1c3b05c4282136b4eb81113628e5a8)
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl11
1 files changed, 4 insertions, 7 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 4a689d2df8..f8f34f4a77 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -922,7 +922,9 @@ update_stream0(#{system_time := _Ts} = _Meta,
M#member{target = deleted}
end, Members0),
Stream0#stream{members = Members,
- % reply_to = maps:get(from, Meta, undefined),
+ %% reset reply_to here to ensure a reply
+ %% is returned as the command has been accepted
+ reply_to = undefined,
target = deleted};
update_stream0(#{system_time := _Ts} = _Meta,
{add_replica, _StreamId, #{node := Node}},
@@ -1260,12 +1262,7 @@ evaluate_stream(#{index := Idx} = Meta,
Action = {aux, {delete_member, StreamId, LeaderNode,
make_writer_conf(Writer0, Stream0)}},
Writer = Writer0#member{current = {deleting, Idx}},
- Effs = case From of
- undefined ->
- [Action | Effs0];
- _ ->
- wrap_reply(From, {ok, 0}) ++ [Action | Effs0]
- end,
+ Effs = [Action | Effs0],
Stream = Stream0#stream{reply_to = undefined},
eval_replicas(Meta, Writer, Replicas, Stream, Effs);
{#member{state = {down, Epoch},