summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarren Smith <garren.smith@gmail.com>2019-09-09 16:01:41 +0200
committergarren smith <garren.smith@gmail.com>2019-09-09 17:44:07 +0200
commit207441269aa95ad9d683af7d87ced82145ff6843 (patch)
treeae0077227aa744e3722626be64e670a0f89a9dd5
parent1c1d32690690dea966523de203f4f7be0b2177f4 (diff)
downloadcouchdb-207441269aa95ad9d683af7d87ced82145ff6843.tar.gz
Fetch docs in parallel for view indexing
-rw-r--r--src/couch_views/src/couch_views_indexer.erl58
1 files changed, 45 insertions, 13 deletions
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 60c819486..83d1b6aa2 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -99,7 +99,8 @@ update(#{} = Db, Mrst0, State0) ->
last_seq := LastSeq
} = State2,
- {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc),
+ DocAcc1 = fetch_docs(TxDb, DocAcc),
+ {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
write_docs(TxDb, Mrst1, MappedDocs, State2),
case Count < Limit of
@@ -140,14 +141,12 @@ process_changes(Change, Acc) ->
#{
doc_acc := DocAcc,
count := Count,
- tx_db := TxDb,
design_opts := DesignOpts
} = Acc,
#{
id := Id,
- sequence := LastSeq,
- deleted := Deleted
+ sequence := LastSeq
} = Change,
IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
@@ -159,16 +158,8 @@ process_changes(Change, Acc) ->
last_seq => LastSeq
});
_ ->
- % Making a note here that we should make fetching all the docs
- % a parallel fdb operation
- {ok, Doc} = case Deleted of
- true -> {ok, []};
- false -> fabric2_db:open_doc(TxDb, Id)
- end,
-
- Change1 = maps:put(doc, Doc, Change),
Acc#{
- doc_acc := DocAcc ++ [Change1],
+ doc_acc := DocAcc ++ [Change],
count := Count + 1,
last_seq := LastSeq
}
@@ -215,6 +206,47 @@ write_docs(TxDb, Mrst, Docs, State) ->
couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+fetch_docs(Db, Changes) ->
+ {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
+ #{deleted := Deleted} = Doc,
+ Deleted
+ end, Changes),
+
+ RevState = lists:foldl(fun(Change, Acc) ->
+ #{id := Id} = Change,
+ RevFuture = fabric2_fdb:get_winning_revs_future(Db, Id, 1),
+ Acc#{
+ RevFuture => {Id, Change}
+ }
+ end, #{}, NotDeleted),
+
+ RevFutures = maps:keys(RevState),
+ BodyState = lists:foldl(fun(RevFuture, Acc) ->
+ {Id, Change} = maps:get(RevFuture, RevState),
+ Revs = fabric2_fdb:get_winning_revs_wait(Db, RevFuture),
+
+ % I'm assuming that in this changes transaction that the winning
+ % doc body exists since it is listed in the changes feed as not deleted
+ #{winner := true} = RevInfo = lists:last(Revs),
+ BodyFuture = fabric2_fdb:get_doc_body_future(Db, Id, RevInfo),
+ Acc#{
+ BodyFuture => {Id, RevInfo, Change}
+ }
+ end, #{}, erlfdb:wait_for_all(RevFutures)),
+
+ BodyFutures = maps:keys(BodyState),
+ ChangesWithDocs = lists:map(fun (BodyFuture) ->
+ {Id, RevInfo, Change} = maps:get(BodyFuture, BodyState),
+ Doc = fabric2_fdb:get_doc_body_wait(Db, Id, RevInfo, BodyFuture),
+ Change#{doc => Doc}
+ end, erlfdb:wait_for_all(BodyFutures)),
+
+ % This combines the deleted changes with the changes that contain docs
+ % Important to note that this is now unsorted. Which is fine for now
+ % But later could be an issue if we split this across transactions
+ Deleted ++ ChangesWithDocs.
+
+
start_query_server(#mrst{qserver = nil} = Mrst) ->
#mrst{
language = Language,