summaryrefslogtreecommitdiff
path: root/src/couch_replicator/src/couch_replicator_scheduler.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_scheduler.erl')
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler.erl1687
1 files changed, 0 insertions, 1687 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
deleted file mode 100644
index 53c040e8c..000000000
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ /dev/null
@@ -1,1687 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_scheduler).
-
--behaviour(gen_server).
--behaviour(config_listener).
-
--export([
- start_link/0
-]).
-
--export([
- init/1,
- terminate/2,
- handle_call/3,
- handle_info/2,
- handle_cast/2,
- code_change/3,
- format_status/2
-]).
-
--export([
- add_job/1,
- remove_job/1,
- reschedule/0,
- rep_state/1,
- find_jobs_by_dbname/1,
- find_jobs_by_doc/2,
- job_summary/2,
- health_threshold/0,
- jobs/0,
- job/1,
- restart_job/1,
- update_job_stats/2
-]).
-
-%% config_listener callbacks
--export([
- handle_config_change/5,
- handle_config_terminate/3
-]).
-
-%% for status updater process to allow hot code loading
--export([
- stats_updater_loop/1
-]).
-
--include("couch_replicator_scheduler.hrl").
--include("couch_replicator.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-%% types
--type event_type() :: added | started | stopped | {crashed, any()}.
--type event() :: {Type:: event_type(), When :: erlang:timestamp()}.
--type history() :: nonempty_list(event()).
-
-%% definitions
--define(MAX_BACKOFF_EXPONENT, 10).
--define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).
--define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
--define(RELISTEN_DELAY, 5000).
--define(STATS_UPDATE_WAIT, 5000).
-
--define(DEFAULT_MAX_JOBS, 500).
--define(DEFAULT_MAX_CHURN, 20).
--define(DEFAULT_MAX_HISTORY, 20).
--define(DEFAULT_SCHEDULER_INTERVAL, 60000).
-
-
--record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}).
--record(job, {
- id :: job_id() | '$1' | '_',
- rep :: #rep{} | '_',
- pid :: undefined | pid() | '$1' | '_',
- monitor :: undefined | reference() | '_',
- history :: history() | '_'
-}).
-
--record(stats_acc, {
- pending_n = 0 :: non_neg_integer(),
- running_n = 0 :: non_neg_integer(),
- crashed_n = 0 :: non_neg_integer()
-}).
-
-
-%% public functions
-
--spec start_link() -> {ok, pid()} | ignore | {error, term()}.
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-
--spec add_job(#rep{}) -> ok.
-add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
- case existing_replication(Rep) of
- false ->
- Job = #job{
- id = Rep#rep.id,
- rep = Rep,
- history = [{added, os:timestamp()}]
- },
- gen_server:call(?MODULE, {add_job, Job}, infinity);
- true ->
- ok
- end.
-
-
--spec remove_job(job_id()) -> ok.
-remove_job(Id) ->
- gen_server:call(?MODULE, {remove_job, Id}, infinity).
-
-
--spec reschedule() -> ok.
-% Trigger a manual reschedule. Used for testing and/or ops.
-reschedule() ->
- gen_server:call(?MODULE, reschedule, infinity).
-
-
--spec rep_state(rep_id()) -> #rep{} | nil.
-rep_state(RepId) ->
- case (catch ets:lookup_element(?MODULE, RepId, #job.rep)) of
- {'EXIT',{badarg, _}} ->
- nil;
- Rep ->
- Rep
- end.
-
-
--spec job_summary(job_id(), non_neg_integer()) -> [_] | nil.
-job_summary(JobId, HealthThreshold) ->
- case job_by_id(JobId) of
- {ok, #job{pid = Pid, history = History, rep = Rep}} ->
- ErrorCount = consecutive_crashes(History, HealthThreshold),
- {State, Info} = case {Pid, ErrorCount} of
- {undefined, 0} ->
- case History of
- [{{crashed, Error}, _When} | _] ->
- {crashing, crash_reason_json(Error)};
- [_ | _] ->
- {pending, Rep#rep.stats}
- end;
- {undefined, ErrorCount} when ErrorCount > 0 ->
- [{{crashed, Error}, _When} | _] = History,
- {crashing, crash_reason_json(Error)};
- {Pid, ErrorCount} when is_pid(Pid) ->
- {running, Rep#rep.stats}
- end,
- [
- {source, iolist_to_binary(ejson_url(Rep#rep.source))},
- {target, iolist_to_binary(ejson_url(Rep#rep.target))},
- {state, State},
- {info, couch_replicator_utils:ejson_state_info(Info)},
- {error_count, ErrorCount},
- {last_updated, last_updated(History)},
- {start_time,
- couch_replicator_utils:iso8601(Rep#rep.start_time)},
- {source_proxy, job_proxy_url(Rep#rep.source)},
- {target_proxy, job_proxy_url(Rep#rep.target)}
- ];
- {error, not_found} ->
- nil % Job might have just completed
- end.
-
-
-job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
- list_to_binary(couch_util:url_strip_password(ProxyUrl));
-job_proxy_url(_Endpoint) ->
- null.
-
-
-% Health threshold is the minimum amount of time an unhealthy job should run
-% crashing before it is considered to be healthy again. HealtThreashold should
-% not be 0 as jobs could start and immediately crash, and it shouldn't be
-% infinity, since then consecutive crashes would accumulate forever even if
-% job is back to normal.
--spec health_threshold() -> non_neg_integer().
-health_threshold() ->
- config:get_integer("replicator", "health_threshold",
- ?DEFAULT_HEALTH_THRESHOLD_SEC).
-
-
--spec find_jobs_by_dbname(binary()) -> list(#rep{}).
-find_jobs_by_dbname(DbName) ->
- Rep = #rep{db_name = DbName, _ = '_'},
- MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
- [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
-
-
--spec find_jobs_by_doc(binary(), binary()) -> list(#rep{}).
-find_jobs_by_doc(DbName, DocId) ->
- Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'},
- MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
- [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
-
-
--spec restart_job(binary() | list() | rep_id()) ->
- {ok, {[_]}} | {error, not_found}.
-restart_job(JobId) ->
- case rep_state(JobId) of
- nil ->
- {error, not_found};
- #rep{} = Rep ->
- ok = remove_job(JobId),
- ok = add_job(Rep),
- job(JobId)
- end.
-
-
--spec update_job_stats(job_id(), term()) -> ok.
-update_job_stats(JobId, Stats) ->
- gen_server:cast(?MODULE, {update_job_stats, JobId, Stats}).
-
-
-%% gen_server functions
-
-init(_) ->
- config:enable_feature('scheduler'),
- EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true},
- {write_concurrency, true}],
- ?MODULE = ets:new(?MODULE, EtsOpts),
- ok = config:listen_for_changes(?MODULE, nil),
- Interval = config:get_integer("replicator", "interval",
- ?DEFAULT_SCHEDULER_INTERVAL),
- MaxJobs = config:get_integer("replicator", "max_jobs", ?DEFAULT_MAX_JOBS),
- MaxChurn = config:get_integer("replicator", "max_churn",
- ?DEFAULT_MAX_CHURN),
- MaxHistory = config:get_integer("replicator", "max_history",
- ?DEFAULT_MAX_HISTORY),
- Timer = erlang:send_after(Interval, self(), reschedule),
- State = #state{
- interval = Interval,
- max_jobs = MaxJobs,
- max_churn = MaxChurn,
- max_history = MaxHistory,
- timer = Timer,
- stats_pid = start_stats_updater()
- },
- {ok, State}.
-
-
-handle_call({add_job, Job}, _From, State) ->
- ok = maybe_remove_job_int(Job#job.id, State),
- true = add_job_int(Job),
- ok = maybe_start_newly_added_job(Job, State),
- couch_stats:increment_counter([couch_replicator, jobs, adds]),
- TotalJobs = ets:info(?MODULE, size),
- couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs),
- {reply, ok, State};
-
-handle_call({remove_job, Id}, _From, State) ->
- ok = maybe_remove_job_int(Id, State),
- {reply, ok, State};
-
-handle_call(reschedule, _From, State) ->
- ok = reschedule(State),
- {reply, ok, State};
-
-handle_call(_, _From, State) ->
- {noreply, State}.
-
-
-handle_cast({set_max_jobs, MaxJobs}, State) when is_integer(MaxJobs),
- MaxJobs >= 0 ->
- couch_log:notice("~p: max_jobs set to ~B", [?MODULE, MaxJobs]),
- {noreply, State#state{max_jobs = MaxJobs}};
-
-handle_cast({set_max_churn, MaxChurn}, State) when is_integer(MaxChurn),
- MaxChurn > 0 ->
- couch_log:notice("~p: max_churn set to ~B", [?MODULE, MaxChurn]),
- {noreply, State#state{max_churn = MaxChurn}};
-
-handle_cast({set_max_history, MaxHistory}, State) when is_integer(MaxHistory),
- MaxHistory > 0 ->
- couch_log:notice("~p: max_history set to ~B", [?MODULE, MaxHistory]),
- {noreply, State#state{max_history = MaxHistory}};
-
-handle_cast({set_interval, Interval}, State) when is_integer(Interval),
- Interval > 0 ->
- couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
- {noreply, State#state{interval = Interval}};
-
-handle_cast({update_job_stats, JobId, Stats}, State) ->
- case rep_state(JobId) of
- nil ->
- ok;
- #rep{} = Rep ->
- NewRep = Rep#rep{stats = Stats},
- true = ets:update_element(?MODULE, JobId, {#job.rep, NewRep})
- end,
- {noreply, State};
-
-handle_cast(UnexpectedMsg, State) ->
- couch_log:error("~p: received un-expected cast ~p", [?MODULE, UnexpectedMsg]),
- {noreply, State}.
-
-
-handle_info(reschedule, State) ->
- ok = reschedule(State),
- erlang:cancel_timer(State#state.timer),
- Timer = erlang:send_after(State#state.interval, self(), reschedule),
- {noreply, State#state{timer = Timer}};
-
-handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
- {ok, Job} = job_by_pid(Pid),
- couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
- remove_job_int(Job),
- update_running_jobs_stats(State#state.stats_pid),
- {noreply, State};
-
-handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
- {ok, Job} = job_by_pid(Pid),
- Reason = case Reason0 of
- {shutdown, ShutdownReason} -> ShutdownReason;
- Other -> Other
- end,
- ok = handle_crashed_job(Job, Reason, State),
- {noreply, State};
-
-handle_info(restart_config_listener, State) ->
- ok = config:listen_for_changes(?MODULE, nil),
- {noreply, State};
-
-handle_info(_, State) ->
- {noreply, State}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-terminate(_Reason, _State) ->
- ok.
-
-
-format_status(_Opt, [_PDict, State]) ->
- [
- {max_jobs, State#state.max_jobs},
- {running_jobs, running_job_count()},
- {pending_jobs, pending_job_count()}
- ].
-
-
-%% config listener functions
-
-handle_config_change("replicator", "max_jobs", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_max_jobs, list_to_integer(V)}),
- {ok, S};
-
-handle_config_change("replicator", "max_churn", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_max_churn, list_to_integer(V)}),
- {ok, S};
-
-handle_config_change("replicator", "interval", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_interval, list_to_integer(V)}),
- {ok, S};
-
-handle_config_change("replicator", "max_history", V, _, S) ->
- ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
- {ok, S};
-
-handle_config_change(_, _, _, _, S) ->
- {ok, S}.
-
-
-handle_config_terminate(_, stop, _) ->
- ok;
-
-handle_config_terminate(_, _, _) ->
- Pid = whereis(?MODULE),
- erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
-
-
-%% Private functions
-
-% Handle crashed jobs. Handling differs between transient and permanent jobs.
-% Transient jobs are those posted to the _replicate endpoint. They don't have a
-% db associated with them. When those jobs crash, they are not restarted. That
-% is also consistent with behavior when the node they run on, crashed and they
-% do not migrate to other nodes. Permanent jobs are those created from
-% replicator documents. Those jobs, once they pass basic validation and end up
-% in the scheduler will be retried indefinitely (with appropriate exponential
-% backoffs).
--spec handle_crashed_job(#job{}, any(), #state{}) -> ok.
-handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, State) ->
- Msg = "~p : Transient job ~p failed, removing. Error: ~p",
- ErrorBinary = couch_replicator_utils:rep_error_to_binary(Reason),
- couch_log:error(Msg, [?MODULE, Job#job.id, ErrorBinary]),
- remove_job_int(Job),
- update_running_jobs_stats(State#state.stats_pid),
- ok;
-
-handle_crashed_job(Job, Reason, State) ->
- ok = update_state_crashed(Job, Reason, State),
- case couch_replicator_doc_processor:update_docs() of
- true ->
- couch_replicator_docs:update_error(Job#job.rep, Reason);
- false ->
- ok
- end,
- case ets:info(?MODULE, size) < State#state.max_jobs of
- true ->
- % Starting pending jobs is an O(TotalJobsCount) operation. Only do
- % it if there is a relatively small number of jobs. Otherwise
- % scheduler could be blocked if there is a cascade of lots failing
- % jobs in a row.
- start_pending_jobs(State),
- update_running_jobs_stats(State#state.stats_pid),
- ok;
- false ->
- ok
- end.
-
-
-% Attempt to start a newly added job. First quickly check if total jobs
-% already exceed max jobs, then do a more expensive check which runs a
-% select (an O(n) operation) to check pending jobs specifically.
--spec maybe_start_newly_added_job(#job{}, #state{}) -> ok.
-maybe_start_newly_added_job(Job, State) ->
- MaxJobs = State#state.max_jobs,
- TotalJobs = ets:info(?MODULE, size),
- case TotalJobs < MaxJobs andalso running_job_count() < MaxJobs of
- true ->
- start_job_int(Job, State),
- update_running_jobs_stats(State#state.stats_pid),
- ok;
- false ->
- ok
- end.
-
-
-% Return up to a given number of oldest, not recently crashed jobs. Try to be
-% memory efficient and use ets:foldl to accumulate jobs.
--spec pending_jobs(non_neg_integer()) -> [#job{}].
-pending_jobs(0) ->
- % Handle this case as user could set max_churn to 0. If this is passed to
- % other function clause it will crash as gb_sets:largest assumes set is not
- % empty.
- [];
-
-pending_jobs(Count) when is_integer(Count), Count > 0 ->
- Set0 = gb_sets:new(), % [{LastStart, Job},...]
- Now = os:timestamp(),
- Acc0 = {Set0, Now, Count, health_threshold()},
- {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE),
- [Job || {_Started, Job} <- gb_sets:to_list(Set1)].
-
-
-pending_fold(Job, {Set, Now, Count, HealthThreshold}) ->
- Set1 = case {not_recently_crashed(Job, Now, HealthThreshold),
- gb_sets:size(Set) >= Count} of
- {true, true} ->
- % Job is healthy but already reached accumulated limit, so might
- % have to replace one of the accumulated jobs
- pending_maybe_replace(Job, Set);
- {true, false} ->
- % Job is healthy and we haven't reached the limit, so add job
- % to accumulator
- gb_sets:add_element({last_started(Job), Job}, Set);
- {false, _} ->
- % This job is not healthy (has crashed too recently), so skip it.
- Set
- end,
- {Set1, Now, Count, HealthThreshold}.
-
-
-% Replace Job in the accumulator if it is older than youngest job there.
-% "oldest" here means one which has been waiting to run the longest. "youngest"
-% means the one with most recent activity. The goal is to keep up to Count
-% oldest jobs during iteration. For example if there are jobs with these times
-% accumulated so far [5, 7, 11], and start time of current job is 6. Then
-% 6 < 11 is true, so 11 (youngest) is dropped and 6 inserted resulting in
-% [5, 6, 7]. In the end the result might look like [1, 2, 5], for example.
-pending_maybe_replace(Job, Set) ->
- Started = last_started(Job),
- {Youngest, YoungestJob} = gb_sets:largest(Set),
- case Started < Youngest of
- true ->
- Set1 = gb_sets:delete({Youngest, YoungestJob}, Set),
- gb_sets:add_element({Started, Job}, Set1);
- false ->
- Set
- end.
-
-
-start_jobs(Count, State) ->
- [start_job_int(Job, State) || Job <- pending_jobs(Count)],
- ok.
-
-
--spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer().
-stop_jobs(Count, _, _) when is_integer(Count), Count =< 0 ->
- 0;
-
-stop_jobs(Count, IsContinuous, State) when is_integer(Count) ->
- Running0 = running_jobs(),
- ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
- Running1 = lists:filter(ContinuousPred, Running0),
- Running2 = lists:sort(fun longest_running/2, Running1),
- Running3 = lists:sublist(Running2, Count),
- length([stop_job_int(Job, State) || Job <- Running3]).
-
-
-longest_running(#job{} = A, #job{} = B) ->
- last_started(A) =< last_started(B).
-
-
-not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
- case History of
- [{added, _When}] ->
- true;
- [{stopped, _When} | _] ->
- true;
- _ ->
- LatestCrashT = latest_crash_timestamp(History),
- CrashCount = consecutive_crashes(History, HealthThreshold),
- timer:now_diff(Now, LatestCrashT) >= backoff_micros(CrashCount)
- end.
-
-
-% Count consecutive crashes. A crash happens when there is a `crashed` event
-% within a short period of time (configurable) after any other event. It could
-% be `crashed, started` for jobs crashing quickly after starting, `crashed,
-% crashed`, `crashed, stopped` if job repeatedly failed to start
-% being stopped. Or it could be `crashed, added` if it crashed immediately after
-% being added during start.
-%
-% A streak of "consecutive crashes" ends when a crashed event is seen starting
-% and running successfully without crashing for a period of time. That period
-% of time is the HealthThreshold.
-%
-
--spec consecutive_crashes(history(), non_neg_integer()) -> non_neg_integer().
-consecutive_crashes(History, HealthThreshold) when is_list(History) ->
- consecutive_crashes(History, HealthThreshold, 0).
-
-
--spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) ->
- non_neg_integer().
-consecutive_crashes([], _HealthThreashold, Count) ->
- Count;
-
-consecutive_crashes([{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest],
- HealthThreshold, Count) ->
- case timer:now_diff(CrashT, PrevT) > HealthThreshold * 1000000 of
- true ->
- Count;
- false ->
- consecutive_crashes([PrevEvent | Rest], HealthThreshold, Count + 1)
- end;
-
-consecutive_crashes([{stopped, _}, {started, _} | _], _HealthThreshold,
- Count) ->
- Count;
-
-consecutive_crashes([_ | Rest], HealthThreshold, Count) ->
- consecutive_crashes(Rest, HealthThreshold, Count).
-
-
--spec latest_crash_timestamp(history()) -> erlang:timestamp().
-latest_crash_timestamp([]) ->
- {0, 0, 0}; % Used to avoid special-casing "no crash" when doing now_diff
-
-latest_crash_timestamp([{{crashed, _Reason}, When} | _]) ->
- When;
-
-latest_crash_timestamp([_Event | Rest]) ->
- latest_crash_timestamp(Rest).
-
-
--spec backoff_micros(non_neg_integer()) -> non_neg_integer().
-backoff_micros(CrashCount) ->
- % When calculating the backoff interval treat consecutive crash count as the
- % exponent in Base * 2 ^ CrashCount to achieve an exponential backoff
- % doubling every consecutive failure, starting with the base value of
- % ?BACKOFF_INTERVAL_MICROS.
- BackoffExp = erlang:min(CrashCount - 1, ?MAX_BACKOFF_EXPONENT),
- (1 bsl BackoffExp) * ?BACKOFF_INTERVAL_MICROS.
-
-
--spec add_job_int(#job{}) -> boolean().
-add_job_int(#job{} = Job) ->
- ets:insert_new(?MODULE, Job).
-
-
--spec maybe_remove_job_int(job_id(), #state{}) -> ok.
-maybe_remove_job_int(JobId, State) ->
- case job_by_id(JobId) of
- {ok, Job} ->
- ok = stop_job_int(Job, State),
- true = remove_job_int(Job),
- couch_stats:increment_counter([couch_replicator, jobs, removes]),
- TotalJobs = ets:info(?MODULE, size),
- couch_stats:update_gauge([couch_replicator, jobs, total],
- TotalJobs),
- update_running_jobs_stats(State#state.stats_pid),
- ok;
- {error, not_found} ->
- ok
- end.
-
-
-start_job_int(#job{pid = Pid}, _State) when Pid /= undefined ->
- ok;
-
-start_job_int(#job{} = Job0, State) ->
- Job = maybe_optimize_job_for_rate_limiting(Job0),
- case couch_replicator_scheduler_sup:start_child(Job#job.rep) of
- {ok, Child} ->
- Ref = monitor(process, Child),
- ok = update_state_started(Job, Child, Ref, State),
- couch_log:notice("~p: Job ~p started as ~p",
- [?MODULE, Job#job.id, Child]);
- {error, {already_started, OtherPid}} when node(OtherPid) =:= node() ->
- Ref = monitor(process, OtherPid),
- ok = update_state_started(Job, OtherPid, Ref, State),
- couch_log:notice("~p: Job ~p already running as ~p. Most likely"
- " because replicator scheduler was restarted",
- [?MODULE, Job#job.id, OtherPid]);
- {error, {already_started, OtherPid}} when node(OtherPid) =/= node() ->
- CrashMsg = "Duplicate replication running on another node",
- couch_log:notice("~p: Job ~p already running as ~p. Most likely"
- " because a duplicate replication is running on another node",
- [?MODULE, Job#job.id, OtherPid]),
- ok = update_state_crashed(Job, CrashMsg, State);
- {error, Reason} ->
- couch_log:notice("~p: Job ~p failed to start for reason ~p",
- [?MODULE, Job, Reason]),
- ok = update_state_crashed(Job, Reason, State)
- end.
-
-
--spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}.
-stop_job_int(#job{pid = undefined}, _State) ->
- ok;
-
-stop_job_int(#job{} = Job, State) ->
- ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid),
- demonitor(Job#job.monitor, [flush]),
- ok = update_state_stopped(Job, State),
- couch_log:notice("~p: Job ~p stopped as ~p",
- [?MODULE, Job#job.id, Job#job.pid]).
-
-
--spec remove_job_int(#job{}) -> true.
-remove_job_int(#job{} = Job) ->
- ets:delete(?MODULE, Job#job.id).
-
-
--spec running_job_count() -> non_neg_integer().
-running_job_count() ->
- ets:info(?MODULE, size) - pending_job_count().
-
-
--spec running_jobs() -> [#job{}].
-running_jobs() ->
- ets:select(?MODULE, [{#job{pid = '$1', _='_'}, [{is_pid, '$1'}], ['$_']}]).
-
-
--spec pending_job_count() -> non_neg_integer().
-pending_job_count() ->
- ets:select_count(?MODULE, [{#job{pid=undefined, _='_'}, [], [true]}]).
-
-
--spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}.
-job_by_pid(Pid) when is_pid(Pid) ->
- case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of
- [] ->
- {error, not_found};
- [#job{}=Job] ->
- {ok, Job}
- end.
-
-
--spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}.
-job_by_id(Id) ->
- case ets:lookup(?MODULE, Id) of
- [] ->
- {error, not_found};
- [#job{}=Job] ->
- {ok, Job}
- end.
-
-
--spec update_state_stopped(#job{}, #state{}) -> ok.
-update_state_stopped(Job, State) ->
- Job1 = reset_job_process(Job),
- Job2 = update_history(Job1, stopped, os:timestamp(), State),
- true = ets:insert(?MODULE, Job2),
- couch_stats:increment_counter([couch_replicator, jobs, stops]),
- ok.
-
-
--spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok.
-update_state_started(Job, Pid, Ref, State) ->
- Job1 = set_job_process(Job, Pid, Ref),
- Job2 = update_history(Job1, started, os:timestamp(), State),
- true = ets:insert(?MODULE, Job2),
- couch_stats:increment_counter([couch_replicator, jobs, starts]),
- ok.
-
-
--spec update_state_crashed(#job{}, any(), #state{}) -> ok.
-update_state_crashed(Job, Reason, State) ->
- Job1 = reset_job_process(Job),
- Job2 = update_history(Job1, {crashed, Reason}, os:timestamp(), State),
- true = ets:insert(?MODULE, Job2),
- couch_stats:increment_counter([couch_replicator, jobs, crashes]),
- ok.
-
-
--spec set_job_process(#job{}, pid(), reference()) -> #job{}.
-set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) ->
- Job#job{pid = Pid, monitor = Ref}.
-
-
--spec reset_job_process(#job{}) -> #job{}.
-reset_job_process(#job{} = Job) ->
- Job#job{pid = undefined, monitor = undefined}.
-
-
--spec reschedule(#state{}) -> ok.
-reschedule(State) ->
- StopCount = stop_excess_jobs(State, running_job_count()),
- rotate_jobs(State, StopCount),
- update_running_jobs_stats(State#state.stats_pid).
-
-
--spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer().
-stop_excess_jobs(State, Running) ->
- #state{max_jobs=MaxJobs} = State,
- StopCount = max(0, Running - MaxJobs),
- Stopped = stop_jobs(StopCount, true, State),
- OneshotLeft = StopCount - Stopped,
- stop_jobs(OneshotLeft, false, State),
- StopCount.
-
-
-start_pending_jobs(State) ->
- #state{max_jobs=MaxJobs} = State,
- Running = running_job_count(),
- Pending = pending_job_count(),
- if Running < MaxJobs, Pending > 0 ->
- start_jobs(MaxJobs - Running, State);
- true ->
- ok
- end.
-
-
--spec rotate_jobs(#state{}, non_neg_integer()) -> ok.
-rotate_jobs(State, ChurnSoFar) ->
- #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
- Running = running_job_count(),
- Pending = pending_job_count(),
- % Reduce MaxChurn by the number of already stopped jobs in the
- % current rescheduling cycle.
- Churn = max(0, MaxChurn - ChurnSoFar),
- SlotsAvailable = MaxJobs - Running,
- if SlotsAvailable >= 0 ->
- % If there is are enough SlotsAvailable reduce StopCount to avoid
- % unnesessarily stopping jobs. `stop_jobs/3` ignores 0 or negative
- % values so we don't worry about that here.
- StopCount = lists:min([Pending - SlotsAvailable, Running, Churn]),
- stop_jobs(StopCount, true, State),
- StartCount = max(0, MaxJobs - running_job_count()),
- start_jobs(StartCount, State);
- true ->
- ok
- end.
-
-
--spec last_started(#job{}) -> erlang:timestamp().
-last_started(#job{} = Job) ->
- case lists:keyfind(started, 1, Job#job.history) of
- false ->
- {0, 0, 0};
- {started, When} ->
- When
- end.
-
-
--spec update_history(#job{}, event_type(), erlang:timestamp(), #state{}) ->
- #job{}.
-update_history(Job, Type, When, State) ->
- History0 = [{Type, When} | Job#job.history],
- History1 = lists:sublist(History0, State#state.max_history),
- Job#job{history = History1}.
-
-
--spec ejson_url(#httpdb{} | binary()) -> binary().
-ejson_url(#httpdb{}=Httpdb) ->
- couch_util:url_strip_password(Httpdb#httpdb.url);
-ejson_url(DbName) when is_binary(DbName) ->
- DbName.
-
-
--spec job_ejson(#job{}) -> {[_ | _]}.
-job_ejson(Job) ->
- Rep = Job#job.rep,
- Source = ejson_url(Rep#rep.source),
- Target = ejson_url(Rep#rep.target),
- History = lists:map(fun({Type, When}) ->
- EventProps = case Type of
- {crashed, Reason} ->
- [{type, crashed}, {reason, crash_reason_json(Reason)}];
- Type ->
- [{type, Type}]
- end,
- {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]}
- end, Job#job.history),
- {BaseID, Ext} = Job#job.id,
- Pid = case Job#job.pid of
- undefined ->
- null;
- P when is_pid(P) ->
- ?l2b(pid_to_list(P))
- end,
- {[
- {id, iolist_to_binary([BaseID, Ext])},
- {pid, Pid},
- {source, iolist_to_binary(Source)},
- {target, iolist_to_binary(Target)},
- {database, Rep#rep.db_name},
- {user, (Rep#rep.user_ctx)#user_ctx.name},
- {doc_id, Rep#rep.doc_id},
- {info, couch_replicator_utils:ejson_state_info(Rep#rep.stats)},
- {history, History},
- {node, node()},
- {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
- ]}.
-
-
--spec jobs() -> [[tuple()]].
-jobs() ->
- ets:foldl(fun(Job, Acc) -> [job_ejson(Job) | Acc] end, [], ?MODULE).
-
-
--spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
-job(JobId) ->
- case job_by_id(JobId) of
- {ok, Job} ->
- {ok, job_ejson(Job)};
- Error ->
- Error
- end.
-
-
-crash_reason_json({_CrashType, Info}) when is_binary(Info) ->
- Info;
-crash_reason_json(Reason) when is_binary(Reason) ->
- Reason;
-crash_reason_json(Error) ->
- couch_replicator_utils:rep_error_to_binary(Error).
-
-
--spec last_updated([_]) -> binary().
-last_updated([{_Type, When} | _]) ->
- couch_replicator_utils:iso8601(When).
-
-
--spec is_continuous(#job{}) -> boolean().
-is_continuous(#job{rep = Rep}) ->
- couch_util:get_value(continuous, Rep#rep.options, false).
-
-
-% If job crashed last time because it was rate limited, try to
-% optimize some options to help the job make progress.
--spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
-maybe_optimize_job_for_rate_limiting(Job = #job{history =
- [{{crashed, max_backoff}, _} | _]}) ->
- Opts = [
- {checkpoint_interval, 5000},
- {worker_processes, 2},
- {worker_batch_size, 100},
- {http_connections, 5}
- ],
- Rep = lists:foldl(fun optimize_int_option/2, Job#job.rep, Opts),
- Job#job{rep = Rep};
-maybe_optimize_job_for_rate_limiting(Job) ->
- Job.
-
-
--spec optimize_int_option({atom(), any()}, #rep{}) -> #rep{}.
-optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
- case couch_util:get_value(Key, Options) of
- CurVal when is_integer(CurVal), CurVal > Val ->
- Msg = "~p replication ~p : setting ~p = ~p due to rate limiting",
- couch_log:warning(Msg, [?MODULE, Rep#rep.id, Key, Val]),
- Options1 = lists:keyreplace(Key, 1, Options, {Key, Val}),
- Rep#rep{options = Options1};
- _ ->
- Rep
- end.
-
-
-% Updater is a separate process. It receives `update_stats` messages and
-% updates scheduler stats from the scheduler jobs table. Updates are
-% performed no more frequently than once per ?STATS_UPDATE_WAIT milliseconds.
-
-update_running_jobs_stats(StatsPid) when is_pid(StatsPid) ->
- StatsPid ! update_stats,
- ok.
-
-
-start_stats_updater() ->
- erlang:spawn_link(?MODULE, stats_updater_loop, [undefined]).
-
-
-stats_updater_loop(Timer) ->
- receive
- update_stats when Timer == undefined ->
- TRef = erlang:send_after(?STATS_UPDATE_WAIT, self(), refresh_stats),
- ?MODULE:stats_updater_loop(TRef);
- update_stats when is_reference(Timer) ->
- ?MODULE:stats_updater_loop(Timer);
- refresh_stats ->
- ok = stats_updater_refresh(),
- ?MODULE:stats_updater_loop(undefined);
- Else ->
- erlang:exit({stats_updater_bad_msg, Else})
- end.
-
-
--spec stats_updater_refresh() -> ok.
-stats_updater_refresh() ->
- #stats_acc{
- pending_n = PendingN,
- running_n = RunningN,
- crashed_n = CrashedN
- } = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE),
- couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN),
- couch_stats:update_gauge([couch_replicator, jobs, running], RunningN),
- couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN),
- ok.
-
-
--spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
-stats_fold(#job{pid = undefined, history = [{added, _}]}, Acc) ->
- Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
-stats_fold(#job{pid = undefined, history = [{stopped, _} | _]}, Acc) ->
- Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
-stats_fold(#job{pid = undefined, history = [{{crashed, _}, _} | _]}, Acc) ->
- Acc#stats_acc{crashed_n =Acc#stats_acc.crashed_n + 1};
-stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) ->
- Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}.
-
-
--spec existing_replication(#rep{}) -> boolean().
-existing_replication(#rep{} = NewRep) ->
- case job_by_id(NewRep#rep.id) of
- {ok, #job{rep = CurRep}} ->
- NormCurRep = couch_replicator_utils:normalize_rep(CurRep),
- NormNewRep = couch_replicator_utils:normalize_rep(NewRep),
- NormCurRep == NormNewRep;
- {error, not_found} ->
- false
- end.
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-
-backoff_micros_test_() ->
- BaseInterval = ?BACKOFF_INTERVAL_MICROS,
- [?_assertEqual(R * BaseInterval, backoff_micros(N)) || {R, N} <- [
- {1, 1}, {2, 2}, {4, 3}, {8, 4}, {16, 5}, {32, 6}, {64, 7}, {128, 8},
- {256, 9}, {512, 10}, {1024, 11}, {1024, 12}
- ]].
-
-
-consecutive_crashes_test_() ->
- Threshold = ?DEFAULT_HEALTH_THRESHOLD_SEC,
- [?_assertEqual(R, consecutive_crashes(H, Threshold)) || {R, H} <- [
- {0, []},
- {0, [added()]},
- {0, [stopped()]},
- {0, [crashed()]},
- {1, [crashed(), added()]},
- {1, [crashed(), crashed()]},
- {1, [crashed(), stopped()]},
- {3, [crashed(), crashed(), crashed(), added()]},
- {2, [crashed(), crashed(), stopped()]},
- {1, [crashed(), started(), added()]},
- {2, [crashed(3), started(2), crashed(1), started(0)]},
- {0, [stopped(3), started(2), crashed(1), started(0)]},
- {1, [crashed(3), started(2), stopped(1), started(0)]},
- {0, [crashed(999), started(0)]},
- {1, [crashed(999), started(998), crashed(997), started(0)]}
- ]].
-
-
-consecutive_crashes_non_default_threshold_test_() ->
- [?_assertEqual(R, consecutive_crashes(H, T)) || {R, H, T} <- [
- {0, [crashed(11), started(0)], 10},
- {1, [crashed(10), started(0)], 10}
- ]].
-
-
-latest_crash_timestamp_test_() ->
- [?_assertEqual({0, R, 0}, latest_crash_timestamp(H)) || {R, H} <- [
- {0, [added()]},
- {1, [crashed(1)]},
- {3, [crashed(3), started(2), crashed(1), started(0)]},
- {1, [started(3), stopped(2), crashed(1), started(0)]}
- ]].
-
-
-last_started_test_() ->
- [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [
- {0, [added()]},
- {0, [crashed(1)]},
- {1, [started(1)]},
- {1, [added(), started(1)]},
- {2, [started(2), started(1)]},
- {2, [crashed(3), started(2), started(1)]}
- ]].
-
-
-longest_running_test() ->
- J0 = testjob([crashed()]),
- J1 = testjob([started(1)]),
- J2 = testjob([started(2)]),
- Sort = fun(Jobs) -> lists:sort(fun longest_running/2, Jobs) end,
- ?assertEqual([], Sort([])),
- ?assertEqual([J1], Sort([J1])),
- ?assertEqual([J1, J2], Sort([J2, J1])),
- ?assertEqual([J0, J1, J2], Sort([J2, J1, J0])).
-
-
-scheduler_test_() ->
- {
- setup,
- fun setup_all/0,
- fun teardown_all/1,
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_pending_jobs_simple(),
- t_pending_jobs_skip_crashed(),
- t_one_job_starts(),
- t_no_jobs_start_if_max_is_0(),
- t_one_job_starts_if_max_is_1(),
- t_max_churn_does_not_throttle_initial_start(),
- t_excess_oneshot_only_jobs(),
- t_excess_continuous_only_jobs(),
- t_excess_prefer_continuous_first(),
- t_stop_oldest_first(),
- t_start_oldest_first(),
- t_jobs_churn_even_if_not_all_max_jobs_are_running(),
- t_jobs_dont_churn_if_there_are_available_running_slots(),
- t_start_only_pending_jobs_do_not_churn_existing_ones(),
- t_dont_stop_if_nothing_pending(),
- t_max_churn_limits_number_of_rotated_jobs(),
- t_existing_jobs(),
- t_if_pending_less_than_running_start_all_pending(),
- t_running_less_than_pending_swap_all_running(),
- t_oneshot_dont_get_rotated(),
- t_rotate_continuous_only_if_mixed(),
- t_oneshot_dont_get_starting_priority(),
- t_oneshot_will_hog_the_scheduler(),
- t_if_excess_is_trimmed_rotation_still_happens(),
- t_if_transient_job_crashes_it_gets_removed(),
- t_if_permanent_job_crashes_it_stays_in_ets(),
- t_job_summary_running(),
- t_job_summary_pending(),
- t_job_summary_crashing_once(),
- t_job_summary_crashing_many_times(),
- t_job_summary_proxy_fields()
- ]
- }
- }.
-
-
-t_pending_jobs_simple() ->
- ?_test(begin
- Job1 = oneshot(1),
- Job2 = oneshot(2),
- setup_jobs([Job2, Job1]),
- ?assertEqual([], pending_jobs(0)),
- ?assertEqual([Job1], pending_jobs(1)),
- ?assertEqual([Job1, Job2], pending_jobs(2)),
- ?assertEqual([Job1, Job2], pending_jobs(3))
- end).
-
-
-t_pending_jobs_skip_crashed() ->
- ?_test(begin
- Job = oneshot(1),
- Ts = os:timestamp(),
- History = [crashed(Ts), started(Ts) | Job#job.history],
- Job1 = Job#job{history = History},
- Job2 = oneshot(2),
- Job3 = oneshot(3),
- setup_jobs([Job2, Job1, Job3]),
- ?assertEqual([Job2], pending_jobs(1)),
- ?assertEqual([Job2, Job3], pending_jobs(2)),
- ?assertEqual([Job2, Job3], pending_jobs(3))
- end).
-
-
-t_one_job_starts() ->
- ?_test(begin
- setup_jobs([oneshot(1)]),
- ?assertEqual({0, 1}, run_stop_count()),
- reschedule(mock_state(?DEFAULT_MAX_JOBS)),
- ?assertEqual({1, 0}, run_stop_count())
- end).
-
-
-t_no_jobs_start_if_max_is_0() ->
- ?_test(begin
- setup_jobs([oneshot(1)]),
- reschedule(mock_state(0)),
- ?assertEqual({0, 1}, run_stop_count())
- end).
-
-
-t_one_job_starts_if_max_is_1() ->
- ?_test(begin
- setup_jobs([oneshot(1), oneshot(2)]),
- reschedule(mock_state(1)),
- ?assertEqual({1, 1}, run_stop_count())
- end).
-
-
-t_max_churn_does_not_throttle_initial_start() ->
- ?_test(begin
- setup_jobs([oneshot(1), oneshot(2)]),
- reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)),
- ?assertEqual({2, 0}, run_stop_count())
- end).
-
-
-t_excess_oneshot_only_jobs() ->
- ?_test(begin
- setup_jobs([oneshot_running(1), oneshot_running(2)]),
- ?assertEqual({2, 0}, run_stop_count()),
- reschedule(mock_state(1)),
- ?assertEqual({1, 1}, run_stop_count()),
- reschedule(mock_state(0)),
- ?assertEqual({0, 2}, run_stop_count())
- end).
-
-
-t_excess_continuous_only_jobs() ->
- ?_test(begin
- setup_jobs([continuous_running(1), continuous_running(2)]),
- ?assertEqual({2, 0}, run_stop_count()),
- reschedule(mock_state(1)),
- ?assertEqual({1, 1}, run_stop_count()),
- reschedule(mock_state(0)),
- ?assertEqual({0, 2}, run_stop_count())
- end).
-
-
-t_excess_prefer_continuous_first() ->
- ?_test(begin
- Jobs = [
- continuous_running(1),
- oneshot_running(2),
- continuous_running(3)
- ],
- setup_jobs(Jobs),
- ?assertEqual({3, 0}, run_stop_count()),
- ?assertEqual({1, 0}, oneshot_run_stop_count()),
- reschedule(mock_state(2)),
- ?assertEqual({2, 1}, run_stop_count()),
- ?assertEqual({1, 0}, oneshot_run_stop_count()),
- reschedule(mock_state(1)),
- ?assertEqual({1, 0}, oneshot_run_stop_count()),
- reschedule(mock_state(0)),
- ?assertEqual({0, 1}, oneshot_run_stop_count())
- end).
-
-
-t_stop_oldest_first() ->
- ?_test(begin
- Jobs = [
- continuous_running(7),
- continuous_running(4),
- continuous_running(5)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(2, 1)),
- ?assertEqual({2, 1}, run_stop_count()),
- ?assertEqual([4], jobs_stopped()),
- reschedule(mock_state(1, 1)),
- ?assertEqual([7], jobs_running())
- end).
-
-
-t_start_oldest_first() ->
- ?_test(begin
- setup_jobs([continuous(7), continuous(2), continuous(5)]),
- reschedule(mock_state(1)),
- ?assertEqual({1, 2}, run_stop_count()),
- ?assertEqual([2], jobs_running()),
- reschedule(mock_state(2)),
- ?assertEqual({2, 1}, run_stop_count()),
- % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should
- % be running.
- ?assertEqual([2], jobs_stopped())
- end).
-
-
-t_jobs_churn_even_if_not_all_max_jobs_are_running() ->
- ?_test(begin
- setup_jobs([
- continuous_running(7),
- continuous(2),
- continuous(5)
- ]),
- reschedule(mock_state(2, 2)),
- ?assertEqual({2, 1}, run_stop_count()),
- ?assertEqual([7], jobs_stopped())
- end).
-
-
-t_jobs_dont_churn_if_there_are_available_running_slots() ->
- ?_test(begin
- setup_jobs([
- continuous_running(1),
- continuous_running(2)
- ]),
- reschedule(mock_state(2, 2)),
- ?assertEqual({2, 0}, run_stop_count()),
- ?assertEqual([], jobs_stopped()),
- ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1))
- end).
-
-
-t_start_only_pending_jobs_do_not_churn_existing_ones() ->
- ?_test(begin
- setup_jobs([
- continuous(1),
- continuous_running(2)
- ]),
- reschedule(mock_state(2, 2)),
- ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1)),
- ?assertEqual([], jobs_stopped()),
- ?assertEqual({2, 0}, run_stop_count())
- end).
-
-
-t_dont_stop_if_nothing_pending() ->
- ?_test(begin
- setup_jobs([continuous_running(1), continuous_running(2)]),
- reschedule(mock_state(2)),
- ?assertEqual({2, 0}, run_stop_count())
- end).
-
-
-t_max_churn_limits_number_of_rotated_jobs() ->
- ?_test(begin
- Jobs = [
- continuous(1),
- continuous_running(2),
- continuous(3),
- continuous_running(4)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(2, 1)),
- ?assertEqual([2, 3], jobs_stopped())
- end).
-
-
-t_if_pending_less_than_running_start_all_pending() ->
- ?_test(begin
- Jobs = [
- continuous(1),
- continuous_running(2),
- continuous(3),
- continuous_running(4),
- continuous_running(5)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(3)),
- ?assertEqual([1, 2, 5], jobs_running())
- end).
-
-
-t_running_less_than_pending_swap_all_running() ->
- ?_test(begin
- Jobs = [
- continuous(1),
- continuous(2),
- continuous(3),
- continuous_running(4),
- continuous_running(5)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(2)),
- ?assertEqual([3, 4, 5], jobs_stopped())
- end).
-
-
-t_oneshot_dont_get_rotated() ->
- ?_test(begin
- setup_jobs([oneshot_running(1), continuous(2)]),
- reschedule(mock_state(1)),
- ?assertEqual([1], jobs_running())
- end).
-
-
-t_rotate_continuous_only_if_mixed() ->
- ?_test(begin
- setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]),
- reschedule(mock_state(2)),
- ?assertEqual([1, 2], jobs_running())
- end).
-
-
-t_oneshot_dont_get_starting_priority() ->
- ?_test(begin
- setup_jobs([continuous(1), oneshot(2), continuous_running(3)]),
- reschedule(mock_state(1)),
- ?assertEqual([1], jobs_running())
- end).
-
-
-% This tested in other test cases, it is here to mainly make explicit a property
-% of one-shot replications -- they can starve other jobs if they "take control"
-% of all the available scheduler slots.
-t_oneshot_will_hog_the_scheduler() ->
- ?_test(begin
- Jobs = [
- oneshot_running(1),
- oneshot_running(2),
- oneshot(3),
- continuous(4)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(2)),
- ?assertEqual([1, 2], jobs_running())
- end).
-
-
-t_if_excess_is_trimmed_rotation_still_happens() ->
- ?_test(begin
- Jobs = [
- continuous(1),
- continuous_running(2),
- continuous_running(3)
- ],
- setup_jobs(Jobs),
- reschedule(mock_state(1)),
- ?assertEqual([1], jobs_running())
- end).
-
-
-t_if_transient_job_crashes_it_gets_removed() ->
- ?_test(begin
- Pid = mock_pid(),
- Job = #job{
- id = job1,
- pid = Pid,
- history = [added()],
- rep = #rep{db_name = null, options = [{continuous, true}]}
- },
- setup_jobs([Job]),
- ?assertEqual(1, ets:info(?MODULE, size)),
- State = #state{max_history = 3, stats_pid = self()},
- {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
- State),
- ?assertEqual(0, ets:info(?MODULE, size))
- end).
-
-
-t_if_permanent_job_crashes_it_stays_in_ets() ->
- ?_test(begin
- Pid = mock_pid(),
- Job = #job{
- id = job1,
- pid = Pid,
- history = [added()],
- rep = #rep{db_name = <<"db1">>, options = [{continuous, true}]}
- },
- setup_jobs([Job]),
- ?assertEqual(1, ets:info(?MODULE, size)),
- State = #state{max_jobs =1, max_history = 3, stats_pid = self()},
- {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
- State),
- ?assertEqual(1, ets:info(?MODULE, size)),
- [Job1] = ets:lookup(?MODULE, job1),
- [Latest | _] = Job1#job.history,
- ?assertMatch({{crashed, failed}, _}, Latest)
- end).
-
-
-t_existing_jobs() ->
- ?_test(begin
- Rep = #rep{
- id = job1,
- db_name = <<"db">>,
- source = <<"s">>,
- target = <<"t">>,
- options = [{continuous, true}]
- },
- setup_jobs([#job{id = Rep#rep.id, rep = Rep}]),
- NewRep = #rep{
- id = Rep#rep.id,
- db_name = <<"db">>,
- source = <<"s">>,
- target = <<"t">>,
- options = [{continuous, true}]
- },
- ?assert(existing_replication(NewRep)),
- ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})),
- ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})),
- ?assertNot(existing_replication(NewRep#rep{options = []}))
- end).
-
-
-t_job_summary_running() ->
- ?_test(begin
- Job = #job{
- id = job1,
- pid = mock_pid(),
- history = [added()],
- rep = #rep{
- db_name = <<"db1">>,
- source = <<"s">>,
- target = <<"t">>
- }
- },
- setup_jobs([Job]),
- Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(running, proplists:get_value(state, Summary)),
- ?assertEqual(null, proplists:get_value(info, Summary)),
- ?assertEqual(0, proplists:get_value(error_count, Summary)),
-
- Stats = [{source_seq, <<"1-abc">>}],
- handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
- Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual({Stats}, proplists:get_value(info, Summary1))
- end).
-
-
-t_job_summary_pending() ->
- ?_test(begin
- Job = #job{
- id = job1,
- pid = undefined,
- history = [stopped(20), started(10), added()],
- rep = #rep{source = <<"s">>, target = <<"t">>}
- },
- setup_jobs([Job]),
- Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(pending, proplists:get_value(state, Summary)),
- ?assertEqual(null, proplists:get_value(info, Summary)),
- ?assertEqual(0, proplists:get_value(error_count, Summary)),
-
- Stats = [{doc_write_failures, 1}],
- handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
- Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual({Stats}, proplists:get_value(info, Summary1))
- end).
-
-
-t_job_summary_crashing_once() ->
- ?_test(begin
- Job = #job{
- id = job1,
- history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)],
- rep = #rep{source = <<"s">>, target = <<"t">>}
- },
- setup_jobs([Job]),
- Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(crashing, proplists:get_value(state, Summary)),
- Info = proplists:get_value(info, Summary),
- ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info),
- ?assertEqual(0, proplists:get_value(error_count, Summary))
- end).
-
-
-t_job_summary_crashing_many_times() ->
- ?_test(begin
- Job = #job{
- id = job1,
- history = [crashed(4), started(3), crashed(2), started(1)],
- rep = #rep{source = <<"s">>, target = <<"t">>}
- },
- setup_jobs([Job]),
- Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(crashing, proplists:get_value(state, Summary)),
- Info = proplists:get_value(info, Summary),
- ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info),
- ?assertEqual(2, proplists:get_value(error_count, Summary))
- end).
-
-
-t_job_summary_proxy_fields() ->
- ?_test(begin
- Job = #job{
- id = job1,
- history = [started(10), added()],
- rep = #rep{
- source = #httpdb{
- url = "https://s",
- proxy_url = "http://u:p@sproxy:12"
- },
- target = #httpdb{
- url = "http://t",
- proxy_url = "socks5://u:p@tproxy:34"
- }
- }
- },
- setup_jobs([Job]),
- Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
- ?assertEqual(<<"http://u:*****@sproxy:12">>,
- proplists:get_value(source_proxy, Summary)),
- ?assertEqual(<<"socks5://u:*****@tproxy:34">>,
- proplists:get_value(target_proxy, Summary))
- end).
-
-
-% Test helper functions
-
-setup_all() ->
- catch ets:delete(?MODULE),
- meck:expect(couch_log, notice, 2, ok),
- meck:expect(couch_log, warning, 2, ok),
- meck:expect(couch_log, error, 2, ok),
- meck:expect(couch_replicator_scheduler_sup, terminate_child, 1, ok),
- meck:expect(couch_stats, increment_counter, 1, ok),
- meck:expect(couch_stats, update_gauge, 2, ok),
- Pid = mock_pid(),
- meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}).
-
-
-teardown_all(_) ->
- catch ets:delete(?MODULE),
- meck:unload().
-
-
-setup() ->
- meck:reset([
- couch_log,
- couch_replicator_scheduler_sup,
- couch_stats
- ]).
-
-
-teardown(_) ->
- ok.
-
-
-setup_jobs(Jobs) when is_list(Jobs) ->
- ?MODULE = ets:new(?MODULE, [named_table, {keypos, #job.id}]),
- ets:insert(?MODULE, Jobs).
-
-
-all_jobs() ->
- lists:usort(ets:tab2list(?MODULE)).
-
-
-jobs_stopped() ->
- [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined].
-
-
-jobs_running() ->
- [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined].
-
-
-run_stop_count() ->
- {length(jobs_running()), length(jobs_stopped())}.
-
-
-oneshot_run_stop_count() ->
- Running = [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined,
- not is_continuous(Job)],
- Stopped = [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined,
- not is_continuous(Job)],
- {length(Running), length(Stopped)}.
-
-
-mock_state(MaxJobs) ->
- #state{
- max_jobs = MaxJobs,
- max_churn = ?DEFAULT_MAX_CHURN,
- max_history = ?DEFAULT_MAX_HISTORY,
- stats_pid = self()
- }.
-
-mock_state(MaxJobs, MaxChurn) ->
- #state{
- max_jobs = MaxJobs,
- max_churn = MaxChurn,
- max_history = ?DEFAULT_MAX_HISTORY,
- stats_pid = self()
- }.
-
-
-continuous(Id) when is_integer(Id) ->
- Started = Id,
- Hist = [stopped(Started+1), started(Started), added()],
- #job{
- id = Id,
- history = Hist,
- rep = #rep{options = [{continuous, true}]}
- }.
-
-
-continuous_running(Id) when is_integer(Id) ->
- Started = Id,
- Pid = mock_pid(),
- #job{
- id = Id,
- history = [started(Started), added()],
- rep = #rep{options = [{continuous, true}]},
- pid = Pid,
- monitor = monitor(process, Pid)
- }.
-
-
-oneshot(Id) when is_integer(Id) ->
- Started = Id,
- Hist = [stopped(Started + 1), started(Started), added()],
- #job{id = Id, history = Hist, rep = #rep{options = []}}.
-
-
-oneshot_running(Id) when is_integer(Id) ->
- Started = Id,
- Pid = mock_pid(),
- #job{
- id = Id,
- history = [started(Started), added()],
- rep = #rep{options = []},
- pid = Pid,
- monitor = monitor(process, Pid)
- }.
-
-
-testjob(Hist) when is_list(Hist) ->
- #job{history = Hist}.
-
-
-mock_pid() ->
- list_to_pid("<0.999.999>").
-
-crashed() ->
- crashed(0).
-
-
-crashed(WhenSec) when is_integer(WhenSec)->
- {{crashed, some_reason}, {0, WhenSec, 0}};
-crashed({MSec, Sec, USec}) ->
- {{crashed, some_reason}, {MSec, Sec, USec}}.
-
-
-started() ->
- started(0).
-
-
-started(WhenSec) when is_integer(WhenSec)->
- {started, {0, WhenSec, 0}};
-
-started({MSec, Sec, USec}) ->
- {started, {MSec, Sec, USec}}.
-
-
-stopped() ->
- stopped(0).
-
-
-stopped(WhenSec) ->
- {stopped, {0, WhenSec, 0}}.
-
-
-added() ->
- {added, {0, 0, 0}}.
-
--endif.