Diffstat (limited to 'src/couch/src/couch_bt_engine_compactor.erl')
-rw-r--r--  src/couch/src/couch_bt_engine_compactor.erl  767
1 file changed, 0 insertions(+), 767 deletions(-)
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
deleted file mode 100644
index 8ed55b5c3..000000000
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ /dev/null
@@ -1,767 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_bt_engine_compactor).
-
--export([
- start/4
-]).
-
--include_lib("couch/include/couch_db.hrl").
--include("couch_bt_engine.hrl").
-
--record(comp_st, {
- db_name,
- old_st,
- new_st,
- meta_fd,
- retry
-}).
-
--record(comp_header, {
- db_header,
- meta_st
-}).
-
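-% Accumulator threaded through copy_meta_data/1 and merge_docids/2 while
-% the id btree is rebuilt from the sorted emsort rows. Field meanings are
-% inferred from their use below (they are not documented upstream):
-%   src_fd   - fd of the .compact.meta file holding the sorted terms
-%   id_tree  - the id btree being rebuilt in the .compact.data file
-%   seq_tree - the seq btree, pruned of superseded update seqs
-%   curr     - the {Id, Seq, Loc} row currently being deduplicated
-%   rem_seqs - update seqs queued for removal from seq_tree
-%   locs     - file offsets of #full_doc_info{} terms to batch-insert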
--record(merge_st, {
- src_fd,
- id_tree,
- seq_tree,
- curr,
- rem_seqs,
- locs
-}).
-
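-% Test-only instrumentation: under TEST each ?COMP_EVENT(Name) reports a
-% progress marker to couch_bt_engine_compactor_ev; in normal builds the
-% macro compiles away to the atom 'ignore'.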
--ifdef(TEST).
--define(COMP_EVENT(Name), couch_bt_engine_compactor_ev:event(Name)).
--else.
--define(COMP_EVENT(Name), ignore).
--endif.
-
-start(#st{} = St, DbName, Options, Parent) ->
- erlang:put(io_priority, {db_compact, DbName}),
- couch_log:debug("Compaction process spawned for db \"~s\"", [DbName]),
-
- couch_db_engine:trigger_on_compact(DbName),
-
- ?COMP_EVENT(init),
- {ok, InitCompSt} = open_compaction_files(DbName, St, Options),
- ?COMP_EVENT(files_opened),
-
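- % Each stage below takes and returns a #comp_st{}. Note that
- % commit_compaction_data/1 is listed twice on purpose: a restartable
- % checkpoint is written both after the document copy and again after
- % the docid sort (see the comment on commit_compaction_data/1 below).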
- Stages = [
- fun copy_purge_info/1,
- fun copy_compact/1,
- fun commit_compaction_data/1,
- fun sort_meta_data/1,
- fun commit_compaction_data/1,
- fun copy_meta_data/1,
- fun compact_final_sync/1
- ],
-
- FinalCompSt = lists:foldl(
- fun(Stage, CompSt) ->
- Stage(CompSt)
- end,
- InitCompSt,
- Stages
- ),
-
- #comp_st{
- new_st = FinalNewSt,
- meta_fd = MetaFd
- } = FinalCompSt,
-
- ok = couch_bt_engine:decref(FinalNewSt),
- ok = couch_file:close(MetaFd),
-
- ?COMP_EVENT(before_notify),
- Msg = {compact_done, couch_bt_engine, FinalNewSt#st.filepath},
- gen_server:cast(Parent, Msg).
-
-open_compaction_files(DbName, OldSt, Options) ->
- #st{
- filepath = DbFilePath,
- header = SrcHdr
- } = OldSt,
- DataFile = DbFilePath ++ ".compact.data",
- MetaFile = DbFilePath ++ ".compact.meta",
- {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
- {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
- DataHdrIsDbHdr = couch_bt_engine_header:is_header(DataHdr),
- CompSt =
- case {DataHdr, MetaHdr} of
- {#comp_header{} = A, #comp_header{} = A} ->
- % We're restarting a compaction that did not finish
- % before trying to swap out with the original db
- DbHeader = A#comp_header.db_header,
- St0 = couch_bt_engine:init_state(
- DataFile, DataFd, DbHeader, Options
- ),
- St1 = bind_emsort(St0, MetaFd, A#comp_header.meta_st),
- #comp_st{
- db_name = DbName,
- old_st = OldSt,
- new_st = St1,
- meta_fd = MetaFd,
- retry = St0#st.id_tree
- };
- _ when DataHdrIsDbHdr ->
- % We tried to swap out the compaction but there were
- % writes to the database during compaction. Start
- % a compaction retry.
- Header = couch_bt_engine_header:from(SrcHdr),
- ok = reset_compaction_file(MetaFd, Header),
- St0 = couch_bt_engine:init_state(
- DataFile, DataFd, DataHdr, Options
- ),
- St1 = bind_emsort(St0, MetaFd, nil),
- #comp_st{
- db_name = DbName,
- old_st = OldSt,
- new_st = St1,
- meta_fd = MetaFd,
- retry = St0#st.id_tree
- };
- _ ->
- % We're starting a compaction from scratch
- Header = couch_bt_engine_header:from(SrcHdr),
- ok = reset_compaction_file(DataFd, Header),
- ok = reset_compaction_file(MetaFd, Header),
- St0 = couch_bt_engine:init_state(DataFile, DataFd, Header, Options),
- St1 = bind_emsort(St0, MetaFd, nil),
- #comp_st{
- db_name = DbName,
- old_st = OldSt,
- new_st = St1,
- meta_fd = MetaFd,
- retry = nil
- }
- end,
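- % The new #st{} now owns DataFd (it is released via decref in start/4),
- % so the compactor drops its link to it; MetaFd is closed explicitly in
- % start/4, so a monitor suffices. (Rationale inferred, not documented.)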
- unlink(DataFd),
- erlang:monitor(process, MetaFd),
- {ok, CompSt}.
-
-copy_purge_info(#comp_st{} = CompSt) ->
- #comp_st{
- db_name = DbName,
- old_st = OldSt,
- new_st = NewSt,
- retry = Retry
- } = CompSt,
- ?COMP_EVENT(purge_init),
- MinPurgeSeq = couch_util:with_db(DbName, fun(Db) ->
- couch_db:get_minimum_purge_seq(Db)
- end),
- OldPSTree = OldSt#st.purge_seq_tree,
- StartSeq = couch_bt_engine:get_purge_seq(NewSt) + 1,
- BufferSize = config:get_integer(
- "database_compaction", "doc_buffer_size", 524288
- ),
- CheckpointAfter = config:get(
- "database_compaction", "checkpoint_after", BufferSize * 10
- ),
-
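- % Purge infos are buffered until they reach doc_buffer_size bytes and
- % then copied in one batch; once checkpoint_after bytes have been
- % copied, a full header commit lets a crashed compaction resume here.
- % With the defaults above that is a 512 KiB buffer and a checkpoint
- % roughly every 5 MiB. (Note: unlike copy_compact/1, CheckpointAfter is
- % not converted to an integer here, so a value set in the config file
- % arrives as a string and the >= comparison below would never succeed.)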
- EnumFun = fun(Info, _Reds, {StAcc0, InfosAcc, InfosSize, CopiedSize}) ->
- NewInfosSize = InfosSize + ?term_size(Info),
- if
- NewInfosSize >= BufferSize ->
- StAcc1 = copy_purge_infos(
- OldSt, StAcc0, [Info | InfosAcc], MinPurgeSeq, Retry
- ),
- NewCopiedSize = CopiedSize + NewInfosSize,
- if
- NewCopiedSize >= CheckpointAfter ->
- StAcc2 = commit_compaction_data(StAcc1),
- {ok, {StAcc2, [], 0, 0}};
- true ->
- {ok, {StAcc1, [], 0, NewCopiedSize}}
- end;
- true ->
- NewInfosAcc = [Info | InfosAcc],
- {ok, {StAcc0, NewInfosAcc, NewInfosSize, CopiedSize}}
- end
- end,
-
- InitAcc = {NewSt, [], 0, 0},
- Opts = [{start_key, StartSeq}],
- {ok, _, FinalAcc} = couch_btree:fold(OldPSTree, EnumFun, InitAcc, Opts),
- {NewStAcc, Infos, _, _} = FinalAcc,
- FinalNewSt = copy_purge_infos(OldSt, NewStAcc, Infos, MinPurgeSeq, Retry),
-
- ?COMP_EVENT(purge_done),
- CompSt#comp_st{
- new_st = FinalNewSt
- }.
-
-copy_purge_infos(OldSt, NewSt0, Infos, MinPurgeSeq, Retry) ->
- #st{
- id_tree = OldIdTree
- } = OldSt,
-
- % Re-bind our id_tree to the backing btree
- NewIdTreeState = couch_bt_engine_header:id_tree_state(NewSt0#st.header),
- MetaFd = couch_emsort:get_fd(NewSt0#st.id_tree),
- MetaState = couch_emsort:get_state(NewSt0#st.id_tree),
- NewSt1 = bind_id_tree(NewSt0, NewSt0#st.fd, NewIdTreeState),
-
- #st{
- id_tree = NewIdTree0,
- seq_tree = NewSeqTree0,
- purge_tree = NewPurgeTree0,
- purge_seq_tree = NewPurgeSeqTree0
- } = NewSt1,
-
- % Copy over the purge infos
- InfosToAdd = lists:filter(
- fun({PSeq, _, _, _}) ->
- PSeq > MinPurgeSeq
- end,
- Infos
- ),
- {ok, NewPurgeTree1} = couch_btree:add(NewPurgeTree0, InfosToAdd),
- {ok, NewPurgeSeqTree1} = couch_btree:add(NewPurgeSeqTree0, InfosToAdd),
-
- NewSt2 = NewSt1#st{
- purge_tree = NewPurgeTree1,
- purge_seq_tree = NewPurgeSeqTree1
- },
-
- % If we're performing a retry compaction, we have to check if
- % any of the referenced docs have been completely purged
- % from the database. Any doc that has been completely purged
- % must then be removed from our partially compacted database.
- NewSt3 =
- if
- Retry == nil ->
- NewSt2;
- true ->
- AllDocIds = [DocId || {_PurgeSeq, _UUID, DocId, _Revs} <- Infos],
- UniqDocIds = lists:usort(AllDocIds),
- OldIdResults = couch_btree:lookup(OldIdTree, UniqDocIds),
- OldZipped = lists:zip(UniqDocIds, OldIdResults),
-
- % The list of non-existent docs in the database being compacted
- MaybeRemDocIds = [DocId || {DocId, not_found} <- OldZipped],
-
- % Remove any of those that still exist in the partially compacted database
- NewIdResults = couch_btree:lookup(NewIdTree0, MaybeRemDocIds),
- ToRemove = [Doc || {ok, Doc} <- NewIdResults, Doc /= {ok, not_found}],
-
- {RemIds, RemSeqs} = lists:unzip(
- lists:map(
- fun(FDI) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq
- } = FDI,
- {Id, Seq}
- end,
- ToRemove
- )
- ),
-
- {ok, NewIdTree1} = couch_btree:add_remove(NewIdTree0, [], RemIds),
- {ok, NewSeqTree1} = couch_btree:add_remove(NewSeqTree0, [], RemSeqs),
-
- NewSt2#st{
- id_tree = NewIdTree1,
- seq_tree = NewSeqTree1
- }
- end,
-
- Header = couch_bt_engine:update_header(NewSt3, NewSt3#st.header),
- NewSt4 = NewSt3#st{
- header = Header
- },
- bind_emsort(NewSt4, MetaFd, MetaState).
-
-copy_compact(#comp_st{} = CompSt) ->
- #comp_st{
- db_name = DbName,
- old_st = St,
- new_st = NewSt0,
- retry = Retry
- } = CompSt,
-
- Compression = couch_compress:get_compression_method(),
- NewSt = NewSt0#st{compression = Compression},
- NewUpdateSeq = couch_bt_engine:get_update_seq(NewSt0),
- TotalChanges = couch_bt_engine:count_changes_since(St, NewUpdateSeq),
- BufferSize = list_to_integer(
- config:get("database_compaction", "doc_buffer_size", "524288")
- ),
- CheckpointAfter = couch_util:to_integer(
- config:get(
- "database_compaction",
- "checkpoint_after",
- BufferSize * 10
- )
- ),
-
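- % Same buffering scheme as copy_purge_info/1: accumulate doc infos up
- % to doc_buffer_size bytes, copy them with copy_docs/4, and write a
- % restartable checkpoint every checkpoint_after copied bytes.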
- EnumBySeqFun =
- fun(
- DocInfo,
- _Offset,
- {AccNewSt, AccUncopied, AccUncopiedSize, AccCopiedSize}
- ) ->
- Seq =
- case DocInfo of
- #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
- #doc_info{} -> DocInfo#doc_info.high_seq
- end,
-
- AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
- if
- AccUncopiedSize2 >= BufferSize ->
- NewSt2 = copy_docs(
- St, AccNewSt, lists:reverse([DocInfo | AccUncopied]), Retry
- ),
- AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
- if
- AccCopiedSize2 >= CheckpointAfter ->
- {ok, NewSt3} = couch_bt_engine:set_update_seq(NewSt2, Seq),
- CommNewSt3 = commit_compaction_data(NewSt3),
- {ok, {CommNewSt3, [], 0, 0}};
- true ->
- {ok, NewSt3} = couch_bt_engine:set_update_seq(NewSt2, Seq),
- {ok, {NewSt3, [], 0, AccCopiedSize2}}
- end;
- true ->
- {ok, {AccNewSt, [DocInfo | AccUncopied], AccUncopiedSize2, AccCopiedSize}}
- end
- end,
-
- TaskProps0 = [
- {type, database_compaction},
- {database, DbName},
- {phase, document_copy},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ],
- case (Retry =/= nil) and couch_task_status:is_task_added() of
- true ->
- couch_task_status:update([
- {retry, true},
- {phase, document_copy},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ]);
- false ->
- couch_task_status:add_task(TaskProps0),
- couch_task_status:set_update_frequency(500)
- end,
-
- ?COMP_EVENT(seq_init),
- {ok, _, {NewSt2, Uncopied, _, _}} =
- couch_btree:foldl(
- St#st.seq_tree,
- EnumBySeqFun,
- {NewSt, [], 0, 0},
- [{start_key, NewUpdateSeq + 1}]
- ),
-
- NewSt3 = copy_docs(St, NewSt2, lists:reverse(Uncopied), Retry),
-
- ?COMP_EVENT(seq_done),
-
- % Copy the security information over
- SecProps = couch_bt_engine:get_security(St),
- {ok, NewSt4} = couch_bt_engine:copy_security(NewSt3, SecProps),
-
- % Copy general properties over
- Props = couch_bt_engine:get_props(St),
- {ok, NewSt5} = couch_bt_engine:set_props(NewSt4, Props),
-
- FinalUpdateSeq = couch_bt_engine:get_update_seq(St),
- {ok, NewSt6} = couch_bt_engine:set_update_seq(NewSt5, FinalUpdateSeq),
-
- CompSt#comp_st{
- new_st = NewSt6
- }.
-
-copy_docs(St, #st{} = NewSt, MixedInfos, Retry) ->
- DocInfoIds = [Id || #doc_info{id = Id} <- MixedInfos],
- LookupResults = couch_btree:lookup(St#st.id_tree, DocInfoIds),
- % COUCHDB-968, make sure we prune duplicates during compaction
- NewInfos0 = lists:usort(
- fun(#full_doc_info{id = A}, #full_doc_info{id = B}) ->
- A =< B
- end,
- merge_lookups(MixedInfos, LookupResults)
- ),
-
- NewInfos1 = lists:map(
- fun(Info) ->
- {NewRevTree, FinalAcc} = couch_key_tree:mapfold(
- fun
- ({RevPos, RevId}, #leaf{ptr = Sp} = Leaf, leaf, SizesAcc) ->
- {Body, AttInfos} = copy_doc_attachments(St, Sp, NewSt),
- #size_info{external = OldExternalSize} = Leaf#leaf.sizes,
- ExternalSize =
- case OldExternalSize of
- 0 when is_binary(Body) ->
- couch_compress:uncompressed_size(Body);
- 0 ->
- couch_ejson_size:encoded_size(Body);
- N ->
- N
- end,
- Doc0 = #doc{
- id = Info#full_doc_info.id,
- revs = {RevPos, [RevId]},
- deleted = Leaf#leaf.deleted,
- body = Body,
- atts = AttInfos
- },
- Doc1 = couch_bt_engine:serialize_doc(NewSt, Doc0),
- {ok, Doc2, ActiveSize} =
- couch_bt_engine:write_doc_body(NewSt, Doc1),
- AttSizes = [{element(3, A), element(4, A)} || A <- AttInfos],
- NewLeaf = Leaf#leaf{
- ptr = Doc2#doc.body,
- sizes = #size_info{
- active = ActiveSize,
- external = ExternalSize
- },
- atts = AttSizes
- },
- {NewLeaf, couch_db_updater:add_sizes(leaf, NewLeaf, SizesAcc)};
- (_Rev, _Leaf, branch, SizesAcc) ->
- {?REV_MISSING, SizesAcc}
- end,
- {0, 0, []},
- Info#full_doc_info.rev_tree
- ),
- {FinalAS, FinalES, FinalAtts} = FinalAcc,
- TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
- NewActiveSize = FinalAS + TotalAttSize,
- NewExternalSize = FinalES + TotalAttSize,
- ?COMP_EVENT(seq_copy),
- Info#full_doc_info{
- rev_tree = NewRevTree,
- sizes = #size_info{
- active = NewActiveSize,
- external = NewExternalSize
- }
- }
- end,
- NewInfos0
- ),
-
- Limit = couch_bt_engine:get_revs_limit(St),
- NewInfos = lists:map(
- fun(FDI) ->
- FDI#full_doc_info{
- rev_tree = couch_key_tree:stem(FDI#full_doc_info.rev_tree, Limit)
- }
- end,
- NewInfos1
- ),
-
- RemoveSeqs =
- case Retry of
- nil ->
- [];
- OldDocIdTree ->
- % Compaction is being rerun to catch up to writes during the
- % first pass. This means we may have docs that already exist
- % in the seq_tree in the .data file. Here we lookup any old
- % update_seqs so that they can be removed.
- Ids = [Id || #full_doc_info{id = Id} <- NewInfos],
- Existing = couch_btree:lookup(OldDocIdTree, Ids),
- [Seq || {ok, #full_doc_info{update_seq = Seq}} <- Existing]
- end,
-
- {ok, SeqTree} = couch_btree:add_remove(
- NewSt#st.seq_tree, NewInfos, RemoveSeqs
- ),
-
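- % The id btree is not updated in place here. Each #full_doc_info{} is
- % appended to the .compact.meta file and its location is handed to the
- % external merge sorter keyed by {Id, Seq}; copy_meta_data/1 later
- % streams the sorted rows back to build the final id btree in docid
- % order, presumably because bulk-loading a btree from sorted input is
- % far cheaper than inserting in update-seq order.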
- EMSortFd = couch_emsort:get_fd(NewSt#st.id_tree),
- {ok, LocSizes} = couch_file:append_terms(EMSortFd, NewInfos),
- EMSortEntries = lists:zipwith(
- fun(FDI, {Loc, _}) ->
- #full_doc_info{
- id = Id,
- update_seq = Seq
- } = FDI,
- {{Id, Seq}, Loc}
- end,
- NewInfos,
- LocSizes
- ),
- {ok, IdEms} = couch_emsort:add(NewSt#st.id_tree, EMSortEntries),
- update_compact_task(length(NewInfos)),
- NewSt#st{id_tree = IdEms, seq_tree = SeqTree}.
-
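-% Copies a document body plus its attachment streams from the source db
-% file into the compaction target. The two fun clauses handle the two
-% on-disk attachment tuple shapes: the older 6-tuple without DiskLen or
-% an encoding (rewritten here as an 8-tuple with 'identity' encoding) and
-% the current 8-tuple, whose boolean encodings are upgraded to
-% gzip/identity. The "010"/"0110" markers appear to refer to the CouchDB
-% 0.10/0.11 upgrade paths.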
-copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) ->
- {ok, {BodyData, BinInfos0}} = couch_file:pread_term(SrcSt#st.fd, SrcSp),
- BinInfos =
- case BinInfos0 of
- _ when is_binary(BinInfos0) ->
- couch_compress:decompress(BinInfos0);
- _ when is_list(BinInfos0) ->
- % pre 1.2 file format
- BinInfos0
- end,
- % copy the bin values
- NewBinInfos = lists:map(
- fun
- ({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
- % 010 UPGRADE CODE
- {ok, SrcStream} = couch_bt_engine:open_read_stream(SrcSt, BinSp),
- {ok, DstStream} = couch_bt_engine:open_write_stream(DstSt, []),
- ok = couch_stream:copy(SrcStream, DstStream),
- {NewStream, AttLen, AttLen, ActualMd5, _IdentityMd5} =
- couch_stream:close(DstStream),
- {ok, NewBinSp} = couch_stream:to_disk_term(NewStream),
- couch_util:check_md5(ExpectedMd5, ActualMd5),
- {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
- ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
- {ok, SrcStream} = couch_bt_engine:open_read_stream(SrcSt, BinSp),
- {ok, DstStream} = couch_bt_engine:open_write_stream(DstSt, []),
- ok = couch_stream:copy(SrcStream, DstStream),
- {NewStream, AttLen, _, ActualMd5, _IdentityMd5} =
- couch_stream:close(DstStream),
- {ok, NewBinSp} = couch_stream:to_disk_term(NewStream),
- couch_util:check_md5(ExpectedMd5, ActualMd5),
- Enc =
- case Enc1 of
- true ->
- % 0110 UPGRADE CODE
- gzip;
- false ->
- % 0110 UPGRADE CODE
- identity;
- _ ->
- Enc1
- end,
- {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
- end,
- BinInfos
- ),
- {BodyData, NewBinInfos}.
-
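-% Runs the external merge sort over every entry that copy_docs/4 queued
-% in the .compact.meta file. The total_changes figure of
-% NumMerges * NumKVs appears to assume each merge pass rewrites every
-% key/value once, with progress ticked through the Reporter fun.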
-sort_meta_data(#comp_st{new_st = St0} = CompSt) ->
- ?COMP_EVENT(md_sort_init),
- NumKVs = couch_emsort:num_kvs(St0#st.id_tree),
- NumMerges = couch_emsort:num_merges(St0#st.id_tree),
- couch_task_status:update([
- {phase, docid_sort},
- {progress, 0},
- {changes_done, 0},
- {total_changes, NumMerges * NumKVs}
- ]),
- Reporter = fun update_compact_task/1,
- {ok, Ems} = couch_emsort:merge(St0#st.id_tree, Reporter),
- ?COMP_EVENT(md_sort_done),
- CompSt#comp_st{
- new_st = St0#st{
- id_tree = Ems
- }
- }.
-
-copy_meta_data(#comp_st{new_st = St} = CompSt) ->
- #st{
- fd = Fd,
- header = Header,
- id_tree = Src
- } = St,
- SrcFd = couch_emsort:get_fd(Src),
- DstState = couch_bt_engine_header:id_tree_state(Header),
- {ok, IdTree0} = couch_btree:open(DstState, Fd, [
- {split, fun couch_bt_engine:id_tree_split/1},
- {join, fun couch_bt_engine:id_tree_join/2},
- {reduce, fun couch_bt_engine:id_tree_reduce/2}
- ]),
- {ok, Iter} = couch_emsort:iter(Src),
- Acc0 = #merge_st{
- src_fd = SrcFd,
- id_tree = IdTree0,
- seq_tree = St#st.seq_tree,
- rem_seqs = [],
- locs = []
- },
- ?COMP_EVENT(md_copy_init),
- NumKVs = couch_emsort:num_kvs(Src),
- couch_task_status:update([
- {phase, docid_copy},
- {progress, 0},
- {changes_done, 0},
- {total_changes, NumKVs}
- ]),
- Acc = merge_docids(Iter, Acc0),
- {ok, Infos} = couch_file:pread_terms(SrcFd, Acc#merge_st.locs),
- {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Infos),
- {ok, SeqTree} = couch_btree:add_remove(
- Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
- ),
- update_compact_task(NumKVs),
- ?COMP_EVENT(md_copy_done),
- CompSt#comp_st{
- new_st = St#st{
- id_tree = IdTree,
- seq_tree = SeqTree
- }
- }.
-
-compact_final_sync(#comp_st{new_st = St0} = CompSt) ->
- ?COMP_EVENT(before_final_sync),
- {ok, St1} = couch_bt_engine:commit_data(St0),
- ?COMP_EVENT(after_final_sync),
- CompSt#comp_st{
- new_st = St1
- }.
-
-open_compaction_file(FilePath) ->
- case couch_file:open(FilePath, [nologifmissing]) of
- {ok, Fd} ->
- case couch_file:read_header(Fd) of
- {ok, Header} -> {ok, Fd, Header};
- no_valid_header -> {ok, Fd, nil}
- end;
- {error, enoent} ->
- {ok, Fd} = couch_file:open(FilePath, [create]),
- {ok, Fd, nil}
- end.
-
-reset_compaction_file(Fd, Header) ->
- ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, Header).
-
-commit_compaction_data(#comp_st{new_st = St} = CompSt) ->
- % Compaction needs to write headers to both the data file
- % and the meta file so if we need to restart we can pick
- % back up from where we left off.
- CompSt#comp_st{
- new_st = commit_compaction_data(St)
- };
-commit_compaction_data(#st{} = St) ->
- commit_compaction_data(St, couch_emsort:get_fd(St#st.id_tree)),
- commit_compaction_data(St, St#st.fd).
-
-commit_compaction_data(#st{header = OldHeader} = St0, Fd) ->
- DataState = couch_bt_engine_header:id_tree_state(OldHeader),
- MetaFd = couch_emsort:get_fd(St0#st.id_tree),
- MetaState = couch_emsort:get_state(St0#st.id_tree),
- St1 = bind_id_tree(St0, St0#st.fd, DataState),
- Header = couch_bt_engine:update_header(St1, St1#st.header),
- CompHeader = #comp_header{
- db_header = Header,
- meta_st = MetaState
- },
- ok = couch_file:sync(Fd),
- ok = couch_file:write_header(Fd, CompHeader),
- St2 = St1#st{
- header = Header
- },
- bind_emsort(St2, MetaFd, MetaState).
-
-bind_emsort(St, Fd, nil) ->
- {ok, Ems} = couch_emsort:open(Fd),
- St#st{id_tree = Ems};
-bind_emsort(St, Fd, {BB, _} = Root) when is_list(BB) ->
- % Upgrade clause when we find old compaction files
- bind_emsort(St, Fd, [{root, Root}]);
-bind_emsort(St, Fd, State) ->
- {ok, Ems} = couch_emsort:open(Fd, State),
- St#st{id_tree = Ems}.
-
-bind_id_tree(St, Fd, State) ->
- {ok, IdBtree} = couch_btree:open(State, Fd, [
- {split, fun couch_bt_engine:id_tree_split/1},
- {join, fun couch_bt_engine:id_tree_join/2},
- {reduce, fun couch_bt_engine:id_tree_reduce/2}
- ]),
- St#st{id_tree = IdBtree}.
-
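-% Zips the mixed infos back together with their lookup results:
-% #doc_info{} entries (which needed an id_tree lookup) are replaced by
-% the matching #full_doc_info{}, while entries that are already
-% #full_doc_info{} pass straight through untouched.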
-merge_lookups(Infos, []) ->
- Infos;
-merge_lookups([], _) ->
- [];
-merge_lookups([#doc_info{} = DI | RestInfos], [{ok, FDI} | RestLookups]) ->
- % Assert we've matched our lookups
- if
- DI#doc_info.id == FDI#full_doc_info.id -> ok;
- true -> erlang:error({mismatched_doc_infos, DI#doc_info.id})
- end,
- [FDI | merge_lookups(RestInfos, RestLookups)];
-merge_lookups([FDI | RestInfos], Lookups) ->
- [FDI | merge_lookups(RestInfos, Lookups)].
-
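-% The first clause flushes accumulated rows to the btrees whenever more
-% than 1000 locations have built up, keeping the accumulator bounded
-% during the docid copy.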
-merge_docids(Iter, #merge_st{locs = Locs} = Acc) when length(Locs) > 1000 ->
- #merge_st{
- src_fd = SrcFd,
- id_tree = IdTree0,
- seq_tree = SeqTree0,
- rem_seqs = RemSeqs
- } = Acc,
- {ok, Infos} = couch_file:pread_terms(SrcFd, Locs),
- {ok, IdTree1} = couch_btree:add(IdTree0, Infos),
- {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
- Acc1 = Acc#merge_st{
- id_tree = IdTree1,
- seq_tree = SeqTree1,
- rem_seqs = [],
- locs = []
- },
- update_compact_task(length(Locs)),
- merge_docids(Iter, Acc1);
-merge_docids(Iter, #merge_st{curr = Curr} = Acc) ->
- case next_info(Iter, Curr, []) of
- {NextIter, NewCurr, Loc, Seqs} ->
- Acc1 = Acc#merge_st{
- locs = [Loc | Acc#merge_st.locs],
- rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
- curr = NewCurr
- },
- ?COMP_EVENT(md_copy_row),
- merge_docids(NextIter, Acc1);
- {finished, Loc, Seqs} ->
- Acc#merge_st{
- locs = [Loc | Acc#merge_st.locs],
- rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
- curr = undefined
- };
- empty ->
- Acc
- end.
-
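-% Pulls rows off the sorted emsort iterator, collapsing duplicates. Rows
-% arrive in ascending {Id, Seq} order, so for any given doc id the last
-% row wins: its location is kept while the update seqs of all earlier
-% (superseded) rows are returned for removal from the seq_tree.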
-next_info(Iter, undefined, []) ->
- case couch_emsort:next(Iter) of
- {ok, {{Id, Seq}, Loc}, NextIter} ->
- next_info(NextIter, {Id, Seq, Loc}, []);
- finished ->
- empty
- end;
-next_info(Iter, {Id, Seq, Loc}, Seqs) ->
- case couch_emsort:next(Iter) of
- {ok, {{Id, NSeq}, NLoc}, NextIter} ->
- next_info(NextIter, {Id, NSeq, NLoc}, [Seq | Seqs]);
- {ok, {{NId, NSeq}, NLoc}, NextIter} ->
- {NextIter, {NId, NSeq, NLoc}, Loc, Seqs};
- finished ->
- {finished, Loc, Seqs}
- end.
-
-update_compact_task(NumChanges) ->
- [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
- Changes2 = Changes + NumChanges,
- Progress =
- case Total of
- 0 ->
- 0;
- _ ->
- (Changes2 * 100) div Total
- end,
- couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).