author     Nick Vatamaniuc <vatamane@gmail.com>               2022-08-18 21:00:13 -0400
committer  Nick Vatamaniuc <nickva@users.noreply.github.com>  2022-08-30 21:02:31 -0400
commit     be93983b927b543e1728c5d50a66429e2c88d229 (patch)
tree       9bd8e35ec282c2e37a38d980918a7e4289976866
parent     d4c7273e706d5121794fc55ee2633af7f7f02543 (diff)
download   couchdb-be93983b927b543e1728c5d50a66429e2c88d229.tar.gz
Implement _bulk_get support for the replicator
By now most of the CouchDB implementations support `_bulk_get`, so let's update the replicator to take advantage of that.

To be backwards compatible, assume some endpoints will not support `_bulk_get` and may return either a 500 or a 400 error. In that case the replicator will fall back to fetching individual document revisions like it did previously. For additional backward compatibility, and to keep things simple, support only the `application/json` `_bulk_get` response format. (Ideally, we'd send multiple Accept headers with various `q` preference parameters for `json` and `multipart/related` content, then do the right thing based on the response; however, none of the recent Apache CouchDB implementations support that scheme properly.) Since fetching attachments with an `application/json` response is not optimal, attachments are fetched individually. This means there are two main reasons for the replicator to fall back to fetching individual revisions: 1) the `_bulk_get` endpoint is not supported, and 2) the document revisions contain attachments.

To avoid wasting resources by repeatedly attempting `_bulk_get` and then falling back to individual doc fetches, maintain some historical stats about the rate of failure, and if it crosses a threshold, skip calling `_bulk_get` altogether. This is implemented with an exponential moving average, along with periodic probing to see if `_bulk_get` usage becomes viable again.

To give users some indication of how successful `_bulk_get` usage is, introduce two replication statistics parameters:

 * `bulk_get_attempts`: `_bulk_get` document revision fetch attempts made.
 * `bulk_get_docs`: `_bulk_get` document revisions successfully retrieved.

These are persisted in the replication checkpoints along with the rest of the job statistics and are visible in the `_scheduler/jobs` and `_active_tasks` output.

Since we updated the replication job statistics, perform some minor cleanups in that area:

 - Stop using the process dictionary for the reporting timestamp. Use a regular record state field instead.
 - Use casts instead of calls when possible. We still rely on report_seq_done calls as a synchronization point, to make sure we don't overrun the message queues of the replication worker and scheduler job processes.
 - Add stats update API functions instead of relying on naked `gen_server` calls and casts. The functions make it clear which process is being updated: the replication worker or the main replication scheduler job process.

For testing, rely on the variety of existing replication tests running and passing. The recently merged replication test overhaul switched the tests from using the node-local (back-end) API to chttpd (the cluster API), which actually implements `_bulk_get`. In this way, the majority of replication tests should exercise `_bulk_get` usage alongside whatever else they are testing. There is also a new test checking that the `_bulk_get` fallback works and testing the characteristics of the new statistics parameters.
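As a minimal sketch of the failure-tracking scheme described above (illustrative module and record names only; the real logic lives in couch_replicator_worker.erl in the diff below), the exponential moving average and the skip/probe decision look roughly like this:

    -module(bulk_get_fallback_sketch).
    -export([update_ratio/3, attempt_bulk_get/2]).

    %% Constants mirroring the values used in the diff below: skip _bulk_get
    %% once the smoothed failure ratio exceeds the threshold, smooth with the
    %% decay factor, and probe the endpoint again after the retry interval.
    -define(THRESHOLD, 0.5).
    -define(DECAY, 0.25).
    -define(RETRY_SEC, 37).

    -record(st, {ratio = 0.0, tsec = 0}).

    %% Fold the latest batch result into the moving average:
    %%   NewAvg = Decay * (BatchFailureRatio - OldAvg) + OldAvg
    update_ratio(#st{ratio = Avg} = St, Successes, Attempts) ->
        Ratio =
            case Attempts > 0 of
                true -> (Attempts - Successes) / Attempts;
                false -> 0
            end,
        St#st{ratio = ?DECAY * (Ratio - Avg) + Avg}.

    %% Use _bulk_get while the smoothed failure ratio stays at or below the
    %% threshold, or probe it again once the retry interval has elapsed.
    attempt_bulk_get(#st{ratio = Ratio, tsec = TSec}, NowSec) ->
        NowSec - TSec > ?RETRY_SEC orelse Ratio =< ?THRESHOLD.

A batch of mostly failed attempts (for example, heavy attachment traffic) drives the ratio toward 1.0 and disables `_bulk_get` calls, while the retry interval lets a later run of successes pull the ratio back under the threshold.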
-rw-r--r--  src/couch_replicator/src/couch_replicator_api_wrap.erl  83
-rw-r--r--  src/couch_replicator/src/couch_replicator_changes_reader.erl  2
-rw-r--r--  src/couch_replicator/src/couch_replicator_httpc.erl  4
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler_job.erl  30
-rw-r--r--  src/couch_replicator/src/couch_replicator_stats.erl  14
-rw-r--r--  src/couch_replicator/src/couch_replicator_worker.erl  294
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl  26
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl  6
8 files changed, 382 insertions, 77 deletions
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 3c956c8de..a6e39cb02 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -35,6 +35,7 @@
update_docs/4,
ensure_full_commit/1,
get_missing_revs/2,
+ bulk_get/3,
open_doc/3,
open_doc_revs/6,
changes_since/5,
@@ -206,6 +207,74 @@ get_missing_revs(#httpdb{} = Db, IdRevs) ->
end
).
+bulk_get(#httpdb{} = Db, #{} = IdRevs, Options) ->
+ FoldFun = fun({Id, Rev}, PAs, Acc) -> [{Id, Rev, PAs} | Acc] end,
+ ReqDocsList = lists:sort(maps:fold(FoldFun, [], IdRevs)),
+ MapFun = fun({Id, Rev, PAs}) ->
+ #{
+ <<"id">> => Id,
+ <<"rev">> => couch_doc:rev_to_str(Rev),
+ <<"atts_since">> => couch_doc:revs_to_strs(PAs)
+ }
+ end,
+ ReqDocsMaps = lists:map(MapFun, ReqDocsList),
+    % We are also sending request parameters in the doc body in the hope
+ % that at some point in the future we could make that the default, instead
+ % of having to send query parameters with a POST request as we do today
+ Body = options_to_json_map(Options, #{<<"docs">> => ReqDocsMaps}),
+ Req = [
+ {method, post},
+ {path, "_bulk_get"},
+ {qs, options_to_query_args(Options, [])},
+ {body, ?JSON_ENCODE(Body)},
+ {headers, [
+ {"Content-Type", "application/json"},
+ {"Accept", "application/json"}
+ ]}
+ ],
+ try
+ send_req(Db, Req, fun
+ (200, _, {[{<<"results">>, Res}]}) when is_list(Res) ->
+ Zip = lists:zipwith(fun bulk_get_zip/2, ReqDocsList, Res),
+ {ok, maps:from_list(Zip)};
+ (200, _, _) ->
+ {error, {bulk_get_failed, invalid_results}};
+ (ErrCode, _, _) when is_integer(ErrCode) ->
+ % On older Apache CouchDB instances where _bulk_get is not
+ % implemented we would hit the POST db/doc form uploader
+ % handler. When that fails the request body is not consumed and
+ % we'd end up recycling a worker with an unsent body in the
+ % connection stream. Instead of waiting for it to blow up
+ % eventually and consuming an extra retry attempt, proactively
+                % advise the httpc logic to stop this worker and not return it
+                % to the pool.
+ couch_replicator_httpc:stop_http_worker(),
+ {error, {bulk_get_failed, ErrCode}}
+ end)
+ catch
+ exit:{http_request_failed, _, _, {error, {code, ErrCode}}} ->
+ % We are being a bit more tolerant of _bulk_get errors as we can
+            % always fall back to individual fetches
+ {error, {bulk_get_failed, ErrCode}}
+ end.
+
+bulk_get_zip({Id, Rev, _}, {[_ | _] = Props}) ->
+ Docs = couch_util:get_value(<<"docs">>, Props),
+ ResId = couch_util:get_value(<<"id">>, Props),
+ % "docs" is a one item list, either [{"ok": Doc}] or [{"error": Error}]
+ case Docs of
+ [{[{<<"ok">>, {[_ | _]} = Doc}]}] when ResId =:= Id ->
+ {{Id, Rev}, couch_doc:from_json_obj(Doc)};
+ [{[{<<"error">>, {[_ | _] = Err}}]}] when ResId =:= Id ->
+ Tag = couch_util:get_value(<<"error">>, Err),
+ Reason = couch_util:get_value(<<"reason">>, Err),
+ couch_log:debug("~p bulk_get zip error ~p:~p", [?MODULE, Tag, Reason]),
+ {{Id, Rev}, {error, {Tag, Reason}}};
+ Other ->
+ couch_log:debug("~p bulk_get zip other error:~p", [?MODULE, Other]),
+ {{Id, Rev}, {error, {unexpected_bulk_get_response, Other}}}
+ end.
+
-spec open_doc_revs(#httpdb{}, binary(), list(), list(), function(), any()) -> no_return().
open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) ->
Path = encode_doc_id(Id),
@@ -647,7 +716,19 @@ options_to_query_args([latest | Rest], Acc) ->
options_to_query_args(Rest, [{"latest", "true"} | Acc]);
options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
JsonRevs = ?b2l(iolist_to_binary(?JSON_ENCODE(couch_doc:revs_to_strs(Revs)))),
- options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
+ options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]);
+options_to_query_args([{attachments, Bool} | Rest], Acc) when is_atom(Bool) ->
+ BoolStr = atom_to_list(Bool),
+ options_to_query_args(Rest, [{"attachments", BoolStr} | Acc]).
+
+options_to_json_map([], #{} = Acc) ->
+ Acc;
+options_to_json_map([latest | Rest], #{} = Acc) ->
+ options_to_json_map(Rest, Acc#{<<"latest">> => true});
+options_to_json_map([revs | Rest], #{} = Acc) ->
+ options_to_json_map(Rest, Acc#{<<"revs">> => true});
+options_to_json_map([{attachments, Bool} | Rest], #{} = Acc) when is_atom(Bool) ->
+ options_to_json_map(Rest, Acc#{<<"attachments">> => Bool}).
atts_since_arg(_UrlLen, [], _MaxLen, Acc) ->
lists:reverse(Acc);
diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl
index 7fa8c26c2..bb6733608 100644
--- a/src/couch_replicator/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl
@@ -119,7 +119,7 @@ process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _}) ->
[Id, SourceDb]
),
Stats = couch_replicator_stats:new([{doc_write_failures, 1}]),
- ok = gen_server:call(Parent, {add_stats, Stats}, infinity);
+ ok = couch_replicator_scheduler_job:sum_stats(Parent, Stats);
false ->
ok = couch_work_queue:queue(ChangesQueue, DocInfo),
put(last_seq, DocInfo#doc_info.high_seq)
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index c6f22468d..cd5e4d75d 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -18,6 +18,7 @@
-export([setup/1]).
-export([send_req/3]).
+-export([stop_http_worker/0]).
-export([full_url/2]).
-import(couch_util, [
@@ -102,6 +103,9 @@ send_req(HttpDb, Params1, Callback) ->
Ret
end.
+stop_http_worker() ->
+ put(?STOP_HTTP_WORKER, stop).
+
send_ibrowse_req(#httpdb{headers = BaseHeaders} = HttpDb0, Params) ->
Method = get_value(method, Params, get),
UserHeaders = get_value(headers, Params, []),
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 2ae8718ad..38de8a45a 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -25,7 +25,9 @@
handle_info/2,
handle_cast/2,
code_change/3,
- format_status/2
+ format_status/2,
+ sum_stats/2,
+ report_seq_done/3
]).
-include_lib("couch/include/couch_db.hrl").
@@ -181,20 +183,13 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx = UserCtx} = Rep) -
workers = Workers
}}.
-handle_call({add_stats, Stats}, From, State) ->
- gen_server:reply(From, ok),
- NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
- {noreply, State#rep_state{stats = NewStats}};
-handle_call(
- {report_seq_done, Seq, StatsInc},
- From,
+handle_call({report_seq_done, Seq, StatsInc}, From, State) ->
#rep_state{
seqs_in_progress = SeqsInProgress,
highest_seq_done = HighestDone,
current_through_seq = ThroughSeq,
stats = Stats
- } = State
-) ->
+ } = State,
gen_server:reply(From, ok),
{NewThroughSeq0, NewSeqsInProgress} =
case SeqsInProgress of
@@ -237,6 +232,9 @@ handle_call(
update_task(NewState),
{noreply, NewState}.
+handle_cast({sum_stats, Stats}, State) ->
+ NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
+ {noreply, State#rep_state{stats = NewStats}};
handle_cast(checkpoint, State) ->
case do_checkpoint(State) of
{ok, NewState} ->
@@ -467,6 +465,12 @@ format_status(_Opt, [_PDict, State]) ->
{highest_seq_done, HighestSeqDone}
].
+sum_stats(Pid, Stats) when is_pid(Pid) ->
+ gen_server:cast(Pid, {sum_stats, Stats}).
+
+report_seq_done(Pid, ReportSeq, Stats) when is_pid(Pid) ->
+ gen_server:call(Pid, {report_seq_done, ReportSeq, Stats}, infinity).
+
startup_jitter() ->
Jitter = config:get_integer(
"replicator",
@@ -792,7 +796,9 @@ do_checkpoint(State) ->
{<<"missing_found">>, couch_replicator_stats:missing_found(Stats)},
{<<"docs_read">>, couch_replicator_stats:docs_read(Stats)},
{<<"docs_written">>, couch_replicator_stats:docs_written(Stats)},
- {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)}
+ {<<"doc_write_failures">>, couch_replicator_stats:doc_write_failures(Stats)},
+ {<<"bulk_get_docs">>, couch_replicator_stats:bulk_get_docs(Stats)},
+ {<<"bulk_get_attempts">>, couch_replicator_stats:bulk_get_attempts(Stats)}
]},
BaseHistory =
[
@@ -1056,6 +1062,8 @@ rep_stats(State) ->
{docs_written, couch_replicator_stats:docs_written(Stats)},
{changes_pending, get_pending_count(State)},
{doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
+ {bulk_get_docs, couch_replicator_stats:bulk_get_docs(Stats)},
+ {bulk_get_attempts, couch_replicator_stats:bulk_get_attempts(Stats)},
{checkpointed_source_seq, CommittedSeq}
].
diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl
index e1f23a1bc..ff8f1b62f 100644
--- a/src/couch_replicator/src/couch_replicator_stats.erl
+++ b/src/couch_replicator/src/couch_replicator_stats.erl
@@ -26,7 +26,9 @@
missing_found/1,
docs_read/1,
docs_written/1,
- doc_write_failures/1
+ doc_write_failures/1,
+ bulk_get_docs/1,
+ bulk_get_attempts/1
]).
new() ->
@@ -51,6 +53,12 @@ docs_written(Stats) ->
doc_write_failures(Stats) ->
get(doc_write_failures, Stats).
+bulk_get_docs(Stats) ->
+ get(bulk_get_docs, Stats).
+
+bulk_get_attempts(Stats) ->
+ get(bulk_get_attempts, Stats).
+
get(Field, Stats) ->
case orddict:find(Field, Stats) of
{ok, Value} ->
@@ -84,4 +92,8 @@ fmap({docs_written, _}) -> true;
fmap({<<"docs_written">>, V}) -> {true, {docs_written, V}};
fmap({doc_write_failures, _}) -> true;
fmap({<<"doc_write_failures">>, V}) -> {true, {doc_write_failures, V}};
+fmap({bulk_get_docs, _}) -> true;
+fmap({<<"bulk_get_docs">>, V}) -> {true, {bulk_get_docs, V}};
+fmap({bulk_get_attempts, _}) -> true;
+fmap({<<"bulk_get_attempts">>, V}) -> {true, {bulk_get_attempts, V}};
fmap({_, _}) -> false.
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 3c6d6d040..94e3e028b 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -25,13 +25,12 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-% TODO: maybe make both buffer max sizes configurable
-
-% for remote targets
-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024).
-% 10 seconds (in microseconds)
--define(STATS_DELAY, 10000000).
+-define(STATS_DELAY_SEC, 10).
-define(MISSING_DOC_RETRY_MSEC, 2000).
+-define(BULK_GET_RATIO_THRESHOLD, 0.5).
+-define(BULK_GET_RATIO_DECAY, 0.25).
+-define(BULK_GET_RETRY_SEC, 37).
-import(couch_util, [
to_binary/1,
@@ -54,9 +53,24 @@
pending_fetch = nil,
flush_waiter = nil,
stats = couch_replicator_stats:new(),
+ last_stats_report_sec = 0,
batch = #batch{}
}).
+-record(bulk_get_stats, {
+ ratio = 0,
+ tsec = 0
+}).
+
+-record(fetch_st, {
+ source,
+ target,
+ parent,
+ cp,
+ changes_manager,
+ bulk_get_stats
+}).
+
start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) ->
gen_server:start_link(
?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []
@@ -64,17 +78,22 @@ start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) ->
init({Cp, Source, Target, ChangesManager, MaxConns}) ->
process_flag(trap_exit, true),
- Parent = self(),
- LoopPid = spawn_link(fun() ->
- queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
- end),
- erlang:put(last_stats_report, os:timestamp()),
+ NowSec = erlang:monotonic_time(second),
+ FetchSt = #fetch_st{
+ cp = Cp,
+ source = Source,
+ target = Target,
+ parent = self(),
+ changes_manager = ChangesManager,
+ bulk_get_stats = #bulk_get_stats{ratio = 0, tsec = NowSec}
+ },
State = #state{
cp = Cp,
max_parallel_conns = MaxConns,
- loop = LoopPid,
+ loop = spawn_link(fun() -> queue_fetch_loop(FetchSt) end),
source = Source,
- target = Target
+ target = Target,
+ last_stats_report_sec = NowSec
},
{ok, State}.
@@ -106,11 +125,6 @@ handle_call(
handle_call({batch_doc, Doc}, From, State) ->
gen_server:reply(From, ok),
{noreply, maybe_flush_docs(Doc, State)};
-handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) ->
- gen_server:reply(From, ok),
- NewStats = couch_replicator_utils:sum_stats(Stats, IncStats),
- NewStats2 = maybe_report_stats(State#state.cp, NewStats),
- {noreply, State#state{stats = NewStats2}};
handle_call(
flush,
{Pid, _} = From,
@@ -131,8 +145,11 @@ handle_call(
end,
{noreply, State2#state{flush_waiter = From}}.
+handle_cast({sum_stats, IncStats}, #state{stats = Stats} = State) ->
+ SumStats = couch_replicator_utils:sum_stats(Stats, IncStats),
+ {noreply, maybe_report_stats(State#state{stats = SumStats})};
handle_cast(Msg, State) ->
- {stop, {unexpected_async_call, Msg}, State}.
+ {stop, {unexpected_cast, Msg}, State}.
handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) ->
#state{
@@ -188,6 +205,8 @@ handle_info({'EXIT', _Pid, max_backoff}, State) ->
{stop, {shutdown, max_backoff}, State};
handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) ->
{stop, {shutdown, Err}, State};
+handle_info({'EXIT', _Pid, {bulk_get_failed, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) ->
{stop, {shutdown, Err}, State};
handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) ->
@@ -221,40 +240,109 @@ format_status(_Opt, [_PDict, State]) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
+sum_stats(Pid, Stats) when is_pid(Pid) ->
+ ok = gen_server:cast(Pid, {sum_stats, Stats}).
+
+report_seq_done(Cp, Seq) ->
+ ok = report_seq_done(Cp, Seq, couch_replicator_stats:new()).
+
+report_seq_done(Cp, Seq, Stats) ->
+ ok = couch_replicator_scheduler_job:report_seq_done(Cp, Seq, Stats).
+
+queue_fetch_loop(#fetch_st{} = St) ->
+ #fetch_st{
+ cp = Cp,
+ source = Source,
+ target = Target,
+ parent = Parent,
+ changes_manager = ChangesManager,
+ bulk_get_stats = BgSt
+ } = St,
ChangesManager ! {get_changes, self()},
receive
{closed, ChangesManager} ->
ok;
{changes, ChangesManager, [], ReportSeq} ->
- Stats = couch_replicator_stats:new(),
- ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
- queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager);
+ ok = report_seq_done(Cp, ReportSeq),
+ queue_fetch_loop(St);
{changes, ChangesManager, Changes, ReportSeq} ->
- {IdRevs, Stats0} = find_missing(Changes, Target),
- ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
- remote_process_batch(IdRevs, Parent),
+ % Find missing revisions (POST to _revs_diff)
+ IdRevs = find_missing(Changes, Target, Parent),
+ {Docs, BgSt1} = bulk_get(Source, IdRevs, Parent, BgSt),
+ % Documents without attachments can be uploaded right away
+ BatchFun = fun({_, #doc{} = Doc}) ->
+ ok = gen_server:call(Parent, {batch_doc, Doc}, infinity)
+ end,
+ lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))),
+ % Fetch individually if _bulk_get failed or there are attachments
+ FetchFun = fun({Id, Rev}, PAs) ->
+ ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity)
+ end,
+ maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs)),
{ok, Stats} = gen_server:call(Parent, flush, infinity),
- ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
- erlang:put(last_stats_report, os:timestamp()),
+ ok = report_seq_done(Cp, ReportSeq, Stats),
couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]),
- queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
+ queue_fetch_loop(St#fetch_st{bulk_get_stats = BgSt1})
end.
-remote_process_batch([], _Parent) ->
- ok;
-remote_process_batch([{Id, Revs, PAs} | Rest], Parent) ->
- % When the source is a remote database, we fetch a single document revision
- % per HTTP request. This is mostly to facilitate retrying of HTTP requests
- % due to network transient failures. It also helps not exceeding the maximum
- % URL length allowed by proxies and Mochiweb.
- lists:foreach(
- fun(Rev) ->
- ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity)
+% Return revisions without attachments. Maintain an exponential moving failure
+% ratio. When the ratio becomes greater than the threshold, skip calling
+% bulk_get altogether. To avoid getting permanently stuck with a high failure
+% ratio after replicating lots of attachments, periodically attempt to use
+% _bulk_get. After a few successful attempts, that should lower the failure rate
+% enough to start allowing _bulk_get again.
+%
+bulk_get(Source, IdRevs, Parent, #bulk_get_stats{} = St) ->
+ NowSec = erlang:monotonic_time(second),
+ case attempt_bulk_get(St, NowSec) of
+ true ->
+ Docs = bulk_get(Source, IdRevs),
+ Attempts = map_size(IdRevs),
+ Successes = map_size(Docs),
+ Stats = couch_replicator_stats:new([
+ {bulk_get_docs, Successes},
+ {bulk_get_attempts, Attempts}
+ ]),
+ ok = sum_stats(Parent, Stats),
+ St1 = update_bulk_get_ratio(St, Successes, Attempts),
+ {Docs, St1#bulk_get_stats{tsec = NowSec}};
+ false ->
+ {#{}, St}
+ end.
+
+bulk_get(#httpdb{} = Source, #{} = IdRevs) ->
+ Opts = [latest, revs, {attachments, false}],
+ case couch_replicator_api_wrap:bulk_get(Source, IdRevs, Opts) of
+ {ok, #{} = Docs} ->
+ FilterFun = fun
+ (_, #doc{atts = []}) -> true;
+ (_, #doc{atts = [_ | _]}) -> false;
+ (_, {error, _}) -> false
+ end,
+ maps:filter(FilterFun, Docs);
+ {error, Error} ->
+ couch_log:debug("_bulk_get failed ~p", [Error]),
+ #{}
+ end.
+
+attempt_bulk_get(#bulk_get_stats{} = St, NowSec) ->
+ #bulk_get_stats{tsec = TSec, ratio = Ratio} = St,
+ TimeThreshold = (NowSec - TSec) > ?BULK_GET_RETRY_SEC,
+ RatioThreshold = Ratio =< ?BULK_GET_RATIO_THRESHOLD,
+ TimeThreshold orelse RatioThreshold.
+
+% Update the failure ratio. Use the basic exponential moving average formula to
+% smooth over minor bumps in case we encounter a few docs with attachments and
+% then get back to replicating documents without attachments.
+%
+update_bulk_get_ratio(#bulk_get_stats{} = St, Successes, Attempts) ->
+ #bulk_get_stats{ratio = Avg} = St,
+ Ratio =
+ case Attempts > 0 of
+ true -> (Attempts - Successes) / Attempts;
+ false -> 0
end,
- Revs
- ),
- remote_process_batch(Rest, Parent).
+ St#bulk_get_stats{ratio = ?BULK_GET_RATIO_DECAY * (Ratio - Avg) + Avg}.
-spec spawn_doc_reader(#httpdb{}, #httpdb{}, {list(), list(), list()}) -> no_return().
spawn_doc_reader(Source, Target, FetchParams) ->
@@ -331,7 +419,7 @@ doc_handler_flush_doc(#doc{} = Doc, {Parent, Target} = Acc) ->
false ->
{{skip, Acc}, couch_replicator_stats:increment(doc_write_failures, Stats)}
end,
- ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
+ ok = sum_stats(Parent, Stats2),
Result.
spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
@@ -345,32 +433,30 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
spawn_link(
fun() ->
Stats = flush_docs(Target, DocList),
- ok = gen_server:call(Parent, {add_stats, Stats}, infinity)
+ ok = sum_stats(Parent, Stats)
end
).
after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
gen_server:reply(Waiter, {ok, Stats}),
- erlang:put(last_stats_report, os:timestamp()),
State#state{
stats = couch_replicator_stats:new(),
flush_waiter = nil,
writer = nil,
- batch = #batch{}
+ batch = #batch{},
+ last_stats_report_sec = erlang:monotonic_time(second)
}.
maybe_flush_docs(Doc, State) ->
#state{
target = Target,
batch = Batch,
- stats = Stats,
- cp = Cp
+ stats = Stats
} = State,
{Batch2, WStats} = maybe_flush_docs(Target, Batch, Doc),
Stats2 = couch_replicator_stats:sum_stats(Stats, WStats),
Stats3 = couch_replicator_stats:increment(docs_read, Stats2),
- Stats4 = maybe_report_stats(Cp, Stats3),
- State#state{stats = Stats4, batch = Batch2}.
+ maybe_report_stats(State#state{stats = Stats3, batch = Batch2}).
maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
#batch{docs = DocAcc, size = SizeAcc} = Batch,
@@ -481,7 +567,7 @@ flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
{error, Err}
end.
-find_missing(DocInfos, Target) ->
+find_missing(DocInfos, Target, Parent) ->
{IdRevs, AllRevsCount} = lists:foldr(
fun
(#doc_info{revs = []}, {IdRevAcc, CountAcc}) ->
@@ -504,21 +590,37 @@ find_missing(DocInfos, Target) ->
0,
Missing
),
- Stats = couch_replicator_stats:new([
- {missing_checked, AllRevsCount},
- {missing_found, MissingRevsCount}
- ]),
- {Missing, Stats}.
-
-maybe_report_stats(Cp, Stats) ->
- Now = os:timestamp(),
- case timer:now_diff(erlang:get(last_stats_report), Now) >= ?STATS_DELAY of
+ ok = sum_stats(
+ Parent,
+ couch_replicator_stats:new([
+ {missing_checked, AllRevsCount},
+ {missing_found, MissingRevsCount}
+ ])
+ ),
+ % Turn {Id, [Rev1, Rev2, ...], PAs} into a map:
+ % #{{Id, Rev1} => PAs, {Id, Rev2} => PAs, ...}
+ id_rev_map(Missing).
+
+id_rev_map(IdRevs) ->
+ id_rev_map(IdRevs, #{}).
+
+id_rev_map([], #{} = Acc) ->
+ Acc;
+id_rev_map([{_, [], _} | Docs], #{} = Acc) ->
+ id_rev_map(Docs, Acc);
+id_rev_map([{Id, [Rev | Revs], PAs} | Docs], #{} = Acc) ->
+ id_rev_map([{Id, Revs, PAs} | Docs], Acc#{{Id, Rev} => PAs}).
+
+maybe_report_stats(#state{} = State) ->
+ #state{cp = Cp, stats = Stats, last_stats_report_sec = LastReport} = State,
+ Now = erlang:monotonic_time(second),
+ case Now - LastReport >= ?STATS_DELAY_SEC of
true ->
- ok = gen_server:call(Cp, {add_stats, Stats}, infinity),
- erlang:put(last_stats_report, Now),
- couch_replicator_stats:new();
+ ok = couch_replicator_scheduler_job:sum_stats(Cp, Stats),
+ NewStats = couch_replicator_stats:new(),
+ State#state{stats = NewStats, last_stats_report_sec = Now};
false ->
- Stats
+ State
end.
-ifdef(TEST).
@@ -544,4 +646,70 @@ replication_worker_format_status_test() ->
?assertEqual(nil, proplists:get_value(pending_fetch, Format)),
?assertEqual(5, proplists:get_value(batch_size, Format)).
+bulk_get_attempt_test() ->
+ Now = erlang:monotonic_time(second),
+ St = #bulk_get_stats{ratio = 0, tsec = Now},
+ ?assert(attempt_bulk_get(St#bulk_get_stats{ratio = 0.1}, Now)),
+ ?assertNot(attempt_bulk_get(St#bulk_get_stats{ratio = 0.9}, Now)),
+ RetryTime = Now + ?BULK_GET_RETRY_SEC + 1,
+ ?assert(attempt_bulk_get(St#bulk_get_stats{ratio = 0.9}, RetryTime)).
+
+update_bulk_get_ratio_test() ->
+ Init = #bulk_get_stats{ratio = 0, tsec = 0},
+
+ % Almost all failures
+ Fail = lists:foldl(
+ fun(_, Acc) ->
+ update_bulk_get_ratio(Acc, 1, 1000)
+ end,
+ Init,
+ lists:seq(1, 100)
+ ),
+ ?assert(Fail#bulk_get_stats.ratio > 0.9),
+
+ % Almost all successes
+ Success = lists:foldl(
+ fun(_, Acc) ->
+ update_bulk_get_ratio(Acc, 900, 1000)
+ end,
+ Init,
+ lists:seq(1, 100)
+ ),
+ ?assert(Success#bulk_get_stats.ratio < 0.1),
+
+ % Half and half
+ Half = lists:foldl(
+ fun(_, Acc) ->
+ update_bulk_get_ratio(Acc, 500, 1000)
+ end,
+ Init,
+ lists:seq(1, 100)
+ ),
+ ?assert(Half#bulk_get_stats.ratio > 0.49),
+ ?assert(Half#bulk_get_stats.ratio < 0.51),
+
+ % Successes after failures
+ FailThenSuccess = lists:foldl(
+ fun(_, Acc) ->
+ update_bulk_get_ratio(Acc, 1000, 1000)
+ end,
+ Fail,
+ lists:seq(1, 100)
+ ),
+ ?assert(FailThenSuccess#bulk_get_stats.ratio < 0.1),
+
+ % Failures after success
+ SuccessThenFailure = lists:foldl(
+ fun(_, Acc) ->
+ update_bulk_get_ratio(Acc, 0, 1000)
+ end,
+ Success,
+ lists:seq(1, 100)
+ ),
+ ?assert(SuccessThenFailure#bulk_get_stats.ratio > 0.9),
+
+ % 0 attempts doesn't crash with a division by 0
+ ZeroAttempts = update_bulk_get_ratio(Init, 0, 0),
+ ?assertEqual(0.0, ZeroAttempts#bulk_get_stats.ratio).
+
-endif.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index 7e198562f..3f454b002 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -26,6 +26,7 @@ error_reporting_test_() ->
?TDEF_FE(t_fail_bulk_docs),
?TDEF_FE(t_fail_changes_reader),
?TDEF_FE(t_fail_revs_diff),
+ ?TDEF_FE(t_fail_bulk_get, 15),
?TDEF_FE(t_fail_changes_queue),
?TDEF_FE(t_fail_changes_manager),
?TDEF_FE(t_fail_changes_reader_proc)
@@ -74,6 +75,31 @@ t_fail_revs_diff({_Ctx, {Source, Target}}) ->
couch_replicator_notifier:stop(Listener).
+t_fail_bulk_get({_Ctx, {Source, Target}}) ->
+    % For _bulk_get the expectation is that the replication job will fall back
+    % to a plain GET, so the shape of the test is a bit different from the
+    % other tests here.
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ populate_db(Source, 1, 5),
+ {ok, _} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+    % Tolerate a 5xx class error
+ mock_fail_req("/_bulk_get", {ok, "501", [], [<<"not_implemented">>]}),
+ meck:reset(couch_replicator_api_wrap),
+ populate_db(Source, 6, 6),
+ wait_target_in_sync(Source, Target),
+ % Check that there was a fallback to a plain GET
+ ?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6)),
+
+    % Tolerate a 4xx class error
+ mock_fail_req("/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+ meck:reset(couch_replicator_api_wrap),
+ populate_db(Source, 7, 7),
+ wait_target_in_sync(Source, Target),
+    % Check that there was a fallback to a plain GET
+ ?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6)).
+
t_fail_changes_queue({_Ctx, {Source, Target}}) ->
populate_db(Source, 1, 5),
{ok, RepId} = replicate(Source, Target),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 1da4dfa02..d1116e8b0 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -98,6 +98,8 @@ check_active_tasks(DocsRead, DocsWritten, DocsFailed) ->
RepTask = wait_for_task_status(DocsWritten),
?assertNotEqual(timeout, RepTask),
?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)),
+ ?assertEqual(DocsRead, couch_util:get_value(bulk_get_docs, RepTask)),
+ ?assertEqual(DocsRead, couch_util:get_value(bulk_get_attempts, RepTask)),
?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)),
?assertEqual(
DocsFailed,
@@ -112,12 +114,16 @@ check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) ->
?assert(maps:is_key(<<"changes_pending">>, Info)),
?assert(maps:is_key(<<"doc_write_failures">>, Info)),
?assert(maps:is_key(<<"docs_read">>, Info)),
+ ?assert(maps:is_key(<<"bulk_get_docs">>, Info)),
+ ?assert(maps:is_key(<<"bulk_get_attempts">>, Info)),
?assert(maps:is_key(<<"docs_written">>, Info)),
?assert(maps:is_key(<<"missing_revisions_found">>, Info)),
?assert(maps:is_key(<<"checkpointed_source_seq">>, Info)),
?assert(maps:is_key(<<"source_seq">>, Info)),
?assert(maps:is_key(<<"revisions_checked">>, Info)),
?assertMatch(#{<<"docs_read">> := DocsRead}, Info),
+ ?assertMatch(#{<<"bulk_get_docs">> := DocsRead}, Info),
+ ?assertMatch(#{<<"bulk_get_attempts">> := DocsRead}, Info),
?assertMatch(#{<<"docs_written">> := DocsWritten}, Info),
?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info).