-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
- start_link/0,
- start/0,
- stop/1,
- start_split_job/1,
- stop_job/2,
- resume_job/1,
- remove_job/1,
- get_state/0,
- jobs/0,
- job/1,
- is_disabled/0,
- report/2,
- checkpoint/2,
- now_sec/0,
- update_history/4,
- shard_from_name/1,
- reset_state/0
- init/1,
- terminate/2,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- code_change/3
--define(JOB_ID_VERSION, 1).
--define(JOB_STATE_VERSION, 1).
--define(DEFAULT_MAX_JOBS, 48).
--define(DEFAULT_MAX_HISTORY, 20).
--define(JOB_PREFIX, <<"reshard-job-">>).
--define(STATE_PREFIX, <<"reshard-state-">>).
-%% Public API
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
--spec start() -> ok | {error, any()}.
-start() ->
- case is_disabled() of
- true -> {error, resharding_disabled};
- false -> gen_server:call(?MODULE, start, infinity)
- end.
--spec stop(binary()) -> ok | {error, any()}.
-stop(Reason) ->
- case is_disabled() of
- true -> {error, resharding_disabled};
- false -> gen_server:call(?MODULE, {stop, Reason}, infinity)
- end.
--spec start_split_job(#shard{} | binary()) -> {ok, binary()} | {error, term()}.
-start_split_job(#shard{} = Shard) ->
- start_split_job(Shard, 2);
-start_split_job(ShardName) when is_binary(ShardName) ->
- start_split_job(shard_from_name(ShardName), 2).
--spec start_split_job(#shard{}, split()) -> {ok, binary()} | {error, any()}.
-start_split_job(#shard{} = Source, Split) ->
- case is_disabled() of
- true -> {error, resharding_disabled};
- false -> validate_and_start_job(Source, Split)
- end.
--spec stop_job(binary(), binary()) -> ok | {error, any()}.
-stop_job(JobId, Reason) when is_binary(JobId), is_binary(Reason) ->
- case is_disabled() of
- true -> {error, resharding_disabled};
- false -> gen_server:call(?MODULE, {stop_job, JobId, Reason}, infinity)
- end.
--spec resume_job(binary()) -> ok | {error, any()}.
-resume_job(JobId) when is_binary(JobId) ->
- case is_disabled() of
- true -> {error, resharding_disabled};
- false -> gen_server:call(?MODULE, {resume_job, JobId}, infinity)
- end.
--spec remove_job(binary()) -> ok | {error, any()}.
-remove_job(JobId) when is_binary(JobId) ->
- case is_disabled() of
- true -> {error, resharding_disabled};
- false -> gen_server:call(?MODULE, {remove_job, JobId}, infinity)
- end.
--spec get_state() -> {[_ | _]}.
-get_state() ->
- gen_server:call(?MODULE, get_state, infinity).
--spec jobs() -> [[tuple()]].
-jobs() ->
- ets:foldl(fun(Job, Acc) ->
- Opts = [iso8601],
- Props = mem3_reshard_store:job_to_ejson_props(Job, Opts),
- [{Props} | Acc]
- end, [], ?MODULE).
--spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
-job(JobId) ->
- case job_by_id(JobId) of
- #job{} = Job ->
- Opts = [iso8601],
- Props = mem3_reshard_store:job_to_ejson_props(Job, Opts),
- {ok, {Props}};
- not_found ->
- {error, not_found}
- end.
-% Return true resharding is disabled in the application level settings
--spec is_disabled() -> boolean().
-is_disabled() ->
- case application:get_env(mem3, reshard_disabled) of
- {ok, "true"} -> true;
- {ok, true} -> true;
- _ -> false
- end.
-% State reporting callbacks. Used by mem3_reshard_job module.
--spec report(pid(), #job{}) -> ok.
-report(Server, #job{} = Job) when is_pid(Server) ->
- gen_server:cast(Server, {report, Job}).
--spec checkpoint(pid(), #job{}) -> ok.
-checkpoint(Server, #job{} = Job) ->
- couch_log:notice("~p checkpointing ~p ~p", [?MODULE, Server, jobfmt(Job)]),
- gen_server:cast(Server, {checkpoint, Job}).
-% Utility functions used from other mem3_reshard modules
--spec now_sec() -> non_neg_integer().
-now_sec() ->
- {Mega, Sec, _Micro} = os:timestamp(),
- Mega * 1000000 + Sec.
--spec update_history(atom(), binary() | null, time_sec(), list()) -> list().
-update_history(State, State, Ts, History) ->
- % State is the same as detail. Make the detail null to avoid duplication
- update_history(State, null, Ts, History);
-update_history(State, Detail, Ts, History) ->
- % Reverse, so we can process the last event as the head using
- % head matches, then after append and trimming, reserse again
- Rev = lists:reverse(History),
- UpdatedRev = update_history_rev(State, Detail, Ts, Rev),
- TrimmedRev = lists:sublist(UpdatedRev, max_history()),
- lists:reverse(TrimmedRev).
--spec shard_from_name(binary()) -> #shard{}.
-shard_from_name(<<"shards/", _:8/binary, "-", _:8/binary, "/",
- Rest/binary>> = Shard) ->
- Range = mem3:range(Shard),
- [DbName, Suffix] = binary:split(Rest, <<".">>),
- build_shard(Range, DbName, Suffix).
-% For debugging only
--spec reset_state() -> ok.
-reset_state() ->
- gen_server:call(?MODULE, reset_state, infinity).
-% Gen server functions
-init(_) ->
- % Advertise resharding API feature only if it is not disabled
- case is_disabled() of
- true -> ok;
- false -> config:enable_feature('reshard')
- end,
- couch_log:notice("~p start init()", [?MODULE]),
- EtsOpts = [named_table, {keypos,}, {read_concurrency, true}],
- ?MODULE = ets:new(?MODULE, EtsOpts),
- ManagerPid = self(),
- State = #state{
- state = running,
- state_info = [],
- update_time = now_sec(),
- node = node(),
- db_monitor = spawn_link(fun() -> db_monitor(ManagerPid) end)
- },
- State1 = mem3_reshard_store:init(State, ?JOB_PREFIX, state_id()),
- State2 = mem3_reshard_store:load_state(State1, running),
- State3 = maybe_disable(State2),
- gen_server:cast(self(), reload_jobs),
- {ok, State3}.
-terminate(Reason, State) ->
- couch_log:notice("~p terminate ~p ~p", [?MODULE, Reason, statefmt(State)]),
- catch unlink(State#state.db_monitor),
- catch exit(State#state.db_monitor, kill),
- lists:foreach(fun(Job) -> kill_job_int(Job) end, running_jobs()).
-handle_call(start, _From, #state{state = stopped} = State) ->
- State1 = State#state{
- state = running,
- update_time = now_sec(),
- state_info = info_delete(reason, State#state.state_info)
- },
- ok = mem3_reshard_store:store_state(State1),
- State2 = maybe_disable(State1),
- State3 = reload_jobs(State2),
- {reply, ok, State3};
-handle_call(start, _From, State) ->
- {reply, ok, State};
-handle_call({stop, Reason}, _From, #state{state = running} = State) ->
- State1 = State#state{
- state = stopped,
- update_time = now_sec(),
- state_info = info_update(reason, Reason, State#state.state_info)
- },
- ok = mem3_reshard_store:store_state(State1),
- lists:foreach(fun(Job) -> temporarily_stop_job(Job) end, running_jobs()),
- {reply, ok, State1};
-handle_call({stop, _}, _From, State) ->
- {reply, ok, State};
-handle_call({start_job, #job{id = Id, source = Source} = Job}, _From, State) ->
- couch_log:notice("~p start_job call ~p", [?MODULE, jobfmt(Job)]),
- Total = ets:info(?MODULE, size),
- SourceOk = mem3_reshard_validate:source(Source),
- case {job_by_id(Id), Total + 1 =< get_max_jobs(), SourceOk} of
- {not_found, true, ok} ->
- handle_start_job(Job, State);
- {#job{}, _, _} ->
- {reply, {error, job_already_exists}, State};
- {_, false, _} ->
- {reply, {error, max_jobs_exceeded}, State};
- {_, _, {error, _} = SourceError} ->
- {reply, SourceError, State}
- end;
-handle_call({resume_job, _}, _From, #state{state = stopped} = State) ->
- case couch_util:get_value(reason, State#state.state_info) of
- undefined ->
- {reply, {error, stopped}, State};
- Reason ->
- {reply, {error, {stopped, Reason}}, State}
- end;
-handle_call({resume_job, Id}, _From, State) ->
- couch_log:notice("~p resume_job call ~p", [?MODULE, Id]),
- case job_by_id(Id) of
- #job{job_state = stopped} = Job ->
- case start_job_int(Job, State) of
- ok ->
- {reply, ok, State};
- {error, Error} ->
- {reply, {error, Error}, State}
- end;
- #job{} ->
- {reply, ok, State};
- not_found ->
- {reply, {error, not_found}, State}
- end;
-handle_call({stop_job, Id, Reason}, _From, State) ->
- couch_log:notice("~p stop_job Id:~p Reason:~p", [?MODULE, Id, Reason]),
- case job_by_id(Id) of
- #job{job_state = JSt} = Job when JSt =:= running orelse JSt =:= new
- orelse JSt =:= stopped ->
- ok = stop_job_int(Job, stopped, Reason, State),
- {reply, ok, State};
- #job{} ->
- {reply, ok, State};
- not_found ->
- {reply, {error, not_found}, State}
- end;
-handle_call({remove_job, Id}, _From, State) ->
- {reply, remove_job_int(Id, State), State};
-handle_call(get_state, _From, #state{state = GlobalState} = State) ->
- StateProps = mem3_reshard_store:state_to_ejson_props(State),
- Stats0 = #{running => 0, completed => 0, failed => 0, stopped => 0},
- StateStats = ets:foldl(fun(#job{job_state = JS}, Acc) ->
- % When jobs are disabled globally their state is not checkpointed as
- % "stopped", but it stays as "running". But when returning the state we
- % don't want to mislead and indicate that there are "N running jobs"
- % when the global state is "stopped".
- JS1 = case GlobalState =:= stopped andalso JS =:= running of
- true -> stopped;
- false -> JS
- end,
- Acc#{JS1 => maps:get(JS1, Acc, 0) + 1}
- end, Stats0, ?MODULE),
- Total = ets:info(?MODULE, size),
- StateStats1 = maps:to_list(StateStats) ++ [{total, Total}],
- Result = {lists:sort(StateProps ++ StateStats1)},
- {reply, Result, State};
-handle_call(reset_state, _From, State) ->
- {reply, ok, reset_state(State)};
-handle_call(Call, From, State) ->
- couch_log:error("~p unknown call ~p from: ~p", [?MODULE, Call, From]),
- {noreply, State}.
-handle_cast({db_deleted, DbName}, State) ->
- % Remove only completed jobs. Other running states would `fail` but
- % job result would stick around so users can inspect them.
- JobIds = jobs_by_db_and_state(DbName, completed),
- [remove_job_int(JobId, State) || JobId <- JobIds],
- {noreply, State};
-handle_cast({report, Job}, State) ->
- report_int(Job),
- {noreply, State};
-handle_cast({checkpoint, Job}, State) ->
- {noreply, checkpoint_int(Job, State)};
-handle_cast(reload_jobs, State) ->
- couch_log:notice("~p starting reloading jobs", [?MODULE]),
- State1 = reload_jobs(State),
- couch_log:notice("~p finished reloading jobs", [?MODULE]),
- {noreply, State1};
-handle_cast(Cast, State) ->
- couch_log:error("~p unexpected cast ~p", [?MODULE, Cast]),
- {noreply, State}.
-handle_info({'DOWN', _Ref, process, Pid, Info}, State) ->
- case job_by_pid(Pid) of
- {ok, Job} ->
- couch_log:notice("~p job ~s exit ~p", [?MODULE,, Info]),
- ok = handle_job_exit(Job, Info, State);
- {error, not_found} ->
- couch_log:error("~p job not found: ~p ~p", [?MODULE, Pid, Info])
- end,
- {noreply, State};
-handle_info(Info, State) ->
- couch_log:error("~p unexpected info ~p", [?MODULE, Info]),
- {noreply, State}.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-%% Private API
-validate_and_start_job(#shard{} = Source, Split) ->
- case mem3_reshard_validate:start_args(Source, Split) of
- ok ->
- Target = target_shards(Source, Split),
- case mem3_reshard_validate:targets(Source, Target) of
- ok ->
- TStamp = now_sec(),
- Job = #job{
- type = split,
- job_state = new,
- split_state = new,
- start_time = TStamp,
- update_time = TStamp,
- node = node(),
- source = Source,
- target = Target
- },
- Job1 = Job#job{id = job_id(Job)},
- Job2 = update_job_history(Job1),
- gen_server:call(?MODULE, {start_job, Job2}, infinity);
- {error, Error} ->
- {error, Error}
- end;
- {error, Error} ->
- {error, Error}
- end.
-handle_start_job(#job{} = Job, #state{state = running} = State) ->
- case start_job_int(Job, State) of
- ok ->
- {reply, {ok,}, State};
- {error, Error} ->
- {reply, {error, Error}, State}
- end;
-handle_start_job(#job{} = Job, #state{state = stopped} = State) ->
- ok = mem3_reshard_store:store_job(State, Job),
- % Since resharding is stopped on this node, the job is temporarily marked
- % as stopped in the ets table so as not to return a "running" result which
- % would look odd.
- temporarily_stop_job(Job),
- {reply, {ok,}, State}.
-% Insert job in the ets table as a temporarily stopped job. This would happen
-% when a job is reloaded or added when node-wide resharding is stopped.
--spec temporarily_stop_job(#job{}) -> #job{}.
-temporarily_stop_job(Job) ->
- Job1 = kill_job_int(Job),
- OldInfo = Job1#job.state_info,
- Reason = <<"Shard splitting disabled">>,
- Job2 = Job1#job{
- job_state = stopped,
- update_time = now_sec(),
- start_time = 0,
- state_info = info_update(reason, Reason, OldInfo),
- pid = undefined,
- ref = undefined
- },
- Job3 = update_job_history(Job2),
- true = ets:insert(?MODULE, Job3),
- Job3.
--spec reload_jobs(#state{}) -> #state{}.
-reload_jobs(State) ->
- Jobs = mem3_reshard_store:get_jobs(State),
- lists:foldl(fun reload_job/2, State, Jobs).
-% This is a case when main application is stopped but a job is reloaded that
-% was checkpointed in running state. Set that state to stopped to avoid the API
-% results looking odd.
--spec reload_job(#job{}, #state{}) -> #state{}.
-reload_job(#job{job_state = JS} = Job, #state{state = stopped} = State)
- when JS =:= running orelse JS =:= new ->
- temporarily_stop_job(Job),
- State;
-% This is a case when a job process should be spawend
-reload_job(#job{job_state = JS} = Job, #state{state = running} = State)
- when JS =:= running orelse JS =:= new ->
- case start_job_int(Job, State) of
- ok ->
- State;
- {error, Error} ->
- Msg = "~p could not resume ~s error: ~p",
- couch_log:error(Msg, [?MODULE, jobfmt(Job), Error]),
- State
- end;
-% If job is disabled individually (stopped by the user), is completed or failed
-% then simply load it into the ets table
-reload_job(#job{job_state = JS} = Job, #state{} = State)
- when JS =:= failed orelse JS =:= completed orelse JS =:= stopped ->
- true = ets:insert(?MODULE, Job),
- State.
--spec get_max_jobs() -> integer().
-get_max_jobs() ->
- config:get_integer("reshard", "max_jobs", ?DEFAULT_MAX_JOBS).
--spec start_job_int(#job{}, #state{}) -> ok | {error, term()}.
-start_job_int(Job, State) ->
- case spawn_job(Job) of
- {ok, #job{} = Job1} ->
- Job2 = update_job_history(Job1),
- ok = mem3_reshard_store:store_job(State, Job2),
- true = ets:insert(?MODULE, Job2),
- ok;
- {error, Error} ->
- {error, Error}
- end.
--spec spawn_job(#job{}) -> {ok, pid()} | {error, term()}.
-spawn_job(#job{} = Job0) ->
- Job = Job0#job{
- job_state = running,
- start_time = 0,
- update_time = now_sec(),
- state_info = info_delete(reason, Job0#job.state_info),
- manager = self(),
- workers = [],
- retries = 0
- },
- case mem3_reshard_job_sup:start_child(Job) of
- {ok, Pid} ->
- Ref = monitor(process, Pid),
- {ok, Job#job{pid = Pid, ref = Ref}};
- {error, Reason} ->
- {error, Reason}
- end.
--spec stop_job_int(#job{}, job_state(), term(), #state{}) -> ok.
-stop_job_int(#job{} = Job, JobState, Reason, State) ->
- couch_log:info("~p stop_job_int ~p newstate: ~p reason:~p", [?MODULE,
- jobfmt(Job), JobState, Reason]),
- Job1 = kill_job_int(Job),
- Job2 = Job1#job{
- job_state = JobState,
- update_time = now_sec(),
- state_info = [{reason, Reason}]
- },
- ok = mem3_reshard_store:store_job(State, Job2),
- true = ets:insert(?MODULE, Job2),
- couch_log:info("~p stop_job_int stopped ~p", [?MODULE, jobfmt(Job2)]),
- ok.
--spec kill_job_int(#job{}) -> #job{}.
-kill_job_int(#job{pid = undefined} = Job) ->
- Job;
-kill_job_int(#job{pid = Pid, ref = Ref} = Job) ->
- couch_log:info("~p kill_job_int ~p", [?MODULE, jobfmt(Job)]),
- demonitor(Ref, [flush]),
- case erlang:is_process_alive(Pid) of
- true ->
- ok = mem3_reshard_job_sup:terminate_child(Pid);
- false ->
- ok
- end,
- Job1 = Job#job{pid = undefined, ref = undefined},
- true = ets:insert(?MODULE, Job1),
- Job1.
--spec handle_job_exit(#job{}, term(), #state{}) -> ok.
-handle_job_exit(#job{split_state = completed} = Job, normal, State) ->
- couch_log:notice("~p completed job ~s exited", [?MODULE,]),
- Job1 = Job#job{
- pid = undefined,
- ref = undefined,
- job_state = completed,
- update_time = now_sec(),
- state_info = []
- },
- Job2 = update_job_history(Job1),
- ok = mem3_reshard_store:store_job(State, Job2),
- true = ets:insert(?MODULE, Job2),
- ok;
-handle_job_exit(#job{job_state = running} = Job, normal, _State) ->
- couch_log:notice("~p running job ~s stopped", [?MODULE,]),
- OldInfo = Job#job.state_info,
- Job1 = Job#job{
- pid = undefined,
- ref = undefined,
- job_state = stopped,
- update_time = now_sec(),
- state_info = info_update(reason, <<"Job stopped">>, OldInfo)
- },
- true = ets:insert(?MODULE, update_job_history(Job1)),
- ok;
-handle_job_exit(#job{job_state = running} = Job, shutdown, _State) ->
- couch_log:notice("~p job ~s shutdown", [?MODULE,]),
- OldInfo = Job#job.state_info,
- Job1 = Job#job{
- pid = undefined,
- ref = undefined,
- job_state = stopped,
- update_time = now_sec(),
- state_info = info_update(reason, <<"Job shutdown">>, OldInfo)
- },
- true = ets:insert(?MODULE, update_job_history(Job1)),
- ok;
-handle_job_exit(#job{job_state = running} = Job, {shutdown, Msg}, _State) ->
- couch_log:notice("~p job ~s shutdown ~p", [?MODULE,, Msg]),
- OldInfo = Job#job.state_info,
- Job1 = Job#job{
- pid = undefined,
- ref = undefined,
- job_state = stopped,
- update_time = now_sec(),
- state_info = info_update(reason, <<"Job shutdown">>, OldInfo)
- },
- true = ets:insert(?MODULE, update_job_history(Job1)),
- ok;
-handle_job_exit(#job{} = Job, Error, State) ->
- couch_log:notice("~p job ~s failed ~p", [?MODULE,, Error]),
- OldInfo = Job#job.state_info,
- Job1 = Job#job{
- pid = undefined,
- ref = undefined,
- job_state = failed,
- update_time = now_sec(),
- state_info = info_update(reason, Error, OldInfo)
- },
- Job2 = update_job_history(Job1),
- ok = mem3_reshard_store:store_job(State, Job2),
- true = ets:insert(?MODULE, Job2),
- ok.
--spec job_by_id(job_id()) -> #job{} | not_found.
-job_by_id(Id) ->
- case ets:lookup(?MODULE, Id) of
- [] ->
- not_found;
- [#job{} = Job] ->
- Job
- end.
--spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}.
-job_by_pid(Pid) when is_pid(Pid) ->
- case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of
- [] ->
- {error, not_found};
- [#job{} = Job] ->
- {ok, Job}
- end.
--spec state_id() -> binary().
-state_id() ->
- Ver = iolist_to_binary(io_lib:format("~3..0B", [?JOB_STATE_VERSION])),
- <<?STATE_PREFIX/binary, Ver/binary>>.
--spec job_id(#job{}) -> binary().
-job_id(#job{source = #shard{name = SourceName}}) ->
- HashInput = [SourceName, atom_to_binary(node(), utf8)],
- IdHashList = couch_util:to_hex(crypto:hash(sha256, HashInput)),
- IdHash = iolist_to_binary(IdHashList),
- Prefix = iolist_to_binary(io_lib:format("~3..0B", [?JOB_ID_VERSION])),
- <<Prefix/binary, "-", IdHash/binary>>.
--spec target_shards(#shard{}, split()) -> [#shard{}].
-target_shards(#shard{name = Name, range = [B, E], dbname = DbName}, Split) when
- is_integer(Split), Split >= 2, (E - B + 1) >= Split ->
- Ranges = target_ranges([B, E], Split),
- <<"shards/", _:8/binary, "-", _:8/binary, "/", DbAndSuffix/binary>> = Name,
- [DbName, Suffix] = binary:split(DbAndSuffix, <<".">>),
- [build_shard(R, DbName, Suffix) || R <- Ranges].
--spec target_ranges([range_pos()], split()) -> [[range_pos()]].
-target_ranges([Begin, End], Split) when (End - Begin + 1) >= Split,
- Split >=2 ->
- Len = End - Begin + 1, % + 1 since intervals are inclusive
- NewLen = Len div Split,
- Rem = Len rem Split,
- Ranges = [[I, I + NewLen - 1] || I <- lists:seq(Begin, End - Rem, NewLen)],
- % Adjust last end to always match the original end to ensure we always
- % cover the whole range. In case when remainder is larger this will make
- % the last range larger. Improve the algorithm later to re-distribute
- % the remainder equally amonst the chunks.
- {BeforeLast, [[BeginLast, _]]} = lists:split(Split - 1, Ranges),
- BeforeLast ++ [[BeginLast, End]].
--spec build_shard([non_neg_integer()], binary(), binary()) -> #shard{}.
-build_shard(Range, DbName, Suffix) ->
- Shard = #shard{dbname = DbName, range = Range, node = node()},
- mem3_util:name_shard(Shard, <<".", Suffix/binary>>).
--spec running_jobs() -> [#job{}].
-running_jobs() ->
- Pat = #job{job_state = running, _ = '_'},
- ets:match_object(?MODULE, Pat).
--spec info_update(atom(), any(), [tuple()]) -> [tuple()].
-info_update(Key, Val, StateInfo) ->
- lists:keystore(Key, 1, StateInfo, {Key, Val}).
--spec info_delete(atom(), [tuple()]) -> [tuple()].
-info_delete(Key, StateInfo) ->
- lists:keydelete(Key, 1, StateInfo).
--spec checkpoint_int(#job{}, #state{}) -> #state{}.
-checkpoint_int(#job{} = Job, State) ->
- couch_log:debug("~p checkpoint ~s", [?MODULE, jobfmt(Job)]),
- case report_int(Job) of
- ok ->
- ok = mem3_reshard_store:store_job(State, Job),
- ok = mem3_reshard_job:checkpoint_done(Job),
- State;
- not_found ->
- couch_log:error("~p checkpoint couldn't find ~p", [?MODULE, Job]),
- State
- end.
--spec report_int(#job{}) -> ok | not_found.
-report_int(Job) ->
- case ets:lookup(?MODULE, of
- [#job{ref = Ref, pid = CurPid}] ->
- case =:= CurPid of
- true ->
- couch_log:debug("~p reported ~s", [?MODULE, jobfmt(Job)]),
- % Carry over the reference from ets as the #job{} coming
- % from the job process won't have it's own monitor ref.
- true = ets:insert(?MODULE, Job#job{ref = Ref}),
- ok;
- false ->
- LogMsg = "~p ignoring old job report ~p curr pid:~p",
- couch_log:warning(LogMsg, [?MODULE, jobfmt(Job), CurPid]),
- not_found
- end;
- _ ->
- couch_log:error("~p reporting : couldn't find ~p", [?MODULE, Job]),
- not_found
- end.
--spec remove_job_int(#job{}, #state{}) -> ok | {error, not_found}.
-remove_job_int(Id, State) ->
- couch_log:notice("~p call remove_job Id:~p", [?MODULE, Id]),
- case job_by_id(Id) of
- #job{} = Job ->
- kill_job_int(Job),
- ok = mem3_reshard_store:delete_job(State, Id),
- ets:delete(?MODULE,,
- ok;
- not_found ->
- {error, not_found}
- end.
-% This function is for testing and debugging only
--spec reset_state(#state{}) -> #state{}.
-reset_state(#state{} = State) ->
- couch_log:warning("~p resetting state", [?MODULE]),
- ok = mem3_reshard_store:delete_state(State),
- couch_log:warning("~p killing all running jobs", [?MODULE]),
- [kill_job_int(Job) || Job <- running_jobs()],
- ets:delete_all_objects(?MODULE),
- couch_log:warning("~p resetting all job states", [?MODULE]),
- Jobs = mem3_reshard_store:get_jobs(State),
- lists:foldl(fun(#job{id = Id}, StateAcc) ->
- couch_log:warning("~p resetting job state ~p", [?MODULE, Id]),
- ok = mem3_reshard_store:delete_job(StateAcc, Id),
- StateAcc
- end, State, Jobs),
- couch_log:warning("~p resetting state done", [?MODULE]),
- State#state{
- state = running,
- state_info = [],
- update_time = now_sec()
- }.
--spec update_job_history(#job{}) -> #job{}.
-update_job_history(#job{job_state = St, update_time = Ts} = Job) ->
- Hist = Job#job.history,
- Reason = case couch_util:get_value(reason, Job#job.state_info) of
- undefined -> null;
- Val -> couch_util:to_binary(Val)
- end,
- Job#job{history = update_history(St, Reason, Ts, Hist)}.
-update_history_rev(State, null, Ts, [{_, State, Detail} | Rest]) ->
- % Just updated the detail, state stays the same, no new entry added
- [{Ts, State, Detail} | Rest];
-update_history_rev(State, Detail, Ts, [{_, State, Detail} | Rest]) ->
- % State and detail were same as last event, just update the timestamp
- [{Ts, State, Detail} | Rest];
-update_history_rev(State, Detail, Ts, [{_, State, Detail} | Rest]) ->
- % State and detail were same as last event, just update the timestamp
- [{Ts, State, Detail} | Rest];
-update_history_rev(State, Detail, Ts, History) ->
- [{Ts, State, Detail} | History].
--spec max_history() -> non_neg_integer().
-max_history() ->
- config:get_integer("reshard", "max_history", ?DEFAULT_MAX_HISTORY).
--spec maybe_disable(#state{}) -> #state{}.
-maybe_disable(#state{} = State) ->
- case is_disabled() of
- true ->
- Reason = <<"Resharding disabled by application level config">>,
- SInfo = State#state.state_info,
- State#state{
- state = stopped,
- state_info = info_update(reason, Reason, SInfo)
- };
- false ->
- State
- end.
--spec jobs_by_db_and_state(binary(), split_state() | '_') -> [job_id()].
-jobs_by_db_and_state(Db, State) ->
- DbName = mem3:dbname(Db),
- Pat = #job{
- id = '$1',
- source =#shard{dbname = DbName, _ = '_'},
- job_state = State,
- _ = '_'
- },
- [JobId || [JobId] <- ets:match(?MODULE, Pat)].
--spec db_exists(binary()) -> boolean().
-db_exists(Name) ->
- try
- mem3:shards(mem3:dbname(Name)),
- true
- catch
- error:database_does_not_exist ->
- false
- end.
--spec db_monitor(pid()) -> no_return().
-db_monitor(Server) ->
- couch_log:notice("~p db monitor ~p starting", [?MODULE, self()]),
- EvtRef = erlang:monitor(process, couch_event_server),
- couch_event:register_all(self()),
- db_monitor_loop(Server, EvtRef).
--spec db_monitor_loop(pid(), reference()) -> no_return().
-db_monitor_loop(Server, EvtRef) ->
- receive
- {'$couch_event', DbName, deleted} ->
- case db_exists(DbName) of
- true ->
- % Could be source shard being deleted during splitting
- ok;
- false ->
- case length(jobs_by_db_and_state(DbName, '_')) > 0 of
- true ->
- % Notify only if there are jobs with that db
- gen_server:cast(Server, {db_deleted, DbName});
- false ->
- ok
- end
- end,
- db_monitor_loop(Server, EvtRef);
- {'$couch_event', _, _} ->
- db_monitor_loop(Server, EvtRef);
- {'DOWN', EvtRef, _, _, Info} ->
- couch_log:error("~p db monitor listener died ~p", [?MODULE, Info]),
- exit({db_monitor_died, Info});
- Msg ->
- couch_log:error("~p db monitor unexpected msg ~p", [?MODULE, Msg]),
- db_monitor_loop(Server, EvtRef)
- end.
--spec statefmt(#state{} | term()) -> string().
-statefmt(#state{state = StateName}) ->
- Total = ets:info(?MODULE, size),
- Active = mem3_reshard_job_sup:count_children(),
- Msg = "#state{~s total:~B active:~B}",
- Fmt = io_lib:format(Msg, [StateName, Total, Active]),
- lists:flatten(Fmt);
-statefmt(State) ->
- Fmt = io_lib:format("<Unknown split state:~p>", [State]),
- lists:flatten(Fmt).
--spec jobfmt(#job{}) -> string().
-jobfmt(#job{} = Job) ->
- mem3_reshard_job:jobfmt(Job).