summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-11-12 17:27:34 +0000
committerKarl Nilsson <kjnilsson@gmail.com>2021-11-12 17:27:34 +0000
commit7e4a33708b1c3b05c4282136b4eb81113628e5a8 (patch)
tree1de5d191c2bc0c2f8d111944b3d89a1bcb250a69
parent115b951b9cc718fd40aa560e49319f57c762f05d (diff)
downloadrabbitmq-server-git-7e4a33708b1c3b05c4282136b4eb81113628e5a8.tar.gz
Stream coordinator: reset reply_to for delete_stream commandstream-coordinator-delete-stream-fix
So that a reply is sent to the caller immediately after the command has been processed as intended. Previously it was possible if reply_to was already set that a reply never was sent to the caller and the caller times out. This should improve some flakyness in the rabbit_stream_queue suite as well. Strictly this is a change that introduces indeterminism in the coordinator state machine as during an upgrade different members may run different code for this command. But as this state only affects side effects (replies) and the state for the streams affected will shortly be removed this is very unlikely to cause any real issues.
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl11
1 files changed, 4 insertions, 7 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 10eccefd5f..54a0e7be09 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -922,7 +922,9 @@ update_stream0(#{system_time := _Ts} = _Meta,
M#member{target = deleted}
end, Members0),
Stream0#stream{members = Members,
- % reply_to = maps:get(from, Meta, undefined),
+ %% reset reply_to here to ensure a reply
+ %% is returned as the command has been accepted
+ reply_to = undefined,
target = deleted};
update_stream0(#{system_time := _Ts} = _Meta,
{add_replica, _StreamId, #{node := Node}},
@@ -1260,12 +1262,7 @@ evaluate_stream(#{index := Idx} = Meta,
Action = {aux, {delete_member, StreamId, LeaderNode,
make_writer_conf(Writer0, Stream0)}},
Writer = Writer0#member{current = {deleting, Idx}},
- Effs = case From of
- undefined ->
- [Action | Effs0];
- _ ->
- wrap_reply(From, {ok, 0}) ++ [Action | Effs0]
- end,
+ Effs = [Action | Effs0],
Stream = Stream0#stream{reply_to = undefined},
eval_replicas(Meta, Writer, Replicas, Stream, Effs);
{#member{state = {down, Epoch},