summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2020-04-03 10:08:07 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2020-04-13 16:22:23 -0500
commit45458feb578272a8437a1ecd049ba8932ce4f230 (patch)
treed26740625463e53c9f6d143aa63fa6629eecca39
parent977f514c149f94b8d240b15c32f2d0fe879b5908 (diff)
downloadcouchdb-3.x-optimize-compactor.tar.gz
Update compaction progress during docid phases3.x-optimize-compactor
Previously, the sort and copy phases when handling document IDs were not measured in _active_tasks. This adds size tracking to give operators a way to measure progress during those phases. I'd like to thank Vitaly for the example in #1006 that showed a clean way for tracking the size info in `couch_emsort`. Co-Authored-By: Vitaly Goot <vitaly.goot@gmail.com>
-rw-r--r--src/couch/src/couch_bt_engine_compactor.erl26
-rw-r--r--src/couch/src/couch_emsort.erl102
2 files changed, 99 insertions, 29 deletions
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 92ccea575..4bed49c7c 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -318,6 +318,7 @@ copy_compact(#comp_st{} = CompSt) ->
TaskProps0 = [
{type, database_compaction},
{database, DbName},
+ {phase, document_copy},
{progress, 0},
{changes_done, 0},
{total_changes, TotalChanges}
@@ -326,6 +327,7 @@ copy_compact(#comp_st{} = CompSt) ->
true ->
couch_task_status:update([
{retry, true},
+ {phase, document_copy},
{progress, 0},
{changes_done, 0},
{total_changes, TotalChanges}
@@ -502,7 +504,16 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) ->
sort_meta_data(#comp_st{new_st = St0} = CompSt) ->
?COMP_EVENT(md_sort_init),
- {ok, Ems} = couch_emsort:merge(St0#st.id_tree),
+ NumKVs = couch_emsort:num_kvs(St0#st.id_tree),
+ NumMerges = couch_emsort:num_merges(St0#st.id_tree),
+ couch_task_status:update([
+ {phase, docid_sort},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, NumMerges * NumKVs}
+ ]),
+ Reporter = fun update_compact_task/1,
+ {ok, Ems} = couch_emsort:merge(St0#st.id_tree, Reporter),
?COMP_EVENT(md_sort_done),
CompSt#comp_st{
new_st = St0#st{
@@ -533,12 +544,20 @@ copy_meta_data(#comp_st{new_st = St} = CompSt) ->
locs=[]
},
?COMP_EVENT(md_copy_init),
+ NumKVs = couch_emsort:num_kvs(Src),
+ couch_task_status:update([
+ {phase, docid_copy},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, NumKVs}
+ ]),
Acc = merge_docids(Iter, Acc0),
{ok, Infos} = couch_file:pread_terms(SrcFd, Acc#merge_st.locs),
{ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Infos),
{ok, SeqTree} = couch_btree:add_remove(
Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
),
+ update_compact_task(NumKVs),
?COMP_EVENT(md_copy_done),
CompSt#comp_st{
new_st = St#st{
@@ -609,8 +628,10 @@ commit_compaction_data(#st{header = OldHeader} = St0, Fd) ->
bind_emsort(St, Fd, nil) ->
{ok, Ems} = couch_emsort:open(Fd),
St#st{id_tree=Ems};
+bind_emsort(St, Fd, State) when is_integer(State) ->
+ bind_emsort(St, Fd, [{root, State}]);
bind_emsort(St, Fd, State) ->
- {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
+ {ok, Ems} = couch_emsort:open(Fd, State),
St#st{id_tree=Ems}.
@@ -653,6 +674,7 @@ merge_docids(Iter, #merge_st{locs=Locs}=Acc) when length(Locs) > 1000 ->
rem_seqs=[],
locs=[]
},
+ update_compact_task(length(Locs)),
merge_docids(Iter, Acc1);
merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
case next_info(Iter, Curr, []) of
diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl
index 2a25a2322..430d94e01 100644
--- a/src/couch/src/couch_emsort.erl
+++ b/src/couch/src/couch_emsort.erl
@@ -130,17 +130,22 @@
%
-export([open/1, open/2, get_fd/1, get_state/1]).
--export([add/2, merge/1, sort/1, iter/1, next/1]).
-
+-export([add/2, merge/1, merge/2, sort/1, iter/1, next/1]).
+-export([num_kvs/1, num_merges/1]).
-record(ems, {
fd,
root,
bb_chunk = 10,
- chain_chunk = 100
+ chain_chunk = 100,
+ num_kvs = 0,
+ num_bb = 0
}).
+-define(REPORT_INTERVAL, 1000).
+
+
open(Fd) ->
{ok, #ems{fd=Fd}}.
@@ -156,22 +161,39 @@ set_options(Ems, [{root, Root} | Rest]) ->
set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) ->
set_options(Ems#ems{chain_chunk=Count}, Rest);
set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) ->
- set_options(Ems#ems{bb_chunk=Count}, Rest).
+ set_options(Ems#ems{bb_chunk=Count}, Rest);
+set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) ->
+ set_options(Ems#ems{num_kvs=NumKVs}, Rest);
+set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) ->
+ set_options(Ems#ems{num_bb=NumBB}, Rest).
get_fd(#ems{fd=Fd}) ->
Fd.
-get_state(#ems{root=Root}) ->
- Root.
+get_state(#ems{} = Ems) ->
+ #ems{
+ root = Root,
+ num_kvs = NumKVs,
+ num_bb = NumBB
+ } = Ems,
+ [
+ {root, Root},
+ {num_kvs, NumKVs},
+ {num_bb, NumBB}
+ ].
add(Ems, []) ->
{ok, Ems};
add(Ems, KVs) ->
Pos = write_kvs(Ems, KVs),
- {ok, add_bb_pos(Ems, Pos)}.
+ NewEms = add_bb_pos(Ems, Pos),
+ {ok, NewEms#ems{
+ num_kvs = Ems#ems.num_kvs + length(KVs),
+ num_bb = Ems#ems.num_bb + 1
+ }}.
sort(#ems{}=Ems) ->
@@ -179,10 +201,14 @@ sort(#ems{}=Ems) ->
iter(Ems1).
-merge(#ems{root=undefined}=Ems) ->
+merge(Ems) ->
+ merge(Ems, fun(_) -> ok end).
+
+
+merge(#ems{root=undefined}=Ems, _Reporter) ->
{ok, Ems};
-merge(#ems{}=Ems) ->
- {ok, decimate(Ems)}.
+merge(#ems{}=Ems, Reporter) ->
+ {ok, decimate(Ems, Reporter)}.
iter(#ems{root=undefined}=Ems) ->
@@ -201,6 +227,13 @@ next({Ems, Chains}) ->
{ok, KV, {Ems, RestChains}}.
+num_kvs(#ems{num_kvs=NumKVs}) ->
+ NumKVs.
+
+num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) ->
+ num_merges(BBChunk, NumBB).
+
+
add_bb_pos(#ems{root=undefined}=Ems, Pos) ->
Ems#ems{root={[Pos], nil}};
add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) ->
@@ -220,11 +253,11 @@ write_kvs(Ems, KVs) ->
Final.
-decimate(#ems{root={_BB, nil}}=Ems) ->
+decimate(#ems{root={_BB, nil}}=Ems, _Reporter) ->
% We have less than bb_chunk backbone pointers so we're
% good to start streaming KV's back to the client.
Ems;
-decimate(#ems{root={BB, NextBB}}=Ems) ->
+decimate(#ems{root={BB, NextBB}}=Ems, Reporter) ->
% To make sure we have a bounded amount of data in RAM
% at any given point we first need to decimate the data
% by performing the first couple iterations of a merge
@@ -232,43 +265,51 @@ decimate(#ems{root={BB, NextBB}}=Ems) ->
% The first pass gives us a sort with pointers linked from
% largest to smallest.
- {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB),
+ {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB, Reporter),
% We have to run a second pass so that links are pointed
% back from smallest to largest.
- {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB),
+ {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB, Reporter),
% Continue decimating until we have an acceptable bound on
% the number of keys to use.
- decimate(Ems#ems{root={FwdBB, FwdNextBB}}).
+ decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter).
-merge_back_bone(Ems, Choose, BB, NextBB) ->
- BBPos = merge_chains(Ems, Choose, BB),
- merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}).
+merge_back_bone(Ems, Choose, BB, NextBB, Reporter) ->
+ BBPos = merge_chains(Ems, Choose, BB, Reporter),
+ Reporter(length(BB)),
+ merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter).
-merge_rest_back_bone(_Ems, _Choose, nil, Acc) ->
+merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) ->
Acc;
-merge_rest_back_bone(Ems, Choose, BBPos, Acc) ->
+merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) ->
{ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos),
- NewPos = merge_chains(Ems, Choose, BB),
+ NewPos = merge_chains(Ems, Choose, BB, Reporter),
{NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk),
- merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}).
+ merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter).
-merge_chains(Ems, Choose, BB) ->
+merge_chains(Ems, Choose, BB, Reporter) ->
Chains = init_chains(Ems, Choose, BB),
- merge_chains(Ems, Choose, Chains, {[], nil}).
+ merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0).
-merge_chains(Ems, _Choose, [], ChainAcc) ->
+merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) ->
{ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc),
CPos;
-merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) ->
+merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) ->
{KV, RestChains} = choose_kv(Choose, Ems, Chains),
{NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC),
- merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}).
+ Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of
+ 0 ->
+ Reporter(Count0),
+ 0;
+ _ ->
+ Count0 + 1
+ end,
+ merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1).
init_chains(Ems, Choose, BB) ->
@@ -316,3 +357,10 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size ->
append_item(_Ems, {List, Prev}, Pos, _Size) ->
{[Pos | List], Prev}.
+
+num_merges(BBChunk, NumBB) when NumBB =< BBChunk ->
+ 0;
+num_merges(BBChunk, NumBB) when NumBB > BBChunk ->
+ RevNumBB = ceil(NumBB / BBChunk),
+ FwdNumBB = ceil(RevNumBB / BBChunk),
+ 2 + num_merges(BBChunk, FwdNumBB).