From 4a904b555a4d5f6bea178583357ff5bbcab7e181 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Thu, 16 May 2019 19:11:50 -0400 Subject: CouchDB background jobs Done: - README file to describe each module - Supervisor and app structures - Main API module (couch_jobs.erl) - FDB read/write code in couch_jobs_fdb.erl - All jobs creation API: add(), remove(), submit(), get_job() - Workers API: finish(), resubmit(), update() and accept() - Directory path caching with metadata version checks - Activity monitor - Type monitor (start notifiers and activity monitors when new types detected) - Job state subscription: subscribe(), unsubscribe() - Tests. 90+% coverage Todo: - Example worker (maybe part of testing) --- rebar.config.script | 1 + rel/reltool.config | 2 + src/couch_jobs/.gitignore | 4 + src/couch_jobs/README.md | 66 +++ src/couch_jobs/rebar.config | 14 + src/couch_jobs/src/couch_jobs.app.src | 31 ++ src/couch_jobs/src/couch_jobs.erl | 300 +++++++++++ src/couch_jobs/src/couch_jobs.hrl | 51 ++ src/couch_jobs/src/couch_jobs_activity_monitor.erl | 137 +++++ .../src/couch_jobs_activity_monitor_sup.erl | 64 +++ src/couch_jobs/src/couch_jobs_app.erl | 26 + src/couch_jobs/src/couch_jobs_fdb.erl | 565 +++++++++++++++++++ src/couch_jobs/src/couch_jobs_notifier.erl | 218 ++++++++ src/couch_jobs/src/couch_jobs_notifier_sup.erl | 64 +++ src/couch_jobs/src/couch_jobs_pending.erl | 159 ++++++ src/couch_jobs/src/couch_jobs_server.erl | 184 +++++++ src/couch_jobs/src/couch_jobs_sup.erl | 66 +++ src/couch_jobs/src/couch_jobs_type_monitor.erl | 78 +++ src/couch_jobs/test/couch_jobs_tests.erl | 597 +++++++++++++++++++++ 19 files changed, 2627 insertions(+) create mode 100644 src/couch_jobs/.gitignore create mode 100644 src/couch_jobs/README.md create mode 100644 src/couch_jobs/rebar.config create mode 100644 src/couch_jobs/src/couch_jobs.app.src create mode 100644 src/couch_jobs/src/couch_jobs.erl create mode 100644 src/couch_jobs/src/couch_jobs.hrl create mode 100644 src/couch_jobs/src/couch_jobs_activity_monitor.erl create mode 100644 src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl create mode 100644 src/couch_jobs/src/couch_jobs_app.erl create mode 100644 src/couch_jobs/src/couch_jobs_fdb.erl create mode 100644 src/couch_jobs/src/couch_jobs_notifier.erl create mode 100644 src/couch_jobs/src/couch_jobs_notifier_sup.erl create mode 100644 src/couch_jobs/src/couch_jobs_pending.erl create mode 100644 src/couch_jobs/src/couch_jobs_server.erl create mode 100644 src/couch_jobs/src/couch_jobs_sup.erl create mode 100644 src/couch_jobs/src/couch_jobs_type_monitor.erl create mode 100644 src/couch_jobs/test/couch_jobs_tests.erl diff --git a/rebar.config.script b/rebar.config.script index 3b58bcb1e..2def72484 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -76,6 +76,7 @@ SubDirs = [ "src/couch_tests", "src/ddoc_cache", "src/fabric", + "src/couch_jobs", "src/global_changes", "src/mango", "src/rexi", diff --git a/rel/reltool.config b/rel/reltool.config index 1051d2e77..afebc4466 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -34,6 +34,7 @@ couch, couch_epi, couch_index, + couch_jobs, couch_log, couch_mrview, couch_plugins, @@ -90,6 +91,7 @@ {app, config, [{incl_cond, include}]}, {app, couch, [{incl_cond, include}]}, {app, couch_epi, [{incl_cond, include}]}, + {app, couch_jobs, [{incl_cond, include}]}, {app, couch_index, [{incl_cond, include}]}, {app, couch_log, [{incl_cond, include}]}, {app, couch_mrview, [{incl_cond, include}]}, diff --git a/src/couch_jobs/.gitignore b/src/couch_jobs/.gitignore new file mode 100644 index 000000000..6ef4c5212 --- /dev/null +++ b/src/couch_jobs/.gitignore @@ -0,0 +1,4 @@ +*.beam +.eunit +ebin/couch_jobs.app +.DS_Store \ No newline at end of file diff --git a/src/couch_jobs/README.md b/src/couch_jobs/README.md new file mode 100644 index 000000000..b0552b233 --- /dev/null +++ b/src/couch_jobs/README.md @@ -0,0 +1,66 @@ +CouchDB Jobs Application +======================== + +Run background jobs in CouchDB + +Design (RFC) discussion: https://github.com/apache/couchdb-documentation/pull/409/files + +This is a description of some of the modules: + + * `couch_jobs`: The main API module. It contains functions for creating, + accepting, executing, and monitoring jobs. A common pattern in this module + is to get a jobs transaction object (named `JTx` throughout the code), then + start a transaction and call a bunch of functions from `couch_jobs_fdb` in + that transaction. + + * `couch_jobs_fdb`: This is a layer that talks to FDB. There is a lot of tuple + packing and unpacking, reading ranges and also managing transaction objects. + + * `couch_jobs_pending`: This module implements the pending jobs queue. These + functions could all go in `couch_jobs_fdb` but the implemention was fairly + self-contained, with its own private helper functions, so it made sense to + move to a separate module. + + * `couch_jobs_activity_monitor`: Here is where the "activity monitor" + functionality is implemented. That's done with `gen_server` instance running for + each type. This `gen_server` periodically check if there are inactive jobs + for its type, and if they are, it re-enqueues them. If the timeout value + changes, then it skips the pending check, until the new timeout expires. + + * `couch_jobs_activity_monitor_sup` : This is a simple one-for-one supervisor + to spawn `couch_job_activity_monitor` instances for each type. + + * `couch_jobs_type_monitor` : This is a helper process meant to be + spawn_linked from a parent `gen_server` and use to monitor activity for a + particular job type. If any jobs of that type have an update it notifies the + parent process. To avoid firing for every single update, there is a + configurable `holdoff` parameter to wait a bit before notifying the parent + process. This functionality is used by the `couch_jobs_notifier` to drive + job state subscriptions. + + * `couch_jobs_notifier`: Is responsible for job state subscriptions. Just like + with activity monitors there is a `gen_server` instance of this running per + each type. It uses a linked `couch_jobs_type_monitor` process to wait for + any job updates. When an update notification arrives, it can efficiently + find out if any active jobs have been updated, by reading the `(?JOBS, + ?ACTIVITY, Type, Sequence)` range. That should account for the bulk of + changes. The jobs that are not active anymore, are queried individually. + Subscriptions are managed in an ordered set ETS table with the schema that + looks like `{{JobId, Ref}, {Fun, Pid, JobState}}`. The `Ref` is reference + used to track individual subscriptions and also used to monitor subscriber + processes in case they die (then it it auto-subscribes them). + + * `couch_jobs_notifier_sup`: A simple one-for-one supervisor to spawn + `couch_job_notifier` processes for each type. + + * `couch_jobs_server`: This is a `gen_server` which keeps track of job + types. It then starts or stops activity monitors and notifiers for each + type. To do that it queries the ` (?JOBS, ?ACTIVITY_TIMEOUT)` periodically. + + * `couch_jobs_sup`: This is the application supervisor. The restart strategy + is `rest_for_one`, meaning that a when a child restarts, the sibling + following it will restart. One interesting entry there is the first child + which is used just to create an ETS table used by `couch_jobs_fdb` to cache + transaction object (`JTx` mentioned above). That child calls `init_cache/0`, + where it creates the ETS then returns with `ignore` so it doesn't actually + spawn a process. The ETS table will be owned by the supervisor process. diff --git a/src/couch_jobs/rebar.config b/src/couch_jobs/rebar.config new file mode 100644 index 000000000..362c8785e --- /dev/null +++ b/src/couch_jobs/rebar.config @@ -0,0 +1,14 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{cover_enabled, true}. +{cover_print_enabled, true}. diff --git a/src/couch_jobs/src/couch_jobs.app.src b/src/couch_jobs/src/couch_jobs.app.src new file mode 100644 index 000000000..8ded14c6f --- /dev/null +++ b/src/couch_jobs/src/couch_jobs.app.src @@ -0,0 +1,31 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application, couch_jobs, [ + {description, "CouchDB Jobs"}, + {vsn, git}, + {mod, {couch_jobs_app, []}}, + {registered, [ + couch_jobs_sup, + couch_jobs_activity_monitor_sup, + couch_jobs_notifier_sup, + couch_jobs_server + ]}, + {applications, [ + kernel, + stdlib, + erlfdb, + couch_log, + config, + fabric + ]} +]}. diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl new file mode 100644 index 000000000..20cfb5de2 --- /dev/null +++ b/src/couch_jobs/src/couch_jobs.erl @@ -0,0 +1,300 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs). + +-export([ + add/3, + remove/2, + stop_and_remove/3, + resubmit/2, + resubmit/3, + get_job/2, + get_jobs/1, + get_jobs/0, + + accept/1, + accept/2, + finish/5, + resubmit/5, + update/5, + + subscribe/2, + subscribe/3, + unsubscribe/1, + wait_job_state/2, + wait_job_state/3, + + set_type_timeout/2, + clear_type_timeout/1, + get_type_timeout/1 +]). + + +-include("couch_jobs.hrl"). + + +%% Job Creation API + +-spec add(job_type(), job_id(), job_opts()) -> ok | {error, any()}. +add(Type, JobId, JobOpts) -> + try validate_job_opts(JobOpts) of + ok -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:add(JTx, Type, JobId, JobOpts) + end) + catch + Tag:Err -> {error, {invalid_args, {Tag, Err}}} + end. + + +-spec remove(job_type(), job_id()) -> ok | not_found | canceled. +remove(Type, JobId) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:remove(JTx, Type, JobId) + end). + + +-spec stop_and_remove(job_type(), job_id(), timeout()) -> + ok | not_found | timeout. +stop_and_remove(Type, JobId, Timeout) -> + case remove(Type, JobId) of + not_found -> + not_found; + ok -> + ok; + canceled -> + case subscribe(Type, JobId) of + not_found -> + not_found; + finished -> + ok = remove(Type, JobId); + {ok, SubId, _JobState} -> + case wait_job_state(SubId, finished, Timeout) of + timeout -> + timeout; + {Type, JobId, finished} -> + ok = remove(Type, JobId) + end + end + end. + + +-spec resubmit(job_type(), job_id()) -> ok | not_found. +resubmit(Type, JobId) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:resubmit(JTx, Type, JobId, undefined) + end). + + +-spec resubmit(job_type(), job_id(), job_priority()) -> ok | not_found. +resubmit(Type, JobId, NewPriority) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:resubmit(JTx, Type, JobId, NewPriority) + end). + + +-spec get_job(job_type(), job_id()) -> {ok, job_opts(), job_state()} + | not_found. +get_job(Type, JobId) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:get_job(JTx, Type, JobId) + end). + + +-spec get_jobs(job_type()) -> [{job_id(), job_state(), job_opts()}]. +get_jobs(Type) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:get_jobs(JTx, Type) + end). + + +-spec get_jobs() -> [{job_type(), job_id(), job_state(), job_opts()}]. +get_jobs() -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:get_jobs(JTx) + end). + + +%% Worker Implementation API + +-spec accept(job_type()) -> {ok, job_id(), worker_lock()} | not_found. +accept(Type) -> + accept(Type, undefined). + + +-spec accept(job_type(), job_priority()) -> {ok, job_id(), worker_lock()} + | not_found. +accept(Type, MaxPriority) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:accept(JTx, Type, MaxPriority) + end). + + +-spec finish(jtx(), job_type(), job_id(), job_opts(), worker_lock()) -> ok | + worker_conflict | no_return(). +finish(Tx, Type, JobId, JobOpts, WorkerLockId) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + couch_jobs_fdb:finish(JTx, Type, JobId, JobOpts, WorkerLockId) + end). + + +-spec resubmit(jtx(), job_type(), job_id(), job_priority() | undefined, + worker_lock()) -> ok | worker_conflict | canceled | no_return(). +resubmit(Tx, Type, JobId, NewPriority, WorkerLockId) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + couch_jobs_fdb:resubmit(JTx, Type, JobId, NewPriority, WorkerLockId) + end). + + +-spec update(jtx(), job_type(), job_id(), job_opts(), worker_lock()) -> ok | + worker_conflict | canceled | no_return(). +update(Tx, Type, JobId, JobOpts, WorkerLockId) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + couch_jobs_fdb:update(JTx, Type, JobId, JobOpts, WorkerLockId) + end). + + +%% Subscription API + +% Receive events as messages. Wait for them using `wait_job_state/2,3` +% functions. +% +-spec subscribe(job_type(), job_id()) -> {ok, job_subscription(), job_state()} + | not_found | {error, any()}. +subscribe(Type, JobId) -> + case couch_jobs_server:get_notifier_server(Type) of + {ok, Server} -> + case couch_jobs_notifier:subscribe(Server, JobId, self()) of + {Ref, JobState} -> {ok, {Server, Ref}, JobState}; + not_found -> not_found; + finished -> finished + end; + {error, Error} -> + {error, Error} + end. + + +% Receive events as callbacks. Callback arguments will be: +% Fun(SubscriptionRef, Type, JobId, JobState) +% +% Returns: +% - {ok, SubscriptionRef, CurrentState} where: +% - SubscriptionRef is opaque reference for the subscription +% - CurrentState is the current job state +% - `not_found` if job was not found +% - `finished` if job is already in the `finished` state +% +-spec subscribe(job_type(), job_id(), job_callback()) -> {ok, + job_subscription(), job_state()} | not_found | {error, any()}. +subscribe(Type, JobId, Fun) when is_function(Fun, 3) -> + case couch_jobs_server:get_notifier_server(Type) of + {ok, Server} -> + case couch_jobs_notifier:subscribe(Server, JobId, Fun, self()) of + {Ref, JobState} -> {ok, {Server, Ref}, JobState}; + not_found -> not_found; + finished -> finished + end; + {error, Error} -> + {error, Error} + end. + + +% Unsubscribe from getting notifications based on a particular subscription. +% Each subscription should be followed by its own unsubscription call. However, +% subscriber processes are also monitored and auto-unsubscribed if they exit. +% So the subscribing process is exiting, calling this function is optional. +% +-spec unsubscribe(job_subscription()) -> ok. +unsubscribe({Server, Ref}) when is_pid(Server), is_reference(Ref) -> + try + couch_jobs_notifier:unsubscribe(Server, Ref) + after + flush_notifications(Ref) + end. + + +% Wait to receive job state updates as messages. +% +-spec wait_job_state(job_subscription(), timeout()) -> {job_type(), job_id(), + job_state()} | timeout. +wait_job_state({_, Ref}, Timeout) -> + receive + {?COUCH_JOBS_EVENT, Ref, Type, JobId, JobState} -> + {Type, JobId, JobState} + after + Timeout -> timeout + end. + + +% Wait for a particular job state received as a message. +% +-spec wait_job_state(job_subscription(), job_state(), timeout()) -> + {job_type(), job_id(), job_state()} | timeout. +wait_job_state({_, Ref}, JobState, Timeout) -> + receive + {?COUCH_JOBS_EVENT, Ref, Type, JobId, JobState} -> + {Type, JobId, JobState} + after + Timeout -> timeout + end. + + +%% Job type timeout API + +% These functions manipulate the activity timeout for each job type. + +-spec set_type_timeout(job_type(), timeout()) -> ok. +set_type_timeout(Type, Timeout) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:set_type_timeout(JTx, Type, Timeout) + end). + + +-spec clear_type_timeout(job_type()) -> ok. +clear_type_timeout(Type) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:clear_type_timeout(JTx, Type) + end). + + +-spec get_type_timeout(job_type()) -> timeout(). +get_type_timeout(Type) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:get_type_timeout(JTx, Type) + end). + + +%% Private utilities + +validate_job_opts(#{} = JobOpts) -> + jiffy:encode(JobOpts), + case maps:get(?OPT_RESUBMIT, JobOpts, undefined) of + undefined -> ok; + true -> ok; + Resubmit -> error({invalid_resubmit, Resubmit, JobOpts}) + end, + case maps:get(?OPT_PRIORITY, JobOpts, undefined) of + undefined -> ok; + Binary when is_binary(Binary) -> ok; + Int when is_integer(Int), Int >= 0 -> ok; + Priority -> error({invalid_priority, Priority, JobOpts}) + end. + + +flush_notifications(Ref) -> + receive + {?COUCH_JOBS_EVENT, Ref, _, _, _} -> + flush_notifications(Ref) + after + 0 -> ok + end. diff --git a/src/couch_jobs/src/couch_jobs.hrl b/src/couch_jobs/src/couch_jobs.hrl new file mode 100644 index 000000000..dc3e7ffb1 --- /dev/null +++ b/src/couch_jobs/src/couch_jobs.hrl @@ -0,0 +1,51 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +% JobOpts map/json field definitions +% +-define(OPT_PRIORITY, <<"priority">>). +-define(OPT_DATA, <<"data">>). +-define(OPT_CANCEL, <<"cancel">>). +-define(OPT_RESUBMIT, <<"resubmit">>). + +% These might be in a fabric public hrl eventually +% +-define(uint2bin(I), binary:encode_unsigned(I, little)). +-define(bin2uint(I), binary:decode_unsigned(I, little)). +-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}). +-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>). + +% Data model definitions +% +-define(JOBS, 51). % coordinate with fabric2.hrl +-define(DATA, 1). +-define(PENDING, 2). +-define(WATCHES, 3). +-define(ACTIVITY_TIMEOUT, 4). +-define(ACTIVITY, 5). + + +% Couch jobs event notifier tag +-define(COUCH_JOBS_EVENT, '$couch_jobs_event'). + + +-type jtx() :: map(). +-type job_id() :: binary(). +-type job_type() :: tuple() | binary() | non_neg_integer(). +-type job_opts() :: map(). +-type job_state() :: running | pending | finished. +-type job_priority() :: tuple() | binary() | non_neg_integer(). +-type job_subscription() :: {pid(), reference()}. +-type job_callback() :: fun((job_subscription(), job_type(), job_id(), + job_state()) -> ok). +-type worker_lock() :: binary(). diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor.erl b/src/couch_jobs/src/couch_jobs_activity_monitor.erl new file mode 100644 index 000000000..07ff66aa2 --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_activity_monitor.erl @@ -0,0 +1,137 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_activity_monitor). + +-behaviour(gen_server). + + +-export([ + start_link/1 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + +-record(st, { + jtx, + type, + tref, + timeout = 0, + vs = not_found +}). + + +-define(MAX_JITTER_DEFAULT, 15000). +-define(MISSING_TIMEOUT_CHECK, 5000). + + +start_link(Type) -> + gen_server:start_link(?MODULE, [Type], []). + + +%% gen_server callbacks + +init([Type]) -> + St = #st{jtx = couch_jobs_fdb:get_jtx(), type = Type}, + {ok, schedule_check(St)}. + + +terminate(_, _St) -> + ok. + + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info(check_activity, St) -> + St1 = check_activity(St), + St2 = schedule_check(St1), + {noreply, St2}; + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +% Private helper functions + +check_activity(#st{vs = not_found} = St) -> + St#st{vs = get_vs(St)}; + +check_activity(#st{} = St) -> + NewVS = get_vs(St), + case get_inactive_since(St) of + [] -> ok; + [_ | _] -> re_enqueue_inactive(St) + end, + St#st{vs = NewVS}. + + +get_timeout_msec(JTx, Type) -> + TimeoutVal = couch_jobs_fdb:tx(JTx, fun(JTx1) -> + couch_jobs_fdb:get_type_timeout(JTx1, Type) + end), + case TimeoutVal of + not_found -> not_found; + ValSeconds -> timer:seconds(ValSeconds) + end. + + +schedule_check(#st{jtx = JTx, type = Type, timeout = OldTimeout} = St) -> + % Reset versionstamp if timeout changed. + St1 = case get_timeout_msec(JTx, Type) of + not_found -> St#st{vs = not_found, timeout = ?MISSING_TIMEOUT_CHECK}; + OldTimeout -> St; + NewTimeout -> St#st{vs = not_found, timeout = NewTimeout} + end, + #st{timeout = Timeout} = St1, + MaxJitter = max(Timeout div 2, get_max_jitter_msec()), + Wait = Timeout + rand:uniform(min(1, MaxJitter)), + St1#st{tref = erlang:send_after(Wait, self(), check_activity)}. + + +get_vs(#st{jtx = JTx, type = Type}) -> + couch_jobs_fdb:tx(JTx, fun(JTx1) -> + couch_jobs_fdb:get_activity_vs(JTx1, Type) + end). + + +re_enqueue_inactive(#st{jtx = JTx, type = Type, vs = VS}) -> + couch_jobs_fdb:tx(JTx, fun(JTx1) -> + couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, VS) + end). + + +get_inactive_since(#st{jtx = JTx, type = Type, vs = VS}) -> + couch_jobs_fdb:tx(JTx, fun(JTx1) -> + couch_jobs_fdb:get_inactive_since(JTx1, Type, VS) + end). + + +get_max_jitter_msec()-> + config:get_integer("couch_jobs", "activity_monitor_max_jitter_msec", + ?MAX_JITTER_DEFAULT). diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl new file mode 100644 index 000000000..b11161a24 --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl @@ -0,0 +1,64 @@ +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_activity_monitor_sup). + + +-behaviour(supervisor). + + +-export([ + start_link/0, + + start_monitor/1, + stop_monitor/1, + get_child_pids/0 +]). + +-export([ + init/1 +]). + + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + + +start_monitor(Type) -> + supervisor:start_child(?MODULE, [Type]). + + +stop_monitor(Pid) -> + supervisor:terminate_child(?MODULE, Pid). + + +get_child_pids() -> + lists:map(fun({_Id, Pid, _Type, _Mod}) -> + Pid + end, supervisor:which_children(?MODULE)). + + +init(_) -> + Flags = #{ + strategy => simple_one_for_one, + intensity => 10, + period => 3 + }, + Children = [ + #{ + id => couch_jobs_monitor, + restart => temporary, + start => {couch_jobs_activity_monitor, start_link, []} + } + ], + {ok, {Flags, Children}}. diff --git a/src/couch_jobs/src/couch_jobs_app.erl b/src/couch_jobs/src/couch_jobs_app.erl new file mode 100644 index 000000000..720b94891 --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_app.erl @@ -0,0 +1,26 @@ +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_app). + + +-behaviour(application). + + +-export([ + start/2, + stop/1 +]). + + +start(_Type, []) -> + couch_jobs_sup:start_link(). + + +stop([]) -> + ok. diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl new file mode 100644 index 000000000..093516858 --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_fdb.erl @@ -0,0 +1,565 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_fdb). + + +-export([ + add/4, + remove/3, + resubmit/4, + get_job/3, + get_jobs/2, + get_jobs/1, + + accept/3, + finish/5, + resubmit/5, + update/5, + + set_type_timeout/3, + clear_type_timeout/2, + get_type_timeout/2, + get_types/1, + + get_activity_vs/2, + get_activity_vs_and_watch/2, + get_active_since/3, + get_inactive_since/3, + re_enqueue_inactive/3, + + init_cache/0, + + get_jtx/0, + get_jtx/1, + tx/2, + + has_versionstamp/1, + + clear_jobs/0, + clear_type/1 +]). + + +-include("couch_jobs.hrl"). + + +% Data model +% +% (?JOBS, ?DATA, Type, JobId) = (Sequence, WorkerLockId, Priority, JobOpts) +% (?JOBS, ?PENDING, Type, Priority, JobId) = "" +% (?JOBS, ?WATCHES, Type) = Sequence +% (?JOBS, ?ACTIVITY_TIMEOUT, Type) = ActivityTimeout +% (?JOBS, ?ACTIVITY, Type, Sequence) = JobId + +% Job creation API + +add(#{jtx := true} = JTx0, Type, JobId, JobOpts) -> + #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0), + case get_type_timeout(JTx, Type) of + not_found -> + {error, no_type_timeout}; + Int when is_integer(Int) -> + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + case erlfdb:wait(erlfdb:get(Tx, Key)) of + <<_/binary>> -> {error, duplicate_job}; + not_found -> maybe_enqueue(JTx, Type, JobId, JobOpts, true) + end + end. + + +remove(#{jtx := true} = JTx0, Type, JobId) -> + #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0), + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + case get_job(Tx, Key) of + {_, WorkerLockId, _Priority, _} = Job when WorkerLockId =/= null -> + ok = cancel(JTx, Key, Job), + canceled; + {_, _WorkerLockId, Priority, _} when Priority =/= null -> + couch_jobs_pending:remove(JTx, Type, Priority, JobId), + erlfdb:clear(Tx, Key), + ok; + {_, _WorkerLockId, _Priority, _} -> + erlfdb:clear(Tx, Key), + ok; + not_found -> + not_found + end. + + +resubmit(#{jtx := true} = JTx, Type, JobId, NewPriority) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + case get_job(Tx, Key) of + {Seq, WorkerLockId, Priority, #{} = JobOpts} -> + JobOpts1 = JobOpts#{?OPT_RESUBMIT => true}, + JobOpts2 = update_priority(JobOpts1, NewPriority), + JobOptsEnc = jiffy:encode(JobOpts2), + Val = erlfdb_tuple:pack({Seq, WorkerLockId, Priority, JobOptsEnc}), + erlfdb:set(Tx, Key, Val), + ok; + not_found -> + not_found + end. + + +get_job(#{jtx := true} = JTx, Type, JobId) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + case get_job(Tx, Key) of + {_, WorkerLockId, Priority, JobOpts} -> + {ok, JobOpts, job_state(WorkerLockId, Priority)}; + not_found -> + not_found + end. + + +get_jobs(#{jtx := true} = JTx, Type) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Prefix = erlfdb_tuple:pack({?DATA, Type}, Jobs), + Opts = [{streaming_mode, want_all}], + Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)), + lists:map(fun({K, V}) -> + {JobId} = erlfdb_tuple:unpack(K, Prefix), + {_Seq, WorkerLockId, Priority, JobOpts} = unpack_job(V), + JobState = job_state(WorkerLockId, Priority), + {JobId, JobState, JobOpts} + end, Result). + + +get_jobs(#{jtx := true} = JTx) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Prefix = erlfdb_tuple:pack({?DATA}, Jobs), + Opts = [{streaming_mode, want_all}], + Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)), + lists:map(fun({K, V}) -> + {Type, JobId} = erlfdb_tuple:unpack(K, Prefix), + {_Seq, WorkerLockId, Priority, JobOpts} = unpack_job(V), + JobState = job_state(WorkerLockId, Priority), + {Type, JobId, JobState, JobOpts} + end, Result). + + +% Worker public API + +accept(#{jtx := true} = JTx0, Type, MaxPriority) -> + #{jtx := true} = JTx = get_jtx(JTx0), + case couch_jobs_pending:dequeue(JTx, Type, MaxPriority) of + not_found -> + not_found; + <<_/binary>> = JobId -> + WorkerLockId = fabric2_util:uuid(), + update_lock(JTx, Type, JobId, WorkerLockId), + update_activity(JTx, Type, JobId, null), + {ok, JobId, WorkerLockId} + end. + + +finish(#{jtx := true} = JTx0, Type, JobId, JobOpts, WorkerLockId) -> + #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0), + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + case get_job_and_status(Tx, Key, WorkerLockId) of + {Status, {Seq, _, _, JobOptsCur}} when + Status =:= ok orelse Status =:= canceled -> + % If the job was canceled, allow updating its data one last time + JobOpts1 = maps:merge(JobOptsCur, JobOpts), + Resubmit = maps:get(?OPT_RESUBMIT, JobOpts1, false) == true, + maybe_enqueue(JTx, Type, JobId, JobOpts1, Resubmit), + clear_activity(JTx, Type, Seq), + update_watch(JTx, Type), + ok; + {worker_conflict, _} -> + worker_conflict + end. + + +resubmit(#{jtx := true} = JTx0, Type, JobId, NewPriority, WorkerLockId) -> + #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0), + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + case get_job_and_status(Tx, Key, WorkerLockId) of + {ok, {Seq, WorkerLockId, null, JobOpts}} -> + update_activity(JTx, Type, JobId, Seq), + JobOpts1 = JobOpts#{?OPT_RESUBMIT => true}, + JobOpts2 = update_priority(JobOpts1, NewPriority), + update_job(JTx, Type, JobId, WorkerLockId, JobOpts2, Seq), + ok; + {Status, _} when Status =/= ok -> + Status + end. + + +update(#{jtx := true} = JTx, Type, JobId, JobOpts, WorkerLockId) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + case get_job_and_status(Tx, Key, WorkerLockId) of + {ok, {Seq, WorkerLockId, null, JobOptsCur}} -> + JobOpts1 = maps:merge(JobOptsCur, JobOpts), + update_job(JTx, Type, JobId, WorkerLockId, JobOpts1, Seq), + ok; + {Status, _} when Status =/= ok -> + Status + end. + + +% Type and activity monitoring API + +set_type_timeout(#{jtx := true} = JTx, Type, Timeout) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs), + Val = erlfdb_tuple:pack({Timeout}), + erlfdb:set(Tx, Key, Val). + + +clear_type_timeout(#{jtx := true} = JTx, Type) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs), + erlfdb:clear(Tx, Key). + + +get_type_timeout(#{jtx := true} = JTx, Type) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs), + case erlfdb:wait(erlfdb:get_ss(Tx, Key)) of + not_found -> + not_found; + Val -> + {Timeout} = erlfdb_tuple:unpack(Val), + Timeout + end. + + +get_types(#{jtx := true} = JTx) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Prefix = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT}, Jobs), + Opts = [{streaming_mode, want_all}], + Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)), + lists:map(fun({K, _V}) -> + {Type} = erlfdb_tuple:unpack(K, Prefix), + Type + end, Result). + + +get_activity_vs(#{jtx := true} = JTx, Type) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs), + case erlfdb:wait(erlfdb:get(Tx, Key)) of + not_found -> + not_found; + Val -> + {VS} = erlfdb_tuple:unpack(Val), + VS + end. + + +get_activity_vs_and_watch(#{jtx := true} = JTx, Type) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs), + Future = erlfdb:get(Tx, Key), + Watch = erlfdb:watch(Tx, Key), + case erlfdb:wait(Future) of + not_found -> + {not_found, Watch}; + Val -> + {VS} = erlfdb_tuple:unpack(Val), + {VS, Watch} + end. + + +get_active_since(#{jtx := true} = JTx, Type, Versionstamp) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs), + StartKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix), + StartKeySel = erlfdb_key:first_greater_than(StartKey), + {_, EndKey} = erlfdb_tuple:range({Type}, Prefix), + Opts = [{streaming_mode, want_all}], + Future = erlfdb:get_range(Tx, StartKeySel, EndKey, Opts), + lists:map(fun({_K, JobId}) -> JobId end, erlfdb:wait(Future)). + + +get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs), + {StartKey, _} = erlfdb_tuple:range({Type}, Prefix), + EndKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix), + EndKeySel = erlfdb_key:first_greater_than(EndKey), + Opts = [{streaming_mode, want_all}], + Future = erlfdb:get_range(Tx, StartKey, EndKeySel, Opts), + lists:map(fun({_K, JobId}) -> JobId end, erlfdb:wait(Future)). + + +re_enqueue_inactive(#{jtx := true} = JTx, Type, Versionstamp) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + JobIds = get_inactive_since(JTx, Type, Versionstamp), + lists:foreach(fun(JobId) -> + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + {Seq, _, _, JobOpts} = get_job(Tx, Key), + clear_activity(JTx, Type, Seq), + maybe_enqueue(JTx, Type, JobId, JobOpts, true) + end, JobIds), + case length(JobIds) > 0 of + true -> update_watch(JTx, Type); + false -> ok + end, + JobIds. + + +% Cache initialization API. Called from the supervisor just to create the ETS +% table. It returns `ignore` to tell supervisor it won't actually start any +% process, which is what we want here. +% +init_cache() -> + ConcurrencyOpts = [{read_concurrency, true}, {write_concurrency, true}], + ets:new(?MODULE, [public, named_table] ++ ConcurrencyOpts), + ignore. + + +% Cached job transaction object. This object wraps a transaction, caches the +% directory lookup path, and the metadata version. The function can be used +% from or outside the transaction. When used from a transaction it will verify +% if the metadata was changed, and will refresh automatically. +% +get_jtx() -> + get_jtx(undefined). + + +get_jtx(#{tx := Tx} = _TxDb) -> + get_jtx(Tx); + +get_jtx(undefined = _Tx) -> + case ets:lookup(?MODULE, ?JOBS) of + [{_, #{} = JTx}] -> + JTx; + [] -> + JTx = update_jtx_cache(init_jtx(undefined)), + JTx#{tx := undefined} + end; + +get_jtx({erlfdb_transaction, _} = Tx) -> + case ets:lookup(?MODULE, ?JOBS) of + [{_, #{} = JTx}] -> + ensure_current(JTx#{tx := Tx}); + [] -> + update_jtx_cache(init_jtx(Tx)) + end. + + +% Transaction processing to be used with couch jobs' specific transaction +% contexts +% +tx(#{jtx := true} = JTx, Fun) when is_function(Fun, 1) -> + fabric2_fdb:transactional(JTx, Fun). + + +% Utility fdb functions used by other module in couch_job. Maybe move these to +% a separate module if the list keep growing + +has_versionstamp(?UNSET_VS) -> + true; + +has_versionstamp(Tuple) when is_tuple(Tuple) -> + has_versionstamp(tuple_to_list(Tuple)); + +has_versionstamp([Elem | Rest]) -> + has_versionstamp(Elem) orelse has_versionstamp(Rest); + +has_versionstamp(_Other) -> + false. + + +% Debug and testing API + +clear_jobs() -> + fabric2_fdb:transactional(fun(Tx) -> + #{jobs_path := Jobs} = init_jtx(Tx), + erlfdb:clear_range_startswith(Tx, Jobs) + end). + + +clear_type(Type) -> + Sections = [?DATA, ?PENDING, ?WATCHES, ?ACTIVITY_TIMEOUT, ?ACTIVITY], + fabric2_fdb:transactional(fun(Tx) -> + #{jobs_path := Jobs} = init_jtx(Tx), + lists:foreach(fun(Section) -> + Prefix = erlfdb_tuple:pack({Section, Type}, Jobs), + erlfdb:clear_range_startswith(Tx, Prefix) + end, Sections) + end). + + +% Private helper functions + +update_job(JTx, Type, JobId, WorkerLockId, JobOpts, OldSeq) -> + #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx), + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + update_activity(JTx, Type, JobId, OldSeq), + ValTup = {?UNSET_VS, WorkerLockId, null, jiffy:encode(JobOpts)}, + Val = erlfdb_tuple:pack_vs(ValTup), + erlfdb:set_versionstamped_value(Tx, Key, Val). + + +update_priority(JobOpts, undefined) -> + JobOpts; + +update_priority(JobOpts, NewPriority) -> + OldPriority = maps:get(?OPT_PRIORITY, JobOpts, undefined), + case NewPriority =/= OldPriority of + true -> JobOpts#{?OPT_PRIORITY => NewPriority}; + false -> JobOpts + end. + + +cancel(#{jx := true}, _, {_, _, _, #{?OPT_CANCEL := true}}) -> + ok; + +cancel(#{jtx := true, tx := Tx}, Key, Job) -> + {Seq, WorkerLockId, Priority, JobOpts} = Job, + JobOpts1 = JobOpts#{?OPT_CANCEL => true}, + JobOptsEnc = jiffy:encode(JobOpts1), + Val = erlfdb_tuple:pack({Seq, WorkerLockId, Priority, JobOptsEnc}), + erlfdb:set(Tx, Key, Val), + ok. + + +maybe_enqueue(#{jtx := true} = JTx, Type, JobId, JobOpts, Resubmit) -> + #{tx := Tx, jobs_path := Jobs} = JTx, + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + Cancel = maps:get(?OPT_CANCEL, JobOpts, false) == true, + JobOpts1 = maps:without([?OPT_RESUBMIT], JobOpts), + Priority = maps:get(?OPT_PRIORITY, JobOpts1, ?UNSET_VS), + JobOptsEnc = jiffy:encode(JobOpts1), + case Resubmit andalso not Cancel of + true -> + case has_versionstamp(Priority) of + true -> + Val = erlfdb_tuple:pack_vs({null, null, Priority, + JobOptsEnc}), + erlfdb:set_versionstamped_value(Tx, Key, Val); + false -> + Val = erlfdb_tuple:pack({null, null, Priority, + JobOptsEnc}), + erlfdb:set(Tx, Key, Val) + end, + couch_jobs_pending:enqueue(JTx, Type, Priority, JobId); + false -> + Val = erlfdb_tuple:pack({null, null, null, JobOptsEnc}), + erlfdb:set(Tx, Key, Val) + end, + ok. + + +get_job(Tx = {erlfdb_transaction, _}, Key) -> + case erlfdb:wait(erlfdb:get(Tx, Key)) of + <<_/binary>> = Val -> unpack_job(Val); + not_found -> not_found + end. + + +unpack_job(<<_/binary>> = JobVal) -> + {Seq, WorkerLockId, Priority, JobOptsEnc} = erlfdb_tuple:unpack(JobVal), + JobOpts = jiffy:decode(JobOptsEnc, [return_maps]), + {Seq, WorkerLockId, Priority, JobOpts}. + + +get_job_and_status(Tx, Key, WorkerLockId) -> + case get_job(Tx, Key) of + {_, LockId, _, _} = Res when WorkerLockId =/= LockId -> + {worker_conflict, Res}; + {_, _, _, #{?OPT_CANCEL := true}} = Res -> + {canceled, Res}; + {_, _, _, #{}} = Res -> + {ok, Res}; + not_found -> + {worker_conflict, not_found} + end. + + +update_activity(#{jtx := true} = JTx, Type, JobId, Seq) -> + #{tx := Tx, jobs_path := Jobs} = JTx, + case Seq =/= null of + true -> clear_activity(JTx, Type, Seq); + false -> ok + end, + Key = erlfdb_tuple:pack_vs({?ACTIVITY, Type, ?UNSET_VS}, Jobs), + erlfdb:set_versionstamped_key(Tx, Key, JobId), + update_watch(JTx, Type). + + +clear_activity(#{jtx := true} = JTx, Type, Seq) -> + #{tx := Tx, jobs_path := Jobs} = JTx, + Key = erlfdb_tuple:pack({?ACTIVITY, Type, Seq}, Jobs), + erlfdb:clear(Tx, Key). + + +update_watch(#{jtx := true} = JTx, Type) -> + #{tx := Tx, jobs_path := Jobs} = JTx, + Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs), + Val = erlfdb_tuple:pack_vs({?UNSET_VS}), + erlfdb:set_versionstamped_value(Tx, Key, Val). + + +update_lock(#{jtx := true} = JTx, Type, JobId, WorkerLockId) -> + #{tx := Tx, jobs_path := Jobs} = JTx, + Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs), + {null, null, _, JobOpts} = get_job(Tx, Key), + ValTup = {?UNSET_VS, WorkerLockId, null, jiffy:encode(JobOpts)}, + Val = erlfdb_tuple:pack_vs(ValTup), + erlfdb:set_versionstamped_value(Tx, Key, Val). + + +job_state(WorkerLockId, Priority) -> + case {WorkerLockId, Priority} of + {null, null} -> + finished; + {WorkerLockId, _} when WorkerLockId =/= null -> + running; + {_, Priority} when Priority =/= null -> + pending + end. + + +% This a transaction context object similar to the Db = #{} one from +% fabric2_fdb. It's is used to cache the jobs path directory (to avoid extra +% lookups on every operation) and to check for metadata changes (in case +% directory changes). +% +init_jtx(undefined) -> + fabric2_fdb:transactional(fun(Tx) -> init_jtx(Tx) end); + +init_jtx({erlfdb_transaction, _} = Tx) -> + Root = erlfdb_directory:root(), + CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]), + LayerPrefix = erlfdb_directory:get_name(CouchDB), + Jobs = erlfdb_tuple:pack({?JOBS}, LayerPrefix), + Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)), + % layer_prefix, md_version and tx here match db map fields in fabric2_fdb + % but we also assert that this is a job transaction using the jtx => true + % field + #{ + jtx => true, + tx => Tx, + layer_prefix => LayerPrefix, + jobs_path => Jobs, + md_version => Version + }. + + +ensure_current(#{jtx := true, tx := Tx, md_version := Version} = JTx) -> + case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of + Version -> JTx; + _NewVersion -> update_jtx_cache(init_jtx(Tx)) + end. + + +update_jtx_cache(#{jtx := true} = JTx) -> + CachedJTx = JTx#{tx := undefined}, + ets:insert(?MODULE, {?JOBS, CachedJTx}), + JTx. diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl new file mode 100644 index 000000000..59fa31a29 --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_notifier.erl @@ -0,0 +1,218 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_notifier). + +-behaviour(gen_server). + + +-export([ + start_link/1, + subscribe/3, + subscribe/4, + unsubscribe/2 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-include("couch_jobs.hrl"). + + +-define(TYPE_MONITOR_HOLDOFF_DEFAULT, 1000). +-define(TYPE_MONITOR_TIMEOUT_DEFAULT, "infinity"). + + +-record(st, { + jtx, + type, + monitor_pid, + subs +}). + + +start_link(Type) -> + gen_server:start_link(?MODULE, [Type], []). + + +subscribe(Server, JobId, Pid) when is_pid(Pid) -> + gen_server:call(Server, {subscribe, JobId, nil, Pid}, infinity). + + +subscribe(Server, JobId, Fun, Pid) when is_function(Fun, 3), is_pid(Pid) -> + gen_server:call(Server, {subscribe, JobId, Fun, Pid}, infinity). + + +unsubscribe(Server, Ref) when is_reference(Ref) -> + gen_server:call(Server, {unsubscribe, Ref}, infinity). + + +init([Type]) -> + JTx = couch_jobs_fdb:get_jtx(), + EtsOpts = [ordered_set, protected], + St = #st{jtx = JTx, type = Type, subs = ets:new(?MODULE, EtsOpts)}, + VS = get_type_vs(St), + HoldOff = get_holdoff(), + Timeout = get_timeout(), + Pid = couch_jobs_type_monitor:start(Type, VS, HoldOff, Timeout), + {ok, St#st{monitor_pid = Pid}}. + + +terminate(_, _St) -> + ok. + + +handle_call({subscribe, JobId, Fun, Pid}, _From, #st{} = St) -> + Res = case get_job(St, JobId) of + not_found -> + not_found; + {ok, _, finished} -> + finished; + {ok, _, JobState} -> + Ref = erlang:monitor(process, Pid), + ets:insert(St#st.subs, {{JobId, Ref}, {Fun, Pid, JobState}}), + {Ref, JobState} + end, + {reply, Res, St}; + +handle_call({unsubscribe, Ref}, _From, #st{subs = Subs} = St) -> + true = ets:match_delete(Subs, {{'$1', Ref}, '_'}), + {reply, ok, St}; + +% couch_jobs_type_monitor calls this +handle_call({type_updated, VS}, _From, St) -> + ok = notify_subscribers(VS, St), + {reply, ok, St}; + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info({'DOWN', Ref, process, _, _}, #st{subs = Subs} = St) -> + true = ets:match_delete(Subs, {{'$1', Ref}, '_'}), + {noreply, St}; + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +get_job(#st{jtx = JTx, type = Type}, JobId) -> + couch_jobs_fdb:tx(JTx, fun(JTx1) -> + couch_jobs_fdb:get_job(JTx1, Type, JobId) + end). + + +get_jobs(#st{jtx = JTx, type = Type}, JobIds) -> + couch_jobs_fdb:tx(JTx, fun(JTx1) -> + lists:map(fun(JobId) -> + case couch_jobs_fdb:get_job(JTx1, Type, JobId) of + {ok, _, JobState} -> {JobId, JobState}; + not_found -> {JobId, not_found} + end + end, JobIds) + end). + + +get_type_vs(#st{jtx = JTx, type = Type}) -> + couch_jobs_fdb:tx(JTx, fun(JTx1) -> + couch_jobs_fdb:get_activity_vs(JTx1, Type) + end). + + +% "Active since" is the list of jobs that have been active (running) +% and updated at least once since the given versionstamp. These are relatively +% cheap to find as it's just a range read in the ?ACTIVITY subspace. +% +get_active_since(#st{jtx = JTx, type = Type}, VS, SubscribedJobs) -> + AllUpdatedSet = sets:from_list(couch_jobs_fdb:tx(JTx, fun(JTx1) -> + couch_jobs_fdb:get_active_since(JTx1, Type, VS) + end)), + SubscribedSet = sets:from_list(SubscribedJobs), + SubscribedActiveSet = sets:intersection(AllUpdatedSet, SubscribedSet), + sets:to_list(SubscribedActiveSet). + + +get_subscribers(JobId, #st{subs = Subs}) -> + % Use ordered ets's fast matching of partial key prefixes here + lists:map(fun([Ref, {Fun, Pid, JobState}]) -> + {Ref, Fun, Pid, JobState} + end, ets:match(Subs, {{JobId, '$1'}, '$2'})). + + +get_subscribed_job_ids(#st{subs = Subs}) -> + Matches = ets:match(Subs, {{'$1', '_'}, '_'}), + lists:usort(lists:flatten(Matches)). + + +notify_subscribers(VS, #st{subs = Subs} = St) -> + JobIds = get_subscribed_job_ids(St), + % First gather the easy (cheap) active jobs. Then with those out of way, + % inspect each job to get its state. + Active = get_active_since(St, VS, JobIds), + JobStates = [{JobId, running} || JobId <- Active], + JobStates1 = JobStates ++ get_jobs(St, JobIds -- Active), + lists:foreach(fun({JobId, JobState}) -> + lists:foreach(fun + ({Ref, Fun, Pid, running}) when JobState =:= running -> + notify(JobId, Ref, Fun, Pid, JobState, St); + ({_Ref, _Fun, _Pid, State}) when State =:= JobState -> + ok; + ({Ref, Fun, Pid, _}) -> + notify(JobId, Ref, Fun, Pid, JobState, St), + ets:insert(Subs, {{JobId, Ref}, {Fun, Pid, JobState}}) + end, get_subscribers(JobId, St)), + case lists:member(JobState, [finished, not_found]) of + true -> ets:match_delete(Subs, {{JobId, '_'}, '_'}); + false -> ok + end + end, JobStates1). + + +notify(JobId, _Ref, Fun, _, JobState, St) when is_function(Fun, 3) -> + try + Fun(St#st.type, JobId, JobState) + catch + Tag:Err -> + ErrMsg = "~p : callback ~p failed ~p ~p => ~p:~p", + couch_log:error(ErrMsg, [?MODULE, Fun, JobId, JobState, Tag, Err]) + end; + +notify(JobId, Ref, _, Pid, JobState, St) -> + Pid ! {?COUCH_JOBS_EVENT, Ref, St#st.type, JobId, JobState}. + + +get_holdoff() -> + config:get_integer("couch_jobs", "type_monitor_holdoff_msec", + ?TYPE_MONITOR_HOLDOFF_DEFAULT). + + +get_timeout() -> + Default = ?TYPE_MONITOR_TIMEOUT_DEFAULT, + case config:get("couch_jobs", "type_monitor_timeout_msec", Default) of + "infinity" -> infinity; + Milliseconds -> list_to_integer(Milliseconds) + end. diff --git a/src/couch_jobs/src/couch_jobs_notifier_sup.erl b/src/couch_jobs/src/couch_jobs_notifier_sup.erl new file mode 100644 index 000000000..81d93493b --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_notifier_sup.erl @@ -0,0 +1,64 @@ +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_notifier_sup). + + +-behaviour(supervisor). + + +-export([ + start_link/0, + + start_notifier/1, + stop_notifier/1, + get_child_pids/0 +]). + +-export([ + init/1 +]). + + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + + +start_notifier(Type) -> + supervisor:start_child(?MODULE, [Type]). + + +stop_notifier(Pid) -> + supervisor:terminate_child(?MODULE, Pid). + + +get_child_pids() -> + lists:map(fun({_Id, Pid, _Type, _Mod}) -> + Pid + end, supervisor:which_children(?MODULE)). + + +init(_) -> + Flags = #{ + strategy => simple_one_for_one, + intensity => 10, + period => 3 + }, + Children = [ + #{ + id => couch_jobs_notifier, + restart => temporary, + start => {couch_jobs_notifier, start_link, []} + } + ], + {ok, {Flags, Children}}. diff --git a/src/couch_jobs/src/couch_jobs_pending.erl b/src/couch_jobs/src/couch_jobs_pending.erl new file mode 100644 index 000000000..cedf53e43 --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_pending.erl @@ -0,0 +1,159 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_pending). + + +-export([ + enqueue/4, + dequeue/3, + remove/4 +]). + + +-include("couch_jobs.hrl"). + + +% Make this configurable or auto-adjustable based on retries +% +-define(RANGE_LIMIT, 256). + + +% Data model +% +% (?JOBS, ?PENDING, Type, Priority, JobId) = "" + + +% Public API + +% Enqueue a job into the pending queue. Priority determines the place in the +% queue. +% +enqueue(#{jtx := true} = JTx, Type, Priority, JobId) -> + #{tx := Tx, jobs_path := Jobs} = JTx, + KeyTup = {?PENDING, Type, Priority, JobId}, + case couch_jobs_fdb:has_versionstamp(Priority) of + true -> + Key = erlfdb_tuple:pack_vs(KeyTup, Jobs), + erlfdb:set_versionstamped_key(Tx, Key, <<>>); + false -> + Key = erlfdb_tuple:pack(KeyTup, Jobs), + erlfdb:set(Tx, Key, <<>>) + end. + + +% Dequeue a job from the front of the queue. +% +% If MaxPriority is specified, any job between the front of the queue and up +% until MaxPriority is considered. Workers may randomly pick jobs in that +% range. That can be used to avoid contention at the expense of strict dequeue +% ordering. For instance if priorities are 0-high, 1-normal and 2-low, and we +% wish to process normal and urgent jobs, then MaxPriority=1-normal would +% accomplish that. +% +dequeue(#{jtx := true} = JTx, Type, undefined) -> + #{tx := Tx, jobs_path := Jobs} = JTx, + Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs), + case get_front_priority(Tx, Prefix) of + not_found -> + not_found; + {{versionstamp, _, _}, PendingKey} -> + erlfdb:clear(Tx, PendingKey), + {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix), + JobId; + {{versionstamp, _, _, _}, PendingKey} -> + erlfdb:clear(Tx, PendingKey), + {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix), + JobId; + {Priority, _} -> + {Start, End} = erlfdb_tuple:range({Priority}, Prefix), + case clear_random_key_from_range(Tx, Start, End) of + not_found -> + not_found; + <<_/binary>> = PendingKey -> + {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix), + JobId + end + end; + +% If MaxPriority is not specified, only jobs with the same priority as the item +% at the front of the queue are considered. Two extremes are useful to +% consider: +% +% * Priority is just one static value (say null, or "normal"). In that case, +% the queue is effectively a bag of tasks that can be grabbed in any order, +% which should minimize contention. +% +% * Each job has a unique priority value, for example a versionstamp. In that +% case, the queue has strict FIFO behavior, but there will be more contention +% at the front of the queue. +% +dequeue(#{jtx := true} = JTx, Type, MaxPriority) -> + #{tx := Tx, jobs_path := Jobs} = JTx, + Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs), + StartKeySel = erlfdb_key:first_greater_than(Prefix), + End = erlfdb_tuple:pack({MaxPriority, <<16#FF>>}, Prefix), + EndKeySel = erlfdb_key:first_greater_or_equal(End), + case clear_random_key_from_range(Tx, StartKeySel, EndKeySel) of + not_found -> + not_found; + <<_/binary>> = PendingKey -> + {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix), + JobId + end. + + +% Remove a job from the pending queue. This is used, for example, when a job is +% canceled while it was waiting in the pending queue. +% +remove(#{jtx := true} = JTx, Type, Priority, JobId) -> + #{tx := Tx, jobs_path := Jobs} = JTx, + Key = erlfdb_tuple:pack({?PENDING, Type, Priority, JobId}, Jobs), + erlfdb:clear(Tx, Key). + + +% Private functions + +% The priority of the item at the front. If there are multiple +% items with the same priority, workers can randomly pick between them to +% avoid contention. +% +get_front_priority(Tx, Prefix) -> + Opts = [{limit, 1}, {snapshot, true}], + case erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)) of + [] -> + not_found; + [{FrontKey, _}] -> + {Priority, _} = erlfdb_tuple:unpack(FrontKey, Prefix), + {Priority, FrontKey} + end. + + +% Pick a random key from the range snapshot. Then radomly pick a key to +% clear. Before clearing, ensure there is a read conflict on the key in +% in case other workers have picked the same key. +% +clear_random_key_from_range(Tx, Start, End) -> + Opts = [ + {limit, ?RANGE_LIMIT}, + {snapshot, true} + ], + case erlfdb:wait(erlfdb:get_range(Tx, Start, End, Opts)) of + [] -> + not_found; + [{_, _} | _] = KVs -> + Index = rand:uniform(length(KVs)), + {Key, _} = lists:nth(Index, KVs), + erlfdb:add_read_conflict_key(Tx, Key), + erlfdb:clear(Tx, Key), + Key + end. diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl new file mode 100644 index 000000000..ad30435bd --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_server.erl @@ -0,0 +1,184 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_server). + +-behaviour(gen_server). + + +-export([ + start_link/0, + get_notifier_server/1, + force_check_types/0 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-define(TYPE_CHECK_PERIOD_DEFAULT, 15000). +-define(MAX_JITTER_DEFAULT, 5000). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []). + + +get_notifier_server(Type) -> + case get_type_pid_refs(Type) of + {{_, _}, {NotifierPid, _}} -> {ok, NotifierPid}; + not_found -> {error, not_found} + end. + + +force_check_types() -> + gen_server:call(?MODULE, check_types, infinity). + + +init(_) -> + % If couch_jobs_server is after the notifiers and activity supervisor. If + % it restart, there could be some stale notifier or activity monitors. Kill + % those as later on we'd start new ones anyway. + reset_monitors(), + reset_notifiers(), + ets:new(?MODULE, [protected, named_table]), + schedule_check(), + {ok, nil}. + + +terminate(_, _St) -> + ok. + + +handle_call(check_types, _From, St) -> + check_types(), + {reply, ok, St}; + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info(check_types, St) -> + check_types(), + schedule_check(), + {noreply, St}; + +handle_info({'DOWN', _Ref, process, Pid, Reason}, St) -> + LogMsg = "~p : process ~p exited with ~p", + couch_log:error(LogMsg, [?MODULE, Pid, Reason]), + {stop, {unexpected_process_exit, Pid, Reason}, St}; + +handle_info({Ref, ready}, St) when is_reference(Ref) -> + % Don't crash out couch_jobs_server and the whole application would need to + % eventually do proper cleanup in erlfdb:wait timeout code. + LogMsg = "~p : spurious erlfdb future ready message ~p", + couch_log:error(LogMsg, [?MODULE, Ref]), + {noreply, St}; + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +check_types() -> + FdbTypes = fdb_types(), + EtsTypes = ets_types(), + ToStart = FdbTypes -- EtsTypes, + ToStop = EtsTypes -- FdbTypes, + lists:foreach(fun(Type) -> start_monitors(Type) end, ToStart), + lists:foreach(fun(Type) -> stop_monitors(Type) end, ToStop). + + +start_monitors(Type) -> + MonPidRef = case couch_jobs_activity_monitor_sup:start_monitor(Type) of + {ok, Pid1} -> {Pid1, monitor(process, Pid1)}; + {error, Error1} -> error({failed_to_start_monitor, Type, Error1}) + end, + NotifierPidRef = case couch_jobs_notifier_sup:start_notifier(Type) of + {ok, Pid2} -> {Pid2, monitor(process, Pid2)}; + {error, Error2} -> error({failed_to_start_notifier, Type, Error2}) + end, + ets:insert_new(?MODULE, {Type, MonPidRef, NotifierPidRef}). + + +stop_monitors(Type) -> + {{MonPid, MonRef}, {NotifierPid, NotifierRef}} = get_type_pid_refs(Type), + ok = couch_jobs_activity_monitor_sup:stop_monitor(MonPid), + demonitor(MonRef, [flush]), + ok = couch_jobs_notifier_sup:stop_notifier(NotifierPid), + demonitor(NotifierRef, [flush]), + ets:delete(?MODULE, Type). + + +reset_monitors() -> + lists:foreach(fun(Pid) -> + couch_jobs_activity_monitor_sup:stop_monitor(Pid) + end, couch_jobs_activity_monitor_sup:get_child_pids()). + + +reset_notifiers() -> + lists:foreach(fun(Pid) -> + couch_jobs_notifier_sup:stop_notifier(Pid) + end, couch_jobs_notifier_sup:get_child_pids()). + + +get_type_pid_refs(Type) -> + case ets:lookup(?MODULE, Type) of + [{_, MonPidRef, NotifierPidRef}] -> {MonPidRef, NotifierPidRef}; + [] -> not_found + end. + + +ets_types() -> + lists:flatten(ets:match(?MODULE, {'$1', '_', '_'})). + + +fdb_types() -> + try + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + couch_jobs_fdb:get_types(JTx) + end) + catch + error:{timeout, _} -> + couch_log:warning("~p : Timed out connecting to FDB", [?MODULE]), + [] + end. + + +schedule_check() -> + Timeout = get_period_msec(), + MaxJitter = max(Timeout div 2, get_max_jitter_msec()), + Wait = Timeout + rand:uniform(min(1, MaxJitter)), + erlang:send_after(Wait, self(), check_types). + + +get_period_msec() -> + config:get_integer("couch_jobs", "type_check_period_msec", + ?TYPE_CHECK_PERIOD_DEFAULT). + + +get_max_jitter_msec() -> + config:get_integer("couch_jobs", "type_check_max_jitter_msec", + ?MAX_JITTER_DEFAULT). diff --git a/src/couch_jobs/src/couch_jobs_sup.erl b/src/couch_jobs/src/couch_jobs_sup.erl new file mode 100644 index 000000000..d79023777 --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_sup.erl @@ -0,0 +1,66 @@ +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_sup). + + +-behaviour(supervisor). + + +-export([ + start_link/0 +]). + +-export([ + init/1 +]). + + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + + +init([]) -> + Flags = #{ + strategy => rest_for_one, + intensity => 3, + period => 10 + }, + Children = [ + #{ + id => couch_jobs_fdb, + restart => transient, + start => {couch_jobs_fdb, init_cache, []} + }, + #{ + id => couch_jobs_activity_monitor_sup, + restart => permanent, + shutdown => brutal_kill, + type => supervisor, + start => {couch_jobs_activity_monitor_sup, start_link, []} + }, + #{ + id => couch_jobs_notifier_sup, + restart => permanent, + shutdown => brutal_kill, + type => supervisor, + start => {couch_jobs_notifier_sup, start_link, []} + }, + #{ + id => couch_jobs_server, + restart => permanent, + shutdown => brutal_kill, + start => {couch_jobs_server, start_link, []} + } + ], + {ok, {Flags, Children}}. diff --git a/src/couch_jobs/src/couch_jobs_type_monitor.erl b/src/couch_jobs/src/couch_jobs_type_monitor.erl new file mode 100644 index 000000000..b7ba633e2 --- /dev/null +++ b/src/couch_jobs/src/couch_jobs_type_monitor.erl @@ -0,0 +1,78 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_type_monitor). + + +-export([ + start/4 +]). + + +-include("couch_jobs.hrl"). + + +-record(st, { + jtx, + type, + vs, + parent, + timestamp, + holdoff, + timeout +}). + + +start(Type, VS, HoldOff, Timeout) -> + Parent = self(), + spawn_link(fun() -> + loop(#st{ + jtx = couch_jobs_fdb:get_jtx(), + type = Type, + vs = VS, + parent = Parent, + timestamp = 0, + holdoff = HoldOff, + timeout = Timeout + }) + end). + + +loop(#st{vs = VS, timeout = Timeout} = St) -> + {St1, Watch} = case get_vs_and_watch(St) of + {VS1, W} when VS1 =/= VS -> {notify(St#st{vs = VS1}), W}; + {VS, W} -> {St, W} + end, + try + erlfdb:wait(Watch, [{timeout, Timeout}]) + catch + error:{timeout, _} -> + ok + end, + loop(St1). + + +notify(#st{} = St) -> + #st{holdoff = HoldOff, parent = Pid, timestamp = Ts, vs = VS} = St, + Now = erlang:system_time(millisecond), + case Now - Ts of + Dt when Dt < HoldOff -> timer:sleep(min(HoldOff - Dt, 0)); + _ -> ok + end, + gen_server:call(Pid, {type_updated, VS}, infinity), + St#st{timestamp = Now}. + + +get_vs_and_watch(#st{jtx = JTx, type = Type}) -> + couch_jobs_fdb:tx(JTx, fun(JTx1) -> + couch_jobs_fdb:get_activity_vs_and_watch(JTx1, Type) + end). diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl new file mode 100644 index 000000000..fe3df5663 --- /dev/null +++ b/src/couch_jobs/test/couch_jobs_tests.erl @@ -0,0 +1,597 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_jobs_tests). + + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("eunit/include/eunit.hrl"). + + +-define(DATA, <<"data">>). +-define(RESUBMIT, <<"resubmit">>). +-define(PRIORITY, <<"priority">>). + + +couch_jobs_basic_test_() -> + { + "Test couch jobs basics", + { + setup, + fun setup_couch/0, fun teardown_couch/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun add_remove_pending/1, + fun add_remove_errors/1, + fun get_jobs/1, + fun resubmit_as_job_creator/1, + fun type_timeouts_and_server/1, + fun dead_notifier_restarts_jobs_server/1, + fun bad_messages_restart_couch_jobs_server/1, + fun bad_messages_restart_notifier/1, + fun bad_messages_restart_activity_monitor/1, + fun worker_accept_and_finish/1, + fun worker_update/1, + fun resubmit_enqueues_job/1, + fun add_custom_priority/1, + fun resubmit_custom_priority/1, + fun accept_max_priority/1, + fun subscribe/1, + fun subscribe_callback/1, + fun subscribe_errors/1, + fun enqueue_inactive/1, + fun cancel_running_job/1, + fun stop_and_remove_running_job/1, + fun clear_type_works/1 + ] + } + } + }. + + +setup_couch() -> + test_util:start_couch([fabric]). + + +teardown_couch(Ctx) -> + test_util:stop_couch(Ctx), + meck:unload(). + + +setup() -> + couch_jobs_fdb:clear_jobs(), + application:start(couch_jobs), + T1 = <<"t1">>, + T2 = 424242, % a number should work as well + T1Timeout = 2, + T2Timeout = 3, + couch_jobs:set_type_timeout(T1, T1Timeout), + couch_jobs:set_type_timeout(T2, T2Timeout), + couch_jobs_server:force_check_types(), + #{ + t1 => T1, + t2 => T2, + t1_timeout => T1Timeout, + t2_timeout => T2Timeout, + j1 => <<"j1">>, + j2 => <<"j2">>, + j1_data => #{<<"j1_data">> => 42} + }. + + +teardown(#{}) -> + application:stop(couch_jobs), + couch_jobs_fdb:clear_jobs(), + meck:unload(). + + +restart_app() -> + application:stop(couch_jobs), + application:start(couch_jobs), + couch_jobs_server:force_check_types(). + + +add_remove_pending(#{t1 := T, j1 := J, j1_data := Data}) -> + ?_test(begin + ?assertEqual(ok, couch_jobs:add(T, J, #{?DATA => Data})), + ?assertMatch({ok, #{?DATA := Data}, pending}, + couch_jobs:get_job(T, J)), + ?assertEqual(ok, couch_jobs:remove(T, J)), + ?assertEqual(ok, couch_jobs:add(T, J, #{?DATA => Data})), + ?assertMatch({ok, #{?DATA := Data}, pending}, + couch_jobs:get_job(T, J)), + ?assertEqual(ok, couch_jobs:remove(T, J)) + end). + + +add_remove_errors(#{t1 := T, j1 := J}) -> + ?_test(begin + ?assertEqual(not_found, couch_jobs:remove(<<"bad_type">>, <<"1">>)), + ?assertMatch({error, {invalid_args, _}}, couch_jobs:add(T, J, + #{1 => 2})), + ?assertEqual({error, no_type_timeout}, couch_jobs:add(<<"x">>, J, + #{})), + ?assertEqual(ok, couch_jobs:add(T, J, #{})), + ?assertEqual({error, duplicate_job}, couch_jobs:add(T, J, #{})), + ?assertEqual(ok, couch_jobs:remove(T, J)), + ?assertEqual(not_found, couch_jobs:stop_and_remove(T, J, 100)), + ?assertEqual(not_found, couch_jobs:remove(T, J)), + ?assertMatch({error, {invalid_args, _}}, couch_jobs:add(T, J, + #{?RESUBMIT => potato})), + ?assertMatch({error, {invalid_args, _}}, couch_jobs:add(T, J, + #{?PRIORITY => #{bad_priority => nope}})) + + end). + + +get_jobs(#{t1 := T1, t2 := T2, j1 := J1, j2 := J2, j1_data := Data}) -> + ?_test(begin + ok = couch_jobs:add(T1, J1, #{?DATA => Data}), + ok = couch_jobs:add(T2, J2, #{}), + + ?assertMatch({ok, #{?DATA := Data}, pending}, + couch_jobs:get_job(T1, J1)), + ?assertEqual({ok, #{}, pending}, couch_jobs:get_job(T2, J2)), + + ?assertEqual([{J1, pending, #{?DATA => Data}}], + couch_jobs:get_jobs(T1)), + ?assertEqual([{J2, pending, #{}}], couch_jobs:get_jobs(T2)), + ?assertEqual([], couch_jobs:get_jobs(<<"othertype">>)), + + ?assertEqual(lists:sort([ + {T1, J1, pending, #{?DATA => Data}}, + {T2, J2, pending, #{}} + ]), lists:sort(couch_jobs:get_jobs())), + ?assertEqual(ok, couch_jobs:remove(T1, J1)), + ?assertEqual([{T2, J2, pending, #{}}], couch_jobs:get_jobs()), + ?assertEqual(ok, couch_jobs:remove(T2, J2)), + ?assertEqual([], couch_jobs:get_jobs()) + end). + + +resubmit_as_job_creator(#{t1 := T, j1 := J, j1_data := Data}) -> + ?_test(begin + ok = couch_jobs:add(T, J, #{?DATA => Data}), + + ?assertEqual(not_found, couch_jobs:resubmit(T, <<"badjob">>)), + + ?assertEqual(ok, couch_jobs:resubmit(T, J)), + JobOpts1 = #{?DATA => Data, ?RESUBMIT => true}, + ?assertEqual({ok, JobOpts1, pending}, couch_jobs:get_job(T, J)), + + ?assertEqual(ok, couch_jobs:resubmit(T, J)), + ?assertEqual({ok, JobOpts1, pending}, couch_jobs:get_job(T, J)), + + ?assertEqual(ok, couch_jobs:resubmit(T, J, <<"a">>)), + JobOpts2 = #{?DATA => Data, ?RESUBMIT => true, ?PRIORITY => <<"a">>}, + ?assertEqual({ok, JobOpts2, pending}, couch_jobs:get_job(T, J)), + + ?assertEqual(ok, couch_jobs:resubmit(T, J, <<"b">>)), + JobOpts3 = #{?DATA => Data, ?RESUBMIT => true, ?PRIORITY => <<"b">>}, + ?assertEqual({ok, JobOpts3, pending}, couch_jobs:get_job(T, J)) + end). + + +type_timeouts_and_server(#{t1 := T, t1_timeout := T1Timeout}) -> + ?_test(begin + ?assertEqual(T1Timeout, couch_jobs:get_type_timeout(T)), + + ?assertEqual(2, + length(couch_jobs_activity_monitor_sup:get_child_pids())), + ?assertEqual(2, length(couch_jobs_notifier_sup:get_child_pids())), + ?assertMatch({ok, _}, couch_jobs_server:get_notifier_server(T)), + + ?assertEqual(ok, couch_jobs:set_type_timeout(<<"t3">>, 8)), + couch_jobs_server:force_check_types(), + ?assertEqual(3, + length(couch_jobs_activity_monitor_sup:get_child_pids())), + ?assertEqual(3, length(couch_jobs_notifier_sup:get_child_pids())), + + ?assertEqual(ok, couch_jobs:clear_type_timeout(<<"t3">>)), + couch_jobs_server:force_check_types(), + ?assertEqual(2, + length(couch_jobs_activity_monitor_sup:get_child_pids())), + ?assertEqual(2, + length(couch_jobs_notifier_sup:get_child_pids())), + ?assertMatch({error, _}, + couch_jobs_server:get_notifier_server(<<"t3">>)), + + ?assertEqual(not_found, couch_jobs:get_type_timeout(<<"t3">>)) + end). + + +dead_notifier_restarts_jobs_server(#{}) -> + ?_test(begin + ServerPid = whereis(couch_jobs_server), + Ref = monitor(process, ServerPid), + + [Notifier1, _Notifier2] = couch_jobs_notifier_sup:get_child_pids(), + exit(Notifier1, kill), + + % Killing a notifier should kill the server as well + receive {'DOWN', Ref, _, _, _} -> ok end + end). + + +bad_messages_restart_couch_jobs_server(#{}) -> + ?_test(begin + % couch_jobs_server dies on bad cast + ServerPid1 = whereis(couch_jobs_server), + Ref1 = monitor(process, ServerPid1), + gen_server:cast(ServerPid1, bad_cast), + receive {'DOWN', Ref1, _, _, _} -> ok end, + + restart_app(), + + % couch_jobs_server dies on bad call + ServerPid2 = whereis(couch_jobs_server), + Ref2 = monitor(process, ServerPid2), + catch gen_server:call(ServerPid2, bad_call), + receive {'DOWN', Ref2, _, _, _} -> ok end, + + restart_app(), + + % couch_jobs_server dies on bad info + ServerPid3 = whereis(couch_jobs_server), + Ref3 = monitor(process, ServerPid3), + ServerPid3 ! a_random_message, + receive {'DOWN', Ref3, _, _, _} -> ok end, + + restart_app() + end). + + +bad_messages_restart_notifier(#{}) -> + ?_test(begin + % bad cast kills the activity monitor + [AMon1, _] = couch_jobs_notifier_sup:get_child_pids(), + Ref1 = monitor(process, AMon1), + gen_server:cast(AMon1, bad_cast), + receive {'DOWN', Ref1, _, _, _} -> ok end, + + restart_app(), + + % bad calls restart activity monitor + [AMon2, _] = couch_jobs_notifier_sup:get_child_pids(), + Ref2 = monitor(process, AMon2), + catch gen_server:call(AMon2, bad_call), + receive {'DOWN', Ref2, _, _, _} -> ok end, + + restart_app(), + + % bad info message kills activity monitor + [AMon3, _] = couch_jobs_notifier_sup:get_child_pids(), + Ref3 = monitor(process, AMon3), + AMon3 ! a_bad_message, + receive {'DOWN', Ref3, _, _, _} -> ok end, + + + restart_app() + end). + + +bad_messages_restart_activity_monitor(#{}) -> + ?_test(begin + % bad cast kills the activity monitor + [AMon1, _] = couch_jobs_activity_monitor_sup:get_child_pids(), + Ref1 = monitor(process, AMon1), + gen_server:cast(AMon1, bad_cast), + receive {'DOWN', Ref1, _, _, _} -> ok end, + + restart_app(), + + % bad calls restart activity monitor + [AMon2, _] = couch_jobs_activity_monitor_sup:get_child_pids(), + Ref2 = monitor(process, AMon2), + catch gen_server:call(AMon2, bad_call), + receive {'DOWN', Ref2, _, _, _} -> ok end, + + restart_app(), + + % bad info message kills activity monitor + [AMon3, _] = couch_jobs_activity_monitor_sup:get_child_pids(), + Ref3 = monitor(process, AMon3), + AMon3 ! a_bad_message, + receive {'DOWN', Ref3, _, _, _} -> ok end, + + + restart_app() + end). + + +worker_accept_and_finish(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(T, J, #{}), + + AcceptResponse = couch_jobs:accept(T), + ?assertMatch({ok, J, <<_/binary>>}, AcceptResponse), + {ok, J, WLock} = AcceptResponse, + + ?assertEqual({ok, #{}, running}, couch_jobs:get_job(T, J)), + Res = #{<<"result">> => <<"done">>}, + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{?DATA => Res}, WLock) + end)), + ?assertEqual({ok, #{?DATA => Res}, finished}, + couch_jobs:get_job(T, J)), + + ?assertEqual(ok, couch_jobs:remove(T, J)) + end). + + +worker_update(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(T, J, #{}), + + AcceptResponse = couch_jobs:accept(T), + ?assertMatch({ok, J, <<_/binary>>}, AcceptResponse), + {ok, J, WLock} = AcceptResponse, + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock) + end)), + ?assertEqual({ok, #{?DATA => 1}, running}, couch_jobs:get_job(T, J)), + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, T, J, #{?DATA => 2}, WLock) + end)), + ?assertEqual({ok, #{?DATA => 2}, running}, couch_jobs:get_job(T, J)), + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{?DATA => 3}, WLock) + end)), + ?assertEqual({ok, #{?DATA => 3}, finished}, couch_jobs:get_job(T, J)), + + ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, T, J, #{?DATA => 4}, WLock) + end)), + + ?assertMatch(not_found, couch_jobs:accept(T)), + + ?assertEqual(ok, couch_jobs:remove(T, J)) + end). + + +resubmit_enqueues_job(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(T, J, #{}), + + {ok, J, WLock1} = couch_jobs:accept(T), + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:resubmit(Tx, T, J, undefined, WLock1) + end)), + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{?DATA => 1}, WLock1) + end)), + + ?assertEqual({ok, #{?DATA => 1}, pending}, couch_jobs:get_job(T, J)), + + {ok, J, WLock2} = couch_jobs:accept(T), + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{?DATA => 2}, WLock2) + end)), + ?assertEqual({ok, #{?DATA => 2}, finished}, couch_jobs:get_job(T, J)), + + ?assertEqual(ok, couch_jobs:remove(T, J)) + end). + + +add_custom_priority(#{t1 := T, j1 := J1, j2 := J2}) -> + ?_test(begin + ?assertEqual(ok, couch_jobs:add(T, J1, #{?PRIORITY => 5})), + ?assertEqual(ok, couch_jobs:add(T, J2, #{?PRIORITY => 3})), + + ?assertMatch({ok, J2, _}, couch_jobs:accept(T)), + ?assertMatch({ok, J1, _}, couch_jobs:accept(T)), + ?assertMatch(not_found, couch_jobs:accept(T)) + end). + + +resubmit_custom_priority(#{t1 := T, j1 := J}) -> + ?_test(begin + ?assertEqual(ok, couch_jobs:add(T, J, #{?PRIORITY => 7})), + {ok, J, WLock} = couch_jobs:accept(T), + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:resubmit(Tx, T, J, 9, WLock) + end)), + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{?DATA => 1}, WLock) + end)), + ?assertEqual({ok, #{?DATA => 1, ?PRIORITY => 9}, pending}, + couch_jobs:get_job(T, J)) + end). + + +accept_max_priority(#{t1 := T, j1 := J1, j2 := J2}) -> + ?_test(begin + ok = couch_jobs:add(T, J1, #{?PRIORITY => <<"5">>}), + ok = couch_jobs:add(T, J2, #{?PRIORITY => <<"3">>}), + ?assertEqual(not_found, couch_jobs:accept(T, <<"2">>)), + ?assertMatch({ok, J2, _}, couch_jobs:accept(T, <<"3">>)), + ?assertMatch({ok, J1, _}, couch_jobs:accept(T, <<"9">>)) + end). + + +subscribe(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(T, J, #{}), + + SubRes0 = couch_jobs:subscribe(T, J), + ?assertMatch({ok, {_, _}, pending}, SubRes0), + {ok, SubId0, pending} = SubRes0, + + ?assertEqual(ok, couch_jobs:unsubscribe(SubId0)), + + SubRes = couch_jobs:subscribe(T, J), + ?assertMatch({ok, {_, _}, pending}, SubRes), + {ok, SubId, pending} = SubRes, + + {ok, J, WLock} = couch_jobs:accept(T), + ?assertMatch(timeout, couch_jobs:wait_job_state(SubId, finished, 50)), + ?assertMatch({T, J, running}, couch_jobs:wait_job_state(SubId, 5000)), + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock) + end)), + + % Make sure we get intermediate `running` updates + ?assertMatch({T, J, running}, couch_jobs:wait_job_state(SubId, 5000)), + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{}, WLock) + end)), + ?assertMatch({T, J, finished}, couch_jobs:wait_job_state(SubId, + 5000)), + + ?assertEqual(timeout, couch_jobs:wait_job_state(SubId, 50)), + ?assertEqual(finished, couch_jobs:subscribe(T, J)), + + ?assertEqual(ok, couch_jobs:remove(T, J)) + end). + + +subscribe_callback(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(T, J, #{}), + + TestPid = self(), + SomeRef = make_ref(), + Cbk = fun(Type, JobId, JobState) -> + TestPid ! {SomeRef, Type, JobId, JobState} + end, + + SubRes = couch_jobs:subscribe(T, J, Cbk), + ?assertMatch({ok, {_, _}, pending}, SubRes), + {ok, _, pending} = SubRes, + + {ok, J, WLock} = couch_jobs:accept(T), + receive {SomeRef, T, J, running} -> ok end, + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{}, WLock) + end)), + receive {SomeRef, T, J, finished} -> ok end + end). + + +subscribe_errors(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(T, J, #{}), + + Cbk = fun(_Type, _JobId, _JobState) -> ok end, + + ?assertMatch({error, _}, couch_jobs:subscribe(<<"badtype">>, J)), + ?assertMatch({error, _}, couch_jobs:subscribe(<<"badtype">>, J, Cbk)), + + ?assertEqual(not_found, couch_jobs:subscribe(T, <<"j5">>)), + ?assertEqual(not_found, couch_jobs:subscribe(T, <<"j5">>, Cbk)), + + {ok, J, WLock} = couch_jobs:accept(T), + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{}, WLock) + end)), + + ?assertEqual(finished, couch_jobs:subscribe(T, J)), + ?assertEqual(finished, couch_jobs:subscribe(T, J, Cbk)) + end). + + +enqueue_inactive(#{t1 := T, j1 := J, t1_timeout := Timeout}) -> + {timeout, 15, ?_test(begin + ok = couch_jobs:add(T, J, #{}), + + {ok, J, WLock} = couch_jobs:accept(T), + + {ok, SubId, running} = couch_jobs:subscribe(T, J), + ?assertEqual({T, J, pending}, couch_jobs:wait_job_state(SubId, + pending, 3 * Timeout * 1000)), + + ?assertMatch({ok, #{}, pending}, couch_jobs:get_job(T, J)), + + ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock) + end)), + + + ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{}, WLock) + end)), + + + ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:resubmit(Tx, T, J, undefined, WLock) + end)) + end)}. + + +cancel_running_job(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(T, J, #{}), + {ok, J, WLock} = couch_jobs:accept(T), + ?assertEqual(canceled, couch_jobs:remove(T, J)), + % Try again and it should be a no-op + ?assertEqual(canceled, couch_jobs:remove(T, J)), + + ?assertEqual(canceled, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock) + end)), + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{?DATA => 2}, WLock) + end)), + + ?assertMatch({ok, #{?DATA := 2}, finished}, couch_jobs:get_job(T, J)), + + ?assertEqual(ok, couch_jobs:remove(T, J)) + end). + + +stop_and_remove_running_job(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(T, J, #{}), + {ok, J, WLock} = couch_jobs:accept(T), + {_, Ref} = spawn_monitor(fun() -> + exit({result, couch_jobs:stop_and_remove(T, J, 10000)}) + end), + + fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock) + end), + + fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, T, J, #{?DATA => 2}, WLock) + end), + + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, T, J, #{?DATA => 3}, WLock) + end)), + + Exit = receive {'DOWN', Ref, _, _, Reason} -> Reason end, + ?assertEqual({result, ok}, Exit), + + ?assertMatch(not_found, couch_jobs:get_job(T, J)) + end). + + +clear_type_works(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(T, J, #{}), + ?assertEqual([{J, pending, #{}}], couch_jobs:get_jobs(T)), + couch_jobs_fdb:clear_type(T), + ?assertEqual([], couch_jobs:get_jobs(T)) + end). -- cgit v1.2.1