diff options
-rw-r--r-- | src/fabric/include/fabric2.hrl | 1 | ||||
-rw-r--r-- | src/fabric/src/fabric2_db_expiration.erl | 246 | ||||
-rw-r--r-- | src/fabric/src/fabric2_sup.erl | 10 | ||||
-rw-r--r-- | src/fabric/test/fabric2_db_crud_tests.erl | 104 |
4 files changed, 358 insertions, 3 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl index 27f3d61c2..2e71787c3 100644 --- a/src/fabric/include/fabric2.hrl +++ b/src/fabric/include/fabric2.hrl @@ -13,6 +13,7 @@ -define(uint2bin(I), binary:encode_unsigned(I, little)). -define(bin2uint(I), binary:decode_unsigned(I, little)). +-define(bin2int(V), binary_to_integer(V)). -define(METADATA_VERSION_KEY, <<16#FF, "/metadataVersion">>). % Prefix Definitions diff --git a/src/fabric/src/fabric2_db_expiration.erl b/src/fabric/src/fabric2_db_expiration.erl new file mode 100644 index 000000000..3363d2427 --- /dev/null +++ b/src/fabric/src/fabric2_db_expiration.erl @@ -0,0 +1,246 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_db_expiration). + + +-behaviour(gen_server). + + +-export([ + start_link/0, + cleanup/1, + process_expirations/2 +]). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("fabric/include/fabric2.hrl"). + +-define(JOB_TYPE, <<"db_expiration">>). +-define(JOB_ID, <<"db_expiration_job">>). +-define(DEFAULT_JOB_Version, 1). +-define(DEFAULT_RETENTION_SEC, 172800). % 48 hours +-define(DEFAULT_SCHEDULE_SEC, 3600). % 1 hour +-define(ERROR_RESCHEDULE_SEC, 5). +-define(CHECK_ENABLED_SEC, 2). +-define(JOB_TIMEOUT_SEC, 30). + + +-record(st, { + job +}). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +init(_) -> + process_flag(trap_exit, true), + {ok, #st{job = undefined}, 0}. + + +terminate(_M, _St) -> + ok. + + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info(timeout, #st{job = undefined} = St) -> + ok = wait_for_couch_jobs_app(), + ok = couch_jobs:set_type_timeout(?JOB_TYPE, ?JOB_TIMEOUT_SEC), + ok = maybe_add_job(), + Pid = spawn_link(?MODULE, cleanup, [is_enabled()]), + {noreply, St#st{job = Pid}}; + +handle_info({'EXIT', Pid, Exit}, #st{job = Pid} = St) -> + case Exit of + normal -> ok; + Error -> couch_log:error("~p : job error ~p", [?MODULE, Error]) + end, + NewPid = spawn_link(?MODULE, cleanup, [is_enabled()]), + {noreply, St#st{job = NewPid}}; + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +wait_for_couch_jobs_app() -> + % Because of a circular dependency between couch_jobs and fabric apps, wait + % for couch_jobs to initialize before continuing. If we refactor the + % commits FDB utilities out we can remove this bit of code. + case lists:keysearch(couch_jobs, 1, application:which_applications()) of + {value, {couch_jobs, _, _}} -> + ok; + false -> + timer:sleep(100), + wait_for_couch_jobs_app() + end. + + +maybe_add_job() -> + case couch_jobs:get_job_data(undefined, ?JOB_TYPE, job_id()) of + {error, not_found} -> + Now = erlang:system_time(second), + ok = couch_jobs:add(undefined, ?JOB_TYPE, job_id(), #{}, Now); + {ok, _JobData} -> + ok + end. + + +cleanup(false) -> + timer:sleep(?CHECK_ENABLED_SEC * 1000), + exit(normal); + +cleanup(true) -> + Now = erlang:system_time(second), + ScheduleSec = schedule_sec(), + Opts = #{max_sched_time => Now + min(ScheduleSec div 3, 15)}, + case couch_jobs:accept(?JOB_TYPE, Opts) of + {ok, Job, Data} -> + try + {ok, Job1, Data1} = ?MODULE:process_expirations(Job, Data), + ok = resubmit_job(Job1, Data1, schedule_sec()) + catch + _Tag:Error -> + Stack = erlang:get_stacktrace(), + couch_log:error("~p : processing error ~p ~p ~p", + [?MODULE, Job, Error, Stack]), + ok = resubmit_job(Job, Data, ?ERROR_RESCHEDULE_SEC), + exit({job_error, Error, Stack}) + end; + {error, not_found} -> + timer:sleep(?CHECK_ENABLED_SEC * 1000), + ?MODULE:cleanup(is_enabled()) + end. + + +resubmit_job(Job, Data, After) -> + Now = erlang:system_time(second), + SchedTime = Now + After, + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + {ok, Job1} = couch_jobs:resubmit(JTx, Job, SchedTime), + ok = couch_jobs:finish(JTx, Job1, Data) + end), + ok. + + +process_expirations(#{} = Job, #{} = Data) -> + Start = now_sec(), + Callback = fun(Value, LastUpdateAt) -> + case Value of + {meta, _} -> ok; + {row, DbInfo} -> process_row(DbInfo); + complete -> ok + end, + {ok, maybe_report_progress(Job, LastUpdateAt)} + end, + {ok, _Infos} = fabric2_db:list_deleted_dbs_info( + Callback, + Start, + [{restart_tx, true}] + ), + {ok, Job, Data}. + + +process_row(DbInfo) -> + DbName = proplists:get_value(db_name, DbInfo), + TimeStamp = proplists:get_value(timestamp, DbInfo), + Now = now_sec(), + Retention = retention_sec(), + Since = Now - Retention, + case Since >= timestamp_to_sec(TimeStamp) of + true -> + couch_log:notice("Permanently deleting ~p database with" + " timestamp ~p", [DbName, TimeStamp]), + ok = fabric2_db:delete(DbName, [{deleted_at, TimeStamp}]); + false -> + ok + end. + + +maybe_report_progress(Job, LastUpdateAt) -> + % Update periodically the job so it doesn't expire + Now = now_sec(), + Progress = #{ + <<"processed_at">> => Now + + }, + case (Now - LastUpdateAt) > (?JOB_TIMEOUT_SEC div 2) of + true -> + couch_jobs:update(undefined, Job, Progress), + Now; + false -> + LastUpdateAt + end. + + +job_id() -> + JobVersion = job_version(), + <<?JOB_ID/binary, "-", JobVersion:16/integer>>. + + +now_sec() -> + Now = os:timestamp(), + Nowish = calendar:now_to_universal_time(Now), + calendar:datetime_to_gregorian_seconds(Nowish). + + +timestamp_to_sec(TimeStamp) -> + <<Year:4/binary, "-", Month:2/binary, "-", Day:2/binary, + "T", + Hour:2/binary, ":", Minutes:2/binary, ":", Second:2/binary, + "Z">> = TimeStamp, + + calendar:datetime_to_gregorian_seconds( + {{?bin2int(Year), ?bin2int(Month), ?bin2int(Day)}, + {?bin2int(Hour), ?bin2int(Minutes), ?bin2int(Second)}} + ). + + +is_enabled() -> + config:get_boolean("couchdb", "db_expiration_enabled", false). + + +job_version() -> + config:get_integer("couchdb", "db_expiration_job_version", + ?DEFAULT_JOB_Version). + + +retention_sec() -> + config:get_integer("couchdb", "db_expiration_retention_sec", + ?DEFAULT_RETENTION_SEC). + + +schedule_sec() -> + config:get_integer("couchdb", "db_expiration_schedule_sec", + ?DEFAULT_SCHEDULE_SEC). diff --git a/src/fabric/src/fabric2_sup.erl b/src/fabric/src/fabric2_sup.erl index e8201b4ee..874a8c240 100644 --- a/src/fabric/src/fabric2_sup.erl +++ b/src/fabric/src/fabric2_sup.erl @@ -30,7 +30,7 @@ start_link(Args) -> init([]) -> config:enable_feature(fdb), - Flags = {one_for_one, 1, 5}, + Flags = {rest_for_one, 1, 5}, Children = [ { fabric2_server, @@ -55,6 +55,14 @@ init([]) -> 5000, worker, [fabric2_index] + }, + { + fabric2_db_expiration, + {fabric2_db_expiration, start_link, []}, + permanent, + 5000, + worker, + [fabric2_db_expiration] } ], ChildrenWithEpi = couch_epi:register_service(fabric2_epi, Children), diff --git a/src/fabric/test/fabric2_db_crud_tests.erl b/src/fabric/test/fabric2_db_crud_tests.erl index 9deb8dd26..b1e15aa2e 100644 --- a/src/fabric/test/fabric2_db_crud_tests.erl +++ b/src/fabric/test/fabric2_db_crud_tests.erl @@ -13,6 +13,7 @@ -module(fabric2_db_crud_tests). +-include_lib("fabric/include/fabric2.hrl"). -include_lib("couch/include/couch_eunit.hrl"). -include_lib("eunit/include/eunit.hrl"). -include("fabric2_test.hrl"). @@ -39,6 +40,8 @@ crud_test_() -> ?TDEF_FE(recreate_db), ?TDEF_FE(undelete_db), ?TDEF_FE(remove_deleted_db), + ?TDEF_FE(scheduled_remove_deleted_db), + ?TDEF_FE(scheduled_remove_deleted_dbs), ?TDEF_FE(old_db_handle), ?TDEF_FE(list_dbs), ?TDEF_FE(list_dbs_user_fun), @@ -59,14 +62,42 @@ crud_test_() -> }. +scheduled_db_remove_error_test_() -> + { + "Test scheduled database remove operations", + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun cleanup/1, + [ + ?TDEF_FE(scheduled_remove_deleted_dbs_with_error) + ] + } + } + }. + + setup_all() -> - Ctx = test_util:start_couch([fabric]), + meck:new(config, [passthrough]), + meck:expect(config, get_integer, fun + ("couchdb", "db_expiration_schedule_sec", _) -> 2; + ("couchdb", "db_expiration_retention_sec", _) -> 0; + (_, _, Default) -> Default + end), + Ctx = test_util:start_couch([fabric, couch_jobs]), meck:new(erlfdb, [passthrough]), + meck:new(fabric2_db_expiration, [passthrough]), Ctx. teardown_all(Ctx) -> - meck:unload(), + meck:unload(erlfdb), + meck:unload(config), + meck:unload(fabric2_db_expiration), test_util:stop_couch(Ctx). @@ -75,9 +106,12 @@ setup() -> cleanup(_) -> + ok = config:set("couchdb", "db_expiration_enabled", "false", false), ok = config:set("couchdb", "enable_database_recovery", "false", false), fabric2_test_util:tx_too_old_reset_errors(), reset_fail_erfdb_wait(), + meck:reset([fabric2_db_expiration]), + meck:reset([config]), meck:reset([erlfdb]). @@ -205,6 +239,60 @@ remove_deleted_db(_) -> ?assert(not lists:member(DbName, DeletedDbs)). +scheduled_remove_deleted_db(_) -> + ok = config:set("couchdb", "db_expiration_enabled", "true", false), + ok = config:set("couchdb", "enable_database_recovery", "true", false), + DbName = ?tempdb(), + ?assertError(database_does_not_exist, fabric2_db:delete(DbName, [])), + + ?assertMatch({ok, _}, fabric2_db:create(DbName, [])), + ?assertEqual(true, ets:member(fabric2_server, DbName)), + + ?assertEqual(ok, fabric2_db:delete(DbName, [])), + ?assertEqual(false, ets:member(fabric2_server, DbName)), + + meck:wait(fabric2_db_expiration, process_expirations, '_', 5000), + + {ok, Infos} = fabric2_db:list_deleted_dbs_info(), + DeletedDbs = [proplists:get_value(db_name, Info) || Info <- Infos], + ?assert(not lists:member(DbName, DeletedDbs)). + + +scheduled_remove_deleted_dbs(_) -> + ok = config:set("couchdb", "db_expiration_enabled", "true", false), + ok = config:set("couchdb", "db_expiration_batch", "2", false), + ok = config:set("couchdb", "enable_database_recovery", "true", false), + DbNameList = [create_and_delete_db() || _I <- lists:seq(1, 5)], + meck:wait(fabric2_db_expiration, process_expirations, '_', 5000), + + {ok, Infos} = fabric2_db:list_deleted_dbs_info(), + DeletedDbs = [proplists:get_value(db_name, Info) || Info <- Infos], + lists:map(fun(DbName) -> + ?assert(not lists:member(DbName, DeletedDbs)) + end, DbNameList). + + +scheduled_remove_deleted_dbs_with_error(_) -> + meck:expect(fabric2_db_expiration, process_expirations, fun(_, _) -> + throw(process_expirations_error) + end), + + {Pid, Ref} = spawn_monitor(fun() -> + fabric2_db_expiration:cleanup(true) + end), + receive + {'DOWN', Ref, process, Pid, Error} -> + ?assertMatch({job_error, process_expirations_error, _}, Error) + end, + JobType = <<"db_expiration">>, + JobId = <<"db_expiration_job">>, + FQJobId = <<JobId/binary, "-", 1:16/integer>>, + + ?assertMatch({ok, _}, couch_jobs:get_job_data(undefined, JobType, FQJobId)), + {ok, JobState} = couch_jobs:get_job_state(undefined, JobType, FQJobId), + ?assert(lists:member(JobState, [pending, running])). + + old_db_handle(_) -> % db hard deleted DbName1 = ?tempdb(), @@ -615,3 +703,15 @@ get_deleted_dbs(DeletedDbInfos) -> DbName = fabric2_util:get_value(db_name, DbInfo), [DbName | Acc] end, [], DeletedDbInfos). + + +create_and_delete_db() -> + DbName = ?tempdb(), + ?assertError(database_does_not_exist, fabric2_db:delete(DbName, [])), + + ?assertMatch({ok, _}, fabric2_db:create(DbName, [])), + ?assertEqual(true, ets:member(fabric2_server, DbName)), + + ?assertEqual(ok, fabric2_db:delete(DbName, [])), + ?assertEqual(false, ets:member(fabric2_server, DbName)), + DbName. |