summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-02-18 17:20:15 -0500
committerNick Vatamaniuc <vatamane@apache.org>2020-02-18 17:56:01 -0500
commit9d408aa0c73d8d1e8e7b4d3d5a51abdd5f7d793c (patch)
tree2a1f1b5002864f8e5c66b6d50f4f72aa608169a9
parentb9b757c2802f7fe2e96eec8aedbda10cf2453236 (diff)
downloadcouchdb-couch_jobs_uses_its_own_metadata.tar.gz
Let couch_jobs use its own metadata keycouch_jobs_uses_its_own_metadata
Previously, if the metadata key is bumped in a transaction, the same transaction could not be used to add jobs with `couch_jobs`. That's because metadata is a versionstamped value, and when set, it cannot be read back until that transaction has committed. In `fabric2_fdb` there is a process dict key that is set which declares that metadata was already read, which happens before any db update, however `couch_jobs` uses it's own caching mechanism and doesn't know about that pdict key. Ideally we'd implement a single `couch_fdb` module to be shared between `couch_jobs` and `fabric2_db` but until then it maybe simpler to just let `couch_jobs` use its own metadata key. This way, it doesn't get invalidated or bumped every time dbs get recreated or design docs are updated. The only time it would be bumped is if the FDB layer prefix changed at runtime.
-rw-r--r--src/couch_jobs/src/couch_jobs.hrl1
-rw-r--r--src/couch_jobs/src/couch_jobs_fdb.erl27
-rw-r--r--src/couch_jobs/test/couch_jobs_tests.erl22
3 files changed, 45 insertions, 5 deletions
diff --git a/src/couch_jobs/src/couch_jobs.hrl b/src/couch_jobs/src/couch_jobs.hrl
index 055bf091c..bb561b136 100644
--- a/src/couch_jobs/src/couch_jobs.hrl
+++ b/src/couch_jobs/src/couch_jobs.hrl
@@ -35,6 +35,7 @@
-define(ACTIVITY, 6).
+-define(COUCH_JOBS_MD_VERSION, <<"couch_jobs_md_version">>).
-define(COUCH_JOBS_EVENT, '$couch_jobs_event').
-define(COUCH_JOBS_CURRENT, '$couch_jobs_current').
-define(UNDEFINED_MAX_SCHEDULED_TIME, 1 bsl 36).
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
index a08b78fc1..a81a313d8 100644
--- a/src/couch_jobs/src/couch_jobs_fdb.erl
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -46,7 +46,10 @@
tx/2,
get_job/2,
- get_jobs/0
+ get_jobs/0,
+
+ bump_metadata_version/0,
+ bump_metadata_version/1
]).
@@ -485,6 +488,19 @@ get_jobs() ->
end).
+% Call this function if the top level "couchdb" FDB directory layer
+% changes.
+%
+bump_metadata_version() ->
+ fabric2_fdb:transactional(fun(Tx) ->
+ bump_metadata_version(Tx)
+ end).
+
+
+bump_metadata_version(Tx) ->
+ erlfdb:set_versionstamped_value(Tx, ?COUCH_JOBS_MD_VERSION, <<0:112>>).
+
+
% Private helper functions
maybe_enqueue(#{jtx := true} = JTx, Type, JobId, STime, Resubmit, Data) ->
@@ -617,7 +633,6 @@ init_jtx(undefined) ->
init_jtx({erlfdb_transaction, _} = Tx) ->
LayerPrefix = fabric2_fdb:get_dir(Tx),
Jobs = erlfdb_tuple:pack({?JOBS}, LayerPrefix),
- Version = erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)),
% layer_prefix, md_version and tx here match db map fields in fabric2_fdb
% but we also assert that this is a job transaction using the jtx => true
% field
@@ -626,7 +641,7 @@ init_jtx({erlfdb_transaction, _} = Tx) ->
tx => Tx,
layer_prefix => LayerPrefix,
jobs_path => Jobs,
- md_version => Version
+ md_version => get_metadata_version(Tx)
}.
@@ -641,13 +656,17 @@ ensure_current(#{jtx := true, tx := Tx} = JTx) ->
end.
+get_metadata_version({erlfdb_transaction, _} = Tx) ->
+ erlfdb:wait(erlfdb:get_ss(Tx, ?COUCH_JOBS_MD_VERSION)).
+
+
update_current(#{tx := Tx, md_version := Version} = JTx) ->
case get_md_version_age(Version) of
Age when Age =< ?MD_VERSION_MAX_AGE_SEC ->
% Looked it up not too long ago. Avoid looking it up to frequently
JTx;
_ ->
- case erlfdb:wait(erlfdb:get(Tx, ?METADATA_VERSION_KEY)) of
+ case get_metadata_version(Tx) of
Version ->
update_md_version_timestamp(Version),
JTx;
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
index a7e085e40..62a75c83e 100644
--- a/src/couch_jobs/test/couch_jobs_tests.erl
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -54,7 +54,8 @@ couch_jobs_basic_test_() ->
fun enqueue_inactive/1,
fun remove_running_job/1,
fun check_get_jobs/1,
- fun use_fabric_transaction_object/1
+ fun use_fabric_transaction_object/1,
+ fun metadata_version_bump/1
]
}
}
@@ -604,3 +605,22 @@ use_fabric_transaction_object(#{t1 := T1, j1 := J1, dbname := DbName}) ->
ok = couch_jobs:remove(#{tx => undefined}, T1, J1),
ok = fabric2_db:delete(DbName, [])
end).
+
+
+metadata_version_bump(_) ->
+ ?_test(begin
+ JTx1 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
+ ?assertMatch(#{md_version := not_found}, JTx1),
+
+ ets:delete_all_objects(couch_jobs_fdb),
+ couch_jobs_fdb:bump_metadata_version(),
+ JTx2 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
+ ?assertMatch(#{md_version := Bin} when is_binary(Bin), JTx2),
+
+ ets:delete_all_objects(couch_jobs_fdb),
+ couch_jobs_fdb:bump_metadata_version(),
+ JTx3 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
+ OldMdv = maps:get(md_version, JTx2),
+ NewMdv = maps:get(md_version, JTx3),
+ ?assert(NewMdv > OldMdv)
+ end).