diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-03-16 13:50:34 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-03-17 13:04:36 -0400 |
commit | 0cafb178488e77e94b29607d278aae22ffb98187 (patch) | |
tree | b20378f4f1f6d46719c3d200a3b6919a29d0c262 | |
parent | 3bac80479bd57535e5748dd4e6c3bc53d1ef7bb0 (diff) | |
download | couchdb-0cafb178488e77e94b29607d278aae22ffb98187.tar.gz |
Handle transaction cancelled errors in list_dbs_info/3
`list_dbs_info/3` maintains a queue of up to 100 futures which are used to
concurrently fetch data. Previously, if the transaction was reset, the
accumulator inside the fold could still hold futures from the previous
transaction which had not gotten their results yet, and waiting on them threw a
transaction_cancelled (1025) error.
To fix this, if we're in a read-only transaction, we return the tx object in
the opaque db info record. Then, if `erlfdb:wait/1` throws a transaction
canceled error, we re-fetch the future from the now restarted transaction.
Potentially, the transaction may also time out while the futures queue is
drained after the main range fold has finished already. Handle that case by
resetting the transaction and then re-fetching the futures. To avoid an infinite
loop we allow up to 2 retries only.
This approach is not optimal, but it is simpler as it hides the complexity
inside the fabric2_fdb module where we already handle these conditions. It
means that every 5 or so seconds we might have to refetch fewer than 100 extra
futures from the queue (as some or all may have gotten their results back
already).
-rw-r--r-- | src/fabric/include/fabric2.hrl | 1 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 96 | ||||
-rw-r--r-- | src/fabric/test/fabric2_db_crud_tests.erl | 88 |
3 files changed, 157 insertions, 28 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl index f526d7b34..a4f68bdf6 100644 --- a/src/fabric/include/fabric2.hrl +++ b/src/fabric/include/fabric2.hrl @@ -68,6 +68,7 @@ -define(TRANSACTION_TOO_OLD, 1007). -define(FUTURE_VERSION, 1009). -define(COMMIT_UNKNOWN_RESULT, 1021). +-define(TRANSACTION_CANCELLED, 1025). -define(BINARY_CHUNK_SIZE, 100000). diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 8bc87926d..403b5bb53 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -89,6 +89,14 @@ user_acc }). +-record(info_future, { + tx, + db_prefix, + changes_future, + meta_future, + retries = 0 +}). + transactional(Fun) -> do_transaction(Fun, undefined). @@ -386,36 +394,37 @@ get_info_future(Tx, DbPrefix) -> StatsPrefix = erlfdb_tuple:pack({?DB_STATS}, DbPrefix), MetaFuture = erlfdb:get_range_startswith(Tx, StatsPrefix), - {DbPrefix, ChangesFuture, MetaFuture}. + % Save the tx object only if it's read-only as we might retry to get the + % future again after the tx was reset + SaveTx = case erlfdb:get_writes_allowed(Tx) of + true -> undefined; + false -> Tx + end, + #info_future{ + tx = SaveTx, + db_prefix = DbPrefix, + changes_future = ChangesFuture, + meta_future = MetaFuture + }. 
-get_info_wait({DbPrefix, ChangesFuture, MetaFuture}) -> - RawSeq = case erlfdb:wait(ChangesFuture) of - [] -> - vs_to_seq(fabric2_util:seq_zero_vs()); - [{SeqKey, _}] -> - {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix), - vs_to_seq(SeqVS) - end, - CProp = {update_seq, RawSeq}, - MProps = lists:foldl(fun({K, V}, Acc) -> - case erlfdb_tuple:unpack(K, DbPrefix) of - {?DB_STATS, <<"doc_count">>} -> - [{doc_count, ?bin2uint(V)} | Acc]; - {?DB_STATS, <<"doc_del_count">>} -> - [{doc_del_count, ?bin2uint(V)} | Acc]; - {?DB_STATS, <<"sizes">>, Name} -> - Val = ?bin2uint(V), - {_, {Sizes}} = lists:keyfind(sizes, 1, Acc), - NewSizes = lists:keystore(Name, 1, Sizes, {Name, Val}), - lists:keystore(sizes, 1, Acc, {sizes, {NewSizes}}); - {?DB_STATS, _} -> - Acc - end - end, [{sizes, {[]}}], erlfdb:wait(MetaFuture)), +get_info_wait(#info_future{tx = Tx, retries = Retries} = Future) + when Tx =:= undefined orelse Retries >= 2 -> + get_info_wait_int(Future); - [CProp | MProps]. +get_info_wait(#info_future{tx = Tx, retries = Retries} = Future) -> + try + get_info_wait_int(Future) + catch + error:{erlfdb_error, ?TRANSACTION_CANCELLED} -> + Future1 = get_info_future(Tx, Future#info_future.db_prefix), + get_info_wait(Future1#info_future{retries = Retries + 1}); + error:{erlfdb_error, ?TRANSACTION_TOO_OLD} -> + ok = erlfdb:reset(Tx), + Future1 = get_info_future(Tx, Future#info_future.db_prefix), + get_info_wait(Future1#info_future{retries = Retries + 1}) + end. load_config(#{} = Db) -> @@ -1759,6 +1768,41 @@ with_span(Operation, ExtraTags, Fun) -> end. 
+get_info_wait_int(#info_future{} = InfoFuture) -> + #info_future{ + db_prefix = DbPrefix, + changes_future = ChangesFuture, + meta_future = MetaFuture + } = InfoFuture, + + RawSeq = case erlfdb:wait(ChangesFuture) of + [] -> + vs_to_seq(fabric2_util:seq_zero_vs()); + [{SeqKey, _}] -> + {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix), + vs_to_seq(SeqVS) + end, + CProp = {update_seq, RawSeq}, + + MProps = lists:foldl(fun({K, V}, Acc) -> + case erlfdb_tuple:unpack(K, DbPrefix) of + {?DB_STATS, <<"doc_count">>} -> + [{doc_count, ?bin2uint(V)} | Acc]; + {?DB_STATS, <<"doc_del_count">>} -> + [{doc_del_count, ?bin2uint(V)} | Acc]; + {?DB_STATS, <<"sizes">>, Name} -> + Val = ?bin2uint(V), + {_, {Sizes}} = lists:keyfind(sizes, 1, Acc), + NewSizes = lists:keystore(Name, 1, Sizes, {Name, Val}), + lists:keystore(sizes, 1, Acc, {sizes, {NewSizes}}); + {?DB_STATS, _} -> + Acc + end + end, [{sizes, {[]}}], erlfdb:wait(MetaFuture)), + + [CProp | MProps]. + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/src/fabric/test/fabric2_db_crud_tests.erl b/src/fabric/test/fabric2_db_crud_tests.erl index 205994267..a82afb54d 100644 --- a/src/fabric/test/fabric2_db_crud_tests.erl +++ b/src/fabric/test/fabric2_db_crud_tests.erl @@ -18,6 +18,9 @@ -include("fabric2_test.hrl"). +-define(PDICT_RAISE_IN_ERLFDB_WAIT, '$pdict_raise_in_erlfdb_wait'). + + crud_test_() -> { "Test database CRUD operations", @@ -39,7 +42,9 @@ crud_test_() -> ?TDEF_FE(list_dbs_info), ?TDEF_FE(list_dbs_info_partial), ?TDEF_FE(list_dbs_tx_too_old), - ?TDEF_FE(list_dbs_info_tx_too_old) + ?TDEF_FE(list_dbs_info_tx_too_old), + ?TDEF_FE(get_info_wait_retry_on_tx_too_old), + ?TDEF_FE(get_info_wait_retry_on_tx_abort) ] } } @@ -62,7 +67,9 @@ setup() -> cleanup(_) -> - fabric2_test_util:tx_too_old_reset_errors(). + fabric2_test_util:tx_too_old_reset_errors(), + reset_fail_erfdb_wait(), + meck:reset([erlfdb]). create_db(_) -> @@ -256,6 +263,83 @@ list_dbs_info_tx_too_old(_) -> end, DbNames). 
+get_info_wait_retry_on_tx_too_old(_) -> + DbName = ?tempdb(), + ?assertMatch({ok, _}, fabric2_db:create(DbName, [])), + + {ok, Db} = fabric2_db:open(DbName, []), + + fabric2_fdb:transactional(Db, fun(TxDb) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, + + % Simulate being in a list_dbs_info callback + ok = erlfdb:set_option(Tx, disallow_writes), + + InfoF = fabric2_fdb:get_info_future(Tx, DbPrefix), + {info_future, _, _, ChangesF, _, _} = InfoF, + + raise_in_erlfdb_wait(ChangesF, {erlfdb_error, 1007}, 3), + ?assertError({erlfdb_error, 1007}, fabric2_fdb:get_info_wait(InfoF)), + + raise_in_erlfdb_wait(ChangesF, {erlfdb_error, 1007}, 2), + ?assertMatch([{_, _} | _], fabric2_fdb:get_info_wait(InfoF)), + + ?assertEqual(ok, fabric2_db:delete(DbName, [])) + end). + + +get_info_wait_retry_on_tx_abort(_)-> + DbName = ?tempdb(), + ?assertMatch({ok, _}, fabric2_db:create(DbName, [])), + + {ok, Db} = fabric2_db:open(DbName, []), + + fabric2_fdb:transactional(Db, fun(TxDb) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, + + % Simulate being in a list_dbs_info callback + ok = erlfdb:set_option(Tx, disallow_writes), + + InfoF = fabric2_fdb:get_info_future(Tx, DbPrefix), + {info_future, _, _, ChangesF, _, _} = InfoF, + + raise_in_erlfdb_wait(ChangesF, {erlfdb_error, 1025}, 3), + ?assertError({erlfdb_error, 1025}, fabric2_fdb:get_info_wait(InfoF)), + + raise_in_erlfdb_wait(ChangesF, {erlfdb_error, 1025}, 2), + ?assertMatch([{_, _} | _], fabric2_fdb:get_info_wait(InfoF)), + + ?assertEqual(ok, fabric2_db:delete(DbName, [])) + end). + + +reset_fail_erfdb_wait() -> + erase(?PDICT_RAISE_IN_ERLFDB_WAIT), + meck:expect(erlfdb, wait, fun(F) -> meck:passthrough([F]) end). 
+ + +raise_in_erlfdb_wait(Future, Error, Count) -> + put(?PDICT_RAISE_IN_ERLFDB_WAIT, Count), + meck:expect(erlfdb, wait, fun + (F) when F =:= Future -> + case get(?PDICT_RAISE_IN_ERLFDB_WAIT) of + N when is_integer(N), N > 0 -> + put(?PDICT_RAISE_IN_ERLFDB_WAIT, N - 1), + error(Error); + _ -> + meck:passthrough([F]) + end; + (F) -> + meck:passthrough([F]) + end). + + is_db_info_member(_, []) -> false; |