diff options
Diffstat (limited to 'src/couch/src/couch_bt_engine.erl')
-rw-r--r-- | src/couch/src/couch_bt_engine.erl | 959 |
1 file changed, 959 insertions, 0 deletions
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl new file mode 100644 index 000000000..347c9318f --- /dev/null +++ b/src/couch/src/couch_bt_engine.erl @@ -0,0 +1,959 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_bt_engine). +-behavior(couch_db_engine). + +-export([ + exists/1, + + delete/3, + delete_compaction_files/3, + + init/2, + terminate/2, + handle_db_updater_call/2, + handle_db_updater_info/2, + + incref/1, + decref/1, + monitored_by/1, + + last_activity/1, + + get_compacted_seq/1, + get_del_doc_count/1, + get_disk_version/1, + get_doc_count/1, + get_epochs/1, + get_last_purged/1, + get_purge_seq/1, + get_revs_limit/1, + get_security/1, + get_size_info/1, + get_update_seq/1, + get_uuid/1, + + set_revs_limit/2, + set_security/2, + + open_docs/2, + open_local_docs/2, + read_doc_body/2, + + serialize_doc/2, + write_doc_body/2, + write_doc_infos/4, + + commit_data/1, + + open_write_stream/2, + open_read_stream/2, + is_active_stream/2, + + fold_docs/4, + fold_local_docs/4, + fold_changes/5, + count_changes_since/2, + + start_compaction/4, + finish_compaction/4 +]). + + +-export([ + init_state/4 +]). + + +-export([ + id_tree_split/1, + id_tree_join/2, + id_tree_reduce/2, + + seq_tree_split/1, + seq_tree_join/2, + seq_tree_reduce/2, + + local_tree_split/1, + local_tree_join/2 +]). + + +% Used by the compactor +-export([ + set_update_seq/2, + copy_security/2 +]). + + +-include_lib("couch/include/couch_db.hrl"). 
-include("couch_bt_engine.hrl").


%% Returns true if a database file exists at FilePath, or if a leftover
%% compaction file that could be recovered from exists there.
exists(FilePath) ->
    case filelib:is_file(FilePath) of
        true ->
            true;
        false ->
            % No main file; a .compact file alone is enough to recover from.
            filelib:is_file(FilePath ++ ".compact")
    end.


%% Deletes the database file plus any compaction artifacts belonging to it.
%% Async is forwarded to couch_file:delete/3.
delete(RootDir, FilePath, Async) ->
    %% Delete any leftover compaction files. If we don't do this a
    %% subsequent request for this DB will try to open them to use
    %% as a recovery.
    delete_compaction_files(RootDir, FilePath, [{context, delete}]),

    % Delete the actual database file
    couch_file:delete(RootDir, FilePath, Async).


%% Removes all three compaction artifacts for FilePath with the given
%% couch_file delete options.
delete_compaction_files(RootDir, FilePath, DelOpts) ->
    lists:foreach(fun(Ext) ->
        couch_file:delete(RootDir, FilePath ++ Ext, DelOpts)
    end, [".compact", ".compact.data", ".compact.meta"]).


%% Opens (or creates) the database file and builds the engine state.
%% With the `create` option, or when no valid header can be read, any
%% stale compaction files are removed and a fresh header is written.
init(FilePath, Options) ->
    {ok, Fd} = open_db_file(FilePath, Options),
    Header = case lists:member(create, Options) of
        true ->
            delete_compaction_files(FilePath),
            Header0 = couch_bt_engine_header:new(),
            ok = couch_file:write_header(Fd, Header0),
            Header0;
        false ->
            case couch_file:read_header(Fd) of
                {ok, Header0} ->
                    Header0;
                no_valid_header ->
                    % No readable header: start over with a new one.
                    delete_compaction_files(FilePath),
                    Header0 = couch_bt_engine_header:new(),
                    ok = couch_file:write_header(Fd, Header0),
                    Header0
            end
    end,
    {ok, init_state(FilePath, Fd, Header, Options)}.


