diff options
author | jiangph <jiangph@cn.ibm.com> | 2020-05-12 23:05:25 +0800 |
---|---|---|
committer | jiangph <jiangph@cn.ibm.com> | 2020-05-13 15:53:04 +0800 |
commit | 0e7b08cba1e63cc6251600747f132afc6b865f06 (patch) | |
tree | 2eb363047db30ceb02e8817783c12f88df1f9f40 | |
parent | 2144b30057dbbe07937e4a6c118934c5e277bb7f (diff) | |
download | couchdb-background-db-deletion.tar.gz |
address Nick's commentbackground-db-deletion
-rw-r--r-- | src/fabric/src/fabric2_db_expiration.erl | 86 | ||||
-rw-r--r-- | src/fabric/src/fabric2_sup.erl | 2 | ||||
-rw-r--r-- | src/fabric/test/fabric2_db_crud_tests.erl | 49 |
3 files changed, 71 insertions, 66 deletions
diff --git a/src/fabric/src/fabric2_db_expiration.erl b/src/fabric/src/fabric2_db_expiration.erl index c2ba03fc6..48e4cd295 100644 --- a/src/fabric/src/fabric2_db_expiration.erl +++ b/src/fabric/src/fabric2_db_expiration.erl @@ -19,7 +19,7 @@ -export([ start_link/0, cleanup/1, - delete_dbs/1 + process_expirations/2 ]). -export([ @@ -35,13 +35,11 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("fabric/include/fabric2.hrl"). - --define(JOB_TYPE, <<"dbexpiration">>). --define(JOB_ID, <<"dbexpiration_job">>). +-define(JOB_TYPE, <<"db_expiration">>). +-define(JOB_ID, <<"db_expiration_job">>). -define(DEFAULT_JOB_Version, 1). -define(DEFAULT_RETENTION_SEC, 172800). % 48 hours -define(DEFAULT_SCHEDULE_SEC, 3600). % 1 hour --define(DEFAULT_EXPIRATION_BATCH, 100). -define(ERROR_RESCHEDULE_SEC, 5). -define(CHECK_ENABLED_SEC, 2). -define(JOB_TIMEOUT_SEC, 30). @@ -131,7 +129,7 @@ cleanup(true) -> % maybe handle timeout here, need to check the api {ok, Job, Data} -> try - {ok, Job1, Data1} = process_expirations(Job, Data), + {ok, Job1, Data1} = ?MODULE:process_expirations(Job, Data), ok = resubmit_job(Job1, Data1, schedule_sec()) catch _Tag:Error -> @@ -142,7 +140,7 @@ cleanup(true) -> exit({job_error, Error, Stack}) end; {error, not_found} -> - timer:sleep(1000), + timer:sleep(?CHECK_ENABLED_SEC * 1000), ?MODULE:cleanup(is_enabled()) end. @@ -158,72 +156,41 @@ resubmit_job(Job, Data, After) -> process_expirations(#{} = Job, #{} = Data) -> - Callback = fun(Value, Acc) -> - NewAcc = case Value of - {meta, _} -> Acc; - {row, DbInfo} -> - TotalLen = length(Acc), - process_row(TotalLen, Acc, DbInfo); - complete -> - TotalLen = length(Acc), - if TotalLen == 0 -> Acc; true -> - [{LastDelete, _, _} | _] = Acc, - TotalLen = length(Acc), - delete_dbs(lists:sublist(Acc, TotalLen - LastDelete)), - Acc - end - end, - {ok, NewAcc} - end, - {ok, _Infos} = fabric2_db:list_deleted_dbs_info(Callback, [], []), - {ok, Job, Data}. - - -process_row(0, _Acc, DbInfo) -> - DbName = proplists:get_value(db_name, DbInfo), - TimeStamp = proplists:get_value(timestamp, DbInfo), - [{0, DbName, TimeStamp}]; - -process_row(TotalLen, Acc, DbInfo) -> - [{LastDelete, _, _} | _] = Acc, - NumberToDelete = TotalLen - LastDelete, - DeleteBatch = expiration_batch(), - LastDelete2 = case NumberToDelete == DeleteBatch of - true -> - delete_dbs(lists:sublist(Acc, DeleteBatch)), - report_progress(TotalLen), - TotalLen; - _ -> - LastDelete - end, - DbName = proplists:get_value(db_name, DbInfo), - TimeStamp = proplists:get_value(timestamp, DbInfo), - [{LastDelete2, DbName, TimeStamp} | Acc]. - - -delete_dbs(Infos) -> - lists:foreach(fun({_, DbName, TimeStamp}) -> + {ok, Infos} = fabric2_db:list_deleted_dbs_info([{restart_tx, true}]), + Start = now_sec(), + lists:foldl(fun(DeletedDbInfo, LastUpdateAt) -> + DbName = proplists:get_value(db_name, DeletedDbInfo), + TimeStamp = proplists:get_value(timestamp, DeletedDbInfo), Now = now_sec(), Retention = retention_sec(), Since = Now - Retention, case Since >= timestamp_to_sec(TimeStamp) of true -> + couch_log:notice("Permanently deleting ~p database with" + " timestamp ~p", [DbName, TimeStamp]), ok = fabric2_db:delete(DbName, [{deleted_at, TimeStamp}]); false -> ok - end - end, Infos). + end, + maybe_report_progress(Job, LastUpdateAt) + end, Start, Infos), + {ok, Job, Data}. -report_progress(TotalDbs) -> +maybe_report_progress(Job, LastUpdateAt) -> % Update periodically the job so it doesn't expire Now = now_sec(), Progress = #{ - <<"total_processed_dbs">> => TotalDbs, <<"processed_at">> => Now }, - couch_jobs:update(undfined, ?JOB_TYPE, Progress). + case (Now - LastUpdateAt) > (?JOB_TIMEOUT_SEC div 2) of + true -> + couch_jobs:update(undefined, Job, Progress), + Now; + false -> + LastUpdateAt + end. job_id() -> @@ -266,8 +233,3 @@ retention_sec() -> schedule_sec() -> config:get_integer("couchdb", "db_expiration_schedule_sec", ?DEFAULT_SCHEDULE_SEC). - - -expiration_batch() -> - config:get_integer("couchdb", "db_expiration_batch", - ?DEFAULT_EXPIRATION_BATCH). diff --git a/src/fabric/src/fabric2_sup.erl b/src/fabric/src/fabric2_sup.erl index 899bd999a..874a8c240 100644 --- a/src/fabric/src/fabric2_sup.erl +++ b/src/fabric/src/fabric2_sup.erl @@ -30,7 +30,7 @@ start_link(Args) -> init([]) -> config:enable_feature(fdb), - Flags = {one_for_all, 1, 5}, + Flags = {rest_for_one, 1, 5}, Children = [ { fabric2_server, diff --git a/src/fabric/test/fabric2_db_crud_tests.erl b/src/fabric/test/fabric2_db_crud_tests.erl index 849d090bc..b1e15aa2e 100644 --- a/src/fabric/test/fabric2_db_crud_tests.erl +++ b/src/fabric/test/fabric2_db_crud_tests.erl @@ -13,6 +13,7 @@ -module(fabric2_db_crud_tests). +-include_lib("fabric/include/fabric2.hrl"). -include_lib("couch/include/couch_eunit.hrl"). -include_lib("eunit/include/eunit.hrl"). -include("fabric2_test.hrl"). @@ -61,6 +62,25 @@ crud_test_() -> }. +scheduled_db_remove_error_test_() -> + { + "Test scheduled database remove operations", + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun cleanup/1, + [ + ?TDEF_FE(scheduled_remove_deleted_dbs_with_error) + ] + } + } + }. + + setup_all() -> meck:new(config, [passthrough]), meck:expect(config, get_integer, fun @@ -75,7 +95,9 @@ setup_all() -> teardown_all(Ctx) -> - meck:unload(), + meck:unload(erlfdb), + meck:unload(config), + meck:unload(fabric2_db_expiration), test_util:stop_couch(Ctx). @@ -229,7 +251,7 @@ scheduled_remove_deleted_db(_) -> ?assertEqual(ok, fabric2_db:delete(DbName, [])), ?assertEqual(false, ets:member(fabric2_server, DbName)), - meck:wait(fabric2_db_expiration, cleanup, '_', 5000), + meck:wait(fabric2_db_expiration, process_expirations, '_', 5000), {ok, Infos} = fabric2_db:list_deleted_dbs_info(), DeletedDbs = [proplists:get_value(db_name, Info) || Info <- Infos], @@ -241,7 +263,7 @@ scheduled_remove_deleted_dbs(_) -> ok = config:set("couchdb", "db_expiration_batch", "2", false), ok = config:set("couchdb", "enable_database_recovery", "true", false), DbNameList = [create_and_delete_db() || _I <- lists:seq(1, 5)], - meck:wait(fabric2_db_expiration, cleanup, '_', 5000), + meck:wait(fabric2_db_expiration, process_expirations, '_', 5000), {ok, Infos} = fabric2_db:list_deleted_dbs_info(), DeletedDbs = [proplists:get_value(db_name, Info) || Info <- Infos], @@ -250,6 +272,27 @@ scheduled_remove_deleted_dbs(_) -> end, DbNameList). +scheduled_remove_deleted_dbs_with_error(_) -> + meck:expect(fabric2_db_expiration, process_expirations, fun(_, _) -> + throw(process_expirations_error) + end), + + {Pid, Ref} = spawn_monitor(fun() -> + fabric2_db_expiration:cleanup(true) + end), + receive + {'DOWN', Ref, process, Pid, Error} -> + ?assertMatch({job_error, process_expirations_error, _}, Error) + end, + JobType = <<"db_expiration">>, + JobId = <<"db_expiration_job">>, + FQJobId = <<JobId/binary, "-", 1:16/integer>>, + + ?assertMatch({ok, _}, couch_jobs:get_job_data(undefined, JobType, FQJobId)), + {ok, JobState} = couch_jobs:get_job_state(undefined, JobType, FQJobId), + ?assert(lists:member(JobState, [pending, running])). + + old_db_handle(_) -> % db hard deleted DbName1 = ?tempdb(), |