author | Nick Vatamaniuc <vatamane@apache.org> | 2019-03-18 13:32:15 -0400
---|---|---
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2019-04-03 10:48:45 -0400
commit | a6db7d54e7651eee796932a60b534b208566f563 (patch) |
tree | 5db1b16cec2cc628ceb1d71b925502feff234db8 |
parent | bcdd99497601a3054fb1335ebb940fbe00b7469f (diff) |
download | couchdb-a6db7d54e7651eee796932a60b534b208566f563.tar.gz |
Implement resharding HTTP API
This implements the API as defined in RFC #1920.
The handlers live in the `mem3_reshard_httpd` module; helpers, such as
validators, live in the `mem3_reshard_api` module.
There are also a number of high-level (HTTP & fabric) API tests that check
that shard splitting happens properly and that jobs behave as defined in the RFC.
Co-authored-by: Eric Avdey <eiri@eiri.ca>
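
For orientation, below is a minimal, hypothetical sketch of driving the new HTTP API from Erlang, using the same `test_request` and `jiffy` helpers that the test modules added in this commit rely on. The base URL, admin credentials, and the `mydb` database name are illustrative assumptions, not values taken from the commit.

```erlang
%% Sketch only: assumes a dev-cluster URL, admin credentials, and a
%% placeholder database name ("mydb"); success status codes are assumed.
Url = "http://127.0.0.1:15984/",
Auth = {basic_auth, {"adm", "pass"}},
Json = {"Content-Type", "application/json"},

%% GET /_reshard - cluster-wide resharding summary
{ok, 200, _, _Summary} = test_request:get(Url ++ "_reshard", [Auth]),

%% POST /_reshard/jobs - create split jobs for every shard of a database
JobsBody = jiffy:encode(#{type => split, db => <<"mydb">>}),
{ok, 201, _, Created} =
    test_request:request(post, Url ++ "_reshard/jobs", [Json, Auth], JobsBody),
[#{<<"id">> := JobId} | _] = jiffy:decode(Created, [return_maps]),

%% GET /_reshard/jobs/$jobid - inspect a single job
JobUrl = Url ++ "_reshard/jobs/" ++ binary_to_list(JobId),
{ok, 200, _, _Job} = test_request:get(JobUrl, [Auth]),

%% PUT /_reshard/state - stop resharding cluster-wide, with a reason
StateBody = jiffy:encode(#{state => stopped, reason => <<"maintenance">>}),
{ok, 200, _, _} = test_request:put(Url ++ "_reshard/state", [Json, Auth], StateBody).
```

The same pattern covers the remaining endpoints (`GET /_reshard/state`, `GET /_reshard/jobs`, `PUT /_reshard/jobs/$jobid/state`, `DELETE /_reshard/jobs/$jobid`); note that all `_reshard` requests require admin credentials, per the `chttpd_auth_request` change below.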
-rw-r--r-- | src/chttpd/src/chttpd_auth_request.erl | 2
-rw-r--r-- | src/mem3/src/mem3_httpd_handlers.erl | 1
-rw-r--r-- | src/mem3/src/mem3_reshard_api.erl | 195
-rw-r--r-- | src/mem3/src/mem3_reshard_httpd.erl | 317
-rw-r--r-- | src/mem3/test/mem3_reshard_api_test.erl | 817
-rw-r--r-- | src/mem3/test/mem3_reshard_changes_feed_test.erl | 388
-rw-r--r-- | src/mem3/test/mem3_reshard_test.erl | 804
-rw-r--r-- | test/elixir/lib/couch/db_test.ex | 3
-rw-r--r-- | test/elixir/test/reshard_all_docs_test.exs | 79
-rw-r--r-- | test/elixir/test/reshard_basic_test.exs | 174
-rw-r--r-- | test/elixir/test/reshard_changes_feed.exs | 81
-rw-r--r-- | test/elixir/test/reshard_helpers.exs | 111
-rw-r--r-- | test/elixir/test/test_helper.exs | 1
13 files changed, 2971 insertions, 2 deletions
diff --git a/src/chttpd/src/chttpd_auth_request.erl b/src/chttpd/src/chttpd_auth_request.erl index 5b4ec84d5..96dbf980c 100644 --- a/src/chttpd/src/chttpd_auth_request.erl +++ b/src/chttpd/src/chttpd_auth_request.erl @@ -50,6 +50,8 @@ authorize_request_int(#httpd{path_parts=[<<"_replicator">>,<<"_changes">>|_]}=Re require_admin(Req); authorize_request_int(#httpd{path_parts=[<<"_replicator">>|_]}=Req) -> db_authorization_check(Req); +authorize_request_int(#httpd{path_parts=[<<"_reshard">>|_]}=Req) -> + require_admin(Req); authorize_request_int(#httpd{path_parts=[<<"_users">>], method='PUT'}=Req) -> require_admin(Req); authorize_request_int(#httpd{path_parts=[<<"_users">>], method='DELETE'}=Req) -> diff --git a/src/mem3/src/mem3_httpd_handlers.erl b/src/mem3/src/mem3_httpd_handlers.erl index 7cbd9fe5f..7dd6ab052 100644 --- a/src/mem3/src/mem3_httpd_handlers.erl +++ b/src/mem3/src/mem3_httpd_handlers.erl @@ -15,6 +15,7 @@ -export([url_handler/1, db_handler/1, design_handler/1]). url_handler(<<"_membership">>) -> fun mem3_httpd:handle_membership_req/1; +url_handler(<<"_reshard">>) -> fun mem3_reshard_httpd:handle_reshard_req/1; url_handler(_) -> no_match. db_handler(<<"_shards">>) -> fun mem3_httpd:handle_shards_req/2; diff --git a/src/mem3/src/mem3_reshard_api.erl b/src/mem3/src/mem3_reshard_api.erl new file mode 100644 index 000000000..f39df4cbb --- /dev/null +++ b/src/mem3/src/mem3_reshard_api.erl @@ -0,0 +1,195 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_api). + +-export([ + create_jobs/5, + get_jobs/0, + get_job/1, + get_summary/0, + resume_job/1, + stop_job/2, + start_shard_splitting/0, + stop_shard_splitting/1, + get_shard_splitting_state/0 +]). + + +create_jobs(Node, Shard, Db, Range, split) -> + lists:map(fun(S) -> + N = mem3:node(S), + Name = mem3:name(S), + case rpc:call(N, mem3_reshard, start_split_job, [Name]) of + {badrpc, Error} -> + {error, Error, N, Name}; + {ok, JobId} -> + {ok, JobId, N, Name}; + {error, Error} -> + {error, Error, N, Name} + end + end, pick_shards(Node, Shard, Db, Range)). + + +get_jobs() -> + Nodes = mem3_util:live_nodes(), + {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, jobs, []), + lists:flatten(Replies). + + +get_job(JobId) -> + Nodes = mem3_util:live_nodes(), + {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, job, [JobId]), + case [JobInfo || {ok, JobInfo} <- Replies] of + [JobInfo | _] -> + {ok, JobInfo}; + [] -> + {error, not_found} + end. + + +get_summary() -> + Nodes = mem3_util:live_nodes(), + {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, get_state, []), + Stats0 = #{running => 0, total => 0, completed => 0, failed => 0, + stopped => 0}, + StatsF = lists:foldl(fun({Res}, Stats) -> + maps:map(fun(Stat, OldVal) -> + OldVal + couch_util:get_value(Stat, Res, 0) + end, Stats) + end, Stats0, Replies), + {State, Reason} = state_and_reason(Replies), + StateReasonProps = [{state, State}, {state_reason, Reason}], + {StateReasonProps ++ lists:sort(maps:to_list(StatsF))}. 
+ + +resume_job(JobId) -> + Nodes = mem3_util:live_nodes(), + {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, resume_job, + [JobId]), + WithoutNotFound = [R || R <- Replies, R =/= {error, not_found}], + case lists:usort(WithoutNotFound) of + [ok] -> + ok; + [{error, Error} | _] -> + {error, {[{error, couch_util:to_binary(Error)}]}}; + [] -> + {error, not_found} + end. + + +stop_job(JobId, Reason) -> + Nodes = mem3_util:live_nodes(), + {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, stop_job, + [JobId, Reason]), + WithoutNotFound = [R || R <- Replies, R =/= {error, not_found}], + case lists:usort(WithoutNotFound) of + [ok] -> + ok; + [{error, Error} | _] -> + {error, {[{error, couch_util:to_binary(Error)}]}}; + [] -> + {error, not_found} + end. + + +start_shard_splitting() -> + {Replies, _Bad} = rpc:multicall(mem3_reshard, start, []), + case lists:usort(lists:flatten(Replies)) of + [ok] -> + {ok, {[{ok, true}]}}; + [Error | _] -> + {error, {[{error, couch_util:to_binary(Error)}]}} + end. + + +stop_shard_splitting(Reason) -> + {Replies, _Bad} = rpc:multicall(mem3_reshard, stop, [Reason]), + case lists:usort(lists:flatten(Replies)) of + [ok] -> + {ok, {[{ok, true}]}}; + [Error | _] -> + {error, {[{error, couch_util:to_binary(Error)}]}} + end. + + +get_shard_splitting_state() -> + Nodes = mem3_util:live_nodes(), + {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, get_state, []), + state_and_reason(Replies). + + +state_and_reason(StateReplies) -> + AccF = lists:foldl(fun({ResProps}, Acc) -> + Reason = get_reason(ResProps), + case couch_util:get_value(state, ResProps) of + <<"running">> -> orddict:append(running, Reason, Acc); + <<"stopped">> -> orddict:append(stopped, Reason, Acc); + undefined -> Acc + end + end, orddict:from_list([{running, []}, {stopped, []}]), StateReplies), + Running = orddict:fetch(running, AccF), + case length(Running) > 0 of + true -> + Reason = pick_reason(Running), + {running, Reason}; + false -> + Reason = pick_reason(orddict:fetch(stopped, AccF)), + {stopped, Reason} + end. + + +pick_reason(Reasons) -> + Reasons1 = lists:usort(Reasons), + Reasons2 = [R || R <- Reasons1, R =/= undefined], + case Reasons2 of + [] -> null; + [R1 | _] -> R1 + end. + + +get_reason(StateProps) when is_list(StateProps) -> + case couch_util:get_value(state_info, StateProps) of + [] -> undefined; + undefined -> undefined; + {SInfoProps} -> couch_util:get_value(reason, SInfoProps) + end. 
+ + +pick_shards(undefined, undefined, Db, undefined) when is_binary(Db) -> + mem3:shards(Db); + +pick_shards(Node, undefined, Db, undefined) when is_atom(Node), + is_binary(Db) -> + [S || S <- mem3:shards(Db), mem3:node(S) == Node]; + +pick_shards(undefined, undefined, Db, [_B, _E] = Range) when is_binary(Db) -> + [S || S <- mem3:shards(Db), mem3:range(S) == Range]; + +pick_shards(Node, undefined, Db, [_B, _E] = Range) when is_atom(Node), + is_binary(Db) -> + [S || S <- mem3:shards(Db), mem3:node(S) == Node, mem3:range(S) == Range]; + +pick_shards(undefined, Shard, undefined, undefined) when is_binary(Shard) -> + Db = mem3:dbname(Shard), + [S || S <- mem3:shards(Db), mem3:name(S) == Shard]; + +pick_shards(Node, Shard, undefined, undefined) when is_atom(Node), + is_binary(Shard) -> + Db = mem3:dbname(Shard), + [S || S <- mem3:shards(Db), mem3:name(S) == Shard, mem3:node(S) == Node]; + +pick_shards(_, undefined, undefined, _) -> + throw({bad_request, <<"Must specify at least `db` or `shard`">>}); + +pick_shards(_, Db, Shard, _) when is_binary(Db), is_binary(Shard) -> + throw({bad_request, <<"`db` and `shard` are mutually exclusive">>}). diff --git a/src/mem3/src/mem3_reshard_httpd.erl b/src/mem3/src/mem3_reshard_httpd.erl new file mode 100644 index 000000000..3d0f77f39 --- /dev/null +++ b/src/mem3/src/mem3_reshard_httpd.erl @@ -0,0 +1,317 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_httpd). + +-export([ + handle_reshard_req/1 +]). + +-import(couch_httpd, [ + send_json/2, + send_json/3, + send_method_not_allowed/2 +]). + + +-include_lib("couch/include/couch_db.hrl"). + + +-define(JOBS, <<"jobs">>). +-define(STATE, <<"state">>). +-define(S_RUNNING, <<"running">>). +-define(S_STOPPED, <<"stopped">>). 
+ + +% GET /_reshard +handle_reshard_req(#httpd{method='GET', path_parts=[_]} = Req) -> + reject_if_disabled(), + State = mem3_reshard_api:get_summary(), + send_json(Req, State); + +handle_reshard_req(#httpd{path_parts=[_]} = Req) -> + send_method_not_allowed(Req, "GET,HEAD"); + +% GET /_reshard/state +handle_reshard_req(#httpd{method='GET', + path_parts=[_, ?STATE]} = Req) -> + reject_if_disabled(), + {State, Reason} = mem3_reshard_api:get_shard_splitting_state(), + send_json(Req, {[{state, State}, {reason, Reason}]}); + +% PUT /_reshard/state +handle_reshard_req(#httpd{method='PUT', + path_parts=[_, ?STATE]} = Req) -> + reject_if_disabled(), + couch_httpd:validate_ctype(Req, "application/json"), + {Props} = couch_httpd:json_body_obj(Req), + State = couch_util:get_value(<<"state">>, Props), + Reason = couch_util:get_value(<<"reason">>, Props), + case {State, Reason} of + {undefined, _} -> + throw({bad_request, <<"Expected a `state` field">>}); + {?S_RUNNING, _} -> + case mem3_reshard_api:start_shard_splitting() of + {ok, JsonResult} -> + send_json(Req, 200, JsonResult); + {error, JsonError} -> + send_json(Req, 500, JsonError) + end; + {?S_STOPPED, Reason} -> + Reason1 = case Reason =:= undefined of + false -> Reason; + true -> <<"Cluster-wide resharding stopped by the user">> + end, + case mem3_reshard_api:stop_shard_splitting(Reason1) of + {ok, JsonResult} -> + send_json(Req, 200, JsonResult); + {error, JsonError} -> + send_json(Req, 500, JsonError) + end; + {_, _} -> + throw({bad_request, <<"State field not `running` or `stopped`">>}) + end; + +handle_reshard_req(#httpd{path_parts=[_, ?STATE]} = Req) -> + send_method_not_allowed(Req, "GET,HEAD,PUT"); + +handle_reshard_req(#httpd{path_parts=[_, ?STATE | _]} = _Req) -> + throw(not_found); + +% GET /_reshard/jobs +handle_reshard_req(#httpd{method='GET', path_parts=[_, ?JOBS]}=Req) -> + reject_if_disabled(), + Jobs = mem3_reshard_api:get_jobs(), + Total = length(Jobs), + send_json(Req, {[{total_rows, Total}, {offset, 0}, {jobs, Jobs}]}); + +% POST /_reshard/jobs {"node": "...", "shard": "..."} +handle_reshard_req(#httpd{method = 'POST', + path_parts=[_, ?JOBS]} = Req) -> + reject_if_disabled(), + couch_httpd:validate_ctype(Req, "application/json"), + {Props} = couch_httpd:json_body_obj(Req), + Node = validate_node(couch_util:get_value(<<"node">>, Props)), + Shard = validate_shard(couch_util:get_value(<<"shard">>, Props)), + Db = validate_db(couch_util:get_value(<<"db">>, Props)), + Range = validate_range(couch_util:get_value(<<"range">>, Props)), + Type = validate_type(couch_util:get_value(<<"type">>, Props)), + Res = mem3_reshard_api:create_jobs(Node, Shard, Db, Range, Type), + case Res of + [] -> throw(not_found); + _ -> ok + end, + Oks = length([R || {ok, _, _, _} = R <- Res]), + Code = case {Oks, length(Res)} of + {Oks, Oks} -> 201; + {Oks, _} when Oks > 0 -> 202; + {0, _} -> 500 + end, + EJson = lists:map(fun + ({ok, Id, N, S}) -> + {[{ok, true}, {id, Id}, {node, N}, {shard, S}]}; + ({error, E, N, S}) -> + {[{error, couch_util:to_binary(E)}, {node, N}, {shard, S}]} + end, Res), + send_json(Req, Code, EJson); + +handle_reshard_req(#httpd{path_parts=[_, ?JOBS]} = Req) -> + send_method_not_allowed(Req, "GET,HEAD,POST"); + +handle_reshard_req(#httpd{path_parts=[_, _]}) -> + throw(not_found); + +% GET /_reshard/jobs/$jobid +handle_reshard_req(#httpd{method='GET', + path_parts=[_, ?JOBS, JobId]}=Req) -> + reject_if_disabled(), + case mem3_reshard_api:get_job(JobId) of + {ok, JobInfo} -> + send_json(Req, JobInfo); + {error, not_found} -> + 
throw(not_found) + end; + +% DELETE /_reshard/jobs/$jobid +handle_reshard_req(#httpd{method='DELETE', + path_parts=[_, ?JOBS, JobId]}=Req) -> + reject_if_disabled(), + case mem3_reshard_api:get_job(JobId) of + {ok, {Props}} -> + NodeBin = couch_util:get_value(node, Props), + Node = binary_to_atom(NodeBin, utf8), + case rpc:call(Node, mem3_reshard, remove_job, [JobId]) of + ok -> + send_json(Req, 200, {[{ok, true}]}); + {error, not_found} -> + throw(not_found) + end; + {error, not_found} -> + throw(not_found) + end; + +handle_reshard_req(#httpd{path_parts=[_, ?JOBS, _]} = Req) -> + send_method_not_allowed(Req, "GET,HEAD,DELETE"); + +% GET /_reshard/jobs/$jobid/state +handle_reshard_req(#httpd{method='GET', + path_parts=[_, ?JOBS, JobId, ?STATE]} = Req) -> + reject_if_disabled(), + case mem3_reshard_api:get_job(JobId) of + {ok, {Props}} -> + JobState = couch_util:get_value(job_state, Props), + {SIProps} = couch_util:get_value(state_info, Props), + Reason = case couch_util:get_value(reason, SIProps) of + undefined -> null; + Val -> couch_util:to_binary(Val) + end, + send_json(Req, 200, {[{state, JobState}, {reason, Reason}]}); + {error, not_found} -> + throw(not_found) + end; + +% PUT /_reshard/jobs/$jobid/state +handle_reshard_req(#httpd{method='PUT', + path_parts=[_, ?JOBS, JobId, ?STATE]} = Req) -> + reject_if_disabled(), + couch_httpd:validate_ctype(Req, "application/json"), + {Props} = couch_httpd:json_body_obj(Req), + State = couch_util:get_value(<<"state">>, Props), + Reason = couch_util:get_value(<<"reason">>, Props), + case {State, Reason} of + {undefined, _} -> + throw({bad_request, <<"Expected a `state` field">>}); + {?S_RUNNING, _} -> + case mem3_reshard_api:resume_job(JobId) of + ok -> + send_json(Req, 200, {[{ok, true}]}); + {error, not_found} -> + throw(not_found); + {error, JsonError} -> + send_json(Req, 500, JsonError) + end; + {?S_STOPPED, Reason} -> + Reason1 = case Reason =:= undefined of + false -> Reason; + true -> <<"Stopped by user">> + end, + case mem3_reshard_api:stop_job(JobId, Reason1) of + ok -> + send_json(Req, 200, {[{ok, true}]}); + {error, not_found} -> + throw(not_found); + {error, JsonError} -> + send_json(Req, 500, JsonError) + end; + {_, _} -> + throw({bad_request, <<"State field not `running` or `stopped`">>}) + end; + +handle_reshard_req(#httpd{path_parts=[_, ?JOBS, _, ?STATE]} = Req) -> + send_method_not_allowed(Req, "GET,HEAD,PUT"). + + +reject_if_disabled() -> + case mem3_reshard:is_disabled() of + true -> throw(not_implemented); + false -> ok + end. + + +validate_type(<<"split">>) -> + split; + +validate_type(_Type) -> + throw({bad_request, <<"`job type must be `split`">>}). + + +validate_node(undefined) -> + undefined; + +validate_node(Node0) when is_binary(Node0) -> + Nodes = mem3_util:live_nodes(), + try binary_to_existing_atom(Node0, utf8) of + N1 -> + case lists:member(N1, Nodes) of + true -> N1; + false -> throw({bad_request, <<"Not connected to `node`">>}) + end + catch + error:badarg -> + throw({bad_request, <<"`node` is not a valid node name">>}) + end; + +validate_node(_Node) -> + throw({bad_request, <<"Invalid `node`">>}). + + +validate_shard(undefined) -> + undefined; + +validate_shard(Shard) when is_binary(Shard) -> + case Shard of + <<"shards/", _:8/binary, "-", _:8/binary, "/", _/binary>> -> + Shard; + _ -> + throw({bad_request, <<"`shard` is invalid">>}) + end; + +validate_shard(_Shard) -> + throw({bad_request, <<"Invalid `shard`">>}). 
+ + +validate_db(undefined) -> + undefined; + +validate_db(DbName) when is_binary(DbName) -> + try mem3:shards(DbName) of + [_ | _] -> DbName; + _ -> throw({bad_request, <<"`No shards in `db`">>}) + catch + _:_ -> + throw({bad_request, <<"Invalid `db`">>}) + end; + +validate_db(_bName) -> + throw({bad_request, <<"Invalid `db`">>}). + + +validate_range(undefined) -> + undefined; + +validate_range(<<BBin:8/binary, "-", EBin:8/binary>>) -> + {B, E} = try + { + httpd_util:hexlist_to_integer(binary_to_list(BBin)), + httpd_util:hexlist_to_integer(binary_to_list(EBin)) + } + catch + _:_ -> + invalid_range() + end, + if + B < 0 -> invalid_range(); + E < 0 -> invalid_range(); + B > (2 bsl 31) - 1 -> invalid_range(); + E > (2 bsl 31) - 1 -> invalid_range(); + B >= E -> invalid_range(); + true -> ok + end, + % Use a list format here to make it look the same as #shard's range + [B, E]; + +validate_range(_Range) -> + invalid_range(). + + +invalid_range() -> + throw({bad_request, <<"Invalid `range`">>}). diff --git a/src/mem3/test/mem3_reshard_api_test.erl b/src/mem3/test/mem3_reshard_api_test.erl new file mode 100644 index 000000000..c8be28591 --- /dev/null +++ b/src/mem3/test/mem3_reshard_api_test.erl @@ -0,0 +1,817 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_api_test). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/src/mem3_reshard.hrl"). + + +-define(USER, "mem3_reshard_api_test_admin"). +-define(PASS, "pass"). +-define(AUTH, {basic_auth, {?USER, ?PASS}}). +-define(JSON, {"Content-Type", "application/json"}). +-define(RESHARD, "_reshard/"). +-define(JOBS, "_reshard/jobs/"). +-define(STATE, "_reshard/state"). +-define(ID, <<"id">>). +-define(OK, <<"ok">>). + + +setup() -> + Hashed = couch_passwords:hash_admin_password(?PASS), + ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist=false), + Addr = config:get("chttpd", "bind_address", "127.0.0.1"), + Port = mochiweb_socket_server:get(chttpd, port), + Url = lists:concat(["http://", Addr, ":", Port, "/"]), + {Db1, Db2, Db3} = {?tempdb(), ?tempdb(), ?tempdb()}, + create_db(Url, Db1, "?q=1&n=1"), + create_db(Url, Db2, "?q=1&n=1"), + create_db(Url, Db3, "?q=2&n=1"), + {Url, {Db1, Db2, Db3}}. + + +teardown({Url, {Db1, Db2, Db3}}) -> + mem3_reshard:reset_state(), + application:unset_env(mem3, reshard_disabled), + delete_db(Url, Db1), + delete_db(Url, Db2), + delete_db(Url, Db3), + ok = config:delete("reshard", "max_jobs", _Persist=false), + ok = config:delete("admins", ?USER, _Persist=false), + meck:unload(). + + +start_couch() -> + test_util:start_couch([mem3, chttpd]). + + +stop_couch(Ctx) -> + test_util:stop_couch(Ctx). 
+ + +mem3_reshard_api_test_() -> + { + "mem3 shard split api tests", + { + setup, + fun start_couch/0, fun stop_couch/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun basics/1, + fun create_job_basic/1, + fun create_two_jobs/1, + fun create_multiple_jobs_from_one_post/1, + fun start_stop_cluster_basic/1, + fun test_disabled/1, + fun start_stop_cluster_with_a_job/1, + fun individual_job_start_stop/1, + fun individual_job_stop_when_cluster_stopped/1, + fun create_job_with_invalid_arguments/1, + fun create_job_with_db/1, + fun create_job_with_shard_name/1, + fun completed_job_handling/1, + fun handle_db_deletion_in_initial_copy/1, + fun handle_db_deletion_in_topoff1/1, + fun handle_db_deletion_in_copy_local_docs/1, + fun handle_db_deletion_in_build_indices/1, + fun handle_db_deletion_in_update_shard_map/1, + fun handle_db_deletion_in_wait_source_close/1, + fun recover_in_initial_copy/1, + fun recover_in_topoff1/1, + fun recover_in_copy_local_docs/1, + fun recover_in_build_indices/1, + fun recover_in_update_shard_map/1, + fun recover_in_wait_source_close/1, + fun recover_in_topoff3/1, + fun recover_in_source_delete/1, + fun check_max_jobs/1, + fun cleanup_completed_jobs/1 + ] + } + } + }. + + +basics({Top, _}) -> + ?_test(begin + % GET /_reshard + ?assertMatch({200, #{ + <<"state">> := <<"running">>, + <<"state_reason">> := null, + <<"completed">> := 0, + <<"failed">> := 0, + <<"running">> := 0, + <<"stopped">> := 0, + <<"total">> := 0 + }}, req(get, Top ++ ?RESHARD)), + + % GET _reshard/state + ?assertMatch({200, #{<<"state">> := <<"running">>}}, + req(get, Top ++ ?STATE)), + + % GET _reshard/jobs + ?assertMatch({200, #{ + <<"jobs">> := [], + <<"offset">> := 0, + <<"total_rows">> := 0 + }}, req(get, Top ++ ?JOBS)), + + % Some invalid paths and methods + ?assertMatch({404, _}, req(get, Top ++ ?RESHARD ++ "/invalidpath")), + ?assertMatch({405, _}, req(put, Top ++ ?RESHARD, #{dont => thinkso})), + ?assertMatch({405, _}, req(post, Top ++ ?RESHARD, #{nope => nope})) + end). 
+ + +create_job_basic({Top, {Db1, _, _}}) -> + ?_test(begin + % POST /_reshard/jobs + {C1, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}), + ?assertEqual(201, C1), + ?assertMatch([#{?OK := true, ?ID := J, <<"shard">> := S}] + when is_binary(J) andalso is_binary(S), R1), + [#{?ID := Id, <<"shard">> := Shard}] = R1, + + % GET /_reshard/jobs + ?assertMatch({200, #{ + <<"jobs">> := [#{?ID := Id, <<"type">> := <<"split">>}], + <<"offset">> := 0, + <<"total_rows">> := 1 + }}, req(get, Top ++ ?JOBS)), + + % GET /_reshard/job/$jobid + {C2, R2} = req(get, Top ++ ?JOBS ++ ?b2l(Id)), + ?assertEqual(200, C2), + ThisNode = atom_to_binary(node(), utf8), + ?assertMatch(#{?ID := Id}, R2), + ?assertMatch(#{<<"type">> := <<"split">>}, R2), + ?assertMatch(#{<<"source">> := Shard}, R2), + ?assertMatch(#{<<"history">> := History} when length(History) > 1, R2), + ?assertMatch(#{<<"node">> := ThisNode}, R2), + ?assertMatch(#{<<"split_state">> := SSt} when is_binary(SSt), R2), + ?assertMatch(#{<<"job_state">> := JSt} when is_binary(JSt), R2), + ?assertMatch(#{<<"state_info">> := #{}}, R2), + ?assertMatch(#{<<"target">> := Target} when length(Target) == 2, R2), + + % GET /_reshard/job/$jobid/state + ?assertMatch({200, #{<<"state">> := S, <<"reason">> := R}} + when is_binary(S) andalso (is_binary(R) orelse R =:= null), + req(get, Top ++ ?JOBS ++ ?b2l(Id) ++ "/state")), + + % GET /_reshard + ?assertMatch({200, #{<<"state">> := <<"running">>, <<"total">> := 1}}, + req(get, Top ++ ?RESHARD)), + + % DELETE /_reshard/jobs/$jobid + ?assertMatch({200, #{?OK := true}}, + req(delete, Top ++ ?JOBS ++ ?b2l(Id))), + + % GET _reshard/jobs + ?assertMatch({200, #{<<"jobs">> := [], <<"total_rows">> := 0}}, + req(get, Top ++ ?JOBS)), + + % GET /_reshard/job/$jobid should be a 404 + ?assertMatch({404, #{}}, req(get, Top ++ ?JOBS ++ ?b2l(Id))), + + % DELETE /_reshard/jobs/$jobid should be a 404 as well + ?assertMatch({404, #{}}, req(delete, Top ++ ?JOBS ++ ?b2l(Id))) + end). + + +create_two_jobs({Top, {Db1, Db2, _}}) -> + ?_test(begin + Jobs = Top ++ ?JOBS, + + ?assertMatch({201, [#{?OK := true}]}, + req(post, Jobs, #{type => split, db => Db1})), + ?assertMatch({201, [#{?OK := true}]}, + req(post, Jobs, #{type => split, db => Db2})), + + ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD)), + + ?assertMatch({200, #{ + <<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}], + <<"offset">> := 0, + <<"total_rows">> := 2 + }} when Id1 =/= Id2, req(get, Jobs)), + + {200, #{<<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}]}} = req(get, Jobs), + + {200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id1)), + ?assertMatch({200, #{<<"total">> := 1}}, req(get, Top ++ ?RESHARD)), + {200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id2)), + ?assertMatch({200, #{<<"total">> := 0}}, req(get, Top ++ ?RESHARD)) + end). + + +create_multiple_jobs_from_one_post({Top, {_, _, Db3}}) -> + ?_test(begin + Jobs = Top ++ ?JOBS, + {C1, R1} = req(post, Jobs, #{type => split, db => Db3}), + ?assertMatch({201, [#{?OK := true}, #{?OK := true}]}, {C1, R1}), + ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD)) + end). 
+ + +start_stop_cluster_basic({Top, _}) -> + ?_test(begin + Url = Top ++ ?STATE, + + ?assertMatch({200, #{ + <<"state">> := <<"running">>, + <<"reason">> := null + }}, req(get, Url)), + + ?assertMatch({200, _}, req(put, Url, #{state => stopped})), + ?assertMatch({200, #{ + <<"state">> := <<"stopped">>, + <<"reason">> := R + }} when is_binary(R), req(get, Url)), + + ?assertMatch({200, _}, req(put, Url, #{state => running})), + + % Make sure the reason shows in the state GET request + Reason = <<"somereason">>, + ?assertMatch({200, _}, req(put, Url, #{state => stopped, + reason => Reason})), + ?assertMatch({200, #{<<"state">> := <<"stopped">>, + <<"reason">> := Reason}}, req(get, Url)), + + % Top level summary also shows the reason + ?assertMatch({200, #{ + <<"state">> := <<"stopped">>, + <<"state_reason">> := Reason + }}, req(get, Top ++ ?RESHARD)), + ?assertMatch({200, _}, req(put, Url, #{state => running})), + ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, Url)) + end). + + +test_disabled({Top, _}) -> + ?_test(begin + application:set_env(mem3, reshard_disabled, true), + ?assertMatch({501, _}, req(get, Top ++ ?RESHARD)), + ?assertMatch({501, _}, req(put, Top ++ ?STATE, #{state => running})), + + application:unset_env(mem3, reshard_disabled), + ?assertMatch({200, _}, req(get, Top ++ ?RESHARD)), + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})) + end). + + +start_stop_cluster_with_a_job({Top, {Db1, _, _}}) -> + ?_test(begin + Url = Top ++ ?STATE, + + ?assertMatch({200, _}, req(put, Url, #{state => stopped})), + ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, Url)), + + % Can add jobs with global state stopped, they just won't be running + {201, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}), + ?assertMatch([#{?OK := true}], R1), + [#{?ID := Id1}] = R1, + {200, J1} = req(get, Top ++ ?JOBS ++ ?b2l(Id1)), + ?assertMatch(#{?ID := Id1, <<"job_state">> := <<"stopped">>}, J1), + % Check summary stats + ?assertMatch({200, #{ + <<"state">> := <<"stopped">>, + <<"running">> := 0, + <<"stopped">> := 1, + <<"total">> := 1 + }}, req(get, Top ++ ?RESHARD)), + + % Can delete the job when stopped + {200, #{?OK := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(Id1)), + ?assertMatch({200, #{ + <<"state">> := <<"stopped">>, + <<"running">> := 0, + <<"stopped">> := 0, + <<"total">> := 0 + }}, req(get, Top ++ ?RESHARD)), + + % Add same job again + {201, [#{?ID := Id2}]} = req(post, Top ++ ?JOBS, #{type => split, + db => Db1}), + ?assertMatch({200, #{?ID := Id2, <<"job_state">> := <<"stopped">>}}, + req(get, Top ++ ?JOBS ++ ?b2l(Id2))), + + % Job should start after resharding is started on the cluster + ?assertMatch({200, _}, req(put, Url, #{state => running})), + ?assertMatch({200, #{?ID := Id2, <<"job_state">> := JSt}} + when JSt =/= <<"stopped">>, req(get, Top ++ ?JOBS ++ ?b2l(Id2))) + end). + + +individual_job_start_stop({Top, {Db1, _, _}}) -> + ?_test(begin + intercept_state(topoff1), + + Body = #{type => split, db => Db1}, + {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body), + + JobUrl = Top ++ ?JOBS ++ ?b2l(Id), + StUrl = JobUrl ++ "/state", + + % Wait for the the job to start running and intercept it in topoff1 state + receive {JobPid, topoff1} -> ok end, + % Tell the intercept to never finish checkpointing so job is left hanging + % forever in running state + JobPid ! 
cancel, + ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)), + + {200, _} = req(put, StUrl, #{state => stopped}), + wait_state(StUrl, <<"stopped">>), + + % Stop/start resharding globally and job should still stay stopped + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})), + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})), + ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)), + + % Start the job again + ?assertMatch({200, _}, req(put, StUrl, #{state => running})), + % Wait for the the job to start running and intercept it in topoff1 state + receive {JobPid2, topoff1} -> ok end, + ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)), + % Let it continue running and it should complete eventually + JobPid2 ! continue, + wait_state(StUrl, <<"completed">>) + end). + + +individual_job_stop_when_cluster_stopped({Top, {Db1, _, _}}) -> + ?_test(begin + intercept_state(topoff1), + + Body = #{type => split, db => Db1}, + {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body), + + JobUrl = Top ++ ?JOBS ++ ?b2l(Id), + StUrl = JobUrl ++ "/state", + + % Wait for the the job to start running and intercept in topoff1 + receive {JobPid, topoff1} -> ok end, + % Tell the intercept to never finish checkpointing so job is left + % hanging forever in running state + JobPid ! cancel, + ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)), + + % Stop resharding globally + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})), + wait_state(StUrl, <<"stopped">>), + + % Stop the job specifically + {200, _} = req(put, StUrl, #{state => stopped}), + % Job stays stopped + ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)), + + % Set cluster to running again + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})), + + % The job should stay stopped + ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)), + + % It should be possible to resume job and it should complete + ?assertMatch({200, _}, req(put, StUrl, #{state => running})), + + % Wait for the the job to start running and intercept in topoff1 state + receive {JobPid2, topoff1} -> ok end, + ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)), + + % Let it continue running and it should complete eventually + JobPid2 ! continue, + wait_state(StUrl, <<"completed">>) + end). 
+ + +create_job_with_invalid_arguments({Top, {Db1, _, _}}) -> + ?_test(begin + Jobs = Top ++ ?JOBS, + + % Nothing in the body + ?assertMatch({400, _}, req(post, Jobs, #{})), + + % Missing type + ?assertMatch({400, _}, req(post, Jobs, #{db => Db1})), + + % Have type but no db and no shard + ?assertMatch({400, _}, req(post, Jobs, #{type => split})), + + % Have type and db but db is invalid + ?assertMatch({400, _}, req(post, Jobs, #{db => <<"baddb">>, + type => split})), + + % Have type and shard but shard is not an existing database + ?assertMatch({404, _}, req(post, Jobs, #{type => split, + shard => <<"shards/80000000-ffffffff/baddb.1549492084">>})), + + % Bad range values, too large, different types, inverted + ?assertMatch({400, _}, req(post, Jobs, #{db => Db1, range => 42, + type => split})), + ?assertMatch({400, _}, req(post, Jobs, #{db => Db1, + range => <<"x">>, type => split})), + ?assertMatch({400, _}, req(post, Jobs, #{db => Db1, + range => <<"ffffffff-80000000">>, type => split})), + ?assertMatch({400, _}, req(post, Jobs, #{db => Db1, + range => <<"00000000-fffffffff">>, type => split})), + + % Can't have both db and shard + ?assertMatch({400, _}, req(post, Jobs, #{type => split, db => Db1, + shard => <<"blah">>})) + end). + + +create_job_with_db({Top, {Db1, _, _}}) -> + ?_test(begin + Jobs = Top ++ ?JOBS, + Body1 = #{type => split, db => Db1}, + + % Node with db + N = atom_to_binary(node(), utf8), + {C1, R1} = req(post, Jobs, Body1#{node => N}), + ?assertMatch({201, [#{?OK := true}]}, {C1, R1}), + wait_to_complete_then_cleanup(Top, R1), + + % Range and db + {C2, R2} = req(post, Jobs, Body1#{range => <<"00000000-7fffffff">>}), + ?assertMatch({201, [#{?OK := true}]}, {C2, R2}), + wait_to_complete_then_cleanup(Top, R2), + + % Node, range and db + Range = <<"80000000-ffffffff">>, + {C3, R3} = req(post, Jobs, Body1#{range => Range, node => N}), + ?assertMatch({201, [#{?OK := true}]}, {C3, R3}), + wait_to_complete_then_cleanup(Top, R3), + + ?assertMatch([ + [16#00000000, 16#3fffffff], + [16#40000000, 16#7fffffff], + [16#80000000, 16#bfffffff], + [16#c0000000, 16#ffffffff] + ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db1))]) + end). + + +create_job_with_shard_name({Top, {_, _, Db3}}) -> + ?_test(begin + Jobs = Top ++ ?JOBS, + [S1, S2] = [mem3:name(S) || S <- lists:sort(mem3:shards(Db3))], + + % Shard only + {C1, R1} = req(post, Jobs, #{type => split, shard => S1}), + ?assertMatch({201, [#{?OK := true}]}, {C1, R1}), + wait_to_complete_then_cleanup(Top, R1), + + % Shard with a node + N = atom_to_binary(node(), utf8), + {C2, R2} = req(post, Jobs, #{type => split, shard => S2, node => N}), + ?assertMatch({201, [#{?OK := true}]}, {C2, R2}), + wait_to_complete_then_cleanup(Top, R2), + + ?assertMatch([ + [16#00000000, 16#3fffffff], + [16#40000000, 16#7fffffff], + [16#80000000, 16#bfffffff], + [16#c0000000, 16#ffffffff] + ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db3))]) + end). 
+ + +completed_job_handling({Top, {Db1, _, _}}) -> + ?_test(begin + Jobs = Top ++ ?JOBS, + + % Run job to completion + {C1, R1} = req(post, Jobs, #{type => split, db => Db1}), + ?assertMatch({201, [#{?OK := true}]}, {C1, R1}), + [#{?ID := Id}] = R1, + wait_to_complete(Top, R1), + + % Check top level stats + ?assertMatch({200, #{ + <<"state">> := <<"running">>, + <<"state_reason">> := null, + <<"completed">> := 1, + <<"failed">> := 0, + <<"running">> := 0, + <<"stopped">> := 0, + <<"total">> := 1 + }}, req(get, Top ++ ?RESHARD)), + + % Job state itself + JobUrl = Jobs ++ ?b2l(Id), + ?assertMatch({200, #{ + <<"split_state">> := <<"completed">>, + <<"job_state">> := <<"completed">> + }}, req(get, JobUrl)), + + % Job's state endpoint + StUrl = Jobs ++ ?b2l(Id) ++ "/state", + ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)), + + % Try to stop it and it should stay completed + {200, _} = req(put, StUrl, #{state => stopped}), + ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)), + + % Try to resume it and it should stay completed + {200, _} = req(put, StUrl, #{state => running}), + ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)), + + % Stop resharding globally and job should still stay completed + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})), + ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)), + + % Start resharding and job stays completed + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})), + ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)), + + ?assertMatch({200, #{?OK := true}}, req(delete, JobUrl)) + end). + + +handle_db_deletion_in_topoff1({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = delete_source_in_state(Top, Db1, topoff1), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>) + end). + + +handle_db_deletion_in_initial_copy({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = delete_source_in_state(Top, Db1, initial_copy), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>) + end). + + +handle_db_deletion_in_copy_local_docs({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = delete_source_in_state(Top, Db1, copy_local_docs), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>) + end). + + +handle_db_deletion_in_build_indices({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = delete_source_in_state(Top, Db1, build_indices), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>) + end). + + +handle_db_deletion_in_update_shard_map({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = delete_source_in_state(Top, Db1, update_shardmap), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>) + end). + + +handle_db_deletion_in_wait_source_close({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = delete_source_in_state(Top, Db1, wait_source_close), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>) + end). + + +recover_in_topoff1({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = recover_in_state(Top, Db1, topoff1), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>) + end). + + +recover_in_initial_copy({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = recover_in_state(Top, Db1, initial_copy), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>) + end). 
+ + +recover_in_copy_local_docs({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = recover_in_state(Top, Db1, copy_local_docs), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>) + end). + + +recover_in_build_indices({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = recover_in_state(Top, Db1, build_indices), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>) + end). + + +recover_in_update_shard_map({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = recover_in_state(Top, Db1, update_shardmap), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>) + end). + + +recover_in_wait_source_close({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = recover_in_state(Top, Db1, wait_source_close), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>) + end). + + +recover_in_topoff3({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = recover_in_state(Top, Db1, topoff3), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>) + end). + + +recover_in_source_delete({Top, {Db1, _, _}}) -> + ?_test(begin + JobId = recover_in_state(Top, Db1, source_delete), + wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>) + end). + + +check_max_jobs({Top, {Db1, Db2, _}}) -> + ?_test(begin + Jobs = Top ++ ?JOBS, + + config:set("reshard", "max_jobs", "0", _Persist=false), + {C1, R1} = req(post, Jobs, #{type => split, db => Db1}), + ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, {C1, R1}), + + config:set("reshard", "max_jobs", "1", _Persist=false), + {201, R2} = req(post, Jobs, #{type => split, db => Db1}), + wait_to_complete(Top, R2), + + % Stop clustering so jobs are not started anymore and ensure max jobs + % is enforced even if jobs are stopped + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})), + + {C3, R3} = req(post, Jobs, #{type => split, db => Db2}), + ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, + {C3, R3}), + + % Allow the job to be created by raising max_jobs + config:set("reshard", "max_jobs", "2", _Persist=false), + + {C4, R4} = req(post, Jobs, #{type => split, db => Db2}), + ?assertEqual(201, C4), + + % Lower max_jobs after job is created but it's not running + config:set("reshard", "max_jobs", "1", _Persist=false), + + % Start resharding again + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})), + + % Jobs that have been created already are not removed if max jobs is lowered + % so make sure the job completes + wait_to_complete(Top, R4) + end). + + +cleanup_completed_jobs({Top, {Db1, _, _}}) -> + ?_test(begin + Body = #{type => split, db => Db1}, + {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body), + JobUrl = Top ++ ?JOBS ++ ?b2l(Id), + wait_state(JobUrl ++ "/state", <<"completed">>), + delete_db(Top, Db1), + wait_for_http_code(JobUrl, 404) + end). + + +% Test help functions + +wait_to_complete_then_cleanup(Top, Jobs) -> + JobsUrl = Top ++ ?JOBS, + lists:foreach(fun(#{?ID := Id}) -> + wait_state(JobsUrl ++ ?b2l(Id) ++ "/state", <<"completed">>), + {200, _} = req(delete, JobsUrl ++ ?b2l(Id)) + end, Jobs). + + +wait_to_complete(Top, Jobs) -> + JobsUrl = Top ++ ?JOBS, + lists:foreach(fun(#{?ID := Id}) -> + wait_state(JobsUrl ++ ?b2l(Id) ++ "/state", <<"completed">>) + end, Jobs). + + +intercept_state(State) -> + TestPid = self(), + meck:new(mem3_reshard_job, [passthrough]), + meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) -> + case Job#job.split_state of + State -> + TestPid ! 
{self(), State}, + receive + continue -> meck:passthrough([Job]); + cancel -> ok + end; + _ -> + meck:passthrough([Job]) + end + end). + + +cancel_intercept() -> + meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) -> + meck:passthrough([Job]) + end). + + +wait_state(Url, State) -> + test_util:wait(fun() -> + case req(get, Url) of + {200, #{<<"state">> := State}} -> ok; + {200, #{}} -> timer:sleep(100), wait + end + end, 30000). + + +wait_for_http_code(Url, Code) when is_integer(Code) -> + test_util:wait(fun() -> + case req(get, Url) of + {Code, _} -> ok; + {_, _} -> timer:sleep(100), wait + end + end, 30000). + + +delete_source_in_state(Top, Db, State) when is_atom(State), is_binary(Db) -> + intercept_state(State), + Body = #{type => split, db => Db}, + {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body), + receive {JobPid, State} -> ok end, + sync_delete_db(Top, Db), + JobPid ! continue, + Id. + + +recover_in_state(Top, Db, State) when is_atom(State) -> + intercept_state(State), + Body = #{type => split, db => Db}, + {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body), + receive {JobPid, State} -> ok end, + % Job is now stuck in running we prevented it from executing + % the given state + JobPid ! cancel, + % Now restart resharding + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})), + cancel_intercept(), + ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})), + Id. + + +create_db(Top, Db, QArgs) when is_binary(Db) -> + Url = Top ++ binary_to_list(Db) ++ QArgs, + {ok, Status, _, _} = test_request:put(Url, [?JSON, ?AUTH], "{}"), + ?assert(Status =:= 201 orelse Status =:= 202). + + +delete_db(Top, Db) when is_binary(Db) -> + Url = Top ++ binary_to_list(Db), + case test_request:get(Url, [?AUTH]) of + {ok, 404, _, _} -> + not_found; + {ok, 200, _, _} -> + {ok, 200, _, _} = test_request:delete(Url, [?AUTH]), + ok + end. + + +sync_delete_db(Top, Db) when is_binary(Db) -> + delete_db(Top, Db), + try + Shards = mem3:local_shards(Db), + ShardNames = [mem3:name(S) || S <- Shards], + [couch_server:delete(N, [?ADMIN_CTX]) || N <- ShardNames], + ok + catch + error:database_does_not_exist -> + ok + end. + + +req(Method, Url) -> + Headers = [?AUTH], + {ok, Code, _, Res} = test_request:request(Method, Url, Headers), + {Code, jiffy:decode(Res, [return_maps])}. + + +req(Method, Url, #{} = Body) -> + req(Method, Url, jiffy:encode(Body)); + +req(Method, Url, Body) -> + Headers = [?JSON, ?AUTH], + {ok, Code, _, Res} = test_request:request(Method, Url, Headers, Body), + {Code, jiffy:decode(Res, [return_maps])}. diff --git a/src/mem3/test/mem3_reshard_changes_feed_test.erl b/src/mem3/test/mem3_reshard_changes_feed_test.erl new file mode 100644 index 000000000..52e18fb26 --- /dev/null +++ b/src/mem3/test/mem3_reshard_changes_feed_test.erl @@ -0,0 +1,388 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_changes_feed_test). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). 
+-include_lib("mem3/src/mem3_reshard.hrl"). + + +-define(assertChanges(Expected, Received), + begin + ((fun() -> + ExpectedIDs = lists:sort([I || #{id := I} <- Expected]), + ReceivedIDs = lists:sort([I || #{id := I} <- Received]), + ?assertEqual(ExpectedIDs, ReceivedIDs) + end)()) + end). + + +setup() -> + Db1 = ?tempdb(), + create_db(Db1, [{q, 1}, {n, 1}]), + #{db1 => Db1}. + + +teardown(#{} = Dbs) -> + mem3_reshard:reset_state(), + maps:map(fun(_, Db) -> delete_db(Db) end, Dbs). + + +start_couch() -> + test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]). + + +stop_couch(Ctx) -> + test_util:stop_couch(Ctx). + + +mem3_reshard_changes_feed_test_() -> + { + "mem3 shard split changes feed tests", + { + setup, + fun start_couch/0, fun stop_couch/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun normal_feed_should_work_after_split/1, + fun continuous_feed_should_work_during_split/1 + ] + } + } + }. + + +normal_feed_should_work_after_split(#{db1 := Db}) -> + ?_test(begin + DocSpec = #{ + docs => [1, 10], + delete => [5, 6] + }, + add_test_docs(Db, DocSpec), + + % gather pre-shard changes + BaseArgs = #changes_args{feed = "normal", dir = fwd, since = 0}, + {ok, OldChanges, OldEndSeq} = get_changes_feed(Db, BaseArgs), + + % Split the shard + split_and_wait(Db), + + % verify changes list consistent for all the old seqs + lists:foldl(fun(#{seq := Seq} = C, ExpectedChanges) -> + Args = BaseArgs#changes_args{since = Seq}, + {ok, Changes, _EndSeq} = get_changes_feed(Db, Args), + ?assertChanges(ExpectedChanges, Changes), + [C | ExpectedChanges] + end, [], OldChanges), + + % confirm that old LastSeq respected + Args1 = BaseArgs#changes_args{since = OldEndSeq}, + {ok, Changes1, EndSeq1} = get_changes_feed(Db, Args1), + ?assertChanges([], Changes1), + + % confirm that new LastSeq also respected + Args2 = BaseArgs#changes_args{since = EndSeq1}, + {ok, Changes2, EndSeq2} = get_changes_feed(Db, Args2), + ?assertChanges([], Changes2), + ?assertEqual(EndSeq2, EndSeq1), + + % confirm we didn't lost any changes and have consistent last seq + {ok, Changes3, EndSeq3} = get_changes_feed(Db, BaseArgs), + ?assertChanges(OldChanges, Changes3), + + % add some docs + add_test_docs(Db, #{docs => [11, 15]}), + Args4 = BaseArgs#changes_args{since = EndSeq3}, + {ok, Changes4, EndSeq4} = get_changes_feed(Db, Args4), + AddedChanges = [#{id => ID} || #doc{id = ID} <- docs([11, 15])], + ?assertChanges(AddedChanges, Changes4), + + % confirm include_docs and deleted works + Args5 = BaseArgs#changes_args{include_docs = true}, + {ok, Changes5, EndSeq5} = get_changes_feed(Db, Args5), + ?assertEqual(EndSeq4, EndSeq5), + [SampleChange] = [C || #{id := ID} = C <- Changes5, ID == <<"00005">>], + ?assertMatch(#{deleted := true}, SampleChange), + ?assertMatch(#{doc := {Body}} when is_list(Body), SampleChange), + + % update and delete some pre and post split docs + AllDocs = [couch_doc:from_json_obj(Doc) || #{doc := Doc} <- Changes5], + UpdateDocs = lists:filtermap(fun + (#doc{id = <<"00002">>}) -> true; + (#doc{id = <<"00012">>}) -> true; + (#doc{id = <<"00004">>} = Doc) -> {true, Doc#doc{deleted = true}}; + (#doc{id = <<"00014">>} = Doc) -> {true, Doc#doc{deleted = true}}; + (_) -> false + end, AllDocs), + update_docs(Db, UpdateDocs), + + Args6 = BaseArgs#changes_args{since = EndSeq5}, + {ok, Changes6, EndSeq6} = get_changes_feed(Db, Args6), + UpdatedChanges = [#{id => ID} || #doc{id = ID} <- UpdateDocs], + ?assertChanges(UpdatedChanges, Changes6), + [#{seq := Seq6} | _] = Changes6, + ?assertEqual(EndSeq6, Seq6), + + Args7 = 
Args6#changes_args{dir = rev, limit = 4}, + {ok, Changes7, EndSeq7} = get_changes_feed(Db, Args7), + ?assertEqual(4, length(Changes7)), + [#{seq := Seq7} | _] = Changes7, + ?assertEqual(EndSeq7, Seq7) + end). + + +continuous_feed_should_work_during_split(#{db1 := Db}) -> + ?_test(begin + {UpdaterPid, UpdaterRef} = spawn_monitor(fun() -> + Updater = fun U({State, I}) -> + receive + {get_state, {Pid, Ref}} -> + Pid ! {state, Ref, {State, I}}, + U({State, I}); + add -> + DocSpec = #{docs => [I, I]}, + add_test_docs(Db, DocSpec), + U({State, I + 1}); + split -> + spawn_monitor(fun() -> split_and_wait(Db) end), + U({"in_process", I}); + stop -> + receive {'DOWN', _, process, _, _} -> ok end, + ok + end + end, + Updater({"before", 1}) + end), + + Callback = fun + (start, Acc) -> + {ok, Acc}; + (waiting_for_updates, Acc) -> + Ref = make_ref(), + UpdaterPid ! {get_state, {self(), Ref}}, + receive {state, Ref, {State, _}} -> ok end, + case {State, length(Acc)} of + {"before", N} when N < 5 -> + UpdaterPid ! add, + {ok, Acc}; + {"before", _} -> + UpdaterPid ! split, + {ok, Acc}; + {"in_process", N} when N < 10 -> + UpdaterPid ! add, + {ok, Acc}; + {"in_process", _} -> + {ok, Acc} + end; + (timeout, Acc) -> + {ok, Acc}; + ({change, {Change}}, Acc) -> + CM = maps:from_list(Change), + {ok, [CM | Acc]}; + ({stop, EndSeq, _Pending}, Acc) -> + % Notice updater is still running + {stop, EndSeq, Acc} + end, + + BaseArgs = #changes_args{ + feed = "continuous", + heartbeat = 100, + timeout = 1000 + }, + StopResult = get_changes_feed(Db, BaseArgs, Callback), + + % Changes feed stopped when source shard was deleted + ?assertMatch({stop, _, _}, StopResult), + {stop, StopEndSeq, StopChanges} = StopResult, + + % Add 5 extra docs to the db right after changes feed was stopped + [UpdaterPid ! add || _ <- lists:seq(1, 5)], + + % The the number of documents that updater had added + Ref = make_ref(), + UpdaterPid ! {get_state, {self(), Ref}}, + DocCount = receive {state, Ref, {_, I}} -> I - 1 end, + + UpdaterPid ! stop, + receive + {'DOWN', UpdaterRef, process, UpdaterPid, normal} -> + ok; + {'DOWN', UpdaterRef, process, UpdaterPid, Error} -> + erlang:error({test_context_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {value, Error}, + {reason, "Updater died"}]}) + end, + + AfterArgs = #changes_args{feed = "normal", since = StopEndSeq}, + {ok, AfterChanges, _} = get_changes_feed(Db, AfterArgs), + DocIDs = [Id || #{id := Id} <- StopChanges ++ AfterChanges], + ExpectedDocIDs = [doc_id(<<>>, N) || N <- lists:seq(1, DocCount)], + ?assertEqual(ExpectedDocIDs, lists:usort(DocIDs)) + end). + + +split_and_wait(Db) -> + [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)), + {ok, JobId} = mem3_reshard:start_split_job(Shard), + wait_state(JobId, completed), + ResultShards = lists:sort(mem3:local_shards(Db)), + ?assertEqual(2, length(ResultShards)). + + +wait_state(JobId, State) -> + test_util:wait(fun() -> + case mem3_reshard:job(JobId) of + {ok, {Props}} -> + case couch_util:get_value(job_state, Props) of + State -> ok; + _ -> timer:sleep(100), wait + end; + {error, not_found} -> timer:sleep(100), wait + end + end, 30000). + + +get_changes_feed(Db, Args) -> + get_changes_feed(Db, Args, fun changes_callback/2). + + +get_changes_feed(Db, Args, Callback) -> + with_proc(fun() -> + fabric:changes(Db, Callback, [], Args) + end). 
+ + +changes_callback(start, Acc) -> + {ok, Acc}; +changes_callback({change, {Change}}, Acc) -> + CM = maps:from_list(Change), + {ok, [CM | Acc]}; +changes_callback({stop, EndSeq, _Pending}, Acc) -> + {ok, Acc, EndSeq}. + + +%% common helpers from here + + +create_db(DbName, Opts) -> + GL = erlang:group_leader(), + with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL). + + +delete_db(DbName) -> + GL = erlang:group_leader(), + with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL). + + +with_proc(Fun) -> + with_proc(Fun, undefined, 30000). + + +with_proc(Fun, GroupLeader) -> + with_proc(Fun, GroupLeader, 30000). + + +with_proc(Fun, GroupLeader, Timeout) -> + {Pid, Ref} = spawn_monitor(fun() -> + case GroupLeader of + undefined -> ok; + _ -> erlang:group_leader(GroupLeader, self()) + end, + exit({with_proc_res, Fun()}) + end), + receive + {'DOWN', Ref, process, Pid, {with_proc_res, Res}} -> + Res; + {'DOWN', Ref, process, Pid, Error} -> + error(Error) + after Timeout -> + erlang:demonitor(Ref, [flush]), + exit(Pid, kill), + error({with_proc_timeout, Fun, Timeout}) + end. + + +add_test_docs(DbName, #{} = DocSpec) -> + Docs = docs(maps:get(docs, DocSpec, [])), + Res = update_docs(DbName, Docs), + Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) -> + Doc#doc{revs = {RevPos, [Rev]}} + end, lists:zip(Docs, Res)), + case delete_docs(maps:get(delete, DocSpec, []), Docs1) of + [] -> ok; + [_ | _] = Deleted -> update_docs(DbName, Deleted) + end, + ok. + + +update_docs(DbName, Docs) -> + with_proc(fun() -> + case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of + {accepted, Res} -> Res; + {ok, Res} -> Res + end + end). + + +delete_docs([S, E], Docs) when E >= S -> + ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)], + lists:filtermap(fun(#doc{id = Id} = Doc) -> + case lists:member(Id, ToDelete) of + true -> {true, Doc#doc{deleted = true}}; + false -> false + end + end, Docs); +delete_docs(_, _) -> + []. + + +docs([S, E]) when E >= S -> + [doc(<<"">>, I) || I <- lists:seq(S, E)]; +docs(_) -> + []. + + +doc(Pref, Id) -> + Body = [{<<"a">>, <<"b">>}], + doc(Pref, Id, Body, 42). + + +doc(Pref, Id, BodyProps, AttSize) -> + #doc{ + id = doc_id(Pref, Id), + body = {BodyProps}, + atts = atts(AttSize) + }. + + +doc_id(Pref, Id) -> + IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])), + <<Pref/binary, IdBin/binary>>. + + +atts(0) -> + []; + +atts(Size) when is_integer(Size), Size >= 1 -> + Data = << <<"x">> || _ <- lists:seq(1, Size) >>, + [couch_att:new([ + {name, <<"att">>}, + {type, <<"app/binary">>}, + {att_len, Size}, + {data, Data} + ])]. diff --git a/src/mem3/test/mem3_reshard_test.erl b/src/mem3/test/mem3_reshard_test.erl new file mode 100644 index 000000000..8c4479656 --- /dev/null +++ b/src/mem3/test/mem3_reshard_test.erl @@ -0,0 +1,804 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_reshard_test). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/src/mem3_reshard.hrl"). 
+-include_lib("couch_mrview/include/couch_mrview.hrl"). % for all_docs function + +-define(ID, <<"_id">>). + +setup() -> + HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}, + case HaveDreyfus of false -> ok; true -> + mock_dreyfus_indices() + end, + + HaveHastings = code:lib_dir(hastings) /= {error, bad_name}, + case HaveHastings of false -> ok; true -> + mock_hastings_indices() + end, + {Db1, Db2} = {?tempdb(), ?tempdb()}, + create_db(Db1, [{q, 1}, {n, 1}]), + PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}], + create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]), + config:set("reshard", "retry_interval_sec", "0", _Persist=false), + #{db1 => Db1, db2 => Db2}. + + +teardown(#{} = Dbs) -> + mem3_reshard:reset_state(), + maps:map(fun(_, Db) -> delete_db(Db) end, Dbs), + config:delete("reshard", "retry_interval_sec", _Persist=false), + meck:unload(). + + +start_couch() -> + test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]). + + +stop_couch(Ctx) -> + test_util:stop_couch(Ctx). + + +mem3_reshard_db_test_() -> + { + "mem3 shard split db tests", + { + setup, + fun start_couch/0, fun stop_couch/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun split_one_shard/1, + fun update_docs_before_topoff1/1, + fun indices_are_built/1, + fun split_partitioned_db/1, + fun split_twice/1, + fun couch_events_are_emitted/1, + fun retries_work/1, + fun target_reset_in_initial_copy/1, + fun split_an_incomplete_shard_map/1 + ] + } + } + }. + + +% This is a basic test to check that shard splitting preserves documents, and +% db meta props like revs limits and security. +split_one_shard(#{db1 := Db}) -> + ?_test(begin + DocSpec = #{docs => 10, delete => [5, 9], mrview => 1, local => 1}, + add_test_docs(Db, DocSpec), + + % Save documents before the split + Docs0 = get_all_docs(Db), + Local0 = get_local_docs(Db), + + % Set some custom metadata properties + set_revs_limit(Db, 942), + set_purge_infos_limit(Db, 943), + SecObj = {[{<<"foo">>, <<"bar">>}]}, + set_security(Db, SecObj), + + % DbInfo is saved after setting metadata bits + % as those could bump the update sequence + DbInfo0 = get_db_info(Db), + + % Split the one shard + [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), + {ok, JobId} = mem3_reshard:start_split_job(Shard), + wait_state(JobId, completed), + + % Perform some basic checks that the shard was split + Shards1 = lists:sort(mem3:local_shards(Db)), + ?assertEqual(2, length(Shards1)), + [#shard{range = R1}, #shard{range = R2}] = Shards1, + ?assertEqual([16#00000000, 16#7fffffff], R1), + ?assertEqual([16#80000000, 16#ffffffff], R2), + + % Check metadata bits after the split + ?assertEqual(942, get_revs_limit(Db)), + ?assertEqual(943, get_purge_infos_limit(Db)), + ?assertEqual(SecObj, get_security(Db)), + + DbInfo1 = get_db_info(Db), + Docs1 = get_all_docs(Db), + Local1 = get_local_docs(Db), + + % When comparing db infos, ignore update sequences they won't be the + % same since they are more shards involved after the split + ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), + + % Update seq prefix number is a sum of all shard update sequences + #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0), + #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1), + ?assertEqual(UpdateSeq0 * 2, UpdateSeq1), + + % Finally compare that the documents are still there after the split + ?assertEqual(Docs0, Docs1), + + % Don't forget about the local but don't include internal checkpoints + % as some of those are munged and transformed during the 
split
+        ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+    end).
+
+
+% This test checks that documents added while the shard is being split are not
+% lost. The topoff1 state happens before indices are built.
+update_docs_before_topoff1(#{db1 := Db}) ->
+    ?_test(begin
+        add_test_docs(Db, #{docs => 10}),
+
+        intercept_state(topoff1),
+
+        [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+        {ok, JobId} = mem3_reshard:start_split_job(Shard),
+
+        receive {JobPid, topoff1} -> ok end,
+        add_test_docs(Db, #{docs => [10, 19], local => 1}),
+        Docs0 = get_all_docs(Db),
+        Local0 = get_local_docs(Db),
+        DbInfo0 = get_db_info(Db),
+        JobPid ! continue,
+
+        wait_state(JobId, completed),
+
+        % Perform some basic checks that the shard was split
+        Shards1 = lists:sort(mem3:local_shards(Db)),
+        ?assertEqual(2, length(Shards1)),
+
+        DbInfo1 = get_db_info(Db),
+        Docs1 = get_all_docs(Db),
+        Local1 = get_local_docs(Db),
+
+        ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+        % Update sequence after initial copy with 10 docs would be 10 on each
+        % target shard (to match the source) and the total update sequence
+        % would have been 20. But then 10 more docs were added (3 might have
+        % ended up on one target and 7 on another) so the final update sequence
+        % would then be 20 + 10 = 30.
+        ?assertMatch(#{<<"update_seq">> := 30}, update_seq_to_num(DbInfo1)),
+
+        ?assertEqual(Docs0, Docs1),
+        ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+    end).
+
+
+% This tests that indices are built during shard splitting.
+indices_are_built(#{db1 := Db}) ->
+    ?_test(begin
+        HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name},
+        HaveHastings = code:lib_dir(hastings) /= {error, bad_name},
+
+        add_test_docs(Db, #{docs => 10, mrview => 2, search => 2, geo => 2}),
+        [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+        {ok, JobId} = mem3_reshard:start_split_job(Shard),
+        wait_state(JobId, completed),
+        Shards1 = lists:sort(mem3:local_shards(Db)),
+        ?assertEqual(2, length(Shards1)),
+        MRViewGroupInfo = get_group_info(Db, <<"_design/mrview00000">>),
+        ?assertMatch(#{<<"update_seq">> := 32}, MRViewGroupInfo),
+
+        case HaveDreyfus of false -> ok; true ->
+            % 4 because there are 2 indices and 2 target shards
+            ?assertEqual(4, meck:num_calls(dreyfus_index, await, 2))
+        end,
+
+        case HaveHastings of false -> ok; true ->
+            % 4 because there are 2 indices and 2 target shards
+            ?assertEqual(4, meck:num_calls(hastings_index, await, 2))
+        end
+    end).
+
+
+mock_dreyfus_indices() ->
+    meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) ->
+        #doc{body = {BodyProps}} = Doc,
+        case couch_util:get_value(<<"indexes">>, BodyProps) of
+            undefined ->
+                [];
+            {[_]} ->
+                [{dreyfus, <<"db">>, dreyfus_index1}]
+        end
+    end),
+    meck:expect(dreyfus_index_manager, get_index, fun(_, _) -> {ok, pid} end),
+    meck:expect(dreyfus_index, await, fun(_, _) -> ok end).
+
+
+mock_hastings_indices() ->
+    meck:expect(hastings_index, design_doc_to_indexes, fun(Doc) ->
+        #doc{body = {BodyProps}} = Doc,
+        case couch_util:get_value(<<"st_indexes">>, BodyProps) of
+            undefined ->
+                [];
+            {[_]} ->
+                [{hastings, <<"db">>, hastings_index1}]
+        end
+    end),
+    meck:expect(hastings_index_manager, get_index, fun(_, _) -> {ok, pid} end),
+    meck:expect(hastings_index, await, fun(_, _) -> ok end).
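A note on the assertions above: meck counts calls per {module, function}, and the third argument to meck:num_calls/3 may be an arity instead of a concrete argument list, which is how the two checks arrive at 4 (2 indices times 2 target shards). A minimal standalone sketch of that pattern, not part of this patch, with made-up call arguments:

    count_calls_demo() ->
        meck:new(dreyfus_index, [non_strict]),
        meck:expect(dreyfus_index, await, fun(_, _) -> ok end),
        ok = dreyfus_index:await(index_pid, seq1),
        ok = dreyfus_index:await(index_pid, seq2),
        % the trailing 2 is the arity of await/2, so both calls above count
        2 = meck:num_calls(dreyfus_index, await, 2),
        meck:unload(dreyfus_index).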
+ +% Split partitioned database +split_partitioned_db(#{db2 := Db}) -> + ?_test(begin + DocSpec = #{ + pdocs => #{ + <<"PX">> => 5, + <<"PY">> => 5 + }, + mrview => 1, + local => 1 + }, + add_test_docs(Db, DocSpec), + + % Save documents before the split + Docs0 = get_all_docs(Db), + Local0 = get_local_docs(Db), + + % Set some custom metadata properties + set_revs_limit(Db, 942), + set_purge_infos_limit(Db, 943), + SecObj = {[{<<"foo">>, <<"bar">>}]}, + set_security(Db, SecObj), + + % DbInfo is saved after setting metadata bits + % as those could bump the update sequence + DbInfo0 = get_db_info(Db), + PX0 = get_partition_info(Db, <<"PX">>), + PY0 = get_partition_info(Db, <<"PY">>), + + % Split the one shard + [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), + {ok, JobId} = mem3_reshard:start_split_job(Shard), + wait_state(JobId, completed), + + % Perform some basic checks that the shard was split + Shards1 = lists:sort(mem3:local_shards(Db)), + ?assertEqual(2, length(Shards1)), + [#shard{range = R1}, #shard{range = R2}] = Shards1, + ?assertEqual([16#00000000, 16#7fffffff], R1), + ?assertEqual([16#80000000, 16#ffffffff], R2), + + % Check metadata bits after the split + ?assertEqual(942, get_revs_limit(Db)), + ?assertEqual(943, get_purge_infos_limit(Db)), + ?assertEqual(SecObj, get_security(Db)), + + DbInfo1 = get_db_info(Db), + Docs1 = get_all_docs(Db), + Local1 = get_local_docs(Db), + + % When comparing db infos, ignore update sequences they won't be the + % same since they are more shards involved after the split + ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), + + % Update seq prefix number is a sum of all shard update sequences + #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0), + #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1), + ?assertEqual(UpdateSeq0 * 2, UpdateSeq1), + + % Finally compare that documents are still there after the split + ?assertEqual(Docs0, Docs1), + + ?assertEqual(PX0, get_partition_info(Db, <<"PX">>)), + ?assertEqual(PY0, get_partition_info(Db, <<"PY">>)), + + % Don't forget about the local but don't include internal checkpoints + % as some of those are munged and transformed during the split + ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)) + end). + + +% Make sure a shard can be split again after it was split once. This checks that +% too many got added to some range, such that on next split they'd fail to fit +% in to any of the new target ranges. 
+split_twice(#{db1 := Db}) -> + ?_test(begin + DocSpec = #{docs => 100, delete => [80, 99], mrview => 2, local => 100}, + add_test_docs(Db, DocSpec), + + % Save documents before the split + Docs0 = get_all_docs(Db), + Local0 = get_local_docs(Db), + + % Set some custom metadata properties + set_revs_limit(Db, 942), + set_purge_infos_limit(Db, 943), + SecObj = {[{<<"foo">>, <<"bar">>}]}, + set_security(Db, SecObj), + + % DbInfo is saved after setting metadata bits + % as those could bump the update sequence + DbInfo0 = get_db_info(Db), + + % Split the one shard + [#shard{name=Shard1}] = lists:sort(mem3:local_shards(Db)), + {ok, JobId1} = mem3_reshard:start_split_job(Shard1), + wait_state(JobId1, completed), + + % Perform some basic checks that the shard was split + Shards1 = lists:sort(mem3:local_shards(Db)), + ?assertEqual(2, length(Shards1)), + [#shard{range = R1}, #shard{range = R2}] = Shards1, + ?assertEqual([16#00000000, 16#7fffffff], R1), + ?assertEqual([16#80000000, 16#ffffffff], R2), + + % Check metadata bits after the split + ?assertEqual(942, get_revs_limit(Db)), + ?assertEqual(943, get_purge_infos_limit(Db)), + ?assertEqual(SecObj, get_security(Db)), + + DbInfo1 = get_db_info(Db), + Docs1 = get_all_docs(Db), + Local1 = get_local_docs(Db), + + % When comparing db infos, ignore update sequences they won't be the + % same since they are more shards involved after the split + ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), + + % Update seq prefix number is a sum of all shard update sequences + #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0), + #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1), + ?assertEqual(UpdateSeq0 * 2, UpdateSeq1), + + ?assertEqual(Docs0, Docs1), + ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)), + + % Split the first range again + [#shard{name=Shard2}, _] = lists:sort(mem3:local_shards(Db)), + {ok, JobId2} = mem3_reshard:start_split_job(Shard2), + wait_state(JobId2, completed), + + Shards2 = lists:sort(mem3:local_shards(Db)), + ?assertEqual(3, length(Shards2)), + [R3, R4, R5] = [R || #shard{range = R} <- Shards2], + ?assertEqual([16#00000000, 16#3fffffff], R3), + ?assertEqual([16#40000000, 16#7fffffff], R4), + ?assertEqual([16#80000000, 16#ffffffff], R5), + + % Check metadata bits after the second split + ?assertEqual(942, get_revs_limit(Db)), + ?assertEqual(943, get_purge_infos_limit(Db)), + ?assertEqual(SecObj, get_security(Db)), + + DbInfo2 = get_db_info(Db), + Docs2 = get_all_docs(Db), + Local2 = get_local_docs(Db), + + ?assertEqual(without_seqs(DbInfo1), without_seqs(DbInfo2)), + % Update seq prefix number is a sum of all shard update sequences + % But only 1 shard out of 2 was split + #{<<"update_seq">> := UpdateSeq2} = update_seq_to_num(DbInfo2), + ?assertEqual(trunc(UpdateSeq1 * 1.5), UpdateSeq2), + ?assertEqual(Docs1, Docs2), + ?assertEqual(without_meta_locals(Local1), without_meta_locals(Local2)) + end). 
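The trunc(UpdateSeq1 * 1.5) expectation above is easier to see with concrete numbers. A small illustrative sketch, where the per-shard sequence of 80 is invented rather than taken from the test run:

    seq_factor_demo() ->
        PerShard = 80,
        Before = 2 * PerShard,            % two equal shards after the first split: 160
        After = PerShard + 2 * PerShard,  % untouched shard + the two targets of the split one: 240
        After = trunc(Before * 1.5),      % the relationship the assertion checks
        ok.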
+
+
+couch_events_are_emitted(#{db1 := Db}) ->
+    ?_test(begin
+        couch_event:register_all(self()),
+
+        % Split the one shard
+        [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+        {ok, JobId} = mem3_reshard:start_split_job(Shard),
+        wait_state(JobId, completed),
+
+        % Perform some basic checks that the shard was split
+        Shards1 = lists:sort(mem3:local_shards(Db)),
+        ?assertEqual(2, length(Shards1)),
+        [#shard{range = R1}, #shard{range = R2}] = Shards1,
+        ?assertEqual([16#00000000, 16#7fffffff], R1),
+        ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+        Flush = fun F(Events) ->
+            receive
+                {'$couch_event', DbName, Event} when Event =:= deleted
+                        orelse Event =:= updated ->
+                    case binary:match(DbName, Db) of
+                        nomatch -> F(Events);
+                        {_, _} -> F([Event | Events])
+                    end
+            after 0 ->
+                lists:reverse(Events)
+            end
+        end,
+        Events = Flush([]),
+        StartAtDeleted = lists:dropwhile(fun(E) -> E =/= deleted end, Events),
+        ?assertMatch([deleted, deleted, updated, updated | _], StartAtDeleted),
+        couch_event:unregister(self())
+    end).
+
+
+retries_work(#{db1 := Db}) ->
+    ?_test(begin
+        meck:expect(couch_db_split, split, fun(_, _, _) ->
+            error(kapow)
+        end),
+
+        [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+        {ok, JobId} = mem3_reshard:start_split_job(Shard),
+
+        wait_state(JobId, failed),
+        ?assertEqual(3, meck:num_calls(couch_db_split, split, 3))
+    end).
+
+
+target_reset_in_initial_copy(#{db1 := Db}) ->
+    ?_test(begin
+        [#shard{} = Src] = lists:sort(mem3:local_shards(Db)),
+        Job = #job{
+            source = Src,
+            target = [#shard{name= <<"t1">>}, #shard{name = <<"t2">>}],
+            job_state = running,
+            split_state = initial_copy
+        },
+        BogusParent = spawn(fun() -> receive {ack, _, _} -> ok end end),
+        put('$ancestors', [BogusParent]), % make proc_lib:ack not blow up
+        meck:expect(mem3_reshard, checkpoint, 2, ok),
+        meck:expect(couch_db_split, cleanup_target, 2, ok),
+        meck:expect(couch_server, exists, fun
+            (<<"t1">>) -> true;
+            (<<"t2">>) -> true;
+            (DbName) -> meck:passthrough([DbName])
+        end),
+        JobPid = spawn(fun() -> mem3_reshard_job:init(Job) end),
+        meck:wait(2, couch_db_split, cleanup_target, ['_', '_'], 5000),
+        exit(JobPid, kill),
+        exit(BogusParent, kill),
+        ?assertEqual(2, meck:num_calls(couch_db_split, cleanup_target, 2))
+    end).
+
+
+split_an_incomplete_shard_map(#{db1 := Db}) ->
+    ?_test(begin
+        [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+        meck:expect(mem3_util, calculate_max_n, 1, 0),
+        ?assertMatch({error, {not_enough_shard_copies, _}},
+            mem3_reshard:start_split_job(Shard))
+    end).
+
+
+intercept_state(State) ->
+    TestPid = self(),
+    meck:new(mem3_reshard_job, [passthrough]),
+    meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+        case Job#job.split_state of
+            State ->
+                TestPid ! {self(), State},
+                receive
+                    continue -> meck:passthrough([Job]);
+                    cancel -> ok
+                end;
+            _ ->
+                meck:passthrough([Job])
+        end
+    end).
+
+
+wait_state(JobId, State) ->
+    test_util:wait(fun() ->
+        case mem3_reshard:job(JobId) of
+            {ok, {Props}} ->
+                case couch_util:get_value(job_state, Props) of
+                    State -> ok;
+                    _ -> timer:sleep(100), wait
+                end;
+            {error, not_found} -> timer:sleep(100), wait
+        end
+    end, 30000).
+
+
+set_revs_limit(DbName, Limit) ->
+    with_proc(fun() -> fabric:set_revs_limit(DbName, Limit, [?ADMIN_CTX]) end).
+
+
+get_revs_limit(DbName) ->
+    with_proc(fun() -> fabric:get_revs_limit(DbName) end).
+
+
+get_purge_infos_limit(DbName) ->
+    with_proc(fun() -> fabric:get_purge_infos_limit(DbName) end).
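The Flush closure in couch_events_are_emitted above is the usual drain-the-mailbox idiom: a selective receive with an after 0 clause returns as soon as no matching message is queued, so the test never blocks. A generic sketch of the same idiom, for illustration only:

    drain(Tag) ->
        receive
            {Tag, Msg} -> [Msg | drain(Tag)]
        after 0 ->
            []
        end.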
+ + +set_purge_infos_limit(DbName, Limit) -> + with_proc(fun() -> + fabric:set_purge_infos_limit(DbName, Limit, [?ADMIN_CTX]) + end). + + +set_security(DbName, SecObj) -> + with_proc(fun() -> fabric:set_security(DbName, SecObj) end). + + +get_security(DbName) -> + with_proc(fun() -> fabric:get_security(DbName, [?ADMIN_CTX]) end). + + +get_db_info(DbName) -> + with_proc(fun() -> + {ok, Info} = fabric:get_db_info(DbName), + maps:with([ + <<"db_name">>, <<"doc_count">>, <<"props">>, <<"doc_del_count">>, + <<"update_seq">>, <<"purge_seq">>, <<"disk_format_version">> + ], to_map(Info)) + end). + + +get_group_info(DbName, DesignId) -> + with_proc(fun() -> + {ok, GInfo} = fabric:get_view_group_info(DbName, DesignId), + maps:with([ + <<"language">>, <<"purge_seq">>, <<"signature">>, <<"update_seq">> + ], to_map(GInfo)) + end). + + +get_partition_info(DbName, Partition) -> + with_proc(fun() -> + {ok, PInfo} = fabric:get_partition_info(DbName, Partition), + maps:with([ + <<"db_name">>, <<"doc_count">>, <<"doc_del_count">>, <<"partition">> + ], to_map(PInfo)) + end). + + +get_all_docs(DbName) -> + get_all_docs(DbName, #mrargs{}). + + +get_all_docs(DbName, #mrargs{} = QArgs0) -> + GL = erlang:group_leader(), + with_proc(fun() -> + Cb = fun + ({row, Props}, Acc) -> + Doc = to_map(couch_util:get_value(doc, Props)), + #{?ID := Id} = Doc, + {ok, Acc#{Id => Doc}}; + ({meta, _}, Acc) -> {ok, Acc}; + (complete, Acc) -> {ok, Acc} + end, + QArgs = QArgs0#mrargs{include_docs = true}, + {ok, Docs} = fabric:all_docs(DbName, Cb, #{}, QArgs), + Docs + end, GL). + + +get_local_docs(DbName) -> + LocalNS = {namespace, <<"_local">>}, + maps:map(fun(_, Doc) -> + maps:without([<<"_rev">>], Doc) + end, get_all_docs(DbName, #mrargs{extra = [LocalNS]})). + + +without_seqs(#{} = InfoMap) -> + maps:without([<<"update_seq">>, <<"purge_seq">>], InfoMap). + + +without_meta_locals(#{} = Local) -> + maps:filter(fun + (<<"_local/purge-mrview-", _/binary>>, _) -> false; + (<<"_local/shard-sync-", _/binary>>, _) -> false; + (_, _) -> true + end, Local). + + +update_seq_to_num(#{} = InfoMap) -> + maps:map(fun + (<<"update_seq">>, Seq) -> seq_to_num(Seq); + (<<"purge_seq">>, PSeq) -> seq_to_num(PSeq); + (_, V) -> V + end, InfoMap). + + +seq_to_num(Seq) -> + [SeqNum, _] = binary:split(Seq, <<"-">>), + binary_to_integer(SeqNum). + + +to_map([_ | _] = Props) -> + to_map({Props}); + +to_map({[_ | _]} = EJson) -> + jiffy:decode(jiffy:encode(EJson), [return_maps]). + + +create_db(DbName, Opts) -> + GL = erlang:group_leader(), + with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL). + + +delete_db(DbName) -> + GL = erlang:group_leader(), + with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL). + + +with_proc(Fun) -> + with_proc(Fun, undefined, 30000). + + +with_proc(Fun, GroupLeader) -> + with_proc(Fun, GroupLeader, 30000). + + +with_proc(Fun, GroupLeader, Timeout) -> + {Pid, Ref} = spawn_monitor(fun() -> + case GroupLeader of + undefined -> ok; + _ -> erlang:group_leader(GroupLeader, self()) + end, + exit({with_proc_res, Fun()}) + end), + receive + {'DOWN', Ref, process, Pid, {with_proc_res, Res}} -> + Res; + {'DOWN', Ref, process, Pid, Error} -> + error(Error) + after Timeout -> + erlang:demonitor(Ref, [flush]), + exit(Pid, kill), + error({with_proc_timeout, Fun, Timeout}) + end. 
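For readers not used to clustered CouchDB sequences: update_seq and purge_seq values are a numeric prefix joined by a dash to an opaque suffix, which is why seq_to_num/1 above keeps only the part before the dash. A quick illustration; the suffix here is shortened and made up:

    seq_to_num_demo() ->
        30 = seq_to_num(<<"30-g1AAAAxyz">>),
        #{<<"update_seq">> := 30} =
            update_seq_to_num(#{<<"update_seq">> => <<"30-g1AAAAxyz">>}),
        ok.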
+ + +add_test_docs(DbName, #{} = DocSpec) -> + Docs = docs(maps:get(docs, DocSpec, [])) + ++ pdocs(maps:get(pdocs, DocSpec, #{})) + ++ ddocs(mrview, maps:get(mrview, DocSpec, [])) + ++ ddocs(search, maps:get(search, DocSpec, [])) + ++ ddocs(geo, maps:get(geo, DocSpec, [])) + ++ ldocs(maps:get(local, DocSpec, [])), + Res = update_docs(DbName, Docs), + Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) -> + Doc#doc{revs = {RevPos, [Rev]}} + end, lists:zip(Docs, Res)), + case delete_docs(maps:get(delete, DocSpec, []), Docs1) of + [] -> ok; + [_ | _] = Deleted -> update_docs(DbName, Deleted) + end, + ok. + + +update_docs(DbName, Docs) -> + with_proc(fun() -> + case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of + {accepted, Res} -> Res; + {ok, Res} -> Res + end + end). + + +delete_docs([S, E], Docs) when E >= S -> + ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)], + lists:filtermap(fun(#doc{id = Id} = Doc) -> + case lists:member(Id, ToDelete) of + true -> {true, Doc#doc{deleted = true}}; + false -> false + end + end, Docs); +delete_docs(_, _) -> + []. + + +pdocs(#{} = PMap) -> + maps:fold(fun(Part, DocSpec, DocsAcc) -> + docs(DocSpec, <<Part/binary, ":">>) ++ DocsAcc + end, [], PMap). + + +docs(DocSpec) -> + docs(DocSpec, <<"">>). + + +docs(N, Prefix) when is_integer(N), N > 0 -> + docs([0, N - 1], Prefix); +docs([S, E], Prefix) when E >= S -> + [doc(Prefix, I) || I <- lists:seq(S, E)]; +docs(_, _) -> + []. + +ddocs(Type, N) when is_integer(N), N > 0 -> + ddocs(Type, [0, N - 1]); +ddocs(Type, [S, E]) when E >= S -> + Body = ddprop(Type), + BType = atom_to_binary(Type, utf8), + [doc(<<"_design/", BType/binary>>, I, Body, 0) || I <- lists:seq(S, E)]; +ddocs(_, _) -> + []. + + +ldocs(N) when is_integer(N), N > 0 -> + ldocs([0, N - 1]); +ldocs([S, E]) when E >= S -> + [doc(<<"_local/">>, I, bodyprops(), 0) || I <- lists:seq(S, E)]; +ldocs(_) -> + []. + + + +doc(Pref, Id) -> + Body = bodyprops(), + doc(Pref, Id, Body, 42). + + +doc(Pref, Id, BodyProps, AttSize) -> + #doc{ + id = doc_id(Pref, Id), + body = {BodyProps}, + atts = atts(AttSize) + }. + + +doc_id(Pref, Id) -> + IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])), + <<Pref/binary, IdBin/binary>>. + + +ddprop(mrview) -> + [ + {<<"views">>, {[ + {<<"v1">>, {[ + {<<"map">>, <<"function(d){emit(d);}">>} + ]}} + ]}} + ]; + +ddprop(geo) -> + [ + {<<"st_indexes">>, {[ + {<<"area">>, {[ + {<<"analyzer">>, <<"standard">>}, + {<<"index">>, <<"function(d){if(d.g){st_index(d.g)}}">> } + ]}} + ]}} + ]; + +ddprop(search) -> + [ + {<<"indexes">>, {[ + {<<"types">>, {[ + {<<"index">>, <<"function(d){if(d.g){st_index(d.g.type)}}">>} + ]}} + ]}} + ]. + + +bodyprops() -> + [ + {<<"g">>, {[ + {<<"type">>, <<"Polygon">>}, + {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]} + ]}} + ]. + + +atts(0) -> + []; + +atts(Size) when is_integer(Size), Size >= 1 -> + Data = << <<"x">> || _ <- lists:seq(1, Size) >>, + [couch_att:new([ + {name, <<"att">>}, + {type, <<"app/binary">>}, + {att_len, Size}, + {data, Data} + ])]. 
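To make the DocSpec map accepted by add_test_docs/2 above concrete, here is an illustrative call (doc_spec_demo is a hypothetical helper; the ids follow the ~5..0B zero padding used by doc_id/2):

    doc_spec_demo(Db) ->
        % creates "00000", "00001" and "00002", plus "_design/mrview00000" and
        % "_local/00000", then marks "00001" and "00002" as deleted
        ok = add_test_docs(Db, #{docs => 3, delete => [1, 2], mrview => 1, local => 1}).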
diff --git a/test/elixir/lib/couch/db_test.ex b/test/elixir/lib/couch/db_test.ex index 7a08aae36..d88478a8f 100644 --- a/test/elixir/lib/couch/db_test.ex +++ b/test/elixir/lib/couch/db_test.ex @@ -171,8 +171,7 @@ defmodule Couch.DBTest do def delete_db(db_name) do resp = Couch.delete("/#{db_name}") - assert resp.status_code in [200, 202] - assert resp.body == %{"ok" => true} + assert resp.status_code in [200, 202, 404] {:ok, resp} end diff --git a/test/elixir/test/reshard_all_docs_test.exs b/test/elixir/test/reshard_all_docs_test.exs new file mode 100644 index 000000000..62b6e372c --- /dev/null +++ b/test/elixir/test/reshard_all_docs_test.exs @@ -0,0 +1,79 @@ +defmodule ReshardAllDocsTest do + use CouchTestCase + import ReshardHelpers + + @moduledoc """ + Test _all_docs interaction with resharding + """ + + setup do + db = random_db_name() + {:ok, _} = create_db(db, query: %{q: 2}) + + on_exit(fn -> + reset_reshard_state() + delete_db(db) + end) + + {:ok, [db: db]} + end + + test "all_docs after splitting all shards on node1", context do + db = context[:db] + node1 = get_first_node() + docs = add_docs(1..100, db) + + before_split_all_docs = all_docs(db) + assert docs == before_split_all_docs + + resp = post_job_node(db, node1) + assert resp.status_code == 201 + jobid = hd(resp.body)["id"] + wait_job_completed(jobid) + + assert before_split_all_docs == all_docs(db) + + assert remove_job(jobid).status_code == 200 + end + + test "all_docs after splitting the same range on all nodes", context do + db = context[:db] + docs = add_docs(1..100, db) + + before_split_all_docs = all_docs(db) + assert docs == before_split_all_docs + + resp = post_job_range(db, "00000000-7fffffff") + assert resp.status_code == 201 + + resp.body + |> Enum.map(fn j -> j["id"] end) + |> Enum.each(fn id -> wait_job_completed(id) end) + + assert before_split_all_docs == all_docs(db) + + get_jobs() + |> Enum.map(fn j -> j["id"] end) + |> Enum.each(fn id -> remove_job(id) end) + end + + defp add_docs(range, db) do + docs = create_docs(range) + w3 = %{:w => 3} + resp = Couch.post("/#{db}/_bulk_docs", body: %{docs: docs}, query: w3) + assert resp.status_code == 201 + assert length(resp.body) == length(docs) + + docs + |> rev(resp.body) + |> Enum.into(%{}, fn %{:_id => id, :_rev => rev} -> {id, rev} end) + end + + defp all_docs(db, query \\ %{}) do + resp = Couch.get("/#{db}/_all_docs", query: query) + assert resp.status_code == 200 + + resp.body["rows"] + |> Enum.into(%{}, fn %{"id" => id, "value" => v} -> {id, v["rev"]} end) + end +end diff --git a/test/elixir/test/reshard_basic_test.exs b/test/elixir/test/reshard_basic_test.exs new file mode 100644 index 000000000..211dd6bf7 --- /dev/null +++ b/test/elixir/test/reshard_basic_test.exs @@ -0,0 +1,174 @@ +defmodule ReshardBasicTest do + use CouchTestCase + import ReshardHelpers + + @moduledoc """ + Test resharding basic functionality + """ + + setup_all do + db1 = random_db_name() + {:ok, _} = create_db(db1, query: %{q: 1}) + db2 = random_db_name() + {:ok, _} = create_db(db2, query: %{q: 2}) + + on_exit(fn -> + reset_reshard_state() + delete_db(db1) + delete_db(db2) + end) + + {:ok, [db1: db1, db2: db2]} + end + + test "basic api querying, no jobs present" do + summary = get_summary() + assert summary["state"] == "running" + assert summary["state_reason"] == :null + assert summary["total"] == 0 + assert summary["completed"] == 0 + assert summary["failed"] == 0 + assert summary["stopped"] == 0 + assert get_state() == %{"state" => "running", "reason" => :null} + assert get_jobs() 
== [] + end + + test "check validation of invalid parameters", context do + db1 = context[:db1] + node1 = get_first_node() + + resp = post_job_node(db1, "badnode") + assert resp.status_code == 400 + + resp = post_job_node("badresharddb", node1) + assert resp.status_code == 400 + + resp = post_job_db("badresharddb") + assert resp.status_code == 400 + + resp = post_job_range("badresharddb", "randomgarbage") + assert resp.status_code == 400 + + resp = get_job("badjobid") + assert resp.status_code == 404 + + resp = remove_job("badjobid") + assert resp.status_code == 404 + end + + test "toggle global state" do + assert get_state() == %{"state" => "running", "reason" => :null} + put_state_stopped("xyz") + assert get_state() == %{"state" => "stopped", "reason" => "xyz"} + put_state_running() + assert get_state() == %{"state" => "running", "reason" => :null} + end + + test "split q=1 db shards on node1 (1 job)", context do + db = context[:db1] + node1 = get_first_node() + + resp = post_job_node(db, node1) + assert resp.status_code == 201 + + body = resp.body + assert is_list(body) + assert length(body) == 1 + + [job] = body + id = job["id"] + assert is_binary(id) + node = job["node"] + assert is_binary(node) + assert node == node1 + assert job["ok"] == true + shard = job["shard"] + assert is_binary(shard) + + resp = get_job(id) + assert resp.status_code == 200 + + body = resp.body + assert body["type"] == "split" + assert body["id"] == id + assert body["source"] == shard + assert is_list(body["history"]) + assert body["job_state"] in ["new", "running", "completed"] + assert is_list(body["target"]) + assert length(body["target"]) == 2 + + wait_job_completed(id) + + resp = get_job(id) + assert resp.status_code == 200 + + body = resp.body + assert body["job_state"] == "completed" + assert body["split_state"] == "completed" + + resp = Couch.get("/#{db}/_shards") + assert resp.status_code == 200 + shards = resp.body["shards"] + assert node1 not in shards["00000000-ffffffff"] + assert shards["00000000-7fffffff"] == [node1] + assert shards["80000000-ffffffff"] == [node1] + + summary = get_summary() + assert summary["total"] == 1 + assert summary["completed"] == 1 + + resp = remove_job(id) + assert resp.status_code == 200 + + assert get_jobs() == [] + + summary = get_summary() + assert summary["total"] == 0 + assert summary["completed"] == 0 + end + + test "split q=2 shards on node1 (2 jobs)", context do + db = context[:db2] + node1 = get_first_node() + + resp = post_job_node(db, node1) + assert resp.status_code == 201 + + body = resp.body + assert is_list(body) + assert length(body) == 2 + + [job1, job2] = Enum.sort(body) + {id1, id2} = {job1["id"], job2["id"]} + + assert get_job(id1).body["id"] == id1 + assert get_job(id2).body["id"] == id2 + + summary = get_summary() + assert summary["total"] == 2 + + wait_job_completed(id1) + wait_job_completed(id2) + + summary = get_summary() + assert summary["completed"] == 2 + + resp = Couch.get("/#{db}/_shards") + assert resp.status_code == 200 + shards = resp.body["shards"] + assert node1 not in shards["00000000-7fffffff"] + assert node1 not in shards["80000000-ffffffff"] + assert shards["00000000-3fffffff"] == [node1] + assert shards["40000000-7fffffff"] == [node1] + assert shards["80000000-bfffffff"] == [node1] + assert shards["c0000000-ffffffff"] == [node1] + + # deleting the source db should remove the jobs + delete_db(db) + wait_job_removed(id1) + wait_job_removed(id2) + + summary = get_summary() + assert summary["total"] == 0 + end +end diff --git 
a/test/elixir/test/reshard_changes_feed.exs b/test/elixir/test/reshard_changes_feed.exs new file mode 100644 index 000000000..a4a39fec1 --- /dev/null +++ b/test/elixir/test/reshard_changes_feed.exs @@ -0,0 +1,81 @@ +defmodule ReshardChangesFeedTest do + use CouchTestCase + import ReshardHelpers + + @moduledoc """ + Test _changes interaction with resharding + """ + + setup do + db = random_db_name() + {:ok, _} = create_db(db, query: %{q: 2}) + + on_exit(fn -> + reset_reshard_state() + delete_db(db) + end) + + {:ok, [db: db]} + end + + test "all_docs after splitting all shards on node1", context do + db = context[:db] + add_docs(1..3, db) + + all_before = changes(db) + first_seq = hd(all_before["results"])["seq"] + last_seq = all_before["last_seq"] + since_1_before = docset(changes(db, %{:since => first_seq})) + since_last_before = docset(changes(db, %{:since => last_seq})) + + resp = post_job_range(db, "00000000-7fffffff") + assert resp.status_code == 201 + + resp.body + |> Enum.map(fn j -> j["id"] end) + |> Enum.each(fn id -> wait_job_completed(id) end) + + all_after = changes(db) + since_1_after = docset(changes(db, %{:since => first_seq})) + since_last_after = docset(changes(db, %{:since => last_seq})) + + assert docset(all_before) == docset(all_after) + assert MapSet.subset?(since_1_before, since_1_after) + assert MapSet.subset?(since_last_before, since_last_after) + + get_jobs() + |> Enum.map(fn j -> j["id"] end) + |> Enum.each(fn id -> remove_job(id) end) + end + + defp docset(changes) do + changes["results"] + |> Enum.map(fn %{"id" => id} -> id end) + |> MapSet.new() + end + + defp changes(db, query \\ %{}) do + resp = Couch.get("/#{db}/_changes", query: query) + assert resp.status_code == 200 + resp.body + end + + defp add_docs(range, db) do + docs = create_docs(range) + w3 = %{:w => 3} + resp = Couch.post("/#{db}/_bulk_docs", body: %{docs: docs}, query: w3) + assert resp.status_code == 201 + assert length(resp.body) == length(docs) + + docs + |> rev(resp.body) + |> Enum.into(%{}, fn %{:_id => id, :_rev => rev} -> {id, rev} end) + end + + # (Keep for debugging) + # defp unpack_seq(seq) when is_binary(seq) do + # [_, opaque] = String.split(seq, "-") + # {:ok, binblob} = Base.url_decode64(opaque, padding: false) + # :erlang.binary_to_term(binblob) + # end +end diff --git a/test/elixir/test/reshard_helpers.exs b/test/elixir/test/reshard_helpers.exs new file mode 100644 index 000000000..c67e6902e --- /dev/null +++ b/test/elixir/test/reshard_helpers.exs @@ -0,0 +1,111 @@ +defmodule ReshardHelpers do + use CouchTestCase + + def get_summary do + resp = Couch.get("/_reshard") + assert resp.status_code == 200 + resp.body + end + + def get_state do + resp = Couch.get("/_reshard/state") + assert resp.status_code == 200 + resp.body + end + + def put_state_running do + resp = Couch.put("/_reshard/state", body: %{:state => "running"}) + assert resp.status_code == 200 + resp + end + + def put_state_stopped(reason \\ "") do + body = %{:state => "stopped", :reason => reason} + resp = Couch.put("/_reshard/state", body: body) + assert resp.status_code == 200 + resp + end + + def get_jobs do + resp = Couch.get("/_reshard/jobs") + assert resp.status_code == 200 + resp.body["jobs"] + end + + def post_job_db(db) do + body = %{:type => :split, :db => db} + Couch.post("/_reshard/jobs", body: body) + end + + def post_job_node(db, node) do + body = %{:type => :split, :db => db, :node => node} + Couch.post("/_reshard/jobs", body: body) + end + + def post_job_range(db, range) do + body = %{:type => :split, :db 
=> db, :range => range}
+    Couch.post("/_reshard/jobs", body: body)
+  end
+
+  def post_job_node_and_range(db, node, range) do
+    body = %{:type => :split, :db => db, :node => node, :range => range}
+    Couch.post("/_reshard/jobs", body: body)
+  end
+
+  def get_job(id) when is_binary(id) do
+    Couch.get("/_reshard/jobs/#{id}")
+  end
+
+  def remove_job(id) when is_binary(id) do
+    Couch.delete("/_reshard/jobs/#{id}")
+  end
+
+  def get_job_state(id) when is_binary(id) do
+    resp = Couch.get("/_reshard/jobs/#{id}/state")
+    assert resp.status_code == 200
+    resp.body["state"]
+  end
+
+  def stop_job(id, reason \\ "") when is_binary(id) do
+    body = %{:state => "stopped", :reason => reason}
+    Couch.post("/_reshard/jobs/#{id}/state", body: body)
+  end
+
+  def resume_job(id) when is_binary(id) do
+    body = %{:state => "running"}
+    Couch.post("/_reshard/jobs/#{id}/state", body: body)
+  end
+
+  def job_ids(jobs) do
+    Enum.map(jobs, fn job -> job["id"] end)
+  end
+
+  def get_first_node do
+    mresp = Couch.get("/_membership")
+    assert mresp.status_code == 200
+    cluster_nodes = mresp.body["cluster_nodes"]
+    [node1 | _] = cluster_nodes
+    node1
+  end
+
+  def wait_job_removed(id) do
+    retry_until(fn -> get_job(id).status_code == 404 end, 200, 10_000)
+  end
+
+  def wait_job_completed(id) do
+    wait_job_state(id, "completed")
+  end
+
+  def wait_job_state(id, state) do
+    retry_until(fn -> get_job_state(id) == state end, 200, 10_000)
+  end
+
+  def reset_reshard_state do
+    get_jobs()
+    |> Enum.map(fn j -> j["id"] end)
+    |> Enum.each(fn id -> remove_job(id) end)
+
+    assert get_jobs() == []
+    put_state_running()
+  end
+end
diff --git a/test/elixir/test/test_helper.exs b/test/elixir/test/test_helper.exs
index 4df3bf74a..ef71bbb1b 100644
--- a/test/elixir/test/test_helper.exs
+++ b/test/elixir/test/test_helper.exs
@@ -13,3 +13,4 @@ ExUnit.configure(
 ExUnit.start()
 
 Code.require_file("partition_helpers.exs", __DIR__)
+Code.require_file("reshard_helpers.exs", __DIR__)