summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- src/fabric/src/fabric_view_changes.erl 76
1 files changed, 59 insertions, 17 deletions
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index eea6a72bb..9fdbf06df 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -146,29 +146,22 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}),
{S#shard{ref = Ref}, Seq}
end, WSplitSeqs0),
- % For ranges that were not split start sequences from 0
- WReps = lists:map(fun(#shard{name = Name, node = N} = S) ->
- Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}),
+ % For ranges that were not split, look for a replacement on a different node
+ WReps = lists:map(fun(#shard{name = Name, node = NewNode, range = R} = S) ->
+ Arg = find_replacement_sequence(Dead, R),
+ case Arg =/= 0 of true -> ok; false ->
+ couch_log:warning("~p reset seq for ~p", [?MODULE, S])
+ end,
+ Ref = rexi:cast(NewNode, {fabric_rpc, changes, [Name, ChangesArgs, Arg]}),
{S#shard{ref = Ref}, 0}
end, Reps1),
Seqs = WSeqs ++ WSplitSeqs ++ WReps,
{Workers0, _} = lists:unzip(Seqs),
Repls = fabric_ring:get_shard_replacements(DbName, Workers0),
StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) ->
- %% Find the original shard copy in the Seqs array
- case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of
- [{#shard{}, {replace, _, _, _}} | _] ->
- % Don't attempt to replace a replacement
- SeqArg = 0;
- [{#shard{node = OldNode}, OldSeq} | _] ->
- SeqArg = make_replacement_arg(OldNode, OldSeq);
- _ ->
- % TODO this clause is probably unreachable in the N>2
- % case because we compute replacements only if a shard has one
- % in the original set.
- couch_log:error("Streaming ~s from zero while replacing ~p",
- [Name, PackedSeqs]),
- SeqArg = 0
+ SeqArg = find_replacement_sequence(Seqs, R0),
+ case SeqArg =/= 0 of true -> ok; false ->
+ couch_log:warning("~p StartFun reset seq for ~p", [?MODULE, Shard])
end,
Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, SeqArg]}),
Shard#shard{ref = Ref}
@@ -670,6 +663,22 @@ find_split_shard_replacements(DeadWorkers, Shards) ->
{fabric_dict:from_list(Workers), Available}.
+find_replacement_sequence(OriginalSeqs, R0) ->
+ %% Pick the start sequence for range R0 from the first {Shard, Seq} entry in OriginalSeqs with that range
+ case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, OriginalSeqs) of
+ [{#shard{}, {replace, _, _, _}} | _] ->
+ % The entry is already a {replace, ...} sequence: don't replace a replacement, return 0
+ 0;
+ [{#shard{node = OldNode}, OldSeq} | _] ->
+ make_replacement_arg(OldNode, OldSeq);
+ _ ->
+ % No entry with range R0 was found (empty list). TODO we don't currently
+ % attempt to replace a shard with split replicas of that range on other
+ % nodes, so it's possible to end up here; fall back to 0.
+ 0
+ end.
+
+
make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 ->
{Num, {split, Uuid}, Node};
make_split_seq(Seq, _) ->
@@ -892,3 +901,36 @@ find_split_shard_replacements_test() ->
{Workers3, ShardsLeft3} = find_split_shard_replacements(Dead3, Shards3),
?assertEqual([], Workers3),
?assertEqual(Shards3, ShardsLeft3).
+
+
+find_replacement_sequence_test() ->
+ Shards = [{"n2", 0, 10}, {"n3", 0, 5}],
+ Uuid = <<"abc1234">>,
+ Epoch = 'n1',
+
+ % A plain integer sequence number is not safe to reuse, so expect 0
+ Dead1 = mk_workers(Shards, 42),
+ ?assertEqual(0, find_replacement_sequence(Dead1, [0, 10])),
+ ?assertEqual(0, find_replacement_sequence(Dead1, [0, 5])),
+
+ % {Seq, Uuid} should work; the expected node (n2 / n3) matches the Shards entry for each range
+ Dead2 = mk_workers(Shards, {43, Uuid}),
+ ?assertEqual({replace, 'n2', Uuid, 43},
+ find_replacement_sequence(Dead2, [0, 10])),
+ ?assertEqual({replace, 'n3', Uuid, 43},
+ find_replacement_sequence(Dead2, [0, 5])),
+
+ % Range [0, 4] is not among the workers at all, so expect 0
+ ?assertEqual(0, find_replacement_sequence(Dead2, [0, 4])),
+
+ % {Seq, Uuid, EpochNode} should work; the epoch node is expected instead of the shard's node
+ Dead3 = mk_workers(Shards, {44, Uuid, Epoch}),
+ ?assertEqual({replace, 'n1', Uuid, 44},
+ find_replacement_sequence(Dead3, [0, 10])),
+ ?assertEqual({replace, 'n1', Uuid, 44},
+ find_replacement_sequence(Dead3, [0, 5])),
+
+ % A {replace, ...} sequence cannot itself be replaced, so expect 0
+ Dead4 = mk_workers(Shards, {replace, 'n1', Uuid, 45}),
+ ?assertEqual(0, find_replacement_sequence(Dead4, [0, 10])),
+ ?assertEqual(0, find_replacement_sequence(Dead4, [0, 5])).