diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_scheduler_job.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_scheduler_job.erl | 63 |
1 files changed, 37 insertions, 26 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index d69febb81..0b33419e1 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -73,8 +73,6 @@ workers, stats = couch_replicator_stats:new(), session_id, - source_monitor = nil, - target_monitor = nil, source_seq = nil, use_checkpoints = true, checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL, @@ -242,14 +240,6 @@ handle_cast({report_seq, Seq}, handle_info(shutdown, St) -> {stop, shutdown, St}; -handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) -> - couch_log:error("Source database is down. Reason: ~p", [Why]), - {stop, source_db_down, St}; - -handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) -> - couch_log:error("Target database is down. Reason: ~p", [Why]), - {stop, target_db_down, St}; - handle_info({'EXIT', Pid, max_backoff}, State) -> couch_log:error("Max backoff reached child process ~p", [Pid]), {stop, {shutdown, max_backoff}, State}; @@ -261,10 +251,20 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) -> handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) -> {noreply, State}; -handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) -> +handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_reader_deaths]), + Reason = case Reason0 of + {changes_req_failed, _, _} = HttpFail -> + HttpFail; + {http_request_failed, _, _, {error, {code, Code}}} -> + {changes_req_failed, Code}; + {http_request_failed, _, _, {error, Err}} -> + {changes_req_failed, Err}; + Other -> + {changes_reader_died, Other} + end, couch_log:error("ChangesReader process died with reason: ~p", [Reason]), - {stop, changes_reader_died, cancel_timer(State)}; + {stop, {shutdown, Reason}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> {noreply, State}; @@ -272,7 +272,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_manager_deaths]), couch_log:error("ChangesManager process died with reason: ~p", [Reason]), - {stop, changes_manager_died, cancel_timer(State)}; + {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> {noreply, State}; @@ -280,7 +280,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_queue_deaths]), couch_log:error("ChangesQueue process died with reason: ~p", [Reason]), - {stop, changes_queue_died, cancel_timer(State)}; + {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) -> case Workers -- [Pid] of @@ -304,8 +304,14 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> {stop, {unknown_process_died, Pid, Reason}, State2}; true -> couch_stats:increment_counter([couch_replicator, worker_deaths]), - couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]), - {stop, {worker_died, Pid, Reason}, State2} + StopReason = case Reason of + {shutdown, _} = Err -> + Err; + Other -> + couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]), + {worker_died, Pid, Other} + end, + {stop, StopReason, State2} end; handle_info(timeout, InitArgs) -> @@ -380,6 +386,11 @@ terminate({shutdown, max_backoff}, State) -> terminate_cleanup(State), couch_replicator_notifier:notify({error, RepId, max_backoff}); +terminate({shutdown, Reason}, State) -> + % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple + % wrapped around the message + terminate(Reason, State); + terminate(Reason, State) -> #rep_state{ source_name = Source, @@ -554,7 +565,7 @@ init_state(Rep) -> options = Options, type = Type, view = View, start_time = StartTime, - stats = Stats + stats = ArgStats0 } = Rep, % Adjust minimum number of http source connections to 2 to avoid deadlock Src = adjust_maxconn(Src0, BaseId), @@ -569,6 +580,14 @@ init_state(Rep) -> [SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep), {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog), + + ArgStats1 = couch_replicator_stats:new(ArgStats0), + HistoryStats = case History of + [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps); + _ -> couch_replicator_stats:new() + end, + Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats), + StartSeq1 = get_value(since_seq, Options, StartSeq0), StartSeq = {0, StartSeq1}, @@ -592,15 +611,13 @@ init_state(Rep) -> src_starttime = get_value(<<"instance_start_time">>, SourceInfo), tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo), session_id = couch_uuids:random(), - source_monitor = db_monitor(Source), - target_monitor = db_monitor(Target), source_seq = SourceSeq, use_checkpoints = get_value(use_checkpoints, Options, true), checkpoint_interval = get_value(checkpoint_interval, Options, ?DEFAULT_CHECKPOINT_INTERVAL), type = Type, view = View, - stats = couch_replicator_stats:new(Stats) + stats = Stats }, State#rep_state{timer = start_timer(State)}. @@ -905,12 +922,6 @@ has_session_id(SessionId, [{Props} | Rest]) -> end. -db_monitor(#httpdb{}) -> - nil; -db_monitor(Db) -> - couch_db:monitor(Db). - - get_pending_count(St) -> Rep = St#rep_state.rep_details, Timeout = get_value(connection_timeout, Rep#rep.options), |