diff options
author | Robert Newson <rnewson@apache.org> | 2022-06-18 10:27:41 +0100 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2022-09-01 17:05:37 +0100 |
commit | db0ab57f7385a5d5fd133d991108c530626bb058 (patch) | |
tree | 9594db637f2fda51454f8ffcdf3185134e2ed076 | |
parent | 9ce27998e78e2361886c2ac3e028946e3bb3b74f (diff) | |
download | couchdb-db0ab57f7385a5d5fd133d991108c530626bb058.tar.gz |
introduce store abstraction (WIP)
-rw-r--r-- | Makefile | 10 | ||||
-rw-r--r-- | src/couch/src/couch_bt_engine.erl | 2 | ||||
-rw-r--r-- | src/couch/src/couch_raft.erl | 154 | ||||
-rw-r--r-- | src/couch/src/couch_raft_log.erl | 52 | ||||
-rw-r--r-- | src/couch/src/couch_raft_store.erl | 35 | ||||
-rw-r--r-- | src/couch/src/couch_raft_store_sha256.erl | 80 | ||||
-rw-r--r-- | src/couch/test/couch_raft_SUITE.erl (renamed from src/couch/test/eunit/couch_raft_SUITE.erl) | 40 |
7 files changed, 246 insertions, 127 deletions
@@ -176,6 +176,16 @@ eunit: couch COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir || exit 1; \ done +.PHONY: ct +ct: export BUILDDIR = $(shell pwd) +ct: export ERL_AFLAGS = -config $(shell pwd)/rel/files/eunit.config +ct: export COUCHDB_QUERY_SERVER_JAVASCRIPT = $(shell pwd)/bin/couchjs $(shell pwd)/share/server/main.js +ct: export COUCHDB_TEST_ADMIN_PARTY_OVERRIDE=1 +ct: couch + @COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) setup_eunit 2> /dev/null + @for dir in $(subdirs); do \ + COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) -r ct $(EUNIT_OPTS) apps=$$dir || exit 1; \ + done .PHONY: exunit # target: exunit - Run ExUnit tests diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 8c1a2756d..d93071c1e 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -673,7 +673,6 @@ raft_discard(#st{} = St, UpTo) -> needs_commit = true }}. - raft_last(#st{} = St) -> {ok, {_First, Last}} = couch_btree:full_reduce(St#st.raft_tree), Last. @@ -852,7 +851,6 @@ raft_tree_split({Index, Term, Value}) -> raft_tree_join(Index, {Term, Value}) -> {Index, Term, Value}. - raft_tree_reduce(reduce, []) -> {{0, 0}, {0, 0}}; raft_tree_reduce(reduce, Entries) -> diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl index f398b4f2a..fda19cc22 100644 --- a/src/couch/src/couch_raft.erl +++ b/src/couch/src/couch_raft.erl @@ -26,8 +26,8 @@ % public api -export([ - start/2, - start_link/2, + start/3, + start_link/3, stop/1, call/2 ]). @@ -42,28 +42,23 @@ %% public api -start(Name, Cohort) -> - gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []). +start(Name, StoreModule, StoreState) -> + gen_statem:start({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []). -start_link(Name, Cohort) -> - gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []). +start_link(Name, StoreModule, StoreState) -> + gen_statem:start_link({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []). -new(Name, Cohort) -> +new(Name, StoreModule, StoreState) -> + #{cohort := Cohort} = StoreState, Peers = peers(Cohort), - #{ + maps:merge(#{ name => Name, - cohort => Cohort, - term => 0, - votedFor => undefined, + store_module => StoreModule, votesGranted => #{}, nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]), matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]), - log => couch_raft_log:new(), - commitIndex => 0, - froms => #{}, - lastApplied => 0, - machine => <<0>> - }. + froms => #{} + }, StoreState). stop(ServerRef) -> gen_statem:stop(ServerRef). @@ -80,25 +75,26 @@ callback_mode() -> %% erlfmt-ignore handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm -> couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]), - {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}}; + persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}}); handle_event(enter, _OldState, follower, Data) -> #{term := Term, froms := Froms} = Data, couch_log:notice("~p became follower in term ~B", [node(), Term]), Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)], - {keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]}; + persist({keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]}); handle_event(enter, _OldState, candidate, Data) -> #{term := Term} = Data, couch_log:notice("~p became candidate in term ~B", [node(), Term]), - {keep_state, start_election(Data), restart_election_timeout()}; + persist({keep_state, start_election(Data), restart_election_timeout()}); handle_event(enter, _OldState, leader, Data) -> - #{log := Log, cohort := Cohort, term := Term} = Data, + #{store_module := StoreModule, cohort := Cohort, term := Term} = Data, couch_log:notice("~p became leader in term ~B", [node(), Term]), Peers = peers(Cohort), + {LastIndex, _} = StoreModule:last(Data), {keep_state, Data#{ - nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]), + nextIndex => maps:from_list([{Peer, LastIndex + 1} || Peer <- Peers]), matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]) }, restart_heartbeat_timeout()}; @@ -110,10 +106,11 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, lastLogTerm := MLastLogTerm } = Msg, #{ - log := Log, + store_module := StoreModule, votedFor := VotedFor } = Data, - LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))), + {LastIndex, LastTerm} = StoreModule:last(Data), + LogOk = MLastLogTerm > LastTerm orelse (MLastLogTerm == LastTerm andalso MLastLogIndex >= LastIndex), Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource), couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]), Reply = #{ @@ -125,7 +122,7 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, cast(MSource, Reply, Data), if Grant -> - {keep_state, Data#{votedFor => MSource}, restart_election_timeout()}; + persist({keep_state, Data#{votedFor => MSource}, restart_election_timeout()}); true -> {keep_state_and_data, restart_election_timeout()} end; @@ -158,9 +155,11 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, commitIndex := MCommitIndex } = Msg, #{ - log := Log + store_module := StoreModule } = Data, - LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))), + {LastIndex, _LastTerm} = StoreModule:last(Data), + {NthTerm, _} = StoreModule:lookup(MPrevLogIndex, Data), + LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< LastIndex andalso MPrevLogTerm == NthTerm), if Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) -> Reply = #{ @@ -194,11 +193,10 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()}; true -> Index = MPrevLogIndex + 1, - LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)), if - LastLogIndex >= Index -> - NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)), - FirstEntryTerm = couch_raft_log:term(hd(MEntries)), + LastIndex >= Index -> + {NthLogTerm, _} = StoreModule:lookup(Index, Data), + {FirstEntryTerm, _} = hd(MEntries), if NthLogTerm == FirstEntryTerm -> Reply = #{ @@ -213,11 +211,21 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()}; NthLogTerm /= FirstEntryTerm -> couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]), - {keep_state, Data#{log => lists:sublist(Log, LastLogIndex - 1)}, [{next_event, cast, Msg}, restart_election_timeout()]} + case StoreModule:truncate(LastIndex - 1, Data) of + {ok, NewData} -> + {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]}; + {error, Reason} -> + {stop, Reason} + end end; - LastLogIndex == MPrevLogIndex -> + LastIndex == MPrevLogIndex -> couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]), - {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]} + case StoreModule:append(MEntries, Data) of + {ok, _EntryIndex, NewData} -> + {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]}; + {error, Reason} -> + {stop, Reason} + end end end end; @@ -245,10 +253,14 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _Stat handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) -> #{value := Value} = Msg, - #{term := Term, log := Log, froms := Froms} = Data, - EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1, - Entry = {EntryIndex, Term, Value}, - {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}}; + #{term := Term, store_module := StoreModule, froms := Froms} = Data, + Entry = {Term, Value}, + case StoreModule:append([Entry], Data) of + {ok, EntryIndex, NewData} -> + {keep_state, NewData#{froms => Froms#{EntryIndex => From}}}; + {error, Reason} -> + {stop_and_reply, Reason, {reply, From, {error, Reason}}} + end; handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) -> {keep_state_and_data, {reply, From, {error, not_leader}}}; @@ -256,7 +268,7 @@ handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) -> handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate -> #{term := Term} = Data, couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]), - {next_state, candidate, start_election(Data), restart_election_timeout()}; + persist({next_state, candidate, start_election(Data), restart_election_timeout()}); handle_event(state_timeout, heartbeat, leader, Data) -> #{term := Term} = Data, @@ -267,18 +279,22 @@ handle_event(state_timeout, heartbeat, leader, Data) -> handle_event(EventType, EventContent, State, Data) -> {stop, {unknown_event, EventType, EventContent, State, Data}}. - send_append_entries(#{cohort := Cohort} = Data) -> send_append_entries(peers(Cohort), Data). send_append_entries([], _Data) -> ok; send_append_entries([Peer | Rest], Data) -> - #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data, + #{term := Term, nextIndex := NextIndex, store_module := StoreModule, commitIndex := CommitIndex} = Data, PrevLogIndex = maps:get(Peer, NextIndex) - 1, - PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end, - LastEntry = min(couch_raft_log:index(couch_raft_log:last(Log)), PrevLogIndex + 2), - Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE), + PrevLogTerm = + if + PrevLogIndex > 0 -> {NthTerm, _} = StoreModule:lookup(PrevLogIndex, Data), NthTerm; + true -> 0 + end, + {LastIndex, _} = StoreModule:last(Data), + LastEntry = min(LastIndex, PrevLogIndex + 2), + Entries = StoreModule:range(PrevLogIndex + 1, ?BATCH_SIZE, Data), Msg = #{ type => 'AppendEntriesRequest', term => Term, @@ -292,9 +308,9 @@ send_append_entries([Peer | Rest], Data) -> send_append_entries(Rest, Data). advance_commit_index(Data) -> - #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term} = Data, - LastTerm = couch_raft_log:term(couch_raft_log:last(Log)), - LastIndexes = lists:sort([couch_raft_log:index(couch_raft_log:last(Log)) | maps:values(MatchIndex)]), + #{matchIndex := MatchIndex, store_module := StoreModule, cohort := Cohort, term := Term} = Data, + {LastIndex, LastTerm} = StoreModule:last(Data), + LastIndexes = lists:sort([LastIndex | maps:values(MatchIndex)]), NewCommitIndex = lists:nth(length(Cohort) div 2 + 1, LastIndexes), if LastTerm == Term -> @@ -305,33 +321,37 @@ advance_commit_index(Data) -> update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) -> Data; -update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied < CommitIndex -> - #{log := Log, froms := Froms0, machine := Machine0} = Data, +update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data0) when + LastApplied < CommitIndex +-> + #{store_module := StoreModule, froms := Froms0} = Data0, From = LastApplied + 1, - To = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex), - Fun = fun(Index, {Froms, Machine}) -> - Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)), - Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>), + {LastIndex, _} = StoreModule:last(Data0), + To = min(LastIndex, CommitIndex), + Fun = fun(Index, {Froms, Data}) -> + {_, Value} = StoreModule:lookup(Index, Data), + {Result, NewData} = StoreModule:apply(Value, Data), case maps:is_key(Index, Froms) of true -> gen_statem:reply(maps:get(Index, Froms), Result), - {maps:remove(Index, Froms), Result}; + {maps:remove(Index, Froms), NewData}; false -> - {Froms, Result} + {Froms, NewData} end end, - {Froms1, Machine1} = lists:foldl(Fun, {Froms0, Machine0}, lists:seq(From, To)), - Data#{froms => Froms1, machine => Machine1, lastApplied => To}. + {Froms1, Data1} = lists:foldl(Fun, {Froms0, Data0}, lists:seq(From, To)), + Data1#{froms => Froms1, lastApplied => To}. start_election(Data) -> - #{term := Term, cohort := Cohort, log := Log} = Data, + #{term := Term, cohort := Cohort, store_module := StoreModule} = Data, ElectionTerm = Term + 1, couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]), + {LastLogIndex, LastLogTerm} = StoreModule:last(Data), RequestVote = #{ type => 'RequestVoteRequest', term => ElectionTerm, - lastLogIndex => couch_raft_log:index(couch_raft_log:last(Log)), - lastLogTerm => couch_raft_log:term(couch_raft_log:last(Log)), + lastLogIndex => LastLogIndex, + lastLogTerm => LastLogTerm, source => node() }, lists:foreach(fun(Peer) -> cast(Peer, RequestVote, Data) end, peers(Cohort)), @@ -348,3 +368,17 @@ restart_heartbeat_timeout() -> peers(Cohort) -> Cohort -- [node()]. + +persist({next_state, _NextState, NewData, _Actions} = HandleEventResult) -> + persist(NewData, HandleEventResult); +persist({keep_state, NewData, _Actions} = HandleEventResult) -> + persist(NewData, HandleEventResult). + +persist(Data, HandleEventResult) -> + #{store_module := StoreModule} = Data, + case StoreModule:save_state(Data) of + ok -> + HandleEventResult; + {error, Reason} -> + {stop, Reason} + end. diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl deleted file mode 100644 index 987212457..000000000 --- a/src/couch/src/couch_raft_log.erl +++ /dev/null @@ -1,52 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - -%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- - --module(couch_raft_log). - --export([ - new/0, - append/2, - sublist/3, - nth/2, - last/1, - index/1, - term/1, - value/1 -]). - -new() -> - []. - -append(Log, Items) -> - lists:append(Log, Items). - -sublist(Log, Start, Len) -> - lists:sublist(Log, Start, Len). - -nth(N, Log) -> - lists:nth(N, Log). - -last([]) -> - {0, 0, undefined}; -last(Log) -> - lists:last(Log). - -index(Entry) -> - element(1, Entry). - -term(Entry) -> - element(2, Entry). - -value(Entry) -> - element(3, Entry). diff --git a/src/couch/src/couch_raft_store.erl b/src/couch/src/couch_raft_store.erl new file mode 100644 index 000000000..81ebe684e --- /dev/null +++ b/src/couch/src/couch_raft_store.erl @@ -0,0 +1,35 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(couch_raft_store). + +-callback init(Args :: term()) -> {ok, State :: #{}} | {stop, Reason :: term()}. + +% raft state callbacks + +-callback save_state(State :: #{}) -> ok | {error, Reason :: term()}. + +%% log callbacks +-type log_entry() :: {Term :: non_neg_integer(), Value :: term()}. +-callback last(State :: #{}) -> {Index :: non_neg_integer(), Term :: non_neg_integer()}. +-callback lookup(N :: non_neg_integer(), State :: #{}) -> log_entry() | not_found. +-callback range(Start :: non_neg_integer(), Len :: non_neg_integer(), State :: #{}) -> [log_entry() | not_found]. +-callback append(Entries :: [log_entry()], State :: #{}) -> + {ok, Index :: non_neg_integer(), NewState :: #{}} | {error, Reason :: term()}. +-callback truncate(To :: non_neg_integer(), State :: #{}) -> {ok, NewState :: #{}} | {error, Reason :: term()}. +-callback discard(UpTo :: non_neg_integer(), State :: #{}) -> + {ok, NewState :: #{}} | {error, Reason :: term()}. + +%% state machine callbacks +-callback apply(Args :: term(), State :: #{}) -> {Result :: term(), NewState :: #{}}. diff --git a/src/couch/src/couch_raft_store_sha256.erl b/src/couch/src/couch_raft_store_sha256.erl new file mode 100644 index 000000000..e313da3e2 --- /dev/null +++ b/src/couch/src/couch_raft_store_sha256.erl @@ -0,0 +1,80 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +%% a non-persistent implementation of the raft_log_store behaviour for testing purposes. + +-module(couch_raft_store_sha256). +-behaviour(couch_raft_store). + +-export([ + init/1, + save_state/1, + %% log + last/1, + lookup/2, + range/3, + append/2, + truncate/2, + discard/2, + %% state machine + apply/2 +]). + +init(Cohort) -> + {ok, #{ + cohort => Cohort, + commitIndex => 0, + lastApplied => 0, + log => [], + machine => <<0>>, + term => 0, + votedFor => undefined + }}. + +% raft state callbacks + +save_state(#{} = State) -> + _WouldPersist = maps:with([cohort, term, votedFor, lastApplied, machine], State), + ok. + +%% log callbacks +last(#{log := []}) -> + {0, 0}; +last(#{log := Log}) -> + {LastTerm, _} = lists:last(Log), + {length(Log), LastTerm}. + +lookup(0, #{}) -> + {0, 0}; +lookup(N, #{log := Log}) when N > 0 -> + lists:nth(N, Log). + +range(Start, Len, #{log := Log}) when Start > 0, Len > 0 -> + lists:sublist(Log, Start, Len). + +append(Entries, #{log := Log} = State) when is_list(Entries) -> + NewLog = lists:append(Log, Entries), + {ok, length(NewLog), State#{log => NewLog}}. + +truncate(To, #{log := Log} = State) -> + {ok, State#{log => lists:sublist(Log, To)}}. + +discard(_UpTo, #{}) -> + {error, not_implemented}. + +%% state machine callbacks + +apply(Bin, #{machine := Machine0} = State) when is_binary(Bin), is_binary(Machine0) -> + Machine1 = crypto:hash(sha256, <<Machine0/binary, Bin/binary>>), + {Machine1, State#{machine => Machine1}}. diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/couch_raft_SUITE.erl index 1c3f8ebc2..42e1f4ab3 100644 --- a/src/couch/test/eunit/couch_raft_SUITE.erl +++ b/src/couch/test/couch_raft_SUITE.erl @@ -27,41 +27,55 @@ all() -> three_nodes(Config) when is_list(Config) -> N = 3, - Args = ["-pa", filename:dirname(code:which(craft))], + Args = [ + "-pa", filename:dirname(code:which(couch_raft)), + "-pa", filename:dirname(code:which(couch_log)), + "-pa", filename:dirname(code:which(couch_stats)) + ], Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)], - Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers], + Cohort = [ + receive + {tag, {started, Node, Peer}} -> Node + end + || {ok, Peer} <- Peers + ], - Crafts = [erpc:call(Node, craft3, start, [foo, Cohort]) || Node <- Cohort], + {ok, InitialState} = couch_raft_store_sha256:init(Cohort), + Crafts = [erpc:call(Node, couch_raft, start, [foo, couch_raft_store_sha256, InitialState]) || Node <- Cohort], % wait for leader election timer:sleep(500), % verify only one leader elected - [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end, - [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]), + [{leader, FirstLeader}] = lists:filter( + fun({State, _Pid}) -> State == leader end, + [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts] + ), % make a series of calls Hash1 = crypto:hash(sha256, <<0, 1>>), - ?assertEqual(Hash1, craft3:call(FirstLeader, <<1>>)), + ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)), Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>), - ?assertEqual(Hash2, craft3:call(FirstLeader, <<2>>)), + ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)), Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>), - ?assertEqual(Hash3, craft3:call(FirstLeader, <<3>>)), + ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)), % force a re-election - craft3:stop(FirstLeader), + couch_raft:stop(FirstLeader), timer:sleep(500), % verify new leader elected - [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end, - [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]), + [{leader, SecondLeader}] = lists:filter( + fun({State, _Pid}) -> State == leader end, + [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader] + ), ?assertNotEqual(FirstLeader, SecondLeader), % make another call Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>), - ?assertEqual(Hash4, craft3:call(SecondLeader, <<4>>)), + ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)), - [craft3:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader], + [couch_raft:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader], [peer:stop(Peer) || {ok, Peer} <- Peers]. |