author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2017-08-16 13:47:38 -0500
---|---|---
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2017-08-16 16:15:15 -0500
commit | 9c4c41ac22577e6ec8c46c44f0efc13c38712d3d (patch) |
tree | 73b8af609a0c77407998f8b019149935b85b63c5 |
parent | b267ccceab85e6baf77178bbb1f234e76568dce1 (diff) |
download | couchdb-9c4c41ac22577e6ec8c46c44f0efc13c38712d3d.tar.gz |
Copy the docid tree from the old db
This optimizes the first pass of compaction to avoid writing data to the
emsort structure stored in the .compact.meta file. Instead of writing
data there, we simply re-read the docid tree from the original
database.
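
The shape of that first pass, reduced to its essentials: below is a minimal sketch (the module name and simplifications are ours, not CouchDB's; the real implementation is `copy_from_tree/1` and `flush_docid_batch/6` in the diff) of how a non-retry compaction can fill the new id_tree using only btree reads and writes, never touching the emsort file. Retry compactions still take the emsort path.

```erlang
-module(copy_ids_sketch).
-export([copy_ids/3]).

%% Hedged sketch of the non-retry first pass. Assumes the
%% couch_btree calls used in the patch below; omits the batching
%% and doc_buffer_size flushing that the real code performs.
copy_ids(OldIdTree, NewIdTree, NewSeqTree) ->
    % Fold the old db's docid tree. With the patch's custom
    % `join` fun (compact_id_join/2) each entry arrives as a
    % lightweight {DocId, HighSeq} pair instead of a full
    % #full_doc_info{} record.
    FoldFun = fun({_DocId, Seq}, _Reds, {DstTree, Seqs}) ->
        {ok, {DstTree, [Seq | Seqs]}}
    end,
    {ok, _, {DstTree, Seqs}} =
        couch_btree:foldl(OldIdTree, FoldFun, {NewIdTree, []}, []),
    % The full doc infos were already written into the new
    % file's seq_tree by copy_compact, so re-read them there...
    Lookups = couch_btree:lookup(NewSeqTree, lists:reverse(Seqs)),
    FDIs = [FDI || {ok, FDI} <- Lookups],
    % ...and add them straight to the new id_tree (its split
    % fun turns each record back into a {DocId, Value} entry).
    {ok, FinalIdTree} = couch_btree:add(DstTree, FDIs),
    FinalIdTree.
```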
-rw-r--r-- | src/couch/src/couch_db_updater.erl | 164 |
1 file changed, 142 insertions(+), 22 deletions(-)
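
For orientation before the raw diff: the patch reshapes the tail of the compaction pipeline that `start_copy_compact/1` folds over. A sketch of the resulting stage list, assembled from the hunk at old line 1049 below (any stages before `copy_purge_info/1`, and the fold's initial accumulator, here called `InitCompSt`, fall outside the diff context and are assumptions):

```erlang
Stages = [
    fun copy_purge_info/1,
    fun copy_compact/1,
    fun commit_compaction_data/1,
    fun copy_doc_ids/1,        % replaces sort_meta_data / commit / copy_meta_data
    fun compact_final_sync/1,
    fun verify_compaction/1    % new end-to-end consistency check
],
FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
    Stage(CompSt)
end, InitCompSt, Stages).
```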
```diff
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index a83e3b411..221b7710e 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -26,6 +26,7 @@
 -record(comp_st, {
     old_db,
     new_db,
+    new_id_tree,
     meta_fd,
     retry
 }).
@@ -1049,10 +1050,9 @@ start_copy_compact(#db{}=Db) ->
         fun copy_purge_info/1,
         fun copy_compact/1,
         fun commit_compaction_data/1,
-        fun sort_meta_data/1,
-        fun commit_compaction_data/1,
-        fun copy_meta_data/1,
-        fun compact_final_sync/1
+        fun copy_doc_ids/1,
+        fun compact_final_sync/1,
+        fun verify_compaction/1
     ],

     FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
@@ -1092,8 +1092,9 @@ open_compaction_files(OldDb) ->
             #comp_st{
                 old_db = OldDb,
                 new_db = Db1,
+                new_id_tree = Db0#db.id_tree,
                 meta_fd = MetaFd,
-                retry = Db0#db.id_tree
+                retry = (couch_btree:size(Db0#db.id_tree) > 0)
             };
         _ when DataHdrIsDbHdr ->
             % We tried to swap out the compaction but there were
@@ -1105,8 +1106,9 @@ open_compaction_files(OldDb) ->
             #comp_st{
                 old_db = OldDb,
                 new_db = Db1,
+                new_id_tree = Db0#db.id_tree,
                 meta_fd = MetaFd,
-                retry = Db0#db.id_tree
+                retry = true
             };
         _ ->
             % We're starting a compaction from scratch
@@ -1118,8 +1120,9 @@ open_compaction_files(OldDb) ->
             #comp_st{
                 old_db = OldDb,
                 new_db = Db1,
+                new_id_tree = Db0#db.id_tree,
                 meta_fd = MetaFd,
-                retry = nil
+                retry = false
             }
     end,
     unlink(DataFd),
@@ -1194,7 +1197,7 @@ copy_compact(#comp_st{} = CompSt) ->
         AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
         if AccUncopiedSize2 >= BufferSize ->
             NewDb2 = copy_docs(
-                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
+                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), CompSt),
             AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
             if AccCopiedSize2 >= CheckpointAfter ->
                 CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
@@ -1216,7 +1219,7 @@ copy_compact(#comp_st{} = CompSt) ->
         {changes_done, 0},
         {total_changes, TotalChanges}
     ],
-    case (Retry /= nil) and couch_task_status:is_task_added() of
+    case Retry and couch_task_status:is_task_added() of
     true ->
         couch_task_status:update([
             {retry, true},
@@ -1234,7 +1237,7 @@ copy_compact(#comp_st{} = CompSt) ->
         {NewDb, [], 0, 0},
         [{start_key, NewDb#db.update_seq + 1}]),

-    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), CompSt),

     % copy misc header values
     if NewDb3#db.security /= Db#db.security ->
@@ -1253,7 +1256,7 @@ copy_compact(#comp_st{} = CompSt) ->
     }.


-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
+copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, CompSt) ->
     DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
     LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
     % COUCHDB-968, make sure we prune duplicates during compaction
@@ -1305,28 +1308,41 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
     NewInfos = stem_full_doc_infos(Db, NewInfos1),
     RemoveSeqs =
-    case Retry of
-    nil ->
-        [];
-    OldDocIdTree ->
+    case CompSt#comp_st.retry of
+    true ->
+        NewDocIdTree = CompSt#comp_st.new_id_tree,
         % Compaction is being rerun to catch up to writes during the
         % first pass. This means we may have docs that already exist
         % in the seq_tree in the .data file. Here we lookup any old
         % update_seqs so that they can be removed.
         Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
-        Existing = couch_btree:lookup(OldDocIdTree, Ids),
-        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+        Existing = couch_btree:lookup(NewDocIdTree, Ids),
+        [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing];
+    false ->
+        []
     end,

     {ok, SeqTree} = couch_btree:add_remove(
             NewDb#db.seq_tree, NewInfos, RemoveSeqs),

-    FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
-        {{Id, Seq}, FDI}
-    end, NewInfos),
-    {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+    NewIdEms = case CompSt#comp_st.retry of
+        false ->
+            % If we're not in a retry compaction then there's
+            % no reason for us to copy doc infos to the emsort
+            % structure. We can just re-use the id_tree in the
+            % old db record.
+            NewDb#db.id_tree;
+        true ->
+            % We have to store doc infos because of the possible
+            % changes in the update sequences.
+            FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
+                {{Id, Seq}, FDI}
+            end, NewInfos),
+            {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+            IdEms
+    end,

     update_compact_task(length(NewInfos)),
-    NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
+    NewDb#db{id_tree=NewIdEms, seq_tree=SeqTree}.


 copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
@@ -1417,6 +1433,88 @@ bind_id_tree(Db, Fd, State) ->
     Db#db{id_tree=IdBtree}.


+copy_doc_ids(#comp_st{retry = false} = CompSt) ->
+    copy_from_tree(CompSt);
+copy_doc_ids(#comp_st{retry = true} = CompSt) ->
+    Stages = [
+        fun sort_meta_data/1,
+        fun commit_compaction_data/1,
+        fun copy_meta_data/1
+    ],
+    lists:foldl(fun(Stage, St) -> Stage(St) end, CompSt, Stages).
+
+
+copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    #db{
+        fd = OldFd,
+        header = OldHdr
+    } = OldDb,
+
+    % We're reopening a custom view of the id_tree to
+    % avoid the work of creating complete `#full_doc_info{}`
+    % records.
+    Compression = couch_compress:get_compression_method(),
+    OldIdTreeState = couch_btree:get_state(OldDb#db.id_tree),
+    {ok, OldIdTree} = couch_btree:open(OldIdTreeState, OldFd, [
+        {split, fun ?MODULE:btree_by_id_split/1},
+        {join, fun compact_id_join/2},
+        {reduce, fun ?MODULE:btree_by_id_reduce/2},
+        {compression, Compression}
+    ]),
+
+    NewIdTree = CompSt#comp_st.new_id_tree,
+    NewSeqTree = NewDb#db.seq_tree,
+
+    BufferSize = list_to_integer(
+        config:get("database_compaction", "doc_buffer_size", "524288")),
+
+    EnumByIdFun = fun({DocId, UpdateSeq}, _, {DstIdTree, Batch, Count}) ->
+        case Count >= 1000 of
+            true ->
+                DstIdTree2 = flush_docid_batch(
+                        Batch, NewSeqTree, DstIdTree, BufferSize),
+                {ok, {DstIdTree2, [{DocId, UpdateSeq}], 1}};
+            false ->
+                {ok, {DstIdTree, [{DocId, UpdateSeq} | Batch], Count + 1}}
+        end
+    end,
+
+    {ok, _, {NewIdTree2, LastBatch, _}} =
+        couch_btree:foldl(OldIdTree, EnumByIdFun, {NewIdTree, [], 0}, []),
+    NewIdTree3 = flush_docid_batch(
+            LastBatch, NewSeqTree, NewIdTree2, BufferSize),
+    CompSt#comp_st{
+        new_db = NewDb#db{id_tree = NewIdTree3}
+    }.
+
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize) ->
+    flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, [], 0).
+
+
+flush_docid_batch([], _, IdTree, _, Batch, _) ->
+    {ok, NewIdTree} = couch_btree:add(IdTree, Batch),
+    NewIdTree;
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, Buf, BufSize)
+        when BufSize >= MaxBufSize ->
+    {ok, NewIdTree} = couch_btree:add(IdTree, Buf),
+    flush_docid_batch(DocIds, SeqTree, NewIdTree, MaxBufSize, [], 0);
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, Buf, BufSize) ->
+    [{_Id, Seq} | Rest] = DocIds,
+    [{ok, #full_doc_info{} = FDI}] = couch_btree:lookup(SeqTree, [Seq]),
+    NewBuf = [FDI | Buf],
+    NewBufSize = BufSize + ?term_size(FDI),
+    flush_docid_batch(Rest, SeqTree, IdTree, MaxBufSize, NewBuf, NewBufSize).
+
+
+compact_id_join(Id, {HighSeq, _, _}) ->
+    {Id, HighSeq};
+compact_id_join(Id, {HighSeq, _, _, _}) ->
+    {Id, HighSeq}.
+
+
 sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
     {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
     CompSt#comp_st{
@@ -1466,6 +1564,28 @@ compact_final_sync(#comp_st{new_db = NewDb0} = CompSt) ->
     }.


+verify_compaction(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+    {ok, OldIdReds0} = couch_btree:full_reduce(OldDb#db.id_tree),
+    {ok, OldSeqReds} = couch_btree:full_reduce(OldDb#db.seq_tree),
+    {ok, NewIdReds0} = couch_btree:full_reduce(NewDb#db.id_tree),
+    {ok, NewSeqReds} = couch_btree:full_reduce(NewDb#db.seq_tree),
+    {
+        OldDocCount,
+        OldDelDocCount,
+        #size_info{external = OldExternalSize}
+    } = OldIdReds0,
+    OldIdReds = {OldDocCount, OldDelDocCount, OldExternalSize},
+    {
+        NewDocCount,
+        NewDelDocCount,
+        #size_info{external = NewExternalSize}
+    } = NewIdReds0,
+    NewIdReds = {NewDocCount, NewDelDocCount, NewExternalSize},
+    NewIdReds = OldIdReds,
+    NewSeqReds = OldSeqReds,
+    CompSt.
+
+
 merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
     #merge_st{
         id_tree=IdTree0,
```
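
One detail worth calling out: `verify_compaction/1` contains no explicit comparison or error branch. It relies on Erlang's pattern matching, where `=` against an already-bound variable asserts equality and exits the process with a `badmatch` error on any mismatch between the old and new trees' reduce values. A minimal standalone illustration of the idiom (the module and function names are ours, not CouchDB's):

```erlang
-module(match_assert).
-export([demo/0]).

%% In Erlang, `Pattern = Expr` with an already-bound variable on
%% the left is an assertion: it succeeds silently when the terms
%% are equal and exits with {badmatch, Value} when they are not.
%% verify_compaction/1 above uses exactly this to abort compaction
%% if doc counts or sizes disagree between the old and new files.
demo() ->
    Expected = {3, 1, 2048},
    Expected = {3, 1, 2048},        % equal terms: match succeeds
    try
        Expected = {3, 1, 4096},    % unequal: error:{badmatch, _}
        no_crash
    catch
        error:{badmatch, Got} ->
            {compaction_would_abort, Got}
    end.
```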