+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-define(USER, "mem3_reshard_api_test_admin").
+-define(PASS, "pass").
+-define(AUTH, {basic_auth, {?USER, ?PASS}}).
+-define(JSON, {"Content-Type", "application/json"}).
+-define(RESHARD, "_reshard/").
+-define(JOBS, "_reshard/jobs/").
+-define(STATE, "_reshard/state").
+-define(ID, <<"id">>).
+-define(OK, <<"ok">>).
+-define(TIMEOUT, 60). % seconds
+setup() ->
+ Hashed = couch_passwords:hash_admin_password(?PASS),
+ ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist=false),
+ Addr = config:get("chttpd", "bind_address", ""),
+ Port = mochiweb_socket_server:get(chttpd, port),
+ Url = lists:concat(["http://", Addr, ":", Port, "/"]),
+ {Db1, Db2, Db3} = {?tempdb(), ?tempdb(), ?tempdb()},
+ create_db(Url, Db1, "?q=1&n=1"),
+ create_db(Url, Db2, "?q=1&n=1"),
+ create_db(Url, Db3, "?q=2&n=1"),
+ {Url, {Db1, Db2, Db3}}.
+teardown({Url, {Db1, Db2, Db3}}) ->
+ mem3_reshard:reset_state(),
+ application:unset_env(mem3, reshard_disabled),
+ delete_db(Url, Db1),
+ delete_db(Url, Db2),
+ delete_db(Url, Db3),
+ ok = config:delete("reshard", "max_jobs", _Persist=false),
+ ok = config:delete("reshard", "require_node_param", _Persist=false),
+ ok = config:delete("reshard", "require_range_param", _Persist=false),
+ ok = config:delete("admins", ?USER, _Persist=false),
+ meck:unload().
+start_couch() ->
+ test_util:start_couch([mem3, chttpd]).
+stop_couch(Ctx) ->
+ test_util:stop_couch(Ctx).
+mem3_reshard_api_test_() ->
+ {
+ "mem3 shard split api tests",
+ {
+ setup,
+ fun start_couch/0, fun stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun basics/1,
+ fun create_job_basic/1,
+ fun create_two_jobs/1,
+ fun create_multiple_jobs_from_one_post/1,
+ fun start_stop_cluster_basic/1,
+ fun test_disabled/1,
+ fun start_stop_cluster_with_a_job/1,
+ fun individual_job_start_stop/1,
+ fun individual_job_stop_when_cluster_stopped/1,
+ fun create_job_with_invalid_arguments/1,
+ fun create_job_with_db/1,
+ fun create_job_with_shard_name/1,
+ fun completed_job_handling/1,
+ fun handle_db_deletion_in_initial_copy/1,
+ fun handle_db_deletion_in_topoff1/1,
+ fun handle_db_deletion_in_copy_local_docs/1,
+ fun handle_db_deletion_in_build_indices/1,
+ fun handle_db_deletion_in_update_shard_map/1,
+ fun handle_db_deletion_in_wait_source_close/1,
+ fun recover_in_initial_copy/1,
+ fun recover_in_topoff1/1,
+ fun recover_in_copy_local_docs/1,
+ fun recover_in_build_indices/1,
+ fun recover_in_update_shard_map/1,
+ fun recover_in_wait_source_close/1,
+ fun recover_in_topoff3/1,
+ fun recover_in_source_delete/1,
+ fun check_max_jobs/1,
+ fun check_node_and_range_required_params/1,
+ fun cleanup_completed_jobs/1
+ ]
+ }
+ }
+ }.
+basics({Top, _}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ % GET /_reshard
+ ?assertMatch({200, #{
+ <<"state">> := <<"running">>,
+ <<"state_reason">> := null,
+ <<"completed">> := 0,
+ <<"failed">> := 0,
+ <<"running">> := 0,
+ <<"stopped">> := 0,
+ <<"total">> := 0
+ }}, req(get, Top ++ ?RESHARD)),
+ % GET _reshard/state
+ ?assertMatch({200, #{<<"state">> := <<"running">>}},
+ req(get, Top ++ ?STATE)),
+ % GET _reshard/jobs
+ ?assertMatch({200, #{
+ <<"jobs">> := [],
+ <<"offset">> := 0,
+ <<"total_rows">> := 0
+ }}, req(get, Top ++ ?JOBS)),
+ % Some invalid paths and methods
+ ?assertMatch({404, _}, req(get, Top ++ ?RESHARD ++ "/invalidpath")),
+ ?assertMatch({405, _}, req(put, Top ++ ?RESHARD, #{dont => thinkso})),
+ ?assertMatch({405, _}, req(post, Top ++ ?RESHARD, #{nope => nope}))
+ end)}.
+create_job_basic({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ % POST /_reshard/jobs
+ {C1, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
+ ?assertEqual(201, C1),
+ ?assertMatch([#{?OK := true, ?ID := J, <<"shard">> := S}]
+ when is_binary(J) andalso is_binary(S), R1),
+ [#{?ID := Id, <<"shard">> := Shard}] = R1,
+ % GET /_reshard/jobs
+ ?assertMatch({200, #{
+ <<"jobs">> := [#{?ID := Id, <<"type">> := <<"split">>}],
+ <<"offset">> := 0,
+ <<"total_rows">> := 1
+ }}, req(get, Top ++ ?JOBS)),
+ % GET /_reshard/job/$jobid
+ {C2, R2} = req(get, Top ++ ?JOBS ++ ?b2l(Id)),
+ ?assertEqual(200, C2),
+ ThisNode = atom_to_binary(node(), utf8),
+ ?assertMatch(#{?ID := Id}, R2),
+ ?assertMatch(#{<<"type">> := <<"split">>}, R2),
+ ?assertMatch(#{<<"source">> := Shard}, R2),
+ ?assertMatch(#{<<"history">> := History} when length(History) > 1, R2),
+ ?assertMatch(#{<<"node">> := ThisNode}, R2),
+ ?assertMatch(#{<<"split_state">> := SSt} when is_binary(SSt), R2),
+ ?assertMatch(#{<<"job_state">> := JSt} when is_binary(JSt), R2),
+ ?assertMatch(#{<<"state_info">> := #{}}, R2),
+ ?assertMatch(#{<<"target">> := Target} when length(Target) == 2, R2),
+ % GET /_reshard/job/$jobid/state
+ ?assertMatch({200, #{<<"state">> := S, <<"reason">> := R}}
+ when is_binary(S) andalso (is_binary(R) orelse R =:= null),
+ req(get, Top ++ ?JOBS ++ ?b2l(Id) ++ "/state")),
+ % GET /_reshard
+ ?assertMatch({200, #{<<"state">> := <<"running">>, <<"total">> := 1}},
+ req(get, Top ++ ?RESHARD)),
+ % DELETE /_reshard/jobs/$jobid
+ ?assertMatch({200, #{?OK := true}},
+ req(delete, Top ++ ?JOBS ++ ?b2l(Id))),
+ % GET _reshard/jobs
+ ?assertMatch({200, #{<<"jobs">> := [], <<"total_rows">> := 0}},
+ req(get, Top ++ ?JOBS)),
+ % GET /_reshard/job/$jobid should be a 404
+ ?assertMatch({404, #{}}, req(get, Top ++ ?JOBS ++ ?b2l(Id))),
+ % DELETE /_reshard/jobs/$jobid should be a 404 as well
+ ?assertMatch({404, #{}}, req(delete, Top ++ ?JOBS ++ ?b2l(Id)))
+ end)}.
+create_two_jobs({Top, {Db1, Db2, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ ?assertMatch({201, [#{?OK := true}]},
+ req(post, Jobs, #{type => split, db => Db1})),
+ ?assertMatch({201, [#{?OK := true}]},
+ req(post, Jobs, #{type => split, db => Db2})),
+ ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD)),
+ ?assertMatch({200, #{
+ <<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}],
+ <<"offset">> := 0,
+ <<"total_rows">> := 2
+ }} when Id1 =/= Id2, req(get, Jobs)),
+ {200, #{<<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}]}} = req(get, Jobs),
+ {200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id1)),
+ ?assertMatch({200, #{<<"total">> := 1}}, req(get, Top ++ ?RESHARD)),
+ {200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id2)),
+ ?assertMatch({200, #{<<"total">> := 0}}, req(get, Top ++ ?RESHARD))
+ end)}.
+create_multiple_jobs_from_one_post({Top, {_, _, Db3}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ {C1, R1} = req(post, Jobs, #{type => split, db => Db3}),
+ ?assertMatch({201, [#{?OK := true}, #{?OK := true}]}, {C1, R1}),
+ ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD))
+ end)}.
+start_stop_cluster_basic({Top, _}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Url = Top ++ ?STATE,
+ ?assertMatch({200, #{
+ <<"state">> := <<"running">>,
+ <<"reason">> := null
+ }}, req(get, Url)),
+ ?assertMatch({200, _}, req(put, Url, #{state => stopped})),
+ ?assertMatch({200, #{
+ <<"state">> := <<"stopped">>,
+ <<"reason">> := R
+ }} when is_binary(R), req(get, Url)),
+ ?assertMatch({200, _}, req(put, Url, #{state => running})),
+ % Make sure the reason shows in the state GET request
+ Reason = <<"somereason">>,
+ ?assertMatch({200, _}, req(put, Url, #{state => stopped,
+ reason => Reason})),
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>,
+ <<"reason">> := Reason}}, req(get, Url)),
+ % Top level summary also shows the reason
+ ?assertMatch({200, #{
+ <<"state">> := <<"stopped">>,
+ <<"state_reason">> := Reason
+ }}, req(get, Top ++ ?RESHARD)),
+ ?assertMatch({200, _}, req(put, Url, #{state => running})),
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, Url))
+ end)}.
+test_disabled({Top, _}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ application:set_env(mem3, reshard_disabled, true),
+ ?assertMatch({501, _}, req(get, Top ++ ?RESHARD)),
+ ?assertMatch({501, _}, req(put, Top ++ ?STATE, #{state => running})),
+ application:unset_env(mem3, reshard_disabled),
+ ?assertMatch({200, _}, req(get, Top ++ ?RESHARD)),
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running}))
+ end)}.
+start_stop_cluster_with_a_job({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Url = Top ++ ?STATE,
+ ?assertMatch({200, _}, req(put, Url, #{state => stopped})),
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, Url)),
+ % Can add jobs with global state stopped, they just won't be running
+ {201, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
+ ?assertMatch([#{?OK := true}], R1),
+ [#{?ID := Id1}] = R1,
+ {200, J1} = req(get, Top ++ ?JOBS ++ ?b2l(Id1)),
+ ?assertMatch(#{?ID := Id1, <<"job_state">> := <<"stopped">>}, J1),
+ % Check summary stats
+ ?assertMatch({200, #{
+ <<"state">> := <<"stopped">>,
+ <<"running">> := 0,
+ <<"stopped">> := 1,
+ <<"total">> := 1
+ }}, req(get, Top ++ ?RESHARD)),
+ % Can delete the job when stopped
+ {200, #{?OK := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(Id1)),
+ ?assertMatch({200, #{
+ <<"state">> := <<"stopped">>,
+ <<"running">> := 0,
+ <<"stopped">> := 0,
+ <<"total">> := 0
+ }}, req(get, Top ++ ?RESHARD)),
+ % Add same job again
+ {201, [#{?ID := Id2}]} = req(post, Top ++ ?JOBS, #{type => split,
+ db => Db1}),
+ ?assertMatch({200, #{?ID := Id2, <<"job_state">> := <<"stopped">>}},
+ req(get, Top ++ ?JOBS ++ ?b2l(Id2))),
+ % Job should start after resharding is started on the cluster
+ ?assertMatch({200, _}, req(put, Url, #{state => running})),
+ ?assertMatch({200, #{?ID := Id2, <<"job_state">> := JSt}}
+ when JSt =/= <<"stopped">>, req(get, Top ++ ?JOBS ++ ?b2l(Id2)))
+ end)}.
+individual_job_start_stop({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ intercept_state(topoff1),
+ Body = #{type => split, db => Db1},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+ StUrl = JobUrl ++ "/state",
+ % Wait for the the job to start running and intercept it in topoff1 state
+ receive {JobPid, topoff1} -> ok end,
+ % Tell the intercept to never finish checkpointing so job is left hanging
+ % forever in running state
+ JobPid ! cancel,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+ {200, _} = req(put, StUrl, #{state => stopped}),
+ wait_state(StUrl, <<"stopped">>),
+ % Stop/start resharding globally and job should still stay stopped
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+ % Start the job again
+ ?assertMatch({200, _}, req(put, StUrl, #{state => running})),
+ % Wait for the the job to start running and intercept it in topoff1 state
+ receive {JobPid2, topoff1} -> ok end,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+ % Let it continue running and it should complete eventually
+ JobPid2 ! continue,
+ wait_state(StUrl, <<"completed">>)
+ end)}.
+individual_job_stop_when_cluster_stopped({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ intercept_state(topoff1),
+ Body = #{type => split, db => Db1},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+ StUrl = JobUrl ++ "/state",
+ % Wait for the the job to start running and intercept in topoff1
+ receive {JobPid, topoff1} -> ok end,
+ % Tell the intercept to never finish checkpointing so job is left
+ % hanging forever in running state
+ JobPid ! cancel,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+ % Stop resharding globally
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ wait_state(StUrl, <<"stopped">>),
+ % Stop the job specifically
+ {200, _} = req(put, StUrl, #{state => stopped}),
+ % Job stays stopped
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+ % Set cluster to running again
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+ % The job should stay stopped
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+ % It should be possible to resume job and it should complete
+ ?assertMatch({200, _}, req(put, StUrl, #{state => running})),
+ % Wait for the the job to start running and intercept in topoff1 state
+ receive {JobPid2, topoff1} -> ok end,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+ % Let it continue running and it should complete eventually
+ JobPid2 ! continue,
+ wait_state(StUrl, <<"completed">>)
+ end)}.
+create_job_with_invalid_arguments({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ % Nothing in the body
+ ?assertMatch({400, _}, req(post, Jobs, #{})),
+ % Missing type
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1})),
+ % Have type but no db and no shard
+ ?assertMatch({400, _}, req(post, Jobs, #{type => split})),
+ % Have type and db but db is invalid
+ ?assertMatch({400, _}, req(post, Jobs, #{db => <<"baddb">>,
+ type => split})),
+ % Have type and shard but shard is not an existing database
+ ?assertMatch({404, _}, req(post, Jobs, #{type => split,
+ shard => <<"shards/80000000-ffffffff/baddb.1549492084">>})),
+ % Bad range values, too large, different types, inverted
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1, range => 42,
+ type => split})),
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
+ range => <<"x">>, type => split})),
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
+ range => <<"ffffffff-80000000">>, type => split})),
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
+ range => <<"00000000-fffffffff">>, type => split})),
+ % Can't have both db and shard
+ ?assertMatch({400, _}, req(post, Jobs, #{type => split, db => Db1,
+ shard => <<"blah">>}))
+ end)}.
+create_job_with_db({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ Body1 = #{type => split, db => Db1},
+ % Node with db
+ N = atom_to_binary(node(), utf8),
+ {C1, R1} = req(post, Jobs, Body1#{node => N}),
+ ?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
+ wait_to_complete_then_cleanup(Top, R1),
+ % Range and db
+ {C2, R2} = req(post, Jobs, Body1#{range => <<"00000000-7fffffff">>}),
+ ?assertMatch({201, [#{?OK := true}]}, {C2, R2}),
+ wait_to_complete_then_cleanup(Top, R2),
+ % Node, range and db
+ Range = <<"80000000-ffffffff">>,
+ {C3, R3} = req(post, Jobs, Body1#{range => Range, node => N}),
+ ?assertMatch({201, [#{?OK := true}]}, {C3, R3}),
+ wait_to_complete_then_cleanup(Top, R3),
+ ?assertMatch([
+ [16#00000000, 16#3fffffff],
+ [16#40000000, 16#7fffffff],
+ [16#80000000, 16#bfffffff],
+ [16#c0000000, 16#ffffffff]
+ ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db1))])
+ end)}.
+create_job_with_shard_name({Top, {_, _, Db3}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ [S1, S2] = [mem3:name(S) || S <- lists:sort(mem3:shards(Db3))],
+ % Shard only
+ {C1, R1} = req(post, Jobs, #{type => split, shard => S1}),
+ ?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
+ wait_to_complete_then_cleanup(Top, R1),
+ % Shard with a node
+ N = atom_to_binary(node(), utf8),
+ {C2, R2} = req(post, Jobs, #{type => split, shard => S2, node => N}),
+ ?assertMatch({201, [#{?OK := true}]}, {C2, R2}),
+ wait_to_complete_then_cleanup(Top, R2),
+ ?assertMatch([
+ [16#00000000, 16#3fffffff],
+ [16#40000000, 16#7fffffff],
+ [16#80000000, 16#bfffffff],
+ [16#c0000000, 16#ffffffff]
+ ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db3))])
+ end)}.
+completed_job_handling({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ % Run job to completion
+ {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
+ ?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
+ [#{?ID := Id}] = R1,
+ wait_to_complete(Top, R1),
+ % Check top level stats
+ ?assertMatch({200, #{
+ <<"state">> := <<"running">>,
+ <<"state_reason">> := null,
+ <<"completed">> := 1,
+ <<"failed">> := 0,
+ <<"running">> := 0,
+ <<"stopped">> := 0,
+ <<"total">> := 1
+ }}, req(get, Top ++ ?RESHARD)),
+ % Job state itself
+ JobUrl = Jobs ++ ?b2l(Id),
+ ?assertMatch({200, #{
+ <<"split_state">> := <<"completed">>,
+ <<"job_state">> := <<"completed">>
+ }}, req(get, JobUrl)),
+ % Job's state endpoint
+ StUrl = Jobs ++ ?b2l(Id) ++ "/state",
+ ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
+ % Try to stop it and it should stay completed
+ {200, _} = req(put, StUrl, #{state => stopped}),
+ ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
+ % Try to resume it and it should stay completed
+ {200, _} = req(put, StUrl, #{state => running}),
+ ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
+ % Stop resharding globally and job should still stay completed
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
+ % Start resharding and job stays completed
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+ ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
+ ?assertMatch({200, #{?OK := true}}, req(delete, JobUrl))
+ end)}.
+handle_db_deletion_in_topoff1({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, topoff1),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end)}.
+handle_db_deletion_in_initial_copy({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, initial_copy),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end)}.
+handle_db_deletion_in_copy_local_docs({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, copy_local_docs),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end)}.
+handle_db_deletion_in_build_indices({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, build_indices),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end)}.
+handle_db_deletion_in_update_shard_map({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, update_shardmap),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end)}.
+handle_db_deletion_in_wait_source_close({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, wait_source_close),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end)}.
+recover_in_topoff1({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = recover_in_state(Top, Db1, topoff1),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end)}.
+recover_in_initial_copy({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = recover_in_state(Top, Db1, initial_copy),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end)}.
+recover_in_copy_local_docs({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = recover_in_state(Top, Db1, copy_local_docs),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end)}.
+recover_in_build_indices({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = recover_in_state(Top, Db1, build_indices),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end)}.
+recover_in_update_shard_map({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = recover_in_state(Top, Db1, update_shardmap),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end)}.
+recover_in_wait_source_close({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = recover_in_state(Top, Db1, wait_source_close),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end)}.
+recover_in_topoff3({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = recover_in_state(Top, Db1, topoff3),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end)}.
+recover_in_source_delete({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ JobId = recover_in_state(Top, Db1, source_delete),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end)}.
+check_max_jobs({Top, {Db1, Db2, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ config:set("reshard", "max_jobs", "0", _Persist=false),
+ {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
+ ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, {C1, R1}),
+ config:set("reshard", "max_jobs", "1", _Persist=false),
+ {201, R2} = req(post, Jobs, #{type => split, db => Db1}),
+ wait_to_complete(Top, R2),
+ % Stop clustering so jobs are not started anymore and ensure max jobs
+ % is enforced even if jobs are stopped
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ {C3, R3} = req(post, Jobs, #{type => split, db => Db2}),
+ ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]},
+ {C3, R3}),
+ % Allow the job to be created by raising max_jobs
+ config:set("reshard", "max_jobs", "2", _Persist=false),
+ {C4, R4} = req(post, Jobs, #{type => split, db => Db2}),
+ ?assertEqual(201, C4),
+ % Lower max_jobs after job is created but it's not running
+ config:set("reshard", "max_jobs", "1", _Persist=false),
+ % Start resharding again
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+ % Jobs that have been created already are not removed if max jobs is lowered
+ % so make sure the job completes
+ wait_to_complete(Top, R4)
+ end)}.
+check_node_and_range_required_params({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ Node = atom_to_binary(node(), utf8),
+ Range = <<"00000000-ffffffff">>,
+ config:set("reshard", "require_node_param", "true", _Persist=false),
+ {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
+ NodeRequiredErr = <<"`node` prameter is required">>,
+ ?assertEqual({400, #{<<"error">> => <<"bad_request">>,
+ <<"reason">> => NodeRequiredErr}}, {C1, R1}),
+ config:set("reshard", "require_range_param", "true", _Persist=false),
+ {C2, R2} = req(post, Jobs, #{type => split, db => Db1, node => Node}),
+ RangeRequiredErr = <<"`range` prameter is required">>,
+ ?assertEqual({400, #{<<"error">> => <<"bad_request">>,
+ <<"reason">> => RangeRequiredErr}}, {C2, R2}),
+ Body = #{type => split, db => Db1, range => Range, node => Node},
+ {C3, R3} = req(post, Jobs, Body),
+ ?assertMatch({201, [#{?OK := true}]}, {C3, R3}),
+ wait_to_complete_then_cleanup(Top, R3)
+ end)}.
+cleanup_completed_jobs({Top, {Db1, _, _}}) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ Body = #{type => split, db => Db1},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+ wait_state(JobUrl ++ "/state", <<"completed">>),
+ delete_db(Top, Db1),
+ wait_for_http_code(JobUrl, 404)
+ end)}.
+% Test help functions
+wait_to_complete_then_cleanup(Top, Jobs) ->
+ JobsUrl = Top ++ ?JOBS,
+ lists:foreach(fun(#{?ID := Id}) ->
+ wait_state(JobsUrl ++ ?b2l(Id) ++ "/state", <<"completed">>),
+ {200, _} = req(delete, JobsUrl ++ ?b2l(Id))
+ end, Jobs).
+wait_to_complete(Top, Jobs) ->
+ JobsUrl = Top ++ ?JOBS,
+ lists:foreach(fun(#{?ID := Id}) ->
+ wait_state(JobsUrl ++ ?b2l(Id) ++ "/state", <<"completed">>)
+ end, Jobs).
+intercept_state(State) ->
+ TestPid = self(),
+ meck:new(mem3_reshard_job, [passthrough]),
+ meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+ case Job#job.split_state of
+ State ->
+ TestPid ! {self(), State},
+ receive
+ continue -> meck:passthrough([Job]);
+ cancel -> ok
+ end;
+ _ ->
+ meck:passthrough([Job])
+ end
+ end).
+cancel_intercept() ->
+ meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+ meck:passthrough([Job])
+ end).
+wait_state(Url, State) ->
+ test_util:wait(fun() ->
+ case req(get, Url) of
+ {200, #{<<"state">> := State}} -> ok;
+ {200, #{}} -> timer:sleep(100), wait
+ end
+ end, 30000).
+wait_for_http_code(Url, Code) when is_integer(Code) ->
+ test_util:wait(fun() ->
+ case req(get, Url) of
+ {Code, _} -> ok;
+ {_, _} -> timer:sleep(100), wait
+ end
+ end, 30000).
+delete_source_in_state(Top, Db, State) when is_atom(State), is_binary(Db) ->
+ intercept_state(State),
+ Body = #{type => split, db => Db},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ receive {JobPid, State} -> ok end,
+ sync_delete_db(Top, Db),
+ JobPid ! continue,
+ Id.
+recover_in_state(Top, Db, State) when is_atom(State) ->
+ intercept_state(State),
+ Body = #{type => split, db => Db},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ receive {JobPid, State} -> ok end,
+ % Job is now stuck in running we prevented it from executing
+ % the given state
+ JobPid ! cancel,
+ % Now restart resharding
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ cancel_intercept(),
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+ Id.
+create_db(Top, Db, QArgs) when is_binary(Db) ->
+ Url = Top ++ binary_to_list(Db) ++ QArgs,
+ {ok, Status, _, _} = test_request:put(Url, [?JSON, ?AUTH], "{}"),
+ ?assert(Status =:= 201 orelse Status =:= 202).
+delete_db(Top, Db) when is_binary(Db) ->
+ Url = Top ++ binary_to_list(Db),
+ case test_request:get(Url, [?AUTH]) of
+ {ok, 404, _, _} ->
+ not_found;
+ {ok, 200, _, _} ->
+ {ok, 200, _, _} = test_request:delete(Url, [?AUTH]),
+ ok
+ end.
+sync_delete_db(Top, Db) when is_binary(Db) ->
+ delete_db(Top, Db),
+ try
+ Shards = mem3:local_shards(Db),
+ ShardNames = [mem3:name(S) || S <- Shards],
+ [couch_server:delete(N, [?ADMIN_CTX]) || N <- ShardNames],
+ ok
+ catch
+ error:database_does_not_exist ->
+ ok
+ end.
+req(Method, Url) ->
+ Headers = [?AUTH],
+ {ok, Code, _, Res} = test_request:request(Method, Url, Headers),
+ {Code, jiffy:decode(Res, [return_maps])}.
+req(Method, Url, #{} = Body) ->
+ req(Method, Url, jiffy:encode(Body));
+req(Method, Url, Body) ->
+ Headers = [?JSON, ?AUTH],
+ {ok, Code, _, Res} = test_request:request(Method, Url, Headers, Body),
+ {Code, jiffy:decode(Res, [return_maps])}.