%% Closes the file and synchronously shuts the fd process down.
terminate(_Reason, St) ->
    % If the reason we died is because our fd disappeared
    % then we don't need to try closing it again.
    Ref = St#st.fd_monitor,
    if Ref == closed -> ok; true ->
        ok = couch_file:close(St#st.fd),
        % Wait briefly for our monitor to confirm the fd went down.
        receive
            {'DOWN', Ref, _, _, _} ->
                ok
        after 500 ->
            ok
        end
    end,
    couch_util:shutdown_sync(St#st.fd),
    ok.


%% This engine expects no db_updater calls; any message received here
%% stops the updater with an invalid_call error.
handle_db_updater_call(Msg, St) ->
    {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}.


%% The only info message handled is the DOWN for our own fd monitor,
%% meaning the file process died; stop normally with the fd cleared.
handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor=Ref} = St) ->
    {stop, normal, St#st{fd=undefined, fd_monitor=closed}}.


%% Takes a fresh monitor on the fd for a new reference holder.
incref(St) ->
    {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}.
%% Releases the monitor taken by incref/1, flushing any pending
%% 'DOWN' message for it.
decref(St) ->
    #st{fd_monitor = Monitor} = St,
    true = erlang:demonitor(Monitor, [flush]),
    ok.


%% Lists the pids currently monitoring the database fd process, or []
%% when that information is unavailable.
monitored_by(St) ->
    #st{fd = Fd} = St,
    case erlang:process_info(Fd, monitored_by) of
        {monitored_by, Pids} -> Pids;
        _Other -> []
    end.


%% Timestamp of the last read on the database fd.
last_activity(St) ->
    couch_file:last_read(St#st.fd).


%% Update sequence recorded at the end of the last compaction.
get_compacted_seq(St) ->
    couch_bt_engine_header:get(St#st.header, compacted_seq).


%% Number of deleted documents, taken from the id tree's reduction.
get_del_doc_count(#st{id_tree = IdTree}) ->
    {ok, Reduction} = couch_btree:full_reduce(IdTree),
    element(2, Reduction).


%% On-disk format version from the header.
get_disk_version(St) ->
    couch_bt_engine_header:get(St#st.header, disk_version).


%% Number of live documents, taken from the id tree's reduction.
get_doc_count(#st{id_tree = IdTree}) ->
    {ok, Reduction} = couch_btree:full_reduce(IdTree),
    element(1, Reduction).


%% Node/sequence epochs list from the header.
get_epochs(St) ->
    couch_bt_engine_header:get(St#st.header, epochs).


%% Most recently purged id/rev pairs, read from the pointer stored in
%% the header; [] when nothing has been purged.
get_last_purged(#st{fd = Fd, header = Header}) ->
    case couch_bt_engine_header:get(Header, purged_docs) of
        nil ->
            [];
        Ptr ->
            {ok, Purged} = couch_file:pread_term(Fd, Ptr),
            Purged
    end.


%% Current purge sequence from the header.
get_purge_seq(St) ->
    couch_bt_engine_header:get(St#st.header, purge_seq).


%% Revision-depth limit from the header.
get_revs_limit(St) ->
    couch_bt_engine_header:get(St#st.header, revs_limit).


%% Returns [{active, _}, {external, _}, {file, _}] size information.
%% The id tree's reduction may carry an old-format size entry, which
%% is upgraded to a #size_info{} before use.
get_size_info(#st{} = St) ->
    {ok, FileSize} = couch_file:bytes(St#st.fd),
    {ok, DbReduction} = couch_btree:full_reduce(St#st.id_tree),
    SizeInfo = case element(3, DbReduction) of
        #size_info{} = SI ->
            SI;
        {Active0, External0} ->
            #size_info{active = Active0, external = External0};
        Active0 ->
            #size_info{active = Active0}
    end,
    [
        {active, active_size(St, SizeInfo)},
        {external, SizeInfo#size_info.external},
        {file, FileSize}
    ].


%% Security properties object, read from the pointer stored in the
%% header; [] when no security object has been written.
get_security(#st{fd = Fd, header = Header}) ->
    case couch_bt_engine_header:get(Header, security_ptr) of
        undefined ->
            [];
        Ptr ->
            {ok, SecProps} = couch_file:pread_term(Fd, Ptr),
            SecProps
    end.
%% Current database update sequence from the header.
get_update_seq(#st{header = Header}) ->
    couch_bt_engine_header:get(Header, update_seq).


%% Database instance UUID from the header.
get_uuid(#st{header = Header}) ->
    couch_bt_engine_header:get(Header, uuid).


%% Stores a new revision limit in the header and bumps the update seq
%% so consumers notice the change.
set_revs_limit(#st{header = Header} = St, RevsLimit) ->
    NewSt = St#st{
        header = couch_bt_engine_header:set(Header, [
            {revs_limit, RevsLimit}
        ]),
        needs_commit = true
    },
    {ok, increment_update_seq(NewSt)}.


%% Appends the new security object to the file and points the header
%% at it; bumps the update seq like set_revs_limit/2.
set_security(#st{header = Header} = St, NewSecurity) ->
    Options = [{compression, St#st.compression}],
    {ok, Ptr, _} = couch_file:append_term(St#st.fd, NewSecurity, Options),
    NewSt = St#st{
        header = couch_bt_engine_header:set(Header, [
            {security_ptr, Ptr}
        ]),
        needs_commit = true
    },
    {ok, increment_update_seq(NewSt)}.


%% Looks up full_doc_info records by doc id; misses are returned as
%% not_found in the corresponding positions.
open_docs(#st{} = St, DocIds) ->
    Results = couch_btree:lookup(St#st.id_tree, DocIds),
    lists:map(fun
        ({ok, FDI}) -> FDI;
        (not_found) -> not_found
    end, Results).


%% Looks up local (non-replicated) docs by id.
open_local_docs(#st{} = St, DocIds) ->
    Results = couch_btree:lookup(St#st.local_tree, DocIds),
    lists:map(fun
        ({ok, Doc}) -> Doc;
        (not_found) -> not_found
    end, Results).


%% Loads a doc's body and attachment info from disk. Doc#doc.body is
%% expected to hold the file pointer produced by write_doc_body/2.
read_doc_body(#st{} = St, #doc{} = Doc) ->
    {ok, {Body, Atts}} = couch_file:pread_term(St#st.fd, Doc#doc.body),
    Doc#doc{
        body = Body,
        atts = Atts
    }.


%% Compresses body and attachments (unless already compressed), wraps
%% them with an md5 into an on-disk chunk, and stashes the compressed
%% body in doc meta for later use.
serialize_doc(#st{} = St, #doc{} = Doc) ->
    Compress = fun(Term) ->
        case couch_compress:is_compressed(Term, St#st.compression) of
            true -> Term;
            false -> couch_compress:compress(Term, St#st.compression)
        end
    end,
    Body = Compress(Doc#doc.body),
    Atts = Compress(Doc#doc.atts),
    SummaryBin = ?term_to_bin({Body, Atts}),
    Md5 = crypto:hash(md5, SummaryBin),
    Data = couch_file:assemble_file_chunk(SummaryBin, Md5),
    % TODO: This is a terrible hack to get around the issues
    % in COUCHDB-3255. We'll need to come back and figure
    % out a better approach to handling the case when we
    % need to generate a new revision id after the doc
    % has been serialized.
    Doc#doc{
        body = Data,
        meta = [{comp_body, Body} | Doc#doc.meta]
    }.


%% Appends a serialized doc body to the file, returning the doc with
%% its body replaced by the on-disk pointer, plus the bytes written.
write_doc_body(St, #doc{} = Doc) ->
    #st{
        fd = Fd
    } = St,
    {ok, Ptr, Written} = couch_file:append_raw_chunk(Fd, Doc#doc.body),
    {ok, Doc#doc{body = Ptr}, Written}.


%% Applies a batch of document updates to the id/seq/local trees and
%% records purges, producing an updated (not yet committed) state.
%% Pairs is a list of {OldFDI | not_found, NewFDI | not_found} tuples.
write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
    #st{
        id_tree = IdTree,
        seq_tree = SeqTree,
        local_tree = LocalTree
    } = St,
    % Partition updates into tree additions, id removals and seq removals.
    FinalAcc = lists:foldl(fun({OldFDI, NewFDI}, Acc) ->
        {AddAcc, RemIdsAcc, RemSeqsAcc} = Acc,
        case {OldFDI, NewFDI} of
            {not_found, #full_doc_info{}} ->
                % Brand new document.
                {[NewFDI | AddAcc], RemIdsAcc, RemSeqsAcc};
            {#full_doc_info{id = Id}, #full_doc_info{id = Id}} ->
                % Updated document: add the new info, drop the old seq entry.
                NewAddAcc = [NewFDI | AddAcc],
                NewRemSeqsAcc = [OldFDI#full_doc_info.update_seq | RemSeqsAcc],
                {NewAddAcc, RemIdsAcc, NewRemSeqsAcc};
            {#full_doc_info{id = Id}, not_found} ->
                % Document removed entirely.
                NewRemIdsAcc = [Id | RemIdsAcc],
                NewRemSeqsAcc = [OldFDI#full_doc_info.update_seq | RemSeqsAcc],
                {AddAcc, NewRemIdsAcc, NewRemSeqsAcc}
        end
    end, {[], [], []}, Pairs),

    {Add, RemIds, RemSeqs} = FinalAcc,
    {ok, IdTree2} = couch_btree:add_remove(IdTree, Add, RemIds),
    {ok, SeqTree2} = couch_btree:add_remove(SeqTree, Add, RemSeqs),

    % Local docs live in their own tree; deleted ones are removed outright.
    {AddLDocs, RemLDocIds} = lists:foldl(fun(Doc, {AddAcc, RemAcc}) ->
        case Doc#doc.deleted of
            true ->
                {AddAcc, [Doc#doc.id | RemAcc]};
            false ->
                {[Doc | AddAcc], RemAcc}
        end
    end, {[], []}, LocalDocs),
    {ok, LocalTree2} = couch_btree:add_remove(LocalTree, AddLDocs, RemLDocIds),

    % The new update seq is the highest seq among the added infos
    % (or the current one when nothing was added).
    NewUpdateSeq = lists:foldl(fun(#full_doc_info{update_seq=Seq}, Acc) ->
        erlang:max(Seq, Acc)
    end, get_update_seq(St), Add),

    NewHeader = case PurgedIdRevs of
        [] ->
            couch_bt_engine_header:set(St#st.header, [
                {update_seq, NewUpdateSeq}
            ]);
        _ ->
            {ok, Ptr, _} = couch_file:append_term(St#st.fd, PurgedIdRevs),
            OldPurgeSeq = couch_bt_engine_header:get(St#st.header, purge_seq),
            % We bump NewUpdateSeq because we have to ensure that
            % indexers see that they need to process the new purge
            % information.
            couch_bt_engine_header:set(St#st.header, [
                {update_seq, NewUpdateSeq + 1},
                {purge_seq, OldPurgeSeq + 1},
                {purged_docs, Ptr}
            ])
    end,

    {ok, St#st{
        header = NewHeader,
        id_tree = IdTree2,
        seq_tree = SeqTree2,
        local_tree = LocalTree2,
        needs_commit = true
    }}.


%% Writes the header (refreshed with the current btree roots) to disk
%% if anything changed, honoring the before/after fsync options.
commit_data(St) ->
    #st{
        fd = Fd,
        fsync_options = FsyncOptions,
        header = OldHeader,
        needs_commit = NeedsCommit
    } = St,

    NewHeader = update_header(St, OldHeader),

    case NewHeader /= OldHeader orelse NeedsCommit of
        true ->
            Before = lists:member(before_header, FsyncOptions),
            After = lists:member(after_header, FsyncOptions),

            if Before -> couch_file:sync(Fd); true -> ok end,
            ok = couch_file:write_header(Fd, NewHeader),
            if After -> couch_file:sync(Fd); true -> ok end,

            {ok, St#st{
                header = NewHeader,
                needs_commit = false
            }};
        false ->
            {ok, St}
    end.


%% Opens a new attachment stream for writing against our fd.
open_write_stream(#st{} = St, Options) ->
    couch_stream:open({couch_bt_engine_stream, {St#st.fd, []}}, Options).


%% Re-opens an existing attachment stream from its stored state.
open_read_stream(#st{} = St, StreamSt) ->
    {ok, {couch_bt_engine_stream, {St#st.fd, StreamSt}}}.


%% A stream is "active" when it is backed by this state's fd.
is_active_stream(#st{} = St, {couch_bt_engine_stream, {Fd, _}}) ->
    St#st.fd == Fd;
is_active_stream(_, _) ->
    false.


%% Folds over all docs in id order (excluding local docs).
fold_docs(St, UserFun, UserAcc, Options) ->
    fold_docs_int(St, St#st.id_tree, UserFun, UserAcc, Options).


%% Folds over all local docs in id order.
fold_local_docs(St, UserFun, UserAcc, Options) ->
    fold_docs_int(St, St#st.local_tree, UserFun, UserAcc, Options).


%% Folds over docs in update-seq order, starting after SinceSeq.
fold_changes(St, SinceSeq, UserFun, UserAcc, Options) ->
    Fun = fun drop_reductions/4,
    InAcc = {UserFun, UserAcc},
    Opts = [{start_key, SinceSeq + 1}] ++ Options,
    {ok, _, OutAcc} = couch_btree:fold(St#st.seq_tree, Fun, InAcc, Opts),
    {_, FinalUserAcc} = OutAcc,
    {ok, FinalUserAcc}.
%% Counts changes after SinceSeq via the seq tree's reduction
%% (document count) rather than visiting every entry.
count_changes_since(St, SinceSeq) ->
    BTree = St#st.seq_tree,
    FoldFun = fun(_SeqStart, PartialReds, 0) ->
        {ok, couch_btree:final_reduce(BTree, PartialReds)}
    end,
    Opts = [{start_key, SinceSeq + 1}],
    {ok, Changes} = couch_btree:fold_reduce(BTree, FoldFun, 0, Opts),
    Changes.


%% Spawns the compactor process, linked to the caller.
start_compaction(St, DbName, Options, Parent) ->
    Args = [St, DbName, Options, Parent],
    Pid = spawn_link(couch_bt_engine_compactor, start, Args),
    {ok, St, Pid}.


%% Swaps in the compacted file if it has caught up with the main
%% file's update seq; otherwise restarts compaction so the remaining
%% updates get copied.
finish_compaction(OldState, DbName, Options, CompactFilePath) ->
    {ok, NewState1} = ?MODULE:init(CompactFilePath, Options),
    OldSeq = get_update_seq(OldState),
    NewSeq = get_update_seq(NewState1),
    case OldSeq == NewSeq of
        true ->
            finish_compaction_int(OldState, NewState1);
        false ->
            couch_log:info("Compaction file still behind main file "
                "(update seq=~p. compact update seq=~p). Retrying.",
                [OldSeq, NewSeq]),
            ok = decref(NewState1),
            start_compaction(OldState, DbName, Options, self())
    end.


%% btree callback: converts a #full_doc_info{} into its id-tree
%% key/value form (booleans as ints, rev tree in disk format).
id_tree_split(#full_doc_info{}=Info) ->
    #full_doc_info{
        id = Id,
        update_seq = Seq,
        deleted = Deleted,
        sizes = SizeInfo,
        rev_tree = Tree
    } = Info,
    {Id, {Seq, ?b2i(Deleted), split_sizes(SizeInfo), disk_tree(Tree)}}.


%% btree callback: rebuilds a #full_doc_info{} from an id-tree value.
id_tree_join(Id, {HighSeq, Deleted, DiskTree}) ->
    % Handle old formats before data_size was added
    id_tree_join(Id, {HighSeq, Deleted, #size_info{}, DiskTree});

id_tree_join(Id, {HighSeq, Deleted, Sizes, DiskTree}) ->
    #full_doc_info{
        id = Id,
        update_seq = HighSeq,
        deleted = ?i2b(Deleted),
        sizes = couch_db_updater:upgrade_sizes(Sizes),
        rev_tree = rev_tree(DiskTree)
    }.
%% btree reduce callback for the id tree: counts live and deleted
%% docs and accumulates size info into {NotDeleted, Deleted, Sizes}.
id_tree_reduce(reduce, FullDocInfos) ->
    lists:foldl(fun(Info, {NotDeleted, Deleted, Sizes}) ->
        Sizes2 = reduce_sizes(Sizes, Info#full_doc_info.sizes),
        case Info#full_doc_info.deleted of
            true ->
                {NotDeleted, Deleted + 1, Sizes2};
            false ->
                {NotDeleted + 1, Deleted, Sizes2}
        end
    end, {0, 0, #size_info{}}, FullDocInfos);
id_tree_reduce(rereduce, Reds) ->
    lists:foldl(fun
        ({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSizes}) ->
            % pre 1.2 format, will be upgraded on compaction
            {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil};
        ({NotDeleted, Deleted, Sizes}, {AccNotDeleted, AccDeleted, AccSizes}) ->
            AccSizes2 = reduce_sizes(AccSizes, Sizes),
            {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSizes2}
    end, {0, 0, #size_info{}}, Reds).


%% btree callback: converts a #full_doc_info{} into its seq-tree
%% key/value form, keyed by update sequence.
seq_tree_split(#full_doc_info{}=Info) ->
    #full_doc_info{
        id = Id,
        update_seq = Seq,
        deleted = Del,
        sizes = SizeInfo,
        rev_tree = Tree
    } = Info,
    {Seq, {Id, ?b2i(Del), split_sizes(SizeInfo), disk_tree(Tree)}}.


%% btree callback: rebuilds a #full_doc_info{} from a seq-tree value,
%% upgrading older on-disk formats along the way.
seq_tree_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) ->
    % Old format without size info; fill in zero sizes.
    seq_tree_join(Seq, {Id, Del, {0, 0}, DiskTree});

seq_tree_join(Seq, {Id, Del, Sizes, DiskTree}) when is_integer(Del) ->
    #full_doc_info{
        id = Id,
        update_seq = Seq,
        deleted = ?i2b(Del),
        sizes = join_sizes(Sizes),
        rev_tree = rev_tree(DiskTree)
    };

seq_tree_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
    % Older versions stored #doc_info records in the seq_tree.
    % Compact to upgrade.
    Revs = lists:map(fun({Rev, Seq, Bp}) ->
        #rev_info{rev = Rev, seq = Seq, deleted = false, body_sp = Bp}
    end, RevInfos),
    DeletedRevs = lists:map(fun({Rev, Seq, Bp}) ->
        #rev_info{rev = Rev, seq = Seq, deleted = true, body_sp = Bp}
    end, DeletedRevInfos),
    #doc_info{
        id = Id,
        high_seq = KeySeq,
        revs = Revs ++ DeletedRevs
    }.


%% btree reduce callback for the seq tree: a plain document count.
seq_tree_reduce(reduce, DocInfos) ->
    % count the number of documents
    length(DocInfos);
seq_tree_reduce(rereduce, Reds) ->
    lists:sum(Reds).
%% btree callback: converts a local #doc{} to its key/value form.
%% The single rev in the {0, [Rev]} revs tuple is normalized to an
%% integer for storage.
local_tree_split(#doc{revs = {0, [Rev]}} = Doc) when is_binary(Rev) ->
    #doc{
        id = Id,
        body = BodyData
    } = Doc,
    {Id, {binary_to_integer(Rev), BodyData}};

local_tree_split(#doc{revs = {0, [Rev]}} = Doc) when is_integer(Rev) ->
    #doc{
        id = Id,
        body = BodyData
    } = Doc,
    {Id, {Rev, BodyData}}.


%% btree callback: rebuilds a local #doc{}, converting the stored
%% integer rev back to a binary.
local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) ->
    #doc{
        id = Id,
        revs = {0, [integer_to_binary(Rev)]},
        body = BodyData
    }.


%% Used by the compactor: overwrite the header's update seq.
set_update_seq(#st{header = Header} = St, UpdateSeq) ->
    {ok, St#st{
        header = couch_bt_engine_header:set(Header, [
            {update_seq, UpdateSeq}
        ]),
        needs_commit = true
    }}.


%% Used by the compactor: append the given security object to this
%% file and point the header at the new location.
copy_security(#st{header = Header} = St, SecProps) ->
    Options = [{compression, St#st.compression}],
    {ok, Ptr, _} = couch_file:append_term(St#st.fd, SecProps, Options),
    {ok, St#st{
        header = couch_bt_engine_header:set(Header, [
            {security_ptr, Ptr}
        ]),
        needs_commit = true
    }}.


%% Opens the database file, falling back to recovering a finished
%% compaction file when the main file is missing. Throws
%% {not_found, no_db_file} when neither exists.
open_db_file(FilePath, Options) ->
    case couch_file:open(FilePath, Options) of
        {ok, Fd} ->
            {ok, Fd};
        {error, enoent} ->
            % Couldn't find file. is there a compact version? This can
            % happen (rarely) if we crashed during the file switch.
            case couch_file:open(FilePath ++ ".compact", [nologifmissing]) of
                {ok, Fd} ->
                    Fmt = "Recovering from compaction file: ~s~s",
                    couch_log:info(Fmt, [FilePath, ".compact"]),
                    % Promote the compaction file to be the main file.
                    ok = file:rename(FilePath ++ ".compact", FilePath),
                    ok = couch_file:sync(Fd),
                    {ok, Fd};
                {error, enoent} ->
                    throw({not_found, no_db_file})
            end;
        Error ->
            throw(Error)
    end.
%% Builds the full engine #st{} from an open fd and header: parses
%% configured fsync options, upgrades the header, opens the three
%% btrees, and (for brand-new files) commits the fresh header.
init_state(FilePath, Fd, Header0, Options) ->
    DefaultFSync = "[before_header, after_header, on_file_open]",
    FsyncStr = config:get("couchdb", "fsync_options", DefaultFSync),
    {ok, FsyncOptions} = couch_util:parse_term(FsyncStr),

    case lists:member(on_file_open, FsyncOptions) of
        true -> ok = couch_file:sync(Fd);
        _ -> ok
    end,

    Compression = couch_compress:get_compression_method(),

    % Upgrade the header to the current disk format and make sure a
    % security object exists.
    Header1 = couch_bt_engine_header:upgrade(Header0),
    Header = set_default_security_object(Fd, Header1, Compression, Options),

    IdTreeState = couch_bt_engine_header:id_tree_state(Header),
    {ok, IdTree} = couch_btree:open(IdTreeState, Fd, [
        {split, fun ?MODULE:id_tree_split/1},
        {join, fun ?MODULE:id_tree_join/2},
        {reduce, fun ?MODULE:id_tree_reduce/2},
        {compression, Compression}
    ]),

    SeqTreeState = couch_bt_engine_header:seq_tree_state(Header),
    {ok, SeqTree} = couch_btree:open(SeqTreeState, Fd, [
        {split, fun ?MODULE:seq_tree_split/1},
        {join, fun ?MODULE:seq_tree_join/2},
        {reduce, fun ?MODULE:seq_tree_reduce/2},
        {compression, Compression}
    ]),

    % The local tree has no reduce function.
    LocalTreeState = couch_bt_engine_header:local_tree_state(Header),
    {ok, LocalTree} = couch_btree:open(LocalTreeState, Fd, [
        {split, fun ?MODULE:local_tree_split/1},
        {join, fun ?MODULE:local_tree_join/2},
        {compression, Compression}
    ]),

    ok = couch_file:set_db_pid(Fd, self()),

    St = #st{
        filepath = FilePath,
        fd = Fd,
        fd_monitor = erlang:monitor(process, Fd),
        fsync_options = FsyncOptions,
        header = Header,
        needs_commit = false,
        id_tree = IdTree,
        seq_tree = SeqTree,
        local_tree = LocalTree,
        compression = Compression
    },

    % If this is a new database we've just created a
    % new UUID and default security object which need
    % to be written to disk.
    case Header /= Header0 of
        true ->
            {ok, NewSt} = commit_data(St),
            NewSt;
        false ->
            St
    end.
%% Refreshes the header's btree root pointers from the current trees.
update_header(St, Header) ->
    couch_bt_engine_header:set(Header, [
        {seq_tree_state, couch_btree:get_state(St#st.seq_tree)},
        {id_tree_state, couch_btree:get_state(St#st.id_tree)},
        {local_tree_state, couch_btree:get_state(St#st.local_tree)}
    ]).


%% Bumps the header's update_seq by one.
increment_update_seq(#st{header = Header} = St) ->
    UpdateSeq = couch_bt_engine_header:get(Header, update_seq),
    St#st{
        header = couch_bt_engine_header:set(Header, [
            {update_seq, UpdateSeq + 1}
        ])
    }.


%% Ensures the header has a security pointer; when missing, appends
%% the default_security_object supplied via Options and stores its
%% location.
set_default_security_object(Fd, Header, Compression, Options) ->
    case couch_bt_engine_header:get(Header, security_ptr) of
        Pointer when is_integer(Pointer) ->
            Header;
        _ ->
            Default = couch_util:get_value(default_security_object, Options),
            AppendOpts = [{compression, Compression}],
            {ok, Ptr, _} = couch_file:append_term(Fd, Default, AppendOpts),
            couch_bt_engine_header:set(Header, security_ptr, Ptr)
    end.


%% Convenience wrapper that deletes compaction files using the
%% configured database_dir as root.
delete_compaction_files(FilePath) ->
    RootDir = config:get("couchdb", "database_dir", "."),
    DelOpts = [{context, delete}],
    delete_compaction_files(RootDir, FilePath, DelOpts).


%% Converts an on-disk rev tree (plain tuples) into in-memory #leaf{}
%% records, upgrading older tuple layouts that lacked size or
%% attachment information.
rev_tree(DiskTree) ->
    couch_key_tree:map(fun
        (_RevId, {Del, Ptr, Seq}) ->
            #leaf{
                deleted = ?i2b(Del),
                ptr = Ptr,
                seq = Seq
            };
        (_RevId, {Del, Ptr, Seq, Size}) ->
            #leaf{
                deleted = ?i2b(Del),
                ptr = Ptr,
                seq = Seq,
                sizes = couch_db_updater:upgrade_sizes(Size)
            };
        (_RevId, {Del, Ptr, Seq, Sizes, Atts}) ->
            #leaf{
                deleted = ?i2b(Del),
                ptr = Ptr,
                seq = Seq,
                sizes = couch_db_updater:upgrade_sizes(Sizes),
                atts = Atts
            };
        (_RevId, ?REV_MISSING) ->
            ?REV_MISSING
    end, DiskTree).


%% Converts an in-memory rev tree of #leaf{} records into the compact
%% tuple form stored on disk (inverse of rev_tree/1).
disk_tree(RevTree) ->
    couch_key_tree:map(fun
        (_RevId, ?REV_MISSING) ->
            ?REV_MISSING;
        (_RevId, #leaf{} = Leaf) ->
            #leaf{
                deleted = Del,
                ptr = Ptr,
                seq = Seq,
                sizes = Sizes,
                atts = Atts
            } = Leaf,
            {?b2i(Del), Ptr, Seq, split_sizes(Sizes), Atts}
    end, RevTree).


%% #size_info{} -> {Active, External} tuple for disk storage.
split_sizes(#size_info{}=SI) ->
    {SI#size_info.active, SI#size_info.external}.
%% {Active, External} disk tuple -> #size_info{} (inverse of
%% split_sizes/1).
join_sizes({Active, External}) when is_integer(Active), is_integer(External) ->
    #size_info{active=Active, external=External}.


%% Combines two size reductions. nil (from pre-1.2 reductions) is
%% contagious; non-record formats are upgraded before adding.
reduce_sizes(nil, _) ->
    nil;
reduce_sizes(_, nil) ->
    nil;
reduce_sizes(#size_info{}=S1, #size_info{}=S2) ->
    #size_info{
        active = S1#size_info.active + S2#size_info.active,
        external = S1#size_info.external + S2#size_info.external
    };
reduce_sizes(S1, S2) ->
    US1 = couch_db_updater:upgrade_sizes(S1),
    US2 = couch_db_updater:upgrade_sizes(S2),
    reduce_sizes(US1, US2).


%% Total active size: the reduced doc-body size plus the size of each
%% btree. Propagates null once any tree cannot report a size.
active_size(#st{} = St, #size_info{} = SI) ->
    Trees = [
        St#st.id_tree,
        St#st.seq_tree,
        St#st.local_tree
    ],
    lists:foldl(fun(T, Acc) ->
        case couch_btree:size(T) of
            _ when Acc == null ->
                null;
            nil ->
                null;
            Size ->
                Acc + Size
        end
    end, SI#size_info.active, Trees).


%% Shared implementation for fold_docs/4 and fold_local_docs/4.
%% Wraps the user fun so deleted docs are skipped (unless the
%% include_deleted option is present) and reductions are surfaced
%% only when include_reductions is requested.
fold_docs_int(St, Tree, UserFun, UserAcc, Options) ->
    Fun = case lists:member(include_deleted, Options) of
        true -> fun include_deleted/4;
        false -> fun skip_deleted/4
    end,
    RedFun = case lists:member(include_reductions, Options) of
        true -> fun include_reductions/4;
        false -> fun drop_reductions/4
    end,
    InAcc = {RedFun, {UserFun, UserAcc}},
    {ok, Reds, OutAcc} = couch_btree:fold(Tree, Fun, InAcc, Options),
    {_, {_, FinalUserAcc}} = OutAcc,
    case lists:member(include_reductions, Options) of
        true when Tree == St#st.id_tree ->
            {ok, fold_docs_reduce_to_count(Reds), FinalUserAcc};
        true when Tree == St#st.local_tree ->
            % The local tree has no doc-count reduction; report zero.
            {ok, 0, FinalUserAcc};
        false ->
            {ok, FinalUserAcc}
    end.


%% Fold wrapper that forwards every entry, deleted or not.
include_deleted(Case, Entry, Reds, {UserFun, UserAcc}) ->
    {Go, NewUserAcc} = UserFun(Case, Entry, Reds, UserAcc),
    {Go, {UserFun, NewUserAcc}}.


% First element of the reductions is the total
% number of undeleted documents.
%% Fold wrapper that skips deleted docs: subtrees whose reduction
%% shows zero live docs are skipped wholesale, and deleted leaves
%% are visited but not forwarded to the user fun.
skip_deleted(traverse, _Entry, {0, _, _} = _Reds, Acc) ->
    {skip, Acc};
skip_deleted(visit, #full_doc_info{deleted = true}, _, Acc) ->
    {ok, Acc};
skip_deleted(Case, Entry, Reds, {UserFun, UserAcc}) ->
    {Go, NewUserAcc} = UserFun(Case, Entry, Reds, UserAcc),
    {Go, {UserFun, NewUserAcc}}.


%% Reduction wrapper that passes reductions through to the user fun
%% on visit events.
include_reductions(visit, FDI, Reds, {UserFun, UserAcc}) ->
    {Go, NewUserAcc} = UserFun(FDI, Reds, UserAcc),
    {Go, {UserFun, NewUserAcc}};
include_reductions(_, _, _, Acc) ->
    {ok, Acc}.


%% Reduction wrapper that hides reductions from the user fun.
drop_reductions(visit, FDI, _Reds, {UserFun, UserAcc}) ->
    {Go, NewUserAcc} = UserFun(FDI, UserAcc),
    {Go, {UserFun, NewUserAcc}};
drop_reductions(_, _, _, Acc) ->
    {ok, Acc}.


%% Extracts the live-document count from an id-tree reduction.
fold_docs_reduce_to_count(Reds) ->
    RedFun = fun id_tree_reduce/2,
    FinalRed = couch_btree:final_reduce(RedFun, Reds),
    element(1, FinalRed).


%% Completes a compaction whose file has caught up: copies local docs
%% into the new file, stamps compacted_seq/revs_limit, then swaps the
%% files on disk in a crash-recoverable order.
finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
    #st{
        filepath = FilePath,
        local_tree = OldLocal
    } = OldSt,
    #st{
        filepath = CompactDataPath,
        header = Header,
        local_tree = NewLocal1
    } = NewSt1,

    % suck up all the local docs into memory and write them to the new db
    LoadFun = fun(Value, _Offset, Acc) ->
        {ok, [Value | Acc]}
    end,
    {ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []),
    {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),

    {ok, NewSt2} = commit_data(NewSt1#st{
        header = couch_bt_engine_header:set(Header, [
            {compacted_seq, get_update_seq(OldSt)},
            {revs_limit, get_revs_limit(OldSt)}
        ]),
        local_tree = NewLocal2
    }),

    % Rename our *.compact.data file to *.compact so that if we
    % die between deleting the old file and renaming *.compact
    % we can recover correctly.
    ok = file:rename(CompactDataPath, FilePath ++ ".compact"),

    % Remove the uncompacted database file
    RootDir = config:get("couchdb", "database_dir", "."),
    couch_file:delete(RootDir, FilePath),

    % Move our compacted file into its final location
    ok = file:rename(FilePath ++ ".compact", FilePath),

    % Delete the old meta compaction file after promoting
    % the compaction file.
    couch_file:delete(RootDir, FilePath ++ ".compact.meta"),

    % We're finished with our old state
    decref(OldSt),

    % And return our finished new state
    {ok, NewSt2#st{
        filepath = FilePath
    }, undefined}.