author     Nick Vatamaniuc <vatamane@apache.org>    2020-05-19 12:52:02 -0400
committer  Nick Vatamaniuc <vatamane@apache.org>    2020-05-20 15:34:13 -0400
commit     2c44eafa482d19a6b7a0054926e427a0876341f1 (patch)
tree       6e96ef4edfa27d037a1a8a1db12ac1aa060ba2cc
parent     6f2417e1af712b3720cf6c07713d7751cbc9fbef (diff)
download   couchdb-batch-bulk-docs.tar.gz

Bulk docs transaction batching (batch-bulk-docs)
* Interactive (regular) requests are split into smaller transactions, so larger
  updates won't fail with either timeout or transaction-too-large FDB errors.

* Non-interactive (replicated) requests can now batch their updates into a few
  transactions and gain extra performance.

Batch size is configurable:

```
[fabric]
update_docs_batch_size = 5000000
```
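A minimal usage sketch, assuming a db handle and document lists (Db, Docs,
ReplDocs) as placeholders: the configured batch size can also be overridden per
call with a {batch_size, Bytes} option, which the new fabric2_update_docs_tests
below rely on.

```
%% Interactive update: one result per document is returned; large doc lists
%% are split across transactions once the configured byte size is exceeded.
{ok, _Results} = fabric2_db:update_docs(Db, Docs, [{batch_size, 5000000}]),

%% Replicated (non-interactive) update: batched the same way; the successful
%% result is {ok, []}.
{ok, []} = fabric2_db:update_docs(Db, ReplDocs,
    [replicated_changes, {batch_size, 5000000}]).
```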
-rw-r--r--  rebar.config.script                             2
-rw-r--r--  rel/overlay/etc/default.ini                     3
-rw-r--r--  src/fabric/src/fabric2_db.erl                 174
-rw-r--r--  src/fabric/src/fabric2_fdb.erl                  8
-rw-r--r--  src/fabric/test/fabric2_update_docs_tests.erl 222
5 files changed, 380 insertions(+), 29 deletions(-)
diff --git a/rebar.config.script b/rebar.config.script
index 03c380f46..c145566a3 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -152,7 +152,7 @@ DepDescs = [
%% Independent Apps
{config, "config", {tag, "2.1.7"}},
{b64url, "b64url", {tag, "1.0.2"}},
-{erlfdb, "erlfdb", {tag, "v1.2.0"}},
+{erlfdb, "erlfdb", {tag, "v1.2.1"}},
{ets_lru, "ets-lru", {tag, "1.1.0"}},
{khash, "khash", {tag, "1.1.0"}},
{snappy, "snappy", {tag, "CouchDB-1.0.4"}},
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 66680a4e8..35e5147b2 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -242,6 +242,9 @@ port = 6984
;
; Byte size of binary chunks written to FDB values. Defaults to FDB max limit.
;binary_chunk_size = 100000
+;
+; Bulk docs transaction batch size in bytes
+;update_docs_batch_size = 5000000
; [rexi]
; buffer_count = 2000
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 8764d4e18..ae03285e3 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -156,6 +156,20 @@
-define(RETURN(Term), throw({?MODULE, Term})).
+-define(DEFAULT_UPDATE_DOCS_BATCH_SIZE, 5000000).
+
+
+-record(bacc, {
+ db,
+ docs,
+ batch_size,
+ options,
+ rev_futures,
+ seen,
+ seen_tx,
+ results
+}).
+
create(DbName, Options) ->
case validate_dbname(DbName) of
@@ -861,18 +875,8 @@ update_docs(Db, Docs0, Options) ->
Docs1 = apply_before_doc_update(Db, Docs0, Options),
try
validate_atomic_update(Docs0, lists:member(all_or_nothing, Options)),
- Resps0 = case lists:member(replicated_changes, Options) of
- false ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- update_docs_interactive(TxDb, Docs1, Options)
- end);
- true ->
- lists:map(fun(Doc) ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
- update_doc_int(TxDb, Doc, Options)
- end)
- end, Docs1)
- end,
+
+ Resps0 = batch_update_docs(Db, Docs1, Options),
% Notify index builder
fabric2_index:db_updated(name(Db)),
@@ -895,7 +899,7 @@ update_docs(Db, Docs0, Options) ->
Else
end
end, Resps0),
- case lists:member(replicated_changes, Options) of
+ case is_replicated(Options) of
true ->
{ok, lists:flatmap(fun(R) ->
case R of
@@ -1647,9 +1651,8 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
<<?LOCAL_DOC_PREFIX, _/binary>> -> true;
_ -> false
end,
- IsReplicated = lists:member(replicated_changes, Options),
try
- case {IsLocal, IsReplicated} of
+ case {IsLocal, is_replicated(Options)} of
{false, false} -> update_doc_interactive(Db, Doc, Options);
{false, true} -> update_doc_replicated(Db, Doc, Options);
{true, _} -> update_local_doc(Db, Doc, Options)
@@ -1659,17 +1662,119 @@ update_doc_int(#{} = Db, #doc{} = Doc, Options) ->
end.
-update_docs_interactive(Db, Docs0, Options) ->
- Docs = tag_docs(Docs0),
- Futures = get_winning_rev_futures(Db, Docs),
- {Result, _} = lists:mapfoldl(fun(Doc, SeenIds) ->
- try
- update_docs_interactive(Db, Doc, Options, Futures, SeenIds)
- catch throw:{?MODULE, Return} ->
- {Return, SeenIds}
+batch_update_docs(Db, Docs, Options) ->
+ BAcc = #bacc{
+ db = Db,
+ docs = Docs,
+ batch_size = get_batch_size(Options),
+ options = Options,
+ rev_futures = #{},
+ seen = [],
+ seen_tx = [],
+ results = []
+ },
+ #bacc{results = Res} = batch_update_docs(BAcc),
+ lists:reverse(Res).
+
+
+batch_update_docs(#bacc{docs = []} = BAcc) ->
+ BAcc;
+
+batch_update_docs(#bacc{db = Db} = BAcc) ->
+ #bacc{
+ db = Db,
+ docs = Docs,
+ options = Options
+ } = BAcc,
+
+ BAccTx2 = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ BAccTx = BAcc#bacc{db = TxDb},
+ case is_replicated(Options) of
+ false ->
+ Tagged = tag_docs(Docs),
+ RevFutures = get_winning_rev_futures(TxDb, Tagged),
+ BAccTx1 = BAccTx#bacc{
+ docs = Tagged,
+ rev_futures = RevFutures
+ },
+ batch_update_interactive_tx(BAccTx1);
+ true ->
+ batch_update_replicated_tx(BAccTx)
end
- end, [], Docs),
- Result.
+ end),
+
+ % Clean up after the transaction ends so we can recurse with a clean state
+ maps:map(fun(Tag, RangeFuture) when is_reference(Tag) ->
+ ok = erlfdb:cancel(RangeFuture, [flush])
+ end, BAccTx2#bacc.rev_futures),
+
+ BAcc1 = BAccTx2#bacc{
+ db = Db,
+ rev_futures = #{},
+ seen_tx = []
+ },
+
+ batch_update_docs(BAcc1).
+
+
+batch_update_interactive_tx(#bacc{docs = []} = BAcc) ->
+ BAcc;
+
+batch_update_interactive_tx(#bacc{} = BAcc) ->
+ #bacc{
+ db = TxDb,
+ docs = [Doc | Docs],
+ options = Options,
+ batch_size = MaxSize,
+ rev_futures = RevFutures,
+ seen = Seen,
+ results = Results
+ } = BAcc,
+ {Res, Seen1} = try
+ update_docs_interactive(TxDb, Doc, Options, RevFutures, Seen)
+ catch throw:{?MODULE, Return} ->
+ {Return, Seen}
+ end,
+ BAcc1 = BAcc#bacc{
+ docs = Docs,
+ results = [Res | Results],
+ seen = Seen1
+ },
+ case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of
+ true -> BAcc1;
+ false -> batch_update_interactive_tx(BAcc1)
+ end.
+
+
+batch_update_replicated_tx(#bacc{docs = []} = BAcc) ->
+ BAcc;
+
+batch_update_replicated_tx(#bacc{} = BAcc) ->
+ #bacc{
+ db = TxDb,
+ docs = [Doc | Docs],
+ options = Options,
+ batch_size = MaxSize,
+ seen_tx = SeenTx,
+ results = Results
+ } = BAcc,
+ case lists:member(Doc#doc.id, SeenTx) of
+ true ->
+ % If we already updated this doc in the current transaction, wait
+ % till the next transaction to update it again.
+ BAcc;
+ false ->
+ Res = update_doc_int(TxDb, Doc, Options),
+ BAcc1 = BAcc#bacc{
+ docs = Docs,
+ results = [Res | Results],
+ seen_tx = [Doc#doc.id | SeenTx]
+ },
+ case fabric2_fdb:get_approximate_tx_size(TxDb) > MaxSize of
+ true -> BAcc1;
+ false -> batch_update_replicated_tx(BAcc1)
+ end
+ end.
update_docs_interactive(Db, #doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} = Doc,
@@ -2122,9 +2227,8 @@ doc_to_revid(#doc{revs = Revs}) ->
tag_docs([]) ->
[];
tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
- NewDoc = Doc#doc{
- meta = [{ref, make_ref()} | Meta]
- },
+ Meta1 = lists:keystore(ref, 1, Meta, {ref, make_ref()}),
+ NewDoc = Doc#doc{meta = Meta1},
[NewDoc | tag_docs(Rest)].
@@ -2226,3 +2330,17 @@ get_cached_db(#{} = Db, Opts) when is_list(Opts) ->
fabric2_fdb:ensure_current(TxDb)
end)
end.
+
+
+is_replicated(Options) when is_list(Options) ->
+ lists:member(replicated_changes, Options).
+
+
+get_batch_size(Options) ->
+ case fabric2_util:get_value(batch_size, Options) of
+ undefined ->
+ config:get_integer("fabric", "update_docs_batch_size",
+ ?DEFAULT_UPDATE_DOCS_BATCH_SIZE);
+ Val when is_integer(Val) ->
+ Val
+ end.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index f274aa606..e8f6e0daa 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -75,6 +75,8 @@
new_versionstamp/1,
+ get_approximate_tx_size/1,
+
debug_cluster/0,
debug_cluster/2
]).
@@ -1159,6 +1161,12 @@ new_versionstamp(Tx) ->
{versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
+get_approximate_tx_size(#{} = TxDb) ->
+ require_transaction(TxDb),
+ #{tx := Tx} = TxDb,
+ erlfdb:wait(erlfdb:get_approximate_size(Tx)).
+
+
debug_cluster() ->
debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
diff --git a/src/fabric/test/fabric2_update_docs_tests.erl b/src/fabric/test/fabric2_update_docs_tests.erl
new file mode 100644
index 000000000..5a2389abf
--- /dev/null
+++ b/src/fabric/test/fabric2_update_docs_tests.erl
@@ -0,0 +1,222 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_update_docs_tests).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include("fabric2_test.hrl").
+
+
+update_docs_test_() ->
+ {
+ "Test update_docs",
+ {
+ setup,
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun cleanup/1,
+ [
+ ?TDEF_FE(update_docs),
+ ?TDEF_FE(update_docs_replicated),
+ ?TDEF_FE(update_docs_batches),
+ ?TDEF_FE(update_docs_replicated_batches),
+ ?TDEF_FE(update_docs_duplicate_ids_conflict),
+ ?TDEF_FE(update_docs_duplicate_ids_with_batches),
+ ?TDEF_FE(update_docs_replicate_batches_duplicate_id)
+ ]
+ }
+ }
+ }.
+
+
+setup_all() ->
+ test_util:start_couch([fabric]).
+
+
+teardown_all(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+setup() ->
+ {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]),
+ meck:new(erlfdb, [passthrough]),
+ Db.
+
+
+cleanup(#{} = Db) ->
+ meck:unload(),
+ ok = fabric2_db:delete(fabric2_db:name(Db), []).
+
+
+update_docs(Db) ->
+ ?assertEqual({ok, []}, fabric2_db:update_docs(Db, [])),
+
+ Doc1 = doc(),
+ Res1 = fabric2_db:update_docs(Db, [Doc1]),
+ ?assertMatch({ok, [_]}, Res1),
+ {ok, [Doc1Res]} = Res1,
+ ?assertMatch({ok, {1, <<_/binary>>}}, Doc1Res),
+ {ok, {1, Rev1}} = Doc1Res,
+ {ok, Doc1Open} = fabric2_db:open_doc(Db, Doc1#doc.id),
+ ?assertEqual(Doc1#doc{revs = {1, [Rev1]}}, Doc1Open),
+
+ Doc2 = doc(),
+ Doc3 = doc(),
+ Res2 = fabric2_db:update_docs(Db, [Doc2, Doc3]),
+ ?assertMatch({ok, [_, _]}, Res2),
+ {ok, [Doc2Res, Doc3Res]} = Res2,
+ ?assertMatch({ok, {1, <<_/binary>>}}, Doc2Res),
+ ?assertMatch({ok, {1, <<_/binary>>}}, Doc3Res).
+
+
+update_docs_replicated(Db) ->
+ Opts = [replicated_changes],
+
+ ?assertEqual({ok, []}, fabric2_db:update_docs(Db, [], Opts)),
+
+ Doc1 = doc(10, {1, [rev()]}),
+ ?assertMatch({ok, []}, fabric2_db:update_docs(Db, [Doc1], Opts)),
+ {ok, Doc1Open} = fabric2_db:open_doc(Db, Doc1#doc.id),
+ ?assertEqual(Doc1, Doc1Open),
+
+ Doc2 = doc(10, {1, [rev()]}),
+ Doc3 = doc(10, {1, [rev()]}),
+ ?assertMatch({ok, []}, fabric2_db:update_docs(Db, [Doc2, Doc3], Opts)),
+ {ok, Doc2Open} = fabric2_db:open_doc(Db, Doc2#doc.id),
+ ?assertEqual(Doc2, Doc2Open),
+ {ok, Doc3Open} = fabric2_db:open_doc(Db, Doc3#doc.id),
+ ?assertEqual(Doc3, Doc3Open).
+
+
+update_docs_batches(Db) ->
+ Opts = [{batch_size, 5000}],
+
+ Docs1 = [doc(9000), doc(9000)],
+
+ meck:reset(erlfdb),
+ ?assertMatch({ok, [_ | _]}, fabric2_db:update_docs(Db, Docs1, Opts)),
+ ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)),
+
+ lists:foreach(fun(#doc{} = Doc) ->
+ ?assertMatch({ok, #doc{}}, fabric2_db:open_doc(Db, Doc#doc.id))
+ end, Docs1),
+
+ Docs2 = [doc(10), doc(10), doc(9000), doc(10)],
+
+ meck:reset(erlfdb),
+ ?assertMatch({ok, [_ | _]}, fabric2_db:update_docs(Db, Docs2, Opts)),
+ ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)),
+
+ lists:foreach(fun(#doc{} = Doc) ->
+ ?assertMatch({ok, #doc{}}, fabric2_db:open_doc(Db, Doc#doc.id))
+ end, Docs2).
+
+
+update_docs_replicated_batches(Db) ->
+ Opts = [{batch_size, 5000}, replicated_changes],
+
+ Docs1 = [doc(Size, {1, [rev()]}) || Size <- [9000, 9000]],
+
+ meck:reset(erlfdb),
+ ?assertMatch({ok, []}, fabric2_db:update_docs(Db, Docs1, Opts)),
+ ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)),
+
+ lists:foreach(fun(#doc{} = Doc) ->
+ ?assertEqual({ok, Doc}, fabric2_db:open_doc(Db, Doc#doc.id))
+ end, Docs1),
+
+ Docs2 = [doc(Size, {1, [rev()]}) || Size <- [10, 10, 9000, 10]],
+
+ meck:reset(erlfdb),
+ ?assertMatch({ok, []}, fabric2_db:update_docs(Db, Docs2, Opts)),
+ ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)),
+
+ lists:foreach(fun(#doc{} = Doc) ->
+ ?assertEqual({ok, Doc}, fabric2_db:open_doc(Db, Doc#doc.id))
+ end, Docs2).
+
+
+update_docs_duplicate_ids_conflict(Db) ->
+ Doc = doc(),
+
+ Res = fabric2_db:update_docs(Db, [Doc, doc(), Doc]),
+ ?assertMatch({ok, [_, _, _]}, Res),
+
+ {ok, [Doc1Res, Doc2Res, Doc3Res]} = Res,
+ ?assertMatch({ok, {1, <<_/binary>>}}, Doc1Res),
+ ?assertMatch({ok, {1, <<_/binary>>}}, Doc2Res),
+ ?assertMatch(conflict, Doc3Res).
+
+
+update_docs_duplicate_ids_with_batches(Db) ->
+ Opts = [{batch_size, 5000}],
+
+ Doc = doc(9000),
+
+ meck:reset(erlfdb),
+ Res = fabric2_db:update_docs(Db, [Doc, doc(9000), Doc], Opts),
+ ?assertMatch({ok, [_, _, _]}, Res),
+ ?assertEqual(3, meck:num_calls(erlfdb, transactional, 2)),
+
+ {ok, [Doc1Res, Doc2Res, Doc3Res]} = Res,
+ ?assertMatch({ok, {1, <<_/binary>>}}, Doc1Res),
+ ?assertMatch({ok, {1, <<_/binary>>}}, Doc2Res),
+ ?assertMatch(conflict, Doc3Res).
+
+
+update_docs_replicate_batches_duplicate_id(Db) ->
+ Opts = [replicated_changes],
+
+ Doc = doc(10, {1, [rev()]}),
+ Docs = [Doc, Doc],
+
+ meck:reset(erlfdb),
+ ?assertMatch({ok, []}, fabric2_db:update_docs(Db, Docs, Opts)),
+ ?assertEqual(2, meck:num_calls(erlfdb, transactional, 2)),
+
+ ?assertEqual({ok, Doc}, fabric2_db:open_doc(Db, Doc#doc.id)).
+
+
+% Utility functions
+
+doc() ->
+ doc(2).
+
+
+doc(Size) ->
+ doc(Size, undefined).
+
+
+doc(Size, Revs) ->
+ Doc = #doc{
+ id = fabric2_util:uuid(),
+ body = doc_body(Size)
+ },
+ case Revs of
+ undefined -> Doc;
+ _ -> Doc#doc{revs = Revs}
+ end.
+
+
+rev() ->
+ fabric2_util:to_hex(crypto:strong_rand_bytes(16)).
+
+
+doc_body(Size) when is_integer(Size), Size >= 2 ->
+ Val = fabric2_util:to_hex(crypto:strong_rand_bytes(Size div 2)),
+ {[{<<"x">>, Val}]}.