diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-02-10 13:08:52 -0600 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-02-10 14:01:57 -0600 |
commit | 56cfe40fed1a51cde02744dfefe64283c4157c1f (patch) | |
tree | d1fc1d4c2a64ce73f5cc19a2c74c9d3c9ec45209 | |
parent | f4315660b2abf0570b27b581b71bd618a4f0e538 (diff) | |
download | couchdb-multi-transactional-iterators-2-davisp.tar.gz |
[WIP] Restart transactionsmulti-transactional-iterators-2-davisp
-rw-r--r-- | src/fabric/include/fabric2.hrl | 2 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 193 |
2 files changed, 170 insertions, 25 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl index 828a51b8f..8305adbee 100644 --- a/src/fabric/include/fabric2.hrl +++ b/src/fabric/include/fabric2.hrl @@ -57,7 +57,9 @@ -define(PDICT_TX_ID_KEY, '$fabric_tx_id'). -define(PDICT_TX_RES_KEY, '$fabric_tx_result'). -define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun'). +-define(PDICT_FOLD_ACC_STATE, '$fabric_fold_acc_state'). -define(COMMIT_UNKNOWN_RESULT, 1021). +-define(TRANSACTION_TOO_OLD, 1007). -define(BINARY_CHUNK_SIZE, 100000). diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 6abe1f6de..951437262 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -69,6 +69,19 @@ -include("fabric2.hrl"). +-record(fold_acc, { + db, + restart_tx, + start_key, + end_key, + skip, + base_opts, + restart_tx, + user_fun, + user_acc +}). + + transactional(Fun) -> do_transaction(Fun, undefined). @@ -116,15 +129,16 @@ do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) -> Db = get_db_handle(), try erlfdb:transactional(Db, fun(Tx) -> - case get(erlfdb_trace) of + TraceOpts = case get(erlfdb_trace) of Name when is_binary(Name) -> UId = erlang:unique_integer([positive]), UIdBin = integer_to_binary(UId, 36), TxId = <<Name/binary, "_", UIdBin/binary>>, - erlfdb:set_option(Tx, transaction_logging_enable, TxId); + [{transaction_logging_enable, TxId}]; _ -> - ok + [] end, + apply_tx_options(Tx, TraceOpts ++ Options), case is_transaction_applied(Tx) of true -> get_previous_transaction_result(); @@ -815,25 +829,53 @@ get_last_change(#{} = Db) -> end. -fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) -> +fold_range(#{} = Db, RangePrefix, UserFun, UserAcc, Options) -> #{ tx := Tx - } = ensure_current(Db), - fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options); + } = CurrDb = ensure_current(Db), + case fabric2_util:get_value(limit, Options) of + 0 -> + % FoundationDB treats a limit of 0 as unlimited + % so we have to guard for that here. + UserAcc; + _ -> + FAcc = get_fold_acc(CurrDb, RangePrefix, Options, UserFun, UserAcc), + try + fold_range(Tx, FAcc) + after + erase(?PDICT_FOLD_ACC_STATE) + end + end; -fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) -> +fold_range({tx, Tx}, RangePrefix, UserFun, UserAcc, Options) -> case fabric2_util:get_value(limit, Options) of 0 -> % FoundationDB treats a limit of 0 as unlimited % so we have to guard for that here. UserAcc; _ -> - {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options), - Callback = fun fold_range_cb/2, - Acc = {skip, Skip, UserCallback, UserAcc}, - {skip, _, UserCallback, OutAcc} = - erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts), - OutAcc + FAcc1 = get_fold_acc(nil, RangePrefix, Options, UserFun, UserAcc), + try + fold_range(Tx, FAcc1) + after + erase(?PDICT_FOLD_ACC_STATE) + end. + end. + + +fold_range(Tx, FAcc) -> + #fold_acc{ + restart_tx = DoRestart + } = FAcc, + {FStart, FEnd, FOpts} = fold_acc_to_opts(FAcc), + try + #fold_acc{ + user_acc = FinalUserAcc + } = erlfdb:fold_range(Tx, FStart, FEnd, Callback, FAcc1, FOpts), + FinalUserAcc + catch error:{erlfdb_error, ?TRANSACTION_TOO_OLD} when DoRestart -> + NewFAcc = restart_fold(FAcc) + fold_range(Tx, NewFAcc) end. @@ -1277,7 +1319,21 @@ chunkify_binary(Data) -> end. -get_fold_opts(RangePrefix, Options) -> +get_fold_acc(Db, RangePrefix, Options, UserCallback, UserAcc) -> + BaseAcc = case lists:member(restart_tx, Options) of + true when is_map(Db) -> + erlfdb:set_option(Tx, disallow_writes), + #fold_acc{ + db = Db, + restart_tx = true + }; + false -> + #fold_acc{ + db = undefined, + restart_tx = false + } + end, + Reverse = case fabric2_util:get_value(dir, Options) of rev -> true; _ -> false @@ -1342,8 +1398,8 @@ get_fold_opts(RangePrefix, Options) -> end, Limit = case fabric2_util:get_value(limit, Options) of - L when is_integer(L), L >= 0 -> [{limit, L + Skip}]; - undefined -> [] + L when is_integer(L), L >= 0 -> L + Skip; + undefined -> 0 end, TargetBytes = case fabric2_util:get_value(target_bytes, Options) of @@ -1361,21 +1417,99 @@ get_fold_opts(RangePrefix, Options) -> B when is_boolean(B) -> [{snapshot, B}] end, - OutOpts = [{reverse, Reverse}] - ++ Limit + BaseOpts = [{reverse, Reverse}] ++ TargetBytes ++ StreamingMode ++ Snapshot, - {StartKey3, EndKey3, Skip, OutOpts}. - + BaseAcc#fold_acc{ + start_key = StartKey3, + end_key = EndKey3, + skip = Skip, + limit = Limit, + base_opts = Opts, + ucallback = UserCallback, + uacc = UserAcc + }. -fold_range_cb(KV, {skip, 0, Callback, Acc}) -> - NewAcc = Callback(KV, Acc), - {skip, 0, Callback, NewAcc}; -fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 -> - {skip, N - 1, Callback, Acc}. +fold_acc_to_opts(#fold_acc{} = Acc) -> + #fold_acc{ + start_key = StartKey, + end_key = EndKey, + limit = Limit, + base_opts = BaseOpts + } = Acc, + {StartKey, EndKey, [{limit, Limit} | BaseOpts]}. + + +fold_range_cb({K, V}, #fold_acc{skip = 0} = Acc) -> + #fold_acc{ + tx = Tx, + restart_tx = DoRestart, + limit = Limit, + user_fun = UserFun, + user_acc = UserAcc + } = Acc + NewUserAcc = UserFun({K, V}, UserAcc), + store_fold_acc(Acc#fold_acc{ + start_key = K, + limit = Limit - 1, + user_acc = NewUserAcc + }); + +fold_range_cb({K, _V}, #fold_acc{skip = S} = Acc) when is_integer(S), S > 0 -> + #fold_acc{ + skip = Skip, + limit = Limit + } = Acc, + store_fold_acc(Acc#fold_acc{ + start_key = K, + skip = Skip - 1, + limit = Limit - 1 + }). + + +store_fold_acc(Acc) -> + #fold_acc{ + start_key = StartKey, + skip = Skip, + limit = Limit, + user_acc = UserAcc + } = Acc, + put(?PDICT_FOLD_ACC_STATE, {StartKey, Skip, Limit, UserAc}), + Acc. + + +restart_fold(Acc) -> + erase(?PDICT_CHECKED_MD_IS_CURRENT), + erase(?PDICT_CHECKED_DB_IS_CURRENT) + ok = erlfdb:reset(Tx), + % Do a thing with Acc#fold_acc.db to ensure that we're + % on the same db version? Also, do we need to differentiate + % between db versions and db instances? I.e., do we bail + % when a ddoc has been updated or just if we have done a + % delete/create cycle? + case get(?PDICT_FOLD_ACC_STATE) of + erase(?PDICT_FOLD_ACC_STATE), + {StartKey, Skip, Limit, UserAcc} -> + Acc#fold_acc{ + start_key = StartKey, + skip = Skip, + limit = Limit, + user_acc = UserAcc + }; + undefined -> + % Might need to assert that we haven't started + % folding yet for this case. + % + % We may also want to add a check here that we're + % not in an infinite loop somehow. Mebbe if the + % user_fun is using a different tx that's throwing + % the too_old error that we're not clearing by reseting + % here? + Acc + end. get_db_handle() -> @@ -1512,3 +1646,12 @@ with_span(Operation, ExtraTags, Fun) -> false -> Fun() end. + +apply_tx_options(Tx, Options) -> + lists:foreach(fun(Option) -> + case Option of + K when is_atom(K) -> erlfdb:set_option(Tx, K); + {K, V} -> erlfdb:set_option(Tx, K, V) + end + end, Options), + Tx.
\ No newline at end of file |