summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgarren smith <garren.smith@gmail.com>2020-03-10 19:34:33 +0200
committerGitHub <noreply@github.com>2020-03-10 19:34:33 +0200
commit1457c2826eceed7cd6244e1130c6f9c499fd2bb3 (patch)
tree54fca7b47a2a74c03f69a0392390d72c321bc680
parent6b5985fa4e0abc787698cb458050203c01250adc (diff)
downloadcouchdb-1457c2826eceed7cd6244e1130c6f9c499fd2bb3.tar.gz
couch_jobs resubmit updates job data (#2649)
* couch_jobs resubmit updates job data When a job is either pending or finished and the job is resubmitted with new data the job data is updated.
-rw-r--r--src/couch_jobs/src/couch_jobs.erl8
-rw-r--r--src/couch_jobs/src/couch_jobs_fdb.erl25
-rw-r--r--src/couch_jobs/test/couch_jobs_tests.erl63
3 files changed, 90 insertions, 6 deletions
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index c134f5ac5..d9ea0fbfa 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -27,6 +27,7 @@
finish/3,
resubmit/2,
resubmit/3,
+ resubmit/4,
is_resubmitted/1,
update/2,
update/3,
@@ -151,6 +152,13 @@ resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime) ->
end).
+-spec resubmit(jtx(), job(), scheduled_time(), job_data()) -> {ok, job()} | {error, any()}.
+resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime, Data) ->
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
+ couch_jobs_fdb:resubmit(JTx, Job, SchedTime, Data)
+ end).
+
+
-spec is_resubmitted(job()) -> true | false.
is_resubmitted(#{job := true} = Job) ->
maps:get(resubmit, Job, false).
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
index 8c1ab7ac5..4c8cd9f37 100644
--- a/src/couch_jobs/src/couch_jobs_fdb.erl
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -23,6 +23,7 @@
accept/4,
finish/3,
resubmit/3,
+ resubmit/4,
update/3,
set_type_timeout/3,
@@ -98,7 +99,7 @@ add(#{jtx := true} = JTx0, Type, JobId, Data, STime) ->
Key = job_key(JTx, Job),
case erlfdb:wait(erlfdb:get(Tx, Key)) of
<<_/binary>> ->
- {ok, Job1} = resubmit(JTx, Job, STime),
+ {ok, Job1} = resubmit(JTx, Job, STime, Data),
#{seq := Seq, state := State, data := Data1} = Job1,
{ok, State, Seq, Data1};
not_found ->
@@ -205,8 +206,11 @@ finish(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data) when
{error, halt}
end.
+resubmit(JTx0, Job, NewSTime) ->
+ resubmit(JTx0, Job, NewSTime, undefined).
-resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) ->
+
+resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime, NewData) ->
#{tx := Tx} = JTx = get_jtx(JTx0),
#{type := Type, id := JobId} = Job,
Key = job_key(JTx, Job),
@@ -218,11 +222,12 @@ resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) ->
end,
case job_state(JLock, Seq) of
finished ->
- ok = maybe_enqueue(JTx, Type, JobId, STime, true, Data),
+ ok = maybe_enqueue(JTx, Type, JobId, STime, true, NewData),
+ NewData1 = update_job_data(Data, NewData),
Job1 = Job#{
seq => ?PENDING_SEQ,
state => pending,
- data => Data
+ data => NewData1
},
{ok, Job1};
pending when STime == OldSTime ->
@@ -237,15 +242,16 @@ resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) ->
},
{ok, Job1};
pending ->
- JV1 = JV#jv{seq = ?PENDING_SEQ, stime = STime},
+ JV1 = JV#jv{seq = ?PENDING_SEQ, stime = STime, data = NewData},
set_job_val(Tx, Key, JV1),
couch_jobs_pending:remove(JTx, Type, JobId, OldSTime),
couch_jobs_pending:enqueue(JTx, Type, STime, JobId),
+ NewData1 = update_job_data(Data, NewData),
Job1 = Job#{
stime => STime,
seq => ?PENDING_SEQ,
state => pending,
- data => Data
+ data => NewData1
},
{ok, Job1};
running ->
@@ -705,3 +711,10 @@ get_md_version_age(Version) ->
update_md_version_timestamp(Version) ->
Ts = erlang:system_time(second),
ets:insert(?MODULE, {?MD_TIMESTAMP_ETS_KEY, Version, Ts}).
+
+
+update_job_data(Data, undefined) ->
+ Data;
+
+update_job_data(_Data, NewData) ->
+ NewData.
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
index 9d8e2df50..af95eebe6 100644
--- a/src/couch_jobs/test/couch_jobs_tests.erl
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -47,7 +47,12 @@ couch_jobs_basic_test_() ->
fun accept_blocking/1,
fun job_processor_update/1,
fun resubmit_enqueues_job/1,
+ fun resubmit_finished_updates_job_data/1,
+ fun resubmit_running_does_not_update_job_data/1,
fun resubmit_custom_schedtime/1,
+ fun add_pending_updates_job_data/1,
+ fun add_finished_updates_job_data/1,
+ fun add_running_does_not_update_job_data/1,
fun accept_max_schedtime/1,
fun accept_no_schedule/1,
fun subscribe/1,
@@ -426,6 +431,30 @@ resubmit_enqueues_job(#{t1 := T, j1 := J}) ->
end).
+resubmit_finished_updates_job_data(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)),
+ ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
+ end).
+
+
+resubmit_running_does_not_update_job_data(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertMatch({ok, _, Data1}, couch_jobs:accept(T))
+ end).
+
+
resubmit_custom_schedtime(#{t1 := T, j1 := J}) ->
?_test(begin
?assertEqual(ok, couch_jobs:add(?TX, T, J, #{}, 7)),
@@ -436,6 +465,40 @@ resubmit_custom_schedtime(#{t1 := T, j1 := J}) ->
end).
+add_pending_updates_job_data(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
+ end).
+
+
+add_finished_updates_job_data(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
+ end).
+
+
+add_running_does_not_update_job_data(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertMatch({ok, _, Data1}, couch_jobs:accept(T))
+ end).
+
+
accept_max_schedtime(#{t1 := T, j1 := J1, j2 := J2}) ->
?_test(begin
ok = couch_jobs:add(?TX, T, J1, #{}, 5000),