summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2019-05-16 19:11:50 -0400
committerNick Vatamaniuc <vatamane@apache.org>2019-06-03 13:43:36 -0400
commit4a904b555a4d5f6bea178583357ff5bbcab7e181 (patch)
treefa47db0efee3b18af9be79e3536387bb9d00f15a
parentabd5d86b5c673dcf638405fc7fe10c29577eb96d (diff)
downloadcouchdb-prototype/rfc-couch-jobs.tar.gz
CouchDB background jobsprototype/rfc-couch-jobs
Done: - README file to describe each module - Supervisor and app structures - Main API module (couch_jobs.erl) - FDB read/write code in couch_jobs_fdb.erl - All jobs creation API: add(), remove(), submit(), get_job() - Workers API: finish(), resubmit(), update() and accept() - Directory path caching with metadata version checks - Activity monitor - Type monitor (start notifiers and activity monitors when new types detected) - Job state subscription: subscribe(), unsubscribe() - Tests. 90+% coverage Todo: - Example worker (maybe part of testing)
-rw-r--r--rebar.config.script1
-rw-r--r--rel/reltool.config2
-rw-r--r--src/couch_jobs/.gitignore4
-rw-r--r--src/couch_jobs/README.md66
-rw-r--r--src/couch_jobs/rebar.config14
-rw-r--r--src/couch_jobs/src/couch_jobs.app.src31
-rw-r--r--src/couch_jobs/src/couch_jobs.erl300
-rw-r--r--src/couch_jobs/src/couch_jobs.hrl51
-rw-r--r--src/couch_jobs/src/couch_jobs_activity_monitor.erl137
-rw-r--r--src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl64
-rw-r--r--src/couch_jobs/src/couch_jobs_app.erl26
-rw-r--r--src/couch_jobs/src/couch_jobs_fdb.erl565
-rw-r--r--src/couch_jobs/src/couch_jobs_notifier.erl218
-rw-r--r--src/couch_jobs/src/couch_jobs_notifier_sup.erl64
-rw-r--r--src/couch_jobs/src/couch_jobs_pending.erl159
-rw-r--r--src/couch_jobs/src/couch_jobs_server.erl184
-rw-r--r--src/couch_jobs/src/couch_jobs_sup.erl66
-rw-r--r--src/couch_jobs/src/couch_jobs_type_monitor.erl78
-rw-r--r--src/couch_jobs/test/couch_jobs_tests.erl597
19 files changed, 2627 insertions, 0 deletions
diff --git a/rebar.config.script b/rebar.config.script
index 3b58bcb1e..2def72484 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -76,6 +76,7 @@ SubDirs = [
"src/couch_tests",
"src/ddoc_cache",
"src/fabric",
+ "src/couch_jobs",
"src/global_changes",
"src/mango",
"src/rexi",
diff --git a/rel/reltool.config b/rel/reltool.config
index 1051d2e77..afebc4466 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -34,6 +34,7 @@
couch,
couch_epi,
couch_index,
+ couch_jobs,
couch_log,
couch_mrview,
couch_plugins,
@@ -90,6 +91,7 @@
{app, config, [{incl_cond, include}]},
{app, couch, [{incl_cond, include}]},
{app, couch_epi, [{incl_cond, include}]},
+ {app, couch_jobs, [{incl_cond, include}]},
{app, couch_index, [{incl_cond, include}]},
{app, couch_log, [{incl_cond, include}]},
{app, couch_mrview, [{incl_cond, include}]},
diff --git a/src/couch_jobs/.gitignore b/src/couch_jobs/.gitignore
new file mode 100644
index 000000000..6ef4c5212
--- /dev/null
+++ b/src/couch_jobs/.gitignore
@@ -0,0 +1,4 @@
+*.beam
+.eunit
+ebin/couch_jobs.app
+.DS_Store \ No newline at end of file
diff --git a/src/couch_jobs/README.md b/src/couch_jobs/README.md
new file mode 100644
index 000000000..b0552b233
--- /dev/null
+++ b/src/couch_jobs/README.md
@@ -0,0 +1,66 @@
+CouchDB Jobs Application
+========================
+
+Run background jobs in CouchDB
+
+Design (RFC) discussion: https://github.com/apache/couchdb-documentation/pull/409/files
+
+This is a description of some of the modules:
+
+ * `couch_jobs`: The main API module. It contains functions for creating,
+ accepting, executing, and monitoring jobs. A common pattern in this module
+ is to get a jobs transaction object (named `JTx` throughout the code), then
+ start a transaction and call a bunch of functions from `couch_jobs_fdb` in
+ that transaction.
+
+ * `couch_jobs_fdb`: This is a layer that talks to FDB. There is a lot of tuple
+ packing and unpacking, reading ranges and also managing transaction objects.
+
+ * `couch_jobs_pending`: This module implements the pending jobs queue. These
+ functions could all go in `couch_jobs_fdb` but the implemention was fairly
+ self-contained, with its own private helper functions, so it made sense to
+ move to a separate module.
+
+ * `couch_jobs_activity_monitor`: Here is where the "activity monitor"
+ functionality is implemented. That's done with `gen_server` instance running for
+ each type. This `gen_server` periodically check if there are inactive jobs
+ for its type, and if they are, it re-enqueues them. If the timeout value
+ changes, then it skips the pending check, until the new timeout expires.
+
+ * `couch_jobs_activity_monitor_sup` : This is a simple one-for-one supervisor
+ to spawn `couch_job_activity_monitor` instances for each type.
+
+ * `couch_jobs_type_monitor` : This is a helper process meant to be
+ spawn_linked from a parent `gen_server` and use to monitor activity for a
+ particular job type. If any jobs of that type have an update it notifies the
+ parent process. To avoid firing for every single update, there is a
+ configurable `holdoff` parameter to wait a bit before notifying the parent
+ process. This functionality is used by the `couch_jobs_notifier` to drive
+ job state subscriptions.
+
+ * `couch_jobs_notifier`: Is responsible for job state subscriptions. Just like
+ with activity monitors there is a `gen_server` instance of this running per
+ each type. It uses a linked `couch_jobs_type_monitor` process to wait for
+ any job updates. When an update notification arrives, it can efficiently
+ find out if any active jobs have been updated, by reading the `(?JOBS,
+ ?ACTIVITY, Type, Sequence)` range. That should account for the bulk of
+ changes. The jobs that are not active anymore, are queried individually.
+ Subscriptions are managed in an ordered set ETS table with the schema that
+ looks like `{{JobId, Ref}, {Fun, Pid, JobState}}`. The `Ref` is reference
+ used to track individual subscriptions and also used to monitor subscriber
+ processes in case they die (then it it auto-subscribes them).
+
+ * `couch_jobs_notifier_sup`: A simple one-for-one supervisor to spawn
+ `couch_job_notifier` processes for each type.
+
+ * `couch_jobs_server`: This is a `gen_server` which keeps track of job
+ types. It then starts or stops activity monitors and notifiers for each
+ type. To do that it queries the ` (?JOBS, ?ACTIVITY_TIMEOUT)` periodically.
+
+ * `couch_jobs_sup`: This is the application supervisor. The restart strategy
+ is `rest_for_one`, meaning that a when a child restarts, the sibling
+ following it will restart. One interesting entry there is the first child
+ which is used just to create an ETS table used by `couch_jobs_fdb` to cache
+ transaction object (`JTx` mentioned above). That child calls `init_cache/0`,
+ where it creates the ETS then returns with `ignore` so it doesn't actually
+ spawn a process. The ETS table will be owned by the supervisor process.
diff --git a/src/couch_jobs/rebar.config b/src/couch_jobs/rebar.config
new file mode 100644
index 000000000..362c8785e
--- /dev/null
+++ b/src/couch_jobs/rebar.config
@@ -0,0 +1,14 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{cover_enabled, true}.
+{cover_print_enabled, true}.
diff --git a/src/couch_jobs/src/couch_jobs.app.src b/src/couch_jobs/src/couch_jobs.app.src
new file mode 100644
index 000000000..8ded14c6f
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.app.src
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_jobs, [
+ {description, "CouchDB Jobs"},
+ {vsn, git},
+ {mod, {couch_jobs_app, []}},
+ {registered, [
+ couch_jobs_sup,
+ couch_jobs_activity_monitor_sup,
+ couch_jobs_notifier_sup,
+ couch_jobs_server
+ ]},
+ {applications, [
+ kernel,
+ stdlib,
+ erlfdb,
+ couch_log,
+ config,
+ fabric
+ ]}
+]}.
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
new file mode 100644
index 000000000..20cfb5de2
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -0,0 +1,300 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs).
+
+-export([
+ add/3,
+ remove/2,
+ stop_and_remove/3,
+ resubmit/2,
+ resubmit/3,
+ get_job/2,
+ get_jobs/1,
+ get_jobs/0,
+
+ accept/1,
+ accept/2,
+ finish/5,
+ resubmit/5,
+ update/5,
+
+ subscribe/2,
+ subscribe/3,
+ unsubscribe/1,
+ wait_job_state/2,
+ wait_job_state/3,
+
+ set_type_timeout/2,
+ clear_type_timeout/1,
+ get_type_timeout/1
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+%% Job Creation API
+
+-spec add(job_type(), job_id(), job_opts()) -> ok | {error, any()}.
+add(Type, JobId, JobOpts) ->
+ try validate_job_opts(JobOpts) of
+ ok ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:add(JTx, Type, JobId, JobOpts)
+ end)
+ catch
+ Tag:Err -> {error, {invalid_args, {Tag, Err}}}
+ end.
+
+
+-spec remove(job_type(), job_id()) -> ok | not_found | canceled.
+remove(Type, JobId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:remove(JTx, Type, JobId)
+ end).
+
+
+-spec stop_and_remove(job_type(), job_id(), timeout()) ->
+ ok | not_found | timeout.
+stop_and_remove(Type, JobId, Timeout) ->
+ case remove(Type, JobId) of
+ not_found ->
+ not_found;
+ ok ->
+ ok;
+ canceled ->
+ case subscribe(Type, JobId) of
+ not_found ->
+ not_found;
+ finished ->
+ ok = remove(Type, JobId);
+ {ok, SubId, _JobState} ->
+ case wait_job_state(SubId, finished, Timeout) of
+ timeout ->
+ timeout;
+ {Type, JobId, finished} ->
+ ok = remove(Type, JobId)
+ end
+ end
+ end.
+
+
+-spec resubmit(job_type(), job_id()) -> ok | not_found.
+resubmit(Type, JobId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:resubmit(JTx, Type, JobId, undefined)
+ end).
+
+
+-spec resubmit(job_type(), job_id(), job_priority()) -> ok | not_found.
+resubmit(Type, JobId, NewPriority) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:resubmit(JTx, Type, JobId, NewPriority)
+ end).
+
+
+-spec get_job(job_type(), job_id()) -> {ok, job_opts(), job_state()}
+ | not_found.
+get_job(Type, JobId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:get_job(JTx, Type, JobId)
+ end).
+
+
+-spec get_jobs(job_type()) -> [{job_id(), job_state(), job_opts()}].
+get_jobs(Type) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:get_jobs(JTx, Type)
+ end).
+
+
+-spec get_jobs() -> [{job_type(), job_id(), job_state(), job_opts()}].
+get_jobs() ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:get_jobs(JTx)
+ end).
+
+
+%% Worker Implementation API
+
+-spec accept(job_type()) -> {ok, job_id(), worker_lock()} | not_found.
+accept(Type) ->
+ accept(Type, undefined).
+
+
+-spec accept(job_type(), job_priority()) -> {ok, job_id(), worker_lock()}
+ | not_found.
+accept(Type, MaxPriority) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:accept(JTx, Type, MaxPriority)
+ end).
+
+
+-spec finish(jtx(), job_type(), job_id(), job_opts(), worker_lock()) -> ok |
+ worker_conflict | no_return().
+finish(Tx, Type, JobId, JobOpts, WorkerLockId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_jobs_fdb:finish(JTx, Type, JobId, JobOpts, WorkerLockId)
+ end).
+
+
+-spec resubmit(jtx(), job_type(), job_id(), job_priority() | undefined,
+ worker_lock()) -> ok | worker_conflict | canceled | no_return().
+resubmit(Tx, Type, JobId, NewPriority, WorkerLockId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_jobs_fdb:resubmit(JTx, Type, JobId, NewPriority, WorkerLockId)
+ end).
+
+
+-spec update(jtx(), job_type(), job_id(), job_opts(), worker_lock()) -> ok |
+ worker_conflict | canceled | no_return().
+update(Tx, Type, JobId, JobOpts, WorkerLockId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_jobs_fdb:update(JTx, Type, JobId, JobOpts, WorkerLockId)
+ end).
+
+
+%% Subscription API
+
+% Receive events as messages. Wait for them using `wait_job_state/2,3`
+% functions.
+%
+-spec subscribe(job_type(), job_id()) -> {ok, job_subscription(), job_state()}
+ | not_found | {error, any()}.
+subscribe(Type, JobId) ->
+ case couch_jobs_server:get_notifier_server(Type) of
+ {ok, Server} ->
+ case couch_jobs_notifier:subscribe(Server, JobId, self()) of
+ {Ref, JobState} -> {ok, {Server, Ref}, JobState};
+ not_found -> not_found;
+ finished -> finished
+ end;
+ {error, Error} ->
+ {error, Error}
+ end.
+
+
+% Receive events as callbacks. Callback arguments will be:
+% Fun(SubscriptionRef, Type, JobId, JobState)
+%
+% Returns:
+% - {ok, SubscriptionRef, CurrentState} where:
+% - SubscriptionRef is opaque reference for the subscription
+% - CurrentState is the current job state
+% - `not_found` if job was not found
+% - `finished` if job is already in the `finished` state
+%
+-spec subscribe(job_type(), job_id(), job_callback()) -> {ok,
+ job_subscription(), job_state()} | not_found | {error, any()}.
+subscribe(Type, JobId, Fun) when is_function(Fun, 3) ->
+ case couch_jobs_server:get_notifier_server(Type) of
+ {ok, Server} ->
+ case couch_jobs_notifier:subscribe(Server, JobId, Fun, self()) of
+ {Ref, JobState} -> {ok, {Server, Ref}, JobState};
+ not_found -> not_found;
+ finished -> finished
+ end;
+ {error, Error} ->
+ {error, Error}
+ end.
+
+
+% Unsubscribe from getting notifications based on a particular subscription.
+% Each subscription should be followed by its own unsubscription call. However,
+% subscriber processes are also monitored and auto-unsubscribed if they exit.
+% So the subscribing process is exiting, calling this function is optional.
+%
+-spec unsubscribe(job_subscription()) -> ok.
+unsubscribe({Server, Ref}) when is_pid(Server), is_reference(Ref) ->
+ try
+ couch_jobs_notifier:unsubscribe(Server, Ref)
+ after
+ flush_notifications(Ref)
+ end.
+
+
+% Wait to receive job state updates as messages.
+%
+-spec wait_job_state(job_subscription(), timeout()) -> {job_type(), job_id(),
+ job_state()} | timeout.
+wait_job_state({_, Ref}, Timeout) ->
+ receive
+ {?COUCH_JOBS_EVENT, Ref, Type, JobId, JobState} ->
+ {Type, JobId, JobState}
+ after
+ Timeout -> timeout
+ end.
+
+
+% Wait for a particular job state received as a message.
+%
+-spec wait_job_state(job_subscription(), job_state(), timeout()) ->
+ {job_type(), job_id(), job_state()} | timeout.
+wait_job_state({_, Ref}, JobState, Timeout) ->
+ receive
+ {?COUCH_JOBS_EVENT, Ref, Type, JobId, JobState} ->
+ {Type, JobId, JobState}
+ after
+ Timeout -> timeout
+ end.
+
+
+%% Job type timeout API
+
+% These functions manipulate the activity timeout for each job type.
+
+-spec set_type_timeout(job_type(), timeout()) -> ok.
+set_type_timeout(Type, Timeout) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:set_type_timeout(JTx, Type, Timeout)
+ end).
+
+
+-spec clear_type_timeout(job_type()) -> ok.
+clear_type_timeout(Type) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:clear_type_timeout(JTx, Type)
+ end).
+
+
+-spec get_type_timeout(job_type()) -> timeout().
+get_type_timeout(Type) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:get_type_timeout(JTx, Type)
+ end).
+
+
+%% Private utilities
+
+validate_job_opts(#{} = JobOpts) ->
+ jiffy:encode(JobOpts),
+ case maps:get(?OPT_RESUBMIT, JobOpts, undefined) of
+ undefined -> ok;
+ true -> ok;
+ Resubmit -> error({invalid_resubmit, Resubmit, JobOpts})
+ end,
+ case maps:get(?OPT_PRIORITY, JobOpts, undefined) of
+ undefined -> ok;
+ Binary when is_binary(Binary) -> ok;
+ Int when is_integer(Int), Int >= 0 -> ok;
+ Priority -> error({invalid_priority, Priority, JobOpts})
+ end.
+
+
+flush_notifications(Ref) ->
+ receive
+ {?COUCH_JOBS_EVENT, Ref, _, _, _} ->
+ flush_notifications(Ref)
+ after
+ 0 -> ok
+ end.
diff --git a/src/couch_jobs/src/couch_jobs.hrl b/src/couch_jobs/src/couch_jobs.hrl
new file mode 100644
index 000000000..dc3e7ffb1
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.hrl
@@ -0,0 +1,51 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% JobOpts map/json field definitions
+%
+-define(OPT_PRIORITY, <<"priority">>).
+-define(OPT_DATA, <<"data">>).
+-define(OPT_CANCEL, <<"cancel">>).
+-define(OPT_RESUBMIT, <<"resubmit">>).
+
+% These might be in a fabric public hrl eventually
+%
+-define(uint2bin(I), binary:encode_unsigned(I, little)).
+-define(bin2uint(I), binary:decode_unsigned(I, little)).
+-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
+-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>).
+
+% Data model definitions
+%
+-define(JOBS, 51). % coordinate with fabric2.hrl
+-define(DATA, 1).
+-define(PENDING, 2).
+-define(WATCHES, 3).
+-define(ACTIVITY_TIMEOUT, 4).
+-define(ACTIVITY, 5).
+
+
+% Couch jobs event notifier tag
+-define(COUCH_JOBS_EVENT, '$couch_jobs_event').
+
+
+-type jtx() :: map().
+-type job_id() :: binary().
+-type job_type() :: tuple() | binary() | non_neg_integer().
+-type job_opts() :: map().
+-type job_state() :: running | pending | finished.
+-type job_priority() :: tuple() | binary() | non_neg_integer().
+-type job_subscription() :: {pid(), reference()}.
+-type job_callback() :: fun((job_subscription(), job_type(), job_id(),
+ job_state()) -> ok).
+-type worker_lock() :: binary().
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor.erl b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
new file mode 100644
index 000000000..07ff66aa2
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
@@ -0,0 +1,137 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor).
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+-record(st, {
+ jtx,
+ type,
+ tref,
+ timeout = 0,
+ vs = not_found
+}).
+
+
+-define(MAX_JITTER_DEFAULT, 15000).
+-define(MISSING_TIMEOUT_CHECK, 5000).
+
+
+start_link(Type) ->
+ gen_server:start_link(?MODULE, [Type], []).
+
+
+%% gen_server callbacks
+
+init([Type]) ->
+ St = #st{jtx = couch_jobs_fdb:get_jtx(), type = Type},
+ {ok, schedule_check(St)}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(check_activity, St) ->
+ St1 = check_activity(St),
+ St2 = schedule_check(St1),
+ {noreply, St2};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+% Private helper functions
+
+check_activity(#st{vs = not_found} = St) ->
+ St#st{vs = get_vs(St)};
+
+check_activity(#st{} = St) ->
+ NewVS = get_vs(St),
+ case get_inactive_since(St) of
+ [] -> ok;
+ [_ | _] -> re_enqueue_inactive(St)
+ end,
+ St#st{vs = NewVS}.
+
+
+get_timeout_msec(JTx, Type) ->
+ TimeoutVal = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_type_timeout(JTx1, Type)
+ end),
+ case TimeoutVal of
+ not_found -> not_found;
+ ValSeconds -> timer:seconds(ValSeconds)
+ end.
+
+
+schedule_check(#st{jtx = JTx, type = Type, timeout = OldTimeout} = St) ->
+ % Reset versionstamp if timeout changed.
+ St1 = case get_timeout_msec(JTx, Type) of
+ not_found -> St#st{vs = not_found, timeout = ?MISSING_TIMEOUT_CHECK};
+ OldTimeout -> St;
+ NewTimeout -> St#st{vs = not_found, timeout = NewTimeout}
+ end,
+ #st{timeout = Timeout} = St1,
+ MaxJitter = max(Timeout div 2, get_max_jitter_msec()),
+ Wait = Timeout + rand:uniform(min(1, MaxJitter)),
+ St1#st{tref = erlang:send_after(Wait, self(), check_activity)}.
+
+
+get_vs(#st{jtx = JTx, type = Type}) ->
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_activity_vs(JTx1, Type)
+ end).
+
+
+re_enqueue_inactive(#st{jtx = JTx, type = Type, vs = VS}) ->
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, VS)
+ end).
+
+
+get_inactive_since(#st{jtx = JTx, type = Type, vs = VS}) ->
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_inactive_since(JTx1, Type, VS)
+ end).
+
+
+get_max_jitter_msec()->
+ config:get_integer("couch_jobs", "activity_monitor_max_jitter_msec",
+ ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
new file mode 100644
index 000000000..b11161a24
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
@@ -0,0 +1,64 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+ start_link/0,
+
+ start_monitor/1,
+ stop_monitor/1,
+ get_child_pids/0
+]).
+
+-export([
+ init/1
+]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_monitor(Type) ->
+ supervisor:start_child(?MODULE, [Type]).
+
+
+stop_monitor(Pid) ->
+ supervisor:terminate_child(?MODULE, Pid).
+
+
+get_child_pids() ->
+ lists:map(fun({_Id, Pid, _Type, _Mod}) ->
+ Pid
+ end, supervisor:which_children(?MODULE)).
+
+
+init(_) ->
+ Flags = #{
+ strategy => simple_one_for_one,
+ intensity => 10,
+ period => 3
+ },
+ Children = [
+ #{
+ id => couch_jobs_monitor,
+ restart => temporary,
+ start => {couch_jobs_activity_monitor, start_link, []}
+ }
+ ],
+ {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_app.erl b/src/couch_jobs/src/couch_jobs_app.erl
new file mode 100644
index 000000000..720b94891
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_app.erl
@@ -0,0 +1,26 @@
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_app).
+
+
+-behaviour(application).
+
+
+-export([
+ start/2,
+ stop/1
+]).
+
+
+start(_Type, []) ->
+ couch_jobs_sup:start_link().
+
+
+stop([]) ->
+ ok.
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
new file mode 100644
index 000000000..093516858
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -0,0 +1,565 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_fdb).
+
+
+-export([
+ add/4,
+ remove/3,
+ resubmit/4,
+ get_job/3,
+ get_jobs/2,
+ get_jobs/1,
+
+ accept/3,
+ finish/5,
+ resubmit/5,
+ update/5,
+
+ set_type_timeout/3,
+ clear_type_timeout/2,
+ get_type_timeout/2,
+ get_types/1,
+
+ get_activity_vs/2,
+ get_activity_vs_and_watch/2,
+ get_active_since/3,
+ get_inactive_since/3,
+ re_enqueue_inactive/3,
+
+ init_cache/0,
+
+ get_jtx/0,
+ get_jtx/1,
+ tx/2,
+
+ has_versionstamp/1,
+
+ clear_jobs/0,
+ clear_type/1
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Data model
+%
+% (?JOBS, ?DATA, Type, JobId) = (Sequence, WorkerLockId, Priority, JobOpts)
+% (?JOBS, ?PENDING, Type, Priority, JobId) = ""
+% (?JOBS, ?WATCHES, Type) = Sequence
+% (?JOBS, ?ACTIVITY_TIMEOUT, Type) = ActivityTimeout
+% (?JOBS, ?ACTIVITY, Type, Sequence) = JobId
+
+% Job creation API
+
+add(#{jtx := true} = JTx0, Type, JobId, JobOpts) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+ case get_type_timeout(JTx, Type) of
+ not_found ->
+ {error, no_type_timeout};
+ Int when is_integer(Int) ->
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ <<_/binary>> -> {error, duplicate_job};
+ not_found -> maybe_enqueue(JTx, Type, JobId, JobOpts, true)
+ end
+ end.
+
+
+remove(#{jtx := true} = JTx0, Type, JobId) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ case get_job(Tx, Key) of
+ {_, WorkerLockId, _Priority, _} = Job when WorkerLockId =/= null ->
+ ok = cancel(JTx, Key, Job),
+ canceled;
+ {_, _WorkerLockId, Priority, _} when Priority =/= null ->
+ couch_jobs_pending:remove(JTx, Type, Priority, JobId),
+ erlfdb:clear(Tx, Key),
+ ok;
+ {_, _WorkerLockId, _Priority, _} ->
+ erlfdb:clear(Tx, Key),
+ ok;
+ not_found ->
+ not_found
+ end.
+
+
+resubmit(#{jtx := true} = JTx, Type, JobId, NewPriority) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ case get_job(Tx, Key) of
+ {Seq, WorkerLockId, Priority, #{} = JobOpts} ->
+ JobOpts1 = JobOpts#{?OPT_RESUBMIT => true},
+ JobOpts2 = update_priority(JobOpts1, NewPriority),
+ JobOptsEnc = jiffy:encode(JobOpts2),
+ Val = erlfdb_tuple:pack({Seq, WorkerLockId, Priority, JobOptsEnc}),
+ erlfdb:set(Tx, Key, Val),
+ ok;
+ not_found ->
+ not_found
+ end.
+
+
+get_job(#{jtx := true} = JTx, Type, JobId) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ case get_job(Tx, Key) of
+ {_, WorkerLockId, Priority, JobOpts} ->
+ {ok, JobOpts, job_state(WorkerLockId, Priority)};
+ not_found ->
+ not_found
+ end.
+
+
+get_jobs(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?DATA, Type}, Jobs),
+ Opts = [{streaming_mode, want_all}],
+ Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+ lists:map(fun({K, V}) ->
+ {JobId} = erlfdb_tuple:unpack(K, Prefix),
+ {_Seq, WorkerLockId, Priority, JobOpts} = unpack_job(V),
+ JobState = job_state(WorkerLockId, Priority),
+ {JobId, JobState, JobOpts}
+ end, Result).
+
+
+get_jobs(#{jtx := true} = JTx) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?DATA}, Jobs),
+ Opts = [{streaming_mode, want_all}],
+ Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+ lists:map(fun({K, V}) ->
+ {Type, JobId} = erlfdb_tuple:unpack(K, Prefix),
+ {_Seq, WorkerLockId, Priority, JobOpts} = unpack_job(V),
+ JobState = job_state(WorkerLockId, Priority),
+ {Type, JobId, JobState, JobOpts}
+ end, Result).
+
+
+% Worker public API
+
+accept(#{jtx := true} = JTx0, Type, MaxPriority) ->
+ #{jtx := true} = JTx = get_jtx(JTx0),
+ case couch_jobs_pending:dequeue(JTx, Type, MaxPriority) of
+ not_found ->
+ not_found;
+ <<_/binary>> = JobId ->
+ WorkerLockId = fabric2_util:uuid(),
+ update_lock(JTx, Type, JobId, WorkerLockId),
+ update_activity(JTx, Type, JobId, null),
+ {ok, JobId, WorkerLockId}
+ end.
+
+
+finish(#{jtx := true} = JTx0, Type, JobId, JobOpts, WorkerLockId) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ case get_job_and_status(Tx, Key, WorkerLockId) of
+ {Status, {Seq, _, _, JobOptsCur}} when
+ Status =:= ok orelse Status =:= canceled ->
+ % If the job was canceled, allow updating its data one last time
+ JobOpts1 = maps:merge(JobOptsCur, JobOpts),
+ Resubmit = maps:get(?OPT_RESUBMIT, JobOpts1, false) == true,
+ maybe_enqueue(JTx, Type, JobId, JobOpts1, Resubmit),
+ clear_activity(JTx, Type, Seq),
+ update_watch(JTx, Type),
+ ok;
+ {worker_conflict, _} ->
+ worker_conflict
+ end.
+
+
+resubmit(#{jtx := true} = JTx0, Type, JobId, NewPriority, WorkerLockId) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx = get_jtx(JTx0),
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ case get_job_and_status(Tx, Key, WorkerLockId) of
+ {ok, {Seq, WorkerLockId, null, JobOpts}} ->
+ update_activity(JTx, Type, JobId, Seq),
+ JobOpts1 = JobOpts#{?OPT_RESUBMIT => true},
+ JobOpts2 = update_priority(JobOpts1, NewPriority),
+ update_job(JTx, Type, JobId, WorkerLockId, JobOpts2, Seq),
+ ok;
+ {Status, _} when Status =/= ok ->
+ Status
+ end.
+
+
+update(#{jtx := true} = JTx, Type, JobId, JobOpts, WorkerLockId) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ case get_job_and_status(Tx, Key, WorkerLockId) of
+ {ok, {Seq, WorkerLockId, null, JobOptsCur}} ->
+ JobOpts1 = maps:merge(JobOptsCur, JobOpts),
+ update_job(JTx, Type, JobId, WorkerLockId, JobOpts1, Seq),
+ ok;
+ {Status, _} when Status =/= ok ->
+ Status
+ end.
+
+
+% Type and activity monitoring API
+
+set_type_timeout(#{jtx := true} = JTx, Type, Timeout) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+ Val = erlfdb_tuple:pack({Timeout}),
+ erlfdb:set(Tx, Key, Val).
+
+
+clear_type_timeout(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+ erlfdb:clear(Tx, Key).
+
+
+get_type_timeout(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+ case erlfdb:wait(erlfdb:get_ss(Tx, Key)) of
+ not_found ->
+ not_found;
+ Val ->
+ {Timeout} = erlfdb_tuple:unpack(Val),
+ Timeout
+ end.
+
+
+get_types(#{jtx := true} = JTx) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT}, Jobs),
+ Opts = [{streaming_mode, want_all}],
+ Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+ lists:map(fun({K, _V}) ->
+ {Type} = erlfdb_tuple:unpack(K, Prefix),
+ Type
+ end, Result).
+
+
+get_activity_vs(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ not_found ->
+ not_found;
+ Val ->
+ {VS} = erlfdb_tuple:unpack(Val),
+ VS
+ end.
+
+
+get_activity_vs_and_watch(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+ Future = erlfdb:get(Tx, Key),
+ Watch = erlfdb:watch(Tx, Key),
+ case erlfdb:wait(Future) of
+ not_found ->
+ {not_found, Watch};
+ Val ->
+ {VS} = erlfdb_tuple:unpack(Val),
+ {VS, Watch}
+ end.
+
+
+get_active_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+ StartKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+ StartKeySel = erlfdb_key:first_greater_than(StartKey),
+ {_, EndKey} = erlfdb_tuple:range({Type}, Prefix),
+ Opts = [{streaming_mode, want_all}],
+ Future = erlfdb:get_range(Tx, StartKeySel, EndKey, Opts),
+ lists:map(fun({_K, JobId}) -> JobId end, erlfdb:wait(Future)).
+
+
+get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+ {StartKey, _} = erlfdb_tuple:range({Type}, Prefix),
+ EndKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+ EndKeySel = erlfdb_key:first_greater_than(EndKey),
+ Opts = [{streaming_mode, want_all}],
+ Future = erlfdb:get_range(Tx, StartKey, EndKeySel, Opts),
+ lists:map(fun({_K, JobId}) -> JobId end, erlfdb:wait(Future)).
+
+
+re_enqueue_inactive(#{jtx := true} = JTx, Type, Versionstamp) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ JobIds = get_inactive_since(JTx, Type, Versionstamp),
+ lists:foreach(fun(JobId) ->
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ {Seq, _, _, JobOpts} = get_job(Tx, Key),
+ clear_activity(JTx, Type, Seq),
+ maybe_enqueue(JTx, Type, JobId, JobOpts, true)
+ end, JobIds),
+ case length(JobIds) > 0 of
+ true -> update_watch(JTx, Type);
+ false -> ok
+ end,
+ JobIds.
+
+
+% Cache initialization API. Called from the supervisor just to create the ETS
+% table. It returns `ignore` to tell supervisor it won't actually start any
+% process, which is what we want here.
+%
+init_cache() ->
+ ConcurrencyOpts = [{read_concurrency, true}, {write_concurrency, true}],
+ ets:new(?MODULE, [public, named_table] ++ ConcurrencyOpts),
+ ignore.
+
+
+% Cached job transaction object. This object wraps a transaction, caches the
+% directory lookup path, and the metadata version. The function can be used
+% from or outside the transaction. When used from a transaction it will verify
+% if the metadata was changed, and will refresh automatically.
+%
+get_jtx() ->
+ get_jtx(undefined).
+
+
+get_jtx(#{tx := Tx} = _TxDb) ->
+ get_jtx(Tx);
+
+get_jtx(undefined = _Tx) ->
+ case ets:lookup(?MODULE, ?JOBS) of
+ [{_, #{} = JTx}] ->
+ JTx;
+ [] ->
+ JTx = update_jtx_cache(init_jtx(undefined)),
+ JTx#{tx := undefined}
+ end;
+
+get_jtx({erlfdb_transaction, _} = Tx) ->
+ case ets:lookup(?MODULE, ?JOBS) of
+ [{_, #{} = JTx}] ->
+ ensure_current(JTx#{tx := Tx});
+ [] ->
+ update_jtx_cache(init_jtx(Tx))
+ end.
+
+
+% Transaction processing to be used with couch jobs' specific transaction
+% contexts
+%
+tx(#{jtx := true} = JTx, Fun) when is_function(Fun, 1) ->
+ fabric2_fdb:transactional(JTx, Fun).
+
+
+% Utility fdb functions used by other module in couch_job. Maybe move these to
+% a separate module if the list keep growing
+
+has_versionstamp(?UNSET_VS) ->
+ true;
+
+has_versionstamp(Tuple) when is_tuple(Tuple) ->
+ has_versionstamp(tuple_to_list(Tuple));
+
+has_versionstamp([Elem | Rest]) ->
+ has_versionstamp(Elem) orelse has_versionstamp(Rest);
+
+has_versionstamp(_Other) ->
+ false.
+
+
+% Debug and testing API
+
+clear_jobs() ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ #{jobs_path := Jobs} = init_jtx(Tx),
+ erlfdb:clear_range_startswith(Tx, Jobs)
+ end).
+
+
+clear_type(Type) ->
+ Sections = [?DATA, ?PENDING, ?WATCHES, ?ACTIVITY_TIMEOUT, ?ACTIVITY],
+ fabric2_fdb:transactional(fun(Tx) ->
+ #{jobs_path := Jobs} = init_jtx(Tx),
+ lists:foreach(fun(Section) ->
+ Prefix = erlfdb_tuple:pack({Section, Type}, Jobs),
+ erlfdb:clear_range_startswith(Tx, Prefix)
+ end, Sections)
+ end).
+
+
+% Private helper functions
+
+update_job(JTx, Type, JobId, WorkerLockId, JobOpts, OldSeq) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ update_activity(JTx, Type, JobId, OldSeq),
+ ValTup = {?UNSET_VS, WorkerLockId, null, jiffy:encode(JobOpts)},
+ Val = erlfdb_tuple:pack_vs(ValTup),
+ erlfdb:set_versionstamped_value(Tx, Key, Val).
+
+
+update_priority(JobOpts, undefined) ->
+ JobOpts;
+
+update_priority(JobOpts, NewPriority) ->
+ OldPriority = maps:get(?OPT_PRIORITY, JobOpts, undefined),
+ case NewPriority =/= OldPriority of
+ true -> JobOpts#{?OPT_PRIORITY => NewPriority};
+ false -> JobOpts
+ end.
+
+
+cancel(#{jx := true}, _, {_, _, _, #{?OPT_CANCEL := true}}) ->
+ ok;
+
+cancel(#{jtx := true, tx := Tx}, Key, Job) ->
+ {Seq, WorkerLockId, Priority, JobOpts} = Job,
+ JobOpts1 = JobOpts#{?OPT_CANCEL => true},
+ JobOptsEnc = jiffy:encode(JobOpts1),
+ Val = erlfdb_tuple:pack({Seq, WorkerLockId, Priority, JobOptsEnc}),
+ erlfdb:set(Tx, Key, Val),
+ ok.
+
+
+maybe_enqueue(#{jtx := true} = JTx, Type, JobId, JobOpts, Resubmit) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ Cancel = maps:get(?OPT_CANCEL, JobOpts, false) == true,
+ JobOpts1 = maps:without([?OPT_RESUBMIT], JobOpts),
+ Priority = maps:get(?OPT_PRIORITY, JobOpts1, ?UNSET_VS),
+ JobOptsEnc = jiffy:encode(JobOpts1),
+ case Resubmit andalso not Cancel of
+ true ->
+ case has_versionstamp(Priority) of
+ true ->
+ Val = erlfdb_tuple:pack_vs({null, null, Priority,
+ JobOptsEnc}),
+ erlfdb:set_versionstamped_value(Tx, Key, Val);
+ false ->
+ Val = erlfdb_tuple:pack({null, null, Priority,
+ JobOptsEnc}),
+ erlfdb:set(Tx, Key, Val)
+ end,
+ couch_jobs_pending:enqueue(JTx, Type, Priority, JobId);
+ false ->
+ Val = erlfdb_tuple:pack({null, null, null, JobOptsEnc}),
+ erlfdb:set(Tx, Key, Val)
+ end,
+ ok.
+
+
+get_job(Tx = {erlfdb_transaction, _}, Key) ->
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ <<_/binary>> = Val -> unpack_job(Val);
+ not_found -> not_found
+ end.
+
+
+unpack_job(<<_/binary>> = JobVal) ->
+ {Seq, WorkerLockId, Priority, JobOptsEnc} = erlfdb_tuple:unpack(JobVal),
+ JobOpts = jiffy:decode(JobOptsEnc, [return_maps]),
+ {Seq, WorkerLockId, Priority, JobOpts}.
+
+
+get_job_and_status(Tx, Key, WorkerLockId) ->
+ case get_job(Tx, Key) of
+ {_, LockId, _, _} = Res when WorkerLockId =/= LockId ->
+ {worker_conflict, Res};
+ {_, _, _, #{?OPT_CANCEL := true}} = Res ->
+ {canceled, Res};
+ {_, _, _, #{}} = Res ->
+ {ok, Res};
+ not_found ->
+ {worker_conflict, not_found}
+ end.
+
+
+update_activity(#{jtx := true} = JTx, Type, JobId, Seq) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ case Seq =/= null of
+ true -> clear_activity(JTx, Type, Seq);
+ false -> ok
+ end,
+ Key = erlfdb_tuple:pack_vs({?ACTIVITY, Type, ?UNSET_VS}, Jobs),
+ erlfdb:set_versionstamped_key(Tx, Key, JobId),
+ update_watch(JTx, Type).
+
+
+clear_activity(#{jtx := true} = JTx, Type, Seq) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?ACTIVITY, Type, Seq}, Jobs),
+ erlfdb:clear(Tx, Key).
+
+
+update_watch(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?WATCHES, Type}, Jobs),
+ Val = erlfdb_tuple:pack_vs({?UNSET_VS}),
+ erlfdb:set_versionstamped_value(Tx, Key, Val).
+
+
+update_lock(#{jtx := true} = JTx, Type, JobId, WorkerLockId) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs),
+ {null, null, _, JobOpts} = get_job(Tx, Key),
+ ValTup = {?UNSET_VS, WorkerLockId, null, jiffy:encode(JobOpts)},
+ Val = erlfdb_tuple:pack_vs(ValTup),
+ erlfdb:set_versionstamped_value(Tx, Key, Val).
+
+
+job_state(WorkerLockId, Priority) ->
+ case {WorkerLockId, Priority} of
+ {null, null} ->
+ finished;
+ {WorkerLockId, _} when WorkerLockId =/= null ->
+ running;
+ {_, Priority} when Priority =/= null ->
+ pending
+ end.
+
+
+% This a transaction context object similar to the Db = #{} one from
+% fabric2_fdb. It's is used to cache the jobs path directory (to avoid extra
+% lookups on every operation) and to check for metadata changes (in case
+% directory changes).
+%
+init_jtx(undefined) ->
+ fabric2_fdb:transactional(fun(Tx) -> init_jtx(Tx) end);
+
+init_jtx({erlfdb_transaction, _} = Tx) ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ LayerPrefix = erlfdb_directory:get_name(CouchDB),
+ Jobs = erlfdb_tuple:pack({?JOBS}, LayerPrefix),
+ Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+ % layer_prefix, md_version and tx here match db map fields in fabric2_fdb
+ % but we also assert that this is a job transaction using the jtx => true
+ % field
+ #{
+ jtx => true,
+ tx => Tx,
+ layer_prefix => LayerPrefix,
+ jobs_path => Jobs,
+ md_version => Version
+ }.
+
+
+ensure_current(#{jtx := true, tx := Tx, md_version := Version} = JTx) ->
+ case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of
+ Version -> JTx;
+ _NewVersion -> update_jtx_cache(init_jtx(Tx))
+ end.
+
+
+update_jtx_cache(#{jtx := true} = JTx) ->
+ CachedJTx = JTx#{tx := undefined},
+ ets:insert(?MODULE, {?JOBS, CachedJTx}),
+ JTx.
diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl
new file mode 100644
index 000000000..59fa31a29
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier.erl
@@ -0,0 +1,218 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier).
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/1,
+ subscribe/3,
+ subscribe/4,
+ unsubscribe/2
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-define(TYPE_MONITOR_HOLDOFF_DEFAULT, 1000).
+-define(TYPE_MONITOR_TIMEOUT_DEFAULT, "infinity").
+
+
+-record(st, {
+ jtx,
+ type,
+ monitor_pid,
+ subs
+}).
+
+
+start_link(Type) ->
+ gen_server:start_link(?MODULE, [Type], []).
+
+
+subscribe(Server, JobId, Pid) when is_pid(Pid) ->
+ gen_server:call(Server, {subscribe, JobId, nil, Pid}, infinity).
+
+
+subscribe(Server, JobId, Fun, Pid) when is_function(Fun, 3), is_pid(Pid) ->
+ gen_server:call(Server, {subscribe, JobId, Fun, Pid}, infinity).
+
+
+unsubscribe(Server, Ref) when is_reference(Ref) ->
+ gen_server:call(Server, {unsubscribe, Ref}, infinity).
+
+
+init([Type]) ->
+ JTx = couch_jobs_fdb:get_jtx(),
+ EtsOpts = [ordered_set, protected],
+ St = #st{jtx = JTx, type = Type, subs = ets:new(?MODULE, EtsOpts)},
+ VS = get_type_vs(St),
+ HoldOff = get_holdoff(),
+ Timeout = get_timeout(),
+ Pid = couch_jobs_type_monitor:start(Type, VS, HoldOff, Timeout),
+ {ok, St#st{monitor_pid = Pid}}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call({subscribe, JobId, Fun, Pid}, _From, #st{} = St) ->
+ Res = case get_job(St, JobId) of
+ not_found ->
+ not_found;
+ {ok, _, finished} ->
+ finished;
+ {ok, _, JobState} ->
+ Ref = erlang:monitor(process, Pid),
+ ets:insert(St#st.subs, {{JobId, Ref}, {Fun, Pid, JobState}}),
+ {Ref, JobState}
+ end,
+ {reply, Res, St};
+
+handle_call({unsubscribe, Ref}, _From, #st{subs = Subs} = St) ->
+ true = ets:match_delete(Subs, {{'$1', Ref}, '_'}),
+ {reply, ok, St};
+
+% couch_jobs_type_monitor calls this
+handle_call({type_updated, VS}, _From, St) ->
+ ok = notify_subscribers(VS, St),
+ {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'DOWN', Ref, process, _, _}, #st{subs = Subs} = St) ->
+ true = ets:match_delete(Subs, {{'$1', Ref}, '_'}),
+ {noreply, St};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+get_job(#st{jtx = JTx, type = Type}, JobId) ->
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_job(JTx1, Type, JobId)
+ end).
+
+
+get_jobs(#st{jtx = JTx, type = Type}, JobIds) ->
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ lists:map(fun(JobId) ->
+ case couch_jobs_fdb:get_job(JTx1, Type, JobId) of
+ {ok, _, JobState} -> {JobId, JobState};
+ not_found -> {JobId, not_found}
+ end
+ end, JobIds)
+ end).
+
+
+get_type_vs(#st{jtx = JTx, type = Type}) ->
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_activity_vs(JTx1, Type)
+ end).
+
+
+% "Active since" is the list of jobs that have been active (running)
+% and updated at least once since the given versionstamp. These are relatively
+% cheap to find as it's just a range read in the ?ACTIVITY subspace.
+%
+get_active_since(#st{jtx = JTx, type = Type}, VS, SubscribedJobs) ->
+ AllUpdatedSet = sets:from_list(couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_active_since(JTx1, Type, VS)
+ end)),
+ SubscribedSet = sets:from_list(SubscribedJobs),
+ SubscribedActiveSet = sets:intersection(AllUpdatedSet, SubscribedSet),
+ sets:to_list(SubscribedActiveSet).
+
+
+get_subscribers(JobId, #st{subs = Subs}) ->
+ % Use ordered ets's fast matching of partial key prefixes here
+ lists:map(fun([Ref, {Fun, Pid, JobState}]) ->
+ {Ref, Fun, Pid, JobState}
+ end, ets:match(Subs, {{JobId, '$1'}, '$2'})).
+
+
+get_subscribed_job_ids(#st{subs = Subs}) ->
+ Matches = ets:match(Subs, {{'$1', '_'}, '_'}),
+ lists:usort(lists:flatten(Matches)).
+
+
+notify_subscribers(VS, #st{subs = Subs} = St) ->
+ JobIds = get_subscribed_job_ids(St),
+ % First gather the easy (cheap) active jobs. Then with those out of way,
+ % inspect each job to get its state.
+ Active = get_active_since(St, VS, JobIds),
+ JobStates = [{JobId, running} || JobId <- Active],
+ JobStates1 = JobStates ++ get_jobs(St, JobIds -- Active),
+ lists:foreach(fun({JobId, JobState}) ->
+ lists:foreach(fun
+ ({Ref, Fun, Pid, running}) when JobState =:= running ->
+ notify(JobId, Ref, Fun, Pid, JobState, St);
+ ({_Ref, _Fun, _Pid, State}) when State =:= JobState ->
+ ok;
+ ({Ref, Fun, Pid, _}) ->
+ notify(JobId, Ref, Fun, Pid, JobState, St),
+ ets:insert(Subs, {{JobId, Ref}, {Fun, Pid, JobState}})
+ end, get_subscribers(JobId, St)),
+ case lists:member(JobState, [finished, not_found]) of
+ true -> ets:match_delete(Subs, {{JobId, '_'}, '_'});
+ false -> ok
+ end
+ end, JobStates1).
+
+
+notify(JobId, _Ref, Fun, _, JobState, St) when is_function(Fun, 3) ->
+ try
+ Fun(St#st.type, JobId, JobState)
+ catch
+ Tag:Err ->
+ ErrMsg = "~p : callback ~p failed ~p ~p => ~p:~p",
+ couch_log:error(ErrMsg, [?MODULE, Fun, JobId, JobState, Tag, Err])
+ end;
+
+notify(JobId, Ref, _, Pid, JobState, St) ->
+ Pid ! {?COUCH_JOBS_EVENT, Ref, St#st.type, JobId, JobState}.
+
+
+get_holdoff() ->
+ config:get_integer("couch_jobs", "type_monitor_holdoff_msec",
+ ?TYPE_MONITOR_HOLDOFF_DEFAULT).
+
+
+get_timeout() ->
+ Default = ?TYPE_MONITOR_TIMEOUT_DEFAULT,
+ case config:get("couch_jobs", "type_monitor_timeout_msec", Default) of
+ "infinity" -> infinity;
+ Milliseconds -> list_to_integer(Milliseconds)
+ end.
diff --git a/src/couch_jobs/src/couch_jobs_notifier_sup.erl b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
new file mode 100644
index 000000000..81d93493b
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
@@ -0,0 +1,64 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+ start_link/0,
+
+ start_notifier/1,
+ stop_notifier/1,
+ get_child_pids/0
+]).
+
+-export([
+ init/1
+]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_notifier(Type) ->
+ supervisor:start_child(?MODULE, [Type]).
+
+
+stop_notifier(Pid) ->
+ supervisor:terminate_child(?MODULE, Pid).
+
+
+get_child_pids() ->
+ lists:map(fun({_Id, Pid, _Type, _Mod}) ->
+ Pid
+ end, supervisor:which_children(?MODULE)).
+
+
+init(_) ->
+ Flags = #{
+ strategy => simple_one_for_one,
+ intensity => 10,
+ period => 3
+ },
+ Children = [
+ #{
+ id => couch_jobs_notifier,
+ restart => temporary,
+ start => {couch_jobs_notifier, start_link, []}
+ }
+ ],
+ {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_pending.erl b/src/couch_jobs/src/couch_jobs_pending.erl
new file mode 100644
index 000000000..cedf53e43
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_pending.erl
@@ -0,0 +1,159 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_pending).
+
+
+-export([
+ enqueue/4,
+ dequeue/3,
+ remove/4
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+% Make this configurable or auto-adjustable based on retries
+%
+-define(RANGE_LIMIT, 256).
+
+
+% Data model
+%
+% (?JOBS, ?PENDING, Type, Priority, JobId) = ""
+
+
+% Public API
+
+% Enqueue a job into the pending queue. Priority determines the place in the
+% queue.
+%
+enqueue(#{jtx := true} = JTx, Type, Priority, JobId) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ KeyTup = {?PENDING, Type, Priority, JobId},
+ case couch_jobs_fdb:has_versionstamp(Priority) of
+ true ->
+ Key = erlfdb_tuple:pack_vs(KeyTup, Jobs),
+ erlfdb:set_versionstamped_key(Tx, Key, <<>>);
+ false ->
+ Key = erlfdb_tuple:pack(KeyTup, Jobs),
+ erlfdb:set(Tx, Key, <<>>)
+ end.
+
+
+% Dequeue a job from the front of the queue.
+%
+% If MaxPriority is specified, any job between the front of the queue and up
+% until MaxPriority is considered. Workers may randomly pick jobs in that
+% range. That can be used to avoid contention at the expense of strict dequeue
+% ordering. For instance if priorities are 0-high, 1-normal and 2-low, and we
+% wish to process normal and urgent jobs, then MaxPriority=1-normal would
+% accomplish that.
+%
+dequeue(#{jtx := true} = JTx, Type, undefined) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
+ case get_front_priority(Tx, Prefix) of
+ not_found ->
+ not_found;
+ {{versionstamp, _, _}, PendingKey} ->
+ erlfdb:clear(Tx, PendingKey),
+ {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+ JobId;
+ {{versionstamp, _, _, _}, PendingKey} ->
+ erlfdb:clear(Tx, PendingKey),
+ {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+ JobId;
+ {Priority, _} ->
+ {Start, End} = erlfdb_tuple:range({Priority}, Prefix),
+ case clear_random_key_from_range(Tx, Start, End) of
+ not_found ->
+ not_found;
+ <<_/binary>> = PendingKey ->
+ {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+ JobId
+ end
+ end;
+
+% If MaxPriority is not specified, only jobs with the same priority as the item
+% at the front of the queue are considered. Two extremes are useful to
+% consider:
+%
+% * Priority is just one static value (say null, or "normal"). In that case,
+% the queue is effectively a bag of tasks that can be grabbed in any order,
+% which should minimize contention.
+%
+% * Each job has a unique priority value, for example a versionstamp. In that
+% case, the queue has strict FIFO behavior, but there will be more contention
+% at the front of the queue.
+%
+dequeue(#{jtx := true} = JTx, Type, MaxPriority) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
+ StartKeySel = erlfdb_key:first_greater_than(Prefix),
+ End = erlfdb_tuple:pack({MaxPriority, <<16#FF>>}, Prefix),
+ EndKeySel = erlfdb_key:first_greater_or_equal(End),
+ case clear_random_key_from_range(Tx, StartKeySel, EndKeySel) of
+ not_found ->
+ not_found;
+ <<_/binary>> = PendingKey ->
+ {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+ JobId
+ end.
+
+
+% Remove a job from the pending queue. This is used, for example, when a job is
+% canceled while it was waiting in the pending queue.
+%
+remove(#{jtx := true} = JTx, Type, Priority, JobId) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?PENDING, Type, Priority, JobId}, Jobs),
+ erlfdb:clear(Tx, Key).
+
+
+% Private functions
+
+% The priority of the item at the front. If there are multiple
+% items with the same priority, workers can randomly pick between them to
+% avoid contention.
+%
+get_front_priority(Tx, Prefix) ->
+ Opts = [{limit, 1}, {snapshot, true}],
+ case erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)) of
+ [] ->
+ not_found;
+ [{FrontKey, _}] ->
+ {Priority, _} = erlfdb_tuple:unpack(FrontKey, Prefix),
+ {Priority, FrontKey}
+ end.
+
+
+% Pick a random key from the range snapshot. Then radomly pick a key to
+% clear. Before clearing, ensure there is a read conflict on the key in
+% in case other workers have picked the same key.
+%
+clear_random_key_from_range(Tx, Start, End) ->
+ Opts = [
+ {limit, ?RANGE_LIMIT},
+ {snapshot, true}
+ ],
+ case erlfdb:wait(erlfdb:get_range(Tx, Start, End, Opts)) of
+ [] ->
+ not_found;
+ [{_, _} | _] = KVs ->
+ Index = rand:uniform(length(KVs)),
+ {Key, _} = lists:nth(Index, KVs),
+ erlfdb:add_read_conflict_key(Tx, Key),
+ erlfdb:clear(Tx, Key),
+ Key
+ end.
diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl
new file mode 100644
index 000000000..ad30435bd
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_server.erl
@@ -0,0 +1,184 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_server).
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0,
+ get_notifier_server/1,
+ force_check_types/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-define(TYPE_CHECK_PERIOD_DEFAULT, 15000).
+-define(MAX_JITTER_DEFAULT, 5000).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+get_notifier_server(Type) ->
+ case get_type_pid_refs(Type) of
+ {{_, _}, {NotifierPid, _}} -> {ok, NotifierPid};
+ not_found -> {error, not_found}
+ end.
+
+
+force_check_types() ->
+ gen_server:call(?MODULE, check_types, infinity).
+
+
+init(_) ->
+ % If couch_jobs_server is after the notifiers and activity supervisor. If
+ % it restart, there could be some stale notifier or activity monitors. Kill
+ % those as later on we'd start new ones anyway.
+ reset_monitors(),
+ reset_notifiers(),
+ ets:new(?MODULE, [protected, named_table]),
+ schedule_check(),
+ {ok, nil}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call(check_types, _From, St) ->
+ check_types(),
+ {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(check_types, St) ->
+ check_types(),
+ schedule_check(),
+ {noreply, St};
+
+handle_info({'DOWN', _Ref, process, Pid, Reason}, St) ->
+ LogMsg = "~p : process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {stop, {unexpected_process_exit, Pid, Reason}, St};
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+ % Don't crash out couch_jobs_server and the whole application would need to
+ % eventually do proper cleanup in erlfdb:wait timeout code.
+ LogMsg = "~p : spurious erlfdb future ready message ~p",
+ couch_log:error(LogMsg, [?MODULE, Ref]),
+ {noreply, St};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+check_types() ->
+ FdbTypes = fdb_types(),
+ EtsTypes = ets_types(),
+ ToStart = FdbTypes -- EtsTypes,
+ ToStop = EtsTypes -- FdbTypes,
+ lists:foreach(fun(Type) -> start_monitors(Type) end, ToStart),
+ lists:foreach(fun(Type) -> stop_monitors(Type) end, ToStop).
+
+
+start_monitors(Type) ->
+ MonPidRef = case couch_jobs_activity_monitor_sup:start_monitor(Type) of
+ {ok, Pid1} -> {Pid1, monitor(process, Pid1)};
+ {error, Error1} -> error({failed_to_start_monitor, Type, Error1})
+ end,
+ NotifierPidRef = case couch_jobs_notifier_sup:start_notifier(Type) of
+ {ok, Pid2} -> {Pid2, monitor(process, Pid2)};
+ {error, Error2} -> error({failed_to_start_notifier, Type, Error2})
+ end,
+ ets:insert_new(?MODULE, {Type, MonPidRef, NotifierPidRef}).
+
+
+stop_monitors(Type) ->
+ {{MonPid, MonRef}, {NotifierPid, NotifierRef}} = get_type_pid_refs(Type),
+ ok = couch_jobs_activity_monitor_sup:stop_monitor(MonPid),
+ demonitor(MonRef, [flush]),
+ ok = couch_jobs_notifier_sup:stop_notifier(NotifierPid),
+ demonitor(NotifierRef, [flush]),
+ ets:delete(?MODULE, Type).
+
+
+reset_monitors() ->
+ lists:foreach(fun(Pid) ->
+ couch_jobs_activity_monitor_sup:stop_monitor(Pid)
+ end, couch_jobs_activity_monitor_sup:get_child_pids()).
+
+
+reset_notifiers() ->
+ lists:foreach(fun(Pid) ->
+ couch_jobs_notifier_sup:stop_notifier(Pid)
+ end, couch_jobs_notifier_sup:get_child_pids()).
+
+
+get_type_pid_refs(Type) ->
+ case ets:lookup(?MODULE, Type) of
+ [{_, MonPidRef, NotifierPidRef}] -> {MonPidRef, NotifierPidRef};
+ [] -> not_found
+ end.
+
+
+ets_types() ->
+ lists:flatten(ets:match(?MODULE, {'$1', '_', '_'})).
+
+
+fdb_types() ->
+ try
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:get_types(JTx)
+ end)
+ catch
+ error:{timeout, _} ->
+ couch_log:warning("~p : Timed out connecting to FDB", [?MODULE]),
+ []
+ end.
+
+
+schedule_check() ->
+ Timeout = get_period_msec(),
+ MaxJitter = max(Timeout div 2, get_max_jitter_msec()),
+ Wait = Timeout + rand:uniform(min(1, MaxJitter)),
+ erlang:send_after(Wait, self(), check_types).
+
+
+get_period_msec() ->
+ config:get_integer("couch_jobs", "type_check_period_msec",
+ ?TYPE_CHECK_PERIOD_DEFAULT).
+
+
+get_max_jitter_msec() ->
+ config:get_integer("couch_jobs", "type_check_max_jitter_msec",
+ ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_sup.erl b/src/couch_jobs/src/couch_jobs_sup.erl
new file mode 100644
index 000000000..d79023777
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_sup.erl
@@ -0,0 +1,66 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1
+]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+ Flags = #{
+ strategy => rest_for_one,
+ intensity => 3,
+ period => 10
+ },
+ Children = [
+ #{
+ id => couch_jobs_fdb,
+ restart => transient,
+ start => {couch_jobs_fdb, init_cache, []}
+ },
+ #{
+ id => couch_jobs_activity_monitor_sup,
+ restart => permanent,
+ shutdown => brutal_kill,
+ type => supervisor,
+ start => {couch_jobs_activity_monitor_sup, start_link, []}
+ },
+ #{
+ id => couch_jobs_notifier_sup,
+ restart => permanent,
+ shutdown => brutal_kill,
+ type => supervisor,
+ start => {couch_jobs_notifier_sup, start_link, []}
+ },
+ #{
+ id => couch_jobs_server,
+ restart => permanent,
+ shutdown => brutal_kill,
+ start => {couch_jobs_server, start_link, []}
+ }
+ ],
+ {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_type_monitor.erl b/src/couch_jobs/src/couch_jobs_type_monitor.erl
new file mode 100644
index 000000000..b7ba633e2
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_type_monitor.erl
@@ -0,0 +1,78 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_type_monitor).
+
+
+-export([
+ start/4
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-record(st, {
+ jtx,
+ type,
+ vs,
+ parent,
+ timestamp,
+ holdoff,
+ timeout
+}).
+
+
+start(Type, VS, HoldOff, Timeout) ->
+ Parent = self(),
+ spawn_link(fun() ->
+ loop(#st{
+ jtx = couch_jobs_fdb:get_jtx(),
+ type = Type,
+ vs = VS,
+ parent = Parent,
+ timestamp = 0,
+ holdoff = HoldOff,
+ timeout = Timeout
+ })
+ end).
+
+
+loop(#st{vs = VS, timeout = Timeout} = St) ->
+ {St1, Watch} = case get_vs_and_watch(St) of
+ {VS1, W} when VS1 =/= VS -> {notify(St#st{vs = VS1}), W};
+ {VS, W} -> {St, W}
+ end,
+ try
+ erlfdb:wait(Watch, [{timeout, Timeout}])
+ catch
+ error:{timeout, _} ->
+ ok
+ end,
+ loop(St1).
+
+
+notify(#st{} = St) ->
+ #st{holdoff = HoldOff, parent = Pid, timestamp = Ts, vs = VS} = St,
+ Now = erlang:system_time(millisecond),
+ case Now - Ts of
+ Dt when Dt < HoldOff -> timer:sleep(min(HoldOff - Dt, 0));
+ _ -> ok
+ end,
+ gen_server:call(Pid, {type_updated, VS}, infinity),
+ St#st{timestamp = Now}.
+
+
+get_vs_and_watch(#st{jtx = JTx, type = Type}) ->
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_activity_vs_and_watch(JTx1, Type)
+ end).
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
new file mode 100644
index 000000000..fe3df5663
--- /dev/null
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -0,0 +1,597 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_tests).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+
+-define(DATA, <<"data">>).
+-define(RESUBMIT, <<"resubmit">>).
+-define(PRIORITY, <<"priority">>).
+
+
+couch_jobs_basic_test_() ->
+ {
+ "Test couch jobs basics",
+ {
+ setup,
+ fun setup_couch/0, fun teardown_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun add_remove_pending/1,
+ fun add_remove_errors/1,
+ fun get_jobs/1,
+ fun resubmit_as_job_creator/1,
+ fun type_timeouts_and_server/1,
+ fun dead_notifier_restarts_jobs_server/1,
+ fun bad_messages_restart_couch_jobs_server/1,
+ fun bad_messages_restart_notifier/1,
+ fun bad_messages_restart_activity_monitor/1,
+ fun worker_accept_and_finish/1,
+ fun worker_update/1,
+ fun resubmit_enqueues_job/1,
+ fun add_custom_priority/1,
+ fun resubmit_custom_priority/1,
+ fun accept_max_priority/1,
+ fun subscribe/1,
+ fun subscribe_callback/1,
+ fun subscribe_errors/1,
+ fun enqueue_inactive/1,
+ fun cancel_running_job/1,
+ fun stop_and_remove_running_job/1,
+ fun clear_type_works/1
+ ]
+ }
+ }
+ }.
+
+
+setup_couch() ->
+ test_util:start_couch([fabric]).
+
+
+teardown_couch(Ctx) ->
+ test_util:stop_couch(Ctx),
+ meck:unload().
+
+
+setup() ->
+ couch_jobs_fdb:clear_jobs(),
+ application:start(couch_jobs),
+ T1 = <<"t1">>,
+ T2 = 424242, % a number should work as well
+ T1Timeout = 2,
+ T2Timeout = 3,
+ couch_jobs:set_type_timeout(T1, T1Timeout),
+ couch_jobs:set_type_timeout(T2, T2Timeout),
+ couch_jobs_server:force_check_types(),
+ #{
+ t1 => T1,
+ t2 => T2,
+ t1_timeout => T1Timeout,
+ t2_timeout => T2Timeout,
+ j1 => <<"j1">>,
+ j2 => <<"j2">>,
+ j1_data => #{<<"j1_data">> => 42}
+ }.
+
+
+teardown(#{}) ->
+ application:stop(couch_jobs),
+ couch_jobs_fdb:clear_jobs(),
+ meck:unload().
+
+
+restart_app() ->
+ application:stop(couch_jobs),
+ application:start(couch_jobs),
+ couch_jobs_server:force_check_types().
+
+
+add_remove_pending(#{t1 := T, j1 := J, j1_data := Data}) ->
+ ?_test(begin
+ ?assertEqual(ok, couch_jobs:add(T, J, #{?DATA => Data})),
+ ?assertMatch({ok, #{?DATA := Data}, pending},
+ couch_jobs:get_job(T, J)),
+ ?assertEqual(ok, couch_jobs:remove(T, J)),
+ ?assertEqual(ok, couch_jobs:add(T, J, #{?DATA => Data})),
+ ?assertMatch({ok, #{?DATA := Data}, pending},
+ couch_jobs:get_job(T, J)),
+ ?assertEqual(ok, couch_jobs:remove(T, J))
+ end).
+
+
+add_remove_errors(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ?assertEqual(not_found, couch_jobs:remove(<<"bad_type">>, <<"1">>)),
+ ?assertMatch({error, {invalid_args, _}}, couch_jobs:add(T, J,
+ #{1 => 2})),
+ ?assertEqual({error, no_type_timeout}, couch_jobs:add(<<"x">>, J,
+ #{})),
+ ?assertEqual(ok, couch_jobs:add(T, J, #{})),
+ ?assertEqual({error, duplicate_job}, couch_jobs:add(T, J, #{})),
+ ?assertEqual(ok, couch_jobs:remove(T, J)),
+ ?assertEqual(not_found, couch_jobs:stop_and_remove(T, J, 100)),
+ ?assertEqual(not_found, couch_jobs:remove(T, J)),
+ ?assertMatch({error, {invalid_args, _}}, couch_jobs:add(T, J,
+ #{?RESUBMIT => potato})),
+ ?assertMatch({error, {invalid_args, _}}, couch_jobs:add(T, J,
+ #{?PRIORITY => #{bad_priority => nope}}))
+
+ end).
+
+
+get_jobs(#{t1 := T1, t2 := T2, j1 := J1, j2 := J2, j1_data := Data}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T1, J1, #{?DATA => Data}),
+ ok = couch_jobs:add(T2, J2, #{}),
+
+ ?assertMatch({ok, #{?DATA := Data}, pending},
+ couch_jobs:get_job(T1, J1)),
+ ?assertEqual({ok, #{}, pending}, couch_jobs:get_job(T2, J2)),
+
+ ?assertEqual([{J1, pending, #{?DATA => Data}}],
+ couch_jobs:get_jobs(T1)),
+ ?assertEqual([{J2, pending, #{}}], couch_jobs:get_jobs(T2)),
+ ?assertEqual([], couch_jobs:get_jobs(<<"othertype">>)),
+
+ ?assertEqual(lists:sort([
+ {T1, J1, pending, #{?DATA => Data}},
+ {T2, J2, pending, #{}}
+ ]), lists:sort(couch_jobs:get_jobs())),
+ ?assertEqual(ok, couch_jobs:remove(T1, J1)),
+ ?assertEqual([{T2, J2, pending, #{}}], couch_jobs:get_jobs()),
+ ?assertEqual(ok, couch_jobs:remove(T2, J2)),
+ ?assertEqual([], couch_jobs:get_jobs())
+ end).
+
+
+resubmit_as_job_creator(#{t1 := T, j1 := J, j1_data := Data}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J, #{?DATA => Data}),
+
+ ?assertEqual(not_found, couch_jobs:resubmit(T, <<"badjob">>)),
+
+ ?assertEqual(ok, couch_jobs:resubmit(T, J)),
+ JobOpts1 = #{?DATA => Data, ?RESUBMIT => true},
+ ?assertEqual({ok, JobOpts1, pending}, couch_jobs:get_job(T, J)),
+
+ ?assertEqual(ok, couch_jobs:resubmit(T, J)),
+ ?assertEqual({ok, JobOpts1, pending}, couch_jobs:get_job(T, J)),
+
+ ?assertEqual(ok, couch_jobs:resubmit(T, J, <<"a">>)),
+ JobOpts2 = #{?DATA => Data, ?RESUBMIT => true, ?PRIORITY => <<"a">>},
+ ?assertEqual({ok, JobOpts2, pending}, couch_jobs:get_job(T, J)),
+
+ ?assertEqual(ok, couch_jobs:resubmit(T, J, <<"b">>)),
+ JobOpts3 = #{?DATA => Data, ?RESUBMIT => true, ?PRIORITY => <<"b">>},
+ ?assertEqual({ok, JobOpts3, pending}, couch_jobs:get_job(T, J))
+ end).
+
+
+type_timeouts_and_server(#{t1 := T, t1_timeout := T1Timeout}) ->
+ ?_test(begin
+ ?assertEqual(T1Timeout, couch_jobs:get_type_timeout(T)),
+
+ ?assertEqual(2,
+ length(couch_jobs_activity_monitor_sup:get_child_pids())),
+ ?assertEqual(2, length(couch_jobs_notifier_sup:get_child_pids())),
+ ?assertMatch({ok, _}, couch_jobs_server:get_notifier_server(T)),
+
+ ?assertEqual(ok, couch_jobs:set_type_timeout(<<"t3">>, 8)),
+ couch_jobs_server:force_check_types(),
+ ?assertEqual(3,
+ length(couch_jobs_activity_monitor_sup:get_child_pids())),
+ ?assertEqual(3, length(couch_jobs_notifier_sup:get_child_pids())),
+
+ ?assertEqual(ok, couch_jobs:clear_type_timeout(<<"t3">>)),
+ couch_jobs_server:force_check_types(),
+ ?assertEqual(2,
+ length(couch_jobs_activity_monitor_sup:get_child_pids())),
+ ?assertEqual(2,
+ length(couch_jobs_notifier_sup:get_child_pids())),
+ ?assertMatch({error, _},
+ couch_jobs_server:get_notifier_server(<<"t3">>)),
+
+ ?assertEqual(not_found, couch_jobs:get_type_timeout(<<"t3">>))
+ end).
+
+
+dead_notifier_restarts_jobs_server(#{}) ->
+ ?_test(begin
+ ServerPid = whereis(couch_jobs_server),
+ Ref = monitor(process, ServerPid),
+
+ [Notifier1, _Notifier2] = couch_jobs_notifier_sup:get_child_pids(),
+ exit(Notifier1, kill),
+
+ % Killing a notifier should kill the server as well
+ receive {'DOWN', Ref, _, _, _} -> ok end
+ end).
+
+
+bad_messages_restart_couch_jobs_server(#{}) ->
+ ?_test(begin
+ % couch_jobs_server dies on bad cast
+ ServerPid1 = whereis(couch_jobs_server),
+ Ref1 = monitor(process, ServerPid1),
+ gen_server:cast(ServerPid1, bad_cast),
+ receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+ restart_app(),
+
+ % couch_jobs_server dies on bad call
+ ServerPid2 = whereis(couch_jobs_server),
+ Ref2 = monitor(process, ServerPid2),
+ catch gen_server:call(ServerPid2, bad_call),
+ receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+ restart_app(),
+
+ % couch_jobs_server dies on bad info
+ ServerPid3 = whereis(couch_jobs_server),
+ Ref3 = monitor(process, ServerPid3),
+ ServerPid3 ! a_random_message,
+ receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+ restart_app()
+ end).
+
+
+bad_messages_restart_notifier(#{}) ->
+ ?_test(begin
+ % bad cast kills the activity monitor
+ [AMon1, _] = couch_jobs_notifier_sup:get_child_pids(),
+ Ref1 = monitor(process, AMon1),
+ gen_server:cast(AMon1, bad_cast),
+ receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+ restart_app(),
+
+ % bad calls restart activity monitor
+ [AMon2, _] = couch_jobs_notifier_sup:get_child_pids(),
+ Ref2 = monitor(process, AMon2),
+ catch gen_server:call(AMon2, bad_call),
+ receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+ restart_app(),
+
+ % bad info message kills activity monitor
+ [AMon3, _] = couch_jobs_notifier_sup:get_child_pids(),
+ Ref3 = monitor(process, AMon3),
+ AMon3 ! a_bad_message,
+ receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+
+ restart_app()
+ end).
+
+
+bad_messages_restart_activity_monitor(#{}) ->
+ ?_test(begin
+ % bad cast kills the activity monitor
+ [AMon1, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+ Ref1 = monitor(process, AMon1),
+ gen_server:cast(AMon1, bad_cast),
+ receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+ restart_app(),
+
+ % bad calls restart activity monitor
+ [AMon2, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+ Ref2 = monitor(process, AMon2),
+ catch gen_server:call(AMon2, bad_call),
+ receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+ restart_app(),
+
+ % bad info message kills activity monitor
+ [AMon3, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+ Ref3 = monitor(process, AMon3),
+ AMon3 ! a_bad_message,
+ receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+
+ restart_app()
+ end).
+
+
+worker_accept_and_finish(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J, #{}),
+
+ AcceptResponse = couch_jobs:accept(T),
+ ?assertMatch({ok, J, <<_/binary>>}, AcceptResponse),
+ {ok, J, WLock} = AcceptResponse,
+
+ ?assertEqual({ok, #{}, running}, couch_jobs:get_job(T, J)),
+ Res = #{<<"result">> => <<"done">>},
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{?DATA => Res}, WLock)
+ end)),
+ ?assertEqual({ok, #{?DATA => Res}, finished},
+ couch_jobs:get_job(T, J)),
+
+ ?assertEqual(ok, couch_jobs:remove(T, J))
+ end).
+
+
+worker_update(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J, #{}),
+
+ AcceptResponse = couch_jobs:accept(T),
+ ?assertMatch({ok, J, <<_/binary>>}, AcceptResponse),
+ {ok, J, WLock} = AcceptResponse,
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock)
+ end)),
+ ?assertEqual({ok, #{?DATA => 1}, running}, couch_jobs:get_job(T, J)),
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, T, J, #{?DATA => 2}, WLock)
+ end)),
+ ?assertEqual({ok, #{?DATA => 2}, running}, couch_jobs:get_job(T, J)),
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{?DATA => 3}, WLock)
+ end)),
+ ?assertEqual({ok, #{?DATA => 3}, finished}, couch_jobs:get_job(T, J)),
+
+ ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, T, J, #{?DATA => 4}, WLock)
+ end)),
+
+ ?assertMatch(not_found, couch_jobs:accept(T)),
+
+ ?assertEqual(ok, couch_jobs:remove(T, J))
+ end).
+
+
+resubmit_enqueues_job(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J, #{}),
+
+ {ok, J, WLock1} = couch_jobs:accept(T),
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:resubmit(Tx, T, J, undefined, WLock1)
+ end)),
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{?DATA => 1}, WLock1)
+ end)),
+
+ ?assertEqual({ok, #{?DATA => 1}, pending}, couch_jobs:get_job(T, J)),
+
+ {ok, J, WLock2} = couch_jobs:accept(T),
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{?DATA => 2}, WLock2)
+ end)),
+ ?assertEqual({ok, #{?DATA => 2}, finished}, couch_jobs:get_job(T, J)),
+
+ ?assertEqual(ok, couch_jobs:remove(T, J))
+ end).
+
+
+add_custom_priority(#{t1 := T, j1 := J1, j2 := J2}) ->
+ ?_test(begin
+ ?assertEqual(ok, couch_jobs:add(T, J1, #{?PRIORITY => 5})),
+ ?assertEqual(ok, couch_jobs:add(T, J2, #{?PRIORITY => 3})),
+
+ ?assertMatch({ok, J2, _}, couch_jobs:accept(T)),
+ ?assertMatch({ok, J1, _}, couch_jobs:accept(T)),
+ ?assertMatch(not_found, couch_jobs:accept(T))
+ end).
+
+
+resubmit_custom_priority(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ?assertEqual(ok, couch_jobs:add(T, J, #{?PRIORITY => 7})),
+ {ok, J, WLock} = couch_jobs:accept(T),
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:resubmit(Tx, T, J, 9, WLock)
+ end)),
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{?DATA => 1}, WLock)
+ end)),
+ ?assertEqual({ok, #{?DATA => 1, ?PRIORITY => 9}, pending},
+ couch_jobs:get_job(T, J))
+ end).
+
+
+accept_max_priority(#{t1 := T, j1 := J1, j2 := J2}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J1, #{?PRIORITY => <<"5">>}),
+ ok = couch_jobs:add(T, J2, #{?PRIORITY => <<"3">>}),
+ ?assertEqual(not_found, couch_jobs:accept(T, <<"2">>)),
+ ?assertMatch({ok, J2, _}, couch_jobs:accept(T, <<"3">>)),
+ ?assertMatch({ok, J1, _}, couch_jobs:accept(T, <<"9">>))
+ end).
+
+
+subscribe(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J, #{}),
+
+ SubRes0 = couch_jobs:subscribe(T, J),
+ ?assertMatch({ok, {_, _}, pending}, SubRes0),
+ {ok, SubId0, pending} = SubRes0,
+
+ ?assertEqual(ok, couch_jobs:unsubscribe(SubId0)),
+
+ SubRes = couch_jobs:subscribe(T, J),
+ ?assertMatch({ok, {_, _}, pending}, SubRes),
+ {ok, SubId, pending} = SubRes,
+
+ {ok, J, WLock} = couch_jobs:accept(T),
+ ?assertMatch(timeout, couch_jobs:wait_job_state(SubId, finished, 50)),
+ ?assertMatch({T, J, running}, couch_jobs:wait_job_state(SubId, 5000)),
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock)
+ end)),
+
+ % Make sure we get intermediate `running` updates
+ ?assertMatch({T, J, running}, couch_jobs:wait_job_state(SubId, 5000)),
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{}, WLock)
+ end)),
+ ?assertMatch({T, J, finished}, couch_jobs:wait_job_state(SubId,
+ 5000)),
+
+ ?assertEqual(timeout, couch_jobs:wait_job_state(SubId, 50)),
+ ?assertEqual(finished, couch_jobs:subscribe(T, J)),
+
+ ?assertEqual(ok, couch_jobs:remove(T, J))
+ end).
+
+
+subscribe_callback(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J, #{}),
+
+ TestPid = self(),
+ SomeRef = make_ref(),
+ Cbk = fun(Type, JobId, JobState) ->
+ TestPid ! {SomeRef, Type, JobId, JobState}
+ end,
+
+ SubRes = couch_jobs:subscribe(T, J, Cbk),
+ ?assertMatch({ok, {_, _}, pending}, SubRes),
+ {ok, _, pending} = SubRes,
+
+ {ok, J, WLock} = couch_jobs:accept(T),
+ receive {SomeRef, T, J, running} -> ok end,
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{}, WLock)
+ end)),
+ receive {SomeRef, T, J, finished} -> ok end
+ end).
+
+
+subscribe_errors(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J, #{}),
+
+ Cbk = fun(_Type, _JobId, _JobState) -> ok end,
+
+ ?assertMatch({error, _}, couch_jobs:subscribe(<<"badtype">>, J)),
+ ?assertMatch({error, _}, couch_jobs:subscribe(<<"badtype">>, J, Cbk)),
+
+ ?assertEqual(not_found, couch_jobs:subscribe(T, <<"j5">>)),
+ ?assertEqual(not_found, couch_jobs:subscribe(T, <<"j5">>, Cbk)),
+
+ {ok, J, WLock} = couch_jobs:accept(T),
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{}, WLock)
+ end)),
+
+ ?assertEqual(finished, couch_jobs:subscribe(T, J)),
+ ?assertEqual(finished, couch_jobs:subscribe(T, J, Cbk))
+ end).
+
+
+enqueue_inactive(#{t1 := T, j1 := J, t1_timeout := Timeout}) ->
+ {timeout, 15, ?_test(begin
+ ok = couch_jobs:add(T, J, #{}),
+
+ {ok, J, WLock} = couch_jobs:accept(T),
+
+ {ok, SubId, running} = couch_jobs:subscribe(T, J),
+ ?assertEqual({T, J, pending}, couch_jobs:wait_job_state(SubId,
+ pending, 3 * Timeout * 1000)),
+
+ ?assertMatch({ok, #{}, pending}, couch_jobs:get_job(T, J)),
+
+ ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock)
+ end)),
+
+
+ ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{}, WLock)
+ end)),
+
+
+ ?assertEqual(worker_conflict, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:resubmit(Tx, T, J, undefined, WLock)
+ end))
+ end)}.
+
+
+cancel_running_job(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J, #{}),
+ {ok, J, WLock} = couch_jobs:accept(T),
+ ?assertEqual(canceled, couch_jobs:remove(T, J)),
+ % Try again and it should be a no-op
+ ?assertEqual(canceled, couch_jobs:remove(T, J)),
+
+ ?assertEqual(canceled, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock)
+ end)),
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{?DATA => 2}, WLock)
+ end)),
+
+ ?assertMatch({ok, #{?DATA := 2}, finished}, couch_jobs:get_job(T, J)),
+
+ ?assertEqual(ok, couch_jobs:remove(T, J))
+ end).
+
+
+stop_and_remove_running_job(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J, #{}),
+ {ok, J, WLock} = couch_jobs:accept(T),
+ {_, Ref} = spawn_monitor(fun() ->
+ exit({result, couch_jobs:stop_and_remove(T, J, 10000)})
+ end),
+
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, T, J, #{?DATA => 1}, WLock)
+ end),
+
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, T, J, #{?DATA => 2}, WLock)
+ end),
+
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, T, J, #{?DATA => 3}, WLock)
+ end)),
+
+ Exit = receive {'DOWN', Ref, _, _, Reason} -> Reason end,
+ ?assertEqual({result, ok}, Exit),
+
+ ?assertMatch(not_found, couch_jobs:get_job(T, J))
+ end).
+
+
+clear_type_works(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(T, J, #{}),
+ ?assertEqual([{J, pending, #{}}], couch_jobs:get_jobs(T)),
+ couch_jobs_fdb:clear_type(T),
+ ?assertEqual([], couch_jobs:get_jobs(T))
+ end).