diff options
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_worker.erl')
-rw-r--r-- | src/couch_replicator/src/couch_replicator_worker.erl | 143 |
1 files changed, 12 insertions, 131 deletions
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index ec98fa0f3..986c32c0a 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -28,18 +28,11 @@ % TODO: maybe make both buffer max sizes configurable -define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). % for remote targets --define(DOC_BUFFER_LEN, 10). % for local targets, # of documents -define(MAX_BULK_ATT_SIZE, 64 * 1024). -define(MAX_BULK_ATTS_PER_DOC, 8). -define(STATS_DELAY, 10000000). % 10 seconds (in microseconds) -define(MISSING_DOC_RETRY_MSEC, 2000). --import(couch_replicator_utils, [ - open_db/1, - close_db/1, - start_db_compaction_notifier/2, - stop_db_compaction_notifier/1 -]). -import(couch_util, [ to_binary/1, get_value/3 @@ -62,8 +55,6 @@ pending_fetch = nil, flush_waiter = nil, stats = couch_replicator_stats:new(), - source_db_compaction_notifier = nil, - target_db_compaction_notifier = nil, batch = #batch{} }). @@ -71,14 +62,7 @@ start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) -> gen_server:start_link( - ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []); - -start_link(Cp, Source, Target, ChangesManager, _MaxConns) -> - Pid = spawn_link(fun() -> - erlang:put(last_stats_report, os:timestamp()), - queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager) - end), - {ok, Pid}. + ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []). init({Cp, Source, Target, ChangesManager, MaxConns}) -> @@ -92,12 +76,8 @@ init({Cp, Source, Target, ChangesManager, MaxConns}) -> cp = Cp, max_parallel_conns = MaxConns, loop = LoopPid, - source = open_db(Source), - target = open_db(Target), - source_db_compaction_notifier = - start_db_compaction_notifier(Source, self()), - target_db_compaction_notifier = - start_db_compaction_notifier(Target, self()) + source = Source, + target = Target }, {ok, State}. @@ -141,24 +121,6 @@ handle_call(flush, {Pid, _} = From, {noreply, State2#state{flush_waiter = From}}. -handle_cast({db_compacted, DbName} = Msg, #state{} = State) -> - #state{ - source = Source, - target = Target - } = State, - SourceName = couch_replicator_utils:local_db_name(Source), - TargetName = couch_replicator_utils:local_db_name(Target), - case DbName of - SourceName -> - {ok, NewSource} = couch_db:reopen(Source), - {noreply, State#state{source = NewSource}}; - TargetName -> - {ok, NewTarget} = couch_db:reopen(Target), - {noreply, State#state{target = NewTarget}}; - _Else -> - {stop, {unexpected_async_call, Msg}, State} - end; - handle_cast(Msg, State) -> {stop, {unexpected_async_call, Msg}, State}. @@ -213,11 +175,8 @@ handle_info({'EXIT', Pid, Reason}, State) -> {stop, {process_died, Pid, Reason}, State}. -terminate(_Reason, State) -> - close_db(State#state.source), - close_db(State#state.target), - stop_db_compaction_notifier(State#state.source_db_compaction_notifier), - stop_db_compaction_notifier(State#state.target_db_compaction_notifier). +terminate(_Reason, _State) -> + ok. format_status(_Opt, [_PDict, State]) -> #state{ @@ -253,20 +212,10 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager); {changes, ChangesManager, Changes, ReportSeq} -> - Target2 = open_db(Target), - {IdRevs, Stats0} = find_missing(Changes, Target2), - case Source of - #httpdb{} -> - ok = gen_server:call(Parent, {add_stats, Stats0}, infinity), - remote_process_batch(IdRevs, Parent), - {ok, Stats} = gen_server:call(Parent, flush, infinity); - _Db -> - Source2 = open_db(Source), - Stats = local_process_batch( - IdRevs, Cp, Source2, Target2, #batch{}, Stats0), - close_db(Source2) - end, - close_db(Target2), + {IdRevs, Stats0} = find_missing(Changes, Target), + ok = gen_server:call(Parent, {add_stats, Stats0}, infinity), + remote_process_batch(IdRevs, Parent), + {ok, Stats} = gen_server:call(Parent, flush, infinity), ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), erlang:put(last_stats_report, os:timestamp()), couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]), @@ -274,32 +223,6 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> end. -local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) -> - Stats; - -local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) -> - case Target of - #httpdb{} -> - couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]); - _Db -> - couch_log:debug("Worker flushing doc batch of ~p docs", [Size]) - end, - Stats2 = flush_docs(Target, Docs), - Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2), - local_process_batch([], Cp, Source, Target, #batch{}, Stats3); - -local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) -> - {ok, {_, DocList, Stats2, _}} = fetch_doc( - Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}), - {Batch2, Stats3} = lists:foldl( - fun(Doc, {Batch0, Stats0}) -> - {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc), - {Batch1, couch_replicator_utils:sum_stats(Stats0, S)} - end, - {Batch, Stats2}, DocList), - local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3). - - remote_process_batch([], _Parent) -> ok; @@ -319,10 +242,8 @@ remote_process_batch([{Id, Revs, PAs} | Rest], Parent) -> spawn_doc_reader(Source, Target, FetchParams) -> Parent = self(), spawn_link(fun() -> - Source2 = open_db(Source), fetch_doc( - Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}), - close_db(Source2) + Source, FetchParams, fun remote_doc_handler/2, {Parent, Target}) end). @@ -350,29 +271,6 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) -> end. -local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) -> - Stats2 = couch_replicator_stats:increment(docs_read, Stats), - case batch_doc(Doc) of - true -> - {ok, {Target, [Doc | DocList], Stats2, Cp}}; - false -> - couch_log:debug("Worker flushing doc with attachments", []), - Target2 = open_db(Target), - Success = (flush_doc(Target2, Doc) =:= ok), - close_db(Target2), - Stats3 = case Success of - true -> - couch_replicator_stats:increment(docs_written, Stats2); - false -> - couch_replicator_stats:increment(doc_write_failures, Stats2) - end, - Stats4 = maybe_report_stats(Cp, Stats3), - {ok, {Target, DocList, Stats4, Cp}} - end; -local_doc_handler(_, Acc) -> - {ok, Acc}. - - remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) -> ok = gen_server:call(Parent, {batch_doc, Doc}, infinity), {ok, Acc}; @@ -383,9 +281,7 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) -> % convenient to call it ASAP to avoid ibrowse inactivity timeouts. Stats = couch_replicator_stats:new([{docs_read, 1}]), couch_log:debug("Worker flushing doc with attachments", []), - Target2 = open_db(Target), - Success = (flush_doc(Target2, Doc) =:= ok), - close_db(Target2), + Success = (flush_doc(Target, Doc) =:= ok), {Result, Stats2} = case Success of true -> {{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)}; @@ -402,17 +298,13 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) -> case {Target, Size > 0} of {#httpdb{}, true} -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]); - {_Db, true} -> - couch_log:debug("Worker flushing doc batch of ~p docs", [Size]); _ -> ok end, Parent = self(), spawn_link( fun() -> - Target2 = open_db(Target), - Stats = flush_docs(Target2, DocList), - close_db(Target2), + Stats = flush_docs(Target, DocList), ok = gen_server:call(Parent, {add_stats, Stats}, infinity) end). @@ -462,17 +354,6 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> Stats = couch_replicator_stats:new(), {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, Stats} end - end; - -maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) -> - case SizeAcc + 1 of - SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN -> - couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]), - Stats = flush_docs(Target, [Doc | DocAcc]), - {#batch{}, Stats}; - SizeAcc2 -> - Stats = couch_replicator_stats:new(), - {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, Stats} end. |