summaryrefslogtreecommitdiff
path: root/src/couch_replicator/src/couch_replicator_scheduler_job.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_scheduler_job.erl')
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl28
1 files changed, 27 insertions, 1 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index f669d464d..ca4d6c367 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -140,11 +140,13 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
% a batch of _changes rows to process -> check which revs are missing in the
% target, and for the missing ones, it copies them from the source to the target.
MaxConns = get_value(http_connections, Options),
+ StopOnDocWriteFailure = get_value(stop_on_doc_write_failure, Options),
Workers = lists:map(
fun(_) ->
couch_stats:increment_counter([couch_replicator, workers_started]),
{ok, Pid} = couch_replicator_worker:start_link(
- self(), Source, Target, ChangesManager, MaxConns),
+ self(), Source, Target, ChangesManager, MaxConns,
+ StopOnDocWriteFailure),
Pid
end,
lists:seq(1, NumWorkers)),
@@ -277,6 +279,11 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
+handle_info({'EXIT', Pid, {shutdown, {doc_write_failure, _, _}} = Error},
+ #rep_state{} = State) ->
+ couch_log:error("Worker ~p exited with doc write error ~p", [Pid, Error]),
+ {stop, Error, State};
+
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
{noreply, State};
@@ -365,6 +372,25 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
terminate_cleanup(State1);
+terminate({shutdown, {doc_write_failure, DocId, DocError}}, State) ->
+ #rep_state{
+ rep_details = #rep{id = {BaseId, Ext} = RepId},
+ source_name = Source,
+ target_name = Target
+ } = State,
+ % Checkpoint to avoid reprocessing the same changes during retries
+ case do_checkpoint(State) of
+ {ok, _} ->
+ ok;
+ Error ->
+ LogMsg = "~p : Failed doc_write_failure checkpoint. Error: ~p",
+ couch_log:error(LogMsg, [?MODULE, Error])
+ end,
+ RepIdStr = BaseId ++ Ext,
+ Msg = "Replication `~s` (~s -> ~s) failed with doc_write_failure ~p : ~p",
+ couch_log:warning(Msg, [RepIdStr, Source, Target, DocId, DocError]),
+ couch_replicator_notifier:notify({error, RepId, doc_write_failure});
+
terminate({shutdown, max_backoff}, {error, InitArgs}) ->
#rep{id = {BaseId, Ext} = RepId} = InitArgs,
couch_stats:increment_counter([couch_replicator, failed_starts]),