summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-05-05 01:47:25 -0400
committerjiangph <jiangph@cn.ibm.com>2020-05-11 14:46:58 +0800
commit05251adcc1759c19e41b96c7652eb415f49896ce (patch)
tree87fe6a2ee386abb637d1ce376d030be313b70d82
parent2c9d219514d74fb2cc1ca3e2262d10ea5953c4a2 (diff)
downloadcouchdb-05251adcc1759c19e41b96c7652eb415f49896ce.tar.gz
Another update
-rw-r--r--src/fabric/src/fabric2_db_expiration.erl62
1 files changed, 25 insertions, 37 deletions
diff --git a/src/fabric/src/fabric2_db_expiration.erl b/src/fabric/src/fabric2_db_expiration.erl
index f52d8bca3..3e05d65fd 100644
--- a/src/fabric/src/fabric2_db_expiration.erl
+++ b/src/fabric/src/fabric2_db_expiration.erl
@@ -39,10 +39,9 @@
-define(JOB_ID, <<"dbexpiration_job">>).
-define(DEFAULT_RETENTION_SEC, 172800). % 48 hours
-define(DEFAULT_EXPIRATION_BATCH, 100).
--define(DEFAULT_SCHEDULE_SEC, 7). % 1 hour
--define(ERROR_RESCHEDULE_SEC, 3).
--define(INITIAL_DELAY_MSEC, 900).
--define(CHECK_ENABLED_MSEC, 2000).
+-define(DEFAULT_SCHEDULE_SEC, 15). % 1 hour
+-define(ERROR_RESCHEDULE_SEC, 5).
+-define(CHECK_ENABLED_SEC, 2).
-define(JOB_TIMEOUT_SEC, 30).
@@ -52,12 +51,12 @@
start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init(_) ->
- couch_log:info("~p : in init", [?MODULE]),
- {ok, #st{job = undefined}, ?INITIAL_DELAY_MSEC}.
+ process_flag(trap_exit, true),
+ {ok, #st{job = undefined}, 0}.
terminate(_M, _St) ->
@@ -77,21 +76,19 @@ handle_info(timeout, #st{job = undefined} = St) ->
ok = wait_for_couch_jobs_app(),
ok = couch_jobs:set_type_timeout(?JOB_TYPE, ?JOB_TIMEOUT_SEC),
% for testing
- Removed = couch_jobs:remove(undefined, ?JOB_TYPE, ?JOB_ID),
- couch_log:info("~p : remove result ~p", [?MODULE, Removed]),
+ couch_jobs:remove(undefined, ?JOB_TYPE, ?JOB_ID),
ok = maybe_add_job(),
couch_log:info("~p : added job ~p, initialized", [?MODULE, ?JOB_ID]),
- {_, Ref} = spawn_monitor(?MODULE, cleanup, [is_enabled()]),
- {noreply, St#st{job = Ref}};
+ Pid = spawn_link(?MODULE, cleanup, [is_enabled()]),
+ {noreply, St#st{job = Pid}};
-handle_info({'DOWN', Ref, _, _, Exit}, #st{job = Ref} = St) ->
- couch_log:info("~p : job finished with ~p", [?MODULE, Exit]),
+handle_info({'EXIT', Pid, Exit}, #st{job = Pid} = St) ->
case Exit of
normal -> ok;
Error -> couch_log:error("~p : job error ~p", [?MODULE, Error])
end,
- {_, Ref} = spawn_monitor(?MODULE, cleanup, [is_enabled()]),
- {noreply, St#st{job = Ref}};
+ NewPid = spawn_link(?MODULE, cleanup, [is_enabled()]),
+ {noreply, St#st{job = NewPid}};
handle_info(Msg, St) ->
{stop, {bad_info, Msg}, St}.
@@ -105,17 +102,12 @@ wait_for_couch_jobs_app() ->
% Because of a circular dependency between couch_jobs and fabric apps, wait
% for couch_jobs to initialize before continuing. If we refactor the
% commits FDB utilities out we can remove this bit of code.
- Apps = try
- application:which_applications(1000)
- catch
- _:_ -> []
- end,
- case lists:keysearch(couch_jobs, 1, Apps) of
+ case lists:keysearch(couch_jobs, 1, application:which_applications()) of
{value, {couch_jobs, _, _}} ->
couch_log:info("~p : couch_jobs started! ", [?MODULE]),
ok;
false ->
- timer:sleep(1000),
+ timer:sleep(100),
couch_log:info("~p : waiting for couch jobs", [?MODULE]),
wait_for_couch_jobs_app()
end.
@@ -131,21 +123,21 @@ maybe_add_job() ->
end.
-cleanup(false = _Enabled) ->
+cleanup(false) ->
couch_log:info("~p : Not enabled waiting ...", [?MODULE]),
- timer:sleep(?CHECK_ENABLED_MSEC),
+ timer:sleep(?CHECK_ENABLED_SEC * 1000),
exit(normal);
-cleanup(true = _Enabled) ->
- couch_log:info("~p : enable, accepting ...", [?MODULE]),
- Opts = #{max_sched_time => erlang:system_time(second), timeout => 1},
+cleanup(true) ->
+ Now = erlang:system_time(second),
+ Opts = #{max_sched_time => Now + min(?DEFAULT_SCHEDULE_SEC div 3, 15)},
case couch_jobs:accept(?JOB_TYPE, Opts) of
% maybe handle timeout here, need to check the api
{ok, Job, Data} ->
try
couch_log:error("~p : processing expirations ~p ~p", [?MODULE, Job, Data]),
{ok, Job1, Data1} = process_expirations(Job, Data),
- couch_log:error("~p : resubmitting job ~p ~p", [?MODULE, Job, schedule_sec()]),
+ couch_log:error("~p : DONE resubmitting job ~p ~p", [?MODULE, Job, schedule_sec()]),
ok = resubmit_job(Job1, Data1, schedule_sec())
catch
_Tag:Error ->
@@ -155,9 +147,8 @@ cleanup(true = _Enabled) ->
exit({job_error, Error, Stack})
end;
{error, not_found} ->
- couch_log:error("~p : not found error", [?MODULE]),
timer:sleep(1000),
- cleanup(is_enabled())
+ ?MODULE:cleanup(is_enabled())
end.
@@ -165,14 +156,11 @@ cleanup(true = _Enabled) ->
resubmit_job(Job, Data, After) ->
Now = erlang:system_time(second),
SchedTime = Now + After,
- couch_jobs:finish(undefined, Job, Data),
- couch_jobs:add(undefined, ?JOB_TYPE, ?JOB_ID, Data, SchedTime),
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ {ok, Job1} = couch_jobs:resubmit(JTx, Job, SchedTime),
+ ok = couch_jobs:finish(JTx, Job1, Data)
+ end),
ok.
- %% couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
- %% {ok, Job1} = couch_jobs:resubmit(JTx, Job, SchedTime),
- %% ok = couch_jobs:finish(JTx, Job1, Data)
- %% end),
- %% ok.
process_expirations(#{} = Job, #{} = Data) ->