From 279a66ca079088e29ad78d1f72b2baabd6c40d53 Mon Sep 17 00:00:00 2001
From: "Paul J. Davis"
Date: Tue, 5 Sep 2017 18:04:54 -0500
Subject: ss - test suite passing

---
 src/couch/src/couch_db_updater.erl | 392 +++++++++++++++++++++----------------
 src/couch/src/couch_ehamt.erl      |  45 +++--
 2 files changed, 256 insertions(+), 181 deletions(-)

diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 905c0f440..6ea8a0f85 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -26,14 +26,16 @@
 -record(comp_st, {
     old_db,
     new_db,
-    new_id_tree,
-    meta_fd,
+    docid_fd,
+    docid_mod,
+    docid_st,
     retry
 }).
 
 -record(comp_header, {
+    vsn = 1,
     db_header,
-    meta_st
+    docid_st
 }).
 
 -record(merge_st, {
@@ -1071,11 +1073,11 @@ start_copy_compact(#db{}=Db) ->
 
     #comp_st{
         new_db = FinalNewDb,
-        meta_fd = MetaFd
+        docid_fd = DocIdFd
     } = FinalCompSt,
 
     close_db(FinalNewDb),
-    ok = couch_file:close(MetaFd),
+    ok = couch_file:close(DocIdFd),
 
     ?COMP_EVENT(before_notify),
     gen_server:cast(Db#db.main_pid, {compact_done, FinalNewDb#db.filepath}).
@@ -1088,57 +1090,57 @@ open_compaction_files(OldDb) ->
         options = Options,
         header = SrcHdr
     } = OldDb,
+
     DataFile = DbFilePath ++ ".compact.data",
-    MetaFile = DbFilePath ++ ".compact.meta",
+    DocIdFile = DbFilePath ++ ".compact.meta",
     {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
-    {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
+    {ok, DocIdFd, DocIdHdr} = open_compaction_file(DocIdFile),
     DataHdrIsDbHdr = couch_db_header:is_header(DataHdr),
-    CompSt = case {DataHdr, MetaHdr} of
+    InitCompSt = case {DataHdr, DocIdHdr} of
         {#comp_header{}=A, #comp_header{}=A} ->
             % We're restarting a compaction that did not finish
             % before trying to swap out with the original db
             DbHeader = A#comp_header.db_header,
-            Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
-            Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_st),
-            #comp_st{
+            DocIdSt = A#comp_header.docid_st,
+            NewDb = init_db(DbName, DataFile, DataFd, DbHeader, Options),
+            CompSt = #comp_st{
                 old_db = OldDb,
-                new_db = Db1,
-                new_id_tree = Db0#db.id_tree,
-                meta_fd = MetaFd,
-                retry = (couch_btree:size(Db0#db.id_tree) > 0)
-            };
+                new_db = NewDb,
+                docid_fd = DocIdFd,
+                docid_st = DocIdSt,
+                retry = check_is_retry_compaction(NewDb)
+            },
+            open_docids(CompSt);
         _ when DataHdrIsDbHdr ->
             % We tried to swap out the compaction but there were
             % writes to the database during compaction. Start
             % a compaction retry.
-            ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)),
-            Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
-            Db1 = bind_emsort(Db0, MetaFd, nil),
-            #comp_st{
+            ok = reset_compaction_file(DocIdFd, couch_db_header:from(SrcHdr)),
+            NewDb = init_db(DbName, DataFile, DataFd, DataHdr, Options),
+            CompSt = #comp_st{
                 old_db = OldDb,
-                new_db = Db1,
-                new_id_tree = Db0#db.id_tree,
-                meta_fd = MetaFd,
-                retry = true
-            };
+                new_db = NewDb,
+                docid_fd = DocIdFd,
+                retry = check_is_retry_compaction(NewDb)
+            },
+            open_docids(CompSt);
         _ ->
             % We're starting a compaction from scratch
             Header = couch_db_header:from(SrcHdr),
             ok = reset_compaction_file(DataFd, Header),
-            ok = reset_compaction_file(MetaFd, Header),
-            Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
-            Db1 = bind_emsort(Db0, MetaFd, nil),
-            #comp_st{
+            ok = reset_compaction_file(DocIdFd, Header),
+            NewDb = init_db(DbName, DataFile, DataFd, Header, Options),
+            CompSt = #comp_st{
                 old_db = OldDb,
-                new_db = Db1,
-                new_id_tree = Db0#db.id_tree,
-                meta_fd = MetaFd,
+                new_db = NewDb,
+                docid_fd = DocIdFd,
                 retry = false
-            }
+            },
+            open_docids(CompSt)
     end,
     unlink(DataFd),
-    erlang:monitor(process, MetaFd),
-    {ok, CompSt}.
+    erlang:monitor(process, DocIdFd),
+    {ok, InitCompSt}.
 
 
 open_compaction_file(FilePath) ->
@@ -1154,6 +1156,32 @@ open_compaction_file(FilePath) ->
     end.
 
 
+check_is_retry_compaction(Db) ->
+    {ok, Reds} = couch_btree:full_reduce(Db#db.id_tree),
+    element(1, Reds) + element(2, Reds) > 0.
+
+
+open_docids(#comp_st{} = CompSt) ->
+    #comp_st{
+        docid_fd = DocIdFd,
+        docid_st = DocIdSt,
+        retry = Retry
+    } = CompSt,
+    DocIdMod = case Retry of
+        true -> couch_emsort;
+        false -> couch_ehamt
+    end,
+    Opts = case DocIdSt of
+        undefined -> [];
+        _ -> [{root, DocIdSt}]
+    end,
+    {ok, NewSt} = DocIdMod:open(DocIdFd, Opts),
+    CompSt#comp_st{
+        docid_mod = DocIdMod,
+        docid_st = NewSt
+    }.
+
+
 reset_compaction_file(Fd, Header) ->
     ok = couch_file:truncate(Fd, 0),
     ok = couch_file:write_header(Fd, Header).
@@ -1184,52 +1212,34 @@ copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
     end.
 
 
+-record(comp_acc, {
+    comp_st,
+    fdis,
+    uncopied,
+    copied,
+    buffer_size,
+    checkpoint_after
+}).
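+
+%% Accumulator for the seq_tree fold in copy_compact/1: doc infos
+%% buffer in fdis until their combined ?term_size (uncopied) reaches
+%% buffer_size and copy_docs/2 flushes them; once copied bytes reach
+%% checkpoint_after, commit_compaction_data/1 writes a #comp_header{}
+%% so an interrupted compaction can resume where it left off.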
+
+
 copy_compact(#comp_st{} = CompSt) ->
     #comp_st{
-        old_db = Db,
-        new_db = NewDb0,
+        old_db = OldDb,
+        new_db = NewDb,
         retry = Retry
     } = CompSt,
 
-    Compression = couch_compress:get_compression_method(),
-    NewDb = NewDb0#db{compression=Compression},
-    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+    TotalChanges = couch_db:count_changes_since(OldDb, NewDb#db.update_seq),
     BufferSize = list_to_integer(
         config:get("database_compaction", "doc_buffer_size", "524288")),
     CheckpointAfter = couch_util:to_integer(
         config:get("database_compaction", "checkpoint_after",
            BufferSize * 10)),
 
-    EnumBySeqFun =
-    fun(DocInfo, _Offset,
-            {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
-
-        Seq = case DocInfo of
-            #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
-            #doc_info{} -> DocInfo#doc_info.high_seq
-        end,
-
-        AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
-        if AccUncopiedSize2 >= BufferSize ->
-            NewDb2 = copy_docs(
-                Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), CompSt),
-            AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
-            if AccCopiedSize2 >= CheckpointAfter ->
-                CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
-                {ok, {CommNewDb2, [], 0, 0}};
-            true ->
-                {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
-            end;
-        true ->
-            {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
-                AccCopiedSize}}
-        end
-    end,
-
     TaskProps0 = [
         {type, database_compaction},
         {phase, copy_docs},
         {retry, Retry},
-        {database, Db#db.name},
+        {database, OldDb#db.name},
         {progress, 0},
         {changes_done, 0},
         {total_changes, TotalChanges}
     ],
@@ -1249,32 +1259,112 @@ copy_compact(#comp_st{} = CompSt) ->
     end,
 
     ?COMP_EVENT(seq_init),
-    {ok, _, {NewDb2, Uncopied, _, _}} =
-        couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
-            {NewDb, [], 0, 0},
-            [{start_key, NewDb#db.update_seq + 1}]),
-    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), CompSt),
+    SeqTree = OldDb#db.seq_tree,
+    EnumBySeqFun = fun copy_enum_by_seq/3,
+    InitAcc = #comp_acc{
+        comp_st = CompSt,
+        fdis = [],
+        uncopied = 0,
+        copied = 0,
+        buffer_size = BufferSize,
+        checkpoint_after = CheckpointAfter
+    },
+    FoldOpts = [{start_key, NewDb#db.update_seq + 1}],
+    {ok, _, FinalAcc} =
+        couch_btree:foldl(SeqTree, EnumBySeqFun, InitAcc, FoldOpts),
+
+    #comp_acc{
+        comp_st = NewCompSt1,
+        fdis = FDIs
+    } = FinalAcc,
+    NewCompSt2 = copy_docs(NewCompSt1, lists:reverse(FDIs)),
+
+    #comp_st{
+        new_db = NewDb1
+    } = NewCompSt2,
+
     ?COMP_EVENT(seq_done),
 
     % copy misc header values
-    if NewDb3#db.security /= Db#db.security ->
+    if NewDb1#db.security /= OldDb#db.security ->
         {ok, Ptr, _} = couch_file:append_term(
-            NewDb3#db.fd, Db#db.security,
-            [{compression, NewDb3#db.compression}]),
-        NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
+            NewDb1#db.fd, OldDb#db.security,
+            [{compression, NewDb1#db.compression}]),
+        NewDb2 = NewDb1#db{security=OldDb#db.security, security_ptr=Ptr};
     true ->
-        NewDb4 = NewDb3
+        NewDb2 = NewDb1
     end,
 
-    CompSt#comp_st{
-        new_db = NewDb4#db{
-            update_seq = Db#db.update_seq
+    NewCompSt2#comp_st{
+        new_db = NewDb2#db{
+            update_seq = OldDb#db.update_seq
         }
     }.
 
 
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, CompSt) ->
+copy_enum_by_seq(DocInfo, _Offset, #comp_acc{} = AccIn) ->
+    #comp_acc{
+        comp_st = CompSt,
+        fdis = FDIs,
+        uncopied = Uncopied,
+        copied = Copied,
+        buffer_size = BufferSize,
+        checkpoint_after = CheckpointAfter
+    } = AccIn,
+
+    Seq = case DocInfo of
+        #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
+        #doc_info{} -> DocInfo#doc_info.high_seq
+    end,
+
+    NewUncopied = Uncopied + ?term_size(DocInfo),
+    NewAcc = case NewUncopied >= BufferSize of
+        true ->
+            ToFlush = lists:reverse(FDIs, [DocInfo]),
+            % copy_docs/2 updates new_db, so rebind it from the
+            % returned #comp_st{} instead of using a stale copy.
+            NewCompSt = copy_docs(CompSt, ToFlush),
+            #comp_st{new_db = FlushedDb} = NewCompSt,
+            case Copied + NewUncopied >= CheckpointAfter of
+                true ->
+                    TmpCompSt = NewCompSt#comp_st{
+                        new_db = FlushedDb#db{update_seq = Seq}
+                    },
+                    % Update AccIn so that buffer_size and
+                    % checkpoint_after carry over to the next call.
+                    AccIn#comp_acc{
+                        comp_st = commit_compaction_data(TmpCompSt),
+                        fdis = [],
+                        uncopied = 0,
+                        copied = 0
+                    };
+                false ->
+                    AccIn#comp_acc{
+                        comp_st = NewCompSt,
+                        fdis = [],
+                        uncopied = 0,
+                        copied = Copied + NewUncopied
+                    }
+            end;
+        false ->
+            AccIn#comp_acc{
+                fdis = [DocInfo | FDIs],
+                uncopied = NewUncopied
+            }
+    end,
+    {ok, NewAcc}.
+
+
+copy_docs(CompSt, MixedInfos) ->
+    #comp_st{
+        old_db = Db,
+        new_db = NewDb,
+        docid_mod = DocIdMod,
+        docid_st = DocIdSt
+    } = CompSt,
+    #db{
+        fd = DestFd
+    } = NewDb,
     DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
     LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
     % COUCHDB-968, make sure we prune duplicates during compaction
@@ -1329,7 +1419,7 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, CompSt) ->
     RemoveSeqs =
     case CompSt#comp_st.retry of
         true ->
-            NewDocIdTree = CompSt#comp_st.new_id_tree,
+            NewDocIdTree = NewDb#db.id_tree,
             % Compaction is being rerun to catch up to writes during the
             % first pass. This means we may have docs that already exist
             % in the seq_tree in the .data file. Here we look up any old
             % update_seqs so that they can be removed.
@@ -1344,24 +1434,15 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, CompSt) ->
     {ok, SeqTree} = couch_btree:add_remove(
             NewDb#db.seq_tree, NewInfos, RemoveSeqs),
 
-    NewIdEms = case CompSt#comp_st.retry of
-        false ->
-            % If we're not in a retry compaction then there's
-            % no reason for us to copy doc infos to the emsort
-            % structure. We can just re-use the id_tree in the
-            % old db record.
-            NewDb#db.id_tree;
-        true ->
-            % We have to store doc infos because of the possible
-            % changes in the update sequences.
-            FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
-                {{Id, Seq}, FDI}
-            end, NewInfos),
-            {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
-            IdEms
-    end,
+    FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
+        {{Id, Seq}, FDI}
+    end, NewInfos),
+    {ok, NewDocIdSt} = DocIdMod:add(DocIdSt, FDIKVs),
     update_compact_task(length(NewInfos)),
-    NewDb#db{id_tree=NewIdEms, seq_tree=SeqTree}.
+    CompSt#comp_st{
+        new_db = NewDb#db{seq_tree = SeqTree},
+        docid_st = NewDocIdSt
+    }.
 
 
 copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
@@ -1400,58 +1481,38 @@ copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
     {BodyData, NewBinInfos}.
 
 
-commit_compaction_data(#comp_st{new_db = Db} = CompSt) ->
+commit_compaction_data(#comp_st{} = CompSt) ->
     % Compaction needs to write headers to both the data file
-    % and the meta file so if we need to restart we can pick
+    % and the docid file so if we need to restart we can pick
     % back up from where we left off.
-    CompSt#comp_st{
-        new_db = commit_compaction_data(Db)
-    };
-
-commit_compaction_data(#db{} = Db) ->
-    commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)),
-    commit_compaction_data(Db, Db#db.fd).
-
-
-commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
-    % Mostly copied from commit_data/2 but I have to
-    % replace the logic to commit and fsync to a specific
-    % fd instead of the Filepath stuff that commit_data/2
-    % does.
-    DataState = couch_db_header:id_tree_state(OldHeader),
-    MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
-    MetaState = couch_emsort:get_state(Db0#db.id_tree),
-    Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
-    Header = db_to_header(Db1, OldHeader),
+    #comp_st{
+        new_db = NewDb,
+        docid_fd = DocIdFd,
+        docid_mod = DocIdMod,
+        docid_st = DocIdSt
+    } = CompSt,
+    #db{
+        header = OldHeader
+    } = NewDb,
+    NewHeader = db_to_header(NewDb, OldHeader),
     CompHeader = #comp_header{
-        db_header = Header,
-        meta_st = MetaState
-    },
-    ok = couch_file:sync(Fd),
-    ok = couch_file:write_header(Fd, CompHeader),
-    Db2 = Db1#db{
-        waiting_delayed_commit=nil,
-        header=Header,
-        committed_update_seq=Db1#db.update_seq
+        db_header = NewHeader,
+        docid_st = DocIdMod:get_state(DocIdSt)
     },
-    bind_emsort(Db2, MetaFd, MetaState).
-
-
-bind_emsort(Db, Fd, nil) ->
-    {ok, Ems} = couch_emsort:open(Fd),
-    Db#db{id_tree=Ems};
-bind_emsort(Db, Fd, State) ->
-    {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
-    Db#db{id_tree=Ems}.
+    sync_compaction_data(DocIdFd, CompHeader),
+    sync_compaction_data(NewDb#db.fd, CompHeader),
+    CompSt#comp_st{
+        new_db = NewDb#db{
+            waiting_delayed_commit = nil,
+            header = NewHeader,
+            committed_update_seq = NewDb#db.update_seq
+        }
+    }.
 
 
-bind_id_tree(Db, Fd, State) ->
-    {ok, IdBtree} = couch_btree:open(State, Fd, [
-        {split, fun ?MODULE:btree_by_id_split/1},
-        {join, fun ?MODULE:btree_by_id_join/2},
-        {reduce, fun ?MODULE:btree_by_id_reduce/2}
-    ]),
-    Db#db{id_tree=IdBtree}.
+sync_compaction_data(Fd, #comp_header{} = CompHeader) ->
+    ok = couch_file:sync(Fd),
+    ok = couch_file:write_header(Fd, CompHeader).
 
 
 copy_doc_ids(#comp_st{retry = false} = CompSt) ->
@@ -1465,7 +1526,12 @@ copy_doc_ids(#comp_st{retry = true} = CompSt) ->
     lists:foldl(fun(Stage, St) -> Stage(St) end, CompSt, Stages).
 
 
-copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+copy_from_tree(#comp_st{} = CompSt) ->
+    #comp_st{
+        old_db = OldDb,
+        new_db = NewDb,
+        docid_st = DocIdSt
+    } = CompSt,
     #db{
         fd = OldFd,
         header = OldHdr
@@ -1491,13 +1557,12 @@ copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
         {compression, Compression}
     ]),
 
-    NewIdTree = CompSt#comp_st.new_id_tree,
-    NewSeqTree = NewDb#db.seq_tree,
+    NewIdTree = NewDb#db.id_tree,
 
     EnumByIdFun = fun({DocId, UpdateSeq}, _, {DstIdTree, Batch, Count}) ->
         case Count >= 1000 of
             true ->
-                DstIdTree2 = flush_docid_batch(Batch, NewSeqTree, DstIdTree),
+                DstIdTree2 = flush_docid_batch(Batch, DocIdSt, DstIdTree),
                 update_compact_task(Count),
                 {ok, {DstIdTree2, [{DocId, UpdateSeq}], 1}};
             false ->
@@ -1508,7 +1573,7 @@ copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
     ?COMP_EVENT(id_init),
     {ok, _, {NewIdTree2, LastBatch, LastCount}} =
         couch_btree:foldl(OldIdTree, EnumByIdFun, {NewIdTree, [], 0}, []),
-    NewIdTree3 = flush_docid_batch(LastBatch, NewSeqTree, NewIdTree2),
+    NewIdTree3 = flush_docid_batch(LastBatch, DocIdSt, NewIdTree2),
     ?COMP_EVENT(id_done),
     update_compact_task(LastCount),
     CompSt#comp_st{
@@ -1519,12 +1584,12 @@
 
 flush_docid_batch([], _, IdTree) ->
     IdTree;
-flush_docid_batch(DocIds, SeqTree, IdTree) ->
-    Seqs = [Seq || {_Id, Seq} <- DocIds],
-    FDIs = lists:map(fun({ok, #full_doc_info{} = FDI}) ->
+flush_docid_batch(DocIds, EHamt, IdTree) ->
+    FDIs = lists:map(fun(DocIdSeq) ->
+        {ok, #full_doc_info{} = FDI} = couch_ehamt:lookup(EHamt, DocIdSeq),
         ?COMP_EVENT(id_copy),
         FDI
-    end, couch_btree:lookup(SeqTree, Seqs)),
+    end, DocIds),
     {ok, NewIdTree} = couch_btree:add(IdTree, FDIs),
     NewIdTree.
 
@@ -1535,29 +1600,20 @@ compact_id_join(Id, {HighSeq, _, _, _}) ->
     {Id, HighSeq}.
 
 
-sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
+sort_meta_data(#comp_st{docid_st = DocIdSt} = CompSt) ->
     ?COMP_EVENT(md_sort_init),
-    {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
+    {ok, NewSt} = couch_emsort:merge(DocIdSt),
     ?COMP_EVENT(md_sort_done),
     CompSt#comp_st{
-        new_db = Db0#db{
-            id_tree = Ems
-        }
+        docid_st = NewSt
     }.
 
 
 copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
     #db{
-        fd = Fd,
-        header = Header
+        id_tree = IdTree0
     } = Db,
-    Src = Db#db.id_tree,
-    DstState = couch_db_header:id_tree_state(Header),
-    {ok, IdTree0} = couch_btree:open(DstState, Fd, [
-        {split, fun ?MODULE:btree_by_id_split/1},
-        {join, fun ?MODULE:btree_by_id_join/2},
-        {reduce, fun ?MODULE:btree_by_id_reduce/2}
-    ]),
+    Src = CompSt#comp_st.docid_st,
     {ok, Iter} = couch_emsort:iter(Src),
     Acc0 = #merge_st{
         id_tree=IdTree0,
diff --git a/src/couch/src/couch_ehamt.erl b/src/couch/src/couch_ehamt.erl
index cfa8bc724..07e2475ae 100644
--- a/src/couch/src/couch_ehamt.erl
+++ b/src/couch/src/couch_ehamt.erl
@@ -50,6 +50,8 @@
     get_fd/1,
     get_state/1,
 
+    add/2,
+
     insert/2,
     lookup/2
 ]).
@@ -84,6 +86,13 @@ get_state(#ehamt{fd = Fd, root = Root}) ->
     flush_nodes(Fd, Root).
 
 
+add(EHamt, KVs) ->
+    NewSt = lists:foldl(fun(KV, Acc) ->
+        insert(Acc, KV)
+    end, EHamt, KVs),
+    {ok, NewSt}.
+
+
 insert(EHamt, {K, V}) ->
     #ehamt{
         fd = Fd,
@@ -91,19 +100,26 @@ insert(EHamt, {K, V}) ->
     } = EHamt,
     HId = hash(K),
     {ok, DiskLoc, _} = couch_file:append_term(Fd, {K, V}),
-    insert(Fd, read_node(Fd, Root), HId, DiskLoc, 0).
+    NewRoot = insert(Fd, read_node(Fd, Root), HId, DiskLoc, 0),
+    EHamt#ehamt{
+        root = NewRoot
+    }.
 
 
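+%% insert/5 walks the tree: an integer node is a disk offset and is
+%% paged in before descending. The hash is consumed in 5-bit slices
+%% and get_header_index/2 only accepts Depth < 5, so at depth 5
+%% colliding entries simply pile up in one item's DiskLocs list.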
+insert(Fd, NodeDiskLoc, HId, DiskLoc, Depth) when is_integer(NodeDiskLoc) ->
+    {ok, Node} = couch_file:pread_term(Fd, NodeDiskLoc),
+    insert(Fd, Node, HId, DiskLoc, Depth);
+
-insert(_Fd, {item, HId, DiskLocs}, HId, DiskLoc, _Depth) ->
-    {item, HId, [DiskLoc | DiskLocs]};
+insert(_Fd, {item, HId1, DiskLocs}, HId2, DiskLoc, Depth)
+        when HId1 == HId2 orelse Depth >= 5 ->
+    {item, HId1, [DiskLoc | DiskLocs]};
 
 insert(Fd, {item, HId1, DiskLocs}, HId2, DiskLoc, Depth) ->
     % Transform this item into a new node and then
     % continue our insertion algorithm. We do this
     % because we're not guaranteed that the hash ids
     % are different at the current depth.
-    Map = get_header_index(HId1, Depth),
-    Children = [{item, HId1, DiskLocs}],
+    Map = 1 bsl get_header_index(HId1, Depth),
+    Children = {{item, HId1, DiskLocs}},
     Node = {node, Map, Children},
     insert(Fd, Node, HId2, DiskLoc, Depth);
 
@@ -112,17 +128,17 @@ insert(Fd, {node, Map, Children}, HId, DiskLoc, Depth) ->
     case is_set(Map, Idx) of
         true ->
             Pos = get_header_position(Map, Idx),
-            Child = erlang:element(Pos + 1, Children),
+            Child = erlang:element(Pos, Children),
             NewChild0 = insert(Fd, Child, HId, DiskLoc, Depth + 1),
             NewChild1 = write_node(Fd, NewChild0, Depth + 1),
-            NewChildren = erlang:setelement(Pos + 1, Children, NewChild1),
+            NewChildren = erlang:setelement(Pos, Children, NewChild1),
             {node, Map, NewChildren};
         false ->
             Pos = get_header_position(Map, Idx),
             NewChild0 = {item, HId, [DiskLoc]},
             NewChild1 = write_node(Fd, NewChild0, Depth + 1),
-            NewMap = Map bor Idx,
-            NewChildren = erlang:insert_element(Pos + 1, Children, NewChild1),
+            NewMap = Map bor (1 bsl Idx),
+            NewChildren = erlang:insert_element(Pos, Children, NewChild1),
             {node, NewMap, NewChildren}
     end.
 
@@ -136,15 +152,17 @@ lookup(EHamt, Key) ->
     lookup(Fd, read_node(Fd, Root), HId, Key, 0).
 
 
-lookup(_Fd, {item, HId, []}, HId, _, _) ->
+lookup(_Fd, {item, HId1, []}, HId2, _, Depth)
+        when HId1 == HId2 orelse Depth >= 5 ->
     not_found;
 
-lookup(Fd, {item, HId, [DiskLoc | RestLocs]}, HId, Key, Depth) ->
+lookup(Fd, {item, HId1, [DiskLoc | RestLocs]}, HId2, Key, Depth)
+        when HId1 == HId2 orelse Depth >= 5 ->
     case couch_file:pread_term(Fd, DiskLoc) of
         {ok, {Key, Value}} ->
             {ok, Value};
         {ok, _} ->
-            lookup(Fd, {item, HId, RestLocs}, HId, Key, Depth)
+            lookup(Fd, {item, HId1, RestLocs}, HId2, Key, Depth)
     end;
 
 lookup(_Fd, {item, HId1, _}, HId2, _, _) when HId1 /= HId2 ->
@@ -214,7 +232,8 @@ get_header_index(HashId, Depth) when Depth >= 0, Depth < 5->
 
 
 is_set(Header, Idx) when Idx >= 0, Idx =< 31 ->
-    Header band (1 bsl Idx).
+    Mask = 1 bsl Idx,
+    Header band Mask == Mask.
 
 
 get_header_position(Header, Idx) ->
-- 
cgit v1.2.1
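
Note on the bitmap arithmetic above (reviewer sketch, not part of the
patch): the switch to `Map bor (1 bsl Idx)` when setting bits, to a
boolean-returning is_set/2, and from `Pos + 1` to `Pos` indexing all
follow the usual HAMT layout, where each node keeps a 32-bit bitmap and
the child for slice Idx sits at one plus the popcount of the bits below
Idx. The body of get_header_position/2 falls outside the last hunk, so
the standalone sketch below is an assumption about its behavior, with
hypothetical module and function names:

    %% Hypothetical, self-contained mirror of couch_ehamt's bitmap
    %% helpers; popcount/1 is spelled out recursively for clarity.
    -module(hamt_bitmap_sketch).
    -export([is_set/2, position/2, set_bit/2]).

    %% A bit is set iff the node has a child for that 5-bit hash slice.
    is_set(Bitmap, Idx) when Idx >= 0, Idx =< 31 ->
        Mask = 1 bsl Idx,
        Bitmap band Mask == Mask.

    %% 1-based tuple position of the child for Idx: one plus the number
    %% of set bits strictly below Idx. With this convention,
    %% erlang:element(Pos, Children) reads an existing child and
    %% erlang:insert_element(Pos, Children, C) slots in a new one, which
    %% is consistent with the patch dropping the old Pos + 1 offset.
    position(Bitmap, Idx) ->
        popcount(Bitmap band ((1 bsl Idx) - 1)) + 1.

    %% Mark a slice as occupied; this is the Map bor (1 bsl Idx) fix.
    set_bit(Bitmap, Idx) ->
        Bitmap bor (1 bsl Idx).

    popcount(0) -> 0;
    popcount(N) -> (N band 1) + popcount(N bsr 1).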