summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2020-11-13 14:11:00 -0600
committerPaul J. Davis <paul.joseph.davis@gmail.com>2020-11-17 15:53:52 -0600
commit27e8e332cf974bff69b86a8d3c5e2f5a6de4125f (patch)
treeaf0d1c625a45711d8f879e11fdeeaaeaf7f0f8e1
parent0e91f2f608d4a920b6c22f1fe1eb9e0ea6982d5a (diff)
downloadcouchdb-27e8e332cf974bff69b86a8d3c5e2f5a6de4125f.tar.gz
Minimize conflicts while building views
This flips the view indexer to grab the database update_seq outside of the update transaction. Previously we would constantly refresh the db_seq value on every retry of the transactional loop. We use a snapshot to get the update_seq so that we don't trigger spurious read conflicts with any clients that might be updating the database.
-rw-r--r--src/couch_views/src/couch_views_indexer.erl54
-rw-r--r--src/couch_views/test/couch_views_indexer_test.erl45
2 files changed, 91 insertions, 8 deletions
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 2735f66b7..83ccb2c17 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -86,15 +86,22 @@ init() ->
fail_job(Job, Data, sig_changed, "Design document was modified")
end,
+ DbSeq = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:with_snapshot(TxDb, fun(SSDb) ->
+ fabric2_db:get_update_seq(SSDb)
+ end)
+ end),
+
State = #{
tx_db => undefined,
db_uuid => DbUUID,
- db_seq => undefined,
+ db_seq => DbSeq,
view_seq => undefined,
last_seq => undefined,
view_vs => undefined,
job => Job,
job_data => Data,
+ rows_processed => 0,
count => 0,
changes_done => 0,
doc_acc => [],
@@ -206,8 +213,6 @@ do_update(Db, Mrst0, State0) ->
{ok, State2} = fold_changes(State1),
#{
- count := Count,
- limit := Limit,
doc_acc := DocAcc,
last_seq := LastSeq,
view_vs := ViewVS,
@@ -228,7 +233,7 @@ do_update(Db, Mrst0, State0) ->
total_kvs => TotalKVs
},
- case Count < Limit of
+ case is_update_finished(State2) of
true ->
maybe_set_build_status(TxDb, Mrst2, ViewVS,
?INDEX_READY),
@@ -249,6 +254,20 @@ do_update(Db, Mrst0, State0) ->
end).
+is_update_finished(State) ->
+ #{
+ db_seq := DbSeq,
+ last_seq := LastSeq,
+ view_vs := ViewVs
+ } = State,
+ AtDbSeq = LastSeq == DbSeq,
+ AtViewVs = case ViewVs of
+ not_found -> false;
+ _ -> LastSeq == fabric2_fdb:vs_to_seq(ViewVs)
+ end,
+ AtDbSeq orelse AtViewVs.
+
+
maybe_set_build_status(_TxDb, _Mrst1, not_found, _State) ->
ok;
@@ -258,7 +277,7 @@ maybe_set_build_status(TxDb, Mrst1, _ViewVS, State) ->
% In the first iteration of update we need
% to populate our db and view sequences
-get_update_start_state(TxDb, Mrst, #{db_seq := undefined} = State) ->
+get_update_start_state(TxDb, Mrst, #{view_seq := undefined} = State) ->
#{
view_vs := ViewVS,
view_seq := ViewSeq
@@ -266,7 +285,6 @@ get_update_start_state(TxDb, Mrst, #{db_seq := undefined} = State) ->
State#{
tx_db := TxDb,
- db_seq := fabric2_db:get_update_seq(TxDb),
view_vs := ViewVS,
view_seq := ViewSeq,
last_seq := ViewSeq
@@ -281,18 +299,36 @@ get_update_start_state(TxDb, _Idx, State) ->
fold_changes(State) ->
#{
view_seq := SinceSeq,
+ db_seq := DbSeq,
limit := Limit,
tx_db := TxDb
} = State,
+ FoldState = State#{
+ rows_processed := 0
+ },
+
Fun = fun process_changes/2,
- Opts = [{limit, Limit}, {restart_tx, false}],
- fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, Opts).
+ Opts = [
+ {end_key, fabric2_fdb:seq_to_vs(DbSeq)},
+ {limit, Limit},
+ {restart_tx, false}
+ ],
+ case fabric2_db:fold_changes(TxDb, SinceSeq, Fun, FoldState, Opts) of
+ {ok, #{rows_processed := 0} = FinalState} when Limit > 0 ->
+ % If we read zero rows with a non-zero limit
+ % it means we've caught up to the DbSeq as our
+ % last_seq.
+ {ok, FinalState#{last_seq := DbSeq}};
+ Result ->
+ Result
+ end.
process_changes(Change, Acc) ->
#{
doc_acc := DocAcc,
+ rows_processed := RowsProcessed,
count := Count,
design_opts := DesignOpts,
view_vs := ViewVS
@@ -308,12 +344,14 @@ process_changes(Change, Acc) ->
Acc1 = case {Id, IncludeDesign} of
{<<?DESIGN_DOC_PREFIX, _/binary>>, false} ->
maps:merge(Acc, #{
+ rows_processed => RowsProcessed + 1,
count => Count + 1,
last_seq => LastSeq
});
_ ->
Acc#{
doc_acc := DocAcc ++ [Change],
+ rows_processed := RowsProcessed + 1,
count := Count + 1,
last_seq := LastSeq
}
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index 75be2459f..a0890da26 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -52,6 +52,7 @@ indexer_test_() ->
?TDEF_FE(index_autoupdater_callback),
?TDEF_FE(handle_db_recreated_when_running),
?TDEF_FE(handle_db_recreated_after_finished),
+ ?TDEF_FE(handle_doc_updated_when_running),
?TDEF_FE(index_can_recover_from_crash, 60)
]
}
@@ -504,6 +505,50 @@ handle_db_recreated_after_finished(Db) ->
], Out2).
+handle_doc_updated_when_running(Db) ->
+ DDoc = create_ddoc(),
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db, doc(0), []),
+ {ok, _} = fabric2_db:update_doc(Db, doc(1), []),
+
+ % To intercept job building while it is running ensure updates happen one
+ % row at a time.
+ config:set("couch_views", "batch_initial_size", "1", false),
+
+ meck_intercept_job_update(self()),
+
+ [{ok, JobId}] = couch_views:build_indices(Db, [DDoc]),
+
+ {Indexer, _Job, _Data} = wait_indexer_update(10000),
+
+ {ok, State} = couch_jobs:get_job_state(undefined, ?INDEX_JOB_TYPE, JobId),
+ ?assertEqual(running, State),
+
+ {ok, SubId, running, _} = couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId),
+
+ {ok, Doc} = fabric2_db:open_doc(Db, <<"1">>),
+ Doc2 = Doc#doc {
+ body = {[{<<"val">>, 2}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc2),
+
+ reset_intercept_job_update(Indexer),
+ Indexer ! continue,
+
+ ?assertMatch({
+ ?INDEX_JOB_TYPE,
+ JobId,
+ finished,
+ #{<<"active_task_info">> := #{<<"changes_done">> := 1}}
+ }, couch_jobs:wait(SubId, finished, infinity)),
+
+ Args = #mrargs{update = false},
+ {ok, Out2} = couch_views:query(Db, DDoc, ?MAP_FUN1, fun fold_fun/2, [], Args),
+ ?assertEqual([
+ row(<<"0">>, 0, 0)
+ ], Out2).
+
+
index_can_recover_from_crash(Db) ->
ok = meck:new(config, [passthrough]),
ok = meck:expect(config, get_integer, fun(Section, Key, Default) ->