diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2022-01-07 12:06:56 +0000 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2022-01-07 12:11:11 +0000 |
commit | 9a5d0f9d85bda218124c0b95edc37fc717cb8cc5 (patch) | |
tree | 79f025c9d235377cba3ca06b0bf690411a54926b | |
parent | e5ccf267ffda0ac9e79abae89a073a456e72312e (diff) | |
download | rabbitmq-server-git-9a5d0f9d85bda218124c0b95edc37fc717cb8cc5.tar.gz |
Make stream coodinator machine versioned
In order to retain deterministic results of state machine applications
during upgrades we need to make the stream coordinator versioned such
that we only use the new logic once the stream coordinator switches to
machine version 1.
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 50 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.hrl | 2 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl | 1 |
3 files changed, 38 insertions, 15 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index fd587e20cb..39f5300641 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -16,7 +16,9 @@ state_enter/2, init_aux/1, handle_aux/6, - tick/2]). + tick/2, + version/0, + which_module/1]). -export([recover/0, add_replica/2, @@ -311,6 +313,11 @@ all_coord_members() -> Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], [{?MODULE, Node} || Node <- [node() | Nodes]]. +version() -> 1. + +which_module(_) -> + ?MODULE. + init(_Conf) -> #?MODULE{}. @@ -320,7 +327,7 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd, #?MODULE{streams = Streams0, monitors = Monitors0} = State0) -> Stream0 = maps:get(StreamId, Streams0, undefined), - Meta = maps:without([term, machine_version], Meta0), + Meta = maps:without([term], Meta0), case filter_command(Meta, Cmd, Stream0) of ok -> Stream1 = update_stream(Meta, Cmd, Stream0), @@ -1040,7 +1047,8 @@ update_stream0(#{system_time := _Ts}, %% epochs? Stream0 end; -update_stream0(#{system_time := _Ts}, +update_stream0(#{system_time := _Ts, + machine_version := MachineVersion}, {member_stopped, _StreamId, #{node := Node, index := Idx, @@ -1089,15 +1097,15 @@ update_stream0(#{system_time := _Ts}, Members1 = Members0#{Node => Member}, - Offsets = [{N, T} - || #member{state = {stopped, E, T}, - target = running, - node = N} <- maps:values(Members1), - E == Epoch], - case is_quorum(length(Nodes), length(Offsets)) of + EpochOffsets = [{N, T} + || #member{state = {stopped, E, T}, + target = running, + node = N} <- maps:values(Members1), + E == Epoch], + case is_quorum(length(Nodes), length(EpochOffsets)) of true -> %% select leader - NewWriterNode = select_leader(Offsets), + NewWriterNode = select_leader(MachineVersion, EpochOffsets), NextEpoch = Epoch + 1, Members = maps:map( fun (N, #member{state = {stopped, E, _}} = M) @@ -1527,16 +1535,30 @@ find_leader(Members) -> {undefined, Replicas} end. -select_leader(Offsets) -> - [{Node, _} | _] = lists:sort(fun({_, {E, Ao}}, {_, {E, Bo}}) -> +select_leader(0, EpochOffsets) -> + %% this is the version 0 faulty version of this code, + %% retained for versioning + [{Node, _} | _] = lists:sort(fun({_, {Ao, E}}, {_, {Bo, E}}) -> Ao >= Bo; - ({_, {Ae, _}}, {_, {Be, _}}) -> + ({_, {_, Ae}}, {_, {_, Be}}) -> Ae >= Be; ({_, empty}, _) -> false; (_, {_, empty}) -> true - end, Offsets), + end, EpochOffsets), + Node; +select_leader(_Version, EpochOffsets) -> + [{Node, _} | _] = lists:sort( + fun({_, {Epoch, OffsetA}}, {_, {Epoch, OffsetB}}) -> + OffsetA >= OffsetB; + ({_, {EpochA, _}}, {_, {EpochB, _}}) -> + EpochA >= EpochB; + ({_, empty}, _) -> + false; + (_, {_, empty}) -> + true + end, EpochOffsets), Node. maybe_sleep({{nodedown, _}, _}) -> diff --git a/deps/rabbit/src/rabbit_stream_coordinator.hrl b/deps/rabbit/src/rabbit_stream_coordinator.hrl index cbf6b69b1e..8e4317a3d9 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.hrl +++ b/deps/rabbit/src/rabbit_stream_coordinator.hrl @@ -11,7 +11,7 @@ atom() => term()}. -type monitor_role() :: member | listener. -type queue_ref() :: rabbit_types:r(queue). --type tail() :: {osiris:offset(), osiris:epoch()} | empty. +-type tail() :: {osiris:epoch(), osiris:offset()} | empty. -record(member, {state = {down, 0} :: {down, osiris:epoch()} diff --git a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl index 316d6c262b..e2900b8cee 100644 --- a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl @@ -1131,6 +1131,7 @@ delete_replica_leader(_) -> meta(N) when is_integer(N) -> #{index => N, + machine_version => 1, system_time => N + 1000}. started_stream(StreamId, LeaderPid, ReplicaPids) -> |