summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-05-01 20:06:54 -0400
committerjiangph <jiangph@cn.ibm.com>2020-05-11 14:46:58 +0800
commit2c9d219514d74fb2cc1ca3e2262d10ea5953c4a2 (patch)
tree0a67ecc2d7167f49f33b0eb6f4f86fac4798a4b2
parenteacf2b65ad2efb66f5c026af26996b5737c4767c (diff)
downloadcouchdb-2c9d219514d74fb2cc1ca3e2262d10ea5953c4a2.tar.gz
Update
-rw-r--r--src/fabric/include/fabric2.hrl9
-rw-r--r--src/fabric/src/fabric2_db_expiration.erl148
-rw-r--r--src/fabric/src/fabric2_sup.erl2
3 files changed, 100 insertions, 59 deletions
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 868bd0f72..4bc6b764a 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -83,12 +83,3 @@
-define(DEFAULT_BINARY_CHUNK_SIZE, 100000).
-define(BINARY_CHUNK_SIZE, 100000).
-
-% jobs api
--define(DB_EXPIRATION_JOB_TYPE, <<"dbexpiration">>).
--define(DB_EXPIRATION_JOB, <<"dbexpiration_job">>).
-
-% settings for background database expiration deletion
--define(DEFAULT_RETENTION_SEC, 172800). % 48 hours
--define(DEFAULT_EXPIRATION_BATCH, 100).
--define(DEFAULT_SCHEDULE_SEC, 3600). % 1 hour
diff --git a/src/fabric/src/fabric2_db_expiration.erl b/src/fabric/src/fabric2_db_expiration.erl
index bf1037cee..f52d8bca3 100644
--- a/src/fabric/src/fabric2_db_expiration.erl
+++ b/src/fabric/src/fabric2_db_expiration.erl
@@ -18,7 +18,7 @@
-export([
start_link/0,
- accept_job/0
+ cleanup/1
]).
-export([
@@ -35,17 +35,29 @@
-include_lib("fabric/include/fabric2.hrl").
+-define(JOB_TYPE, <<"dbexpiration">>).
+-define(JOB_ID, <<"dbexpiration_job">>).
+-define(DEFAULT_RETENTION_SEC, 172800). % 48 hours
+-define(DEFAULT_EXPIRATION_BATCH, 100).
+-define(DEFAULT_SCHEDULE_SEC, 7). % 1 hour
+-define(ERROR_RESCHEDULE_SEC, 3).
+-define(INITIAL_DELAY_MSEC, 900).
+-define(CHECK_ENABLED_MSEC, 2000).
+-define(JOB_TIMEOUT_SEC, 30).
+
+
-record(st, {
- acceptor
+ job
}).
start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+ gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
init(_) ->
- {ok, nil, 5000}.
+ couch_log:info("~p : in init", [?MODULE]),
+ {ok, #st{job = undefined}, ?INITIAL_DELAY_MSEC}.
terminate(_M, _St) ->
@@ -60,27 +72,27 @@ handle_cast(Msg, St) ->
{stop, {bad_cast, Msg}, St}.
-handle_info(timeout, St) ->
- case wait_for_couch_jobs_app() of
- ok -> ok;
- retry -> wait_for_couch_jobs_app()
- end,
- couch_jobs:set_type_timeout(?DB_EXPIRATION_JOB_TYPE, 6),
- add_or_get_job(),
- {_Pid, Ref} = spawn_monitor(?MODULE, accept_job, []),
- {noreply, St#st{acceptor = Ref}};
-handle_info({'DOWN', Ref, process, _Pid, {exit_ok, Resp}}, #st{acceptor=Ref} = St) ->
- case is_enabled() of
- true ->
- process_expiration();
- false ->
- ok
+handle_info(timeout, #st{job = undefined} = St) ->
+ couch_log:info("~p : got timeout", [?JOB_ID]),
+ ok = wait_for_couch_jobs_app(),
+ ok = couch_jobs:set_type_timeout(?JOB_TYPE, ?JOB_TIMEOUT_SEC),
+ % for testing
+ Removed = couch_jobs:remove(undefined, ?JOB_TYPE, ?JOB_ID),
+ couch_log:info("~p : remove result ~p", [?MODULE, Removed]),
+ ok = maybe_add_job(),
+ couch_log:info("~p : added job ~p, initialized", [?MODULE, ?JOB_ID]),
+ {_, Ref} = spawn_monitor(?MODULE, cleanup, [is_enabled()]),
+ {noreply, St#st{job = Ref}};
+
+handle_info({'DOWN', Ref, _, _, Exit}, #st{job = Ref} = St) ->
+ couch_log:info("~p : job finished with ~p", [?MODULE, Exit]),
+ case Exit of
+ normal -> ok;
+ Error -> couch_log:error("~p : job error ~p", [?MODULE, Error])
end,
- Now = erlang:system_time(second),
- {ok, Job, JobData} = Resp,
- couch_jobs:resubmit(undefined, Job, Now + schedule_sec(), JobData),
- {_Pid, Ref} = spawn_monitor(?MODULE, accept_job, []),
- {noreply, St#st{acceptor = Ref}};
+ {_, Ref} = spawn_monitor(?MODULE, cleanup, [is_enabled()]),
+ {noreply, St#st{job = Ref}};
+
handle_info(Msg, St) ->
{stop, {bad_info, Msg}, St}.
@@ -90,44 +102,81 @@ code_change(_OldVsn, St, _Extra) ->
wait_for_couch_jobs_app() ->
- case lists:keysearch(couch_jobs, 1, application:which_applications()) of
- {value, {couch_jobs, _Value}} -> ok;
+ % Because of a circular dependency between couch_jobs and fabric apps, wait
+ % for couch_jobs to initialize before continuing. If we refactor the
+ % commits FDB utilities out we can remove this bit of code.
+ Apps = try
+ application:which_applications(1000)
+ catch
+ _:_ -> []
+ end,
+ case lists:keysearch(couch_jobs, 1, Apps) of
+ {value, {couch_jobs, _, _}} ->
+ couch_log:info("~p : couch_jobs started! ", [?MODULE]),
+ ok;
false ->
timer:sleep(1000),
- retry
+ couch_log:info("~p : waiting for couch jobs", [?MODULE]),
+ wait_for_couch_jobs_app()
end.
-add_or_get_job() ->
- couch_jobs:set_type_timeout(?DB_EXPIRATION_JOB_TYPE, 6),
- case couch_jobs:get_job_data(
- undefined,
- ?DB_EXPIRATION_JOB_TYPE,
- ?DB_EXPIRATION_JOB
- ) of
+maybe_add_job() ->
+ case couch_jobs:get_job_data(undefined, ?JOB_TYPE, ?JOB_ID) of
{error, not_found} ->
- couch_jobs:add(
- undefined,
- ?DB_EXPIRATION_JOB_TYPE,
- ?DB_EXPIRATION_JOB,
- #{}
- );
+ Now = erlang:system_time(second),
+ ok = couch_jobs:add(undefined, ?JOB_TYPE, ?JOB_ID, #{}, Now);
{ok, _JobData} ->
ok
end.
-accept_job() ->
- try couch_jobs:accept(?DB_EXPIRATION_JOB_TYPE) of
- Resp ->
- exit({exit_ok, Resp})
- catch
- _:Reason ->
- exit({exit_error, Reason})
+cleanup(false = _Enabled) ->
+ couch_log:info("~p : Not enabled waiting ...", [?MODULE]),
+ timer:sleep(?CHECK_ENABLED_MSEC),
+ exit(normal);
+
+cleanup(true = _Enabled) ->
+ couch_log:info("~p : enable, accepting ...", [?MODULE]),
+ Opts = #{max_sched_time => erlang:system_time(second), timeout => 1},
+ case couch_jobs:accept(?JOB_TYPE, Opts) of
+ % maybe handle timeout here, need to check the api
+ {ok, Job, Data} ->
+ try
+ couch_log:error("~p : processing expirations ~p ~p", [?MODULE, Job, Data]),
+ {ok, Job1, Data1} = process_expirations(Job, Data),
+ couch_log:error("~p : resubmitting job ~p ~p", [?MODULE, Job, schedule_sec()]),
+ ok = resubmit_job(Job1, Data1, schedule_sec())
+ catch
+ _Tag:Error ->
+ Stack = erlang:get_stacktrace(),
+ couch_log:error("~p : processing error ~p ~p ~p", [?MODULE, Job, Error, Stack]),
+ ok = resubmit_job(Job, Data, ?ERROR_RESCHEDULE_SEC),
+ exit({job_error, Error, Stack})
+ end;
+ {error, not_found} ->
+ couch_log:error("~p : not found error", [?MODULE]),
+ timer:sleep(1000),
+ cleanup(is_enabled())
end.
-process_expiration() ->
+
+resubmit_job(Job, Data, After) ->
+ Now = erlang:system_time(second),
+ SchedTime = Now + After,
+ couch_jobs:finish(undefined, Job, Data),
+ couch_jobs:add(undefined, ?JOB_TYPE, ?JOB_ID, Data, SchedTime),
+ ok.
+ %% couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ %% {ok, Job1} = couch_jobs:resubmit(JTx, Job, SchedTime),
+ %% ok = couch_jobs:finish(JTx, Job1, Data)
+ %% end),
+ %% ok.
+
+
+process_expirations(#{} = Job, #{} = Data) ->
+ % Maybe periodically update the job so it doesn't expire
Callback = fun(Value, Acc) ->
NewAcc = case Value of
{meta, _} -> Acc;
@@ -144,7 +193,8 @@ process_expiration() ->
end,
{ok, NewAcc}
end,
- {ok, _Infos} = fabric2_db:list_deleted_dbs_info(Callback, [], []).
+ {ok, _Infos} = fabric2_db:list_deleted_dbs_info(Callback, [], []),
+ {ok, Job, Data}.
process_row(Acc, DbInfo) ->
diff --git a/src/fabric/src/fabric2_sup.erl b/src/fabric/src/fabric2_sup.erl
index 589b70a9c..8faa90dc8 100644
--- a/src/fabric/src/fabric2_sup.erl
+++ b/src/fabric/src/fabric2_sup.erl
@@ -33,7 +33,7 @@ start_link(Args) ->
init([]) ->
config:enable_feature(fdb),
- Flags = {one_for_one, 1, 5},
+ Flags = {one_for_all, 1, 1}, % lowered for debugging
Children = [
{
fabric2_server,