diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-04-09 16:48:25 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-04-09 17:10:10 -0400 |
commit | 56137f341e0fd22c7027a52b5c7e5eb1aa75aee0 (patch) | |
tree | cda65cdd4115b905fa7e17cdd296805c6ea687e2 | |
parent | 396a3b595c7b62d1e272d95f3bdafed2fad7f188 (diff) | |
download | couchdb-56137f341e0fd22c7027a52b5c7e5eb1aa75aee0.tar.gz |
Fix job removal notifications
Fix the case when a job is removed while there are subscribers waiting for it.
Most of the logic was already there except:
* Handle the case when when data decoded from subscription results could be
`not_found`, in that case we just pass that atom back as is.
* Need to notify the watch when jobs are removed or couch_jobs_notifiers would
wake up and send notification messages.
-rw-r--r-- | src/couch_jobs/src/couch_jobs_fdb.erl | 4 | ||||
-rw-r--r-- | src/couch_jobs/test/couch_jobs_tests.erl | 28 |
2 files changed, 32 insertions, 0 deletions
diff --git a/src/couch_jobs/src/couch_jobs_fdb.erl b/src/couch_jobs/src/couch_jobs_fdb.erl index 4c8cd9f37..891aedc79 100644 --- a/src/couch_jobs/src/couch_jobs_fdb.erl +++ b/src/couch_jobs/src/couch_jobs_fdb.erl @@ -122,6 +122,7 @@ remove(#{jtx := true} = JTx0, #{job := true} = Job) -> #jv{stime = STime} -> couch_jobs_pending:remove(JTx, Type, JobId, STime), erlfdb:clear(Tx, Key), + update_watch(JTx, Type), ok; not_found -> {error, not_found} @@ -422,6 +423,9 @@ encode_data(#{} = JobData) -> end. +decode_data(not_found) -> + not_found; + decode_data(#{} = JobData) -> JobData; diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl index af95eebe6..fbe4e93a0 100644 --- a/src/couch_jobs/test/couch_jobs_tests.erl +++ b/src/couch_jobs/test/couch_jobs_tests.erl @@ -56,6 +56,8 @@ couch_jobs_basic_test_() -> fun accept_max_schedtime/1, fun accept_no_schedule/1, fun subscribe/1, + fun remove_when_subscribed_and_pending/1, + fun remove_when_subscribed_and_running/1, fun subscribe_wait_multiple/1, fun enqueue_inactive/1, fun remove_running_job/1, @@ -571,6 +573,32 @@ subscribe(#{t1 := T, j1 := J}) -> end). +remove_when_subscribed_and_pending(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(?TX, T, J, #{<<"x">> => 1}), + {ok, SId, pending, _} = couch_jobs:subscribe(T, J), + + couch_jobs:remove(?TX, T, J), + + ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)), + ?assertEqual(timeout, couch_jobs:wait(SId, 50)) + end). + + +remove_when_subscribed_and_running(#{t1 := T, j1 := J}) -> + ?_test(begin + ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 2}), + {ok, SId, pending, _} = couch_jobs:subscribe(T, J), + {ok, #{}, _} = couch_jobs:accept(T), + ?assertMatch({_, _, running, _}, couch_jobs:wait(SId, 5000)), + + couch_jobs:remove(?TX, T, J), + + ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)), + ?assertEqual(timeout, couch_jobs:wait(SId, 50)) + end). + + subscribe_wait_multiple(#{t1 := T, j1 := J1, j2 := J2}) -> ?_test(begin ok = couch_jobs:add(?TX, T, J1, #{}), |