diff options
Diffstat (limited to 'src/mem3/test/eunit/mem3_reshard_test.erl')
-rw-r--r-- | src/mem3/test/eunit/mem3_reshard_test.erl | 834 |
1 files changed, 0 insertions, 834 deletions
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl deleted file mode 100644 index 7cd6b1fe6..000000000 --- a/src/mem3/test/eunit/mem3_reshard_test.erl +++ /dev/null @@ -1,834 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_reshard_test). - - --include_lib("couch/include/couch_eunit.hrl"). --include_lib("couch/include/couch_db.hrl"). --include_lib("mem3/src/mem3_reshard.hrl"). --include_lib("couch_mrview/include/couch_mrview.hrl"). % for all_docs function - --define(ID, <<"_id">>). --define(TIMEOUT, 60). - -setup() -> - HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}, - case HaveDreyfus of false -> ok; true -> - mock_dreyfus_indices() - end, - - HaveHastings = code:lib_dir(hastings) /= {error, bad_name}, - case HaveHastings of false -> ok; true -> - mock_hastings_indices() - end, - {Db1, Db2} = {?tempdb(), ?tempdb()}, - create_db(Db1, [{q, 1}, {n, 1}]), - PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}], - create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]), - config:set("reshard", "retry_interval_sec", "0", _Persist=false), - #{db1 => Db1, db2 => Db2}. - - -teardown(#{} = Dbs) -> - mem3_reshard:reset_state(), - maps:map(fun(_, Db) -> delete_db(Db) end, Dbs), - config:delete("reshard", "retry_interval_sec", _Persist=false), - meck:unload(). - - -start_couch() -> - test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]). - - -stop_couch(Ctx) -> - test_util:stop_couch(Ctx). - - -mem3_reshard_db_test_() -> - { - "mem3 shard split db tests", - { - setup, - fun start_couch/0, fun stop_couch/1, - { - foreach, - fun setup/0, fun teardown/1, - [ - fun split_one_shard/1, - fun update_docs_before_topoff1/1, - fun indices_are_built/1, - fun split_partitioned_db/1, - fun split_twice/1, - fun couch_events_are_emitted/1, - fun retries_work/1, - fun target_reset_in_initial_copy/1, - fun split_an_incomplete_shard_map/1, - fun target_shards_are_locked/1 - ] - } - } - }. - - -% This is a basic test to check that shard splitting preserves documents, and -% db meta props like revs limits and security. -split_one_shard(#{db1 := Db}) -> - {timeout, ?TIMEOUT, ?_test(begin - DocSpec = #{docs => 10, delete => [5, 9], mrview => 1, local => 1}, - add_test_docs(Db, DocSpec), - - % Save documents before the split - Docs0 = get_all_docs(Db), - Local0 = get_local_docs(Db), - - % Set some custom metadata properties - set_revs_limit(Db, 942), - set_purge_infos_limit(Db, 943), - SecObj = {[{<<"foo">>, <<"bar">>}]}, - set_security(Db, SecObj), - - % DbInfo is saved after setting metadata bits - % as those could bump the update sequence - DbInfo0 = get_db_info(Db), - - % Split the one shard - [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), - {ok, JobId} = mem3_reshard:start_split_job(Shard), - wait_state(JobId, completed), - - % Perform some basic checks that the shard was split - Shards1 = lists:sort(mem3:local_shards(Db)), - ?assertEqual(2, length(Shards1)), - [#shard{range = R1}, #shard{range = R2}] = Shards1, - ?assertEqual([16#00000000, 16#7fffffff], R1), - ?assertEqual([16#80000000, 16#ffffffff], R2), - - % Check metadata bits after the split - ?assertEqual(942, get_revs_limit(Db)), - ?assertEqual(943, get_purge_infos_limit(Db)), - ?assertEqual(SecObj, get_security(Db)), - - DbInfo1 = get_db_info(Db), - Docs1 = get_all_docs(Db), - Local1 = get_local_docs(Db), - - % When comparing db infos, ignore update sequences they won't be the - % same since they are more shards involved after the split - ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), - - % Update seq prefix number is a sum of all shard update sequences - #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0), - #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1), - ?assertEqual(UpdateSeq0 * 2, UpdateSeq1), - - % Finally compare that the documents are still there after the split - ?assertEqual(Docs0, Docs1), - - % Don't forget about the local but don't include internal checkpoints - % as some of those are munged and transformed during the split - ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)) - end)}. - - -% This test checks that document added while the shard is being split are not -% lost. Topoff1 state happens before indices are built -update_docs_before_topoff1(#{db1 := Db}) -> - {timeout, ?TIMEOUT, ?_test(begin - add_test_docs(Db, #{docs => 10}), - - intercept_state(topoff1), - - [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), - {ok, JobId} = mem3_reshard:start_split_job(Shard), - - receive {JobPid, topoff1} -> ok end, - add_test_docs(Db, #{docs => [10, 19], local => 1}), - Docs0 = get_all_docs(Db), - Local0 = get_local_docs(Db), - DbInfo0 = get_db_info(Db), - JobPid ! continue, - - wait_state(JobId, completed), - - % Perform some basic checks that the shard was split - Shards1 = lists:sort(mem3:local_shards(Db)), - ?assertEqual(2, length(Shards1)), - - DbInfo1 = get_db_info(Db), - Docs1 = get_all_docs(Db), - Local1 = get_local_docs(Db), - - ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), - - % Update sequence after initial copy with 10 docs would be 10 on each - % target shard (to match the source) and the total update sequence - % would have been 20. But then 10 more docs were added (3 might have - % ended up on one target and 7 on another) so the final update sequence - % would then be 20 + 10 = 30. - ?assertMatch(#{<<"update_seq">> := 30}, update_seq_to_num(DbInfo1)), - - ?assertEqual(Docs0, Docs1), - ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)) - end)}. - - -% This test that indices are built during shard splitting. -indices_are_built(#{db1 := Db}) -> - {timeout, ?TIMEOUT, ?_test(begin - HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}, - HaveHastings = code:lib_dir(hastings) /= {error, bad_name}, - - add_test_docs(Db, #{docs => 10, mrview => 2, search => 2, geo => 2}), - [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), - {ok, JobId} = mem3_reshard:start_split_job(Shard), - wait_state(JobId, completed), - Shards1 = lists:sort(mem3:local_shards(Db)), - ?assertEqual(2, length(Shards1)), - MRViewGroupInfo = get_group_info(Db, <<"_design/mrview00000">>), - ?assertMatch(#{<<"update_seq">> := 32}, MRViewGroupInfo), - - HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}, - case HaveDreyfus of false -> ok; true -> - % 4 because there are 2 indices and 2 target shards - ?assertEqual(4, meck:num_calls(dreyfus_index, await, 2)) - end, - - HaveHastings = code:lib_dir(hastings) /= {error, bad_name}, - case HaveHastings of false -> ok; true -> - % 4 because there are 2 indices and 2 target shards - ?assertEqual(4, meck:num_calls(hastings_index, await, 2)) - end - end)}. - - -mock_dreyfus_indices() -> - meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) -> - #doc{body = {BodyProps}} = Doc, - case couch_util:get_value(<<"indexes">>, BodyProps) of - undefined -> - []; - {[_]} -> - [{dreyfus, <<"db">>, dreyfus_index1}] - end - end), - meck:expect(dreyfus_index_manager, get_index, fun(_, _) -> {ok, pid} end), - meck:expect(dreyfus_index, await, fun(_, _) -> ok end). - - -mock_hastings_indices() -> - meck:expect(hastings_index, design_doc_to_indexes, fun(Doc) -> - #doc{body = {BodyProps}} = Doc, - case couch_util:get_value(<<"st_indexes">>, BodyProps) of - undefined -> - []; - {[_]} -> - [{hastings, <<"db">>, hastings_index1}] - end - end), - meck:expect(hastings_index_manager, get_index, fun(_, _) -> {ok, pid} end), - meck:expect(hastings_index, await, fun(_, _) -> ok end). - -% Split partitioned database -split_partitioned_db(#{db2 := Db}) -> - {timeout, ?TIMEOUT, ?_test(begin - DocSpec = #{ - pdocs => #{ - <<"PX">> => 5, - <<"PY">> => 5 - }, - mrview => 1, - local => 1 - }, - add_test_docs(Db, DocSpec), - - % Save documents before the split - Docs0 = get_all_docs(Db), - Local0 = get_local_docs(Db), - - % Set some custom metadata properties - set_revs_limit(Db, 942), - set_purge_infos_limit(Db, 943), - SecObj = {[{<<"foo">>, <<"bar">>}]}, - set_security(Db, SecObj), - - % DbInfo is saved after setting metadata bits - % as those could bump the update sequence - DbInfo0 = get_db_info(Db), - PX0 = get_partition_info(Db, <<"PX">>), - PY0 = get_partition_info(Db, <<"PY">>), - - % Split the one shard - [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), - {ok, JobId} = mem3_reshard:start_split_job(Shard), - wait_state(JobId, completed), - - % Perform some basic checks that the shard was split - Shards1 = lists:sort(mem3:local_shards(Db)), - ?assertEqual(2, length(Shards1)), - [#shard{range = R1}, #shard{range = R2}] = Shards1, - ?assertEqual([16#00000000, 16#7fffffff], R1), - ?assertEqual([16#80000000, 16#ffffffff], R2), - - % Check metadata bits after the split - ?assertEqual(942, get_revs_limit(Db)), - ?assertEqual(943, get_purge_infos_limit(Db)), - ?assertEqual(SecObj, get_security(Db)), - - DbInfo1 = get_db_info(Db), - Docs1 = get_all_docs(Db), - Local1 = get_local_docs(Db), - - % When comparing db infos, ignore update sequences they won't be the - % same since they are more shards involved after the split - ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), - - % Update seq prefix number is a sum of all shard update sequences - #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0), - #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1), - ?assertEqual(UpdateSeq0 * 2, UpdateSeq1), - - % Finally compare that documents are still there after the split - ?assertEqual(Docs0, Docs1), - - ?assertEqual(PX0, get_partition_info(Db, <<"PX">>)), - ?assertEqual(PY0, get_partition_info(Db, <<"PY">>)), - - % Don't forget about the local but don't include internal checkpoints - % as some of those are munged and transformed during the split - ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)) - end)}. - - -% Make sure a shard can be split again after it was split once. This checks that -% too many got added to some range, such that on next split they'd fail to fit -% in to any of the new target ranges. -split_twice(#{db1 := Db}) -> - {timeout, ?TIMEOUT, ?_test(begin - DocSpec = #{docs => 100, delete => [80, 99], mrview => 2, local => 100}, - add_test_docs(Db, DocSpec), - - % Save documents before the split - Docs0 = get_all_docs(Db), - Local0 = get_local_docs(Db), - - % Set some custom metadata properties - set_revs_limit(Db, 942), - set_purge_infos_limit(Db, 943), - SecObj = {[{<<"foo">>, <<"bar">>}]}, - set_security(Db, SecObj), - - % DbInfo is saved after setting metadata bits - % as those could bump the update sequence - DbInfo0 = get_db_info(Db), - - % Split the one shard - [#shard{name=Shard1}] = lists:sort(mem3:local_shards(Db)), - {ok, JobId1} = mem3_reshard:start_split_job(Shard1), - wait_state(JobId1, completed), - - % Perform some basic checks that the shard was split - Shards1 = lists:sort(mem3:local_shards(Db)), - ?assertEqual(2, length(Shards1)), - [#shard{range = R1}, #shard{range = R2}] = Shards1, - ?assertEqual([16#00000000, 16#7fffffff], R1), - ?assertEqual([16#80000000, 16#ffffffff], R2), - - % Check metadata bits after the split - ?assertEqual(942, get_revs_limit(Db)), - ?assertEqual(943, get_purge_infos_limit(Db)), - ?assertEqual(SecObj, get_security(Db)), - - DbInfo1 = get_db_info(Db), - Docs1 = get_all_docs(Db), - Local1 = get_local_docs(Db), - - % When comparing db infos, ignore update sequences they won't be the - % same since they are more shards involved after the split - ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), - - % Update seq prefix number is a sum of all shard update sequences - #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0), - #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1), - ?assertEqual(UpdateSeq0 * 2, UpdateSeq1), - - ?assertEqual(Docs0, Docs1), - ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)), - - % Split the first range again - [#shard{name=Shard2}, _] = lists:sort(mem3:local_shards(Db)), - {ok, JobId2} = mem3_reshard:start_split_job(Shard2), - wait_state(JobId2, completed), - - Shards2 = lists:sort(mem3:local_shards(Db)), - ?assertEqual(3, length(Shards2)), - [R3, R4, R5] = [R || #shard{range = R} <- Shards2], - ?assertEqual([16#00000000, 16#3fffffff], R3), - ?assertEqual([16#40000000, 16#7fffffff], R4), - ?assertEqual([16#80000000, 16#ffffffff], R5), - - % Check metadata bits after the second split - ?assertEqual(942, get_revs_limit(Db)), - ?assertEqual(943, get_purge_infos_limit(Db)), - ?assertEqual(SecObj, get_security(Db)), - - DbInfo2 = get_db_info(Db), - Docs2 = get_all_docs(Db), - Local2 = get_local_docs(Db), - - ?assertEqual(without_seqs(DbInfo1), without_seqs(DbInfo2)), - % Update seq prefix number is a sum of all shard update sequences - % But only 1 shard out of 2 was split - #{<<"update_seq">> := UpdateSeq2} = update_seq_to_num(DbInfo2), - ?assertEqual(trunc(UpdateSeq1 * 1.5), UpdateSeq2), - ?assertEqual(Docs1, Docs2), - ?assertEqual(without_meta_locals(Local1), without_meta_locals(Local2)) - end)}. - - -couch_events_are_emitted(#{db1 := Db}) -> - {timeout, ?TIMEOUT, ?_test(begin - couch_event:register_all(self()), - - % Split the one shard - [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), - {ok, JobId} = mem3_reshard:start_split_job(Shard), - wait_state(JobId, completed), - - % Perform some basic checks that the shard was split - Shards1 = lists:sort(mem3:local_shards(Db)), - ?assertEqual(2, length(Shards1)), - [#shard{range = R1}, #shard{range = R2}] = Shards1, - ?assertEqual([16#00000000, 16#7fffffff], R1), - ?assertEqual([16#80000000, 16#ffffffff], R2), - - Flush = fun F(Events) -> - receive - {'$couch_event', DbName, Event} when Event =:= deleted - orelse Event =:= updated -> - case binary:match(DbName, Db) of - nomatch -> F(Events); - {_, _} -> F([Event | Events]) - end - after 0 -> - lists:reverse(Events) - end - end, - Events = Flush([]), - StartAtDeleted = lists:dropwhile(fun(E) -> E =/= deleted end, Events), - ?assertMatch([deleted, deleted, updated, updated | _], StartAtDeleted), - couch_event:unregister(self()) - end)}. - - -retries_work(#{db1 := Db}) -> - {timeout, ?TIMEOUT, ?_test(begin - meck:expect(couch_db_split, split, fun(_, _, _) -> - error(kapow) - end), - - [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), - {ok, JobId} = mem3_reshard:start_split_job(Shard), - - wait_state(JobId, failed), - ?assertEqual(3, meck:num_calls(couch_db_split, split, 3)) - end)}. - - -target_reset_in_initial_copy(#{db1 := Db}) -> - {timeout, ?TIMEOUT, ?_test(begin - [#shard{} = Src] = lists:sort(mem3:local_shards(Db)), - Job = #job{ - source = Src, - target = [#shard{name= <<"t1">>}, #shard{name = <<"t2">>}], - job_state = running, - split_state = initial_copy - }, - meck:expect(couch_db_split, cleanup_target, 2, ok), - meck:expect(couch_server, exists, fun - (<<"t1">>) -> true; - (<<"t2">>) -> true; - (DbName) -> meck:passthrough([DbName]) - end), - JobPid = spawn(fun() -> mem3_reshard_job:initial_copy_impl(Job) end), - meck:wait(2, couch_db_split, cleanup_target, ['_', '_'], 5000), - exit(JobPid, kill), - ?assertEqual(2, meck:num_calls(couch_db_split, cleanup_target, 2)) - end)}. - - -split_an_incomplete_shard_map(#{db1 := Db}) -> - {timeout, ?TIMEOUT, ?_test(begin - [#shard{} = Src] = lists:sort(mem3:local_shards(Db)), - [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), - meck:expect(mem3_util, calculate_max_n, 1, 0), - ?assertMatch({error, {not_enough_shard_copies, _}}, - mem3_reshard:start_split_job(Shard)) - end)}. - - -% Opening a db target db in initial copy phase will throw an error -target_shards_are_locked(#{db1 := Db}) -> - {timeout, ?TIMEOUT, ?_test(begin - add_test_docs(Db, #{docs => 10}), - - % Make the job stops right when it was about to copy the docs - TestPid = self(), - meck:new(couch_db, [passthrough]), - meck:expect(couch_db, start_link, fun(Engine, TName, FilePath, Opts) -> - TestPid ! {start_link, self(), TName}, - receive - continue -> - meck:passthrough([Engine, TName, FilePath, Opts]) - end - end), - - [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), - {ok, JobId} = mem3_reshard:start_split_job(Shard), - {Target0, JobPid} = receive - {start_link, Pid, TName} -> {TName, Pid} - end, - ?assertEqual({error, {locked, <<"shard splitting">>}}, - couch_db:open_int(Target0, [])), - - % Send two continues for two targets - JobPid ! continue, - JobPid ! continue, - - wait_state(JobId, completed) - end)}. - - -intercept_state(State) -> - TestPid = self(), - meck:new(mem3_reshard_job, [passthrough]), - meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) -> - case Job#job.split_state of - State -> - TestPid ! {self(), State}, - receive - continue -> meck:passthrough([Job]); - cancel -> ok - end; - _ -> - meck:passthrough([Job]) - end - end). - - -wait_state(JobId, State) -> - test_util:wait(fun() -> - case mem3_reshard:job(JobId) of - {ok, {Props}} -> - case couch_util:get_value(job_state, Props) of - State -> ok; - _ -> timer:sleep(100), wait - end; - {error, not_found} -> timer:sleep(100), wait - end - end, 30000). - - -set_revs_limit(DbName, Limit) -> - with_proc(fun() -> fabric:set_revs_limit(DbName, Limit, [?ADMIN_CTX]) end). - - -get_revs_limit(DbName) -> - with_proc(fun() -> fabric:get_revs_limit(DbName) end). - - -get_purge_infos_limit(DbName) -> - with_proc(fun() -> fabric:get_purge_infos_limit(DbName) end). - - -set_purge_infos_limit(DbName, Limit) -> - with_proc(fun() -> - fabric:set_purge_infos_limit(DbName, Limit, [?ADMIN_CTX]) - end). - - -set_security(DbName, SecObj) -> - with_proc(fun() -> fabric:set_security(DbName, SecObj) end). - - -get_security(DbName) -> - with_proc(fun() -> fabric:get_security(DbName, [?ADMIN_CTX]) end). - - -get_db_info(DbName) -> - with_proc(fun() -> - {ok, Info} = fabric:get_db_info(DbName), - maps:with([ - <<"db_name">>, <<"doc_count">>, <<"props">>, <<"doc_del_count">>, - <<"update_seq">>, <<"purge_seq">>, <<"disk_format_version">> - ], to_map(Info)) - end). - - -get_group_info(DbName, DesignId) -> - with_proc(fun() -> - {ok, GInfo} = fabric:get_view_group_info(DbName, DesignId), - maps:with([ - <<"language">>, <<"purge_seq">>, <<"signature">>, <<"update_seq">> - ], to_map(GInfo)) - end). - - -get_partition_info(DbName, Partition) -> - with_proc(fun() -> - {ok, PInfo} = fabric:get_partition_info(DbName, Partition), - maps:with([ - <<"db_name">>, <<"doc_count">>, <<"doc_del_count">>, <<"partition">> - ], to_map(PInfo)) - end). - - -get_all_docs(DbName) -> - get_all_docs(DbName, #mrargs{}). - - -get_all_docs(DbName, #mrargs{} = QArgs0) -> - GL = erlang:group_leader(), - with_proc(fun() -> - Cb = fun - ({row, Props}, Acc) -> - Doc = to_map(couch_util:get_value(doc, Props)), - #{?ID := Id} = Doc, - {ok, Acc#{Id => Doc}}; - ({meta, _}, Acc) -> {ok, Acc}; - (complete, Acc) -> {ok, Acc} - end, - QArgs = QArgs0#mrargs{include_docs = true}, - {ok, Docs} = fabric:all_docs(DbName, Cb, #{}, QArgs), - Docs - end, GL). - - -get_local_docs(DbName) -> - LocalNS = {namespace, <<"_local">>}, - maps:map(fun(_, Doc) -> - maps:without([<<"_rev">>], Doc) - end, get_all_docs(DbName, #mrargs{extra = [LocalNS]})). - - -without_seqs(#{} = InfoMap) -> - maps:without([<<"update_seq">>, <<"purge_seq">>], InfoMap). - - -without_meta_locals(#{} = Local) -> - maps:filter(fun - (<<"_local/purge-mrview-", _/binary>>, _) -> false; - (<<"_local/shard-sync-", _/binary>>, _) -> false; - (_, _) -> true - end, Local). - - -update_seq_to_num(#{} = InfoMap) -> - maps:map(fun - (<<"update_seq">>, Seq) -> seq_to_num(Seq); - (<<"purge_seq">>, PSeq) -> seq_to_num(PSeq); - (_, V) -> V - end, InfoMap). - - -seq_to_num(Seq) -> - [SeqNum, _] = binary:split(Seq, <<"-">>), - binary_to_integer(SeqNum). - - -to_map([_ | _] = Props) -> - to_map({Props}); - -to_map({[_ | _]} = EJson) -> - jiffy:decode(jiffy:encode(EJson), [return_maps]). - - -create_db(DbName, Opts) -> - GL = erlang:group_leader(), - with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL). - - -delete_db(DbName) -> - GL = erlang:group_leader(), - with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL). - - -with_proc(Fun) -> - with_proc(Fun, undefined, 30000). - - -with_proc(Fun, GroupLeader) -> - with_proc(Fun, GroupLeader, 30000). - - -with_proc(Fun, GroupLeader, Timeout) -> - {Pid, Ref} = spawn_monitor(fun() -> - case GroupLeader of - undefined -> ok; - _ -> erlang:group_leader(GroupLeader, self()) - end, - exit({with_proc_res, Fun()}) - end), - receive - {'DOWN', Ref, process, Pid, {with_proc_res, Res}} -> - Res; - {'DOWN', Ref, process, Pid, Error} -> - error(Error) - after Timeout -> - erlang:demonitor(Ref, [flush]), - exit(Pid, kill), - error({with_proc_timeout, Fun, Timeout}) - end. - - -add_test_docs(DbName, #{} = DocSpec) -> - Docs = docs(maps:get(docs, DocSpec, [])) - ++ pdocs(maps:get(pdocs, DocSpec, #{})) - ++ ddocs(mrview, maps:get(mrview, DocSpec, [])) - ++ ddocs(search, maps:get(search, DocSpec, [])) - ++ ddocs(geo, maps:get(geo, DocSpec, [])) - ++ ldocs(maps:get(local, DocSpec, [])), - Res = update_docs(DbName, Docs), - Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) -> - Doc#doc{revs = {RevPos, [Rev]}} - end, lists:zip(Docs, Res)), - case delete_docs(maps:get(delete, DocSpec, []), Docs1) of - [] -> ok; - [_ | _] = Deleted -> update_docs(DbName, Deleted) - end, - ok. - - -update_docs(DbName, Docs) -> - with_proc(fun() -> - case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of - {accepted, Res} -> Res; - {ok, Res} -> Res - end - end). - - -delete_docs([S, E], Docs) when E >= S -> - ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)], - lists:filtermap(fun(#doc{id = Id} = Doc) -> - case lists:member(Id, ToDelete) of - true -> {true, Doc#doc{deleted = true}}; - false -> false - end - end, Docs); -delete_docs(_, _) -> - []. - - -pdocs(#{} = PMap) -> - maps:fold(fun(Part, DocSpec, DocsAcc) -> - docs(DocSpec, <<Part/binary, ":">>) ++ DocsAcc - end, [], PMap). - - -docs(DocSpec) -> - docs(DocSpec, <<"">>). - - -docs(N, Prefix) when is_integer(N), N > 0 -> - docs([0, N - 1], Prefix); -docs([S, E], Prefix) when E >= S -> - [doc(Prefix, I) || I <- lists:seq(S, E)]; -docs(_, _) -> - []. - -ddocs(Type, N) when is_integer(N), N > 0 -> - ddocs(Type, [0, N - 1]); -ddocs(Type, [S, E]) when E >= S -> - Body = ddprop(Type), - BType = atom_to_binary(Type, utf8), - [doc(<<"_design/", BType/binary>>, I, Body, 0) || I <- lists:seq(S, E)]; -ddocs(_, _) -> - []. - - -ldocs(N) when is_integer(N), N > 0 -> - ldocs([0, N - 1]); -ldocs([S, E]) when E >= S -> - [doc(<<"_local/">>, I, bodyprops(), 0) || I <- lists:seq(S, E)]; -ldocs(_) -> - []. - - - -doc(Pref, Id) -> - Body = bodyprops(), - doc(Pref, Id, Body, 42). - - -doc(Pref, Id, BodyProps, AttSize) -> - #doc{ - id = doc_id(Pref, Id), - body = {BodyProps}, - atts = atts(AttSize) - }. - - -doc_id(Pref, Id) -> - IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])), - <<Pref/binary, IdBin/binary>>. - - -ddprop(mrview) -> - [ - {<<"views">>, {[ - {<<"v1">>, {[ - {<<"map">>, <<"function(d){emit(d);}">>} - ]}} - ]}} - ]; - -ddprop(geo) -> - [ - {<<"st_indexes">>, {[ - {<<"area">>, {[ - {<<"analyzer">>, <<"standard">>}, - {<<"index">>, <<"function(d){if(d.g){st_index(d.g)}}">> } - ]}} - ]}} - ]; - -ddprop(search) -> - [ - {<<"indexes">>, {[ - {<<"types">>, {[ - {<<"index">>, <<"function(d){if(d.g){st_index(d.g.type)}}">>} - ]}} - ]}} - ]. - - -bodyprops() -> - [ - {<<"g">>, {[ - {<<"type">>, <<"Polygon">>}, - {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]} - ]}} - ]. - - -atts(0) -> - []; - -atts(Size) when is_integer(Size), Size >= 1 -> - Data = << <<"x">> || _ <- lists:seq(1, Size) >>, - [couch_att:new([ - {name, <<"att">>}, - {type, <<"app/binary">>}, - {att_len, Size}, - {data, Data} - ])]. |