summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2022-01-08 02:34:28 +0300
committerGitHub <noreply@github.com>2022-01-08 02:34:28 +0300
commit4a3d92677c6413a570f7c7162cf4012f8137d4ff (patch)
treef114d98e3f4c19d82e685bb88a8e1cb648e32163
parent8e2edc76c288e04ae89fbd6996fd429be733b082 (diff)
parent82470e9d1c8f2019ff9b3d5054a36d9357f8f90a (diff)
downloadrabbitmq-server-git-4a3d92677c6413a570f7c7162cf4012f8137d4ff.tar.gz
Merge pull request #3967 from rabbitmq/tomyouyou-stream_select_leader
Fix Stream coordinator leader selection bug
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl48
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.hrl2
-rw-r--r--deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl1
3 files changed, 38 insertions, 13 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index f88822a54f..82e93da876 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -16,7 +16,9 @@
state_enter/2,
init_aux/1,
handle_aux/6,
- tick/2]).
+ tick/2,
+ version/0,
+ which_module/1]).
-export([recover/0,
add_replica/2,
@@ -311,6 +313,11 @@ all_coord_members() ->
Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
[{?MODULE, Node} || Node <- [node() | Nodes]].
+version() -> 1.
+
+which_module(_) ->
+ ?MODULE.
+
init(_Conf) ->
#?MODULE{}.
@@ -320,7 +327,7 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
#?MODULE{streams = Streams0,
monitors = Monitors0} = State0) ->
Stream0 = maps:get(StreamId, Streams0, undefined),
- Meta = maps:without([term, machine_version], Meta0),
+ Meta = maps:without([term], Meta0),
case filter_command(Meta, Cmd, Stream0) of
ok ->
Stream1 = update_stream(Meta, Cmd, Stream0),
@@ -431,6 +438,8 @@ apply(Meta, {nodeup, Node} = Cmd,
end, {Streams0, Effects0}, Streams0),
return(Meta, State#?MODULE{monitors = Monitors,
streams = Streams}, ok, Effects);
+apply(Meta, {machine_version, _From, _To}, State) ->
+ return(Meta, State, ok, []);
apply(Meta, UnkCmd, State) ->
rabbit_log:debug("~s: unknown command ~W",
[?MODULE, UnkCmd, 10]),
@@ -1040,7 +1049,8 @@ update_stream0(#{system_time := _Ts},
%% epochs?
Stream0
end;
-update_stream0(#{system_time := _Ts},
+update_stream0(#{system_time := _Ts,
+ machine_version := MachineVersion},
{member_stopped, _StreamId,
#{node := Node,
index := Idx,
@@ -1089,15 +1099,15 @@ update_stream0(#{system_time := _Ts},
Members1 = Members0#{Node => Member},
- Offsets = [{N, T}
- || #member{state = {stopped, E, T},
- target = running,
- node = N} <- maps:values(Members1),
- E == Epoch],
- case is_quorum(length(Nodes), length(Offsets)) of
+ EpochOffsets = [{N, T}
+ || #member{state = {stopped, E, T},
+ target = running,
+ node = N} <- maps:values(Members1),
+ E == Epoch],
+ case is_quorum(length(Nodes), length(EpochOffsets)) of
true ->
%% select leader
- NewWriterNode = select_leader(Offsets),
+ NewWriterNode = select_leader(MachineVersion, EpochOffsets),
NextEpoch = Epoch + 1,
Members = maps:map(
fun (N, #member{state = {stopped, E, _}} = M)
@@ -1527,7 +1537,9 @@ find_leader(Members) ->
{undefined, Replicas}
end.
-select_leader(Offsets) ->
+select_leader(0, EpochOffsets) ->
+ %% this is the version 0 faulty version of this code,
+ %% retained for versioning
[{Node, _} | _] = lists:sort(fun({_, {Ao, E}}, {_, {Bo, E}}) ->
Ao >= Bo;
({_, {_, Ae}}, {_, {_, Be}}) ->
@@ -1536,7 +1548,19 @@ select_leader(Offsets) ->
false;
(_, {_, empty}) ->
true
- end, Offsets),
+ end, EpochOffsets),
+ Node;
+select_leader(_Version, EpochOffsets) ->
+ [{Node, _} | _] = lists:sort(
+ fun({_, {Epoch, OffsetA}}, {_, {Epoch, OffsetB}}) ->
+ OffsetA >= OffsetB;
+ ({_, {EpochA, _}}, {_, {EpochB, _}}) ->
+ EpochA >= EpochB;
+ ({_, empty}, _) ->
+ false;
+ (_, {_, empty}) ->
+ true
+ end, EpochOffsets),
Node.
maybe_sleep({{nodedown, _}, _}) ->
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.hrl b/deps/rabbit/src/rabbit_stream_coordinator.hrl
index cbf6b69b1e..8e4317a3d9 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.hrl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.hrl
@@ -11,7 +11,7 @@
atom() => term()}.
-type monitor_role() :: member | listener.
-type queue_ref() :: rabbit_types:r(queue).
--type tail() :: {osiris:offset(), osiris:epoch()} | empty.
+-type tail() :: {osiris:epoch(), osiris:offset()} | empty.
-record(member,
{state = {down, 0} :: {down, osiris:epoch()}
diff --git a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl
index 316d6c262b..e2900b8cee 100644
--- a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl
@@ -1131,6 +1131,7 @@ delete_replica_leader(_) ->
meta(N) when is_integer(N) ->
#{index => N,
+ machine_version => 1,
system_time => N + 1000}.
started_stream(StreamId, LeaderPid, ReplicaPids) ->