summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2022-06-18 10:27:41 +0100
committerRobert Newson <rnewson@apache.org>2022-09-01 17:05:37 +0100
commitdb0ab57f7385a5d5fd133d991108c530626bb058 (patch)
tree9594db637f2fda51454f8ffcdf3185134e2ed076
parent9ce27998e78e2361886c2ac3e028946e3bb3b74f (diff)
downloadcouchdb-db0ab57f7385a5d5fd133d991108c530626bb058.tar.gz
introduce store abstraction (WIP)
-rw-r--r--Makefile10
-rw-r--r--src/couch/src/couch_bt_engine.erl2
-rw-r--r--src/couch/src/couch_raft.erl154
-rw-r--r--src/couch/src/couch_raft_log.erl52
-rw-r--r--src/couch/src/couch_raft_store.erl35
-rw-r--r--src/couch/src/couch_raft_store_sha256.erl80
-rw-r--r--src/couch/test/couch_raft_SUITE.erl (renamed from src/couch/test/eunit/couch_raft_SUITE.erl)40
7 files changed, 246 insertions, 127 deletions
diff --git a/Makefile b/Makefile
index 82c2b335b..e7a389469 100644
--- a/Makefile
+++ b/Makefile
@@ -176,6 +176,16 @@ eunit: couch
COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) -r eunit $(EUNIT_OPTS) apps=$$dir || exit 1; \
done
+.PHONY: ct
+ct: export BUILDDIR = $(shell pwd)
+ct: export ERL_AFLAGS = -config $(shell pwd)/rel/files/eunit.config
+ct: export COUCHDB_QUERY_SERVER_JAVASCRIPT = $(shell pwd)/bin/couchjs $(shell pwd)/share/server/main.js
+ct: export COUCHDB_TEST_ADMIN_PARTY_OVERRIDE=1
+ct: couch
+ @COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) setup_eunit 2> /dev/null
+ @for dir in $(subdirs); do \
+ COUCHDB_VERSION=$(COUCHDB_VERSION) COUCHDB_GIT_SHA=$(COUCHDB_GIT_SHA) $(REBAR) -r ct $(EUNIT_OPTS) apps=$$dir || exit 1; \
+ done
.PHONY: exunit
# target: exunit - Run ExUnit tests
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 8c1a2756d..d93071c1e 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -673,7 +673,6 @@ raft_discard(#st{} = St, UpTo) ->
needs_commit = true
}}.
-
raft_last(#st{} = St) ->
{ok, {_First, Last}} = couch_btree:full_reduce(St#st.raft_tree),
Last.
@@ -852,7 +851,6 @@ raft_tree_split({Index, Term, Value}) ->
raft_tree_join(Index, {Term, Value}) ->
{Index, Term, Value}.
-
raft_tree_reduce(reduce, []) ->
{{0, 0}, {0, 0}};
raft_tree_reduce(reduce, Entries) ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
index f398b4f2a..fda19cc22 100644
--- a/src/couch/src/couch_raft.erl
+++ b/src/couch/src/couch_raft.erl
@@ -26,8 +26,8 @@
% public api
-export([
- start/2,
- start_link/2,
+ start/3,
+ start_link/3,
stop/1,
call/2
]).
@@ -42,28 +42,23 @@
%% public api
-start(Name, Cohort) ->
- gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []).
+start(Name, StoreModule, StoreState) ->
+ gen_statem:start({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
-start_link(Name, Cohort) ->
- gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []).
+start_link(Name, StoreModule, StoreState) ->
+ gen_statem:start_link({local, Name}, ?MODULE, new(Name, StoreModule, StoreState), []).
-new(Name, Cohort) ->
+new(Name, StoreModule, StoreState) ->
+ #{cohort := Cohort} = StoreState,
Peers = peers(Cohort),
- #{
+ maps:merge(#{
name => Name,
- cohort => Cohort,
- term => 0,
- votedFor => undefined,
+ store_module => StoreModule,
votesGranted => #{},
nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
- log => couch_raft_log:new(),
- commitIndex => 0,
- froms => #{},
- lastApplied => 0,
- machine => <<0>>
- }.
+ froms => #{}
+ }, StoreState).
stop(ServerRef) ->
gen_statem:stop(ServerRef).
@@ -80,25 +75,26 @@ callback_mode() ->
%% erlfmt-ignore
handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
- {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}};
+ persist({next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}});
handle_event(enter, _OldState, follower, Data) ->
#{term := Term, froms := Froms} = Data,
couch_log:notice("~p became follower in term ~B", [node(), Term]),
Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
- {keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]};
+ persist({keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]});
handle_event(enter, _OldState, candidate, Data) ->
#{term := Term} = Data,
couch_log:notice("~p became candidate in term ~B", [node(), Term]),
- {keep_state, start_election(Data), restart_election_timeout()};
+ persist({keep_state, start_election(Data), restart_election_timeout()});
handle_event(enter, _OldState, leader, Data) ->
- #{log := Log, cohort := Cohort, term := Term} = Data,
+ #{store_module := StoreModule, cohort := Cohort, term := Term} = Data,
couch_log:notice("~p became leader in term ~B", [node(), Term]),
Peers = peers(Cohort),
+ {LastIndex, _} = StoreModule:last(Data),
{keep_state, Data#{
- nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]),
+ nextIndex => maps:from_list([{Peer, LastIndex + 1} || Peer <- Peers]),
matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
}, restart_heartbeat_timeout()};
@@ -110,10 +106,11 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
lastLogTerm := MLastLogTerm
} = Msg,
#{
- log := Log,
+ store_module := StoreModule,
votedFor := VotedFor
} = Data,
- LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))),
+ {LastIndex, LastTerm} = StoreModule:last(Data),
+ LogOk = MLastLogTerm > LastTerm orelse (MLastLogTerm == LastTerm andalso MLastLogIndex >= LastIndex),
Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
Reply = #{
@@ -125,7 +122,7 @@ handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State,
cast(MSource, Reply, Data),
if
Grant ->
- {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+ persist({keep_state, Data#{votedFor => MSource}, restart_election_timeout()});
true ->
{keep_state_and_data, restart_election_timeout()}
end;
@@ -158,9 +155,11 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
commitIndex := MCommitIndex
} = Msg,
#{
- log := Log
+ store_module := StoreModule
} = Data,
- LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))),
+ {LastIndex, _LastTerm} = StoreModule:last(Data),
+ {NthTerm, _} = StoreModule:lookup(MPrevLogIndex, Data),
+ LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< LastIndex andalso MPrevLogTerm == NthTerm),
if
Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
Reply = #{
@@ -194,11 +193,10 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
{keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
true ->
Index = MPrevLogIndex + 1,
- LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
if
- LastLogIndex >= Index ->
- NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)),
- FirstEntryTerm = couch_raft_log:term(hd(MEntries)),
+ LastIndex >= Index ->
+ {NthLogTerm, _} = StoreModule:lookup(Index, Data),
+ {FirstEntryTerm, _} = hd(MEntries),
if
NthLogTerm == FirstEntryTerm ->
Reply = #{
@@ -213,11 +211,21 @@ handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State,
{keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()};
NthLogTerm /= FirstEntryTerm ->
couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
- {keep_state, Data#{log => lists:sublist(Log, LastLogIndex - 1)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+ case StoreModule:truncate(LastIndex - 1, Data) of
+ {ok, NewData} ->
+ {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+ {error, Reason} ->
+ {stop, Reason}
+ end
end;
- LastLogIndex == MPrevLogIndex ->
+ LastIndex == MPrevLogIndex ->
couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
- {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+ case StoreModule:append(MEntries, Data) of
+ {ok, _EntryIndex, NewData} ->
+ {keep_state, NewData, [{next_event, cast, Msg}, restart_election_timeout()]};
+ {error, Reason} ->
+ {stop, Reason}
+ end
end
end
end;
@@ -245,10 +253,14 @@ handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _Stat
handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
#{value := Value} = Msg,
- #{term := Term, log := Log, froms := Froms} = Data,
- EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
- Entry = {EntryIndex, Term, Value},
- {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+ #{term := Term, store_module := StoreModule, froms := Froms} = Data,
+ Entry = {Term, Value},
+ case StoreModule:append([Entry], Data) of
+ {ok, EntryIndex, NewData} ->
+ {keep_state, NewData#{froms => Froms#{EntryIndex => From}}};
+ {error, Reason} ->
+ {stop_and_reply, Reason, {reply, From, {error, Reason}}}
+ end;
handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
{keep_state_and_data, {reply, From, {error, not_leader}}};
@@ -256,7 +268,7 @@ handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
#{term := Term} = Data,
couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
- {next_state, candidate, start_election(Data), restart_election_timeout()};
+ persist({next_state, candidate, start_election(Data), restart_election_timeout()});
handle_event(state_timeout, heartbeat, leader, Data) ->
#{term := Term} = Data,
@@ -267,18 +279,22 @@ handle_event(state_timeout, heartbeat, leader, Data) ->
handle_event(EventType, EventContent, State, Data) ->
{stop, {unknown_event, EventType, EventContent, State, Data}}.
-
send_append_entries(#{cohort := Cohort} = Data) ->
send_append_entries(peers(Cohort), Data).
send_append_entries([], _Data) ->
ok;
send_append_entries([Peer | Rest], Data) ->
- #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+ #{term := Term, nextIndex := NextIndex, store_module := StoreModule, commitIndex := CommitIndex} = Data,
PrevLogIndex = maps:get(Peer, NextIndex) - 1,
- PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end,
- LastEntry = min(couch_raft_log:index(couch_raft_log:last(Log)), PrevLogIndex + 2),
- Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+ PrevLogTerm =
+ if
+ PrevLogIndex > 0 -> {NthTerm, _} = StoreModule:lookup(PrevLogIndex, Data), NthTerm;
+ true -> 0
+ end,
+ {LastIndex, _} = StoreModule:last(Data),
+ LastEntry = min(LastIndex, PrevLogIndex + 2),
+ Entries = StoreModule:range(PrevLogIndex + 1, ?BATCH_SIZE, Data),
Msg = #{
type => 'AppendEntriesRequest',
term => Term,
@@ -292,9 +308,9 @@ send_append_entries([Peer | Rest], Data) ->
send_append_entries(Rest, Data).
advance_commit_index(Data) ->
- #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term} = Data,
- LastTerm = couch_raft_log:term(couch_raft_log:last(Log)),
- LastIndexes = lists:sort([couch_raft_log:index(couch_raft_log:last(Log)) | maps:values(MatchIndex)]),
+ #{matchIndex := MatchIndex, store_module := StoreModule, cohort := Cohort, term := Term} = Data,
+ {LastIndex, LastTerm} = StoreModule:last(Data),
+ LastIndexes = lists:sort([LastIndex | maps:values(MatchIndex)]),
NewCommitIndex = lists:nth(length(Cohort) div 2 + 1, LastIndexes),
if
LastTerm == Term ->
@@ -305,33 +321,37 @@ advance_commit_index(Data) ->
update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) ->
Data;
-update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied < CommitIndex ->
- #{log := Log, froms := Froms0, machine := Machine0} = Data,
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data0) when
+ LastApplied < CommitIndex
+->
+ #{store_module := StoreModule, froms := Froms0} = Data0,
From = LastApplied + 1,
- To = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
- Fun = fun(Index, {Froms, Machine}) ->
- Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
- Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+ {LastIndex, _} = StoreModule:last(Data0),
+ To = min(LastIndex, CommitIndex),
+ Fun = fun(Index, {Froms, Data}) ->
+ {_, Value} = StoreModule:lookup(Index, Data),
+ {Result, NewData} = StoreModule:apply(Value, Data),
case maps:is_key(Index, Froms) of
true ->
gen_statem:reply(maps:get(Index, Froms), Result),
- {maps:remove(Index, Froms), Result};
+ {maps:remove(Index, Froms), NewData};
false ->
- {Froms, Result}
+ {Froms, NewData}
end
end,
- {Froms1, Machine1} = lists:foldl(Fun, {Froms0, Machine0}, lists:seq(From, To)),
- Data#{froms => Froms1, machine => Machine1, lastApplied => To}.
+ {Froms1, Data1} = lists:foldl(Fun, {Froms0, Data0}, lists:seq(From, To)),
+ Data1#{froms => Froms1, lastApplied => To}.
start_election(Data) ->
- #{term := Term, cohort := Cohort, log := Log} = Data,
+ #{term := Term, cohort := Cohort, store_module := StoreModule} = Data,
ElectionTerm = Term + 1,
couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]),
+ {LastLogIndex, LastLogTerm} = StoreModule:last(Data),
RequestVote = #{
type => 'RequestVoteRequest',
term => ElectionTerm,
- lastLogIndex => couch_raft_log:index(couch_raft_log:last(Log)),
- lastLogTerm => couch_raft_log:term(couch_raft_log:last(Log)),
+ lastLogIndex => LastLogIndex,
+ lastLogTerm => LastLogTerm,
source => node()
},
lists:foreach(fun(Peer) -> cast(Peer, RequestVote, Data) end, peers(Cohort)),
@@ -348,3 +368,17 @@ restart_heartbeat_timeout() ->
peers(Cohort) ->
Cohort -- [node()].
+
+persist({next_state, _NextState, NewData, _Actions} = HandleEventResult) ->
+ persist(NewData, HandleEventResult);
+persist({keep_state, NewData, _Actions} = HandleEventResult) ->
+ persist(NewData, HandleEventResult).
+
+persist(Data, HandleEventResult) ->
+ #{store_module := StoreModule} = Data,
+ case StoreModule:save_state(Data) of
+ ok ->
+ HandleEventResult;
+ {error, Reason} ->
+ {stop, Reason}
+ end.
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
deleted file mode 100644
index 987212457..000000000
--- a/src/couch/src/couch_raft_log.erl
+++ /dev/null
@@ -1,52 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-
--module(couch_raft_log).
-
--export([
- new/0,
- append/2,
- sublist/3,
- nth/2,
- last/1,
- index/1,
- term/1,
- value/1
-]).
-
-new() ->
- [].
-
-append(Log, Items) ->
- lists:append(Log, Items).
-
-sublist(Log, Start, Len) ->
- lists:sublist(Log, Start, Len).
-
-nth(N, Log) ->
- lists:nth(N, Log).
-
-last([]) ->
- {0, 0, undefined};
-last(Log) ->
- lists:last(Log).
-
-index(Entry) ->
- element(1, Entry).
-
-term(Entry) ->
- element(2, Entry).
-
-value(Entry) ->
- element(3, Entry).
diff --git a/src/couch/src/couch_raft_store.erl b/src/couch/src/couch_raft_store.erl
new file mode 100644
index 000000000..81ebe684e
--- /dev/null
+++ b/src/couch/src/couch_raft_store.erl
@@ -0,0 +1,35 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_store).
+
+-callback init(Args :: term()) -> {ok, State :: #{}} | {stop, Reason :: term()}.
+
+% raft state callbacks
+
+-callback save_state(State :: #{}) -> ok | {error, Reason :: term()}.
+
+%% log callbacks
+-type log_entry() :: {Term :: non_neg_integer(), Value :: term()}.
+-callback last(State :: #{}) -> {Index :: non_neg_integer(), Term :: non_neg_integer()}.
+-callback lookup(N :: non_neg_integer(), State :: #{}) -> log_entry() | not_found.
+-callback range(Start :: non_neg_integer(), Len :: non_neg_integer(), State :: #{}) -> [log_entry() | not_found].
+-callback append(Entries :: [log_entry()], State :: #{}) ->
+ {ok, Index :: non_neg_integer(), NewState :: #{}} | {error, Reason :: term()}.
+-callback truncate(To :: non_neg_integer(), State :: #{}) -> {ok, NewState :: #{}} | {error, Reason :: term()}.
+-callback discard(UpTo :: non_neg_integer(), State :: #{}) ->
+ {ok, NewState :: #{}} | {error, Reason :: term()}.
+
+%% state machine callbacks
+-callback apply(Args :: term(), State :: #{}) -> {Result :: term(), NewState :: #{}}.
diff --git a/src/couch/src/couch_raft_store_sha256.erl b/src/couch/src/couch_raft_store_sha256.erl
new file mode 100644
index 000000000..e313da3e2
--- /dev/null
+++ b/src/couch/src/couch_raft_store_sha256.erl
@@ -0,0 +1,80 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+%% a non-persistent implementation of the raft_log_store behaviour for testing purposes.
+
+-module(couch_raft_store_sha256).
+-behaviour(couch_raft_store).
+
+-export([
+ init/1,
+ save_state/1,
+ %% log
+ last/1,
+ lookup/2,
+ range/3,
+ append/2,
+ truncate/2,
+ discard/2,
+ %% state machine
+ apply/2
+]).
+
+init(Cohort) ->
+ {ok, #{
+ cohort => Cohort,
+ commitIndex => 0,
+ lastApplied => 0,
+ log => [],
+ machine => <<0>>,
+ term => 0,
+ votedFor => undefined
+ }}.
+
+% raft state callbacks
+
+save_state(#{} = State) ->
+ _WouldPersist = maps:with([cohort, term, votedFor, lastApplied, machine], State),
+ ok.
+
+%% log callbacks
+last(#{log := []}) ->
+ {0, 0};
+last(#{log := Log}) ->
+ {LastTerm, _} = lists:last(Log),
+ {length(Log), LastTerm}.
+
+lookup(0, #{}) ->
+ {0, 0};
+lookup(N, #{log := Log}) when N > 0 ->
+ lists:nth(N, Log).
+
+range(Start, Len, #{log := Log}) when Start > 0, Len > 0 ->
+ lists:sublist(Log, Start, Len).
+
+append(Entries, #{log := Log} = State) when is_list(Entries) ->
+ NewLog = lists:append(Log, Entries),
+ {ok, length(NewLog), State#{log => NewLog}}.
+
+truncate(To, #{log := Log} = State) ->
+ {ok, State#{log => lists:sublist(Log, To)}}.
+
+discard(_UpTo, #{}) ->
+ {error, not_implemented}.
+
+%% state machine callbacks
+
+apply(Bin, #{machine := Machine0} = State) when is_binary(Bin), is_binary(Machine0) ->
+ Machine1 = crypto:hash(sha256, <<Machine0/binary, Bin/binary>>),
+ {Machine1, State#{machine => Machine1}}.
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/couch_raft_SUITE.erl
index 1c3f8ebc2..42e1f4ab3 100644
--- a/src/couch/test/eunit/couch_raft_SUITE.erl
+++ b/src/couch/test/couch_raft_SUITE.erl
@@ -27,41 +27,55 @@ all() ->
three_nodes(Config) when is_list(Config) ->
N = 3,
- Args = ["-pa", filename:dirname(code:which(craft))],
+ Args = [
+ "-pa", filename:dirname(code:which(couch_raft)),
+ "-pa", filename:dirname(code:which(couch_log)),
+ "-pa", filename:dirname(code:which(couch_stats))
+ ],
Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
- Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+ Cohort = [
+ receive
+ {tag, {started, Node, Peer}} -> Node
+ end
+ || {ok, Peer} <- Peers
+ ],
- Crafts = [erpc:call(Node, craft3, start, [foo, Cohort]) || Node <- Cohort],
+ {ok, InitialState} = couch_raft_store_sha256:init(Cohort),
+ Crafts = [erpc:call(Node, couch_raft, start, [foo, couch_raft_store_sha256, InitialState]) || Node <- Cohort],
% wait for leader election
timer:sleep(500),
% verify only one leader elected
- [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
- [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]),
+ [{leader, FirstLeader}] = lists:filter(
+ fun({State, _Pid}) -> State == leader end,
+ [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]
+ ),
% make a series of calls
Hash1 = crypto:hash(sha256, <<0, 1>>),
- ?assertEqual(Hash1, craft3:call(FirstLeader, <<1>>)),
+ ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)),
Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
- ?assertEqual(Hash2, craft3:call(FirstLeader, <<2>>)),
+ ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)),
Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
- ?assertEqual(Hash3, craft3:call(FirstLeader, <<3>>)),
+ ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)),
% force a re-election
- craft3:stop(FirstLeader),
+ couch_raft:stop(FirstLeader),
timer:sleep(500),
% verify new leader elected
- [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
- [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]),
+ [{leader, SecondLeader}] = lists:filter(
+ fun({State, _Pid}) -> State == leader end,
+ [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]
+ ),
?assertNotEqual(FirstLeader, SecondLeader),
% make another call
Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
- ?assertEqual(Hash4, craft3:call(SecondLeader, <<4>>)),
+ ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)),
- [craft3:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
+ [couch_raft:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
[peer:stop(Peer) || {ok, Peer} <- Peers].