diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_job.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_job.erl | 167 |
1 files changed, 151 insertions, 16 deletions
diff --git a/src/couch_replicator/src/couch_replicator_job.erl b/src/couch_replicator/src/couch_replicator_job.erl index c8c143a58..951471a14 100644 --- a/src/couch_replicator/src/couch_replicator_job.erl +++ b/src/couch_replicator/src/couch_replicator_job.erl @@ -39,6 +39,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include("couch_replicator.hrl"). +-include_lib("kernel/include/logger.hrl"). -define(LOWEST_SEQ, 0). @@ -116,6 +117,12 @@ terminate(shutdown, #rep_state{} = State0) -> {ok, State2} -> State2; Error -> + ?LOG_ERROR(#{ + what => checkpoint_failure, + in => replicator, + jobid => State1#rep_state.id, + details => Error + }), Msg = "~p : Failed last checkpoint. Job: ~p Error: ~p", couch_log:error(Msg, [?MODULE, State1#rep_state.id, Error]), State1 @@ -127,9 +134,20 @@ terminate(shutdown, #rep_state{} = State0) -> terminate({shutdown, Error}, {init_error, Stack}) -> % Termination in init, before the job had initialized case Error of - max_backoff -> couch_log:warning("~p job backed off", [?MODULE]); - finished -> couch_log:notice("~p job finished in init", [?MODULE]); - _ -> couch_log:error("~p job failed ~p ~p", [?MODULE, Error, Stack]) + max_backoff -> + ?LOG_WARNING(#{what => job_backoff, in => replicator}), + couch_log:warning("~p job backed off", [?MODULE]); + finished -> + ?LOG_NOTICE(#{what => job_finished_during_init, in => replicator}), + couch_log:notice("~p job finished in init", [?MODULE]); + _ -> + ?LOG_ERROR(#{ + what => job_failure, + in => replicator, + details => Error, + stacktrace => Stack + }), + couch_log:error("~p job failed ~p ~p", [?MODULE, Error, Stack]) end, ok; @@ -139,6 +157,11 @@ terminate({shutdown, finished}, #rep_state{} = State) -> terminate({shutdown, halt}, #rep_state{} = State) -> % Job is re-enqueued and possibly already running somewhere else + ?LOG_ERROR(#{ + what => job_halted, + in => replicator, + jobid => State#rep_state.id + }), couch_log:error("~p job ~p halted", [?MODULE, State#rep_state.id]), ok = close_endpoints(State); @@ -155,6 +178,14 @@ terminate(Reason0, #rep_state{} = State0) -> source_name = Source, target_name = Target } = State, + ?LOG_ERROR(#{ + what => job_failure, + in => replicator, + replication_id => RepId, + source => Source, + target => Target, + details => Reason + }), couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~p", [RepId, Source, Target, Reason]), ok = reschedule_on_error(undefined, Job, JobData, Reason), @@ -189,6 +220,21 @@ handle_call({report_seq_done, Seq, StatsInc}, From, #rep_state{} = State) -> _ -> NewThroughSeq0 end, + ?LOG_DEBUG(#{ + what => progress_report, + in => replicator, + old => #{ + highest_seq_done => HighestDone, + current_through_seq => ThroughSeq, + seqs_in_progress => SeqsInProgress + }, + new => #{ + highest_seq_done => NewHighestDone, + current_through_seq => NewThroughSeq, + seqs_in_progress => NewSeqsInProgress + }, + worker_reported_seq => Seq + }), couch_log:debug("Worker reported seq ~p, through seq was ~p, " "new through seq is ~p, highest seq done was ~p, " "new highest seq done is ~p~n" @@ -221,12 +267,10 @@ handle_info(timeout, delayed_init) -> {ok, State} -> {noreply, State}; {stop, Reason, State} -> {stop, Reason, State} catch - exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt -> - Stack = erlang:get_stacktrace(), + exit:{shutdown, Exit}:Stack when Exit =:= finished orelse Exit =:= halt -> {stop, {shutdown, Exit}, {init_error, Stack}}; - _Tag:Error -> + _Tag:Error:Stack -> ShutdownReason = {error, replication_start_error(Error)}, - Stack = erlang:get_stacktrace(), {stop, {shutdown, ShutdownReason}, {init_error, Stack}} end; @@ -253,10 +297,12 @@ handle_info(shutdown, St) -> {stop, shutdown, St}; handle_info({'EXIT', Pid, max_backoff}, State) -> + ?LOG_ERROR(#{what => max_backoff, in => replicator, pid => Pid}), couch_log:error("Max backoff reached child process ~p", [Pid]), {stop, {shutdown, max_backoff}, State}; handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) -> + ?LOG_ERROR(#{what => max_backoff, in => replicator, pid => Pid}), couch_log:error("Max backoff reached child process ~p", [Pid]), {stop, {shutdown, max_backoff}, State}; @@ -275,6 +321,7 @@ handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) -> Other -> {changes_reader_died, Other} end, + ?LOG_ERROR(#{what => changes_reader_crash, in => replicator, details => Reason}), couch_log:error("ChangesReader process died with reason: ~p", [Reason]), {stop, {shutdown, Reason}, cancel_timers(State)}; @@ -283,6 +330,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager=Pid} = State) -> handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager=Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_manager_deaths]), + ?LOG_ERROR(#{what => changes_manager_crash, in => replicator, details => Reason}), couch_log:error("ChangesManager process died with reason: ~p", [Reason]), {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timers(State)}; @@ -291,6 +339,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_queue_deaths]), + ?LOG_ERROR(#{what => changes_queue_crash, in => replicator, details => Reason}), couch_log:error("ChangesQueue process died with reason: ~p", [Reason]), {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timers(State)}; @@ -299,6 +348,12 @@ handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) -> Workers -> %% Processes might be linked by replicator's auth plugins so %% we tolerate them exiting `normal` here and don't crash + ?LOG_WARNING(#{ + what => linked_process_exit, + in => replicator, + pid => Pid, + reason => normal + }), LogMsg = "~p: unknown pid exited `normal` ~p", couch_log:error(LogMsg, [?MODULE, Pid]), {noreply, State#rep_state{workers = Workers}}; @@ -321,6 +376,12 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> {shutdown, _} = Err -> Err; Other -> + ?LOG_ERROR(#{ + what => worker_crash, + in => replicator, + pid => Pid, + details => Reason + }), ErrLog = "Worker ~p died with reason: ~p", couch_log:error(ErrLog, [Pid, Reason]), {worker_died, Pid, Other} @@ -329,6 +390,11 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> end; handle_info({Ref, ready}, St) when is_reference(Ref) -> + ?LOG_NOTICE(#{ + what => spurious_future_ready_message, + in => replicator, + ref => Ref + }), LogMsg = "~p : spurious erlfdb future ready message ~p", couch_log:notice(LogMsg, [?MODULE, Ref]), {noreply, St}; @@ -406,16 +472,19 @@ delayed_init() -> try do_init(Job, JobData) of State = #rep_state{} -> {ok, State} catch - exit:{http_request_failed, _, _, max_backoff} -> - Stack = erlang:get_stacktrace(), + exit:{http_request_failed, _, _, max_backoff}:Stack -> reschedule_on_error(undefined, Job, JobData, max_backoff), {stop, {shutdown, max_backoff}, {init_error, Stack}}; - exit:{shutdown, Exit} when Exit =:= finished orelse Exit =:= halt -> - Stack = erlang:get_stacktrace(), + exit:{shutdown, Exit}:Stack when Exit =:= finished orelse Exit =:= halt -> {stop, {shutdown, Exit}, {init_error, Stack}}; - _Tag:Error -> + _Tag:Error:Stack -> Reason = {error, replication_start_error(Error)}, - Stack = erlang:get_stacktrace(), + ?LOG_ERROR(#{ + what => job_failure_during_init, + job => Job, + details => Reason, + stacktrace => Stack + }), ErrMsg = "~p : job ~p failed during startup ~p stack:~p", couch_log:error(ErrMsg, [?MODULE, Job, Reason, Stack]), reschedule_on_error(undefined, Job, JobData, Reason), @@ -576,12 +645,25 @@ check_ownership(#{jtx := true} = JTx, Job, JobData) -> fail_job(JTx, Job, JobData, Error), not_owner; {ok, #{}} -> + ?LOG_WARNING(#{ + what => duplicate_job_detected, + in => replicator, + jobid => JobId, + other_jobid => OtherJobId, + replication_id => RepId + }), LogMsg = "~p : Job ~p usurping job ~p for replication ~p", couch_log:warning(LogMsg, [?MODULE, JobId, OtherJobId, RepId]), couch_replicator_jobs:update_rep_id(JTx, JobId, RepId), owner; {error, not_found} -> + ?LOG_ERROR(#{ + what => orphaned_job_mapping, + in => replicator, + replication_id => RepId, + jobid => OtherJobId + }), LogMsg = "~p : Orphan replication job reference ~p -> ~p", couch_log:error(LogMsg, [?MODULE, RepId, OtherJobId]), couch_replicator_jobs:update_rep_id(JTx, JobId, RepId), @@ -866,6 +948,12 @@ state_strip_creds(#rep_state{source = Source, target = Target} = State) -> adjust_maxconn(Src = #{<<"http_connections">> := 1}, RepId) -> + ?LOG_NOTICE(#{ + what => minimum_source_connections_override, + in => replicator, + replication_id => RepId, + details => "adjusting minimum source connections to 2" + }), Msg = "Adjusting minimum number of HTTP source connections to 2 for ~p", couch_log:notice(Msg, [RepId]), Src#{<<"http_connections">> := 2}; @@ -965,7 +1053,21 @@ init_state(#{} = Job, #{} = JobData) -> [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep, BaseId), - {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog), + {StartSeq0, History, MatchedSessionIds} = compare_replication_logs(SourceLog, TargetLog), + + if not MatchedSessionIds -> + ?LOG_NOTICE(#{ + what => session_history_mismatch, + in => replicator, + calculated_start_seq => StartSeq0, + source => couch_replicator_api_wrap:db_uri(Source), + target => couch_replicator_api_wrap:db_uri(Target), + replication_id => Id, + details => "scanned histories to find common ancestor" + }); + true -> + ok + end, #{?REP_STATS := Stats0} = JobData, Stats1 = couch_replicator_stats:new(Stats0), @@ -1048,6 +1150,13 @@ maybe_save_migrated_log(#{?OPTIONS := Options}, Db, #doc{} = Doc, OldId) -> case maps:get(<<"use_checkpoints">>, Options) of true -> update_checkpoint(Db, Doc), + ?LOG_NOTICE(#{ + what => migrated_checkpoint, + in => replicator, + db => httpdb_strip_creds(Db), + old_id => OldId, + new_id => Doc#doc.id + }), Msg = "Migrated replication checkpoint. Db:~p ~p -> ~p", couch_log:notice(Msg, [httpdb_strip_creds(Db), OldId, Doc#doc.id]); false -> @@ -1116,6 +1225,13 @@ do_checkpoint(State) -> {checkpoint_commit_failure, <<"Failure on target commit: ", (couch_util:to_binary(Reason))/binary>>}; {SrcInstanceStartTime, TgtInstanceStartTime} -> + ?LOG_NOTICE(#{ + what => checkpoint, + in => replicator, + source => SourceName, + target => TargetName, + sequence => NewSeq + }), couch_log:notice("recording a checkpoint for `~s` -> `~s` at " "source update_seq ~p", [SourceName, TargetName, NewSeq]), StartTime = couch_replicator_utils:rfc1123_local(RepStartTime), @@ -1276,7 +1392,7 @@ compare_replication_logs(SrcDoc, TgtDoc) -> OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ), OldHistory = get_value(<<"history">>, RepRecProps, []), - {OldSeqNum, OldHistory}; + {OldSeqNum, OldHistory, true}; false -> SourceHistory = get_value(<<"history">>, RepRecProps, []), TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []), @@ -1284,7 +1400,8 @@ compare_replication_logs(SrcDoc, TgtDoc) -> "Scanning histories to find a common ancestor.", []), couch_log:debug("Record on source:~p~nRecord on target:~p~n", [RepRecProps, RepRecPropsTgt]), - compare_rep_history(SourceHistory, TargetHistory) + {StartSeq, History} = compare_rep_history(SourceHistory, TargetHistory), + {StartSeq, History, false} end. @@ -1431,6 +1548,17 @@ log_replication_start(#rep_state{} = RepState) -> _ -> "from _replicate endpoint" end, + ?LOG_NOTICE(#{ + what => starting_replication, + in => replicator, + source => Source, + target => Target, + replication_db => DbName, + replication_doc => DocId, + session_id => Sid, + worker_processes => Workers, + worker_batch_size => BatchSize + }), Msg = "Starting replication ~s (~s -> ~s) ~s worker_procesess:~p" " worker_batch_size:~p session_id:~s", couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]). @@ -1447,6 +1575,13 @@ check_user_filter(#rep_state{} = State) -> {RepId, BaseId} -> ok; {NewId, NewBaseId} when is_binary(NewId), is_binary(NewBaseId) -> + ?LOG_ERROR(#{ + what => replication_id_updated, + in => replicator, + old_id => RepId, + new_id => NewId, + details => "replication job shutting down" + }), LogMsg = "~p : Replication id was updated ~p -> ~p", couch_log:error(LogMsg, [?MODULE, RepId, NewId]), reschedule(undefined, Job, JobData), |