-rw-r--r--  src/fabric/include/fabric2.hrl            |   1 +
-rw-r--r--  src/fabric/src/fabric2_db_expiration.erl  | 246 +
-rw-r--r--  src/fabric/src/fabric2_sup.erl            |  10 +-
-rw-r--r--  src/fabric/test/fabric2_db_crud_tests.erl | 104 +-
4 files changed, 358 insertions(+), 3 deletions(-)
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index 27f3d61c2..2e71787c3 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -13,6 +13,7 @@
-define(uint2bin(I), binary:encode_unsigned(I, little)).
-define(bin2uint(I), binary:decode_unsigned(I, little)).
+-define(bin2int(V), binary_to_integer(V)).
-define(METADATA_VERSION_KEY, <<16#FF, "/metadataVersion">>).
% Prefix Definitions
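
For reference, the helper macros above are thin wrappers: ?uint2bin/?bin2uint call binary:encode_unsigned/2 and binary:decode_unsigned/2 with little-endian byte order, and the new ?bin2int is just binary_to_integer/1. A short illustrative shell session (not part of the patch):

    %% Illustration only
    1> binary:encode_unsigned(1000, little).
    <<232,3>>
    2> binary:decode_unsigned(<<232,3>>, little).
    1000
    3> binary_to_integer(<<"2021">>).
    2021
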
diff --git a/src/fabric/src/fabric2_db_expiration.erl b/src/fabric/src/fabric2_db_expiration.erl
new file mode 100644
index 000000000..3363d2427
--- /dev/null
+++ b/src/fabric/src/fabric2_db_expiration.erl
@@ -0,0 +1,246 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_db_expiration).
+
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0,
+ cleanup/1,
+ process_expirations/2
+]).
+
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+-define(JOB_TYPE, <<"db_expiration">>).
+-define(JOB_ID, <<"db_expiration_job">>).
+-define(DEFAULT_JOB_VERSION, 1).
+-define(DEFAULT_RETENTION_SEC, 172800). % 48 hours
+-define(DEFAULT_SCHEDULE_SEC, 3600). % 1 hour
+-define(ERROR_RESCHEDULE_SEC, 5).
+-define(CHECK_ENABLED_SEC, 2).
+-define(JOB_TIMEOUT_SEC, 30).
+
+
+-record(st, {
+ job
+}).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+ process_flag(trap_exit, true),
+ {ok, #st{job = undefined}, 0}.
+
+
+terminate(_M, _St) ->
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(timeout, #st{job = undefined} = St) ->
+ ok = wait_for_couch_jobs_app(),
+ ok = couch_jobs:set_type_timeout(?JOB_TYPE, ?JOB_TIMEOUT_SEC),
+ ok = maybe_add_job(),
+ Pid = spawn_link(?MODULE, cleanup, [is_enabled()]),
+ {noreply, St#st{job = Pid}};
+
+handle_info({'EXIT', Pid, Exit}, #st{job = Pid} = St) ->
+ case Exit of
+ normal -> ok;
+ Error -> couch_log:error("~p : job error ~p", [?MODULE, Error])
+ end,
+ NewPid = spawn_link(?MODULE, cleanup, [is_enabled()]),
+ {noreply, St#st{job = NewPid}};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+wait_for_couch_jobs_app() ->
+ % Because of a circular dependency between couch_jobs and fabric apps, wait
+ % for couch_jobs to initialize before continuing. If we refactor the
+ % common FDB utilities out we can remove this bit of code.
+ case lists:keysearch(couch_jobs, 1, application:which_applications()) of
+ {value, {couch_jobs, _, _}} ->
+ ok;
+ false ->
+ timer:sleep(100),
+ wait_for_couch_jobs_app()
+ end.
+
+
+maybe_add_job() ->
+ case couch_jobs:get_job_data(undefined, ?JOB_TYPE, job_id()) of
+ {error, not_found} ->
+ Now = erlang:system_time(second),
+ ok = couch_jobs:add(undefined, ?JOB_TYPE, job_id(), #{}, Now);
+ {ok, _JobData} ->
+ ok
+ end.
+
+
+cleanup(false) ->
+ timer:sleep(?CHECK_ENABLED_SEC * 1000),
+ exit(normal);
+
+cleanup(true) ->
+ Now = erlang:system_time(second),
+ ScheduleSec = schedule_sec(),
+ Opts = #{max_sched_time => Now + min(ScheduleSec div 3, 15)},
+ case couch_jobs:accept(?JOB_TYPE, Opts) of
+ {ok, Job, Data} ->
+ try
+ {ok, Job1, Data1} = ?MODULE:process_expirations(Job, Data),
+ ok = resubmit_job(Job1, Data1, schedule_sec())
+ catch
+ _Tag:Error:Stack ->
+ couch_log:error("~p : processing error ~p ~p ~p",
+ [?MODULE, Job, Error, Stack]),
+ ok = resubmit_job(Job, Data, ?ERROR_RESCHEDULE_SEC),
+ exit({job_error, Error, Stack})
+ end;
+ {error, not_found} ->
+ timer:sleep(?CHECK_ENABLED_SEC * 1000),
+ ?MODULE:cleanup(is_enabled())
+ end.
+
+
+resubmit_job(Job, Data, After) ->
+ Now = erlang:system_time(second),
+ SchedTime = Now + After,
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ {ok, Job1} = couch_jobs:resubmit(JTx, Job, SchedTime),
+ ok = couch_jobs:finish(JTx, Job1, Data)
+ end),
+ ok.
+
+
+process_expirations(#{} = Job, #{} = Data) ->
+ Start = now_sec(),
+ Callback = fun(Value, LastUpdateAt) ->
+ case Value of
+ {meta, _} -> ok;
+ {row, DbInfo} -> process_row(DbInfo);
+ complete -> ok
+ end,
+ {ok, maybe_report_progress(Job, LastUpdateAt)}
+ end,
+ {ok, _Infos} = fabric2_db:list_deleted_dbs_info(
+ Callback,
+ Start,
+ [{restart_tx, true}]
+ ),
+ {ok, Job, Data}.
+
+
+process_row(DbInfo) ->
+ DbName = proplists:get_value(db_name, DbInfo),
+ TimeStamp = proplists:get_value(timestamp, DbInfo),
+ Now = now_sec(),
+ Retention = retention_sec(),
+ Since = Now - Retention,
+ case Since >= timestamp_to_sec(TimeStamp) of
+ true ->
+ couch_log:notice("Permanently deleting ~p database with"
+ " timestamp ~p", [DbName, TimeStamp]),
+ ok = fabric2_db:delete(DbName, [{deleted_at, TimeStamp}]);
+ false ->
+ ok
+ end.
+
+
+maybe_report_progress(Job, LastUpdateAt) ->
+ % Periodically update the job so it doesn't expire
+ Now = now_sec(),
+ Progress = #{
+ <<"processed_at">> => Now
+ },
+ case (Now - LastUpdateAt) > (?JOB_TIMEOUT_SEC div 2) of
+ true ->
+ couch_jobs:update(undefined, Job, Progress),
+ Now;
+ false ->
+ LastUpdateAt
+ end.
+
+
+job_id() ->
+ JobVersion = job_version(),
+ <<?JOB_ID/binary, "-", JobVersion:16/integer>>.
+
+
+now_sec() ->
+ Now = os:timestamp(),
+ Nowish = calendar:now_to_universal_time(Now),
+ calendar:datetime_to_gregorian_seconds(Nowish).
+
+
+timestamp_to_sec(TimeStamp) ->
+ <<Year:4/binary, "-", Month:2/binary, "-", Day:2/binary,
+ "T",
+ Hour:2/binary, ":", Minutes:2/binary, ":", Second:2/binary,
+ "Z">> = TimeStamp,
+
+ calendar:datetime_to_gregorian_seconds(
+ {{?bin2int(Year), ?bin2int(Month), ?bin2int(Day)},
+ {?bin2int(Hour), ?bin2int(Minutes), ?bin2int(Second)}}
+ ).
+
+
+is_enabled() ->
+ config:get_boolean("couchdb", "db_expiration_enabled", false).
+
+
+job_version() ->
+ config:get_integer("couchdb", "db_expiration_job_version",
+ ?DEFAULT_JOB_VERSION).
+
+
+retention_sec() ->
+ config:get_integer("couchdb", "db_expiration_retention_sec",
+ ?DEFAULT_RETENTION_SEC).
+
+
+schedule_sec() ->
+ config:get_integer("couchdb", "db_expiration_schedule_sec",
+ ?DEFAULT_SCHEDULE_SEC).
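
As a usage sketch (assuming a remsh into a running node; the values below are arbitrary examples, not defaults), the module is driven entirely by the config keys it reads above, so enabling and tuning it amounts to the same kind of config:set/4 calls the tests below use:

    %% Illustration only: enable cleanup, keep deleted databases for 24 hours,
    %% and scan for expired ones every 30 minutes.
    ok = config:set("couchdb", "db_expiration_enabled", "true", false),
    ok = config:set("couchdb", "db_expiration_retention_sec", "86400", false),
    ok = config:set("couchdb", "db_expiration_schedule_sec", "1800", false).

One design detail worth noting: now_sec/0 and timestamp_to_sec/1 both return Gregorian seconds (via calendar:datetime_to_gregorian_seconds/1), so the retention comparison in process_row/1 is internally consistent, while the couch_jobs scheduling times use erlang:system_time(second); the two time scales are never mixed.
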
diff --git a/src/fabric/src/fabric2_sup.erl b/src/fabric/src/fabric2_sup.erl
index e8201b4ee..874a8c240 100644
--- a/src/fabric/src/fabric2_sup.erl
+++ b/src/fabric/src/fabric2_sup.erl
@@ -30,7 +30,7 @@ start_link(Args) ->
init([]) ->
config:enable_feature(fdb),
- Flags = {one_for_one, 1, 5},
+ Flags = {rest_for_one, 1, 5},
Children = [
{
fabric2_server,
@@ -55,6 +55,14 @@ init([]) ->
5000,
worker,
[fabric2_index]
+ },
+ {
+ fabric2_db_expiration,
+ {fabric2_db_expiration, start_link, []},
+ permanent,
+ 5000,
+ worker,
+ [fabric2_db_expiration]
}
],
ChildrenWithEpi = couch_epi:register_service(fabric2_epi, Children),
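
The switch from one_for_one to rest_for_one is what lets the new worker assume its siblings are alive: under rest_for_one, when a child dies the supervisor also restarts every child started after it, so fabric2_db_expiration (appended last) is restarted whenever fabric2_server is. A toy sketch of the semantics, using hypothetical a_server/b_server modules (not part of the patch):

    %% Illustration only: rest_for_one in a minimal supervisor. If 'a' crashes,
    %% 'b' is restarted as well; if 'b' crashes, 'a' keeps running.
    -module(rest_for_one_demo).
    -behaviour(supervisor).
    -export([start_link/0, init/1]).

    start_link() ->
        supervisor:start_link({local, ?MODULE}, ?MODULE, []).

    init([]) ->
        Flags = {rest_for_one, 1, 5},
        Children = [
            {a, {a_server, start_link, []}, permanent, 5000, worker, [a_server]},
            {b, {b_server, start_link, []}, permanent, 5000, worker, [b_server]}
        ],
        {ok, {Flags, Children}}.
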
diff --git a/src/fabric/test/fabric2_db_crud_tests.erl b/src/fabric/test/fabric2_db_crud_tests.erl
index 9deb8dd26..b1e15aa2e 100644
--- a/src/fabric/test/fabric2_db_crud_tests.erl
+++ b/src/fabric/test/fabric2_db_crud_tests.erl
@@ -13,6 +13,7 @@
-module(fabric2_db_crud_tests).
+-include_lib("fabric/include/fabric2.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("eunit/include/eunit.hrl").
-include("fabric2_test.hrl").
@@ -39,6 +40,8 @@ crud_test_() ->
?TDEF_FE(recreate_db),
?TDEF_FE(undelete_db),
?TDEF_FE(remove_deleted_db),
+ ?TDEF_FE(scheduled_remove_deleted_db),
+ ?TDEF_FE(scheduled_remove_deleted_dbs),
?TDEF_FE(old_db_handle),
?TDEF_FE(list_dbs),
?TDEF_FE(list_dbs_user_fun),
@@ -59,14 +62,42 @@ crud_test_() ->
}.
+scheduled_db_remove_error_test_() ->
+ {
+ "Test scheduled database remove operations",
+ {
+ setup,
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun cleanup/1,
+ [
+ ?TDEF_FE(scheduled_remove_deleted_dbs_with_error)
+ ]
+ }
+ }
+ }.
+
+
setup_all() ->
- Ctx = test_util:start_couch([fabric]),
+ meck:new(config, [passthrough]),
+ meck:expect(config, get_integer, fun
+ ("couchdb", "db_expiration_schedule_sec", _) -> 2;
+ ("couchdb", "db_expiration_retention_sec", _) -> 0;
+ (_, _, Default) -> Default
+ end),
+ Ctx = test_util:start_couch([fabric, couch_jobs]),
meck:new(erlfdb, [passthrough]),
+ meck:new(fabric2_db_expiration, [passthrough]),
Ctx.
teardown_all(Ctx) ->
- meck:unload(),
+ meck:unload(erlfdb),
+ meck:unload(config),
+ meck:unload(fabric2_db_expiration),
test_util:stop_couch(Ctx).
@@ -75,9 +106,12 @@ setup() ->
cleanup(_) ->
+ ok = config:set("couchdb", "db_expiration_enabled", "false", false),
ok = config:set("couchdb", "enable_database_recovery", "false", false),
fabric2_test_util:tx_too_old_reset_errors(),
reset_fail_erfdb_wait(),
+ meck:reset([fabric2_db_expiration]),
+ meck:reset([config]),
meck:reset([erlfdb]).
@@ -205,6 +239,60 @@ remove_deleted_db(_) ->
?assert(not lists:member(DbName, DeletedDbs)).
+scheduled_remove_deleted_db(_) ->
+ ok = config:set("couchdb", "db_expiration_enabled", "true", false),
+ ok = config:set("couchdb", "enable_database_recovery", "true", false),
+ DbName = ?tempdb(),
+ ?assertError(database_does_not_exist, fabric2_db:delete(DbName, [])),
+
+ ?assertMatch({ok, _}, fabric2_db:create(DbName, [])),
+ ?assertEqual(true, ets:member(fabric2_server, DbName)),
+
+ ?assertEqual(ok, fabric2_db:delete(DbName, [])),
+ ?assertEqual(false, ets:member(fabric2_server, DbName)),
+
+ meck:wait(fabric2_db_expiration, process_expirations, '_', 5000),
+
+ {ok, Infos} = fabric2_db:list_deleted_dbs_info(),
+ DeletedDbs = [proplists:get_value(db_name, Info) || Info <- Infos],
+ ?assert(not lists:member(DbName, DeletedDbs)).
+
+
+scheduled_remove_deleted_dbs(_) ->
+ ok = config:set("couchdb", "db_expiration_enabled", "true", false),
+ ok = config:set("couchdb", "db_expiration_batch", "2", false),
+ ok = config:set("couchdb", "enable_database_recovery", "true", false),
+ DbNameList = [create_and_delete_db() || _I <- lists:seq(1, 5)],
+ meck:wait(fabric2_db_expiration, process_expirations, '_', 5000),
+
+ {ok, Infos} = fabric2_db:list_deleted_dbs_info(),
+ DeletedDbs = [proplists:get_value(db_name, Info) || Info <- Infos],
+ lists:map(fun(DbName) ->
+ ?assert(not lists:member(DbName, DeletedDbs))
+ end, DbNameList).
+
+
+scheduled_remove_deleted_dbs_with_error(_) ->
+ meck:expect(fabric2_db_expiration, process_expirations, fun(_, _) ->
+ throw(process_expirations_error)
+ end),
+
+ {Pid, Ref} = spawn_monitor(fun() ->
+ fabric2_db_expiration:cleanup(true)
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, Error} ->
+ ?assertMatch({job_error, process_expirations_error, _}, Error)
+ end,
+ JobType = <<"db_expiration">>,
+ JobId = <<"db_expiration_job">>,
+ FQJobId = <<JobId/binary, "-", 1:16/integer>>,
+
+ ?assertMatch({ok, _}, couch_jobs:get_job_data(undefined, JobType, FQJobId)),
+ {ok, JobState} = couch_jobs:get_job_state(undefined, JobType, FQJobId),
+ ?assert(lists:member(JobState, [pending, running])).
+
+
old_db_handle(_) ->
% db hard deleted
DbName1 = ?tempdb(),
@@ -615,3 +703,15 @@ get_deleted_dbs(DeletedDbInfos) ->
DbName = fabric2_util:get_value(db_name, DbInfo),
[DbName | Acc]
end, [], DeletedDbInfos).
+
+
+create_and_delete_db() ->
+ DbName = ?tempdb(),
+ ?assertError(database_does_not_exist, fabric2_db:delete(DbName, [])),
+
+ ?assertMatch({ok, _}, fabric2_db:create(DbName, [])),
+ ?assertEqual(true, ets:member(fabric2_server, DbName)),
+
+ ?assertEqual(ok, fabric2_db:delete(DbName, [])),
+ ?assertEqual(false, ets:member(fabric2_server, DbName)),
+ DbName.
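

For completeness, a hedged sketch of inspecting the expiration job by hand, mirroring the couch_jobs calls used in scheduled_remove_deleted_dbs_with_error above (assuming a remsh on a node where the job has already been added):

    %% Illustration only: the fully qualified job id is the ?JOB_ID prefix plus
    %% the 16-bit job version, exactly as job_id/0 builds it.
    JobType = <<"db_expiration">>,
    FQJobId = <<"db_expiration_job", "-", 1:16/integer>>,
    {ok, _Data} = couch_jobs:get_job_data(undefined, JobType, FQJobId),
    %% A job that keeps resubmitting itself is typically pending or running.
    {ok, State} = couch_jobs:get_job_state(undefined, JobType, FQJobId).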