summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2022-05-14 20:28:05 +0100
committerRobert Newson <rnewson@apache.org>2022-06-13 11:34:25 +0100
commit490df65b0b17933c8222afe0c65f94bd0ebbc7d2 (patch)
treeba24df984b617dd046ee6b7c9e914837fa88dc7e
parent53adf72f48b958c0c85756c3e83d16e60657e5d2 (diff)
downloadcouchdb-raft.tar.gz
Integrate raft algorithm (WIP)raft
couch_raft.erl is a complete implementation of the raft algorithm, but it currently manages only an in-memory state machine and log. Preliminary work is also here to add a new btree inside the `.couch` files, which will be the real raft log. The intent is that log entries can be removed from this log and applied to the by_id and by_seq trees atomically. The raft log is preserved over compaction in the same manner as local docs: all entries are slurped into memory and written in one pass. This should be fine, as the log should stay short and committed entries can be promptly removed. It's probably not fine for local docs, though... Anyway, it's progress and hopefully we're going somewhere cool.
-rw-r--r--src/couch/src/couch_bt_engine.erl95
-rw-r--r--src/couch/src/couch_bt_engine.hrl3
-rw-r--r--src/couch/src/couch_bt_engine_header.erl7
-rw-r--r--src/couch/src/couch_db.erl19
-rw-r--r--src/couch/src/couch_db_engine.erl27
-rw-r--r--src/couch/src/couch_db_updater.erl10
-rw-r--r--src/couch/src/couch_raft.erl350
-rw-r--r--src/couch/src/couch_raft_log.erl52
-rw-r--r--src/couch/test/eunit/couch_raft_SUITE.erl67
9 files changed, 621 insertions, 9 deletions
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 486ed7cb0..8feb360d3 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -66,6 +66,11 @@
purge_docs/3,
copy_purge_infos/2,
+ raft_lookup/2,
+ raft_insert/2,
+ raft_discard/2,
+ raft_last/1,
+
commit_data/1,
open_write_stream/2,
@@ -102,7 +107,11 @@
purge_tree_join/2,
purge_tree_reduce/2,
purge_seq_tree_split/1,
- purge_seq_tree_join/2
+ purge_seq_tree_join/2,
+
+ raft_tree_split/1,
+ raft_tree_join/2,
+ raft_tree_reduce/2
]).
% Used by the compactor
@@ -631,6 +640,44 @@ count_changes_since(St, SinceSeq) ->
{ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts),
Changes.
+% Insert Entries into the raft log btree (nothing is removed in the same
+% call) and flag the engine state as needing a commit.
+% Returns {ok, NewSt}.
+raft_insert(#st{raft_tree = Tree0} = St, Entries) when is_list(Entries) ->
+    {ok, Tree1} = couch_btree:add_remove(Tree0, Entries, []),
+    NewSt = St#st{raft_tree = Tree1, needs_commit = true},
+    {ok, NewSt}.
+
+% Look up Indexes in the raft log btree. A hit reported as {ok, Entry} is
+% unwrapped to Entry; a miss stays as the atom not_found. One result is
+% returned per result produced by couch_btree:lookup/2, in that order.
+raft_lookup(#st{raft_tree = RaftTree}, Indexes) ->
+    Unwrap = fun
+        ({ok, Entry}) -> Entry;
+        (not_found) -> not_found
+    end,
+    [Unwrap(Result) || Result <- couch_btree:lookup(RaftTree, Indexes)].
+
+% Remove every raft log entry whose index is =< UpTo and flag the engine
+% state as needing a commit. Returns {ok, NewSt}.
+%
+% The first and last stored indexes come from the tree's full reduction
+% (see raft_tree_reduce/2). UpTo is clamped to the last stored index so a
+% large UpTo does not build a large removal list, and an UpTo below the
+% first stored index removes nothing instead of crashing in lists:seq/2
+% (which raises when To < From - 1).
+raft_discard(#st{raft_tree = RaftTree0} = St, UpTo) when is_integer(UpTo) ->
+    {ok, {{FirstIndex, _FirstTerm}, {LastIndex, _LastTerm}}} =
+        couch_btree:full_reduce(RaftTree0),
+    Remove =
+        case min(UpTo, LastIndex) of
+            Upper when Upper >= FirstIndex -> lists:seq(FirstIndex, Upper);
+            _NothingToDiscard -> []
+        end,
+    {ok, RaftTree1} = couch_btree:add_remove(RaftTree0, [], Remove),
+    {ok, St#st{
+        raft_tree = RaftTree1,
+        needs_commit = true
+    }}.
+
+
+% Return the second half of the raft tree's full reduction, which
+% raft_tree_reduce/2 builds as {MaxIndex, MaxTerm}.
+raft_last(#st{raft_tree = RaftTree}) ->
+    {ok, {_Oldest, Newest}} = couch_btree:full_reduce(RaftTree),
+    Newest.
+
start_compaction(St, DbName, Options, Parent) ->
Args = [St, DbName, Options, Parent],
Pid = spawn_link(couch_bt_engine_compactor, start, Args),
@@ -799,6 +846,23 @@ purge_tree_reduce(reduce, IdRevs) ->
purge_tree_reduce(rereduce, Reds) ->
lists:sum(Reds).
+% `split` callback of the raft btree (wired up in init_state/4): turns a log
+% entry {Index, Term, Value} into the pair {Index, {Term, Value}}.
+% Inverse of raft_tree_join/2.
+raft_tree_split({Index, Term, Value}) ->
+ {Index, {Term, Value}}.
+
+% `join` callback of the raft btree (wired up in init_state/4): rebuilds the
+% log entry {Index, Term, Value} from Index and the pair {Term, Value}.
+% Inverse of raft_tree_split/1.
+raft_tree_join(Index, {Term, Value}) ->
+ {Index, Term, Value}.
+
+
+% `reduce` callback of the raft btree. The reduction is
+% {{MinIndex, MinTerm}, {MaxIndex, MaxTerm}} over the entries seen;
+% with no entries both halves are {0, 0}. Entries are {Index, Term, Value}
+% tuples, so tuple ordering compares the index first.
+raft_tree_reduce(reduce, []) ->
+    Empty = {0, 0},
+    {Empty, Empty};
+raft_tree_reduce(reduce, Entries) ->
+    {LowIndex, LowTerm, _LowValue} = lists:min(Entries),
+    {HighIndex, HighTerm, _HighValue} = lists:max(Entries),
+    Oldest = {LowIndex, LowTerm},
+    Newest = {HighIndex, HighTerm},
+    {Oldest, Newest};
+raft_tree_reduce(rereduce, Reds) ->
+    % Combine child reductions: smallest of the minimums, largest of the maximums.
+    {AllOldest, AllNewest} = lists:unzip(Reds),
+    {lists:min(AllOldest), lists:max(AllNewest)}.
+
set_update_seq(#st{header = Header} = St, UpdateSeq) ->
{ok, St#st{
header = couch_bt_engine_header:set(Header, [
@@ -894,6 +958,13 @@ init_state(FilePath, Fd, Header0, Options) ->
{reduce, fun ?MODULE:purge_tree_reduce/2}
]),
+ RaftTreeState = couch_bt_engine_header:raft_tree_state(Header),
+ {ok, RaftTree} = couch_btree:open(RaftTreeState, Fd, [
+ {split, fun ?MODULE:raft_tree_split/1},
+ {join, fun ?MODULE:raft_tree_join/2},
+ {reduce, fun ?MODULE:raft_tree_reduce/2}
+ ]),
+
ok = couch_file:set_db_pid(Fd, self()),
St = #st{
@@ -907,7 +978,8 @@ init_state(FilePath, Fd, Header0, Options) ->
local_tree = LocalTree,
compression = Compression,
purge_tree = PurgeTree,
- purge_seq_tree = PurgeSeqTree
+ purge_seq_tree = PurgeSeqTree,
+ raft_tree = RaftTree
},
% If this is a new database we've just created a
@@ -927,7 +999,8 @@ update_header(St, Header) ->
{id_tree_state, couch_btree:get_state(St#st.id_tree)},
{local_tree_state, couch_btree:get_state(St#st.local_tree)},
{purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
- {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)}
+ {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)},
+ {raft_tree_state, couch_btree:get_state(St#st.raft_tree)}
]).
increment_update_seq(#st{header = Header} = St) ->
@@ -1097,7 +1170,8 @@ active_size(#st{} = St, #size_info{} = SI) ->
St#st.seq_tree,
St#st.local_tree,
St#st.purge_tree,
- St#st.purge_seq_tree
+ St#st.purge_seq_tree,
+ St#st.raft_tree
],
lists:foldl(
fun(T, Acc) ->
@@ -1171,12 +1245,14 @@ fold_docs_reduce_to_count(Reds) ->
finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
#st{
filepath = FilePath,
- local_tree = OldLocal
+ local_tree = OldLocal,
+ raft_tree = OldRaft
} = OldSt,
#st{
filepath = CompactDataPath,
header = Header,
- local_tree = NewLocal1
+ local_tree = NewLocal1,
+ raft_tree = NewRaft1
} = NewSt1,
% suck up all the local docs into memory and write them to the new db
@@ -1186,13 +1262,18 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
{ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
{ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),
+ % do the same for the raft log
+ {ok, _, RaftLog} = couch_btree:foldl(OldRaft, LoadFun, []),
+ {ok, NewRaft2} = couch_btree:add(NewRaft1, RaftLog),
+
{ok, NewSt2} = commit_data(NewSt1#st{
header = couch_bt_engine_header:set(Header, [
{compacted_seq, get_update_seq(OldSt)},
{revs_limit, get_revs_limit(OldSt)},
{purge_infos_limit, get_purge_infos_limit(OldSt)}
]),
- local_tree = NewLocal2
+ local_tree = NewLocal2,
+ raft_tree = NewRaft2
}),
% Rename our *.compact.data file to *.compact so that if we
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
index e3c1d4983..0d347e99b 100644
--- a/src/couch/src/couch_bt_engine.hrl
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -23,5 +23,6 @@
local_tree,
compression,
purge_tree,
- purge_seq_tree
+ purge_seq_tree,
+ raft_tree
}).
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
index e28f07723..9e663b096 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -34,6 +34,7 @@
purge_tree_state/1,
purge_seq_tree_state/1,
purge_infos_limit/1,
+ raft_tree_state/1,
security_ptr/1,
revs_limit/1,
uuid/1,
@@ -69,7 +70,8 @@
epochs,
compacted_seq,
purge_infos_limit = 1000,
- props_ptr
+ props_ptr,
+ raft_tree_state = nil
}).
-define(PARTITION_DISK_VERSION, 8).
@@ -177,6 +179,9 @@ compacted_seq(Header) ->
purge_infos_limit(Header) ->
get_field(Header, purge_infos_limit).
+% Accessor for the header's raft_tree_state field (declared with a default
+% of nil in the header record above).
+raft_tree_state(Header) ->
+ get_field(Header, raft_tree_state).
+
get_field(Header, Field) ->
get_field(Header, Field, undefined).
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 70ba1c2b9..274526acd 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -114,6 +114,11 @@
fold_purge_infos/4,
fold_purge_infos/5,
+ raft_insert/2,
+ raft_lookup/2,
+ raft_discard/2,
+ raft_last/1,
+
calculate_start_seq/3,
owner_of/2,
@@ -1813,6 +1818,20 @@ fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).
+% Append Entries to the database's raft log. Runs check_is_admin/1 first,
+% then asks the db's main process to perform the write via a gen_server
+% call with no timeout.
+raft_insert(#db{} = Db, Entries) ->
+    check_is_admin(Db),
+    Request = {raft_insert, Entries},
+    gen_server:call(Db#db.main_pid, Request, infinity).
+
+% Look up raft log entries by index. Delegates directly to the storage
+% engine in the calling process; no admin check is made here.
+raft_lookup(Db, Indexes) ->
+ couch_db_engine:raft_lookup(Db, Indexes).
+
+% Discard raft log entries up to UpTo. Runs check_is_admin/1 first, then
+% asks the db's main process to perform the removal via a gen_server call
+% with no timeout.
+raft_discard(#db{} = Db, UpTo) ->
+    check_is_admin(Db),
+    Request = {raft_discard, UpTo},
+    gen_server:call(Db#db.main_pid, Request, infinity).
+
+% Return whatever the storage engine reports as the last raft log entry.
+% Delegates directly to the engine in the calling process; no admin check
+% is made here.
+raft_last(Db) ->
+ couch_db_engine:raft_last(Db).
+
count_changes_since(Db, SinceSeq) ->
couch_db_engine:count_changes_since(Db, SinceSeq).
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 9e46b816b..2969df956 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -704,6 +704,11 @@
read_doc_body/2,
load_purge_infos/2,
+ raft_lookup/2,
+ raft_insert/2,
+ raft_discard/2,
+ raft_last/1,
+
serialize_doc/2,
write_doc_body/2,
write_doc_infos/3,
@@ -927,6 +932,28 @@ copy_purge_infos(#db{} = Db, Purges) ->
),
{ok, Db#db{engine = {Engine, NewSt}}}.
+% Hand Entries to the storage engine's raft_insert/2 and return the db
+% record carrying the updated engine state: {ok, NewDb}.
+raft_insert(#db{} = Db, Entries) ->
+    {Engine, EngineState} = Db#db.engine,
+    {ok, NewSt} = Engine:raft_insert(EngineState, Entries),
+    NewDb = Db#db{engine = {Engine, NewSt}},
+    {ok, NewDb}.
+
+% Forward a raft log lookup to the storage engine and return its result
+% unchanged.
+raft_lookup(#db{} = Db, Indexes) ->
+    {Engine, EngineState} = Db#db.engine,
+    Engine:raft_lookup(EngineState, Indexes).
+
+% Hand UpTo to the storage engine's raft_discard/2 and return the db record
+% carrying the updated engine state: {ok, NewDb}.
+raft_discard(#db{} = Db, UpTo) ->
+    {Engine, EngineState} = Db#db.engine,
+    {ok, NewSt} = Engine:raft_discard(EngineState, UpTo),
+    NewDb = Db#db{engine = {Engine, NewSt}},
+    {ok, NewDb}.
+
+% Forward to the storage engine's raft_last/1 and return its result
+% unchanged.
+raft_last(#db{} = Db) ->
+    {Engine, EngineState} = Db#db.engine,
+    Engine:raft_last(EngineState).
+
commit_data(#db{} = Db) ->
#db{engine = {Engine, EngineState}} = Db,
{ok, NewSt} = Engine:commit_data(EngineState),
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 17a1e9160..2a449e869 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -118,6 +118,16 @@ handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) ->
end,
{ok, NewDb, Replies} = purge_docs(Db, PurgeReqs),
{reply, {ok, Replies}, NewDb, idle_limit()};
+handle_call({raft_insert, Entries}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:raft_insert(Db, Entries),
+ Db3 = commit_data(Db2),
+ ok = couch_server:db_updated(Db3),
+ {reply, ok, Db3, idle_limit()};
+handle_call({raft_discard, UpTo}, _From, Db) ->
+ {ok, Db2} = couch_db_engine:raft_discard(Db, UpTo),
+ Db3 = commit_data(Db2),
+ ok = couch_server:db_updated(Db3),
+ {reply, ok, Db3, idle_limit()};
handle_call(Msg, From, Db) ->
case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
{reply, Resp, NewDb} ->
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl
new file mode 100644
index 000000000..f398b4f2a
--- /dev/null
+++ b/src/couch/src/couch_raft.erl
@@ -0,0 +1,350 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft).
+-behaviour(gen_statem).
+
+-define(ELECTION_DELAY, 150).
+-define(ELECTION_SPLAY, 150).
+-define(LEADER_HEARTBEAT, 75).
+-define(CLIENT_TIMEOUT, 5_000).
+
+% maximum number of entries to send in one go.
+-define(BATCH_SIZE, 10).
+
+% public api
+
+-export([
+ start/2,
+ start_link/2,
+ stop/1,
+ call/2
+]).
+
+% mandatory gen_statem callbacks
+
+-export([
+ init/1,
+ callback_mode/0,
+ handle_event/4
+]).
+
+%% public api
+
+%% Start an unlinked raft server registered locally as Name. Cohort is the
+%% list of nodes taking part (the local node is filtered out by peers/1
+%% when peers are needed).
+start(Name, Cohort) ->
+    InitialData = new(Name, Cohort),
+    gen_statem:start({local, Name}, ?MODULE, InitialData, []).
+
+%% Same as start/2, but the server is linked to the caller.
+start_link(Name, Cohort) ->
+    InitialData = new(Name, Cohort),
+    gen_statem:start_link({local, Name}, ?MODULE, InitialData, []).
+
+%% Build the initial gen_statem data map for a server called Name in Cohort.
+%%
+%% Everything starts from scratch: term 0, no vote cast, an empty log,
+%% nothing committed or applied, and a state machine value of <<0>>.
+%% nextIndex/matchIndex hold one slot per peer (the cohort minus this node).
+%% froms maps a log index to the client waiting on that entry.
+new(Name, Cohort) ->
+    Peers = peers(Cohort),
+    #{
+        name => Name,
+        cohort => Cohort,
+        term => 0,
+        votedFor => undefined,
+        % A list of nodes, like everywhere else it is read or written
+        % (start_election/1 sets [node()], the vote handler uses
+        % lists:usort/1 and length/1); a map here would crash those.
+        votesGranted => [],
+        nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]),
+        log => couch_raft_log:new(),
+        commitIndex => 0,
+        froms => #{},
+        lastApplied => 0,
+        machine => <<0>>
+    }.
+
+%% Stop the raft server identified by ServerRef (thin wrapper over
+%% gen_statem:stop/1).
+stop(ServerRef) ->
+ gen_statem:stop(ServerRef).
+
+%% Submit Value as a client request and wait up to ?CLIENT_TIMEOUT ms for
+%% the reply. A server that is not the leader answers {error, not_leader};
+%% a leader replies once the entry has been applied, or with
+%% {error, deposed} if it steps down first.
+call(ServerRef, Value) ->
+    Request = #{type => 'ClientRequest', value => Value},
+    gen_statem:call(ServerRef, Request, ?CLIENT_TIMEOUT).
+
+%% gen_statem callback: every server starts in the follower state with the
+%% data map built by new/2.
+init(Data) ->
+ {ok, follower, Data}.
+
+%% gen_statem callback: all events go through handle_event/4, and state
+%% enter calls are enabled (the `enter` clauses of handle_event/4 rely on it).
+callback_mode() ->
+ [handle_event_function, state_enter].
+
+%% gen_statem event handler (handle_event_function mode with state enter
+%% calls, see callback_mode/0). States are follower | candidate | leader and
+%% Data is the map built by new/2. Peer messages arrive as casts, client
+%% requests as calls, and timers as state_timeout events. An event that
+%% matches no clause stops the process.
+%% erlfmt-ignore
+handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm ->
+    % Any message from a newer term: adopt the term, forget our vote, step
+    % down and re-deliver the message so the clauses below see it in that term.
+    couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]),
+    {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}};
+
+handle_event(enter, _OldState, follower, Data) ->
+    #{term := Term, froms := Froms} = Data,
+    couch_log:notice("~p became follower in term ~B", [node(), Term]),
+    % Clients still waiting on uncommitted entries are told we lost leadership.
+    Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)],
+    {keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]};
+
+handle_event(enter, _OldState, candidate, Data) ->
+    #{term := Term} = Data,
+    couch_log:notice("~p became candidate in term ~B", [node(), Term]),
+    % The election was already started by the new_election timeout clause
+    % that requested this transition. Starting another one here would bump
+    % the term twice and send a second round of vote requests.
+    {keep_state_and_data, restart_election_timeout()};
+
+handle_event(enter, _OldState, leader, Data) ->
+    #{log := Log, cohort := Cohort, term := Term} = Data,
+    couch_log:notice("~p became leader in term ~B", [node(), Term]),
+    Peers = peers(Cohort),
+    % Optimistically assume every peer is caught up; failed appends walk
+    % nextIndex back one step at a time.
+    {keep_state, Data#{
+        nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]),
+        matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers])
+    }, restart_heartbeat_timeout()};
+
+handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        lastLogIndex := MLastLogIndex,
+        lastLogTerm := MLastLogTerm
+    } = Msg,
+    #{
+        log := Log,
+        votedFor := VotedFor
+    } = Data,
+    % The candidate's log must be at least as up to date as ours.
+    LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))),
+    Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource),
+    couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]),
+    Reply = #{
+        type => 'RequestVoteResponse',
+        term => CurrentTerm,
+        voteGranted => Grant,
+        source => node()
+    },
+    cast(MSource, Reply, Data),
+    if
+        Grant ->
+            {keep_state, Data#{votedFor => MSource}, restart_election_timeout()};
+        true ->
+            {keep_state_and_data, restart_election_timeout()}
+    end;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'RequestVoteResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{source := MSource, voteGranted := MVoteGranted} = Msg,
+    #{cohort := Cohort, votesGranted := VotesGranted0} = Data,
+    VotesGranted1 = if MVoteGranted -> lists:usort([MSource | VotesGranted0]); true -> VotesGranted0 end,
+    couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [node(), MSource, Term, VotesGranted1]),
+    if
+        % strict majority of the cohort
+        length(VotesGranted1) >= length(Cohort) div 2 + 1 ->
+            couch_log:notice("~p has enough votes to be leader in term ~B", [node(), Term]),
+            {next_state, leader, Data#{votesGranted => VotesGranted1}};
+        true ->
+            {keep_state, Data#{votesGranted => VotesGranted1}}
+    end;
+
+
+handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data)
+  when Term =< CurrentTerm ->
+    #{
+        source := MSource,
+        prevLogIndex := MPrevLogIndex,
+        prevLogTerm := MPrevLogTerm,
+        entries := MEntries,
+        commitIndex := MCommitIndex
+    } = Msg,
+    #{
+        log := Log,
+        commitIndex := CommitIndex
+    } = Data,
+    % Our log must contain the entry the leader believes precedes this batch.
+    LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))),
+    if
+        Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) ->
+            Reply = #{
+                type => 'AppendEntriesResponse',
+                term => CurrentTerm,
+                success => false,
+                matchIndex => 0,
+                source => node()
+            },
+            cast(MSource, Reply, Data),
+            if
+                State == leader ->
+                    keep_state_and_data;
+                true ->
+                    {keep_state_and_data, restart_election_timeout()}
+            end;
+        Term == CurrentTerm andalso State == candidate ->
+            % Someone else won this term: step down and handle the message as a follower.
+            {next_state, follower, Data, {next_event, cast, Msg}};
+        Term == CurrentTerm andalso State == follower andalso LogOk ->
+            if
+                MEntries == [] ->
+                    Reply = #{
+                        type => 'AppendEntriesResponse',
+                        term => CurrentTerm,
+                        success => true,
+                        matchIndex => MPrevLogIndex,
+                        source => node()
+                    },
+                    couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]),
+                    cast(MSource, Reply, Data),
+                    % commitIndex never moves backwards: a new leader may know a
+                    % lower commit index than one we have already applied.
+                    {keep_state, update_state_machine(Data#{commitIndex => max(CommitIndex, MCommitIndex)}), restart_election_timeout()};
+                true ->
+                    Index = MPrevLogIndex + 1,
+                    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+                    if
+                        LastLogIndex >= Index ->
+                            NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)),
+                            FirstEntryTerm = couch_raft_log:term(hd(MEntries)),
+                            if
+                                NthLogTerm == FirstEntryTerm andalso tl(MEntries) /= [] ->
+                                    % Only the first entry of the batch has been verified.
+                                    % Re-deliver the request with that entry folded into
+                                    % prevLogIndex/prevLogTerm so each remaining entry is
+                                    % checked (or appended) before we acknowledge it.
+                                    Rest = Msg#{prevLogIndex := Index, prevLogTerm := NthLogTerm, entries := tl(MEntries)},
+                                    {keep_state_and_data, [{next_event, cast, Rest}, restart_election_timeout()]};
+                                NthLogTerm == FirstEntryTerm ->
+                                    Reply = #{
+                                        type => 'AppendEntriesResponse',
+                                        term => CurrentTerm,
+                                        success => true,
+                                        matchIndex => MPrevLogIndex + length(MEntries),
+                                        source => node()
+                                    },
+                                    couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MPrevLogIndex + length(MEntries)]),
+                                    cast(MSource, Reply, Data),
+                                    {keep_state, update_state_machine(Data#{commitIndex => max(CommitIndex, MCommitIndex)}), restart_election_timeout()};
+                                NthLogTerm /= FirstEntryTerm ->
+                                    couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]),
+                                    % Drop our last entry and retry; repeats until the conflict is gone.
+                                    {keep_state, Data#{log => couch_raft_log:sublist(Log, 1, LastLogIndex - 1)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                            end;
+                        LastLogIndex == MPrevLogIndex ->
+                            couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]),
+                            % Re-deliver the request so the branches above send the acknowledgement.
+                            {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]}
+                    end
+            end
+    end;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm ->
+    couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]),
+    keep_state_and_data;
+
+handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) ->
+    #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg,
+    #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data,
+    couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]),
+    SourceNextIndex = maps:get(MSource, NextIndex),
+    if
+        MSuccess ->
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => MMatchIndex + 1},
+                matchIndex => MatchIndex#{MSource => MMatchIndex}
+            }};
+        true ->
+            % Rejected: try one entry earlier next time, never below index 1.
+            {keep_state, Data#{
+                nextIndex => NextIndex#{MSource => max(SourceNextIndex - 1, 1)}
+            }}
+    end;
+
+handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) ->
+    #{value := Value} = Msg,
+    #{term := Term, log := Log, froms := Froms} = Data,
+    EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1,
+    Entry = {EntryIndex, Term, Value},
+    % No reply yet: the caller is answered by update_state_machine/1 once the entry is applied.
+    {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}};
+
+handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) ->
+    {keep_state_and_data, {reply, From, {error, not_leader}}};
+
+handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate ->
+    #{term := Term} = Data,
+    couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]),
+    {next_state, candidate, start_election(Data), restart_election_timeout()};
+
+handle_event(state_timeout, heartbeat, leader, Data) ->
+    #{term := Term} = Data,
+    couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]),
+    ok = send_append_entries(Data),
+    {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()};
+
+handle_event(EventType, EventContent, State, Data) ->
+    {stop, {unknown_event, EventType, EventContent, State, Data}}.
+
+
+%% Send an AppendEntriesRequest to every peer of this node's cohort.
+%% Returns ok (see send_append_entries/2).
+send_append_entries(#{cohort := Cohort} = Data) ->
+ send_append_entries(peers(Cohort), Data).
+
+%% Cast one AppendEntriesRequest to each peer in Peers and return ok.
+%%
+%% Each request starts at that peer's nextIndex and carries at most
+%% ?BATCH_SIZE entries; an empty entries list is what followers treat as a
+%% heartbeat. The commit index sent is capped at min(last log index,
+%% PrevLogIndex + 2).
+send_append_entries(Peers, Data) ->
+    SendTo = fun(Peer) ->
+        #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data,
+        PrevLogIndex = maps:get(Peer, NextIndex) - 1,
+        PrevLogTerm =
+            case PrevLogIndex > 0 of
+                true -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log));
+                false -> 0
+            end,
+        LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+        LastEntry = min(LastLogIndex, PrevLogIndex + 2),
+        Request = #{
+            type => 'AppendEntriesRequest',
+            term => Term,
+            source => node(),
+            prevLogIndex => PrevLogIndex,
+            prevLogTerm => PrevLogTerm,
+            entries => couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE),
+            commitIndex => min(CommitIndex, LastEntry)
+        },
+        cast(Peer, Request, Data)
+    end,
+    lists:foreach(SendTo, Peers).
+
+%% Leader only: move commitIndex forward to the highest index stored on a
+%% majority of the cohort, then apply the newly committed entries.
+%%
+%% Follows the leader rule of the raft paper: pick N such that
+%% N > commitIndex, a majority of servers hold index N, and log[N] was
+%% written in the current term. Concretely:
+%%   * the indexes (our own last index plus every peer's matchIndex) are
+%%     sorted in descending order, so the Quorum-th element is held by at
+%%     least Quorum servers for odd and even cohort sizes alike;
+%%   * commitIndex never moves backwards (matchIndex is reset to 0 when a
+%%     leader is elected, so the candidate index can be lower);
+%%   * the term checked is that of the entry being committed, not that of
+%%     the last entry in the log.
+%% Returns Data unchanged when nothing new can be committed.
+advance_commit_index(Data) ->
+    #{
+        matchIndex := MatchIndex,
+        log := Log,
+        cohort := Cohort,
+        term := Term,
+        commitIndex := CommitIndex
+    } = Data,
+    Quorum = length(Cohort) div 2 + 1,
+    OwnLastIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    Descending = lists:reverse(lists:sort([OwnLastIndex | maps:values(MatchIndex)])),
+    Candidate = lists:nth(Quorum, Descending),
+    % Candidate > CommitIndex >= 0 guarantees nth/2 is given a real index.
+    CanCommit =
+        Candidate > CommitIndex andalso
+            couch_raft_log:term(couch_raft_log:nth(Candidate, Log)) == Term,
+    case CanCommit of
+        true ->
+            update_state_machine(Data#{commitIndex => Candidate});
+        false ->
+            Data
+    end.
+
+%% Apply every committed-but-unapplied log entry to the state machine.
+%%
+%% The machine is a running hash: applying Value replaces it with
+%% sha256(<<Machine/binary, Value/binary>>). If a client is waiting on an
+%% applied index (an entry in froms), it is sent the new machine value and
+%% removed from froms. Application stops at the end of the local log even
+%% if commitIndex is further ahead.
+%%
+%% A commitIndex below lastApplied does not crash: applied entries are
+%% committed by definition, so commitIndex is raised back to lastApplied.
+update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) ->
+    Data;
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when
+    LastApplied > CommitIndex
+->
+    Data#{commitIndex => LastApplied};
+update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when
+    LastApplied < CommitIndex
+->
+    #{log := Log, froms := Froms0, machine := Machine0} = Data,
+    From = LastApplied + 1,
+    To = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex),
+    Fun = fun(Index, {Froms, Machine}) ->
+        Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)),
+        Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>),
+        case maps:is_key(Index, Froms) of
+            true ->
+                gen_statem:reply(maps:get(Index, Froms), Result),
+                {maps:remove(Index, Froms), Result};
+            false ->
+                {Froms, Result}
+        end
+    end,
+    {Froms1, Machine1} = lists:foldl(Fun, {Froms0, Machine0}, lists:seq(From, To)),
+    Data#{froms => Froms1, machine => Machine1, lastApplied => To}.
+
+%% Begin an election for the next term: cast a RequestVoteRequest describing
+%% our last log entry to every peer, and return Data moved to the new term
+%% with a vote for ourselves already recorded.
+start_election(Data) ->
+    #{term := Term, cohort := Cohort, log := Log} = Data,
+    ElectionTerm = Term + 1,
+    couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]),
+    LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)),
+    LastLogTerm = couch_raft_log:term(couch_raft_log:last(Log)),
+    RequestVote = #{
+        type => 'RequestVoteRequest',
+        term => ElectionTerm,
+        lastLogIndex => LastLogIndex,
+        lastLogTerm => LastLogTerm,
+        source => node()
+    },
+    Solicit = fun(Peer) -> cast(Peer, RequestVote, Data) end,
+    lists:foreach(Solicit, peers(Cohort)),
+    Self = node(),
+    Data#{term => ElectionTerm, votedFor => Self, votesGranted => [Self]}.
+
+%% Send Msg asynchronously to the raft server on Node. Every member is
+%% addressed by the same registered name, taken from our own data map.
+cast(Node, Msg, #{name := Name}) ->
+ gen_statem:cast({Name, Node}, Msg).
+
+%% gen_statem action that (re)arms the election timer: ?ELECTION_DELAY ms
+%% plus a random 1..?ELECTION_SPLAY ms, delivered as the new_election
+%% state_timeout event.
+restart_election_timeout() ->
+    Splay = rand:uniform(?ELECTION_SPLAY),
+    {state_timeout, ?ELECTION_DELAY + Splay, new_election}.
+
+%% gen_statem action that (re)arms the leader's heartbeat timer:
+%% ?LEADER_HEARTBEAT ms, delivered as the heartbeat state_timeout event.
+restart_heartbeat_timeout() ->
+ {state_timeout, ?LEADER_HEARTBEAT, heartbeat}.
+
+%% The cohort without the local node (removes one occurrence of node()).
+peers(Cohort) ->
+ Cohort -- [node()].
diff --git a/src/couch/src/couch_raft_log.erl b/src/couch/src/couch_raft_log.erl
new file mode 100644
index 000000000..987212457
--- /dev/null
+++ b/src/couch/src/couch_raft_log.erl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_log).
+
+%% In-memory raft log used by couch_raft: a plain list of entries, where an
+%% entry is a tuple read through index/1, term/1 and value/1 (elements 1, 2
+%% and 3). Entries are looked up by list position via nth/2.
+
+-export([
+    new/0,
+    append/2,
+    sublist/3,
+    nth/2,
+    last/1,
+    index/1,
+    term/1,
+    value/1
+]).
+
+%% An empty log.
+new() ->
+    [].
+
+%% Log with Items added at the end.
+append(Log, Items) ->
+    Log ++ Items.
+
+%% Up to Len entries starting at position Start (1-based).
+sublist(Log, Start, Len) ->
+    lists:sublist(Log, Start, Len).
+
+%% The entry at position N (1-based); raises if the log is shorter than N.
+nth(N, Log) ->
+    lists:nth(N, Log).
+
+%% The final entry of the log. An empty log yields the placeholder
+%% {0, 0, undefined}, so index/1 and term/1 both report 0 for it.
+last([]) ->
+    {0, 0, undefined};
+last(Log) ->
+    lists:last(Log).
+
+%% Entry accessors.
+index(Entry) ->
+    element(1, Entry).
+
+term(Entry) ->
+    element(2, Entry).
+
+value(Entry) ->
+    element(3, Entry).
diff --git a/src/couch/test/eunit/couch_raft_SUITE.erl b/src/couch/test/eunit/couch_raft_SUITE.erl
new file mode 100644
index 000000000..1c3f8ebc2
--- /dev/null
+++ b/src/couch/test/eunit/couch_raft_SUITE.erl
@@ -0,0 +1,67 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+
+-module(couch_raft_SUITE).
+
+-behaviour(ct_suite).
+
+-export([all/0]).
+-export([three_nodes/1]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% Common Test callback: the test cases that make up this suite.
+all() ->
+ [three_nodes].
+
+%% Start three peer nodes, run one couch_raft server (registered as foo) on
+%% each, and check that:
+%%   * exactly one leader is elected;
+%%   * three client calls return the expected running sha256 values;
+%%   * after the leader is stopped a different leader is elected and the
+%%     running hash continues from where it was.
+%%
+%% The raft module under test is couch_raft (the module added alongside
+%% this suite); the craft/craft3 names previously used here are not part of
+%% this code base.
+%% NOTE(review): couch_raft logs through couch_log, so the peer nodes
+%% presumably also need couch_log on their code path - confirm.
+three_nodes(Config) when is_list(Config) ->
+    N = 3,
+    Args = ["-pa", filename:dirname(code:which(couch_raft))],
+    Peers = [?CT_PEER(#{wait_boot => {self(), tag}, args => Args}) || _ <- lists:seq(1, N)],
+    Cohort = [receive {tag, {started, Node, Peer}} -> Node end || {ok, Peer} <- Peers],
+
+    Crafts = [erpc:call(Node, couch_raft, start, [foo, Cohort]) || Node <- Cohort],
+
+    % wait for leader election
+    timer:sleep(500),
+
+    % verify only one leader elected
+    [{leader, FirstLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts]),
+
+    % make a series of calls
+    Hash1 = crypto:hash(sha256, <<0, 1>>),
+    ?assertEqual(Hash1, couch_raft:call(FirstLeader, <<1>>)),
+
+    Hash2 = crypto:hash(sha256, <<Hash1/binary, 2>>),
+    ?assertEqual(Hash2, couch_raft:call(FirstLeader, <<2>>)),
+
+    Hash3 = crypto:hash(sha256, <<Hash2/binary, 3>>),
+    ?assertEqual(Hash3, couch_raft:call(FirstLeader, <<3>>)),
+
+    % force a re-election
+    couch_raft:stop(FirstLeader),
+    timer:sleep(500),
+
+    % verify new leader elected
+    [{leader, SecondLeader}] = lists:filter(fun({State, _Pid}) -> State == leader end,
+        [{element(1, sys:get_state(Pid)), Pid} || {ok, Pid} <- Crafts, Pid /= FirstLeader]),
+    ?assertNotEqual(FirstLeader, SecondLeader),
+
+    % make another call
+    Hash4 = crypto:hash(sha256, <<Hash3/binary, 4>>),
+    ?assertEqual(Hash4, couch_raft:call(SecondLeader, <<4>>)),
+
+    [couch_raft:stop(Pid) || {ok, Pid} <- Crafts, Pid /= FirstLeader],
+    [peer:stop(Peer) || {ok, Peer} <- Peers].