summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2020-07-29 10:34:48 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2020-09-30 10:08:44 -0500
commit69f0ba1cc0af0c6008f63fb7342a60efa794634b (patch)
tree56e33dc559de1198ed22962b8a4705b40272dd94
parentef6b60b760a79542df31df9c7ad64d737b860c92 (diff)
downloadcouchdb-69f0ba1cc0af0c6008f63fb7342a60efa794634b.tar.gz
Use ebtree for reduce functions
-rw-r--r--src/couch_views/src/couch_views.erl6
-rw-r--r--src/couch_views/src/couch_views_fdb.erl1
-rw-r--r--src/couch_views/src/couch_views_reader.erl159
-rw-r--r--src/couch_views/src/couch_views_trees.erl199
4 files changed, 327 insertions, 38 deletions
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
index da8a142f9..2d916314f 100644
--- a/src/couch_views/src/couch_views.erl
+++ b/src/couch_views/src/couch_views.erl
@@ -161,12 +161,6 @@ maybe_update_view(TxDb, Mrst, false, _Args) ->
end.
-is_reduce_view(#mrargs{view_type = ViewType}) ->
- ViewType =:= red;
-is_reduce_view({Reduce, _, _}) ->
- Reduce =:= red.
-
-
to_mrargs(#mrargs{} = Args) ->
Args;
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index 28a60b872..b0fb82e85 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -299,7 +299,6 @@ reset_interactive_index(Db, Sig, _St) ->
{VS, ?INDEX_BUILDING}.
-
version_key(Db, Sig) ->
#{
db_prefix := DbPrefix
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
index a785c7b35..3c5862749 100644
--- a/src/couch_views/src/couch_views_reader.erl
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -23,7 +23,15 @@
-include_lib("fabric/include/fabric2.hrl").
-read(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
+% Entry point for view queries. Dispatches on the view type recorded in
+% the #mrargs{} record: map views are handled by read_map_view/6 and
+% reduce views by read_red_view/6. Any other Args value is a case_clause
+% error.
+read(Db, Mrst, ViewName, UserCallback, UserAcc, Args) ->
+    case Args of
+        #mrargs{view_type = red} ->
+            read_red_view(Db, Mrst, ViewName, UserCallback, UserAcc, Args);
+        #mrargs{view_type = map} ->
+            read_map_view(Db, Mrst, ViewName, UserCallback, UserAcc, Args)
+    end.
+
+
+read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
try
fabric2_fdb:transactional(Db, fun(TxDb) ->
#mrst{
@@ -68,6 +76,79 @@ read(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
end.
+% Stream the rows of the reduce view named ViewName to UserCallback.
+%
+% The view tree is opened read-only for the selected reduce function and
+% folded once per entry returned by expand_keys_args/1. UserCallback is
+% called with a meta event, with {row, Row} events (see handle_red_row/3)
+% and finally with `complete`. Returns {ok, FinalUserAcc}.
+%
+% get_red_view/4 throws {not_found, missing_named_view} when the name
+% does not resolve to a reduce view.
+read_red_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) ->
+    #mrst{language = Lang, views = Views} = Mrst0,
+    {Idx, Lang, View0} = get_red_view(Lang, Args, ViewName, Views),
+    % Only the requested view is opened; Idx selects its reduce function.
+    Mrst1 = Mrst0#mrst{views = [View0]},
+    OpenOpts = [{read_only, Idx}],
+    try
+        fabric2_fdb:transactional(Db, fun(TxDb) ->
+            Mrst = couch_views_trees:open(TxDb, Mrst1, OpenOpts),
+            #mrst{language = Lang, views = [View1]} = Mrst,
+            #mrargs{extra = Extra} = Args,
+
+            Meta = get_red_meta(TxDb, Mrst, View1, Args),
+            UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+
+            % A finalizer given in the extra options takes precedence over
+            % the source of the view's own reduce function.
+            Finalizer = case couch_util:get_value(finalizer, Extra) of
+                undefined ->
+                    {_, FunSrc} = lists:nth(Idx, View1#mrview.reduce_funs),
+                    FunSrc;
+                Custom ->
+                    Custom
+            end,
+
+            InitAcc = #{
+                db => TxDb,
+                skip => Args#mrargs.skip,
+                limit => Args#mrargs.limit,
+                mrargs => undefined,
+                finalizer => Finalizer,
+                red_idx => Idx,
+                language => Lang,
+                callback => UserCallback,
+                acc => UserAcc1
+            },
+
+            % One fold per expanded argument set; the accumulator records
+            % which argument set is currently being folded.
+            FoldKeyArgs = fun(KeyArgs, KeyAcc) ->
+                FdbOpts = mrargs_to_fdb_options(KeyArgs),
+                couch_views_trees:fold_red_idx(
+                    TxDb,
+                    View1,
+                    Idx,
+                    FdbOpts,
+                    fun handle_red_row/3,
+                    KeyAcc#{mrargs := KeyArgs}
+                )
+            end,
+            #{acc := UserAcc2} =
+                lists:foldl(FoldKeyArgs, InitAcc, expand_keys_args(Args)),
+
+            {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+        end)
+    catch
+        throw:{done, Out} ->
+            {ok, Out};
+        throw:{complete, Out} ->
+            {_, Final} = UserCallback(complete, Out),
+            {ok, Final}
+    end.
+
+
get_map_meta(TxDb, Mrst, View, #mrargs{update_seq = true}) ->
TotalRows = couch_views_trees:get_row_count(TxDb, View),
ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
@@ -78,6 +159,14 @@ get_map_meta(TxDb, _Mrst, View, #mrargs{}) ->
{meta, [{total, TotalRows}, {offset, null}]}.
+% Build the meta event sent before any reduce rows. The view's update
+% sequence is looked up and included only when the request has
+% update_seq = true; otherwise the meta property list is empty.
+get_red_meta(TxDb, Mrst, _View, #mrargs{update_seq = WantSeq}) ->
+    case WantSeq of
+        true ->
+            Seq = couch_views_fdb:get_update_seq(TxDb, Mrst),
+            {meta, [{update_seq, Seq}]};
+        _ ->
+            {meta, []}
+    end.
+
+
handle_map_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
Acc#{skip := Skip - 1};
@@ -115,6 +204,38 @@ handle_map_row(DocId, Key, Value, Acc) ->
Acc#{limit := Limit - 1, acc := UserAcc1}.
+% Fold callback for reduce rows. The accumulator map carries the paging
+% state (skip, limit), the finalizer and the user's callback/accumulator.
+%
+% While skip is positive the row is dropped and skip is decremented.
+% Once limit is 0, {complete, UserAcc} is thrown to end the fold.
+% Otherwise the value is finalized and passed to the user callback as
+% {row, [{key, Key}, {value, Value}]}.
+handle_red_row(_Key, _Red, #{skip := Skip} = Acc) when Skip > 0 ->
+    Acc#{skip := Skip - 1};
+
+handle_red_row(_Key, _Value, #{limit := 0, acc := UserAcc}) ->
+    throw({complete, UserAcc});
+
+handle_red_row(Key, Value, Acc) ->
+    #{
+        acc := UserAcc0,
+        callback := UserCallback,
+        finalizer := Finalizer,
+        limit := Limit
+    } = Acc,
+
+    % An undefined group key is reported to the user as null.
+    RowKey = if Key =:= undefined -> null; true -> Key end,
+    Row = [{key, RowKey}, {value, maybe_finalize(Finalizer, Value)}],
+
+    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+    Acc#{limit := Limit - 1, acc := UserAcc1}.
+
+
+% Apply Finalizer to a reduce value via couch_query_servers:finalize/2.
+% A null finalizer leaves the value untouched.
+maybe_finalize(Finalizer, Red) ->
+    case Finalizer of
+        null ->
+            Red;
+        _ ->
+            {ok, Finalized} = couch_query_servers:finalize(Finalizer, Red),
+            Finalized
+    end.
+
+
get_map_view(Lang, Args, ViewName, Views) ->
case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
{map, View, _Args} -> View;
@@ -122,6 +243,13 @@ get_map_view(Lang, Args, ViewName, Views) ->
end.
+% Resolve ViewName to {Idx, Lang, View} using
+% couch_mrview_util:extract_view/4. Lang is already bound in the pattern,
+% so the extracted language has to equal the one passed in. Any other
+% result throws {not_found, missing_named_view}.
+get_red_view(Lang, Args, ViewName, Views) ->
+    Extracted = couch_mrview_util:extract_view(Lang, Args, ViewName, Views),
+    case Extracted of
+        {red, {_Idx, Lang, _View} = RedView, _} ->
+            RedView;
+        _ ->
+            throw({not_found, missing_named_view})
+    end.
+
+
expand_keys_args(#mrargs{keys = undefined} = Args) ->
[Args];
@@ -136,12 +264,14 @@ expand_keys_args(#mrargs{keys = Keys} = Args) ->
mrargs_to_fdb_options(Args) ->
#mrargs{
+ view_type = ViewType,
start_key = StartKey,
start_key_docid = StartKeyDocId,
end_key = EndKey,
end_key_docid = EndKeyDocId0,
direction = Direction,
- inclusive_end = InclusiveEnd
+ inclusive_end = InclusiveEnd,
+ group_level = GroupLevel
} = Args,
StartKeyOpts = if StartKey == undefined -> []; true ->
@@ -160,10 +290,33 @@ mrargs_to_fdb_options(Args) ->
[{end_key, {EndKey, EndKeyDocId}}]
end,
+ GroupFunOpt = make_group_key_fun(ViewType, GroupLevel),
+
[
{dir, Direction},
{inclusive_end, InclusiveEnd}
- ] ++ StartKeyOpts ++ EndKeyOpts.
+ ] ++ StartKeyOpts ++ EndKeyOpts ++ GroupFunOpt.
+
+
+% Translate a view type and group level into the group_key_fun option
+% appended to the fold options by mrargs_to_fdb_options/1.
+%   map, _      -> no option
+%   red, 0      -> the atom group_all
+%   red, exact  -> group on the whole key of each {Key, DocId} pair
+%   red, N > 0  -> group on the first N elements of list keys; keys that
+%                  are not lists are used unchanged
+make_group_key_fun(map, _) ->
+    [];
+
+make_group_key_fun(red, 0) ->
+    [{group_key_fun, group_all}];
+
+make_group_key_fun(red, exact) ->
+    WholeKey = fun({Key, _DocId}) -> Key end,
+    [{group_key_fun, WholeKey}];
+
+make_group_key_fun(red, Level) when is_integer(Level), Level > 0 ->
+    KeyPrefix = fun({Key, _DocId}) ->
+        case is_list(Key) of
+            true -> lists:sublist(Key, Level);
+            false -> Key
+        end
+    end,
+    [{group_key_fun, KeyPrefix}].
maybe_stop({ok, Acc}) -> Acc;
diff --git a/src/couch_views/src/couch_views_trees.erl b/src/couch_views/src/couch_views_trees.erl
index 7ce350506..b45750be9 100644
--- a/src/couch_views/src/couch_views_trees.erl
+++ b/src/couch_views/src/couch_views_trees.erl
@@ -14,11 +14,13 @@
-export([
open/2,
+ open/3,
get_row_count/2,
get_kv_size/2,
fold_map_idx/5,
+ fold_red_idx/6,
update_views/3
]).
@@ -35,6 +37,10 @@
open(TxDb, Mrst) ->
+ open(TxDb, Mrst, []).
+
+
+open(TxDb, Mrst, Options) ->
#mrst{
sig = Sig,
language = Lang,
@@ -42,7 +48,7 @@ open(TxDb, Mrst) ->
} = Mrst,
Mrst#mrst{
id_btree = open_id_tree(TxDb, Sig),
- views = [open_view_tree(TxDb, Sig, Lang, V) || V <- Views]
+ views = [open_view_tree(TxDb, Sig, Lang, V, Options) || V <- Views]
}.
@@ -50,7 +56,7 @@ get_row_count(TxDb, View) ->
#{
tx := Tx
} = TxDb,
- {Count, _} = ebtree:full_reduce(Tx, View#mrview.btree),
+ {Count, _, _} = ebtree:full_reduce(Tx, View#mrview.btree),
Count.
@@ -58,7 +64,7 @@ get_kv_size(TxDb, View) ->
#{
tx := Tx
} = TxDb,
- {_, TotalSize} = ebtree:full_reduce(Tx, View#mrview.btree),
+ {_, TotalSize, _} = ebtree:full_reduce(Tx, View#mrview.btree),
TotalSize.
@@ -122,6 +128,74 @@ fold_map_idx(TxDb, View, Options, Callback, Acc0) ->
end.
+% Fold over the values of the Idx'th reduce function of View.
+%
+% Options are decoded by to_red_opts/1. When the group key function is the
+% atom group_all, one reduction over the key range is computed with
+% ebtree:reduce/5 and Callback is invoked once with a null group key.
+% When it is a function, ebtree:group_reduce/8 is handed a wrapper that
+% forwards every {GroupKey, Reduction} it receives to Callback.
+% Callback is called as Callback(GroupKey, RedValue, Acc).
+fold_red_idx(TxDb, View, Idx, Options, Callback, Acc0) ->
+    #{tx := Tx} = TxDb,
+    #mrview{btree = Btree} = View,
+
+    {Dir, StartKey, EndKey, InclusiveEnd, GroupKeyFun} = to_red_opts(Options),
+
+    % Pick the Idx'th user reduction out of the {Count, Size, UserReds}
+    % triple before handing the row to the caller.
+    Emit = fun({GroupKey, Reduction}, EmitAcc) ->
+        {_RowCount, _RowSize, UserReds} = Reduction,
+        Callback(GroupKey, lists:nth(Idx, UserReds), EmitAcc)
+    end,
+
+    FwdOpts = [{dir, fwd}, {inclusive_end, InclusiveEnd}],
+    % For rev folds the Start/End keys are swapped on purpose because
+    % ebtree. Also inclusive_start for the same reason.
+    RevOpts = [{dir, rev}, {inclusive_start, InclusiveEnd}],
+
+    case {GroupKeyFun, Dir} of
+        {group_all, fwd} ->
+            FwdRed = ebtree:reduce(Tx, Btree, StartKey, EndKey, FwdOpts),
+            Emit({null, FwdRed}, Acc0);
+        {group_all, rev} ->
+            RevRed = ebtree:reduce(Tx, Btree, EndKey, StartKey, RevOpts),
+            Emit({null, RevRed}, Acc0);
+        {GKF, fwd} when is_function(GKF) ->
+            ebtree:group_reduce(
+                Tx, Btree, StartKey, EndKey, GKF, Emit, Acc0, FwdOpts
+            );
+        {GKF, rev} when is_function(GKF) ->
+            ebtree:group_reduce(
+                Tx, Btree, EndKey, StartKey, GKF, Emit, Acc0, RevOpts
+            )
+    end.
+
+
update_views(TxDb, Mrst, Docs) ->
#{
tx := Tx
@@ -129,7 +203,7 @@ update_views(TxDb, Mrst, Docs) ->
% Get initial KV size
OldKVSize = lists:foldl(fun(View, SizeAcc) ->
- {_, Size} = ebtree:full_reduce(Tx, View#mrview.btree),
+ {_, Size, _} = ebtree:full_reduce(Tx, View#mrview.btree),
SizeAcc + Size
end, 0, Mrst#mrst.views),
@@ -156,7 +230,7 @@ update_views(TxDb, Mrst, Docs) ->
% Get new KV size after update
NewKVSize = lists:foldl(fun(View, SizeAcc) ->
- {_, Size} = ebtree:full_reduce(Tx, View#mrview.btree),
+ {_, Size, _} = ebtree:full_reduce(Tx, View#mrview.btree),
SizeAcc + Size
end, 0, Mrst#mrst.views),
@@ -176,7 +250,7 @@ open_id_tree(TxDb, Sig) ->
ebtree:open(Tx, Prefix, get_order(id_btree), TreeOpts).
-open_view_tree(TxDb, Sig, Lang, View) ->
+open_view_tree(TxDb, Sig, Lang, View, Options) ->
#{
tx := Tx,
db_prefix := DbPrefix
@@ -185,12 +259,21 @@ open_view_tree(TxDb, Sig, Lang, View) ->
id_num = ViewId
} = View,
Prefix = view_tree_prefix(DbPrefix, Sig, ViewId),
- TreeOpts = [
+ BaseOpts = [
{collate_fun, couch_views_util:collate_fun(View)},
- {reduce_fun, make_reduce_fun(Lang, View)},
- {persist_fun, fun couch_views_fdb:persist_chunks/3},
- {cache_fun, create_cache_fun({view, ViewId})}
+ {persist_fun, fun couch_views_fdb:persist_chunks/3}
],
+ ExtraOpts = case lists:keyfind(read_only, 1, Options) of
+ {read_only, Idx} ->
+ RedFun = make_read_only_reduce_fun(Lang, View, Idx),
+ [{reduce_fun, RedFun}];
+ false ->
+ [
+ {reduce_fun, make_reduce_fun(Lang, View)},
+ {cache_fun, create_cache_fun({view, ViewId})}
+ ]
+ end,
+ TreeOpts = BaseOpts ++ ExtraOpts,
View#mrview{
btree = ebtree:open(Tx, Prefix, get_order(view_btree), TreeOpts)
}.
@@ -210,27 +293,60 @@ min_order(V) ->
V + 1.
-make_reduce_fun(_Lang, #mrview{}) ->
+% Build the reduce_fun installed on a view tree that is opened with
+% {read_only, NthRed}.
+%
+% Only the NthRed'th reduce function source is evaluated. Its result is
+% padded with empty lists on both sides so that it sits at position NthRed
+% of the user reductions list. The row count and size slots of the
+% returned triple are always 0.
+%
+% A view without reduce functions cannot be opened this way: the list
+% helpers below fail and that error is the report, so no extra debug
+% output is written.
+make_read_only_reduce_fun(Lang, View, NthRed) ->
+    RedFuns = [Src || {_, Src} <- View#mrview.reduce_funs],
+    LPad = lists:duplicate(NthRed - 1, []),
+    RPad = lists:duplicate(length(RedFuns) - NthRed, []),
+    FunSrc = lists:nth(NthRed, RedFuns),
     fun
-        (KVs, _ReReduce = false) ->
+        (KVs0, _ReReduce = false) ->
+            KVs1 = detuple_kvs(expand_dupes(KVs0)),
+            {ok, Result} = couch_query_servers:reduce(Lang, [FunSrc], KVs1),
+            {0, 0, LPad ++ Result ++ RPad};
+        (Reductions, _ReReduce = true) ->
+            % Keep only the NthRed'th user reduction of each triple.
+            ExtractFun = fun(Reds) ->
+                {_Count, _Size, UReds} = Reds,
+                [lists:nth(NthRed, UReds)]
+            end,
+            UReds = lists:map(ExtractFun, Reductions),
+            % A single reduction is passed through without a rereduce call.
+            {ok, Result} = case UReds of
+                [RedVal] ->
+                    {ok, RedVal};
+                _ ->
+                    couch_query_servers:rereduce(Lang, [FunSrc], UReds)
+            end,
+            {0, 0, LPad ++ Result ++ RPad}
+    end.
+
+
+% Build the reduce_fun for a view tree. Every reduction is a
+% {RowCount, KVSize, UserReds} triple, where UserReds holds the results of
+% all of the view's reduce functions as returned by couch_query_servers.
+make_reduce_fun(Lang, #mrview{} = View) ->
+    RedFuns = [Src || {_, Src} <- View#mrview.reduce_funs],
+    fun
+        (KVs0, _ReReduce = false) ->
+            % Rows with duplicate values are counted and sized individually.
+            KVs1 = expand_dupes(KVs0),
             TotalSize = lists:foldl(fun({{K, _DocId}, V}, Acc) ->
                 KSize = couch_ejson_size:encoded_size(K),
-                Acc + case V of
-                    {dups, Dups} ->
-                        lists:foldl(fun(D, DAcc) ->
-                            VSize = couch_ejson_size:encoded_size(D),
-                            DAcc + KSize + VSize
-                        end, 0, Dups);
-                    _ ->
-                        VSize = couch_ejson_size:encoded_size(V),
-                        KSize + VSize
-                end
-            end, 0, KVs),
-            {length(KVs), TotalSize};
-        (KRs, _ReReduce = true) ->
-            lists:foldl(fun({Count, Size}, {CountAcc, SizeAcc}) ->
-                {Count + CountAcc, Size + SizeAcc}
-            end, {0, 0}, KRs)
+                VSize = couch_ejson_size:encoded_size(V),
+                KSize + VSize + Acc
+            end, 0, KVs1),
+            QueryKVs = detuple_kvs(KVs1),
+            {ok, UserReds} = couch_query_servers:reduce(Lang, RedFuns, QueryKVs),
+            {length(KVs1), TotalSize, UserReds};
+        (Reductions, _ReReduce = true) ->
+            {Counts, Sizes, UserRedLists} = lists:unzip3(Reductions),
+            % The user reductions are handed to rereduce in reverse order
+            % of Reductions.
+            UReds = lists:reverse(UserRedLists),
+            {ok, Result} = couch_query_servers:rereduce(Lang, RedFuns, UReds),
+            {lists:sum(Counts), lists:sum(Sizes), Result}
     end.
@@ -284,6 +400,17 @@ to_map_opts(Options) ->
{Dir, StartKey, EndKey, InclusiveEnd}.
+% Decode fold options for reduce folds: the four values produced by
+% to_map_opts/1 plus the group key function. Without a group_key_fun
+% option a default function is used that maps every {Key, DocId} pair to
+% the atom global_group.
+to_red_opts(Options) ->
+    {Dir, StartKey, EndKey, InclusiveEnd} = to_map_opts(Options),
+    GroupKeyFun = case lists:keyfind(group_key_fun, 1, Options) of
+        false -> fun({_Key, _DocId}) -> global_group end;
+        {group_key_fun, Configured} -> Configured
+    end,
+    {Dir, StartKey, EndKey, InclusiveEnd, GroupKeyFun}.
+
+
gather_update_info(Tx, Mrst, Docs) ->
% A special token used to indicate that the row should be deleted
DeleteRef = erlang:make_ref(),
@@ -420,6 +547,22 @@ combine_vals(V1, V2) ->
{dups, [V1, V2]}.
+% Expand rows whose value is {dups, Values} into one {Key, Value} row per
+% duplicate value, keeping the original order. All other rows are passed
+% through unchanged.
+expand_dupes(KVs) ->
+    lists:flatmap(fun
+        ({K, {dups, Dups}}) -> [{K, D} || D <- Dups];
+        ({_K, _V} = KV) -> [KV]
+    end, KVs).
+
+
+% Convert {{Key, DocId}, Value} rows into the nested list form
+% [[Key, DocId], Value] that is passed to couch_query_servers:reduce/3.
+detuple_kvs(KVs) ->
+    lists:map(fun(KV) ->
+        {{Key, Id}, Value} = KV,
+        [[Key, Id], Value]
+    end, KVs).
+
+
id_tree_prefix(DbPrefix, Sig) ->
Key = {?DB_VIEWS, ?VIEW_TREES, Sig, ?VIEW_ID_TREE},
erlfdb_tuple:pack(Key, DbPrefix).