summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2021-03-10 18:16:42 -0500
committerNick Vatamaniuc <vatamane@apache.org>2021-03-10 22:18:51 -0500
commit23a2d3709bcf90170e7d3af6db43568f49c0898a (patch)
treed0c56137c9fc3ffe81d28817d464773f2901acf7
parent82bd4d450711b0ff62616229851e5d19fd20e42f (diff)
downloadcouchdb-fix-flaky-couch-jobs-tests.tar.gz
Fix couch_jobs to be less flakyfix-flaky-couch-jobs-tests
It turns out fabric is dependent on couch_jobs because of db expiration module. So when couch_jobs was restarted multiple times per test case it could have brought down fabric. However, since couch_jobs needs fabric for transactional stuff it ended up brining couch_jobs app down as well. To fix it: * Switch to explicitly starting/stopping fabric and couch_jobs together * Break appart bad_messages* tests to individually test each type of message as app restarts in the middle of the tests kept killing fabric and intermettently killing couch_jobs a well. * Also make the tests look nicer by re-using ?TDEF_FE macros from `fabric2_test`, this we can avoid the `?_test(begin... end).` pattern. * Remove meck:unload since we don't really meck anything in the module * Don't need to spend time cleaning out database as we don't really create that many dbs (just one) and that one gets cleaned out in its own test.
-rw-r--r--src/couch_jobs/test/couch_jobs_tests.erl986
1 files changed, 454 insertions, 532 deletions
diff --git a/src/couch_jobs/test/couch_jobs_tests.erl b/src/couch_jobs/test/couch_jobs_tests.erl
index 11572a4b9..b40e52e8a 100644
--- a/src/couch_jobs/test/couch_jobs_tests.erl
+++ b/src/couch_jobs/test/couch_jobs_tests.erl
@@ -16,6 +16,7 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("eunit/include/eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
% Job creation API can take an undefined Tx object
@@ -33,37 +34,43 @@ couch_jobs_basic_test_() ->
foreach,
fun setup/0, fun teardown/1,
[
- fun add_remove_pending/1,
- fun add_remove_errors/1,
- fun add_with_the_same_scheduled_time/1,
- fun get_job_data_and_state/1,
- fun resubmit_as_job_creator/1,
- fun type_timeouts_and_server/1,
- fun dead_notifier_restarts_jobs_server/1,
- fun bad_messages_restart_couch_jobs_server/1,
- fun bad_messages_restart_notifier/1,
- fun bad_messages_restart_activity_monitor/1,
- fun basic_accept_and_finish/1,
- fun accept_blocking/1,
- fun job_processor_update/1,
- fun resubmit_enqueues_job/1,
- fun resubmit_finished_updates_job_data/1,
- fun resubmit_running_does_not_update_job_data/1,
- fun resubmit_custom_schedtime/1,
- fun add_pending_updates_job_data/1,
- fun add_finished_updates_job_data/1,
- fun add_running_does_not_update_job_data/1,
- fun accept_max_schedtime/1,
- fun accept_no_schedule/1,
- fun subscribe/1,
- fun remove_when_subscribed_and_pending/1,
- fun remove_when_subscribed_and_running/1,
- fun subscribe_wait_multiple/1,
- fun enqueue_inactive/1,
- fun remove_running_job/1,
- fun check_get_jobs/1,
- fun use_fabric_transaction_object/1,
- fun metadata_version_bump/1
+ ?TDEF_FE(add_remove_pending),
+ ?TDEF_FE(add_remove_errors),
+ ?TDEF_FE(add_with_the_same_scheduled_time),
+ ?TDEF_FE(get_job_data_and_state),
+ ?TDEF_FE(resubmit_as_job_creator),
+ ?TDEF_FE(type_timeouts_and_server, 15),
+ ?TDEF_FE(dead_notifier_restarts_jobs_server),
+ ?TDEF_FE(bad_cast_restarts_couch_jobs_server),
+ ?TDEF_FE(bad_call_restarts_couch_jobs_server),
+ ?TDEF_FE(bad_info_restarts_couch_jobs_server),
+ ?TDEF_FE(bad_cast_restarts_notifier),
+ ?TDEF_FE(bad_call_restarts_notifier),
+ ?TDEF_FE(bad_info_restarts_notifier),
+ ?TDEF_FE(bad_cast_restarts_activity_monitor),
+ ?TDEF_FE(bad_call_restarts_activity_monitor),
+ ?TDEF_FE(bad_info_restarts_activity_monitor),
+ ?TDEF_FE(basic_accept_and_finish),
+ ?TDEF_FE(accept_blocking),
+ ?TDEF_FE(job_processor_update),
+ ?TDEF_FE(resubmit_enqueues_job),
+ ?TDEF_FE(resubmit_finished_updates_job_data),
+ ?TDEF_FE(resubmit_running_does_not_update_job_data),
+ ?TDEF_FE(resubmit_custom_schedtime),
+ ?TDEF_FE(add_pending_updates_job_data),
+ ?TDEF_FE(add_finished_updates_job_data),
+ ?TDEF_FE(add_running_does_not_update_job_data),
+ ?TDEF_FE(accept_max_schedtime),
+ ?TDEF_FE(accept_no_schedule),
+ ?TDEF_FE(subscribe),
+ ?TDEF_FE(remove_when_subscribed_and_pending),
+ ?TDEF_FE(remove_when_subscribed_and_running),
+ ?TDEF_FE(subscribe_wait_multiple),
+ ?TDEF_FE(enqueue_inactive, 15),
+ ?TDEF_FE(remove_running_job),
+ ?TDEF_FE(check_get_jobs),
+ ?TDEF_FE(use_fabric_transaction_object),
+ ?TDEF_FE(metadata_version_bump)
]
}
}
@@ -75,11 +82,11 @@ setup_couch() ->
teardown_couch(Ctx) ->
- test_util:stop_couch(Ctx),
- meck:unload().
+ test_util:stop_couch(Ctx).
setup() ->
+ application:start(fabric),
application:start(couch_jobs),
clear_jobs(),
T1 = {<<"t1">>, 1024}, % a complex type should work
@@ -98,15 +105,10 @@ setup() ->
}.
-teardown(#{dbname := DbName}) ->
- clear_jobs(),
+teardown(#{}) ->
application:stop(couch_jobs),
- AllDbs = fabric2_db:list_dbs(),
- case lists:member(DbName, AllDbs) of
- true -> ok = fabric2_db:delete(DbName, []);
- false -> ok
- end,
- meck:unload().
+ application:stop(fabric),
+ ok.
clear_jobs() ->
@@ -116,647 +118,567 @@ clear_jobs() ->
end).
-restart_app() ->
- application:stop(couch_jobs),
- application:start(couch_jobs),
- couch_jobs_server:force_check_types().
-
-
get_job(Type, JobId) ->
couch_jobs_fdb:get_job(Type, JobId).
add_remove_pending(#{t1 := T1, j1 := J1, t2 := T2, j2 := J2}) ->
- ?_test(begin
- ?assertEqual(ok, couch_jobs:add(?TX, T1, J1, #{})),
- ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)),
- ?assertEqual(ok, couch_jobs:remove(?TX, T1, J1)),
- % Data and numeric type should work as well. Also do it in a
- % transaction
- Data = #{<<"x">> => 42},
- ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
- couch_jobs:add(Tx, T2, J2, Data)
- end)),
- ?assertMatch(#{state := pending, data := Data}, get_job(T2, J2)),
- ?assertEqual(ok, couch_jobs:remove(?TX, T2, J2))
- end).
+ ?assertEqual(ok, couch_jobs:add(?TX, T1, J1, #{})),
+ ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)),
+ ?assertEqual(ok, couch_jobs:remove(?TX, T1, J1)),
+ % Data and numeric type should work as well. Also do it in a
+ % transaction
+ Data = #{<<"x">> => 42},
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:add(Tx, T2, J2, Data)
+ end)),
+ ?assertMatch(#{state := pending, data := Data}, get_job(T2, J2)),
+ ?assertEqual(ok, couch_jobs:remove(?TX, T2, J2)).
+
get_job_data_and_state(#{t1 := T, j1 := J}) ->
- ?_test(begin
- Data = #{<<"x">> => 42},
- ok = couch_jobs:add(?TX, T, J, Data),
- ?assertEqual({ok, Data}, couch_jobs:get_job_data(?TX, T, J)),
- ?assertEqual({ok, pending}, couch_jobs:get_job_state(?TX, T, J)),
- ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
- ?assertEqual({error, not_found}, couch_jobs:get_job_data(?TX, T, J)),
- ?assertEqual({error, not_found}, couch_jobs:get_job_state(?TX, T, J))
- end).
+ Data = #{<<"x">> => 42},
+ ok = couch_jobs:add(?TX, T, J, Data),
+ ?assertEqual({ok, Data}, couch_jobs:get_job_data(?TX, T, J)),
+ ?assertEqual({ok, pending}, couch_jobs:get_job_state(?TX, T, J)),
+ ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
+ ?assertEqual({error, not_found}, couch_jobs:get_job_data(?TX, T, J)),
+ ?assertEqual({error, not_found}, couch_jobs:get_job_state(?TX, T, J)).
add_remove_errors(#{t1 := T, j1 := J}) ->
- ?_test(begin
- ?assertEqual({error, not_found}, couch_jobs:remove(?TX, 999, <<"x">>)),
- ?assertMatch({error, {json_encoding_error, _}}, couch_jobs:add(?TX, T,
- J, #{1 => 2})),
- ?assertEqual({error, no_type_timeout}, couch_jobs:add(?TX, <<"x">>, J,
- #{})),
- ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})),
- ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})),
- ?assertEqual(ok, couch_jobs:remove(?TX, T, J))
- end).
+ ?assertEqual({error, not_found}, couch_jobs:remove(?TX, 999, <<"x">>)),
+ ?assertMatch({error, {json_encoding_error, _}}, couch_jobs:add(?TX, T,
+ J, #{1 => 2})),
+ ?assertEqual({error, no_type_timeout}, couch_jobs:add(?TX, <<"x">>, J,
+ #{})),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})),
+ ?assertEqual(ok, couch_jobs:remove(?TX, T, J)).
add_with_the_same_scheduled_time(#{t1 := T, j1 := J}) ->
- ?_test(begin
- ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})),
- fabric2_fdb:transactional(fun(Tx) ->
- ?assertEqual(ok, couch_jobs:add(Tx, T, J, #{})),
- ?assert(erlfdb:is_read_only(Tx))
- end)
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{})),
+ fabric2_fdb:transactional(fun(Tx) ->
+ ?assertEqual(ok, couch_jobs:add(Tx, T, J, #{})),
+ ?assert(erlfdb:is_read_only(Tx))
end).
resubmit_as_job_creator(#{t1 := T, j1 := J}) ->
- ?_test(begin
- Data = #{<<"x">> => 42},
- ok = couch_jobs:add(?TX, T, J, Data, 15),
-
- % Job was pending, doesn't get resubmitted
- ok = couch_jobs:add(?TX, T, J, Data, 16),
- ?assertMatch(#{state := pending, stime := 16}, get_job(T, J)),
-
- {ok, Job1, Data} = couch_jobs:accept(T),
-
- % If is running, it gets flagged to be resubmitted
- ok = couch_jobs:add(?TX, T, J, Data, 17),
- ?assertMatch(#{state := running, stime := 17}, get_job(T, J)),
- ?assertEqual(true, couch_jobs:is_resubmitted(get_job(T, J))),
-
- ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
- % It should be pending according to the resubmit flag
- ?assertMatch(#{state := pending, stime := 17}, get_job(T, J)),
-
- % A finished job will be re-enqueued
- {ok, Job2, _} = couch_jobs:accept(T),
- ?assertEqual(ok, couch_jobs:finish(?TX, Job2)),
- ?assertMatch(#{state := finished, stime := 17}, get_job(T, J)),
- ok = couch_jobs:add(?TX, T, J, Data, 18),
- ?assertMatch(#{state := pending, stime := 18}, get_job(T, J))
- end).
-
-
-type_timeouts_and_server(#{t1 := T, t1_timeout := T1Timeout}) ->
- {timeout, 15, ?_test(begin
+ Data = #{<<"x">> => 42},
+ ok = couch_jobs:add(?TX, T, J, Data, 15),
- WaitForActivityMonitors = fun(N) ->
- test_util:wait(fun() ->
- Pids = couch_jobs_activity_monitor_sup:get_child_pids(),
- case length(Pids) == N of
- true -> ok;
- false -> wait
- end
- end)
- end,
+ % Job was pending, doesn't get resubmitted
+ ok = couch_jobs:add(?TX, T, J, Data, 16),
+ ?assertMatch(#{state := pending, stime := 16}, get_job(T, J)),
- WaitForNotifiers = fun(N) ->
- test_util:wait(fun() ->
- Pids = couch_jobs_notifier_sup:get_child_pids(),
- case length(Pids) == N of
- true -> ok;
- false -> wait
- end
- end)
- end,
+ {ok, Job1, Data} = couch_jobs:accept(T),
- couch_jobs_server:force_check_types(),
+ % If is running, it gets flagged to be resubmitted
+ ok = couch_jobs:add(?TX, T, J, Data, 17),
+ ?assertMatch(#{state := running, stime := 17}, get_job(T, J)),
+ ?assertEqual(true, couch_jobs:is_resubmitted(get_job(T, J))),
- ?assertEqual(T1Timeout, couch_jobs:get_type_timeout(T)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ % It should be pending according to the resubmit flag
+ ?assertMatch(#{state := pending, stime := 17}, get_job(T, J)),
- WaitForActivityMonitors(2),
- ?assertEqual(2,
- length(couch_jobs_activity_monitor_sup:get_child_pids())),
+ % A finished job will be re-enqueued
+ {ok, Job2, _} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job2)),
+ ?assertMatch(#{state := finished, stime := 17}, get_job(T, J)),
+ ok = couch_jobs:add(?TX, T, J, Data, 18),
+ ?assertMatch(#{state := pending, stime := 18}, get_job(T, J)).
- WaitForNotifiers(2),
- ?assertEqual(2, length(couch_jobs_notifier_sup:get_child_pids())),
- ?assertMatch({ok, _}, couch_jobs_server:get_notifier_server(T)),
+type_timeouts_and_server(#{t1 := T, t1_timeout := T1Timeout}) ->
+ WaitForActivityMonitors = fun(N) ->
+ test_util:wait(fun() ->
+ Pids = couch_jobs_activity_monitor_sup:get_child_pids(),
+ case length(Pids) == N of
+ true -> ok;
+ false -> wait
+ end
+ end)
+ end,
- ?assertEqual(ok, couch_jobs:set_type_timeout(<<"t3">>, 8)),
- couch_jobs_server:force_check_types(),
+ WaitForNotifiers = fun(N) ->
+ test_util:wait(fun() ->
+ Pids = couch_jobs_notifier_sup:get_child_pids(),
+ case length(Pids) == N of
+ true -> ok;
+ false -> wait
+ end
+ end)
+ end,
- WaitForActivityMonitors(3),
- ?assertEqual(3,
- length(couch_jobs_activity_monitor_sup:get_child_pids())),
+ couch_jobs_server:force_check_types(),
- WaitForNotifiers(3),
- ?assertEqual(3, length(couch_jobs_notifier_sup:get_child_pids())),
+ ?assertEqual(T1Timeout, couch_jobs:get_type_timeout(T)),
- ?assertEqual(ok, couch_jobs:clear_type_timeout(<<"t3">>)),
- couch_jobs_server:force_check_types(),
+ WaitForActivityMonitors(2),
+ ?assertEqual(2,
+ length(couch_jobs_activity_monitor_sup:get_child_pids())),
- WaitForActivityMonitors(2),
- ?assertEqual(2,
- length(couch_jobs_activity_monitor_sup:get_child_pids())),
+ WaitForNotifiers(2),
+ ?assertEqual(2, length(couch_jobs_notifier_sup:get_child_pids())),
- WaitForNotifiers(2),
- ?assertEqual(2,
- length(couch_jobs_notifier_sup:get_child_pids())),
+ ?assertMatch({ok, _}, couch_jobs_server:get_notifier_server(T)),
- ?assertMatch({error, _},
- couch_jobs_server:get_notifier_server(<<"t3">>)),
+ ?assertEqual(ok, couch_jobs:set_type_timeout(<<"t3">>, 8)),
+ couch_jobs_server:force_check_types(),
- ?assertEqual(not_found, couch_jobs:get_type_timeout(<<"t3">>))
- end)}.
+ WaitForActivityMonitors(3),
+ ?assertEqual(3,
+ length(couch_jobs_activity_monitor_sup:get_child_pids())),
+ WaitForNotifiers(3),
+ ?assertEqual(3, length(couch_jobs_notifier_sup:get_child_pids())),
-dead_notifier_restarts_jobs_server(#{}) ->
- ?_test(begin
- couch_jobs_server:force_check_types(),
+ ?assertEqual(ok, couch_jobs:clear_type_timeout(<<"t3">>)),
+ couch_jobs_server:force_check_types(),
- ServerPid = whereis(couch_jobs_server),
- Ref = monitor(process, ServerPid),
+ WaitForActivityMonitors(2),
+ ?assertEqual(2,
+ length(couch_jobs_activity_monitor_sup:get_child_pids())),
- [Notifier1, _Notifier2] = couch_jobs_notifier_sup:get_child_pids(),
- exit(Notifier1, kill),
+ WaitForNotifiers(2),
+ ?assertEqual(2,
+ length(couch_jobs_notifier_sup:get_child_pids())),
- % Killing a notifier should kill the server as well
- receive {'DOWN', Ref, _, _, _} -> ok end
- end).
+ ?assertMatch({error, _},
+ couch_jobs_server:get_notifier_server(<<"t3">>)),
+ ?assertEqual(not_found, couch_jobs:get_type_timeout(<<"t3">>)).
-bad_messages_restart_couch_jobs_server(#{}) ->
- ?_test(begin
- % couch_jobs_server dies on bad cast
- ServerPid1 = whereis(couch_jobs_server),
- Ref1 = monitor(process, ServerPid1),
- gen_server:cast(ServerPid1, bad_cast),
- receive {'DOWN', Ref1, _, _, _} -> ok end,
- restart_app(),
+dead_notifier_restarts_jobs_server(#{}) ->
+ couch_jobs_server:force_check_types(),
- % couch_jobs_server dies on bad call
- ServerPid2 = whereis(couch_jobs_server),
- Ref2 = monitor(process, ServerPid2),
- catch gen_server:call(ServerPid2, bad_call),
- receive {'DOWN', Ref2, _, _, _} -> ok end,
+ ServerPid = whereis(couch_jobs_server),
+ Ref = monitor(process, ServerPid),
- restart_app(),
+ [Notifier1, _Notifier2] = couch_jobs_notifier_sup:get_child_pids(),
+ exit(Notifier1, kill),
- % couch_jobs_server dies on bad info
- ServerPid3 = whereis(couch_jobs_server),
- Ref3 = monitor(process, ServerPid3),
- ServerPid3 ! a_random_message,
- receive {'DOWN', Ref3, _, _, _} -> ok end,
+ % Killing a notifier should kill the server as well
+ receive {'DOWN', Ref, _, _, _} -> ok end.
- restart_app()
- end).
+bad_cast_restarts_couch_jobs_server(#{}) ->
+ ServerPid1 = whereis(couch_jobs_server),
+ Ref1 = monitor(process, ServerPid1),
+ gen_server:cast(ServerPid1, bad_cast),
+ receive {'DOWN', Ref1, _, _, _} -> ok end.
-bad_messages_restart_notifier(#{}) ->
- ?_test(begin
- couch_jobs_server:force_check_types(),
- % bad cast kills the activity monitor
- [AMon1, _] = couch_jobs_notifier_sup:get_child_pids(),
- Ref1 = monitor(process, AMon1),
- gen_server:cast(AMon1, bad_cast),
- receive {'DOWN', Ref1, _, _, _} -> ok end,
+bad_call_restarts_couch_jobs_server(#{}) ->
+ ServerPid2 = whereis(couch_jobs_server),
+ Ref2 = monitor(process, ServerPid2),
+ catch gen_server:call(ServerPid2, bad_call),
+ receive {'DOWN', Ref2, _, _, _} -> ok end.
- restart_app(),
- % bad calls restart activity monitor
- [AMon2, _] = couch_jobs_notifier_sup:get_child_pids(),
- Ref2 = monitor(process, AMon2),
- catch gen_server:call(AMon2, bad_call),
- receive {'DOWN', Ref2, _, _, _} -> ok end,
+bad_info_restarts_couch_jobs_server(#{}) ->
+ ServerPid3 = whereis(couch_jobs_server),
+ Ref3 = monitor(process, ServerPid3),
+ ServerPid3 ! a_random_message,
+ receive {'DOWN', Ref3, _, _, _} -> ok end.
- restart_app(),
- % bad info message kills activity monitor
- [AMon3, _] = couch_jobs_notifier_sup:get_child_pids(),
- Ref3 = monitor(process, AMon3),
- AMon3 ! a_bad_message,
- receive {'DOWN', Ref3, _, _, _} -> ok end,
+bad_cast_restarts_notifier(#{}) ->
+ couch_jobs_server:force_check_types(),
+ [AMon1, _] = couch_jobs_notifier_sup:get_child_pids(),
+ Ref1 = monitor(process, AMon1),
+ gen_server:cast(AMon1, bad_cast),
+ receive {'DOWN', Ref1, _, _, _} -> ok end.
- restart_app()
- end).
+bad_call_restarts_notifier(#{}) ->
+ couch_jobs_server:force_check_types(),
+ [AMon2, _] = couch_jobs_notifier_sup:get_child_pids(),
+ Ref2 = monitor(process, AMon2),
+ catch gen_server:call(AMon2, bad_call),
+ receive {'DOWN', Ref2, _, _, _} -> ok end.
-bad_messages_restart_activity_monitor(#{}) ->
- ?_test(begin
- couch_jobs_server:force_check_types(),
+bad_info_restarts_notifier(#{}) ->
+ couch_jobs_server:force_check_types(),
+ [AMon3, _] = couch_jobs_notifier_sup:get_child_pids(),
+ Ref3 = monitor(process, AMon3),
+ AMon3 ! a_bad_message,
+ receive {'DOWN', Ref3, _, _, _} -> ok end.
- % bad cast kills the activity monitor
- [AMon1, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
- Ref1 = monitor(process, AMon1),
- gen_server:cast(AMon1, bad_cast),
- receive {'DOWN', Ref1, _, _, _} -> ok end,
- restart_app(),
+bad_cast_restarts_activity_monitor(#{}) ->
+ couch_jobs_server:force_check_types(),
+ [AMon1, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+ Ref1 = monitor(process, AMon1),
+ gen_server:cast(AMon1, bad_cast),
+ receive {'DOWN', Ref1, _, _, _} -> ok end.
- % bad calls restart activity monitor
- [AMon2, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
- Ref2 = monitor(process, AMon2),
- catch gen_server:call(AMon2, bad_call),
- receive {'DOWN', Ref2, _, _, _} -> ok end,
- restart_app(),
+bad_call_restarts_activity_monitor(#{}) ->
+ couch_jobs_server:force_check_types(),
+ [AMon2, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+ Ref2 = monitor(process, AMon2),
+ catch gen_server:call(AMon2, bad_call),
+ receive {'DOWN', Ref2, _, _, _} -> ok end.
- % bad info message kills activity monitor
- [AMon3, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
- Ref3 = monitor(process, AMon3),
- AMon3 ! a_bad_message,
- receive {'DOWN', Ref3, _, _, _} -> ok end,
- restart_app()
- end).
+bad_info_restarts_activity_monitor(#{}) ->
+ couch_jobs_server:force_check_types(),
+ % bad info message kills activity monitor
+ [AMon3, _] = couch_jobs_activity_monitor_sup:get_child_pids(),
+ Ref3 = monitor(process, AMon3),
+ AMon3 ! a_bad_message,
+ receive {'DOWN', Ref3, _, _, _} -> ok end.
basic_accept_and_finish(#{t1 := T, j1 := J}) ->
- ?_test(begin
- ok = couch_jobs:add(?TX, T, J, #{}),
- {ok, Job, #{}} = couch_jobs:accept(T),
- ?assertMatch(#{state := running}, get_job(T, J)),
- % check json validation for bad data in finish
- ?assertMatch({error, {json_encoding_error, _}},
- fabric2_fdb:transactional(fun(Tx) ->
- couch_jobs:finish(Tx, Job, #{1 => 1})
- end)),
- Data = #{<<"x">> => 42},
- ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
- couch_jobs:finish(Tx, Job, Data)
+ ok = couch_jobs:add(?TX, T, J, #{}),
+ {ok, Job, #{}} = couch_jobs:accept(T),
+ ?assertMatch(#{state := running}, get_job(T, J)),
+ % check json validation for bad data in finish
+ ?assertMatch({error, {json_encoding_error, _}},
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, Job, #{1 => 1})
end)),
- ?assertMatch(#{state := finished, data := Data}, get_job(T, J))
- end).
+ Data = #{<<"x">> => 42},
+ ?assertEqual(ok, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:finish(Tx, Job, Data)
+ end)),
+ ?assertMatch(#{state := finished, data := Data}, get_job(T, J)).
accept_blocking(#{t1 := T, j1 := J1, j2 := J2}) ->
- ?_test(begin
- Accept = fun() -> exit(couch_jobs:accept(T)) end,
- WaitAccept = fun(Ref) ->
- receive
- {'DOWN', Ref, _, _, Res} -> Res
- after
- 500 -> timeout
- end
- end,
- {_, Ref1} = spawn_monitor(Accept),
- ok = couch_jobs:add(?TX, T, J1, #{}),
- ?assertMatch({ok, #{id := J1}, #{}}, WaitAccept(Ref1)),
- {_, Ref2} = spawn_monitor(Accept),
- ?assertEqual(timeout, WaitAccept(Ref2)),
- ok = couch_jobs:add(?TX, T, J2, #{}),
- ?assertMatch({ok, #{id := J2}, #{}}, WaitAccept(Ref2))
- end).
+ Accept = fun() -> exit(couch_jobs:accept(T)) end,
+ WaitAccept = fun(Ref) ->
+ receive
+ {'DOWN', Ref, _, _, Res} -> Res
+ after
+ 500 -> timeout
+ end
+ end,
+ {_, Ref1} = spawn_monitor(Accept),
+ ok = couch_jobs:add(?TX, T, J1, #{}),
+ ?assertMatch({ok, #{id := J1}, #{}}, WaitAccept(Ref1)),
+ {_, Ref2} = spawn_monitor(Accept),
+ ?assertEqual(timeout, WaitAccept(Ref2)),
+ ok = couch_jobs:add(?TX, T, J2, #{}),
+ ?assertMatch({ok, #{id := J2}, #{}}, WaitAccept(Ref2)).
job_processor_update(#{t1 := T, j1 := J}) ->
- ?_test(begin
- ok = couch_jobs:add(?TX, T, J, #{}),
- {ok, Job, #{}} = couch_jobs:accept(T),
+ ok = couch_jobs:add(?TX, T, J, #{}),
+ {ok, Job, #{}} = couch_jobs:accept(T),
- % Use proper transactions in a few places here instead of passing in
- % ?TX This is mostly to increase code coverage
+ % Use proper transactions in a few places here instead of passing in
+ % ?TX This is mostly to increase code coverage
- ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
- couch_jobs:update(Tx, Job, #{<<"x">> => 1})
- end)),
+ ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, Job, #{<<"x">> => 1})
+ end)),
- ?assertMatch(#{data := #{<<"x">> := 1}, state := running},
- get_job(T, J)),
+ ?assertMatch(#{data := #{<<"x">> := 1}, state := running},
+ get_job(T, J)),
- ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
- couch_jobs:update(Tx, Job)
- end)),
+ ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, Job)
+ end)),
- ?assertMatch(#{data := #{<<"x">> := 1}, state := running},
- get_job(T, J)),
+ ?assertMatch(#{data := #{<<"x">> := 1}, state := running},
+ get_job(T, J)),
- ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
- couch_jobs:update(Tx, Job, #{<<"x">> => 2})
- end)),
+ ?assertMatch({ok, #{job := true}}, fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, Job, #{<<"x">> => 2})
+ end)),
- % check json validation for bad data in update
- ?assertMatch({error, {json_encoding_error, _}},
- fabric2_fdb:transactional(fun(Tx) ->
- couch_jobs:update(Tx, Job, #{1 => 1})
- end)),
+ % check json validation for bad data in update
+ ?assertMatch({error, {json_encoding_error, _}},
+ fabric2_fdb:transactional(fun(Tx) ->
+ couch_jobs:update(Tx, Job, #{1 => 1})
+ end)),
- ?assertMatch(#{data := #{<<"x">> := 2}, state := running},
- get_job(T, J)),
+ ?assertMatch(#{data := #{<<"x">> := 2}, state := running},
+ get_job(T, J)),
- % Finish may update the data as well
- ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"x">> => 3})),
- ?assertMatch(#{data := #{<<"x">> := 3}, state := finished},
- get_job(T, J))
- end).
+ % Finish may update the data as well
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"x">> => 3})),
+ ?assertMatch(#{data := #{<<"x">> := 3}, state := finished},
+ get_job(T, J)).
resubmit_enqueues_job(#{t1 := T, j1 := J}) ->
- ?_test(begin
- ok = couch_jobs:add(?TX, T, J, #{}),
- {ok, Job1, #{}} = couch_jobs:accept(T),
- ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6)),
- ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
- ?assertMatch(#{state := pending, stime := 6}, get_job(T, J)),
- {ok, Job2, #{}} = couch_jobs:accept(T),
- ?assertEqual(ok, couch_jobs:finish(?TX, Job2)),
- ?assertMatch(#{state := finished}, get_job(T, J))
- end).
+ ok = couch_jobs:add(?TX, T, J, #{}),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertMatch(#{state := pending, stime := 6}, get_job(T, J)),
+ {ok, Job2, #{}} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job2)),
+ ?assertMatch(#{state := finished}, get_job(T, J)).
+
resubmit_finished_updates_job_data(#{t1 := T, j1 := J}) ->
- ?_test(begin
- Data1 = #{<<"test">> => 1},
- Data2 = #{<<"test">> => 2},
- ok = couch_jobs:add(?TX, T, J, Data1),
- {ok, Job1, #{}} = couch_jobs:accept(T),
- ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
- ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)),
- ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
- end).
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)),
+ ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)).
resubmit_running_does_not_update_job_data(#{t1 := T, j1 := J}) ->
- ?_test(begin
- Data1 = #{<<"test">> => 1},
- Data2 = #{<<"test">> => 2},
- ok = couch_jobs:add(?TX, T, J, Data1),
- {ok, Job1, #{}} = couch_jobs:accept(T),
- ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)),
- ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
- ?assertMatch({ok, _, Data1}, couch_jobs:accept(T))
- end).
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job1, 6, Data2)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertMatch({ok, _, Data1}, couch_jobs:accept(T)).
resubmit_custom_schedtime(#{t1 := T, j1 := J}) ->
- ?_test(begin
- ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{}, 7)),
- {ok, Job, #{}} = couch_jobs:accept(T),
- ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job, 9)),
- ?assertEqual(ok, couch_jobs:finish(?TX, Job)),
- ?assertMatch(#{stime := 9, state := pending}, get_job(T, J))
- end).
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, #{}, 7)),
+ {ok, Job, #{}} = couch_jobs:accept(T),
+ ?assertMatch({ok, _}, couch_jobs:resubmit(?TX, Job, 9)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job)),
+ ?assertMatch(#{stime := 9, state := pending}, get_job(T, J)).
add_pending_updates_job_data(#{t1 := T, j1 := J}) ->
- ?_test(begin
- Data1 = #{<<"test">> => 1},
- Data2 = #{<<"test">> => 2},
- ok = couch_jobs:add(?TX, T, J, Data1),
- ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
- ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
- end).
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)).
add_finished_updates_job_data(#{t1 := T, j1 := J}) ->
- ?_test(begin
- Data1 = #{<<"test">> => 1},
- Data2 = #{<<"test">> => 2},
- ok = couch_jobs:add(?TX, T, J, Data1),
- {ok, Job1, #{}} = couch_jobs:accept(T),
- ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
- ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
- ?assertMatch({ok, _, Data2}, couch_jobs:accept(T))
- end).
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertMatch({ok, _, Data2}, couch_jobs:accept(T)).
add_running_does_not_update_job_data(#{t1 := T, j1 := J}) ->
- ?_test(begin
- Data1 = #{<<"test">> => 1},
- Data2 = #{<<"test">> => 2},
- ok = couch_jobs:add(?TX, T, J, Data1),
- {ok, Job1, #{}} = couch_jobs:accept(T),
- ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
- ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
- ?assertMatch({ok, _, Data1}, couch_jobs:accept(T))
- end).
+ Data1 = #{<<"test">> => 1},
+ Data2 = #{<<"test">> => 2},
+ ok = couch_jobs:add(?TX, T, J, Data1),
+ {ok, Job1, #{}} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:add(?TX, T, J, Data2, 6)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job1)),
+ ?assertMatch({ok, _, Data1}, couch_jobs:accept(T)).
accept_max_schedtime(#{t1 := T, j1 := J1, j2 := J2}) ->
- ?_test(begin
- ok = couch_jobs:add(?TX, T, J1, #{}, 5000),
- ok = couch_jobs:add(?TX, T, J2, #{}, 3000),
- ?assertEqual({error, not_found}, couch_jobs:accept(T,
- #{max_sched_time => 1000})),
- ?assertMatch({ok, #{id := J2}, _}, couch_jobs:accept(T,
- #{max_sched_time => 3000})),
- ?assertMatch({ok, #{id := J1}, _}, couch_jobs:accept(T,
- #{max_sched_time => 9000}))
- end).
+ ok = couch_jobs:add(?TX, T, J1, #{}, 5000),
+ ok = couch_jobs:add(?TX, T, J2, #{}, 3000),
+ ?assertEqual({error, not_found}, couch_jobs:accept(T,
+ #{max_sched_time => 1000})),
+ ?assertMatch({ok, #{id := J2}, _}, couch_jobs:accept(T,
+ #{max_sched_time => 3000})),
+ ?assertMatch({ok, #{id := J1}, _}, couch_jobs:accept(T,
+ #{max_sched_time => 9000})).
accept_no_schedule(#{t1 := T}) ->
- ?_test(begin
- JobCount = 25,
- Jobs = [fabric2_util:uuid() || _ <- lists:seq(1, JobCount)],
- [couch_jobs:add(?TX, T, J, #{}) || J <- Jobs],
- InvalidOpts = #{no_schedule => true, max_sched_time => 1},
- ?assertMatch({error, _}, couch_jobs:accept(T, InvalidOpts)),
- AcceptOpts = #{no_schedule => true},
- Accepted = [begin
- {ok, #{id := J}, _} = couch_jobs:accept(T, AcceptOpts),
- J
- end || _ <- lists:seq(1, JobCount)],
- ?assertEqual(lists:sort(Jobs), lists:sort(Accepted))
- end).
+ JobCount = 25,
+ Jobs = [fabric2_util:uuid() || _ <- lists:seq(1, JobCount)],
+ [couch_jobs:add(?TX, T, J, #{}) || J <- Jobs],
+ InvalidOpts = #{no_schedule => true, max_sched_time => 1},
+ ?assertMatch({error, _}, couch_jobs:accept(T, InvalidOpts)),
+ AcceptOpts = #{no_schedule => true},
+ Accepted = [begin
+ {ok, #{id := J}, _} = couch_jobs:accept(T, AcceptOpts),
+ J
+ end || _ <- lists:seq(1, JobCount)],
+ ?assertEqual(lists:sort(Jobs), lists:sort(Accepted)).
subscribe(#{t1 := T, j1 := J}) ->
- ?_test(begin
- ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 1}),
+ ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 1}),
- ?assertEqual({error, not_found}, couch_jobs:subscribe(<<"xyz">>, J)),
- ?assertEqual({error, not_found}, couch_jobs:subscribe(T, <<"j5">>)),
+ ?assertEqual({error, not_found}, couch_jobs:subscribe(<<"xyz">>, J)),
+ ?assertEqual({error, not_found}, couch_jobs:subscribe(T, <<"j5">>)),
- SubRes0 = couch_jobs:subscribe(T, J),
- ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes0),
- {ok, SubId0, pending, _} = SubRes0,
+ SubRes0 = couch_jobs:subscribe(T, J),
+ ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes0),
+ {ok, SubId0, pending, _} = SubRes0,
- SubRes1 = couch_jobs:subscribe(T, J),
- ?assertEqual(SubRes0, SubRes1),
+ SubRes1 = couch_jobs:subscribe(T, J),
+ ?assertEqual(SubRes0, SubRes1),
- ?assertEqual(ok, couch_jobs:unsubscribe(SubId0)),
+ ?assertEqual(ok, couch_jobs:unsubscribe(SubId0)),
- SubRes = couch_jobs:subscribe(T, J),
- ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes),
- {ok, SubId, pending, _} = SubRes,
+ SubRes = couch_jobs:subscribe(T, J),
+ ?assertMatch({ok, {_, _}, pending, #{<<"z">> := 1}}, SubRes),
+ {ok, SubId, pending, _} = SubRes,
- {ok, Job, _} = couch_jobs:accept(T),
- ?assertMatch({T, J, running, #{<<"z">> := 1}},
- couch_jobs:wait(SubId, 5000)),
+ {ok, Job, _} = couch_jobs:accept(T),
+ ?assertMatch({T, J, running, #{<<"z">> := 1}},
+ couch_jobs:wait(SubId, 5000)),
- % Make sure we get intermediate `running` updates
- ?assertMatch({ok, _}, couch_jobs:update(?TX, Job, #{<<"z">> => 2})),
- ?assertMatch({T, J, running, #{<<"z">> := 2}},
- couch_jobs:wait(SubId, 5000)),
+ % Make sure we get intermediate `running` updates
+ ?assertMatch({ok, _}, couch_jobs:update(?TX, Job, #{<<"z">> => 2})),
+ ?assertMatch({T, J, running, #{<<"z">> := 2}},
+ couch_jobs:wait(SubId, 5000)),
- ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"z">> => 3})),
- ?assertMatch({T, J, finished, #{<<"z">> := 3}},
- couch_jobs:wait(SubId, finished, 5000)),
+ ?assertEqual(ok, couch_jobs:finish(?TX, Job, #{<<"z">> => 3})),
+ ?assertMatch({T, J, finished, #{<<"z">> := 3}},
+ couch_jobs:wait(SubId, finished, 5000)),
- ?assertEqual(timeout, couch_jobs:wait(SubId, 50)),
+ ?assertEqual(timeout, couch_jobs:wait(SubId, 50)),
- ?assertEqual({ok, finished, #{<<"z">> => 3}},
- couch_jobs:subscribe(T, J)),
+ ?assertEqual({ok, finished, #{<<"z">> => 3}},
+ couch_jobs:subscribe(T, J)),
- ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
- ?assertEqual({error, not_found}, couch_jobs:subscribe(T, J))
- end).
+ ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
+ ?assertEqual({error, not_found}, couch_jobs:subscribe(T, J)).
remove_when_subscribed_and_pending(#{t1 := T, j1 := J}) ->
- ?_test(begin
- ok = couch_jobs:add(?TX, T, J, #{<<"x">> => 1}),
- {ok, SId, pending, _} = couch_jobs:subscribe(T, J),
+ ok = couch_jobs:add(?TX, T, J, #{<<"x">> => 1}),
+ {ok, SId, pending, _} = couch_jobs:subscribe(T, J),
- couch_jobs:remove(?TX, T, J),
+ couch_jobs:remove(?TX, T, J),
- ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)),
- ?assertEqual(timeout, couch_jobs:wait(SId, 50))
- end).
+ ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)),
+ ?assertEqual(timeout, couch_jobs:wait(SId, 50)).
remove_when_subscribed_and_running(#{t1 := T, j1 := J}) ->
- ?_test(begin
- ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 2}),
- {ok, SId, pending, _} = couch_jobs:subscribe(T, J),
- {ok, #{}, _} = couch_jobs:accept(T),
- ?assertMatch({_, _, running, _}, couch_jobs:wait(SId, 5000)),
+ ok = couch_jobs:add(?TX, T, J, #{<<"z">> => 2}),
+ {ok, SId, pending, _} = couch_jobs:subscribe(T, J),
+ {ok, #{}, _} = couch_jobs:accept(T),
+ ?assertMatch({_, _, running, _}, couch_jobs:wait(SId, 5000)),
- couch_jobs:remove(?TX, T, J),
+ couch_jobs:remove(?TX, T, J),
- ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)),
- ?assertEqual(timeout, couch_jobs:wait(SId, 50))
- end).
+ ?assertMatch({T, J, not_found, not_found}, couch_jobs:wait(SId, 5000)),
+ ?assertEqual(timeout, couch_jobs:wait(SId, 50)).
subscribe_wait_multiple(#{t1 := T, j1 := J1, j2 := J2}) ->
- ?_test(begin
- ok = couch_jobs:add(?TX, T, J1, #{}),
- ok = couch_jobs:add(?TX, T, J2, #{}),
-
- {ok, S1, pending, #{}} = couch_jobs:subscribe(T, J1),
- {ok, S2, pending, #{}} = couch_jobs:subscribe(T, J2),
-
- Subs = [S1, S2],
-
- % Accept one job. Only one running update is expected. PJob1 and PJob2
- % do not necessarily correspond got Job1 and Job2, they could be
- % accepted as Job2 and Job1 respectively.
- {ok, PJob1, _} = couch_jobs:accept(T),
- ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
- ?assertMatch(timeout, couch_jobs:wait(Subs, 50)),
-
- % Accept another job. Expect another update.
- {ok, PJob2, _} = couch_jobs:accept(T),
- ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
- ?assertMatch(timeout, couch_jobs:wait(Subs, 50)),
-
- ?assertMatch({ok, _}, couch_jobs:update(?TX, PJob1, #{<<"q">> => 5})),
- ?assertMatch({ok, _}, couch_jobs:update(?TX, PJob2, #{<<"r">> => 6})),
-
- % Each job was updated once, expect two running updates.
- ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
- ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
-
- % Finish one job. Expect one finished update only.
- ?assertEqual(ok, couch_jobs:finish(?TX, PJob1)),
-
- ?assertMatch({_, _, finished, #{<<"q">> := 5}},
- couch_jobs:wait(Subs, finished, 5000)),
- ?assertMatch(timeout, couch_jobs:wait(Subs, finished, 50)),
-
- % Finish another job. However, unsubscribe should flush the
- % the message and we should not get it.
- ?assertEqual(ok, couch_jobs:finish(?TX, PJob2)),
- ?assertEqual(ok, couch_jobs:unsubscribe(S1)),
- ?assertEqual(ok, couch_jobs:unsubscribe(S2)),
- ?assertMatch(timeout, couch_jobs:wait(Subs, finished, 50))
- end).
+ ok = couch_jobs:add(?TX, T, J1, #{}),
+ ok = couch_jobs:add(?TX, T, J2, #{}),
+
+ {ok, S1, pending, #{}} = couch_jobs:subscribe(T, J1),
+ {ok, S2, pending, #{}} = couch_jobs:subscribe(T, J2),
+
+ Subs = [S1, S2],
+
+ % Accept one job. Only one running update is expected. PJob1 and PJob2
+ % do not necessarily correspond got Job1 and Job2, they could be
+ % accepted as Job2 and Job1 respectively.
+ {ok, PJob1, _} = couch_jobs:accept(T),
+ ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
+ ?assertMatch(timeout, couch_jobs:wait(Subs, 50)),
+
+ % Accept another job. Expect another update.
+ {ok, PJob2, _} = couch_jobs:accept(T),
+ ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
+ ?assertMatch(timeout, couch_jobs:wait(Subs, 50)),
+
+ ?assertMatch({ok, _}, couch_jobs:update(?TX, PJob1, #{<<"q">> => 5})),
+ ?assertMatch({ok, _}, couch_jobs:update(?TX, PJob2, #{<<"r">> => 6})),
+
+ % Each job was updated once, expect two running updates.
+ ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
+ ?assertMatch({_, _, running, _}, couch_jobs:wait(Subs, 5000)),
+
+ % Finish one job. Expect one finished update only.
+ ?assertEqual(ok, couch_jobs:finish(?TX, PJob1)),
+
+ ?assertMatch({_, _, finished, #{<<"q">> := 5}},
+ couch_jobs:wait(Subs, finished, 5000)),
+ ?assertMatch(timeout, couch_jobs:wait(Subs, finished, 50)),
+
+ % Finish another job. However, unsubscribe should flush the
+ % the message and we should not get it.
+ ?assertEqual(ok, couch_jobs:finish(?TX, PJob2)),
+ ?assertEqual(ok, couch_jobs:unsubscribe(S1)),
+ ?assertEqual(ok, couch_jobs:unsubscribe(S2)),
+ ?assertMatch(timeout, couch_jobs:wait(Subs, finished, 50)).
enqueue_inactive(#{t1 := T, j1 := J, t1_timeout := Timeout}) ->
- {timeout, 10, ?_test(begin
- couch_jobs_server:force_check_types(),
+ couch_jobs_server:force_check_types(),
- ok = couch_jobs:add(?TX, T, J, #{<<"y">> => 1}),
- {ok, Job, _} = couch_jobs:accept(T),
+ ok = couch_jobs:add(?TX, T, J, #{<<"y">> => 1}),
+ {ok, Job, _} = couch_jobs:accept(T),
- {ok, SubId, running, #{<<"y">> := 1}} = couch_jobs:subscribe(T, J),
- Wait = 3 * Timeout * 1000,
- ?assertEqual({T, J, pending, #{<<"y">> => 1}},
- couch_jobs:wait(SubId, pending, Wait)),
- ?assertMatch(#{state := pending}, get_job(T, J)),
+ {ok, SubId, running, #{<<"y">> := 1}} = couch_jobs:subscribe(T, J),
+ Wait = 3 * Timeout * 1000,
+ ?assertEqual({T, J, pending, #{<<"y">> => 1}},
+ couch_jobs:wait(SubId, pending, Wait)),
+ ?assertMatch(#{state := pending}, get_job(T, J)),
- % After job was re-enqueued, old job processor can't update it anymore
- ?assertEqual({error, halt}, couch_jobs:update(?TX, Job)),
- ?assertEqual({error, halt}, couch_jobs:finish(?TX, Job))
- end)}.
+ % After job was re-enqueued, old job processor can't update it anymore
+ ?assertEqual({error, halt}, couch_jobs:update(?TX, Job)),
+ ?assertEqual({error, halt}, couch_jobs:finish(?TX, Job)).
remove_running_job(#{t1 := T, j1 := J}) ->
- ?_test(begin
- ok = couch_jobs:add(?TX, T, J, #{}),
- {ok, Job, _} = couch_jobs:accept(T),
- ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
- ?assertEqual({error, not_found}, couch_jobs:remove(?TX, T, J)),
- ?assertEqual({error, halt}, couch_jobs:update(?TX, Job)),
- ?assertEqual({error, halt}, couch_jobs:finish(?TX, Job))
- end).
+ ok = couch_jobs:add(?TX, T, J, #{}),
+ {ok, Job, _} = couch_jobs:accept(T),
+ ?assertEqual(ok, couch_jobs:remove(?TX, T, J)),
+ ?assertEqual({error, not_found}, couch_jobs:remove(?TX, T, J)),
+ ?assertEqual({error, halt}, couch_jobs:update(?TX, Job)),
+ ?assertEqual({error, halt}, couch_jobs:finish(?TX, Job)).
check_get_jobs(#{t1 := T1, j1 := J1, t2 := T2, j2 := J2}) ->
- ?_test(begin
- ok = couch_jobs:add(?TX, T1, J1, #{}),
- ok = couch_jobs:add(?TX, T2, J2, #{}),
- ?assertMatch([
- {T2, J2, pending, #{}},
- {T1, J1, pending, #{}}
- ], lists:sort(couch_jobs_fdb:get_jobs())),
- {ok, _, _} = couch_jobs:accept(T1),
- ?assertMatch([
- {T2, J2, pending, #{}},
- {T1, J1, running, #{}}
- ], lists:sort(couch_jobs_fdb:get_jobs()))
- end).
+ ok = couch_jobs:add(?TX, T1, J1, #{}),
+ ok = couch_jobs:add(?TX, T2, J2, #{}),
+ ?assertMatch([
+ {T2, J2, pending, #{}},
+ {T1, J1, pending, #{}}
+ ], lists:sort(couch_jobs_fdb:get_jobs())),
+ {ok, _, _} = couch_jobs:accept(T1),
+ ?assertMatch([
+ {T2, J2, pending, #{}},
+ {T1, J1, running, #{}}
+ ], lists:sort(couch_jobs_fdb:get_jobs())).
use_fabric_transaction_object(#{t1 := T1, j1 := J1, dbname := DbName}) ->
- ?_test(begin
- {ok, Db} = fabric2_db:create(DbName, []),
- ?assertEqual(ok, couch_jobs:add(Db, T1, J1, #{})),
- ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)),
- {ok, Job, _} = couch_jobs:accept(T1),
- ?assertEqual(ok, fabric2_fdb:transactional(Db, fun(Db1) ->
- {ok, #{}} = couch_jobs:get_job_data(Db1, T1, J1),
- Doc1 = #doc{id = <<"1">>, body = {[]}},
- {ok, {_, _}} = fabric2_db:update_doc(Db1, Doc1),
- Doc2 = #doc{id = <<"2">>, body = {[]}},
- {ok, {_, _}} = fabric2_db:update_doc(Db1, Doc2),
- couch_jobs:finish(Db1, Job, #{<<"d">> => 1})
- end)),
- ok = couch_jobs:remove(#{tx => undefined}, T1, J1),
- ok = fabric2_db:delete(DbName, [])
- end).
+ {ok, Db} = fabric2_db:create(DbName, []),
+ ?assertEqual(ok, couch_jobs:add(Db, T1, J1, #{})),
+ ?assertMatch(#{state := pending, data := #{}}, get_job(T1, J1)),
+ {ok, Job, _} = couch_jobs:accept(T1),
+ ?assertEqual(ok, fabric2_fdb:transactional(Db, fun(Db1) ->
+ {ok, #{}} = couch_jobs:get_job_data(Db1, T1, J1),
+ Doc1 = #doc{id = <<"1">>, body = {[]}},
+ {ok, {_, _}} = fabric2_db:update_doc(Db1, Doc1),
+ Doc2 = #doc{id = <<"2">>, body = {[]}},
+ {ok, {_, _}} = fabric2_db:update_doc(Db1, Doc2),
+ couch_jobs:finish(Db1, Job, #{<<"d">> => 1})
+ end)),
+ ok = couch_jobs:remove(#{tx => undefined}, T1, J1),
+ ok = fabric2_db:delete(DbName, []).
metadata_version_bump(_) ->
- ?_test(begin
- JTx1 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
- ?assertMatch(#{md_version := not_found}, JTx1),
-
- ets:delete_all_objects(couch_jobs_fdb),
- couch_jobs_fdb:bump_metadata_version(),
- JTx2 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
- ?assertMatch(#{md_version := Bin} when is_binary(Bin), JTx2),
-
- ets:delete_all_objects(couch_jobs_fdb),
- couch_jobs_fdb:bump_metadata_version(),
- JTx3 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
- OldMdv = maps:get(md_version, JTx2),
- NewMdv = maps:get(md_version, JTx3),
- ?assert(NewMdv > OldMdv)
- end).
+ JTx1 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
+ ?assertMatch(#{md_version := not_found}, JTx1),
+
+ ets:delete_all_objects(couch_jobs_fdb),
+ couch_jobs_fdb:bump_metadata_version(),
+ JTx2 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
+ ?assertMatch(#{md_version := Bin} when is_binary(Bin), JTx2),
+
+ ets:delete_all_objects(couch_jobs_fdb),
+ couch_jobs_fdb:bump_metadata_version(),
+ JTx3 = couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(Tx) -> Tx end),
+ OldMdv = maps:get(md_version, JTx2),
+ NewMdv = maps:get(md_version, JTx3),
+ ?assert(NewMdv > OldMdv).