summaryrefslogtreecommitdiff
path: root/src/couch/src/couch_emsort.erl
blob: 430d94e01288c217d999e2aaaf1c144eeb554060 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_emsort).

% This is an implementation of an external N-way merge sort. It's primary
% purpose is to be used during database compaction as an optimization for
% managing the docid btree.
%
% Trunk currently writes the docid btree as its compacting the database but
% this is quite inneficient as its written out of order in the general case
% as writes are ordered by update_seq.
%
% The general design of this module is a very standard merge sort with one
% caveat due to append only files. This is described in more detail in the
% sorting phase.
%
% The basic algorithm is in two halves. The first half stores KV pairs to disk
% which is then followed by the actual sorting phase that streams KV's back
% to the client using a fold-like function. After some basic definitions we'll
% describe both phases.
%
% Key/Value apairs (aka, KV pairs, or KVs) are simply lists of two-tuples with
% a key as the first element and an arbitrary value as the second. The key of
% this pair is what used to determine the sort order based on native Erlang
% term comparison.
%
% Internally, KVs are stored as lists with a max size defined by
% #ems.chain_chunk. These lists are then chained together on disk using disk
% offsets as a poor man's linked list. The basic format of a list looks like
% {KVs, DiskOffset} where DiskOffset is either the atom nil which means "end
% of the list" or an integer that is a file position offset that is the
% location of another {KVs, DiskOffset} term. The head of each list is
% referred to with a single DiskOffset. The set of terms that extend from
% this initial DiskOffset to the last {KVs, nil} term is referred to in the
% code as a chain. Two important facts are that one call to couch_emsort:add/2
% creates a single chain, and that a chain is always sorted on disk (though its
% possible to be sorted in descending order which will be discussed later).
%
% The second major internal structure is the back bone. This is a list of
% chains that has a quite similar structure to chains but contains different
% data types and has no guarantee on ordering. The back bone is merely the
% list of all head DiskOffsets. The structure has the similar structure of
% {DiskOffsets, DiskOffset} that we use for chains, except that DiskOffsets is
% a list of integers that refer to the heads of chains. The maximum size of
% DiskOffsets is defined by #ems.bb_chunk. It is important to note that the
% backbone has no defined ordering. The other thing of note is that the RAM
% bounds are loosely defined as:
%
%     #ems.bb_chunk * #ems.chain_chunk * avg_size(KV).
%
% Build Phase
% -----------
%
% As mentioned, each call to couch_emsort:add/2 creates a chain from the
% list of KVs that are passed in. This list is first sorted and then the
% chain is created by foldr-ing (note: r) across the list to build the
% chain on disk. It is important to note that the final chain is then
% sorted in ascending order on disk.
%
%
% Sort Phase
% ----------
%
% The sort phase is where the merge sort kicks in. This is generally your
% average merge sort with a caveat for append only storage. First the
% general outline.
%
% The general outline for this sort is that it iteratively merges chains
% in the backbone until less than #ems.bb_chunk chains exist. At this
% point it switches to the last merge sort phase where it just streams
% the sorted KVs back to the client using a fold function.
%
% The general chain merging is a pretty standard merge sort. You load up
% the initial KVs from each phase, pick the next one in sort order and
% then when you run out of KVs you're left with a single DiskOffset for
% the head of a single chain that represents the merge. These new
% DiskOffsets are used to build the new back bone.
%
% The one caveat here is that we're using append only storage. This is
% important because once we make a pass we've effectively reversed the
% sort order of each chain. Ie, the first merge results in chains that
% are ordered in descending order. Since, one pass reverses the list
% the trick is that each phase does two passes. The first phase picks
% the smallest KV to write next and the second phase picks the largest.
% In this manner each time we do a back bone merge we end up with chains
% that are always sorted in an ascending order.
%
% The one downfall is that in the interest of simplicity the sorting is
% restricted to Erlang's native term sorting. A possible extension would
% be to allow two comparison functions to be used, but this module is
% currently only used for docid sorting which is hardcoded to be raw
% Erlang ordering.
%
% Diagram
% -------
%
% If it helps, this is a general diagram of the internal structures. A
% couple points to note since this is ASCII art. The BB pointers across
% the top are lists of chains going down. Each BBN item is one of the
% {DiskOffsets, DiskOffset} structures discussed earlier. Going down,
% the CMN nodes are actually representing #ems.bb_chunk chains in parallel
% going off the back bone. It is important and not represented in this
% diagram that within these groups the chains don't have to be the same
% length. That's just a limitiationg of my ASCII artistic abilities.
%
% The BBN* node is marked with a * to denote that it is the only state
% that we store when writing headeres to disk as it has pointers that
% lead us to all data in the tree.
%
%     BB1 <- BB2 <- BB3 <- BBN*
%      |      |      |      |
%      v      v      v      v
%     CA1    CB1    CC1    CD1
%      |             |      |
%      v             v      v
%     CA2           CC2    CD2
%      |                    |
%      v                    v
%     CA3                  CD3
%

