diff options
Diffstat (limited to 'src/couch/src/couch_emsort.erl')
-rw-r--r-- | src/couch/src/couch_emsort.erl | 102 |
1 files changed, 75 insertions, 27 deletions
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl index 2a25a2322..430d94e01 100644 --- a/src/couch/src/couch_emsort.erl +++ b/src/couch/src/couch_emsort.erl @@ -130,17 +130,22 @@ % -export([open/1, open/2, get_fd/1, get_state/1]). --export([add/2, merge/1, sort/1, iter/1, next/1]). - +-export([add/2, merge/1, merge/2, sort/1, iter/1, next/1]). +-export([num_kvs/1, num_merges/1]). -record(ems, { fd, root, bb_chunk = 10, - chain_chunk = 100 + chain_chunk = 100, + num_kvs = 0, + num_bb = 0 }). +-define(REPORT_INTERVAL, 1000). + + open(Fd) -> {ok, #ems{fd=Fd}}. @@ -156,22 +161,39 @@ set_options(Ems, [{root, Root} | Rest]) -> set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) -> set_options(Ems#ems{chain_chunk=Count}, Rest); set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) -> - set_options(Ems#ems{bb_chunk=Count}, Rest). + set_options(Ems#ems{bb_chunk=Count}, Rest); +set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) -> + set_options(Ems#ems{num_kvs=NumKVs}, Rest); +set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) -> + set_options(Ems#ems{num_bb=NumBB}, Rest). get_fd(#ems{fd=Fd}) -> Fd. -get_state(#ems{root=Root}) -> - Root. +get_state(#ems{} = Ems) -> + #ems{ + root = Root, + num_kvs = NumKVs, + num_bb = NumBB + } = Ems, + [ + {root, Root}, + {num_kvs, NumKVs}, + {num_bb, NumBB} + ]. add(Ems, []) -> {ok, Ems}; add(Ems, KVs) -> Pos = write_kvs(Ems, KVs), - {ok, add_bb_pos(Ems, Pos)}. + NewEms = add_bb_pos(Ems, Pos), + {ok, NewEms#ems{ + num_kvs = Ems#ems.num_kvs + length(KVs), + num_bb = Ems#ems.num_bb + 1 + }}. sort(#ems{}=Ems) -> @@ -179,10 +201,14 @@ sort(#ems{}=Ems) -> iter(Ems1). -merge(#ems{root=undefined}=Ems) -> +merge(Ems) -> + merge(Ems, fun(_) -> ok end). + + +merge(#ems{root=undefined}=Ems, _Reporter) -> {ok, Ems}; -merge(#ems{}=Ems) -> - {ok, decimate(Ems)}. +merge(#ems{}=Ems, Reporter) -> + {ok, decimate(Ems, Reporter)}. iter(#ems{root=undefined}=Ems) -> @@ -201,6 +227,13 @@ next({Ems, Chains}) -> {ok, KV, {Ems, RestChains}}. +num_kvs(#ems{num_kvs=NumKVs}) -> + NumKVs. + +num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) -> + num_merges(BBChunk, NumBB). + + add_bb_pos(#ems{root=undefined}=Ems, Pos) -> Ems#ems{root={[Pos], nil}}; add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) -> @@ -220,11 +253,11 @@ write_kvs(Ems, KVs) -> Final. -decimate(#ems{root={_BB, nil}}=Ems) -> +decimate(#ems{root={_BB, nil}}=Ems, _Reporter) -> % We have less than bb_chunk backbone pointers so we're % good to start streaming KV's back to the client. Ems; -decimate(#ems{root={BB, NextBB}}=Ems) -> +decimate(#ems{root={BB, NextBB}}=Ems, Reporter) -> % To make sure we have a bounded amount of data in RAM % at any given point we first need to decimate the data % by performing the first couple iterations of a merge @@ -232,43 +265,51 @@ decimate(#ems{root={BB, NextBB}}=Ems) -> % The first pass gives us a sort with pointers linked from % largest to smallest. - {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB), + {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB, Reporter), % We have to run a second pass so that links are pointed % back from smallest to largest. - {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB), + {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB, Reporter), % Continue deicmating until we have an acceptable bound on % the number of keys to use. - decimate(Ems#ems{root={FwdBB, FwdNextBB}}). + decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter). -merge_back_bone(Ems, Choose, BB, NextBB) -> - BBPos = merge_chains(Ems, Choose, BB), - merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}). +merge_back_bone(Ems, Choose, BB, NextBB, Reporter) -> + BBPos = merge_chains(Ems, Choose, BB, Reporter), + Reporter(length(BB)), + merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter). -merge_rest_back_bone(_Ems, _Choose, nil, Acc) -> +merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) -> Acc; -merge_rest_back_bone(Ems, Choose, BBPos, Acc) -> +merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) -> {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos), - NewPos = merge_chains(Ems, Choose, BB), + NewPos = merge_chains(Ems, Choose, BB, Reporter), {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk), - merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}). + merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter). -merge_chains(Ems, Choose, BB) -> +merge_chains(Ems, Choose, BB, Reporter) -> Chains = init_chains(Ems, Choose, BB), - merge_chains(Ems, Choose, Chains, {[], nil}). + merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0). -merge_chains(Ems, _Choose, [], ChainAcc) -> +merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) -> {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc), CPos; -merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) -> +merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) -> {KV, RestChains} = choose_kv(Choose, Ems, Chains), {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC), - merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}). + Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of + 0 -> + Reporter(Count0), + 0; + _ -> + Count0 + 1 + end, + merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1). init_chains(Ems, Choose, BB) -> @@ -316,3 +357,10 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size -> append_item(_Ems, {List, Prev}, Pos, _Size) -> {[Pos | List], Prev}. + +num_merges(BBChunk, NumBB) when NumBB =< BBChunk -> + 0; +num_merges(BBChunk, NumBB) when NumBB > BBChunk -> + RevNumBB = ceil(NumBB / BBChunk), + FwdNumBB = ceil(RevNumBB / BBChunk), + 2 + num_merges(BBChunk, FwdNumBB). |