summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-04-09 16:48:25 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2020-04-09 17:10:10 -0400
commit56137f341e0fd22c7027a52b5c7e5eb1aa75aee0 (patch)
treecda65cdd4115b905fa7e17cdd296805c6ea687e2
parent396a3b595c7b62d1e272d95f3bdafed2fad7f188 (diff)
downloadcouchdb-56137f341e0fd22c7027a52b5c7e5eb1aa75aee0.tar.gz
Fix job removal notifications
Fix the case when a job is removed while there are subscribers waiting for it. Most of the logic was already there except: * Handle the case when when data decoded from subscription results could be `not_found`, in that case we just pass that atom back as is. * Need to notify the watch when jobs are removed or couch_jobs_notifiers would wake up and send notification messages.
-rw-r--r--src/couch_jobs/src/couch_jobs_fdb.erl4
-rw-r--r--src/couch_jobs/test/couch_jobs_tests.erl28
2 files changed, 32 insertions, 0 deletions
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl
index 4c8cd9f37..891aedc79 100644
--- a/src/couch_jobs/src/couch_jobs_fdb.erl
+++ b/src/couch_jobs/src/couch_jobs_fdb.erl
@@ -122,6 +122,7 @@ remove(#{jtx := true} = JTx0, #{job := true} = Job) ->
#jv{stime = STime} ->
couch_jobs_pending:remove(JTx, Type, JobId, STime),
erlfdb:clear(Tx, Key),
+ update_watch(JTx, Type),
ok;
not_found ->
{error, not_found}
@@ -422,6 +423,9 @@ encode_data(#{} = JobData) ->
end.
+decode_data(not_found) ->
+ not_found;
+
decode_data(#{} = JobData) ->
JobData;
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
index af95eebe6..fbe4e93a0 100644
--- a/src/couch_jobs/test/couch_jobs_tests.erl
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -56,6 +56,8 @@ couch_jobs_basic_test_() ->
fun accept_max_schedtime/1,
fun accept_no_schedule/1,
fun subscribe/1,
+ fun remove_when_subscribed_and_pending/1,
+ fun remove_when_subscribed_and_running/1,
fun subscribe_wait_multiple/1,
fun enqueue_inactive/1,
fun remove_running_job/1,
@@ -571,6 +573,32 @@ subscribe(#{t1 := T, j1 := J}) ->
end).
+remove_when_subscribed_and_pending(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(?TX, T, J, #{<<"x">> => 1}),
+ {ok, SId, pending, _} = couch_jobs:subscribe(T, J),
+
+ couch_jobs:remove(?TX, T, J),
+
+ ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)),
+ ?assertEqual(timeout, couch_jobs:wait(SId, 50))
+ end).
+
+
+remove_when_subscribed_and_running(#{t1 := T, j1 := J}) ->
+ ?_test(begin
+ ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 2}),
+ {ok, SId, pending, _} = couch_jobs:subscribe(T, J),
+ {ok, #{}, _} = couch_jobs:accept(T),
+ ?assertMatch({_, _, running, _}, couch_jobs:wait(SId, 5000)),
+
+ couch_jobs:remove(?TX, T, J),
+
+ ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)),
+ ?assertEqual(timeout, couch_jobs:wait(SId, 50))
+ end).
+
+
subscribe_wait_multiple(#{t1 := T, j1 := J1, j2 := J2}) ->
?_test(begin
ok = couch_jobs:add(?TX, T, J1, #{}),