author     Nick Vatamaniuc <vatamane@apache.org>  2019-03-18 13:32:15 -0400
committer  Nick Vatamaniuc <vatamane@apache.org>  2019-04-02 16:48:22 -0400
commit     64bc9417cac092bb20fbdb76a830dcca60d2f13d (patch)
tree       5db1b16cec2cc628ceb1d71b925502feff234db8
parent     d25d982217c6fa0be47d3b42402a8b5fbdcf4116 (diff)
download   couchdb-reshard.tar.gz

Implement resharding HTTP API (reshard)
This implements the API as defined in RFC #1920. The HTTP handlers and the
request validators live in `mem3_reshard_httpd`, while the cluster-level
helpers live in the `mem3_reshard_api` module. There is also a set of
high-level (HTTP & fabric) API tests that check that shard splitting happens
properly, that jobs behave as defined in the RFC, etc.

Co-authored-by: Eric Avdey <eiri@eiri.ca>
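For orientation, here is a rough sketch (not part of the change below) of
driving the new endpoints from Erlang, reusing the same `test_request` and
`jiffy` calls as the tests in this commit; the base URL in `Top` and the
admin credentials are placeholder assumptions, not values from the diff:

    % Split every shard of DbName and read back the state of the first job.
    % Assumes Top is something like "http://127.0.0.1:15984/" and that
    % {"adm", "pass"} is a valid admin user (illustrative values only).
    split_db_sketch(Top, DbName) when is_binary(DbName) ->
        Auth = {basic_auth, {"adm", "pass"}},
        Json = {"Content-Type", "application/json"},
        Body = jiffy:encode(#{type => split, db => DbName}),
        % POST /_reshard/jobs returns one entry per created job
        {ok, 201, _, Res} = test_request:request(post,
            Top ++ "_reshard/jobs", [Json, Auth], Body),
        [#{<<"id">> := JobId} | _] = jiffy:decode(Res, [return_maps]),
        % GET /_reshard/jobs/$jobid/state returns {"state": ..., "reason": ...}
        StUrl = Top ++ "_reshard/jobs/" ++ binary_to_list(JobId) ++ "/state",
        {ok, 200, _, St} = test_request:request(get, StUrl, [Auth]),
        jiffy:decode(St, [return_maps]).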
-rw-r--r--  src/chttpd/src/chttpd_auth_request.erl              2
-rw-r--r--  src/mem3/src/mem3_httpd_handlers.erl                1
-rw-r--r--  src/mem3/src/mem3_reshard_api.erl                 195
-rw-r--r--  src/mem3/src/mem3_reshard_httpd.erl               317
-rw-r--r--  src/mem3/test/mem3_reshard_api_test.erl           817
-rw-r--r--  src/mem3/test/mem3_reshard_changes_feed_test.erl  388
-rw-r--r--  src/mem3/test/mem3_reshard_test.erl               804
-rw-r--r--  test/elixir/lib/couch/db_test.ex                    3
-rw-r--r--  test/elixir/test/reshard_all_docs_test.exs         79
-rw-r--r--  test/elixir/test/reshard_basic_test.exs           174
-rw-r--r--  test/elixir/test/reshard_changes_feed.exs          81
-rw-r--r--  test/elixir/test/reshard_helpers.exs              111
-rw-r--r--  test/elixir/test/test_helper.exs                    1
13 files changed, 2971 insertions, 2 deletions
diff --git a/src/chttpd/src/chttpd_auth_request.erl b/src/chttpd/src/chttpd_auth_request.erl
index 5b4ec84d5..96dbf980c 100644
--- a/src/chttpd/src/chttpd_auth_request.erl
+++ b/src/chttpd/src/chttpd_auth_request.erl
@@ -50,6 +50,8 @@ authorize_request_int(#httpd{path_parts=[<<"_replicator">>,<<"_changes">>|_]}=Re
require_admin(Req);
authorize_request_int(#httpd{path_parts=[<<"_replicator">>|_]}=Req) ->
db_authorization_check(Req);
+authorize_request_int(#httpd{path_parts=[<<"_reshard">>|_]}=Req) ->
+ require_admin(Req);
authorize_request_int(#httpd{path_parts=[<<"_users">>], method='PUT'}=Req) ->
require_admin(Req);
authorize_request_int(#httpd{path_parts=[<<"_users">>], method='DELETE'}=Req) ->
diff --git a/src/mem3/src/mem3_httpd_handlers.erl b/src/mem3/src/mem3_httpd_handlers.erl
index 7cbd9fe5f..7dd6ab052 100644
--- a/src/mem3/src/mem3_httpd_handlers.erl
+++ b/src/mem3/src/mem3_httpd_handlers.erl
@@ -15,6 +15,7 @@
-export([url_handler/1, db_handler/1, design_handler/1]).
url_handler(<<"_membership">>) -> fun mem3_httpd:handle_membership_req/1;
+url_handler(<<"_reshard">>) -> fun mem3_reshard_httpd:handle_reshard_req/1;
url_handler(_) -> no_match.
db_handler(<<"_shards">>) -> fun mem3_httpd:handle_shards_req/2;
diff --git a/src/mem3/src/mem3_reshard_api.erl b/src/mem3/src/mem3_reshard_api.erl
new file mode 100644
index 000000000..f39df4cbb
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_api.erl
@@ -0,0 +1,195 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_api).
+
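+% Cluster-level helpers behind the `_reshard` HTTP API: create and list
+% shard splitting jobs, resume or stop individual jobs, and start or stop
+% resharding across the cluster via RPC calls to the live nodes.
+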
+-export([
+ create_jobs/5,
+ get_jobs/0,
+ get_job/1,
+ get_summary/0,
+ resume_job/1,
+ stop_job/2,
+ start_shard_splitting/0,
+ stop_shard_splitting/1,
+ get_shard_splitting_state/0
+]).
+
+
+create_jobs(Node, Shard, Db, Range, split) ->
+ lists:map(fun(S) ->
+ N = mem3:node(S),
+ Name = mem3:name(S),
+ case rpc:call(N, mem3_reshard, start_split_job, [Name]) of
+ {badrpc, Error} ->
+ {error, Error, N, Name};
+ {ok, JobId} ->
+ {ok, JobId, N, Name};
+ {error, Error} ->
+ {error, Error, N, Name}
+ end
+ end, pick_shards(Node, Shard, Db, Range)).
+
+
+get_jobs() ->
+ Nodes = mem3_util:live_nodes(),
+ {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, jobs, []),
+ lists:flatten(Replies).
+
+
+get_job(JobId) ->
+ Nodes = mem3_util:live_nodes(),
+ {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, job, [JobId]),
+ case [JobInfo || {ok, JobInfo} <- Replies] of
+ [JobInfo | _] ->
+ {ok, JobInfo};
+ [] ->
+ {error, not_found}
+ end.
+
+
+get_summary() ->
+ Nodes = mem3_util:live_nodes(),
+ {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, get_state, []),
+ Stats0 = #{running => 0, total => 0, completed => 0, failed => 0,
+ stopped => 0},
+ StatsF = lists:foldl(fun({Res}, Stats) ->
+ maps:map(fun(Stat, OldVal) ->
+ OldVal + couch_util:get_value(Stat, Res, 0)
+ end, Stats)
+ end, Stats0, Replies),
+ {State, Reason} = state_and_reason(Replies),
+ StateReasonProps = [{state, State}, {state_reason, Reason}],
+ {StateReasonProps ++ lists:sort(maps:to_list(StatsF))}.
+
+
+resume_job(JobId) ->
+ Nodes = mem3_util:live_nodes(),
+ {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, resume_job,
+ [JobId]),
+ WithoutNotFound = [R || R <- Replies, R =/= {error, not_found}],
+ case lists:usort(WithoutNotFound) of
+ [ok] ->
+ ok;
+ [{error, Error} | _] ->
+ {error, {[{error, couch_util:to_binary(Error)}]}};
+ [] ->
+ {error, not_found}
+ end.
+
+
+stop_job(JobId, Reason) ->
+ Nodes = mem3_util:live_nodes(),
+ {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, stop_job,
+ [JobId, Reason]),
+ WithoutNotFound = [R || R <- Replies, R =/= {error, not_found}],
+ case lists:usort(WithoutNotFound) of
+ [ok] ->
+ ok;
+ [{error, Error} | _] ->
+ {error, {[{error, couch_util:to_binary(Error)}]}};
+ [] ->
+ {error, not_found}
+ end.
+
+
+start_shard_splitting() ->
+ {Replies, _Bad} = rpc:multicall(mem3_reshard, start, []),
+ case lists:usort(lists:flatten(Replies)) of
+ [ok] ->
+ {ok, {[{ok, true}]}};
+ [Error | _] ->
+ {error, {[{error, couch_util:to_binary(Error)}]}}
+ end.
+
+
+stop_shard_splitting(Reason) ->
+ {Replies, _Bad} = rpc:multicall(mem3_reshard, stop, [Reason]),
+ case lists:usort(lists:flatten(Replies)) of
+ [ok] ->
+ {ok, {[{ok, true}]}};
+ [Error | _] ->
+ {error, {[{error, couch_util:to_binary(Error)}]}}
+ end.
+
+
+get_shard_splitting_state() ->
+ Nodes = mem3_util:live_nodes(),
+ {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, get_state, []),
+ state_and_reason(Replies).
+
+
+state_and_reason(StateReplies) ->
+ AccF = lists:foldl(fun({ResProps}, Acc) ->
+ Reason = get_reason(ResProps),
+ case couch_util:get_value(state, ResProps) of
+ <<"running">> -> orddict:append(running, Reason, Acc);
+ <<"stopped">> -> orddict:append(stopped, Reason, Acc);
+ undefined -> Acc
+ end
+ end, orddict:from_list([{running, []}, {stopped, []}]), StateReplies),
+ Running = orddict:fetch(running, AccF),
+ case length(Running) > 0 of
+ true ->
+ Reason = pick_reason(Running),
+ {running, Reason};
+ false ->
+ Reason = pick_reason(orddict:fetch(stopped, AccF)),
+ {stopped, Reason}
+ end.
+
+
+pick_reason(Reasons) ->
+ Reasons1 = lists:usort(Reasons),
+ Reasons2 = [R || R <- Reasons1, R =/= undefined],
+ case Reasons2 of
+ [] -> null;
+ [R1 | _] -> R1
+ end.
+
+
+get_reason(StateProps) when is_list(StateProps) ->
+ case couch_util:get_value(state_info, StateProps) of
+ [] -> undefined;
+ undefined -> undefined;
+ {SInfoProps} -> couch_util:get_value(reason, SInfoProps)
+ end.
+
+
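+% Pick the source shard(s) for a new job based on which of `node`, `shard`,
+% `db` and `range` were provided in the request body.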
+pick_shards(undefined, undefined, Db, undefined) when is_binary(Db) ->
+ mem3:shards(Db);
+
+pick_shards(Node, undefined, Db, undefined) when is_atom(Node),
+ is_binary(Db) ->
+ [S || S <- mem3:shards(Db), mem3:node(S) == Node];
+
+pick_shards(undefined, undefined, Db, [_B, _E] = Range) when is_binary(Db) ->
+ [S || S <- mem3:shards(Db), mem3:range(S) == Range];
+
+pick_shards(Node, undefined, Db, [_B, _E] = Range) when is_atom(Node),
+ is_binary(Db) ->
+ [S || S <- mem3:shards(Db), mem3:node(S) == Node, mem3:range(S) == Range];
+
+pick_shards(undefined, Shard, undefined, undefined) when is_binary(Shard) ->
+ Db = mem3:dbname(Shard),
+ [S || S <- mem3:shards(Db), mem3:name(S) == Shard];
+
+pick_shards(Node, Shard, undefined, undefined) when is_atom(Node),
+ is_binary(Shard) ->
+ Db = mem3:dbname(Shard),
+ [S || S <- mem3:shards(Db), mem3:name(S) == Shard, mem3:node(S) == Node];
+
+pick_shards(_, undefined, undefined, _) ->
+ throw({bad_request, <<"Must specify at least `db` or `shard`">>});
+
+pick_shards(_, Db, Shard, _) when is_binary(Db), is_binary(Shard) ->
+ throw({bad_request, <<"`db` and `shard` are mutually exclusive">>}).
diff --git a/src/mem3/src/mem3_reshard_httpd.erl b/src/mem3/src/mem3_reshard_httpd.erl
new file mode 100644
index 000000000..3d0f77f39
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_httpd.erl
@@ -0,0 +1,317 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_httpd).
+
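+% HTTP request handlers for the `_reshard` endpoint, along with validators
+% for the request parameters (`node`, `shard`, `db`, `range` and `type`).
+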
+-export([
+ handle_reshard_req/1
+]).
+
+-import(couch_httpd, [
+ send_json/2,
+ send_json/3,
+ send_method_not_allowed/2
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(JOBS, <<"jobs">>).
+-define(STATE, <<"state">>).
+-define(S_RUNNING, <<"running">>).
+-define(S_STOPPED, <<"stopped">>).
+
+
+% GET /_reshard
+handle_reshard_req(#httpd{method='GET', path_parts=[_]} = Req) ->
+ reject_if_disabled(),
+ State = mem3_reshard_api:get_summary(),
+ send_json(Req, State);
+
+handle_reshard_req(#httpd{path_parts=[_]} = Req) ->
+ send_method_not_allowed(Req, "GET,HEAD");
+
+% GET /_reshard/state
+handle_reshard_req(#httpd{method='GET',
+ path_parts=[_, ?STATE]} = Req) ->
+ reject_if_disabled(),
+ {State, Reason} = mem3_reshard_api:get_shard_splitting_state(),
+ send_json(Req, {[{state, State}, {reason, Reason}]});
+
+% PUT /_reshard/state
+handle_reshard_req(#httpd{method='PUT',
+ path_parts=[_, ?STATE]} = Req) ->
+ reject_if_disabled(),
+ couch_httpd:validate_ctype(Req, "application/json"),
+ {Props} = couch_httpd:json_body_obj(Req),
+ State = couch_util:get_value(<<"state">>, Props),
+ Reason = couch_util:get_value(<<"reason">>, Props),
+ case {State, Reason} of
+ {undefined, _} ->
+ throw({bad_request, <<"Expected a `state` field">>});
+ {?S_RUNNING, _} ->
+ case mem3_reshard_api:start_shard_splitting() of
+ {ok, JsonResult} ->
+ send_json(Req, 200, JsonResult);
+ {error, JsonError} ->
+ send_json(Req, 500, JsonError)
+ end;
+ {?S_STOPPED, Reason} ->
+ Reason1 = case Reason =:= undefined of
+ false -> Reason;
+ true -> <<"Cluster-wide resharding stopped by the user">>
+ end,
+ case mem3_reshard_api:stop_shard_splitting(Reason1) of
+ {ok, JsonResult} ->
+ send_json(Req, 200, JsonResult);
+ {error, JsonError} ->
+ send_json(Req, 500, JsonError)
+ end;
+ {_, _} ->
+ throw({bad_request, <<"State field not `running` or `stopped`">>})
+ end;
+
+handle_reshard_req(#httpd{path_parts=[_, ?STATE]} = Req) ->
+ send_method_not_allowed(Req, "GET,HEAD,PUT");
+
+handle_reshard_req(#httpd{path_parts=[_, ?STATE | _]} = _Req) ->
+ throw(not_found);
+
+% GET /_reshard/jobs
+handle_reshard_req(#httpd{method='GET', path_parts=[_, ?JOBS]}=Req) ->
+ reject_if_disabled(),
+ Jobs = mem3_reshard_api:get_jobs(),
+ Total = length(Jobs),
+ send_json(Req, {[{total_rows, Total}, {offset, 0}, {jobs, Jobs}]});
+
+% POST /_reshard/jobs {"node": "...", "shard": "..."}
+handle_reshard_req(#httpd{method = 'POST',
+ path_parts=[_, ?JOBS]} = Req) ->
+ reject_if_disabled(),
+ couch_httpd:validate_ctype(Req, "application/json"),
+ {Props} = couch_httpd:json_body_obj(Req),
+ Node = validate_node(couch_util:get_value(<<"node">>, Props)),
+ Shard = validate_shard(couch_util:get_value(<<"shard">>, Props)),
+ Db = validate_db(couch_util:get_value(<<"db">>, Props)),
+ Range = validate_range(couch_util:get_value(<<"range">>, Props)),
+ Type = validate_type(couch_util:get_value(<<"type">>, Props)),
+ Res = mem3_reshard_api:create_jobs(Node, Shard, Db, Range, Type),
+ case Res of
+ [] -> throw(not_found);
+ _ -> ok
+ end,
+ Oks = length([R || {ok, _, _, _} = R <- Res]),
+ Code = case {Oks, length(Res)} of
+ {Oks, Oks} -> 201;
+ {Oks, _} when Oks > 0 -> 202;
+ {0, _} -> 500
+ end,
+ EJson = lists:map(fun
+ ({ok, Id, N, S}) ->
+ {[{ok, true}, {id, Id}, {node, N}, {shard, S}]};
+ ({error, E, N, S}) ->
+ {[{error, couch_util:to_binary(E)}, {node, N}, {shard, S}]}
+ end, Res),
+ send_json(Req, Code, EJson);
+
+handle_reshard_req(#httpd{path_parts=[_, ?JOBS]} = Req) ->
+ send_method_not_allowed(Req, "GET,HEAD,POST");
+
+handle_reshard_req(#httpd{path_parts=[_, _]}) ->
+ throw(not_found);
+
+% GET /_reshard/jobs/$jobid
+handle_reshard_req(#httpd{method='GET',
+ path_parts=[_, ?JOBS, JobId]}=Req) ->
+ reject_if_disabled(),
+ case mem3_reshard_api:get_job(JobId) of
+ {ok, JobInfo} ->
+ send_json(Req, JobInfo);
+ {error, not_found} ->
+ throw(not_found)
+ end;
+
+% DELETE /_reshard/jobs/$jobid
+handle_reshard_req(#httpd{method='DELETE',
+ path_parts=[_, ?JOBS, JobId]}=Req) ->
+ reject_if_disabled(),
+ case mem3_reshard_api:get_job(JobId) of
+ {ok, {Props}} ->
+ NodeBin = couch_util:get_value(node, Props),
+ Node = binary_to_atom(NodeBin, utf8),
+ case rpc:call(Node, mem3_reshard, remove_job, [JobId]) of
+ ok ->
+ send_json(Req, 200, {[{ok, true}]});
+ {error, not_found} ->
+ throw(not_found)
+ end;
+ {error, not_found} ->
+ throw(not_found)
+ end;
+
+handle_reshard_req(#httpd{path_parts=[_, ?JOBS, _]} = Req) ->
+ send_method_not_allowed(Req, "GET,HEAD,DELETE");
+
+% GET /_reshard/jobs/$jobid/state
+handle_reshard_req(#httpd{method='GET',
+ path_parts=[_, ?JOBS, JobId, ?STATE]} = Req) ->
+ reject_if_disabled(),
+ case mem3_reshard_api:get_job(JobId) of
+ {ok, {Props}} ->
+ JobState = couch_util:get_value(job_state, Props),
+ {SIProps} = couch_util:get_value(state_info, Props),
+ Reason = case couch_util:get_value(reason, SIProps) of
+ undefined -> null;
+ Val -> couch_util:to_binary(Val)
+ end,
+ send_json(Req, 200, {[{state, JobState}, {reason, Reason}]});
+ {error, not_found} ->
+ throw(not_found)
+ end;
+
+% PUT /_reshard/jobs/$jobid/state
+handle_reshard_req(#httpd{method='PUT',
+ path_parts=[_, ?JOBS, JobId, ?STATE]} = Req) ->
+ reject_if_disabled(),
+ couch_httpd:validate_ctype(Req, "application/json"),
+ {Props} = couch_httpd:json_body_obj(Req),
+ State = couch_util:get_value(<<"state">>, Props),
+ Reason = couch_util:get_value(<<"reason">>, Props),
+ case {State, Reason} of
+ {undefined, _} ->
+ throw({bad_request, <<"Expected a `state` field">>});
+ {?S_RUNNING, _} ->
+ case mem3_reshard_api:resume_job(JobId) of
+ ok ->
+ send_json(Req, 200, {[{ok, true}]});
+ {error, not_found} ->
+ throw(not_found);
+ {error, JsonError} ->
+ send_json(Req, 500, JsonError)
+ end;
+ {?S_STOPPED, Reason} ->
+ Reason1 = case Reason =:= undefined of
+ false -> Reason;
+ true -> <<"Stopped by user">>
+ end,
+ case mem3_reshard_api:stop_job(JobId, Reason1) of
+ ok ->
+ send_json(Req, 200, {[{ok, true}]});
+ {error, not_found} ->
+ throw(not_found);
+ {error, JsonError} ->
+ send_json(Req, 500, JsonError)
+ end;
+ {_, _} ->
+ throw({bad_request, <<"State field not `running` or `stopped`">>})
+ end;
+
+handle_reshard_req(#httpd{path_parts=[_, ?JOBS, _, ?STATE]} = Req) ->
+ send_method_not_allowed(Req, "GET,HEAD,PUT").
+
+
+reject_if_disabled() ->
+ case mem3_reshard:is_disabled() of
+ true -> throw(not_implemented);
+ false -> ok
+ end.
+
+
+validate_type(<<"split">>) ->
+ split;
+
+validate_type(_Type) ->
+ throw({bad_request, <<"Job `type` must be `split`">>}).
+
+
+validate_node(undefined) ->
+ undefined;
+
+validate_node(Node0) when is_binary(Node0) ->
+ Nodes = mem3_util:live_nodes(),
+ try binary_to_existing_atom(Node0, utf8) of
+ N1 ->
+ case lists:member(N1, Nodes) of
+ true -> N1;
+ false -> throw({bad_request, <<"Not connected to `node`">>})
+ end
+ catch
+ error:badarg ->
+ throw({bad_request, <<"`node` is not a valid node name">>})
+ end;
+
+validate_node(_Node) ->
+ throw({bad_request, <<"Invalid `node`">>}).
+
+
+validate_shard(undefined) ->
+ undefined;
+
+validate_shard(Shard) when is_binary(Shard) ->
+ case Shard of
+ <<"shards/", _:8/binary, "-", _:8/binary, "/", _/binary>> ->
+ Shard;
+ _ ->
+ throw({bad_request, <<"`shard` is invalid">>})
+ end;
+
+validate_shard(_Shard) ->
+ throw({bad_request, <<"Invalid `shard`">>}).
+
+
+validate_db(undefined) ->
+ undefined;
+
+validate_db(DbName) when is_binary(DbName) ->
+ try mem3:shards(DbName) of
+ [_ | _] -> DbName;
+ _ -> throw({bad_request, <<"No shards in `db`">>})
+ catch
+ _:_ ->
+ throw({bad_request, <<"Invalid `db`">>})
+ end;
+
+validate_db(_DbName) ->
+ throw({bad_request, <<"Invalid `db`">>}).
+
+
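+% Parse a `range` given as a "xxxxxxxx-xxxxxxxx" hex string, check its bounds,
+% and return it as a [Begin, End] list so it matches #shard ranges.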
+validate_range(undefined) ->
+ undefined;
+
+validate_range(<<BBin:8/binary, "-", EBin:8/binary>>) ->
+ {B, E} = try
+ {
+ httpd_util:hexlist_to_integer(binary_to_list(BBin)),
+ httpd_util:hexlist_to_integer(binary_to_list(EBin))
+ }
+ catch
+ _:_ ->
+ invalid_range()
+ end,
+ if
+ B < 0 -> invalid_range();
+ E < 0 -> invalid_range();
+ B > (2 bsl 31) - 1 -> invalid_range();
+ E > (2 bsl 31) - 1 -> invalid_range();
+ B >= E -> invalid_range();
+ true -> ok
+ end,
+ % Use a list format here to make it look the same as #shard's range
+ [B, E];
+
+validate_range(_Range) ->
+ invalid_range().
+
+
+invalid_range() ->
+ throw({bad_request, <<"Invalid `range`">>}).
diff --git a/src/mem3/test/mem3_reshard_api_test.erl b/src/mem3/test/mem3_reshard_api_test.erl
new file mode 100644
index 000000000..c8be28591
--- /dev/null
+++ b/src/mem3/test/mem3_reshard_api_test.erl
@@ -0,0 +1,817 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_api_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/src/mem3_reshard.hrl").
+
+
+-define(USER, "mem3_reshard_api_test_admin").
+-define(PASS, "pass").
+-define(AUTH, {basic_auth, {?USER, ?PASS}}).
+-define(JSON, {"Content-Type", "application/json"}).
+-define(RESHARD, "_reshard/").
+-define(JOBS, "_reshard/jobs/").
+-define(STATE, "_reshard/state").
+-define(ID, <<"id">>).
+-define(OK, <<"ok">>).
+
+
+setup() ->
+ Hashed = couch_passwords:hash_admin_password(?PASS),
+ ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist=false),
+ Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
+ Port = mochiweb_socket_server:get(chttpd, port),
+ Url = lists:concat(["http://", Addr, ":", Port, "/"]),
+ {Db1, Db2, Db3} = {?tempdb(), ?tempdb(), ?tempdb()},
+ create_db(Url, Db1, "?q=1&n=1"),
+ create_db(Url, Db2, "?q=1&n=1"),
+ create_db(Url, Db3, "?q=2&n=1"),
+ {Url, {Db1, Db2, Db3}}.
+
+
+teardown({Url, {Db1, Db2, Db3}}) ->
+ mem3_reshard:reset_state(),
+ application:unset_env(mem3, reshard_disabled),
+ delete_db(Url, Db1),
+ delete_db(Url, Db2),
+ delete_db(Url, Db3),
+ ok = config:delete("reshard", "max_jobs", _Persist=false),
+ ok = config:delete("admins", ?USER, _Persist=false),
+ meck:unload().
+
+
+start_couch() ->
+ test_util:start_couch([mem3, chttpd]).
+
+
+stop_couch(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+mem3_reshard_api_test_() ->
+ {
+ "mem3 shard split api tests",
+ {
+ setup,
+ fun start_couch/0, fun stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun basics/1,
+ fun create_job_basic/1,
+ fun create_two_jobs/1,
+ fun create_multiple_jobs_from_one_post/1,
+ fun start_stop_cluster_basic/1,
+ fun test_disabled/1,
+ fun start_stop_cluster_with_a_job/1,
+ fun individual_job_start_stop/1,
+ fun individual_job_stop_when_cluster_stopped/1,
+ fun create_job_with_invalid_arguments/1,
+ fun create_job_with_db/1,
+ fun create_job_with_shard_name/1,
+ fun completed_job_handling/1,
+ fun handle_db_deletion_in_initial_copy/1,
+ fun handle_db_deletion_in_topoff1/1,
+ fun handle_db_deletion_in_copy_local_docs/1,
+ fun handle_db_deletion_in_build_indices/1,
+ fun handle_db_deletion_in_update_shard_map/1,
+ fun handle_db_deletion_in_wait_source_close/1,
+ fun recover_in_initial_copy/1,
+ fun recover_in_topoff1/1,
+ fun recover_in_copy_local_docs/1,
+ fun recover_in_build_indices/1,
+ fun recover_in_update_shard_map/1,
+ fun recover_in_wait_source_close/1,
+ fun recover_in_topoff3/1,
+ fun recover_in_source_delete/1,
+ fun check_max_jobs/1,
+ fun cleanup_completed_jobs/1
+ ]
+ }
+ }
+ }.
+
+
+basics({Top, _}) ->
+ ?_test(begin
+ % GET /_reshard
+ ?assertMatch({200, #{
+ <<"state">> := <<"running">>,
+ <<"state_reason">> := null,
+ <<"completed">> := 0,
+ <<"failed">> := 0,
+ <<"running">> := 0,
+ <<"stopped">> := 0,
+ <<"total">> := 0
+ }}, req(get, Top ++ ?RESHARD)),
+
+ % GET _reshard/state
+ ?assertMatch({200, #{<<"state">> := <<"running">>}},
+ req(get, Top ++ ?STATE)),
+
+ % GET _reshard/jobs
+ ?assertMatch({200, #{
+ <<"jobs">> := [],
+ <<"offset">> := 0,
+ <<"total_rows">> := 0
+ }}, req(get, Top ++ ?JOBS)),
+
+ % Some invalid paths and methods
+ ?assertMatch({404, _}, req(get, Top ++ ?RESHARD ++ "/invalidpath")),
+ ?assertMatch({405, _}, req(put, Top ++ ?RESHARD, #{dont => thinkso})),
+ ?assertMatch({405, _}, req(post, Top ++ ?RESHARD, #{nope => nope}))
+ end).
+
+
+create_job_basic({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ % POST /_reshard/jobs
+ {C1, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
+ ?assertEqual(201, C1),
+ ?assertMatch([#{?OK := true, ?ID := J, <<"shard">> := S}]
+ when is_binary(J) andalso is_binary(S), R1),
+ [#{?ID := Id, <<"shard">> := Shard}] = R1,
+
+ % GET /_reshard/jobs
+ ?assertMatch({200, #{
+ <<"jobs">> := [#{?ID := Id, <<"type">> := <<"split">>}],
+ <<"offset">> := 0,
+ <<"total_rows">> := 1
+ }}, req(get, Top ++ ?JOBS)),
+
+ % GET /_reshard/job/$jobid
+ {C2, R2} = req(get, Top ++ ?JOBS ++ ?b2l(Id)),
+ ?assertEqual(200, C2),
+ ThisNode = atom_to_binary(node(), utf8),
+ ?assertMatch(#{?ID := Id}, R2),
+ ?assertMatch(#{<<"type">> := <<"split">>}, R2),
+ ?assertMatch(#{<<"source">> := Shard}, R2),
+ ?assertMatch(#{<<"history">> := History} when length(History) > 1, R2),
+ ?assertMatch(#{<<"node">> := ThisNode}, R2),
+ ?assertMatch(#{<<"split_state">> := SSt} when is_binary(SSt), R2),
+ ?assertMatch(#{<<"job_state">> := JSt} when is_binary(JSt), R2),
+ ?assertMatch(#{<<"state_info">> := #{}}, R2),
+ ?assertMatch(#{<<"target">> := Target} when length(Target) == 2, R2),
+
+ % GET /_reshard/job/$jobid/state
+ ?assertMatch({200, #{<<"state">> := S, <<"reason">> := R}}
+ when is_binary(S) andalso (is_binary(R) orelse R =:= null),
+ req(get, Top ++ ?JOBS ++ ?b2l(Id) ++ "/state")),
+
+ % GET /_reshard
+ ?assertMatch({200, #{<<"state">> := <<"running">>, <<"total">> := 1}},
+ req(get, Top ++ ?RESHARD)),
+
+ % DELETE /_reshard/jobs/$jobid
+ ?assertMatch({200, #{?OK := true}},
+ req(delete, Top ++ ?JOBS ++ ?b2l(Id))),
+
+ % GET _reshard/jobs
+ ?assertMatch({200, #{<<"jobs">> := [], <<"total_rows">> := 0}},
+ req(get, Top ++ ?JOBS)),
+
+ % GET /_reshard/job/$jobid should be a 404
+ ?assertMatch({404, #{}}, req(get, Top ++ ?JOBS ++ ?b2l(Id))),
+
+ % DELETE /_reshard/jobs/$jobid should be a 404 as well
+ ?assertMatch({404, #{}}, req(delete, Top ++ ?JOBS ++ ?b2l(Id)))
+ end).
+
+
+create_two_jobs({Top, {Db1, Db2, _}}) ->
+ ?_test(begin
+ Jobs = Top ++ ?JOBS,
+
+ ?assertMatch({201, [#{?OK := true}]},
+ req(post, Jobs, #{type => split, db => Db1})),
+ ?assertMatch({201, [#{?OK := true}]},
+ req(post, Jobs, #{type => split, db => Db2})),
+
+ ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD)),
+
+ ?assertMatch({200, #{
+ <<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}],
+ <<"offset">> := 0,
+ <<"total_rows">> := 2
+ }} when Id1 =/= Id2, req(get, Jobs)),
+
+ {200, #{<<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}]}} = req(get, Jobs),
+
+ {200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id1)),
+ ?assertMatch({200, #{<<"total">> := 1}}, req(get, Top ++ ?RESHARD)),
+ {200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id2)),
+ ?assertMatch({200, #{<<"total">> := 0}}, req(get, Top ++ ?RESHARD))
+ end).
+
+
+create_multiple_jobs_from_one_post({Top, {_, _, Db3}}) ->
+ ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ {C1, R1} = req(post, Jobs, #{type => split, db => Db3}),
+ ?assertMatch({201, [#{?OK := true}, #{?OK := true}]}, {C1, R1}),
+ ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD))
+ end).
+
+
+start_stop_cluster_basic({Top, _}) ->
+ ?_test(begin
+ Url = Top ++ ?STATE,
+
+ ?assertMatch({200, #{
+ <<"state">> := <<"running">>,
+ <<"reason">> := null
+ }}, req(get, Url)),
+
+ ?assertMatch({200, _}, req(put, Url, #{state => stopped})),
+ ?assertMatch({200, #{
+ <<"state">> := <<"stopped">>,
+ <<"reason">> := R
+ }} when is_binary(R), req(get, Url)),
+
+ ?assertMatch({200, _}, req(put, Url, #{state => running})),
+
+ % Make sure the reason shows in the state GET request
+ Reason = <<"somereason">>,
+ ?assertMatch({200, _}, req(put, Url, #{state => stopped,
+ reason => Reason})),
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>,
+ <<"reason">> := Reason}}, req(get, Url)),
+
+ % Top level summary also shows the reason
+ ?assertMatch({200, #{
+ <<"state">> := <<"stopped">>,
+ <<"state_reason">> := Reason
+ }}, req(get, Top ++ ?RESHARD)),
+ ?assertMatch({200, _}, req(put, Url, #{state => running})),
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, Url))
+ end).
+
+
+test_disabled({Top, _}) ->
+ ?_test(begin
+ application:set_env(mem3, reshard_disabled, true),
+ ?assertMatch({501, _}, req(get, Top ++ ?RESHARD)),
+ ?assertMatch({501, _}, req(put, Top ++ ?STATE, #{state => running})),
+
+ application:unset_env(mem3, reshard_disabled),
+ ?assertMatch({200, _}, req(get, Top ++ ?RESHARD)),
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running}))
+ end).
+
+
+start_stop_cluster_with_a_job({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ Url = Top ++ ?STATE,
+
+ ?assertMatch({200, _}, req(put, Url, #{state => stopped})),
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, Url)),
+
+ % Can add jobs with the global state stopped; they just won't be running
+ {201, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
+ ?assertMatch([#{?OK := true}], R1),
+ [#{?ID := Id1}] = R1,
+ {200, J1} = req(get, Top ++ ?JOBS ++ ?b2l(Id1)),
+ ?assertMatch(#{?ID := Id1, <<"job_state">> := <<"stopped">>}, J1),
+ % Check summary stats
+ ?assertMatch({200, #{
+ <<"state">> := <<"stopped">>,
+ <<"running">> := 0,
+ <<"stopped">> := 1,
+ <<"total">> := 1
+ }}, req(get, Top ++ ?RESHARD)),
+
+ % Can delete the job when stopped
+ {200, #{?OK := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(Id1)),
+ ?assertMatch({200, #{
+ <<"state">> := <<"stopped">>,
+ <<"running">> := 0,
+ <<"stopped">> := 0,
+ <<"total">> := 0
+ }}, req(get, Top ++ ?RESHARD)),
+
+ % Add same job again
+ {201, [#{?ID := Id2}]} = req(post, Top ++ ?JOBS, #{type => split,
+ db => Db1}),
+ ?assertMatch({200, #{?ID := Id2, <<"job_state">> := <<"stopped">>}},
+ req(get, Top ++ ?JOBS ++ ?b2l(Id2))),
+
+ % Job should start after resharding is started on the cluster
+ ?assertMatch({200, _}, req(put, Url, #{state => running})),
+ ?assertMatch({200, #{?ID := Id2, <<"job_state">> := JSt}}
+ when JSt =/= <<"stopped">>, req(get, Top ++ ?JOBS ++ ?b2l(Id2)))
+ end).
+
+
+individual_job_start_stop({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ intercept_state(topoff1),
+
+ Body = #{type => split, db => Db1},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+
+ JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+ StUrl = JobUrl ++ "/state",
+
+ % Wait for the job to start running and intercept it in the topoff1 state
+ receive {JobPid, topoff1} -> ok end,
+ % Tell the intercept to never finish checkpointing so the job is left
+ % hanging forever in the running state
+ JobPid ! cancel,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+
+ {200, _} = req(put, StUrl, #{state => stopped}),
+ wait_state(StUrl, <<"stopped">>),
+
+ % Stop/start resharding globally and job should still stay stopped
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+
+ % Start the job again
+ ?assertMatch({200, _}, req(put, StUrl, #{state => running})),
+ % Wait for the job to start running and intercept it in the topoff1 state
+ receive {JobPid2, topoff1} -> ok end,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+ % Let it continue running and it should complete eventually
+ JobPid2 ! continue,
+ wait_state(StUrl, <<"completed">>)
+ end).
+
+
+individual_job_stop_when_cluster_stopped({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ intercept_state(topoff1),
+
+ Body = #{type => split, db => Db1},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+
+ JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+ StUrl = JobUrl ++ "/state",
+
+ % Wait for the job to start running and intercept it in topoff1
+ receive {JobPid, topoff1} -> ok end,
+ % Tell the intercept to never finish checkpointing so the job is left
+ % hanging forever in the running state
+ JobPid ! cancel,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+
+ % Stop resharding globally
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ wait_state(StUrl, <<"stopped">>),
+
+ % Stop the job specifically
+ {200, _} = req(put, StUrl, #{state => stopped}),
+ % Job stays stopped
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+
+ % Set cluster to running again
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+
+ % The job should stay stopped
+ ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
+
+ % It should be possible to resume job and it should complete
+ ?assertMatch({200, _}, req(put, StUrl, #{state => running})),
+
+ % Wait for the job to start running and intercept it in the topoff1 state
+ receive {JobPid2, topoff1} -> ok end,
+ ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
+
+ % Let it continue running and it should complete eventually
+ JobPid2 ! continue,
+ wait_state(StUrl, <<"completed">>)
+ end).
+
+
+create_job_with_invalid_arguments({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ Jobs = Top ++ ?JOBS,
+
+ % Nothing in the body
+ ?assertMatch({400, _}, req(post, Jobs, #{})),
+
+ % Missing type
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1})),
+
+ % Have type but no db and no shard
+ ?assertMatch({400, _}, req(post, Jobs, #{type => split})),
+
+ % Have type and db but db is invalid
+ ?assertMatch({400, _}, req(post, Jobs, #{db => <<"baddb">>,
+ type => split})),
+
+ % Have type and shard but shard is not an existing database
+ ?assertMatch({404, _}, req(post, Jobs, #{type => split,
+ shard => <<"shards/80000000-ffffffff/baddb.1549492084">>})),
+
+ % Bad range values, too large, different types, inverted
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1, range => 42,
+ type => split})),
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
+ range => <<"x">>, type => split})),
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
+ range => <<"ffffffff-80000000">>, type => split})),
+ ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
+ range => <<"00000000-fffffffff">>, type => split})),
+
+ % Can't have both db and shard
+ ?assertMatch({400, _}, req(post, Jobs, #{type => split, db => Db1,
+ shard => <<"blah">>}))
+ end).
+
+
+create_job_with_db({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ Body1 = #{type => split, db => Db1},
+
+ % Node with db
+ N = atom_to_binary(node(), utf8),
+ {C1, R1} = req(post, Jobs, Body1#{node => N}),
+ ?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
+ wait_to_complete_then_cleanup(Top, R1),
+
+ % Range and db
+ {C2, R2} = req(post, Jobs, Body1#{range => <<"00000000-7fffffff">>}),
+ ?assertMatch({201, [#{?OK := true}]}, {C2, R2}),
+ wait_to_complete_then_cleanup(Top, R2),
+
+ % Node, range and db
+ Range = <<"80000000-ffffffff">>,
+ {C3, R3} = req(post, Jobs, Body1#{range => Range, node => N}),
+ ?assertMatch({201, [#{?OK := true}]}, {C3, R3}),
+ wait_to_complete_then_cleanup(Top, R3),
+
+ ?assertMatch([
+ [16#00000000, 16#3fffffff],
+ [16#40000000, 16#7fffffff],
+ [16#80000000, 16#bfffffff],
+ [16#c0000000, 16#ffffffff]
+ ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db1))])
+ end).
+
+
+create_job_with_shard_name({Top, {_, _, Db3}}) ->
+ ?_test(begin
+ Jobs = Top ++ ?JOBS,
+ [S1, S2] = [mem3:name(S) || S <- lists:sort(mem3:shards(Db3))],
+
+ % Shard only
+ {C1, R1} = req(post, Jobs, #{type => split, shard => S1}),
+ ?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
+ wait_to_complete_then_cleanup(Top, R1),
+
+ % Shard with a node
+ N = atom_to_binary(node(), utf8),
+ {C2, R2} = req(post, Jobs, #{type => split, shard => S2, node => N}),
+ ?assertMatch({201, [#{?OK := true}]}, {C2, R2}),
+ wait_to_complete_then_cleanup(Top, R2),
+
+ ?assertMatch([
+ [16#00000000, 16#3fffffff],
+ [16#40000000, 16#7fffffff],
+ [16#80000000, 16#bfffffff],
+ [16#c0000000, 16#ffffffff]
+ ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db3))])
+ end).
+
+
+completed_job_handling({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ Jobs = Top ++ ?JOBS,
+
+ % Run job to completion
+ {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
+ ?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
+ [#{?ID := Id}] = R1,
+ wait_to_complete(Top, R1),
+
+ % Check top level stats
+ ?assertMatch({200, #{
+ <<"state">> := <<"running">>,
+ <<"state_reason">> := null,
+ <<"completed">> := 1,
+ <<"failed">> := 0,
+ <<"running">> := 0,
+ <<"stopped">> := 0,
+ <<"total">> := 1
+ }}, req(get, Top ++ ?RESHARD)),
+
+ % Job state itself
+ JobUrl = Jobs ++ ?b2l(Id),
+ ?assertMatch({200, #{
+ <<"split_state">> := <<"completed">>,
+ <<"job_state">> := <<"completed">>
+ }}, req(get, JobUrl)),
+
+ % Job's state endpoint
+ StUrl = Jobs ++ ?b2l(Id) ++ "/state",
+ ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
+
+ % Try to stop it and it should stay completed
+ {200, _} = req(put, StUrl, #{state => stopped}),
+ ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
+
+ % Try to resume it and it should stay completed
+ {200, _} = req(put, StUrl, #{state => running}),
+ ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
+
+ % Stop resharding globally and job should still stay completed
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
+
+ % Start resharding and job stays completed
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+ ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
+
+ ?assertMatch({200, #{?OK := true}}, req(delete, JobUrl))
+ end).
+
+
+handle_db_deletion_in_topoff1({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, topoff1),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end).
+
+
+handle_db_deletion_in_initial_copy({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, initial_copy),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end).
+
+
+handle_db_deletion_in_copy_local_docs({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, copy_local_docs),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end).
+
+
+handle_db_deletion_in_build_indices({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, build_indices),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end).
+
+
+handle_db_deletion_in_update_shard_map({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, update_shardmap),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end).
+
+
+handle_db_deletion_in_wait_source_close({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = delete_source_in_state(Top, Db1, wait_source_close),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
+ end).
+
+
+recover_in_topoff1({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = recover_in_state(Top, Db1, topoff1),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end).
+
+
+recover_in_initial_copy({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = recover_in_state(Top, Db1, initial_copy),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end).
+
+
+recover_in_copy_local_docs({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = recover_in_state(Top, Db1, copy_local_docs),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end).
+
+
+recover_in_build_indices({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = recover_in_state(Top, Db1, build_indices),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end).
+
+
+recover_in_update_shard_map({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = recover_in_state(Top, Db1, update_shardmap),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end).
+
+
+recover_in_wait_source_close({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = recover_in_state(Top, Db1, wait_source_close),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end).
+
+
+recover_in_topoff3({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = recover_in_state(Top, Db1, topoff3),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end).
+
+
+recover_in_source_delete({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ JobId = recover_in_state(Top, Db1, source_delete),
+ wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
+ end).
+
+
+check_max_jobs({Top, {Db1, Db2, _}}) ->
+ ?_test(begin
+ Jobs = Top ++ ?JOBS,
+
+ config:set("reshard", "max_jobs", "0", _Persist=false),
+ {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
+ ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, {C1, R1}),
+
+ config:set("reshard", "max_jobs", "1", _Persist=false),
+ {201, R2} = req(post, Jobs, #{type => split, db => Db1}),
+ wait_to_complete(Top, R2),
+
+ % Stop resharding so that new jobs are not started anymore and ensure
+ % max_jobs is enforced even when jobs are stopped
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+
+ {C3, R3} = req(post, Jobs, #{type => split, db => Db2}),
+ ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]},
+ {C3, R3}),
+
+ % Allow the job to be created by raising max_jobs
+ config:set("reshard", "max_jobs", "2", _Persist=false),
+
+ {C4, R4} = req(post, Jobs, #{type => split, db => Db2}),
+ ?assertEqual(201, C4),
+
+ % Lower max_jobs after the job is created but while it is not running
+ config:set("reshard", "max_jobs", "1", _Persist=false),
+
+ % Start resharding again
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+
+ % Jobs that have already been created are not removed if max_jobs is
+ % lowered, so make sure the job completes
+ wait_to_complete(Top, R4)
+ end).
+
+
+cleanup_completed_jobs({Top, {Db1, _, _}}) ->
+ ?_test(begin
+ Body = #{type => split, db => Db1},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
+ wait_state(JobUrl ++ "/state", <<"completed">>),
+ delete_db(Top, Db1),
+ wait_for_http_code(JobUrl, 404)
+ end).
+
+
+% Test helper functions
+
+wait_to_complete_then_cleanup(Top, Jobs) ->
+ JobsUrl = Top ++ ?JOBS,
+ lists:foreach(fun(#{?ID := Id}) ->
+ wait_state(JobsUrl ++ ?b2l(Id) ++ "/state", <<"completed">>),
+ {200, _} = req(delete, JobsUrl ++ ?b2l(Id))
+ end, Jobs).
+
+
+wait_to_complete(Top, Jobs) ->
+ JobsUrl = Top ++ ?JOBS,
+ lists:foreach(fun(#{?ID := Id}) ->
+ wait_state(JobsUrl ++ ?b2l(Id) ++ "/state", <<"completed">>)
+ end, Jobs).
+
+
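+% Mock mem3_reshard_job:checkpoint_done/1 so that when a job reaches the given
+% split state it notifies the test process and then waits for `continue`
+% (checkpoint as usual) or `cancel` (never complete the checkpoint, leaving
+% the job running) before proceeding.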
+intercept_state(State) ->
+ TestPid = self(),
+ meck:new(mem3_reshard_job, [passthrough]),
+ meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+ case Job#job.split_state of
+ State ->
+ TestPid ! {self(), State},
+ receive
+ continue -> meck:passthrough([Job]);
+ cancel -> ok
+ end;
+ _ ->
+ meck:passthrough([Job])
+ end
+ end).
+
+
+cancel_intercept() ->
+ meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+ meck:passthrough([Job])
+ end).
+
+
+wait_state(Url, State) ->
+ test_util:wait(fun() ->
+ case req(get, Url) of
+ {200, #{<<"state">> := State}} -> ok;
+ {200, #{}} -> timer:sleep(100), wait
+ end
+ end, 30000).
+
+
+wait_for_http_code(Url, Code) when is_integer(Code) ->
+ test_util:wait(fun() ->
+ case req(get, Url) of
+ {Code, _} -> ok;
+ {_, _} -> timer:sleep(100), wait
+ end
+ end, 30000).
+
+
+delete_source_in_state(Top, Db, State) when is_atom(State), is_binary(Db) ->
+ intercept_state(State),
+ Body = #{type => split, db => Db},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ receive {JobPid, State} -> ok end,
+ sync_delete_db(Top, Db),
+ JobPid ! continue,
+ Id.
+
+
+recover_in_state(Top, Db, State) when is_atom(State) ->
+ intercept_state(State),
+ Body = #{type => split, db => Db},
+ {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
+ receive {JobPid, State} -> ok end,
+ % Job is now stuck in the running state because we prevented it from
+ % executing the given split state
+ JobPid ! cancel,
+ % Now restart resharding
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
+ cancel_intercept(),
+ ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
+ Id.
+
+
+create_db(Top, Db, QArgs) when is_binary(Db) ->
+ Url = Top ++ binary_to_list(Db) ++ QArgs,
+ {ok, Status, _, _} = test_request:put(Url, [?JSON, ?AUTH], "{}"),
+ ?assert(Status =:= 201 orelse Status =:= 202).
+
+
+delete_db(Top, Db) when is_binary(Db) ->
+ Url = Top ++ binary_to_list(Db),
+ case test_request:get(Url, [?AUTH]) of
+ {ok, 404, _, _} ->
+ not_found;
+ {ok, 200, _, _} ->
+ {ok, 200, _, _} = test_request:delete(Url, [?AUTH]),
+ ok
+ end.
+
+
+sync_delete_db(Top, Db) when is_binary(Db) ->
+ delete_db(Top, Db),
+ try
+ Shards = mem3:local_shards(Db),
+ ShardNames = [mem3:name(S) || S <- Shards],
+ [couch_server:delete(N, [?ADMIN_CTX]) || N <- ShardNames],
+ ok
+ catch
+ error:database_does_not_exist ->
+ ok
+ end.
+
+
+req(Method, Url) ->
+ Headers = [?AUTH],
+ {ok, Code, _, Res} = test_request:request(Method, Url, Headers),
+ {Code, jiffy:decode(Res, [return_maps])}.
+
+
+req(Method, Url, #{} = Body) ->
+ req(Method, Url, jiffy:encode(Body));
+
+req(Method, Url, Body) ->
+ Headers = [?JSON, ?AUTH],
+ {ok, Code, _, Res} = test_request:request(Method, Url, Headers, Body),
+ {Code, jiffy:decode(Res, [return_maps])}.
diff --git a/src/mem3/test/mem3_reshard_changes_feed_test.erl b/src/mem3/test/mem3_reshard_changes_feed_test.erl
new file mode 100644
index 000000000..52e18fb26
--- /dev/null
+++ b/src/mem3/test/mem3_reshard_changes_feed_test.erl
@@ -0,0 +1,388 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_changes_feed_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/src/mem3_reshard.hrl").
+
+
+-define(assertChanges(Expected, Received),
+ begin
+ ((fun() ->
+ ExpectedIDs = lists:sort([I || #{id := I} <- Expected]),
+ ReceivedIDs = lists:sort([I || #{id := I} <- Received]),
+ ?assertEqual(ExpectedIDs, ReceivedIDs)
+ end)())
+ end).
+
+
+setup() ->
+ Db1 = ?tempdb(),
+ create_db(Db1, [{q, 1}, {n, 1}]),
+ #{db1 => Db1}.
+
+
+teardown(#{} = Dbs) ->
+ mem3_reshard:reset_state(),
+ maps:map(fun(_, Db) -> delete_db(Db) end, Dbs).
+
+
+start_couch() ->
+ test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]).
+
+
+stop_couch(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+mem3_reshard_changes_feed_test_() ->
+ {
+ "mem3 shard split changes feed tests",
+ {
+ setup,
+ fun start_couch/0, fun stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun normal_feed_should_work_after_split/1,
+ fun continuous_feed_should_work_during_split/1
+ ]
+ }
+ }
+ }.
+
+
+normal_feed_should_work_after_split(#{db1 := Db}) ->
+ ?_test(begin
+ DocSpec = #{
+ docs => [1, 10],
+ delete => [5, 6]
+ },
+ add_test_docs(Db, DocSpec),
+
+ % gather pre-split changes
+ BaseArgs = #changes_args{feed = "normal", dir = fwd, since = 0},
+ {ok, OldChanges, OldEndSeq} = get_changes_feed(Db, BaseArgs),
+
+ % Split the shard
+ split_and_wait(Db),
+
+ % verify the changes list is consistent for all the old seqs
+ lists:foldl(fun(#{seq := Seq} = C, ExpectedChanges) ->
+ Args = BaseArgs#changes_args{since = Seq},
+ {ok, Changes, _EndSeq} = get_changes_feed(Db, Args),
+ ?assertChanges(ExpectedChanges, Changes),
+ [C | ExpectedChanges]
+ end, [], OldChanges),
+
+ % confirm that the old LastSeq is respected
+ Args1 = BaseArgs#changes_args{since = OldEndSeq},
+ {ok, Changes1, EndSeq1} = get_changes_feed(Db, Args1),
+ ?assertChanges([], Changes1),
+
+ % confirm that the new LastSeq is also respected
+ Args2 = BaseArgs#changes_args{since = EndSeq1},
+ {ok, Changes2, EndSeq2} = get_changes_feed(Db, Args2),
+ ?assertChanges([], Changes2),
+ ?assertEqual(EndSeq2, EndSeq1),
+
+ % confirm we didn't lose any changes and have a consistent last seq
+ {ok, Changes3, EndSeq3} = get_changes_feed(Db, BaseArgs),
+ ?assertChanges(OldChanges, Changes3),
+
+ % add some docs
+ add_test_docs(Db, #{docs => [11, 15]}),
+ Args4 = BaseArgs#changes_args{since = EndSeq3},
+ {ok, Changes4, EndSeq4} = get_changes_feed(Db, Args4),
+ AddedChanges = [#{id => ID} || #doc{id = ID} <- docs([11, 15])],
+ ?assertChanges(AddedChanges, Changes4),
+
+ % confirm that include_docs and deleted work
+ Args5 = BaseArgs#changes_args{include_docs = true},
+ {ok, Changes5, EndSeq5} = get_changes_feed(Db, Args5),
+ ?assertEqual(EndSeq4, EndSeq5),
+ [SampleChange] = [C || #{id := ID} = C <- Changes5, ID == <<"00005">>],
+ ?assertMatch(#{deleted := true}, SampleChange),
+ ?assertMatch(#{doc := {Body}} when is_list(Body), SampleChange),
+
+ % update and delete some pre and post split docs
+ AllDocs = [couch_doc:from_json_obj(Doc) || #{doc := Doc} <- Changes5],
+ UpdateDocs = lists:filtermap(fun
+ (#doc{id = <<"00002">>}) -> true;
+ (#doc{id = <<"00012">>}) -> true;
+ (#doc{id = <<"00004">>} = Doc) -> {true, Doc#doc{deleted = true}};
+ (#doc{id = <<"00014">>} = Doc) -> {true, Doc#doc{deleted = true}};
+ (_) -> false
+ end, AllDocs),
+ update_docs(Db, UpdateDocs),
+
+ Args6 = BaseArgs#changes_args{since = EndSeq5},
+ {ok, Changes6, EndSeq6} = get_changes_feed(Db, Args6),
+ UpdatedChanges = [#{id => ID} || #doc{id = ID} <- UpdateDocs],
+ ?assertChanges(UpdatedChanges, Changes6),
+ [#{seq := Seq6} | _] = Changes6,
+ ?assertEqual(EndSeq6, Seq6),
+
+ Args7 = Args6#changes_args{dir = rev, limit = 4},
+ {ok, Changes7, EndSeq7} = get_changes_feed(Db, Args7),
+ ?assertEqual(4, length(Changes7)),
+ [#{seq := Seq7} | _] = Changes7,
+ ?assertEqual(EndSeq7, Seq7)
+ end).
+
+
+continuous_feed_should_work_during_split(#{db1 := Db}) ->
+ ?_test(begin
+ {UpdaterPid, UpdaterRef} = spawn_monitor(fun() ->
+ Updater = fun U({State, I}) ->
+ receive
+ {get_state, {Pid, Ref}} ->
+ Pid ! {state, Ref, {State, I}},
+ U({State, I});
+ add ->
+ DocSpec = #{docs => [I, I]},
+ add_test_docs(Db, DocSpec),
+ U({State, I + 1});
+ split ->
+ spawn_monitor(fun() -> split_and_wait(Db) end),
+ U({"in_process", I});
+ stop ->
+ receive {'DOWN', _, process, _, _} -> ok end,
+ ok
+ end
+ end,
+ Updater({"before", 1})
+ end),
+
+ Callback = fun
+ (start, Acc) ->
+ {ok, Acc};
+ (waiting_for_updates, Acc) ->
+ Ref = make_ref(),
+ UpdaterPid ! {get_state, {self(), Ref}},
+ receive {state, Ref, {State, _}} -> ok end,
+ case {State, length(Acc)} of
+ {"before", N} when N < 5 ->
+ UpdaterPid ! add,
+ {ok, Acc};
+ {"before", _} ->
+ UpdaterPid ! split,
+ {ok, Acc};
+ {"in_process", N} when N < 10 ->
+ UpdaterPid ! add,
+ {ok, Acc};
+ {"in_process", _} ->
+ {ok, Acc}
+ end;
+ (timeout, Acc) ->
+ {ok, Acc};
+ ({change, {Change}}, Acc) ->
+ CM = maps:from_list(Change),
+ {ok, [CM | Acc]};
+ ({stop, EndSeq, _Pending}, Acc) ->
+ % Notice updater is still running
+ {stop, EndSeq, Acc}
+ end,
+
+ BaseArgs = #changes_args{
+ feed = "continuous",
+ heartbeat = 100,
+ timeout = 1000
+ },
+ StopResult = get_changes_feed(Db, BaseArgs, Callback),
+
+ % Changes feed stopped when source shard was deleted
+ ?assertMatch({stop, _, _}, StopResult),
+ {stop, StopEndSeq, StopChanges} = StopResult,
+
+ % Add 5 extra docs to the db right after changes feed was stopped
+ [UpdaterPid ! add || _ <- lists:seq(1, 5)],
+
+ % Get the number of documents that the updater had added
+ Ref = make_ref(),
+ UpdaterPid ! {get_state, {self(), Ref}},
+ DocCount = receive {state, Ref, {_, I}} -> I - 1 end,
+
+ UpdaterPid ! stop,
+ receive
+ {'DOWN', UpdaterRef, process, UpdaterPid, normal} ->
+ ok;
+ {'DOWN', UpdaterRef, process, UpdaterPid, Error} ->
+ erlang:error({test_context_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {value, Error},
+ {reason, "Updater died"}]})
+ end,
+
+ AfterArgs = #changes_args{feed = "normal", since = StopEndSeq},
+ {ok, AfterChanges, _} = get_changes_feed(Db, AfterArgs),
+ DocIDs = [Id || #{id := Id} <- StopChanges ++ AfterChanges],
+ ExpectedDocIDs = [doc_id(<<>>, N) || N <- lists:seq(1, DocCount)],
+ ?assertEqual(ExpectedDocIDs, lists:usort(DocIDs))
+ end).
+
+
+split_and_wait(Db) ->
+ [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+ ResultShards = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(ResultShards)).
+
+
+wait_state(JobId, State) ->
+ test_util:wait(fun() ->
+ case mem3_reshard:job(JobId) of
+ {ok, {Props}} ->
+ case couch_util:get_value(job_state, Props) of
+ State -> ok;
+ _ -> timer:sleep(100), wait
+ end;
+ {error, not_found} -> timer:sleep(100), wait
+ end
+ end, 30000).
+
+
+get_changes_feed(Db, Args) ->
+ get_changes_feed(Db, Args, fun changes_callback/2).
+
+
+get_changes_feed(Db, Args, Callback) ->
+ with_proc(fun() ->
+ fabric:changes(Db, Callback, [], Args)
+ end).
+
+
+changes_callback(start, Acc) ->
+ {ok, Acc};
+changes_callback({change, {Change}}, Acc) ->
+ CM = maps:from_list(Change),
+ {ok, [CM | Acc]};
+changes_callback({stop, EndSeq, _Pending}, Acc) ->
+ {ok, Acc, EndSeq}.
+
+
+%% common helpers from here
+
+
+create_db(DbName, Opts) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL).
+
+
+delete_db(DbName) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL).
+
+
+with_proc(Fun) ->
+ with_proc(Fun, undefined, 30000).
+
+
+with_proc(Fun, GroupLeader) ->
+ with_proc(Fun, GroupLeader, 30000).
+
+
+with_proc(Fun, GroupLeader, Timeout) ->
+ {Pid, Ref} = spawn_monitor(fun() ->
+ case GroupLeader of
+ undefined -> ok;
+ _ -> erlang:group_leader(GroupLeader, self())
+ end,
+ exit({with_proc_res, Fun()})
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, {with_proc_res, Res}} ->
+ Res;
+ {'DOWN', Ref, process, Pid, Error} ->
+ error(Error)
+ after Timeout ->
+ erlang:demonitor(Ref, [flush]),
+ exit(Pid, kill),
+ error({with_proc_timeout, Fun, Timeout})
+ end.
+
+
+add_test_docs(DbName, #{} = DocSpec) ->
+ Docs = docs(maps:get(docs, DocSpec, [])),
+ Res = update_docs(DbName, Docs),
+ Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) ->
+ Doc#doc{revs = {RevPos, [Rev]}}
+ end, lists:zip(Docs, Res)),
+ case delete_docs(maps:get(delete, DocSpec, []), Docs1) of
+ [] -> ok;
+ [_ | _] = Deleted -> update_docs(DbName, Deleted)
+ end,
+ ok.
+
+
+update_docs(DbName, Docs) ->
+ with_proc(fun() ->
+ case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
+ {accepted, Res} -> Res;
+ {ok, Res} -> Res
+ end
+ end).
+
+
+delete_docs([S, E], Docs) when E >= S ->
+ ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)],
+ lists:filtermap(fun(#doc{id = Id} = Doc) ->
+ case lists:member(Id, ToDelete) of
+ true -> {true, Doc#doc{deleted = true}};
+ false -> false
+ end
+ end, Docs);
+delete_docs(_, _) ->
+ [].
+
+
+docs([S, E]) when E >= S ->
+ [doc(<<"">>, I) || I <- lists:seq(S, E)];
+docs(_) ->
+ [].
+
+
+doc(Pref, Id) ->
+ Body = [{<<"a">>, <<"b">>}],
+ doc(Pref, Id, Body, 42).
+
+
+doc(Pref, Id, BodyProps, AttSize) ->
+ #doc{
+ id = doc_id(Pref, Id),
+ body = {BodyProps},
+ atts = atts(AttSize)
+ }.
+
+
+doc_id(Pref, Id) ->
+ IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])),
+ <<Pref/binary, IdBin/binary>>.
+
+
+atts(0) ->
+ [];
+
+atts(Size) when is_integer(Size), Size >= 1 ->
+ Data = << <<"x">> || _ <- lists:seq(1, Size) >>,
+ [couch_att:new([
+ {name, <<"att">>},
+ {type, <<"app/binary">>},
+ {att_len, Size},
+ {data, Data}
+ ])].
diff --git a/src/mem3/test/mem3_reshard_test.erl b/src/mem3/test/mem3_reshard_test.erl
new file mode 100644
index 000000000..8c4479656
--- /dev/null
+++ b/src/mem3/test/mem3_reshard_test.erl
@@ -0,0 +1,804 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/src/mem3_reshard.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl"). % for all_docs function
+
+-define(ID, <<"_id">>).
+
+setup() ->
+ HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name},
+ case HaveDreyfus of false -> ok; true ->
+ mock_dreyfus_indices()
+ end,
+
+ HaveHastings = code:lib_dir(hastings) /= {error, bad_name},
+ case HaveHastings of false -> ok; true ->
+ mock_hastings_indices()
+ end,
+ {Db1, Db2} = {?tempdb(), ?tempdb()},
+ create_db(Db1, [{q, 1}, {n, 1}]),
+ PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}],
+ create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]),
+ config:set("reshard", "retry_interval_sec", "0", _Persist=false),
+ #{db1 => Db1, db2 => Db2}.
+
+
+teardown(#{} = Dbs) ->
+ mem3_reshard:reset_state(),
+ maps:map(fun(_, Db) -> delete_db(Db) end, Dbs),
+ config:delete("reshard", "retry_interval_sec", _Persist=false),
+ meck:unload().
+
+
+start_couch() ->
+ test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]).
+
+
+stop_couch(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+mem3_reshard_db_test_() ->
+ {
+ "mem3 shard split db tests",
+ {
+ setup,
+ fun start_couch/0, fun stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun split_one_shard/1,
+ fun update_docs_before_topoff1/1,
+ fun indices_are_built/1,
+ fun split_partitioned_db/1,
+ fun split_twice/1,
+ fun couch_events_are_emitted/1,
+ fun retries_work/1,
+ fun target_reset_in_initial_copy/1,
+ fun split_an_incomplete_shard_map/1
+ ]
+ }
+ }
+ }.
+
+
+% This is a basic test to check that shard splitting preserves documents, and
+% db meta props like revs limits and security.
+split_one_shard(#{db1 := Db}) ->
+ ?_test(begin
+ DocSpec = #{docs => 10, delete => [5, 9], mrview => 1, local => 1},
+ add_test_docs(Db, DocSpec),
+
+ % Save documents before the split
+ Docs0 = get_all_docs(Db),
+ Local0 = get_local_docs(Db),
+
+ % Set some custom metadata properties
+ set_revs_limit(Db, 942),
+ set_purge_infos_limit(Db, 943),
+ SecObj = {[{<<"foo">>, <<"bar">>}]},
+ set_security(Db, SecObj),
+
+ % DbInfo is saved after setting metadata bits
+ % as those could bump the update sequence
+ DbInfo0 = get_db_info(Db),
+
+ % Split the one shard
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+
+ % Perform some basic checks that the shard was split
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ [#shard{range = R1}, #shard{range = R2}] = Shards1,
+ ?assertEqual([16#00000000, 16#7fffffff], R1),
+ ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+ % Check metadata bits after the split
+ ?assertEqual(942, get_revs_limit(Db)),
+ ?assertEqual(943, get_purge_infos_limit(Db)),
+ ?assertEqual(SecObj, get_security(Db)),
+
+ DbInfo1 = get_db_info(Db),
+ Docs1 = get_all_docs(Db),
+ Local1 = get_local_docs(Db),
+
+ % When comparing db infos, ignore the update sequences; they won't be the
+ % same since there are more shards involved after the split
+ ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+ % Update seq prefix number is a sum of all shard update sequences
+ #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0),
+ #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1),
+ ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
+
+ % Finally compare that the documents are still there after the split
+ ?assertEqual(Docs0, Docs1),
+
+ % Don't forget about the local docs, but don't include internal checkpoints
+ % as some of those are munged and transformed during the split
+ ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+ end).
+
+
+% This test checks that documents added while the shard is being split are not
+% lost. The topoff1 state happens before indices are built
+update_docs_before_topoff1(#{db1 := Db}) ->
+ ?_test(begin
+ add_test_docs(Db, #{docs => 10}),
+
+ intercept_state(topoff1),
+
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+
+ receive {JobPid, topoff1} -> ok end,
+ add_test_docs(Db, #{docs => [10, 19], local => 1}),
+ Docs0 = get_all_docs(Db),
+ Local0 = get_local_docs(Db),
+ DbInfo0 = get_db_info(Db),
+ JobPid ! continue,
+
+ wait_state(JobId, completed),
+
+ % Perform some basic checks that the shard was split
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+
+ DbInfo1 = get_db_info(Db),
+ Docs1 = get_all_docs(Db),
+ Local1 = get_local_docs(Db),
+
+ ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+ % Update sequence after initial copy with 10 docs would be 10 on each
+ % target shard (to match the source) and the total update sequence
+ % would have been 20. But then 10 more docs were added (3 might have
+ % ended up on one target and 7 on another) so the final update sequence
+ % would then be 20 + 10 = 30.
+ ?assertMatch(#{<<"update_seq">> := 30}, update_seq_to_num(DbInfo1)),
+
+ ?assertEqual(Docs0, Docs1),
+ ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+ end).
+
+
+% This tests that indices are built during shard splitting.
+indices_are_built(#{db1 := Db}) ->
+ ?_test(begin
+ HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name},
+ HaveHastings = code:lib_dir(hastings) /= {error, bad_name},
+
+ add_test_docs(Db, #{docs => 10, mrview => 2, search => 2, geo => 2}),
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ MRViewGroupInfo = get_group_info(Db, <<"_design/mrview00000">>),
+ ?assertMatch(#{<<"update_seq">> := 32}, MRViewGroupInfo),
+
+ case HaveDreyfus of false -> ok; true ->
+ % 4 because there are 2 indices and 2 target shards
+ ?assertEqual(4, meck:num_calls(dreyfus_index, await, 2))
+ end,
+
+ case HaveHastings of false -> ok; true ->
+ % 4 because there are 2 indices and 2 target shards
+ ?assertEqual(4, meck:num_calls(hastings_index, await, 2))
+ end
+ end).
+
+
+mock_dreyfus_indices() ->
+ meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) ->
+ #doc{body = {BodyProps}} = Doc,
+ case couch_util:get_value(<<"indexes">>, BodyProps) of
+ undefined ->
+ [];
+ {[_]} ->
+ [{dreyfus, <<"db">>, dreyfus_index1}]
+ end
+ end),
+ meck:expect(dreyfus_index_manager, get_index, fun(_, _) -> {ok, pid} end),
+ meck:expect(dreyfus_index, await, fun(_, _) -> ok end).
+
+
+mock_hastings_indices() ->
+ meck:expect(hastings_index, design_doc_to_indexes, fun(Doc) ->
+ #doc{body = {BodyProps}} = Doc,
+ case couch_util:get_value(<<"st_indexes">>, BodyProps) of
+ undefined ->
+ [];
+ {[_]} ->
+ [{hastings, <<"db">>, hastings_index1}]
+ end
+ end),
+ meck:expect(hastings_index_manager, get_index, fun(_, _) -> {ok, pid} end),
+ meck:expect(hastings_index, await, fun(_, _) -> ok end).
+
+% Split partitioned database
+split_partitioned_db(#{db2 := Db}) ->
+ ?_test(begin
+ DocSpec = #{
+ pdocs => #{
+ <<"PX">> => 5,
+ <<"PY">> => 5
+ },
+ mrview => 1,
+ local => 1
+ },
+ add_test_docs(Db, DocSpec),
+
+ % Save documents before the split
+ Docs0 = get_all_docs(Db),
+ Local0 = get_local_docs(Db),
+
+ % Set some custom metadata properties
+ set_revs_limit(Db, 942),
+ set_purge_infos_limit(Db, 943),
+ SecObj = {[{<<"foo">>, <<"bar">>}]},
+ set_security(Db, SecObj),
+
+ % DbInfo is saved after setting metadata bits
+ % as those could bump the update sequence
+ DbInfo0 = get_db_info(Db),
+ PX0 = get_partition_info(Db, <<"PX">>),
+ PY0 = get_partition_info(Db, <<"PY">>),
+
+ % Split the one shard
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+
+ % Perform some basic checks that the shard was split
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ [#shard{range = R1}, #shard{range = R2}] = Shards1,
+ ?assertEqual([16#00000000, 16#7fffffff], R1),
+ ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+ % Check metadata bits after the split
+ ?assertEqual(942, get_revs_limit(Db)),
+ ?assertEqual(943, get_purge_infos_limit(Db)),
+ ?assertEqual(SecObj, get_security(Db)),
+
+ DbInfo1 = get_db_info(Db),
+ Docs1 = get_all_docs(Db),
+ Local1 = get_local_docs(Db),
+
+ % When comparing db infos, ignore the update sequences; they won't be the
+ % same since there are more shards involved after the split
+ ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+ % Update seq prefix number is a sum of all shard update sequences
+ #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0),
+ #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1),
+ ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
+
+ % Finally compare that documents are still there after the split
+ ?assertEqual(Docs0, Docs1),
+
+ ?assertEqual(PX0, get_partition_info(Db, <<"PX">>)),
+ ?assertEqual(PY0, get_partition_info(Db, <<"PY">>)),
+
+ % Don't forget about the local docs, but don't include internal checkpoints
+ % as some of those are munged and transformed during the split
+ ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+ end).
+
+
+% Make sure a shard can be split again after it was split once. This guards
+% against documents ending up in the wrong range, such that on the next split
+% they'd fail to fit into any of the new target ranges.
+split_twice(#{db1 := Db}) ->
+ ?_test(begin
+ DocSpec = #{docs => 100, delete => [80, 99], mrview => 2, local => 100},
+ add_test_docs(Db, DocSpec),
+
+ % Save documents before the split
+ Docs0 = get_all_docs(Db),
+ Local0 = get_local_docs(Db),
+
+ % Set some custom metadata properties
+ set_revs_limit(Db, 942),
+ set_purge_infos_limit(Db, 943),
+ SecObj = {[{<<"foo">>, <<"bar">>}]},
+ set_security(Db, SecObj),
+
+ % DbInfo is saved after setting metadata bits
+ % as those could bump the update sequence
+ DbInfo0 = get_db_info(Db),
+
+ % Split the one shard
+ [#shard{name=Shard1}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId1} = mem3_reshard:start_split_job(Shard1),
+ wait_state(JobId1, completed),
+
+ % Perform some basic checks that the shard was split
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ [#shard{range = R1}, #shard{range = R2}] = Shards1,
+ ?assertEqual([16#00000000, 16#7fffffff], R1),
+ ?assertEqual([16#80000000, 16#ffffffff], R2),
+
+ % Check metadata bits after the split
+ ?assertEqual(942, get_revs_limit(Db)),
+ ?assertEqual(943, get_purge_infos_limit(Db)),
+ ?assertEqual(SecObj, get_security(Db)),
+
+ DbInfo1 = get_db_info(Db),
+ Docs1 = get_all_docs(Db),
+ Local1 = get_local_docs(Db),
+
+ % When comparing db infos, ignore the update sequences; they won't be the
+ % same since there are more shards involved after the split
+ ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)),
+
+ % Update seq prefix number is a sum of all shard update sequences
+ #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0),
+ #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1),
+ ?assertEqual(UpdateSeq0 * 2, UpdateSeq1),
+
+ ?assertEqual(Docs0, Docs1),
+ ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)),
+
+ % Split the first range again
+ [#shard{name=Shard2}, _] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId2} = mem3_reshard:start_split_job(Shard2),
+ wait_state(JobId2, completed),
+
+ Shards2 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(3, length(Shards2)),
+ [R3, R4, R5] = [R || #shard{range = R} <- Shards2],
+ ?assertEqual([16#00000000, 16#3fffffff], R3),
+ ?assertEqual([16#40000000, 16#7fffffff], R4),
+ ?assertEqual([16#80000000, 16#ffffffff], R5),
+
+ % Check metadata bits after the second split
+ ?assertEqual(942, get_revs_limit(Db)),
+ ?assertEqual(943, get_purge_infos_limit(Db)),
+ ?assertEqual(SecObj, get_security(Db)),
+
+ DbInfo2 = get_db_info(Db),
+ Docs2 = get_all_docs(Db),
+ Local2 = get_local_docs(Db),
+
+ ?assertEqual(without_seqs(DbInfo1), without_seqs(DbInfo2)),
+ % Update seq prefix number is a sum of all shard update sequences
+ % But only 1 shard out of 2 was split
+ #{<<"update_seq">> := UpdateSeq2} = update_seq_to_num(DbInfo2),
+ ?assertEqual(trunc(UpdateSeq1 * 1.5), UpdateSeq2),
+ ?assertEqual(Docs1, Docs2),
+ ?assertEqual(without_meta_locals(Local1), without_meta_locals(Local2))
+ end).
+
+
+couch_events_are_emitted(#{db1 := Db}) ->
+ ?_test(begin
+ couch_event:register_all(self()),
+
+ % Split the one shard
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+ wait_state(JobId, completed),
+
+ % Perform some basic checks that the shard was split
+ Shards1 = lists:sort(mem3:local_shards(Db)),
+ ?assertEqual(2, length(Shards1)),
+ [#shard{range = R1}, #shard{range = R2}] = Shards1,
+ ?assertEqual([16#00000000, 16#7fffffff], R1),
+ ?assertEqual([16#80000000, 16#ffffffff], R2),
+
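+ % Drain all queued couch_event notifications, keeping only the deleted and
+ % updated events that belong to this db's shards, in the order received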
+ Flush = fun F(Events) ->
+ receive
+ {'$couch_event', DbName, Event} when Event =:= deleted
+ orelse Event =:= updated ->
+ case binary:match(DbName, Db) of
+ nomatch -> F(Events);
+ {_, _} -> F([Event | Events])
+ end
+ after 0 ->
+ lists:reverse(Events)
+ end
+ end,
+ Events = Flush([]),
+ StartAtDeleted = lists:dropwhile(fun(E) -> E =/= deleted end, Events),
+ ?assertMatch([deleted, deleted, updated, updated | _], StartAtDeleted),
+ couch_event:unregister(self())
+ end).
+
+
+retries_work(#{db1 := Db}) ->
+ ?_test(begin
+ meck:expect(couch_db_split, split, fun(_, _, _) ->
+ error(kapow)
+ end),
+
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ {ok, JobId} = mem3_reshard:start_split_job(Shard),
+
+ wait_state(JobId, failed),
+ ?assertEqual(3, meck:num_calls(couch_db_split, split, 3))
+ end).
+
+
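+% If a job restarts in the initial_copy state and the target shards already
+% exist, the existing targets should be reset (cleaned up) before copying.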
+target_reset_in_initial_copy(#{db1 := Db}) ->
+ ?_test(begin
+ [#shard{} = Src] = lists:sort(mem3:local_shards(Db)),
+ Job = #job{
+ source = Src,
+ target = [#shard{name= <<"t1">>}, #shard{name = <<"t2">>}],
+ job_state = running,
+ split_state = initial_copy
+ },
+ BogusParent = spawn(fun() -> receive {ack, _, _} -> ok end end),
+ put('$ancestors', [BogusParent]), % make proc_lib:ack not blow up
+ meck:expect(mem3_reshard, checkpoint, 2, ok),
+ meck:expect(couch_db_split, cleanup_target, 2, ok),
+ meck:expect(couch_server, exists, fun
+ (<<"t1">>) -> true;
+ (<<"t2">>) -> true;
+ (DbName) -> meck:passthrough([DbName])
+ end),
+ JobPid = spawn(fun() -> mem3_reshard_job:init(Job) end),
+ meck:wait(2, couch_db_split, cleanup_target, ['_', '_'], 5000),
+ exit(JobPid, kill),
+ exit(BogusParent, kill),
+ ?assertEqual(2, meck:num_calls(couch_db_split, cleanup_target, 2))
+ end).
+
+
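+% Here mem3_util:calculate_max_n/1 is mocked to return 0, which should make
+% the job request fail with a not_enough_shard_copies error.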
+split_an_incomplete_shard_map(#{db1 := Db}) ->
+ ?_test(begin
+ [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)),
+ meck:expect(mem3_util, calculate_max_n, 1, 0),
+ ?assertMatch({error, {not_enough_shard_copies, _}},
+ mem3_reshard:start_split_job(Shard))
+ end).
+
+
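+% Mock mem3_reshard_job:checkpoint_done/1 so that when the job reaches the
+% given split state it notifies the test process and pauses until it receives
+% a `continue` (or `cancel`) message.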
+intercept_state(State) ->
+ TestPid = self(),
+ meck:new(mem3_reshard_job, [passthrough]),
+ meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
+ case Job#job.split_state of
+ State ->
+ TestPid ! {self(), State},
+ receive
+ continue -> meck:passthrough([Job]);
+ cancel -> ok
+ end;
+ _ ->
+ meck:passthrough([Job])
+ end
+ end).
+
+
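+% Poll the job until it reaches the expected job_state, waiting up to 30
+% seconds.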
+wait_state(JobId, State) ->
+ test_util:wait(fun() ->
+ case mem3_reshard:job(JobId) of
+ {ok, {Props}} ->
+ case couch_util:get_value(job_state, Props) of
+ State -> ok;
+ _ -> timer:sleep(100), wait
+ end;
+ {error, not_found} -> timer:sleep(100), wait
+ end
+ end, 30000).
+
+
+set_revs_limit(DbName, Limit) ->
+ with_proc(fun() -> fabric:set_revs_limit(DbName, Limit, [?ADMIN_CTX]) end).
+
+
+get_revs_limit(DbName) ->
+ with_proc(fun() -> fabric:get_revs_limit(DbName) end).
+
+
+get_purge_infos_limit(DbName) ->
+ with_proc(fun() -> fabric:get_purge_infos_limit(DbName) end).
+
+
+set_purge_infos_limit(DbName, Limit) ->
+ with_proc(fun() ->
+ fabric:set_purge_infos_limit(DbName, Limit, [?ADMIN_CTX])
+ end).
+
+
+set_security(DbName, SecObj) ->
+ with_proc(fun() -> fabric:set_security(DbName, SecObj) end).
+
+
+get_security(DbName) ->
+ with_proc(fun() -> fabric:get_security(DbName, [?ADMIN_CTX]) end).
+
+
+get_db_info(DbName) ->
+ with_proc(fun() ->
+ {ok, Info} = fabric:get_db_info(DbName),
+ maps:with([
+ <<"db_name">>, <<"doc_count">>, <<"props">>, <<"doc_del_count">>,
+ <<"update_seq">>, <<"purge_seq">>, <<"disk_format_version">>
+ ], to_map(Info))
+ end).
+
+
+get_group_info(DbName, DesignId) ->
+ with_proc(fun() ->
+ {ok, GInfo} = fabric:get_view_group_info(DbName, DesignId),
+ maps:with([
+ <<"language">>, <<"purge_seq">>, <<"signature">>, <<"update_seq">>
+ ], to_map(GInfo))
+ end).
+
+
+get_partition_info(DbName, Partition) ->
+ with_proc(fun() ->
+ {ok, PInfo} = fabric:get_partition_info(DbName, Partition),
+ maps:with([
+ <<"db_name">>, <<"doc_count">>, <<"doc_del_count">>, <<"partition">>
+ ], to_map(PInfo))
+ end).
+
+
+get_all_docs(DbName) ->
+ get_all_docs(DbName, #mrargs{}).
+
+
+get_all_docs(DbName, #mrargs{} = QArgs0) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() ->
+ Cb = fun
+ ({row, Props}, Acc) ->
+ Doc = to_map(couch_util:get_value(doc, Props)),
+ #{?ID := Id} = Doc,
+ {ok, Acc#{Id => Doc}};
+ ({meta, _}, Acc) -> {ok, Acc};
+ (complete, Acc) -> {ok, Acc}
+ end,
+ QArgs = QArgs0#mrargs{include_docs = true},
+ {ok, Docs} = fabric:all_docs(DbName, Cb, #{}, QArgs),
+ Docs
+ end, GL).
+
+
+get_local_docs(DbName) ->
+ LocalNS = {namespace, <<"_local">>},
+ maps:map(fun(_, Doc) ->
+ maps:without([<<"_rev">>], Doc)
+ end, get_all_docs(DbName, #mrargs{extra = [LocalNS]})).
+
+
+without_seqs(#{} = InfoMap) ->
+ maps:without([<<"update_seq">>, <<"purge_seq">>], InfoMap).
+
+
+without_meta_locals(#{} = Local) ->
+ maps:filter(fun
+ (<<"_local/purge-mrview-", _/binary>>, _) -> false;
+ (<<"_local/shard-sync-", _/binary>>, _) -> false;
+ (_, _) -> true
+ end, Local).
+
+
+update_seq_to_num(#{} = InfoMap) ->
+ maps:map(fun
+ (<<"update_seq">>, Seq) -> seq_to_num(Seq);
+ (<<"purge_seq">>, PSeq) -> seq_to_num(PSeq);
+ (_, V) -> V
+ end, InfoMap).
+
+
+seq_to_num(Seq) ->
+ [SeqNum, _] = binary:split(Seq, <<"-">>),
+ binary_to_integer(SeqNum).
+
+
+to_map([_ | _] = Props) ->
+ to_map({Props});
+
+to_map({[_ | _]} = EJson) ->
+ jiffy:decode(jiffy:encode(EJson), [return_maps]).
+
+
+create_db(DbName, Opts) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL).
+
+
+delete_db(DbName) ->
+ GL = erlang:group_leader(),
+ with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL).
+
+
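+% Run Fun in a separate, monitored process (optionally with a given group
+% leader) so fabric calls execute in their own process context. The result,
+% error or timeout is relayed back to the caller.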
+with_proc(Fun) ->
+ with_proc(Fun, undefined, 30000).
+
+
+with_proc(Fun, GroupLeader) ->
+ with_proc(Fun, GroupLeader, 30000).
+
+
+with_proc(Fun, GroupLeader, Timeout) ->
+ {Pid, Ref} = spawn_monitor(fun() ->
+ case GroupLeader of
+ undefined -> ok;
+ _ -> erlang:group_leader(GroupLeader, self())
+ end,
+ exit({with_proc_res, Fun()})
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, {with_proc_res, Res}} ->
+ Res;
+ {'DOWN', Ref, process, Pid, Error} ->
+ error(Error)
+ after Timeout ->
+ erlang:demonitor(Ref, [flush]),
+ exit(Pid, kill),
+ error({with_proc_timeout, Fun, Timeout})
+ end.
+
+
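+% DocSpec is a map describing which test docs to add: `docs`, `mrview`,
+% `search`, `geo` and `local` take an integer count or an [S, E] id range,
+% `pdocs` maps partition names to doc specs, and `delete` is an [S, E] range
+% of regular docs to delete right after they are added.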
+add_test_docs(DbName, #{} = DocSpec) ->
+ Docs = docs(maps:get(docs, DocSpec, []))
+ ++ pdocs(maps:get(pdocs, DocSpec, #{}))
+ ++ ddocs(mrview, maps:get(mrview, DocSpec, []))
+ ++ ddocs(search, maps:get(search, DocSpec, []))
+ ++ ddocs(geo, maps:get(geo, DocSpec, []))
+ ++ ldocs(maps:get(local, DocSpec, [])),
+ Res = update_docs(DbName, Docs),
+ Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) ->
+ Doc#doc{revs = {RevPos, [Rev]}}
+ end, lists:zip(Docs, Res)),
+ case delete_docs(maps:get(delete, DocSpec, []), Docs1) of
+ [] -> ok;
+ [_ | _] = Deleted -> update_docs(DbName, Deleted)
+ end,
+ ok.
+
+
+update_docs(DbName, Docs) ->
+ with_proc(fun() ->
+ case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of
+ {accepted, Res} -> Res;
+ {ok, Res} -> Res
+ end
+ end).
+
+
+delete_docs([S, E], Docs) when E >= S ->
+ ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)],
+ lists:filtermap(fun(#doc{id = Id} = Doc) ->
+ case lists:member(Id, ToDelete) of
+ true -> {true, Doc#doc{deleted = true}};
+ false -> false
+ end
+ end, Docs);
+delete_docs(_, _) ->
+ [].
+
+
+pdocs(#{} = PMap) ->
+ maps:fold(fun(Part, DocSpec, DocsAcc) ->
+ docs(DocSpec, <<Part/binary, ":">>) ++ DocsAcc
+ end, [], PMap).
+
+
+docs(DocSpec) ->
+ docs(DocSpec, <<"">>).
+
+
+docs(N, Prefix) when is_integer(N), N > 0 ->
+ docs([0, N - 1], Prefix);
+docs([S, E], Prefix) when E >= S ->
+ [doc(Prefix, I) || I <- lists:seq(S, E)];
+docs(_, _) ->
+ [].
+
+ddocs(Type, N) when is_integer(N), N > 0 ->
+ ddocs(Type, [0, N - 1]);
+ddocs(Type, [S, E]) when E >= S ->
+ Body = ddprop(Type),
+ BType = atom_to_binary(Type, utf8),
+ [doc(<<"_design/", BType/binary>>, I, Body, 0) || I <- lists:seq(S, E)];
+ddocs(_, _) ->
+ [].
+
+
+ldocs(N) when is_integer(N), N > 0 ->
+ ldocs([0, N - 1]);
+ldocs([S, E]) when E >= S ->
+ [doc(<<"_local/">>, I, bodyprops(), 0) || I <- lists:seq(S, E)];
+ldocs(_) ->
+ [].
+
+
+doc(Pref, Id) ->
+ Body = bodyprops(),
+ doc(Pref, Id, Body, 42).
+
+
+doc(Pref, Id, BodyProps, AttSize) ->
+ #doc{
+ id = doc_id(Pref, Id),
+ body = {BodyProps},
+ atts = atts(AttSize)
+ }.
+
+
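+% Build a doc id from a prefix and a zero-padded 5 digit integer, for example
+% doc_id(<<"_local/">>, 7) produces <<"_local/00007">>.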
+doc_id(Pref, Id) ->
+ IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])),
+ <<Pref/binary, IdBin/binary>>.
+
+
+ddprop(mrview) ->
+ [
+ {<<"views">>, {[
+ {<<"v1">>, {[
+ {<<"map">>, <<"function(d){emit(d);}">>}
+ ]}}
+ ]}}
+ ];
+
+ddprop(geo) ->
+ [
+ {<<"st_indexes">>, {[
+ {<<"area">>, {[
+ {<<"analyzer">>, <<"standard">>},
+ {<<"index">>, <<"function(d){if(d.g){st_index(d.g)}}">> }
+ ]}}
+ ]}}
+ ];
+
+ddprop(search) ->
+ [
+ {<<"indexes">>, {[
+ {<<"types">>, {[
+ {<<"index">>, <<"function(d){if(d.g){st_index(d.g.type)}}">>}
+ ]}}
+ ]}}
+ ].
+
+
+bodyprops() ->
+ [
+ {<<"g">>, {[
+ {<<"type">>, <<"Polygon">>},
+ {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]}
+ ]}}
+ ].
+
+
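+% Return a list with a single Size-byte binary attachment (all "x" bytes),
+% or no attachments when Size is 0.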
+atts(0) ->
+ [];
+
+atts(Size) when is_integer(Size), Size >= 1 ->
+ Data = << <<"x">> || _ <- lists:seq(1, Size) >>,
+ [couch_att:new([
+ {name, <<"att">>},
+ {type, <<"app/binary">>},
+ {att_len, Size},
+ {data, Data}
+ ])].
diff --git a/test/elixir/lib/couch/db_test.ex b/test/elixir/lib/couch/db_test.ex
index 7a08aae36..d88478a8f 100644
--- a/test/elixir/lib/couch/db_test.ex
+++ b/test/elixir/lib/couch/db_test.ex
@@ -171,8 +171,7 @@ defmodule Couch.DBTest do
def delete_db(db_name) do
resp = Couch.delete("/#{db_name}")
- assert resp.status_code in [200, 202]
- assert resp.body == %{"ok" => true}
+ assert resp.status_code in [200, 202, 404]
{:ok, resp}
end
diff --git a/test/elixir/test/reshard_all_docs_test.exs b/test/elixir/test/reshard_all_docs_test.exs
new file mode 100644
index 000000000..62b6e372c
--- /dev/null
+++ b/test/elixir/test/reshard_all_docs_test.exs
@@ -0,0 +1,79 @@
+defmodule ReshardAllDocsTest do
+ use CouchTestCase
+ import ReshardHelpers
+
+ @moduledoc """
+ Test _all_docs interaction with resharding
+ """
+
+ setup do
+ db = random_db_name()
+ {:ok, _} = create_db(db, query: %{q: 2})
+
+ on_exit(fn ->
+ reset_reshard_state()
+ delete_db(db)
+ end)
+
+ {:ok, [db: db]}
+ end
+
+ test "all_docs after splitting all shards on node1", context do
+ db = context[:db]
+ node1 = get_first_node()
+ docs = add_docs(1..100, db)
+
+ before_split_all_docs = all_docs(db)
+ assert docs == before_split_all_docs
+
+ resp = post_job_node(db, node1)
+ assert resp.status_code == 201
+ jobid = hd(resp.body)["id"]
+ wait_job_completed(jobid)
+
+ assert before_split_all_docs == all_docs(db)
+
+ assert remove_job(jobid).status_code == 200
+ end
+
+ test "all_docs after splitting the same range on all nodes", context do
+ db = context[:db]
+ docs = add_docs(1..100, db)
+
+ before_split_all_docs = all_docs(db)
+ assert docs == before_split_all_docs
+
+ resp = post_job_range(db, "00000000-7fffffff")
+ assert resp.status_code == 201
+
+ resp.body
+ |> Enum.map(fn j -> j["id"] end)
+ |> Enum.each(fn id -> wait_job_completed(id) end)
+
+ assert before_split_all_docs == all_docs(db)
+
+ get_jobs()
+ |> Enum.map(fn j -> j["id"] end)
+ |> Enum.each(fn id -> remove_job(id) end)
+ end
+
+ defp add_docs(range, db) do
+ docs = create_docs(range)
+ w3 = %{:w => 3}
+ resp = Couch.post("/#{db}/_bulk_docs", body: %{docs: docs}, query: w3)
+ assert resp.status_code == 201
+ assert length(resp.body) == length(docs)
+
+ docs
+ |> rev(resp.body)
+ |> Enum.into(%{}, fn %{:_id => id, :_rev => rev} -> {id, rev} end)
+ end
+
+ defp all_docs(db, query \\ %{}) do
+ resp = Couch.get("/#{db}/_all_docs", query: query)
+ assert resp.status_code == 200
+
+ resp.body["rows"]
+ |> Enum.into(%{}, fn %{"id" => id, "value" => v} -> {id, v["rev"]} end)
+ end
+end
diff --git a/test/elixir/test/reshard_basic_test.exs b/test/elixir/test/reshard_basic_test.exs
new file mode 100644
index 000000000..211dd6bf7
--- /dev/null
+++ b/test/elixir/test/reshard_basic_test.exs
@@ -0,0 +1,174 @@
+defmodule ReshardBasicTest do
+ use CouchTestCase
+ import ReshardHelpers
+
+ @moduledoc """
+ Test resharding basic functionality
+ """
+
+ setup_all do
+ db1 = random_db_name()
+ {:ok, _} = create_db(db1, query: %{q: 1})
+ db2 = random_db_name()
+ {:ok, _} = create_db(db2, query: %{q: 2})
+
+ on_exit(fn ->
+ reset_reshard_state()
+ delete_db(db1)
+ delete_db(db2)
+ end)
+
+ {:ok, [db1: db1, db2: db2]}
+ end
+
+ test "basic api querying, no jobs present" do
+ summary = get_summary()
+ assert summary["state"] == "running"
+ assert summary["state_reason"] == :null
+ assert summary["total"] == 0
+ assert summary["completed"] == 0
+ assert summary["failed"] == 0
+ assert summary["stopped"] == 0
+ assert get_state() == %{"state" => "running", "reason" => :null}
+ assert get_jobs() == []
+ end
+
+ test "check validation of invalid parameters", context do
+ db1 = context[:db1]
+ node1 = get_first_node()
+
+ resp = post_job_node(db1, "badnode")
+ assert resp.status_code == 400
+
+ resp = post_job_node("badresharddb", node1)
+ assert resp.status_code == 400
+
+ resp = post_job_db("badresharddb")
+ assert resp.status_code == 400
+
+ resp = post_job_range("badresharddb", "randomgarbage")
+ assert resp.status_code == 400
+
+ resp = get_job("badjobid")
+ assert resp.status_code == 404
+
+ resp = remove_job("badjobid")
+ assert resp.status_code == 404
+ end
+
+ test "toggle global state" do
+ assert get_state() == %{"state" => "running", "reason" => :null}
+ put_state_stopped("xyz")
+ assert get_state() == %{"state" => "stopped", "reason" => "xyz"}
+ put_state_running()
+ assert get_state() == %{"state" => "running", "reason" => :null}
+ end
+
+ test "split q=1 db shards on node1 (1 job)", context do
+ db = context[:db1]
+ node1 = get_first_node()
+
+ resp = post_job_node(db, node1)
+ assert resp.status_code == 201
+
+ body = resp.body
+ assert is_list(body)
+ assert length(body) == 1
+
+ [job] = body
+ id = job["id"]
+ assert is_binary(id)
+ node = job["node"]
+ assert is_binary(node)
+ assert node == node1
+ assert job["ok"] == true
+ shard = job["shard"]
+ assert is_binary(shard)
+
+ resp = get_job(id)
+ assert resp.status_code == 200
+
+ body = resp.body
+ assert body["type"] == "split"
+ assert body["id"] == id
+ assert body["source"] == shard
+ assert is_list(body["history"])
+ assert body["job_state"] in ["new", "running", "completed"]
+ assert is_list(body["target"])
+ assert length(body["target"]) == 2
+
+ wait_job_completed(id)
+
+ resp = get_job(id)
+ assert resp.status_code == 200
+
+ body = resp.body
+ assert body["job_state"] == "completed"
+ assert body["split_state"] == "completed"
+
+ resp = Couch.get("/#{db}/_shards")
+ assert resp.status_code == 200
+ shards = resp.body["shards"]
+ assert node1 not in shards["00000000-ffffffff"]
+ assert shards["00000000-7fffffff"] == [node1]
+ assert shards["80000000-ffffffff"] == [node1]
+
+ summary = get_summary()
+ assert summary["total"] == 1
+ assert summary["completed"] == 1
+
+ resp = remove_job(id)
+ assert resp.status_code == 200
+
+ assert get_jobs() == []
+
+ summary = get_summary()
+ assert summary["total"] == 0
+ assert summary["completed"] == 0
+ end
+
+ test "split q=2 shards on node1 (2 jobs)", context do
+ db = context[:db2]
+ node1 = get_first_node()
+
+ resp = post_job_node(db, node1)
+ assert resp.status_code == 201
+
+ body = resp.body
+ assert is_list(body)
+ assert length(body) == 2
+
+ [job1, job2] = Enum.sort(body)
+ {id1, id2} = {job1["id"], job2["id"]}
+
+ assert get_job(id1).body["id"] == id1
+ assert get_job(id2).body["id"] == id2
+
+ summary = get_summary()
+ assert summary["total"] == 2
+
+ wait_job_completed(id1)
+ wait_job_completed(id2)
+
+ summary = get_summary()
+ assert summary["completed"] == 2
+
+ resp = Couch.get("/#{db}/_shards")
+ assert resp.status_code == 200
+ shards = resp.body["shards"]
+ assert node1 not in shards["00000000-7fffffff"]
+ assert node1 not in shards["80000000-ffffffff"]
+ assert shards["00000000-3fffffff"] == [node1]
+ assert shards["40000000-7fffffff"] == [node1]
+ assert shards["80000000-bfffffff"] == [node1]
+ assert shards["c0000000-ffffffff"] == [node1]
+
+ # deleting the source db should remove the jobs
+ delete_db(db)
+ wait_job_removed(id1)
+ wait_job_removed(id2)
+
+ summary = get_summary()
+ assert summary["total"] == 0
+ end
+end
diff --git a/test/elixir/test/reshard_changes_feed.exs b/test/elixir/test/reshard_changes_feed.exs
new file mode 100644
index 000000000..a4a39fec1
--- /dev/null
+++ b/test/elixir/test/reshard_changes_feed.exs
@@ -0,0 +1,81 @@
+defmodule ReshardChangesFeedTest do
+ use CouchTestCase
+ import ReshardHelpers
+
+ @moduledoc """
+ Test _changes interaction with resharding
+ """
+
+ setup do
+ db = random_db_name()
+ {:ok, _} = create_db(db, query: %{q: 2})
+
+ on_exit(fn ->
+ reset_reshard_state()
+ delete_db(db)
+ end)
+
+ {:ok, [db: db]}
+ end
+
+ test "all_docs after splitting all shards on node1", context do
+ db = context[:db]
+ add_docs(1..3, db)
+
+ all_before = changes(db)
+ first_seq = hd(all_before["results"])["seq"]
+ last_seq = all_before["last_seq"]
+ since_1_before = docset(changes(db, %{:since => first_seq}))
+ since_last_before = docset(changes(db, %{:since => last_seq}))
+
+ resp = post_job_range(db, "00000000-7fffffff")
+ assert resp.status_code == 201
+
+ resp.body
+ |> Enum.map(fn j -> j["id"] end)
+ |> Enum.each(fn id -> wait_job_completed(id) end)
+
+ all_after = changes(db)
+ since_1_after = docset(changes(db, %{:since => first_seq}))
+ since_last_after = docset(changes(db, %{:since => last_seq}))
+
+ assert docset(all_before) == docset(all_after)
+ assert MapSet.subset?(since_1_before, since_1_after)
+ assert MapSet.subset?(since_last_before, since_last_after)
+
+ get_jobs()
+ |> Enum.map(fn j -> j["id"] end)
+ |> Enum.each(fn id -> remove_job(id) end)
+ end
+
+ defp docset(changes) do
+ changes["results"]
+ |> Enum.map(fn %{"id" => id} -> id end)
+ |> MapSet.new()
+ end
+
+ defp changes(db, query \\ %{}) do
+ resp = Couch.get("/#{db}/_changes", query: query)
+ assert resp.status_code == 200
+ resp.body
+ end
+
+ defp add_docs(range, db) do
+ docs = create_docs(range)
+ w3 = %{:w => 3}
+ resp = Couch.post("/#{db}/_bulk_docs", body: %{docs: docs}, query: w3)
+ assert resp.status_code == 201
+ assert length(resp.body) == length(docs)
+
+ docs
+ |> rev(resp.body)
+ |> Enum.into(%{}, fn %{:_id => id, :_rev => rev} -> {id, rev} end)
+ end
+
+ # (Keep for debugging)
+ # defp unpack_seq(seq) when is_binary(seq) do
+ # [_, opaque] = String.split(seq, "-")
+ # {:ok, binblob} = Base.url_decode64(opaque, padding: false)
+ # :erlang.binary_to_term(binblob)
+ # end
+end
diff --git a/test/elixir/test/reshard_helpers.exs b/test/elixir/test/reshard_helpers.exs
new file mode 100644
index 000000000..c67e6902e
--- /dev/null
+++ b/test/elixir/test/reshard_helpers.exs
@@ -0,0 +1,111 @@
+defmodule ReshardHelpers do
+ use CouchTestCase
+
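+ # Thin wrappers around the /_reshard HTTP API shared by the resharding tests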
+ def get_summary do
+ resp = Couch.get("/_reshard")
+ assert resp.status_code == 200
+ resp.body
+ end
+
+ def get_state do
+ resp = Couch.get("/_reshard/state")
+ assert resp.status_code == 200
+ resp.body
+ end
+
+ def put_state_running do
+ resp = Couch.put("/_reshard/state", body: %{:state => "running"})
+ assert resp.status_code == 200
+ resp
+ end
+
+ def put_state_stopped(reason \\ "") do
+ body = %{:state => "stopped", :reason => reason}
+ resp = Couch.put("/_reshard/state", body: body)
+ assert resp.status_code == 200
+ resp
+ end
+
+ def get_jobs do
+ resp = Couch.get("/_reshard/jobs")
+ assert resp.status_code == 200
+ resp.body["jobs"]
+ end
+
+ def post_job_db(db) do
+ body = %{:type => :split, :db => db}
+ Couch.post("/_reshard/jobs", body: body)
+ end
+
+ def post_job_node(db, node) do
+ body = %{:type => :split, :db => db, :node => node}
+ Couch.post("/_reshard/jobs", body: body)
+ end
+
+ def post_job_range(db, range) do
+ body = %{:type => :split, :db => db, :range => range}
+ Couch.post("/_reshard/jobs", body: body)
+ end
+
+ def post_job_node_and_range(db, node, range) do
+ body = %{:type => :split, :db => db, :node => node, :range => range}
+ Couch.post("/_reshard/jobs", body: body)
+ end
+
+ def get_job(id) when is_binary(id) do
+ Couch.get("/_reshard/jobs/#{id}")
+ end
+
+ def remove_job(id) when is_binary(id) do
+ Couch.delete("/_reshard/jobs/#{id}")
+ end
+
+ def get_job_state(id) when is_binary(id) do
+ resp = Couch.get("/_reshard/jobs/#{id}/state")
+ assert resp.status_code == 200
+ resp.body["state"]
+ end
+
+ def stop_job(id, reason \\ "") when is_binary(id) do
+ body = %{:state => "stopped", :reason => reason}
+ Couch.post("/_reshard/jobs/#{id}/state", body: body)
+ end
+
+ def resume_job(id) when is_binary(id) do
+ body = %{:state => "running"}
+ Couch.post("/_reshard/jobs/#{id}/state", body: body)
+ end
+
+ def job_ids(jobs) do
+ Enum.map(jobs, fn job -> job["id"] end)
+ end
+
+ def get_first_node do
+ mresp = Couch.get("/_membership")
+ assert mresp.status_code == 200
+ cluster_nodes = mresp.body["cluster_nodes"]
+ [node1 | _] = cluster_nodes
+ node1
+ end
+
+ def wait_job_removed(id) do
+ retry_until(fn -> get_job(id).status_code == 404 end, 200, 10_000)
+ end
+
+ def wait_job_completed(id) do
+ wait_job_state(id, "completed")
+ end
+
+ def wait_job_state(id, state) do
+ retry_until(fn -> get_job_state(id) == state end, 200, 10_000)
+ end
+
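+ # Remove any leftover jobs and make sure the global resharding state is back
+ # to "running" so tests don't interfere with each other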
+ def reset_reshard_state do
+ get_jobs()
+ |> Enum.map(fn j -> j["id"] end)
+ |> Enum.each(fn id -> remove_job(id) end)
+
+ assert get_jobs() == []
+ put_state_running()
+ end
+end
diff --git a/test/elixir/test/test_helper.exs b/test/elixir/test/test_helper.exs
index 4df3bf74a..ef71bbb1b 100644
--- a/test/elixir/test/test_helper.exs
+++ b/test/elixir/test/test_helper.exs
@@ -13,3 +13,4 @@ ExUnit.configure(
ExUnit.start()
Code.require_file("partition_helpers.exs", __DIR__)
+Code.require_file("reshard_helpers.exs", __DIR__)