summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2019-03-18 13:32:15 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2019-04-03 10:48:45 -0400
commit9f9a6fc13eab305bfcb72f83aff175362862d9be (patch)
tree7c4e585ae9bd4954eba2efa657316ef898fcd6e9
parentb7d5b5d4661e742596a8b7b9302e0e3aa9f4cf5d (diff)
downloadcouchdb-9f9a6fc13eab305bfcb72f83aff175362862d9be.tar.gz
Shard splitting job implementation
This is the implementation of the shard splitting job. `mem3_reshard` manager spawns `mem3_reshard_job` instances via the `mem3_reshard_job_sup` supervisor. Each job is a gen_server process that starts in `mem3_reshard_job:init/1` with `#job{}` record instance as the argument. Then the job goes through recovery, so it can handle resuming in cases where the job was interrupted previously and it was initialized from a checkpointed state. Checkpoiting happens in `mem3_reshard` manager with the help of the `mem3_reshard_store` module (introduced in a previous commit). After recovery, processing starts in the `switch_state` function. The states are defined as a sequence of atoms in a list in `mem3_reshard.hrl`. In the `switch_state()` function, the state and history is updated in the `#job{}` record, then `mem3_reshard` manager is asked to checkpoint the new state. The job process waits for `mem3_reshard` manager to notify it when checkpointing has finished so it can continue processesing the new state. That happens when the `do_state` gen_server cast is received. `do_state` function has state matching heads for each state. Usually if there are long running tasks to be performed `do_state` will spawn a few workers and perform all the work in there. In the meantime the main job process will simpy wait for all the workers to exit. When that happens, it will call `switch_state` to switch to the new state, checkpoint again and so on. Since there are quite a few steps needed to split a shard, some of the helper function needed are defined in separate modules such as: * mem3_reshard_index : Index discovery and building. * mem3_reshard_dbdoc : Shard map updates. * couch_db_split : Initial (bulk) data copy (added in a separate commit). * mem3_rep : To perfom "top-offs" in between some steps.
-rw-r--r--src/mem3/src/mem3_reshard_dbdoc.erl275
-rw-r--r--src/mem3/src/mem3_reshard_index.erl164
-rw-r--r--src/mem3/src/mem3_reshard_job.erl722
-rw-r--r--src/mem3/src/mem3_reshard_store.erl288
-rw-r--r--src/mem3/src/mem3_reshard_validate.erl126
5 files changed, 1575 insertions, 0 deletions
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl
new file mode 100644
index 000000000..7eb3e9f13
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_dbdoc.erl
@@ -0,0 +1,275 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_dbdoc).
+
+-behaviour(gen_server).
+
+-export([
+ update_shard_map/1,
+
+ start_link/0,
+
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("mem3_reshard.hrl").
+
+
+-spec update_shard_map(#job{}) -> no_return | ok.
+update_shard_map(#job{source = Source, target = Target} = Job) ->
+ Node = hd(mem3_util:live_nodes()),
+ JobStr = mem3_reshard_job:jobfmt(Job),
+ LogMsg1 = "~p : ~p calling update_shard_map node:~p",
+ couch_log:notice(LogMsg1, [?MODULE, JobStr, Node]),
+ ServerRef = {?MODULE, Node},
+ CallArg = {update_shard_map, Source, Target},
+ TimeoutMSec = shard_update_timeout_msec(),
+ try
+ case gen_server:call(ServerRef, CallArg, TimeoutMSec) of
+ {ok, _} -> ok;
+ {error, CallError} -> throw({error, CallError})
+ end
+ catch
+ _:Err ->
+ exit(Err)
+ end,
+ LogMsg2 = "~p : ~p update_shard_map on node:~p returned",
+ couch_log:notice(LogMsg2, [?MODULE, JobStr, Node]),
+ UntilSec = mem3_reshard:now_sec() + (TimeoutMSec div 1000),
+ case wait_source_removed(Source, 5, UntilSec) of
+ true -> ok;
+ false -> exit(shard_update_did_not_propagate)
+ end.
+
+
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+ couch_log:notice("~p start init()", [?MODULE]),
+ {ok, nil}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+handle_call({update_shard_map, Source, Target}, _From, State) ->
+ Res = try
+ update_shard_map(Source, Target)
+ catch
+ throw:{error, Error} ->
+ {error, Error}
+ end,
+ {reply, Res, State};
+
+handle_call(Call, From, State) ->
+ couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]),
+ {noreply, State}.
+
+
+handle_cast(Cast, State) ->
+ couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]),
+ {noreply, State}.
+
+
+handle_info(Info, State) ->
+ couch_log:error("~p unexpected info ~p", [?MODULE, Info]),
+ {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+% Private
+
+update_shard_map(Source, Target) ->
+ ok = validate_coordinator(),
+ ok = replicate_from_all_nodes(shard_update_timeout_msec()),
+ DocId = mem3:dbname(Source#shard.name),
+ OldDoc = case mem3_util:open_db_doc(DocId) of
+ {ok, #doc{deleted = true}} ->
+ throw({error, missing_source});
+ {ok, #doc{} = Doc} ->
+ Doc;
+ {not_found, deleted} ->
+ throw({error, missing_source});
+ OpenErr ->
+ throw({error, {shard_doc_open_error, OpenErr}})
+ end,
+ #doc{body = OldBody} = OldDoc,
+ NewBody = update_shard_props(OldBody, Source, Target),
+ {ok, _} = write_shard_doc(OldDoc, NewBody),
+ ok = replicate_to_all_nodes(shard_update_timeout_msec()),
+ {ok, NewBody}.
+
+
+validate_coordinator() ->
+ case hd(mem3_util:live_nodes()) =:= node() of
+ true -> ok;
+ false -> throw({error, coordinator_changed})
+ end.
+
+
+replicate_from_all_nodes(TimeoutMSec) ->
+ case mem3_util:replicate_dbs_from_all_nodes(TimeoutMSec) of
+ ok -> ok;
+ Error -> throw({error, Error})
+ end.
+
+
+replicate_to_all_nodes(TimeoutMSec) ->
+ case mem3_util:replicate_dbs_to_all_nodes(TimeoutMSec) of
+ ok -> ok;
+ Error -> throw({error, Error})
+ end.
+
+
+write_shard_doc(#doc{id = Id} = Doc, Body) ->
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ UpdatedDoc = Doc#doc{body = Body},
+ couch_util:with_db(DbName, fun(Db) ->
+ try
+ {ok, _} = couch_db:update_doc(Db, UpdatedDoc, [])
+ catch
+ conflict ->
+ throw({error, {conflict, Id, Doc#doc.body, UpdatedDoc}})
+ end
+ end).
+
+
+update_shard_props({Props0}, #shard{} = Source, [#shard{} | _] = Targets) ->
+ {ByNode0} = couch_util:get_value(<<"by_node">>, Props0, {[]}),
+ ByNodeKV = {<<"by_node">>, {update_by_node(ByNode0, Source, Targets)}},
+ Props1 = lists:keyreplace(<<"by_node">>, 1, Props0, ByNodeKV),
+
+ {ByRange0} = couch_util:get_value(<<"by_range">>, Props1, {[]}),
+ ByRangeKV = {<<"by_range">>, {update_by_range(ByRange0, Source, Targets)}},
+ Props2 = lists:keyreplace(<<"by_range">>, 1, Props1, ByRangeKV),
+
+ Changelog = couch_util:get_value(<<"changelog">>, Props2, []),
+ {Node, Range} = {node_key(Source), range_key(Source)},
+ TRanges = [range_key(T) || T <- Targets],
+ ChangelogEntry = [[<<"split">>, Range, TRanges, Node]],
+ ChangelogKV = {<<"changelog">>, Changelog ++ ChangelogEntry},
+ Props3 = lists:keyreplace(<<"changelog">>, 1, Props2, ChangelogKV),
+
+ {Props3}.
+
+
+update_by_node(ByNode, #shard{} = Source, [#shard{} | _] = Targets) ->
+ {NodeKey, SKey} = {node_key(Source), range_key(Source)},
+ {_, Ranges} = lists:keyfind(NodeKey, 1, ByNode),
+ Ranges1 = Ranges -- [SKey],
+ Ranges2 = Ranges1 ++ [range_key(T) || T <- Targets],
+ lists:keyreplace(NodeKey, 1, ByNode, {NodeKey, lists:sort(Ranges2)}).
+
+
+update_by_range(ByRange, Source, Targets) ->
+ ByRange1 = remove_node_from_source(ByRange, Source),
+ lists:foldl(fun add_node_to_target_foldl/2, ByRange1, Targets).
+
+
+remove_node_from_source(ByRange, Source) ->
+ {NodeKey, SKey} = {node_key(Source), range_key(Source)},
+ {_, SourceNodes} = lists:keyfind(SKey, 1, ByRange),
+ % Double check that source had node to begin with
+ case lists:member(NodeKey, SourceNodes) of
+ true ->
+ ok;
+ false ->
+ throw({source_shard_missing_node, NodeKey, SourceNodes})
+ end,
+ SourceNodes1 = SourceNodes -- [NodeKey],
+ case SourceNodes1 of
+ [] ->
+ % If last node deleted, remove entry
+ lists:keydelete(SKey, 1, ByRange);
+ _ ->
+ lists:keyreplace(SKey, 1, ByRange, {SKey, SourceNodes1})
+ end.
+
+
+add_node_to_target_foldl(#shard{} = Target, ByRange) ->
+ {NodeKey, TKey} = {node_key(Target), range_key(Target)},
+ case lists:keyfind(TKey, 1, ByRange) of
+ {_, Nodes} ->
+ % Double check that target does not have node already
+ case lists:member(NodeKey, Nodes) of
+ false ->
+ ok;
+ true ->
+ throw({target_shard_already_has_node, NodeKey, Nodes})
+ end,
+ Nodes1 = lists:sort([NodeKey | Nodes]),
+ lists:keyreplace(TKey, 1, ByRange, {TKey, Nodes1});
+ false ->
+ % fabric_db_create:make_document/3 says they should be sorted
+ lists:sort([{TKey, [NodeKey]} | ByRange])
+ end.
+
+
+node_key(#shard{node = Node}) ->
+ couch_util:to_binary(Node).
+
+
+range_key(#shard{range = [B, E]}) ->
+ BHex = couch_util:to_hex(<<B:32/integer>>),
+ EHex = couch_util:to_hex(<<E:32/integer>>),
+ list_to_binary([BHex, "-", EHex]).
+
+
+shard_update_timeout_msec() ->
+ config:get_integer("reshard", "shard_upate_timeout_msec", 300000).
+
+
+wait_source_removed(#shard{name = Name} = Source, SleepSec, UntilSec) ->
+ case check_source_removed(Source) of
+ true ->
+ true;
+ false ->
+ case mem3_reshard:now_sec() < UntilSec of
+ true ->
+ LogMsg = "~p : Waiting for shard ~p removal confirmation",
+ couch_log:notice(LogMsg, [?MODULE, Name]),
+ timer:sleep(SleepSec * 1000),
+ wait_source_removed(Source, SleepSec, UntilSec);
+ false ->
+ false
+ end
+ end.
+
+
+check_source_removed(#shard{name = Name}) ->
+ DbName = mem3:dbname(Name),
+ Live = mem3_util:live_nodes(),
+ ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)],
+ Nodes = lists:usort([N || N <- ShardNodes, lists:member(N, Live)]),
+ {Responses, _} = rpc:multicall(Nodes, mem3, shards, [DbName]),
+ Shards = lists:usort(lists:flatten(Responses)),
+ SourcePresent = [S || S = #shard{name = S, node = N} <- Shards, S =:= Name,
+ N =:= node()],
+ case SourcePresent of
+ [] -> true;
+ [_ | _] -> false
+ end.
diff --git a/src/mem3/src/mem3_reshard_index.erl b/src/mem3/src/mem3_reshard_index.erl
new file mode 100644
index 000000000..d4cb7caa1
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_index.erl
@@ -0,0 +1,164 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_index).
+
+
+-export([
+ design_docs/1,
+ target_indices/2,
+ spawn_builders/1
+]).
+
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+%% Public API
+
+design_docs(DbName) ->
+ try
+ case fabric_design_docs(mem3:dbname(DbName)) of
+ {error, {maintenance_mode, _, _Node}} ->
+ {ok, []};
+ {ok, DDocs} ->
+ JsonDocs = [couch_doc:from_json_obj(DDoc) || DDoc <- DDocs],
+ {ok, JsonDocs};
+ Else ->
+ Else
+ end
+ catch error:database_does_not_exist ->
+ {ok, []}
+ end.
+
+
+target_indices(Docs, Targets) ->
+ Indices = [[indices(N, D) || D <- Docs] || #shard{name = N} <- Targets],
+ lists:flatten(Indices).
+
+
+spawn_builders(Indices) ->
+ Results = [build_index(Index) || Index <- Indices],
+ Oks = [{ok, Pid} || {ok, Pid} <- Results, is_pid(Pid)],
+ case Results -- Oks of
+ [] ->
+ {ok, [Pid || {ok, Pid} <- Results]};
+ Error ->
+ % Do a all or nothing pattern, if some indices could not be
+ % spawned, kill the spawned ones and and return the error.
+ ErrMsg = "~p failed to spawn index builders: ~p ~p",
+ couch_log:error(ErrMsg, [?MODULE, Error, Indices]),
+ lists:foreach(fun({ok, Pid}) ->
+ catch unlink(Pid),
+ catch exit(Pid, kill)
+ end, Oks),
+ {error, Error}
+ end.
+
+
+%% Private API
+
+fabric_design_docs(DbName) ->
+ case couch_util:with_proc(fabric, design_docs, [DbName], infinity) of
+ {ok, Resp} -> Resp;
+ {error, Error} -> Error
+ end.
+
+
+indices(DbName, Doc) ->
+ mrview_indices(DbName, Doc)
+ ++ [dreyfus_indices(DbName, Doc) || has_app(dreyfus)]
+ ++ [hastings_indices(DbName, Doc) || has_app(hastings)].
+
+
+mrview_indices(DbName, Doc) ->
+ try
+ {ok, MRSt} = couch_mrview_util:ddoc_to_mrst(DbName, Doc),
+ Views = couch_mrview_index:get(views, MRSt),
+ case Views =/= [] of
+ true ->
+ [{mrview, DbName, MRSt}];
+ false ->
+ []
+ end
+ catch
+ Tag:Err ->
+ Msg = "~p couldn't get mrview index ~p ~p ~p:~p",
+ couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]),
+ []
+ end.
+
+
+dreyfus_indices(DbName, Doc) ->
+ try
+ Indices = dreyfus_index:design_doc_to_indexes(Doc),
+ [{dreyfus, DbName, Index} || Index <- Indices]
+ catch
+ Tag:Err ->
+ Msg = "~p couldn't get dreyfus indices ~p ~p ~p:~p",
+ couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]),
+ []
+ end.
+
+
+hastings_indices(DbName, Doc) ->
+ try
+ Indices = hastings_index:design_doc_to_indexes(Doc),
+ [{hastings, DbName, Index} || Index <- Indices]
+ catch
+ Tag:Err ->
+ Msg = "~p couldn't get hasting indices ~p ~p ~p:~p",
+ couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]),
+ []
+ end.
+
+
+build_index({mrview, DbName, MRSt}) ->
+ case couch_index_server:get_index(couch_mrview_index, MRSt) of
+ {ok, Pid} ->
+ Args = [Pid, get_update_seq(DbName)],
+ WPid = spawn_link(couch_index, get_state, Args),
+ {ok, WPid};
+ Error ->
+ Error
+ end;
+
+build_index({dreyfus, DbName, Index})->
+ case dreyfus_index_manager:get_index(DbName, Index) of
+ {ok, Pid} ->
+ Args = [Pid, get_update_seq(DbName)],
+ WPid = spawn_link(dreyfus_index, await, Args),
+ {ok, WPid};
+ Error ->
+ Error
+ end;
+
+build_index({hastings, DbName, Index}) ->
+ case hastings_index_manager:get_index(DbName, Index) of
+ {ok, Pid} ->
+ Args = [Pid, get_update_seq(DbName)],
+ WPid = spawn_link(hastings_index, await, Args),
+ {ok, WPid};
+ Error ->
+ Error
+ end.
+
+
+has_app(App) ->
+ code:lib_dir(App) /= {error, bad_name}.
+
+
+get_update_seq(DbName) ->
+ couch_util:with_db(DbName, fun(Db) ->
+ couch_db:get_update_seq(Db)
+ end).
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
new file mode 100644
index 000000000..d3a33d3f6
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -0,0 +1,722 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_job).
+
+
+-export([
+ start_link/1,
+
+ checkpoint_done/1,
+ jobfmt/1,
+ pickfun/3
+]).
+
+-export([
+ init/1,
+
+ initial_copy/1,
+ initial_copy_impl/1,
+
+ topoff/1,
+ topoff_impl/1,
+
+ build_indices/1,
+
+ copy_local_docs/1,
+ copy_local_docs_impl/1,
+
+ update_shardmap/1,
+
+ wait_source_close/1,
+ wait_source_close_impl/1,
+
+ source_delete/1,
+ source_delete_impl/1,
+
+ completed/1
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("mem3_reshard.hrl").
+
+
+% Batch size for internal replication topoffs
+-define(INTERNAL_REP_BATCH_SIZE, 2000).
+
+% The list of possible job states. The order of this
+% list is important as a job will progress linearly
+% through it. However, when starting a job we may
+% have to resume from an earlier state as listed
+% below in STATE_RESTART.
+-define(SPLIT_STATES, [
+ new,
+ initial_copy,
+ topoff1,
+ build_indices,
+ topoff2,
+ copy_local_docs,
+ update_shardmap,
+ wait_source_close,
+ topoff3,
+ source_delete,
+ completed
+]).
+
+
+% When a job starts it may be resuming from a partially
+% completed state. These state pairs list the state
+% we have to restart from for each possible state.
+-define(STATE_RESTART, #{
+ new => initial_copy,
+ initial_copy => initial_copy,
+ topoff1 => topoff1,
+ build_indices => topoff1,
+ topoff2 => topoff1,
+ copy_local_docs => topoff1,
+ update_shardmap => update_shardmap,
+ wait_source_close => wait_source_close,
+ topoff3 => wait_source_close,
+ source_delete => wait_source_close,
+ completed => completed
+}).
+
+
+% If we have a worker failing during any of these
+% states we need to clean up the targets
+-define(CLEAN_TARGET_STATES, [
+ initial_copy,
+ topoff1,
+ build_indices,
+ topoff2,
+ copy_local_docs
+]).
+
+
+start_link(#job{} = Job) ->
+ proc_lib:start_link(?MODULE, init, [Job]).
+
+
+% This is called by the main proces after it has checkpointed the progress
+% of the job. After the new state is checkpointed, we signal the job to start
+% executing that state.
+checkpoint_done(#job{pid = Pid} = Job) ->
+ couch_log:notice(" ~p : checkpoint done for ~p", [?MODULE, jobfmt(Job)]),
+ Pid ! checkpoint_done,
+ ok.
+
+
+% Formatting function, used for logging mostly
+jobfmt(#job{} = Job) ->
+ #job{
+ id = Id,
+ source = #shard{name = Source},
+ target = Target,
+ split_state = State,
+ job_state = JobState,
+ pid = Pid
+ } = Job,
+ TargetCount = length(Target),
+ Msg = "#job{~s ~s /~B job_state:~s split_state:~s pid:~p}",
+ Fmt = io_lib:format(Msg, [Id, Source, TargetCount, JobState, State, Pid]),
+ lists:flatten(Fmt).
+
+
+% This is the function which picks between various targets. It is used here as
+% well as in mem3_rep internal replicator and couch_db_split bulk copy logic.
+% Given a document id and list of ranges, and a hash function, it will pick one
+% of the range or return not_in_range atom.
+pickfun(DocId, [[B, E] | _] = Ranges, {_M, _F, _A} = HashFun) when
+ is_integer(B), is_integer(E), B =< E ->
+ HashKey = mem3_hash:calculate(HashFun, DocId),
+ Pred = fun([Begin, End]) ->
+ Begin =< HashKey andalso HashKey =< End
+ end,
+ case lists:filter(Pred, Ranges) of
+ [] -> not_in_range;
+ [Key] -> Key
+ end.
+
+
+init(#job{} = Job0) ->
+ process_flag(trap_exit, true),
+ Job1 = set_start_state(Job0#job{
+ pid = self(),
+ start_time = mem3_reshard:now_sec(),
+ workers = [],
+ retries = 0
+ }),
+ Job2 = update_split_history(Job1),
+ proc_lib:init_ack({ok, self()}),
+ couch_log:notice("~p starting job ~s", [?MODULE, jobfmt(Job2)]),
+ ok = checkpoint(Job2),
+ run(Job2).
+
+
+run(#job{split_state = CurrState} = Job) ->
+ StateFun = case CurrState of
+ topoff1 -> topoff;
+ topoff2 -> topoff;
+ topoff3 -> topoff;
+ _ -> CurrState
+ end,
+ NewJob = try
+ Job1 = ?MODULE:StateFun(Job),
+ Job2 = wait_for_workers(Job1),
+ Job3 = switch_to_next_state(Job2),
+ ok = checkpoint(Job3),
+ Job3
+ catch
+ throw:{retry, RetryJob} ->
+ RetryJob
+ end,
+ run(NewJob).
+
+
+set_start_state(#job{split_state = State} = Job) ->
+ case {State, maps:get(State, ?STATE_RESTART, undefined)} of
+ {_, undefined} ->
+ Fmt1 = "~p recover : unknown state ~s",
+ couch_log:error(Fmt1, [?MODULE, jobfmt(Job)]),
+ erlang:error({invalid_split_job_recover_state, Job});
+ {initial_copy, initial_copy} ->
+ % Since we recover from initial_copy to initial_copy, we need
+ % to reset the target state as initial_copy expects to
+ % create a new target
+ Fmt2 = "~p recover : resetting target ~s",
+ couch_log:notice(Fmt2, [?MODULE, jobfmt(Job)]),
+ reset_target(Job);
+ {_, StartState} ->
+ Job#job{split_state = StartState}
+ end.
+
+
+get_next_state(#job{split_state = State}) ->
+ get_next_state(State, ?SPLIT_STATES).
+
+
+get_next_state(completed, _) ->
+ completed;
+
+get_next_state(CurrState, [CurrState, NextState | _]) ->
+ NextState;
+
+get_next_state(CurrState, [_ | Rest]) ->
+ get_next_state(CurrState, Rest).
+
+
+switch_to_next_state(#job{} = Job0) ->
+ Info0 = Job0#job.state_info,
+ Info1 = info_delete(error, Info0),
+ Info2 = info_delete(reason, Info1),
+ Job1 = Job0#job{
+ split_state = get_next_state(Job0),
+ update_time = mem3_reshard:now_sec(),
+ retries = 0,
+ state_info = Info2,
+ workers = []
+ },
+ Job2 = update_split_history(Job1),
+ check_state(Job2).
+
+
+checkpoint(Job) ->
+ % Ask main process to checkpoint. When it has finished it will notify us
+ % by calling by checkpoint_done/1. The reason not to call the main process
+ % via a gen_server:call is because the main process could be in the middle
+ % of terminating the job and then it would deadlock (after sending us a
+ % shutdown message) and it would end up using the whole supervisor
+ % termination timeout before finally.
+ ok = mem3_reshard:checkpoint(Job#job.manager, Job),
+ Parent = parent(),
+ receive
+ {'EXIT', Parent, Reason} ->
+ handle_exit(Job, Reason);
+ checkpoint_done ->
+ ok;
+ Other ->
+ handle_unknown_msg(Job, "checkpoint", Other)
+ end.
+
+
+wait_for_workers(#job{workers = []} = Job) ->
+ Job;
+
+wait_for_workers(#job{workers = Workers} = Job) ->
+ Parent = parent(),
+ receive
+ {'EXIT', Parent, Reason} ->
+ handle_exit(Job, Reason);
+ {'EXIT', Pid, Reason} ->
+ case lists:member(Pid, Workers) of
+ true ->
+ NewJob = handle_worker_exit(Job, Pid, Reason),
+ wait_for_workers(NewJob);
+ false ->
+ handle_unknown_msg(Job, "wait_for_workers", {Pid, Reason})
+ end;
+ Other ->
+ handle_unknown_msg(Job, "wait_for_workers", Other)
+ end.
+
+
+handle_worker_exit(#job{workers = Workers} = Job, Pid, normal) ->
+ Job#job{workers = Workers -- [Pid]};
+
+handle_worker_exit(#job{} = Job, _Pid, {error, missing_source}) ->
+ Msg1 = "~p stopping worker due to source missing ~p",
+ couch_log:error(Msg1, [?MODULE, jobfmt(Job)]),
+ kill_workers(Job),
+ case lists:member(Job#job.split_state, ?CLEAN_TARGET_STATES) of
+ true ->
+ Msg2 = "~p cleaning target after db was deleted ~p",
+ couch_log:error(Msg2, [?MODULE, jobfmt(Job)]),
+ reset_target(Job),
+ exit({error, missing_source});
+ false ->
+ exit({error, missing_source})
+ end;
+
+handle_worker_exit(#job{} = Job, _Pid, {error, missing_target}) ->
+ Msg = "~p stopping worker due to target db missing ~p",
+ couch_log:error(Msg, [?MODULE, jobfmt(Job)]),
+ kill_workers(Job),
+ exit({error, missing_target});
+
+handle_worker_exit(#job{} = Job0, _Pid, Reason) ->
+ couch_log:error("~p worker error ~p ~p", [?MODULE, jobfmt(Job0), Reason]),
+ kill_workers(Job0),
+ Job1 = Job0#job{workers = []},
+ case Job1#job.retries =< max_retries() of
+ true ->
+ retry_state(Job1, Reason);
+ false ->
+ exit(Reason)
+ end.
+
+
+% Cleanup and exit when we receive an 'EXIT' message from our parent. In case
+% the shard map is being updated, try to wait some time for it to finish.
+handle_exit(#job{split_state = update_shardmap, workers = [WPid]} = Job,
+ Reason) ->
+ Timeout = update_shard_map_timeout_sec(),
+ Msg1 = "~p job exit ~s ~p while shard map is updating, waiting ~p sec",
+ couch_log:warning(Msg1, [?MODULE, jobfmt(Job), Reason, Timeout]),
+ receive
+ {'EXIT', WPid, normal} ->
+ Msg2 = "~p ~s shard map finished updating successfully, exiting",
+ couch_log:notice(Msg2, [?MODULE, jobfmt(Job)]),
+ exit(Reason);
+ {'EXIT', WPid, Error} ->
+ Msg3 = "~p ~s shard map update failed with error ~p",
+ couch_log:error(Msg3, [?MODULE, jobfmt(Job), Error]),
+ exit(Reason)
+ after Timeout * 1000->
+ Msg4 = "~p ~s shard map update timeout exceeded ~p sec",
+ couch_log:error(Msg4, [?MODULE, jobfmt(Job), Timeout]),
+ kill_workers(Job),
+ exit(Reason)
+ end;
+
+handle_exit(#job{} = Job, Reason) ->
+ kill_workers(Job),
+ exit(Reason).
+
+
+retry_state(#job{retries = Retries, state_info = Info} = Job0, Error) ->
+ Job1 = Job0#job{
+ retries = Retries + 1,
+ state_info = info_update(error, Error, Info)
+ },
+ couch_log:notice("~p retrying ~p ~p", [?MODULE, jobfmt(Job1), Retries]),
+ Job2 = report(Job1),
+ Timeout = retry_interval_sec(),
+ Parent = parent(),
+ receive
+ {'EXIT', Parent, Reason} ->
+ handle_exit(Job2, Reason);
+ Other ->
+ handle_unknown_msg(Job2, "retry_state", Other)
+ after Timeout * 1000 ->
+ ok
+ end,
+ throw({retry, Job2}).
+
+
+report(#job{manager = ManagerPid} = Job) ->
+ Job1 = Job#job{update_time = mem3_reshard:now_sec()},
+ ok = mem3_reshard:report(ManagerPid, Job1),
+ Job1.
+
+
+kill_workers(#job{workers = Workers}) ->
+ lists:foreach(fun(Worker) ->
+ unlink(Worker),
+ exit(Worker, kill)
+ end, Workers),
+ flush_worker_messages().
+
+
+flush_worker_messages() ->
+ Parent = parent(),
+ receive
+ {'EXIT', Pid, _} when Pid =/= Parent ->
+ flush_worker_messages()
+ after 0 ->
+ ok
+ end.
+
+
+parent() ->
+ case get('$ancestors') of
+ [Pid | _] when is_pid(Pid) -> Pid;
+ [Name | _] when is_atom(Name) -> whereis(Name);
+ _ -> undefined
+ end.
+
+
+handle_unknown_msg(Job, When, RMsg) ->
+ LogMsg = "~p ~s received an unknown message ~p when in ~s",
+ couch_log:error(LogMsg, [?MODULE, jobfmt(Job), RMsg, When]),
+ erlang:error({invalid_split_job_message, Job#job.id, When, RMsg}).
+
+
+initial_copy(#job{} = Job) ->
+ Pid = spawn_link(?MODULE, initial_copy_impl, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+initial_copy_impl(#job{source = Source, target = Targets0} = Job) ->
+ #shard{name = SourceName} = Source,
+ Targets = [{R, N} || #shard{range = R, name = N} <- Targets0],
+ TMap = maps:from_list(Targets),
+ LogMsg1 = "~p initial_copy started ~s",
+ LogArgs1 = [?MODULE, shardsstr(Source, Targets0)],
+ couch_log:notice(LogMsg1, LogArgs1),
+ case couch_db_split:split(SourceName, TMap, fun pickfun/3) of
+ {ok, Seq} ->
+ LogMsg2 = "~p initial_copy of ~s finished @ seq:~p",
+ LogArgs2 = [?MODULE, shardsstr(Source, Targets0), Seq],
+ couch_log:notice(LogMsg2, LogArgs2),
+ create_artificial_mem3_rep_checkpoints(Job, Seq);
+ {error, Error} ->
+ LogMsg3 = "~p initial_copy of ~p finished @ ~p",
+ LogArgs3 = [?MODULE, shardsstr(Source, Targets0), Error],
+ couch_log:notice(LogMsg3, LogArgs3),
+ exit({error, Error})
+ end.
+
+
+topoff(#job{} = Job) ->
+ Pid = spawn_link(?MODULE, topoff_impl, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+topoff_impl(#job{source = #shard{} = Source, target = Targets}) ->
+ couch_log:notice("~p topoff ~p", [?MODULE, shardsstr(Source, Targets)]),
+ check_source_exists(Source, topoff),
+ check_targets_exist(Targets, topoff),
+ TMap = maps:from_list([{R, T} || #shard{range = R} = T <- Targets]),
+ Opts = [{batch_size, ?INTERNAL_REP_BATCH_SIZE}, {batch_count, all}],
+ case mem3_rep:go(Source, TMap, Opts) of
+ {ok, Count} ->
+ Args = [?MODULE, shardsstr(Source, Targets), Count],
+ couch_log:notice("~p topoff done ~s, count: ~p", Args),
+ ok;
+ {error, Error} ->
+ Args = [?MODULE, shardsstr(Source, Targets), Error],
+ couch_log:error("~p topoff failed ~s, error: ~p", Args),
+ exit({error, Error})
+ end.
+
+
+build_indices(#job{} = Job) ->
+ #job{
+ source = #shard{name = SourceName} = Source,
+ target = Targets,
+ retries = Retries,
+ state_info = Info
+ } = Job,
+ check_source_exists(Source, build_indices),
+ {ok, DDocs} = mem3_reshard_index:design_docs(SourceName),
+ Indices = mem3_reshard_index:target_indices(DDocs, Targets),
+ case mem3_reshard_index:spawn_builders(Indices) of
+ {ok, []} ->
+ % Skip the log spam if this is a no-op
+ Job#job{workers = []};
+ {ok, Pids} ->
+ report(Job#job{workers = Pids});
+ {error, Error} ->
+ case Job#job.retries =< max_retries() of
+ true ->
+ build_indices(Job#job{
+ retries = Retries + 1,
+ state_info = info_update(error, Error, Info)
+ });
+ false ->
+ exit(Error)
+ end
+ end.
+
+
+copy_local_docs(#job{split_state = copy_local_docs} = Job) ->
+ Pid = spawn_link(?MODULE, copy_local_docs_impl, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+copy_local_docs_impl(#job{source = Source, target = Targets0}) ->
+ #shard{name = SourceName} = Source,
+ Targets = [{R, N} || #shard{range = R, name = N} <- Targets0],
+ TMap = maps:from_list(Targets),
+ LogArg1 = [?MODULE, shardsstr(Source, Targets)],
+ couch_log:notice("~p copy local docs start ~s", LogArg1),
+ case couch_db_split:copy_local_docs(SourceName, TMap, fun pickfun/3) of
+ ok ->
+ couch_log:notice("~p copy local docs finished for ~s", LogArg1),
+ ok;
+ {error, Error} ->
+ LogArg2 = [?MODULE, shardsstr(Source, Targets), Error],
+ couch_log:error("~p copy local docs failed for ~s ~p", LogArg2),
+ exit({error, Error})
+ end.
+
+
+update_shardmap(#job{} = Job) ->
+ Pid = spawn_link(mem3_reshard_dbdoc, update_shard_map, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+wait_source_close(#job{source = #shard{name = Name}} = Job) ->
+ couch_event:notify(Name, deleted),
+ Pid = spawn_link(?MODULE, wait_source_close_impl, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+wait_source_close_impl(#job{source = #shard{name = Name}, target = Targets}) ->
+ Timeout = config:get_integer("reshard", "source_close_timeout_sec", 600),
+ check_targets_exist(Targets, wait_source_close),
+ case couch_db:open_int(Name, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ Now = mem3_reshard:now_sec(),
+ case wait_source_close(Db, 1, Now + Timeout) of
+ true ->
+ ok;
+ false ->
+ exit({error, source_db_close_timeout, Name, Timeout})
+ end;
+ {not_found, _} ->
+ couch_log:warning("~p source already deleted ~p", [?MODULE, Name]),
+ ok
+ end.
+
+
+wait_source_close(Db, SleepSec, UntilSec) ->
+ case couch_db:monitored_by(Db) -- [self()] of
+ [] ->
+ true;
+ [_ | _] ->
+ Now = mem3_reshard:now_sec(),
+ case Now < UntilSec of
+ true ->
+ LogMsg = "~p : Waiting for source shard ~p to be closed",
+ couch_log:notice(LogMsg, [?MODULE, couch_db:name(Db)]),
+ timer:sleep(SleepSec * 1000),
+ wait_source_close(Db, SleepSec, UntilSec);
+ false ->
+ false
+ end
+ end.
+
+
+source_delete(#job{} = Job) ->
+ Pid = spawn_link(?MODULE, source_delete_impl, [Job]),
+ report(Job#job{workers = [Pid]}).
+
+
+source_delete_impl(#job{source = #shard{name = Name}, target = Targets}) ->
+ check_targets_exist(Targets, source_delete),
+ case config:get_boolean("mem3_reshard", "delete_source", true) of
+ true ->
+ case couch_server:delete(Name, [?ADMIN_CTX]) of
+ ok ->
+ couch_log:notice("~p : deleted source shard ~p",
+ [?MODULE, Name]);
+ not_found ->
+ couch_log:warning("~p : source was already deleted ~p",
+ [?MODULE, Name])
+ end;
+ false ->
+ % Emit deleted event even when not actually deleting the files this
+ % is the second one emitted, the other one was before
+ % wait_source_close. They should be idempotent. This one is just to
+ % match the one that couch_server would emit had the config not
+ % been set
+ couch_event:notify(Name, deleted),
+ LogMsg = "~p : according to configuration not deleting source ~p",
+ couch_log:warning(LogMsg, [?MODULE, Name])
+ end,
+ TNames = [TName || #shard{name = TName} <- Targets],
+ lists:foreach(fun(TName) -> couch_event:notify(TName, updated) end, TNames).
+
+
+completed(#job{} = Job) ->
+ couch_log:notice("~p : ~p completed, exit normal", [?MODULE, jobfmt(Job)]),
+ exit(normal).
+
+
+% This is for belt and suspenders really. Call periodically to validate the
+% state is one of the expected states.
+-spec check_state(#job{}) -> #job{} | no_return().
+check_state(#job{split_state = State} = Job) ->
+ case lists:member(State, ?SPLIT_STATES) of
+ true ->
+ Job;
+ false ->
+ erlang:error({invalid_shard_split_state, State, Job})
+ end.
+
+
+create_artificial_mem3_rep_checkpoints(#job{} = Job, Seq) ->
+ #job{source = Source = #shard{name = SourceName}, target = Targets} = Job,
+ check_source_exists(Source, initial_copy),
+ TNames = [TN || #shard{name = TN} <- Targets],
+ Timestamp = list_to_binary(mem3_util:iso8601_timestamp()),
+ couch_util:with_db(SourceName, fun(SDb) ->
+ [couch_util:with_db(TName, fun(TDb) ->
+ Doc = mem3_rep_checkpoint_doc(SDb, TDb, Timestamp, Seq),
+ {ok, _} = couch_db:update_doc(SDb, Doc, []),
+ {ok, _} = couch_db:update_doc(TDb, Doc, []),
+ ok
+ end) || TName <- TNames]
+ end),
+ ok.
+
+
+mem3_rep_checkpoint_doc(SourceDb, TargetDb, Timestamp, Seq) ->
+ Node = atom_to_binary(node(), utf8),
+ SourceUUID = couch_db:get_uuid(SourceDb),
+ TargetUUID = couch_db:get_uuid(TargetDb),
+ History = {[
+ {<<"source_node">>, Node},
+ {<<"source_uuid">>, SourceUUID},
+ {<<"source_seq">>, Seq},
+ {<<"timestamp">>, Timestamp},
+ {<<"target_node">>, Node},
+ {<<"target_uuid">>, TargetUUID},
+ {<<"target_seq">>, Seq}
+ ]},
+ Body = {[
+ {<<"seq">>, Seq},
+ {<<"target_uuid">>, TargetUUID},
+ {<<"history">>, {[{Node, [History]}]}}
+ ]},
+ Id = mem3_rep:make_local_id(SourceUUID, TargetUUID),
+ #doc{id = Id, body = Body}.
+
+
+check_source_exists(#shard{name = Name}, StateName) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ ErrMsg = "~p source ~p is unexpectedly missing in ~p",
+ couch_log:error(ErrMsg, [?MODULE, Name, StateName]),
+ exit({error, missing_source})
+ end.
+
+
+check_targets_exist(Targets, StateName) ->
+ lists:foreach(fun(#shard{name = Name}) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ ErrMsg = "~p target ~p is unexpectedly missing in ~p",
+ couch_log:error(ErrMsg, [?MODULE, Name, StateName]),
+ exit({error, missing_target})
+ end
+ end, Targets).
+
+
+-spec max_retries() -> integer().
+max_retries() ->
+ config:get_integer("reshard", "max_retries", 1).
+
+
+-spec retry_interval_sec() -> integer().
+retry_interval_sec() ->
+ config:get_integer("reshard", "retry_interval_sec", 10).
+
+
+-spec update_shard_map_timeout_sec() -> integer().
+update_shard_map_timeout_sec() ->
+ config:get_integer("reshard", "update_shardmap_timeout_sec", 60).
+
+
+-spec info_update(atom(), any(), [tuple()]) -> [tuple()].
+info_update(Key, Val, StateInfo) ->
+ lists:keystore(Key, 1, StateInfo, {Key, Val}).
+
+
+-spec info_delete(atom(), [tuple()]) -> [tuple()].
+info_delete(Key, StateInfo) ->
+ lists:keydelete(Key, 1, StateInfo).
+
+
+-spec shardsstr(#shard{}, #shard{} | [#shard{}]) -> string().
+shardsstr(#shard{name = SourceName}, #shard{name = TargetName}) ->
+ lists:flatten(io_lib:format("~s -> ~s", [SourceName, TargetName]));
+
+shardsstr(#shard{name = SourceName}, Targets) ->
+ TNames = [TN || #shard{name = TN} <- Targets],
+ TargetsStr = string:join([binary_to_list(T) || T <- TNames], ","),
+ lists:flatten(io_lib:format("~s -> ~s", [SourceName, TargetsStr])).
+
+
+-spec reset_target(#job{}) -> #job{}.
+reset_target(#job{source = Source, target = Targets} = Job) ->
+ ShardNames = try
+ [N || #shard{name = N} <- mem3:local_shards(mem3:dbname(Source))]
+ catch
+ error:database_does_not_exist ->
+ []
+ end,
+ lists:map(fun(#shard{name = Name}) ->
+ case {couch_server:exists(Name), lists:member(Name, ShardNames)} of
+ {_, true} ->
+ % Should never get here but if we do crash and don't continue
+ LogMsg = "~p : ~p target unexpectedly found in shard map ~p",
+ couch_log:error(LogMsg, [?MODULE, jobfmt(Job), Name]),
+ erlang:error({target_present_in_shard_map, Name});
+ {true, false} ->
+ LogMsg = "~p : ~p resetting ~p target",
+ couch_log:warning(LogMsg, [?MODULE, jobfmt(Job), Name]),
+ couch_db_split:cleanup_target(Source#shard.name, Name);
+ {false, false} ->
+ ok
+ end
+ end, Targets),
+ Job.
+
+
+-spec update_split_history(#job{}) -> #job{}.
+update_split_history(#job{split_state = St, update_time = Ts} = Job) ->
+ Hist = Job#job.history,
+ JobSt = case St of
+ completed -> completed;
+ failed -> failed;
+ new -> new;
+ stopped -> stopped;
+ _ -> running
+ end,
+ Job#job{history = mem3_reshard:update_history(JobSt, St, Ts, Hist)}.
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl
new file mode 100644
index 000000000..a1e00544a
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_store.erl
@@ -0,0 +1,288 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_store).
+
+
+-export([
+ init/3,
+
+ store_job/2,
+ load_job/2,
+ delete_job/2,
+ get_jobs/1,
+
+ store_state/1,
+ load_state/2,
+ delete_state/1, % for debugging
+
+ job_to_ejson_props/2,
+ state_to_ejson_props/1
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("mem3_reshard.hrl").
+
+
+-spec init(#state{}, binary(), binary()) -> #state{}.
+init(#state{} = State, JobPrefix, StateDocId) ->
+ State#state{
+ job_prefix = <<?LOCAL_DOC_PREFIX, JobPrefix/binary>>,
+ state_id = <<?LOCAL_DOC_PREFIX, StateDocId/binary>>
+ }.
+
+
+-spec store_job(#state{}, #job{}) -> ok.
+store_job(#state{job_prefix = Prefix}, #job{id = Id} = Job) ->
+ with_shards_db(fun(Db) ->
+ DocId = <<Prefix/binary, Id/binary>>,
+ ok = update_doc(Db, DocId, job_to_ejson_props(Job))
+ end).
+
+
+-spec load_job(#state{}, binary()) -> {ok, {[_]}} | not_found.
+load_job(#state{job_prefix = Prefix}, Id) ->
+ with_shards_db(fun(Db) ->
+ case load_doc(Db, <<Prefix/binary, Id/binary>>) of
+ {ok, DocBody} ->
+ {ok, job_from_ejson(DocBody)};
+ not_found ->
+ not_found
+ end
+ end).
+
+
+-spec delete_job(#state{}, binary()) -> ok.
+delete_job(#state{job_prefix = Prefix}, Id) ->
+ with_shards_db(fun(Db) ->
+ DocId = <<Prefix/binary, Id/binary>>,
+ ok = delete_doc(Db, DocId)
+ end).
+
+
+-spec get_jobs(#state{}) -> [#job{}].
+get_jobs(#state{job_prefix = Prefix}) ->
+ with_shards_db(fun(Db) ->
+ PrefixLen = byte_size(Prefix),
+ FoldFun = fun(#doc{id = Id, body = Body}, Acc) ->
+ case Id of
+ <<Prefix:PrefixLen/binary, _/binary>> ->
+ {ok, [job_from_ejson(Body) | Acc]};
+ _ ->
+ {stop, Acc}
+ end
+ end,
+ Opts = [{start_key, Prefix}],
+ {ok, Jobs} = couch_db:fold_local_docs(Db, FoldFun, [], Opts),
+ lists:reverse(Jobs)
+ end).
+
+
+-spec store_state(#state{}) -> ok.
+store_state(#state{state_id = DocId} = State) ->
+ with_shards_db(fun(Db) ->
+ ok = update_doc(Db, DocId, state_to_ejson_props(State))
+ end).
+
+
+-spec load_state(#state{}, atom()) -> #state{}.
+load_state(#state{state_id = DocId} = State, Default) ->
+ with_shards_db(fun(Db) ->
+ case load_doc(Db, DocId) of
+ {ok, DocBody} ->
+ state_from_ejson(State, DocBody);
+ not_found ->
+ State#state{state = Default}
+ end
+ end).
+
+
+-spec delete_state(#state{}) -> ok.
+delete_state(#state{state_id = DocId}) ->
+ with_shards_db(fun(Db) ->
+ ok = delete_doc(Db, DocId)
+ end).
+
+
+job_to_ejson_props(#job{source = Source, target = Targets} = Job, Opts) ->
+ Iso8601 = proplists:get_value(iso8601, Opts),
+ History = history_to_ejson(Job#job.history, Iso8601),
+ StartTime = case Iso8601 of
+ true -> iso8601(Job#job.start_time);
+ _ -> Job#job.start_time
+ end,
+ UpdateTime = case Iso8601 of
+ true -> iso8601(Job#job.update_time);
+ _ -> Job#job.update_time
+ end,
+ [
+ {id, Job#job.id},
+ {type, Job#job.type},
+ {source, Source#shard.name},
+ {target, [T#shard.name || T <- Targets]},
+ {job_state, Job#job.job_state},
+ {split_state, Job#job.split_state},
+ {state_info, state_info_to_ejson(Job#job.state_info)},
+ {node, atom_to_binary(Job#job.node, utf8)},
+ {start_time, StartTime},
+ {update_time, UpdateTime},
+ {history, History}
+ ].
+
+
+state_to_ejson_props(#state{} = State) ->
+ [
+ {state, atom_to_binary(State#state.state, utf8)},
+ {state_info, state_info_to_ejson(State#state.state_info)},
+ {update_time, State#state.update_time},
+ {node, atom_to_binary(State#state.node, utf8)}
+ ].
+
+
+% Private API
+
+with_shards_db(Fun) ->
+ DbName = config:get("mem3", "shards_db", "_dbs"),
+ case mem3_util:ensure_exists(DbName) of
+ {ok, Db} ->
+ try
+ Fun(Db)
+ after
+ catch couch_db:close(Db)
+ end;
+ Else ->
+ throw(Else)
+ end.
+
+
+delete_doc(Db, DocId) ->
+ case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{revs = {_, Revs}}} ->
+ {ok, _} = couch_db:delete_doc(Db, DocId, Revs),
+ {ok, _} = couch_db:ensure_full_commit(Db),
+ ok;
+ {not_found, _} ->
+ ok
+ end.
+
+
+update_doc(Db, DocId, Body) ->
+ DocProps = [{<<"_id">>, DocId}] ++ Body,
+ Body1 = ?JSON_DECODE(?JSON_ENCODE({DocProps})),
+ BaseDoc = couch_doc:from_json_obj(Body1),
+ Doc = case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{revs = Revs}} ->
+ BaseDoc#doc{revs = Revs};
+ {not_found, _} ->
+ BaseDoc
+ end,
+ case store_state() of
+ true ->
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ couch_log:debug("~p updated doc ~p ~p", [?MODULE, DocId, Body]),
+ {ok, _} = couch_db:ensure_full_commit(Db),
+ ok;
+ false ->
+ couch_log:debug("~p not storing state in ~p", [?MODULE, DocId]),
+ ok
+ end.
+
+
+load_doc(Db, DocId) ->
+ case couch_db:open_doc(Db, DocId, [ejson_body]) of
+ {ok, #doc{body = Body}} ->
+ couch_log:debug("~p loaded doc ~p ~p", [?MODULE, DocId, Body]),
+ {ok, Body};
+ {not_found, _} ->
+ not_found
+ end.
+
+
+job_to_ejson_props(#job{} = Job) ->
+ job_to_ejson_props(Job, []).
+
+
+job_from_ejson({Props}) ->
+ Id = couch_util:get_value(<<"id">>, Props),
+ Type = couch_util:get_value(<<"type">>, Props),
+ Source = couch_util:get_value(<<"source">>, Props),
+ Target = couch_util:get_value(<<"target">>, Props),
+ JobState = couch_util:get_value(<<"job_state">>, Props),
+ SplitState = couch_util:get_value(<<"split_state">>, Props),
+ StateInfo = couch_util:get_value(<<"state_info">>, Props),
+ TStarted = couch_util:get_value(<<"start_time">>, Props),
+ TUpdated = couch_util:get_value(<<"update_time">>, Props),
+ History = couch_util:get_value(<<"history">>, Props),
+ #job{
+ id = Id,
+ type = binary_to_atom(Type, utf8),
+ job_state = binary_to_atom(JobState, utf8),
+ split_state = binary_to_atom(SplitState, utf8),
+ state_info = state_info_from_ejson(StateInfo),
+ node = node(),
+ start_time = TStarted,
+ update_time = TUpdated,
+ source = mem3_reshard:shard_from_name(Source),
+ target = [mem3_reshard:shard_from_name(T) || T <- Target],
+ history = history_from_ejson(History)
+ }.
+
+
+state_from_ejson(#state{} = State, {Props}) ->
+ StateVal = couch_util:get_value(<<"state">>, Props),
+ StateInfo = couch_util:get_value(<<"state_info">>, Props),
+ TUpdated = couch_util:get_value(<<"update_time">>, Props),
+ State#state{
+ state = binary_to_atom(StateVal, utf8),
+ state_info = state_info_from_ejson(StateInfo),
+ node = node(),
+ update_time = TUpdated
+ }.
+
+
+state_info_from_ejson({Props}) ->
+ Props1 = [{binary_to_atom(K, utf8), couch_util:to_binary(V)}
+ || {K, V} <- Props],
+ lists:sort(Props1).
+
+
+history_to_ejson(Hist, true) when is_list(Hist) ->
+ [{[{timestamp, iso8601(T)}, {type, S}, {detail, D}]} || {T, S, D} <- Hist];
+
+history_to_ejson(Hist, _) when is_list(Hist) ->
+ [{[{timestamp, T}, {type, S}, {detail, D}]} || {T, S, D} <- Hist].
+
+
+history_from_ejson(HistoryEJson) when is_list(HistoryEJson) ->
+ lists:map(fun({EventProps}) ->
+ Timestamp = couch_util:get_value(<<"timestamp">>, EventProps),
+ State = couch_util:get_value(<<"type">>, EventProps),
+ Detail = couch_util:get_value(<<"detail">>, EventProps),
+ {Timestamp, binary_to_atom(State, utf8), Detail}
+ end, HistoryEJson).
+
+
+state_info_to_ejson(Props) ->
+ {lists:sort([{K, couch_util:to_binary(V)} || {K, V} <- Props])}.
+
+
+store_state() ->
+ config:get_boolean("reshard", "store_state", true).
+
+
+iso8601(UnixSec) ->
+ Mega = UnixSec div 1000000,
+ Sec = UnixSec rem 1000000,
+ {{Y, M, D}, {H, Min, S}} = calendar:now_to_universal_time({Mega, Sec, 0}),
+ Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
+ iolist_to_binary(io_lib:format(Format, [Y, M, D, H, Min, S])).
diff --git a/src/mem3/src/mem3_reshard_validate.erl b/src/mem3/src/mem3_reshard_validate.erl
new file mode 100644
index 000000000..aa8df3e16
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_validate.erl
@@ -0,0 +1,126 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_validate).
+
+-export([
+ start_args/2,
+ source/1,
+ targets/2
+]).
+
+-include_lib("mem3/include/mem3.hrl").
+
+
+-spec start_args(#shard{}, any()) -> ok | {error, term()}.
+start_args(Source, Split) ->
+ first_error([
+ check_split(Split),
+ check_range(Source, Split),
+ check_node(Source),
+ source(Source),
+ check_shard_map(Source)
+ ]).
+
+
+-spec source(#shard{}) -> ok | {error, term()}.
+source(#shard{name = Name}) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ {error, {source_shard_not_found, Name}}
+ end.
+
+
+-spec check_shard_map(#shard{}) -> ok | {error, term()}.
+check_shard_map(#shard{name = Name}) ->
+ DbName = mem3:dbname(Name),
+ AllShards = mem3:shards(DbName),
+ case mem3_util:calculate_max_n(AllShards) of
+ N when is_integer(N), N >= 1 ->
+ ok;
+ N when is_integer(N), N < 1 ->
+ {error, {not_enough_shard_copies, DbName}}
+ end.
+
+
+-spec targets(#shard{}, [#shard{}]) -> ok | {error, term()}.
+targets(#shard{} = Source, Targets) ->
+ first_error([
+ target_ranges(Source, Targets)
+ ]).
+
+
+-spec check_split(any()) -> ok | {error, term()}.
+check_split(Split) when is_integer(Split), Split > 1 ->
+ ok;
+check_split(Split) ->
+ {error, {invalid_split_parameter, Split}}.
+
+
+-spec check_range(#shard{}, any()) -> ok | {error, term()}.
+check_range(#shard{range = Range = [B, E]}, Split) ->
+ case (E + 1 - B) >= Split of
+ true ->
+ ok;
+ false ->
+ {error, {shard_range_cannot_be_split, Range, Split}}
+ end.
+
+
+-spec check_node(#shard{}) -> ok | {error, term()}.
+check_node(#shard{node = undefined}) ->
+ ok;
+
+check_node(#shard{node = Node}) when Node =:= node() ->
+ ok;
+
+check_node(#shard{node = Node}) ->
+ {error, {source_shard_node_is_not_current_node, Node}}.
+
+
+-spec target_ranges(#shard{}, [#shard{}]) -> ok | {error, any()}.
+target_ranges(#shard{range = [Begin, End]}, Targets) ->
+ Ranges = [R || #shard{range = R} <- Targets],
+ SortFun = fun([B1, _], [B2, _]) -> B1 =< B2 end,
+ [First | RestRanges] = lists:sort(SortFun, Ranges),
+ try
+ TotalRange = lists:foldl(fun([B2, E2], [B1, E1]) ->
+ case B2 =:= E1 + 1 of
+ true ->
+ ok;
+ false ->
+ throw({range_error, {B2, E1}})
+ end,
+ [B1, E2]
+ end, First, RestRanges),
+ case [Begin, End] =:= TotalRange of
+ true ->
+ ok;
+ false ->
+ throw({range_error, {[Begin, End], TotalRange}})
+ end
+ catch
+ throw:{range_error, Error} ->
+ {error, {shard_range_error, Error}}
+ end.
+
+
+-spec first_error([ok | {error, term()}]) -> ok | {error, term()}.
+first_error(Results) ->
+ case [Res || Res <- Results, Res =/= ok] of
+ [] ->
+ ok;
+ [FirstError | _] ->
+ FirstError
+ end.