diff options
authorNick Vatamaniuc <>2022-09-06 02:46:56 -0400
committerNick Vatamaniuc <>2022-09-14 12:52:03 -0400
commit9fab3868de298b44b931a51d791b1d54bdc3c2ed (patch)
parentf1b7ce6370621634ebab4e27218654efac059472 (diff)
Statistically skip _revs_diff in the replicator
When the revisions are consistently missing from the target, for example when replicating to a new target database, it's wasteful to keep calling _revs_diff. To statistically start skipping calls to _revs_diff, reuse the same algorithm that is used to skip calls to _bulk_get when there are too many attachments. Also use the same record for keeping track of stats, so rename `bulk_get_stats` to `fetch_stats`. The update and attempt functions are re-used as well; there are just separate wrappers which pass in different values for the threshold and other parameters.
2 files changed, 192 insertions, 87 deletions
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 4df5d1c7c..d8f872388 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -28,9 +28,30 @@
-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024).
-define(STATS_DELAY_SEC, 10).
-define(MISSING_DOC_RETRY_MSEC, 2000).
+% Coefficients for the _bulk_get and _revs_diff stats processing.
+% Threshold is the ratio limit. Valid ranges are from 0.0 to 1.0. The
+% tracked ratio is a failure ratio: when the estimated ratio is at or
+% below the threshold, the "action" is performed (the _bulk_get or
+% _revs_diff API is used). If it's above the limit, the action is not
+% performed, except for the forced periodic retries described below.
+% Decay is the "alpha" of the exponential moving average. Valid ranges
+% are from 0.0 to 1.0. The closer to 1.0, the more the latest update
+% takes effect; the closer to 0.0, the more historical values persist.
+% Retries are the forced periodic retry intervals. Even if the
+% estimated ratio indicates not to perform the action, periodically
+% try to use the API anyway to update the stats with the latest
+% information. Both values are at 37 seconds, which is a bit longer
+% than the default checkpoint interval.
-define(BULK_GET_RATIO_DECAY, 0.25).
-define(BULK_GET_RETRY_SEC, 37).
+-define(REVS_DIFF_RATIO_DECAY, 0.4).
+-define(REVS_DIFF_RETRY_SEC, 37).
-import(couch_util, [
@@ -57,7 +78,7 @@
batch = #batch{}
--record(bulk_get_stats, {
+-record(fetch_stats, {
ratio = 0,
tsec = 0
@@ -69,7 +90,8 @@
- bulk_get_stats
+ bulk_get_stats = #fetch_stats{},
+ revs_diff_stats = #fetch_stats{}
start_link(Cp, #httpdb{} = Source, Target, ChangesManager, [_ | _] = Options) ->
@@ -89,7 +111,8 @@ init({Cp, Source, Target, ChangesManager, Options}) ->
parent = self(),
changes_manager = ChangesManager,
use_bulk_get = UseBulkGet,
- bulk_get_stats = #bulk_get_stats{ratio = 0, tsec = NowSec}
+ bulk_get_stats = #fetch_stats{tsec = NowSec},
+ revs_diff_stats = #fetch_stats{tsec = NowSec}
State = #state{
cp = Cp,
@@ -261,7 +284,8 @@ queue_fetch_loop(#fetch_st{} = St) ->
parent = Parent,
changes_manager = ChangesManager,
use_bulk_get = UseBulkGet,
- bulk_get_stats = BgSt
+ bulk_get_stats = BgSt,
+ revs_diff_stats = RdSt
} = St,
ChangesManager ! {get_changes, self()},
@@ -272,7 +296,7 @@ queue_fetch_loop(#fetch_st{} = St) ->
{changes, ChangesManager, Changes, ReportSeq} ->
% Find missing revisions (POST to _revs_diff)
- IdRevs = find_missing(Changes, Target, Parent),
+ {IdRevs, RdSt1} = find_missing(Changes, Target, Parent, RdSt),
{Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs, Parent, BgSt),
% Documents without attachments can be uploaded right away
BatchFun = fun({_, #doc{} = Doc}) ->
@@ -287,7 +311,11 @@ queue_fetch_loop(#fetch_st{} = St) ->
{ok, Stats} = gen_server:call(Parent, flush, infinity),
ok = report_seq_done(Cp, ReportSeq, Stats),
couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]),
- queue_fetch_loop(St#fetch_st{bulk_get_stats = BgSt1})
+ St1 = St#fetch_st{
+ bulk_get_stats = BgSt1,
+ revs_diff_stats = RdSt1
+ },
+ queue_fetch_loop(St1)
% Return revisions without attachments. Maintain an exponential moving failure
@@ -297,9 +325,9 @@ queue_fetch_loop(#fetch_st{} = St) ->
% _bulk_get. After a few successful attempts that should lower the failure rate
% enough to allow using _bulk_get again.
-bulk_get(false, _Source, _IdRevs, _Parent, #bulk_get_stats{} = St) ->
+bulk_get(false, _Source, _IdRevs, _Parent, #fetch_stats{} = St) ->
{#{}, St};
-bulk_get(true, Source, IdRevs, Parent, #bulk_get_stats{} = St) ->
+bulk_get(true, Source, IdRevs, Parent, #fetch_stats{} = St) ->
NowSec = erlang:monotonic_time(second),
case attempt_bulk_get(St, NowSec) of
true ->
@@ -311,8 +339,7 @@ bulk_get(true, Source, IdRevs, Parent, #bulk_get_stats{} = St) ->
{bulk_get_attempts, Attempts}
ok = sum_stats(Parent, Stats),
- St1 = update_bulk_get_ratio(St, Successes, Attempts),
- {Docs, St1#bulk_get_stats{tsec = NowSec}};
+ {Docs, update_bulk_get_ratio(St, Successes, Attempts, NowSec)};
false ->
{#{}, St}
@@ -332,24 +359,36 @@ bulk_get(#httpdb{} = Source, #{} = IdRevs) ->
-attempt_bulk_get(#bulk_get_stats{} = St, NowSec) ->
- #bulk_get_stats{tsec = TSec, ratio = Ratio} = St,
- TimeThreshold = (NowSec - TSec) > ?BULK_GET_RETRY_SEC,
- RatioThreshold = Ratio =< ?BULK_GET_RATIO_THRESHOLD,
+attempt_fetch(#fetch_stats{} = St, NowSec, RetryLimit, RatioLimit) ->
+ #fetch_stats{tsec = TSec, ratio = Ratio} = St,
+ TimeThreshold = (NowSec - TSec) > RetryLimit,
+ RatioThreshold = Ratio =< RatioLimit,
TimeThreshold orelse RatioThreshold.
+attempt_bulk_get(#fetch_stats{} = St, NowSec) ->
+attempt_revs_diff(#fetch_stats{} = St, NowSec) ->
% Update fail ratio. Use the basic exponential moving average formula to smooth
% over minor bumps in case we encounter a few attachments and then get back
% to replicating documents without attachments.
-update_bulk_get_ratio(#bulk_get_stats{} = St, Successes, Attempts) ->
- #bulk_get_stats{ratio = Avg} = St,
+update_fetch_stats(#fetch_stats{} = St, Successes, Attempts, Decay, NowSec) ->
+ #fetch_stats{ratio = Avg} = St,
Ratio =
case Attempts > 0 of
true -> (Attempts - Successes) / Attempts;
false -> 0
- St#bulk_get_stats{ratio = ?BULK_GET_RATIO_DECAY * (Ratio - Avg) + Avg}.
+ St#fetch_stats{ratio = Decay * (Ratio - Avg) + Avg, tsec = NowSec}.
+update_bulk_get_ratio(#fetch_stats{} = St, Successes, Attempts, NowSec) ->
+ update_fetch_stats(St, Successes, Attempts, ?BULK_GET_RATIO_DECAY, NowSec).
+update_revs_diff_ratio(#fetch_stats{} = St, Successes, Attempts, NowSec) ->
+ update_fetch_stats(St, Successes, Attempts, ?REVS_DIFF_RATIO_DECAY, NowSec).
-spec spawn_doc_reader(#httpdb{}, #httpdb{}, {list(), list(), list()}) -> no_return().
spawn_doc_reader(Source, Target, FetchParams) ->
@@ -574,8 +613,8 @@ flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
{error, Err}
-find_missing(DocInfos, Target, Parent) ->
- {IdRevs, AllRevsCount} = lists:foldr(
+find_missing(DocInfos, Target, Parent, #fetch_stats{} = St) ->
+ {IdRevs, AllCount} = lists:foldr(
(#doc_info{revs = []}, {IdRevAcc, CountAcc}) ->
{IdRevAcc, CountAcc};
@@ -586,27 +625,49 @@ find_missing(DocInfos, Target, Parent) ->
{[], 0},
- Missing =
- case couch_replicator_api_wrap:get_missing_revs(Target, IdRevs) of
- {ok, Result} -> Result;
- {error, Error} -> exit(Error)
+ NowSec = erlang:monotonic_time(second),
+ {MissingRes, St1} =
+ case attempt_revs_diff(St, NowSec) of
+ true ->
+ Missing = find_missing(IdRevs, Target),
+ % The target might have some of the revisions and those might have
+ % attachments associated with them, so only consider missing
+ % revisions with an empty "possible_ancestors" list.
+ FoldFun = fun
+ (_IdRev, _PAs = [_ | _], Acc) -> Acc;
+ (_IdRev, _PAs = [], Acc) -> Acc + 1
+ end,
+ MissingWithoutPAs = maps:fold(FoldFun, 0, Missing),
+ % The "success" metric of the update algorithm is the number of
+ % revisions which are already on target. The higher the number -
+ % the higher the chance of calling _revs_diff. If it gets lower
+ % than a threshold, it's worth avoiding calling revs_diff since the
+ % target seems to be missing the majority of the revisions.
+ OnTarget = AllCount - MissingWithoutPAs,
+ {Missing, update_revs_diff_ratio(St, OnTarget, AllCount, NowSec)};
+ false ->
+ % Construct the result to look as if _revs_diff returned with
+ % all missing revs. To reuse the existing id_rev_map/1
+ % function, add the empty PAs list to IdRevs input.
+ MapFun = fun({Id, Revs}) -> {Id, Revs, []} end,
+ {id_rev_map(lists:map(MapFun, IdRevs)), St}
- MissingRevsCount = lists:foldl(
- fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
- 0,
- Missing
- ),
- ok = sum_stats(
- Parent,
- couch_replicator_stats:new([
- {missing_checked, AllRevsCount},
- {missing_found, MissingRevsCount}
- ])
- ),
- % Turn {Id, [Rev1, Rev2, ...], PAs} into a map:
- % #{{Id, Rev1} => PAs, {Id, Rev2} => PAs, ...}
- id_rev_map(Missing).
+ Stats = couch_replicator_stats:new([
+ {missing_checked, AllCount},
+ {missing_found, map_size(MissingRes)}
+ ]),
+ ok = sum_stats(Parent, Stats),
+ {MissingRes, St1}.
+find_missing(Revs, Target) ->
+ case couch_replicator_api_wrap:get_missing_revs(Target, Revs) of
+ {ok, Missing} ->
+ % Turn {Id, [Rev1, Rev2, ...], PAs} into a map:
+ % #{{Id, Rev1} => PAs, {Id, Rev2} => PAs, ...}
+ id_rev_map(Missing);
+ {error, Error} ->
+ exit(Error)
+ end.
id_rev_map(IdRevs) ->
id_rev_map(IdRevs, #{}).
@@ -655,68 +716,42 @@ replication_worker_format_status_test() ->
bulk_get_attempt_test() ->
Now = erlang:monotonic_time(second),
- St = #bulk_get_stats{ratio = 0, tsec = Now},
- ?assert(attempt_bulk_get(St#bulk_get_stats{ratio = 0.1}, Now)),
- ?assertNot(attempt_bulk_get(St#bulk_get_stats{ratio = 0.9}, Now)),
+ St = #fetch_stats{ratio = 0, tsec = Now},
+ ?assert(attempt_bulk_get(St#fetch_stats{ratio = 0.1}, Now)),
+ ?assertNot(attempt_bulk_get(St#fetch_stats{ratio = 0.9}, Now)),
RetryTime = Now + ?BULK_GET_RETRY_SEC + 1,
- ?assert(attempt_bulk_get(St#bulk_get_stats{ratio = 0.9}, RetryTime)).
+ ?assert(attempt_bulk_get(St#fetch_stats{ratio = 0.9}, RetryTime)).
update_bulk_get_ratio_test() ->
- Init = #bulk_get_stats{ratio = 0, tsec = 0},
+ Init = #fetch_stats{ratio = 0, tsec = 0},
+ Update = fun(St, Successes, Attempts) ->
+ update_bulk_get_ratio(St, Successes, Attempts, 0)
+ end,
+ Seq = lists:seq(1, 100),
% Almost all failures
- Fail = lists:foldl(
- fun(_, Acc) ->
- update_bulk_get_ratio(Acc, 1, 1000)
- end,
- Init,
- lists:seq(1, 100)
- ),
- ?assert(Fail#bulk_get_stats.ratio > 0.9),
+ Fail = lists:foldl(fun(_, Acc) -> Update(Acc, 1, 1000) end, Init, Seq),
+ ?assert(Fail#fetch_stats.ratio > 0.9),
% Almost all successes
- Success = lists:foldl(
- fun(_, Acc) ->
- update_bulk_get_ratio(Acc, 900, 1000)
- end,
- Init,
- lists:seq(1, 100)
- ),
- ?assert(Success#bulk_get_stats.ratio < 0.1),
+ Success = lists:foldl(fun(_, Acc) -> Update(Acc, 900, 1000) end, Init, Seq),
+ ?assert(Success#fetch_stats.ratio < 0.1),
% Half and half
- Half = lists:foldl(
- fun(_, Acc) ->
- update_bulk_get_ratio(Acc, 500, 1000)
- end,
- Init,
- lists:seq(1, 100)
- ),
- ?assert(Half#bulk_get_stats.ratio > 0.49),
- ?assert(Half#bulk_get_stats.ratio < 0.51),
+ Half = lists:foldl(fun(_, Acc) -> Update(Acc, 500, 1000) end, Init, Seq),
+ ?assert(Half#fetch_stats.ratio > 0.49),
+ ?assert(Half#fetch_stats.ratio < 0.51),
% Successes after failures
- FailThenSuccess = lists:foldl(
- fun(_, Acc) ->
- update_bulk_get_ratio(Acc, 1000, 1000)
- end,
- Fail,
- lists:seq(1, 100)
- ),
- ?assert(FailThenSuccess#bulk_get_stats.ratio < 0.1),
+ FailSuccess = lists:foldl(fun(_, Acc) -> Update(Acc, 1000, 1000) end, Fail, Seq),
+ ?assert(FailSuccess#fetch_stats.ratio < 0.1),
% Failures after success
- SuccessThenFailure = lists:foldl(
- fun(_, Acc) ->
- update_bulk_get_ratio(Acc, 0, 1000)
- end,
- Success,
- lists:seq(1, 100)
- ),
- ?assert(SuccessThenFailure#bulk_get_stats.ratio > 0.9),
+ SuccessFailure = lists:foldl(fun(_, Acc) -> Update(Acc, 0, 1000) end, Success, Seq),
+ ?assert(SuccessFailure#fetch_stats.ratio > 0.9),
% 0 attempts doesn't crash with a division by 0
- ZeroAttempts = update_bulk_get_ratio(Init, 0, 0),
- ?assertEqual(0.0, ZeroAttempts#bulk_get_stats.ratio).
+ ZeroAttempts = Update(Init, 0, 0),
+ ?assertEqual(0.0, ZeroAttempts#fetch_stats.ratio).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_revs_diff_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_revs_diff_tests.erl
new file mode 100644
index 000000000..75f01b20a
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_revs_diff_tests.erl
@@ -0,0 +1,70 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-define(DOC_COUNT, 100).
+-define(BATCH_SIZE, 5).
+revs_diff_test_() ->
+ {
+ "Use _revs_diff when replicating",
+ {
+ foreach,
+ fun couch_replicator_test_helper:test_setup/0,
+ fun couch_replicator_test_helper:test_teardown/1,
+ [
+ ?TDEF_FE(use_revs_diff_when_most_docs_are_present, 15),
+ ?TDEF_FE(skip_revs_diff_when_most_docs_are_missing, 15)
+ ]
+ }
+ }.
+use_revs_diff_when_most_docs_are_present({_Ctx, {Source, Target}}) ->
+ populate_db(Source, ?DOC_COUNT),
+ populate_db(Target, ?DOC_COUNT),
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ replicate(Source, Target),
+ Calls = meck:num_calls(couch_replicator_api_wrap, get_missing_revs, 2),
+ ExpectAtLeast = ?DOC_COUNT / ?BATCH_SIZE,
+ ?assert(Calls >= ExpectAtLeast).
+skip_revs_diff_when_most_docs_are_missing({_Ctx, {Source, Target}}) ->
+ populate_db(Source, ?DOC_COUNT),
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ replicate(Source, Target),
+ Calls = meck:num_calls(couch_replicator_api_wrap, get_missing_revs, 2),
+ % This is not exact. But expect to skip at least half the revs_diffs calls.
+ ExpectAtMost = (?DOC_COUNT / ?BATCH_SIZE) / 2,
+ ?assert(Calls =< ExpectAtMost).
+populate_db(DbName, DocCount) ->
+ Fun = fun(Id, Acc) -> [#doc{id = integer_to_binary(Id)} | Acc] end,
+ Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
+ {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
+db_url(DbName) ->
+ couch_replicator_test_helper:cluster_db_url(DbName).
+replicate(Source, Target) ->
+ couch_replicator_test_helper:replicate(
+ {[
+ {<<"source">>, db_url(Source)},
+ {<<"target">>, db_url(Target)},
+ {<<"worker_processes">>, <<"1">>},
+ {<<"worker_batch_size">>, integer_to_binary(?BATCH_SIZE)}
+ ]}
+ ).