summaryrefslogtreecommitdiff
path: root/src/couch_replicator/src/couch_replicator_scheduler_job.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_scheduler_job.erl')
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl63
1 files changed, 37 insertions, 26 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index d69febb81..0b33419e1 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -73,8 +73,6 @@
workers,
stats = couch_replicator_stats:new(),
session_id,
- source_monitor = nil,
- target_monitor = nil,
source_seq = nil,
use_checkpoints = true,
checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
@@ -242,14 +240,6 @@ handle_cast({report_seq, Seq},
handle_info(shutdown, St) ->
{stop, shutdown, St};
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
- couch_log:error("Source database is down. Reason: ~p", [Why]),
- {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
- couch_log:error("Target database is down. Reason: ~p", [Why]),
- {stop, target_db_down, St};
-
handle_info({'EXIT', Pid, max_backoff}, State) ->
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
@@ -261,10 +251,20 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
{noreply, State};
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+ Reason = case Reason0 of
+ {changes_req_failed, _, _} = HttpFail ->
+ HttpFail;
+ {http_request_failed, _, _, {error, {code, Code}}} ->
+ {changes_req_failed, Code};
+ {http_request_failed, _, _, {error, Err}} ->
+ {changes_req_failed, Err};
+ Other ->
+ {changes_reader_died, Other}
+ end,
couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
- {stop, changes_reader_died, cancel_timer(State)};
+ {stop, {shutdown, Reason}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
{noreply, State};
@@ -272,7 +272,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
- {stop, changes_manager_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
{noreply, State};
@@ -280,7 +280,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
- {stop, changes_queue_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
case Workers -- [Pid] of
@@ -304,8 +304,14 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
{stop, {unknown_process_died, Pid, Reason}, State2};
true ->
couch_stats:increment_counter([couch_replicator, worker_deaths]),
- couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
- {stop, {worker_died, Pid, Reason}, State2}
+ StopReason = case Reason of
+ {shutdown, _} = Err ->
+ Err;
+ Other ->
+ couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
+ {worker_died, Pid, Other}
+ end,
+ {stop, StopReason, State2}
end;
handle_info(timeout, InitArgs) ->
@@ -380,6 +386,11 @@ terminate({shutdown, max_backoff}, State) ->
terminate_cleanup(State),
couch_replicator_notifier:notify({error, RepId, max_backoff});
+terminate({shutdown, Reason}, State) ->
+ % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
+ % wrapped around the message
+ terminate(Reason, State);
+
terminate(Reason, State) ->
#rep_state{
source_name = Source,
@@ -554,7 +565,7 @@ init_state(Rep) ->
options = Options,
type = Type, view = View,
start_time = StartTime,
- stats = Stats
+ stats = ArgStats0
} = Rep,
% Adjust minimum number of http source connections to 2 to avoid deadlock
Src = adjust_maxconn(Src0, BaseId),
@@ -569,6 +580,14 @@ init_state(Rep) ->
[SourceLog, TargetLog] = find_and_migrate_logs([Source, Target], Rep),
{StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+
+ ArgStats1 = couch_replicator_stats:new(ArgStats0),
+ HistoryStats = case History of
+ [{[_ | _] = HProps} | _] -> couch_replicator_stats:new(HProps);
+ _ -> couch_replicator_stats:new()
+ end,
+ Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),
+
StartSeq1 = get_value(since_seq, Options, StartSeq0),
StartSeq = {0, StartSeq1},
@@ -592,15 +611,13 @@ init_state(Rep) ->
src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
session_id = couch_uuids:random(),
- source_monitor = db_monitor(Source),
- target_monitor = db_monitor(Target),
source_seq = SourceSeq,
use_checkpoints = get_value(use_checkpoints, Options, true),
checkpoint_interval = get_value(checkpoint_interval, Options,
?DEFAULT_CHECKPOINT_INTERVAL),
type = Type,
view = View,
- stats = couch_replicator_stats:new(Stats)
+ stats = Stats
},
State#rep_state{timer = start_timer(State)}.
@@ -905,12 +922,6 @@ has_session_id(SessionId, [{Props} | Rest]) ->
end.
-db_monitor(#httpdb{}) ->
- nil;
-db_monitor(Db) ->
- couch_db:monitor(Db).
-
-
get_pending_count(St) ->
Rep = St#rep_state.rep_details,
Timeout = get_value(connection_timeout, Rep#rep.options),