diff options
author | Garren Smith <garren.smith@gmail.com> | 2019-09-09 16:01:41 +0200 |
---|---|---|
committer | garren smith <garren.smith@gmail.com> | 2019-09-09 17:44:07 +0200 |
commit | 207441269aa95ad9d683af7d87ced82145ff6843 (patch) | |
tree | ae0077227aa744e3722626be64e670a0f89a9dd5 | |
parent | 1c1d32690690dea966523de203f4f7be0b2177f4 (diff) | |
download | couchdb-207441269aa95ad9d683af7d87ced82145ff6843.tar.gz |
Fetch docs in parallel for view indexing
-rw-r--r-- | src/couch_views/src/couch_views_indexer.erl | 58 |
1 files changed, 45 insertions, 13 deletions
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index 60c819486..83d1b6aa2 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -99,7 +99,8 @@ update(#{} = Db, Mrst0, State0) -> last_seq := LastSeq } = State2, - {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc), + DocAcc1 = fetch_docs(TxDb, DocAcc), + {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1), write_docs(TxDb, Mrst1, MappedDocs, State2), case Count < Limit of @@ -140,14 +141,12 @@ process_changes(Change, Acc) -> #{ doc_acc := DocAcc, count := Count, - tx_db := TxDb, design_opts := DesignOpts } = Acc, #{ id := Id, - sequence := LastSeq, - deleted := Deleted + sequence := LastSeq } = Change, IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts), @@ -159,16 +158,8 @@ process_changes(Change, Acc) -> last_seq => LastSeq }); _ -> - % Making a note here that we should make fetching all the docs - % a parallel fdb operation - {ok, Doc} = case Deleted of - true -> {ok, []}; - false -> fabric2_db:open_doc(TxDb, Id) - end, - - Change1 = maps:put(doc, Doc, Change), Acc#{ - doc_acc := DocAcc ++ [Change1], + doc_acc := DocAcc ++ [Change], count := Count + 1, last_seq := LastSeq } @@ -215,6 +206,47 @@ write_docs(TxDb, Mrst, Docs, State) -> couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq). +fetch_docs(Db, Changes) -> + {Deleted, NotDeleted} = lists:partition(fun(Doc) -> + #{deleted := Deleted} = Doc, + Deleted + end, Changes), + + RevState = lists:foldl(fun(Change, Acc) -> + #{id := Id} = Change, + RevFuture = fabric2_fdb:get_winning_revs_future(Db, Id, 1), + Acc#{ + RevFuture => {Id, Change} + } + end, #{}, NotDeleted), + + RevFutures = maps:keys(RevState), + BodyState = lists:foldl(fun(RevFuture, Acc) -> + {Id, Change} = maps:get(RevFuture, RevState), + Revs = fabric2_fdb:get_winning_revs_wait(Db, RevFuture), + + % I'm assuming that in this changes transaction that the winning + % doc body exists since it is listed in the changes feed as not deleted + #{winner := true} = RevInfo = lists:last(Revs), + BodyFuture = fabric2_fdb:get_doc_body_future(Db, Id, RevInfo), + Acc#{ + BodyFuture => {Id, RevInfo, Change} + } + end, #{}, erlfdb:wait_for_all(RevFutures)), + + BodyFutures = maps:keys(BodyState), + ChangesWithDocs = lists:map(fun (BodyFuture) -> + {Id, RevInfo, Change} = maps:get(BodyFuture, BodyState), + Doc = fabric2_fdb:get_doc_body_wait(Db, Id, RevInfo, BodyFuture), + Change#{doc => Doc} + end, erlfdb:wait_for_all(BodyFutures)), + + % This combines the deleted changes with the changes that contain docs + % Important to note that this is now unsorted. Which is fine for now + % But later could be an issue if we split this across transactions + Deleted ++ ChangesWithDocs. + + start_query_server(#mrst{qserver = nil} = Mrst) -> #mrst{ language = Language, |