diff options
Diffstat (limited to 'src/mem3/src/mem3_reshard.erl')
-rw-r--r-- | src/mem3/src/mem3_reshard.erl | 918 |
1 files changed, 0 insertions, 918 deletions
diff --git a/src/mem3/src/mem3_reshard.erl b/src/mem3/src/mem3_reshard.erl deleted file mode 100644 index 620b1bc73..000000000 --- a/src/mem3/src/mem3_reshard.erl +++ /dev/null @@ -1,918 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(mem3_reshard). - - --behaviour(gen_server). - - --export([ - start_link/0, - - start/0, - stop/1, - - start_split_job/1, - stop_job/2, - resume_job/1, - remove_job/1, - - get_state/0, - jobs/0, - job/1, - is_disabled/0, - - report/2, - checkpoint/2, - - now_sec/0, - update_history/4, - shard_from_name/1, - reset_state/0 -]). - --export([ - init/1, - terminate/2, - handle_call/3, - handle_cast/2, - handle_info/2, - code_change/3 -]). - - --include("mem3_reshard.hrl"). - - --define(JOB_ID_VERSION, 1). --define(JOB_STATE_VERSION, 1). --define(DEFAULT_MAX_JOBS, 48). --define(DEFAULT_MAX_HISTORY, 20). --define(JOB_PREFIX, <<"reshard-job-">>). --define(STATE_PREFIX, <<"reshard-state-">>). - - -%% Public API - --spec start_link() -> {ok, pid()} | ignore | {error, term()}. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - - --spec start() -> ok | {error, any()}. -start() -> - case is_disabled() of - true -> {error, resharding_disabled}; - false -> gen_server:call(?MODULE, start, infinity) - end. - - --spec stop(binary()) -> ok | {error, any()}. -stop(Reason) -> - case is_disabled() of - true -> {error, resharding_disabled}; - false -> gen_server:call(?MODULE, {stop, Reason}, infinity) - end. - - --spec start_split_job(#shard{} | binary()) -> {ok, binary()} | {error, term()}. -start_split_job(#shard{} = Shard) -> - start_split_job(Shard, 2); - -start_split_job(ShardName) when is_binary(ShardName) -> - start_split_job(shard_from_name(ShardName), 2). - - --spec start_split_job(#shard{}, split()) -> {ok, binary()} | {error, any()}. -start_split_job(#shard{} = Source, Split) -> - case is_disabled() of - true -> {error, resharding_disabled}; - false -> validate_and_start_job(Source, Split) - end. - - --spec stop_job(binary(), binary()) -> ok | {error, any()}. -stop_job(JobId, Reason) when is_binary(JobId), is_binary(Reason) -> - case is_disabled() of - true -> {error, resharding_disabled}; - false -> gen_server:call(?MODULE, {stop_job, JobId, Reason}, infinity) - end. - - --spec resume_job(binary()) -> ok | {error, any()}. -resume_job(JobId) when is_binary(JobId) -> - case is_disabled() of - true -> {error, resharding_disabled}; - false -> gen_server:call(?MODULE, {resume_job, JobId}, infinity) - end. - - --spec remove_job(binary()) -> ok | {error, any()}. -remove_job(JobId) when is_binary(JobId) -> - case is_disabled() of - true -> {error, resharding_disabled}; - false -> gen_server:call(?MODULE, {remove_job, JobId}, infinity) - end. - - --spec get_state() -> {[_ | _]}. -get_state() -> - gen_server:call(?MODULE, get_state, infinity). - - --spec jobs() -> [[tuple()]]. -jobs() -> - ets:foldl(fun(Job, Acc) -> - Opts = [iso8601], - Props = mem3_reshard_store:job_to_ejson_props(Job, Opts), - [{Props} | Acc] - end, [], ?MODULE). - - --spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}. -job(JobId) -> - case job_by_id(JobId) of - #job{} = Job -> - Opts = [iso8601], - Props = mem3_reshard_store:job_to_ejson_props(Job, Opts), - {ok, {Props}}; - not_found -> - {error, not_found} - end. - - -% Return true resharding is disabled in the application level settings --spec is_disabled() -> boolean(). -is_disabled() -> - case application:get_env(mem3, reshard_disabled) of - {ok, "true"} -> true; - {ok, true} -> true; - _ -> false - end. - - -% State reporting callbacks. Used by mem3_reshard_job module. --spec report(pid(), #job{}) -> ok. -report(Server, #job{} = Job) when is_pid(Server) -> - gen_server:cast(Server, {report, Job}). - - --spec checkpoint(pid(), #job{}) -> ok. -checkpoint(Server, #job{} = Job) -> - couch_log:notice("~p checkpointing ~p ~p", [?MODULE, Server, jobfmt(Job)]), - gen_server:cast(Server, {checkpoint, Job}). - - -% Utility functions used from other mem3_reshard modules - --spec now_sec() -> non_neg_integer(). -now_sec() -> - {Mega, Sec, _Micro} = os:timestamp(), - Mega * 1000000 + Sec. - - --spec update_history(atom(), binary() | null, time_sec(), list()) -> list(). -update_history(State, State, Ts, History) -> - % State is the same as detail. Make the detail null to avoid duplication - update_history(State, null, Ts, History); - -update_history(State, Detail, Ts, History) -> - % Reverse, so we can process the last event as the head using - % head matches, then after append and trimming, reserse again - Rev = lists:reverse(History), - UpdatedRev = update_history_rev(State, Detail, Ts, Rev), - TrimmedRev = lists:sublist(UpdatedRev, max_history()), - lists:reverse(TrimmedRev). - - --spec shard_from_name(binary()) -> #shard{}. -shard_from_name(<<"shards/", _:8/binary, "-", _:8/binary, "/", - Rest/binary>> = Shard) -> - Range = mem3:range(Shard), - [DbName, Suffix] = binary:split(Rest, <<".">>), - build_shard(Range, DbName, Suffix). - - -% For debugging only - --spec reset_state() -> ok. -reset_state() -> - gen_server:call(?MODULE, reset_state, infinity). - - -% Gen server functions - -init(_) -> - % Advertise resharding API feature only if it is not disabled - case is_disabled() of - true -> ok; - false -> config:enable_feature('reshard') - end, - couch_log:notice("~p start init()", [?MODULE]), - EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true}], - ?MODULE = ets:new(?MODULE, EtsOpts), - ManagerPid = self(), - State = #state{ - state = running, - state_info = [], - update_time = now_sec(), - node = node(), - db_monitor = spawn_link(fun() -> db_monitor(ManagerPid) end) - }, - State1 = mem3_reshard_store:init(State, ?JOB_PREFIX, state_id()), - State2 = mem3_reshard_store:load_state(State1, running), - State3 = maybe_disable(State2), - gen_server:cast(self(), reload_jobs), - {ok, State3}. - - -terminate(Reason, State) -> - couch_log:notice("~p terminate ~p ~p", [?MODULE, Reason, statefmt(State)]), - catch unlink(State#state.db_monitor), - catch exit(State#state.db_monitor, kill), - lists:foreach(fun(Job) -> kill_job_int(Job) end, running_jobs()). - - -handle_call(start, _From, #state{state = stopped} = State) -> - State1 = State#state{ - state = running, - update_time = now_sec(), - state_info = info_delete(reason, State#state.state_info) - }, - ok = mem3_reshard_store:store_state(State1), - State2 = maybe_disable(State1), - State3 = reload_jobs(State2), - {reply, ok, State3}; - -handle_call(start, _From, State) -> - {reply, ok, State}; - -handle_call({stop, Reason}, _From, #state{state = running} = State) -> - State1 = State#state{ - state = stopped, - update_time = now_sec(), - state_info = info_update(reason, Reason, State#state.state_info) - }, - ok = mem3_reshard_store:store_state(State1), - lists:foreach(fun(Job) -> temporarily_stop_job(Job) end, running_jobs()), - {reply, ok, State1}; - -handle_call({stop, _}, _From, State) -> - {reply, ok, State}; - -handle_call({start_job, #job{id = Id, source = Source} = Job}, _From, State) -> - couch_log:notice("~p start_job call ~p", [?MODULE, jobfmt(Job)]), - Total = ets:info(?MODULE, size), - SourceOk = mem3_reshard_validate:source(Source), - case {job_by_id(Id), Total + 1 =< get_max_jobs(), SourceOk} of - {not_found, true, ok} -> - handle_start_job(Job, State); - {#job{}, _, _} -> - {reply, {error, job_already_exists}, State}; - {_, false, _} -> - {reply, {error, max_jobs_exceeded}, State}; - {_, _, {error, _} = SourceError} -> - {reply, SourceError, State} - end; - -handle_call({resume_job, _}, _From, #state{state = stopped} = State) -> - case couch_util:get_value(reason, State#state.state_info) of - undefined -> - {reply, {error, stopped}, State}; - Reason -> - {reply, {error, {stopped, Reason}}, State} - end; - -handle_call({resume_job, Id}, _From, State) -> - couch_log:notice("~p resume_job call ~p", [?MODULE, Id]), - case job_by_id(Id) of - #job{job_state = stopped} = Job -> - case start_job_int(Job, State) of - ok -> - {reply, ok, State}; - {error, Error} -> - {reply, {error, Error}, State} - end; - #job{} -> - {reply, ok, State}; - not_found -> - {reply, {error, not_found}, State} - end; - -handle_call({stop_job, Id, Reason}, _From, State) -> - couch_log:notice("~p stop_job Id:~p Reason:~p", [?MODULE, Id, Reason]), - case job_by_id(Id) of - #job{job_state = JSt} = Job when JSt =:= running orelse JSt =:= new - orelse JSt =:= stopped -> - ok = stop_job_int(Job, stopped, Reason, State), - {reply, ok, State}; - #job{} -> - {reply, ok, State}; - not_found -> - {reply, {error, not_found}, State} - end; - -handle_call({remove_job, Id}, _From, State) -> - {reply, remove_job_int(Id, State), State}; - -handle_call(get_state, _From, #state{state = GlobalState} = State) -> - StateProps = mem3_reshard_store:state_to_ejson_props(State), - Stats0 = #{running => 0, completed => 0, failed => 0, stopped => 0}, - StateStats = ets:foldl(fun(#job{job_state = JS}, Acc) -> - % When jobs are disabled globally their state is not checkpointed as - % "stopped", but it stays as "running". But when returning the state we - % don't want to mislead and indicate that there are "N running jobs" - % when the global state is "stopped". - JS1 = case GlobalState =:= stopped andalso JS =:= running of - true -> stopped; - false -> JS - end, - Acc#{JS1 => maps:get(JS1, Acc, 0) + 1} - end, Stats0, ?MODULE), - Total = ets:info(?MODULE, size), - StateStats1 = maps:to_list(StateStats) ++ [{total, Total}], - Result = {lists:sort(StateProps ++ StateStats1)}, - {reply, Result, State}; - -handle_call(reset_state, _From, State) -> - {reply, ok, reset_state(State)}; - -handle_call(Call, From, State) -> - couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]), - {noreply, State}. - - -handle_cast({db_deleted, DbName}, State) -> - % Remove only completed jobs. Other running states would `fail` but - % job result would stick around so users can inspect them. - JobIds = jobs_by_db_and_state(DbName, completed), - [remove_job_int(JobId, State) || JobId <- JobIds], - {noreply, State}; - -handle_cast({report, Job}, State) -> - report_int(Job), - {noreply, State}; - -handle_cast({checkpoint, Job}, State) -> - {noreply, checkpoint_int(Job, State)}; - -handle_cast(reload_jobs, State) -> - couch_log:notice("~p starting reloading jobs", [?MODULE]), - State1 = reload_jobs(State), - couch_log:notice("~p finished reloading jobs", [?MODULE]), - {noreply, State1}; - -handle_cast(Cast, State) -> - couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]), - {noreply, State}. - - -handle_info({'DOWN', _Ref, process, Pid, Info}, State) -> - case job_by_pid(Pid) of - {ok, Job} -> - couch_log:notice("~p job ~s exit ~p", [?MODULE, Job#job.id, Info]), - ok = handle_job_exit(Job, Info, State); - {error, not_found} -> - couch_log:error("~p job not found: ~p ~p", [?MODULE, Pid, Info]) - end, - {noreply, State}; - -handle_info(Info, State) -> - couch_log:error("~p unexpected info ~p", [?MODULE, Info]), - {noreply, State}. - - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -%% Private API - -validate_and_start_job(#shard{} = Source, Split) -> - case mem3_reshard_validate:start_args(Source, Split) of - ok -> - Target = target_shards(Source, Split), - case mem3_reshard_validate:targets(Source, Target) of - ok -> - TStamp = now_sec(), - Job = #job{ - type = split, - job_state = new, - split_state = new, - start_time = TStamp, - update_time = TStamp, - node = node(), - source = Source, - target = Target - }, - Job1 = Job#job{id = job_id(Job)}, - Job2 = update_job_history(Job1), - gen_server:call(?MODULE, {start_job, Job2}, infinity); - {error, Error} -> - {error, Error} - end; - {error, Error} -> - {error, Error} - end. - - -handle_start_job(#job{} = Job, #state{state = running} = State) -> - case start_job_int(Job, State) of - ok -> - {reply, {ok, Job#job.id}, State}; - {error, Error} -> - {reply, {error, Error}, State} - end; - -handle_start_job(#job{} = Job, #state{state = stopped} = State) -> - ok = mem3_reshard_store:store_job(State, Job), - % Since resharding is stopped on this node, the job is temporarily marked - % as stopped in the ets table so as not to return a "running" result which - % would look odd. - temporarily_stop_job(Job), - {reply, {ok, Job#job.id}, State}. - - -% Insert job in the ets table as a temporarily stopped job. This would happen -% when a job is reloaded or added when node-wide resharding is stopped. --spec temporarily_stop_job(#job{}) -> #job{}. -temporarily_stop_job(Job) -> - Job1 = kill_job_int(Job), - OldInfo = Job1#job.state_info, - Reason = <<"Shard splitting disabled">>, - Job2 = Job1#job{ - job_state = stopped, - update_time = now_sec(), - start_time = 0, - state_info = info_update(reason, Reason, OldInfo), - pid = undefined, - ref = undefined - }, - Job3 = update_job_history(Job2), - true = ets:insert(?MODULE, Job3), - Job3. - - --spec reload_jobs(#state{}) -> #state{}. -reload_jobs(State) -> - Jobs = mem3_reshard_store:get_jobs(State), - lists:foldl(fun reload_job/2, State, Jobs). - - -% This is a case when main application is stopped but a job is reloaded that -% was checkpointed in running state. Set that state to stopped to avoid the API -% results looking odd. --spec reload_job(#job{}, #state{}) -> #state{}. -reload_job(#job{job_state = JS} = Job, #state{state = stopped} = State) - when JS =:= running orelse JS =:= new -> - temporarily_stop_job(Job), - State; - -% This is a case when a job process should be spawend -reload_job(#job{job_state = JS} = Job, #state{state = running} = State) - when JS =:= running orelse JS =:= new -> - case start_job_int(Job, State) of - ok -> - State; - {error, Error} -> - Msg = "~p could not resume ~s error: ~p", - couch_log:error(Msg, [?MODULE, jobfmt(Job), Error]), - State - end; - -% If job is disabled individually (stopped by the user), is completed or failed -% then simply load it into the ets table -reload_job(#job{job_state = JS} = Job, #state{} = State) - when JS =:= failed orelse JS =:= completed orelse JS =:= stopped -> - true = ets:insert(?MODULE, Job), - State. - - --spec get_max_jobs() -> integer(). -get_max_jobs() -> - config:get_integer("reshard", "max_jobs", ?DEFAULT_MAX_JOBS). - - --spec start_job_int(#job{}, #state{}) -> ok | {error, term()}. -start_job_int(Job, State) -> - case spawn_job(Job) of - {ok, #job{} = Job1} -> - Job2 = update_job_history(Job1), - ok = mem3_reshard_store:store_job(State, Job2), - true = ets:insert(?MODULE, Job2), - ok; - {error, Error} -> - {error, Error} - end. - - --spec spawn_job(#job{}) -> {ok, pid()} | {error, term()}. -spawn_job(#job{} = Job0) -> - Job = Job0#job{ - job_state = running, - start_time = 0, - update_time = now_sec(), - state_info = info_delete(reason, Job0#job.state_info), - manager = self(), - workers = [], - retries = 0 - }, - case mem3_reshard_job_sup:start_child(Job) of - {ok, Pid} -> - Ref = monitor(process, Pid), - {ok, Job#job{pid = Pid, ref = Ref}}; - {error, Reason} -> - {error, Reason} - end. - - --spec stop_job_int(#job{}, job_state(), term(), #state{}) -> ok. -stop_job_int(#job{} = Job, JobState, Reason, State) -> - couch_log:info("~p stop_job_int ~p newstate: ~p reason:~p", [?MODULE, - jobfmt(Job), JobState, Reason]), - Job1 = kill_job_int(Job), - Job2 = Job1#job{ - job_state = JobState, - update_time = now_sec(), - state_info = [{reason, Reason}] - }, - ok = mem3_reshard_store:store_job(State, Job2), - true = ets:insert(?MODULE, Job2), - couch_log:info("~p stop_job_int stopped ~p", [?MODULE, jobfmt(Job2)]), - ok. - - --spec kill_job_int(#job{}) -> #job{}. -kill_job_int(#job{pid = undefined} = Job) -> - Job; - -kill_job_int(#job{pid = Pid, ref = Ref} = Job) -> - couch_log:info("~p kill_job_int ~p", [?MODULE, jobfmt(Job)]), - demonitor(Ref, [flush]), - case erlang:is_process_alive(Pid) of - true -> - ok = mem3_reshard_job_sup:terminate_child(Pid); - false -> - ok - end, - Job1 = Job#job{pid = undefined, ref = undefined}, - true = ets:insert(?MODULE, Job1), - Job1. - - --spec handle_job_exit(#job{}, term(), #state{}) -> ok. -handle_job_exit(#job{split_state = completed} = Job, normal, State) -> - couch_log:notice("~p completed job ~s exited", [?MODULE, Job#job.id]), - Job1 = Job#job{ - pid = undefined, - ref = undefined, - job_state = completed, - update_time = now_sec(), - state_info = [] - }, - Job2 = update_job_history(Job1), - ok = mem3_reshard_store:store_job(State, Job2), - true = ets:insert(?MODULE, Job2), - ok; - -handle_job_exit(#job{job_state = running} = Job, normal, _State) -> - couch_log:notice("~p running job ~s stopped", [?MODULE, Job#job.id]), - OldInfo = Job#job.state_info, - Job1 = Job#job{ - pid = undefined, - ref = undefined, - job_state = stopped, - update_time = now_sec(), - state_info = info_update(reason, <<"Job stopped">>, OldInfo) - }, - true = ets:insert(?MODULE, update_job_history(Job1)), - ok; - -handle_job_exit(#job{job_state = running} = Job, shutdown, _State) -> - couch_log:notice("~p job ~s shutdown", [?MODULE, Job#job.id]), - OldInfo = Job#job.state_info, - Job1 = Job#job{ - pid = undefined, - ref = undefined, - job_state = stopped, - update_time = now_sec(), - state_info = info_update(reason, <<"Job shutdown">>, OldInfo) - }, - true = ets:insert(?MODULE, update_job_history(Job1)), - ok; - -handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg}, _State) -> - couch_log:notice("~p job ~s shutdown ~p", [?MODULE, Job#job.id, Msg]), - OldInfo = Job#job.state_info, - Job1 = Job#job{ - pid = undefined, - ref = undefined, - job_state = stopped, - update_time = now_sec(), - state_info = info_update(reason, <<"Job shutdown">>, OldInfo) - }, - true = ets:insert(?MODULE, update_job_history(Job1)), - ok; - -handle_job_exit(#job{} = Job, Error, State) -> - couch_log:notice("~p job ~s failed ~p", [?MODULE, Job#job.id, Error]), - OldInfo = Job#job.state_info, - Job1 = Job#job{ - pid = undefined, - ref = undefined, - job_state = failed, - update_time = now_sec(), - state_info = info_update(reason, Error, OldInfo) - }, - Job2 = update_job_history(Job1), - ok = mem3_reshard_store:store_job(State, Job2), - true = ets:insert(?MODULE, Job2), - ok. - - --spec job_by_id(job_id()) -> #job{} | not_found. -job_by_id(Id) -> - case ets:lookup(?MODULE, Id) of - [] -> - not_found; - [#job{} = Job] -> - Job - end. - - --spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}. -job_by_pid(Pid) when is_pid(Pid) -> - case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of - [] -> - {error, not_found}; - [#job{} = Job] -> - {ok, Job} - end. - - --spec state_id() -> binary(). -state_id() -> - Ver = iolist_to_binary(io_lib:format("~3..0B", [?JOB_STATE_VERSION])), - <<?STATE_PREFIX/binary, Ver/binary>>. - - --spec job_id(#job{}) -> binary(). -job_id(#job{source = #shard{name = SourceName}}) -> - HashInput = [SourceName, atom_to_binary(node(), utf8)], - IdHashList = couch_util:to_hex(crypto:hash(sha256, HashInput)), - IdHash = iolist_to_binary(IdHashList), - Prefix = iolist_to_binary(io_lib:format("~3..0B", [?JOB_ID_VERSION])), - <<Prefix/binary, "-", IdHash/binary>>. - - --spec target_shards(#shard{}, split()) -> [#shard{}]. -target_shards(#shard{name = Name, range = [B, E], dbname = DbName}, Split) when - is_integer(Split), Split >= 2, (E - B + 1) >= Split -> - Ranges = target_ranges([B, E], Split), - <<"shards/", _:8/binary, "-", _:8/binary, "/", DbAndSuffix/binary>> = Name, - [DbName, Suffix] = binary:split(DbAndSuffix, <<".">>), - [build_shard(R, DbName, Suffix) || R <- Ranges]. - - --spec target_ranges([range_pos()], split()) -> [[range_pos()]]. -target_ranges([Begin, End], Split) when (End - Begin + 1) >= Split, - Split >=2 -> - Len = End - Begin + 1, % + 1 since intervals are inclusive - NewLen = Len div Split, - Rem = Len rem Split, - Ranges = [[I, I + NewLen - 1] || I <- lists:seq(Begin, End - Rem, NewLen)], - % Adjust last end to always match the original end to ensure we always - % cover the whole range. In case when remainder is larger this will make - % the last range larger. Improve the algorithm later to re-distribute - % the remainder equally amonst the chunks. - {BeforeLast, [[BeginLast, _]]} = lists:split(Split - 1, Ranges), - BeforeLast ++ [[BeginLast, End]]. - - --spec build_shard([non_neg_integer()], binary(), binary()) -> #shard{}. -build_shard(Range, DbName, Suffix) -> - Shard = #shard{dbname = DbName, range = Range, node = node()}, - mem3_util:name_shard(Shard, <<".", Suffix/binary>>). - - --spec running_jobs() -> [#job{}]. -running_jobs() -> - Pat = #job{job_state = running, _ = '_'}, - ets:match_object(?MODULE, Pat). - - --spec info_update(atom(), any(), [tuple()]) -> [tuple()]. -info_update(Key, Val, StateInfo) -> - lists:keystore(Key, 1, StateInfo, {Key, Val}). - - --spec info_delete(atom(), [tuple()]) -> [tuple()]. -info_delete(Key, StateInfo) -> - lists:keydelete(Key, 1, StateInfo). - - --spec checkpoint_int(#job{}, #state{}) -> #state{}. -checkpoint_int(#job{} = Job, State) -> - couch_log:debug("~p checkpoint ~s", [?MODULE, jobfmt(Job)]), - case report_int(Job) of - ok -> - ok = mem3_reshard_store:store_job(State, Job), - ok = mem3_reshard_job:checkpoint_done(Job), - State; - not_found -> - couch_log:error("~p checkpoint couldn't find ~p", [?MODULE, Job]), - State - end. - - --spec report_int(#job{}) -> ok | not_found. -report_int(Job) -> - case ets:lookup(?MODULE, Job#job.id) of - [#job{ref = Ref, pid = CurPid}] -> - case Job#job.pid =:= CurPid of - true -> - couch_log:debug("~p reported ~s", [?MODULE, jobfmt(Job)]), - % Carry over the reference from ets as the #job{} coming - % from the job process won't have it's own monitor ref. - true = ets:insert(?MODULE, Job#job{ref = Ref}), - ok; - false -> - LogMsg = "~p ignoring old job report ~p curr pid:~p", - couch_log:warning(LogMsg, [?MODULE, jobfmt(Job), CurPid]), - not_found - end; - _ -> - couch_log:error("~p reporting : couldn't find ~p", [?MODULE, Job]), - not_found - end. - - --spec remove_job_int(#job{}, #state{}) -> ok | {error, not_found}. -remove_job_int(Id, State) -> - couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]), - case job_by_id(Id) of - #job{} = Job -> - kill_job_int(Job), - ok = mem3_reshard_store:delete_job(State, Id), - ets:delete(?MODULE, Job#job.id), - ok; - not_found -> - {error, not_found} - end. - - -% This function is for testing and debugging only --spec reset_state(#state{}) -> #state{}. -reset_state(#state{} = State) -> - couch_log:warning("~p resetting state", [?MODULE]), - ok = mem3_reshard_store:delete_state(State), - couch_log:warning("~p killing all running jobs", [?MODULE]), - [kill_job_int(Job) || Job <- running_jobs()], - ets:delete_all_objects(?MODULE), - couch_log:warning("~p resetting all job states", [?MODULE]), - Jobs = mem3_reshard_store:get_jobs(State), - lists:foldl(fun(#job{id = Id}, StateAcc) -> - couch_log:warning("~p resetting job state ~p", [?MODULE, Id]), - ok = mem3_reshard_store:delete_job(StateAcc, Id), - StateAcc - end, State, Jobs), - couch_log:warning("~p resetting state done", [?MODULE]), - State#state{ - state = running, - state_info = [], - update_time = now_sec() - }. - - --spec update_job_history(#job{}) -> #job{}. -update_job_history(#job{job_state = St, update_time = Ts} = Job) -> - Hist = Job#job.history, - Reason = case couch_util:get_value(reason, Job#job.state_info) of - undefined -> null; - Val -> couch_util:to_binary(Val) - end, - Job#job{history = update_history(St, Reason, Ts, Hist)}. - - -update_history_rev(State, null, Ts, [{_, State, Detail} | Rest]) -> - % Just updated the detail, state stays the same, no new entry added - [{Ts, State, Detail} | Rest]; - -update_history_rev(State, Detail, Ts, [{_, State, Detail} | Rest]) -> - % State and detail were same as last event, just update the timestamp - [{Ts, State, Detail} | Rest]; - -update_history_rev(State, Detail, Ts, [{_, State, Detail} | Rest]) -> - % State and detail were same as last event, just update the timestamp - [{Ts, State, Detail} | Rest]; - -update_history_rev(State, Detail, Ts, History) -> - [{Ts, State, Detail} | History]. - - --spec max_history() -> non_neg_integer(). -max_history() -> - config:get_integer("reshard", "max_history", ?DEFAULT_MAX_HISTORY). - - --spec maybe_disable(#state{}) -> #state{}. -maybe_disable(#state{} = State) -> - case is_disabled() of - true -> - Reason = <<"Resharding disabled by application level config">>, - SInfo = State#state.state_info, - State#state{ - state = stopped, - state_info = info_update(reason, Reason, SInfo) - }; - false -> - State - end. - - --spec jobs_by_db_and_state(binary(), split_state() | '_') -> [job_id()]. -jobs_by_db_and_state(Db, State) -> - DbName = mem3:dbname(Db), - Pat = #job{ - id = '$1', - source =#shard{dbname = DbName, _ = '_'}, - job_state = State, - _ = '_' - }, - [JobId || [JobId] <- ets:match(?MODULE, Pat)]. - - --spec db_exists(binary()) -> boolean(). -db_exists(Name) -> - try - mem3:shards(mem3:dbname(Name)), - true - catch - error:database_does_not_exist -> - false - end. - - --spec db_monitor(pid()) -> no_return(). -db_monitor(Server) -> - couch_log:notice("~p db monitor ~p starting", [?MODULE, self()]), - EvtRef = erlang:monitor(process, couch_event_server), - couch_event:register_all(self()), - db_monitor_loop(Server, EvtRef). - - --spec db_monitor_loop(pid(), reference()) -> no_return(). -db_monitor_loop(Server, EvtRef) -> - receive - {'$couch_event', DbName, deleted} -> - case db_exists(DbName) of - true -> - % Could be source shard being deleted during splitting - ok; - false -> - case length(jobs_by_db_and_state(DbName, '_')) > 0 of - true -> - % Notify only if there are jobs with that db - gen_server:cast(Server, {db_deleted, DbName}); - false -> - ok - end - end, - db_monitor_loop(Server, EvtRef); - {'$couch_event', _, _} -> - db_monitor_loop(Server, EvtRef); - {'DOWN', EvtRef, _, _, Info} -> - couch_log:error("~p db monitor listener died ~p", [?MODULE, Info]), - exit({db_monitor_died, Info}); - Msg -> - couch_log:error("~p db monitor unexpected msg ~p", [?MODULE, Msg]), - db_monitor_loop(Server, EvtRef) - end. - - --spec statefmt(#state{} | term()) -> string(). -statefmt(#state{state = StateName}) -> - Total = ets:info(?MODULE, size), - Active = mem3_reshard_job_sup:count_children(), - Msg = "#state{~s total:~B active:~B}", - Fmt = io_lib:format(Msg, [StateName, Total, Active]), - lists:flatten(Fmt); - -statefmt(State) -> - Fmt = io_lib:format("<Unknown split state:~p>", [State]), - lists:flatten(Fmt). - - --spec jobfmt(#job{}) -> string(). -jobfmt(#job{} = Job) -> - mem3_reshard_job:jobfmt(Job). |