summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2017-09-06 16:26:29 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2020-04-14 11:26:13 -0500
commit1caf374bdc89fbe41ecb0908ccb7c170640612af (patch)
tree550caa691218a25726db3b2d67e3217041f75eb1
parentfbe5ba5b9cb6df984ce12e25d92eafe1af370fff (diff)
downloadcouchdb-1caf374bdc89fbe41ecb0908ccb7c170640612af.tar.gz
Implement compactor test suite
-rw-r--r--src/couch/src/couch_bt_engine_compactor.erl23
-rw-r--r--src/couch/test/eunit/couch_bt_engine_compactor_ev.erl106
-rw-r--r--src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl335
3 files changed, 464 insertions, 0 deletions
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 3979fcbb5..50b3981d5 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -44,13 +44,22 @@
}).
+-ifdef(TEST).
+-define(COMP_EVENT(Name), couch_bt_engine_compactor_ev:event(Name)).
+-else.
+-define(COMP_EVENT(Name), ignore).
+-endif.
+
+
start(#st{} = St, DbName, Options, Parent) ->
erlang:put(io_priority, {db_compact, DbName}),
couch_log:debug("Compaction process spawned for db \"~s\"", [DbName]),
couch_db_engine:trigger_on_compact(DbName),
+ ?COMP_EVENT(init),
{ok, InitCompSt} = open_compaction_files(DbName, St, Options),
+ ?COMP_EVENT(files_opened),
Stages = [
fun copy_purge_info/1,
@@ -74,6 +83,7 @@ start(#st{} = St, DbName, Options, Parent) ->
ok = couch_bt_engine:decref(FinalNewSt),
ok = couch_file:close(MetaFd),
+ ?COMP_EVENT(before_notify),
Msg = {compact_done, couch_bt_engine, FinalNewSt#st.filepath},
gen_server:cast(Parent, Msg).
@@ -146,6 +156,7 @@ copy_purge_info(#comp_st{} = CompSt) ->
new_st = NewSt,
retry = Retry
} = CompSt,
+ ?COMP_EVENT(purge_init),
MinPurgeSeq = couch_util:with_db(DbName, fun(Db) ->
couch_db:get_minimum_purge_seq(Db)
end),
@@ -180,6 +191,7 @@ copy_purge_info(#comp_st{} = CompSt) ->
{NewStAcc, Infos, _, _} = FinalAcc,
FinalNewSt = copy_purge_infos(OldSt, NewStAcc, Infos, MinPurgeSeq, Retry),
+ ?COMP_EVENT(purge_done),
CompSt#comp_st{
new_st = FinalNewSt
}.
@@ -322,6 +334,7 @@ copy_compact(#comp_st{} = CompSt) ->
couch_task_status:set_update_frequency(500)
end,
+ ?COMP_EVENT(seq_init),
{ok, _, {NewSt2, Uncopied, _, _}} =
couch_btree:foldl(St#st.seq_tree, EnumBySeqFun,
{NewSt, [], 0, 0},
@@ -329,6 +342,8 @@ copy_compact(#comp_st{} = CompSt) ->
NewSt3 = copy_docs(St, NewSt2, lists:reverse(Uncopied), Retry),
+ ?COMP_EVENT(seq_done),
+
% Copy the security information over
SecProps = couch_bt_engine:get_security(St),
{ok, NewSt4} = couch_bt_engine:copy_security(NewSt3, SecProps),
@@ -392,6 +407,7 @@ copy_docs(St, #st{} = NewSt, MixedInfos, Retry) ->
TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
NewActiveSize = FinalAS + TotalAttSize,
NewExternalSize = FinalES + TotalAttSize,
+ ?COMP_EVENT(seq_copy),
Info#full_doc_info{
rev_tree = NewRevTree,
sizes = #size_info{
@@ -478,7 +494,9 @@ copy_doc_attachments(#st{} = SrcSt, SrcSp, DstSt) ->
sort_meta_data(#comp_st{new_st = St0} = CompSt) ->
+ ?COMP_EVENT(md_sort_init),
{ok, Ems} = couch_emsort:merge(St0#st.id_tree),
+ ?COMP_EVENT(md_sort_done),
CompSt#comp_st{
new_st = St0#st{
id_tree = Ems
@@ -505,11 +523,13 @@ copy_meta_data(#comp_st{new_st = St} = CompSt) ->
rem_seqs=[],
infos=[]
},
+ ?COMP_EVENT(md_copy_init),
Acc = merge_docids(Iter, Acc0),
{ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
{ok, SeqTree} = couch_btree:add_remove(
Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
),
+ ?COMP_EVENT(md_copy_done),
CompSt#comp_st{
new_st = St#st{
id_tree = IdTree,
@@ -519,7 +539,9 @@ copy_meta_data(#comp_st{new_st = St} = CompSt) ->
compact_final_sync(#comp_st{new_st = St0} = CompSt) ->
+ ?COMP_EVENT(before_final_sync),
{ok, St1} = couch_bt_engine:commit_data(St0),
+ ?COMP_EVENT(after_final_sync),
CompSt#comp_st{
new_st = St1
}.
@@ -628,6 +650,7 @@ merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
curr = NewCurr
},
+ ?COMP_EVENT(md_copy_row),
merge_docids(NextIter, Acc1);
{finished, FDI, Seqs} ->
Acc#merge_st{
diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_ev.erl b/src/couch/test/eunit/couch_bt_engine_compactor_ev.erl
new file mode 100644
index 000000000..f50be84de
--- /dev/null
+++ b/src/couch/test/eunit/couch_bt_engine_compactor_ev.erl
@@ -0,0 +1,106 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine_compactor_ev).
+
+
+-export([
+ init/0,
+ terminate/0,
+ clear/0,
+
+ set_wait/1,
+ set_crash/1,
+
+ event/1
+]).
+
+
+-define(TAB, couch_db_updater_ev_tab).
+
+
+init() ->
+ ets:new(?TAB, [set, public, named_table]).
+
+
+terminate() ->
+ ets:delete(?TAB).
+
+
+clear() ->
+ ets:delete_all_objects(?TAB).
+
+
+set_wait(Event) ->
+ Self = self(),
+ WaitFun = fun(_) ->
+ receive
+ {Self, go} ->
+ Self ! {self(), ok}
+ end,
+ ets:delete(?TAB, Event)
+ end,
+ ContinueFun = fun(Pid) ->
+ Pid ! {Self, go},
+ receive {Pid, ok} -> ok end
+ end,
+ ets:insert(?TAB, {Event, WaitFun}),
+ {ok, ContinueFun}.
+
+
+set_crash(Event) ->
+ Reason = {couch_db_updater_ev_crash, Event},
+ CrashFun = fun(_) -> exit(Reason) end,
+ ets:insert(?TAB, {Event, CrashFun}),
+ {ok, Reason}.
+
+
+event(Event) ->
+ NewEvent = case Event of
+ seq_init ->
+ put(?MODULE, 0),
+ Event;
+ seq_copy ->
+ Count = get(?MODULE),
+ put(?MODULE, Count + 1),
+ {seq_copy, Count};
+ id_init ->
+ put(?MODULE, 0),
+ Event;
+ id_copy ->
+ Count = get(?MODULE),
+ put(?MODULE, Count + 1),
+ {id_copy, Count};
+ md_copy_init ->
+ put(?MODULE, 0),
+ Event;
+ md_copy_row ->
+ Count = get(?MODULE),
+ put(?MODULE, Count + 1),
+ {md_copy_row, Count};
+ _ ->
+ Event
+ end,
+ handle_event(NewEvent).
+
+
+handle_event(Event) ->
+ try
+ case ets:lookup(?TAB, Event) of
+ [{Event, ActionFun}] ->
+ ActionFun(Event);
+ [] ->
+ ok
+ end
+ catch error:badarg ->
+ ok
+ end. \ No newline at end of file
diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl
new file mode 100644
index 000000000..188078c2d
--- /dev/null
+++ b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl
@@ -0,0 +1,335 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_bt_engine_compactor_ev_tests).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/src/couch_server_int.hrl").
+
+-define(TIMEOUT_EUNIT, 60).
+-define(EV_MOD, couch_bt_engine_compactor_ev).
+-define(INIT_DOCS, 2500).
+-define(WRITE_DOCS, 20).
+
+% The idea behind the tests in this module are to attempt to
+% cover the number of restart/recopy events during compaction
+% so that we can be as sure as possible that the compactor
+% is resilient to errors in the face of external conditions
+% (i.e., the VM rebooted). The single linear pass is easy enough
+% to prove, however restarting is important enough that we don't
+% want to waste work if a VM happens to bounce a lot.
+%
+% To try to cover as many restart situations as possible, we have created a
+% number of events in the compactor code that are present during
+% a test compiled version of the module. These events can then
+% be used (via meck) to introduce errors and coordinate writes
+% to the database while compaction is in progress.
+
+% This list of events is where we'll insert our errors.
+
+events() ->
+ [
+ init, % The compactor process is spawned
+ files_opened, % After compaction files have opened
+
+ purge_init, % Just before apply purge changes
+ purge_done, % Just after finish purge updates
+
+ % The first phase is when we copy all document body and attachment
+ % data to the new database file in order of update sequence so
+ % that we can resume on crash.
+
+ seq_init, % Before the first change is copied
+ {seq_copy, 0}, % After change N is copied
+ {seq_copy, ?INIT_DOCS div 2},
+ {seq_copy, ?INIT_DOCS - 2},
+ seq_done, % After last change is copied
+
+ % The id copy phases come in two flavors. Before a compaction
+ % swap is attempted they're copied from the id_tree in the
+ % database being compacted. After a swap attempt they are
+ % stored in an emsort file on disk. Thus the two sets of
+ % related events here.
+
+ md_sort_init, % Just before metadata sort starts
+ md_sort_done, % Just after metadata sort finished
+ md_copy_init, % Just before metadata copy starts
+ {md_copy_row, 0}, % After docid N is copied
+ {md_copy_row, ?INIT_DOCS div 2},
+ {md_copy_row, ?INIT_DOCS - 2},
+ md_copy_done, % Just after the last docid is copied
+
+ % And then the final steps before we finish
+
+ before_final_sync, % Just before final sync
+ after_final_sync, % Just after the final sync
+ before_notify % Just before the final notification
+ ].
+
+% Mark which events only happen when documents are present
+
+requires_docs({seq_copy, _}) -> true;
+requires_docs(md_sort_init) -> true;
+requires_docs(md_sort_done) -> true;
+requires_docs(md_copy_init) -> true;
+requires_docs({md_copy_row, _}) -> true;
+requires_docs(md_copy_done) -> true;
+requires_docs(_) -> false.
+
+
+% Mark which events only happen when there's write activity during
+% a compaction.
+
+requires_write(md_sort_init) -> true;
+requires_write(md_sort_done) -> true;
+requires_write(md_copy_init) -> true;
+requires_write({md_copy_row, _}) -> true;
+requires_write(md_copy_done) -> true;
+requires_write(_) -> false.
+
+
+setup() ->
+ purge_module(),
+ ?EV_MOD:init(),
+ test_util:start_couch().
+
+
+teardown(Ctx) ->
+ test_util:stop_couch(Ctx),
+ ?EV_MOD:terminate().
+
+
+start_empty_db_test(_Event) ->
+ ?EV_MOD:clear(),
+ DbName = ?tempdb(),
+ {ok, _} = couch_db:create(DbName, [?ADMIN_CTX]),
+ DbName.
+
+
+start_populated_db_test(Event) ->
+ DbName = start_empty_db_test(Event),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ try
+ populate_db(Db, ?INIT_DOCS)
+ after
+ couch_db:close(Db)
+ end,
+ DbName.
+
+
+stop_test(_Event, DbName) ->
+ couch_server:delete(DbName, [?ADMIN_CTX]).
+
+
+static_empty_db_test_() ->
+ FiltFun = fun(E) ->
+ not (requires_docs(E) or requires_write(E))
+ end,
+ Events = lists:filter(FiltFun, events()) -- [init],
+ {
+ "Idle empty database",
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ [
+ {
+ foreachx,
+ fun start_empty_db_test/1,
+ fun stop_test/2,
+ [{Event, fun run_static_init/2} || Event <- Events]
+ }
+ ]
+ }
+ }.
+
+
+static_populated_db_test_() ->
+ FiltFun = fun(E) -> not requires_write(E) end,
+ Events = lists:filter(FiltFun, events()) -- [init],
+ {
+ "Idle populated database",
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ [
+ {
+ foreachx,
+ fun start_populated_db_test/1,
+ fun stop_test/2,
+ [{Event, fun run_static_init/2} || Event <- Events]
+ }
+ ]
+ }
+ }.
+
+
+dynamic_empty_db_test_() ->
+ FiltFun = fun(E) -> not requires_docs(E) end,
+ Events = lists:filter(FiltFun, events()) -- [init],
+ {
+ "Writes to empty database",
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ [
+ {
+ foreachx,
+ fun start_empty_db_test/1,
+ fun stop_test/2,
+ [{Event, fun run_dynamic_init/2} || Event <- Events]
+ }
+ ]
+ }
+ }.
+
+
+dynamic_populated_db_test_() ->
+ Events = events() -- [init],
+ {
+ "Writes to populated database",
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ [
+ {
+ foreachx,
+ fun start_populated_db_test/1,
+ fun stop_test/2,
+ [{Event, fun run_dynamic_init/2} || Event <- Events]
+ }
+ ]
+ }
+ }.
+
+
+run_static_init(Event, DbName) ->
+ Name = lists:flatten(io_lib:format("~p", [Event])),
+ Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_static(Event, DbName))},
+ {Name, Test}.
+
+
+run_static(Event, DbName) ->
+ {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+ {ok, Reason} = ?EV_MOD:set_crash(Event),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ Ref = couch_db:monitor(Db),
+ {ok, CPid} = couch_db:start_compact(Db),
+ ContinueFun(CPid),
+ receive
+ {'DOWN', Ref, _, _, Reason} ->
+ wait_db_cleared(Db)
+ end,
+ run_successful_compaction(DbName),
+ couch_db:close(Db).
+
+
+run_dynamic_init(Event, DbName) ->
+ Name = lists:flatten(io_lib:format("~p", [Event])),
+ Test = {timeout, ?TIMEOUT_EUNIT, ?_test(run_dynamic(Event, DbName))},
+ {Name, Test}.
+
+
+run_dynamic(Event, DbName) ->
+ {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+ {ok, Reason} = ?EV_MOD:set_crash(Event),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ Ref = couch_db:monitor(Db),
+ {ok, CPid} = couch_db:start_compact(Db),
+ ok = populate_db(Db, 10),
+ ContinueFun(CPid),
+ receive
+ {'DOWN', Ref, _, _, Reason} ->
+ wait_db_cleared(Db)
+ end,
+ run_successful_compaction(DbName),
+ couch_db:close(Db).
+
+
+run_successful_compaction(DbName) ->
+ ?EV_MOD:clear(),
+ {ok, ContinueFun} = ?EV_MOD:set_wait(init),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, CPid} = couch_db:start_compact(Db),
+ Ref = erlang:monitor(process, CPid),
+ ContinueFun(CPid),
+ receive
+ {'DOWN', Ref, _, _, normal} -> ok
+ end,
+ Pid = couch_db:get_pid(Db),
+ {ok, NewDb} = gen_server:call(Pid, get_db),
+ validate_compaction(NewDb),
+ couch_db:close(Db).
+
+
+wait_db_cleared(Db) ->
+ wait_db_cleared(Db, 5).
+
+
+wait_db_cleared(Db, N) when N < 0 ->
+ erlang:error({db_clear_timeout, couch_db:name(Db)});
+
+wait_db_cleared(Db, N) ->
+ case ets:lookup(couch_dbs, couch_db:name(Db)) of
+ [] ->
+ ok;
+ [#entry{db = NewDb}] ->
+ OldPid = couch_db:get_pid(Db),
+ NewPid = couch_db:get_pid(NewDb),
+ if NewPid /= OldPid -> ok; true ->
+ timer:sleep(100),
+ wait_db_cleared(Db, N - 1)
+ end
+ end.
+
+
+populate_db(_Db, NumDocs) when NumDocs =< 0 ->
+ ok;
+populate_db(Db, NumDocs) ->
+ String = [$a || _ <- lists:seq(1, erlang:min(NumDocs, 500))],
+ Docs = lists:map(
+ fun(_) ->
+ couch_doc:from_json_obj({[
+ {<<"_id">>, couch_uuids:random()},
+ {<<"string">>, list_to_binary(String)}
+ ]})
+ end,
+ lists:seq(1, 500)),
+ {ok, _} = couch_db:update_docs(Db, Docs, []),
+ populate_db(Db, NumDocs - 500).
+
+
+validate_compaction(Db) ->
+ {ok, DocCount} = couch_db:get_doc_count(Db),
+ {ok, DelDocCount} = couch_db:get_del_doc_count(Db),
+ NumChanges = couch_db:count_changes_since(Db, 0),
+ FoldFun = fun(FDI, {PrevId, CountAcc}) ->
+ ?assert(FDI#full_doc_info.id > PrevId),
+ {ok, {FDI#full_doc_info.id, CountAcc + 1}}
+ end,
+ {ok, {_, LastCount}} = couch_db:fold_docs(Db, FoldFun, {<<>>, 0}),
+ ?assertEqual(DocCount + DelDocCount, LastCount),
+ ?assertEqual(NumChanges, LastCount).
+
+
+purge_module() ->
+ case code:which(couch_db_updater) of
+ cover_compiled ->
+ ok;
+ _ ->
+ code:delete(couch_db_updater),
+ code:purge(couch_db_updater)
+ end. \ No newline at end of file