summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2020-07-06 14:24:28 +0100
committerGitHub Enterprise <noreply@github.ibm.com>2020-07-06 14:24:28 +0100
commit60cea2d624fcd3ae54de9d342e7acdcbed00637a (patch)
tree62ed8c1dc9fe75330ceaed547105a48af9918203
parentd598fc6212508195589a9d6ba890921100110e70 (diff)
parent6ae6cc230478c43ae27b3c8203bae7152bd0962e (diff)
downloadcouchdb-60cea2d624fcd3ae54de9d342e7acdcbed00637a.tar.gz
Merge pull request #13 from cloudant/group_level
Group level
-rw-r--r--src/ebtree.erl78
1 files changed, 72 insertions, 6 deletions
diff --git a/src/ebtree.erl b/src/ebtree.erl
index ca47a41dc..c61a30d00 100644
--- a/src/ebtree.erl
+++ b/src/ebtree.erl
@@ -12,6 +12,7 @@
fold/4,
reduce/4,
full_reduce/2,
+ group_reduce/5,
validate_tree/2
]).
@@ -188,13 +189,64 @@ reduce(Db, #tree{} = Tree, StartKey, EndKey) ->
end
end,
{MapValues, ReduceValues} = fold(Db, Tree, Fun, {[], []}),
- if
- MapValues /= [] ->
- MapReduction = reduce_values(Tree, MapValues, false),
- reduce_values(Tree, [MapReduction | ReduceValues], true);
+ do_reduce(Tree, MapValues, ReduceValues).
+
+
+do_reduce(#tree{} = Tree, [], ReduceValues) when is_list(ReduceValues) ->
+ reduce_values(Tree, ReduceValues, true);
+
+do_reduce(#tree{} = Tree, MapValues, ReduceValues) when is_list(MapValues), is_list(ReduceValues) ->
+ do_reduce(Tree, [], [reduce_values(Tree, MapValues, false) | ReduceValues]).
+
+
+%% group reduce - produces reductions for contiguous keys in the same group.
+
+group_reduce(Db, #tree{} = Tree, StartKey, EndKey, GroupKeyFun) ->
+ NoGroupYet = erlang:make_ref(),
+ Fun = fun
+ ({visit, Key, Value}, {CurrentGroup, GroupAcc, MapAcc, ReduceAcc}) ->
+ AfterEnd = greater_than(Tree, Key, EndKey),
+ InRange = greater_than_or_equal(Tree, Key, StartKey) andalso less_than_or_equal(Tree, Key, EndKey),
+ KeyGroup = GroupKeyFun(Key),
+ SameGroup = CurrentGroup =:= KeyGroup,
+ if
+ AfterEnd ->
+ {stop, {CurrentGroup, GroupAcc, MapAcc, ReduceAcc}};
+ SameGroup ->
+ {ok, {CurrentGroup, GroupAcc, [{Key, Value} | MapAcc], ReduceAcc}};
+ InRange andalso CurrentGroup =:= NoGroupYet ->
+ {ok, {KeyGroup, GroupAcc, [{Key, Value}], []}};
+ InRange ->
+ %% implicit end of current group and start of a new one
+ GroupValue = do_reduce(Tree, MapAcc, ReduceAcc),
+ {ok, {KeyGroup, [{CurrentGroup, GroupValue} | GroupAcc], [{Key, Value}], []}};
+ true ->
+ {ok, {CurrentGroup, GroupAcc, MapAcc, ReduceAcc}}
+ end;
+ ({traverse, FirstKey, LastKey, Reduction}, {CurrentGroup, GroupAcc, MapAcc, ReduceAcc}) ->
+ BeforeStart = less_than(Tree, LastKey, StartKey),
+ AfterEnd = greater_than(Tree, FirstKey, EndKey),
+ Whole = CurrentGroup =:= GroupKeyFun(FirstKey) andalso CurrentGroup =:= GroupKeyFun(LastKey),
+ if
+ BeforeStart ->
+ {skip, {CurrentGroup, GroupAcc, MapAcc, ReduceAcc}};
+ AfterEnd ->
+ {stop, {CurrentGroup, GroupAcc, MapAcc, ReduceAcc}};
+ Whole ->
+ {skip, {CurrentGroup, GroupAcc, MapAcc, [Reduction | ReduceAcc]}};
+ true ->
+ {ok, {CurrentGroup, GroupAcc, MapAcc, ReduceAcc}}
+ end
+ end,
+ {CurrentGroup, GroupAcc0, MapValues, ReduceValues} = fold(Db, Tree, Fun, {NoGroupYet, [], [], []}),
+ GroupAcc1 = if
+ MapValues /= [] orelse ReduceValues /= [] ->
+ FinalGroup = do_reduce(Tree, MapValues, ReduceValues),
+ [{CurrentGroup, FinalGroup} | GroupAcc0];
true ->
- reduce_values(Tree, ReduceValues, true)
- end.
+ GroupAcc0
+ end,
+ lists:reverse(GroupAcc1).
%% range (inclusive of both ends)
@@ -936,6 +988,20 @@ stats_reduce_test_() ->
].
+group_reduce_test_() ->
+ Db = erlfdb_util:get_test_db([empty]),
+ init(Db, <<1,2,3>>, 4),
+ Tree = open(Db, <<1,2,3>>, [{reduce_fun, fun reduce_sum/2}]),
+ Max = 100,
+ Keys = [X || {_, X} <- lists:sort([ {rand:uniform(), N} || N <- lists:seq(1, Max)])],
+ GroupKeyFun = fun(Key) -> lists:sublist(Key, 2) end,
+ lists:foreach(fun(Key) -> insert(Db, Tree, [Key rem 4, Key rem 3, Key], Key) end, Keys),
+ [
+ ?_test(?assertEqual([{[1, 0], 408}, {[1, 1], 441}, {[1, 2], 376}],
+ group_reduce(Db, Tree, [1], [2], GroupKeyFun)))
+ ].
+
+
raw_collation_test() ->
Db = erlfdb_util:get_test_db([empty]),
init(Db, <<1,2,3>>, 4),