From 2c44eafa482d19a6b7a0054926e427a0876341f1 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Tue, 19 May 2020 12:52:02 -0400 Subject: Bulk docs transaction batching * Interactive (regular) requests are split into smaller transactions, so larger updates won't fail with either timeout or transaction too large FDB errors. * Non-interactive (replicated) requests can now batch their updates in a few transactions and gain extra performance. Batch size is configurable: ``` [fabric] update_docs_batch_size = 5000000 ``` --- rebar.config.script | 2 +- rel/overlay/etc/default.ini | 3 + src/fabric/src/fabric2_db.erl | 174 ++++++++++++++++---- src/fabric/src/fabric2_fdb.erl | 8 + src/fabric/test/fabric2_update_docs_tests.erl | 222 ++++++++++++++++++++++++++ 5 files changed, 380 insertions(+), 29 deletions(-) create mode 100644 src/fabric/test/fabric2_update_docs_tests.erl diff --git a/rebar.config.script b/rebar.config.script index 03c380f46..c145566a3 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -152,7 +152,7 @@ DepDescs = [ %% Independent Apps {config, "config", {tag, "2.1.7"}}, {b64url, "b64url", {tag, "1.0.2"}}, -{erlfdb, "erlfdb", {tag, "v1.2.0"}}, +{erlfdb, "erlfdb", {tag, "v1.2.1"}}, {ets_lru, "ets-lru", {tag, "1.1.0"}}, {khash, "khash", {tag, "1.1.0"}}, {snappy, "snappy", {tag, "CouchDB-1.0.4"}}, diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 66680a4e8..35e5147b2 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -242,6 +242,9 @@ port = 6984 ; ; Byte size of binary chunks written to FDB values. Defaults to FDB max limit. 
;binary_chunk_size = 100000 +; +; Bulk docs transaction batch size in bytes +;update_docs_batch_size = 5000000 ; [rexi] ; buffer_count = 2000 diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 8764d4e18..ae03285e3 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -156,6 +156,20 @@ -define(RETURN(Term), throw({?MODULE, Term})). +-define(DEFAULT_UPDATE_DOCS_BATCH_SIZE, 5000000). + + +-record(bacc, { + db, + docs, + batch_size, + options, + rev_futures, + seen, + seen_tx, + results +}). + create(DbName, Options) -> case validate_dbname(DbName) of @@ -861,18 +875,8 @@ update_docs(Db, Docs0, Options) -> Docs1 = apply_before_doc_update(Db, Docs0, Options), try validate_atomic_update(Docs0, lists:member(all_or_nothing, Options)), - Resps0 = case lists:member(replicated_changes, Options) of - false -> - fabric2_fdb:transactional(Db, fun(TxDb) -> - update_docs_interactive(TxDb, Docs1, Options) - end); - true -> - lists:map(fun(Doc) -> - fabric2_fdb:transactional(Db, fun(TxDb) -> - update_doc_int(TxDb, Doc, Options) - end) - end, Docs1) - end, + + Resps0 = batch_update_docs(Db, Docs1, Options), % Notify index builder fabric2_index:db_updated(name(Db)), @@ -895,7 +899,7 @@ update_docs(Db, Docs0, Options) -> Else end end, Resps0), - case lists:member(replicated_changes, Options) of + case is_replicated(Options) of true -> {ok, lists:flatmap(fun(R) -> case R of @@ -1647,9 +1651,8 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) -> <<?LOCAL_DOC_PREFIX, _/binary>> -> true; _ -> false end, - IsReplicated = lists:member(replicated_changes, Options), try - case {IsLocal, IsReplicated} of + case {IsLocal, is_replicated(Options)} of {false, false} -> update_doc_interactive(Db, Doc, Options); {false, true} -> update_doc_replicated(Db, Doc, Options); {true, _} -> update_local_doc(Db, Doc, Options) @@ -1659,17 +1662,119 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) -> end. 
-update_docs_interactive(Db, Docs0, Options) -> - Docs = tag_docs(Docs0), - Futures = get_winning_rev_futures(Db, Docs), - {Result, _} = lists:mapfoldl(fun(Doc, SeenIds) -> - try - update_docs_interactive(Db, Doc, Options, Futures, SeenIds) - catch throw:{?MODULE, Return} -> - {Return, SeenIds} +batch_update_docs(Db, Docs, Options) -> + BAcc = #bacc{ + db = Db, + docs = Docs, + batch_size = get_batch_size(Options), + options = Options, + rev_futures = #{}, + seen = [], + seen_tx = [], + results = [] + }, + #bacc{results = Res} = batch_update_docs(BAcc), + lists:reverse(Res). + + +batch_update_docs(#bacc{docs = []} = BAcc) -> + BAcc; + +batch_update_docs(#bacc{db = Db} = BAcc) -> + #bacc{ + db = Db, + docs = Docs, + options = Options + } = BAcc, + + BAccTx2 = fabric2_fdb:transactional(Db, fun(TxDb) -> + BAccTx = BAcc#bacc{db = TxDb}, + case is_replicated(Options) of + false -> + Tagged = tag_docs(Docs), + RevFutures = get_winning_rev_futures(TxDb, Tagged), + BAccTx1 = BAccTx#bacc{ + docs = Tagged, + rev_futures = RevFutures + }, + batch_update_interactive_tx(BAccTx1); + true -> + batch_update_replicated_tx(BAccTx) end - end, [], Docs), - Result. + end), + + % Clean up after the transaction ends so we can recurse with a clean state + maps:map(fun(Tag, RangeFuture) when is_reference(Tag) -> + ok = erlfdb:cancel(RangeFuture, [flush]) + end, BAccTx2#bacc.rev_futures), + + BAcc1 = BAccTx2#bacc{ + db = Db, + rev_futures = #{}, + seen_tx = [] + }, + + batch_update_docs(BAcc1). 
+ + +batch_update_interactive_tx(#bacc{docs = []} = BAcc) -> + BAcc; + +batch_update_interactive_tx(#bacc{} = BAcc) -> + #bacc{ + db = TxDb, + docs = [Doc | Docs], + options = Options, + batch_size = MaxSize, + rev_futures = RevFutures, + seen = Seen, + results = Results + } = BAcc, + {Res, Seen1} = try + update_docs_interactive(TxDb, Doc, Options, RevFutures, Seen) + catch throw:{?MODULE, Return} -> + {Return, Seen} + end, + BAcc1 = BAcc#bacc{ + docs = Docs, + results = [Res | Results], + seen = Seen1 + }, + case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of + true -> BAcc1; + false -> batch_update_interactive_tx(BAcc1) + end. + + +batch_update_replicated_tx(#bacc{docs = []} = BAcc) -> + BAcc; + +batch_update_replicated_tx(#bacc{} = BAcc) -> + #bacc{ + db = TxDb, + docs = [Doc | Docs], + options = Options, + batch_size = MaxSize, + seen_tx = SeenTx, + results = Results + } = BAcc, + case lists:member(Doc#doc.id, SeenTx) of + true -> + % If we already updated this doc in the current transaction, wait + % till the next transaction to update it again. + BAcc; + false -> + Res = update_doc_int(TxDb, Doc, Options), + BAcc1 = BAcc#bacc{ + docs = Docs, + results = [Res | Results], + seen_tx = [Doc#doc.id | SeenTx] + }, + case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of + true -> BAcc1; + false -> batch_update_replicated_tx(BAcc1) + end + end. update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc, @@ -2122,9 +2227,8 @@ doc_to_revid(#doc{revs = Revs}) -> tag_docs([]) -> []; tag_docs([#doc{meta = Meta} = Doc | Rest]) -> - NewDoc = Doc#doc{ - meta = [{ref, make_ref()} | Meta] - }, + Meta1 = lists:keystore(ref, 1, Meta, {ref, make_ref()}), + NewDoc = Doc#doc{meta = Meta1}, [NewDoc | tag_docs(Rest)]. @@ -2226,3 +2330,17 @@ get_cached_db(#{} = Db, Opts) when is_list(Opts) -> fabric2_fdb:ensure_current(TxDb) end) end. + + +is_replicated(Options) when is_list(Options) -> + lists:member(replicated_changes, Options). 
+ + +get_batch_size(Options) -> + case fabric2_util:get_value(batch_size, Options) of + undefined -> + config:get_integer("fabric", "update_docs_batch_size", + ?DEFAULT_UPDATE_DOCS_BATCH_SIZE); + Val when is_integer(Val) -> + Val + end. diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index f274aa606..e8f6e0daa 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -75,6 +75,8 @@ new_versionstamp/1, + get_approximate_tx_size/1, + debug_cluster/0, debug_cluster/2 ]). @@ -1159,6 +1161,12 @@ new_versionstamp(Tx) -> {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}. +get_approximate_tx_size(#{} = TxDb) -> + require_transaction(TxDb), + #{tx := Tx} = TxDb, + erlfdb:wait(erlfdb:get_approximate_size(Tx)). + + debug_cluster() -> debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>). diff --git a/src/fabric/test/fabric2_update_docs_tests.erl b/src/fabric/test/fabric2_update_docs_tests.erl new file mode 100644 index 000000000..5a2389abf --- /dev/null +++ b/src/fabric/test/fabric2_update_docs_tests.erl @@ -0,0 +1,222 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_update_docs_tests). + + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("fabric2_test.hrl"). 
+ + +update_docs_test_() -> + { + "Test update_docs", + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun cleanup/1, + [ + ?TDEF_FE(update_docs), + ?TDEF_FE(update_docs_replicated), + ?TDEF_FE(update_docs_batches), + ?TDEF_FE(update_docs_replicated_batches), + ?TDEF_FE(update_docs_duplicate_ids_conflict), + ?TDEF_FE(update_docs_duplicate_ids_with_batches), + ?TDEF_FE(update_docs_replicate_batches_duplicate_id) + ] + } + } + }. + + +setup_all() -> + test_util:start_couch([fabric]). + + +teardown_all(Ctx) -> + test_util:stop_couch(Ctx). + + +setup() -> + {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]), + meck:new(erlfdb, [passthrough]), + Db. + + +cleanup(#{} = Db) -> + meck:unload(), + ok = fabric2_db:delete(fabric2_db:name(Db), []). + + +update_docs(Db) -> + ?assertEqual({ok, []}, fabric2_db:update_docs(Db, [])), + + Doc1 = doc(), + Res1 = fabric2_db:update_docs(Db, [Doc1]), + ?assertMatch({ok, [_]}, Res1), + {ok, [Doc1Res]} = Res1, + ?assertMatch({ok, {1, <<_/binary>>}}, Doc1Res), + {ok, {1, Rev1}} = Doc1Res, + {ok, Doc1Open} = fabric2_db:open_doc(Db, Doc1#doc.id), + ?assertEqual(Doc1#doc{revs = {1, [Rev1]}}, Doc1Open), + + Doc2 = doc(), + Doc3 = doc(), + Res2 = fabric2_db:update_docs(Db, [Doc2, Doc3]), + ?assertMatch({ok, [_, _]}, Res2), + {ok, [Doc2Res, Doc3Res]} = Res2, + ?assertMatch({ok, {1, <<_/binary>>}}, Doc2Res), + ?assertMatch({ok, {1, <<_/binary>>}}, Doc3Res). 
+ + +update_docs_replicated(Db) -> + Opts = [replicated_changes], + + ?assertEqual({ok, []}, fabric2_db:update_docs(Db, [], Opts)), + + Doc1 = doc(10, {1, [rev()]}), + ?assertMatch({ok, []}, fabric2_db:update_docs(Db, [Doc1], Opts)), + {ok, Doc1Open} = fabric2_db:open_doc(Db, Doc1#doc.id), + ?assertEqual(Doc1, Doc1Open), + + Doc2 = doc(10, {1, [rev()]}), + Doc3 = doc(10, {1, [rev()]}), + ?assertMatch({ok, []}, fabric2_db:update_docs(Db, [Doc2, Doc3], Opts)), + {ok, Doc2Open} = fabric2_db:open_doc(Db, Doc2#doc.id), + ?assertEqual(Doc2, Doc2Open), + {ok, Doc3Open} = fabric2_db:open_doc(Db, Doc3#doc.id), + ?assertEqual(Doc3, Doc3Open). + + +update_docs_batches(Db) -> + Opts = [{batch_size, 5000}], + + Docs1 = [doc(9000), doc(9000)], + + meck:reset(erlfdb), + ?assertMatch({ok, [_ | _]}, fabric2_db:update_docs(Db, Docs1, Opts)), + ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)), + + lists:foreach(fun(#doc{} = Doc) -> + ?assertMatch({ok, #doc{}}, fabric2_db:open_doc(Db, Doc#doc.id)) + end, Docs1), + + Docs2 = [doc(10), doc(10), doc(9000), doc(10)], + + meck:reset(erlfdb), + ?assertMatch({ok, [_ | _]}, fabric2_db:update_docs(Db, Docs2, Opts)), + ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)), + + lists:foreach(fun(#doc{} = Doc) -> + ?assertMatch({ok, #doc{}}, fabric2_db:open_doc(Db, Doc#doc.id)) + end, Docs2). 
+ + +update_docs_replicated_batches(Db) -> + Opts = [{batch_size, 5000}, replicated_changes], + + Docs1 = [doc(Size, {1, [rev()]}) || Size <- [9000, 9000]], + + meck:reset(erlfdb), + ?assertMatch({ok, []}, fabric2_db:update_docs(Db, Docs1, Opts)), + ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)), + + lists:foreach(fun(#doc{} = Doc) -> + ?assertEqual({ok, Doc}, fabric2_db:open_doc(Db, Doc#doc.id)) + end, Docs1), + + Docs2 = [doc(Size, {1, [rev()]}) || Size <- [10, 10, 9000, 10]], + + meck:reset(erlfdb), + ?assertMatch({ok, []}, fabric2_db:update_docs(Db, Docs2, Opts)), + ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)), + + lists:foreach(fun(#doc{} = Doc) -> + ?assertEqual({ok, Doc}, fabric2_db:open_doc(Db, Doc#doc.id)) + end, Docs2). + + +update_docs_duplicate_ids_conflict(Db) -> + Doc = doc(), + + Res = fabric2_db:update_docs(Db, [Doc, doc(), Doc]), + ?assertMatch({ok, [_, _, _]}, Res), + + {ok, [Doc1Res, Doc2Res, Doc3Res]} = Res, + ?assertMatch({ok, {1, <<_/binary>>}}, Doc1Res), + ?assertMatch({ok, {1, <<_/binary>>}}, Doc2Res), + ?assertMatch(conflict, Doc3Res). + + +update_docs_duplicate_ids_with_batches(Db) -> + Opts = [{batch_size, 5000}], + + Doc = doc(9000), + + meck:reset(erlfdb), + Res = fabric2_db:update_docs(Db, [Doc, doc(9000), Doc], Opts), + ?assertMatch({ok, [_, _, _]}, Res), + ?assertEqual(3, meck:num_calls(erlfdb, transactional, 2)), + + {ok, [Doc1Res, Doc2Res, Doc3Res]} = Res, + ?assertMatch({ok, {1, <<_/binary>>}}, Doc1Res), + ?assertMatch({ok, {1, <<_/binary>>}}, Doc2Res), + ?assertMatch(conflict, Doc3Res). + + +update_docs_replicate_batches_duplicate_id(Db) -> + Opts = [replicated_changes], + + Doc = doc(10, {1, [rev()]}), + Docs = [Doc, Doc], + + meck:reset(erlfdb), + ?assertMatch({ok, []}, fabric2_db:update_docs(Db, Docs, Opts)), + ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)), + + ?assertEqual({ok, Doc}, fabric2_db:open_doc(Db, Doc#doc.id)). + + +% Utility functions + +doc() -> + doc(2). 
+ + +doc(Size) -> + doc(Size, undefined). + + +doc(Size, Revs) -> + Doc = #doc{ + id = fabric2_util:uuid(), + body = doc_body(Size) + }, + case Revs of + undefined -> Doc; + _ -> Doc#doc{revs = Revs} + end. + + +rev() -> + fabric2_util:to_hex(crypto:strong_rand_bytes(16)). + + +doc_body(Size) when is_integer(Size), Size >= 2 -> + Val = fabric2_util:to_hex(crypto:strong_rand_bytes(Size div 2)), + {[{<<"x">>, Val}]}. -- cgit v1.2.1