diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-07-29 10:34:48 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-09-30 10:08:44 -0500 |
commit | 69f0ba1cc0af0c6008f63fb7342a60efa794634b (patch) | |
tree | 56e33dc559de1198ed22962b8a4705b40272dd94 | |
parent | ef6b60b760a79542df31df9c7ad64d737b860c92 (diff) | |
download | couchdb-69f0ba1cc0af0c6008f63fb7342a60efa794634b.tar.gz |
Use ebtree for reduce functions
-rw-r--r-- | src/couch_views/src/couch_views.erl | 6 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_fdb.erl | 1 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_reader.erl | 159 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_trees.erl | 199 |
4 files changed, 327 insertions, 38 deletions
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl index da8a142f9..2d916314f 100644 --- a/src/couch_views/src/couch_views.erl +++ b/src/couch_views/src/couch_views.erl @@ -161,12 +161,6 @@ maybe_update_view(TxDb, Mrst, false, _Args) -> end. -is_reduce_view(#mrargs{view_type = ViewType}) -> - ViewType =:= red; -is_reduce_view({Reduce, _, _}) -> - Reduce =:= red. - - to_mrargs(#mrargs{} = Args) -> Args; diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl index 28a60b872..b0fb82e85 100644 --- a/src/couch_views/src/couch_views_fdb.erl +++ b/src/couch_views/src/couch_views_fdb.erl @@ -299,7 +299,6 @@ reset_interactive_index(Db, Sig, _St) -> {VS, ?INDEX_BUILDING}. - version_key(Db, Sig) -> #{ db_prefix := DbPrefix diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl index a785c7b35..3c5862749 100644 --- a/src/couch_views/src/couch_views_reader.erl +++ b/src/couch_views/src/couch_views_reader.erl @@ -23,7 +23,15 @@ -include_lib("fabric/include/fabric2.hrl"). -read(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) -> +read(Db, Mrst, ViewName, UserCallback, UserAcc, Args) -> + ReadFun = case Args of + #mrargs{view_type = map} -> fun read_map_view/6; + #mrargs{view_type = red} -> fun read_red_view/6 + end, + ReadFun(Db, Mrst, ViewName, UserCallback, UserAcc, Args). + + +read_map_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) -> try fabric2_fdb:transactional(Db, fun(TxDb) -> #mrst{ @@ -68,6 +76,79 @@ read(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) -> end. +read_red_view(Db, Mrst0, ViewName, UserCallback, UserAcc0, Args) -> + #mrst{ + language = Lang, + views = Views + } = Mrst0, + {Idx, Lang, View0} = get_red_view(Lang, Args, ViewName, Views), + Mrst1 = Mrst0#mrst{views = [View0]}, + ReadOpts = [{read_only, Idx}], + try + fabric2_fdb:transactional(Db, fun(TxDb) -> + #mrst{ + language = Lang, + views = [View1] + } = Mrst = couch_views_trees:open(TxDb, Mrst1, ReadOpts), + + #mrargs{ + extra = Extra + } = Args, + + Fun = fun handle_red_row/3, + + Meta = get_red_meta(TxDb, Mrst, View1, Args), + UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)), + + Finalizer = case couch_util:get_value(finalizer, Extra) of + undefined -> + {_, FunSrc} = lists:nth(Idx, View1#mrview.reduce_funs), + FunSrc; + CustomFun-> + CustomFun + end, + + Acc0 = #{ + db => TxDb, + skip => Args#mrargs.skip, + limit => Args#mrargs.limit, + mrargs => undefined, + finalizer => Finalizer, + red_idx => Idx, + language => Lang, + callback => UserCallback, + acc => UserAcc1 + }, + + Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) -> + Opts = mrargs_to_fdb_options(KeyArgs), + KeyAcc1 = KeyAcc0#{ + mrargs := KeyArgs + }, + couch_views_trees:fold_red_idx( + TxDb, + View1, + Idx, + Opts, + Fun, + KeyAcc1 + ) + end, Acc0, expand_keys_args(Args)), + + #{ + acc := UserAcc2 + } = Acc1, + {ok, maybe_stop(UserCallback(complete, UserAcc2))} + end) + catch + throw:{complete, Out} -> + {_, Final} = UserCallback(complete, Out), + {ok, Final}; + throw:{done, Out} -> + {ok, Out} + end. + + get_map_meta(TxDb, Mrst, View, #mrargs{update_seq = true}) -> TotalRows = couch_views_trees:get_row_count(TxDb, View), ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst), @@ -78,6 +159,14 @@ get_map_meta(TxDb, _Mrst, View, #mrargs{}) -> {meta, [{total, TotalRows}, {offset, null}]}. +get_red_meta(TxDb, Mrst, _View, #mrargs{update_seq = true}) -> + ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst), + {meta, [{update_seq, ViewSeq}]}; + +get_red_meta(_TxDb, _Mrst, _View, #mrargs{}) -> + {meta, []}. + + handle_map_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 -> Acc#{skip := Skip - 1}; @@ -115,6 +204,38 @@ handle_map_row(DocId, Key, Value, Acc) -> Acc#{limit := Limit - 1, acc := UserAcc1}. +handle_red_row(_Key, _Red, #{skip := Skip} = Acc) when Skip > 0 -> + Acc#{skip := Skip - 1}; + +handle_red_row(_Key, _Value, #{limit := 0, acc := UserAcc}) -> + throw({complete, UserAcc}); + +handle_red_row(Key0, Value0, Acc) -> + #{ + limit := Limit, + finalizer := Finalizer, + callback := UserCallback, + acc := UserAcc0 + } = Acc, + + Key1 = case Key0 of + undefined -> null; + _ -> Key0 + end, + Value1 = maybe_finalize(Finalizer, Value0), + Row = [{key, Key1}, {value, Value1}], + + UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)), + Acc#{limit := Limit - 1, acc := UserAcc1}. + + +maybe_finalize(null, Red) -> + Red; +maybe_finalize(Finalizer, Red) -> + {ok, Finalized} = couch_query_servers:finalize(Finalizer, Red), + Finalized. + + get_map_view(Lang, Args, ViewName, Views) -> case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of {map, View, _Args} -> View; @@ -122,6 +243,13 @@ get_map_view(Lang, Args, ViewName, Views) -> end. +get_red_view(Lang, Args, ViewName, Views) -> + case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of + {red, {Idx, Lang, View}, _} -> {Idx, Lang, View}; + _ -> throw({not_found, missing_named_view}) + end. + + expand_keys_args(#mrargs{keys = undefined} = Args) -> [Args]; @@ -136,12 +264,14 @@ expand_keys_args(#mrargs{keys = Keys} = Args) -> mrargs_to_fdb_options(Args) -> #mrargs{ + view_type = ViewType, start_key = StartKey, start_key_docid = StartKeyDocId, end_key = EndKey, end_key_docid = EndKeyDocId0, direction = Direction, - inclusive_end = InclusiveEnd + inclusive_end = InclusiveEnd, + group_level = GroupLevel } = Args, StartKeyOpts = if StartKey == undefined -> []; true -> @@ -160,10 +290,33 @@ mrargs_to_fdb_options(Args) -> [{end_key, {EndKey, EndKeyDocId}}] end, + GroupFunOpt = make_group_key_fun(ViewType, GroupLevel), + [ {dir, Direction}, {inclusive_end, InclusiveEnd} - ] ++ StartKeyOpts ++ EndKeyOpts. + ] ++ StartKeyOpts ++ EndKeyOpts ++ GroupFunOpt. + + +make_group_key_fun(map, _) -> + []; + +make_group_key_fun(red, exact) -> + [ + {group_key_fun, fun({Key, _DocId}) -> Key end} + ]; + +make_group_key_fun(red, 0) -> + [ + {group_key_fun, group_all} + ]; + +make_group_key_fun(red, N) when is_integer(N), N > 0 -> + GKFun = fun + ({Key, _DocId}) when is_list(Key) -> lists:sublist(Key, N); + ({Key, _DocId}) -> Key + end, + [{group_key_fun, GKFun}]. maybe_stop({ok, Acc}) -> Acc; diff --git a/src/couch_views/src/couch_views_trees.erl b/src/couch_views/src/couch_views_trees.erl index 7ce350506..b45750be9 100644 --- a/src/couch_views/src/couch_views_trees.erl +++ b/src/couch_views/src/couch_views_trees.erl @@ -14,11 +14,13 @@ -export([ open/2, + open/3, get_row_count/2, get_kv_size/2, fold_map_idx/5, + fold_red_idx/6, update_views/3 ]). @@ -35,6 +37,10 @@ open(TxDb, Mrst) -> + open(TxDb, Mrst, []). + + +open(TxDb, Mrst, Options) -> #mrst{ sig = Sig, language = Lang, @@ -42,7 +48,7 @@ open(TxDb, Mrst) -> } = Mrst, Mrst#mrst{ id_btree = open_id_tree(TxDb, Sig), - views = [open_view_tree(TxDb, Sig, Lang, V) || V <- Views] + views = [open_view_tree(TxDb, Sig, Lang, V, Options) || V <- Views] }. @@ -50,7 +56,7 @@ get_row_count(TxDb, View) -> #{ tx := Tx } = TxDb, - {Count, _} = ebtree:full_reduce(Tx, View#mrview.btree), + {Count, _, _} = ebtree:full_reduce(Tx, View#mrview.btree), Count. @@ -58,7 +64,7 @@ get_kv_size(TxDb, View) -> #{ tx := Tx } = TxDb, - {_, TotalSize} = ebtree:full_reduce(Tx, View#mrview.btree), + {_, TotalSize, _} = ebtree:full_reduce(Tx, View#mrview.btree), TotalSize. @@ -122,6 +128,74 @@ fold_map_idx(TxDb, View, Options, Callback, Acc0) -> end. +fold_red_idx(TxDb, View, Idx, Options, Callback, Acc0) -> + #{ + tx := Tx + } = TxDb, + #mrview{ + btree = Btree + } = View, + + {Dir, StartKey, EndKey, InclusiveEnd, GroupKeyFun} = to_red_opts(Options), + + Wrapper = fun({GroupKey, Reduction}, WAcc) -> + {_RowCount, _RowSize, UserReds} = Reduction, + RedValue = lists:nth(Idx, UserReds), + Callback(GroupKey, RedValue, WAcc) + end, + + case {GroupKeyFun, Dir} of + {group_all, fwd} -> + EBtreeOpts = [ + {dir, fwd}, + {inclusive_end, InclusiveEnd} + ], + Reduction = ebtree:reduce(Tx, Btree, StartKey, EndKey, EBtreeOpts), + Wrapper({null, Reduction}, Acc0); + {F, fwd} when is_function(F) -> + EBtreeOpts = [ + {dir, fwd}, + {inclusive_end, InclusiveEnd} + ], + ebtree:group_reduce( + Tx, + Btree, + StartKey, + EndKey, + GroupKeyFun, + Wrapper, + Acc0, + EBtreeOpts + ); + {group_all, rev} -> + % Start/End keys swapped on purpose because ebtree. Also + % inclusive_start for same reason. + EBtreeOpts = [ + {dir, rev}, + {inclusive_start, InclusiveEnd} + ], + Reduction = ebtree:reduce(Tx, Btree, EndKey, StartKey, EBtreeOpts), + Wrapper({null, Reduction}, Acc0); + {F, rev} when is_function(F) -> + % Start/End keys swapped on purpose because ebtree. Also + % inclusive_start for same reason. + EBtreeOpts = [ + {dir, rev}, + {inclusive_start, InclusiveEnd} + ], + ebtree:group_reduce( + Tx, + Btree, + EndKey, + StartKey, + GroupKeyFun, + Wrapper, + Acc0, + EBtreeOpts + ) + end. + + update_views(TxDb, Mrst, Docs) -> #{ tx := Tx @@ -129,7 +203,7 @@ update_views(TxDb, Mrst, Docs) -> % Get initial KV size OldKVSize = lists:foldl(fun(View, SizeAcc) -> - {_, Size} = ebtree:full_reduce(Tx, View#mrview.btree), + {_, Size, _} = ebtree:full_reduce(Tx, View#mrview.btree), SizeAcc + Size end, 0, Mrst#mrst.views), @@ -156,7 +230,7 @@ update_views(TxDb, Mrst, Docs) -> % Get new KV size after update NewKVSize = lists:foldl(fun(View, SizeAcc) -> - {_, Size} = ebtree:full_reduce(Tx, View#mrview.btree), + {_, Size, _} = ebtree:full_reduce(Tx, View#mrview.btree), SizeAcc + Size end, 0, Mrst#mrst.views), @@ -176,7 +250,7 @@ open_id_tree(TxDb, Sig) -> ebtree:open(Tx, Prefix, get_order(id_btree), TreeOpts). -open_view_tree(TxDb, Sig, Lang, View) -> +open_view_tree(TxDb, Sig, Lang, View, Options) -> #{ tx := Tx, db_prefix := DbPrefix @@ -185,12 +259,21 @@ open_view_tree(TxDb, Sig, Lang, View) -> id_num = ViewId } = View, Prefix = view_tree_prefix(DbPrefix, Sig, ViewId), - TreeOpts = [ + BaseOpts = [ {collate_fun, couch_views_util:collate_fun(View)}, - {reduce_fun, make_reduce_fun(Lang, View)}, - {persist_fun, fun couch_views_fdb:persist_chunks/3}, - {cache_fun, create_cache_fun({view, ViewId})} + {persist_fun, fun couch_views_fdb:persist_chunks/3} ], + ExtraOpts = case lists:keyfind(read_only, 1, Options) of + {read_only, Idx} -> + RedFun = make_read_only_reduce_fun(Lang, View, Idx), + [{reduce_fun, RedFun}]; + false -> + [ + {reduce_fun, make_reduce_fun(Lang, View)}, + {cache_fun, create_cache_fun({view, ViewId})} + ] + end, + TreeOpts = BaseOpts ++ ExtraOpts, View#mrview{ btree = ebtree:open(Tx, Prefix, get_order(view_btree), TreeOpts) }. @@ -210,27 +293,60 @@ min_order(V) -> V + 1. -make_reduce_fun(_Lang, #mrview{}) -> +make_read_only_reduce_fun(Lang, View, NthRed) -> + RedFuns = [Src || {_, Src} <- View#mrview.reduce_funs], + if RedFuns /= [] -> ok; true -> + io:format(standard_error, "~p~n", [process_info(self(), current_stacktrace)]) + end, + LPad = lists:duplicate(NthRed - 1, []), + RPad = lists:duplicate(length(RedFuns) - NthRed, []), + FunSrc = lists:nth(NthRed, RedFuns), fun - (KVs, _ReReduce = false) -> + (KVs0, _ReReduce = false) -> + KVs1 = detuple_kvs(expand_dupes(KVs0)), + {ok, Result} = couch_query_servers:reduce(Lang, [FunSrc], KVs1), + {0, 0, LPad ++ Result ++ RPad}; + (Reductions, _ReReduce = true) -> + ExtractFun = fun(Reds) -> + {_Count, _Size, UReds} = Reds, + [lists:nth(NthRed, UReds)] + end, + UReds = lists:map(ExtractFun, Reductions), + {ok, Result} = case UReds of + [RedVal] -> + {ok, RedVal}; + _ -> + couch_query_servers:rereduce(Lang, [FunSrc], UReds) + end, + {0, 0, LPad ++ Result ++ RPad} + end. + + +make_reduce_fun(Lang, #mrview{} = View) -> + RedFuns = [Src || {_, Src} <- View#mrview.reduce_funs], + fun + (KVs0, _ReReduce = false) -> + KVs1 = expand_dupes(KVs0), TotalSize = lists:foldl(fun({{K, _DocId}, V}, Acc) -> KSize = couch_ejson_size:encoded_size(K), - Acc + case V of - {dups, Dups} -> - lists:foldl(fun(D, DAcc) -> - VSize = couch_ejson_size:encoded_size(D), - DAcc + KSize + VSize - end, 0, Dups); - _ -> - VSize = couch_ejson_size:encoded_size(V), - KSize + VSize - end - end, 0, KVs), - {length(KVs), TotalSize}; - (KRs, _ReReduce = true) -> - lists:foldl(fun({Count, Size}, {CountAcc, SizeAcc}) -> - {Count + CountAcc, Size + SizeAcc} - end, {0, 0}, KRs) + VSize = couch_ejson_size:encoded_size(V), + KSize + VSize + Acc + end, 0, KVs1), + KVs2 = detuple_kvs(KVs1), + {ok, UserReds} = couch_query_servers:reduce(Lang, RedFuns, KVs2), + {length(KVs1), TotalSize, UserReds}; + (Reductions, _ReReduce = true) -> + FoldFun = fun({Count, Size, UserReds}, {CAcc, SAcc, URedAcc}) -> + NewCAcc = Count + CAcc, + NewSAcc = Size + SAcc, + NewURedAcc = [UserReds | URedAcc], + {NewCAcc, NewSAcc, NewURedAcc} + end, + InitAcc = {0, 0, []}, + FinalAcc = lists:foldl(FoldFun, InitAcc, Reductions), + {FinalCount, FinalSize, UReds} = FinalAcc, + {ok, Result} = couch_query_servers:rereduce(Lang, RedFuns, UReds), + {FinalCount, FinalSize, Result} end. @@ -284,6 +400,17 @@ to_map_opts(Options) -> {Dir, StartKey, EndKey, InclusiveEnd}. +to_red_opts(Options) -> + {Dir, StartKey, EndKey, InclusiveEnd} = to_map_opts(Options), + + GroupKeyFun = case lists:keyfind(group_key_fun, 1, Options) of + {group_key_fun, GKF} -> GKF; + false -> fun({_Key, _DocId}) -> global_group end + end, + + {Dir, StartKey, EndKey, InclusiveEnd, GroupKeyFun}. + + gather_update_info(Tx, Mrst, Docs) -> % A special token used to indicate that the row should be deleted DeleteRef = erlang:make_ref(), @@ -420,6 +547,22 @@ combine_vals(V1, V2) -> {dups, [V1, V2]}. +expand_dupes([]) -> + []; +expand_dupes([{K, {dups, Dups}} | Rest]) -> + Expanded = [{K, D} || D <- Dups], + Expanded ++ expand_dupes(Rest); +expand_dupes([{K, V} | Rest]) -> + [{K, V} | expand_dupes(Rest)]. + + +detuple_kvs([]) -> + []; +detuple_kvs([KV | Rest]) -> + {{Key, Id}, Value} = KV, + [[[Key, Id], Value] | detuple_kvs(Rest)]. + + id_tree_prefix(DbPrefix, Sig) -> Key = {?DB_VIEWS, ?VIEW_TREES, Sig, ?VIEW_ID_TREE}, erlfdb_tuple:pack(Key, DbPrefix). |