diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-06-19 11:58:47 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-07-31 11:55:30 -0500 |
commit | a8e306d5dca1cb647d5dc51b73aa10d611ae291d (patch) | |
tree | 0f47aae1447fe934844587d926fdcdfbefdd7bdf | |
parent | 40561bc83ac171f24cd9adace464112512ec08da (diff) | |
download | couchdb-a8e306d5dca1cb647d5dc51b73aa10d611ae291d.tar.gz |
Implement _all_dbs/_all_docs API parameters
This adds the mapping of CouchDB start/end keys and so on to the similar
yet slightly different concepts in FoundationDB. The handlers for
`_all_dbs` and `_all_docs` have been updated to use this new logic.
-rw-r--r-- | src/chttpd/src/chttpd_changes.erl | 10 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_db.erl | 220 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_misc.erl | 67 | ||||
-rw-r--r-- | src/fabric/src/fabric2_db.erl | 143 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 236 | ||||
-rw-r--r-- | src/fabric/test/fabric2_doc_fold_tests.erl | 84 | ||||
-rw-r--r-- | test/elixir/test/all_docs_test.exs | 3 |
7 files changed, 512 insertions, 251 deletions
diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl index 0e034828e..c9107d16b 100644 --- a/src/chttpd/src/chttpd_changes.erl +++ b/src/chttpd/src/chttpd_changes.erl @@ -871,15 +871,19 @@ changes_row(Results, Change, Acc) -> maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) -> #changes_acc{ db = Db, - doc_options = DocOpts, + doc_options = DocOpts0, conflicts = Conflicts, filter = Filter } = Acc, - Opts = case Conflicts of + OpenOpts = case Conflicts of true -> [deleted, conflicts]; false -> [deleted] end, - load_doc(Db, Value, Opts, DocOpts, Filter); + DocOpts1 = case Conflicts of + true -> [conflicts | DocOpts0]; + false -> DocOpts0 + end, + load_doc(Db, Value, OpenOpts, DocOpts1, Filter); maybe_get_changes_doc(_Value, _Acc) -> []. diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index c0ac1caa3..90869c6dd 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -16,6 +16,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). +-include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -export([handle_request/1, handle_compact_req/2, handle_design_req/2, @@ -825,21 +826,151 @@ multi_all_docs_view(Req, Db, OP, Queries) -> {ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"), chttpd:end_delayed_json_response(Resp1). 
-all_docs_view(Req, Db, _Keys, _OP) -> - % Args0 = couch_mrview_http:parse_params(Req, Keys), - % Args1 = Args0#mrargs{view_type=map}, - % Args2 = fabric_util:validate_all_docs_args(Db, Args1), - % Args3 = set_namespace(OP, Args2), - Options = [{user_ctx, Req#httpd.user_ctx}], +all_docs_view(Req, Db, Keys, OP) -> + Args0 = couch_mrview_http:parse_params(Req, Keys), + Args1 = set_namespace(OP, Args0), Max = chttpd:chunked_response_buffer_size(), - VAcc = #vacc{db=Db, req=Req, threshold=Max}, - {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/2, VAcc, Options), - {ok, Resp#vacc.resp}. + VAcc0 = #vacc{ + db = Db, + req = Req, + threshold = Max + }, + case Args1#mrargs.keys of + undefined -> + Options = [ + {user_ctx, Req#httpd.user_ctx}, + {dir, Args1#mrargs.direction}, + {start_key, Args1#mrargs.start_key}, + {end_key, Args1#mrargs.end_key}, + {limit, Args1#mrargs.limit}, + {skip, Args1#mrargs.skip}, + {update_seq, Args1#mrargs.update_seq} + ], + Acc = {iter, Db, Args1, VAcc0}, + {ok, {iter, _, _, Resp}} = + fabric2_db:fold_docs(Db, fun view_cb/2, Acc, Options), + {ok, Resp#vacc.resp}; + Keys0 when is_list(Keys0) -> + Keys1 = apply_args_to_keylist(Args1, Keys0), + %% namespace can be _set_ to `undefined`, so we + %% want simulate enum here + NS = case couch_util:get_value(namespace, Args1#mrargs.extra) of + <<"_all_docs">> -> <<"_all_docs">>; + <<"_design">> -> <<"_design">>; + <<"_local">> -> <<"_local">>; + _ -> <<"_all_docs">> + end, + TotalRows = fabric2_db:get_doc_count(Db, NS), + Meta = case Args1#mrargs.update_seq of + true -> + UpdateSeq = fabric2_db:get_update_seq(Db), + [{update_seq, UpdateSeq}]; + false -> + [] + end ++ [{total, TotalRows}, {offset, null}], + {ok, VAcc1} = view_cb({meta, Meta}, VAcc0), + DocOpts = case Args1#mrargs.conflicts of + true -> [conflicts | Args1#mrargs.doc_options]; + _ -> Args1#mrargs.doc_options + end ++ [{user_ctx, Req#httpd.user_ctx}], + IncludeDocs = Args1#mrargs.include_docs, + VAcc2 = lists:foldl(fun(DocId, Acc) -> + 
OpenOpts = [deleted | DocOpts], + Row0 = case fabric2_db:open_doc(Db, DocId, OpenOpts) of + {not_found, missing} -> + #view_row{key = DocId}; + {ok, #doc{deleted = true, revs = Revs}} -> + {RevPos, [RevId | _]} = Revs, + Value = {[ + {rev, couch_doc:rev_to_str({RevPos, RevId})}, + {deleted, true} + ]}, + DocValue = if not IncludeDocs -> undefined; true -> + null + end, + #view_row{ + key = DocId, + id = DocId, + value = Value, + doc = DocValue + }; + {ok, #doc{revs = Revs} = Doc0} -> + {RevPos, [RevId | _]} = Revs, + Value = {[ + {rev, couch_doc:rev_to_str({RevPos, RevId})} + ]}, + DocValue = if not IncludeDocs -> undefined; true -> + couch_doc:to_json_obj(Doc0, DocOpts) + end, + #view_row{ + key = DocId, + id = DocId, + value = Value, + doc = DocValue + } + end, + Row1 = fabric_view:transform_row(Row0), + {ok, NewAcc} = view_cb(Row1, Acc), + NewAcc + end, VAcc1, Keys1), + {ok, VAcc3} = view_cb(complete, VAcc2), + {ok, VAcc3#vacc.resp} + end. + + +apply_args_to_keylist(Args, Keys0) -> + Keys1 = case Args#mrargs.direction of + fwd -> Keys0; + _ -> lists:reverse(Keys0) + end, + Keys2 = case Args#mrargs.skip < length(Keys1) of + true -> lists:nthtail(Args#mrargs.skip, Keys1); + false -> [] + end, + case Args#mrargs.limit < length(Keys2) of + true -> lists:sublist(Keys2, Args#mrargs.limit); + false -> Keys2 + end. 
+ + +view_cb({row, Row}, {iter, Db, Args, VAcc}) -> + NewRow = case lists:keymember(doc, 1, Row) of + true -> + chttpd_stats:incr_reads(); + false when Args#mrargs.include_docs -> + {id, DocId} = lists:keyfind(id, 1, Row), + chttpd_stats:incr_reads(), + DocOpts = case Args#mrargs.conflicts of + true -> [conflicts | Args#mrargs.doc_options]; + _ -> Args#mrargs.doc_options + end ++ [{user_ctx, (VAcc#vacc.req)#httpd.user_ctx}], + OpenOpts = [deleted | DocOpts], + DocMember = case fabric2_db:open_doc(Db, DocId, OpenOpts) of + {not_found, missing} -> + []; + {ok, #doc{deleted = true}} -> + [{doc, null}]; + {ok, #doc{} = Doc} -> + [{doc, couch_doc:to_json_obj(Doc, DocOpts)}] + end, + Row ++ DocMember; + _ -> + Row + end, + chttpd_stats:incr_rows(), + {Go, NewVAcc} = couch_mrview_http:view_cb({row, NewRow}, VAcc), + {Go, {iter, Db, Args, NewVAcc}}; + +view_cb(Msg, {iter, Db, Args, VAcc}) -> + {Go, NewVAcc} = couch_mrview_http:view_cb(Msg, VAcc), + {Go, {iter, Db, Args, NewVAcc}}; view_cb({row, Row} = Msg, Acc) -> case lists:keymember(doc, 1, Row) of - true -> chttpd_stats:incr_reads(); - false -> ok + true -> + chttpd_stats:incr_reads(); + false -> + ok end, chttpd_stats:incr_rows(), couch_mrview_http:view_cb(Msg, Acc); @@ -2005,70 +2136,3 @@ bulk_get_json_error(DocId, Rev, Error, Reason) -> {<<"rev">>, Rev}, {<<"error">>, Error}, {<<"reason">>, Reason}]}}]}). - - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - -monitor_attachments_test_() -> - {"ignore stubs", - fun () -> - Atts = [couch_att:new([{data, stub}])], - ?_assertEqual([], monitor_attachments(Atts)) - end - }. - -parse_partitioned_opt_test_() -> - { - foreach, - fun setup/0, - fun teardown/1, - [ - t_should_allow_partitioned_db(), - t_should_throw_on_not_allowed_partitioned_db(), - t_returns_empty_array_for_partitioned_false(), - t_returns_empty_array_for_no_partitioned_qs() - ] - }. - - -setup() -> - ok. - -teardown(_) -> - meck:unload(). 
- -mock_request(Url) -> - Headers = mochiweb_headers:make([{"Host", "examples.com"}]), - MochiReq = mochiweb_request:new(nil, 'PUT', Url, {1, 1}, Headers), - #httpd{mochi_req = MochiReq}. - -t_should_allow_partitioned_db() -> - ?_test(begin - meck:expect(couch_flags, is_enabled, 2, true), - Req = mock_request("/all-test21?partitioned=true"), - [Partitioned, _] = parse_partitioned_opt(Req), - ?assertEqual(Partitioned, {partitioned, true}) - end). - -t_should_throw_on_not_allowed_partitioned_db() -> - ?_test(begin - meck:expect(couch_flags, is_enabled, 2, false), - Req = mock_request("/all-test21?partitioned=true"), - Throw = {bad_request, <<"Partitioned feature is not enabled.">>}, - ?assertThrow(Throw, parse_partitioned_opt(Req)) - end). - -t_returns_empty_array_for_partitioned_false() -> - ?_test(begin - Req = mock_request("/all-test21?partitioned=false"), - ?assertEqual(parse_partitioned_opt(Req), []) - end). - -t_returns_empty_array_for_no_partitioned_qs() -> - ?_test(begin - Req = mock_request("/all-test21"), - ?assertEqual(parse_partitioned_opt(Req), []) - end). - --endif. diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl index b244e84f6..e5f000264 100644 --- a/src/chttpd/src/chttpd_misc.erl +++ b/src/chttpd/src/chttpd_misc.erl @@ -108,39 +108,54 @@ maybe_add_csp_headers(Headers, _) -> Headers. handle_all_dbs_req(#httpd{method='GET'}=Req) -> - % TODO: Support args and options properly, transform - % this back into a fold call similar to the old - % version. 
- %% Args = couch_mrview_http:parse_params(Req, undefined), + #mrargs{ + start_key = StartKey, + end_key = EndKey, + direction = Dir, + limit = Limit, + skip = Skip + } = couch_mrview_http:parse_params(Req, undefined), + + Options = [ + {start_key, StartKey}, + {end_key, EndKey}, + {dir, Dir}, + {limit, Limit}, + {skip, Skip} + ], + % Eventually the Etag for this request will be derived % from the \xFFmetadataVersion key in fdb Etag = <<"foo">>, - %% Options = [{user_ctx, Req#httpd.user_ctx}], + {ok, Resp} = chttpd:etag_respond(Req, Etag, fun() -> - AllDbs = fabric2_db:list_dbs(), - chttpd:send_json(Req, AllDbs) - end); + {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [{"ETag",Etag}]), + Callback = fun all_dbs_callback/2, + Acc = #vacc{req=Req,resp=Resp}, + fabric2_db:list_dbs(Callback, Acc, Options) + end), + case is_record(Resp, vacc) of + true -> {ok, Resp#vacc.resp}; + _ -> {ok, Resp} + end; handle_all_dbs_req(Req) -> send_method_not_allowed(Req, "GET,HEAD"). -%% all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) -> -%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["), -%% {ok, Acc#vacc{resp=Resp1}}; -%% all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) -> -%% Prepend = couch_mrview_http:prepend_val(Acc), -%% case couch_util:get_value(id, Row) of <<"_design", _/binary>> -> -%% {ok, Acc}; -%% DbName -> -%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]), -%% {ok, Acc#vacc{prepend=",", resp=Resp1}} -%% end; -%% all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) -> -%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"), -%% {ok, Resp2} = chttpd:end_delayed_json_response(Resp1), -%% {ok, Acc#vacc{resp=Resp2}}; -%% all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) -> -%% {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason), -%% {ok, Acc#vacc{resp=Resp1}}. 
+all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) -> + {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["), + {ok, Acc#vacc{resp=Resp1}}; +all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) -> + Prepend = couch_mrview_http:prepend_val(Acc), + DbName = couch_util:get_value(id, Row), + {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]), + {ok, Acc#vacc{prepend=",", resp=Resp1}}; +all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) -> + {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"), + {ok, Resp2} = chttpd:end_delayed_json_response(Resp1), + {ok, Acc#vacc{resp=Resp2}}; +all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) -> + {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason), + {ok, Acc#vacc{resp=Resp1}}. handle_dbs_info_req(#httpd{method='POST'}=Req) -> chttpd:validate_ctype(Req, "application/json"), diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl index 80028a645..eb74a183c 100644 --- a/src/fabric/src/fabric2_db.erl +++ b/src/fabric/src/fabric2_db.erl @@ -20,6 +20,7 @@ list_dbs/0, list_dbs/1, + list_dbs/3, is_admin/1, check_is_admin/1, @@ -194,8 +195,30 @@ list_dbs() -> list_dbs(Options) -> + Callback = fun(DbName, Acc) -> [DbName | Acc] end, + DbNames = fabric2_fdb:transactional(fun(Tx) -> + fabric2_fdb:list_dbs(Tx, Callback, [], Options) + end), + lists:reverse(DbNames). + + +list_dbs(UserFun, UserAcc0, Options) -> + FoldFun = fun + (DbName, Acc) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc)) + end, fabric2_fdb:transactional(fun(Tx) -> - fabric2_fdb:list_dbs(Tx, Options) + try + UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)), + UserAcc2 = fabric2_fdb:list_dbs( + Tx, + FoldFun, + UserAcc1, + Options + ), + {ok, maybe_stop(UserFun(complete, UserAcc2))} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end end). 
@@ -406,6 +429,7 @@ open_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, _Options) -> open_doc(#{} = Db, DocId, Options) -> NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts], NeedsTree = (Options -- NeedsTreeOpts /= Options), + OpenDeleted = lists:member(deleted, Options), fabric2_fdb:transactional(Db, fun(TxDb) -> Revs = case NeedsTree of true -> fabric2_fdb:get_all_revs(TxDb, DocId); @@ -414,6 +438,8 @@ open_doc(#{} = Db, DocId, Options) -> if Revs == [] -> {not_found, missing}; true -> #{winner := true} = RI = lists:last(Revs), case fabric2_fdb:get_doc_body(TxDb, DocId, RI) of + #doc{deleted = true} when not OpenDeleted -> + {not_found, deleted}; #doc{} = Doc -> apply_open_doc_opts(Doc, Revs, Options); Else -> @@ -451,8 +477,10 @@ open_doc_revs(Db, DocId, Revs, Options) -> rev_path => RevPath }, case fabric2_fdb:get_doc_body(TxDb, DocId, RevInfo) of - #doc{} = Doc -> {ok, Doc}; - Else -> {Else, {Pos, Rev}} + #doc{} = Doc -> + apply_open_doc_opts(Doc, AllRevInfos, Options); + Else -> + {Else, {Pos, Rev}} end end end, Found), @@ -615,9 +643,40 @@ fold_docs(Db, UserFun, UserAcc) -> fold_docs(Db, UserFun, UserAcc, []). 
-fold_docs(Db, UserFun, UserAcc, Options) -> +fold_docs(Db, UserFun, UserAcc0, Options) -> fabric2_fdb:transactional(Db, fun(TxDb) -> - fabric2_fdb:fold_docs(TxDb, UserFun, UserAcc, Options) + try + #{ + db_prefix := DbPrefix + } = TxDb, + + Prefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix), + DocCount = get_doc_count(TxDb), + + Meta = case lists:keyfind(update_seq, 1, Options) of + {_, true} -> + UpdateSeq = fabric2_db:get_update_seq(TxDb), + [{update_seq, UpdateSeq}]; + _ -> + [] + end ++ [{total, DocCount}, {offset, null}], + + UserAcc1 = maybe_stop(UserFun({meta, Meta}, UserAcc0)), + + UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) -> + {DocId} = erlfdb_tuple:unpack(K, Prefix), + RevId = erlfdb_tuple:unpack(V), + maybe_stop(UserFun({row, [ + {id, DocId}, + {key, DocId}, + {value, {[{rev, couch_doc:rev_to_str(RevId)}]}} + ]}, Acc)) + end, UserAcc1, Options), + + {ok, maybe_stop(UserFun(complete, UserAcc2))} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end end). 
@@ -627,7 +686,44 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) -> fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) -> fabric2_fdb:transactional(Db, fun(TxDb) -> - fabric2_fdb:fold_changes(TxDb, SinceSeq, UserFun, UserAcc, Options) + try + #{ + db_prefix := DbPrefix + } = TxDb, + + Prefix = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix), + + Dir = case fabric2_util:get_value(dir, Options, fwd) of + rev -> rev; + _ -> fwd + end, + + StartKey = get_since_seq(TxDb, Dir, SinceSeq), + EndKey = case Dir of + rev -> fabric2_util:seq_zero_vs(); + _ -> fabric2_util:seq_max_vs() + end, + FoldOpts = [ + {start_key, StartKey}, + {end_key, EndKey} + ] ++ Options, + + {ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) -> + {SeqVS} = erlfdb_tuple:unpack(K, Prefix), + {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V), + + Change = #{ + id => DocId, + sequence => fabric2_fdb:vs_to_seq(SeqVS), + rev_id => RevId, + deleted => Deleted + }, + + maybe_stop(UserFun(Change, Acc)) + end, UserAcc, FoldOpts)} + catch throw:{stop, FinalUserAcc} -> + {ok, FinalUserAcc} + end end). @@ -796,7 +892,6 @@ apply_open_doc_opts(Doc, Revs, Options) -> IncludeConflicts = lists:member(conflicts, Options), IncludeDelConflicts = lists:member(deleted_conflicts, Options), IncludeLocalSeq = lists:member(local_seq, Options), - ReturnDeleted = lists:member(deleted, Options), % This revs_info becomes fairly useless now that we're % not keeping old document bodies around... @@ -827,14 +922,7 @@ apply_open_doc_opts(Doc, Revs, Options) -> [{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}] end, - case Doc#doc.deleted and not ReturnDeleted of - true -> - {not_found, deleted}; - false -> - {ok, Doc#doc{ - meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4 - }} - end. + {ok, Doc#doc{meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4}}. filter_found_revs(RevInfo, Revs) -> @@ -1289,6 +1377,26 @@ check_duplicate_attachments(#doc{atts = Atts}) -> end, ordsets:new(), Atts). 
+get_since_seq(Db, rev, <<>>) -> + get_since_seq(Db, rev, now); + +get_since_seq(_Db, _Dir, Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0-> + fabric2_util:seq_zero_vs(); + +get_since_seq(Db, Dir, Seq) when Seq == now; Seq == <<"now">> -> + CurrSeq = fabric2_fdb:get_last_change(Db), + get_since_seq(Db, Dir, CurrSeq); + +get_since_seq(_Db, _Dir, Seq) when is_binary(Seq), size(Seq) == 24 -> + fabric2_fdb:next_vs(fabric2_fdb:seq_to_vs(Seq)); + +get_since_seq(Db, Dir, List) when is_list(List) -> + get_since_seq(Db, Dir, list_to_binary(List)); + +get_since_seq(_Db, _Dir, Seq) -> + erlang:error({invalid_since_seq, Seq}). + + get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) -> LeafPath; get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) -> @@ -1353,3 +1461,8 @@ rev(Rev) when is_list(Rev); is_binary(Rev) -> rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) -> Rev. + +maybe_stop({ok, Acc}) -> + Acc; +maybe_stop({stop, Acc}) -> + throw({stop, Acc}). diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 4b0182646..670ce8b49 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -24,7 +24,7 @@ delete/1, exists/1, - list_dbs/2, + list_dbs/4, get_info/1, get_config/1, @@ -50,11 +50,13 @@ read_attachment/3, write_attachment/3, - fold_docs/4, - fold_changes/5, get_last_change/1, + fold_range/5, + vs_to_seq/1, + seq_to_vs/1, + next_vs/1, debug_cluster/0, debug_cluster/2 @@ -254,16 +256,15 @@ exists(#{name := DbName} = Db) when is_binary(DbName) -> end. 
-list_dbs(Tx, _Options) -> +list_dbs(Tx, Callback, AccIn, Options) -> Root = erlfdb_directory:root(), CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), LayerPrefix = erlfdb_directory:get_name(CouchDB), - {Start, End} = erlfdb_tuple:range({?ALL_DBS}, LayerPrefix), - Future = erlfdb:get_range(Tx, Start, End), - lists:map(fun({K, _V}) -> - {?ALL_DBS, DbName} = erlfdb_tuple:unpack(K, LayerPrefix), - DbName - end, erlfdb:wait(Future)). + Prefix = erlfdb_tuple:pack({?ALL_DBS}, LayerPrefix), + fold_range({tx, Tx}, Prefix, fun({K, _V}, Acc) -> + {DbName} = erlfdb_tuple:unpack(K, Prefix), + Callback(DbName, Acc) + end, AccIn, Options). get_info(#{} = Db) -> @@ -508,24 +509,26 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) -> UpdateStatus = case {OldWinner, NewWinner} of {not_found, #{deleted := false}} -> created; + {not_found, #{deleted := true}} -> + deleted; {#{deleted := true}, #{deleted := false}} -> recreated; {#{deleted := false}, #{deleted := false}} -> updated; {#{deleted := false}, #{deleted := true}} -> + deleted; + {#{deleted := true}, #{deleted := true}} -> deleted end, case UpdateStatus of - Status when Status == created orelse Status == recreated -> - ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix), - ADVal = erlfdb_tuple:pack(NewRevId), - ok = erlfdb:set(Tx, ADKey, ADVal); deleted -> ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix), ok = erlfdb:clear(Tx, ADKey); - updated -> - ok + _ -> + ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix), + ADVal = erlfdb_tuple:pack(NewRevId), + ok = erlfdb:set(Tx, ADKey, ADVal) end, % _changes @@ -640,84 +643,6 @@ write_attachment(#{} = Db, DocId, Data) when is_binary(Data) -> {ok, AttId}. 
-fold_docs(#{} = Db, UserFun, UserAcc0, Options) -> - #{ - tx := Tx, - db_prefix := DbPrefix - } = ensure_current(Db), - - {Reverse, Start, End} = get_dir_and_bounds(DbPrefix, Options), - - DocCountKey = erlfdb_tuple:pack({?DB_STATS, <<"doc_count">>}, DbPrefix), - DocCountBin = erlfdb:wait(erlfdb:get(Tx, DocCountKey)), - - try - UserAcc1 = maybe_stop(UserFun({meta, [ - {total, ?bin2uint(DocCountBin)}, - {offset, null} - ]}, UserAcc0)), - - UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) -> - {?DB_ALL_DOCS, DocId} = erlfdb_tuple:unpack(K, DbPrefix), - RevId = erlfdb_tuple:unpack(V), - maybe_stop(UserFun({row, [ - {id, DocId}, - {key, DocId}, - {value, couch_doc:rev_to_str(RevId)} - ]}, UserAccIn)) - end, UserAcc1, [{reverse, Reverse}] ++ Options), - - {ok, maybe_stop(UserFun(complete, UserAcc2))} - catch throw:{stop, FinalUserAcc} -> - {ok, FinalUserAcc} - end. - - -fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) -> - #{ - tx := Tx, - db_prefix := DbPrefix - } = ensure_current(Db), - - SinceSeq1 = get_since_seq(SinceSeq0), - - Reverse = case fabric2_util:get_value(dir, Options, fwd) of - fwd -> false; - rev -> true - end, - - {Start0, End0} = case Reverse of - false -> {SinceSeq1, fabric2_util:seq_max_vs()}; - true -> {fabric2_util:seq_zero_vs(), SinceSeq1} - end, - - Start1 = erlfdb_tuple:pack({?DB_CHANGES, Start0}, DbPrefix), - End1 = erlfdb_tuple:pack({?DB_CHANGES, End0}, DbPrefix), - - {Start, End} = case Reverse of - false -> {erlfdb_key:first_greater_than(Start1), End1}; - true -> {Start1, erlfdb_key:first_greater_than(End1)} - end, - - try - {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) -> - {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix), - {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V), - - Change = #{ - id => DocId, - sequence => vs_to_seq(SeqVS), - rev_id => RevId, - deleted => Deleted - }, - - maybe_stop(UserFun(Change, UserAccIn)) - end, UserAcc0, [{reverse, Reverse}] ++ Options)} - catch 
throw:{stop, FinalUserAcc} -> - {ok, FinalUserAcc} - end. - - get_last_change(#{} = Db) -> #{ tx := Tx, @@ -735,17 +660,57 @@ get_last_change(#{} = Db) -> end. -maybe_stop({ok, Acc}) -> - Acc; -maybe_stop({stop, Acc}) -> - throw({stop, Acc}). +fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) -> + #{ + tx := Tx + } = ensure_current(Db), + fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options); + +fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) -> + case fabric2_util:get_value(limit, Options) of + 0 -> + % FoundationDB treats a limit of 0 as unlimited + % so we have to guard for that here. + UserAcc; + _ -> + {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options), + Callback = fun fold_range_cb/2, + Acc = {skip, Skip, UserCallback, UserAcc}, + {skip, _, UserCallback, OutAcc} = + erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts), + OutAcc + end. -vs_to_seq(VS) -> +vs_to_seq(VS) when is_tuple(VS) -> + % 51 is the versionstamp type tag <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}), fabric2_util:to_hex(SeqBin). +seq_to_vs(Seq) when is_binary(Seq) -> + Seq1 = fabric2_util:from_hex(Seq), + % 51 is the versionstamp type tag + Seq2 = <<51:8, Seq1/binary>>, + {VS} = erlfdb_tuple:unpack(Seq2), + VS. + + +next_vs({versionstamp, VS, Batch, TxId}) -> + {V, B, T} = case TxId =< 65535 of + true -> + {VS, Batch, TxId + 1}; + false -> + case Batch =< 65535 of + true -> + {VS, Batch + 1, 0}; + false -> + {VS + 1, 0, 0} + end + end, + {versionstamp, V, B, T}. + + debug_cluster() -> debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>). 
@@ -753,7 +718,7 @@ debug_cluster() -> debug_cluster(Start, End) -> transactional(fun(Tx) -> lists:foreach(fun({Key, Val}) -> - io:format("~s => ~s~n", [ + io:format(standard_error, "~s => ~s~n", [ string:pad(erlfdb_util:repr(Key), 60), erlfdb_util:repr(Val) ]) @@ -790,7 +755,7 @@ load_validate_doc_funs(#{} = Db) -> {end_key, <<"_design0">>} ], - {ok, Infos1} = fold_docs(Db, FoldFun, [], Options), + {ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options), Infos2 = lists:map(fun(Info) -> #{ @@ -999,11 +964,12 @@ chunkify_attachment(Data) -> end. -get_dir_and_bounds(DbPrefix, Options) -> - Reverse = case fabric2_util:get_value(dir, Options, fwd) of - fwd -> false; - rev -> true +get_fold_opts(RangePrefix, Options) -> + Reverse = case fabric2_util:get_value(dir, Options) of + rev -> true; + _ -> false end, + StartKey0 = fabric2_util:get_value(start_key, Options), EndKeyGt = fabric2_util:get_value(end_key_gt, Options), EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt), @@ -1019,17 +985,17 @@ get_dir_and_bounds(DbPrefix, Options) -> % Set the maximum bounds for the start and endkey StartKey2 = case StartKey1 of - undefined -> {?DB_ALL_DOCS}; - SK2 when is_binary(SK2) -> {?DB_ALL_DOCS, SK2} + undefined -> <<>>; + SK2 -> SK2 end, EndKey2 = case EndKey1 of - undefined -> {?DB_ALL_DOCS, <<16#FF>>}; - EK2 when is_binary(EK2) -> {?DB_ALL_DOCS, EK2} + undefined -> <<255>>; + EK2 -> EK2 end, - StartKey3 = erlfdb_tuple:pack(StartKey2, DbPrefix), - EndKey3 = erlfdb_tuple:pack(EndKey2, DbPrefix), + StartKey3 = erlfdb_tuple:pack({StartKey2}, RangePrefix), + EndKey3 = erlfdb_tuple:pack({EndKey2}, RangePrefix), % FoundationDB ranges are applied as SK <= key < EK % By default, CouchDB is SK <= key <= EK with the @@ -1056,26 +1022,46 @@ get_dir_and_bounds(DbPrefix, Options) -> EndKey3 end, - {Reverse, StartKey4, EndKey4}. 
+ Skip = case fabric2_util:get_value(skip, Options) of + S when is_integer(S), S >= 0 -> S; + _ -> 0 + end, + Limit = case fabric2_util:get_value(limit, Options) of + L when is_integer(L), L >= 0 -> [{limit, L + Skip}]; + undefined -> [] + end, -get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0-> - fabric2_util:seq_zero_vs(); + TargetBytes = case fabric2_util:get_value(target_bytes, Options) of + T when is_integer(T), T >= 0 -> [{target_bytes, T}]; + undefined -> [] + end, -get_since_seq(Seq) when Seq == now; Seq == <<"now">> -> - fabric2_util:seq_max_vs(); + StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of + undefined -> []; + Name when is_atom(Name) -> [{streaming_mode, Name}] + end, + + Snapshot = case fabric2_util:get_value(snapshot, Options) of + undefined -> []; + B when is_boolean(B) -> [{snapshot, B}] + end, + + OutOpts = [{reverse, Reverse}] + ++ Limit + ++ TargetBytes + ++ StreamingMode + ++ Snapshot, + + {StartKey4, EndKey4, Skip, OutOpts}. -get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 -> - Seq1 = fabric2_util:from_hex(Seq), - Seq2 = <<51:8, Seq1/binary>>, - {SeqVS} = erlfdb_tuple:unpack(Seq2), - SeqVS; -get_since_seq(List) when is_list(List) -> - get_since_seq(list_to_binary(List)); +fold_range_cb(KV, {skip, 0, Callback, Acc}) -> + NewAcc = Callback(KV, Acc), + {skip, 0, Callback, NewAcc}; -get_since_seq(Seq) -> - erlang:error({invalid_since_seq, Seq}). +fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 -> + {skip, N - 1, Callback, Acc}. 
get_db_handle() -> diff --git a/src/fabric/test/fabric2_doc_fold_tests.erl b/src/fabric/test/fabric2_doc_fold_tests.erl index caa5f925a..ee0180f14 100644 --- a/src/fabric/test/fabric2_doc_fold_tests.erl +++ b/src/fabric/test/fabric2_doc_fold_tests.erl @@ -34,7 +34,10 @@ doc_fold_test_() -> fun fold_docs_with_start_key/1, fun fold_docs_with_end_key/1, fun fold_docs_with_both_keys_the_same/1, - fun fold_docs_with_different_keys/1 + fun fold_docs_with_different_keys/1, + fun fold_docs_with_limit/1, + fun fold_docs_with_skip/1, + fun fold_docs_with_skip_and_limit/1 ]} } }. @@ -50,7 +53,7 @@ setup() -> body = {[{<<"value">>, Val}]} }, {ok, Rev} = fabric2_db:update_doc(Db, Doc, []), - {DocId, couch_doc:rev_to_str(Rev)} + {DocId, {[{rev, couch_doc:rev_to_str(Rev)}]}} end, lists:seq(1, ?DOC_COUNT)), {Db, lists:sort(DocIdRevs), Ctx}. @@ -108,11 +111,58 @@ fold_docs_with_different_keys({Db, DocIdRevs, _}) -> end, lists:seq(1, 500)). +fold_docs_with_limit({Db, DocIdRevs, _}) -> + lists:foreach(fun(Limit) -> + Opts1 = [{limit, Limit}], + {ok, {?DOC_COUNT, Rows1}} = + fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts1), + ?assertEqual(lists:sublist(DocIdRevs, Limit), lists:reverse(Rows1)), + + Opts2 = [{dir, rev} | Opts1], + {ok, {?DOC_COUNT, Rows2}} = + fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts2), + ?assertEqual( + lists:sublist(lists:reverse(DocIdRevs), Limit), + lists:reverse(Rows2) + ) + end, lists:seq(0, 51)). 
+ + +fold_docs_with_skip({Db, DocIdRevs, _}) -> + lists:foreach(fun(Skip) -> + Opts1 = [{skip, Skip}], + {ok, {?DOC_COUNT, Rows1}} = + fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts1), + Expect1 = case Skip > length(DocIdRevs) of + true -> []; + false -> lists:nthtail(Skip, DocIdRevs) + end, + ?assertEqual(Expect1, lists:reverse(Rows1)), + + Opts2 = [{dir, rev} | Opts1], + {ok, {?DOC_COUNT, Rows2}} = + fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts2), + Expect2 = case Skip > length(DocIdRevs) of + true -> []; + false -> lists:nthtail(Skip, lists:reverse(DocIdRevs)) + end, + ?assertEqual(Expect2, lists:reverse(Rows2)) + end, lists:seq(0, 51)). + + +fold_docs_with_skip_and_limit({Db, DocIdRevs, _}) -> + lists:foreach(fun(_) -> + check_skip_and_limit(Db, [], DocIdRevs), + check_skip_and_limit(Db, [{dir, rev}], lists:reverse(DocIdRevs)) + end, lists:seq(1, 100)). + + check_all_combos(Db, StartKey, EndKey, Rows) -> Opts1 = make_opts(fwd, StartKey, EndKey, true), {ok, {?DOC_COUNT, Rows1}} = fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts1), ?assertEqual(lists:reverse(Rows), Rows1), + check_skip_and_limit(Db, Opts1, Rows), Opts2 = make_opts(fwd, StartKey, EndKey, false), {ok, {?DOC_COUNT, Rows2}} = @@ -121,11 +171,13 @@ check_all_combos(Db, StartKey, EndKey, Rows) -> lists:reverse(all_but_last(Rows)) end, ?assertEqual(Expect2, Rows2), + check_skip_and_limit(Db, Opts2, lists:reverse(Expect2)), Opts3 = make_opts(rev, StartKey, EndKey, true), {ok, {?DOC_COUNT, Rows3}} = fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts3), ?assertEqual(Rows, Rows3), + check_skip_and_limit(Db, Opts3, lists:reverse(Rows)), Opts4 = make_opts(rev, StartKey, EndKey, false), {ok, {?DOC_COUNT, Rows4}} = @@ -133,8 +185,34 @@ check_all_combos(Db, StartKey, EndKey, Rows) -> Expect4 = if StartKey == undefined -> Rows; true -> tl(Rows) end, - ?assertEqual(Expect4, Rows4). + ?assertEqual(Expect4, Rows4), + check_skip_and_limit(Db, Opts4, lists:reverse(Expect4)). 
+ + +check_skip_and_limit(Db, Opts, []) -> + Skip = rand:uniform(?DOC_COUNT + 1) - 1, + Limit = rand:uniform(?DOC_COUNT + 1) - 1, + NewOpts = [{skip, Skip}, {limit, Limit} | Opts], + {ok, {?DOC_COUNT, OutRows}} = + fabric2_db:fold_docs(Db, fun fold_fun/2, [], NewOpts), + ?assertEqual([], OutRows); + +check_skip_and_limit(Db, Opts, Rows) -> + Skip = rand:uniform(length(Rows) + 1) - 1, + Limit = rand:uniform(?DOC_COUNT + 1 - Skip) - 1, + + ExpectRows = case Skip >= length(Rows) of + true -> + []; + false -> + lists:sublist(lists:nthtail(Skip, Rows), Limit) + end, + SkipLimitOpts = [{skip, Skip}, {limit, Limit} | Opts], + {ok, {?DOC_COUNT, RevRows}} = + fabric2_db:fold_docs(Db, fun fold_fun/2, [], SkipLimitOpts), + OutRows = lists:reverse(RevRows), + ?assertEqual(ExpectRows, OutRows). make_opts(fwd, StartKey, EndKey, InclusiveEnd) -> diff --git a/test/elixir/test/all_docs_test.exs b/test/elixir/test/all_docs_test.exs index 9f6aeb61d..dab153a96 100644 --- a/test/elixir/test/all_docs_test.exs +++ b/test/elixir/test/all_docs_test.exs @@ -43,7 +43,8 @@ defmodule AllDocsTest do # Check _all_docs offset retry_until(fn -> resp = Couch.get("/#{db_name}/_all_docs", query: %{:startkey => "\"2\""}).body - assert resp["offset"] == 2 + assert resp["offset"] == :null + assert Enum.at(resp["rows"], 0)["key"] == "2" end) # Confirm that queries may assume raw collation |