summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Nick Vatamaniuc <vatamane@apache.org> 2020-03-16 13:50:34 -0400
committer Nick Vatamaniuc <vatamane@apache.org> 2020-03-16 19:06:59 -0400
commit 167b4a6d06290fa8fdc35b78c99a7195cd55216a (patch)
tree 4fb2a33d8067f2d432aead01eb11a605a40ff697
parent f28a1ad439ffd1a2df3a715a2b696c0f4886bddf (diff)
download couchdb-handle-list-dbs-future-tx-aborts.tar.gz
Handle transaction cancelled errors in list_dbs_info/3 (branch: handle-list-dbs-future-tx-aborts)
`list_dbs_info/3` maintains a queue of up to 100 futures which are used to concurrently fetch data. Previously, if the transaction was reset while the accumulator inside the fold still held futures from the previous transaction which had not gotten their results yet, waiting on those futures threw a transaction_cancelled (1025) error. To fix this, if we're in a read-only transaction, we return the tx object in the opaque db info record. Then, if `erlfdb:wait/1` throws a transaction cancelled error, we re-fetch the future from the now restarted transaction. Potentially, the transaction may also time out while the futures queue is being drained after the main range fold has already finished. Handle that case by resetting the transaction and then re-fetching the futures. To avoid an infinite loop we allow up to 2 retries only. This approach is not the most optimal, but it is simpler, as it hides the complexity inside the fabric2_fdb module where we already handle these conditions. It means that every 5 or so seconds we might have to re-fetch fewer than 100 extra futures from the queue (as some or all may have gotten their results back already).
-rw-r--r--src/fabric/include/fabric2.hrl1
-rw-r--r--src/fabric/src/fabric2_fdb.erl96
-rw-r--r--src/fabric/test/fabric2_db_crud_tests.erl88
3 files changed, 157 insertions, 28 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index f526d7b34..a4f68bdf6 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -68,6 +68,7 @@
-define(TRANSACTION_TOO_OLD, 1007).
-define(FUTURE_VERSION, 1009).
-define(COMMIT_UNKNOWN_RESULT, 1021).
+-define(TRANSACTION_CANCELLED, 1025).
-define(BINARY_CHUNK_SIZE, 100000).
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 8bc87926d..403b5bb53 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -89,6 +89,14 @@
user_acc
}).
+-record(info_future, {
+ tx,
+ db_prefix,
+ changes_future,
+ meta_future,
+ retries = 0
+}).
+
transactional(Fun) ->
do_transaction(Fun, undefined).
@@ -386,36 +394,37 @@ get_info_future(Tx, DbPrefix) ->
StatsPrefix = erlfdb_tuple:pack({?DB_STATS}, DbPrefix),
MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix),
- {DbPrefix, ChangesFuture, MetaFuture}.
+ % Save the tx object only if it's read-only as we might retry to get the
+ % future again after the tx was reset
+ SaveTx = case erlfdb:get_writes_allowed(Tx) of
+ true -> undefined;
+ false -> Tx
+ end,
+ #info_future{
+ tx = SaveTx,
+ db_prefix = DbPrefix,
+ changes_future = ChangesFuture,
+ meta_future = MetaFuture
+ }.
-get_info_wait({DbPrefix, ChangesFuture, MetaFuture}) ->
- RawSeq = case erlfdb:wait(ChangesFuture) of
- [] ->
- vs_to_seq(fabric2_util:seq_zero_vs());
- [{SeqKey, _}] ->
- {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
- vs_to_seq(SeqVS)
- end,
- CProp = {update_seq, RawSeq},
- MProps = lists:foldl(fun({K, V}, Acc) ->
- case erlfdb_tuple:unpack(K, DbPrefix) of
- {?DB_STATS, <<"doc_count">>} ->
- [{doc_count, ?bin2uint(V)} | Acc];
- {?DB_STATS, <<"doc_del_count">>} ->
- [{doc_del_count, ?bin2uint(V)} | Acc];
- {?DB_STATS, <<"sizes">>, Name} ->
- Val = ?bin2uint(V),
- {_, {Sizes}} = lists:keyfind(sizes, 1, Acc),
- NewSizes = lists:keystore(Name, 1, Sizes, {Name, Val}),
- lists:keystore(sizes, 1, Acc, {sizes, {NewSizes}});
- {?DB_STATS, _} ->
- Acc
- end
- end, [{sizes, {[]}}], erlfdb:wait(MetaFuture)),
+get_info_wait(#info_future{tx = Tx, retries = Retries} = Future)
+ when Tx =:= undefined orelse Retries >= 2 ->
+ get_info_wait_int(Future);
- [CProp | MProps].
+get_info_wait(#info_future{tx = Tx, retries = Retries} = Future) ->
+ try
+ get_info_wait_int(Future)
+ catch
+ error:{erlfdb_error, ?TRANSACTION_CANCELLED} ->
+ Future1 = get_info_future(Tx, Future#info_future.db_prefix),
+ get_info_wait(Future1#info_future{retries = Retries + 1});
+ error:{erlfdb_error, ?TRANSACTION_TOO_OLD} ->
+ ok = erlfdb:reset(Tx),
+ Future1 = get_info_future(Tx, Future#info_future.db_prefix),
+ get_info_wait(Future1#info_future{retries = Retries + 1})
+ end.
load_config(#{} = Db) ->
@@ -1759,6 +1768,41 @@ with_span(Operation, ExtraTags, Fun) ->
end.
+get_info_wait_int(#info_future{} = InfoFuture) ->
+ #info_future{
+ db_prefix = DbPrefix,
+ changes_future = ChangesFuture,
+ meta_future = MetaFuture
+ } = InfoFuture,
+
+ RawSeq = case erlfdb:wait(ChangesFuture) of
+ [] ->
+ vs_to_seq(fabric2_util:seq_zero_vs());
+ [{SeqKey, _}] ->
+ {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
+ vs_to_seq(SeqVS)
+ end,
+ CProp = {update_seq, RawSeq},
+
+ MProps = lists:foldl(fun({K, V}, Acc) ->
+ case erlfdb_tuple:unpack(K, DbPrefix) of
+ {?DB_STATS, <<"doc_count">>} ->
+ [{doc_count, ?bin2uint(V)} | Acc];
+ {?DB_STATS, <<"doc_del_count">>} ->
+ [{doc_del_count, ?bin2uint(V)} | Acc];
+ {?DB_STATS, <<"sizes">>, Name} ->
+ Val = ?bin2uint(V),
+ {_, {Sizes}} = lists:keyfind(sizes, 1, Acc),
+ NewSizes = lists:keystore(Name, 1, Sizes, {Name, Val}),
+ lists:keystore(sizes, 1, Acc, {sizes, {NewSizes}});
+ {?DB_STATS, _} ->
+ Acc
+ end
+ end, [{sizes, {[]}}], erlfdb:wait(MetaFuture)),
+
+ [CProp | MProps].
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
diff --git a/src/fabric/test/fabric2_db_crud_tests.erl b/src/fabric/test/fabric2_db_crud_tests.erl
index 205994267..a82afb54d 100644
--- a/src/fabric/test/fabric2_db_crud_tests.erl
+++ b/src/fabric/test/fabric2_db_crud_tests.erl
@@ -18,6 +18,9 @@
-include("fabric2_test.hrl").
+-define(PDICT_RAISE_IN_ERLFDB_WAIT, '$pdict_raise_in_erlfdb_wait').
+
+
crud_test_() ->
{
"Test database CRUD operations",
@@ -39,7 +42,9 @@ crud_test_() ->
?TDEF_FE(list_dbs_info),
?TDEF_FE(list_dbs_info_partial),
?TDEF_FE(list_dbs_tx_too_old),
- ?TDEF_FE(list_dbs_info_tx_too_old)
+ ?TDEF_FE(list_dbs_info_tx_too_old),
+ ?TDEF_FE(get_info_wait_retry_on_tx_too_old),
+ ?TDEF_FE(get_info_wait_retry_on_tx_abort)
]
}
}
@@ -62,7 +67,9 @@ setup() ->
cleanup(_) ->
- fabric2_test_util:tx_too_old_reset_errors().
+ fabric2_test_util:tx_too_old_reset_errors(),
+ reset_fail_erfdb_wait(),
+ meck:reset([erlfdb]).
create_db(_) ->
@@ -256,6 +263,83 @@ list_dbs_info_tx_too_old(_) ->
end, DbNames).
+get_info_wait_retry_on_tx_too_old(_) ->
+ DbName = ?tempdb(),
+ ?assertMatch({ok, _}, fabric2_db:create(DbName, [])),
+
+ {ok, Db} = fabric2_db:open(DbName, []),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ % Simulate being in a list_dbs_info callback
+ ok = erlfdb:set_option(Tx, disallow_writes),
+
+ InfoF = fabric2_fdb:get_info_future(Tx, DbPrefix),
+ {info_future, _, _, ChangesF, _, _} = InfoF,
+
+ raise_in_erlfdb_wait(ChangesF, {erlfdb_error, 1007}, 3),
+ ?assertError({erlfdb_error, 1007}, fabric2_fdb:get_info_wait(InfoF)),
+
+ raise_in_erlfdb_wait(ChangesF, {erlfdb_error, 1007}, 2),
+ ?assertMatch([{_, _} | _], fabric2_fdb:get_info_wait(InfoF)),
+
+ ?assertEqual(ok, fabric2_db:delete(DbName, []))
+ end).
+
+
+get_info_wait_retry_on_tx_abort(_)->
+ DbName = ?tempdb(),
+ ?assertMatch({ok, _}, fabric2_db:create(DbName, [])),
+
+ {ok, Db} = fabric2_db:open(DbName, []),
+
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ % Simulate being in a list_dbs_info callback
+ ok = erlfdb:set_option(Tx, disallow_writes),
+
+ InfoF = fabric2_fdb:get_info_future(Tx, DbPrefix),
+ {info_future, _, _, ChangesF, _, _} = InfoF,
+
+ raise_in_erlfdb_wait(ChangesF, {erlfdb_error, 1025}, 3),
+ ?assertError({erlfdb_error, 1025}, fabric2_fdb:get_info_wait(InfoF)),
+
+ raise_in_erlfdb_wait(ChangesF, {erlfdb_error, 1025}, 2),
+ ?assertMatch([{_, _} | _], fabric2_fdb:get_info_wait(InfoF)),
+
+ ?assertEqual(ok, fabric2_db:delete(DbName, []))
+ end).
+
+
+reset_fail_erfdb_wait() ->
+ erase(?PDICT_RAISE_IN_ERLFDB_WAIT),
+ meck:expect(erlfdb, wait, fun(F) -> meck:passthrough([F]) end).
+
+
+raise_in_erlfdb_wait(Future, Error, Count) ->
+ put(?PDICT_RAISE_IN_ERLFDB_WAIT, Count),
+ meck:expect(erlfdb, wait, fun
+ (F) when F =:= Future ->
+ case get(?PDICT_RAISE_IN_ERLFDB_WAIT) of
+ N when is_integer(N), N > 0 ->
+ put(?PDICT_RAISE_IN_ERLFDB_WAIT, N - 1),
+ error(Error);
+ _ ->
+ meck:passthrough([F])
+ end;
+ (F) ->
+ meck:passthrough([F])
+ end).
+
+
is_db_info_member(_, []) ->
false;