Diffstat (limited to 'src/couch_replicator/src/couch_replicator_scheduler.erl')
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler.erl | 700
1 file changed, 314 insertions(+), 386 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index d3b5b71a4..f544865af 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -20,28 +20,28 @@
]).
-export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3,
- format_status/2
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_info/2,
+ handle_cast/2,
+ code_change/3,
+ format_status/2
]).
-export([
- add_job/1,
- remove_job/1,
- reschedule/0,
- rep_state/1,
- find_jobs_by_dbname/1,
- find_jobs_by_doc/2,
- job_summary/2,
- health_threshold/0,
- jobs/0,
- job/1,
- restart_job/1,
- update_job_stats/2
+ add_job/1,
+ remove_job/1,
+ reschedule/0,
+ rep_state/1,
+ find_jobs_by_dbname/1,
+ find_jobs_by_doc/2,
+ job_summary/2,
+ health_threshold/0,
+ jobs/0,
+ job/1,
+ restart_job/1,
+ update_job_stats/2
]).
%% config_listener callbacks
@@ -59,7 +59,6 @@
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include_lib("couch/include/couch_db.hrl").
-
%% definitions
-define(MAX_BACKOFF_EXPONENT, 10).
-define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).
@@ -72,7 +71,6 @@
-define(DEFAULT_MAX_HISTORY, 20).
-define(DEFAULT_SCHEDULER_INTERVAL, 60000).
-
-record(state, {
interval = ?DEFAULT_SCHEDULER_INTERVAL,
timer,
@@ -88,14 +86,12 @@
crashed_n = 0 :: non_neg_integer()
}).
-
%% public functions
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-spec add_job(#rep{}) -> ok.
add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
case existing_replication(Rep) of
@@ -110,47 +106,44 @@ add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
ok
end.
-
-spec remove_job(job_id()) -> ok.
remove_job(Id) ->
gen_server:call(?MODULE, {remove_job, Id}, infinity).
-
-spec reschedule() -> ok.
% Trigger a manual reschedule. Used for testing and/or ops.
reschedule() ->
gen_server:call(?MODULE, reschedule, infinity).
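% As an illustrative usage (hypothetical operator session): calling
%
%   ok = couch_replicator_scheduler:reschedule().
%
% from a remsh forces a full scheduling cycle without waiting for the timer.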
-
-spec rep_state(rep_id()) -> #rep{} | nil.
rep_state(RepId) ->
case (catch ets:lookup_element(?MODULE, RepId, #job.rep)) of
- {'EXIT',{badarg, _}} ->
+ {'EXIT', {badarg, _}} ->
nil;
Rep ->
Rep
end.
-
-spec job_summary(job_id(), non_neg_integer()) -> [_] | nil.
job_summary(JobId, HealthThreshold) ->
case job_by_id(JobId) of
{ok, #job{pid = Pid, history = History, rep = Rep}} ->
ErrorCount = consecutive_crashes(History, HealthThreshold),
- {State, Info} = case {Pid, ErrorCount} of
- {undefined, 0} ->
- case History of
- [{{crashed, Error}, _When} | _] ->
- {crashing, crash_reason_json(Error)};
- [_ | _] ->
- {pending, Rep#rep.stats}
- end;
- {undefined, ErrorCount} when ErrorCount > 0 ->
- [{{crashed, Error}, _When} | _] = History,
- {crashing, crash_reason_json(Error)};
- {Pid, ErrorCount} when is_pid(Pid) ->
- {running, Rep#rep.stats}
- end,
+ {State, Info} =
+ case {Pid, ErrorCount} of
+ {undefined, 0} ->
+ case History of
+ [{{crashed, Error}, _When} | _] ->
+ {crashing, crash_reason_json(Error)};
+ [_ | _] ->
+ {pending, Rep#rep.stats}
+ end;
+ {undefined, ErrorCount} when ErrorCount > 0 ->
+ [{{crashed, Error}, _When} | _] = History,
+ {crashing, crash_reason_json(Error)};
+ {Pid, ErrorCount} when is_pid(Pid) ->
+ {running, Rep#rep.stats}
+ end,
[
{source, iolist_to_binary(ejson_url(Rep#rep.source))},
{target, iolist_to_binary(ejson_url(Rep#rep.target))},
@@ -158,22 +151,20 @@ job_summary(JobId, HealthThreshold) ->
{info, couch_replicator_utils:ejson_state_info(Info)},
{error_count, ErrorCount},
{last_updated, last_updated(History)},
- {start_time,
- couch_replicator_utils:iso8601(Rep#rep.start_time)},
+ {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)},
{source_proxy, job_proxy_url(Rep#rep.source)},
{target_proxy, job_proxy_url(Rep#rep.target)}
];
{error, not_found} ->
- nil % Job might have just completed
+ % Job might have just completed
+ nil
end.
-
job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
list_to_binary(couch_util:url_strip_password(ProxyUrl));
job_proxy_url(_Endpoint) ->
null.
-
% Health threshold is the minimum amount of time an unhealthy job should run
% without crashing before it is considered healthy again. HealthThreshold should
% not be 0 as jobs could start and immediately crash, and it shouldn't be
@@ -181,9 +172,11 @@ job_proxy_url(_Endpoint) ->
% job is back to normal.
-spec health_threshold() -> non_neg_integer().
health_threshold() ->
- config:get_integer("replicator", "health_threshold",
- ?DEFAULT_HEALTH_THRESHOLD_SEC).
-
+ config:get_integer(
+ "replicator",
+ "health_threshold",
+ ?DEFAULT_HEALTH_THRESHOLD_SEC
+ ).
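% For example, with a hypothetical override such as
%
%   config:set("replicator", "health_threshold", "120")
%
% a job would need to run for two minutes without crashing before its
% earlier consecutive crashes stop being counted against it.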
-spec find_jobs_by_dbname(binary()) -> list(#rep{}).
find_jobs_by_dbname(DbName) ->
@@ -191,14 +184,12 @@ find_jobs_by_dbname(DbName) ->
MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
[RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
-
-spec find_jobs_by_doc(binary(), binary()) -> list(#rep{}).
find_jobs_by_doc(DbName, DocId) ->
- Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'},
+ Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'},
MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
[RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
-
-spec restart_job(binary() | list() | rep_id()) ->
{ok, {[_]}} | {error, not_found}.
restart_job(JobId) ->
@@ -211,28 +202,39 @@ restart_job(JobId) ->
job(JobId)
end.
-
-spec update_job_stats(job_id(), term()) -> ok.
update_job_stats(JobId, Stats) ->
gen_server:cast(?MODULE, {update_job_stats, JobId, Stats}).
-
%% gen_server functions
init(_) ->
config:enable_feature('scheduler'),
- EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true},
- {write_concurrency, true}],
+ EtsOpts = [
+ named_table,
+ {keypos, #job.id},
+ {read_concurrency, true},
+ {write_concurrency, true}
+ ],
?MODULE = ets:new(?MODULE, EtsOpts),
ok = couch_replicator_share:init(),
ok = config:listen_for_changes(?MODULE, nil),
- Interval = config:get_integer("replicator", "interval",
- ?DEFAULT_SCHEDULER_INTERVAL),
+ Interval = config:get_integer(
+ "replicator",
+ "interval",
+ ?DEFAULT_SCHEDULER_INTERVAL
+ ),
MaxJobs = config:get_integer("replicator", "max_jobs", ?DEFAULT_MAX_JOBS),
- MaxChurn = config:get_integer("replicator", "max_churn",
- ?DEFAULT_MAX_CHURN),
- MaxHistory = config:get_integer("replicator", "max_history",
- ?DEFAULT_MAX_HISTORY),
+ MaxChurn = config:get_integer(
+ "replicator",
+ "max_churn",
+ ?DEFAULT_MAX_CHURN
+ ),
+ MaxHistory = config:get_integer(
+ "replicator",
+ "max_history",
+ ?DEFAULT_MAX_HISTORY
+ ),
Timer = erlang:send_after(Interval, self(), reschedule),
State = #state{
interval = Interval,
@@ -244,7 +246,6 @@ init(_) ->
},
{ok, State}.
-
handle_call({add_job, Job}, _From, State) ->
ok = maybe_remove_job_int(Job#job.id, State),
true = add_job_int(Job),
@@ -253,50 +254,51 @@ handle_call({add_job, Job}, _From, State) ->
TotalJobs = ets:info(?MODULE, size),
couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs),
{reply, ok, State};
-
handle_call({remove_job, Id}, _From, State) ->
ok = maybe_remove_job_int(Id, State),
{reply, ok, State};
-
handle_call(reschedule, _From, State) ->
ok = reschedule(State),
{reply, ok, State};
-
handle_call(_, _From, State) ->
{noreply, State}.
-
-handle_cast({set_max_jobs, MaxJobs}, State) when is_integer(MaxJobs),
- MaxJobs >= 0 ->
+handle_cast({set_max_jobs, MaxJobs}, State) when
+ is_integer(MaxJobs),
+ MaxJobs >= 0
+->
couch_log:notice("~p: max_jobs set to ~B", [?MODULE, MaxJobs]),
{noreply, State#state{max_jobs = MaxJobs}};
-
-handle_cast({set_max_churn, MaxChurn}, State) when is_integer(MaxChurn),
- MaxChurn > 0 ->
+handle_cast({set_max_churn, MaxChurn}, State) when
+ is_integer(MaxChurn),
+ MaxChurn > 0
+->
couch_log:notice("~p: max_churn set to ~B", [?MODULE, MaxChurn]),
{noreply, State#state{max_churn = MaxChurn}};
-
-handle_cast({set_max_history, MaxHistory}, State) when is_integer(MaxHistory),
- MaxHistory > 0 ->
+handle_cast({set_max_history, MaxHistory}, State) when
+ is_integer(MaxHistory),
+ MaxHistory > 0
+->
couch_log:notice("~p: max_history set to ~B", [?MODULE, MaxHistory]),
{noreply, State#state{max_history = MaxHistory}};
-
-handle_cast({set_interval, Interval}, State) when is_integer(Interval),
- Interval > 0 ->
+handle_cast({set_interval, Interval}, State) when
+ is_integer(Interval),
+ Interval > 0
+->
couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
{noreply, State#state{interval = Interval}};
-
-handle_cast({update_shares, Key, Shares}, State) when is_binary(Key),
- is_integer(Shares), Shares >= 0 ->
+handle_cast({update_shares, Key, Shares}, State) when
+ is_binary(Key),
+ is_integer(Shares),
+ Shares >= 0
+->
couch_log:notice("~p: shares for ~s set to ~B", [?MODULE, Key, Shares]),
couch_replicator_share:update_shares(Key, Shares),
{noreply, State};
-
-handle_cast({reset_shares, Key}, State) when is_binary(Key) ->
+handle_cast({reset_shares, Key}, State) when is_binary(Key) ->
couch_log:notice("~p: shares for ~s reset to default", [?MODULE, Key]),
couch_replicator_share:reset_shares(Key),
{noreply, State};
-
handle_cast({update_job_stats, JobId, Stats}, State) ->
case rep_state(JobId) of
nil ->
@@ -306,18 +308,15 @@ handle_cast({update_job_stats, JobId, Stats}, State) ->
true = ets:update_element(?MODULE, JobId, {#job.rep, NewRep})
end,
{noreply, State};
-
handle_cast(UnexpectedMsg, State) ->
couch_log:error("~p: received unexpected cast ~p", [?MODULE, UnexpectedMsg]),
{noreply, State}.
-
handle_info(reschedule, State) ->
ok = reschedule(State),
erlang:cancel_timer(State#state.timer),
Timer = erlang:send_after(State#state.interval, self(), reschedule),
{noreply, State#state{timer = Timer}};
-
handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
{ok, Job} = job_by_pid(Pid),
couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
@@ -326,82 +325,66 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
remove_job_int(Job),
update_running_jobs_stats(State#state.stats_pid),
{noreply, State};
-
handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
{ok, Job} = job_by_pid(Pid),
- Reason = case Reason0 of
- {shutdown, ShutdownReason} -> ShutdownReason;
- Other -> Other
- end,
+ Reason =
+ case Reason0 of
+ {shutdown, ShutdownReason} -> ShutdownReason;
+ Other -> Other
+ end,
Interval = State#state.interval,
couch_replicator_share:charge(Job, Interval, os:timestamp()),
ok = handle_crashed_job(Job, Reason, State),
{noreply, State};
-
handle_info(restart_config_listener, State) ->
ok = config:listen_for_changes(?MODULE, nil),
{noreply, State};
-
handle_info(_, State) ->
{noreply, State}.
-
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
terminate(_Reason, _State) ->
couch_replicator_share:clear(),
ok.
-
format_status(_Opt, [_PDict, State]) ->
[
- {max_jobs, State#state.max_jobs},
- {running_jobs, running_job_count()},
- {pending_jobs, pending_job_count()}
+ {max_jobs, State#state.max_jobs},
+ {running_jobs, running_job_count()},
+ {pending_jobs, pending_job_count()}
].
-
%% config listener functions
handle_config_change("replicator", "max_jobs", V, _, S) ->
ok = gen_server:cast(?MODULE, {set_max_jobs, list_to_integer(V)}),
{ok, S};
-
handle_config_change("replicator", "max_churn", V, _, S) ->
ok = gen_server:cast(?MODULE, {set_max_churn, list_to_integer(V)}),
{ok, S};
-
handle_config_change("replicator", "interval", V, _, S) ->
ok = gen_server:cast(?MODULE, {set_interval, list_to_integer(V)}),
{ok, S};
-
handle_config_change("replicator", "max_history", V, _, S) ->
ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
{ok, S};
-
handle_config_change("replicator.shares", Key, deleted, _, S) ->
ok = gen_server:cast(?MODULE, {reset_shares, list_to_binary(Key)}),
{ok, S};
-
handle_config_change("replicator.shares", Key, V, _, S) ->
- ok = gen_server:cast(?MODULE, {update_shares, list_to_binary(Key),
- list_to_integer(V)}),
+ ok = gen_server:cast(?MODULE, {update_shares, list_to_binary(Key), list_to_integer(V)}),
{ok, S};
-
handle_config_change(_, _, _, _, S) ->
{ok, S}.
-
handle_config_terminate(_, stop, _) ->
ok;
-
handle_config_terminate(_, _, _) ->
Pid = whereis(?MODULE),
erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
-
%% Private functions
% Handle crashed jobs. Handling differs between transient and permanent jobs.
@@ -420,7 +403,6 @@ handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, State) ->
remove_job_int(Job),
update_running_jobs_stats(State#state.stats_pid),
ok;
-
handle_crashed_job(Job, Reason, State) ->
ok = update_state_crashed(Job, Reason, State),
case couch_replicator_doc_processor:update_docs() of
@@ -442,7 +424,6 @@ handle_crashed_job(Job, Reason, State) ->
ok
end.
-
% Attempt to start a newly added job. First quickly check if total jobs
% already exceed max jobs, then do a more expensive check which runs a
% select (an O(n) operation) to check pending jobs specifically.
@@ -459,7 +440,6 @@ maybe_start_newly_added_job(Job, State) ->
ok
end.
-
% Return up to a given number of the oldest jobs that have not crashed
% recently. Try to be memory efficient and use ets:foldl to accumulate jobs.
-spec pending_jobs(non_neg_integer()) -> [#job{}].
@@ -468,36 +448,34 @@ pending_jobs(0) ->
% other function clause it will crash as gb_sets:largest assumes set is not
% empty.
[];
-
pending_jobs(Count) when is_integer(Count), Count > 0 ->
- Set0 = gb_sets:new(), % [{{Priority, LastStart}, Job},...]
+ % [{{Priority, LastStart}, Job},...]
+ Set0 = gb_sets:new(),
Now = os:timestamp(),
Acc0 = {Set0, Now, Count, health_threshold()},
{Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE),
[Job || {_PriorityKey, Job} <- gb_sets:to_list(Set1)].
-
pending_fold(#job{pid = Pid}, Acc) when is_pid(Pid) ->
Acc;
-
pending_fold(Job, {Set, Now, Count, HealthThreshold}) ->
Healthy = not_recently_crashed(Job, Now, HealthThreshold),
- Set1 = case {Healthy, gb_sets:size(Set) >= Count} of
- {true, true} ->
- % Job is healthy but already reached accumulated limit, so might
- % have to replace one of the accumulated jobs
- pending_maybe_replace(Job, Set);
- {true, false} ->
- % Job is healthy and we haven't reached the limit, so add job
- % to accumulator
- gb_sets:add_element({start_priority_key(Job), Job}, Set);
- {false, _} ->
- % This job is not healthy (has crashed too recently), so skip it.
- Set
- end,
+ Set1 =
+ case {Healthy, gb_sets:size(Set) >= Count} of
+ {true, true} ->
+ % Job is healthy but already reached accumulated limit, so might
+ % have to replace one of the accumulated jobs
+ pending_maybe_replace(Job, Set);
+ {true, false} ->
+ % Job is healthy and we haven't reached the limit, so add job
+ % to accumulator
+ gb_sets:add_element({start_priority_key(Job), Job}, Set);
+ {false, _} ->
+ % This job is not healthy (has crashed too recently), so skip it.
+ Set
+ end,
{Set1, Now, Count, HealthThreshold}.
-
% Replace Job in the accumulator if it has a higher priority (lower priority
% value) than the lowest priority there. Job priority is indexed by
% {FairSharePriority, LastStarted} tuples. If the FairSharePriority is the same
@@ -526,16 +504,13 @@ pending_maybe_replace(Job, Set) ->
start_priority_key(#job{} = Job) ->
{couch_replicator_share:priority(Job#job.id), last_started(Job)}.
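% A small illustration with hypothetical values: gb_sets orders these keys
% ascending, so {2, {0, 5, 0}} (a higher fair-share priority) sorts ahead of
% {5, {0, 5, 0}}, and between equal priorities the job with the older
% last_started/1 timestamp comes first.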
-
start_jobs(Count, State) ->
[start_job_int(Job, State) || Job <- pending_jobs(Count)],
ok.
-
-spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer().
stop_jobs(Count, _, _) when is_integer(Count), Count =< 0 ->
0;
-
stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
Running0 = running_jobs(),
ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
@@ -544,7 +519,6 @@ stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
Running3 = lists:sublist(lists:sort(Running2), Count),
length([stop_job_int(Job, State) || {_SortKey, Job} <- Running3]).
-
% Lower priority jobs have higher priority values, so we negate them; that way,
% when sorted, they'll come up first. If priorities are equal, jobs are sorted
% by the lowest starting times as jobs with lowest start time have been running
@@ -553,7 +527,6 @@ stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
stop_priority_key(#job{} = Job) ->
{-couch_replicator_share:priority(Job#job.id), last_started(Job)}.
-
not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
case History of
[{added, _When}] ->
@@ -566,7 +539,6 @@ not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
timer:now_diff(Now, LatestCrashT) >= backoff_micros(CrashCount)
end.
-
% Count consecutive crashes. A crash happens when there is a `crashed` event
% within a short period of time (configurable) after any other event. It could
% be `crashed, started` for jobs crashing quickly after starting, `crashed,
@@ -583,40 +555,39 @@ not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
consecutive_crashes(History, HealthThreshold) when is_list(History) ->
consecutive_crashes(History, HealthThreshold, 0).
-
-spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) ->
- non_neg_integer().
+ non_neg_integer().
consecutive_crashes([], _HealthThreshold, Count) ->
Count;
-
-consecutive_crashes([{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest],
- HealthThreshold, Count) ->
+consecutive_crashes(
+ [{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest],
+ HealthThreshold,
+ Count
+) ->
case timer:now_diff(CrashT, PrevT) > HealthThreshold * 1000000 of
true ->
Count;
false ->
consecutive_crashes([PrevEvent | Rest], HealthThreshold, Count + 1)
end;
-
-consecutive_crashes([{stopped, _}, {started, _} | _], _HealthThreshold,
- Count) ->
+consecutive_crashes(
+ [{stopped, _}, {started, _} | _],
+ _HealthThreshold,
+ Count
+) ->
Count;
-
consecutive_crashes([_ | Rest], HealthThreshold, Count) ->
consecutive_crashes(Rest, HealthThreshold, Count).
-
-spec latest_crash_timestamp(history()) -> erlang:timestamp().
latest_crash_timestamp([]) ->
- {0, 0, 0}; % Used to avoid special-casing "no crash" when doing now_diff
-
+ % Used to avoid special-casing "no crash" when doing now_diff
+ {0, 0, 0};
latest_crash_timestamp([{{crashed, _Reason}, When} | _]) ->
When;
-
latest_crash_timestamp([_Event | Rest]) ->
latest_crash_timestamp(Rest).
-
-spec backoff_micros(non_neg_integer()) -> non_neg_integer().
backoff_micros(CrashCount) ->
% When calculating the backoff interval treat consecutive crash count as the
@@ -626,13 +597,11 @@ backoff_micros(CrashCount) ->
BackoffExp = erlang:min(CrashCount - 1, ?MAX_BACKOFF_EXPONENT),
(1 bsl BackoffExp) * ?BACKOFF_INTERVAL_MICROS.
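% A worked example using the defines above: ?BACKOFF_INTERVAL_MICROS is 30
% seconds, so backoff_micros(4) = (1 bsl 3) * 30s = 240 seconds, and crash
% counts of 11 or more saturate at (1 bsl ?MAX_BACKOFF_EXPONENT) * 30s,
% i.e. 1024 * 30s, roughly 8.5 hours.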
-
-spec add_job_int(#job{}) -> boolean().
add_job_int(#job{} = Job) ->
couch_replicator_share:job_added(Job),
ets:insert_new(?MODULE, Job).
-
-spec maybe_remove_job_int(job_id(), #state{}) -> ok.
maybe_remove_job_int(JobId, State) ->
case job_by_id(JobId) of
@@ -644,98 +613,99 @@ maybe_remove_job_int(JobId, State) ->
true = remove_job_int(Job),
couch_stats:increment_counter([couch_replicator, jobs, removes]),
TotalJobs = ets:info(?MODULE, size),
- couch_stats:update_gauge([couch_replicator, jobs, total],
- TotalJobs),
+ couch_stats:update_gauge(
+ [couch_replicator, jobs, total],
+ TotalJobs
+ ),
update_running_jobs_stats(State#state.stats_pid),
ok;
{error, not_found} ->
ok
end.
-
start_job_int(#job{pid = Pid}, _State) when Pid /= undefined ->
ok;
-
start_job_int(#job{} = Job0, State) ->
Job = maybe_optimize_job_for_rate_limiting(Job0),
case couch_replicator_scheduler_sup:start_child(Job#job.rep) of
{ok, Child} ->
Ref = monitor(process, Child),
ok = update_state_started(Job, Child, Ref, State),
- couch_log:notice("~p: Job ~p started as ~p",
- [?MODULE, Job#job.id, Child]);
+ couch_log:notice(
+ "~p: Job ~p started as ~p",
+ [?MODULE, Job#job.id, Child]
+ );
{error, {already_started, OtherPid}} when node(OtherPid) =:= node() ->
Ref = monitor(process, OtherPid),
ok = update_state_started(Job, OtherPid, Ref, State),
- couch_log:notice("~p: Job ~p already running as ~p. Most likely"
+ couch_log:notice(
+ "~p: Job ~p already running as ~p. Most likely"
" because replicator scheduler was restarted",
- [?MODULE, Job#job.id, OtherPid]);
+ [?MODULE, Job#job.id, OtherPid]
+ );
{error, {already_started, OtherPid}} when node(OtherPid) =/= node() ->
CrashMsg = "Duplicate replication running on another node",
- couch_log:notice("~p: Job ~p already running as ~p. Most likely"
+ couch_log:notice(
+ "~p: Job ~p already running as ~p. Most likely"
" because a duplicate replication is running on another node",
- [?MODULE, Job#job.id, OtherPid]),
+ [?MODULE, Job#job.id, OtherPid]
+ ),
ok = update_state_crashed(Job, CrashMsg, State);
{error, Reason} ->
- couch_log:notice("~p: Job ~p failed to start for reason ~p",
- [?MODULE, Job, Reason]),
+ couch_log:notice(
+ "~p: Job ~p failed to start for reason ~p",
+ [?MODULE, Job, Reason]
+ ),
ok = update_state_crashed(Job, Reason, State)
end.
-
-spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}.
stop_job_int(#job{pid = undefined}, _State) ->
ok;
-
stop_job_int(#job{} = Job, State) ->
ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid),
demonitor(Job#job.monitor, [flush]),
ok = update_state_stopped(Job, State),
- couch_log:notice("~p: Job ~p stopped as ~p",
- [?MODULE, Job#job.id, Job#job.pid]).
-
+ couch_log:notice(
+ "~p: Job ~p stopped as ~p",
+ [?MODULE, Job#job.id, Job#job.pid]
+ ).
-spec remove_job_int(#job{}) -> true.
remove_job_int(#job{} = Job) ->
couch_replicator_share:job_removed(Job),
ets:delete(?MODULE, Job#job.id).
-
-spec running_job_count() -> non_neg_integer().
running_job_count() ->
ets:info(?MODULE, size) - pending_job_count().
-
-spec running_jobs() -> [#job{}].
running_jobs() ->
- ets:select(?MODULE, [{#job{pid = '$1', _='_'}, [{is_pid, '$1'}], ['$_']}]).
-
+ ets:select(?MODULE, [{#job{pid = '$1', _ = '_'}, [{is_pid, '$1'}], ['$_']}]).
-spec pending_job_count() -> non_neg_integer().
pending_job_count() ->
- ets:select_count(?MODULE, [{#job{pid=undefined, _='_'}, [], [true]}]).
-
+ ets:select_count(?MODULE, [{#job{pid = undefined, _ = '_'}, [], [true]}]).
-spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}.
job_by_pid(Pid) when is_pid(Pid) ->
- case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of
+ case ets:match_object(?MODULE, #job{pid = Pid, _ = '_'}) of
[] ->
{error, not_found};
- [#job{}=Job] ->
+ [#job{} = Job] ->
{ok, Job}
end.
-
-spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}.
job_by_id(Id) ->
case ets:lookup(?MODULE, Id) of
[] ->
{error, not_found};
- [#job{}=Job] ->
+ [#job{} = Job] ->
{ok, Job}
end.
-
-spec update_state_stopped(#job{}, #state{}) -> ok.
update_state_stopped(Job, State) ->
Job1 = reset_job_process(Job),
@@ -744,7 +714,6 @@ update_state_stopped(Job, State) ->
couch_stats:increment_counter([couch_replicator, jobs, stops]),
ok.
-
-spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok.
update_state_started(Job, Pid, Ref, State) ->
Job1 = set_job_process(Job, Pid, Ref),
@@ -753,7 +722,6 @@ update_state_started(Job, Pid, Ref, State) ->
couch_stats:increment_counter([couch_replicator, jobs, starts]),
ok.
-
-spec update_state_crashed(#job{}, any(), #state{}) -> ok.
update_state_crashed(Job, Reason, State) ->
Job1 = reset_job_process(Job),
@@ -762,17 +730,14 @@ update_state_crashed(Job, Reason, State) ->
couch_stats:increment_counter([couch_replicator, jobs, crashes]),
ok.
-
-spec set_job_process(#job{}, pid(), reference()) -> #job{}.
set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) ->
Job#job{pid = Pid, monitor = Ref}.
-
-spec reset_job_process(#job{}) -> #job{}.
reset_job_process(#job{} = Job) ->
Job#job{pid = undefined, monitor = undefined}.
-
-spec reschedule(#state{}) -> ok.
reschedule(#state{interval = Interval} = State) ->
couch_replicator_share:update(running_jobs(), Interval, os:timestamp()),
@@ -780,50 +745,48 @@ reschedule(#state{interval = Interval} = State) ->
rotate_jobs(State, StopCount),
update_running_jobs_stats(State#state.stats_pid).
-
-spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer().
stop_excess_jobs(State, Running) ->
- #state{max_jobs=MaxJobs} = State,
+ #state{max_jobs = MaxJobs} = State,
StopCount = max(0, Running - MaxJobs),
Stopped = stop_jobs(StopCount, true, State),
OneshotLeft = StopCount - Stopped,
stop_jobs(OneshotLeft, false, State),
StopCount.
-
start_pending_jobs(State) ->
- #state{max_jobs=MaxJobs} = State,
+ #state{max_jobs = MaxJobs} = State,
Running = running_job_count(),
Pending = pending_job_count(),
- if Running < MaxJobs, Pending > 0 ->
- start_jobs(MaxJobs - Running, State);
- true ->
- ok
+ if
+ Running < MaxJobs, Pending > 0 ->
+ start_jobs(MaxJobs - Running, State);
+ true ->
+ ok
end.
-
-spec rotate_jobs(#state{}, non_neg_integer()) -> ok.
rotate_jobs(State, ChurnSoFar) ->
- #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
+ #state{max_jobs = MaxJobs, max_churn = MaxChurn} = State,
Running = running_job_count(),
Pending = pending_job_count(),
% Reduce MaxChurn by the number of already stopped jobs in the
% current rescheduling cycle.
Churn = max(0, MaxChurn - ChurnSoFar),
SlotsAvailable = MaxJobs - Running,
- if SlotsAvailable >= 0 ->
- % If there are enough SlotsAvailable, reduce StopCount to avoid
- % unnecessarily stopping jobs. `stop_jobs/3` ignores 0 or negative
- % values so we don't worry about that here.
- StopCount = lists:min([Pending - SlotsAvailable, Running, Churn]),
- stop_jobs(StopCount, true, State),
- StartCount = max(0, MaxJobs - running_job_count()),
- start_jobs(StartCount, State);
- true ->
- ok
+ if
+ SlotsAvailable >= 0 ->
+ % If there are enough SlotsAvailable, reduce StopCount to avoid
+ % unnecessarily stopping jobs. `stop_jobs/3` ignores 0 or negative
+ % values so we don't worry about that here.
+ StopCount = lists:min([Pending - SlotsAvailable, Running, Churn]),
+ stop_jobs(StopCount, true, State),
+ StartCount = max(0, MaxJobs - running_job_count()),
+ start_jobs(StartCount, State);
+ true ->
+ ok
end.
-
-spec last_started(#job{}) -> erlang:timestamp().
last_started(#job{} = Job) ->
case lists:keyfind(started, 1, Job#job.history) of
@@ -833,7 +796,6 @@ last_started(#job{} = Job) ->
When
end.
-
-spec update_history(#job{}, event_type(), erlang:timestamp(), #state{}) ->
#job{}.
update_history(Job, Type, When, State) ->
@@ -841,35 +803,38 @@ update_history(Job, Type, When, State) ->
History1 = lists:sublist(History0, State#state.max_history),
Job#job{history = History1}.
-
-spec ejson_url(#httpdb{} | binary()) -> binary().
-ejson_url(#httpdb{}=Httpdb) ->
+ejson_url(#httpdb{} = Httpdb) ->
couch_util:url_strip_password(Httpdb#httpdb.url);
ejson_url(DbName) when is_binary(DbName) ->
DbName.
-
-spec job_ejson(#job{}) -> {[_ | _]}.
job_ejson(Job) ->
Rep = Job#job.rep,
Source = ejson_url(Rep#rep.source),
Target = ejson_url(Rep#rep.target),
- History = lists:map(fun({Type, When}) ->
- EventProps = case Type of
- {crashed, Reason} ->
- [{type, crashed}, {reason, crash_reason_json(Reason)}];
- Type ->
- [{type, Type}]
+ History = lists:map(
+ fun({Type, When}) ->
+ EventProps =
+ case Type of
+ {crashed, Reason} ->
+ [{type, crashed}, {reason, crash_reason_json(Reason)}];
+ Type ->
+ [{type, Type}]
+ end,
+ {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]}
end,
- {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]}
- end, Job#job.history),
+ Job#job.history
+ ),
{BaseID, Ext} = Job#job.id,
- Pid = case Job#job.pid of
- undefined ->
- null;
- P when is_pid(P) ->
- ?l2b(pid_to_list(P))
- end,
+ Pid =
+ case Job#job.pid of
+ undefined ->
+ null;
+ P when is_pid(P) ->
+ ?l2b(pid_to_list(P))
+ end,
{[
{id, iolist_to_binary([BaseID, Ext])},
{pid, Pid},
@@ -884,12 +849,10 @@ job_ejson(Job) ->
{start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
]}.
-
-spec jobs() -> [[tuple()]].
jobs() ->
ets:foldl(fun(Job, Acc) -> [job_ejson(Job) | Acc] end, [], ?MODULE).
-
-spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
job(JobId) ->
case job_by_id(JobId) of
@@ -899,7 +862,6 @@ job(JobId) ->
Error
end.
-
crash_reason_json({_CrashType, Info}) when is_binary(Info) ->
Info;
crash_reason_json(Reason) when is_binary(Reason) ->
@@ -907,22 +869,23 @@ crash_reason_json(Reason) when is_binary(Reason) ->
crash_reason_json(Error) ->
couch_replicator_utils:rep_error_to_binary(Error).
-
-spec last_updated([_]) -> binary().
last_updated([{_Type, When} | _]) ->
couch_replicator_utils:iso8601(When).
-
-spec is_continuous(#job{}) -> boolean().
is_continuous(#job{rep = Rep}) ->
couch_util:get_value(continuous, Rep#rep.options, false).
-
% If job crashed last time because it was rate limited, try to
% optimize some options to help the job make progress.
-spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
-maybe_optimize_job_for_rate_limiting(Job = #job{history =
- [{{crashed, max_backoff}, _} | _]}) ->
+maybe_optimize_job_for_rate_limiting(
+ Job = #job{
+ history =
+ [{{crashed, max_backoff}, _} | _]
+ }
+) ->
Opts = [
{checkpoint_interval, 5000},
{worker_processes, 2},
@@ -934,7 +897,6 @@ maybe_optimize_job_for_rate_limiting(Job = #job{history =
maybe_optimize_job_for_rate_limiting(Job) ->
Job.
-
-spec optimize_int_option({atom(), any()}, #rep{}) -> #rep{}.
optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
case couch_util:get_value(Key, Options) of
@@ -947,7 +909,6 @@ optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
Rep
end.
-
% Updater is a separate process. It receives `update_stats` messages and
% updates scheduler stats from the scheduler jobs table. Updates are
% performed no more frequently than once per ?STATS_UPDATE_WAIT milliseconds.
@@ -956,11 +917,9 @@ update_running_jobs_stats(StatsPid) when is_pid(StatsPid) ->
StatsPid ! update_stats,
ok.
-
start_stats_updater() ->
erlang:spawn_link(?MODULE, stats_updater_loop, [undefined]).
-
stats_updater_loop(Timer) ->
receive
update_stats when Timer == undefined ->
@@ -975,31 +934,28 @@ stats_updater_loop(Timer) ->
erlang:exit({stats_updater_bad_msg, Else})
end.
-
-spec stats_updater_refresh() -> ok.
stats_updater_refresh() ->
#stats_acc{
- pending_n = PendingN,
- running_n = RunningN,
- crashed_n = CrashedN
- } = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE),
+ pending_n = PendingN,
+ running_n = RunningN,
+ crashed_n = CrashedN
+ } = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE),
couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN),
couch_stats:update_gauge([couch_replicator, jobs, running], RunningN),
couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN),
ok.
-
-spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
stats_fold(#job{pid = undefined, history = [{added, _}]}, Acc) ->
Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
stats_fold(#job{pid = undefined, history = [{stopped, _} | _]}, Acc) ->
Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
stats_fold(#job{pid = undefined, history = [{{crashed, _}, _} | _]}, Acc) ->
- Acc#stats_acc{crashed_n =Acc#stats_acc.crashed_n + 1};
+ Acc#stats_acc{crashed_n = Acc#stats_acc.crashed_n + 1};
stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) ->
Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}.
-
-spec existing_replication(#rep{}) -> boolean().
existing_replication(#rep{} = NewRep) ->
case job_by_id(NewRep#rep.id) of
@@ -1011,67 +967,85 @@ existing_replication(#rep{} = NewRep) ->
false
end.
-
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-
backoff_micros_test_() ->
BaseInterval = ?BACKOFF_INTERVAL_MICROS,
- [?_assertEqual(R * BaseInterval, backoff_micros(N)) || {R, N} <- [
- {1, 1}, {2, 2}, {4, 3}, {8, 4}, {16, 5}, {32, 6}, {64, 7}, {128, 8},
- {256, 9}, {512, 10}, {1024, 11}, {1024, 12}
- ]].
-
+ [
+ ?_assertEqual(R * BaseInterval, backoff_micros(N))
+ || {R, N} <- [
+ {1, 1},
+ {2, 2},
+ {4, 3},
+ {8, 4},
+ {16, 5},
+ {32, 6},
+ {64, 7},
+ {128, 8},
+ {256, 9},
+ {512, 10},
+ {1024, 11},
+ {1024, 12}
+ ]
+ ].
consecutive_crashes_test_() ->
Threshold = ?DEFAULT_HEALTH_THRESHOLD_SEC,
- [?_assertEqual(R, consecutive_crashes(H, Threshold)) || {R, H} <- [
- {0, []},
- {0, [added()]},
- {0, [stopped()]},
- {0, [crashed()]},
- {1, [crashed(), added()]},
- {1, [crashed(), crashed()]},
- {1, [crashed(), stopped()]},
- {3, [crashed(), crashed(), crashed(), added()]},
- {2, [crashed(), crashed(), stopped()]},
- {1, [crashed(), started(), added()]},
- {2, [crashed(3), started(2), crashed(1), started(0)]},
- {0, [stopped(3), started(2), crashed(1), started(0)]},
- {1, [crashed(3), started(2), stopped(1), started(0)]},
- {0, [crashed(999), started(0)]},
- {1, [crashed(999), started(998), crashed(997), started(0)]}
- ]].
-
+ [
+ ?_assertEqual(R, consecutive_crashes(H, Threshold))
+ || {R, H} <- [
+ {0, []},
+ {0, [added()]},
+ {0, [stopped()]},
+ {0, [crashed()]},
+ {1, [crashed(), added()]},
+ {1, [crashed(), crashed()]},
+ {1, [crashed(), stopped()]},
+ {3, [crashed(), crashed(), crashed(), added()]},
+ {2, [crashed(), crashed(), stopped()]},
+ {1, [crashed(), started(), added()]},
+ {2, [crashed(3), started(2), crashed(1), started(0)]},
+ {0, [stopped(3), started(2), crashed(1), started(0)]},
+ {1, [crashed(3), started(2), stopped(1), started(0)]},
+ {0, [crashed(999), started(0)]},
+ {1, [crashed(999), started(998), crashed(997), started(0)]}
+ ]
+ ].
consecutive_crashes_non_default_threshold_test_() ->
- [?_assertEqual(R, consecutive_crashes(H, T)) || {R, H, T} <- [
- {0, [crashed(11), started(0)], 10},
- {1, [crashed(10), started(0)], 10}
- ]].
-
+ [
+ ?_assertEqual(R, consecutive_crashes(H, T))
+ || {R, H, T} <- [
+ {0, [crashed(11), started(0)], 10},
+ {1, [crashed(10), started(0)], 10}
+ ]
+ ].
latest_crash_timestamp_test_() ->
- [?_assertEqual({0, R, 0}, latest_crash_timestamp(H)) || {R, H} <- [
- {0, [added()]},
- {1, [crashed(1)]},
- {3, [crashed(3), started(2), crashed(1), started(0)]},
- {1, [started(3), stopped(2), crashed(1), started(0)]}
- ]].
-
+ [
+ ?_assertEqual({0, R, 0}, latest_crash_timestamp(H))
+ || {R, H} <- [
+ {0, [added()]},
+ {1, [crashed(1)]},
+ {3, [crashed(3), started(2), crashed(1), started(0)]},
+ {1, [started(3), stopped(2), crashed(1), started(0)]}
+ ]
+ ].
last_started_test_() ->
- [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [
- {0, [added()]},
- {0, [crashed(1)]},
- {1, [started(1)]},
- {1, [added(), started(1)]},
- {2, [started(2), started(1)]},
- {2, [crashed(3), started(2), started(1)]}
- ]].
-
+ [
+ ?_assertEqual({0, R, 0}, last_started(testjob(H)))
+ || {R, H} <- [
+ {0, [added()]},
+ {0, [crashed(1)]},
+ {1, [started(1)]},
+ {1, [added(), started(1)]},
+ {2, [started(2), started(1)]},
+ {2, [crashed(3), started(2), started(1)]}
+ ]
+ ].
longest_running_test() ->
J0 = testjob([crashed()]),
@@ -1084,7 +1058,6 @@ longest_running_test() ->
?assertEqual([J1, J2], Sort([J2, J1])),
?assertEqual([J0, J1, J2], Sort([J2, J1, J0])).
-
scheduler_test_() ->
{
setup,
@@ -1131,9 +1104,8 @@ scheduler_test_() ->
}
}.
-
t_pending_jobs_simple() ->
- ?_test(begin
+ ?_test(begin
Job1 = oneshot(1),
Job2 = oneshot(2),
setup_jobs([Job2, Job1]),
@@ -1143,9 +1115,8 @@ t_pending_jobs_simple() ->
?assertEqual([Job1, Job2], pending_jobs(3))
end).
-
t_pending_jobs_skip_crashed() ->
- ?_test(begin
+ ?_test(begin
Job = oneshot(1),
Ts = os:timestamp(),
History = [crashed(Ts), started(Ts) | Job#job.history],
@@ -1158,9 +1129,8 @@ t_pending_jobs_skip_crashed() ->
?assertEqual([Job2, Job3], pending_jobs(3))
end).
-
t_pending_jobs_skip_running() ->
- ?_test(begin
+ ?_test(begin
Job1 = continuous(1),
Job2 = continuous_running(2),
Job3 = oneshot(3),
@@ -1170,7 +1140,6 @@ t_pending_jobs_skip_running() ->
?assertEqual([Job1, Job3], pending_jobs(4))
end).
-
t_one_job_starts() ->
?_test(begin
setup_jobs([oneshot(1)]),
@@ -1179,7 +1148,6 @@ t_one_job_starts() ->
?assertEqual({1, 0}, run_stop_count())
end).
-
t_no_jobs_start_if_max_is_0() ->
?_test(begin
setup_jobs([oneshot(1)]),
@@ -1187,7 +1155,6 @@ t_no_jobs_start_if_max_is_0() ->
?assertEqual({0, 1}, run_stop_count())
end).
-
t_one_job_starts_if_max_is_1() ->
?_test(begin
setup_jobs([oneshot(1), oneshot(2)]),
@@ -1195,7 +1162,6 @@ t_one_job_starts_if_max_is_1() ->
?assertEqual({1, 1}, run_stop_count())
end).
-
t_max_churn_does_not_throttle_initial_start() ->
?_test(begin
setup_jobs([oneshot(1), oneshot(2)]),
@@ -1203,7 +1169,6 @@ t_max_churn_does_not_throttle_initial_start() ->
?assertEqual({2, 0}, run_stop_count())
end).
-
t_excess_oneshot_only_jobs() ->
?_test(begin
setup_jobs([oneshot_running(1), oneshot_running(2)]),
@@ -1214,7 +1179,6 @@ t_excess_oneshot_only_jobs() ->
?assertEqual({0, 2}, run_stop_count())
end).
-
t_excess_continuous_only_jobs() ->
?_test(begin
setup_jobs([continuous_running(1), continuous_running(2)]),
@@ -1225,7 +1189,6 @@ t_excess_continuous_only_jobs() ->
?assertEqual({0, 2}, run_stop_count())
end).
-
t_excess_prefer_continuous_first() ->
?_test(begin
Jobs = [
@@ -1245,7 +1208,6 @@ t_excess_prefer_continuous_first() ->
?assertEqual({0, 1}, oneshot_run_stop_count())
end).
-
t_stop_oldest_first() ->
?_test(begin
Jobs = [
@@ -1261,7 +1223,6 @@ t_stop_oldest_first() ->
?assertEqual([7], jobs_running())
end).
-
t_start_oldest_first() ->
?_test(begin
setup_jobs([continuous(7), continuous(2), continuous(5)]),
@@ -1275,7 +1236,6 @@ t_start_oldest_first() ->
?assertEqual([2], jobs_stopped())
end).
-
t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
?_test(begin
setup_jobs([
@@ -1288,9 +1248,8 @@ t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
?assertEqual([7], jobs_stopped())
end).
-
t_jobs_dont_churn_if_there_are_available_running_slots() ->
- ?_test(begin
+ ?_test(begin
setup_jobs([
continuous_running(1),
continuous_running(2)
@@ -1301,9 +1260,8 @@ t_jobs_dont_churn_if_there_are_available_running_slots() ->
?assertEqual(0, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1))
end).
-
t_start_only_pending_jobs_do_not_churn_existing_ones() ->
- ?_test(begin
+ ?_test(begin
setup_jobs([
continuous(1),
continuous_running(2)
@@ -1314,7 +1272,6 @@ t_start_only_pending_jobs_do_not_churn_existing_ones() ->
?assertEqual({2, 0}, run_stop_count())
end).
-
t_dont_stop_if_nothing_pending() ->
?_test(begin
setup_jobs([continuous_running(1), continuous_running(2)]),
@@ -1322,7 +1279,6 @@ t_dont_stop_if_nothing_pending() ->
?assertEqual({2, 0}, run_stop_count())
end).
-
t_max_churn_limits_number_of_rotated_jobs() ->
?_test(begin
Jobs = [
@@ -1336,7 +1292,6 @@ t_max_churn_limits_number_of_rotated_jobs() ->
?assertEqual([2, 3], jobs_stopped())
end).
-
t_if_pending_less_than_running_start_all_pending() ->
?_test(begin
Jobs = [
@@ -1351,7 +1306,6 @@ t_if_pending_less_than_running_start_all_pending() ->
?assertEqual([1, 2, 5], jobs_running())
end).
-
t_running_less_than_pending_swap_all_running() ->
?_test(begin
Jobs = [
@@ -1366,7 +1320,6 @@ t_running_less_than_pending_swap_all_running() ->
?assertEqual([3, 4, 5], jobs_stopped())
end).
-
t_oneshot_dont_get_rotated() ->
?_test(begin
setup_jobs([oneshot_running(1), continuous(2)]),
@@ -1374,7 +1327,6 @@ t_oneshot_dont_get_rotated() ->
?assertEqual([1], jobs_running())
end).
-
t_rotate_continuous_only_if_mixed() ->
?_test(begin
setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]),
@@ -1382,7 +1334,6 @@ t_rotate_continuous_only_if_mixed() ->
?assertEqual([1, 2], jobs_running())
end).
-
t_oneshot_dont_get_starting_priority() ->
?_test(begin
setup_jobs([continuous(1), oneshot(2), continuous_running(3)]),
@@ -1390,7 +1341,6 @@ t_oneshot_dont_get_starting_priority() ->
?assertEqual([1], jobs_running())
end).
-
% This is tested in other test cases; it is here mainly to make explicit a property
% of one-shot replications -- they can starve other jobs if they "take control"
% of all the available scheduler slots.
@@ -1407,7 +1357,6 @@ t_oneshot_will_hog_the_scheduler() ->
?assertEqual([1, 2], jobs_running())
end).
-
t_if_excess_is_trimmed_rotation_still_happens() ->
?_test(begin
Jobs = [
@@ -1420,12 +1369,11 @@ t_if_excess_is_trimmed_rotation_still_happens() ->
?assertEqual([1], jobs_running())
end).
-
t_if_transient_job_crashes_it_gets_removed() ->
?_test(begin
Pid = mock_pid(),
Rep = continuous_rep(),
- Job = #job{
+ Job = #job{
id = job1,
pid = Pid,
history = [added()],
@@ -1434,17 +1382,18 @@ t_if_transient_job_crashes_it_gets_removed() ->
setup_jobs([Job]),
?assertEqual(1, ets:info(?MODULE, size)),
State = #state{max_history = 3, stats_pid = self()},
- {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
- State),
+ {noreply, State} = handle_info(
+ {'DOWN', r1, process, Pid, failed},
+ State
+ ),
?assertEqual(0, ets:info(?MODULE, size))
- end).
-
+ end).
t_if_permanent_job_crashes_it_stays_in_ets() ->
?_test(begin
Pid = mock_pid(),
Rep = continuous_rep(),
- Job = #job{
+ Job = #job{
id = job1,
pid = Pid,
history = [added()],
@@ -1457,14 +1406,15 @@ t_if_permanent_job_crashes_it_stays_in_ets() ->
max_history = 3,
stats_pid = self()
},
- {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
- State),
+ {noreply, State} = handle_info(
+ {'DOWN', r1, process, Pid, failed},
+ State
+ ),
?assertEqual(1, ets:info(?MODULE, size)),
[Job1] = ets:lookup(?MODULE, job1),
[Latest | _] = Job1#job.history,
?assertMatch({{crashed, failed}, _}, Latest)
- end).
-
+ end).
t_existing_jobs() ->
?_test(begin
@@ -1479,11 +1429,10 @@ t_existing_jobs() ->
?assertNot(existing_replication(NewRep#rep{options = []}))
end).
-
t_job_summary_running() ->
?_test(begin
Rep = rep(<<"s">>, <<"t">>),
- Job = #job{
+ Job = #job{
id = job1,
pid = mock_pid(),
history = [added()],
@@ -1501,10 +1450,9 @@ t_job_summary_running() ->
?assertEqual({Stats}, proplists:get_value(info, Summary1))
end).
-
t_job_summary_pending() ->
?_test(begin
- Job = #job{
+ Job = #job{
id = job1,
pid = undefined,
history = [stopped(20), started(10), added()],
@@ -1522,10 +1470,9 @@ t_job_summary_pending() ->
?assertEqual({Stats}, proplists:get_value(info, Summary1))
end).
-
t_job_summary_crashing_once() ->
?_test(begin
- Job = #job{
+ Job = #job{
id = job1,
history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)],
rep = rep(<<"s">>, <<"t">>)
@@ -1538,10 +1485,9 @@ t_job_summary_crashing_once() ->
?assertEqual(0, proplists:get_value(error_count, Summary))
end).
-
t_job_summary_crashing_many_times() ->
?_test(begin
- Job = #job{
+ Job = #job{
id = job1,
history = [crashed(4), started(3), crashed(2), started(1)],
rep = rep(<<"s">>, <<"t">>)
@@ -1554,7 +1500,6 @@ t_job_summary_crashing_many_times() ->
?assertEqual(2, proplists:get_value(error_count, Summary))
end).
-
t_job_summary_proxy_fields() ->
?_test(begin
Src = #httpdb{
@@ -1565,20 +1510,23 @@ t_job_summary_proxy_fields() ->
url = "http://t",
proxy_url = "socks5://u:p@tproxy:34"
},
- Job = #job{
+ Job = #job{
id = job1,
history = [started(10), added()],
rep = rep(Src, Tgt)
},
setup_jobs([Job]),
Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(<<"http://u:*****@sproxy:12">>,
- proplists:get_value(source_proxy, Summary)),
- ?assertEqual(<<"socks5://u:*****@tproxy:34">>,
- proplists:get_value(target_proxy, Summary))
+ ?assertEqual(
+ <<"http://u:*****@sproxy:12">>,
+ proplists:get_value(source_proxy, Summary)
+ ),
+ ?assertEqual(
+ <<"socks5://u:*****@tproxy:34">>,
+ proplists:get_value(target_proxy, Summary)
+ )
end).
-
% Test helper functions
setup_all() ->
@@ -1595,14 +1543,11 @@ setup_all() ->
meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}),
couch_replicator_share:init().
-
-
teardown_all(_) ->
couch_replicator_share:clear(),
catch ets:delete(?MODULE),
meck:unload().
-
setup() ->
meck:reset([
couch_log,
@@ -1611,40 +1556,40 @@ setup() ->
config
]).
-
teardown(_) ->
ok.
-
setup_jobs(Jobs) when is_list(Jobs) ->
?MODULE = ets:new(?MODULE, [named_table, {keypos, #job.id}]),
ets:insert(?MODULE, Jobs).
-
all_jobs() ->
lists:usort(ets:tab2list(?MODULE)).
-
jobs_stopped() ->
[Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined].
-
jobs_running() ->
[Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined].
-
run_stop_count() ->
{length(jobs_running()), length(jobs_stopped())}.
-
oneshot_run_stop_count() ->
- Running = [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined,
- not is_continuous(Job)],
- Stopped = [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined,
- not is_continuous(Job)],
+ Running = [
+ Job#job.id
+ || Job <- all_jobs(),
+ Job#job.pid =/= undefined,
+ not is_continuous(Job)
+ ],
+ Stopped = [
+ Job#job.id
+ || Job <- all_jobs(),
+ Job#job.pid =:= undefined,
+ not is_continuous(Job)
+ ],
{length(Running), length(Stopped)}.
-
mock_state(MaxJobs) ->
#state{
max_jobs = MaxJobs,
@@ -1661,35 +1606,29 @@ mock_state(MaxJobs, MaxChurn) ->
stats_pid = self()
}.
-
rep() ->
#rep{options = [], user_ctx = #user_ctx{}}.
-
rep(Src, Tgt) ->
Rep = rep(),
Rep#rep{source = Src, target = Tgt}.
-
continuous_rep() ->
#rep{options = [{continuous, true}], user_ctx = #user_ctx{}}.
-
continuous_rep(Src, Tgt) ->
Rep = continuous_rep(),
Rep#rep{source = Src, target = Tgt}.
-
continuous(Id) when is_integer(Id) ->
Started = Id,
- Hist = [stopped(Started+1), started(Started), added()],
+ Hist = [stopped(Started + 1), started(Started), added()],
#job{
id = Id,
history = Hist,
rep = continuous_rep()
}.
-
continuous_running(Id) when is_integer(Id) ->
Started = Id,
Pid = mock_pid(),
@@ -1701,13 +1640,11 @@ continuous_running(Id) when is_integer(Id) ->
monitor = monitor(process, Pid)
}.
-
oneshot(Id) when is_integer(Id) ->
Started = Id,
Hist = [stopped(Started + 1), started(Started), added()],
#job{id = Id, history = Hist, rep = rep()}.
-
oneshot_running(Id) when is_integer(Id) ->
Started = Id,
Pid = mock_pid(),
@@ -1719,43 +1656,34 @@ oneshot_running(Id) when is_integer(Id) ->
monitor = monitor(process, Pid)
}.
-
testjob(Hist) when is_list(Hist) ->
#job{history = Hist}.
-
mock_pid() ->
- list_to_pid("<0.999.999>").
+ list_to_pid("<0.999.999>").
crashed() ->
crashed(0).
-
-crashed(WhenSec) when is_integer(WhenSec)->
+crashed(WhenSec) when is_integer(WhenSec) ->
{{crashed, some_reason}, {0, WhenSec, 0}};
crashed({MSec, Sec, USec}) ->
{{crashed, some_reason}, {MSec, Sec, USec}}.
-
started() ->
started(0).
-
-started(WhenSec) when is_integer(WhenSec)->
+started(WhenSec) when is_integer(WhenSec) ->
{started, {0, WhenSec, 0}};
-
started({MSec, Sec, USec}) ->
{started, {MSec, Sec, USec}}.
-
stopped() ->
stopped(0).
-
stopped(WhenSec) ->
{stopped, {0, WhenSec, 0}}.
-
added() ->
{added, {0, 0, 0}}.