diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_worker.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_worker.erl | 193 |
1 files changed, 128 insertions, 65 deletions
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index ec98fa0f3..9b8baabe3 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -15,7 +15,7 @@ -vsn(1). % public API --export([start_link/5]). +-export([start_link/6]). % gen_server callbacks -export([init/1, terminate/2, code_change/3]). @@ -64,28 +64,39 @@ stats = couch_replicator_stats:new(), source_db_compaction_notifier = nil, target_db_compaction_notifier = nil, - batch = #batch{} + batch = #batch{}, + stop_on_doc_write_failure }). -start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) -> - gen_server:start_link( - ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []); +start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns, + StopOnDocWriteFailure) -> + gen_server:start_link(?MODULE, { + Cp, + Source, + Target, + ChangesManager, + MaxConns, + StopOnDocWriteFailure + }, []); -start_link(Cp, Source, Target, ChangesManager, _MaxConns) -> +start_link(Cp, Source, Target, ChangesManager, _MaxConns, + StopOnDocWriteFailure) -> Pid = spawn_link(fun() -> erlang:put(last_stats_report, os:timestamp()), - queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager) + queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager, + StopOnDocWriteFailure) end), {ok, Pid}. -init({Cp, Source, Target, ChangesManager, MaxConns}) -> +init({Cp, Source, Target, ChangesManager, MaxConns, StopOnDocWriteFailure}) -> process_flag(trap_exit, true), Parent = self(), LoopPid = spawn_link(fun() -> - queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) + queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager, + StopOnDocWriteFailure) end), erlang:put(last_stats_report, os:timestamp()), State = #state{ @@ -97,17 +108,19 @@ init({Cp, Source, Target, ChangesManager, MaxConns}) -> source_db_compaction_notifier = start_db_compaction_notifier(Source, self()), target_db_compaction_notifier = - start_db_compaction_notifier(Target, self()) + start_db_compaction_notifier(Target, self()), + stop_on_doc_write_failure = StopOnDocWriteFailure }, {ok, State}. handle_call({fetch_doc, {_Id, _Revs, _PAs} = Params}, {Pid, _} = From, #state{loop = Pid, readers = Readers, pending_fetch = nil, - source = Src, target = Tgt, max_parallel_conns = MaxConns} = State) -> + source = Src, target = Tgt, max_parallel_conns = MaxConns, + stop_on_doc_write_failure = StopOnDocWriteFailure} = State) -> case length(Readers) of Size when Size < MaxConns -> - Reader = spawn_doc_reader(Src, Tgt, Params), + Reader = spawn_doc_reader(Src, Tgt, Params, StopOnDocWriteFailure), NewState = State#state{ readers = [Reader | Readers] }, @@ -131,10 +144,12 @@ handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) -> handle_call(flush, {Pid, _} = From, #state{loop = Pid, writer = nil, flush_waiter = nil, - target = Target, batch = Batch} = State) -> + target = Target, batch = Batch, + stop_on_doc_write_failure = StopOnDocWriteFailure + } = State) -> State2 = case State#state.readers of [] -> - State#state{writer = spawn_writer(Target, Batch)}; + State#state{writer = spawn_writer(Target, Batch, StopOnDocWriteFailure)}; _ -> State end, @@ -177,7 +192,8 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) -> #state{ readers = Readers, writer = Writer, batch = Batch, source = Source, target = Target, - pending_fetch = Fetch, flush_waiter = FlushWaiter + pending_fetch = Fetch, flush_waiter = FlushWaiter, + stop_on_doc_write_failure = StopOnDocWriteFailure } = State, case Readers -- [Pid] of Readers -> @@ -190,13 +206,14 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) -> true -> State#state{ readers = Readers2, - writer = spawn_writer(Target, Batch) + writer = spawn_writer(Target, Batch, StopOnDocWriteFailure) }; false -> State#state{readers = Readers2} end; {From, FetchParams} -> - Reader = spawn_doc_reader(Source, Target, FetchParams), + Reader = spawn_doc_reader(Source, Target, FetchParams, + StopOnDocWriteFailure), gen_server:reply(From, ok), State#state{ readers = [Reader | Readers2], @@ -209,6 +226,10 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) -> handle_info({'EXIT', _Pid, max_backoff}, State) -> {stop, {shutdown, max_backoff}, State}; +handle_info({'EXIT', _Pid, {shutdown, {doc_write_failure, _, _}} = Error}, + State) -> + {stop, Error, State}; + handle_info({'EXIT', Pid, Reason}, State) -> {stop, {process_died, Pid, Reason}, State}. @@ -243,7 +264,8 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> +queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager, + StopOnDocWriteFailure) -> ChangesManager ! {get_changes, self()}, receive {closed, ChangesManager} -> @@ -251,7 +273,8 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> {changes, ChangesManager, [], ReportSeq} -> Stats = couch_replicator_stats:new(), ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), - queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager); + queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager, + StopOnDocWriteFailure); {changes, ChangesManager, Changes, ReportSeq} -> Target2 = open_db(Target), {IdRevs, Stats0} = find_missing(Changes, Target2), @@ -263,41 +286,50 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> _Db -> Source2 = open_db(Source), Stats = local_process_batch( - IdRevs, Cp, Source2, Target2, #batch{}, Stats0), + IdRevs, Cp, Source2, Target2, #batch{}, Stats0, + StopOnDocWriteFailure + ), close_db(Source2) end, close_db(Target2), ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), erlang:put(last_stats_report, os:timestamp()), couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]), - queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) + queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager, + StopOnDocWriteFailure) end. -local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) -> +local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats, _) -> Stats; -local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) -> +local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats, + StopOnDocWriteFailure) -> case Target of #httpdb{} -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]); _Db -> couch_log:debug("Worker flushing doc batch of ~p docs", [Size]) end, - Stats2 = flush_docs(Target, Docs), + Stats2 = flush_docs(Target, Docs, StopOnDocWriteFailure), Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2), - local_process_batch([], Cp, Source, Target, #batch{}, Stats3); - -local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) -> - {ok, {_, DocList, Stats2, _}} = fetch_doc( - Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}), + local_process_batch([], Cp, Source, Target, #batch{}, Stats3, + StopOnDocWriteFailure); + +local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats, + StopOnDocWriteFailure) -> + {ok, {_, DocList, Stats2, _, _}} = fetch_doc( + Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp, + StopOnDocWriteFailure}), {Batch2, Stats3} = lists:foldl( fun(Doc, {Batch0, Stats0}) -> - {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc), + {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc, + StopOnDocWriteFailure), {Batch1, couch_replicator_utils:sum_stats(Stats0, S)} end, {Batch, Stats2}, DocList), - local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3). + local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3, + StopOnDocWriteFailure). remote_process_batch([], _Parent) -> @@ -316,12 +348,13 @@ remote_process_batch([{Id, Revs, PAs} | Rest], Parent) -> remote_process_batch(Rest, Parent). -spawn_doc_reader(Source, Target, FetchParams) -> +spawn_doc_reader(Source, Target, FetchParams, StopOnDocWriteFailure) -> Parent = self(), spawn_link(fun() -> Source2 = open_db(Source), fetch_doc( - Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}), + Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target, + StopOnDocWriteFailure}), close_db(Source2) end). @@ -350,33 +383,36 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) -> end. -local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) -> +local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp, + StopOnDocWriteFailure}) -> Stats2 = couch_replicator_stats:increment(docs_read, Stats), case batch_doc(Doc) of true -> - {ok, {Target, [Doc | DocList], Stats2, Cp}}; + {ok, {Target, [Doc | DocList], Stats2, Cp, StopOnDocWriteFailure}}; false -> couch_log:debug("Worker flushing doc with attachments", []), Target2 = open_db(Target), - Success = (flush_doc(Target2, Doc) =:= ok), + FlushResult = flush_doc(Target2, Doc), close_db(Target2), - Stats3 = case Success of - true -> + Stats3 = case FlushResult of + ok -> couch_replicator_stats:increment(docs_written, Stats2); - false -> + Error-> + stop_on_doc_write_failure(Doc#doc.id, Error, + StopOnDocWriteFailure), couch_replicator_stats:increment(doc_write_failures, Stats2) end, Stats4 = maybe_report_stats(Cp, Stats3), - {ok, {Target, DocList, Stats4, Cp}} + {ok, {Target, DocList, Stats4, Cp, StopOnDocWriteFailure}} end; local_doc_handler(_, Acc) -> {ok, Acc}. -remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) -> +remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _, _} = Acc) -> ok = gen_server:call(Parent, {batch_doc, Doc}, infinity), {ok, Acc}; -remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) -> +remote_doc_handler({ok, Doc}, {Parent, Target, StopOnDocWriteFailure} = Acc) -> % Immediately flush documents with attachments received from a remote % source. The data property of each attachment is a function that starts % streaming the attachment data from the remote source, therefore it's @@ -384,12 +420,14 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) -> Stats = couch_replicator_stats:new([{docs_read, 1}]), couch_log:debug("Worker flushing doc with attachments", []), Target2 = open_db(Target), - Success = (flush_doc(Target2, Doc) =:= ok), + FlushResult = flush_doc(Target2, Doc), close_db(Target2), - {Result, Stats2} = case Success of - true -> + {Result, Stats2} = case FlushResult of + ok -> {{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)}; - false -> + Error -> + stop_on_doc_write_failure(Doc#doc.id, Error, + StopOnDocWriteFailure), {{skip, Acc}, couch_replicator_stats:increment(doc_write_failures, Stats)} end, ok = gen_server:call(Parent, {add_stats, Stats2}, infinity), @@ -398,7 +436,8 @@ remote_doc_handler({{not_found, missing}, _}, _Acc) -> throw(missing_doc). -spawn_writer(Target, #batch{docs = DocList, size = Size}) -> +spawn_writer(Target, #batch{docs = DocList, size = Size}, + StopOnDocWriteFailure) -> case {Target, Size > 0} of {#httpdb{}, true} -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]); @@ -411,7 +450,7 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) -> spawn_link( fun() -> Target2 = open_db(Target), - Stats = flush_docs(Target2, DocList), + Stats = flush_docs(Target2, DocList, StopOnDocWriteFailure), close_db(Target2), ok = gen_server:call(Parent, {add_stats, Stats}, infinity) end). @@ -431,16 +470,17 @@ after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) -> maybe_flush_docs(Doc,State) -> #state{ target = Target, batch = Batch, - stats = Stats, cp = Cp + stats = Stats, cp = Cp, + stop_on_doc_write_failure = StopOnDocWriteFailure } = State, - {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc), + {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc, StopOnDocWriteFailure), Stats2 = couch_replicator_stats:sum_stats(Stats, WStats), Stats3 = couch_replicator_stats:increment(docs_read, Stats2), Stats4 = maybe_report_stats(Cp, Stats3), State#state{stats = Stats4, batch = Batch2}. -maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> +maybe_flush_docs(#httpdb{} = Target, Batch, Doc, StopOnDocWriteFailure) -> #batch{docs = DocAcc, size = SizeAcc} = Batch, case batch_doc(Doc) of false -> @@ -448,7 +488,8 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> case flush_doc(Target, Doc) of ok -> {Batch, couch_replicator_stats:new([{docs_written, 1}])}; - _ -> + Error -> + stop_on_doc_write_failure(Doc#doc.id, Error, StopOnDocWriteFailure), {Batch, couch_replicator_stats:new([{doc_write_failures, 1}])} end; true -> @@ -456,7 +497,7 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> case SizeAcc + iolist_size(JsonDoc) of SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [SizeAcc2]), - Stats = flush_docs(Target, [JsonDoc | DocAcc]), + Stats = flush_docs(Target, [JsonDoc | DocAcc], StopOnDocWriteFailure), {#batch{}, Stats}; SizeAcc2 -> Stats = couch_replicator_stats:new(), @@ -464,11 +505,12 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> end end; -maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) -> +maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc, + StopOnDocWriteFailure) -> case SizeAcc + 1 of SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN -> couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]), - Stats = flush_docs(Target, [Doc | DocAcc]), + Stats = flush_docs(Target, [Doc | DocAcc], StopOnDocWriteFailure), {#batch{}, Stats}; SizeAcc2 -> Stats = couch_replicator_stats:new(), @@ -487,34 +529,46 @@ batch_doc(#doc{atts = Atts}) -> end, Atts). -flush_docs(_Target, []) -> +flush_docs(_Target, [], _) -> couch_replicator_stats:new(); -flush_docs(Target, DocList) -> +flush_docs(Target, DocList, StopOnDocWriteFailure) -> FlushResult = couch_replicator_api_wrap:update_docs(Target, DocList, [delay_commit], replicated_changes), - handle_flush_docs_result(FlushResult, Target, DocList). + handle_flush_docs_result(FlushResult, Target, DocList, + StopOnDocWriteFailure). -handle_flush_docs_result({error, request_body_too_large}, _Target, [Doc]) -> +handle_flush_docs_result({error, request_body_too_large} = Error, _Target, + [Doc], StopOnDocWriteFailure) -> couch_log:error("Replicator: failed to write doc ~p. Too large", [Doc]), + {DocProps} = ?JSON_DECODE(Doc), + DocId = get_value(<<"_id">>, DocProps, undefined), + stop_on_doc_write_failure(DocId, Error, StopOnDocWriteFailure), couch_replicator_stats:new([{doc_write_failures, 1}]); -handle_flush_docs_result({error, request_body_too_large}, Target, DocList) -> +handle_flush_docs_result({error, request_body_too_large}, Target, DocList, + StopOnDocWriteFailure) -> Len = length(DocList), {DocList1, DocList2} = lists:split(Len div 2, DocList), couch_log:notice("Replicator: couldn't write batch of size ~p to ~p because" " request body is too large. Splitting batch into 2 separate batches of" " sizes ~p and ~p", [Len, couch_replicator_api_wrap:db_uri(Target), length(DocList1), length(DocList2)]), - flush_docs(Target, DocList1), - flush_docs(Target, DocList2); -handle_flush_docs_result({ok, Errors}, Target, DocList) -> + flush_docs(Target, DocList1, StopOnDocWriteFailure), + flush_docs(Target, DocList2, StopOnDocWriteFailure); +handle_flush_docs_result({ok, Errors}, Target, DocList, StopOnDocWriteFailure) -> DbUri = couch_replicator_api_wrap:db_uri(Target), lists:foreach( fun({Props}) -> + Id = get_value(id, Props, ""), + Rev = get_value(rev, Props, ""), + Error = get_value(error, Props, ""), + Reason = get_value(reason, Props, ""), couch_log:error("Replicator: couldn't write document `~s`, revision" " `~s`, to target database `~s`. Error: `~s`, reason: `~s`.", [ - get_value(id, Props, ""), get_value(rev, Props, ""), DbUri, - get_value(error, Props, ""), get_value(reason, Props, "")]) + Id, Rev, DbUri, Error, Reason + ]), + stop_on_doc_write_failure(Id, {Error, Reason}, + StopOnDocWriteFailure) end, Errors), couch_replicator_stats:new([ {docs_written, length(DocList) - length(Errors)}, @@ -581,6 +635,15 @@ maybe_report_stats(Cp, Stats) -> end. +stop_on_doc_write_failure(DocId, Error, true) -> + DocIdStr = binary_to_list(DocId), + exit({shutdown, {doc_write_failure, DocIdStr, Error}}); + +stop_on_doc_write_failure(_, _, false) -> + ok. + + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). |