summaryrefslogtreecommitdiff
path: root/src/couch/src/couch_emsort.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/src/couch_emsort.erl')
-rw-r--r--src/couch/src/couch_emsort.erl102
1 files changed, 75 insertions, 27 deletions
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl
index 2a25a2322..430d94e01 100644
--- a/src/couch/src/couch_emsort.erl
+++ b/src/couch/src/couch_emsort.erl
@@ -130,17 +130,22 @@
%
-export([open/1, open/2, get_fd/1, get_state/1]).
--export([add/2, merge/1, sort/1, iter/1, next/1]).
-
+-export([add/2, merge/1, merge/2, sort/1, iter/1, next/1]).
+-export([num_kvs/1, num_merges/1]).
-record(ems, {
fd,
root,
bb_chunk = 10,
- chain_chunk = 100
+ chain_chunk = 100,
+ num_kvs = 0,
+ num_bb = 0
}).
+-define(REPORT_INTERVAL, 1000).
+
+
open(Fd) ->
{ok, #ems{fd=Fd}}.
@@ -156,22 +161,39 @@ set_options(Ems, [{root, Root} | Rest]) ->
set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
set_options(Ems#ems{chain_chunk=Count}, Rest);
set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
- set_options(Ems#ems{bb_chunk=Count}, Rest).
+ set_options(Ems#ems{bb_chunk=Count}, Rest);
+set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) ->
+ set_options(Ems#ems{num_kvs=NumKVs}, Rest);
+set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) ->
+ set_options(Ems#ems{num_bb=NumBB}, Rest).
get_fd(#ems{fd=Fd}) ->
Fd.
-get_state(#ems{root=Root}) ->
- Root.
+get_state(#ems{} = Ems) ->
+ #ems{
+ root = Root,
+ num_kvs = NumKVs,
+ num_bb = NumBB
+ } = Ems,
+ [
+ {root, Root},
+ {num_kvs, NumKVs},
+ {num_bb, NumBB}
+ ].
add(Ems, []) ->
{ok, Ems};
add(Ems, KVs) ->
Pos = write_kvs(Ems, KVs),
- {ok, add_bb_pos(Ems, Pos)}.
+ NewEms = add_bb_pos(Ems, Pos),
+ {ok, NewEms#ems{
+ num_kvs = Ems#ems.num_kvs + length(KVs),
+ num_bb = Ems#ems.num_bb + 1
+ }}.
sort(#ems{}=Ems) ->
@@ -179,10 +201,14 @@ sort(#ems{}=Ems) ->
iter(Ems1).
-merge(#ems{root=undefined}=Ems) ->
+merge(Ems) ->
+ merge(Ems, fun(_) -> ok end).
+
+
+merge(#ems{root=undefined}=Ems, _Reporter) ->
{ok, Ems};
-merge(#ems{}=Ems) ->
- {ok, decimate(Ems)}.
+merge(#ems{}=Ems, Reporter) ->
+ {ok, decimate(Ems, Reporter)}.
iter(#ems{root=undefined}=Ems) ->
@@ -201,6 +227,13 @@ next({Ems, Chains}) ->
{ok, KV, {Ems, RestChains}}.
+num_kvs(#ems{num_kvs=NumKVs}) ->
+ NumKVs.
+
+num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) ->
+ num_merges(BBChunk, NumBB).
+
+
add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
Ems#ems{root={[Pos], nil}};
add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
@@ -220,11 +253,11 @@ write_kvs(Ems, KVs) ->
Final.
-decimate(#ems{root={_BB, nil}}=Ems) ->
+decimate(#ems{root={_BB, nil}}=Ems, _Reporter) ->
% We have less than bb_chunk backbone pointers so we're
% good to start streaming KV's back to the client.
Ems;
-decimate(#ems{root={BB, NextBB}}=Ems) ->
+decimate(#ems{root={BB, NextBB}}=Ems, Reporter) ->
% To make sure we have a bounded amount of data in RAM
% at any given point we first need to decimate the data
% by performing the first couple iterations of a merge
@@ -232,43 +265,51 @@ decimate(#ems{root={BB, NextBB}}=Ems) ->
% The first pass gives us a sort with pointers linked from
% largest to smallest.
- {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+ {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB, Reporter),
% We have to run a second pass so that links are pointed
% back from smallest to largest.
- {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+ {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB, Reporter),
% Continue deicmating until we have an acceptable bound on
% the number of keys to use.
- decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+ decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter).
-merge_back_bone(Ems, Choose, BB, NextBB) ->
- BBPos = merge_chains(Ems, Choose, BB),
- merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}).
+merge_back_bone(Ems, Choose, BB, NextBB, Reporter) ->
+ BBPos = merge_chains(Ems, Choose, BB, Reporter),
+ Reporter(length(BB)),
+ merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter).
-merge_rest_back_bone(_Ems, _Choose, nil, Acc) ->
+merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) ->
Acc;
-merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
+merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) ->
{ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
- NewPos = merge_chains(Ems, Choose, BB),
+ NewPos = merge_chains(Ems, Choose, BB, Reporter),
{NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
- merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}).
+ merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter).
-merge_chains(Ems, Choose, BB) ->
+merge_chains(Ems, Choose, BB, Reporter) ->
Chains = init_chains(Ems, Choose, BB),
- merge_chains(Ems, Choose, Chains, {[], nil}).
+ merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0).
-merge_chains(Ems, _Choose, [], ChainAcc) ->
+merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) ->
{ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
CPos;
-merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) ->
+merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) ->
{KV, RestChains} = choose_kv(Choose, Ems, Chains),
{NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
- merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}).
+ Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of
+ 0 ->
+ Reporter(Count0),
+ 0;
+ _ ->
+ Count0 + 1
+ end,
+ merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1).
init_chains(Ems, Choose, BB) ->
@@ -316,3 +357,10 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
append_item(_Ems, {List, Prev}, Pos, _Size) ->
{[Pos | List], Prev}.
+
+num_merges(BBChunk, NumBB) when NumBB =< BBChunk ->
+ 0;
+num_merges(BBChunk, NumBB) when NumBB > BBChunk ->
+ RevNumBB = ceil(NumBB / BBChunk),
+ FwdNumBB = ceil(RevNumBB / BBChunk),
+ 2 + num_merges(BBChunk, FwdNumBB).