diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2021-03-10 18:16:42 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2021-03-12 14:15:56 -0500 |
commit | 304a0634a2e06fdedd0746d39984e27147391842 (patch) | |
tree | 66f56f673131ce7ffbcd7a9ef0f42b2ea7c697a7 | |
parent | 63700c38dfe5f0c023e243362cd7101d5ec09058 (diff) | |
download | couchdb-304a0634a2e06fdedd0746d39984e27147391842.tar.gz |
Fix couch_jobs to be less flaky
It turns out fabric is dependent on couch_jobs because of db expiration module.
So when couch_jobs was restarted multiple times per test case it could have
brought down fabric. However, since couch_jobs needs fabric for transactional
stuff it ended up brining couch_jobs app down as well.
To fix it:
* Switch to explicitly starting/stopping fabric and couch_jobs together
* Break appart bad_messages* tests to individually test each type of message
as app restarts in the middle of the tests kept killing fabric and
intermettently killing couch_jobs a well.
* Also make the tests look nicer by re-using ?TDEF_FE macros from
`fabric2_test`, this we can avoid the `?_test(begin... end).` pattern.
* Remove meck:unload since we don't really meck anything in the module
* Don't need to spend time cleaning out database as we don't really create
that many dbs (just one) and that one gets cleaned out in its own test.
-rw-r--r-- | src/couch_jobs/test/couch_jobs_tests.erl | 986 |
1 files changed, 454 insertions, 532 deletions
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl index 11572a4b9..b40e52e8a 100644 --- a/src/couch_jobs/test/couch_jobs_tests.erl +++ b/src/couch_jobs/test/couch_jobs_tests.erl @@ -16,6 +16,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch/include/couch_eunit.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). % Job creation API can take an undefined Tx object @@ -33,37 +34,43 @@ couch_jobs_basic_test_() -> foreach, fun setup/0, fun teardown/1, [ - fun add_remove_pending/1, - fun add_remove_errors/1, - fun add_with_the_same_scheduled_time/1, - fun get_job_data_and_state/1, - fun resubmit_as_job_creator/1, - fun type_timeouts_and_server/1, - fun dead_notifier_restarts_jobs_server/1, - fun bad_messages_restart_couch_jobs_server/1, - fun bad_messages_restart_notifier/1, - fun bad_messages_restart_activity_monitor/1, - fun basic_accept_and_finish/1, - fun accept_blocking/1, - fun job_processor_update/1, - fun resubmit_enqueues_job/1, - fun resubmit_finished_updates_job_data/1, - fun resubmit_running_does_not_update_job_data/1, - fun resubmit_custom_schedtime/1, - fun add_pending_updates_job_data/1, - fun add_finished_updates_job_data/1, - fun add_running_does_not_update_job_data/1, - fun accept_max_schedtime/1, - fun accept_no_schedule/1, - fun subscribe/1, - fun remove_when_subscribed_and_pending/1, - fun remove_when_subscribed_and_running/1, - fun subscribe_wait_multiple/1, - fun enqueue_inactive/1, - fun remove_running_job/1, - fun check_get_jobs/1, - fun use_fabric_transaction_object/1, - fun metadata_version_bump/1 + ?TDEF_FE(add_remove_pending), + ?TDEF_FE(add_remove_errors), + ?TDEF_FE(add_with_the_same_scheduled_time), + ?TDEF_FE(get_job_data_and_state), + ?TDEF_FE(resubmit_as_job_creator), + ?TDEF_FE(type_timeouts_and_server, 15), + ?TDEF_FE(dead_notifier_restarts_jobs_server), + ?TDEF_FE(bad_cast_restarts_couch_jobs_server), + ?TDEF_FE(bad_call_restarts_couch_jobs_server), + ?TDEF_FE(bad_info_restarts_couch_jobs_server), + ?TDEF_FE(bad_cast_restarts_notifier), + ?TDEF_FE(bad_call_restarts_notifier), + ?TDEF_FE(bad_info_restarts_notifier), + ?TDEF_FE(bad_cast_restarts_activity_monitor), + ?TDEF_FE(bad_call_restarts_activity_monitor), + ?TDEF_FE(bad_info_restarts_activity_monitor), + ?TDEF_FE(basic_accept_and_finish), + ?TDEF_FE(accept_blocking), + ?TDEF_FE(job_processor_update), + ?TDEF_FE(resubmit_enqueues_job), + ?TDEF_FE(resubmit_finished_updates_job_data), + ?TDEF_FE(resubmit_running_does_not_update_job_data), + ?TDEF_FE(resubmit_custom_schedtime), + ?TDEF_FE(add_pending_updates_job_data), + ?TDEF_FE(add_finished_updates_job_data), + ?TDEF_FE(add_running_does_not_update_job_data), + ?TDEF_FE(accept_max_schedtime), + ?TDEF_FE(accept_no_schedule), + ?TDEF_FE(subscribe), + ?TDEF_FE(remove_when_subscribed_and_pending), + ?TDEF_FE(remove_when_subscribed_and_running), + ?TDEF_FE(subscribe_wait_multiple), + ?TDEF_FE(enqueue_inactive, 15), + ?TDEF_FE(remove_running_job), + ?TDEF_FE(check_get_jobs), + ?TDEF_FE(use_fabric_transaction_object), + ?TDEF_FE(metadata_version_bump) ] } } @@ -75,11 +82,11 @@ setup_couch() -> teardown_couch(Ctx) -> - test_util:stop_couch(Ctx), - meck:unload(). + test_util:stop_couch(Ctx). setup() -> + application:start(fabric), application:start(couch_jobs), clear_jobs(), T1 = {<<"t1">>, 1024}, % a complex type should work @@ -98,15 +105,10 @@ setup() -> }. -teardown(#{dbname := DbName}) -> - clear_jobs(), +teardown(#{}) -> application:stop(couch_jobs), - AllDbs = fabric2_db:list_dbs(), - case lists:member(DbName, AllDbs) of - true -> ok = fabric2_db:delete(DbName, []); - false -> ok - end, - meck:unload(). + application:stop(fabric), + ok. clear_jobs() -> @@ -116,647 +118,567 @@ clear_jobs() -> end). -restart_app() -> - application:stop(couch_jobs), - application:start(couch_jobs), - couch_jobs_server:force_check_types(). - - get_job(Type, JobId) -> couch_jobs_fdb:get_job(Type, JobId). add_remove_pending(#{t1 := T1, j1 := J1, t2 := T2, j2 := J2}) -> - ?_test(begin - ?assertEqual(ok, couch_jobs:add(?TX, T1, J1, #{})), - ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)), - ?assertEqual(ok, couch_jobs:remove(?TX, T1, J1)), - % Data and numeric type should work as well. Also do it in a - % transaction - Data = #{<<"x">> => 42}, - ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> - couch_jobs:add(Tx, T2, J2, Data) - end)), - ?assertMatch(#{state := pending, data := Data}, get_job(T2, J2)), - ?assertEqual(ok, couch_jobs:remove(?TX, T2, J2)) - end). + ?assertEqual(ok, couch_jobs:add(?TX, T1, J1, #{})), + ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)), + ?assertEqual(ok, couch_jobs:remove(?TX, T1, J1)), + % Data and numeric type should work as well. Also do it in a + % transaction + Data = #{<<"x">> => 42}, + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:add(Tx, T2, J2, Data) + end)), + ?assertMatch(#{state := pending, data := Data}, get_job(T2, J2)), + ?assertEqual(ok, couch_jobs:remove(?TX, T2, J2)). + get_job_data_and_state(#{t1 := T, j1 := J}) -> - ?_test(begin - Data = #{<<"x">> => 42}, - ok = couch_jobs:add(?TX, T, J, Data), - ?assertEqual({ok, Data}, couch_jobs:get_job_data(?TX, T, J)), - ?assertEqual({ok, pending}, couch_jobs:get_job_state(?TX, T, J)), - ?assertEqual(ok, couch_jobs:remove(?TX, T, J)), - ?assertEqual({error, not_found}, couch_jobs:get_job_data(?TX, T, J)), - ?assertEqual({error, not_found}, couch_jobs:get_job_state(?TX, T, J)) - end). + Data = #{<<"x">> => 42}, + ok = couch_jobs:add(?TX, T, J, Data), + ?assertEqual({ok, Data}, couch_jobs:get_job_data(?TX, T, J)), + ?assertEqual({ok, pending}, couch_jobs:get_job_state(?TX, T, J)), + ?assertEqual(ok, couch_jobs:remove(?TX, T, J)), + ?assertEqual({error, not_found}, couch_jobs:get_job_data(?TX, T, J)), + ?assertEqual({error, not_found}, couch_jobs:get_job_state(?TX, T, J)). add_remove_errors(#{t1 := T, j1 := J}) -> - ?_test(begin - ?assertEqual({error, not_found}, couch_jobs:remove(?TX, 999, <<"x">>)), - ?assertMatch({error, {json_encoding_error, _}}, couch_jobs:add(?TX, T, - J, #{1 => 2})), - ?assertEqual({error, no_type_timeout}, couch_jobs:add(?TX, <<"x">>, J, - #{})), - ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})), - ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})), - ?assertEqual(ok, couch_jobs:remove(?TX, T, J)) - end). + ?assertEqual({error, not_found}, couch_jobs:remove(?TX, 999, <<"x">>)), + ?assertMatch({error, {json_encoding_error, _}}, couch_jobs:add(?TX, T, + J, #{1 => 2})), + ?assertEqual({error, no_type_timeout}, couch_jobs:add(?TX, <<"x">>, J, + #{})), + ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})), + ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})), + ?assertEqual(ok, couch_jobs:remove(?TX, T, J)). add_with_the_same_scheduled_time(#{t1 := T, j1 := J}) -> - ?_test(begin - ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})), - fabric2_fdb:transactional(fun(Tx) -> - ?assertEqual(ok, couch_jobs:add(Tx, T, J, #{})), - ?assert(erlfdb:is_read_only(Tx)) - end) + ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})), + fabric2_fdb:transactional(fun(Tx) -> + ?assertEqual(ok, couch_jobs:add(Tx, T, J, #{})), + ?assert(erlfdb:is_read_only(Tx)) end). resubmit_as_job_creator(#{t1 := T, j1 := J}) -> - ?_test(begin - Data = #{<<"x">> => 42}, - ok = couch_jobs:add(?TX, T, J, Data, 15), - - % Job was pending, doesn't get resubmitted - ok = couch_jobs:add(?TX, T, J, Data, 16), - ?assertMatch(#{state := pending, stime := 16}, get_job(T, J)), - - {ok, Job1, Data} = couch_jobs:accept(T), - - % If is running, it gets flagged to be resubmitted - ok = couch_jobs:add(?TX, T, J, Data, 17), - ?assertMatch(#{state := running, stime := 17}, get_job(T, J)), - ?assertEqual(true, couch_jobs:is_resubmitted(get_job(T, J))), - - ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), - % It should be pending according to the resubmit flag - ?assertMatch(#{state := pending, stime := 17}, get_job(T, J)), - - % A finished job will be re-enqueued - {ok, Job2, _} = couch_jobs:accept(T), - ?assertEqual(ok, couch_jobs:finish(?TX, Job2)), - ?assertMatch(#{state := finished, stime := 17}, get_job(T, J)), - ok = couch_jobs:add(?TX, T, J, Data, 18), - ?assertMatch(#{state := pending, stime := 18}, get_job(T, J)) - end). - - -type_timeouts_and_server(#{t1 := T, t1_timeout := T1Timeout}) -> - {timeout, 15, ?_test(begin + Data = #{<<"x">> => 42}, + ok = couch_jobs:add(?TX, T, J, Data, 15), - WaitForActivityMonitors = fun(N) -> - test_util:wait(fun() -> - Pids = couch_jobs_activity_monitor_sup:get_child_pids(), - case length(Pids) == N of - true -> ok; - false -> wait - end - end) - end, + % Job was pending, doesn't get resubmitted + ok = couch_jobs:add(?TX, T, J, Data, 16), + ?assertMatch(#{state := pending, stime := 16}, get_job(T, J)), - WaitForNotifiers = fun(N) -> - test_util:wait(fun() -> - Pids = couch_jobs_notifier_sup:get_child_pids(), - case length(Pids) == N of - true -> ok; - false -> wait - end - end) - end, + {ok, Job1, Data} = couch_jobs:accept(T), - couch_jobs_server:force_check_types(), + % If is running, it gets flagged to be resubmitted + ok = couch_jobs:add(?TX, T, J, Data, 17), + ?assertMatch(#{state := running, stime := 17}, get_job(T, J)), + ?assertEqual(true, couch_jobs:is_resubmitted(get_job(T, J))), - ?assertEqual(T1Timeout, couch_jobs:get_type_timeout(T)), + ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), + % It should be pending according to the resubmit flag + ?assertMatch(#{state := pending, stime := 17}, get_job(T, J)), - WaitForActivityMonitors(2), - ?assertEqual(2, - length(couch_jobs_activity_monitor_sup:get_child_pids())), + % A finished job will be re-enqueued + {ok, Job2, _} = couch_jobs:accept(T), + ?assertEqual(ok, couch_jobs:finish(?TX, Job2)), + ?assertMatch(#{state := finished, stime := 17}, get_job(T, J)), + ok = couch_jobs:add(?TX, T, J, Data, 18), + ?assertMatch(#{state := pending, stime := 18}, get_job(T, J)). - WaitForNotifiers(2), - ?assertEqual(2, length(couch_jobs_notifier_sup:get_child_pids())), - ?assertMatch({ok, _}, couch_jobs_server:get_notifier_server(T)), +type_timeouts_and_server(#{t1 := T, t1_timeout := T1Timeout}) -> + WaitForActivityMonitors = fun(N) -> + test_util:wait(fun() -> + Pids = couch_jobs_activity_monitor_sup:get_child_pids(), + case length(Pids) == N of + true -> ok; + false -> wait + end + end) + end, - ?assertEqual(ok, couch_jobs:set_type_timeout(<<"t3">>, 8)), - couch_jobs_server:force_check_types(), + WaitForNotifiers = fun(N) -> + test_util:wait(fun() -> + Pids = couch_jobs_notifier_sup:get_child_pids(), + case length(Pids) == N of + true -> ok; + false -> wait + end + end) + end, - WaitForActivityMonitors(3), - ?assertEqual(3, - length(couch_jobs_activity_monitor_sup:get_child_pids())), + couch_jobs_server:force_check_types(), - WaitForNotifiers(3), - ?assertEqual(3, length(couch_jobs_notifier_sup:get_child_pids())), + ?assertEqual(T1Timeout, couch_jobs:get_type_timeout(T)), - ?assertEqual(ok, couch_jobs:clear_type_timeout(<<"t3">>)), - couch_jobs_server:force_check_types(), + WaitForActivityMonitors(2), + ?assertEqual(2, + length(couch_jobs_activity_monitor_sup:get_child_pids())), - WaitForActivityMonitors(2), - ?assertEqual(2, - length(couch_jobs_activity_monitor_sup:get_child_pids())), + WaitForNotifiers(2), + ?assertEqual(2, length(couch_jobs_notifier_sup:get_child_pids())), - WaitForNotifiers(2), - ?assertEqual(2, - length(couch_jobs_notifier_sup:get_child_pids())), + ?assertMatch({ok, _}, couch_jobs_server:get_notifier_server(T)), - ?assertMatch({error, _}, - couch_jobs_server:get_notifier_server(<<"t3">>)), + ?assertEqual(ok, couch_jobs:set_type_timeout(<<"t3">>, 8)), + couch_jobs_server:force_check_types(), - ?assertEqual(not_found, couch_jobs:get_type_timeout(<<"t3">>)) - end)}. + WaitForActivityMonitors(3), + ?assertEqual(3, + length(couch_jobs_activity_monitor_sup:get_child_pids())), + WaitForNotifiers(3), + ?assertEqual(3, length(couch_jobs_notifier_sup:get_child_pids())), -dead_notifier_restarts_jobs_server(#{}) -> - ?_test(begin - couch_jobs_server:force_check_types(), + ?assertEqual(ok, couch_jobs:clear_type_timeout(<<"t3">>)), + couch_jobs_server:force_check_types(), - ServerPid = whereis(couch_jobs_server), - Ref = monitor(process, ServerPid), + WaitForActivityMonitors(2), + ?assertEqual(2, + length(couch_jobs_activity_monitor_sup:get_child_pids())), - [Notifier1, _Notifier2] = couch_jobs_notifier_sup:get_child_pids(), - exit(Notifier1, kill), + WaitForNotifiers(2), + ?assertEqual(2, + length(couch_jobs_notifier_sup:get_child_pids())), - % Killing a notifier should kill the server as well - receive {'DOWN', Ref, _, _, _} -> ok end - end). + ?assertMatch({error, _}, + couch_jobs_server:get_notifier_server(<<"t3">>)), + ?assertEqual(not_found, couch_jobs:get_type_timeout(<<"t3">>)). -bad_messages_restart_couch_jobs_server(#{}) -> - ?_test(begin - % couch_jobs_server dies on bad cast - ServerPid1 = whereis(couch_jobs_server), - Ref1 = monitor(process, ServerPid1), - gen_server:cast(ServerPid1, bad_cast), - receive {'DOWN', Ref1, _, _, _} -> ok end, - restart_app(), +dead_notifier_restarts_jobs_server(#{}) -> + couch_jobs_server:force_check_types(), - % couch_jobs_server dies on bad call - ServerPid2 = whereis(couch_jobs_server), - Ref2 = monitor(process, ServerPid2), - catch gen_server:call(ServerPid2, bad_call), - receive {'DOWN', Ref2, _, _, _} -> ok end, + ServerPid = whereis(couch_jobs_server), + Ref = monitor(process, ServerPid), - restart_app(), + [Notifier1, _Notifier2] = couch_jobs_notifier_sup:get_child_pids(), + exit(Notifier1, kill), - % couch_jobs_server dies on bad info - ServerPid3 = whereis(couch_jobs_server), - Ref3 = monitor(process, ServerPid3), - ServerPid3 ! a_random_message, - receive {'DOWN', Ref3, _, _, _} -> ok end, + % Killing a notifier should kill the server as well + receive {'DOWN', Ref, _, _, _} -> ok end. - restart_app() - end). +bad_cast_restarts_couch_jobs_server(#{}) -> + ServerPid1 = whereis(couch_jobs_server), + Ref1 = monitor(process, ServerPid1), + gen_server:cast(ServerPid1, bad_cast), + receive {'DOWN', Ref1, _, _, _} -> ok end. -bad_messages_restart_notifier(#{}) -> - ?_test(begin - couch_jobs_server:force_check_types(), - % bad cast kills the activity monitor - [AMon1, _] = couch_jobs_notifier_sup:get_child_pids(), - Ref1 = monitor(process, AMon1), - gen_server:cast(AMon1, bad_cast), - receive {'DOWN', Ref1, _, _, _} -> ok end, +bad_call_restarts_couch_jobs_server(#{}) -> + ServerPid2 = whereis(couch_jobs_server), + Ref2 = monitor(process, ServerPid2), + catch gen_server:call(ServerPid2, bad_call), + receive {'DOWN', Ref2, _, _, _} -> ok end. - restart_app(), - % bad calls restart activity monitor - [AMon2, _] = couch_jobs_notifier_sup:get_child_pids(), - Ref2 = monitor(process, AMon2), - catch gen_server:call(AMon2, bad_call), - receive {'DOWN', Ref2, _, _, _} -> ok end, +bad_info_restarts_couch_jobs_server(#{}) -> + ServerPid3 = whereis(couch_jobs_server), + Ref3 = monitor(process, ServerPid3), + ServerPid3 ! a_random_message, + receive {'DOWN', Ref3, _, _, _} -> ok end. - restart_app(), - % bad info message kills activity monitor - [AMon3, _] = couch_jobs_notifier_sup:get_child_pids(), - Ref3 = monitor(process, AMon3), - AMon3 ! a_bad_message, - receive {'DOWN', Ref3, _, _, _} -> ok end, +bad_cast_restarts_notifier(#{}) -> + couch_jobs_server:force_check_types(), + [AMon1, _] = couch_jobs_notifier_sup:get_child_pids(), + Ref1 = monitor(process, AMon1), + gen_server:cast(AMon1, bad_cast), + receive {'DOWN', Ref1, _, _, _} -> ok end. - restart_app() - end). +bad_call_restarts_notifier(#{}) -> + couch_jobs_server:force_check_types(), + [AMon2, _] = couch_jobs_notifier_sup:get_child_pids(), + Ref2 = monitor(process, AMon2), + catch gen_server:call(AMon2, bad_call), + receive {'DOWN', Ref2, _, _, _} -> ok end. -bad_messages_restart_activity_monitor(#{}) -> - ?_test(begin - couch_jobs_server:force_check_types(), +bad_info_restarts_notifier(#{}) -> + couch_jobs_server:force_check_types(), + [AMon3, _] = couch_jobs_notifier_sup:get_child_pids(), + Ref3 = monitor(process, AMon3), + AMon3 ! a_bad_message, + receive {'DOWN', Ref3, _, _, _} -> ok end. - % bad cast kills the activity monitor - [AMon1, _] = couch_jobs_activity_monitor_sup:get_child_pids(), - Ref1 = monitor(process, AMon1), - gen_server:cast(AMon1, bad_cast), - receive {'DOWN', Ref1, _, _, _} -> ok end, - restart_app(), +bad_cast_restarts_activity_monitor(#{}) -> + couch_jobs_server:force_check_types(), + [AMon1, _] = couch_jobs_activity_monitor_sup:get_child_pids(), + Ref1 = monitor(process, AMon1), + gen_server:cast(AMon1, bad_cast), + receive {'DOWN', Ref1, _, _, _} -> ok end. - % bad calls restart activity monitor - [AMon2, _] = couch_jobs_activity_monitor_sup:get_child_pids(), - Ref2 = monitor(process, AMon2), - catch gen_server:call(AMon2, bad_call), - receive {'DOWN', Ref2, _, _, _} -> ok end, - restart_app(), +bad_call_restarts_activity_monitor(#{}) -> + couch_jobs_server:force_check_types(), + [AMon2, _] = couch_jobs_activity_monitor_sup:get_child_pids(), + Ref2 = monitor(process, AMon2), + catch gen_server:call(AMon2, bad_call), + receive {'DOWN', Ref2, _, _, _} -> ok end. - % bad info message kills activity monitor - [AMon3, _] = couch_jobs_activity_monitor_sup:get_child_pids(), - Ref3 = monitor(process, AMon3), - AMon3 ! a_bad_message, - receive {'DOWN', Ref3, _, _, _} -> ok end, - restart_app() - end). +bad_info_restarts_activity_monitor(#{}) -> + couch_jobs_server:force_check_types(), + % bad info message kills activity monitor + [AMon3, _] = couch_jobs_activity_monitor_sup:get_child_pids(), + Ref3 = monitor(process, AMon3), + AMon3 ! a_bad_message, + receive {'DOWN', Ref3, _, _, _} -> ok end. basic_accept_and_finish(#{t1 := T, j1 := J}) -> - ?_test(begin - ok = couch_jobs:add(?TX, T, J, #{}), - {ok, Job, #{}} = couch_jobs:accept(T), - ?assertMatch(#{state := running}, get_job(T, J)), - % check json validation for bad data in finish - ?assertMatch({error, {json_encoding_error, _}}, - fabric2_fdb:transactional(fun(Tx) -> - couch_jobs:finish(Tx, Job, #{1 => 1}) - end)), - Data = #{<<"x">> => 42}, - ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> - couch_jobs:finish(Tx, Job, Data) + ok = couch_jobs:add(?TX, T, J, #{}), + {ok, Job, #{}} = couch_jobs:accept(T), + ?assertMatch(#{state := running}, get_job(T, J)), + % check json validation for bad data in finish + ?assertMatch({error, {json_encoding_error, _}}, + fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, Job, #{1 => 1}) end)), - ?assertMatch(#{state := finished, data := Data}, get_job(T, J)) - end). + Data = #{<<"x">> => 42}, + ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:finish(Tx, Job, Data) + end)), + ?assertMatch(#{state := finished, data := Data}, get_job(T, J)). accept_blocking(#{t1 := T, j1 := J1, j2 := J2}) -> - ?_test(begin - Accept = fun() -> exit(couch_jobs:accept(T)) end, - WaitAccept = fun(Ref) -> - receive - {'DOWN', Ref, _, _, Res} -> Res - after - 500 -> timeout - end - end, - {_, Ref1} = spawn_monitor(Accept), - ok = couch_jobs:add(?TX, T, J1, #{}), - ?assertMatch({ok, #{id := J1}, #{}}, WaitAccept(Ref1)), - {_, Ref2} = spawn_monitor(Accept), - ?assertEqual(timeout, WaitAccept(Ref2)), - ok = couch_jobs:add(?TX, T, J2, #{}), - ?assertMatch({ok, #{id := J2}, #{}}, WaitAccept(Ref2)) - end). + Accept = fun() -> exit(couch_jobs:accept(T)) end, + WaitAccept = fun(Ref) -> + receive + {'DOWN', Ref, _, _, Res} -> Res + after + 500 -> timeout + end + end, + {_, Ref1} = spawn_monitor(Accept), + ok = couch_jobs:add(?TX, T, J1, #{}), + ?assertMatch({ok, #{id := J1}, #{}}, WaitAccept(Ref1)), + {_, Ref2} = spawn_monitor(Accept), + ?assertEqual(timeout, WaitAccept(Ref2)), + ok = couch_jobs:add(?TX, T, J2, #{}), + ?assertMatch({ok, #{id := J2}, #{}}, WaitAccept(Ref2)). job_processor_update(#{t1 := T, j1 := J}) -> - ?_test(begin - ok = couch_jobs:add(?TX, T, J, #{}), - {ok, Job, #{}} = couch_jobs:accept(T), + ok = couch_jobs:add(?TX, T, J, #{}), + {ok, Job, #{}} = couch_jobs:accept(T), - % Use proper transactions in a few places here instead of passing in - % ?TX This is mostly to increase code coverage + % Use proper transactions in a few places here instead of passing in + % ?TX This is mostly to increase code coverage - ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) -> - couch_jobs:update(Tx, Job, #{<<"x">> => 1}) - end)), + ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, Job, #{<<"x">> => 1}) + end)), - ?assertMatch(#{data := #{<<"x">> := 1}, state := running}, - get_job(T, J)), + ?assertMatch(#{data := #{<<"x">> := 1}, state := running}, + get_job(T, J)), - ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) -> - couch_jobs:update(Tx, Job) - end)), + ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, Job) + end)), - ?assertMatch(#{data := #{<<"x">> := 1}, state := running}, - get_job(T, J)), + ?assertMatch(#{data := #{<<"x">> := 1}, state := running}, + get_job(T, J)), - ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) -> - couch_jobs:update(Tx, Job, #{<<"x">> => 2}) - end)), + ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, Job, #{<<"x">> => 2}) + end)), - % check json validation for bad data in update - ?assertMatch({error, {json_encoding_error, _}}, - fabric2_fdb:transactional(fun(Tx) -> - couch_jobs:update(Tx, Job, #{1 => 1}) - end)), + % check json validation for bad data in update + ?assertMatch({error, {json_encoding_error, _}}, + fabric2_fdb:transactional(fun(Tx) -> + couch_jobs:update(Tx, Job, #{1 => 1}) + end)), - ?assertMatch(#{data := #{<<"x">> := 2}, state := running}, - get_job(T, J)), + ?assertMatch(#{data := #{<<"x">> := 2}, state := running}, + get_job(T, J)), - % Finish may update the data as well - ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"x">> => 3})), - ?assertMatch(#{data := #{<<"x">> := 3}, state := finished}, - get_job(T, J)) - end). + % Finish may update the data as well + ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"x">> => 3})), + ?assertMatch(#{data := #{<<"x">> := 3}, state := finished}, + get_job(T, J)). resubmit_enqueues_job(#{t1 := T, j1 := J}) -> - ?_test(begin - ok = couch_jobs:add(?TX, T, J, #{}), - {ok, Job1, #{}} = couch_jobs:accept(T), - ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6)), - ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), - ?assertMatch(#{state := pending, stime := 6}, get_job(T, J)), - {ok, Job2, #{}} = couch_jobs:accept(T), - ?assertEqual(ok, couch_jobs:finish(?TX, Job2)), - ?assertMatch(#{state := finished}, get_job(T, J)) - end). + ok = couch_jobs:add(?TX, T, J, #{}), + {ok, Job1, #{}} = couch_jobs:accept(T), + ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6)), + ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), + ?assertMatch(#{state := pending, stime := 6}, get_job(T, J)), + {ok, Job2, #{}} = couch_jobs:accept(T), + ?assertEqual(ok, couch_jobs:finish(?TX, Job2)), + ?assertMatch(#{state := finished}, get_job(T, J)). + resubmit_finished_updates_job_data(#{t1 := T, j1 := J}) -> - ?_test(begin - Data1 = #{<<"test">> => 1}, - Data2 = #{<<"test">> => 2}, - ok = couch_jobs:add(?TX, T, J, Data1), - {ok, Job1, #{}} = couch_jobs:accept(T), - ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), - ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)), - ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)) - end). + Data1 = #{<<"test">> => 1}, + Data2 = #{<<"test">> => 2}, + ok = couch_jobs:add(?TX, T, J, Data1), + {ok, Job1, #{}} = couch_jobs:accept(T), + ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), + ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)), + ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)). resubmit_running_does_not_update_job_data(#{t1 := T, j1 := J}) -> - ?_test(begin - Data1 = #{<<"test">> => 1}, - Data2 = #{<<"test">> => 2}, - ok = couch_jobs:add(?TX, T, J, Data1), - {ok, Job1, #{}} = couch_jobs:accept(T), - ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)), - ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), - ?assertMatch({ok, _, Data1}, couch_jobs:accept(T)) - end). + Data1 = #{<<"test">> => 1}, + Data2 = #{<<"test">> => 2}, + ok = couch_jobs:add(?TX, T, J, Data1), + {ok, Job1, #{}} = couch_jobs:accept(T), + ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)), + ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), + ?assertMatch({ok, _, Data1}, couch_jobs:accept(T)). resubmit_custom_schedtime(#{t1 := T, j1 := J}) -> - ?_test(begin - ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{}, 7)), - {ok, Job, #{}} = couch_jobs:accept(T), - ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job, 9)), - ?assertEqual(ok, couch_jobs:finish(?TX, Job)), - ?assertMatch(#{stime := 9, state := pending}, get_job(T, J)) - end). + ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{}, 7)), + {ok, Job, #{}} = couch_jobs:accept(T), + ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job, 9)), + ?assertEqual(ok, couch_jobs:finish(?TX, Job)), + ?assertMatch(#{stime := 9, state := pending}, get_job(T, J)). add_pending_updates_job_data(#{t1 := T, j1 := J}) -> - ?_test(begin - Data1 = #{<<"test">> => 1}, - Data2 = #{<<"test">> => 2}, - ok = couch_jobs:add(?TX, T, J, Data1), - ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)), - ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)) - end). + Data1 = #{<<"test">> => 1}, + Data2 = #{<<"test">> => 2}, + ok = couch_jobs:add(?TX, T, J, Data1), + ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)), + ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)). add_finished_updates_job_data(#{t1 := T, j1 := J}) -> - ?_test(begin - Data1 = #{<<"test">> => 1}, - Data2 = #{<<"test">> => 2}, - ok = couch_jobs:add(?TX, T, J, Data1), - {ok, Job1, #{}} = couch_jobs:accept(T), - ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), - ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)), - ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)) - end). + Data1 = #{<<"test">> => 1}, + Data2 = #{<<"test">> => 2}, + ok = couch_jobs:add(?TX, T, J, Data1), + {ok, Job1, #{}} = couch_jobs:accept(T), + ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), + ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)), + ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)). add_running_does_not_update_job_data(#{t1 := T, j1 := J}) -> - ?_test(begin - Data1 = #{<<"test">> => 1}, - Data2 = #{<<"test">> => 2}, - ok = couch_jobs:add(?TX, T, J, Data1), - {ok, Job1, #{}} = couch_jobs:accept(T), - ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)), - ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), - ?assertMatch({ok, _, Data1}, couch_jobs:accept(T)) - end). + Data1 = #{<<"test">> => 1}, + Data2 = #{<<"test">> => 2}, + ok = couch_jobs:add(?TX, T, J, Data1), + {ok, Job1, #{}} = couch_jobs:accept(T), + ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)), + ?assertEqual(ok, couch_jobs:finish(?TX, Job1)), + ?assertMatch({ok, _, Data1}, couch_jobs:accept(T)). accept_max_schedtime(#{t1 := T, j1 := J1, j2 := J2}) -> - ?_test(begin - ok = couch_jobs:add(?TX, T, J1, #{}, 5000), - ok = couch_jobs:add(?TX, T, J2, #{}, 3000), - ?assertEqual({error, not_found}, couch_jobs:accept(T, - #{max_sched_time => 1000})), - ?assertMatch({ok, #{id := J2}, _}, couch_jobs:accept(T, - #{max_sched_time => 3000})), - ?assertMatch({ok, #{id := J1}, _}, couch_jobs:accept(T, - #{max_sched_time => 9000})) - end). + ok = couch_jobs:add(?TX, T, J1, #{}, 5000), + ok = couch_jobs:add(?TX, T, J2, #{}, 3000), + ?assertEqual({error, not_found}, couch_jobs:accept(T, + #{max_sched_time => 1000})), + ?assertMatch({ok, #{id := J2}, _}, couch_jobs:accept(T, + #{max_sched_time => 3000})), + ?assertMatch({ok, #{id := J1}, _}, couch_jobs:accept(T, + #{max_sched_time => 9000})). accept_no_schedule(#{t1 := T}) -> - ?_test(begin - JobCount = 25, - Jobs = [fabric2_util:uuid() || _ <- lists:seq(1, JobCount)], - [couch_jobs:add(?TX, T, J, #{}) || J <- Jobs], - InvalidOpts = #{no_schedule => true, max_sched_time => 1}, - ?assertMatch({error, _}, couch_jobs:accept(T, InvalidOpts)), - AcceptOpts = #{no_schedule => true}, - Accepted = [begin - {ok, #{id := J}, _} = couch_jobs:accept(T, AcceptOpts), - J - end || _ <- lists:seq(1, JobCount)], - ?assertEqual(lists:sort(Jobs), lists:sort(Accepted)) - end). + JobCount = 25, + Jobs = [fabric2_util:uuid() || _ <- lists:seq(1, JobCount)], + [couch_jobs:add(?TX, T, J, #{}) || J <- Jobs], + InvalidOpts = #{no_schedule => true, max_sched_time => 1}, + ?assertMatch({error, _}, couch_jobs:accept(T, InvalidOpts)), + AcceptOpts = #{no_schedule => true}, + Accepted = [begin + {ok, #{id := J}, _} = couch_jobs:accept(T, AcceptOpts), + J + end || _ <- lists:seq(1, JobCount)], + ?assertEqual(lists:sort(Jobs), lists:sort(Accepted)). subscribe(#{t1 := T, j1 := J}) -> - ?_test(begin - ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 1}), + ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 1}), - ?assertEqual({error, not_found}, couch_jobs:subscribe(<<"xyz">>, J)), - ?assertEqual({error, not_found}, couch_jobs:subscribe(T, <<"j5">>)), + ?assertEqual({error, not_found}, couch_jobs:subscribe(<<"xyz">>, J)), + ?assertEqual({error, not_found}, couch_jobs:subscribe(T, <<"j5">>)), - SubRes0 = couch_jobs:subscribe(T, J), - ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes0), - {ok, SubId0, pending, _} = SubRes0, + SubRes0 = couch_jobs:subscribe(T, J), + ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes0), + {ok, SubId0, pending, _} = SubRes0, - SubRes1 = couch_jobs:subscribe(T, J), - ?assertEqual(SubRes0, SubRes1), + SubRes1 = couch_jobs:subscribe(T, J), + ?assertEqual(SubRes0, SubRes1), - ?assertEqual(ok, couch_jobs:unsubscribe(SubId0)), + ?assertEqual(ok, couch_jobs:unsubscribe(SubId0)), - SubRes = couch_jobs:subscribe(T, J), - ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes), - {ok, SubId, pending, _} = SubRes, + SubRes = couch_jobs:subscribe(T, J), + ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes), + {ok, SubId, pending, _} = SubRes, - {ok, Job, _} = couch_jobs:accept(T), - ?assertMatch({T, J, running, #{<<"z">> := 1}}, - couch_jobs:wait(SubId, 5000)), + {ok, Job, _} = couch_jobs:accept(T), + ?assertMatch({T, J, running, #{<<"z">> := 1}}, + couch_jobs:wait(SubId, 5000)), - % Make sure we get intermediate `running` updates - ?assertMatch({ok, _}, couch_jobs:update(?TX, Job, #{<<"z">> => 2})), - ?assertMatch({T, J, running, #{<<"z">> := 2}}, - couch_jobs:wait(SubId, 5000)), + % Make sure we get intermediate `running` updates + ?assertMatch({ok, _}, couch_jobs:update(?TX, Job, #{<<"z">> => 2})), + ?assertMatch({T, J, running, #{<<"z">> := 2}}, + couch_jobs:wait(SubId, 5000)), - ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"z">> => 3})), - ?assertMatch({T, J, finished, #{<<"z">> := 3}}, - couch_jobs:wait(SubId, finished, 5000)), + ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"z">> => 3})), + ?assertMatch({T, J, finished, #{<<"z">> := 3}}, + couch_jobs:wait(SubId, finished, 5000)), - ?assertEqual(timeout, couch_jobs:wait(SubId, 50)), + ?assertEqual(timeout, couch_jobs:wait(SubId, 50)), - ?assertEqual({ok, finished, #{<<"z">> => 3}}, - couch_jobs:subscribe(T, J)), + ?assertEqual({ok, finished, #{<<"z">> => 3}}, + couch_jobs:subscribe(T, J)), - ?assertEqual(ok, couch_jobs:remove(?TX, T, J)), - ?assertEqual({error, not_found}, couch_jobs:subscribe(T, J)) - end). + ?assertEqual(ok, couch_jobs:remove(?TX, T, J)), + ?assertEqual({error, not_found}, couch_jobs:subscribe(T, J)). remove_when_subscribed_and_pending(#{t1 := T, j1 := J}) -> - ?_test(begin - ok = couch_jobs:add(?TX, T, J, #{<<"x">> => 1}), - {ok, SId, pending, _} = couch_jobs:subscribe(T, J), + ok = couch_jobs:add(?TX, T, J, #{<<"x">> => 1}), + {ok, SId, pending, _} = couch_jobs:subscribe(T, J), - couch_jobs:remove(?TX, T, J), + couch_jobs:remove(?TX, T, J), - ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)), - ?assertEqual(timeout, couch_jobs:wait(SId, 50)) - end). + ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)), + ?assertEqual(timeout, couch_jobs:wait(SId, 50)). remove_when_subscribed_and_running(#{t1 := T, j1 := J}) -> - ?_test(begin - ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 2}), - {ok, SId, pending, _} = couch_jobs:subscribe(T, J), - {ok, #{}, _} = couch_jobs:accept(T), - ?assertMatch({_, _, running, _}, couch_jobs:wait(SId, 5000)), + ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 2}), + {ok, SId, pending, _} = couch_jobs:subscribe(T, J), + {ok, #{}, _} = couch_jobs:accept(T), + ?assertMatch({_, _, running, _}, couch_jobs:wait(SId, 5000)), - couch_jobs:remove(?TX, T, J), + couch_jobs:remove(?TX, T, J), - ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)), - ?assertEqual(timeout, couch_jobs:wait(SId, 50)) - end). + ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)), + ?assertEqual(timeout, couch_jobs:wait(SId, 50)). subscribe_wait_multiple(#{t1 := T, j1 := J1, j2 := J2}) -> - ?_test(begin - ok = couch_jobs:add(?TX, T, J1, #{}), - ok = couch_jobs:add(?TX, T, J2, #{}), - - {ok, S1, pending, #{}} = couch_jobs:subscribe(T, J1), - {ok, S2, pending, #{}} = couch_jobs:subscribe(T, J2), - - Subs = [S1, S2], - - % Accept one job. Only one running update is expected. PJob1 and PJob2 - % do not necessarily correspond got Job1 and Job2, they could be - % accepted as Job2 and Job1 respectively. - {ok, PJob1, _} = couch_jobs:accept(T), - ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)), - ?assertMatch(timeout, couch_jobs:wait(Subs, 50)), - - % Accept another job. Expect another update. - {ok, PJob2, _} = couch_jobs:accept(T), - ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)), - ?assertMatch(timeout, couch_jobs:wait(Subs, 50)), - - ?assertMatch({ok, _}, couch_jobs:update(?TX, PJob1, #{<<"q">> => 5})), - ?assertMatch({ok, _}, couch_jobs:update(?TX, PJob2, #{<<"r">> => 6})), - - % Each job was updated once, expect two running updates. - ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)), - ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)), - - % Finish one job. Expect one finished update only. - ?assertEqual(ok, couch_jobs:finish(?TX, PJob1)), - - ?assertMatch({_, _, finished, #{<<"q">> := 5}}, - couch_jobs:wait(Subs, finished, 5000)), - ?assertMatch(timeout, couch_jobs:wait(Subs, finished, 50)), - - % Finish another job. However, unsubscribe should flush the - % the message and we should not get it. - ?assertEqual(ok, couch_jobs:finish(?TX, PJob2)), - ?assertEqual(ok, couch_jobs:unsubscribe(S1)), - ?assertEqual(ok, couch_jobs:unsubscribe(S2)), - ?assertMatch(timeout, couch_jobs:wait(Subs, finished, 50)) - end). + ok = couch_jobs:add(?TX, T, J1, #{}), + ok = couch_jobs:add(?TX, T, J2, #{}), + + {ok, S1, pending, #{}} = couch_jobs:subscribe(T, J1), + {ok, S2, pending, #{}} = couch_jobs:subscribe(T, J2), + + Subs = [S1, S2], + + % Accept one job. Only one running update is expected. PJob1 and PJob2 + % do not necessarily correspond got Job1 and Job2, they could be + % accepted as Job2 and Job1 respectively. + {ok, PJob1, _} = couch_jobs:accept(T), + ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)), + ?assertMatch(timeout, couch_jobs:wait(Subs, 50)), + + % Accept another job. Expect another update. + {ok, PJob2, _} = couch_jobs:accept(T), + ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)), + ?assertMatch(timeout, couch_jobs:wait(Subs, 50)), + + ?assertMatch({ok, _}, couch_jobs:update(?TX, PJob1, #{<<"q">> => 5})), + ?assertMatch({ok, _}, couch_jobs:update(?TX, PJob2, #{<<"r">> => 6})), + + % Each job was updated once, expect two running updates. + ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)), + ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)), + + % Finish one job. Expect one finished update only. + ?assertEqual(ok, couch_jobs:finish(?TX, PJob1)), + + ?assertMatch({_, _, finished, #{<<"q">> := 5}}, + couch_jobs:wait(Subs, finished, 5000)), + ?assertMatch(timeout, couch_jobs:wait(Subs, finished, 50)), + + % Finish another job. However, unsubscribe should flush the + % the message and we should not get it. + ?assertEqual(ok, couch_jobs:finish(?TX, PJob2)), + ?assertEqual(ok, couch_jobs:unsubscribe(S1)), + ?assertEqual(ok, couch_jobs:unsubscribe(S2)), + ?assertMatch(timeout, couch_jobs:wait(Subs, finished, 50)). enqueue_inactive(#{t1 := T, j1 := J, t1_timeout := Timeout}) -> - {timeout, 10, ?_test(begin - couch_jobs_server:force_check_types(), + couch_jobs_server:force_check_types(), - ok = couch_jobs:add(?TX, T, J, #{<<"y">> => 1}), - {ok, Job, _} = couch_jobs:accept(T), + ok = couch_jobs:add(?TX, T, J, #{<<"y">> => 1}), + {ok, Job, _} = couch_jobs:accept(T), - {ok, SubId, running, #{<<"y">> := 1}} = couch_jobs:subscribe(T, J), - Wait = 3 * Timeout * 1000, - ?assertEqual({T, J, pending, #{<<"y">> => 1}}, - couch_jobs:wait(SubId, pending, Wait)), - ?assertMatch(#{state := pending}, get_job(T, J)), + {ok, SubId, running, #{<<"y">> := 1}} = couch_jobs:subscribe(T, J), + Wait = 3 * Timeout * 1000, + ?assertEqual({T, J, pending, #{<<"y">> => 1}}, + couch_jobs:wait(SubId, pending, Wait)), + ?assertMatch(#{state := pending}, get_job(T, J)), - % After job was re-enqueued, old job processor can't update it anymore - ?assertEqual({error, halt}, couch_jobs:update(?TX, Job)), - ?assertEqual({error, halt}, couch_jobs:finish(?TX, Job)) - end)}. + % After job was re-enqueued, old job processor can't update it anymore + ?assertEqual({error, halt}, couch_jobs:update(?TX, Job)), + ?assertEqual({error, halt}, couch_jobs:finish(?TX, Job)). remove_running_job(#{t1 := T, j1 := J}) -> - ?_test(begin - ok = couch_jobs:add(?TX, T, J, #{}), - {ok, Job, _} = couch_jobs:accept(T), - ?assertEqual(ok, couch_jobs:remove(?TX, T, J)), - ?assertEqual({error, not_found}, couch_jobs:remove(?TX, T, J)), - ?assertEqual({error, halt}, couch_jobs:update(?TX, Job)), - ?assertEqual({error, halt}, couch_jobs:finish(?TX, Job)) - end). + ok = couch_jobs:add(?TX, T, J, #{}), + {ok, Job, _} = couch_jobs:accept(T), + ?assertEqual(ok, couch_jobs:remove(?TX, T, J)), + ?assertEqual({error, not_found}, couch_jobs:remove(?TX, T, J)), + ?assertEqual({error, halt}, couch_jobs:update(?TX, Job)), + ?assertEqual({error, halt}, couch_jobs:finish(?TX, Job)). check_get_jobs(#{t1 := T1, j1 := J1, t2 := T2, j2 := J2}) -> - ?_test(begin - ok = couch_jobs:add(?TX, T1, J1, #{}), - ok = couch_jobs:add(?TX, T2, J2, #{}), - ?assertMatch([ - {T2, J2, pending, #{}}, - {T1, J1, pending, #{}} - ], lists:sort(couch_jobs_fdb:get_jobs())), - {ok, _, _} = couch_jobs:accept(T1), - ?assertMatch([ - {T2, J2, pending, #{}}, - {T1, J1, running, #{}} - ], lists:sort(couch_jobs_fdb:get_jobs())) - end). + ok = couch_jobs:add(?TX, T1, J1, #{}), + ok = couch_jobs:add(?TX, T2, J2, #{}), + ?assertMatch([ + {T2, J2, pending, #{}}, + {T1, J1, pending, #{}} + ], lists:sort(couch_jobs_fdb:get_jobs())), + {ok, _, _} = couch_jobs:accept(T1), + ?assertMatch([ + {T2, J2, pending, #{}}, + {T1, J1, running, #{}} + ], lists:sort(couch_jobs_fdb:get_jobs())). use_fabric_transaction_object(#{t1 := T1, j1 := J1, dbname := DbName}) -> - ?_test(begin - {ok, Db} = fabric2_db:create(DbName, []), - ?assertEqual(ok, couch_jobs:add(Db, T1, J1, #{})), - ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)), - {ok, Job, _} = couch_jobs:accept(T1), - ?assertEqual(ok, fabric2_fdb:transactional(Db, fun(Db1) -> - {ok, #{}} = couch_jobs:get_job_data(Db1, T1, J1), - Doc1 = #doc{id = <<"1">>, body = {[]}}, - {ok, {_, _}} = fabric2_db:update_doc(Db1, Doc1), - Doc2 = #doc{id = <<"2">>, body = {[]}}, - {ok, {_, _}} = fabric2_db:update_doc(Db1, Doc2), - couch_jobs:finish(Db1, Job, #{<<"d">> => 1}) - end)), - ok = couch_jobs:remove(#{tx => undefined}, T1, J1), - ok = fabric2_db:delete(DbName, []) - end). + {ok, Db} = fabric2_db:create(DbName, []), + ?assertEqual(ok, couch_jobs:add(Db, T1, J1, #{})), + ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)), + {ok, Job, _} = couch_jobs:accept(T1), + ?assertEqual(ok, fabric2_fdb:transactional(Db, fun(Db1) -> + {ok, #{}} = couch_jobs:get_job_data(Db1, T1, J1), + Doc1 = #doc{id = <<"1">>, body = {[]}}, + {ok, {_, _}} = fabric2_db:update_doc(Db1, Doc1), + Doc2 = #doc{id = <<"2">>, body = {[]}}, + {ok, {_, _}} = fabric2_db:update_doc(Db1, Doc2), + couch_jobs:finish(Db1, Job, #{<<"d">> => 1}) + end)), + ok = couch_jobs:remove(#{tx => undefined}, T1, J1), + ok = fabric2_db:delete(DbName, []). metadata_version_bump(_) -> - ?_test(begin - JTx1 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end), - ?assertMatch(#{md_version := not_found}, JTx1), - - ets:delete_all_objects(couch_jobs_fdb), - couch_jobs_fdb:bump_metadata_version(), - JTx2 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end), - ?assertMatch(#{md_version := Bin} when is_binary(Bin), JTx2), - - ets:delete_all_objects(couch_jobs_fdb), - couch_jobs_fdb:bump_metadata_version(), - JTx3 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end), - OldMdv = maps:get(md_version, JTx2), - NewMdv = maps:get(md_version, JTx3), - ?assert(NewMdv > OldMdv) - end). + JTx1 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end), + ?assertMatch(#{md_version := not_found}, JTx1), + + ets:delete_all_objects(couch_jobs_fdb), + couch_jobs_fdb:bump_metadata_version(), + JTx2 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end), + ?assertMatch(#{md_version := Bin} when is_binary(Bin), JTx2), + + ets:delete_all_objects(couch_jobs_fdb), + couch_jobs_fdb:bump_metadata_version(), + JTx3 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end), + OldMdv = maps:get(md_version, JTx2), + NewMdv = maps:get(md_version, JTx3), + ?assert(NewMdv > OldMdv). |