diff options
Diffstat (limited to 'src/couch/src/couch_emsort.erl')
-rw-r--r-- | src/couch/src/couch_emsort.erl | 115 |
1 files changed, 48 insertions, 67 deletions
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl index 430d94e01..9dcc08d67 100644 --- a/src/couch/src/couch_emsort.erl +++ b/src/couch/src/couch_emsort.erl @@ -142,36 +142,30 @@ num_bb = 0 }). - -define(REPORT_INTERVAL, 1000). - open(Fd) -> - {ok, #ems{fd=Fd}}. - + {ok, #ems{fd = Fd}}. open(Fd, Options) -> - {ok, set_options(#ems{fd=Fd}, Options)}. - + {ok, set_options(#ems{fd = Fd}, Options)}. set_options(Ems, []) -> Ems; set_options(Ems, [{root, Root} | Rest]) -> - set_options(Ems#ems{root=Root}, Rest); + set_options(Ems#ems{root = Root}, Rest); set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) -> - set_options(Ems#ems{chain_chunk=Count}, Rest); + set_options(Ems#ems{chain_chunk = Count}, Rest); set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) -> - set_options(Ems#ems{bb_chunk=Count}, Rest); + set_options(Ems#ems{bb_chunk = Count}, Rest); set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) -> - set_options(Ems#ems{num_kvs=NumKVs}, Rest); + set_options(Ems#ems{num_kvs = NumKVs}, Rest); set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) -> - set_options(Ems#ems{num_bb=NumBB}, Rest). + set_options(Ems#ems{num_bb = NumBB}, Rest). - -get_fd(#ems{fd=Fd}) -> +get_fd(#ems{fd = Fd}) -> Fd. - get_state(#ems{} = Ems) -> #ems{ root = Root, @@ -184,7 +178,6 @@ get_state(#ems{} = Ems) -> {num_bb, NumBB} ]. - add(Ems, []) -> {ok, Ems}; add(Ems, KVs) -> @@ -195,69 +188,64 @@ add(Ems, KVs) -> num_bb = Ems#ems.num_bb + 1 }}. - -sort(#ems{}=Ems) -> +sort(#ems{} = Ems) -> {ok, Ems1} = merge(Ems), iter(Ems1). - merge(Ems) -> merge(Ems, fun(_) -> ok end). - -merge(#ems{root=undefined}=Ems, _Reporter) -> +merge(#ems{root = undefined} = Ems, _Reporter) -> {ok, Ems}; -merge(#ems{}=Ems, Reporter) -> +merge(#ems{} = Ems, Reporter) -> {ok, decimate(Ems, Reporter)}. - -iter(#ems{root=undefined}=Ems) -> +iter(#ems{root = undefined} = Ems) -> {ok, {Ems, []}}; -iter(#ems{root={BB, nil}}=Ems) -> +iter(#ems{root = {BB, nil}} = Ems) -> Chains = init_chains(Ems, small, BB), {ok, {Ems, Chains}}; -iter(#ems{root={_, _}}) -> +iter(#ems{root = {_, _}}) -> {error, not_merged}. - next({_Ems, []}) -> finished; next({Ems, Chains}) -> {KV, RestChains} = choose_kv(small, Ems, Chains), {ok, KV, {Ems, RestChains}}. - -num_kvs(#ems{num_kvs=NumKVs}) -> +num_kvs(#ems{num_kvs = NumKVs}) -> NumKVs. -num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) -> +num_merges(#ems{bb_chunk = BBChunk, num_bb = NumBB}) -> num_merges(BBChunk, NumBB). - -add_bb_pos(#ems{root=undefined}=Ems, Pos) -> - Ems#ems{root={[Pos], nil}}; -add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) -> +add_bb_pos(#ems{root = undefined} = Ems, Pos) -> + Ems#ems{root = {[Pos], nil}}; +add_bb_pos(#ems{root = {BB, Prev}} = Ems, Pos) -> {NewBB, NewPrev} = append_item(Ems, {BB, Prev}, Pos, Ems#ems.bb_chunk), - Ems#ems{root={NewBB, NewPrev}}. - + Ems#ems{root = {NewBB, NewPrev}}. write_kvs(Ems, KVs) -> % Write the list of KV's to disk in sorted order in chunks % of 100. Also make sure that the order is so that they % can be streamed in asscending order. {LastKVs, LastPos} = - lists:foldr(fun(KV, Acc) -> - append_item(Ems, Acc, KV, Ems#ems.chain_chunk) - end, {[], nil}, lists:sort(KVs)), + lists:foldr( + fun(KV, Acc) -> + append_item(Ems, Acc, KV, Ems#ems.chain_chunk) + end, + {[], nil}, + lists:sort(KVs) + ), {ok, Final, _} = couch_file:append_term(Ems#ems.fd, {LastKVs, LastPos}), Final. - -decimate(#ems{root={_BB, nil}}=Ems, _Reporter) -> +decimate(#ems{root = {_BB, nil}} = Ems, _Reporter) -> % We have less than bb_chunk backbone pointers so we're % good to start streaming KV's back to the client. Ems; -decimate(#ems{root={BB, NextBB}}=Ems, Reporter) -> +decimate(#ems{root = {BB, NextBB}} = Ems, Reporter) -> % To make sure we have a bounded amount of data in RAM % at any given point we first need to decimate the data % by performing the first couple iterations of a merge @@ -273,15 +261,13 @@ decimate(#ems{root={BB, NextBB}}=Ems, Reporter) -> % Continue deicmating until we have an acceptable bound on % the number of keys to use. - decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter). - + decimate(Ems#ems{root = {FwdBB, FwdNextBB}}, Reporter). merge_back_bone(Ems, Choose, BB, NextBB, Reporter) -> BBPos = merge_chains(Ems, Choose, BB, Reporter), Reporter(length(BB)), merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter). - merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) -> Acc; merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) -> @@ -290,40 +276,39 @@ merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) -> {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk), merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter). - merge_chains(Ems, Choose, BB, Reporter) -> Chains = init_chains(Ems, Choose, BB), merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0). - merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) -> {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc), CPos; -merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) -> +merge_chains(#ems{chain_chunk = CC} = Ems, Choose, Chains, Acc, Reporter, Count0) -> {KV, RestChains} = choose_kv(Choose, Ems, Chains), {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC), - Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of - 0 -> - Reporter(Count0), - 0; - _ -> - Count0 + 1 - end, + Count1 = + case (Count0 + 1) rem ?REPORT_INTERVAL of + 0 -> + Reporter(Count0), + 0; + _ -> + Count0 + 1 + end, merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1). - init_chains(Ems, Choose, BB) -> - Chains = lists:map(fun(CPos) -> - {ok, {KVs, NextKVs}} = couch_file:pread_term(Ems#ems.fd, CPos), - {KVs, NextKVs} - end, BB), + Chains = lists:map( + fun(CPos) -> + {ok, {KVs, NextKVs}} = couch_file:pread_term(Ems#ems.fd, CPos), + {KVs, NextKVs} + end, + BB + ), order_chains(Choose, Chains). - order_chains(small, Chains) -> lists:sort(Chains); order_chains(big, Chains) -> lists:reverse(lists:sort(Chains)). - choose_kv(_Choose, _Ems, [{[KV], nil} | Rest]) -> {KV, Rest}; choose_kv(Choose, Ems, [{[KV], Pos} | RestChains]) -> @@ -338,26 +323,22 @@ choose_kv(Choose, _Ems, [{[KV | RestKVs], Prev} | RestChains]) -> big -> {KV, ins_big_chain(RestChains, {RestKVs, Prev}, [])} end. - -ins_small_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1<K2 -> +ins_small_chain([{[{K1, _} | _], _} = C1 | Rest], {[{K2, _} | _], _} = C2, Acc) when K1 < K2 -> ins_small_chain(Rest, C2, [C1 | Acc]); ins_small_chain(Rest, Chain, Acc) -> lists:reverse(Acc, [Chain | Rest]). - -ins_big_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1>K2 -> +ins_big_chain([{[{K1, _} | _], _} = C1 | Rest], {[{K2, _} | _], _} = C2, Acc) when K1 > K2 -> ins_big_chain(Rest, C2, [C1 | Acc]); ins_big_chain(Rest, Chain, Acc) -> lists:reverse(Acc, [Chain | Rest]). - append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size -> {ok, PrevList, _} = couch_file:append_term(Ems#ems.fd, {List, Prev}), {[Pos], PrevList}; append_item(_Ems, {List, Prev}, Pos, _Size) -> {[Pos | List], Prev}. - num_merges(BBChunk, NumBB) when NumBB =< BBChunk -> 0; num_merges(BBChunk, NumBB) when NumBB > BBChunk -> |