diff options
author | Nick Vatamaniuc <vatamane@gmail.com> | 2022-08-18 21:00:13 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-08-30 21:02:31 -0400 |
commit | be93983b927b543e1728c5d50a66429e2c88d229 (patch) | |
tree | 9bd8e35ec282c2e37a38d980918a7e4289976866 | |
parent | d4c7273e706d5121794fc55ee2633af7f7f02543 (diff) | |
download | couchdb-be93983b927b543e1728c5d50a66429e2c88d229.tar.gz |
Implement _bulk_get support for the replicator
By now most of the CouchDB implementations support `_bulk_get`, so
let's update the replicator to take advantage of that.
To be backwards compatible assume some endpoints will not support
`_bulk_get` and may return either a 500 or 400 error. In that case the
replicator will fall back to fetching individual document revisions
like it did previously. For additional backward compatibility, and to
keep things simple, support only the `application/json` `_bulk_get`
response format. (Ideally, we'd send multiple Accept headers with
various `q` preference parameters for `json` and `multipart/related`
content, then do the right thing based on the response, however, none
of the recent Apache CouchDB implementations support that scheme
properly).
Since fetching attachments with application/json response is not
optimal, attachments are fetched individually. This means there are
two main reasons for the replicator to fall back to fetching
individual revisions: 1) when _bulk_get endpoint is not supported and
2) when the document revisions contain attachments.
To avoid wasting resources repeatedly attempting to use `_bulk_get` and
then falling back to individual doc fetches, maintain some historical
stats about the rate of failure, and if it crosses a threshold, skip
calling `_bulk_get` altogether. This is implemented with a moving
exponential average, along with periodic probing to see if `_bulk_get`
usage becomes viable again.
To give the users some indication about how successful `_bulk_get`
usage is, introduce two replication statistics parameters:
* `bulk_get_attempts`: `_bulk_get` document revision attempts made.
* `bulk_get_docs`: `_bulk_get` document revisions successfully retrieved.
These are persisted in the replication checkpoints along with the rest
of the job statistics and visible in `_scheduler/jobs` and
`_active_tasks` output.
Since we updated the replication job statistics, perform some minor
cleanups in that area:
- Stop using the process dictionary for the reporting timestamp. Use
a regular record state field instead.
- Use casts instead of calls when possible. We still rely on
report_seq_done calls as a synchronization point to make sure we
don't overrun the message queues for the replication worker and
scheduler job process.
- Add stats update API functions instead of relying on naked
`gen_server` calls and casts. The functions make it clear which
process is being updated: the replication worker or the main
replication scheduler job process.
For testing, rely on the variety of existing replication tests running
and passing. The recently merged replication test overhaul (from an earlier pull
request) moved the tests from using the node-local (back-end API) to chttpd (the
cluster API), which actually implements `_bulk_get`. In this way, the
majority of replication tests should test the `_bulk_get` API usage
alongside whatever else they are testing. There is also a new test
checking that `_bulk_get` fallback works and testing the
characteristics of the new statistics parameters.
8 files changed, 382 insertions, 77 deletions
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index 3c956c8de..a6e39cb02 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -35,6 +35,7 @@ update_docs/4, ensure_full_commit/1, get_missing_revs/2, + bulk_get/3, open_doc/3, open_doc_revs/6, changes_since/5, @@ -206,6 +207,74 @@ get_missing_revs(#httpdb{} = Db, IdRevs) -> end ). +bulk_get(#httpdb{} = Db, #{} = IdRevs, Options) -> + FoldFun = fun({Id, Rev}, PAs, Acc) -> [{Id, Rev, PAs} | Acc] end, + ReqDocsList = lists:sort(maps:fold(FoldFun, [], IdRevs)), + MapFun = fun({Id, Rev, PAs}) -> + #{ + <<"id">> => Id, + <<"rev">> => couch_doc:rev_to_str(Rev), + <<"atts_since">> => couch_doc:revs_to_strs(PAs) + } + end, + ReqDocsMaps = lists:map(MapFun, ReqDocsList), + % We are also sending request parameters in the doc body with the hopes + % that at some point in the future we could make that the default, instead + % of having to send query parameters with a POST request as we do today + Body = options_to_json_map(Options, #{<<"docs">> => ReqDocsMaps}), + Req = [ + {method, post}, + {path, "_bulk_get"}, + {qs, options_to_query_args(Options, [])}, + {body, ?JSON_ENCODE(Body)}, + {headers, [ + {"Content-Type", "application/json"}, + {"Accept", "application/json"} + ]} + ], + try + send_req(Db, Req, fun + (200, _, {[{<<"results">>, Res}]}) when is_list(Res) -> + Zip = lists:zipwith(fun bulk_get_zip/2, ReqDocsList, Res), + {ok, maps:from_list(Zip)}; + (200, _, _) -> + {error, {bulk_get_failed, invalid_results}}; + (ErrCode, _, _) when is_integer(ErrCode) -> + % On older Apache CouchDB instances where _bulk_get is not + % implemented we would hit the POST db/doc form uploader + % handler. When that fails the request body is not consumed and + % we'd end up recycling a worker with an unsent body in the + % connection stream. 
Instead of waiting for it to blow up + % eventually and consuming an extra retry attempt, proactively + % advise httpc logic to stop this worker and not return back to + % the pool. + couch_replicator_httpc:stop_http_worker(), + {error, {bulk_get_failed, ErrCode}} + end) + catch + exit:{http_request_failed, _, _, {error, {code, ErrCode}}} -> + % We are being a bit more tolerant of _bulk_get errors as we can + % always fallback to individual fetches + {error, {bulk_get_failed, ErrCode}} + end. + +bulk_get_zip({Id, Rev, _}, {[_ | _] = Props}) -> + Docs = couch_util:get_value(<<"docs">>, Props), + ResId = couch_util:get_value(<<"id">>, Props), + % "docs" is a one item list, either [{"ok": Doc}] or [{"error": Error}] + case Docs of + [{[{<<"ok">>, {[_ | _]} = Doc}]}] when ResId =:= Id -> + {{Id, Rev}, couch_doc:from_json_obj(Doc)}; + [{[{<<"error">>, {[_ | _] = Err}}]}] when ResId =:= Id -> + Tag = couch_util:get_value(<<"error">>, Err), + Reason = couch_util:get_value(<<"reason">>, Err), + couch_log:debug("~p bulk_get zip error ~p:~p", [?MODULE, Tag, Reason]), + {{Id, Rev}, {error, {Tag, Reason}}}; + Other -> + couch_log:debug("~p bulk_get zip other error:~p", [?MODULE, Other]), + {{Id, Rev}, {error, {unexpected_bulk_get_response, Other}}} + end. + -spec open_doc_revs(#httpdb{}, binary(), list(), list(), function(), any()) -> no_return(). open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) -> Path = encode_doc_id(Id), @@ -647,7 +716,19 @@ options_to_query_args([latest | Rest], Acc) -> options_to_query_args(Rest, [{"latest", "true"} | Acc]); options_to_query_args([{open_revs, Revs} | Rest], Acc) -> JsonRevs = ?b2l(iolist_to_binary(?JSON_ENCODE(couch_doc:revs_to_strs(Revs)))), - options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]). 
+ options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]); +options_to_query_args([{attachments, Bool} | Rest], Acc) when is_atom(Bool) -> + BoolStr = atom_to_list(Bool), + options_to_query_args(Rest, [{"attachments", BoolStr} | Acc]). + +options_to_json_map([], #{} = Acc) -> + Acc; +options_to_json_map([latest | Rest], #{} = Acc) -> + options_to_json_map(Rest, Acc#{<<"latest">> => true}); +options_to_json_map([revs | Rest], #{} = Acc) -> + options_to_json_map(Rest, Acc#{<<"revs">> => true}); +options_to_json_map([{attachments, Bool} | Rest], #{} = Acc) when is_atom(Bool) -> + options_to_json_map(Rest, Acc#{<<"attachments">> => Bool}). atts_since_arg(_UrlLen, [], _MaxLen, Acc) -> lists:reverse(Acc); diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl index 7fa8c26c2..bb6733608 100644 --- a/src/couch_replicator/src/couch_replicator_changes_reader.erl +++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl @@ -119,7 +119,7 @@ process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _}) -> [Id, SourceDb] ), Stats = couch_replicator_stats:new([{doc_write_failures, 1}]), - ok = gen_server:call(Parent, {add_stats, Stats}, infinity); + ok = couch_replicator_scheduler_job:sum_stats(Parent, Stats); false -> ok = couch_work_queue:queue(ChangesQueue, DocInfo), put(last_seq, DocInfo#doc_info.high_seq) diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl index c6f22468d..cd5e4d75d 100644 --- a/src/couch_replicator/src/couch_replicator_httpc.erl +++ b/src/couch_replicator/src/couch_replicator_httpc.erl @@ -18,6 +18,7 @@ -export([setup/1]). -export([send_req/3]). +-export([stop_http_worker/0]). -export([full_url/2]). -import(couch_util, [ @@ -102,6 +103,9 @@ send_req(HttpDb, Params1, Callback) -> Ret end. +stop_http_worker() -> + put(?STOP_HTTP_WORKER, stop). 
+ send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) -> Method = get_value(method, Params, get), UserHeaders = get_value(headers, Params, []), diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 2ae8718ad..38de8a45a 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -25,7 +25,9 @@ handle_info/2, handle_cast/2, code_change/3, - format_status/2 + format_status/2, + sum_stats/2, + report_seq_done/3 ]). -include_lib("couch/include/couch_db.hrl"). @@ -181,20 +183,13 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx = UserCtx} = Rep) - workers = Workers }}. -handle_call({add_stats, Stats}, From, State) -> - gen_server:reply(From, ok), - NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats), - {noreply, State#rep_state{stats = NewStats}}; -handle_call( - {report_seq_done, Seq, StatsInc}, - From, +handle_call({report_seq_done, Seq, StatsInc}, From, State) -> #rep_state{ seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone, current_through_seq = ThroughSeq, stats = Stats - } = State -) -> + } = State, gen_server:reply(From, ok), {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of @@ -237,6 +232,9 @@ handle_call( update_task(NewState), {noreply, NewState}. +handle_cast({sum_stats, Stats}, State) -> + NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats), + {noreply, State#rep_state{stats = NewStats}}; handle_cast(checkpoint, State) -> case do_checkpoint(State) of {ok, NewState} -> @@ -467,6 +465,12 @@ format_status(_Opt, [_PDict, State]) -> {highest_seq_done, HighestSeqDone} ]. +sum_stats(Pid, Stats) when is_pid(Pid) -> + gen_server:cast(Pid, {sum_stats, Stats}). + +report_seq_done(Pid, ReportSeq, Stats) when is_pid(Pid) -> + gen_server:call(Pid, {report_seq_done, ReportSeq, Stats}, infinity). 
+ startup_jitter() -> Jitter = config:get_integer( "replicator", @@ -792,7 +796,9 @@ do_checkpoint(State) -> {<<"missing_found">>, couch_replicator_stats:missing_found(Stats)}, {<<"docs_read">>, couch_replicator_stats:docs_read(Stats)}, {<<"docs_written">>, couch_replicator_stats:docs_written(Stats)}, - {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)} + {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}, + {<<"bulk_get_docs">>, couch_replicator_stats:bulk_get_docs(Stats)}, + {<<"bulk_get_attempts">>, couch_replicator_stats:bulk_get_attempts(Stats)} ]}, BaseHistory = [ @@ -1056,6 +1062,8 @@ rep_stats(State) -> {docs_written, couch_replicator_stats:docs_written(Stats)}, {changes_pending, get_pending_count(State)}, {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)}, + {bulk_get_docs, couch_replicator_stats:bulk_get_docs(Stats)}, + {bulk_get_attempts, couch_replicator_stats:bulk_get_attempts(Stats)}, {checkpointed_source_seq, CommittedSeq} ]. diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl index e1f23a1bc..ff8f1b62f 100644 --- a/src/couch_replicator/src/couch_replicator_stats.erl +++ b/src/couch_replicator/src/couch_replicator_stats.erl @@ -26,7 +26,9 @@ missing_found/1, docs_read/1, docs_written/1, - doc_write_failures/1 + doc_write_failures/1, + bulk_get_docs/1, + bulk_get_attempts/1 ]). new() -> @@ -51,6 +53,12 @@ docs_written(Stats) -> doc_write_failures(Stats) -> get(doc_write_failures, Stats). +bulk_get_docs(Stats) -> + get(bulk_get_docs, Stats). + +bulk_get_attempts(Stats) -> + get(bulk_get_attempts, Stats). 
+ get(Field, Stats) -> case orddict:find(Field, Stats) of {ok, Value} -> @@ -84,4 +92,8 @@ fmap({docs_written, _}) -> true; fmap({<<"docs_written">>, V}) -> {true, {docs_written, V}}; fmap({doc_write_failures, _}) -> true; fmap({<<"doc_write_failures">>, V}) -> {true, {doc_write_failures, V}}; +fmap({bulk_get_docs, _}) -> true; +fmap({<<"bulk_get_docs">>, V}) -> {true, {bulk_get_docs, V}}; +fmap({bulk_get_attempts, _}) -> true; +fmap({<<"bulk_get_attempts">>, V}) -> {true, {bulk_get_attempts, V}}; fmap({_, _}) -> false. diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 3c6d6d040..94e3e028b 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -25,13 +25,12 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -% TODO: maybe make both buffer max sizes configurable - -% for remote targets -define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). -% 10 seconds (in microseconds) --define(STATS_DELAY, 10000000). +-define(STATS_DELAY_SEC, 10). -define(MISSING_DOC_RETRY_MSEC, 2000). +-define(BULK_GET_RATIO_THRESHOLD, 0.5). +-define(BULK_GET_RATIO_DECAY, 0.25). +-define(BULK_GET_RETRY_SEC, 37). -import(couch_util, [ to_binary/1, @@ -54,9 +53,24 @@ pending_fetch = nil, flush_waiter = nil, stats = couch_replicator_stats:new(), + last_stats_report_sec = 0, batch = #batch{} }). +-record(bulk_get_stats, { + ratio = 0, + tsec = 0 +}). + +-record(fetch_st, { + source, + target, + parent, + cp, + changes_manager, + bulk_get_stats +}). 
+ start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) -> gen_server:start_link( ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, [] @@ -64,17 +78,22 @@ start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) -> init({Cp, Source, Target, ChangesManager, MaxConns}) -> process_flag(trap_exit, true), - Parent = self(), - LoopPid = spawn_link(fun() -> - queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) - end), - erlang:put(last_stats_report, os:timestamp()), + NowSec = erlang:monotonic_time(second), + FetchSt = #fetch_st{ + cp = Cp, + source = Source, + target = Target, + parent = self(), + changes_manager = ChangesManager, + bulk_get_stats = #bulk_get_stats{ratio = 0, tsec = NowSec} + }, State = #state{ cp = Cp, max_parallel_conns = MaxConns, - loop = LoopPid, + loop = spawn_link(fun() -> queue_fetch_loop(FetchSt) end), source = Source, - target = Target + target = Target, + last_stats_report_sec = NowSec }, {ok, State}. @@ -106,11 +125,6 @@ handle_call( handle_call({batch_doc, Doc}, From, State) -> gen_server:reply(From, ok), {noreply, maybe_flush_docs(Doc, State)}; -handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) -> - gen_server:reply(From, ok), - NewStats = couch_replicator_utils:sum_stats(Stats, IncStats), - NewStats2 = maybe_report_stats(State#state.cp, NewStats), - {noreply, State#state{stats = NewStats2}}; handle_call( flush, {Pid, _} = From, @@ -131,8 +145,11 @@ handle_call( end, {noreply, State2#state{flush_waiter = From}}. +handle_cast({sum_stats, IncStats}, #state{stats = Stats} = State) -> + SumStats = couch_replicator_utils:sum_stats(Stats, IncStats), + {noreply, maybe_report_stats(State#state{stats = SumStats})}; handle_cast(Msg, State) -> - {stop, {unexpected_async_call, Msg}, State}. + {stop, {unexpected_cast, Msg}, State}. 
handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) -> #state{ @@ -188,6 +205,8 @@ handle_info({'EXIT', _Pid, max_backoff}, State) -> {stop, {shutdown, max_backoff}, State}; handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) -> {stop, {shutdown, Err}, State}; +handle_info({'EXIT', _Pid, {bulk_get_failed, _, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) -> {stop, {shutdown, Err}, State}; handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) -> @@ -221,40 +240,109 @@ format_status(_Opt, [_PDict, State]) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> +sum_stats(Pid, Stats) when is_pid(Pid) -> + ok = gen_server:cast(Pid, {sum_stats, Stats}). + +report_seq_done(Cp, Seq) -> + ok = report_seq_done(Cp, Seq, couch_replicator_stats:new()). + +report_seq_done(Cp, Seq, Stats) -> + ok = couch_replicator_scheduler_job:report_seq_done(Cp, Seq, Stats). + +queue_fetch_loop(#fetch_st{} = St) -> + #fetch_st{ + cp = Cp, + source = Source, + target = Target, + parent = Parent, + changes_manager = ChangesManager, + bulk_get_stats = BgSt + } = St, ChangesManager ! 
{get_changes, self()}, receive {closed, ChangesManager} -> ok; {changes, ChangesManager, [], ReportSeq} -> - Stats = couch_replicator_stats:new(), - ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), - queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager); + ok = report_seq_done(Cp, ReportSeq), + queue_fetch_loop(St); {changes, ChangesManager, Changes, ReportSeq} -> - {IdRevs, Stats0} = find_missing(Changes, Target), - ok = gen_server:call(Parent, {add_stats, Stats0}, infinity), - remote_process_batch(IdRevs, Parent), + % Find missing revisions (POST to _revs_diff) + IdRevs = find_missing(Changes, Target, Parent), + {Docs, BgSt1} = bulk_get(Source, IdRevs, Parent, BgSt), + % Documents without attachments can be uploaded right away + BatchFun = fun({_, #doc{} = Doc}) -> + ok = gen_server:call(Parent, {batch_doc, Doc}, infinity) + end, + lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))), + % Fetch individually if _bulk_get failed or there are attachments + FetchFun = fun({Id, Rev}, PAs) -> + ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity) + end, + maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs)), {ok, Stats} = gen_server:call(Parent, flush, infinity), - ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), - erlang:put(last_stats_report, os:timestamp()), + ok = report_seq_done(Cp, ReportSeq, Stats), couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]), - queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) + queue_fetch_loop(St#fetch_st{bulk_get_stats = BgSt1}) end. -remote_process_batch([], _Parent) -> - ok; -remote_process_batch([{Id, Revs, PAs} | Rest], Parent) -> - % When the source is a remote database, we fetch a single document revision - % per HTTP request. This is mostly to facilitate retrying of HTTP requests - % due to network transient failures. It also helps not exceeding the maximum - % URL length allowed by proxies and Mochiweb. 
- lists:foreach( - fun(Rev) -> - ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity) +% Return revisions without attachments. Maintain an exponential moving failure +% ratio. When the ratio becomes greater than the threshold, skip calling +% bulk_get altogether. To avoid getting permanently stuck with a high failure +% ratio after replicating lots of attachments, periodically attempt to use +% _bulk_get. After a few successful attempts that should lower the failure rate +% enough to start allow using _bulk_get again. +% +bulk_get(Source, IdRevs, Parent, #bulk_get_stats{} = St) -> + NowSec = erlang:monotonic_time(second), + case attempt_bulk_get(St, NowSec) of + true -> + Docs = bulk_get(Source, IdRevs), + Attempts = map_size(IdRevs), + Successes = map_size(Docs), + Stats = couch_replicator_stats:new([ + {bulk_get_docs, Successes}, + {bulk_get_attempts, Attempts} + ]), + ok = sum_stats(Parent, Stats), + St1 = update_bulk_get_ratio(St, Successes, Attempts), + {Docs, St1#bulk_get_stats{tsec = NowSec}}; + false -> + {#{}, St} + end. + +bulk_get(#httpdb{} = Source, #{} = IdRevs) -> + Opts = [latest, revs, {attachments, false}], + case couch_replicator_api_wrap:bulk_get(Source, IdRevs, Opts) of + {ok, #{} = Docs} -> + FilterFun = fun + (_, #doc{atts = []}) -> true; + (_, #doc{atts = [_ | _]}) -> false; + (_, {error, _}) -> false + end, + maps:filter(FilterFun, Docs); + {error, Error} -> + couch_log:debug("_bulk_get failed ~p", [Error]), + #{} + end. + +attempt_bulk_get(#bulk_get_stats{} = St, NowSec) -> + #bulk_get_stats{tsec = TSec, ratio = Ratio} = St, + TimeThreshold = (NowSec - TSec) > ?BULK_GET_RETRY_SEC, + RatioThreshold = Ratio =< ?BULK_GET_RATIO_THRESHOLD, + TimeThreshold orelse RatioThreshold. + +% Update fail ratio. Use the basic exponential moving average formula to smooth +% over minor bumps in case we encounter a few % attachments and then get back +% to replicationg documents without attachments. 
+% +update_bulk_get_ratio(#bulk_get_stats{} = St, Successes, Attempts) -> + #bulk_get_stats{ratio = Avg} = St, + Ratio = + case Attempts > 0 of + true -> (Attempts - Successes) / Attempts; + false -> 0 end, - Revs - ), - remote_process_batch(Rest, Parent). + St#bulk_get_stats{ratio = ?BULK_GET_RATIO_DECAY * (Ratio - Avg) + Avg}. -spec spawn_doc_reader(#httpdb{}, #httpdb{}, {list(), list(), list()}) -> no_return(). spawn_doc_reader(Source, Target, FetchParams) -> @@ -331,7 +419,7 @@ doc_handler_flush_doc(#doc{} = Doc, {Parent, Target} = Acc) -> false -> {{skip, Acc}, couch_replicator_stats:increment(doc_write_failures, Stats)} end, - ok = gen_server:call(Parent, {add_stats, Stats2}, infinity), + ok = sum_stats(Parent, Stats2), Result. spawn_writer(Target, #batch{docs = DocList, size = Size}) -> @@ -345,32 +433,30 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) -> spawn_link( fun() -> Stats = flush_docs(Target, DocList), - ok = gen_server:call(Parent, {add_stats, Stats}, infinity) + ok = sum_stats(Parent, Stats) end ). after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) -> gen_server:reply(Waiter, {ok, Stats}), - erlang:put(last_stats_report, os:timestamp()), State#state{ stats = couch_replicator_stats:new(), flush_waiter = nil, writer = nil, - batch = #batch{} + batch = #batch{}, + last_stats_report_sec = erlang:monotonic_time(second) }. maybe_flush_docs(Doc, State) -> #state{ target = Target, batch = Batch, - stats = Stats, - cp = Cp + stats = Stats } = State, {Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc), Stats2 = couch_replicator_stats:sum_stats(Stats, WStats), Stats3 = couch_replicator_stats:increment(docs_read, Stats2), - Stats4 = maybe_report_stats(Cp, Stats3), - State#state{stats = Stats4, batch = Batch2}. + maybe_report_stats(State#state{stats = Stats3, batch = Batch2}). 
maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> #batch{docs = DocAcc, size = SizeAcc} = Batch, @@ -481,7 +567,7 @@ flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) -> {error, Err} end. -find_missing(DocInfos, Target) -> +find_missing(DocInfos, Target, Parent) -> {IdRevs, AllRevsCount} = lists:foldr( fun (#doc_info{revs = []}, {IdRevAcc, CountAcc}) -> @@ -504,21 +590,37 @@ find_missing(DocInfos, Target) -> 0, Missing ), - Stats = couch_replicator_stats:new([ - {missing_checked, AllRevsCount}, - {missing_found, MissingRevsCount} - ]), - {Missing, Stats}. - -maybe_report_stats(Cp, Stats) -> - Now = os:timestamp(), - case timer:now_diff(erlang:get(last_stats_report), Now) >= ?STATS_DELAY of + ok = sum_stats( + Parent, + couch_replicator_stats:new([ + {missing_checked, AllRevsCount}, + {missing_found, MissingRevsCount} + ]) + ), + % Turn {Id, [Rev1, Rev2, ...], PAs} into a map: + % #{{Id, Rev1} => PAs, {Id, Rev2} => PAs, ...} + id_rev_map(Missing). + +id_rev_map(IdRevs) -> + id_rev_map(IdRevs, #{}). + +id_rev_map([], #{} = Acc) -> + Acc; +id_rev_map([{_, [], _} | Docs], #{} = Acc) -> + id_rev_map(Docs, Acc); +id_rev_map([{Id, [Rev | Revs], PAs} | Docs], #{} = Acc) -> + id_rev_map([{Id, Revs, PAs} | Docs], Acc#{{Id, Rev} => PAs}). + +maybe_report_stats(#state{} = State) -> + #state{cp = Cp, stats = Stats, last_stats_report_sec = LastReport} = State, + Now = erlang:monotonic_time(second), + case Now - LastReport >= ?STATS_DELAY_SEC of true -> - ok = gen_server:call(Cp, {add_stats, Stats}, infinity), - erlang:put(last_stats_report, Now), - couch_replicator_stats:new(); + ok = couch_replicator_scheduler_job:sum_stats(Cp, Stats), + NewStats = couch_replicator_stats:new(), + State#state{stats = NewStats, last_stats_report_sec = Now}; false -> - Stats + State end. -ifdef(TEST). 
@@ -544,4 +646,70 @@ replication_worker_format_status_test() -> ?assertEqual(nil, proplists:get_value(pending_fetch, Format)), ?assertEqual(5, proplists:get_value(batch_size, Format)). +bulk_get_attempt_test() -> + Now = erlang:monotonic_time(second), + St = #bulk_get_stats{ratio = 0, tsec = Now}, + ?assert(attempt_bulk_get(St#bulk_get_stats{ratio = 0.1}, Now)), + ?assertNot(attempt_bulk_get(St#bulk_get_stats{ratio = 0.9}, Now)), + RetryTime = Now + ?BULK_GET_RETRY_SEC + 1, + ?assert(attempt_bulk_get(St#bulk_get_stats{ratio = 0.9}, RetryTime)). + +update_bulk_get_ratio_test() -> + Init = #bulk_get_stats{ratio = 0, tsec = 0}, + + % Almost all failures + Fail = lists:foldl( + fun(_, Acc) -> + update_bulk_get_ratio(Acc, 1, 1000) + end, + Init, + lists:seq(1, 100) + ), + ?assert(Fail#bulk_get_stats.ratio > 0.9), + + % Almost all successes + Success = lists:foldl( + fun(_, Acc) -> + update_bulk_get_ratio(Acc, 900, 1000) + end, + Init, + lists:seq(1, 100) + ), + ?assert(Success#bulk_get_stats.ratio < 0.1), + + % Half and half + Half = lists:foldl( + fun(_, Acc) -> + update_bulk_get_ratio(Acc, 500, 1000) + end, + Init, + lists:seq(1, 100) + ), + ?assert(Half#bulk_get_stats.ratio > 0.49), + ?assert(Half#bulk_get_stats.ratio < 0.51), + + % Successes after failures + FailThenSuccess = lists:foldl( + fun(_, Acc) -> + update_bulk_get_ratio(Acc, 1000, 1000) + end, + Fail, + lists:seq(1, 100) + ), + ?assert(FailThenSuccess#bulk_get_stats.ratio < 0.1), + + % Failures after success + SuccessThenFailure = lists:foldl( + fun(_, Acc) -> + update_bulk_get_ratio(Acc, 0, 1000) + end, + Success, + lists:seq(1, 100) + ), + ?assert(SuccessThenFailure#bulk_get_stats.ratio > 0.9), + + % 0 attempts doesn't crash with a division by 0 + ZeroAttempts = update_bulk_get_ratio(Init, 0, 0), + ?assertEqual(0.0, ZeroAttempts#bulk_get_stats.ratio). + -endif. 
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl index 7e198562f..3f454b002 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl @@ -26,6 +26,7 @@ error_reporting_test_() -> ?TDEF_FE(t_fail_bulk_docs), ?TDEF_FE(t_fail_changes_reader), ?TDEF_FE(t_fail_revs_diff), + ?TDEF_FE(t_fail_bulk_get, 15), ?TDEF_FE(t_fail_changes_queue), ?TDEF_FE(t_fail_changes_manager), ?TDEF_FE(t_fail_changes_reader_proc) @@ -74,6 +75,31 @@ t_fail_revs_diff({_Ctx, {Source, Target}}) -> couch_replicator_notifier:stop(Listener). +t_fail_bulk_get({_Ctx, {Source, Target}}) -> + % For _bulk_get the expectation is that the replication job will fallback + % to a plain GET so the shape of the test is a bit different than the other + % tests here. + meck:new(couch_replicator_api_wrap, [passthrough]), + populate_db(Source, 1, 5), + {ok, _} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + % Tolerate a 500 error + mock_fail_req("/_bulk_get", {ok, "501", [], [<<"not_implemented">>]}), + meck:reset(couch_replicator_api_wrap), + populate_db(Source, 6, 6), + wait_target_in_sync(Source, Target), + % Check that there was a fallback to a plain GET + ?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6)), + + % Tolerate a 400 error + mock_fail_req("/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), + meck:reset(couch_replicator_api_wrap), + populate_db(Source, 7, 7), + wait_target_in_sync(Source, Target), + % Check that there was a falback to a plain GET + ?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6)). 
+ t_fail_changes_queue({_Ctx, {Source, Target}}) -> populate_db(Source, 1, 5), {ok, RepId} = replicate(Source, Target), diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl index 1da4dfa02..d1116e8b0 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl @@ -98,6 +98,8 @@ check_active_tasks(DocsRead, DocsWritten, DocsFailed) -> RepTask = wait_for_task_status(DocsWritten), ?assertNotEqual(timeout, RepTask), ?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)), + ?assertEqual(DocsRead, couch_util:get_value(bulk_get_docs, RepTask)), + ?assertEqual(DocsRead, couch_util:get_value(bulk_get_attempts, RepTask)), ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)), ?assertEqual( DocsFailed, @@ -112,12 +114,16 @@ check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) -> ?assert(maps:is_key(<<"changes_pending">>, Info)), ?assert(maps:is_key(<<"doc_write_failures">>, Info)), ?assert(maps:is_key(<<"docs_read">>, Info)), + ?assert(maps:is_key(<<"bulk_get_docs">>, Info)), + ?assert(maps:is_key(<<"bulk_get_attempts">>, Info)), ?assert(maps:is_key(<<"docs_written">>, Info)), ?assert(maps:is_key(<<"missing_revisions_found">>, Info)), ?assert(maps:is_key(<<"checkpointed_source_seq">>, Info)), ?assert(maps:is_key(<<"source_seq">>, Info)), ?assert(maps:is_key(<<"revisions_checked">>, Info)), ?assertMatch(#{<<"docs_read">> := DocsRead}, Info), + ?assertMatch(#{<<"bulk_get_docs">> := DocsRead}, Info), + ?assertMatch(#{<<"bulk_get_attempts">> := DocsRead}, Info), ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info), ?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info). |