summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Nick Vatamaniuc <vatamane@apache.org> 2020-01-31 14:58:41 -0500
committer: Nick Vatamaniuc <vatamane@apache.org> 2020-02-10 12:50:44 -0500
commit: 9665d86031fe4a633a07743f70e3bb9a40127da9 (patch)
tree: c558d5fa8ffdb54c17f09fce8f11cbba3eca9ea1
parent: f4315660b2abf0570b27b581b71bd618a4f0e538 (diff)
download: couchdb-multi-transactional-iterators-2.tar.gz
Use multi-transactional iterators [multi-transactional-iterators-2]
-rw-r--r-- src/fabric/include/fabric2.hrl 5
-rw-r--r-- src/fabric/src/fabric2_db.erl 35
-rw-r--r-- src/fabric/src/fabric2_fdb.erl 255
-rw-r--r-- src/fabric/src/fabric2_server.erl 1
-rw-r--r-- src/fabric/src/fabric2_util.erl 5
5 files changed, 263 insertions, 38 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 828a51b8f..0be27fb39 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -57,7 +57,10 @@
-define(PDICT_TX_ID_KEY, '$fabric_tx_id').
-define(PDICT_TX_RES_KEY, '$fabric_tx_result').
-define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun').
--define(COMMIT_UNKNOWN_RESULT, 1021).
+-define(PDICT_ITER_CHECKPOINT, '$fabric_iter_checkpoint').
+-define(PDICT_ITER_VALIDATE_DB, '$fabric_iter_validate_db').
+-define(COMMIT_UNKNOWN_RESULT, 1021).
+-define(TRANSACTION_TOO_OLD, 1007).
-define(BINARY_CHUNK_SIZE, 100000).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 6d015df0e..8b6a57452 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -175,7 +175,8 @@ open(DbName, Options) ->
case fabric2_server:fetch(DbName) of
#{} = Db ->
Db1 = maybe_set_user_ctx(Db, Options),
- {ok, require_member_check(Db1)};
+ Db2 = set_tx_options(Db1, Options),
+ {ok, require_member_check(Db2)};
undefined ->
Result = fabric2_fdb:transactional(DbName, Options, fun(TxDb) ->
fabric2_fdb:open(TxDb, Options)
@@ -211,18 +212,20 @@ list_dbs() ->
list_dbs(Options) ->
+ TxFun = tx_fun(Options),
Callback = fun(DbName, Acc) -> [DbName | Acc] end,
- DbNames = fabric2_fdb:transactional(fun(Tx) ->
+ DbNames = fabric2_fdb:TxFun(fun(Tx) ->
fabric2_fdb:list_dbs(Tx, Callback, [], Options)
- end),
+ end, Options),
lists:reverse(DbNames).
list_dbs(UserFun, UserAcc0, Options) ->
+ TxFun = tx_fun(Options),
FoldFun = fun
(DbName, Acc) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc))
end,
- fabric2_fdb:transactional(fun(Tx) ->
+ fabric2_fdb:TxFun(fun(Tx) ->
try
UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
UserAcc2 = fabric2_fdb:list_dbs(
@@ -235,7 +238,7 @@ list_dbs(UserFun, UserAcc0, Options) ->
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
end
- end).
+ end, Options).
is_admin(Db, {SecProps}) when is_list(SecProps) ->
@@ -755,7 +758,8 @@ fold_docs(Db, UserFun, UserAcc) ->
fold_docs(Db, UserFun, UserAcc0, Options) ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
+ TxFun = tx_fun(Options),
+ fabric2_fdb:TxFun(Db, fun(TxDb) ->
try
#{
db_prefix := DbPrefix
@@ -780,7 +784,7 @@ fold_docs(Db, UserFun, UserAcc0, Options) ->
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
end
- end).
+ end, Options).
fold_design_docs(Db, UserFun, UserAcc0, Options1) ->
@@ -829,7 +833,8 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
- fabric2_fdb:transactional(Db, fun(TxDb) ->
+ TxFun = tx_fun(Options),
+ fabric2_fdb:TxFun(Db, fun(TxDb) ->
try
#{
db_prefix := DbPrefix
@@ -868,7 +873,7 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
end
- end).
+ end, Options).
dbname_suffix(DbName) ->
@@ -1001,6 +1006,11 @@ maybe_set_user_ctx(Db, Options) ->
end.
+% Copy the `tx_options` entry of Options (default []) into the Db map.
+% The `tx_options` key must already be present in Db; otherwise the update
+% fails with {badkey, tx_options}.
+set_tx_options(Db, Options) ->
+    maps:update(
+        tx_options,
+        fabric2_util:get_value(tx_options, Options, []),
+        Db
+    ).
+
+
is_member(Db, {SecProps}) when is_list(SecProps) ->
case is_admin(Db, {SecProps}) of
true ->
@@ -1765,3 +1775,10 @@ stem_revisions(#{} = Db, #doc{} = Doc) ->
true -> Doc#doc{revs = {RevPos, lists:sublist(Revs, RevsLimit)}};
false -> Doc
end.
+
+
+% Pick the fabric2_fdb entry point a fold should run under. The returned
+% atom is used by callers as the function name in `fabric2_fdb:TxFun(...)`.
+%
+%   iterator = true       -> with_iter
+%   iterator = undefined  -> transactional
+%
+% `true` is passed as the default to get_value/3. Any other value fails with
+% a case_clause error.
+% NOTE(review): fabric2_fdb:fold_range/5 reads the same `iterator` option
+% without a default - confirm that the two defaults are meant to differ.
+tx_fun(Options) when is_list(Options) ->
+    Iterator = fabric2_util:get_value(iterator, Options, true),
+    case Iterator of
+        undefined -> transactional;
+        true -> with_iter
+    end.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 6abe1f6de..8d45e418d 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -24,6 +24,8 @@
delete/1,
exists/1,
+ with_iter/2,
+
get_dir/1,
list_dbs/4,
@@ -69,17 +71,31 @@
-include("fabric2.hrl").
-transactional(Fun) ->
- do_transaction(Fun, undefined).
+% Accumulator threaded through erlfdb:fold_range/6 by fold_range_tx/2 and
+% fold_range_iter/2. It is built by get_fold_acc/4.
+-record(fold_acc, {
+ % Start key passed to erlfdb:fold_range/6
+ start_key,
+ % End key passed to erlfdb:fold_range/6
+ end_key,
+ % Number of rows still to be dropped before the user callback is invoked
+ skip,
+ % Option list passed to erlfdb:fold_range/6
+ opts,
+ % User callback, invoked as UCallback(KV, UAcc)
+ ucallback,
+ % Current user accumulator
+ uacc
+}).
+
+
+
+transactional(Fun) when is_function(Fun, 1) ->
+ transactional(Fun, []).
transactional(DbName, Options, Fun) when is_binary(DbName) ->
with_span(Fun, #{'db.name' => DbName}, fun() ->
transactional(fun(Tx) ->
Fun(init_db(Tx, DbName, Options))
- end)
+ end, Options)
end).
+transactional(Fun, Options) when is_function(Fun, 1), is_list(Options) ->
+ TxOptions = fabric2_util:get_value(tx_options, Options, []),
+ do_transaction(Fun, undefined, TxOptions);
transactional(#{tx := undefined} = Db, Fun) ->
DbName = maps:get(name, Db, undefined),
@@ -91,13 +107,14 @@ transactional(#{tx := undefined} = Db, Fun) ->
true -> undefined;
false -> maps:get(layer_prefix, Db2)
end,
+ Options = maps:get(tx_options, Db2, []),
with_span(Fun, #{'db.name' => DbName}, fun() ->
do_transaction(fun(Tx) ->
case Reopen of
true -> Fun(reopen(Db2#{tx => Tx}));
false -> Fun(Db2#{tx => Tx})
end
- end, LayerPrefix)
+ end, LayerPrefix, Options)
end)
catch throw:{?MODULE, reopen} ->
with_span('db.reopen', #{'db.name' => DbName}, fun() ->
@@ -112,19 +129,20 @@ transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) ->
end).
-do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) ->
+do_transaction(Fun, LayerPrefix, Options) when is_function(Fun, 1) ->
Db = get_db_handle(),
try
erlfdb:transactional(Db, fun(Tx) ->
- case get(erlfdb_trace) of
+ TraceOpts = case get(erlfdb_trace) of
Name when is_binary(Name) ->
UId = erlang:unique_integer([positive]),
UIdBin = integer_to_binary(UId, 36),
TxId = <<Name/binary, "_", UIdBin/binary>>,
- erlfdb:set_option(Tx, transaction_logging_enable, TxId);
+ [{transaction_logging_enable, TxId}];
_ ->
- ok
+ []
end,
+ apply_tx_options(Tx, TraceOpts ++ Options),
case is_transaction_applied(Tx) of
true ->
get_previous_transaction_result();
@@ -184,6 +202,9 @@ create(#{} = Db0, Options) ->
UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
Options1 = lists:keydelete(user_ctx, 1, Options),
+ TxOptions = fabric2_util:get_value(tx_options, Options1, []),
+ Options2 = lists:keydelete(tx_options, 1, Options1),
+
Db#{
uuid => UUID,
db_prefix => DbPrefix,
@@ -198,7 +219,8 @@ create(#{} = Db0, Options) ->
after_doc_read => undefined,
% All other db things as we add features,
- db_options => Options1
+ db_options => Options2,
+ tx_options => TxOptions
}.
@@ -221,6 +243,9 @@ open(#{} = Db0, Options) ->
UserCtx = fabric2_util:get_value(user_ctx, Options, #user_ctx{}),
Options1 = lists:keydelete(user_ctx, 1, Options),
+ TxOptions = fabric2_util:get_value(tx_options, Options1, []),
+ Options2 = lists:keydelete(tx_options, 1, Options1),
+
Db2 = Db1#{
db_prefix => DbPrefix,
db_version => DbVersion,
@@ -237,7 +262,8 @@ open(#{} = Db0, Options) ->
before_doc_update => undefined,
after_doc_read => undefined,
- db_options => Options1
+ db_options => Options2,
+ tx_options => TxOptions
},
Db3 = load_config(Db2),
@@ -307,6 +333,23 @@ exists(#{name := DbName} = Db) when is_binary(DbName) ->
end.
+% Run Fun against a freshly created iterator and return its result.
+%
+% with_iter(Fun, Options): creates a bare iterator transaction using the
+% `tx_options` entry of Options (default []) and calls Fun(Tx).
+% with_iter(Db, Fun): creates an iterator Db handle from a Db map whose
+% `tx` is undefined and calls Fun(IterDb).
+%
+% In both cases destroy_iter/1 runs afterwards, whether Fun returns or
+% raises.
+with_iter(Fun, Options) when is_function(Fun, 1), is_list(Options) ->
+    TxOptions = fabric2_util:get_value(tx_options, Options, []),
+    with_iter_handle(create_iter(TxOptions), Fun);
+
+with_iter(#{tx := undefined} = Db, Fun) when is_function(Fun, 1) ->
+    with_iter_handle(create_iter(Db), Fun).
+
+
+% Apply Fun to an already created iterator handle, always destroying the
+% iterator once Fun is done.
+with_iter_handle(Handle, Fun) ->
+    try
+        Fun(Handle)
+    after
+        destroy_iter(Handle)
+    end.
+
+
get_dir(Tx) ->
Root = erlfdb_directory:root(),
Dir = fabric2_server:fdb_directory(),
@@ -821,19 +864,50 @@ fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
} = ensure_current(Db),
fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
-fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
- case fabric2_util:get_value(limit, Options) of
- 0 ->
+fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options) ->
+ Iterator = fabric2_util:get_value(iterator, Options),
+ case {fabric2_util:get_value(limit, Options), Iterator} of
+ {0, _} ->
% FoundationDB treats a limit of 0 as unlimited
% so we have to guard for that here.
- UserAcc;
- _ ->
- {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
- Callback = fun fold_range_cb/2,
- Acc = {skip, Skip, UserCallback, UserAcc},
- {skip, _, UserCallback, OutAcc} =
- erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
- OutAcc
+ Acc;
+ {_, undefined} ->
+ FAcc = get_fold_acc(RangePrefix, Callback, Acc, Options),
+ fold_range_tx(Tx, FAcc);
+ {_, true} ->
+ FAcc = get_fold_acc(RangePrefix, Callback, Acc, Options),
+ fold_range_iter(Tx, FAcc)
+ end.
+
+
+% Fold over the range described by the accumulator inside the transaction
+% Tx, using fold_range_tx_cb/2 as the per-row callback. Returns the final
+% user accumulator.
+fold_range_tx(Tx, #fold_acc{start_key = StartKey, end_key = EndKey} = FAcc) ->
+    FoldOpts = FAcc#fold_acc.opts,
+    Result = erlfdb:fold_range(
+        Tx,
+        StartKey,
+        EndKey,
+        fun fold_range_tx_cb/2,
+        FAcc,
+        FoldOpts
+    ),
+    Result#fold_acc.uacc.
+
+
+% Fold over the range described by the accumulator using the iterator
+% transaction Tx, restarting from the last saved checkpoint whenever
+% erlfdb raises {erlfdb_error, ?TRANSACTION_TOO_OLD}.
+%
+% The checkpoint is kept in the process dictionary under
+% ?PDICT_ITER_CHECKPOINT. It is seeded here with the incoming accumulator
+% and refreshed by fold_range_iter_cb/2 as rows are consumed. On a restart
+% the transaction is reset via reset_iter_tx/1 and the fold is re-entered
+% with the checkpointed accumulator.
+%
+% Returns the final user accumulator. Any other error propagates.
+fold_range_iter(Tx, #fold_acc{} = FAcc) ->
+    #fold_acc{
+        start_key = Start,
+        end_key = End,
+        opts = Opts
+    } = FAcc,
+    Callback = fun fold_range_iter_cb/2,
+    put(?PDICT_ITER_CHECKPOINT, FAcc),
+    try erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts) of
+        #fold_acc{uacc = UAccOut} -> UAccOut
+    catch
+        error:{erlfdb_error, ?TRANSACTION_TOO_OLD} ->
+            #fold_acc{} = FAcc1 = get(?PDICT_ITER_CHECKPOINT),
+            % A reset is the expected way for a long fold to continue, so
+            % it is logged at debug level. Only the key range is logged:
+            % the accumulator also carries the user callback and state.
+            couch_log:debug("Resetting iterator transaction at ~p - ~p", [
+                FAcc1#fold_acc.start_key,
+                FAcc1#fold_acc.end_key
+            ]),
+            ok = reset_iter_tx(Tx),
+            fold_range_iter(Tx, FAcc1)
+    end.
@@ -1277,7 +1351,7 @@ chunkify_binary(Data) ->
end.
-get_fold_opts(RangePrefix, Options) ->
+get_fold_acc(RangePrefix, UserCallback, UserAcc, Options) ->
Reverse = case fabric2_util:get_value(dir, Options) of
rev -> true;
_ -> false
@@ -1367,15 +1441,75 @@ get_fold_opts(RangePrefix, Options) ->
++ StreamingMode
++ Snapshot,
- {StartKey3, EndKey3, Skip, OutOpts}.
+ #fold_acc{
+ start_key = StartKey3,
+ end_key = EndKey3,
+ skip = Skip,
+ opts = OutOpts,
+ ucallback = UserCallback,
+ uacc = UserAcc
+ }.
+
+
+% Per-row callback used by fold_range_tx/2. While `skip` is positive the
+% row is dropped and the counter is decremented. Once `skip` is 0 each row
+% is passed to the user callback and its result becomes the new user
+% accumulator.
+fold_range_tx_cb(_KV, #fold_acc{skip = Left} = FAcc) when Left > 0 ->
+    FAcc#fold_acc{skip = Left - 1};
+
+fold_range_tx_cb(KV, #fold_acc{skip = 0} = FAcc) ->
+    UserFun = FAcc#fold_acc.ucallback,
+    UserAcc = FAcc#fold_acc.uacc,
+    FAcc#fold_acc{uacc = UserFun(KV, UserAcc)}.
+
+% Per-row callback used by fold_range_iter/2.
+%
+% With `skip` at 0 the row is passed to the user callback, the accumulator
+% is advanced past K with next_iter_acc/2 and saved as the checkpoint, and
+% the advanced accumulator is returned carrying the new user accumulator.
+% NOTE(review): the checkpoint saved here is NewFAcc, which still holds the
+% user accumulator from before this row; only the returned record carries
+% NewUAcc. Confirm this is what a restart is meant to resume from.
+fold_range_iter_cb({K, V}, #fold_acc{skip = 0} = FAcc) ->
+ #fold_acc{ucallback = UCallback, uacc = UAcc} = FAcc,
+ NewUAcc = UCallback({K, V}, UAcc),
+ NewFAcc = next_iter_acc(K, FAcc),
+ put(?PDICT_ITER_CHECKPOINT, NewFAcc),
+ NewFAcc#fold_acc{uacc = NewUAcc};
-fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
- NewAcc = Callback(KV, Acc),
- {skip, 0, Callback, NewAcc};
-fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
- {skip, N - 1, Callback, Acc}.
+% With `skip` positive the row is not passed to the user callback. The
+% incoming (not yet advanced) accumulator is saved as the checkpoint and the
+% advanced accumulator is returned.
+fold_range_iter_cb({K, _V}, #fold_acc{skip = N} = FAcc) when N > 0 ->
+ put(?PDICT_ITER_CHECKPOINT, FAcc),
+ next_iter_acc(K, FAcc).
+
+
+% Advance the fold accumulator past key K: decrement the `limit` fold
+% option when one is set, decrement `skip` (never below 0) and move the
+% range boundary to K - the end key for reverse folds, the start key
+% otherwise. Returns the updated #fold_acc{}.
+next_iter_acc(K, #fold_acc{} = FAcc) ->
+    #fold_acc{skip = Skip, opts = Opts} = FAcc,
+    Opts1 = case fabric2_util:get_value(limit, Opts) of
+        N when is_integer(N), N > 0 ->
+            % fabric2_util:replace_value/3 takes (Key, Val, List); the
+            % option list must be the last argument.
+            fabric2_util:replace_value(limit, N - 1, Opts);
+        undefined ->
+            Opts
+    end,
+    FAcc1 = FAcc#fold_acc{opts = Opts1, skip = max(0, Skip - 1)},
+    case fabric2_util:get_value(reverse, Opts, false) of
+        true -> FAcc1#fold_acc{end_key = K};
+        false -> FAcc1#fold_acc{start_key = K}
+    end.
+
+
+% Reset the iterator transaction so a fold can continue on it: reset Tx,
+% set `retry_limit` and `max_retry_delay` to 0 again, then check the
+% database handle saved under ?PDICT_ITER_VALIDATE_DB (if any) with
+% iterator_db_validate/2. Returns ok.
+reset_iter_tx({erlfdb_transaction, _} = Tx) ->
+    ok = erlfdb:reset(Tx),
+    apply_tx_options(Tx, [{retry_limit, 0}, {max_retry_delay, 0}]),
+    SavedDb = get(?PDICT_ITER_VALIDATE_DB),
+    ok = iterator_db_validate(Tx, SavedDb).
+
+
+% Check that the database an iterator was opened on is still the same one.
+%
+% With no saved Db handle (`undefined`) there is nothing to check. With a
+% Db map, the uuid stored under the {?DB_CONFIG, <<"uuid">>} key is read
+% through Tx and compared with the handle's `uuid`; a mismatch raises
+% error(database_does_not_exist).
+iterator_db_validate({erlfdb_transaction, _}, undefined) ->
+    ok;
+
+iterator_db_validate({erlfdb_transaction, _} = Tx, #{} = Db) ->
+    #{
+        layer_prefix := LayerPrefix,
+        name := DbName,
+        uuid := ExpectedUUID
+    } = Db,
+    UUIDKey = erlfdb_tuple:pack(
+        {?DB_CONFIG, <<"uuid">>},
+        erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix)
+    ),
+    StoredUUID = erlfdb:wait(erlfdb:get(Tx, UUIDKey)),
+    case StoredUUID =:= ExpectedUUID of
+        true -> ok;
+        false -> error(database_does_not_exist)
+    end.
get_db_handle() ->
@@ -1512,3 +1646,68 @@ with_span(Operation, ExtraTags, Fun) ->
false ->
Fun()
end.
+
+
+% Create an iterator.
+%
+% create_iter(Options) with a list of transaction options creates a new
+% transaction on the database handle and applies Options followed by
+% `disallow_writes`, `{retry_limit, 0}` and `{max_retry_delay, 0}`.
+% It raises error(iterator_already_created) if a fold checkpoint is already
+% present in the process dictionary. Returns the transaction.
+create_iter(Options) when is_list(Options) ->
+ case get(?PDICT_ITER_CHECKPOINT) of
+ undefined -> ok;
+ _ -> error(iterator_already_created)
+ end,
+ Fdb = get_db_handle(),
+ Tx = erlfdb:create_transaction(Fdb),
+ apply_tx_options(Tx, Options ++ [
+ disallow_writes,
+ {retry_limit, 0},
+ {max_retry_delay, 0}
+ ]);
+
+% create_iter(Db) with a Db map whose `tx` is undefined creates the
+% transaction from the map's `tx_options` and returns the Db map with `tx`
+% set to it. If {?MODULE, reopen} is thrown along the way, the whole
+% sequence is retried with `reopen => true` set on the original Db.
+create_iter(#{tx := undefined} = Db) ->
+ try
+ Db1 = refresh(Db),
+
+ % `reopen` is a one-shot flag set by the retry in the catch clause below
+ Reopen = maps:get(reopen, Db1, false),
+ Db2 = maps:remove(reopen, Db1),
+
+ Options = maps:get(tx_options, Db2, []),
+ Tx = create_iter(Options),
+
+ Db3 = case Reopen of
+ true -> reopen(Db2#{tx => Tx});
+ false -> Db2#{tx => Tx}
+ end,
+
+ % Here we might throw `reopen`
+ Db4 = ensure_current(Db3),
+
+ % This part might update the Db cache
+ ok = run_on_commit_fun(Tx),
+ erase({?PDICT_ON_COMMIT_FUN, Tx}),
+
+ % Save the initial Db handle so we can validate
+ % that the same db was still running
+ put(?PDICT_ITER_VALIDATE_DB, Db4),
+
+ Db4#{tx := Tx}
+ catch throw:{?MODULE, reopen} ->
+ create_iter(Db#{reopen => true})
+ end.
+
+
+% Tear down the process-dictionary state of an iterator.
+%
+% Given a bare transaction, erases the fold checkpoint and returns the
+% result of erase/1. Given a Db map holding a transaction, also erases the
+% saved validation Db handle and returns the map with `tx` set back to
+% `undefined`.
+destroy_iter({erlfdb_transaction, _}) ->
+    erase(?PDICT_ITER_CHECKPOINT);
+
+destroy_iter(#{tx := {erlfdb_transaction, _}} = Db) ->
+    % The tag has to be `erlfdb_transaction`, the same one matched by the
+    % clause above; otherwise Db handles returned by create_iter/1 never
+    % match and the cleanup in with_iter/2 fails with function_clause.
+    #{tx := Tx} = Db,
+    erase(?PDICT_ITER_VALIDATE_DB),
+    destroy_iter(Tx),
+    Db#{tx := undefined}.
+
+
+% Apply a list of transaction options to Tx and return Tx. A bare atom is
+% set as a flag option via erlfdb:set_option/2, a {Key, Value} pair via
+% erlfdb:set_option/3. Options are applied in list order.
+apply_tx_options(Tx, Options) ->
+    SetOption = fun(Opt) ->
+        case Opt of
+            Flag when is_atom(Flag) -> erlfdb:set_option(Tx, Flag);
+            {Name, Value} -> erlfdb:set_option(Tx, Name, Value)
+        end
+    end,
+    lists:foreach(SetOption, Options),
+    Tx.
diff --git a/src/fabric/src/fabric2_server.erl b/src/fabric/src/fabric2_server.erl
index b1c38ef55..acacb0c11 100644
--- a/src/fabric/src/fabric2_server.erl
+++ b/src/fabric/src/fabric2_server.erl
@@ -58,6 +58,7 @@ fetch(DbName) when is_binary(DbName) ->
store(#{name := DbName} = Db0) when is_binary(DbName) ->
Db1 = Db0#{
tx := undefined,
+ tx_options := [],
user_ctx := #user_ctx{},
security_fun := undefined
},
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
index 4e2e2d76b..8d04219a6 100644
--- a/src/fabric/src/fabric2_util.erl
+++ b/src/fabric/src/fabric2_util.erl
@@ -31,6 +31,7 @@
get_value/2,
get_value/3,
+ replace_value/3,
to_hex/1,
from_hex/1,
uuid/0
@@ -160,6 +161,10 @@ get_value(Key, List, Default) ->
end.
+% Replace the first tuple in List whose first element compares equal to Key
+% with {Key, Val}. List is returned unchanged when no such tuple exists.
+% Argument order is (Key, Val, List).
+replace_value(Key, Val, List) ->
+ lists:keyreplace(Key, 1, List, {Key, Val}).
+
+
to_hex(Bin) ->
list_to_binary(to_hex_int(Bin)).