diff options
-rw-r--r-- | src/fabric/src/fabric_view_changes.erl | 76 |
1 files changed, 59 insertions, 17 deletions
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index eea6a72bb..9fdbf06df 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -146,29 +146,22 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}), {S#shard{ref = Ref}, Seq} end, WSplitSeqs0), - % For ranges that were not split start sequences from 0 - WReps = lists:map(fun(#shard{name = Name, node = N} = S) -> - Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}), + % For ranges that were not split, look for a replacement on a different node + WReps = lists:map(fun(#shard{name = Name, node = NewNode, range = R} = S) -> + Arg = find_replacement_sequence(Dead, R), + case Arg =/= 0 of true -> ok; false -> + couch_log:warning("~p reset seq for ~p", [?MODULE, S]) + end, + Ref = rexi:cast(NewNode, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}), {S#shard{ref = Ref}, 0} end, Reps1), Seqs = WSeqs ++ WSplitSeqs ++ WReps, {Workers0, _} = lists:unzip(Seqs), Repls = fabric_ring:get_shard_replacements(DbName, Workers0), StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) -> - %% Find the original shard copy in the Seqs array - case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of - [{#shard{}, {replace, _, _, _}} | _] -> - % Don't attempt to replace a replacement - SeqArg = 0; - [{#shard{node = OldNode}, OldSeq} | _] -> - SeqArg = make_replacement_arg(OldNode, OldSeq); - _ -> - % TODO this clause is probably unreachable in the N>2 - % case because we compute replacements only if a shard has one - % in the original set. 
- couch_log:error("Streaming ~s from zero while replacing ~p", - [Name, PackedSeqs]), - SeqArg = 0 + SeqArg = find_replacement_sequence(Seqs, R0), + case SeqArg =/= 0 of true -> ok; false -> + couch_log:warning("~p StartFun reset seq for ~p", [?MODULE, Shard]) end, Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, SeqArg]}), Shard#shard{ref = Ref} @@ -670,6 +663,22 @@ find_split_shard_replacements(DeadWorkers, Shards) -> {fabric_dict:from_list(Workers), Available}. +find_replacement_sequence(OriginalSeqs, R0) -> + %% Find the original shard copy in the Seqs array + case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, OriginalSeqs) of + [{#shard{}, {replace, _, _, _}} | _] -> + % Don't attempt to replace a replacement + 0; + [{#shard{node = OldNode}, OldSeq} | _] -> + make_replacement_arg(OldNode, OldSeq); + _ -> + % TODO we don't currently attempt to replace a shard with split + % replicas of that range on other nodes, so it's possible to end + % up with an empty list here. + 0 + end. + + make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 -> {Num, {split, Uuid}, Node}; make_split_seq(Seq, _) -> @@ -892,3 +901,36 @@ find_split_shard_replacements_test() -> {Workers3, ShardsLeft3} = find_split_shard_replacements(Dead3, Shards3), ?assertEqual([], Workers3), ?assertEqual(Shards3, ShardsLeft3). 
%% EUnit test for find_replacement_sequence/2.
%%
%% Builds worker lists with mk_workers/2 for two shard entries and checks
%% the value returned for each shape of stored sequence: a bare integer,
%% a {Seq, Uuid} pair, a {Seq, Uuid, EpochNode} triple, and an existing
%% {replace, ...} tuple, plus a range that is absent from the list.
find_replacement_sequence_test() ->
    ShardSpecs = [{"n2", 0, 10}, {"n3", 0, 5}],
    DbUuid = <<"abc1234">>,
    EpochNode = 'n1',
    WideRange = [0, 10],
    NarrowRange = [0, 5],

    % A bare integer sequence is expected to fall back to 0 for both ranges
    IntWorkers = mk_workers(ShardSpecs, 42),
    ?assertEqual(0, find_replacement_sequence(IntWorkers, WideRange)),
    ?assertEqual(0, find_replacement_sequence(IntWorkers, NarrowRange)),

    % {Seq, Uuid}: the expected replacement carries the node atoms 'n2' / 'n3'
    % (presumably derived from the "n2" / "n3" entries by mk_workers/2)
    PairWorkers = mk_workers(ShardSpecs, {43, DbUuid}),
    ExpectedWide = {replace, 'n2', DbUuid, 43},
    ExpectedNarrow = {replace, 'n3', DbUuid, 43},
    ?assertEqual(ExpectedWide, find_replacement_sequence(PairWorkers, WideRange)),
    ?assertEqual(ExpectedNarrow, find_replacement_sequence(PairWorkers, NarrowRange)),

    % A range with no matching worker is expected to yield 0
    ?assertEqual(0, find_replacement_sequence(PairWorkers, [0, 4])),

    % {Seq, Uuid, EpochNode}: both ranges are expected to name the epoch
    % node 'n1' in the replacement
    TripleWorkers = mk_workers(ShardSpecs, {44, DbUuid, EpochNode}),
    ExpectedEpoch = {replace, 'n1', DbUuid, 44},
    ?assertEqual(ExpectedEpoch, find_replacement_sequence(TripleWorkers, WideRange)),
    ?assertEqual(ExpectedEpoch, find_replacement_sequence(TripleWorkers, NarrowRange)),

    % A sequence that is already a replacement is expected to yield 0
    ReplacedWorkers = mk_workers(ShardSpecs, {replace, 'n1', DbUuid, 45}),
    ?assertEqual(0, find_replacement_sequence(ReplacedWorkers, WideRange)),
    ?assertEqual(0, find_replacement_sequence(ReplacedWorkers, NarrowRange)).