diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-05-05 01:47:25 -0400 |
---|---|---|
committer | jiangph <jiangph@cn.ibm.com> | 2020-05-11 14:46:58 +0800 |
commit | 05251adcc1759c19e41b96c7652eb415f49896ce (patch) | |
tree | 87fe6a2ee386abb637d1ce376d030be313b70d82 | |
parent | 2c9d219514d74fb2cc1ca3e2262d10ea5953c4a2 (diff) | |
download | couchdb-05251adcc1759c19e41b96c7652eb415f49896ce.tar.gz |
Another update
-rw-r--r-- | src/fabric/src/fabric2_db_expiration.erl | 62 |
1 files changed, 25 insertions, 37 deletions
diff --git a/src/fabric/src/fabric2_db_expiration.erl b/src/fabric/src/fabric2_db_expiration.erl index f52d8bca3..3e05d65fd 100644 --- a/src/fabric/src/fabric2_db_expiration.erl +++ b/src/fabric/src/fabric2_db_expiration.erl @@ -39,10 +39,9 @@ -define(JOB_ID, <<"dbexpiration_job">>). -define(DEFAULT_RETENTION_SEC, 172800). % 48 hours -define(DEFAULT_EXPIRATION_BATCH, 100). --define(DEFAULT_SCHEDULE_SEC, 7). % 1 hour --define(ERROR_RESCHEDULE_SEC, 3). --define(INITIAL_DELAY_MSEC, 900). --define(CHECK_ENABLED_MSEC, 2000). +-define(DEFAULT_SCHEDULE_SEC, 15). % 1 hour +-define(ERROR_RESCHEDULE_SEC, 5). +-define(CHECK_ENABLED_SEC, 2). -define(JOB_TIMEOUT_SEC, 30). @@ -52,12 +51,12 @@ start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init(_) -> - couch_log:info("~p : in init", [?MODULE]), - {ok, #st{job = undefined}, ?INITIAL_DELAY_MSEC}. + process_flag(trap_exit, true), + {ok, #st{job = undefined}, 0}. terminate(_M, _St) -> @@ -77,21 +76,19 @@ handle_info(timeout, #st{job = undefined} = St) -> ok = wait_for_couch_jobs_app(), ok = couch_jobs:set_type_timeout(?JOB_TYPE, ?JOB_TIMEOUT_SEC), % for testing - Removed = couch_jobs:remove(undefined, ?JOB_TYPE, ?JOB_ID), - couch_log:info("~p : remove result ~p", [?MODULE, Removed]), + couch_jobs:remove(undefined, ?JOB_TYPE, ?JOB_ID), ok = maybe_add_job(), couch_log:info("~p : added job ~p, initialized", [?MODULE, ?JOB_ID]), - {_, Ref} = spawn_monitor(?MODULE, cleanup, [is_enabled()]), - {noreply, St#st{job = Ref}}; + Pid = spawn_link(?MODULE, cleanup, [is_enabled()]), + {noreply, St#st{job = Pid}}; -handle_info({'DOWN', Ref, _, _, Exit}, #st{job = Ref} = St) -> - couch_log:info("~p : job finished with ~p", [?MODULE, Exit]), +handle_info({'EXIT', Pid, Exit}, #st{job = Pid} = St) -> case Exit of normal -> ok; Error -> couch_log:error("~p : job error ~p", [?MODULE, Error]) end, - {_, Ref} = spawn_monitor(?MODULE, cleanup, [is_enabled()]), - {noreply, St#st{job = Ref}}; + NewPid = spawn_link(?MODULE, cleanup, [is_enabled()]), + {noreply, St#st{job = NewPid}}; handle_info(Msg, St) -> {stop, {bad_info, Msg}, St}. @@ -105,17 +102,12 @@ wait_for_couch_jobs_app() -> % Because of a circular dependency between couch_jobs and fabric apps, wait % for couch_jobs to initialize before continuing. If we refactor the % commits FDB utilities out we can remove this bit of code. - Apps = try - application:which_applications(1000) - catch - _:_ -> [] - end, - case lists:keysearch(couch_jobs, 1, Apps) of + case lists:keysearch(couch_jobs, 1, application:which_applications()) of {value, {couch_jobs, _, _}} -> couch_log:info("~p : couch_jobs started! ", [?MODULE]), ok; false -> - timer:sleep(1000), + timer:sleep(100), couch_log:info("~p : waiting for couch jobs", [?MODULE]), wait_for_couch_jobs_app() end. @@ -131,21 +123,21 @@ maybe_add_job() -> end. -cleanup(false = _Enabled) -> +cleanup(false) -> couch_log:info("~p : Not enabled waiting ...", [?MODULE]), - timer:sleep(?CHECK_ENABLED_MSEC), + timer:sleep(?CHECK_ENABLED_SEC * 1000), exit(normal); -cleanup(true = _Enabled) -> - couch_log:info("~p : enable, accepting ...", [?MODULE]), - Opts = #{max_sched_time => erlang:system_time(second), timeout => 1}, +cleanup(true) -> + Now = erlang:system_time(second), + Opts = #{max_sched_time => Now + min(?DEFAULT_SCHEDULE_SEC div 3, 15)}, case couch_jobs:accept(?JOB_TYPE, Opts) of % maybe handle timeout here, need to check the api {ok, Job, Data} -> try couch_log:error("~p : processing expirations ~p ~p", [?MODULE, Job, Data]), {ok, Job1, Data1} = process_expirations(Job, Data), - couch_log:error("~p : resubmitting job ~p ~p", [?MODULE, Job, schedule_sec()]), + couch_log:error("~p : DONE resubmitting job ~p ~p", [?MODULE, Job, schedule_sec()]), ok = resubmit_job(Job1, Data1, schedule_sec()) catch _Tag:Error -> @@ -155,9 +147,8 @@ cleanup(true = _Enabled) -> exit({job_error, Error, Stack}) end; {error, not_found} -> - couch_log:error("~p : not found error", [?MODULE]), timer:sleep(1000), - cleanup(is_enabled()) + ?MODULE:cleanup(is_enabled()) end. @@ -165,14 +156,11 @@ cleanup(true = _Enabled) -> resubmit_job(Job, Data, After) -> Now = erlang:system_time(second), SchedTime = Now + After, - couch_jobs:finish(undefined, Job, Data), - couch_jobs:add(undefined, ?JOB_TYPE, ?JOB_ID, Data, SchedTime), + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + {ok, Job1} = couch_jobs:resubmit(JTx, Job, SchedTime), + ok = couch_jobs:finish(JTx, Job1, Data) + end), ok. - %% couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> - %% {ok, Job1} = couch_jobs:resubmit(JTx, Job, SchedTime), - %% ok = couch_jobs:finish(JTx, Job1, Data) - %% end), - %% ok. process_expirations(#{} = Job, #{} = Data) -> |