author     Robert Newson <rnewson@apache.org>    2016-04-11 20:12:26 +0100
committer  Nick Vatamaniuc <vatamane@apache.org> 2017-04-28 17:35:50 -0400
commit     9718b97cb87ba239409d13e72c5f369927322e0e (patch)
tree       8d508b5fdebfd7778ed0fbf408162bb97da45d82
parent     350a67b3bc2372b4a88649768805deb896c86451 (diff)
download   couchdb-9718b97cb87ba239409d13e72c5f369927322e0e.tar.gz
Introduce couch_replicator_scheduler
The scheduling replicator can run a large number of replication jobs by scheduling them: it periodically stops some jobs and starts new ones. Jobs that fail are penalized with an exponential backoff. Jira: COUCHDB-3324
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler.erl      1430
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler.hrl        15
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler_job.erl   969
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler_sup.erl    62
4 files changed, 2476 insertions, 0 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
new file mode 100644
index 000000000..45d54b1a0
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -0,0 +1,1430 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler).
+
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_info/2,
+ handle_cast/2,
+ code_change/3,
+ format_status/2
+]).
+
+-export([
+ add_job/1,
+ remove_job/1,
+ reschedule/0,
+ rep_state/1,
+ find_jobs_by_dbname/1,
+ find_jobs_by_doc/2,
+ job_summary/2,
+ health_threshold/0,
+ jobs/0,
+ job/1
+]).
+
+%% config_listener callbacks
+-export([
+ handle_config_change/5,
+ handle_config_terminate/3
+]).
+
+%% for status updater process to allow hot code loading
+-export([
+ stats_updater_loop/1
+]).
+
+-include("couch_replicator_scheduler.hrl").
+-include("couch_replicator.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% types
+-type event_type() :: added | started | stopped | {crashed, any()}.
+-type event() :: {Type :: event_type(), When :: erlang:timestamp()}.
+-type history() :: nonempty_list(event()).
+
+%% definitions
+-define(MAX_BACKOFF_EXPONENT, 10).
+-define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).
+-define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
+-define(RELISTEN_DELAY, 5000).
+-define(STATS_UPDATE_WAIT, 5000).
+
+-define(DEFAULT_MAX_JOBS, 500).
+-define(DEFAULT_MAX_CHURN, 20).
+-define(DEFAULT_MAX_HISTORY, 20).
+-define(DEFAULT_SCHEDULER_INTERVAL, 60000).
+
+
+-record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}).
+-record(job, {
+ id :: job_id() | '$1' | '_',
+ rep :: #rep{} | '_',
+ pid :: undefined | pid() | '$1' | '_',
+ monitor :: undefined | reference() | '_',
+ history :: history() | '_'
+}).
+
+-record(stats_acc, {
+ pending_n = 0 :: non_neg_integer(),
+ running_n = 0 :: non_neg_integer(),
+ crashed_n = 0 :: non_neg_integer()
+}).
+
+
+%% public functions
+
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+-spec add_job(#rep{}) -> ok.
+add_job(#rep{} = Rep) when Rep#rep.id /= undefined ->
+ Job = #job{
+ id = Rep#rep.id,
+ rep = Rep,
+ history = [{added, os:timestamp()}]
+ },
+ gen_server:call(?MODULE, {add_job, Job}, infinity).
+
+
+-spec remove_job(job_id()) -> ok.
+remove_job(Id) ->
+ gen_server:call(?MODULE, {remove_job, Id}, infinity).
+
+
+-spec reschedule() -> ok.
+% Trigger a manual reschedule. Used for testing and/or ops.
+reschedule() ->
+ gen_server:call(?MODULE, reschedule, infinity).
+
+
+-spec rep_state(rep_id()) -> #rep{} | nil.
+rep_state(RepId) ->
+ case (catch ets:lookup_element(?MODULE, RepId, #job.rep)) of
+ {'EXIT',{badarg, _}} ->
+ nil;
+ Rep ->
+ Rep
+ end.
+
+
+-spec job_summary(job_id(), non_neg_integer()) -> [_] | nil.
+job_summary(JobId, HealthThreshold) ->
+ case job_by_id(JobId) of
+ {ok, #job{pid = Pid, history = History, rep = Rep}} ->
+ ErrorCount = consecutive_crashes(History, HealthThreshold),
+ {State, Info} = case {Pid, ErrorCount} of
+ {undefined, 0} ->
+ {pending, null};
+ {undefined, ErrorCount} when ErrorCount > 0 ->
+ [{{crashed, Error}, _When} | _] = History,
+ ErrMsg = couch_replicator_utils:rep_error_to_binary(Error),
+ {crashing, ErrMsg};
+ {Pid, ErrorCount} when is_pid(Pid) ->
+ {running, null}
+ end,
+ [
+ {source, iolist_to_binary(ejson_url(Rep#rep.source))},
+ {target, iolist_to_binary(ejson_url(Rep#rep.target))},
+ {state, State},
+ {info, Info},
+ {error_count, ErrorCount},
+ {last_updated, last_updated(History)},
+ {start_time,
+ couch_replicator_utils:iso8601(Rep#rep.start_time)},
+ {proxy, job_proxy_url(Rep#rep.source)}
+ ];
+ {error, not_found} ->
+ nil % Job might have just completed
+ end.
+
+
+job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) ->
+ list_to_binary(couch_util:url_strip_password(ProxyUrl));
+job_proxy_url(_Endpoint) ->
+ null.
+
+
+% Health threshold is the minimum amount of time an unhealthy job should run
+% without crashing before it is considered healthy again. HealthThreshold
+% should not be 0, as jobs could start and immediately crash, and it shouldn't
+% be infinity, since then consecutive crashes would accumulate forever even if
+% the job is back to normal.
+-spec health_threshold() -> non_neg_integer().
+health_threshold() ->
+ config:get_integer("replicator", "health_threshold",
+ ?DEFAULT_HEALTH_THRESHOLD_SEC).
+
+
+-spec find_jobs_by_dbname(binary()) -> list(job_id()).
+find_jobs_by_dbname(DbName) ->
+ Rep = #rep{db_name = DbName, _ = '_'},
+ MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
+ [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
+
+
+-spec find_jobs_by_doc(binary(), binary()) -> list(job_id()).
+find_jobs_by_doc(DbName, DocId) ->
+ Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'},
+ MatchSpec = #job{id = '$1', rep = Rep, _ = '_'},
+ [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)].
+
+
+%% gen_server functions
+
+init(_) ->
+ EtsOpts = [named_table, {read_concurrency, true}, {keypos, #job.id}],
+ ?MODULE = ets:new(?MODULE, EtsOpts),
+ ok = config:listen_for_changes(?MODULE, nil),
+ Interval = config:get_integer("replicator", "interval",
+ ?DEFAULT_SCHEDULER_INTERVAL),
+ MaxJobs = config:get_integer("replicator", "max_jobs", ?DEFAULT_MAX_JOBS),
+ MaxChurn = config:get_integer("replicator", "max_churn",
+ ?DEFAULT_MAX_CHURN),
+ MaxHistory = config:get_integer("replicator", "max_history",
+ ?DEFAULT_MAX_HISTORY),
+ Timer = erlang:send_after(Interval, self(), reschedule),
+ State = #state{
+ interval = Interval,
+ max_jobs = MaxJobs,
+ max_churn = MaxChurn,
+ max_history = MaxHistory,
+ timer = Timer,
+ stats_pid = start_stats_updater()
+ },
+ {ok, State}.
+
+
+handle_call({add_job, Job}, _From, State) ->
+ ok = maybe_remove_job_int(Job#job.id, State),
+ true = add_job_int(Job),
+ ok = maybe_start_newly_added_job(Job, State),
+ couch_stats:increment_counter([couch_replicator, jobs, adds]),
+ TotalJobs = ets:info(?MODULE, size),
+ couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs),
+ {reply, ok, State};
+
+handle_call({remove_job, Id}, _From, State) ->
+ ok = maybe_remove_job_int(Id, State),
+ {reply, ok, State};
+
+handle_call(reschedule, _From, State) ->
+ ok = reschedule(State),
+ {reply, ok, State};
+
+handle_call(_, _From, State) ->
+ {noreply, State}.
+
+
+handle_cast({set_max_jobs, MaxJobs}, State) when is_integer(MaxJobs),
+ MaxJobs >= 0 ->
+ couch_log:notice("~p: max_jobs set to ~B", [?MODULE, MaxJobs]),
+ {noreply, State#state{max_jobs = MaxJobs}};
+
+handle_cast({set_max_churn, MaxChurn}, State) when is_integer(MaxChurn),
+ MaxChurn > 0 ->
+ couch_log:notice("~p: max_churn set to ~B", [?MODULE, MaxChurn]),
+ {noreply, State#state{max_churn = MaxChurn}};
+
+handle_cast({set_max_history, MaxHistory}, State) when is_integer(MaxHistory),
+ MaxHistory > 0 ->
+ couch_log:notice("~p: max_history set to ~B", [?MODULE, MaxHistory]),
+ {noreply, State#state{max_history = MaxHistory}};
+
+handle_cast({set_interval, Interval}, State) when is_integer(Interval),
+ Interval > 0 ->
+ couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]),
+ {noreply, State#state{interval = Interval}};
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+
+handle_info(reschedule, State) ->
+ ok = reschedule(State),
+ erlang:cancel_timer(State#state.timer),
+ Timer = erlang:send_after(State#state.interval, self(), reschedule),
+ {noreply, State#state{timer = Timer}};
+
+handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
+ {ok, Job} = job_by_pid(Pid),
+ couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
+ remove_job_int(Job),
+ update_running_jobs_stats(State#state.stats_pid),
+ {noreply, State};
+
+handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+ {ok, Job} = job_by_pid(Pid),
+ ok = handle_crashed_job(Job, Reason, State),
+ {noreply, State};
+
+handle_info(restart_config_listener, State) ->
+ ok = config:listen_for_changes(?MODULE, nil),
+ {noreply, State};
+
+handle_info(_, State) ->
+ {noreply, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+
+format_status(_Opt, [_PDict, State]) ->
+ [
+ {max_jobs, State#state.max_jobs},
+ {running_jobs, running_job_count()},
+ {pending_jobs, pending_job_count()}
+ ].
+
+
+%% config listener functions
+
+handle_config_change("replicator", "max_jobs", V, _, S) ->
+ ok = gen_server:cast(?MODULE, {set_max_jobs, list_to_integer(V)}),
+ {ok, S};
+
+handle_config_change("replicator", "max_churn", V, _, S) ->
+ ok = gen_server:cast(?MODULE, {set_max_churn, list_to_integer(V)}),
+ {ok, S};
+
+handle_config_change("replicator", "interval", V, _, S) ->
+ ok = gen_server:cast(?MODULE, {set_interval, list_to_integer(V)}),
+ {ok, S};
+
+handle_config_change("replicator", "max_history", V, _, S) ->
+ ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}),
+ {ok, S};
+
+handle_config_change(_, _, _, _, S) ->
+ {ok, S}.
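+% Note: these settings can also be changed at runtime. As a sketch (assuming
+% the usual config API), running config:set("replicator", "max_jobs", "1000")
+% from a remsh would invoke the matching clause above, which casts the new
+% value to the scheduler.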
+
+
+handle_config_terminate(_, stop, _) ->
+ ok;
+
+handle_config_terminate(_, _, _) ->
+ Pid = whereis(?MODULE),
+ erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener).
+
+
+%% Private functions
+
+% Handle crashed jobs. Handling differs between transient and permanent jobs.
+% Transient jobs are those posted to the _replicate endpoint. They don't have
+% a db associated with them. When those jobs crash, they are not restarted.
+% That is also consistent with the behavior when the node they run on crashes:
+% they do not migrate to other nodes. Permanent jobs are those created from
+% replicator documents. Those jobs, once they pass basic validation and end up
+% in the scheduler, will be retried indefinitely (with appropriate exponential
+% backoffs).
+-spec handle_crashed_job(#job{}, any(), #state{}) -> ok.
+handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, State) ->
+ Msg = "~p : Transient job ~p failed, removing. Error: ~p",
+ ErrorBinary = couch_replicator_utils:rep_error_to_binary(Reason),
+ couch_log:error(Msg, [?MODULE, Job#job.id, ErrorBinary]),
+ remove_job_int(Job),
+ update_running_jobs_stats(State#state.stats_pid),
+ ok;
+
+handle_crashed_job(Job, Reason, State) ->
+ ok = update_state_crashed(Job, Reason, State),
+ case couch_replicator_doc_processor:update_docs() of
+ true ->
+ couch_replicator_docs:update_error(Job#job.rep, Reason);
+ false ->
+ ok
+ end,
+ case ets:info(?MODULE, size) < State#state.max_jobs of
+ true ->
+ % Starting pending jobs is an O(TotalJobsCount) operation. Only do
+ % it if there is a relatively small number of jobs. Otherwise the
+ % scheduler could be blocked by a cascade of many jobs failing in
+ % a row.
+ start_pending_jobs(State),
+ update_running_jobs_stats(State#state.stats_pid),
+ ok;
+ false ->
+ ok
+ end.
+
+
+% Attempt to start a newly added job. First quickly check if total jobs
+% already exceed max jobs, then do a more expensive check which runs a
+% select (an O(n) operation) to check pending jobs specifically.
+-spec maybe_start_newly_added_job(#job{}, #state{}) -> ok.
+maybe_start_newly_added_job(Job, State) ->
+ MaxJobs = State#state.max_jobs,
+ TotalJobs = ets:info(?MODULE, size),
+ case TotalJobs < MaxJobs andalso running_job_count() < MaxJobs of
+ true ->
+ start_job_int(Job, State),
+ update_running_jobs_stats(State#state.stats_pid),
+ ok;
+ false ->
+ ok
+ end.
+
+
+% Return up to a given number of the oldest jobs that have not crashed
+% recently. To stay memory efficient, use ets:foldl to accumulate the jobs.
+-spec pending_jobs(non_neg_integer()) -> [#job{}].
+pending_jobs(0) ->
+ % Handle this case as user could set max_churn to 0. If this is passed to
+ % other function clause it will crash as gb_sets:largest assumes set is not
+ % empty.
+ [];
+
+pending_jobs(Count) when is_integer(Count), Count > 0 ->
+ Set0 = gb_sets:new(), % [{LastStart, Job},...]
+ Now = os:timestamp(),
+ Acc0 = {Set0, Now, Count, health_threshold()},
+ {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE),
+ [Job || {_Started, Job} <- gb_sets:to_list(Set1)].
+
+
+pending_fold(Job, {Set, Now, Count, HealthThreshold}) ->
+ Set1 = case {not_recently_crashed(Job, Now, HealthThreshold),
+ gb_sets:size(Set) >= Count} of
+ {true, true} ->
+ % Job is healthy but already reached accumulated limit, so might
+ % have to replace one of the accumulated jobs
+ pending_maybe_replace(Job, Set);
+ {true, false} ->
+ % Job is healthy and we haven't reached the limit, so add job
+ % to accumulator
+ gb_sets:add_element({last_started(Job), Job}, Set);
+ {false, _} ->
+ % This job is not healthy (has crashed too recently), so skip it.
+ Set
+ end,
+ {Set1, Now, Count, HealthThreshold}.
+
+
+% Replace Job in the accumulator if it is older than the youngest job there.
+% "Oldest" here means the one which has been waiting to run the longest;
+% "youngest" means the one with the most recent activity. The goal is to keep
+% up to Count of the oldest jobs during iteration. For example, if the jobs
+% accumulated so far have start times [5, 7, 11] and the start time of the
+% current job is 6, then 6 < 11 is true, so 11 (the youngest) is dropped and 6
+% is inserted, resulting in [5, 6, 7]. In the end the result might look like
+% [1, 2, 5], for example.
+pending_maybe_replace(Job, Set) ->
+ Started = last_started(Job),
+ {Youngest, YoungestJob} = gb_sets:largest(Set),
+ case Started < Youngest of
+ true ->
+ Set1 = gb_sets:delete({Youngest, YoungestJob}, Set),
+ gb_sets:add_element({Started, Job}, Set1);
+ false ->
+ Set
+ end.
+
+
+start_jobs(Count, State) ->
+ [start_job_int(Job, State) || Job <- pending_jobs(Count)],
+ ok.
+
+
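+% Stop up to Count running jobs of the given kind (continuous or not),
+% longest-running first, and return the number of jobs actually stopped.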
+-spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer().
+stop_jobs(Count, IsContinuous, State) ->
+ Running0 = running_jobs(),
+ ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end,
+ Running1 = lists:filter(ContinuousPred, Running0),
+ Running2 = lists:sort(fun longest_running/2, Running1),
+ Running3 = lists:sublist(Running2, Count),
+ length([stop_job_int(Job, State) || Job <- Running3]).
+
+
+longest_running(#job{} = A, #job{} = B) ->
+ last_started(A) =< last_started(B).
+
+
+not_recently_crashed(#job{history = History}, Now, HealthThreshold) ->
+ case History of
+ [{added, _When}] ->
+ true;
+ [{stopped, _When} | _] ->
+ true;
+ _ ->
+ LatestCrashT = latest_crash_timestamp(History),
+ CrashCount = consecutive_crashes(History, HealthThreshold),
+ timer:now_diff(Now, LatestCrashT) >= backoff_micros(CrashCount)
+ end.
+
+
+% Count consecutive crashes. A crash happens when there is a `crashed` event
+% within a short period of time (configurable) after any other event. It could
+% be `crashed, started` for jobs crashing quickly after starting, `crashed,
+% crashed` for repeated crashes, `crashed, stopped` if the job crashed soon
+% after being stopped, or `crashed, added` if it crashed immediately after
+% being added during start.
+%
+% A streak of "consecutive crashes" ends when the job is seen starting and
+% then running successfully, without crashing, for a period of time. That
+% period of time is the HealthThreshold.
+%
+
+-spec consecutive_crashes(history(), non_neg_integer()) -> non_neg_integer().
+consecutive_crashes(History, HealthThreshold) when is_list(History) ->
+ consecutive_crashes(History, HealthThreshold, 0).
+
+
+-spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) ->
+ non_neg_integer().
+consecutive_crashes([], _HealthThreashold, Count) ->
+ Count;
+
+consecutive_crashes([{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest],
+ HealthThreshold, Count) ->
+ case timer:now_diff(CrashT, PrevT) > HealthThreshold * 1000000 of
+ true ->
+ Count;
+ false ->
+ consecutive_crashes([PrevEvent | Rest], HealthThreshold, Count + 1)
+ end;
+
+consecutive_crashes([{stopped, _}, {started, _} | _], _HealthThreshold,
+ Count) ->
+ Count;
+
+consecutive_crashes([_ | Rest], HealthThreshold, Count) ->
+ consecutive_crashes(Rest, HealthThreshold, Count).
+
+
+-spec latest_crash_timestamp(history()) -> erlang:timestamp().
+latest_crash_timestamp([]) ->
+ {0, 0, 0}; % Used to avoid special-casing "no crash" when doing now_diff
+
+latest_crash_timestamp([{{crashed, _Reason}, When} | _]) ->
+ When;
+
+latest_crash_timestamp([_Event | Rest]) ->
+ latest_crash_timestamp(Rest).
+
+
+-spec backoff_micros(non_neg_integer()) -> non_neg_integer().
+backoff_micros(CrashCount) ->
+ % When calculating the backoff interval, treat the consecutive crash count
+ % as the exponent in Base * 2 ^ (CrashCount - 1), capped at
+ % ?MAX_BACKOFF_EXPONENT, to get an exponential backoff that doubles with
+ % every consecutive failure, starting from the base value of
+ % ?BACKOFF_INTERVAL_MICROS.
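+ % For example, with the 30 second base interval defined above, crash
+ % counts of 1, 2, 3, ... yield backoffs of 30s, 60s, 120s, ... capped at
+ % 2^10 * 30s (roughly 8.5 hours) once the maximum exponent is reached.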
+ BackoffExp = erlang:min(CrashCount - 1, ?MAX_BACKOFF_EXPONENT),
+ (1 bsl BackoffExp) * ?BACKOFF_INTERVAL_MICROS.
+
+
+-spec add_job_int(#job{}) -> boolean().
+add_job_int(#job{} = Job) ->
+ ets:insert_new(?MODULE, Job).
+
+
+-spec maybe_remove_job_int(job_id(), #state{}) -> ok.
+maybe_remove_job_int(JobId, State) ->
+ case job_by_id(JobId) of
+ {ok, Job} ->
+ ok = stop_job_int(Job, State),
+ true = remove_job_int(Job),
+ couch_stats:increment_counter([couch_replicator, jobs, removes]),
+ TotalJobs = ets:info(?MODULE, size),
+ couch_stats:update_gauge([couch_replicator, jobs, total],
+ TotalJobs),
+ update_running_jobs_stats(State#state.stats_pid),
+ ok;
+ {error, not_found} ->
+ ok
+ end.
+
+
+start_job_int(#job{pid = Pid}, _State) when Pid /= undefined ->
+ ok;
+
+start_job_int(#job{} = Job0, State) ->
+ Job = maybe_optimize_job_for_rate_limiting(Job0),
+ case couch_replicator_scheduler_sup:start_child(Job#job.rep) of
+ {ok, Child} ->
+ Ref = monitor(process, Child),
+ ok = update_state_started(Job, Child, Ref, State),
+ couch_log:notice("~p: Job ~p started as ~p",
+ [?MODULE, Job#job.id, Child]);
+ {error, {already_started, OtherPid}} when node(OtherPid) =:= node() ->
+ Ref = monitor(process, OtherPid),
+ ok = update_state_started(Job, OtherPid, Ref, State),
+ couch_log:notice("~p: Job ~p already running as ~p. Most likely"
+ " because replicator scheduler was restarted",
+ [?MODULE, Job#job.id, OtherPid]);
+ {error, {already_started, OtherPid}} when node(OtherPid) =/= node() ->
+ CrashMsg = "Duplicate replication running on another node",
+ couch_log:notice("~p: Job ~p already running as ~p. Most likely"
+ " because a duplicate replication is running on another node",
+ [?MODULE, Job#job.id, OtherPid]),
+ ok = update_state_crashed(Job, CrashMsg, State);
+ {error, Reason} ->
+ couch_log:notice("~p: Job ~p failed to start for reason ~p",
+ [?MODULE, Job, Reason]),
+ ok = update_state_crashed(Job, Reason, State)
+ end.
+
+
+-spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}.
+stop_job_int(#job{pid = undefined}, _State) ->
+ ok;
+
+stop_job_int(#job{} = Job, State) ->
+ ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid),
+ demonitor(Job#job.monitor, [flush]),
+ ok = update_state_stopped(Job, State),
+ couch_log:notice("~p: Job ~p stopped as ~p",
+ [?MODULE, Job#job.id, Job#job.pid]).
+
+
+-spec remove_job_int(#job{}) -> true.
+remove_job_int(#job{} = Job) ->
+ ets:delete(?MODULE, Job#job.id).
+
+
+-spec running_job_count() -> non_neg_integer().
+running_job_count() ->
+ ets:info(?MODULE, size) - pending_job_count().
+
+
+-spec running_jobs() -> [#job{}].
+running_jobs() ->
+ ets:select(?MODULE, [{#job{pid = '$1', _='_'}, [{is_pid, '$1'}], ['$_']}]).
+
+
+-spec pending_job_count() -> non_neg_integer().
+pending_job_count() ->
+ ets:select_count(?MODULE, [{#job{pid=undefined, _='_'}, [], [true]}]).
+
+
+-spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}.
+job_by_pid(Pid) when is_pid(Pid) ->
+ case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of
+ [] ->
+ {error, not_found};
+ [#job{}=Job] ->
+ {ok, Job}
+ end.
+
+
+-spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}.
+job_by_id(Id) ->
+ case ets:lookup(?MODULE, Id) of
+ [] ->
+ {error, not_found};
+ [#job{}=Job] ->
+ {ok, Job}
+ end.
+
+
+-spec update_state_stopped(#job{}, #state{}) -> ok.
+update_state_stopped(Job, State) ->
+ Job1 = reset_job_process(Job),
+ Job2 = update_history(Job1, stopped, os:timestamp(), State),
+ true = ets:insert(?MODULE, Job2),
+ couch_stats:increment_counter([couch_replicator, jobs, stops]),
+ ok.
+
+
+-spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok.
+update_state_started(Job, Pid, Ref, State) ->
+ Job1 = set_job_process(Job, Pid, Ref),
+ Job2 = update_history(Job1, started, os:timestamp(), State),
+ true = ets:insert(?MODULE, Job2),
+ couch_stats:increment_counter([couch_replicator, jobs, starts]),
+ ok.
+
+
+-spec update_state_crashed(#job{}, any(), #state{}) -> ok.
+update_state_crashed(Job, Reason, State) ->
+ Job1 = reset_job_process(Job),
+ Job2 = update_history(Job1, {crashed, Reason}, os:timestamp(), State),
+ true = ets:insert(?MODULE, Job2),
+ couch_stats:increment_counter([couch_replicator, jobs, crashes]),
+ ok.
+
+
+-spec set_job_process(#job{}, pid(), reference()) -> #job{}.
+set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) ->
+ Job#job{pid = Pid, monitor = Ref}.
+
+
+-spec reset_job_process(#job{}) -> #job{}.
+reset_job_process(#job{} = Job) ->
+ Job#job{pid = undefined, monitor = undefined}.
+
+
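+% Run one scheduling cycle: stop any excess jobs above max_jobs, fill free
+% slots with pending jobs, then rotate running and pending jobs (bounded by
+% max_churn) and refresh the running job stats.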
+-spec reschedule(#state{}) -> ok.
+reschedule(State) ->
+ Running = running_job_count(),
+ Pending = pending_job_count(),
+ stop_excess_jobs(State, Running),
+ start_pending_jobs(State, Running, Pending),
+ rotate_jobs(State, Running, Pending),
+ update_running_jobs_stats(State#state.stats_pid),
+ ok.
+
+
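+% If more jobs are running than max_jobs allows, stop the excess, preferring
+% to stop continuous jobs first and one-shot jobs only if that is not enough.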
+-spec stop_excess_jobs(#state{}, non_neg_integer()) -> ok.
+stop_excess_jobs(State, Running) ->
+ #state{max_jobs=MaxJobs} = State,
+ StopCount = Running - MaxJobs,
+ if StopCount =< 0 -> ok; true ->
+ Stopped = stop_jobs(StopCount, true, State),
+ OneshotLeft = StopCount - Stopped,
+ if OneshotLeft =< 0 -> ok; true ->
+ stop_jobs(OneshotLeft, false, State),
+ ok
+ end
+ end.
+
+
+start_pending_jobs(State) ->
+ start_pending_jobs(State, running_job_count(), pending_job_count()).
+
+
+start_pending_jobs(State, Running, Pending) ->
+ #state{max_jobs=MaxJobs} = State,
+ if Running < MaxJobs, Pending > 0 ->
+ start_jobs(MaxJobs - Running, State);
+ true ->
+ ok
+ end.
+
+
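+% Rotate jobs only when all slots are taken (Running == MaxJobs) and there are
+% pending jobs: stop up to min(Pending, Running, MaxChurn) of the longest
+% running continuous jobs, then start the same number of the oldest pending
+% jobs in their place.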
+-spec rotate_jobs(#state{}, non_neg_integer(), non_neg_integer()) -> ok.
+rotate_jobs(State, Running, Pending) ->
+ #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State,
+ if Running == MaxJobs, Pending > 0 ->
+ RotateCount = lists:min([Pending, Running, MaxChurn]),
+ StopCount = stop_jobs(RotateCount, true, State),
+ start_jobs(StopCount, State);
+ true ->
+ ok
+ end.
+
+
+-spec last_started(#job{}) -> erlang:timestamp().
+last_started(#job{} = Job) ->
+ case lists:keyfind(started, 1, Job#job.history) of
+ false ->
+ {0, 0, 0};
+ {started, When} ->
+ When
+ end.
+
+
+-spec update_history(#job{}, event_type(), erlang:timestamp(), #state{}) ->
+ #job{}.
+update_history(Job, Type, When, State) ->
+ History0 = [{Type, When} | Job#job.history],
+ History1 = lists:sublist(History0, State#state.max_history),
+ Job#job{history = History1}.
+
+
+-spec ejson_url(#httpdb{} | binary()) -> binary().
+ejson_url(#httpdb{}=Httpdb) ->
+ couch_util:url_strip_password(Httpdb#httpdb.url);
+ejson_url(DbName) when is_binary(DbName) ->
+ DbName.
+
+
+-spec job_ejson(#job{}) -> {[_ | _]}.
+job_ejson(Job) ->
+ Rep = Job#job.rep,
+ Source = ejson_url(Rep#rep.source),
+ Target = ejson_url(Rep#rep.target),
+ History = lists:map(fun({Type, When}) ->
+ EventProps = case Type of
+ {crashed, Reason} ->
+ [{type, crashed}, {reason, crash_reason_json(Reason)}];
+ Type ->
+ [{type, Type}]
+ end,
+ {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]}
+ end, Job#job.history),
+ {BaseID, Ext} = Job#job.id,
+ Pid = case Job#job.pid of
+ undefined ->
+ null;
+ P when is_pid(P) ->
+ ?l2b(pid_to_list(P))
+ end,
+ {[
+ {id, iolist_to_binary([BaseID, Ext])},
+ {pid, Pid},
+ {source, iolist_to_binary(Source)},
+ {target, iolist_to_binary(Target)},
+ {database, Rep#rep.db_name},
+ {user, (Rep#rep.user_ctx)#user_ctx.name},
+ {doc_id, Rep#rep.doc_id},
+ {history, History},
+ {node, node()},
+ {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
+ ]}.
+
+
+-spec jobs() -> [[tuple()]].
+jobs() ->
+ ets:foldl(fun(Job, Acc) -> [job_ejson(Job) | Acc] end, [], ?MODULE).
+
+
+-spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}.
+job(JobId) ->
+ case job_by_id(JobId) of
+ {ok, Job} ->
+ {ok, job_ejson(Job)};
+ Error ->
+ Error
+ end.
+
+
+crash_reason_json({_CrashType, Info}) when is_binary(Info) ->
+ Info;
+crash_reason_json(Reason) when is_binary(Reason) ->
+ Reason;
+crash_reason_json(Error) ->
+ couch_replicator_utils:rep_error_to_binary(Error).
+
+
+-spec last_updated([_]) -> binary().
+last_updated([{_Type, When} | _]) ->
+ couch_replicator_utils:iso8601(When).
+
+
+-spec is_continuous(#job{}) -> boolean().
+is_continuous(#job{rep = Rep}) ->
+ couch_util:get_value(continuous, Rep#rep.options, false).
+
+
+% If the job crashed last time because it was rate limited, lower some of its
+% options (workers, batch size, connections, checkpoint interval) to help it
+% make progress.
+-spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
+maybe_optimize_job_for_rate_limiting(Job = #job{history =
+ [{{crashed, {shutdown, max_backoff}}, _} | _]}) ->
+ Opts = [
+ {checkpoint_interval, 5000},
+ {worker_processes, 2},
+ {worker_batch_size, 100},
+ {http_connections, 5}
+ ],
+ Rep = lists:foldl(fun optimize_int_option/2, Job#job.rep, Opts),
+ Job#job{rep = Rep};
+maybe_optimize_job_for_rate_limiting(Job) ->
+ Job.
+
+
+-spec optimize_int_option({atom(), any()}, #rep{}) -> #rep{}.
+optimize_int_option({Key, Val}, #rep{options = Options} = Rep) ->
+ case couch_util:get_value(Key, Options) of
+ CurVal when is_integer(CurVal), CurVal > Val ->
+ Msg = "~p replication ~p : setting ~p = ~p due to rate limiting",
+ couch_log:warning(Msg, [?MODULE, Rep#rep.id, Key, Val]),
+ Options1 = lists:keyreplace(Key, 1, Options, {Key, Val}),
+ Rep#rep{options = Options1};
+ _ ->
+ Rep
+ end.
+
+
+% Updater is a separate process. It receives `update_stats` messages and
+% updates scheduler stats from the scheduler jobs table. Updates are
+% performed no more frequently than once per ?STATS_UPDATE_WAIT milliseconds.
+
+update_running_jobs_stats(StatsPid) when is_pid(StatsPid) ->
+ StatsPid ! update_stats,
+ ok.
+
+
+start_stats_updater() ->
+ erlang:spawn_link(?MODULE, stats_updater_loop, [undefined]).
+
+
+stats_updater_loop(Timer) ->
+ receive
+ update_stats when Timer == undefined ->
+ TRef = erlang:send_after(?STATS_UPDATE_WAIT, self(), refresh_stats),
+ ?MODULE:stats_updater_loop(TRef);
+ update_stats when is_reference(Timer) ->
+ ?MODULE:stats_updater_loop(Timer);
+ refresh_stats ->
+ ok = stats_updater_refresh(),
+ ?MODULE:stats_updater_loop(undefined);
+ Else ->
+ erlang:exit({stats_updater_bad_msg, Else})
+ end.
+
+
+-spec stats_updater_refresh() -> ok.
+stats_updater_refresh() ->
+ #stats_acc{
+ pending_n = PendingN,
+ running_n = RunningN,
+ crashed_n = CrashedN
+ } = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE),
+ couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN),
+ couch_stats:update_gauge([couch_replicator, jobs, running], RunningN),
+ couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN),
+ ok.
+
+
+-spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}.
+stats_fold(#job{pid = undefined, history = [{added, _}]}, Acc) ->
+ Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
+stats_fold(#job{pid = undefined, history = [{stopped, _} | _]}, Acc) ->
+ Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1};
+stats_fold(#job{pid = undefined, history = [{{crashed, _}, _} | _]}, Acc) ->
+ Acc#stats_acc{crashed_n = Acc#stats_acc.crashed_n + 1};
+stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) ->
+ Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}.
+
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+backoff_micros_test_() ->
+ BaseInterval = ?BACKOFF_INTERVAL_MICROS,
+ [?_assertEqual(R * BaseInterval, backoff_micros(N)) || {R, N} <- [
+ {1, 1}, {2, 2}, {4, 3}, {8, 4}, {16, 5}, {32, 6}, {64, 7}, {128, 8},
+ {256, 9}, {512, 10}, {1024, 11}, {1024, 12}
+ ]].
+
+
+consecutive_crashes_test_() ->
+ Threshold = ?DEFAULT_HEALTH_THRESHOLD_SEC,
+ [?_assertEqual(R, consecutive_crashes(H, Threshold)) || {R, H} <- [
+ {0, []},
+ {0, [added()]},
+ {0, [stopped()]},
+ {0, [crashed()]},
+ {1, [crashed(), added()]},
+ {1, [crashed(), crashed()]},
+ {1, [crashed(), stopped()]},
+ {3, [crashed(), crashed(), crashed(), added()]},
+ {2, [crashed(), crashed(), stopped()]},
+ {1, [crashed(), started(), added()]},
+ {2, [crashed(3), started(2), crashed(1), started(0)]},
+ {0, [stopped(3), started(2), crashed(1), started(0)]},
+ {1, [crashed(3), started(2), stopped(1), started(0)]},
+ {0, [crashed(999), started(0)]},
+ {1, [crashed(999), started(998), crashed(997), started(0)]}
+ ]].
+
+
+consecutive_crashes_non_default_threshold_test_() ->
+ [?_assertEqual(R, consecutive_crashes(H, T)) || {R, H, T} <- [
+ {0, [crashed(11), started(0)], 10},
+ {1, [crashed(10), started(0)], 10}
+ ]].
+
+
+latest_crash_timestamp_test_() ->
+ [?_assertEqual({0, R, 0}, latest_crash_timestamp(H)) || {R, H} <- [
+ {0, [added()]},
+ {1, [crashed(1)]},
+ {3, [crashed(3), started(2), crashed(1), started(0)]},
+ {1, [started(3), stopped(2), crashed(1), started(0)]}
+ ]].
+
+
+last_started_test_() ->
+ [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [
+ {0, [added()]},
+ {0, [crashed(1)]},
+ {1, [started(1)]},
+ {1, [added(), started(1)]},
+ {2, [started(2), started(1)]},
+ {2, [crashed(3), started(2), started(1)]}
+ ]].
+
+
+longest_running_test() ->
+ J0 = testjob([crashed()]),
+ J1 = testjob([started(1)]),
+ J2 = testjob([started(2)]),
+ Sort = fun(Jobs) -> lists:sort(fun longest_running/2, Jobs) end,
+ ?assertEqual([], Sort([])),
+ ?assertEqual([J1], Sort([J1])),
+ ?assertEqual([J1, J2], Sort([J2, J1])),
+ ?assertEqual([J0, J1, J2], Sort([J2, J1, J0])).
+
+
+scheduler_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_pending_jobs_simple(),
+ t_pending_jobs_skip_crashed(),
+ t_one_job_starts(),
+ t_no_jobs_start_if_max_is_0(),
+ t_one_job_starts_if_max_is_1(),
+ t_max_churn_does_not_throttle_initial_start(),
+ t_excess_oneshot_only_jobs(),
+ t_excess_continuous_only_jobs(),
+ t_excess_prefer_continuous_first(),
+ t_stop_oldest_first(),
+ t_start_oldest_first(),
+ t_dont_stop_if_nothing_pending(),
+ t_max_churn_limits_number_of_rotated_jobs(),
+ t_if_pending_less_than_running_start_all_pending(),
+ t_running_less_than_pending_swap_all_running(),
+ t_oneshot_dont_get_rotated(),
+ t_rotate_continuous_only_if_mixed(),
+ t_oneshot_dont_get_starting_priority(),
+ t_oneshot_will_hog_the_scheduler(),
+ t_if_excess_is_trimmed_rotation_doesnt_happen(),
+ t_if_transient_job_crashes_it_gets_removed(),
+ t_if_permanent_job_crashes_it_stays_in_ets()
+ ]
+ }.
+
+
+t_pending_jobs_simple() ->
+ ?_test(begin
+ Job1 = oneshot(1),
+ Job2 = oneshot(2),
+ setup_jobs([Job2, Job1]),
+ ?assertEqual([], pending_jobs(0)),
+ ?assertEqual([Job1], pending_jobs(1)),
+ ?assertEqual([Job1, Job2], pending_jobs(2)),
+ ?assertEqual([Job1, Job2], pending_jobs(3))
+ end).
+
+
+t_pending_jobs_skip_crashed() ->
+ ?_test(begin
+ Job = oneshot(1),
+ Ts = os:timestamp(),
+ History = [crashed(Ts), started(Ts) | Job#job.history],
+ Job1 = Job#job{history = History},
+ Job2 = oneshot(2),
+ Job3 = oneshot(3),
+ setup_jobs([Job2, Job1, Job3]),
+ ?assertEqual([Job2], pending_jobs(1)),
+ ?assertEqual([Job2, Job3], pending_jobs(2)),
+ ?assertEqual([Job2, Job3], pending_jobs(3))
+ end).
+
+
+t_one_job_starts() ->
+ ?_test(begin
+ setup_jobs([oneshot(1)]),
+ ?assertEqual({0, 1}, run_stop_count()),
+ reschedule(mock_state(?DEFAULT_MAX_JOBS)),
+ ?assertEqual({1, 0}, run_stop_count())
+ end).
+
+
+t_no_jobs_start_if_max_is_0() ->
+ ?_test(begin
+ setup_jobs([oneshot(1)]),
+ reschedule(mock_state(0)),
+ ?assertEqual({0, 1}, run_stop_count())
+ end).
+
+
+t_one_job_starts_if_max_is_1() ->
+ ?_test(begin
+ setup_jobs([oneshot(1), oneshot(2)]),
+ reschedule(mock_state(1)),
+ ?assertEqual({1, 1}, run_stop_count())
+ end).
+
+
+t_max_churn_does_not_throttle_initial_start() ->
+ ?_test(begin
+ setup_jobs([oneshot(1), oneshot(2)]),
+ reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)),
+ ?assertEqual({2, 0}, run_stop_count())
+ end).
+
+
+t_excess_oneshot_only_jobs() ->
+ ?_test(begin
+ setup_jobs([oneshot_running(1), oneshot_running(2)]),
+ ?assertEqual({2, 0}, run_stop_count()),
+ reschedule(mock_state(1)),
+ ?assertEqual({1, 1}, run_stop_count()),
+ reschedule(mock_state(0)),
+ ?assertEqual({0, 2}, run_stop_count())
+ end).
+
+
+t_excess_continuous_only_jobs() ->
+ ?_test(begin
+ setup_jobs([continuous_running(1), continuous_running(2)]),
+ ?assertEqual({2, 0}, run_stop_count()),
+ reschedule(mock_state(1)),
+ ?assertEqual({1, 1}, run_stop_count()),
+ reschedule(mock_state(0)),
+ ?assertEqual({0, 2}, run_stop_count())
+ end).
+
+
+t_excess_prefer_continuous_first() ->
+ ?_test(begin
+ Jobs = [
+ continuous_running(1),
+ oneshot_running(2),
+ continuous_running(3)
+ ],
+ setup_jobs(Jobs),
+ ?assertEqual({3, 0}, run_stop_count()),
+ ?assertEqual({1, 0}, oneshot_run_stop_count()),
+ reschedule(mock_state(2)),
+ ?assertEqual({2, 1}, run_stop_count()),
+ ?assertEqual({1, 0}, oneshot_run_stop_count()),
+ reschedule(mock_state(1)),
+ ?assertEqual({1, 0}, oneshot_run_stop_count()),
+ reschedule(mock_state(0)),
+ ?assertEqual({0, 1}, oneshot_run_stop_count())
+ end).
+
+
+t_stop_oldest_first() ->
+ ?_test(begin
+ Jobs = [
+ continuous_running(7),
+ continuous_running(4),
+ continuous_running(5)
+ ],
+ setup_jobs(Jobs),
+ reschedule(mock_state(2)),
+ ?assertEqual({2, 1}, run_stop_count()),
+ ?assertEqual([4], jobs_stopped()),
+ reschedule(mock_state(1)),
+ ?assertEqual([7], jobs_running())
+ end).
+
+
+t_start_oldest_first() ->
+ ?_test(begin
+ setup_jobs([continuous(7), continuous(2), continuous(5)]),
+ reschedule(mock_state(1)),
+ ?assertEqual({1, 2}, run_stop_count()),
+ ?assertEqual([2], jobs_running()),
+ reschedule(mock_state(2)),
+ ?assertEqual([7], jobs_stopped())
+ end).
+
+
+t_dont_stop_if_nothing_pending() ->
+ ?_test(begin
+ setup_jobs([continuous_running(1), continuous_running(2)]),
+ reschedule(mock_state(2)),
+ ?assertEqual({2, 0}, run_stop_count())
+ end).
+
+
+t_max_churn_limits_number_of_rotated_jobs() ->
+ ?_test(begin
+ Jobs = [
+ continuous(1),
+ continuous_running(2),
+ continuous(3),
+ continuous_running(4)
+ ],
+ setup_jobs(Jobs),
+ reschedule(mock_state(2, 1)),
+ ?assertEqual([2, 3], jobs_stopped())
+ end).
+
+
+t_if_pending_less_than_running_start_all_pending() ->
+ ?_test(begin
+ Jobs = [
+ continuous(1),
+ continuous_running(2),
+ continuous(3),
+ continuous_running(4),
+ continuous_running(5)
+ ],
+ setup_jobs(Jobs),
+ reschedule(mock_state(3)),
+ ?assertEqual([1, 2, 5], jobs_running())
+ end).
+
+
+t_running_less_than_pending_swap_all_running() ->
+ ?_test(begin
+ Jobs = [
+ continuous(1),
+ continuous(2),
+ continuous(3),
+ continuous_running(4),
+ continuous_running(5)
+ ],
+ setup_jobs(Jobs),
+ reschedule(mock_state(2)),
+ ?assertEqual([3, 4, 5], jobs_stopped())
+ end).
+
+
+t_oneshot_dont_get_rotated() ->
+ ?_test(begin
+ setup_jobs([oneshot_running(1), continuous(2)]),
+ reschedule(mock_state(1)),
+ ?assertEqual([1], jobs_running())
+ end).
+
+
+t_rotate_continuous_only_if_mixed() ->
+ ?_test(begin
+ setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]),
+ reschedule(mock_state(2)),
+ ?assertEqual([1, 2], jobs_running())
+ end).
+
+
+t_oneshot_dont_get_starting_priority() ->
+ ?_test(begin
+ setup_jobs([continuous(1), oneshot(2), continuous_running(3)]),
+ reschedule(mock_state(1)),
+ ?assertEqual([1], jobs_running())
+ end).
+
+
+% This is tested in other test cases; it is here mainly to make explicit a
+% property of one-shot replications: they can starve other jobs if they "take
+% control" of all the available scheduler slots.
+t_oneshot_will_hog_the_scheduler() ->
+ ?_test(begin
+ Jobs = [
+ oneshot_running(1),
+ oneshot_running(2),
+ oneshot(3),
+ continuous(4)
+ ],
+ setup_jobs(Jobs),
+ reschedule(mock_state(2)),
+ ?assertEqual([1, 2], jobs_running())
+ end).
+
+
+t_if_excess_is_trimmed_rotation_doesnt_happen() ->
+ ?_test(begin
+ Jobs = [
+ continuous(1),
+ continuous_running(2),
+ continuous_running(3)
+ ],
+ setup_jobs(Jobs),
+ reschedule(mock_state(1)),
+ ?assertEqual([3], jobs_running())
+ end).
+
+
+t_if_transient_job_crashes_it_gets_removed() ->
+ ?_test(begin
+ Pid = mock_pid(),
+ Job = #job{
+ id = job1,
+ pid = Pid,
+ history = [added()],
+ rep = #rep{db_name = null, options = [{continuous, true}]}
+ },
+ setup_jobs([Job]),
+ ?assertEqual(1, ets:info(?MODULE, size)),
+ State = #state{max_history = 3, stats_pid = self()},
+ {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
+ State),
+ ?assertEqual(0, ets:info(?MODULE, size))
+ end).
+
+
+t_if_permanent_job_crashes_it_stays_in_ets() ->
+ ?_test(begin
+ Pid = mock_pid(),
+ Job = #job{
+ id = job1,
+ pid = Pid,
+ history = [added()],
+ rep = #rep{db_name = <<"db1">>, options = [{continuous, true}]}
+ },
+ setup_jobs([Job]),
+ ?assertEqual(1, ets:info(?MODULE, size)),
+ State = #state{max_jobs = 1, max_history = 3, stats_pid = self()},
+ {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed},
+ State),
+ ?assertEqual(1, ets:info(?MODULE, size)),
+ [Job1] = ets:lookup(?MODULE, job1),
+ [Latest | _] = Job1#job.history,
+ ?assertMatch({{crashed, failed}, _}, Latest)
+ end).
+
+
+% Test helper functions
+
+setup() ->
+ catch ets:delete(?MODULE),
+ meck:expect(couch_log, notice, 2, ok),
+ meck:expect(couch_log, warning, 2, ok),
+ meck:expect(couch_log, error, 2, ok),
+ meck:expect(couch_replicator_scheduler_sup, terminate_child, 1, ok),
+ meck:expect(couch_stats, increment_counter, 1, ok),
+ meck:expect(couch_stats, update_gauge, 2, ok),
+ Pid = mock_pid(),
+ meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}).
+
+
+teardown(_) ->
+ catch ets:delete(?MODULE),
+ meck:unload().
+
+
+setup_jobs(Jobs) when is_list(Jobs) ->
+ ?MODULE = ets:new(?MODULE, [named_table, {keypos, #job.id}]),
+ ets:insert(?MODULE, Jobs).
+
+
+all_jobs() ->
+ lists:usort(ets:tab2list(?MODULE)).
+
+
+jobs_stopped() ->
+ [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined].
+
+
+jobs_running() ->
+ [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined].
+
+
+run_stop_count() ->
+ {length(jobs_running()), length(jobs_stopped())}.
+
+
+oneshot_run_stop_count() ->
+ Running = [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined,
+ not is_continuous(Job)],
+ Stopped = [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined,
+ not is_continuous(Job)],
+ {length(Running), length(Stopped)}.
+
+
+mock_state(MaxJobs) ->
+ #state{
+ max_jobs = MaxJobs,
+ max_churn = ?DEFAULT_MAX_CHURN,
+ max_history = ?DEFAULT_MAX_HISTORY,
+ stats_pid = self()
+ }.
+
+mock_state(MaxJobs, MaxChurn) ->
+ #state{
+ max_jobs = MaxJobs,
+ max_churn = MaxChurn,
+ max_history = ?DEFAULT_MAX_HISTORY,
+ stats_pid = self()
+ }.
+
+
+continuous(Id) when is_integer(Id) ->
+ Started = Id,
+ Hist = [stopped(Started+1), started(Started), added()],
+ #job{
+ id = Id,
+ history = Hist,
+ rep = #rep{options = [{continuous, true}]}
+ }.
+
+
+continuous_running(Id) when is_integer(Id) ->
+ Started = Id,
+ Pid = mock_pid(),
+ #job{
+ id = Id,
+ history = [started(Started), added()],
+ rep = #rep{options = [{continuous, true}]},
+ pid = Pid,
+ monitor = monitor(process, Pid)
+ }.
+
+
+oneshot(Id) when is_integer(Id) ->
+ Started = Id,
+ Hist = [stopped(Started + 1), started(Started), added()],
+ #job{id = Id, history = Hist, rep = #rep{options = []}}.
+
+
+oneshot_running(Id) when is_integer(Id) ->
+ Started = Id,
+ Pid = mock_pid(),
+ #job{
+ id = Id,
+ history = [started(Started), added()],
+ rep = #rep{options = []},
+ pid = Pid,
+ monitor = monitor(process, Pid)
+ }.
+
+
+testjob(Hist) when is_list(Hist) ->
+ #job{history = Hist}.
+
+
+mock_pid() ->
+ list_to_pid("<0.999.999>").
+
+crashed() ->
+ crashed(0).
+
+
+crashed(WhenSec) when is_integer(WhenSec)->
+ {{crashed, some_reason}, {0, WhenSec, 0}};
+crashed({MSec, Sec, USec}) ->
+ {{crashed, some_reason}, {MSec, Sec, USec}}.
+
+
+started() ->
+ started(0).
+
+
+started(WhenSec) when is_integer(WhenSec)->
+ {started, {0, WhenSec, 0}};
+
+started({MSec, Sec, USec}) ->
+ {started, {MSec, Sec, USec}}.
+
+
+stopped() ->
+ stopped(0).
+
+
+stopped(WhenSec) ->
+ {stopped, {0, WhenSec, 0}}.
+
+
+added() ->
+ {added, {0, 0, 0}}.
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.hrl b/src/couch_replicator/src/couch_replicator_scheduler.hrl
new file mode 100644
index 000000000..5203b0caa
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_scheduler.hrl
@@ -0,0 +1,15 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-type job_id() :: term().
+-type job_args() :: term().
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
new file mode 100644
index 000000000..3253ce526
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -0,0 +1,969 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler_job).
+
+-behaviour(gen_server).
+
+-export([
+ start_link/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_info/2,
+ handle_cast/2,
+ code_change/3,
+ format_status/2
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator_scheduler.hrl").
+-include("couch_replicator.hrl").
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3,
+ to_binary/1
+]).
+
+-import(couch_replicator_utils, [
+ start_db_compaction_notifier/2,
+ stop_db_compaction_notifier/1,
+ pp_rep_id/1
+]).
+
+
+-define(LOWEST_SEQ, 0).
+-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+-define(STARTUP_JITTER_DEFAULT, 5000).
+
+-record(rep_state, {
+ rep_details,
+ source_name,
+ target_name,
+ source,
+ target,
+ history,
+ checkpoint_history,
+ start_seq,
+ committed_seq,
+ current_through_seq,
+ seqs_in_progress = [],
+ highest_seq_done = {0, ?LOWEST_SEQ},
+ source_log,
+ target_log,
+ rep_starttime,
+ src_starttime,
+ tgt_starttime,
+ timer, % checkpoint timer
+ changes_queue,
+ changes_manager,
+ changes_reader,
+ workers,
+ stats = couch_replicator_stats:new(),
+ session_id,
+ source_db_compaction_notifier = nil,
+ target_db_compaction_notifier = nil,
+ source_monitor = nil,
+ target_monitor = nil,
+ source_seq = nil,
+ use_checkpoints = true,
+ checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
+ type = db,
+ view = nil
+}).
+
+
+start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
+ RepChildId = BaseId ++ Ext,
+ Source = couch_replicator_api_wrap:db_uri(Src),
+ Target = couch_replicator_api_wrap:db_uri(Tgt),
+ ServerName = {global, {?MODULE, Rep#rep.id}},
+
+ case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
+ {ok, Pid} ->
+ couch_log:notice("starting new replication `~s` at ~p (`~s` -> `~s`)",
+ [RepChildId, Pid, Source, Target]),
+ {ok, Pid};
+ {error, Reason} ->
+ couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)",
+ [RepChildId, Source, Target]),
+ {error, Reason}
+ end.
+
+
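+% Defer the expensive part of initialization: returning a timeout of 0 from
+% init/1 delivers an immediate `timeout` message, which handle_info/2 below
+% handles by calling do_init/1. This keeps start_link/1 from blocking on slow
+% startup work such as the startup jitter sleep.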
+init(InitArgs) ->
+ {ok, InitArgs, 0}.
+
+
+do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
+ process_flag(trap_exit, true),
+
+ random:seed(os:timestamp()),
+ timer:sleep(startup_jitter()),
+
+ #rep_state{
+ source = Source,
+ target = Target,
+ source_name = SourceName,
+ target_name = TargetName,
+ start_seq = {_Ts, StartSeq},
+ committed_seq = {_, CommittedSeq},
+ highest_seq_done = {_, HighestSeq},
+ checkpoint_interval = CheckpointInterval
+ } = State = init_state(Rep),
+
+ NumWorkers = get_value(worker_processes, Options),
+ BatchSize = get_value(worker_batch_size, Options),
+ {ok, ChangesQueue} = couch_work_queue:new([
+ {max_items, BatchSize * NumWorkers * 2},
+ {max_size, 100 * 1024 * NumWorkers}
+ ]),
+ % This starts the _changes reader process. It adds the changes from
+ % the source db to the ChangesQueue.
+ {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
+ StartSeq, Source, ChangesQueue, Options
+ ),
+ % Changes manager - responsible for dequeueing batches from the changes
+ % queue and delivering them to the worker processes.
+ ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
+ % This starts the worker processes. They ask the changes manager for a
+ % batch of _changes rows to process, check which revs are missing on the
+ % target, and copy the missing ones from the source to the target.
+ MaxConns = get_value(http_connections, Options),
+ Workers = lists:map(
+ fun(_) ->
+ couch_stats:increment_counter([couch_replicator, workers_started]),
+ {ok, Pid} = couch_replicator_worker:start_link(
+ self(), Source, Target, ChangesManager, MaxConns),
+ Pid
+ end,
+ lists:seq(1, NumWorkers)),
+
+ couch_task_status:add_task([
+ {type, replication},
+ {user, UserCtx#user_ctx.name},
+ {replication_id, ?l2b(BaseId ++ Ext)},
+ {database, Rep#rep.db_name},
+ {doc_id, Rep#rep.doc_id},
+ {source, ?l2b(SourceName)},
+ {target, ?l2b(TargetName)},
+ {continuous, get_value(continuous, Options, false)},
+ {revisions_checked, 0},
+ {missing_revisions_found, 0},
+ {docs_read, 0},
+ {docs_written, 0},
+ {changes_pending, get_pending_count(State)},
+ {doc_write_failures, 0},
+ {source_seq, HighestSeq},
+ {checkpointed_source_seq, CommittedSeq},
+ {checkpoint_interval, CheckpointInterval}
+ ]),
+ couch_task_status:set_update_frequency(1000),
+
+ % Until OTP R14B03:
+ %
+ % Restarting a temporary supervised child implies that the original arguments
+ % (#rep{} record) specified in the MFA component of the supervisor
+ % child spec will always be used whenever the child is restarted.
+ % This implies the same replication performance tuning parameters will
+ % always be used. The solution is to delete the child spec (see
+ % cancel_replication/1) and then start the replication again, but this is
+ % unfortunately not immune to race conditions.
+
+ couch_log:notice("Replication `~p` is using:~n"
+ "~c~p worker processes~n"
+ "~ca worker batch size of ~p~n"
+ "~c~p HTTP connections~n"
+ "~ca connection timeout of ~p milliseconds~n"
+ "~c~p retries per request~n"
+ "~csocket options are: ~s~s",
+ [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
+ MaxConns, $\t, get_value(connection_timeout, Options),
+ $\t, get_value(retries, Options),
+ $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
+ case StartSeq of
+ ?LOWEST_SEQ ->
+ "";
+ _ ->
+ io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
+ end]),
+
+ couch_log:debug("Worker pids are: ~p", [Workers]),
+
+ doc_update_triggered(Rep),
+
+ {ok, State#rep_state{
+ changes_queue = ChangesQueue,
+ changes_manager = ChangesManager,
+ changes_reader = ChangesReader,
+ workers = Workers
+ }
+ }.
+
+
+handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
+ {reply, {ok, Rep}, State};
+
+handle_call({add_stats, Stats}, From, State) ->
+ gen_server:reply(From, ok),
+ NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
+ {noreply, State#rep_state{stats = NewStats}};
+
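+% A worker reports a sequence as done: remove it from the in-progress set and
+% advance current_through_seq. The through sequence only advances when the
+% lowest in-progress sequence completes; once nothing is in progress it jumps
+% to the highest sequence done so far.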
+handle_call({report_seq_done, Seq, StatsInc}, From,
+ #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
+ current_through_seq = ThroughSeq, stats = Stats} = State) ->
+ gen_server:reply(From, ok),
+ {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+ [] ->
+ {Seq, []};
+ [Seq | Rest] ->
+ {Seq, Rest};
+ [_ | _] ->
+ {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+ end,
+ NewHighestDone = lists:max([HighestDone, Seq]),
+ NewThroughSeq = case NewSeqsInProgress of
+ [] ->
+ lists:max([NewThroughSeq0, NewHighestDone]);
+ _ ->
+ NewThroughSeq0
+ end,
+ couch_log:debug("Worker reported seq ~p, through seq was ~p, "
+ "new through seq is ~p, highest seq done was ~p, "
+ "new highest seq done is ~p~n"
+ "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+ [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+ NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+ NewState = State#rep_state{
+ stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
+ current_through_seq = NewThroughSeq,
+ seqs_in_progress = NewSeqsInProgress,
+ highest_seq_done = NewHighestDone
+ },
+ update_task(NewState),
+ {noreply, NewState}.
+
+
+handle_cast({db_compacted, DbName},
+ #rep_state{source = #db{name = DbName} = Source} = State) ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#rep_state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+ #rep_state{target = #db{name = DbName} = Target} = State) ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#rep_state{target = NewTarget}};
+
+handle_cast(checkpoint, State) ->
+ case do_checkpoint(State) of
+ {ok, NewState} ->
+ couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+ {noreply, NewState#rep_state{timer = start_timer(State)}};
+ Error ->
+ couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+ {stop, Error, State}
+ end;
+
+handle_cast({report_seq, Seq},
+ #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+ NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
+ {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
+
+
+handle_info(shutdown, St) ->
+ {stop, shutdown, St};
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
+ couch_log:error("Source database is down. Reason: ~p", [Why]),
+ {stop, source_db_down, St};
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
+ couch_log:error("Target database is down. Reason: ~p", [Why]),
+ {stop, target_db_down, St};
+
+handle_info({'EXIT', Pid, max_backoff}, State) ->
+ couch_log:error("Max backoff reached child process ~p", [Pid]),
+ {stop, {shutdown, max_backoff}, State};
+
+handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
+ couch_log:error("Max backoff reached child process ~p", [Pid]),
+ {stop, {shutdown, max_backoff}, State};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+ couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+ couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
+ {stop, changes_reader_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
+ couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
+ couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
+ {stop, changes_manager_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+ couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
+ couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
+ {stop, changes_queue_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
+ case Workers -- [Pid] of
+ Workers ->
+ couch_log:error("unknown pid bit the dust ~p ~n",[Pid]),
+ {noreply, State#rep_state{workers = Workers}};
+ %% not clear why a stop was here before
+ %%{stop, {unknown_process_died, Pid, normal}, State};
+ [] ->
+ catch unlink(State#rep_state.changes_manager),
+ catch exit(State#rep_state.changes_manager, kill),
+ do_last_checkpoint(State);
+ Workers2 ->
+ {noreply, State#rep_state{workers = Workers2}}
+ end;
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
+ State2 = cancel_timer(State),
+ case lists:member(Pid, Workers) of
+ false ->
+ {stop, {unknown_process_died, Pid, Reason}, State2};
+ true ->
+ couch_stats:increment_counter([couch_replicator, worker_deaths]),
+ couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
+ {stop, {worker_died, Pid, Reason}, State2}
+ end;
+
+handle_info(timeout, InitArgs) ->
+ try do_init(InitArgs) of {ok, State} ->
+ {noreply, State}
+ catch
+ exit:{http_request_failed, _, _, max_backoff} ->
+ {stop, {shutdown, max_backoff}, {error, InitArgs}};
+ Class:Error ->
+ ShutdownReason = {error, replication_start_error(Error)},
+ % Shutdown state is a hack as it is not really the state of the
+ % gen_server (it failed to initialize, so it doesn't have one).
+ % Shutdown state is used to pass extra info about why start failed.
+ ShutdownState = {error, Class, erlang:get_stacktrace(), InitArgs},
+ {stop, {shutdown, ShutdownReason}, ShutdownState}
+ end.
+
+
+terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
+ checkpoint_history = CheckpointHistory} = State) ->
+ terminate_cleanup(State),
+ couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
+ doc_update_completed(Rep, rep_stats(State));
+
+terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+ % Replication stopped via _scheduler_sup:terminate_child/1, which can
+ % occur during regular scheduler operation or when the job is removed from
+ % the scheduler.
+ State1 = case do_checkpoint(State) of
+ {ok, NewState} ->
+ NewState;
+ Error ->
+ LogMsg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
+ couch_log:error(LogMsg, [?MODULE, RepId, Error]),
+ State
+ end,
+ couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
+ terminate_cleanup(State1);
+
+terminate({shutdown, max_backoff}, {error, InitArgs}) ->
+ #rep{id = {BaseId, Ext} = RepId} = InitArgs,
+ couch_stats:increment_counter([couch_replicator, failed_starts]),
+ couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]),
+ couch_replicator_notifier:notify({error, RepId, max_backoff});
+
+terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
+ #rep{id=RepId} = InitArgs,
+ couch_stats:increment_counter([couch_replicator, failed_starts]),
+ CleanInitArgs = rep_strip_creds(InitArgs),
+ couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
+ [Class, Error, CleanInitArgs, Stack]),
+ couch_replicator_notifier:notify({error, RepId, Error});
+
+terminate({shutdown, max_backoff}, State) ->
+ #rep_state{
+ source_name = Source,
+ target_name = Target,
+ rep_details = #rep{id = {BaseId, Ext} = RepId}
+ } = State,
+ couch_log:error("Replication `~s` (`~s` -> `~s`) reached max backoff",
+ [BaseId ++ Ext, Source, Target]),
+ terminate_cleanup(State),
+ couch_replicator_notifier:notify({error, RepId, max_backoff});
+
+terminate(Reason, State) ->
+    #rep_state{
+ source_name = Source,
+ target_name = Target,
+ rep_details = #rep{id = {BaseId, Ext} = RepId}
+ } = State,
+ couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
+ [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
+ terminate_cleanup(State),
+ couch_replicator_notifier:notify({error, RepId, Reason}).
+
+terminate_cleanup(State) ->
+ update_task(State),
+ stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
+ stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
+ couch_replicator_api_wrap:db_close(State#rep_state.source),
+ couch_replicator_api_wrap:db_close(State#rep_state.target).
+
+
+code_change(_OldVsn, #rep_state{}=State, _Extra) ->
+ {ok, State}.
+
+
+format_status(_Opt, [_PDict, State]) ->
+ [{data, [{"State", state_strip_creds(State)}]}].
+
+
+startup_jitter() ->
+ Jitter = config:get_integer("replicator", "startup_jitter",
+ ?STARTUP_JITTER_DEFAULT),
+ random:uniform(erlang:max(1, Jitter)).
+
+
+headers_strip_creds([], Acc) ->
+ lists:reverse(Acc);
+headers_strip_creds([{Key, Value0} | Rest], Acc) ->
+ Value = case string:to_lower(Key) of
+ "authorization" ->
+ "****";
+ _ ->
+ Value0
+ end,
+ headers_strip_creds(Rest, [{Key, Value} | Acc]).
+
+
+httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
+ HttpDb#httpdb{
+ url = couch_util:url_strip_password(Url),
+ headers = headers_strip_creds(Headers, [])
+ };
+httpdb_strip_creds(LocalDb) ->
+ LocalDb.
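+
+% Illustration (hypothetical values, not part of the original code): credential
+% stripping is applied before state is logged or exposed via format_status/2.
+% For example,
+%
+%   httpdb_strip_creds(#httpdb{url = "http://user:pass@host/db",
+%       headers = [{"Authorization", "Basic dXNlcjpwYXNz"}]})
+%
+% returns an #httpdb{} whose URL password is masked by
+% couch_util:url_strip_password/1 and whose Authorization header value is
+% replaced with "****".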
+
+
+rep_strip_creds(#rep{source = Source, target = Target} = Rep) ->
+ Rep#rep{
+ source = httpdb_strip_creds(Source),
+ target = httpdb_strip_creds(Target)
+ }.
+
+
+state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) ->
+ % #rep_state contains the source and target at the top level and also
+ % in the nested #rep_details record
+ State#rep_state{
+ rep_details = rep_strip_creds(Rep),
+ source = httpdb_strip_creds(Source),
+ target = httpdb_strip_creds(Target)
+ }.
+
+
+adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) ->
+ Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
+ couch_log:notice(Msg, [RepId]),
+ Src#httpdb{http_connections = 2};
+adjust_maxconn(Src, _RepId) ->
+ Src.
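+
+% Note (hedged explanation, not from the original code): the deadlock referred
+% to in init_state/1 most likely arises because the continuous _changes feed
+% holds one source connection for the lifetime of the job; with a pool of one,
+% no other source request could ever be made. The clauses above therefore
+% guarantee at least two connections:
+%
+%   adjust_maxconn(#httpdb{http_connections = 1}, RepId)  % -> 2, notice logged
+%   adjust_maxconn(#httpdb{http_connections = 8}, RepId)  % -> unchanged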
+
+
+-spec doc_update_triggered(#rep{}) -> ok.
+doc_update_triggered(#rep{db_name = null}) ->
+ ok;
+doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) ->
+ case couch_replicator_doc_processor:update_docs() of
+ true ->
+ couch_replicator_docs:update_triggered(Rep, RepId);
+ false ->
+ ok
+ end,
+ couch_log:notice("Document `~s` triggered replication `~s`",
+ [DocId, pp_rep_id(RepId)]),
+ ok.
+
+
+-spec doc_update_completed(#rep{}, list()) -> ok.
+doc_update_completed(#rep{db_name = null}, _Stats) ->
+ ok;
+doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName,
+ start_time = StartTime}, Stats0) ->
+ Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}],
+ couch_replicator_docs:update_doc_completed(DbName, DocId, Stats),
+ couch_log:notice("Replication `~s` completed (triggered by `~s`)",
+ [pp_rep_id(RepId), DocId]),
+ ok.
+
+
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+ highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
+ {stop, normal, cancel_timer(State)};
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+ highest_seq_done = Seq} = State) ->
+ case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
+ {ok, NewState} ->
+ couch_stats:increment_counter([couch_replicator, checkpoints, success]),
+ {stop, normal, cancel_timer(NewState)};
+ Error ->
+ couch_stats:increment_counter([couch_replicator, checkpoints, failure]),
+ {stop, Error, State}
+ end.
+
+
+start_timer(State) ->
+ After = State#rep_state.checkpoint_interval,
+ case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
+ {ok, Ref} ->
+ Ref;
+ Error ->
+ couch_log:error("Replicator, error scheduling checkpoint: ~p", [Error]),
+ nil
+ end.
+
+
+cancel_timer(#rep_state{timer = nil} = State) ->
+ State;
+cancel_timer(#rep_state{timer = Timer} = State) ->
+ {ok, cancel} = timer:cancel(Timer),
+ State#rep_state{timer = nil}.
+
+
+init_state(Rep) ->
+ #rep{
+ id = {BaseId, _Ext},
+ source = Src0, target = Tgt,
+ options = Options, user_ctx = UserCtx,
+ type = Type, view = View,
+ start_time = StartTime
+ } = Rep,
+ % Adjust minimum number of http source connections to 2 to avoid deadlock
+ Src = adjust_maxconn(Src0, BaseId),
+ {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
+ {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
+ get_value(create_target, Options, false)),
+
+ {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
+ {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
+
+ [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
+
+ {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+ StartSeq1 = get_value(since_seq, Options, StartSeq0),
+ StartSeq = {0, StartSeq1},
+
+ SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
+
+ #doc{body={CheckpointHistory}} = SourceLog,
+ State = #rep_state{
+ rep_details = Rep,
+ source_name = couch_replicator_api_wrap:db_uri(Source),
+ target_name = couch_replicator_api_wrap:db_uri(Target),
+ source = Source,
+ target = Target,
+ history = History,
+ checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
+ start_seq = StartSeq,
+ current_through_seq = StartSeq,
+ committed_seq = StartSeq,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = StartTime,
+ src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
+ tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
+ session_id = couch_uuids:random(),
+ source_db_compaction_notifier =
+ start_db_compaction_notifier(Source, self()),
+ target_db_compaction_notifier =
+ start_db_compaction_notifier(Target, self()),
+ source_monitor = db_monitor(Source),
+ target_monitor = db_monitor(Target),
+ source_seq = SourceSeq,
+ use_checkpoints = get_value(use_checkpoints, Options, true),
+ checkpoint_interval = get_value(checkpoint_interval, Options,
+ ?DEFAULT_CHECKPOINT_INTERVAL),
+ type = Type,
+ view = View
+ },
+ State#rep_state{timer = start_timer(State)}.
+
+
+find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
+ LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
+ fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
+
+
+fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
+ lists:reverse(Acc);
+
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
+ case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
+ {error, <<"not_found">>} when Vsn > 1 ->
+ OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
+ fold_replication_logs(Dbs, Vsn - 1,
+ ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
+ {error, <<"not_found">>} ->
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
+ {ok, Doc} when LogId =:= NewId ->
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
+ {ok, Doc} ->
+ MigratedLog = #doc{id = NewId, body = Doc#doc.body},
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
+ end.
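+
+% Descriptive note (added for clarity): when the checkpoint doc for the current
+% ?REP_ID_VERSION is missing, the fold retries the same database using the
+% replication id of the previous version (Vsn - 1, then Vsn - 2, ...). A log
+% found under an old id is migrated by rewriting its body under the new id,
+% and a database with no log at all contributes an empty #doc{id = NewId}.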
+
+
+spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
+ spawn_link(fun() ->
+ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
+ end).
+
+
+changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
+ receive
+ {get_changes, From} ->
+ case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
+ closed ->
+ From ! {closed, self()};
+ {ok, Changes} ->
+ #doc_info{high_seq = Seq} = lists:last(Changes),
+ ReportSeq = {Ts, Seq},
+ ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
+ From ! {changes, self(), Changes, ReportSeq}
+ end,
+ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
+ end.
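+
+% Illustrative sketch (hypothetical caller, not part of the original code):
+% workers obtain change batches from the changes manager with a simple
+% request/reply protocol; handle_batch/2 is a placeholder name.
+%
+%   ChangesManager ! {get_changes, self()},
+%   receive
+%       {changes, ChangesManager, Changes, ReportSeq} ->
+%           handle_batch(Changes, ReportSeq);
+%       {closed, ChangesManager} ->
+%           changes_feed_exhausted
+%   end
+%
+% Every batch served is also reported to the parent job as {report_seq, Seq},
+% so checkpointing knows which sequences are in flight.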
+
+
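+% Checkpointing overview (descriptive note): a full checkpoint (the last clause
+% below) first asks both endpoints to commit via commit_to_both/2, then checks
+% the returned instance_start_time values against the ones recorded when the
+% databases were opened; a mismatch means an endpoint restarted and the
+% checkpoint is refused. On success the session history is extended (capped at
+% 50 entries) and written to the _local checkpoint doc on both source and
+% target.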
+do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
+ NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
+ {ok, NewState};
+do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
+ update_task(State),
+ {ok, State};
+do_checkpoint(State) ->
+ #rep_state{
+ source_name=SourceName,
+ target_name=TargetName,
+ source = Source,
+ target = Target,
+ history = OldHistory,
+ start_seq = {_, StartSeq},
+ current_through_seq = {_Ts, NewSeq} = NewTsSeq,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = ReplicationStartTime,
+ src_starttime = SrcInstanceStartTime,
+ tgt_starttime = TgtInstanceStartTime,
+ stats = Stats,
+ rep_details = #rep{options = Options},
+ session_id = SessionId
+ } = State,
+ case commit_to_both(Source, Target) of
+ {source_error, Reason} ->
+ {checkpoint_commit_failure,
+ <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
+ {target_error, Reason} ->
+ {checkpoint_commit_failure,
+ <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
+ {SrcInstanceStartTime, TgtInstanceStartTime} ->
+ couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
+ [SourceName, TargetName, NewSeq]),
+ UniversalStartTime = calendar:now_to_universal_time(ReplicationStartTime),
+ StartTime = ?l2b(httpd_util:rfc1123_date(UniversalStartTime)),
+ EndTime = ?l2b(httpd_util:rfc1123_date()),
+ NewHistoryEntry = {[
+ {<<"session_id">>, SessionId},
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"start_last_seq">>, StartSeq},
+ {<<"end_last_seq">>, NewSeq},
+ {<<"recorded_seq">>, NewSeq},
+ {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)},
+ {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
+ {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+ {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+ {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
+ ]},
+ BaseHistory = [
+ {<<"session_id">>, SessionId},
+ {<<"source_last_seq">>, NewSeq},
+ {<<"replication_id_version">>, ?REP_ID_VERSION}
+ ] ++ case get_value(doc_ids, Options) of
+ undefined ->
+ [];
+ _DocIds ->
+ % backwards compatibility with the result of a replication by
+ % doc IDs in versions 0.11.x and 1.0.x
+ % TODO: deprecate (use same history format, simplify code)
+ [
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
+ {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
+ {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
+ ]
+ end,
+ % limit history to 50 entries
+ NewRepHistory = {
+ BaseHistory ++
+ [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+ },
+
+ try
+ {SrcRevPos, SrcRevId} = update_checkpoint(
+ Source, SourceLog#doc{body = NewRepHistory}, source),
+ {TgtRevPos, TgtRevId} = update_checkpoint(
+ Target, TargetLog#doc{body = NewRepHistory}, target),
+ NewState = State#rep_state{
+ checkpoint_history = NewRepHistory,
+ committed_seq = NewTsSeq,
+ source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+ target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+ },
+ update_task(NewState),
+ {ok, NewState}
+ catch throw:{checkpoint_commit_failure, _} = Failure ->
+ Failure
+ end;
+ {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Target database out of sync. "
+ "Try to increase max_dbs_open at the target's server.">>};
+ {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Source database out of sync. "
+ "Try to increase max_dbs_open at the source's server.">>};
+ {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Source and target databases out of "
+ "sync. Try to increase max_dbs_open at both servers.">>}
+ end.
+
+
+update_checkpoint(Db, Doc, DbType) ->
+ try
+ update_checkpoint(Db, Doc)
+ catch throw:{checkpoint_commit_failure, Reason} ->
+ throw({checkpoint_commit_failure,
+ <<"Error updating the ", (to_binary(DbType))/binary,
+ " checkpoint document: ", (to_binary(Reason))/binary>>})
+ end.
+
+
+update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
+ try
+ case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
+ {ok, PosRevId} ->
+ PosRevId;
+ {error, Reason} ->
+ throw({checkpoint_commit_failure, Reason})
+ end
+ catch throw:conflict ->
+ case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
+ {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
+            % This means a previous attempt to update the checkpoint doc
+            % succeeded, but the success response was lost to a connection
+            % error (e.g. a timeout). The retried request then conflicted,
+            % because the revision we sent is no longer the current one.
+            % We confirm this by verifying that the doc body we just read
+            % back is the same one we sent.
+ {Pos, RevId};
+ _ ->
+ throw({checkpoint_commit_failure, conflict})
+ end
+ end.
+
+
+commit_to_both(Source, Target) ->
+    % Commit the source asynchronously in a separate linked process.
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(
+ fun() ->
+ Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
+ ParentPid ! {self(), Result}
+ end),
+
+    % Commit the target synchronously in the current process.
+ TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
+
+ SourceResult = receive
+ {SrcCommitPid, Result} ->
+ unlink(SrcCommitPid),
+ receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
+ Result;
+ {'EXIT', SrcCommitPid, Reason} ->
+ {error, Reason}
+ end,
+ case TargetResult of
+ {ok, TargetStartTime} ->
+ case SourceResult of
+ {ok, SourceStartTime} ->
+ {SourceStartTime, TargetStartTime};
+ SourceError ->
+ {source_error, SourceError}
+ end;
+ TargetError ->
+ {target_error, TargetError}
+ end.
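+
+% Descriptive note: the source commit runs in a spawned process while the
+% target commit runs inline, so the two ensure_full_commit requests overlap
+% rather than run back to back. The {SourceStartTime, TargetStartTime} pair
+% returned on success is what do_checkpoint/1 matches against the instance
+% start times captured at init time.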
+
+
+compare_replication_logs(SrcDoc, TgtDoc) ->
+ #doc{body={RepRecProps}} = SrcDoc,
+ #doc{body={RepRecPropsTgt}} = TgtDoc,
+ case get_value(<<"session_id">>, RepRecProps) ==
+ get_value(<<"session_id">>, RepRecPropsTgt) of
+ true ->
+ % if the records have the same session id,
+ % then we have a valid replication history
+ OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
+ OldHistory = get_value(<<"history">>, RepRecProps, []),
+ {OldSeqNum, OldHistory};
+ false ->
+ SourceHistory = get_value(<<"history">>, RepRecProps, []),
+ TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
+ couch_log:notice("Replication records differ. "
+ "Scanning histories to find a common ancestor.", []),
+ couch_log:debug("Record on source:~p~nRecord on target:~p~n",
+ [RepRecProps, RepRecPropsTgt]),
+ compare_rep_history(SourceHistory, TargetHistory)
+ end.
+
+
+compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
+ couch_log:notice("no common ancestry -- performing full replication", []),
+ {?LOWEST_SEQ, []};
+compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
+ SourceId = get_value(<<"session_id">>, S),
+ case has_session_id(SourceId, Target) of
+ true ->
+ RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
+ couch_log:notice("found a common replication record with source_seq ~p",
+ [RecordSeqNum]),
+ {RecordSeqNum, SourceRest};
+ false ->
+ TargetId = get_value(<<"session_id">>, T),
+ case has_session_id(TargetId, SourceRest) of
+ true ->
+ RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
+ couch_log:notice("found a common replication record with source_seq ~p",
+ [RecordSeqNum]),
+ {RecordSeqNum, TargetRest};
+ false ->
+ compare_rep_history(SourceRest, TargetRest)
+ end
+ end.
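+
+% Worked example (hypothetical histories, added for clarity): with
+%
+%   Source: [S4, S3, S2, S1]        Target: [T5, T3, T2, T1]
+%
+% where S3 and T3 share a session_id, the walk first discards S4 and T5 (no
+% match on either side), then finds S3's session_id in the target list and
+% returns {recorded_seq of S3, [S2, S1]}: replication resumes from the newest
+% checkpoint both sides agree on and replays everything after it.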
+
+
+has_session_id(_SessionId, []) ->
+ false;
+has_session_id(SessionId, [{Props} | Rest]) ->
+ case get_value(<<"session_id">>, Props, nil) of
+ SessionId ->
+ true;
+ _Else ->
+ has_session_id(SessionId, Rest)
+ end.
+
+
+db_monitor(#db{} = Db) ->
+ couch_db:monitor(Db);
+db_monitor(_HttpDb) ->
+ nil.
+
+
+get_pending_count(St) ->
+ Rep = St#rep_state.rep_details,
+ Timeout = get_value(connection_timeout, Rep#rep.options),
+ TimeoutMicro = Timeout * 1000,
+ case get(pending_count_state) of
+ {LastUpdate, PendingCount} ->
+ case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of
+ true ->
+ NewPendingCount = get_pending_count_int(St),
+ put(pending_count_state, {os:timestamp(), NewPendingCount}),
+ NewPendingCount;
+ false ->
+ PendingCount
+ end;
+ undefined ->
+ NewPendingCount = get_pending_count_int(St),
+ put(pending_count_state, {os:timestamp(), NewPendingCount}),
+ NewPendingCount
+ end.
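+
+% Descriptive note: the pending count is cached in the process dictionary and
+% refreshed at most once per connection_timeout interval, so frequent task
+% status updates do not each trigger an extra request against the source.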
+
+
+get_pending_count_int(#rep_state{source = #httpdb{} = Db0}=St) ->
+ {_, Seq} = St#rep_state.highest_seq_done,
+ Db = Db0#httpdb{retries = 3},
+ case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
+ {ok, Pending} ->
+ Pending;
+ _ ->
+ null
+ end;
+get_pending_count_int(#rep_state{source = Db}=St) ->
+ {_, Seq} = St#rep_state.highest_seq_done,
+ {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
+ Pending.
+
+
+update_task(State) ->
+ #rep_state{
+ current_through_seq = {_, ThroughSeq},
+ highest_seq_done = {_, HighestSeq}
+ } = State,
+ couch_task_status:update(
+ rep_stats(State) ++ [
+ {source_seq, HighestSeq},
+ {through_seq, ThroughSeq}
+ ]).
+
+
+rep_stats(State) ->
+ #rep_state{
+ committed_seq = {_, CommittedSeq},
+ stats = Stats
+ } = State,
+ [
+ {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
+ {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
+ {docs_read, couch_replicator_stats:docs_read(Stats)},
+ {docs_written, couch_replicator_stats:docs_written(Stats)},
+ {changes_pending, get_pending_count(State)},
+ {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
+ {checkpointed_source_seq, CommittedSeq}
+ ].
+
+
+replication_start_error({unauthorized, DbUri}) ->
+ {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
+replication_start_error({db_not_found, DbUri}) ->
+ {db_not_found, <<"could not open ", DbUri/binary>>};
+replication_start_error(Error) ->
+ Error.
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
new file mode 100644
index 000000000..8ab55f838
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
@@ -0,0 +1,62 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler_sup).
+
+-behaviour(supervisor).
+
+%% public api
+-export([
+ start_link/0,
+ start_child/1,
+ terminate_child/1
+]).
+
+%% supervisor api
+-export([
+ init/1
+]).
+
+
+%% includes
+-include("couch_replicator.hrl").
+
+
+%% public functions
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_child(#rep{} = Rep) ->
+ supervisor:start_child(?MODULE, [Rep]).
+
+
+terminate_child(Pid) ->
+ supervisor:terminate_child(?MODULE, Pid).
+
+%% supervisor functions
+
+init(_Args) ->
+ Start = {couch_replicator_scheduler_job, start_link, []},
+ Restart = temporary, % A crashed job is not entitled to immediate restart.
+ Shutdown = 5000,
+ Type = worker,
+ Modules = [couch_replicator_scheduler_job],
+
+ RestartStrategy = simple_one_for_one,
+ MaxR = 10,
+ MaxT = 3,
+
+ ChildSpec =
+ {undefined, Start, Restart, Shutdown, Type, Modules},
+ {ok, {{RestartStrategy, MaxR, MaxT}, [ChildSpec]}}.
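+
+% Descriptive note: simple_one_for_one with temporary children means this
+% supervisor never restarts a job on its own; couch_replicator_scheduler
+% decides when (and whether) a crashed or stopped job runs again. The
+% MaxR/MaxT values are the standard restart-intensity settings and matter
+% little here, since temporary children are never restarted.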