author     jiangph <jiangph@cn.ibm.com>   2020-05-12 23:05:25 +0800
committer  jiangph <jiangph@cn.ibm.com>   2020-05-13 15:53:04 +0800
commit     0e7b08cba1e63cc6251600747f132afc6b865f06
tree       2eb363047db30ceb02e8817783c12f88df1f9f40
parent     2144b30057dbbe07937e4a6c118934c5e277bb7f
download   couchdb-background-db-deletion.tar.gz

address Nick's comment (branch: background-db-deletion)
-rw-r--r--  src/fabric/src/fabric2_db_expiration.erl   | 86
-rw-r--r--  src/fabric/src/fabric2_sup.erl              |  2
-rw-r--r--  src/fabric/test/fabric2_db_crud_tests.erl   | 49
3 files changed, 71 insertions(+), 66 deletions(-)
diff --git a/src/fabric/src/fabric2_db_expiration.erl b/src/fabric/src/fabric2_db_expiration.erl
index c2ba03fc6..48e4cd295 100644
--- a/src/fabric/src/fabric2_db_expiration.erl
+++ b/src/fabric/src/fabric2_db_expiration.erl
@@ -19,7 +19,7 @@
-export([
start_link/0,
cleanup/1,
- delete_dbs/1
+ process_expirations/2
]).
-export([
@@ -35,13 +35,11 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("fabric/include/fabric2.hrl").
-
--define(JOB_TYPE, <<"dbexpiration">>).
--define(JOB_ID, <<"dbexpiration_job">>).
+-define(JOB_TYPE, <<"db_expiration">>).
+-define(JOB_ID, <<"db_expiration_job">>).
-define(DEFAULT_JOB_Version, 1).
-define(DEFAULT_RETENTION_SEC, 172800). % 48 hours
-define(DEFAULT_SCHEDULE_SEC, 3600). % 1 hour
--define(DEFAULT_EXPIRATION_BATCH, 100).
-define(ERROR_RESCHEDULE_SEC, 5).
-define(CHECK_ENABLED_SEC, 2).
-define(JOB_TIMEOUT_SEC, 30).
@@ -131,7 +129,7 @@ cleanup(true) ->
% maybe handle timeout here, need to check the api
{ok, Job, Data} ->
try
- {ok, Job1, Data1} = process_expirations(Job, Data),
+ {ok, Job1, Data1} = ?MODULE:process_expirations(Job, Data),
ok = resubmit_job(Job1, Data1, schedule_sec())
catch
_Tag:Error ->
@@ -142,7 +140,7 @@ cleanup(true) ->
exit({job_error, Error, Stack})
end;
{error, not_found} ->
- timer:sleep(1000),
+ timer:sleep(?CHECK_ENABLED_SEC * 1000),
?MODULE:cleanup(is_enabled())
end.
@@ -158,72 +156,41 @@ resubmit_job(Job, Data, After) ->
process_expirations(#{} = Job, #{} = Data) ->
- Callback = fun(Value, Acc) ->
- NewAcc = case Value of
- {meta, _} -> Acc;
- {row, DbInfo} ->
- TotalLen = length(Acc),
- process_row(TotalLen, Acc, DbInfo);
- complete ->
- TotalLen = length(Acc),
- if TotalLen == 0 -> Acc; true ->
- [{LastDelete, _, _} | _] = Acc,
- TotalLen = length(Acc),
- delete_dbs(lists:sublist(Acc, TotalLen - LastDelete)),
- Acc
- end
- end,
- {ok, NewAcc}
- end,
- {ok, _Infos} = fabric2_db:list_deleted_dbs_info(Callback, [], []),
- {ok, Job, Data}.
-
-
-process_row(0, _Acc, DbInfo) ->
- DbName = proplists:get_value(db_name, DbInfo),
- TimeStamp = proplists:get_value(timestamp, DbInfo),
- [{0, DbName, TimeStamp}];
-
-process_row(TotalLen, Acc, DbInfo) ->
- [{LastDelete, _, _} | _] = Acc,
- NumberToDelete = TotalLen - LastDelete,
- DeleteBatch = expiration_batch(),
- LastDelete2 = case NumberToDelete == DeleteBatch of
- true ->
- delete_dbs(lists:sublist(Acc, DeleteBatch)),
- report_progress(TotalLen),
- TotalLen;
- _ ->
- LastDelete
- end,
- DbName = proplists:get_value(db_name, DbInfo),
- TimeStamp = proplists:get_value(timestamp, DbInfo),
- [{LastDelete2, DbName, TimeStamp} | Acc].
-
-
-delete_dbs(Infos) ->
- lists:foreach(fun({_, DbName, TimeStamp}) ->
+ {ok, Infos} = fabric2_db:list_deleted_dbs_info([{restart_tx, true}]),
+ Start = now_sec(),
+ lists:foldl(fun(DeletedDbInfo, LastUpdateAt) ->
+ DbName = proplists:get_value(db_name, DeletedDbInfo),
+ TimeStamp = proplists:get_value(timestamp, DeletedDbInfo),
Now = now_sec(),
Retention = retention_sec(),
Since = Now - Retention,
case Since >= timestamp_to_sec(TimeStamp) of
true ->
+ couch_log:notice("Permanently deleting ~p database with"
+ " timestamp ~p", [DbName, TimeStamp]),
ok = fabric2_db:delete(DbName, [{deleted_at, TimeStamp}]);
false ->
ok
- end
- end, Infos).
+ end,
+ maybe_report_progress(Job, LastUpdateAt)
+ end, Start, Infos),
+ {ok, Job, Data}.
-report_progress(TotalDbs) ->
+maybe_report_progress(Job, LastUpdateAt) ->
% Update periodically the job so it doesn't expire
Now = now_sec(),
Progress = #{
- <<"total_processed_dbs">> => TotalDbs,
<<"processed_at">> => Now
},
- couch_jobs:update(undfined, ?JOB_TYPE, Progress).
+ case (Now - LastUpdateAt) > (?JOB_TIMEOUT_SEC div 2) of
+ true ->
+ couch_jobs:update(undefined, Job, Progress),
+ Now;
+ false ->
+ LastUpdateAt
+ end.
job_id() ->
@@ -266,8 +233,3 @@ retention_sec() ->
schedule_sec() ->
config:get_integer("couchdb", "db_expiration_schedule_sec",
?DEFAULT_SCHEDULE_SEC).
-
-
-expiration_batch() ->
- config:get_integer("couchdb", "db_expiration_batch",
- ?DEFAULT_EXPIRATION_BATCH).
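With the batching logic above removed, process_expirations/2 now lists the deleted databases once (with {restart_tx, true}) and folds over the result, permanently deleting any entry older than the retention window and calling maybe_report_progress/2 so the couch_jobs record is refreshed before it can time out. The remaining tuning knobs live in the "couchdb" config section. A minimal sketch of adjusting them from a remote shell, assuming the retention key is named db_expiration_retention_sec by analogy with db_expiration_schedule_sec (its exact name is not visible in this hunk):

    %% Keep deleted databases for 24 hours instead of the 48-hour default
    %% (?DEFAULT_RETENTION_SEC = 172800). Key name assumed, see note above.
    ok = config:set("couchdb", "db_expiration_retention_sec", "86400", false).

    %% Run the cleanup pass every 30 minutes instead of every hour
    %% (?DEFAULT_SCHEDULE_SEC = 3600); this key appears in schedule_sec/0 above.
    ok = config:set("couchdb", "db_expiration_schedule_sec", "1800", false).

Note that db_expiration_batch is no longer read after this change; the whole listing is processed in one pass.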
diff --git a/src/fabric/src/fabric2_sup.erl b/src/fabric/src/fabric2_sup.erl
index 899bd999a..874a8c240 100644
--- a/src/fabric/src/fabric2_sup.erl
+++ b/src/fabric/src/fabric2_sup.erl
@@ -30,7 +30,7 @@ start_link(Args) ->
init([]) ->
config:enable_feature(fdb),
- Flags = {one_for_all, 1, 5},
+ Flags = {rest_for_one, 1, 5},
Children = [
{
fabric2_server,
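The supervisor change is the other functional piece of this commit: with rest_for_one, a crash in fabric2_server still takes down and restarts the children started after it (which is presumably where the expiration worker sits), but a crash in one of those later children no longer restarts fabric2_server itself, as one_for_all did. A rough illustration of the restart semantics with placeholder child names, not the real fabric2_sup child specs:

    %% Hypothetical rest_for_one supervisor; names are illustrative only.
    init([]) ->
        Flags = {rest_for_one, 1, 5},
        Children = [
            %% If expiration_worker crashes, server_proc keeps running;
            %% if server_proc crashes, both children restart, in order.
            {server_proc, {server_proc, start_link, []},
                permanent, 5000, worker, [server_proc]},
            {expiration_worker, {expiration_worker, start_link, []},
                permanent, 5000, worker, [expiration_worker]}
        ],
        {ok, {Flags, Children}}.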
diff --git a/src/fabric/test/fabric2_db_crud_tests.erl b/src/fabric/test/fabric2_db_crud_tests.erl
index 849d090bc..b1e15aa2e 100644
--- a/src/fabric/test/fabric2_db_crud_tests.erl
+++ b/src/fabric/test/fabric2_db_crud_tests.erl
@@ -13,6 +13,7 @@
-module(fabric2_db_crud_tests).
+-include_lib("fabric/include/fabric2.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("eunit/include/eunit.hrl").
-include("fabric2_test.hrl").
@@ -61,6 +62,25 @@ crud_test_() ->
}.
+scheduled_db_remove_error_test_() ->
+ {
+ "Test scheduled database remove operations",
+ {
+ setup,
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun cleanup/1,
+ [
+ ?TDEF_FE(scheduled_remove_deleted_dbs_with_error)
+ ]
+ }
+ }
+ }.
+
+
setup_all() ->
meck:new(config, [passthrough]),
meck:expect(config, get_integer, fun
@@ -75,7 +95,9 @@ setup_all() ->
teardown_all(Ctx) ->
- meck:unload(),
+ meck:unload(erlfdb),
+ meck:unload(config),
+ meck:unload(fabric2_db_expiration),
test_util:stop_couch(Ctx).
@@ -229,7 +251,7 @@ scheduled_remove_deleted_db(_) ->
?assertEqual(ok, fabric2_db:delete(DbName, [])),
?assertEqual(false, ets:member(fabric2_server, DbName)),
- meck:wait(fabric2_db_expiration, cleanup, '_', 5000),
+ meck:wait(fabric2_db_expiration, process_expirations, '_', 5000),
{ok, Infos} = fabric2_db:list_deleted_dbs_info(),
DeletedDbs = [proplists:get_value(db_name, Info) || Info <- Infos],
@@ -241,7 +263,7 @@ scheduled_remove_deleted_dbs(_) ->
ok = config:set("couchdb", "db_expiration_batch", "2", false),
ok = config:set("couchdb", "enable_database_recovery", "true", false),
DbNameList = [create_and_delete_db() || _I <- lists:seq(1, 5)],
- meck:wait(fabric2_db_expiration, cleanup, '_', 5000),
+ meck:wait(fabric2_db_expiration, process_expirations, '_', 5000),
{ok, Infos} = fabric2_db:list_deleted_dbs_info(),
DeletedDbs = [proplists:get_value(db_name, Info) || Info <- Infos],
@@ -250,6 +272,27 @@ scheduled_remove_deleted_dbs(_) ->
end, DbNameList).
+scheduled_remove_deleted_dbs_with_error(_) ->
+ meck:expect(fabric2_db_expiration, process_expirations, fun(_, _) ->
+ throw(process_expirations_error)
+ end),
+
+ {Pid, Ref} = spawn_monitor(fun() ->
+ fabric2_db_expiration:cleanup(true)
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, Error} ->
+ ?assertMatch({job_error, process_expirations_error, _}, Error)
+ end,
+ JobType = <<"db_expiration">>,
+ JobId = <<"db_expiration_job">>,
+ FQJobId = <<JobId/binary, "-", 1:16/integer>>,
+
+ ?assertMatch({ok, _}, couch_jobs:get_job_data(undefined, JobType, FQJobId)),
+ {ok, JobState} = couch_jobs:get_job_state(undefined, JobType, FQJobId),
+ ?assert(lists:member(JobState, [pending, running])).
+
+
old_db_handle(_) ->
% db hard deleted
DbName1 = ?tempdb(),