author     Nick Vatamaniuc <vatamane@apache.org>        2019-06-12 16:11:56 -0400
committer  Paul J. Davis <paul.joseph.davis@gmail.com>  2019-07-31 11:55:30 -0500
commit     0c2d674d6aff8fe6d3458db65c14594dce5e48dc (patch)
tree       bfa1941d96259a106a317e083d3606996fb30289
parent     b9ee168426308db1f3212795fba48ee66edbec42 (diff)
CouchDB background jobs
RFC: https://github.com/apache/couchdb-documentation/pull/409

The main API is in the `couch_jobs` module. A description of the internals is in the README.md file.
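As a quick orientation, here is a minimal sketch of the intended job lifecycle, based on the exports and specs in `couch_jobs.erl` below. The type name, job id, data maps, the 30 second timeout, and passing `undefined` as the transaction argument (letting `couch_jobs_fdb:get_jtx/1` run its own transaction) are illustrative assumptions, not part of this commit:

    % Register an activity timeout (in seconds) for the job type first;
    % couch_jobs_fdb:add/5 returns {error, no_type_timeout} otherwise.
    ok = couch_jobs:set_type_timeout(<<"example">>, 30),
    ok = couch_jobs:add(undefined, <<"example">>, <<"job-1">>, #{<<"step">> => 1}),

    % A worker accepts the pending job, reports progress, then finishes it.
    {ok, Job, _Data} = couch_jobs:accept(<<"example">>),
    {ok, Job1} = couch_jobs:update(undefined, Job, #{<<"step">> => 2}),
    ok = couch_jobs:finish(undefined, Job1, #{<<"result">> => <<"done">>}).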
-rw-r--r--  rebar.config.script                                        1
-rw-r--r--  rel/overlay/etc/default.ini                               23
-rw-r--r--  rel/reltool.config                                         2
-rw-r--r--  src/couch_jobs/.gitignore                                  4
-rw-r--r--  src/couch_jobs/README.md                                  62
-rw-r--r--  src/couch_jobs/rebar.config                               14
-rw-r--r--  src/couch_jobs/src/couch_jobs.app.src                     31
-rw-r--r--  src/couch_jobs/src/couch_jobs.erl                        378
-rw-r--r--  src/couch_jobs/src/couch_jobs.hrl                         52
-rw-r--r--  src/couch_jobs/src/couch_jobs_activity_monitor.erl       133
-rw-r--r--  src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl    64
-rw-r--r--  src/couch_jobs/src/couch_jobs_app.erl                     26
-rw-r--r--  src/couch_jobs/src/couch_jobs_fdb.erl                    679
-rw-r--r--  src/couch_jobs/src/couch_jobs_notifier.erl               285
-rw-r--r--  src/couch_jobs/src/couch_jobs_notifier_sup.erl            64
-rw-r--r--  src/couch_jobs/src/couch_jobs_pending.erl                143
-rw-r--r--  src/couch_jobs/src/couch_jobs_server.erl                 193
-rw-r--r--  src/couch_jobs/src/couch_jobs_sup.erl                     66
-rw-r--r--  src/couch_jobs/src/couch_jobs_type_monitor.erl            84
-rw-r--r--  src/couch_jobs/test/couch_jobs_tests.erl                 606
20 files changed, 2909 insertions(+), 1 deletion(-)
diff --git a/rebar.config.script b/rebar.config.script
index d7c0d9a0a..14fdf28f2 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -87,6 +87,7 @@ SubDirs = [
"src/ddoc_cache",
"src/dreyfus",
"src/fabric",
+ "src/couch_jobs",
"src/global_changes",
"src/mango",
"src/rexi",
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index dbb0744b9..69f57fff2 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -520,4 +520,25 @@ min_priority = 2.0
; value will be rejected. If this config setting is not defined,
; CouchDB will use the value of `max_limit` instead. If neither is
; defined, the default is 2000 as stated here.
-; max_limit_partitions = 2000
\ No newline at end of file
+; max_limit_partitions = 2000
+
+[couch_jobs]
+;
+; Maximum jitter used when checking for active job timeouts
+;activity_monitor_max_jitter_msec = 10000
+;
+; Hold-off applied before notifying subscribers. Since active jobs can be
+; queried more efficiently using a range read, increasing this value should
+; make notifications more efficient; however, it also increases notification
+; latency.
+;type_monitor_holdoff_msec = 50
+;
+; Timeout used when waiting for the job type notification watches. The default
+; value of "infinity" should work well in most cases.
+;type_monitor_timeout_msec = infinity
+;
+; How often to check for the presence of new job types.
+;type_check_period_msec = 15000
+;
+; Jitter applied when checking for new job types.
+;type_check_max_jitter_msec = 5000
diff --git a/rel/reltool.config b/rel/reltool.config
index da85f36bc..2f03e61a4 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -33,6 +33,7 @@
config,
couch,
couch_epi,
+ couch_jobs,
couch_index,
couch_log,
couch_mrview,
@@ -91,6 +92,7 @@
{app, config, [{incl_cond, include}]},
{app, couch, [{incl_cond, include}]},
{app, couch_epi, [{incl_cond, include}]},
+ {app, couch_jobs, [{incl_cond, include}]},
{app, couch_index, [{incl_cond, include}]},
{app, couch_log, [{incl_cond, include}]},
{app, couch_mrview, [{incl_cond, include}]},
diff --git a/src/couch_jobs/.gitignore b/src/couch_jobs/.gitignore
new file mode 100644
index 000000000..6ef4c5212
--- /dev/null
+++ b/src/couch_jobs/.gitignore
@@ -0,0 +1,4 @@
+*.beam
+.eunit
+ebin/couch_jobs.app
+.DS_Store
\ No newline at end of file
diff --git a/src/couch_jobs/README.md b/src/couch_jobs/README.md
new file mode 100644
index 000000000..bc45d323c
--- /dev/null
+++ b/src/couch_jobs/README.md
@@ -0,0 +1,62 @@
+CouchDB Jobs Application
+========================
+
+Run background jobs in CouchDB
+
+Design (RFC) discussion: https://github.com/apache/couchdb-documentation/pull/409/files
+
+This is a description of some of the modules:
+
+ * `couch_jobs`: The main API module. It contains functions for creating,
+ accepting, executing, and monitoring jobs. A common pattern in this module
+ is to get a jobs transaction object (named `JTx` throughout the code), then
+ start a transaction and call a bunch of functions from `couch_jobs_fdb` in
+ that transaction.
+
+ * `couch_jobs_fdb`: This is the layer that talks to FDB. It handles a lot of
+ tuple packing and unpacking, range reads, and transaction object management.
+
+ * `couch_jobs_pending`: This module implements the pending jobs queue. These
+ functions could all go in `couch_jobs_fdb`, but the implementation was fairly
+ self-contained, with its own private helper functions, so it made sense to
+ move it to a separate module.
+
+ * `couch_jobs_activity_monitor`: This is where the "activity monitor"
+ functionality is implemented, with a `gen_server` instance running for each
+ type. Each `gen_server` periodically checks whether there are inactive jobs
+ for its type and, if there are, re-enqueues them. If the timeout value
+ changes, it skips the pending check until the new timeout expires.
+
+ * `couch_jobs_activity_monitor_sup`: This is a simple one-for-one supervisor
+ that spawns `couch_jobs_activity_monitor` instances for each type.
+
+ * `couch_jobs_type_monitor`: This is a helper process meant to be
+ `spawn_link`-ed from a parent `gen_server`. It monitors activity for a
+ particular job type and notifies the parent process whenever any jobs of that
+ type are updated.
+
+ * `couch_jobs_notifier`: Responsible for subscriptions. Just like with the
+ activity monitor, there is a `gen_server` instance running per type. It uses
+ a linked `couch_jobs_type_monitor` process to wait for any job updates. When
+ an update notification arrives, it can efficiently find out whether any
+ active jobs have been updated by reading the `(?JOBS, ?ACTIVITY, Type, Sequence)`
+ range. That should account for the bulk of the changes. Jobs that are no
+ longer active are queried individually.
+ Subscriptions are tracked in the notifier's process state (the `subs`,
+ `pidmap` and `refmap` maps).
+
+ * `couch_jobs_notifier_sup`: A simple one-for-one supervisor to spawn
+ `couch_jobs_notifier` processes for each type.
+
+ * `couch_jobs_server`: This is a `gen_server` that keeps track of job types.
+ It starts or stops activity monitors and notifiers for each type. To do that,
+ it periodically queries the `(?JOBS, ?ACTIVITY_TIMEOUT)` range.
+
+ * `couch_jobs_sup`: This is the main application supervisor. The restart
+ strategy is `rest_for_one`, meaning that when a child restarts, the siblings
+ started after it are restarted as well. One interesting entry there is the
+ first child, which exists just to create an ETS table used by
+ `couch_jobs_fdb` to cache transaction objects (the `JTx` mentioned above).
+ That child calls `init_cache/0`, which creates the ETS table and then returns
+ `ignore`, so it doesn't actually spawn a process. The ETS table ends up owned
+ by the supervisor process.
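The ETS-owning first child described in the last item can be pictured with a rough child spec sketch; the real spec lives in `couch_jobs_sup.erl` further down in this commit, and the `id` and the omitted restart options here are just placeholders:

    % Sketch only: the start function creates the ETS table and returns
    % `ignore`, so no child process is spawned and the table stays owned by
    % the calling supervisor.
    #{
        id => couch_jobs_fdb,
        start => {couch_jobs_fdb, init_cache, []}
    }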
diff --git a/src/couch_jobs/rebar.config b/src/couch_jobs/rebar.config
new file mode 100644
index 000000000..362c8785e
--- /dev/null
+++ b/src/couch_jobs/rebar.config
@@ -0,0 +1,14 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{cover_enabled, true}.
+{cover_print_enabled, true}.
diff --git a/src/couch_jobs/src/couch_jobs.app.src b/src/couch_jobs/src/couch_jobs.app.src
new file mode 100644
index 000000000..8ded14c6f
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.app.src
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_jobs, [
+ {description, "CouchDB Jobs"},
+ {vsn, git},
+ {mod, {couch_jobs_app, []}},
+ {registered, [
+ couch_jobs_sup,
+ couch_jobs_activity_monitor_sup,
+ couch_jobs_notifier_sup,
+ couch_jobs_server
+ ]},
+ {applications, [
+ kernel,
+ stdlib,
+ erlfdb,
+ couch_log,
+ config,
+ fabric
+ ]}
+]}.
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
new file mode 100644
index 000000000..d469ed41a
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -0,0 +1,378 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs).
+
+-export([
+ % Job creation
+ add/4,
+ add/5,
+ remove/3,
+ get_job_data/3,
+ get_job_state/3,
+
+ % Job processing
+ accept/1,
+ accept/2,
+ finish/2,
+ finish/3,
+ resubmit/2,
+ resubmit/3,
+ is_resubmitted/1,
+ update/2,
+ update/3,
+
+ % Subscriptions
+ subscribe/2,
+ subscribe/3,
+ unsubscribe/1,
+ wait/2,
+ wait/3,
+
+ % Type timeouts
+ set_type_timeout/2,
+ clear_type_timeout/1,
+ get_type_timeout/1
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-define(MIN_ACCEPT_WAIT_MSEC, 100).
+
+
+%% Job Creation API
+
+-spec add(jtx(), job_type(), job_id(), job_data()) -> ok | {error, any()}.
+add(Tx, Type, JobId, JobData) ->
+ add(Tx, Type, JobId, JobData, 0).
+
+
+-spec add(jtx(), job_type(), job_id(), job_data(), scheduled_time()) ->
+ ok | {error, any()}.
+add(Tx, Type, JobId, JobData, ScheduledTime) when is_binary(JobId),
+ is_map(JobData), is_integer(ScheduledTime) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ case couch_jobs_fdb:add(JTx, Type, JobId, JobData, ScheduledTime) of
+ {ok, _, _, _} -> ok;
+ {error, Error} -> {error, Error}
+ end
+ end).
+
+
+-spec remove(jtx(), job_type(), job_id()) -> ok | {error, any()}.
+remove(Tx, Type, JobId) when is_binary(JobId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_jobs_fdb:remove(JTx, job(Type, JobId))
+ end).
+
+
+-spec get_job_data(jtx(), job_type(), job_id()) -> {ok, job_data()} | {error,
+ any()}.
+get_job_data(Tx, Type, JobId) when is_binary(JobId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ case couch_jobs_fdb:get_job_state_and_data(JTx, job(Type, JobId)) of
+ {ok, _Seq, _State, Data} ->
+ {ok, couch_jobs_fdb:decode_data(Data)};
+ {error, Error} ->
+ {error, Error}
+ end
+ end).
+
+
+-spec get_job_state(jtx(), job_type(), job_id()) -> {ok, job_state()} | {error,
+ any()}.
+get_job_state(Tx, Type, JobId) when is_binary(JobId) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ case couch_jobs_fdb:get_job_state_and_data(JTx, job(Type, JobId)) of
+ {ok, _Seq, State, _Data} ->
+ {ok, State};
+ {error, Error} ->
+ {error, Error}
+ end
+ end).
+
+
+%% Job processor API
+
+-spec accept(job_type()) -> {ok, job(), job_data()} | {error, any()}.
+accept(Type) ->
+ accept(Type, #{}).
+
+
+-spec accept(job_type(), job_accept_opts()) -> {ok, job(), job_data()} |
+ {error, any()}.
+accept(Type, #{} = Opts) ->
+ NoSched = maps:get(no_schedule, Opts, false),
+ MaxSchedTimeDefault = case NoSched of
+ true -> 0;
+ false -> ?UNDEFINED_MAX_SCHEDULED_TIME
+ end,
+ MaxSchedTime = maps:get(max_sched_time, Opts, MaxSchedTimeDefault),
+ Timeout = maps:get(timeout, Opts, infinity),
+ case NoSched andalso MaxSchedTime =/= 0 of
+ true ->
+ {error, no_schedule_require_0_max_sched_time};
+ false ->
+ accept_loop(Type, NoSched, MaxSchedTime, Timeout)
+ end.
+
+
+-spec finish(jtx(), job()) -> ok | {error, any()}.
+finish(Tx, Job) ->
+ finish(Tx, Job, undefined).
+
+
+-spec finish(jtx(), job(), job_data()) -> ok | {error, any()}.
+finish(Tx, #{jlock := <<_/binary>>} = Job, JobData) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_jobs_fdb:finish(JTx, Job, JobData)
+ end).
+
+
+-spec resubmit(jtx(), job()) -> {ok, job()} | {error, any()}.
+resubmit(Tx, Job) ->
+ resubmit(Tx, Job, ?UNDEFINED_MAX_SCHEDULED_TIME).
+
+
+-spec resubmit(jtx(), job(), scheduled_time()) -> {ok, job()} | {error, any()}.
+resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_jobs_fdb:resubmit(JTx, Job, SchedTime)
+ end).
+
+
+-spec is_resubmitted(job()) -> true | false.
+is_resubmitted(#{job := true} = Job) ->
+ maps:get(resubmit, Job, false).
+
+
+-spec update(jtx(), job()) -> {ok, job()} | {error, any()}.
+update(Tx, Job) ->
+ update(Tx, Job, undefined).
+
+
+-spec update(jtx(), job(), job_data()) -> {ok, job()} | {error, any()}.
+update(Tx, #{jlock := <<_/binary>>} = Job, JobData) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_jobs_fdb:update(JTx, Job, JobData)
+ end).
+
+
+%% Subscription API
+
+% Receive events as messages. Wait for them using `wait/2,3`
+% functions.
+%
+
+-spec subscribe(job_type(), job_id()) -> {ok, job_subscription(), job_state(),
+ job_data()} | {ok, finished, job_data()} | {error, any()}.
+subscribe(Type, JobId) ->
+ subscribe(undefined, Type, JobId).
+
+
+-spec subscribe(jtx(), job_type(), job_id()) -> {ok, job_subscription(),
+ job_state(), job_data()} | {ok, finished, job_data()} | {error, any()}.
+subscribe(Tx, Type, JobId) ->
+ StateData = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ Job = #{job => true, type => Type, id => JobId},
+ couch_jobs_fdb:get_job_state_and_data(JTx, Job)
+ end),
+ case StateData of
+ {ok, _Seq, finished, Data} ->
+ {ok, finished, couch_jobs_fdb:decode_data(Data)};
+ {ok, Seq, State, Data} ->
+ case couch_jobs_notifier:subscribe(Type, JobId, State, Seq) of
+ {ok, SubRef} ->
+ Data1 = couch_jobs_fdb:decode_data(Data),
+ {ok, SubRef, State, Data1};
+ {error, Error} ->
+ {error, Error}
+ end;
+ {error, Error} ->
+ {error, Error}
+ end.
+
+
+% Unsubscribe from getting notifications based on a particular subscription.
+% Each subscription should be followed by its own unsubscription call. However,
+% subscriber processes are also monitored and auto-unsubscribed if they exit.
+% If the subscribing process is exiting, calling this function is optional.
+%
+-spec unsubscribe(job_subscription()) -> ok.
+unsubscribe({Server, Ref}) when is_pid(Server), is_reference(Ref) ->
+ try
+ couch_jobs_notifier:unsubscribe(Server, Ref)
+ after
+ flush_notifications(Ref)
+ end.
+
+
+% Wait to receive job state updates
+%
+-spec wait(job_subscription() | [job_subscription()], timeout()) ->
+ {job_type(), job_id(), job_state(), job_data()} | timeout.
+wait({_, Ref}, Timeout) ->
+ receive
+ {?COUCH_JOBS_EVENT, Ref, Type, Id, State, Data} ->
+ {Type, Id, State, couch_jobs_fdb:decode_data(Data)}
+ after
+ Timeout -> timeout
+ end;
+
+wait(Subs, Timeout) when is_list(Subs) ->
+ {Result, ResendQ} = wait_any(Subs, Timeout, []),
+ lists:foreach(fun(Msg) -> self() ! Msg end, ResendQ),
+ Result.
+
+
+-spec wait(job_subscription() | [job_subscription()], job_state(), timeout())
+ -> {job_type(), job_id(), job_state(), job_data()} | timeout.
+wait({_, Ref} = Sub, State, Timeout) when is_atom(State) ->
+ receive
+ {?COUCH_JOBS_EVENT, Ref, Type, Id, MsgState, Data0} ->
+ case MsgState =:= State of
+ true ->
+ Data = couch_jobs_fdb:decode_data(Data0),
+ {Type, Id, State, Data};
+ false ->
+ wait(Sub, State, Timeout)
+ end
+ after
+ Timeout -> timeout
+ end;
+
+wait(Subs, State, Timeout) when is_list(Subs),
+ is_atom(State) ->
+ {Result, ResendQ} = wait_any(Subs, State, Timeout, []),
+ lists:foreach(fun(Msg) -> self() ! Msg end, ResendQ),
+ Result.
+
+
+%% Job type timeout API
+
+% These functions manipulate the activity timeout for each job type.
+
+-spec set_type_timeout(job_type(), timeout()) -> ok.
+set_type_timeout(Type, Timeout) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:set_type_timeout(JTx, Type, Timeout)
+ end).
+
+
+-spec clear_type_timeout(job_type()) -> ok.
+clear_type_timeout(Type) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:clear_type_timeout(JTx, Type)
+ end).
+
+
+-spec get_type_timeout(job_type()) -> timeout().
+get_type_timeout(Type) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:get_type_timeout(JTx, Type)
+ end).
+
+
+%% Private utilities
+
+accept_loop(Type, NoSched, MaxSchedTime, Timeout) ->
+ TxFun = fun(JTx) ->
+ couch_jobs_fdb:accept(JTx, Type, MaxSchedTime, NoSched)
+ end,
+ case couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), TxFun) of
+ {ok, Job, Data} ->
+ {ok, Job, Data};
+ {not_found, PendingWatch} ->
+ case wait_pending(PendingWatch, MaxSchedTime, Timeout) of
+ {error, not_found} ->
+ {error, not_found};
+ ok ->
+ accept_loop(Type, NoSched, MaxSchedTime, Timeout)
+ end
+ end.
+
+
+job(Type, JobId) ->
+ #{job => true, type => Type, id => JobId}.
+
+
+wait_pending(PendingWatch, _MaxSTime, 0) ->
+ erlfdb:cancel(PendingWatch, [flush]),
+ {error, not_found};
+
+wait_pending(PendingWatch, MaxSTime, UserTimeout) ->
+ NowMSec = erlang:system_time(millisecond),
+ Timeout0 = max(?MIN_ACCEPT_WAIT_MSEC, MaxSTime * 1000 - NowMSec),
+ Timeout = min(limit_timeout(Timeout0), UserTimeout),
+ try
+ erlfdb:wait(PendingWatch, [{timeout, Timeout}]),
+ ok
+ catch
+ error:{timeout, _} ->
+ erlfdb:cancel(PendingWatch, [flush]),
+ {error, not_found}
+ end.
+
+
+wait_any(Subs, Timeout0, ResendQ) when is_list(Subs) ->
+ Timeout = limit_timeout(Timeout0),
+ receive
+ {?COUCH_JOBS_EVENT, Ref, Type, Id, State, Data0} = Msg ->
+ case lists:keyfind(Ref, 2, Subs) of
+ false ->
+ wait_any(Subs, Timeout, [Msg | ResendQ]);
+ {_, Ref} ->
+ Data = couch_jobs_fdb:decode_data(Data0),
+ {{Type, Id, State, Data}, ResendQ}
+ end
+ after
+ Timeout -> {timeout, ResendQ}
+ end.
+
+
+wait_any(Subs, State, Timeout0, ResendQ) when
+ is_list(Subs) ->
+ Timeout = limit_timeout(Timeout0),
+ receive
+ {?COUCH_JOBS_EVENT, Ref, Type, Id, MsgState, Data0} = Msg ->
+ case lists:keyfind(Ref, 2, Subs) of
+ false ->
+ wait_any(Subs, Timeout, [Msg | ResendQ]);
+ {_, Ref} ->
+ case MsgState =:= State of
+ true ->
+ Data = couch_jobs_fdb:decode_data(Data0),
+ {{Type, Id, State, Data}, ResendQ};
+ false ->
+ wait_any(Subs, Timeout, ResendQ)
+ end
+ end
+ after
+ Timeout -> {timeout, ResendQ}
+ end.
+
+
+limit_timeout(Timeout) when is_integer(Timeout), Timeout < 16#FFFFFFFF ->
+ Timeout;
+
+limit_timeout(_Timeout) ->
+ infinity.
+
+
+flush_notifications(Ref) ->
+ receive
+ {?COUCH_JOBS_EVENT, Ref, _, _, _, _} ->
+ flush_notifications(Ref)
+ after
+ 0 -> ok
+ end.
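To round that out, here is a short sketch of the subscription flow built on the functions above (same illustrative type and job id as in the earlier sketch; the 30000 ms wait is an arbitrary choice):

    case couch_jobs:subscribe(<<"example">>, <<"job-1">>) of
        {ok, finished, JobData} ->
            % The job had already finished; no subscription was created.
            {finished, JobData};
        {ok, SubRef, _State, _JobData} ->
            % Block until the job reaches the `finished` state or we time out.
            Result = couch_jobs:wait(SubRef, finished, 30000),
            couch_jobs:unsubscribe(SubRef),
            Result;
        {error, Error} ->
            {error, Error}
    end.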
diff --git a/src/couch_jobs/src/couch_jobs.hrl b/src/couch_jobs/src/couch_jobs.hrl
new file mode 100644
index 000000000..2a02d760f
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs.hrl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+% Job map/json field definitions
+%
+-define(OPT_PRIORITY, <<"priority">>).
+-define(OPT_DATA, <<"data">>).
+-define(OPT_CANCEL, <<"cancel">>).
+-define(OPT_RESUBMIT, <<"resubmit">>).
+
+% These might be in a fabric public hrl eventually
+%
+-define(uint2bin(I), binary:encode_unsigned(I, little)).
+-define(bin2uint(I), binary:decode_unsigned(I, little)).
+-define(UNSET_VS, {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF}).
+-define(METADATA_VERSION_KEY, <<"$metadata_version_key$">>).
+
+% Data model definitions
+%
+-define(JOBS, 51). % coordinate with fabric2.hrl
+-define(DATA, 1).
+-define(PENDING, 2).
+-define(WATCHES_PENDING, 3).
+-define(WATCHES_ACTIVITY, 4).
+-define(ACTIVITY_TIMEOUT, 5).
+-define(ACTIVITY, 6).
+
+
+-define(COUCH_JOBS_EVENT, '$couch_jobs_event').
+-define(COUCH_JOBS_CURRENT, '$couch_jobs_current').
+-define(UNDEFINED_MAX_SCHEDULED_TIME, 1 bsl 36).
+
+
+-type jtx() :: map() | undefined | tuple().
+-type job_id() :: binary().
+-type job_type() :: tuple() | binary() | non_neg_integer().
+-type job() :: map().
+-type job_data() :: map() | undefined.
+-type job_accept_opts() :: map().
+-type scheduled_time() :: non_neg_integer() | undefined.
+-type job_state() :: running | pending | finished.
+-type job_subscription() :: {pid(), reference()}.
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor.erl b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
new file mode 100644
index 000000000..ef82e6bd9
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor.erl
@@ -0,0 +1,133 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor).
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+-record(st, {
+ jtx,
+ type,
+ tref,
+ timeout = 0,
+ vs = not_found
+}).
+
+
+-define(MAX_JITTER_DEFAULT, 10000).
+-define(MISSING_TIMEOUT_CHECK, 5000).
+
+
+start_link(Type) ->
+ gen_server:start_link(?MODULE, [Type], []).
+
+
+%% gen_server callbacks
+
+init([Type]) ->
+ St = #st{jtx = couch_jobs_fdb:get_jtx(), type = Type},
+ {ok, schedule_check(St)}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(check_activity, St) ->
+ St1 = check_activity(St),
+ St2 = schedule_check(St1),
+ {noreply, St2};
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+ % Don't crash out couch_jobs_server and the whole application. Eventually
+ % the erlfdb:wait timeout code would need to do proper cleanup of these
+ % spurious future ready messages.
+ LogMsg = "~p : spurious erlfdb future ready message ~p",
+ couch_log:error(LogMsg, [?MODULE, Ref]),
+ {noreply, St};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+% Private helper functions
+
+check_activity(#st{jtx = JTx, type = Type, vs = not_found} = St) ->
+ NewVS = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_activity_vs(JTx1, Type)
+ end),
+ St#st{vs = NewVS};
+
+check_activity(#st{jtx = JTx, type = Type, vs = VS} = St) ->
+ NewVS = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ NewVS = couch_jobs_fdb:get_activity_vs(JTx1, Type),
+ JobIds = couch_jobs_fdb:get_inactive_since(JTx1, Type, VS),
+ couch_jobs_fdb:re_enqueue_inactive(JTx1, Type, JobIds),
+ NewVS
+ end),
+ St#st{vs = NewVS}.
+
+
+get_timeout_msec(JTx, Type) ->
+ TimeoutVal = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_type_timeout(JTx1, Type)
+ end),
+ case TimeoutVal of
+ not_found -> not_found;
+ ValSeconds -> timer:seconds(ValSeconds)
+ end.
+
+
+schedule_check(#st{jtx = JTx, type = Type, timeout = OldTimeout} = St) ->
+ % Reset versionstamp if timeout changed.
+ St1 = case get_timeout_msec(JTx, Type) of
+ not_found ->
+ St#st{vs = not_found, timeout = ?MISSING_TIMEOUT_CHECK};
+ OldTimeout ->
+ St;
+ NewTimeout ->
+ St#st{vs = not_found, timeout = NewTimeout}
+ end,
+ #st{timeout = Timeout} = St1,
+ MaxJitter = min(Timeout div 2, get_max_jitter_msec()),
+ Wait = Timeout + rand:uniform(max(1, MaxJitter)),
+ St1#st{tref = erlang:send_after(Wait, self(), check_activity)}.
+
+
+get_max_jitter_msec()->
+ config:get_integer("couch_jobs", "activity_monitor_max_jitter_msec",
+ ?MAX_JITTER_DEFAULT).
diff --git a/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
new file mode 100644
index 000000000..b11161a24
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_activity_monitor_sup.erl
@@ -0,0 +1,64 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_activity_monitor_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+ start_link/0,
+
+ start_monitor/1,
+ stop_monitor/1,
+ get_child_pids/0
+]).
+
+-export([
+ init/1
+]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_monitor(Type) ->
+ supervisor:start_child(?MODULE, [Type]).
+
+
+stop_monitor(Pid) ->
+ supervisor:terminate_child(?MODULE, Pid).
+
+
+get_child_pids() ->
+ lists:map(fun({_Id, Pid, _Type, _Mod}) ->
+ Pid
+ end, supervisor:which_children(?MODULE)).
+
+
+init(_) ->
+ Flags = #{
+ strategy => simple_one_for_one,
+ intensity => 10,
+ period => 3
+ },
+ Children = [
+ #{
+ id => couch_jobs_monitor,
+ restart => temporary,
+ start => {couch_jobs_activity_monitor, start_link, []}
+ }
+ ],
+ {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_app.erl b/src/couch_jobs/src/couch_jobs_app.erl
new file mode 100644
index 000000000..720b94891
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_app.erl
@@ -0,0 +1,26 @@
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_app).
+
+
+-behaviour(application).
+
+
+-export([
+ start/2,
+ stop/1
+]).
+
+
+start(_Type, []) ->
+ couch_jobs_sup:start_link().
+
+
+stop([]) ->
+ ok.
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
new file mode 100644
index 000000000..1317d03df
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -0,0 +1,679 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_fdb).
+
+
+-export([
+ add/5,
+ remove/2,
+ get_job_state_and_data/2,
+ get_jobs/2,
+ get_jobs/3,
+
+ accept/4,
+ finish/3,
+ resubmit/3,
+ update/3,
+
+ set_type_timeout/3,
+ clear_type_timeout/2,
+ get_type_timeout/2,
+ get_types/1,
+
+ get_activity_vs/2,
+ get_activity_vs_and_watch/2,
+ get_active_since/3,
+ get_inactive_since/3,
+ re_enqueue_inactive/3,
+
+ init_cache/0,
+
+ encode_data/1,
+ decode_data/1,
+
+ get_jtx/0,
+ get_jtx/1,
+ tx/2,
+
+ get_job/2,
+ get_jobs/0
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-record(jv, {
+ seq,
+ jlock,
+ stime,
+ resubmit,
+ data
+}).
+
+
+-define(JOBS_ETS_KEY, jobs).
+-define(MD_TIMESTAMP_ETS_KEY, md_timestamp).
+-define(MD_VERSION_MAX_AGE_SEC, 10).
+-define(PENDING_SEQ, 0).
+
+
+% Data model
+%
+% (?JOBS, ?DATA, Type, JobId) = (Sequence, Lock, SchedTime, Resubmit, JobData)
+% (?JOBS, ?PENDING, Type, ScheduledTime, JobId) = ""
+% (?JOBS, ?WATCHES_PENDING, Type) = Counter
+% (?JOBS, ?WATCHES_ACTIVITY, Type) = Sequence
+% (?JOBS, ?ACTIVITY_TIMEOUT, Type) = ActivityTimeout
+% (?JOBS, ?ACTIVITY, Type, Sequence) = JobId
+%
+% In the ?DATA row Sequence can have these values:
+% 0 - when the job is pending
+% null - when the job is finished
+% Versionstamp - when the job is running
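+%
+% For example (illustrative values only), a pending job of type <<"views">>
+% with id <<"job1">> would be stored under the key
+% erlfdb_tuple:pack({?DATA, <<"views">>, <<"job1">>}, Jobs), where Jobs is the
+% jobs directory prefix, with a value unpacking to
+% {0, null, ScheduledTime, Resubmit, JobData}; see get_job_val/2 and
+% set_job_val/3 below.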
+
+
+% Job creation API
+
+add(#{jtx := true} = JTx0, Type, JobId, Data, STime) ->
+ #{tx := Tx} = JTx = get_jtx(JTx0),
+ Job = #{job => true, type => Type, id => JobId},
+ case get_type_timeout(JTx, Type) of
+ not_found ->
+ {error, no_type_timeout};
+ Int when is_integer(Int) ->
+ Key = job_key(JTx, Job),
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ <<_/binary>> ->
+ {ok, Job1} = resubmit(JTx, Job, STime),
+ #{seq := Seq, state := State, data := Data1} = Job1,
+ {ok, State, Seq, Data1};
+ not_found ->
+ try
+ maybe_enqueue(JTx, Type, JobId, STime, true, Data),
+ {ok, pending, ?PENDING_SEQ, Data}
+ catch
+ error:{json_encoding_error, Error} ->
+ {error, {json_encoding_error, Error}}
+ end
+ end
+ end.
+
+
+remove(#{jtx := true} = JTx0, #{job := true} = Job) ->
+ #{tx := Tx} = JTx = get_jtx(JTx0),
+ #{type := Type, id := JobId} = Job,
+ Key = job_key(JTx, Job),
+ case get_job_val(Tx, Key) of
+ #jv{stime = STime} ->
+ couch_jobs_pending:remove(JTx, Type, JobId, STime),
+ erlfdb:clear(Tx, Key),
+ ok;
+ not_found ->
+ {error, not_found}
+ end.
+
+
+get_job_state_and_data(#{jtx := true} = JTx, #{job := true} = Job) ->
+ case get_job_val(get_jtx(JTx), Job) of
+ #jv{seq = Seq, jlock = JLock, data = Data} ->
+ {ok, Seq, job_state(JLock, Seq), Data};
+ not_found ->
+ {error, not_found}
+ end.
+
+
+get_jobs(JTx, Type) ->
+ get_jobs(JTx, Type, fun(_) -> true end).
+
+
+get_jobs(#{jtx := true} = JTx, Type, Filter) when is_function(Filter, 1) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?DATA, Type}, Jobs),
+ Opts = [{streaming_mode, want_all}],
+ Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+ lists:foldl(fun({K, V}, #{} = Acc) ->
+ {JobId} = erlfdb_tuple:unpack(K, Prefix),
+ case Filter(JobId) of
+ true ->
+ {Seq, JLock, _, _, Data} = erlfdb_tuple:unpack(V),
+ Acc#{JobId => {Seq, job_state(JLock, Seq), Data}};
+ false ->
+ Acc
+ end
+ end, #{}, Result).
+
+
+% Job processor API
+
+accept(#{jtx := true} = JTx0, Type, MaxSTime, NoSched)
+ when is_integer(MaxSTime), is_boolean(NoSched) ->
+ #{jtx := true, tx := Tx} = JTx = get_jtx(JTx0),
+ case couch_jobs_pending:dequeue(JTx, Type, MaxSTime, NoSched) of
+ {not_found, PendingWatch} ->
+ {not_found, PendingWatch};
+ {ok, JobId} ->
+ JLock = fabric2_util:uuid(),
+ Key = job_key(JTx, Type, JobId),
+ JV0 = get_job_val(Tx, Key),
+ #jv{jlock = null, data = Data} = JV0,
+ JV = JV0#jv{seq = ?UNSET_VS, jlock = JLock, resubmit = false},
+ set_job_val(Tx, Key, JV),
+ update_activity(JTx, Type, JobId, null, Data),
+ Job = #{
+ job => true,
+ type => Type,
+ id => JobId,
+ jlock => JLock
+ },
+ {ok, Job, decode_data(Data)}
+ end.
+
+
+finish(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data) when
+ is_map(Data) orelse Data =:= undefined ->
+ #{tx := Tx} = JTx = get_jtx(JTx0),
+ #{type := Type, jlock := JLock, id := JobId} = Job,
+ case get_job_or_halt(Tx, job_key(JTx, Job), JLock) of
+ #jv{seq = Seq, stime = STime, resubmit = Resubmit, data = OldData} ->
+ NewData = case Data =:= undefined of
+ true -> OldData;
+ false -> Data
+ end,
+ try maybe_enqueue(JTx, Type, JobId, STime, Resubmit, NewData) of
+ ok ->
+ clear_activity(JTx, Type, Seq),
+ update_watch(JTx, Type)
+ catch
+ error:{json_encoding_error, Error} ->
+ {error, {json_encoding_error, Error}}
+ end;
+ halt ->
+ {error, halt}
+ end.
+
+
+resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) ->
+ #{tx := Tx} = JTx = get_jtx(JTx0),
+ #{type := Type, id := JobId} = Job,
+ Key = job_key(JTx, Job),
+ case get_job_val(Tx, Key) of
+ #jv{seq = Seq, jlock = JLock, stime = OldSTime, data = Data} = JV ->
+ STime = case NewSTime =:= undefined of
+ true -> OldSTime;
+ false -> NewSTime
+ end,
+ case job_state(JLock, Seq) of
+ finished ->
+ ok = maybe_enqueue(JTx, Type, JobId, STime, true, Data),
+ Job1 = Job#{
+ seq => ?PENDING_SEQ,
+ state => pending,
+ data => Data
+ },
+ {ok, Job1};
+ pending ->
+ JV1 = JV#jv{seq = ?PENDING_SEQ, stime = STime},
+ set_job_val(Tx, Key, JV1),
+ couch_jobs_pending:remove(JTx, Type, JobId, OldSTime),
+ couch_jobs_pending:enqueue(JTx, Type, STime, JobId),
+ Job1 = Job#{
+ stime => STime,
+ seq => ?PENDING_SEQ,
+ state => pending,
+ data => Data
+ },
+ {ok, Job1};
+ running ->
+ JV1 = JV#jv{stime = STime, resubmit = true},
+ set_job_val(Tx, Key, JV1),
+ {ok, Job#{resubmit => true, stime => STime,
+ state => running, seq => Seq, data => Data}}
+ end;
+ not_found ->
+ {error, not_found}
+ end.
+
+
+update(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data0) when
+ is_map(Data0) orelse Data0 =:= undefined ->
+ #{tx := Tx} = JTx = get_jtx(JTx0),
+ #{jlock := JLock, type := Type, id := JobId} = Job,
+ Key = job_key(JTx, Job),
+ case get_job_or_halt(Tx, Key, JLock) of
+ #jv{seq = Seq, stime = STime, resubmit = Resubmit} = JV0 ->
+ Data = case Data0 =:= undefined of
+ true -> JV0#jv.data;
+ false -> Data0
+ end,
+ JV = JV0#jv{seq = ?UNSET_VS, data = Data},
+ try set_job_val(Tx, Key, JV) of
+ ok ->
+ update_activity(JTx, Type, JobId, Seq, Data),
+ {ok, Job#{resubmit => Resubmit, stime => STime}}
+ catch
+ error:{json_encoding_error, Error} ->
+ {error, {json_encoding_error, Error}}
+ end;
+ halt ->
+ {error, halt}
+ end.
+
+
+% Type and activity monitoring API
+
+set_type_timeout(#{jtx := true} = JTx, Type, Timeout) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+ Val = erlfdb_tuple:pack({Timeout}),
+ erlfdb:set(Tx, Key, Val).
+
+
+clear_type_timeout(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+ erlfdb:clear(Tx, Key).
+
+
+get_type_timeout(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+ case erlfdb:wait(erlfdb:get_ss(Tx, Key)) of
+ not_found ->
+ not_found;
+ Val ->
+ {Timeout} = erlfdb_tuple:unpack(Val),
+ Timeout
+ end.
+
+
+get_types(#{jtx := true} = JTx) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT}, Jobs),
+ Opts = [{streaming_mode, want_all}],
+ Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+ lists:map(fun({K, _V}) ->
+ {Type} = erlfdb_tuple:unpack(K, Prefix),
+ Type
+ end, Result).
+
+
+get_activity_vs(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, Jobs),
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ not_found ->
+ not_found;
+ Val ->
+ {VS} = erlfdb_tuple:unpack(Val),
+ VS
+ end.
+
+
+get_activity_vs_and_watch(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, Jobs),
+ Future = erlfdb:get(Tx, Key),
+ Watch = erlfdb:watch(Tx, Key),
+ case erlfdb:wait(Future) of
+ not_found ->
+ {not_found, Watch};
+ Val ->
+ {VS} = erlfdb_tuple:unpack(Val),
+ {VS, Watch}
+ end.
+
+
+get_active_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+ StartKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+ StartKeySel = erlfdb_key:first_greater_or_equal(StartKey),
+ {_, EndKey} = erlfdb_tuple:range({Type}, Prefix),
+ Opts = [{streaming_mode, want_all}],
+ Future = erlfdb:get_range(Tx, StartKeySel, EndKey, Opts),
+ maps:from_list(lists:map(fun({_K, V}) ->
+ erlfdb_tuple:unpack(V)
+ end, erlfdb:wait(Future))).
+
+
+get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+ {StartKey, _} = erlfdb_tuple:range({Type}, Prefix),
+ EndKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+ EndKeySel = erlfdb_key:first_greater_than(EndKey),
+ Opts = [{streaming_mode, want_all}],
+ Future = erlfdb:get_range(Tx, StartKey, EndKeySel, Opts),
+ lists:map(fun({_K, V}) ->
+ {JobId, _} = erlfdb_tuple:unpack(V),
+ JobId
+ end, erlfdb:wait(Future)).
+
+
+re_enqueue_inactive(#{jtx := true} = JTx, Type, JobIds) when is_list(JobIds) ->
+ #{tx := Tx} = get_jtx(JTx),
+ lists:foreach(fun(JobId) ->
+ case get_job_val(Tx, job_key(JTx, Type, JobId)) of
+ #jv{seq = Seq, stime = STime, data = Data} ->
+ clear_activity(JTx, Type, Seq),
+ maybe_enqueue(JTx, Type, JobId, STime, true, Data);
+ not_found ->
+ ok
+ end
+ end, JobIds),
+ case length(JobIds) > 0 of
+ true -> update_watch(JTx, Type);
+ false -> ok
+ end.
+
+
+% Cache initialization API. Called from the supervisor just to create the ETS
+% table. It returns `ignore` to tell the supervisor that it won't actually
+% start any process, which is what we want here.
+%
+init_cache() ->
+ ConcurrencyOpts = [{read_concurrency, true}, {write_concurrency, true}],
+ ets:new(?MODULE, [public, named_table] ++ ConcurrencyOpts),
+ ignore.
+
+
+% Functions to encode / decode JobData
+%
+encode_data(#{} = JobData) ->
+ try
+ jiffy:encode(JobData)
+ catch
+ throw:{error, Error} ->
+ % legacy clause since new versions of jiffy raise error instead
+ error({json_encoding_error, Error});
+ error:{error, Error} ->
+ error({json_encoding_error, Error})
+ end.
+
+
+decode_data(#{} = JobData) ->
+ JobData;
+
+decode_data(<<_/binary>> = JobData) ->
+ jiffy:decode(JobData, [return_maps]).
+
+
+% Cached job transaction object. This object wraps a transaction and caches
+% the directory lookup path and the metadata version. The function can be used
+% from inside or outside a transaction. When used inside a transaction it will
+% check whether the metadata has changed and refresh the cache automatically.
+%
+get_jtx() ->
+ get_jtx(undefined).
+
+
+get_jtx(#{tx := Tx} = _TxDb) ->
+ get_jtx(Tx);
+
+get_jtx(undefined = _Tx) ->
+ case ets:lookup(?MODULE, ?JOBS_ETS_KEY) of
+ [{_, #{} = JTx}] ->
+ JTx;
+ [] ->
+ JTx = update_jtx_cache(init_jtx(undefined)),
+ JTx#{tx := undefined}
+ end;
+
+get_jtx({erlfdb_transaction, _} = Tx) ->
+ case ets:lookup(?MODULE, ?JOBS_ETS_KEY) of
+ [{_, #{} = JTx}] ->
+ ensure_current(JTx#{tx := Tx});
+ [] ->
+ update_jtx_cache(init_jtx(Tx))
+ end.
+
+
+% Transaction processing to be used with couch jobs' specific transaction
+% contexts
+%
+tx(#{jtx := true} = JTx, Fun) when is_function(Fun, 1) ->
+ fabric2_fdb:transactional(JTx, Fun).
+
+
+% Debug and testing API
+
+get_job(Type, JobId) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ JTx = init_jtx(Tx),
+ case get_job_val(Tx, job_key(JTx, Type, JobId)) of
+ #jv{seq = Seq, jlock = JLock} = JV ->
+ #{
+ job => true,
+ type => Type,
+ id => JobId,
+ seq => Seq,
+ jlock => JLock,
+ stime => JV#jv.stime,
+ resubmit => JV#jv.resubmit,
+ data => decode_data(JV#jv.data),
+ state => job_state(JLock, Seq)
+ };
+ not_found ->
+ not_found
+ end
+ end).
+
+
+get_jobs() ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ #{jobs_path := Jobs} = init_jtx(Tx),
+ Prefix = erlfdb_tuple:pack({?DATA}, Jobs),
+ Opts = [{streaming_mode, want_all}],
+ Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+ lists:map(fun({K, V}) ->
+ {Type, JobId} = erlfdb_tuple:unpack(K, Prefix),
+ {Seq, JLock, _, _, Data} = erlfdb_tuple:unpack(V),
+ JobState = job_state(JLock, Seq),
+ {Type, JobId, JobState, decode_data(Data)}
+ end, Result)
+ end).
+
+
+% Private helper functions
+
+maybe_enqueue(#{jtx := true} = JTx, Type, JobId, STime, Resubmit, Data) ->
+ #{tx := Tx} = JTx,
+ Key = job_key(JTx, Type, JobId),
+ JV = #jv{
+ seq = null,
+ jlock = null,
+ stime = STime,
+ resubmit = false,
+ data = Data
+ },
+ case Resubmit of
+ true ->
+ set_job_val(Tx, Key, JV#jv{seq = ?PENDING_SEQ}),
+ couch_jobs_pending:enqueue(JTx, Type, STime, JobId);
+ false ->
+ set_job_val(Tx, Key, JV)
+ end,
+ ok.
+
+
+job_key(#{jtx := true, jobs_path := Jobs}, Type, JobId) ->
+ erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs).
+
+
+job_key(JTx, #{type := Type, id := JobId}) ->
+ job_key(JTx, Type, JobId).
+
+
+get_job_val(#{jtx := true, tx := Tx} = JTx, #{job := true} = Job) ->
+ get_job_val(Tx, job_key(JTx, Job));
+
+get_job_val(Tx = {erlfdb_transaction, _}, Key) ->
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ <<_/binary>> = Val ->
+ {Seq, JLock, STime, Resubmit, Data} = erlfdb_tuple:unpack(Val),
+ #jv{
+ seq = Seq,
+ jlock = JLock,
+ stime = STime,
+ resubmit = Resubmit,
+ data = Data
+ };
+ not_found ->
+ not_found
+ end.
+
+
+set_job_val(Tx = {erlfdb_transaction, _}, Key, #jv{} = JV) ->
+ #jv{
+ seq = Seq,
+ jlock = JLock,
+ stime = STime,
+ resubmit = Resubmit,
+ data = Data0
+ } = JV,
+ Data = case Data0 of
+ #{} -> encode_data(Data0);
+ <<_/binary>> -> Data0
+ end,
+ case Seq of
+ ?UNSET_VS ->
+ Val = erlfdb_tuple:pack_vs({Seq, JLock, STime, Resubmit, Data}),
+ erlfdb:set_versionstamped_value(Tx, Key, Val);
+ _Other ->
+ Val = erlfdb_tuple:pack({Seq, JLock, STime, Resubmit, Data}),
+ erlfdb:set(Tx, Key, Val)
+ end,
+ ok.
+
+
+get_job_or_halt(Tx, Key, JLock) ->
+ case get_job_val(Tx, Key) of
+ #jv{jlock = CurJLock} when CurJLock =/= JLock ->
+ halt;
+ #jv{} = Res ->
+ Res;
+ not_found ->
+ halt
+ end.
+
+
+update_activity(#{jtx := true} = JTx, Type, JobId, Seq, Data0) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ case Seq =/= null of
+ true -> clear_activity(JTx, Type, Seq);
+ false -> ok
+ end,
+ Key = erlfdb_tuple:pack_vs({?ACTIVITY, Type, ?UNSET_VS}, Jobs),
+ Data = case Data0 of
+ #{} -> encode_data(Data0);
+ <<_/binary>> -> Data0
+ end,
+ Val = erlfdb_tuple:pack({JobId, Data}),
+ erlfdb:set_versionstamped_key(Tx, Key, Val),
+ update_watch(JTx, Type).
+
+
+clear_activity(#{jtx := true} = JTx, Type, Seq) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?ACTIVITY, Type, Seq}, Jobs),
+ erlfdb:clear(Tx, Key).
+
+
+update_watch(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, Jobs),
+ Val = erlfdb_tuple:pack_vs({?UNSET_VS}),
+ erlfdb:set_versionstamped_value(Tx, Key, Val),
+ ok.
+
+
+job_state(JLock, Seq) ->
+ case {JLock, Seq} of
+ {null, null} -> finished;
+ {JLock, _} when JLock =/= null -> running;
+ {null, Seq} when Seq =/= null -> pending
+ end.
+
+
+% This is a transaction context object similar to the Db = #{} map from
+% fabric2_fdb. It is used to cache the jobs directory path (to avoid extra
+% lookups on every operation) and to check for metadata changes (in case the
+% directory changes).
+%
+init_jtx(undefined) ->
+ fabric2_fdb:transactional(fun(Tx) -> init_jtx(Tx) end);
+
+init_jtx({erlfdb_transaction, _} = Tx) ->
+ Root = erlfdb_directory:root(),
+ CouchDB = erlfdb_directory:create_or_open(Tx, Root, [<<"couchdb">>]),
+ LayerPrefix = erlfdb_directory:get_name(CouchDB),
+ Jobs = erlfdb_tuple:pack({?JOBS}, LayerPrefix),
+ Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
+ % layer_prefix, md_version and tx here match db map fields in fabric2_fdb
+ % but we also assert that this is a job transaction using the jtx => true
+ % field
+ #{
+ jtx => true,
+ tx => Tx,
+ layer_prefix => LayerPrefix,
+ jobs_path => Jobs,
+ md_version => Version
+ }.
+
+
+ensure_current(#{jtx := true, tx := Tx} = JTx) ->
+ case get(?COUCH_JOBS_CURRENT) of
+ Tx ->
+ JTx;
+ _ ->
+ JTx1 = update_current(JTx),
+ put(?COUCH_JOBS_CURRENT, Tx),
+ JTx1
+ end.
+
+
+update_current(#{tx := Tx, md_version := Version} = JTx) ->
+ case get_md_version_age(Version) of
+ Age when Age =< ?MD_VERSION_MAX_AGE_SEC ->
+ % Looked it up not too long ago. Avoid looking it up too frequently.
+ JTx;
+ _ ->
+ case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of
+ Version ->
+ update_md_version_timestamp(Version),
+ JTx;
+ _NewVersion ->
+ update_jtx_cache(init_jtx(Tx))
+ end
+ end.
+
+
+update_jtx_cache(#{jtx := true, md_version := Version} = JTx) ->
+ CachedJTx = JTx#{tx := undefined},
+ ets:insert(?MODULE, {?JOBS_ETS_KEY, CachedJTx}),
+ update_md_version_timestamp(Version),
+ JTx.
+
+
+get_md_version_age(Version) ->
+ Timestamp = case ets:lookup(?MODULE, ?MD_TIMESTAMP_ETS_KEY) of
+ [{_, Version, Ts}] -> Ts;
+ _ -> 0
+ end,
+ erlang:system_time(second) - Timestamp.
+
+
+update_md_version_timestamp(Version) ->
+ Ts = erlang:system_time(second),
+ ets:insert(?MODULE, {?MD_TIMESTAMP_ETS_KEY, Version, Ts}).
diff --git a/src/couch_jobs/src/couch_jobs_notifier.erl b/src/couch_jobs/src/couch_jobs_notifier.erl
new file mode 100644
index 000000000..1c554a0c0
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier.erl
@@ -0,0 +1,285 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier).
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/1,
+ subscribe/4,
+ unsubscribe/2
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-define(TYPE_MONITOR_HOLDOFF_DEFAULT, 50).
+-define(TYPE_MONITOR_TIMEOUT_DEFAULT, "infinity").
+-define(GET_JOBS_RANGE_RATIO, 0.5).
+
+
+-record(st, {
+ jtx,
+ type,
+ monitor_pid,
+ subs, % #{JobId => #{Ref => {Pid, State, Seq}}}
+ pidmap, % #{{Jobid, Pid} => Ref}
+ refmap % #{Ref => JobId}
+}).
+
+
+start_link(Type) ->
+ gen_server:start_link(?MODULE, [Type], []).
+
+
+subscribe(Type, JobId, State, Seq) ->
+ case couch_jobs_server:get_notifier_server(Type) of
+ {ok, Server} ->
+ CallArgs = {subscribe, JobId, State, Seq, self()},
+ Ref = gen_server:call(Server, CallArgs, infinity),
+ {ok, {Server, Ref}};
+ {error, Error} ->
+ {error, Error}
+ end.
+
+
+unsubscribe(Server, Ref) when is_reference(Ref) ->
+ gen_server:call(Server, {unsubscribe, Ref, self()}, infinity).
+
+
+init([Type]) ->
+ JTx = couch_jobs_fdb:get_jtx(),
+ St = #st{
+ jtx = JTx,
+ type = Type,
+ subs = #{},
+ pidmap = #{},
+ refmap = #{}
+ },
+ VS = get_type_vs(St),
+ HoldOff = get_holdoff(),
+ Timeout = get_timeout(),
+ Pid = couch_jobs_type_monitor:start(Type, VS, HoldOff, Timeout),
+ {ok, St#st{monitor_pid = Pid}}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call({subscribe, JobId, State, Seq, Pid}, _From, #st{} = St) ->
+ #st{pidmap = PidMap, refmap = RefMap} = St,
+ case maps:get({JobId, Pid}, PidMap, not_found) of
+ not_found ->
+ Ref = erlang:monitor(process, Pid),
+ St1 = update_sub(JobId, Ref, Pid, State, Seq, St),
+ St2 = St1#st{pidmap = PidMap#{{JobId, Pid} => Ref}},
+ St3 = St2#st{refmap = RefMap#{Ref => JobId}},
+ {reply, Ref, St3};
+ Ref when is_reference(Ref) ->
+ St1 = update_sub(JobId, Ref, Pid, State, Seq, St),
+ {reply, Ref, St1}
+ end;
+
+handle_call({unsubscribe, Ref, Pid}, _From, #st{} = St) ->
+ {reply, ok, unsubscribe_int(Ref, Pid, St)};
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({type_updated, VS}, St) ->
+ VSMax = flush_type_updated_messages(VS),
+ {noreply, notify_subscribers(VSMax, St)};
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+ % Don't crash out couch_jobs_server and the whole application. Eventually
+ % the erlfdb:wait timeout code would need to do proper cleanup of these
+ % spurious future ready messages.
+ LogMsg = "~p : spurious erlfdb future ready message ~p",
+ couch_log:error(LogMsg, [?MODULE, Ref]),
+ {noreply, St};
+
+handle_info({'DOWN', Ref, process, Pid, _}, #st{} = St) ->
+ {noreply, unsubscribe_int(Ref, Pid, St)};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+update_subs(JobId, Refs, #st{subs = Subs} = St) when map_size(Refs) =:= 0 ->
+ St#st{subs = maps:remove(JobId, Subs)};
+
+update_subs(JobId, Refs, #st{subs = Subs} = St) when map_size(Refs) > 0 ->
+ St#st{subs = Subs#{JobId => Refs}}.
+
+
+update_sub(JobId, Ref, Pid, State, Seq, #st{subs = Subs} = St) ->
+ Refs = maps:get(JobId, Subs, #{}),
+ update_subs(JobId, Refs#{Ref => {Pid, State, Seq}}, St).
+
+
+remove_sub(JobId, Ref, #st{subs = Subs} = St) ->
+ case maps:get(JobId, Subs, not_found) of
+ not_found -> St;
+ #{} = Refs -> update_subs(JobId, maps:remove(Ref, Refs), St)
+ end.
+
+
+unsubscribe_int(Id, Ref, Pid, #st{pidmap = PidMap, refmap = RefMap} = St) ->
+ St1 = remove_sub(Id, Ref, St),
+ erlang:demonitor(Ref, [flush]),
+ St1#st{
+ pidmap = maps:remove({Id, Pid}, PidMap),
+ refmap = maps:remove(Ref, RefMap)
+ }.
+
+
+unsubscribe_int(Ref, Pid, #st{refmap = RefMap} = St) ->
+ case maps:get(Ref, RefMap, not_found) of
+ not_found -> St;
+ Id -> unsubscribe_int(Id, Ref, Pid, St)
+ end.
+
+
+flush_type_updated_messages(VSMax) ->
+ receive
+ {type_updated, VS} ->
+ flush_type_updated_messages(max(VS, VSMax))
+ after
+ 0 -> VSMax
+ end.
+
+
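+% When the fraction of subscribed jobs that fall outside the recently-active
+% set reaches ?GET_JOBS_RANGE_RATIO, it is likely cheaper to read the whole
+% ?DATA range for the type once and filter it (first clause) than to fetch
+% each job individually (second clause).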
+get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, Ratio)
+ when Ratio >= ?GET_JOBS_RANGE_RATIO ->
+ Filter = fun(JobId) -> maps:is_key(JobId, InactiveIdMap) end,
+ JobMap = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_jobs(JTx1, Type, Filter)
+ end),
+ maps:map(fun(JobId, _) ->
+ case maps:is_key(JobId, JobMap) of
+ true -> maps:get(JobId, JobMap);
+ false -> {null, not_found, not_found}
+ end
+ end, InactiveIdMap);
+
+get_jobs(#st{jtx = JTx, type = Type}, InactiveIdMap, _) ->
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ maps:map(fun(JobId, _) ->
+ Job = #{job => true, type => Type, id => JobId},
+ case couch_jobs_fdb:get_job_state_and_data(JTx1, Job) of
+ {ok, Seq, State, Data} ->
+ {Seq, State, Data};
+ {error, not_found} ->
+ {null, not_found, not_found}
+ end
+ end, InactiveIdMap)
+ end).
+
+
+get_type_vs(#st{jtx = JTx, type = Type}) ->
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_activity_vs(JTx1, Type)
+ end).
+
+
+% "Active since" is the set of jobs that have been active (running)
+% and updated at least once since the given versionstamp. These are relatively
+% cheap to find as it's just a range read in the ?ACTIVITY subspace.
+%
+get_active_since(#st{} = _St, not_found) ->
+ #{};
+
+get_active_since(#st{jtx = JTx, type = Type, subs = Subs}, VS) ->
+ AllUpdated = couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_active_since(JTx1, Type, VS)
+ end),
+ maps:map(fun(_JobId, Data) ->
+ {VS, running, Data}
+ end, maps:with(maps:keys(Subs), AllUpdated)).
+
+
+notify_subscribers(_, #st{subs = Subs} = St) when map_size(Subs) =:= 0 ->
+ St;
+
+notify_subscribers(ActiveVS, #st{} = St1) ->
+ % First gather the easy (cheap) active jobs. Then, with those out of the
+ % way, inspect each remaining job to get its state.
+ Active = get_active_since(St1, ActiveVS),
+ St2 = notify_job_ids(Active, St1),
+ ActiveIds = maps:keys(Active),
+ Subs = St2#st.subs,
+ InactiveIdMap = maps:without(ActiveIds, Subs),
+ InactiveRatio = maps:size(InactiveIdMap) / maps:size(Subs),
+ Inactive = get_jobs(St2, InactiveIdMap, InactiveRatio),
+ notify_job_ids(Inactive, St2).
+
+
+notify_job_ids(#{} = Jobs, #st{type = Type} = St0) ->
+ maps:fold(fun(Id, {VS, State, Data}, #st{} = StAcc) ->
+ DoUnsub = lists:member(State, [finished, not_found]),
+ maps:fold(fun
+ (_Ref, {_Pid, running, OldVS}, St) when State =:= running,
+ OldVS >= VS ->
+ St;
+ (Ref, {Pid, running, OldVS}, St) when State =:= running,
+ OldVS < VS ->
+ % For running state send updates even if state doesn't change
+ notify(Pid, Ref, Type, Id, State, Data),
+ update_sub(Id, Ref, Pid, running, VS, St);
+ (_Ref, {_Pid, OldState, _VS}, St) when OldState =:= State ->
+ St;
+ (Ref, {Pid, _State, _VS}, St) ->
+ notify(Pid, Ref, Type, Id, State, Data),
+ case DoUnsub of
+ true -> unsubscribe_int(Id, Ref, Pid, St);
+ false -> update_sub(Id, Ref, Pid, State, VS, St)
+ end
+ end, StAcc, maps:get(Id, StAcc#st.subs, #{}))
+ end, St0, Jobs).
+
+
+notify(Pid, Ref, Type, Id, State, Data) ->
+ Pid ! {?COUCH_JOBS_EVENT, Ref, Type, Id, State, Data}.
+
+
+get_holdoff() ->
+ config:get_integer("couch_jobs", "type_monitor_holdoff_msec",
+ ?TYPE_MONITOR_HOLDOFF_DEFAULT).
+
+
+get_timeout() ->
+ Default = ?TYPE_MONITOR_TIMEOUT_DEFAULT,
+ case config:get("couch_jobs", "type_monitor_timeout_msec", Default) of
+ "infinity" -> infinity;
+ Milliseconds -> list_to_integer(Milliseconds)
+ end.
diff --git a/src/couch_jobs/src/couch_jobs_notifier_sup.erl b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
new file mode 100644
index 000000000..81d93493b
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_notifier_sup.erl
@@ -0,0 +1,64 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_notifier_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+ start_link/0,
+
+ start_notifier/1,
+ stop_notifier/1,
+ get_child_pids/0
+]).
+
+-export([
+ init/1
+]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+start_notifier(Type) ->
+ supervisor:start_child(?MODULE, [Type]).
+
+
+stop_notifier(Pid) ->
+ supervisor:terminate_child(?MODULE, Pid).
+
+
+get_child_pids() ->
+ lists:map(fun({_Id, Pid, _Type, _Mod}) ->
+ Pid
+ end, supervisor:which_children(?MODULE)).
+
+
+init(_) ->
+ Flags = #{
+ strategy => simple_one_for_one,
+ intensity => 10,
+ period => 3
+ },
+ Children = [
+ #{
+ id => couch_jobs_notifier,
+ restart => temporary,
+ start => {couch_jobs_notifier, start_link, []}
+ }
+ ],
+ {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_pending.erl b/src/couch_jobs/src/couch_jobs_pending.erl
new file mode 100644
index 000000000..ab53c59d1
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_pending.erl
@@ -0,0 +1,143 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_pending).
+
+
+-export([
+ enqueue/4,
+ dequeue/4,
+ remove/4
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-define(RANGE_LIMIT, 1024).
+
+
+enqueue(#{jtx := true} = JTx, Type, STime, JobId) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?PENDING, Type, STime, JobId}, Jobs),
+ erlfdb:set(Tx, Key, <<>>),
+ WatchKey = erlfdb_tuple:pack({?WATCHES_PENDING, Type}, Jobs),
+ erlfdb:add(Tx, WatchKey, 1),
+ ok.
+
+
+dequeue(#{jtx := true} = JTx, Type, _, true) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Prefix = erlfdb_tuple:pack({?PENDING, Type, 0}, Jobs),
+ case get_random_item(Tx, Prefix) of
+ {error, not_found} ->
+ {not_found, get_pending_watch(JTx, Type)};
+ {ok, PendingKey} ->
+ erlfdb:clear(Tx, PendingKey),
+ {JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+ {ok, JobId}
+ end;
+
+dequeue(#{jtx := true} = JTx, Type, MaxPriority, _) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Prefix = erlfdb_tuple:pack({?PENDING, Type}, Jobs),
+ StartKeySel = erlfdb_key:first_greater_than(Prefix),
+ End = erlfdb_tuple:pack({MaxPriority, <<16#FF>>}, Prefix),
+ EndKeySel = erlfdb_key:first_greater_or_equal(End),
+ case clear_random_key_from_range(Tx, StartKeySel, EndKeySel) of
+ {error, not_found} ->
+ {not_found, get_pending_watch(JTx, Type)};
+ {ok, PendingKey} ->
+ {_, JobId} = erlfdb_tuple:unpack(PendingKey, Prefix),
+ {ok, JobId}
+ end.
+
+
+remove(#{jtx := true} = JTx, Type, JobId, STime) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?PENDING, Type, STime, JobId}, Jobs),
+ erlfdb:clear(Tx, Key).
+
+
+%% Private functions
+
+
+% Pick a random item from the range without reading all the keys in first.
+% However, the constraint is that IDs should look like random UUIDs.
+get_random_item(Tx, Prefix) ->
+ Id = fabric2_util:uuid(),
+ Snapshot = erlfdb:snapshot(Tx),
+    % Try to be fair and switch evenly between trying ids before or after the
+    % randomly generated one. Otherwise, always trying "before" first would
+    % leave a lot of <<"fff...">> IDs in the queue for too long, and always
+    % trying "after" first would leave a lot of <<"00...">> ones waiting.
+ case rand:uniform() > 0.5 of
+ true ->
+ case get_after(Snapshot, Prefix, Id) of
+ {error, not_found} -> get_before(Snapshot, Prefix, Id);
+ {ok, Key} -> {ok, Key}
+ end;
+ false ->
+ case get_before(Snapshot, Prefix, Id) of
+ {error, not_found} -> get_after(Snapshot, Prefix, Id);
+ {ok, Key} -> {ok, Key}
+ end
+ end.
+
+
+get_before(Snapshot, Prefix, Id) ->
+ KSel = erlfdb_key:last_less_or_equal(erlfdb_tuple:pack({Id}, Prefix)),
+ PrefixSize = byte_size(Prefix),
+ case erlfdb:wait(erlfdb:get_key(Snapshot, KSel)) of
+ <<Prefix:PrefixSize/binary, _/binary>> = Key -> {ok, Key};
+ _ -> {error, not_found}
+ end.
+
+
+get_after(Snapshot, Prefix, Id) ->
+ KSel = erlfdb_key:first_greater_or_equal(erlfdb_tuple:pack({Id}, Prefix)),
+ PrefixSize = byte_size(Prefix),
+ case erlfdb:wait(erlfdb:get_key(Snapshot, KSel)) of
+ <<Prefix:PrefixSize/binary, _/binary>> = Key -> {ok, Key};
+ _ -> {error, not_found}
+ end.
+
+
+% Read a limited range of keys from a snapshot, then randomly pick one to
+% clear. Before clearing, add a read conflict on the key in case other
+% workers have picked the same key.
+%
+clear_random_key_from_range(Tx, Start, End) ->
+ Opts = [
+ {limit, ?RANGE_LIMIT},
+ {snapshot, true}
+ ],
+ case erlfdb:wait(erlfdb:get_range(Tx, Start, End, Opts)) of
+ [] ->
+ {error, not_found};
+ [{Key, _}] ->
+ erlfdb:add_read_conflict_key(Tx, Key),
+ erlfdb:clear(Tx, Key),
+ {ok, Key};
+ [{_, _} | _] = KVs ->
+ Index = rand:uniform(length(KVs)),
+ {Key, _} = lists:nth(Index, KVs),
+ erlfdb:add_read_conflict_key(Tx, Key),
+ erlfdb:clear(Tx, Key),
+ {ok, Key}
+ end.
+
+
+get_pending_watch(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = couch_jobs_fdb:get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?WATCHES_PENDING, Type}, Jobs),
+ erlfdb:watch(Tx, Key).
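
The enqueue/dequeue pair above is normally driven through the higher-level
couch_jobs API, but it can also be exercised directly inside a couch_jobs_fdb
transaction. A minimal sketch (illustrative only, not part of this patch),
assuming the JTx map handed to the transaction fun matches the #{jtx := true}
shape these functions pattern match on:

    %% Enqueue a job id under a type, then dequeue it again with the
    %% no_schedule (true) variant, which probes around a random UUID
    %% instead of reading the whole pending range.
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
        JobId = fabric2_util:uuid(),
        ok = couch_jobs_pending:enqueue(JTx, <<"some_type">>, 0, JobId),
        couch_jobs_pending:dequeue(JTx, <<"some_type">>, undefined, true)
    end).

With a single pending key the probe finds it regardless of which direction is
tried first; the result is {ok, JobId}, or {not_found, Watch} with a watch on
the pending counter key if the range had been empty.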
diff --git a/src/couch_jobs/src/couch_jobs_server.erl b/src/couch_jobs/src/couch_jobs_server.erl
new file mode 100644
index 000000000..2e03c7dcf
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_server.erl
@@ -0,0 +1,193 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_server).
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0,
+ get_notifier_server/1,
+ force_check_types/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-define(TYPE_CHECK_PERIOD_DEFAULT, 15000).
+-define(MAX_JITTER_DEFAULT, 5000).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).
+
+
+get_notifier_server(Type) ->
+ case get_type_pid_refs(Type) of
+ {{_, _}, {NotifierPid, _}} ->
+ {ok, NotifierPid};
+ not_found ->
+ force_check_types(),
+ case get_type_pid_refs(Type) of
+ {{_, _}, {NotifierPid, _}} ->
+ {ok, NotifierPid};
+ not_found ->
+ {error, not_found}
+ end
+ end.
+
+
+force_check_types() ->
+ gen_server:call(?MODULE, check_types, infinity).
+
+
+init(_) ->
+    % couch_jobs_server starts after the notifier and activity monitor
+    % supervisors. If it restarts, there could be stale notifiers or
+    % activity monitors around. Kill those, since new ones will be started
+    % later anyway.
+ reset_monitors(),
+ reset_notifiers(),
+ ets:new(?MODULE, [protected, named_table]),
+ check_types(),
+ schedule_check(),
+ {ok, nil}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call(check_types, _From, St) ->
+ check_types(),
+ {reply, ok, St};
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(check_types, St) ->
+ check_types(),
+ schedule_check(),
+ {noreply, St};
+
+handle_info({'DOWN', _Ref, process, Pid, Reason}, St) ->
+ LogMsg = "~p : process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {stop, {unexpected_process_exit, Pid, Reason}, St};
+
+handle_info({Ref, ready}, St) when is_reference(Ref) ->
+    % Don't crash couch_jobs_server and the whole application on a spurious
+    % future ready message; eventually erlfdb's wait timeout code should do
+    % the proper cleanup of these futures.
+ LogMsg = "~p : spurious erlfdb future ready message ~p",
+ couch_log:error(LogMsg, [?MODULE, Ref]),
+ {noreply, St};
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+check_types() ->
+ FdbTypes = fdb_types(),
+ EtsTypes = ets_types(),
+ ToStart = FdbTypes -- EtsTypes,
+ ToStop = EtsTypes -- FdbTypes,
+ lists:foreach(fun(Type) -> start_monitors(Type) end, ToStart),
+ lists:foreach(fun(Type) -> stop_monitors(Type) end, ToStop).
+
+
+start_monitors(Type) ->
+ MonPidRef = case couch_jobs_activity_monitor_sup:start_monitor(Type) of
+ {ok, Pid1} -> {Pid1, monitor(process, Pid1)};
+ {error, Error1} -> error({failed_to_start_monitor, Type, Error1})
+ end,
+ NotifierPidRef = case couch_jobs_notifier_sup:start_notifier(Type) of
+ {ok, Pid2} -> {Pid2, monitor(process, Pid2)};
+ {error, Error2} -> error({failed_to_start_notifier, Type, Error2})
+ end,
+ ets:insert_new(?MODULE, {Type, MonPidRef, NotifierPidRef}).
+
+
+stop_monitors(Type) ->
+ {{MonPid, MonRef}, {NotifierPid, NotifierRef}} = get_type_pid_refs(Type),
+ ok = couch_jobs_activity_monitor_sup:stop_monitor(MonPid),
+ demonitor(MonRef, [flush]),
+ ok = couch_jobs_notifier_sup:stop_notifier(NotifierPid),
+ demonitor(NotifierRef, [flush]),
+ ets:delete(?MODULE, Type).
+
+
+reset_monitors() ->
+ lists:foreach(fun(Pid) ->
+ couch_jobs_activity_monitor_sup:stop_monitor(Pid)
+ end, couch_jobs_activity_monitor_sup:get_child_pids()).
+
+
+reset_notifiers() ->
+ lists:foreach(fun(Pid) ->
+ couch_jobs_notifier_sup:stop_notifier(Pid)
+ end, couch_jobs_notifier_sup:get_child_pids()).
+
+
+get_type_pid_refs(Type) ->
+ case ets:lookup(?MODULE, Type) of
+ [{_, MonPidRef, NotifierPidRef}] -> {MonPidRef, NotifierPidRef};
+ [] -> not_found
+ end.
+
+
+ets_types() ->
+ lists:flatten(ets:match(?MODULE, {'$1', '_', '_'})).
+
+
+fdb_types() ->
+ try
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ couch_jobs_fdb:get_types(JTx)
+ end)
+ catch
+ error:{timeout, _} ->
+ couch_log:warning("~p : Timed out connecting to FDB", [?MODULE]),
+ []
+ end.
+
+
+schedule_check() ->
+ Timeout = get_period_msec(),
+ MaxJitter = max(Timeout div 2, get_max_jitter_msec()),
+ Wait = Timeout + rand:uniform(max(1, MaxJitter)),
+ erlang:send_after(Wait, self(), check_types).
+
+
+get_period_msec() ->
+ config:get_integer("couch_jobs", "type_check_period_msec",
+ ?TYPE_CHECK_PERIOD_DEFAULT).
+
+
+get_max_jitter_msec() ->
+ config:get_integer("couch_jobs", "type_check_max_jitter_msec",
+ ?MAX_JITTER_DEFAULT).
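
schedule_check/0 above spreads the periodic check_types messages out with
jitter. A worked example of the wait computation (standalone sketch, not part
of this patch), plugging in the defaults defined in this module:

    %% With ?TYPE_CHECK_PERIOD_DEFAULT = 15000 and ?MAX_JITTER_DEFAULT = 5000:
    Timeout = 15000,
    MaxJitter = max(Timeout div 2, 5000),             % 7500
    Wait = Timeout + rand:uniform(max(1, MaxJitter)). % 15001..22500 msec

Note that with these defaults max/2 picks half the period (7500 msec) rather
than the configured 5000 msec, so the configured jitter only takes effect once
it exceeds Timeout div 2.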
diff --git a/src/couch_jobs/src/couch_jobs_sup.erl b/src/couch_jobs/src/couch_jobs_sup.erl
new file mode 100644
index 000000000..d79023777
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_sup.erl
@@ -0,0 +1,66 @@
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1
+]).
+
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+ Flags = #{
+ strategy => rest_for_one,
+ intensity => 3,
+ period => 10
+ },
+ Children = [
+ #{
+ id => couch_jobs_fdb,
+ restart => transient,
+ start => {couch_jobs_fdb, init_cache, []}
+ },
+ #{
+ id => couch_jobs_activity_monitor_sup,
+ restart => permanent,
+ shutdown => brutal_kill,
+ type => supervisor,
+ start => {couch_jobs_activity_monitor_sup, start_link, []}
+ },
+ #{
+ id => couch_jobs_notifier_sup,
+ restart => permanent,
+ shutdown => brutal_kill,
+ type => supervisor,
+ start => {couch_jobs_notifier_sup, start_link, []}
+ },
+ #{
+ id => couch_jobs_server,
+ restart => permanent,
+ shutdown => brutal_kill,
+ start => {couch_jobs_server, start_link, []}
+ }
+ ],
+ {ok, {Flags, Children}}.
diff --git a/src/couch_jobs/src/couch_jobs_type_monitor.erl b/src/couch_jobs/src/couch_jobs_type_monitor.erl
new file mode 100644
index 000000000..562a866da
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_type_monitor.erl
@@ -0,0 +1,84 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_type_monitor).
+
+
+-export([
+ start/4
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-record(st, {
+ jtx,
+ type,
+ vs,
+ parent,
+ timestamp,
+ holdoff,
+ timeout
+}).
+
+
+start(Type, VS, HoldOff, Timeout) ->
+ Parent = self(),
+ spawn_link(fun() ->
+ loop(#st{
+ jtx = couch_jobs_fdb:get_jtx(),
+ type = Type,
+ vs = VS,
+ parent = Parent,
+ timestamp = 0,
+ holdoff = HoldOff,
+ timeout = Timeout
+ })
+ end).
+
+
+loop(#st{vs = VS, timeout = Timeout} = St) ->
+ {St1, Watch} = case get_vs_and_watch(St) of
+ {VS1, W} when VS1 =/= VS -> {notify(St#st{vs = VS1}), W};
+ {VS, W} -> {St, W}
+ end,
+ try
+ erlfdb:wait(Watch, [{timeout, Timeout}])
+ catch
+ error:{erlfdb_error, 1009} ->
+ erlfdb:cancel(Watch, [flush]),
+ ok;
+ error:{timeout, _} ->
+ erlfdb:cancel(Watch, [flush]),
+ ok
+ end,
+ loop(St1).
+
+
+notify(#st{} = St) ->
+ #st{holdoff = HoldOff, parent = Pid, timestamp = Ts, vs = VS} = St,
+ Now = erlang:system_time(millisecond),
+ case Now - Ts of
+ Dt when Dt < HoldOff ->
+ timer:sleep(max(HoldOff - Dt, 0));
+ _ ->
+ ok
+ end,
+ Pid ! {type_updated, VS},
+ St#st{timestamp = Now}.
+
+
+get_vs_and_watch(#st{jtx = JTx, type = Type}) ->
+ couch_jobs_fdb:tx(JTx, fun(JTx1) ->
+ couch_jobs_fdb:get_activity_vs_and_watch(JTx1, Type)
+ end).
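
start/4 spawns a linked process that watches the type's activity versionstamp
and sends its parent a {type_updated, VS} message whenever that versionstamp
changes, throttled by the hold-off. A minimal sketch (illustrative only, not
part of this patch) of a parent driving it; the initial versionstamp, hold-off
and timeout values here are placeholders:

    %% Start a monitor for Type and log each activity change it reports.
    watch_type(Type) ->
        InitialVS = undefined,  % placeholder for the last-seen versionstamp
        couch_jobs_type_monitor:start(Type, InitialVS, 50, infinity),
        update_loop().

    update_loop() ->
        receive
            {type_updated, VS} ->
                couch_log:info("~p : activity vs is now ~p", [?MODULE, VS]),
                update_loop()
        end.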
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
new file mode 100644
index 000000000..a7e085e40
--- /dev/null
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -0,0 +1,606 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_tests).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+
+% The job creation API can take an undefined Tx object; in that case it will
+% start its own transaction
+-define(TX, undefined).
+
+
+couch_jobs_basic_test_() ->
+ {
+ "Test couch jobs basics",
+ {
+ setup,
+ fun setup_couch/0, fun teardown_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun add_remove_pending/1,
+ fun add_remove_errors/1,
+ fun get_job_data_and_state/1,
+ fun resubmit_as_job_creator/1,
+ fun type_timeouts_and_server/1,
+ fun dead_notifier_restarts_jobs_server/1,
+ fun bad_messages_restart_couch_jobs_server/1,
+ fun bad_messages_restart_notifier/1,
+ fun bad_messages_restart_activity_monitor/1,
+ fun basic_accept_and_finish/1,
+ fun accept_blocking/1,
+ fun job_processor_update/1,
+ fun resubmit_enqueues_job/1,
+ fun resubmit_custom_schedtime/1,
+ fun accept_max_schedtime/1,
+ fun accept_no_schedule/1,
+ fun subscribe/1,
+ fun subscribe_wait_multiple/1,
+ fun enqueue_inactive/1,
+ fun remove_running_job/1,
+ fun check_get_jobs/1,
+ fun use_fabric_transaction_object/1
+ ]
+ }
+ }
+ }.
+
+
+setup_couch() ->
+ test_util:start_couch([fabric]).
+
+
+teardown_couch(Ctx) ->
+ test_util:stop_couch(Ctx),
+ meck:unload().
+
+
+setup() ->
+ application:start(couch_jobs),
+ clear_jobs(),
+ T1 = {<<"t1">>, 1024}, % a complex type should work
+ T2 = 42, % a number should work as well
+ T1Timeout = 2,
+ T2Timeout = 3,
+ couch_jobs:set_type_timeout(T1, T1Timeout),
+ couch_jobs:set_type_timeout(T2, T2Timeout),
+ #{
+ t1 => T1,
+ t2 => T2,
+ t1_timeout => T1Timeout,
+ j1 => <<"j1">>,
+ j2 => <<"j2">>,
+ dbname => ?tempdb()
+ }.
+
+
+teardown(#{dbname := DbName}) ->
+ clear_jobs(),
+ application:stop(couch_jobs),
+ AllDbs = fabric2_db:list_dbs(),
+ case lists:member(DbName, AllDbs) of
+ true -> ok = fabric2_db:delete(DbName, []);
+ false -> ok
+ end,
+ meck:unload().
+
+
+clear_jobs() ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ #{jobs_path := Jobs, tx := Tx} = JTx,
+ erlfdb:clear_range_startswith(Tx, Jobs)
+ end).
+
+
+restart_app() ->
+ application:stop(couch_jobs),
+ application:start(couch_jobs),
+ couch_jobs_server:force_check_types().
+
+
+get_job(Type, JobId) ->
+ couch_jobs_fdb:get_job(Type, JobId).
+
+
+add_remove_pending(#{t1 := T1, j1 := J1, t2 := T2, j2 := J2}) ->
+ ?_test(begin
+ ?assertEqual(ok, couch_jobs:add(?TX, T1, J1, #{})),
+ ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)),
+ ?assertEqual(ok, couch_jobs:remove(?TX, T1, J1)),
+        % A data map and a numeric job type should work as well. Also do it
+        % inside a transaction
+ Data = #{<<"x">> => 42},
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:add(Tx, T2, J2, Data)
+ end)),
+ ?assertMatch(#{state := pending, data := Data}, get_job(T2, J2)),
+ ?assertEqual(ok, couch_jobs:remove(?TX, T2, J2))
+ end).
+
+
+get_job_data_and_state(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ Data = #{<<"x">> => 42},
+ ok = couch_jobs:add(?TX, T, J, Data),
+ ?assertEqual({ok, Data}, couch_jobs:get_job_data(?TX, T, J)),
+ ?assertEqual({ok, pending}, couch_jobs:get_job_state(?TX, T, J)),
+ ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
+ ?assertEqual({error, not_found}, couch_jobs:get_job_data(?TX, T, J)),
+ ?assertEqual({error, not_found}, couch_jobs:get_job_state(?TX, T, J))
+ end).
+
+
+add_remove_errors(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ?assertEqual({error, not_found}, couch_jobs:remove(?TX, 999, <<"x">>)),
+ ?assertMatch({error, {json_encoding_error, _}}, couch_jobs:add(?TX, T,
+ J, #{1 => 2})),
+ ?assertEqual({error, no_type_timeout}, couch_jobs:add(?TX, <<"x">>, J,
+ #{})),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})),
+ ?assertEqual(ok, couch_jobs:remove(?TX, T, J))
+ end).
+
+
+resubmit_as_job_creator(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ Data = #{<<"x">> => 42},
+ ok = couch_jobs:add(?TX, T, J, Data, 15),
+
+        % The job was pending, so it doesn't get resubmitted
+ ok = couch_jobs:add(?TX, T, J, Data, 16),
+ ?assertMatch(#{state := pending, stime := 16}, get_job(T, J)),
+
+ {ok, Job1, Data} = couch_jobs:accept(T),
+
+        % If it is running, it gets flagged to be resubmitted
+ ok = couch_jobs:add(?TX, T, J, Data, 17),
+ ?assertMatch(#{state := running, stime := 17}, get_job(T, J)),
+ ?assertEqual(true, couch_jobs:is_resubmitted(get_job(T, J))),
+
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ % It should be pending according to the resubmit flag
+ ?assertMatch(#{state := pending, stime := 17}, get_job(T, J)),
+
+ % A finished job will be re-enqueued
+ {ok, Job2, _} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job2)),
+ ?assertMatch(#{state := finished, stime := 17}, get_job(T, J)),
+ ok = couch_jobs:add(?TX, T, J, Data, 18),
+ ?assertMatch(#{state := pending, stime := 18}, get_job(T, J))
+ end).
+
+
+type_timeouts_and_server(#{t1 := T, t1_timeout := T1Timeout}) ->
+ ?_test(begin
+ couch_jobs_server:force_check_types(),
+
+ ?assertEqual(T1Timeout, couch_jobs:get_type_timeout(T)),
+
+ ?assertEqual(2,
+ length(couch_jobs_activity_monitor_sup:get_child_pids())),
+ ?assertEqual(2, length(couch_jobs_notifier_sup:get_child_pids())),
+ ?assertMatch({ok, _}, couch_jobs_server:get_notifier_server(T)),
+
+ ?assertEqual(ok, couch_jobs:set_type_timeout(<<"t3">>, 8)),
+ couch_jobs_server:force_check_types(),
+ ?assertEqual(3,
+ length(couch_jobs_activity_monitor_sup:get_child_pids())),
+ ?assertEqual(3, length(couch_jobs_notifier_sup:get_child_pids())),
+
+ ?assertEqual(ok, couch_jobs:clear_type_timeout(<<"t3">>)),
+ couch_jobs_server:force_check_types(),
+ ?assertEqual(2,
+ length(couch_jobs_activity_monitor_sup:get_child_pids())),
+ ?assertEqual(2,
+ length(couch_jobs_notifier_sup:get_child_pids())),
+ ?assertMatch({error, _},
+ couch_jobs_server:get_notifier_server(<<"t3">>)),
+
+ ?assertEqual(not_found, couch_jobs:get_type_timeout(<<"t3">>))
+ end).
+
+
+dead_notifier_restarts_jobs_server(#{}) ->
+ ?_test(begin
+ couch_jobs_server:force_check_types(),
+
+ ServerPid = whereis(couch_jobs_server),
+ Ref = monitor(process, ServerPid),
+
+ [Notifier1, _Notifier2] = couch_jobs_notifier_sup:get_child_pids(),
+ exit(Notifier1, kill),
+
+ % Killing a notifier should kill the server as well
+ receive {'DOWN', Ref, _, _, _} -> ok end
+ end).
+
+
+bad_messages_restart_couch_jobs_server(#{}) ->
+ ?_test(begin
+ % couch_jobs_server dies on bad cast
+ ServerPid1 = whereis(couch_jobs_server),
+ Ref1 = monitor(process, ServerPid1),
+ gen_server:cast(ServerPid1, bad_cast),
+ receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+ restart_app(),
+
+ % couch_jobs_server dies on bad call
+ ServerPid2 = whereis(couch_jobs_server),
+ Ref2 = monitor(process, ServerPid2),
+ catch gen_server:call(ServerPid2, bad_call),
+ receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+ restart_app(),
+
+ % couch_jobs_server dies on bad info
+ ServerPid3 = whereis(couch_jobs_server),
+ Ref3 = monitor(process, ServerPid3),
+ ServerPid3 ! a_random_message,
+ receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+ restart_app()
+ end).
+
+
+bad_messages_restart_notifier(#{}) ->
+ ?_test(begin
+ couch_jobs_server:force_check_types(),
+
+        % bad cast kills the notifier
+ [AMon1, _] = couch_jobs_notifier_sup:get_child_pids(),
+ Ref1 = monitor(process, AMon1),
+ gen_server:cast(AMon1, bad_cast),
+ receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+ restart_app(),
+
+        % bad call kills the notifier
+ [AMon2, _] = couch_jobs_notifier_sup:get_child_pids(),
+ Ref2 = monitor(process, AMon2),
+ catch gen_server:call(AMon2, bad_call),
+ receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+ restart_app(),
+
+        % bad info message kills the notifier
+ [AMon3, _] = couch_jobs_notifier_sup:get_child_pids(),
+ Ref3 = monitor(process, AMon3),
+ AMon3 ! a_bad_message,
+ receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+ restart_app()
+ end).
+
+
+bad_messages_restart_activity_monitor(#{}) ->
+ ?_test(begin
+ couch_jobs_server:force_check_types(),
+
+ % bad cast kills the activity monitor
+ [AMon1, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+ Ref1 = monitor(process, AMon1),
+ gen_server:cast(AMon1, bad_cast),
+ receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+ restart_app(),
+
+        % bad call kills the activity monitor
+ [AMon2, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+ Ref2 = monitor(process, AMon2),
+ catch gen_server:call(AMon2, bad_call),
+ receive {'DOWN', Ref2, _, _, _} -> ok end,
+
+ restart_app(),
+
+ % bad info message kills activity monitor
+ [AMon3, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+ Ref3 = monitor(process, AMon3),
+ AMon3 ! a_bad_message,
+ receive {'DOWN', Ref3, _, _, _} -> ok end,
+
+ restart_app()
+ end).
+
+
+basic_accept_and_finish(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(?TX, T, J, #{}),
+ {ok, Job, #{}} = couch_jobs:accept(T),
+ ?assertMatch(#{state := running}, get_job(T, J)),
+ % check json validation for bad data in finish
+ ?assertMatch({error, {json_encoding_error, _}},
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, Job, #{1 => 1})
+ end)),
+ Data = #{<<"x">> => 42},
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, Job, Data)
+ end)),
+ ?assertMatch(#{state := finished, data := Data}, get_job(T, J))
+ end).
+
+
+accept_blocking(#{t1 := T, j1 := J1, j2 := J2}) ->
+ ?_test(begin
+ Accept = fun() -> exit(couch_jobs:accept(T)) end,
+ WaitAccept = fun(Ref) ->
+ receive
+ {'DOWN', Ref, _, _, Res} -> Res
+ after
+ 500 -> timeout
+ end
+ end,
+ {_, Ref1} = spawn_monitor(Accept),
+ ok = couch_jobs:add(?TX, T, J1, #{}),
+ ?assertMatch({ok, #{id := J1}, #{}}, WaitAccept(Ref1)),
+ {_, Ref2} = spawn_monitor(Accept),
+ ?assertEqual(timeout, WaitAccept(Ref2)),
+ ok = couch_jobs:add(?TX, T, J2, #{}),
+ ?assertMatch({ok, #{id := J2}, #{}}, WaitAccept(Ref2))
+ end).
+
+
+job_processor_update(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(?TX, T, J, #{}),
+ {ok, Job, #{}} = couch_jobs:accept(T),
+
+        % Use proper transactions in a few places here instead of passing in
+        % ?TX. This is mostly to increase code coverage.
+
+ ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, Job, #{<<"x">> => 1})
+ end)),
+
+ ?assertMatch(#{data := #{<<"x">> := 1}, state := running},
+ get_job(T, J)),
+
+ ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, Job)
+ end)),
+
+ ?assertMatch(#{data := #{<<"x">> := 1}, state := running},
+ get_job(T, J)),
+
+ ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, Job, #{<<"x">> => 2})
+ end)),
+
+ % check json validation for bad data in update
+ ?assertMatch({error, {json_encoding_error, _}},
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, Job, #{1 => 1})
+ end)),
+
+ ?assertMatch(#{data := #{<<"x">> := 2}, state := running},
+ get_job(T, J)),
+
+ % Finish may update the data as well
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"x">> => 3})),
+ ?assertMatch(#{data := #{<<"x">> := 3}, state := finished},
+ get_job(T, J))
+ end).
+
+
+resubmit_enqueues_job(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(?TX, T, J, #{}),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertMatch(#{state := pending, stime := 6}, get_job(T, J)),
+ {ok, Job2, #{}} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job2)),
+ ?assertMatch(#{state := finished}, get_job(T, J))
+ end).
+
+
+resubmit_custom_schedtime(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{}, 7)),
+ {ok, Job, #{}} = couch_jobs:accept(T),
+ ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job, 9)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job)),
+ ?assertMatch(#{stime := 9, state := pending}, get_job(T, J))
+ end).
+
+
+accept_max_schedtime(#{t1 := T, j1 := J1, j2 := J2}) ->
+ ?_test(begin
+ ok = couch_jobs:add(?TX, T, J1, #{}, 5000),
+ ok = couch_jobs:add(?TX, T, J2, #{}, 3000),
+ ?assertEqual({error, not_found}, couch_jobs:accept(T,
+ #{max_sched_time => 1000})),
+ ?assertMatch({ok, #{id := J2}, _}, couch_jobs:accept(T,
+ #{max_sched_time => 3000})),
+ ?assertMatch({ok, #{id := J1}, _}, couch_jobs:accept(T,
+ #{max_sched_time => 9000}))
+ end).
+
+
+accept_no_schedule(#{t1 := T}) ->
+ ?_test(begin
+ JobCount = 25,
+ Jobs = [fabric2_util:uuid() || _ <- lists:seq(1, JobCount)],
+ [couch_jobs:add(?TX, T, J, #{}) || J <- Jobs],
+ InvalidOpts = #{no_schedule => true, max_sched_time => 1},
+ ?assertMatch({error, _}, couch_jobs:accept(T, InvalidOpts)),
+ AcceptOpts = #{no_schedule => true},
+ Accepted = [begin
+ {ok, #{id := J}, _} = couch_jobs:accept(T, AcceptOpts),
+ J
+ end || _ <- lists:seq(1, JobCount)],
+ ?assertEqual(lists:sort(Jobs), lists:sort(Accepted))
+ end).
+
+
+subscribe(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 1}),
+
+ ?assertEqual({error, not_found}, couch_jobs:subscribe(<<"xyz">>, J)),
+ ?assertEqual({error, not_found}, couch_jobs:subscribe(T, <<"j5">>)),
+
+ SubRes0 = couch_jobs:subscribe(T, J),
+ ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes0),
+ {ok, SubId0, pending, _} = SubRes0,
+
+ SubRes1 = couch_jobs:subscribe(T, J),
+ ?assertEqual(SubRes0, SubRes1),
+
+ ?assertEqual(ok, couch_jobs:unsubscribe(SubId0)),
+
+ SubRes = couch_jobs:subscribe(T, J),
+ ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes),
+ {ok, SubId, pending, _} = SubRes,
+
+ {ok, Job, _} = couch_jobs:accept(T),
+ ?assertMatch({T, J, running, #{<<"z">> := 1}},
+ couch_jobs:wait(SubId, 5000)),
+
+ % Make sure we get intermediate `running` updates
+ ?assertMatch({ok, _}, couch_jobs:update(?TX, Job, #{<<"z">> => 2})),
+ ?assertMatch({T, J, running, #{<<"z">> := 2}},
+ couch_jobs:wait(SubId, 5000)),
+
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"z">> => 3})),
+ ?assertMatch({T, J, finished, #{<<"z">> := 3}},
+ couch_jobs:wait(SubId, finished, 5000)),
+
+ ?assertEqual(timeout, couch_jobs:wait(SubId, 50)),
+
+ ?assertEqual({ok, finished, #{<<"z">> => 3}},
+ couch_jobs:subscribe(T, J)),
+
+ ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
+ ?assertEqual({error, not_found}, couch_jobs:subscribe(T, J))
+ end).
+
+
+subscribe_wait_multiple(#{t1 := T, j1 := J1, j2 := J2}) ->
+ ?_test(begin
+ ok = couch_jobs:add(?TX, T, J1, #{}),
+ ok = couch_jobs:add(?TX, T, J2, #{}),
+
+ {ok, S1, pending, #{}} = couch_jobs:subscribe(T, J1),
+ {ok, S2, pending, #{}} = couch_jobs:subscribe(T, J2),
+
+ Subs = [S1, S2],
+
+        % Accept one job. Only one running update is expected. PJob1 and
+        % PJob2 do not necessarily correspond to Job1 and Job2; they could be
+        % accepted as Job2 and Job1 respectively.
+ {ok, PJob1, _} = couch_jobs:accept(T),
+ ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
+ ?assertMatch(timeout, couch_jobs:wait(Subs, 50)),
+
+ % Accept another job. Expect another update.
+ {ok, PJob2, _} = couch_jobs:accept(T),
+ ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
+ ?assertMatch(timeout, couch_jobs:wait(Subs, 50)),
+
+ ?assertMatch({ok, _}, couch_jobs:update(?TX, PJob1, #{<<"q">> => 5})),
+ ?assertMatch({ok, _}, couch_jobs:update(?TX, PJob2, #{<<"r">> => 6})),
+
+ % Each job was updated once, expect two running updates.
+ ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
+ ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
+
+ % Finish one job. Expect one finished update only.
+ ?assertEqual(ok, couch_jobs:finish(?TX, PJob1)),
+
+ ?assertMatch({_, _, finished, #{<<"q">> := 5}},
+ couch_jobs:wait(Subs, finished, 5000)),
+ ?assertMatch(timeout, couch_jobs:wait(Subs, finished, 50)),
+
+        % Finish another job. However, unsubscribe should flush the message
+        % and we should not get it.
+ ?assertEqual(ok, couch_jobs:finish(?TX, PJob2)),
+ ?assertEqual(ok, couch_jobs:unsubscribe(S1)),
+ ?assertEqual(ok, couch_jobs:unsubscribe(S2)),
+ ?assertMatch(timeout, couch_jobs:wait(Subs, finished, 50))
+ end).
+
+
+enqueue_inactive(#{t1 := T, j1 := J, t1_timeout := Timeout}) ->
+ {timeout, 10, ?_test(begin
+ couch_jobs_server:force_check_types(),
+
+ ok = couch_jobs:add(?TX, T, J, #{<<"y">> => 1}),
+ {ok, Job, _} = couch_jobs:accept(T),
+
+ {ok, SubId, running, #{<<"y">> := 1}} = couch_jobs:subscribe(T, J),
+ Wait = 3 * Timeout * 1000,
+ ?assertEqual({T, J, pending, #{<<"y">> => 1}},
+ couch_jobs:wait(SubId, pending, Wait)),
+ ?assertMatch(#{state := pending}, get_job(T, J)),
+
+        % After the job was re-enqueued, the old job processor can't update
+        % it anymore
+ ?assertEqual({error, halt}, couch_jobs:update(?TX, Job)),
+ ?assertEqual({error, halt}, couch_jobs:finish(?TX, Job))
+ end)}.
+
+
+remove_running_job(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(?TX, T, J, #{}),
+ {ok, Job, _} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
+ ?assertEqual({error, not_found}, couch_jobs:remove(?TX, T, J)),
+ ?assertEqual({error, halt}, couch_jobs:update(?TX, Job)),
+ ?assertEqual({error, halt}, couch_jobs:finish(?TX, Job))
+ end).
+
+
+check_get_jobs(#{t1 := T1, j1 := J1, t2 := T2, j2 := J2}) ->
+ ?_test(begin
+ ok = couch_jobs:add(?TX, T1, J1, #{}),
+ ok = couch_jobs:add(?TX, T2, J2, #{}),
+ ?assertMatch([
+ {T2, J2, pending, #{}},
+ {T1, J1, pending, #{}}
+ ], lists:sort(couch_jobs_fdb:get_jobs())),
+ {ok, _, _} = couch_jobs:accept(T1),
+ ?assertMatch([
+ {T2, J2, pending, #{}},
+ {T1, J1, running, #{}}
+ ], lists:sort(couch_jobs_fdb:get_jobs()))
+ end).
+
+
+use_fabric_transaction_object(#{t1 := T1, j1 := J1, dbname := DbName}) ->
+ ?_test(begin
+ {ok, Db} = fabric2_db:create(DbName, []),
+ ?assertEqual(ok, couch_jobs:add(Db, T1, J1, #{})),
+ ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)),
+ {ok, Job, _} = couch_jobs:accept(T1),
+ ?assertEqual(ok, fabric2_fdb:transactional(Db, fun(Db1) ->
+ {ok, #{}} = couch_jobs:get_job_data(Db1, T1, J1),
+ Doc1 = #doc{id = <<"1">>, body = {[]}},
+ {ok, {_, _}} = fabric2_db:update_doc(Db1, Doc1),
+ Doc2 = #doc{id = <<"2">>, body = {[]}},
+ {ok, {_, _}} = fabric2_db:update_doc(Db1, Doc2),
+ couch_jobs:finish(Db1, Job, #{<<"d">> => 1})
+ end)),
+ ok = couch_jobs:remove(#{tx => undefined}, T1, J1),
+ ok = fabric2_db:delete(DbName, [])
+ end).
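
Taken together, the tests above cover the whole job lifecycle. A condensed,
end-to-end sketch of the public API they exercise (illustrative only; the
type, job id, timeout and data values are made up, and the undefined tx lets
couch_jobs start its own transactions):

    Type = <<"t1">>,
    JobId = <<"j1">>,
    ok = couch_jobs:set_type_timeout(Type, 6),
    ok = couch_jobs:add(undefined, Type, JobId, #{<<"x">> => 1}),
    {ok, SubId, pending, _} = couch_jobs:subscribe(Type, JobId),
    {ok, Job, Data} = couch_jobs:accept(Type),
    {ok, _} = couch_jobs:update(undefined, Job, Data#{<<"x">> => 2}),
    ok = couch_jobs:finish(undefined, Job, #{<<"x">> => 3}),
    {Type, JobId, finished, #{<<"x">> := 3}} =
        couch_jobs:wait(SubId, finished, 5000).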