diff options
author | Nick Vatamaniuc <vatamane@gmail.com> | 2022-09-09 16:28:29 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-09-10 18:56:42 -0400 |
commit | 84883c6e34c46d72b07aee65786d6ec8a6c56700 (patch) | |
tree | 3fb97626ef1f28af4e9f66589b58c8929774d450 | |
parent | 5f86af1ec7692ba1b2db1512401e1402a5e5f05a (diff) | |
download | couchdb-docs-fix-edit-link.tar.gz |
Give the users the option to disable bulk_get attempts [docs-fix-edit-link]
Let users have the option to revert to the previous behavior. They may have
some odd load balancer setup, or a custom API implementation where repeated
_bulk_get attempts may cause unexpected issues.
-rw-r--r-- | rel/overlay/etc/default.ini | 2 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_docs.erl | 6 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_scheduler_job.erl | 3 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_worker.erl | 17 | ||||
-rw-r--r-- | src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl | 109 | ||||
-rw-r--r-- | src/docs/src/config/replicator.rst | 10 | ||||
-rw-r--r-- | src/docs/src/json-structure.rst | 2 |
7 files changed, 142 insertions, 7 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 929c08351..b989ba3fa 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -494,6 +494,8 @@ partitioned||* = true ;retries_per_request = 5 ; Use checkpoints ;use_checkpoints = true +; Attempt to use bulk_get for fetching documents from the source +;use_bulk_get = true ; Checkpoint interval ;checkpoint_interval = 30000 ; Some socket options that might boost performance in some scenarios: diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index a80abe901..a60f1a1e1 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -464,6 +464,7 @@ make_options(Props) -> DefTimeout = config:get_integer("replicator", "connection_timeout", 30000), DefRetries = config:get_integer("replicator", "retries_per_request", 5), UseCheckpoints = config:get_boolean("replicator", "use_checkpoints", true), + UseBulkGet = config:get_boolean("replicator", "use_bulk_get", true), DefCheckpointInterval = config:get_integer( "replicator", "checkpoint_interval", @@ -487,6 +488,7 @@ make_options(Props) -> {worker_batch_size, DefBatchSize}, {worker_processes, DefWorkers}, {use_checkpoints, UseCheckpoints}, + {use_bulk_get, UseBulkGet}, {checkpoint_interval, DefCheckpointInterval} ]) ). 
@@ -554,6 +556,10 @@ convert_options([{<<"since_seq">>, V} | R]) -> [{since_seq, V} | convert_options(R)]; convert_options([{<<"use_checkpoints">>, V} | R]) -> [{use_checkpoints, V} | convert_options(R)]; +convert_options([{<<"use_bulk_get">>, V} | _R]) when not is_boolean(V) -> + throw({bad_request, <<"parameter `use_bulk_get` must be a boolean">>}); +convert_options([{<<"use_bulk_get">>, V} | R]) -> + [{use_bulk_get, V} | convert_options(R)]; convert_options([{<<"checkpoint_interval">>, V} | R]) -> [{checkpoint_interval, couch_util:to_integer(V)} | convert_options(R)]; % skip unknown option diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 38de8a45a..e06a1ffea 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -133,12 +133,11 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx = UserCtx} = Rep) - % This starts the worker processes. They ask the changes queue manager for a % a batch of _changes rows to process -> check which revs are missing in the % target, and for the missing ones, it copies them from the source to the target. - MaxConns = get_value(http_connections, Options), Workers = lists:map( fun(_) -> couch_stats:increment_counter([couch_replicator, workers_started]), {ok, Pid} = couch_replicator_worker:start_link( - self(), Source, Target, ChangesManager, MaxConns + self(), Source, Target, ChangesManager, Options ), Pid end, diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 94e3e028b..4df5d1c7c 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -68,23 +68,27 @@ parent, cp, changes_manager, + use_bulk_get, bulk_get_stats }). 
-start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) -> +start_link(Cp, #httpdb{} = Source, Target, ChangesManager, [_ | _] = Options) -> gen_server:start_link( - ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, [] + ?MODULE, {Cp, Source, Target, ChangesManager, Options}, [] ). -init({Cp, Source, Target, ChangesManager, MaxConns}) -> +init({Cp, Source, Target, ChangesManager, Options}) -> process_flag(trap_exit, true), NowSec = erlang:monotonic_time(second), + MaxConns = couch_util:get_value(http_connections, Options), + UseBulkGet = couch_util:get_value(use_bulk_get, Options), FetchSt = #fetch_st{ cp = Cp, source = Source, target = Target, parent = self(), changes_manager = ChangesManager, + use_bulk_get = UseBulkGet, bulk_get_stats = #bulk_get_stats{ratio = 0, tsec = NowSec} }, State = #state{ @@ -256,6 +260,7 @@ queue_fetch_loop(#fetch_st{} = St) -> target = Target, parent = Parent, changes_manager = ChangesManager, + use_bulk_get = UseBulkGet, bulk_get_stats = BgSt } = St, ChangesManager ! {get_changes, self()}, @@ -268,7 +273,7 @@ queue_fetch_loop(#fetch_st{} = St) -> {changes, ChangesManager, Changes, ReportSeq} -> % Find missing revisions (POST to _revs_diff) IdRevs = find_missing(Changes, Target, Parent), - {Docs, BgSt1} = bulk_get(Source, IdRevs, Parent, BgSt), + {Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs, Parent, BgSt), % Documents without attachments can be uploaded right away BatchFun = fun({_, #doc{} = Doc}) -> ok = gen_server:call(Parent, {batch_doc, Doc}, infinity) @@ -292,7 +297,9 @@ queue_fetch_loop(#fetch_st{} = St) -> % _bulk_get. After a few successful attempts that should lower the failure rate % enough to start allow using _bulk_get again. 
% -bulk_get(Source, IdRevs, Parent, #bulk_get_stats{} = St) -> +bulk_get(false, _Source, _IdRevs, _Parent, #bulk_get_stats{} = St) -> + {#{}, St}; +bulk_get(true, Source, IdRevs, Parent, #bulk_get_stats{} = St) -> NowSec = erlang:monotonic_time(second), case attempt_bulk_get(St, NowSec) of true -> diff --git a/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl new file mode 100644 index 000000000..52c623c85 --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl @@ -0,0 +1,109 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_bulk_get_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include("couch_replicator_test.hrl"). + +-define(DOC_COUNT, 10). + +bulk_get_test_() -> + { + "Use _bulk_get when replicating", + { + foreach, + fun couch_replicator_test_helper:test_setup/0, + fun couch_replicator_test_helper:test_teardown/1, + [ + ?TDEF_FE(use_bulk_get), + ?TDEF_FE(dont_use_bulk_get), + ?TDEF_FE(job_enable_overrides_global_disable), + ?TDEF_FE(global_disable_works) + ] + } + }. 
+ +use_bulk_get({_Ctx, {Source, Target}}) -> + populate_db(Source, ?DOC_COUNT), + meck:new(couch_replicator_api_wrap, [passthrough]), + replicate(Source, Target, true), + BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3), + JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6), + ?assertEqual(0, JustGets), + ?assert(BulkGets >= 1), + compare_dbs(Source, Target). + +dont_use_bulk_get({_Ctx, {Source, Target}}) -> + populate_db(Source, ?DOC_COUNT), + meck:new(couch_replicator_api_wrap, [passthrough]), + replicate(Source, Target, false), + BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3), + JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6), + ?assertEqual(0, BulkGets), + ?assertEqual(?DOC_COUNT, JustGets), + compare_dbs(Source, Target). + +job_enable_overrides_global_disable({_Ctx, {Source, Target}}) -> + populate_db(Source, ?DOC_COUNT), + Persist = false, + config:set("replicator", "use_bulk_get", "false", Persist), + meck:new(couch_replicator_api_wrap, [passthrough]), + replicate(Source, Target, true), + BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3), + JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6), + ?assertEqual(0, JustGets), + ?assert(BulkGets >= 1), + compare_dbs(Source, Target). + +global_disable_works({_Ctx, {Source, Target}}) -> + populate_db(Source, ?DOC_COUNT), + Persist = false, + config:set("replicator", "use_bulk_get", "false", Persist), + meck:new(couch_replicator_api_wrap, [passthrough]), + replicate(Source, Target), + BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3), + JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6), + ?assertEqual(0, BulkGets), + ?assertEqual(?DOC_COUNT, JustGets), + compare_dbs(Source, Target). 
+ +populate_db(DbName, DocCount) -> + Fun = fun(Id, Acc) -> [#doc{id = integer_to_binary(Id)} | Acc] end, + Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)), + {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]). + +compare_dbs(Source, Target) -> + couch_replicator_test_helper:cluster_compare_dbs(Source, Target). + +db_url(DbName) -> + couch_replicator_test_helper:cluster_db_url(DbName). + +replicate(Source, Target) -> + couch_replicator_test_helper:replicate( + {[ + {<<"source">>, db_url(Source)}, + {<<"target">>, db_url(Target)}, + {<<"worker_processes">>, <<"1">>} + ]} + ). + +replicate(Source, Target, UseBulkGet) -> + couch_replicator_test_helper:replicate( + {[ + {<<"source">>, db_url(Source)}, + {<<"target">>, db_url(Target)}, + {<<"worker_processes">>, <<"1">>}, + {<<"use_bulk_get">>, UseBulkGet} + ]} + ). diff --git a/src/docs/src/config/replicator.rst b/src/docs/src/config/replicator.rst index 1f94efedf..092711450 100644 --- a/src/docs/src/config/replicator.rst +++ b/src/docs/src/config/replicator.rst @@ -188,6 +188,16 @@ Replicator Database Configuration Disabling checkpoints is **not recommended** as CouchDB will scan the Source database's changes feed from the beginning. + .. config:option:: use_bulk_get :: Use ``_bulk_get`` to fetch docs from the source + + .. versionadded:: 3.3 + + If ``use_bulk_get`` is ``true``, CouchDB will attempt to use the + ``_bulk_get`` HTTP API endpoint to fetch documents from the source. + Replicator should automatically fall back to individual doc GETs + on error; however, in some cases it may be useful to prevent spending + time attempting to call ``_bulk_get`` altogether. + ..
config:option:: cert_file :: Path to user PEM certificate file Path to a file containing the user's certificate:: diff --git a/src/docs/src/json-structure.rst b/src/docs/src/json-structure.rst index 2220244fa..a39f0bfe2 100644 --- a/src/docs/src/json-structure.rst +++ b/src/docs/src/json-structure.rst @@ -284,6 +284,8 @@ Replication Settings +--------------------------------+---------------------------------------------+ | winning_revs_only (optional) | Replicate only the winning revisions. | +--------------------------------+---------------------------------------------+ +| use_bulk_get (optional) | Try to use ``_bulk_get`` to fetch revisions.| ++--------------------------------+---------------------------------------------+ .. _replication-status: |