diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-01-31 14:58:41 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-02-14 15:31:13 -0500 |
commit | 148af9efadfe97382523c0259171278849186fe9 (patch) | |
tree | e8d37584273ab1d0f912506a72620283c182c5a5 | |
parent | 31878f8fb4f61f401f0e5e4b412052ad35b1893c (diff) | |
download | couchdb-148af9efadfe97382523c0259171278849186fe9.tar.gz |
Implement multi-transactional iterators for _changes feeds
Previously, changes feeds would fail if they streamed data for more than five
seconds. This was because of FoundationDB's transaction time limit. After
the timeout fired, a 1007 (transaction_too_old) error was raised, and the
transaction was retried. The emitted changes feed would often crash or simply
hang because the HTTP state would be garbled as response data was re-sent over
the same socket stream again.
To fix the issue, introduce a new `{restart_tx, true}` option for
`fold_range/5`. This option sets up a new transaction to continue iterating
over the range from where the last one left off.
To avoid data being resent in the response stream, user callback functions must
first read all the data they plan on sending during that callback, send it out,
and after that they must not do any more db reads, so as not to trigger a
`transaction_too_old` error.
-rw-r--r-- | src/fabric/include/fabric2.hrl | 4 | ||||
-rw-r--r-- | src/fabric/src/fabric2_db.erl | 7 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 174 | ||||
-rw-r--r-- | src/fabric/test/fabric2_changes_fold_tests.erl | 241 | ||||
-rw-r--r-- | src/fabric/test/fabric2_test.hrl | 8 |
5 files changed, 364 insertions, 70 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl index 828a51b8f..d07a73793 100644 --- a/src/fabric/include/fabric2.hrl +++ b/src/fabric/include/fabric2.hrl @@ -57,6 +57,10 @@ -define(PDICT_TX_ID_KEY, '$fabric_tx_id'). -define(PDICT_TX_RES_KEY, '$fabric_tx_result'). -define(PDICT_ON_COMMIT_FUN, '$fabric_on_commit_fun'). +-define(PDICT_FOLD_ACC_STATE, '$fabric_fold_acc_state'). + +% Let's keep these in ascending order +-define(TRANSACTION_TOO_OLD, 1007). -define(COMMIT_UNKNOWN_RESULT, 1021). diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 17c899d27..3349722ad 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -872,6 +872,11 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) -> _ -> fwd end, + RestartTx = case fabric2_util:get_value(restart_tx, Options) of + undefined -> [{restart_tx, true}]; + _AlreadySet -> [] + end, + StartKey = get_since_seq(TxDb, Dir, SinceSeq), EndKey = case Dir of rev -> fabric2_util:seq_zero_vs(); @@ -880,7 +885,7 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) -> FoldOpts = [ {start_key, StartKey}, {end_key, EndKey} - ] ++ Options, + ] ++ RestartTx ++ Options, {ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) -> {SeqVS} = erlfdb_tuple:unpack(K, Prefix), diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 99611b0a1..00bb4855a 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -72,6 +72,23 @@ -include("fabric2.hrl"). +-define(MAX_FOLD_RANGE_RETRIES, 3). + + +-record(fold_acc, { + db, + restart_tx, + start_key, + end_key, + limit, + skip, + retries, + base_opts, + user_fun, + user_acc +}). + + transactional(Fun) -> do_transaction(Fun, undefined). @@ -835,25 +852,49 @@ get_last_change(#{} = Db) -> end. 
-fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) -> - #{ - tx := Tx - } = ensure_current(Db), - fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options); - -fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) -> - case fabric2_util:get_value(limit, Options) of - 0 -> - % FoundationDB treats a limit of 0 as unlimited - % so we have to guard for that here. - UserAcc; - _ -> - {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options), - Callback = fun fold_range_cb/2, - Acc = {skip, Skip, UserCallback, UserAcc}, - {skip, _, UserCallback, OutAcc} = - erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts), - OutAcc +fold_range(TxOrDb, RangePrefix, UserFun, UserAcc, Options) -> + {Db, Tx} = case TxOrDb of + {tx, TxObj} -> + {undefined, TxObj}; + #{} = DbObj -> + DbObj1 = #{tx := TxObj} = ensure_current(DbObj), + {DbObj1, TxObj} + end, + % FoundationDB treats a limit 0 of as unlimited so we guard against it + case fabric2_util:get_value(limit, Options) of 0 -> UserAcc; _ -> + FAcc = get_fold_acc(Db, RangePrefix, UserFun, UserAcc, Options), + try + fold_range(Tx, FAcc) + after + erase(?PDICT_FOLD_ACC_STATE) + end + end. + + +fold_range(Tx, FAcc) -> + #fold_acc{ + start_key = Start, + end_key = End, + limit = Limit, + base_opts = BaseOpts, + restart_tx = DoRestart + } = FAcc, + case DoRestart of false -> ok; true -> + ok = erlfdb:set_option(Tx, disallow_writes) + end, + Opts = [{limit, Limit} | BaseOpts], + Callback = fun fold_range_cb/2, + try + #fold_acc{ + user_acc = FinalUserAcc + } = erlfdb:fold_range(Tx, Start, End, Callback, FAcc, Opts), + FinalUserAcc + catch error:{erlfdb_error, ?TRANSACTION_TOO_OLD} when DoRestart -> + % Possibly handle cluster_version_changed and future_version as well to + % continue iteration instead fallback to transactional and retrying + % from the beginning which is bound to fail when streaming data out to a + % socket. + fold_range(Tx, restart_fold(Tx, FAcc)) end. 
@@ -1297,7 +1338,9 @@ chunkify_binary(Data) -> end. -get_fold_opts(RangePrefix, Options) -> +get_fold_acc(Db, RangePrefix, UserCallback, UserAcc, Options) + when is_map(Db) orelse Db =:= undefined -> + Reverse = case fabric2_util:get_value(dir, Options) of rev -> true; _ -> false @@ -1362,8 +1405,8 @@ get_fold_opts(RangePrefix, Options) -> end, Limit = case fabric2_util:get_value(limit, Options) of - L when is_integer(L), L >= 0 -> [{limit, L + Skip}]; - undefined -> [] + L when is_integer(L), L >= 0 -> L + Skip; + undefined -> 0 end, TargetBytes = case fabric2_util:get_value(target_bytes, Options) of @@ -1381,21 +1424,68 @@ get_fold_opts(RangePrefix, Options) -> B when is_boolean(B) -> [{snapshot, B}] end, - OutOpts = [{reverse, Reverse}] - ++ Limit + BaseOpts = [{reverse, Reverse}] ++ TargetBytes ++ StreamingMode ++ Snapshot, - {StartKey3, EndKey3, Skip, OutOpts}. + RestartTx = fabric2_util:get_value(restart_tx, Options, false), + + #fold_acc{ + db = Db, + start_key = StartKey3, + end_key = EndKey3, + skip = Skip, + limit = Limit, + retries = 0, + base_opts = BaseOpts, + restart_tx = RestartTx, + user_fun = UserCallback, + user_acc = UserAcc + }. + +fold_range_cb({K, V}, #fold_acc{} = Acc) -> + #fold_acc{ + skip = Skip, + limit = Limit, + user_fun = UserFun, + user_acc = UserAcc, + base_opts = Opts + } = Acc, + Acc1 = case Skip =:= 0 of + true -> + UserAcc1 = UserFun({K, V}, UserAcc), + Acc#fold_acc{limit = max(0, Limit - 1), user_acc = UserAcc1}; + false -> + Acc#fold_acc{skip = Skip - 1, limit = Limit - 1} + end, + Acc2 = case fabric2_util:get_value(reverse, Opts, false) of + true -> Acc1#fold_acc{end_key = erlfdb_key:last_less_or_equal(K)}; + false -> Acc1#fold_acc{start_key = erlfdb_key:first_greater_than(K)} + end, + put(?PDICT_FOLD_ACC_STATE, Acc2), + Acc2. 
-fold_range_cb(KV, {skip, 0, Callback, Acc}) -> - NewAcc = Callback(KV, Acc), - {skip, 0, Callback, NewAcc}; -fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 -> - {skip, N - 1, Callback, Acc}. +restart_fold(Tx, #fold_acc{} = Acc) -> + erase(?PDICT_CHECKED_MD_IS_CURRENT), + % Not actually committing anyting so we skip on-commit handlers here. Those + % are usually to refresh db handles in the cache. If the iterator runs for + % a while it might be inserting a stale handle in there anyway. + erase({?PDICT_ON_COMMIT_FUN, Tx}), + + ok = erlfdb:reset(Tx), + + case {erase(?PDICT_FOLD_ACC_STATE), Acc#fold_acc.retries} of + {#fold_acc{db = Db} = Acc1, _} -> + Acc1#fold_acc{db = check_db_instance(Db), retries = 0}; + {undefined, Retries} when Retries < ?MAX_FOLD_RANGE_RETRIES -> + Db = check_db_instance(Acc#fold_acc.db), + Acc#fold_acc{db = Db, retries = Retries + 1}; + {undefined, _} -> + error(fold_range_not_progressing) + end. get_db_handle() -> @@ -1435,6 +1525,30 @@ ensure_current(#{} = Db0, CheckDbVersion) -> end. +check_db_instance(undefined) -> + undefined; + +check_db_instance(#{} = Db) -> + require_transaction(Db), + case check_metadata_version(Db) of + {current, Db1} -> + Db1; + {stale, Db1} -> + #{ + tx := Tx, + uuid := UUID, + name := DbName, + layer_prefix := LayerPrefix + } = Db1, + DbPrefix = erlfdb_tuple:pack({?DBS, DbName}, LayerPrefix), + UUIDKey = erlfdb_tuple:pack({?DB_CONFIG, <<"uuid">>}, DbPrefix), + case erlfdb:wait(erlfdb:get(Tx, UUIDKey)) of + UUID -> Db1; + _ -> error(database_does_not_exist) + end + end. + + is_transaction_applied(Tx) -> is_commit_unknown_result() andalso has_transaction_id() diff --git a/src/fabric/test/fabric2_changes_fold_tests.erl b/src/fabric/test/fabric2_changes_fold_tests.erl index 8a29bcb00..fddf1802b 100644 --- a/src/fabric/test/fabric2_changes_fold_tests.erl +++ b/src/fabric/test/fabric2_changes_fold_tests.erl @@ -21,28 +21,55 @@ -define(DOC_COUNT, 25). 
+-define(PDICT_ERROR_IN_FOLD_RANGE, '$fabric2_error_in_fold_range'). +-define(PDICT_ERROR_IN_USER_FUN, '$fabric2_error_throw_in_user_fun'). + changes_fold_test_() -> { "Test changes fold operations", { setup, - fun setup/0, - fun cleanup/1, - with([ - ?TDEF(fold_changes_basic), - ?TDEF(fold_changes_since_now), - ?TDEF(fold_changes_since_seq), - ?TDEF(fold_changes_basic_rev), - ?TDEF(fold_changes_since_now_rev), - ?TDEF(fold_changes_since_seq_rev) - ]) + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun cleanup/1, + [ + ?TDEF_FE(fold_changes_basic), + ?TDEF_FE(fold_changes_since_now), + ?TDEF_FE(fold_changes_since_seq), + ?TDEF_FE(fold_changes_basic_rev), + ?TDEF_FE(fold_changes_since_now_rev), + ?TDEF_FE(fold_changes_since_seq_rev), + ?TDEF_FE(fold_changes_basic_tx_too_long), + ?TDEF_FE(fold_changes_reverse_tx_too_long), + ?TDEF_FE(fold_changes_tx_too_long_with_single_row_emits), + ?TDEF_FE(fold_changes_since_seq_tx_too_long), + ?TDEF_FE(fold_changes_not_progressing) + ] + } } }. -setup() -> +setup_all() -> Ctx = test_util:start_couch([fabric]), + meck:new(erlfdb, [passthrough]), + Ctx. + + +teardown_all(Ctx) -> + meck:unload(), + test_util:stop_couch(Ctx). + + +setup() -> + meck:expect(erlfdb, fold_range, fun(Tx, Start, End, Callback, Acc, Opts) -> + maybe_tx_too_long(?PDICT_ERROR_IN_FOLD_RANGE), + meck:passthrough([Tx, Start, End, Callback, Acc, Opts]) + end), {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]), Rows = lists:map(fun(Val) -> DocId = fabric2_util:uuid(), @@ -59,57 +86,193 @@ setup() -> rev_id => RevId } end, lists:seq(1, ?DOC_COUNT)), - {Db, Rows, Ctx}. + {Db, Rows}. -cleanup({Db, _DocIdRevs, Ctx}) -> - ok = fabric2_db:delete(fabric2_db:name(Db), []), - test_util:stop_couch(Ctx). +cleanup({Db, _DocIdRevs}) -> + reset_error_counts(), + ok = fabric2_db:delete(fabric2_db:name(Db), []). 
-fold_changes_basic({Db, DocRows, _}) -> - {ok, Rows} = fabric2_db:fold_changes(Db, 0, fun fold_fun/2, []), - ?assertEqual(lists:reverse(DocRows), Rows). +fold_changes_basic({Db, DocRows}) -> + ?assertEqual(lists:reverse(DocRows), changes(Db)). -fold_changes_since_now({Db, _, _}) -> - {ok, Rows} = fabric2_db:fold_changes(Db, now, fun fold_fun/2, []), - ?assertEqual([], Rows). +fold_changes_since_now({Db, _}) -> + ?assertEqual([], changes(Db, now, [])). -fold_changes_since_seq({_, [], _}) -> +fold_changes_since_seq({_, []}) -> ok; -fold_changes_since_seq({Db, [Row | RestRows], _}) -> +fold_changes_since_seq({Db, [Row | RestRows]}) -> #{sequence := Since} = Row, - {ok, Rows} = fabric2_db:fold_changes(Db, Since, fun fold_fun/2, []), - ?assertEqual(lists:reverse(RestRows), Rows), - fold_changes_since_seq({Db, RestRows, nil}). + ?assertEqual(lists:reverse(RestRows), changes(Db, Since, [])), + fold_changes_since_seq({Db, RestRows}). -fold_changes_basic_rev({Db, _, _}) -> - Opts = [{dir, rev}], - {ok, Rows} = fabric2_db:fold_changes(Db, 0, fun fold_fun/2, [], Opts), - ?assertEqual([], Rows). +fold_changes_basic_rev({Db, _}) -> + ?assertEqual([], changes(Db, 0, [{dir, rev}])). -fold_changes_since_now_rev({Db, DocRows, _}) -> - Opts = [{dir, rev}], - {ok, Rows} = fabric2_db:fold_changes(Db, now, fun fold_fun/2, [], Opts), - ?assertEqual(DocRows, Rows). +fold_changes_since_now_rev({Db, DocRows}) -> + ?assertEqual(DocRows, changes(Db, now, [{dir, rev}])). -fold_changes_since_seq_rev({_, [], _}) -> +fold_changes_since_seq_rev({_, []}) -> ok; -fold_changes_since_seq_rev({Db, DocRows, _}) -> +fold_changes_since_seq_rev({Db, DocRows}) -> #{sequence := Since} = lists:last(DocRows), Opts = [{dir, rev}], - {ok, Rows} = fabric2_db:fold_changes(Db, Since, fun fold_fun/2, [], Opts), - ?assertEqual(DocRows, Rows), + ?assertEqual(DocRows, changes(Db, Since, Opts)), RestRows = lists:sublist(DocRows, length(DocRows) - 1), - fold_changes_since_seq_rev({Db, RestRows, nil}). 
+ fold_changes_since_seq_rev({Db, RestRows}). + + +fold_changes_basic_tx_too_long({Db, DocRows0}) -> + DocRows = lists:reverse(DocRows0), + + tx_too_long_errors(0, 1), + ?assertEqual(DocRows, changes(Db)), + + tx_too_long_errors(1, 0), + ?assertEqual(DocRows, changes(Db)), + + % Blow up in user fun but after emitting one row successfully. + tx_too_long_errors({1, 1}, 0), + ?assertEqual(DocRows, changes(Db)), + + % Blow up before last document + tx_too_long_errors({?DOC_COUNT - 1, 1}, 0), + ?assertEqual(DocRows, changes(Db)), + + % Emit one value, then blow up in user function and then blow up twice in + % fold_range. But it is not enough to stop the iteration. + tx_too_long_errors({1, 1}, {1, 2}), + ?assertEqual(DocRows, changes(Db)). + + +fold_changes_reverse_tx_too_long({Db, DocRows}) -> + Opts = [{dir, rev}], + + tx_too_long_errors(0, 1), + ?assertEqual([], changes(Db, 0, Opts)), + + tx_too_long_errors(1, 0), + ?assertEqual([], changes(Db, 0, Opts)), + + tx_too_long_errors(1, 0), + ?assertEqual(DocRows, changes(Db, now, Opts)), + + tx_too_long_errors(1, 0), + ?assertEqual(DocRows, changes(Db, now, Opts)), + + % Blow up in user fun but after emitting one row successfully. + tx_too_long_errors({1, 1}, 0), + ?assertEqual(DocRows, changes(Db, now, Opts)), + + % Blow up before last document + tx_too_long_errors({?DOC_COUNT - 1, 1}, 0), + ?assertEqual(DocRows, changes(Db, now, Opts)), + + % Emit value, blow up in user function, and twice in fold_range + tx_too_long_errors({1, 1}, {1, 2}), + ?assertEqual(DocRows, changes(Db, now, Opts)). 
+ + +fold_changes_tx_too_long_with_single_row_emits({Db, DocRows0}) -> + % This test does a few basic operations while forcing erlfdb range fold to + % emit a single row at a time, thus forcing it to use continuations while + % also inducing tx errors + Opts = [{target_bytes, 1}], + DocRows = lists:reverse(DocRows0), + + tx_too_long_errors(0, 1), + ?assertEqual(DocRows, changes(Db, 0, Opts)), + + tx_too_long_errors(1, 0), + ?assertEqual(DocRows, changes(Db, 0, Opts)), + + % Blow up in user fun but after emitting one row successfully. + tx_too_long_errors({1, 1}, 0), + ?assertEqual(DocRows, changes(Db, 0, Opts)), + + % Blow up before last document + tx_too_long_errors({?DOC_COUNT - 1, 1}, 0), + ?assertEqual(DocRows, changes(Db, 0, Opts)). + + +fold_changes_since_seq_tx_too_long({Db, Rows}) -> + % Blow up after after a successful emit, then twice + % in range fold call. Also re-use already existing basic + % fold_changes_since_seq test function. + tx_too_long_errors({1, 1}, {1, 2}), + fold_changes_since_seq({Db, Rows}). + + +fold_changes_not_progressing({Db, _}) -> + % Fail in first fold range call. + tx_too_long_errors(5, 0), + ?assertError(fold_range_not_progressing, changes(Db)), + + % Fail in first user fun call. + tx_too_long_errors(0, 5), + ?assertError(fold_range_not_progressing, changes(Db)), + + % Blow up in last user fun call + tx_too_long_errors({?DOC_COUNT - 1, 5}, 0), + ?assertError(fold_range_not_progressing, changes(Db)), + + % Blow up in user function after one success. + tx_too_long_errors({1, 5}, 0), + ?assertError(fold_range_not_progressing, changes(Db)), + + % Emit value, blow up in user function, then keep blowing up in fold_range. + tx_too_long_errors({1, 1}, {1, 4}), + ?assertError(fold_range_not_progressing, changes(Db)). fold_fun(#{} = Change, Acc) -> + maybe_tx_too_long(?PDICT_ERROR_IN_USER_FUN), {ok, [Change | Acc]}. 
+ + +tx_too_long_errors(UserFunCount, FoldErrors) when is_integer(UserFunCount) -> + tx_too_long_errors({0, UserFunCount}, FoldErrors); + +tx_too_long_errors(UserFunErrors, FoldCount) when is_integer(FoldCount) -> + tx_too_long_errors(UserFunErrors, {0, FoldCount}); + +tx_too_long_errors({UserFunSkip, UserFunCount}, {FoldSkip, FoldCount}) -> + reset_error_counts(), + put(?PDICT_ERROR_IN_USER_FUN, {UserFunSkip, UserFunCount}), + put(?PDICT_ERROR_IN_FOLD_RANGE, {FoldSkip, FoldCount}). + + +reset_error_counts() -> + erase(?PDICT_ERROR_IN_FOLD_RANGE), + erase(?PDICT_ERROR_IN_USER_FUN). + + +changes(Db) -> + changes(Db, 0, []). + + +changes(Db, Since, Opts) -> + {ok, Rows} = fabric2_db:fold_changes(Db, Since, fun fold_fun/2, [], Opts), + Rows. + + +maybe_tx_too_long(Key) -> + case get(Key) of + {Skip, Count} when is_integer(Skip), Skip > 0 -> + put(Key, {Skip - 1, Count}); + {0, Count} when is_integer(Count), Count > 0 -> + put(Key, {0, Count - 1}), + error({erlfdb_error, 1007}); + {0, 0} -> + ok; + undefined -> + ok + end. diff --git a/src/fabric/test/fabric2_test.hrl b/src/fabric/test/fabric2_test.hrl index a0532b360..9239096fc 100644 --- a/src/fabric/test/fabric2_test.hrl +++ b/src/fabric/test/fabric2_test.hrl @@ -10,9 +10,17 @@ % License for the specific language governing permissions and limitations under % the License. + +% Some test modules do not use with, so squash the unused fun compiler warning +-compile([{nowarn_unused_function, [{with, 1}]}]). + + -define(TDEF(Name), {atom_to_list(Name), fun Name/1}). -define(TDEF(Name, Timeout), {atom_to_list(Name), Timeout, fun Name/1}). +-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end). +-define(TDEF_FE(Name, Timeout), fun(Arg) -> {atom_to_list(Name), {timeout, Timeout, ?_test(Name(Arg))}} end). + with(Tests) -> fun(ArgsTuple) -> |