summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Nick Vatamaniuc <vatamane@apache.org> 2021-03-06 02:14:52 -0500
committer Nick Vatamaniuc <nickva@users.noreply.github.com> 2021-03-23 15:24:27 -0400
commit c777fc3544a10d1c8f38c3c95b8542a68b055cd3 (patch)
tree 2123a5ab9c61094b9ba8ac05f472eb4096ab8756
parent a7078eef457f7ec4d17a083cbd1da4cd61b53a4b (diff)
download couchdb-c777fc3544a10d1c8f38c3c95b8542a68b055cd3.tar.gz
Consistent view emits using indexer's GRVs and committed versionstamps
The view indexer saves both the GRV (read version) it used during the view update and the committed versionstamp in the couch_jobs job data section. The view reader then uses those versionstamps to emit a consistent snapshot of the view.

* The committed versionstamp ensures that the view results can be emitted even if the view gets updated between the time the view build finished and the time the reader gets notified.

* The indexer GRV ensures that, when the include_docs=true option is used, the view emits the same doc revisions that the indexer read while it was indexing the data.

The view reader uses those two versionstamps only if it initiates the view build itself and then waits for it to finish, to ensure that it doesn't operate on stale GRVs.

Because the committed version is only available after the main transaction commits, view indexing finalization now runs a separate transaction at the end, which reads the committed version and then marks the view as `finished`.

Since included docs have to be read at the indexer's GRV, and that version is different from the committed version, those documents are loaded from a separate process.

There are a few complications introduced by this commit:

* The versionstamps, especially the indexer GRV ones, may quickly become stale (older than 5 seconds) and start throwing 1007 (transaction_too_old) errors. This could be mitigated by forcing the indexer to commit after a shorter interval (1-2 seconds).

