summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2022-01-07 12:06:56 +0000
committerKarl Nilsson <kjnilsson@gmail.com>2022-01-07 12:11:11 +0000
commit9a5d0f9d85bda218124c0b95edc37fc717cb8cc5 (patch)
tree79f025c9d235377cba3ca06b0bf690411a54926b
parente5ccf267ffda0ac9e79abae89a073a456e72312e (diff)
downloadrabbitmq-server-git-9a5d0f9d85bda218124c0b95edc37fc717cb8cc5.tar.gz
Make stream coodinator machine versioned
In order to retain deterministic results of state machine applications during upgrades we need to make the stream coordinator versioned such that we only use the new logic once the stream coordinator switches to machine version 1.
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl50
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.hrl2
-rw-r--r--deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl1
3 files changed, 38 insertions, 15 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index fd587e20cb..39f5300641 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -16,7 +16,9 @@
state_enter/2,
init_aux/1,
handle_aux/6,
- tick/2]).
+ tick/2,
+ version/0,
+ which_module/1]).
-export([recover/0,
add_replica/2,
@@ -311,6 +313,11 @@ all_coord_members() ->
Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
[{?MODULE, Node} || Node <- [node() | Nodes]].
+version() -> 1.
+
+which_module(_) ->
+ ?MODULE.
+
init(_Conf) ->
#?MODULE{}.
@@ -320,7 +327,7 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
#?MODULE{streams = Streams0,
monitors = Monitors0} = State0) ->
Stream0 = maps:get(StreamId, Streams0, undefined),
- Meta = maps:without([term, machine_version], Meta0),
+ Meta = maps:without([term], Meta0),
case filter_command(Meta, Cmd, Stream0) of
ok ->
Stream1 = update_stream(Meta, Cmd, Stream0),
@@ -1040,7 +1047,8 @@ update_stream0(#{system_time := _Ts},
%% epochs?
Stream0
end;
-update_stream0(#{system_time := _Ts},
+update_stream0(#{system_time := _Ts,
+ machine_version := MachineVersion},
{member_stopped, _StreamId,
#{node := Node,
index := Idx,
@@ -1089,15 +1097,15 @@ update_stream0(#{system_time := _Ts},
Members1 = Members0#{Node => Member},
- Offsets = [{N, T}
- || #member{state = {stopped, E, T},
- target = running,
- node = N} <- maps:values(Members1),
- E == Epoch],
- case is_quorum(length(Nodes), length(Offsets)) of
+ EpochOffsets = [{N, T}
+ || #member{state = {stopped, E, T},
+ target = running,
+ node = N} <- maps:values(Members1),
+ E == Epoch],
+ case is_quorum(length(Nodes), length(EpochOffsets)) of
true ->
%% select leader
- NewWriterNode = select_leader(Offsets),
+ NewWriterNode = select_leader(MachineVersion, EpochOffsets),
NextEpoch = Epoch + 1,
Members = maps:map(
fun (N, #member{state = {stopped, E, _}} = M)
@@ -1527,16 +1535,30 @@ find_leader(Members) ->
{undefined, Replicas}
end.
-select_leader(Offsets) ->
- [{Node, _} | _] = lists:sort(fun({_, {E, Ao}}, {_, {E, Bo}}) ->
+select_leader(0, EpochOffsets) ->
+ %% this is the version 0 faulty version of this code,
+ %% retained for versioning
+ [{Node, _} | _] = lists:sort(fun({_, {Ao, E}}, {_, {Bo, E}}) ->
Ao >= Bo;
- ({_, {Ae, _}}, {_, {Be, _}}) ->
+ ({_, {_, Ae}}, {_, {_, Be}}) ->
Ae >= Be;
({_, empty}, _) ->
false;
(_, {_, empty}) ->
true
- end, Offsets),
+ end, EpochOffsets),
+ Node;
+select_leader(_Version, EpochOffsets) ->
+ [{Node, _} | _] = lists:sort(
+ fun({_, {Epoch, OffsetA}}, {_, {Epoch, OffsetB}}) ->
+ OffsetA >= OffsetB;
+ ({_, {EpochA, _}}, {_, {EpochB, _}}) ->
+ EpochA >= EpochB;
+ ({_, empty}, _) ->
+ false;
+ (_, {_, empty}) ->
+ true
+ end, EpochOffsets),
Node.
maybe_sleep({{nodedown, _}, _}) ->
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.hrl b/deps/rabbit/src/rabbit_stream_coordinator.hrl
index cbf6b69b1e..8e4317a3d9 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.hrl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.hrl
@@ -11,7 +11,7 @@
atom() => term()}.
-type monitor_role() :: member | listener.
-type queue_ref() :: rabbit_types:r(queue).
--type tail() :: {osiris:offset(), osiris:epoch()} | empty.
+-type tail() :: {osiris:epoch(), osiris:offset()} | empty.
-record(member,
{state = {down, 0} :: {down, osiris:epoch()}
diff --git a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl
index 316d6c262b..e2900b8cee 100644
--- a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl
@@ -1131,6 +1131,7 @@ delete_replica_leader(_) ->
meta(N) when is_integer(N) ->
#{index => N,
+ machine_version => 1,
system_time => N + 1000}.
started_stream(StreamId, LeaderPid, ReplicaPids) ->