author     Robert Newson <rnewson@apache.org>       2016-04-11 20:12:26 +0100
committer  Nick Vatamaniuc <vatamane@apache.org>    2017-04-28 17:35:50 -0400
commit     9718b97cb87ba239409d13e72c5f369927322e0e (patch)
tree       8d508b5fdebfd7778ed0fbf408162bb97da45d82
parent     350a67b3bc2372b4a88649768805deb896c86451 (diff)
Introduce couch_replicator_scheduler
The scheduling replicator can run a large number of replication jobs. It
periodically stops some jobs and starts new ones, and jobs that fail are
penalized with an exponential backoff.
Jira: COUCHDB-3324
4 files changed, 2476 insertions, 0 deletions
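
The exponential backoff mentioned in the commit message is implemented in the
new couch_replicator_scheduler module: the penalty interval starts at a
30-second base and doubles with every consecutive crash, with the exponent
capped at 10. A minimal sketch of that calculation, taken from
backoff_micros/1 in the diff below:

    %% Backoff interval (in microseconds) for a job with CrashCount
    %% consecutive crashes: 30s * 2^(CrashCount - 1), capped at 2^10 * 30s.
    -define(MAX_BACKOFF_EXPONENT, 10).
    -define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000).

    backoff_micros(CrashCount) ->
        BackoffExp = erlang:min(CrashCount - 1, ?MAX_BACKOFF_EXPONENT),
        (1 bsl BackoffExp) * ?BACKOFF_INTERVAL_MICROS.

With the 30-second base this caps the wait between restart attempts for a
persistently crashing job at roughly 8.5 hours.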
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl new file mode 100644 index 000000000..45d54b1a0 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -0,0 +1,1430 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_scheduler). + +-behaviour(gen_server). +-behaviour(config_listener). + +-export([ + start_link/0 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_info/2, + handle_cast/2, + code_change/3, + format_status/2 +]). + +-export([ + add_job/1, + remove_job/1, + reschedule/0, + rep_state/1, + find_jobs_by_dbname/1, + find_jobs_by_doc/2, + job_summary/2, + health_threshold/0, + jobs/0, + job/1 +]). + +%% config_listener callbacks +-export([ + handle_config_change/5, + handle_config_terminate/3 +]). + +%% for status updater process to allow hot code loading +-export([ + stats_updater_loop/1 +]). + +-include("couch_replicator_scheduler.hrl"). +-include("couch_replicator.hrl"). +-include("couch_replicator_api_wrap.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +%% types +-type event_type() :: added | started | stopped | {crashed, any()}. +-type event() :: {Type:: event_type(), When :: erlang:timestamp()}. +-type history() :: nonempty_list(event()). + +%% definitions +-define(MAX_BACKOFF_EXPONENT, 10). +-define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000). +-define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60). +-define(RELISTEN_DELAY, 5000). +-define(STATS_UPDATE_WAIT, 5000). + +-define(DEFAULT_MAX_JOBS, 500). +-define(DEFAULT_MAX_CHURN, 20). +-define(DEFAULT_MAX_HISTORY, 20). +-define(DEFAULT_SCHEDULER_INTERVAL, 60000). + + +-record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}). +-record(job, { + id :: job_id() | '$1' | '_', + rep :: #rep{} | '_', + pid :: undefined | pid() | '$1' | '_', + monitor :: undefined | reference() | '_', + history :: history() | '_' +}). + +-record(stats_acc, { + pending_n = 0 :: non_neg_integer(), + running_n = 0 :: non_neg_integer(), + crashed_n = 0 :: non_neg_integer() +}). + + +%% public functions + +-spec start_link() -> {ok, pid()} | ignore | {error, term()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +-spec add_job(#rep{}) -> ok. +add_job(#rep{} = Rep) when Rep#rep.id /= undefined -> + Job = #job{ + id = Rep#rep.id, + rep = Rep, + history = [{added, os:timestamp()}] + }, + gen_server:call(?MODULE, {add_job, Job}, infinity). + + +-spec remove_job(job_id()) -> ok. +remove_job(Id) -> + gen_server:call(?MODULE, {remove_job, Id}, infinity). + + +-spec reschedule() -> ok. +% Trigger a manual reschedule. Used for testing and/or ops. +reschedule() -> + gen_server:call(?MODULE, reschedule, infinity). + + +-spec rep_state(rep_id()) -> #rep{} | nil. +rep_state(RepId) -> + case (catch ets:lookup_element(?MODULE, RepId, #job.rep)) of + {'EXIT',{badarg, _}} -> + nil; + Rep -> + Rep + end. + + +-spec job_summary(job_id(), non_neg_integer()) -> [_] | nil. 
+job_summary(JobId, HealthThreshold) -> + case job_by_id(JobId) of + {ok, #job{pid = Pid, history = History, rep = Rep}} -> + ErrorCount = consecutive_crashes(History, HealthThreshold), + {State, Info} = case {Pid, ErrorCount} of + {undefined, 0} -> + {pending, null}; + {undefined, ErrorCount} when ErrorCount > 0 -> + [{{crashed, Error}, _When} | _] = History, + ErrMsg = couch_replicator_utils:rep_error_to_binary(Error), + {crashing, ErrMsg}; + {Pid, ErrorCount} when is_pid(Pid) -> + {running, null} + end, + [ + {source, iolist_to_binary(ejson_url(Rep#rep.source))}, + {target, iolist_to_binary(ejson_url(Rep#rep.target))}, + {state, State}, + {info, Info}, + {error_count, ErrorCount}, + {last_updated, last_updated(History)}, + {start_time, + couch_replicator_utils:iso8601(Rep#rep.start_time)}, + {proxy, job_proxy_url(Rep#rep.source)} + ]; + {error, not_found} -> + nil % Job might have just completed + end. + + +job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) -> + list_to_binary(couch_util:url_strip_password(ProxyUrl)); +job_proxy_url(_Endpoint) -> + null. + + +% Health threshold is the minimum amount of time an unhealthy job should run +% crashing before it is considered to be healthy again. HealtThreashold should +% not be 0 as jobs could start and immediately crash, and it shouldn't be +% infinity, since then consecutive crashes would accumulate forever even if +% job is back to normal. +-spec health_threshold() -> non_neg_integer(). +health_threshold() -> + config:get_integer("replicator", "health_threshold", + ?DEFAULT_HEALTH_THRESHOLD_SEC). + + +-spec find_jobs_by_dbname(binary()) -> list(#rep{}). +find_jobs_by_dbname(DbName) -> + Rep = #rep{db_name = DbName, _ = '_'}, + MatchSpec = #job{id = '$1', rep = Rep, _ = '_'}, + [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)]. + + +-spec find_jobs_by_doc(binary(), binary()) -> list(#rep{}). +find_jobs_by_doc(DbName, DocId) -> + Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'}, + MatchSpec = #job{id = '$1', rep = Rep, _ = '_'}, + [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)]. + + +%% gen_server functions + +init(_) -> + EtsOpts = [named_table, {read_concurrency, true}, {keypos, #job.id}], + ?MODULE = ets:new(?MODULE, EtsOpts), + ok = config:listen_for_changes(?MODULE, nil), + Interval = config:get_integer("replicator", "interval", + ?DEFAULT_SCHEDULER_INTERVAL), + MaxJobs = config:get_integer("replicator", "max_jobs", ?DEFAULT_MAX_JOBS), + MaxChurn = config:get_integer("replicator", "max_churn", + ?DEFAULT_MAX_CHURN), + MaxHistory = config:get_integer("replicator", "max_history", + ?DEFAULT_MAX_HISTORY), + Timer = erlang:send_after(Interval, self(), reschedule), + State = #state{ + interval = Interval, + max_jobs = MaxJobs, + max_churn = MaxChurn, + max_history = MaxHistory, + timer = Timer, + stats_pid = start_stats_updater() + }, + {ok, State}. + + +handle_call({add_job, Job}, _From, State) -> + ok = maybe_remove_job_int(Job#job.id, State), + true = add_job_int(Job), + ok = maybe_start_newly_added_job(Job, State), + couch_stats:increment_counter([couch_replicator, jobs, adds]), + TotalJobs = ets:info(?MODULE, size), + couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs), + {reply, ok, State}; + +handle_call({remove_job, Id}, _From, State) -> + ok = maybe_remove_job_int(Id, State), + {reply, ok, State}; + +handle_call(reschedule, _From, State) -> + ok = reschedule(State), + {reply, ok, State}; + +handle_call(_, _From, State) -> + {noreply, State}. 
+ + +handle_cast({set_max_jobs, MaxJobs}, State) when is_integer(MaxJobs), + MaxJobs >= 0 -> + couch_log:notice("~p: max_jobs set to ~B", [?MODULE, MaxJobs]), + {noreply, State#state{max_jobs = MaxJobs}}; + +handle_cast({set_max_churn, MaxChurn}, State) when is_integer(MaxChurn), + MaxChurn > 0 -> + couch_log:notice("~p: max_churn set to ~B", [?MODULE, MaxChurn]), + {noreply, State#state{max_churn = MaxChurn}}; + +handle_cast({set_max_history, MaxHistory}, State) when is_integer(MaxHistory), + MaxHistory > 0 -> + couch_log:notice("~p: max_history set to ~B", [?MODULE, MaxHistory]), + {noreply, State#state{max_history = MaxHistory}}; + +handle_cast({set_interval, Interval}, State) when is_integer(Interval), + Interval > 0 -> + couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]), + {noreply, State#state{interval = Interval}}; + +handle_cast(_, State) -> + {noreply, State}. + + +handle_info(reschedule, State) -> + ok = reschedule(State), + erlang:cancel_timer(State#state.timer), + Timer = erlang:send_after(State#state.interval, self(), reschedule), + {noreply, State#state{timer = Timer}}; + +handle_info({'DOWN', _Ref, process, Pid, normal}, State) -> + {ok, Job} = job_by_pid(Pid), + couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]), + remove_job_int(Job), + update_running_jobs_stats(State#state.stats_pid), + {noreply, State}; + +handle_info({'DOWN', _Ref, process, Pid, Reason}, State) -> + {ok, Job} = job_by_pid(Pid), + ok = handle_crashed_job(Job, Reason, State), + {noreply, State}; + +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}; + +handle_info(_, State) -> + {noreply, State}. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +terminate(_Reason, _State) -> + ok. + + +format_status(_Opt, [_PDict, State]) -> + [ + {max_jobs, State#state.max_jobs}, + {running_jobs, running_job_count()}, + {pending_jobs, pending_job_count()} + ]. + + +%% config listener functions + +handle_config_change("replicator", "max_jobs", V, _, S) -> + ok = gen_server:cast(?MODULE, {set_max_jobs, list_to_integer(V)}), + {ok, S}; + +handle_config_change("replicator", "max_churn", V, _, S) -> + ok = gen_server:cast(?MODULE, {set_max_churn, list_to_integer(V)}), + {ok, S}; + +handle_config_change("replicator", "interval", V, _, S) -> + ok = gen_server:cast(?MODULE, {set_interval, list_to_integer(V)}), + {ok, S}; + +handle_config_change("replicator", "max_history", V, _, S) -> + ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}), + {ok, S}; + +handle_config_change(_, _, _, _, S) -> + {ok, S}. + + +handle_config_terminate(_, stop, _) -> + ok; + +handle_config_terminate(_, _, _) -> + Pid = whereis(?MODULE), + erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener). + + +%% Private functions + +% Handle crashed jobs. Handling differs between transient and permanent jobs. +% Transient jobs are those posted to the _replicate endpoint. They don't have a +% db associated with them. When those jobs crash, they are not restarted. That +% is also consistent with behavior when the node they run on, crashed and they +% do not migrate to other nodes. Permanent jobs are those created from +% replicator documents. Those jobs, once they pass basic validation and end up +% in the scheduler will be retried indefinitely (with appropriate exponential +% backoffs). +-spec handle_crashed_job(#job{}, any(), #state{}) -> ok. 
+handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, State) -> + Msg = "~p : Transient job ~p failed, removing. Error: ~p", + ErrorBinary = couch_replicator_utils:rep_error_to_binary(Reason), + couch_log:error(Msg, [?MODULE, Job#job.id, ErrorBinary]), + remove_job_int(Job), + update_running_jobs_stats(State#state.stats_pid), + ok; + +handle_crashed_job(Job, Reason, State) -> + ok = update_state_crashed(Job, Reason, State), + case couch_replicator_doc_processor:update_docs() of + true -> + couch_replicator_docs:update_error(Job#job.rep, Reason); + false -> + ok + end, + case ets:info(?MODULE, size) < State#state.max_jobs of + true -> + % Starting pending jobs is an O(TotalJobsCount) operation. Only do + % it if there is a relatively small number of jobs. Otherwise + % scheduler could be blocked if there is a cascade of lots failing + % jobs in a row. + start_pending_jobs(State), + update_running_jobs_stats(State#state.stats_pid), + ok; + false -> + ok + end. + + +% Attempt to start a newly added job. First quickly check if total jobs +% already exceed max jobs, then do a more expensive check which runs a +% select (an O(n) operation) to check pending jobs specifically. +-spec maybe_start_newly_added_job(#job{}, #state{}) -> ok. +maybe_start_newly_added_job(Job, State) -> + MaxJobs = State#state.max_jobs, + TotalJobs = ets:info(?MODULE, size), + case TotalJobs < MaxJobs andalso running_job_count() < MaxJobs of + true -> + start_job_int(Job, State), + update_running_jobs_stats(State#state.stats_pid), + ok; + false -> + ok + end. + + +% Return up to a given number of oldest, not recently crashed jobs. Try to be +% memory efficient and use ets:foldl to accumulate jobs. +-spec pending_jobs(non_neg_integer()) -> [#job{}]. +pending_jobs(0) -> + % Handle this case as user could set max_churn to 0. If this is passed to + % other function clause it will crash as gb_sets:largest assumes set is not + % empty. + []; + +pending_jobs(Count) when is_integer(Count), Count > 0 -> + Set0 = gb_sets:new(), % [{LastStart, Job},...] + Now = os:timestamp(), + Acc0 = {Set0, Now, Count, health_threshold()}, + {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE), + [Job || {_Started, Job} <- gb_sets:to_list(Set1)]. + + +pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> + Set1 = case {not_recently_crashed(Job, Now, HealthThreshold), + gb_sets:size(Set) >= Count} of + {true, true} -> + % Job is healthy but already reached accumulated limit, so might + % have to replace one of the accumulated jobs + pending_maybe_replace(Job, Set); + {true, false} -> + % Job is healthy and we haven't reached the limit, so add job + % to accumulator + gb_sets:add_element({last_started(Job), Job}, Set); + {false, _} -> + % This job is not healthy (has crashed too recently), so skip it. + Set + end, + {Set1, Now, Count, HealthThreshold}. + + +% Replace Job in the accumulator if it is older than youngest job there. +% "oldest" here means one which has been waiting to run the longest. "youngest" +% means the one with most recent activity. The goal is to keep up to Count +% oldest jobs during iteration. For example if there are jobs with these times +% accumulated so far [5, 7, 11], and start time of current job is 6. Then +% 6 < 11 is true, so 11 (youngest) is dropped and 6 inserted resulting in +% [5, 6, 7]. In the end the result might look like [1, 2, 5], for example. 
+pending_maybe_replace(Job, Set) -> + Started = last_started(Job), + {Youngest, YoungestJob} = gb_sets:largest(Set), + case Started < Youngest of + true -> + Set1 = gb_sets:delete({Youngest, YoungestJob}, Set), + gb_sets:add_element({Started, Job}, Set1); + false -> + Set + end. + + +start_jobs(Count, State) -> + [start_job_int(Job, State) || Job <- pending_jobs(Count)], + ok. + + +-spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer(). +stop_jobs(Count, IsContinuous, State) -> + Running0 = running_jobs(), + ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end, + Running1 = lists:filter(ContinuousPred, Running0), + Running2 = lists:sort(fun longest_running/2, Running1), + Running3 = lists:sublist(Running2, Count), + length([stop_job_int(Job, State) || Job <- Running3]). + + +longest_running(#job{} = A, #job{} = B) -> + last_started(A) =< last_started(B). + + +not_recently_crashed(#job{history = History}, Now, HealthThreshold) -> + case History of + [{added, _When}] -> + true; + [{stopped, _When} | _] -> + true; + _ -> + LatestCrashT = latest_crash_timestamp(History), + CrashCount = consecutive_crashes(History, HealthThreshold), + timer:now_diff(Now, LatestCrashT) >= backoff_micros(CrashCount) + end. + + +% Count consecutive crashes. A crash happens when there is a `crashed` event +% within a short period of time (configurable) after any other event. It could +% be `crashed, started` for jobs crashing quickly after starting, `crashed, +% crashed`, `crashed, stopped` if job repeatedly failed to start +% being stopped. Or it could be `crashed, added` if it crashed immediately after +% being added during start. +% +% A streak of "consecutive crashes" ends when a crashed event is seen starting +% and running successfully without crashing for a period of time. That period +% of time is the HealthThreshold. +% + +-spec consecutive_crashes(history(), non_neg_integer()) -> non_neg_integer(). +consecutive_crashes(History, HealthThreshold) when is_list(History) -> + consecutive_crashes(History, HealthThreshold, 0). + + +-spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) -> + non_neg_integer(). +consecutive_crashes([], _HealthThreashold, Count) -> + Count; + +consecutive_crashes([{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest], + HealthThreshold, Count) -> + case timer:now_diff(CrashT, PrevT) > HealthThreshold * 1000000 of + true -> + Count; + false -> + consecutive_crashes([PrevEvent | Rest], HealthThreshold, Count + 1) + end; + +consecutive_crashes([{stopped, _}, {started, _} | _], _HealthThreshold, + Count) -> + Count; + +consecutive_crashes([_ | Rest], HealthThreshold, Count) -> + consecutive_crashes(Rest, HealthThreshold, Count). + + +-spec latest_crash_timestamp(history()) -> erlang:timestamp(). +latest_crash_timestamp([]) -> + {0, 0, 0}; % Used to avoid special-casing "no crash" when doing now_diff + +latest_crash_timestamp([{{crashed, _Reason}, When} | _]) -> + When; + +latest_crash_timestamp([_Event | Rest]) -> + latest_crash_timestamp(Rest). + + +-spec backoff_micros(non_neg_integer()) -> non_neg_integer(). +backoff_micros(CrashCount) -> + % When calculating the backoff interval treat consecutive crash count as the + % exponent in Base * 2 ^ CrashCount to achieve an exponential backoff + % doubling every consecutive failure, starting with the base value of + % ?BACKOFF_INTERVAL_MICROS. + BackoffExp = erlang:min(CrashCount - 1, ?MAX_BACKOFF_EXPONENT), + (1 bsl BackoffExp) * ?BACKOFF_INTERVAL_MICROS. 
+ + +-spec add_job_int(#job{}) -> boolean(). +add_job_int(#job{} = Job) -> + ets:insert_new(?MODULE, Job). + + +-spec maybe_remove_job_int(job_id(), #state{}) -> ok. +maybe_remove_job_int(JobId, State) -> + case job_by_id(JobId) of + {ok, Job} -> + ok = stop_job_int(Job, State), + true = remove_job_int(Job), + couch_stats:increment_counter([couch_replicator, jobs, removes]), + TotalJobs = ets:info(?MODULE, size), + couch_stats:update_gauge([couch_replicator, jobs, total], + TotalJobs), + update_running_jobs_stats(State#state.stats_pid), + ok; + {error, not_found} -> + ok + end. + + +start_job_int(#job{pid = Pid}, _State) when Pid /= undefined -> + ok; + +start_job_int(#job{} = Job0, State) -> + Job = maybe_optimize_job_for_rate_limiting(Job0), + case couch_replicator_scheduler_sup:start_child(Job#job.rep) of + {ok, Child} -> + Ref = monitor(process, Child), + ok = update_state_started(Job, Child, Ref, State), + couch_log:notice("~p: Job ~p started as ~p", + [?MODULE, Job#job.id, Child]); + {error, {already_started, OtherPid}} when node(OtherPid) =:= node() -> + Ref = monitor(process, OtherPid), + ok = update_state_started(Job, OtherPid, Ref, State), + couch_log:notice("~p: Job ~p already running as ~p. Most likely" + " because replicator scheduler was restarted", + [?MODULE, Job#job.id, OtherPid]); + {error, {already_started, OtherPid}} when node(OtherPid) =/= node() -> + CrashMsg = "Duplicate replication running on another node", + couch_log:notice("~p: Job ~p already running as ~p. Most likely" + " because a duplicate replication is running on another node", + [?MODULE, Job#job.id, OtherPid]), + ok = update_state_crashed(Job, CrashMsg, State); + {error, Reason} -> + couch_log:notice("~p: Job ~p failed to start for reason ~p", + [?MODULE, Job, Reason]), + ok = update_state_crashed(Job, Reason, State) + end. + + +-spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}. +stop_job_int(#job{pid = undefined}, _State) -> + ok; + +stop_job_int(#job{} = Job, State) -> + ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid), + demonitor(Job#job.monitor, [flush]), + ok = update_state_stopped(Job, State), + couch_log:notice("~p: Job ~p stopped as ~p", + [?MODULE, Job#job.id, Job#job.pid]). + + +-spec remove_job_int(#job{}) -> true. +remove_job_int(#job{} = Job) -> + ets:delete(?MODULE, Job#job.id). + + +-spec running_job_count() -> non_neg_integer(). +running_job_count() -> + ets:info(?MODULE, size) - pending_job_count(). + + +-spec running_jobs() -> [#job{}]. +running_jobs() -> + ets:select(?MODULE, [{#job{pid = '$1', _='_'}, [{is_pid, '$1'}], ['$_']}]). + + +-spec pending_job_count() -> non_neg_integer(). +pending_job_count() -> + ets:select_count(?MODULE, [{#job{pid=undefined, _='_'}, [], [true]}]). + + +-spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}. +job_by_pid(Pid) when is_pid(Pid) -> + case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of + [] -> + {error, not_found}; + [#job{}=Job] -> + {ok, Job} + end. + + +-spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}. +job_by_id(Id) -> + case ets:lookup(?MODULE, Id) of + [] -> + {error, not_found}; + [#job{}=Job] -> + {ok, Job} + end. + + +-spec update_state_stopped(#job{}, #state{}) -> ok. +update_state_stopped(Job, State) -> + Job1 = reset_job_process(Job), + Job2 = update_history(Job1, stopped, os:timestamp(), State), + true = ets:insert(?MODULE, Job2), + couch_stats:increment_counter([couch_replicator, jobs, stops]), + ok. + + +-spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok. 
+update_state_started(Job, Pid, Ref, State) -> + Job1 = set_job_process(Job, Pid, Ref), + Job2 = update_history(Job1, started, os:timestamp(), State), + true = ets:insert(?MODULE, Job2), + couch_stats:increment_counter([couch_replicator, jobs, starts]), + ok. + + +-spec update_state_crashed(#job{}, any(), #state{}) -> ok. +update_state_crashed(Job, Reason, State) -> + Job1 = reset_job_process(Job), + Job2 = update_history(Job1, {crashed, Reason}, os:timestamp(), State), + true = ets:insert(?MODULE, Job2), + couch_stats:increment_counter([couch_replicator, jobs, crashes]), + ok. + + +-spec set_job_process(#job{}, pid(), reference()) -> #job{}. +set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) -> + Job#job{pid = Pid, monitor = Ref}. + + +-spec reset_job_process(#job{}) -> #job{}. +reset_job_process(#job{} = Job) -> + Job#job{pid = undefined, monitor = undefined}. + + +-spec reschedule(#state{}) -> ok. +reschedule(State) -> + Running = running_job_count(), + Pending = pending_job_count(), + stop_excess_jobs(State, Running), + start_pending_jobs(State, Running, Pending), + rotate_jobs(State, Running, Pending), + update_running_jobs_stats(State#state.stats_pid), + ok. + + +-spec stop_excess_jobs(#state{}, non_neg_integer()) -> ok. +stop_excess_jobs(State, Running) -> + #state{max_jobs=MaxJobs} = State, + StopCount = Running - MaxJobs, + if StopCount =< 0 -> ok; true -> + Stopped = stop_jobs(StopCount, true, State), + OneshotLeft = StopCount - Stopped, + if OneshotLeft =< 0 -> ok; true -> + stop_jobs(OneshotLeft, false, State), + ok + end + end. + + +start_pending_jobs(State) -> + start_pending_jobs(State, running_job_count(), pending_job_count()). + + +start_pending_jobs(State, Running, Pending) -> + #state{max_jobs=MaxJobs} = State, + if Running < MaxJobs, Pending > 0 -> + start_jobs(MaxJobs - Running, State); + true -> + ok + end. + + +-spec rotate_jobs(#state{}, non_neg_integer(), non_neg_integer()) -> ok. +rotate_jobs(State, Running, Pending) -> + #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State, + if Running == MaxJobs, Pending > 0 -> + RotateCount = lists:min([Pending, Running, MaxChurn]), + StopCount = stop_jobs(RotateCount, true, State), + start_jobs(StopCount, State); + true -> + ok + end. + + +-spec last_started(#job{}) -> erlang:timestamp(). +last_started(#job{} = Job) -> + case lists:keyfind(started, 1, Job#job.history) of + false -> + {0, 0, 0}; + {started, When} -> + When + end. + + +-spec update_history(#job{}, event_type(), erlang:timestamp(), #state{}) -> + #job{}. +update_history(Job, Type, When, State) -> + History0 = [{Type, When} | Job#job.history], + History1 = lists:sublist(History0, State#state.max_history), + Job#job{history = History1}. + + +-spec ejson_url(#httpdb{} | binary()) -> binary(). +ejson_url(#httpdb{}=Httpdb) -> + couch_util:url_strip_password(Httpdb#httpdb.url); +ejson_url(DbName) when is_binary(DbName) -> + DbName. + + +-spec job_ejson(#job{}) -> {[_ | _]}. 
+job_ejson(Job) -> + Rep = Job#job.rep, + Source = ejson_url(Rep#rep.source), + Target = ejson_url(Rep#rep.target), + History = lists:map(fun({Type, When}) -> + EventProps = case Type of + {crashed, Reason} -> + [{type, crashed}, {reason, crash_reason_json(Reason)}]; + Type -> + [{type, Type}] + end, + {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]} + end, Job#job.history), + {BaseID, Ext} = Job#job.id, + Pid = case Job#job.pid of + undefined -> + null; + P when is_pid(P) -> + ?l2b(pid_to_list(P)) + end, + {[ + {id, iolist_to_binary([BaseID, Ext])}, + {pid, Pid}, + {source, iolist_to_binary(Source)}, + {target, iolist_to_binary(Target)}, + {database, Rep#rep.db_name}, + {user, (Rep#rep.user_ctx)#user_ctx.name}, + {doc_id, Rep#rep.doc_id}, + {history, History}, + {node, node()}, + {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)} + ]}. + + +-spec jobs() -> [[tuple()]]. +jobs() -> + ets:foldl(fun(Job, Acc) -> [job_ejson(Job) | Acc] end, [], ?MODULE). + + +-spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}. +job(JobId) -> + case job_by_id(JobId) of + {ok, Job} -> + {ok, job_ejson(Job)}; + Error -> + Error + end. + + +crash_reason_json({_CrashType, Info}) when is_binary(Info) -> + Info; +crash_reason_json(Reason) when is_binary(Reason) -> + Reason; +crash_reason_json(Error) -> + couch_replicator_utils:rep_error_to_binary(Error). + + +-spec last_updated([_]) -> binary(). +last_updated([{_Type, When} | _]) -> + couch_replicator_utils:iso8601(When). + + +-spec is_continuous(#job{}) -> boolean(). +is_continuous(#job{rep = Rep}) -> + couch_util:get_value(continuous, Rep#rep.options, false). + + +% If job crashed last time because it was rate limited, try to +% optimize some options to help the job make progress. +-spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}. +maybe_optimize_job_for_rate_limiting(Job = #job{history = + [{{crashed, {shutdown, max_backoff}}, _} | _]}) -> + Opts = [ + {checkpoint_interval, 5000}, + {worker_processes, 2}, + {worker_batch_size, 100}, + {http_connections, 5} + ], + Rep = lists:foldl(fun optimize_int_option/2, Job#job.rep, Opts), + Job#job{rep = Rep}; +maybe_optimize_job_for_rate_limiting(Job) -> + Job. + + +-spec optimize_int_option({atom(), any()}, #rep{}) -> #rep{}. +optimize_int_option({Key, Val}, #rep{options = Options} = Rep) -> + case couch_util:get_value(Key, Options) of + CurVal when is_integer(CurVal), CurVal > Val -> + Msg = "~p replication ~p : setting ~p = ~p due to rate limiting", + couch_log:warning(Msg, [?MODULE, Rep#rep.id, Key, Val]), + Options1 = lists:keyreplace(Key, 1, Options, {Key, Val}), + Rep#rep{options = Options1}; + _ -> + Rep + end. + + +% Updater is a separate process. It receives `update_stats` messages and +% updates scheduler stats from the scheduler jobs table. Updates are +% performed no more frequently than once per ?STATS_UPDATE_WAIT milliseconds. + +update_running_jobs_stats(StatsPid) when is_pid(StatsPid) -> + StatsPid ! update_stats, + ok. + + +start_stats_updater() -> + erlang:spawn_link(?MODULE, stats_updater_loop, [undefined]). + + +stats_updater_loop(Timer) -> + receive + update_stats when Timer == undefined -> + TRef = erlang:send_after(?STATS_UPDATE_WAIT, self(), refresh_stats), + ?MODULE:stats_updater_loop(TRef); + update_stats when is_reference(Timer) -> + ?MODULE:stats_updater_loop(Timer); + refresh_stats -> + ok = stats_updater_refresh(), + ?MODULE:stats_updater_loop(undefined); + Else -> + erlang:exit({stats_updater_bad_msg, Else}) + end. 
+ + +-spec stats_updater_refresh() -> ok. +stats_updater_refresh() -> + #stats_acc{ + pending_n = PendingN, + running_n = RunningN, + crashed_n = CrashedN + } = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE), + couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN), + couch_stats:update_gauge([couch_replicator, jobs, running], RunningN), + couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN), + ok. + + +-spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}. +stats_fold(#job{pid = undefined, history = [{added, _}]}, Acc) -> + Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1}; +stats_fold(#job{pid = undefined, history = [{stopped, _} | _]}, Acc) -> + Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1}; +stats_fold(#job{pid = undefined, history = [{{crashed, _}, _} | _]}, Acc) -> + Acc#stats_acc{crashed_n =Acc#stats_acc.crashed_n + 1}; +stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) -> + Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}. + + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + + +backoff_micros_test_() -> + BaseInterval = ?BACKOFF_INTERVAL_MICROS, + [?_assertEqual(R * BaseInterval, backoff_micros(N)) || {R, N} <- [ + {1, 1}, {2, 2}, {4, 3}, {8, 4}, {16, 5}, {32, 6}, {64, 7}, {128, 8}, + {256, 9}, {512, 10}, {1024, 11}, {1024, 12} + ]]. + + +consecutive_crashes_test_() -> + Threshold = ?DEFAULT_HEALTH_THRESHOLD_SEC, + [?_assertEqual(R, consecutive_crashes(H, Threshold)) || {R, H} <- [ + {0, []}, + {0, [added()]}, + {0, [stopped()]}, + {0, [crashed()]}, + {1, [crashed(), added()]}, + {1, [crashed(), crashed()]}, + {1, [crashed(), stopped()]}, + {3, [crashed(), crashed(), crashed(), added()]}, + {2, [crashed(), crashed(), stopped()]}, + {1, [crashed(), started(), added()]}, + {2, [crashed(3), started(2), crashed(1), started(0)]}, + {0, [stopped(3), started(2), crashed(1), started(0)]}, + {1, [crashed(3), started(2), stopped(1), started(0)]}, + {0, [crashed(999), started(0)]}, + {1, [crashed(999), started(998), crashed(997), started(0)]} + ]]. + + +consecutive_crashes_non_default_threshold_test_() -> + [?_assertEqual(R, consecutive_crashes(H, T)) || {R, H, T} <- [ + {0, [crashed(11), started(0)], 10}, + {1, [crashed(10), started(0)], 10} + ]]. + + +latest_crash_timestamp_test_() -> + [?_assertEqual({0, R, 0}, latest_crash_timestamp(H)) || {R, H} <- [ + {0, [added()]}, + {1, [crashed(1)]}, + {3, [crashed(3), started(2), crashed(1), started(0)]}, + {1, [started(3), stopped(2), crashed(1), started(0)]} + ]]. + + +last_started_test_() -> + [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [ + {0, [added()]}, + {0, [crashed(1)]}, + {1, [started(1)]}, + {1, [added(), started(1)]}, + {2, [started(2), started(1)]}, + {2, [crashed(3), started(2), started(1)]} + ]]. + + +longest_running_test() -> + J0 = testjob([crashed()]), + J1 = testjob([started(1)]), + J2 = testjob([started(2)]), + Sort = fun(Jobs) -> lists:sort(fun longest_running/2, Jobs) end, + ?assertEqual([], Sort([])), + ?assertEqual([J1], Sort([J1])), + ?assertEqual([J1, J2], Sort([J2, J1])), + ?assertEqual([J0, J1, J2], Sort([J2, J1, J0])). 
+ + +scheduler_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + t_pending_jobs_simple(), + t_pending_jobs_skip_crashed(), + t_one_job_starts(), + t_no_jobs_start_if_max_is_0(), + t_one_job_starts_if_max_is_1(), + t_max_churn_does_not_throttle_initial_start(), + t_excess_oneshot_only_jobs(), + t_excess_continuous_only_jobs(), + t_excess_prefer_continuous_first(), + t_stop_oldest_first(), + t_start_oldest_first(), + t_dont_stop_if_nothing_pending(), + t_max_churn_limits_number_of_rotated_jobs(), + t_if_pending_less_than_running_start_all_pending(), + t_running_less_than_pending_swap_all_running(), + t_oneshot_dont_get_rotated(), + t_rotate_continuous_only_if_mixed(), + t_oneshot_dont_get_starting_priority(), + t_oneshot_will_hog_the_scheduler(), + t_if_excess_is_trimmed_rotation_doesnt_happen(), + t_if_transient_job_crashes_it_gets_removed(), + t_if_permanent_job_crashes_it_stays_in_ets() + ] + }. + + +t_pending_jobs_simple() -> + ?_test(begin + Job1 = oneshot(1), + Job2 = oneshot(2), + setup_jobs([Job2, Job1]), + ?assertEqual([], pending_jobs(0)), + ?assertEqual([Job1], pending_jobs(1)), + ?assertEqual([Job1, Job2], pending_jobs(2)), + ?assertEqual([Job1, Job2], pending_jobs(3)) + end). + + +t_pending_jobs_skip_crashed() -> + ?_test(begin + Job = oneshot(1), + Ts = os:timestamp(), + History = [crashed(Ts), started(Ts) | Job#job.history], + Job1 = Job#job{history = History}, + Job2 = oneshot(2), + Job3 = oneshot(3), + setup_jobs([Job2, Job1, Job3]), + ?assertEqual([Job2], pending_jobs(1)), + ?assertEqual([Job2, Job3], pending_jobs(2)), + ?assertEqual([Job2, Job3], pending_jobs(3)) + end). + + +t_one_job_starts() -> + ?_test(begin + setup_jobs([oneshot(1)]), + ?assertEqual({0, 1}, run_stop_count()), + reschedule(mock_state(?DEFAULT_MAX_JOBS)), + ?assertEqual({1, 0}, run_stop_count()) + end). + + +t_no_jobs_start_if_max_is_0() -> + ?_test(begin + setup_jobs([oneshot(1)]), + reschedule(mock_state(0)), + ?assertEqual({0, 1}, run_stop_count()) + end). + + +t_one_job_starts_if_max_is_1() -> + ?_test(begin + setup_jobs([oneshot(1), oneshot(2)]), + reschedule(mock_state(1)), + ?assertEqual({1, 1}, run_stop_count()) + end). + + +t_max_churn_does_not_throttle_initial_start() -> + ?_test(begin + setup_jobs([oneshot(1), oneshot(2)]), + reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)), + ?assertEqual({2, 0}, run_stop_count()) + end). + + +t_excess_oneshot_only_jobs() -> + ?_test(begin + setup_jobs([oneshot_running(1), oneshot_running(2)]), + ?assertEqual({2, 0}, run_stop_count()), + reschedule(mock_state(1)), + ?assertEqual({1, 1}, run_stop_count()), + reschedule(mock_state(0)), + ?assertEqual({0, 2}, run_stop_count()) + end). + + +t_excess_continuous_only_jobs() -> + ?_test(begin + setup_jobs([continuous_running(1), continuous_running(2)]), + ?assertEqual({2, 0}, run_stop_count()), + reschedule(mock_state(1)), + ?assertEqual({1, 1}, run_stop_count()), + reschedule(mock_state(0)), + ?assertEqual({0, 2}, run_stop_count()) + end). + + +t_excess_prefer_continuous_first() -> + ?_test(begin + Jobs = [ + continuous_running(1), + oneshot_running(2), + continuous_running(3) + ], + setup_jobs(Jobs), + ?assertEqual({3, 0}, run_stop_count()), + ?assertEqual({1, 0}, oneshot_run_stop_count()), + reschedule(mock_state(2)), + ?assertEqual({2, 1}, run_stop_count()), + ?assertEqual({1, 0}, oneshot_run_stop_count()), + reschedule(mock_state(1)), + ?assertEqual({1, 0}, oneshot_run_stop_count()), + reschedule(mock_state(0)), + ?assertEqual({0, 1}, oneshot_run_stop_count()) + end). 
+ + +t_stop_oldest_first() -> + ?_test(begin + Jobs = [ + continuous_running(7), + continuous_running(4), + continuous_running(5) + ], + setup_jobs(Jobs), + reschedule(mock_state(2)), + ?assertEqual({2, 1}, run_stop_count()), + ?assertEqual([4], jobs_stopped()), + reschedule(mock_state(1)), + ?assertEqual([7], jobs_running()) + end). + + +t_start_oldest_first() -> + ?_test(begin + setup_jobs([continuous(7), continuous(2), continuous(5)]), + reschedule(mock_state(1)), + ?assertEqual({1, 2}, run_stop_count()), + ?assertEqual([2], jobs_running()), + reschedule(mock_state(2)), + ?assertEqual([7], jobs_stopped()) + end). + + +t_dont_stop_if_nothing_pending() -> + ?_test(begin + setup_jobs([continuous_running(1), continuous_running(2)]), + reschedule(mock_state(2)), + ?assertEqual({2, 0}, run_stop_count()) + end). + + +t_max_churn_limits_number_of_rotated_jobs() -> + ?_test(begin + Jobs = [ + continuous(1), + continuous_running(2), + continuous(3), + continuous_running(4) + ], + setup_jobs(Jobs), + reschedule(mock_state(2, 1)), + ?assertEqual([2, 3], jobs_stopped()) + end). + + +t_if_pending_less_than_running_start_all_pending() -> + ?_test(begin + Jobs = [ + continuous(1), + continuous_running(2), + continuous(3), + continuous_running(4), + continuous_running(5) + ], + setup_jobs(Jobs), + reschedule(mock_state(3)), + ?assertEqual([1, 2, 5], jobs_running()) + end). + + +t_running_less_than_pending_swap_all_running() -> + ?_test(begin + Jobs = [ + continuous(1), + continuous(2), + continuous(3), + continuous_running(4), + continuous_running(5) + ], + setup_jobs(Jobs), + reschedule(mock_state(2)), + ?assertEqual([3, 4, 5], jobs_stopped()) + end). + + +t_oneshot_dont_get_rotated() -> + ?_test(begin + setup_jobs([oneshot_running(1), continuous(2)]), + reschedule(mock_state(1)), + ?assertEqual([1], jobs_running()) + end). + + +t_rotate_continuous_only_if_mixed() -> + ?_test(begin + setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]), + reschedule(mock_state(2)), + ?assertEqual([1, 2], jobs_running()) + end). + + +t_oneshot_dont_get_starting_priority() -> + ?_test(begin + setup_jobs([continuous(1), oneshot(2), continuous_running(3)]), + reschedule(mock_state(1)), + ?assertEqual([1], jobs_running()) + end). + + +% This tested in other test cases, it is here to mainly make explicit a property +% of one-shot replications -- they can starve other jobs if they "take control" +% of all the available scheduler slots. +t_oneshot_will_hog_the_scheduler() -> + ?_test(begin + Jobs = [ + oneshot_running(1), + oneshot_running(2), + oneshot(3), + continuous(4) + ], + setup_jobs(Jobs), + reschedule(mock_state(2)), + ?assertEqual([1, 2], jobs_running()) + end). + + +t_if_excess_is_trimmed_rotation_doesnt_happen() -> + ?_test(begin + Jobs = [ + continuous(1), + continuous_running(2), + continuous_running(3) + ], + setup_jobs(Jobs), + reschedule(mock_state(1)), + ?assertEqual([3], jobs_running()) + end). + + +t_if_transient_job_crashes_it_gets_removed() -> + ?_test(begin + Pid = mock_pid(), + Job = #job{ + id = job1, + pid = Pid, + history = [added()], + rep = #rep{db_name = null, options = [{continuous, true}]} + }, + setup_jobs([Job]), + ?assertEqual(1, ets:info(?MODULE, size)), + State = #state{max_history = 3, stats_pid = self()}, + {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed}, + State), + ?assertEqual(0, ets:info(?MODULE, size)) + end). 
+ + +t_if_permanent_job_crashes_it_stays_in_ets() -> + ?_test(begin + Pid = mock_pid(), + Job = #job{ + id = job1, + pid = Pid, + history = [added()], + rep = #rep{db_name = <<"db1">>, options = [{continuous, true}]} + }, + setup_jobs([Job]), + ?assertEqual(1, ets:info(?MODULE, size)), + State = #state{max_jobs =1, max_history = 3, stats_pid = self()}, + {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed}, + State), + ?assertEqual(1, ets:info(?MODULE, size)), + [Job1] = ets:lookup(?MODULE, job1), + [Latest | _] = Job1#job.history, + ?assertMatch({{crashed, failed}, _}, Latest) + end). + + +% Test helper functions + +setup() -> + catch ets:delete(?MODULE), + meck:expect(couch_log, notice, 2, ok), + meck:expect(couch_log, warning, 2, ok), + meck:expect(couch_log, error, 2, ok), + meck:expect(couch_replicator_scheduler_sup, terminate_child, 1, ok), + meck:expect(couch_stats, increment_counter, 1, ok), + meck:expect(couch_stats, update_gauge, 2, ok), + Pid = mock_pid(), + meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}). + + +teardown(_) -> + catch ets:delete(?MODULE), + meck:unload(). + + +setup_jobs(Jobs) when is_list(Jobs) -> + ?MODULE = ets:new(?MODULE, [named_table, {keypos, #job.id}]), + ets:insert(?MODULE, Jobs). + + +all_jobs() -> + lists:usort(ets:tab2list(?MODULE)). + + +jobs_stopped() -> + [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined]. + + +jobs_running() -> + [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined]. + + +run_stop_count() -> + {length(jobs_running()), length(jobs_stopped())}. + + +oneshot_run_stop_count() -> + Running = [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined, + not is_continuous(Job)], + Stopped = [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined, + not is_continuous(Job)], + {length(Running), length(Stopped)}. + + +mock_state(MaxJobs) -> + #state{ + max_jobs = MaxJobs, + max_churn = ?DEFAULT_MAX_CHURN, + max_history = ?DEFAULT_MAX_HISTORY, + stats_pid = self() + }. + +mock_state(MaxJobs, MaxChurn) -> + #state{ + max_jobs = MaxJobs, + max_churn = MaxChurn, + max_history = ?DEFAULT_MAX_HISTORY, + stats_pid = self() + }. + + +continuous(Id) when is_integer(Id) -> + Started = Id, + Hist = [stopped(Started+1), started(Started), added()], + #job{ + id = Id, + history = Hist, + rep = #rep{options = [{continuous, true}]} + }. + + +continuous_running(Id) when is_integer(Id) -> + Started = Id, + Pid = mock_pid(), + #job{ + id = Id, + history = [started(Started), added()], + rep = #rep{options = [{continuous, true}]}, + pid = Pid, + monitor = monitor(process, Pid) + }. + + +oneshot(Id) when is_integer(Id) -> + Started = Id, + Hist = [stopped(Started + 1), started(Started), added()], + #job{id = Id, history = Hist, rep = #rep{options = []}}. + + +oneshot_running(Id) when is_integer(Id) -> + Started = Id, + Pid = mock_pid(), + #job{ + id = Id, + history = [started(Started), added()], + rep = #rep{options = []}, + pid = Pid, + monitor = monitor(process, Pid) + }. + + +testjob(Hist) when is_list(Hist) -> + #job{history = Hist}. + + +mock_pid() -> + list_to_pid("<0.999.999>"). + +crashed() -> + crashed(0). + + +crashed(WhenSec) when is_integer(WhenSec)-> + {{crashed, some_reason}, {0, WhenSec, 0}}; +crashed({MSec, Sec, USec}) -> + {{crashed, some_reason}, {MSec, Sec, USec}}. + + +started() -> + started(0). + + +started(WhenSec) when is_integer(WhenSec)-> + {started, {0, WhenSec, 0}}; + +started({MSec, Sec, USec}) -> + {started, {MSec, Sec, USec}}. + + +stopped() -> + stopped(0). 
+ + +stopped(WhenSec) -> + {stopped, {0, WhenSec, 0}}. + + +added() -> + {added, {0, 0, 0}}. + +-endif. diff --git a/src/couch_replicator/src/couch_replicator_scheduler.hrl b/src/couch_replicator/src/couch_replicator_scheduler.hrl new file mode 100644 index 000000000..5203b0caa --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_scheduler.hrl @@ -0,0 +1,15 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-type job_id() :: term(). +-type job_args() :: term(). diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl new file mode 100644 index 000000000..3253ce526 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -0,0 +1,969 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_scheduler_job). + +-behaviour(gen_server). + +-export([ + start_link/1 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_info/2, + handle_cast/2, + code_change/3, + format_status/2 +]). + +-include_lib("couch/include/couch_db.hrl"). +-include("couch_replicator_api_wrap.hrl"). +-include("couch_replicator_scheduler.hrl"). +-include("couch_replicator.hrl"). + +-import(couch_util, [ + get_value/2, + get_value/3, + to_binary/1 +]). + +-import(couch_replicator_utils, [ + start_db_compaction_notifier/2, + stop_db_compaction_notifier/1, + pp_rep_id/1 +]). + + +-define(LOWEST_SEQ, 0). +-define(DEFAULT_CHECKPOINT_INTERVAL, 30000). +-define(STARTUP_JITTER_DEFAULT, 5000). + +-record(rep_state, { + rep_details, + source_name, + target_name, + source, + target, + history, + checkpoint_history, + start_seq, + committed_seq, + current_through_seq, + seqs_in_progress = [], + highest_seq_done = {0, ?LOWEST_SEQ}, + source_log, + target_log, + rep_starttime, + src_starttime, + tgt_starttime, + timer, % checkpoint timer + changes_queue, + changes_manager, + changes_reader, + workers, + stats = couch_replicator_stats:new(), + session_id, + source_db_compaction_notifier = nil, + target_db_compaction_notifier = nil, + source_monitor = nil, + target_monitor = nil, + source_seq = nil, + use_checkpoints = true, + checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL, + type = db, + view = nil +}). 
+ + +start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) -> + RepChildId = BaseId ++ Ext, + Source = couch_replicator_api_wrap:db_uri(Src), + Target = couch_replicator_api_wrap:db_uri(Tgt), + ServerName = {global, {?MODULE, Rep#rep.id}}, + + case gen_server:start_link(ServerName, ?MODULE, Rep, []) of + {ok, Pid} -> + couch_log:notice("starting new replication `~s` at ~p (`~s` -> `~s`)", + [RepChildId, Pid, Source, Target]), + {ok, Pid}; + {error, Reason} -> + couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)", + [RepChildId, Source, Target]), + {error, Reason} + end. + + +init(InitArgs) -> + {ok, InitArgs, 0}. + + +do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> + process_flag(trap_exit, true), + + random:seed(os:timestamp()), + timer:sleep(startup_jitter()), + + #rep_state{ + source = Source, + target = Target, + source_name = SourceName, + target_name = TargetName, + start_seq = {_Ts, StartSeq}, + committed_seq = {_, CommittedSeq}, + highest_seq_done = {_, HighestSeq}, + checkpoint_interval = CheckpointInterval + } = State = init_state(Rep), + + NumWorkers = get_value(worker_processes, Options), + BatchSize = get_value(worker_batch_size, Options), + {ok, ChangesQueue} = couch_work_queue:new([ + {max_items, BatchSize * NumWorkers * 2}, + {max_size, 100 * 1024 * NumWorkers} + ]), + % This starts the _changes reader process. It adds the changes from + % the source db to the ChangesQueue. + {ok, ChangesReader} = couch_replicator_changes_reader:start_link( + StartSeq, Source, ChangesQueue, Options + ), + % Changes manager - responsible for dequeing batches from the changes queue + % and deliver them to the worker processes. + ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize), + % This starts the worker processes. They ask the changes queue manager for a + % a batch of _changes rows to process -> check which revs are missing in the + % target, and for the missing ones, it copies them from the source to the target. + MaxConns = get_value(http_connections, Options), + Workers = lists:map( + fun(_) -> + couch_stats:increment_counter([couch_replicator, workers_started]), + {ok, Pid} = couch_replicator_worker:start_link( + self(), Source, Target, ChangesManager, MaxConns), + Pid + end, + lists:seq(1, NumWorkers)), + + couch_task_status:add_task([ + {type, replication}, + {user, UserCtx#user_ctx.name}, + {replication_id, ?l2b(BaseId ++ Ext)}, + {database, Rep#rep.db_name}, + {doc_id, Rep#rep.doc_id}, + {source, ?l2b(SourceName)}, + {target, ?l2b(TargetName)}, + {continuous, get_value(continuous, Options, false)}, + {revisions_checked, 0}, + {missing_revisions_found, 0}, + {docs_read, 0}, + {docs_written, 0}, + {changes_pending, get_pending_count(State)}, + {doc_write_failures, 0}, + {source_seq, HighestSeq}, + {checkpointed_source_seq, CommittedSeq}, + {checkpoint_interval, CheckpointInterval} + ]), + couch_task_status:set_update_frequency(1000), + + % Until OTP R14B03: + % + % Restarting a temporary supervised child implies that the original arguments + % (#rep{} record) specified in the MFA component of the supervisor + % child spec will always be used whenever the child is restarted. + % This implies the same replication performance tunning parameters will + % always be used. The solution is to delete the child spec (see + % cancel_replication/1) and then start the replication again, but this is + % unfortunately not immune to race conditions. 
+ + couch_log:notice("Replication `~p` is using:~n" + "~c~p worker processes~n" + "~ca worker batch size of ~p~n" + "~c~p HTTP connections~n" + "~ca connection timeout of ~p milliseconds~n" + "~c~p retries per request~n" + "~csocket options are: ~s~s", + [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t, + MaxConns, $\t, get_value(connection_timeout, Options), + $\t, get_value(retries, Options), + $\t, io_lib:format("~p", [get_value(socket_options, Options)]), + case StartSeq of + ?LOWEST_SEQ -> + ""; + _ -> + io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq]) + end]), + + couch_log:debug("Worker pids are: ~p", [Workers]), + + doc_update_triggered(Rep), + + {ok, State#rep_state{ + changes_queue = ChangesQueue, + changes_manager = ChangesManager, + changes_reader = ChangesReader, + workers = Workers + } + }. + + +handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) -> + {reply, {ok, Rep}, State}; + +handle_call({add_stats, Stats}, From, State) -> + gen_server:reply(From, ok), + NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats), + {noreply, State#rep_state{stats = NewStats}}; + +handle_call({report_seq_done, Seq, StatsInc}, From, + #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone, + current_through_seq = ThroughSeq, stats = Stats} = State) -> + gen_server:reply(From, ok), + {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of + [] -> + {Seq, []}; + [Seq | Rest] -> + {Seq, Rest}; + [_ | _] -> + {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)} + end, + NewHighestDone = lists:max([HighestDone, Seq]), + NewThroughSeq = case NewSeqsInProgress of + [] -> + lists:max([NewThroughSeq0, NewHighestDone]); + _ -> + NewThroughSeq0 + end, + couch_log:debug("Worker reported seq ~p, through seq was ~p, " + "new through seq is ~p, highest seq done was ~p, " + "new highest seq done is ~p~n" + "Seqs in progress were: ~p~nSeqs in progress are now: ~p", + [Seq, ThroughSeq, NewThroughSeq, HighestDone, + NewHighestDone, SeqsInProgress, NewSeqsInProgress]), + NewState = State#rep_state{ + stats = couch_replicator_utils:sum_stats(Stats, StatsInc), + current_through_seq = NewThroughSeq, + seqs_in_progress = NewSeqsInProgress, + highest_seq_done = NewHighestDone + }, + update_task(NewState), + {noreply, NewState}. + + +handle_cast({db_compacted, DbName}, + #rep_state{source = #db{name = DbName} = Source} = State) -> + {ok, NewSource} = couch_db:reopen(Source), + {noreply, State#rep_state{source = NewSource}}; + +handle_cast({db_compacted, DbName}, + #rep_state{target = #db{name = DbName} = Target} = State) -> + {ok, NewTarget} = couch_db:reopen(Target), + {noreply, State#rep_state{target = NewTarget}}; + +handle_cast(checkpoint, State) -> + case do_checkpoint(State) of + {ok, NewState} -> + couch_stats:increment_counter([couch_replicator, checkpoints, success]), + {noreply, NewState#rep_state{timer = start_timer(State)}}; + Error -> + couch_stats:increment_counter([couch_replicator, checkpoints, failure]), + {stop, Error, State} + end; + +handle_cast({report_seq, Seq}, + #rep_state{seqs_in_progress = SeqsInProgress} = State) -> + NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress), + {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}. + + +handle_info(shutdown, St) -> + {stop, shutdown, St}; + +handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) -> + couch_log:error("Source database is down. 
Reason: ~p", [Why]), + {stop, source_db_down, St}; + +handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) -> + couch_log:error("Target database is down. Reason: ~p", [Why]), + {stop, target_db_down, St}; + +handle_info({'EXIT', Pid, max_backoff}, State) -> + couch_log:error("Max backoff reached child process ~p", [Pid]), + {stop, {shutdown, max_backoff}, State}; + +handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) -> + couch_log:error("Max backoff reached child process ~p", [Pid]), + {stop, {shutdown, max_backoff}, State}; + +handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) -> + {noreply, State}; + +handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) -> + couch_stats:increment_counter([couch_replicator, changes_reader_deaths]), + couch_log:error("ChangesReader process died with reason: ~p", [Reason]), + {stop, changes_reader_died, cancel_timer(State)}; + +handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> + {noreply, State}; + +handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) -> + couch_stats:increment_counter([couch_replicator, changes_manager_deaths]), + couch_log:error("ChangesManager process died with reason: ~p", [Reason]), + {stop, changes_manager_died, cancel_timer(State)}; + +handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> + {noreply, State}; + +handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) -> + couch_stats:increment_counter([couch_replicator, changes_queue_deaths]), + couch_log:error("ChangesQueue process died with reason: ~p", [Reason]), + {stop, changes_queue_died, cancel_timer(State)}; + +handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) -> + case Workers -- [Pid] of + Workers -> + couch_log:error("unknown pid bit the dust ~p ~n",[Pid]), + {noreply, State#rep_state{workers = Workers}}; + %% not clear why a stop was here before + %%{stop, {unknown_process_died, Pid, normal}, State}; + [] -> + catch unlink(State#rep_state.changes_manager), + catch exit(State#rep_state.changes_manager, kill), + do_last_checkpoint(State); + Workers2 -> + {noreply, State#rep_state{workers = Workers2}} + end; + +handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> + State2 = cancel_timer(State), + case lists:member(Pid, Workers) of + false -> + {stop, {unknown_process_died, Pid, Reason}, State2}; + true -> + couch_stats:increment_counter([couch_replicator, worker_deaths]), + couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]), + {stop, {worker_died, Pid, Reason}, State2} + end; + +handle_info(timeout, InitArgs) -> + try do_init(InitArgs) of {ok, State} -> + {noreply, State} + catch + exit:{http_request_failed, _, _, max_backoff} -> + {stop, {shutdown, max_backoff}, {error, InitArgs}}; + Class:Error -> + ShutdownReason = {error, replication_start_error(Error)}, + % Shutdown state is a hack as it is not really the state of the + % gen_server (it failed to initialize, so it doesn't have one). + % Shutdown state is used to pass extra info about why start failed. + ShutdownState = {error, Class, erlang:get_stacktrace(), InitArgs}, + {stop, {shutdown, ShutdownReason}, ShutdownState} + end. 
+ + +terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep, + checkpoint_history = CheckpointHistory} = State) -> + terminate_cleanup(State), + couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}), + doc_update_completed(Rep, rep_stats(State)); + +terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) -> + % Replication stopped via _scheduler_sup:terminate_child/1, which can be + % occur during regular scheduler operation or when job is removed from + % the scheduler. + State1 = case do_checkpoint(State) of + {ok, NewState} -> + NewState; + Error -> + LogMsg = "~p : Failed last checkpoint. Job: ~p Error: ~p", + couch_log:error(LogMsg, [?MODULE, RepId, Error]), + State + end, + couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}), + terminate_cleanup(State1); + +terminate({shutdown, max_backoff}, {error, InitArgs}) -> + #rep{id = {BaseId, Ext} = RepId} = InitArgs, + couch_stats:increment_counter([couch_replicator, failed_starts]), + couch_log:warning("Replication `~s` reached max backoff ", [BaseId ++ Ext]), + couch_replicator_notifier:notify({error, RepId, max_backoff}); + +terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) -> + #rep{id=RepId} = InitArgs, + couch_stats:increment_counter([couch_replicator, failed_starts]), + CleanInitArgs = rep_strip_creds(InitArgs), + couch_log:error("~p:~p: Replication failed to start for args ~p: ~p", + [Class, Error, CleanInitArgs, Stack]), + couch_replicator_notifier:notify({error, RepId, Error}); + +terminate({shutdown, max_backoff}, State) -> + #rep_state{ + source_name = Source, + target_name = Target, + rep_details = #rep{id = {BaseId, Ext} = RepId} + } = State, + couch_log:error("Replication `~s` (`~s` -> `~s`) reached max backoff", + [BaseId ++ Ext, Source, Target]), + terminate_cleanup(State), + couch_replicator_notifier:notify({error, RepId, max_backoff}); + +terminate(Reason, State) -> +#rep_state{ + source_name = Source, + target_name = Target, + rep_details = #rep{id = {BaseId, Ext} = RepId} + } = State, + couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s", + [BaseId ++ Ext, Source, Target, to_binary(Reason)]), + terminate_cleanup(State), + couch_replicator_notifier:notify({error, RepId, Reason}). + +terminate_cleanup(State) -> + update_task(State), + stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier), + stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier), + couch_replicator_api_wrap:db_close(State#rep_state.source), + couch_replicator_api_wrap:db_close(State#rep_state.target). + + +code_change(_OldVsn, #rep_state{}=State, _Extra) -> + {ok, State}. + + +format_status(_Opt, [_PDict, State]) -> + [{data, [{"State", state_strip_creds(State)}]}]. + + +startup_jitter() -> + Jitter = config:get_integer("replicator", "startup_jitter", + ?STARTUP_JITTER_DEFAULT), + random:uniform(erlang:max(1, Jitter)). + + +headers_strip_creds([], Acc) -> + lists:reverse(Acc); +headers_strip_creds([{Key, Value0} | Rest], Acc) -> + Value = case string:to_lower(Key) of + "authorization" -> + "****"; + _ -> + Value0 + end, + headers_strip_creds(Rest, [{Key, Value} | Acc]). + + +httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) -> + HttpDb#httpdb{ + url = couch_util:url_strip_password(Url), + headers = headers_strip_creds(Headers, []) + }; +httpdb_strip_creds(LocalDb) -> + LocalDb. 
+ + +rep_strip_creds(#rep{source = Source, target = Target} = Rep) -> + Rep#rep{ + source = httpdb_strip_creds(Source), + target = httpdb_strip_creds(Target) + }. + + +state_strip_creds(#rep_state{rep_details = Rep, source = Source, target = Target} = State) -> + % #rep_state contains the source and target at the top level and also + % in the nested #rep_details record + State#rep_state{ + rep_details = rep_strip_creds(Rep), + source = httpdb_strip_creds(Source), + target = httpdb_strip_creds(Target) + }. + + +adjust_maxconn(Src = #httpdb{http_connections = 1}, RepId) -> + Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p", + couch_log:notice(Msg, [RepId]), + Src#httpdb{http_connections = 2}; +adjust_maxconn(Src, _RepId) -> + Src. + + +-spec doc_update_triggered(#rep{}) -> ok. +doc_update_triggered(#rep{db_name = null}) -> + ok; +doc_update_triggered(#rep{id = RepId, doc_id = DocId} = Rep) -> + case couch_replicator_doc_processor:update_docs() of + true -> + couch_replicator_docs:update_triggered(Rep, RepId); + false -> + ok + end, + couch_log:notice("Document `~s` triggered replication `~s`", + [DocId, pp_rep_id(RepId)]), + ok. + + +-spec doc_update_completed(#rep{}, list()) -> ok. +doc_update_completed(#rep{db_name = null}, _Stats) -> + ok; +doc_update_completed(#rep{id = RepId, doc_id = DocId, db_name = DbName, + start_time = StartTime}, Stats0) -> + Stats = Stats0 ++ [{start_time, couch_replicator_utils:iso8601(StartTime)}], + couch_replicator_docs:update_doc_completed(DbName, DocId, Stats), + couch_log:notice("Replication `~s` completed (triggered by `~s`)", + [pp_rep_id(RepId), DocId]), + ok. + + +do_last_checkpoint(#rep_state{seqs_in_progress = [], + highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) -> + {stop, normal, cancel_timer(State)}; +do_last_checkpoint(#rep_state{seqs_in_progress = [], + highest_seq_done = Seq} = State) -> + case do_checkpoint(State#rep_state{current_through_seq = Seq}) of + {ok, NewState} -> + couch_stats:increment_counter([couch_replicator, checkpoints, success]), + {stop, normal, cancel_timer(NewState)}; + Error -> + couch_stats:increment_counter([couch_replicator, checkpoints, failure]), + {stop, Error, State} + end. + + +start_timer(State) -> + After = State#rep_state.checkpoint_interval, + case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of + {ok, Ref} -> + Ref; + Error -> + couch_log:error("Replicator, error scheduling checkpoint: ~p", [Error]), + nil + end. + + +cancel_timer(#rep_state{timer = nil} = State) -> + State; +cancel_timer(#rep_state{timer = Timer} = State) -> + {ok, cancel} = timer:cancel(Timer), + State#rep_state{timer = nil}. 
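
adjust_maxconn/2 above exists to avoid a deadlock when the source #httpdb{} is configured with a single HTTP connection; any larger setting passes through untouched. A small EUnit-style sketch of the expected behaviour follows. The test name is mine, it assumes the usual -include_lib("eunit/include/eunit.hrl"), and couch_log must be running (or stubbed) for the first case because bumping the limit logs a notice.

%% Hypothetical test sketch, not part of the patch.
adjust_maxconn_example_test() ->
    %% A single-connection source is bumped to two connections.
    Bumped = adjust_maxconn(#httpdb{http_connections = 1}, "example_rep_id"),
    ?assertEqual(2, Bumped#httpdb.http_connections),
    %% Anything above one connection is left unchanged.
    Kept = adjust_maxconn(#httpdb{http_connections = 8}, "example_rep_id"),
    ?assertEqual(8, Kept#httpdb.http_connections).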
+ + +init_state(Rep) -> + #rep{ + id = {BaseId, _Ext}, + source = Src0, target = Tgt, + options = Options, user_ctx = UserCtx, + type = Type, view = View, + start_time = StartTime + } = Rep, + % Adjust minimum number of http source connections to 2 to avoid deadlock + Src = adjust_maxconn(Src0, BaseId), + {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]), + {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}], + get_value(create_target, Options, false)), + + {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source), + {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target), + + [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep), + + {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog), + StartSeq1 = get_value(since_seq, Options, StartSeq0), + StartSeq = {0, StartSeq1}, + + SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ), + + #doc{body={CheckpointHistory}} = SourceLog, + State = #rep_state{ + rep_details = Rep, + source_name = couch_replicator_api_wrap:db_uri(Source), + target_name = couch_replicator_api_wrap:db_uri(Target), + source = Source, + target = Target, + history = History, + checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]}, + start_seq = StartSeq, + current_through_seq = StartSeq, + committed_seq = StartSeq, + source_log = SourceLog, + target_log = TargetLog, + rep_starttime = StartTime, + src_starttime = get_value(<<"instance_start_time">>, SourceInfo), + tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo), + session_id = couch_uuids:random(), + source_db_compaction_notifier = + start_db_compaction_notifier(Source, self()), + target_db_compaction_notifier = + start_db_compaction_notifier(Target, self()), + source_monitor = db_monitor(Source), + target_monitor = db_monitor(Target), + source_seq = SourceSeq, + use_checkpoints = get_value(use_checkpoints, Options, true), + checkpoint_interval = get_value(checkpoint_interval, Options, + ?DEFAULT_CHECKPOINT_INTERVAL), + type = Type, + view = View + }, + State#rep_state{timer = start_timer(State)}. + + +find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) -> + LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId), + fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []). + + +fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) -> + lists:reverse(Acc); + +fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) -> + case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of + {error, <<"not_found">>} when Vsn > 1 -> + OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1), + fold_replication_logs(Dbs, Vsn - 1, + ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc); + {error, <<"not_found">>} -> + fold_replication_logs( + Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]); + {ok, Doc} when LogId =:= NewId -> + fold_replication_logs( + Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]); + {ok, Doc} -> + MigratedLog = #doc{id = NewId, body = Doc#doc.body}, + fold_replication_logs( + Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc]) + end. + + +spawn_changes_manager(Parent, ChangesQueue, BatchSize) -> + spawn_link(fun() -> + changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1) + end). + + +changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) -> + receive + {get_changes, From} -> + case couch_work_queue:dequeue(ChangesQueue, BatchSize) of + closed -> + From ! 
{closed, self()}; + {ok, Changes} -> + #doc_info{high_seq = Seq} = lists:last(Changes), + ReportSeq = {Ts, Seq}, + ok = gen_server:cast(Parent, {report_seq, ReportSeq}), + From ! {changes, self(), Changes, ReportSeq} + end, + changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1) + end. + + +do_checkpoint(#rep_state{use_checkpoints=false} = State) -> + NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} }, + {ok, NewState}; +do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) -> + update_task(State), + {ok, State}; +do_checkpoint(State) -> + #rep_state{ + source_name=SourceName, + target_name=TargetName, + source = Source, + target = Target, + history = OldHistory, + start_seq = {_, StartSeq}, + current_through_seq = {_Ts, NewSeq} = NewTsSeq, + source_log = SourceLog, + target_log = TargetLog, + rep_starttime = ReplicationStartTime, + src_starttime = SrcInstanceStartTime, + tgt_starttime = TgtInstanceStartTime, + stats = Stats, + rep_details = #rep{options = Options}, + session_id = SessionId + } = State, + case commit_to_both(Source, Target) of + {source_error, Reason} -> + {checkpoint_commit_failure, + <<"Failure on source commit: ", (to_binary(Reason))/binary>>}; + {target_error, Reason} -> + {checkpoint_commit_failure, + <<"Failure on target commit: ", (to_binary(Reason))/binary>>}; + {SrcInstanceStartTime, TgtInstanceStartTime} -> + couch_log:notice("recording a checkpoint for `~s` -> `~s` at source update_seq ~p", + [SourceName, TargetName, NewSeq]), + UniversalStartTime = calendar:now_to_universal_time(ReplicationStartTime), + StartTime = ?l2b(httpd_util:rfc1123_date(UniversalStartTime)), + EndTime = ?l2b(httpd_util:rfc1123_date()), + NewHistoryEntry = {[ + {<<"session_id">>, SessionId}, + {<<"start_time">>, StartTime}, + {<<"end_time">>, EndTime}, + {<<"start_last_seq">>, StartSeq}, + {<<"end_last_seq">>, NewSeq}, + {<<"recorded_seq">>, NewSeq}, + {<<"missing_checked">>, couch_replicator_stats:missing_checked(Stats)}, + {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)}, + {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)}, + {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)}, + {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)} + ]}, + BaseHistory = [ + {<<"session_id">>, SessionId}, + {<<"source_last_seq">>, NewSeq}, + {<<"replication_id_version">>, ?REP_ID_VERSION} + ] ++ case get_value(doc_ids, Options) of + undefined -> + []; + _DocIds -> + % backwards compatibility with the result of a replication by + % doc IDs in versions 0.11.x and 1.0.x + % TODO: deprecate (use same history format, simplify code) + [ + {<<"start_time">>, StartTime}, + {<<"end_time">>, EndTime}, + {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)}, + {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)}, + {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)} + ] + end, + % limit history to 50 entries + NewRepHistory = { + BaseHistory ++ + [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}] + }, + + try + {SrcRevPos, SrcRevId} = update_checkpoint( + Source, SourceLog#doc{body = NewRepHistory}, source), + {TgtRevPos, TgtRevId} = update_checkpoint( + Target, TargetLog#doc{body = NewRepHistory}, target), + NewState = State#rep_state{ + checkpoint_history = NewRepHistory, + committed_seq = NewTsSeq, + source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, + target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} + }, 
+ update_task(NewState), + {ok, NewState} + catch throw:{checkpoint_commit_failure, _} = Failure -> + Failure + end; + {SrcInstanceStartTime, _NewTgtInstanceStartTime} -> + {checkpoint_commit_failure, <<"Target database out of sync. " + "Try to increase max_dbs_open at the target's server.">>}; + {_NewSrcInstanceStartTime, TgtInstanceStartTime} -> + {checkpoint_commit_failure, <<"Source database out of sync. " + "Try to increase max_dbs_open at the source's server.">>}; + {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} -> + {checkpoint_commit_failure, <<"Source and target databases out of " + "sync. Try to increase max_dbs_open at both servers.">>} + end. + + +update_checkpoint(Db, Doc, DbType) -> + try + update_checkpoint(Db, Doc) + catch throw:{checkpoint_commit_failure, Reason} -> + throw({checkpoint_commit_failure, + <<"Error updating the ", (to_binary(DbType))/binary, + " checkpoint document: ", (to_binary(Reason))/binary>>}) + end. + + +update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) -> + try + case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of + {ok, PosRevId} -> + PosRevId; + {error, Reason} -> + throw({checkpoint_commit_failure, Reason}) + end + catch throw:conflict -> + case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of + {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} -> + % This means that we were able to update successfully the + % checkpoint doc in a previous attempt but we got a connection + % error (timeout for e.g.) before receiving the success response. + % Therefore the request was retried and we got a conflict, as the + % revision we sent is not the current one. + % We confirm this by verifying the doc body we just got is the same + % that we have just sent. + {Pos, RevId}; + _ -> + throw({checkpoint_commit_failure, conflict}) + end + end. + + +commit_to_both(Source, Target) -> + % commit the src async + ParentPid = self(), + SrcCommitPid = spawn_link( + fun() -> + Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)), + ParentPid ! {self(), Result} + end), + + % commit tgt sync + TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)), + + SourceResult = receive + {SrcCommitPid, Result} -> + unlink(SrcCommitPid), + receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end, + Result; + {'EXIT', SrcCommitPid, Reason} -> + {error, Reason} + end, + case TargetResult of + {ok, TargetStartTime} -> + case SourceResult of + {ok, SourceStartTime} -> + {SourceStartTime, TargetStartTime}; + SourceError -> + {source_error, SourceError} + end; + TargetError -> + {target_error, TargetError} + end. + + +compare_replication_logs(SrcDoc, TgtDoc) -> + #doc{body={RepRecProps}} = SrcDoc, + #doc{body={RepRecPropsTgt}} = TgtDoc, + case get_value(<<"session_id">>, RepRecProps) == + get_value(<<"session_id">>, RepRecPropsTgt) of + true -> + % if the records have the same session id, + % then we have a valid replication history + OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ), + OldHistory = get_value(<<"history">>, RepRecProps, []), + {OldSeqNum, OldHistory}; + false -> + SourceHistory = get_value(<<"history">>, RepRecProps, []), + TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []), + couch_log:notice("Replication records differ. " + "Scanning histories to find a common ancestor.", []), + couch_log:debug("Record on source:~p~nRecord on target:~p~n", + [RepRecProps, RepRecPropsTgt]), + compare_rep_history(SourceHistory, TargetHistory) + end. 
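
compare_replication_logs/2 decides where a restarted job resumes: if the two _local checkpoint documents carry the same session_id, the recorded source_last_seq and history are reused as-is; otherwise the two histories are scanned for a common session by compare_rep_history/2 in the following hunk. A data-only illustration of the matching case, with invented sequence numbers and session ids (#doc{} comes from couch_db.hrl):

%% Hypothetical checkpoint bodies that share a session id, so the
%% replication resumes from source_last_seq 42 with the stored history.
History = [{[{<<"session_id">>, <<"s1">>}, {<<"recorded_seq">>, 42}]}],
SrcLog = #doc{body = {[{<<"session_id">>, <<"s1">>},
                       {<<"source_last_seq">>, 42},
                       {<<"history">>, History}]}},
TgtLog = #doc{body = {[{<<"session_id">>, <<"s1">>},
                       {<<"history">>, History}]}},
{42, History} = compare_replication_logs(SrcLog, TgtLog).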
+ + +compare_rep_history(S, T) when S =:= [] orelse T =:= [] -> + couch_log:notice("no common ancestry -- performing full replication", []), + {?LOWEST_SEQ, []}; +compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) -> + SourceId = get_value(<<"session_id">>, S), + case has_session_id(SourceId, Target) of + true -> + RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ), + couch_log:notice("found a common replication record with source_seq ~p", + [RecordSeqNum]), + {RecordSeqNum, SourceRest}; + false -> + TargetId = get_value(<<"session_id">>, T), + case has_session_id(TargetId, SourceRest) of + true -> + RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ), + couch_log:notice("found a common replication record with source_seq ~p", + [RecordSeqNum]), + {RecordSeqNum, TargetRest}; + false -> + compare_rep_history(SourceRest, TargetRest) + end + end. + + +has_session_id(_SessionId, []) -> + false; +has_session_id(SessionId, [{Props} | Rest]) -> + case get_value(<<"session_id">>, Props, nil) of + SessionId -> + true; + _Else -> + has_session_id(SessionId, Rest) + end. + + +db_monitor(#db{} = Db) -> + couch_db:monitor(Db); +db_monitor(_HttpDb) -> + nil. + + +get_pending_count(St) -> + Rep = St#rep_state.rep_details, + Timeout = get_value(connection_timeout, Rep#rep.options), + TimeoutMicro = Timeout * 1000, + case get(pending_count_state) of + {LastUpdate, PendingCount} -> + case timer:now_diff(os:timestamp(), LastUpdate) > TimeoutMicro of + true -> + NewPendingCount = get_pending_count_int(St), + put(pending_count_state, {os:timestamp(), NewPendingCount}), + NewPendingCount; + false -> + PendingCount + end; + undefined -> + NewPendingCount = get_pending_count_int(St), + put(pending_count_state, {os:timestamp(), NewPendingCount}), + NewPendingCount + end. + + +get_pending_count_int(#rep_state{source = #httpdb{} = Db0}=St) -> + {_, Seq} = St#rep_state.highest_seq_done, + Db = Db0#httpdb{retries = 3}, + case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of + {ok, Pending} -> + Pending; + _ -> + null + end; +get_pending_count_int(#rep_state{source = Db}=St) -> + {_, Seq} = St#rep_state.highest_seq_done, + {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq), + Pending. + + +update_task(State) -> + #rep_state{ + current_through_seq = {_, ThroughSeq}, + highest_seq_done = {_, HighestSeq} + } = State, + couch_task_status:update( + rep_stats(State) ++ [ + {source_seq, HighestSeq}, + {through_seq, ThroughSeq} + ]). + + +rep_stats(State) -> + #rep_state{ + committed_seq = {_, CommittedSeq}, + stats = Stats + } = State, + [ + {revisions_checked, couch_replicator_stats:missing_checked(Stats)}, + {missing_revisions_found, couch_replicator_stats:missing_found(Stats)}, + {docs_read, couch_replicator_stats:docs_read(Stats)}, + {docs_written, couch_replicator_stats:docs_written(Stats)}, + {changes_pending, get_pending_count(State)}, + {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)}, + {checkpointed_source_seq, CommittedSeq} + ]. + + +replication_start_error({unauthorized, DbUri}) -> + {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>}; +replication_start_error({db_not_found, DbUri}) -> + {db_not_found, <<"could not open ", DbUri/binary>>}; +replication_start_error(Error) -> + Error. 
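
get_pending_count/1 memoizes the (possibly remote) pending-changes lookup in the process dictionary so update_task/1 can be called frequently without hammering the source; a cached value older than the replication's connection_timeout is recomputed. The same time-bounded caching idea is sketched below, stripped of replicator specifics; the helper names, key and TTL are invented.

%% Hypothetical generic helper: cache the result of Fun() under Key in
%% the process dictionary and recompute it once the cached value is
%% older than TtlMicros microseconds.
cached(Key, TtlMicros, Fun) ->
    Now = os:timestamp(),
    case get(Key) of
        {Last, Value} ->
            case timer:now_diff(Now, Last) =< TtlMicros of
                true -> Value;
                false -> recompute(Key, Now, Fun)
            end;
        undefined ->
            recompute(Key, Now, Fun)
    end.

recompute(Key, Now, Fun) ->
    Value = Fun(),
    put(Key, {Now, Value}),
    Value.

%% e.g. cached(pending_count, 5 * 1000 * 1000, fun() -> slow_lookup() end)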
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
new file mode 100644
index 000000000..8ab55f838
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
@@ -0,0 +1,62 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler_sup).
+
+-behaviour(supervisor).
+
+%% public api
+-export([
+    start_link/0,
+    start_child/1,
+    terminate_child/1
+]).
+
+%% supervisor api
+-export([
+    init/1
+]).
+
+
+%% includes
+-include("couch_replicator.hrl").
+
+
+%% public functions
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_child(#rep{} = Rep) ->
+    supervisor:start_child(?MODULE, [Rep]).
+
+
+terminate_child(Pid) ->
+    supervisor:terminate_child(?MODULE, Pid).
+
+%% supervisor functions
+
+init(_Args) ->
+    Start = {couch_replicator_scheduler_job, start_link, []},
+    Restart = temporary, % A crashed job is not entitled to immediate restart.
+    Shutdown = 5000,
+    Type = worker,
+    Modules = [couch_replicator_scheduler_job],
+
+    RestartStrategy = simple_one_for_one,
+    MaxR = 10,
+    MaxT = 3,
+
+    ChildSpec =
+        {undefined, Start, Restart, Shutdown, Type, Modules},
+    {ok, {{RestartStrategy, MaxR, MaxT}, [ChildSpec]}}.
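
Because this supervisor is simple_one_for_one, the [Rep] list passed to start_child/1 is appended to the {couch_replicator_scheduler_job, start_link, []} MFA, so every child is effectively started as couch_replicator_scheduler_job:start_link(Rep). And because the restart type is temporary, a crashed job is never restarted by the supervisor itself; it re-enters the scheduler and is penalized with exponential backoff, as described in the commit message. A hedged usage sketch follows; the #rep{} value is schematic and real records are built elsewhere in the replicator.

%% Illustrative only: how the scheduler drives this supervisor.
{ok, Pid} = couch_replicator_scheduler_sup:start_child(Rep),
%% ... the job runs until the scheduler decides to stop or rotate it ...
ok = couch_replicator_scheduler_sup:terminate_child(Pid).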