path: root/src/couch_replicator/src/couch_replicator_worker.erl
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_worker.erl')
-rw-r--r--  src/couch_replicator/src/couch_replicator_worker.erl  193
1 file changed, 128 insertions, 65 deletions
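
This diff threads a new StopOnDocWriteFailure flag through every stage of the worker, starting with the public API, which grows from start_link/5 to start_link/6. As a minimal sketch of the new call shape (the surrounding variable bindings are illustrative placeholders, not code from the calling module), a caller would now start a worker like this:

    %% Illustrative only: shows the arity-6 start_link introduced by this
    %% diff. Cp, Source, Target, ChangesManager and MaxConns stand in for
    %% the checkpointer, endpoints, changes-feed manager and connection
    %% limit that the real caller already holds.
    StopOnDocWriteFailure = true,
    {ok, Worker} = couch_replicator_worker:start_link(
        Cp, Source, Target, ChangesManager, MaxConns, StopOnDocWriteFailure),

When the flag is true, any document write failure makes the worker exit with {shutdown, {doc_write_failure, DocId, Error}} instead of merely incrementing the doc_write_failures statistic.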
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index ec98fa0f3..9b8baabe3 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -15,7 +15,7 @@
-vsn(1).
% public API
--export([start_link/5]).
+-export([start_link/6]).
% gen_server callbacks
-export([init/1, terminate/2, code_change/3]).
@@ -64,28 +64,39 @@
stats = couch_replicator_stats:new(),
source_db_compaction_notifier = nil,
target_db_compaction_notifier = nil,
- batch = #batch{}
+ batch = #batch{},
+ stop_on_doc_write_failure
}).
-start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) ->
- gen_server:start_link(
- ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []);
+start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns,
+ StopOnDocWriteFailure) ->
+ gen_server:start_link(?MODULE, {
+ Cp,
+ Source,
+ Target,
+ ChangesManager,
+ MaxConns,
+ StopOnDocWriteFailure
+ }, []);
-start_link(Cp, Source, Target, ChangesManager, _MaxConns) ->
+start_link(Cp, Source, Target, ChangesManager, _MaxConns,
+ StopOnDocWriteFailure) ->
Pid = spawn_link(fun() ->
erlang:put(last_stats_report, os:timestamp()),
- queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
+ queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager,
+ StopOnDocWriteFailure)
end),
{ok, Pid}.
-init({Cp, Source, Target, ChangesManager, MaxConns}) ->
+init({Cp, Source, Target, ChangesManager, MaxConns, StopOnDocWriteFailure}) ->
process_flag(trap_exit, true),
Parent = self(),
LoopPid = spawn_link(fun() ->
- queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
+ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager,
+ StopOnDocWriteFailure)
end),
erlang:put(last_stats_report, os:timestamp()),
State = #state{
@@ -97,17 +108,19 @@ init({Cp, Source, Target, ChangesManager, MaxConns}) ->
source_db_compaction_notifier =
start_db_compaction_notifier(Source, self()),
target_db_compaction_notifier =
- start_db_compaction_notifier(Target, self())
+ start_db_compaction_notifier(Target, self()),
+ stop_on_doc_write_failure = StopOnDocWriteFailure
},
{ok, State}.
handle_call({fetch_doc, {_Id, _Revs, _PAs} = Params}, {Pid, _} = From,
#state{loop = Pid, readers = Readers, pending_fetch = nil,
- source = Src, target = Tgt, max_parallel_conns = MaxConns} = State) ->
+ source = Src, target = Tgt, max_parallel_conns = MaxConns,
+ stop_on_doc_write_failure = StopOnDocWriteFailure} = State) ->
case length(Readers) of
Size when Size < MaxConns ->
- Reader = spawn_doc_reader(Src, Tgt, Params),
+ Reader = spawn_doc_reader(Src, Tgt, Params, StopOnDocWriteFailure),
NewState = State#state{
readers = [Reader | Readers]
},
@@ -131,10 +144,12 @@ handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) ->
handle_call(flush, {Pid, _} = From,
#state{loop = Pid, writer = nil, flush_waiter = nil,
- target = Target, batch = Batch} = State) ->
+ target = Target, batch = Batch,
+ stop_on_doc_write_failure = StopOnDocWriteFailure
+ } = State) ->
State2 = case State#state.readers of
[] ->
- State#state{writer = spawn_writer(Target, Batch)};
+ State#state{writer = spawn_writer(Target, Batch, StopOnDocWriteFailure)};
_ ->
State
end,
@@ -177,7 +192,8 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
#state{
readers = Readers, writer = Writer, batch = Batch,
source = Source, target = Target,
- pending_fetch = Fetch, flush_waiter = FlushWaiter
+ pending_fetch = Fetch, flush_waiter = FlushWaiter,
+ stop_on_doc_write_failure = StopOnDocWriteFailure
} = State,
case Readers -- [Pid] of
Readers ->
@@ -190,13 +206,14 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
true ->
State#state{
readers = Readers2,
- writer = spawn_writer(Target, Batch)
+ writer = spawn_writer(Target, Batch, StopOnDocWriteFailure)
};
false ->
State#state{readers = Readers2}
end;
{From, FetchParams} ->
- Reader = spawn_doc_reader(Source, Target, FetchParams),
+ Reader = spawn_doc_reader(Source, Target, FetchParams,
+ StopOnDocWriteFailure),
gen_server:reply(From, ok),
State#state{
readers = [Reader | Readers2],
@@ -209,6 +226,10 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
handle_info({'EXIT', _Pid, max_backoff}, State) ->
{stop, {shutdown, max_backoff}, State};
+handle_info({'EXIT', _Pid, {shutdown, {doc_write_failure, _, _}} = Error},
+ State) ->
+ {stop, Error, State};
+
handle_info({'EXIT', Pid, Reason}, State) ->
{stop, {process_died, Pid, Reason}, State}.
@@ -243,7 +264,8 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
+queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager,
+ StopOnDocWriteFailure) ->
ChangesManager ! {get_changes, self()},
receive
{closed, ChangesManager} ->
@@ -251,7 +273,8 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
{changes, ChangesManager, [], ReportSeq} ->
Stats = couch_replicator_stats:new(),
ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
- queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager);
+ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager,
+ StopOnDocWriteFailure);
{changes, ChangesManager, Changes, ReportSeq} ->
Target2 = open_db(Target),
{IdRevs, Stats0} = find_missing(Changes, Target2),
@@ -263,41 +286,50 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
_Db ->
Source2 = open_db(Source),
Stats = local_process_batch(
- IdRevs, Cp, Source2, Target2, #batch{}, Stats0),
+ IdRevs, Cp, Source2, Target2, #batch{}, Stats0,
+ StopOnDocWriteFailure
+ ),
close_db(Source2)
end,
close_db(Target2),
ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
erlang:put(last_stats_report, os:timestamp()),
couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]),
- queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
+ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager,
+ StopOnDocWriteFailure)
end.
-local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) ->
+local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats, _) ->
Stats;
-local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
+local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats,
+ StopOnDocWriteFailure) ->
case Target of
#httpdb{} ->
couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]);
_Db ->
couch_log:debug("Worker flushing doc batch of ~p docs", [Size])
end,
- Stats2 = flush_docs(Target, Docs),
+ Stats2 = flush_docs(Target, Docs, StopOnDocWriteFailure),
Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2),
- local_process_batch([], Cp, Source, Target, #batch{}, Stats3);
-
-local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) ->
- {ok, {_, DocList, Stats2, _}} = fetch_doc(
- Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}),
+ local_process_batch([], Cp, Source, Target, #batch{}, Stats3,
+ StopOnDocWriteFailure);
+
+local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats,
+ StopOnDocWriteFailure) ->
+ {ok, {_, DocList, Stats2, _, _}} = fetch_doc(
+ Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp,
+ StopOnDocWriteFailure}),
{Batch2, Stats3} = lists:foldl(
fun(Doc, {Batch0, Stats0}) ->
- {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc),
+ {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc,
+ StopOnDocWriteFailure),
{Batch1, couch_replicator_utils:sum_stats(Stats0, S)}
end,
{Batch, Stats2}, DocList),
- local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3).
+ local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3,
+ StopOnDocWriteFailure).
remote_process_batch([], _Parent) ->
@@ -316,12 +348,13 @@ remote_process_batch([{Id, Revs, PAs} | Rest], Parent) ->
remote_process_batch(Rest, Parent).
-spawn_doc_reader(Source, Target, FetchParams) ->
+spawn_doc_reader(Source, Target, FetchParams, StopOnDocWriteFailure) ->
Parent = self(),
spawn_link(fun() ->
Source2 = open_db(Source),
fetch_doc(
- Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}),
+ Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target,
+ StopOnDocWriteFailure}),
close_db(Source2)
end).
@@ -350,33 +383,36 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
end.
-local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
+local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp,
+ StopOnDocWriteFailure}) ->
Stats2 = couch_replicator_stats:increment(docs_read, Stats),
case batch_doc(Doc) of
true ->
- {ok, {Target, [Doc | DocList], Stats2, Cp}};
+ {ok, {Target, [Doc | DocList], Stats2, Cp, StopOnDocWriteFailure}};
false ->
couch_log:debug("Worker flushing doc with attachments", []),
Target2 = open_db(Target),
- Success = (flush_doc(Target2, Doc) =:= ok),
+ FlushResult = flush_doc(Target2, Doc),
close_db(Target2),
- Stats3 = case Success of
- true ->
+ Stats3 = case FlushResult of
+ ok ->
couch_replicator_stats:increment(docs_written, Stats2);
- false ->
+ Error ->
+ stop_on_doc_write_failure(Doc#doc.id, Error,
+ StopOnDocWriteFailure),
couch_replicator_stats:increment(doc_write_failures, Stats2)
end,
Stats4 = maybe_report_stats(Cp, Stats3),
- {ok, {Target, DocList, Stats4, Cp}}
+ {ok, {Target, DocList, Stats4, Cp, StopOnDocWriteFailure}}
end;
local_doc_handler(_, Acc) ->
{ok, Acc}.
-remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
+remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _, _} = Acc) ->
ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
{ok, Acc};
-remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
+remote_doc_handler({ok, Doc}, {Parent, Target, StopOnDocWriteFailure} = Acc) ->
% Immediately flush documents with attachments received from a remote
% source. The data property of each attachment is a function that starts
% streaming the attachment data from the remote source, therefore it's
@@ -384,12 +420,14 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
Stats = couch_replicator_stats:new([{docs_read, 1}]),
couch_log:debug("Worker flushing doc with attachments", []),
Target2 = open_db(Target),
- Success = (flush_doc(Target2, Doc) =:= ok),
+ FlushResult = flush_doc(Target2, Doc),
close_db(Target2),
- {Result, Stats2} = case Success of
- true ->
+ {Result, Stats2} = case FlushResult of
+ ok ->
{{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)};
- false ->
+ Error ->
+ stop_on_doc_write_failure(Doc#doc.id, Error,
+ StopOnDocWriteFailure),
{{skip, Acc}, couch_replicator_stats:increment(doc_write_failures, Stats)}
end,
ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
@@ -398,7 +436,8 @@ remote_doc_handler({{not_found, missing}, _}, _Acc) ->
throw(missing_doc).
-spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
+spawn_writer(Target, #batch{docs = DocList, size = Size},
+ StopOnDocWriteFailure) ->
case {Target, Size > 0} of
{#httpdb{}, true} ->
couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]);
@@ -411,7 +450,7 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
spawn_link(
fun() ->
Target2 = open_db(Target),
- Stats = flush_docs(Target2, DocList),
+ Stats = flush_docs(Target2, DocList, StopOnDocWriteFailure),
close_db(Target2),
ok = gen_server:call(Parent, {add_stats, Stats}, infinity)
end).
@@ -431,16 +470,17 @@ after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
maybe_flush_docs(Doc, State) ->
#state{
target = Target, batch = Batch,
- stats = Stats, cp = Cp
+ stats = Stats, cp = Cp,
+ stop_on_doc_write_failure = StopOnDocWriteFailure
} = State,
- {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc),
+ {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc, StopOnDocWriteFailure),
Stats2 = couch_replicator_stats:sum_stats(Stats, WStats),
Stats3 = couch_replicator_stats:increment(docs_read, Stats2),
Stats4 = maybe_report_stats(Cp, Stats3),
State#state{stats = Stats4, batch = Batch2}.
-maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
+maybe_flush_docs(#httpdb{} = Target, Batch, Doc, StopOnDocWriteFailure) ->
#batch{docs = DocAcc, size = SizeAcc} = Batch,
case batch_doc(Doc) of
false ->
@@ -448,7 +488,8 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
case flush_doc(Target, Doc) of
ok ->
{Batch, couch_replicator_stats:new([{docs_written, 1}])};
- _ ->
+ Error ->
+ stop_on_doc_write_failure(Doc#doc.id, Error, StopOnDocWriteFailure),
{Batch, couch_replicator_stats:new([{doc_write_failures, 1}])}
end;
true ->
@@ -456,7 +497,7 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
case SizeAcc + iolist_size(JsonDoc) of
SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE ->
couch_log:debug("Worker flushing doc batch of size ~p bytes", [SizeAcc2]),
- Stats = flush_docs(Target, [JsonDoc | DocAcc]),
+ Stats = flush_docs(Target, [JsonDoc | DocAcc], StopOnDocWriteFailure),
{#batch{}, Stats};
SizeAcc2 ->
Stats = couch_replicator_stats:new(),
@@ -464,11 +505,12 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
end
end;
-maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
+maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc,
+ StopOnDocWriteFailure) ->
case SizeAcc + 1 of
SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]),
- Stats = flush_docs(Target, [Doc | DocAcc]),
+ Stats = flush_docs(Target, [Doc | DocAcc], StopOnDocWriteFailure),
{#batch{}, Stats};
SizeAcc2 ->
Stats = couch_replicator_stats:new(),
@@ -487,34 +529,46 @@ batch_doc(#doc{atts = Atts}) ->
end, Atts).
-flush_docs(_Target, []) ->
+flush_docs(_Target, [], _) ->
couch_replicator_stats:new();
-flush_docs(Target, DocList) ->
+flush_docs(Target, DocList, StopOnDocWriteFailure) ->
FlushResult = couch_replicator_api_wrap:update_docs(Target, DocList,
[delay_commit], replicated_changes),
- handle_flush_docs_result(FlushResult, Target, DocList).
+ handle_flush_docs_result(FlushResult, Target, DocList,
+ StopOnDocWriteFailure).
-handle_flush_docs_result({error, request_body_too_large}, _Target, [Doc]) ->
+handle_flush_docs_result({error, request_body_too_large} = Error, _Target,
+ [Doc], StopOnDocWriteFailure) ->
couch_log:error("Replicator: failed to write doc ~p. Too large", [Doc]),
+ {DocProps} = ?JSON_DECODE(Doc),
+ DocId = get_value(<<"_id">>, DocProps, undefined),
+ stop_on_doc_write_failure(DocId, Error, StopOnDocWriteFailure),
couch_replicator_stats:new([{doc_write_failures, 1}]);
-handle_flush_docs_result({error, request_body_too_large}, Target, DocList) ->
+handle_flush_docs_result({error, request_body_too_large}, Target, DocList,
+ StopOnDocWriteFailure) ->
Len = length(DocList),
{DocList1, DocList2} = lists:split(Len div 2, DocList),
couch_log:notice("Replicator: couldn't write batch of size ~p to ~p because"
" request body is too large. Splitting batch into 2 separate batches of"
" sizes ~p and ~p", [Len, couch_replicator_api_wrap:db_uri(Target),
length(DocList1), length(DocList2)]),
- flush_docs(Target, DocList1),
- flush_docs(Target, DocList2);
-handle_flush_docs_result({ok, Errors}, Target, DocList) ->
+ flush_docs(Target, DocList1, StopOnDocWriteFailure),
+ flush_docs(Target, DocList2, StopOnDocWriteFailure);
+handle_flush_docs_result({ok, Errors}, Target, DocList, StopOnDocWriteFailure) ->
DbUri = couch_replicator_api_wrap:db_uri(Target),
lists:foreach(
fun({Props}) ->
+ Id = get_value(id, Props, ""),
+ Rev = get_value(rev, Props, ""),
+ Error = get_value(error, Props, ""),
+ Reason = get_value(reason, Props, ""),
couch_log:error("Replicator: couldn't write document `~s`, revision"
" `~s`, to target database `~s`. Error: `~s`, reason: `~s`.", [
- get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
- get_value(error, Props, ""), get_value(reason, Props, "")])
+ Id, Rev, DbUri, Error, Reason
+ ]),
+ stop_on_doc_write_failure(Id, {Error, Reason},
+ StopOnDocWriteFailure)
end, Errors),
couch_replicator_stats:new([
{docs_written, length(DocList) - length(Errors)},
@@ -581,6 +635,15 @@ maybe_report_stats(Cp, Stats) ->
end.
+stop_on_doc_write_failure(DocId, Error, true) when is_binary(DocId) ->
+ exit({shutdown, {doc_write_failure, binary_to_list(DocId), Error}});
+stop_on_doc_write_failure(DocId, Error, true) ->
+ exit({shutdown, {doc_write_failure, DocId, Error}});
+stop_on_doc_write_failure(_, _, false) ->
+ ok.
+
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
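
For reference, the failure path added by this diff works through OTP's link/trap machinery: when StopOnDocWriteFailure is true, a reader or writer process exits with {shutdown, {doc_write_failure, DocId, Error}}, and because init/1 sets trap_exit, the worker gen_server receives that as an 'EXIT' message and stops with the same reason via the new handle_info clause. A standalone sketch of the pattern, with an illustrative module name and doc id that are not part of the patch:

    %% Minimal sketch, not part of the patch: reproduces the
    %% exit/trap_exit pattern couch_replicator_worker relies on.
    -module(exit_pattern_sketch).
    -behaviour(gen_server).
    -export([start_link/0]).
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

    start_link() ->
        gen_server:start_link(?MODULE, [], []).

    init([]) ->
        %% Trap exits so a crashing linked helper arrives as an 'EXIT'
        %% message instead of killing this server outright.
        process_flag(trap_exit, true),
        spawn_link(fun() ->
            %% A helper signalling a fatal write failure, as
            %% stop_on_doc_write_failure/3 does when the flag is true.
            exit({shutdown, {doc_write_failure, "doc-1", conflict}})
        end),
        {ok, #{}}.

    handle_call(_Msg, _From, State) -> {reply, ok, State}.
    handle_cast(_Msg, State) -> {noreply, State}.

    %% Same shape as the new clause in this diff: stop with the failure
    %% as the reason so the owner sees why the worker died.
    handle_info({'EXIT', _Pid, {shutdown, {doc_write_failure, _, _}} = Error}, State) ->
        {stop, Error, State};
    handle_info({'EXIT', Pid, Reason}, State) ->
        {stop, {process_died, Pid, Reason}, State}.

Exiting with a {shutdown, _} tuple keeps the crash out of the SASL error reports while still carrying the failure details to whatever process links to or monitors the worker.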