summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-01-31 14:58:41 -0500
committerNick Vatamaniuc <vatamane@apache.org>2020-02-12 12:56:05 -0500
commita5b2ece3079763c6df2d4fc6879e7cfd2b60b7ca (patch)
treea0ff769a99d7a9d4490e61a78e51fc5382e04ab6
parentf4315660b2abf0570b27b581b71bd618a4f0e538 (diff)
downloadcouchdb-multi-transactional-iterators-3.tar.gz
Use multi-transactional iteratorsmulti-transactional-iterators-3
-rw-r--r--src/fabric/include/fabric2.hrl4
-rw-r--r--src/fabric/src/fabric2_fdb.erl174
2 files changed, 148 insertions, 30 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 828a51b8f..d07a73793 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -57,6 +57,10 @@
-define(PDICT_TX_ID_KEY, '$fabric_tx_id').
-define(PDICT_TX_RES_KEY, '$fabric_tx_result').
-define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun').
+-define(PDICT_FOLD_ACC_STATE, '$fabric_fold_acc_state').
+
+% Let's keep these in ascending order
+-define(TRANSACTION_TOO_OLD, 1007).
-define(COMMIT_UNKNOWN_RESULT, 1021).
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 6abe1f6de..0f3c6234a 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -69,6 +69,23 @@
-include("fabric2.hrl").
+-define(MAX_FOLD_RANGE_RETRIES, 3).
+
+
+-record(fold_acc, {
+ db,
+ restart_tx,
+ start_key,
+ end_key,
+ limit,
+ skip,
+ retries,
+ base_opts,
+ user_fun,
+ user_acc
+}).
+
+
transactional(Fun) ->
do_transaction(Fun, undefined).
@@ -815,25 +832,49 @@ get_last_change(#{} = Db) ->
end.
-fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
- #{
- tx := Tx
- } = ensure_current(Db),
- fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
-
-fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
- case fabric2_util:get_value(limit, Options) of
- 0 ->
- % FoundationDB treats a limit of 0 as unlimited
- % so we have to guard for that here.
- UserAcc;
- _ ->
- {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
- Callback = fun fold_range_cb/2,
- Acc = {skip, Skip, UserCallback, UserAcc},
- {skip, _, UserCallback, OutAcc} =
- erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
- OutAcc
+fold_range(TxOrDb, RangePrefix, UserFun, UserAcc, Options) ->
+ {Db, Tx} = case TxOrDb of
+ {tx, TxObj} ->
+ {undefined, TxObj};
+ #{} = DbObj ->
+ DbObj1 = #{tx := TxObj} = ensure_current(DbObj),
+ {DbObj1, TxObj}
+ end,
+ % FoundationDB treats a limit 0 of as unlimited so we guard against it
+ case fabric2_util:get_value(limit, Options) of 0 -> UserAcc; _ ->
+ FAcc = get_fold_acc(Db, RangePrefix, UserFun, UserAcc, Options),
+ try
+ fold_range(Tx, FAcc)
+ after
+ erase(?PDICT_FOLD_ACC_STATE)
+ end
+ end.
+
+
+fold_range(Tx, FAcc) ->
+ #fold_acc{
+ start_key = Start,
+ end_key = End,
+ limit = Limit,
+ base_opts = BaseOpts,
+ restart_tx = DoRestart
+ } = FAcc,
+ case DoRestart of false -> ok; true ->
+ ok = erlfdb:set_option(Tx, disallow_writes)
+ end,
+ Opts = [{limit, Limit} | BaseOpts],
+ Callback = fun fold_range_cb/2,
+ try
+ #fold_acc{
+ user_acc = FinalUserAcc
+ } = erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts),
+ FinalUserAcc
+ catch error:{erlfdb_error, ?TRANSACTION_TOO_OLD} when DoRestart ->
+ % Possibly handle cluster_version_changed and future_version as well to
+ % continue iteration instead fallback to transactional and retrying
+ % from the begging which is bound to fail when streaming data out to a
+ % socket.
+ fold_range(Tx, restart_fold(Tx, FAcc))
end.
@@ -1277,7 +1318,9 @@ chunkify_binary(Data) ->
end.
-get_fold_opts(RangePrefix, Options) ->
+get_fold_acc(Db, RangePrefix, UserCallback, UserAcc, Options)
+ when is_map(Db) orelse Db =:= undefined ->
+
Reverse = case fabric2_util:get_value(dir, Options) of
rev -> true;
_ -> false
@@ -1342,8 +1385,8 @@ get_fold_opts(RangePrefix, Options) ->
end,
Limit = case fabric2_util:get_value(limit, Options) of
- L when is_integer(L), L >= 0 -> [{limit, L + Skip}];
- undefined -> []
+ L when is_integer(L), L >= 0 -> L + Skip;
+ undefined -> 0
end,
TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
@@ -1361,21 +1404,68 @@ get_fold_opts(RangePrefix, Options) ->
B when is_boolean(B) -> [{snapshot, B}]
end,
- OutOpts = [{reverse, Reverse}]
- ++ Limit
+ BaseOpts = [{reverse, Reverse}]
++ TargetBytes
++ StreamingMode
++ Snapshot,
- {StartKey3, EndKey3, Skip, OutOpts}.
+ RestartTx = fabric2_util:get_value(restart_tx, Options, false),
+
+ #fold_acc{
+ db = Db,
+ start_key = StartKey3,
+ end_key = EndKey3,
+ skip = Skip,
+ limit = Limit,
+ retries = 0,
+ base_opts = BaseOpts,
+ restart_tx = RestartTx,
+ user_fun = UserCallback,
+ user_acc = UserAcc
+ }.
+
+fold_range_cb({K, V}, #fold_acc{} = Acc) ->
+ #fold_acc{
+ skip = Skip,
+ limit = Limit,
+ user_fun = UserFun,
+ user_acc = UserAcc,
+ base_opts = Opts
+ } = Acc,
+ Acc1 = case Skip =:= 0 of
+ true ->
+ UserAcc1 = UserFun({K, V}, UserAcc),
+ Acc#fold_acc{limit = Limit - 1, user_acc = UserAcc1};
+ false ->
+ Acc#fold_acc{skip = Skip - 1, limit = Limit - 1}
+ end,
+ Acc2 = case fabric2_util:get_value(reverse, Opts, false) of
+ true -> Acc1#fold_acc{end_key = K};
+ false -> Acc1#fold_acc{start_key = K}
+ end,
+ put(?PDICT_FOLD_ACC_STATE, Acc2),
+ Acc2.
-fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
- NewAcc = Callback(KV, Acc),
- {skip, 0, Callback, NewAcc};
-fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
- {skip, N - 1, Callback, Acc}.
+restart_fold(Tx, #fold_acc{} = Acc) ->
+ erase(?PDICT_CHECKED_MD_IS_CURRENT),
+ % Not actually committing anyting so we skip on-commit handlers here. Those
+ % are usually to refresh db handles in the cache. If the iterator runs for
+ % a while it might be inserting a stale handle in there anyway.
+ erase({?PDICT_ON_COMMIT_FUN, Tx}),
+
+ ok = erlfdb:reset(Tx),
+
+ case {erase(?PDICT_FOLD_ACC_STATE), Acc#fold_acc.retries} of
+ {#fold_acc{db = Db} = Acc1, _} ->
+ Acc1#fold_acc{db = check_db_instance(Db), retries = 0};
+ {undefined, Retries} when Retries < ?MAX_FOLD_RANGE_RETRIES ->
+ Db = check_db_instance(Acc#fold_acc.db),
+ Acc#fold_acc{db = Db, retries = Retries + 1};
+ {undefined, _} ->
+ error(fold_range_not_progressing)
+ end.
get_db_handle() ->
@@ -1415,6 +1505,30 @@ ensure_current(#{} = Db0, CheckDbVersion) ->
end.
+check_db_instance(undefined) ->
+ undefined;
+
+check_db_instance(#{} = Db) ->
+ require_transaction(Db),
+ case check_metadata_version(Db) of
+ {current, Db1} ->
+ Db1;
+ {stale, Db1} ->
+ #{
+ tx := Tx,
+ uuid := UUID,
+ name := DbName,
+ layer_prefix := LayerPrefix
+ } = Db1,
+ DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix),
+ UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix),
+ case erlfdb:wait(erlfdb:get(Tx, UUIDKey)) of
+ UUID -> Db1;
+ _ -> error(database_does_not_exist)
+ end
+ end.
+
+
is_transaction_applied(Tx) ->
is_commit_unknown_result()
andalso has_transaction_id()