diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2022-09-06 02:46:56 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-09-14 12:52:03 -0400 |
commit | 9fab3868de298b44b931a51d791b1d54bdc3c2ed (patch) | |
tree | ea5d6b513d68109b56f18a487059a2089c9f71f9 | |
parent | f1b7ce6370621634ebab4e27218654efac059472 (diff) | |
download | couchdb-9fab3868de298b44b931a51d791b1d54bdc3c2ed.tar.gz |
Statistically skip _revs_diff in the replicator
When the revisions are consistently missing from the target, for
example when replicating to a new target database, it's wasteful to
keep calling _revs_diff. Reuse the same algorithm that is used to skip
calls to _bulk_get when there are too many attachments, in order to
statistically start skipping calls to _revs_diff as well.
Also, use the same record for keeping track of stats, and rename
`bulk_get_stats` to `fetch_stats` accordingly.
The update and attempt functions are re-used as well; there are just
separate wrappers which pass in different values for the threshold and
the other parameters.
-rw-r--r-- | src/couch_replicator/src/couch_replicator_worker.erl | 209 | ||||
-rw-r--r-- | src/couch_replicator/test/eunit/couch_replicator_revs_diff_tests.erl | 70 |
2 files changed, 192 insertions, 87 deletions
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 4df5d1c7c..d8f872388 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -28,9 +28,30 @@ -define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). -define(STATS_DELAY_SEC, 10). -define(MISSING_DOC_RETRY_MSEC, 2000). + +% Coefficients for the bulk_get and revs_diffs stats processing. +% +% Threshold is the ratio limit. Valid ranges are from 0.0 to 1.0. When +% the estimated ratio is greater than the threshold, the "action" is +% performed (_bulk_get or _revs_diff API are used). If it's below the +% limit, the action is not performed. +% +% Decay is the "alpha" of the exponential moving average. Valid ranges +% are from 0.0 to 1.0. The closer to 1.0, the more the latest update +% takes effect; the closer to 0.0, the more historical values persist. +% +% Retries are the forced periodic retry intervals. Even if the +% estimated ratio indicates not to perform the action, periodically +% try to use the API anyway to update the stats with the latest +% information. Both values are at 37 seconds, which is a bit longer +% than the default checkpoint interval. +% -define(BULK_GET_RATIO_THRESHOLD, 0.5). -define(BULK_GET_RATIO_DECAY, 0.25). -define(BULK_GET_RETRY_SEC, 37). +-define(REVS_DIFF_RATIO_THRESHOLD, 0.95). +-define(REVS_DIFF_RATIO_DECAY, 0.4). +-define(REVS_DIFF_RETRY_SEC, 37). -import(couch_util, [ to_binary/1, @@ -57,7 +78,7 @@ batch = #batch{} }). --record(bulk_get_stats, { +-record(fetch_stats, { ratio = 0, tsec = 0 }). @@ -69,7 +90,8 @@ cp, changes_manager, use_bulk_get, - bulk_get_stats + bulk_get_stats = #fetch_stats{}, + revs_diff_stats = #fetch_stats{} }). 
start_link(Cp, #httpdb{} = Source, Target, ChangesManager, [_ | _] = Options) -> @@ -89,7 +111,8 @@ init({Cp, Source, Target, ChangesManager, Options}) -> parent = self(), changes_manager = ChangesManager, use_bulk_get = UseBulkGet, - bulk_get_stats = #bulk_get_stats{ratio = 0, tsec = NowSec} + bulk_get_stats = #fetch_stats{tsec = NowSec}, + revs_diff_stats = #fetch_stats{tsec = NowSec} }, State = #state{ cp = Cp, @@ -261,7 +284,8 @@ queue_fetch_loop(#fetch_st{} = St) -> parent = Parent, changes_manager = ChangesManager, use_bulk_get = UseBulkGet, - bulk_get_stats = BgSt + bulk_get_stats = BgSt, + revs_diff_stats = RdSt } = St, ChangesManager ! {get_changes, self()}, receive @@ -272,7 +296,7 @@ queue_fetch_loop(#fetch_st{} = St) -> queue_fetch_loop(St); {changes, ChangesManager, Changes, ReportSeq} -> % Find missing revisions (POST to _revs_diff) - IdRevs = find_missing(Changes, Target, Parent), + {IdRevs, RdSt1} = find_missing(Changes, Target, Parent, RdSt), {Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs, Parent, BgSt), % Documents without attachments can be uploaded right away BatchFun = fun({_, #doc{} = Doc}) -> @@ -287,7 +311,11 @@ queue_fetch_loop(#fetch_st{} = St) -> {ok, Stats} = gen_server:call(Parent, flush, infinity), ok = report_seq_done(Cp, ReportSeq, Stats), couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]), - queue_fetch_loop(St#fetch_st{bulk_get_stats = BgSt1}) + St1 = St#fetch_st{ + bulk_get_stats = BgSt1, + revs_diff_stats = RdSt1 + }, + queue_fetch_loop(St1) end. % Return revisions without attachments. Maintain an exponential moving failure @@ -297,9 +325,9 @@ queue_fetch_loop(#fetch_st{} = St) -> % _bulk_get. After a few successful attempts that should lower the failure rate % enough to start allow using _bulk_get again. 
% -bulk_get(false, _Source, _IdRevs, _Parent, #bulk_get_stats{} = St) -> +bulk_get(false, _Source, _IdRevs, _Parent, #fetch_stats{} = St) -> {#{}, St}; -bulk_get(true, Source, IdRevs, Parent, #bulk_get_stats{} = St) -> +bulk_get(true, Source, IdRevs, Parent, #fetch_stats{} = St) -> NowSec = erlang:monotonic_time(second), case attempt_bulk_get(St, NowSec) of true -> @@ -311,8 +339,7 @@ bulk_get(true, Source, IdRevs, Parent, #bulk_get_stats{} = St) -> {bulk_get_attempts, Attempts} ]), ok = sum_stats(Parent, Stats), - St1 = update_bulk_get_ratio(St, Successes, Attempts), - {Docs, St1#bulk_get_stats{tsec = NowSec}}; + {Docs, update_bulk_get_ratio(St, Successes, Attempts, NowSec)}; false -> {#{}, St} end. @@ -332,24 +359,36 @@ bulk_get(#httpdb{} = Source, #{} = IdRevs) -> #{} end. -attempt_bulk_get(#bulk_get_stats{} = St, NowSec) -> - #bulk_get_stats{tsec = TSec, ratio = Ratio} = St, - TimeThreshold = (NowSec - TSec) > ?BULK_GET_RETRY_SEC, - RatioThreshold = Ratio =< ?BULK_GET_RATIO_THRESHOLD, +attempt_fetch(#fetch_stats{} = St, NowSec, RetryLimit, RatioLimit) -> + #fetch_stats{tsec = TSec, ratio = Ratio} = St, + TimeThreshold = (NowSec - TSec) > RetryLimit, + RatioThreshold = Ratio =< RatioLimit, TimeThreshold orelse RatioThreshold. +attempt_bulk_get(#fetch_stats{} = St, NowSec) -> + attempt_fetch(St, NowSec, ?BULK_GET_RETRY_SEC, ?BULK_GET_RATIO_THRESHOLD). + +attempt_revs_diff(#fetch_stats{} = St, NowSec) -> + attempt_fetch(St, NowSec, ?REVS_DIFF_RETRY_SEC, ?REVS_DIFF_RATIO_THRESHOLD). + % Update fail ratio. Use the basic exponential moving average formula to smooth % over minor bumps in case we encounter a few % attachments and then get back % to replicationg documents without attachments. 
% -update_bulk_get_ratio(#bulk_get_stats{} = St, Successes, Attempts) -> - #bulk_get_stats{ratio = Avg} = St, +update_fetch_stats(#fetch_stats{} = St, Successes, Attempts, Decay, NowSec) -> + #fetch_stats{ratio = Avg} = St, Ratio = case Attempts > 0 of true -> (Attempts - Successes) / Attempts; false -> 0 end, - St#bulk_get_stats{ratio = ?BULK_GET_RATIO_DECAY * (Ratio - Avg) + Avg}. + St#fetch_stats{ratio = Decay * (Ratio - Avg) + Avg, tsec = NowSec}. + +update_bulk_get_ratio(#fetch_stats{} = St, Successes, Attempts, NowSec) -> + update_fetch_stats(St, Successes, Attempts, ?BULK_GET_RATIO_DECAY, NowSec). + +update_revs_diff_ratio(#fetch_stats{} = St, Successes, Attempts, NowSec) -> + update_fetch_stats(St, Successes, Attempts, ?REVS_DIFF_RATIO_DECAY, NowSec). -spec spawn_doc_reader(#httpdb{}, #httpdb{}, {list(), list(), list()}) -> no_return(). spawn_doc_reader(Source, Target, FetchParams) -> @@ -574,8 +613,8 @@ flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) -> {error, Err} end. -find_missing(DocInfos, Target, Parent) -> - {IdRevs, AllRevsCount} = lists:foldr( +find_missing(DocInfos, Target, Parent, #fetch_stats{} = St) -> + {IdRevs, AllCount} = lists:foldr( fun (#doc_info{revs = []}, {IdRevAcc, CountAcc}) -> {IdRevAcc, CountAcc}; @@ -586,27 +625,49 @@ find_missing(DocInfos, Target, Parent) -> {[], 0}, DocInfos ), - - Missing = - case couch_replicator_api_wrap:get_missing_revs(Target, IdRevs) of - {ok, Result} -> Result; - {error, Error} -> exit(Error) + NowSec = erlang:monotonic_time(second), + {MissingRes, St1} = + case attempt_revs_diff(St, NowSec) of + true -> + Missing = find_missing(IdRevs, Target), + % The target might have some of the revisions and those might have + % attachments associated with them, so only consider missing + % revisions with an empty "possible_ancestors" list. 
+ FoldFun = fun + (_IdRev, _PAs = [_ | _], Acc) -> Acc; + (_IdRev, _PAs = [], Acc) -> Acc + 1 + end, + MissingWithoutPAs = maps:fold(FoldFun, 0, Missing), + % The "success" metric of the update algorithm is the number of + % revisions which are already on target. The higher the number - + % the higher the chance of calling _revs_diff. If it gets lower + % than a threshold, it's worth avoiding calling revs_diff since the + % target seems to be missing the majority of the revisions. + OnTarget = AllCount - MissingWithoutPAs, + {Missing, update_revs_diff_ratio(St, OnTarget, AllCount, NowSec)}; + false -> + % Construct the result to look as if _revs_diff returned with + % all missing revs. To reuse the existing id_revs_map/1 + % function, add the empty PAs list to IdRevs input. + MapFun = fun({Id, Revs}) -> {Id, Revs, []} end, + {id_rev_map(lists:map(MapFun, IdRevs)), St} end, - MissingRevsCount = lists:foldl( - fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end, - 0, - Missing - ), - ok = sum_stats( - Parent, - couch_replicator_stats:new([ - {missing_checked, AllRevsCount}, - {missing_found, MissingRevsCount} - ]) - ), - % Turn {Id, [Rev1, Rev2, ...], PAs} into a map: - % #{{Id, Rev1} => PAs, {Id, Rev2} => PAs, ...} - id_rev_map(Missing). + Stats = couch_replicator_stats:new([ + {missing_checked, AllCount}, + {missing_found, map_size(MissingRes)} + ]), + ok = sum_stats(Parent, Stats), + {MissingRes, St1}. + +find_missing(Revs, Target) -> + case couch_replicator_api_wrap:get_missing_revs(Target, Revs) of + {ok, Missing} -> + % Turn {Id, [Rev1, Rev2, ...], PAs} into a map: + % #{{Id, Rev1} => PAs, {Id, Rev2} => PAs, ...} + id_rev_map(Missing); + {error, Error} -> + exit(Error) + end. id_rev_map(IdRevs) -> id_rev_map(IdRevs, #{}). 
@@ -655,68 +716,42 @@ replication_worker_format_status_test() -> bulk_get_attempt_test() -> Now = erlang:monotonic_time(second), - St = #bulk_get_stats{ratio = 0, tsec = Now}, - ?assert(attempt_bulk_get(St#bulk_get_stats{ratio = 0.1}, Now)), - ?assertNot(attempt_bulk_get(St#bulk_get_stats{ratio = 0.9}, Now)), + St = #fetch_stats{ratio = 0, tsec = Now}, + ?assert(attempt_bulk_get(St#fetch_stats{ratio = 0.1}, Now)), + ?assertNot(attempt_bulk_get(St#fetch_stats{ratio = 0.9}, Now)), RetryTime = Now + ?BULK_GET_RETRY_SEC + 1, - ?assert(attempt_bulk_get(St#bulk_get_stats{ratio = 0.9}, RetryTime)). + ?assert(attempt_bulk_get(St#fetch_stats{ratio = 0.9}, RetryTime)). update_bulk_get_ratio_test() -> - Init = #bulk_get_stats{ratio = 0, tsec = 0}, + Init = #fetch_stats{ratio = 0, tsec = 0}, + Update = fun(St, Successes, Attempts) -> + update_bulk_get_ratio(St, Successes, Attempts, 0) + end, + Seq = lists:seq(1, 100), % Almost all failures - Fail = lists:foldl( - fun(_, Acc) -> - update_bulk_get_ratio(Acc, 1, 1000) - end, - Init, - lists:seq(1, 100) - ), - ?assert(Fail#bulk_get_stats.ratio > 0.9), + Fail = lists:foldl(fun(_, Acc) -> Update(Acc, 1, 1000) end, Init, Seq), + ?assert(Fail#fetch_stats.ratio > 0.9), % Almost all successes - Success = lists:foldl( - fun(_, Acc) -> - update_bulk_get_ratio(Acc, 900, 1000) - end, - Init, - lists:seq(1, 100) - ), - ?assert(Success#bulk_get_stats.ratio < 0.1), + Success = lists:foldl(fun(_, Acc) -> Update(Acc, 900, 1000) end, Init, Seq), + ?assert(Success#fetch_stats.ratio < 0.1), % Half and half - Half = lists:foldl( - fun(_, Acc) -> - update_bulk_get_ratio(Acc, 500, 1000) - end, - Init, - lists:seq(1, 100) - ), - ?assert(Half#bulk_get_stats.ratio > 0.49), - ?assert(Half#bulk_get_stats.ratio < 0.51), + Half = lists:foldl(fun(_, Acc) -> Update(Acc, 500, 1000) end, Init, Seq), + ?assert(Half#fetch_stats.ratio > 0.49), + ?assert(Half#fetch_stats.ratio < 0.51), % Successes after failures - FailThenSuccess = lists:foldl( - fun(_, Acc) -> - 
update_bulk_get_ratio(Acc, 1000, 1000) - end, - Fail, - lists:seq(1, 100) - ), - ?assert(FailThenSuccess#bulk_get_stats.ratio < 0.1), + FailSuccess = lists:foldl(fun(_, Acc) -> Update(Acc, 1000, 1000) end, Fail, Seq), + ?assert(FailSuccess#fetch_stats.ratio < 0.1), % Failures after success - SuccessThenFailure = lists:foldl( - fun(_, Acc) -> - update_bulk_get_ratio(Acc, 0, 1000) - end, - Success, - lists:seq(1, 100) - ), - ?assert(SuccessThenFailure#bulk_get_stats.ratio > 0.9), + SuccessFailure = lists:foldl(fun(_, Acc) -> Update(Acc, 0, 1000) end, Success, Seq), + ?assert(SuccessFailure#fetch_stats.ratio > 0.9), % 0 attempts doesn't crash with a division by 0 - ZeroAttempts = update_bulk_get_ratio(Init, 0, 0), - ?assertEqual(0.0, ZeroAttempts#bulk_get_stats.ratio). + ZeroAttempts = Update(Init, 0, 0), + ?assertEqual(0.0, ZeroAttempts#fetch_stats.ratio). -endif. diff --git a/src/couch_replicator/test/eunit/couch_replicator_revs_diff_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_revs_diff_tests.erl new file mode 100644 index 000000000..75f01b20a --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_revs_diff_tests.erl @@ -0,0 +1,70 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_revs_diff_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include("couch_replicator_test.hrl"). + +-define(DOC_COUNT, 100). +-define(BATCH_SIZE, 5). 
+ +revs_diff_test_() -> + { + "Use _revs_diff when replicating", + { + foreach, + fun couch_replicator_test_helper:test_setup/0, + fun couch_replicator_test_helper:test_teardown/1, + [ + ?TDEF_FE(use_revs_diff_when_most_docs_are_present, 15), + ?TDEF_FE(skip_revs_diff_when_most_docs_are_missing, 15) + ] + } + }. + +use_revs_diff_when_most_docs_are_present({_Ctx, {Source, Target}}) -> + populate_db(Source, ?DOC_COUNT), + populate_db(Target, ?DOC_COUNT), + meck:new(couch_replicator_api_wrap, [passthrough]), + replicate(Source, Target), + Calls = meck:num_calls(couch_replicator_api_wrap, get_missing_revs, 2), + ExpectAtLeast = ?DOC_COUNT / ?BATCH_SIZE, + ?assert(Calls >= ExpectAtLeast). + +skip_revs_diff_when_most_docs_are_missing({_Ctx, {Source, Target}}) -> + populate_db(Source, ?DOC_COUNT), + meck:new(couch_replicator_api_wrap, [passthrough]), + replicate(Source, Target), + Calls = meck:num_calls(couch_replicator_api_wrap, get_missing_revs, 2), + % This is not exact. But expect to skip at least half the revs_diffs calls. + ExpectAtMost = (?DOC_COUNT / ?BATCH_SIZE) / 2, + ?assert(Calls =< ExpectAtMost). + +populate_db(DbName, DocCount) -> + Fun = fun(Id, Acc) -> [#doc{id = integer_to_binary(Id)} | Acc] end, + Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)), + {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]). + +db_url(DbName) -> + couch_replicator_test_helper:cluster_db_url(DbName). + +replicate(Source, Target) -> + couch_replicator_test_helper:replicate( + {[ + {<<"source">>, db_url(Source)}, + {<<"target">>, db_url(Target)}, + {<<"worker_processes">>, <<"1">>}, + {<<"worker_batch_size">>, integer_to_binary(?BATCH_SIZE)} + ]} + ). |