summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarren Smith <garren.smith@gmail.com>2020-04-06 12:04:56 +0200
committergarren smith <garren.smith@gmail.com>2020-04-23 19:24:43 +0200
commit5efcbfc3ca114696273f56b1c98876c95e63bda7 (patch)
tree0ddd75805d43401d311bbdea4ddf4843cda7ae69
parentf522b880b68109feb5960d6d8244ac034990059e (diff)
downloadcouchdb-5efcbfc3ca114696273f56b1c98876c95e63bda7.tar.gz
Add fold_docs for DocId list
Adds a fold_docs function that will do a parallel fetch for the supplied Doc Ids. This is used for _all_docs?keys=["id1", "id2"]. This uses a queue for fetching the revs and another queue for fetching the doc bodies. These queues will be drained if the future queue gets to large.
-rw-r--r--src/chttpd/src/chttpd_db.erl14
-rw-r--r--src/fabric/src/fabric2_db.erl142
-rw-r--r--test/elixir/test/all_docs_test.exs30
3 files changed, 177 insertions, 9 deletions
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 8cfcfecaa..078009590 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -902,9 +902,10 @@ send_all_docs_keys(Db, #mrargs{} = Args, VAcc0) ->
_ -> Args#mrargs.doc_options
end,
IncludeDocs = Args#mrargs.include_docs,
- lists:foldl(fun(DocId, Acc) ->
- OpenOpts = [deleted | DocOpts],
- Row0 = case fabric2_db:open_doc(Db, DocId, OpenOpts) of
+ OpenOpts = [deleted | DocOpts],
+
+ CB = fun(DocId, Doc, Acc) ->
+ Row0 = case Doc of
{not_found, missing} ->
#view_row{key = DocId};
{ok, #doc{deleted = true, revs = Revs}} ->
@@ -938,9 +939,10 @@ send_all_docs_keys(Db, #mrargs{} = Args, VAcc0) ->
}
end,
Row1 = fabric_view:transform_row(Row0),
- {ok, NewAcc} = view_cb(Row1, Acc),
- NewAcc
- end, VAcc1, Keys).
+ view_cb(Row1, Acc)
+ end,
+ {ok, VAcc2} = fabric2_db:fold_docs(Db, Keys, CB, VAcc1, OpenOpts),
+ VAcc2.
apply_args_to_keylist(Args, Keys0) ->
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 15694cdde..740f9abf6 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -110,6 +110,7 @@
fold_docs/3,
fold_docs/4,
+ fold_docs/5,
fold_design_docs/4,
fold_local_docs/4,
fold_changes/4,
@@ -969,6 +970,61 @@ fold_docs(Db, UserFun, UserAcc0, Options) ->
end).
+fold_docs(Db, DocIds, UserFun, UserAcc0, Options) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ try
+ NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts],
+ NeedsTree = (Options -- NeedsTreeOpts /= Options),
+
+ FetchRevs = case NeedsTree of
+ true ->
+ fun(DocId) ->
+ fabric2_fdb:get_all_revs_future(TxDb, DocId)
+ end;
+ false ->
+ fun(DocId) ->
+ fabric2_fdb:get_winning_revs_future(TxDb, DocId, 1)
+ end
+ end,
+ InitAcc = #{
+ revs_q => queue:new(),
+ revs_count => 0,
+ body_q => queue:new(),
+ body_count => 0,
+ doc_opts => Options,
+ user_acc => UserAcc0,
+ user_fun => UserFun
+ },
+
+ FinalAcc1 = lists:foldl(fun(DocId, Acc) ->
+ #{
+ revs_q := RevsQ,
+ revs_count := RevsCount
+ } = Acc,
+ Future = FetchRevs(DocId),
+ NewAcc = Acc#{
+ revs_q := queue:in({DocId, Future}, RevsQ),
+ revs_count := RevsCount + 1
+ },
+ drain_fold_docs_revs_futures(TxDb, NewAcc)
+ end, InitAcc, DocIds),
+
+ FinalAcc2 = drain_all_fold_docs_revs_futures(TxDb, FinalAcc1),
+ FinalAcc3 = drain_all_fold_docs_body_futures(TxDb, FinalAcc2),
+
+ #{
+ user_acc := FinalUserAcc
+ } = FinalAcc3,
+ {ok, FinalUserAcc}
+
+ catch throw:{stop, StopUserAcc} ->
+ {ok, StopUserAcc}
+ end
+ end).
+
+
+
+
fold_design_docs(Db, UserFun, UserAcc0, Options1) ->
Options2 = set_design_doc_keys(Options1),
fold_docs(Db, UserFun, UserAcc0, Options2).
@@ -1206,6 +1262,92 @@ drain_all_deleted_info_futures(FutureQ, UserFun, Acc) ->
end.
+drain_fold_docs_revs_futures(_TxDb, #{revs_count := C} = Acc) when C < 100 ->
+ Acc;
+drain_fold_docs_revs_futures(TxDb, Acc) ->
+ drain_one_fold_docs_revs_future(TxDb, Acc).
+
+
+drain_all_fold_docs_revs_futures(_TxDb, #{revs_count := C} = Acc) when C =< 0 ->
+ Acc;
+drain_all_fold_docs_revs_futures(TxDb, #{revs_count := C} = Acc) when C > 0 ->
+ NewAcc = drain_one_fold_docs_revs_future(TxDb, Acc),
+ drain_all_fold_docs_revs_futures(TxDb, NewAcc).
+
+
+drain_one_fold_docs_revs_future(TxDb, Acc) ->
+ #{
+ revs_q := RevsQ,
+ revs_count := RevsCount,
+ body_q := BodyQ,
+ body_count := BodyCount
+ } = Acc,
+ {{value, {DocId, RevsFuture}}, RestRevsQ} = queue:out(RevsQ),
+
+ Revs = fabric2_fdb:get_revs_wait(TxDb, RevsFuture),
+ DocFuture = case Revs of
+ [] ->
+ {DocId, [], not_found};
+ [_ | _] ->
+ Winner = get_rev_winner(Revs),
+ BodyFuture = fabric2_fdb:get_doc_body_future(TxDb, DocId, Winner),
+ {DocId, Revs, BodyFuture}
+ end,
+ NewAcc = Acc#{
+ revs_q := RestRevsQ,
+ revs_count := RevsCount - 1,
+ body_q := queue:in(DocFuture, BodyQ),
+ body_count := BodyCount + 1
+ },
+ drain_fold_docs_body_futures(TxDb, NewAcc).
+
+
+drain_fold_docs_body_futures(_TxDb, #{body_count := C} = Acc) when C < 100 ->
+ Acc;
+drain_fold_docs_body_futures(TxDb, Acc) ->
+ drain_one_fold_docs_body_future(TxDb, Acc).
+
+
+drain_all_fold_docs_body_futures(_TxDb, #{body_count := C} = Acc) when C =< 0 ->
+ Acc;
+drain_all_fold_docs_body_futures(TxDb, #{body_count := C} = Acc) when C > 0 ->
+ NewAcc = drain_one_fold_docs_body_future(TxDb, Acc),
+ drain_all_fold_docs_body_futures(TxDb, NewAcc).
+
+
+drain_one_fold_docs_body_future(TxDb, Acc) ->
+ #{
+ body_q := BodyQ,
+ body_count := BodyCount,
+ doc_opts := DocOpts,
+ user_fun := UserFun,
+ user_acc := UserAcc
+ } = Acc,
+ {{value, {DocId, Revs, BodyFuture}}, RestBodyQ} = queue:out(BodyQ),
+ Doc = case BodyFuture of
+ not_found ->
+ {not_found, missing};
+ _ ->
+ RevInfo = get_rev_winner(Revs),
+ Base = fabric2_fdb:get_doc_body_wait(TxDb, DocId, RevInfo,
+ BodyFuture),
+ apply_open_doc_opts(Base, Revs, DocOpts)
+ end,
+ NewUserAcc = maybe_stop(UserFun(DocId, Doc, UserAcc)),
+ Acc#{
+ body_q := RestBodyQ,
+ body_count := BodyCount - 1,
+ user_acc := NewUserAcc
+ }.
+
+
+get_rev_winner(Revs) ->
+ [Winner] = lists:filter(fun(Rev) ->
+ maps:get(winner, Rev)
+ end, Revs),
+ Winner.
+
+
new_revid(Db, Doc) ->
#doc{
id = DocId,
diff --git a/test/elixir/test/all_docs_test.exs b/test/elixir/test/all_docs_test.exs
index 16641aa95..d41d046b8 100644
--- a/test/elixir/test/all_docs_test.exs
+++ b/test/elixir/test/all_docs_test.exs
@@ -233,6 +233,26 @@ defmodule AllDocsTest do
end
@tag :with_db
+ test "POST with missing keys", context do
+ db_name = context[:db_name]
+
+ resp = Couch.post("/#{db_name}/_bulk_docs", body: %{docs: create_docs(0..3)})
+ assert resp.status_code in [201, 202]
+
+ resp = Couch.post(
+ "/#{db_name}/_all_docs",
+ body: %{
+ :keys => [1]
+ }
+ )
+
+ assert resp.status_code == 200
+ rows = resp.body["rows"]
+ assert length(rows) == 1
+ assert hd(rows) == %{"error" => "not_found", "key" => 1}
+ end
+
+ @tag :with_db
test "POST with keys and limit", context do
db_name = context[:db_name]
@@ -242,13 +262,17 @@ defmodule AllDocsTest do
resp = Couch.post(
"/#{db_name}/_all_docs",
body: %{
- :keys => [1, 2],
- :limit => 1
+ :keys => ["1", "2"],
+ :limit => 1,
+ :include_docs => true
}
)
assert resp.status_code == 200
- assert length(Map.get(resp, :body)["rows"]) == 1
+ rows = resp.body["rows"]
+ assert length(rows) == 1
+ doc = hd(rows)["doc"]
+ assert doc["string"] == "1"
end
@tag :with_db