Diffstat (limited to 'src/couch_replicator/src/couch_replicator_job.erl')
-rw-r--r--  src/couch_replicator/src/couch_replicator_job.erl  1609
1 file changed, 1609 insertions(+), 0 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator_job.erl b/src/couch_replicator/src/couch_replicator_job.erl
new file mode 100644
index 000000000..c8c143a58
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_job.erl
@@ -0,0 +1,1609 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_job).
+
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ format_status/2,
+ code_change/3
+]).
+
+-export([
+ accept/0,
+ health_threshold/0
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+
+-define(LOWEST_SEQ, 0).
+-define(DEFAULT_CHECKPOINT_INTERVAL, 30000).
+-define(STARTUP_JITTER_DEFAULT, 5000).
+-define(DEFAULT_MIN_BACKOFF_PENALTY_SEC, 32).
+-define(DEFAULT_MAX_BACKOFF_PENALTY_SEC, 2 * 24 * 3600).
+-define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60).
+-define(DEFAULT_MAX_HISTORY, 10).
+-define(DEFAULT_STATS_UPDATE_INTERVAL_SEC, 10).
+
+
+-record(rep_state, {
+ job,
+ job_data,
+ id,
+ base_id,
+ doc_id,
+ db_name,
+ db_uuid,
+ source_name,
+ target_name,
+ source,
+ target,
+ history,
+ checkpoint_history,
+ start_seq,
+ committed_seq,
+ current_through_seq,
+ seqs_in_progress = [],
+ highest_seq_done = {0, ?LOWEST_SEQ},
+ source_log,
+ target_log,
+ rep_starttime,
+ src_starttime,
+ tgt_starttime,
+ checkpoint_timer,
+ stats_timer,
+ changes_queue,
+ changes_manager,
+ changes_reader,
+ workers,
+ stats = couch_replicator_stats:new(),
+ session_id,
+ source_seq = nil,
+ use_checkpoints = true,
+ checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
+ user = null,
+ options = #{}
+}).
+
+
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+
+init(_) ->
+ process_flag(trap_exit, true),
+ {ok, delayed_init, 0}.
+
+
+terminate(normal, #rep_state{} = State) ->
+ #rep_state{
+ job = Job,
+ job_data = JobData,
+ checkpoint_history = History
+ } = State,
+ ok = complete_job(undefined, Job, JobData, History),
+ close_endpoints(State);
+
+terminate(shutdown, #rep_state{} = State0) ->
+ % Replication stopped by the job server
+ State1 = cancel_timers(State0),
+ State3 = case do_checkpoint(State1) of
+ {ok, State2} ->
+ State2;
+ Error ->
+ Msg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
+ couch_log:error(Msg, [?MODULE, State1#rep_state.id, Error]),
+ State1
+ end,
+ #rep_state{job = Job, job_data = JobData} = State3,
+ ok = reschedule(undefined, Job, JobData),
+ ok = close_endpoints(State3);
+
+terminate({shutdown, Error}, {init_error, Stack}) ->
+ % Termination in init, before the job had initialized
+ case Error of
+ max_backoff -> couch_log:warning("~p job backed off", [?MODULE]);
+ finished -> couch_log:notice("~p job finished in init", [?MODULE]);
+ _ -> couch_log:error("~p job failed ~p ~p", [?MODULE, Error, Stack])
+ end,
+ ok;
+
+terminate({shutdown, finished}, #rep_state{} = State) ->
+ % Job state was already updated and job is marked as finished
+ ok = close_endpoints(State);
+
+terminate({shutdown, halt}, #rep_state{} = State) ->
+ % Job is re-enqueued and possibly already running somewhere else
+ couch_log:error("~p job ~p halted", [?MODULE, State#rep_state.id]),
+ ok = close_endpoints(State);
+
+terminate(Reason0, #rep_state{} = State0) ->
+ State = update_job_state(State0),
+ Reason = case Reason0 of
+ {shutdown, Err} -> Err;
+ _ -> Reason0
+ end,
+ #rep_state{
+ id = RepId,
+ job = Job,
+ job_data = JobData,
+ source_name = Source,
+ target_name = Target
+ } = State,
+ couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~p",
+ [RepId, Source, Target, Reason]),
+ ok = reschedule_on_error(undefined, Job, JobData, Reason),
+ ok = close_endpoints(State).
+
+
+handle_call({add_stats, Stats}, From, State) ->
+ gen_server:reply(From, ok),
+ NewStats = couch_replicator_stats:sum_stats(State#rep_state.stats, Stats),
+ {noreply, State#rep_state{stats = NewStats}};
+
+handle_call({report_seq_done, Seq, StatsInc}, From, #rep_state{} = State) ->
+ #rep_state{
+ seqs_in_progress = SeqsInProgress,
+ highest_seq_done = HighestDone,
+ current_through_seq = ThroughSeq,
+ stats = Stats
+ } = State,
+ gen_server:reply(From, ok),
+ {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+ [] ->
+ {Seq, []};
+ [Seq | Rest] ->
+ {Seq, Rest};
+ [_ | _] ->
+ {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+ end,
+ NewHighestDone = lists:max([HighestDone, Seq]),
+ NewThroughSeq = case NewSeqsInProgress of
+ [] ->
+ lists:max([NewThroughSeq0, NewHighestDone]);
+ _ ->
+ NewThroughSeq0
+ end,
+ couch_log:debug("Worker reported seq ~p, through seq was ~p, "
+ "new through seq is ~p, highest seq done was ~p, "
+ "new highest seq done is ~p~n"
+ "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+ [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+ NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+ NewState = State#rep_state{
+ stats = couch_replicator_stats:sum_stats(Stats, StatsInc),
+ current_through_seq = NewThroughSeq,
+ seqs_in_progress = NewSeqsInProgress,
+ highest_seq_done = NewHighestDone
+ },
+ {noreply, maybe_update_job_state(NewState)};
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast({report_seq, Seq},
+ #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+ NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
+ {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}};
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(timeout, delayed_init) ->
+ try delayed_init() of
+ {ok, State} -> {noreply, State};
+ {stop, Reason, State} -> {stop, Reason, State}
+ catch
+ exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt ->
+ Stack = erlang:get_stacktrace(),
+ {stop, {shutdown, Exit}, {init_error, Stack}};
+ _Tag:Error ->
+ ShutdownReason = {error, replication_start_error(Error)},
+ Stack = erlang:get_stacktrace(),
+ {stop, {shutdown, ShutdownReason}, {init_error, Stack}}
+ end;
+
+handle_info(stats_update, #rep_state{} = State) ->
+ State1 = cancel_stats_timer(State),
+ State2 = update_job_state(State1),
+ {noreply, State2};
+
+handle_info(checkpoint, State0) ->
+ State = cancel_checkpoint_timer(State0),
+ ok = check_user_filter(State),
+ case do_checkpoint(State) of
+ {ok, State1} ->
+ couch_stats:increment_counter([couch_replicator, checkpoints,
+ success]),
+ {noreply, start_checkpoint_timer(State1)};
+ Error ->
+ couch_stats:increment_counter([couch_replicator, checkpoints,
+ failure]),
+ {stop, Error, State}
+ end;
+
+handle_info(shutdown, St) ->
+ {stop, shutdown, St};
+
+handle_info({'EXIT', Pid, max_backoff}, State) ->
+ couch_log:error("Max backoff reached child process ~p", [Pid]),
+ {stop, {shutdown, max_backoff}, State};
+
+handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
+ couch_log:error("Max backoff reached child process ~p", [Pid]),
+ {stop, {shutdown, max_backoff}, State};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
+ couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+ Reason = case Reason0 of
+ {changes_req_failed, _, _} = HttpFail ->
+ HttpFail;
+ {http_request_failed, _, _, {error, {code, Code}}} ->
+ {changes_req_failed, Code};
+ {http_request_failed, _, _, {error, Err}} ->
+ {changes_req_failed, Err};
+ Other ->
+ {changes_reader_died, Other}
+ end,
+ couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
+ {stop, {shutdown, Reason}, cancel_timers(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager=Pid} = State) ->
+ couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
+ couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
+ {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timers(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+ couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
+ couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
+ {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timers(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
+ case Workers -- [Pid] of
+ Workers ->
+            %% Processes might be linked by the replicator's auth plugins,
+            %% so we tolerate them exiting `normal` here and don't crash
+ LogMsg = "~p: unknown pid exited `normal` ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid]),
+ {noreply, State#rep_state{workers = Workers}};
+ [] ->
+ catch unlink(State#rep_state.changes_manager),
+ catch exit(State#rep_state.changes_manager, kill),
+ do_last_checkpoint(State);
+ Workers2 ->
+ {noreply, State#rep_state{workers = Workers2}}
+ end;
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
+ State2 = cancel_timers(State),
+ case lists:member(Pid, Workers) of
+ false ->
+ {stop, {unknown_process_died, Pid, Reason}, State2};
+ true ->
+ couch_stats:increment_counter([couch_replicator, worker_deaths]),
+ StopReason = case Reason of
+ {shutdown, _} = Err ->
+ Err;
+ Other ->
+ ErrLog = "Worker ~p died with reason: ~p",
+ couch_log:error(ErrLog, [Pid, Reason]),
+ {worker_died, Pid, Other}
+ end,
+ {stop, StopReason, State2}
+ end;
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+ LogMsg = "~p : spurious erlfdb future ready message ~p",
+ couch_log:notice(LogMsg, [?MODULE, Ref]),
+ {noreply, St};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+format_status(_Opt, [_PDict, State]) ->
+ #rep_state{
+ id = Id,
+ source = Source,
+ target = Target,
+ start_seq = StartSeq,
+ source_seq = SourceSeq,
+        committed_seq = CommittedSeq,
+ current_through_seq = ThroughSeq,
+ highest_seq_done = HighestSeqDone,
+ session_id = SessionId,
+ doc_id = DocId,
+ db_name = DbName,
+ options = Options
+ } = state_strip_creds(State),
+ [
+ {rep_id, Id},
+ {source, couch_replicator_api_wrap:db_uri(Source)},
+ {target, couch_replicator_api_wrap:db_uri(Target)},
+ {db_name, DbName},
+ {doc_id, DocId},
+ {options, Options},
+ {session_id, SessionId},
+ {start_seq, StartSeq},
+ {source_seq, SourceSeq},
+        {committed_seq, CommittedSeq},
+ {current_through_seq, ThroughSeq},
+ {highest_seq_done, HighestSeqDone}
+ ].
+
+
+code_change(_OldVsn, #rep_state{}=State, _Extra) ->
+ {ok, State}.
+
+
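+% Attempt to claim a pending replication job. When no job is available,
+% sleep for a random jittered interval before retrying, so acceptors on
+% different nodes don't poll in lockstep.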
+accept() ->
+ couch_stats:increment_counter([couch_replicator, jobs, accepts]),
+ Now = erlang:system_time(second),
+ case couch_replicator_jobs:accept_job(Now + 5) of
+ {ok, Job, #{?REP := Rep} = JobData} ->
+ Normal = case Rep of
+ #{?OPTIONS := #{} = Options} ->
+ not maps:get(<<"continuous">>, Options, false);
+ _ ->
+ true
+ end,
+ couch_replicator_job_server:accepted(self(), Normal),
+ {ok, Job, JobData};
+ {error, not_found} ->
+ timer:sleep(accept_jitter_msec()),
+ ?MODULE:accept()
+ end.
+
+
+% The health threshold is the minimum amount of time an unhealthy job
+% should run without crashing before it is considered healthy again. The
+% threshold should not be 0, as jobs could start and immediately crash,
+% and it shouldn't be infinity, since then consecutive crashes would
+% accumulate forever even if the job is back to normal.
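+%
+% For example (illustrative), with the default threshold of 120 seconds, a
+% job that had been crashing but then ran for 300 seconds since its last
+% start gets its consecutive error count reset on the next update (see
+% maybe_heal/2).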
+health_threshold() ->
+ config:get_integer("replicator", "health_threshold_sec",
+ ?DEFAULT_HEALTH_THRESHOLD_SEC).
+
+
+delayed_init() ->
+ {ok, Job, JobData} = accept(),
+ try do_init(Job, JobData) of
+ State = #rep_state{} -> {ok, State}
+ catch
+ exit:{http_request_failed, _, _, max_backoff} ->
+ Stack = erlang:get_stacktrace(),
+ reschedule_on_error(undefined, Job, JobData, max_backoff),
+ {stop, {shutdown, max_backoff}, {init_error, Stack}};
+ exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt ->
+ Stack = erlang:get_stacktrace(),
+ {stop, {shutdown, Exit}, {init_error, Stack}};
+ _Tag:Error ->
+ Reason = {error, replication_start_error(Error)},
+ Stack = erlang:get_stacktrace(),
+ ErrMsg = "~p : job ~p failed during startup ~p stack:~p",
+ couch_log:error(ErrMsg, [?MODULE, Job, Reason, Stack]),
+ reschedule_on_error(undefined, Job, JobData, Reason),
+ {stop, {shutdown, Reason}, {init_error, Stack}}
+ end.
+
+
+do_init(Job, #{} = JobData) ->
+ couch_stats:increment_counter([couch_replicator, jobs, starts]),
+    % This may make a network request and, if it fails, reschedule the job
+ {RepId, BaseId} = get_rep_id(undefined, Job, JobData),
+ #{
+ ?DB_NAME := DbName,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId
+ } = JobData,
+
+ ok = couch_replicator_docs:remove_state_fields(DbName, DbUUID, DocId),
+
+ % Finish if job is in a failed state already
+ case JobData of
+ #{?STATE := ?ST_FAILED, ?STATE_INFO := Error} ->
+ ok = fail_job(undefined, Job, JobData, Error),
+ exit({shutdown, finished});
+ #{?STATE := St} when is_binary(St), St =/= ?ST_FAILED ->
+ ok
+ end,
+
+ JobsTx = couch_jobs_fdb:get_jtx(),
+ {Job1, JobData1, Owner} = couch_jobs_fdb:tx(JobsTx, fun(JTx) ->
+ init_job_data(JTx, Job, JobData, RepId, BaseId)
+ end),
+
+    % Handle the ownership decision here, outside of the transaction
+ case Owner of
+ owner -> ok;
+ not_owner -> exit({shutdown, finished})
+ end,
+
+ #rep_state{
+ source = Source,
+ target = Target,
+ start_seq = {_Ts, StartSeq},
+ options = Options,
+ doc_id = DocId,
+ db_name = DbName
+ } = State = init_state(Job1, JobData1),
+
+ NumWorkers = maps:get(<<"worker_processes">>, Options),
+ BatchSize = maps:get(<<"worker_batch_size">>, Options),
+ {ok, ChangesQueue} = couch_work_queue:new([
+ {max_items, BatchSize * NumWorkers * 2},
+ {max_size, 100 * 1024 * NumWorkers}
+ ]),
+
+ % This starts the _changes reader process. It adds the changes from the
+ % source db to the ChangesQueue.
+ {ok, ChangesReader} = couch_replicator_changes_reader:start_link(
+ StartSeq, Source, ChangesQueue, Options
+ ),
+
+    % Changes manager - responsible for dequeuing batches from the changes
+    % queue and delivering them to the worker processes.
+ ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
+
+    % This starts the worker processes. They ask the changes manager for a
+    % batch of _changes rows to process, check which revs are missing on
+    % the target, and copy the missing ones from the source to the target.
+ MaxConns = maps:get(<<"http_connections">>, Options),
+ Workers = lists:map(fun(_) ->
+ couch_stats:increment_counter([couch_replicator, workers_started]),
+ {ok, Pid} = couch_replicator_worker:start_link(self(), Source, Target,
+ ChangesManager, MaxConns),
+ Pid
+ end, lists:seq(1, NumWorkers)),
+
+ log_replication_start(State),
+
+ State1 = State#rep_state{
+ changes_queue = ChangesQueue,
+ changes_manager = ChangesManager,
+ changes_reader = ChangesReader,
+ workers = Workers
+ },
+
+ update_job_state(State1).
+
+
+init_job_data(#{jtx := true} = JTx, Job, #{} = JobData, RepId, BaseId) ->
+ #{
+ ?REP := Rep,
+ ?REP_ID := OldRepId,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId
+ } = JobData,
+ JobId = couch_replicator_ids:job_id(Rep, DbUUID, DocId),
+ Now = erlang:system_time(second),
+ JobData1 = JobData#{
+ ?REP_ID := RepId,
+ ?BASE_ID := BaseId,
+ ?STATE := ?ST_RUNNING,
+ ?STATE_INFO := null,
+ ?LAST_START := Now,
+ ?REP_NODE := erlang:atom_to_binary(node(), utf8),
+ ?REP_PID := list_to_binary(pid_to_list(self())),
+ ?LAST_UPDATED := Now
+ },
+ JobData2 = case is_binary(OldRepId) andalso OldRepId =/= RepId of
+ true ->
+ % Handle Replication ID change
+ ok = couch_replicator_jobs:clear_old_rep_id(JTx, JobId, OldRepId),
+ JobData1#{
+ ?REP_STATS := #{},
+ ?JOB_HISTORY := []
+ };
+ false ->
+ JobData1
+ end,
+ JobData3 = hist_append(?HIST_STARTED, Now, JobData2, undefined),
+ case check_ownership(JTx, Job, JobData3) of
+ owner ->
+ couch_stats:increment_counter([couch_replicator, jobs, starts]),
+ {Job1, JobData4} = update_job_data(JTx, Job, JobData3),
+ {Job1, JobData4, owner};
+ not_owner ->
+ {Job, JobData3, not_owner}
+ end.
+
+
+check_ownership(#{jtx := true} = JTx, Job, JobData) ->
+ #{
+ ?REP_ID := RepId,
+ ?REP := Rep,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId
+ } = JobData,
+ JobId = couch_replicator_ids:job_id(Rep, DbUUID, DocId),
+ case couch_replicator_jobs:try_update_rep_id(JTx, JobId, RepId) of
+ ok ->
+ owner;
+ {error, {replication_job_conflict, OtherJobId}} ->
+ case couch_replicator_jobs:get_job_data(JTx, OtherJobId) of
+ {ok, #{?STATE := S, ?DB_NAME := null}} when
+ S == ?ST_RUNNING; S == ?ST_PENDING ->
+ % Conflicting job is a transient job, not associated with a
+ % _replicator doc, so we let this job retry. This is also
+                    % partly done for compatibility with previous replicator
+ % behavior.
+ Error = <<"Duplicate job running: ", OtherJobId/binary>>,
+ reschedule_on_error(JTx, Job, JobData, Error),
+ not_owner;
+ {ok, #{?STATE := S, ?DB_NAME := <<_/binary>>}} when
+ S == ?ST_RUNNING; S == ?ST_PENDING ->
+ % Conflicting job is a permanent replication job, so this
+ % job is marked as failed.
+ Error = <<"Duplicate job running: ", OtherJobId/binary>>,
+ fail_job(JTx, Job, JobData, Error),
+ not_owner;
+ {ok, #{}} ->
+ LogMsg = "~p : Job ~p usurping job ~p for replication ~p",
+ couch_log:warning(LogMsg, [?MODULE, JobId, OtherJobId,
+ RepId]),
+ couch_replicator_jobs:update_rep_id(JTx, JobId, RepId),
+ owner;
+ {error, not_found} ->
+ LogMsg = "~p : Orphan replication job reference ~p -> ~p",
+ couch_log:error(LogMsg, [?MODULE, RepId, OtherJobId]),
+ couch_replicator_jobs:update_rep_id(JTx, JobId, RepId),
+ owner
+ end
+ end.
+
+
+update_job_data(Tx, #rep_state{} = State) ->
+ #rep_state{job = Job, job_data = JobData} = State,
+ {Job1, JobData1} = update_job_data(Tx, Job, JobData),
+ State#rep_state{job = Job1, job_data = JobData1}.
+
+
+update_job_data(Tx, Job, #{} = JobData) ->
+ case couch_replicator_jobs:update_job_data(Tx, Job, JobData) of
+ {ok, Job1} ->
+ {Job1, JobData};
+ {error, halt} ->
+ exit({shutdown, halt})
+ end.
+
+
+update_active_task_info(#rep_state{} = State) ->
+ #rep_state{
+ job_data = JobData,
+ user = User,
+ id = RepId,
+ db_name = DbName,
+ doc_id = DocId,
+ source_name = Source,
+ target_name = Target,
+ options = Options,
+ highest_seq_done = {_, SourceSeq},
+ checkpoint_interval = CheckpointInterval
+ } = State,
+
+ #{
+ ?REP := #{?START_TIME := StartTime},
+ ?REP_STATS := Stats,
+ ?REP_NODE := Node,
+ ?REP_PID := Pid,
+ ?LAST_UPDATED := LastUpdated
+ } = JobData,
+
+ Info = maps:merge(Stats, #{
+ <<"type">> => <<"replication">>,
+ <<"user">> => User,
+ <<"replication_id">> => RepId,
+ <<"database">> => DbName,
+ <<"doc_id">> => DocId,
+ <<"source">> => ?l2b(Source),
+ <<"target">> => ?l2b(Target),
+ <<"continuous">> => maps:get(<<"continuous">>, Options, false),
+ <<"source_seq">> => SourceSeq,
+ <<"checkpoint_interval">> => CheckpointInterval,
+ <<"node">> => Node,
+ <<"pid">> => Pid,
+ <<"updated_on">> => LastUpdated,
+ <<"started_on">> => StartTime
+ }),
+
+ JobData1 = fabric2_active_tasks:update_active_task_info(JobData, Info),
+ State#rep_state{job_data = JobData1}.
+
+
+% Transient jobs don't get rescheduled on error, with the exception of
+% max_backoff errors.
+%
+reschedule_on_error(JTx, Job, #{?DB_NAME := null} = JobData, Error) when
+ Error =/= max_backoff ->
+ fail_job(JTx, Job, JobData, Error);
+
+reschedule_on_error(JTx, Job, #{} = JobData0, Error0) ->
+ Error = error_info(Error0),
+
+ Now = erlang:system_time(second),
+
+ JobData = maybe_heal(JobData0, Now),
+ #{?ERROR_COUNT := ErrorCount} = JobData,
+ JobData1 = JobData#{
+ ?STATE := ?ST_CRASHING,
+ ?STATE_INFO := Error,
+ ?ERROR_COUNT := ErrorCount + 1,
+ ?LAST_ERROR := Error,
+ ?REP_NODE := null,
+ ?REP_PID := null
+ },
+ JobData2 = hist_append(?HIST_CRASHED, Now, JobData1, Error),
+ JobData3 = hist_append(?HIST_PENDING, Now, JobData2, undefined),
+ JobData4 = fabric2_active_tasks:update_active_task_info(JobData3, #{}),
+
+ couch_stats:increment_counter([couch_replicator, jobs, crashes]),
+
+ Time = get_backoff_time(ErrorCount + 1),
+ case couch_replicator_jobs:reschedule_job(JTx, Job, JobData4, Time) of
+ ok -> ok;
+ {error, halt} -> exit({shutdown, halt})
+ end.
+
+
+reschedule(JTx, Job, #{} = JobData) ->
+ Now = erlang:system_time(second),
+
+ JobData1 = JobData#{
+ ?STATE := ?ST_PENDING,
+ ?STATE_INFO := null,
+ ?LAST_ERROR := null,
+ ?REP_NODE := null,
+ ?REP_PID := null
+ },
+ JobData2 = hist_append(?HIST_STOPPED, Now, JobData1, undefined),
+ JobData3 = hist_append(?HIST_PENDING, Now, JobData2, undefined),
+ JobData4 = fabric2_active_tasks:update_active_task_info(JobData3, #{}),
+
+ couch_stats:increment_counter([couch_replicator, jobs, stops]),
+
+ Time = Now + couch_replicator_job_server:scheduling_interval_sec(),
+ case couch_replicator_jobs:reschedule_job(JTx, Job, JobData4, Time) of
+ ok -> ok;
+ {error, halt} -> exit({shutdown, halt})
+ end.
+
+
+fail_job(JTx, Job, #{} = JobData, Error0) ->
+ Error = error_info(Error0),
+
+ Now = erlang:system_time(second),
+
+ #{
+ ?ERROR_COUNT := ErrorCount,
+ ?DB_NAME := DbName,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId
+ } = JobData,
+
+ JobData1 = JobData#{
+ ?STATE := ?ST_FAILED,
+ ?STATE_INFO := Error,
+ ?ERROR_COUNT := ErrorCount + 1,
+ ?REP_NODE := null,
+ ?REP_PID := null
+ },
+ JobData2 = hist_append(?HIST_CRASHED, Now, JobData1, Error),
+ JobData3 = fabric2_active_tasks:update_active_task_info(JobData2, #{}),
+
+ couch_stats:increment_counter([couch_replicator, jobs, crashes]),
+
+ case couch_replicator_jobs:finish_job(JTx, Job, JobData3) of
+ ok ->
+ couch_replicator_docs:update_failed(DbName, DbUUID, DocId, Error),
+ ok;
+ {error, halt} ->
+ exit({shutdown, halt})
+ end.
+
+
+complete_job(JTx, Job, #{} = JobData, CheckpointHistory) ->
+ #{
+ ?DB_NAME := Db,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId,
+ ?REP_STATS := RepStats,
+ ?REP := Rep
+ } = JobData,
+
+ Now = erlang:system_time(second),
+
+ #{?START_TIME := StartTime} = Rep,
+ JobData1 = JobData#{
+ ?STATE := ?ST_COMPLETED,
+ ?CHECKPOINT_HISTORY := CheckpointHistory,
+ ?STATE_INFO := RepStats,
+ ?REP_NODE := null,
+ ?REP_PID := null
+ },
+ JobData2 = hist_append(?HIST_STOPPED, Now, JobData1, undefined),
+ JobData3 = fabric2_active_tasks:update_active_task_info(JobData2, #{}),
+
+ couch_stats:increment_counter([couch_replicator, jobs, stops]),
+
+ case couch_replicator_jobs:finish_job(JTx, Job, JobData3) of
+ ok ->
+ StartISO8601 = couch_replicator_utils:iso8601(StartTime),
+ Stats = maps:merge(RepStats, #{<<"start_time">> => StartISO8601}),
+ couch_replicator_docs:update_completed(Db, DbUUID, DocId, Stats),
+ ok;
+ {error, halt} ->
+ exit({shutdown, halt})
+ end.
+
+
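+% Normalize an arbitrary error term into something that can be stored in
+% the job data: either a binary message, or a small map with `error` and
+% `reason` fields for `{error, {Err, Reason}}` tuples.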
+error_info(Error0) ->
+ case Error0 of
+ <<_/binary>> ->
+ Error0;
+ undefined ->
+ undefined;
+ null ->
+ null;
+ Atom when is_atom(Atom) ->
+ atom_to_binary(Atom, utf8);
+ {shutdown, Atom} when is_atom(Atom) ->
+ atom_to_binary(Atom, utf8);
+ {shutdown, Err} ->
+ couch_replicator_utils:rep_error_to_binary(Err);
+ {error, Atom} when is_atom(Atom) ->
+ atom_to_binary(Atom, utf8);
+ {error, {Err, Reason}} when is_atom(Err) ->
+ ReasonBin = couch_replicator_utils:rep_error_to_binary(Reason),
+ #{
+ <<"error">> => atom_to_binary(Err, utf8),
+ <<"reason">> => ReasonBin
+ };
+ _Other ->
+ couch_replicator_utils:rep_error_to_binary(Error0)
+ end.
+
+
+get_rep_id(JTx, Job, #{} = JobData) ->
+ #{?REP := Rep} = JobData,
+ try
+ couch_replicator_ids:replication_id(Rep)
+ catch
+ throw:{filter_fetch_error, _} = Error ->
+ reschedule_on_error(JTx, Job, JobData, {error, Error}),
+ exit({shutdown, finished})
+ end.
+
+
+% After a job runs continuously for some time, we consider it "healed" and
+% reset its consecutive error count.
+maybe_heal(#{} = JobData, Now) ->
+ #{?LAST_START := LastStart} = JobData,
+ case Now - LastStart > health_threshold() of
+ true -> JobData#{?ERROR_COUNT := 0, ?LAST_ERROR := null};
+ false -> JobData
+ end.
+
+
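+% Calculate the absolute time (in seconds) when a crashing job should run
+% next. The penalty grows exponentially with the consecutive error count.
+% An illustrative progression, assuming the default penalties of Min = 32
+% sec and Max = 172800 sec (2 days) and ignoring jitter:
+%
+%   ErrCnt = 1   -> 32 * 2^1 = 64 sec
+%   ErrCnt = 5   -> 32 * 2^5 = 1024 sec (~17 min)
+%   ErrCnt >= 13 -> capped near Max (~2 days)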
+get_backoff_time(ErrCnt) ->
+ Max = min(max_backoff_penalty_sec(), 3600 * 24 * 30),
+ Min = max(min_backoff_penalty_sec(), 2),
+
+ % Calculate the max exponent so exponentiation doesn't blow up
+ MaxExp = math:log2(Max) - math:log2(Min),
+
+ % This is the recommended backoff amount
+ Wait = Min * math:pow(2, min(ErrCnt, MaxExp)),
+
+ % Apply a 25% jitter to avoid a thundering herd effect
+ WaitJittered = Wait * 0.75 + rand:uniform(trunc(Wait * 0.25) + 1),
+ erlang:system_time(second) + trunc(WaitJittered).
+
+
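+% Mask credentials in request headers before they are logged or exposed
+% via the process status. For example (illustrative values):
+%
+%   headers_strip_creds([{"Authorization", "Basic aGk6dGhlcmU="},
+%       {"Accept", "application/json"}], [])
+%   -> [{"Authorization", "****"}, {"Accept", "application/json"}]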
+headers_strip_creds([], Acc) ->
+ lists:reverse(Acc);
+
+headers_strip_creds([{Key, Value0} | Rest], Acc) ->
+ Value = case string:to_lower(Key) of
+ "authorization" -> "****";
+ _ -> Value0
+ end,
+ headers_strip_creds(Rest, [{Key, Value} | Acc]).
+
+
+httpdb_strip_creds(#httpdb{url = Url, headers = Headers} = HttpDb) ->
+ HttpDb#httpdb{
+ url = couch_util:url_strip_password(Url),
+ headers = headers_strip_creds(Headers, [])
+ };
+
+httpdb_strip_creds(LocalDb) ->
+ LocalDb.
+
+
+state_strip_creds(#rep_state{source = Source, target = Target} = State) ->
+ State#rep_state{
+ source = httpdb_strip_creds(Source),
+ target = httpdb_strip_creds(Target)
+ }.
+
+
+adjust_maxconn(Src = #{<<"http_connections">> := 1}, RepId) ->
+ Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
+ couch_log:notice(Msg, [RepId]),
+ Src#{<<"http_connections">> := 2};
+
+adjust_maxconn(Src, _RepId) ->
+ Src.
+
+
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+ highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
+ {stop, normal, cancel_timers(State)};
+
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+ highest_seq_done = Seq} = State) ->
+ State1 = State#rep_state{current_through_seq = Seq},
+ State2 = cancel_timers(State1),
+ case do_checkpoint(State2) of
+ {ok, State3} ->
+ couch_stats:increment_counter([couch_replicator, checkpoints,
+ success]),
+ {stop, normal, State3};
+ Error ->
+ couch_stats:increment_counter([couch_replicator, checkpoints,
+ failure]),
+ {stop, Error, State2}
+ end.
+
+
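+% Schedule the next checkpoint. The wait is the smaller of the configured
+% checkpoint interval and half the couch_jobs timeout (so job data is
+% refreshed well within the timeout window), jittered down by up to 25%
+% to spread checkpoint load.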
+start_checkpoint_timer(#rep_state{} = State) ->
+ CheckpointAfterMSec = State#rep_state.checkpoint_interval,
+ JobTimeoutMSec = couch_replicator_jobs:get_timeout() * 1000,
+ Wait1 = min(CheckpointAfterMSec, JobTimeoutMSec div 2),
+ Wait2 = trunc(Wait1 * 0.75) + rand:uniform(trunc(Wait1 * 0.25)),
+ TRef = erlang:send_after(Wait2, self(), checkpoint),
+ State#rep_state{checkpoint_timer = TRef}.
+
+
+cancel_checkpoint_timer(#rep_state{checkpoint_timer = nil} = State) ->
+ State;
+cancel_checkpoint_timer(#rep_state{checkpoint_timer = Timer} = State) ->
+ erlang:cancel_timer(Timer),
+ State#rep_state{checkpoint_timer = nil}.
+
+
+start_stats_timer(#rep_state{} = State) ->
+ MSec = stats_update_interval_sec() * 1000,
+ TRef = erlang:send_after(MSec, self(), stats_update),
+ State#rep_state{stats_timer = TRef}.
+
+
+cancel_stats_timer(#rep_state{stats_timer = nil} = State) ->
+ State;
+cancel_stats_timer(#rep_state{stats_timer = Timer} = State) ->
+ erlang:cancel_timer(Timer),
+ receive stats_update -> ok after 0 -> ok end,
+ State#rep_state{stats_timer = nil}.
+
+
+cancel_timers(#rep_state{} = State) ->
+ State1 = cancel_checkpoint_timer(State),
+ cancel_stats_timer(State1).
+
+
+init_state(#{} = Job, #{} = JobData) ->
+ #{
+ ?REP := Rep,
+ ?REP_ID := Id,
+ ?BASE_ID := BaseId,
+ ?DB_NAME := DbName,
+ ?DB_UUID := DbUUID,
+ ?DOC_ID := DocId,
+ ?LAST_ERROR := LastError
+ } = JobData,
+ #{
+ ?SOURCE := Src0,
+ ?TARGET := Tgt,
+ ?START_TIME := StartTime,
+ ?OPTIONS := Options0,
+ ?REP_USER := User
+ } = Rep,
+
+    % Optimize replication parameters if the job crashed last time because
+    % it was rate limited
+ Options = optimize_rate_limited_job(Options0, LastError),
+
+ % Adjust minimum number of http source connections to 2 to avoid deadlock
+ Src = adjust_maxconn(Src0, BaseId),
+ {ok, Source} = couch_replicator_api_wrap:db_open(Src),
+ CreateTgt = maps:get(<<"create_target">>, Options, false),
+ TParams = maps:get(<<"create_target_params">>, Options, #{}),
+
+ {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, CreateTgt, TParams),
+
+ {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
+ {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
+
+ [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep,
+ BaseId),
+
+ {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+
+ #{?REP_STATS := Stats0} = JobData,
+ Stats1 = couch_replicator_stats:new(Stats0),
+ HistoryStats = case History of
+ [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
+ _ -> couch_replicator_stats:new()
+ end,
+ Stats2 = couch_replicator_stats:max_stats(Stats1, HistoryStats),
+
+ StartSeq1 = maps:get(<<"since_seq">>, Options, StartSeq0),
+ StartSeq = {0, StartSeq1},
+
+ SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
+
+ #doc{body={CheckpointHistory}} = SourceLog,
+
+ State = #rep_state{
+ job = Job,
+ job_data = JobData,
+ id = Id,
+ base_id = BaseId,
+ source_name = couch_replicator_api_wrap:db_uri(Source),
+ target_name = couch_replicator_api_wrap:db_uri(Target),
+ source = Source,
+ target = Target,
+ options = Options,
+ history = History,
+ checkpoint_history = {[{<<"no_changes">>, true} | CheckpointHistory]},
+ start_seq = StartSeq,
+ current_through_seq = StartSeq,
+ committed_seq = StartSeq,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = StartTime,
+ src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
+ tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
+ session_id = couch_uuids:random(),
+ source_seq = SourceSeq,
+ use_checkpoints = maps:get(<<"use_checkpoints">>, Options),
+ checkpoint_interval = maps:get(<<"checkpoint_interval">>, Options),
+ stats = Stats2,
+ stats_timer = nil,
+ doc_id = DocId,
+ db_name = DbName,
+ db_uuid = DbUUID,
+ user = User
+ },
+ start_checkpoint_timer(State).
+
+
+find_and_migrate_logs(DbList, #{} = Rep, BaseId) when is_binary(BaseId) ->
+ LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
+ fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
+
+
+fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
+ lists:reverse(Acc);
+
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, #{} = Rep, Acc) ->
+ case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
+ {error, <<"not_found">>} when Vsn > 1 ->
+ OldRepId = couch_replicator_ids:base_id(Rep, Vsn - 1),
+ fold_replication_logs(Dbs, Vsn - 1,
+ ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
+ {error, <<"not_found">>} ->
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, Rep,
+ [#doc{id = NewId} | Acc]);
+ {ok, Doc} when LogId =:= NewId ->
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
+ {ok, Doc} ->
+ MigratedLog = #doc{id = NewId, body = Doc#doc.body},
+ maybe_save_migrated_log(Rep, Db, MigratedLog, Doc#doc.id),
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId, Rep,
+ [MigratedLog | Acc])
+ end.
+
+
+maybe_save_migrated_log(#{?OPTIONS := Options}, Db, #doc{} = Doc, OldId) ->
+ case maps:get(<<"use_checkpoints">>, Options) of
+ true ->
+ update_checkpoint(Db, Doc),
+ Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p",
+ couch_log:notice(Msg, [httpdb_strip_creds(Db), OldId, Doc#doc.id]);
+ false ->
+ ok
+ end.
+
+
+spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
+ spawn_link(fun() ->
+ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
+ end).
+
+
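+% The changes manager hands out batches from the changes queue on demand.
+% Workers send a `{get_changes, WorkerPid}` message and receive either
+% `{changes, ManagerPid, Changes, ReportSeq}` with any `{last_seq, _}`
+% markers filtered out, or `{closed, ManagerPid}` once the queue closes.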
+changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
+ receive
+ {get_changes, From} ->
+ case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
+ closed ->
+ From ! {closed, self()};
+ {ok, ChangesOrLastSeqs} ->
+ ReportSeq = case lists:last(ChangesOrLastSeqs) of
+ {last_seq, Seq} -> {Ts, Seq};
+ #doc_info{high_seq = Seq} -> {Ts, Seq}
+ end,
+ Changes = lists:filter(fun
+ (#doc_info{}) -> true;
+ ({last_seq, _Seq}) -> false
+ end, ChangesOrLastSeqs),
+ ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
+ From ! {changes, self(), Changes, ReportSeq}
+ end,
+ changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
+ end.
+
+
+do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
+ NewState = State#rep_state{
+ checkpoint_history = {[{<<"use_checkpoints">>, false}]}
+ },
+ {ok, update_job_state(NewState)};
+do_checkpoint(#rep_state{current_through_seq=S, committed_seq=S} = State) ->
+ {ok, update_job_state(State)};
+do_checkpoint(State) ->
+ #rep_state{
+ source_name=SourceName,
+ target_name=TargetName,
+ source = Source,
+ target = Target,
+ history = OldHistory,
+ start_seq = {_, StartSeq},
+ current_through_seq = {_Ts, NewSeq} = NewTsSeq,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = RepStartTime,
+ src_starttime = SrcInstanceStartTime,
+ tgt_starttime = TgtInstanceStartTime,
+ stats = Stats,
+ options = Options,
+ session_id = SessionId
+ } = State,
+ case commit_to_both(Source, Target) of
+ {source_error, Reason} ->
+ {checkpoint_commit_failure, <<"Failure on source commit: ",
+ (couch_util:to_binary(Reason))/binary>>};
+ {target_error, Reason} ->
+ {checkpoint_commit_failure, <<"Failure on target commit: ",
+ (couch_util:to_binary(Reason))/binary>>};
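+        % Both endpoints returned the same instance start times recorded
+        % when the job initialized, so neither database was recreated or
+        % restarted mid-replication and it is safe to record a checkpoint.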
+ {SrcInstanceStartTime, TgtInstanceStartTime} ->
+ couch_log:notice("recording a checkpoint for `~s` -> `~s` at "
+ "source update_seq ~p", [SourceName, TargetName, NewSeq]),
+ StartTime = couch_replicator_utils:rfc1123_local(RepStartTime),
+ EndTime = couch_replicator_utils:rfc1123_local(),
+ NewHistoryEntry = {[
+ {<<"session_id">>, SessionId},
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"start_last_seq">>, StartSeq},
+ {<<"end_last_seq">>, NewSeq},
+ {<<"recorded_seq">>, NewSeq},
+ {<<"missing_checked">>,
+ couch_replicator_stats:missing_checked(Stats)},
+ {<<"missing_found">>,
+ couch_replicator_stats:missing_found(Stats)},
+ {<<"docs_read">>,
+ couch_replicator_stats:docs_read(Stats)},
+ {<<"docs_written">>,
+ couch_replicator_stats:docs_written(Stats)},
+ {<<"doc_write_failures">>,
+ couch_replicator_stats:doc_write_failures(Stats)}
+ ]},
+ BaseHistory = [
+ {<<"session_id">>, SessionId},
+ {<<"source_last_seq">>, NewSeq},
+ {<<"replication_id_version">>, ?REP_ID_VERSION}
+ ] ++ case maps:get(<<"doc_ids">>, Options, undefined) of
+ undefined ->
+ [];
+ _DocIds ->
+            % backwards compatibility with the result of a replication by
+            % doc IDs in versions 0.11.x and 1.0.x. TODO: deprecate (use
+            % the same history format, simplify code)
+ [
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"docs_read">>,
+ couch_replicator_stats:docs_read(Stats)},
+ {<<"docs_written">>,
+ couch_replicator_stats:docs_written(Stats)},
+ {<<"doc_write_failures">>,
+ couch_replicator_stats:doc_write_failures(Stats)}
+ ]
+ end,
+ % limit history to 50 entries
+ NewRepHistory = {
+ BaseHistory ++ [{<<"history">>,
+ lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+ },
+
+ try
+ {SrcRevPos, SrcRevId} = update_checkpoint(Source,
+ SourceLog#doc{body = NewRepHistory}, source),
+ {TgtRevPos, TgtRevId} = update_checkpoint(Target,
+ TargetLog#doc{body = NewRepHistory}, target),
+ NewState = State#rep_state{
+ checkpoint_history = NewRepHistory,
+ committed_seq = NewTsSeq,
+ source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+ target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+ },
+ {ok, update_job_state(NewState)}
+ catch throw:{checkpoint_commit_failure, _} = Failure ->
+ Failure
+ end;
+ {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Target database out of sync. "
+ "Try to increase max_dbs_open at the target's server.">>};
+ {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Source database out of sync. "
+ "Try to increase max_dbs_open at the source's server.">>};
+ {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Source and target databases out of "
+ "sync. Try to increase max_dbs_open at both servers.">>}
+ end.
+
+
+update_checkpoint(Db, Doc, DbType) ->
+ try
+ update_checkpoint(Db, Doc)
+ catch throw:{checkpoint_commit_failure, Reason} ->
+ throw({checkpoint_commit_failure, <<"Error updating the ",
+ (couch_util:to_binary(DbType))/binary, " checkpoint document: ",
+ (couch_util:to_binary(Reason))/binary>>})
+ end.
+
+
+update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
+ try
+ case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
+ {ok, PosRevId} -> PosRevId;
+ {error, Reason} -> throw({checkpoint_commit_failure, Reason})
+ end
+ catch throw:conflict ->
+ Opts = [ejson_body],
+ case (catch couch_replicator_api_wrap:open_doc(Db, LogId, Opts)) of
+ {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
+                % This means we were able to update the checkpoint doc
+                % successfully in a previous attempt, but got a connection
+                % error (e.g. a timeout) before receiving the success
+                % response. The request was therefore retried and we got a
+                % conflict, as the revision we sent is not the current one.
+                % We confirm this by verifying that the doc body we just
+                % read is the same as the one we just sent.
+ {Pos, RevId};
+ _ ->
+ throw({checkpoint_commit_failure, conflict})
+ end
+ end.
+
+
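+% Ensure both endpoints flush pending writes to disk before a checkpoint
+% is recorded. The source commit runs in a spawned process while the
+% target commit runs inline; on success the instance start times of both
+% endpoints are returned for comparison by do_checkpoint/1.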
+commit_to_both(Source, Target) ->
+ % commit the src async
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(fun() ->
+ Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
+ ParentPid ! {self(), Result}
+ end),
+
+ % commit tgt sync
+ TgtResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
+
+ SrcResult = receive
+ {SrcCommitPid, Result} ->
+ unlink(SrcCommitPid),
+ receive
+ {'EXIT', SrcCommitPid, _} ->
+ ok
+ after
+ 0 -> ok
+ end,
+ Result;
+ {'EXIT', SrcCommitPid, Reason} ->
+ {error, Reason}
+ end,
+ case TgtResult of
+ {ok, TargetStartTime} ->
+ case SrcResult of
+ {ok, SourceStartTime} ->
+ {SourceStartTime, TargetStartTime};
+ SourceError ->
+ {source_error, SourceError}
+ end;
+ TargetError ->
+ {target_error, TargetError}
+ end.
+
+
+compare_replication_logs(SrcDoc, TgtDoc) ->
+ #doc{body={RepRecProps}} = SrcDoc,
+ #doc{body={RepRecPropsTgt}} = TgtDoc,
+ SrcSession = get_value(<<"session_id">>, RepRecProps),
+ TgtSession = get_value(<<"session_id">>, RepRecPropsTgt),
+ case SrcSession == TgtSession of
+ true ->
+ % if the records have the same session id,
+ % then we have a valid replication history
+ OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps,
+ ?LOWEST_SEQ),
+ OldHistory = get_value(<<"history">>, RepRecProps, []),
+ {OldSeqNum, OldHistory};
+ false ->
+ SourceHistory = get_value(<<"history">>, RepRecProps, []),
+ TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
+ couch_log:notice("Replication records differ. "
+ "Scanning histories to find a common ancestor.", []),
+ couch_log:debug("Record on source:~p~nRecord on target:~p~n",
+ [RepRecProps, RepRecPropsTgt]),
+ compare_rep_history(SourceHistory, TargetHistory)
+ end.
+
+
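+% Walk both history lists looking for the most recent session id present
+% in both. If one is found, replication restarts from that record's
+% recorded_seq; otherwise it falls back to a full replication from
+% ?LOWEST_SEQ.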
+compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
+ couch_log:notice("no common ancestry -- performing full replication", []),
+ {?LOWEST_SEQ, []};
+
+compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
+ SourceId = get_value(<<"session_id">>, S),
+ case has_session_id(SourceId, Target) of
+ true ->
+ RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
+ couch_log:notice("found a common replication record with "
+ "source_seq ~p", [RecordSeqNum]),
+ {RecordSeqNum, SourceRest};
+ false ->
+ TargetId = get_value(<<"session_id">>, T),
+ case has_session_id(TargetId, SourceRest) of
+ true ->
+ RecordSeqNum = get_value(<<"recorded_seq">>, T,
+ ?LOWEST_SEQ),
+ couch_log:notice("found a common replication record with "
+ "source_seq ~p", [RecordSeqNum]),
+ {RecordSeqNum, TargetRest};
+ false ->
+ compare_rep_history(SourceRest, TargetRest)
+ end
+ end.
+
+
+has_session_id(_SessionId, []) ->
+ false;
+
+has_session_id(SessionId, [{Props} | Rest]) ->
+ case get_value(<<"session_id">>, Props, nil) of
+ SessionId -> true;
+ _Else -> has_session_id(SessionId, Rest)
+ end.
+
+
+get_pending_count(#rep_state{} = St) ->
+ #rep_state{
+ highest_seq_done = HighestSeqDone,
+ source = #httpdb{} = Db0
+ } = St,
+ {_, Seq} = HighestSeqDone,
+ Db = Db0#httpdb{retries = 3},
+ case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
+ {ok, Pending} ->
+ Pending;
+ _ ->
+ null
+ end.
+
+
+maybe_update_job_state(#rep_state{} = State) ->
+ case State#rep_state.stats_timer of
+ nil -> start_stats_timer(State);
+ Ref when is_reference(Ref) -> State
+ end.
+
+
+update_job_state(#rep_state{} = State0) ->
+ State = cancel_stats_timer(State0),
+ #rep_state{
+ current_through_seq = {_, ThroughSeq},
+ highest_seq_done = {_, HighestSeq},
+ committed_seq = {_, CommittedSeq},
+ stats = Stats,
+ job_data = JobData
+ } = State,
+
+ Now = erlang:system_time(second),
+
+ RevisionsChecked = couch_replicator_stats:missing_checked(Stats),
+ MissingRevisions = couch_replicator_stats:missing_found(Stats),
+ DocsRead = couch_replicator_stats:docs_read(Stats),
+ DocsWritten = couch_replicator_stats:docs_written(Stats),
+ DocWriteFailures = couch_replicator_stats:doc_write_failures(Stats),
+ PendingCount = get_pending_count(State),
+
+ StatsMap = #{
+ <<"checkpointed_source_seq">> => CommittedSeq,
+ <<"source_seq">> => HighestSeq,
+ <<"through_seq">> => ThroughSeq,
+ <<"revisions_checked">> => RevisionsChecked,
+ <<"missing_revisions_found">> => MissingRevisions,
+ <<"docs_read">> => DocsRead,
+ <<"docs_written">> => DocsWritten,
+ <<"doc_write_failures">> => DocWriteFailures,
+ <<"changes_pending">> => PendingCount
+ },
+
+ JobData1 = JobData#{
+ ?REP_STATS := StatsMap,
+ ?LAST_UPDATED := Now
+ },
+
+ JobData2 = maybe_heal(JobData1, Now),
+
+ State1 = State#rep_state{job_data = JobData2},
+ State2 = update_active_task_info(State1),
+ update_job_data(undefined, State2).
+
+
+replication_start_error({unauthorized, DbUri}) ->
+ {unauthorized, <<"unauthorized to access or create database ",
+ DbUri/binary>>};
+
+replication_start_error({db_not_found, DbUri}) ->
+ {db_not_found, <<"could not open ", DbUri/binary>>};
+
+replication_start_error({http_request_failed, _Method, Url0,
+ {error, {error, {conn_failed, {error, nxdomain}}}}}) ->
+ Url = ?l2b(couch_util:url_strip_password(Url0)),
+ {nxdomain, <<"could not resolve ", Url/binary>>};
+
+replication_start_error({http_request_failed, Method0, Url0,
+ {error, {code, Code}}}) when is_integer(Code) ->
+ Url = ?l2b(couch_util:url_strip_password(Url0)),
+ Method = ?l2b(Method0),
+ CodeBin = integer_to_binary(Code),
+ {http_error_code, <<CodeBin/binary, " ", Method/binary, " ", Url/binary>>};
+
+replication_start_error(Error) ->
+ Error.
+
+
+log_replication_start(#rep_state{} = RepState) ->
+ #rep_state{
+ id = Id,
+ doc_id = DocId,
+ db_name = DbName,
+ options = Options,
+ source_name = Source,
+ target_name = Target,
+ session_id = Sid
+ } = RepState,
+ Workers = maps:get(<<"worker_processes">>, Options),
+ BatchSize = maps:get(<<"worker_batch_size">>, Options),
+ From = case DbName of
+ Name when is_binary(Name) ->
+ io_lib:format("from doc ~s:~s", [Name, DocId]);
+ _ ->
+ "from _replicate endpoint"
+ end,
+    Msg = "Starting replication ~s (~s -> ~s) ~s worker_processes:~p"
+ " worker_batch_size:~p session_id:~s",
+ couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]).
+
+
+check_user_filter(#rep_state{} = State) ->
+ #rep_state{
+ id = RepId,
+ base_id = BaseId,
+ job = Job,
+ job_data = JobData
+ } = State,
+ case get_rep_id(undefined, Job, JobData) of
+ {RepId, BaseId} ->
+ ok;
+ {NewId, NewBaseId} when is_binary(NewId), is_binary(NewBaseId) ->
+ LogMsg = "~p : Replication id was updated ~p -> ~p",
+ couch_log:error(LogMsg, [?MODULE, RepId, NewId]),
+ reschedule(undefined, Job, JobData),
+ exit({shutdown, finished})
+ end.
+
+
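+% Append an event to the job's history, capping it at max_history()
+% entries. An illustrative crash event:
+%
+%   #{?HIST_TYPE => ?HIST_CRASHED, ?HIST_TIMESTAMP => 1600000000,
+%     ?HIST_REASON => <<"db_not_found">>}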
+hist_append(Type, Now, #{} = JobData, Info) when is_integer(Now),
+ is_binary(Type) ->
+ #{?JOB_HISTORY := Hist} = JobData,
+ Evt1 = #{?HIST_TYPE => Type, ?HIST_TIMESTAMP => Now},
+ Evt2 = case Info of
+ undefined ->
+ Evt1;
+ null ->
+ Evt1#{?HIST_REASON => null};
+ <<_/binary>> ->
+ Evt1#{?HIST_REASON => Info};
+ #{<<"error">> := Err, <<"reason">> := Reason} when is_binary(Err),
+ is_binary(Reason) ->
+ Evt1#{?HIST_REASON => Reason}
+ end,
+ Hist1 = [Evt2 | Hist],
+ Hist2 = lists:sublist(Hist1, max_history()),
+ JobData#{?JOB_HISTORY := Hist2}.
+
+
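+% If the last error was `max_backoff`, the job was previously rate
+% limited, so shrink its footprint (fewer workers and connections, smaller
+% batches, more frequent checkpoints) to make throttling less likely.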
+optimize_rate_limited_job(#{} = Options, <<"max_backoff">>) ->
+ OptimizedSettings = #{
+ <<"checkpoint_interval">> => 5000,
+ <<"worker_processes">> => 2,
+ <<"worker_batch_size">> => 100,
+ <<"http_connections">> => 2
+ },
+ maps:merge(Options, OptimizedSettings);
+
+optimize_rate_limited_job(#{} = Options, _Other) ->
+ Options.
+
+
+close_endpoints(State) ->
+ State1 = cancel_timers(State),
+ couch_replicator_api_wrap:db_close(State1#rep_state.source),
+ couch_replicator_api_wrap:db_close(State1#rep_state.target),
+ ok.
+
+
+get_value(K, Props) ->
+ couch_util:get_value(K, Props).
+
+
+get_value(K, Props, Default) ->
+ couch_util:get_value(K, Props, Default).
+
+
+accept_jitter_msec() ->
+ couch_rand:uniform(erlang:max(1, max_startup_jitter_msec())).
+
+
+max_startup_jitter_msec() ->
+ config:get_integer("replicator", "startup_jitter",
+ ?STARTUP_JITTER_DEFAULT).
+
+
+min_backoff_penalty_sec() ->
+ config:get_integer("replicator", "min_backoff_penalty_sec",
+ ?DEFAULT_MIN_BACKOFF_PENALTY_SEC).
+
+
+max_backoff_penalty_sec() ->
+ config:get_integer("replicator", "max_backoff_penalty_sec",
+ ?DEFAULT_MAX_BACKOFF_PENALTY_SEC).
+
+
+max_history() ->
+ config:get_integer("replicator", "max_history", ?DEFAULT_MAX_HISTORY).
+
+
+stats_update_interval_sec() ->
+ config:get_integer("replicator", "stats_update_interval_sec",
+ ?DEFAULT_STATS_UPDATE_INTERVAL_SEC).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+replication_start_error_test() ->
+ ?assertEqual({unauthorized, <<"unauthorized to access or create database"
+ " http://x/y">>}, replication_start_error({unauthorized,
+ <<"http://x/y">>})),
+ ?assertEqual({db_not_found, <<"could not open http://x/y">>},
+ replication_start_error({db_not_found, <<"http://x/y">>})),
+ ?assertEqual({nxdomain, <<"could not resolve http://x/y">>},
+ replication_start_error({http_request_failed, "GET", "http://x/y",
+ {error, {error, {conn_failed, {error, nxdomain}}}}})),
+ ?assertEqual({http_error_code, <<"503 GET http://x/y">>},
+ replication_start_error({http_request_failed, "GET", "http://x/y",
+ {error, {code, 503}}})).
+
+
+scheduler_job_format_status_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(t_format_status)
+ ]
+ }.
+
+
+setup() ->
+ meck:expect(config, get, fun(_, _, Default) -> Default end).
+
+
+teardown(_) ->
+ meck:unload().
+
+
+t_format_status(_) ->
+ {ok, Rep} = couch_replicator_parse:parse_rep(#{
+ <<"source">> => <<"http://u:p@h1/d1">>,
+ <<"target">> => <<"http://u:p@h2/d2">>,
+ <<"create_target">> => true
+ }, null),
+ State = #rep_state{
+ id = <<"base+ext">>,
+ job_data = #{?REP => Rep},
+ doc_id = <<"mydoc">>,
+ db_name = <<"mydb">>,
+ source = maps:get(?SOURCE, Rep),
+ target = maps:get(?TARGET, Rep),
+ options = maps:get(?OPTIONS, Rep),
+ session_id = <<"a">>,
+ start_seq = <<"1">>,
+ source_seq = <<"2">>,
+ committed_seq = <<"3">>,
+ current_through_seq = <<"4">>,
+ highest_seq_done = <<"5">>
+ },
+ Format = format_status(opts_ignored, [pdict, State]),
+ FmtOptions = proplists:get_value(options, Format),
+ ?assertEqual("http://u:*****@h1/d1/", proplists:get_value(source, Format)),
+ ?assertEqual("http://u:*****@h2/d2/", proplists:get_value(target, Format)),
+ ?assertEqual(<<"base+ext">>, proplists:get_value(rep_id, Format)),
+ ?assertEqual(true, maps:get(<<"create_target">>, FmtOptions)),
+ ?assertEqual(<<"mydoc">>, proplists:get_value(doc_id, Format)),
+ ?assertEqual(<<"mydb">>, proplists:get_value(db_name, Format)),
+ ?assertEqual(<<"a">>, proplists:get_value(session_id, Format)),
+ ?assertEqual(<<"1">>, proplists:get_value(start_seq, Format)),
+ ?assertEqual(<<"2">>, proplists:get_value(source_seq, Format)),
+ ?assertEqual(<<"3">>, proplists:get_value(committed_seq, Format)),
+ ?assertEqual(<<"4">>, proplists:get_value(current_through_seq, Format)),
+ ?assertEqual(<<"5">>, proplists:get_value(highest_seq_done, Format)).
+
+
+-endif.