summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2020-02-10 13:08:52 -0600
committerPaul J. Davis <paul.joseph.davis@gmail.com>2020-02-10 14:01:57 -0600
commit56cfe40fed1a51cde02744dfefe64283c4157c1f (patch)
treed1fc1d4c2a64ce73f5cc19a2c74c9d3c9ec45209
parentf4315660b2abf0570b27b581b71bd618a4f0e538 (diff)
downloadcouchdb-multi-transactional-iterators-2-davisp.tar.gz
[WIP] Restart transactionsmulti-transactional-iterators-2-davisp
-rw-r--r--src/fabric/include/fabric2.hrl2
-rw-r--r--src/fabric/src/fabric2_fdb.erl193
2 files changed, 170 insertions, 25 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 828a51b8f..8305adbee 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -57,7 +57,9 @@
-define(PDICT_TX_ID_KEY, '$fabric_tx_id').
-define(PDICT_TX_RES_KEY, '$fabric_tx_result').
-define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun').
+-define(PDICT_FOLD_ACC_STATE, '$fabric_fold_acc_state').
-define(COMMIT_UNKNOWN_RESULT, 1021).
+-define(TRANSACTION_TOO_OLD, 1007).
-define(BINARY_CHUNK_SIZE, 100000).
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 6abe1f6de..951437262 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -69,6 +69,19 @@
-include("fabric2.hrl").
+-record(fold_acc, {
+ db,
+ restart_tx,
+ start_key,
+ end_key,
+ skip,
+ base_opts,
+ restart_tx,
+ user_fun,
+ user_acc
+}).
+
+
transactional(Fun) ->
do_transaction(Fun, undefined).
@@ -116,15 +129,16 @@ do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) ->
Db = get_db_handle(),
try
erlfdb:transactional(Db, fun(Tx) ->
- case get(erlfdb_trace) of
+ TraceOpts = case get(erlfdb_trace) of
Name when is_binary(Name) ->
UId = erlang:unique_integer([positive]),
UIdBin = integer_to_binary(UId, 36),
TxId = <<Name/binary, "_", UIdBin/binary>>,
- erlfdb:set_option(Tx, transaction_logging_enable, TxId);
+ [{transaction_logging_enable, TxId}];
_ ->
- ok
+ []
end,
+ apply_tx_options(Tx, TraceOpts ++ Options),
case is_transaction_applied(Tx) of
true ->
get_previous_transaction_result();
@@ -815,25 +829,53 @@ get_last_change(#{} = Db) ->
end.
-fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
+fold_range(#{} = Db, RangePrefix, UserFun, UserAcc, Options) ->
#{
tx := Tx
- } = ensure_current(Db),
- fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
+ } = CurrDb = ensure_current(Db),
+ case fabric2_util:get_value(limit, Options) of
+ 0 ->
+ % FoundationDB treats a limit of 0 as unlimited
+ % so we have to guard for that here.
+ UserAcc;
+ _ ->
+ FAcc = get_fold_acc(CurrDb, RangePrefix, Options, UserFun, UserAcc),
+ try
+ fold_range(Tx, FAcc)
+ after
+ erase(?PDICT_FOLD_ACC_STATE)
+ end
+ end;
-fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
+fold_range({tx, Tx}, RangePrefix, UserFun, UserAcc, Options) ->
case fabric2_util:get_value(limit, Options) of
0 ->
% FoundationDB treats a limit of 0 as unlimited
% so we have to guard for that here.
UserAcc;
_ ->
- {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
- Callback = fun fold_range_cb/2,
- Acc = {skip, Skip, UserCallback, UserAcc},
- {skip, _, UserCallback, OutAcc} =
- erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
- OutAcc
+ FAcc1 = get_fold_acc(nil, RangePrefix, Options, UserFun, UserAcc),
+ try
+ fold_range(Tx, FAcc1)
+ after
+ erase(?PDICT_FOLD_ACC_STATE)
+ end.
+ end.
+
+
+fold_range(Tx, FAcc) ->
+ #fold_acc{
+ restart_tx = DoRestart
+ } = FAcc,
+ {FStart, FEnd, FOpts} = fold_acc_to_opts(FAcc),
+ try
+ #fold_acc{
+ user_acc = FinalUserAcc
+ } = erlfdb:fold_range(Tx, FStart, FEnd, Callback, FAcc1, FOpts),
+ FinalUserAcc
+ catch error:{erlfdb_error, ?TRANSACTION_TOO_OLD} when DoRestart ->
+ NewFAcc = restart_fold(FAcc)
+ fold_range(Tx, NewFAcc)
end.
@@ -1277,7 +1319,21 @@ chunkify_binary(Data) ->
end.
-get_fold_opts(RangePrefix, Options) ->
+get_fold_acc(Db, RangePrefix, Options, UserCallback, UserAcc) ->
+ BaseAcc = case lists:member(restart_tx, Options) of
+ true when is_map(Db) ->
+ erlfdb:set_option(Tx, disallow_writes),
+ #fold_acc{
+ db = Db,
+ restart_tx = true
+ };
+ false ->
+ #fold_acc{
+ db = undefined,
+ restart_tx = false
+ }
+ end,
+
Reverse = case fabric2_util:get_value(dir, Options) of
rev -> true;
_ -> false
@@ -1342,8 +1398,8 @@ get_fold_opts(RangePrefix, Options) ->
end,
Limit = case fabric2_util:get_value(limit, Options) of
- L when is_integer(L), L >= 0 -> [{limit, L + Skip}];
- undefined -> []
+ L when is_integer(L), L >= 0 -> L + Skip;
+ undefined -> 0
end,
TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
@@ -1361,21 +1417,99 @@ get_fold_opts(RangePrefix, Options) ->
B when is_boolean(B) -> [{snapshot, B}]
end,
- OutOpts = [{reverse, Reverse}]
- ++ Limit
+ BaseOpts = [{reverse, Reverse}]
++ TargetBytes
++ StreamingMode
++ Snapshot,
- {StartKey3, EndKey3, Skip, OutOpts}.
-
+ BaseAcc#fold_acc{
+ start_key = StartKey3,
+ end_key = EndKey3,
+ skip = Skip,
+ limit = Limit,
+ base_opts = Opts,
+ ucallback = UserCallback,
+ uacc = UserAcc
+ }.
-fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
- NewAcc = Callback(KV, Acc),
- {skip, 0, Callback, NewAcc};
-fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
- {skip, N - 1, Callback, Acc}.
+fold_acc_to_opts(#fold_acc{} = Acc) ->
+ #fold_acc{
+ start_key = StartKey,
+ end_key = EndKey,
+ limit = Limit,
+ base_opts = BaseOpts
+ } = Acc,
+ {StartKey, EndKey, [{limit, Limit} | BaseOpts]}.
+
+
+fold_range_cb({K, V}, #fold_acc{skip = 0} = Acc) ->
+ #fold_acc{
+ tx = Tx,
+ restart_tx = DoRestart,
+ limit = Limit,
+ user_fun = UserFun,
+ user_acc = UserAcc
+ } = Acc
+ NewUserAcc = UserFun({K, V}, UserAcc),
+ store_fold_acc(Acc#fold_acc{
+ start_key = K,
+ limit = Limit - 1,
+ user_acc = NewUserAcc
+ });
+
+fold_range_cb({K, _V}, #fold_acc{skip = S} = Acc) when is_integer(S), S > 0 ->
+ #fold_acc{
+ skip = Skip,
+ limit = Limit
+ } = Acc,
+ store_fold_acc(Acc#fold_acc{
+ start_key = K,
+ skip = Skip - 1,
+ limit = Limit - 1
+ }).
+
+
+store_fold_acc(Acc) ->
+ #fold_acc{
+ start_key = StartKey,
+ skip = Skip,
+ limit = Limit,
+ user_acc = UserAcc
+ } = Acc,
+ put(?PDICT_FOLD_ACC_STATE, {StartKey, Skip, Limit, UserAc}),
+ Acc.
+
+
+restart_fold(Acc) ->
+ erase(?PDICT_CHECKED_MD_IS_CURRENT),
+ erase(?PDICT_CHECKED_DB_IS_CURRENT)
+ ok = erlfdb:reset(Tx),
+ % Do a thing with Acc#fold_acc.db to ensure that we're
+ % on the same db version? Also, do we need to differentiate
+ % between db versions and db instances? I.e., do we bail
+ % when a ddoc has been updated or just if we have done a
+ % delete/create cycle?
+ case get(?PDICT_FOLD_ACC_STATE) of
+ erase(?PDICT_FOLD_ACC_STATE),
+ {StartKey, Skip, Limit, UserAcc} ->
+ Acc#fold_acc{
+ start_key = StartKey,
+ skip = Skip,
+ limit = Limit,
+ user_acc = UserAcc
+ };
+ undefined ->
+ % Might need to assert that we haven't started
+ % folding yet for this case.
+ %
+ % We may also want to add a check here that we're
+ % not in an infinite loop somehow. Mebbe if the
+ % user_fun is using a different tx that's throwing
+ % the too_old error that we're not clearing by reseting
+ % here?
+ Acc
+ end.
get_db_handle() ->
@@ -1512,3 +1646,12 @@ with_span(Operation, ExtraTags, Fun) ->
false ->
Fun()
end.
+
+apply_tx_options(Tx, Options) ->
+ lists:foreach(fun(Option) ->
+ case Option of
+ K when is_atom(K) -> erlfdb:set_option(Tx, K);
+ {K, V} -> erlfdb:set_option(Tx, K, V)
+ end
+ end, Options),
+ Tx. \ No newline at end of file