author     Paul J. Davis <paul.joseph.davis@gmail.com>  2017-09-05 18:04:54 -0500
committer  Paul J. Davis <paul.joseph.davis@gmail.com>  2017-09-05 18:04:54 -0500
commit     279a66ca079088e29ad78d1f72b2baabd6c40d53 (patch)
tree       ab2edb895d0b60da371b665f96dd614d12345487
parent     ae5ff1bb6ca221e20df30ac8c1dd67aed9d75a34 (diff)
download   couchdb-compactor-id-tree-optimization.tar.gz

ss - test suite passing (branch: compactor-id-tree-optimization)
-rw-r--r--  src/couch/src/couch_db_updater.erl  392
-rw-r--r--  src/couch/src/couch_ehamt.erl        45
2 files changed, 256 insertions, 181 deletions
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 905c0f440..6ea8a0f85 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -26,14 +26,16 @@
-record(comp_st, {
old_db,
new_db,
- new_id_tree,
- meta_fd,
+ docid_fd,
+ docid_mod,
+ docid_st,
retry
}).
-record(comp_header, {
+ vsn = 1,
db_header,
- meta_st
+ docid_st
}).
-record(merge_st, {
@@ -1071,11 +1073,11 @@ start_copy_compact(#db{}=Db) ->
#comp_st{
new_db = FinalNewDb,
- meta_fd = MetaFd
+ docid_fd = DocIdFd
} = FinalCompSt,
close_db(FinalNewDb),
- ok = couch_file:close(MetaFd),
+ ok = couch_file:close(DocIdFd),
?COMP_EVENT(before_notify),
gen_server:cast(Db#db.main_pid, {compact_done, FinalNewDb#db.filepath}).
@@ -1088,57 +1090,57 @@ open_compaction_files(OldDb) ->
options = Options,
header = SrcHdr
} = OldDb,
+
DataFile = DbFilePath ++ ".compact.data",
- MetaFile = DbFilePath ++ ".compact.meta",
+ DocIdFile = DbFilePath ++ ".compact.meta",
{ok, DataFd, DataHdr} = open_compaction_file(DataFile),
- {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
+ {ok, DocIdFd, DocIdHdr} = open_compaction_file(DocIdFile),
DataHdrIsDbHdr = couch_db_header:is_header(DataHdr),
- CompSt = case {DataHdr, MetaHdr} of
+ InitCompSt = case {DataHdr, DocIdHdr} of
{#comp_header{}=A, #comp_header{}=A} ->
% We're restarting a compaction that did not finish
% before trying to swap out with the original db
DbHeader = A#comp_header.db_header,
- Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
- Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_st),
- #comp_st{
+ DocIdSt = A#comp_header.docid_st,
+ NewDb = init_db(DbName, DataFile, DataFd, DbHeader, Options),
+ CompSt = #comp_st{
old_db = OldDb,
- new_db = Db1,
- new_id_tree = Db0#db.id_tree,
- meta_fd = MetaFd,
- retry = (couch_btree:size(Db0#db.id_tree) > 0)
- };
+ new_db = NewDb,
+ docid_fd = DocIdFd,
+ docid_st = DocIdSt,
+ retry = check_is_retry_compaction(NewDb)
+ },
+ open_docids(CompSt);
_ when DataHdrIsDbHdr ->
% We tried to swap out the compaction but there were
% writes to the database during compaction. Start
% a compaction retry.
- ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)),
- Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
- Db1 = bind_emsort(Db0, MetaFd, nil),
- #comp_st{
+ ok = reset_compaction_file(DocIdFd, couch_db_header:from(SrcHdr)),
+ NewDb = init_db(DbName, DataFile, DataFd, DataHdr, Options),
+ CompSt = #comp_st{
old_db = OldDb,
- new_db = Db1,
- new_id_tree = Db0#db.id_tree,
- meta_fd = MetaFd,
- retry = true
- };
+ new_db = NewDb,
+ docid_fd = DocIdFd,
+ retry = check_is_retry_compaction(NewDb)
+ },
+ open_docids(CompSt);
_ ->
% We're starting a compaction from scratch
Header = couch_db_header:from(SrcHdr),
ok = reset_compaction_file(DataFd, Header),
- ok = reset_compaction_file(MetaFd, Header),
- Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
- Db1 = bind_emsort(Db0, MetaFd, nil),
- #comp_st{
+ ok = reset_compaction_file(DocIdFd, Header),
+ NewDb = init_db(DbName, DataFile, DataFd, Header, Options),
+ CompSt = #comp_st{
old_db = OldDb,
- new_db = Db1,
- new_id_tree = Db0#db.id_tree,
- meta_fd = MetaFd,
+ new_db = NewDb,
+ docid_fd = DocIdFd,
retry = false
- }
+ },
+ open_docids(CompSt)
end,
unlink(DataFd),
- erlang:monitor(process, MetaFd),
- {ok, CompSt}.
+ erlang:monitor(process, DocIdFd),
+ {ok, InitCompSt}.
open_compaction_file(FilePath) ->
@@ -1154,6 +1156,32 @@ open_compaction_file(FilePath) ->
end.
+check_is_retry_compaction(Db) ->
+ {ok, Reds} = couch_btree:full_reduce(Db#db.id_tree),
+ element(1, Reds) + element(2, Reds) > 0.
+
+
+open_docids(#comp_st{} = CompSt) ->
+ #comp_st{
+ docid_fd = DocIdFd,
+ docid_st = DocIdSt,
+ retry = Retry
+ } = CompSt,
+ DocIdMod = case Retry of
+ true -> couch_emsort;
+ false -> couch_ehamt
+ end,
+ Opts = case DocIdSt of
+ undefined -> [];
+ _ -> [{root, DocIdSt}]
+ end,
+ {ok, NewSt} = DocIdMod:open(DocIdFd, Opts),
+ CompSt#comp_st{
+ docid_mod = DocIdMod,
+ docid_st = NewSt
+ }.
+
+
reset_compaction_file(Fd, Header) ->
ok = couch_file:truncate(Fd, 0),
ok = couch_file:write_header(Fd, Header).
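
The compactor drives both docid backends through the same three calls: open_docids/1 above selects couch_emsort for a retry pass and couch_ehamt for a fresh one, and everything downstream goes through DocIdMod. A behaviour-style sketch of that shared contract (the behaviour module itself is hypothetical; only the three callbacks are implied by this diff):

%% Hypothetical behaviour capturing the calls made via DocIdMod in this
%% patch (open_docids/1, copy_docs/2, commit_compaction_data/1).
-module(couch_docid_backend).

-callback open(Fd :: pid(), Options :: [{root, term()}]) ->
    {ok, St :: term()}.
-callback add(St :: term(), KVs :: [{Key :: term(), Value :: term()}]) ->
    {ok, NewSt :: term()}.
-callback get_state(St :: term()) -> RootState :: term().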
@@ -1184,52 +1212,34 @@ copy_purge_info(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
end.
+-record(comp_acc, {
+ comp_st,
+ fdis,
+ uncopied,
+ copied,
+ buffer_size,
+ checkpoint_after
+}).
+
+
copy_compact(#comp_st{} = CompSt) ->
#comp_st{
- old_db = Db,
- new_db = NewDb0,
+ old_db = OldDb,
+ new_db = NewDb,
retry = Retry
} = CompSt,
- Compression = couch_compress:get_compression_method(),
- NewDb = NewDb0#db{compression=Compression},
- TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
+ TotalChanges = couch_db:count_changes_since(OldDb, NewDb#db.update_seq),
BufferSize = list_to_integer(
config:get("database_compaction", "doc_buffer_size", "524288")),
CheckpointAfter = couch_util:to_integer(
config:get("database_compaction", "checkpoint_after",
BufferSize * 10)),
- EnumBySeqFun =
- fun(DocInfo, _Offset,
- {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
-
- Seq = case DocInfo of
- #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
- #doc_info{} -> DocInfo#doc_info.high_seq
- end,
-
- AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
- if AccUncopiedSize2 >= BufferSize ->
- NewDb2 = copy_docs(
- Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), CompSt),
- AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
- if AccCopiedSize2 >= CheckpointAfter ->
- CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
- {ok, {CommNewDb2, [], 0, 0}};
- true ->
- {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
- end;
- true ->
- {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
- AccCopiedSize}}
- end
- end,
-
TaskProps0 = [
{type, database_compaction},
{phase, copy_docs},
{retry, Retry},
- {database, Db#db.name},
+ {database, OldDb#db.name},
{progress, 0},
{changes_done, 0},
{total_changes, TotalChanges}
@@ -1249,32 +1259,112 @@ copy_compact(#comp_st{} = CompSt) ->
end,
?COMP_EVENT(seq_init),
- {ok, _, {NewDb2, Uncopied, _, _}} =
- couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
- {NewDb, [], 0, 0},
- [{start_key, NewDb#db.update_seq + 1}]),
- NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), CompSt),
+ SeqTree = OldDb#db.seq_tree,
+ EnumBySeqFun = fun copy_enum_by_seq/3,
+ InitAcc = #comp_acc{
+ comp_st = CompSt,
+ fdis = [],
+ uncopied = 0,
+ copied = 0,
+ buffer_size = BufferSize,
+ checkpoint_after = CheckpointAfter
+ },
+ FoldOpts = [{start_key, NewDb#db.update_seq + 1}],
+ {ok, _, FinalAcc} =
+ couch_btree:foldl(SeqTree, EnumBySeqFun, InitAcc, FoldOpts),
+
+ #comp_acc{
+ comp_st = NewCompSt1,
+ fdis = FDIs
+ } = FinalAcc,
+ NewCompSt2 = copy_docs(NewCompSt1, lists:reverse(FDIs)),
+
+ #comp_st{
+ new_db = NewDb1
+ } = NewCompSt2,
+
?COMP_EVENT(seq_done),
% copy misc header values
- if NewDb3#db.security /= Db#db.security ->
+ if NewDb1#db.security /= OldDb#db.security ->
{ok, Ptr, _} = couch_file:append_term(
- NewDb3#db.fd, Db#db.security,
- [{compression, NewDb3#db.compression}]),
- NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
+ NewDb1#db.fd, OldDb#db.security,
+ [{compression, NewDb1#db.compression}]),
+ NewDb2 = NewDb1#db{security=OldDb#db.security, security_ptr=Ptr};
true ->
- NewDb4 = NewDb3
+ NewDb2 = NewDb1
end,
- CompSt#comp_st{
- new_db = NewDb4#db{
- update_seq = Db#db.update_seq
+ NewCompSt2#comp_st{
+ new_db = NewDb2#db{
+ update_seq = OldDb#db.update_seq
}
}.
-copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, CompSt) ->
+copy_enum_by_seq(DocInfo, _Offset, #comp_acc{} = AccIn) ->
+ #comp_acc{
+ comp_st = CompSt,
+ fdis = FDIs,
+ uncopied = Uncopied,
+ copied = Copied,
+ buffer_size = BufferSize,
+ checkpoint_after = CheckpointAfter
+ } = AccIn,
+
+ Seq = case DocInfo of
+ #full_doc_info{} -> DocInfo#full_doc_info.update_seq;
+ #doc_info{} -> DocInfo#doc_info.high_seq
+ end,
+
+ NewUncopied = Uncopied + ?term_size(DocInfo),
+ NewAcc = case NewUncopied >= BufferSize of
+ true ->
+ ToFlush = lists:reverse(FDIs, [DocInfo]),
+ NewCompSt = copy_docs(CompSt, ToFlush),
+ %% Rebind the db returned by copy_docs/2 and reuse AccIn so that
+ %% buffer_size and checkpoint_after carry over to the next fold step.
+ #comp_st{new_db = FlushedDb} = NewCompSt,
+ case Copied + NewUncopied >= CheckpointAfter of
+ true ->
+ TmpCompSt = NewCompSt#comp_st{
+ new_db = FlushedDb#db{update_seq = Seq}
+ },
+ AccIn#comp_acc{
+ comp_st = commit_compaction_data(TmpCompSt),
+ fdis = [],
+ uncopied = 0,
+ copied = 0
+ };
+ false ->
+ AccIn#comp_acc{
+ comp_st = NewCompSt,
+ fdis = [],
+ uncopied = 0,
+ copied = Copied + NewUncopied
+ }
+ end;
+ false ->
+ AccIn#comp_acc{
+ fdis = [DocInfo | FDIs],
+ uncopied = NewUncopied
+ }
+ end,
+ {ok, NewAcc}.
+
+
+copy_docs(CompSt, MixedInfos) ->
+ #comp_st{
+ old_db = Db,
+ new_db = NewDb,
+ docid_mod = DocIdMod,
+ docid_st = DocIdSt
+ } = CompSt,
+ #db{
+ fd = DestFd
+ } = NewDb,
DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
% COUCHDB-968, make sure we prune duplicates during compaction
@@ -1329,7 +1419,7 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, CompSt) ->
RemoveSeqs =
case CompSt#comp_st.retry of
true ->
- NewDocIdTree = CompSt#comp_st.new_id_tree,
+ NewDocIdTree = NewDb#db.id_tree,
% Compaction is being rerun to catch up to writes during the
% first pass. This means we may have docs that already exist
% in the seq_tree in the .data file. Here we lookup any old
@@ -1344,24 +1434,15 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, CompSt) ->
{ok, SeqTree} = couch_btree:add_remove(
NewDb#db.seq_tree, NewInfos, RemoveSeqs),
- NewIdEms = case CompSt#comp_st.retry of
- false ->
- % If we're not in a retry compaction then there's
- % no reason for us to copy doc infos to the emsort
- % structure. We can just re-use the id_tree in the
- % old db record.
- NewDb#db.id_tree;
- true ->
- % We have to store doc infos because of the possible
- % changes in the update sequences.
- FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
- {{Id, Seq}, FDI}
- end, NewInfos),
- {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
- IdEms
- end,
+ FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
+ {{Id, Seq}, FDI}
+ end, NewInfos),
+ {ok, NewDocIdSt} = DocIdMod:add(DocIdSt, FDIKVs),
update_compact_task(length(NewInfos)),
- NewDb#db{id_tree=NewIdEms, seq_tree=SeqTree}.
+ CompSt#comp_st{
+ new_db = NewDb#db{seq_tree = SeqTree},
+ docid_st = NewDocIdSt
+ }.
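
For reference, the FDIKVs handed to DocIdMod:add/2 above key each #full_doc_info{} by {Id, Seq} rather than by id alone: the emsort (retry) path merge-sorts on the composite key, while the ehamt path hashes it for the exact-match lookups in flush_docid_batch/3. A sketch with illustrative values:

%% Illustrative only; FDIa/FDIb stand in for #full_doc_info{} records.
FDIKVs = [
    {{<<"doc-a">>, 12}, FDIa},
    {{<<"doc-b">>, 9}, FDIb}
],
{ok, DocIdSt1} = DocIdMod:add(DocIdSt0, FDIKVs).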
copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
@@ -1400,58 +1481,38 @@ copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
{BodyData, NewBinInfos}.
-commit_compaction_data(#comp_st{new_db = Db} = CompSt) ->
+commit_compaction_data(#comp_st{} = CompSt) ->
% Compaction needs to write headers to both the data file
- % and the meta file so if we need to restart we can pick
+ % and the docid file so if we need to restart we can pick
% back up from where we left off.
- CompSt#comp_st{
- new_db = commit_compaction_data(Db)
- };
-
-commit_compaction_data(#db{} = Db) ->
- commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)),
- commit_compaction_data(Db, Db#db.fd).
-
-
-commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
- % Mostly copied from commit_data/2 but I have to
- % replace the logic to commit and fsync to a specific
- % fd instead of the Filepath stuff that commit_data/2
- % does.
- DataState = couch_db_header:id_tree_state(OldHeader),
- MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
- MetaState = couch_emsort:get_state(Db0#db.id_tree),
- Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
- Header = db_to_header(Db1, OldHeader),
+ #comp_st{
+ new_db = NewDb,
+ docid_fd = DocIdFd,
+ docid_mod = DocIdMod,
+ docid_st = DocIdSt
+ } = CompSt,
+ #db{
+ header = OldHeader
+ } = NewDb,
+ NewHeader = db_to_header(NewDb, OldHeader),
CompHeader = #comp_header{
- db_header = Header,
- meta_st = MetaState
- },
- ok = couch_file:sync(Fd),
- ok = couch_file:write_header(Fd, CompHeader),
- Db2 = Db1#db{
- waiting_delayed_commit=nil,
- header=Header,
- committed_update_seq=Db1#db.update_seq
+ db_header = NewHeader,
+ docid_st = DocIdMod:get_state(DocIdSt)
},
- bind_emsort(Db2, MetaFd, MetaState).
-
-
-bind_emsort(Db, Fd, nil) ->
- {ok, Ems} = couch_emsort:open(Fd),
- Db#db{id_tree=Ems};
-bind_emsort(Db, Fd, State) ->
- {ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
- Db#db{id_tree=Ems}.
+ sync_compaction_data(DocIdFd, CompHeader),
+ sync_compaction_data(NewDb#db.fd, CompHeader),
+ CompSt#comp_st{
+ new_db = NewDb#db{
+ waiting_delayed_commit = nil,
+ header = NewHeader,
+ committed_update_seq = NewDb#db.update_seq
+ }
+ }.
-bind_id_tree(Db, Fd, State) ->
- {ok, IdBtree} = couch_btree:open(State, Fd, [
- {split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2}
- ]),
- Db#db{id_tree=IdBtree}.
+sync_compaction_data(Fd, #comp_header{} = CompHeader) ->
+ ok = couch_file:sync(Fd),
+ ok = couch_file:write_header(Fd, CompHeader).
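
Syncing the identical #comp_header{} to both fds is what the matching-header clause in open_compaction_files/1 keys on. A sketch of the restart-side check, assuming couch_file:read_header/1 plus hypothetical resume_from/2 and start_from_scratch stand-ins:

%% The two headers compare equal only if the last pair of
%% sync_compaction_data/2 calls completed; anything else starts over.
{ok, DataHdr} = couch_file:read_header(DataFd),
{ok, DocIdHdr} = couch_file:read_header(DocIdFd),
case {DataHdr, DocIdHdr} of
    {#comp_header{} = Same, #comp_header{} = Same} ->
        resume_from(Same#comp_header.db_header, Same#comp_header.docid_st);
    _ ->
        start_from_scratch
end.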
copy_doc_ids(#comp_st{retry = false} = CompSt) ->
@@ -1465,7 +1526,12 @@ copy_doc_ids(#comp_st{retry = true} = CompSt) ->
lists:foldl(fun(Stage, St) -> Stage(St) end, CompSt, Stages).
-copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
+copy_from_tree(#comp_st{} = CompSt) ->
+ #comp_st{
+ old_db = OldDb,
+ new_db = NewDb,
+ docid_st = DocIdSt
+ } = CompSt,
#db{
fd = OldFd,
header = OldHdr
@@ -1491,13 +1557,12 @@ copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
{compression, Compression}
]),
- NewIdTree = CompSt#comp_st.new_id_tree,
- NewSeqTree = NewDb#db.seq_tree,
+ NewIdTree = NewDb#db.id_tree,
EnumByIdFun = fun({DocId, UpdateSeq}, _, {DstIdTree, Batch, Count}) ->
case Count >= 1000 of
true ->
- DstIdTree2 = flush_docid_batch(Batch, NewSeqTree, DstIdTree),
+ DstIdTree2 = flush_docid_batch(Batch, DocIdSt, DstIdTree),
update_compact_task(Count),
{ok, {DstIdTree2, [{DocId, UpdateSeq}], 1}};
false ->
@@ -1508,7 +1573,7 @@ copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
?COMP_EVENT(id_init),
{ok, _, {NewIdTree2, LastBatch, LastCount}} =
couch_btree:foldl(OldIdTree, EnumByIdFun, {NewIdTree, [], 0}, []),
- NewIdTree3 = flush_docid_batch(LastBatch, NewSeqTree, NewIdTree2),
+ NewIdTree3 = flush_docid_batch(LastBatch, DocIdSt, NewIdTree2),
?COMP_EVENT(id_done),
update_compact_task(LastCount),
CompSt#comp_st{
@@ -1519,12 +1584,12 @@ copy_from_tree(#comp_st{old_db = OldDb, new_db = NewDb} = CompSt) ->
flush_docid_batch([], _, IdTree) ->
IdTree;
-flush_docid_batch(DocIds, SeqTree, IdTree) ->
- Seqs = [Seq || {_Id, Seq} <- DocIds],
- FDIs = lists:map(fun({ok, #full_doc_info{} = FDI}) ->
+flush_docid_batch(DocIds, EHamt, IdTree) ->
+ FDIs = lists:map(fun(DocIdSeq) ->
+ {ok, #full_doc_info{} = FDI} = couch_ehamt:lookup(EHamt, DocIdSeq),
?COMP_EVENT(id_copy),
FDI
- end, couch_btree:lookup(SeqTree, Seqs)),
+ end, DocIds),
{ok, NewIdTree} = couch_btree:add(IdTree, FDIs),
NewIdTree.
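
The couch_ehamt:lookup/2 call above is the read side of the DocIdMod:add/2 write in copy_docs/2. A round-trip sketch with illustrative values:

%% Illustrative only; FDI stands in for a #full_doc_info{} record, and
%% the second match asserts the round trip through the ehamt.
{ok, St1} = couch_ehamt:add(St0, [{{<<"doc-a">>, 12}, FDI}]),
{ok, FDI} = couch_ehamt:lookup(St1, {<<"doc-a">>, 12}).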
@@ -1535,29 +1600,20 @@ compact_id_join(Id, {HighSeq, _, _, _}) ->
{Id, HighSeq}.
-sort_meta_data(#comp_st{new_db = Db0} = CompSt) ->
+sort_meta_data(#comp_st{docid_st = DocIdSt} = CompSt) ->
?COMP_EVENT(md_sort_init),
- {ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
+ {ok, NewSt} = couch_emsort:merge(DocIdSt),
?COMP_EVENT(md_sort_done),
CompSt#comp_st{
- new_db = Db0#db{
- id_tree = Ems
- }
+ docid_st = NewSt
}.
copy_meta_data(#comp_st{new_db = Db} = CompSt) ->
#db{
- fd = Fd,
- header = Header
+ id_tree = IdTree0
} = Db,
- Src = Db#db.id_tree,
- DstState = couch_db_header:id_tree_state(Header),
- {ok, IdTree0} = couch_btree:open(DstState, Fd, [
- {split, fun ?MODULE:btree_by_id_split/1},
- {join, fun ?MODULE:btree_by_id_join/2},
- {reduce, fun ?MODULE:btree_by_id_reduce/2}
- ]),
+ Src = CompSt#comp_st.docid_st,
{ok, Iter} = couch_emsort:iter(Src),
Acc0 = #merge_st{
id_tree=IdTree0,
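
Before the couch_ehamt hunks, the term shapes its insert/lookup clauses operate on, as implied by the code below (illustrative values; the hash is consumed 5 bits per level, bottoming out at depth 5):

%% An item holds a key hash plus the disk locations of appended
%% {Key, Value} terms, newest first; a node holds a 32-slot bitmap plus
%% a tuple with one child per set bit, in bit order.
ItemA = {item, 16#00, [512]},               % hash indexes to slot 0
ItemB = {item, 16#03, [1024, 768]},         % hash indexes to slot 3
Node  = {node, 2#1001, {ItemA, ItemB}}.     % bits 0 and 3 set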
diff --git a/src/couch/src/couch_ehamt.erl b/src/couch/src/couch_ehamt.erl
index cfa8bc724..07e2475ae 100644
--- a/src/couch/src/couch_ehamt.erl
+++ b/src/couch/src/couch_ehamt.erl
@@ -50,6 +50,8 @@
get_fd/1,
get_state/1,
+ add/2,
+
insert/2,
lookup/2
]).
@@ -84,6 +86,13 @@ get_state(#ehamt{fd = Fd, root = Root}) ->
flush_nodes(Fd, Root).
+add(EHamt, KVs) ->
+ NewSt = lists:foldl(fun(KV, Acc) ->
+ insert(Acc, KV)
+ end, EHamt, KVs),
+ {ok, NewSt}.
+
+
insert(EHamt, {K, V}) ->
#ehamt{
fd = Fd,
@@ -91,19 +100,26 @@ insert(EHamt, {K, V}) ->
} = EHamt,
HId = hash(K),
{ok, DiskLoc, _} = couch_file:append_term(Fd, {K, V}),
- insert(Fd, read_node(Fd, Root), HId, DiskLoc, 0).
+ NewRoot = insert(Fd, read_node(Fd, Root), HId, DiskLoc, 0),
+ EHamt#ehamt{
+ root = NewRoot
+ }.
+insert(Fd, NodeDiskLoc, HId, DiskLoc, Depth) when is_integer(NodeDiskLoc) ->
+ {ok, Node} = couch_file:pread_term(Fd, NodeDiskLoc),
+ insert(Fd, Node, HId, DiskLoc, Depth);
-insert(_Fd, {item, HId, DiskLocs}, HId, DiskLoc, _Depth) ->
- {item, HId, [DiskLoc | DiskLocs]};
+insert(_Fd, {item, HId1, DiskLocs}, HId2, DiskLoc, Depth)
+ when HId1 == HId2 orelse Depth >= 5 ->
+ {item, HId1, [DiskLoc | DiskLocs]};
insert(Fd, {item, HId1, DiskLocs}, HId2, DiskLoc, Depth) ->
% Transform this item into a new node and then
% continue our insertion algorithm. We do this
% because we're not guaranteed that the hash ids
% are different at the current depth.
- Map = get_header_index(HId1, Depth),
- Children = [{item, HId1, DiskLocs}],
+ Map = 1 bsl get_header_index(HId1, Depth),
+ Children = {{item, HId1, DiskLocs}},
Node = {node, Map, Children},
insert(Fd, Node, HId2, DiskLoc, Depth);
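
A worked example of the item-to-node split above, assuming get_header_index/2 extracts 5 bits of the hash per depth level (consistent with the 0..31 index range in is_set/2): if the existing item's hash indexes to slot 3 at this depth, it is wrapped as

    Node = {node, 1 bsl 3, {{item, HId1, DiskLocs}}},

and the insertion of HId2 then recurses on Node: a differing 5-bit index lands it in its own slot, while another collision is pushed down to Depth + 1, until the depth-5 cutoff merges the two items' disk locations.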
@@ -112,17 +128,17 @@ insert(Fd, {node, Map, Children}, HId, DiskLoc, Depth) ->
case is_set(Map, Idx) of
true ->
Pos = get_header_position(Map, Idx),
- Child = erlang:element(Pos + 1, Children),
+ Child = erlang:element(Pos, Children),
NewChild0 = insert(Fd, Child, HId, DiskLoc, Depth + 1),
NewChild1 = write_node(Fd, NewChild0, Depth + 1),
- NewChildren = erlang:setelement(Pos + 1, Children, NewChild1),
+ NewChildren = erlang:setelement(Pos, Children, NewChild1),
{node, Map, NewChildren};
false ->
Pos = get_header_position(Map, Idx),
NewChild0 = {item, HId, [DiskLoc]},
NewChild1 = write_node(Fd, NewChild0, Depth + 1),
- NewMap = Map bor Idx,
- NewChildren = erlang:insert_element(Pos + 1, Children, NewChild1),
+ NewMap = Map bor (1 bsl Idx),
+ NewChildren = erlang:insert_element(Pos, Children, NewChild1),
{node, NewMap, NewChildren}
end.
@@ -136,15 +152,17 @@ lookup(EHamt, Key) ->
lookup(Fd, read_node(Fd, Root), HId, Key, 0).
-lookup(_Fd, {item, HId, []}, HId, _, _) ->
+lookup(_Fd, {item, HId1, []}, HId2, _, Depth)
+ when HId1 == HId2 orelse Depth >= 5 ->
not_found;
-lookup(Fd, {item, HId, [DiskLoc | RestLocs]}, HId, Key, Depth) ->
+lookup(Fd, {item, HId1, [DiskLoc | RestLocs]}, HId2, Key, Depth)
+ when HId1 == HId2 orelse Depth >= 5 ->
case couch_file:pread_term(Fd, DiskLoc) of
{ok, {Key, Value}} ->
{ok, Value};
{ok, _} ->
- lookup(Fd, {item, HId, RestLocs}, HId, Key, Depth)
+ lookup(Fd, {item, HId1, RestLocs}, HId2, Key, Depth)
end;
lookup(_Fd, {item, HId1, _}, HId2, _, _) when HId1 /= HId2 ->
@@ -214,7 +232,8 @@ get_header_index(HashId, Depth) when Depth >= 0, Depth < 5->
is_set(Header, Idx) when Idx >= 0, Idx =< 31 ->
- Header band (1 bsl Idx).
+ Mask = 1 bsl Idx,
+ Header band Mask == Mask.
get_header_position(Header, Idx) ->
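
The diff is truncated before get_header_position/2's body. For orientation, a common HAMT formulation, assuming 1-based positions to match the erlang:element/2 and erlang:insert_element/3 calls above (a sketch, not the patch's actual code):

%% Child position for bit Idx = number of set bits below Idx, plus one
%% because Erlang tuples are 1-indexed. count_ones/1 is a hypothetical
%% popcount helper.
get_header_position(Header, Idx) ->
    BitsBelow = Header band ((1 bsl Idx) - 1),
    count_ones(BitsBelow) + 1.

count_ones(0) -> 0;
count_ones(N) -> (N band 1) + count_ones(N bsr 1).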