diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2021-03-06 02:14:52 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2021-03-23 15:24:27 -0400 |
commit | c777fc3544a10d1c8f38c3c95b8542a68b055cd3 (patch) | |
tree | 2123a5ab9c61094b9ba8ac05f472eb4096ab8756 | |
parent | a7078eef457f7ec4d17a083cbd1da4cd61b53a4b (diff) | |
download | couchdb-c777fc3544a10d1c8f38c3c95b8542a68b055cd3.tar.gz |
Consistent view emits using indexer's GRVs and committed versionstamps
View indexer saves both the GRV it used during the view update, and
the committed versionstamp in the couch_jobs job data section. Then,
the view reader uses those versionstamps to emit a consistent snapshot
of the view.
* The committed versionstamp ensures that the view results can be
emitted even if the view gets updated between the time the view
build finished and the time the reader gets notified.
* The indexer GRV ensures that, when the include_docs=true option is
used, the view emits the same doc revisions that the indexer read
while it indexed the data.
The view reader uses those two versionstamps only if it initiates the
view build itself and then waits for it to build, to ensure that it
doesn't operate on stale GRVs.
Because the committed version is only available after the main
transaction commits, during view indexing finalization there is now a
separate transaction which runs at the end which reads the committed
version then marks the view as `finished`.
Since included docs have to be read at the indexer's GRV, and that
version is different from the committed version, those documents
are loaded from a separate process.
There are a few complications introduced by this commit:
* The versionstamps, especially the indexer GRV ones, may become
stale (older than 5 seconds) quickly, and reads that use them will
then fail with 1007 (transaction_too_old) errors. This could be
mitigated by forcing the indexer to commit after a shorter interval
(1-2 seconds).
* There is some fragility introduced with respect to how included
docs are loaded in a separate process. When that process crashes or
times out, it may throw a new type of unexpected error that we
don't catch properly.
-rw-r--r-- | src/couch_views/include/couch_views.hrl | 5 | ||||
-rw-r--r-- | src/couch_views/src/couch_views.erl | 31 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_indexer.erl | 60 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_jobs.erl | 22 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_reader.erl | 76 | ||||
-rw-r--r-- | src/couch_views/test/couch_views_indexer_test.erl | 3 | ||||
-rw-r--r-- | src/couch_views/test/couch_views_map_test.erl | 98 |
7 files changed, 262 insertions, 33 deletions
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl index 92b8f46fb..e28fa7478 100644 --- a/src/couch_views/include/couch_views.hrl +++ b/src/couch_views/include/couch_views.hrl @@ -40,3 +40,8 @@ % indexing progress -define(INDEX_BUILDING, <<"building">>). -define(INDEX_READY, <<"ready">>). + +% Views/db marker to indicate that the current (latest) FDB GRV version should +% be used. Use `null` so it can can be round-tripped through json serialization +% with couch_jobs. +-define(VIEW_CURRENT_VSN, null). diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl index 2d916314f..179e2b35b 100644 --- a/src/couch_views/src/couch_views.erl +++ b/src/couch_views/src/couch_views.erl @@ -53,11 +53,12 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) -> try fabric2_fdb:transactional(Db, fun(TxDb) -> ok = maybe_update_view(TxDb, Mrst, IsInteractive, Args3), - read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3) + IdxVStamps = {?VIEW_CURRENT_VSN, ?VIEW_CURRENT_VSN}, + read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3, IdxVStamps) end) catch throw:{build_view, WaitSeq} -> - couch_views_jobs:build_view(Db, Mrst, WaitSeq), - read_view(Db, Mrst, ViewName, Callback, Acc0, Args3) + {ok, IdxVStamps} = couch_views_jobs:build_view(Db, Mrst, WaitSeq), + read_view(Db, Mrst, ViewName, Callback, Acc0, Args3, IdxVStamps) end. @@ -126,14 +127,32 @@ get_total_view_size(TxDb, Mrst) -> end, 0, Mrst#mrst.views). -read_view(Db, Mrst, ViewName, Callback, Acc0, Args) -> +read_view(Db, Mrst, ViewName, Callback, Acc0, Args, {_, _} = IdxVStamps) -> + {DbReadVsn, ViewReadVsn} = IdxVStamps, fabric2_fdb:transactional(Db, fun(TxDb) -> + case ViewReadVsn of + ?VIEW_CURRENT_VSN -> + ok; + _ when is_integer(ViewReadVsn) -> + % Set the GRV of the transaction to the committed + % version of the indexer. That is the version at which + % the indexer has committed the view data. 
+ erlfdb:set_read_version(maps:get(tx, TxDb), ViewReadVsn) + end, try - couch_views_reader:read(TxDb, Mrst, ViewName, Callback, Acc0, Args) + couch_views_reader:read(TxDb, Mrst, ViewName, Callback, Acc0, Args, + DbReadVsn) after UpdateAfter = Args#mrargs.update == lazy, if UpdateAfter == false -> ok; true -> - couch_views_jobs:build_view_async(TxDb, Mrst) + % Make sure to use a separate transaction if we are + % reading from a stale snapshot + case ViewReadVsn of + ?VIEW_CURRENT_VSN -> + couch_views_jobs:build_view_async(TxDb, Mrst); + _ -> + couch_views_jobs:build_view_async(Db, Mrst) + end end end end). diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index 3a4da34a7..8556b9946 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -110,7 +110,9 @@ init() -> doc_acc => [], design_opts => Mrst#mrst.design_opts, update_stats => #{}, - tx_retry_limit => tx_retry_limit() + tx_retry_limit => tx_retry_limit(), + db_read_vsn => ?VIEW_CURRENT_VSN, + view_read_vsn => ?VIEW_CURRENT_VSN }, try @@ -209,7 +211,7 @@ update(#{} = Db, Mrst0, State0) -> do_update(Db, Mrst0, State0) -> TxOpts = #{retry_limit => maps:get(tx_retry_limit, State0)}, - fabric2_fdb:transactional(Db, TxOpts, fun(TxDb) -> + TxResult = fabric2_fdb:transactional(Db, TxOpts, fun(TxDb) -> #{ tx := Tx } = TxDb, @@ -224,7 +226,6 @@ do_update(Db, Mrst0, State0) -> #{ doc_acc := DocAcc, last_seq := LastSeq, - view_vs := ViewVS, changes_done := ChangesDone0, design_opts := DesignOpts } = State2, @@ -244,12 +245,18 @@ do_update(Db, Mrst0, State0) -> case is_update_finished(State2) of true -> - maybe_set_build_status(TxDb, Mrst2, ViewVS, ?INDEX_READY), - report_progress(State2#{changes_done := ChangesDone}, finished), - {Mrst2, finished}; + State3 = State2#{changes_done := ChangesDone}, + % We must call report_progress/2 (which, in turn calls + % couch_jobs:update/3) in every transaction where indexing data + 
% is updated, otherwise we risk another indexer taking over and + % clobbering the indexing data + State4 = report_progress(State3, update), + {Mrst2, finished, State4#{ + db_read_vsn := erlfdb:wait(erlfdb:get_read_version(Tx)) + }}; false -> State3 = report_progress(State2, update), - {Mrst2, State3#{ + {Mrst2, continue, State3#{ tx_db := undefined, count := 0, doc_acc := [], @@ -258,6 +265,37 @@ do_update(Db, Mrst0, State0) -> update_stats := UpdateStats }} end + end), + case TxResult of + {Mrst, continue, State} -> + {Mrst, State}; + {Mrst, finished, State} -> + do_finalize(Mrst, State), + {Mrst, finished} + end. + + +do_finalize(Mrst, State) -> + #{tx_db := OldDb} = State, + ViewReadVsn = erlfdb:get_committed_version(maps:get(tx, OldDb)), + fabric2_fdb:transactional(OldDb#{tx := undefined}, fun(TxDb) -> + % Use the recent committed version as the read + % version. However, if transaction retries due to an error, + % let it acquire its own version to avoid spinning + % continuously due to conflicts or other errors. + case erlfdb:get_last_error() of + undefined -> + erlfdb:set_read_version(maps:get(tx, TxDb), ViewReadVsn); + ErrorCode when is_integer(ErrorCode) -> + ok + end, + State1 = State#{ + tx_db := TxDb, + view_read_vsn := ViewReadVsn + }, + ViewVS = maps:get(view_vs, State1), + maybe_set_build_status(TxDb, Mrst, ViewVS, ?INDEX_READY), + report_progress(State1, finished) end). 
@@ -583,7 +621,9 @@ report_progress(State, UpdateType) -> job_data := JobData, last_seq := LastSeq, db_seq := DBSeq, - changes_done := ChangesDone + changes_done := ChangesDone, + db_read_vsn := DbReadVsn, + view_read_vsn := ViewReadVsn } = State, #{ @@ -611,7 +651,9 @@ report_progress(State, UpdateType) -> <<"ddoc_id">> => DDocId, <<"sig">> => Sig, <<"view_seq">> => LastSeq, - <<"retries">> => Retries + <<"retries">> => Retries, + <<"db_read_vsn">> => DbReadVsn, + <<"view_read_vsn">> => ViewReadVsn }, NewData = fabric2_active_tasks:update_active_task_info(NewData0, NewActiveTasks), diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl index 4b0aa2660..6e5111862 100644 --- a/src/couch_views/src/couch_views_jobs.erl +++ b/src/couch_views/src/couch_views_jobs.erl @@ -37,7 +37,7 @@ set_timeout() -> build_view(TxDb, Mrst, UpdateSeq) -> {ok, JobId} = build_view_async(TxDb, Mrst), case wait_for_job(JobId, Mrst#mrst.idx_name, UpdateSeq) of - ok -> ok; + {ok, IdxVStamps} -> {ok, IdxVStamps}; retry -> build_view(TxDb, Mrst, UpdateSeq) end. 
@@ -90,7 +90,7 @@ wait_for_job(JobId, DDocId, UpdateSeq) -> {ok, finished, Data} -> case Data of #{<<"view_seq">> := ViewSeq} when ViewSeq >= UpdateSeq -> - ok; + {ok, idx_vstamps(Data)}; _ -> retry end @@ -117,18 +117,28 @@ wait_for_job(JobId, Subscription, DDocId, UpdateSeq) -> {finished, #{<<"error">> := Error, <<"reason">> := Reason}} -> couch_jobs:remove(undefined, ?INDEX_JOB_TYPE, JobId), erlang:error({binary_to_existing_atom(Error, latin1), Reason}); - {finished, #{<<"view_seq">> := ViewSeq}} when ViewSeq >= UpdateSeq -> - ok; + {finished, #{<<"view_seq">> := ViewSeq} = JobData} + when ViewSeq >= UpdateSeq -> + {ok, idx_vstamps(JobData)}; {finished, _} -> wait_for_job(JobId, DDocId, UpdateSeq); - {_State, #{<<"view_seq">> := ViewSeq}} when ViewSeq >= UpdateSeq -> + {_State, #{<<"view_seq">> := ViewSeq} = JobData} + when ViewSeq >= UpdateSeq -> couch_jobs:unsubscribe(Subscription), - ok; + {ok, idx_vstamps(JobData)}; {_, _} -> wait_for_job(JobId, Subscription, DDocId, UpdateSeq) end. +idx_vstamps(#{} = JobData) -> + #{ + <<"db_read_vsn">> := DbReadVsn, + <<"view_read_vsn">> := ViewReadVsn + } = JobData, + {DbReadVsn, ViewReadVsn}. + + job_id(#{name := DbName}, #mrst{sig = Sig}) -> job_id(DbName, Sig); diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl index 35ee8a021..0fc910f77 100644 --- a/src/couch_views/src/couch_views_reader.erl +++ b/src/couch_views/src/couch_views_reader.erl @@ -13,7 +13,7 @@ -module(couch_views_reader). -export([ - read/6 + read/7 ]). @@ -23,15 +23,19 @@ -include_lib("fabric/include/fabric2.hrl"). -read(Db, Mrst, ViewName, UserCallback, UserAcc, Args) -> +-define(LOAD_DOC_TIMEOUT_MSEC, 10000). 
+ + +read(Db, Mrst, ViewName, UserCallback, UserAcc, Args, DbReadVsn) -> ReadFun = case Args of - #mrargs{view_type = map} -> fun read_map_view/6; - #mrargs{view_type = red} -> fun read_red_view/6 + #mrargs{view_type = map} -> fun read_map_view/7; + #mrargs{view_type = red} -> fun read_red_view/7 end, - ReadFun(Db, Mrst, ViewName, UserCallback, UserAcc, Args). + ReadFun(Db, Mrst, ViewName, UserCallback, UserAcc, Args, DbReadVsn). -read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) -> +read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args, DbReadVsn) -> + DocLoader = maybe_start_doc_loader(Db, DbReadVsn), try fabric2_fdb:transactional(Db, fun(TxDb) -> #mrst{ @@ -51,7 +55,8 @@ read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) -> limit => Args#mrargs.limit, mrargs => undefined, callback => UserCallback, - acc => UserAcc1 + acc => UserAcc1, + doc_loader => DocLoader }, Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) -> @@ -73,10 +78,12 @@ read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) -> {ok, Final}; throw:{done, Out} -> {ok, Out} + after + stop_doc_loader(DocLoader) end. -read_red_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) -> +read_red_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args, _DbReadVsn) -> #mrst{ language = Lang, views = Views @@ -179,7 +186,8 @@ handle_map_row(DocId, Key, Value, Acc) -> limit := Limit, mrargs := Args, callback := UserCallback, - acc := UserAcc0 + acc := UserAcc0, + doc_loader := DocLoader } = Acc, BaseRow = [ @@ -196,7 +204,7 @@ handle_map_row(DocId, Key, Value, Acc) -> end, {TargetDocId, Rev} = get_doc_id(DocId, Value), - DocObj = load_doc(TxDb, TargetDocId, Rev, DocOpts1), + DocObj = load_doc(TxDb, TargetDocId, Rev, DocOpts1, DocLoader), [{doc, DocObj}] end, @@ -340,6 +348,19 @@ get_doc_id(Id, _Value) -> {Id, null}. 
+load_doc(TxDb, Id, Rev, DocOpts, undefined) -> + load_doc(TxDb, Id, Rev, DocOpts); + +load_doc(_TxDb, Id, Rev, DocOpts, DocLoader) when is_pid(DocLoader) -> + DocLoader ! {load_doc, Id, Rev, DocOpts}, + receive + {load_doc_res, Result} -> Result + after + ?LOAD_DOC_TIMEOUT_MSEC -> + error(load_doc_timeout) + end. + + load_doc(TxDb, Id, null, DocOpts) -> case fabric2_db:open_doc(TxDb, Id, DocOpts) of {ok, Doc} -> couch_doc:to_json_obj(Doc, DocOpts); @@ -352,3 +373,38 @@ load_doc(TxDb, Id, Rev, DocOpts) -> {ok, [{ok, Doc}]} -> couch_doc:to_json_obj(Doc, DocOpts); {ok, [_Else]} -> null end. + + + +% When reading doc bodies at the db version at which the indexer +% observed them, need to use a separate process since the process dict +% is used to hold some of the transaction metadata. +% +maybe_start_doc_loader(_Db, ?VIEW_CURRENT_VSN) -> + undefined; + +maybe_start_doc_loader(Db0, DbReadVsn) -> + Parent = self(), + Db = Db0#{tx := undefined}, + spawn_link(fun() -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + erlfdb:set_read_version(maps:get(tx, TxDb), DbReadVsn), + doc_loader_loop(TxDb, Parent) + end) + end). + + +stop_doc_loader(undefined) -> + ok; + +stop_doc_loader(Pid) when is_pid(Pid) -> + unlink(Pid), + exit(Pid, kill). + + +doc_loader_loop(TxDb, Parent) -> + receive + {load_doc, Id, Rev, DocOpts} -> + Parent ! {load_doc_res, load_doc(TxDb, Id, Rev, DocOpts)}, + doc_loader_loop(TxDb, Parent) + end. diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl index 182cd6803..69186468d 100644 --- a/src/couch_views/test/couch_views_indexer_test.erl +++ b/src/couch_views/test/couch_views_indexer_test.erl @@ -373,7 +373,8 @@ index_autoupdater_callback(Db) -> ?assertMatch([{ok, <<_/binary>>}], Result), [{ok, JobId}] = Result, - ?assertEqual(ok, couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, DbSeq)). + ?assertMatch({ok, {_, _}}, + couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, DbSeq)). 
multiple_design_docs(Db) -> diff --git a/src/couch_views/test/couch_views_map_test.erl b/src/couch_views/test/couch_views_map_test.erl index c419546e1..125b43da7 100644 --- a/src/couch_views/test/couch_views_map_test.erl +++ b/src/couch_views/test/couch_views_map_test.erl @@ -14,6 +14,7 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). -include("couch_views.hrl"). @@ -58,6 +59,7 @@ map_views_test_() -> ?TDEF(should_map_with_doc_emit), ?TDEF(should_map_update_is_false), ?TDEF(should_map_update_is_lazy), + ?TDEF(should_map_snapshot), ?TDEF(should_map_wait_for_interactive), ?TDEF(should_map_local_seq) % fun should_give_ext_size_seq_indexed_test/1 @@ -410,7 +412,7 @@ should_map_update_is_lazy() -> {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), JobId = couch_views_jobs:job_id(Db, Mrst), UpdateSeq = fabric2_db:get_update_seq(Db), - ok = couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, UpdateSeq), + {ok, _} = couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, UpdateSeq), Args2 = #{ start_key => 8, @@ -422,6 +424,100 @@ should_map_update_is_lazy() -> ?assertEqual(Expect, Result2). 
+should_map_snapshot() -> + Idx = <<"baz">>, + DbName = ?tempdb(), + + {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]), + + DDoc = create_ddoc(), + Docs = make_docs(2), + fabric2_db:update_docs(Db, [DDoc | Docs]), + + % Lazy query just get a hold of a job and wait for it so we can + % get the indexer versionstamps + ?assertEqual({ok, []}, couch_views:query(Db, DDoc, Idx, fun default_cb/2, + [], #{update => lazy})), + {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), + JobId = couch_views_jobs:job_id(Db, Mrst), + DbSeq = fabric2_db:get_update_seq(Db), + {ok, VStamps} = couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, DbSeq), + + {DbReadVsn, ViewReadVsn} = VStamps, + ?assert(is_integer(DbReadVsn)), + ?assert(is_integer(ViewReadVsn)), + ?assert(DbReadVsn < ViewReadVsn), + + % Update doc 1 and delete doc 2 + {ok, Doc1Open} = fabric2_db:open_doc(Db, <<"1">>), + Doc1Upd = Doc1Open#doc{body = {[{<<"val">>, 42}]}}, + ?assertMatch({ok, {2, _}}, fabric2_db:update_doc(Db, Doc1Upd)), + + {ok, Doc2Open} = fabric2_db:open_doc(Db, <<"2">>), + Doc2Del = Doc2Open#doc{deleted = true}, + ?assertMatch({ok, {2, _}}, fabric2_db:update_doc(Db, Doc2Del)), + + ReadSnapshot = fun(#{tx := Tx} = TxDb) -> + Args = #mrargs{include_docs = true, view_type = map}, + Callback = fun default_cb/2, + erlfdb:set_read_version(Tx, ViewReadVsn), + couch_views_reader:read(TxDb, Mrst, Idx, Callback, [], Args, DbReadVsn) + end, + + % Perform a stale snapshot read asserting that docs updates + % haven't affected include_docs results + ?assertMatch({ok, [ + {row, [ + {id, <<"1">>}, + {key, 1}, + {value, 1}, + {doc, {[ + {<<"_id">>, <<"1">>}, + {<<"_rev">>, <<_/binary>>}, + {<<"val">>, 1} + ]}} + ]}, + {row, [ + {id, <<"2">>}, + {key, 2}, + {value, 2}, + {doc, {[ + {<<"_id">>, <<"2">>}, + {<<"_rev">>, <<_/binary>>}, + {<<"val">>, 2} + ]}} + ]} + ]}, fabric2_fdb:transactional(Db, ReadSnapshot)), + + % Update the view + ?assertMatch({ok, [{row, [{id, <<"1">>}, {key, 42}, {value, 42}]}]}, 
+ couch_views:query(Db, DDoc, Idx, fun default_cb/2, [], #{})), + + % After the view was updated, the original snapshot stays the same + ?assertMatch({ok, [ + {row, [ + {id, <<"1">>}, + {key, 1}, + {value, 1}, + {doc, {[ + {<<"_id">>, <<"1">>}, + {<<"_rev">>, <<_/binary>>}, + {<<"val">>, 1} + ]}} + ]}, + {row, [ + {id, <<"2">>}, + {key, 2}, + {value, 2}, + {doc, {[ + {<<"_id">>, <<"2">>}, + {<<"_rev">>, <<_/binary>>}, + {<<"val">>, 2} + ]}} + ]} + ]}, fabric2_fdb:transactional(Db, ReadSnapshot)). + + should_map_wait_for_interactive() -> DbName = ?tempdb(), {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]), |