diff options
author | garren smith <garren.smith@gmail.com> | 2020-03-10 19:34:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-10 19:34:33 +0200 |
commit | 1457c2826eceed7cd6244e1130c6f9c499fd2bb3 (patch) | |
tree | 54fca7b47a2a74c03f69a0392390d72c321bc680 | |
parent | 6b5985fa4e0abc787698cb458050203c01250adc (diff) | |
download | couchdb-1457c2826eceed7cd6244e1130c6f9c499fd2bb3.tar.gz |
couch_jobs resubmit updates job data (#2649)
* couch_jobs resubmit updates job data
When a job is either pending or finished and the job is resubmitted
with new data the job data is updated.
-rw-r--r-- | src/couch_jobs/src/couch_jobs.erl | 8 | ||||
-rw-r--r-- | src/couch_jobs/src/couch_jobs_fdb.erl | 25 | ||||
-rw-r--r-- | src/couch_jobs/test/couch_jobs_tests.erl | 63 |
3 files changed, 90 insertions, 6 deletions
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl index c134f5ac5..d9ea0fbfa 100644 --- a/src/couch_jobs/src/couch_jobs.erl +++ b/src/couch_jobs/src/couch_jobs.erl @@ -27,6 +27,7 @@ finish/3, resubmit/2, resubmit/3, + resubmit/4, is_resubmitted/1, update/2, update/3, @@ -151,6 +152,13 @@ resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime) -> end). +-spec resubmit(jtx(), job(), scheduled_time(), job_data()) -> {ok, job()} | {error, any()}. +resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime, Data) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + couch_jobs_fdb:resubmit(JTx, Job, SchedTime, Data) + end). + + -spec is_resubmitted(job()) -> true | false. is_resubmitted(#{job := true} = Job) -> maps:get(resubmit, Job, false). diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl index 8c1ab7ac5..4c8cd9f37 100644 --- a/src/couch_jobs/src/couch_jobs_fdb.erl +++ b/src/couch_jobs/src/couch_jobs_fdb.erl @@ -23,6 +23,7 @@ accept/4, finish/3, resubmit/3, + resubmit/4, update/3, set_type_timeout/3, @@ -98,7 +99,7 @@ add(#{jtx := true} = JTx0, Type, JobId, Data, STime) -> Key = job_key(JTx, Job), case erlfdb:wait(erlfdb:get(Tx, Key)) of <<_/binary>> -> - {ok, Job1} = resubmit(JTx, Job, STime), + {ok, Job1} = resubmit(JTx, Job, STime, Data), #{seq := Seq, state := State, data := Data1} = Job1, {ok, State, Seq, Data1}; not_found -> @@ -205,8 +206,11 @@ finish(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data) when {error, halt} end. +resubmit(JTx0, Job, NewSTime) -> + resubmit(JTx0, Job, NewSTime, undefined). -resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) -> + +resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime, NewData) -> #{tx := Tx} = JTx = get_jtx(JTx0), #{type := Type, id := JobId} = Job, Key = job_key(JTx, Job), @@ -218,11 +222,12 @@ resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) -> end, case job_state(JLock, Seq) of finished -> - ok = maybe_enqueue(JTx, Type, JobId, STime, true, Data), + ok = maybe_enqueue(JTx, Type, JobId, STime, true, NewData), + NewData1 = update_job_data(Data, NewData), Job1 = Job#{ seq => ?PENDING_SEQ, state => pending, - data => Data + data => NewData1 }, {ok, Job1}; pending when STime == OldSTime -> @@ -237,15 +242,16 @@ resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime) -> }, {ok, Job1}; pending -> - JV1 = JV#jv{seq = ?PENDING_SEQ, stime = STime}, + JV1 = JV#jv{seq = ?PENDING_SEQ, stime = STime, data = NewData}, set_job_val(Tx, Key, JV1), couch_jobs_pending:remove(JTx, Type, JobId, OldSTime), couch_jobs_pending:enqueue(JTx, Type, STime, JobId), + NewData1 = update_job_data(Data, NewData), Job1 = Job#{ stime => STime, seq => ?PENDING_SEQ, state => pending, - data => Data + data => NewData1 }, {ok, Job1}; running -> @@ -705,3 +711,10 @@ get_md_version_age(Version) -> update_md_version_timestamp(Version) -> Ts = erlang:system_time(second), ets:insert(?MODULE, {?MD_TIMESTAMP_ETS_KEY, Version, Ts}). + + +update_job_data(Data, undefined) -> + Data; + +update_job_data(_Data, NewData) -> + NewData. diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl index 9d8e2df50..af95eebe6 100644 --- a/src/couch_jobs/test/couch_jobs_tests.erl +++ b/src/couch_jobs/test/couch_jobs_tests.erl @@ -47,7 +47,12 @@ couch_jobs_basic_test_() -> fun accept_blocking/1, fun job_processor_update/1, fun resubmit_enqueues_job/1, + fun resubmit_finished_updates_job_data/1, + fun resubmit_running_does_not_update_job_data/1, fun resubmit_custom_schedtime/1, + fun add_pending_updates_job_data/1, + fun add_finished_updates_job_data/1, + fun add_running_does_not_update_job_data/1, fun accept_max_schedtime/1, fun accept_no_schedule/1, fun subscribe/1, @@ -426,6 +431,30 @@ resubmit_enqueues_job(#{t1 := T, j1 := J}) -> end). +resubmit_finished_updates_job_data(#{t1 := T, j1 := J}) -> + ?_test(begin + Data1 = #{<<"test">> => 1}, + Data2 = #{<<"test">> => 2}, + ok = couch_jobs:add(?TX, T, J, Data1), + {ok, Job1, #{}} = couch_jobs:accept(T), + ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), + ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)), + ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)) + end). + + +resubmit_running_does_not_update_job_data(#{t1 := T, j1 := J}) -> + ?_test(begin + Data1 = #{<<"test">> => 1}, + Data2 = #{<<"test">> => 2}, + ok = couch_jobs:add(?TX, T, J, Data1), + {ok, Job1, #{}} = couch_jobs:accept(T), + ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)), + ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), + ?assertMatch({ok, _, Data1}, couch_jobs:accept(T)) + end). + + resubmit_custom_schedtime(#{t1 := T, j1 := J}) -> ?_test(begin ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{}, 7)), @@ -436,6 +465,40 @@ resubmit_custom_schedtime(#{t1 := T, j1 := J}) -> end). +add_pending_updates_job_data(#{t1 := T, j1 := J}) -> + ?_test(begin + Data1 = #{<<"test">> => 1}, + Data2 = #{<<"test">> => 2}, + ok = couch_jobs:add(?TX, T, J, Data1), + ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)), + ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)) + end). + + +add_finished_updates_job_data(#{t1 := T, j1 := J}) -> + ?_test(begin + Data1 = #{<<"test">> => 1}, + Data2 = #{<<"test">> => 2}, + ok = couch_jobs:add(?TX, T, J, Data1), + {ok, Job1, #{}} = couch_jobs:accept(T), + ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), + ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)), + ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)) + end). + + +add_running_does_not_update_job_data(#{t1 := T, j1 := J}) -> + ?_test(begin + Data1 = #{<<"test">> => 1}, + Data2 = #{<<"test">> => 2}, + ok = couch_jobs:add(?TX, T, J, Data1), + {ok, Job1, #{}} = couch_jobs:accept(T), + ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)), + ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), + ?assertMatch({ok, _, Data1}, couch_jobs:accept(T)) + end). + + accept_max_schedtime(#{t1 := T, j1 := J1, j2 := J2}) -> ?_test(begin ok = couch_jobs:add(?TX, T, J1, #{}, 5000), |