Diffstat (limited to 'src/mem3/src/mem3_reshard_job.erl')
-rw-r--r-- | src/mem3/src/mem3_reshard_job.erl | 716
1 file changed, 0 insertions, 716 deletions
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
deleted file mode 100644
index aedca21bb..000000000
--- a/src/mem3/src/mem3_reshard_job.erl
+++ /dev/null
@@ -1,716 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(mem3_reshard_job).
-
-
--export([
-    start_link/1,
-
-    checkpoint_done/1,
-    jobfmt/1,
-    pickfun/3
-]).
-
--export([
-    init/1,
-
-    initial_copy/1,
-    initial_copy_impl/1,
-
-    topoff/1,
-    topoff_impl/1,
-
-    build_indices/1,
-
-    copy_local_docs/1,
-    copy_local_docs_impl/1,
-
-    update_shardmap/1,
-
-    wait_source_close/1,
-    wait_source_close_impl/1,
-
-    source_delete/1,
-    source_delete_impl/1,
-
-    completed/1
-]).
-
-
--include_lib("couch/include/couch_db.hrl").
--include("mem3_reshard.hrl").
-
-
-% Batch size for internal replication topoffs
--define(INTERNAL_REP_BATCH_SIZE, 2000).
-
-% The list of possible job states. The order of this
-% list is important as a job will progress linearly
-% through it. However, when starting a job we may
-% have to resume from an earlier state as listed
-% below in STATE_RESTART.
--define(SPLIT_STATES, [
-    new,
-    initial_copy,
-    topoff1,
-    build_indices,
-    topoff2,
-    copy_local_docs,
-    update_shardmap,
-    wait_source_close,
-    topoff3,
-    source_delete,
-    completed
-]).
-
-
-% When a job starts it may be resuming from a partially
-% completed state. These state pairs list the state
-% we have to restart from for each possible state.
--define(STATE_RESTART, #{
-    new => initial_copy,
-    initial_copy => initial_copy,
-    topoff1 => topoff1,
-    build_indices => topoff1,
-    topoff2 => topoff1,
-    copy_local_docs => topoff1,
-    update_shardmap => update_shardmap,
-    wait_source_close => wait_source_close,
-    topoff3 => wait_source_close,
-    source_delete => wait_source_close,
-    completed => completed
-}).
-
-
-% If a worker fails during any of these states
-% we need to clean up the targets
--define(CLEAN_TARGET_STATES, [
-    initial_copy,
-    topoff1,
-    build_indices,
-    topoff2,
-    copy_local_docs
-]).
-
-
-start_link(#job{} = Job) ->
-    proc_lib:start_link(?MODULE, init, [Job]).
-
-
-% This is called by the main process after it has checkpointed the progress
-% of the job. After the new state is checkpointed, we signal the job to start
-% executing that state.
-checkpoint_done(#job{pid = Pid} = Job) ->
-    couch_log:notice(" ~p : checkpoint done for ~p", [?MODULE, jobfmt(Job)]),
-    Pid ! checkpoint_done,
-    ok.
-
-
-% Formatting function, used mostly for logging
-jobfmt(#job{} = Job) ->
-    #job{
-        id = Id,
-        source = #shard{name = Source},
-        target = Target,
-        split_state = State,
-        job_state = JobState,
-        pid = Pid
-    } = Job,
-    TargetCount = length(Target),
-    Msg = "#job{~s ~s /~B job_state:~s split_state:~s pid:~p}",
-    Fmt = io_lib:format(Msg, [Id, Source, TargetCount, JobState, State, Pid]),
-    lists:flatten(Fmt).
-
-
-% This is the function which picks between various targets. It is used here as
-% well as in the mem3_rep internal replicator and couch_db_split bulk copy logic.
-% Given a document id, a list of ranges, and a hash function, it will pick one
-% of the ranges or return the not_in_range atom.
-pickfun(DocId, [[B, E] | _] = Ranges, {_M, _F, _A} = HashFun) when
-        is_integer(B), is_integer(E), B =< E ->
-    HashKey = mem3_hash:calculate(HashFun, DocId),
-    Pred = fun([Begin, End]) ->
-        Begin =< HashKey andalso HashKey =< End
-    end,
-    case lists:filter(Pred, Ranges) of
-        [] -> not_in_range;
-        [Key] -> Key
-    end.
-
-
-init(#job{} = Job0) ->
-    process_flag(trap_exit, true),
-    Job1 = set_start_state(Job0#job{
-        pid = self(),
-        start_time = mem3_reshard:now_sec(),
-        workers = [],
-        retries = 0
-    }),
-    Job2 = update_split_history(Job1),
-    proc_lib:init_ack({ok, self()}),
-    couch_log:notice("~p starting job ~s", [?MODULE, jobfmt(Job2)]),
-    ok = checkpoint(Job2),
-    run(Job2).
-
-
-run(#job{split_state = CurrState} = Job) ->
-    StateFun = case CurrState of
-        topoff1 -> topoff;
-        topoff2 -> topoff;
-        topoff3 -> topoff;
-        _ -> CurrState
-    end,
-    NewJob = try
-        Job1 = ?MODULE:StateFun(Job),
-        Job2 = wait_for_workers(Job1),
-        Job3 = switch_to_next_state(Job2),
-        ok = checkpoint(Job3),
-        Job3
-    catch
-        throw:{retry, RetryJob} ->
-            RetryJob
-    end,
-    run(NewJob).
-
-
-set_start_state(#job{split_state = State} = Job) ->
-    case maps:get(State, ?STATE_RESTART, undefined) of
-        undefined ->
-            Fmt1 = "~p recover : unknown state ~s",
-            couch_log:error(Fmt1, [?MODULE, jobfmt(Job)]),
-            erlang:error({invalid_split_job_recover_state, Job});
-        StartState ->
-            Job#job{split_state = StartState}
-    end.
-
-
-get_next_state(#job{split_state = State}) ->
-    get_next_state(State, ?SPLIT_STATES).
-
-
-get_next_state(completed, _) ->
-    completed;
-
-get_next_state(CurrState, [CurrState, NextState | _]) ->
-    NextState;
-
-get_next_state(CurrState, [_ | Rest]) ->
-    get_next_state(CurrState, Rest).
-
-
-switch_to_next_state(#job{} = Job0) ->
-    Info0 = Job0#job.state_info,
-    Info1 = info_delete(error, Info0),
-    Info2 = info_delete(reason, Info1),
-    Job1 = Job0#job{
-        split_state = get_next_state(Job0),
-        update_time = mem3_reshard:now_sec(),
-        retries = 0,
-        state_info = Info2,
-        workers = []
-    },
-    Job2 = update_split_history(Job1),
-    check_state(Job2).
-
-
-checkpoint(Job) ->
-    % Ask main process to checkpoint. When it has finished it will notify us
-    % by calling checkpoint_done/1. The reason not to call the main process
-    % via a gen_server:call is because the main process could be in the middle
-    % of terminating the job and then it would deadlock (after sending us a
-    % shutdown message) and it would end up using the whole supervisor
-    % termination timeout before finally killing the job.
-    ok = mem3_reshard:checkpoint(Job#job.manager, Job),
-    Parent = parent(),
-    receive
-        {'EXIT', Parent, Reason} ->
-            handle_exit(Job, Reason);
-        checkpoint_done ->
-            ok;
-        Other ->
-            handle_unknown_msg(Job, "checkpoint", Other)
-    end.
-
-
-wait_for_workers(#job{workers = []} = Job) ->
-    Job;
-
-wait_for_workers(#job{workers = Workers} = Job) ->
-    Parent = parent(),
-    receive
-        {'EXIT', Parent, Reason} ->
-            handle_exit(Job, Reason);
-        {'EXIT', Pid, Reason} ->
-            case lists:member(Pid, Workers) of
-                true ->
-                    NewJob = handle_worker_exit(Job, Pid, Reason),
-                    wait_for_workers(NewJob);
-                false ->
-                    handle_unknown_msg(Job, "wait_for_workers", {Pid, Reason})
-            end;
-        Other ->
-            handle_unknown_msg(Job, "wait_for_workers", Other)
-    end.
-
-
-handle_worker_exit(#job{workers = Workers} = Job, Pid, normal) ->
-    Job#job{workers = Workers -- [Pid]};
-
-handle_worker_exit(#job{} = Job, _Pid, {error, missing_source}) ->
-    Msg1 = "~p stopping worker due to source missing ~p",
-    couch_log:error(Msg1, [?MODULE, jobfmt(Job)]),
-    kill_workers(Job),
-    case lists:member(Job#job.split_state, ?CLEAN_TARGET_STATES) of
-        true ->
-            Msg2 = "~p cleaning target after db was deleted ~p",
-            couch_log:error(Msg2, [?MODULE, jobfmt(Job)]),
-            reset_target(Job),
-            exit({error, missing_source});
-        false ->
-            exit({error, missing_source})
-    end;
-
-handle_worker_exit(#job{} = Job, _Pid, {error, missing_target}) ->
-    Msg = "~p stopping worker due to target db missing ~p",
-    couch_log:error(Msg, [?MODULE, jobfmt(Job)]),
-    kill_workers(Job),
-    exit({error, missing_target});
-
-handle_worker_exit(#job{} = Job0, _Pid, Reason) ->
-    couch_log:error("~p worker error ~p ~p", [?MODULE, jobfmt(Job0), Reason]),
-    kill_workers(Job0),
-    Job1 = Job0#job{workers = []},
-    case Job1#job.retries =< max_retries() of
-        true ->
-            retry_state(Job1, Reason);
-        false ->
-            exit(Reason)
-    end.
-
-
-% Cleanup and exit when we receive an 'EXIT' message from our parent. In case
-% the shard map is being updated, try to wait some time for it to finish.
-handle_exit(#job{split_state = update_shardmap, workers = [WPid]} = Job,
-        Reason) ->
-    Timeout = update_shard_map_timeout_sec(),
-    Msg1 = "~p job exit ~s ~p while shard map is updating, waiting ~p sec",
-    couch_log:warning(Msg1, [?MODULE, jobfmt(Job), Reason, Timeout]),
-    receive
-        {'EXIT', WPid, normal} ->
-            Msg2 = "~p ~s shard map finished updating successfully, exiting",
-            couch_log:notice(Msg2, [?MODULE, jobfmt(Job)]),
-            exit(Reason);
-        {'EXIT', WPid, Error} ->
-            Msg3 = "~p ~s shard map update failed with error ~p",
-            couch_log:error(Msg3, [?MODULE, jobfmt(Job), Error]),
-            exit(Reason)
-    after Timeout * 1000 ->
-        Msg4 = "~p ~s shard map update timeout exceeded ~p sec",
-        couch_log:error(Msg4, [?MODULE, jobfmt(Job), Timeout]),
-        kill_workers(Job),
-        exit(Reason)
-    end;
-
-handle_exit(#job{} = Job, Reason) ->
-    kill_workers(Job),
-    exit(Reason).
-
-
-retry_state(#job{retries = Retries, state_info = Info} = Job0, Error) ->
-    Job1 = Job0#job{
-        retries = Retries + 1,
-        state_info = info_update(error, Error, Info)
-    },
-    couch_log:notice("~p retrying ~p ~p", [?MODULE, jobfmt(Job1), Retries]),
-    Job2 = report(Job1),
-    Timeout = retry_interval_sec(),
-    Parent = parent(),
-    receive
-        {'EXIT', Parent, Reason} ->
-            handle_exit(Job2, Reason);
-        Other ->
-            handle_unknown_msg(Job2, "retry_state", Other)
-    after Timeout * 1000 ->
-        ok
-    end,
-    throw({retry, Job2}).
-
-
-report(#job{manager = ManagerPid} = Job) ->
-    Job1 = Job#job{update_time = mem3_reshard:now_sec()},
-    ok = mem3_reshard:report(ManagerPid, Job1),
-    Job1.
-
-
-kill_workers(#job{workers = Workers}) ->
-    lists:foreach(fun(Worker) ->
-        unlink(Worker),
-        exit(Worker, kill)
-    end, Workers),
-    flush_worker_messages().
-
-
-flush_worker_messages() ->
-    Parent = parent(),
-    receive
-        {'EXIT', Pid, _} when Pid =/= Parent ->
-            flush_worker_messages()
-    after 0 ->
-        ok
-    end.
-
-
-parent() ->
-    case get('$ancestors') of
-        [Pid | _] when is_pid(Pid) -> Pid;
-        [Name | _] when is_atom(Name) -> whereis(Name);
-        _ -> undefined
-    end.
-
-
-handle_unknown_msg(Job, When, RMsg) ->
-    LogMsg = "~p ~s received an unknown message ~p when in ~s",
-    couch_log:error(LogMsg, [?MODULE, jobfmt(Job), RMsg, When]),
-    erlang:error({invalid_split_job_message, Job#job.id, When, RMsg}).
-
-
-initial_copy(#job{} = Job) ->
-    Pid = spawn_link(?MODULE, initial_copy_impl, [Job]),
-    report(Job#job{workers = [Pid]}).
-
-
-initial_copy_impl(#job{source = Source, target = Targets0} = Job) ->
-    #shard{name = SourceName} = Source,
-    Targets = [{R, N} || #shard{range = R, name = N} <- Targets0],
-    TMap = maps:from_list(Targets),
-    LogMsg1 = "~p initial_copy started ~s",
-    LogArgs1 = [?MODULE, shardsstr(Source, Targets0)],
-    couch_log:notice(LogMsg1, LogArgs1),
-    reset_target(Job),
-    case couch_db_split:split(SourceName, TMap, fun pickfun/3) of
-        {ok, Seq} ->
-            LogMsg2 = "~p initial_copy of ~s finished @ seq:~p",
-            LogArgs2 = [?MODULE, shardsstr(Source, Targets0), Seq],
-            couch_log:notice(LogMsg2, LogArgs2),
-            create_artificial_mem3_rep_checkpoints(Job, Seq);
-        {error, Error} ->
-            LogMsg3 = "~p initial_copy of ~s failed @ ~p",
-            LogArgs3 = [?MODULE, shardsstr(Source, Targets0), Error],
-            couch_log:error(LogMsg3, LogArgs3),
-            exit({error, Error})
-    end.
-
-
-topoff(#job{} = Job) ->
-    Pid = spawn_link(?MODULE, topoff_impl, [Job]),
-    report(Job#job{workers = [Pid]}).
-
-
-topoff_impl(#job{source = #shard{} = Source, target = Targets}) ->
-    couch_log:notice("~p topoff ~p", [?MODULE, shardsstr(Source, Targets)]),
-    check_source_exists(Source, topoff),
-    check_targets_exist(Targets, topoff),
-    TMap = maps:from_list([{R, T} || #shard{range = R} = T <- Targets]),
-    Opts = [{batch_size, ?INTERNAL_REP_BATCH_SIZE}, {batch_count, all}],
-    case mem3_rep:go(Source, TMap, Opts) of
-        {ok, Count} ->
-            Args = [?MODULE, shardsstr(Source, Targets), Count],
-            couch_log:notice("~p topoff done ~s, count: ~p", Args),
-            ok;
-        {error, Error} ->
-            Args = [?MODULE, shardsstr(Source, Targets), Error],
-            couch_log:error("~p topoff failed ~s, error: ~p", Args),
-            exit({error, Error})
-    end.
-
-
-build_indices(#job{} = Job) ->
-    #job{
-        source = #shard{name = SourceName} = Source,
-        target = Targets,
-        retries = Retries,
-        state_info = Info
-    } = Job,
-    check_source_exists(Source, build_indices),
-    {ok, DDocs} = mem3_reshard_index:design_docs(SourceName),
-    Indices = mem3_reshard_index:target_indices(DDocs, Targets),
-    case mem3_reshard_index:spawn_builders(Indices) of
-        {ok, []} ->
-            % Skip the log spam if this is a no-op
-            Job#job{workers = []};
-        {ok, Pids} ->
-            report(Job#job{workers = Pids});
-        {error, Error} ->
-            case Job#job.retries =< max_retries() of
-                true ->
-                    build_indices(Job#job{
-                        retries = Retries + 1,
-                        state_info = info_update(error, Error, Info)
-                    });
-                false ->
-                    exit(Error)
-            end
-    end.
-
-
-copy_local_docs(#job{split_state = copy_local_docs} = Job) ->
-    Pid = spawn_link(?MODULE, copy_local_docs_impl, [Job]),
-    report(Job#job{workers = [Pid]}).
-
-
-copy_local_docs_impl(#job{source = Source, target = Targets0}) ->
-    #shard{name = SourceName} = Source,
-    Targets = [{R, N} || #shard{range = R, name = N} <- Targets0],
-    TMap = maps:from_list(Targets),
-    LogArg1 = [?MODULE, shardsstr(Source, Targets)],
-    couch_log:notice("~p copy local docs start ~s", LogArg1),
-    case couch_db_split:copy_local_docs(SourceName, TMap, fun pickfun/3) of
-        ok ->
-            couch_log:notice("~p copy local docs finished for ~s", LogArg1),
-            ok;
-        {error, Error} ->
-            LogArg2 = [?MODULE, shardsstr(Source, Targets), Error],
-            couch_log:error("~p copy local docs failed for ~s ~p", LogArg2),
-            exit({error, Error})
-    end.
-
-
-update_shardmap(#job{} = Job) ->
-    Pid = spawn_link(mem3_reshard_dbdoc, update_shard_map, [Job]),
-    report(Job#job{workers = [Pid]}).
-
-
-wait_source_close(#job{source = #shard{name = Name}} = Job) ->
-    couch_event:notify(Name, deleted),
-    Pid = spawn_link(?MODULE, wait_source_close_impl, [Job]),
-    report(Job#job{workers = [Pid]}).
-
-
-wait_source_close_impl(#job{source = #shard{name = Name}, target = Targets}) ->
-    Timeout = config:get_integer("reshard", "source_close_timeout_sec", 600),
-    check_targets_exist(Targets, wait_source_close),
-    case couch_db:open_int(Name, [?ADMIN_CTX]) of
-        {ok, Db} ->
-            Now = mem3_reshard:now_sec(),
-            case wait_source_close(Db, 1, Now + Timeout) of
-                true ->
-                    ok;
-                false ->
-                    exit({error, source_db_close_timeout, Name, Timeout})
-            end;
-        {not_found, _} ->
-            couch_log:warning("~p source already deleted ~p", [?MODULE, Name]),
-            ok
-    end.
-
-
-wait_source_close(Db, SleepSec, UntilSec) ->
-    case couch_db:monitored_by(Db) -- [self()] of
-        [] ->
-            true;
-        [_ | _] ->
-            Now = mem3_reshard:now_sec(),
-            case Now < UntilSec of
-                true ->
-                    LogMsg = "~p : Waiting for source shard ~p to be closed",
-                    couch_log:notice(LogMsg, [?MODULE, couch_db:name(Db)]),
-                    timer:sleep(SleepSec * 1000),
-                    wait_source_close(Db, SleepSec, UntilSec);
-                false ->
-                    false
-            end
-    end.
-
-
-source_delete(#job{} = Job) ->
-    Pid = spawn_link(?MODULE, source_delete_impl, [Job]),
-    report(Job#job{workers = [Pid]}).
-
-
-source_delete_impl(#job{source = #shard{name = Name}, target = Targets}) ->
-    check_targets_exist(Targets, source_delete),
-    case config:get_boolean("mem3_reshard", "delete_source", true) of
-        true ->
-            case couch_server:delete(Name, [?ADMIN_CTX]) of
-                ok ->
-                    couch_log:notice("~p : deleted source shard ~p",
-                        [?MODULE, Name]);
-                not_found ->
-                    couch_log:warning("~p : source was already deleted ~p",
-                        [?MODULE, Name])
-            end;
-        false ->
-            % Emit a deleted event even when not actually deleting the files.
-            % This is the second one emitted; the first was sent before
-            % wait_source_close. They should be idempotent. This one is just
-            % to match the event couch_server would have emitted had the
-            % config option not been set.
-            couch_event:notify(Name, deleted),
-            LogMsg = "~p : according to configuration not deleting source ~p",
-            couch_log:warning(LogMsg, [?MODULE, Name])
-    end,
-    TNames = [TName || #shard{name = TName} <- Targets],
-    lists:foreach(fun(TName) -> couch_event:notify(TName, updated) end, TNames).
-
-
-completed(#job{} = Job) ->
-    couch_log:notice("~p : ~p completed, exit normal", [?MODULE, jobfmt(Job)]),
-    exit(normal).
-
-
-% This is for belt and suspenders, really. Called periodically to validate
-% that the state is one of the expected states.
--spec check_state(#job{}) -> #job{} | no_return().
-check_state(#job{split_state = State} = Job) ->
-    case lists:member(State, ?SPLIT_STATES) of
-        true ->
-            Job;
-        false ->
-            erlang:error({invalid_shard_split_state, State, Job})
-    end.
-
-
-create_artificial_mem3_rep_checkpoints(#job{} = Job, Seq) ->
-    #job{source = Source = #shard{name = SourceName}, target = Targets} = Job,
-    check_source_exists(Source, initial_copy),
-    TNames = [TN || #shard{name = TN} <- Targets],
-    Timestamp = list_to_binary(mem3_util:iso8601_timestamp()),
-    couch_util:with_db(SourceName, fun(SDb) ->
-        [couch_util:with_db(TName, fun(TDb) ->
-            Doc = mem3_rep_checkpoint_doc(SDb, TDb, Timestamp, Seq),
-            {ok, _} = couch_db:update_doc(SDb, Doc, []),
-            {ok, _} = couch_db:update_doc(TDb, Doc, []),
-            ok
-        end) || TName <- TNames]
-    end),
-    ok.
-
-
-mem3_rep_checkpoint_doc(SourceDb, TargetDb, Timestamp, Seq) ->
-    Node = atom_to_binary(node(), utf8),
-    SourceUUID = couch_db:get_uuid(SourceDb),
-    TargetUUID = couch_db:get_uuid(TargetDb),
-    History = {[
-        {<<"source_node">>, Node},
-        {<<"source_uuid">>, SourceUUID},
-        {<<"source_seq">>, Seq},
-        {<<"timestamp">>, Timestamp},
-        {<<"target_node">>, Node},
-        {<<"target_uuid">>, TargetUUID},
-        {<<"target_seq">>, Seq}
-    ]},
-    Body = {[
-        {<<"seq">>, Seq},
-        {<<"target_uuid">>, TargetUUID},
-        {<<"history">>, {[{Node, [History]}]}}
-    ]},
-    Id = mem3_rep:make_local_id(SourceUUID, TargetUUID),
-    #doc{id = Id, body = Body}.
-
-
-check_source_exists(#shard{name = Name}, StateName) ->
-    case couch_server:exists(Name) of
-        true ->
-            ok;
-        false ->
-            ErrMsg = "~p source ~p is unexpectedly missing in ~p",
-            couch_log:error(ErrMsg, [?MODULE, Name, StateName]),
-            exit({error, missing_source})
-    end.
-
-
-check_targets_exist(Targets, StateName) ->
-    lists:foreach(fun(#shard{name = Name}) ->
-        case couch_server:exists(Name) of
-            true ->
-                ok;
-            false ->
-                ErrMsg = "~p target ~p is unexpectedly missing in ~p",
-                couch_log:error(ErrMsg, [?MODULE, Name, StateName]),
-                exit({error, missing_target})
-        end
-    end, Targets).
-
-
--spec max_retries() -> integer().
-max_retries() ->
-    config:get_integer("reshard", "max_retries", 1).
-
-
--spec retry_interval_sec() -> integer().
-retry_interval_sec() ->
-    config:get_integer("reshard", "retry_interval_sec", 10).
-
-
--spec update_shard_map_timeout_sec() -> integer().
-update_shard_map_timeout_sec() ->
-    config:get_integer("reshard", "update_shardmap_timeout_sec", 60).
-
-
--spec info_update(atom(), any(), [tuple()]) -> [tuple()].
-info_update(Key, Val, StateInfo) ->
-    lists:keystore(Key, 1, StateInfo, {Key, Val}).
-
-
--spec info_delete(atom(), [tuple()]) -> [tuple()].
-info_delete(Key, StateInfo) ->
-    lists:keydelete(Key, 1, StateInfo).
-
-
--spec shardsstr(#shard{}, #shard{} | [#shard{}]) -> string().
-shardsstr(#shard{name = SourceName}, #shard{name = TargetName}) ->
-    lists:flatten(io_lib:format("~s -> ~s", [SourceName, TargetName]));
-
-shardsstr(#shard{name = SourceName}, Targets) ->
-    TNames = [TN || #shard{name = TN} <- Targets],
-    TargetsStr = string:join([binary_to_list(T) || T <- TNames], ","),
-    lists:flatten(io_lib:format("~s -> ~s", [SourceName, TargetsStr])).
-
-
--spec reset_target(#job{}) -> #job{}.
-reset_target(#job{source = Source, target = Targets} = Job) ->
-    ShardNames = try
-        [N || #shard{name = N} <- mem3:local_shards(mem3:dbname(Source))]
-    catch
-        error:database_does_not_exist ->
-            []
-    end,
-    lists:map(fun(#shard{name = Name}) ->
-        case {couch_server:exists(Name), lists:member(Name, ShardNames)} of
-            {_, true} ->
-                % Should never get here, but if we do, crash and don't continue
-                LogMsg = "~p : ~p target unexpectedly found in shard map ~p",
-                couch_log:error(LogMsg, [?MODULE, jobfmt(Job), Name]),
-                erlang:error({target_present_in_shard_map, Name});
-            {true, false} ->
-                LogMsg = "~p : ~p resetting ~p target",
-                couch_log:warning(LogMsg, [?MODULE, jobfmt(Job), Name]),
-                couch_db_split:cleanup_target(Source#shard.name, Name);
-            {false, false} ->
-                ok
-        end
-    end, Targets),
-    Job.
-
-
--spec update_split_history(#job{}) -> #job{}.
-update_split_history(#job{split_state = St, update_time = Ts} = Job) ->
-    Hist = Job#job.history,
-    JobSt = case St of
-        completed -> completed;
-        failed -> failed;
-        new -> new;
-        stopped -> stopped;
-        _ -> running
-    end,
-    Job#job{history = mem3_reshard:update_history(JobSt, St, Ts, Hist)}.
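
To illustrate pickfun/3 from the removed module: given a doc id, a list of [Begin, End] ranges, and an {M, F, A} hash spec, it returns the range the hashed id falls into, or not_in_range. The snippet below is a hedged sketch of calling it during a hypothetical two-way split; the two ranges and the {mem3_hash, crc32, []} hash spec are illustrative assumptions, not values taken from this diff.

    % Sketch: route a doc id to one of two target ranges (assumed values).
    Ranges = [[16#00000000, 16#7fffffff], [16#80000000, 16#ffffffff]],
    case mem3_reshard_job:pickfun(<<"doc-1">>, Ranges, {mem3_hash, crc32, []}) of
        [B, E] ->
            % The doc belongs on the target shard covering [B, E]
            io:format("doc-1 -> [~.16B, ~.16B]~n", [B, E]);
        not_in_range ->
            % The hash fell outside every supplied range
            io:format("doc-1 is not covered by the given ranges~n")
    end.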
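The two state tables (?SPLIT_STATES for forward progress, ?STATE_RESTART for resumption) can be exercised in isolation. The standalone module below is a minimal sketch for experimentation, mirroring get_next_state/2 and the restart map from the file above; the module name is made up and it is not part of CouchDB.

    -module(split_states_demo). % hypothetical demo module, not part of CouchDB
    -export([next/1, restart/1]).

    -define(SPLIT_STATES, [new, initial_copy, topoff1, build_indices, topoff2,
        copy_local_docs, update_shardmap, wait_source_close, topoff3,
        source_delete, completed]).

    % Same walk as get_next_state/2 above: completed is terminal, otherwise
    % return the list element that follows the current state.
    next(completed) -> completed;
    next(State) -> next(State, ?SPLIT_STATES).

    next(State, [State, Next | _]) -> Next;
    next(State, [_ | Rest]) -> next(State, Rest).

    % Mirrors ?STATE_RESTART: an interrupted job resumes from the most
    % recent state that is safe to repeat.
    restart(new) -> initial_copy;
    restart(build_indices) -> topoff1;
    restart(topoff2) -> topoff1;
    restart(copy_local_docs) -> topoff1;
    restart(topoff3) -> wait_source_close;
    restart(source_delete) -> wait_source_close;
    restart(State) -> State.

For example, split_states_demo:next(topoff1) returns build_indices, and split_states_demo:restart(copy_local_docs) returns topoff1, i.e. everything copied after the first topoff is redone on restart.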
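The checkpoint/1 and checkpoint_done/1 pair implement a plain-message handshake rather than a gen_server:call, precisely to avoid deadlocking with a terminating manager. The shell snippet below is a minimal sketch of that protocol with a stand-in manager process; the {checkpoint, Pid} message shape is an assumption for illustration (the real manager is mem3_reshard and its protocol is internal).

    % Sketch: job asks to checkpoint, blocks until the manager confirms.
    Manager = self(),
    JobPid = spawn_link(fun() ->
        Manager ! {checkpoint, self()},      % ask the manager to persist state
        receive checkpoint_done -> ok end,   % block until it confirms
        io:format("checkpoint confirmed, executing next state~n")
    end),
    receive {checkpoint, JobPid} -> JobPid ! checkpoint_done end.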
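For reference, these are the tunables the module reads, collected as a local.ini sketch with the defaults from the code above. Note the asymmetry visible in the source: delete_source is read from the [mem3_reshard] section, while the other settings live under [reshard].

    [reshard]
    max_retries = 1
    retry_interval_sec = 10
    source_close_timeout_sec = 600
    update_shardmap_timeout_sec = 60

    [mem3_reshard]
    delete_source = true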