diff options
Diffstat (limited to 'src/fabric/src/fabric2_fdb.erl')
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 96 |
1 files changed, 70 insertions, 26 deletions
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 8bc87926d..403b5bb53 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -89,6 +89,14 @@ user_acc }). +-record(info_future, { + tx, + db_prefix, + changes_future, + meta_future, + retries = 0 +}). + transactional(Fun) -> do_transaction(Fun, undefined). @@ -386,36 +394,37 @@ get_info_future(Tx, DbPrefix) -> StatsPrefix = erlfdb_tuple:pack({?DB_STATS}, DbPrefix), MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix), - {DbPrefix, ChangesFuture, MetaFuture}. + % Save the tx object only if it's read-only as we might retry to get the + % future again after the tx was reset + SaveTx = case erlfdb:get_writes_allowed(Tx) of + true -> undefined; + false -> Tx + end, + #info_future{ + tx = SaveTx, + db_prefix = DbPrefix, + changes_future = ChangesFuture, + meta_future = MetaFuture + }. -get_info_wait({DbPrefix, ChangesFuture, MetaFuture}) -> - RawSeq = case erlfdb:wait(ChangesFuture) of - [] -> - vs_to_seq(fabric2_util:seq_zero_vs()); - [{SeqKey, _}] -> - {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix), - vs_to_seq(SeqVS) - end, - CProp = {update_seq, RawSeq}, - MProps = lists:foldl(fun({K, V}, Acc) -> - case erlfdb_tuple:unpack(K, DbPrefix) of - {?DB_STATS, <<"doc_count">>} -> - [{doc_count, ?bin2uint(V)} | Acc]; - {?DB_STATS, <<"doc_del_count">>} -> - [{doc_del_count, ?bin2uint(V)} | Acc]; - {?DB_STATS, <<"sizes">>, Name} -> - Val = ?bin2uint(V), - {_, {Sizes}} = lists:keyfind(sizes, 1, Acc), - NewSizes = lists:keystore(Name, 1, Sizes, {Name, Val}), - lists:keystore(sizes, 1, Acc, {sizes, {NewSizes}}); - {?DB_STATS, _} -> - Acc - end - end, [{sizes, {[]}}], erlfdb:wait(MetaFuture)), +get_info_wait(#info_future{tx = Tx, retries = Retries} = Future) + when Tx =:= undefined orelse Retries >= 2 -> + get_info_wait_int(Future); - [CProp | MProps]. +get_info_wait(#info_future{tx = Tx, retries = Retries} = Future) -> + try + get_info_wait_int(Future) + catch + error:{erlfdb_error, ?TRANSACTION_CANCELLED} -> + Future1 = get_info_future(Tx, Future#info_future.db_prefix), + get_info_wait(Future1#info_future{retries = Retries + 1}); + error:{erlfdb_error, ?TRANSACTION_TOO_OLD} -> + ok = erlfdb:reset(Tx), + Future1 = get_info_future(Tx, Future#info_future.db_prefix), + get_info_wait(Future1#info_future{retries = Retries + 1}) + end. load_config(#{} = Db) -> @@ -1759,6 +1768,41 @@ with_span(Operation, ExtraTags, Fun) -> end. +get_info_wait_int(#info_future{} = InfoFuture) -> + #info_future{ + db_prefix = DbPrefix, + changes_future = ChangesFuture, + meta_future = MetaFuture + } = InfoFuture, + + RawSeq = case erlfdb:wait(ChangesFuture) of + [] -> + vs_to_seq(fabric2_util:seq_zero_vs()); + [{SeqKey, _}] -> + {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix), + vs_to_seq(SeqVS) + end, + CProp = {update_seq, RawSeq}, + + MProps = lists:foldl(fun({K, V}, Acc) -> + case erlfdb_tuple:unpack(K, DbPrefix) of + {?DB_STATS, <<"doc_count">>} -> + [{doc_count, ?bin2uint(V)} | Acc]; + {?DB_STATS, <<"doc_del_count">>} -> + [{doc_del_count, ?bin2uint(V)} | Acc]; + {?DB_STATS, <<"sizes">>, Name} -> + Val = ?bin2uint(V), + {_, {Sizes}} = lists:keyfind(sizes, 1, Acc), + NewSizes = lists:keystore(Name, 1, Sizes, {Name, Val}), + lists:keystore(sizes, 1, Acc, {sizes, {NewSizes}}); + {?DB_STATS, _} -> + Acc + end + end, [{sizes, {[]}}], erlfdb:wait(MetaFuture)), + + [CProp | MProps]. + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). |