summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-08-28 04:35:07 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2020-09-15 16:13:46 -0400
commit7dd18993f25b107db05a9e99e01770c7f84bd33b (patch)
tree60e570abc5420627b62742ce5f2293397b5a1ffe
parent5b98e8a6c169449d1a3e362e52e86822ef350ed5 (diff)
downloadcouchdb-7dd18993f25b107db05a9e99e01770c7f84bd33b.tar.gz
Update backend replicator modules
Backend replicator modules execute replication jobs. The two main modules reponsible for job management and execution are `couch_replicator_job` and `couch_replicator_job_server`. `couch_replicator_job` - Is the main process of each replication job. When this process starts, it waits in the `couch_jobs:accept/2` call. This may take an indefinite amount of time. The spawned `couch_replicator_job` waiting in accept like that is called internally an "acceptor". The main pattern of execution is multiple acceptors are started, and after some of them accept jobs, they become "workers". - After it accepts a job, it parses the `couch_jobs` job data, which contains the `Rep` object and calculates the replication ID from it. Replication ID calculation may involve making a call to the source endpoint in order to fetch the contents of the javascript filter. Then, the `Rep` object and the replication ID is used to construct the internal `#rep_state{}` state record of the `gen_server`. - Multiple replication jobs may end up trying to run the same replication (with the same replication ID) concurrently. To manage these types of colisions, `check_ownership/3` function is called to determine if the current replication is the correct `owner` of that replication. If it is not, then the job maybe fail and exit. - There is a periodic checkpoint timer which sends a `checkpoint` message. The checkpoint frequency is calculated as the minimum of the `couch_jobs` activity timeout and the configured checkpoint interval. During each checkpoint attempt, there is a call to `couch_jobs:update/3` which updates the job's data with latest state and ensure the job doesn't get re-enqueued due to inactivity. - If the job completes, then `couch_jobs:finish/3` is called and the replication process exits `normal`. If the job crashes, there is a consecutive error count field (`?ERROR_COUNT`) which, is used to calculate the backoff penalty. There is an exponential backoff schedule, that starts with the base value, then doubles, but only up to a maximum value. Both the base and the maximum values are configurable with the `min_backoff_penalty_sec` and `max_backoff_penalty_sec` settings respecively. This is an improvement from before where the users could only influence the maximum backoff penalty by reducing the number of failed events kept by each job. `couch_replicator_server` - This is a module which spawns and keeps track of `couch_replicator_job` processes. - Periodically, every `interval_sec` seconds, it runs the `reschedule` function which checks for pending jobs. If they are some, it will start acceptors to run them. If those acceptors become workers, and if the total number of workers goes above the `max_jobs` setting, the oldest `continuous` workers will be stopped until the total number of jobs falls below `max_jobs` value. - In addition to `max_jobs` limit, there is a `max_churn` limit which determines up to how many job starts to allow during each scheduling interval. As jobs are started, they reduce the available churn "budget" for that cycle and after it goes below 0 no more jobs can start until the next cycle. - This module also performs transient job cleanup. After transient jobs stop running previously they simply vanished but with this update they maybe linger for at least `transient_job_max_age_sec` seconds.
-rw-r--r--src/couch_replicator/priv/stats_descriptions.cfg44
-rw-r--r--src/couch_replicator/src/couch_replicator_httpc.erl2
-rw-r--r--src/couch_replicator/src/couch_replicator_job.erl1612
-rw-r--r--src/couch_replicator/src/couch_replicator_job_server.erl370
4 files changed, 1999 insertions, 29 deletions
diff --git a/src/couch_replicator/priv/stats_descriptions.cfg b/src/couch_replicator/priv/stats_descriptions.cfg
index d9efb91dc..1bb151c1c 100644
--- a/src/couch_replicator/priv/stats_descriptions.cfg
+++ b/src/couch_replicator/priv/stats_descriptions.cfg
@@ -54,14 +54,6 @@
{type, counter},
{desc, <<"number of replicator workers started">>}
]}.
-{[couch_replicator, cluster_is_stable], [
- {type, gauge},
- {desc, <<"1 if cluster is stable, 0 if unstable">>}
-]}.
-{[couch_replicator, db_scans], [
- {type, counter},
- {desc, <<"number of times replicator db scans have been started">>}
-]}.
{[couch_replicator, docs, dbs_created], [
{type, counter},
{desc, <<"number of db shard creations seen by replicator doc processor">>}
@@ -70,10 +62,6 @@
{type, counter},
{desc, <<"number of db shard deletions seen by replicator doc processor">>}
]}.
-{[couch_replicator, docs, dbs_found], [
- {type, counter},
- {desc, <<"number of db shard found by replicator doc processor">>}
-]}.
{[couch_replicator, docs, db_changes], [
{type, counter},
{desc, <<"number of db changes processed by replicator doc processor">>}
@@ -88,43 +76,43 @@
]}.
{[couch_replicator, jobs, adds], [
{type, counter},
- {desc, <<"number of jobs added to replicator scheduler">>}
+ {desc, <<"number of jobs added to replicator">>}
]}.
-{[couch_replicator, jobs, duplicate_adds], [
+{[couch_replicator, jobs, removes], [
{type, counter},
- {desc, <<"number of duplicate jobs added to replicator scheduler">>}
+ {desc, <<"number of jobs removed from replicator">>}
]}.
-{[couch_replicator, jobs, removes], [
+{[couch_replicator, jobs, accepts], [
{type, counter},
- {desc, <<"number of jobs removed from replicator scheduler">>}
+ {desc, <<"number of job acceptors started">>}
]}.
{[couch_replicator, jobs, starts], [
{type, counter},
- {desc, <<"number of jobs started by replicator scheduler">>}
+ {desc, <<"number of jobs started by replicator">>}
]}.
{[couch_replicator, jobs, stops], [
{type, counter},
- {desc, <<"number of jobs stopped by replicator scheduler">>}
+ {desc, <<"number of jobs stopped by replicator">>}
]}.
{[couch_replicator, jobs, crashes], [
{type, counter},
- {desc, <<"number of job crashed noticed by replicator scheduler">>}
+ {desc, <<"number of job crashed noticed by replicator">>}
]}.
{[couch_replicator, jobs, running], [
{type, gauge},
- {desc, <<"replicator scheduler running jobs">>}
+ {desc, <<"replicator running jobs">>}
]}.
-{[couch_replicator, jobs, pending], [
+{[couch_replicator, jobs, accepting], [
{type, gauge},
- {desc, <<"replicator scheduler pending jobs">>}
+ {desc, <<"replicator acceptors count">>}
]}.
-{[couch_replicator, jobs, crashed], [
- {type, gauge},
- {desc, <<"replicator scheduler crashed jobs">>}
+{[couch_replicator, jobs, reschedules], [
+ {type, counter},
+ {desc, <<"replicator reschedule cycles counter">>}
]}.
-{[couch_replicator, jobs, total], [
+{[couch_replicator, jobs, pending], [
{type, gauge},
- {desc, <<"total number of replicator scheduler jobs">>}
+ {desc, <<"replicator pending count">>}
]}.
{[couch_replicator, connection, acquires], [
{type, counter},
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index 4dce319dc..f11d1895d 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -327,7 +327,7 @@ total_error_time_exceeded(#httpdb{first_error_timestamp = nil}) ->
false;
total_error_time_exceeded(#httpdb{first_error_timestamp = ErrorTimestamp}) ->
- HealthThresholdSec = couch_replicator_scheduler:health_threshold(),
+ HealthThresholdSec = couch_replicator_job:health_threshold(),
% Theshold value is halved because in the calling code the next step
% is a doubling. Not halving here could mean sleeping too long and
% exceeding the health threshold.
diff --git a/src/couch_replicator/src/couch_replicator_job.erl b/src/couch_replicator/src/couch_replicator_job.erl
new file mode 100644
index 000000000..ed3d00d7b
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_job.erl
@@ -0,0 +1,1612 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_job).
+
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ format_status/2,
+ code_change/3
+]).
+
+-export([
+ accept/0,
+ health_threshold/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+
+-define(LOWEST_SEQ, 0).
+-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+-define(STARTUP_JITTER_DEFAULT, 5000).
+-define(DEFAULT_MIN_BACKOFF_PENALTY_SEC, 32).
+-define(DEFAULT_MAX_BACKOFF_PENALTY_SEC, 2 * 24 * 3600).
+-define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
+-define(DEFAULT_MAX_HISTORY, 10).
+-define(DEFAULT_STATS_UPDATE_INTERVAL_SEC, 10).
+
+
+-record(rep_state, {
+ job,
+ job_data,
+ id,
+ base_id,
+ doc_id,
+ db_name,
+ db_uuid,
+ source_name,
+ target_name,
+ source,
+ target,
+ history,
+ checkpoint_history,
+ start_seq,
+ committed_seq,
+ current_through_seq,
+ seqs_in_progress = [],
+ highest_seq_done = {0, ?LOWEST_SEQ},
+ source_log,
+ target_log,
+ rep_starttime,
+ src_starttime,
+ tgt_starttime,
+ checkpoint_timer,
+ stats_timer,
+ changes_queue,
+ changes_manager,
+ changes_reader,
+ workers,
+ stats = couch_replicator_stats:new(),
+ session_id,
+ source_seq = nil,
+ use_checkpoints = true,
+ checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
+ user = null,
+ options = #{}
+}).
+
+
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+
+init(_) ->
+ process_flag(trap_exit, true),
+ {ok, delayed_init, 0}.
+
+
+terminate(normal, #rep_state{} = State) ->
+ #rep_state{
+ job = Job,
+ job_data = JobData,
+ checkpoint_history = History
+ } = State,
+ ok = complete_job(undefined, Job, JobData, History),
+ close_endpoints(State);
+
+terminate(shutdown, #rep_state{} = State0) ->
+ % Replication stopped by the job server
+ State1 = cancel_timers(State0),
+ State3 = case do_checkpoint(State1) of
+ {ok, State2} ->
+ State2;
+ Error ->
+ Msg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
+ couch_log:error(Msg, [?MODULE, State1#rep_state.id, Error]),
+ State1
+ end,
+ #rep_state{job = Job, job_data = JobData} = State3,
+ ok = reschedule(undefined, Job, JobData),
+ ok = close_endpoints(State3);
+
+terminate({shutdown, Error}, {init_error, Stack}) ->
+ % Termination in init, before the job had initialized
+ case Error of
+ max_backoff -> couch_log:warning("~p job backed off", [?MODULE]);
+ finished -> couch_log:notice("~p job finished in init", [?MODULE]);
+ _ -> couch_log:error("~p job failed ~p ~p", [?MODULE, Error, Stack])
+ end,
+ ok;
+
+terminate({shutdown, finished}, #rep_state{} = State) ->
+ % Job state was already updated and job is marked as finished
+ ok = close_endpoints(State);
+
+terminate({shutdown, halt}, #rep_state{} = State) ->
+ % Job is re-enqueued and possibly already running somewhere else
+ couch_log:error("~p job ~p halted", [?MODULE, State#rep_state.id]),
+ ok = close_endpoints(State);
+
+terminate(Reason0, #rep_state{} = State0) ->
+ State = update_job_state(State0),
+ Reason = case Reason0 of
+ {shutdown, Err} -> Err;
+ _ -> Reason0
+ end,
+ #rep_state{
+ id = RepId,
+ job = Job,
+ job_data = JobData,
+ source_name = Source,
+ target_name = Target
+ } = State,
+ couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~p",
+ [RepId, Source, Target, Reason]),
+ ok = reschedule_on_error(undefined, Job, JobData, Reason),
+ ok = close_endpoints(State).
+
+
+handle_call({add_stats, Stats}, From, State) ->
+ gen_server:reply(From, ok),
+ NewStats = couch_replicator_stats:sum_stats(State#rep_state.stats, Stats),
+ {noreply, State#rep_state{stats = NewStats}};
+
+handle_call({report_seq_done, Seq, StatsInc}, From, #rep_state{} = State) ->
+ #rep_state{
+ seqs_in_progress = SeqsInProgress,
+ highest_seq_done = HighestDone,
+ current_through_seq = ThroughSeq,
+ stats = Stats
+ } = State,
+ gen_server:reply(From, ok),
+ {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+ [] ->
+ {Seq, []};
+ [Seq | Rest] ->
+ {Seq, Rest};
+ [_ | _] ->
+ {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+ end,
+ NewHighestDone = lists:max([HighestDone, Seq]),
+ NewThroughSeq = case NewSeqsInProgress of
+ [] ->
+ lists:max([NewThroughSeq0, NewHighestDone]);
+ _ ->
+ NewThroughSeq0
+ end,
+ couch_log:debug("Worker reported seq ~p, through seq was ~p, "
+ "new through seq is ~p, highest seq done was ~p, "
+ "new highest seq done is ~p~n"
+ "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+ [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+ NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+ NewState = State#rep_state{
+ stats = couch_replicator_stats:sum_stats(Stats, StatsInc),
+ current_through_seq = NewThroughSeq,
+ seqs_in_progress = NewSeqsInProgress,
+ highest_seq_done = NewHighestDone
+ },
+ {noreply, maybe_update_job_state(NewState)};
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast({report_seq, Seq},
+ #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+ NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
+ {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}};
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(timeout, delayed_init) ->
+ try delayed_init() of
+ {ok, State} -> {noreply, State};
+ {stop, Reason, State} -> {stop, Reason, State}
+ catch
+ exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt ->
+ Stack = erlang:get_stacktrace(),
+ {stop, {shutdown, Exit}, {init_error, Stack}};
+ _Tag:Error ->
+ ShutdownReason = {error, replication_start_error(Error)},
+ Stack = erlang:get_stacktrace(),
+ {stop, {shutdown, ShutdownReason}, {init_error, Stack}}
+ end;
+
+handle_info(stats_update, #rep_state{} = State) ->
+ State1 = cancel_stats_timer(State),
+ State2 = update_job_state(State1),
+ {noreply, State2};
+
+handle_info(checkpoint, State0) ->
+ State = cancel_checkpoint_timer(State0),
+ ok = check_user_filter(State),
+ case do_checkpoint(State) of
+ {ok, State1} ->
+ couch_stats:increment_counter([couch_replicator, checkpoints,
+ success]),
+ {noreply, start_checkpoint_timer(State1)};
+ Error ->
+ couch_stats:increment_counter([couch_replicator, checkpoints,
+ failure]),
+ {stop, Error, State}
+ end;
+
+handle_info(shutdown, St) ->
+ {stop, shutdown, St};
+
+handle_info({'EXIT', Pid, max_backoff}, State) ->
+ couch_log:error("Max backoff reached child process ~p", [Pid]),
+ {stop, {shutdown, max_backoff}, State};
+
+handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
+ couch_log:error("Max backoff reached child process ~p", [Pid]),
+ {stop, {shutdown, max_backoff}, State};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
+ couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+ Reason = case Reason0 of
+ {changes_req_failed, _, _} = HttpFail ->
+ HttpFail;
+ {http_request_failed, _, _, {error, {code, Code}}} ->
+ {changes_req_failed, Code};
+ {http_request_failed, _, _, {error, Err}} ->
+ {changes_req_failed, Err};
+ Other ->
+ {changes_reader_died, Other}
+ end,
+ couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
+ {stop, {shutdown, Reason}, cancel_timers(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager=Pid} = State) ->
+ couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
+ couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
+ {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timers(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+ couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
+ couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
+ {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timers(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
+ case Workers -- [Pid] of
+ Workers ->
+ %% Processes might be linked by replicator's auth plugins so
+ %% we tolerate them exiting `normal` here and don't crash
+ LogMsg = "~p: unknown pid exited `normal` ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid]),
+ {noreply, State#rep_state{workers = Workers}};
+ [] ->
+ catch unlink(State#rep_state.changes_manager),
+ catch exit(State#rep_state.changes_manager, kill),
+ do_last_checkpoint(State);
+ Workers2 ->
+ {noreply, State#rep_state{workers = Workers2}}
+ end;
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
+ State2 = cancel_timers(State),
+ case lists:member(Pid, Workers) of
+ false ->
+ {stop, {unknown_process_died, Pid, Reason}, State2};
+ true ->
+ couch_stats:increment_counter([couch_replicator, worker_deaths]),
+ StopReason = case Reason of
+ {shutdown, _} = Err ->
+ Err;
+ Other ->
+ ErrLog = "Worker ~p died with reason: ~p",
+ couch_log:error(ErrLog, [Pid, Reason]),
+ {worker_died, Pid, Other}
+ end,
+ {stop, StopReason, State2}
+ end;
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+ LogMsg = "~p : spurious erlfdb future ready message ~p",
+ couch_log:notice(LogMsg, [?MODULE, Ref]),
+ {noreply, St};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+format_status(_Opt, [_PDict, State]) ->
+ #rep_state{
+ id = Id,
+ source = Source,
+ target = Target,
+ start_seq = StartSeq,
+ source_seq = SourceSeq,
+ committed_seq = CommitedSeq,
+ current_through_seq = ThroughSeq,
+ highest_seq_done = HighestSeqDone,
+ session_id = SessionId,
+ doc_id = DocId,
+ db_name = DbName,
+ options = Options
+ } = state_strip_creds(State),
+ [
+ {rep_id, Id},
+ {source, couch_replicator_api_wrap:db_uri(Source)},
+ {target, couch_replicator_api_wrap:db_uri(Target)},
+ {db_name, DbName},
+ {doc_id, DocId},
+ {options, Options},
+ {session_id, SessionId},
+ {start_seq, StartSeq},
+ {source_seq, SourceSeq},
+ {committed_seq, CommitedSeq},
+ {current_through_seq, ThroughSeq},
+ {highest_seq_done, HighestSeqDone}
+ ].
+
+
+code_change(_OldVsn, #rep_state{}=State, _Extra) ->
+ {ok, State}.
+
+
+accept() ->
+ couch_stats:increment_counter([couch_replicator, jobs, accepts]),
+ Now = erlang:system_time(second),
+ case couch_replicator_jobs:accept_job(Now + 5) of
+ {ok, Job, #{?REP := Rep} = JobData} ->
+ Normal = case Rep of
+ #{?OPTIONS := #{} = Options} ->
+ not maps:get(<<"continuous">>, Options, false);
+ _ ->
+ true
+ end,
+ couch_replicator_job_server:accepted(self(), Normal),
+ {ok, Job, JobData};
+ {error, not_found} ->
+ timer:sleep(accept_jitter_msec()),
+ ?MODULE:accept()
+ end.
+
+
+% Health threshold is the minimum amount of time an unhealthy job should run
+% crashing before it is considered to be healthy again. HealtThreashold should
+% not be 0 as jobs could start and immediately crash, and it shouldn't be
+% infinity, since then consecutive crashes would accumulate forever even if
+% job is back to normal.
+health_threshold() ->
+ config:get_integer("replicator", "health_threshold_sec",
+ ?DEFAULT_HEALTH_THRESHOLD_SEC).
+
+
+delayed_init() ->
+ {ok, Job, JobData} = accept(),
+ try do_init(Job, JobData) of
+ State = #rep_state{} -> {ok, State}
+ catch
+ exit:{http_request_failed, _, _, max_backoff} ->
+ Stack = erlang:get_stacktrace(),
+ reschedule_on_error(undefined, Job, JobData, max_backoff),
+ {stop, {shutdown, max_backoff}, {init_error, Stack}};
+ exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt ->
+ Stack = erlang:get_stacktrace(),
+ {stop, {shutdown, Exit}, {init_error, Stack}};
+ _Tag:Error ->
+ Reason = {error, replication_start_error(Error)},
+ Stack = erlang:get_stacktrace(),
+ ErrMsg = "~p : job ~p failed during startup ~p stack:~p",
+ couch_log:error(ErrMsg, [?MODULE, Job, Reason, Stack]),
+ reschedule_on_error(undefined, Job, JobData, Reason),
+ {stop, {shutdown, Reason}, {init_error, Stack}}
+ end.
+
+
+do_init(Job, #{} = JobData) ->
+ couch_stats:increment_counter([couch_replicator, jobs, starts]),
+ % This may make a network request, then may fail and reschedule the job
+ {RepId, BaseId} = get_rep_id(undefined, Job, JobData),
+ #{
+ ?DB_NAME := DbName,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId
+ } = JobData,
+
+ ok = couch_replicator_docs:remove_state_fields(DbName, DbUUID, DocId),
+
+ % Finish if job is in a failed state already
+ case JobData of
+ #{?STATE := ?ST_FAILED, ?STATE_INFO := Error} ->
+ ok = fail_job(undefined, Job, JobData, Error),
+ exit({shutdown, finished});
+ #{?STATE := St} when is_binary(St), St =/= ?ST_FAILED ->
+ ok
+ end,
+
+ JobsTx = couch_jobs_fdb:get_jtx(),
+ {Job1, JobData1, Owner} = couch_jobs_fdb:tx(JobsTx, fun(JTx) ->
+ init_job_data(JTx, Job, JobData, RepId, BaseId)
+ end),
+
+ % Handle ownership decision here to be outside of the transaction
+ case Owner of
+ owner -> ok;
+ not_owner -> exit({shutdown, finished})
+ end,
+
+ #rep_state{
+ source = Source,
+ target = Target,
+ start_seq = {_Ts, StartSeq},
+ options = Options,
+ doc_id = DocId,
+ db_name = DbName
+ } = State = init_state(Job1, JobData1),
+
+ NumWorkers = maps:get(<<"worker_processes">>, Options),
+ BatchSize = maps:get(<<"worker_batch_size">>, Options),
+ {ok, ChangesQueue} = couch_work_queue:new([
+ {max_items, BatchSize * NumWorkers * 2},
+ {max_size, 100 * 1024 * NumWorkers}
+ ]),
+
+ % This starts the _changes reader process. It adds the changes from the
+ % source db to the ChangesQueue.
+ {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
+ StartSeq, Source, ChangesQueue, Options
+ ),
+
+ % Changes manager - responsible for dequeing batches from the changes queue
+ % and deliver them to the worker processes.
+ ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
+
+ % This starts the worker processes. They ask the changes queue manager for
+ % a a batch of _changes rows to process -> check which revs are missing in
+ % the target, and for the missing ones, it copies them from the source to
+ % the target.
+ MaxConns = maps:get(<<"http_connections">>, Options),
+ Workers = lists:map(fun(_) ->
+ couch_stats:increment_counter([couch_replicator, workers_started]),
+ {ok, Pid} = couch_replicator_worker:start_link(self(), Source, Target,
+ ChangesManager, MaxConns),
+ Pid
+ end, lists:seq(1, NumWorkers)),
+
+ log_replication_start(State),
+
+ State1 = State#rep_state{
+ changes_queue = ChangesQueue,
+ changes_manager = ChangesManager,
+ changes_reader = ChangesReader,
+ workers = Workers
+ },
+
+ update_job_state(State1).
+
+
+init_job_data(#{jtx := true} = JTx, Job, #{?REP_ID := RepId} = JobData, RepId,
+ _BaseId) ->
+ {Job, JobData, check_ownership(JTx, Job, JobData)};
+
+init_job_data(#{jtx := true} = JTx, Job, #{} = JobData, RepId, BaseId) ->
+ #{
+ ?REP := Rep,
+ ?REP_ID := OldRepId,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId
+ } = JobData,
+ JobId = couch_replicator_ids:job_id(Rep, DbUUID, DocId),
+ Now = erlang:system_time(second),
+ JobData1 = JobData#{
+ ?REP_ID := RepId,
+ ?BASE_ID := BaseId,
+ ?STATE := ?ST_RUNNING,
+ ?STATE_INFO := null,
+ ?LAST_START := Now,
+ ?REP_NODE := erlang:atom_to_binary(node(), utf8),
+ ?REP_PID := list_to_binary(pid_to_list(self())),
+ ?LAST_UPDATED := Now
+ },
+ JobData2 = case is_binary(OldRepId) andalso OldRepId =/= RepId of
+ true ->
+ % Handle Replication ID change
+ ok = couch_replicator_jobs:clear_old_rep_id(JTx, JobId, OldRepId),
+ JobData1#{
+ ?REP_STATS := #{},
+ ?JOB_HISTORY := []
+ };
+ false ->
+ JobData1
+ end,
+ JobData3 = hist_append(?HIST_STARTED, Now, JobData2, undefined),
+ case check_ownership(JTx, Job, JobData3) of
+ owner ->
+ couch_stats:increment_counter([couch_replicator, jobs, starts]),
+ {Job1, JobData4} = update_job_data(JTx, Job, JobData3),
+ {Job1, JobData4, owner};
+ not_owner ->
+ {Job, JobData3, not_owner}
+ end.
+
+
+check_ownership(#{jtx := true} = JTx, Job, JobData) ->
+ #{
+ ?REP_ID := RepId,
+ ?REP := Rep,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId
+ } = JobData,
+ JobId = couch_replicator_ids:job_id(Rep, DbUUID, DocId),
+ case couch_replicator_jobs:try_update_rep_id(JTx, JobId, RepId) of
+ ok ->
+ owner;
+ {error, {replication_job_conflict, OtherJobId}} ->
+ case couch_replicator_jobs:get_job_data(JTx, OtherJobId) of
+ {ok, #{?STATE := S, ?DB_NAME := null}} when
+ S == ?ST_RUNNING; S == ?ST_PENDING ->
+ % Conflicting job is a transient job, not associated with a
+ % _replicator doc, so we let this job retry. This is also
+ % partly done for compatibility with pervious replicator
+ % behavior.
+ Error = <<"Duplicate job running: ", OtherJobId/binary>>,
+ reschedule_on_error(JTx, Job, JobData, Error),
+ not_owner;
+ {ok, #{?STATE := S, ?DB_NAME := <<_/binary>>}} when
+ S == ?ST_RUNNING; S == ?ST_PENDING ->
+ % Conflicting job is a permanent replication job, so this
+ % job is marked as failed.
+ Error = <<"Duplicate job running: ", OtherJobId/binary>>,
+ fail_job(JTx, Job, JobData, Error),
+ not_owner;
+ {ok, #{}} ->
+ LogMsg = "~p : Job ~p usurping job ~p for replication ~p",
+ couch_log:warning(LogMsg, [?MODULE, JobId, OtherJobId,
+ RepId]),
+ couch_replicator_jobs:update_rep_id(JTx, JobId, RepId),
+ owner;
+ {error, not_found} ->
+ LogMsg = "~p : Orphan replication job reference ~p -> ~p",
+ couch_log:error(LogMsg, [?MODULE, RepId, OtherJobId]),
+ couch_replicator_jobs:update_rep_id(JTx, JobId, RepId),
+ owner
+ end
+ end.
+
+
+update_job_data(Tx, #rep_state{} = State) ->
+ #rep_state{job = Job, job_data = JobData} = State,
+ {Job1, JobData1} = update_job_data(Tx, Job, JobData),
+ State#rep_state{job = Job1, job_data = JobData1}.
+
+
+update_job_data(Tx, Job, #{} = JobData) ->
+ case couch_replicator_jobs:update_job_data(Tx, Job, JobData) of
+ {ok, Job1} ->
+ {Job1, JobData};
+ {error, halt} ->
+ exit({shutdown, halt})
+ end.
+
+
+update_active_task_info(#rep_state{} = State) ->
+ #rep_state{
+ job_data = JobData,
+ user = User,
+ id = RepId,
+ db_name = DbName,
+ doc_id = DocId,
+ source_name = Source,
+ target_name = Target,
+ options = Options,
+ highest_seq_done = {_, SourceSeq},
+ checkpoint_interval = CheckpointInterval
+ } = State,
+
+ #{
+ ?REP := #{?START_TIME := StartTime},
+ ?REP_STATS := Stats,
+ ?REP_NODE := Node,
+ ?REP_PID := Pid,
+ ?LAST_UPDATED := LastUpdated
+ } = JobData,
+
+ Info = maps:merge(Stats, #{
+ <<"type">> => <<"replication">>,
+ <<"user">> => User,
+ <<"replication_id">> => RepId,
+ <<"database">> => DbName,
+ <<"doc_id">> => DocId,
+ <<"source">> => ?l2b(Source),
+ <<"target">> => ?l2b(Target),
+ <<"continuous">> => maps:get(<<"continuous">>, Options, false),
+ <<"source_seq">> => SourceSeq,
+ <<"checkpoint_interval">> => CheckpointInterval,
+ <<"node">> => Node,
+ <<"pid">> => Pid,
+ <<"updated_on">> => LastUpdated,
+ <<"started_on">> => StartTime
+ }),
+
+ JobData1 = fabric2_active_tasks:update_active_task_info(JobData, Info),
+ State#rep_state{job_data = JobData1}.
+
+
+% Transient jobs don't get rescheduled on error with the exception of
+% max_backoff errors.
+%
+reschedule_on_error(JTx, Job, #{?DB_NAME := null} = JobData, Error) when
+ Error =/= max_backoff ->
+ fail_job(JTx, Job, JobData, Error);
+
+reschedule_on_error(JTx, Job, #{} = JobData0, Error0) ->
+ Error = error_info(Error0),
+
+ Now = erlang:system_time(second),
+
+ JobData = maybe_heal(JobData0, Now),
+ #{?ERROR_COUNT := ErrorCount} = JobData,
+ JobData1 = JobData#{
+ ?STATE := ?ST_CRASHING,
+ ?STATE_INFO := Error,
+ ?ERROR_COUNT := ErrorCount + 1,
+ ?LAST_ERROR := Error,
+ ?REP_NODE := null,
+ ?REP_PID := null
+ },
+ JobData2 = hist_append(?HIST_CRASHED, Now, JobData1, Error),
+ JobData3 = hist_append(?HIST_PENDING, Now, JobData2, undefined),
+ JobData4 = fabric2_active_tasks:update_active_task_info(JobData3, #{}),
+
+ couch_stats:increment_counter([couch_replicator, jobs, crashes]),
+
+ Time = get_backoff_time(ErrorCount + 1),
+ case couch_replicator_jobs:reschedule_job(JTx, Job, JobData4, Time) of
+ ok -> ok;
+ {error, halt} -> exit({shutdown, halt})
+ end.
+
+
+reschedule(JTx, Job, #{} = JobData) ->
+ Now = erlang:system_time(second),
+
+ JobData1 = JobData#{
+ ?STATE := ?ST_PENDING,
+ ?STATE_INFO := null,
+ ?LAST_ERROR := null,
+ ?REP_NODE := null,
+ ?REP_PID := null
+ },
+ JobData2 = hist_append(?HIST_STOPPED, Now, JobData1, undefined),
+ JobData3 = hist_append(?HIST_PENDING, Now, JobData2, undefined),
+ JobData4 = fabric2_active_tasks:update_active_task_info(JobData3, #{}),
+
+ couch_stats:increment_counter([couch_replicator, jobs, stops]),
+
+ Time = Now + couch_replicator_job_server:scheduling_interval_sec(),
+ case couch_replicator_jobs:reschedule_job(JTx, Job, JobData4, Time) of
+ ok -> ok;
+ {error, halt} -> exit({shutdown, halt})
+ end.
+
+
+fail_job(JTx, Job, #{} = JobData, Error0) ->
+ Error = error_info(Error0),
+
+ Now = erlang:system_time(second),
+
+ #{
+ ?ERROR_COUNT := ErrorCount,
+ ?DB_NAME := DbName,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId
+ } = JobData,
+
+ JobData1 = JobData#{
+ ?STATE := ?ST_FAILED,
+ ?STATE_INFO := Error,
+ ?ERROR_COUNT := ErrorCount + 1,
+ ?REP_NODE := null,
+ ?REP_PID := null
+ },
+ JobData2 = hist_append(?HIST_CRASHED, Now, JobData1, Error),
+ JobData3 = fabric2_active_tasks:update_active_task_info(JobData2, #{}),
+
+ couch_stats:increment_counter([couch_replicator, jobs, crashes]),
+
+ case couch_replicator_jobs:finish_job(JTx, Job, JobData3) of
+ ok ->
+ couch_replicator_docs:update_failed(DbName, DbUUID, DocId, Error),
+ ok;
+ {error, halt} ->
+ exit({shutdown, halt})
+ end.
+
+
+complete_job(JTx, Job, #{} = JobData, CheckpointHistory) ->
+ #{
+ ?DB_NAME := Db,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId,
+ ?REP_STATS := RepStats,
+ ?REP := Rep
+ } = JobData,
+
+ Now = erlang:system_time(second),
+
+ #{?START_TIME := StartTime} = Rep,
+ JobData1 = JobData#{
+ ?STATE := ?ST_COMPLETED,
+ ?CHECKPOINT_HISTORY := CheckpointHistory,
+ ?STATE_INFO := RepStats,
+ ?REP_NODE := null,
+ ?REP_PID := null
+ },
+ JobData2 = hist_append(?HIST_STOPPED, Now, JobData1, undefined),
+ JobData3 = fabric2_active_tasks:update_active_task_info(JobData2, #{}),
+
+ couch_stats:increment_counter([couch_replicator, jobs, stops]),
+
+ case couch_replicator_jobs:finish_job(JTx, Job, JobData3) of
+ ok ->
+ StartISO8601 = couch_replicator_utils:iso8601(StartTime),
+ Stats = maps:merge(RepStats, #{<<"start_time">> => StartISO8601}),
+ couch_replicator_docs:update_completed(Db, DbUUID, DocId, Stats),
+ ok;
+ {error, halt} ->
+ exit({shutdown, halt})
+ end.
+
+
+error_info(Error0) ->
+ case Error0 of
+ <<_/binary>> ->
+ Error0;
+ undefined ->
+ undefined;
+ null ->
+ null;
+ Atom when is_atom(Atom) ->
+ atom_to_binary(Atom, utf8);
+ {shutdown, Atom} when is_atom(Atom) ->
+ atom_to_binary(Atom, utf8);
+ {shutdown, Err} ->
+ couch_replicator_utils:rep_error_to_binary(Err);
+ {error, Atom} when is_atom(Atom) ->
+ atom_to_binary(Atom, utf8);
+ {error, {Err, Reason}} when is_atom(Err) ->
+ ReasonBin = couch_replicator_utils:rep_error_to_binary(Reason),
+ #{
+ <<"error">> => atom_to_binary(Err, utf8),
+ <<"reason">> => ReasonBin
+ };
+ _Other ->
+ couch_replicator_utils:rep_error_to_binary(Error0)
+ end.
+
+
+get_rep_id(JTx, Job, #{} = JobData) ->
+ #{?REP := Rep} = JobData,
+ try
+ couch_replicator_ids:replication_id(Rep)
+ catch
+ throw:{filter_fetch_error, Error} ->
+ Error1 = io_lib:format("Filter fetch error ~p", [Error]),
+ reschedule_on_error(JTx, Job, JobData, Error1),
+ exit({shutdown, finished})
+ end.
+
+
+% After job run continuously for some time we consider it "healed" and reset
+% its consecutive error count.
+maybe_heal(#{} = JobData, Now) ->
+ #{?LAST_START := LastStart} = JobData,
+ case Now - LastStart > health_threshold() of
+ true -> JobData#{?ERROR_COUNT := 0, ?LAST_ERROR := null};
+ false -> JobData
+ end.
+
+
+get_backoff_time(ErrCnt) ->
+ Max = min(max_backoff_penalty_sec(), 3600 * 24 * 30),
+ Min = max(min_backoff_penalty_sec(), 2),
+
+ % Calculate the max exponent so exponentiation doesn't blow up
+ MaxExp = math:log2(Max) - math:log2(Min),
+
+ % This is the recommended backoff amount
+ Wait = Min * math:pow(2, min(ErrCnt, MaxExp)),
+
+ % Apply a 25% jitter to avoid a thundering herd effect
+ WaitJittered = Wait * 0.75 + rand:uniform(trunc(Wait * 0.25) + 1),
+ erlang:system_time(second) + trunc(WaitJittered).
+
+
+headers_strip_creds([], Acc) ->
+ lists:reverse(Acc);
+
+headers_strip_creds([{Key, Value0} | Rest], Acc) ->
+ Value = case string:to_lower(Key) of
+ "authorization" -> "****";
+ _ -> Value0
+ end,
+ headers_strip_creds(Rest, [{Key, Value} | Acc]).
+
+
+httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
+ HttpDb#httpdb{
+ url = couch_util:url_strip_password(Url),
+ headers = headers_strip_creds(Headers, [])
+ };
+
+httpdb_strip_creds(LocalDb) ->
+ LocalDb.
+
+
+state_strip_creds(#rep_state{source = Source, target = Target} = State) ->
+ State#rep_state{
+ source = httpdb_strip_creds(Source),
+ target = httpdb_strip_creds(Target)
+ }.
+
+
+adjust_maxconn(Src = #{<<"http_connections">> := 1}, RepId) ->
+ Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
+ couch_log:notice(Msg, [RepId]),
+ Src#{<<"http_connections">> := 2};
+
+adjust_maxconn(Src, _RepId) ->
+ Src.
+
+
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+ highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
+ {stop, normal, cancel_timers(State)};
+
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+ highest_seq_done = Seq} = State) ->
+ State1 = State#rep_state{current_through_seq = Seq},
+ State2 = cancel_timers(State1),
+ case do_checkpoint(State2) of
+ {ok, State3} ->
+ couch_stats:increment_counter([couch_replicator, checkpoints,
+ success]),
+ {stop, normal, State3};
+ Error ->
+ couch_stats:increment_counter([couch_replicator, checkpoints,
+ failure]),
+ {stop, Error, State2}
+ end.
+
+
+start_checkpoint_timer(#rep_state{} = State) ->
+ CheckpointAfterMSec = State#rep_state.checkpoint_interval,
+ JobTimeoutMSec = couch_replicator_jobs:get_timeout() * 1000,
+ Wait1 = min(CheckpointAfterMSec, JobTimeoutMSec div 2),
+ Wait2 = trunc(Wait1 * 0.75) + rand:uniform(trunc(Wait1 * 0.25)),
+ TRef = erlang:send_after(Wait2, self(), checkpoint),
+ State#rep_state{checkpoint_timer = TRef}.
+
+
+cancel_checkpoint_timer(#rep_state{checkpoint_timer = nil} = State) ->
+ State;
+cancel_checkpoint_timer(#rep_state{checkpoint_timer = Timer} = State) ->
+ erlang:cancel_timer(Timer),
+ State#rep_state{checkpoint_timer = nil}.
+
+
+start_stats_timer(#rep_state{} = State) ->
+ MSec = stats_update_interval_sec() * 1000,
+ TRef = erlang:send_after(MSec, self(), stats_update),
+ State#rep_state{stats_timer = TRef}.
+
+
+cancel_stats_timer(#rep_state{stats_timer = nil} = State) ->
+ State;
+cancel_stats_timer(#rep_state{stats_timer = Timer} = State) ->
+ erlang:cancel_timer(Timer),
+ receive stats_update -> ok after 0 -> ok end,
+ State#rep_state{stats_timer = nil}.
+
+
+cancel_timers(#rep_state{} = State) ->
+ State1 = cancel_checkpoint_timer(State),
+ cancel_stats_timer(State1).
+
+
+init_state(#{} = Job, #{} = JobData) ->
+ #{
+ ?REP := Rep,
+ ?REP_ID := Id,
+ ?BASE_ID := BaseId,
+ ?DB_NAME := DbName,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId,
+ ?LAST_ERROR := LastError
+ } = JobData,
+ #{
+ ?SOURCE := Src0,
+ ?TARGET := Tgt,
+ ?START_TIME := StartTime,
+ ?OPTIONS := Options0
+ } = Rep,
+
+ % Optimize replication parameters if last time the jobs crashed because it
+ % was rate limited
+ Options = optimize_rate_limited_job(Options0, LastError),
+
+ % Adjust minimum number of http source connections to 2 to avoid deadlock
+ Src = adjust_maxconn(Src0, BaseId),
+ {ok, Source} = couch_replicator_api_wrap:db_open(Src),
+ CreateTgt = maps:get(<<"create_target">>, Options, false),
+ TParams = maps:get(<<"create_target_params">>, Options, #{}),
+
+ {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, CreateTgt, TParams),
+
+ {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
+ {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
+
+ [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep,
+ BaseId),
+
+ {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+
+ #{?REP_STATS := Stats0} = JobData,
+ Stats1 = couch_replicator_stats:new(Stats0),
+ HistoryStats = case History of
+ [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
+ _ -> couch_replicator_stats:new()
+ end,
+ Stats2 = couch_replicator_stats:max_stats(Stats1, HistoryStats),
+
+ StartSeq1 = maps:get(<<"since_seq">>, Options, StartSeq0),
+ StartSeq = {0, StartSeq1},
+
+ SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
+
+ #doc{body={CheckpointHistory}} = SourceLog,
+
+ State = #rep_state{
+ job = Job,
+ job_data = JobData,
+ id = Id,
+ base_id = BaseId,
+ source_name = couch_replicator_api_wrap:db_uri(Source),
+ target_name = couch_replicator_api_wrap:db_uri(Target),
+ source = Source,
+ target = Target,
+ options = Options,
+ history = History,
+ checkpoint_history = {[{<<"no_changes">>, true} | CheckpointHistory]},
+ start_seq = StartSeq,
+ current_through_seq = StartSeq,
+ committed_seq = StartSeq,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = StartTime,
+ src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
+ tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
+ session_id = couch_uuids:random(),
+ source_seq = SourceSeq,
+ use_checkpoints = maps:get(<<"use_checkpoints">>, Options),
+ checkpoint_interval = maps:get(<<"checkpoint_interval">>, Options),
+ stats = Stats2,
+ stats_timer = nil,
+ doc_id = DocId,
+ db_name = DbName,
+ db_uuid = DbUUID
+ },
+ start_checkpoint_timer(State).
+
+
+find_and_migrate_logs(DbList, #{} = Rep, BaseId) when is_binary(BaseId) ->
+ LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
+ fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
+
+
+fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
+ lists:reverse(Acc);
+
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, #{} = Rep, Acc) ->
+ case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
+ {error, <<"not_found">>} when Vsn > 1 ->
+ OldRepId = couch_replicator_ids:base_id(Rep, Vsn - 1),
+ fold_replication_logs(Dbs, Vsn - 1,
+ ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
+ {error, <<"not_found">>} ->
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, Rep,
+ [#doc{id = NewId} | Acc]);
+ {ok, Doc} when LogId =:= NewId ->
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
+ {ok, Doc} ->
+ MigratedLog = #doc{id = NewId, body = Doc#doc.body},
+ maybe_save_migrated_log(Rep, Db, MigratedLog, Doc#doc.id),
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, Rep,
+ [MigratedLog | Acc])
+ end.
+
+
+maybe_save_migrated_log(#{?OPTIONS := Options}, Db, #doc{} = Doc, OldId) ->
+ case maps:get(<<"use_checkpoints">>, Options) of
+ true ->
+ update_checkpoint(Db, Doc),
+ Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p",
+ couch_log:notice(Msg, [httpdb_strip_creds(Db), OldId, Doc#doc.id]);
+ false ->
+ ok
+ end.
+
+
+spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
+ spawn_link(fun() ->
+ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
+ end).
+
+
+changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
+ receive
+ {get_changes, From} ->
+ case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
+ closed ->
+ From ! {closed, self()};
+ {ok, ChangesOrLastSeqs} ->
+ ReportSeq = case lists:last(ChangesOrLastSeqs) of
+ {last_seq, Seq} -> {Ts, Seq};
+ #doc_info{high_seq = Seq} -> {Ts, Seq}
+ end,
+ Changes = lists:filter(fun
+ (#doc_info{}) -> true;
+ ({last_seq, _Seq}) -> false
+ end, ChangesOrLastSeqs),
+ ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
+ From ! {changes, self(), Changes, ReportSeq}
+ end,
+ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
+ end.
+
+
+do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
+ NewState = State#rep_state{
+ checkpoint_history = {[{<<"use_checkpoints">>, false}]}
+ },
+ {ok, update_job_state(NewState)};
+do_checkpoint(#rep_state{current_through_seq=S, committed_seq=S} = State) ->
+ {ok, update_job_state(State)};
+do_checkpoint(State) ->
+ #rep_state{
+ source_name=SourceName,
+ target_name=TargetName,
+ source = Source,
+ target = Target,
+ history = OldHistory,
+ start_seq = {_, StartSeq},
+ current_through_seq = {_Ts, NewSeq} = NewTsSeq,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = RepStartTime,
+ src_starttime = SrcInstanceStartTime,
+ tgt_starttime = TgtInstanceStartTime,
+ stats = Stats,
+ options = Options,
+ session_id = SessionId
+ } = State,
+ case commit_to_both(Source, Target) of
+ {source_error, Reason} ->
+ {checkpoint_commit_failure, <<"Failure on source commit: ",
+ (couch_util:to_binary(Reason))/binary>>};
+ {target_error, Reason} ->
+ {checkpoint_commit_failure, <<"Failure on target commit: ",
+ (couch_util:to_binary(Reason))/binary>>};
+ {SrcInstanceStartTime, TgtInstanceStartTime} ->
+ couch_log:notice("recording a checkpoint for `~s` -> `~s` at "
+ "source update_seq ~p", [SourceName, TargetName, NewSeq]),
+ StartTime = couch_replicator_utils:rfc1123_local(RepStartTime),
+ EndTime = couch_replicator_utils:rfc1123_local(),
+ NewHistoryEntry = {[
+ {<<"session_id">>, SessionId},
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"start_last_seq">>, StartSeq},
+ {<<"end_last_seq">>, NewSeq},
+ {<<"recorded_seq">>, NewSeq},
+ {<<"missing_checked">>,
+ couch_replicator_stats:missing_checked(Stats)},
+ {<<"missing_found">>,
+ couch_replicator_stats:missing_found(Stats)},
+ {<<"docs_read">>,
+ couch_replicator_stats:docs_read(Stats)},
+ {<<"docs_written">>,
+ couch_replicator_stats:docs_written(Stats)},
+ {<<"doc_write_failures">>,
+ couch_replicator_stats:doc_write_failures(Stats)}
+ ]},
+ BaseHistory = [
+ {<<"session_id">>, SessionId},
+ {<<"source_last_seq">>, NewSeq},
+ {<<"replication_id_version">>, ?REP_ID_VERSION}
+ ] ++ case maps:get(<<"doc_ids">>, Options, undefined) of
+ undefined ->
+ [];
+ _DocIds ->
+ % backwards compatibility with the result of a replication
+ % by doc IDs in versions 0.11.x and 1.0.x TODO: deprecate
+ % (use same history format, simplify code)
+ [
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"docs_read">>,
+ couch_replicator_stats:docs_read(Stats)},
+ {<<"docs_written">>,
+ couch_replicator_stats:docs_written(Stats)},
+ {<<"doc_write_failures">>,
+ couch_replicator_stats:doc_write_failures(Stats)}
+ ]
+ end,
+ % limit history to 50 entries
+ NewRepHistory = {
+ BaseHistory ++ [{<<"history">>,
+ lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+ },
+
+ try
+ {SrcRevPos, SrcRevId} = update_checkpoint(Source,
+ SourceLog#doc{body = NewRepHistory}, source),
+ {TgtRevPos, TgtRevId} = update_checkpoint(Target,
+ TargetLog#doc{body = NewRepHistory}, target),
+ NewState = State#rep_state{
+ checkpoint_history = NewRepHistory,
+ committed_seq = NewTsSeq,
+ source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+ target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+ },
+ {ok, update_job_state(NewState)}
+ catch throw:{checkpoint_commit_failure, _} = Failure ->
+ Failure
+ end;
+ {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Target database out of sync. "
+ "Try to increase max_dbs_open at the target's server.">>};
+ {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Source database out of sync. "
+ "Try to increase max_dbs_open at the source's server.">>};
+ {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Source and target databases out of "
+ "sync. Try to increase max_dbs_open at both servers.">>}
+ end.
+
+
+update_checkpoint(Db, Doc, DbType) ->
+ try
+ update_checkpoint(Db, Doc)
+ catch throw:{checkpoint_commit_failure, Reason} ->
+ throw({checkpoint_commit_failure, <<"Error updating the ",
+ (couch_util:to_binary(DbType))/binary, " checkpoint document: ",
+ (couch_util:to_binary(Reason))/binary>>})
+ end.
+
+
+update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
+ try
+ case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
+ {ok, PosRevId} -> PosRevId;
+ {error, Reason} -> throw({checkpoint_commit_failure, Reason})
+ end
+ catch throw:conflict ->
+ Opts = [ejson_body],
+ case (catch couch_replicator_api_wrap:open_doc(Db, LogId, Opts)) of
+ {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
+ % This means that we were able to update successfully the
+ % checkpoint doc in a previous attempt but we got a connection
+ % error (timeout for e.g.) before receiving the success
+ % response. Therefore the request was retried and we got a
+ % conflict, as the revision we sent is not the current one. We
+ % confirm this by verifying the doc body we just got is the
+ % same that we have just sent.
+ {Pos, RevId};
+ _ ->
+ throw({checkpoint_commit_failure, conflict})
+ end
+ end.
+
+
+commit_to_both(Source, Target) ->
+ % commit the src async
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(fun() ->
+ Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
+ ParentPid ! {self(), Result}
+ end),
+
+ % commit tgt sync
+ TgtResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
+
+ SrcResult = receive
+ {SrcCommitPid, Result} ->
+ unlink(SrcCommitPid),
+ receive
+ {'EXIT', SrcCommitPid, _} ->
+ ok
+ after
+ 0 -> ok
+ end,
+ Result;
+ {'EXIT', SrcCommitPid, Reason} ->
+ {error, Reason}
+ end,
+ case TgtResult of
+ {ok, TargetStartTime} ->
+ case SrcResult of
+ {ok, SourceStartTime} ->
+ {SourceStartTime, TargetStartTime};
+ SourceError ->
+ {source_error, SourceError}
+ end;
+ TargetError ->
+ {target_error, TargetError}
+ end.
+
+
+compare_replication_logs(SrcDoc, TgtDoc) ->
+ #doc{body={RepRecProps}} = SrcDoc,
+ #doc{body={RepRecPropsTgt}} = TgtDoc,
+ SrcSession = get_value(<<"session_id">>, RepRecProps),
+ TgtSession = get_value(<<"session_id">>, RepRecPropsTgt),
+ case SrcSession == TgtSession of
+ true ->
+ % if the records have the same session id,
+ % then we have a valid replication history
+ OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps,
+ ?LOWEST_SEQ),
+ OldHistory = get_value(<<"history">>, RepRecProps, []),
+ {OldSeqNum, OldHistory};
+ false ->
+ SourceHistory = get_value(<<"history">>, RepRecProps, []),
+ TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
+ couch_log:notice("Replication records differ. "
+ "Scanning histories to find a common ancestor.", []),
+ couch_log:debug("Record on source:~p~nRecord on target:~p~n",
+ [RepRecProps, RepRecPropsTgt]),
+ compare_rep_history(SourceHistory, TargetHistory)
+ end.
+
+
+compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
+ couch_log:notice("no common ancestry -- performing full replication", []),
+ {?LOWEST_SEQ, []};
+
+compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
+ SourceId = get_value(<<"session_id">>, S),
+ case has_session_id(SourceId, Target) of
+ true ->
+ RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
+ couch_log:notice("found a common replication record with "
+ "source_seq ~p", [RecordSeqNum]),
+ {RecordSeqNum, SourceRest};
+ false ->
+ TargetId = get_value(<<"session_id">>, T),
+ case has_session_id(TargetId, SourceRest) of
+ true ->
+ RecordSeqNum = get_value(<<"recorded_seq">>, T,
+ ?LOWEST_SEQ),
+ couch_log:notice("found a common replication record with "
+ "source_seq ~p", [RecordSeqNum]),
+ {RecordSeqNum, TargetRest};
+ false ->
+ compare_rep_history(SourceRest, TargetRest)
+ end
+ end.
+
+
+has_session_id(_SessionId, []) ->
+ false;
+
+has_session_id(SessionId, [{Props} | Rest]) ->
+ case get_value(<<"session_id">>, Props, nil) of
+ SessionId -> true;
+ _Else -> has_session_id(SessionId, Rest)
+ end.
+
+
+get_pending_count(#rep_state{} = St) ->
+ #rep_state{
+ highest_seq_done = HighestSeqDone,
+ source = #httpdb{} = Db0
+ } = St,
+ {_, Seq} = HighestSeqDone,
+ Db = Db0#httpdb{retries = 3},
+ case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
+ {ok, Pending} ->
+ Pending;
+ _ ->
+ null
+ end.
+
+
+maybe_update_job_state(#rep_state{} = State) ->
+ case State#rep_state.stats_timer of
+ nil -> start_stats_timer(State);
+ Ref when is_reference(Ref) -> State
+ end.
+
+
+update_job_state(#rep_state{} = State0) ->
+ State = cancel_stats_timer(State0),
+ #rep_state{
+ current_through_seq = {_, ThroughSeq},
+ highest_seq_done = {_, HighestSeq},
+ committed_seq = {_, CommittedSeq},
+ stats = Stats,
+ job_data = JobData
+ } = State,
+
+ Now = erlang:system_time(second),
+
+ RevisionsChecked = couch_replicator_stats:missing_checked(Stats),
+ MissingRevisions = couch_replicator_stats:missing_found(Stats),
+ DocsRead = couch_replicator_stats:docs_read(Stats),
+ DocsWritten = couch_replicator_stats:docs_written(Stats),
+ DocWriteFailures = couch_replicator_stats:doc_write_failures(Stats),
+ PendingCount = get_pending_count(State),
+
+ StatsMap = #{
+ <<"checkpointed_source_seq">> => CommittedSeq,
+ <<"source_seq">> => HighestSeq,
+ <<"through_seq">> => ThroughSeq,
+ <<"revisions_checked">> => RevisionsChecked,
+ <<"missing_revisions_found">> => MissingRevisions,
+ <<"docs_read">> => DocsRead,
+ <<"docs_written">> => DocsWritten,
+ <<"doc_write_failures">> => DocWriteFailures,
+ <<"changes_pending">> => PendingCount
+ },
+
+ JobData1 = JobData#{
+ ?REP_STATS := StatsMap,
+ ?LAST_UPDATED := Now
+ },
+
+ JobData2 = maybe_heal(JobData1, Now),
+
+ State1 = State#rep_state{job_data = JobData2},
+ State2 = update_active_task_info(State1),
+ update_job_data(undefined, State2).
+
+
+replication_start_error({unauthorized, DbUri}) ->
+ {unauthorized, <<"unauthorized to access or create database ",
+ DbUri/binary>>};
+
+replication_start_error({db_not_found, DbUri}) ->
+ {db_not_found, <<"could not open ", DbUri/binary>>};
+
+replication_start_error({http_request_failed, _Method, Url0,
+ {error, {error, {conn_failed, {error, nxdomain}}}}}) ->
+ Url = ?l2b(couch_util:url_strip_password(Url0)),
+ {nxdomain, <<"could not resolve ", Url/binary>>};
+
+replication_start_error({http_request_failed, Method0, Url0,
+ {error, {code, Code}}}) when is_integer(Code) ->
+ Url = ?l2b(couch_util:url_strip_password(Url0)),
+ Method = ?l2b(Method0),
+ CodeBin = integer_to_binary(Code),
+ {http_error_code, <<CodeBin/binary, " ", Method/binary, " ", Url/binary>>};
+
+replication_start_error(Error) ->
+ Error.
+
+
+log_replication_start(#rep_state{} = RepState) ->
+ #rep_state{
+ id = Id,
+ doc_id = DocId,
+ db_name = DbName,
+ options = Options,
+ source_name = Source,
+ target_name = Target,
+ session_id = Sid
+ } = RepState,
+ Workers = maps:get(<<"worker_processes">>, Options),
+ BatchSize = maps:get(<<"worker_batch_size">>, Options),
+ From = case DbName of
+ Name when is_binary(Name) ->
+ io_lib:format("from doc ~s:~s", [Name, DocId]);
+ _ ->
+ "from _replicate endpoint"
+ end,
+ Msg = "Starting replication ~s (~s -> ~s) ~s worker_procesess:~p"
+ " worker_batch_size:~p session_id:~s",
+ couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]).
+
+
+check_user_filter(#rep_state{} = State) ->
+ #rep_state{
+ id = RepId,
+ base_id = BaseId,
+ job = Job,
+ job_data = JobData
+ } = State,
+ case get_rep_id(undefined, Job, JobData) of
+ {RepId, BaseId} ->
+ ok;
+ {NewId, NewBaseId} when is_binary(NewId), is_binary(NewBaseId) ->
+ LogMsg = "~p : Replication id was updated ~p -> ~p",
+ couch_log:error(LogMsg, [?MODULE, RepId, NewId]),
+ reschedule(undefined, Job, JobData),
+ exit({shutdown, finished})
+ end.
+
+
+hist_append(Type, Now, #{} = JobData, Info) when is_integer(Now),
+ is_binary(Type) ->
+ #{?JOB_HISTORY := Hist} = JobData,
+ Evt1 = #{?HIST_TYPE => Type, ?HIST_TIMESTAMP => Now},
+ Evt2 = case Info of
+ undefined ->
+ Evt1;
+ null ->
+ Evt1#{?HIST_REASON => null};
+ <<_/binary>> ->
+ Evt1#{?HIST_REASON => Info};
+ #{<<"error">> := Err, <<"reason">> := Reason} when is_binary(Err),
+ is_binary(Reason) ->
+ Evt1#{?HIST_REASON => Reason}
+ end,
+ Hist1 = [Evt2 | Hist],
+ Hist2 = lists:sublist(Hist1, max_history()),
+ JobData#{?JOB_HISTORY := Hist2}.
+
+
+optimize_rate_limited_job(#{} = Options, <<"max_backoff">>) ->
+ OptimizedSettings = #{
+ <<"checkpoint_interval">> => 5000,
+ <<"worker_processes">> => 2,
+ <<"worker_batch_size">> => 100,
+ <<"http_connections">> => 2
+ },
+ maps:merge(Options, OptimizedSettings);
+
+optimize_rate_limited_job(#{} = Options, _Other) ->
+ Options.
+
+
+close_endpoints(State) ->
+ State1 = cancel_timers(State),
+ couch_replicator_api_wrap:db_close(State1#rep_state.source),
+ couch_replicator_api_wrap:db_close(State1#rep_state.target),
+ ok.
+
+
+get_value(K, Props) ->
+ couch_util:get_value(K, Props).
+
+
+get_value(K, Props, Default) ->
+ couch_util:get_value(K, Props, Default).
+
+
+accept_jitter_msec() ->
+ couch_rand:uniform(erlang:max(1, max_startup_jitter_msec())).
+
+
+max_startup_jitter_msec() ->
+ config:get_integer("replicator", "startup_jitter",
+ ?STARTUP_JITTER_DEFAULT).
+
+
+min_backoff_penalty_sec() ->
+ config:get_integer("replicator", "min_backoff_penalty_sec",
+ ?DEFAULT_MIN_BACKOFF_PENALTY_SEC).
+
+
+max_backoff_penalty_sec() ->
+ config:get_integer("replicator", "max_backoff_penalty_sec",
+ ?DEFAULT_MAX_BACKOFF_PENALTY_SEC).
+
+
+max_history() ->
+ config:get_integer("replicator", "max_history", ?DEFAULT_MAX_HISTORY).
+
+
+stats_update_interval_sec() ->
+ config:get_integer("replicator", "stats_update_interval_sec",
+ ?DEFAULT_STATS_UPDATE_INTERVAL_SEC).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+replication_start_error_test() ->
+ ?assertEqual({unauthorized, <<"unauthorized to access or create database"
+ " http://x/y">>}, replication_start_error({unauthorized,
+ <<"http://x/y">>})),
+ ?assertEqual({db_not_found, <<"could not open http://x/y">>},
+ replication_start_error({db_not_found, <<"http://x/y">>})),
+ ?assertEqual({nxdomain, <<"could not resolve http://x/y">>},
+ replication_start_error({http_request_failed, "GET", "http://x/y",
+ {error, {error, {conn_failed, {error, nxdomain}}}}})),
+ ?assertEqual({http_error_code, <<"503 GET http://x/y">>},
+ replication_start_error({http_request_failed, "GET", "http://x/y",
+ {error, {code, 503}}})).
+
+
+scheduler_job_format_status_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(t_format_status)
+ ]
+ }.
+
+
+setup() ->
+ meck:expect(config, get, fun(_, _, Default) -> Default end).
+
+
+teardown(_) ->
+ meck:unload().
+
+
+t_format_status(_) ->
+ {ok, Rep} = couch_replicator_parse:parse_rep(#{
+ <<"source">> => <<"http://u:p@h1/d1">>,
+ <<"target">> => <<"http://u:p@h2/d2">>,
+ <<"create_target">> => true
+ }, null),
+ State = #rep_state{
+ id = <<"base+ext">>,
+ job_data = #{?REP => Rep},
+ doc_id = <<"mydoc">>,
+ db_name = <<"mydb">>,
+ source = maps:get(?SOURCE, Rep),
+ target = maps:get(?TARGET, Rep),
+ options = maps:get(?OPTIONS, Rep),
+ session_id = <<"a">>,
+ start_seq = <<"1">>,
+ source_seq = <<"2">>,
+ committed_seq = <<"3">>,
+ current_through_seq = <<"4">>,
+ highest_seq_done = <<"5">>
+ },
+ Format = format_status(opts_ignored, [pdict, State]),
+ FmtOptions = proplists:get_value(options, Format),
+ ?assertEqual("http://u:*****@h1/d1/", proplists:get_value(source, Format)),
+ ?assertEqual("http://u:*****@h2/d2/", proplists:get_value(target, Format)),
+ ?assertEqual(<<"base+ext">>, proplists:get_value(rep_id, Format)),
+ ?assertEqual(true, maps:get(<<"create_target">>, FmtOptions)),
+ ?assertEqual(<<"mydoc">>, proplists:get_value(doc_id, Format)),
+ ?assertEqual(<<"mydb">>, proplists:get_value(db_name, Format)),
+ ?assertEqual(<<"a">>, proplists:get_value(session_id, Format)),
+ ?assertEqual(<<"1">>, proplists:get_value(start_seq, Format)),
+ ?assertEqual(<<"2">>, proplists:get_value(source_seq, Format)),
+ ?assertEqual(<<"3">>, proplists:get_value(committed_seq, Format)),
+ ?assertEqual(<<"4">>, proplists:get_value(current_through_seq, Format)),
+ ?assertEqual(<<"5">>, proplists:get_value(highest_seq_done, Format)).
+
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_job_server.erl b/src/couch_replicator/src/couch_replicator_job_server.erl
new file mode 100644
index 000000000..a2e90b061
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_job_server.erl
@@ -0,0 +1,370 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_job_server).
+
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ format_status/2,
+ code_change/3
+]).
+
+-export([
+ accepted/2,
+ scheduling_interval_sec/0,
+ reschedule/0
+]).
+
+
+-include("couch_replicator.hrl").
+
+
+-define(MAX_ACCEPTORS, 2).
+-define(MAX_JOBS, 500).
+-define(MAX_CHURN, 100).
+-define(INTERVAL_SEC, 15).
+-define(MIN_RUN_TIME_SEC, 60).
+-define(TRANSIENT_JOB_MAX_AGE_SEC, 86400). % 1 day
+
+
+start_link(Timeout) when is_integer(Timeout) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, Timeout, []).
+
+
+init(Timeout) when is_integer(Timeout) ->
+ process_flag(trap_exit, true),
+ couch_replicator_jobs:set_timeout(),
+ St = #{
+ acceptors => #{},
+ workers => #{},
+ churn => 0,
+ config => get_config(),
+ timer => undefined,
+ timeout => Timeout
+ },
+ St1 = spawn_acceptors(St),
+ St2 = do_send_after(St1),
+ {ok, St2}.
+
+
+terminate(_, #{} = St) ->
+ #{
+ workers := Workers,
+ timeout := Timeout
+ } = St,
+ [stop_job(Pid) || Pid <- maps:keys(Workers)],
+ % Give jobs a chance to checkpoint and release their locks
+ wait_jobs_exit(Workers, Timeout),
+ ok.
+
+
+handle_call({accepted, Pid, Normal}, _From, #{} = St) ->
+ #{
+ acceptors := Acceptors,
+ workers := Workers,
+ churn := Churn
+ } = St,
+ case maps:is_key(Pid, Acceptors) of
+ true ->
+ Val = {Normal, erlang:system_time(second)},
+ St1 = St#{
+ acceptors := maps:remove(Pid, Acceptors),
+ workers := Workers#{Pid => Val},
+ churn := Churn + 1
+ },
+ {reply, ok, spawn_acceptors(St1)};
+ false ->
+ LogMsg = "~p : unknown acceptor processs ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid]),
+ {stop, {unknown_acceptor_pid, Pid}, St}
+ end;
+
+handle_call(reschedule, _From, St) ->
+ {reply, ok, reschedule(St)};
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(reschedule, #{} = St) ->
+ {noreply, reschedule(St)};
+
+handle_info({'EXIT', Pid, Reason}, #{} = St) ->
+ #{
+ acceptors := Acceptors,
+ workers := Workers
+ } = St,
+ case {maps:is_key(Pid, Acceptors), maps:is_key(Pid, Workers)} of
+ {true, false} -> handle_acceptor_exit(St, Pid, Reason);
+ {false, true} -> handle_worker_exit(St, Pid, Reason);
+ {false, false} -> handle_unknown_exit(St, Pid, Reason)
+ end;
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+format_status(_Opt, [_PDict, #{} = St]) ->
+ #{
+ acceptors := Acceptors,
+ workers := Workers,
+ churn := Churn,
+ config := Config
+ } = St,
+ [
+ {acceptors, map_size(Acceptors)},
+ {workers, map_size(Workers)},
+ {churn, Churn},
+ {config, Config}
+ ].
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+accepted(Worker, Normal) when is_pid(Worker), is_boolean(Normal) ->
+ gen_server:call(?MODULE, {accepted, Worker, Normal}, infinity).
+
+
+scheduling_interval_sec() ->
+ config:get_integer("replicator", "interval_sec", ?INTERVAL_SEC).
+
+
+reschedule() ->
+ gen_server:call(?MODULE, reschedule, infinity).
+
+
+% Scheduling logic
+
+do_send_after(#{} = St) ->
+ #{config := #{interval_sec := IntervalSec}} = St,
+ IntervalMSec = IntervalSec * 1000,
+ Jitter = IntervalMSec div 3,
+ WaitMSec = IntervalMSec + rand:uniform(max(1, Jitter)),
+ TRef = erlang:send_after(WaitMSec, self(), reschedule),
+ St#{timer := TRef}.
+
+
+cancel_timer(#{timer := undefined} = St) ->
+ St;
+
+cancel_timer(#{timer := TRef} = St) when is_reference(TRef) ->
+ erlang:cancel_timer(TRef),
+ St#{timer := undefined}.
+
+
+reschedule(#{} = St) ->
+ St1 = cancel_timer(St),
+ St2 = St1#{config := get_config()},
+ St3 = trim_jobs(St2),
+ St4 = start_excess_acceptors(St3),
+ St5 = transient_job_cleanup(St4),
+ St6 = update_stats(St5),
+ St7 = do_send_after(St6),
+ St7#{churn := 0}.
+
+
+start_excess_acceptors(#{} = St) ->
+ #{
+ churn := Churn,
+ acceptors := Acceptors,
+ workers := Workers,
+ config := #{max_jobs := MaxJobs, max_churn := MaxChurn}
+ } = St,
+
+ ACnt = maps:size(Acceptors),
+ WCnt = maps:size(Workers),
+
+ ChurnLeft = MaxChurn - Churn,
+ Slots = (MaxJobs + MaxChurn) - (ACnt + WCnt),
+ MinSlotsChurn = min(Slots, ChurnLeft),
+
+ Pending = if MinSlotsChurn =< 0 -> 0; true ->
+ % Don't fetch pending if we don't have enough slots or churn budget
+ couch_replicator_jobs:pending_count(undefined, MinSlotsChurn)
+ end,
+
+ couch_stats:update_gauge([couch_replicator, jobs, pending], Pending),
+
+ % Start new acceptors only if we have churn budget, there are pending jobs
+ % and we won't start more than max jobs + churn total acceptors
+ ToStart = max(0, lists:min([ChurnLeft, Pending, Slots])),
+
+ lists:foldl(fun(_, #{} = StAcc) ->
+ #{acceptors := AccAcceptors} = StAcc,
+ {ok, Pid} = couch_replicator_job:start_link(),
+ StAcc#{acceptors := AccAcceptors#{Pid => true}}
+ end, St, lists:seq(1, ToStart)).
+
+
+transient_job_cleanup(#{} = St) ->
+ #{
+ config := #{transient_job_max_age_sec := MaxAge}
+ } = St,
+ Now = erlang:system_time(second),
+ FoldFun = fun(_JTx, JobId, State, #{} = Data, ok) ->
+ IsTransient = maps:get(?DB_NAME, Data) =:= null,
+ IsOld = Now - maps:get(?LAST_UPDATED, Data) >= MaxAge,
+ case State =:= finished andalso IsTransient andalso IsOld of
+ true ->
+ ok = couch_replicator_jobs:remove_job(undefined, JobId),
+ couch_log:info("~p : Removed old job ~p", [?MODULE, JobId]),
+ ok;
+ false ->
+ ok
+ end
+ end,
+ ok = couch_replicator_jobs:fold_jobs(undefined, FoldFun, ok),
+ St.
+
+
+update_stats(#{} = St) ->
+ ACnt = maps:size(maps:get(acceptors, St)),
+ WCnt = maps:size(maps:get(workers, St)),
+ couch_stats:update_gauge([couch_replicator, jobs, accepting], ACnt),
+ couch_stats:update_gauge([couch_replicator, jobs, running], WCnt),
+ couch_stats:increment_counter([couch_replicator, jobs, reschedules]),
+ St.
+
+
+trim_jobs(#{} = St) ->
+ #{
+ workers := Workers,
+ churn := Churn,
+ config := #{max_jobs := MaxJobs}
+ } = St,
+ Excess = max(0, maps:size(Workers) - MaxJobs),
+ lists:foreach(fun stop_job/1, stop_candidates(St, Excess)),
+ St#{churn := Churn + Excess}.
+
+
+stop_candidates(#{}, Top) when is_integer(Top), Top =< 0 ->
+ [];
+
+stop_candidates(#{} = St, Top) when is_integer(Top), Top > 0 ->
+ #{
+ workers := Workers,
+ config := #{min_run_time_sec := MinRunTime}
+ } = St,
+
+ WList1 = maps:to_list(Workers), % [{Pid, {Normal, StartTime}},...]
+
+ % Filter out normal jobs and those which have just started running
+ MaxT = erlang:system_time(second) - MinRunTime,
+ WList2 = lists:filter(fun({_Pid, {Normal, T}}) ->
+ not Normal andalso T =< MaxT
+ end, WList1),
+
+ Sorted = lists:keysort(2, WList2),
+ Pids = lists:map(fun({Pid, _}) -> Pid end, Sorted),
+ lists:sublist(Pids, Top).
+
+
+stop_job(Pid) when is_pid(Pid) ->
+ % Replication jobs handle the shutdown signal and then checkpoint in
+ % terminate handler
+ exit(Pid, shutdown).
+
+
+wait_jobs_exit(#{} = Jobs, _) when map_size(Jobs) =:= 0 ->
+ ok;
+
+wait_jobs_exit(#{} = Jobs, Timeout) ->
+ receive
+ {'EXIT', Pid, _} ->
+ wait_jobs_exit(maps:remove(Pid, Jobs), Timeout)
+ after
+ Timeout ->
+ LogMsg = "~p : ~p jobs didn't terminate cleanly",
+ couch_log:error(LogMsg, [?MODULE, map_size(Jobs)]),
+ ok
+ end.
+
+
+spawn_acceptors(St) ->
+ #{
+ workers := Workers,
+ acceptors := Acceptors,
+ config := #{max_jobs := MaxJobs, max_acceptors := MaxAcceptors}
+ } = St,
+ ACnt = maps:size(Acceptors),
+ WCnt = maps:size(Workers),
+ case ACnt < MaxAcceptors andalso (ACnt + WCnt) < MaxJobs of
+ true ->
+ {ok, Pid} = couch_replicator_job:start_link(),
+ NewSt = St#{acceptors := Acceptors#{Pid => true}},
+ spawn_acceptors(NewSt);
+ false ->
+ St
+ end.
+
+
+% Worker process exit handlers
+
+handle_acceptor_exit(#{acceptors := Acceptors} = St, Pid, Reason) ->
+ St1 = St#{acceptors := maps:remove(Pid, Acceptors)},
+ LogMsg = "~p : acceptor process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {noreply, spawn_acceptors(St1)}.
+
+
+handle_worker_exit(#{workers := Workers} = St, Pid, Reason) ->
+ St1 = St#{workers := maps:remove(Pid, Workers)},
+ case Reason of
+ normal ->
+ ok;
+ shutdown ->
+ ok;
+ {shutdown, _} ->
+ ok;
+ _ ->
+ LogMsg = "~p : replicator job process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason])
+ end,
+ {noreply, spawn_acceptors(St1)}.
+
+
+handle_unknown_exit(St, Pid, Reason) ->
+ LogMsg = "~p : unknown process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {stop, {unknown_pid_exit, Pid}, St}.
+
+
+get_config() ->
+ Defaults = #{
+ max_acceptors => ?MAX_ACCEPTORS,
+ interval_sec => ?INTERVAL_SEC,
+ max_jobs => ?MAX_JOBS,
+ max_churn => ?MAX_CHURN,
+ min_run_time_sec => ?MIN_RUN_TIME_SEC,
+ transient_job_max_age_sec => ?TRANSIENT_JOB_MAX_AGE_SEC
+ },
+ maps:map(fun(K, Default) ->
+ config:get_integer("replicator", atom_to_list(K), Default)
+ end, Defaults).