diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-01-31 14:58:41 -0500 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2020-02-10 12:50:44 -0500 |
commit | 9665d86031fe4a633a07743f70e3bb9a40127da9 (patch) | |
tree | c558d5fa8ffdb54c17f09fce8f11cbba3eca9ea1 | |
parent | f4315660b2abf0570b27b581b71bd618a4f0e538 (diff) | |
download | couchdb-multi-transactional-iterators-2.tar.gz |
Use multi-transactional iteratorsmulti-transactional-iterators-2
-rw-r--r-- | src/fabric/include/fabric2.hrl | 5 | ||||
-rw-r--r-- | src/fabric/src/fabric2_db.erl | 35 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 255 | ||||
-rw-r--r-- | src/fabric/src/fabric2_server.erl | 1 | ||||
-rw-r--r-- | src/fabric/src/fabric2_util.erl | 5 |
5 files changed, 263 insertions, 38 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl index 828a51b8f..0be27fb39 100644 --- a/src/fabric/include/fabric2.hrl +++ b/src/fabric/include/fabric2.hrl @@ -57,7 +57,10 @@ -define(PDICT_TX_ID_KEY, '$fabric_tx_id'). -define(PDICT_TX_RES_KEY, '$fabric_tx_result'). -define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun'). --define(COMMIT_UNKNOWN_RESULT, 1021). +-define(PDICT_ITER_CHECKPOINT, '$fabric_iter_checkpoint'). +-define(PDICT_ITER_VALIDATE_DB, '$fabric_iter_validate_db'). +-define(COMMIT_UNKNOWN_RESULT, 1021). +-define(TRANSACTION_TOO_OLD, 1007). -define(BINARY_CHUNK_SIZE, 100000). diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 6d015df0e..8b6a57452 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -175,7 +175,8 @@ open(DbName, Options) -> case fabric2_server:fetch(DbName) of #{} = Db -> Db1 = maybe_set_user_ctx(Db, Options), - {ok, require_member_check(Db1)}; + Db2 = set_tx_options(Db1, Options), + {ok, require_member_check(Db2)}; undefined -> Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) -> fabric2_fdb:open(TxDb, Options) @@ -211,18 +212,20 @@ list_dbs() -> list_dbs(Options) -> + TxFun = tx_fun(Options), Callback = fun(DbName, Acc) -> [DbName | Acc] end, - DbNames = fabric2_fdb:transactional(fun(Tx) -> + DbNames = fabric2_fdb:TxFun(fun(Tx) -> fabric2_fdb:list_dbs(Tx, Callback, [], Options) - end), + end, Options), lists:reverse(DbNames). list_dbs(UserFun, UserAcc0, Options) -> + TxFun = tx_fun(Options), FoldFun = fun (DbName, Acc) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc)) end, - fabric2_fdb:transactional(fun(Tx) -> + fabric2_fdb:TxFun(fun(Tx) -> try UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)), UserAcc2 = fabric2_fdb:list_dbs( @@ -235,7 +238,7 @@ list_dbs(UserFun, UserAcc0, Options) -> catch throw:{stop, FinalUserAcc} -> {ok, FinalUserAcc} end - end). + end, Options). is_admin(Db, {SecProps}) when is_list(SecProps) -> @@ -755,7 +758,8 @@ fold_docs(Db, UserFun, UserAcc) -> fold_docs(Db, UserFun, UserAcc0, Options) -> - fabric2_fdb:transactional(Db, fun(TxDb) -> + TxFun = tx_fun(Options), + fabric2_fdb:TxFun(Db, fun(TxDb) -> try #{ db_prefix := DbPrefix @@ -780,7 +784,7 @@ fold_docs(Db, UserFun, UserAcc0, Options) -> catch throw:{stop, FinalUserAcc} -> {ok, FinalUserAcc} end - end). + end, Options). fold_design_docs(Db, UserFun, UserAcc0, Options1) -> @@ -829,7 +833,8 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) -> fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) -> - fabric2_fdb:transactional(Db, fun(TxDb) -> + TxFun = tx_fun(Options), + fabric2_fdb:TxFun(Db, fun(TxDb) -> try #{ db_prefix := DbPrefix @@ -868,7 +873,7 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) -> catch throw:{stop, FinalUserAcc} -> {ok, FinalUserAcc} end - end). + end, Options). dbname_suffix(DbName) -> @@ -1001,6 +1006,11 @@ maybe_set_user_ctx(Db, Options) -> end. +set_tx_options(Db, Options) -> + TxOptions = fabric2_util:get_value(tx_options, Options, []), + Db#{tx_options := TxOptions}. + + is_member(Db, {SecProps}) when is_list(SecProps) -> case is_admin(Db, {SecProps}) of true -> @@ -1765,3 +1775,10 @@ stem_revisions(#{} = Db, #doc{} = Doc) -> true -> Doc#doc{revs = {RevPos, lists:sublist(Revs, RevsLimit)}}; false -> Doc end. + + +tx_fun(Options) when is_list(Options) -> + case fabric2_util:get_value(iterator, Options, true) of + true -> with_iter; + undefined -> transactional + end. diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 6abe1f6de..8d45e418d 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -24,6 +24,8 @@ delete/1, exists/1, + with_iter/2, + get_dir/1, list_dbs/4, @@ -69,17 +71,31 @@ -include("fabric2.hrl"). -transactional(Fun) -> - do_transaction(Fun, undefined). +-record(fold_acc, { + start_key, + end_key, + skip, + opts, + ucallback, + uacc +}). + + + +transactional(Fun) when is_function(Fun, 1) -> + transactional(Fun, []). transactional(DbName, Options, Fun) when is_binary(DbName) -> with_span(Fun, #{'db.name' => DbName}, fun() -> transactional(fun(Tx) -> Fun(init_db(Tx, DbName, Options)) - end) + end, Options) end). +transactional(Fun, Options) when is_function(Fun, 1), is_list(Options) -> + TxOptions = fabric2_util:get_value(tx_options, Options, []), + do_transaction(Fun, undefined, TxOptions); transactional(#{tx := undefined} = Db, Fun) -> DbName = maps:get(name, Db, undefined), @@ -91,13 +107,14 @@ transactional(#{tx := undefined} = Db, Fun) -> true -> undefined; false -> maps:get(layer_prefix, Db2) end, + Options = maps:get(tx_options, Db2, []), with_span(Fun, #{'db.name' => DbName}, fun() -> do_transaction(fun(Tx) -> case Reopen of true -> Fun(reopen(Db2#{tx => Tx})); false -> Fun(Db2#{tx => Tx}) end - end, LayerPrefix) + end, LayerPrefix, Options) end) catch throw:{?MODULE, reopen} -> with_span('db.reopen', #{'db.name' => DbName}, fun() -> @@ -112,19 +129,20 @@ transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) -> end). -do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) -> +do_transaction(Fun, LayerPrefix, Options) when is_function(Fun, 1) -> Db = get_db_handle(), try erlfdb:transactional(Db, fun(Tx) -> - case get(erlfdb_trace) of + TraceOpts = case get(erlfdb_trace) of Name when is_binary(Name) -> UId = erlang:unique_integer([positive]), UIdBin = integer_to_binary(UId, 36), TxId = <<Name/binary, "_", UIdBin/binary>>, - erlfdb:set_option(Tx, transaction_logging_enable, TxId); + [{transaction_logging_enable, TxId}]; _ -> - ok + [] end, + apply_tx_options(Tx, TraceOpts ++ Options), case is_transaction_applied(Tx) of true -> get_previous_transaction_result(); @@ -184,6 +202,9 @@ create(#{} = Db0, Options) -> UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}), Options1 = lists:keydelete(user_ctx, 1, Options), + TxOptions = fabric2_util:get_value(tx_options, Options1, []), + Options2 = lists:keydelete(tx_options, 1, Options1), + Db#{ uuid => UUID, db_prefix => DbPrefix, @@ -198,7 +219,8 @@ create(#{} = Db0, Options) -> after_doc_read => undefined, % All other db things as we add features, - db_options => Options1 + db_options => Options2, + tx_options => TxOptions }. @@ -221,6 +243,9 @@ open(#{} = Db0, Options) -> UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}), Options1 = lists:keydelete(user_ctx, 1, Options), + TxOptions = fabric2_util:get_value(tx_options, Options1, []), + Options2 = lists:keydelete(tx_options, 1, Options1), + Db2 = Db1#{ db_prefix => DbPrefix, db_version => DbVersion, @@ -237,7 +262,8 @@ open(#{} = Db0, Options) -> before_doc_update => undefined, after_doc_read => undefined, - db_options => Options1 + db_options => Options2, + tx_options => TxOptions }, Db3 = load_config(Db2), @@ -307,6 +333,23 @@ exists(#{name := DbName} = Db) when is_binary(DbName) -> end. +with_iter(Fun, Options) when is_function(Fun, 1), is_list(Options) -> + Tx = create_iter(fabric2_util:get_value(tx_options, Options, [])), + try + Fun(Tx) + after + destroy_iter(Tx) + end; + +with_iter(#{tx := undefined} = Db, Fun) when is_function(Fun, 1) -> + IterDb = create_iter(Db), + try + Fun(IterDb) + after + destroy_iter(IterDb) + end. + + get_dir(Tx) -> Root = erlfdb_directory:root(), Dir = fabric2_server:fdb_directory(), @@ -821,19 +864,50 @@ fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) -> } = ensure_current(Db), fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options); -fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) -> - case fabric2_util:get_value(limit, Options) of - 0 -> +fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options) -> + Iterator = fabric2_util:get_value(iterator, Options), + case {fabric2_util:get_value(limit, Options), Iterator} of + {0, _} -> % FoundationDB treats a limit of 0 as unlimited % so we have to guard for that here. - UserAcc; - _ -> - {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options), - Callback = fun fold_range_cb/2, - Acc = {skip, Skip, UserCallback, UserAcc}, - {skip, _, UserCallback, OutAcc} = - erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts), - OutAcc + Acc; + {_, undefined} -> + FAcc = get_fold_acc(RangePrefix, Callback, Acc, Options), + fold_range_tx(Tx, FAcc); + {_, true} -> + FAcc = get_fold_acc(RangePrefix, Callback, Acc, Options), + fold_range_iter(Tx, FAcc) + end. + + +fold_range_tx(Tx, #fold_acc{} = FAcc) -> + #fold_acc{ + start_key = Start, + end_key = End, + opts = Opts + } = FAcc, + Callback = fun fold_range_tx_cb/2, + FAccOut = erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts), + FAccOut#fold_acc.uacc. + + +fold_range_iter(Tx, #fold_acc{} = FAcc) -> + #fold_acc{ + start_key = Start, + end_key = End, + opts = Opts + } = FAcc, + Callback = fun fold_range_iter_cb/2, + put(?PDICT_ITER_CHECKPOINT, FAcc), + try erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts) of + #fold_acc{uacc = UAccOut} -> UAccOut + catch + error:{erlfdb_error, ?TRANSACTION_TOO_OLD} -> + #fold_acc{} = FAcc1 = get(?PDICT_ITER_CHECKPOINT), + io:format(standard_error, "~n **** resetting Tx ~p FAcc1:~p~n", [Tx, FAcc1]), + couch_log:error("**** RESETTING Tx ~p FAcc1:~p~n", [Tx, FAcc1]), + ok = reset_iter_tx(Tx), + fold_range_iter(Tx, FAcc1) end. @@ -1277,7 +1351,7 @@ chunkify_binary(Data) -> end. -get_fold_opts(RangePrefix, Options) -> +get_fold_acc(RangePrefix, UserCallback, UserAcc, Options) -> Reverse = case fabric2_util:get_value(dir, Options) of rev -> true; _ -> false @@ -1367,15 +1441,75 @@ get_fold_opts(RangePrefix, Options) -> ++ StreamingMode ++ Snapshot, - {StartKey3, EndKey3, Skip, OutOpts}. + #fold_acc{ + start_key = StartKey3, + end_key = EndKey3, + skip = Skip, + opts = OutOpts, + ucallback = UserCallback, + uacc = UserAcc + }. + + +fold_range_tx_cb(KV, #fold_acc{skip = 0} = FAcc) -> + #fold_acc{ucallback = UCallback, uacc = UAcc} = FAcc, + NewUAcc = UCallback(KV, UAcc), + FAcc#fold_acc{uacc = NewUAcc}; + +fold_range_tx_cb(_KV, #fold_acc{skip = N} = FAcc) when N > 0 -> + FAcc#fold_acc{skip = N - 1}. + +fold_range_iter_cb({K, V}, #fold_acc{skip = 0} = FAcc) -> + #fold_acc{ucallback = UCallback, uacc = UAcc} = FAcc, + NewUAcc = UCallback({K, V}, UAcc), + NewFAcc = next_iter_acc(K, FAcc), + put(?PDICT_ITER_CHECKPOINT, NewFAcc), + NewFAcc#fold_acc{uacc = NewUAcc}; -fold_range_cb(KV, {skip, 0, Callback, Acc}) -> - NewAcc = Callback(KV, Acc), - {skip, 0, Callback, NewAcc}; -fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 -> - {skip, N - 1, Callback, Acc}. +fold_range_iter_cb({K, _V}, #fold_acc{skip = N} = FAcc) when N > 0 -> + put(?PDICT_ITER_CHECKPOINT, FAcc), + next_iter_acc(K, FAcc). + + +next_iter_acc(K, #fold_acc{} = FAcc) -> + #fold_acc{skip = Skip, opts = Opts} = FAcc, + Opts1 = case fabric2_util:get_value(limit, Opts) of + N when is_integer(N), N > 0 -> + fabric2_util:replace_value(limit, Opts, N - 1); + undefined -> + Opts + end, + FAcc1 = FAcc#fold_acc{opts = Opts1, skip = max(0, Skip - 1)}, + case fabric2_util:get_value(reverse, Opts, false) of + true -> FAcc1#fold_acc{end_key = K}; + false -> FAcc1#fold_acc{start_key = K} + end. + + +reset_iter_tx({erlfdb_transaction, _} = Tx) -> + ok = erlfdb:reset(Tx), + erlfdb:set_option(Tx, retry_limit, 0), + erlfdb:set_option(Tx, max_retry_delay, 0), + ok = iterator_db_validate(Tx, get(?PDICT_ITER_VALIDATE_DB)). + + +iterator_db_validate({erlfdb_transaction, _}, undefined) -> + ok; + +iterator_db_validate({erlfdb_transaction, _} = Tx, #{} = Db) -> + #{ + uuid := UUID, + name := DbName, + layer_prefix := LayerPrefix + } = Db, + DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix), + UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix), + case erlfdb:wait(erlfdb:get(Tx, UUIDKey)) of + UUID -> ok; + _ -> error(database_does_not_exist) + end. get_db_handle() -> @@ -1512,3 +1646,68 @@ with_span(Operation, ExtraTags, Fun) -> false -> Fun() end. + + +create_iter(Options) when is_list(Options) -> + case get(?PDICT_ITER_CHECKPOINT) of + undefined -> ok; + _ -> error(iterator_already_created) + end, + Fdb = get_db_handle(), + Tx = erlfdb:create_transaction(Fdb), + apply_tx_options(Tx, Options ++ [ + disallow_writes, + {retry_limit, 0}, + {max_retry_delay, 0} + ]); + +create_iter(#{tx := undefined} = Db) -> + try + Db1 = refresh(Db), + + Reopen = maps:get(reopen, Db1, false), + Db2 = maps:remove(reopen, Db1), + + Options = maps:get(tx_options, Db2, []), + Tx = create_iter(Options), + + Db3 = case Reopen of + true -> reopen(Db2#{tx => Tx}); + false -> Db2#{tx => Tx} + end, + + % Here we might throw `reopen` + Db4 = ensure_current(Db3), + + % This part might update the Db cache + ok = run_on_commit_fun(Tx), + erase({?PDICT_ON_COMMIT_FUN, Tx}), + + % Save the initial Db handle so we can validate + % that the same db was still running + put(?PDICT_ITER_VALIDATE_DB, Db4), + + Db4#{tx := Tx} + catch throw:{?MODULE, reopen} -> + create_iter(Db#{reopen => true}) + end. + + +destroy_iter({erlfdb_transaction, _}) -> + erase(?PDICT_ITER_CHECKPOINT); + +destroy_iter(#{tx := {erfdb_transaction, _}} = Db) -> + #{tx := Tx} = Db, + erase(?PDICT_ITER_VALIDATE_DB), + destroy_iter(Tx), + Db#{tx := undefined}. + + +apply_tx_options(Tx, Options) -> + lists:foreach(fun(Option) -> + case Option of + K when is_atom(K) -> erlfdb:set_option(Tx, K); + {K, V} -> erlfdb:set_option(Tx, K, V) + end + end, Options), + Tx. diff --git a/src/fabric/src/fabric2_server.erl b/src/fabric/src/fabric2_server.erl index b1c38ef55..acacb0c11 100644 --- a/src/fabric/src/fabric2_server.erl +++ b/src/fabric/src/fabric2_server.erl @@ -58,6 +58,7 @@ fetch(DbName) when is_binary(DbName) -> store(#{name := DbName} = Db0) when is_binary(DbName) -> Db1 = Db0#{ tx := undefined, + tx_options := [], user_ctx := #user_ctx{}, security_fun := undefined }, diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl index 4e2e2d76b..8d04219a6 100644 --- a/src/fabric/src/fabric2_util.erl +++ b/src/fabric/src/fabric2_util.erl @@ -31,6 +31,7 @@ get_value/2, get_value/3, + replace_value/3, to_hex/1, from_hex/1, uuid/0 @@ -160,6 +161,10 @@ get_value(Key, List, Default) -> end. +replace_value(Key, Val, List) -> + lists:keyreplace(Key, 1, List, {Key, Val}). + + to_hex(Bin) -> list_to_binary(to_hex_int(Bin)). |