diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-01-31 14:58:41 -0500 |
---|---|---|
committer | Nick Vatamaniuc <vatamane@apache.org> | 2020-02-12 12:56:05 -0500 |
commit | a5b2ece3079763c6df2d4fc6879e7cfd2b60b7ca (patch) | |
tree | a0ff769a99d7a9d4490e61a78e51fc5382e04ab6 | |
parent | f4315660b2abf0570b27b581b71bd618a4f0e538 (diff) | |
download | couchdb-multi-transactional-iterators-3.tar.gz |
Use multi-transactional iteratorsmulti-transactional-iterators-3
-rw-r--r-- | src/fabric/include/fabric2.hrl | 4 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 174 |
2 files changed, 148 insertions, 30 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl index 828a51b8f..d07a73793 100644 --- a/src/fabric/include/fabric2.hrl +++ b/src/fabric/include/fabric2.hrl @@ -57,6 +57,10 @@ -define(PDICT_TX_ID_KEY, '$fabric_tx_id'). -define(PDICT_TX_RES_KEY, '$fabric_tx_result'). -define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun'). +-define(PDICT_FOLD_ACC_STATE, '$fabric_fold_acc_state'). + +% Let's keep these in ascending order +-define(TRANSACTION_TOO_OLD, 1007). -define(COMMIT_UNKNOWN_RESULT, 1021). diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 6abe1f6de..0f3c6234a 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -69,6 +69,23 @@ -include("fabric2.hrl"). +-define(MAX_FOLD_RANGE_RETRIES, 3). + + +-record(fold_acc, { + db, + restart_tx, + start_key, + end_key, + limit, + skip, + retries, + base_opts, + user_fun, + user_acc +}). + + transactional(Fun) -> do_transaction(Fun, undefined). @@ -815,25 +832,49 @@ get_last_change(#{} = Db) -> end. -fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) -> - #{ - tx := Tx - } = ensure_current(Db), - fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options); - -fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) -> - case fabric2_util:get_value(limit, Options) of - 0 -> - % FoundationDB treats a limit of 0 as unlimited - % so we have to guard for that here. - UserAcc; - _ -> - {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options), - Callback = fun fold_range_cb/2, - Acc = {skip, Skip, UserCallback, UserAcc}, - {skip, _, UserCallback, OutAcc} = - erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts), - OutAcc +fold_range(TxOrDb, RangePrefix, UserFun, UserAcc, Options) -> + {Db, Tx} = case TxOrDb of + {tx, TxObj} -> + {undefined, TxObj}; + #{} = DbObj -> + DbObj1 = #{tx := TxObj} = ensure_current(DbObj), + {DbObj1, TxObj} + end, + % FoundationDB treats a limit 0 of as unlimited so we guard against it + case fabric2_util:get_value(limit, Options) of 0 -> UserAcc; _ -> + FAcc = get_fold_acc(Db, RangePrefix, UserFun, UserAcc, Options), + try + fold_range(Tx, FAcc) + after + erase(?PDICT_FOLD_ACC_STATE) + end + end. + + +fold_range(Tx, FAcc) -> + #fold_acc{ + start_key = Start, + end_key = End, + limit = Limit, + base_opts = BaseOpts, + restart_tx = DoRestart + } = FAcc, + case DoRestart of false -> ok; true -> + ok = erlfdb:set_option(Tx, disallow_writes) + end, + Opts = [{limit, Limit} | BaseOpts], + Callback = fun fold_range_cb/2, + try + #fold_acc{ + user_acc = FinalUserAcc + } = erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts), + FinalUserAcc + catch error:{erlfdb_error, ?TRANSACTION_TOO_OLD} when DoRestart -> + % Possibly handle cluster_version_changed and future_version as well to + % continue iteration instead fallback to transactional and retrying + % from the begging which is bound to fail when streaming data out to a + % socket. + fold_range(Tx, restart_fold(Tx, FAcc)) end. @@ -1277,7 +1318,9 @@ chunkify_binary(Data) -> end. -get_fold_opts(RangePrefix, Options) -> +get_fold_acc(Db, RangePrefix, UserCallback, UserAcc, Options) + when is_map(Db) orelse Db =:= undefined -> + Reverse = case fabric2_util:get_value(dir, Options) of rev -> true; _ -> false @@ -1342,8 +1385,8 @@ get_fold_opts(RangePrefix, Options) -> end, Limit = case fabric2_util:get_value(limit, Options) of - L when is_integer(L), L >= 0 -> [{limit, L + Skip}]; - undefined -> [] + L when is_integer(L), L >= 0 -> L + Skip; + undefined -> 0 end, TargetBytes = case fabric2_util:get_value(target_bytes, Options) of @@ -1361,21 +1404,68 @@ get_fold_opts(RangePrefix, Options) -> B when is_boolean(B) -> [{snapshot, B}] end, - OutOpts = [{reverse, Reverse}] - ++ Limit + BaseOpts = [{reverse, Reverse}] ++ TargetBytes ++ StreamingMode ++ Snapshot, - {StartKey3, EndKey3, Skip, OutOpts}. + RestartTx = fabric2_util:get_value(restart_tx, Options, false), + + #fold_acc{ + db = Db, + start_key = StartKey3, + end_key = EndKey3, + skip = Skip, + limit = Limit, + retries = 0, + base_opts = BaseOpts, + restart_tx = RestartTx, + user_fun = UserCallback, + user_acc = UserAcc + }. + +fold_range_cb({K, V}, #fold_acc{} = Acc) -> + #fold_acc{ + skip = Skip, + limit = Limit, + user_fun = UserFun, + user_acc = UserAcc, + base_opts = Opts + } = Acc, + Acc1 = case Skip =:= 0 of + true -> + UserAcc1 = UserFun({K, V}, UserAcc), + Acc#fold_acc{limit = Limit - 1, user_acc = UserAcc1}; + false -> + Acc#fold_acc{skip = Skip - 1, limit = Limit - 1} + end, + Acc2 = case fabric2_util:get_value(reverse, Opts, false) of + true -> Acc1#fold_acc{end_key = K}; + false -> Acc1#fold_acc{start_key = K} + end, + put(?PDICT_FOLD_ACC_STATE, Acc2), + Acc2. -fold_range_cb(KV, {skip, 0, Callback, Acc}) -> - NewAcc = Callback(KV, Acc), - {skip, 0, Callback, NewAcc}; -fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 -> - {skip, N - 1, Callback, Acc}. +restart_fold(Tx, #fold_acc{} = Acc) -> + erase(?PDICT_CHECKED_MD_IS_CURRENT), + % Not actually committing anyting so we skip on-commit handlers here. Those + % are usually to refresh db handles in the cache. If the iterator runs for + % a while it might be inserting a stale handle in there anyway. + erase({?PDICT_ON_COMMIT_FUN, Tx}), + + ok = erlfdb:reset(Tx), + + case {erase(?PDICT_FOLD_ACC_STATE), Acc#fold_acc.retries} of + {#fold_acc{db = Db} = Acc1, _} -> + Acc1#fold_acc{db = check_db_instance(Db), retries = 0}; + {undefined, Retries} when Retries < ?MAX_FOLD_RANGE_RETRIES -> + Db = check_db_instance(Acc#fold_acc.db), + Acc#fold_acc{db = Db, retries = Retries + 1}; + {undefined, _} -> + error(fold_range_not_progressing) + end. get_db_handle() -> @@ -1415,6 +1505,30 @@ ensure_current(#{} = Db0, CheckDbVersion) -> end. +check_db_instance(undefined) -> + undefined; + +check_db_instance(#{} = Db) -> + require_transaction(Db), + case check_metadata_version(Db) of + {current, Db1} -> + Db1; + {stale, Db1} -> + #{ + tx := Tx, + uuid := UUID, + name := DbName, + layer_prefix := LayerPrefix + } = Db1, + DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix), + UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix), + case erlfdb:wait(erlfdb:get(Tx, UUIDKey)) of + UUID -> Db1; + _ -> error(database_does_not_exist) + end + end. + + is_transaction_applied(Tx) -> is_commit_unknown_result() andalso has_transaction_id() |