diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-11-13 14:11:00 -0600 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-11-17 15:53:52 -0600 |
commit | 27e8e332cf974bff69b86a8d3c5e2f5a6de4125f (patch) | |
tree | af0d1c625a45711d8f879e11fdeeaaeaf7f0f8e1 | |
parent | 0e91f2f608d4a920b6c22f1fe1eb9e0ea6982d5a (diff) | |
download | couchdb-27e8e332cf974bff69b86a8d3c5e2f5a6de4125f.tar.gz |
Minimize conflicts while building views
This flips the view indexer to grab the database update_seq outside of
the update transaction. Previously we would constantly refresh the
db_seq value on every retry of the transactional loop.
We use a snapshot to get the update_seq so that we don't trigger
spurious read conflicts with any clients that might be updating the
database.
-rw-r--r-- | src/couch_views/src/couch_views_indexer.erl | 54 | ||||
-rw-r--r-- | src/couch_views/test/couch_views_indexer_test.erl | 45 |
2 files changed, 91 insertions, 8 deletions
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index 2735f66b7..83ccb2c17 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -86,15 +86,22 @@ init() -> fail_job(Job, Data, sig_changed, "Design document was modified") end, + DbSeq = fabric2_fdb:transactional(Db, fun(TxDb) -> + fabric2_fdb:with_snapshot(TxDb, fun(SSDb) -> + fabric2_db:get_update_seq(SSDb) + end) + end), + State = #{ tx_db => undefined, db_uuid => DbUUID, - db_seq => undefined, + db_seq => DbSeq, view_seq => undefined, last_seq => undefined, view_vs => undefined, job => Job, job_data => Data, + rows_processed => 0, count => 0, changes_done => 0, doc_acc => [], @@ -206,8 +213,6 @@ do_update(Db, Mrst0, State0) -> {ok, State2} = fold_changes(State1), #{ - count := Count, - limit := Limit, doc_acc := DocAcc, last_seq := LastSeq, view_vs := ViewVS, @@ -228,7 +233,7 @@ do_update(Db, Mrst0, State0) -> total_kvs => TotalKVs }, - case Count < Limit of + case is_update_finished(State2) of true -> maybe_set_build_status(TxDb, Mrst2, ViewVS, ?INDEX_READY), @@ -249,6 +254,20 @@ do_update(Db, Mrst0, State0) -> end). +is_update_finished(State) -> + #{ + db_seq := DbSeq, + last_seq := LastSeq, + view_vs := ViewVs + } = State, + AtDbSeq = LastSeq == DbSeq, + AtViewVs = case ViewVs of + not_found -> false; + _ -> LastSeq == fabric2_fdb:vs_to_seq(ViewVs) + end, + AtDbSeq orelse AtViewVs. 
+ + maybe_set_build_status(_TxDb, _Mrst1, not_found, _State) -> ok; @@ -258,7 +277,7 @@ maybe_set_build_status(TxDb, Mrst1, _ViewVS, State) -> % In the first iteration of update we need % to populate our db and view sequences -get_update_start_state(TxDb, Mrst, #{db_seq := undefined} = State) -> +get_update_start_state(TxDb, Mrst, #{view_seq := undefined} = State) -> #{ view_vs := ViewVS, view_seq := ViewSeq @@ -266,7 +285,6 @@ get_update_start_state(TxDb, Mrst, #{db_seq := undefined} = State) -> State#{ tx_db := TxDb, - db_seq := fabric2_db:get_update_seq(TxDb), view_vs := ViewVS, view_seq := ViewSeq, last_seq := ViewSeq @@ -281,18 +299,36 @@ get_update_start_state(TxDb, _Idx, State) -> fold_changes(State) -> #{ view_seq := SinceSeq, + db_seq := DbSeq, limit := Limit, tx_db := TxDb } = State, + FoldState = State#{ + rows_processed := 0 + }, + Fun = fun process_changes/2, - Opts = [{limit, Limit}, {restart_tx, false}], - fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, Opts). + Opts = [ + {end_key, fabric2_fdb:seq_to_vs(DbSeq)}, + {limit, Limit}, + {restart_tx, false} + ], + case fabric2_db:fold_changes(TxDb, SinceSeq, Fun, FoldState, Opts) of + {ok, #{rows_processed := 0} = FinalState} when Limit > 0 -> + % If we read zero rows with a non-zero limit + % it means we've caught up to the DbSeq as our + % last_seq. + {ok, FinalState#{last_seq := DbSeq}}; + Result -> + Result + end. 
process_changes(Change, Acc) -> #{ doc_acc := DocAcc, + rows_processed := RowsProcessed, count := Count, design_opts := DesignOpts, view_vs := ViewVS @@ -308,12 +344,14 @@ process_changes(Change, Acc) -> Acc1 = case {Id, IncludeDesign} of {<<?DESIGN_DOC_PREFIX, _/binary>>, false} -> maps:merge(Acc, #{ + rows_processed => RowsProcessed + 1, count => Count + 1, last_seq => LastSeq }); _ -> Acc#{ doc_acc := DocAcc ++ [Change], + rows_processed := RowsProcessed + 1, count := Count + 1, last_seq := LastSeq } diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl index 75be2459f..a0890da26 100644 --- a/src/couch_views/test/couch_views_indexer_test.erl +++ b/src/couch_views/test/couch_views_indexer_test.erl @@ -52,6 +52,7 @@ indexer_test_() -> ?TDEF_FE(index_autoupdater_callback), ?TDEF_FE(handle_db_recreated_when_running), ?TDEF_FE(handle_db_recreated_after_finished), + ?TDEF_FE(handle_doc_updated_when_running), ?TDEF_FE(index_can_recover_from_crash, 60) ] } @@ -504,6 +505,50 @@ handle_db_recreated_after_finished(Db) -> ], Out2). +handle_doc_updated_when_running(Db) -> + DDoc = create_ddoc(), + {ok, _} = fabric2_db:update_doc(Db, DDoc, []), + {ok, _} = fabric2_db:update_doc(Db, doc(0), []), + {ok, _} = fabric2_db:update_doc(Db, doc(1), []), + + % To intercept job building while it is running ensure updates happen one + % row at a time. 
+ config:set("couch_views", "batch_initial_size", "1", false), + + meck_intercept_job_update(self()), + + [{ok, JobId}] = couch_views:build_indices(Db, [DDoc]), + + {Indexer, _Job, _Data} = wait_indexer_update(10000), + + {ok, State} = couch_jobs:get_job_state(undefined, ?INDEX_JOB_TYPE, JobId), + ?assertEqual(running, State), + + {ok, SubId, running, _} = couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId), + + {ok, Doc} = fabric2_db:open_doc(Db, <<"1">>), + Doc2 = Doc#doc { + body = {[{<<"val">>, 2}]} + }, + {ok, _} = fabric2_db:update_doc(Db, Doc2), + + reset_intercept_job_update(Indexer), + Indexer ! continue, + + ?assertMatch({ + ?INDEX_JOB_TYPE, + JobId, + finished, + #{<<"active_task_info">> := #{<<"changes_done">> := 1}} + }, couch_jobs:wait(SubId, finished, infinity)), + + Args = #mrargs{update = false}, + {ok, Out2} = couch_views:query(Db, DDoc, ?MAP_FUN1, fun fold_fun/2, [], Args), + ?assertEqual([ + row(<<"0">>, 0, 0) + ], Out2). + + index_can_recover_from_crash(Db) -> ok = meck:new(config, [passthrough]), ok = meck:expect(config, get_integer, fun(Section, Key, Default) -> |