diff options
author | Michael Klishin <klishinm@vmware.com> | 2022-01-08 02:34:28 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-08 02:34:28 +0300 |
commit | 4a3d92677c6413a570f7c7162cf4012f8137d4ff (patch) | |
tree | f114d98e3f4c19d82e685bb88a8e1cb648e32163 | |
parent | 8e2edc76c288e04ae89fbd6996fd429be733b082 (diff) | |
parent | 82470e9d1c8f2019ff9b3d5054a36d9357f8f90a (diff) | |
download | rabbitmq-server-git-4a3d92677c6413a570f7c7162cf4012f8137d4ff.tar.gz |
Merge pull request #3967 from rabbitmq/tomyouyou-stream_select_leader
Fix Stream coordinator leader selection bug
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 48 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.hrl | 2 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl | 1 |
3 files changed, 38 insertions, 13 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index f88822a54f..82e93da876 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -16,7 +16,9 @@ state_enter/2, init_aux/1, handle_aux/6, - tick/2]). + tick/2, + version/0, + which_module/1]). -export([recover/0, add_replica/2, @@ -311,6 +313,11 @@ all_coord_members() -> Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], [{?MODULE, Node} || Node <- [node() | Nodes]]. +version() -> 1. + +which_module(_) -> + ?MODULE. + init(_Conf) -> #?MODULE{}. @@ -320,7 +327,7 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd, #?MODULE{streams = Streams0, monitors = Monitors0} = State0) -> Stream0 = maps:get(StreamId, Streams0, undefined), - Meta = maps:without([term, machine_version], Meta0), + Meta = maps:without([term], Meta0), case filter_command(Meta, Cmd, Stream0) of ok -> Stream1 = update_stream(Meta, Cmd, Stream0), @@ -431,6 +438,8 @@ apply(Meta, {nodeup, Node} = Cmd, end, {Streams0, Effects0}, Streams0), return(Meta, State#?MODULE{monitors = Monitors, streams = Streams}, ok, Effects); +apply(Meta, {machine_version, _From, _To}, State) -> + return(Meta, State, ok, []); apply(Meta, UnkCmd, State) -> rabbit_log:debug("~s: unknown command ~W", [?MODULE, UnkCmd, 10]), @@ -1040,7 +1049,8 @@ update_stream0(#{system_time := _Ts}, %% epochs? Stream0 end; -update_stream0(#{system_time := _Ts}, +update_stream0(#{system_time := _Ts, + machine_version := MachineVersion}, {member_stopped, _StreamId, #{node := Node, index := Idx, @@ -1089,15 +1099,15 @@ update_stream0(#{system_time := _Ts}, Members1 = Members0#{Node => Member}, - Offsets = [{N, T} - || #member{state = {stopped, E, T}, - target = running, - node = N} <- maps:values(Members1), - E == Epoch], - case is_quorum(length(Nodes), length(Offsets)) of + EpochOffsets = [{N, T} + || #member{state = {stopped, E, T}, + target = running, + node = N} <- maps:values(Members1), + E == Epoch], + case is_quorum(length(Nodes), length(EpochOffsets)) of true -> %% select leader - NewWriterNode = select_leader(Offsets), + NewWriterNode = select_leader(MachineVersion, EpochOffsets), NextEpoch = Epoch + 1, Members = maps:map( fun (N, #member{state = {stopped, E, _}} = M) @@ -1527,7 +1537,9 @@ find_leader(Members) -> {undefined, Replicas} end. -select_leader(Offsets) -> +select_leader(0, EpochOffsets) -> + %% this is the version 0 faulty version of this code, + %% retained for versioning [{Node, _} | _] = lists:sort(fun({_, {Ao, E}}, {_, {Bo, E}}) -> Ao >= Bo; ({_, {_, Ae}}, {_, {_, Be}}) -> @@ -1536,7 +1548,19 @@ select_leader(Offsets) -> false; (_, {_, empty}) -> true - end, Offsets), + end, EpochOffsets), + Node; +select_leader(_Version, EpochOffsets) -> + [{Node, _} | _] = lists:sort( + fun({_, {Epoch, OffsetA}}, {_, {Epoch, OffsetB}}) -> + OffsetA >= OffsetB; + ({_, {EpochA, _}}, {_, {EpochB, _}}) -> + EpochA >= EpochB; + ({_, empty}, _) -> + false; + (_, {_, empty}) -> + true + end, EpochOffsets), Node. maybe_sleep({{nodedown, _}, _}) -> diff --git a/deps/rabbit/src/rabbit_stream_coordinator.hrl b/deps/rabbit/src/rabbit_stream_coordinator.hrl index cbf6b69b1e..8e4317a3d9 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.hrl +++ b/deps/rabbit/src/rabbit_stream_coordinator.hrl @@ -11,7 +11,7 @@ atom() => term()}. -type monitor_role() :: member | listener. -type queue_ref() :: rabbit_types:r(queue). --type tail() :: {osiris:offset(), osiris:epoch()} | empty. +-type tail() :: {osiris:epoch(), osiris:offset()} | empty. -record(member, {state = {down, 0} :: {down, osiris:epoch()} diff --git a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl index 316d6c262b..e2900b8cee 100644 --- a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl @@ -1131,6 +1131,7 @@ delete_replica_leader(_) -> meta(N) when is_integer(N) -> #{index => N, + machine_version => 1, system_time => N + 1000}. started_stream(StreamId, LeaderPid, ReplicaPids) -> |