summaryrefslogtreecommitdiff
path: root/src/fabric/src/fabric2_fdb.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric/src/fabric2_fdb.erl')
-rw-r--r--src/fabric/src/fabric2_fdb.erl96
1 files changed, 70 insertions, 26 deletions
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 8bc87926d..403b5bb53 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -89,6 +89,14 @@
user_acc
}).
+-record(info_future, {
+ tx,
+ db_prefix,
+ changes_future,
+ meta_future,
+ retries = 0
+}).
+
transactional(Fun) ->
do_transaction(Fun, undefined).
@@ -386,36 +394,37 @@ get_info_future(Tx, DbPrefix) ->
StatsPrefix = erlfdb_tuple:pack({?DB_STATS}, DbPrefix),
MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix),
- {DbPrefix, ChangesFuture, MetaFuture}.
+ % Save the tx object only if it's read-only as we might retry to get the
+ % future again after the tx was reset
+ SaveTx = case erlfdb:get_writes_allowed(Tx) of
+ true -> undefined;
+ false -> Tx
+ end,
+ #info_future{
+ tx = SaveTx,
+ db_prefix = DbPrefix,
+ changes_future = ChangesFuture,
+ meta_future = MetaFuture
+ }.
-get_info_wait({DbPrefix, ChangesFuture, MetaFuture}) ->
- RawSeq = case erlfdb:wait(ChangesFuture) of
- [] ->
- vs_to_seq(fabric2_util:seq_zero_vs());
- [{SeqKey, _}] ->
- {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
- vs_to_seq(SeqVS)
- end,
- CProp = {update_seq, RawSeq},
- MProps = lists:foldl(fun({K, V}, Acc) ->
- case erlfdb_tuple:unpack(K, DbPrefix) of
- {?DB_STATS, <<"doc_count">>} ->
- [{doc_count, ?bin2uint(V)} | Acc];
- {?DB_STATS, <<"doc_del_count">>} ->
- [{doc_del_count, ?bin2uint(V)} | Acc];
- {?DB_STATS, <<"sizes">>, Name} ->
- Val = ?bin2uint(V),
- {_, {Sizes}} = lists:keyfind(sizes, 1, Acc),
- NewSizes = lists:keystore(Name, 1, Sizes, {Name, Val}),
- lists:keystore(sizes, 1, Acc, {sizes, {NewSizes}});
- {?DB_STATS, _} ->
- Acc
- end
- end, [{sizes, {[]}}], erlfdb:wait(MetaFuture)),
+get_info_wait(#info_future{tx = Tx, retries = Retries} = Future)
+ when Tx =:= undefined orelse Retries >= 2 ->
+ get_info_wait_int(Future);
- [CProp | MProps].
+get_info_wait(#info_future{tx = Tx, retries = Retries} = Future) ->
+ try
+ get_info_wait_int(Future)
+ catch
+ error:{erlfdb_error, ?TRANSACTION_CANCELLED} ->
+ Future1 = get_info_future(Tx, Future#info_future.db_prefix),
+ get_info_wait(Future1#info_future{retries = Retries + 1});
+ error:{erlfdb_error, ?TRANSACTION_TOO_OLD} ->
+ ok = erlfdb:reset(Tx),
+ Future1 = get_info_future(Tx, Future#info_future.db_prefix),
+ get_info_wait(Future1#info_future{retries = Retries + 1})
+ end.
load_config(#{} = Db) ->
@@ -1759,6 +1768,41 @@ with_span(Operation, ExtraTags, Fun) ->
end.
+get_info_wait_int(#info_future{} = InfoFuture) ->
+ #info_future{
+ db_prefix = DbPrefix,
+ changes_future = ChangesFuture,
+ meta_future = MetaFuture
+ } = InfoFuture,
+
+ RawSeq = case erlfdb:wait(ChangesFuture) of
+ [] ->
+ vs_to_seq(fabric2_util:seq_zero_vs());
+ [{SeqKey, _}] ->
+ {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
+ vs_to_seq(SeqVS)
+ end,
+ CProp = {update_seq, RawSeq},
+
+ MProps = lists:foldl(fun({K, V}, Acc) ->
+ case erlfdb_tuple:unpack(K, DbPrefix) of
+ {?DB_STATS, <<"doc_count">>} ->
+ [{doc_count, ?bin2uint(V)} | Acc];
+ {?DB_STATS, <<"doc_del_count">>} ->
+ [{doc_del_count, ?bin2uint(V)} | Acc];
+ {?DB_STATS, <<"sizes">>, Name} ->
+ Val = ?bin2uint(V),
+ {_, {Sizes}} = lists:keyfind(sizes, 1, Acc),
+ NewSizes = lists:keystore(Name, 1, Sizes, {Name, Val}),
+ lists:keystore(sizes, 1, Acc, {sizes, {NewSizes}});
+ {?DB_STATS, _} ->
+ Acc
+ end
+ end, [{sizes, {[]}}], erlfdb:wait(MetaFuture)),
+
+ [CProp | MProps].
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").