author    Nick Vatamaniuc <vatamane@apache.org>  2019-03-18 13:32:15 -0400
committer Nick Vatamaniuc <nickva@users.noreply.github.com>  2019-04-03 10:48:45 -0400
commit    b7d5b5d4661e742596a8b7b9302e0e3aa9f4cf5d (patch)
tree      0d125ff27588e016cde166f24e8e35d09af9a3ea
parent    13db67efeebccce0b92556aa5a356a38d1b28200 (diff)
Update internal replicator to handle split shards
Shard splitting will result in uneven shard copies. Previously the internal replicator knew how to replicate from one shard copy to another; now it needs to know how to replicate from one source to possibly multiple targets.

The main idea is to reuse the same logic and "pick" function as `couch_db_split`. To avoid the penalty of calling the custom hash function for every document when there is only a single target, there is a special "1 target" case in which the hash function is `undefined`.

The internal replicator is also used to top off replication and to replicate the shard map dbs to and from the current node (as part of the shard map update logic). To support those cases, a few helper mem3_util and mem3_rpc functions were added.
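To make the single-target fast path concrete, here is a minimal sketch of the selection logic described above. The module name `pick_target_sketch` and function `pick/3` are illustrative only; the actual routing in this patch goes through `mem3_reshard_job:pickfun/3`, as visible in the diff below.

    %% Illustrative sketch, not the real mem3 code: route a document id to a
    %% target range. With a single target covering the whole ring the hash
    %% function stays `undefined` and is never called; with several targets
    %% the id is hashed and matched against each target's range.
    -module(pick_target_sketch).
    -export([pick/3]).

    -define(RING_END, 16#ffffffff).

    %% One target covering [0, ?RING_END]: skip hashing entirely.
    pick(_DocId, [Range = [0, ?RING_END]], undefined) ->
        Range;
    %% Several targets (a split shard): hash the id and return the first
    %% range containing the resulting key, or not_in_range if none does.
    pick(DocId, Ranges, HashFun) when is_function(HashFun, 1) ->
        Key = HashFun(DocId),
        case [R || [B, E] = R <- Ranges, B =< Key, Key =< E] of
            [Range | _] -> Range;
            [] -> not_in_range
        end.

For example, `pick(<<"doc1">>, [[0, 16#ffffffff]], undefined)` returns the full range without any hashing, while a map with two target ranges routes every id through the hash function, which is exactly why the unsplit-shard case avoids the per-document cost.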
-rw-r--r--  src/mem3/src/mem3_rep.erl        672
-rw-r--r--  src/mem3/src/mem3_rpc.erl         49
-rw-r--r--  src/mem3/test/mem3_rep_test.erl  320
3 files changed, 849 insertions(+), 192 deletions(-)
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index a30630167..d5b42d315 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -17,9 +17,12 @@
go/2,
go/3,
make_local_id/2,
+ make_local_id/3,
make_purge_id/2,
verify_purge_checkpoint/2,
- find_source_seq/4
+ find_source_seq/4,
+ find_split_target_seq/4,
+ local_id_hash/1
]).
-export([
@@ -33,18 +36,25 @@
-record(acc, {
batch_size,
batch_count,
- revcount = 0,
- infos = [],
seq = 0,
- localid,
- purgeid,
+ revcount = 0,
source,
- target,
+ targets,
filter,
db,
- history = {[]}
+ hashfun,
+ incomplete_ranges
}).
+-record(tgt, {
+ shard,
+ seq = 0,
+ infos = [],
+ localid,
+ purgeid,
+ history = {[]},
+ remaining = 0
+}).
go(Source, Target) ->
go(Source, Target, []).
@@ -53,36 +63,51 @@ go(Source, Target) ->
go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) ->
go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts);
-
go(#shard{} = Source, #shard{} = Target, Opts) ->
- mem3_sync_security:maybe_sync(Source, Target),
- BatchSize = case proplists:get_value(batch_size, Opts) of
- BS when is_integer(BS), BS > 0 -> BS;
- _ -> 100
- end,
- BatchCount = case proplists:get_value(batch_count, Opts) of
- all -> all;
- BC when is_integer(BC), BC > 0 -> BC;
- _ -> 1
- end,
- Filter = proplists:get_value(filter, Opts),
- Acc = #acc{
- batch_size = BatchSize,
- batch_count = BatchCount,
- source = Source,
- target = Target,
- filter = Filter
- },
- go(Acc).
+ go(Source, targets_map(Source, Target), Opts);
+
+go(#shard{} = Source, #{} = Targets0, Opts) when map_size(Targets0) > 0 ->
+ Targets = maps:map(fun(_, T) -> #tgt{shard = T} end, Targets0),
+ case couch_server:exists(Source#shard.name) of
+ true ->
+ sync_security(Source, Targets),
+ BatchSize = case proplists:get_value(batch_size, Opts) of
+ BS when is_integer(BS), BS > 0 -> BS;
+ _ -> 100
+ end,
+ BatchCount = case proplists:get_value(batch_count, Opts) of
+ all -> all;
+ BC when is_integer(BC), BC > 0 -> BC;
+ _ -> 1
+ end,
+ IncompleteRanges = config:get_boolean("mem3", "incomplete_ranges",
+ false),
+ Filter = proplists:get_value(filter, Opts),
+ Acc = #acc{
+ batch_size = BatchSize,
+ batch_count = BatchCount,
+ source = Source,
+ targets = Targets,
+ filter = Filter,
+ incomplete_ranges = IncompleteRanges
+ },
+ go(Acc);
+ false ->
+ {error, missing_source}
+ end.
go(#acc{source=Source, batch_count=BC}=Acc) ->
case couch_db:open(Source#shard.name, [?ADMIN_CTX]) of
{ok, Db} ->
Resp = try
- repl(Acc#acc{db = Db})
- catch error:{not_found, no_db_file} ->
- {error, missing_target}
+ HashFun = mem3_hash:get_hash_fun(couch_db:name(Db)),
+ repl(Acc#acc{db = Db, hashfun = HashFun})
+ catch
+ error:{error, missing_source} ->
+ {error, missing_source};
+ error:{not_found, no_db_file} ->
+ {error, missing_target}
after
couch_db:close(Db)
end,
@@ -106,21 +131,31 @@ make_local_id(Source, Target) ->
make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
make_local_id(SourceNode, TargetNode, Filter);
+make_local_id(SourceThing, TargetThing, F) when is_binary(F) ->
+ S = local_id_hash(SourceThing),
+ T = local_id_hash(TargetThing),
+ <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>;
make_local_id(SourceThing, TargetThing, Filter) ->
- S = couch_util:encodeBase64Url(couch_hash:md5_hash(term_to_binary(SourceThing))),
- T = couch_util:encodeBase64Url(couch_hash:md5_hash(term_to_binary(TargetThing))),
- F = case is_function(Filter) of
- true ->
- {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq),
- B = couch_util:encodeBase64Url(Hash),
- <<"-", B/binary>>;
- false ->
- <<>>
- end,
+ S = local_id_hash(SourceThing),
+ T = local_id_hash(TargetThing),
+ F = filter_hash(Filter),
<<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+filter_hash(Filter) when is_function(Filter) ->
+ {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq),
+ B = couch_util:encodeBase64Url(Hash),
+ <<"-", B/binary>>;
+
+filter_hash(_) ->
+ <<>>.
+
+
+local_id_hash(Thing) ->
+ couch_util:encodeBase64Url(couch_hash:md5_hash(term_to_binary(Thing))).
+
+
make_purge_id(SourceUUID, TargetUUID) ->
<<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
@@ -212,14 +247,48 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
end.
+find_split_target_seq(TgtDb, SrcNode0, SrcUUIDPrefix, SrcSeq) ->
+ SrcNode = case is_atom(SrcNode0) of
+ true -> atom_to_binary(SrcNode0, utf8);
+ false -> SrcNode0
+ end,
+ case find_split_target_seq_int(TgtDb, SrcNode, SrcUUIDPrefix) of
+ {ok, [{BulkCopySeq, BulkCopySeq} | _]} when SrcSeq =< BulkCopySeq ->
+ % Check if source sequence is at or below the initial bulk copy
+ % checkpointed sequence. That sequence or anything lower than it
+ % can be directly replaced with the same value for each target. For
+ % extra safety we assert that the initial source and target
+ % sequences are the same value
+ SrcSeq;
+ {ok, Seqs= [{_, _} | _]} ->
+ % Pick the target sequence for the greatest source sequence that is
+ % less than `SrcSeq`.
+ case lists:takewhile(fun({Seq, _}) -> Seq < SrcSeq end, Seqs) of
+ [] ->
+ couch_log:warning("~p find_split_target_seq target seq not found "
+ "tgt_db: ~p, src_uuid_prefix: ~p, src_seq: ~p",
+ [?MODULE, couch_db:name(TgtDb), SrcUUIDPrefix, SrcSeq]),
+ 0;
+ [{_, _} | _] = Seqs1 ->
+ {_, TSeq} = lists:last(Seqs1),
+ TSeq
+ end;
+ {not_found, _} ->
+ couch_log:warning("~p find_split_target_seq target seq not found "
+ "tgt_db: ~p, src_uuid_prefix: ~p, src_seq: ~p",
+ [?MODULE, couch_db:name(TgtDb), SrcUUIDPrefix, SrcSeq]),
+ 0
+ end.
+
+
repl(#acc{db = Db0} = Acc0) ->
erlang:put(io_priority, {internal_repl, couch_db:name(Db0)}),
- Acc1 = calculate_start_seq(Acc0),
+ Acc1 = calculate_start_seq_multi(Acc0),
try
Acc3 = case config:get_boolean("mem3", "replicate_purges", false) of
true ->
- Acc2 = pull_purges(Acc1),
- push_purges(Acc2);
+ Acc2 = pull_purges_multi(Acc1),
+ push_purges_multi(Acc2);
false ->
Acc1
end,
@@ -230,124 +299,104 @@ repl(#acc{db = Db0} = Acc0) ->
end.
-pull_purges(#acc{} = Acc0) ->
- #acc{
- batch_size = Count,
- seq = UpdateSeq,
- target = Target
- } = Acc0,
- #shard{
- node = TgtNode,
- name = TgtDbName
- } = Target,
-
+pull_purges_multi(#acc{source = Source} = Acc0) ->
+ #acc{batch_size = Count, seq = UpdateSeq, targets = Targets0} = Acc0,
with_src_db(Acc0, fun(Db) ->
- SrcUUID = couch_db:get_uuid(Db),
- {LocalPurgeId, Infos, ThroughSeq, Remaining} =
- mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
-
- if Infos == [] -> ok; true ->
- {ok, _} = couch_db:purge_docs(Db, Infos, [replicated_edits]),
- Body = purge_cp_body(Acc0, ThroughSeq),
- mem3_rpc:save_purge_checkpoint(
- TgtNode, TgtDbName, LocalPurgeId, Body)
- end,
-
- if Remaining =< 0 -> ok; true ->
+ Targets = maps:map(fun(_, #tgt{} = T) ->
+ pull_purges(Db, Count, Source, T)
+ end, reset_remaining(Targets0)),
+ Remaining = maps:fold(fun(_, #tgt{remaining = R}, Sum) ->
+ Sum + R
+ end, 0, Targets),
+ if Remaining == 0 -> Acc0#acc{targets = Targets}; true ->
PurgeSeq = couch_db:get_purge_seq(Db),
OldestPurgeSeq = couch_db:get_oldest_purge_seq(Db),
PurgesToPush = PurgeSeq - OldestPurgeSeq,
Changes = couch_db:count_changes_since(Db, UpdateSeq),
- throw({finished, Remaining + PurgesToPush + Changes})
- end,
-
- Acc0#acc{purgeid = LocalPurgeId}
+ Pending = Remaining + PurgesToPush + Changes,
+ throw({finished, Pending})
+ end
end).
-push_purges(#acc{} = Acc0) ->
- #acc{
- batch_size = BatchSize,
- purgeid = LocalPurgeId,
- seq = UpdateSeq,
- target = Target
- } = Acc0,
- #shard{
- node = TgtNode,
- name = TgtDbName
- } = Target,
-
- with_src_db(Acc0, fun(Db) ->
- StartSeq = case couch_db:open_doc(Db, LocalPurgeId, []) of
- {ok, #doc{body = {Props}}} ->
- couch_util:get_value(<<"purge_seq">>, Props);
- {not_found, _} ->
- Oldest = couch_db:get_oldest_purge_seq(Db),
- erlang:max(0, Oldest - 1)
- end,
-
- FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
- NewCount = Count + length(Revs),
- NewInfos = [{UUID, Id, Revs} | Infos],
- Status = if NewCount < BatchSize -> ok; true -> stop end,
- {Status, {NewCount, NewInfos, PSeq}}
- end,
- InitAcc = {0, [], StartSeq},
- {ok, {_, Infos, ThroughSeq}} =
- couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
-
- if Infos == [] -> ok; true ->
- ok = purge_on_target(TgtNode, TgtDbName, Infos),
- Doc = #doc{
- id = LocalPurgeId,
- body = purge_cp_body(Acc0, ThroughSeq)
- },
- {ok, _} = couch_db:update_doc(Db, Doc, [])
- end,
-
- PurgeSeq = couch_db:get_purge_seq(Db),
- if ThroughSeq >= PurgeSeq -> ok; true ->
- Remaining = PurgeSeq - ThroughSeq,
+pull_purges(Db, Count, SrcShard, #tgt{} = Tgt0) ->
+ #tgt{shard = TgtShard} = Tgt0,
+ SrcUUID = couch_db:get_uuid(Db),
+ #shard{node = TgtNode, name = TgtDbName} = TgtShard,
+ {LocalPurgeId, Infos, ThroughSeq, Remaining} =
+ mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
+ Tgt = Tgt0#tgt{purgeid = LocalPurgeId},
+ if Infos == [] -> ok; true ->
+ {ok, _} = couch_db:purge_docs(Db, Infos, [replicated_edits]),
+ Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq),
+ mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, LocalPurgeId, Body)
+ end,
+ Tgt#tgt{remaining = max(0, Remaining)}.
+
+
+push_purges_multi(#acc{source = SrcShard} = Acc) ->
+ #acc{batch_size = BatchSize, seq = UpdateSeq, targets = Targets0} = Acc,
+ with_src_db(Acc, fun(Db) ->
+ Targets = maps:map(fun(_, #tgt{} = T) ->
+ push_purges(Db, BatchSize, SrcShard, T)
+ end, reset_remaining(Targets0)),
+ Remaining = maps:fold(fun(_, #tgt{remaining = R}, Sum) ->
+ Sum + R
+ end, 0, Targets),
+ if Remaining == 0 -> Acc#acc{targets = Targets}; true ->
Changes = couch_db:count_changes_since(Db, UpdateSeq),
throw({finished, Remaining + Changes})
- end,
-
- Acc0
+ end
end).
-push_changes(#acc{} = Acc0) ->
- #acc{
- db = Db0,
- seq = Seq
- } = Acc0,
-
- % Avoid needless rewriting the internal replication
- % checkpoint document if nothing is replicated.
- UpdateSeq = couch_db:get_update_seq(Db0),
- if Seq < UpdateSeq -> ok; true ->
- throw({finished, 0})
+push_purges(Db, BatchSize, SrcShard, Tgt) ->
+ #tgt{shard = TgtShard, purgeid = LocalPurgeId} = Tgt,
+ #shard{node = TgtNode, name = TgtDbName} = TgtShard,
+ StartSeq = case couch_db:open_doc(Db, LocalPurgeId, []) of
+ {ok, #doc{body = {Props}}} ->
+ couch_util:get_value(<<"purge_seq">>, Props);
+ {not_found, _} ->
+ Oldest = couch_db:get_oldest_purge_seq(Db),
+ erlang:max(0, Oldest - 1)
+ end,
+ FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
+ NewCount = Count + length(Revs),
+ NewInfos = [{UUID, Id, Revs} | Infos],
+ Status = if NewCount < BatchSize -> ok; true -> stop end,
+ {Status, {NewCount, NewInfos, PSeq}}
end,
+ InitAcc = {0, [], StartSeq},
+ {ok, {_, Infos, ThroughSeq}} =
+ couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
+ if Infos == [] -> ok; true ->
+ ok = purge_on_target(TgtNode, TgtDbName, Infos),
+ Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq),
+ Doc = #doc{id = LocalPurgeId, body = Body},
+ {ok, _} = couch_db:update_doc(Db, Doc, [])
+ end,
+ Tgt#tgt{remaining = max(0, couch_db:get_purge_seq(Db) - ThroughSeq)}.
- with_src_db(Acc0, fun(Db) ->
- Acc1 = Acc0#acc{db = Db},
- Fun = fun ?MODULE:changes_enumerator/2,
- {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
- {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
- {ok, couch_db:count_changes_since(Db, LastSeq)}
- end).
+calculate_start_seq_multi(#acc{} = Acc) ->
+ #acc{db = Db, targets = Targets0, filter = Filter} = Acc,
+ FilterHash = filter_hash(Filter),
+ Targets = maps:map(fun(_, #tgt{} = T) ->
+ calculate_start_seq(Db, FilterHash, T)
+ end, Targets0),
+ % There will always be at least one target
+ #tgt{seq = Seq0} = hd(maps:values(Targets)),
+ Seq = maps:fold(fun(_, #tgt{seq = S}, M) -> min(S, M) end, Seq0, Targets),
+ Acc#acc{seq = Seq, targets = Targets}.
-calculate_start_seq(Acc) ->
- #acc{
- db = Db,
- target = #shard{node=Node, name=Name}
- } = Acc,
- %% Give the target our UUID and ask it to return the checkpoint doc
+
+calculate_start_seq(Db, FilterHash, #tgt{shard = TgtShard} = Tgt) ->
UUID = couch_db:get_uuid(Db),
- {NewDocId, Doc} = mem3_rpc:load_checkpoint(Node, Name, node(), UUID),
+ #shard{node = Node, name = Name} = TgtShard,
+ {NewDocId, Doc} = mem3_rpc:load_checkpoint(Node, Name, node(), UUID,
+ FilterHash),
#doc{id=FoundId, body={TProps}} = Doc,
- Acc1 = Acc#acc{localid = NewDocId},
+ Tgt1 = Tgt#tgt{localid = NewDocId},
% NewDocId and FoundId may be different the first time
% this code runs to save our newly named internal replication
% checkpoints. We store NewDocId to use when saving checkpoints
@@ -369,58 +418,108 @@ calculate_start_seq(Acc) ->
Seq = TargetSeq,
History = couch_util:get_value(<<"history">>, TProps, {[]})
end,
- Acc1#acc{seq = Seq, history = History};
+ Tgt1#tgt{seq = Seq, history = History};
{not_found, _} ->
- compare_epochs(Acc1)
+ compare_epochs(Db, Tgt1)
end.
-compare_epochs(Acc) ->
+
+push_changes(#acc{} = Acc0) ->
#acc{
- db = Db,
- target = #shard{node=Node, name=Name}
- } = Acc,
+ db = Db0,
+ seq = Seq
+ } = Acc0,
+
+ % Avoid needless rewriting the internal replication
+ % checkpoint document if nothing is replicated.
+ UpdateSeq = couch_db:get_update_seq(Db0),
+ if Seq < UpdateSeq -> ok; true ->
+ throw({finished, 0})
+ end,
+
+ with_src_db(Acc0, fun(Db) ->
+ Acc1 = Acc0#acc{db = Db},
+ Fun = fun ?MODULE:changes_enumerator/2,
+ {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
+ {ok, #acc{seq = LastSeq}} = replicate_batch_multi(Acc2),
+ {ok, couch_db:count_changes_since(Db, LastSeq)}
+ end).
+
+
+compare_epochs(Db, #tgt{shard = TgtShard} = Tgt) ->
+ #shard{node = Node, name = Name} = TgtShard,
UUID = couch_db:get_uuid(Db),
Epochs = couch_db:get_epochs(Db),
Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs),
- Acc#acc{seq = Seq, history = {[]}}.
+ Tgt#tgt{seq = Seq, history = {[]}}.
+
changes_enumerator(#doc_info{id=DocId}, #acc{db=Db}=Acc) ->
{ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
changes_enumerator(FDI, Acc);
-changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) ->
- #doc_info{
- high_seq=Seq,
- revs=Revs
- } = couch_doc:to_doc_info(FDI),
- {Count, NewInfos} = case filter_doc(Acc0#acc.filter, FDI) of
- keep -> {C + length(Revs), [FDI | Infos]};
- discard -> {C, Infos}
+changes_enumerator(#full_doc_info{}=FDI, #acc{}=Acc0) ->
+ #acc{
+ revcount = C,
+ targets = Targets0,
+ hashfun = HashFun,
+ incomplete_ranges = IncompleteRanges
+ } = Acc0,
+ #doc_info{high_seq=Seq, revs=Revs} = couch_doc:to_doc_info(FDI),
+ {Count, Targets} = case filter_doc(Acc0#acc.filter, FDI) of
+ keep ->
+ NewTargets = changes_append_fdi(FDI, Targets0, HashFun,
+ IncompleteRanges),
+ {C + length(Revs), NewTargets};
+ discard ->
+ {C, Targets0}
end,
- Acc1 = Acc0#acc{
- seq=Seq,
- revcount=Count,
- infos=NewInfos
- },
+ Acc1 = Acc0#acc{seq = Seq, revcount = Count, targets = Targets},
Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end,
{Go, Acc1}.
-replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
- case find_missing_revs(Acc) of
- [] ->
- ok;
- Missing ->
- lists:map(fun(Chunk) ->
- Docs = open_docs(Acc, Chunk),
+changes_append_fdi(#full_doc_info{id = Id} = FDI, Targets, HashFun,
+ IncompleteRanges) ->
+ case mem3_reshard_job:pickfun(Id, maps:keys(Targets), HashFun) of
+ not_in_range when IncompleteRanges ->
+ Targets;
+ not_in_range when not IncompleteRanges ->
+ ErrMsg = "~p : ~p not in any target ranges: ~p",
+ TShards = [TS || #tgt{shard = TS} <- maps:values(Targets)],
+ TNames = [TN || #shard{name = TN} <- TShards],
+ couch_log:error(ErrMsg, [?MODULE, Id, TNames]),
+ error({error, {Id, not_in_target_ranges}});
+ Key ->
+ maps:update_with(Key, fun(#tgt{infos = Infos} = T) ->
+ T#tgt{infos = [FDI | Infos]}
+ end, Targets)
+ end.
+
+
+replicate_batch_multi(#acc{targets = Targets0, seq = Seq, db = Db} = Acc) ->
+ Targets = maps:map(fun(_, #tgt{} = T) ->
+ replicate_batch(T, Db, Seq)
+ end, Targets0),
+ {ok, Acc#acc{targets = Targets, revcount = 0}}.
+
+
+replicate_batch(#tgt{shard = TgtShard, infos = Infos} = Target, Db, Seq) ->
+ #shard{node = Node, name = Name} = TgtShard,
+ case find_missing_revs(Target) of
+ [] ->
+ ok;
+ Missing ->
+ lists:map(fun(Chunk) ->
+ Docs = open_docs(Db, Infos, Chunk),
ok = save_on_target(Node, Name, Docs)
end, chunk_revs(Missing))
end,
- update_locals(Acc),
- {ok, Acc#acc{revcount=0, infos=[]}}.
+ update_locals(Target, Db, Seq),
+ Target#tgt{infos = []}.
-find_missing_revs(Acc) ->
- #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
+find_missing_revs(#tgt{shard = TgtShard, infos = Infos}) ->
+ #shard{node = Node, name = Name} = TgtShard,
IdsRevs = lists:map(fun(FDI) ->
#doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI),
{Id, [R || #rev_info{rev=R} <- RevInfos]}
@@ -461,7 +560,7 @@ chunk_revs([{Id, R, A}|Revs], {Count, Chunk}, Chunks, Limit) ->
).
-open_docs(#acc{db=Db, infos=Infos}, Missing) ->
+open_docs(Db, Infos, Missing) ->
lists:flatmap(fun({Id, Revs, _}) ->
FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
#full_doc_info{rev_tree=RevTree} = FDI,
@@ -491,9 +590,10 @@ purge_on_target(Node, Name, PurgeInfos) ->
]),
ok.
-update_locals(Acc) ->
- #acc{seq=Seq, db=Db, target=Target, localid=Id, history=History} = Acc,
- #shard{name=Name, node=Node} = Target,
+
+update_locals(Target, Db, Seq) ->
+ #tgt{shard = TgtShard, localid = Id, history = History} = Target,
+ #shard{node = Node, name = Name} = TgtShard,
NewEntry = [
{<<"source_node">>, atom_to_binary(node(), utf8)},
{<<"source_uuid">>, couch_db:get_uuid(Db)},
@@ -504,11 +604,7 @@ update_locals(Acc) ->
{ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
-purge_cp_body(#acc{} = Acc, PurgeSeq) ->
- #acc{
- source = Source,
- target = Target
- } = Acc,
+purge_cp_body(#shard{} = Source, #shard{} = Target, PurgeSeq) ->
{Mega, Secs, _} = os:timestamp(),
NowSecs = Mega * 1000000 + Secs,
{[
@@ -523,7 +619,7 @@ purge_cp_body(#acc{} = Acc, PurgeSeq) ->
find_repl_doc(SrcDb, TgtUUIDPrefix) ->
SrcUUID = couch_db:get_uuid(SrcDb),
- S = couch_util:encodeBase64Url(couch_hash:md5_hash(term_to_binary(SrcUUID))),
+ S = local_id_hash(SrcUUID),
DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>,
FoldFun = fun(#doc{id = DocId, body = {BodyProps}} = Doc, _) ->
TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>),
@@ -551,12 +647,71 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
end.
+find_split_target_seq_int(TgtDb, Node, SrcUUIDPrefix) ->
+ TgtUUID = couch_db:get_uuid(TgtDb),
+ FoldFun = fun(#doc{body = {Props}}, _) ->
+ DocTgtUUID = couch_util:get_value(<<"target_uuid">>, Props, <<>>),
+ case TgtUUID == DocTgtUUID of
+ true ->
+ {History} = couch_util:get_value(<<"history">>, Props, {[]}),
+ HProps = couch_util:get_value(Node, History, []),
+ case get_target_seqs(HProps, TgtUUID, Node, SrcUUIDPrefix, []) of
+ [] ->
+ % No replication found from source to target
+ {ok, not_found};
+ [{_, _} | _] = SeqPairs ->
+ % Found shared replicated history from source to target
+ % Return sorted list by the earliest source sequence
+ {stop, lists:sort(SeqPairs)}
+ end;
+ false ->
+ {ok, not_found}
+ end
+ end,
+ Options = [{start_key, <<"_local/shard-sync-">>}],
+ case couch_db:fold_local_docs(TgtDb, FoldFun, not_found, Options) of
+ {ok, Seqs} when is_list(Seqs) ->
+ {ok, Seqs};
+ {ok, not_found} ->
+ {not_found, missing};
+ Else ->
+ couch_log:error("Error finding replication doc: ~w", [Else]),
+ {not_found, missing}
+ end.
+
+
+% Get target sequences for each checkpoint when source replicated to the target
+% The "target" is the current db where the history entry was read from and "source"
+% is another, now possibly deleted, database.
+get_target_seqs([], _TgtUUID, _Node, _SrcUUIDPrefix, Acc) ->
+ lists:reverse(Acc);
+
+get_target_seqs([{Entry} | HProps], TgtUUID, Node, SrcUUIDPrefix, Acc) ->
+ SameTgt = couch_util:get_value(<<"target_uuid">>, Entry) =:= TgtUUID,
+ SameNode = couch_util:get_value(<<"target_node">>, Entry) =:= Node,
+ SrcUUID = couch_util:get_value(<<"source_uuid">>, Entry),
+ IsPrefix = is_prefix(SrcUUIDPrefix, SrcUUID),
+ Acc1 = case SameTgt andalso SameNode andalso IsPrefix of
+ true ->
+ EntrySourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
+ EntryTargetSeq = couch_util:get_value(<<"target_seq">>, Entry),
+ [{EntrySourceSeq, EntryTargetSeq} | Acc];
+ false ->
+ Acc
+ end,
+ get_target_seqs(HProps, TgtUUID, Node, SrcUUIDPrefix, Acc1).
+
+
with_src_db(#acc{source = Source}, Fun) ->
- {ok, Db} = couch_db:open(Source#shard.name, [?ADMIN_CTX]),
- try
- Fun(Db)
- after
- couch_db:close(Db)
+ case couch_db:open(Source#shard.name, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ try
+ Fun(Db)
+ after
+ couch_db:close(Db)
+ end;
+ {not_found, _} ->
+ error({error, missing_source})
end.
@@ -575,6 +730,45 @@ filter_doc(_, _) ->
keep.
+sync_security(#shard{} = Source, #{} = Targets) ->
+ maps:map(fun(_, #tgt{shard = Target}) ->
+ mem3_sync_security:maybe_sync(Source, Target)
+ end, Targets).
+
+
+targets_map(#shard{name = <<"shards/", _/binary>> = SrcName} = Src,
+ #shard{name = <<"shards/", _/binary>>, node = TgtNode}) ->
+ % Parse range from name in case the passed shard is built with a name only
+ SrcRange = mem3:range(SrcName),
+ Shards0 = mem3:shards(mem3:dbname(SrcName)),
+ Shards1 = [S || S <- Shards0, not shard_eq(S, Src)],
+ Shards2 = [S || S <- Shards1, check_overlap(SrcRange, TgtNode, S)],
+ maps:from_list([{R, S} || #shard{range = R} = S <- Shards2]);
+
+targets_map(_Src, Tgt) ->
+ #{[0, ?RING_END] => Tgt}.
+
+
+shard_eq(#shard{name = Name, node = Node}, #shard{name = Name, node = Node}) ->
+ true;
+
+shard_eq(_, _) ->
+ false.
+
+
+check_overlap(SrcRange, Node, #shard{node = Node, range = TgtRange}) ->
+ mem3_util:range_overlap(SrcRange, TgtRange);
+
+check_overlap([_, _], _, #shard{}) ->
+ false.
+
+
+reset_remaining(#{} = Targets) ->
+ maps:map(fun(_, #tgt{} = T) ->
+ T#tgt{remaining = 0}
+ end, Targets).
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -666,4 +860,106 @@ doc_() ->
body={[{<<"history">>, History}]}
}.
+
+targets_map_test_() ->
+ {
+ foreach,
+ fun() -> meck:new(mem3, [passthrough]) end,
+ fun(_) -> meck:unload() end,
+ [
+ target_not_a_shard(),
+ source_contained_in_target(),
+ multiple_targets(),
+ uneven_overlap()
+ ]
+ }.
+
+
+target_not_a_shard() ->
+ ?_assertEqual(#{[0, ?RING_END] => <<"t">>}, targets_map(<<"s">>, <<"t">>)).
+
+
+source_contained_in_target() ->
+ ?_test(begin
+ R07 = [16#00000000, 16#7fffffff],
+ R8f = [16#80000000, 16#ffffffff],
+ R0f = [16#00000000, 16#ffffffff],
+
+ Shards = [
+ #shard{node = 'n1', range = R07},
+ #shard{node = 'n1', range = R8f},
+ #shard{node = 'n2', range = R07},
+ #shard{node = 'n2', range = R8f},
+ #shard{node = 'n3', range = R0f}
+ ],
+ meck:expect(mem3, shards, 1, Shards),
+
+ SrcName1 = <<"shards/00000000-7fffffff/d.1551893552">>,
+ TgtName1 = <<"shards/00000000-7fffffff/d.1551893552">>,
+
+ Src1 = #shard{name = SrcName1, node = 'n1'},
+ Tgt1 = #shard{name = TgtName1, node = 'n2'},
+ Map1 = targets_map(Src1, Tgt1),
+ ?assertEqual(1, map_size(Map1)),
+ ?assertMatch(#{R07 := #shard{node = 'n2'}}, Map1),
+
+ Tgt2 = #shard{name = TgtName1, node = 'n3'},
+ Map2 = targets_map(Src1, Tgt2),
+ ?assertEqual(1, map_size(Map2)),
+ ?assertMatch(#{R0f := #shard{node = 'n3'}}, Map2)
+ end).
+
+
+multiple_targets() ->
+ ?_test(begin
+ R07 = [16#00000000, 16#7fffffff],
+ R8f = [16#80000000, 16#ffffffff],
+ R0f = [16#00000000, 16#ffffffff],
+
+ Shards = [
+ #shard{node = 'n1', range = R07},
+ #shard{node = 'n1', range = R8f},
+ #shard{node = 'n2', range = R0f}
+ ],
+ meck:expect(mem3, shards, 1, Shards),
+
+ SrcName = <<"shards/00000000-ffffffff/d.1551893552">>,
+ TgtName = <<"shards/00000000-7fffffff/d.1551893552">>,
+
+ Src = #shard{name = SrcName, node = 'n2'},
+ Tgt = #shard{name = TgtName, node = 'n1'},
+ Map = targets_map(Src, Tgt),
+ ?assertEqual(2, map_size(Map)),
+ ?assertMatch(#{R07 := #shard{node = 'n1'}}, Map),
+ ?assertMatch(#{R8f := #shard{node = 'n1'}}, Map)
+ end).
+
+
+uneven_overlap() ->
+ ?_test(begin
+ R04 = [16#00000000, 16#4fffffff],
+ R26 = [16#20000000, 16#6fffffff],
+ R58 = [16#50000000, 16#8fffffff],
+ R9f = [16#90000000, 16#ffffffff],
+ Shards = [
+ #shard{node = 'n1', range = R04},
+ #shard{node = 'n1', range = R58},
+ #shard{node = 'n1', range = R9f},
+ #shard{node = 'n2', range = R26}
+ ],
+
+ meck:expect(mem3, shards, 1, Shards),
+
+ SrcName = <<"shards/20000000-6fffffff/d.1551893552">>,
+ TgtName = <<"shards/20000000-6fffffff/d.1551893552">>,
+
+ Src = #shard{name = SrcName, node = 'n2'},
+ Tgt = #shard{name = TgtName, node = 'n1'},
+ Map = targets_map(Src, Tgt),
+ ?assertEqual(2, map_size(Map)),
+ ?assertMatch(#{R04 := #shard{node = 'n1'}}, Map),
+ ?assertMatch(#{R58 := #shard{node = 'n1'}}, Map)
+ end).
+
+
-endif.
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index 61a29d9bf..59dbbe891 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -21,11 +21,14 @@
update_docs/4,
pull_replication/1,
load_checkpoint/4,
+ load_checkpoint/5,
save_checkpoint/6,
load_purge_infos/4,
save_purge_checkpoint/4,
- purge_docs/4
+ purge_docs/4,
+
+ replicate/4
]).
% Private RPC callbacks
@@ -33,16 +36,25 @@
find_common_seq_rpc/3,
load_checkpoint_rpc/3,
pull_replication_rpc/1,
+ load_checkpoint_rpc/4,
save_checkpoint_rpc/5,
load_purge_infos_rpc/3,
- save_purge_checkpoint_rpc/3
+ save_purge_checkpoint_rpc/3,
+
+ replicate_rpc/2
+
]).
-include("mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
+
+-define(BATCH_SIZE, 1000).
+-define(REXI_CALL_TIMEOUT_MSEC, 600000).
+
+
% "Pull" is a bit of a misnomer here, as what we're actually doing is
% issuing an RPC request and telling the remote node to push updates to
% us. This lets us reuse all of the battle-tested machinery of mem3_rpc.
@@ -57,6 +69,11 @@ update_docs(Node, DbName, Docs, Options) ->
rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
+load_checkpoint(Node, DbName, SourceNode, SourceUUID, FilterHash) ->
+ Args = [DbName, SourceNode, SourceUUID, FilterHash],
+ rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
+
+
load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
Args = [DbName, SourceNode, SourceUUID],
rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
@@ -86,12 +103,22 @@ purge_docs(Node, DbName, PurgeInfos, Options) ->
rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}).
+replicate(Source, Target, DbName, Timeout)
+ when is_atom(Source), is_atom(Target), is_binary(DbName) ->
+ Args = [DbName, Target],
+ rexi_call(Source, {mem3_rpc, replicate_rpc, Args}, Timeout).
+
+
load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
+ load_checkpoint_rpc(DbName, SourceNode, SourceUUID, <<>>).
+
+
+load_checkpoint_rpc(DbName, SourceNode, SourceUUID, FilterHash) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
{ok, Db} ->
TargetUUID = couch_db:get_uuid(Db),
- NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID),
+ NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID, FilterHash),
case couch_db:open_doc(Db, NewId, []) of
{ok, Doc} ->
rexi:reply({ok, {NewId, Doc}});
@@ -208,6 +235,16 @@ save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
end.
+replicate_rpc(DbName, Target) ->
+ rexi:reply(try
+ Opts = [{batch_size, ?BATCH_SIZE}, {batch_count, all}],
+ {ok, mem3_rep:go(DbName, Target, Opts)}
+ catch
+ Tag:Error ->
+ {Tag, Error}
+ end).
+
+
%% @doc Return the sequence where two files with the same UUID diverged.
compare_epochs(SourceEpochs, TargetEpochs) ->
compare_rev_epochs(
@@ -338,6 +375,10 @@ find_bucket(NewSeq, CurSeq, Bucket) ->
rexi_call(Node, MFA) ->
+ rexi_call(Node, MFA, ?REXI_CALL_TIMEOUT_MSEC).
+
+
+rexi_call(Node, MFA, Timeout) ->
Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
Ref = rexi:cast(Node, self(), MFA, [sync]),
try
@@ -347,7 +388,7 @@ rexi_call(Node, MFA) ->
erlang:error(Error);
{rexi_DOWN, Mon, _, Reason} ->
erlang:error({rexi_DOWN, {Node, Reason}})
- after 600000 ->
+ after Timeout ->
erlang:error(timeout)
end
after
diff --git a/src/mem3/test/mem3_rep_test.erl b/src/mem3/test/mem3_rep_test.erl
new file mode 100644
index 000000000..7e8856f7c
--- /dev/null
+++ b/src/mem3/test/mem3_rep_test.erl
@@ -0,0 +1,320 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_rep_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+-define(ID, <<"_id">>).
+
+setup() ->
+ {AllSrc, AllTgt} = {?tempdb(), ?tempdb()},
+ {PartSrc, PartTgt} = {?tempdb(), ?tempdb()},
+ create_db(AllSrc, [{q, 1}, {n, 1}]),
+ create_db(AllTgt, [{q, 2}, {n, 1}]),
+ PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}],
+ create_db(PartSrc, [{q, 1}, {n, 1}, {props, PartProps}]),
+ create_db(PartTgt, [{q, 2}, {n, 1}, {props, PartProps}]),
+ #{allsrc => AllSrc, alltgt => AllTgt, partsrc => PartSrc, parttgt => PartTgt}.
+
+
+teardown(#{} = Dbs) ->
+ maps:map(fun(_, Db) -> delete_db(Db) end, Dbs).
+
+
+start_couch() ->
+ test_util:start_couch([mem3, fabric]).
+
+
+stop_couch(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+mem3_reshard_db_test_() ->
+ {
+ "mem3 rep db tests",
+ {
+ setup,
+ fun start_couch/0, fun stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun replicate_basics/1,
+ fun replicate_small_batches/1,
+ fun replicate_low_batch_count/1,
+ fun replicate_with_partitions/1
+ ]
+ }
+ }
+ }.
+
+
+replicate_basics(#{allsrc := AllSrc, alltgt := AllTgt}) ->
+ ?_test(begin
+ DocSpec = #{docs => 10, delete => [5, 9]},
+ add_test_docs(AllSrc, DocSpec),
+ SDocs = get_all_docs(AllSrc),
+
+ [Src] = lists:sort(mem3:local_shards(AllSrc)),
+ [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)),
+ #shard{range = R1} = Tgt1,
+ #shard{range = R2} = Tgt2,
+ TMap = #{R1 => Tgt1, R2 => Tgt2},
+ Opts = [{batch_size, 1000}, {batch_count, all}],
+ ?assertMatch({ok, 0}, mem3_rep:go(Src, TMap, Opts)),
+
+ ?assertEqual(SDocs, get_all_docs(AllTgt))
+ end).
+
+
+replicate_small_batches(#{allsrc := AllSrc, alltgt := AllTgt}) ->
+ ?_test(begin
+ DocSpec = #{docs => 10, delete => [5, 9]},
+ add_test_docs(AllSrc, DocSpec),
+ SDocs = get_all_docs(AllSrc),
+
+ [Src] = lists:sort(mem3:local_shards(AllSrc)),
+ [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)),
+ #shard{range = R1} = Tgt1,
+ #shard{range = R2} = Tgt2,
+ TMap = #{R1 => Tgt1, R2 => Tgt2},
+ Opts = [{batch_size, 2}, {batch_count, all}],
+ ?assertMatch({ok, 0}, mem3_rep:go(Src, TMap, Opts)),
+
+ ?assertEqual(SDocs, get_all_docs(AllTgt))
+ end).
+
+
+replicate_low_batch_count(#{allsrc := AllSrc, alltgt := AllTgt}) ->
+ ?_test(begin
+ DocSpec = #{docs => 10, delete => [5, 9]},
+ add_test_docs(AllSrc, DocSpec),
+ SDocs = get_all_docs(AllSrc),
+
+ [Src] = lists:sort(mem3:local_shards(AllSrc)),
+ [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)),
+ #shard{range = R1} = Tgt1,
+ #shard{range = R2} = Tgt2,
+ TMap = #{R1 => Tgt1, R2 => Tgt2},
+
+ Opts1 = [{batch_size, 2}, {batch_count, 1}],
+ ?assertMatch({ok, 8}, mem3_rep:go(Src, TMap, Opts1)),
+
+ Opts2 = [{batch_size, 1}, {batch_count, 2}],
+ ?assertMatch({ok, 6}, mem3_rep:go(Src, TMap, Opts2)),
+
+ Opts3 = [{batch_size, 1000}, {batch_count, all}],
+ ?assertMatch({ok, 0}, mem3_rep:go(Src, TMap, Opts3)),
+
+ ?assertEqual(SDocs, get_all_docs(AllTgt))
+ end).
+
+
+replicate_with_partitions(#{partsrc := PartSrc, parttgt := PartTgt}) ->
+ ?_test(begin
+ DocSpec = #{
+ pdocs => #{
+ <<"PX">> => 15,
+ <<"PY">> => 19
+ }
+ },
+ add_test_docs(PartSrc, DocSpec),
+ SDocs = get_all_docs(PartSrc),
+ PXSrc = get_partition_info(PartSrc, <<"PX">>),
+ PYSrc = get_partition_info(PartSrc, <<"PY">>),
+
+ [Src] = lists:sort(mem3:local_shards(PartSrc)),
+ [Tgt1, Tgt2] = lists:sort(mem3:local_shards(PartTgt)),
+ #shard{range = R1} = Tgt1,
+ #shard{range = R2} = Tgt2,
+ TMap = #{R1 => Tgt1, R2 => Tgt2},
+ Opts = [{batch_size, 1000}, {batch_count, all}],
+ ?assertMatch({ok, 0}, mem3_rep:go(Src, TMap, Opts)),
+
+ ?assertEqual(PXSrc, get_partition_info(PartTgt, <<"PX">>)),
+ ?assertEqual(PYSrc, get_partition_info(PartTgt, <<"PY">>)),
+ ?assertEqual(SDocs, get_all_docs(PartTgt))
+ end).
+
+
+get_partition_info(DbName, Partition) ->
+ with_proc(fun() ->
+ {ok, PInfo} = fabric:get_partition_info(DbName, Partition),
+ maps:with([
+ <<"doc_count">>, <<"doc_del_count">>, <<"partition">>
+ ], to_map(PInfo))
+ end).
+
+
+get_all_docs(DbName) ->
+ get_all_docs(DbName, #mrargs{}).
+
+
+get_all_docs(DbName, #mrargs{} = QArgs0) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() ->
+ Cb = fun
+ ({row, Props}, Acc) ->
+ Doc = to_map(couch_util:get_value(doc, Props)),
+ #{?ID := Id} = Doc,
+ {ok, Acc#{Id => Doc}};
+ ({meta, _}, Acc) -> {ok, Acc};
+ (complete, Acc) -> {ok, Acc}
+ end,
+ QArgs = QArgs0#mrargs{include_docs = true},
+ {ok, Docs} = fabric:all_docs(DbName, Cb, #{}, QArgs),
+ Docs
+ end, GL).
+
+
+to_map([_ | _] = Props) ->
+ to_map({Props});
+
+to_map({[_ | _]} = EJson) ->
+ jiffy:decode(jiffy:encode(EJson), [return_maps]).
+
+
+create_db(DbName, Opts) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL).
+
+
+delete_db(DbName) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL).
+
+
+with_proc(Fun) ->
+ with_proc(Fun, undefined, 30000).
+
+
+with_proc(Fun, GroupLeader) ->
+ with_proc(Fun, GroupLeader, 30000).
+
+
+with_proc(Fun, GroupLeader, Timeout) ->
+ {Pid, Ref} = spawn_monitor(fun() ->
+ case GroupLeader of
+ undefined -> ok;
+ _ -> erlang:group_leader(GroupLeader, self())
+ end,
+ exit({with_proc_res, Fun()})
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, {with_proc_res, Res}} ->
+ Res;
+ {'DOWN', Ref, process, Pid, Error} ->
+ error(Error)
+ after Timeout ->
+ erlang:demonitor(Ref, [flush]),
+ exit(Pid, kill),
+ error({with_proc_timeout, Fun, Timeout})
+ end.
+
+
+add_test_docs(DbName, #{} = DocSpec) ->
+ Docs = docs(maps:get(docs, DocSpec, []))
+ ++ pdocs(maps:get(pdocs, DocSpec, #{})),
+ Res = update_docs(DbName, Docs),
+ Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) ->
+ Doc#doc{revs = {RevPos, [Rev]}}
+ end, lists:zip(Docs, Res)),
+ case delete_docs(maps:get(delete, DocSpec, []), Docs1) of
+ [] -> ok;
+ [_ | _] = Deleted -> update_docs(DbName, Deleted)
+ end,
+ ok.
+
+
+update_docs(DbName, Docs) ->
+ with_proc(fun() ->
+ case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
+ {accepted, Res} -> Res;
+ {ok, Res} -> Res
+ end
+ end).
+
+
+delete_docs([S, E], Docs) when E >= S ->
+ ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)],
+ lists:filtermap(fun(#doc{id = Id} = Doc) ->
+ case lists:member(Id, ToDelete) of
+ true -> {true, Doc#doc{deleted = true}};
+ false -> false
+ end
+ end, Docs);
+delete_docs(_, _) ->
+ [].
+
+
+pdocs(#{} = PMap) ->
+ maps:fold(fun(Part, DocSpec, DocsAcc) ->
+ docs(DocSpec, <<Part/binary, ":">>) ++ DocsAcc
+ end, [], PMap).
+
+
+docs(DocSpec) ->
+ docs(DocSpec, <<"">>).
+
+
+docs(N, Prefix) when is_integer(N), N > 0 ->
+ docs([0, N - 1], Prefix);
+docs([S, E], Prefix) when E >= S ->
+ [doc(Prefix, I) || I <- lists:seq(S, E)];
+docs(_, _) ->
+ [].
+
+
+doc(Pref, Id) ->
+ Body = bodyprops(),
+ doc(Pref, Id, Body, 42).
+
+
+doc(Pref, Id, BodyProps, AttSize) ->
+ #doc{
+ id = doc_id(Pref, Id),
+ body = {BodyProps},
+ atts = atts(AttSize)
+ }.
+
+
+doc_id(Pref, Id) ->
+ IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])),
+ <<Pref/binary, IdBin/binary>>.
+
+
+bodyprops() ->
+ [
+ {<<"g">>, {[
+ {<<"type">>, <<"Polygon">>},
+ {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]}
+ ]}}
+ ].
+
+
+atts(0) ->
+ [];
+
+atts(Size) when is_integer(Size), Size >= 1 ->
+ Data = << <<"x">> || _ <- lists:seq(1, Size) >>,
+ [couch_att:new([
+ {name, <<"att">>},
+ {type, <<"app/binary">>},
+ {att_len, Size},
+ {data, Data}
+ ])].