diff options
Diffstat (limited to 'src/couch/src/couch_raft.erl')
-rw-r--r-- | src/couch/src/couch_raft.erl | 350 |
1 files changed, 350 insertions, 0 deletions
diff --git a/src/couch/src/couch_raft.erl b/src/couch/src/couch_raft.erl new file mode 100644 index 000000000..f398b4f2a --- /dev/null +++ b/src/couch/src/couch_raft.erl @@ -0,0 +1,350 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(couch_raft). +-behaviour(gen_statem). + +-define(ELECTION_DELAY, 150). +-define(ELECTION_SPLAY, 150). +-define(LEADER_HEARTBEAT, 75). +-define(CLIENT_TIMEOUT, 5_000). + +% maximum number of entries to send in one go. +-define(BATCH_SIZE, 10). + +% public api + +-export([ + start/2, + start_link/2, + stop/1, + call/2 +]). + +% mandatory gen_statem callbacks + +-export([ + init/1, + callback_mode/0, + handle_event/4 +]). + +%% public api + +start(Name, Cohort) -> + gen_statem:start({local, Name}, ?MODULE, new(Name, Cohort), []). + +start_link(Name, Cohort) -> + gen_statem:start_link({local, Name}, ?MODULE, new(Name, Cohort), []). + +new(Name, Cohort) -> + Peers = peers(Cohort), + #{ + name => Name, + cohort => Cohort, + term => 0, + votedFor => undefined, + votesGranted => #{}, + nextIndex => maps:from_list([{Peer, 1} || Peer <- Peers]), + matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]), + log => couch_raft_log:new(), + commitIndex => 0, + froms => #{}, + lastApplied => 0, + machine => <<0>> + }. + +stop(ServerRef) -> + gen_statem:stop(ServerRef). + +call(ServerRef, Value) -> + gen_statem:call(ServerRef, #{type => 'ClientRequest', value => Value}, ?CLIENT_TIMEOUT). + +init(Data) -> + {ok, follower, Data}. + +callback_mode() -> + [handle_event_function, state_enter]. + +%% erlfmt-ignore +handle_event(cast, #{term := FutureTerm} = Msg, _State, #{term := CurrentTerm} = Data) when FutureTerm > CurrentTerm -> + couch_log:notice("~p received message from future term ~B, moving to that term, becoming follower and clearing votedFor", [node(), FutureTerm]), + {next_state, follower, Data#{term => FutureTerm, votedFor => undefined}, {next_event, cast, Msg}}; + +handle_event(enter, _OldState, follower, Data) -> + #{term := Term, froms := Froms} = Data, + couch_log:notice("~p became follower in term ~B", [node(), Term]), + Replies = [{reply, From, {error, deposed}} || From <- maps:values(Froms)], + {keep_state, Data#{votedFor => undefined, froms => #{}}, [restart_election_timeout() | Replies]}; + +handle_event(enter, _OldState, candidate, Data) -> + #{term := Term} = Data, + couch_log:notice("~p became candidate in term ~B", [node(), Term]), + {keep_state, start_election(Data), restart_election_timeout()}; + +handle_event(enter, _OldState, leader, Data) -> + #{log := Log, cohort := Cohort, term := Term} = Data, + couch_log:notice("~p became leader in term ~B", [node(), Term]), + Peers = peers(Cohort), + {keep_state, Data#{ + nextIndex => maps:from_list([{Peer, couch_raft_log:index(couch_raft_log:last(Log)) + 1} || Peer <- Peers]), + matchIndex => maps:from_list([{Peer, 0} || Peer <- Peers]) + }, restart_heartbeat_timeout()}; + +handle_event(cast, #{type := 'RequestVoteRequest', term := Term} = Msg, _State, #{term := CurrentTerm} = Data) + when Term =< CurrentTerm -> + #{ + source := MSource, + lastLogIndex := MLastLogIndex, + lastLogTerm := MLastLogTerm + } = Msg, + #{ + log := Log, + votedFor := VotedFor + } = Data, + LogOk = MLastLogTerm > couch_raft_log:term(couch_raft_log:last(Log)) orelse (MLastLogTerm == couch_raft_log:term(couch_raft_log:last(Log)) andalso MLastLogIndex >= couch_raft_log:index(couch_raft_log:last(Log))), + Grant = Term == CurrentTerm andalso LogOk andalso (VotedFor == undefined orelse VotedFor == MSource), + couch_log:notice("~p received RequestVoteRequest from ~p in term ~B when in term ~B (Grant:~p, LogOk:~p, VotedFor:~p)", [node(), MSource, Term, CurrentTerm, Grant, LogOk, VotedFor]), + Reply = #{ + type => 'RequestVoteResponse', + term => CurrentTerm, + voteGranted => Grant, + source => node() + }, + cast(MSource, Reply, Data), + if + Grant -> + {keep_state, Data#{votedFor => MSource}, restart_election_timeout()}; + true -> + {keep_state_and_data, restart_election_timeout()} + end; + +handle_event(cast, #{type := 'RequestVoteResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm -> + couch_log:notice("~p ignored RequestVoteResponse from past term ~B", [node(), PastTerm]), + keep_state_and_data; + +handle_event(cast, #{type := 'RequestVoteResponse', term := Term} = Msg, _State, #{term := Term} = Data) -> + #{source := MSource, voteGranted := MVoteGranted} = Msg, + #{cohort := Cohort, votesGranted := VotesGranted0} = Data, + VotesGranted1 = if MVoteGranted -> lists:usort([MSource | VotesGranted0]); true -> VotesGranted0 end, + couch_log:notice("~p received RequestVoteResponse from ~p in current term ~B (VotesGranted:~p)", [node(), MSource, Term, VotesGranted1]), + if + length(VotesGranted1) >= length(Cohort) div 2 + 1 -> + couch_log:notice("~p has enough votes to be leader in term ~B", [node(), Term]), + {next_state, leader, Data#{votesGranted => VotesGranted1}}; + true -> + {keep_state, Data#{votesGranted => VotesGranted1}} + end; + + +handle_event(cast, #{type := 'AppendEntriesRequest', term := Term} = Msg, State, #{term := CurrentTerm} = Data) + when Term =< CurrentTerm -> + #{ + source := MSource, + prevLogIndex := MPrevLogIndex, + prevLogTerm := MPrevLogTerm, + entries := MEntries, + commitIndex := MCommitIndex + } = Msg, + #{ + log := Log + } = Data, + LogOk = MPrevLogIndex == 0 orelse (MPrevLogIndex > 0 andalso MPrevLogIndex =< couch_raft_log:index(couch_raft_log:last(Log)) andalso MPrevLogTerm == couch_raft_log:term(couch_raft_log:nth(MPrevLogIndex,Log))), + if + Term < CurrentTerm orelse (Term == CurrentTerm andalso State == follower andalso not LogOk) -> + Reply = #{ + type => 'AppendEntriesResponse', + term => CurrentTerm, + success => false, + matchIndex => 0, + source => node() + }, + cast(MSource, Reply, Data), + if + State == leader -> + keep_state_and_data; + true -> + {keep_state_and_data, restart_election_timeout()} + end; + Term == CurrentTerm andalso State == candidate -> + {next_state, follower, Data, {next_event, cast, Msg}}; + Term == CurrentTerm andalso State == follower andalso LogOk -> + if + MEntries == [] -> + Reply = #{ + type => 'AppendEntriesResponse', + term => CurrentTerm, + success => true, + matchIndex => MPrevLogIndex, + source => node() + }, + couch_log:debug("~p received heartbeat and everything matches, sending matchIndex:~p", [node(), MPrevLogIndex]), + cast(MSource, Reply, Data), + {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()}; + true -> + Index = MPrevLogIndex + 1, + LastLogIndex = couch_raft_log:index(couch_raft_log:last(Log)), + if + LastLogIndex >= Index -> + NthLogTerm = couch_raft_log:term(couch_raft_log:nth(Index, Log)), + FirstEntryTerm = couch_raft_log:term(hd(MEntries)), + if + NthLogTerm == FirstEntryTerm -> + Reply = #{ + type => 'AppendEntriesResponse', + term => CurrentTerm, + success => true, + matchIndex => MPrevLogIndex + length(MEntries), + source => node() + }, + couch_log:notice("~p received entry:~p that's already applied, sending matchIndex:~p", [node(), MEntries, MPrevLogIndex + length(MEntries)]), + cast(MSource, Reply, Data), + {keep_state, update_state_machine(Data#{commitIndex => MCommitIndex}), restart_election_timeout()}; + NthLogTerm /= FirstEntryTerm -> + couch_log:notice("~p received conflicting entry:~p, deleting it", [node(), MEntries]), + {keep_state, Data#{log => lists:sublist(Log, LastLogIndex - 1)}, [{next_event, cast, Msg}, restart_election_timeout()]} + end; + LastLogIndex == MPrevLogIndex -> + couch_log:notice("~p received new entries:~p, appending it to log", [node(), MEntries]), + {keep_state, Data#{log => couch_raft_log:append(Log, MEntries)}, [{next_event, cast, Msg}, restart_election_timeout()]} + end + end + end; + +handle_event(cast, #{type := 'AppendEntriesResponse', term := PastTerm}, _State, #{term := CurrentTerm}) when PastTerm < CurrentTerm -> + couch_log:notice("~p ignored AppendEntriesResponse from past term ~B", [node(), PastTerm]), + keep_state_and_data; + +handle_event(cast, #{type := 'AppendEntriesResponse', term := Term} = Msg, _State, #{term := Term} = Data) -> + #{success := MSuccess, matchIndex := MMatchIndex, source := MSource} = Msg, + #{nextIndex := NextIndex, matchIndex := MatchIndex} = Data, + couch_log:debug("~p received AppendEntriesResponse from ~p in current term ~B (Success:~p)", [node(), MSource, Term, MSuccess]), + SourceNextIndex = maps:get(MSource, NextIndex), + if + MSuccess -> + {keep_state, Data#{ + nextIndex => NextIndex#{MSource => MMatchIndex + 1}, + matchIndex => MatchIndex#{MSource => MMatchIndex} + }}; + true -> + {keep_state, Data#{ + nextIndex => NextIndex#{MSource => max(SourceNextIndex - 1, 1)} + }} + end; + +handle_event({call, From}, #{type := 'ClientRequest'} = Msg, leader, Data) -> + #{value := Value} = Msg, + #{term := Term, log := Log, froms := Froms} = Data, + EntryIndex = couch_raft_log:index(couch_raft_log:last(Log)) + 1, + Entry = {EntryIndex, Term, Value}, + {keep_state, Data#{log => couch_raft_log:append(Log, [Entry]), froms => Froms#{EntryIndex => From}}}; + +handle_event({call, From}, #{type := 'ClientRequest'}, _State, _Data) -> + {keep_state_and_data, {reply, From, {error, not_leader}}}; + +handle_event(state_timeout, new_election, State, Data) when State == follower; State == candidate -> + #{term := Term} = Data, + couch_log:notice("~p election timeout in state ~p, term ~B", [node(), State, Term]), + {next_state, candidate, start_election(Data), restart_election_timeout()}; + +handle_event(state_timeout, heartbeat, leader, Data) -> + #{term := Term} = Data, + couch_log:debug("~p leader sending a heartbeat in term ~B", [node(), Term]), + ok = send_append_entries(Data), + {keep_state, advance_commit_index(Data), restart_heartbeat_timeout()}; + +handle_event(EventType, EventContent, State, Data) -> + {stop, {unknown_event, EventType, EventContent, State, Data}}. + + +send_append_entries(#{cohort := Cohort} = Data) -> + send_append_entries(peers(Cohort), Data). + +send_append_entries([], _Data) -> + ok; +send_append_entries([Peer | Rest], Data) -> + #{term := Term, nextIndex := NextIndex, log := Log, commitIndex := CommitIndex} = Data, + PrevLogIndex = maps:get(Peer, NextIndex) - 1, + PrevLogTerm = if PrevLogIndex > 0 -> couch_raft_log:term(couch_raft_log:nth(PrevLogIndex, Log)); true -> 0 end, + LastEntry = min(couch_raft_log:index(couch_raft_log:last(Log)), PrevLogIndex + 2), + Entries = couch_raft_log:sublist(Log, PrevLogIndex + 1, ?BATCH_SIZE), + Msg = #{ + type => 'AppendEntriesRequest', + term => Term, + source => node(), + prevLogIndex => PrevLogIndex, + prevLogTerm => PrevLogTerm, + entries => Entries, + commitIndex => min(CommitIndex, LastEntry) + }, + cast(Peer, Msg, Data), + send_append_entries(Rest, Data). + +advance_commit_index(Data) -> + #{matchIndex := MatchIndex, log := Log, cohort := Cohort, term := Term} = Data, + LastTerm = couch_raft_log:term(couch_raft_log:last(Log)), + LastIndexes = lists:sort([couch_raft_log:index(couch_raft_log:last(Log)) | maps:values(MatchIndex)]), + NewCommitIndex = lists:nth(length(Cohort) div 2 + 1, LastIndexes), + if + LastTerm == Term -> + update_state_machine(Data#{commitIndex => NewCommitIndex}); + true -> + Data + end. + +update_state_machine(#{lastApplied := Same, commitIndex := Same} = Data) -> + Data; +update_state_machine(#{lastApplied := LastApplied, commitIndex := CommitIndex} = Data) when LastApplied < CommitIndex -> + #{log := Log, froms := Froms0, machine := Machine0} = Data, + From = LastApplied + 1, + To = min(couch_raft_log:index(couch_raft_log:last(Log)), CommitIndex), + Fun = fun(Index, {Froms, Machine}) -> + Value = couch_raft_log:value(couch_raft_log:nth(Index, Log)), + Result = crypto:hash(sha256, <<Machine/binary, Value/binary>>), + case maps:is_key(Index, Froms) of + true -> + gen_statem:reply(maps:get(Index, Froms), Result), + {maps:remove(Index, Froms), Result}; + false -> + {Froms, Result} + end + end, + {Froms1, Machine1} = lists:foldl(Fun, {Froms0, Machine0}, lists:seq(From, To)), + Data#{froms => Froms1, machine => Machine1, lastApplied => To}. + +start_election(Data) -> + #{term := Term, cohort := Cohort, log := Log} = Data, + ElectionTerm = Term + 1, + couch_log:notice("~p starting election in term ~B", [node(), ElectionTerm]), + RequestVote = #{ + type => 'RequestVoteRequest', + term => ElectionTerm, + lastLogIndex => couch_raft_log:index(couch_raft_log:last(Log)), + lastLogTerm => couch_raft_log:term(couch_raft_log:last(Log)), + source => node() + }, + lists:foreach(fun(Peer) -> cast(Peer, RequestVote, Data) end, peers(Cohort)), + Data#{term => ElectionTerm, votedFor => node(), votesGranted => [node()]}. + +cast(Node, Msg, #{name := Name}) -> + gen_statem:cast({Name, Node}, Msg). + +restart_election_timeout() -> + {state_timeout, ?ELECTION_DELAY + rand:uniform(?ELECTION_SPLAY), new_election}. + +restart_heartbeat_timeout() -> + {state_timeout, ?LEADER_HEARTBEAT, heartbeat}. + +peers(Cohort) -> + Cohort -- [node()]. |