summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Paul J. Davis <paul.joseph.davis@gmail.com> 2019-10-18 16:32:14 -0500
committer Paul J. Davis <paul.joseph.davis@gmail.com> 2019-10-18 16:45:22 -0500
commit 07657867889b106a1a639660416ef69b714d1247 (patch)
tree 35ea3110ee665902e43db837d0d3ebaac65e9de4
parent ae0dc9657880a03836c4cac04833504dd2711b81 (diff)
download couchdb-prototype/fdb-layer-parallel-view-builds.tar.gz
-rw-r--r-- src/couch_views/src/couch_views_indexer.erl 291
1 files changed, 177 insertions, 114 deletions
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 55ce06311..f0b711756 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -42,7 +42,16 @@ init() ->
<<"sig">> := JobSig
} = Data,
- {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ {ok, Db} = try
+ fabric2_db:open(DbName, [?ADMIN_CTX])
+ catch error:database_does_not_exist ->
+ couch_jobs:finish(undefined, Job, Data#{
+ error => database_does_not_exist,
+ reason => <<"Database was deleted">>
+ }),
+ exit(normal)
+ end,
+
{ok, DDoc} = fabric2_db:open_doc(Db, DDocId),
{ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
@@ -57,92 +66,107 @@ init() ->
State = #{
tx_db => undefined,
- db_seq => undefined,
view_seq => undefined,
last_seq => undefined,
job => Job,
job_data => Data,
count => 0,
limit => num_changes(),
- doc_acc => [],
+ batch_size => batch_size(),
+ workers => [],
design_opts => Mrst#mrst.design_opts
},
update(Db, Mrst, State).
-update(#{} = Db, Mrst0, State0) ->
-    {Mrst2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+% Index one batch of changes and recurse until the view has caught up.
+%
+% Each pass runs in three steps:
+%   1. A first transaction folds the changes feed starting at `view_seq`
+%      and hands batches to worker processes (see fold_changes/2).
+%   2. The mapped changes are collected from the workers' 'DOWN'
+%      messages; a worker reports success by exiting with {ok, Changes}.
+%   3. A second transaction writes the mapped changes, stores `last_seq`
+%      as the view's update sequence and reports job progress.
+%
+% Returns `ok` once a pass has seen fewer than `limit` changes.
+%
+% NOTE(review): workers are spawned from inside the first transaction
+% fun; if fabric2_fdb:transactional/2 re-runs the fun on a retry, the
+% workers of the earlier attempt are presumably left behind - confirm.
+update(#{} = Db, MrSt, State0) ->
+    State2 = fabric2_fdb:transactional(Db, fun(TxDb1) ->
         % In the first iteration of update we need
-        % to populate our db and view sequences
+        % to populate our view sequence
         State1 = case State0 of
-            #{db_seq := undefined} ->
-                ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst0),
+            #{view_seq := undefined} ->
+                ViewSeq = couch_views_fdb:get_update_seq(TxDb1, MrSt),
                 State0#{
-                    tx_db := TxDb,
-                    db_seq := fabric2_db:get_update_seq(TxDb),
+                    tx_db := TxDb1,
                     view_seq := ViewSeq,
                     last_seq := ViewSeq
                 };
             _ ->
                 State0#{
-                    tx_db := TxDb
+                    tx_db := TxDb1
                 }
         end,
-        {ok, State2} = fold_changes(State1),
+        fold_changes(State1, MrSt)
+    end),
-        #{
-            count := Count,
-            limit := Limit,
-            doc_acc := DocAcc,
-            last_seq := LastSeq
-        } = State2,
+    #{
+        last_seq := LastSeq,
+        count := Count,
+        limit := Limit,
+        workers := Workers
+    } = State2,
+
+    % Bit odd to be starting with the newest
+    % worker first here but I think it's fine
+    % for now since we're collecting all updates
+    % into a single write transaction
+    Changes = lists:foldl(fun({WPid, WRef}, Acc) ->
+        receive
+            {'DOWN', WRef, process, WPid, {ok, NewChanges}} ->
+                NewChanges ++ Acc;
+            {'DOWN', WRef, process, WPid, Reason} ->
+                exit({worker_update_failed, Reason})
+        after 6000 ->
+            % erlang:error/2 does not format its first argument: the
+            % second argument is the argument list of the current
+            % function. Raise a matchable term that names the worker.
+            erlang:error({worker_timeout, WPid})
+        end
+    end, [], Workers),
-        DocAcc1 = fetch_docs(TxDb, DocAcc),
-        {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
-        write_docs(TxDb, Mrst1, MappedDocs, State2),
+    State4 = fabric2_fdb:transactional(Db, fun(TxDb2) ->
+
+        write_changes(TxDb2, MrSt, LastSeq, Changes),
         case Count < Limit of
             true ->
-                report_progress(State2, finished),
-                {Mrst1, finished};
+                report_progress(State2#{tx_db := TxDb2}, finished),
+                finished;
             false ->
-                State3 = report_progress(State2, update),
-                {Mrst1, State3#{
+                State3 = report_progress(State2#{tx_db := TxDb2}, update),
+                State3#{
                     tx_db := undefined,
+                    view_seq := LastSeq,
                     count := 0,
-                    doc_acc := [],
-                    view_seq := LastSeq
-                }}
+                    workers := []
+                }
         end
     end),
-    case State4 of
-        finished ->
-            couch_eval:release_map_context(Mrst2#mrst.qserver);
-        _ ->
-            update(Db, Mrst2, State4)
+    if State4 == finished -> ok; true ->
+        update(Db, MrSt, State4)
     end.
-fold_changes(State) ->
-    #{
-        view_seq := SinceSeq,
-        limit := Limit,
-        tx_db := TxDb
-    } = State,
-    Fun = fun process_changes/2,
-    fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+% Fold up to `limit` changes since `view_seq` through process_changes/2,
+% then pass the final accumulator to spawn_worker/1 so that any changes
+% still pending in it are handed to a worker. Returns the updated State.
+fold_changes(State0, MrSt) ->
+    #{tx_db := TxDb, view_seq := Seq, limit := Limit} = State0,
+    FoldOpts = [{limit, Limit}],
+    InitAcc = {State0, MrSt, []},
+    {ok, FinalAcc} = fabric2_db:fold_changes(
+        TxDb, Seq, fun process_changes/2, InitAcc, FoldOpts
+    ),
+    spawn_worker(FinalAcc).
-process_changes(Change, Acc) ->
+process_changes(Change, {State0, MrSt, Changes0}) ->
#{
- doc_acc := DocAcc,
count := Count,
+ batch_size := BatchSize,
design_opts := DesignOpts
- } = Acc,
+ } = State0,
#{
id := Id,
@@ -151,86 +175,65 @@ process_changes(Change, Acc) ->
IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
- Acc1 = case {Id, IncludeDesign} of
+ Changes1 = case {Id, IncludeDesign} of
{<<?DESIGN_DOC_PREFIX, _/binary>>, false} ->
- maps:merge(Acc, #{
- count => Count + 1,
- last_seq => LastSeq
- });
+ Changes0;
_ ->
- Acc#{
- doc_acc := DocAcc ++ [Change],
- count := Count + 1,
- last_seq := LastSeq
- }
+ [Change | Changes0]
end,
- {ok, Acc1}.
-
-
-map_docs(Mrst, Docs) ->
- % Run all the non deleted docs through the view engine and
- Mrst1 = start_query_server(Mrst),
- QServer = Mrst1#mrst.qserver,
- {Deleted0, NotDeleted0} = lists:partition(fun(Doc) ->
- #{deleted := Deleted} = Doc,
- Deleted
- end, Docs),
-
- Deleted1 = lists:map(fun(Doc) ->
- Doc#{results => []}
- end, Deleted0),
-
- DocsToMap = lists:map(fun(Doc) ->
- #{doc := DocRec} = Doc,
- DocRec
- end, NotDeleted0),
-
- {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
-
- % The expanded function head here is making an assertion
- % that the results match the given doc
- NotDeleted1 = lists:zipwith(fun(#{id := DocId} = Doc, {DocId, Results}) ->
- Doc#{results => Results}
- end, NotDeleted0, AllResults),
-
- % I'm being a bit careful here resorting the docs
- % in order of the changes feed. Theoretically this is
- % unnecessary since we're inside a single transaction.
- % However, I'm concerned if we ever split this up
- % into multiple transactions that this detail might
- % be important but forgotten.
- MappedDocs = lists:sort(fun(A, B) ->
- #{sequence := ASeq} = A,
- #{sequence := BSeq} = B,
- ASeq =< BSeq
- end, Deleted1 ++ NotDeleted1),
+ State1 = State0#{
+ count := Count + 1,
+ last_seq := LastSeq
+ },
- {Mrst1, MappedDocs}.
+ case length(Changes1) < BatchSize of
+ true ->
+ {ok, {State1, MrSt, Changes1}};
+ false ->
+ State2 = spawn_worker({State1, MrSt, Changes1}),
+ {ok, {State2, MrSt, []}}
+ end.
-write_docs(TxDb, Mrst, Docs, State) ->
- #mrst{
- views = Views,
- sig = Sig
- } = Mrst,
+% Start a monitored worker that fetches and maps one batch of changes.
+%
+% Takes the {State, MrSt, Changes} accumulator built while folding the
+% changes feed and returns State with the worker's {Pid, MonitorRef}
+% pushed onto `workers`. An empty batch spawns nothing.
+spawn_worker({State, _MrSt, []}) ->
+    State;
+% Match a non-empty list directly instead of guarding on length/1,
+% which walks the whole batch just to test for emptiness.
+spawn_worker({State, MrSt, [_ | _] = Changes}) ->
     #{
