summary | refs | log | tree | commit | diff
path: root/src/couch/src/couch_emsort.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/src/couch_emsort.erl')
-rw-r--r-- src/couch/src/couch_emsort.erl 115
1 files changed, 48 insertions, 67 deletions
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl
index 430d94e01..9dcc08d67 100644
--- a/src/couch/src/couch_emsort.erl
+++ b/src/couch/src/couch_emsort.erl
@@ -142,36 +142,30 @@
num_bb = 0
}).
-
-define(REPORT_INTERVAL, 1000).
-
open(Fd) ->
- {ok, #ems{fd=Fd}}.
-
+ {ok, #ems{fd = Fd}}.
open(Fd, Options) ->
- {ok, set_options(#ems{fd=Fd}, Options)}.
-
+ {ok, set_options(#ems{fd = Fd}, Options)}.
set_options(Ems, []) ->
Ems;
set_options(Ems, [{root, Root} | Rest]) ->
- set_options(Ems#ems{root=Root}, Rest);
+ set_options(Ems#ems{root = Root}, Rest);
set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
- set_options(Ems#ems{chain_chunk=Count}, Rest);
+ set_options(Ems#ems{chain_chunk = Count}, Rest);
set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
- set_options(Ems#ems{bb_chunk=Count}, Rest);
+ set_options(Ems#ems{bb_chunk = Count}, Rest);
set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) ->
- set_options(Ems#ems{num_kvs=NumKVs}, Rest);
+ set_options(Ems#ems{num_kvs = NumKVs}, Rest);
set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) ->
- set_options(Ems#ems{num_bb=NumBB}, Rest).
+ set_options(Ems#ems{num_bb = NumBB}, Rest).
-
-get_fd(#ems{fd=Fd}) ->
+get_fd(#ems{fd = Fd}) ->
Fd.
-
get_state(#ems{} = Ems) ->
#ems{
root = Root,
@@ -184,7 +178,6 @@ get_state(#ems{} = Ems) ->
{num_bb, NumBB}
].
-
add(Ems, []) ->
{ok, Ems};
add(Ems, KVs) ->
@@ -195,69 +188,64 @@ add(Ems, KVs) ->
num_bb = Ems#ems.num_bb + 1
}}.
-
-sort(#ems{}=Ems) ->
+sort(#ems{} = Ems) ->
{ok, Ems1} = merge(Ems),
iter(Ems1).
-
merge(Ems) ->
merge(Ems, fun(_) -> ok end).
-
-merge(#ems{root=undefined}=Ems, _Reporter) ->
+merge(#ems{root = undefined} = Ems, _Reporter) ->
{ok, Ems};
-merge(#ems{}=Ems, Reporter) ->
+merge(#ems{} = Ems, Reporter) ->
{ok, decimate(Ems, Reporter)}.
-
-iter(#ems{root=undefined}=Ems) ->
+iter(#ems{root = undefined} = Ems) ->
{ok, {Ems, []}};
-iter(#ems{root={BB, nil}}=Ems) ->
+iter(#ems{root = {BB, nil}} = Ems) ->
Chains = init_chains(Ems, small, BB),
{ok, {Ems, Chains}};
-iter(#ems{root={_, _}}) ->
+iter(#ems{root = {_, _}}) ->
{error, not_merged}.
-
next({_Ems, []}) ->
finished;
next({Ems, Chains}) ->
{KV, RestChains} = choose_kv(small, Ems, Chains),
{ok, KV, {Ems, RestChains}}.
-
-num_kvs(#ems{num_kvs=NumKVs}) ->
+num_kvs(#ems{num_kvs = NumKVs}) ->
NumKVs.
-num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) ->
+num_merges(#ems{bb_chunk = BBChunk, num_bb = NumBB}) ->
num_merges(BBChunk, NumBB).
-
-add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
- Ems#ems{root={[Pos], nil}};
-add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
+add_bb_pos(#ems{root = undefined} = Ems, Pos) ->
+ Ems#ems{root = {[Pos], nil}};
+add_bb_pos(#ems{root = {BB, Prev}} = Ems, Pos) ->
{NewBB, NewPrev} = append_item(Ems, {BB, Prev}, Pos, Ems#ems.bb_chunk),
- Ems#ems{root={NewBB, NewPrev}}.
-
+ Ems#ems{root = {NewBB, NewPrev}}.
write_kvs(Ems, KVs) ->
% Write the list of KV's to disk in sorted order in chunks
% of 100. Also make sure that the order is so that they
% can be streamed in ascending order.
{LastKVs, LastPos} =
- lists:foldr(fun(KV, Acc) ->
- append_item(Ems, Acc, KV, Ems#ems.chain_chunk)
- end, {[], nil}, lists:sort(KVs)),
+ lists:foldr(
+ fun(KV, Acc) ->
+ append_item(Ems, Acc, KV, Ems#ems.chain_chunk)
+ end,
+ {[], nil},
+ lists:sort(KVs)
+ ),
{ok, Final, _} = couch_file:append_term(Ems#ems.fd, {LastKVs, LastPos}),
Final.
-
-decimate(#ems{root={_BB, nil}}=Ems, _Reporter) ->
+decimate(#ems{root = {_BB, nil}} = Ems, _Reporter) ->
% We have less than bb_chunk backbone pointers so we're
% good to start streaming KV's back to the client.
Ems;
-decimate(#ems{root={BB, NextBB}}=Ems, Reporter) ->
+decimate(#ems{root = {BB, NextBB}} = Ems, Reporter) ->
% To make sure we have a bounded amount of data in RAM
% at any given point we first need to decimate the data
% by performing the first couple iterations of a merge
@@ -273,15 +261,13 @@ decimate(#ems{root={BB, NextBB}}=Ems, Reporter) ->
% Continue decimating until we have an acceptable bound on
% the number of keys to use.
- decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter).
-
+ decimate(Ems#ems{root = {FwdBB, FwdNextBB}}, Reporter).
merge_back_bone(Ems, Choose, BB, NextBB, Reporter) ->
BBPos = merge_chains(Ems, Choose, BB, Reporter),
Reporter(length(BB)),
merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter).
-
merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) ->
Acc;
merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) ->
@@ -290,40 +276,39 @@ merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) ->
{NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter).
-
merge_chains(Ems, Choose, BB, Reporter) ->
Chains = init_chains(Ems, Choose, BB),
merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0).
-
merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) ->
{ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
CPos;
-merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) ->
+merge_chains(#ems{chain_chunk = CC} = Ems, Choose, Chains, Acc, Reporter, Count0) ->
{KV, RestChains} = choose_kv(Choose, Ems, Chains),
{NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
- Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of
- 0 ->
- Reporter(Count0),
- 0;
- _ ->
- Count0 + 1
- end,
+ Count1 =
+ case (Count0 + 1) rem ?REPORT_INTERVAL of
+ 0 ->
+ Reporter(Count0),
+ 0;
+ _ ->
+ Count0 + 1
+ end,
merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1).
-
init_chains(Ems, Choose, BB) ->
- Chains = lists:map(fun(CPos) ->
- {ok, {KVs, NextKVs}} = couch_file:pread_term(Ems#ems.fd, CPos),
- {KVs, NextKVs}
- end, BB),
+ Chains = lists:map(
+ fun(CPos) ->
+ {ok, {KVs, NextKVs}} = couch_file:pread_term(Ems#ems.fd, CPos),
+ {KVs, NextKVs}
+ end,
+ BB
+ ),
order_chains(Choose, Chains).
-
order_chains(small, Chains) -> lists:sort(Chains);
order_chains(big, Chains) -> lists:reverse(lists:sort(Chains)).
-
choose_kv(_Choose, _Ems, [{[KV], nil} | Rest]) ->
{KV, Rest};
choose_kv(Choose, Ems, [{[KV], Pos} | RestChains]) ->
@@ -338,26 +323,22 @@ choose_kv(Choose, _Ems, [{[KV | RestKVs], Prev} | RestChains]) ->
big -> {KV, ins_big_chain(RestChains, {RestKVs, Prev}, [])}
end.
-
-ins_small_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1<K2 ->
+ins_small_chain([{[{K1, _} | _], _} = C1 | Rest], {[{K2, _} | _], _} = C2, Acc) when K1 < K2 ->
ins_small_chain(Rest, C2, [C1 | Acc]);
ins_small_chain(Rest, Chain, Acc) ->
lists:reverse(Acc, [Chain | Rest]).
-
-ins_big_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1>K2 ->
+ins_big_chain([{[{K1, _} | _], _} = C1 | Rest], {[{K2, _} | _], _} = C2, Acc) when K1 > K2 ->
ins_big_chain(Rest, C2, [C1 | Acc]);
ins_big_chain(Rest, Chain, Acc) ->
lists:reverse(Acc, [Chain | Rest]).
-
append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
{ok, PrevList, _} = couch_file:append_term(Ems#ems.fd, {List, Prev}),
{[Pos], PrevList};
append_item(_Ems, {List, Prev}, Pos, _Size) ->
{[Pos | List], Prev}.
-
num_merges(BBChunk, NumBB) when NumBB =< BBChunk ->
0;
num_merges(BBChunk, NumBB) when NumBB > BBChunk ->