summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarren Smith <garren.smith@gmail.com>2020-03-10 16:44:46 +0200
committerGarren Smith <garren.smith@gmail.com>2020-03-10 19:24:05 +0200
commit53bff3e6b1ef5e049d88a268044946672e9947f3 (patch)
treed2372050479b870650ef56ba09fa81aa89f0450a
parentd0c4f8e297eb3387895a2329a929deaee25f637e (diff)
downloadcouchdb-jobs-new-data.tar.gz
-rw-r--r--src/couch_jobs/src/couch_jobs.erl6
-rw-r--r--src/couch_jobs/src/couch_jobs_fdb.erl6
-rw-r--r--src/couch_jobs/test/couch_jobs_tests.erl52
3 files changed, 45 insertions, 19 deletions
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index bd9e35372..d9ea0fbfa 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -152,10 +152,10 @@ resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime) ->
end).
--spec resubmit(jtx(), job(), job_data(), scheduled_time()) -> {ok, job()} | {error, any()}.
-resubmit(Tx, #{jlock := <<_/binary>>} = Job, Data, SchedTime) ->
+-spec resubmit(jtx(), job(), scheduled_time(), job_data()) -> {ok, job()} | {error, any()}.
+resubmit(Tx, #{jlock := <<_/binary>>} = Job, SchedTime, Data) ->
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
- couch_jobs_fdb:resubmit(JTx, Job, Data, SchedTime)
+ couch_jobs_fdb:resubmit(JTx, Job, SchedTime, Data)
end).
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
index 7ff38fab7..4c8cd9f37 100644
--- a/src/couch_jobs/src/couch_jobs_fdb.erl
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -99,7 +99,7 @@ add(#{jtx := true} = JTx0, Type, JobId, Data, STime) ->
Key = job_key(JTx, Job),
case erlfdb:wait(erlfdb:get(Tx, Key)) of
<<_/binary>> ->
- {ok, Job1} = resubmit(JTx, Job, Data, STime),
+ {ok, Job1} = resubmit(JTx, Job, STime, Data),
#{seq := Seq, state := State, data := Data1} = Job1,
{ok, State, Seq, Data1};
not_found ->
@@ -207,10 +207,10 @@ finish(#{jtx := true} = JTx0, #{jlock := <<_/binary>>} = Job, Data) when
end.
resubmit(JTx0, Job, NewSTime) ->
- resubmit(JTx0, Job, undefined, NewSTime).
+ resubmit(JTx0, Job, NewSTime, undefined).
-resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewData, NewSTime) ->
+resubmit(#{jtx := true} = JTx0, #{job := true} = Job, NewSTime, NewData) ->
#{tx := Tx} = JTx = get_jtx(JTx0),
#{type := Type, id := JobId} = Job,
Key = job_key(JTx, Job),
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
index e22023e62..af95eebe6 100644
--- a/src/couch_jobs/test/couch_jobs_tests.erl
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -47,10 +47,12 @@ couch_jobs_basic_test_() ->
fun accept_blocking/1,
fun job_processor_update/1,
fun resubmit_enqueues_job/1,
- fun resubmit_pending_updates_job_data/1,
fun resubmit_finished_updates_job_data/1,
fun resubmit_running_does_not_update_job_data/1,
fun resubmit_custom_schedtime/1,
+ fun add_pending_updates_job_data/1,
+ fun add_finished_updates_job_data/1,
+ fun add_running_does_not_update_job_data/1,
fun accept_max_schedtime/1,
fun accept_no_schedule/1,
fun subscribe/1,
@@ -429,16 +431,6 @@ resubmit_enqueues_job(#{t1 := T, j1 := J}) ->
end).
-resubmit_pending_updates_job_data(#{t1 := T, j1 := J}) ->
- ?_test(begin
- Data1 = #{<<"test">> => 1},
- Data2 = #{<<"test">> => 2},
- ok = couch_jobs:add(?TX, T, J, Data1),
- ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
- ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
- end).
-
-
resubmit_finished_updates_job_data(#{t1 := T, j1 := J}) ->
?_test(begin
Data1 = #{<<"test">> => 1},
@@ -446,7 +438,7 @@ resubmit_finished_updates_job_data(#{t1 := T, j1 := J}) ->
ok = couch_jobs:add(?TX, T, J, Data1),
{ok, Job1, #{}} = couch_jobs:accept(T),
?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
- ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)),
?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
end).
@@ -457,7 +449,7 @@ resubmit_running_does_not_update_job_data(#{t1 := T, j1 := J}) ->
Data2 = #{<<"test">> => 2},
ok = couch_jobs:add(?TX, T, J, Data1),
{ok, Job1, #{}} = couch_jobs:accept(T),
- ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)),
?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
?assertMatch({ok, _, Data1}, couch_jobs:accept(T))
end).
@@ -473,6 +465,40 @@ resubmit_custom_schedtime(#{t1 := T, j1 := J}) ->
end).
+add_pending_updates_job_data(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
+ end).
+
+
+add_finished_updates_job_data(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
+ end).
+
+
+add_running_does_not_update_job_data(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertMatch({ok, _, Data1}, couch_jobs:accept(T))
+ end).
+
+
accept_max_schedtime(#{t1 := T, j1 := J1, j2 := J2}) ->
?_test(begin
ok = couch_jobs:add(?TX, T, J1, #{}, 5000),