diff options
Diffstat (limited to 'src/couch/src/couch_btree.erl')
-rw-r--r-- | src/couch/src/couch_btree.erl | 1112 |
1 files changed, 715 insertions, 397 deletions
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl index 858ae2b90..b974a22ee 100644 --- a/src/couch/src/couch_btree.erl +++ b/src/couch/src/couch_btree.erl @@ -21,45 +21,45 @@ -define(FILL_RATIO, 0.5). -extract(#btree{extract_kv=undefined}, Value) -> +extract(#btree{extract_kv = undefined}, Value) -> Value; -extract(#btree{extract_kv=Extract}, Value) -> +extract(#btree{extract_kv = Extract}, Value) -> Extract(Value). -assemble(#btree{assemble_kv=undefined}, Key, Value) -> +assemble(#btree{assemble_kv = undefined}, Key, Value) -> {Key, Value}; -assemble(#btree{assemble_kv=Assemble}, Key, Value) -> +assemble(#btree{assemble_kv = Assemble}, Key, Value) -> Assemble(Key, Value). -less(#btree{less=undefined}, A, B) -> +less(#btree{less = undefined}, A, B) -> A < B; -less(#btree{less=Less}, A, B) -> +less(#btree{less = Less}, A, B) -> Less(A, B). % pass in 'nil' for State if a new Btree. open(State, Fd) -> - {ok, #btree{root=State, fd=Fd}}. + {ok, #btree{root = State, fd = Fd}}. set_options(Bt, []) -> Bt; -set_options(Bt, [{split, Extract}|Rest]) -> - set_options(Bt#btree{extract_kv=Extract}, Rest); -set_options(Bt, [{join, Assemble}|Rest]) -> - set_options(Bt#btree{assemble_kv=Assemble}, Rest); -set_options(Bt, [{less, Less}|Rest]) -> - set_options(Bt#btree{less=Less}, Rest); -set_options(Bt, [{reduce, Reduce}|Rest]) -> - set_options(Bt#btree{reduce=Reduce}, Rest); -set_options(Bt, [{compression, Comp}|Rest]) -> - set_options(Bt#btree{compression=Comp}, Rest). +set_options(Bt, [{split, Extract} | Rest]) -> + set_options(Bt#btree{extract_kv = Extract}, Rest); +set_options(Bt, [{join, Assemble} | Rest]) -> + set_options(Bt#btree{assemble_kv = Assemble}, Rest); +set_options(Bt, [{less, Less} | Rest]) -> + set_options(Bt#btree{less = Less}, Rest); +set_options(Bt, [{reduce, Reduce} | Rest]) -> + set_options(Bt#btree{reduce = Reduce}, Rest); +set_options(Bt, [{compression, Comp} | Rest]) -> + set_options(Bt#btree{compression = Comp}, Rest). open(State, Fd, Options) -> - {ok, set_options(#btree{root=State, fd=Fd}, Options)}. + {ok, set_options(#btree{root = State, fd = Fd}, Options)}. -get_state(#btree{root=Root}) -> +get_state(#btree{root = Root}) -> Root. -final_reduce(#btree{reduce=Reduce}, Val) -> +final_reduce(#btree{reduce = Reduce}, Val) -> final_reduce(Reduce, Val); final_reduce(Reduce, {[], []}) -> Reduce(reduce, []); @@ -71,30 +71,42 @@ final_reduce(Reduce, {KVs, Reductions}) -> Red = Reduce(reduce, KVs), final_reduce(Reduce, {[], [Red | Reductions]}). -fold_reduce(#btree{root=Root}=Bt, Fun, Acc, Options) -> +fold_reduce(#btree{root = Root} = Bt, Fun, Acc, Options) -> Dir = couch_util:get_value(dir, Options, fwd), StartKey = couch_util:get_value(start_key, Options), InEndRangeFun = make_key_in_end_range_function(Bt, Dir, Options), KeyGroupFun = get_group_fun(Bt, Options), try {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = - reduce_stream_node(Bt, Dir, Root, StartKey, InEndRangeFun, undefined, [], [], - KeyGroupFun, Fun, Acc), - if GroupedKey2 == undefined -> - {ok, Acc2}; - true -> - case Fun(GroupedKey2, {GroupedKVsAcc2, GroupedRedsAcc2}, Acc2) of - {ok, Acc3} -> {ok, Acc3}; - {stop, Acc3} -> {ok, Acc3} - end + reduce_stream_node( + Bt, + Dir, + Root, + StartKey, + InEndRangeFun, + undefined, + [], + [], + KeyGroupFun, + Fun, + Acc + ), + if + GroupedKey2 == undefined -> + {ok, Acc2}; + true -> + case Fun(GroupedKey2, {GroupedKVsAcc2, GroupedRedsAcc2}, Acc2) of + {ok, Acc3} -> {ok, Acc3}; + {stop, Acc3} -> {ok, Acc3} + end end catch throw:{stop, AccDone} -> {ok, AccDone} end. -full_reduce(#btree{root=nil,reduce=Reduce}) -> +full_reduce(#btree{root = nil, reduce = Reduce}) -> {ok, Reduce(reduce, [])}; -full_reduce(#btree{root=Root}) -> +full_reduce(#btree{root = Root}) -> {ok, element(2, Root)}. size(#btree{root = nil}) -> @@ -114,7 +126,7 @@ get_group_fun(Bt, Options) -> N when is_integer(N), N > 0 -> make_group_fun(Bt, N); undefined -> - couch_util:get_value(key_group_fun, Options, fun(_,_) -> true end) + couch_util:get_value(key_group_fun, Options, fun(_, _) -> true end) end. make_group_fun(Bt, exact) -> @@ -135,7 +147,7 @@ make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 -> fun GF({{p, Partition, Key1}, Val1}, {{p, Partition, Key2}, Val2}) -> GF({Key1, Val1}, {Key2, Val2}); - GF({[_|_] = Key1, _}, {[_|_] = Key2, _}) -> + GF({[_ | _] = Key1, _}, {[_ | _] = Key2, _}) -> SL1 = lists:sublist(Key1, GroupLevel), SL2 = lists:sublist(Key2, GroupLevel), case less(Bt, {SL1, nil}, {SL2, nil}) of @@ -175,61 +187,75 @@ convert_fun_arity(Fun) when is_function(Fun, 3) -> (traverse, _K, _Red, AccIn) -> {ok, AccIn} end; convert_fun_arity(Fun) when is_function(Fun, 4) -> - Fun. % Already arity 4 + % Already arity 4 + Fun. make_key_in_end_range_function(Bt, fwd, Options) -> case couch_util:get_value(end_key_gt, Options) of - undefined -> - case couch_util:get_value(end_key, Options) of undefined -> - fun(_Key) -> true end; - LastKey -> - fun(Key) -> not less(Bt, LastKey, Key) end - end; - EndKey -> - fun(Key) -> less(Bt, Key, EndKey) end + case couch_util:get_value(end_key, Options) of + undefined -> + fun(_Key) -> true end; + LastKey -> + fun(Key) -> not less(Bt, LastKey, Key) end + end; + EndKey -> + fun(Key) -> less(Bt, Key, EndKey) end end; make_key_in_end_range_function(Bt, rev, Options) -> case couch_util:get_value(end_key_gt, Options) of - undefined -> - case couch_util:get_value(end_key, Options) of undefined -> - fun(_Key) -> true end; - LastKey -> - fun(Key) -> not less(Bt, Key, LastKey) end - end; - EndKey -> - fun(Key) -> less(Bt, EndKey, Key) end + case couch_util:get_value(end_key, Options) of + undefined -> + fun(_Key) -> true end; + LastKey -> + fun(Key) -> not less(Bt, Key, LastKey) end + end; + EndKey -> + fun(Key) -> less(Bt, EndKey, Key) end end. - foldl(Bt, Fun, Acc) -> fold(Bt, Fun, Acc, []). foldl(Bt, Fun, Acc, Options) -> fold(Bt, Fun, Acc, Options). - -fold(#btree{root=nil}, _Fun, Acc, _Options) -> +fold(#btree{root = nil}, _Fun, Acc, _Options) -> {ok, {[], []}, Acc}; -fold(#btree{root=Root}=Bt, Fun, Acc, Options) -> +fold(#btree{root = Root} = Bt, Fun, Acc, Options) -> Dir = couch_util:get_value(dir, Options, fwd), InRange = make_key_in_end_range_function(Bt, Dir, Options), Result = - case couch_util:get_value(start_key, Options) of - undefined -> - stream_node(Bt, [], Bt#btree.root, InRange, Dir, - convert_fun_arity(Fun), Acc); - StartKey -> - stream_node(Bt, [], Bt#btree.root, StartKey, InRange, Dir, - convert_fun_arity(Fun), Acc) - end, + case couch_util:get_value(start_key, Options) of + undefined -> + stream_node( + Bt, + [], + Bt#btree.root, + InRange, + Dir, + convert_fun_arity(Fun), + Acc + ); + StartKey -> + stream_node( + Bt, + [], + Bt#btree.root, + StartKey, + InRange, + Dir, + convert_fun_arity(Fun), + Acc + ) + end, case Result of - {ok, Acc2}-> - FullReduction = element(2, Root), - {ok, {[], [FullReduction]}, Acc2}; - {stop, LastReduction, Acc2} -> - {ok, LastReduction, Acc2} + {ok, Acc2} -> + FullReduction = element(2, Root), + {ok, {[], [FullReduction]}, Acc2}; + {stop, LastReduction, Acc2} -> + {ok, LastReduction, Acc2} end. add(Bt, InsertKeyValues) -> @@ -240,27 +266,28 @@ add_remove(Bt, InsertKeyValues, RemoveKeys) -> {ok, Bt2}. query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) -> - #btree{root=Root} = Bt, + #btree{root = Root} = Bt, InsertActions = lists:map( fun(KeyValue) -> {Key, Value} = extract(Bt, KeyValue), {insert, Key, Value} - end, InsertValues), + end, + InsertValues + ), RemoveActions = [{remove, Key, nil} || Key <- RemoveKeys], FetchActions = [{fetch, Key, nil} || Key <- LookupKeys], SortFun = fun({OpA, A, _}, {OpB, B, _}) -> case A == B of - % A and B are equal, sort by op. - true -> op_order(OpA) < op_order(OpB); - false -> - less(Bt, A, B) + % A and B are equal, sort by op. + true -> op_order(OpA) < op_order(OpB); + false -> less(Bt, A, B) end end, Actions = lists:sort(SortFun, lists:append([InsertActions, RemoveActions, FetchActions])), {ok, KeyPointers, QueryResults} = modify_node(Bt, Root, Actions, []), {ok, NewRoot} = complete_root(Bt, KeyPointers), - {ok, QueryResults, Bt#btree{root=NewRoot}}. + {ok, QueryResults, Bt#btree{root = NewRoot}}. % for ordering different operations with the same key. % fetch < remove < insert @@ -268,11 +295,12 @@ op_order(fetch) -> 1; op_order(remove) -> 2; op_order(insert) -> 3. -lookup(#btree{root=Root, less=Less}=Bt, Keys) -> - SortedKeys = case Less of - undefined -> lists:sort(Keys); - _ -> lists:sort(Less, Keys) - end, +lookup(#btree{root = Root, less = Less} = Bt, Keys) -> + SortedKeys = + case Less of + undefined -> lists:sort(Keys); + _ -> lists:sort(Less, Keys) + end, {ok, SortedResults} = lookup(Bt, Root, SortedKeys), % We want to return the results in the same order as the keys were input % but we may have changed the order when we sorted. So we need to put the @@ -285,10 +313,10 @@ lookup(Bt, Node, Keys) -> Pointer = element(1, Node), {NodeType, NodeList} = get_node(Bt, Pointer), case NodeType of - kp_node -> - lookup_kpnode(Bt, list_to_tuple(NodeList), 1, Keys, []); - kv_node -> - lookup_kvnode(Bt, list_to_tuple(NodeList), 1, Keys, []) + kp_node -> + lookup_kpnode(Bt, list_to_tuple(NodeList), 1, Keys, []); + kv_node -> + lookup_kvnode(Bt, list_to_tuple(NodeList), 1, Keys, []) end. lookup_kpnode(_Bt, _NodeTuple, _LowerBound, [], Output) -> @@ -300,14 +328,13 @@ lookup_kpnode(Bt, NodeTuple, LowerBound, [FirstLookupKey | _] = LookupKeys, Outp {Key, PointerInfo} = element(N, NodeTuple), SplitFun = fun(LookupKey) -> not less(Bt, Key, LookupKey) end, case lists:splitwith(SplitFun, LookupKeys) of - {[], GreaterQueries} -> - lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, Output); - {LessEqQueries, GreaterQueries} -> - {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries), - lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, lists:reverse(Results, Output)) + {[], GreaterQueries} -> + lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, Output); + {LessEqQueries, GreaterQueries} -> + {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries), + lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, lists:reverse(Results, Output)) end. - lookup_kvnode(_Bt, _NodeTuple, _LowerBound, [], Output) -> {ok, lists:reverse(Output)}; lookup_kvnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound -> @@ -317,24 +344,27 @@ lookup_kvnode(Bt, NodeTuple, LowerBound, [LookupKey | RestLookupKeys], Output) - N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), LookupKey), {Key, Value} = element(N, NodeTuple), case less(Bt, LookupKey, Key) of - true -> - % LookupKey is less than Key - lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, not_found} | Output]); - false -> - case less(Bt, Key, LookupKey) of true -> - % LookupKey is greater than Key - lookup_kvnode(Bt, NodeTuple, N+1, RestLookupKeys, [{LookupKey, not_found} | Output]); + % LookupKey is less than Key + lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, not_found} | Output]); false -> - % LookupKey is equal to Key - lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, {ok, assemble(Bt, LookupKey, Value)}} | Output]) - end + case less(Bt, Key, LookupKey) of + true -> + % LookupKey is greater than Key + lookup_kvnode(Bt, NodeTuple, N + 1, RestLookupKeys, [ + {LookupKey, not_found} | Output + ]); + false -> + % LookupKey is equal to Key + lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [ + {LookupKey, {ok, assemble(Bt, LookupKey, Value)}} | Output + ]) + end end. - complete_root(_Bt, []) -> {ok, nil}; -complete_root(_Bt, [{_Key, PointerInfo}])-> +complete_root(_Bt, [{_Key, PointerInfo}]) -> {ok, PointerInfo}; complete_root(Bt, KPs) -> {ok, ResultKeyPointers} = write_node(Bt, kp_node, KPs), @@ -348,12 +378,12 @@ complete_root(Bt, KPs) -> chunkify(InList) -> BaseChunkSize = get_chunk_size(), case ?term_size(InList) of - Size when Size > BaseChunkSize -> - NumberOfChunksLikely = ((Size div BaseChunkSize) + 1), - ChunkThreshold = Size div NumberOfChunksLikely, - chunkify(InList, ChunkThreshold, [], 0, []); - _Else -> - [InList] + Size when Size > BaseChunkSize -> + NumberOfChunksLikely = ((Size div BaseChunkSize) + 1), + ChunkThreshold = Size div NumberOfChunksLikely, + chunkify(InList, ChunkThreshold, [], 0, []); + _Else -> + [InList] end. chunkify([], _ChunkThreshold, [], 0, OutputChunks) -> @@ -365,58 +395,67 @@ chunkify([], _ChunkThreshold, OutList, _OutListSize, OutputChunks) -> lists:reverse([lists:reverse(OutList) | OutputChunks]); chunkify([InElement | RestInList], ChunkThreshold, OutList, OutListSize, OutputChunks) -> case ?term_size(InElement) of - Size when (Size + OutListSize) > ChunkThreshold andalso OutList /= [] -> - chunkify(RestInList, ChunkThreshold, [], 0, [lists:reverse([InElement | OutList]) | OutputChunks]); - Size -> - chunkify(RestInList, ChunkThreshold, [InElement | OutList], OutListSize + Size, OutputChunks) + Size when (Size + OutListSize) > ChunkThreshold andalso OutList /= [] -> + chunkify(RestInList, ChunkThreshold, [], 0, [ + lists:reverse([InElement | OutList]) | OutputChunks + ]); + Size -> + chunkify( + RestInList, ChunkThreshold, [InElement | OutList], OutListSize + Size, OutputChunks + ) end. --compile({inline,[get_chunk_size/0]}). +-compile({inline, [get_chunk_size/0]}). get_chunk_size() -> try list_to_integer(config:get("couchdb", "btree_chunk_size", "1279")) - catch error:badarg -> - 1279 + catch + error:badarg -> + 1279 end. modify_node(Bt, RootPointerInfo, Actions, QueryOutput) -> - {NodeType, NodeList} = case RootPointerInfo of - nil -> - {kv_node, []}; - _Tuple -> - Pointer = element(1, RootPointerInfo), - get_node(Bt, Pointer) - end, + {NodeType, NodeList} = + case RootPointerInfo of + nil -> + {kv_node, []}; + _Tuple -> + Pointer = element(1, RootPointerInfo), + get_node(Bt, Pointer) + end, NodeTuple = list_to_tuple(NodeList), {ok, NewNodeList, QueryOutput2} = - case NodeType of - kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput); - kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput) - end, + case NodeType of + kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput); + kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput) + end, case NewNodeList of - [] -> % no nodes remain - {ok, [], QueryOutput2}; - NodeList -> % nothing changed - {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple), - {ok, [{LastKey, RootPointerInfo}], QueryOutput2}; - _Else2 -> - {ok, ResultList} = case RootPointerInfo of - nil -> - write_node(Bt, NodeType, NewNodeList); - _ -> + % no nodes remain + [] -> + {ok, [], QueryOutput2}; + % nothing changed + NodeList -> {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple), - OldNode = {LastKey, RootPointerInfo}, - write_node(Bt, OldNode, NodeType, NodeList, NewNodeList) - end, - {ok, ResultList, QueryOutput2} + {ok, [{LastKey, RootPointerInfo}], QueryOutput2}; + _Else2 -> + {ok, ResultList} = + case RootPointerInfo of + nil -> + write_node(Bt, NodeType, NewNodeList); + _ -> + {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple), + OldNode = {LastKey, RootPointerInfo}, + write_node(Bt, OldNode, NodeType, NodeList, NewNodeList) + end, + {ok, ResultList, QueryOutput2} end. -reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) -> +reduce_node(#btree{reduce = nil}, _NodeType, _NodeList) -> []; -reduce_node(#btree{reduce=R}, kp_node, NodeList) -> +reduce_node(#btree{reduce = R}, kp_node, NodeList) -> R(rereduce, [element(2, Node) || {_K, Node} <- NodeList]); -reduce_node(#btree{reduce=R}=Bt, kv_node, NodeList) -> +reduce_node(#btree{reduce = R} = Bt, kv_node, NodeList) -> R(reduce, [assemble(Bt, K, V) || {K, V} <- NodeList]). reduce_tree_size(kv_node, NodeSize, _KvList) -> @@ -444,17 +483,14 @@ write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) -> {ok, PtrSizes} = couch_file:append_terms(Fd, ToWrite, WriteOpts), {ok, group_kps(Bt, NodeType, Chunks, PtrSizes)}. - group_kps(_Bt, _NodeType, [], []) -> []; - group_kps(Bt, NodeType, [Chunk | RestChunks], [{Ptr, Size} | RestPtrSizes]) -> {LastKey, _} = lists:last(Chunk), SubTreeSize = reduce_tree_size(NodeType, Size, Chunk), KP = {LastKey, {Ptr, reduce_node(Bt, NodeType, Chunk), SubTreeSize}}, [KP | group_kps(Bt, NodeType, RestChunks, RestPtrSizes)]. - write_node(Bt, _OldNode, NodeType, [], NewList) -> write_node(Bt, NodeType, NewList); write_node(Bt, _OldNode, NodeType, [_], NewList) -> @@ -462,14 +498,16 @@ write_node(Bt, _OldNode, NodeType, [_], NewList) -> write_node(Bt, OldNode, NodeType, OldList, NewList) -> case can_reuse_old_node(OldList, NewList) of {true, Prefix, Suffix} -> - {ok, PrefixKVs} = case Prefix of - [] -> {ok, []}; - _ -> write_node(Bt, NodeType, Prefix) - end, - {ok, SuffixKVs} = case Suffix of - [] -> {ok, []}; - _ -> write_node(Bt, NodeType, Suffix) - end, + {ok, PrefixKVs} = + case Prefix of + [] -> {ok, []}; + _ -> write_node(Bt, NodeType, Prefix) + end, + {ok, SuffixKVs} = + case Suffix of + [] -> {ok, []}; + _ -> write_node(Bt, NodeType, Suffix) + end, Result = PrefixKVs ++ [OldNode] ++ SuffixKVs, {ok, Result}; false -> @@ -481,8 +519,9 @@ can_reuse_old_node(OldList, NewList) -> case old_list_is_prefix(OldList, RestNewList, 0) of {true, Size, Suffix} -> ReuseThreshold = get_chunk_size() * ?FILL_RATIO, - if Size < ReuseThreshold -> false; true -> - {true, Prefix, Suffix} + if + Size < ReuseThreshold -> false; + true -> {true, Prefix, Suffix} end; false -> false @@ -510,38 +549,67 @@ old_list_is_prefix(_OldList, _NewList, _Acc) -> modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) -> modify_node(Bt, nil, Actions, QueryOutput); modify_kpnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) -> - {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, - tuple_size(NodeTuple), [])), QueryOutput}; -modify_kpnode(Bt, NodeTuple, LowerBound, - [{_, FirstActionKey, _}|_]=Actions, ResultNode, QueryOutput) -> + {ok, + lists:reverse( + ResultNode, + bounded_tuple_to_list( + NodeTuple, + LowerBound, + tuple_size(NodeTuple), + [] + ) + ), + QueryOutput}; +modify_kpnode( + Bt, + NodeTuple, + LowerBound, + [{_, FirstActionKey, _} | _] = Actions, + ResultNode, + QueryOutput +) -> Sz = tuple_size(NodeTuple), N = find_first_gteq(Bt, NodeTuple, LowerBound, Sz, FirstActionKey), case N =:= Sz of - true -> - % perform remaining actions on last node - {_, PointerInfo} = element(Sz, NodeTuple), - {ok, ChildKPs, QueryOutput2} = - modify_node(Bt, PointerInfo, Actions, QueryOutput), - NodeList = lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, - Sz - 1, ChildKPs)), - {ok, NodeList, QueryOutput2}; - false -> - {NodeKey, PointerInfo} = element(N, NodeTuple), - SplitFun = fun({_ActionType, ActionKey, _ActionValue}) -> + true -> + % perform remaining actions on last node + {_, PointerInfo} = element(Sz, NodeTuple), + {ok, ChildKPs, QueryOutput2} = + modify_node(Bt, PointerInfo, Actions, QueryOutput), + NodeList = lists:reverse( + ResultNode, + bounded_tuple_to_list( + NodeTuple, + LowerBound, + Sz - 1, + ChildKPs + ) + ), + {ok, NodeList, QueryOutput2}; + false -> + {NodeKey, PointerInfo} = element(N, NodeTuple), + SplitFun = fun({_ActionType, ActionKey, _ActionValue}) -> not less(Bt, NodeKey, ActionKey) end, - {LessEqQueries, GreaterQueries} = lists:splitwith(SplitFun, Actions), - {ok, ChildKPs, QueryOutput2} = + {LessEqQueries, GreaterQueries} = lists:splitwith(SplitFun, Actions), + {ok, ChildKPs, QueryOutput2} = modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput), - ResultNode2 = lists:reverse(ChildKPs, bounded_tuple_to_revlist(NodeTuple, - LowerBound, N - 1, ResultNode)), - modify_kpnode(Bt, NodeTuple, N+1, GreaterQueries, ResultNode2, QueryOutput2) + ResultNode2 = lists:reverse( + ChildKPs, + bounded_tuple_to_revlist( + NodeTuple, + LowerBound, + N - 1, + ResultNode + ) + ), + modify_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, ResultNode2, QueryOutput2) end. bounded_tuple_to_revlist(_Tuple, Start, End, Tail) when Start > End -> Tail; bounded_tuple_to_revlist(Tuple, Start, End, Tail) -> - bounded_tuple_to_revlist(Tuple, Start+1, End, [element(Start, Tuple)|Tail]). + bounded_tuple_to_revlist(Tuple, Start + 1, End, [element(Start, Tuple) | Tail]). bounded_tuple_to_list(Tuple, Start, End, Tail) -> bounded_tuple_to_list2(Tuple, Start, End, [], Tail). @@ -557,191 +625,438 @@ find_first_gteq(Bt, Tuple, Start, End, Key) -> Mid = Start + ((End - Start) div 2), {TupleKey, _} = element(Mid, Tuple), case less(Bt, TupleKey, Key) of - true -> - find_first_gteq(Bt, Tuple, Mid+1, End, Key); - false -> - find_first_gteq(Bt, Tuple, Start, Mid, Key) + true -> + find_first_gteq(Bt, Tuple, Mid + 1, End, Key); + false -> + find_first_gteq(Bt, Tuple, Start, Mid, Key) end. modify_kvnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) -> - {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), [])), QueryOutput}; -modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) when LowerBound > tuple_size(NodeTuple) -> + {ok, + lists:reverse( + ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), []) + ), + QueryOutput}; +modify_kvnode( + Bt, + NodeTuple, + LowerBound, + [{ActionType, ActionKey, ActionValue} | RestActions], + ResultNode, + QueryOutput +) when LowerBound > tuple_size(NodeTuple) -> case ActionType of - insert -> - modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); - remove -> - % just drop the action - modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput); - fetch -> - % the key/value must not exist in the tree - modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput]) + insert -> + modify_kvnode( + Bt, + NodeTuple, + LowerBound, + RestActions, + [{ActionKey, ActionValue} | ResultNode], + QueryOutput + ); + remove -> + % just drop the action + modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput); + fetch -> + % the key/value must not exist in the tree + modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, [ + {not_found, {ActionKey, nil}} | QueryOutput + ]) end; -modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], AccNode, QueryOutput) -> +modify_kvnode( + Bt, + NodeTuple, + LowerBound, + [{ActionType, ActionKey, ActionValue} | RestActions], + AccNode, + QueryOutput +) -> N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), ActionKey), {Key, Value} = element(N, NodeTuple), - ResultNode = bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, AccNode), + ResultNode = bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, AccNode), case less(Bt, ActionKey, Key) of - true -> - case ActionType of - insert -> - % ActionKey is less than the Key, so insert - modify_kvnode(Bt, NodeTuple, N, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); - remove -> - % ActionKey is less than the Key, just drop the action - modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput); - fetch -> - % ActionKey is less than the Key, the key/value must not exist in the tree - modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput]) - end; - false -> - % ActionKey and Key are maybe equal. - case less(Bt, Key, ActionKey) of - false -> + true -> case ActionType of - insert -> - modify_kvnode(Bt, NodeTuple, N+1, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); - remove -> - modify_kvnode(Bt, NodeTuple, N+1, RestActions, ResultNode, QueryOutput); - fetch -> - % ActionKey is equal to the Key, insert into the QueryOuput, but re-process the node - % since an identical action key can follow it. - modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{ok, assemble(Bt, Key, Value)} | QueryOutput]) + insert -> + % ActionKey is less than the Key, so insert + modify_kvnode( + Bt, + NodeTuple, + N, + RestActions, + [{ActionKey, ActionValue} | ResultNode], + QueryOutput + ); + remove -> + % ActionKey is less than the Key, just drop the action + modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput); + fetch -> + % ActionKey is less than the Key, the key/value must not exist in the tree + modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [ + {not_found, {ActionKey, nil}} | QueryOutput + ]) end; - true -> - modify_kvnode(Bt, NodeTuple, N + 1, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], QueryOutput) - end + false -> + % ActionKey and Key are maybe equal. + case less(Bt, Key, ActionKey) of + false -> + case ActionType of + insert -> + modify_kvnode( + Bt, + NodeTuple, + N + 1, + RestActions, + [{ActionKey, ActionValue} | ResultNode], + QueryOutput + ); + remove -> + modify_kvnode( + Bt, NodeTuple, N + 1, RestActions, ResultNode, QueryOutput + ); + fetch -> + % ActionKey is equal to the Key, insert into the QueryOuput, but re-process the node + % since an identical action key can follow it. + modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [ + {ok, assemble(Bt, Key, Value)} | QueryOutput + ]) + end; + true -> + modify_kvnode( + Bt, + NodeTuple, + N + 1, + [{ActionType, ActionKey, ActionValue} | RestActions], + [{Key, Value} | ResultNode], + QueryOutput + ) + end end. - -reduce_stream_node(_Bt, _Dir, nil, _KeyStart, _InEndRangeFun, GroupedKey, GroupedKVsAcc, - GroupedRedsAcc, _KeyGroupFun, _Fun, Acc) -> +reduce_stream_node( + _Bt, + _Dir, + nil, + _KeyStart, + _InEndRangeFun, + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + _KeyGroupFun, + _Fun, + Acc +) -> {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}; -reduce_stream_node(Bt, Dir, Node, KeyStart, InEndRangeFun, GroupedKey, GroupedKVsAcc, - GroupedRedsAcc, KeyGroupFun, Fun, Acc) -> +reduce_stream_node( + Bt, + Dir, + Node, + KeyStart, + InEndRangeFun, + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc +) -> P = element(1, Node), case get_node(Bt, P) of - {kp_node, NodeList} -> - NodeList2 = adjust_dir(Dir, NodeList), - reduce_stream_kp_node(Bt, Dir, NodeList2, KeyStart, InEndRangeFun, GroupedKey, - GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc); - {kv_node, KVs} -> - KVs2 = adjust_dir(Dir, KVs), - reduce_stream_kv_node(Bt, Dir, KVs2, KeyStart, InEndRangeFun, GroupedKey, - GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc) + {kp_node, NodeList} -> + NodeList2 = adjust_dir(Dir, NodeList), + reduce_stream_kp_node( + Bt, + Dir, + NodeList2, + KeyStart, + InEndRangeFun, + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc + ); + {kv_node, KVs} -> + KVs2 = adjust_dir(Dir, KVs), + reduce_stream_kv_node( + Bt, + Dir, + KVs2, + KeyStart, + InEndRangeFun, + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc + ) end. -reduce_stream_kv_node(Bt, Dir, KVs, KeyStart, InEndRangeFun, - GroupedKey, GroupedKVsAcc, GroupedRedsAcc, - KeyGroupFun, Fun, Acc) -> - +reduce_stream_kv_node( + Bt, + Dir, + KVs, + KeyStart, + InEndRangeFun, + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc +) -> GTEKeyStartKVs = - case KeyStart of - undefined -> - KVs; - _ -> - DropFun = case Dir of - fwd -> - fun({Key, _}) -> less(Bt, Key, KeyStart) end; - rev -> - fun({Key, _}) -> less(Bt, KeyStart, Key) end + case KeyStart of + undefined -> + KVs; + _ -> + DropFun = + case Dir of + fwd -> + fun({Key, _}) -> less(Bt, Key, KeyStart) end; + rev -> + fun({Key, _}) -> less(Bt, KeyStart, Key) end + end, + lists:dropwhile(DropFun, KVs) end, - lists:dropwhile(DropFun, KVs) - end, KVs2 = lists:takewhile( - fun({Key, _}) -> InEndRangeFun(Key) end, GTEKeyStartKVs), - reduce_stream_kv_node2(Bt, KVs2, GroupedKey, GroupedKVsAcc, GroupedRedsAcc, - KeyGroupFun, Fun, Acc). - - -reduce_stream_kv_node2(_Bt, [], GroupedKey, GroupedKVsAcc, GroupedRedsAcc, - _KeyGroupFun, _Fun, Acc) -> + fun({Key, _}) -> InEndRangeFun(Key) end, GTEKeyStartKVs + ), + reduce_stream_kv_node2( + Bt, + KVs2, + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc + ). + +reduce_stream_kv_node2( + _Bt, + [], + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + _KeyGroupFun, + _Fun, + Acc +) -> {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}; -reduce_stream_kv_node2(Bt, [{Key, Value}| RestKVs], GroupedKey, GroupedKVsAcc, - GroupedRedsAcc, KeyGroupFun, Fun, Acc) -> +reduce_stream_kv_node2( + Bt, + [{Key, Value} | RestKVs], + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc +) -> case GroupedKey of - undefined -> - reduce_stream_kv_node2(Bt, RestKVs, Key, - [assemble(Bt,Key,Value)], [], KeyGroupFun, Fun, Acc); - _ -> - - case KeyGroupFun(GroupedKey, Key) of - true -> - reduce_stream_kv_node2(Bt, RestKVs, GroupedKey, - [assemble(Bt,Key,Value)|GroupedKVsAcc], GroupedRedsAcc, KeyGroupFun, - Fun, Acc); - false -> - case Fun(GroupedKey, {GroupedKVsAcc, GroupedRedsAcc}, Acc) of - {ok, Acc2} -> - reduce_stream_kv_node2(Bt, RestKVs, Key, [assemble(Bt,Key,Value)], - [], KeyGroupFun, Fun, Acc2); - {stop, Acc2} -> - throw({stop, Acc2}) + undefined -> + reduce_stream_kv_node2( + Bt, + RestKVs, + Key, + [assemble(Bt, Key, Value)], + [], + KeyGroupFun, + Fun, + Acc + ); + _ -> + case KeyGroupFun(GroupedKey, Key) of + true -> + reduce_stream_kv_node2( + Bt, + RestKVs, + GroupedKey, + [assemble(Bt, Key, Value) | GroupedKVsAcc], + GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc + ); + false -> + case Fun(GroupedKey, {GroupedKVsAcc, GroupedRedsAcc}, Acc) of + {ok, Acc2} -> + reduce_stream_kv_node2( + Bt, + RestKVs, + Key, + [assemble(Bt, Key, Value)], + [], + KeyGroupFun, + Fun, + Acc2 + ); + {stop, Acc2} -> + throw({stop, Acc2}) + end end - end end. -reduce_stream_kp_node(Bt, Dir, NodeList, KeyStart, InEndRangeFun, - GroupedKey, GroupedKVsAcc, GroupedRedsAcc, - KeyGroupFun, Fun, Acc) -> +reduce_stream_kp_node( + Bt, + Dir, + NodeList, + KeyStart, + InEndRangeFun, + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc +) -> Nodes = - case KeyStart of - undefined -> - NodeList; - _ -> - case Dir of - fwd -> - lists:dropwhile(fun({Key, _}) -> less(Bt, Key, KeyStart) end, NodeList); - rev -> - RevKPs = lists:reverse(NodeList), - case lists:splitwith(fun({Key, _}) -> less(Bt, Key, KeyStart) end, RevKPs) of - {_Before, []} -> + case KeyStart of + undefined -> NodeList; - {Before, [FirstAfter | _]} -> - [FirstAfter | lists:reverse(Before)] - end - end - end, + _ -> + case Dir of + fwd -> + lists:dropwhile(fun({Key, _}) -> less(Bt, Key, KeyStart) end, NodeList); + rev -> + RevKPs = lists:reverse(NodeList), + case + lists:splitwith(fun({Key, _}) -> less(Bt, Key, KeyStart) end, RevKPs) + of + {_Before, []} -> + NodeList; + {Before, [FirstAfter | _]} -> + [FirstAfter | lists:reverse(Before)] + end + end + end, {InRange, MaybeInRange} = lists:splitwith( - fun({Key, _}) -> InEndRangeFun(Key) end, Nodes), - NodesInRange = case MaybeInRange of - [FirstMaybeInRange | _] when Dir =:= fwd -> - InRange ++ [FirstMaybeInRange]; - _ -> - InRange - end, - reduce_stream_kp_node2(Bt, Dir, NodesInRange, KeyStart, InEndRangeFun, - GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc). - - -reduce_stream_kp_node2(Bt, Dir, [{_Key, NodeInfo} | RestNodeList], KeyStart, InEndRangeFun, - undefined, [], [], KeyGroupFun, Fun, Acc) -> + fun({Key, _}) -> InEndRangeFun(Key) end, Nodes + ), + NodesInRange = + case MaybeInRange of + [FirstMaybeInRange | _] when Dir =:= fwd -> + InRange ++ [FirstMaybeInRange]; + _ -> + InRange + end, + reduce_stream_kp_node2( + Bt, + Dir, + NodesInRange, + KeyStart, + InEndRangeFun, + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc + ). + +reduce_stream_kp_node2( + Bt, + Dir, + [{_Key, NodeInfo} | RestNodeList], + KeyStart, + InEndRangeFun, + undefined, + [], + [], + KeyGroupFun, + Fun, + Acc +) -> {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = - reduce_stream_node(Bt, Dir, NodeInfo, KeyStart, InEndRangeFun, undefined, - [], [], KeyGroupFun, Fun, Acc), - reduce_stream_kp_node2(Bt, Dir, RestNodeList, KeyStart, InEndRangeFun, GroupedKey2, - GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2); -reduce_stream_kp_node2(Bt, Dir, NodeList, KeyStart, InEndRangeFun, - GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc) -> - {Grouped0, Ungrouped0} = lists:splitwith(fun({Key,_}) -> - KeyGroupFun(GroupedKey, Key) end, NodeList), + reduce_stream_node( + Bt, + Dir, + NodeInfo, + KeyStart, + InEndRangeFun, + undefined, + [], + [], + KeyGroupFun, + Fun, + Acc + ), + reduce_stream_kp_node2( + Bt, + Dir, + RestNodeList, + KeyStart, + InEndRangeFun, + GroupedKey2, + GroupedKVsAcc2, + GroupedRedsAcc2, + KeyGroupFun, + Fun, + Acc2 + ); +reduce_stream_kp_node2( + Bt, + Dir, + NodeList, + KeyStart, + InEndRangeFun, + GroupedKey, + GroupedKVsAcc, + GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc +) -> + {Grouped0, Ungrouped0} = lists:splitwith( + fun({Key, _}) -> + KeyGroupFun(GroupedKey, Key) + end, + NodeList + ), {GroupedNodes, UngroupedNodes} = - case Grouped0 of - [] -> - {Grouped0, Ungrouped0}; - _ -> - [FirstGrouped | RestGrouped] = lists:reverse(Grouped0), - {RestGrouped, [FirstGrouped | Ungrouped0]} - end, + case Grouped0 of + [] -> + {Grouped0, Ungrouped0}; + _ -> + [FirstGrouped | RestGrouped] = lists:reverse(Grouped0), + {RestGrouped, [FirstGrouped | Ungrouped0]} + end, GroupedReds = [element(2, Node) || {_, Node} <- GroupedNodes], case UngroupedNodes of - [{_Key, NodeInfo}|RestNodes] -> - {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = - reduce_stream_node(Bt, Dir, NodeInfo, KeyStart, InEndRangeFun, GroupedKey, - GroupedKVsAcc, GroupedReds ++ GroupedRedsAcc, KeyGroupFun, Fun, Acc), - reduce_stream_kp_node2(Bt, Dir, RestNodes, KeyStart, InEndRangeFun, GroupedKey2, - GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2); - [] -> - {ok, Acc, GroupedReds ++ GroupedRedsAcc, GroupedKVsAcc, GroupedKey} + [{_Key, NodeInfo} | RestNodes] -> + {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = + reduce_stream_node( + Bt, + Dir, + NodeInfo, + KeyStart, + InEndRangeFun, + GroupedKey, + GroupedKVsAcc, + GroupedReds ++ GroupedRedsAcc, + KeyGroupFun, + Fun, + Acc + ), + reduce_stream_kp_node2( + Bt, + Dir, + RestNodes, + KeyStart, + InEndRangeFun, + GroupedKey2, + GroupedKVsAcc2, + GroupedRedsAcc2, + KeyGroupFun, + Fun, + Acc2 + ); + [] -> + {ok, Acc, GroupedReds ++ GroupedRedsAcc, GroupedKVsAcc, GroupedKey} end. adjust_dir(fwd, List) -> @@ -753,20 +1068,20 @@ stream_node(Bt, Reds, Node, StartKey, InRange, Dir, Fun, Acc) -> Pointer = element(1, Node), {NodeType, NodeList} = get_node(Bt, Pointer), case NodeType of - kp_node -> - stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc); - kv_node -> - stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc) + kp_node -> + stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc); + kv_node -> + stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc) end. stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc) -> Pointer = element(1, Node), {NodeType, NodeList} = get_node(Bt, Pointer), case NodeType of - kp_node -> - stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc); - kv_node -> - stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc) + kp_node -> + stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc); + kv_node -> + stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc) end. stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc) -> @@ -774,84 +1089,87 @@ stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc) -> stream_kp_node(Bt, Reds, [{Key, Node} | Rest], InRange, Dir, Fun, Acc) -> Red = element(2, Node), case Fun(traverse, Key, Red, Acc) of - {ok, Acc2} -> - case stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc2) of - {ok, Acc3} -> - stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc3); - {stop, LastReds, Acc3} -> - {stop, LastReds, Acc3} - end; - {skip, Acc2} -> - stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2); - {stop, Acc2} -> - {stop, Reds, Acc2} + {ok, Acc2} -> + case stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc2) of + {ok, Acc3} -> + stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc3); + {stop, LastReds, Acc3} -> + {stop, LastReds, Acc3} + end; + {skip, Acc2} -> + stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2); + {stop, Acc2} -> + {stop, Reds, Acc2} end. drop_nodes(_Bt, Reds, _StartKey, []) -> {Reds, []}; drop_nodes(Bt, Reds, StartKey, [{NodeKey, Node} | RestKPs]) -> case less(Bt, NodeKey, StartKey) of - true -> - drop_nodes(Bt, [element(2, Node) | Reds], StartKey, RestKPs); - false -> - {Reds, [{NodeKey, Node} | RestKPs]} + true -> + drop_nodes(Bt, [element(2, Node) | Reds], StartKey, RestKPs); + false -> + {Reds, [{NodeKey, Node} | RestKPs]} end. stream_kp_node(Bt, Reds, KPs, StartKey, InRange, Dir, Fun, Acc) -> {NewReds, NodesToStream} = - case Dir of - fwd -> - % drop all nodes sorting before the key - drop_nodes(Bt, Reds, StartKey, KPs); - rev -> - % keep all nodes sorting before the key, AND the first node to sort after - RevKPs = lists:reverse(KPs), - case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of - {_RevsBefore, []} -> - % everything sorts before it - {Reds, KPs}; - {RevBefore, [FirstAfter | Drop]} -> - {[element(2, Node) || {_K, Node} <- Drop] ++ Reds, - [FirstAfter | lists:reverse(RevBefore)]} - end - end, + case Dir of + fwd -> + % drop all nodes sorting before the key + drop_nodes(Bt, Reds, StartKey, KPs); + rev -> + % keep all nodes sorting before the key, AND the first node to sort after + RevKPs = lists:reverse(KPs), + case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of + {_RevsBefore, []} -> + % everything sorts before it + {Reds, KPs}; + {RevBefore, [FirstAfter | Drop]} -> + {[element(2, Node) || {_K, Node} <- Drop] ++ Reds, [ + FirstAfter | lists:reverse(RevBefore) + ]} + end + end, case NodesToStream of - [] -> - {ok, Acc}; - [{_Key, Node} | Rest] -> - case stream_node(Bt, NewReds, Node, StartKey, InRange, Dir, Fun, Acc) of - {ok, Acc2} -> - Red = element(2, Node), - stream_kp_node(Bt, [Red | NewReds], Rest, InRange, Dir, Fun, Acc2); - {stop, LastReds, Acc2} -> - {stop, LastReds, Acc2} - end + [] -> + {ok, Acc}; + [{_Key, Node} | Rest] -> + case stream_node(Bt, NewReds, Node, StartKey, InRange, Dir, Fun, Acc) of + {ok, Acc2} -> + Red = element(2, Node), + stream_kp_node(Bt, [Red | NewReds], Rest, InRange, Dir, Fun, Acc2); + {stop, LastReds, Acc2} -> + {stop, LastReds, Acc2} + end end. stream_kv_node(Bt, Reds, KVs, StartKey, InRange, Dir, Fun, Acc) -> DropFun = - case Dir of - fwd -> - fun({Key, _}) -> less(Bt, Key, StartKey) end; - rev -> - fun({Key, _}) -> less(Bt, StartKey, Key) end - end, + case Dir of + fwd -> + fun({Key, _}) -> less(Bt, Key, StartKey) end; + rev -> + fun({Key, _}) -> less(Bt, StartKey, Key) end + end, {LTKVs, GTEKVs} = lists:splitwith(DropFun, KVs), - AssembleLTKVs = [assemble(Bt,K,V) || {K,V} <- LTKVs], + AssembleLTKVs = [assemble(Bt, K, V) || {K, V} <- LTKVs], stream_kv_node2(Bt, Reds, AssembleLTKVs, GTEKVs, InRange, Dir, Fun, Acc). stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _InRange, _Dir, _Fun, Acc) -> {ok, Acc}; -stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], InRange, Dir, Fun, Acc) -> +stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc) -> case InRange(K) of - false -> - {stop, {PrevKVs, Reds}, Acc}; - true -> - AssembledKV = assemble(Bt, K, V), - case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of - {ok, Acc2} -> - stream_kv_node2(Bt, Reds, [AssembledKV | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2); - {stop, Acc2} -> - {stop, {PrevKVs, Reds}, Acc2} - end + false -> + {stop, {PrevKVs, Reds}, Acc}; + true -> + AssembledKV = assemble(Bt, K, V), + case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of + {ok, Acc2} -> + stream_kv_node2( + Bt, Reds, [AssembledKV | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2 + ); + {stop, Acc2} -> + {stop, {PrevKVs, Reds}, Acc2} + end end. |