diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-04-03 10:08:07 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-04-13 16:22:23 -0500 |
commit | 45458feb578272a8437a1ecd049ba8932ce4f230 (patch) | |
tree | d26740625463e53c9f6d143aa63fa6629eecca39 | |
parent | 977f514c149f94b8d240b15c32f2d0fe879b5908 (diff) | |
download | couchdb-3.x-optimize-compactor.tar.gz |
Update compaction progress during docid phases [branch: 3.x-optimize-compactor]
Previously, the sort and copy phases that handle document IDs were not
measured in _active_tasks. This adds size tracking to give operators a
way to measure progress during those phases.
I'd like to thank Vitaly for the example in #1006 that showed a clean
way to track the size info in `couch_emsort`.
Co-Authored-By: Vitaly Goot <vitaly.goot@gmail.com>
-rw-r--r-- | src/couch/src/couch_bt_engine_compactor.erl | 26 | ||||
-rw-r--r-- | src/couch/src/couch_emsort.erl | 102 |
2 files changed, 99 insertions, 29 deletions
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl index 92ccea575..4bed49c7c 100644 --- a/src/couch/src/couch_bt_engine_compactor.erl +++ b/src/couch/src/couch_bt_engine_compactor.erl @@ -318,6 +318,7 @@ copy_compact(#comp_st{} = CompSt) -> TaskProps0 = [ {type, database_compaction}, {database, DbName}, + {phase, document_copy}, {progress, 0}, {changes_done, 0}, {total_changes, TotalChanges} @@ -326,6 +327,7 @@ copy_compact(#comp_st{} = CompSt) -> true -> couch_task_status:update([ {retry, true}, + {phase, document_copy}, {progress, 0}, {changes_done, 0}, {total_changes, TotalChanges} @@ -502,7 +504,16 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) -> sort_meta_data(#comp_st{new_st = St0} = CompSt) -> ?COMP_EVENT(md_sort_init), - {ok, Ems} = couch_emsort:merge(St0#st.id_tree), + NumKVs = couch_emsort:num_kvs(St0#st.id_tree), + NumMerges = couch_emsort:num_merges(St0#st.id_tree), + couch_task_status:update([ + {phase, docid_sort}, + {progress, 0}, + {changes_done, 0}, + {total_changes, NumMerges * NumKVs} + ]), + Reporter = fun update_compact_task/1, + {ok, Ems} = couch_emsort:merge(St0#st.id_tree, Reporter), ?COMP_EVENT(md_sort_done), CompSt#comp_st{ new_st = St0#st{ @@ -533,12 +544,20 @@ copy_meta_data(#comp_st{new_st = St} = CompSt) -> locs=[] }, ?COMP_EVENT(md_copy_init), + NumKVs = couch_emsort:num_kvs(Src), + couch_task_status:update([ + {phase, docid_copy}, + {progress, 0}, + {changes_done, 0}, + {total_changes, NumKVs} + ]), Acc = merge_docids(Iter, Acc0), {ok, Infos} = couch_file:pread_terms(SrcFd, Acc#merge_st.locs), {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Infos), {ok, SeqTree} = couch_btree:add_remove( Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs ), + update_compact_task(NumKVs), ?COMP_EVENT(md_copy_done), CompSt#comp_st{ new_st = St#st{ @@ -609,8 +628,10 @@ commit_compaction_data(#st{header = OldHeader} = St0, Fd) -> bind_emsort(St, Fd, nil) -> {ok, Ems} = couch_emsort:open(Fd), 
St#st{id_tree=Ems}; +bind_emsort(St, Fd, State) when is_integer(State) -> + bind_emsort(St, Fd, [{root, State}]); bind_emsort(St, Fd, State) -> - {ok, Ems} = couch_emsort:open(Fd, [{root, State}]), + {ok, Ems} = couch_emsort:open(Fd, State), St#st{id_tree=Ems}. @@ -653,6 +674,7 @@ merge_docids(Iter, #merge_st{locs=Locs}=Acc) when length(Locs) > 1000 -> rem_seqs=[], locs=[] }, + update_compact_task(length(Locs)), merge_docids(Iter, Acc1); merge_docids(Iter, #merge_st{curr=Curr}=Acc) -> case next_info(Iter, Curr, []) of diff --git a/src/couch/src/couch_emsort.erl b/src/couch/src/couch_emsort.erl index 2a25a2322..430d94e01 100644 --- a/src/couch/src/couch_emsort.erl +++ b/src/couch/src/couch_emsort.erl @@ -130,17 +130,22 @@ % -export([open/1, open/2, get_fd/1, get_state/1]). --export([add/2, merge/1, sort/1, iter/1, next/1]). - +-export([add/2, merge/1, merge/2, sort/1, iter/1, next/1]). +-export([num_kvs/1, num_merges/1]). -record(ems, { fd, root, bb_chunk = 10, - chain_chunk = 100 + chain_chunk = 100, + num_kvs = 0, + num_bb = 0 }). +-define(REPORT_INTERVAL, 1000). + + open(Fd) -> {ok, #ems{fd=Fd}}. @@ -156,22 +161,39 @@ set_options(Ems, [{root, Root} | Rest]) -> set_options(Ems, [{chain_chunk, Count} | Rest]) when is_integer(Count) -> set_options(Ems#ems{chain_chunk=Count}, Rest); set_options(Ems, [{back_bone_chunk, Count} | Rest]) when is_integer(Count) -> - set_options(Ems#ems{bb_chunk=Count}, Rest). + set_options(Ems#ems{bb_chunk=Count}, Rest); +set_options(Ems, [{num_kvs, NumKVs} | Rest]) when is_integer(NumKVs) -> + set_options(Ems#ems{num_kvs=NumKVs}, Rest); +set_options(Ems, [{num_bb, NumBB} | Rest]) when is_integer(NumBB) -> + set_options(Ems#ems{num_bb=NumBB}, Rest). get_fd(#ems{fd=Fd}) -> Fd. -get_state(#ems{root=Root}) -> - Root. +get_state(#ems{} = Ems) -> + #ems{ + root = Root, + num_kvs = NumKVs, + num_bb = NumBB + } = Ems, + [ + {root, Root}, + {num_kvs, NumKVs}, + {num_bb, NumBB} + ]. 
add(Ems, []) -> {ok, Ems}; add(Ems, KVs) -> Pos = write_kvs(Ems, KVs), - {ok, add_bb_pos(Ems, Pos)}. + NewEms = add_bb_pos(Ems, Pos), + {ok, NewEms#ems{ + num_kvs = Ems#ems.num_kvs + length(KVs), + num_bb = Ems#ems.num_bb + 1 + }}. sort(#ems{}=Ems) -> @@ -179,10 +201,14 @@ sort(#ems{}=Ems) -> iter(Ems1). -merge(#ems{root=undefined}=Ems) -> +merge(Ems) -> + merge(Ems, fun(_) -> ok end). + + +merge(#ems{root=undefined}=Ems, _Reporter) -> {ok, Ems}; -merge(#ems{}=Ems) -> - {ok, decimate(Ems)}. +merge(#ems{}=Ems, Reporter) -> + {ok, decimate(Ems, Reporter)}. iter(#ems{root=undefined}=Ems) -> @@ -201,6 +227,13 @@ next({Ems, Chains}) -> {ok, KV, {Ems, RestChains}}. +num_kvs(#ems{num_kvs=NumKVs}) -> + NumKVs. + +num_merges(#ems{bb_chunk=BBChunk, num_bb=NumBB}) -> + num_merges(BBChunk, NumBB). + + add_bb_pos(#ems{root=undefined}=Ems, Pos) -> Ems#ems{root={[Pos], nil}}; add_bb_pos(#ems{root={BB, Prev}}=Ems, Pos) -> @@ -220,11 +253,11 @@ write_kvs(Ems, KVs) -> Final. -decimate(#ems{root={_BB, nil}}=Ems) -> +decimate(#ems{root={_BB, nil}}=Ems, _Reporter) -> % We have less than bb_chunk backbone pointers so we're % good to start streaming KV's back to the client. Ems; -decimate(#ems{root={BB, NextBB}}=Ems) -> +decimate(#ems{root={BB, NextBB}}=Ems, Reporter) -> % To make sure we have a bounded amount of data in RAM % at any given point we first need to decimate the data % by performing the first couple iterations of a merge @@ -232,43 +265,51 @@ decimate(#ems{root={BB, NextBB}}=Ems) -> % The first pass gives us a sort with pointers linked from % largest to smallest. - {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB), + {RevBB, RevNextBB} = merge_back_bone(Ems, small, BB, NextBB, Reporter), % We have to run a second pass so that links are pointed % back from smallest to largest. 
- {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB), + {FwdBB, FwdNextBB} = merge_back_bone(Ems, big, RevBB, RevNextBB, Reporter), % Continue deicmating until we have an acceptable bound on % the number of keys to use. - decimate(Ems#ems{root={FwdBB, FwdNextBB}}). + decimate(Ems#ems{root={FwdBB, FwdNextBB}}, Reporter). -merge_back_bone(Ems, Choose, BB, NextBB) -> - BBPos = merge_chains(Ems, Choose, BB), - merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}). +merge_back_bone(Ems, Choose, BB, NextBB, Reporter) -> + BBPos = merge_chains(Ems, Choose, BB, Reporter), + Reporter(length(BB)), + merge_rest_back_bone(Ems, Choose, NextBB, {[BBPos], nil}, Reporter). -merge_rest_back_bone(_Ems, _Choose, nil, Acc) -> +merge_rest_back_bone(_Ems, _Choose, nil, Acc, _Reporter) -> Acc; -merge_rest_back_bone(Ems, Choose, BBPos, Acc) -> +merge_rest_back_bone(Ems, Choose, BBPos, Acc, Reporter) -> {ok, {BB, NextBB}} = couch_file:pread_term(Ems#ems.fd, BBPos), - NewPos = merge_chains(Ems, Choose, BB), + NewPos = merge_chains(Ems, Choose, BB, Reporter), {NewBB, NewPrev} = append_item(Ems, Acc, NewPos, Ems#ems.bb_chunk), - merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}). + merge_rest_back_bone(Ems, Choose, NextBB, {NewBB, NewPrev}, Reporter). -merge_chains(Ems, Choose, BB) -> +merge_chains(Ems, Choose, BB, Reporter) -> Chains = init_chains(Ems, Choose, BB), - merge_chains(Ems, Choose, Chains, {[], nil}). + merge_chains(Ems, Choose, Chains, {[], nil}, Reporter, 0). -merge_chains(Ems, _Choose, [], ChainAcc) -> +merge_chains(Ems, _Choose, [], ChainAcc, _Reporter, _Count) -> {ok, CPos, _} = couch_file:append_term(Ems#ems.fd, ChainAcc), CPos; -merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc) -> +merge_chains(#ems{chain_chunk=CC}=Ems, Choose, Chains, Acc, Reporter, Count0) -> {KV, RestChains} = choose_kv(Choose, Ems, Chains), {NewKVs, NewPrev} = append_item(Ems, Acc, KV, CC), - merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}). 
+ Count1 = case (Count0 + 1) rem ?REPORT_INTERVAL of + 0 -> + Reporter(Count0), + 0; + _ -> + Count0 + 1 + end, + merge_chains(Ems, Choose, RestChains, {NewKVs, NewPrev}, Reporter, Count1). init_chains(Ems, Choose, BB) -> @@ -316,3 +357,10 @@ append_item(Ems, {List, Prev}, Pos, Size) when length(List) >= Size -> append_item(_Ems, {List, Prev}, Pos, _Size) -> {[Pos | List], Prev}. + +num_merges(BBChunk, NumBB) when NumBB =< BBChunk -> + 0; +num_merges(BBChunk, NumBB) when NumBB > BBChunk -> + RevNumBB = ceil(NumBB / BBChunk), + FwdNumBB = ceil(RevNumBB / BBChunk), + 2 + num_merges(BBChunk, FwdNumBB). |