diff options
Diffstat (limited to 'src/fabric/src/fabric2_db_expiration.erl')
-rw-r--r-- | src/fabric/src/fabric2_db_expiration.erl | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/src/fabric/src/fabric2_db_expiration.erl b/src/fabric/src/fabric2_db_expiration.erl new file mode 100644 index 000000000..92f22e749 --- /dev/null +++ b/src/fabric/src/fabric2_db_expiration.erl @@ -0,0 +1,246 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_db_expiration). + + +-behaviour(gen_server). + + +-export([ + start_link/0, + cleanup/1, + process_expirations/2 +]). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("fabric/include/fabric2.hrl"). + +-define(JOB_TYPE, <<"db_expiration">>). +-define(JOB_ID, <<"db_expiration_job">>). +-define(DEFAULT_JOB_Version, 1). +-define(DEFAULT_RETENTION_SEC, 172800). % 48 hours +-define(DEFAULT_SCHEDULE_SEC, 3600). % 1 hour +-define(ERROR_RESCHEDULE_SEC, 5). +-define(CHECK_ENABLED_SEC, 2). +-define(JOB_TIMEOUT_SEC, 30). + + +-record(st, { + job +}). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +init(_) -> + process_flag(trap_exit, true), + {ok, #st{job = undefined}, 0}. + + +terminate(_M, _St) -> + ok. + + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info(timeout, #st{job = undefined} = St) -> + ok = wait_for_couch_jobs_app(), + ok = couch_jobs:set_type_timeout(?JOB_TYPE, ?JOB_TIMEOUT_SEC), + ok = maybe_add_job(), + Pid = spawn_link(?MODULE, cleanup, [is_enabled()]), + {noreply, St#st{job = Pid}}; + +handle_info({'EXIT', Pid, Exit}, #st{job = Pid} = St) -> + case Exit of + normal -> ok; + Error -> couch_log:error("~p : job error ~p", [?MODULE, Error]) + end, + NewPid = spawn_link(?MODULE, cleanup, [is_enabled()]), + {noreply, St#st{job = NewPid}}; + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +wait_for_couch_jobs_app() -> + % Because of a circular dependency between couch_jobs and fabric apps, wait + % for couch_jobs to initialize before continuing. If we refactor the + % commits FDB utilities out we can remove this bit of code. + case lists:keysearch(couch_jobs, 1, application:which_applications()) of + {value, {couch_jobs, _, _}} -> + ok; + false -> + timer:sleep(100), + wait_for_couch_jobs_app() + end. + + +maybe_add_job() -> + case couch_jobs:get_job_data(undefined, ?JOB_TYPE, job_id()) of + {error, not_found} -> + Now = erlang:system_time(second), + ok = couch_jobs:add(undefined, ?JOB_TYPE, job_id(), #{}, Now); + {ok, _JobData} -> + ok + end. + + +cleanup(false) -> + timer:sleep(?CHECK_ENABLED_SEC * 1000), + exit(normal); + +cleanup(true) -> + Now = erlang:system_time(second), + ScheduleSec = schedule_sec(), + Opts = #{max_sched_time => Now + min(ScheduleSec div 3, 15)}, + case couch_jobs:accept(?JOB_TYPE, Opts) of + {ok, Job, Data} -> + try + {ok, Job1, Data1} = ?MODULE:process_expirations(Job, Data), + ok = resubmit_job(Job1, Data1, schedule_sec()) + catch + _Tag:Error -> + Stack = erlang:get_stacktrace(), + couch_log:error("~p : processing error ~p ~p ~p", + [?MODULE, Job, Error, Stack]), + ok = resubmit_job(Job, Data, ?ERROR_RESCHEDULE_SEC), + exit({job_error, Error, Stack}) + end; + {error, not_found} -> + timer:sleep(?CHECK_ENABLED_SEC * 1000), + ?MODULE:cleanup(is_enabled()) + end. + + +resubmit_job(Job, Data, After) -> + Now = erlang:system_time(second), + SchedTime = Now + After, + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + {ok, Job1} = couch_jobs:resubmit(JTx, Job, SchedTime), + ok = couch_jobs:finish(JTx, Job1, Data) + end), + ok. + + +process_expirations(#{} = Job, #{} = Data) -> + Start = now_sec(), + Callback = fun(Value, LastUpdateAt) -> + case Value of + {meta, _} -> ok; + {row, DbInfo} -> process_row(DbInfo); + complete -> ok + end, + {ok, maybe_report_progress(Job, LastUpdateAt)} + end, + {ok, _Infos} = fabric2_db:list_deleted_dbs_info( + Callback, + Start, + [{restart_tx, true}] + ), + {ok, Job, Data}. + + +process_row(DbInfo) -> + DbName = proplists:get_value(db_name, DbInfo), + TimeStamp = proplists:get_value(timestamp, DbInfo), + Now = now_sec(), + Retention = retention_sec(), + Since = Now - Retention, + case Since >= timestamp_to_sec(TimeStamp) of + true -> + couch_log:notice("Permanently deleting ~s database with" + " timestamp ~s", [DbName, TimeStamp]), + ok = fabric2_db:delete(DbName, [{deleted_at, TimeStamp}]); + false -> + ok + end. + + +maybe_report_progress(Job, LastUpdateAt) -> + % Update periodically the job so it doesn't expire + Now = now_sec(), + Progress = #{ + <<"processed_at">> => Now + + }, + case (Now - LastUpdateAt) > (?JOB_TIMEOUT_SEC div 2) of + true -> + couch_jobs:update(undefined, Job, Progress), + Now; + false -> + LastUpdateAt + end. + + +job_id() -> + JobVersion = job_version(), + <<?JOB_ID/binary, "-", JobVersion:16/integer>>. + + +now_sec() -> + Now = os:timestamp(), + Nowish = calendar:now_to_universal_time(Now), + calendar:datetime_to_gregorian_seconds(Nowish). + + +timestamp_to_sec(TimeStamp) -> + <<Year:4/binary, "-", Month:2/binary, "-", Day:2/binary, + "T", + Hour:2/binary, ":", Minutes:2/binary, ":", Second:2/binary, + "Z">> = TimeStamp, + + calendar:datetime_to_gregorian_seconds( + {{?bin2int(Year), ?bin2int(Month), ?bin2int(Day)}, + {?bin2int(Hour), ?bin2int(Minutes), ?bin2int(Second)}} + ). + + +is_enabled() -> + config:get_boolean("couchdb", "db_expiration_enabled", false). + + +job_version() -> + config:get_integer("couchdb", "db_expiration_job_version", + ?DEFAULT_JOB_Version). + + +retention_sec() -> + config:get_integer("couchdb", "db_expiration_retention_sec", + ?DEFAULT_RETENTION_SEC). + + +schedule_sec() -> + config:get_integer("couchdb", "db_expiration_schedule_sec", + ?DEFAULT_SCHEDULE_SEC). |