summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2018-04-24 12:25:43 -0500
committerjiangph <jiangph@cn.ibm.com>2018-08-22 00:59:15 +0800
commitbccc39d4e61e087c8d4f144cf87e635ad8a5c495 (patch)
tree28d9cc4462fbd659bfeca797511772532bd348cf
parenteca282e3fa4285dfeb2ff1591f66330e8d18c868 (diff)
downloadcouchdb-bccc39d4e61e087c8d4f144cf87e635ad8a5c495.tar.gz
[03/10] Clustered Purge: Update couch_bt_engine
This commit updates the couch_bt_engine storage engine implementation to satisfy the newly defined single-node purge APIs. This is accomplished by storing two new database btrees. The purge_seq_tree orders purge requests by their purge_seq. This tree is used to satisfy the fold_purge_infos API for database components to enumerate the list of purge requests in a defined order. The second index is the purge_tree which orders purge requests by their UUID to make for an efficient lookup when filtering replicated purge requests. COUCHDB-3326 Co-authored-by: Mayya Sharipova <mayyas@ca.ibm.com> Co-authored-by: jiangphcn <jiangph@cn.ibm.com>
-rw-r--r--src/couch/src/couch_bt_engine.erl251
-rw-r--r--src/couch/src/couch_bt_engine.hrl4
-rw-r--r--src/couch/src/couch_bt_engine_compactor.erl122
-rw-r--r--src/couch/src/couch_bt_engine_header.erl35
4 files changed, 344 insertions, 68 deletions
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index c5df11bc9..6d858ed49 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -35,8 +35,9 @@
get_disk_version/1,
get_doc_count/1,
get_epochs/1,
- get_last_purged/1,
get_purge_seq/1,
+ get_oldest_purge_seq/1,
+ get_purge_infos_limit/1,
get_revs_limit/1,
get_security/1,
get_size_info/1,
@@ -44,15 +45,18 @@
get_uuid/1,
set_revs_limit/2,
+ set_purge_infos_limit/2,
set_security/2,
open_docs/2,
open_local_docs/2,
read_doc_body/2,
+ load_purge_infos/2,
serialize_doc/2,
write_doc_body/2,
- write_doc_infos/4,
+ write_doc_infos/3,
+ purge_docs/3,
commit_data/1,
@@ -63,6 +67,7 @@
fold_docs/4,
fold_local_docs/4,
fold_changes/5,
+ fold_purge_infos/5,
count_changes_since/2,
start_compaction/4,
@@ -85,7 +90,13 @@
seq_tree_reduce/2,
local_tree_split/1,
- local_tree_join/2
+ local_tree_join/2,
+
+ purge_tree_split/1,
+ purge_tree_join/2,
+ purge_tree_reduce/2,
+ purge_seq_tree_split/1,
+ purge_seq_tree_join/2
]).
@@ -217,18 +228,24 @@ get_epochs(#st{header = Header}) ->
couch_bt_engine_header:get(Header, epochs).
-get_last_purged(#st{header = Header} = St) ->
- case couch_bt_engine_header:get(Header, purged_docs) of
- nil ->
- [];
- Pointer ->
- {ok, PurgeInfo} = couch_file:pread_term(St#st.fd, Pointer),
- PurgeInfo
- end.
+get_purge_seq(#st{purge_seq_tree = PurgeSeqTree}) ->
+ Fun = fun({PurgeSeq, _, _, _}, _Reds, _Acc) ->
+ {stop, PurgeSeq}
+ end,
+ {ok, _, PurgeSeq} = couch_btree:fold(PurgeSeqTree, Fun, 0, [{dir, rev}]),
+ PurgeSeq.
+
+
+get_oldest_purge_seq(#st{purge_seq_tree = PurgeSeqTree}) ->
+ Fun = fun({PurgeSeq, _, _, _}, _Reds, _Acc) ->
+ {stop, PurgeSeq}
+ end,
+ {ok, _, PurgeSeq} = couch_btree:fold(PurgeSeqTree, Fun, 0, []),
+ PurgeSeq.
-get_purge_seq(#st{header = Header}) ->
- couch_bt_engine_header:get(Header, purge_seq).
+get_purge_infos_limit(#st{header = Header}) ->
+ couch_bt_engine_header:get(Header, purge_infos_limit).
get_revs_limit(#st{header = Header}) ->
@@ -284,6 +301,16 @@ set_revs_limit(#st{header = Header} = St, RevsLimit) ->
{ok, increment_update_seq(NewSt)}.
+set_purge_infos_limit(#st{header = Header} = St, PurgeInfosLimit) ->
+ NewSt = St#st{
+ header = couch_bt_engine_header:set(Header, [
+ {purge_infos_limit, PurgeInfosLimit}
+ ]),
+ needs_commit = true
+ },
+ {ok, increment_update_seq(NewSt)}.
+
+
set_security(#st{header = Header} = St, NewSecurity) ->
Options = [{compression, St#st.compression}],
{ok, Ptr, _} = couch_file:append_term(St#st.fd, NewSecurity, Options),
@@ -320,6 +347,14 @@ read_doc_body(#st{} = St, #doc{} = Doc) ->
}.
+load_purge_infos(St, UUIDs) ->
+ Results = couch_btree:lookup(St#st.purge_tree, UUIDs),
+ lists:map(fun
+ ({ok, Info}) -> Info;
+ (not_found) -> not_found
+ end, Results).
+
+
serialize_doc(#st{} = St, #doc{} = Doc) ->
Compress = fun(Term) ->
case couch_compress:is_compressed(Term, St#st.compression) of
@@ -351,7 +386,7 @@ write_doc_body(St, #doc{} = Doc) ->
{ok, Doc#doc{body = Ptr}, Written}.
-write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
+write_doc_infos(#st{} = St, Pairs, LocalDocs) ->
#st{
id_tree = IdTree,
seq_tree = SeqTree,
@@ -391,23 +426,9 @@ write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
erlang:max(Seq, Acc)
end, get_update_seq(St), Add),
- NewHeader = case PurgedIdRevs of
- [] ->
- couch_bt_engine_header:set(St#st.header, [
- {update_seq, NewUpdateSeq}
- ]);
- _ ->
- {ok, Ptr, _} = couch_file:append_term(St#st.fd, PurgedIdRevs),
- OldPurgeSeq = couch_bt_engine_header:get(St#st.header, purge_seq),
- % We bump NewUpdateSeq because we have to ensure that
- % indexers see that they need to process the new purge
- % information.
- couch_bt_engine_header:set(St#st.header, [
- {update_seq, NewUpdateSeq + 1},
- {purge_seq, OldPurgeSeq + 1},
- {purged_docs, Ptr}
- ])
- end,
+ NewHeader = couch_bt_engine_header:set(St#st.header, [
+ {update_seq, NewUpdateSeq}
+ ]),
{ok, St#st{
header = NewHeader,
@@ -418,6 +439,46 @@ write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
}}.
+purge_docs(#st{} = St, Pairs, PurgeInfos) ->
+ #st{
+ id_tree = IdTree,
+ seq_tree = SeqTree,
+ purge_tree = PurgeTree,
+ purge_seq_tree = PurgeSeqTree
+ } = St,
+
+ RemDocIds = [Old#full_doc_info.id || {Old, not_found} <- Pairs],
+ RemSeqs = [Old#full_doc_info.update_seq || {Old, _} <- Pairs],
+ DocsToAdd = [New || {_, New} <- Pairs, New /= not_found],
+ CurrSeq = couch_bt_engine_header:get(St#st.header, update_seq),
+ Seqs = [FDI#full_doc_info.update_seq || FDI <- DocsToAdd],
+ NewSeq = lists:max([CurrSeq | Seqs]),
+
+ % We bump NewUpdateSeq because we have to ensure that
+ % indexers see that they need to process the new purge
+ % information.
+ UpdateSeq = case NewSeq == CurrSeq of
+ true -> CurrSeq + 1;
+ false -> NewSeq
+ end,
+ Header = couch_bt_engine_header:set(St#st.header, [
+ {update_seq, UpdateSeq}
+ ]),
+
+ {ok, IdTree2} = couch_btree:add_remove(IdTree, DocsToAdd, RemDocIds),
+ {ok, SeqTree2} = couch_btree:add_remove(SeqTree, DocsToAdd, RemSeqs),
+ {ok, PurgeTree2} = couch_btree:add(PurgeTree, PurgeInfos),
+ {ok, PurgeSeqTree2} = couch_btree:add(PurgeSeqTree, PurgeInfos),
+ {ok, St#st{
+ header = Header,
+ id_tree = IdTree2,
+ seq_tree = SeqTree2,
+ purge_tree = PurgeTree2,
+ purge_seq_tree = PurgeSeqTree2,
+ needs_commit = true
+ }}.
+
+
commit_data(St) ->
#st{
fd = Fd,
@@ -480,6 +541,21 @@ fold_changes(St, SinceSeq, UserFun, UserAcc, Options) ->
{ok, FinalUserAcc}.
+fold_purge_infos(St, StartSeq0, UserFun, UserAcc, Options) ->
+ PurgeSeqTree = St#st.purge_seq_tree,
+ StartSeq = StartSeq0 + 1,
+ MinSeq = get_oldest_purge_seq(St),
+ if MinSeq =< StartSeq -> ok; true ->
+ erlang:error({invalid_start_purge_seq, StartSeq0})
+ end,
+ Wrapper = fun(Info, _Reds, UAcc) ->
+ UserFun(Info, UAcc)
+ end,
+ Opts = [{start_key, StartSeq}] ++ Options,
+ {ok, _, OutAcc} = couch_btree:fold(PurgeSeqTree, Wrapper, UserAcc, Opts),
+ {ok, OutAcc}.
+
+
count_changes_since(St, SinceSeq) ->
BTree = St#st.seq_tree,
FoldFun = fun(_SeqStart, PartialReds, 0) ->
@@ -619,6 +695,13 @@ local_tree_split(#doc{revs = {0, [Rev]}} = Doc) when is_integer(Rev) ->
{Id, {Rev, BodyData}}.
+local_tree_join(Id, {Rev, BodyData}) when is_binary(Rev) ->
+ #doc{
+ id = Id,
+ revs = {0, [Rev]},
+ body = BodyData
+ };
+
local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) ->
#doc{
id = Id,
@@ -627,6 +710,29 @@ local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) ->
}.
+purge_tree_split({PurgeSeq, UUID, DocId, Revs}) ->
+ {UUID, {PurgeSeq, DocId, Revs}}.
+
+
+purge_tree_join(UUID, {PurgeSeq, DocId, Revs}) ->
+ {PurgeSeq, UUID, DocId, Revs}.
+
+
+purge_seq_tree_split({PurgeSeq, UUID, DocId, Revs}) ->
+ {PurgeSeq, {UUID, DocId, Revs}}.
+
+
+purge_seq_tree_join(PurgeSeq, {UUID, DocId, Revs}) ->
+ {PurgeSeq, UUID, DocId, Revs}.
+
+
+purge_tree_reduce(reduce, IdRevs) ->
+ % count the number of purge requests
+ length(IdRevs);
+purge_tree_reduce(rereduce, Reds) ->
+ lists:sum(Reds).
+
+
set_update_seq(#st{header = Header} = St, UpdateSeq) ->
{ok, St#st{
header = couch_bt_engine_header:set(Header, [
@@ -682,7 +788,8 @@ init_state(FilePath, Fd, Header0, Options) ->
Compression = couch_compress:get_compression_method(),
Header1 = couch_bt_engine_header:upgrade(Header0),
- Header = set_default_security_object(Fd, Header1, Compression, Options),
+ Header2 = set_default_security_object(Fd, Header1, Compression, Options),
+ Header = upgrade_purge_info(Fd, Header2),
IdTreeState = couch_bt_engine_header:id_tree_state(Header),
{ok, IdTree} = couch_btree:open(IdTreeState, Fd, [
@@ -707,6 +814,20 @@ init_state(FilePath, Fd, Header0, Options) ->
{compression, Compression}
]),
+ PurgeTreeState = couch_bt_engine_header:purge_tree_state(Header),
+ {ok, PurgeTree} = couch_btree:open(PurgeTreeState, Fd, [
+ {split, fun ?MODULE:purge_tree_split/1},
+ {join, fun ?MODULE:purge_tree_join/2},
+ {reduce, fun ?MODULE:purge_tree_reduce/2}
+ ]),
+
+ PurgeSeqTreeState = couch_bt_engine_header:purge_seq_tree_state(Header),
+ {ok, PurgeSeqTree} = couch_btree:open(PurgeSeqTreeState, Fd, [
+ {split, fun ?MODULE:purge_seq_tree_split/1},
+ {join, fun ?MODULE:purge_seq_tree_join/2},
+ {reduce, fun ?MODULE:purge_tree_reduce/2}
+ ]),
+
ok = couch_file:set_db_pid(Fd, self()),
St = #st{
@@ -719,7 +840,9 @@ init_state(FilePath, Fd, Header0, Options) ->
id_tree = IdTree,
seq_tree = SeqTree,
local_tree = LocalTree,
- compression = Compression
+ compression = Compression,
+ purge_tree = PurgeTree,
+ purge_seq_tree = PurgeSeqTree
},
% If this is a new database we've just created a
@@ -738,7 +861,9 @@ update_header(St, Header) ->
couch_bt_engine_header:set(Header, [
{seq_tree_state, couch_btree:get_state(St#st.seq_tree)},
{id_tree_state, couch_btree:get_state(St#st.id_tree)},
- {local_tree_state, couch_btree:get_state(St#st.local_tree)}
+ {local_tree_state, couch_btree:get_state(St#st.local_tree)},
+ {purge_tree_state, couch_btree:get_state(St#st.purge_tree)},
+ {purge_seq_tree_state, couch_btree:get_state(St#st.purge_seq_tree)}
]).
@@ -763,6 +888,57 @@ set_default_security_object(Fd, Header, Compression, Options) ->
end.
+% This function is here, and not in couch_bt_engine_header
+% because it requires modifying file contents
+upgrade_purge_info(Fd, Header) ->
+ case couch_bt_engine_header:get(Header, purge_tree_state) of
+ nil ->
+ Header;
+ Ptr when is_tuple(Ptr) ->
+ Header;
+ PurgeSeq when is_integer(PurgeSeq)->
+ % Pointer to old purged ids/revs is in purge_seq_tree_state
+ Ptr = couch_bt_engine_header:get(Header, purge_seq_tree_state),
+
+ case Ptr of
+ nil ->
+ PTS = couch_bt_engine_header:purge_tree_state(Header),
+ PurgeTreeSt = case PTS of 0 -> nil; Else -> Else end,
+ couch_bt_engine_header:set(Header, [
+ {purge_tree_state, PurgeTreeSt}
+ ]);
+ _ ->
+ {ok, PurgedIdsRevs} = couch_file:pread_term(Fd, Ptr),
+
+ {Infos, NewSeq} = lists:foldl(fun({Id, Revs}, {InfoAcc, PSeq}) ->
+ Info = {PSeq, couch_uuids:random(), Id, Revs},
+ {[Info | InfoAcc], PSeq + 1}
+ end, {[], PurgeSeq}, PurgedIdsRevs),
+
+ {ok, PurgeTree} = couch_btree:open(nil, Fd, [
+ {split, fun ?MODULE:purge_tree_split/1},
+ {join, fun ?MODULE:purge_tree_join/2},
+ {reduce, fun ?MODULE:purge_tree_reduce/2}
+ ]),
+ {ok, PurgeTree2} = couch_btree:add(PurgeTree, Infos),
+ PurgeTreeSt = couch_btree:get_state(PurgeTree2),
+
+ {ok, PurgeSeqTree} = couch_btree:open(nil, Fd, [
+ {split, fun ?MODULE:purge_seq_tree_split/1},
+ {join, fun ?MODULE:purge_seq_tree_join/2},
+ {reduce, fun ?MODULE:purge_tree_reduce/2}
+ ]),
+ {ok, PurgeSeqTree2} = couch_btree:add(PurgeSeqTree, Infos),
+ PurgeSeqTreeSt = couch_btree:get_state(PurgeSeqTree2),
+
+ couch_bt_engine_header:set(Header, [
+ {purge_tree_state, PurgeTreeSt},
+ {purge_seq_tree_state, PurgeSeqTreeSt}
+ ])
+ end
+ end.
+
+
delete_compaction_files(FilePath) ->
RootDir = config:get("couchdb", "database_dir", "."),
DelOpts = [{context, compaction}],
@@ -840,7 +1016,9 @@ active_size(#st{} = St, #size_info{} = SI) ->
Trees = [
St#st.id_tree,
St#st.seq_tree,
- St#st.local_tree
+ St#st.local_tree,
+ St#st.purge_tree,
+ St#st.purge_seq_tree
],
lists:foldl(fun(T, Acc) ->
case couch_btree:size(T) of
@@ -933,7 +1111,8 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
{ok, NewSt2} = commit_data(NewSt1#st{
header = couch_bt_engine_header:set(Header, [
{compacted_seq, get_update_seq(OldSt)},
- {revs_limit, get_revs_limit(OldSt)}
+ {revs_limit, get_revs_limit(OldSt)},
+ {purge_infos_limit, get_purge_infos_limit(OldSt)}
]),
local_tree = NewLocal2
}),
diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl
index 7f52d8fdd..1f5bcc9df 100644
--- a/src/couch/src/couch_bt_engine.hrl
+++ b/src/couch/src/couch_bt_engine.hrl
@@ -20,5 +20,7 @@
id_tree,
seq_tree,
local_tree,
- compression
+ compression,
+ purge_tree,
+ purge_seq_tree
}).
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 2c5b78e0b..10de68687 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -56,7 +56,7 @@ start(#st{} = St, DbName, Options, Parent) ->
% and hope everything works out for the best.
unlink(DFd),
- NewSt1 = copy_purge_info(St, NewSt),
+ NewSt1 = copy_purge_info(DbName, St, NewSt, Retry),
NewSt2 = copy_compact(DbName, St, NewSt1, Retry),
NewSt3 = sort_meta_data(NewSt2),
NewSt4 = commit_compaction_data(NewSt3),
@@ -99,23 +99,111 @@ open_compaction_files(SrcHdr, DbFilePath, Options) ->
end.
-copy_purge_info(OldSt, NewSt) ->
- OldHdr = OldSt#st.header,
- NewHdr = NewSt#st.header,
- OldPurgeSeq = couch_bt_engine_header:purge_seq(OldHdr),
- case OldPurgeSeq > 0 of
+copy_purge_info(DbName, OldSt, NewSt, Retry) ->
+ MinPurgeSeq = couch_util:with_db(DbName, fun(Db) ->
+ couch_db:get_minimum_purge_seq(Db)
+ end),
+ OldPSTree = OldSt#st.purge_seq_tree,
+ StartSeq = couch_bt_engine:get_purge_seq(NewSt) + 1,
+ BufferSize = config:get_integer(
+ "database_compaction", "doc_buffer_size", 524288),
+ CheckpointAfter = config:get(
+ "database_compaction", "checkpoint_after", BufferSize * 10),
+
+ EnumFun = fun(Info, _Reds, {StAcc0, InfosAcc, InfosSize, CopiedSize}) ->
+ NewInfosSize = InfosSize + ?term_size(Info),
+ if NewInfosSize >= BufferSize ->
+ StAcc1 = copy_purge_infos(
+ OldSt, StAcc0, [Info | InfosAcc], MinPurgeSeq, Retry),
+ NewCopiedSize = CopiedSize + NewInfosSize,
+ if NewCopiedSize >= CheckpointAfter ->
+ StAcc2 = commit_compaction_data(StAcc1),
+ {ok, {StAcc2, [], 0, 0}};
+ true ->
+ {ok, {StAcc1, [], 0, NewCopiedSize}}
+ end;
true ->
- Purged = couch_bt_engine:get_last_purged(OldSt),
- Opts = [{compression, NewSt#st.compression}],
- {ok, Ptr, _} = couch_file:append_term(NewSt#st.fd, Purged, Opts),
- NewNewHdr = couch_bt_engine_header:set(NewHdr, [
- {purge_seq, OldPurgeSeq},
- {purged_docs, Ptr}
- ]),
- NewSt#st{header = NewNewHdr};
- false ->
- NewSt
- end.
+ NewInfosAcc = [Info | InfosAcc],
+ {ok, {StAcc0, NewInfosAcc, NewInfosSize, CopiedSize}}
+ end
+ end,
+
+ InitAcc = {NewSt, [], 0, 0},
+ Opts = [{start_key, StartSeq}],
+ {ok, _, FinalAcc} = couch_btree:fold(OldPSTree, EnumFun, InitAcc, Opts),
+ {NewStAcc, Infos, _, _} = FinalAcc,
+ copy_purge_infos(OldSt, NewStAcc, Infos, MinPurgeSeq, Retry).
+
+
+copy_purge_infos(OldSt, NewSt0, Infos, MinPurgeSeq, Retry) ->
+ #st{
+ id_tree = OldIdTree
+ } = OldSt,
+
+ % Re-bind our id_tree to the backing btree
+ NewIdTreeState = couch_bt_engine_header:id_tree_state(NewSt0#st.header),
+ MetaFd = couch_emsort:get_fd(NewSt0#st.id_tree),
+ MetaState = couch_emsort:get_state(NewSt0#st.id_tree),
+ NewSt1 = bind_id_tree(NewSt0, NewSt0#st.fd, NewIdTreeState),
+
+ #st{
+ id_tree = NewIdTree0,
+ seq_tree = NewSeqTree0,
+ purge_tree = NewPurgeTree0,
+ purge_seq_tree = NewPurgeSeqTree0
+ } = NewSt1,
+
+ % Copy over the purge infos
+ InfosToAdd = lists:filter(fun({PSeq, _, _, _}) ->
+ PSeq > MinPurgeSeq
+ end, Infos),
+ {ok, NewPurgeTree1} = couch_btree:add(NewPurgeTree0, InfosToAdd),
+ {ok, NewPurgeSeqTree1} = couch_btree:add(NewPurgeSeqTree0, InfosToAdd),
+
+ NewSt2 = NewSt1#st{
+ purge_tree = NewPurgeTree1,
+ purge_seq_tree = NewPurgeSeqTree1
+ },
+
+ % If we're performing a retry compaction we have to check if
+ % any of the referenced docs have been completely purged
+ % from the database. Any doc that has been completely purged
+ % must then be removed from our partially compacted database.
+ NewSt3 = if Retry == nil -> NewSt2; true ->
+ AllDocIds = [DocId || {_PurgeSeq, _UUID, DocId, _Revs} <- Infos],
+ UniqDocIds = lists:usort(AllDocIds),
+ OldIdResults = couch_btree:lookup(OldIdTree, UniqDocIds),
+ OldZipped = lists:zip(UniqDocIds, OldIdResults),
+
+ % The list of non-existent docs in the database being compacted
+ MaybeRemDocIds = [DocId || {DocId, not_found} <- OldZipped],
+
+ % Removing anything that exists in the partially compacted database
+ NewIdResults = couch_btree:lookup(NewIdTree0, MaybeRemDocIds),
+ ToRemove = [Doc || {ok, Doc} <- NewIdResults, Doc /= {ok, not_found}],
+
+ {RemIds, RemSeqs} = lists:unzip(lists:map(fun(FDI) ->
+ #full_doc_info{
+ id = Id,
+ update_seq = Seq
+ } = FDI,
+ {Id, Seq}
+ end, ToRemove)),
+
+ {ok, NewIdTree1} = couch_btree:add_remove(NewIdTree0, [], RemIds),
+ {ok, NewSeqTree1} = couch_btree:add_remove(NewSeqTree0, [], RemSeqs),
+
+ NewSt2#st{
+ id_tree = NewIdTree1,
+ seq_tree = NewSeqTree1
+ }
+ end,
+
+ Header = couch_bt_engine:update_header(NewSt3, NewSt3#st.header),
+ NewSt4 = NewSt3#st{
+ header = Header
+ },
+ bind_emsort(NewSt4, MetaFd, MetaState).
copy_compact(DbName, St, NewSt0, Retry) ->
diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl
index 3d24f3189..467bb2ff8 100644
--- a/src/couch/src/couch_bt_engine_header.erl
+++ b/src/couch/src/couch_bt_engine_header.erl
@@ -31,8 +31,9 @@
seq_tree_state/1,
latest/1,
local_tree_state/1,
- purge_seq/1,
- purged_docs/1,
+ purge_tree_state/1,
+ purge_seq_tree_state/1,
+ purge_infos_limit/1,
security_ptr/1,
revs_limit/1,
uuid/1,
@@ -51,7 +52,7 @@
% if the disk revision is incremented, then new upgrade logic will need to be
% added to couch_db_updater:init_db.
--define(LATEST_DISK_VERSION, 6).
+-define(LATEST_DISK_VERSION, 7).
-record(db_header, {
disk_version = ?LATEST_DISK_VERSION,
@@ -60,13 +61,14 @@
id_tree_state = nil,
seq_tree_state = nil,
local_tree_state = nil,
- purge_seq = 0,
- purged_docs = nil,
+ purge_tree_state = nil,
+ purge_seq_tree_state = nil, % purge_seq_tree: purge_seq -> {uuid, docid, revs}
security_ptr = nil,
revs_limit = 1000,
uuid,
epochs,
- compacted_seq
+ compacted_seq,
+ purge_infos_limit = 1000
}).
@@ -150,12 +152,12 @@ local_tree_state(Header) ->
get_field(Header, local_tree_state).
-purge_seq(Header) ->
- get_field(Header, purge_seq).
+purge_tree_state(Header) ->
+ get_field(Header, purge_tree_state).
-purged_docs(Header) ->
- get_field(Header, purged_docs).
+purge_seq_tree_state(Header) ->
+ get_field(Header, purge_seq_tree_state).
security_ptr(Header) ->
@@ -178,6 +180,10 @@ compacted_seq(Header) ->
get_field(Header, compacted_seq).
+purge_infos_limit(Header) ->
+ get_field(Header, purge_infos_limit).
+
+
get_field(Header, Field) ->
get_field(Header, Field, undefined).
@@ -229,6 +235,7 @@ upgrade_disk_version(#db_header{}=Header) ->
3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR});
4 -> Header#db_header{security_ptr = nil}; % [0.10 - 0.11)
5 -> Header; % pre 1.2
+ 6 -> Header; % pre clustered purge
?LATEST_DISK_VERSION -> Header;
_ ->
Reason = "Incorrect disk header version",
@@ -322,8 +329,8 @@ mk_header(Vsn) ->
foo, % id_tree_state
bar, % seq_tree_state
bam, % local_tree_state
- 1, % purge_seq
- baz, % purged_docs
+ flam, % was purge_seq - now purge_tree_state
+ baz, % was purged_docs - now purge_seq_tree_state
bang, % security_ptr
999 % revs_limit
}.
@@ -342,8 +349,8 @@ upgrade_v3_test() ->
?assertEqual(foo, id_tree_state(NewHeader)),
?assertEqual(bar, seq_tree_state(NewHeader)),
?assertEqual(bam, local_tree_state(NewHeader)),
- ?assertEqual(1, purge_seq(NewHeader)),
- ?assertEqual(baz, purged_docs(NewHeader)),
+ ?assertEqual(flam, purge_tree_state(NewHeader)),
+ ?assertEqual(baz, purge_seq_tree_state(NewHeader)),
?assertEqual(bang, security_ptr(NewHeader)),
?assertEqual(999, revs_limit(NewHeader)),
?assertEqual(undefined, uuid(NewHeader)),