summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2018-04-24 12:26:42 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2018-05-30 12:41:35 -0500
commit88abd5f3bdf59dcf420af155847f1ef9d1dba19f (patch)
treec1bddd8c3d4dfc1300e8b7665d18b01067499598
parenta04229cd96d29fba1968bb9b2651a84411c78abc (diff)
downloadcouchdb-88abd5f3bdf59dcf420af155847f1ef9d1dba19f.tar.gz
[06/N] Clustered Purge: Update mrview indexes
This commit updates the mrview secondary index to properly process the new history of purge requests as well as to store the _local purge checkpoint doc. The importance of the _local checkpoint doc is to ensure that compaction of a database does not remove any purge requests that have not yet been processed by this secondary index. COUCHDB-3326 Co-authored-by: Mayya Sharipova <mayyas@ca.ibm.com> Co-authored-by: jiangphcn <jiangph@cn.ibm.com>
-rw-r--r--src/couch_index/src/couch_index_epi.erl5
-rw-r--r--src/couch_index/src/couch_index_plugin_couch_db.erl24
-rw-r--r--src/couch_index/src/couch_index_updater.erl40
-rw-r--r--src/couch_mrview/src/couch_mrview_cleanup.erl18
-rw-r--r--src/couch_mrview/src/couch_mrview_index.erl133
-rw-r--r--src/couch_mrview/src/couch_mrview_test_util.erl5
-rw-r--r--src/couch_mrview/src/couch_mrview_updater.erl14
-rw-r--r--src/couch_mrview/src/couch_mrview_util.erl23
-rw-r--r--src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl171
-rw-r--r--src/couch_mrview/test/couch_mrview_purge_docs_tests.erl429
10 files changed, 839 insertions, 23 deletions
diff --git a/src/couch_index/src/couch_index_epi.erl b/src/couch_index/src/couch_index_epi.erl
index 946a5906b..1c4eb9596 100644
--- a/src/couch_index/src/couch_index_epi.erl
+++ b/src/couch_index/src/couch_index_epi.erl
@@ -28,8 +28,9 @@ app() ->
couch_index.
providers() ->
- [].
-
+ [
+ {couch_db, couch_index_plugin_couch_db}
+ ].
services() ->
[
diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/couch_index/src/couch_index_plugin_couch_db.erl
new file mode 100644
index 000000000..3e3b711bc
--- /dev/null
+++ b/src/couch_index/src/couch_index_plugin_couch_db.erl
@@ -0,0 +1,24 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_plugin_couch_db).
+
+-export([
+ maybe_init_index_purge_state/2
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+maybe_init_index_purge_state(DbName, DDoc) ->
+ couch_mrview_index:maybe_create_local_purge_doc(DbName, DDoc).
diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl
index 5ab9ea809..4856c1d9f 100644
--- a/src/couch_index/src/couch_index_updater.erl
+++ b/src/couch_index/src/couch_index_updater.erl
@@ -141,12 +141,10 @@ update(Idx, Mod, IdxState) ->
DbUpdateSeq = couch_db:get_update_seq(Db),
DbCommittedSeq = couch_db:get_committed_update_seq(Db),
- PurgedIdxState = case purge_index(Db, Mod, IdxState) of
- {ok, IdxState0} -> IdxState0;
- reset -> exit({reset, self()})
- end,
-
- NumChanges = couch_db:count_changes_since(Db, CurrSeq),
+ NumUpdateChanges = couch_db:count_changes_since(Db, CurrSeq),
+ NumPurgeChanges = count_pending_purged_docs_since(Db, Mod, IdxState),
+ TotalChanges = NumUpdateChanges + NumPurgeChanges,
+ {ok, PurgedIdxState} = purge_index(Db, Mod, IdxState),
GetSeq = fun
(#full_doc_info{update_seq=Seq}) -> Seq;
@@ -185,8 +183,13 @@ update(Idx, Mod, IdxState) ->
{ok, {NewSt, true}}
end
end,
+ {ok, InitIdxState} = Mod:start_update(
+ Idx,
+ PurgedIdxState,
+ TotalChanges,
+ NumPurgeChanges
+ ),
- {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges),
Acc0 = {InitIdxState, true},
{ok, Acc} = couch_db:fold_changes(Db, CurrSeq, Proc, Acc0, []),
{ProcIdxSt, SendLast} = Acc,
@@ -209,11 +212,24 @@ purge_index(Db, Mod, IdxState) ->
{ok, DbPurgeSeq} = couch_db:get_purge_seq(Db),
IdxPurgeSeq = Mod:get(purge_seq, IdxState),
if
- DbPurgeSeq == IdxPurgeSeq ->
+ IdxPurgeSeq == DbPurgeSeq ->
{ok, IdxState};
- DbPurgeSeq == IdxPurgeSeq + 1 ->
- {ok, PurgedIdRevs} = couch_db:get_last_purged(Db),
- Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState);
true ->
- reset
+ FoldFun = fun({PurgeSeq, _UUId, Id, Revs}, Acc) ->
+ Mod:purge(Db, PurgeSeq, [{Id, Revs}], Acc)
+ end,
+ {ok, NewStateAcc} = couch_db:fold_purge_infos(
+ Db,
+ IdxPurgeSeq,
+ FoldFun,
+ IdxState,
+ []
+ ),
+ Mod:update_local_purge_doc(Db, NewStateAcc),
+ {ok, NewStateAcc}
end.
+
+count_pending_purged_docs_since(Db, Mod, IdxState) ->
+ {ok, DbPurgeSeq} = couch_db:get_purge_seq(Db),
+ IdxPurgeSeq = Mod:get(purge_seq, IdxState),
+ DbPurgeSeq - IdxPurgeSeq.
diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl b/src/couch_mrview/src/couch_mrview_cleanup.erl
index 380376de2..93c9387d6 100644
--- a/src/couch_mrview/src/couch_mrview_cleanup.erl
+++ b/src/couch_mrview/src/couch_mrview_cleanup.erl
@@ -41,7 +41,23 @@ run(Db) ->
lists:foreach(fun(FN) ->
couch_log:debug("Deleting stale view file: ~s", [FN]),
- couch_file:delete(RootDir, FN, [sync])
+ couch_file:delete(RootDir, FN, [sync]),
+ Sig = couch_mrview_util:get_signature_from_filename(FN),
+ if length(Sig) < 16 -> ok; true ->
+ case re:run(Sig,"^[a-fA-F0-9]+$",[{capture, none}]) of
+ match ->
+ DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+ case couch_db:open_doc(Db, DocId, []) of
+ {ok, LocalPurgeDoc} ->
+ couch_db:update_doc(Db,
+ LocalPurgeDoc#doc{deleted=true}, [?ADMIN_CTX]);
+ {not_found, _} ->
+ ok
+ end;
+ _ ->
+ ok
+ end
+ end
end, ToDelete),
ok.
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index aa1ee2741..2b678a3c0 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -15,9 +15,11 @@
-export([get/2]).
-export([init/2, open/2, close/1, reset/1, delete/1, shutdown/1]).
--export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]).
+-export([start_update/4, purge/4, process_doc/3, finish_update/1, commit/1]).
-export([compact/3, swap_compacted/2, remove_compacted/1]).
-export([index_file_exists/1]).
+-export([update_local_purge_doc/2, verify_index_exists/1]).
+-export([maybe_create_local_purge_doc/2]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -121,14 +123,17 @@ open(Db, State) ->
{ok, {OldSig, Header}} ->
% Matching view signatures.
NewSt = couch_mrview_util:init_state(Db, Fd, State, Header),
+ maybe_create_local_purge_doc(Db, NewSt),
{ok, NewSt};
% end of upgrade code for <= 1.2.x
{ok, {Sig, Header}} ->
% Matching view signatures.
NewSt = couch_mrview_util:init_state(Db, Fd, State, Header),
+ maybe_create_local_purge_doc(Db, NewSt),
{ok, NewSt};
_ ->
NewSt = couch_mrview_util:reset_index(Db, Fd, State),
+ maybe_create_local_purge_doc(Db, NewSt),
{ok, NewSt}
end;
{error, Reason} = Error ->
@@ -167,8 +172,13 @@ reset(State) ->
end).
-start_update(PartialDest, State, NumChanges) ->
- couch_mrview_updater:start_update(PartialDest, State, NumChanges).
+start_update(PartialDest, State, NumChanges, NumChangesDone) ->
+ couch_mrview_updater:start_update(
+ PartialDest,
+ State,
+ NumChanges,
+ NumChangesDone
+ ).
purge(Db, PurgeSeq, PurgedIdRevs, State) ->
@@ -207,3 +217,120 @@ index_file_exists(State) ->
} = State,
IndexFName = couch_mrview_util:index_file(DbName, Sig),
filelib:is_file(IndexFName).
+
+
+verify_index_exists(Options) ->
+ ShardDbName = couch_mrview_util:get_value_from_options(
+ <<"dbname">>,
+ Options
+ ),
+ DDocId = couch_mrview_util:get_value_from_options(
+ <<"ddoc_id">>,
+ Options
+ ),
+ SigInLocal = couch_mrview_util:get_value_from_options(
+ <<"signature">>,
+ Options
+ ),
+ case couch_db:open_int(ShardDbName, []) of
+ {ok, Db} ->
+ try
+ DbName = mem3:dbname(couch_db:name(Db)),
+ case ddoc_cache:open(DbName, DDocId) of
+ {ok, DDoc} ->
+ {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(
+ ShardDbName,
+ DDoc
+ ),
+ IdxSig = IdxState#mrst.sig,
+ couch_index_util:hexsig(IdxSig) == SigInLocal;
+ _Else ->
+ false
+ end
+ catch E:T ->
+ Stack = erlang:get_stacktrace(),
+ couch_log:error(
+ "Error occurs when verifying existence of ~s/~s :: ~p ~p",
+ [ShardDbName, DDocId, {E, T}, Stack]
+ ),
+ false
+ after
+ catch couch_db:close(Db)
+ end;
+ _ ->
+ false
+ end.
+
+
+maybe_create_local_purge_doc(Db, #mrst{}=State) ->
+ Sig = couch_index_util:hexsig(get(signature, State)),
+ LocalPurgeDocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+ case couch_db:open_doc(Db, LocalPurgeDocId, []) of
+ {not_found, _Reason} ->
+ update_local_purge_doc(Db, State);
+ {ok, _LocalPurgeDoc} ->
+ ok
+ end;
+maybe_create_local_purge_doc(DbName, #doc{}=DDoc) ->
+ {ok, Db} = case couch_db:open_int(DbName, []) of
+ {ok, _} = Resp -> Resp;
+ Else -> exit(Else)
+ end,
+ try
+ {ok, DefaultPurgeSeq} = couch_db:get_purge_seq(Db),
+ case get_index_type(DDoc, <<"views">>) of true ->
+ try couch_mrview_util:ddoc_to_mrst(DbName, DDoc) of
+ {ok, MRSt} -> update_local_purge_doc(Db, MRSt, DefaultPurgeSeq)
+ catch _:_ ->
+ ok
+ end;
+ false ->
+ ok
+ end
+ catch E:T ->
+ Stack = erlang:get_stacktrace(),
+ couch_log:error(
+ "Error occurs when creating local purge document ~p ~p",
+ [{E, T}, Stack]
+ )
+ after
+ catch couch_db:close(Db)
+ end.
+
+
+update_local_purge_doc(Db, State) ->
+ update_local_purge_doc(Db, State, get(purge_seq, State)).
+
+
+update_local_purge_doc(Db, State, PSeq) ->
+ Sig = couch_index_util:hexsig(State#mrst.sig),
+ DocId = couch_mrview_util:get_local_purge_doc_id(Sig),
+ {Mega, Secs, _} = os:timestamp(),
+ NowSecs = Mega * 1000000 + Secs,
+ BaseDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DocId},
+ {<<"type">>, <<"mrview">>},
+ {<<"purge_seq">>, PSeq},
+ {<<"updated_on">>, NowSecs},
+ {<<"verify_module">>, <<"couch_mrview_index">>},
+ {<<"verify_function">>, <<"verify_index_exists">>},
+ {<<"verify_options">>, {[
+ {<<"dbname">>, get(db_name, State)},
+ {<<"ddoc_id">>, get(idx_name, State)},
+ {<<"signature">>, Sig}
+ ]}}
+ ]}),
+ Doc = case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{revs = Revs}} ->
+ BaseDoc#doc{revs = Revs};
+ {not_found, _} ->
+ BaseDoc
+ end,
+ couch_db:update_doc(Db, Doc, []).
+
+
+get_index_type(#doc{body={Props}}, IndexType) ->
+ case couch_util:get_value(IndexType, Props) of
+ undefined -> false;
+ _ -> true
+ end.
diff --git a/src/couch_mrview/src/couch_mrview_test_util.erl b/src/couch_mrview/src/couch_mrview_test_util.erl
index b07b07679..35ab6c673 100644
--- a/src/couch_mrview/src/couch_mrview_test_util.erl
+++ b/src/couch_mrview/src/couch_mrview_test_util.erl
@@ -49,6 +49,11 @@ make_docs(local, Count) ->
make_docs(_, Count) ->
[doc(I) || I <- lists:seq(1, Count)].
+
+make_docs(_, Since, Count) ->
+ [doc(I) || I <- lists:seq(Since, Count)].
+
+
ddoc({changes, Opts}) ->
ViewOpts = case Opts of
seq_indexed ->
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 214f48793..3383b49b6 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -12,15 +12,14 @@
-module(couch_mrview_updater).
--export([start_update/3, purge/4, process_doc/3, finish_update/1]).
+-export([start_update/4, purge/4, process_doc/3, finish_update/1]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-define(REM_VAL, removed).
-
-start_update(Partial, State, NumChanges) ->
+start_update(Partial, State, NumChanges, NumChangesDone) ->
MaxSize = config:get_integer("view_updater", "queue_memory_cap", 100000),
MaxItems = config:get_integer("view_updater", "queue_item_cap", 500),
QueueOpts = [{max_size, MaxSize}, {max_items, MaxItems}],
@@ -36,14 +35,19 @@ start_update(Partial, State, NumChanges) ->
},
Self = self(),
+
MapFun = fun() ->
+ Progress = case NumChanges of
+ 0 -> 0;
+ _ -> (NumChangesDone * 100) div NumChanges
+ end,
couch_task_status:add_task([
{indexer_pid, ?l2b(pid_to_list(Partial))},
{type, indexer},
{database, State#mrst.db_name},
{design_document, State#mrst.idx_name},
- {progress, 0},
- {changes_done, 0},
+ {progress, Progress},
+ {changes_done, NumChangesDone},
{total_changes, NumChanges}
]),
couch_task_status:set_update_frequency(500),
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 0c6e5fc88..710cb28d4 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -13,6 +13,8 @@
-module(couch_mrview_util).
-export([get_view/4, get_view_index_pid/4]).
+-export([get_local_purge_doc_id/1, get_value_from_options/2]).
+-export([get_signature_from_filename/1]).
-export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
-export([make_header/1]).
-export([index_file/2, compaction_file/2, open_file/1]).
@@ -41,6 +43,27 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
+get_local_purge_doc_id(Sig) ->
+ Version = "v" ++ config:get("purge", "version", "1") ++ "-",
+ ?l2b(?LOCAL_DOC_PREFIX ++ "purge-" ++ Version ++ "mrview-" ++ Sig).
+
+
+get_value_from_options(Key, Options) ->
+ case couch_util:get_value(Key, Options) of
+ undefined ->
+ Reason = binary_to_list(Key) ++ " must exist in Options.",
+ throw({bad_request, Reason});
+ Value -> Value
+ end.
+
+
+get_signature_from_filename(FileName) ->
+ FilePathList = filename:split(FileName),
+ [PureFN] = lists:nthtail(length(FilePathList) - 1, FilePathList),
+ PureFNExt = filename:extension(PureFN),
+ filename:basename(PureFN, PureFNExt).
+
+
get_view(Db, DDoc, ViewName, Args0) ->
case get_view_index_state(Db, DDoc, ViewName, Args0) of
{ok, State, Args2} ->
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
new file mode 100644
index 000000000..b36cad534
--- /dev/null
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl
@@ -0,0 +1,171 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_purge_docs_fabric_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+ DbName = ?tempdb(),
+ ok = fabric:create_db(DbName, [?ADMIN_CTX]),
+ meck:new(couch_mrview_index, [passthrough]),
+ meck:expect(couch_mrview_index, maybe_create_local_purge_doc, fun(A, B) ->
+ meck:passthrough([A, B])
+ end),
+ DbName.
+
+
+teardown(DbName) ->
+ meck:unload(couch_mrview_index),
+ ok = fabric:delete_db(DbName, [?ADMIN_CTX]).
+
+
+view_purge_fabric_test_() ->
+ {
+ "Map views",
+ {
+ setup,
+ fun() -> test_util:start_couch([fabric, mem3]) end,
+ fun test_util:stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun test_purge_verify_index/1,
+ fun test_purge_hook_before_compaction/1
+ ]
+ }
+ }
+ }.
+
+
+test_purge_verify_index(DbName) ->
+ ?_test(begin
+ Docs1 = couch_mrview_test_util:make_docs(normal, 5),
+ {ok, _} = fabric:update_docs(DbName, Docs1, [?ADMIN_CTX]),
+ {ok, _} = fabric:update_doc(
+ DbName,
+ couch_mrview_test_util:ddoc(map),
+ [?ADMIN_CTX]
+ ),
+
+ purge_docs(DbName, [<<"1">>]),
+
+ Result2 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+ Expect2 = {ok, [
+ {meta, [{total, 4}, {offset, 0}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect2, Result2),
+
+ {ok, DDoc} = fabric:open_doc(DbName, <<"_design/bar">>, []),
+ {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
+ Sig = IdxState#mrst.sig,
+ HexSig = list_to_binary(couch_index_util:hexsig(Sig)),
+ DocId = couch_mrview_util:get_local_purge_doc_id(HexSig),
+ {ok, LocPurgeDoc} = fabric:open_doc(DbName, DocId, []),
+ {Props} = couch_doc:to_json_obj(LocPurgeDoc,[]),
+ {Options} = couch_util:get_value(<<"verify_options">>, Props),
+ ?assertEqual(true, couch_mrview_index:verify_index_exists(Options)),
+
+ ok
+ end).
+
+test_purge_hook_before_compaction(DbName) ->
+ ?_test(begin
+ Docs1 = couch_mrview_test_util:make_docs(normal, 5),
+ {ok, _} = fabric:update_docs(DbName, Docs1, [?ADMIN_CTX]),
+ {ok, _} = fabric:update_doc(
+ DbName,
+ couch_mrview_test_util:ddoc(map),
+ [?ADMIN_CTX]
+ ),
+
+ purge_docs(DbName, [<<"1">>]),
+
+ Result1 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}),
+ Expect1 = {ok, [
+ {meta, [{total, 4}, {offset, 0}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect1, Result1),
+
+ [FirstShDbName | _RestShDbNames] = local_shards(DbName),
+ {ok, Db} = couch_db:open_int(FirstShDbName, []),
+ {ok, _CompactPid} = couch_db:start_compact(Db),
+ wait_compaction(FirstShDbName, "database", ?LINE),
+ ok = couch_db:close(Db),
+
+ ?assertEqual(ok, meck:wait(1, couch_mrview_index,
+ maybe_create_local_purge_doc, '_', 5000)
+ ),
+ ok
+ end).
+
+get_rev(#full_doc_info{} = FDI) ->
+ #doc_info{
+ revs = [#rev_info{} = PrevRev | _]
+ } = couch_doc:to_doc_info(FDI),
+ PrevRev#rev_info.rev.
+
+
+purge_docs(DbName, DocIds) ->
+ lists:foreach(fun(DocId) ->
+ FDI = fabric:get_full_doc_info(DbName, DocId, []),
+ Rev = get_rev(FDI),
+ {ok, [{ok, _}]} = fabric:purge_docs(DbName, [{DocId, [Rev]}], [])
+ end, DocIds).
+
+wait_compaction(DbName, Kind, Line) ->
+ WaitFun = fun() ->
+ case is_compaction_running(DbName) of
+ true -> wait;
+ false -> ok
+ end
+ end,
+ case test_util:wait(WaitFun, 10000) of
+ timeout ->
+ erlang:error({assertion_failed,
+ [{module, ?MODULE},
+ {line, Line},
+ {reason, "Timeout waiting for "
+ ++ Kind
+ ++ " database compaction"}]});
+ _ ->
+ ok
+ end.
+
+is_compaction_running(DbName) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, DbInfo} = couch_db:get_db_info(Db),
+ couch_db:close(Db),
+ couch_util:get_value(compact_running, DbInfo).
+
+local_shards(DbName) ->
+ try
+ [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+ catch
+ error:database_does_not_exist ->
+ []
+ end.
diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
new file mode 100644
index 000000000..183c31ab5
--- /dev/null
+++ b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl
@@ -0,0 +1,429 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_purge_docs_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(TIMEOUT, 1000).
+
+
+setup() ->
+ {ok, Db} = couch_mrview_test_util:init_db(?tempdb(), map, 5),
+ Db.
+
+teardown(Db) ->
+ couch_db:close(Db),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
+ ok.
+
+view_purge_test_() ->
+ {
+ "Map views",
+ {
+ setup,
+ fun test_util:start_couch/0, fun test_util:stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun test_purge_single/1,
+ fun test_purge_partial/1,
+ fun test_purge_complete/1,
+ fun test_purge_nochange/1,
+ fun test_purge_compact_size_check/1,
+ fun test_purge_compact_for_stale_purge_cp_without_client/1,
+ fun test_purge_compact_for_stale_purge_cp_with_client/1
+ ]
+ }
+ }
+ }.
+
+
+test_purge_single(Db) ->
+ ?_test(begin
+ Result = run_query(Db, []),
+ Expect = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect, Result),
+
+ FDI = couch_db:get_full_doc_info(Db, <<"1">>),
+ Rev = get_rev(FDI),
+ {ok, [{ok, _PRevs}]} = couch_db:purge_docs(
+ Db,
+ [{<<"UUID1">>, <<"1">>, [Rev]}]
+ ),
+ {ok, Db2} = couch_db:reopen(Db),
+
+ Result2 = run_query(Db2, []),
+ Expect2 = {ok, [
+ {meta, [{total, 4}, {offset, 0}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect2, Result2),
+
+ ok
+ end).
+
+
+test_purge_partial(Db) ->
+ ?_test(begin
+ Result = run_query(Db, []),
+ Expect = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect, Result),
+
+ FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), Rev1 = get_rev(FDI1),
+ Update = {[
+ {'_id', <<"1">>},
+ {'_rev', couch_doc:rev_to_str({1, [crypto:hash(md5, <<"1.2">>)]})},
+ {'val', 1.2}
+ ]},
+ {ok, [_Rev2]} = save_docs(Db, [Update], [replicated_changes]),
+
+ PurgeInfos = [{<<"UUID1">>, <<"1">>, [Rev1]}],
+
+ {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+ {ok, Db2} = couch_db:reopen(Db),
+
+ Result2 = run_query(Db2, []),
+ Expect2 = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1.2}, {value, 1.2}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect2, Result2),
+
+ ok
+ end).
+
+test_purge_complete(Db) ->
+ ?_test(begin
+ Result = run_query(Db, []),
+ Expect = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect, Result),
+
+ FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), Rev1 = get_rev(FDI1),
+ FDI2 = couch_db:get_full_doc_info(Db, <<"2">>), Rev2 = get_rev(FDI2),
+ FDI5 = couch_db:get_full_doc_info(Db, <<"5">>), Rev5 = get_rev(FDI5),
+
+ PurgeInfos = [
+ {<<"UUID1">>, <<"1">>, [Rev1]},
+ {<<"UUID2">>, <<"2">>, [Rev2]},
+ {<<"UUID5">>, <<"5">>, [Rev5]}
+ ],
+ {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+ {ok, Db2} = couch_db:reopen(Db),
+
+ Result2 = run_query(Db2, []),
+ Expect2 = {ok, [
+ {meta, [{total, 2}, {offset, 0}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}
+ ]},
+ ?assertEqual(Expect2, Result2),
+
+ ok
+ end).
+
+
+test_purge_nochange(Db) ->
+ ?_test(begin
+ Result = run_query(Db, []),
+ Expect = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect, Result),
+
+ FDI1 = couch_db:get_full_doc_info(Db, <<"1">>),
+ Rev1 = get_rev(FDI1),
+
+ PurgeInfos = [
+ {<<"UUID1">>, <<"6">>, [Rev1]}
+ ],
+ {ok, _} = couch_db:purge_docs(Db, PurgeInfos),
+ {ok, Db2} = couch_db:reopen(Db),
+
+ Result2 = run_query(Db2, []),
+ Expect2 = {ok, [
+ {meta, [{total, 5}, {offset, 0}]},
+ {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+ {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+ {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+ {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+ {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+ ]},
+ ?assertEqual(Expect2, Result2),
+
+ ok
+ end).
+
+
+test_purge_compact_size_check(Db) ->
+ ?_test(begin
+ DbName = couch_db:name(Db),
+ Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+ {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+ _Result = run_query(Db1, []),
+ DiskSizeBefore = db_disk_size(DbName),
+
+ PurgedDocsNum = 150,
+ IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
+ Id1 = docid(Id),
+ FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+ Rev1 = get_rev(FDI1),
+ UUID1 = uuid(Id),
+ [{UUID1, Id1, [Rev1]} | CIdRevs]
+ end, [], lists:seq(1, PurgedDocsNum)),
+ {ok, _} = couch_db:purge_docs(Db1, IdsRevs),
+
+ {ok, Db2} = couch_db:reopen(Db1),
+ _Result1 = run_query(Db2, []),
+ {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+ Db2,
+ 0,
+ fun fold_fun/2,
+ [],
+ []
+ ),
+ ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+ config:set("couchdb", "file_compression", "snappy", false),
+
+ {ok, Db3} = couch_db:open_int(DbName, []),
+ {ok, _CompactPid} = couch_db:start_compact(Db3),
+ wait_compaction(DbName, "database", ?LINE),
+ ok = couch_db:close(Db3),
+ DiskSizeAfter = db_disk_size(DbName),
+ ?assert(DiskSizeBefore > DiskSizeAfter),
+
+ ok
+ end).
+
+
+test_purge_compact_for_stale_purge_cp_without_client(Db) ->
+ ?_test(begin
+ DbName = couch_db:name(Db),
+ % add more documents to database for purge
+ Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+ {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+
+ % change PurgedDocsLimit to 10 from 1000 to
+ % avoid timeout of eunit test
+ PurgedDocsLimit = 10,
+ couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
+
+ % purge 150 documents
+ PurgedDocsNum = 150,
+ PurgeInfos = lists:foldl(fun(Id, CIdRevs) ->
+ Id1 = docid(Id),
+ FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+ Rev1 = get_rev(FDI1),
+ UUID1 = uuid(Id),
+ [{UUID1, Id1, [Rev1]} | CIdRevs]
+ end, [], lists:seq(1, PurgedDocsNum)),
+ {ok, _} = couch_db:purge_docs(Db1, PurgeInfos),
+
+ {ok, Db2} = couch_db:reopen(Db1),
+ {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+ Db2,
+ 0,
+ fun fold_fun/2,
+ [],
+ []
+ ),
+ ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+
+ % run compaction to trigger pruning of purge tree
+ {ok, Db3} = couch_db:open_int(DbName, []),
+ {ok, _CompactPid} = couch_db:start_compact(Db3),
+ wait_compaction(DbName, "database", ?LINE),
+ ok = couch_db:close(Db3),
+
+ % check the remaining purge requests in purge tree
+ {ok, Db4} = couch_db:reopen(Db3),
+ {ok, OldestPSeq} = couch_db:get_oldest_purge_seq(Db4),
+ {ok, PurgedIdRevs2} = couch_db:fold_purge_infos(
+ Db4,
+ OldestPSeq - 1,
+ fun fold_fun/2,
+ [],
+ []
+ ),
+ ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2)),
+
+ ok
+ end).
+
+
+test_purge_compact_for_stale_purge_cp_with_client(Db) ->
+ ?_test(begin
+ DbName = couch_db:name(Db),
+ % add more documents to database for purge
+ Docs = couch_mrview_test_util:make_docs(normal, 6, 200),
+ {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs),
+
+ % change PurgedDocsLimit to 10 from 1000 to
+ % avoid timeout of eunit test
+ PurgedDocsLimit = 10,
+ couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit),
+ _Result = run_query(Db1, []),
+
+ % purge 150 documents
+ PurgedDocsNum = 150,
+ IdsRevs = lists:foldl(fun(Id, CIdRevs) ->
+ Id1 = docid(Id),
+ FDI1 = couch_db:get_full_doc_info(Db1, Id1),
+ Rev1 = get_rev(FDI1),
+ UUID1 = uuid(Id),
+ [{UUID1, Id1, [Rev1]} | CIdRevs]
+ end, [], lists:seq(1, PurgedDocsNum)),
+ {ok, _} = couch_db:purge_docs(Db1, IdsRevs),
+
+ % run query again to reflect purge requests
+ % to mrview
+ {ok, Db2} = couch_db:reopen(Db1),
+ _Result1 = run_query(Db2, []),
+ {ok, PurgedIdRevs} = couch_db:fold_purge_infos(
+ Db2,
+ 0,
+ fun fold_fun/2,
+ [],
+ []
+ ),
+ ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)),
+
+ % run compaction to trigger pruning of purge tree
+ {ok, Db3} = couch_db:open_int(DbName, []),
+ {ok, _CompactPid} = couch_db:start_compact(Db3),
+ wait_compaction(DbName, "database", ?LINE),
+ ok = couch_db:close(Db3),
+
+ % check the remaining purge requests in purge tree
+ {ok, Db4} = couch_db:reopen(Db3),
+ {ok, OldestPSeq} = couch_db:get_oldest_purge_seq(Db4),
+ {ok, PurgedIdRevs2} = couch_db:fold_purge_infos(
+ Db4,
+ OldestPSeq - 1,
+ fun fold_fun/2,
+ [],
+ []
+ ),
+ ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2)),
+
+ ok
+ end).
+
+
+run_query(Db, Opts) ->
+ couch_mrview:query_view(Db, <<"_design/bar">>, <<"baz">>, Opts).
+
+
+save_docs(Db, JsonDocs, Options) ->
+ Docs = lists:map(fun(JDoc) ->
+ couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE(JDoc)))
+ end, JsonDocs),
+ Opts = [full_commit | Options],
+ case lists:member(replicated_changes, Options) of
+ true ->
+ {ok, []} = couch_db:update_docs(
+ Db, Docs, Opts, replicated_changes),
+ {ok, lists:map(fun(Doc) ->
+ {Pos, [RevId | _]} = Doc#doc.revs,
+ {Pos, RevId}
+ end, Docs)};
+ false ->
+ {ok, Resp} = couch_db:update_docs(Db, Docs, Opts),
+ {ok, [Rev || {ok, Rev} <- Resp]}
+ end.
+
+
+get_rev(#full_doc_info{} = FDI) ->
+ #doc_info{
+ revs = [#rev_info{} = PrevRev | _]
+ } = couch_doc:to_doc_info(FDI),
+ PrevRev#rev_info.rev.
+
+db_disk_size(DbName) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, Info} = couch_db:get_db_info(Db),
+ ok = couch_db:close(Db),
+ active_size(Info).
+
+active_size(Info) ->
+ couch_util:get_nested_json_value({Info}, [sizes, active]).
+
+wait_compaction(DbName, Kind, Line) ->
+ WaitFun = fun() ->
+ case is_compaction_running(DbName) of
+ true -> wait;
+ false -> ok
+ end
+ end,
+ case test_util:wait(WaitFun, 10000) of
+ timeout ->
+ erlang:error({assertion_failed,
+ [{module, ?MODULE},
+ {line, Line},
+ {reason, "Timeout waiting for "
+ ++ Kind
+ ++ " database compaction"}]});
+ _ ->
+ ok
+ end.
+
+is_compaction_running(DbName) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, DbInfo} = couch_db:get_db_info(Db),
+ couch_db:close(Db),
+ couch_util:get_value(compact_running, DbInfo).
+
+fold_fun({_PSeq, _UUID, Id, Revs}, Acc) ->
+ {ok, [{Id, Revs} | Acc]}.
+
+docid(I) ->
+ list_to_binary(integer_to_list(I)).
+
+uuid(I) ->
+ Str = io_lib:format("UUID~4..0b", [I]),
+ iolist_to_binary(Str).