summaryrefslogtreecommitdiff
path: root/src/couch_jobs/src/couch_jobs_fdb.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_jobs/src/couch_jobs_fdb.erl')
-rw-r--r--src/couch_jobs/src/couch_jobs_fdb.erl725
1 files changed, 725 insertions, 0 deletions
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
new file mode 100644
index 000000000..27131ec86
--- /dev/null
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -0,0 +1,725 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_jobs_fdb).
+
+
+-export([
+ add/5,
+ remove/2,
+ get_job_state_and_data/2,
+ get_jobs/2,
+ get_jobs/3,
+
+ accept/4,
+ finish/3,
+ resubmit/3,
+ resubmit/4,
+ update/3,
+
+ set_type_timeout/3,
+ clear_type_timeout/2,
+ get_type_timeout/2,
+ get_types/1,
+
+ get_activity_vs/2,
+ get_activity_vs_and_watch/2,
+ get_active_since/3,
+ get_inactive_since/3,
+ re_enqueue_inactive/3,
+
+ init_cache/0,
+
+ encode_data/1,
+ decode_data/1,
+
+ get_jtx/0,
+ get_jtx/1,
+ tx/2,
+
+ get_job/2,
+ get_jobs/0,
+
+ bump_metadata_version/0,
+ bump_metadata_version/1
+]).
+
+
+-include("couch_jobs.hrl").
+
+
+-record(jv, {
+ seq,
+ jlock,
+ stime,
+ resubmit,
+ data
+}).
+
+
+-define(JOBS_ETS_KEY, jobs).
+-define(MD_TIMESTAMP_ETS_KEY, md_timestamp).
+-define(MD_VERSION_MAX_AGE_SEC, 10).
+-define(PENDING_SEQ, 0).
+
+
+% Data model
+%
+% (?JOBS, ?DATA, Type, JobId) = (Sequence, Lock, SchedTime, Resubmit, JobData)
+% (?JOBS, ?PENDING, Type, ScheduledTime, JobId) = ""
+% (?JOBS, ?WATCHES_PENDING, Type) = Counter
+% (?JOBS, ?WATCHES_ACTIVITY, Type) = Sequence
+% (?JOBS, ?ACTIVITY_TIMEOUT, Type) = ActivityTimeout
+% (?JOBS, ?ACTIVITY, Type, Sequence) = JobId
+%
+% In the ?DATA row Sequence can have these values:
+% 0 - when the job is pending
+% null - when the job is finished
+% Versionstamp - when the job is running
+
+
+% Job creation API
+
+add(#{jtx := true} = JTx0, Type, JobId, Data, STime) ->
+ #{tx := Tx} = JTx = get_jtx(JTx0),
+ Job = #{job => true, type => Type, id => JobId},
+ case get_type_timeout(JTx, Type) of
+ not_found ->
+ {error, no_type_timeout};
+ Int when is_integer(Int) ->
+ Key = job_key(JTx, Job),
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ <<_/binary>> ->
+ {ok, Job1} = resubmit(JTx, Job, STime, Data),
+ #{seq := Seq, state := State, data := Data1} = Job1,
+ {ok, State, Seq, Data1};
+ not_found ->
+ try
+ maybe_enqueue(JTx, Type, JobId, STime, true, Data),
+ {ok, pending, ?PENDING_SEQ, Data}
+ catch
+ error:{json_encoding_error, Error} ->
+ {error, {json_encoding_error, Error}}
+ end
+ end
+ end.
+
+
+remove(#{jtx := true} = JTx0, #{job := true} = Job) ->
+ #{tx := Tx} = JTx = get_jtx(JTx0),
+ #{type := Type, id := JobId} = Job,
+ Key = job_key(JTx, Job),
+ case get_job_val(Tx, Key) of
+ #jv{stime = STime, seq = Seq} ->
+ couch_jobs_pending:remove(JTx, Type, JobId, STime),
+ clear_activity(JTx, Type, Seq),
+ erlfdb:clear(Tx, Key),
+ update_watch(JTx, Type),
+ ok;
+ not_found ->
+ {error, not_found}
+ end.
+
+
+get_job_state_and_data(#{jtx := true} = JTx, #{job := true} = Job) ->
+ case get_job_val(get_jtx(JTx), Job) of
+ #jv{seq = Seq, jlock = JLock, data = Data} ->
+ {ok, Seq, job_state(JLock, Seq), Data};
+ not_found ->
+ {error, not_found}
+ end.
+
+
+get_jobs(JTx, Type) ->
+ get_jobs(JTx, Type, fun(_) -> true end).
+
+
+get_jobs(#{jtx := true} = JTx, Type, Filter) when is_function(Filter, 1) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?DATA, Type}, Jobs),
+ Opts = [{streaming_mode, want_all}],
+ Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+ lists:foldl(fun({K, V}, #{} = Acc) ->
+ {JobId} = erlfdb_tuple:unpack(K, Prefix),
+ case Filter(JobId) of
+ true ->
+ {Seq, JLock, _, _, Data} = erlfdb_tuple:unpack(V),
+ Acc#{JobId => {Seq, job_state(JLock, Seq), Data}};
+ false ->
+ Acc
+ end
+ end, #{}, Result).
+
+
+% Job processor API
+
+accept(#{jtx := true} = JTx0, Type, MaxSTime, NoSched)
+ when is_integer(MaxSTime), is_boolean(NoSched) ->
+ #{jtx := true, tx := Tx} = JTx = get_jtx(JTx0),
+ case couch_jobs_pending:dequeue(JTx, Type, MaxSTime, NoSched) of
+ {not_found, PendingWatch} ->
+ {not_found, PendingWatch};
+ {ok, JobId} ->
+ JLock = fabric2_util:uuid(),
+ Key = job_key(JTx, Type, JobId),
+ JV0 = get_job_val(Tx, Key),
+ #jv{jlock = null, data = Data} = JV0,
+ JV = JV0#jv{seq = ?UNSET_VS, jlock = JLock, resubmit = false},
+ set_job_val(Tx, Key, JV),
+ update_activity(JTx, Type, JobId, null, Data),
+ Job = #{
+ job => true,
+ type => Type,
+ id => JobId,
+ jlock => JLock
+ },
+ {ok, Job, decode_data(Data)}
+ end.
+
+
+finish(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data) when
+ is_map(Data) orelse Data =:= undefined ->
+ #{tx := Tx} = JTx = get_jtx(JTx0),
+ #{type := Type, jlock := JLock, id := JobId} = Job,
+ case get_job_or_halt(Tx, job_key(JTx, Job), JLock) of
+ #jv{seq = Seq, stime = STime, resubmit = Resubmit, data = OldData} ->
+ NewData = case Data =:= undefined of
+ true -> OldData;
+ false -> Data
+ end,
+ try maybe_enqueue(JTx, Type, JobId, STime, Resubmit, NewData) of
+ ok ->
+ clear_activity(JTx, Type, Seq),
+ update_watch(JTx, Type)
+ catch
+ error:{json_encoding_error, Error} ->
+ {error, {json_encoding_error, Error}}
+ end;
+ halt ->
+ {error, halt}
+ end.
+
+resubmit(JTx0, Job, NewSTime) ->
+ resubmit(JTx0, Job, NewSTime, undefined).
+
+
+resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime, NewData) ->
+ #{tx := Tx} = JTx = get_jtx(JTx0),
+ #{type := Type, id := JobId} = Job,
+ Key = job_key(JTx, Job),
+ case get_job_val(Tx, Key) of
+ #jv{seq = Seq, jlock = JLock, stime = OldSTime, data = Data} = JV ->
+ STime = case NewSTime =:= undefined of
+ true -> OldSTime;
+ false -> NewSTime
+ end,
+ case job_state(JLock, Seq) of
+ finished ->
+ ok = maybe_enqueue(JTx, Type, JobId, STime, true, NewData),
+ NewData1 = update_job_data(Data, NewData),
+ Job1 = Job#{
+ seq => ?PENDING_SEQ,
+ state => pending,
+ data => NewData1
+ },
+ {ok, Job1};
+ pending when STime == OldSTime ->
+ % If pending and scheduled time doesn't change avoid generating
+ % un-necessary writes by removing and re-adding the jobs into the
+ % pending queue.
+ Job1 = Job#{
+ stime => STime,
+ seq => ?PENDING_SEQ,
+ state => pending,
+ data => Data
+ },
+ {ok, Job1};
+ pending ->
+ JV1 = JV#jv{seq = ?PENDING_SEQ, stime = STime, data = NewData},
+ set_job_val(Tx, Key, JV1),
+ couch_jobs_pending:remove(JTx, Type, JobId, OldSTime),
+ couch_jobs_pending:enqueue(JTx, Type, STime, JobId),
+ NewData1 = update_job_data(Data, NewData),
+ Job1 = Job#{
+ stime => STime,
+ seq => ?PENDING_SEQ,
+ state => pending,
+ data => NewData1
+ },
+ {ok, Job1};
+ running ->
+ JV1 = JV#jv{stime = STime, resubmit = true},
+ set_job_val(Tx, Key, JV1),
+ {ok, Job#{resubmit => true, stime => STime,
+ state => running, seq => Seq, data => Data}}
+ end;
+ not_found ->
+ {error, not_found}
+ end.
+
+
+update(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data0) when
+ is_map(Data0) orelse Data0 =:= undefined ->
+ #{tx := Tx} = JTx = get_jtx(JTx0),
+ #{jlock := JLock, type := Type, id := JobId} = Job,
+ Key = job_key(JTx, Job),
+ case get_job_or_halt(Tx, Key, JLock) of
+ #jv{seq = Seq, stime = STime, resubmit = Resubmit} = JV0 ->
+ Data = case Data0 =:= undefined of
+ true -> JV0#jv.data;
+ false -> Data0
+ end,
+ JV = JV0#jv{seq = ?UNSET_VS, data = Data},
+ try set_job_val(Tx, Key, JV) of
+ ok ->
+ update_activity(JTx, Type, JobId, Seq, Data),
+ {ok, Job#{resubmit => Resubmit, stime => STime}}
+ catch
+ error:{json_encoding_error, Error} ->
+ {error, {json_encoding_error, Error}}
+ end;
+ halt ->
+ {error, halt}
+ end.
+
+
+% Type and activity monitoring API
+
+set_type_timeout(#{jtx := true} = JTx, Type, Timeout) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+ Val = erlfdb_tuple:pack({Timeout}),
+ erlfdb:set(Tx, Key, Val).
+
+
+clear_type_timeout(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+ erlfdb:clear(Tx, Key).
+
+
+get_type_timeout(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT, Type}, Jobs),
+ case erlfdb:wait(erlfdb:get_ss(Tx, Key)) of
+ not_found ->
+ not_found;
+ Val ->
+ {Timeout} = erlfdb_tuple:unpack(Val),
+ Timeout
+ end.
+
+
+get_types(#{jtx := true} = JTx) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?ACTIVITY_TIMEOUT}, Jobs),
+ Opts = [{streaming_mode, want_all}],
+ Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+ lists:map(fun({K, _V}) ->
+ {Type} = erlfdb_tuple:unpack(K, Prefix),
+ Type
+ end, Result).
+
+
+get_activity_vs(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, Jobs),
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ not_found ->
+ not_found;
+ Val ->
+ {VS} = erlfdb_tuple:unpack(Val),
+ VS
+ end.
+
+
+get_activity_vs_and_watch(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Key = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, Jobs),
+ Future = erlfdb:get(Tx, Key),
+ Watch = erlfdb:watch(Tx, Key),
+ case erlfdb:wait(Future) of
+ not_found ->
+ {not_found, Watch};
+ Val ->
+ {VS} = erlfdb_tuple:unpack(Val),
+ {VS, Watch}
+ end.
+
+
+get_active_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+ StartKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+ StartKeySel = erlfdb_key:first_greater_or_equal(StartKey),
+ {_, EndKey} = erlfdb_tuple:range({Type}, Prefix),
+ Opts = [{streaming_mode, want_all}],
+ Future = erlfdb:get_range(Tx, StartKeySel, EndKey, Opts),
+ maps:from_list(lists:map(fun({_K, V}) ->
+ erlfdb_tuple:unpack(V)
+ end, erlfdb:wait(Future))).
+
+
+get_inactive_since(#{jtx := true} = JTx, Type, Versionstamp) ->
+ #{tx := Tx, jobs_path := Jobs} = get_jtx(JTx),
+ Prefix = erlfdb_tuple:pack({?ACTIVITY}, Jobs),
+ {StartKey, _} = erlfdb_tuple:range({Type}, Prefix),
+ EndKey = erlfdb_tuple:pack({Type, Versionstamp}, Prefix),
+ EndKeySel = erlfdb_key:first_greater_than(EndKey),
+ Opts = [{streaming_mode, want_all}],
+ Future = erlfdb:get_range(Tx, StartKey, EndKeySel, Opts),
+ lists:map(fun({_K, V}) ->
+ {JobId, _} = erlfdb_tuple:unpack(V),
+ JobId
+ end, erlfdb:wait(Future)).
+
+
+re_enqueue_inactive(#{jtx := true} = JTx, Type, JobIds) when is_list(JobIds) ->
+ #{tx := Tx} = get_jtx(JTx),
+ lists:foreach(fun(JobId) ->
+ case get_job_val(Tx, job_key(JTx, Type, JobId)) of
+ #jv{seq = Seq, stime = STime, data = Data} ->
+ clear_activity(JTx, Type, Seq),
+ maybe_enqueue(JTx, Type, JobId, STime, true, Data);
+ not_found ->
+ ok
+ end
+ end, JobIds),
+ case length(JobIds) > 0 of
+ true -> update_watch(JTx, Type);
+ false -> ok
+ end.
+
+
+% Cache initialization API. Called from the supervisor just to create the ETS
+% table. It returns `ignore` to tell supervisor it won't actually start any
+% process, which is what we want here.
+%
+init_cache() ->
+ ConcurrencyOpts = [{read_concurrency, true}, {write_concurrency, true}],
+ ets:new(?MODULE, [public, named_table] ++ ConcurrencyOpts),
+ ignore.
+
+
+% Functions to encode / decode JobData
+%
+encode_data(#{} = JobData) ->
+ try
+ iolist_to_binary(jiffy:encode(JobData, [force_utf8]))
+ catch
+ throw:{error, Error} ->
+ % legacy clause since new versions of jiffy raise error instead
+ error({json_encoding_error, Error});
+ error:Error ->
+ error({json_encoding_error, Error})
+ end.
+
+
+decode_data(not_found) ->
+ not_found;
+
+decode_data(#{} = JobData) ->
+ JobData;
+
+decode_data(<<_/binary>> = JobData) ->
+ jiffy:decode(JobData, [dedupe_keys, return_maps]).
+
+
+% Cached job transaction object. This object wraps a transaction, caches the
+% directory lookup path, and the metadata version. The function can be used
+% from inside or outside the transaction. When used from a transaction it will
+% verify if the metadata was changed, and will refresh automatically.
+%
+get_jtx() ->
+ get_jtx(undefined).
+
+
+get_jtx(#{tx := Tx} = _TxDb) ->
+ get_jtx(Tx);
+
+get_jtx(undefined = _Tx) ->
+ case ets:lookup(?MODULE, ?JOBS_ETS_KEY) of
+ [{_, #{} = JTx}] ->
+ JTx;
+ [] ->
+ JTx = update_jtx_cache(init_jtx(undefined)),
+ JTx#{tx := undefined}
+ end;
+
+get_jtx({erlfdb_transaction, _} = Tx) ->
+ case ets:lookup(?MODULE, ?JOBS_ETS_KEY) of
+ [{_, #{} = JTx}] ->
+ ensure_current(JTx#{tx := Tx});
+ [] ->
+ update_jtx_cache(init_jtx(Tx))
+ end.
+
+
+% Transaction processing to be used with couch jobs' specific transaction
+% contexts
+%
+tx(#{jtx := true} = JTx, Fun) when is_function(Fun, 1) ->
+ fabric2_fdb:transactional(JTx, Fun).
+
+
+% Debug and testing API
+
+get_job(Type, JobId) ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ JTx = init_jtx(Tx),
+ case get_job_val(Tx, job_key(JTx, Type, JobId)) of
+ #jv{seq = Seq, jlock = JLock} = JV ->
+ #{
+ job => true,
+ type => Type,
+ id => JobId,
+ seq => Seq,
+ jlock => JLock,
+ stime => JV#jv.stime,
+ resubmit => JV#jv.resubmit,
+ data => decode_data(JV#jv.data),
+ state => job_state(JLock, Seq)
+ };
+ not_found ->
+ not_found
+ end
+ end).
+
+
+get_jobs() ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ #{jobs_path := Jobs} = init_jtx(Tx),
+ Prefix = erlfdb_tuple:pack({?DATA}, Jobs),
+ Opts = [{streaming_mode, want_all}],
+ Result = erlfdb:wait(erlfdb:get_range_startswith(Tx, Prefix, Opts)),
+ lists:map(fun({K, V}) ->
+ {Type, JobId} = erlfdb_tuple:unpack(K, Prefix),
+ {Seq, JLock, _, _, Data} = erlfdb_tuple:unpack(V),
+ JobState = job_state(JLock, Seq),
+ {Type, JobId, JobState, decode_data(Data)}
+ end, Result)
+ end).
+
+
+% Call this function if the top level "couchdb" FDB directory layer
+% changes.
+%
+bump_metadata_version() ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ bump_metadata_version(Tx)
+ end).
+
+
+bump_metadata_version(Tx) ->
+ erlfdb:set_versionstamped_value(Tx, ?COUCH_JOBS_MD_VERSION, <<0:112>>).
+
+
+% Private helper functions
+
+maybe_enqueue(#{jtx := true} = JTx, Type, JobId, STime, Resubmit, Data) ->
+ #{tx := Tx} = JTx,
+ Key = job_key(JTx, Type, JobId),
+ JV = #jv{
+ seq = null,
+ jlock = null,
+ stime = STime,
+ resubmit = false,
+ data = Data
+ },
+ case Resubmit of
+ true ->
+ set_job_val(Tx, Key, JV#jv{seq = ?PENDING_SEQ}),
+ couch_jobs_pending:enqueue(JTx, Type, STime, JobId);
+ false ->
+ set_job_val(Tx, Key, JV)
+ end,
+ ok.
+
+
+job_key(#{jtx := true, jobs_path := Jobs}, Type, JobId) ->
+ erlfdb_tuple:pack({?DATA, Type, JobId}, Jobs).
+
+
+job_key(JTx, #{type := Type, id := JobId}) ->
+ job_key(JTx, Type, JobId).
+
+
+get_job_val(#{jtx := true, tx := Tx} = JTx, #{job := true} = Job) ->
+ get_job_val(Tx, job_key(JTx, Job));
+
+get_job_val(Tx = {erlfdb_transaction, _}, Key) ->
+ case erlfdb:wait(erlfdb:get(Tx, Key)) of
+ <<_/binary>> = Val ->
+ {Seq, JLock, STime, Resubmit, Data} = erlfdb_tuple:unpack(Val),
+ #jv{
+ seq = Seq,
+ jlock = JLock,
+ stime = STime,
+ resubmit = Resubmit,
+ data = Data
+ };
+ not_found ->
+ not_found
+ end.
+
+
+set_job_val(Tx = {erlfdb_transaction, _}, Key, #jv{} = JV) ->
+ #jv{
+ seq = Seq,
+ jlock = JLock,
+ stime = STime,
+ resubmit = Resubmit,
+ data = Data0
+ } = JV,
+ Data = case Data0 of
+ #{} -> encode_data(Data0);
+ <<_/binary>> -> Data0
+ end,
+ case Seq of
+ ?UNSET_VS ->
+ Val = erlfdb_tuple:pack_vs({Seq, JLock, STime, Resubmit, Data}),
+ erlfdb:set_versionstamped_value(Tx, Key, Val);
+ _Other ->
+ Val = erlfdb_tuple:pack({Seq, JLock, STime, Resubmit, Data}),
+ erlfdb:set(Tx, Key, Val)
+ end,
+ ok.
+
+
+get_job_or_halt(Tx, Key, JLock) ->
+ case get_job_val(Tx, Key) of
+ #jv{jlock = CurJLock} when CurJLock =/= JLock ->
+ halt;
+ #jv{} = Res ->
+ Res;
+ not_found ->
+ halt
+ end.
+
+
+update_activity(#{jtx := true} = JTx, Type, JobId, Seq, Data0) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ case Seq =/= null of
+ true -> clear_activity(JTx, Type, Seq);
+ false -> ok
+ end,
+ Key = erlfdb_tuple:pack_vs({?ACTIVITY, Type, ?UNSET_VS}, Jobs),
+ Data = case Data0 of
+ #{} -> encode_data(Data0);
+ <<_/binary>> -> Data0
+ end,
+ Val = erlfdb_tuple:pack({JobId, Data}),
+ erlfdb:set_versionstamped_key(Tx, Key, Val),
+ update_watch(JTx, Type).
+
+
+clear_activity(#{jtx := true} = JTx, Type, Seq) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?ACTIVITY, Type, Seq}, Jobs),
+ erlfdb:clear(Tx, Key).
+
+
+update_watch(#{jtx := true} = JTx, Type) ->
+ #{tx := Tx, jobs_path := Jobs} = JTx,
+ Key = erlfdb_tuple:pack({?WATCHES_ACTIVITY, Type}, Jobs),
+ Val = erlfdb_tuple:pack_vs({?UNSET_VS}),
+ erlfdb:set_versionstamped_value(Tx, Key, Val),
+ ok.
+
+
+job_state(JLock, Seq) ->
+ case {JLock, Seq} of
+ {null, null} -> finished;
+ {JLock, _} when JLock =/= null -> running;
+ {null, Seq} when Seq =/= null -> pending
+ end.
+
+
+% This a transaction context object similar to the Db = #{} one from
+% fabric2_fdb. It's is used to cache the jobs path directory (to avoid extra
+% lookups on every operation) and to check for metadata changes (in case
+% directory changes).
+%
+init_jtx(undefined) ->
+ fabric2_fdb:transactional(fun(Tx) -> init_jtx(Tx) end);
+
+init_jtx({erlfdb_transaction, _} = Tx) ->
+ LayerPrefix = fabric2_fdb:get_dir(Tx),
+ Jobs = erlfdb_tuple:pack({?JOBS}, LayerPrefix),
+ % layer_prefix, md_version and tx here match db map fields in fabric2_fdb
+ % but we also assert that this is a job transaction using the jtx => true
+ % field
+ #{
+ jtx => true,
+ tx => Tx,
+ layer_prefix => LayerPrefix,
+ jobs_path => Jobs,
+ md_version => get_metadata_version(Tx)
+ }.
+
+
+ensure_current(#{jtx := true, tx := Tx} = JTx) ->
+ case get(?COUCH_JOBS_CURRENT) of
+ Tx ->
+ JTx;
+ _ ->
+ JTx1 = update_current(JTx),
+ put(?COUCH_JOBS_CURRENT, Tx),
+ JTx1
+ end.
+
+
+get_metadata_version({erlfdb_transaction, _} = Tx) ->
+ erlfdb:wait(erlfdb:get_ss(Tx, ?COUCH_JOBS_MD_VERSION)).
+
+
+update_current(#{tx := Tx, md_version := Version} = JTx) ->
+ case get_md_version_age(Version) of
+ Age when Age =< ?MD_VERSION_MAX_AGE_SEC ->
+ % Looked it up not too long ago. Avoid looking it up to frequently
+ JTx;
+ _ ->
+ case get_metadata_version(Tx) of
+ Version ->
+ update_md_version_timestamp(Version),
+ JTx;
+ _NewVersion ->
+ update_jtx_cache(init_jtx(Tx))
+ end
+ end.
+
+
+update_jtx_cache(#{jtx := true, md_version := Version} = JTx) ->
+ CachedJTx = JTx#{tx := undefined},
+ ets:insert(?MODULE, {?JOBS_ETS_KEY, CachedJTx}),
+ update_md_version_timestamp(Version),
+ JTx.
+
+
+get_md_version_age(Version) ->
+ Timestamp = case ets:lookup(?MODULE, ?MD_TIMESTAMP_ETS_KEY) of
+ [{_, Version, Ts}] -> Ts;
+ _ -> 0
+ end,
+ erlang:system_time(second) - Timestamp.
+
+
+update_md_version_timestamp(Version) ->
+ Ts = erlang:system_time(second),
+ ets:insert(?MODULE, {?MD_TIMESTAMP_ETS_KEY, Version, Ts}).
+
+
+update_job_data(Data, undefined) ->
+ Data;
+
+update_job_data(_Data, NewData) ->
+ NewData.