summaryrefslogtreecommitdiff
path: root/src/couch_replicator/src/couch_replicator_worker.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_replicator/src/couch_replicator_worker.erl')
-rw-r--r--src/couch_replicator/src/couch_replicator_worker.erl143
1 files changed, 12 insertions, 131 deletions
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index ec98fa0f3..986c32c0a 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -28,18 +28,11 @@
% TODO: maybe make both buffer max sizes configurable
-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). % for remote targets
--define(DOC_BUFFER_LEN, 10). % for local targets, # of documents
-define(MAX_BULK_ATT_SIZE, 64 * 1024).
-define(MAX_BULK_ATTS_PER_DOC, 8).
-define(STATS_DELAY, 10000000). % 10 seconds (in microseconds)
-define(MISSING_DOC_RETRY_MSEC, 2000).
--import(couch_replicator_utils, [
- open_db/1,
- close_db/1,
- start_db_compaction_notifier/2,
- stop_db_compaction_notifier/1
-]).
-import(couch_util, [
to_binary/1,
get_value/3
@@ -62,8 +55,6 @@
pending_fetch = nil,
flush_waiter = nil,
stats = couch_replicator_stats:new(),
- source_db_compaction_notifier = nil,
- target_db_compaction_notifier = nil,
batch = #batch{}
}).
@@ -71,14 +62,7 @@
start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) ->
gen_server:start_link(
- ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []);
-
-start_link(Cp, Source, Target, ChangesManager, _MaxConns) ->
- Pid = spawn_link(fun() ->
- erlang:put(last_stats_report, os:timestamp()),
- queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
- end),
- {ok, Pid}.
+ ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []).
init({Cp, Source, Target, ChangesManager, MaxConns}) ->
@@ -92,12 +76,8 @@ init({Cp, Source, Target, ChangesManager, MaxConns}) ->
cp = Cp,
max_parallel_conns = MaxConns,
loop = LoopPid,
- source = open_db(Source),
- target = open_db(Target),
- source_db_compaction_notifier =
- start_db_compaction_notifier(Source, self()),
- target_db_compaction_notifier =
- start_db_compaction_notifier(Target, self())
+ source = Source,
+ target = Target
},
{ok, State}.
@@ -141,24 +121,6 @@ handle_call(flush, {Pid, _} = From,
{noreply, State2#state{flush_waiter = From}}.
-handle_cast({db_compacted, DbName} = Msg, #state{} = State) ->
- #state{
- source = Source,
- target = Target
- } = State,
- SourceName = couch_replicator_utils:local_db_name(Source),
- TargetName = couch_replicator_utils:local_db_name(Target),
- case DbName of
- SourceName ->
- {ok, NewSource} = couch_db:reopen(Source),
- {noreply, State#state{source = NewSource}};
- TargetName ->
- {ok, NewTarget} = couch_db:reopen(Target),
- {noreply, State#state{target = NewTarget}};
- _Else ->
- {stop, {unexpected_async_call, Msg}, State}
- end;
-
handle_cast(Msg, State) ->
{stop, {unexpected_async_call, Msg}, State}.
@@ -213,11 +175,8 @@ handle_info({'EXIT', Pid, Reason}, State) ->
{stop, {process_died, Pid, Reason}, State}.
-terminate(_Reason, State) ->
- close_db(State#state.source),
- close_db(State#state.target),
- stop_db_compaction_notifier(State#state.source_db_compaction_notifier),
- stop_db_compaction_notifier(State#state.target_db_compaction_notifier).
+terminate(_Reason, _State) ->
+ ok.
format_status(_Opt, [_PDict, State]) ->
#state{
@@ -253,20 +212,10 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager);
{changes, ChangesManager, Changes, ReportSeq} ->
- Target2 = open_db(Target),
- {IdRevs, Stats0} = find_missing(Changes, Target2),
- case Source of
- #httpdb{} ->
- ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
- remote_process_batch(IdRevs, Parent),
- {ok, Stats} = gen_server:call(Parent, flush, infinity);
- _Db ->
- Source2 = open_db(Source),
- Stats = local_process_batch(
- IdRevs, Cp, Source2, Target2, #batch{}, Stats0),
- close_db(Source2)
- end,
- close_db(Target2),
+ {IdRevs, Stats0} = find_missing(Changes, Target),
+ ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
+ remote_process_batch(IdRevs, Parent),
+ {ok, Stats} = gen_server:call(Parent, flush, infinity),
ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
erlang:put(last_stats_report, os:timestamp()),
couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]),
@@ -274,32 +223,6 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
end.
-local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) ->
- Stats;
-
-local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
- case Target of
- #httpdb{} ->
- couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]);
- _Db ->
- couch_log:debug("Worker flushing doc batch of ~p docs", [Size])
- end,
- Stats2 = flush_docs(Target, Docs),
- Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2),
- local_process_batch([], Cp, Source, Target, #batch{}, Stats3);
-
-local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) ->
- {ok, {_, DocList, Stats2, _}} = fetch_doc(
- Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}),
- {Batch2, Stats3} = lists:foldl(
- fun(Doc, {Batch0, Stats0}) ->
- {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc),
- {Batch1, couch_replicator_utils:sum_stats(Stats0, S)}
- end,
- {Batch, Stats2}, DocList),
- local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3).
-
-
remote_process_batch([], _Parent) ->
ok;
@@ -319,10 +242,8 @@ remote_process_batch([{Id, Revs, PAs} | Rest], Parent) ->
spawn_doc_reader(Source, Target, FetchParams) ->
Parent = self(),
spawn_link(fun() ->
- Source2 = open_db(Source),
fetch_doc(
- Source2, FetchParams, fun remote_doc_handler/2, {Parent, Target}),
- close_db(Source2)
+ Source, FetchParams, fun remote_doc_handler/2, {Parent, Target})
end).
@@ -350,29 +271,6 @@ fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
end.
-local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
- Stats2 = couch_replicator_stats:increment(docs_read, Stats),
- case batch_doc(Doc) of
- true ->
- {ok, {Target, [Doc | DocList], Stats2, Cp}};
- false ->
- couch_log:debug("Worker flushing doc with attachments", []),
- Target2 = open_db(Target),
- Success = (flush_doc(Target2, Doc) =:= ok),
- close_db(Target2),
- Stats3 = case Success of
- true ->
- couch_replicator_stats:increment(docs_written, Stats2);
- false ->
- couch_replicator_stats:increment(doc_write_failures, Stats2)
- end,
- Stats4 = maybe_report_stats(Cp, Stats3),
- {ok, {Target, DocList, Stats4, Cp}}
- end;
-local_doc_handler(_, Acc) ->
- {ok, Acc}.
-
-
remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
{ok, Acc};
@@ -383,9 +281,7 @@ remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
% convenient to call it ASAP to avoid ibrowse inactivity timeouts.
Stats = couch_replicator_stats:new([{docs_read, 1}]),
couch_log:debug("Worker flushing doc with attachments", []),
- Target2 = open_db(Target),
- Success = (flush_doc(Target2, Doc) =:= ok),
- close_db(Target2),
+ Success = (flush_doc(Target, Doc) =:= ok),
{Result, Stats2} = case Success of
true ->
{{ok, Acc}, couch_replicator_stats:increment(docs_written, Stats)};
@@ -402,17 +298,13 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
case {Target, Size > 0} of
{#httpdb{}, true} ->
couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]);
- {_Db, true} ->
- couch_log:debug("Worker flushing doc batch of ~p docs", [Size]);
_ ->
ok
end,
Parent = self(),
spawn_link(
fun() ->
- Target2 = open_db(Target),
- Stats = flush_docs(Target2, DocList),
- close_db(Target2),
+ Stats = flush_docs(Target, DocList),
ok = gen_server:call(Parent, {add_stats, Stats}, infinity)
end).
@@ -462,17 +354,6 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
Stats = couch_replicator_stats:new(),
{#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, Stats}
end
- end;
-
-maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
- case SizeAcc + 1 of
- SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
- couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]),
- Stats = flush_docs(Target, [Doc | DocAcc]),
- {#batch{}, Stats};
- SizeAcc2 ->
- Stats = couch_replicator_stats:new(),
- {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, Stats}
end.