-export([open/1, open/2, get_fd/1, get_state/1]).
-export([add/2, merge/1, merge/2, sort/1, iter/1, next/1]).
-export([num_kvs/1, num_merges/1]).

-record(ems, {
    fd,
    root,
    bb_chunk = 10,
    chain_chunk = 100,
    num_kvs = 0,
    num_bb = 0
}).


-define(REPORT_INTERVAL, 1000).


open(Fd) ->
    {ok, #ems{fd=Fd}}.


open(Fd, Options) ->
    {ok, set_options(#ems{fd=Fd}, Options)}.


set_options(Ems, []) ->
    Ems;
set_options(Ems, [{root, Root} | Rest]) ->
    set_options(Ems#ems{root=Root}, Rest);
set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
    set_options(Ems#ems{chain_chunk=Count}, Rest);
set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
    set_options(Ems#ems{bb_chunk=Count}, Rest);
set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) ->
    set_options(Ems#ems{num_kvs=NumKVs}, Rest);
set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) ->
    set_options(Ems#ems{num_bb=NumBB}, Rest).


get_fd(#ems{fd=Fd}) ->
    Fd.


get_state(#ems{} = Ems) ->
    #ems{
        root = Root,
        num_kvs = NumKVs,
        num_bb = NumBB
    } = Ems,
    [
        {root, Root},
        {num_kvs, NumKVs},
        {num_bb, NumBB}
    ].


add(Ems, []) ->
    {ok, Ems};
add(Ems, KVs) ->
    Pos = write_kvs(Ems, KVs),
    NewEms = add_bb_pos(Ems, Pos),
    {ok, NewEms#ems{
        num_kvs = Ems#ems.num_kvs + length(KVs),
        num_bb = Ems#ems.num_bb + 1
    }}.


sort(#ems{}=Ems) ->
    {ok, Ems1} = merge(Ems),
    iter(Ems1).


merge(Ems) ->
    merge(Ems, fun(_) -> ok end).


merge(#ems{root=undefined}=Ems, _Reporter) ->
    {ok, Ems};
merge(#ems{}=Ems, Reporter) ->
    {ok, decimate(Ems, Reporter)}.


iter(#ems{root=undefined}=Ems) ->
    {ok, {Ems, []}};
iter(#ems{root={BB, nil}}=Ems) ->
    Chains = init_chains(Ems, small, BB),
    {ok, {Ems, Chains}};
iter(#ems{root={_, _}}) ->
    {error, not_merged}.


next({_Ems, []}) ->
    finished;
next({Ems, Chains}) ->
    {KV, RestChains} = choose_kv(small, Ems, Chains),
    {ok, KV, {Ems, RestChains}}.


num_kvs(#ems{num_kvs=NumKVs}) ->
    NumKVs.

num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) ->
    num_merges(BBChunk, NumBB).


add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
    Ems#ems{root={[Pos], nil}};
add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
    {NewBB, NewPrev} = append_item(Ems, {BB, Prev}, Pos, Ems#ems.bb_chunk),
    Ems#ems{root={NewBB, NewPrev}}.


write_kvs(Ems, KVs) ->
    % Write the list of KV's to disk in sorted order in chunks
    % of 100. Also make sure that the order is so that they
    % can be streamed in asscending order.
    {LastKVs, LastPos} =
    lists:foldr(fun(KV, Acc) ->
        append_item(Ems, Acc, KV, Ems#ems.chain_chunk)
    end, {[], nil}, lists:sort(KVs)),
    {ok, Final, _} = couch_file:append_term(Ems#ems.fd, {LastKVs, LastPos}),
    Final.


decimate(#ems{root={_BB, nil}}=Ems, _Reporter) ->
    % We have less than bb_chunk backbone pointers so we're
    % good to start streaming KV's back to the client.
    Ems;
decimate(#ems{root={BB, NextBB}}=Ems, Reporter) ->
    % To make sure we have a bounded amount of data in RAM
    % at any given point we first need to decimate the data
    % by performing the first couple iterations of a merge
    % sort writing the intermediate results back to disk.

    % The first pass gives us a sort with pointers linked from
    % largest to smallest.
    {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB, Reporter),

    % We have to run a second pass so that links are pointed
    % back from smallest to largest.
    {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB, Reporter),

    % Continue deicmating until we have an acceptable bound on
    % the number of keys to use.
    decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter).


merge_back_bone(Ems, Choose, BB, NextBB, Reporter) ->
    BBPos = merge_chains(Ems, Choose, BB, Reporter),
    Reporter(length(BB)),
    merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter).


merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) ->
    Acc;
merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) ->
    {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
    NewPos = merge_chains(Ems, Choose, BB, Reporter),
    {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
    merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter).


merge_chains(Ems, Choose, BB, Reporter) ->
    Chains = init_chains(Ems, Choose, BB),
    merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0).


merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) ->
    {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
    CPos;
merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) ->
    {KV, RestChains} = choose_kv(Choose, Ems, Chains),
    {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
    Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of
        0 ->
            Reporter(Count0),
            0;
        _ ->
            Count0 + 1
    end,
    merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1).


init_chains(Ems, Choose, BB) ->
    Chains = lists:map(fun(CPos) ->
        {ok, {KVs, NextKVs}} = couch_file:pread_term(Ems#ems.fd, CPos),
        {KVs, NextKVs}
    end, BB),
    order_chains(Choose, Chains).


order_chains(small, Chains) -> lists:sort(Chains);
order_chains(big, Chains) -> lists:reverse(lists:sort(Chains)).


choose_kv(_Choose, _Ems, [{[KV], nil} | Rest]) ->
    {KV, Rest};
choose_kv(Choose, Ems, [{[KV], Pos} | RestChains]) ->
    {ok, Chain} = couch_file:pread_term(Ems#ems.fd, Pos),
    case Choose of
        small -> {KV, ins_small_chain(RestChains, Chain, [])};
        big -> {KV, ins_big_chain(RestChains, Chain, [])}
    end;
choose_kv(Choose, _Ems, [{[KV | RestKVs], Prev} | RestChains]) ->
    case Choose of
        small -> {KV, ins_small_chain(RestChains, {RestKVs, Prev}, [])};
        big -> {KV, ins_big_chain(RestChains, {RestKVs, Prev}, [])}
    end.


ins_small_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1<K2 ->
    ins_small_chain(Rest, C2, [C1 | Acc]);
ins_small_chain(Rest, Chain, Acc) ->
    lists:reverse(Acc, [Chain | Rest]).


ins_big_chain([{[{K1,_}|_],_}=C1|Rest], {[{K2,_}|_],_}=C2, Acc) when K1>K2 ->
    ins_big_chain(Rest, C2, [C1 | Acc]);
ins_big_chain(Rest, Chain, Acc) ->
    lists:reverse(Acc, [Chain | Rest]).


append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
    {ok, PrevList, _} = couch_file:append_term(Ems#ems.fd, {List, Prev}),
    {[Pos], PrevList};
append_item(_Ems, {List, Prev}, Pos, _Size) ->
    {[Pos | List], Prev}.


num_merges(BBChunk, NumBB) when NumBB =< BBChunk ->
    0;
num_merges(BBChunk, NumBB) when NumBB > BBChunk ->
    RevNumBB = ceil(NumBB / BBChunk),
    FwdNumBB = ceil(RevNumBB / BBChunk),
    2 + num_merges(BBChunk, FwdNumBB).