summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2017-08-16 13:47:38 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2017-08-16 16:15:15 -0500
commit9c4c41ac22577e6ec8c46c44f0efc13c38712d3d (patch)
tree73b8af609a0c77407998f8b019149935b85b63c5
parentb267ccceab85e6baf77178bbb1f234e76568dce1 (diff)
downloadcouchdb-9c4c41ac22577e6ec8c46c44f0efc13c38712d3d.tar.gz
Copy the docid tree from the old db
This optimizes the first pass of compaction to avoid writing data to the emsort structure stored in the .compact.meta file. Instead of writing data there we instead just re-read the docid tree from the original database.
-rw-r--r--src/couch/src/couch_db_updater.erl164
1 files changed, 142 insertions, 22 deletions
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index a83e3b411..221b7710e 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -26,6 +26,7 @@
-record(comp_st, {
old_db,
new_db,
+ new_id_tree,
meta_fd,
retry
}).
@@ -1049,10 +1050,9 @@ start_copy_compact(#db{}=Db) ->
fun copy_purge_info/1,
fun copy_compact/1,
fun commit_compaction_data/1,
- fun sort_meta_data/1,
- fun commit_compaction_data/1,
- fun copy_meta_data/1,
- fun compact_final_sync/1
+ fun copy_doc_ids/1,
+ fun compact_final_sync/1,
+ fun verify_compaction/1
],
FinalCompSt = lists:foldl(fun(Stage, CompSt) ->
@@ -1092,8 +1092,9 @@ open_compaction_files(OldDb) ->
#comp_st{
old_db = OldDb,
new_db = Db1,
+ new_id_tree = Db0#db.id_tree,
meta_fd = MetaFd,
- retry = Db0#db.id_tree
+ retry = (couch_btree:size(Db0#db.id_tree) > 0)
};
_ when DataHdrIsDbHdr ->
% We tried to swap out the compaction but there were
@@ -1105,8 +1106,9 @@ open_compaction_files(OldDb) ->
#comp_st{
old_db = OldDb,
new_db = Db1,
+ new_id_tree = Db0#db.id_tree,
meta_fd = MetaFd,
- retry = Db0#db.id_tree
+ retry = true
};
_ ->
% We're starting a compaction from scratch
@@ -1118,8 +1120,9 @@ open_compaction_files(OldDb) ->
#comp_st{
old_db = OldDb,
new_db = Db1,
+ new_id_tree = Db0#db.id_tree,
meta_fd = MetaFd,
- retry = nil
+ retry = false
}
end,
unlink(DataFd),
@@ -1194,7 +1197,7 @@ copy_compact(#comp_st{} = CompSt) ->
AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
if AccUncopiedSize2 >= BufferSize ->
NewDb2 = copy_docs(
- Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
+ Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), CompSt),
AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
if AccCopiedSize2 >= CheckpointAfter ->
CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
@@ -1216,7 +1219,7 @@ copy_compact(#comp_st{} = CompSt) ->
{changes_done, 0},
{total_changes, TotalChanges}
],
- case (Retry /= nil) and couch_task_status:is_task_added() of
+ case Retry and couch_task_status:is_task_added() of
true ->
couch_task_status:update([
{retry, true},
@@ -1234,7 +1237,7 @@ copy_compact(#comp_st{} = CompSt) ->
{NewDb, [], 0, 0},
[{start_key, NewDb#db.update_seq + 1}]),
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
+ NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), CompSt),
% copy misc header values
if NewDb3#db.security /= Db#db.security ->
@@ -1253,7 +1256,7 @@ copy_compact(#comp_st{} = CompSt) ->
}.
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
+copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, CompSt) ->
DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
% COUCHDB-968, make sure we prune duplicates during compaction
@@ -1305,28 +1308,41 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
NewInfos = stem_full_doc_infos(Db, NewInfos1),
RemoveSeqs =
- case Retry of
- nil ->
- [];
- OldDocIdTree ->
+ case CompSt#comp_st.retry of
+ true ->
+ NewDocIdTree = CompSt#comp_st.new_id_tree,
% Compaction is being rerun to catch up to writes during the
% first pass. This means we may have docs that already exist
% in the seq_tree in the .data file. Here we lookup any old
% update_seqs so that they can be removed.
Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
- Existing = couch_btree:lookup(OldDocIdTree, Ids),
- [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
+ Existing = couch_btree:lookup(NewDocIdTree, Ids),
+ [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing];
+ false ->
+ []
end,
{ok, SeqTree} = couch_btree:add_remove(
NewDb#db.seq_tree, NewInfos, RemoveSeqs),
- FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
- {{Id, Seq}, FDI}
- end, NewInfos),
- {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+ NewIdEms = case CompSt#comp_st.retry of
+ false ->
+ % If we're not in a retry compaction then there's
+ % no reason for us to copy doc infos to the emsort
+ % structure. We can just re-use the id_tree in the
+ % old db record.
+ NewDb#db.id_tree;
+ true ->
+ % We have to store doc infos because of the possible
+ % changes in the update sequences.
+ FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
+ {{Id, Seq}, FDI}
+ end, NewInfos),
+ {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
+ IdEms
+ end,
update_compact_task(length(NewInfos)),
- NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
+ NewDb#db{id_tree=NewIdEms, seq_tree=SeqTree}.
copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
@@ -1417,6 +1433,88 @@ bind_id_tree(Db, Fd, State) ->
Db#db{id_tree=IdBtree}.
+copy_doc_ids(#comp_st{retry = false} = CompSt) ->
+ copy_from_tree(CompSt);
+copy_doc_ids(#comp_st{retry = true} = CompSt) ->
+ Stages = [
+ fun sort_meta_data/1,
+ fun commit_compaction_data/1,
+ fun copy_meta_data/1
+ ],
+ lists:foldl(fun(Stage, St) -> Stage(St) end, CompSt, Stages).
+
+
+copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+ #db{
+ fd = OldFd,
+ header = OldHdr
+ } = OldDb,
+
+ % We're reopening a custom view of the id_tree to
+ % avoid the work of creating complete `#full_doc_info{}`
+ % records.
+ Compression = couch_compress:get_compression_method(),
+ OldIdTreeState = couch_btree:get_state(OldDb#db.id_tree),
+ {ok, OldIdTree} = couch_btree:open(OldIdTreeState, OldFd, [
+ {split, fun ?MODULE:btree_by_id_split/1},
+ {join, fun compact_id_join/2},
+ {reduce, fun ?MODULE:btree_by_id_reduce/2},
+ {compression, Compression}
+ ]),
+
+ NewIdTree = CompSt#comp_st.new_id_tree,
+ NewSeqTree = NewDb#db.seq_tree,
+
+ BufferSize = list_to_integer(
+ config:get("database_compaction", "doc_buffer_size", "524288")),
+
+ EnumByIdFun = fun({DocId, UpdateSeq}, _, {DstIdTree, Batch, Count}) ->
+ case Count >= 1000 of
+ true ->
+ DstIdTree2 = flush_docid_batch(
+ Batch, NewSeqTree, DstIdTree, BufferSize),
+ {ok, {DstIdTree2, [{DocId, UpdateSeq}], 1}};
+ false ->
+ {ok, {DstIdTree, [{DocId, UpdateSeq} | Batch], Count + 1}}
+ end
+ end,
+
+ {ok, _, {NewIdTree2, LastBatch, _}} =
+ couch_btree:foldl(OldIdTree, EnumByIdFun, {NewIdTree, [], 0}, []),
+ NewIdTree3 = flush_docid_batch(
+ LastBatch, NewSeqTree, NewIdTree2, BufferSize),
+ CompSt#comp_st{
+ new_db = NewDb#db{id_tree = NewIdTree3}
+ }.
+
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize) ->
+ flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, [], 0).
+
+
+flush_docid_batch([], _, IdTree, _, Batch, _) ->
+ {ok, NewIdTree} = couch_btree:add(IdTree, Batch),
+ NewIdTree;
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, Buf, BufSize)
+ when BufSize >= MaxBufSize ->
+ {ok, NewIdTree} = couch_btree:add(IdTree, Buf),
+ flush_docid_batch(DocIds, SeqTree, NewIdTree, MaxBufSize, [], 0);
+
+flush_docid_batch(DocIds, SeqTree, IdTree, MaxBufSize, Buf, BufSize) ->
+ [{_Id, Seq} | Rest] = DocIds,
+ [{ok, #full_doc_info{} = FDI}] = couch_btree:lookup(SeqTree, [Seq]),
+ NewBuf = [FDI | Buf],
+ NewBufSize = BufSize + ?term_size(FDI),
+ flush_docid_batch(Rest, SeqTree, IdTree, MaxBufSize, NewBuf, NewBufSize).
+
+
+compact_id_join(Id, {HighSeq, _, _}) ->
+ {Id, HighSeq};
+compact_id_join(Id, {HighSeq, _, _, _}) ->
+ {Id, HighSeq}.
+
+
sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
{ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
CompSt#comp_st{
@@ -1466,6 +1564,28 @@ compact_final_sync(#comp_st{new_db = NewDb0} = CompSt) ->
}.
+verify_compaction(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+ {ok, OldIdReds0} = couch_btree:full_reduce(OldDb#db.id_tree),
+ {ok, OldSeqReds} = couch_btree:full_reduce(OldDb#db.seq_tree),
+ {ok, NewIdReds0} = couch_btree:full_reduce(NewDb#db.id_tree),
+ {ok, NewSeqReds} = couch_btree:full_reduce(NewDb#db.seq_tree),
+ {
+ OldDocCount,
+ OldDelDocCount,
+ #size_info{external = OldExternalSize}
+ } = OldIdReds0,
+ OldIdReds = {OldDocCount, OldDelDocCount, OldExternalSize},
+ {
+ NewDocCount,
+ NewDelDocCount,
+ #size_info{external = NewExternalSize}
+ } = NewIdReds0,
+ NewIdReds = {NewDocCount, NewDelDocCount, NewExternalSize},
+ NewIdReds = OldIdReds,
+ NewSeqReds = OldSeqReds,
+ CompSt.
+
+
merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
#merge_st{
id_tree=IdTree0,