Diffstat (limited to 'src/couch_replicator/src/couch_replicator_scheduler.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_scheduler.erl | 700 |
1 file changed, 314 insertions, 386 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index d3b5b71a4..f544865af 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -20,28 +20,28 @@
 ]).
 
 -export([
-    init/1,
-    terminate/2,
-    handle_call/3,
-    handle_info/2,
-    handle_cast/2,
-    code_change/3,
-    format_status/2
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_info/2,
+    handle_cast/2,
+    code_change/3,
+    format_status/2
 ]).
 
 -export([
-    add_job/1,
-    remove_job/1,
-    reschedule/0,
-    rep_state/1,
-    find_jobs_by_dbname/1,
-    find_jobs_by_doc/2,
-    job_summary/2,
-    health_threshold/0,
-    jobs/0,
-    job/1,
-    restart_job/1,
-    update_job_stats/2
+    add_job/1,
+    remove_job/1,
+    reschedule/0,
+    rep_state/1,
+    find_jobs_by_dbname/1,
+    find_jobs_by_doc/2,
+    job_summary/2,
+    health_threshold/0,
+    jobs/0,
+    job/1,
+    restart_job/1,
+    update_job_stats/2
 ]).
 
 %% config_listener callbacks
@@ -59,7 +59,6 @@
 -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
-
 %% definitions
 -define(MAX_BACKOFF_EXPONENT, 10).
 -define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).
@@ -72,7 +71,6 @@
 -define(DEFAULT_MAX_HISTORY, 20).
 -define(DEFAULT_SCHEDULER_INTERVAL, 60000).
 
-
 -record(state, {
     interval = ?DEFAULT_SCHEDULER_INTERVAL,
     timer,
@@ -88,14 +86,12 @@
     crashed_n = 0 :: non_neg_integer()
 }).
 
-
 %% public functions
 
 -spec start_link() -> {ok, pid()} | ignore | {error, term()}.
 start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-
 -spec add_job(#rep{}) -> ok.
 add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
     case existing_replication(Rep) of
@@ -110,47 +106,44 @@ add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
         ok
     end.
 
-
 -spec remove_job(job_id()) -> ok.
 remove_job(Id) ->
     gen_server:call(?MODULE, {remove_job, Id}, infinity).
 
-
 -spec reschedule() -> ok.
 % Trigger a manual reschedule. Used for testing and/or ops.
 reschedule() ->
     gen_server:call(?MODULE, reschedule, infinity).
 
-
 -spec rep_state(rep_id()) -> #rep{} | nil.
 rep_state(RepId) ->
     case (catch ets:lookup_element(?MODULE, RepId, #job.rep)) of
-        {'EXIT',{badarg, _}} ->
+        {'EXIT', {badarg, _}} ->
             nil;
         Rep ->
             Rep
     end.
 
-
 -spec job_summary(job_id(), non_neg_integer()) -> [_] | nil.
 job_summary(JobId, HealthThreshold) ->
     case job_by_id(JobId) of
         {ok, #job{pid = Pid, history = History, rep = Rep}} ->
             ErrorCount = consecutive_crashes(History, HealthThreshold),
-            {State, Info} = case {Pid, ErrorCount} of
-                {undefined, 0} ->
-                    case History of
-                        [{{crashed, Error}, _When} | _] ->
-                            {crashing, crash_reason_json(Error)};
-                        [_ | _] ->
-                            {pending, Rep#rep.stats}
-                    end;
-                {undefined, ErrorCount} when ErrorCount > 0 ->
-                    [{{crashed, Error}, _When} | _] = History,
-                    {crashing, crash_reason_json(Error)};
-                {Pid, ErrorCount} when is_pid(Pid) ->
-                    {running, Rep#rep.stats}
-            end,
+            {State, Info} =
+                case {Pid, ErrorCount} of
+                    {undefined, 0} ->
+                        case History of
+                            [{{crashed, Error}, _When} | _] ->
+                                {crashing, crash_reason_json(Error)};
+                            [_ | _] ->
+                                {pending, Rep#rep.stats}
+                        end;
+                    {undefined, ErrorCount} when ErrorCount > 0 ->
+                        [{{crashed, Error}, _When} | _] = History,
+                        {crashing, crash_reason_json(Error)};
+                    {Pid, ErrorCount} when is_pid(Pid) ->
+                        {running, Rep#rep.stats}
+                end,
             [
                 {source, iolist_to_binary(ejson_url(Rep#rep.source))},
                 {target, iolist_to_binary(ejson_url(Rep#rep.target))},
@@ -158,22 +151,20 @@ job_summary(JobId, HealthThreshold) ->
                 {info, couch_replicator_utils:ejson_state_info(Info)},
                 {error_count, ErrorCount},
                 {last_updated, last_updated(History)},
-                {start_time,
-                    couch_replicator_utils:iso8601(Rep#rep.start_time)},
+                {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)},
                 {source_proxy, job_proxy_url(Rep#rep.source)},
                 {target_proxy, job_proxy_url(Rep#rep.target)}
             ];
         {error, not_found} ->
-            nil % Job might have just completed
+            % Job might have just completed
+            nil
     end.
 
-
 job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
     list_to_binary(couch_util:url_strip_password(ProxyUrl));
 job_proxy_url(_Endpoint) ->
     null.
 
-
 % Health threshold is the minimum amount of time an unhealthy job should run
 % crashing before it is considered to be healthy again. HealthThreshold should
 % not be 0 as jobs could start and immediately crash, and it shouldn't be
@@ -181,9 +172,11 @@ job_proxy_url(_Endpoint) ->
 % job is back to normal.
 -spec health_threshold() -> non_neg_integer().
 health_threshold() ->
-    config:get_integer("replicator", "health_threshold",
-        ?DEFAULT_HEALTH_THRESHOLD_SEC).
-
+    config:get_integer(
+        "replicator",
+        "health_threshold",
+        ?DEFAULT_HEALTH_THRESHOLD_SEC
+    ).
 
 -spec find_jobs_by_dbname(binary()) -> list(#rep{}).
 find_jobs_by_dbname(DbName) ->
@@ -191,14 +184,12 @@ find_jobs_by_dbname(DbName) ->
     MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
     [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
 
-
 -spec find_jobs_by_doc(binary(), binary()) -> list(#rep{}).
 find_jobs_by_doc(DbName, DocId) ->
-    Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'},
+    Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'},
     MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
     [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
 
-
 -spec restart_job(binary() | list() | rep_id()) ->
     {ok, {[_]}} | {error, not_found}.
 restart_job(JobId) ->
@@ -211,28 +202,39 @@ restart_job(JobId) ->
             job(JobId)
     end.
 
-
 -spec update_job_stats(job_id(), term()) -> ok.
 update_job_stats(JobId, Stats) ->
     gen_server:cast(?MODULE, {update_job_stats, JobId, Stats}).
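An orientation note on the public API above (editorial, not part of the commit): health_threshold/0 is expressed in seconds, and job_summary/2 feeds it to consecutive_crashes/2 to derive a job's error_count and its crashing/pending/running state. A hedged remsh sketch; the replication id tuple and the output values below are made up, only the function names come from the export list at the top of this diff:

    %% Illustrative only; id and output are hypothetical and abridged.
    1> T = couch_replicator_scheduler:health_threshold().
    2> couch_replicator_scheduler:job_summary({"a81a78e8", ""}, T).
    [{source, <<"http://s.example/db/">>},
     {target, <<"http://t.example/db/">>},
     {state, running},
     {error_count, 0},
     ...]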
 
-
 %% gen_server functions
 
 init(_) ->
     config:enable_feature('scheduler'),
-    EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true},
-        {write_concurrency, true}],
+    EtsOpts = [
+        named_table,
+        {keypos, #job.id},
+        {read_concurrency, true},
+        {write_concurrency, true}
+    ],
     ?MODULE = ets:new(?MODULE, EtsOpts),
     ok = couch_replicator_share:init(),
     ok = config:listen_for_changes(?MODULE, nil),
-    Interval = config:get_integer("replicator", "interval",
-        ?DEFAULT_SCHEDULER_INTERVAL),
+    Interval = config:get_integer(
+        "replicator",
+        "interval",
+        ?DEFAULT_SCHEDULER_INTERVAL
+    ),
     MaxJobs = config:get_integer("replicator", "max_jobs", ?DEFAULT_MAX_JOBS),
-    MaxChurn = config:get_integer("replicator", "max_churn",
-        ?DEFAULT_MAX_CHURN),
-    MaxHistory = config:get_integer("replicator", "max_history",
-        ?DEFAULT_MAX_HISTORY),
+    MaxChurn = config:get_integer(
+        "replicator",
+        "max_churn",
+        ?DEFAULT_MAX_CHURN
+    ),
+    MaxHistory = config:get_integer(
+        "replicator",
+        "max_history",
+        ?DEFAULT_MAX_HISTORY
+    ),
     Timer = erlang:send_after(Interval, self(), reschedule),
     State = #state{
         interval = Interval,
@@ -244,7 +246,6 @@ init(_) ->
     },
     {ok, State}.
 
-
 handle_call({add_job, Job}, _From, State) ->
     ok = maybe_remove_job_int(Job#job.id, State),
     true = add_job_int(Job),
@@ -253,50 +254,51 @@ handle_call({add_job, Job}, _From, State) ->
     TotalJobs = ets:info(?MODULE, size),
     couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs),
     {reply, ok, State};
-
 handle_call({remove_job, Id}, _From, State) ->
     ok = maybe_remove_job_int(Id, State),
     {reply, ok, State};
-
 handle_call(reschedule, _From, State) ->
     ok = reschedule(State),
     {reply, ok, State};
-
 handle_call(_, _From, State) ->
     {noreply, State}.
 
-
-handle_cast({set_max_jobs, MaxJobs}, State) when is_integer(MaxJobs),
-    MaxJobs >= 0 ->
+handle_cast({set_max_jobs, MaxJobs}, State) when
+    is_integer(MaxJobs),
+    MaxJobs >= 0
+->
     couch_log:notice("~p: max_jobs set to ~B", [?MODULE, MaxJobs]),
     {noreply, State#state{max_jobs = MaxJobs}};
-
-handle_cast({set_max_churn, MaxChurn}, State) when is_integer(MaxChurn),
-    MaxChurn > 0 ->
+handle_cast({set_max_churn, MaxChurn}, State) when
+    is_integer(MaxChurn),
+    MaxChurn > 0
+->
    couch_log:notice("~p: max_churn set to ~B", [?MODULE, MaxChurn]),
    {noreply, State#state{max_churn = MaxChurn}};
-
-handle_cast({set_max_history, MaxHistory}, State) when is_integer(MaxHistory),
-    MaxHistory > 0 ->
+handle_cast({set_max_history, MaxHistory}, State) when
+    is_integer(MaxHistory),
+    MaxHistory > 0
+->
     couch_log:notice("~p: max_history set to ~B", [?MODULE, MaxHistory]),
     {noreply, State#state{max_history = MaxHistory}};
-
-handle_cast({set_interval, Interval}, State) when is_integer(Interval),
-    Interval > 0 ->
+handle_cast({set_interval, Interval}, State) when
+    is_integer(Interval),
+    Interval > 0
+->
     couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
     {noreply, State#state{interval = Interval}};
-
-handle_cast({update_shares, Key, Shares}, State) when is_binary(Key),
-    is_integer(Shares), Shares >= 0 ->
+handle_cast({update_shares, Key, Shares}, State) when
+    is_binary(Key),
+    is_integer(Shares),
+    Shares >= 0
+->
     couch_log:notice("~p: shares for ~s set to ~B", [?MODULE, Key, Shares]),
     couch_replicator_share:update_shares(Key, Shares),
     {noreply, State};
-
-handle_cast({reset_shares, Key}, State) when is_binary(Key) ->
+handle_cast({reset_shares, Key}, State) when is_binary(Key) ->
     couch_log:notice("~p: shares for ~s reset to default", [?MODULE, Key]),
     couch_replicator_share:reset_shares(Key),
     {noreply, State};
-
 handle_cast({update_job_stats, JobId, Stats}, State) ->
     case rep_state(JobId) of
         nil ->
@@ -306,18 +308,15 @@ handle_cast({update_job_stats, JobId, Stats}, State) ->
             true = ets:update_element(?MODULE, JobId, {#job.rep, NewRep})
     end,
     {noreply, State};
-
 handle_cast(UnexpectedMsg, State) ->
     couch_log:error("~p: received unexpected cast ~p", [?MODULE, UnexpectedMsg]),
     {noreply, State}.
 
-
 handle_info(reschedule, State) ->
     ok = reschedule(State),
     erlang:cancel_timer(State#state.timer),
     Timer = erlang:send_after(State#state.interval, self(), reschedule),
     {noreply, State#state{timer = Timer}};
-
 handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
     {ok, Job} = job_by_pid(Pid),
     couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
@@ -326,82 +325,66 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
     remove_job_int(Job),
     update_running_jobs_stats(State#state.stats_pid),
     {noreply, State};
-
 handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
     {ok, Job} = job_by_pid(Pid),
-    Reason = case Reason0 of
-        {shutdown, ShutdownReason} -> ShutdownReason;
-        Other -> Other
-    end,
+    Reason =
+        case Reason0 of
+            {shutdown, ShutdownReason} -> ShutdownReason;
+            Other -> Other
+        end,
     Interval = State#state.interval,
     couch_replicator_share:charge(Job, Interval, os:timestamp()),
     ok = handle_crashed_job(Job, Reason, State),
     {noreply, State};
-
 handle_info(restart_config_listener, State) ->
     ok = config:listen_for_changes(?MODULE, nil),
     {noreply, State};
-
 handle_info(_, State) ->
     {noreply, State}.
 
-
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-
 terminate(_Reason, _State) ->
     couch_replicator_share:clear(),
     ok.
 
-
 format_status(_Opt, [_PDict, State]) ->
     [
-        {max_jobs, State#state.max_jobs},
-        {running_jobs, running_job_count()},
-        {pending_jobs, pending_job_count()}
+        {max_jobs, State#state.max_jobs},
+        {running_jobs, running_job_count()},
+        {pending_jobs, pending_job_count()}
     ].
 
-
 %% config listener functions
 
 handle_config_change("replicator", "max_jobs", V, _, S) ->
     ok = gen_server:cast(?MODULE, {set_max_jobs, list_to_integer(V)}),
     {ok, S};
-
 handle_config_change("replicator", "max_churn", V, _, S) ->
     ok = gen_server:cast(?MODULE, {set_max_churn, list_to_integer(V)}),
     {ok, S};
-
 handle_config_change("replicator", "interval", V, _, S) ->
     ok = gen_server:cast(?MODULE, {set_interval, list_to_integer(V)}),
     {ok, S};
-
 handle_config_change("replicator", "max_history", V, _, S) ->
     ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
     {ok, S};
-
 handle_config_change("replicator.shares", Key, deleted, _, S) ->
     ok = gen_server:cast(?MODULE, {reset_shares, list_to_binary(Key)}),
     {ok, S};
-
 handle_config_change("replicator.shares", Key, V, _, S) ->
-    ok = gen_server:cast(?MODULE, {update_shares, list_to_binary(Key),
-        list_to_integer(V)}),
+    ok = gen_server:cast(?MODULE, {update_shares, list_to_binary(Key), list_to_integer(V)}),
     {ok, S};
-
 handle_config_change(_, _, _, _, S) ->
     {ok, S}.
 
-
 handle_config_terminate(_, stop, _) ->
     ok;
-
 handle_config_terminate(_, _, _) ->
     Pid = whereis(?MODULE),
     erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
 
-
 %% Private functions
 
 % Handle crashed jobs. Handling differs between transient and permanent jobs.
@@ -420,7 +403,6 @@ handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, State) ->
     remove_job_int(Job),
     update_running_jobs_stats(State#state.stats_pid),
     ok;
-
 handle_crashed_job(Job, Reason, State) ->
     ok = update_state_crashed(Job, Reason, State),
     case couch_replicator_doc_processor:update_docs() of
@@ -442,7 +424,6 @@ handle_crashed_job(Job, Reason, State) ->
         ok
     end.
 
-
 % Attempt to start a newly added job. First quickly check if total jobs
 % already exceed max jobs, then do a more expensive check which runs a
 % select (an O(n) operation) to check pending jobs specifically.
@@ -459,7 +440,6 @@ maybe_start_newly_added_job(Job, State) ->
         ok
     end.
 
-
 % Return up to a given number of oldest, not recently crashed jobs. Try to be
 % memory efficient and use ets:foldl to accumulate jobs.
 -spec pending_jobs(non_neg_integer()) -> [#job{}].
@@ -468,36 +448,34 @@ pending_jobs(0) ->
     % other function clause it will crash as gb_sets:largest assumes set is not
     % empty.
     [];
-
 pending_jobs(Count) when is_integer(Count), Count > 0 ->
-    Set0 = gb_sets:new(), % [{{Priority, LastStart}, Job},...]
+    % [{{Priority, LastStart}, Job},...]
+    Set0 = gb_sets:new(),
     Now = os:timestamp(),
     Acc0 = {Set0, Now, Count, health_threshold()},
     {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE),
     [Job || {_PriorityKey, Job} <- gb_sets:to_list(Set1)].
 
-
 pending_fold(#job{pid = Pid}, Acc) when is_pid(Pid) ->
     Acc;
-
 pending_fold(Job, {Set, Now, Count, HealthThreshold}) ->
     Healthy = not_recently_crashed(Job, Now, HealthThreshold),
-    Set1 = case {Healthy, gb_sets:size(Set) >= Count} of
-        {true, true} ->
-            % Job is healthy but already reached accumulated limit, so might
-            % have to replace one of the accumulated jobs
-            pending_maybe_replace(Job, Set);
-        {true, false} ->
-            % Job is healthy and we haven't reached the limit, so add job
-            % to accumulator
-            gb_sets:add_element({start_priority_key(Job), Job}, Set);
-        {false, _} ->
-            % This job is not healthy (has crashed too recently), so skip it.
-            Set
-    end,
+    Set1 =
+        case {Healthy, gb_sets:size(Set) >= Count} of
+            {true, true} ->
+                % Job is healthy but already reached accumulated limit, so might
+                % have to replace one of the accumulated jobs
+                pending_maybe_replace(Job, Set);
+            {true, false} ->
+                % Job is healthy and we haven't reached the limit, so add job
+                % to accumulator
+                gb_sets:add_element({start_priority_key(Job), Job}, Set);
+            {false, _} ->
+                % This job is not healthy (has crashed too recently), so skip it.
+                Set
        end,
     {Set1, Now, Count, HealthThreshold}.
 
-
 % Replace Job in the accumulator if it has a higher priority (lower priority
 % value) than the lowest priority there. Job priority is indexed by
 % {FairSharePriority, LastStarted} tuples. If the FairSharePriority is the same
@@ -526,16 +504,13 @@ pending_maybe_replace(Job, Set) ->
 start_priority_key(#job{} = Job) ->
     {couch_replicator_share:priority(Job#job.id), last_started(Job)}.
 
-
 start_jobs(Count, State) ->
     [start_job_int(Job, State) || Job <- pending_jobs(Count)],
     ok.
 
-
 -spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer().
 stop_jobs(Count, _, _) when is_integer(Count), Count =< 0 ->
     0;
-
 stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
     Running0 = running_jobs(),
     ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
@@ -544,7 +519,6 @@ stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
     Running3 = lists:sublist(lists:sort(Running2), Count),
     length([stop_job_int(Job, State) || {_SortKey, Job} <- Running3]).
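An editorial sketch of the selection pattern used by pending_jobs/1 and pending_fold/2 above: it is a bounded top-N selection, where gb_sets keeps {Priority, LastStarted} keys ordered so the worst accumulated element can be evicted in O(log n) instead of sorting the whole table. A minimal standalone version under the same idea (hypothetical module and plain comparable keys, not code from this commit):

    -module(topn_sketch).
    -export([smallest/2]).

    %% Keep the N smallest keys from a list, mirroring how pending_fold/2
    %% accumulates {{Priority, LastStarted}, Job} entries. N must be > 0:
    %% gb_sets:take_largest/1 crashes on an empty set, which is the same
    %% reason pending_jobs(0) is special-cased in the hunk above.
    smallest(N, Keys) when is_integer(N), N > 0 ->
        Set = lists:foldl(fun(Key, Acc) -> add(Key, N, Acc) end, gb_sets:new(), Keys),
        gb_sets:to_list(Set).

    add(Key, N, Set) ->
        case gb_sets:size(Set) >= N of
            false ->
                gb_sets:add_element(Key, Set);
            true ->
                %% Compare against the current worst element and replace it
                %% only when the new key sorts lower.
                {Largest, Rest} = gb_sets:take_largest(Set),
                case Key < Largest of
                    true -> gb_sets:add_element(Key, Rest);
                    false -> Set
                end
        end.

For example, topn_sketch:smallest(2, [5, 1, 4, 2]) returns [1, 2].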
 
-
 % Lower priority jobs have higher priority values, so we negate them, that way
 % when sorted, they'll come up first. If priorities are equal, jobs are sorted
 % by the lowest starting times as jobs with lowest start time have been running
@@ -553,7 +527,6 @@ stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
 stop_priority_key(#job{} = Job) ->
     {-couch_replicator_share:priority(Job#job.id), last_started(Job)}.
 
-
 not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
     case History of
         [{added, _When}] ->
@@ -566,7 +539,6 @@ not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
             timer:now_diff(Now, LatestCrashT) >= backoff_micros(CrashCount)
     end.
 
-
 % Count consecutive crashes. A crash happens when there is a `crashed` event
 % within a short period of time (configurable) after any other event. It could
 % be `crashed, started` for jobs crashing quickly after starting, `crashed,
@@ -583,40 +555,39 @@ not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
 consecutive_crashes(History, HealthThreshold) when is_list(History) ->
     consecutive_crashes(History, HealthThreshold, 0).
 
-
 -spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) ->
-    non_neg_integer().
+    non_neg_integer().
 consecutive_crashes([], _HealthThreshold, Count) ->
     Count;
-
-consecutive_crashes([{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest],
-    HealthThreshold, Count) ->
+consecutive_crashes(
+    [{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest],
+    HealthThreshold,
+    Count
+) ->
     case timer:now_diff(CrashT, PrevT) > HealthThreshold * 1000000 of
         true ->
             Count;
         false ->
            consecutive_crashes([PrevEvent | Rest], HealthThreshold, Count + 1)
     end;
-
-consecutive_crashes([{stopped, _}, {started, _} | _], _HealthThreshold,
-    Count) ->
+consecutive_crashes(
+    [{stopped, _}, {started, _} | _],
+    _HealthThreshold,
+    Count
+) ->
     Count;
-
 consecutive_crashes([_ | Rest], HealthThreshold, Count) ->
     consecutive_crashes(Rest, HealthThreshold, Count).
 
-
 -spec latest_crash_timestamp(history()) -> erlang:timestamp().
 latest_crash_timestamp([]) ->
-    {0, 0, 0}; % Used to avoid special-casing "no crash" when doing now_diff
-
+    % Used to avoid special-casing "no crash" when doing now_diff
+    {0, 0, 0};
 latest_crash_timestamp([{{crashed, _Reason}, When} | _]) ->
     When;
-
 latest_crash_timestamp([_Event | Rest]) ->
     latest_crash_timestamp(Rest).
 
-
 -spec backoff_micros(non_neg_integer()) -> non_neg_integer().
 backoff_micros(CrashCount) ->
     % When calculating the backoff interval treat consecutive crash count as the
@@ -626,13 +597,11 @@ backoff_micros(CrashCount) ->
     BackoffExp = erlang:min(CrashCount - 1, ?MAX_BACKOFF_EXPONENT),
     (1 bsl BackoffExp) * ?BACKOFF_INTERVAL_MICROS.
 
-
 -spec add_job_int(#job{}) -> boolean().
 add_job_int(#job{} = Job) ->
     couch_replicator_share:job_added(Job),
     ets:insert_new(?MODULE, Job).
 
-
 -spec maybe_remove_job_int(job_id(), #state{}) -> ok.
 maybe_remove_job_int(JobId, State) ->
     case job_by_id(JobId) of
@@ -644,98 +613,99 @@ maybe_remove_job_int(JobId, State) ->
             true = remove_job_int(Job),
             couch_stats:increment_counter([couch_replicator, jobs, removes]),
             TotalJobs = ets:info(?MODULE, size),
-            couch_stats:update_gauge([couch_replicator, jobs, total],
-                TotalJobs),
+            couch_stats:update_gauge(
+                [couch_replicator, jobs, total],
+                TotalJobs
+            ),
             update_running_jobs_stats(State#state.stats_pid),
             ok;
         {error, not_found} ->
             ok
     end.
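For concreteness on backoff_micros/1 above (an editorial note): with ?BACKOFF_INTERVAL_MICROS at 30 seconds and ?MAX_BACKOFF_EXPONENT at 10 (the defines near the top of this diff), the penalty doubles per consecutive crash and plateaus from the eleventh crash onward. The same arithmetic in a shell:

    %% Mirrors backoff_micros/1; values are in microseconds.
    1> Base = 30 * 1000 * 1000.
    2> [(1 bsl min(N - 1, 10)) * Base || N <- lists:seq(1, 12)].
    %% 30s, 60s, 120s, ... then capped at 1024 * 30s = 30720s (~8.5 hours)
    %% for the 11th and 12th entries, matching backoff_micros_test_ below.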
 
-
 start_job_int(#job{pid = Pid}, _State) when Pid /= undefined ->
     ok;
-
 start_job_int(#job{} = Job0, State) ->
     Job = maybe_optimize_job_for_rate_limiting(Job0),
     case couch_replicator_scheduler_sup:start_child(Job#job.rep) of
         {ok, Child} ->
             Ref = monitor(process, Child),
             ok = update_state_started(Job, Child, Ref, State),
-            couch_log:notice("~p: Job ~p started as ~p",
-                [?MODULE, Job#job.id, Child]);
+            couch_log:notice(
+                "~p: Job ~p started as ~p",
+                [?MODULE, Job#job.id, Child]
+            );
         {error, {already_started, OtherPid}} when node(OtherPid) =:= node() ->
             Ref = monitor(process, OtherPid),
             ok = update_state_started(Job, OtherPid, Ref, State),
-            couch_log:notice("~p: Job ~p already running as ~p. Most likely"
+            couch_log:notice(
+                "~p: Job ~p already running as ~p. Most likely"
                 " because replicator scheduler was restarted",
-                [?MODULE, Job#job.id, OtherPid]);
+                [?MODULE, Job#job.id, OtherPid]
+            );
         {error, {already_started, OtherPid}} when node(OtherPid) =/= node() ->
             CrashMsg = "Duplicate replication running on another node",
-            couch_log:notice("~p: Job ~p already running as ~p. Most likely"
+            couch_log:notice(
+                "~p: Job ~p already running as ~p. Most likely"
                 " because a duplicate replication is running on another node",
-                [?MODULE, Job#job.id, OtherPid]),
+                [?MODULE, Job#job.id, OtherPid]
+            ),
             ok = update_state_crashed(Job, CrashMsg, State);
         {error, Reason} ->
-            couch_log:notice("~p: Job ~p failed to start for reason ~p",
-                [?MODULE, Job, Reason]),
+            couch_log:notice(
+                "~p: Job ~p failed to start for reason ~p",
+                [?MODULE, Job, Reason]
+            ),
             ok = update_state_crashed(Job, Reason, State)
     end.
 
-
 -spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}.
 stop_job_int(#job{pid = undefined}, _State) ->
     ok;
-
 stop_job_int(#job{} = Job, State) ->
     ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid),
     demonitor(Job#job.monitor, [flush]),
     ok = update_state_stopped(Job, State),
-    couch_log:notice("~p: Job ~p stopped as ~p",
-        [?MODULE, Job#job.id, Job#job.pid]).
-
+    couch_log:notice(
+        "~p: Job ~p stopped as ~p",
+        [?MODULE, Job#job.id, Job#job.pid]
+    ).
 
 -spec remove_job_int(#job{}) -> true.
 remove_job_int(#job{} = Job) ->
     couch_replicator_share:job_removed(Job),
     ets:delete(?MODULE, Job#job.id).
 
-
 -spec running_job_count() -> non_neg_integer().
 running_job_count() ->
     ets:info(?MODULE, size) - pending_job_count().
 
-
 -spec running_jobs() -> [#job{}].
 running_jobs() ->
-    ets:select(?MODULE, [{#job{pid = '$1', _='_'}, [{is_pid, '$1'}], ['$_']}]).
-
+    ets:select(?MODULE, [{#job{pid = '$1', _ = '_'}, [{is_pid, '$1'}], ['$_']}]).
 
 -spec pending_job_count() -> non_neg_integer().
 pending_job_count() ->
-    ets:select_count(?MODULE, [{#job{pid=undefined, _='_'}, [], [true]}]).
-
+    ets:select_count(?MODULE, [{#job{pid = undefined, _ = '_'}, [], [true]}]).
 
 -spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}.
 job_by_pid(Pid) when is_pid(Pid) ->
-    case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of
+    case ets:match_object(?MODULE, #job{pid = Pid, _ = '_'}) of
         [] ->
             {error, not_found};
-        [#job{}=Job] ->
+        [#job{} = Job] ->
             {ok, Job}
     end.
 
-
 -spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}.
 job_by_id(Id) ->
     case ets:lookup(?MODULE, Id) of
         [] ->
             {error, not_found};
-        [#job{}=Job] ->
+        [#job{} = Job] ->
             {ok, Job}
     end.
 
-
 -spec update_state_stopped(#job{}, #state{}) -> ok.
 update_state_stopped(Job, State) ->
     Job1 = reset_job_process(Job),
@@ -744,7 +714,6 @@ update_state_stopped(Job, State) ->
     couch_stats:increment_counter([couch_replicator, jobs, stops]),
     ok.
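The bare match specs in running_jobs/0 and pending_job_count/0 above are what stdlib's ms_transform would generate from ordinary fun syntax. A hedged equivalent for readability (a sketch; assumes the ms_transform include and the #job{} record are in scope):

    -include_lib("stdlib/include/ms_transform.hrl").

    %% Same as [{#job{pid = '$1', _ = '_'}, [{is_pid, '$1'}], ['$_']}]
    running_ms() ->
        ets:fun2ms(fun(#job{pid = Pid} = Job) when is_pid(Pid) -> Job end).

    %% Same as [{#job{pid = undefined, _ = '_'}, [], [true]}], suitable
    %% for ets:select_count/2, which counts matches without copying rows.
    pending_ms() ->
        ets:fun2ms(fun(#job{pid = undefined}) -> true end).

Using ets:select_count/2 with the second spec is what keeps pending_job_count/0 a single pass over the table with no intermediate result list.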
 
-
 -spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok.
 update_state_started(Job, Pid, Ref, State) ->
     Job1 = set_job_process(Job, Pid, Ref),
@@ -753,7 +722,6 @@ update_state_started(Job, Pid, Ref, State) ->
     couch_stats:increment_counter([couch_replicator, jobs, starts]),
     ok.
 
-
 -spec update_state_crashed(#job{}, any(), #state{}) -> ok.
 update_state_crashed(Job, Reason, State) ->
     Job1 = reset_job_process(Job),
@@ -762,17 +730,14 @@ update_state_crashed(Job, Reason, State) ->
     couch_stats:increment_counter([couch_replicator, jobs, crashes]),
     ok.
 
-
 -spec set_job_process(#job{}, pid(), reference()) -> #job{}.
 set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) ->
     Job#job{pid = Pid, monitor = Ref}.
 
-
 -spec reset_job_process(#job{}) -> #job{}.
 reset_job_process(#job{} = Job) ->
     Job#job{pid = undefined, monitor = undefined}.
 
-
 -spec reschedule(#state{}) -> ok.
 reschedule(#state{interval = Interval} = State) ->
     couch_replicator_share:update(running_jobs(), Interval, os:timestamp()),
@@ -780,50 +745,48 @@ reschedule(#state{interval = Interval} = State) ->
     rotate_jobs(State, StopCount),
     update_running_jobs_stats(State#state.stats_pid).
 
-
 -spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer().
 stop_excess_jobs(State, Running) ->
-    #state{max_jobs=MaxJobs} = State,
+    #state{max_jobs = MaxJobs} = State,
     StopCount = max(0, Running - MaxJobs),
     Stopped = stop_jobs(StopCount, true, State),
     OneshotLeft = StopCount - Stopped,
     stop_jobs(OneshotLeft, false, State),
     StopCount.
 
-
 start_pending_jobs(State) ->
-    #state{max_jobs=MaxJobs} = State,
+    #state{max_jobs = MaxJobs} = State,
     Running = running_job_count(),
     Pending = pending_job_count(),
-    if Running < MaxJobs, Pending > 0 ->
-        start_jobs(MaxJobs - Running, State);
-    true ->
-        ok
+    if
+        Running < MaxJobs, Pending > 0 ->
+            start_jobs(MaxJobs - Running, State);
+        true ->
+            ok
     end.
 
-
 -spec rotate_jobs(#state{}, non_neg_integer()) -> ok.
 rotate_jobs(State, ChurnSoFar) ->
-    #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
+    #state{max_jobs = MaxJobs, max_churn = MaxChurn} = State,
     Running = running_job_count(),
     Pending = pending_job_count(),
     % Reduce MaxChurn by the number of already stopped jobs in the
     % current rescheduling cycle.
     Churn = max(0, MaxChurn - ChurnSoFar),
     SlotsAvailable = MaxJobs - Running,
-    if SlotsAvailable >= 0 ->
-        % If there are enough SlotsAvailable reduce StopCount to avoid
-        % unnecessarily stopping jobs. `stop_jobs/3` ignores 0 or negative
-        % values so we don't worry about that here.
-        StopCount = lists:min([Pending - SlotsAvailable, Running, Churn]),
-        stop_jobs(StopCount, true, State),
-        StartCount = max(0, MaxJobs - running_job_count()),
-        start_jobs(StartCount, State);
-    true ->
-        ok
+    if
+        SlotsAvailable >= 0 ->
+            % If there are enough SlotsAvailable reduce StopCount to avoid
+            % unnecessarily stopping jobs. `stop_jobs/3` ignores 0 or negative
+            % values so we don't worry about that here.
+            StopCount = lists:min([Pending - SlotsAvailable, Running, Churn]),
+            stop_jobs(StopCount, true, State),
+            StartCount = max(0, MaxJobs - running_job_count()),
+            start_jobs(StartCount, State);
+        true ->
+            ok
     end.
 
-
 -spec last_started(#job{}) -> erlang:timestamp().
 last_started(#job{} = Job) ->
     case lists:keyfind(started, 1, Job#job.history) of
@@ -833,7 +796,6 @@ last_started(#job{} = Job) ->
             When
     end.
 
-
 -spec update_history(#job{}, event_type(), erlang:timestamp(), #state{}) ->
     #job{}.
 update_history(Job, Type, When, State) ->
@@ -841,35 +803,38 @@ update_history(Job, Type, When, State) ->
     History1 = lists:sublist(History0, State#state.max_history),
     Job#job{history = History1}.
 
-
 -spec ejson_url(#httpdb{} | binary()) -> binary().
-ejson_url(#httpdb{}=Httpdb) ->
+ejson_url(#httpdb{} = Httpdb) ->
     couch_util:url_strip_password(Httpdb#httpdb.url);
 ejson_url(DbName) when is_binary(DbName) ->
     DbName.
 
-
 -spec job_ejson(#job{}) -> {[_ | _]}.
 job_ejson(Job) ->
     Rep = Job#job.rep,
     Source = ejson_url(Rep#rep.source),
     Target = ejson_url(Rep#rep.target),
-    History = lists:map(fun({Type, When}) ->
-        EventProps = case Type of
-            {crashed, Reason} ->
-                [{type, crashed}, {reason, crash_reason_json(Reason)}];
-            Type ->
-                [{type, Type}]
+    History = lists:map(
+        fun({Type, When}) ->
+            EventProps =
+                case Type of
+                    {crashed, Reason} ->
+                        [{type, crashed}, {reason, crash_reason_json(Reason)}];
+                    Type ->
+                        [{type, Type}]
+                end,
+            {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]}
         end,
-        {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]}
-    end, Job#job.history),
+        Job#job.history
+    ),
     {BaseID, Ext} = Job#job.id,
-    Pid = case Job#job.pid of
-        undefined ->
-            null;
-        P when is_pid(P) ->
-            ?l2b(pid_to_list(P))
-    end,
+    Pid =
+        case Job#job.pid of
+            undefined ->
+                null;
+            P when is_pid(P) ->
+                ?l2b(pid_to_list(P))
+        end,
     {[
         {id, iolist_to_binary([BaseID, Ext])},
         {pid, Pid},
@@ -884,12 +849,10 @@ job_ejson(Job) ->
         {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
     ]}.
 
-
 -spec jobs() -> [[tuple()]].
 jobs() ->
     ets:foldl(fun(Job, Acc) -> [job_ejson(Job) | Acc] end, [], ?MODULE).
 
-
 -spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
 job(JobId) ->
     case job_by_id(JobId) of
@@ -899,7 +862,6 @@ job(JobId) ->
             Error
     end.
 
-
 crash_reason_json({_CrashType, Info}) when is_binary(Info) ->
     Info;
 crash_reason_json(Reason) when is_binary(Reason) ->
@@ -907,22 +869,23 @@ crash_reason_json(Reason) when is_binary(Reason) ->
 crash_reason_json(Error) ->
     couch_replicator_utils:rep_error_to_binary(Error).
 
-
 -spec last_updated([_]) -> binary().
 last_updated([{_Type, When} | _]) ->
     couch_replicator_utils:iso8601(When).
 
-
 -spec is_continuous(#job{}) -> boolean().
 is_continuous(#job{rep = Rep}) ->
     couch_util:get_value(continuous, Rep#rep.options, false).
 
-
 % If job crashed last time because it was rate limited, try to
 % optimize some options to help the job make progress.
 -spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
-maybe_optimize_job_for_rate_limiting(Job = #job{history =
-    [{{crashed, max_backoff}, _} | _]}) ->
+maybe_optimize_job_for_rate_limiting(
+    Job = #job{
+        history =
+            [{{crashed, max_backoff}, _} | _]
+    }
+) ->
     Opts = [
         {checkpoint_interval, 5000},
         {worker_processes, 2},
@@ -934,7 +897,6 @@ maybe_optimize_job_for_rate_limiting(Job = #job{history =
 maybe_optimize_job_for_rate_limiting(Job) ->
     Job.
 
-
 -spec optimize_int_option({atom(), any()}, #rep{}) -> #rep{}.
 optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
     case couch_util:get_value(Key, Options) of
@@ -947,7 +909,6 @@ optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
         Rep
     end.
 
-
 % Updater is a separate process. It receives `update_stats` messages and
 % updates scheduler stats from the scheduler jobs table. Updates are
 % performed no more frequently than once per ?STATS_UPDATE_WAIT milliseconds.
@@ -956,11 +917,9 @@ update_running_jobs_stats(StatsPid) when is_pid(StatsPid) ->
     StatsPid ! update_stats,
     ok.
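Stepping back to maybe_optimize_job_for_rate_limiting/1 earlier in this hunk (an editorial note): its optimize_int_option/2 helper, whose body is largely elided above, appears to only lower values that exceed the targets, so settings already at or below {checkpoint_interval, 5000}, {worker_processes, 2} and {worker_batch_size, 100} are left untouched. A hypothetical before/after for a job whose latest history entry is a max_backoff crash, with invented starting values:

    %% Illustration only; these option lists are made up.
    Before = [{checkpoint_interval, 30000}, {worker_processes, 4}, {worker_batch_size, 500}],
    After  = [{checkpoint_interval, 5000},  {worker_processes, 2}, {worker_batch_size, 100}].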
 
-
 start_stats_updater() ->
     erlang:spawn_link(?MODULE, stats_updater_loop, [undefined]).
 
-
 stats_updater_loop(Timer) ->
     receive
         update_stats when Timer == undefined ->
@@ -975,31 +934,28 @@ stats_updater_loop(Timer) ->
             erlang:exit({stats_updater_bad_msg, Else})
     end.
 
-
 -spec stats_updater_refresh() -> ok.
 stats_updater_refresh() ->
     #stats_acc{
-       pending_n = PendingN,
-       running_n = RunningN,
-       crashed_n = CrashedN
-    } = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE),
+        pending_n = PendingN,
+        running_n = RunningN,
+        crashed_n = CrashedN
+    } = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE),
     couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN),
     couch_stats:update_gauge([couch_replicator, jobs, running], RunningN),
     couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN),
     ok.
 
-
 -spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
 stats_fold(#job{pid = undefined, history = [{added, _}]}, Acc) ->
     Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
 stats_fold(#job{pid = undefined, history = [{stopped, _} | _]}, Acc) ->
     Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
 stats_fold(#job{pid = undefined, history = [{{crashed, _}, _} | _]}, Acc) ->
-    Acc#stats_acc{crashed_n =Acc#stats_acc.crashed_n + 1};
+    Acc#stats_acc{crashed_n = Acc#stats_acc.crashed_n + 1};
 stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) ->
     Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}.
 
-
 -spec existing_replication(#rep{}) -> boolean().
 existing_replication(#rep{} = NewRep) ->
     case job_by_id(NewRep#rep.id) of
@@ -1011,67 +967,85 @@ existing_replication(#rep{} = NewRep) ->
         false
     end.
 
-
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
 
-
 backoff_micros_test_() ->
     BaseInterval = ?BACKOFF_INTERVAL_MICROS,
-    [?_assertEqual(R * BaseInterval, backoff_micros(N)) || {R, N} <- [
-        {1, 1}, {2, 2}, {4, 3}, {8, 4}, {16, 5}, {32, 6}, {64, 7}, {128, 8},
-        {256, 9}, {512, 10}, {1024, 11}, {1024, 12}
-    ]].
-
+    [
+        ?_assertEqual(R * BaseInterval, backoff_micros(N))
+     || {R, N} <- [
+            {1, 1},
+            {2, 2},
+            {4, 3},
+            {8, 4},
+            {16, 5},
+            {32, 6},
+            {64, 7},
+            {128, 8},
+            {256, 9},
+            {512, 10},
+            {1024, 11},
+            {1024, 12}
+        ]
+    ].
 
 consecutive_crashes_test_() ->
     Threshold = ?DEFAULT_HEALTH_THRESHOLD_SEC,
-    [?_assertEqual(R, consecutive_crashes(H, Threshold)) || {R, H} <- [
-        {0, []},
-        {0, [added()]},
-        {0, [stopped()]},
-        {0, [crashed()]},
-        {1, [crashed(), added()]},
-        {1, [crashed(), crashed()]},
-        {1, [crashed(), stopped()]},
-        {3, [crashed(), crashed(), crashed(), added()]},
-        {2, [crashed(), crashed(), stopped()]},
-        {1, [crashed(), started(), added()]},
-        {2, [crashed(3), started(2), crashed(1), started(0)]},
-        {0, [stopped(3), started(2), crashed(1), started(0)]},
-        {1, [crashed(3), started(2), stopped(1), started(0)]},
-        {0, [crashed(999), started(0)]},
-        {1, [crashed(999), started(998), crashed(997), started(0)]}
-    ]].
-
+    [
+        ?_assertEqual(R, consecutive_crashes(H, Threshold))
+     || {R, H} <- [
+            {0, []},
+            {0, [added()]},
+            {0, [stopped()]},
+            {0, [crashed()]},
+            {1, [crashed(), added()]},
+            {1, [crashed(), crashed()]},
+            {1, [crashed(), stopped()]},
+            {3, [crashed(), crashed(), crashed(), added()]},
+            {2, [crashed(), crashed(), stopped()]},
+            {1, [crashed(), started(), added()]},
+            {2, [crashed(3), started(2), crashed(1), started(0)]},
+            {0, [stopped(3), started(2), crashed(1), started(0)]},
+            {1, [crashed(3), started(2), stopped(1), started(0)]},
+            {0, [crashed(999), started(0)]},
+            {1, [crashed(999), started(998), crashed(997), started(0)]}
+        ]
    ].
 consecutive_crashes_non_default_threshold_test_() ->
-    [?_assertEqual(R, consecutive_crashes(H, T)) || {R, H, T} <- [
-        {0, [crashed(11), started(0)], 10},
-        {1, [crashed(10), started(0)], 10}
-    ]].
-
+    [
+        ?_assertEqual(R, consecutive_crashes(H, T))
+     || {R, H, T} <- [
+            {0, [crashed(11), started(0)], 10},
+            {1, [crashed(10), started(0)], 10}
+        ]
+    ].
 
 latest_crash_timestamp_test_() ->
-    [?_assertEqual({0, R, 0}, latest_crash_timestamp(H)) || {R, H} <- [
-        {0, [added()]},
-        {1, [crashed(1)]},
-        {3, [crashed(3), started(2), crashed(1), started(0)]},
-        {1, [started(3), stopped(2), crashed(1), started(0)]}
-    ]].
-
+    [
+        ?_assertEqual({0, R, 0}, latest_crash_timestamp(H))
+     || {R, H} <- [
+            {0, [added()]},
+            {1, [crashed(1)]},
+            {3, [crashed(3), started(2), crashed(1), started(0)]},
+            {1, [started(3), stopped(2), crashed(1), started(0)]}
+        ]
+    ].
 
 last_started_test_() ->
-    [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [
-        {0, [added()]},
-        {0, [crashed(1)]},
-        {1, [started(1)]},
-        {1, [added(), started(1)]},
-        {2, [started(2), started(1)]},
-        {2, [crashed(3), started(2), started(1)]}
-    ]].
-
+    [
+        ?_assertEqual({0, R, 0}, last_started(testjob(H)))
+     || {R, H} <- [
+            {0, [added()]},
+            {0, [crashed(1)]},
+            {1, [started(1)]},
+            {1, [added(), started(1)]},
+            {2, [started(2), started(1)]},
+            {2, [crashed(3), started(2), started(1)]}
        ]
    ].
 
 longest_running_test() ->
     J0 = testjob([crashed()]),
@@ -1084,7 +1058,6 @@ longest_running_test() ->
     ?assertEqual([J1, J2], Sort([J2, J1])),
     ?assertEqual([J0, J1, J2], Sort([J2, J1, J0])).
 
-
 scheduler_test_() ->
     {
         setup,
@@ -1131,9 +1104,8 @@ scheduler_test_() ->
         }
     }.
 
-
 t_pending_jobs_simple() ->
-    ?_test(begin
+    ?_test(begin
         Job1 = oneshot(1),
         Job2 = oneshot(2),
         setup_jobs([Job2, Job1]),
@@ -1143,9 +1115,8 @@ t_pending_jobs_simple() ->
         ?assertEqual([Job1, Job2], pending_jobs(3))
     end).
 
-
 t_pending_jobs_skip_crashed() ->
-    ?_test(begin
+    ?_test(begin
         Job = oneshot(1),
         Ts = os:timestamp(),
         History = [crashed(Ts), started(Ts) | Job#job.history],
@@ -1158,9 +1129,8 @@ t_pending_jobs_skip_crashed() ->
         ?assertEqual([Job2, Job3], pending_jobs(3))
     end).
 
-
 t_pending_jobs_skip_running() ->
-    ?_test(begin
+    ?_test(begin
         Job1 = continuous(1),
         Job2 = continuous_running(2),
         Job3 = oneshot(3),
@@ -1170,7 +1140,6 @@ t_pending_jobs_skip_running() ->
         ?assertEqual([Job1, Job3], pending_jobs(4))
     end).
 
-
 t_one_job_starts() ->
     ?_test(begin
         setup_jobs([oneshot(1)]),
@@ -1179,7 +1148,6 @@ t_one_job_starts() ->
         ?assertEqual({1, 0}, run_stop_count())
     end).
 
-
 t_no_jobs_start_if_max_is_0() ->
     ?_test(begin
         setup_jobs([oneshot(1)]),
@@ -1187,7 +1155,6 @@ t_no_jobs_start_if_max_is_0() ->
         ?assertEqual({0, 1}, run_stop_count())
     end).
 
-
 t_one_job_starts_if_max_is_1() ->
     ?_test(begin
         setup_jobs([oneshot(1), oneshot(2)]),
@@ -1195,7 +1162,6 @@ t_one_job_starts_if_max_is_1() ->
         ?assertEqual({1, 1}, run_stop_count())
     end).
 
-
 t_max_churn_does_not_throttle_initial_start() ->
     ?_test(begin
         setup_jobs([oneshot(1), oneshot(2)]),
@@ -1203,7 +1169,6 @@ t_max_churn_does_not_throttle_initial_start() ->
         ?assertEqual({2, 0}, run_stop_count())
     end).
 
-
 t_excess_oneshot_only_jobs() ->
     ?_test(begin
         setup_jobs([oneshot_running(1), oneshot_running(2)]),
@@ -1214,7 +1179,6 @@ t_excess_oneshot_only_jobs() ->
         ?assertEqual({0, 2}, run_stop_count())
     end).
 
-
 t_excess_continuous_only_jobs() ->
     ?_test(begin
         setup_jobs([continuous_running(1), continuous_running(2)]),
@@ -1225,7 +1189,6 @@ t_excess_continuous_only_jobs() ->
         ?assertEqual({0, 2}, run_stop_count())
     end).
 
-
 t_excess_prefer_continuous_first() ->
     ?_test(begin
         Jobs = [
@@ -1245,7 +1208,6 @@ t_excess_prefer_continuous_first() ->
         ?assertEqual({0, 1}, oneshot_run_stop_count())
     end).
 
-
 t_stop_oldest_first() ->
     ?_test(begin
         Jobs = [
@@ -1261,7 +1223,6 @@ t_stop_oldest_first() ->
         ?assertEqual([7], jobs_running())
     end).
 
-
 t_start_oldest_first() ->
     ?_test(begin
         setup_jobs([continuous(7), continuous(2), continuous(5)]),
@@ -1275,7 +1236,6 @@ t_start_oldest_first() ->
         ?assertEqual([2], jobs_stopped())
     end).
 
-
 t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
     ?_test(begin
         setup_jobs([
@@ -1288,9 +1248,8 @@ t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
         ?assertEqual([7], jobs_stopped())
     end).
 
-
 t_jobs_dont_churn_if_there_are_available_running_slots() ->
-    ?_test(begin
+    ?_test(begin
         setup_jobs([
             continuous_running(1),
             continuous_running(2)
         ]),
@@ -1301,9 +1260,8 @@ t_jobs_dont_churn_if_there_are_available_running_slots() ->
         ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1))
     end).
 
-
 t_start_only_pending_jobs_do_not_churn_existing_ones() ->
-    ?_test(begin
+    ?_test(begin
         setup_jobs([
             continuous(1),
             continuous_running(2)
         ]),
@@ -1314,7 +1272,6 @@ t_start_only_pending_jobs_do_not_churn_existing_ones() ->
         ?assertEqual({2, 0}, run_stop_count())
     end).
 
-
 t_dont_stop_if_nothing_pending() ->
     ?_test(begin
         setup_jobs([continuous_running(1), continuous_running(2)]),
@@ -1322,7 +1279,6 @@ t_dont_stop_if_nothing_pending() ->
         ?assertEqual({2, 0}, run_stop_count())
     end).
 
-
 t_max_churn_limits_number_of_rotated_jobs() ->
     ?_test(begin
         Jobs = [
@@ -1336,7 +1292,6 @@ t_max_churn_limits_number_of_rotated_jobs() ->
         ?assertEqual([2, 3], jobs_stopped())
     end).
 
-
 t_if_pending_less_than_running_start_all_pending() ->
     ?_test(begin
         Jobs = [
@@ -1351,7 +1306,6 @@ t_if_pending_less_than_running_start_all_pending() ->
         ?assertEqual([1, 2, 5], jobs_running())
     end).
 
-
 t_running_less_than_pending_swap_all_running() ->
     ?_test(begin
         Jobs = [
@@ -1366,7 +1320,6 @@ t_running_less_than_pending_swap_all_running() ->
         ?assertEqual([3, 4, 5], jobs_stopped())
     end).
 
-
 t_oneshot_dont_get_rotated() ->
     ?_test(begin
         setup_jobs([oneshot_running(1), continuous(2)]),
@@ -1374,7 +1327,6 @@ t_oneshot_dont_get_rotated() ->
         ?assertEqual([1], jobs_running())
     end).
 
-
 t_rotate_continuous_only_if_mixed() ->
     ?_test(begin
         setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]),
@@ -1382,7 +1334,6 @@ t_rotate_continuous_only_if_mixed() ->
         ?assertEqual([1, 2], jobs_running())
     end).
 
-
 t_oneshot_dont_get_starting_priority() ->
     ?_test(begin
         setup_jobs([continuous(1), oneshot(2), continuous_running(3)]),
@@ -1390,7 +1341,6 @@ t_oneshot_dont_get_starting_priority() ->
         ?assertEqual([1], jobs_running())
     end).
 
-
 % This is tested in other test cases; it is here mainly to make explicit a property
 % of one-shot replications -- they can starve other jobs if they "take control"
 % of all the available scheduler slots.
@@ -1407,7 +1357,6 @@ t_oneshot_will_hog_the_scheduler() ->
         ?assertEqual([1, 2], jobs_running())
     end).
 
-
 t_if_excess_is_trimmed_rotation_still_happens() ->
     ?_test(begin
         Jobs = [
@@ -1420,12 +1369,11 @@ t_if_excess_is_trimmed_rotation_still_happens() ->
         ?assertEqual([1], jobs_running())
     end).
 
-
 t_if_transient_job_crashes_it_gets_removed() ->
     ?_test(begin
         Pid = mock_pid(),
         Rep = continuous_rep(),
-        Job = #job{
+        Job = #job{
             id = job1,
             pid = Pid,
             history = [added()],
@@ -1434,17 +1382,18 @@ t_if_transient_job_crashes_it_gets_removed() ->
         setup_jobs([Job]),
         ?assertEqual(1, ets:info(?MODULE, size)),
         State = #state{max_history = 3, stats_pid = self()},
-        {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
-            State),
+        {noreply, State} = handle_info(
+            {'DOWN', r1, process, Pid, failed},
+            State
+        ),
         ?assertEqual(0, ets:info(?MODULE, size))
-    end).
-
+    end).
 
 t_if_permanent_job_crashes_it_stays_in_ets() ->
     ?_test(begin
         Pid = mock_pid(),
         Rep = continuous_rep(),
-        Job = #job{
+        Job = #job{
             id = job1,
             pid = Pid,
             history = [added()],
@@ -1457,14 +1406,15 @@ t_if_permanent_job_crashes_it_stays_in_ets() ->
             max_history = 3,
             stats_pid = self()
         },
-        {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
-            State),
+        {noreply, State} = handle_info(
+            {'DOWN', r1, process, Pid, failed},
+            State
+        ),
         ?assertEqual(1, ets:info(?MODULE, size)),
         [Job1] = ets:lookup(?MODULE, job1),
         [Latest | _] = Job1#job.history,
         ?assertMatch({{crashed, failed}, _}, Latest)
-    end).
-
+    end).
 
 t_existing_jobs() ->
     ?_test(begin
@@ -1479,11 +1429,10 @@ t_existing_jobs() ->
         ?assertNot(existing_replication(NewRep#rep{options = []}))
     end).
 
-
 t_job_summary_running() ->
     ?_test(begin
         Rep = rep(<<"s">>, <<"t">>),
-        Job = #job{
+        Job = #job{
             id = job1,
             pid = mock_pid(),
             history = [added()],
@@ -1501,10 +1450,9 @@ t_job_summary_running() ->
         ?assertEqual({Stats}, proplists:get_value(info, Summary1))
     end).
 
-
 t_job_summary_pending() ->
     ?_test(begin
-        Job = #job{
+        Job = #job{
             id = job1,
             pid = undefined,
             history = [stopped(20), started(10), added()],
@@ -1522,10 +1470,9 @@ t_job_summary_pending() ->
         ?assertEqual({Stats}, proplists:get_value(info, Summary1))
     end).
 
-
 t_job_summary_crashing_once() ->
     ?_test(begin
-        Job = #job{
+        Job = #job{
             id = job1,
             history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)],
             rep = rep(<<"s">>, <<"t">>)
@@ -1538,10 +1485,9 @@ t_job_summary_crashing_once() ->
         ?assertEqual(0, proplists:get_value(error_count, Summary))
     end).
 
-
 t_job_summary_crashing_many_times() ->
     ?_test(begin
-        Job = #job{
+        Job = #job{
             id = job1,
             history = [crashed(4), started(3), crashed(2), started(1)],
             rep = rep(<<"s">>, <<"t">>)
@@ -1554,7 +1500,6 @@ t_job_summary_crashing_many_times() ->
         ?assertEqual(2, proplists:get_value(error_count, Summary))
     end).
 
-
 t_job_summary_proxy_fields() ->
     ?_test(begin
         Src = #httpdb{
             url = "http://s",
             proxy_url = "http://u:p@sproxy:12"
         },
         Tgt = #httpdb{
             url = "http://t",
             proxy_url = "socks5://u:p@tproxy:34"
         },
-        Job = #job{
+        Job = #job{
             id = job1,
             history = [started(10), added()],
             rep = rep(Src, Tgt)
         },
         setup_jobs([Job]),
         Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
-        ?assertEqual(<<"http://u:*****@sproxy:12">>,
-            proplists:get_value(source_proxy, Summary)),
-        ?assertEqual(<<"socks5://u:*****@tproxy:34">>,
-            proplists:get_value(target_proxy, Summary))
+        ?assertEqual(
+            <<"http://u:*****@sproxy:12">>,
+            proplists:get_value(source_proxy, Summary)
+        ),
+        ?assertEqual(
+            <<"socks5://u:*****@tproxy:34">>,
+            proplists:get_value(target_proxy, Summary)
+        )
     end).
 
-
 % Test helper functions
 
 setup_all() ->
     meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}),
     couch_replicator_share:init().
 
-
-
 teardown_all(_) ->
     couch_replicator_share:clear(),
     catch ets:delete(?MODULE),
     meck:unload().
 
-
 setup() ->
     meck:reset([
         couch_log,
@@ -1611,40 +1556,40 @@ setup() ->
         config
     ]).
 
-
 teardown(_) ->
     ok.
 
-
 setup_jobs(Jobs) when is_list(Jobs) ->
     ?MODULE = ets:new(?MODULE, [named_table, {keypos, #job.id}]),
     ets:insert(?MODULE, Jobs).
 
-
 all_jobs() ->
     lists:usort(ets:tab2list(?MODULE)).
 
-
 jobs_stopped() ->
     [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined].
 
-
 jobs_running() ->
     [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined].
 
-
 run_stop_count() ->
     {length(jobs_running()), length(jobs_stopped())}.
 
-
 oneshot_run_stop_count() ->
-    Running = [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined,
-        not is_continuous(Job)],
-    Stopped = [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined,
-        not is_continuous(Job)],
+    Running = [
+        Job#job.id
+     || Job <- all_jobs(),
+        Job#job.pid =/= undefined,
+        not is_continuous(Job)
+    ],
+    Stopped = [
+        Job#job.id
+     || Job <- all_jobs(),
+        Job#job.pid =:= undefined,
+        not is_continuous(Job)
+    ],
     {length(Running), length(Stopped)}.
 
-
 mock_state(MaxJobs) ->
     #state{
         max_jobs = MaxJobs,
@@ -1661,35 +1606,29 @@ mock_state(MaxJobs, MaxChurn) ->
         stats_pid = self()
     }.
 
-
 rep() ->
     #rep{options = [], user_ctx = #user_ctx{}}.
 
-
 rep(Src, Tgt) ->
     Rep = rep(),
     Rep#rep{source = Src, target = Tgt}.
 
-
 continuous_rep() ->
     #rep{options = [{continuous, true}], user_ctx = #user_ctx{}}.
 
-
 continuous_rep(Src, Tgt) ->
     Rep = continuous_rep(),
     Rep#rep{source = Src, target = Tgt}.
 
-
 continuous(Id) when is_integer(Id) ->
     Started = Id,
-    Hist = [stopped(Started+1), started(Started), added()],
+    Hist = [stopped(Started + 1), started(Started), added()],
     #job{
         id = Id,
         history = Hist,
         rep = continuous_rep()
     }.
 
-
 continuous_running(Id) when is_integer(Id) ->
     Started = Id,
     Pid = mock_pid(),
@@ -1701,13 +1640,11 @@ continuous_running(Id) when is_integer(Id) ->
         monitor = monitor(process, Pid)
     }.
 
-
 oneshot(Id) when is_integer(Id) ->
     Started = Id,
     Hist = [stopped(Started + 1), started(Started), added()],
     #job{id = Id, history = Hist, rep = rep()}.
 
-
 oneshot_running(Id) when is_integer(Id) ->
     Started = Id,
     Pid = mock_pid(),
@@ -1719,43 +1656,34 @@ oneshot_running(Id) when is_integer(Id) ->
         monitor = monitor(process, Pid)
     }.
 
-
 testjob(Hist) when is_list(Hist) ->
     #job{history = Hist}.
 
-
 mock_pid() ->
-    list_to_pid("<0.999.999>").
+    list_to_pid("<0.999.999>").
 
 crashed() ->
     crashed(0).
 
-
-crashed(WhenSec) when is_integer(WhenSec)->
+crashed(WhenSec) when is_integer(WhenSec) ->
     {{crashed, some_reason}, {0, WhenSec, 0}};
 crashed({MSec, Sec, USec}) ->
     {{crashed, some_reason}, {MSec, Sec, USec}}.
 
-
 started() ->
     started(0).
 
-
-started(WhenSec) when is_integer(WhenSec)->
+started(WhenSec) when is_integer(WhenSec) ->
     {started, {0, WhenSec, 0}};
-
 started({MSec, Sec, USec}) ->
     {started, {MSec, Sec, USec}}.
 
-
 stopped() ->
     stopped(0).
 
-
 stopped(WhenSec) ->
     {stopped, {0, WhenSec, 0}}.
 
-
 added() ->
     {added, {0, 0, 0}}.