-        last_seq := LastSeq
+        tx_db := #{tx := Tx} = TxDb,
+        workers := Workers
     } = State,
+    % The worker gets this transaction's read version in place of the
+    % transaction handle; process_changes/3 applies it to a transaction
+    % of its own.
+    ReadVersion = erlfdb:wait(erlfdb:get_read_version(Tx)),
+    WState = State#{
+        tx_db := TxDb#{tx := {read_version, ReadVersion}},
+        workers := []
+    },
+    Worker = spawn_monitor(fun() ->
+        process_changes(WState, MrSt, Changes)
+    end),
+    State#{
+        workers := [Worker | Workers]
+    }.
- ViewIds = [View#mrview.id_num || View <- Views],
-
- lists:foreach(fun(Doc) ->
- couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
- end, Docs),
- couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+% Worker entry point: fetch and map one batch of changes inside a fresh
+% transaction pinned to the read version captured by spawn_worker/1.
+% Never returns; the outcome of the transaction ({ok, MappedChanges} on
+% success) is delivered to the monitoring process as the exit reason.
+process_changes(State, MrSt, Changes0) ->
+    #{tx_db := TxDb0} = State,
+    #{tx := {read_version, ReadVersion}} = TxDb0,
+    {ok, Db} = application:get_env(fabric, db),
+    MapBatch = fun(NewTx) ->
+        erlfdb:set_read_version(NewTx, ReadVersion),
+        WithDocs = fetch_docs(TxDb0#{tx := NewTx}, Changes0),
+        {ok, map_docs(MrSt, WithDocs)}
+    end,
+    exit(erlfdb:transactional(Db, MapBatch)).
fetch_docs(Db, Changes) ->
- {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
- #{deleted := Deleted} = Doc,
+ {Deleted, NotDeleted} = lists:partition(fun(Change) ->
+ #{deleted := Deleted} = Change,
Deleted
end, Changes),
@@ -269,7 +272,66 @@ fetch_docs(Db, Changes) ->
Deleted ++ ChangesWithDocs.
-start_query_server(#mrst{qserver = nil} = Mrst) ->
+% Run a batch of changes through the view engine.
+%
+% Deleted changes get `results => []`. Every other change has its `doc`
+% mapped by the query server and gets the mapped output as `results`.
+% Returns the changes sorted by ascending `sequence`.
+map_docs(MrSt, Changes) ->
+    {ok, QServer} = get_query_server(MrSt),
+
+    {Deleted0, NotDeleted0} = lists:partition(fun(Change) ->
+        #{deleted := Deleted} = Change,
+        Deleted
+    end, Changes),
+
+    Deleted1 = lists:map(fun(Change) ->
+        Change#{
+            results => []
+        }
+    end, Deleted0),
+
+    DocsToMap = lists:map(fun(Change) ->
+        #{doc := DocRec} = Change,
+        DocRec
+    end, NotDeleted0),
+
+    % get_query_server/1 acquires a map context for this call, so hand
+    % it back once the batch is mapped, whether or not mapping succeeds.
+    {ok, AllResults} = try
+        couch_eval:map_docs(QServer, DocsToMap)
+    after
+        couch_eval:release_map_context(QServer)
+    end,
+
+    % The expanded function head here is making an assertion
+    % that the results match the given doc
+    NotDeleted1 = lists:zipwith(fun(#{id := Id} = Change, {Id, Results}) ->
+        % The doc body is dropped once mapped - presumably to keep the
+        % term handed back to the indexer small; TODO confirm that
+        % nothing downstream still reads `doc`.
+        Change#{
+            doc := [],
+            results => Results
+        }
+    end, NotDeleted0, AllResults),
+
+    % Restore changes feed order. The batch arrives newest-first
+    % (process_changes/2 conses each change onto its accumulator) and
+    % the partition above separates deleted from live docs, so the
+    % sort is what puts the results back in sequence order.
+    lists:sort(fun(A, B) ->
+        #{sequence := ASeq} = A,
+        #{sequence := BSeq} = B,
+        ASeq =< BSeq
+    end, Deleted1 ++ NotDeleted1).
+
+
+% Write every mapped change to the view's index and then store LastSeq
+% as the view's update sequence, all through the transaction TxDb.
+write_changes(TxDb, MrSt, LastSeq, Changes) ->
+    #mrst{views = Views, sig = Sig} = MrSt,
+    ViewIds = [V#mrview.id_num || V <- Views],
+    WriteOne = fun(Change) ->
+        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Change)
+    end,
+    ok = lists:foreach(WriteOne, Changes),
+    couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+
+
+get_query_server(#mrst{qserver = nil} = Mrst) ->
#mrst{
db_name = DbName,
idx_name = DDocId,
@@ -278,18 +340,14 @@ start_query_server(#mrst{qserver = nil} = Mrst) ->
lib = Lib,
views = Views
} = Mrst,
- {ok, QServer} = couch_eval:acquire_map_context(
+ couch_eval:acquire_map_context(
DbName,
DDocId,
Language,
Sig,
Lib,
[View#mrview.def || View <- Views]
- ),
- Mrst#mrst{qserver = QServer};
-
-start_query_server(#mrst{} = Mrst) ->
- Mrst.
+ ).
report_progress(State, UpdateType) ->
@@ -336,4 +394,9 @@ report_progress(State, UpdateType) ->
num_changes() ->
-    config:get_integer("couch_views", "change_limit", 100).
+    % Number of changes folded per update pass (the `limit` in the
+    % indexer state) unless overridden in the config.
+    DefaultLimit = 1000,
+    config:get_integer("couch_views", "change_limit", DefaultLimit).
+
+
+% Number of changes handed to a single worker (the `batch_size` in the
+% indexer state) unless overridden in the config.
+batch_size() ->
+    DefaultBatchSize = 100,
+    config:get_integer("couch_views", "batch_size", DefaultBatchSize).
+