path: root/src/couch_replicator/src/couch_replicator_job.erl
diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_job.erl')
1 files changed, 151 insertions, 16 deletions
diff --git a/src/couch_replicator/src/couch_replicator_job.erl b/src/couch_replicator/src/couch_replicator_job.erl
index c8c143a58..951471a14 100644
--- a/src/couch_replicator/src/couch_replicator_job.erl
+++ b/src/couch_replicator/src/couch_replicator_job.erl
@@ -39,6 +39,7 @@
-define(LOWEST_SEQ, 0).
@@ -116,6 +117,12 @@ terminate(shutdown, #rep_state{} = State0) ->
{ok, State2} ->
Error ->
+ what => checkpoint_failure,
+ in => replicator,
+ jobid =>,
+ details => Error
+ }),
Msg = "~p : Failed last checkpoint. Job: ~p Error: ~p",
couch_log:error(Msg, [?MODULE,, Error]),
@@ -127,9 +134,20 @@ terminate(shutdown, #rep_state{} = State0) ->
terminate({shutdown, Error}, {init_error, Stack}) ->
% Termination in init, before the job had initialized
case Error of
- max_backoff -> couch_log:warning("~p job backed off", [?MODULE]);
- finished -> couch_log:notice("~p job finished in init", [?MODULE]);
- _ -> couch_log:error("~p job failed ~p ~p", [?MODULE, Error, Stack])
+ max_backoff ->
+ ?LOG_WARNING(#{what => job_backoff, in => replicator}),
+ couch_log:warning("~p job backed off", [?MODULE]);
+ finished ->
+ ?LOG_NOTICE(#{what => job_finished_during_init, in => replicator}),
+ couch_log:notice("~p job finished in init", [?MODULE]);
+ _ ->
+ what => job_failure,
+ in => replicator,
+ details => Error,
+ stacktrace => Stack
+ }),
+ couch_log:error("~p job failed ~p ~p", [?MODULE, Error, Stack])
@@ -139,6 +157,11 @@ terminate({shutdown, finished}, #rep_state{} = State) ->
terminate({shutdown, halt}, #rep_state{} = State) ->
% Job is re-enqueued and possibly already running somewhere else
+ what => job_halted,
+ in => replicator,
+ jobid =>
+ }),
couch_log:error("~p job ~p halted", [?MODULE,]),
ok = close_endpoints(State);
@@ -155,6 +178,14 @@ terminate(Reason0, #rep_state{} = State0) ->
source_name = Source,
target_name = Target
} = State,
+ what => job_failure,
+ in => replicator,
+ replication_id => RepId,
+ source => Source,
+ target => Target,
+ details => Reason
+ }),
couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~p",
[RepId, Source, Target, Reason]),
ok = reschedule_on_error(undefined, Job, JobData, Reason),
@@ -189,6 +220,21 @@ handle_call({report_seq_done, Seq, StatsInc}, From, #rep_state{} = State) ->
_ ->
+ what => progress_report,
+ in => replicator,
+ old => #{
+ highest_seq_done => HighestDone,
+ current_through_seq => ThroughSeq,
+ seqs_in_progress => SeqsInProgress
+ },
+ new => #{
+ highest_seq_done => NewHighestDone,
+ current_through_seq => NewThroughSeq,
+ seqs_in_progress => NewSeqsInProgress
+ },
+ worker_reported_seq => Seq
+ }),
couch_log:debug("Worker reported seq ~p, through seq was ~p, "
"new through seq is ~p, highest seq done was ~p, "
"new highest seq done is ~p~n"
@@ -221,12 +267,10 @@ handle_info(timeout, delayed_init) ->
{ok, State} -> {noreply, State};
{stop, Reason, State} -> {stop, Reason, State}
- exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt ->
- Stack = erlang:get_stacktrace(),
+ exit:{shutdown, Exit}:Stack when Exit =:= finished orelse Exit =:= halt ->
{stop, {shutdown, Exit}, {init_error, Stack}};
- _Tag:Error ->
+ _Tag:Error:Stack ->
ShutdownReason = {error, replication_start_error(Error)},
- Stack = erlang:get_stacktrace(),
{stop, {shutdown, ShutdownReason}, {init_error, Stack}}
@@ -253,10 +297,12 @@ handle_info(shutdown, St) ->
{stop, shutdown, St};
%% An 'EXIT' message arrived from Pid with the bare reason `max_backoff`.
%% The event is logged at error level (structured ?LOG_ERROR report plus the
%% couch_log message) and a stop tuple with reason {shutdown, max_backoff}
%% is returned, leaving State unchanged.
handle_info({'EXIT', Pid, max_backoff}, State) ->
+ ?LOG_ERROR(#{what => max_backoff, in => replicator, pid => Pid}),
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
%% Same handling as a bare `max_backoff` exit, but for an exit reason that is
%% already wrapped as {shutdown, max_backoff}: log at error level and return
%% a stop tuple with reason {shutdown, max_backoff}, leaving State unchanged.
handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
+ ?LOG_ERROR(#{what => max_backoff, in => replicator, pid => Pid}),
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
@@ -275,6 +321,7 @@ handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
Other ->
{changes_reader_died, Other}
+ ?LOG_ERROR(#{what => changes_reader_crash, in => replicator, details => Reason}),
couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
{stop, {shutdown, Reason}, cancel_timers(State)};
@@ -283,6 +330,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager=Pid} = State) ->
%% The process recorded in the `changes_manager` field of #rep_state{} exited
%% with Reason (this clause follows the one matching a `normal` exit of the
%% same pid). Bumps the changes_manager_deaths counter, logs the reason at
%% error level, and returns a stop tuple with reason
%% {shutdown, {changes_manager_died, Reason}}; the state handed back is the
%% result of cancel_timers(State).
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
+ ?LOG_ERROR(#{what => changes_manager_crash, in => replicator, details => Reason}),
couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
{stop, {shutdown, {changes_manager_died, Reason}}, cancel_timers(State)};
@@ -291,6 +339,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
%% The process recorded in the `changes_queue` field of #rep_state{} exited
%% with Reason (this clause follows the one matching a `normal` exit of the
%% same pid). Bumps the changes_queue_deaths counter, logs the reason at
%% error level, and returns a stop tuple with reason
%% {shutdown, {changes_queue_died, Reason}}; the state handed back is the
%% result of cancel_timers(State).
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
+ ?LOG_ERROR(#{what => changes_queue_crash, in => replicator, details => Reason}),
couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
{stop, {shutdown, {changes_queue_died, Reason}}, cancel_timers(State)};
@@ -299,6 +348,12 @@ handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
Workers ->
%% Processes might be linked by replicator's auth plugins so
%% we tolerate them exiting `normal` here and don't crash
+ what => linked_process_exit,
+ in => replicator,
+ pid => Pid,
+ reason => normal
+ }),
LogMsg = "~p: unknown pid exited `normal` ~p",
couch_log:error(LogMsg, [?MODULE, Pid]),
{noreply, State#rep_state{workers = Workers}};
@@ -321,6 +376,12 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
{shutdown, _} = Err ->
Other ->
+ what => worker_crash,
+ in => replicator,
+ pid => Pid,
+ details => Reason
+ }),
ErrLog = "Worker ~p died with reason: ~p",
couch_log:error(ErrLog, [Pid, Reason]),
{worker_died, Pid, Other}
@@ -329,6 +390,11 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
handle_info({Ref, ready}, St) when is_reference(Ref) ->
+ what => spurious_future_ready_message,
+ in => replicator,
+ ref => Ref
+ }),
LogMsg = "~p : spurious erlfdb future ready message ~p",
couch_log:notice(LogMsg, [?MODULE, Ref]),
{noreply, St};
@@ -406,16 +472,19 @@ delayed_init() ->
try do_init(Job, JobData) of
State = #rep_state{} -> {ok, State}
- exit:{http_request_failed, _, _, max_backoff} ->
- Stack = erlang:get_stacktrace(),
+ exit:{http_request_failed, _, _, max_backoff}:Stack ->
reschedule_on_error(undefined, Job, JobData, max_backoff),
{stop, {shutdown, max_backoff}, {init_error, Stack}};
- exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt ->
- Stack = erlang:get_stacktrace(),
+ exit:{shutdown, Exit}:Stack when Exit =:= finished orelse Exit =:= halt ->
{stop, {shutdown, Exit}, {init_error, Stack}};
- _Tag:Error ->
+ _Tag:Error:Stack ->
Reason = {error, replication_start_error(Error)},
- Stack = erlang:get_stacktrace(),
+ what => job_failure_during_init,
+ job => Job,
+ details => Reason,
+ stacktrace => Stack
+ }),
ErrMsg = "~p : job ~p failed during startup ~p stack:~p",
couch_log:error(ErrMsg, [?MODULE, Job, Reason, Stack]),
reschedule_on_error(undefined, Job, JobData, Reason),
@@ -576,12 +645,25 @@ check_ownership(#{jtx := true} = JTx, Job, JobData) ->
fail_job(JTx, Job, JobData, Error),
{ok, #{}} ->
+ what => duplicate_job_detected,
+ in => replicator,
+ jobid => JobId,
+ other_jobid => OtherJobId,
+ replication_id => RepId
+ }),
LogMsg = "~p : Job ~p usurping job ~p for replication ~p",
couch_log:warning(LogMsg, [?MODULE, JobId, OtherJobId,
couch_replicator_jobs:update_rep_id(JTx, JobId, RepId),
{error, not_found} ->
+ what => orphaned_job_mapping,
+ in => replicator,
+ replication_id => RepId,
+ jobid => OtherJobId
+ }),
LogMsg = "~p : Orphan replication job reference ~p -> ~p",
couch_log:error(LogMsg, [?MODULE, RepId, OtherJobId]),
couch_replicator_jobs:update_rep_id(JTx, JobId, RepId),
@@ -866,6 +948,12 @@ state_strip_creds(#rep_state{source = Source, target = Target} = State) ->
adjust_maxconn(Src = #{<<"http_connections">> := 1}, RepId) ->
+ what => minimum_source_connections_override,
+ in => replicator,
+ replication_id => RepId,
+ details => "adjusting minimum source connections to 2"
+ }),
Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p",
couch_log:notice(Msg, [RepId]),
Src#{<<"http_connections">> := 2};
@@ -965,7 +1053,21 @@ init_state(#{} = Job, #{} = JobData) ->
[SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep,
- {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+ {StartSeq0, History, MatchedSessionIds} = compare_replication_logs(SourceLog, TargetLog),
+ if not MatchedSessionIds ->
+ what => session_history_mismatch,
+ in => replicator,
+ calculated_start_seq => StartSeq0,
+ source => couch_replicator_api_wrap:db_uri(Source),
+ target => couch_replicator_api_wrap:db_uri(Target),
+ replication_id => Id,
+ details => "scanned histories to find common ancestor"
+ });
+ true ->
+ ok
+ end,
#{?REP_STATS := Stats0} = JobData,
Stats1 = couch_replicator_stats:new(Stats0),
@@ -1048,6 +1150,13 @@ maybe_save_migrated_log(#{?OPTIONS := Options}, Db, #doc{} = Doc, OldId) ->
case maps:get(<<"use_checkpoints">>, Options) of
true ->
update_checkpoint(Db, Doc),
+ what => migrated_checkpoint,
+ in => replicator,
+ db => httpdb_strip_creds(Db),
+ old_id => OldId,
+ new_id =>
+ }),
Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p",
couch_log:notice(Msg, [httpdb_strip_creds(Db), OldId,]);
false ->
@@ -1116,6 +1225,13 @@ do_checkpoint(State) ->
{checkpoint_commit_failure, <<"Failure on target commit: ",
{SrcInstanceStartTime, TgtInstanceStartTime} ->
+ what => checkpoint,
+ in => replicator,
+ source => SourceName,
+ target => TargetName,
+ sequence => NewSeq
+ }),
couch_log:notice("recording a checkpoint for `~s` -> `~s` at "
"source update_seq ~p", [SourceName, TargetName, NewSeq]),
StartTime = couch_replicator_utils:rfc1123_local(RepStartTime),
@@ -1276,7 +1392,7 @@ compare_replication_logs(SrcDoc, TgtDoc) ->
OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps,
OldHistory = get_value(<<"history">>, RepRecProps, []),
- {OldSeqNum, OldHistory};
+ {OldSeqNum, OldHistory, true};
false ->
SourceHistory = get_value(<<"history">>, RepRecProps, []),
TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
@@ -1284,7 +1400,8 @@ compare_replication_logs(SrcDoc, TgtDoc) ->
"Scanning histories to find a common ancestor.", []),
couch_log:debug("Record on source:~p~nRecord on target:~p~n",
[RepRecProps, RepRecPropsTgt]),
- compare_rep_history(SourceHistory, TargetHistory)
+ {StartSeq, History} = compare_rep_history(SourceHistory, TargetHistory),
+ {StartSeq, History, false}
@@ -1431,6 +1548,17 @@ log_replication_start(#rep_state{} = RepState) ->
_ ->
"from _replicate endpoint"
+ what => starting_replication,
+ in => replicator,
+ source => Source,
+ target => Target,
+ replication_db => DbName,
+ replication_doc => DocId,
+ session_id => Sid,
+ worker_processes => Workers,
+ worker_batch_size => BatchSize
+ }),
Msg = "Starting replication ~s (~s -> ~s) ~s worker_procesess:~p"
" worker_batch_size:~p session_id:~s",
couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]).
@@ -1447,6 +1575,13 @@ check_user_filter(#rep_state{} = State) ->
{RepId, BaseId} ->
{NewId, NewBaseId} when is_binary(NewId), is_binary(NewBaseId) ->
+ what => replication_id_updated,
+ in => replicator,
+ old_id => RepId,
+ new_id => NewId,
+ details => "replication job shutting down"
+ }),
LogMsg = "~p : Replication id was updated ~p -> ~p",
couch_log:error(LogMsg, [?MODULE, RepId, NewId]),
reschedule(undefined, Job, JobData),