diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2018-11-13 11:41:54 -0500 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2018-11-13 18:16:33 -0500 |
commit | dd0fa9916cef66401ecc186796f3d0e70d0cef2b (patch) | |
tree | 8a7ed3a628de8594b53273c4e53ff68c717b9a0f | |
parent | f12f4c5847a1cfa409c70dbc75b64a4f9e3645a3 (diff) | |
download | couchdb-dd0fa9916cef66401ecc186796f3d0e70d0cef2b.tar.gz |
Stop replications on target document write failures (branch: fail-on-doc-write-mode)
Previously when target document writes failed, because a VDU prevented the
write, or the document exceeded size limits on the target cluster, replication continued
and a `doc_write_failures` statistic counter was incremented. The counter was
visible in _active_tasks output for continuous replications and was visible in
the completion record written back to the replication documents.
That behavior might not be suitable in many cases. For instance, when migrating
data, if the counter is ignored by the user, combined with a successful
replication completion could lead to a perceived data loss. Until recently
00b28c265d97df675b725cd68897dc371cbd7168 this was even worse because the
replication scheduler would reset statistics counters on every job start. So if
a job restarted at least once, the user might never find out that not all of
their data was copied from the source to the target.
Introduce a replicator config `stop_on_doc_write_failure = true | false` where
the replication crashes if a single document write fails. This will make write
failures visible to the users, and they will know exactly which document failed and
why.
Replications which crash because of doc write failures would retry periodically
just like any other crashes related to missing source or target connectivity,
for example. They do not fail permanently because users could adjust document
size limit on a target cluster or change the VDU function and then replication
will start working again and complete. Here we rely on exponential backoffs
which were introduced with the scheduling replicator work. At the maximum
backoff interval replications would retry about once every 8 hours.
After the failure is handled and bubbles up to the main replication job process,
there is an attempt to checkpoint before exiting with the error. That is done
in order to reduce changes-feed reprocessing during each retry attempt.
6 files changed, 224 insertions, 70 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index edaebf9e2..356bbf36f 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -395,6 +395,14 @@ ssl_certificate_max_depth = 3 ; To restore the old behaviour, use the following value: ;auth_plugins = couch_replicator_auth_noop +; Stop replication jobs with an error on first document write failure to the +; target cluster. By default, when false, only a doc_write_failures counter is +; incremented but the replication continues on. When true the replication will +; crash and the document id and the reason for doc failure will be reported +; in the replication state. Replication will then be retried periodically with +; exponential backoffs like any other replication. +;stop_on_doc_write_failure = false + [compaction_daemon] ; The delay, in seconds, between each check for which database and view indexes ; need to be compacted. diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index 44c290d33..f94a6f207 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -304,6 +304,12 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) -> throw(Stub); {'DOWN', Ref, process, Pid, {http_request_failed, _, _, max_backoff}} -> exit(max_backoff); + {'DOWN', Ref, process, Pid, {http_request_failed, _, _, + {shutdown, {doc_write_failure, _, _}} = DocWriteFailure}} -> + exit(DocWriteFailure); + {'DOWN', Ref, process, Pid, {shutdown, {doc_write_failure, _, _}} = + DocWriteFailure} -> + exit(DocWriteFailure); {'DOWN', Ref, process, Pid, request_uri_too_long} -> NewMaxLen = get_value(max_url_len, Options, ?MAX_URL_LEN) div 2, case NewMaxLen < ?MIN_URL_LEN of diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index 013475683..e6c71bfa5 100644 --- 
a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -462,6 +462,8 @@ make_options(Props) -> {ok, DefSocketOptions} = couch_util:parse_term( config:get("replicator", "socket_options", "[{keepalive, true}, {nodelay, false}]")), + StopOnDocWriteFailure = config:get("replicator", + "stop_on_doc_write_failure", "false"), lists:ukeymerge(1, Options, lists:keysort(1, [ {connection_timeout, list_to_integer(DefTimeout)}, {retries, list_to_integer(DefRetries)}, @@ -470,7 +472,9 @@ make_options(Props) -> {worker_batch_size, list_to_integer(DefBatchSize)}, {worker_processes, list_to_integer(DefWorkers)}, {use_checkpoints, list_to_existing_atom(UseCheckpoints)}, - {checkpoint_interval, list_to_integer(DefCheckpointInterval)} + {checkpoint_interval, list_to_integer(DefCheckpointInterval)}, + {stop_on_doc_write_failure, + list_to_existing_atom(StopOnDocWriteFailure)} ])). @@ -533,6 +537,12 @@ convert_options([{<<"use_checkpoints">>, V} | R]) -> [{use_checkpoints, V} | convert_options(R)]; convert_options([{<<"checkpoint_interval">>, V} | R]) -> [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)]; +convert_options([{<<"stop_on_doc_write_failure">>, V} | _R]) + when not is_boolean(V)-> + ErrMsg = <<"parameter `stop_on_doc_write_failure` must be a boolean">>, + throw({bad_request, ErrMsg}); +convert_options([{<<"stop_on_doc_write_failure">>, V} | R]) -> + [{stop_on_doc_write_failure, V} | convert_options(R)]; convert_options([_ | R]) -> % skip unknown option convert_options(R). @@ -737,7 +747,9 @@ check_convert_options_pass_test() -> ?assertEqual([{doc_ids, [<<"id">>]}], convert_options([{<<"doc_ids">>, [<<"id">>]}])), ?assertEqual([{selector, {key, value}}], - convert_options([{<<"selector">>, {key, value}}])). + convert_options([{<<"selector">>, {key, value}}])), + ?assertEqual([{stop_on_doc_write_failure, true}], + convert_options([{<<"stop_on_doc_write_failure">>, true}])). 
check_convert_options_fail_test() -> @@ -750,7 +762,10 @@ check_convert_options_fail_test() -> ?assertThrow({bad_request, _}, convert_options([{<<"doc_ids">>, not_a_list}])), ?assertThrow({bad_request, _}, - convert_options([{<<"selector">>, [{key, value}]}])). + convert_options([{<<"selector">>, [{key, value}]}])), + ?assertThrow({bad_request, _}, + convert_options([{<<"stop_on_doc_write_failure">>, "42"}])). + check_strip_credentials_test() -> [?assertEqual(Expected, strip_credentials(Body)) || {Expected, Body} <- [ diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index f669d464d..ca4d6c367 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -140,11 +140,13 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> % a batch of _changes rows to process -> check which revs are missing in the % target, and for the missing ones, it copies them from the source to the target. 
MaxConns = get_value(http_connections, Options), + StopOnDocWriteFailure = get_value(stop_on_doc_write_failure, Options), Workers = lists:map( fun(_) -> couch_stats:increment_counter([couch_replicator, workers_started]), {ok, Pid} = couch_replicator_worker:start_link( - self(), Source, Target, ChangesManager, MaxConns), + self(), Source, Target, ChangesManager, MaxConns, + StopOnDocWriteFailure), Pid end, lists:seq(1, NumWorkers)), @@ -277,6 +279,11 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) -> couch_log:error("Max backoff reached child process ~p", [Pid]), {stop, {shutdown, max_backoff}, State}; +handle_info({'EXIT', Pid, {shutdown, {doc_write_failure, _, _}} = Error}, + #rep_state{} = State) -> + couch_log:error("Worker ~p exited with doc write error ~p", [Pid, Error]), + {stop, Error, State}; + handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) -> {noreply, State}; @@ -365,6 +372,25 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) -> couch_replicator_notifier:notify({stopped, RepId, <<"stopped">>}), terminate_cleanup(State1); +terminate({shutdown, {doc_write_failure, DocId, DocError}}, State) -> + #rep_state{ + rep_details = #rep{id = {BaseId, Ext} = RepId}, + source_name = Source, + target_name = Target + } = State, + % Checkpoint to avoid reprocessing the same changes during retries + case do_checkpoint(State) of + {ok, _} -> + ok; + Error -> + LogMsg = "~p : Failed doc_write_failure checkpoint. 
Error: ~p", + couch_log:error(LogMsg, [?MODULE, Error]) + end, + RepIdStr = BaseId ++ Ext, + Msg = "Replication `~s` (~s -> ~s) failed with doc_write_failure ~p : ~p", + couch_log:warning(Msg, [RepIdStr, Source, Target, DocId, DocError]), + couch_replicator_notifier:notify({error, RepId, doc_write_failure}); + terminate({shutdown, max_backoff}, {error, InitArgs}) -> #rep{id = {BaseId, Ext} = RepId} = InitArgs, couch_stats:increment_counter([couch_replicator, failed_starts]), diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index ec98fa0f3..9b8baabe3 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -15,7 +15,7 @@ -vsn(1). % public API --export([start_link/5]). +-export([start_link/6]). % gen_server callbacks -export([init/1, terminate/2, code_change/3]). @@ -64,28 +64,39 @@ stats = couch_replicator_stats:new(), source_db_compaction_notifier = nil, target_db_compaction_notifier = nil, - batch = #batch{} + batch = #batch{}, + stop_on_doc_write_failure }). -start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) -> - gen_server:start_link( - ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []); +start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns, + StopOnDocWriteFailure) -> + gen_server:start_link(?MODULE, { + Cp, + Source, + Target, + ChangesManager, + MaxConns, + StopOnDocWriteFailure + }, []); -start_link(Cp, Source, Target, ChangesManager, _MaxConns) -> +start_link(Cp, Source, Target, ChangesManager, _MaxConns, + StopOnDocWriteFailure) -> Pid = spawn_link(fun() -> erlang:put(last_stats_report, os:timestamp()), - queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager) + queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager, + StopOnDocWriteFailure) end), {ok, Pid}. 
-init({Cp, Source, Target, ChangesManager, MaxConns}) -> +init({Cp, Source, Target, ChangesManager, MaxConns, StopOnDocWriteFailure}) -> process_flag(trap_exit, true), Parent = self(), LoopPid = spawn_link(fun() -> - queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) + queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager, + StopOnDocWriteFailure) end), erlang:put(last_stats_report, os:timestamp()), State = #state{ @@ -97,17 +108,19 @@ init({Cp, Source, Target, ChangesManager, MaxConns}) -> source_db_compaction_notifier = start_db_compaction_notifier(Source, self()), target_db_compaction_notifier = - start_db_compaction_notifier(Target, self()) + start_db_compaction_notifier(Target, self()), + stop_on_doc_write_failure = StopOnDocWriteFailure }, {ok, State}. handle_call({fetch_doc, {_Id, _Revs, _PAs} = Params}, {Pid, _} = From, #state{loop = Pid, readers = Readers, pending_fetch = nil, - source = Src, target = Tgt, max_parallel_conns = MaxConns} = State) -> + source = Src, target = Tgt, max_parallel_conns = MaxConns, + stop_on_doc_write_failure = StopOnDocWriteFailure} = State) -> case length(Readers) of Size when Size < MaxConns -> - Reader = spawn_doc_reader(Src, Tgt, Params), + Reader = spawn_doc_reader(Src, Tgt, Params, StopOnDocWriteFailure), NewState = State#state{ readers = [Reader | Readers] }, @@ -131,10 +144,12 @@ handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) -> handle_call(flush, {Pid, _} = From, #state{loop = Pid, writer = nil, flush_waiter = nil, - target = Target, batch = Batch} = State) -> + target = Target, batch = Batch, + stop_on_doc_write_failure = StopOnDocWriteFailure + } = State) -> State2 = case State#state.readers of [] -> - State#state{writer = spawn_writer(Target, Batch)}; + State#state{writer = spawn_writer(Target, Batch, StopOnDocWriteFailure)}; _ -> State end, @@ -177,7 +192,8 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) -> #state{ readers = Readers, writer = Writer, batch 
= Batch, source = Source, target = Target, - pending_fetch = Fetch, flush_waiter = FlushWaiter + pending_fetch = Fetch, flush_waiter = FlushWaiter, + stop_on_doc_write_failure = StopOnDocWriteFailure } = State, case Readers -- [Pid] of Readers -> @@ -190,13 +206,14 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) -> true -> State#state{ readers = Readers2, - writer = spawn_writer(Target, Batch) + writer = spawn_writer(Target, Batch, StopOnDocWriteFailure) }; false -> State#state{readers = Readers2} end; {From, FetchParams} -> - Reader = spawn_doc_reader(Source, Target, FetchParams), + Reader = spawn_doc_reader(Source, Target, FetchParams, + StopOnDocWriteFailure), gen_server:reply(From, ok), State#state{ readers = [Reader | Readers2], @@ -209,6 +226,10 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) -> handle_info({'EXIT', _Pid, max_backoff}, State) -> {stop, {shutdown, max_backoff}, State}; +handle_info({'EXIT', _Pid, {shutdown, {doc_write_failure, _, _}} = Error}, + State) -> + {stop, Error, State}; + handle_info({'EXIT', Pid, Reason}, State) -> {stop, {process_died, Pid, Reason}, State}. @@ -243,7 +264,8 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> +queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager, + StopOnDocWriteFailure) -> ChangesManager ! 
{get_changes, self()}, receive {closed, ChangesManager} -> @@ -251,7 +273,8 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> {changes, ChangesManager, [], ReportSeq} -> Stats = couch_replicator_stats:new(), ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), - queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager); + queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager, + StopOnDocWriteFailure); {changes, ChangesManager, Changes, ReportSeq} -> Target2 = open_db(Target), {IdRevs, Stats0} = find_missing(Changes, Target2), @@ -263,41 +286,50 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> _Db -> Source2 = open_db(Source), Stats = local_process_batch( - IdRevs, Cp, Source2, Target2, #batch{}, Stats0), + IdRevs, Cp, Source2, Target2, #batch{}, Stats0, + StopOnDocWriteFailure + ), close_db(Source2) end, close_db(Target2), ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), erlang:put(last_stats_report, os:timestamp()), couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]), - queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) + queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager, + StopOnDocWriteFailure) end. 
-local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) -> +local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats, _) -> Stats; -local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) -> +local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats, + StopOnDocWriteFailure) -> case Target of #httpdb{} -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]); _Db -> couch_log:debug("Worker flushing doc batch of ~p docs", [Size]) end, - Stats2 = flush_docs(Target, Docs), + Stats2 = flush_docs(Target, Docs, StopOnDocWriteFailure), Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2), - local_process_batch([], Cp, Source, Target, #batch{}, Stats3); - -local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) -> - {ok, {_, DocList, Stats2, _}} = fetch_doc( - Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}), + local_process_batch([], Cp, Source, Target, #batch{}, Stats3, + StopOnDocWriteFailure); + +local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats, + StopOnDocWriteFailure) -> + {ok, {_, DocList, Stats2, _, _}} = fetch_doc( + Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp, + StopOnDocWriteFailure}), {Batch2, Stats3} = lists:foldl( fun(Doc, {Batch0, Stats0}) -> - {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc), + {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc, + StopOnDocWriteFailure), {Batch1, couch_replicator_utils:sum_stats(Stats0, S)} end, {Batch, Stats2}, DocList), - local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3). + local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3, + StopOnDocWriteFailure). remote_process_batch([], _Parent) -> @@ -316,12 +348,13 @@ remote_process_batch([{Id, Revs, PAs} | Rest], Parent) -> remote_process_batch(Rest, Parent). 
-spawn_doc_reader(Source, Target, FetchParams) -> +spawn_doc_reader(Source, Target, FetchParams, StopOnDocWriteFailure) -> Parent = self(), spawn_link(fun() -> Source2 = open_db(Source), fetch_doc( - Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}), + Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target, + StopOnDocWriteFailure}), close_db(Source2) end). @@ -350,33 +383,36 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) -> end. -local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) -> +local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp, + StopOnDocWriteFailure}) -> Stats2 = couch_replicator_stats:increment(docs_read, Stats), case batch_doc(Doc) of true -> - {ok, {Target, [Doc | DocList], Stats2, Cp}}; + {ok, {Target, [Doc | DocList], Stats2, Cp, StopOnDocWriteFailure}}; false -> couch_log:debug("Worker flushing doc with attachments", []), Target2 = open_db(Target), - Success = (flush_doc(Target2, Doc) =:= ok), + FlushResult = flush_doc(Target2, Doc), close_db(Target2), - Stats3 = case Success of - true -> + Stats3 = case FlushResult of + ok -> couch_replicator_stats:increment(docs_written, Stats2); - false -> + Error-> + stop_on_doc_write_failure(Doc#doc.id, Error, + StopOnDocWriteFailure), couch_replicator_stats:increment(doc_write_failures, Stats2) end, Stats4 = maybe_report_stats(Cp, Stats3), - {ok, {Target, DocList, Stats4, Cp}} + {ok, {Target, DocList, Stats4, Cp, StopOnDocWriteFailure}} end; local_doc_handler(_, Acc) -> {ok, Acc}. -remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) -> +remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _, _} = Acc) -> ok = gen_server:call(Parent, {batch_doc, Doc}, infinity), {ok, Acc}; -remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) -> +remote_doc_handler({ok, Doc}, {Parent, Target, StopOnDocWriteFailure} = Acc) -> % Immediately flush documents with attachments received from a remote % source. 
The data property of each attachment is a function that starts % streaming the attachment data from the remote source, therefore it's @@ -384,12 +420,14 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) -> Stats = couch_replicator_stats:new([{docs_read, 1}]), couch_log:debug("Worker flushing doc with attachments", []), Target2 = open_db(Target), - Success = (flush_doc(Target2, Doc) =:= ok), + FlushResult = flush_doc(Target2, Doc), close_db(Target2), - {Result, Stats2} = case Success of - true -> + {Result, Stats2} = case FlushResult of + ok -> {{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)}; - false -> + Error -> + stop_on_doc_write_failure(Doc#doc.id, Error, + StopOnDocWriteFailure), {{skip, Acc}, couch_replicator_stats:increment(doc_write_failures, Stats)} end, ok = gen_server:call(Parent, {add_stats, Stats2}, infinity), @@ -398,7 +436,8 @@ remote_doc_handler({{not_found, missing}, _}, _Acc) -> throw(missing_doc). -spawn_writer(Target, #batch{docs = DocList, size = Size}) -> +spawn_writer(Target, #batch{docs = DocList, size = Size}, + StopOnDocWriteFailure) -> case {Target, Size > 0} of {#httpdb{}, true} -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]); @@ -411,7 +450,7 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) -> spawn_link( fun() -> Target2 = open_db(Target), - Stats = flush_docs(Target2, DocList), + Stats = flush_docs(Target2, DocList, StopOnDocWriteFailure), close_db(Target2), ok = gen_server:call(Parent, {add_stats, Stats}, infinity) end). 
@@ -431,16 +470,17 @@ after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) -> maybe_flush_docs(Doc,State) -> #state{ target = Target, batch = Batch, - stats = Stats, cp = Cp + stats = Stats, cp = Cp, + stop_on_doc_write_failure = StopOnDocWriteFailure } = State, - {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc), + {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc, StopOnDocWriteFailure), Stats2 = couch_replicator_stats:sum_stats(Stats, WStats), Stats3 = couch_replicator_stats:increment(docs_read, Stats2), Stats4 = maybe_report_stats(Cp, Stats3), State#state{stats = Stats4, batch = Batch2}. -maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> +maybe_flush_docs(#httpdb{} = Target, Batch, Doc, StopOnDocWriteFailure) -> #batch{docs = DocAcc, size = SizeAcc} = Batch, case batch_doc(Doc) of false -> @@ -448,7 +488,8 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> case flush_doc(Target, Doc) of ok -> {Batch, couch_replicator_stats:new([{docs_written, 1}])}; - _ -> + Error -> + stop_on_doc_write_failure(Doc#doc.id, Error, StopOnDocWriteFailure), {Batch, couch_replicator_stats:new([{doc_write_failures, 1}])} end; true -> @@ -456,7 +497,7 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> case SizeAcc + iolist_size(JsonDoc) of SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [SizeAcc2]), - Stats = flush_docs(Target, [JsonDoc | DocAcc]), + Stats = flush_docs(Target, [JsonDoc | DocAcc], StopOnDocWriteFailure), {#batch{}, Stats}; SizeAcc2 -> Stats = couch_replicator_stats:new(), @@ -464,11 +505,12 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> end end; -maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) -> +maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc, + StopOnDocWriteFailure) -> case SizeAcc + 1 of SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN -> couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]), - Stats = 
flush_docs(Target, [Doc | DocAcc]), + Stats = flush_docs(Target, [Doc | DocAcc], StopOnDocWriteFailure), {#batch{}, Stats}; SizeAcc2 -> Stats = couch_replicator_stats:new(), @@ -487,34 +529,46 @@ batch_doc(#doc{atts = Atts}) -> end, Atts). -flush_docs(_Target, []) -> +flush_docs(_Target, [], _) -> couch_replicator_stats:new(); -flush_docs(Target, DocList) -> +flush_docs(Target, DocList, StopOnDocWriteFailure) -> FlushResult = couch_replicator_api_wrap:update_docs(Target, DocList, [delay_commit], replicated_changes), - handle_flush_docs_result(FlushResult, Target, DocList). + handle_flush_docs_result(FlushResult, Target, DocList, + StopOnDocWriteFailure). -handle_flush_docs_result({error, request_body_too_large}, _Target, [Doc]) -> +handle_flush_docs_result({error, request_body_too_large} = Error, _Target, + [Doc], StopOnDocWriteFailure) -> couch_log:error("Replicator: failed to write doc ~p. Too large", [Doc]), + {DocProps} = ?JSON_DECODE(Doc), + DocId = get_value(<<"_id">>, DocProps, undefined), + stop_on_doc_write_failure(DocId, Error, StopOnDocWriteFailure), couch_replicator_stats:new([{doc_write_failures, 1}]); -handle_flush_docs_result({error, request_body_too_large}, Target, DocList) -> +handle_flush_docs_result({error, request_body_too_large}, Target, DocList, + StopOnDocWriteFailure) -> Len = length(DocList), {DocList1, DocList2} = lists:split(Len div 2, DocList), couch_log:notice("Replicator: couldn't write batch of size ~p to ~p because" " request body is too large. 
Splitting batch into 2 separate batches of" " sizes ~p and ~p", [Len, couch_replicator_api_wrap:db_uri(Target), length(DocList1), length(DocList2)]), - flush_docs(Target, DocList1), - flush_docs(Target, DocList2); -handle_flush_docs_result({ok, Errors}, Target, DocList) -> + flush_docs(Target, DocList1, StopOnDocWriteFailure), + flush_docs(Target, DocList2, StopOnDocWriteFailure); +handle_flush_docs_result({ok, Errors}, Target, DocList, StopOnDocWriteFailure) -> DbUri = couch_replicator_api_wrap:db_uri(Target), lists:foreach( fun({Props}) -> + Id = get_value(id, Props, ""), + Rev = get_value(rev, Props, ""), + Error = get_value(error, Props, ""), + Reason = get_value(reason, Props, ""), couch_log:error("Replicator: couldn't write document `~s`, revision" " `~s`, to target database `~s`. Error: `~s`, reason: `~s`.", [ - get_value(id, Props, ""), get_value(rev, Props, ""), DbUri, - get_value(error, Props, ""), get_value(reason, Props, "")]) + Id, Rev, DbUri, Error, Reason + ]), + stop_on_doc_write_failure(Id, {Error, Reason}, + StopOnDocWriteFailure) end, Errors), couch_replicator_stats:new([ {docs_written, length(DocList) - length(Errors)}, @@ -581,6 +635,15 @@ maybe_report_stats(Cp, Stats) -> end. +stop_on_doc_write_failure(DocId, Error, true) -> + DocIdStr = binary_to_list(DocId), + exit({shutdown, {doc_write_failure, DocIdStr, Error}}); + +stop_on_doc_write_failure(_, _, false) -> + ok. + + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). 
diff --git a/src/couch_replicator/test/couch_replicator_attachments_too_large.erl b/src/couch_replicator/test/couch_replicator_attachments_too_large.erl index 7fe84d2d9..9a21ce381 100644 --- a/src/couch_replicator/test/couch_replicator_attachments_too_large.erl +++ b/src/couch_replicator/test/couch_replicator_attachments_too_large.erl @@ -29,6 +29,7 @@ teardown(_, {Ctx, {Source, Target}}) -> delete_db(Source), delete_db(Target), config:delete("couchdb", "max_attachment_size"), + config:delete("replicator", "stop_on_doc_write_failure"), ok = test_util:stop_couch(Ctx). @@ -40,7 +41,9 @@ attachment_too_large_replication_test_() -> foreachx, fun setup/1, fun teardown/2, [{Pair, fun should_succeed/2} || Pair <- Pairs] ++ - [{Pair, fun should_fail/2} || Pair <- Pairs] + [{Pair, fun should_fail/2} || Pair <- Pairs] ++ + [{Pair, fun should_crash_small/2} || Pair <- Pairs] ++ + [{Pair, fun should_crash_large/2} || Pair <- Pairs] } }. @@ -66,6 +69,39 @@ should_fail({From, To}, {_Ctx, {Source, Target}}) -> couch_replicator_test_helper:compare_dbs(Source, Target)). +should_crash_small({From, To}, {_Ctx, {Source, Target}}) -> + ?_test(begin + RepObject = {[ + {<<"source">>, db_url(From, Source)}, + {<<"target">>, db_url(To, Target)} + ]}, + config:set("couchdb", "max_attachment_size", "999", _Persist = false), + config:set_boolean("replicator", "stop_on_doc_write_failure", true, + false), + Result = couch_replicator:replicate( + RepObject, ?ADMIN_USER), + ?assertEqual({error, doc_write_failure}, Result) + end). 
+ + +should_crash_large({From, To}, {_Ctx, {Source, Target}}) -> + ?_test(begin + % see couch_replicator_worker.erl ?MAX_BULK_ATT_SIZE macro + create_doc_with_attachment(Source, <<"largedoc">>, 65537), + RepObject = {[ + {<<"source">>, db_url(From, Source)}, + {<<"target">>, db_url(To, Target)} + ]}, + config:set("couchdb", "max_attachment_size", "65536", _Persist = false), + config:set_boolean("replicator", "stop_on_doc_write_failure", true, + false), + Result = couch_replicator:replicate( + RepObject, ?ADMIN_USER), + ?assertEqual({error, doc_write_failure}, Result) + end). + + + create_db() -> DbName = ?tempdb(), {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), |