summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2019-06-19 11:58:47 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-07-11 14:55:51 -0500
commiteb8d6666beb1798c7885c05b4f308daaf8889296 (patch)
treecd2dfccfba3b47651b2f5a05afc804a2e421e98b
parent3a14e2918efbfd2b1286995a9f39c71cf6ca88f8 (diff)
downloadcouchdb-prototype/fdb-all-dbs-all-docs-qs-params.tar.gz
Implement _all_dbs/_all_docs API parametersprototype/fdb-all-dbs-all-docs-qs-params
This adds the mapping of CouchDB start/end keys and so on to the similar yet slightly different concepts in FoundationDB. The handlers for `_all_dbs` and `_all_docs` have been udpated to use this new logic.
-rw-r--r--src/chttpd/src/chttpd_changes.erl10
-rw-r--r--src/chttpd/src/chttpd_db.erl218
-rw-r--r--src/chttpd/src/chttpd_misc.erl67
-rw-r--r--src/fabric/src/fabric2_db.erl138
-rw-r--r--src/fabric/src/fabric2_fdb.erl236
-rw-r--r--src/fabric/test/fabric2_doc_fold_tests.erl84
-rw-r--r--test/elixir/test/all_docs_test.exs3
7 files changed, 505 insertions, 251 deletions
diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
index 0e034828e..c9107d16b 100644
--- a/src/chttpd/src/chttpd_changes.erl
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -871,15 +871,19 @@ changes_row(Results, Change, Acc) ->
maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
#changes_acc{
db = Db,
- doc_options = DocOpts,
+ doc_options = DocOpts0,
conflicts = Conflicts,
filter = Filter
} = Acc,
- Opts = case Conflicts of
+ OpenOpts = case Conflicts of
true -> [deleted, conflicts];
false -> [deleted]
end,
- load_doc(Db, Value, Opts, DocOpts, Filter);
+ DocOpts1 = case Conflicts of
+ true -> [conflicts | DocOpts0];
+ false -> DocOpts0
+ end,
+ load_doc(Db, Value, OpenOpts, DocOpts1, Filter);
maybe_get_changes_doc(_Value, _Acc) ->
[].
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index c0ac1caa3..0266561aa 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -16,6 +16,7 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-export([handle_request/1, handle_compact_req/2, handle_design_req/2,
@@ -825,21 +826,149 @@ multi_all_docs_view(Req, Db, OP, Queries) ->
{ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"),
chttpd:end_delayed_json_response(Resp1).
-all_docs_view(Req, Db, _Keys, _OP) ->
- % Args0 = couch_mrview_http:parse_params(Req, Keys),
- % Args1 = Args0#mrargs{view_type=map},
- % Args2 = fabric_util:validate_all_docs_args(Db, Args1),
- % Args3 = set_namespace(OP, Args2),
- Options = [{user_ctx, Req#httpd.user_ctx}],
+all_docs_view(Req, Db, Keys, OP) ->
+ Args0 = couch_mrview_http:parse_params(Req, Keys),
+ Args1 = set_namespace(OP, Args0),
Max = chttpd:chunked_response_buffer_size(),
- VAcc = #vacc{db=Db, req=Req, threshold=Max},
- {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/2, VAcc, Options),
- {ok, Resp#vacc.resp}.
+ VAcc0 = #vacc{
+ db = Db,
+ req = Req,
+ threshold = Max
+ },
+ case Args1#mrargs.keys of
+ undefined ->
+ Options = [
+ {user_ctx, Req#httpd.user_ctx},
+ {dir, Args1#mrargs.direction},
+ {start_key, Args1#mrargs.start_key},
+ {end_key, Args1#mrargs.end_key},
+ {limit, Args1#mrargs.limit},
+ {skip, Args1#mrargs.skip}
+ ],
+ Acc = {iter, Db, Args1, VAcc0},
+ {ok, {iter, _, _, Resp}} =
+ fabric2_db:fold_docs(Db, fun view_cb/2, Acc, Options),
+ {ok, Resp#vacc.resp};
+ Keys0 when is_list(Keys0) ->
+ Keys1 = apply_args_to_keylist(Args1, Keys0),
+ %% namespace can be _set_ to `undefined`, so we
+ %% want simulate enum here
+ NS = case couch_util:get_value(namespace, Args1#mrargs.extra) of
+ <<"_all_docs">> -> <<"_all_docs">>;
+ <<"_design">> -> <<"_design">>;
+ <<"_local">> -> <<"_local">>;
+ _ -> <<"_all_docs">>
+ end,
+ TotalRows = fabric2_db:get_doc_count(Db, NS),
+ Meta = case Args1#mrargs.update_seq of
+ false ->
+ [{total, TotalRows}, {offset, null}];
+ true ->
+ [{total, TotalRows}, {offset, null}, {update_seq, null}]
+ end,
+ {ok, VAcc1} = view_cb({meta, Meta}, VAcc0),
+ DocOpts = case Args1#mrargs.conflicts of
+ true -> [conflicts | Args1#mrargs.doc_options];
+ _ -> Args1#mrargs.doc_options
+ end ++ [{user_ctx, Req#httpd.user_ctx}],
+ IncludeDocs = Args1#mrargs.include_docs,
+ VAcc2 = lists:foldl(fun(DocId, Acc) ->
+ OpenOpts = [deleted | DocOpts],
+ Row0 = case fabric2_db:open_doc(Db, DocId, OpenOpts) of
+ {not_found, missing} ->
+ #view_row{key = DocId};
+ {ok, #doc{deleted = true, revs = Revs}} ->
+ {RevPos, [RevId | _]} = Revs,
+ Value = {[
+ {rev, couch_doc:rev_to_str({RevPos, RevId})},
+ {deleted, true}
+ ]},
+ DocValue = if not IncludeDocs -> undefined; true ->
+ null
+ end,
+ #view_row{
+ key = DocId,
+ id = DocId,
+ value = Value,
+ doc = DocValue
+ };
+ {ok, #doc{revs = Revs} = Doc0} ->
+ {RevPos, [RevId | _]} = Revs,
+ Value = {[
+ {rev, couch_doc:rev_to_str({RevPos, RevId})}
+ ]},
+ DocValue = if not IncludeDocs -> undefined; true ->
+ couch_doc:to_json_obj(Doc0, DocOpts)
+ end,
+ #view_row{
+ key = DocId,
+ id = DocId,
+ value = Value,
+ doc = DocValue
+ }
+ end,
+ Row1 = fabric_view:transform_row(Row0),
+ {ok, NewAcc} = view_cb(Row1, Acc),
+ NewAcc
+ end, VAcc1, Keys1),
+ {ok, VAcc3} = view_cb(complete, VAcc2),
+ {ok, VAcc3#vacc.resp}
+ end.
+
+
+apply_args_to_keylist(Args, Keys0) ->
+ Keys1 = case Args#mrargs.direction of
+ fwd -> Keys0;
+ _ -> lists:reverse(Keys0)
+ end,
+ Keys2 = case Args#mrargs.skip < length(Keys1) of
+ true -> lists:nthtail(Args#mrargs.skip, Keys1);
+ false -> []
+ end,
+ case Args#mrargs.limit < length(Keys2) of
+ true -> lists:sublist(Keys2, Args#mrargs.limit);
+ false -> Keys2
+ end.
+
+
+view_cb({row, Row}, {iter, Db, Args, VAcc}) ->
+ NewRow = case lists:keymember(doc, 1, Row) of
+ true ->
+ chttpd_stats:incr_reads();
+ false when Args#mrargs.include_docs ->
+ {id, DocId} = lists:keyfind(id, 1, Row),
+ chttpd_stats:incr_reads(),
+ DocOpts = case Args#mrargs.conflicts of
+ true -> [conflicts | Args#mrargs.doc_options];
+ _ -> Args#mrargs.doc_options
+ end ++ [{user_ctx, (VAcc#vacc.req)#httpd.user_ctx}],
+ OpenOpts = [deleted | DocOpts],
+ DocMember = case fabric2_db:open_doc(Db, DocId, OpenOpts) of
+ {not_found, missing} ->
+ [];
+ {ok, #doc{deleted = true}} ->
+ [{doc, null}];
+ {ok, #doc{} = Doc} ->
+ [{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
+ end,
+ Row ++ DocMember;
+ _ ->
+ Row
+ end,
+ chttpd_stats:incr_rows(),
+ {Go, NewVAcc} = couch_mrview_http:view_cb({row, NewRow}, VAcc),
+ {Go, {iter, Db, Args, NewVAcc}};
+
+view_cb(Msg, {iter, Db, Args, VAcc}) ->
+ {Go, NewVAcc} = couch_mrview_http:view_cb(Msg, VAcc),
+ {Go, {iter, Db, Args, NewVAcc}};
view_cb({row, Row} = Msg, Acc) ->
case lists:keymember(doc, 1, Row) of
- true -> chttpd_stats:incr_reads();
- false -> ok
+ true ->
+ chttpd_stats:incr_reads();
+ false ->
+ ok
end,
chttpd_stats:incr_rows(),
couch_mrview_http:view_cb(Msg, Acc);
@@ -2005,70 +2134,3 @@ bulk_get_json_error(DocId, Rev, Error, Reason) ->
{<<"rev">>, Rev},
{<<"error">>, Error},
{<<"reason">>, Reason}]}}]}).
-
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-monitor_attachments_test_() ->
- {"ignore stubs",
- fun () ->
- Atts = [couch_att:new([{data, stub}])],
- ?_assertEqual([], monitor_attachments(Atts))
- end
- }.
-
-parse_partitioned_opt_test_() ->
- {
- foreach,
- fun setup/0,
- fun teardown/1,
- [
- t_should_allow_partitioned_db(),
- t_should_throw_on_not_allowed_partitioned_db(),
- t_returns_empty_array_for_partitioned_false(),
- t_returns_empty_array_for_no_partitioned_qs()
- ]
- }.
-
-
-setup() ->
- ok.
-
-teardown(_) ->
- meck:unload().
-
-mock_request(Url) ->
- Headers = mochiweb_headers:make([{"Host", "examples.com"}]),
- MochiReq = mochiweb_request:new(nil, 'PUT', Url, {1, 1}, Headers),
- #httpd{mochi_req = MochiReq}.
-
-t_should_allow_partitioned_db() ->
- ?_test(begin
- meck:expect(couch_flags, is_enabled, 2, true),
- Req = mock_request("/all-test21?partitioned=true"),
- [Partitioned, _] = parse_partitioned_opt(Req),
- ?assertEqual(Partitioned, {partitioned, true})
- end).
-
-t_should_throw_on_not_allowed_partitioned_db() ->
- ?_test(begin
- meck:expect(couch_flags, is_enabled, 2, false),
- Req = mock_request("/all-test21?partitioned=true"),
- Throw = {bad_request, <<"Partitioned feature is not enabled.">>},
- ?assertThrow(Throw, parse_partitioned_opt(Req))
- end).
-
-t_returns_empty_array_for_partitioned_false() ->
- ?_test(begin
- Req = mock_request("/all-test21?partitioned=false"),
- ?assertEqual(parse_partitioned_opt(Req), [])
- end).
-
-t_returns_empty_array_for_no_partitioned_qs() ->
- ?_test(begin
- Req = mock_request("/all-test21"),
- ?assertEqual(parse_partitioned_opt(Req), [])
- end).
-
--endif.
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl
index b244e84f6..e5f000264 100644
--- a/src/chttpd/src/chttpd_misc.erl
+++ b/src/chttpd/src/chttpd_misc.erl
@@ -108,39 +108,54 @@ maybe_add_csp_headers(Headers, _) ->
Headers.
handle_all_dbs_req(#httpd{method='GET'}=Req) ->
- % TODO: Support args and options properly, transform
- % this back into a fold call similar to the old
- % version.
- %% Args = couch_mrview_http:parse_params(Req, undefined),
+ #mrargs{
+ start_key = StartKey,
+ end_key = EndKey,
+ direction = Dir,
+ limit = Limit,
+ skip = Skip
+ } = couch_mrview_http:parse_params(Req, undefined),
+
+ Options = [
+ {start_key, StartKey},
+ {end_key, EndKey},
+ {dir, Dir},
+ {limit, Limit},
+ {skip, Skip}
+ ],
+
% Eventually the Etag for this request will be derived
% from the \xFFmetadataVersion key in fdb
Etag = <<"foo">>,
- %% Options = [{user_ctx, Req#httpd.user_ctx}],
+
{ok, Resp} = chttpd:etag_respond(Req, Etag, fun() ->
- AllDbs = fabric2_db:list_dbs(),
- chttpd:send_json(Req, AllDbs)
- end);
+ {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [{"ETag",Etag}]),
+ Callback = fun all_dbs_callback/2,
+ Acc = #vacc{req=Req,resp=Resp},
+ fabric2_db:list_dbs(Callback, Acc, Options)
+ end),
+ case is_record(Resp, vacc) of
+ true -> {ok, Resp#vacc.resp};
+ _ -> {ok, Resp}
+ end;
handle_all_dbs_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
-%% all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) ->
-%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["),
-%% {ok, Acc#vacc{resp=Resp1}};
-%% all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) ->
-%% Prepend = couch_mrview_http:prepend_val(Acc),
-%% case couch_util:get_value(id, Row) of <<"_design", _/binary>> ->
-%% {ok, Acc};
-%% DbName ->
-%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]),
-%% {ok, Acc#vacc{prepend=",", resp=Resp1}}
-%% end;
-%% all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) ->
-%% {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"),
-%% {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
-%% {ok, Acc#vacc{resp=Resp2}};
-%% all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) ->
-%% {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
-%% {ok, Acc#vacc{resp=Resp1}}.
+all_dbs_callback({meta, _Meta}, #vacc{resp=Resp0}=Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "["),
+ {ok, Acc#vacc{resp=Resp1}};
+all_dbs_callback({row, Row}, #vacc{resp=Resp0}=Acc) ->
+ Prepend = couch_mrview_http:prepend_val(Acc),
+ DbName = couch_util:get_value(id, Row),
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, [Prepend, ?JSON_ENCODE(DbName)]),
+ {ok, Acc#vacc{prepend=",", resp=Resp1}};
+all_dbs_callback(complete, #vacc{resp=Resp0}=Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp0, "]"),
+ {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+ {ok, Acc#vacc{resp=Resp2}};
+all_dbs_callback({error, Reason}, #vacc{resp=Resp0}=Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_error(Resp0, Reason),
+ {ok, Acc#vacc{resp=Resp1}}.
handle_dbs_info_req(#httpd{method='POST'}=Req) ->
chttpd:validate_ctype(Req, "application/json"),
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index 80028a645..dab0b9ce2 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -20,6 +20,7 @@
list_dbs/0,
list_dbs/1,
+ list_dbs/3,
is_admin/1,
check_is_admin/1,
@@ -194,8 +195,30 @@ list_dbs() ->
list_dbs(Options) ->
+ Callback = fun(DbName, Acc) -> [DbName | Acc] end,
+ DbNames = fabric2_fdb:transactional(fun(Tx) ->
+ fabric2_fdb:list_dbs(Tx, Callback, [], Options)
+ end),
+ lists:reverse(DbNames).
+
+
+list_dbs(UserFun, UserAcc0, Options) ->
+ FoldFun = fun
+ (DbName, Acc) -> maybe_stop(UserFun({row, [{id, DbName}]}, Acc))
+ end,
fabric2_fdb:transactional(fun(Tx) ->
- fabric2_fdb:list_dbs(Tx, Options)
+ try
+ UserAcc1 = maybe_stop(UserFun({meta, []}, UserAcc0)),
+ UserAcc2 = fabric2_fdb:list_dbs(
+ Tx,
+ FoldFun,
+ UserAcc1,
+ Options
+ ),
+ {ok, maybe_stop(UserFun(complete, UserAcc2))}
+ catch throw:{stop, FinalUserAcc} ->
+ {ok, FinalUserAcc}
+ end
end).
@@ -406,6 +429,7 @@ open_doc(#{} = Db, <<?LOCAL_DOC_PREFIX, _/binary>> = DocId, _Options) ->
open_doc(#{} = Db, DocId, Options) ->
NeedsTreeOpts = [revs_info, conflicts, deleted_conflicts],
NeedsTree = (Options -- NeedsTreeOpts /= Options),
+ OpenDeleted = lists:member(deleted, Options),
fabric2_fdb:transactional(Db, fun(TxDb) ->
Revs = case NeedsTree of
true -> fabric2_fdb:get_all_revs(TxDb, DocId);
@@ -414,6 +438,8 @@ open_doc(#{} = Db, DocId, Options) ->
if Revs == [] -> {not_found, missing}; true ->
#{winner := true} = RI = lists:last(Revs),
case fabric2_fdb:get_doc_body(TxDb, DocId, RI) of
+ #doc{deleted = true} when not OpenDeleted ->
+ {not_found, deleted};
#doc{} = Doc ->
apply_open_doc_opts(Doc, Revs, Options);
Else ->
@@ -451,8 +477,10 @@ open_doc_revs(Db, DocId, Revs, Options) ->
rev_path => RevPath
},
case fabric2_fdb:get_doc_body(TxDb, DocId, RevInfo) of
- #doc{} = Doc -> {ok, Doc};
- Else -> {Else, {Pos, Rev}}
+ #doc{} = Doc ->
+ apply_open_doc_opts(Doc, AllRevInfos, Options);
+ Else ->
+ {Else, {Pos, Rev}}
end
end
end, Found),
@@ -615,9 +643,35 @@ fold_docs(Db, UserFun, UserAcc) ->
fold_docs(Db, UserFun, UserAcc, []).
-fold_docs(Db, UserFun, UserAcc, Options) ->
+fold_docs(Db, UserFun, UserAcc0, Options) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
- fabric2_fdb:fold_docs(TxDb, UserFun, UserAcc, Options)
+ try
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ Prefix = erlfdb_tuple:pack({?DB_ALL_DOCS}, DbPrefix),
+ DocCount = get_doc_count(TxDb),
+
+ UserAcc1 = maybe_stop(UserFun({meta, [
+ {total, DocCount},
+ {offset, null}
+ ]}, UserAcc0)),
+
+ UserAcc2 = fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
+ {DocId} = erlfdb_tuple:unpack(K, Prefix),
+ RevId = erlfdb_tuple:unpack(V),
+ maybe_stop(UserFun({row, [
+ {id, DocId},
+ {key, DocId},
+ {value, {[{rev, couch_doc:rev_to_str(RevId)}]}}
+ ]}, Acc))
+ end, UserAcc1, Options),
+
+ {ok, maybe_stop(UserFun(complete, UserAcc2))}
+ catch throw:{stop, FinalUserAcc} ->
+ {ok, FinalUserAcc}
+ end
end).
@@ -627,7 +681,44 @@ fold_changes(Db, SinceSeq, UserFun, UserAcc) ->
fold_changes(Db, SinceSeq, UserFun, UserAcc, Options) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
- fabric2_fdb:fold_changes(TxDb, SinceSeq, UserFun, UserAcc, Options)
+ try
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ Prefix = erlfdb_tuple:pack({?DB_CHANGES}, DbPrefix),
+
+ Dir = case fabric2_util:get_value(dir, Options, fwd) of
+ rev -> rev;
+ _ -> fwd
+ end,
+
+ StartKey = get_since_seq(TxDb, Dir, SinceSeq),
+ EndKey = case Dir of
+ rev -> fabric2_util:seq_zero_vs();
+ _ -> fabric2_util:seq_max_vs()
+ end,
+ FoldOpts = [
+ {start_key, StartKey},
+ {end_key, EndKey}
+ ] ++ Options,
+
+ {ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
+ {SeqVS} = erlfdb_tuple:unpack(K, Prefix),
+ {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
+
+ Change = #{
+ id => DocId,
+ sequence => fabric2_fdb:vs_to_seq(SeqVS),
+ rev_id => RevId,
+ deleted => Deleted
+ },
+
+ maybe_stop(UserFun(Change, Acc))
+ end, UserAcc, FoldOpts)}
+ catch throw:{stop, FinalUserAcc} ->
+ {ok, FinalUserAcc}
+ end
end).
@@ -796,7 +887,6 @@ apply_open_doc_opts(Doc, Revs, Options) ->
IncludeConflicts = lists:member(conflicts, Options),
IncludeDelConflicts = lists:member(deleted_conflicts, Options),
IncludeLocalSeq = lists:member(local_seq, Options),
- ReturnDeleted = lists:member(deleted, Options),
% This revs_info becomes fairly useless now that we're
% not keeping old document bodies around...
@@ -827,14 +917,7 @@ apply_open_doc_opts(Doc, Revs, Options) ->
[{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}]
end,
- case Doc#doc.deleted and not ReturnDeleted of
- true ->
- {not_found, deleted};
- false ->
- {ok, Doc#doc{
- meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4
- }}
- end.
+ {ok, Doc#doc{meta = Meta1 ++ Meta2 ++ Meta3 ++ Meta4}}.
filter_found_revs(RevInfo, Revs) ->
@@ -1289,6 +1372,26 @@ check_duplicate_attachments(#doc{atts = Atts}) ->
end, ordsets:new(), Atts).
+get_since_seq(Db, rev, <<>>) ->
+ get_since_seq(Db, rev, now);
+
+get_since_seq(_Db, _Dir, Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0->
+ fabric2_util:seq_zero_vs();
+
+get_since_seq(Db, Dir, Seq) when Seq == now; Seq == <<"now">> ->
+ CurrSeq = fabric2_fdb:get_last_change(Db),
+ get_since_seq(Db, Dir, CurrSeq);
+
+get_since_seq(_Db, _Dir, Seq) when is_binary(Seq), size(Seq) == 24 ->
+ fabric2_fdb:next_vs(fabric2_fdb:seq_to_vs(Seq));
+
+get_since_seq(Db, Dir, List) when is_list(List) ->
+ get_since_seq(Db, Dir, list_to_binary(List));
+
+get_since_seq(_Db, _Dir, Seq) ->
+ erlang:error({invalid_since_seq, Seq}).
+
+
get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) ->
LeafPath;
get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) ->
@@ -1353,3 +1456,8 @@ rev(Rev) when is_list(Rev); is_binary(Rev) ->
rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
Rev.
+
+maybe_stop({ok, Acc}) ->
+ Acc;
+maybe_stop({stop, Acc}) ->
+ throw({stop, Acc}).
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 4b0182646..670ce8b49 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -24,7 +24,7 @@
delete/1,
exists/1,
- list_dbs/2,
+ list_dbs/4,
get_info/1,
get_config/1,
@@ -50,11 +50,13 @@
read_attachment/3,
write_attachment/3,
- fold_docs/4,
- fold_changes/5,
get_last_change/1,
+ fold_range/5,
+
vs_to_seq/1,
+ seq_to_vs/1,
+ next_vs/1,
debug_cluster/0,
debug_cluster/2
@@ -254,16 +256,15 @@ exists(#{name := DbName} = Db) when is_binary(DbName) ->
end.
-list_dbs(Tx, _Options) ->
+list_dbs(Tx, Callback, AccIn, Options) ->
Root = erlfdb_directory:root(),
CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
LayerPrefix = erlfdb_directory:get_name(CouchDB),
- {Start, End} = erlfdb_tuple:range({?ALL_DBS}, LayerPrefix),
- Future = erlfdb:get_range(Tx, Start, End),
- lists:map(fun({K, _V}) ->
- {?ALL_DBS, DbName} = erlfdb_tuple:unpack(K, LayerPrefix),
- DbName
- end, erlfdb:wait(Future)).
+ Prefix = erlfdb_tuple:pack({?ALL_DBS}, LayerPrefix),
+ fold_range({tx, Tx}, Prefix, fun({K, _V}, Acc) ->
+ {DbName} = erlfdb_tuple:unpack(K, Prefix),
+ Callback(DbName, Acc)
+ end, AccIn, Options).
get_info(#{} = Db) ->
@@ -508,24 +509,26 @@ write_doc(#{} = Db0, Doc, NewWinner0, OldWinner, ToUpdate, ToRemove) ->
UpdateStatus = case {OldWinner, NewWinner} of
{not_found, #{deleted := false}} ->
created;
+ {not_found, #{deleted := true}} ->
+ deleted;
{#{deleted := true}, #{deleted := false}} ->
recreated;
{#{deleted := false}, #{deleted := false}} ->
updated;
{#{deleted := false}, #{deleted := true}} ->
+ deleted;
+ {#{deleted := true}, #{deleted := true}} ->
deleted
end,
case UpdateStatus of
- Status when Status == created orelse Status == recreated ->
- ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
- ADVal = erlfdb_tuple:pack(NewRevId),
- ok = erlfdb:set(Tx, ADKey, ADVal);
deleted ->
ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
ok = erlfdb:clear(Tx, ADKey);
- updated ->
- ok
+ _ ->
+ ADKey = erlfdb_tuple:pack({?DB_ALL_DOCS, DocId}, DbPrefix),
+ ADVal = erlfdb_tuple:pack(NewRevId),
+ ok = erlfdb:set(Tx, ADKey, ADVal)
end,
% _changes
@@ -640,84 +643,6 @@ write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
{ok, AttId}.
-fold_docs(#{} = Db, UserFun, UserAcc0, Options) ->
- #{
- tx := Tx,
- db_prefix := DbPrefix
- } = ensure_current(Db),
-
- {Reverse, Start, End} = get_dir_and_bounds(DbPrefix, Options),
-
- DocCountKey = erlfdb_tuple:pack({?DB_STATS, <<"doc_count">>}, DbPrefix),
- DocCountBin = erlfdb:wait(erlfdb:get(Tx, DocCountKey)),
-
- try
- UserAcc1 = maybe_stop(UserFun({meta, [
- {total, ?bin2uint(DocCountBin)},
- {offset, null}
- ]}, UserAcc0)),
-
- UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
- {?DB_ALL_DOCS, DocId} = erlfdb_tuple:unpack(K, DbPrefix),
- RevId = erlfdb_tuple:unpack(V),
- maybe_stop(UserFun({row, [
- {id, DocId},
- {key, DocId},
- {value, couch_doc:rev_to_str(RevId)}
- ]}, UserAccIn))
- end, UserAcc1, [{reverse, Reverse}] ++ Options),
-
- {ok, maybe_stop(UserFun(complete, UserAcc2))}
- catch throw:{stop, FinalUserAcc} ->
- {ok, FinalUserAcc}
- end.
-
-
-fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) ->
- #{
- tx := Tx,
- db_prefix := DbPrefix
- } = ensure_current(Db),
-
- SinceSeq1 = get_since_seq(SinceSeq0),
-
- Reverse = case fabric2_util:get_value(dir, Options, fwd) of
- fwd -> false;
- rev -> true
- end,
-
- {Start0, End0} = case Reverse of
- false -> {SinceSeq1, fabric2_util:seq_max_vs()};
- true -> {fabric2_util:seq_zero_vs(), SinceSeq1}
- end,
-
- Start1 = erlfdb_tuple:pack({?DB_CHANGES, Start0}, DbPrefix),
- End1 = erlfdb_tuple:pack({?DB_CHANGES, End0}, DbPrefix),
-
- {Start, End} = case Reverse of
- false -> {erlfdb_key:first_greater_than(Start1), End1};
- true -> {Start1, erlfdb_key:first_greater_than(End1)}
- end,
-
- try
- {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
- {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
- {DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
-
- Change = #{
- id => DocId,
- sequence => vs_to_seq(SeqVS),
- rev_id => RevId,
- deleted => Deleted
- },
-
- maybe_stop(UserFun(Change, UserAccIn))
- end, UserAcc0, [{reverse, Reverse}] ++ Options)}
- catch throw:{stop, FinalUserAcc} ->
- {ok, FinalUserAcc}
- end.
-
-
get_last_change(#{} = Db) ->
#{
tx := Tx,
@@ -735,17 +660,57 @@ get_last_change(#{} = Db) ->
end.
-maybe_stop({ok, Acc}) ->
- Acc;
-maybe_stop({stop, Acc}) ->
- throw({stop, Acc}).
+fold_range(#{} = Db, RangePrefix, Callback, Acc, Options) ->
+ #{
+ tx := Tx
+ } = ensure_current(Db),
+ fold_range({tx, Tx}, RangePrefix, Callback, Acc, Options);
+
+fold_range({tx, Tx}, RangePrefix, UserCallback, UserAcc, Options) ->
+ case fabric2_util:get_value(limit, Options) of
+ 0 ->
+ % FoundationDB treats a limit of 0 as unlimited
+ % so we have to guard for that here.
+ UserAcc;
+ _ ->
+ {Start, End, Skip, FoldOpts} = get_fold_opts(RangePrefix, Options),
+ Callback = fun fold_range_cb/2,
+ Acc = {skip, Skip, UserCallback, UserAcc},
+ {skip, _, UserCallback, OutAcc} =
+ erlfdb:fold_range(Tx, Start, End, Callback, Acc, FoldOpts),
+ OutAcc
+ end.
-vs_to_seq(VS) ->
+vs_to_seq(VS) when is_tuple(VS) ->
+ % 51 is the versionstamp type tag
<<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}),
fabric2_util:to_hex(SeqBin).
+seq_to_vs(Seq) when is_binary(Seq) ->
+ Seq1 = fabric2_util:from_hex(Seq),
+ % 51 is the versionstamp type tag
+ Seq2 = <<51:8, Seq1/binary>>,
+ {VS} = erlfdb_tuple:unpack(Seq2),
+ VS.
+
+
+next_vs({versionstamp, VS, Batch, TxId}) ->
+ {V, B, T} = case TxId =< 65535 of
+ true ->
+ {VS, Batch, TxId + 1};
+ false ->
+ case Batch =< 65535 of
+ true ->
+ {VS, Batch + 1, 0};
+ false ->
+ {VS + 1, 0, 0}
+ end
+ end,
+ {versionstamp, V, B, T}.
+
+
debug_cluster() ->
debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
@@ -753,7 +718,7 @@ debug_cluster() ->
debug_cluster(Start, End) ->
transactional(fun(Tx) ->
lists:foreach(fun({Key, Val}) ->
- io:format("~s => ~s~n", [
+ io:format(standard_error, "~s => ~s~n", [
string:pad(erlfdb_util:repr(Key), 60),
erlfdb_util:repr(Val)
])
@@ -790,7 +755,7 @@ load_validate_doc_funs(#{} = Db) ->
{end_key, <<"_design0">>}
],
- {ok, Infos1} = fold_docs(Db, FoldFun, [], Options),
+ {ok, Infos1} = fabric2_db:fold_docs(Db, FoldFun, [], Options),
Infos2 = lists:map(fun(Info) ->
#{
@@ -999,11 +964,12 @@ chunkify_attachment(Data) ->
end.
-get_dir_and_bounds(DbPrefix, Options) ->
- Reverse = case fabric2_util:get_value(dir, Options, fwd) of
- fwd -> false;
- rev -> true
+get_fold_opts(RangePrefix, Options) ->
+ Reverse = case fabric2_util:get_value(dir, Options) of
+ rev -> true;
+ _ -> false
end,
+
StartKey0 = fabric2_util:get_value(start_key, Options),
EndKeyGt = fabric2_util:get_value(end_key_gt, Options),
EndKey0 = fabric2_util:get_value(end_key, Options, EndKeyGt),
@@ -1019,17 +985,17 @@ get_dir_and_bounds(DbPrefix, Options) ->
% Set the maximum bounds for the start and endkey
StartKey2 = case StartKey1 of
- undefined -> {?DB_ALL_DOCS};
- SK2 when is_binary(SK2) -> {?DB_ALL_DOCS, SK2}
+ undefined -> <<>>;
+ SK2 -> SK2
end,
EndKey2 = case EndKey1 of
- undefined -> {?DB_ALL_DOCS, <<16#FF>>};
- EK2 when is_binary(EK2) -> {?DB_ALL_DOCS, EK2}
+ undefined -> <<255>>;
+ EK2 -> EK2
end,
- StartKey3 = erlfdb_tuple:pack(StartKey2, DbPrefix),
- EndKey3 = erlfdb_tuple:pack(EndKey2, DbPrefix),
+ StartKey3 = erlfdb_tuple:pack({StartKey2}, RangePrefix),
+ EndKey3 = erlfdb_tuple:pack({EndKey2}, RangePrefix),
% FoundationDB ranges are applied as SK <= key < EK
% By default, CouchDB is SK <= key <= EK with the
@@ -1056,26 +1022,46 @@ get_dir_and_bounds(DbPrefix, Options) ->
EndKey3
end,
- {Reverse, StartKey4, EndKey4}.
+ Skip = case fabric2_util:get_value(skip, Options) of
+ S when is_integer(S), S >= 0 -> S;
+ _ -> 0
+ end,
+ Limit = case fabric2_util:get_value(limit, Options) of
+ L when is_integer(L), L >= 0 -> [{limit, L + Skip}];
+ undefined -> []
+ end,
-get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0->
- fabric2_util:seq_zero_vs();
+ TargetBytes = case fabric2_util:get_value(target_bytes, Options) of
+ T when is_integer(T), T >= 0 -> [{target_bytes, T}];
+ undefined -> []
+ end,
-get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
- fabric2_util:seq_max_vs();
+ StreamingMode = case fabric2_util:get_value(streaming_mode, Options) of
+ undefined -> [];
+ Name when is_atom(Name) -> [{streaming_mode, Name}]
+ end,
+
+ Snapshot = case fabric2_util:get_value(snapshot, Options) of
+ undefined -> [];
+ B when is_boolean(B) -> [{snapshot, B}]
+ end,
+
+ OutOpts = [{reverse, Reverse}]
+ ++ Limit
+ ++ TargetBytes
+ ++ StreamingMode
+ ++ Snapshot,
+
+ {StartKey4, EndKey4, Skip, OutOpts}.
-get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
- Seq1 = fabric2_util:from_hex(Seq),
- Seq2 = <<51:8, Seq1/binary>>,
- {SeqVS} = erlfdb_tuple:unpack(Seq2),
- SeqVS;
-get_since_seq(List) when is_list(List) ->
- get_since_seq(list_to_binary(List));
+fold_range_cb(KV, {skip, 0, Callback, Acc}) ->
+ NewAcc = Callback(KV, Acc),
+ {skip, 0, Callback, NewAcc};
-get_since_seq(Seq) ->
- erlang:error({invalid_since_seq, Seq}).
+fold_range_cb(_KV, {skip, N, Callback, Acc}) when is_integer(N), N > 0 ->
+ {skip, N - 1, Callback, Acc}.
get_db_handle() ->
diff --git a/src/fabric/test/fabric2_doc_fold_tests.erl b/src/fabric/test/fabric2_doc_fold_tests.erl
index caa5f925a..ee0180f14 100644
--- a/src/fabric/test/fabric2_doc_fold_tests.erl
+++ b/src/fabric/test/fabric2_doc_fold_tests.erl
@@ -34,7 +34,10 @@ doc_fold_test_() ->
fun fold_docs_with_start_key/1,
fun fold_docs_with_end_key/1,
fun fold_docs_with_both_keys_the_same/1,
- fun fold_docs_with_different_keys/1
+ fun fold_docs_with_different_keys/1,
+ fun fold_docs_with_limit/1,
+ fun fold_docs_with_skip/1,
+ fun fold_docs_with_skip_and_limit/1
]}
}
}.
@@ -50,7 +53,7 @@ setup() ->
body = {[{<<"value">>, Val}]}
},
{ok, Rev} = fabric2_db:update_doc(Db, Doc, []),
- {DocId, couch_doc:rev_to_str(Rev)}
+ {DocId, {[{rev, couch_doc:rev_to_str(Rev)}]}}
end, lists:seq(1, ?DOC_COUNT)),
{Db, lists:sort(DocIdRevs), Ctx}.
@@ -108,11 +111,58 @@ fold_docs_with_different_keys({Db, DocIdRevs, _}) ->
end, lists:seq(1, 500)).
+fold_docs_with_limit({Db, DocIdRevs, _}) ->
+ lists:foreach(fun(Limit) ->
+ Opts1 = [{limit, Limit}],
+ {ok, {?DOC_COUNT, Rows1}} =
+ fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts1),
+ ?assertEqual(lists:sublist(DocIdRevs, Limit), lists:reverse(Rows1)),
+
+ Opts2 = [{dir, rev} | Opts1],
+ {ok, {?DOC_COUNT, Rows2}} =
+ fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts2),
+ ?assertEqual(
+ lists:sublist(lists:reverse(DocIdRevs), Limit),
+ lists:reverse(Rows2)
+ )
+ end, lists:seq(0, 51)).
+
+
+fold_docs_with_skip({Db, DocIdRevs, _}) ->
+ lists:foreach(fun(Skip) ->
+ Opts1 = [{skip, Skip}],
+ {ok, {?DOC_COUNT, Rows1}} =
+ fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts1),
+ Expect1 = case Skip > length(DocIdRevs) of
+ true -> [];
+ false -> lists:nthtail(Skip, DocIdRevs)
+ end,
+ ?assertEqual(Expect1, lists:reverse(Rows1)),
+
+ Opts2 = [{dir, rev} | Opts1],
+ {ok, {?DOC_COUNT, Rows2}} =
+ fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts2),
+ Expect2 = case Skip > length(DocIdRevs) of
+ true -> [];
+ false -> lists:nthtail(Skip, lists:reverse(DocIdRevs))
+ end,
+ ?assertEqual(Expect2, lists:reverse(Rows2))
+ end, lists:seq(0, 51)).
+
+
+fold_docs_with_skip_and_limit({Db, DocIdRevs, _}) ->
+ lists:foreach(fun(_) ->
+ check_skip_and_limit(Db, [], DocIdRevs),
+ check_skip_and_limit(Db, [{dir, rev}], lists:reverse(DocIdRevs))
+ end, lists:seq(1, 100)).
+
+
check_all_combos(Db, StartKey, EndKey, Rows) ->
Opts1 = make_opts(fwd, StartKey, EndKey, true),
{ok, {?DOC_COUNT, Rows1}} =
fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts1),
?assertEqual(lists:reverse(Rows), Rows1),
+ check_skip_and_limit(Db, Opts1, Rows),
Opts2 = make_opts(fwd, StartKey, EndKey, false),
{ok, {?DOC_COUNT, Rows2}} =
@@ -121,11 +171,13 @@ check_all_combos(Db, StartKey, EndKey, Rows) ->
lists:reverse(all_but_last(Rows))
end,
?assertEqual(Expect2, Rows2),
+ check_skip_and_limit(Db, Opts2, lists:reverse(Expect2)),
Opts3 = make_opts(rev, StartKey, EndKey, true),
{ok, {?DOC_COUNT, Rows3}} =
fabric2_db:fold_docs(Db, fun fold_fun/2, [], Opts3),
?assertEqual(Rows, Rows3),
+ check_skip_and_limit(Db, Opts3, lists:reverse(Rows)),
Opts4 = make_opts(rev, StartKey, EndKey, false),
{ok, {?DOC_COUNT, Rows4}} =
@@ -133,8 +185,34 @@ check_all_combos(Db, StartKey, EndKey, Rows) ->
Expect4 = if StartKey == undefined -> Rows; true ->
tl(Rows)
end,
- ?assertEqual(Expect4, Rows4).
+ ?assertEqual(Expect4, Rows4),
+ check_skip_and_limit(Db, Opts4, lists:reverse(Expect4)).
+
+
+check_skip_and_limit(Db, Opts, []) ->
+ Skip = rand:uniform(?DOC_COUNT + 1) - 1,
+ Limit = rand:uniform(?DOC_COUNT + 1) - 1,
+ NewOpts = [{skip, Skip}, {limit, Limit} | Opts],
+ {ok, {?DOC_COUNT, OutRows}} =
+ fabric2_db:fold_docs(Db, fun fold_fun/2, [], NewOpts),
+ ?assertEqual([], OutRows);
+
+check_skip_and_limit(Db, Opts, Rows) ->
+ Skip = rand:uniform(length(Rows) + 1) - 1,
+ Limit = rand:uniform(?DOC_COUNT + 1 - Skip) - 1,
+
+ ExpectRows = case Skip >= length(Rows) of
+ true ->
+ [];
+ false ->
+ lists:sublist(lists:nthtail(Skip, Rows), Limit)
+ end,
+ SkipLimitOpts = [{skip, Skip}, {limit, Limit} | Opts],
+ {ok, {?DOC_COUNT, RevRows}} =
+ fabric2_db:fold_docs(Db, fun fold_fun/2, [], SkipLimitOpts),
+ OutRows = lists:reverse(RevRows),
+ ?assertEqual(ExpectRows, OutRows).
make_opts(fwd, StartKey, EndKey, InclusiveEnd) ->
diff --git a/test/elixir/test/all_docs_test.exs b/test/elixir/test/all_docs_test.exs
index 9f6aeb61d..dab153a96 100644
--- a/test/elixir/test/all_docs_test.exs
+++ b/test/elixir/test/all_docs_test.exs
@@ -43,7 +43,8 @@ defmodule AllDocsTest do
# Check _all_docs offset
retry_until(fn ->
resp = Couch.get("/#{db_name}/_all_docs", query: %{:startkey => "\"2\""}).body
- assert resp["offset"] == 2
+ assert resp["offset"] == :null
+ assert Enum.at(resp["rows"], 0)["key"] == "2"
end)
# Confirm that queries may assume raw collation