Diffstat (limited to 'src/mem3/test/eunit/mem3_reshard_test.erl')
-rw-r--r--  src/mem3/test/eunit/mem3_reshard_test.erl  805
1 file changed, 805 insertions, 0 deletions
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
new file mode 100644
index 000000000..ab6202115
--- /dev/null
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -0,0 +1,805 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/src/mem3_reshard.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl"). % for all_docs function
+
+-define(ID, <<"_id">>).
+-define(TIMEOUT, 60).
+
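+% Mock the dreyfus and hastings index APIs when those applications are
+% available, create one regular and one partitioned database (q=1, n=1), and
+% set the resharding retry interval to 0 so failed jobs retry immediately.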
+setup() ->
+ HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name},
+ case HaveDreyfus of false -> ok; true ->
+ mock_dreyfus_indices()
+ end,
+
+ HaveHastings = code:lib_dir(hastings) /= {error, bad_name},
+ case HaveHastings of false -> ok; true ->
+ mock_hastings_indices()
+ end,
+ {Db1, Db2} = {?tempdb(), ?tempdb()},
+ create_db(Db1, [{q, 1}, {n, 1}]),
+ PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}],
+ create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]),
+ config:set("reshard", "retry_interval_sec", "0", _Persist=false),
+ #{db1 => Db1, db2 => Db2}.
+
+
+teardown(#{} = Dbs) ->
+ mem3_reshard:reset_state(),
+ maps:map(fun(_, Db) -> delete_db(Db) end, Dbs),
+ config:delete("reshard", "retry_interval_sec", _Persist=false),
+ meck:unload().
+
+
+start_couch() ->
+ test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]).
+
+
+stop_couch(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+mem3_reshard_db_test_() ->
+ {
+ "mem3 shard split db tests",
+ {
+ setup,
+ fun start_couch/0, fun stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun split_one_shard/1,
+ fun update_docs_before_topoff1/1,
+ fun indices_are_built/1,
+ fun split_partitioned_db/1,
+ fun split_twice/1,
+ fun couch_events_are_emitted/1,
+ fun retries_work/1,
+ fun target_reset_in_initial_copy/1,
+ fun split_an_incomplete_shard_map/1
+ ]
+ }
+ }
+ }.
+
+
+% This is a basic test to check that shard splitting preserves documents and
+% db meta props like the revs limit, purge infos limit and security object.
+split_one_shard(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ DocSpec = #{docs => 10, delete => [5, 9], mrview => 1, local => 1},
+ add_test_docs(Db, DocSpec),
+
+ % Save documents before the split
+ Docs0 = get_all_docs(Db),
+ Local0 = get_local_docs(Db),
+
+ % Set some custom metadata properties
+ set_revs_limit(Db, 942),
+ set_purge_infos_limit(Db, 943),
+ SecObj = {[{<<"foo">>, <<"bar">>}]},
+ set_security(Db, SecObj),
+
+ % DbInfo is saved after setting metadata bits
+ % as those could bump the update sequence
+ DbInfo0 = get_db_info(Db),
+
+ % Split the one shard
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+
+ % Perform some basic checks that the shard was split
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ [#shard{range = R1}, #shard{range = R2}] = Shards1,
+ ?assertEqual([16#00000000, 16#7fffffff], R1),
+ ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+ % Check metadata bits after the split
+ ?assertEqual(942, get_revs_limit(Db)),
+ ?assertEqual(943, get_purge_infos_limit(Db)),
+ ?assertEqual(SecObj, get_security(Db)),
+
+ DbInfo1 = get_db_info(Db),
+ Docs1 = get_all_docs(Db),
+ Local1 = get_local_docs(Db),
+
+        % When comparing db infos, ignore update sequences; they won't be the
+        % same since there are more shards involved after the split
+ ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+        % Update seq prefix number is the sum of all shard update sequences
+ #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0),
+ #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1),
+ ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
+
+ % Finally compare that the documents are still there after the split
+ ?assertEqual(Docs0, Docs1),
+
+        % Don't forget about the local docs, but don't include internal
+        % checkpoints as some of those are munged and transformed during the split
+ ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+ end)}.
+
+
+% This test checks that documents added while the shard is being split are not
+% lost. The topoff1 state happens before indices are built.
+update_docs_before_topoff1(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ add_test_docs(Db, #{docs => 10}),
+
+ intercept_state(topoff1),
+
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+
+ receive {JobPid, topoff1} -> ok end,
+ add_test_docs(Db, #{docs => [10, 19], local => 1}),
+ Docs0 = get_all_docs(Db),
+ Local0 = get_local_docs(Db),
+ DbInfo0 = get_db_info(Db),
+ JobPid ! continue,
+
+ wait_state(JobId, completed),
+
+ % Perform some basic checks that the shard was split
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+
+ DbInfo1 = get_db_info(Db),
+ Docs1 = get_all_docs(Db),
+ Local1 = get_local_docs(Db),
+
+ ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+ % Update sequence after initial copy with 10 docs would be 10 on each
+ % target shard (to match the source) and the total update sequence
+ % would have been 20. But then 10 more docs were added (3 might have
+ % ended up on one target and 7 on another) so the final update sequence
+ % would then be 20 + 10 = 30.
+ ?assertMatch(#{<<"update_seq">> := 30}, update_seq_to_num(DbInfo1)),
+
+ ?assertEqual(Docs0, Docs1),
+ ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+ end)}.
+
+
+% This tests that indices are built during shard splitting.
+indices_are_built(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name},
+ HaveHastings = code:lib_dir(hastings) /= {error, bad_name},
+
+ add_test_docs(Db, #{docs => 10, mrview => 2, search => 2, geo => 2}),
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ MRViewGroupInfo = get_group_info(Db, <<"_design/mrview00000">>),
+ ?assertMatch(#{<<"update_seq">> := 32}, MRViewGroupInfo),
+
+        case HaveDreyfus of false -> ok; true ->
+ % 4 because there are 2 indices and 2 target shards
+ ?assertEqual(4, meck:num_calls(dreyfus_index, await, 2))
+ end,
+
+        case HaveHastings of false -> ok; true ->
+ % 4 because there are 2 indices and 2 target shards
+ ?assertEqual(4, meck:num_calls(hastings_index, await, 2))
+ end
+ end)}.
+
+
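+% Stub out the dreyfus (search) index API so that index building during the
+% split can be observed with meck without the real application running.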
+mock_dreyfus_indices() ->
+ meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) ->
+ #doc{body = {BodyProps}} = Doc,
+ case couch_util:get_value(<<"indexes">>, BodyProps) of
+ undefined ->
+ [];
+ {[_]} ->
+ [{dreyfus, <<"db">>, dreyfus_index1}]
+ end
+ end),
+ meck:expect(dreyfus_index_manager, get_index, fun(_, _) -> {ok, pid} end),
+ meck:expect(dreyfus_index, await, fun(_, _) -> ok end).
+
+
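+% Stub out the hastings (geospatial) index API, same idea as for dreyfus above.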
+mock_hastings_indices() ->
+ meck:expect(hastings_index, design_doc_to_indexes, fun(Doc) ->
+ #doc{body = {BodyProps}} = Doc,
+ case couch_util:get_value(<<"st_indexes">>, BodyProps) of
+ undefined ->
+ [];
+ {[_]} ->
+ [{hastings, <<"db">>, hastings_index1}]
+ end
+ end),
+ meck:expect(hastings_index_manager, get_index, fun(_, _) -> {ok, pid} end),
+ meck:expect(hastings_index, await, fun(_, _) -> ok end).
+
+
+% Split a partitioned database and check that docs, local docs, metadata and
+% per-partition info are preserved.
+split_partitioned_db(#{db2 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ DocSpec = #{
+ pdocs => #{
+ <<"PX">> => 5,
+ <<"PY">> => 5
+ },
+ mrview => 1,
+ local => 1
+ },
+ add_test_docs(Db, DocSpec),
+
+ % Save documents before the split
+ Docs0 = get_all_docs(Db),
+ Local0 = get_local_docs(Db),
+
+ % Set some custom metadata properties
+ set_revs_limit(Db, 942),
+ set_purge_infos_limit(Db, 943),
+ SecObj = {[{<<"foo">>, <<"bar">>}]},
+ set_security(Db, SecObj),
+
+ % DbInfo is saved after setting metadata bits
+ % as those could bump the update sequence
+ DbInfo0 = get_db_info(Db),
+ PX0 = get_partition_info(Db, <<"PX">>),
+ PY0 = get_partition_info(Db, <<"PY">>),
+
+ % Split the one shard
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+
+ % Perform some basic checks that the shard was split
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ [#shard{range = R1}, #shard{range = R2}] = Shards1,
+ ?assertEqual([16#00000000, 16#7fffffff], R1),
+ ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+ % Check metadata bits after the split
+ ?assertEqual(942, get_revs_limit(Db)),
+ ?assertEqual(943, get_purge_infos_limit(Db)),
+ ?assertEqual(SecObj, get_security(Db)),
+
+ DbInfo1 = get_db_info(Db),
+ Docs1 = get_all_docs(Db),
+ Local1 = get_local_docs(Db),
+
+        % When comparing db infos, ignore update sequences; they won't be the
+        % same since there are more shards involved after the split
+ ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+        % Update seq prefix number is the sum of all shard update sequences
+ #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0),
+ #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1),
+ ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
+
+ % Finally compare that documents are still there after the split
+ ?assertEqual(Docs0, Docs1),
+
+ ?assertEqual(PX0, get_partition_info(Db, <<"PX">>)),
+ ?assertEqual(PY0, get_partition_info(Db, <<"PY">>)),
+
+        % Don't forget about the local docs, but don't include internal
+        % checkpoints as some of those are munged and transformed during the split
+ ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+ end)}.
+
+
+% Make sure a shard can be split again after it was split once. This checks
+% that documents didn't end up in the wrong range, which on the next split
+% would make them fail to fit into any of the new target ranges.
+split_twice(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ DocSpec = #{docs => 100, delete => [80, 99], mrview => 2, local => 100},
+ add_test_docs(Db, DocSpec),
+
+ % Save documents before the split
+ Docs0 = get_all_docs(Db),
+ Local0 = get_local_docs(Db),
+
+ % Set some custom metadata properties
+ set_revs_limit(Db, 942),
+ set_purge_infos_limit(Db, 943),
+ SecObj = {[{<<"foo">>, <<"bar">>}]},
+ set_security(Db, SecObj),
+
+ % DbInfo is saved after setting metadata bits
+ % as those could bump the update sequence
+ DbInfo0 = get_db_info(Db),
+
+ % Split the one shard
+ [#shard{name=Shard1}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId1} = mem3_reshard:start_split_job(Shard1),
+ wait_state(JobId1, completed),
+
+ % Perform some basic checks that the shard was split
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ [#shard{range = R1}, #shard{range = R2}] = Shards1,
+ ?assertEqual([16#00000000, 16#7fffffff], R1),
+ ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+ % Check metadata bits after the split
+ ?assertEqual(942, get_revs_limit(Db)),
+ ?assertEqual(943, get_purge_infos_limit(Db)),
+ ?assertEqual(SecObj, get_security(Db)),
+
+ DbInfo1 = get_db_info(Db),
+ Docs1 = get_all_docs(Db),
+ Local1 = get_local_docs(Db),
+
+        % When comparing db infos, ignore update sequences; they won't be the
+        % same since there are more shards involved after the split
+ ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+        % Update seq prefix number is the sum of all shard update sequences
+ #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0),
+ #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1),
+ ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
+
+ ?assertEqual(Docs0, Docs1),
+ ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)),
+
+ % Split the first range again
+ [#shard{name=Shard2}, _] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId2} = mem3_reshard:start_split_job(Shard2),
+ wait_state(JobId2, completed),
+
+ Shards2 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(3, length(Shards2)),
+ [R3, R4, R5] = [R || #shard{range = R} <- Shards2],
+ ?assertEqual([16#00000000, 16#3fffffff], R3),
+ ?assertEqual([16#40000000, 16#7fffffff], R4),
+ ?assertEqual([16#80000000, 16#ffffffff], R5),
+
+ % Check metadata bits after the second split
+ ?assertEqual(942, get_revs_limit(Db)),
+ ?assertEqual(943, get_purge_infos_limit(Db)),
+ ?assertEqual(SecObj, get_security(Db)),
+
+ DbInfo2 = get_db_info(Db),
+ Docs2 = get_all_docs(Db),
+ Local2 = get_local_docs(Db),
+
+ ?assertEqual(without_seqs(DbInfo1), without_seqs(DbInfo2)),
+        % Update seq prefix number is the sum of all shard update sequences,
+        % but only 1 shard out of 2 was split
+ #{<<"update_seq">> := UpdateSeq2} = update_seq_to_num(DbInfo2),
+ ?assertEqual(trunc(UpdateSeq1 * 1.5), UpdateSeq2),
+ ?assertEqual(Docs1, Docs2),
+ ?assertEqual(without_meta_locals(Local1), without_meta_locals(Local2))
+ end)}.
+
+
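+% Check that shard splitting emits couch_event updated and deleted
+% notifications for the db's shard files in the expected order.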
+couch_events_are_emitted(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ couch_event:register_all(self()),
+
+ % Split the one shard
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+
+ % Perform some basic checks that the shard was split
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ [#shard{range = R1}, #shard{range = R2}] = Shards1,
+ ?assertEqual([16#00000000, 16#7fffffff], R1),
+ ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+ Flush = fun F(Events) ->
+ receive
+ {'$couch_event', DbName, Event} when Event =:= deleted
+ orelse Event =:= updated ->
+ case binary:match(DbName, Db) of
+ nomatch -> F(Events);
+ {_, _} -> F([Event | Events])
+ end
+ after 0 ->
+ lists:reverse(Events)
+ end
+ end,
+ Events = Flush([]),
+ StartAtDeleted = lists:dropwhile(fun(E) -> E =/= deleted end, Events),
+ ?assertMatch([deleted, deleted, updated, updated | _], StartAtDeleted),
+ couch_event:unregister(self())
+ end)}.
+
+
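+% When the split step itself keeps failing, the job is retried a few times
+% (three split attempts here, with the retry interval set to 0 in setup) and
+% then transitions to the failed state.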
+retries_work(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ meck:expect(couch_db_split, split, fun(_, _, _) ->
+ error(kapow)
+ end),
+
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+
+ wait_state(JobId, failed),
+ ?assertEqual(3, meck:num_calls(couch_db_split, split, 3))
+ end)}.
+
+
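+% When a job starts in the initial_copy state and the target shard files
+% already exist, the existing targets should be reset (cleaned up) before the
+% copy is redone.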
+target_reset_in_initial_copy(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ [#shard{} = Src] = lists:sort(mem3:local_shards(Db)),
+ Job = #job{
+ source = Src,
+ target = [#shard{name= <<"t1">>}, #shard{name = <<"t2">>}],
+ job_state = running,
+ split_state = initial_copy
+ },
+ BogusParent = spawn(fun() -> receive {ack, _, _} -> ok end end),
+        put('$ancestors', [BogusParent]), % make proc_lib:ack/2 not blow up
+ meck:expect(mem3_reshard, checkpoint, 2, ok),
+ meck:expect(couch_db_split, cleanup_target, 2, ok),
+ meck:expect(couch_server, exists, fun
+ (<<"t1">>) -> true;
+ (<<"t2">>) -> true;
+ (DbName) -> meck:passthrough([DbName])
+ end),
+ JobPid = spawn(fun() -> mem3_reshard_job:init(Job) end),
+ meck:wait(2, couch_db_split, cleanup_target, ['_', '_'], 5000),
+ exit(JobPid, kill),
+ exit(BogusParent, kill),
+ ?assertEqual(2, meck:num_calls(couch_db_split, cleanup_target, 2))
+ end)}.
+
+
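+% Starting a split job should be refused when the shard map is incomplete,
+% i.e. when there are not enough live copies of the shard's range.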
+split_an_incomplete_shard_map(#{db1 := Db}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+        [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ meck:expect(mem3_util, calculate_max_n, 1, 0),
+ ?assertMatch({error, {not_enough_shard_copies, _}},
+ mem3_reshard:start_split_job(Shard))
+ end)}.
+
+
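+% Pause the resharding job once it checkpoints the given split state: the job
+% process sends {JobPid, State} to the test process and then waits for a
+% continue (or cancel) message before proceeding.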
+intercept_state(State) ->
+ TestPid = self(),
+ meck:new(mem3_reshard_job, [passthrough]),
+ meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+ case Job#job.split_state of
+ State ->
+ TestPid ! {self(), State},
+ receive
+ continue -> meck:passthrough([Job]);
+ cancel -> ok
+ end;
+ _ ->
+ meck:passthrough([Job])
+ end
+ end).
+
+
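+% Poll the resharding job until it reports the expected job state, giving up
+% after 30 seconds.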
+wait_state(JobId, State) ->
+ test_util:wait(fun() ->
+ case mem3_reshard:job(JobId) of
+ {ok, {Props}} ->
+ case couch_util:get_value(job_state, Props) of
+ State -> ok;
+ _ -> timer:sleep(100), wait
+ end;
+ {error, not_found} -> timer:sleep(100), wait
+ end
+ end, 30000).
+
+
+set_revs_limit(DbName, Limit) ->
+ with_proc(fun() -> fabric:set_revs_limit(DbName, Limit, [?ADMIN_CTX]) end).
+
+
+get_revs_limit(DbName) ->
+ with_proc(fun() -> fabric:get_revs_limit(DbName) end).
+
+
+get_purge_infos_limit(DbName) ->
+ with_proc(fun() -> fabric:get_purge_infos_limit(DbName) end).
+
+
+set_purge_infos_limit(DbName, Limit) ->
+ with_proc(fun() ->
+ fabric:set_purge_infos_limit(DbName, Limit, [?ADMIN_CTX])
+ end).
+
+
+set_security(DbName, SecObj) ->
+ with_proc(fun() -> fabric:set_security(DbName, SecObj) end).
+
+
+get_security(DbName) ->
+ with_proc(fun() -> fabric:get_security(DbName, [?ADMIN_CTX]) end).
+
+
+get_db_info(DbName) ->
+ with_proc(fun() ->
+ {ok, Info} = fabric:get_db_info(DbName),
+ maps:with([
+ <<"db_name">>, <<"doc_count">>, <<"props">>, <<"doc_del_count">>,
+ <<"update_seq">>, <<"purge_seq">>, <<"disk_format_version">>
+ ], to_map(Info))
+ end).
+
+
+get_group_info(DbName, DesignId) ->
+ with_proc(fun() ->
+ {ok, GInfo} = fabric:get_view_group_info(DbName, DesignId),
+ maps:with([
+ <<"language">>, <<"purge_seq">>, <<"signature">>, <<"update_seq">>
+ ], to_map(GInfo))
+ end).
+
+
+get_partition_info(DbName, Partition) ->
+ with_proc(fun() ->
+ {ok, PInfo} = fabric:get_partition_info(DbName, Partition),
+ maps:with([
+ <<"db_name">>, <<"doc_count">>, <<"doc_del_count">>, <<"partition">>
+ ], to_map(PInfo))
+ end).
+
+
+get_all_docs(DbName) ->
+ get_all_docs(DbName, #mrargs{}).
+
+
+get_all_docs(DbName, #mrargs{} = QArgs0) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() ->
+ Cb = fun
+ ({row, Props}, Acc) ->
+ Doc = to_map(couch_util:get_value(doc, Props)),
+ #{?ID := Id} = Doc,
+ {ok, Acc#{Id => Doc}};
+ ({meta, _}, Acc) -> {ok, Acc};
+ (complete, Acc) -> {ok, Acc}
+ end,
+ QArgs = QArgs0#mrargs{include_docs = true},
+ {ok, Docs} = fabric:all_docs(DbName, Cb, #{}, QArgs),
+ Docs
+ end, GL).
+
+
+get_local_docs(DbName) ->
+ LocalNS = {namespace, <<"_local">>},
+ maps:map(fun(_, Doc) ->
+ maps:without([<<"_rev">>], Doc)
+ end, get_all_docs(DbName, #mrargs{extra = [LocalNS]})).
+
+
+without_seqs(#{} = InfoMap) ->
+ maps:without([<<"update_seq">>, <<"purge_seq">>], InfoMap).
+
+
+without_meta_locals(#{} = Local) ->
+ maps:filter(fun
+ (<<"_local/purge-mrview-", _/binary>>, _) -> false;
+ (<<"_local/shard-sync-", _/binary>>, _) -> false;
+ (_, _) -> true
+ end, Local).
+
+
+update_seq_to_num(#{} = InfoMap) ->
+ maps:map(fun
+ (<<"update_seq">>, Seq) -> seq_to_num(Seq);
+ (<<"purge_seq">>, PSeq) -> seq_to_num(PSeq);
+ (_, V) -> V
+ end, InfoMap).
+
+
+seq_to_num(Seq) ->
+ [SeqNum, _] = binary:split(Seq, <<"-">>),
+ binary_to_integer(SeqNum).
+
+
+to_map([_ | _] = Props) ->
+ to_map({Props});
+
+to_map({[_ | _]} = EJson) ->
+ jiffy:decode(jiffy:encode(EJson), [return_maps]).
+
+
+create_db(DbName, Opts) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL).
+
+
+delete_db(DbName) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL).
+
+
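+% Run Fun in a spawned, monitored process (optionally with a given group
+% leader) and return its result; error out if the process dies abnormally or
+% does not finish within the timeout.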
+with_proc(Fun) ->
+ with_proc(Fun, undefined, 30000).
+
+
+with_proc(Fun, GroupLeader) ->
+ with_proc(Fun, GroupLeader, 30000).
+
+
+with_proc(Fun, GroupLeader, Timeout) ->
+ {Pid, Ref} = spawn_monitor(fun() ->
+ case GroupLeader of
+ undefined -> ok;
+ _ -> erlang:group_leader(GroupLeader, self())
+ end,
+ exit({with_proc_res, Fun()})
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, {with_proc_res, Res}} ->
+ Res;
+ {'DOWN', Ref, process, Pid, Error} ->
+ error(Error)
+ after Timeout ->
+ erlang:demonitor(Ref, [flush]),
+ exit(Pid, kill),
+ error({with_proc_timeout, Fun, Timeout})
+ end.
+
+
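+% Populate the db according to DocSpec: regular docs, partitioned docs, design
+% docs for mrview/search/geo indexes and local docs, then optionally delete a
+% range of the regular docs.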
+add_test_docs(DbName, #{} = DocSpec) ->
+ Docs = docs(maps:get(docs, DocSpec, []))
+ ++ pdocs(maps:get(pdocs, DocSpec, #{}))
+ ++ ddocs(mrview, maps:get(mrview, DocSpec, []))
+ ++ ddocs(search, maps:get(search, DocSpec, []))
+ ++ ddocs(geo, maps:get(geo, DocSpec, []))
+ ++ ldocs(maps:get(local, DocSpec, [])),
+ Res = update_docs(DbName, Docs),
+ Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) ->
+ Doc#doc{revs = {RevPos, [Rev]}}
+ end, lists:zip(Docs, Res)),
+ case delete_docs(maps:get(delete, DocSpec, []), Docs1) of
+ [] -> ok;
+ [_ | _] = Deleted -> update_docs(DbName, Deleted)
+ end,
+ ok.
+
+
+update_docs(DbName, Docs) ->
+ with_proc(fun() ->
+ case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
+ {accepted, Res} -> Res;
+ {ok, Res} -> Res
+ end
+ end).
+
+
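+% Mark the regular (unprefixed) docs with ids in the inclusive [S, E] range as
+% deleted.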
+delete_docs([S, E], Docs) when E >= S ->
+ ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)],
+ lists:filtermap(fun(#doc{id = Id} = Doc) ->
+ case lists:member(Id, ToDelete) of
+ true -> {true, Doc#doc{deleted = true}};
+ false -> false
+ end
+ end, Docs);
+delete_docs(_, _) ->
+ [].
+
+
+pdocs(#{} = PMap) ->
+ maps:fold(fun(Part, DocSpec, DocsAcc) ->
+ docs(DocSpec, <<Part/binary, ":">>) ++ DocsAcc
+ end, [], PMap).
+
+
+docs(DocSpec) ->
+ docs(DocSpec, <<"">>).
+
+
+docs(N, Prefix) when is_integer(N), N > 0 ->
+ docs([0, N - 1], Prefix);
+docs([S, E], Prefix) when E >= S ->
+ [doc(Prefix, I) || I <- lists:seq(S, E)];
+docs(_, _) ->
+ [].
+
+ddocs(Type, N) when is_integer(N), N > 0 ->
+ ddocs(Type, [0, N - 1]);
+ddocs(Type, [S, E]) when E >= S ->
+ Body = ddprop(Type),
+ BType = atom_to_binary(Type, utf8),
+ [doc(<<"_design/", BType/binary>>, I, Body, 0) || I <- lists:seq(S, E)];
+ddocs(_, _) ->
+ [].
+
+
+ldocs(N) when is_integer(N), N > 0 ->
+ ldocs([0, N - 1]);
+ldocs([S, E]) when E >= S ->
+ [doc(<<"_local/">>, I, bodyprops(), 0) || I <- lists:seq(S, E)];
+ldocs(_) ->
+ [].
+
+
+doc(Pref, Id) ->
+ Body = bodyprops(),
+ doc(Pref, Id, Body, 42).
+
+
+doc(Pref, Id, BodyProps, AttSize) ->
+ #doc{
+ id = doc_id(Pref, Id),
+ body = {BodyProps},
+ atts = atts(AttSize)
+ }.
+
+
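+% Build a document id from a prefix and a zero-padded 5-digit integer suffix.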
+doc_id(Pref, Id) ->
+ IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])),
+ <<Pref/binary, IdBin/binary>>.
+
+
+ddprop(mrview) ->
+ [
+ {<<"views">>, {[
+ {<<"v1">>, {[
+ {<<"map">>, <<"function(d){emit(d);}">>}
+ ]}}
+ ]}}
+ ];
+
+ddprop(geo) ->
+ [
+ {<<"st_indexes">>, {[
+ {<<"area">>, {[
+ {<<"analyzer">>, <<"standard">>},
+ {<<"index">>, <<"function(d){if(d.g){st_index(d.g)}}">> }
+ ]}}
+ ]}}
+ ];
+
+ddprop(search) ->
+ [
+ {<<"indexes">>, {[
+ {<<"types">>, {[
+ {<<"index">>, <<"function(d){if(d.g){st_index(d.g.type)}}">>}
+ ]}}
+ ]}}
+ ].
+
+
+bodyprops() ->
+ [
+ {<<"g">>, {[
+ {<<"type">>, <<"Polygon">>},
+ {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]}
+ ]}}
+ ].
+
+
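+% Generate a single inline attachment of Size bytes (or no attachments when
+% Size is 0).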
+atts(0) ->
+ [];
+
+atts(Size) when is_integer(Size), Size >= 1 ->
+ Data = << <<"x">> || _ <- lists:seq(1, Size) >>,
+ [couch_att:new([
+ {name, <<"att">>},
+ {type, <<"app/binary">>},
+ {att_len, Size},
+ {data, Data}
+ ])].