diff options
author | Garren Smith <garren.smith@gmail.com> | 2020-04-06 12:04:56 +0200 |
---|---|---|
committer | garren smith <garren.smith@gmail.com> | 2020-04-23 19:24:43 +0200 |
commit | 5efcbfc3ca114696273f56b1c98876c95e63bda7 (patch) | |
tree | 0ddd75805d43401d311bbdea4ddf4843cda7ae69 | |
parent | f522b880b68109feb5960d6d8244ac034990059e (diff) | |
download | couchdb-5efcbfc3ca114696273f56b1c98876c95e63bda7.tar.gz |
Add fold_docs for DocId list
Adds a fold_docs function that will do a parallel fetch for the supplied
Doc Ids. This is used for _all_docs?keys=["id1", "id2"].
This uses a queue for fetching the revs and another queue for fetching
the doc bodies. These queues will be drained if the future queue gets
to large.
-rw-r--r-- | src/chttpd/src/chttpd_db.erl | 14 | ||||
-rw-r--r-- | src/fabric/src/fabric2_db.erl | 142 | ||||
-rw-r--r-- | test/elixir/test/all_docs_test.exs | 30 |
3 files changed, 177 insertions, 9 deletions
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 8cfcfecaa..078009590 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -902,9 +902,10 @@ send_all_docs_keys(Db, #mrargs{} = Args, VAcc0) -> _ -> Args#mrargs.doc_options end, IncludeDocs = Args#mrargs.include_docs, - lists:foldl(fun(DocId, Acc) -> - OpenOpts = [deleted | DocOpts], - Row0 = case fabric2_db:open_doc(Db, DocId, OpenOpts) of + OpenOpts = [deleted | DocOpts], + + CB = fun(DocId, Doc, Acc) -> + Row0 = case Doc of {not_found, missing} -> #view_row{key = DocId}; {ok, #doc{deleted = true, revs = Revs}} -> @@ -938,9 +939,10 @@ send_all_docs_keys(Db, #mrargs{} = Args, VAcc0) -> } end, Row1 = fabric_view:transform_row(Row0), - {ok, NewAcc} = view_cb(Row1, Acc), - NewAcc - end, VAcc1, Keys). + view_cb(Row1, Acc) + end, + {ok, VAcc2} = fabric2_db:fold_docs(Db, Keys, CB, VAcc1, OpenOpts), + VAcc2. apply_args_to_keylist(Args, Keys0) -> diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 15694cdde..740f9abf6 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -110,6 +110,7 @@ fold_docs/3, fold_docs/4, + fold_docs/5, fold_design_docs/4, fold_local_docs/4, fold_changes/4, @@ -969,6 +970,61 @@ fold_docs(Db, UserFun, UserAcc0, Options) -> end). +fold_docs(Db, DocIds, UserFun, UserAcc0, Options) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + try + NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts], + NeedsTree = (Options -- NeedsTreeOpts /= Options), + + FetchRevs = case NeedsTree of + true -> + fun(DocId) -> + fabric2_fdb:get_all_revs_future(TxDb, DocId) + end; + false -> + fun(DocId) -> + fabric2_fdb:get_winning_revs_future(TxDb, DocId, 1) + end + end, + InitAcc = #{ + revs_q => queue:new(), + revs_count => 0, + body_q => queue:new(), + body_count => 0, + doc_opts => Options, + user_acc => UserAcc0, + user_fun => UserFun + }, + + FinalAcc1 = lists:foldl(fun(DocId, Acc) -> + #{ + revs_q := RevsQ, + revs_count := RevsCount + } = Acc, + Future = FetchRevs(DocId), + NewAcc = Acc#{ + revs_q := queue:in({DocId, Future}, RevsQ), + revs_count := RevsCount + 1 + }, + drain_fold_docs_revs_futures(TxDb, NewAcc) + end, InitAcc, DocIds), + + FinalAcc2 = drain_all_fold_docs_revs_futures(TxDb, FinalAcc1), + FinalAcc3 = drain_all_fold_docs_body_futures(TxDb, FinalAcc2), + + #{ + user_acc := FinalUserAcc + } = FinalAcc3, + {ok, FinalUserAcc} + + catch throw:{stop, StopUserAcc} -> + {ok, StopUserAcc} + end + end). + + + + fold_design_docs(Db, UserFun, UserAcc0, Options1) -> Options2 = set_design_doc_keys(Options1), fold_docs(Db, UserFun, UserAcc0, Options2). @@ -1206,6 +1262,92 @@ drain_all_deleted_info_futures(FutureQ, UserFun, Acc) -> end. +drain_fold_docs_revs_futures(_TxDb, #{revs_count := C} = Acc) when C < 100 -> + Acc; +drain_fold_docs_revs_futures(TxDb, Acc) -> + drain_one_fold_docs_revs_future(TxDb, Acc). + + +drain_all_fold_docs_revs_futures(_TxDb, #{revs_count := C} = Acc) when C =< 0 -> + Acc; +drain_all_fold_docs_revs_futures(TxDb, #{revs_count := C} = Acc) when C > 0 -> + NewAcc = drain_one_fold_docs_revs_future(TxDb, Acc), + drain_all_fold_docs_revs_futures(TxDb, NewAcc). + + +drain_one_fold_docs_revs_future(TxDb, Acc) -> + #{ + revs_q := RevsQ, + revs_count := RevsCount, + body_q := BodyQ, + body_count := BodyCount + } = Acc, + {{value, {DocId, RevsFuture}}, RestRevsQ} = queue:out(RevsQ), + + Revs = fabric2_fdb:get_revs_wait(TxDb, RevsFuture), + DocFuture = case Revs of + [] -> + {DocId, [], not_found}; + [_ | _] -> + Winner = get_rev_winner(Revs), + BodyFuture = fabric2_fdb:get_doc_body_future(TxDb, DocId, Winner), + {DocId, Revs, BodyFuture} + end, + NewAcc = Acc#{ + revs_q := RestRevsQ, + revs_count := RevsCount - 1, + body_q := queue:in(DocFuture, BodyQ), + body_count := BodyCount + 1 + }, + drain_fold_docs_body_futures(TxDb, NewAcc). + + +drain_fold_docs_body_futures(_TxDb, #{body_count := C} = Acc) when C < 100 -> + Acc; +drain_fold_docs_body_futures(TxDb, Acc) -> + drain_one_fold_docs_body_future(TxDb, Acc). + + +drain_all_fold_docs_body_futures(_TxDb, #{body_count := C} = Acc) when C =< 0 -> + Acc; +drain_all_fold_docs_body_futures(TxDb, #{body_count := C} = Acc) when C > 0 -> + NewAcc = drain_one_fold_docs_body_future(TxDb, Acc), + drain_all_fold_docs_body_futures(TxDb, NewAcc). + + +drain_one_fold_docs_body_future(TxDb, Acc) -> + #{ + body_q := BodyQ, + body_count := BodyCount, + doc_opts := DocOpts, + user_fun := UserFun, + user_acc := UserAcc + } = Acc, + {{value, {DocId, Revs, BodyFuture}}, RestBodyQ} = queue:out(BodyQ), + Doc = case BodyFuture of + not_found -> + {not_found, missing}; + _ -> + RevInfo = get_rev_winner(Revs), + Base = fabric2_fdb:get_doc_body_wait(TxDb, DocId, RevInfo, + BodyFuture), + apply_open_doc_opts(Base, Revs, DocOpts) + end, + NewUserAcc = maybe_stop(UserFun(DocId, Doc, UserAcc)), + Acc#{ + body_q := RestBodyQ, + body_count := BodyCount - 1, + user_acc := NewUserAcc + }. + + +get_rev_winner(Revs) -> + [Winner] = lists:filter(fun(Rev) -> + maps:get(winner, Rev) + end, Revs), + Winner. + + new_revid(Db, Doc) -> #doc{ id = DocId, diff --git a/test/elixir/test/all_docs_test.exs b/test/elixir/test/all_docs_test.exs index 16641aa95..d41d046b8 100644 --- a/test/elixir/test/all_docs_test.exs +++ b/test/elixir/test/all_docs_test.exs @@ -233,6 +233,26 @@ defmodule AllDocsTest do end @tag :with_db + test "POST with missing keys", context do + db_name = context[:db_name] + + resp = Couch.post("/#{db_name}/_bulk_docs", body: %{docs: create_docs(0..3)}) + assert resp.status_code in [201, 202] + + resp = Couch.post( + "/#{db_name}/_all_docs", + body: %{ + :keys => [1] + } + ) + + assert resp.status_code == 200 + rows = resp.body["rows"] + assert length(rows) == 1 + assert hd(rows) == %{"error" => "not_found", "key" => 1} + end + + @tag :with_db test "POST with keys and limit", context do db_name = context[:db_name] @@ -242,13 +262,17 @@ defmodule AllDocsTest do resp = Couch.post( "/#{db_name}/_all_docs", body: %{ - :keys => [1, 2], - :limit => 1 + :keys => ["1", "2"], + :limit => 1, + :include_docs => true } ) assert resp.status_code == 200 - assert length(Map.get(resp, :body)["rows"]) == 1 + rows = resp.body["rows"] + assert length(rows) == 1 + doc = hd(rows)["doc"] + assert doc["string"] == "1" end @tag :with_db |