diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-05-01 20:06:54 -0400 |
---|---|---|
committer | jiangph <jiangph@cn.ibm.com> | 2020-05-11 14:46:58 +0800 |
commit | 2c9d219514d74fb2cc1ca3e2262d10ea5953c4a2 (patch) | |
tree | 0a67ecc2d7167f49f33b0eb6f4f86fac4798a4b2 | |
parent | eacf2b65ad2efb66f5c026af26996b5737c4767c (diff) | |
download | couchdb-2c9d219514d74fb2cc1ca3e2262d10ea5953c4a2.tar.gz |
Update
-rw-r--r-- | src/fabric/include/fabric2.hrl | 9 | ||||
-rw-r--r-- | src/fabric/src/fabric2_db_expiration.erl | 148 | ||||
-rw-r--r-- | src/fabric/src/fabric2_sup.erl | 2 |
3 files changed, 100 insertions, 59 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl index 868bd0f72..4bc6b764a 100644 --- a/src/fabric/include/fabric2.hrl +++ b/src/fabric/include/fabric2.hrl @@ -83,12 +83,3 @@ -define(DEFAULT_BINARY_CHUNK_SIZE, 100000). -define(BINARY_CHUNK_SIZE, 100000). - -% jobs api --define(DB_EXPIRATION_JOB_TYPE, <<"dbexpiration">>). --define(DB_EXPIRATION_JOB, <<"dbexpiration_job">>). - -% settings for background database expiration deletion --define(DEFAULT_RETENTION_SEC, 172800). % 48 hours --define(DEFAULT_EXPIRATION_BATCH, 100). --define(DEFAULT_SCHEDULE_SEC, 3600). % 1 hour diff --git a/src/fabric/src/fabric2_db_expiration.erl b/src/fabric/src/fabric2_db_expiration.erl index bf1037cee..f52d8bca3 100644 --- a/src/fabric/src/fabric2_db_expiration.erl +++ b/src/fabric/src/fabric2_db_expiration.erl @@ -18,7 +18,7 @@ -export([ start_link/0, - accept_job/0 + cleanup/1 ]). -export([ @@ -35,17 +35,29 @@ -include_lib("fabric/include/fabric2.hrl"). +-define(JOB_TYPE, <<"dbexpiration">>). +-define(JOB_ID, <<"dbexpiration_job">>). +-define(DEFAULT_RETENTION_SEC, 172800). % 48 hours +-define(DEFAULT_EXPIRATION_BATCH, 100). +-define(DEFAULT_SCHEDULE_SEC, 7). % 1 hour +-define(ERROR_RESCHEDULE_SEC, 3). +-define(INITIAL_DELAY_MSEC, 900). +-define(CHECK_ENABLED_MSEC, 2000). +-define(JOB_TIMEOUT_SEC, 30). + + -record(st, { - acceptor + job }). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []). init(_) -> - {ok, nil, 5000}. + couch_log:info("~p : in init", [?MODULE]), + {ok, #st{job = undefined}, ?INITIAL_DELAY_MSEC}. terminate(_M, _St) -> @@ -60,27 +72,27 @@ handle_cast(Msg, St) -> {stop, {bad_cast, Msg}, St}. -handle_info(timeout, St) -> - case wait_for_couch_jobs_app() of - ok -> ok; - retry -> wait_for_couch_jobs_app() - end, - couch_jobs:set_type_timeout(?DB_EXPIRATION_JOB_TYPE, 6), - add_or_get_job(), - {_Pid, Ref} = spawn_monitor(?MODULE, accept_job, []), - {noreply, St#st{acceptor = Ref}}; -handle_info({'DOWN', Ref, process, _Pid, {exit_ok, Resp}}, #st{acceptor=Ref} = St) -> - case is_enabled() of - true -> - process_expiration(); - false -> - ok +handle_info(timeout, #st{job = undefined} = St) -> + couch_log:info("~p : got timeout", [?JOB_ID]), + ok = wait_for_couch_jobs_app(), + ok = couch_jobs:set_type_timeout(?JOB_TYPE, ?JOB_TIMEOUT_SEC), + % for testing + Removed = couch_jobs:remove(undefined, ?JOB_TYPE, ?JOB_ID), + couch_log:info("~p : remove result ~p", [?MODULE, Removed]), + ok = maybe_add_job(), + couch_log:info("~p : added job ~p, initialized", [?MODULE, ?JOB_ID]), + {_, Ref} = spawn_monitor(?MODULE, cleanup, [is_enabled()]), + {noreply, St#st{job = Ref}}; + +handle_info({'DOWN', Ref, _, _, Exit}, #st{job = Ref} = St) -> + couch_log:info("~p : job finished with ~p", [?MODULE, Exit]), + case Exit of + normal -> ok; + Error -> couch_log:error("~p : job error ~p", [?MODULE, Error]) end, - Now = erlang:system_time(second), - {ok, Job, JobData} = Resp, - couch_jobs:resubmit(undefined, Job, Now + schedule_sec(), JobData), - {_Pid, Ref} = spawn_monitor(?MODULE, accept_job, []), - {noreply, St#st{acceptor = Ref}}; + {_, Ref} = spawn_monitor(?MODULE, cleanup, [is_enabled()]), + {noreply, St#st{job = Ref}}; + handle_info(Msg, St) -> {stop, {bad_info, Msg}, St}. @@ -90,44 +102,81 @@ code_change(_OldVsn, St, _Extra) -> wait_for_couch_jobs_app() -> - case lists:keysearch(couch_jobs, 1, application:which_applications()) of - {value, {couch_jobs, _Value}} -> ok; + % Because of a circular dependency between couch_jobs and fabric apps, wait + % for couch_jobs to initialize before continuing. If we refactor the + % commits FDB utilities out we can remove this bit of code. + Apps = try + application:which_applications(1000) + catch + _:_ -> [] + end, + case lists:keysearch(couch_jobs, 1, Apps) of + {value, {couch_jobs, _, _}} -> + couch_log:info("~p : couch_jobs started! ", [?MODULE]), + ok; false -> timer:sleep(1000), - retry + couch_log:info("~p : waiting for couch jobs", [?MODULE]), + wait_for_couch_jobs_app() end. -add_or_get_job() -> - couch_jobs:set_type_timeout(?DB_EXPIRATION_JOB_TYPE, 6), - case couch_jobs:get_job_data( - undefined, - ?DB_EXPIRATION_JOB_TYPE, - ?DB_EXPIRATION_JOB - ) of +maybe_add_job() -> + case couch_jobs:get_job_data(undefined, ?JOB_TYPE, ?JOB_ID) of {error, not_found} -> - couch_jobs:add( - undefined, - ?DB_EXPIRATION_JOB_TYPE, - ?DB_EXPIRATION_JOB, - #{} - ); + Now = erlang:system_time(second), + ok = couch_jobs:add(undefined, ?JOB_TYPE, ?JOB_ID, #{}, Now); {ok, _JobData} -> ok end. -accept_job() -> - try couch_jobs:accept(?DB_EXPIRATION_JOB_TYPE) of - Resp -> - exit({exit_ok, Resp}) - catch - _:Reason -> - exit({exit_error, Reason}) +cleanup(false = _Enabled) -> + couch_log:info("~p : Not enabled waiting ...", [?MODULE]), + timer:sleep(?CHECK_ENABLED_MSEC), + exit(normal); + +cleanup(true = _Enabled) -> + couch_log:info("~p : enable, accepting ...", [?MODULE]), + Opts = #{max_sched_time => erlang:system_time(second), timeout => 1}, + case couch_jobs:accept(?JOB_TYPE, Opts) of + % maybe handle timeout here, need to check the api + {ok, Job, Data} -> + try + couch_log:error("~p : processing expirations ~p ~p", [?MODULE, Job, Data]), + {ok, Job1, Data1} = process_expirations(Job, Data), + couch_log:error("~p : resubmitting job ~p ~p", [?MODULE, Job, schedule_sec()]), + ok = resubmit_job(Job1, Data1, schedule_sec()) + catch + _Tag:Error -> + Stack = erlang:get_stacktrace(), + couch_log:error("~p : processing error ~p ~p ~p", [?MODULE, Job, Error, Stack]), + ok = resubmit_job(Job, Data, ?ERROR_RESCHEDULE_SEC), + exit({job_error, Error, Stack}) + end; + {error, not_found} -> + couch_log:error("~p : not found error", [?MODULE]), + timer:sleep(1000), + cleanup(is_enabled()) end. -process_expiration() -> + +resubmit_job(Job, Data, After) -> + Now = erlang:system_time(second), + SchedTime = Now + After, + couch_jobs:finish(undefined, Job, Data), + couch_jobs:add(undefined, ?JOB_TYPE, ?JOB_ID, Data, SchedTime), + ok. + %% couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + %% {ok, Job1} = couch_jobs:resubmit(JTx, Job, SchedTime), + %% ok = couch_jobs:finish(JTx, Job1, Data) + %% end), + %% ok. + + +process_expirations(#{} = Job, #{} = Data) -> + % Maybe periodically update the job so it doesn't expire Callback = fun(Value, Acc) -> NewAcc = case Value of {meta, _} -> Acc; @@ -144,7 +193,8 @@ process_expiration() -> end, {ok, NewAcc} end, - {ok, _Infos} = fabric2_db:list_deleted_dbs_info(Callback, [], []). + {ok, _Infos} = fabric2_db:list_deleted_dbs_info(Callback, [], []), + {ok, Job, Data}. process_row(Acc, DbInfo) -> diff --git a/src/fabric/src/fabric2_sup.erl b/src/fabric/src/fabric2_sup.erl index 589b70a9c..8faa90dc8 100644 --- a/src/fabric/src/fabric2_sup.erl +++ b/src/fabric/src/fabric2_sup.erl @@ -33,7 +33,7 @@ start_link(Args) -> init([]) -> config:enable_feature(fdb), - Flags = {one_for_one, 1, 5}, + Flags = {one_for_all, 1, 1}, % lowered for debugging Children = [ { fabric2_server, |