path: root/src/mem3/test/eunit/mem3_reshard_test.erl
diff options
Diffstat (limited to 'src/mem3/test/eunit/mem3_reshard_test.erl')
1 files changed, 0 insertions, 834 deletions
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
deleted file mode 100644
index 7cd6b1fe6..000000000
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ /dev/null
@@ -1,834 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
--include_lib("couch_mrview/include/couch_mrview.hrl"). % for all_docs function
--define(ID, <<"_id">>).
--define(TIMEOUT, 60).
-setup() ->
- HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name},
- case HaveDreyfus of false -> ok; true ->
- mock_dreyfus_indices()
- end,
- HaveHastings = code:lib_dir(hastings) /= {error, bad_name},
- case HaveHastings of false -> ok; true ->
- mock_hastings_indices()
- end,
- {Db1, Db2} = {?tempdb(), ?tempdb()},
- create_db(Db1, [{q, 1}, {n, 1}]),
- PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}],
- create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]),
- config:set("reshard", "retry_interval_sec", "0", _Persist=false),
- #{db1 => Db1, db2 => Db2}.
-teardown(#{} = Dbs) ->
- mem3_reshard:reset_state(),
- maps:map(fun(_, Db) -> delete_db(Db) end, Dbs),
- config:delete("reshard", "retry_interval_sec", _Persist=false),
- meck:unload().
-start_couch() ->
- test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]).
-stop_couch(Ctx) ->
- test_util:stop_couch(Ctx).
-mem3_reshard_db_test_() ->
- {
- "mem3 shard split db tests",
- {
- setup,
- fun start_couch/0, fun stop_couch/1,
- {
- foreach,
- fun setup/0, fun teardown/1,
- [
- fun split_one_shard/1,
- fun update_docs_before_topoff1/1,
- fun indices_are_built/1,
- fun split_partitioned_db/1,
- fun split_twice/1,
- fun couch_events_are_emitted/1,
- fun retries_work/1,
- fun target_reset_in_initial_copy/1,
- fun split_an_incomplete_shard_map/1,
- fun target_shards_are_locked/1
- ]
- }
- }
- }.
-% This is a basic test to check that shard splitting preserves documents, and
-% db meta props like revs limits and security.
-split_one_shard(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- DocSpec = #{docs => 10, delete => [5, 9], mrview => 1, local => 1},
- add_test_docs(Db, DocSpec),
- % Save documents before the split
- Docs0 = get_all_docs(Db),
- Local0 = get_local_docs(Db),
- % Set some custom metadata properties
- set_revs_limit(Db, 942),
- set_purge_infos_limit(Db, 943),
- SecObj = {[{<<"foo">>, <<"bar">>}]},
- set_security(Db, SecObj),
- % DbInfo is saved after setting metadata bits
- % as those could bump the update sequence
- DbInfo0 = get_db_info(Db),
- % Split the one shard
- [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
- {ok, JobId} = mem3_reshard:start_split_job(Shard),
- wait_state(JobId, completed),
- % Perform some basic checks that the shard was split
- Shards1 = lists:sort(mem3:local_shards(Db)),
- ?assertEqual(2, length(Shards1)),
- [#shard{range = R1}, #shard{range = R2}] = Shards1,
- ?assertEqual([16#00000000, 16#7fffffff], R1),
- ?assertEqual([16#80000000, 16#ffffffff], R2),
- % Check metadata bits after the split
- ?assertEqual(942, get_revs_limit(Db)),
- ?assertEqual(943, get_purge_infos_limit(Db)),
- ?assertEqual(SecObj, get_security(Db)),
- DbInfo1 = get_db_info(Db),
- Docs1 = get_all_docs(Db),
- Local1 = get_local_docs(Db),
- % When comparing db infos, ignore update sequences they won't be the
- % same since they are more shards involved after the split
- ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
- % Update seq prefix number is a sum of all shard update sequences
- #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0),
- #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1),
- ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
- % Finally compare that the documents are still there after the split
- ?assertEqual(Docs0, Docs1),
- % Don't forget about the local but don't include internal checkpoints
- % as some of those are munged and transformed during the split
- ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
- end)}.
-% This test checks that document added while the shard is being split are not
-% lost. Topoff1 state happens before indices are built
-update_docs_before_topoff1(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- add_test_docs(Db, #{docs => 10}),
- intercept_state(topoff1),
- [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
- {ok, JobId} = mem3_reshard:start_split_job(Shard),
- receive {JobPid, topoff1} -> ok end,
- add_test_docs(Db, #{docs => [10, 19], local => 1}),
- Docs0 = get_all_docs(Db),
- Local0 = get_local_docs(Db),
- DbInfo0 = get_db_info(Db),
- JobPid ! continue,
- wait_state(JobId, completed),
- % Perform some basic checks that the shard was split
- Shards1 = lists:sort(mem3:local_shards(Db)),
- ?assertEqual(2, length(Shards1)),
- DbInfo1 = get_db_info(Db),
- Docs1 = get_all_docs(Db),
- Local1 = get_local_docs(Db),
- ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
- % Update sequence after initial copy with 10 docs would be 10 on each
- % target shard (to match the source) and the total update sequence
- % would have been 20. But then 10 more docs were added (3 might have
- % ended up on one target and 7 on another) so the final update sequence
- % would then be 20 + 10 = 30.
- ?assertMatch(#{<<"update_seq">> := 30}, update_seq_to_num(DbInfo1)),
- ?assertEqual(Docs0, Docs1),
- ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
- end)}.
-% This test that indices are built during shard splitting.
-indices_are_built(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name},
- HaveHastings = code:lib_dir(hastings) /= {error, bad_name},
- add_test_docs(Db, #{docs => 10, mrview => 2, search => 2, geo => 2}),
- [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
- {ok, JobId} = mem3_reshard:start_split_job(Shard),
- wait_state(JobId, completed),
- Shards1 = lists:sort(mem3:local_shards(Db)),
- ?assertEqual(2, length(Shards1)),
- MRViewGroupInfo = get_group_info(Db, <<"_design/mrview00000">>),
- ?assertMatch(#{<<"update_seq">> := 32}, MRViewGroupInfo),
- HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name},
- case HaveDreyfus of false -> ok; true ->
- % 4 because there are 2 indices and 2 target shards
- ?assertEqual(4, meck:num_calls(dreyfus_index, await, 2))
- end,
- HaveHastings = code:lib_dir(hastings) /= {error, bad_name},
- case HaveHastings of false -> ok; true ->
- % 4 because there are 2 indices and 2 target shards
- ?assertEqual(4, meck:num_calls(hastings_index, await, 2))
- end
- end)}.
-mock_dreyfus_indices() ->
- meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) ->
- #doc{body = {BodyProps}} = Doc,
- case couch_util:get_value(<<"indexes">>, BodyProps) of
- undefined ->
- [];
- {[_]} ->
- [{dreyfus, <<"db">>, dreyfus_index1}]
- end
- end),
- meck:expect(dreyfus_index_manager, get_index, fun(_, _) -> {ok, pid} end),
- meck:expect(dreyfus_index, await, fun(_, _) -> ok end).
-mock_hastings_indices() ->
- meck:expect(hastings_index, design_doc_to_indexes, fun(Doc) ->
- #doc{body = {BodyProps}} = Doc,
- case couch_util:get_value(<<"st_indexes">>, BodyProps) of
- undefined ->
- [];
- {[_]} ->
- [{hastings, <<"db">>, hastings_index1}]
- end
- end),
- meck:expect(hastings_index_manager, get_index, fun(_, _) -> {ok, pid} end),
- meck:expect(hastings_index, await, fun(_, _) -> ok end).
-% Split partitioned database
-split_partitioned_db(#{db2 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- DocSpec = #{
- pdocs => #{
- <<"PX">> => 5,
- <<"PY">> => 5
- },
- mrview => 1,
- local => 1
- },
- add_test_docs(Db, DocSpec),
- % Save documents before the split
- Docs0 = get_all_docs(Db),
- Local0 = get_local_docs(Db),
- % Set some custom metadata properties
- set_revs_limit(Db, 942),
- set_purge_infos_limit(Db, 943),
- SecObj = {[{<<"foo">>, <<"bar">>}]},
- set_security(Db, SecObj),
- % DbInfo is saved after setting metadata bits
- % as those could bump the update sequence
- DbInfo0 = get_db_info(Db),
- PX0 = get_partition_info(Db, <<"PX">>),
- PY0 = get_partition_info(Db, <<"PY">>),
- % Split the one shard
- [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
- {ok, JobId} = mem3_reshard:start_split_job(Shard),
- wait_state(JobId, completed),
- % Perform some basic checks that the shard was split
- Shards1 = lists:sort(mem3:local_shards(Db)),
- ?assertEqual(2, length(Shards1)),
- [#shard{range = R1}, #shard{range = R2}] = Shards1,
- ?assertEqual([16#00000000, 16#7fffffff], R1),
- ?assertEqual([16#80000000, 16#ffffffff], R2),
- % Check metadata bits after the split
- ?assertEqual(942, get_revs_limit(Db)),
- ?assertEqual(943, get_purge_infos_limit(Db)),
- ?assertEqual(SecObj, get_security(Db)),
- DbInfo1 = get_db_info(Db),
- Docs1 = get_all_docs(Db),
- Local1 = get_local_docs(Db),
- % When comparing db infos, ignore update sequences they won't be the
- % same since they are more shards involved after the split
- ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
- % Update seq prefix number is a sum of all shard update sequences
- #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0),
- #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1),
- ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
- % Finally compare that documents are still there after the split
- ?assertEqual(Docs0, Docs1),
- ?assertEqual(PX0, get_partition_info(Db, <<"PX">>)),
- ?assertEqual(PY0, get_partition_info(Db, <<"PY">>)),
- % Don't forget about the local but don't include internal checkpoints
- % as some of those are munged and transformed during the split
- ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
- end)}.
-% Make sure a shard can be split again after it was split once. This checks that
-% too many got added to some range, such that on next split they'd fail to fit
-% in to any of the new target ranges.
-split_twice(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- DocSpec = #{docs => 100, delete => [80, 99], mrview => 2, local => 100},
- add_test_docs(Db, DocSpec),
- % Save documents before the split
- Docs0 = get_all_docs(Db),
- Local0 = get_local_docs(Db),
- % Set some custom metadata properties
- set_revs_limit(Db, 942),
- set_purge_infos_limit(Db, 943),
- SecObj = {[{<<"foo">>, <<"bar">>}]},
- set_security(Db, SecObj),
- % DbInfo is saved after setting metadata bits
- % as those could bump the update sequence
- DbInfo0 = get_db_info(Db),
- % Split the one shard
- [#shard{name=Shard1}] = lists:sort(mem3:local_shards(Db)),
- {ok, JobId1} = mem3_reshard:start_split_job(Shard1),
- wait_state(JobId1, completed),
- % Perform some basic checks that the shard was split
- Shards1 = lists:sort(mem3:local_shards(Db)),
- ?assertEqual(2, length(Shards1)),
- [#shard{range = R1}, #shard{range = R2}] = Shards1,
- ?assertEqual([16#00000000, 16#7fffffff], R1),
- ?assertEqual([16#80000000, 16#ffffffff], R2),
- % Check metadata bits after the split
- ?assertEqual(942, get_revs_limit(Db)),
- ?assertEqual(943, get_purge_infos_limit(Db)),
- ?assertEqual(SecObj, get_security(Db)),
- DbInfo1 = get_db_info(Db),
- Docs1 = get_all_docs(Db),
- Local1 = get_local_docs(Db),
- % When comparing db infos, ignore update sequences they won't be the
- % same since they are more shards involved after the split
- ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
- % Update seq prefix number is a sum of all shard update sequences
- #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0),
- #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1),
- ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
- ?assertEqual(Docs0, Docs1),
- ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)),
- % Split the first range again
- [#shard{name=Shard2}, _] = lists:sort(mem3:local_shards(Db)),
- {ok, JobId2} = mem3_reshard:start_split_job(Shard2),
- wait_state(JobId2, completed),
- Shards2 = lists:sort(mem3:local_shards(Db)),
- ?assertEqual(3, length(Shards2)),
- [R3, R4, R5] = [R || #shard{range = R} <- Shards2],
- ?assertEqual([16#00000000, 16#3fffffff], R3),
- ?assertEqual([16#40000000, 16#7fffffff], R4),
- ?assertEqual([16#80000000, 16#ffffffff], R5),
- % Check metadata bits after the second split
- ?assertEqual(942, get_revs_limit(Db)),
- ?assertEqual(943, get_purge_infos_limit(Db)),
- ?assertEqual(SecObj, get_security(Db)),
- DbInfo2 = get_db_info(Db),
- Docs2 = get_all_docs(Db),
- Local2 = get_local_docs(Db),
- ?assertEqual(without_seqs(DbInfo1), without_seqs(DbInfo2)),
- % Update seq prefix number is a sum of all shard update sequences
- % But only 1 shard out of 2 was split
- #{<<"update_seq">> := UpdateSeq2} = update_seq_to_num(DbInfo2),
- ?assertEqual(trunc(UpdateSeq1 * 1.5), UpdateSeq2),
- ?assertEqual(Docs1, Docs2),
- ?assertEqual(without_meta_locals(Local1), without_meta_locals(Local2))
- end)}.
-couch_events_are_emitted(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- couch_event:register_all(self()),
- % Split the one shard
- [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
- {ok, JobId} = mem3_reshard:start_split_job(Shard),
- wait_state(JobId, completed),
- % Perform some basic checks that the shard was split
- Shards1 = lists:sort(mem3:local_shards(Db)),
- ?assertEqual(2, length(Shards1)),
- [#shard{range = R1}, #shard{range = R2}] = Shards1,
- ?assertEqual([16#00000000, 16#7fffffff], R1),
- ?assertEqual([16#80000000, 16#ffffffff], R2),
- Flush = fun F(Events) ->
- receive
- {'$couch_event', DbName, Event} when Event =:= deleted
- orelse Event =:= updated ->
- case binary:match(DbName, Db) of
- nomatch -> F(Events);
- {_, _} -> F([Event | Events])
- end
- after 0 ->
- lists:reverse(Events)
- end
- end,
- Events = Flush([]),
- StartAtDeleted = lists:dropwhile(fun(E) -> E =/= deleted end, Events),
- ?assertMatch([deleted, deleted, updated, updated | _], StartAtDeleted),
- couch_event:unregister(self())
- end)}.
-retries_work(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- meck:expect(couch_db_split, split, fun(_, _, _) ->
- error(kapow)
- end),
- [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
- {ok, JobId} = mem3_reshard:start_split_job(Shard),
- wait_state(JobId, failed),
- ?assertEqual(3, meck:num_calls(couch_db_split, split, 3))
- end)}.
-target_reset_in_initial_copy(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- [#shard{} = Src] = lists:sort(mem3:local_shards(Db)),
- Job = #job{
- source = Src,
- target = [#shard{name= <<"t1">>}, #shard{name = <<"t2">>}],
- job_state = running,
- split_state = initial_copy
- },
- meck:expect(couch_db_split, cleanup_target, 2, ok),
- meck:expect(couch_server, exists, fun
- (<<"t1">>) -> true;
- (<<"t2">>) -> true;
- (DbName) -> meck:passthrough([DbName])
- end),
- JobPid = spawn(fun() -> mem3_reshard_job:initial_copy_impl(Job) end),
- meck:wait(2, couch_db_split, cleanup_target, ['_', '_'], 5000),
- exit(JobPid, kill),
- ?assertEqual(2, meck:num_calls(couch_db_split, cleanup_target, 2))
- end)}.
-split_an_incomplete_shard_map(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- [#shard{} = Src] = lists:sort(mem3:local_shards(Db)),
- [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
- meck:expect(mem3_util, calculate_max_n, 1, 0),
- ?assertMatch({error, {not_enough_shard_copies, _}},
- mem3_reshard:start_split_job(Shard))
- end)}.
-% Opening a db target db in initial copy phase will throw an error
-target_shards_are_locked(#{db1 := Db}) ->
- {timeout, ?TIMEOUT, ?_test(begin
- add_test_docs(Db, #{docs => 10}),
- % Make the job stops right when it was about to copy the docs
- TestPid = self(),
- meck:new(couch_db, [passthrough]),
- meck:expect(couch_db, start_link, fun(Engine, TName, FilePath, Opts) ->
- TestPid ! {start_link, self(), TName},
- receive
- continue ->
- meck:passthrough([Engine, TName, FilePath, Opts])
- end
- end),
- [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
- {ok, JobId} = mem3_reshard:start_split_job(Shard),
- {Target0, JobPid} = receive
- {start_link, Pid, TName} -> {TName, Pid}
- end,
- ?assertEqual({error, {locked, <<"shard splitting">>}},
- couch_db:open_int(Target0, [])),
- % Send two continues for two targets
- JobPid ! continue,
- JobPid ! continue,
- wait_state(JobId, completed)
- end)}.
-intercept_state(State) ->
- TestPid = self(),
- meck:new(mem3_reshard_job, [passthrough]),
- meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
- case Job#job.split_state of
- State ->
- TestPid ! {self(), State},
- receive
- continue -> meck:passthrough([Job]);
- cancel -> ok
- end;
- _ ->
- meck:passthrough([Job])
- end
- end).
-wait_state(JobId, State) ->
- test_util:wait(fun() ->
- case mem3_reshard:job(JobId) of
- {ok, {Props}} ->
- case couch_util:get_value(job_state, Props) of
- State -> ok;
- _ -> timer:sleep(100), wait
- end;
- {error, not_found} -> timer:sleep(100), wait
- end
- end, 30000).
-set_revs_limit(DbName, Limit) ->
- with_proc(fun() -> fabric:set_revs_limit(DbName, Limit, [?ADMIN_CTX]) end).
-get_revs_limit(DbName) ->
- with_proc(fun() -> fabric:get_revs_limit(DbName) end).
-get_purge_infos_limit(DbName) ->
- with_proc(fun() -> fabric:get_purge_infos_limit(DbName) end).
-set_purge_infos_limit(DbName, Limit) ->
- with_proc(fun() ->
- fabric:set_purge_infos_limit(DbName, Limit, [?ADMIN_CTX])
- end).
-set_security(DbName, SecObj) ->
- with_proc(fun() -> fabric:set_security(DbName, SecObj) end).
-get_security(DbName) ->
- with_proc(fun() -> fabric:get_security(DbName, [?ADMIN_CTX]) end).
-get_db_info(DbName) ->
- with_proc(fun() ->
- {ok, Info} = fabric:get_db_info(DbName),
- maps:with([
- <<"db_name">>, <<"doc_count">>, <<"props">>, <<"doc_del_count">>,
- <<"update_seq">>, <<"purge_seq">>, <<"disk_format_version">>
- ], to_map(Info))
- end).
-get_group_info(DbName, DesignId) ->
- with_proc(fun() ->
- {ok, GInfo} = fabric:get_view_group_info(DbName, DesignId),
- maps:with([
- <<"language">>, <<"purge_seq">>, <<"signature">>, <<"update_seq">>
- ], to_map(GInfo))
- end).
-get_partition_info(DbName, Partition) ->
- with_proc(fun() ->
- {ok, PInfo} = fabric:get_partition_info(DbName, Partition),
- maps:with([
- <<"db_name">>, <<"doc_count">>, <<"doc_del_count">>, <<"partition">>
- ], to_map(PInfo))
- end).
-get_all_docs(DbName) ->
- get_all_docs(DbName, #mrargs{}).
-get_all_docs(DbName, #mrargs{} = QArgs0) ->
- GL = erlang:group_leader(),
- with_proc(fun() ->
- Cb = fun
- ({row, Props}, Acc) ->
- Doc = to_map(couch_util:get_value(doc, Props)),
- #{?ID := Id} = Doc,
- {ok, Acc#{Id => Doc}};
- ({meta, _}, Acc) -> {ok, Acc};
- (complete, Acc) -> {ok, Acc}
- end,
- QArgs = QArgs0#mrargs{include_docs = true},
- {ok, Docs} = fabric:all_docs(DbName, Cb, #{}, QArgs),
- Docs
- end, GL).
-get_local_docs(DbName) ->
- LocalNS = {namespace, <<"_local">>},
- maps:map(fun(_, Doc) ->
- maps:without([<<"_rev">>], Doc)
- end, get_all_docs(DbName, #mrargs{extra = [LocalNS]})).
-without_seqs(#{} = InfoMap) ->
- maps:without([<<"update_seq">>, <<"purge_seq">>], InfoMap).
-without_meta_locals(#{} = Local) ->
- maps:filter(fun
- (<<"_local/purge-mrview-", _/binary>>, _) -> false;
- (<<"_local/shard-sync-", _/binary>>, _) -> false;
- (_, _) -> true
- end, Local).
-update_seq_to_num(#{} = InfoMap) ->
- maps:map(fun
- (<<"update_seq">>, Seq) -> seq_to_num(Seq);
- (<<"purge_seq">>, PSeq) -> seq_to_num(PSeq);
- (_, V) -> V
- end, InfoMap).
-seq_to_num(Seq) ->
- [SeqNum, _] = binary:split(Seq, <<"-">>),
- binary_to_integer(SeqNum).
-to_map([_ | _] = Props) ->
- to_map({Props});
-to_map({[_ | _]} = EJson) ->
- jiffy:decode(jiffy:encode(EJson), [return_maps]).
-create_db(DbName, Opts) ->
- GL = erlang:group_leader(),
- with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL).
-delete_db(DbName) ->
- GL = erlang:group_leader(),
- with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL).
-with_proc(Fun) ->
- with_proc(Fun, undefined, 30000).
-with_proc(Fun, GroupLeader) ->
- with_proc(Fun, GroupLeader, 30000).
-with_proc(Fun, GroupLeader, Timeout) ->
- {Pid, Ref} = spawn_monitor(fun() ->
- case GroupLeader of
- undefined -> ok;
- _ -> erlang:group_leader(GroupLeader, self())
- end,
- exit({with_proc_res, Fun()})
- end),
- receive
- {'DOWN', Ref, process, Pid, {with_proc_res, Res}} ->
- Res;
- {'DOWN', Ref, process, Pid, Error} ->
- error(Error)
- after Timeout ->
- erlang:demonitor(Ref, [flush]),
- exit(Pid, kill),
- error({with_proc_timeout, Fun, Timeout})
- end.
-add_test_docs(DbName, #{} = DocSpec) ->
- Docs = docs(maps:get(docs, DocSpec, []))
- ++ pdocs(maps:get(pdocs, DocSpec, #{}))
- ++ ddocs(mrview, maps:get(mrview, DocSpec, []))
- ++ ddocs(search, maps:get(search, DocSpec, []))
- ++ ddocs(geo, maps:get(geo, DocSpec, []))
- ++ ldocs(maps:get(local, DocSpec, [])),
- Res = update_docs(DbName, Docs),
- Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) ->
- Doc#doc{revs = {RevPos, [Rev]}}
- end, lists:zip(Docs, Res)),
- case delete_docs(maps:get(delete, DocSpec, []), Docs1) of
- [] -> ok;
- [_ | _] = Deleted -> update_docs(DbName, Deleted)
- end,
- ok.
-update_docs(DbName, Docs) ->
- with_proc(fun() ->
- case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
- {accepted, Res} -> Res;
- {ok, Res} -> Res
- end
- end).
-delete_docs([S, E], Docs) when E >= S ->
- ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)],
- lists:filtermap(fun(#doc{id = Id} = Doc) ->
- case lists:member(Id, ToDelete) of
- true -> {true, Doc#doc{deleted = true}};
- false -> false
- end
- end, Docs);
-delete_docs(_, _) ->
- [].
-pdocs(#{} = PMap) ->
- maps:fold(fun(Part, DocSpec, DocsAcc) ->
- docs(DocSpec, <<Part/binary, ":">>) ++ DocsAcc
- end, [], PMap).
-docs(DocSpec) ->
- docs(DocSpec, <<"">>).
-docs(N, Prefix) when is_integer(N), N > 0 ->
- docs([0, N - 1], Prefix);
-docs([S, E], Prefix) when E >= S ->
- [doc(Prefix, I) || I <- lists:seq(S, E)];
-docs(_, _) ->
- [].
-ddocs(Type, N) when is_integer(N), N > 0 ->
- ddocs(Type, [0, N - 1]);
-ddocs(Type, [S, E]) when E >= S ->
- Body = ddprop(Type),
- BType = atom_to_binary(Type, utf8),
- [doc(<<"_design/", BType/binary>>, I, Body, 0) || I <- lists:seq(S, E)];
-ddocs(_, _) ->
- [].
-ldocs(N) when is_integer(N), N > 0 ->
- ldocs([0, N - 1]);
-ldocs([S, E]) when E >= S ->
- [doc(<<"_local/">>, I, bodyprops(), 0) || I <- lists:seq(S, E)];
-ldocs(_) ->
- [].
-doc(Pref, Id) ->
- Body = bodyprops(),
- doc(Pref, Id, Body, 42).
-doc(Pref, Id, BodyProps, AttSize) ->
- #doc{
- id = doc_id(Pref, Id),
- body = {BodyProps},
- atts = atts(AttSize)
- }.
-doc_id(Pref, Id) ->
- IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])),
- <<Pref/binary, IdBin/binary>>.
-ddprop(mrview) ->
- [
- {<<"views">>, {[
- {<<"v1">>, {[
- {<<"map">>, <<"function(d){emit(d);}">>}
- ]}}
- ]}}
- ];
-ddprop(geo) ->
- [
- {<<"st_indexes">>, {[
- {<<"area">>, {[
- {<<"analyzer">>, <<"standard">>},
- {<<"index">>, <<"function(d){if(d.g){st_index(d.g)}}">> }
- ]}}
- ]}}
- ];
-ddprop(search) ->
- [
- {<<"indexes">>, {[
- {<<"types">>, {[
- {<<"index">>, <<"function(d){if(d.g){st_index(d.g.type)}}">>}
- ]}}
- ]}}
- ].
-bodyprops() ->
- [
- {<<"g">>, {[
- {<<"type">>, <<"Polygon">>},
- {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]}
- ]}}
- ].
-atts(0) ->
- [];
-atts(Size) when is_integer(Size), Size >= 1 ->
- Data = << <<"x">> || _ <- lists:seq(1, Size) >>,
- [couch_att:new([
- {name, <<"att">>},
- {type, <<"app/binary">>},
- {att_len, Size},
- {data, Data}
- ])].