diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2021-10-27 12:40:07 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-27 12:40:07 -0400 |
commit | 294e777e7b85e60cbcfc319984655ecb715c4534 (patch) | |
tree | 397eba8885cacebd91ef91fae47a6337b5c66fab | |
parent | 8513c5bae1de0089e5909a51b7e15abf9af2196c (diff) | |
download | couchdb-294e777e7b85e60cbcfc319984655ecb715c4534.tar.gz |
Minimize rewinds when a node is down (#3792)
Our existing logic for handling rewinds in the changes feed addresses
the following cases:
- A node that contributed to a sequence is in maintenance mode
- A shard that contributed to a sequence has been split
This patch adds support for the case where the node that contributed to a
client-supplied sequence is down at the start of request handling. It
reuses the same logic as the maintenance-mode case, since these two
situations really ought to be handled the same way.
A future improvement would be to unify the "node down" and "shard split"
logic so that we could handle the compound case, e.g. replacing a shard
from a down node with a pair of shards from nodes that cover the same
range.
Fixes #3788
Co-authored-by: Nick Vatamaniuc <vatamane@gmail.com>
-rw-r--r-- | src/fabric/src/fabric_view_changes.erl | 76 |
1 file changed, 59 insertions, 17 deletions
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index eea6a72bb..9fdbf06df 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -146,29 +146,22 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}), {S#shard{ref = Ref}, Seq} end, WSplitSeqs0), - % For ranges that were not split start sequences from 0 - WReps = lists:map(fun(#shard{name = Name, node = N} = S) -> - Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}), + % For ranges that were not split, look for a replacement on a different node + WReps = lists:map(fun(#shard{name = Name, node = NewNode, range = R} = S) -> + Arg = find_replacement_sequence(Dead, R), + case Arg =/= 0 of true -> ok; false -> + couch_log:warning("~p reset seq for ~p", [?MODULE, S]) + end, + Ref = rexi:cast(NewNode, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}), {S#shard{ref = Ref}, 0} end, Reps1), Seqs = WSeqs ++ WSplitSeqs ++ WReps, {Workers0, _} = lists:unzip(Seqs), Repls = fabric_ring:get_shard_replacements(DbName, Workers0), StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) -> - %% Find the original shard copy in the Seqs array - case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of - [{#shard{}, {replace, _, _, _}} | _] -> - % Don't attempt to replace a replacement - SeqArg = 0; - [{#shard{node = OldNode}, OldSeq} | _] -> - SeqArg = make_replacement_arg(OldNode, OldSeq); - _ -> - % TODO this clause is probably unreachable in the N>2 - % case because we compute replacements only if a shard has one - % in the original set. 
- couch_log:error("Streaming ~s from zero while replacing ~p", - [Name, PackedSeqs]), - SeqArg = 0 + SeqArg = find_replacement_sequence(Seqs, R0), + case SeqArg =/= 0 of true -> ok; false -> + couch_log:warning("~p StartFun reset seq for ~p", [?MODULE, Shard]) end, Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, SeqArg]}), Shard#shard{ref = Ref} @@ -670,6 +663,22 @@ find_split_shard_replacements(DeadWorkers, Shards) -> {fabric_dict:from_list(Workers), Available}. +find_replacement_sequence(OriginalSeqs, R0) -> + %% Find the original shard copy in the Seqs array + case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, OriginalSeqs) of + [{#shard{}, {replace, _, _, _}} | _] -> + % Don't attempt to replace a replacement + 0; + [{#shard{node = OldNode}, OldSeq} | _] -> + make_replacement_arg(OldNode, OldSeq); + _ -> + % TODO we don't currently attempt to replace a shard with split + % replicas of that range on other nodes, so it's possible to end + % up with an empty list here. + 0 + end. + + make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 -> {Num, {split, Uuid}, Node}; make_split_seq(Seq, _) -> @@ -892,3 +901,36 @@ find_split_shard_replacements_test() -> {Workers3, ShardsLeft3} = find_split_shard_replacements(Dead3, Shards3), ?assertEqual([], Workers3), ?assertEqual(Shards3, ShardsLeft3). 
+ + +find_replacement_sequence_test() -> + Shards = [{"n2", 0, 10}, {"n3", 0, 5}], + Uuid = <<"abc1234">>, + Epoch = 'n1', + + % Not safe to use a plain integer sequence number + Dead1 = mk_workers(Shards, 42), + ?assertEqual(0, find_replacement_sequence(Dead1, [0, 10])), + ?assertEqual(0, find_replacement_sequence(Dead1, [0, 5])), + + % {Seq, Uuid} should work + Dead2 = mk_workers(Shards, {43, Uuid}), + ?assertEqual({replace, 'n2', Uuid, 43}, + find_replacement_sequence(Dead2, [0, 10])), + ?assertEqual({replace, 'n3', Uuid, 43}, + find_replacement_sequence(Dead2, [0, 5])), + + % Can't find the range at all + ?assertEqual(0, find_replacement_sequence(Dead2, [0, 4])), + + % {Seq, Uuids, EpochNode} should work + Dead3 = mk_workers(Shards, {44, Uuid, Epoch}), + ?assertEqual({replace, 'n1', Uuid, 44}, + find_replacement_sequence(Dead3, [0, 10])), + ?assertEqual({replace, 'n1', Uuid, 44}, + find_replacement_sequence(Dead3, [0, 5])), + + % Cannot replace a replacement + Dead4 = mk_workers(Shards, {replace, 'n1', Uuid, 45}), + ?assertEqual(0, find_replacement_sequence(Dead4, [0, 10])), + ?assertEqual(0, find_replacement_sequence(Dead4, [0, 5])). |