diff options
authorAdam Kocoloski <>2021-10-27 12:40:07 -0400
committerGitHub <>2021-10-27 12:40:07 -0400
commit294e777e7b85e60cbcfc319984655ecb715c4534 (patch)
parent8513c5bae1de0089e5909a51b7e15abf9af2196c (diff)
Minimize rewinds when a node is down (#3792)
Our existing logic for handling rewinds in the changes feed addresses the following cases: - A node that contributed to a sequence is in maintenance mode - A shard that contributed to a sequence has been split This patch adds support for cases where the node that contributed to a client-supplied sequence is down at the beginning of the request handling. It reuses the same logic as the maintenance mode case as these two situations really ought to be handled the same way. A future improvement would be to unify the "node down" and "shard split" logic so that we could handle the compound case, e.g. replacing a shard from a down node with a pair of shards from nodes that cover the same range. Fixes #3788 Co-authored-by: Nick Vatamaniuc <>
1 files changed, 59 insertions, 17 deletions
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index eea6a72bb..9fdbf06df 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -146,29 +146,22 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}),
{S#shard{ref = Ref}, Seq}
end, WSplitSeqs0),
- % For ranges that were not split start sequences from 0
- WReps = lists:map(fun(#shard{name = Name, node = N} = S) ->
- Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}),
+ % For ranges that were not split, look for a replacement on a different node
+ WReps = lists:map(fun(#shard{name = Name, node = NewNode, range = R} = S) ->
+ Arg = find_replacement_sequence(Dead, R),
+ case Arg =/= 0 of true -> ok; false ->
+ couch_log:warning("~p reset seq for ~p", [?MODULE, S])
+ end,
+ Ref = rexi:cast(NewNode, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}),
{S#shard{ref = Ref}, 0}
end, Reps1),
Seqs = WSeqs ++ WSplitSeqs ++ WReps,
{Workers0, _} = lists:unzip(Seqs),
Repls = fabric_ring:get_shard_replacements(DbName, Workers0),
StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) ->
- %% Find the original shard copy in the Seqs array
- case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of
- [{#shard{}, {replace, _, _, _}} | _] ->
- % Don't attempt to replace a replacement
- SeqArg = 0;
- [{#shard{node = OldNode}, OldSeq} | _] ->
- SeqArg = make_replacement_arg(OldNode, OldSeq);
- _ ->
- % TODO this clause is probably unreachable in the N>2
- % case because we compute replacements only if a shard has one
- % in the original set.
- couch_log:error("Streaming ~s from zero while replacing ~p",
- [Name, PackedSeqs]),
- SeqArg = 0
+ SeqArg = find_replacement_sequence(Seqs, R0),
+ case SeqArg =/= 0 of true -> ok; false ->
+ couch_log:warning("~p StartFun reset seq for ~p", [?MODULE, Shard])
Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, SeqArg]}),
Shard#shard{ref = Ref}
@@ -670,6 +663,22 @@ find_split_shard_replacements(DeadWorkers, Shards) ->
{fabric_dict:from_list(Workers), Available}.
+find_replacement_sequence(OriginalSeqs, R0) ->
+ %% Find the original shard copy in the Seqs array
+ case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, OriginalSeqs) of
+ [{#shard{}, {replace, _, _, _}} | _] ->
+ % Don't attempt to replace a replacement
+ 0;
+ [{#shard{node = OldNode}, OldSeq} | _] ->
+ make_replacement_arg(OldNode, OldSeq);
+ _ ->
+ % TODO we don't currently attempt to replace a shard with split
+ % replicas of that range on other nodes, so it's possible to end
+ % up with an empty list here.
+ 0
+ end.
make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 ->
{Num, {split, Uuid}, Node};
make_split_seq(Seq, _) ->
@@ -892,3 +901,36 @@ find_split_shard_replacements_test() ->
{Workers3, ShardsLeft3} = find_split_shard_replacements(Dead3, Shards3),
?assertEqual([], Workers3),
?assertEqual(Shards3, ShardsLeft3).
+find_replacement_sequence_test() ->
+ Shards = [{"n2", 0, 10}, {"n3", 0, 5}],
+ Uuid = <<"abc1234">>,
+ Epoch = 'n1',
+ % Not safe to use a plain integer sequence number
+ Dead1 = mk_workers(Shards, 42),
+ ?assertEqual(0, find_replacement_sequence(Dead1, [0, 10])),
+ ?assertEqual(0, find_replacement_sequence(Dead1, [0, 5])),
+ % {Seq, Uuid} should work
+ Dead2 = mk_workers(Shards, {43, Uuid}),
+ ?assertEqual({replace, 'n2', Uuid, 43},
+ find_replacement_sequence(Dead2, [0, 10])),
+ ?assertEqual({replace, 'n3', Uuid, 43},
+ find_replacement_sequence(Dead2, [0, 5])),
+ % Can't find the range at all
+ ?assertEqual(0, find_replacement_sequence(Dead2, [0, 4])),
+ % {Seq, Uuids, EpochNode} should work
+ Dead3 = mk_workers(Shards, {44, Uuid, Epoch}),
+ ?assertEqual({replace, 'n1', Uuid, 44},
+ find_replacement_sequence(Dead3, [0, 10])),
+ ?assertEqual({replace, 'n1', Uuid, 44},
+ find_replacement_sequence(Dead3, [0, 5])),
+ % Cannot replace a replacement
+ Dead4 = mk_workers(Shards, {replace, 'n1', Uuid, 45}),
+ ?assertEqual(0, find_replacement_sequence(Dead4, [0, 10])),
+ ?assertEqual(0, find_replacement_sequence(Dead4, [0, 5])).