diff options
authorNick Vatamaniuc <>2018-11-13 11:41:54 -0500
committerNick Vatamaniuc <>2018-11-13 18:16:33 -0500
commitdd0fa9916cef66401ecc186796f3d0e70d0cef2b (patch)
parentf12f4c5847a1cfa409c70dbc75b64a4f9e3645a3 (diff)
Stop replications on target document write failuresfail-on-doc-write-mode
Previously when target document writes failed, because a VDU prevented the write, or it exceed size limits on the target cluster, replication continued and a `doc_write_failures` statistic counter was incremented. The counter was visible in _active_tasks output for continuous replications and was visible in the completion record written back to the replication documents. That behavior might not be suitable in many cases. For instance, when migrating data, if the counter is ignored by the user, combined with a successful replication completion could lead to a perceived data loss. Until recently 00b28c265d97df675b725cd68897dc371cbd7168 this was even worse because on replication scheduler would reset statistics counters on every job start. So if a job restarted at least once, user might never find out that all their data didn't copy from the source to the target. Introduce a replicator config `stop_on_doc_write_failure = true | false` where the replication crashes if a single document write fails. This will many write failures visible to the and they would know exactly which document failed and why. Replications which crash because of doc write failures would retry periodically just like any other crashes related to missing source or target connectivity, for example. They do not fail permanently because users could adjust document size limit on a target cluster or change the VDU function and then replication will start working again and complete. Here we rely on exponential backoffs which were introduced with the scheduling replicator work. At the maximum backoff interval replications would retry about once every 8 hours. After the failure is handled and bubles up to the main replication job process, there is an attempt to checkpoint before exiting with the error. That is done in order to reduce change feed reprocessing during each retry attempt.
6 files changed, 224 insertions, 70 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index edaebf9e2..356bbf36f 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -395,6 +395,14 @@ ssl_certificate_max_depth = 3
; To restore the old behaviour, use the following value:
;auth_plugins = couch_replicator_auth_noop
+; Stop replication jobs with an error on first document write failure to the
+; target cluster. By default, when false, only a doc_write_failures counter is
+; incremented but the replication continues on. When true the replication will
+; crash and the document id and the reason for doc failure will be reported
+; in the replication state. Replication will then be retried periodically with
+; exponential backoffs like any other replication.
+;stop_on_doc_write_failure = false
; The delay, in seconds, between each check for which database and view indexes
; need to be compacted.
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 44c290d33..f94a6f207 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -304,6 +304,12 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
{'DOWN', Ref, process, Pid, {http_request_failed, _, _, max_backoff}} ->
+ {'DOWN', Ref, process, Pid, {http_request_failed, _, _,
+ {shutdown, {doc_write_failure, _, _}} = DocWriteFailure}} ->
+ exit(DocWriteFailure);
+ {'DOWN', Ref, process, Pid, {shutdown, {doc_write_failure, _, _}} =
+ DocWriteFailure} ->
+ exit(DocWriteFailure);
{'DOWN', Ref, process, Pid, request_uri_too_long} ->
NewMaxLen = get_value(max_url_len, Options, ?MAX_URL_LEN) div 2,
case NewMaxLen < ?MIN_URL_LEN of
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index 013475683..e6c71bfa5 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -462,6 +462,8 @@ make_options(Props) ->
{ok, DefSocketOptions} = couch_util:parse_term(
config:get("replicator", "socket_options",
"[{keepalive, true}, {nodelay, false}]")),
+ StopOnDocWriteFailure = config:get("replicator",
+ "stop_on_doc_write_failure", "false"),
lists:ukeymerge(1, Options, lists:keysort(1, [
{connection_timeout, list_to_integer(DefTimeout)},
{retries, list_to_integer(DefRetries)},
@@ -470,7 +472,9 @@ make_options(Props) ->
{worker_batch_size, list_to_integer(DefBatchSize)},
{worker_processes, list_to_integer(DefWorkers)},
{use_checkpoints, list_to_existing_atom(UseCheckpoints)},
- {checkpoint_interval, list_to_integer(DefCheckpointInterval)}
+ {checkpoint_interval, list_to_integer(DefCheckpointInterval)},
+ {stop_on_doc_write_failure,
+ list_to_existing_atom(StopOnDocWriteFailure)}
@@ -533,6 +537,12 @@ convert_options([{<<"use_checkpoints">>, V} | R]) ->
[{use_checkpoints, V} | convert_options(R)];
convert_options([{<<"checkpoint_interval">>, V} | R]) ->
[{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
+convert_options([{<<"stop_on_doc_write_failure">>, V} | _R])
+ when not is_boolean(V)->
+ ErrMsg = <<"parameter `stop_on_doc_write_failure` must be a boolean">>,
+ throw({bad_request, ErrMsg});
+convert_options([{<<"stop_on_doc_write_failure">>, V} | R]) ->
+ [{stop_on_doc_write_failure, V} | convert_options(R)];
convert_options([_ | R]) -> % skip unknown option
@@ -737,7 +747,9 @@ check_convert_options_pass_test() ->
?assertEqual([{doc_ids, [<<"id">>]}],
convert_options([{<<"doc_ids">>, [<<"id">>]}])),
?assertEqual([{selector, {key, value}}],
- convert_options([{<<"selector">>, {key, value}}])).
+ convert_options([{<<"selector">>, {key, value}}])),
+ ?assertEqual([{stop_on_doc_write_failure, true}],
+ convert_options([{<<"stop_on_doc_write_failure">>, true}])).
check_convert_options_fail_test() ->
@@ -750,7 +762,10 @@ check_convert_options_fail_test() ->
?assertThrow({bad_request, _},
convert_options([{<<"doc_ids">>, not_a_list}])),
?assertThrow({bad_request, _},
- convert_options([{<<"selector">>, [{key, value}]}])).
+ convert_options([{<<"selector">>, [{key, value}]}])),
+ ?assertThrow({bad_request, _},
+ convert_options([{<<"stop_on_doc_write_failure">>, "42"}])).
check_strip_credentials_test() ->
[?assertEqual(Expected, strip_credentials(Body)) || {Expected, Body} <- [
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index f669d464d..ca4d6c367 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -140,11 +140,13 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
% a batch of _changes rows to process -> check which revs are missing in the
% target, and for the missing ones, it copies them from the source to the target.
MaxConns = get_value(http_connections, Options),
+ StopOnDocWriteFailure = get_value(stop_on_doc_write_failure, Options),
Workers = lists:map(
fun(_) ->
couch_stats:increment_counter([couch_replicator, workers_started]),
{ok, Pid} = couch_replicator_worker:start_link(
- self(), Source, Target, ChangesManager, MaxConns),
+ self(), Source, Target, ChangesManager, MaxConns,
+ StopOnDocWriteFailure),
lists:seq(1, NumWorkers)),
@@ -277,6 +279,11 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
+handle_info({'EXIT', Pid, {shutdown, {doc_write_failure, _, _}} = Error},
+ #rep_state{} = State) ->
+ couch_log:error("Worker ~p exited with doc write error ~p", [Pid, Error]),
+ {stop, Error, State};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
{noreply, State};
@@ -365,6 +372,25 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}),
+terminate({shutdown, {doc_write_failure, DocId, DocError}}, State) ->
+ #rep_state{
+ rep_details = #rep{id = {BaseId, Ext} = RepId},
+ source_name = Source,
+ target_name = Target
+ } = State,
+ % Checkpoint to avoid reprocessing the same changes during retries
+ case do_checkpoint(State) of
+ {ok, _} ->
+ ok;
+ Error ->
+ LogMsg = "~p : Failed doc_write_failure checkpoint. Error: ~p",
+ couch_log:error(LogMsg, [?MODULE, Error])
+ end,
+ RepIdStr = BaseId ++ Ext,
+ Msg = "Replication `~s` (~s -> ~s) failed with doc_write_failure ~p : ~p",
+ couch_log:warning(Msg, [RepIdStr, Source, Target, DocId, DocError]),
+ couch_replicator_notifier:notify({error, RepId, doc_write_failure});
terminate({shutdown, max_backoff}, {error, InitArgs}) ->
#rep{id = {BaseId, Ext} = RepId} = InitArgs,
couch_stats:increment_counter([couch_replicator, failed_starts]),
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index ec98fa0f3..9b8baabe3 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -15,7 +15,7 @@
% public API
% gen_server callbacks
-export([init/1, terminate/2, code_change/3]).
@@ -64,28 +64,39 @@
stats = couch_replicator_stats:new(),
source_db_compaction_notifier = nil,
target_db_compaction_notifier = nil,
- batch = #batch{}
+ batch = #batch{},
+ stop_on_doc_write_failure
-start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) ->
- gen_server:start_link(
- ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []);
+start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns,
+ StopOnDocWriteFailure) ->
+ gen_server:start_link(?MODULE, {
+ Cp,
+ Source,
+ Target,
+ ChangesManager,
+ MaxConns,
+ StopOnDocWriteFailure
+ }, []);
-start_link(Cp, Source, Target, ChangesManager, _MaxConns) ->
+start_link(Cp, Source, Target, ChangesManager, _MaxConns,
+ StopOnDocWriteFailure) ->
Pid = spawn_link(fun() ->
erlang:put(last_stats_report, os:timestamp()),
- queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
+ queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager,
+ StopOnDocWriteFailure)
{ok, Pid}.
-init({Cp, Source, Target, ChangesManager, MaxConns}) ->
+init({Cp, Source, Target, ChangesManager, MaxConns, StopOnDocWriteFailure}) ->
process_flag(trap_exit, true),
Parent = self(),
LoopPid = spawn_link(fun() ->
- queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
+ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager,
+ StopOnDocWriteFailure)
erlang:put(last_stats_report, os:timestamp()),
State = #state{
@@ -97,17 +108,19 @@ init({Cp, Source, Target, ChangesManager, MaxConns}) ->
source_db_compaction_notifier =
start_db_compaction_notifier(Source, self()),
target_db_compaction_notifier =
- start_db_compaction_notifier(Target, self())
+ start_db_compaction_notifier(Target, self()),
+ stop_on_doc_write_failure = StopOnDocWriteFailure
{ok, State}.
handle_call({fetch_doc, {_Id, _Revs, _PAs} = Params}, {Pid, _} = From,
#state{loop = Pid, readers = Readers, pending_fetch = nil,
- source = Src, target = Tgt, max_parallel_conns = MaxConns} = State) ->
+ source = Src, target = Tgt, max_parallel_conns = MaxConns,
+ stop_on_doc_write_failure = StopOnDocWriteFailure} = State) ->
case length(Readers) of
Size when Size < MaxConns ->
- Reader = spawn_doc_reader(Src, Tgt, Params),
+ Reader = spawn_doc_reader(Src, Tgt, Params, StopOnDocWriteFailure),
NewState = State#state{
readers = [Reader | Readers]
@@ -131,10 +144,12 @@ handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) ->
handle_call(flush, {Pid, _} = From,
#state{loop = Pid, writer = nil, flush_waiter = nil,
- target = Target, batch = Batch} = State) ->
+ target = Target, batch = Batch,
+ stop_on_doc_write_failure = StopOnDocWriteFailure
+ } = State) ->
State2 = case State#state.readers of
[] ->
- State#state{writer = spawn_writer(Target, Batch)};
+ State#state{writer = spawn_writer(Target, Batch, StopOnDocWriteFailure)};
_ ->
@@ -177,7 +192,8 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
readers = Readers, writer = Writer, batch = Batch,
source = Source, target = Target,
- pending_fetch = Fetch, flush_waiter = FlushWaiter
+ pending_fetch = Fetch, flush_waiter = FlushWaiter,
+ stop_on_doc_write_failure = StopOnDocWriteFailure
} = State,
case Readers -- [Pid] of
Readers ->
@@ -190,13 +206,14 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
true ->
readers = Readers2,
- writer = spawn_writer(Target, Batch)
+ writer = spawn_writer(Target, Batch, StopOnDocWriteFailure)
false ->
State#state{readers = Readers2}
{From, FetchParams} ->
- Reader = spawn_doc_reader(Source, Target, FetchParams),
+ Reader = spawn_doc_reader(Source, Target, FetchParams,
+ StopOnDocWriteFailure),
gen_server:reply(From, ok),
readers = [Reader | Readers2],
@@ -209,6 +226,10 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
handle_info({'EXIT', _Pid, max_backoff}, State) ->
{stop, {shutdown, max_backoff}, State};
+handle_info({'EXIT', _Pid, {shutdown, {doc_write_failure, _, _}} = Error},
+ State) ->
+ {stop, Error, State};
handle_info({'EXIT', Pid, Reason}, State) ->
{stop, {process_died, Pid, Reason}, State}.
@@ -243,7 +264,8 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
+queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager,
+ StopOnDocWriteFailure) ->
ChangesManager ! {get_changes, self()},
{closed, ChangesManager} ->
@@ -251,7 +273,8 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
{changes, ChangesManager, [], ReportSeq} ->
Stats = couch_replicator_stats:new(),
ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
- queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager);
+ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager,
+ StopOnDocWriteFailure);
{changes, ChangesManager, Changes, ReportSeq} ->
Target2 = open_db(Target),
{IdRevs, Stats0} = find_missing(Changes, Target2),
@@ -263,41 +286,50 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
_Db ->
Source2 = open_db(Source),
Stats = local_process_batch(
- IdRevs, Cp, Source2, Target2, #batch{}, Stats0),
+ IdRevs, Cp, Source2, Target2, #batch{}, Stats0,
+ StopOnDocWriteFailure
+ ),
ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
erlang:put(last_stats_report, os:timestamp()),
couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]),
- queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
+ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager,
+ StopOnDocWriteFailure)
-local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) ->
+local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats, _) ->
-local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
+local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats,
+ StopOnDocWriteFailure) ->
case Target of
#httpdb{} ->
couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]);
_Db ->
couch_log:debug("Worker flushing doc batch of ~p docs", [Size])
- Stats2 = flush_docs(Target, Docs),
+ Stats2 = flush_docs(Target, Docs, StopOnDocWriteFailure),
Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2),
- local_process_batch([], Cp, Source, Target, #batch{}, Stats3);
-local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) ->
- {ok, {_, DocList, Stats2, _}} = fetch_doc(
- Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}),
+ local_process_batch([], Cp, Source, Target, #batch{}, Stats3,
+ StopOnDocWriteFailure);
+local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats,
+ StopOnDocWriteFailure) ->
+ {ok, {_, DocList, Stats2, _, _}} = fetch_doc(
+ Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp,
+ StopOnDocWriteFailure}),
{Batch2, Stats3} = lists:foldl(
fun(Doc, {Batch0, Stats0}) ->
- {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc),
+ {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc,
+ StopOnDocWriteFailure),
{Batch1, couch_replicator_utils:sum_stats(Stats0, S)}
{Batch, Stats2}, DocList),
- local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3).
+ local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3,
+ StopOnDocWriteFailure).
remote_process_batch([], _Parent) ->
@@ -316,12 +348,13 @@ remote_process_batch([{Id, Revs, PAs} | Rest], Parent) ->
remote_process_batch(Rest, Parent).
-spawn_doc_reader(Source, Target, FetchParams) ->
+spawn_doc_reader(Source, Target, FetchParams, StopOnDocWriteFailure) ->
Parent = self(),
spawn_link(fun() ->
Source2 = open_db(Source),
- Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}),
+ Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target,
+ StopOnDocWriteFailure}),
@@ -350,33 +383,36 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
-local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
+local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp,
+ StopOnDocWriteFailure}) ->
Stats2 = couch_replicator_stats:increment(docs_read, Stats),
case batch_doc(Doc) of
true ->
- {ok, {Target, [Doc | DocList], Stats2, Cp}};
+ {ok, {Target, [Doc | DocList], Stats2, Cp, StopOnDocWriteFailure}};
false ->
couch_log:debug("Worker flushing doc with attachments", []),
Target2 = open_db(Target),
- Success = (flush_doc(Target2, Doc) =:= ok),
+ FlushResult = flush_doc(Target2, Doc),
- Stats3 = case Success of
- true ->
+ Stats3 = case FlushResult of
+ ok ->
couch_replicator_stats:increment(docs_written, Stats2);
- false ->
+ Error->
+ stop_on_doc_write_failure(, Error,
+ StopOnDocWriteFailure),
couch_replicator_stats:increment(doc_write_failures, Stats2)
Stats4 = maybe_report_stats(Cp, Stats3),
- {ok, {Target, DocList, Stats4, Cp}}
+ {ok, {Target, DocList, Stats4, Cp, StopOnDocWriteFailure}}
local_doc_handler(_, Acc) ->
{ok, Acc}.
-remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
+remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _, _} = Acc) ->
ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
{ok, Acc};
-remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
+remote_doc_handler({ok, Doc}, {Parent, Target, StopOnDocWriteFailure} = Acc) ->
% Immediately flush documents with attachments received from a remote
% source. The data property of each attachment is a function that starts
% streaming the attachment data from the remote source, therefore it's
@@ -384,12 +420,14 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
Stats = couch_replicator_stats:new([{docs_read, 1}]),
couch_log:debug("Worker flushing doc with attachments", []),
Target2 = open_db(Target),
- Success = (flush_doc(Target2, Doc) =:= ok),
+ FlushResult = flush_doc(Target2, Doc),
- {Result, Stats2} = case Success of
- true ->
+ {Result, Stats2} = case FlushResult of
+ ok ->
{{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)};
- false ->
+ Error ->
+ stop_on_doc_write_failure(, Error,
+ StopOnDocWriteFailure),
{{skip, Acc}, couch_replicator_stats:increment(doc_write_failures, Stats)}
ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
@@ -398,7 +436,8 @@ remote_doc_handler({{not_found, missing}, _}, _Acc) ->
-spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
+spawn_writer(Target, #batch{docs = DocList, size = Size},
+ StopOnDocWriteFailure) ->
case {Target, Size > 0} of
{#httpdb{}, true} ->
couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]);
@@ -411,7 +450,7 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
fun() ->
Target2 = open_db(Target),
- Stats = flush_docs(Target2, DocList),
+ Stats = flush_docs(Target2, DocList, StopOnDocWriteFailure),
ok = gen_server:call(Parent, {add_stats, Stats}, infinity)
@@ -431,16 +470,17 @@ after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
maybe_flush_docs(Doc,State) ->
target = Target, batch = Batch,
- stats = Stats, cp = Cp
+ stats = Stats, cp = Cp,
+ stop_on_doc_write_failure = StopOnDocWriteFailure
} = State,
- {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc),
+ {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc, StopOnDocWriteFailure),
Stats2 = couch_replicator_stats:sum_stats(Stats, WStats),
Stats3 = couch_replicator_stats:increment(docs_read, Stats2),
Stats4 = maybe_report_stats(Cp, Stats3),
State#state{stats = Stats4, batch = Batch2}.
-maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
+maybe_flush_docs(#httpdb{} = Target, Batch, Doc, StopOnDocWriteFailure) ->
#batch{docs = DocAcc, size = SizeAcc} = Batch,
case batch_doc(Doc) of
false ->
@@ -448,7 +488,8 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
case flush_doc(Target, Doc) of
ok ->
{Batch, couch_replicator_stats:new([{docs_written, 1}])};
- _ ->
+ Error ->
+ stop_on_doc_write_failure(, Error, StopOnDocWriteFailure),
{Batch, couch_replicator_stats:new([{doc_write_failures, 1}])}
true ->
@@ -456,7 +497,7 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
case SizeAcc + iolist_size(JsonDoc) of
SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE ->
couch_log:debug("Worker flushing doc batch of size ~p bytes", [SizeAcc2]),
- Stats = flush_docs(Target, [JsonDoc | DocAcc]),
+ Stats = flush_docs(Target, [JsonDoc | DocAcc], StopOnDocWriteFailure),
{#batch{}, Stats};
SizeAcc2 ->
Stats = couch_replicator_stats:new(),
@@ -464,11 +505,12 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
-maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
+maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc,
+ StopOnDocWriteFailure) ->
case SizeAcc + 1 of
SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]),
- Stats = flush_docs(Target, [Doc | DocAcc]),
+ Stats = flush_docs(Target, [Doc | DocAcc], StopOnDocWriteFailure),
{#batch{}, Stats};
SizeAcc2 ->
Stats = couch_replicator_stats:new(),
@@ -487,34 +529,46 @@ batch_doc(#doc{atts = Atts}) ->
end, Atts).
-flush_docs(_Target, []) ->
+flush_docs(_Target, [], _) ->
-flush_docs(Target, DocList) ->
+flush_docs(Target, DocList, StopOnDocWriteFailure) ->
FlushResult = couch_replicator_api_wrap:update_docs(Target, DocList,
[delay_commit], replicated_changes),
- handle_flush_docs_result(FlushResult, Target, DocList).
+ handle_flush_docs_result(FlushResult, Target, DocList,
+ StopOnDocWriteFailure).
-handle_flush_docs_result({error, request_body_too_large}, _Target, [Doc]) ->
+handle_flush_docs_result({error, request_body_too_large} = Error, _Target,
+ [Doc], StopOnDocWriteFailure) ->
couch_log:error("Replicator: failed to write doc ~p. Too large", [Doc]),
+ {DocProps} = ?JSON_DECODE(Doc),
+ DocId = get_value(<<"_id">>, DocProps, undefined),
+ stop_on_doc_write_failure(DocId, Error, StopOnDocWriteFailure),
couch_replicator_stats:new([{doc_write_failures, 1}]);
-handle_flush_docs_result({error, request_body_too_large}, Target, DocList) ->
+handle_flush_docs_result({error, request_body_too_large}, Target, DocList,
+ StopOnDocWriteFailure) ->
Len = length(DocList),
{DocList1, DocList2} = lists:split(Len div 2, DocList),
couch_log:notice("Replicator: couldn't write batch of size ~p to ~p because"
" request body is too large. Splitting batch into 2 separate batches of"
" sizes ~p and ~p", [Len, couch_replicator_api_wrap:db_uri(Target),
length(DocList1), length(DocList2)]),
- flush_docs(Target, DocList1),
- flush_docs(Target, DocList2);
-handle_flush_docs_result({ok, Errors}, Target, DocList) ->
+ flush_docs(Target, DocList1, StopOnDocWriteFailure),
+ flush_docs(Target, DocList2, StopOnDocWriteFailure);
+handle_flush_docs_result({ok, Errors}, Target, DocList, StopOnDocWriteFailure) ->
DbUri = couch_replicator_api_wrap:db_uri(Target),
fun({Props}) ->
+ Id = get_value(id, Props, ""),
+ Rev = get_value(rev, Props, ""),
+ Error = get_value(error, Props, ""),
+ Reason = get_value(reason, Props, ""),
couch_log:error("Replicator: couldn't write document `~s`, revision"
" `~s`, to target database `~s`. Error: `~s`, reason: `~s`.", [
- get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
- get_value(error, Props, ""), get_value(reason, Props, "")])
+ Id, Rev, DbUri, Error, Reason
+ ]),
+ stop_on_doc_write_failure(Id, {Error, Reason},
+ StopOnDocWriteFailure)
end, Errors),
{docs_written, length(DocList) - length(Errors)},
@@ -581,6 +635,15 @@ maybe_report_stats(Cp, Stats) ->
+stop_on_doc_write_failure(DocId, Error, true) ->
+ DocIdStr = binary_to_list(DocId),
+ exit({shutdown, {doc_write_failure, DocIdStr, Error}});
+stop_on_doc_write_failure(_, _, false) ->
+ ok.
diff --git a/src/couch_replicator/test/couch_replicator_attachments_too_large.erl b/src/couch_replicator/test/couch_replicator_attachments_too_large.erl
index 7fe84d2d9..9a21ce381 100644
--- a/src/couch_replicator/test/couch_replicator_attachments_too_large.erl
+++ b/src/couch_replicator/test/couch_replicator_attachments_too_large.erl
@@ -29,6 +29,7 @@ teardown(_, {Ctx, {Source, Target}}) ->
config:delete("couchdb", "max_attachment_size"),
+ config:delete("replicator", "stop_on_doc_write_failure"),
ok = test_util:stop_couch(Ctx).
@@ -40,7 +41,9 @@ attachment_too_large_replication_test_() ->
fun setup/1, fun teardown/2,
[{Pair, fun should_succeed/2} || Pair <- Pairs] ++
- [{Pair, fun should_fail/2} || Pair <- Pairs]
+ [{Pair, fun should_fail/2} || Pair <- Pairs] ++
+ [{Pair, fun should_crash_small/2} || Pair <- Pairs] ++
+ [{Pair, fun should_crash_large/2} || Pair <- Pairs]
@@ -66,6 +69,39 @@ should_fail({From, To}, {_Ctx, {Source, Target}}) ->
couch_replicator_test_helper:compare_dbs(Source, Target)).
+should_crash_small({From, To}, {_Ctx, {Source, Target}}) ->
+ ?_test(begin
+ RepObject = {[
+ {<<"source">>, db_url(From, Source)},
+ {<<"target">>, db_url(To, Target)}
+ ]},
+ config:set("couchdb", "max_attachment_size", "999", _Persist = false),
+ config:set_boolean("replicator", "stop_on_doc_write_failure", true,
+ false),
+ Result = couch_replicator:replicate(
+ RepObject, ?ADMIN_USER),
+ ?assertEqual({error, doc_write_failure}, Result)
+ end).
+should_crash_large({From, To}, {_Ctx, {Source, Target}}) ->
+ ?_test(begin
+ % see couch_replicator_worker.erl ?MAX_BULK_ATT_SIZE macro
+ create_doc_with_attachment(Source, <<"largedoc">>, 65537),
+ RepObject = {[
+ {<<"source">>, db_url(From, Source)},
+ {<<"target">>, db_url(To, Target)}
+ ]},
+ config:set("couchdb", "max_attachment_size", "65536", _Persist = false),
+ config:set_boolean("replicator", "stop_on_doc_write_failure", true,
+ false),
+ Result = couch_replicator:replicate(
+ RepObject, ?ADMIN_USER),
+ ?assertEqual({error, doc_write_failure}, Result)
+ end).
create_db() ->
DbName = ?tempdb(),
{ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),