author     Nick Vatamaniuc <vatamane@apache.org>  2019-02-25 12:20:46 -0500
committer  Nick Vatamaniuc <vatamane@apache.org>  2019-02-28 11:49:01 -0500
commit     71f18988fa6bcaadd230f75683c5dd577bd3ff79 (patch)
tree       d8715bfd91704b5652de088a29ecfc4ad1382c22
parent     97dc33894ed147efd13cf8c4132c45dba5cd9e9d (diff)
download   couchdb-71f18988fa6bcaadd230f75683c5dd577bd3ff79.tar.gz
Shard splitting job implementation
This is the implementation of the shard splitting job. The `mem3_reshard` manager spawns `mem3_reshard_job` instances via the `mem3_reshard_job_sup` supervisor. Each job is a gen_server process that starts in `mem3_reshard_job:init/1` with a `#job{}` record instance as the argument.

The job then goes through recovery, so it can resume in cases where it was previously interrupted and was initialized from a checkpointed state. Checkpointing happens in the `mem3_reshard` manager with the help of the `mem3_reshard_store` module (introduced in a previous commit).

After recovery, processing starts in the `switch_state` function. The states are defined as a sequence of atoms in a list in `mem3_reshard.hrl`. In the `switch_state()` function, the state and history are updated in the `#job{}` record, then the `mem3_reshard` manager is asked to checkpoint the new state. The job process waits for the `mem3_reshard` manager to notify it when checkpointing has finished so it can continue processing the new state. That happens when the `do_state` gen_server cast is received.

The `do_state` function has state-matching heads for each state. Usually, if there are long-running tasks to be performed, `do_state` spawns a few workers and performs all the work there. In the meantime the main job process simply waits for all the workers to exit. When that happens, it calls `switch_state` to switch to the next state, checkpoints again, and so on.

Since there are quite a few steps needed to split a shard, some of the helper functions needed are defined in separate modules:

 * mem3_reshard_index : Index discovery and building.
 * mem3_reshard_dbdoc : Shard map updates.
 * couch_db_split : Initial (bulk) data copy (added in a separate commit).
 * mem3_rep : To perform "top-offs" in between some steps.

Issue #1920
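A minimal sketch of this checkpoint/state-switch cycle (illustrative only; `spawn_workers_for/2` is a hypothetical stand-in for the per-state worker spawning done in `do_state`, and the real implementation is in `mem3_reshard_job.erl` below):

    %% Illustrative sketch, not the actual code from this patch.
    switch_state(#job{manager = Manager} = Job0, NewState) ->
        Job = Job0#job{split_state = NewState, workers = []},
        %% The manager persists the new state and then casts `do_state` back.
        ok = mem3_reshard:checkpoint(Manager, Job),
        Job.

    handle_cast(do_state, #job{split_state = State} = Job) ->
        Workers = spawn_workers_for(State, Job),  % hypothetical helper
        {noreply, Job#job{workers = Workers}}.

    handle_info({'EXIT', Pid, normal}, #job{workers = [Pid]} = Job) ->
        %% Last worker finished: move to the next state and checkpoint again.
        {noreply, switch_state(Job, next_state(Job#job.split_state))}.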
-rw-r--r--  src/mem3/src/mem3_reshard_dbdoc.erl     275
-rw-r--r--  src/mem3/src/mem3_reshard_index.erl     162
-rw-r--r--  src/mem3/src/mem3_reshard_job.erl       627
-rw-r--r--  src/mem3/src/mem3_reshard_store.erl     288
-rw-r--r--  src/mem3/src/mem3_reshard_validate.erl  113
5 files changed, 1465 insertions, 0 deletions
diff --git a/src/mem3/src/mem3_reshard_dbdoc.erl b/src/mem3/src/mem3_reshard_dbdoc.erl
new file mode 100644
index 000000000..fc67f4e02
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_dbdoc.erl
@@ -0,0 +1,275 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_dbdoc).
+
+-behaviour(gen_server).
+
+-export([
+ update_shard_map/1,
+
+ start_link/0,
+
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("mem3_reshard.hrl").
+
+
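+% Public entry point, called from a job worker during the `update_shardmap`
+% split state. The shard map document update is funneled through a single
+% coordinator node (the first live node) via a gen_server call, and this
+% function then waits for the updated map to propagate before returning.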
+-spec update_shard_map(#job{}) -> ok | no_return().
+update_shard_map(#job{source = Source, target = Target} = Job) ->
+ Node = hd(mem3_util:live_nodes()),
+ JobStr = mem3_reshard_job:jobfmt(Job),
+ LogMsg1 = "~p : calling update_shard_map on node ~p. ~p",
+ couch_log:notice(LogMsg1, [?MODULE, Node, JobStr]),
+ ServerRef = {?MODULE, Node},
+ CallArg = {update_shard_map, Source, Target},
+ TimeoutMSec = shard_update_timeout_msec(),
+ try
+ case gen_server:call(ServerRef, CallArg, TimeoutMSec) of
+ {ok, _} -> ok;
+ {error, CallError} -> throw({error, CallError})
+ end
+ catch
+ _:Err ->
+ exit(Err)
+ end,
+ LogMsg2 = "~p : update_shard_map on node ~p returned",
+ couch_log:notice(LogMsg2, [?MODULE, Node]),
+ UntilSec = mem3_reshard:now_sec() + (TimeoutMSec div 1000),
+ case wait_source_removed(Source, 5, UntilSec) of
+ true -> ok;
+ false -> exit(shard_update_did_not_propagate)
+ end.
+
+
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+ couch_log:notice("~p start init()", [?MODULE]),
+ {ok, nil}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+handle_call({update_shard_map, Source, Target}, _From, State) ->
+ Res = try
+ update_shard_map(Source, Target)
+ catch
+ throw:{error, Error} ->
+ {error, Error}
+ end,
+ {reply, Res, State};
+
+handle_call(Call, From, State) ->
+ couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]),
+ {noreply, State}.
+
+
+handle_cast(Cast, State) ->
+ couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]),
+ {noreply, State}.
+
+
+handle_info(Info, State) ->
+ couch_log:error("~p unexpected info ~p", [?MODULE, Info]),
+ {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+% Private
+
+update_shard_map(Source, Target) ->
+ ok = validate_coordinator(),
+ ok = replicate_from_all_nodes(shard_update_timeout_msec()),
+ #shard{name = SourceName} = Source,
+ DocId = mem3:dbname(SourceName),
+ OldDoc = case mem3_util:open_db_doc(DocId) of
+ {ok, #doc{deleted = true}} ->
+ throw({error, missing_source});
+ {ok, #doc{} = Doc} ->
+ Doc;
+ {not_found, deleted} ->
+ throw({error, missing_source});
+ OpenErr ->
+ throw({error, {shard_doc_open_error, OpenErr}})
+ end,
+ #doc{body = OldBody} = OldDoc,
+ NewBody = update_shard_props(OldBody, Source, Target),
+ {ok, _} = write_shard_doc(OldDoc, NewBody),
+ ok = replicate_to_all_nodes(shard_update_timeout_msec()),
+ {ok, NewBody}.
+
+
+validate_coordinator() ->
+ case hd(mem3_util:live_nodes()) =:= node() of
+ true -> ok;
+ false -> throw({error, coordinator_changed})
+ end.
+
+
+replicate_from_all_nodes(TimeoutMSec) ->
+ case mem3_util:replicate_dbs_from_all_nodes(TimeoutMSec) of
+ ok -> ok;
+ Error -> throw({error, Error})
+ end.
+
+
+replicate_to_all_nodes(TimeoutMSec) ->
+ case mem3_util:replicate_dbs_to_all_nodes(TimeoutMSec) of
+ ok -> ok;
+ Error -> throw({error, Error})
+ end.
+
+
+write_shard_doc(#doc{id = Id} = Doc, Body) ->
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ UpdatedDoc = Doc#doc{body = Body},
+ couch_util:with_db(DbName, fun(Db) ->
+ try
+ {ok, _} = couch_db:update_doc(Db, UpdatedDoc, [])
+ catch
+ conflict ->
+ throw({error, {conflict, Id, Doc#doc.body, UpdatedDoc}})
+ end
+ end).
+
+
+update_shard_props({Props0}, #shard{} = Source, [#shard{} | _] = Targets) ->
+ {ByNode0} = couch_util:get_value(<<"by_node">>, Props0, {[]}),
+ ByNodeKV = {<<"by_node">>, {update_by_node(ByNode0, Source, Targets)}},
+ Props1 = lists:keyreplace(<<"by_node">>, 1, Props0, ByNodeKV),
+
+ {ByRange0} = couch_util:get_value(<<"by_range">>, Props1, {[]}),
+ ByRangeKV = {<<"by_range">>, {update_by_range(ByRange0, Source, Targets)}},
+ Props2 = lists:keyreplace(<<"by_range">>, 1, Props1, ByRangeKV),
+
+ Changelog = couch_util:get_value(<<"changelog">>, Props2, []),
+ {Node, Range} = {node_key(Source), range_key(Source)},
+ ChangelogEntry = [[<<"split">>, Range, length(Targets), Node]],
+ ChangelogKV = {<<"changelog">>, Changelog ++ ChangelogEntry},
+ Props3 = lists:keyreplace(<<"changelog">>, 1, Props2, ChangelogKV),
+
+ {Props3}.
+
+
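+% Replace the source shard's range with the target ranges in the "by_node"
+% section of the shard map document, for the node performing the split.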
+update_by_node(ByNode, #shard{} = Source, [#shard{} | _] = Targets) ->
+ {NodeKey, SKey} = {node_key(Source), range_key(Source)},
+ {_, Ranges} = lists:keyfind(NodeKey, 1, ByNode),
+ Ranges1 = Ranges -- [SKey],
+ Ranges2 = Ranges1 ++ [range_key(T) || T <- Targets],
+ lists:keyreplace(NodeKey, 1, ByNode, {NodeKey, lists:sort(Ranges2)}).
+
+
+update_by_range(ByRange, Source, Targets) ->
+ ByRange1 = remove_node_from_source(ByRange, Source),
+ lists:foldl(fun add_node_to_target_foldl/2, ByRange1, Targets).
+
+
+remove_node_from_source(ByRange, Source) ->
+ {NodeKey, SKey} = {node_key(Source), range_key(Source)},
+ {_, SourceNodes} = lists:keyfind(SKey, 1, ByRange),
+ % Double check that the source had the node to begin with
+ case lists:member(NodeKey, SourceNodes) of
+ true ->
+ ok;
+ false ->
+ throw({source_shard_missing_node, NodeKey, SourceNodes})
+ end,
+ SourceNodes1 = SourceNodes -- [NodeKey],
+ case SourceNodes1 of
+ [] ->
+ % If last node deleted, remove entry
+ lists:keydelete(SKey, 1, ByRange);
+ _ ->
+ lists:keyreplace(SKey, 1, ByRange, {SKey, SourceNodes1})
+ end.
+
+
+add_node_to_target_foldl(#shard{} = Target, ByRange) ->
+ {NodeKey, TKey} = {node_key(Target), range_key(Target)},
+ case lists:keyfind(TKey, 1, ByRange) of
+ {_, Nodes} ->
+ % Double check that the target does not already have the node
+ case lists:member(NodeKey, Nodes) of
+ false ->
+ ok;
+ true ->
+ throw({target_shard_already_has_node, NodeKey, Nodes})
+ end,
+ Nodes1 = lists:sort([NodeKey | Nodes]),
+ lists:keyreplace(TKey, 1, ByRange, {TKey, Nodes1});
+ false ->
+ % fabric_db_create:make_document/3 says they should be sorted
+ lists:sort([{TKey, [NodeKey]} | ByRange])
+ end.
+
+
+node_key(#shard{node = Node}) ->
+ couch_util:to_binary(Node).
+
+
+range_key(#shard{range = [B, E]}) ->
+ BHex = couch_util:to_hex(<<B:32/integer>>),
+ EHex = couch_util:to_hex(<<E:32/integer>>),
+ list_to_binary([BHex, "-", EHex]).
+
+
+shard_update_timeout_msec() ->
+ config:get_integer("reshard", "shard_update_timeout_msec", 300000).
+
+
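+% Poll until the source shard no longer appears (for this node) in the shard
+% map as seen by the live nodes hosting the database, giving up once
+% UntilSec is reached.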
+wait_source_removed(#shard{name = Name} = Source, SleepSec, UntilSec) ->
+ case check_source_removed(Source) of
+ true ->
+ true;
+ false ->
+ case mem3_reshard:now_sec() < UntilSec of
+ true ->
+ LogMsg = "~p : Waiting for shard ~p removal confirmation",
+ couch_log:notice(LogMsg, [?MODULE, Name]),
+ timer:sleep(SleepSec * 1000),
+ wait_source_removed(Source, SleepSec, UntilSec);
+ false ->
+ false
+ end
+ end.
+
+
+check_source_removed(#shard{name = Name}) ->
+ DbName = mem3:dbname(Name),
+ Live = mem3_util:live_nodes(),
+ ShardNodes = [N || #shard{node = N} <- mem3:shards(DbName)],
+ Nodes = lists:usort([N || N <- ShardNodes, lists:member(N, Live)]),
+ {Responses, _} = rpc:multicall(Nodes, mem3, shards, [DbName]),
+ Shards = lists:usort(lists:flatten(Responses)),
+ SourcePresent = [S || S = #shard{name = SName, node = N} <- Shards,
+ SName =:= Name, N =:= node()],
+ case SourcePresent of
+ [] -> true;
+ [_ | _] -> false
+ end.
diff --git a/src/mem3/src/mem3_reshard_index.erl b/src/mem3/src/mem3_reshard_index.erl
new file mode 100644
index 000000000..5c61abdad
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_index.erl
@@ -0,0 +1,162 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_index).
+
+
+-export([
+ design_docs/1,
+ target_indices/2,
+ spawn_builders/1
+]).
+
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+%% Public API
+
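+% Fetch design docs for the source database via fabric. Returns {ok, []}
+% when the node is in maintenance mode or the database does not exist, so
+% index discovery does not fail the job in those cases.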
+design_docs(DbName) ->
+ try
+ case fabric:design_docs(mem3:dbname(DbName)) of
+ {error, {maintenance_mode, _, _Node}} ->
+ {ok, []};
+ {ok, DDocs} ->
+ JsonDocs = [couch_doc:from_json_obj(DDoc) || DDoc <- DDocs],
+ {ok, JsonDocs};
+ Else ->
+ Else
+ end
+ catch error:database_does_not_exist ->
+ {ok, []}
+ end.
+
+
+target_indices(Docs, Targets) ->
+ Indices = [[indices(N, D) || D <- Docs] || #shard{name = N} <- Targets],
+ lists:flatten(Indices).
+
+
+spawn_builders(Indices) ->
+ Results = [build_index(Index) || Index <- Indices],
+ Oks = [{ok, Pid} || {ok, Pid} <- Results, is_pid(Pid)],
+ case Results -- Oks of
+ [] ->
+ {ok, [Pid || {ok, Pid} <- Results]};
+ Error ->
+ % All-or-nothing: if some index builders could not be
+ % spawned, kill the spawned ones and return the error.
+ ErrMsg = "~p failed to spawn index builders: ~p ~p",
+ couch_log:error(ErrMsg, [?MODULE, Error, Indices]),
+ lists:foreach(fun({ok, Pid}) ->
+ catch unlink(Pid),
+ catch exit(Pid, kill)
+ end, Oks),
+ {error, Error}
+ end.
+
+
+%% Private API
+
+indices(DbName, Doc) ->
+ mrview_indices(DbName, Doc) ++
+ [dreyfus_indices(DbName, Doc) || has_app(dreyfus)] ++
+ [hastings_indices(DbName, Doc) || has_app(hastings)].
+
+
+mrview_indices(DbName, Doc) ->
+ try
+ {ok, MRSt} = couch_mrview_util:ddoc_to_mrst(DbName, Doc),
+ Views = couch_mrview_index:get(views, MRSt),
+ case Views =/= [] of
+ true ->
+ [{mrview, DbName, MRSt}];
+ false ->
+ []
+ end
+ catch
+ Tag:Err ->
+ Msg = "~p couldn't get mrview index ~p ~p ~p:~p",
+ couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]),
+ []
+ end.
+
+
+dreyfus_indices(DbName, Doc) ->
+ try
+ Indices = dreyfus_index:design_doc_to_indexes(Doc),
+ [{dreyfus, DbName, Index} || Index <- Indices]
+ catch
+ Tag:Err ->
+ Msg = "~p couldn't get dreyfus indices ~p ~p ~p:~p",
+ couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]),
+ []
+ end.
+
+
+hastings_indices(DbName, Doc) ->
+ try
+ Indices = hastings_index:design_doc_to_indexes(Doc),
+ [{hastings, DbName, Index} || Index <- Indices]
+ catch
+ Tag:Err ->
+ Msg = "~p couldn't get hastings indices ~p ~p ~p:~p",
+ couch_log:error(Msg, [?MODULE, DbName, Doc, Tag, Err]),
+ []
+ end.
+
+
+build_index({mrview, DbName, MRSt}) ->
+ case couch_index_server:get_index(couch_mrview_index, MRSt) of
+ {ok, Pid} ->
+ Args = [Pid, get_update_seq(DbName)],
+ WPid = spawn_link(couch_index, get_state, Args),
+ {ok, WPid};
+ Error ->
+ Error
+ end;
+
+build_index({dreyfus, DbName, Index})->
+ case dreyfus_index_manager:get_index(DbName, Index) of
+ {ok, Pid} ->
+ Args = [Pid, get_update_seq(DbName)],
+ WPid = spawn_link(dreyfus_index, await, Args),
+ {ok, WPid};
+ Error ->
+ Error
+ end;
+
+build_index({hastings, DbName, Index}) ->
+ case hastings_index_manager:get_index(DbName, Index) of
+ {ok, Pid} ->
+ Args = [Pid, get_update_seq(DbName)],
+ WPid = spawn_link(hastings_index, await, Args),
+ {ok, WPid};
+ Error ->
+ Error
+ end.
+
+
+has_app(App) ->
+ case application:get_application(App) of
+ {ok, _} ->
+ true;
+ _ ->
+ false
+ end.
+
+
+get_update_seq(DbName) ->
+ couch_util:with_db(DbName, fun(Db) ->
+ couch_db:get_update_seq(Db)
+ end).
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
new file mode 100644
index 000000000..fff2a543e
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -0,0 +1,627 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_job).
+
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/1,
+ checkpoint_done/1,
+ jobfmt/1,
+ pickfun/3,
+
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+
+ initial_copy/1,
+ topoff/2,
+ copy_local_docs/1,
+ wait_source_close/1,
+ source_delete/1
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("mem3_reshard.hrl").
+
+
+start_link(#job{} = Job) ->
+ gen_server:start_link(?MODULE, [Job], []).
+
+
+% This is called by the main process after it has checkpointed the progress
+% of the job. After the new state has been checkpointed, we signal the job to
+% start executing that state.
+checkpoint_done(#job{pid = Pid} = Job) ->
+ couch_log:notice(" ~p : checkpoint done for ~p", [?MODULE, jobfmt(Job)]),
+ gen_server:cast(Pid, do_state).
+
+
+% Formatting function, used for logging mostly
+jobfmt(#job{} = Job) ->
+ #job{
+ id = Id,
+ source = #shard{name = Source},
+ target = Target,
+ split_state = State,
+ job_state = JobState,
+ pid = Pid
+ } = Job,
+ TargetCount = length(Target),
+ Msg = "#job{~s ~s /~B job_state:~s split_state:~s pid:~p}",
+ Fmt = io_lib:format(Msg, [Id, Source, TargetCount, JobState, State, Pid]),
+ lists:flatten(Fmt).
+
+
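+% Document routing callback passed to couch_db_split: hash the document id
+% with HashFun and return the target range that contains the resulting key.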
+pickfun(DocId, [[B, E] | _] = Ranges, {_M, _F, _A} = HashFun) when
+ is_integer(B), is_integer(E), B =< E ->
+ HashKey = mem3_hash:calculate(HashFun, DocId),
+ Pred = fun([Begin, End]) ->
+ Begin =< HashKey andalso HashKey =< End
+ end,
+ [Key] = lists:filter(Pred, Ranges),
+ Key.
+
+
+init([#job{} = Job0]) ->
+ process_flag(trap_exit, true),
+ Job = Job0#job{
+ pid = self(),
+ start_time = mem3_reshard:now_sec(),
+ workers = [],
+ retries = 0
+ },
+ gen_server:cast(self(), maybe_recover),
+ couch_log:notice("~p starting job ~s", [?MODULE, jobfmt(Job)]),
+ {ok, Job}.
+
+
+terminate(normal, _Job) ->
+ ok;
+
+terminate(shutdown, _Job) ->
+ ok;
+
+terminate({shutdown, _}, _Job) ->
+ ok;
+
+terminate(Reason, Job) ->
+ couch_log:error("~p terminate ~p ~p", [?MODULE, Reason, Job]),
+ ok.
+
+
+handle_call(Call, From, Job) ->
+ couch_log:notice("~p call ~p from: ~p", [?MODULE, Call, From]),
+ {noreply, Job}.
+
+
+handle_cast(maybe_recover, Job) ->
+ {noreply, maybe_recover(Job)};
+
+handle_cast(do_state, Job) ->
+ case do_state(Job) of
+ {ok, Job1} ->
+ {noreply, Job1};
+ {stop, Error, Job1} ->
+ {stop, Error, Job1};
+ {retry, Error, Job1} ->
+ {noreply, retry_state(Error, Job1)}
+ end;
+
+handle_cast(Cast, Job) ->
+ couch_log:notice("~p unknown cast ~p", [?MODULE, Cast]),
+ {noreply, Job}.
+
+
+handle_info(retry, #job{split_state = initial_copy} = Job) ->
+ % Before retrying the initial copy, make sure to reset the targets,
+ % as the initial copy works from freshly created target shards every time
+ handle_cast(do_state, reset_target(Job));
+
+handle_info(retry, #job{} = Job) ->
+ handle_cast(do_state, Job);
+
+handle_info({'EXIT', Pid, Reason}, #job{workers = Workers} = Job) ->
+ case lists:member(Pid, Workers) of
+ true ->
+ Job1 = Job#job{workers = Workers -- [Pid]},
+ worker_exited(Reason, Job1);
+ false ->
+ ErrMsg = "~p ~p pid exited reason: ~p job: ~s",
+ couch_log:error(ErrMsg, [?MODULE, Pid, Reason, jobfmt(Job)]),
+ {noreply, Job}
+ end;
+
+handle_info(Info, Job) ->
+ couch_log:notice("~p info ~p ~p", [?MODULE, Info, Job]),
+ {noreply, Job}.
+
+
+code_change(_OldVsn, Job, _Extra) ->
+ {ok, Job}.
+
+
+-spec initial_copy(#job{}) -> ok | no_return().
+initial_copy(#job{source = Source, target = Targets0} = Job) ->
+ #shard{name = SourceName} = Source,
+ Targets = [{R, N} || #shard{range = R, name = N} <- Targets0],
+ TMap = maps:from_list(Targets),
+ LogMsg1 = "~p initial_copy started ~s",
+ LogArgs1 = [?MODULE, shardsstr(Source, Targets0)],
+ couch_log:notice(LogMsg1, LogArgs1),
+ case couch_db_split:split(SourceName, TMap, fun pickfun/3) of
+ {ok, Seq} ->
+ LogMsg2 = "~p initial_copy of ~s finished @ seq:~p",
+ LogArgs2 = [?MODULE, shardsstr(Source, Targets0), Seq],
+ couch_log:notice(LogMsg2, LogArgs2),
+ create_artificial_mem3_rep_checkpoints(Job, Seq);
+ {error, Error} ->
+ LogMsg3 = "~p initial_copy of ~s failed ~p",
+ LogArgs3 = [?MODULE, shardsstr(Source, Targets0), Error],
+ couch_log:error(LogMsg3, LogArgs3),
+ exit({error, Error})
+ end.
+
+
+-spec topoff(#shard{}, [#shard{}]) -> ok | no_return().
+topoff(#shard{} = Source, Targets) ->
+ couch_log:notice("~p topoff ~p", [?MODULE, shardsstr(Source, Targets)]),
+ check_source_exists(Source, topoff),
+ check_targets_exist(Targets, topoff),
+ TMap = maps:from_list([{R, T} || #shard{range = R} = T <- Targets]),
+ Opts = [{batch_size, 2000}, {batch_count, all}],
+ case mem3_rep:go(Source, TMap, Opts) of
+ {ok, Count} ->
+ Args = [?MODULE, shardsstr(Source, Targets), Count],
+ couch_log:notice("~p topoff done ~s, count: ~p", Args),
+ ok;
+ {error, Error} ->
+ Args = [?MODULE, shardsstr(Source, Targets), Error],
+ couch_log:error("~p topoff failed ~s, error: ~p", Args),
+ exit({error, Error})
+ end.
+
+
+-spec copy_local_docs(#job{}) -> ok | no_return().
+copy_local_docs(#job{source = Source, target = Targets0}) ->
+ #shard{name = SourceName} = Source,
+ Targets = [{R, N} || #shard{range = R, name = N} <- Targets0],
+ TMap = maps:from_list(Targets),
+ LogArg1 = [?MODULE, shardsstr(Source, Targets)],
+ couch_log:notice("~p copy local docs start ~s", LogArg1),
+ case couch_db_split:copy_local_docs(SourceName, TMap, fun pickfun/3) of
+ ok ->
+ couch_log:notice("~p copy local docs finished for ~s", LogArg1),
+ ok;
+ {error, Error} ->
+ LogArg2 = [?MODULE, shardsstr(Source, Targets), Error],
+ couch_log:error("~p copy local docs failed for ~s ~p", LogArg2),
+ exit({error, Error})
+ end.
+
+
+-spec wait_source_close(#job{}) -> ok | no_return().
+wait_source_close(#job{source = #shard{name = Name}, target = Targets}) ->
+ Timeout = config:get_integer("mem3_reshard",
+ "source_close_timeout_sec", 600),
+ check_targets_exist(Targets, wait_source_close),
+ case couch_db:open_int(Name, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ Now = mem3_reshard:now_sec(),
+ case wait_source_close(Db, 1, Now + Timeout) of
+ true ->
+ ok;
+ false ->
+ exit({error, {source_db_close_timeout, Name, Timeout}})
+ end;
+ {not_found, _} ->
+ couch_log:warning("~p source already deleted ~p", [?MODULE, Name]),
+ ok
+ end.
+
+
+-spec source_delete(#job{}) -> ok | no_return().
+source_delete(#job{source = #shard{name = Name}, target = Targets}) ->
+ check_targets_exist(Targets, source_delete),
+ case config:get_boolean("mem3_reshard", "delete_source", true) of
+ true ->
+ case couch_server:delete(Name, [?ADMIN_CTX]) of
+ ok ->
+ couch_log:notice("~p : deleted source shard ~p",
+ [?MODULE, Name]);
+ not_found ->
+ couch_log:warning("~p : source was already deleted ~p",
+ [?MODULE, Name])
+ end;
+ false ->
+ LogMsg = "~p : according to configuration not deleting source ~p",
+ couch_log:warning(LogMsg, [?MODULE, Name])
+ end.
+
+
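+% Return the state that immediately follows State in the ?SPLIT_STATES list;
+% completed is terminal and maps to itself.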
+-spec next_state(split_state()) -> split_state().
+next_state(State) ->
+ next_state(State, ?SPLIT_STATES).
+
+
+next_state(State, [State, Next | _Rest]) ->
+ Next;
+
+next_state(completed, _) ->
+ completed;
+
+next_state(State, [_ | Rest]) ->
+ next_state(State, Rest).
+
+
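+% Map the checkpointed split state to the state to resume from after a
+% restart: new and initial_copy restart the initial copy against reset
+% targets; build_indices, topoff2 and copy_local_docs resume from topoff1;
+% topoff3 and source_delete resume from wait_source_close; the remaining
+% states resume where they left off.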
+-spec maybe_recover(#job{}) -> #job{} | no_return().
+maybe_recover(#job{split_state = new} = Job) ->
+ Job1 = reset_target(Job),
+ switch_state(Job1, initial_copy);
+
+maybe_recover(#job{split_state = initial_copy} = Job) ->
+ Job1 = reset_target(Job),
+ switch_state(Job1, initial_copy);
+
+maybe_recover(#job{split_state = topoff1} = Job) ->
+ switch_state(Job, topoff1);
+
+maybe_recover(#job{split_state = build_indices} = Job) ->
+ switch_state(Job, topoff1);
+
+maybe_recover(#job{split_state = topoff2} = Job) ->
+ switch_state(Job, topoff1);
+
+maybe_recover(#job{split_state = copy_local_docs} = Job) ->
+ switch_state(Job, topoff1);
+
+maybe_recover(#job{split_state = update_shardmap} = Job) ->
+ switch_state(Job, update_shardmap);
+
+maybe_recover(#job{split_state = wait_source_close} = Job) ->
+ switch_state(Job, wait_source_close);
+
+maybe_recover(#job{split_state = topoff3} = Job) ->
+ switch_state(Job, wait_source_close);
+
+maybe_recover(#job{split_state = source_delete} = Job) ->
+ switch_state(Job, wait_source_close);
+
+maybe_recover(#job{split_state = completed} = Job) ->
+ switch_state(Job, completed);
+
+maybe_recover(Job) ->
+ couch_log:error("~p reocver : unknown state ~s", [?MODULE, jobfmt(Job)]),
+ erlang:error({invalid_split_job_recover_state, Job}).
+
+
+-spec retry_state(term(), #job{}) -> #job{}.
+retry_state(Error, #job{retries = Retries, state_info = Info} = Job0) ->
+ Job = Job0#job{
+ retries = Retries + 1,
+ state_info = info_update(error, Error, Info)
+ },
+ couch_log:notice("~p retrying ~p ~p", [?MODULE, jobfmt(Job), Retries]),
+ Job1 = report(Job),
+ erlang:send_after(retry_interval_sec() * 1000, self(), retry),
+ Job1.
+
+
+-spec switch_state(#job{}, split_state()) -> #job{}.
+switch_state(#job{manager = ManagerPid} = Job0, NewState) ->
+ Info = Job0#job.state_info,
+ Info1 = info_delete(error, Info),
+ Info2 = info_delete(reason, Info1),
+ Job = Job0#job{
+ split_state = NewState,
+ update_time = mem3_reshard:now_sec(),
+ retries = 0,
+ state_info = Info2,
+ workers = []
+ },
+ Job1 = update_split_history(Job),
+ % Ask the main process to checkpoint. When it has finished it will notify us
+ % by calling checkpoint_done/1. The reason not to call the main process via
+ % gen_server:call is that it could be in the middle of terminating the job,
+ % in which case the call would deadlock (after it sent us a shutdown
+ % message) and would end up using the whole supervisor termination timeout
+ % before finally killing the job.
+ ok = mem3_reshard:checkpoint(ManagerPid, check_state(Job1)),
+ Job1.
+
+
+-spec do_state(#job{}) -> {ok, #job{}} | {stop, any(), #job{}} |
+ {retry, any(), #job{}}.
+do_state(#job{split_state = initial_copy} = Job) ->
+ Pid = spawn_link(?MODULE, initial_copy, [Job]),
+ {ok, report(Job#job{workers = [Pid]})};
+
+do_state(#job{split_state = topoff1} = Job) ->
+ do_state_topoff(Job);
+
+do_state(#job{split_state = build_indices} = Job) ->
+ case build_indices(Job#job.source, Job#job.target) of
+ {ok, []} ->
+ {ok, switch_state(Job, next_state(build_indices))};
+ {ok, Pids} when is_list(Pids) ->
+ {ok, report(Job#job{workers = Pids})};
+ {error, Error} ->
+ maybe_retry(Job, max_retries(), Error)
+ end;
+
+do_state(#job{split_state = topoff2} = Job) ->
+ do_state_topoff(Job);
+
+do_state(#job{split_state = copy_local_docs} = Job) ->
+ Pid = spawn_link(?MODULE, copy_local_docs, [Job]),
+ {ok, report(Job#job{workers = [Pid]})};
+
+do_state(#job{split_state = update_shardmap} = Job) ->
+ Pid = spawn_link(mem3_reshard_dbdoc, update_shard_map, [Job]),
+ {ok, report(Job#job{workers = [Pid]})};
+
+do_state(#job{split_state = wait_source_close} = Job) ->
+ Pid = spawn_link(?MODULE, wait_source_close, [Job]),
+ {ok, report(Job#job{workers = [Pid]})};
+
+do_state(#job{split_state = topoff3} = Job) ->
+ do_state_topoff(Job);
+
+do_state(#job{split_state = source_delete} = Job) ->
+ ok = source_delete(Job),
+ {ok, switch_state(Job, next_state(source_delete))};
+
+do_state(#job{split_state = completed} = Job) ->
+ couch_log:notice("~p : ~p completed, exit normal", [?MODULE, jobfmt(Job)]),
+ {stop, normal, Job};
+
+do_state(#job{split_state = State} = Job) ->
+ couch_log:error("~p do_state NOT IMPLEMENTED ~p", [?MODULE, jobfmt(Job)]),
+ erlang:error({split_state_not_implemented, State, Job}).
+
+
+-spec maybe_retry(#job{}, integer(), any()) ->
+ {stop, any(), #job{}} | {retry, any(), #job{}}.
+maybe_retry(#job{retries = Retries} = Job, Max, Error) when Retries =< Max ->
+ {retry, Error, Job};
+
+maybe_retry(#job{} = Job, _, Error) ->
+ {stop, Error, Job}.
+
+
+-spec report(#job{}) -> #job{}.
+report(#job{manager = ManagerPid} = Job) ->
+ Job1 = Job#job{update_time = mem3_reshard:now_sec()},
+ ok = mem3_reshard:report(ManagerPid, Job1),
+ Job1.
+
+
+-spec worker_exited(any(), #job{}) ->
+ {noreply, #job{}} | {stop, any(), #job{}}.
+worker_exited(normal, #job{split_state = State, workers = []} = Job) ->
+ couch_log:notice("~p last worker exited ~p", [?MODULE, jobfmt(Job)]),
+ {noreply, switch_state(Job, next_state(State))};
+
+worker_exited(normal, #job{workers = Workers} = Job) when Workers =/= [] ->
+ {noreply, Job};
+
+worker_exited({error, missing_source}, #job{} = Job) ->
+ Msg1 = "~p stopping worker due to source missing ~p",
+ couch_log:error(Msg1, [?MODULE, jobfmt(Job)]),
+ [begin unlink(Pid), exit(Pid, kill) end || Pid <- Job#job.workers],
+ TargetCleanStates = [initial_copy, topoff1, build_indices, topoff2,
+ copy_local_docs],
+ case lists:member(Job#job.split_state, TargetCleanStates) of
+ true ->
+ Msg2 = "~p cleaning target after db was deleted ~p",
+ couch_log:error(Msg2, [?MODULE, jobfmt(Job)]),
+ {stop, {error, missing_source}, reset_target(Job)};
+ false ->
+ {stop, {error, missing_source}, Job}
+ end;
+
+worker_exited({error, missing_target}, #job{} = Job) ->
+ Msg = "~p stopping worker due to target db missing ~p",
+ couch_log:error(Msg, [?MODULE, jobfmt(Job)]),
+ [begin unlink(Pid), exit(Pid, kill) end || Pid <- Job#job.workers],
+ {stop, {error, missing_target}, Job};
+
+worker_exited(Reason, #job{} = Job0) ->
+ couch_log:error("~p worker error ~p ~p", [?MODULE, jobfmt(Job0), Reason]),
+ [begin unlink(Pid), exit(Pid, kill) end || Pid <- Job0#job.workers],
+ Job = Job0#job{workers = []},
+ case maybe_retry(Job, max_retries(), Reason) of
+ {stop, Error, Job1} ->
+ {stop, Error, Job1};
+ {retry, Error, Job1} ->
+ {noreply, retry_state(Error, Job1)}
+ end.
+
+
+% This is belt and suspenders, really. Called periodically to validate that
+% the split state is one of the expected states.
+-spec check_state(#job{}) -> #job{} | no_return().
+check_state(#job{split_state = State} = Job) ->
+ case lists:member(State, ?SPLIT_STATES) of
+ true ->
+ Job;
+ false ->
+ erlang:error({invalid_shard_split_state, State, Job})
+ end.
+
+
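+% Write mem3_rep checkpoint documents into the source and each target shard
+% at the sequence where the initial bulk copy finished, so that subsequent
+% topoff replications via mem3_rep start from that sequence instead of 0.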
+create_artificial_mem3_rep_checkpoints(#job{} = Job, Seq) ->
+ #job{source = Source = #shard{name = SourceName}, target = Targets} = Job,
+ check_source_exists(Source, initial_copy),
+ TNames = [TN || #shard{name = TN} <- Targets],
+ Timestamp = list_to_binary(mem3_util:iso8601_timestamp()),
+ couch_util:with_db(SourceName, fun(SDb) ->
+ [couch_util:with_db(TName, fun(TDb) ->
+ Doc = mem3_rep_checkpoint_doc(SDb, TDb, Timestamp, Seq),
+ {ok, _} = couch_db:update_doc(SDb, Doc, []),
+ {ok, _} = couch_db:update_doc(TDb, Doc, []),
+ ok
+ end) || TName <- TNames]
+ end),
+ ok.
+
+
+mem3_rep_checkpoint_doc(SourceDb, TargetDb, Timestamp, Seq) ->
+ Node = atom_to_binary(node(), utf8),
+ SourceUUID = couch_db:get_uuid(SourceDb),
+ TargetUUID = couch_db:get_uuid(TargetDb),
+ History = {[
+ {<<"source_node">>, Node},
+ {<<"source_uuid">>, SourceUUID},
+ {<<"source_seq">>, Seq},
+ {<<"timestamp">>, Timestamp},
+ {<<"target_node">>, Node},
+ {<<"target_uuid">>, TargetUUID},
+ {<<"target_seq">>, Seq}
+ ]},
+ Body = {[
+ {<<"seq">>, Seq},
+ {<<"target_uuid">>, TargetUUID},
+ {<<"history">>, {[{Node, [History]}]}}
+ ]},
+ Id = mem3_rep:make_local_id(SourceUUID, TargetUUID),
+ #doc{id = Id, body = Body}.
+
+
+do_state_topoff(#job{} = Job) ->
+ #job{source = Source, target = Targets} = Job,
+ Pid = spawn_link(?MODULE, topoff, [Source, Targets]),
+ {ok, report(Job#job{workers = [Pid]})}.
+
+
+check_source_exists(#shard{name = Name}, StateName) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ ErrMsg = "~p source ~p is unexpectedly missing in ~p",
+ couch_log:error(ErrMsg, [?MODULE, Name, StateName]),
+ exit({error, missing_source})
+ end.
+
+
+check_targets_exist(Targets, StateName) ->
+ lists:foreach(fun(#shard{name = Name}) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ ErrMsg = "~p target ~p is unexpectedly missing in ~p",
+ couch_log:error(ErrMsg, [?MODULE, Name, StateName]),
+ exit({error, missing_target})
+ end
+ end, Targets).
+
+
+build_indices(#shard{name = SourceName} = Source, Targets) ->
+ check_source_exists(Source, build_indices),
+ {ok, DDocs} = mem3_reshard_index:design_docs(SourceName),
+ Indices = mem3_reshard_index:target_indices(DDocs, Targets),
+ mem3_reshard_index:spawn_builders(Indices).
+
+
+wait_source_close(Db, SleepSec, UntilSec) ->
+ case couch_db:monitored_by(Db) -- [self()] of
+ [] ->
+ true;
+ [_ | _] ->
+ Now = mem3_reshard:now_sec(),
+ case Now < UntilSec of
+ true ->
+ LogMsg = "~p : Waiting for source shard ~p to be closed",
+ couch_log:notice(LogMsg, [?MODULE, couch_db:name(Db)]),
+ timer:sleep(SleepSec * 1000),
+ wait_source_close(Db, SleepSec, UntilSec);
+ false ->
+ false
+ end
+ end.
+
+
+-spec max_retries() -> integer().
+max_retries() ->
+ config:get_integer("mem3_reshard", "max_retries", 1).
+
+
+-spec retry_interval_sec() -> integer().
+retry_interval_sec() ->
+ config:get_integer("mem3_reshard", "retry_interval_sec", 10).
+
+
+-spec info_update(atom(), any(), [tuple()]) -> [tuple()].
+info_update(Key, Val, StateInfo) ->
+ lists:keystore(Key, 1, StateInfo, {Key, Val}).
+
+
+-spec info_delete(atom(), [tuple()]) -> [tuple()].
+info_delete(Key, StateInfo) ->
+ lists:keydelete(Key, 1, StateInfo).
+
+
+-spec shardsstr(#shard{}, #shard{} | [#shard{}]) -> string().
+shardsstr(#shard{name = SourceName}, #shard{name = TargetName}) ->
+ lists:flatten(io_lib:format("~s -> ~s", [SourceName, TargetName]));
+
+shardsstr(#shard{name = SourceName}, Targets) ->
+ TNames = [TN || #shard{name = TN} <- Targets],
+ TargetsStr = string:join([binary_to_list(T) || T <- TNames], ","),
+ lists:flatten(io_lib:format("~s -> ~s", [SourceName, TargetsStr])).
+
+
+-spec reset_target(#job{}) -> #job{}.
+reset_target(#job{source = Source, target = Targets} = Job) ->
+ ShardNames = try
+ [N || #shard{name = N} <- mem3:local_shards(mem3:dbname(Source))]
+ catch
+ error:database_does_not_exist ->
+ []
+ end,
+ lists:map(fun(#shard{name = Name}) ->
+ case {couch_server:exists(Name), lists:member(Name, ShardNames)} of
+ {_, true} ->
+ % Should never get here, but if we do, crash and don't continue
+ LogMsg = "~p : ~p target unexpectedly found in shard map ~p",
+ couch_log:error(LogMsg, [?MODULE, jobfmt(Job), Name]),
+ erlang:error({target_present_in_shard_map, Name});
+ {true, false} ->
+ LogMsg = "~p : ~p resetting ~p target",
+ couch_log:warning(LogMsg, [?MODULE, jobfmt(Job), Name]),
+ couch_db_split:cleanup_target(Source#shard.name, Name);
+ {false, false} ->
+ ok
+ end
+ end, Targets),
+ Job.
+
+
+-spec update_split_history(#job{}) -> #job{}.
+update_split_history(#job{split_state = St, update_time = Ts} = Job) ->
+ Hist = Job#job.history,
+ JobSt = case St of
+ completed -> completed;
+ failed -> failed;
+ new -> new;
+ stopped -> stopped;
+ _ -> running
+ end,
+ Job#job{history = mem3_reshard:update_history(JobSt, St, Ts, Hist)}.
diff --git a/src/mem3/src/mem3_reshard_store.erl b/src/mem3/src/mem3_reshard_store.erl
new file mode 100644
index 000000000..e677b82f3
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_store.erl
@@ -0,0 +1,288 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_store).
+
+
+-export([
+ init/3,
+
+ store_job/2,
+ load_job/2,
+ delete_job/2,
+ get_jobs/1,
+
+ store_state/1,
+ load_state/1,
+ delete_state/1, % for debugging
+
+ job_to_ejson_props/2,
+ state_to_ejson_props/1
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include("mem3_reshard.hrl").
+
+
+-spec init(#state{}, binary(), binary()) -> #state{}.
+init(#state{} = State, JobPrefix, StateDocId) ->
+ State#state{
+ job_prefix = <<?LOCAL_DOC_PREFIX, JobPrefix/binary>>,
+ state_id = <<?LOCAL_DOC_PREFIX, StateDocId/binary>>
+ }.
+
+
+-spec store_job(#state{}, #job{}) -> ok.
+store_job(#state{job_prefix = Prefix}, #job{id = Id} = Job) ->
+ with_shards_db(fun(Db) ->
+ DocId = <<Prefix/binary, Id/binary>>,
+ ok = update_doc(Db, DocId, job_to_ejson_props(Job))
+ end).
+
+
+-spec load_job(#state{}, binary()) -> {ok, {[_]}} | not_found.
+load_job(#state{job_prefix = Prefix}, Id) ->
+ with_shards_db(fun(Db) ->
+ case load_doc(Db, <<Prefix/binary, Id/binary>>) of
+ {ok, DocBody} ->
+ {ok, job_from_ejson(DocBody)};
+ not_found ->
+ not_found
+ end
+ end).
+
+
+-spec delete_job(#state{}, binary()) -> ok.
+delete_job(#state{job_prefix = Prefix}, Id) ->
+ with_shards_db(fun(Db) ->
+ DocId = <<Prefix/binary, Id/binary>>,
+ ok = delete_doc(Db, DocId)
+ end).
+
+
+-spec get_jobs(#state{}) -> [#job{}].
+get_jobs(#state{job_prefix = Prefix}) ->
+ with_shards_db(fun(Db) ->
+ PrefixLen = byte_size(Prefix),
+ FoldFun = fun(#doc{id = Id, body = Body}, Acc) ->
+ case Id of
+ <<Prefix:PrefixLen/binary, _/binary>> ->
+ {ok, [job_from_ejson(Body) | Acc]};
+ _ ->
+ {stop, Acc}
+ end
+ end,
+ Opts = [{start_key, Prefix}],
+ {ok, Jobs} = couch_db:fold_local_docs(Db, FoldFun, [], Opts),
+ lists:reverse(Jobs)
+ end).
+
+
+-spec store_state(#state{}) -> ok.
+store_state(#state{state_id = DocId} = State) ->
+ with_shards_db(fun(Db) ->
+ ok = update_doc(Db, DocId, state_to_ejson_props(State))
+ end).
+
+
+-spec load_state(#state{}) -> #state{}.
+load_state(#state{state_id = DocId} = State) ->
+ with_shards_db(fun(Db) ->
+ case load_doc(Db, DocId) of
+ {ok, DocBody} ->
+ state_from_ejson(State, DocBody);
+ not_found ->
+ State
+ end
+ end).
+
+
+-spec delete_state(#state{}) -> ok.
+delete_state(#state{state_id = DocId}) ->
+ with_shards_db(fun(Db) ->
+ ok = delete_doc(Db, DocId)
+ end).
+
+
+job_to_ejson_props(#job{source = Source, target = Targets} = Job, Opts) ->
+ Iso8601 = proplists:get_value(iso8601, Opts),
+ History = history_to_ejson(Job#job.history, Iso8601),
+ StartTime = case Iso8601 of
+ true -> iso8601(Job#job.start_time);
+ _ -> Job#job.start_time
+ end,
+ UpdateTime = case Iso8601 of
+ true -> iso8601(Job#job.update_time);
+ _ -> Job#job.update_time
+ end,
+ [
+ {id, Job#job.id},
+ {type, Job#job.type},
+ {source, Source#shard.name},
+ {target, [T#shard.name || T <- Targets]},
+ {job_state, Job#job.job_state},
+ {split_state, Job#job.split_state},
+ {state_info, state_info_to_ejson(Job#job.state_info)},
+ {node, atom_to_binary(Job#job.node, utf8)},
+ {start_time, StartTime},
+ {update_time, UpdateTime},
+ {history, History}
+ ].
+
+
+state_to_ejson_props(#state{} = State) ->
+ [
+ {state, atom_to_binary(State#state.state, utf8)},
+ {state_info, state_info_to_ejson(State#state.state_info)},
+ {update_time, State#state.update_time},
+ {node, atom_to_binary(State#state.node, utf8)}
+ ].
+
+
+% Private API
+
+with_shards_db(Fun) ->
+ DbName = config:get("mem3", "shards_db", "_dbs"),
+ case mem3_util:ensure_exists(DbName) of
+ {ok, Db} ->
+ try
+ Fun(Db)
+ after
+ catch couch_db:close(Db)
+ end;
+ Else ->
+ throw(Else)
+ end.
+
+
+delete_doc(Db, DocId) ->
+ case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{revs = {_, Revs}}} ->
+ {ok, _} = couch_db:delete_doc(Db, DocId, Revs),
+ {ok, _} = couch_db:ensure_full_commit(Db),
+ ok;
+ {not_found, _} ->
+ ok
+ end.
+
+
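+% Create or update a _local job or state document, reusing the existing
+% revision (if any) so the write does not conflict. The write is skipped
+% entirely when state persistence is disabled in the config.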
+update_doc(Db, DocId, Body) ->
+ DocProps = [{<<"_id">>, DocId}] ++ Body,
+ Body1 = ?JSON_DECODE(?JSON_ENCODE({DocProps})),
+ BaseDoc = couch_doc:from_json_obj(Body1),
+ Doc = case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{revs = Revs}} ->
+ BaseDoc#doc{revs = Revs};
+ {not_found, _} ->
+ BaseDoc
+ end,
+ case store_state() of
+ true ->
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ couch_log:debug("~p updated doc ~p ~p", [?MODULE, DocId, Body]),
+ {ok, _} = couch_db:ensure_full_commit(Db),
+ ok;
+ false ->
+ couch_log:debug("~p not storing state in ~p", [?MODULE, DocId]),
+ ok
+ end.
+
+
+load_doc(Db, DocId) ->
+ case couch_db:open_doc(Db, DocId, [ejson_body]) of
+ {ok, #doc{body = Body}} ->
+ couch_log:debug("~p loaded doc ~p ~p", [?MODULE, DocId, Body]),
+ {ok, Body};
+ {not_found, _} ->
+ not_found
+ end.
+
+
+job_to_ejson_props(#job{} = Job) ->
+ job_to_ejson_props(Job, []).
+
+
+job_from_ejson({Props}) ->
+ Id = couch_util:get_value(<<"id">>, Props),
+ Type = couch_util:get_value(<<"type">>, Props),
+ Source = couch_util:get_value(<<"source">>, Props),
+ Target = couch_util:get_value(<<"target">>, Props),
+ JobState = couch_util:get_value(<<"job_state">>, Props),
+ SplitState = couch_util:get_value(<<"split_state">>, Props),
+ StateInfo = couch_util:get_value(<<"state_info">>, Props),
+ TStarted = couch_util:get_value(<<"start_time">>, Props),
+ TUpdated = couch_util:get_value(<<"update_time">>, Props),
+ History = couch_util:get_value(<<"history">>, Props),
+ #job{
+ id = Id,
+ type = binary_to_atom(Type, utf8),
+ job_state = binary_to_atom(JobState, utf8),
+ split_state = binary_to_atom(SplitState, utf8),
+ state_info = state_info_from_ejson(StateInfo),
+ node = node(),
+ start_time = TStarted,
+ update_time = TUpdated,
+ source = mem3_reshard:shard_from_name(Source),
+ target = [mem3_reshard:shard_from_name(T) || T <- Target],
+ history = history_from_ejson(History)
+ }.
+
+
+state_from_ejson(#state{} = State, {Props}) ->
+ StateVal = couch_util:get_value(<<"state">>, Props),
+ StateInfo = couch_util:get_value(<<"state_info">>, Props),
+ TUpdated = couch_util:get_value(<<"update_time">>, Props),
+ State#state{
+ state = binary_to_atom(StateVal, utf8),
+ state_info = state_info_from_ejson(StateInfo),
+ node = node(),
+ update_time = TUpdated
+ }.
+
+
+state_info_from_ejson({Props}) ->
+ Props1 = [{binary_to_atom(K, utf8), couch_util:to_binary(V)}
+ || {K, V} <- Props],
+ lists:sort(Props1).
+
+
+history_to_ejson(Hist, true) when is_list(Hist) ->
+ [{[{timestamp, iso8601(T)}, {type, S}, {detail, D}]} || {T, S, D} <- Hist];
+
+history_to_ejson(Hist, _) when is_list(Hist) ->
+ [{[{timestamp, T}, {type, S}, {detail, D}]} || {T, S, D} <- Hist].
+
+
+history_from_ejson(HistoryEJson) when is_list(HistoryEJson) ->
+ lists:map(fun({EventProps}) ->
+ Timestamp = couch_util:get_value(<<"timestamp">>, EventProps),
+ State = couch_util:get_value(<<"type">>, EventProps),
+ Detail = couch_util:get_value(<<"detail">>, EventProps),
+ {Timestamp, binary_to_atom(State, utf8), Detail}
+ end, HistoryEJson).
+
+
+state_info_to_ejson(Props) ->
+ {lists:sort([{K, couch_util:to_binary(V)} || {K, V} <- Props])}.
+
+
+store_state() ->
+ config:get_boolean("mem3_reshard", "store_state", true).
+
+
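+% Format a Unix timestamp (seconds since the epoch) as an ISO 8601 UTC
+% binary, for example <<"2019-02-25T17:20:46Z">>.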
+iso8601(UnixSec) ->
+ Mega = UnixSec div 1000000,
+ Sec = UnixSec rem 1000000,
+ {{Y, M, D}, {H, Min, S}} = calendar:now_to_universal_time({Mega, Sec, 0}),
+ Format = "~B-~2..0B-~2..0BT~2..0B:~2..0B:~2..0BZ",
+ iolist_to_binary(io_lib:format(Format, [Y, M, D, H, Min, S])).
diff --git a/src/mem3/src/mem3_reshard_validate.erl b/src/mem3/src/mem3_reshard_validate.erl
new file mode 100644
index 000000000..2e929e62c
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_validate.erl
@@ -0,0 +1,113 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_validate).
+
+-export([
+ start_args/2,
+ source/1,
+ targets/2
+]).
+
+-include_lib("mem3/include/mem3.hrl").
+
+
+-spec start_args(#shard{}, any()) -> ok | {error, term()}.
+start_args(Source, Split) ->
+ first_error([
+ check_split(Split),
+ check_range(Source, Split),
+ check_node(Source),
+ source(Source)
+ ]).
+
+
+-spec source(#shard{}) -> ok | {error, term()}.
+source(#shard{name = Name}) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ {error, {source_shard_not_found, Name}}
+ end.
+
+
+-spec targets(#shard{}, [#shard{}]) -> ok | {error, term()}.
+targets(#shard{} = Source, Targets) ->
+ first_error([
+ target_ranges(Source, Targets)
+ ]).
+
+
+-spec check_split(any()) -> ok | {error, term()}.
+check_split(Split) when is_integer(Split), Split > 1 ->
+ ok;
+check_split(Split) ->
+ {error, {invalid_split_parameter, Split}}.
+
+
+-spec check_range(#shard{}, any()) -> ok | {error, term()}.
+check_range(#shard{range = Range = [B, E]}, Split) ->
+ case (E + 1 - B) >= Split of
+ true ->
+ ok;
+ false ->
+ {error, {shard_range_cannot_be_split, Range, Split}}
+ end.
+
+
+-spec check_node(#shard{}) -> ok | {error, term()}.
+check_node(#shard{node = undefined}) ->
+ ok;
+
+check_node(#shard{node = Node}) when Node =:= node() ->
+ ok;
+
+check_node(#shard{node = Node}) ->
+ {error, {source_shard_node_is_not_current_node, Node}}.
+
+
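+% Check that the target ranges are contiguous (each range starts right after
+% the previous one ends) and that together they cover exactly the same
+% [Begin, End] interval as the source shard.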
+-spec target_ranges(#shard{}, [#shard{}]) -> ok | {error, any()}.
+target_ranges(#shard{range = [Begin, End]}, Targets) ->
+ Ranges = [R || #shard{range = R} <- Targets],
+ SortFun = fun([B1, _], [B2, _]) -> B1 =< B2 end,
+ [First | RestRanges] = lists:sort(SortFun, Ranges),
+ try
+ TotalRange = lists:foldl(fun([B2, E2], [B1, E1]) ->
+ case B2 =:= E1 + 1 of
+ true ->
+ ok;
+ false ->
+ throw({range_error, {B2, E1}})
+ end,
+ [B1, E2]
+ end, First, RestRanges),
+ case [Begin, End] =:= TotalRange of
+ true ->
+ ok;
+ false ->
+ throw({range_error, {[Begin, End], TotalRange}})
+ end
+ catch
+ throw:{range_error, Error} ->
+ {error, {shard_range_error, Error}}
+ end.
+
+
+-spec first_error([ok | {error, term()}]) -> ok | {error, term()}.
+first_error(Results) ->
+ case [Res || Res <- Results, Res =/= ok] of
+ [] ->
+ ok;
+ [FirstError | _] ->
+ FirstError
+ end.