diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_scheduler_job.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_scheduler_job.erl | 28 |
1 files changed, 27 insertions, 1 deletions
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index f669d464d..ca4d6c367 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -140,11 +140,13 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> % a batch of _changes rows to process -> check which revs are missing in the % target, and for the missing ones, it copies them from the source to the target. MaxConns = get_value(http_connections, Options), + StopOnDocWriteFailure = get_value(stop_on_doc_write_failure, Options), Workers = lists:map( fun(_) -> couch_stats:increment_counter([couch_replicator, workers_started]), {ok, Pid} = couch_replicator_worker:start_link( - self(), Source, Target, ChangesManager, MaxConns), + self(), Source, Target, ChangesManager, MaxConns, + StopOnDocWriteFailure), Pid end, lists:seq(1, NumWorkers)), @@ -277,6 +279,11 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) -> couch_log:error("Max backoff reached child process ~p", [Pid]), {stop, {shutdown, max_backoff}, State}; +handle_info({'EXIT', Pid, {shutdown, {doc_write_failure, _, _}} = Error}, + #rep_state{} = State) -> + couch_log:error("Worker ~p exited with doc write error ~p", [Pid, Error]), + {stop, Error, State}; + handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) -> {noreply, State}; @@ -365,6 +372,25 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) -> couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}), terminate_cleanup(State1); +terminate({shutdown, {doc_write_failure, DocId, DocError}}, State) -> + #rep_state{ + rep_details = #rep{id = {BaseId, Ext} = RepId}, + source_name = Source, + target_name = Target + } = State, + % Checkpoint to avoid reprocessing the same changes during retries + case do_checkpoint(State) of + {ok, _} -> + ok; + Error -> + LogMsg = "~p : Failed doc_write_failure checkpoint. Error: ~p", + couch_log:error(LogMsg, [?MODULE, Error]) + end, + RepIdStr = BaseId ++ Ext, + Msg = "Replication `~s` (~s -> ~s) failed with doc_write_failure ~p : ~p", + couch_log:warning(Msg, [RepIdStr, Source, Target, DocId, DocError]), + couch_replicator_notifier:notify({error, RepId, doc_write_failure}); + terminate({shutdown, max_backoff}, {error, InitArgs}) -> #rep{id = {BaseId, Ext} = RepId} = InitArgs, couch_stats:increment_counter([couch_replicator, failed_starts]), |