author     Nick Vatamaniuc <vatamane@gmail.com>                2022-09-09 16:28:29 -0400
committer  Nick Vatamaniuc <nickva@users.noreply.github.com>   2022-09-10 18:56:42 -0400
commit     84883c6e34c46d72b07aee65786d6ec8a6c56700 (patch)
tree       3fb97626ef1f28af4e9f66589b58c8929774d450
parent     5f86af1ec7692ba1b2db1512401e1402a5e5f05a (diff)
download   couchdb-docs-fix-edit-link.tar.gz
Give users the option to disable bulk_get attempts
Let users revert to the previous behavior: they may have an odd load balancer setup, or a custom API implementation, where repeated _bulk_get attempts could cause unexpected issues.
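For context only (not part of the patch below): the new setting can be toggled in two places, server-wide under the [replicator] config section and per job with a use_bulk_get boolean in the replication document. A minimal runtime sketch of the server-wide toggle, mirroring what the new eunit tests do:

    %% Sketch, not part of the commit: disable _bulk_get attempts for all
    %% replication jobs at runtime. The trailing `false` is the persist flag,
    %% so the change stays in memory and is not written back to the ini files.
    config:set("replicator", "use_bulk_get", "false", false).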
-rw-r--r--  rel/overlay/etc/default.ini                                            2
-rw-r--r--  src/couch_replicator/src/couch_replicator_docs.erl                     6
-rw-r--r--  src/couch_replicator/src/couch_replicator_scheduler_job.erl            3
-rw-r--r--  src/couch_replicator/src/couch_replicator_worker.erl                  17
-rw-r--r--  src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl  109
-rw-r--r--  src/docs/src/config/replicator.rst                                     10
-rw-r--r--  src/docs/src/json-structure.rst                                         2
7 files changed, 142 insertions, 7 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 929c08351..b989ba3fa 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -494,6 +494,8 @@ partitioned||* = true
;retries_per_request = 5
; Use checkpoints
;use_checkpoints = true
+; Attempt to use bulk_get for fetching documents from the source
+;use_bulk_get = true
; Checkpoint interval
;checkpoint_interval = 30000
; Some socket options that might boost performance in some scenarios:
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index a80abe901..a60f1a1e1 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -464,6 +464,7 @@ make_options(Props) ->
DefTimeout = config:get_integer("replicator", "connection_timeout", 30000),
DefRetries = config:get_integer("replicator", "retries_per_request", 5),
UseCheckpoints = config:get_boolean("replicator", "use_checkpoints", true),
+ UseBulkGet = config:get_boolean("replicator", "use_bulk_get", true),
DefCheckpointInterval = config:get_integer(
"replicator",
"checkpoint_interval",
@@ -487,6 +488,7 @@ make_options(Props) ->
{worker_batch_size, DefBatchSize},
{worker_processes, DefWorkers},
{use_checkpoints, UseCheckpoints},
+ {use_bulk_get, UseBulkGet},
{checkpoint_interval, DefCheckpointInterval}
])
).
@@ -554,6 +556,10 @@ convert_options([{<<"since_seq">>, V} | R]) ->
[{since_seq, V} | convert_options(R)];
convert_options([{<<"use_checkpoints">>, V} | R]) ->
[{use_checkpoints, V} | convert_options(R)];
+convert_options([{<<"use_bulk_get">>, V} | _R]) when not is_boolean(V) ->
+ throw({bad_request, <<"parameter `use_bulk_get` must be a boolean">>});
+convert_options([{<<"use_bulk_get">>, V} | R]) ->
+ [{use_bulk_get, V} | convert_options(R)];
convert_options([{<<"checkpoint_interval">>, V} | R]) ->
[{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)];
% skip unknown option
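The two new convert_options/1 clauses above enforce that the per-job value is a real boolean. A rough shell sketch of the behavior they implement (assuming the function can be called directly, which this commit does not guarantee):

    %% Sketch only. A boolean value is converted into the internal option...
    couch_replicator_docs:convert_options([{<<"use_bulk_get">>, false}]).
    %% -> [{use_bulk_get, false}]

    %% ...while any other type is rejected before the job is accepted.
    couch_replicator_docs:convert_options([{<<"use_bulk_get">>, <<"false">>}]).
    %% -> throws {bad_request, <<"parameter `use_bulk_get` must be a boolean">>}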
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 38de8a45a..e06a1ffea 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -133,12 +133,11 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx = UserCtx} = Rep) -
% This starts the worker processes. They ask the changes queue manager for a
% a batch of _changes rows to process -> check which revs are missing in the
% target, and for the missing ones, it copies them from the source to the target.
- MaxConns = get_value(http_connections, Options),
Workers = lists:map(
fun(_) ->
couch_stats:increment_counter([couch_replicator, workers_started]),
{ok, Pid} = couch_replicator_worker:start_link(
- self(), Source, Target, ChangesManager, MaxConns
+ self(), Source, Target, ChangesManager, Options
),
Pid
end,
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 94e3e028b..4df5d1c7c 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -68,23 +68,27 @@
parent,
cp,
changes_manager,
+ use_bulk_get,
bulk_get_stats
}).
-start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) ->
+start_link(Cp, #httpdb{} = Source, Target, ChangesManager, [_ | _] = Options) ->
gen_server:start_link(
- ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []
+ ?MODULE, {Cp, Source, Target, ChangesManager, Options}, []
).
-init({Cp, Source, Target, ChangesManager, MaxConns}) ->
+init({Cp, Source, Target, ChangesManager, Options}) ->
process_flag(trap_exit, true),
NowSec = erlang:monotonic_time(second),
+ MaxConns = couch_util:get_value(http_connections, Options),
+ UseBulkGet = couch_util:get_value(use_bulk_get, Options),
FetchSt = #fetch_st{
cp = Cp,
source = Source,
target = Target,
parent = self(),
changes_manager = ChangesManager,
+ use_bulk_get = UseBulkGet,
bulk_get_stats = #bulk_get_stats{ratio = 0, tsec = NowSec}
},
State = #state{
@@ -256,6 +260,7 @@ queue_fetch_loop(#fetch_st{} = St) ->
target = Target,
parent = Parent,
changes_manager = ChangesManager,
+ use_bulk_get = UseBulkGet,
bulk_get_stats = BgSt
} = St,
ChangesManager ! {get_changes, self()},
@@ -268,7 +273,7 @@ queue_fetch_loop(#fetch_st{} = St) ->
{changes, ChangesManager, Changes, ReportSeq} ->
% Find missing revisions (POST to _revs_diff)
IdRevs = find_missing(Changes, Target, Parent),
- {Docs, BgSt1} = bulk_get(Source, IdRevs, Parent, BgSt),
+ {Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs, Parent, BgSt),
% Documents without attachments can be uploaded right away
BatchFun = fun({_, #doc{} = Doc}) ->
ok = gen_server:call(Parent, {batch_doc, Doc}, infinity)
@@ -292,7 +297,9 @@ queue_fetch_loop(#fetch_st{} = St) ->
% _bulk_get. After a few successful attempts that should lower the failure rate
% enough to start allow using _bulk_get again.
%
-bulk_get(Source, IdRevs, Parent, #bulk_get_stats{} = St) ->
+bulk_get(false, _Source, _IdRevs, _Parent, #bulk_get_stats{} = St) ->
+ {#{}, St};
+bulk_get(true, Source, IdRevs, Parent, #bulk_get_stats{} = St) ->
NowSec = erlang:monotonic_time(second),
case attempt_bulk_get(St, NowSec) of
true ->
diff --git a/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl
new file mode 100644
index 000000000..52c623c85
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl
@@ -0,0 +1,109 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_bulk_get_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include("couch_replicator_test.hrl").
+
+-define(DOC_COUNT, 10).
+
+bulk_get_test_() ->
+ {
+ "Use _bulk_get when replicating",
+ {
+ foreach,
+ fun couch_replicator_test_helper:test_setup/0,
+ fun couch_replicator_test_helper:test_teardown/1,
+ [
+ ?TDEF_FE(use_bulk_get),
+ ?TDEF_FE(dont_use_bulk_get),
+ ?TDEF_FE(job_enable_overrides_global_disable),
+ ?TDEF_FE(global_disable_works)
+ ]
+ }
+ }.
+
+use_bulk_get({_Ctx, {Source, Target}}) ->
+ populate_db(Source, ?DOC_COUNT),
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ replicate(Source, Target, true),
+ BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+ JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+ ?assertEqual(0, JustGets),
+ ?assert(BulkGets >= 1),
+ compare_dbs(Source, Target).
+
+dont_use_bulk_get({_Ctx, {Source, Target}}) ->
+ populate_db(Source, ?DOC_COUNT),
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ replicate(Source, Target, false),
+ BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+ JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+ ?assertEqual(0, BulkGets),
+ ?assertEqual(?DOC_COUNT, JustGets),
+ compare_dbs(Source, Target).
+
+job_enable_overrides_global_disable({_Ctx, {Source, Target}}) ->
+ populate_db(Source, ?DOC_COUNT),
+ Persist = false,
+ config:set("replicator", "use_bulk_get", "false", Persist),
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ replicate(Source, Target, true),
+ BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+ JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+ ?assertEqual(0, JustGets),
+ ?assert(BulkGets >= 1),
+ compare_dbs(Source, Target).
+
+global_disable_works({_Ctx, {Source, Target}}) ->
+ populate_db(Source, ?DOC_COUNT),
+ Persist = false,
+ config:set("replicator", "use_bulk_get", "false", Persist),
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ replicate(Source, Target),
+ BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+ JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+ ?assertEqual(0, BulkGets),
+ ?assertEqual(?DOC_COUNT, JustGets),
+ compare_dbs(Source, Target).
+
+populate_db(DbName, DocCount) ->
+ Fun = fun(Id, Acc) -> [#doc{id = integer_to_binary(Id)} | Acc] end,
+ Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
+ {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
+
+compare_dbs(Source, Target) ->
+ couch_replicator_test_helper:cluster_compare_dbs(Source, Target).
+
+db_url(DbName) ->
+ couch_replicator_test_helper:cluster_db_url(DbName).
+
+replicate(Source, Target) ->
+ couch_replicator_test_helper:replicate(
+ {[
+ {<<"source">>, db_url(Source)},
+ {<<"target">>, db_url(Target)},
+ {<<"worker_processes">>, <<"1">>}
+ ]}
+ ).
+
+replicate(Source, Target, UseBulkGet) ->
+ couch_replicator_test_helper:replicate(
+ {[
+ {<<"source">>, db_url(Source)},
+ {<<"target">>, db_url(Target)},
+ {<<"worker_processes">>, <<"1">>},
+ {<<"use_bulk_get">>, UseBulkGet}
+ ]}
+ ).
diff --git a/src/docs/src/config/replicator.rst b/src/docs/src/config/replicator.rst
index 1f94efedf..092711450 100644
--- a/src/docs/src/config/replicator.rst
+++ b/src/docs/src/config/replicator.rst
@@ -188,6 +188,16 @@ Replicator Database Configuration
Disabling checkpoints is **not recommended** as CouchDB will scan
the Source database's changes feed from the beginning.
+ .. config:option:: use_bulk_get :: Use ``_bulk_get`` to fetch docs from the source
+
+ .. versionadded:: 3.3
+
+ If ``use_bulk_get`` is ``true``, CouchDB will attempt to use the
+ ``_bulk_get`` HTTP API endpoint to fetch documents from the source.
+ The replicator should automatically fall back to individual doc GETs
+ on error; however, in some cases it may be useful to avoid spending
+ time attempting to call ``_bulk_get`` altogether.
+
.. config:option:: cert_file :: Path to user PEM certificate file
Path to a file containing the user's certificate::
diff --git a/src/docs/src/json-structure.rst b/src/docs/src/json-structure.rst
index 2220244fa..a39f0bfe2 100644
--- a/src/docs/src/json-structure.rst
+++ b/src/docs/src/json-structure.rst
@@ -284,6 +284,8 @@ Replication Settings
+--------------------------------+---------------------------------------------+
| winning_revs_only (optional) | Replicate only the winning revisions. |
+--------------------------------+---------------------------------------------+
+| use_bulk_get (optional) | Try to use ``_bulk_get`` to fetch revisions.|
++--------------------------------+---------------------------------------------+
.. _replication-status:
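
To tie the new json-structure entry back to the test suite above: a per-job override is just a boolean field in the replication document, as in this sketch (the endpoint URLs are placeholders, and the term mirrors what replicate/3 in the new test module builds):

    %% Sketch with placeholder URLs: a replication document that opts a single
    %% job out of _bulk_get, regardless of the server-wide default.
    RepDoc = {[
        {<<"source">>, <<"http://localhost:5984/source_db">>},
        {<<"target">>, <<"http://localhost:5984/target_db">>},
        {<<"use_bulk_get">>, false}
    ]}.
    %% In a test context this term would be handed to the helper used above.
    couch_replicator_test_helper:replicate(RepDoc).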