author    Nick Vatamaniuc <vatamane@apache.org>  2019-03-18 13:32:15 -0400
committer Nick Vatamaniuc <nickva@users.noreply.github.com>  2019-04-03 10:48:45 -0400
commit    bcdd99497601a3054fb1335ebb940fbe00b7469f
tree      f7054f50fe970dbd377e7b959f0c79de9b02cc1b
parent    9f9a6fc13eab305bfcb72f83aff175362862d9be
download  couchdb-bcdd99497601a3054fb1335ebb940fbe00b7469f.tar.gz
Resharding supervisor and job manager
Most of the resharding logic lives in the mem3 application under the
`mem3_reshard_sup` supervisor. `mem3_reshard_sup` has three children:

1) `mem3_reshard` : the main resharding job manager.

2) `mem3_reshard_job_sup` : a simple-one-for-one supervisor to keep track
   of individual resharding jobs.

3) `mem3_reshard_dbdoc` : a helper gen_server used to update the shard map.

The `mem3_reshard` gen_server is the central point in the resharding
logic. It is a job manager which accepts new jobs, monitors jobs while
they run, checkpoints their status as they make progress, and knows how
to restore their state when a node reboots.

Jobs are represented as instances of the `#job{}` record defined in the
`mem3_reshard.hrl` header. There is also a global resharding state
represented by a `#state{}` record. `mem3_reshard` maintains an ets table
of "live" `#job{}` records as part of its gen_server state, which is
itself represented by `#state{}`. When jobs are checkpointed or a user
updates the global resharding state, `mem3_reshard` uses the
`mem3_reshard_store` module to persist those updates to `_local/...`
documents in the shards database. The idea is to allow jobs to persist
across node or application restarts.

After a job is added, if the global state is not `stopped`, the
`mem3_reshard` manager will ask `mem3_reshard_job_sup` to spawn a new
child. That child runs a gen_server defined in the `mem3_reshard_job`
module (included in subsequent commits). Each child process will
periodically ask the `mem3_reshard` manager to checkpoint when it jumps
to a new state. `mem3_reshard` checkpoints and then informs the child to
continue its work.
-rw-r--r--src/mem3/README.md6
-rw-r--r--src/mem3/README_reshard.md93
-rw-r--r--src/mem3/src/mem3.app.src1
-rw-r--r--src/mem3/src/mem3_reshard.erl918
-rw-r--r--src/mem3/src/mem3_reshard.hrl74
-rw-r--r--src/mem3/src/mem3_reshard_job_sup.erl55
-rw-r--r--src/mem3/src/mem3_reshard_sup.erl47
-rw-r--r--src/mem3/src/mem3_sup.erl6
8 files changed, 1196 insertions, 4 deletions
diff --git a/src/mem3/README.md b/src/mem3/README.md
index 1e1e0bd2c..8098f6979 100644
--- a/src/mem3/README.md
+++ b/src/mem3/README.md
@@ -22,9 +22,9 @@ shards total, so 8 nodes will hold none of the data for this database. Given
this feature, we even shard use out across the cluster by altering the 'start'
node for the database's shards.
-Splitting and merging shards is an immature feature of the system, and will
-require attention in the near-term. We believe we can implement both
-functions and perform them while the database remains online.
+Shards can be split using the `/_reshard` API endpoint. Refer to a separate
+[README](README_reshard.md) regarding the technical detail on how shard
+splitting works.
### Getting Started
diff --git a/src/mem3/README_reshard.md b/src/mem3/README_reshard.md
new file mode 100644
index 000000000..237371485
--- /dev/null
+++ b/src/mem3/README_reshard.md
@@ -0,0 +1,93 @@
+Developer Oriented Resharding Description
+=========================================
+
+This is a technical description of the resharding logic. The discussion focuses on job creation and life-cycle, data definitions, and the state transition mechanisms.
+
+
+Job Life-Cycle
+--------------
+
+Job creation happens in the `mem3_reshard_httpd` API handler module. That module uses `mem3_reshard_http_util` to do some validation, and eventually calls `mem3_reshard:start_split_job/2` on one or more nodes in the cluster, depending on where the new jobs should run.
+
+`mem3_reshard:start_split_job/2` is the main Erlang API entry point. After some more validation it creates a `#job{}` record and calls the `mem3_reshard` job manager. The manager will then add the job to its jobs ets table, save it to a `_local` document in the shards db and, most importantly, start a new resharding process. That process will be supervised by the `mem3_reshard_job_sup` supervisor.
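+
+As a rough illustration, a job for a single shard might be started from a remsh like this (a sketch; the shard file name below is made up):
+
+```erlang
+%% Start a split job for one shard file on this node.
+%% Returns {ok, JobId} on success or {error, Reason} otherwise.
+{ok, JobId} = mem3_reshard:start_split_job(
+    <<"shards/00000000-7fffffff/mydb.1553786862">>).
+
+%% Fetch that job's current state as ejson properties.
+{ok, {Props}} = mem3_reshard:job(JobId).
+```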
+
+Each job runs in a gen_server implemented in the `mem3_reshard_job` module. When splitting a shard, a job will go through a series of steps such as `initial_copy`, `build_indices`, `update_shardmap`, etc. Between each step it will report progress and checkpoint with the `mem3_reshard` manager. A checkpoint involves the `mem3_reshard` manager persisting that job's state to disk in a `_local` document in the `_dbs` db. The job then continues until it reaches the `completed` state or fails and ends up in the `failed` state.
+
+If a user stops shard splitting on the whole cluster, then all running jobs will stop. When shard splitting is resumed, they will try to recover from their last checkpoint.
+
+A job can also be individually stopped or resumed. If a job is individually stopped it will stay stopped even if the global shard splitting state is `running`. A user has to explicitly set that job to a `running` state for it to resume. If a node with running jobs is turned off, the running jobs will resume from their last checkpoint when the node is rebooted.
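+
+A small sketch of those per-job controls (both functions are part of this commit; `JobId` is the id returned when the job was created):
+
+```erlang
+%% Stop a single job with a reason, leaving other jobs alone.
+ok = mem3_reshard:stop_job(JobId, <<"Paused by operator">>).
+
+%% Later, set it back to running so it resumes from its last checkpoint.
+ok = mem3_reshard:resume_job(JobId).
+```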
+
+
+Data Definitions
+----------------
+
+This section focuses on record definitions and how data is transformed to and from various formats.
+
+Right below the `mem3_reshard:start_split_job/1` API level, a job is converted to a `#job{}` record defined in the `mem3_reshard.hrl` header file. That record is then used throughout most of the resharding code. The job manager `mem3_reshard` stores it in its jobs ets table, and when a job process is spawned its single argument is also just a `#job{}` record. As a job process executes it will periodically report state back to the `mem3_reshard` manager as an updated `#job{}` record.
+
+Some interesting fields from the `#job{}` record:
+
+ - `id` Uniquely identifies a job in a cluster. It is derived from the source shard name, node and a version (currently = 1).
+ - `type` Currently the only type supported is `split` but `merge` or `rebalance` might be added in the future.
+ - `job_state` The running state of the job. Indicates if the job is `running`, `stopped`, `completed` or `failed`.
+ - `split_state` Once the job is running this indicates how far along it got in the splitting process.
+ - `source` Source shard file. If/when merge is implemented this will be a list.
+ - `target` List of target shard files. This is expected to be a list of 2 items currently.
+ - `history` A time-line of state transitions represented as a list of tuples.
+ - `pid` When the job is running this will be set to the pid of the process.
+
+
+In the `mem3_reshard_store` module the `#job{}` record is translated to a JSON document so it can be persisted to disk. The translation functions to and from JSON in that module are also used by the HTTP API layer to return a job's state and other information to the user.
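+
+For orientation, a persisted job might look roughly like the ejson term below. The field names follow the `#job{}` record above; the exact document shape and `_local` id format are assumptions:
+
+```erlang
+{[
+    {<<"_id">>, <<"_local/reshard-job-001-...">>},
+    {<<"type">>, <<"split">>},
+    {<<"job_state">>, <<"running">>},
+    {<<"split_state">>, <<"initial_copy">>},
+    {<<"source">>, <<"shards/00000000-7fffffff/mydb.1553786862">>},
+    {<<"target">>, [
+        <<"shards/00000000-3fffffff/mydb.1553786862">>,
+        <<"shards/40000000-7fffffff/mydb.1553786862">>
+    ]}
+]}
+```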
+
+Another important piece of data is the global resharding state. When a user disables resharding on a cluster, a call is made to the `mem3_reshard` manager on each node and they store that in a `#state{}` record. This record is defined in the `mem3_reshard.hrl` header file, and just like the `#job{}` record it can be translated to/from ejson in the `mem3_reshard_store` module and stored to and loaded from disk.
+
+
+State Transitions
+-----------------
+
+Resharding logic has 3 separate states to keep track of:
+
+1. Per-node resharding state. This state can be toggled between `running` and `stopped`. That toggle happens via the `mem3_reshard:start/0` and `mem3_reshard:stop/1` functions (see the sketch after this list). This state is kept in the `#state{}` record of the `mem3_reshard` manager gen_server. It is also persisted to the local shard map database as a `_local` document so that it survives a node restart. When the state is `running`, all jobs that are not individually `stopped`, and have not failed or completed, will be `running`. When the state is `stopped`, all the running jobs will be `stopped`.
+
+2. Job's running state, held in the `#job{}` record's `job_state` field. This is the general running state of a resharding job. It can be `new`, `running`, `stopped`, `completed` or `failed`. This state is most relevant for the `mem3_reshard` manager. In other words, it is the `mem3_reshard` gen_server that starts the job and monitors it to see whether it exits successfully on completion or with an error.
+
+3. Job's internal splitting state. This state tracks the steps taken during shard splitting by each job. It is mostly relevant for the `mem3_reshard_job` module and is kept in the `#job{}` record's `split_state` field. The progression of these states is linear, going from one state to the next. That is reflected in the code as well: when one state is done, `mem3_reshard_job:get_next_state/1` is called, which returns the next state in the list. The list is defined in the `SPLIT_STATES` macro. This simplistic transition is also one of the reasons why a gen_fsm wasn't considered for the `mem3_reshard_job` logic. A sketch of the toggle from (1) and of this linear progression follows the list.
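+
+The node-level toggle from (1) consists of the two manager calls added in this commit:
+
+```erlang
+%% Stop all resharding on this node (recording a reason), then re-enable it.
+ok = mem3_reshard:stop(<<"Paused during maintenance">>).
+ok = mem3_reshard:start().
+```
+
+The linear `split_state` progression from (3) can be sketched as a simple list walk. The `get_next_state/1` body and exact `SPLIT_STATES` contents below are assumptions, since `mem3_reshard_job` arrives in a later commit:
+
+```erlang
+-define(SPLIT_STATES, [
+    new, initial_copy, topoff1, build_indices, topoff2, copy_local_docs,
+    update_shardmap, wait_source_close, topoff3, source_delete, completed
+]).
+
+get_next_state(#job{split_state = Current}) ->
+    % dropwhile leaves the list starting at the current state; the next
+    % element is its successor (the terminal `completed` state has none).
+    [Current | Rest] = lists:dropwhile(fun(S) -> S =/= Current end,
+        ?SPLIT_STATES),
+    hd(Rest).
+```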
+
+Another interesting aspect is how the `split_state` transitions happen in the `mem3_reshard_job` module. What follows is an examination of that.
+
+A job starts running in the `new` state or from a previously checkpointed state. In the latter case, the job goes through some recovery logic (see the `?STATE_RESTART` macro in `mem3_reshard_job`) where it tries to resume its work from where it left off. This means, for example, that if it was interrupted in the `initial_copy` state it might have to reset the target files and copy everything again. After recovery, the state execution logic is driven by `run(#job{})`, which ends up calling `?MODULE:State(#job{})` state-specific functions for each state.
+
+In `mem3_reshard_job:switch_to_next_state/2` the job's history is updated, any current `state_info` is cleared, and the job state is switched in the `#job{}` record. Then the new state is checkpointed in the `checkpoint/1` function. Checkpointing casts a message to the `mem3_reshard` manager. After that message is sent, the job process sits and waits.
+
+In the meantime the `mem3_reshard` manager checkpoints the state, which means it updates both its ets table with the new `#job{}` record and persists the state with the `mem3_reshard_store` module. Finally, it notifies the job process that checkpointing is done by calling `mem3_reshard_job:checkpoint_done/1`.
+
+The `mem3_reshard_job:checkpoint_done/1` call sends a `checkpoint_done` message to the job's process, at which point it starts executing the new state.
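+
+Seen from the job side, the exchange can be sketched roughly as follows (the `receive` shape is an assumption based on the description above; `mem3_reshard_job` is added in a later commit):
+
+```erlang
+%% Job process: ask the manager to checkpoint, then block until it confirms.
+checkpoint_and_wait(#job{manager = ManagerPid} = Job) ->
+    ok = mem3_reshard:checkpoint(ManagerPid, Job),  % casts {checkpoint, Job}
+    receive
+        checkpoint_done ->
+            ok
+    end.
+```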
+
+Most states in `mem3_reshard_job` try not to block the main job process and instead launch worker processes to perform long-running operations. It is usually just one worker process but there could be several. The job then waits for the workers to finish and inspects their exit signal (see the `wait_for_workers/1` function). When all the workers for a particular `split_state` exit, the job is switched to the next state with `switch_to_next_state/1` and the whole thing repeats until the `completed` state is reached, at which point the whole job exits normally.
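+
+A minimal sketch of that wait loop, assuming trapped exits from linked workers (the real `wait_for_workers/1` lives in `mem3_reshard_job` and may differ):
+
+```erlang
+%% Wait until every worker spawned for the current split_state has exited.
+wait_for_workers(#job{workers = []} = Job) ->
+    Job;
+wait_for_workers(#job{workers = Workers} = Job) ->
+    receive
+        {'EXIT', Pid, normal} ->
+            wait_for_workers(Job#job{workers = Workers -- [Pid]});
+        {'EXIT', _Pid, Reason} ->
+            % A worker failure fails the whole job.
+            exit(Reason)
+    end.
+```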
+
+If the source is updated at a high rate and the cluster is under load, resharding jobs may take longer to finish. The cluster would have to be running at the limit where both compaction and internal replication have difficulty catching up, as fundamentally the logic used for the initial bulk copy is similar to the compaction code, and the topoff states just reuse the internal replicator code. Eventually, when the load subsides, the jobs should catch up and finish.
+
+Individual Modules Description
+------------------------------
+
+These are mostly random notes about various modules involved in resharding. Most, but not all, are in the `mem3` application.
+
+* `mem3_reshard`: Main API entry point and the job manager.
+
+* `mem3_reshard_job` : Individual job logic.
+
+* `mem3_reshard_dbdoc` : Responsible for updating the shard doc in the `_dbs` database. Besides a bunch of utility functions, it contains a gen_server which is used to update shard documents in the cluster in such a way as to minimize the risk of conflicts. That is accomplished by having every node call a single updater process for the whole cluster. This coordinator is picked by sorting the list of all the live mem3 nodes and taking the first one (see the sketch after this list).
+
+* `mem3_reshard_httpd` : API endpoint definitions.
+
+* `mem3_reshard_api` : Cluster API endpoint. This module is responsible for sending requests to all the nodes in a cluster and gathering results.
+
+* `mem3_reshard_index` : This is a helper module used by workers in the `build_indices` state.
+
+* `mem3_reshard_job_sup` : Simple one for one supervisor which keeps track of running jobs.
+
+* `mem3_reshard_store` : State persistence module. It knows how to save/restore `#job{}` and `#state{}` records to/from `_local` docs. It is also re-used for serializing `#job{}` into ejson by the HTTP API module.
+
+* `mem3_reshard_validate` : Validate that source exists, target ranges don't have gaps in them, etc.
+
+* `couch_db_split` : This module is not in the `mem3` app but it does all the heavy lifting during the initial data copy. Given a source db, some targets, and a function to decide which docs go to which target, it will copy all data from the source to the targets. It's best to think of this module as a form of compactor. Unlike `couch_bt_engine_compactor`, this one lives above the `couch_db_engine` API, and instead of copying data to one new file it copies it to two or more. Unsurprisingly, because of that it uses some lower-level `couch_db_engine` APIs directly, including linking to a couch_db_updater, force-setting db update sequences, and others. A small routing sketch follows this list.
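+
+Two small sketches for the notes above: the coordinator choice made by `mem3_reshard_dbdoc` (assuming a helper such as `mem3_util:live_nodes/0`), and the shape of a document-routing function in the spirit of `couch_db_split` (its exact callback signature is an assumption):
+
+```erlang
+%% Pick one shard map updater for the whole cluster: sort the live nodes
+%% and take the first one.
+coordinator_node() ->
+    hd(lists:sort(mem3_util:live_nodes())).
+
+%% Route a document id to the target shard whose range covers its hash.
+%% erlang:crc32/1 stands in here for the real hashing scheme.
+pick_target(DocId, Targets) ->
+    Hash = erlang:crc32(DocId),
+    [Target] = [T || #shard{range = [B, E]} = T <- Targets,
+                     B =< Hash, Hash =< E],
+    Target.
+```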
diff --git a/src/mem3/src/mem3.app.src b/src/mem3/src/mem3.app.src
index 15efc64ca..889ebf9a3 100644
--- a/src/mem3/src/mem3.app.src
+++ b/src/mem3/src/mem3.app.src
@@ -20,6 +20,7 @@
mem3_shards,
mem3_sync,
mem3_sync_nodes,
+ mem3_reshard,
mem3_sup
]},
{applications, [
diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl
new file mode 100644
index 000000000..87aee6852
--- /dev/null
+++ b/src/mem3/src/mem3_reshard.erl
@@ -0,0 +1,918 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard).
+
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0,
+
+ start/0,
+ stop/1,
+
+ start_split_job/1,
+ stop_job/2,
+ resume_job/1,
+ remove_job/1,
+
+ get_state/0,
+ jobs/0,
+ job/1,
+ is_disabled/0,
+
+ report/2,
+ checkpoint/2,
+
+ now_sec/0,
+ update_history/4,
+ shard_from_name/1,
+ reset_state/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include("mem3_reshard.hrl").
+
+
+-define(JOB_ID_VERSION, 1).
+-define(JOB_STATE_VERSION, 1).
+-define(DEFAULT_MAX_JOBS, 25).
+-define(DEFAULT_MAX_HISTORY, 20).
+-define(JOB_PREFIX, <<"reshard-job-">>).
+-define(STATE_PREFIX, <<"reshard-state-">>).
+
+
+%% Public API
+
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+-spec start() -> ok | {error, any()}.
+start() ->
+ case is_disabled() of
+ true -> {error, resharding_disabled};
+ false -> gen_server:call(?MODULE, start, infinity)
+ end.
+
+
+-spec stop(binary()) -> ok | {error, any()}.
+stop(Reason) ->
+ case is_disabled() of
+ true -> {error, resharding_disabled};
+ false -> gen_server:call(?MODULE, {stop, Reason}, infinity)
+ end.
+
+
+-spec start_split_job(#shard{} | binary()) -> {ok, binary()} | {error, term()}.
+start_split_job(#shard{} = Shard) ->
+ start_split_job(Shard, 2);
+
+start_split_job(ShardName) when is_binary(ShardName) ->
+ start_split_job(shard_from_name(ShardName), 2).
+
+
+-spec start_split_job(#shard{}, split()) -> {ok, binary()} | {error, any()}.
+start_split_job(#shard{} = Source, Split) ->
+ case is_disabled() of
+ true -> {error, resharding_disabled};
+ false -> validate_and_start_job(Source, Split)
+ end.
+
+
+-spec stop_job(binary(), binary()) -> ok | {error, any()}.
+stop_job(JobId, Reason) when is_binary(JobId), is_binary(Reason) ->
+ case is_disabled() of
+ true -> {error, resharding_disabled};
+ false -> gen_server:call(?MODULE, {stop_job, JobId, Reason}, infinity)
+ end.
+
+
+-spec resume_job(binary()) -> ok | {error, any()}.
+resume_job(JobId) when is_binary(JobId) ->
+ case is_disabled() of
+ true -> {error, resharding_disabled};
+ false -> gen_server:call(?MODULE, {resume_job, JobId}, infinity)
+ end.
+
+
+-spec remove_job(binary()) -> ok | {error, any()}.
+remove_job(JobId) when is_binary(JobId) ->
+ case is_disabled() of
+ true -> {error, resharding_disabled};
+ false -> gen_server:call(?MODULE, {remove_job, JobId}, infinity)
+ end.
+
+
+-spec get_state() -> {[_ | _]}.
+get_state() ->
+ gen_server:call(?MODULE, get_state, infinity).
+
+
+-spec jobs() -> [[tuple()]].
+jobs() ->
+ ets:foldl(fun(Job, Acc) ->
+ Opts = [iso8601],
+ Props = mem3_reshard_store:job_to_ejson_props(Job, Opts),
+ [{Props} | Acc]
+ end, [], ?MODULE).
+
+
+-spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
+job(JobId) ->
+ case job_by_id(JobId) of
+ #job{} = Job ->
+ Opts = [iso8601],
+ Props = mem3_reshard_store:job_to_ejson_props(Job, Opts),
+ {ok, {Props}};
+ not_found ->
+ {error, not_found}
+ end.
+
+
+% Return true if resharding is disabled in the application level settings
+-spec is_disabled() -> boolean().
+is_disabled() ->
+ case application:get_env(mem3, reshard_disabled) of
+ {ok, "true"} -> true;
+ {ok, true} -> true;
+ _ -> false
+ end.
+
+
+% State reporting callbacks. Used by mem3_reshard_job module.
+-spec report(pid(), #job{}) -> ok.
+report(Server, #job{} = Job) when is_pid(Server) ->
+ gen_server:cast(Server, {report, Job}).
+
+
+-spec checkpoint(pid(), #job{}) -> ok.
+checkpoint(Server, #job{} = Job) ->
+ couch_log:notice("~p checkpointing ~p ~p", [?MODULE, Server, jobfmt(Job)]),
+ gen_server:cast(Server, {checkpoint, Job}).
+
+
+% Utility functions used from other mem3_reshard modules
+
+-spec now_sec() -> non_neg_integer().
+now_sec() ->
+ {Mega, Sec, _Micro} = os:timestamp(),
+ Mega * 1000000 + Sec.
+
+
+-spec update_history(atom(), binary() | null, time_sec(), list()) -> list().
+update_history(State, State, Ts, History) ->
+ % State is the same as detail. Make the detail null to avoid duplication
+ update_history(State, null, Ts, History);
+
+update_history(State, Detail, Ts, History) ->
+ % Reverse, so we can process the last event as the head using
+ % head matches, then after append and trimming, reverse again
+ Rev = lists:reverse(History),
+ UpdatedRev = update_history_rev(State, Detail, Ts, Rev),
+ TrimmedRev = lists:sublist(UpdatedRev, max_history()),
+ lists:reverse(TrimmedRev).
+
+
+-spec shard_from_name(binary()) -> #shard{}.
+shard_from_name(<<"shards/", _:8/binary, "-", _:8/binary, "/",
+ Rest/binary>> = Shard) ->
+ Range = mem3:range(Shard),
+ [DbName, Suffix] = binary:split(Rest, <<".">>),
+ build_shard(Range, DbName, Suffix).
+
+
+% For debugging only
+
+-spec reset_state() -> ok.
+reset_state() ->
+ gen_server:call(?MODULE, reset_state, infinity).
+
+
+% Gen server functions
+
+init(_) ->
+ % Advertise resharding API feature only if it is not disabled
+ case is_disabled() of
+ true -> ok;
+ false -> config:enable_feature('reshard')
+ end,
+ couch_log:notice("~p start init()", [?MODULE]),
+ EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true}],
+ ?MODULE = ets:new(?MODULE, EtsOpts),
+ ManagerPid = self(),
+ State = #state{
+ state = running,
+ state_info = [],
+ update_time = now_sec(),
+ node = node(),
+ db_monitor = spawn_link(fun() -> db_monitor(ManagerPid) end)
+ },
+ State1 = mem3_reshard_store:init(State, ?JOB_PREFIX, state_id()),
+ State2 = mem3_reshard_store:load_state(State1, running),
+ State3 = maybe_disable(State2),
+ gen_server:cast(self(), reload_jobs),
+ {ok, State3}.
+
+
+terminate(Reason, State) ->
+ couch_log:notice("~p terminate ~p ~p", [?MODULE, Reason, statefmt(State)]),
+ catch unlink(State#state.db_monitor),
+ catch exit(State#state.db_monitor, kill),
+ lists:foreach(fun(Job) -> kill_job_int(Job) end, running_jobs()).
+
+
+handle_call(start, _From, #state{state = stopped} = State) ->
+ State1 = State#state{
+ state = running,
+ update_time = now_sec(),
+ state_info = info_delete(reason, State#state.state_info)
+ },
+ ok = mem3_reshard_store:store_state(State1),
+ State2 = maybe_disable(State1),
+ State3 = reload_jobs(State2),
+ {reply, ok, State3};
+
+handle_call(start, _From, State) ->
+ {reply, ok, State};
+
+handle_call({stop, Reason}, _From, #state{state = running} = State) ->
+ State1 = State#state{
+ state = stopped,
+ update_time = now_sec(),
+ state_info = info_update(reason, Reason, State#state.state_info)
+ },
+ ok = mem3_reshard_store:store_state(State1),
+ lists:foreach(fun(Job) -> temporarily_stop_job(Job) end, running_jobs()),
+ {reply, ok, State1};
+
+handle_call({stop, _}, _From, State) ->
+ {reply, ok, State};
+
+handle_call({start_job, #job{id = Id, source = Source} = Job}, _From, State) ->
+ couch_log:notice("~p start_job call ~p", [?MODULE, jobfmt(Job)]),
+ Total = ets:info(?MODULE, size),
+ SourceOk = mem3_reshard_validate:source(Source),
+ case {job_by_id(Id), Total + 1 =< get_max_jobs(), SourceOk} of
+ {not_found, true, ok} ->
+ handle_start_job(Job, State);
+ {#job{}, _, _} ->
+ {reply, {error, job_already_exists}, State};
+ {_, false, _} ->
+ {reply, {error, max_jobs_exceeded}, State};
+ {_, _, {error, _} = SourceError} ->
+ {reply, SourceError, State}
+ end;
+
+handle_call({resume_job, _}, _From, #state{state = stopped} = State) ->
+ case couch_util:get_value(reason, State#state.state_info) of
+ undefined ->
+ {reply, {error, stopped}, State};
+ Reason ->
+ {reply, {error, {stopped, Reason}}, State}
+ end;
+
+handle_call({resume_job, Id}, _From, State) ->
+ couch_log:notice("~p resume_job call ~p", [?MODULE, Id]),
+ case job_by_id(Id) of
+ #job{job_state = stopped} = Job ->
+ case start_job_int(Job, State) of
+ ok ->
+ {reply, ok, State};
+ {error, Error} ->
+ {reply, {error, Error}, State}
+ end;
+ #job{} ->
+ {reply, ok, State};
+ not_found ->
+ {reply, {error, not_found}, State}
+ end;
+
+handle_call({stop_job, Id, Reason}, _From, State) ->
+ couch_log:notice("~p stop_job Id:~p Reason:~p", [?MODULE, Id, Reason]),
+ case job_by_id(Id) of
+ #job{job_state = JSt} = Job when JSt =:= running orelse JSt =:= new
+ orelse JSt =:= stopped ->
+ ok = stop_job_int(Job, stopped, Reason, State),
+ {reply, ok, State};
+ #job{} ->
+ {reply, ok, State};
+ not_found ->
+ {reply, {error, not_found}, State}
+ end;
+
+handle_call({remove_job, Id}, _From, State) ->
+ {reply, remove_job_int(Id, State), State};
+
+handle_call(get_state, _From, #state{state = GlobalState} = State) ->
+ StateProps = mem3_reshard_store:state_to_ejson_props(State),
+ Stats0 = #{running => 0, completed => 0, failed => 0, stopped => 0},
+ StateStats = ets:foldl(fun(#job{job_state = JS}, Acc) ->
+ % When jobs are disabled globally their state is not checkpointed as
+ % "stopped", but it stays as "running". But when returning the state we
+ % don't want to mislead and indicate that there are "N running jobs"
+ % when the global state is "stopped".
+ JS1 = case GlobalState =:= stopped andalso JS =:= running of
+ true -> stopped;
+ false -> JS
+ end,
+ Acc#{JS1 => maps:get(JS1, Acc, 0) + 1}
+ end, Stats0, ?MODULE),
+ Total = ets:info(?MODULE, size),
+ StateStats1 = maps:to_list(StateStats) ++ [{total, Total}],
+ Result = {lists:sort(StateProps ++ StateStats1)},
+ {reply, Result, State};
+
+handle_call(reset_state, _From, State) ->
+ {reply, ok, reset_state(State)};
+
+handle_call(Call, From, State) ->
+ couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]),
+ {noreply, State}.
+
+
+handle_cast({db_deleted, DbName}, State) ->
+ % Remove only completed jobs. Other running states would `fail` but
+ % job result would stick around so users can inspect them.
+ JobIds = jobs_by_db_and_state(DbName, completed),
+ [remove_job_int(JobId, State) || JobId <- JobIds],
+ {noreply, State};
+
+handle_cast({report, Job}, State) ->
+ report_int(Job),
+ {noreply, State};
+
+handle_cast({checkpoint, Job}, State) ->
+ {noreply, checkpoint_int(Job, State)};
+
+handle_cast(reload_jobs, State) ->
+ couch_log:notice("~p starting reloading jobs", [?MODULE]),
+ State1 = reload_jobs(State),
+ couch_log:notice("~p finished reloading jobs", [?MODULE]),
+ {noreply, State1};
+
+handle_cast(Cast, State) ->
+ couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]),
+ {noreply, State}.
+
+
+handle_info({'DOWN', _Ref, process, Pid, Info}, State) ->
+ case job_by_pid(Pid) of
+ {ok, Job} ->
+ couch_log:notice("~p job ~s exit ~p", [?MODULE, Job#job.id, Info]),
+ ok = handle_job_exit(Job, Info, State);
+ {error, not_found} ->
+ couch_log:error("~p job not found: ~p ~p", [?MODULE, Pid, Info])
+ end,
+ {noreply, State};
+
+handle_info(Info, State) ->
+ couch_log:error("~p unexpected info ~p", [?MODULE, Info]),
+ {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%% Private API
+
+validate_and_start_job(#shard{} = Source, Split) ->
+ case mem3_reshard_validate:start_args(Source, Split) of
+ ok ->
+ Target = target_shards(Source, Split),
+ case mem3_reshard_validate:targets(Source, Target) of
+ ok ->
+ TStamp = now_sec(),
+ Job = #job{
+ type = split,
+ job_state = new,
+ split_state = new,
+ start_time = TStamp,
+ update_time = TStamp,
+ node = node(),
+ source = Source,
+ target = Target
+ },
+ Job1 = Job#job{id = job_id(Job)},
+ Job2 = update_job_history(Job1),
+ gen_server:call(?MODULE, {start_job, Job2}, infinity);
+ {error, Error} ->
+ {error, Error}
+ end;
+ {error, Error} ->
+ {error, Error}
+ end.
+
+
+handle_start_job(#job{} = Job, #state{state = running} = State) ->
+ case start_job_int(Job, State) of
+ ok ->
+ {reply, {ok, Job#job.id}, State};
+ {error, Error} ->
+ {reply, {error, Error}, State}
+ end;
+
+handle_start_job(#job{} = Job, #state{state = stopped} = State) ->
+ ok = mem3_reshard_store:store_job(State, Job),
+ % Since resharding is stopped on this node, the job is temporarily marked
+ % as stopped in the ets table so as not to return a "running" result which
+ % would look odd.
+ temporarily_stop_job(Job),
+ {reply, {ok, Job#job.id}, State}.
+
+
+% Insert job in the ets table as a temporarily stopped job. This would happen
+% when a job is reloaded or added when node-wide resharding is stopped.
+-spec temporarily_stop_job(#job{}) -> #job{}.
+temporarily_stop_job(Job) ->
+ Job1 = kill_job_int(Job),
+ OldInfo = Job1#job.state_info,
+ Reason = <<"Shard splitting disabled">>,
+ Job2 = Job1#job{
+ job_state = stopped,
+ update_time = now_sec(),
+ start_time = 0,
+ state_info = info_update(reason, Reason, OldInfo),
+ pid = undefined,
+ ref = undefined
+ },
+ Job3 = update_job_history(Job2),
+ true = ets:insert(?MODULE, Job3),
+ Job3.
+
+
+-spec reload_jobs(#state{}) -> #state{}.
+reload_jobs(State) ->
+ Jobs = mem3_reshard_store:get_jobs(State),
+ lists:foldl(fun reload_job/2, State, Jobs).
+
+
+% This is a case when main application is stopped but a job is reloaded that
+% was checkpointed in running state. Set that state to stopped to avoid the API
+% results looking odd.
+-spec reload_job(#job{}, #state{}) -> #state{}.
+reload_job(#job{job_state = JS} = Job, #state{state = stopped} = State)
+ when JS =:= running orelse JS =:= new ->
+ temporarily_stop_job(Job),
+ State;
+
+% This is a case when a job process should be spawned
+reload_job(#job{job_state = JS} = Job, #state{state = running} = State)
+ when JS =:= running orelse JS =:= new ->
+ case start_job_int(Job, State) of
+ ok ->
+ State;
+ {error, Error} ->
+ Msg = "~p could not resume ~s error: ~p",
+ couch_log:error(Msg, [?MODULE, jobfmt(Job), Error]),
+ State
+ end;
+
+% If job is disabled individually (stopped by the user), is completed or failed
+% then simply load it into the ets table
+reload_job(#job{job_state = JS} = Job, #state{} = State)
+ when JS =:= failed orelse JS =:= completed orelse JS =:= stopped ->
+ true = ets:insert(?MODULE, Job),
+ State.
+
+
+-spec get_max_jobs() -> integer().
+get_max_jobs() ->
+ config:get_integer("reshard", "max_jobs", ?DEFAULT_MAX_JOBS).
+
+
+-spec start_job_int(#job{}, #state{}) -> ok | {error, term()}.
+start_job_int(Job, State) ->
+ case spawn_job(Job) of
+ {ok, #job{} = Job1} ->
+ Job2 = update_job_history(Job1),
+ ok = mem3_reshard_store:store_job(State, Job2),
+ true = ets:insert(?MODULE, Job2),
+ ok;
+ {error, Error} ->
+ {error, Error}
+ end.
+
+
+-spec spawn_job(#job{}) -> {ok, pid()} | {error, term()}.
+spawn_job(#job{} = Job0) ->
+ Job = Job0#job{
+ job_state = running,
+ start_time = 0,
+ update_time = now_sec(),
+ state_info = info_delete(reason, Job0#job.state_info),
+ manager = self(),
+ workers = [],
+ retries = 0
+ },
+ case mem3_reshard_job_sup:start_child(Job) of
+ {ok, Pid} ->
+ Ref = monitor(process, Pid),
+ {ok, Job#job{pid = Pid, ref = Ref}};
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+
+-spec stop_job_int(#job{}, job_state(), term(), #state{}) -> ok.
+stop_job_int(#job{} = Job, JobState, Reason, State) ->
+ couch_log:info("~p stop_job_int ~p newstate: ~p reason:~p", [?MODULE,
+ jobfmt(Job), JobState, Reason]),
+ Job1 = kill_job_int(Job),
+ Job2 = Job1#job{
+ job_state = JobState,
+ update_time = now_sec(),
+ state_info = [{reason, Reason}]
+ },
+ ok = mem3_reshard_store:store_job(State, Job2),
+ true = ets:insert(?MODULE, Job2),
+ couch_log:info("~p stop_job_int stopped ~p", [?MODULE, jobfmt(Job2)]),
+ ok.
+
+
+-spec kill_job_int(#job{}) -> #job{}.
+kill_job_int(#job{pid = undefined} = Job) ->
+ Job;
+
+kill_job_int(#job{pid = Pid, ref = Ref} = Job) ->
+ couch_log:info("~p kill_job_int ~p", [?MODULE, jobfmt(Job)]),
+ demonitor(Ref, [flush]),
+ case erlang:is_process_alive(Pid) of
+ true ->
+ ok = mem3_reshard_job_sup:terminate_child(Pid);
+ false ->
+ ok
+ end,
+ Job1 = Job#job{pid = undefined, ref = undefined},
+ true = ets:insert(?MODULE, Job1),
+ Job1.
+
+
+-spec handle_job_exit(#job{}, term(), #state{}) -> ok.
+handle_job_exit(#job{split_state = completed} = Job, normal, State) ->
+ couch_log:notice("~p completed job ~s exited", [?MODULE, Job#job.id]),
+ Job1 = Job#job{
+ pid = undefined,
+ ref = undefined,
+ job_state = completed,
+ update_time = now_sec(),
+ state_info = []
+ },
+ Job2 = update_job_history(Job1),
+ ok = mem3_reshard_store:store_job(State, Job2),
+ true = ets:insert(?MODULE, Job2),
+ ok;
+
+handle_job_exit(#job{job_state = running} = Job, normal, _State) ->
+ couch_log:notice("~p running job ~s stopped", [?MODULE, Job#job.id]),
+ OldInfo = Job#job.state_info,
+ Job1 = Job#job{
+ pid = undefined,
+ ref = undefined,
+ job_state = stopped,
+ update_time = now_sec(),
+ state_info = info_update(reason, <<"Job stopped">>, OldInfo)
+ },
+ true = ets:insert(?MODULE, update_job_history(Job1)),
+ ok;
+
+handle_job_exit(#job{job_state = running} = Job, shutdown, _State) ->
+ couch_log:notice("~p job ~s shutdown", [?MODULE, Job#job.id]),
+ OldInfo = Job#job.state_info,
+ Job1 = Job#job{
+ pid = undefined,
+ ref = undefined,
+ job_state = stopped,
+ update_time = now_sec(),
+ state_info = info_update(reason, <<"Job shutdown">>, OldInfo)
+ },
+ true = ets:insert(?MODULE, update_job_history(Job1)),
+ ok;
+
+handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg}, _State) ->
+ couch_log:notice("~p job ~s shutdown ~p", [?MODULE, Job#job.id, Msg]),
+ OldInfo = Job#job.state_info,
+ Job1 = Job#job{
+ pid = undefined,
+ ref = undefined,
+ job_state = stopped,
+ update_time = now_sec(),
+ state_info = info_update(reason, <<"Job shutdown">>, OldInfo)
+ },
+ true = ets:insert(?MODULE, update_job_history(Job1)),
+ ok;
+
+handle_job_exit(#job{} = Job, Error, State) ->
+ couch_log:notice("~p job ~s failed ~p", [?MODULE, Job#job.id, Error]),
+ OldInfo = Job#job.state_info,
+ Job1 = Job#job{
+ pid = undefined,
+ ref = undefined,
+ job_state = failed,
+ update_time = now_sec(),
+ state_info = info_update(reason, Error, OldInfo)
+ },
+ Job2 = update_job_history(Job1),
+ ok = mem3_reshard_store:store_job(State, Job2),
+ true = ets:insert(?MODULE, Job2),
+ ok.
+
+
+-spec job_by_id(job_id()) -> #job{} | not_found.
+job_by_id(Id) ->
+ case ets:lookup(?MODULE, Id) of
+ [] ->
+ not_found;
+ [#job{} = Job] ->
+ Job
+ end.
+
+
+-spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}.
+job_by_pid(Pid) when is_pid(Pid) ->
+ case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of
+ [] ->
+ {error, not_found};
+ [#job{} = Job] ->
+ {ok, Job}
+ end.
+
+
+-spec state_id() -> binary().
+state_id() ->
+ Ver = iolist_to_binary(io_lib:format("~3..0B", [?JOB_STATE_VERSION])),
+ <<?STATE_PREFIX/binary, Ver/binary>>.
+
+
+-spec job_id(#job{}) -> binary().
+job_id(#job{source = #shard{name = SourceName}}) ->
+ HashInput = [SourceName, atom_to_binary(node(), utf8)],
+ IdHashList = couch_util:to_hex(crypto:hash(sha256, HashInput)),
+ IdHash = iolist_to_binary(IdHashList),
+ Prefix = iolist_to_binary(io_lib:format("~3..0B", [?JOB_ID_VERSION])),
+ <<Prefix/binary, "-", IdHash/binary>>.
+
+
+-spec target_shards(#shard{}, split()) -> [#shard{}].
+target_shards(#shard{name = Name, range = [B, E], dbname = DbName}, Split) when
+ is_integer(Split), Split >= 2, (E - B + 1) >= Split ->
+ Ranges = target_ranges([B, E], Split),
+ <<"shards/", _:8/binary, "-", _:8/binary, "/", DbAndSuffix/binary>> = Name,
+ [DbName, Suffix] = binary:split(DbAndSuffix, <<".">>),
+ [build_shard(R, DbName, Suffix) || R <- Ranges].
+
+
+-spec target_ranges([range_pos()], split()) -> [[range_pos()]].
+target_ranges([Begin, End], Split) when (End - Begin + 1) >= Split,
+ Split >=2 ->
+ Len = End - Begin + 1, % + 1 since intervals are inclusive
+ NewLen = Len div Split,
+ Rem = Len rem Split,
+ Ranges = [[I, I + NewLen - 1] || I <- lists:seq(Begin, End - Rem, NewLen)],
+ % Adjust last end to always match the original end to ensure we always
+ % cover the whole range. In case when remainder is larger this will make
+ % the last range larger. Improve the algorithm later to re-distribute
+ % the remainder equally amongst the chunks.
+ {BeforeLast, [[BeginLast, _]]} = lists:split(Split - 1, Ranges),
+ BeforeLast ++ [[BeginLast, End]].
+
+
+-spec build_shard([non_neg_integer()], binary(), binary()) -> #shard{}.
+build_shard(Range, DbName, Suffix) ->
+ Shard = #shard{dbname = DbName, range = Range, node = node()},
+ mem3_util:name_shard(Shard, <<".", Suffix/binary>>).
+
+
+-spec running_jobs() -> [#job{}].
+running_jobs() ->
+ Pat = #job{job_state = running, _ = '_'},
+ ets:match_object(?MODULE, Pat).
+
+
+-spec info_update(atom(), any(), [tuple()]) -> [tuple()].
+info_update(Key, Val, StateInfo) ->
+ lists:keystore(Key, 1, StateInfo, {Key, Val}).
+
+
+-spec info_delete(atom(), [tuple()]) -> [tuple()].
+info_delete(Key, StateInfo) ->
+ lists:keydelete(Key, 1, StateInfo).
+
+
+-spec checkpoint_int(#job{}, #state{}) -> #state{}.
+checkpoint_int(#job{} = Job, State) ->
+ couch_log:debug("~p checkpoint ~s", [?MODULE, jobfmt(Job)]),
+ case report_int(Job) of
+ ok ->
+ ok = mem3_reshard_store:store_job(State, Job),
+ ok = mem3_reshard_job:checkpoint_done(Job),
+ State;
+ not_found ->
+ couch_log:error("~p checkpoint couldn't find ~p", [?MODULE, Job]),
+ State
+ end.
+
+
+-spec report_int(#job{}) -> ok | not_found.
+report_int(Job) ->
+ case ets:lookup(?MODULE, Job#job.id) of
+ [#job{ref = Ref, pid = CurPid}] ->
+ case Job#job.pid =:= CurPid of
+ true ->
+ couch_log:debug("~p reported ~s", [?MODULE, jobfmt(Job)]),
+ % Carry over the reference from ets as the #job{} coming
+ % from the job process won't have its own monitor ref.
+ true = ets:insert(?MODULE, Job#job{ref = Ref}),
+ ok;
+ false ->
+ LogMsg = "~p ignoring old job report ~p curr pid:~p",
+ couch_log:warning(LogMsg, [?MODULE, jobfmt(Job), CurPid]),
+ not_found
+ end;
+ _ ->
+ couch_log:error("~p reporting : couldn't find ~p", [?MODULE, Job]),
+ not_found
+ end.
+
+
+-spec remove_job_int(#job{}, #state{}) -> ok | {error, not_found}.
+remove_job_int(Id, State) ->
+ couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]),
+ case job_by_id(Id) of
+ #job{} = Job ->
+ kill_job_int(Job),
+ ok = mem3_reshard_store:delete_job(State, Id),
+ ets:delete(?MODULE, Job#job.id),
+ ok;
+ not_found ->
+ {error, not_found}
+ end.
+
+
+% This function is for testing and debugging only
+-spec reset_state(#state{}) -> #state{}.
+reset_state(#state{} = State) ->
+ couch_log:warning("~p resetting state", [?MODULE]),
+ ok = mem3_reshard_store:delete_state(State),
+ couch_log:warning("~p killing all running jobs", [?MODULE]),
+ [kill_job_int(Job) || Job <- running_jobs()],
+ ets:delete_all_objects(?MODULE),
+ couch_log:warning("~p resetting all job states", [?MODULE]),
+ Jobs = mem3_reshard_store:get_jobs(State),
+ lists:foldl(fun(#job{id = Id}, StateAcc) ->
+ couch_log:warning("~p resetting job state ~p", [?MODULE, Id]),
+ ok = mem3_reshard_store:delete_job(StateAcc, Id),
+ StateAcc
+ end, State, Jobs),
+ couch_log:warning("~p resetting state done", [?MODULE]),
+ State#state{
+ state = running,
+ state_info = [],
+ update_time = now_sec()
+ }.
+
+
+-spec update_job_history(#job{}) -> #job{}.
+update_job_history(#job{job_state = St, update_time = Ts} = Job) ->
+ Hist = Job#job.history,
+ Reason = case couch_util:get_value(reason, Job#job.state_info) of
+ undefined -> null;
+ Val -> couch_util:to_binary(Val)
+ end,
+ Job#job{history = update_history(St, Reason, Ts, Hist)}.
+
+
+update_history_rev(State, null, Ts, [{_, State, Detail} | Rest]) ->
+ % Just updated the detail, state stays the same, no new entry added
+ [{Ts, State, Detail} | Rest];
+
+update_history_rev(State, Detail, Ts, [{_, State, Detail} | Rest]) ->
+ % State and detail were same as last event, just update the timestamp
+ [{Ts, State, Detail} | Rest];
+
+update_history_rev(State, Detail, Ts, History) ->
+ [{Ts, State, Detail} | History].
+
+
+-spec max_history() -> non_neg_integer().
+max_history() ->
+ config:get_integer("reshard", "max_history", ?DEFAULT_MAX_HISTORY).
+
+
+-spec maybe_disable(#state{}) -> #state{}.
+maybe_disable(#state{} = State) ->
+ case is_disabled() of
+ true ->
+ Reason = <<"Resharding disabled by application level config">>,
+ SInfo = State#state.state_info,
+ State#state{
+ state = stopped,
+ state_info = info_update(reason, Reason, SInfo)
+ };
+ false ->
+ State
+ end.
+
+
+-spec jobs_by_db_and_state(binary(), split_state() | '_') -> [job_id()].
+jobs_by_db_and_state(Db, State) ->
+ DbName = mem3:dbname(Db),
+ Pat = #job{
+ id = '$1',
+ source =#shard{dbname = DbName, _ = '_'},
+ job_state = State,
+ _ = '_'
+ },
+ [JobId || [JobId] <- ets:match(?MODULE, Pat)].
+
+
+-spec db_exists(binary()) -> boolean().
+db_exists(Name) ->
+ try
+ mem3:shards(mem3:dbname(Name)),
+ true
+ catch
+ error:database_does_not_exist ->
+ false
+ end.
+
+
+-spec db_monitor(pid()) -> no_return().
+db_monitor(Server) ->
+ couch_log:notice("~p db monitor ~p starting", [?MODULE, self()]),
+ EvtRef = erlang:monitor(process, couch_event_server),
+ couch_event:register_all(self()),
+ db_monitor_loop(Server, EvtRef).
+
+
+-spec db_monitor_loop(pid(), reference()) -> no_return().
+db_monitor_loop(Server, EvtRef) ->
+ receive
+ {'$couch_event', DbName, deleted} ->
+ case db_exists(DbName) of
+ true ->
+ % Could be source shard being deleted during splitting
+ ok;
+ false ->
+ case length(jobs_by_db_and_state(DbName, '_')) > 0 of
+ true ->
+ % Notify only if there are jobs with that db
+ gen_server:cast(Server, {db_deleted, DbName});
+ false ->
+ ok
+ end
+ end,
+ db_monitor_loop(Server, EvtRef);
+ {'$couch_event', _, _} ->
+ db_monitor_loop(Server, EvtRef);
+ {'DOWN', EvtRef, _, _, Info} ->
+ couch_log:error("~p db monitor listener died ~p", [?MODULE, Info]),
+ exit({db_monitor_died, Info});
+ Msg ->
+ couch_log:error("~p db monitor unexpected msg ~p", [?MODULE, Msg]),
+ db_monitor_loop(Server, EvtRef)
+ end.
+
+
+-spec statefmt(#state{} | term()) -> string().
+statefmt(#state{state = StateName}) ->
+ Total = ets:info(?MODULE, size),
+ Active = mem3_reshard_job_sup:count_children(),
+ Msg = "#state{~s total:~B active:~B}",
+ Fmt = io_lib:format(Msg, [StateName, Total, Active]),
+ lists:flatten(Fmt);
+
+statefmt(State) ->
+ Fmt = io_lib:format("<Unknown split state:~p>", [State]),
+ lists:flatten(Fmt).
+
+
+-spec jobfmt(#job{}) -> string().
+jobfmt(#job{} = Job) ->
+ mem3_reshard_job:jobfmt(Job).
diff --git a/src/mem3/src/mem3_reshard.hrl b/src/mem3/src/mem3_reshard.hrl
new file mode 100644
index 000000000..ad76aeadf
--- /dev/null
+++ b/src/mem3/src/mem3_reshard.hrl
@@ -0,0 +1,74 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-include_lib("mem3/include/mem3.hrl").
+
+
+-type range_pos() :: non_neg_integer().
+-type split() :: pos_integer().
+-type job_id() :: binary() | undefined.
+-type job_type() :: split.
+-type time_sec() :: non_neg_integer().
+
+-type shard_split_main_state() ::
+ running |
+ stopped.
+
+-type job_state() ::
+ new |
+ running |
+ stopped |
+ failed |
+ completed.
+
+-type split_state() ::
+ new |
+ initial_copy |
+ topoff1 |
+ build_indices |
+ topoff2 |
+ copy_local_docs |
+ update_shardmap |
+ wait_source_close |
+ topoff3 |
+ source_delete |
+ completed.
+
+
+-record(job, {
+ id :: job_id() | '$1' | '_',
+ type :: job_type(),
+ job_state :: job_state(),
+ split_state :: split_state(),
+ state_info = [] :: [{atom(), any()}],
+ source :: #shard{},
+ target :: [#shard{}],
+ history = [] :: [{atom(), time_sec()}],
+ start_time = 0 :: non_neg_integer(),
+ update_time = 0 :: non_neg_integer(),
+ node :: node(),
+ pid :: undefined | pid() | '$1' | '_',
+ ref :: undefined | reference() | '_',
+ manager :: undefined | pid(),
+ workers = [] :: [pid()],
+ retries = 0 :: non_neg_integer()
+}).
+
+-record(state, {
+ state :: shard_split_main_state(),
+ state_info :: [],
+ update_time :: non_neg_integer(),
+ job_prefix :: binary(),
+ state_id :: binary(),
+ node :: node(),
+ db_monitor :: pid()
+}).
diff --git a/src/mem3/src/mem3_reshard_job_sup.erl b/src/mem3/src/mem3_reshard_job_sup.erl
new file mode 100644
index 000000000..3f1b3bfb4
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_job_sup.erl
@@ -0,0 +1,55 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_job_sup).
+
+-behaviour(supervisor).
+
+-export([
+ start_link/0,
+ start_child/1,
+ terminate_child/1,
+ count_children/0,
+ init/1
+]).
+
+
+-include("mem3_reshard.hrl").
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_child(Job) ->
+ supervisor:start_child(?MODULE, [Job]).
+
+
+terminate_child(Pid) ->
+ supervisor:terminate_child(?MODULE, Pid).
+
+
+count_children() ->
+ Props = supervisor:count_children(?MODULE),
+ proplists:get_value(active, Props).
+
+
+init(_Args) ->
+ Children = [
+ {mem3_reshard_job,
+ {mem3_reshard_job, start_link, []},
+ temporary,
+ 60000,
+ worker,
+ [mem3_reshard_job]}
+ ],
+ {ok, {{simple_one_for_one, 10, 3}, Children}}.
diff --git a/src/mem3/src/mem3_reshard_sup.erl b/src/mem3/src/mem3_reshard_sup.erl
new file mode 100644
index 000000000..6349a4041
--- /dev/null
+++ b/src/mem3/src/mem3_reshard_sup.erl
@@ -0,0 +1,47 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_reshard_sup).
+
+-behaviour(supervisor).
+
+-export([
+ start_link/0,
+ init/1
+]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init(_Args) ->
+ Children = [
+ {mem3_reshard_dbdoc,
+ {mem3_reshard_dbdoc, start_link, []},
+ permanent,
+ infinity,
+ worker,
+ [mem3_reshard_dbdoc]},
+ {mem3_reshard_job_sup,
+ {mem3_reshard_job_sup, start_link, []},
+ permanent,
+ infinity,
+ supervisor,
+ [mem3_reshard_job_sup]},
+ {mem3_reshard,
+ {mem3_reshard, start_link, []},
+ permanent,
+ brutal_kill,
+ worker,
+ [mem3_reshard]}
+ ],
+ {ok, {{one_for_all, 5, 5}, Children}}.
diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl
index 0adaf51e0..3a1a3ca5a 100644
--- a/src/mem3/src/mem3_sup.erl
+++ b/src/mem3/src/mem3_sup.erl
@@ -25,12 +25,16 @@ init(_Args) ->
child(mem3_sync_nodes), % Order important?
child(mem3_sync),
child(mem3_shards),
- child(mem3_sync_event_listener)
+ child(mem3_sync_event_listener),
+ child(mem3_reshard_sup)
],
{ok, {{one_for_one,10,1}, couch_epi:register_service(mem3_epi, Children)}}.
child(mem3_events) ->
MFA = {gen_event, start_link, [{local, mem3_events}]},
{mem3_events, MFA, permanent, 1000, worker, dynamic};
+child(mem3_reshard_sup = Child) ->
+ MFA = {Child, start_link, []},
+ {Child, MFA, permanent, infinity, supervisor, [Child]};
child(Child) ->
{Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}.