* There is some fragility introduced with respect to how included docs are loaded in a separate process. When that process crashes or times out, it may throw a new type of unexpected error that we don't catch properly.
-rw-r--r-- src/couch_views/include/couch_views.hrl | 5
-rw-r--r-- src/couch_views/src/couch_views.erl | 31
-rw-r--r-- src/couch_views/src/couch_views_indexer.erl | 60
-rw-r--r-- src/couch_views/src/couch_views_jobs.erl | 22
-rw-r--r-- src/couch_views/src/couch_views_reader.erl | 76
-rw-r--r-- src/couch_views/test/couch_views_indexer_test.erl | 3
-rw-r--r-- src/couch_views/test/couch_views_map_test.erl | 98
7 files changed, 262 insertions, 33 deletions
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index 92b8f46fb..e28fa7478 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -40,3 +40,8 @@
% indexing progress
-define(INDEX_BUILDING, <<"building">>).
-define(INDEX_READY, <<"ready">>).
+
+% Views/db marker to indicate that the current (latest) FDB GRV should be
+% used. Use `null` so it can be round-tripped through JSON serialization
+% with couch_jobs.
+-define(VIEW_CURRENT_VSN, null).
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index 2d916314f..179e2b35b 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -53,11 +53,12 @@ query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
try
fabric2_fdb:transactional(Db, fun(TxDb) ->
ok = maybe_update_view(TxDb, Mrst, IsInteractive, Args3),
- read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3)
+ IdxVStamps = {?VIEW_CURRENT_VSN, ?VIEW_CURRENT_VSN},
+ read_view(TxDb, Mrst, ViewName, Callback, Acc0, Args3, IdxVStamps)
end)
catch throw:{build_view, WaitSeq} ->
- couch_views_jobs:build_view(Db, Mrst, WaitSeq),
- read_view(Db, Mrst, ViewName, Callback, Acc0, Args3)
+ {ok, IdxVStamps} = couch_views_jobs:build_view(Db, Mrst, WaitSeq),
+ read_view(Db, Mrst, ViewName, Callback, Acc0, Args3, IdxVStamps)
end.
@@ -126,14 +127,32 @@ get_total_view_size(TxDb, Mrst) ->
end, 0, Mrst#mrst.views).
-read_view(Db, Mrst, ViewName, Callback, Acc0, Args) ->
+read_view(Db, Mrst, ViewName, Callback, Acc0, Args, {_, _} = IdxVStamps) ->
+ {DbReadVsn, ViewReadVsn} = IdxVStamps,
fabric2_fdb:transactional(Db, fun(TxDb) ->
+ case ViewReadVsn of
+ ?VIEW_CURRENT_VSN ->
+ ok;
+ _ when is_integer(ViewReadVsn) ->
+ % Set the GRV of the transaction to the committed
+ % version of the indexer. That is the version at which
+ % the indexer has committed the view data.
+ erlfdb:set_read_version(maps:get(tx, TxDb), ViewReadVsn)
+ end,
try
- couch_views_reader:read(TxDb, Mrst, ViewName, Callback, Acc0, Args)
+ couch_views_reader:read(TxDb, Mrst, ViewName, Callback, Acc0, Args,
+ DbReadVsn)
after
UpdateAfter = Args#mrargs.update == lazy,
if UpdateAfter == false -> ok; true ->
- couch_views_jobs:build_view_async(TxDb, Mrst)
+ % Make sure to use a separate transaction if we are
+ % reading from a stale snapshot
+ case ViewReadVsn of
+ ?VIEW_CURRENT_VSN ->
+ couch_views_jobs:build_view_async(TxDb, Mrst);
+ _ ->
+ couch_views_jobs:build_view_async(Db, Mrst)
+ end
end
end
end).
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 3a4da34a7..8556b9946 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -110,7 +110,9 @@ init() ->
doc_acc => [],
design_opts => Mrst#mrst.design_opts,
update_stats => #{},
- tx_retry_limit => tx_retry_limit()
+ tx_retry_limit => tx_retry_limit(),
+ db_read_vsn => ?VIEW_CURRENT_VSN,
+ view_read_vsn => ?VIEW_CURRENT_VSN
},
try
@@ -209,7 +211,7 @@ update(#{} = Db, Mrst0, State0) ->
do_update(Db, Mrst0, State0) ->
TxOpts = #{retry_limit => maps:get(tx_retry_limit, State0)},
- fabric2_fdb:transactional(Db, TxOpts, fun(TxDb) ->
+ TxResult = fabric2_fdb:transactional(Db, TxOpts, fun(TxDb) ->
#{
tx := Tx
} = TxDb,
@@ -224,7 +226,6 @@ do_update(Db, Mrst0, State0) ->
#{
doc_acc := DocAcc,
last_seq := LastSeq,
- view_vs := ViewVS,
changes_done := ChangesDone0,
design_opts := DesignOpts
} = State2,
@@ -244,12 +245,18 @@ do_update(Db, Mrst0, State0) ->
case is_update_finished(State2) of
true ->
- maybe_set_build_status(TxDb, Mrst2, ViewVS, ?INDEX_READY),
- report_progress(State2#{changes_done := ChangesDone}, finished),
- {Mrst2, finished};
+ State3 = State2#{changes_done := ChangesDone},
+ % We must call report_progress/2 (which, in turn, calls
+ % couch_jobs:update/3) in every transaction where indexing data
+ % is updated; otherwise we risk another indexer taking over and
+ % clobbering the indexing data.
+ State4 = report_progress(State3, update),
+ {Mrst2, finished, State4#{
+ db_read_vsn := erlfdb:wait(erlfdb:get_read_version(Tx))
+ }};
false ->
State3 = report_progress(State2, update),
- {Mrst2, State3#{
+ {Mrst2, continue, State3#{
tx_db := undefined,
count := 0,
doc_acc := [],
@@ -258,6 +265,37 @@ do_update(Db, Mrst0, State0) ->
update_stats := UpdateStats
}}
end
+ end),
+ case TxResult of
+ {Mrst, continue, State} ->
+ {Mrst, State};
+ {Mrst, finished, State} ->
+ do_finalize(Mrst, State),
+ {Mrst, finished}
+ end.
+
+
+do_finalize(Mrst, State) ->
+ #{tx_db := OldDb} = State,
+ ViewReadVsn = erlfdb:get_committed_version(maps:get(tx, OldDb)),
+ fabric2_fdb:transactional(OldDb#{tx := undefined}, fun(TxDb) ->
+ % Use the most recently committed version as the read
+ % version. However, if the transaction retries due to an error,
+ % let it acquire its own read version to avoid spinning
+ % continuously due to conflicts or other errors.
+ case erlfdb:get_last_error() of
+ undefined ->
+ erlfdb:set_read_version(maps:get(tx, TxDb), ViewReadVsn);
+ ErrorCode when is_integer(ErrorCode) ->
+ ok
+ end,
+ State1 = State#{
+ tx_db := TxDb,
+ view_read_vsn := ViewReadVsn
+ },
+ ViewVS = maps:get(view_vs, State1),
+ maybe_set_build_status(TxDb, Mrst, ViewVS, ?INDEX_READY),
+ report_progress(State1, finished)
end).
@@ -583,7 +621,9 @@ report_progress(State, UpdateType) ->
job_data := JobData,
last_seq := LastSeq,
db_seq := DBSeq,
- changes_done := ChangesDone
+ changes_done := ChangesDone,
+ db_read_vsn := DbReadVsn,
+ view_read_vsn := ViewReadVsn
} = State,
#{
@@ -611,7 +651,9 @@ report_progress(State, UpdateType) ->
<<"ddoc_id">> => DDocId,
<<"sig">> => Sig,
<<"view_seq">> => LastSeq,
- <<"retries">> => Retries
+ <<"retries">> => Retries,
+ <<"db_read_vsn">> => DbReadVsn,
+ <<"view_read_vsn">> => ViewReadVsn
},
NewData = fabric2_active_tasks:update_active_task_info(NewData0,
NewActiveTasks),
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
index 4b0aa2660..6e5111862 100644
--- a/src/couch_views/src/couch_views_jobs.erl
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -37,7 +37,7 @@ set_timeout() ->
build_view(TxDb, Mrst, UpdateSeq) ->
{ok, JobId} = build_view_async(TxDb, Mrst),
case wait_for_job(JobId, Mrst#mrst.idx_name, UpdateSeq) of
- ok -> ok;
+ {ok, IdxVStamps} -> {ok, IdxVStamps};
retry -> build_view(TxDb, Mrst, UpdateSeq)
end.
@@ -90,7 +90,7 @@ wait_for_job(JobId, DDocId, UpdateSeq) ->
{ok, finished, Data} ->
case Data of
#{<<"view_seq">> := ViewSeq} when ViewSeq >= UpdateSeq ->
- ok;
+ {ok, idx_vstamps(Data)};
_ ->
retry
end
@@ -117,18 +117,28 @@ wait_for_job(JobId, Subscription, DDocId, UpdateSeq) ->
{finished, #{<<"error">> := Error, <<"reason">> := Reason}} ->
couch_jobs:remove(undefined, ?INDEX_JOB_TYPE, JobId),
erlang:error({binary_to_existing_atom(Error, latin1), Reason});
- {finished, #{<<"view_seq">> := ViewSeq}} when ViewSeq >= UpdateSeq ->
- ok;
+ {finished, #{<<"view_seq">> := ViewSeq} = JobData}
+ when ViewSeq >= UpdateSeq ->
+ {ok, idx_vstamps(JobData)};
{finished, _} ->
wait_for_job(JobId, DDocId, UpdateSeq);
- {_State, #{<<"view_seq">> := ViewSeq}} when ViewSeq >= UpdateSeq ->
+ {_State, #{<<"view_seq">> := ViewSeq} = JobData}
+ when ViewSeq >= UpdateSeq ->
couch_jobs:unsubscribe(Subscription),
- ok;
+ {ok, idx_vstamps(JobData)};
{_, _} ->
wait_for_job(JobId, Subscription, DDocId, UpdateSeq)
end.
+idx_vstamps(#{} = JobData) ->
+ #{
+ <<"db_read_vsn">> := DbReadVsn,
+ <<"view_read_vsn">> := ViewReadVsn
+ } = JobData,
+ {DbReadVsn, ViewReadVsn}.
+
+
job_id(#{name := DbName}, #mrst{sig = Sig}) ->
job_id(DbName, Sig);
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index 35ee8a021..0fc910f77 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -13,7 +13,7 @@
-module(couch_views_reader).
-export([
- read/6
+ read/7
]).
@@ -23,15 +23,19 @@
-include_lib("fabric/include/fabric2.hrl").
-read(Db, Mrst, ViewName, UserCallback, UserAcc, Args) ->
+-define(LOAD_DOC_TIMEOUT_MSEC, 10000).
+
+
+read(Db, Mrst, ViewName, UserCallback, UserAcc, Args, DbReadVsn) ->
ReadFun = case Args of
- #mrargs{view_type = map} -> fun read_map_view/6;
- #mrargs{view_type = red} -> fun read_red_view/6
+ #mrargs{view_type = map} -> fun read_map_view/7;
+ #mrargs{view_type = red} -> fun read_red_view/7
end,
- ReadFun(Db, Mrst, ViewName, UserCallback, UserAcc, Args).
+ ReadFun(Db, Mrst, ViewName, UserCallback, UserAcc, Args, DbReadVsn).
-read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
+read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args, DbReadVsn) ->
+ DocLoader = maybe_start_doc_loader(Db, DbReadVsn),
try
fabric2_fdb:transactional(Db, fun(TxDb) ->
#mrst{
@@ -51,7 +55,8 @@ read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
limit => Args#mrargs.limit,
mrargs => undefined,
callback => UserCallback,
- acc => UserAcc1
+ acc => UserAcc1,
+ doc_loader => DocLoader
},
Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
@@ -73,10 +78,12 @@ read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
{ok, Final};
throw:{done, Out} ->
{ok, Out}
+ after
+ stop_doc_loader(DocLoader)
end.
-read_red_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
+read_red_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args, _DbReadVsn) ->
#mrst{
language = Lang,
views = Views
@@ -179,7 +186,8 @@ handle_map_row(DocId, Key, Value, Acc) ->
limit := Limit,
mrargs := Args,
callback := UserCallback,
- acc := UserAcc0
+ acc := UserAcc0,
+ doc_loader := DocLoader
} = Acc,
BaseRow = [
@@ -196,7 +204,7 @@ handle_map_row(DocId, Key, Value, Acc) ->
end,
{TargetDocId, Rev} = get_doc_id(DocId, Value),
- DocObj = load_doc(TxDb, TargetDocId, Rev, DocOpts1),
+ DocObj = load_doc(TxDb, TargetDocId, Rev, DocOpts1, DocLoader),
[{doc, DocObj}]
end,
@@ -340,6 +348,19 @@ get_doc_id(Id, _Value) ->
{Id, null}.
+load_doc(TxDb, Id, Rev, DocOpts, undefined) ->
+ load_doc(TxDb, Id, Rev, DocOpts);
+
+load_doc(_TxDb, Id, Rev, DocOpts, DocLoader) when is_pid(DocLoader) ->
+ DocLoader ! {load_doc, Id, Rev, DocOpts},
+ receive
+ {load_doc_res, Result} -> Result
+ after
+ ?LOAD_DOC_TIMEOUT_MSEC ->
+ error(load_doc_timeout)
+ end.
+
+
load_doc(TxDb, Id, null, DocOpts) ->
case fabric2_db:open_doc(TxDb, Id, DocOpts) of
{ok, Doc} -> couch_doc:to_json_obj(Doc, DocOpts);
@@ -352,3 +373,38 @@ load_doc(TxDb, Id, Rev, DocOpts) ->
{ok, [{ok, Doc}]} -> couch_doc:to_json_obj(Doc, DocOpts);
{ok, [_Else]} -> null
end.
+
+
+
+% When reading doc bodies at the db version at which the indexer
+% observed them, we need to use a separate process, since the process
+% dictionary is used to hold some of the transaction metadata.
+%
+maybe_start_doc_loader(_Db, ?VIEW_CURRENT_VSN) ->
+ undefined;
+
+maybe_start_doc_loader(Db0, DbReadVsn) ->
+ Parent = self(),
+ Db = Db0#{tx := undefined},
+ spawn_link(fun() ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ erlfdb:set_read_version(maps:get(tx, TxDb), DbReadVsn),
+ doc_loader_loop(TxDb, Parent)
+ end)
+ end).
+
+
+stop_doc_loader(undefined) ->
+ ok;
+
+stop_doc_loader(Pid) when is_pid(Pid) ->
+ unlink(Pid),
+ exit(Pid, kill).
+
+
+doc_loader_loop(TxDb, Parent) ->
+ receive
+ {load_doc, Id, Rev, DocOpts} ->
+ Parent ! {load_doc_res, load_doc(TxDb, Id, Rev, DocOpts)},
+ doc_loader_loop(TxDb, Parent)
+ end.
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index 182cd6803..69186468d 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -373,7 +373,8 @@ index_autoupdater_callback(Db) ->
?assertMatch([{ok, <<_/binary>>}], Result),
[{ok, JobId}] = Result,
- ?assertEqual(ok, couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, DbSeq)).
+ ?assertMatch({ok, {_, _}},
+ couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, DbSeq)).
multiple_design_docs(Db) ->
diff --git a/src/couch_views/test/couch_views_map_test.erl b/src/couch_views/test/couch_views_map_test.erl
index c419546e1..125b43da7 100644
--- a/src/couch_views/test/couch_views_map_test.erl
+++ b/src/couch_views/test/couch_views_map_test.erl
@@ -14,6 +14,7 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
-include("couch_views.hrl").
@@ -58,6 +59,7 @@ map_views_test_() ->
?TDEF(should_map_with_doc_emit),
?TDEF(should_map_update_is_false),
?TDEF(should_map_update_is_lazy),
+ ?TDEF(should_map_snapshot),
?TDEF(should_map_wait_for_interactive),
?TDEF(should_map_local_seq)
% fun should_give_ext_size_seq_indexed_test/1
@@ -410,7 +412,7 @@ should_map_update_is_lazy() ->
{ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
JobId = couch_views_jobs:job_id(Db, Mrst),
UpdateSeq = fabric2_db:get_update_seq(Db),
- ok = couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, UpdateSeq),
+ {ok, _} = couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, UpdateSeq),
Args2 = #{
start_key => 8,
@@ -422,6 +424,100 @@ should_map_update_is_lazy() ->
?assertEqual(Expect, Result2).
+should_map_snapshot() ->
+ Idx = <<"baz">>,
+ DbName = ?tempdb(),
+
+ {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]),
+
+ DDoc = create_ddoc(),
+ Docs = make_docs(2),
+ fabric2_db:update_docs(Db, [DDoc | Docs]),
+
+ % Run a lazy query just to get a hold of a job, then wait for it so
+ % we can get the indexer versionstamps
+ ?assertEqual({ok, []}, couch_views:query(Db, DDoc, Idx, fun default_cb/2,
+ [], #{update => lazy})),
+ {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+ JobId = couch_views_jobs:job_id(Db, Mrst),
+ DbSeq = fabric2_db:get_update_seq(Db),
+ {ok, VStamps} = couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, DbSeq),
+
+ {DbReadVsn, ViewReadVsn} = VStamps,
+ ?assert(is_integer(DbReadVsn)),
+ ?assert(is_integer(ViewReadVsn)),
+ ?assert(DbReadVsn < ViewReadVsn),
+
+ % Update doc 1 and delete doc 2
+ {ok, Doc1Open} = fabric2_db:open_doc(Db, <<"1">>),
+ Doc1Upd = Doc1Open#doc{body = {[{<<"val">>, 42}]}},
+ ?assertMatch({ok, {2, _}}, fabric2_db:update_doc(Db, Doc1Upd)),
+
+ {ok, Doc2Open} = fabric2_db:open_doc(Db, <<"2">>),
+ Doc2Del = Doc2Open#doc{deleted = true},
+ ?assertMatch({ok, {2, _}}, fabric2_db:update_doc(Db, Doc2Del)),
+
+ ReadSnapshot = fun(#{tx := Tx} = TxDb) ->
+ Args = #mrargs{include_docs = true, view_type = map},
+ Callback = fun default_cb/2,
+ erlfdb:set_read_version(Tx, ViewReadVsn),
+ couch_views_reader:read(TxDb, Mrst, Idx, Callback, [], Args, DbReadVsn)
+ end,
+
+ % Perform a stale snapshot read, asserting that the doc updates
+ % haven't affected the include_docs results
+ ?assertMatch({ok, [
+ {row, [
+ {id, <<"1">>},
+ {key, 1},
+ {value, 1},
+ {doc, {[
+ {<<"_id">>, <<"1">>},
+ {<<"_rev">>, <<_/binary>>},
+ {<<"val">>, 1}
+ ]}}
+ ]},
+ {row, [
+ {id, <<"2">>},
+ {key, 2},
+ {value, 2},
+ {doc, {[
+ {<<"_id">>, <<"2">>},
+ {<<"_rev">>, <<_/binary>>},
+ {<<"val">>, 2}
+ ]}}
+ ]}
+ ]}, fabric2_fdb:transactional(Db, ReadSnapshot)),
+
+ % Update the view
+ ?assertMatch({ok, [{row, [{id, <<"1">>}, {key, 42}, {value, 42}]}]},
+ couch_views:query(Db, DDoc, Idx, fun default_cb/2, [], #{})),
+
+ % After the view was updated, the original snapshot stays the same
+ ?assertMatch({ok, [
+ {row, [
+ {id, <<"1">>},
+ {key, 1},
+ {value, 1},
+ {doc, {[
+ {<<"_id">>, <<"1">>},
+ {<<"_rev">>, <<_/binary>>},
+ {<<"val">>, 1}
+ ]}}
+ ]},
+ {row, [
+ {id, <<"2">>},
+ {key, 2},
+ {value, 2},
+ {doc, {[
+ {<<"_id">>, <<"2">>},
+ {<<"_rev">>, <<_/binary>>},
+ {<<"val">>, 2}
+ ]}}
+ ]}
+ ]}, fabric2_fdb:transactional(Db, ReadSnapshot)).
+
+
should_map_wait_for_interactive() ->
DbName = ?tempdb(),
{ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]),