From 07657867889b106a1a639660416ef69b714d1247 Mon Sep 17 00:00:00 2001
From: "Paul J. Davis"
Date: Fri, 18 Oct 2019 16:32:14 -0500
Subject: Parallelize view builds

Rather than fetching and mapping each batch of changes inline, the
indexer now folds the changes feed into batches and hands each batch
to a monitored worker process. Workers fetch and map their documents
in their own transaction pinned to the parent's read version, and the
mapped results are collected and written back in a single write
transaction. The batch size is controlled by the new
couch_views.batch_size setting, and the per-transaction change limit
(couch_views.change_limit) now defaults to 1000.
---
 src/couch_views/src/couch_views_indexer.erl | 291 +++++++++++++++++-----------
 1 file changed, 177 insertions(+), 114 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 55ce06311..f0b711756 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -42,7 +42,16 @@ init() ->
         <<"sig">> := JobSig
     } = Data,
 
-    {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+    {ok, Db} = try
+        fabric2_db:open(DbName, [?ADMIN_CTX])
+    catch error:database_does_not_exist ->
+        couch_jobs:finish(undefined, Job, Data#{
+            error => database_does_not_exist,
+            reason => <<"Database was deleted">>
+        }),
+        exit(normal)
+    end,
+
     {ok, DDoc} = fabric2_db:open_doc(Db, DDocId),
     {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
     HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
@@ -57,92 +66,107 @@ init() ->
 
     State = #{
         tx_db => undefined,
-        db_seq => undefined,
         view_seq => undefined,
         last_seq => undefined,
         job => Job,
         job_data => Data,
         count => 0,
         limit => num_changes(),
-        doc_acc => [],
+        batch_size => batch_size(),
+        workers => [],
         design_opts => Mrst#mrst.design_opts
     },
 
     update(Db, Mrst, State).
 
 
-update(#{} = Db, Mrst0, State0) ->
-    {Mrst2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+update(#{} = Db, MrSt, State0) ->
+    State2 = fabric2_fdb:transactional(Db, fun(TxDb1) ->
         % In the first iteration of update we need
         % to populate our db and view sequences
         State1 = case State0 of
-            #{db_seq := undefined} ->
-                ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst0),
+            #{view_seq := undefined} ->
+                ViewSeq = couch_views_fdb:get_update_seq(TxDb1, MrSt),
                 State0#{
-                    tx_db := TxDb,
-                    db_seq := fabric2_db:get_update_seq(TxDb),
+                    tx_db := TxDb1,
                     view_seq := ViewSeq,
                     last_seq := ViewSeq
                 };
             _ ->
                 State0#{
-                    tx_db := TxDb
+                    tx_db := TxDb1
                 }
         end,
 
-        {ok, State2} = fold_changes(State1),
+        fold_changes(State1, MrSt)
+    end),
 
-        #{
-            count := Count,
-            limit := Limit,
-            doc_acc := DocAcc,
-            last_seq := LastSeq
-        } = State2,
+    #{
+        last_seq := LastSeq,
+        count := Count,
+        limit := Limit,
+        workers := Workers
+    } = State2,
+
+    % Bit odd to be starting with the newest
+    % worker first here but I think it's fine
+    % for now since we're collecting all updates
+    % into a single write transaction
+    Changes = lists:foldl(fun({WPid, WRef}, Acc) ->
+        receive
+            {'DOWN', WRef, process, WPid, {ok, NewChanges}} ->
+                NewChanges ++ Acc;
+            {'DOWN', WRef, process, WPid, Reason} ->
+                exit({worker_update_failed, Reason})
+        after 6000 ->
+            erlang:error({timeout_waiting_for_worker, WPid})
+        end
+    end, [], Workers),
 
-        DocAcc1 = fetch_docs(TxDb, DocAcc),
-        {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
-        write_docs(TxDb, Mrst1, MappedDocs, State2),
+    State4 = fabric2_fdb:transactional(Db, fun(TxDb2) ->
+
+        write_changes(TxDb2, MrSt, LastSeq, Changes),
 
         case Count < Limit of
             true ->
-                report_progress(State2, finished),
-                {Mrst1, finished};
+                report_progress(State2#{tx_db := TxDb2}, finished),
+                finished;
             false ->
-                State3 = report_progress(State2, update),
-                {Mrst1, State3#{
+                State3 = report_progress(State2#{tx_db := TxDb2}, update),
+                State3#{
                     tx_db := undefined,
+                    view_seq := LastSeq,
                     count := 0,
-                    doc_acc := [],
-                    view_seq := LastSeq
-                }}
+                    workers := []
+                }
         end
     end),
 
-    case State4 of
-        finished ->
-            couch_eval:release_map_context(Mrst2#mrst.qserver);
-        _ ->
-            update(Db, Mrst2, State4)
+    if State4 == finished -> ok; true ->
+        update(Db, MrSt, State4)
     end.
 
 
-fold_changes(State) ->
+fold_changes(State0, MrSt) ->
     #{
-        view_seq := SinceSeq,
-        limit := Limit,
-        tx_db := TxDb
-    } = State,
+        tx_db := TxDb,
+        view_seq := Seq,
+        limit := Limit
+    } = State0,
 
     Fun = fun process_changes/2,
-    fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+    Acc = {State0, MrSt, []},
+    Opts = [{limit, Limit}],
+    {ok, AccOut} = fabric2_db:fold_changes(TxDb, Seq, Fun, Acc, Opts),
+    spawn_worker(AccOut).
 
 
-process_changes(Change, Acc) ->
+process_changes(Change, {State0, MrSt, Changes0}) ->
     #{
-        doc_acc := DocAcc,
         count := Count,
+        batch_size := BatchSize,
         design_opts := DesignOpts
-    } = Acc,
+    } = State0,
 
     #{
         id := Id,
@@ -151,86 +175,65 @@ process_changes(Change, Acc) ->
 
     IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
 
-    Acc1 = case {Id, IncludeDesign} of
+    Changes1 = case {Id, IncludeDesign} of
         {<<"_design/", _/binary>>, false} ->
-            maps:merge(Acc, #{
-                count => Count + 1,
-                last_seq => LastSeq
-            });
+            Changes0;
         _ ->
-            Acc#{
-                doc_acc := DocAcc ++ [Change],
-                count := Count + 1,
-                last_seq := LastSeq
-            }
+            [Change | Changes0]
     end,
-    {ok, Acc1}.
-
-
-map_docs(Mrst, Docs) ->
-    % Run all the non deleted docs through the view engine and
-    Mrst1 = start_query_server(Mrst),
-    QServer = Mrst1#mrst.qserver,
-    {Deleted0, NotDeleted0} = lists:partition(fun(Doc) ->
-        #{deleted := Deleted} = Doc,
-        Deleted
-    end, Docs),
-
-    Deleted1 = lists:map(fun(Doc) ->
-        Doc#{results => []}
-    end, Deleted0),
-
-    DocsToMap = lists:map(fun(Doc) ->
-        #{doc := DocRec} = Doc,
-        DocRec
-    end, NotDeleted0),
-
-    {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
-
-    % The expanded function head here is making an assertion
-    % that the results match the given doc
-    NotDeleted1 = lists:zipwith(fun(#{id := DocId} = Doc, {DocId, Results}) ->
-        Doc#{results => Results}
-    end, NotDeleted0, AllResults),
-
-    % I'm being a bit careful here resorting the docs
-    % in order of the changes feed. Theoretically this is
-    % unnecessary since we're inside a single transaction.
-    % However, I'm concerned if we ever split this up
-    % into multiple transactions that this detail might
-    % be important but forgotten.
-    MappedDocs = lists:sort(fun(A, B) ->
-        #{sequence := ASeq} = A,
-        #{sequence := BSeq} = B,
-        ASeq =< BSeq
-    end, Deleted1 ++ NotDeleted1),
+    State1 = State0#{
+        count := Count + 1,
+        last_seq := LastSeq
+    },
 
-    {Mrst1, MappedDocs}.
+    case length(Changes1) < BatchSize of
+        true ->
+            {ok, {State1, MrSt, Changes1}};
+        false ->
+            State2 = spawn_worker({State1, MrSt, Changes1}),
+            {ok, {State2, MrSt, []}}
+    end.
 
 
-write_docs(TxDb, Mrst, Docs, State) ->
-    #mrst{
-        views = Views,
-        sig = Sig
-    } = Mrst,
+spawn_worker({State, _MrSt, []}) ->
+    State;
+spawn_worker({State, MrSt, Changes}) when length(Changes) > 0 ->
     #{
-        last_seq := LastSeq
+        tx_db := #{tx := Tx} = TxDb,
+        workers := Workers
     } = State,
+    ReadVersion = erlfdb:wait(erlfdb:get_read_version(Tx)),
+    WState = State#{
+        tx_db := TxDb#{tx := {read_version, ReadVersion}},
+        workers := []
+    },
+    Worker = spawn_monitor(fun() ->
+        process_changes(WState, MrSt, Changes)
+    end),
+    State#{
+        workers := [Worker | Workers]
+    }.
 
-    ViewIds = [View#mrview.id_num || View <- Views],
-
-    lists:foreach(fun(Doc) ->
-        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
-    end, Docs),
-
-    couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+process_changes(State, MrSt, Changes0) ->
+    #{
+        tx_db := #{tx := {read_version, ReadVersion}} = TxDb0
+    } = State,
+    {ok, Db} = application:get_env(fabric, db),
+    exit(erlfdb:transactional(Db, fun(NewTx) ->
+        erlfdb:set_read_version(NewTx, ReadVersion),
+        TxDb1 = TxDb0#{tx := NewTx},
+        Changes1 = fetch_docs(TxDb1, Changes0),
+        Changes2 = map_docs(MrSt, Changes1),
+        {ok, Changes2}
+    end)).
 
 
 fetch_docs(Db, Changes) ->
-    {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
-        #{deleted := Deleted} = Doc,
+    {Deleted, NotDeleted} = lists:partition(fun(Change) ->
+        #{deleted := Deleted} = Change,
         Deleted
     end, Changes),
 
@@ -269,7 +272,66 @@ fetch_docs(Db, Changes) ->
 
     Deleted ++ ChangesWithDocs.
 
 
-start_query_server(#mrst{qserver = nil} = Mrst) ->
+map_docs(MrSt, Changes) ->
+    % Run all the non-deleted docs through the view engine
+    {ok, QServer} = get_query_server(MrSt),
+
+    {Deleted0, NotDeleted0} = lists:partition(fun(Change) ->
+        #{deleted := Deleted} = Change,
+        Deleted
+    end, Changes),
+
+    Deleted1 = lists:map(fun(Change) ->
+        Change#{
+            results => []
+        }
+    end, Deleted0),
+
+    DocsToMap = lists:map(fun(Change) ->
+        #{doc := DocRec} = Change,
+        DocRec
+    end, NotDeleted0),
+
+    {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
+
+    % The expanded function head here is making an assertion
+    % that the results match the given doc
+    NotDeleted1 = lists:zipwith(fun(#{id := Id} = Change, {Id, Results}) ->
+        Change#{
+            doc := [],
+            results => Results
+        }
+    end, NotDeleted0, AllResults),
+
+    % I'm being a bit careful here resorting the docs
+    % in order of the changes feed. Theoretically this is
+    % unnecessary since we're inside a single transaction.
+    % However, I'm concerned if we ever split this up
+    % into multiple transactions that this detail might
+    % be important but forgotten.
+    lists:sort(fun(A, B) ->
+        #{sequence := ASeq} = A,
+        #{sequence := BSeq} = B,
+        ASeq =< BSeq
+    end, Deleted1 ++ NotDeleted1).
+
+
+write_changes(TxDb, MrSt, LastSeq, Changes) ->
+    #mrst{
+        views = Views,
+        sig = Sig
+    } = MrSt,
+
+    ViewIds = [View#mrview.id_num || View <- Views],
+
+    lists:foreach(fun(Change) ->
+        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Change)
+    end, Changes),
+
+    couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+
+
+get_query_server(#mrst{qserver = nil} = Mrst) ->
     #mrst{
         db_name = DbName,
         idx_name = DDocId,
@@ -278,18 +340,14 @@ start_query_server(#mrst{qserver = nil} = Mrst) ->
         lib = Lib,
         views = Views
     } = Mrst,
-    {ok, QServer} = couch_eval:acquire_map_context(
+    couch_eval:acquire_map_context(
         DbName,
         DDocId,
         Language,
         Sig,
         Lib,
         [View#mrview.def || View <- Views]
-    ),
-    Mrst#mrst{qserver = QServer};
-
-start_query_server(#mrst{} = Mrst) ->
-    Mrst.
+    ).
 
 
 report_progress(State, UpdateType) ->
@@ -336,4 +394,9 @@
 
 
 num_changes() ->
-    config:get_integer("couch_views", "change_limit", 100).
+    config:get_integer("couch_views", "change_limit", 1000).
+
+
+batch_size() ->
+    config:get_integer("couch_views", "batch_size", 100).
+
-- 
cgit v1.2.1
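
A note on the collection idiom the new update/3 relies on: each batch
worker is started with spawn_monitor/1 and ships its result back as
its exit reason, so the parent collects both results and failures
from the same 'DOWN' message without needing a reply protocol of its
own. Below is a minimal, self-contained sketch of that idiom; the
module name pmap_batches and the squaring workload are illustrative
only and are not part of this patch.

-module(pmap_batches).
-export([run/2]).

%% Map Items in parallel: split them into batches of BatchSize, run
%% each batch in a monitored worker, and gather the results in batch
%% order. The worker's exit reason carries its result, just like the
%% {ok, Changes} exits in couch_views_indexer above.
run(Items, BatchSize) ->
    Workers = [spawn_monitor(fun() ->
        exit({ok, [X * X || X <- Batch]})
    end) || Batch <- chunk(Items, BatchSize)],
    %% Selective receive on each monitor ref, in spawn order, keeps
    %% the output ordered no matter which worker finishes first.
    lists:append([receive
        {'DOWN', Ref, process, Pid, {ok, Result}} ->
            Result;
        {'DOWN', Ref, process, Pid, Reason} ->
            exit({worker_failed, Pid, Reason})
    after 60000 ->
        erlang:error({timeout_waiting_for_worker, Pid})
    end || {Pid, Ref} <- Workers]).

%% Split a list into sublists of at most N elements.
chunk([], _N) ->
    [];
chunk(Items, N) when length(Items) =< N ->
    [Items];
chunk(Items, N) ->
    {Batch, Rest} = lists:split(N, Items),
    [Batch | chunk(Rest, N)].

For example, pmap_batches:run(lists:seq(1, 5), 2) returns
[1,4,9,16,25]. The patch gathers in the opposite direction, folding
from the newest worker and rebuilding changes-feed order with
NewChanges ++ Acc, but the guarantee is the same: worker completion
order never affects the order of the collected results.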