summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2018-04-24 12:26:42 -0500
committerjiangph <jiangph@cn.ibm.com>2018-08-22 00:59:15 +0800
commit3be455b4816858a092b43e5a2ea8a5e6a564e4d8 (patch)
treebfc1e49dc4bfe96afddc906299b2f82d84277a6c
parent48378a97153006e75b9909064c67ce1391f266cf (diff)
downloadcouchdb-3be455b4816858a092b43e5a2ea8a5e6a564e4d8.tar.gz
[06/10] Clustered Purge: Update mrview indexes
This commit updates the mrview secondary index to properly process the new history of purge requests as well as to store the _local purge checkpoint doc. The importance of the _local checkpoint doc is to ensure that compaction of a database does not remove any purge requests that have not yet been processed by this secondary index. COUCHDB-3326 Co-authored-by: Mayya Sharipova <mayyas@ca.ibm.com> Co-authored-by: jiangphcn <jiangph@cn.ibm.com>
-rw-r--r--src/couch_index/src/couch_index_epi.erl5
-rw-r--r--src/couch_index/src/couch_index_plugin_couch_db.erl26
-rw-r--r--src/couch_index/src/couch_index_updater.erl50
-rw-r--r--src/couch_mrview/src/couch_mrview_cleanup.erl16
-rw-r--r--src/couch_mrview/src/couch_mrview_index.erl115
-rw-r--r--src/couch_mrview/src/couch_mrview_test_util.erl5
-rw-r--r--src/couch_mrview/src/couch_mrview_updater.erl14
-rw-r--r--src/couch_mrview/src/couch_mrview_util.erl39
-rw-r--r--src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl276
-rw-r--r--src/couch_mrview/test/couch_mrview_purge_docs_tests.erl506
10 files changed, 1022 insertions, 30 deletions
diff --git a/src/couch_index/src/couch_index_epi.erl b/src/couch_index/src/couch_index_epi.erl
index 946a5906b..1c4eb9596 100644
--- a/src/couch_index/src/couch_index_epi.erl
+++ b/src/couch_index/src/couch_index_epi.erl
@@ -28,8 +28,9 @@ app() ->
couch_index.
providers() ->
- [].
-
+ [
+ {couch_db, couch_index_plugin_couch_db}
+ ].
services() ->
[
diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/couch_index/src/couch_index_plugin_couch_db.erl
new file mode 100644
index 000000000..0af22e396
--- /dev/null
+++ b/src/couch_index/src/couch_index_plugin_couch_db.erl
@@ -0,0 +1,26 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_plugin_couch_db).
+
+-export([
+ is_valid_purge_client/2,
+ on_compact/2
+]).
+
+
+is_valid_purge_client(DbName, Props) ->
+ couch_mrview_index:verify_index_exists(DbName, Props).
+
+
+on_compact(DbName, DDocs) ->
+ couch_mrview_index:ensure_local_purge_docs(DbName, DDocs).
diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl
index 5ab9ea809..7864bde4d 100644
--- a/src/couch_index/src/couch_index_updater.erl
+++ b/src/couch_index/src/couch_index_updater.erl
@@ -141,12 +141,10 @@ update(Idx, Mod, IdxState) ->
DbUpdateSeq = couch_db:get_update_seq(Db),
DbCommittedSeq = couch_db:get_committed_update_seq(Db),
- PurgedIdxState = case purge_index(Db, Mod, IdxState) of
- {ok, IdxState0} -> IdxState0;
- reset -> exit({reset, self()})
- end,
-
- NumChanges = couch_db:count_changes_since(Db, CurrSeq),
+ NumUpdateChanges = couch_db:count_changes_since(Db, CurrSeq),
+ NumPurgeChanges = count_pending_purged_docs_since(Db, Mod, IdxState),
+ TotalChanges = NumUpdateChanges + NumPurgeChanges,
+ {ok, PurgedIdxState} = purge_index(Db, Mod, IdxState),
GetSeq = fun
(#full_doc_info{update_seq=Seq}) -> Seq;
@@ -185,8 +183,13 @@ update(Idx, Mod, IdxState) ->
{ok, {NewSt, true}}
end
end,
+ {ok, InitIdxState} = Mod:start_update(
+ Idx,
+ PurgedIdxState,
+ TotalChanges,
+ NumPurgeChanges
+ ),
- {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges),
Acc0 = {InitIdxState, true},
{ok, Acc} = couch_db:fold_changes(Db, CurrSeq, Proc, Acc0, []),
{ProcIdxSt, SendLast} = Acc,
@@ -206,14 +209,29 @@ update(Idx, Mod, IdxState) ->
purge_index(Db, Mod, IdxState) ->
- {ok, DbPurgeSeq} = couch_db:get_purge_seq(Db),
+ DbPurgeSeq = couch_db:get_purge_seq(Db),
IdxPurgeSeq = Mod:get(purge_seq, IdxState),
- if
- DbPurgeSeq == IdxPurgeSeq ->
- {ok, IdxState};
- DbPurgeSeq == IdxPurgeSeq + 1 ->
- {ok, PurgedIdRevs} = couch_db:get_last_purged(Db),
- Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState);
- true ->
- reset
+ if IdxPurgeSeq == DbPurgeSeq -> {ok, IdxState}; true ->
+ FoldFun = fun({PurgeSeq, _UUId, Id, Revs}, Acc) ->
+ Mod:purge(Db, PurgeSeq, [{Id, Revs}], Acc)
+ end,
+ {ok, NewStateAcc} = try
+ couch_db:fold_purge_infos(
+ Db,
+ IdxPurgeSeq,
+ FoldFun,
+ IdxState,
+ []
+ )
+ catch error:{invalid_start_purge_seq, _} ->
+ exit({reset, self()})
+ end,
+ Mod:update_local_purge_doc(Db, NewStateAcc),
+ {ok, NewStateAcc}
end.
+
+
+count_pending_purged_docs_since(Db, Mod, IdxState) ->
+ DbPurgeSeq = couch_db:get_purge_seq(Db),
+ IdxPurgeSeq = Mod:get(purge_seq, IdxState),
+ DbPurgeSeq - IdxPurgeSeq.
diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl b/src/couch_mrview/src/couch_mrview_cleanup.erl
index 380376de2..e0cb1c64f 100644
--- a/src/couch_mrview/src/couch_mrview_cleanup.erl
+++ b/src/couch_mrview/src/couch_mrview_cleanup.erl
@@ -41,7 +41,19 @@ run(Db) ->
lists:foreach(fun(FN) ->
couch_log:debug("Deleting stale view file: ~s", [FN]),
- couch_file:delete(RootDir, FN, [sync])
+ couch_file:delete(RootDir, FN, [sync]),
+ case couch_mrview_util:verify_view_filename(FN) of
+ true ->
+ Sig = couch_mrview_util:get_signature_from_filename(FN),
+ DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+ case couch_db:open_doc(Db, DocId, []) of
+ {ok, LocalPurgeDoc} ->
+ couch_db:update_doc(Db,
+ LocalPurgeDoc#doc{deleted=true}, [?ADMIN_CTX]);
+ {not_found, _} ->
+ ok
+ end;
+ false -> ok
+ end
end, ToDelete),
-
ok.
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 5d285d639..4718b562d 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -15,9 +15,11 @@
-export([get/2]).
-export([init/2, open/2, close/1, reset/1, delete/1, shutdown/1]).
--export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]).
+-export([start_update/4, purge/4, process_doc/3, finish_update/1, commit/1]).
-export([compact/3, swap_compacted/2, remove_compacted/1]).
-export([index_file_exists/1]).
+-export([update_local_purge_doc/2, verify_index_exists/2]).
+-export([ensure_local_purge_docs/2]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -122,14 +124,17 @@ open(Db, State) ->
{ok, {OldSig, Header}} ->
% Matching view signatures.
NewSt = couch_mrview_util:init_state(Db, Fd, State, Header),
+ ensure_local_purge_doc(Db, NewSt),
{ok, NewSt};
% end of upgrade code for <= 1.2.x
{ok, {Sig, Header}} ->
% Matching view signatures.
NewSt = couch_mrview_util:init_state(Db, Fd, State, Header),
+ ensure_local_purge_doc(Db, NewSt),
{ok, NewSt};
_ ->
NewSt = couch_mrview_util:reset_index(Db, Fd, State),
+ ensure_local_purge_doc(Db, NewSt),
{ok, NewSt}
end;
{error, Reason} = Error ->
@@ -168,8 +173,13 @@ reset(State) ->
end).
-start_update(PartialDest, State, NumChanges) ->
- couch_mrview_updater:start_update(PartialDest, State, NumChanges).
+start_update(PartialDest, State, NumChanges, NumChangesDone) ->
+ couch_mrview_updater:start_update(
+ PartialDest,
+ State,
+ NumChanges,
+ NumChangesDone
+ ).
purge(Db, PurgeSeq, PurgedIdRevs, State) ->
@@ -208,3 +218,102 @@ index_file_exists(State) ->
} = State,
IndexFName = couch_mrview_util:index_file(DbName, Sig),
filelib:is_file(IndexFName).
+
+
+verify_index_exists(DbName, Props) ->
+ try
+ Type = couch_util:get_value(<<"type">>, Props),
+ if Type =/= <<"mrview">> -> false; true ->
+ DDocId = couch_util:get_value(<<"ddoc_id">>, Props),
+ couch_util:with_db(DbName, fun(Db) ->
+ {ok, DesignDocs} = couch_db:get_design_docs(Db),
+ case get_ddoc(DbName, DesignDocs, DDocId) of
+ #doc{} = DDoc ->
+ {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(
+ DbName, DDoc),
+ IdxSig = IdxState#mrst.sig,
+ SigInLocal = couch_util:get_value(
+ <<"signature">>, Props),
+ couch_index_util:hexsig(IdxSig) == SigInLocal;
+ not_found ->
+ false
+ end
+ end)
+ end
+ catch _:_ ->
+ false
+ end.
+
+
+get_ddoc(<<"shards/", _/binary>> = _DbName, DesignDocs, DDocId) ->
+ DDocs = [couch_doc:from_json_obj(DD) || DD <- DesignDocs],
+ case lists:keyfind(DDocId, #doc.id, DDocs) of
+ #doc{} = DDoc -> DDoc;
+ false -> not_found
+ end;
+get_ddoc(DbName, DesignDocs, DDocId) ->
+ couch_util:with_db(DbName, fun(Db) ->
+ case lists:keyfind(DDocId, #full_doc_info.id, DesignDocs) of
+ #full_doc_info{} = DDocInfo ->
+ {ok, DDoc} = couch_db:open_doc_int(
+ Db, DDocInfo, [ejson_body]),
+ DDoc;
+ false ->
+ not_found
+ end
+ end).
+
+
+ensure_local_purge_docs(DbName, DDocs) ->
+ couch_util:with_db(DbName, fun(Db) ->
+ lists:foreach(fun(DDoc) ->
+ try couch_mrview_util:ddoc_to_mrst(DbName, DDoc) of
+ {ok, MRSt} ->
+ ensure_local_purge_doc(Db, MRSt)
+ catch _:_ ->
+ ok
+ end
+ end, DDocs)
+ end).
+
+
+ensure_local_purge_doc(Db, #mrst{}=State) ->
+ Sig = couch_index_util:hexsig(get(signature, State)),
+ DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+ case couch_db:open_doc(Db, DocId, []) of
+ {not_found, _} ->
+ create_local_purge_doc(Db, State);
+ {ok, _} ->
+ ok
+ end.
+
+
+create_local_purge_doc(Db, State) ->
+ PurgeSeq = couch_db:get_purge_seq(Db),
+ update_local_purge_doc(Db, State, PurgeSeq).
+
+
+update_local_purge_doc(Db, State) ->
+ update_local_purge_doc(Db, State, get(purge_seq, State)).
+
+
+update_local_purge_doc(Db, State, PSeq) ->
+ Sig = couch_index_util:hexsig(State#mrst.sig),
+ DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+ {Mega, Secs, _} = os:timestamp(),
+ NowSecs = Mega * 1000000 + Secs,
+ BaseDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DocId},
+ {<<"type">>, <<"mrview">>},
+ {<<"purge_seq">>, PSeq},
+ {<<"updated_on">>, NowSecs},
+ {<<"ddoc_id">>, get(idx_name, State)},
+ {<<"signature">>, Sig}
+ ]}),
+ Doc = case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{revs = Revs}} ->
+ BaseDoc#doc{revs = Revs};
+ {not_found, _} ->
+ BaseDoc
+ end,
+ couch_db:update_doc(Db, Doc, []).
diff --git a/src/couch_mrview/src/couch_mrview_test_util.erl b/src/couch_mrview/src/couch_mrview_test_util.erl
index b07b07679..35ab6c673 100644
--- a/src/couch_mrview/src/couch_mrview_test_util.erl
+++ b/src/couch_mrview/src/couch_mrview_test_util.erl
@@ -49,6 +49,11 @@ make_docs(local, Count) ->
make_docs(_, Count) ->
[doc(I) || I <- lists:seq(1, Count)].
+
+make_docs(_, Since, Count) ->
+ [doc(I) || I <- lists:seq(Since, Count)].
+
+
ddoc({changes, Opts}) ->
ViewOpts = case Opts of
seq_indexed ->
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 214f48793..3383b49b6 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -12,15 +12,14 @@
-module(couch_mrview_updater).
--export([start_update/3, purge/4, process_doc/3, finish_update/1]).
+-export([start_update/4, purge/4, process_doc/3, finish_update/1]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-define(REM_VAL, removed).
-
-start_update(Partial, State, NumChanges) ->
+start_update(Partial, State, NumChanges, NumChangesDone) ->
MaxSize = config:get_integer("view_updater", "queue_memory_cap", 100000),
MaxItems = config:get_integer("view_updater", "queue_item_cap", 500),
QueueOpts = [{max_size, MaxSize}, {max_items, MaxItems}],
@@ -36,14 +35,19 @@ start_update(Partial, State, NumChanges) ->
},
Self = self(),
+
MapFun = fun() ->
+ Progress = case NumChanges of
+ 0 -> 0;
+ _ -> (NumChangesDone * 100) div NumChanges
+ end,
couch_task_status:add_task([
{indexer_pid, ?l2b(pid_to_list(Partial))},
{type, indexer},
{database, State#mrst.db_name},
{design_document, State#mrst.idx_name},
- {progress, 0},
- {changes_done, 0},
+ {progress, Progress},
+ {changes_done, NumChangesDone},
{total_changes, NumChanges}
]),
couch_task_status:set_update_frequency(500),
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 120a9b873..4fd82e0af 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -13,6 +13,8 @@
-module(couch_mrview_util).
-export([get_view/4, get_view_index_pid/4]).
+-export([get_local_purge_doc_id/1, get_value_from_options/2]).
+-export([verify_view_filename/1, get_signature_from_filename/1]).
-export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
-export([make_header/1]).
-export([index_file/2, compaction_file/2, open_file/1]).
@@ -42,6 +44,39 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
+get_local_purge_doc_id(Sig) ->
+ ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Sig).
+
+
+get_value_from_options(Key, Options) ->
+ case couch_util:get_value(Key, Options) of
+ undefined ->
+ Reason = <<"'", Key/binary, "' must exists in options.">>,
+ throw({bad_request, Reason});
+ Value -> Value
+ end.
+
+
+verify_view_filename(FileName) ->
+ FilePathList = filename:split(FileName),
+ PureFN = lists:last(FilePathList),
+ case filename:extension(PureFN) of
+ ".view" ->
+ Sig = filename:basename(PureFN),
+ case [Ch || Ch <- Sig, not (((Ch >= $0) and (Ch =< $9))
+ orelse ((Ch >= $a) and (Ch =< $f))
+ orelse ((Ch >= $A) and (Ch =< $F)))] == [] of
+ true -> true;
+ false -> false
+ end;
+ _ -> false
+ end.
+
+get_signature_from_filename(FileName) ->
+ FilePathList = filename:split(FileName),
+ PureFN = lists:last(FilePathList),
+ filename:basename(PureFN, ".view").
+
get_view(Db, DDoc, ViewName, Args0) ->
case get_view_index_state(Db, DDoc, ViewName, Args0) of
{ok, State, Args2} ->
@@ -197,7 +232,7 @@ extract_view(Lang, #mrargs{view_type=red}=Args, Name, [View | Rest]) ->
view_sig(Db, State, View, #mrargs{include_docs=true}=Args) ->
BaseSig = view_sig(Db, State, View, Args#mrargs{include_docs=false}),
UpdateSeq = couch_db:get_update_seq(Db),
- {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+ PurgeSeq = couch_db:get_purge_seq(Db),
#mrst{
seq_indexed=SeqIndexed,
keyseq_indexed=KeySeqIndexed
@@ -231,7 +266,7 @@ view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args) ->
init_state(Db, Fd, #mrst{views=Views}=State, nil) ->
- {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+ PurgeSeq = couch_db:get_purge_seq(Db),
Header = #mrheader{
seq=0,
purge_seq=PurgeSeq,
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
new file mode 100644
index 000000000..213acac0b
--- /dev/null
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
@@ -0,0 +1,276 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_purge_docs_fabric_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+ DbName = ?tempdb(),
+ ok = fabric:create_db(DbName, [?ADMIN_CTX, {q, 1}]),
+ meck:new(couch_mrview_index, [passthrough]),
+ meck:expect(couch_mrview_index, ensure_local_purge_docs, fun(A, B) ->
+ meck:passthrough([A, B])
+ end),
+ DbName.
+
+
+teardown(DbName) ->
+ meck:unload(),
+ ok = fabric:delete_db(DbName, [?ADMIN_CTX]).
+
+
+view_purge_fabric_test_() ->
+ {
+ "Map views",
+ {
+ setup,
+ fun() -> test_util:start_couch([fabric, mem3]) end,
+ fun test_util:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ fun test_purge_verify_index/1,
+ fun test_purge_hook_before_compaction/1
+ ]
+ }
+ }
+ }.
+
+
+test_purge_verify_index(DbName) ->
+ ?_test(begin
+ Docs1 = couch_mrview_test_util:make_docs(normal, 5),
+ {ok, _} = fabric:update_docs(DbName, Docs1, [?ADMIN_CTX]),
+ {ok, _} = fabric:update_doc(
+ DbName,
+ couch_mrview_test_util:ddoc(map),
+ [?ADMIN_CTX]
+ ),
+
+ Result1 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+ Expect1 = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect1, Result1),
+
+ {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName),
+ ?assertEqual(0, couch_util:get_value(<<"purge_seq">>, Props1)),
+ ShardNames = [Sh || #shard{name = Sh} <- mem3:local_shards(DbName)],
+ [ShardDbName | _Rest ] = ShardNames,
+ ?assertEqual(true, couch_mrview_index:verify_index_exists(
+ ShardDbName, Props1)),
+
+ purge_docs(DbName, [<<"1">>]),
+
+ Result2 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+ Expect2 = {ok, [
+ {meta, [{total, 4}, {offset, 0}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect2, Result2),
+
+ {ok, #doc{body = {Props2}}} = get_local_purge_doc(DbName),
+ ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props2)),
+ ?assertEqual(true, couch_mrview_index:verify_index_exists(
+ ShardDbName, Props2))
+ end).
+
+
+test_purge_hook_before_compaction(DbName) ->
+ ?_test(begin
+ Docs1 = couch_mrview_test_util:make_docs(normal, 5),
+ {ok, _} = fabric:update_docs(DbName, Docs1, [?ADMIN_CTX]),
+ {ok, _} = fabric:update_doc(
+ DbName,
+ couch_mrview_test_util:ddoc(map),
+ [?ADMIN_CTX]
+ ),
+
+ Result1 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+ Expect1 = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect1, Result1),
+
+ purge_docs(DbName, [<<"1">>]),
+
+ Result2 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+ Expect2 = {ok, [
+ {meta, [{total, 4}, {offset, 0}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect2, Result2),
+
+ {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName),
+ ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props1)),
+
+ [ShardName | _] = local_shards(DbName),
+ couch_util:with_db(ShardName, fun(Db) ->
+ {ok, _} = couch_db:start_compact(Db)
+ end),
+ wait_compaction(ShardName, ?LINE),
+
+ ?assertEqual(ok, meck:wait(1, couch_mrview_index,
+ ensure_local_purge_docs, '_', 5000)
+ ),
+
+ % Make sure compaction didn't change the update seq
+ {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName),
+ ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props1)),
+
+ purge_docs(DbName, [<<"2">>]),
+
+ couch_util:with_db(ShardName, fun(Db) ->
+ {ok, _} = couch_db:start_compact(Db)
+ end),
+ wait_compaction(ShardName, ?LINE),
+
+ ?assertEqual(ok, meck:wait(2, couch_mrview_index,
+ ensure_local_purge_docs, '_', 5000)
+ ),
+
+ % Make sure compaction after a purge didn't overwrite
+ % the local purge doc for the index
+ {ok, #doc{body = {Props2}}} = get_local_purge_doc(DbName),
+ ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props2)),
+
+ % Force another update to ensure that we update
+ % the local doc appropriate after compaction
+ Result3 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+ Expect3 = {ok, [
+ {meta, [{total, 3}, {offset, 0}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect3, Result3),
+
+ {ok, #doc{body = {Props3}}} = get_local_purge_doc(DbName),
+ ?assertEqual(2, couch_util:get_value(<<"purge_seq">>, Props3)),
+
+ % Check that if the local doc doesn't exist that one
+ % is created for the index on compaction
+ delete_local_purge_doc(DbName),
+ ?assertMatch({not_found, _}, get_local_purge_doc(DbName)),
+
+ couch_util:with_db(ShardName, fun(Db) ->
+ {ok, _} = couch_db:start_compact(Db)
+ end),
+ wait_compaction(ShardName, ?LINE),
+
+ ?assertEqual(ok, meck:wait(3, couch_mrview_index,
+ ensure_local_purge_docs, '_', 5000)
+ ),
+
+ {ok, #doc{body = {Props4}}} = get_local_purge_doc(DbName),
+ ?assertEqual(2, couch_util:get_value(<<"purge_seq">>, Props4))
+ end).
+
+
+get_local_purge_doc(DbName) ->
+ {ok, DDoc} = fabric:open_doc(DbName, <<"_design/bar">>, []),
+ {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
+ Sig = IdxState#mrst.sig,
+ HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+ DocId = couch_mrview_util:get_local_purge_doc_id(HexSig),
+ [ShardName | _] = local_shards(DbName),
+ couch_util:with_db(ShardName, fun(Db) ->
+ couch_db:open_doc(Db, DocId, [])
+ end).
+
+
+delete_local_purge_doc(DbName) ->
+ {ok, DDoc} = fabric:open_doc(DbName, <<"_design/bar">>, []),
+ {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
+ Sig = IdxState#mrst.sig,
+ HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+ DocId = couch_mrview_util:get_local_purge_doc_id(HexSig),
+ NewDoc = #doc{id = DocId, deleted = true},
+ [ShardName | _] = local_shards(DbName),
+ couch_util:with_db(ShardName, fun(Db) ->
+ {ok, _} = couch_db:update_doc(Db, NewDoc, [])
+ end).
+
+
+get_rev(#full_doc_info{} = FDI) ->
+ #doc_info{
+ revs = [#rev_info{} = PrevRev | _]
+ } = couch_doc:to_doc_info(FDI),
+ PrevRev#rev_info.rev.
+
+
+purge_docs(DbName, DocIds) ->
+ lists:foreach(fun(DocId) ->
+ FDI = fabric:get_full_doc_info(DbName, DocId, []),
+ Rev = get_rev(FDI),
+ {ok, [{ok, _}]} = fabric:purge_docs(DbName, [{DocId, [Rev]}], [])
+ end, DocIds).
+
+
+wait_compaction(DbName, Line) ->
+ WaitFun = fun() ->
+ case is_compaction_running(DbName) of
+ true -> wait;
+ false -> ok
+ end
+ end,
+ case test_util:wait(WaitFun, 10000) of
+ timeout ->
+ erlang:error({assertion_failed, [
+ {module, ?MODULE},
+ {line, Line},
+ {reason, "Timeout waiting for database compaction"}
+ ]});
+ _ ->
+ ok
+ end.
+
+
+is_compaction_running(DbName) ->
+ {ok, DbInfo} = couch_util:with_db(DbName, fun(Db) ->
+ couch_db:get_db_info(Db)
+ end),
+ couch_util:get_value(compact_running, DbInfo).
+
+
+local_shards(DbName) ->
+ try
+ [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+ catch
+ error:database_does_not_exist ->
+ []
+ end.
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
new file mode 100644
index 000000000..eb180b005
--- /dev/null
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
@@ -0,0 +1,506 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_purge_docs_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+ meck:new(couch_index_updater, [passthrough]),
+ {ok, Db} = couch_mrview_test_util:init_db(?tempdb(), map, 5),
+ Db.
+
+teardown(Db) ->
+ couch_db:close(Db),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
+ meck:unload(),
+ ok.
+
+view_purge_test_() ->
+ {
+ "Map views",
+ {
+ setup,
+ fun test_util:start_couch/0,
+ fun test_util:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ fun test_purge_single/1,
+ fun test_purge_partial/1,
+ fun test_purge_complete/1,
+ fun test_purge_nochange/1,
+ fun test_purge_index_reset/1,
+ fun test_purge_compact_size_check/1,
+ fun test_purge_compact_for_stale_purge_cp_without_client/1,
+ fun test_purge_compact_for_stale_purge_cp_with_client/1
+ ]
+ }
+ }
+ }.
+
+
+test_purge_single(Db) ->
+ ?_test(begin
+ Result = run_query(Db, []),
+ Expect = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect, Result),
+
+ FDI = couch_db:get_full_doc_info(Db, <<"1">>),
+ Rev = get_rev(FDI),
+ {ok, [{ok, _PRevs}]} = couch_db:purge_docs(
+ Db,
+ [{<<"UUID1">>, <<"1">>, [Rev]}]
+ ),
+ {ok, Db2} = couch_db:reopen(Db),
+
+ Result2 = run_query(Db2, []),
+ Expect2 = {ok, [
+ {meta, [{total, 4}, {offset, 0}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect2, Result2)
+ end).
+
+
+test_purge_partial(Db) ->
+ ?_test(begin
+ Result = run_query(Db, []),
+ Expect = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect, Result),
+
+ FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), Rev1 = get_rev(FDI1),
+ Update = {[
+ {'_id', <<"1">>},
+ {'_rev', couch_doc:rev_to_str({1, [crypto:hash(md5, <<"1.2">>)]})},
+ {'val', 1.2}
+ ]},
+ {ok, [_Rev2]} = save_docs(Db, [Update], [replicated_changes]),
+
+ PurgeInfos = [{<<"UUID1">>, <<"1">>, [Rev1]}],
+
+ {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+ {ok, Db2} = couch_db:reopen(Db),
+
+ Result2 = run_query(Db2, []),
+ Expect2 = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1.2}, {value, 1.2}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect2, Result2)
+ end).
+
+
+test_purge_complete(Db) ->
+ ?_test(begin
+ Result = run_query(Db, []),
+ Expect = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect, Result),
+
+ FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), Rev1 = get_rev(FDI1),
+ FDI2 = couch_db:get_full_doc_info(Db, <<"2">>), Rev2 = get_rev(FDI2),
+ FDI5 = couch_db:get_full_doc_info(Db, <<"5">>), Rev5 = get_rev(FDI5),
+
+ PurgeInfos = [
+ {<<"UUID1">>, <<"1">>, [Rev1]},
+ {<<"UUID2">>, <<"2">>, [Rev2]},
+ {<<"UUID5">>, <<"5">>, [Rev5]}
+ ],
+ {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+ {ok, Db2} = couch_db:reopen(Db),
+
+ Result2 = run_query(Db2, []),
+ Expect2 = {ok, [
+ {meta, [{total, 2}, {offset, 0}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}
+ ]},
+ ?assertEqual(Expect2, Result2)
+ end).
+
+
+test_purge_nochange(Db) ->
+ ?_test(begin
+ Result = run_query(Db, []),
+ Expect = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect, Result),
+
+ FDI1 = couch_db:get_full_doc_info(Db, <<"1">>),
+ Rev1 = get_rev(FDI1),
+
+ PurgeInfos = [
+ {<<"UUID1">>, <<"6">>, [Rev1]}
+ ],
+ {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+ {ok, Db2} = couch_db:reopen(Db),
+
+ Result2 = run_query(Db2, []),
+ Expect2 = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect2, Result2)
+ end).
+
+
+test_purge_index_reset(Db) ->
+ ?_test(begin
+ ok = couch_db:set_purge_infos_limit(Db, 2),
+ {ok, Db1} = couch_db:reopen(Db),
+
+ Result = run_query(Db1, []),
+ Expect = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect, Result),
+
+ PurgeInfos = lists:map(fun(I) ->
+ DocId = list_to_binary(integer_to_list(I)),
+ FDI = couch_db:get_full_doc_info(Db, DocId),
+ Rev = get_rev(FDI),
+ {couch_uuids:random(), DocId, [Rev]}
+ end, lists:seq(1, 5)),
+ {ok, _} = couch_db:purge_docs(Db1, PurgeInfos),
+
+ {ok, Db2} = couch_db:reopen(Db1),
+
+ % Forcibly set the purge doc to a newer purge
+ % sequence to force an index reset. This should
+ % never happen in real life but the reset
+ % is required for correctness.
+ {ok, #doc{body = {OldProps}} = LocalDoc} = get_local_purge_doc(Db2),
+ NewPurgeSeq = {<<"purge_seq">>, 5},
+ NewProps = lists:keyreplace(<<"purge_seq">>, 1, OldProps, NewPurgeSeq),
+ RewindDoc = LocalDoc#doc{body = {NewProps}},
+ {ok, _} = couch_db:update_doc(Db2, RewindDoc, []),
+
+ % Compact the database to remove purge infos
+ {ok, _} = couch_db:start_compact(Db2),
+ wait_compaction(couch_db:name(Db), "database", ?LINE),
+
+ {ok, Db3} = couch_db:reopen(Db2),
+ Result2 = run_query(Db3, []),
+ Expect2 = {ok, [
+ {meta, [{total, 0}, {offset, 0}]}
+ ]},
+ ?assertEqual(Expect2, Result2),
+
+ % Assert that we had a reset
+ meck:wait(
+ 1,
+ couch_index_updater,
+ handle_info,
+ [{'EXIT', '_', {reset, '_'}}, '_'],
+ 5000
+ )
+ end).
+
+
+test_purge_compact_size_check(Db) ->
+ ?_test(begin
+ DbName = couch_db:name(Db),
+ Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+ {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+ _Result = run_query(Db1, []),
+ DiskSizeBefore = db_disk_size(DbName),
+
+ PurgedDocsNum = 150,
+ IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
+ Id1 = docid(Id),
+ FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+ Rev1 = get_rev(FDI1),
+ UUID1 = uuid(Id),
+ [{UUID1, Id1, [Rev1]} | CIdRevs]
+ end, [], lists:seq(1, PurgedDocsNum)),
+ {ok, _} = couch_db:purge_docs(Db1, IdsRevs),
+
+ {ok, Db2} = couch_db:reopen(Db1),
+ _Result1 = run_query(Db2, []),
+ {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+ Db2,
+ 0,
+ fun fold_fun/2,
+ [],
+ []
+ ),
+ ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+ config:set("couchdb", "file_compression", "snappy", false),
+
+ {ok, Db3} = couch_db:open_int(DbName, []),
+ {ok, _CompactPid} = couch_db:start_compact(Db3),
+ wait_compaction(DbName, "database", ?LINE),
+ ok = couch_db:close(Db3),
+ DiskSizeAfter = db_disk_size(DbName),
+ ?assert(DiskSizeBefore > DiskSizeAfter)
+ end).
+
+
+test_purge_compact_for_stale_purge_cp_without_client(Db) ->
+ ?_test(begin
+ DbName = couch_db:name(Db),
+ % add more documents to database for purge
+ Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+ {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+
+ % change PurgedDocsLimit to 10 from 1000 to
+ % avoid timeout of eunit test
+ PurgedDocsLimit = 10,
+ couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
+
+ % purge 150 documents
+ PurgedDocsNum = 150,
+ PurgeInfos = lists:foldl(fun(Id, CIdRevs) ->
+ Id1 = docid(Id),
+ FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+ Rev1 = get_rev(FDI1),
+ UUID1 = uuid(Id),
+ [{UUID1, Id1, [Rev1]} | CIdRevs]
+ end, [], lists:seq(1, PurgedDocsNum)),
+ {ok, _} = couch_db:purge_docs(Db1, PurgeInfos),
+
+ {ok, Db2} = couch_db:reopen(Db1),
+ {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+ Db2,
+ 0,
+ fun fold_fun/2,
+ [],
+ []
+ ),
+ ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+
+ % run compaction to trigger pruning of purge tree
+ {ok, Db3} = couch_db:open_int(DbName, []),
+ {ok, _CompactPid} = couch_db:start_compact(Db3),
+ wait_compaction(DbName, "database", ?LINE),
+ ok = couch_db:close(Db3),
+
+ % check the remaining purge requests in purge tree
+ {ok, Db4} = couch_db:reopen(Db3),
+ OldestPSeq = couch_db:get_oldest_purge_seq(Db4),
+ {ok, PurgedIdRevs2} = couch_db:fold_purge_infos(
+ Db4,
+ OldestPSeq - 1,
+ fun fold_fun/2,
+ [],
+ []
+ ),
+ ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2))
+ end).
+
+
+test_purge_compact_for_stale_purge_cp_with_client(Db) ->
+ ?_test(begin
+ DbName = couch_db:name(Db),
+ % add more documents to database for purge
+ Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+ {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+
+ % change PurgedDocsLimit to 10 from 1000 to
+ % avoid timeout of eunit test
+ PurgedDocsLimit = 10,
+ couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
+ _Result = run_query(Db1, []),
+
+ % first purge 30 documents
+ PurgedDocsNum1 = 30,
+ IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
+ Id1 = docid(Id),
+ FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+ Rev1 = get_rev(FDI1),
+ UUID1 = uuid(Id),
+ [{UUID1, Id1, [Rev1]} | CIdRevs]
+ end, [], lists:seq(1, PurgedDocsNum1)),
+ {ok, _} = couch_db:purge_docs(Db1, IdsRevs),
+
+ {ok, Db2} = couch_db:reopen(Db1),
+ % run query again to reflect purge request to mrview
+ _Result1 = run_query(Db2, []),
+ {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+ Db2,
+ 0,
+ fun fold_fun/2,
+ [],
+ []
+ ),
+ ?assertEqual(PurgedDocsNum1, length(PurgedIdRevs)),
+
+ % then purge 120 documents
+ PurgedDocsNum2 = 150,
+ IdsRevs2 = lists:foldl(fun(Id, CIdRevs) ->
+ Id1 = docid(Id),
+ FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+ Rev1 = get_rev(FDI1),
+ UUID1 = uuid(Id),
+ [{UUID1, Id1, [Rev1]} | CIdRevs]
+ end, [], lists:seq(PurgedDocsNum1 + 1, PurgedDocsNum2)),
+ {ok, _} = couch_db:purge_docs(Db2, IdsRevs2),
+
+ % run compaction to trigger pruning of purge tree
+ % only the first 30 purge requests are pruned
+ {ok, Db3} = couch_db:open_int(DbName, []),
+ {ok, _CompactPid} = couch_db:start_compact(Db3),
+ wait_compaction(DbName, "database", ?LINE),
+ ok = couch_db:close(Db3),
+
+ % check the remaining purge requests in purge tree
+ {ok, Db4} = couch_db:reopen(Db3),
+ OldestPSeq = couch_db:get_oldest_purge_seq(Db4),
+ {ok, PurgedIdRevs2} = couch_db:fold_purge_infos(
+ Db4,
+ OldestPSeq - 1,
+ fun fold_fun/2,
+ [],
+ []
+ ),
+ ?assertEqual(PurgedDocsNum2 - PurgedDocsNum1, length(PurgedIdRevs2))
+ end).
+
+
+get_local_purge_doc(Db) ->
+ {ok, DDoc} = couch_db:open_doc(Db, <<"_design/bar">>, []),
+ {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc),
+ Sig = IdxState#mrst.sig,
+ HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+ DocId = couch_mrview_util:get_local_purge_doc_id(HexSig),
+ couch_db:open_doc(Db, DocId, []).
+
+
+run_query(Db, Opts) ->
+ couch_mrview:query_view(Db, <<"_design/bar">>, <<"baz">>, Opts).
+
+
+save_docs(Db, JsonDocs, Options) ->
+ Docs = lists:map(fun(JDoc) ->
+ couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE(JDoc)))
+ end, JsonDocs),
+ Opts = [full_commit | Options],
+ case lists:member(replicated_changes, Options) of
+ true ->
+ {ok, []} = couch_db:update_docs(
+ Db, Docs, Opts, replicated_changes),
+ {ok, lists:map(fun(Doc) ->
+ {Pos, [RevId | _]} = Doc#doc.revs,
+ {Pos, RevId}
+ end, Docs)};
+ false ->
+ {ok, Resp} = couch_db:update_docs(Db, Docs, Opts),
+ {ok, [Rev || {ok, Rev} <- Resp]}
+ end.
+
+
+get_rev(#full_doc_info{} = FDI) ->
+ #doc_info{
+ revs = [#rev_info{} = PrevRev | _]
+ } = couch_doc:to_doc_info(FDI),
+ PrevRev#rev_info.rev.
+
+
+db_disk_size(DbName) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, Info} = couch_db:get_db_info(Db),
+ ok = couch_db:close(Db),
+ active_size(Info).
+
+
+active_size(Info) ->
+ couch_util:get_nested_json_value({Info}, [sizes, active]).
+
+
+wait_compaction(DbName, Kind, Line) ->
+ WaitFun = fun() ->
+ case is_compaction_running(DbName) of
+ true -> wait;
+ false -> ok
+ end
+ end,
+ case test_util:wait(WaitFun, 10000) of
+ timeout ->
+ erlang:error({assertion_failed,
+ [{module, ?MODULE},
+ {line, Line},
+ {reason, "Timeout waiting for "
+ ++ Kind
+ ++ " database compaction"}]});
+ _ ->
+ ok
+ end.
+
+
+is_compaction_running(DbName) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, DbInfo} = couch_db:get_db_info(Db),
+ couch_db:close(Db),
+ couch_util:get_value(compact_running, DbInfo).
+
+
+fold_fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+ {ok, [{Id, Revs} | Acc]}.
+
+
+docid(I) ->
+ list_to_binary(integer_to_list(I)).
+
+
+uuid(I) ->
+ Str = io_lib:format("UUID~4..0b", [I]),
+ iolist_to_binary(Str).