- go/2,
- go/3,
- make_local_id/2,
- make_local_id/3,
- make_purge_id/2,
- verify_purge_checkpoint/2,
- find_source_seq/4,
- find_split_target_seq/4,
- local_id_hash/1
- changes_enumerator/2
--record(acc, {
- batch_size,
- batch_count,
- seq = 0,
- revcount = 0,
- source,
- targets,
- filter,
- db,
- hashfun,
- incomplete_ranges
--record(tgt, {
- shard,
- seq = 0,
- infos = [],
- localid,
- purgeid,
- history = {[]},
- remaining = 0
-go(Source, Target) ->
- go(Source, Target, []).
-go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) ->
- go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts);
-go(#shard{} = Source, #shard{} = Target, Opts) ->
- case mem3:db_is_current(Source) of
- true ->
- go(Source, targets_map(Source, Target), Opts);
- false ->
- % Database could have been recreated
- {error, missing_source}
- end;
-go(#shard{} = Source, #{} = Targets0, Opts) when map_size(Targets0) > 0 ->
- Targets = maps:map(fun(_, T) -> #tgt{shard = T} end, Targets0),
- case couch_server:exists( of
- true ->
- sync_security(Source, Targets),
- BatchSize = case proplists:get_value(batch_size, Opts) of
- BS when is_integer(BS), BS > 0 -> BS;
- _ -> 100
- end,
- BatchCount = case proplists:get_value(batch_count, Opts) of
- all -> all;
- BC when is_integer(BC), BC > 0 -> BC;
- _ -> 1
- end,
- IncompleteRanges = config:get_boolean("mem3", "incomplete_ranges",
- false),
- Filter = proplists:get_value(filter, Opts),
- Acc = #acc{
- batch_size = BatchSize,
- batch_count = BatchCount,
- source = Source,
- targets = Targets,
- filter = Filter,
- incomplete_ranges = IncompleteRanges
- },
- go(Acc);
- false ->
- {error, missing_source}
- end.
-go(#acc{source=Source, batch_count=BC}=Acc) ->
- case couch_db:open(, [?ADMIN_CTX]) of
- {ok, Db} ->
- Resp = try
- HashFun = mem3_hash:get_hash_fun(couch_db:name(Db)),
- repl(Acc#acc{db = Db, hashfun = HashFun})
- catch
- error:{error, missing_source} ->
- {error, missing_source};
- error:{not_found, no_db_file} ->
- {error, missing_target}
- after
- couch_db:close(Db)
- end,
- case Resp of
- {ok, P} when P > 0, BC == all ->
- go(Acc);
- {ok, P} when P > 0, BC > 1 ->
- go(Acc#acc{batch_count=BC-1});
- Else ->
- Else
- end;
- {not_found, no_db_file} ->
- {error, missing_source}
- end.
-make_local_id(Source, Target) ->
- make_local_id(Source, Target, undefined).
-make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
- make_local_id(SourceNode, TargetNode, Filter);
-make_local_id(SourceThing, TargetThing, F) when is_binary(F) ->
- S = local_id_hash(SourceThing),
- T = local_id_hash(TargetThing),
- <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>;
-make_local_id(SourceThing, TargetThing, Filter) ->
- S = local_id_hash(SourceThing),
- T = local_id_hash(TargetThing),
- F = filter_hash(Filter),
- <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
-filter_hash(Filter) when is_function(Filter) ->
- {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq),
- B = couch_util:encodeBase64Url(Hash),
- <<"-", B/binary>>;
-filter_hash(_) ->
- <<>>.
-local_id_hash(Thing) ->
- couch_util:encodeBase64Url(couch_hash:md5_hash(term_to_binary(Thing))).
-make_purge_id(SourceUUID, TargetUUID) ->
- <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
-verify_purge_checkpoint(DbName, Props) ->
- try
- Type = couch_util:get_value(<<"type">>, Props),
- if Type =/= <<"internal_replication">> -> false; true ->
- SourceBin = couch_util:get_value(<<"source">>, Props),
- TargetBin = couch_util:get_value(<<"target">>, Props),
- Range = couch_util:get_value(<<"range">>, Props),
- Source = binary_to_existing_atom(SourceBin, latin1),
- Target = binary_to_existing_atom(TargetBin, latin1),
- try
- Nodes = lists:foldl(fun(Shard, Acc) ->
- case Shard#shard.range == Range of
- true -> [Shard#shard.node | Acc];
- false -> Acc
- end
- end, [], mem3:shards(DbName)),
- lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
- catch
- error:database_does_not_exist ->
- false
- end
- end
- catch _:_ ->
- false
- end.
-%% @doc Find and return the largest update_seq in SourceDb
-%% that the client has seen from TargetNode.
-%% When reasoning about this function it is very important to
-%% understand the direction of replication for this comparison.
-%% We're only interesting in internal replications initiated
-%% by this node to the node being replaced. When doing a
-%% replacement the most important thing is that the client doesn't
-%% miss any updates. This means we can only fast-forward as far
-%% as they've seen updates on this node. We can detect that by
-%% looking for our push replication history and choosing the
-%% largest source_seq that has a target_seq =< TgtSeq.
-find_source_seq(SrcDb, TgtNode, TgtUUIDPrefix, TgtSeq) ->
- case find_repl_doc(SrcDb, TgtUUIDPrefix) of
- {ok, TgtUUID, Doc} ->
- SrcNode = atom_to_binary(node(), utf8),
- find_source_seq_int(Doc, SrcNode, TgtNode, TgtUUID, TgtSeq);
- {not_found, _} ->
- couch_log:warning("~p find_source_seq repl doc not_found "
- "src_db: ~p, tgt_node: ~p, tgt_uuid_prefix: ~p, tgt_seq: ~p",
- [?MODULE, SrcDb, TgtNode, TgtUUIDPrefix, TgtSeq]),
- 0
- end.
-find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
- SrcNode = case is_atom(SrcNode0) of
- true -> atom_to_binary(SrcNode0, utf8);
- false -> SrcNode0
- end,
- TgtNode = case is_atom(TgtNode0) of
- true -> atom_to_binary(TgtNode0, utf8);
- false -> TgtNode0
- end,
- % This is split off purely for the ability to run unit tests
- % against this bit of code without requiring all sorts of mocks.
- {History} = couch_util:get_value(<<"history">>, Props, {[]}),
- SrcHistory = couch_util:get_value(SrcNode, History, []),
- UseableHistory = lists:filter(fun({Entry}) ->
- couch_util:get_value(<<"target_node">>, Entry) =:= TgtNode andalso
- couch_util:get_value(<<"target_uuid">>, Entry) =:= TgtUUID andalso
- couch_util:get_value(<<"target_seq">>, Entry) =< TgtSeq
- end, SrcHistory),
- % This relies on SrcHistory being ordered desceding by source
- % sequence.
- case UseableHistory of
- [{Entry} | _] ->
- couch_util:get_value(<<"source_seq">>, Entry);
- [] ->
- couch_log:warning("~p find_source_seq_int nil useable history "
- "src_node: ~p, tgt_node: ~p, tgt_uuid: ~p, tgt_seq: ~p, "
- "src_history: ~p",
- [?MODULE, SrcNode, TgtNode, TgtUUID, TgtSeq, SrcHistory]),
- 0
- end.
-find_split_target_seq(TgtDb, SrcNode0, SrcUUIDPrefix, SrcSeq) ->
- SrcNode = case is_atom(SrcNode0) of
- true -> atom_to_binary(SrcNode0, utf8);
- false -> SrcNode0
- end,
- case find_split_target_seq_int(TgtDb, SrcNode, SrcUUIDPrefix) of
- {ok, [{BulkCopySeq, BulkCopySeq} | _]} when SrcSeq =< BulkCopySeq ->
- % Check if source sequence is at or below the initial bulk copy
- % checkpointed sequence. That sequence or anything lower than it
- % can be directly replaced with the same value for each target. For
- % extra safety we assert that the initial source and target
- % sequences are the same value
- SrcSeq;
- {ok, Seqs= [{_, _} | _]} ->
- % Pick the target sequence for the greatest source sequence that is
- % less than `SrcSeq`.
- case lists:takewhile(fun({Seq, _}) -> Seq < SrcSeq end, Seqs) of
- [] ->
- couch_log:warning("~p find_split_target_seq target seq not found "
- "tgt_db: ~p, src_uuid_prefix: ~p, src_seq: ~p",
- [?MODULE, couch_db:name(TgtDb), SrcUUIDPrefix, SrcSeq]),
- 0;
- [{_, _} | _] = Seqs1 ->
- {_, TSeq} = lists:last(Seqs1),
- TSeq
- end;
- {not_found, _} ->
- couch_log:warning("~p find_split_target_seq target seq not found "
- "tgt_db: ~p, src_uuid_prefix: ~p, src_seq: ~p",
- [?MODULE, couch_db:name(TgtDb), SrcUUIDPrefix, SrcSeq]),
- 0
- end.
-repl(#acc{db = Db0} = Acc0) ->
- erlang:put(io_priority, {internal_repl, couch_db:name(Db0)}),
- Acc1 = calculate_start_seq_multi(Acc0),
- try
- Acc3 = case config:get_boolean("mem3", "replicate_purges", false) of
- true ->
- Acc2 = pull_purges_multi(Acc1),
- push_purges_multi(Acc2);
- false ->
- Acc1
- end,
- push_changes(Acc3)
- catch
- throw:{finished, Count} ->
- {ok, Count}
- end.
-pull_purges_multi(#acc{source = Source} = Acc0) ->
- #acc{batch_size = Count, seq = UpdateSeq, targets = Targets0} = Acc0,
- with_src_db(Acc0, fun(Db) ->
- Targets = maps:map(fun(_, #tgt{} = T) ->
- pull_purges(Db, Count, Source, T)
- end, reset_remaining(Targets0)),
- Remaining = maps:fold(fun(_, #tgt{remaining = R}, Sum) ->
- Sum + R
- end, 0, Targets),
- if Remaining == 0 -> Acc0#acc{targets = Targets}; true ->
- PurgeSeq = couch_db:get_purge_seq(Db),
- OldestPurgeSeq = couch_db:get_oldest_purge_seq(Db),
- PurgesToPush = PurgeSeq - OldestPurgeSeq,
- Changes = couch_db:count_changes_since(Db, UpdateSeq),
- Pending = Remaining + PurgesToPush + Changes,
- throw({finished, Pending})
- end
- end).
-pull_purges(Db, Count, SrcShard, #tgt{} = Tgt0) ->
- #tgt{shard = TgtShard} = Tgt0,
- SrcUUID = couch_db:get_uuid(Db),
- #shard{node = TgtNode, name = TgtDbName} = TgtShard,
- {LocalPurgeId, Infos, ThroughSeq, Remaining} =
- mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
- Tgt = Tgt0#tgt{purgeid = LocalPurgeId},
- if Infos == [] -> ok; true ->
- {ok, _} = couch_db:purge_docs(Db, Infos, [replicated_edits]),
- Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq),
- mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, LocalPurgeId, Body)
- end,
- Tgt#tgt{remaining = max(0, Remaining)}.
-push_purges_multi(#acc{source = SrcShard} = Acc) ->
- #acc{batch_size = BatchSize, seq = UpdateSeq, targets = Targets0} = Acc,
- with_src_db(Acc, fun(Db) ->
- Targets = maps:map(fun(_, #tgt{} = T) ->
- push_purges(Db, BatchSize, SrcShard, T)
- end, reset_remaining(Targets0)),
- Remaining = maps:fold(fun(_, #tgt{remaining = R}, Sum) ->
- Sum + R
- end, 0, Targets),
- if Remaining == 0 -> Acc#acc{targets = Targets}; true ->
- Changes = couch_db:count_changes_since(Db, UpdateSeq),
- throw({finished, Remaining + Changes})
- end
- end).
-push_purges(Db, BatchSize, SrcShard, Tgt) ->
- #tgt{shard = TgtShard, purgeid = LocalPurgeId} = Tgt,
- #shard{node = TgtNode, name = TgtDbName} = TgtShard,
- StartSeq = case couch_db:open_doc(Db, LocalPurgeId, []) of
- {ok, #doc{body = {Props}}} ->
- couch_util:get_value(<<"purge_seq">>, Props);
- {not_found, _} ->
- Oldest = couch_db:get_oldest_purge_seq(Db),
- erlang:max(0, Oldest - 1)
- end,
- FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
- NewCount = Count + length(Revs),
- NewInfos = [{UUID, Id, Revs} | Infos],
- Status = if NewCount < BatchSize -> ok; true -> stop end,
- {Status, {NewCount, NewInfos, PSeq}}
- end,
- InitAcc = {0, [], StartSeq},
- {ok, {_, Infos, ThroughSeq}} =
- couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
- if Infos == [] -> ok; true ->
- ok = purge_on_target(TgtNode, TgtDbName, Infos),
- Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq),
- Doc = #doc{id = LocalPurgeId, body = Body},
- {ok, _} = couch_db:update_doc(Db, Doc, [])
- end,
- Tgt#tgt{remaining = max(0, couch_db:get_purge_seq(Db) - ThroughSeq)}.
-calculate_start_seq_multi(#acc{} = Acc) ->
- #acc{db = Db, targets = Targets0, filter = Filter} = Acc,
- FilterHash = filter_hash(Filter),
- Targets = maps:map(fun(_, #tgt{} = T) ->
- calculate_start_seq(Db, FilterHash, T)
- end, Targets0),
- % There will always be at least one target
- #tgt{seq = Seq0} = hd(maps:values(Targets)),
- Seq = maps:fold(fun(_, #tgt{seq = S}, M) -> min(S, M) end, Seq0, Targets),
- Acc#acc{seq = Seq, targets = Targets}.
-calculate_start_seq(Db, FilterHash, #tgt{shard = TgtShard} = Tgt) ->
- UUID = couch_db:get_uuid(Db),
- #shard{node = Node, name = Name} = TgtShard,
- {NewDocId, Doc} = mem3_rpc:load_checkpoint(Node, Name, node(), UUID,
- FilterHash),
- #doc{id=FoundId, body={TProps}} = Doc,
- Tgt1 = Tgt#tgt{localid = NewDocId},
- % NewDocId and FoundId may be different the first time
- % this code runs to save our newly named internal replication
- % checkpoints. We store NewDocId to use when saving checkpoints
- % but use FoundId to reuse the same docid that the target used.
- case couch_db:open_doc(Db, FoundId, [ejson_body]) of
- {ok, #doc{body = {SProps}}} ->
- SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
- TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
- % We resume from the lower update seq stored in the two
- % shard copies. We also need to be sure and use the
- % corresponding history. A difference here could result
- % from either a write failure on one of the nodes or if
- % either shard was truncated by an operator.
- case SourceSeq =< TargetSeq of
- true ->
- Seq = SourceSeq,
- History = couch_util:get_value(<<"history">>, SProps, {[]});
- false ->
- Seq = TargetSeq,
- History = couch_util:get_value(<<"history">>, TProps, {[]})
- end,
- Tgt1#tgt{seq = Seq, history = History};
- {not_found, _} ->
- compare_epochs(Db, Tgt1)
- end.
-push_changes(#acc{} = Acc0) ->
- #acc{
- db = Db0,
- seq = Seq
- } = Acc0,
- % Avoid needless rewriting the internal replication
- % checkpoint document if nothing is replicated.
- UpdateSeq = couch_db:get_update_seq(Db0),
- if Seq < UpdateSeq -> ok; true ->
- throw({finished, 0})
- end,
- with_src_db(Acc0, fun(Db) ->
- Acc1 = Acc0#acc{db = Db},
- Fun = fun ?MODULE:changes_enumerator/2,
- {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
- {ok, #acc{seq = LastSeq}} = replicate_batch_multi(Acc2),
- {ok, couch_db:count_changes_since(Db, LastSeq)}
- end).
-compare_epochs(Db, #tgt{shard = TgtShard} = Tgt) ->
- #shard{node = Node, name = Name} = TgtShard,
- UUID = couch_db:get_uuid(Db),
- Epochs = couch_db:get_epochs(Db),
- Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs),
- Tgt#tgt{seq = Seq, history = {[]}}.
-changes_enumerator(#doc_info{id=DocId}, #acc{db=Db}=Acc) ->
- {ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
- changes_enumerator(FDI, Acc);
-changes_enumerator(#full_doc_info{}=FDI, #acc{}=Acc0) ->
- #acc{
- revcount = C,
- targets = Targets0,
- hashfun = HashFun,
- incomplete_ranges = IncompleteRanges
- } = Acc0,
- #doc_info{high_seq=Seq, revs=Revs} = couch_doc:to_doc_info(FDI),
- {Count, Targets} = case filter_doc(Acc0#acc.filter, FDI) of
- keep ->
- NewTargets = changes_append_fdi(FDI, Targets0, HashFun,
- IncompleteRanges),
- {C + length(Revs), NewTargets};
- discard ->
- {C, Targets0}
- end,
- Acc1 = Acc0#acc{seq = Seq, revcount = Count, targets = Targets},
- Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end,
- {Go, Acc1}.
-changes_append_fdi(#full_doc_info{id = Id} = FDI, Targets, HashFun,
- IncompleteRanges) ->
- case mem3_reshard_job:pickfun(Id, maps:keys(Targets), HashFun) of
- not_in_range when IncompleteRanges ->
- Targets;
- not_in_range when not IncompleteRanges ->
- ErrMsg = "~p : ~p not in any target ranges: ~p",
- TShards = [TS || #tgt{shard = TS} <- maps:values(Targets)],
- TNames = [TN || #shard{name = TN} <- TShards],
- couch_log:error(ErrMsg, [?MODULE, Id, TNames]),
- error({error, {Id, not_in_target_ranges}});
- Key ->
- maps:update_with(Key, fun(#tgt{infos = Infos} = T) ->
- T#tgt{infos = [FDI | Infos]}
- end, Targets)
- end.
-replicate_batch_multi(#acc{targets = Targets0, seq = Seq, db = Db} = Acc) ->
- Targets = maps:map(fun(_, #tgt{} = T) ->
- replicate_batch(T, Db, Seq)
- end, Targets0),
- {ok, Acc#acc{targets = Targets, revcount = 0}}.
-replicate_batch(#tgt{shard = TgtShard, infos = Infos} = Target, Db, Seq) ->
- #shard{node = Node, name = Name} = TgtShard,
- case find_missing_revs(Target) of
- [] ->
- ok;
- Missing ->
- lists:map(fun(Chunk) ->
- Docs = open_docs(Db, Infos, Chunk),
- ok = save_on_target(Node, Name, Docs)
- end, chunk_revs(Missing))
- end,
- update_locals(Target, Db, Seq),
- Target#tgt{infos = []}.
-find_missing_revs(#tgt{shard = TgtShard, infos = Infos}) ->
- #shard{node = Node, name = Name} = TgtShard,
- IdsRevs = lists:map(fun(FDI) ->
- #doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI),
- {Id, [R || #rev_info{rev=R} <- RevInfos]}
- end, Infos),
- Missing = mem3_rpc:get_missing_revs(Node, Name, IdsRevs, [
- {io_priority, {internal_repl, Name}},
- ]),
- lists:filter(fun
- ({_Id, [], _Ancestors}) -> false;
- ({_Id, _Revs, _Ancestors}) -> true
- end, Missing).
-chunk_revs(Revs) ->
- Limit = list_to_integer(config:get("mem3", "rev_chunk_size", "5000")),
- chunk_revs(Revs, Limit).
-chunk_revs(Revs, Limit) ->
- chunk_revs(Revs, {0, []}, [], Limit).
-chunk_revs([], {_Count, Chunk}, Chunks, _Limit) ->
- [Chunk|Chunks];
-chunk_revs([{Id, R, A}|Revs], {Count, Chunk}, Chunks, Limit) when length(R) =< Limit - Count ->
- chunk_revs(
- Revs,
- {Count + length(R), [{Id, R, A}|Chunk]},
- Chunks,
- Limit
- );
-chunk_revs([{Id, R, A}|Revs], {Count, Chunk}, Chunks, Limit) ->
- {This, Next} = lists:split(Limit - Count, R),
- chunk_revs(
- [{Id, Next, A}|Revs],
- {0, []},
- [[{Id, This, A}|Chunk]|Chunks],
- Limit
- ).
-open_docs(Db, Infos, Missing) ->
- lists:flatmap(fun({Id, Revs, _}) ->
- FDI = lists:keyfind(Id,, Infos),
- #full_doc_info{rev_tree=RevTree} = FDI,
- {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
- lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
- couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
- end, FoundRevs)
- end, Missing).
-save_on_target(Node, Name, Docs) ->
- mem3_rpc:update_docs(Node, Name, Docs, [
- replicated_changes,
- full_commit,
- {io_priority, {internal_repl, Name}}
- ]),
- ok.
-purge_on_target(Node, Name, PurgeInfos) ->
- mem3_rpc:purge_docs(Node, Name, PurgeInfos, [
- replicated_changes,
- full_commit,
- {io_priority, {internal_repl, Name}}
- ]),
- ok.
-update_locals(Target, Db, Seq) ->
- #tgt{shard = TgtShard, localid = Id, history = History} = Target,
- #shard{node = Node, name = Name} = TgtShard,
- NewEntry = [
- {<<"source_node">>, atom_to_binary(node(), utf8)},
- {<<"source_uuid">>, couch_db:get_uuid(Db)},
- {<<"source_seq">>, Seq},
- {<<"timestamp">>, list_to_binary(mem3_util:iso8601_timestamp())}
- ],
- NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
- {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
-purge_cp_body(#shard{} = Source, #shard{} = Target, PurgeSeq) ->
- {Mega, Secs, _} = os:timestamp(),
- NowSecs = Mega * 1000000 + Secs,
- {[
- {<<"type">>, <<"internal_replication">>},
- {<<"updated_on">>, NowSecs},
- {<<"purge_seq">>, PurgeSeq},
- {<<"source">>, atom_to_binary(Source#shard.node, latin1)},
- {<<"target">>, atom_to_binary(Target#shard.node, latin1)},
- {<<"range">>, Source#shard.range}
- ]}.
-find_repl_doc(SrcDb, TgtUUIDPrefix) ->
- SrcUUID = couch_db:get_uuid(SrcDb),
- S = local_id_hash(SrcUUID),
- DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>,
- FoldFun = fun(#doc{id = DocId, body = {BodyProps}} = Doc, _) ->
- TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>),
- case is_prefix(DocIdPrefix, DocId) of
- true ->
- case is_prefix(TgtUUIDPrefix, TgtUUID) of
- true ->
- {stop, {TgtUUID, Doc}};
- false ->
- {ok, not_found}
- end;
- _ ->
- {stop, not_found}
- end
- end,
- Options = [{start_key, DocIdPrefix}],
- case couch_db:fold_local_docs(SrcDb, FoldFun, not_found, Options) of
- {ok, {TgtUUID, Doc}} ->
- {ok, TgtUUID, Doc};
- {ok, not_found} ->
- {not_found, missing};
- Else ->
- couch_log:error("Error finding replication doc: ~w", [Else]),
- {not_found, missing}
- end.
-find_split_target_seq_int(TgtDb, Node, SrcUUIDPrefix) ->
- TgtUUID = couch_db:get_uuid(TgtDb),
- FoldFun = fun(#doc{body = {Props}}, _) ->
- DocTgtUUID = couch_util:get_value(<<"target_uuid">>, Props, <<>>),
- case TgtUUID == DocTgtUUID of
- true ->
- {History} = couch_util:get_value(<<"history">>, Props, {[]}),
- HProps = couch_util:get_value(Node, History, []),
- case get_target_seqs(HProps, TgtUUID, Node, SrcUUIDPrefix, []) of
- [] ->
- % No replication found from source to target
- {ok, not_found};
- [{_, _} | _] = SeqPairs ->
- % Found shared replicated history from source to target
- % Return sorted list by the earliest source sequence
- {stop, lists:sort(SeqPairs)}
- end;
- false ->
- {ok, not_found}
- end
- end,
- Options = [{start_key, <<"_local/shard-sync-">>}],
- case couch_db:fold_local_docs(TgtDb, FoldFun, not_found, Options) of
- {ok, Seqs} when is_list(Seqs) ->
- {ok, Seqs};
- {ok, not_found} ->
- {not_found, missing};
- Else ->
- couch_log:error("Error finding replication doc: ~w", [Else]),
- {not_found, missing}
- end.
-% Get target sequences for each checkpoint when source replicated to the target
-% The "target" is the current db where the history entry was read from and "source"
-% is another, now possibly deleted, database.
-get_target_seqs([], _TgtUUID, _Node, _SrcUUIDPrefix, Acc) ->
- lists:reverse(Acc);
-get_target_seqs([{Entry} | HProps], TgtUUID, Node, SrcUUIDPrefix, Acc) ->
- SameTgt = couch_util:get_value(<<"target_uuid">>, Entry) =:= TgtUUID,
- SameNode = couch_util:get_value(<<"target_node">>, Entry) =:= Node,
- SrcUUID = couch_util:get_value(<<"source_uuid">>, Entry),
- IsPrefix = is_prefix(SrcUUIDPrefix, SrcUUID),
- Acc1 = case SameTgt andalso SameNode andalso IsPrefix of
- true ->
- EntrySourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
- EntryTargetSeq = couch_util:get_value(<<"target_seq">>, Entry),
- [{EntrySourceSeq, EntryTargetSeq} | Acc];
- false ->
- Acc
- end,
- get_target_seqs(HProps, TgtUUID, Node, SrcUUIDPrefix, Acc1).
-with_src_db(#acc{source = Source}, Fun) ->
- case couch_db:open(, [?ADMIN_CTX]) of
- {ok, Db} ->
- try
- Fun(Db)
- after
- couch_db:close(Db)
- end;
- {not_found, _} ->
- error({error, missing_source})
- end.
-is_prefix(Prefix, Subject) ->
- binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
-filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
- try Filter(FullDocInfo) of
- discard -> discard;
- _ -> keep
- catch _:_ ->
- keep
- end;
-filter_doc(_, _) ->
- keep.
-sync_security(#shard{} = Source, #{} = Targets) ->
- maps:map(fun(_, #tgt{shard = Target}) ->
- mem3_sync_security:maybe_sync(Source, Target)
- end, Targets).
-targets_map(#shard{name = <<"shards/", _/binary>> = SrcName} = Src,
- #shard{name = <<"shards/", _/binary>>, node = TgtNode} = Tgt) ->
- % Parse range from name in case the passed shard is built with a name only
- SrcRange = mem3:range(SrcName),
- Shards0 = mem3:shards(mem3:dbname(SrcName)),
- Shards1 = [S || S <- Shards0, not shard_eq(S, Src)],
- Shards2 = [S || S <- Shards1, check_overlap(SrcRange, TgtNode, S)],
- case [{R, S} || #shard{range = R} = S <- Shards2] of
- [] ->
- % If target map is empty, create a target map with just
- % that one target. This is to support tooling which may be
- % moving / copying shards using mem3:go/2,3 before the
- % shards are present in the shard map
- #{mem3:range(SrcName) => Tgt};
- [_ | _] = TMapList->
- maps:from_list(TMapList)
- end;
-targets_map(_Src, Tgt) ->
- #{[0, ?RING_END] => Tgt}.
-shard_eq(#shard{name = Name, node = Node}, #shard{name = Name, node = Node}) ->
- true;
-shard_eq(_, _) ->
- false.
-check_overlap(SrcRange, Node, #shard{node = Node, range = TgtRange}) ->
- mem3_util:range_overlap(SrcRange, TgtRange);
-check_overlap([_, _], _, #shard{}) ->
- false.
-reset_remaining(#{} = Targets) ->
- maps:map(fun(_, #tgt{} = T) ->
- T#tgt{remaining = 0}
- end, Targets).
-find_source_seq_unknown_node_test() ->
- ?assertEqual(
- find_source_seq_int(doc_(), <<"foo">>, <<"bing">>, <<"bar_uuid">>, 10),
- 0
- ).
-find_source_seq_unknown_uuid_test() ->
- ?assertEqual(
- find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"teapot">>, 10),
- 0
- ).
-find_source_seq_ok_test() ->
- ?assertEqual(
- find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 100),
- 100
- ).
-find_source_seq_old_ok_test() ->
- ?assertEqual(
- find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 84),
- 50
- ).
-find_source_seq_different_node_test() ->
- ?assertEqual(
- find_source_seq_int(doc_(), <<"foo2">>, <<"bar">>, <<"bar_uuid">>, 92),
- 31
- ).
--define(SNODE, <<"source_node">>).
--define(SUUID, <<"source_uuid">>).
--define(SSEQ, <<"source_seq">>).
--define(TNODE, <<"target_node">>).
--define(TUUID, <<"target_uuid">>).
--define(TSEQ, <<"target_seq">>).
-doc_() ->
- Foo_Bar = [
- {[
- {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 100},
- {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 100}
- ]},
- {[
- {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 90},
- {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 85}
- ]},
- {[
- {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 50},
- {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 51}
- ]},
- {[
- {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 40},
- {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 45}
- ]},
- {[
- {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 2},
- {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 2}
- ]}
- ],
- Foo2_Bar = [
- {[
- {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 100},
- {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 100}
- ]},
- {[
- {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 92},
- {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 93}
- ]},
- {[
- {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 31},
- {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 30}
- ]}
- ],
- History = {[
- {<<"foo">>, Foo_Bar},
- {<<"foo2">>, Foo2_Bar}
- ]},
- #doc{
- body={[{<<"history">>, History}]}
- }.
-targets_map_test_() ->
- {
- setup,
- fun() -> meck:new(mem3, [passthrough]) end,
- fun(_) -> meck:unload() end,
- [
- target_not_a_shard(),
- source_contained_in_target(),
- multiple_targets(),
- uneven_overlap(),
- target_not_in_shard_map()
- ]
- }.
-target_not_a_shard() ->
- ?_assertEqual(#{[0, ?RING_END] => <<"t">>}, targets_map(<<"s">>, <<"t">>)).
-source_contained_in_target() ->
- ?_test(begin
- R07 = [16#00000000, 16#7fffffff],
- R8f = [16#80000000, 16#ffffffff],
- R0f = [16#00000000, 16#ffffffff],
- Shards = [
- #shard{node = 'n1', range = R07},
- #shard{node = 'n1', range = R8f},
- #shard{node = 'n2', range = R07},
- #shard{node = 'n2', range = R8f},
- #shard{node = 'n3', range = R0f}
- ],
- meck:expect(mem3, shards, 1, Shards),
- SrcName1 = <<"shards/00000000-7fffffff/d.1551893552">>,
- TgtName1 = <<"shards/00000000-7fffffff/d.1551893552">>,
- Src1 = #shard{name = SrcName1, node = 'n1'},
- Tgt1 = #shard{name = TgtName1, node = 'n2'},
- Map1 = targets_map(Src1, Tgt1),
- ?assertEqual(1, map_size(Map1)),
- ?assertMatch(#{R07 := #shard{node = 'n2'}}, Map1),
- Tgt2 = #shard{name = TgtName1, node = 'n3'},
- Map2 = targets_map(Src1, Tgt2),
- ?assertEqual(1, map_size(Map2)),
- ?assertMatch(#{R0f := #shard{node = 'n3'}}, Map2)
- end).
-multiple_targets() ->
- ?_test(begin
- R07 = [16#00000000, 16#7fffffff],
- R8f = [16#80000000, 16#ffffffff],
- R0f = [16#00000000, 16#ffffffff],
- Shards = [
- #shard{node = 'n1', range = R07},
- #shard{node = 'n1', range = R8f},
- #shard{node = 'n2', range = R0f}
- ],
- meck:expect(mem3, shards, 1, Shards),
- SrcName = <<"shards/00000000-ffffffff/d.1551893552">>,
- TgtName = <<"shards/00000000-7fffffff/d.1551893552">>,
- Src = #shard{name = SrcName, node = 'n2'},
- Tgt = #shard{name = TgtName, node = 'n1'},
- Map = targets_map(Src, Tgt),
- ?assertEqual(2, map_size(Map)),
- ?assertMatch(#{R07 := #shard{node = 'n1'}}, Map),
- ?assertMatch(#{R8f := #shard{node = 'n1'}}, Map)
- end).
-uneven_overlap() ->
- ?_test(begin
- R04 = [16#00000000, 16#4fffffff],
- R26 = [16#20000000, 16#6fffffff],
- R58 = [16#50000000, 16#8fffffff],
- R9f = [16#90000000, 16#ffffffff],
- Shards = [
- #shard{node = 'n1', range = R04},
- #shard{node = 'n1', range = R58},
- #shard{node = 'n1', range = R9f},
- #shard{node = 'n2', range = R26}
- ],
- meck:expect(mem3, shards, 1, Shards),
- SrcName = <<"shards/20000000-6fffffff/d.1551893552">>,
- TgtName = <<"shards/20000000-6fffffff/d.1551893552">>,
- Src = #shard{name = SrcName, node = 'n2'},
- Tgt = #shard{name = TgtName, node = 'n1'},
- Map = targets_map(Src, Tgt),
- ?assertEqual(2, map_size(Map)),
- ?assertMatch(#{R04 := #shard{node = 'n1'}}, Map),
- ?assertMatch(#{R58 := #shard{node = 'n1'}}, Map)
- end).
-target_not_in_shard_map() ->
- ?_test(begin
- R0f = [16#00000000, 16#ffffffff],
- Name = <<"shards/00000000-ffffffff/d.1551893552">>,
- Shards = [
- #shard{name = Name, node = 'n1', range = R0f},
- #shard{name = Name, node = 'n2', range = R0f}
- ],
- meck:expect(mem3, shards, 1, Shards),
- Src = #shard{name = Name, node = 'n1'},
- Tgt = #shard{name = Name, node = 'n3'},
- Map = targets_map(Src, Tgt),
- ?assertEqual(1, map_size(Map)),
- ?assertMatch(#{R0f := #shard{name = Name, node = 'n3'}}, Map)
- end).