diff options
Diffstat (limited to 'src/couch/src/couch_btree.erl')
-rw-r--r-- | src/couch/src/couch_btree.erl | 855 |
1 files changed, 0 insertions, 855 deletions
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl deleted file mode 100644 index ea0cf69e9..000000000 --- a/src/couch/src/couch_btree.erl +++ /dev/null @@ -1,855 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_btree). - --export([open/2, open/3, query_modify/4, add/2, add_remove/3]). --export([fold/4, full_reduce/1, final_reduce/2, size/1, foldl/3, foldl/4]). --export([fold_reduce/4, lookup/2, get_state/1, set_options/2]). --export([extract/2, assemble/3, less/3]). - --include_lib("couch/include/couch_db.hrl"). - --define(FILL_RATIO, 0.5). - -extract(#btree{extract_kv=undefined}, Value) -> - Value; -extract(#btree{extract_kv=Extract}, Value) -> - Extract(Value). - -assemble(#btree{assemble_kv=undefined}, Key, Value) -> - {Key, Value}; -assemble(#btree{assemble_kv=Assemble}, Key, Value) -> - Assemble(Key, Value). - -less(#btree{less=undefined}, A, B) -> - A < B; -less(#btree{less=Less}, A, B) -> - Less(A, B). - -% pass in 'nil' for State if a new Btree. -open(State, Fd) -> - {ok, #btree{root=State, fd=Fd}}. - -set_options(Bt, []) -> - Bt; -set_options(Bt, [{split, Extract}|Rest]) -> - set_options(Bt#btree{extract_kv=Extract}, Rest); -set_options(Bt, [{join, Assemble}|Rest]) -> - set_options(Bt#btree{assemble_kv=Assemble}, Rest); -set_options(Bt, [{less, Less}|Rest]) -> - set_options(Bt#btree{less=Less}, Rest); -set_options(Bt, [{reduce, Reduce}|Rest]) -> - set_options(Bt#btree{reduce=Reduce}, Rest); -set_options(Bt, [{compression, Comp}|Rest]) -> - set_options(Bt#btree{compression=Comp}, Rest). - -open(State, Fd, Options) -> - {ok, set_options(#btree{root=State, fd=Fd}, Options)}. - -get_state(#btree{root=Root}) -> - Root. - -final_reduce(#btree{reduce=Reduce}, Val) -> - final_reduce(Reduce, Val); -final_reduce(Reduce, {[], []}) -> - Reduce(reduce, []); -final_reduce(_Bt, {[], [Red]}) -> - Red; -final_reduce(Reduce, {[], Reductions}) -> - Reduce(rereduce, Reductions); -final_reduce(Reduce, {KVs, Reductions}) -> - Red = Reduce(reduce, KVs), - final_reduce(Reduce, {[], [Red | Reductions]}). - -fold_reduce(#btree{root=Root}=Bt, Fun, Acc, Options) -> - Dir = couch_util:get_value(dir, Options, fwd), - StartKey = couch_util:get_value(start_key, Options), - InEndRangeFun = make_key_in_end_range_function(Bt, Dir, Options), - KeyGroupFun = get_group_fun(Bt, Options), - try - {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = - reduce_stream_node(Bt, Dir, Root, StartKey, InEndRangeFun, undefined, [], [], - KeyGroupFun, Fun, Acc), - if GroupedKey2 == undefined -> - {ok, Acc2}; - true -> - case Fun(GroupedKey2, {GroupedKVsAcc2, GroupedRedsAcc2}, Acc2) of - {ok, Acc3} -> {ok, Acc3}; - {stop, Acc3} -> {ok, Acc3} - end - end - catch - throw:{stop, AccDone} -> {ok, AccDone} - end. - -full_reduce(#btree{root=nil,reduce=Reduce}) -> - {ok, Reduce(reduce, [])}; -full_reduce(#btree{root=Root}) -> - {ok, element(2, Root)}. - -size(#btree{root = nil}) -> - 0; -size(#btree{root = {_P, _Red}}) -> - % pre 1.2 format - nil; -size(#btree{root = {_P, _Red, Size}}) -> - Size. - -get_group_fun(Bt, Options) -> - case couch_util:get_value(key_group_level, Options) of - exact -> - make_group_fun(Bt, exact); - 0 -> - fun(_, _) -> true end; - N when is_integer(N), N > 0 -> - make_group_fun(Bt, N); - undefined -> - couch_util:get_value(key_group_fun, Options, fun(_,_) -> true end) - end. - -make_group_fun(Bt, exact) -> - fun({Key1, _}, {Key2, _}) -> - case less(Bt, {Key1, nil}, {Key2, nil}) of - false -> - case less(Bt, {Key2, nil}, {Key1, nil}) of - false -> - true; - _ -> - false - end; - _ -> - false - end - end; -make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 -> - fun - GF({{p, Partition, Key1}, Val1}, {{p, Partition, Key2}, Val2}) -> - GF({Key1, Val1}, {Key2, Val2}); - GF({[_|_] = Key1, _}, {[_|_] = Key2, _}) -> - SL1 = lists:sublist(Key1, GroupLevel), - SL2 = lists:sublist(Key2, GroupLevel), - case less(Bt, {SL1, nil}, {SL2, nil}) of - false -> - case less(Bt, {SL2, nil}, {SL1, nil}) of - false -> - true; - _ -> - false - end; - _ -> - false - end; - GF({Key1, _}, {Key2, _}) -> - case less(Bt, {Key1, nil}, {Key2, nil}) of - false -> - case less(Bt, {Key2, nil}, {Key1, nil}) of - false -> - true; - _ -> - false - end; - _ -> - false - end - end. - -% wraps a 2 arity function with the proper 3 arity function -convert_fun_arity(Fun) when is_function(Fun, 2) -> - fun - (visit, KV, _Reds, AccIn) -> Fun(KV, AccIn); - (traverse, _K, _Red, AccIn) -> {ok, AccIn} - end; -convert_fun_arity(Fun) when is_function(Fun, 3) -> - fun - (visit, KV, Reds, AccIn) -> Fun(KV, Reds, AccIn); - (traverse, _K, _Red, AccIn) -> {ok, AccIn} - end; -convert_fun_arity(Fun) when is_function(Fun, 4) -> - Fun. % Already arity 4 - -make_key_in_end_range_function(Bt, fwd, Options) -> - case couch_util:get_value(end_key_gt, Options) of - undefined -> - case couch_util:get_value(end_key, Options) of - undefined -> - fun(_Key) -> true end; - LastKey -> - fun(Key) -> not less(Bt, LastKey, Key) end - end; - EndKey -> - fun(Key) -> less(Bt, Key, EndKey) end - end; -make_key_in_end_range_function(Bt, rev, Options) -> - case couch_util:get_value(end_key_gt, Options) of - undefined -> - case couch_util:get_value(end_key, Options) of - undefined -> - fun(_Key) -> true end; - LastKey -> - fun(Key) -> not less(Bt, Key, LastKey) end - end; - EndKey -> - fun(Key) -> less(Bt, EndKey, Key) end - end. - - -foldl(Bt, Fun, Acc) -> - fold(Bt, Fun, Acc, []). - -foldl(Bt, Fun, Acc, Options) -> - fold(Bt, Fun, Acc, Options). - - -fold(#btree{root=nil}, _Fun, Acc, _Options) -> - {ok, {[], []}, Acc}; -fold(#btree{root=Root}=Bt, Fun, Acc, Options) -> - Dir = couch_util:get_value(dir, Options, fwd), - InRange = make_key_in_end_range_function(Bt, Dir, Options), - Result = - case couch_util:get_value(start_key, Options) of - undefined -> - stream_node(Bt, [], Bt#btree.root, InRange, Dir, - convert_fun_arity(Fun), Acc); - StartKey -> - stream_node(Bt, [], Bt#btree.root, StartKey, InRange, Dir, - convert_fun_arity(Fun), Acc) - end, - case Result of - {ok, Acc2}-> - FullReduction = element(2, Root), - {ok, {[], [FullReduction]}, Acc2}; - {stop, LastReduction, Acc2} -> - {ok, LastReduction, Acc2} - end. - -add(Bt, InsertKeyValues) -> - add_remove(Bt, InsertKeyValues, []). - -add_remove(Bt, InsertKeyValues, RemoveKeys) -> - {ok, [], Bt2} = query_modify(Bt, [], InsertKeyValues, RemoveKeys), - {ok, Bt2}. - -query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) -> - #btree{root=Root} = Bt, - InsertActions = lists:map( - fun(KeyValue) -> - {Key, Value} = extract(Bt, KeyValue), - {insert, Key, Value} - end, InsertValues), - RemoveActions = [{remove, Key, nil} || Key <- RemoveKeys], - FetchActions = [{fetch, Key, nil} || Key <- LookupKeys], - SortFun = - fun({OpA, A, _}, {OpB, B, _}) -> - case A == B of - % A and B are equal, sort by op. - true -> op_order(OpA) < op_order(OpB); - false -> - less(Bt, A, B) - end - end, - Actions = lists:sort(SortFun, lists:append([InsertActions, RemoveActions, FetchActions])), - {ok, KeyPointers, QueryResults} = modify_node(Bt, Root, Actions, []), - {ok, NewRoot} = complete_root(Bt, KeyPointers), - {ok, QueryResults, Bt#btree{root=NewRoot}}. - -% for ordering different operations with the same key. -% fetch < remove < insert -op_order(fetch) -> 1; -op_order(remove) -> 2; -op_order(insert) -> 3. - -lookup(#btree{root=Root, less=Less}=Bt, Keys) -> - SortedKeys = case Less of - undefined -> lists:sort(Keys); - _ -> lists:sort(Less, Keys) - end, - {ok, SortedResults} = lookup(Bt, Root, SortedKeys), - % We want to return the results in the same order as the keys were input - % but we may have changed the order when we sorted. So we need to put the - % order back into the results. - couch_util:reorder_results(Keys, SortedResults). - -lookup(_Bt, nil, Keys) -> - {ok, [{Key, not_found} || Key <- Keys]}; -lookup(Bt, Node, Keys) -> - Pointer = element(1, Node), - {NodeType, NodeList} = get_node(Bt, Pointer), - case NodeType of - kp_node -> - lookup_kpnode(Bt, list_to_tuple(NodeList), 1, Keys, []); - kv_node -> - lookup_kvnode(Bt, list_to_tuple(NodeList), 1, Keys, []) - end. - -lookup_kpnode(_Bt, _NodeTuple, _LowerBound, [], Output) -> - {ok, lists:reverse(Output)}; -lookup_kpnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound -> - {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])}; -lookup_kpnode(Bt, NodeTuple, LowerBound, [FirstLookupKey | _] = LookupKeys, Output) -> - N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), FirstLookupKey), - {Key, PointerInfo} = element(N, NodeTuple), - SplitFun = fun(LookupKey) -> not less(Bt, Key, LookupKey) end, - case lists:splitwith(SplitFun, LookupKeys) of - {[], GreaterQueries} -> - lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, Output); - {LessEqQueries, GreaterQueries} -> - {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries), - lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, lists:reverse(Results, Output)) - end. - - -lookup_kvnode(_Bt, _NodeTuple, _LowerBound, [], Output) -> - {ok, lists:reverse(Output)}; -lookup_kvnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound -> - % keys not found - {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])}; -lookup_kvnode(Bt, NodeTuple, LowerBound, [LookupKey | RestLookupKeys], Output) -> - N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), LookupKey), - {Key, Value} = element(N, NodeTuple), - case less(Bt, LookupKey, Key) of - true -> - % LookupKey is less than Key - lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, not_found} | Output]); - false -> - case less(Bt, Key, LookupKey) of - true -> - % LookupKey is greater than Key - lookup_kvnode(Bt, NodeTuple, N+1, RestLookupKeys, [{LookupKey, not_found} | Output]); - false -> - % LookupKey is equal to Key - lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, {ok, assemble(Bt, LookupKey, Value)}} | Output]) - end - end. - - -complete_root(_Bt, []) -> - {ok, nil}; -complete_root(_Bt, [{_Key, PointerInfo}])-> - {ok, PointerInfo}; -complete_root(Bt, KPs) -> - {ok, ResultKeyPointers} = write_node(Bt, kp_node, KPs), - complete_root(Bt, ResultKeyPointers). - -%%%%%%%%%%%%% The chunkify function sucks! %%%%%%%%%%%%% -% It is inaccurate as it does not account for compression when blocks are -% written. Plus with the "case byte_size(term_to_binary(InList)) of" code -% it's probably really inefficient. - -chunkify(InList) -> - BaseChunkSize = get_chunk_size(), - case ?term_size(InList) of - Size when Size > BaseChunkSize -> - NumberOfChunksLikely = ((Size div BaseChunkSize) + 1), - ChunkThreshold = Size div NumberOfChunksLikely, - chunkify(InList, ChunkThreshold, [], 0, []); - _Else -> - [InList] - end. - -chunkify([], _ChunkThreshold, [], 0, OutputChunks) -> - lists:reverse(OutputChunks); -chunkify([], _ChunkThreshold, [Item], _OutListSize, [PrevChunk | RestChunks]) -> - NewPrevChunk = PrevChunk ++ [Item], - lists:reverse(RestChunks, [NewPrevChunk]); -chunkify([], _ChunkThreshold, OutList, _OutListSize, OutputChunks) -> - lists:reverse([lists:reverse(OutList) | OutputChunks]); -chunkify([InElement | RestInList], ChunkThreshold, OutList, OutListSize, OutputChunks) -> - case ?term_size(InElement) of - Size when (Size + OutListSize) > ChunkThreshold andalso OutList /= [] -> - chunkify(RestInList, ChunkThreshold, [], 0, [lists:reverse([InElement | OutList]) | OutputChunks]); - Size -> - chunkify(RestInList, ChunkThreshold, [InElement | OutList], OutListSize + Size, OutputChunks) - end. - --compile({inline,[get_chunk_size/0]}). -get_chunk_size() -> - try - list_to_integer(config:get("couchdb", "btree_chunk_size", "1279")) - catch error:badarg -> - 1279 - end. - -modify_node(Bt, RootPointerInfo, Actions, QueryOutput) -> - {NodeType, NodeList} = case RootPointerInfo of - nil -> - {kv_node, []}; - _Tuple -> - Pointer = element(1, RootPointerInfo), - get_node(Bt, Pointer) - end, - NodeTuple = list_to_tuple(NodeList), - - {ok, NewNodeList, QueryOutput2} = - case NodeType of - kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput); - kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput) - end, - case NewNodeList of - [] -> % no nodes remain - {ok, [], QueryOutput2}; - NodeList -> % nothing changed - {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple), - {ok, [{LastKey, RootPointerInfo}], QueryOutput2}; - _Else2 -> - {ok, ResultList} = case RootPointerInfo of - nil -> - write_node(Bt, NodeType, NewNodeList); - _ -> - {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple), - OldNode = {LastKey, RootPointerInfo}, - write_node(Bt, OldNode, NodeType, NodeList, NewNodeList) - end, - {ok, ResultList, QueryOutput2} - end. - -reduce_node(#btree{reduce=nil}, _NodeType, _NodeList) -> - []; -reduce_node(#btree{reduce=R}, kp_node, NodeList) -> - R(rereduce, [element(2, Node) || {_K, Node} <- NodeList]); -reduce_node(#btree{reduce=R}=Bt, kv_node, NodeList) -> - R(reduce, [assemble(Bt, K, V) || {K, V} <- NodeList]). - -reduce_tree_size(kv_node, NodeSize, _KvList) -> - NodeSize; -reduce_tree_size(kp_node, NodeSize, []) -> - NodeSize; -reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red}} | _]) -> - % pre 1.2 format - nil; -reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red, nil}} | _]) -> - nil; -reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) -> - reduce_tree_size(kp_node, NodeSize + Sz, NodeList). - -get_node(#btree{fd = Fd}, NodePos) -> - {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos), - {NodeType, NodeList}. - -write_node(#btree{fd = Fd, compression = Comp} = Bt, NodeType, NodeList) -> - % split up nodes into smaller sizes - NodeListList = chunkify(NodeList), - % now write out each chunk and return the KeyPointer pairs for those nodes - ResultList = [ - begin - {ok, Pointer, Size} = couch_file:append_term( - Fd, {NodeType, ANodeList}, [{compression, Comp}]), - {LastKey, _} = lists:last(ANodeList), - SubTreeSize = reduce_tree_size(NodeType, Size, ANodeList), - {LastKey, {Pointer, reduce_node(Bt, NodeType, ANodeList), SubTreeSize}} - end - || - ANodeList <- NodeListList - ], - {ok, ResultList}. - - -write_node(Bt, _OldNode, NodeType, [], NewList) -> - write_node(Bt, NodeType, NewList); -write_node(Bt, _OldNode, NodeType, [_], NewList) -> - write_node(Bt, NodeType, NewList); -write_node(Bt, OldNode, NodeType, OldList, NewList) -> - case can_reuse_old_node(OldList, NewList) of - {true, Prefix, Suffix} -> - {ok, PrefixKVs} = case Prefix of - [] -> {ok, []}; - _ -> write_node(Bt, NodeType, Prefix) - end, - {ok, SuffixKVs} = case Suffix of - [] -> {ok, []}; - _ -> write_node(Bt, NodeType, Suffix) - end, - Result = PrefixKVs ++ [OldNode] ++ SuffixKVs, - {ok, Result}; - false -> - write_node(Bt, NodeType, NewList) - end. - -can_reuse_old_node(OldList, NewList) -> - {Prefix, RestNewList} = remove_prefix_kvs(hd(OldList), NewList), - case old_list_is_prefix(OldList, RestNewList, 0) of - {true, Size, Suffix} -> - ReuseThreshold = get_chunk_size() * ?FILL_RATIO, - if Size < ReuseThreshold -> false; true -> - {true, Prefix, Suffix} - end; - false -> - false - end. - -remove_prefix_kvs(KV1, [KV2 | Rest]) when KV2 < KV1 -> - {Prefix, RestNewList} = remove_prefix_kvs(KV1, Rest), - {[KV2 | Prefix], RestNewList}; -remove_prefix_kvs(_, RestNewList) -> - {[], RestNewList}. - -% No more KV's in the old node so its a prefix -old_list_is_prefix([], Suffix, Size) -> - {true, Size, Suffix}; -% Some KV's have been removed from the old node -old_list_is_prefix(_OldList, [], _Size) -> - false; -% KV is equal in both old and new node so continue -old_list_is_prefix([KV | Rest1], [KV | Rest2], Acc) -> - old_list_is_prefix(Rest1, Rest2, ?term_size(KV) + Acc); -% KV mismatch between old and new node so not a prefix -old_list_is_prefix(_OldList, _NewList, _Acc) -> - false. - -modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) -> - modify_node(Bt, nil, Actions, QueryOutput); -modify_kpnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) -> - {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, - tuple_size(NodeTuple), [])), QueryOutput}; -modify_kpnode(Bt, NodeTuple, LowerBound, - [{_, FirstActionKey, _}|_]=Actions, ResultNode, QueryOutput) -> - Sz = tuple_size(NodeTuple), - N = find_first_gteq(Bt, NodeTuple, LowerBound, Sz, FirstActionKey), - case N =:= Sz of - true -> - % perform remaining actions on last node - {_, PointerInfo} = element(Sz, NodeTuple), - {ok, ChildKPs, QueryOutput2} = - modify_node(Bt, PointerInfo, Actions, QueryOutput), - NodeList = lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, - Sz - 1, ChildKPs)), - {ok, NodeList, QueryOutput2}; - false -> - {NodeKey, PointerInfo} = element(N, NodeTuple), - SplitFun = fun({_ActionType, ActionKey, _ActionValue}) -> - not less(Bt, NodeKey, ActionKey) - end, - {LessEqQueries, GreaterQueries} = lists:splitwith(SplitFun, Actions), - {ok, ChildKPs, QueryOutput2} = - modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput), - ResultNode2 = lists:reverse(ChildKPs, bounded_tuple_to_revlist(NodeTuple, - LowerBound, N - 1, ResultNode)), - modify_kpnode(Bt, NodeTuple, N+1, GreaterQueries, ResultNode2, QueryOutput2) - end. - -bounded_tuple_to_revlist(_Tuple, Start, End, Tail) when Start > End -> - Tail; -bounded_tuple_to_revlist(Tuple, Start, End, Tail) -> - bounded_tuple_to_revlist(Tuple, Start+1, End, [element(Start, Tuple)|Tail]). - -bounded_tuple_to_list(Tuple, Start, End, Tail) -> - bounded_tuple_to_list2(Tuple, Start, End, [], Tail). - -bounded_tuple_to_list2(_Tuple, Start, End, Acc, Tail) when Start > End -> - lists:reverse(Acc, Tail); -bounded_tuple_to_list2(Tuple, Start, End, Acc, Tail) -> - bounded_tuple_to_list2(Tuple, Start + 1, End, [element(Start, Tuple) | Acc], Tail). - -find_first_gteq(_Bt, _Tuple, Start, End, _Key) when Start == End -> - End; -find_first_gteq(Bt, Tuple, Start, End, Key) -> - Mid = Start + ((End - Start) div 2), - {TupleKey, _} = element(Mid, Tuple), - case less(Bt, TupleKey, Key) of - true -> - find_first_gteq(Bt, Tuple, Mid+1, End, Key); - false -> - find_first_gteq(Bt, Tuple, Start, Mid, Key) - end. - -modify_kvnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) -> - {ok, lists:reverse(ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), [])), QueryOutput}; -modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, QueryOutput) when LowerBound > tuple_size(NodeTuple) -> - case ActionType of - insert -> - modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); - remove -> - % just drop the action - modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput); - fetch -> - % the key/value must not exist in the tree - modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput]) - end; -modify_kvnode(Bt, NodeTuple, LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], AccNode, QueryOutput) -> - N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), ActionKey), - {Key, Value} = element(N, NodeTuple), - ResultNode = bounded_tuple_to_revlist(NodeTuple, LowerBound, N - 1, AccNode), - case less(Bt, ActionKey, Key) of - true -> - case ActionType of - insert -> - % ActionKey is less than the Key, so insert - modify_kvnode(Bt, NodeTuple, N, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); - remove -> - % ActionKey is less than the Key, just drop the action - modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput); - fetch -> - % ActionKey is less than the Key, the key/value must not exist in the tree - modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{not_found, {ActionKey, nil}} | QueryOutput]) - end; - false -> - % ActionKey and Key are maybe equal. - case less(Bt, Key, ActionKey) of - false -> - case ActionType of - insert -> - modify_kvnode(Bt, NodeTuple, N+1, RestActions, [{ActionKey, ActionValue} | ResultNode], QueryOutput); - remove -> - modify_kvnode(Bt, NodeTuple, N+1, RestActions, ResultNode, QueryOutput); - fetch -> - % ActionKey is equal to the Key, insert into the QueryOuput, but re-process the node - % since an identical action key can follow it. - modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [{ok, assemble(Bt, Key, Value)} | QueryOutput]) - end; - true -> - modify_kvnode(Bt, NodeTuple, N + 1, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], QueryOutput) - end - end. - - -reduce_stream_node(_Bt, _Dir, nil, _KeyStart, _InEndRangeFun, GroupedKey, GroupedKVsAcc, - GroupedRedsAcc, _KeyGroupFun, _Fun, Acc) -> - {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}; -reduce_stream_node(Bt, Dir, Node, KeyStart, InEndRangeFun, GroupedKey, GroupedKVsAcc, - GroupedRedsAcc, KeyGroupFun, Fun, Acc) -> - P = element(1, Node), - case get_node(Bt, P) of - {kp_node, NodeList} -> - NodeList2 = adjust_dir(Dir, NodeList), - reduce_stream_kp_node(Bt, Dir, NodeList2, KeyStart, InEndRangeFun, GroupedKey, - GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc); - {kv_node, KVs} -> - KVs2 = adjust_dir(Dir, KVs), - reduce_stream_kv_node(Bt, Dir, KVs2, KeyStart, InEndRangeFun, GroupedKey, - GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc) - end. - -reduce_stream_kv_node(Bt, Dir, KVs, KeyStart, InEndRangeFun, - GroupedKey, GroupedKVsAcc, GroupedRedsAcc, - KeyGroupFun, Fun, Acc) -> - - GTEKeyStartKVs = - case KeyStart of - undefined -> - KVs; - _ -> - DropFun = case Dir of - fwd -> - fun({Key, _}) -> less(Bt, Key, KeyStart) end; - rev -> - fun({Key, _}) -> less(Bt, KeyStart, Key) end - end, - lists:dropwhile(DropFun, KVs) - end, - KVs2 = lists:takewhile( - fun({Key, _}) -> InEndRangeFun(Key) end, GTEKeyStartKVs), - reduce_stream_kv_node2(Bt, KVs2, GroupedKey, GroupedKVsAcc, GroupedRedsAcc, - KeyGroupFun, Fun, Acc). - - -reduce_stream_kv_node2(_Bt, [], GroupedKey, GroupedKVsAcc, GroupedRedsAcc, - _KeyGroupFun, _Fun, Acc) -> - {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}; -reduce_stream_kv_node2(Bt, [{Key, Value}| RestKVs], GroupedKey, GroupedKVsAcc, - GroupedRedsAcc, KeyGroupFun, Fun, Acc) -> - case GroupedKey of - undefined -> - reduce_stream_kv_node2(Bt, RestKVs, Key, - [assemble(Bt,Key,Value)], [], KeyGroupFun, Fun, Acc); - _ -> - - case KeyGroupFun(GroupedKey, Key) of - true -> - reduce_stream_kv_node2(Bt, RestKVs, GroupedKey, - [assemble(Bt,Key,Value)|GroupedKVsAcc], GroupedRedsAcc, KeyGroupFun, - Fun, Acc); - false -> - case Fun(GroupedKey, {GroupedKVsAcc, GroupedRedsAcc}, Acc) of - {ok, Acc2} -> - reduce_stream_kv_node2(Bt, RestKVs, Key, [assemble(Bt,Key,Value)], - [], KeyGroupFun, Fun, Acc2); - {stop, Acc2} -> - throw({stop, Acc2}) - end - end - end. - -reduce_stream_kp_node(Bt, Dir, NodeList, KeyStart, InEndRangeFun, - GroupedKey, GroupedKVsAcc, GroupedRedsAcc, - KeyGroupFun, Fun, Acc) -> - Nodes = - case KeyStart of - undefined -> - NodeList; - _ -> - case Dir of - fwd -> - lists:dropwhile(fun({Key, _}) -> less(Bt, Key, KeyStart) end, NodeList); - rev -> - RevKPs = lists:reverse(NodeList), - case lists:splitwith(fun({Key, _}) -> less(Bt, Key, KeyStart) end, RevKPs) of - {_Before, []} -> - NodeList; - {Before, [FirstAfter | _]} -> - [FirstAfter | lists:reverse(Before)] - end - end - end, - {InRange, MaybeInRange} = lists:splitwith( - fun({Key, _}) -> InEndRangeFun(Key) end, Nodes), - NodesInRange = case MaybeInRange of - [FirstMaybeInRange | _] when Dir =:= fwd -> - InRange ++ [FirstMaybeInRange]; - _ -> - InRange - end, - reduce_stream_kp_node2(Bt, Dir, NodesInRange, KeyStart, InEndRangeFun, - GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc). - - -reduce_stream_kp_node2(Bt, Dir, [{_Key, NodeInfo} | RestNodeList], KeyStart, InEndRangeFun, - undefined, [], [], KeyGroupFun, Fun, Acc) -> - {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = - reduce_stream_node(Bt, Dir, NodeInfo, KeyStart, InEndRangeFun, undefined, - [], [], KeyGroupFun, Fun, Acc), - reduce_stream_kp_node2(Bt, Dir, RestNodeList, KeyStart, InEndRangeFun, GroupedKey2, - GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2); -reduce_stream_kp_node2(Bt, Dir, NodeList, KeyStart, InEndRangeFun, - GroupedKey, GroupedKVsAcc, GroupedRedsAcc, KeyGroupFun, Fun, Acc) -> - {Grouped0, Ungrouped0} = lists:splitwith(fun({Key,_}) -> - KeyGroupFun(GroupedKey, Key) end, NodeList), - {GroupedNodes, UngroupedNodes} = - case Grouped0 of - [] -> - {Grouped0, Ungrouped0}; - _ -> - [FirstGrouped | RestGrouped] = lists:reverse(Grouped0), - {RestGrouped, [FirstGrouped | Ungrouped0]} - end, - GroupedReds = [element(2, Node) || {_, Node} <- GroupedNodes], - case UngroupedNodes of - [{_Key, NodeInfo}|RestNodes] -> - {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = - reduce_stream_node(Bt, Dir, NodeInfo, KeyStart, InEndRangeFun, GroupedKey, - GroupedKVsAcc, GroupedReds ++ GroupedRedsAcc, KeyGroupFun, Fun, Acc), - reduce_stream_kp_node2(Bt, Dir, RestNodes, KeyStart, InEndRangeFun, GroupedKey2, - GroupedKVsAcc2, GroupedRedsAcc2, KeyGroupFun, Fun, Acc2); - [] -> - {ok, Acc, GroupedReds ++ GroupedRedsAcc, GroupedKVsAcc, GroupedKey} - end. - -adjust_dir(fwd, List) -> - List; -adjust_dir(rev, List) -> - lists:reverse(List). - -stream_node(Bt, Reds, Node, StartKey, InRange, Dir, Fun, Acc) -> - Pointer = element(1, Node), - {NodeType, NodeList} = get_node(Bt, Pointer), - case NodeType of - kp_node -> - stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc); - kv_node -> - stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc) - end. - -stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc) -> - Pointer = element(1, Node), - {NodeType, NodeList} = get_node(Bt, Pointer), - case NodeType of - kp_node -> - stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc); - kv_node -> - stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc) - end. - -stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc) -> - {ok, Acc}; -stream_kp_node(Bt, Reds, [{Key, Node} | Rest], InRange, Dir, Fun, Acc) -> - Red = element(2, Node), - case Fun(traverse, Key, Red, Acc) of - {ok, Acc2} -> - case stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc2) of - {ok, Acc3} -> - stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc3); - {stop, LastReds, Acc3} -> - {stop, LastReds, Acc3} - end; - {skip, Acc2} -> - stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2); - {stop, Acc2} -> - {stop, Reds, Acc2} - end. - -drop_nodes(_Bt, Reds, _StartKey, []) -> - {Reds, []}; -drop_nodes(Bt, Reds, StartKey, [{NodeKey, Node} | RestKPs]) -> - case less(Bt, NodeKey, StartKey) of - true -> - drop_nodes(Bt, [element(2, Node) | Reds], StartKey, RestKPs); - false -> - {Reds, [{NodeKey, Node} | RestKPs]} - end. - -stream_kp_node(Bt, Reds, KPs, StartKey, InRange, Dir, Fun, Acc) -> - {NewReds, NodesToStream} = - case Dir of - fwd -> - % drop all nodes sorting before the key - drop_nodes(Bt, Reds, StartKey, KPs); - rev -> - % keep all nodes sorting before the key, AND the first node to sort after - RevKPs = lists:reverse(KPs), - case lists:splitwith(fun({Key, _Pointer}) -> less(Bt, Key, StartKey) end, RevKPs) of - {_RevsBefore, []} -> - % everything sorts before it - {Reds, KPs}; - {RevBefore, [FirstAfter | Drop]} -> - {[element(2, Node) || {_K, Node} <- Drop] ++ Reds, - [FirstAfter | lists:reverse(RevBefore)]} - end - end, - case NodesToStream of - [] -> - {ok, Acc}; - [{_Key, Node} | Rest] -> - case stream_node(Bt, NewReds, Node, StartKey, InRange, Dir, Fun, Acc) of - {ok, Acc2} -> - Red = element(2, Node), - stream_kp_node(Bt, [Red | NewReds], Rest, InRange, Dir, Fun, Acc2); - {stop, LastReds, Acc2} -> - {stop, LastReds, Acc2} - end - end. - -stream_kv_node(Bt, Reds, KVs, StartKey, InRange, Dir, Fun, Acc) -> - DropFun = - case Dir of - fwd -> - fun({Key, _}) -> less(Bt, Key, StartKey) end; - rev -> - fun({Key, _}) -> less(Bt, StartKey, Key) end - end, - {LTKVs, GTEKVs} = lists:splitwith(DropFun, KVs), - AssembleLTKVs = [assemble(Bt,K,V) || {K,V} <- LTKVs], - stream_kv_node2(Bt, Reds, AssembleLTKVs, GTEKVs, InRange, Dir, Fun, Acc). - -stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _InRange, _Dir, _Fun, Acc) -> - {ok, Acc}; -stream_kv_node2(Bt, Reds, PrevKVs, [{K,V} | RestKVs], InRange, Dir, Fun, Acc) -> - case InRange(K) of - false -> - {stop, {PrevKVs, Reds}, Acc}; - true -> - AssembledKV = assemble(Bt, K, V), - case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of - {ok, Acc2} -> - stream_kv_node2(Bt, Reds, [AssembledKV | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2); - {stop, Acc2} -> - {stop, {PrevKVs, Reds}, Acc2} - end - end. |