diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-08-28 04:36:18 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-09-15 16:13:46 -0400 |
commit | ae858196848cf9533dfa03a2006227481f47388d (patch) | |
tree | 1daa6d95727f31edf2299842c8501cf0df57c1b1 | |
parent | 99262909129602bceac82e7907ebfcafc9eba629 (diff) | |
download | couchdb-ae858196848cf9533dfa03a2006227481f47388d.tar.gz |
Update and clean up tests
Update tests to use the new replicator. Also clean up redundancy and re-use
some of the newer macros from fabric2 (?TDEF_FE).
Make sure replicator tests are included in `make check`
21 files changed, 2210 insertions, 1563 deletions
@@ -163,7 +163,7 @@ endif .PHONY: check check: all @$(MAKE) emilio - make eunit apps=couch_eval,couch_expiring_cache,ctrace,couch_jobs,couch_views,fabric,mango,chttpd + make eunit apps=couch_eval,couch_expiring_cache,ctrace,couch_jobs,couch_views,fabric,mango,chttpd,couch_replicator make elixir tests=test/elixir/test/basics_test.exs,test/elixir/test/replication_test.exs,test/elixir/test/map_test.exs,test/elixir/test/all_docs_test.exs,test/elixir/test/bulk_docs_test.exs make exunit apps=chttpd make mango-test diff --git a/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl b/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl index ac4bb84f3..0e7e0ea5a 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl @@ -12,72 +12,60 @@ -module(couch_replicator_attachments_too_large). + -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_replicator/src/couch_replicator.hrl"). - - -setup(_) -> - Ctx = test_util:start_couch([couch_replicator]), - Source = create_db(), - create_doc_with_attachment(Source, <<"doc">>, 1000), - Target = create_db(), - {Ctx, {Source, Target}}. - - -teardown(_, {Ctx, {Source, Target}}) -> - delete_db(Source), - delete_db(Target), - config:delete("couchdb", "max_attachment_size"), - ok = test_util:stop_couch(Ctx). +-include_lib("fabric/test/fabric2_test.hrl"). attachment_too_large_replication_test_() -> - Pairs = [{remote, remote}], { - "Attachment size too large replication tests", + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_succeed/2} || Pair <- Pairs] ++ - [{Pair, fun should_fail/2} || Pair <- Pairs] + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_should_succeed), + ?TDEF_FE(t_should_fail) + ] } }. -should_succeed({From, To}, {_Ctx, {Source, Target}}) -> - RepObject = {[ - {<<"source">>, db_url(From, Source)}, - {<<"target">>, db_url(To, Target)} - ]}, - config:set("couchdb", "max_attachment_size", "1000", _Persist = false), - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), - ?_assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)). +setup() -> + Source = couch_replicator_test_helper:create_db(), + create_doc_with_attachment(Source, <<"doc">>, 1000), + Target = couch_replicator_test_helper:create_db(), + {Source, Target}. -should_fail({From, To}, {_Ctx, {Source, Target}}) -> - RepObject = {[ - {<<"source">>, db_url(From, Source)}, - {<<"target">>, db_url(To, Target)} - ]}, - config:set("couchdb", "max_attachment_size", "999", _Persist = false), - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), - ?_assertError({badmatch, {not_found, missing}}, - couch_replicator_test_helper:compare_dbs(Source, Target)). +teardown({Source, Target}) -> + config:delete("couchdb", "max_attachment_size", false), + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). -create_db() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. +t_should_succeed({Source, Target}) -> + config:set("couchdb", "max_attachment_size", "1000", false), + {ok, _} = couch_replicator_test_helper:replicate(Source, Target), + ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)). + + +t_should_fail({Source, Target}) -> + config:set("couchdb", "max_attachment_size", "999", false), + {ok, _} = couch_replicator_test_helper:replicate(Source, Target), + ExceptIds = [<<"doc">>], + ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, + Target, ExceptIds)). create_doc_with_attachment(DbName, DocId, AttSize) -> - {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]), Doc = #doc{id = DocId, atts = att(AttSize)}, - {ok, _} = couch_db:update_doc(Db, Doc, []), - couch_db:close(Db), + couch_replicator_test_helper:create_docs(DbName, [Doc]), ok. @@ -90,13 +78,3 @@ att(Size) when is_integer(Size), Size >= 1 -> << <<"x">> || _ <- lists:seq(1, Size) >> end} ])]. - - -delete_db(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]). - - -db_url(remote, DbName) -> - Addr = config:get("httpd", "bind_address", "127.0.0.1"), - Port = mochiweb_socket_server:get(couch_httpd, port), - ?l2b(io_lib:format("http://~s:~b/~s", [Addr, Port, DbName])). diff --git a/src/couch_replicator/test/eunit/couch_replicator_connection_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_connection_tests.erl index e75cc5a63..df30db25d 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_connection_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_connection_tests.erl @@ -12,187 +12,176 @@ -module(couch_replicator_connection_tests). + -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). - --define(TIMEOUT, 1000). - - -setup() -> - Host = config:get("httpd", "bind_address", "127.0.0.1"), - Port = config:get("httpd", "port", "5984"), - {Host, Port}. - -teardown(_) -> - ok. +-include_lib("fabric/test/fabric2_test.hrl"). httpc_pool_test_() -> { - "replicator connection sharing tests", + "Replicator connection sharing tests", { setup, - fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, { foreach, - fun setup/0, fun teardown/1, + fun setup/0, + fun teardown/1, [ - fun connections_shared_after_release/1, - fun connections_not_shared_after_owner_death/1, - fun idle_connections_closed/1, - fun test_owner_monitors/1, - fun worker_discards_creds_on_create/1, - fun worker_discards_url_creds_after_request/1, - fun worker_discards_creds_in_headers_after_request/1, - fun worker_discards_proxy_creds_after_request/1 + ?TDEF_FE(connections_shared_after_release), + ?TDEF_FE(connections_not_shared_after_owner_death), + ?TDEF_FE(idle_connections_closed), + ?TDEF_FE(test_owner_monitors), + ?TDEF_FE(worker_discards_creds_on_create), + ?TDEF_FE(worker_discards_url_creds_after_request), + ?TDEF_FE(worker_discards_creds_in_headers_after_request), + ?TDEF_FE(worker_discards_proxy_creds_after_request) ] } } }. +setup() -> + Host = config:get("chttpd", "bind_address", "127.0.0.1"), + Port = config:get("chttpd", "port", "5984"), + {Host, Port}. + + +teardown(_) -> + ok. + + connections_shared_after_release({Host, Port}) -> - ?_test(begin - URL = "http://" ++ Host ++ ":" ++ Port, - Self = self(), - {ok, Pid} = couch_replicator_connection:acquire(URL), - couch_replicator_connection:release(Pid), - spawn(fun() -> - Self ! couch_replicator_connection:acquire(URL) - end), - receive - {ok, Pid2} -> - ?assertEqual(Pid, Pid2) - end - end). + URL = "http://" ++ Host ++ ":" ++ Port, + Self = self(), + {ok, Pid} = couch_replicator_connection:acquire(URL), + couch_replicator_connection:release(Pid), + spawn(fun() -> + Self ! couch_replicator_connection:acquire(URL) + end), + receive + {ok, Pid2} -> + ?assertEqual(Pid, Pid2) + end. connections_not_shared_after_owner_death({Host, Port}) -> - ?_test(begin - URL = "http://" ++ Host ++ ":" ++ Port, - Self = self(), - spawn(fun() -> - Self ! couch_replicator_connection:acquire(URL), - error("simulate division by zero without compiler warning") - end), - receive - {ok, Pid} -> - {ok, Pid2} = couch_replicator_connection:acquire(URL), - ?assertNotEqual(Pid, Pid2), - MRef = monitor(process, Pid), - receive {'DOWN', MRef, process, Pid, _Reason} -> + URL = "http://" ++ Host ++ ":" ++ Port, + Self = self(), + spawn(fun() -> + Self ! couch_replicator_connection:acquire(URL), + error("simulate division by zero without compiler warning") + end), + receive + {ok, Pid} -> + {ok, Pid2} = couch_replicator_connection:acquire(URL), + ?assertNotEqual(Pid, Pid2), + MRef = monitor(process, Pid), + receive + {'DOWN', MRef, process, Pid, _Reason} -> ?assert(not is_process_alive(Pid)); - Other -> throw(Other) - end - end - end). + Other -> + throw(Other) + end + end. idle_connections_closed({Host, Port}) -> - ?_test(begin - URL = "http://" ++ Host ++ ":" ++ Port, - {ok, Pid} = couch_replicator_connection:acquire(URL), - couch_replicator_connection ! close_idle_connections, - ?assert(ets:member(couch_replicator_connection, Pid)), - % block until idle connections have closed - sys:get_status(couch_replicator_connection), - couch_replicator_connection:release(Pid), - couch_replicator_connection ! close_idle_connections, - % block until idle connections have closed - sys:get_status(couch_replicator_connection), - ?assert(not ets:member(couch_replicator_connection, Pid)) - end). + URL = "http://" ++ Host ++ ":" ++ Port, + {ok, Pid} = couch_replicator_connection:acquire(URL), + couch_replicator_connection ! close_idle_connections, + ?assert(ets:member(couch_replicator_connection, Pid)), + % block until idle connections have closed + sys:get_status(couch_replicator_connection), + couch_replicator_connection:release(Pid), + couch_replicator_connection ! close_idle_connections, + % block until idle connections have closed + sys:get_status(couch_replicator_connection), + ?assert(not ets:member(couch_replicator_connection, Pid)). test_owner_monitors({Host, Port}) -> - ?_test(begin - URL = "http://" ++ Host ++ ":" ++ Port, - {ok, Worker0} = couch_replicator_connection:acquire(URL), - assert_monitors_equal([{process, self()}]), - couch_replicator_connection:release(Worker0), - assert_monitors_equal([]), - {Workers, Monitors} = lists:foldl(fun(_, {WAcc, MAcc}) -> - {ok, Worker1} = couch_replicator_connection:acquire(URL), - MAcc1 = [{process, self()} | MAcc], - assert_monitors_equal(MAcc1), - {[Worker1 | WAcc], MAcc1} - end, {[], []}, lists:seq(1,5)), - lists:foldl(fun(Worker2, Acc) -> - [_ | NewAcc] = Acc, - couch_replicator_connection:release(Worker2), - assert_monitors_equal(NewAcc), - NewAcc - end, Monitors, Workers) - end). + URL = "http://" ++ Host ++ ":" ++ Port, + {ok, Worker0} = couch_replicator_connection:acquire(URL), + assert_monitors_equal([{process, self()}]), + couch_replicator_connection:release(Worker0), + assert_monitors_equal([]), + {Workers, Monitors} = lists:foldl(fun(_, {WAcc, MAcc}) -> + {ok, Worker1} = couch_replicator_connection:acquire(URL), + MAcc1 = [{process, self()} | MAcc], + assert_monitors_equal(MAcc1), + {[Worker1 | WAcc], MAcc1} + end, {[], []}, lists:seq(1, 5)), + lists:foldl(fun(Worker2, Acc) -> + [_ | NewAcc] = Acc, + couch_replicator_connection:release(Worker2), + assert_monitors_equal(NewAcc), + NewAcc + end, Monitors, Workers). worker_discards_creds_on_create({Host, Port}) -> - ?_test(begin - {User, Pass, B64Auth} = user_pass(), - URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ Port, - {ok, WPid} = couch_replicator_connection:acquire(URL), - Internals = worker_internals(WPid), - ?assert(string:str(Internals, B64Auth) =:= 0), - ?assert(string:str(Internals, Pass) =:= 0) - end). + {User, Pass, B64Auth} = user_pass(), + URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ Port, + {ok, WPid} = couch_replicator_connection:acquire(URL), + Internals = worker_internals(WPid), + ?assert(string:str(Internals, B64Auth) =:= 0), + ?assert(string:str(Internals, Pass) =:= 0). worker_discards_url_creds_after_request({Host, _}) -> - ?_test(begin - {User, Pass, B64Auth} = user_pass(), - {Port, ServerPid} = server(), - PortStr = integer_to_list(Port), - URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ PortStr, - {ok, WPid} = couch_replicator_connection:acquire(URL), - ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], [])), - Internals = worker_internals(WPid), - ?assert(string:str(Internals, B64Auth) =:= 0), - ?assert(string:str(Internals, Pass) =:= 0), - couch_replicator_connection:release(WPid), - unlink(ServerPid), - exit(ServerPid, kill) - end). + {User, Pass, B64Auth} = user_pass(), + {Port, ServerPid} = server(), + PortStr = integer_to_list(Port), + URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ PortStr, + {ok, WPid} = couch_replicator_connection:acquire(URL), + ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], [])), + Internals = worker_internals(WPid), + ?assert(string:str(Internals, B64Auth) =:= 0), + ?assert(string:str(Internals, Pass) =:= 0), + couch_replicator_connection:release(WPid), + unlink(ServerPid), + exit(ServerPid, kill). worker_discards_creds_in_headers_after_request({Host, _}) -> - ?_test(begin - {_User, Pass, B64Auth} = user_pass(), - {Port, ServerPid} = server(), - PortStr = integer_to_list(Port), - URL = "http://" ++ Host ++ ":" ++ PortStr, - {ok, WPid} = couch_replicator_connection:acquire(URL), - Headers = [{"Authorization", "Basic " ++ B64Auth}], - ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, Headers, [])), - Internals = worker_internals(WPid), - ?assert(string:str(Internals, B64Auth) =:= 0), - ?assert(string:str(Internals, Pass) =:= 0), - couch_replicator_connection:release(WPid), - unlink(ServerPid), - exit(ServerPid, kill) - end). + {_User, Pass, B64Auth} = user_pass(), + {Port, ServerPid} = server(), + PortStr = integer_to_list(Port), + URL = "http://" ++ Host ++ ":" ++ PortStr, + {ok, WPid} = couch_replicator_connection:acquire(URL), + Headers = [{"Authorization", "Basic " ++ B64Auth}], + ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, Headers, [])), + Internals = worker_internals(WPid), + ?assert(string:str(Internals, B64Auth) =:= 0), + ?assert(string:str(Internals, Pass) =:= 0), + couch_replicator_connection:release(WPid), + unlink(ServerPid), + exit(ServerPid, kill). worker_discards_proxy_creds_after_request({Host, _}) -> - ?_test(begin - {User, Pass, B64Auth} = user_pass(), - {Port, ServerPid} = server(), - PortStr = integer_to_list(Port), - URL = "http://" ++ Host ++ ":" ++ PortStr, - {ok, WPid} = couch_replicator_connection:acquire(URL), - Opts = [ - {proxy_host, Host}, - {proxy_port, Port}, - {proxy_user, User}, - {proxy_pass, Pass} - ], - ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], Opts)), - Internals = worker_internals(WPid), - ?assert(string:str(Internals, B64Auth) =:= 0), - ?assert(string:str(Internals, Pass) =:= 0), - couch_replicator_connection:release(WPid), - unlink(ServerPid), - exit(ServerPid, kill) - end). + {User, Pass, B64Auth} = user_pass(), + {Port, ServerPid} = server(), + PortStr = integer_to_list(Port), + URL = "http://" ++ Host ++ ":" ++ PortStr, + {ok, WPid} = couch_replicator_connection:acquire(URL), + Opts = [ + {proxy_host, Host}, + {proxy_port, Port}, + {proxy_user, User}, + {proxy_pass, Pass} + ], + ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], Opts)), + Internals = worker_internals(WPid), + ?assert(string:str(Internals, B64Auth) =:= 0), + ?assert(string:str(Internals, Pass) =:= 0), + couch_replicator_connection:release(WPid), + unlink(ServerPid), + exit(ServerPid, kill). send_req(WPid, URL, Headers, Opts) -> @@ -237,5 +226,6 @@ server_responder(LSock) -> assert_monitors_equal(ShouldBe) -> sys:get_status(couch_replicator_connection), - {monitors, Monitors} = process_info(whereis(couch_replicator_connection), monitors), + {monitors, Monitors} = process_info(whereis(couch_replicator_connection), + monitors), ?assertEqual(Monitors, ShouldBe). diff --git a/src/couch_replicator/test/eunit/couch_replicator_create_target_with_options_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_create_target_with_options_tests.erl index 63310d39e..c957fc199 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_create_target_with_options_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_create_target_with_options_tests.erl @@ -12,132 +12,137 @@ -module(couch_replicator_create_target_with_options_tests). + -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_replicator/src/couch_replicator.hrl"). - --define(USERNAME, "rep_admin"). --define(PASSWORD, "secret"). - -setup() -> - Ctx = test_util:start_couch([fabric, mem3, couch_replicator, chttpd]), - Hashed = couch_passwords:hash_admin_password(?PASSWORD), - ok = config:set("admins", ?USERNAME, ?b2l(Hashed), _Persist=false), - Source = ?tempdb(), - Target = ?tempdb(), - {Ctx, {Source, Target}}. - - -teardown({Ctx, {_Source, _Target}}) -> - config:delete("admins", ?USERNAME), - ok = test_util:stop_couch(Ctx). +-include_lib("fabric/test/fabric2_test.hrl"). create_target_with_options_replication_test_() -> { "Create target with range partitions tests", { - foreach, - fun setup/0, fun teardown/1, - [ - fun should_create_target_with_q_4/1, - fun should_create_target_with_q_2_n_1/1, - fun should_create_target_with_default/1, - fun should_not_create_target_with_q_any/1 - ] + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(should_create_target_with_q_4), + ?TDEF_FE(should_create_target_with_q_2_n_1), + ?TDEF_FE(should_create_target_with_default), + ?TDEF_FE(should_not_create_target_with_q_any) + ] + } } }. -should_create_target_with_q_4({_Ctx, {Source, Target}}) -> +setup() -> + Source = ?tempdb(), + Target = ?tempdb(), + {Source, Target}. + + +teardown({Source, Target}) -> + delete_db(Source), + delete_db(Target). + + +should_create_target_with_q_4({Source, Target}) -> RepObject = {[ - {<<"source">>, db_url(Source)}, - {<<"target">>, db_url(Target)}, + {<<"source">>, Source}, + {<<"target">>, Target}, {<<"create_target">>, true}, {<<"create_target_params">>, {[{<<"q">>, <<"4">>}]}} ]}, create_db(Source), create_doc(Source), - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), + {ok, _} = couch_replicator_test_helper:replicate(RepObject), - {ok, TargetInfo} = fabric:get_db_info(Target), + TargetInfo = db_info(Target), {ClusterInfo} = couch_util:get_value(cluster, TargetInfo), delete_db(Source), delete_db(Target), - ?_assertEqual(4, couch_util:get_value(q, ClusterInfo)). + ?assertEqual(0, couch_util:get_value(q, ClusterInfo)). -should_create_target_with_q_2_n_1({_Ctx, {Source, Target}}) -> +should_create_target_with_q_2_n_1({Source, Target}) -> RepObject = {[ - {<<"source">>, db_url(Source)}, - {<<"target">>, db_url(Target)}, + {<<"source">>, Source}, + {<<"target">>, Target}, {<<"create_target">>, true}, {<<"create_target_params">>, {[{<<"q">>, <<"2">>}, {<<"n">>, <<"1">>}]}} ]}, create_db(Source), create_doc(Source), - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), + {ok, _} = couch_replicator_test_helper:replicate(RepObject), - {ok, TargetInfo} = fabric:get_db_info(Target), + TargetInfo = db_info(Target), {ClusterInfo} = couch_util:get_value(cluster, TargetInfo), delete_db(Source), delete_db(Target), - [ - ?_assertEqual(2, couch_util:get_value(q, ClusterInfo)), - ?_assertEqual(1, couch_util:get_value(n, ClusterInfo)) - ]. + ?assertEqual(0, couch_util:get_value(q, ClusterInfo)), + ?assertEqual(0, couch_util:get_value(n, ClusterInfo)). -should_create_target_with_default({_Ctx, {Source, Target}}) -> +should_create_target_with_default({Source, Target}) -> RepObject = {[ - {<<"source">>, db_url(Source)}, - {<<"target">>, db_url(Target)}, + {<<"source">>, Source}, + {<<"target">>, Target}, {<<"create_target">>, true} ]}, create_db(Source), create_doc(Source), - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), + {ok, _} = couch_replicator_test_helper:replicate(RepObject), - {ok, TargetInfo} = fabric:get_db_info(Target), + TargetInfo = db_info(Target), {ClusterInfo} = couch_util:get_value(cluster, TargetInfo), - Q = config:get("cluster", "q", "8"), delete_db(Source), delete_db(Target), - ?_assertEqual(list_to_integer(Q), couch_util:get_value(q, ClusterInfo)). + ?assertEqual(0, couch_util:get_value(q, ClusterInfo)). -should_not_create_target_with_q_any({_Ctx, {Source, Target}}) -> +should_not_create_target_with_q_any({Source, Target}) -> RepObject = {[ - {<<"source">>, db_url(Source)}, - {<<"target">>, db_url(Target)}, + {<<"source">>, Source}, + {<<"target">>, Target}, {<<"create_target">>, false}, {<<"create_target_params">>, {[{<<"q">>, <<"1">>}]}} ]}, create_db(Source), create_doc(Source), - {error, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), - DbExist = is_list(catch mem3:shards(Target)), + {error, _} = couch_replicator_test_helper:replicate(RepObject), + Exists = try + fabric2_db:open(Target, [?ADMIN_CTX]), + ?assert(false) + catch + error:database_does_not_exist -> + database_does_not_exist + end, delete_db(Source), - ?_assertEqual(false, DbExist). + ?assertEqual(Exists, database_does_not_exist). create_doc(DbName) -> - Body = {[{<<"foo">>, <<"bar">>}]}, - NewDoc = #doc{body = Body}, - {ok, _} = fabric:update_doc(DbName, NewDoc, [?ADMIN_CTX]). + couch_replicator_test_helper:create_docs(DbName, [ + #{<<"_id">> => fabric2_util:uuid(), <<"foo">> => <<"bar">>} + ]). create_db(DbName) -> - ok = fabric:create_db(DbName, [?ADMIN_CTX]). + couch_replicator_test_helper:create_db(DbName). delete_db(DbName) -> - ok = fabric:delete_db(DbName, [?ADMIN_CTX]). + couch_replicator_test_helper:delete_db(DbName). -db_url(DbName) -> - Addr = config:get("chttpd", "bind_address", "127.0.0.1"), - Port = mochiweb_socket_server:get(chttpd, port), - ?l2b(io_lib:format("http://~s:~s@~s:~b/~s", [?USERNAME, ?PASSWORD, Addr, - Port, DbName])). +db_info(DbName) -> + {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), + {ok, Info} = fabric2_db:get_db_info(Db), + Info. diff --git a/src/couch_replicator/test/eunit/couch_replicator_db_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_db_tests.erl new file mode 100644 index 000000000..053441007 --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_db_tests.erl @@ -0,0 +1,332 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_db_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_replicator/src/couch_replicator.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). + + +couch_replicator_db_test_() -> + { + "Replications are started from docs in _replicator dbs", + { + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(default_replicator_db_is_created), + ?TDEF_FE(continuous_replication_created_from_doc, 15), + ?TDEF_FE(normal_replication_created_from_doc, 15), + ?TDEF_FE(replicator_db_deleted, 15), + ?TDEF_FE(replicator_db_recreated, 15), + ?TDEF_FE(invalid_replication_docs), + ?TDEF_FE(duplicate_persistent_replication, 15), + ?TDEF_FE(duplicate_transient_replication, 30) + ] + } + } + }. + + +setup() -> + Source = couch_replicator_test_helper:create_db(), + create_doc(Source, #{<<"_id">> => <<"doc1">>}), + Target = couch_replicator_test_helper:create_db(), + Name = ?tempdb(), + RepDb = couch_replicator_test_helper:create_db(<<Name/binary, + "/_replicator">>), + config:set("replicator", "stats_update_interval_sec", "0", false), + config:set("replicator", "create_replicator_db", "false", false), + config:set("couchdb", "enable_database_recovery", "false", false), + config:set("replicator", "min_backoff_penalty_sec", "1", false), + {Source, Target, RepDb}. + + +teardown({Source, Target, RepDb}) -> + config:delete("replicator", "stats_update_interval_sec", false), + config:delete("replicator", "create_replicator_db", false), + config:delete("couchdb", "enable_database_recovery", false), + config:delete("replicator", "min_backoff_penalty_sec", false), + + couch_replicator_test_helper:delete_db(RepDb), + couch_replicator_test_helper:delete_db(?REP_DB_NAME), + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). + + +default_replicator_db_is_created({_, _, _}) -> + config:set("replicator", "create_replicator_db", "true", false), + ?assertEqual(ignore, couch_replicator:ensure_rep_db_exists()), + ?assertMatch({ok, #{}}, fabric2_db:open(?REP_DB_NAME, [])). + + +continuous_replication_created_from_doc({Source, Target, RepDb}) -> + DocId = <<"rdoc1">>, + RDoc = rep_doc(Source, Target, DocId, #{<<"continuous">> => true}), + create_doc(RepDb, RDoc), + wait_scheduler_docs_state(RepDb, DocId, <<"running">>), + + {Code, DocInfo} = scheduler_docs(RepDb, DocId), + ?assertEqual(200, Code), + ?assertMatch(#{ + <<"database">> := RepDb, + <<"doc_id">> := DocId + }, DocInfo), + + RepId = maps:get(<<"id">>, DocInfo), + + ?assertMatch([#{ + <<"database">> := RepDb, + <<"doc_id">> := DocId, + <<"id">> := RepId, + <<"state">> := <<"running">> + }], couch_replicator_test_helper:scheduler_jobs()), + + ?assertMatch({200, #{ + <<"database">> := RepDb, + <<"doc_id">> := DocId, + <<"id">> := RepId, + <<"state">> := <<"running">> + }}, scheduler_jobs(RepId)), + + delete_doc(RepDb, DocId), + wait_scheduler_docs_not_found(RepDb, DocId), + ?assertMatch({404, #{}}, scheduler_jobs(RepId)). + + +normal_replication_created_from_doc({Source, Target, RepDb}) -> + DocId = <<"rdoc2">>, + RDoc = rep_doc(Source, Target, DocId), + create_doc(RepDb, RDoc), + wait_scheduler_docs_state(RepDb, DocId, <<"completed">>), + + {Code, DocInfo} = scheduler_docs(RepDb, DocId), + ?assertEqual(200, Code), + ?assertMatch(#{ + <<"database">> := RepDb, + <<"doc_id">> := DocId, + <<"state">> := <<"completed">>, + <<"info">> := #{ + <<"docs_written">> := 1, + <<"docs_read">> := 1, + <<"missing_revisions_found">> := 1 + } + }, DocInfo), + + wait_doc_state(RepDb, DocId, <<"completed">>), + ?assertMatch(#{ + <<"_replication_state">> := <<"completed">>, + <<"_replication_stats">> := #{ + <<"docs_written">> := 1, + <<"docs_read">> := 1, + <<"missing_revisions_found">> := 1 + } + }, read_doc(RepDb, DocId)), + + delete_doc(RepDb, DocId), + wait_scheduler_docs_not_found(RepDb, DocId). + + +replicator_db_deleted({Source, Target, RepDb}) -> + DocId = <<"rdoc3">>, + RDoc = rep_doc(Source, Target, DocId, #{<<"continuous">> => true}), + create_doc(RepDb, RDoc), + wait_scheduler_docs_state(RepDb, DocId, <<"running">>), + fabric2_db:delete(RepDb, [?ADMIN_CTX]), + wait_scheduler_docs_not_found(RepDb, DocId). + + +replicator_db_recreated({Source, Target, RepDb}) -> + DocId = <<"rdoc4">>, + RDoc = rep_doc(Source, Target, DocId, #{<<"continuous">> => true}), + create_doc(RepDb, RDoc), + wait_scheduler_docs_state(RepDb, DocId, <<"running">>), + + config:set("couchdb", "enable_database_recovery", "true", false), + fabric2_db:delete(RepDb, [?ADMIN_CTX]), + wait_scheduler_docs_not_found(RepDb, DocId), + + Opts = [{start_key, RepDb}, {end_key, RepDb}], + {ok, [DbInfo]} = fabric2_db:list_deleted_dbs_info(Opts), + {_, Timestamp} = lists:keyfind(timestamp, 1, DbInfo), + ok = fabric2_db:undelete(RepDb, RepDb, Timestamp, [?ADMIN_CTX]), + wait_scheduler_docs_state(RepDb, DocId, <<"running">>), + + config:set("couchdb", "enable_database_recovery", "false", false), + fabric2_db:delete(RepDb, [?ADMIN_CTX]), + wait_scheduler_docs_not_found(RepDb, DocId). + + +invalid_replication_docs({_, _, RepDb}) -> + Docs = [ + #{ + <<"_id">> => <<"1">>, + <<"source">> => <<"http://127.0.0.1:1000">> + }, + #{ + <<"_id">> => <<"1">>, + <<"target">> => <<"http://127.0.0.1:1001">> + }, + #{ + <<"_id">> => <<"1">> + }, + #{ + <<"_id">> => <<"1">>, + <<"source">> => <<"http://127.0.0.1:1002">>, + <<"target">> => <<"http://127.0.0.1:1003">>, + <<"create_target">> => <<"bad">> + }, + #{ + <<"_id">> => <<"1">>, + <<"source">> => #{<<"junk">> => 42}, + <<"target">> => <<"http://127.0.0.1:1004">> + }, + #{ + <<"_id">> => <<"1">>, + <<"source">> => <<"http://127.0.0.1:1005">>, + <<"target">> => <<"http://127.0.0.1:1006">>, + <<"selector">> => #{}, + <<"filter">> => <<"a/b">> + }, + #{ + <<"_id">> => <<"1">>, + <<"source">> => <<"http://127.0.0.1:1007">>, + <<"target">> => <<"https://127.0.0.1:1008">>, + <<"doc_ids">> => 42 + } + ], + lists:foreach(fun(Doc) -> + ?assertThrow({forbidden, _}, create_doc(RepDb, Doc)) + end, Docs). + + +duplicate_persistent_replication({Source, Target, RepDb}) -> + DocId1 = <<"rdoc5">>, + RDoc1 = rep_doc(Source, Target, DocId1, #{<<"continuous">> => true}), + create_doc(RepDb, RDoc1), + wait_scheduler_docs_state(RepDb, DocId1, <<"running">>), + + DocId2 = <<"rdoc6">>, + RDoc2 = rep_doc(Source, Target, DocId2, #{<<"continuous">> => true}), + create_doc(RepDb, RDoc2), + wait_scheduler_docs_state(RepDb, DocId2, <<"failed">>), + + delete_doc(RepDb, DocId1), + delete_doc(RepDb, DocId2), + + wait_scheduler_docs_not_found(RepDb, DocId1), + wait_scheduler_docs_not_found(RepDb, DocId2). + + +duplicate_transient_replication({Source, Target, RepDb}) -> + {ok, _Pid, RepId} = couch_replicator_test_helper:replicate_continuous( + Source, Target), + + DocId = <<"rdoc7">>, + RDoc = rep_doc(Source, Target, DocId, #{<<"continuous">> => true}), + create_doc(RepDb, RDoc), + wait_scheduler_docs_state(RepDb, DocId, <<"crashing">>), + + couch_replicator_test_helper:cancel(RepId), + wait_reschedule_docs_state(RepDb, DocId, <<"running">>), + + delete_doc(RepDb, DocId), + wait_scheduler_docs_not_found(RepDb, DocId). + + +scheduler_jobs(Id) -> + SUrl = couch_replicator_test_helper:server_url(), + Url = lists:flatten(io_lib:format("~s/_scheduler/jobs/~s", [SUrl, Id])), + {ok, Code, _, Body} = test_request:get(Url, []), + {Code, jiffy:decode(Body, [return_maps])}. + + +scheduler_docs(DbName, DocId) -> + SUrl = couch_replicator_test_helper:server_url(), + Fmt = "~s/_scheduler/docs/~s/~s", + Url = lists:flatten(io_lib:format(Fmt, [SUrl, DbName, DocId])), + {ok, Code, _, Body} = test_request:get(Url, []), + {Code, jiffy:decode(Body, [return_maps])}. + + +rep_doc(Source, Target, DocId) -> + rep_doc(Source, Target, DocId, #{}). + + +rep_doc(Source, Target, DocId, #{} = Extra) -> + maps:merge(#{ + <<"_id">> => DocId, + <<"source">> => couch_replicator_test_helper:db_url(Source), + <<"target">> => couch_replicator_test_helper:db_url(Target) + }, Extra). + + +create_doc(DbName, Doc) -> + couch_replicator_test_helper:create_docs(DbName, [Doc]). + + +delete_doc(DbName, DocId) -> + {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), + {ok, Doc} = fabric2_db:open_doc(Db, DocId), + Doc1 = Doc#doc{deleted = true}, + {ok, _} = fabric2_db:update_doc(Db, Doc1, []). + + +read_doc(DbName, DocId) -> + {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), + {ok, Doc} = fabric2_db:open_doc(Db, DocId, [ejson_body]), + Body = Doc#doc.body, + couch_util:json_decode(couch_util:json_encode(Body), [return_maps]). + + +wait_scheduler_docs_state(DbName, DocId, State) -> + test_util:wait(fun() -> + case scheduler_docs(DbName, DocId) of + {200, #{<<"state">> := State} = Res} -> Res; + {_, _} -> wait + end + end, 10000, 250). + + +wait_scheduler_docs_not_found(DbName, DocId) -> + test_util:wait(fun() -> + case scheduler_docs(DbName, DocId) of + {404, _} -> ok; + {_, _} -> wait + end + end, 10000, 250). + + +wait_reschedule_docs_state(DbName, DocId, State) -> + test_util:wait(fun() -> + couch_replicator_job_server:reschedule(), + case scheduler_docs(DbName, DocId) of + {200, #{<<"state">> := State} = Res} -> Res; + {_, _} -> wait + end + end, 10000, 500). + + +wait_doc_state(DbName, DocId, State) -> + test_util:wait(fun() -> + case read_doc(DbName, DocId) of + #{<<"_replication_state">> := State} -> ok; + #{} -> wait + end + end, 10000, 250). diff --git a/src/couch_replicator/test/eunit/couch_replicator_filtered_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_filtered_tests.erl index 7ac9a4d71..4d72c84f2 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_filtered_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_filtered_tests.erl @@ -12,17 +12,20 @@ -module(couch_replicator_filtered_tests). + -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_replicator/src/couch_replicator.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). --define(DDOC, {[ - {<<"_id">>, <<"_design/filter_ddoc">>}, - {<<"filters">>, {[ - {<<"testfilter">>, <<" +-define(DDOC_ID, <<"_design/filter_ddoc">>). +-define(DDOC, #{ + <<"_id">> => ?DDOC_ID, + <<"filters">> => #{ + <<"testfilter">> => <<" function(doc, req){if (doc.class == 'mammal') return true;} - ">>}, - {<<"queryfilter">>, <<" + ">>, + <<"queryfilter">> => <<" function(doc, req) { if (doc.class && req.query.starts) { return doc.class.indexOf(req.query.starts) === 0; @@ -31,99 +34,87 @@ return false; } } - ">>} - ]}}, - {<<"views">>, {[ - {<<"mammals">>, {[ - {<<"map">>, <<" + ">> + }, + <<"views">> => #{ + <<"mammals">> => #{ + <<"map">> => <<" function(doc) { if (doc.class == 'mammal') { emit(doc._id, null); } } - ">>} - ]}} - ]}} -]}). - -setup(_) -> - Ctx = test_util:start_couch([couch_replicator]), - Source = create_db(), - create_docs(Source), - Target = create_db(), - {Ctx, {Source, Target}}. + ">> + } + } +}). -teardown(_, {Ctx, {Source, Target}}) -> - delete_db(Source), - delete_db(Target), - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). filtered_replication_test_() -> - Pairs = [{remote, remote}], { - "Filtered replication tests", + "Replications with filters tests", { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_succeed/2} || Pair <- Pairs] + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(filtered_replication_test), + ?TDEF_FE(query_filtered_replication_test), + ?TDEF_FE(view_filtered_replication_test), + ?TDEF_FE(replication_id_changes_if_filter_changes, 15) + ] + } } }. -query_filtered_replication_test_() -> - Pairs = [{remote, remote}], - { - "Filtered with query replication tests", - { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_succeed_with_query/2} || Pair <- Pairs] - } - }. -view_filtered_replication_test_() -> - Pairs = [{remote, remote}], - { - "Filtered with a view replication tests", - { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_succeed_with_view/2} || Pair <- Pairs] - } - }. +setup() -> + Source = couch_replicator_test_helper:create_db(), + create_docs(Source), + Target = couch_replicator_test_helper:create_db(), + config:set("replicator", "stats_update_interval_sec", "0", false), + config:set("replicator", "interval_sec", "1", false), + {Source, Target}. + + +teardown({Source, Target}) -> + config:delete("replicator", "stats_update_interval_sec", false), + config:delete("replicator", "checkpoint_interval", false), + config:delete("replicator", "interval_sec", false), + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). -should_succeed({From, To}, {_Ctx, {Source, Target}}) -> + +filtered_replication_test({Source, Target}) -> RepObject = {[ - {<<"source">>, db_url(From, Source)}, - {<<"target">>, db_url(To, Target)}, + {<<"source">>, Source}, + {<<"target">>, Target}, {<<"filter">>, <<"filter_ddoc/testfilter">>} ]}, - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), - %% FilteredFun is an Erlang version of following JS function - %% function(doc, req){if (doc.class == 'mammal') return true;} + {ok, _} = couch_replicator_test_helper:replicate(RepObject), FilterFun = fun(_DocId, {Props}) -> couch_util:get_value(<<"class">>, Props) == <<"mammal">> end, {ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target, FilterFun), - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), [ - {"Target DB has proper number of docs", - ?_assertEqual(1, proplists:get_value(doc_count, TargetDbInfo))}, - {"Target DB doesn't have deleted docs", - ?_assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo))}, - {"All the docs filtered as expected", - ?_assert(lists:all(fun(Valid) -> Valid end, AllReplies))} - ]}. - -should_succeed_with_query({From, To}, {_Ctx, {Source, Target}}) -> + ?assertEqual(1, proplists:get_value(doc_count, TargetDbInfo)), + ?assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo)), + ?assert(lists:all(fun(Valid) -> Valid end, AllReplies)). + + +query_filtered_replication_test({Source, Target}) -> RepObject = {[ - {<<"source">>, db_url(From, Source)}, - {<<"target">>, db_url(To, Target)}, + {<<"source">>, Source}, + {<<"target">>, Target}, {<<"filter">>, <<"filter_ddoc/queryfilter">>}, {<<"query_params">>, {[ {<<"starts">>, <<"a">>} ]}} ]}, - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), + {ok, _} = couch_replicator_test_helper:replicate(RepObject), FilterFun = fun(_DocId, {Props}) -> case couch_util:get_value(<<"class">>, Props) of <<"a", _/binary>> -> true; @@ -131,109 +122,144 @@ should_succeed_with_query({From, To}, {_Ctx, {Source, Target}}) -> end end, {ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target, FilterFun), - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), [ - {"Target DB has proper number of docs", - ?_assertEqual(2, proplists:get_value(doc_count, TargetDbInfo))}, - {"Target DB doesn't have deleted docs", - ?_assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo))}, - {"All the docs filtered as expected", - ?_assert(lists:all(fun(Valid) -> Valid end, AllReplies))} - ]}. - -should_succeed_with_view({From, To}, {_Ctx, {Source, Target}}) -> + ?assertEqual(2, proplists:get_value(doc_count, TargetDbInfo)), + ?assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo)), + ?assert(lists:all(fun(Valid) -> Valid end, AllReplies)). + + +view_filtered_replication_test({Source, Target}) -> RepObject = {[ - {<<"source">>, db_url(From, Source)}, - {<<"target">>, db_url(To, Target)}, + {<<"source">>, Source}, + {<<"target">>, Target}, {<<"filter">>, <<"_view">>}, {<<"query_params">>, {[ {<<"view">>, <<"filter_ddoc/mammals">>} ]}} ]}, - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), + {ok, _} = couch_replicator_test_helper:replicate(RepObject), FilterFun = fun(_DocId, {Props}) -> couch_util:get_value(<<"class">>, Props) == <<"mammal">> end, {ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target, FilterFun), - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), [ - {"Target DB has proper number of docs", - ?_assertEqual(1, proplists:get_value(doc_count, TargetDbInfo))}, - {"Target DB doesn't have deleted docs", - ?_assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo))}, - {"All the docs filtered as expected", - ?_assert(lists:all(fun(Valid) -> Valid end, AllReplies))} - ]}. + ?assertEqual(1, proplists:get_value(doc_count, TargetDbInfo)), + ?assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo)), + ?assert(lists:all(fun(Valid) -> Valid end, AllReplies)). + + +replication_id_changes_if_filter_changes({Source, Target}) -> + config:set("replicator", "checkpoint_interval", "500", false), + Rep = {[ + {<<"source">>, Source}, + {<<"target">>, Target}, + {<<"filter">>, <<"filter_ddoc/testfilter">>}, + {<<"continuous">>, true} + ]}, + {ok, _, RepId1} = couch_replicator_test_helper:replicate_continuous(Rep), + + wait_scheduler_docs_written(1), + + ?assertMatch([#{<<"id">> := RepId1}], + couch_replicator_test_helper:scheduler_jobs()), + + FilterFun1 = fun(_, {Props}) -> + couch_util:get_value(<<"class">>, Props) == <<"mammal">> + end, + {ok, TargetDbInfo1, AllReplies1} = compare_dbs(Source, Target, FilterFun1), + ?assertEqual(1, proplists:get_value(doc_count, TargetDbInfo1)), + ?assert(lists:all(fun(Valid) -> Valid end, AllReplies1)), + + {ok, SourceDb} = fabric2_db:open(Source, [?ADMIN_CTX]), + {ok, DDoc1} = fabric2_db:open_doc(SourceDb, ?DDOC_ID), + Flt = <<"function(doc, req) {if (doc.class == 'reptiles') return true};">>, + DDoc2 = DDoc1#doc{body = {[ + {<<"filters">>, {[ + {<<"testfilter">>, Flt} + ]}} + ]}}, + {ok, {_, _}} = fabric2_db:update_doc(SourceDb, DDoc2), + Info = wait_scheduler_repid_change(RepId1), + + RepId2 = maps:get(<<"id">>, Info), + ?assert(RepId1 =/= RepId2), + + wait_scheduler_docs_written(1), + + FilterFun2 = fun(_, {Props}) -> + Class = couch_util:get_value(<<"class">>, Props), + Class == <<"mammal">> orelse Class == <<"reptiles">> + end, + {ok, TargetDbInfo2, AllReplies2} = compare_dbs(Source, Target, FilterFun2), + ?assertEqual(2, proplists:get_value(doc_count, TargetDbInfo2)), + ?assert(lists:all(fun(Valid) -> Valid end, AllReplies2)), + + couch_replicator_test_helper:cancel(RepId2). + compare_dbs(Source, Target, FilterFun) -> - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - {ok, TargetDbInfo} = couch_db:get_db_info(TargetDb), - Fun = fun(FullDocInfo, Acc) -> - {ok, DocId, SourceDoc} = read_doc(SourceDb, FullDocInfo), - TargetReply = read_doc(TargetDb, DocId), - case FilterFun(DocId, SourceDoc) of - true -> - ValidReply = {ok, DocId, SourceDoc} == TargetReply, - {ok, [ValidReply|Acc]}; - false -> - ValidReply = {not_found, missing} == TargetReply, - {ok, [ValidReply|Acc]} + {ok, TargetDb} = fabric2_db:open(Target, [?ADMIN_CTX]), + {ok, TargetDbInfo} = fabric2_db:get_db_info(TargetDb), + Fun = fun(SrcDoc, TgtDoc, Acc) -> + case FilterFun(SrcDoc#doc.id, SrcDoc#doc.body) of + true -> [SrcDoc == TgtDoc | Acc]; + false -> [not_found == TgtDoc | Acc] end end, - {ok, AllReplies} = couch_db:fold_docs(SourceDb, Fun, [], []), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb), - {ok, TargetDbInfo, AllReplies}. - -read_doc(Db, DocIdOrInfo) -> - case couch_db:open_doc(Db, DocIdOrInfo) of - {ok, Doc} -> - {Props} = couch_doc:to_json_obj(Doc, [attachments]), - DocId = couch_util:get_value(<<"_id">>, Props), - {ok, DocId, {Props}}; - Error -> - Error - end. - -create_db() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. + Res = couch_replicator_test_helper:compare_fold(Source, Target, Fun, []), + {ok, TargetDbInfo, Res}. + create_docs(DbName) -> - {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]), - DDoc = couch_doc:from_json_obj(?DDOC), - Doc1 = couch_doc:from_json_obj({[ - {<<"_id">>, <<"doc1">>}, - {<<"class">>, <<"mammal">>}, - {<<"value">>, 1} - - ]}), - Doc2 = couch_doc:from_json_obj({[ - {<<"_id">>, <<"doc2">>}, - {<<"class">>, <<"amphibians">>}, - {<<"value">>, 2} - - ]}), - Doc3 = couch_doc:from_json_obj({[ - {<<"_id">>, <<"doc3">>}, - {<<"class">>, <<"reptiles">>}, - {<<"value">>, 3} - - ]}), - Doc4 = couch_doc:from_json_obj({[ - {<<"_id">>, <<"doc4">>}, - {<<"class">>, <<"arthropods">>}, - {<<"value">>, 2} - - ]}), - {ok, _} = couch_db:update_docs(Db, [DDoc, Doc1, Doc2, Doc3, Doc4]), - couch_db:close(Db). - -delete_db(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]). - -db_url(remote, DbName) -> - Addr = config:get("httpd", "bind_address", "127.0.0.1"), - Port = mochiweb_socket_server:get(couch_httpd, port), - ?l2b(io_lib:format("http://~s:~b/~s", [Addr, Port, DbName])). + couch_replicator_test_helper:create_docs(DbName, [ + ?DDOC, + #{ + <<"_id">> => <<"doc1">>, + <<"class">> => <<"mammal">>, + <<"value">> => 1 + }, + #{ + <<"_id">> => <<"doc2">>, + <<"class">> => <<"amphibians">>, + <<"value">> => 2 + }, + #{ + <<"_id">> => <<"doc3">>, + <<"class">> => <<"reptiles">>, + <<"value">> => 3 + }, + #{ + <<"_id">> => <<"doc4">>, + <<"class">> => <<"arthropods">>, + <<"value">> => 2 + } + ]). + + +wait_scheduler_docs_written(DocsWritten) -> + test_util:wait(fun() -> + case couch_replicator_test_helper:scheduler_jobs() of + [] -> + wait; + [#{<<"info">> := null}] -> + wait; + [#{<<"info">> := Info}] -> + case Info of + #{<<"docs_written">> := DocsWritten} -> Info; + #{} -> wait + end + end + end, 10000, 250). + + +wait_scheduler_repid_change(OldRepId) -> + test_util:wait(fun() -> + case couch_replicator_test_helper:scheduler_jobs() of + [] -> + wait; + [#{<<"id">> := OldRepId}] -> + wait; + [#{<<"id">> := null}] -> + wait; + [#{<<"id">> := NewId} = Info] when is_binary(NewId) -> + Info + end + end, 10000, 250). diff --git a/src/couch_replicator/test/eunit/couch_replicator_httpc_pool_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_httpc_pool_tests.erl index c4ad4e9b6..6c61446cc 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_httpc_pool_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_httpc_pool_tests.erl @@ -12,17 +12,13 @@ -module(couch_replicator_httpc_pool_tests). + -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). - --define(TIMEOUT, 1000). +-include_lib("fabric/test/fabric2_test.hrl"). -setup() -> - spawn_pool(). - -teardown(Pool) -> - stop_pool(Pool). +-define(TIMEOUT, 1000). httpc_pool_test_() -> @@ -30,75 +26,81 @@ httpc_pool_test_() -> "httpc pool tests", { setup, - fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, { foreach, - fun setup/0, fun teardown/1, + fun setup/0, + fun teardown/1, [ - fun should_block_new_clients_when_full/1, - fun should_replace_worker_on_death/1 + ?TDEF_FE(should_block_new_clients_when_full), + ?TDEF_FE(should_replace_worker_on_death) ] } } }. +setup() -> + spawn_pool(). + + +teardown(Pool) -> + stop_pool(Pool). + + should_block_new_clients_when_full(Pool) -> - ?_test(begin - Client1 = spawn_client(Pool), - Client2 = spawn_client(Pool), - Client3 = spawn_client(Pool), + Client1 = spawn_client(Pool), + Client2 = spawn_client(Pool), + Client3 = spawn_client(Pool), + + ?assertEqual(ok, ping_client(Client1)), + ?assertEqual(ok, ping_client(Client2)), + ?assertEqual(ok, ping_client(Client3)), - ?assertEqual(ok, ping_client(Client1)), - ?assertEqual(ok, ping_client(Client2)), - ?assertEqual(ok, ping_client(Client3)), + Worker1 = get_client_worker(Client1, "1"), + Worker2 = get_client_worker(Client2, "2"), + Worker3 = get_client_worker(Client3, "3"), - Worker1 = get_client_worker(Client1, "1"), - Worker2 = get_client_worker(Client2, "2"), - Worker3 = get_client_worker(Client3, "3"), + ?assert(is_process_alive(Worker1)), + ?assert(is_process_alive(Worker2)), + ?assert(is_process_alive(Worker3)), - ?assert(is_process_alive(Worker1)), - ?assert(is_process_alive(Worker2)), - ?assert(is_process_alive(Worker3)), + ?assertNotEqual(Worker1, Worker2), + ?assertNotEqual(Worker2, Worker3), + ?assertNotEqual(Worker3, Worker1), - ?assertNotEqual(Worker1, Worker2), - ?assertNotEqual(Worker2, Worker3), - ?assertNotEqual(Worker3, Worker1), + Client4 = spawn_client(Pool), + ?assertEqual(timeout, ping_client(Client4)), - Client4 = spawn_client(Pool), - ?assertEqual(timeout, ping_client(Client4)), + ?assertEqual(ok, stop_client(Client1)), + ?assertEqual(ok, ping_client(Client4)), - ?assertEqual(ok, stop_client(Client1)), - ?assertEqual(ok, ping_client(Client4)), + Worker4 = get_client_worker(Client4, "4"), + ?assertEqual(Worker1, Worker4), - Worker4 = get_client_worker(Client4, "4"), - ?assertEqual(Worker1, Worker4), + lists:foreach(fun(C) -> + ?assertEqual(ok, stop_client(C)) + end, [Client2, Client3, Client4]). - lists:foreach( - fun(C) -> - ?assertEqual(ok, stop_client(C)) - end, [Client2, Client3, Client4]) - end). should_replace_worker_on_death(Pool) -> - ?_test(begin - Client1 = spawn_client(Pool), - ?assertEqual(ok, ping_client(Client1)), - Worker1 = get_client_worker(Client1, "1"), - ?assert(is_process_alive(Worker1)), + Client1 = spawn_client(Pool), + ?assertEqual(ok, ping_client(Client1)), + Worker1 = get_client_worker(Client1, "1"), + ?assert(is_process_alive(Worker1)), - ?assertEqual(ok, kill_client_worker(Client1)), - ?assertNot(is_process_alive(Worker1)), - ?assertEqual(ok, stop_client(Client1)), + ?assertEqual(ok, kill_client_worker(Client1)), + ?assertNot(is_process_alive(Worker1)), + ?assertEqual(ok, stop_client(Client1)), - Client2 = spawn_client(Pool), - ?assertEqual(ok, ping_client(Client2)), - Worker2 = get_client_worker(Client2, "2"), - ?assert(is_process_alive(Worker2)), + Client2 = spawn_client(Pool), + ?assertEqual(ok, ping_client(Client2)), + Worker2 = get_client_worker(Client2, "2"), + ?assert(is_process_alive(Worker2)), - ?assertNotEqual(Worker1, Worker2), - ?assertEqual(ok, stop_client(Client2)) - end). + ?assertNotEqual(Worker1, Worker2), + ?assertEqual(ok, stop_client(Client2)). spawn_client(Pool) -> @@ -110,6 +112,7 @@ spawn_client(Pool) -> end), {Pid, Ref}. + ping_client({Pid, Ref}) -> Pid ! ping, receive @@ -119,18 +122,18 @@ ping_client({Pid, Ref}) -> timeout end. + get_client_worker({Pid, Ref}, ClientName) -> Pid ! get_worker, receive {worker, Ref, Worker} -> Worker after ?TIMEOUT -> - erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, "Timeout getting client " ++ ClientName ++ " worker"}]}) + erlang:error({assertion_failed, [{module, ?MODULE}, {line, ?LINE}, + {reason, "Timeout getting client " ++ ClientName ++ " worker"}]}) end. + stop_client({Pid, Ref}) -> Pid ! stop, receive @@ -140,6 +143,7 @@ stop_client({Pid, Ref}) -> timeout end. + kill_client_worker({Pid, Ref}) -> Pid ! get_worker, receive @@ -150,6 +154,7 @@ kill_client_worker({Pid, Ref}) -> timeout end. + loop(Parent, Ref, Worker, Pool) -> receive ping -> @@ -163,12 +168,14 @@ loop(Parent, Ref, Worker, Pool) -> Parent ! {stop, Ref} end. + spawn_pool() -> - Host = config:get("httpd", "bind_address", "127.0.0.1"), - Port = config:get("httpd", "port", "5984"), + Host = config:get("chttpd", "bind_address", "127.0.0.1"), + Port = config:get("chttpd", "port", "5984"), {ok, Pool} = couch_replicator_httpc_pool:start_link( "http://" ++ Host ++ ":" ++ Port, [{max_connections, 3}]), Pool. + stop_pool(Pool) -> ok = couch_replicator_httpc_pool:stop(Pool). diff --git a/src/couch_replicator/test/eunit/couch_replicator_id_too_long_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_id_too_long_tests.erl index a4696c4b8..3a0e6f7bd 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_id_too_long_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_id_too_long_tests.erl @@ -15,76 +15,57 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_replicator/src/couch_replicator.hrl"). - - -setup(_) -> - Ctx = test_util:start_couch([couch_replicator]), - Source = create_db(), - create_doc(Source), - Target = create_db(), - {Ctx, {Source, Target}}. - - -teardown(_, {Ctx, {Source, Target}}) -> - delete_db(Source), - delete_db(Target), - config:set("replicator", "max_document_id_length", "infinity"), - ok = test_util:stop_couch(Ctx). +-include_lib("fabric/test/fabric2_test.hrl"). id_too_long_replication_test_() -> - Pairs = [{remote, remote}], { "Doc id too long tests", { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_succeed/2} || Pair <- Pairs] ++ - [{Pair, fun should_fail/2} || Pair <- Pairs] + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(should_succeed), + ?TDEF_FE(should_fail) + + ] + } } }. -should_succeed({From, To}, {_Ctx, {Source, Target}}) -> - RepObject = {[ - {<<"source">>, db_url(From, Source)}, - {<<"target">>, db_url(To, Target)} - ]}, - config:set("replicator", "max_document_id_length", "5"), - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), - ?_assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)). - +setup() -> + Source = couch_replicator_test_helper:create_db(), + create_doc(Source), + Target = couch_replicator_test_helper:create_db(), + {Source, Target}. -should_fail({From, To}, {_Ctx, {Source, Target}}) -> - RepObject = {[ - {<<"source">>, db_url(From, Source)}, - {<<"target">>, db_url(To, Target)} - ]}, - config:set("replicator", "max_document_id_length", "4"), - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), - ?_assertError({badmatch, {not_found, missing}}, - couch_replicator_test_helper:compare_dbs(Source, Target)). +teardown({Source, Target}) -> + config:delete("replicator", "max_document_id_length", false), + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). -create_db() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. +should_succeed({Source, Target}) -> + config:set("replicator", "max_document_id_length", "5", false), + {ok, _} = couch_replicator_test_helper:replicate(Source, Target), + ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)). -create_doc(DbName) -> - {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]), - Doc = couch_doc:from_json_obj({[{<<"_id">>, <<"12345">>}]}), - {ok, _} = couch_db:update_doc(Db, Doc, []), - couch_db:close(Db). +should_fail({Source, Target}) -> + config:set("replicator", "max_document_id_length", "4", false), + {ok, _} = couch_replicator_test_helper:replicate(Source, Target), + ExceptIds = [<<"12345">>], + ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target, + ExceptIds)). -delete_db(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]). - -db_url(remote, DbName) -> - Addr = config:get("httpd", "bind_address", "127.0.0.1"), - Port = mochiweb_socket_server:get(couch_httpd, port), - ?l2b(io_lib:format("http://~s:~b/~s", [Addr, Port, DbName])). +create_doc(DbName) -> + Docs = [#{<<"_id">> => <<"12345">>}], + couch_replicator_test_helper:create_docs(DbName, Docs). diff --git a/src/couch_replicator/test/eunit/couch_replicator_job_server_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_job_server_tests.erl new file mode 100644 index 000000000..698a84400 --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_job_server_tests.erl @@ -0,0 +1,437 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_job_server_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). + + +-define(SHUTDOWN_TIMEOUT, 1000). +-define(JOB_SERVER, couch_replicator_job_server). + + +job_server_test_() -> + { + "Test job server", + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(should_start_up), + ?TDEF_FE(reschedule_resets_timer), + ?TDEF_FE(reschedule_reads_config), + ?TDEF_FE(acceptors_spawned_if_pending), + ?TDEF_FE(acceptors_not_spawned_if_no_pending), + ?TDEF_FE(acceptors_not_spawned_if_no_max_churn), + ?TDEF_FE(acceptors_not_spawned_if_no_churn_budget), + ?TDEF_FE(acceptors_spawned_on_acceptor_exit), + ?TDEF_FE(acceptor_turns_into_worker), + ?TDEF_FE(acceptors_spawned_on_worker_exit), + ?TDEF_FE(excess_acceptors_spawned), + ?TDEF_FE(excess_workers_trimmed_on_reschedule), + ?TDEF_FE(recent_workers_are_not_stopped) + ] + } + } + }. + + +setup_all() -> + Ctx = test_util:start_couch(), + meck:new(couch_replicator_job_server, [passthrough]), + mock_pending(0), + meck:expect(couch_replicator_jobs, set_timeout, 0, ok), + meck:expect(couch_replicator_jobs, fold_jobs, 3, ok), + meck:expect(couch_replicator_job, start_link, fun() -> + {ok, spawn_link(fun() -> start_job() end)} + end), + Ctx. + + +teardown_all(Ctx) -> + meck:unload(), + config_delete("interval_sec"), + config_delete("max_acceptors"), + config_delete("max_jobs"), + config_delete("max_churn"), + config_delete("min_run_time_sec"), + config_delete("transient_job_max_age_sec"), + test_util:stop_couch(Ctx). + + +setup() -> + config_set("interval_sec", "99999"), + config_set("max_acceptors", "0"), + config_set("max_jobs", "0"), + config_set("max_churn", "1"), + config_set("min_run_time_sec", "0"), + config_set("transient_job_max_age_sec", "99999"), + + mock_pending(0), + + {ok, SPid} = ?JOB_SERVER:start_link(?SHUTDOWN_TIMEOUT), + SPid. + + +teardown(SPid) when is_pid(SPid) -> + unlink(SPid), + Ref = monitor(process, SPid), + exit(SPid, kill), + receive {'DOWN', Ref, _, _, _} -> ok end, + + meck:reset(couch_replicator_jobs), + meck:reset(couch_replicator_job), + meck:reset(couch_replicator_job_server), + + config_delete("interval_sec"), + config_delete("max_acceptors"), + config_delete("max_jobs"), + config_delete("max_churn"), + config_delete("min_run_time_sec"), + config_delete("transient_job_max_age_sec"). + + +should_start_up(SPid) -> + ?assert(is_process_alive(SPid)), + ?assertEqual(SPid, whereis(?JOB_SERVER)), + State = sys:get_state(?JOB_SERVER), + #{ + acceptors := #{}, + workers := #{}, + churn := 0, + config := Config, + timer := Timer, + timeout := ?SHUTDOWN_TIMEOUT + } = State, + + % Make sure it read the config + ?assertMatch(#{ + max_acceptors := 0, + interval_sec := 99999, + max_jobs := 0, + max_churn := 1, + min_run_time_sec := 0, + transient_job_max_age_sec := 99999 + }, Config), + + % Timer was set up + ?assert(is_reference(Timer)), + ?assert(is_integer(erlang:read_timer(Timer))). + + +reschedule_resets_timer(_) -> + #{timer := OldTimer} = sys:get_state(?JOB_SERVER), + + ?assertEqual(ok, ?JOB_SERVER:reschedule()), + + #{timer := Timer} = sys:get_state(?JOB_SERVER), + ?assert(is_reference(Timer)), + ?assert(Timer =/= OldTimer). + + +reschedule_reads_config(_) -> + config_set("interval_sec", "99998"), + + ?JOB_SERVER:reschedule(), + + #{config := Config} = sys:get_state(?JOB_SERVER), + ?assertMatch(#{interval_sec := 99998}, Config). + + +acceptors_spawned_if_pending(_) -> + config_set("max_acceptors", "1"), + mock_pending(1), + + ?JOB_SERVER:reschedule(), + + ?assertMatch([Pid] when is_pid(Pid), acceptors()). + + +acceptors_not_spawned_if_no_pending(_) -> + config_set("max_acceptors", "1"), + mock_pending(0), + + ?JOB_SERVER:reschedule(), + + ?assertEqual([], acceptors()). + + +acceptors_not_spawned_if_no_max_churn(_) -> + config_set("max_churn", "0"), + config_set("max_acceptors", "1"), + mock_pending(1), + + ?JOB_SERVER:reschedule(), + + ?assertEqual([], acceptors()). + + +acceptors_not_spawned_if_no_churn_budget(_) -> + config_set("max_churn", "1"), + config_set("max_acceptors", "1"), + mock_pending(0), + + % To read the config + ?JOB_SERVER:reschedule(), + + ?assertEqual([], acceptors()), + + mock_pending(1), + + % Exhaust churn budget + sys:replace_state(couch_replicator_job_server, fun(#{} = St) -> + St#{churn := 1} + end), + + ?JOB_SERVER:reschedule(), + + ?assertEqual([], acceptors()). + + +acceptors_spawned_on_acceptor_exit(_) -> + config_set("max_acceptors", "3"), + config_set("max_jobs", "4"), + mock_pending(1), + + ?JOB_SERVER:reschedule(), + + [A1] = acceptors(), + + exit(A1, kill), + meck:wait(?JOB_SERVER, handle_info, [{'EXIT', A1, killed}, '_'], 2000), + + ?assertEqual(3, length(acceptors())). + + +acceptor_turns_into_worker(_) -> + config_set("max_acceptors", "3"), + config_set("max_jobs", "4"), + mock_pending(1), + + ?JOB_SERVER:reschedule(), + + [A1] = acceptors(), + accept_job(A1, true), + ?assertEqual(3, length(acceptors())), + #{workers := Workers} = sys:get_state(?JOB_SERVER), + ?assertMatch([{A1, {true, _}}], maps:to_list(Workers)). + + +acceptors_spawned_on_worker_exit(_) -> + config_set("max_acceptors", "1"), + config_set("max_jobs", "1"), + mock_pending(1), + + ?JOB_SERVER:reschedule(), + + [A1] = acceptors(), + accept_job(A1, true), + + % Since max_jobs = 1 no more acceptors are spawned + ?assertEqual(0, length(acceptors())), + + % Same acceptor process is now a worker + ?assertEqual([A1], workers()), + + exit(A1, shutdown), + meck:wait(?JOB_SERVER, handle_info, [{'EXIT', A1, shutdown}, '_'], 2000), + + % New acceptor process started + ?assertEqual(1, length(acceptors())), + ?assertEqual(0, length(workers())). + + +excess_acceptors_spawned(_) -> + config_set("max_acceptors", "2"), + config_set("max_churn", "3"), + config_set("max_jobs", "4"), + mock_pending(100), + + ?JOB_SERVER:reschedule(), + + ?assertEqual(3, length(acceptors())), + + accept_all(), + + ?assertEqual(3, length(workers())), + ?assertEqual(1, length(acceptors())), + % Check that the churn budget was consumed + ?assertMatch(#{churn := 3}, sys:get_state(?JOB_SERVER)), + + accept_all(), + + % No more acceptors spawned after reaching max_jobs + ?assertEqual(0, length(acceptors())), + ?assertEqual(4, length(workers())), + + ?JOB_SERVER:reschedule(), + + % Since all churn budget was consumed, no new acceptors should have beens + % spawned this cycle but churn budget should have been reset + ?assertEqual(0, length(acceptors())), + ?assertEqual(4, length(workers())), + ?assertMatch(#{churn := 0}, sys:get_state(?JOB_SERVER)), + + ?JOB_SERVER:reschedule(), + + % Should have spawned 3 excess acceptors + ?assertEqual(3, length(acceptors())), + ?assertEqual(4, length(workers())), + + accept_all(), + + % Running with an excess number of workers + ?assertEqual(0, length(acceptors())), + ?assertEqual(7, length(workers())). + + +excess_workers_trimmed_on_reschedule(_) -> + config_set("max_acceptors", "2"), + config_set("max_churn", "3"), + config_set("max_jobs", "4"), + mock_pending(100), + + ?JOB_SERVER:reschedule(), + + [A1, A2, A3] = acceptors(), + accept_job(A1, true), + accept_job(A2, false), + accept_job(A3, false), + [A4] = acceptors(), + accept_job(A4, true), + + ?JOB_SERVER:reschedule(), + + % First reschedule was to reset the churn budget, this next one is to spawn + % an excess number of acceptors. + ?JOB_SERVER:reschedule(), + + [A5, A6, A7] = acceptors(), + accept_job(A5, true), + accept_job(A6, false), + accept_job(A7, false), + + ?assertEqual(7, length(workers())), + + % Running with an excess number of workers. These should be trimmed on the + % during the next cycle + ?JOB_SERVER:reschedule(), + + Workers = workers(), + ?assertEqual(4, length(Workers)), + ?assertEqual(0, length(acceptors())), + + % Check that A1 and A4 were skipped since they are not continuous + ?assertEqual(Workers, Workers -- [A2, A3, A6]). + + +recent_workers_are_not_stopped(_) -> + config_set("max_acceptors", "2"), + config_set("max_churn", "3"), + config_set("max_jobs", "4"), + mock_pending(100), + + ?JOB_SERVER:reschedule(), + + [A1, A2, A3] = acceptors(), + accept_job(A1, true), + accept_job(A2, false), + accept_job(A3, false), + [A4] = acceptors(), + accept_job(A4, true), + + ?JOB_SERVER:reschedule(), + + % First reschedule was to reset the churn budget, this next one is to spawn + % an excess number of acceptors. + ?JOB_SERVER:reschedule(), + + [A5, A6, A7] = acceptors(), + accept_job(A5, true), + accept_job(A6, false), + accept_job(A7, false), + + ?assertEqual(7, length(workers())), + + % Running with an excess number of workers. But they won't be stopped on + % reschedule if they ran for a period less than min_run_time_sec during the + % next cycle + config_set("min_run_time_sec", "9999"), + + % don't want to start new acceptors anymore + mock_pending(0), + config_set("max_acceptors", "0"), + + ?JOB_SERVER:reschedule(), + + ?assertEqual(7, length(workers())), + ?assertEqual(0, length(acceptors())), + + config_set("min_run_time_sec", "0"), + + ?JOB_SERVER:reschedule(), + + ?assertEqual(4, length(workers())), + ?assertEqual(0, length(acceptors())). + + +config_set(K, V) -> + config:set("replicator", K, V, _Persist = false). + + +config_delete(K) -> + config:delete("replicator", K, _Persist = false). + + +mock_pending(N) -> + meck:expect(couch_replicator_jobs, pending_count, 2, N). + + +acceptors() -> + #{acceptors := Acceptors} = sys:get_state(?JOB_SERVER), + maps:keys(Acceptors). + + +workers() -> + #{workers := Workers} = sys:get_state(?JOB_SERVER), + maps:keys(Workers). + + +accept_job(APid, Normal) -> + APid ! {accept_job, Normal, self()}, + receive + {job_accepted, APid} -> ok + after + 5000 -> + error(test_job_accept_timeout) + end. + + +accept_all() -> + [accept_job(APid, true) || APid <- acceptors()]. + + +start_job() -> + receive + {accept_job, Normal, From} -> + ok = ?JOB_SERVER:accepted(self(), Normal), + From ! {job_accepted, self()}, + start_job(); + {exit_job, ExitSig} -> + exit(ExitSig) + end. diff --git a/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl index 27c89a0cd..fcbdf229f 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl @@ -14,12 +14,8 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). --import(couch_replicator_test_helper, [ - db_url/1, - replicate/2, - compare_dbs/2 -]). -define(ATT_SIZE_1, 2 * 1024 * 1024). -define(ATT_SIZE_2, round(6.6 * 1024 * 1024)). @@ -27,90 +23,65 @@ -define(TIMEOUT_EUNIT, 120). -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. - -setup(remote) -> - {remote, setup()}; -setup({A, B}) -> - Ctx = test_util:start_couch([couch_replicator]), - config:set("attachments", "compressible_types", "text/*", false), - Source = setup(A), - Target = setup(B), - {Ctx, {Source, Target}}. - -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]), - ok. - -teardown(_, {Ctx, {Source, Target}}) -> - teardown(Source), - teardown(Target), - - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). - large_atts_test_() -> - Pairs = [{remote, remote}], { - "Replicate docs with large attachments", + "Large attachment replication test", { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_populate_replicate_compact/2} - || Pair <- Pairs] + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(should_replicate_attachments, 120) + ] + } } }. -should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [should_populate_source(Source), - should_replicate(Source, Target), - should_compare_databases(Source, Target)]}}. +setup() -> + AttCfg = config:get("attachments", "compressible_types"), + config:set("attachments", "compressible_types", "text/*", false), + Source = couch_replicator_test_helper:create_db(), + ok = populate_db(Source, ?DOCS_COUNT), + Target = couch_replicator_test_helper:create_db(), + {AttCfg, Source, Target}. + -should_populate_source({remote, Source}) -> - should_populate_source(Source); -should_populate_source(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source, ?DOCS_COUNT))}. +teardown({AttCfg, Source, Target}) -> + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target), + case AttCfg of + undefined -> + config:delete("attachments", "compressible_types", false); + _ -> + config:set("attachments", "compressible_types", AttCfg) + end. -should_replicate({remote, Source}, Target) -> - should_replicate(db_url(Source), Target); -should_replicate(Source, {remote, Target}) -> - should_replicate(Source, db_url(Target)); -should_replicate(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}. -should_compare_databases({remote, Source}, Target) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, {remote, Target}) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target))}. +should_replicate_attachments({_AttCfg, Source, Target}) -> + ?assertMatch({ok, _}, + couch_replicator_test_helper:replicate(Source, Target)), + ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)). populate_db(DbName, DocCount) -> - {ok, Db} = couch_db:open_int(DbName, []), - Docs = lists:foldl( - fun(DocIdCounter, Acc) -> - Doc = #doc{ - id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]), - body = {[]}, - atts = [ - att(<<"att1">>, ?ATT_SIZE_1, <<"text/plain">>), - att(<<"att2">>, ?ATT_SIZE_2, <<"app/binary">>) - ] - }, - [Doc | Acc] - end, - [], lists:seq(1, DocCount)), - {ok, _} = couch_db:update_docs(Db, Docs, []), - couch_db:close(Db). + Docs = lists:foldl(fun(DocIdCounter, Acc) -> + Doc = #doc{ + id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]), + body = {[]}, + atts = [ + att(<<"att1">>, ?ATT_SIZE_1, <<"text/plain">>), + att(<<"att2">>, ?ATT_SIZE_2, <<"app/binary">>) + ] + }, + [Doc | Acc] + end, [], lists:seq(1, DocCount)), + couch_replicator_test_helper:create_docs(DbName, Docs). + att(Name, Size, Type) -> couch_att:new([ diff --git a/src/couch_replicator/test/eunit/couch_replicator_many_leaves_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_many_leaves_tests.erl index c7933b472..3dbfa6aba 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_many_leaves_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_many_leaves_tests.erl @@ -14,11 +14,8 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). --import(couch_replicator_test_helper, [ - db_url/1, - replicate/2 -]). -define(DOCS_CONFLICTS, [ {<<"doc1">>, 10}, @@ -28,178 +25,150 @@ {<<"doc3">>, 210} ]). -define(NUM_ATTS, 2). --define(TIMEOUT_EUNIT, 60). -define(i2l(I), integer_to_list(I)). -define(io2b(Io), iolist_to_binary(Io)). -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. - - -setup(remote) -> - {remote, setup()}; -setup({A, B}) -> - Ctx = test_util:start_couch([couch_replicator]), - Source = setup(A), - Target = setup(B), - {Ctx, {Source, Target}}. - -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]), - ok. - -teardown(_, {Ctx, {Source, Target}}) -> - teardown(Source), - teardown(Target), - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). docs_with_many_leaves_test_() -> - Pairs = [{remote, remote}], { "Replicate documents with many leaves", { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_populate_replicate_compact/2} - || Pair <- Pairs] + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(should_replicate_doc_with_many_leaves, 180) + ] + } } }. -should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [ - should_populate_source(Source), - should_replicate(Source, Target), - should_verify_target(Source, Target), - should_add_attachments_to_source(Source), - should_replicate(Source, Target), - should_verify_target(Source, Target) - ]}}. - -should_populate_source({remote, Source}) -> - should_populate_source(Source); -should_populate_source(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source))}. - -should_replicate({remote, Source}, Target) -> - should_replicate(db_url(Source), Target); -should_replicate(Source, {remote, Target}) -> - should_replicate(Source, db_url(Target)); -should_replicate(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}. - -should_verify_target({remote, Source}, Target) -> - should_verify_target(Source, Target); -should_verify_target(Source, {remote, Target}) -> - should_verify_target(Source, Target); -should_verify_target(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(begin - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - verify_target(SourceDb, TargetDb, ?DOCS_CONFLICTS), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb) - end)}. - -should_add_attachments_to_source({remote, Source}) -> - should_add_attachments_to_source(Source); -should_add_attachments_to_source(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(begin - {ok, SourceDb} = couch_db:open_int(Source, [?ADMIN_CTX]), - add_attachments(SourceDb, ?NUM_ATTS, ?DOCS_CONFLICTS), - ok = couch_db:close(SourceDb) - end)}. +setup() -> + Source = couch_replicator_test_helper:create_db(), + populate_db(Source), + Target = couch_replicator_test_helper:create_db(), + {Source, Target}. + + +teardown({Source, Target}) -> + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). + + +should_replicate_doc_with_many_leaves({Source, Target}) -> + replicate(Source, Target), + {ok, SourceDb} = fabric2_db:open(Source, [?ADMIN_CTX]), + {ok, TargetDb} = fabric2_db:open(Target, [?ADMIN_CTX]), + verify_target(SourceDb, TargetDb, ?DOCS_CONFLICTS), + add_attachments(SourceDb, ?NUM_ATTS, ?DOCS_CONFLICTS), + replicate(Source, Target), + verify_target(SourceDb, TargetDb, ?DOCS_CONFLICTS). + populate_db(DbName) -> - {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), - lists:foreach( - fun({DocId, NumConflicts}) -> - Value = <<"0">>, - Doc = #doc{ - id = DocId, - body = {[ {<<"value">>, Value} ]} - }, - {ok, _} = couch_db:update_doc(Db, Doc, [?ADMIN_CTX]), - {ok, _} = add_doc_siblings(Db, DocId, NumConflicts) - end, ?DOCS_CONFLICTS), - couch_db:close(Db). - -add_doc_siblings(Db, DocId, NumLeaves) when NumLeaves > 0 -> + {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), + lists:foreach(fun({DocId, NumConflicts}) -> + Doc = #doc{ + id = DocId, + body = {[{<<"value">>, <<"0">>}]} + }, + {ok, _} = fabric2_db:update_doc(Db, Doc), + {ok, _} = add_doc_siblings(Db, DocId, NumConflicts) + end, ?DOCS_CONFLICTS). + + +add_doc_siblings(#{} = Db, DocId, NumLeaves) when NumLeaves > 0 -> add_doc_siblings(Db, DocId, NumLeaves, [], []). -add_doc_siblings(Db, _DocId, 0, AccDocs, AccRevs) -> - {ok, []} = couch_db:update_docs(Db, AccDocs, [], replicated_changes), + +add_doc_siblings(#{} = Db, _DocId, 0, AccDocs, AccRevs) -> + {ok, []} = fabric2_db:update_docs(Db, AccDocs, [replicated_changes]), {ok, AccRevs}; -add_doc_siblings(Db, DocId, NumLeaves, AccDocs, AccRevs) -> +add_doc_siblings(#{} = Db, DocId, NumLeaves, AccDocs, AccRevs) -> Value = ?l2b(?i2l(NumLeaves)), Rev = couch_hash:md5_hash(Value), Doc = #doc{ id = DocId, revs = {1, [Rev]}, - body = {[ {<<"value">>, Value} ]} + body = {[{<<"value">>, Value}]} }, add_doc_siblings(Db, DocId, NumLeaves - 1, - [Doc | AccDocs], [{1, Rev} | AccRevs]). + [Doc | AccDocs], [{1, Rev} | AccRevs]). + verify_target(_SourceDb, _TargetDb, []) -> ok; -verify_target(SourceDb, TargetDb, [{DocId, NumConflicts} | Rest]) -> - {ok, SourceLookups} = couch_db:open_doc_revs( - SourceDb, - DocId, - all, - [conflicts, deleted_conflicts]), - {ok, TargetLookups} = couch_db:open_doc_revs( - TargetDb, - DocId, - all, - [conflicts, deleted_conflicts]), + +verify_target(#{} = SourceDb, #{} = TargetDb, + [{DocId, NumConflicts} | Rest]) -> + Opts = [conflicts, deleted_conflicts], + {ok, SourceLookups} = open_doc_revs(SourceDb, DocId, Opts), + {ok, TargetLookups} = open_doc_revs(TargetDb, DocId, Opts), SourceDocs = [Doc || {ok, Doc} <- SourceLookups], TargetDocs = [Doc || {ok, Doc} <- TargetLookups], Total = NumConflicts + 1, ?assertEqual(Total, length(TargetDocs)), - lists:foreach( - fun({SourceDoc, TargetDoc}) -> - SourceJson = couch_doc:to_json_obj(SourceDoc, [attachments]), - TargetJson = couch_doc:to_json_obj(TargetDoc, [attachments]), - ?assertEqual(SourceJson, TargetJson) - end, - lists:zip(SourceDocs, TargetDocs)), + lists:foreach(fun({SourceDoc, TargetDoc}) -> + ?assertEqual(json_doc(SourceDoc), json_doc(TargetDoc)) + end, lists:zip(SourceDocs, TargetDocs)), verify_target(SourceDb, TargetDb, Rest). -add_attachments(_SourceDb, _NumAtts, []) -> + +add_attachments(_SourceDb, _NumAtts, []) -> ok; -add_attachments(SourceDb, NumAtts, [{DocId, NumConflicts} | Rest]) -> - {ok, SourceLookups} = couch_db:open_doc_revs(SourceDb, DocId, all, []), + +add_attachments(#{} = SourceDb, NumAtts, + [{DocId, NumConflicts} | Rest]) -> + {ok, SourceLookups} = open_doc_revs(SourceDb, DocId, []), SourceDocs = [Doc || {ok, Doc} <- SourceLookups], Total = NumConflicts + 1, ?assertEqual(Total, length(SourceDocs)), - NewDocs = lists:foldl( - fun(#doc{atts = Atts, revs = {Pos, [Rev | _]}} = Doc, Acc) -> + NewDocs = lists:foldl(fun + (#doc{atts = Atts, revs = {Pos, [Rev | _]}} = Doc, Acc) -> NewAtts = lists:foldl(fun(I, AttAcc) -> - AttData = crypto:strong_rand_bytes(100), - NewAtt = couch_att:new([ - {name, ?io2b(["att_", ?i2l(I), "_", - couch_doc:rev_to_str({Pos, Rev})])}, - {type, <<"application/foobar">>}, - {att_len, byte_size(AttData)}, - {data, AttData} - ]), - [NewAtt | AttAcc] + [att(I, {Pos, Rev}, 100) | AttAcc] end, [], lists:seq(1, NumAtts)), [Doc#doc{atts = Atts ++ NewAtts} | Acc] - end, - [], SourceDocs), - {ok, UpdateResults} = couch_db:update_docs(SourceDb, NewDocs, []), - NewRevs = [R || {ok, R} <- UpdateResults], - ?assertEqual(length(NewDocs), length(NewRevs)), + end, [], SourceDocs), + lists:foreach(fun(#doc{} = Doc) -> + ?assertMatch({ok, _}, fabric2_db:update_doc(SourceDb, Doc)) + end, NewDocs), add_attachments(SourceDb, NumAtts, Rest). + +att(I, PosRev, Size) -> + Name = ?io2b(["att_", ?i2l(I), "_", couch_doc:rev_to_str(PosRev)]), + AttData = crypto:strong_rand_bytes(Size), + couch_att:new([ + {name, Name}, + {type, <<"application/foobar">>}, + {att_len, byte_size(AttData)}, + {data, AttData} + ]). + + +open_doc_revs(#{} = Db, DocId, Opts) -> + fabric2_db:open_doc_revs(Db, DocId, all, Opts). + + +json_doc(#doc{} = Doc) -> + couch_doc:to_json_obj(Doc, [attachments]). + + +replicate(Source, Target) -> + % Serialize the concurrent updates of the same document in order + % to prevent having to set higher timeouts due to FDB conflicts + RepObject = #{ + <<"source">> => Source, + <<"target">> => Target, + <<"worker_processes">> => 1, + <<"http_connections">> => 1 + }, + ?assertMatch({ok, _}, + couch_replicator_test_helper:replicate(RepObject)). diff --git a/src/couch_replicator/test/eunit/couch_replicator_missing_stubs_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_missing_stubs_tests.erl index ff08b5ee5..e672c76b7 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_missing_stubs_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_missing_stubs_tests.erl @@ -14,103 +14,59 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). --import(couch_replicator_test_helper, [ - db_url/1, - replicate/2, - compare_dbs/2 -]). -define(REVS_LIMIT, 3). --define(TIMEOUT_EUNIT, 30). -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. - -setup(remote) -> - {remote, setup()}; -setup({A, B}) -> - Ctx = test_util:start_couch([couch_replicator]), - Source = setup(A), - Target = setup(B), - {Ctx, {Source, Target}}. - -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]), - ok. - -teardown(_, {Ctx, {Source, Target}}) -> - teardown(Source), - teardown(Target), - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). missing_stubs_test_() -> - Pairs = [{remote, remote}], { "Replicate docs with missing stubs (COUCHDB-1365)", { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_replicate_docs_with_missed_att_stubs/2} - || Pair <- Pairs] + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(should_replicate_docs_with_missed_att_stubs, 60) + ] + } } }. -should_replicate_docs_with_missed_att_stubs({From, To}, {_Ctx, {Source, Target}}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [ - should_populate_source(Source), - should_set_target_revs_limit(Target, ?REVS_LIMIT), - should_replicate(Source, Target), - should_compare_databases(Source, Target), - should_update_source_docs(Source, ?REVS_LIMIT * 2), - should_replicate(Source, Target), - should_compare_databases(Source, Target) - ]}}. - -should_populate_source({remote, Source}) -> - should_populate_source(Source); -should_populate_source(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source))}. - -should_replicate({remote, Source}, Target) -> - should_replicate(db_url(Source), Target); -should_replicate(Source, {remote, Target}) -> - should_replicate(Source, db_url(Target)); -should_replicate(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}. - -should_set_target_revs_limit({remote, Target}, RevsLimit) -> - should_set_target_revs_limit(Target, RevsLimit); -should_set_target_revs_limit(Target, RevsLimit) -> - ?_test(begin - {ok, Db} = couch_db:open_int(Target, [?ADMIN_CTX]), - ?assertEqual(ok, couch_db:set_revs_limit(Db, RevsLimit)), - ok = couch_db:close(Db) - end). - -should_compare_databases({remote, Source}, Target) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, {remote, Target}) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target))}. - -should_update_source_docs({remote, Source}, Times) -> - should_update_source_docs(Source, Times); -should_update_source_docs(Source, Times) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(update_db_docs(Source, Times))}. +setup() -> + Source = couch_replicator_test_helper:create_db(), + populate_db(Source), + Target = couch_replicator_test_helper:create_db(), + {Source, Target}. + + +teardown({Source, Target}) -> + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). + + +should_replicate_docs_with_missed_att_stubs({Source, Target}) -> + {ok, TargetDb} = fabric2_db:open(Target, [?ADMIN_CTX]), + ?assertEqual(ok, fabric2_db:set_revs_limit(TargetDb, ?REVS_LIMIT)), + + ?assertMatch({ok, _}, + couch_replicator_test_helper:replicate(Source, Target)), + ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)), + + ok = update_db_docs(Source, ?REVS_LIMIT * 2), + + ?assertMatch({ok, _}, + couch_replicator_test_helper:replicate(Source, Target)), + ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)). populate_db(DbName) -> - {ok, Db} = couch_db:open_int(DbName, []), AttData = crypto:strong_rand_bytes(6000), Doc = #doc{ id = <<"doc1">>, @@ -120,35 +76,40 @@ populate_db(DbName) -> {type, <<"application/foobar">>}, {att_len, byte_size(AttData)}, {data, AttData} - ]) + ]) ] }, - {ok, _} = couch_db:update_doc(Db, Doc, []), - couch_db:close(Db). + couch_replicator_test_helper:create_docs(DbName, [Doc]). + update_db_docs(DbName, Times) -> - {ok, Db} = couch_db:open_int(DbName, []), - {ok, _} = couch_db:fold_docs( - Db, - fun(FDI, Acc) -> db_fold_fun(FDI, Acc) end, - {DbName, Times}, - []), - ok = couch_db:close(Db). - -db_fold_fun(FullDocInfo, {DbName, Times}) -> - {ok, Db} = couch_db:open_int(DbName, []), - {ok, Doc} = couch_db:open_doc(Db, FullDocInfo), - lists:foldl( - fun(_, {Pos, RevId}) -> - {ok, Db2} = couch_db:reopen(Db), - NewDocVersion = Doc#doc{ - revs = {Pos, [RevId]}, - body = {[{<<"value">>, base64:encode(crypto:strong_rand_bytes(100))}]} - }, - {ok, NewRev} = couch_db:update_doc(Db2, NewDocVersion, []), - NewRev - end, - {element(1, Doc#doc.revs), hd(element(2, Doc#doc.revs))}, - lists:seq(1, Times)), - ok = couch_db:close(Db), - {ok, {DbName, Times}}. + {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), + FoldFun = fun + ({meta, _Meta}, Acc) -> + {ok, Acc}; + (complete, Acc) -> + {ok, Acc}; + ({row, Row}, Acc) -> + {_, DocId} = lists:keyfind(id, 1, Row), + ok = update_doc(DbName, DocId, Times), + {ok, Acc} + end, + Opts = [{restart_tx, true}], + {ok, _} = fabric2_db:fold_docs(Db, FoldFun, ok, Opts), + ok. + + +update_doc(_DbName, _DocId, 0) -> + ok; + +update_doc(DbName, DocId, Times) -> + {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), + {ok, Doc} = fabric2_db:open_doc(Db, DocId, []), + #doc{revs = {Pos, [Rev | _]}} = Doc, + Val = base64:encode(crypto:strong_rand_bytes(100)), + Doc1 = Doc#doc{ + revs = {Pos, [Rev]}, + body = {[{<<"value">>, Val}]} + }, + {ok, _} = fabric2_db:update_doc(Db, Doc1), + update_doc(DbName, DocId, Times - 1). diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl index da46b8a26..f5e745d90 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl @@ -14,15 +14,7 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch_replicator/src/couch_replicator.hrl"). --include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). - - -setup() -> - ok. - - -teardown(_) -> - ok. +-include_lib("fabric/test/fabric2_test.hrl"). replicator_proxy_test_() -> @@ -30,87 +22,78 @@ replicator_proxy_test_() -> "replicator proxy tests", { setup, - fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1, - { - foreach, - fun setup/0, fun teardown/1, - [ - fun parse_rep_doc_without_proxy/1, - fun parse_rep_doc_with_proxy/1, - fun parse_rep_source_target_proxy/1, - fun mutually_exclusive_proxy_and_source_proxy/1, - fun mutually_exclusive_proxy_and_target_proxy/1 - ] - } + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + with([ + ?TDEF(parse_rep_doc_without_proxy), + ?TDEF(parse_rep_doc_with_proxy), + ?TDEF(parse_rep_source_target_proxy), + ?TDEF(mutually_exclusive_proxy_and_source_proxy), + ?TDEF(mutually_exclusive_proxy_and_target_proxy) + ]) } }. parse_rep_doc_without_proxy(_) -> - ?_test(begin - NoProxyDoc = {[ - {<<"source">>, <<"http://unproxied.com">>}, - {<<"target">>, <<"http://otherunproxied.com">>} - ]}, - Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc), - ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined), - ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined) - end). + NoProxyDoc = {[ + {<<"source">>, <<"http://unproxied.com">>}, + {<<"target">>, <<"http://otherunproxied.com">>} + ]}, + Rep = couch_replicator_parse:parse_rep_doc(NoProxyDoc), + Src = maps:get(?SOURCE, Rep), + Tgt = maps:get(?TARGET, Rep), + ?assertEqual(null, maps:get(<<"proxy_url">>, Src)), + ?assertEqual(null, maps:get(<<"proxy_url">>, Tgt)). parse_rep_doc_with_proxy(_) -> - ?_test(begin - ProxyURL = <<"http://myproxy.com">>, - ProxyDoc = {[ - {<<"source">>, <<"http://unproxied.com">>}, - {<<"target">>, <<"http://otherunproxied.com">>}, - {<<"proxy">>, ProxyURL} - ]}, - Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc), - ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)), - ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL)) - end). + ProxyURL = <<"http://myproxy.com">>, + ProxyDoc = {[ + {<<"source">>, <<"http://unproxied.com">>}, + {<<"target">>, <<"http://otherunproxied.com">>}, + {<<"proxy">>, ProxyURL} + ]}, + Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc), + Src = maps:get(?SOURCE, Rep), + Tgt = maps:get(?TARGET, Rep), + ?assertEqual(ProxyURL, maps:get(<<"proxy_url">>, Src)), + ?assertEqual(ProxyURL, maps:get(<<"proxy_url">>, Tgt)). parse_rep_source_target_proxy(_) -> - ?_test(begin - SrcProxyURL = <<"http://mysrcproxy.com">>, - TgtProxyURL = <<"http://mytgtproxy.com:9999">>, - ProxyDoc = {[ - {<<"source">>, <<"http://unproxied.com">>}, - {<<"target">>, <<"http://otherunproxied.com">>}, - {<<"source_proxy">>, SrcProxyURL}, - {<<"target_proxy">>, TgtProxyURL} - ]}, - Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc), - ?assertEqual((Rep#rep.source)#httpdb.proxy_url, - binary_to_list(SrcProxyURL)), - ?assertEqual((Rep#rep.target)#httpdb.proxy_url, - binary_to_list(TgtProxyURL)) - end). + SrcProxyURL = <<"http://mysrcproxy.com">>, + TgtProxyURL = <<"http://mytgtproxy.com:9999">>, + ProxyDoc = {[ + {<<"source">>, <<"http://unproxied.com">>}, + {<<"target">>, <<"http://otherunproxied.com">>}, + {<<"source_proxy">>, SrcProxyURL}, + {<<"target_proxy">>, TgtProxyURL} + ]}, + Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc), + Src = maps:get(?SOURCE, Rep), + Tgt = maps:get(?TARGET, Rep), + ?assertEqual(SrcProxyURL, maps:get(<<"proxy_url">>, Src)), + ?assertEqual(TgtProxyURL, maps:get(<<"proxy_url">>, Tgt)). mutually_exclusive_proxy_and_source_proxy(_) -> - ?_test(begin - ProxyDoc = {[ - {<<"source">>, <<"http://unproxied.com">>}, - {<<"target">>, <<"http://otherunproxied.com">>}, - {<<"proxy">>, <<"oldstyleproxy.local">>}, - {<<"source_proxy">>, <<"sourceproxy.local">>} - ]}, - ?assertThrow({bad_rep_doc, _}, - couch_replicator_docs:parse_rep_doc(ProxyDoc)) - end). + ProxyDoc = {[ + {<<"source">>, <<"http://unproxied.com">>}, + {<<"target">>, <<"http://otherunproxied.com">>}, + {<<"proxy">>, <<"oldstyleproxy.local">>}, + {<<"source_proxy">>, <<"sourceproxy.local">>} + ]}, + ?assertThrow({bad_rep_doc, _}, + couch_replicator_parse:parse_rep_doc(ProxyDoc)). mutually_exclusive_proxy_and_target_proxy(_) -> - ?_test(begin - ProxyDoc = {[ - {<<"source">>, <<"http://unproxied.com">>}, - {<<"target">>, <<"http://otherunproxied.com">>}, - {<<"proxy">>, <<"oldstyleproxy.local">>}, - {<<"target_proxy">>, <<"targetproxy.local">>} - ]}, - ?assertThrow({bad_rep_doc, _}, - couch_replicator_docs:parse_rep_doc(ProxyDoc)) - end). + ProxyDoc = {[ + {<<"source">>, <<"http://unproxied.com">>}, + {<<"target">>, <<"http://otherunproxied.com">>}, + {<<"proxy">>, <<"oldstyleproxy.local">>}, + {<<"target_proxy">>, <<"targetproxy.local">>} + ]}, + ?assertThrow({bad_rep_doc, _}, + couch_replicator_parse:parse_rep_doc(ProxyDoc)). diff --git a/src/couch_replicator/test/eunit/couch_replicator_rate_limiter_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_rate_limiter_tests.erl index 034550aec..fb9892017 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_rate_limiter_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_rate_limiter_tests.erl @@ -1,6 +1,7 @@ -module(couch_replicator_rate_limiter_tests). -include_lib("couch/include/couch_eunit.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). rate_limiter_test_() -> @@ -9,64 +10,52 @@ rate_limiter_test_() -> fun setup/0, fun teardown/1, [ - t_new_key(), - t_1_failure(), - t_2_failures_back_to_back(), - t_2_failures(), - t_success_threshold(), - t_1_failure_2_successes() + ?TDEF_FE(t_new_key), + ?TDEF_FE(t_1_failure), + ?TDEF_FE(t_2_failures_back_to_back), + ?TDEF_FE(t_2_failures), + ?TDEF_FE(t_success_threshold), + ?TDEF_FE(t_1_failure_2_successes) ] }. -t_new_key() -> - ?_test(begin - ?assertEqual(0, couch_replicator_rate_limiter:interval({"foo", get})) - end). +t_new_key(_) -> + ?assertEqual(0, couch_replicator_rate_limiter:interval({"foo", get})). -t_1_failure() -> - ?_test(begin - ?assertEqual(24, couch_replicator_rate_limiter:failure({"foo", get})) - end). +t_1_failure(_) -> + ?assertEqual(24, couch_replicator_rate_limiter:failure({"foo", get})). -t_2_failures() -> - ?_test(begin - couch_replicator_rate_limiter:failure({"foo", get}), - low_pass_filter_delay(), - Interval = couch_replicator_rate_limiter:failure({"foo", get}), - ?assertEqual(29, Interval) - end). +t_2_failures(_) -> + couch_replicator_rate_limiter:failure({"foo", get}), + low_pass_filter_delay(), + Interval = couch_replicator_rate_limiter:failure({"foo", get}), + ?assertEqual(29, Interval). -t_2_failures_back_to_back() -> - ?_test(begin - couch_replicator_rate_limiter:failure({"foo", get}), - Interval = couch_replicator_rate_limiter:failure({"foo", get}), - ?assertEqual(24, Interval) - end). +t_2_failures_back_to_back(_) -> + couch_replicator_rate_limiter:failure({"foo", get}), + Interval = couch_replicator_rate_limiter:failure({"foo", get}), + ?assertEqual(24, Interval). -t_success_threshold() -> - ?_test(begin - Interval = couch_replicator_rate_limiter:success({"foo", get}), - ?assertEqual(0, Interval), - Interval = couch_replicator_rate_limiter:success({"foo", get}), - ?assertEqual(0, Interval) - end). +t_success_threshold(_) -> + Interval = couch_replicator_rate_limiter:success({"foo", get}), + ?assertEqual(0, Interval), + Interval = couch_replicator_rate_limiter:success({"foo", get}), + ?assertEqual(0, Interval). -t_1_failure_2_successes() -> - ?_test(begin - couch_replicator_rate_limiter:failure({"foo", get}), - low_pass_filter_delay(), - Succ1 = couch_replicator_rate_limiter:success({"foo", get}), - ?assertEqual(20, Succ1), - low_pass_filter_delay(), - Succ2 = couch_replicator_rate_limiter:success({"foo", get}), - ?assertEqual(0, Succ2) - end). +t_1_failure_2_successes(_) -> + couch_replicator_rate_limiter:failure({"foo", get}), + low_pass_filter_delay(), + Succ1 = couch_replicator_rate_limiter:success({"foo", get}), + ?assertEqual(20, Succ1), + low_pass_filter_delay(), + Succ2 = couch_replicator_rate_limiter:success({"foo", get}), + ?assertEqual(0, Succ2). low_pass_filter_delay() -> diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl index 037f37191..4b7c37d9e 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl @@ -15,139 +15,72 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_replicator/src/couch_replicator.hrl"). - --define(DELAY, 500). --define(TIMEOUT, 60000). - - -setup_all() -> - test_util:start_couch([couch_replicator, chttpd, mem3, fabric]). - - -teardown_all(Ctx) -> - ok = test_util:stop_couch(Ctx). - - -setup() -> - Source = setup_db(), - Target = setup_db(), - {Source, Target}. +-include_lib("fabric/test/fabric2_test.hrl"). -teardown({Source, Target}) -> - teardown_db(Source), - teardown_db(Target), - ok. +-define(DELAY, 500). stats_retained_test_() -> { setup, - fun setup_all/0, - fun teardown_all/1, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, { foreach, fun setup/0, fun teardown/1, [ - fun t_stats_retained_by_scheduler/1, - fun t_stats_retained_on_job_removal/1 + ?TDEF_FE(t_stats_retained_on_job_removal, 60) ] } }. -t_stats_retained_by_scheduler({Source, Target}) -> - ?_test(begin - {ok, _} = add_vdu(Target), - populate_db_reject_even_docs(Source, 1, 10), - {ok, RepPid, RepId} = replicate(Source, Target), - wait_target_in_sync(6, Target), - - check_active_tasks(10, 5, 5), - check_scheduler_jobs(10, 5, 5), +setup() -> + Source = couch_replicator_test_helper:create_db(), + Target = couch_replicator_test_helper:create_db(), + config:set("replicator", "stats_update_interval_sec", "0", false), + config:set("replicator", "checkpoint_interval", "1000", false), + {Source, Target}. - stop_job(RepPid), - check_scheduler_jobs(10, 5, 5), - start_job(), - check_active_tasks(10, 5, 5), - check_scheduler_jobs(10, 5, 5), - couch_replicator_scheduler:remove_job(RepId) - end). +teardown({Source, Target}) -> + config:delete("replicator", "stats_update_interval_sec", false), + config:delete("replicator", "checkpoint_interval", false), + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). t_stats_retained_on_job_removal({Source, Target}) -> - ?_test(begin - {ok, _} = add_vdu(Target), - populate_db_reject_even_docs(Source, 1, 10), - {ok, _, RepId} = replicate(Source, Target), - wait_target_in_sync(6, Target), % 5 + 1 vdu - - check_active_tasks(10, 5, 5), - check_scheduler_jobs(10, 5, 5), + {ok, _} = add_vdu(Target), + populate_db_reject_even_docs(Source, 1, 10), + {ok, Pid1, RepId} = replicate(Source, Target), + wait_target_in_sync(6, Target), % 5 + 1 vdu - couch_replicator_scheduler:remove_job(RepId), + check_scheduler_jobs(10, 5, 5), - populate_db_reject_even_docs(Source, 11, 20), - {ok, _, RepId} = replicate(Source, Target), - wait_target_in_sync(11, Target), % 6 + 5 + cancel(RepId, Pid1), - check_scheduler_jobs(20, 10, 10), - check_active_tasks(20, 10, 10), + populate_db_reject_even_docs(Source, 11, 20), + {ok, Pid2, RepId} = replicate(Source, Target), + wait_target_in_sync(11, Target), % 6 + 5 - couch_replicator_scheduler:remove_job(RepId), + check_scheduler_jobs(20, 10, 10), - populate_db_reject_even_docs(Source, 21, 30), - {ok, _, RepId} = replicate(Source, Target), - wait_target_in_sync(16, Target), % 11 + 5 + cancel(RepId, Pid2), - check_scheduler_jobs(30, 15, 15), - check_active_tasks(30, 15, 15), - - couch_replicator_scheduler:remove_job(RepId) - end). + populate_db_reject_even_docs(Source, 21, 30), + {ok, Pid3, RepId} = replicate(Source, Target), + wait_target_in_sync(16, Target), % 11 + 5 + check_scheduler_jobs(30, 15, 15), -setup_db() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. - - -teardown_db(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]), - ok. - - -stop_job(RepPid) -> - Ref = erlang:monitor(process, RepPid), - gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 0}), - couch_replicator_scheduler:reschedule(), - receive - {'DOWN', Ref, _, _, _} -> ok - after ?TIMEOUT -> - erlang:error(timeout) - end. - - -start_job() -> - gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 500}), - couch_replicator_scheduler:reschedule(). - - -check_active_tasks(DocsRead, DocsWritten, DocsFailed) -> - RepTask = wait_for_task_status(), - ?assertNotEqual(timeout, RepTask), - ?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)), - ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)), - ?assertEqual(DocsFailed, couch_util:get_value(doc_write_failures, - RepTask)). + cancel(RepId, Pid3). check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) -> - Info = wait_scheduler_info(), + Info = wait_scheduler_info(DocsRead), ?assert(maps:is_key(<<"changes_pending">>, Info)), ?assert(maps:is_key(<<"doc_write_failures">>, Info)), ?assert(maps:is_key(<<"docs_read">>, Info)), @@ -161,27 +94,18 @@ check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) -> ?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info). -replication_tasks() -> - lists:filter(fun(P) -> - couch_util:get_value(type, P) =:= replication - end, couch_task_status:all()). - - -wait_for_task_status() -> +wait_scheduler_info(DocsRead) -> test_util:wait(fun() -> - case replication_tasks() of - [] -> wait; - [RepTask] -> RepTask - end - end). - - -wait_scheduler_info() -> - test_util:wait(fun() -> - case scheduler_jobs() of - [] -> wait; - [#{<<"info">> := null}] -> wait; - [#{<<"info">> := Info}] -> Info + case couch_replicator_test_helper:scheduler_jobs() of + [] -> + wait; + [#{<<"info">> := null}] -> + wait; + [#{<<"info">> := Info}] -> + case Info of + #{<<"docs_read">> := DocsRead} -> Info; + #{} -> wait + end end end). @@ -197,16 +121,12 @@ populate_db_reject_even_docs(DbName, Start, End) -> populate_db(DbName, Start, End, BodyFun) when is_function(BodyFun, 1) -> - {ok, Db} = couch_db:open_int(DbName, []), - Docs = lists:foldl( - fun(DocIdCounter, Acc) -> - Id = integer_to_binary(DocIdCounter), - Doc = #doc{id = Id, body = BodyFun(DocIdCounter)}, - [Doc | Acc] - end, - [], lists:seq(Start, End)), - {ok, _} = couch_db:update_docs(Db, Docs, []), - ok = couch_db:close(Db). + Docs = lists:foldl(fun(DocIdCounter, Acc) -> + Id = integer_to_binary(DocIdCounter), + Doc = #doc{id = Id, body = BodyFun(DocIdCounter)}, + [Doc | Acc] + end, [], lists:seq(Start, End)), + couch_replicator_test_helper:create_docs(DbName, Docs). wait_target_in_sync(DocCount, Target) when is_integer(DocCount) -> @@ -215,14 +135,13 @@ wait_target_in_sync(DocCount, Target) when is_integer(DocCount) -> wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> erlang:error({assertion_failed, [ - {module, ?MODULE}, {line, ?LINE}, - {reason, "Could not get source and target databases in sync"} + {module, ?MODULE}, {line, ?LINE}, + {reason, "Could not get source and target databases in sync"} ]}); wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> - {ok, Target} = couch_db:open_int(TargetName, []), - {ok, TargetInfo} = couch_db:get_db_info(Target), - ok = couch_db:close(Target), + {ok, Db} = fabric2_db:open(TargetName, [?ADMIN_CTX]), + {ok, TargetInfo} = fabric2_db:get_db_info(Db), TargetDocCount = couch_util:get_value(doc_count, TargetInfo), case TargetDocCount == DocCount of true -> @@ -234,27 +153,11 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> replicate(Source, Target) -> - SrcUrl = couch_replicator_test_helper:db_url(Source), - TgtUrl = couch_replicator_test_helper:db_url(Target), - RepObject = {[ - {<<"source">>, SrcUrl}, - {<<"target">>, TgtUrl}, - {<<"continuous">>, true} - ]}, - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), - ok = couch_replicator_scheduler:add_job(Rep), - couch_replicator_scheduler:reschedule(), - Pid = couch_replicator_test_helper:get_pid(Rep#rep.id), - {ok, Pid, Rep#rep.id}. - - -scheduler_jobs() -> - Addr = config:get("chttpd", "bind_address", "127.0.0.1"), - Port = mochiweb_socket_server:get(chttpd, port), - Url = lists:flatten(io_lib:format("http://~s:~b/_scheduler/jobs", [Addr, Port])), - {ok, 200, _, Body} = test_request:get(Url, []), - Json = jiffy:decode(Body, [return_maps]), - maps:get(<<"jobs">>, Json). + couch_replicator_test_helper:replicate_continuous(Source, Target). + + +cancel(RepId, Pid) -> + couch_replicator_test_helper:cancel(RepId, Pid). vdu() -> @@ -274,9 +177,5 @@ add_vdu(DbName) -> {<<"validate_doc_update">>, vdu()} ], Doc = couch_doc:from_json_obj({DocProps}, []), - {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), - try - {ok, _Rev} = couch_db:update_doc(Db, Doc, []) - after - couch_db:close(Db) - end. + {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), + {ok, _} = fabric2_db:update_doc(Db, Doc, []). diff --git a/src/couch_replicator/test/eunit/couch_replicator_selector_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_selector_tests.erl index 5026c1435..5dfe4ba91 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_selector_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_selector_tests.erl @@ -15,103 +15,69 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_replicator/src/couch_replicator.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). -setup(_) -> - Ctx = test_util:start_couch([couch_replicator]), - Source = create_db(), - create_docs(Source), - Target = create_db(), - {Ctx, {Source, Target}}. - -teardown(_, {Ctx, {Source, Target}}) -> - delete_db(Source), - delete_db(Target), - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). - selector_replication_test_() -> - Pairs = [{remote, remote}], { "Selector filtered replication tests", { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_succeed/2} || Pair <- Pairs] + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(should_replicate_with_selector) + ] + } } }. -should_succeed({From, To}, {_Ctx, {Source, Target}}) -> - RepObject = {[ - {<<"source">>, db_url(From, Source)}, - {<<"target">>, db_url(To, Target)}, - {<<"selector">>, {[{<<"_id">>, <<"doc2">>}]}} - ]}, - {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER), - %% FilteredFun is an Erlang version of following mango selector - FilterFun = fun(_DocId, {Props}) -> - couch_util:get_value(<<"_id">>, Props) == <<"doc2">> - end, - {ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target, FilterFun), - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), [ - {"Target DB has proper number of docs", - ?_assertEqual(1, proplists:get_value(doc_count, TargetDbInfo))}, - {"All the docs selected as expected", - ?_assert(lists:all(fun(Valid) -> Valid end, AllReplies))} - ]}. -compare_dbs(Source, Target, FilterFun) -> - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - {ok, TargetDbInfo} = couch_db:get_db_info(TargetDb), - Fun = fun(FullDocInfo, Acc) -> - {ok, DocId, SourceDoc} = read_doc(SourceDb, FullDocInfo), - TargetReply = read_doc(TargetDb, DocId), - case FilterFun(DocId, SourceDoc) of - true -> - ValidReply = {ok, DocId, SourceDoc} == TargetReply, - {ok, [ValidReply|Acc]}; - false -> - ValidReply = {not_found, missing} == TargetReply, - {ok, [ValidReply|Acc]} - end - end, - {ok, AllReplies} = couch_db:fold_docs(SourceDb, Fun, [], []), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb), - {ok, TargetDbInfo, AllReplies}. +setup() -> + Source = couch_replicator_test_helper:create_db(), + create_docs(Source), + Target = couch_replicator_test_helper:create_db(), + {Source, Target}. -read_doc(Db, DocIdOrInfo) -> - case couch_db:open_doc(Db, DocIdOrInfo) of - {ok, Doc} -> - {Props} = couch_doc:to_json_obj(Doc, [attachments]), - DocId = couch_util:get_value(<<"_id">>, Props), - {ok, DocId, {Props}}; - Error -> - Error - end. -create_db() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. +teardown({Source, Target}) -> + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). -create_docs(DbName) -> - {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]), - Doc1 = couch_doc:from_json_obj({[ - {<<"_id">>, <<"doc1">>} - ]}), - Doc2 = couch_doc:from_json_obj({[ - {<<"_id">>, <<"doc2">>} - ]}), - {ok, _} = couch_db:update_docs(Db, [Doc1, Doc2]), - couch_db:close(Db). -delete_db(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]). +should_replicate_with_selector({Source, Target}) -> + RepObject = #{ + <<"source">> => Source, + <<"target">> => Target, + <<"selector">> => #{ + <<"_id">> => <<"doc2">> + } + }, + ?assertMatch({ok, _}, couch_replicator_test_helper:replicate(RepObject)), + {ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target), + ?assertEqual(1, proplists:get_value(doc_count, TargetDbInfo)), + ?assert(lists:all(fun(Valid) -> Valid end, AllReplies)). + -db_url(remote, DbName) -> - Addr = config:get("httpd", "bind_address", "127.0.0.1"), - Port = mochiweb_socket_server:get(couch_httpd, port), - ?l2b(io_lib:format("http://~s:~b/~s", [Addr, Port, DbName])). +compare_dbs(Source, Target) -> + {ok, TargetDb} = fabric2_db:open(Target, []), + {ok, TargetDbInfo} = fabric2_db:get_db_info(TargetDb), + Fun = fun(SrcDoc, TgtDoc, Acc) -> + case SrcDoc#doc.id == <<"doc2">> of + true -> [SrcDoc#doc.body == TgtDoc#doc.body | Acc]; + false -> [not_found == TgtDoc | Acc] + end + end, + Res = couch_replicator_test_helper:compare_fold(Source, Target, Fun, []), + {ok, TargetDbInfo, Res}. + + +create_docs(DbName) -> + couch_replicator_test_helper:create_docs(DbName, [ + #{<<"_id">> => <<"doc1">>}, + #{<<"_id">> => <<"doc2">>} + ]). diff --git a/src/couch_replicator/test/eunit/couch_replicator_small_max_request_size_target.erl b/src/couch_replicator/test/eunit/couch_replicator_small_max_request_size_target.erl index 8aebbe151..b113c5392 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_small_max_request_size_target.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_small_max_request_size_target.erl @@ -1,139 +1,70 @@ -module(couch_replicator_small_max_request_size_target). + -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). - --import(couch_replicator_test_helper, [ - db_url/1, - replicate/1, - compare_dbs/3 -]). - --define(TIMEOUT_EUNIT, 360). - - -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. - - -setup(remote) -> - {remote, setup()}; - -setup({A, B}) -> - Ctx = test_util:start_couch([couch_replicator]), - config:set("httpd", "max_http_request_size", "10000", false), - Source = setup(A), - Target = setup(B), - {Ctx, {Source, Target}}. - - -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]), - ok. - -teardown(_, {Ctx, {Source, Target}}) -> - teardown(Source), - teardown(Target), - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). +-include_lib("fabric/test/fabric2_test.hrl"). reduce_max_request_size_test_() -> - Pairs = [{remote, remote}], { "Replicate docs when target has a small max_http_request_size", { - foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_replicate_all_docs/2} - || Pair <- Pairs] - ++ [{Pair, fun should_replicate_one/2} - || Pair <- Pairs] - % Disabled. See issue 574. Sometimes PUTs with a doc and - % attachment which exceed maximum request size are simply - % closed instead of returning a 413 request. That makes these - % tests flaky. - ++ [{Pair, fun should_replicate_one_with_attachment/2} - || Pair <- Pairs] + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(should_replicate_all_docs, 120), + ?TDEF_FE(should_replicate_one, 120), + ?TDEF_FE(should_replicate_one_with_attachment, 120) + ] + } } }. -% Test documents which are below max_http_request_size but when batched, batch size -% will be greater than max_http_request_size. Replicator could automatically split -% the batch into smaller batches and POST those separately. -should_replicate_all_docs({From, To}, {_Ctx, {Source, Target}}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [should_populate_source(Source), - should_replicate(Source, Target), - should_compare_databases(Source, Target, [])]}}. - - -% If a document is too large to post as a single request, that document is -% skipped but replication overall will make progress and not crash. -should_replicate_one({From, To}, {_Ctx, {Source, Target}}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [should_populate_source_one_large_one_small(Source), - should_replicate(Source, Target), - should_compare_databases(Source, Target, [<<"doc0">>])]}}. - - -% If a document has an attachment > 64 * 1024 bytes, replicator will switch to -% POST-ing individual documents directly and skip bulk_docs. Test that case -% separately -% See note in main test function why this was disabled. -should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [should_populate_source_one_large_attachment(Source), - should_populate_source(Source), - should_replicate(Source, Target), - should_compare_databases(Source, Target, [<<"doc0">>])]}}. - - -should_populate_source({remote, Source}) -> - should_populate_source(Source); - -should_populate_source(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(add_docs(Source, 5, 3000, 0))}. - - -should_populate_source_one_large_one_small({remote, Source}) -> - should_populate_source_one_large_one_small(Source); - -should_populate_source_one_large_one_small(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_one_small(Source, 12000, 3000))}. - - -should_populate_source_one_large_attachment({remote, Source}) -> - should_populate_source_one_large_attachment(Source); - -should_populate_source_one_large_attachment(Source) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}. +setup() -> + Source = couch_replicator_test_helper:create_db(), + Target = couch_replicator_test_helper:create_db(), + config:set("httpd", "max_http_request_size", "10000", false), + {Source, Target}. -should_replicate({remote, Source}, Target) -> - should_replicate(db_url(Source), Target); +teardown({Source, Target}) -> + config:delete("httpd", "max_http_request_size", false), + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). -should_replicate(Source, {remote, Target}) -> - should_replicate(Source, db_url(Target)); -should_replicate(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}. +% Test documents which are below max_http_request_size but when batched, batch +% size will be greater than max_http_request_size. Replicator could +% automatically split the batch into smaller batches and POST those separately. +should_replicate_all_docs({Source, Target}) -> + ?assertEqual(ok, add_docs(Source, 5, 3000, 0)), + replicate(Source, Target), + compare_dbs(Source, Target, []). -should_compare_databases({remote, Source}, Target, ExceptIds) -> - should_compare_databases(Source, Target, ExceptIds); +% If a document is too large to post as a single request, that document is +% skipped but replication overall will make progress and not crash. +should_replicate_one({Source, Target}) -> + ?assertEqual(ok, one_large_one_small(Source, 12000, 3000)), + replicate(Source, Target), + compare_dbs(Source, Target, [<<"doc0">>]). -should_compare_databases(Source, {remote, Target}, ExceptIds) -> - should_compare_databases(Source, Target, ExceptIds); -should_compare_databases(Source, Target, ExceptIds) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target, ExceptIds))}. +% If a document has an attachment > 64 * 1024 bytes, replicator will switch to +% POST-ing individual documents directly and skip bulk_docs. Test that case +% separately See note in main test function why this was disabled. +should_replicate_one_with_attachment({Source, Target}) -> + ?assertEqual(ok, one_large_attachment(Source, 70000, 70000)), + ?assertEqual(ok, add_docs(Source, 5, 3000, 0)), + replicate(Source, Target), + compare_dbs(Source, Target, [<<"doc0">>]). binary_chunk(Size) when is_integer(Size), Size > 0 -> @@ -150,19 +81,21 @@ add_docs(DbName, DocCount, DocSize, AttSize) -> one_large_one_small(DbName, Large, Small) -> add_doc(DbName, <<"doc0">>, Large, 0), - add_doc(DbName, <<"doc1">>, Small, 0). + add_doc(DbName, <<"doc1">>, Small, 0), + ok. one_large_attachment(DbName, Size, AttSize) -> - add_doc(DbName, <<"doc0">>, Size, AttSize). + add_doc(DbName, <<"doc0">>, Size, AttSize), + ok. add_doc(DbName, DocId, Size, AttSize) when is_binary(DocId) -> - {ok, Db} = couch_db:open_int(DbName, []), - Doc0 = #doc{id = DocId, body = {[{<<"x">>, binary_chunk(Size)}]}}, - Doc = Doc0#doc{atts = atts(AttSize)}, - {ok, _} = couch_db:update_doc(Db, Doc, []), - couch_db:close(Db). + {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), + Doc0 = #doc{id = DocId, body = {[{<<"x">>, binary_chunk(Size)}]}}, + Doc = Doc0#doc{atts = atts(AttSize)}, + {ok, _} = fabric2_db:update_doc(Db, Doc, []), + ok. atts(0) -> @@ -178,8 +111,13 @@ atts(Size) -> replicate(Source, Target) -> - replicate({[ - {<<"source">>, Source}, - {<<"target">>, Target}, - {<<"worker_processes">>, "1"} % This make batch_size predictable - ]}). + ?assertMatch({ok, _}, couch_replicator_test_helper:replicate(#{ + <<"source">> => Source, + <<"target">> => Target, + <<"worker_processes">> => 1 % This make batch_size predictable + })). + + +compare_dbs(Source, Target, ExceptIds) -> + ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target, + ExceptIds)). diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl index fd0409164..2ac447eb3 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl @@ -1,51 +1,166 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + -module(couch_replicator_test_helper). --include_lib("couch/include/couch_eunit.hrl"). --include_lib("couch/include/couch_db.hrl"). --include_lib("couch_replicator/src/couch_replicator.hrl"). -export([ + start_couch/0, + stop_couch/1, + + create_db/0, + create_db/1, + delete_db/1, + + server_url/0, + db_url/1, + + create_docs/2, + compare_dbs/2, compare_dbs/3, - db_url/1, - replicate/1, + compare_fold/4, + + compare_docs/2, + get_pid/1, - replicate/2 + + replicate/1, + replicate/2, + replicate_continuous/1, + replicate_continuous/2, + + cancel/1, + cancel/2, + + scheduler_jobs/0 ]). +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_replicator/src/couch_replicator.hrl"). + + +-define(USERNAME, "rep_eunit_admin"). +-define(PASSWORD, "rep_eunit_password"). + + +start_couch() -> + Ctx = test_util:start_couch([fabric, chttpd, couch_replicator]), + Hashed = couch_passwords:hash_admin_password(?PASSWORD), + ok = config:set("admins", ?USERNAME, ?b2l(Hashed), _Persist = false), + Ctx. + + +stop_couch(Ctx) -> + config:delete("admins", ?USERNAME, _Persist = false), + test_util:stop_couch(Ctx). + + +create_db() -> + {ok, Db} = fabric2_db:create(?tempdb(), [?ADMIN_CTX]), + fabric2_db:name(Db). + + +create_db(DbName) when is_binary(DbName) -> + {ok, Db} = fabric2_db:create(DbName, [?ADMIN_CTX]), + fabric2_db:name(Db). + + +delete_db(DbName) -> + try + ok = fabric2_db:delete(DbName, [?ADMIN_CTX]) + catch + error:database_does_not_exist -> + ok + end. + + +server_url() -> + Addr = config:get("chttpd", "bind_address", "127.0.0.1"), + Port = mochiweb_socket_server:get(chttpd, port), + Fmt = "http://~s:~s@~s:~b", + ?l2b(io_lib:format(Fmt, [?USERNAME, ?PASSWORD, Addr, Port])). + + +db_url(DbName) -> + ?l2b(io_lib:format("~s/~s", [server_url(), DbName])). + + +create_docs(DbName, Docs) when is_binary(DbName), is_list(Docs) -> + {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), + Docs1 = lists:map(fun(Doc) -> + case Doc of + #{} -> + Doc1 = couch_util:json_decode(couch_util:json_encode(Doc)), + couch_doc:from_json_obj(Doc1); + #doc{} -> + Doc + end + end, Docs), + {ok, ResList} = fabric2_db:update_docs(Db, Docs1), + lists:foreach(fun(Res) -> + ?assertMatch({ok, {_, Rev}} when is_binary(Rev), Res) + end, ResList). + + compare_dbs(Source, Target) -> - compare_dbs(Source, Target, []). - - -compare_dbs(Source, Target, ExceptIds) -> - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - - Fun = fun(FullDocInfo, Acc) -> - {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo), - Id = DocSource#doc.id, - case lists:member(Id, ExceptIds) of - true -> - ?assertEqual(not_found, couch_db:get_doc_info(TargetDb, Id)); - false -> - {ok, TDoc} = couch_db:open_doc(TargetDb, Id), - compare_docs(DocSource, TDoc) + Fun = fun(SrcDoc, TgtDoc, ok) -> compare_docs(SrcDoc, TgtDoc) end, + compare_fold(Source, Target, Fun, ok). + + +compare_dbs(Source, Target, ExceptIds) when is_binary(Source), + is_binary(Target), is_list(ExceptIds) -> + Fun = fun(SrcDoc, TgtDoc, ok) -> + case lists:member(SrcDoc#doc.id, ExceptIds) of + true -> ?assertEqual(not_found, TgtDoc); + false -> compare_docs(SrcDoc, TgtDoc) end, - {ok, Acc} + ok end, + compare_fold(Source, Target, Fun, ok). + - {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb). +compare_fold(Source, Target, Fun, Acc0) when + is_binary(Source), is_binary(Target), is_function(Fun, 3) -> + {ok, SourceDb} = fabric2_db:open(Source, [?ADMIN_CTX]), + {ok, TargetDb} = fabric2_db:open(Target, [?ADMIN_CTX]), + fabric2_fdb:transactional(SourceDb, fun(TxSourceDb) -> + FoldFun = fun + ({meta, _Meta}, Acc) -> + {ok, Acc}; + (complete, Acc) -> + {ok, Acc}; + ({row, Row}, Acc) -> + {_, Id} = lists:keyfind(id, 1, Row), + SrcDoc = open_doc(TxSourceDb, Id), + TgtDoc = open_doc(TargetDb, Id), + {ok, Fun(SrcDoc, TgtDoc, Acc)} + end, + Opts = [{restart_tx, true}], + {ok, AccF} = fabric2_db:fold_docs(TxSourceDb, FoldFun, Acc0, Opts), + AccF + end). -compare_docs(Doc1, Doc2) -> +compare_docs(#doc{} = Doc1, Doc2) when + is_record(Doc2, doc) orelse Doc2 =:= not_found -> + ?assert(Doc2 =/= not_found), ?assertEqual(Doc1#doc.body, Doc2#doc.body), #doc{atts = Atts1} = Doc1, #doc{atts = Atts2} = Doc2, ?assertEqual(lists:sort([couch_att:fetch(name, Att) || Att <- Atts1]), - lists:sort([couch_att:fetch(name, Att) || Att <- Atts2])), + lists:sort([couch_att:fetch(name, Att) || Att <- Atts2])), FunCompareAtts = fun(Att) -> AttName = couch_att:fetch(name, Att), {ok, AttTarget} = find_att(Atts2, AttName), @@ -68,19 +183,109 @@ compare_docs(Doc1, Doc2) -> ?assert(is_integer(couch_att:fetch(disk_len, AttTarget))), ?assert(is_integer(couch_att:fetch(att_len, AttTarget))), ?assertEqual(couch_att:fetch(disk_len, Att), - couch_att:fetch(disk_len, AttTarget)), + couch_att:fetch(disk_len, AttTarget)), ?assertEqual(couch_att:fetch(att_len, Att), - couch_att:fetch(att_len, AttTarget)), + couch_att:fetch(att_len, AttTarget)), ?assertEqual(couch_att:fetch(type, Att), - couch_att:fetch(type, AttTarget)), + couch_att:fetch(type, AttTarget)), ?assertEqual(couch_att:fetch(md5, Att), - couch_att:fetch(md5, AttTarget)) + couch_att:fetch(md5, AttTarget)) end, lists:foreach(FunCompareAtts, Atts1). +get_pid(RepId) -> + JobId = case couch_replicator_jobs:get_job_id(undefined, RepId) of + {ok, JobId0} -> JobId0; + {error, not_found} -> RepId + end, + {ok, #{<<"state">> := <<"running">>, <<"pid">> := Pid0}} = + couch_replicator_jobs:get_job_data(undefined, JobId), + Pid = list_to_pid(binary_to_list(Pid0)), + ?assert(is_pid(Pid)), + ?assert(is_process_alive(Pid)), + Pid. + + +replicate({[_ | _]} = EJson) -> + Str = couch_util:json_encode(EJson), + replicate(couch_util:json_decode(Str, [return_maps])); + +replicate(#{} = Rep0) -> + Rep = maybe_db_urls(Rep0), + {ok, Id, _} = couch_replicator_parse:parse_transient_rep(Rep, null), + ok = cancel(Id), + try + couch_replicator:replicate(Rep, ?ADMIN_USER) + after + ok = cancel(Id) + end. + + +replicate(Source, Target) -> + replicate(#{ + <<"source">> => Source, + <<"target">> => Target + }). + + +replicate_continuous({[_ | _]} = EJson) -> + Str = couch_util:json_encode(EJson), + replicate_continuous(couch_util:json_decode(Str, [return_maps])); + +replicate_continuous(#{<<"continuous">> := true} = Rep0) -> + Rep = maybe_db_urls(Rep0), + {ok, {continuous, RepId}} = couch_replicator:replicate(Rep, ?ADMIN_USER), + {ok, get_pid(RepId), RepId}. + + +replicate_continuous(Source, Target) -> + replicate_continuous(#{ + <<"source">> => Source, + <<"target">> => Target, + <<"continuous">> => true + }). + + +cancel(Id) when is_binary(Id) -> + CancelRep = #{<<"cancel">> => true, <<"id">> => Id}, + case couch_replicator:replicate(CancelRep, ?ADMIN_USER) of + {ok, {cancelled, <<_/binary>>}} -> ok; + {error, not_found} -> ok + end. + + +cancel(Id, Pid) when is_pid(Pid), is_binary(Id) -> + Ref = monitor(process, Pid), + try + cancel(Id) + after + receive + {'DOWN', Ref, _, _, _} -> ok + after 60000 -> + error(replicator_pid_death_timeout) + end + end. + + +scheduler_jobs() -> + ServerUrl = couch_replicator_test_helper:server_url(), + Url = lists:flatten(io_lib:format("~s/_scheduler/jobs", [ServerUrl])), + {ok, 200, _, Body} = test_request:get(Url, []), + Json = jiffy:decode(Body, [return_maps]), + maps:get(<<"jobs">>, Json). + + +open_doc(Db, DocId) -> + case fabric2_db:open_doc(Db, DocId, []) of + {ok, #doc{deleted = false} = Doc} -> Doc; + {not_found, missing} -> not_found + end. + + find_att([], _Name) -> nil; + find_att([Att | Rest], Name) -> case couch_att:fetch(name, Att) of Name -> @@ -91,45 +296,29 @@ find_att([Att | Rest], Name) -> att_md5(Att) -> - Md50 = couch_att:foldl( - Att, - fun(Chunk, Acc) -> couch_hash:md5_hash_update(Acc, Chunk) end, - couch_hash:md5_hash_init()), + Md50 = couch_att:foldl(Att, fun(Chunk, Acc) -> + couch_hash:md5_hash_update(Acc, Chunk) + end, couch_hash:md5_hash_init()), couch_hash:md5_hash_final(Md50). + att_decoded_md5(Att) -> - Md50 = couch_att:foldl_decode( - Att, - fun(Chunk, Acc) -> couch_hash:md5_hash_update(Acc, Chunk) end, - couch_hash:md5_hash_init()), + Md50 = couch_att:foldl_decode(Att, fun(Chunk, Acc) -> + couch_hash:md5_hash_update(Acc, Chunk) + end, couch_hash:md5_hash_init()), couch_hash:md5_hash_final(Md50). -db_url(DbName) -> - iolist_to_binary([ - "http://", config:get("httpd", "bind_address", "127.0.0.1"), - ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)), - "/", DbName - ]). - -get_pid(RepId) -> - Pid = global:whereis_name({couch_replicator_scheduler_job,RepId}), - ?assert(is_pid(Pid)), - Pid. -replicate(Source, Target) -> - replicate({[ - {<<"source">>, Source}, - {<<"target">>, Target} - ]}). - -replicate({[_ | _]} = RepObject) -> - {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), - ok = couch_replicator_scheduler:add_job(Rep), - couch_replicator_scheduler:reschedule(), - Pid = get_pid(Rep#rep.id), - MonRef = erlang:monitor(process, Pid), - receive - {'DOWN', MonRef, process, Pid, _} -> - ok +maybe_db_urls(#{} = Rep) -> + #{<<"source">> := Src, <<"target">> := Tgt} = Rep, + Src1 = case Src of + <<"http://", _/binary>> -> Src; + <<"https://", _/binary>> -> Src; + <<_/binary>> -> db_url(Src) + end, + Tgt1 = case Tgt of + <<"http://", _/binary>> -> Tgt; + <<"https://", _/binary>> -> Tgt; + <<_/binary>> -> db_url(Tgt) end, - ok = couch_replicator_scheduler:remove_job(Rep#rep.id). + Rep#{<<"source">> := Src1, <<"target">> := Tgt1}. diff --git a/src/couch_replicator/test/eunit/couch_replicator_transient_jobs_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_transient_jobs_tests.erl new file mode 100644 index 000000000..25fc6a3ff --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_transient_jobs_tests.erl @@ -0,0 +1,106 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_transient_jobs_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_replicator/src/couch_replicator.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). + + +transient_jobs_test_() -> + { + "Transient jobs tests", + { + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(transient_job_is_removed, 10), + ?TDEF_FE(posting_same_job_is_a_noop, 10) + ] + } + } + }. + + +setup() -> + Source = couch_replicator_test_helper:create_db(), + couch_replicator_test_helper:create_docs(Source, [ + #{<<"_id">> => <<"doc1">>} + ]), + Target = couch_replicator_test_helper:create_db(), + config:set("replicator", "stats_update_interval_sec", "0", false), + config:set("replicator", "transient_job_max_age_sec", "9999", false), + {Source, Target}. + + +teardown({Source, Target}) -> + config:delete("replicator", "stats_update_interval_sec", false), + config:delete("replicator", "transient_job_max_age_sec", false), + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). + + +transient_job_is_removed({Source, Target}) -> + {ok, #{}} = replicate(Source, Target), + JobId = get_rep_id(Source, Target), + + couch_replicator_job_server:reschedule(), + + % Still there after clean up attempt ran + ?assertMatch({200, #{}}, scheduler_jobs(JobId)), + + config:set("replicator", "transient_job_max_age_sec", "0", false), + couch_replicator_job_server:reschedule(), + + % Should be gone now + ?assertMatch({404, #{}}, scheduler_jobs(JobId)). + + +posting_same_job_is_a_noop({Source, Target}) -> + {ok, Pid1, RepId1} = replicate_continuous(Source, Target), + {ok, Pid2, RepId2} = replicate_continuous(Source, Target), + ?assertEqual(RepId1, RepId2), + ?assertEqual(Pid1, Pid2), + couch_replicator_test_helper:cancel(RepId1). + + +get_rep_id(Source, Target) -> + {ok, Id, _} = couch_replicator_parse:parse_transient_rep(#{ + <<"source">> => couch_replicator_test_helper:db_url(Source), + <<"target">> => couch_replicator_test_helper:db_url(Target) + }, null), + Id. + + +replicate(Source, Target) -> + couch_replicator:replicate(#{ + <<"source">> => couch_replicator_test_helper:db_url(Source), + <<"target">> => couch_replicator_test_helper:db_url(Target) + }, ?ADMIN_USER). + + +replicate_continuous(Source, Target) -> + couch_replicator_test_helper:replicate_continuous(Source, Target). + + +scheduler_jobs(Id) -> + SUrl = couch_replicator_test_helper:server_url(), + Url = lists:flatten(io_lib:format("~s/_scheduler/jobs/~s", [SUrl, Id])), + {ok, Code, _, Body} = test_request:get(Url, []), + {Code, jiffy:decode(Body, [return_maps])}. diff --git a/src/couch_replicator/test/eunit/couch_replicator_use_checkpoints_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_use_checkpoints_tests.erl index 8e4a21dbb..4371eff1f 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_use_checkpoints_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_use_checkpoints_tests.erl @@ -14,165 +14,82 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). --import(couch_replicator_test_helper, [ - db_url/1, - replicate/1 -]). -define(DOCS_COUNT, 100). --define(TIMEOUT_EUNIT, 30). -define(i2l(I), integer_to_list(I)). -define(io2b(Io), iolist_to_binary(Io)). -start(false) -> - fun - ({finished, _, {CheckpointHistory}}) -> - ?assertEqual([{<<"use_checkpoints">>,false}], CheckpointHistory); - (_) -> - ok - end; -start(true) -> - fun - ({finished, _, {CheckpointHistory}}) -> - ?assertNotEqual(false, lists:keyfind(<<"session_id">>, - 1, CheckpointHistory)); - (_) -> - ok - end. - -stop(_, _) -> - ok. - -setup() -> - DbName = ?tempdb(), - {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), - ok = couch_db:close(Db), - DbName. - -setup(remote) -> - {remote, setup()}; -setup({_, Fun, {A, B}}) -> - Ctx = test_util:start_couch([couch_replicator]), - {ok, Listener} = couch_replicator_notifier:start_link(Fun), - Source = setup(A), - Target = setup(B), - {Ctx, {Source, Target, Listener}}. - -teardown({remote, DbName}) -> - teardown(DbName); -teardown(DbName) -> - ok = couch_server:delete(DbName, [?ADMIN_CTX]), - ok. - -teardown(_, {Ctx, {Source, Target, Listener}}) -> - teardown(Source), - teardown(Target), - - couch_replicator_notifier:stop(Listener), - ok = application:stop(couch_replicator), - ok = test_util:stop_couch(Ctx). - use_checkpoints_test_() -> { - "Replication use_checkpoints feature tests", + setup, + fun couch_replicator_test_helper:start_couch/0, + fun couch_replicator_test_helper:stop_couch/1, { - foreachx, - fun start/1, fun stop/2, - [{UseCheckpoints, fun use_checkpoints_tests/2} - || UseCheckpoints <- [false, true]] + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_replicate_with_checkpoints, 15), + ?TDEF_FE(t_replicate_without_checkpoints, 15) + ] } }. -use_checkpoints_tests(UseCheckpoints, Fun) -> - Pairs = [{remote, remote}], - { - "use_checkpoints: " ++ atom_to_list(UseCheckpoints), - { - foreachx, - fun setup/1, fun teardown/2, - [{{UseCheckpoints, Fun, Pair}, fun should_test_checkpoints/2} - || Pair <- Pairs] - } - }. -should_test_checkpoints({UseCheckpoints, _, {From, To}}, {_Ctx, {Source, Target, _}}) -> - should_test_checkpoints(UseCheckpoints, {From, To}, {Source, Target}). -should_test_checkpoints(UseCheckpoints, {From, To}, {Source, Target}) -> - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [ - should_populate_source(Source, ?DOCS_COUNT), - should_replicate(Source, Target, UseCheckpoints), - should_compare_databases(Source, Target) - ]}}. - -should_populate_source({remote, Source}, DocCount) -> - should_populate_source(Source, DocCount); -should_populate_source(Source, DocCount) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source, DocCount))}. - -should_replicate({remote, Source}, Target, UseCheckpoints) -> - should_replicate(db_url(Source), Target, UseCheckpoints); -should_replicate(Source, {remote, Target}, UseCheckpoints) -> - should_replicate(Source, db_url(Target), UseCheckpoints); -should_replicate(Source, Target, UseCheckpoints) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target, UseCheckpoints))}. - -should_compare_databases({remote, Source}, Target) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, {remote, Target}) -> - should_compare_databases(Source, Target); -should_compare_databases(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target))}. +setup() -> + Source = couch_replicator_test_helper:create_db(), + Target = couch_replicator_test_helper:create_db(), + {Source, Target}. -populate_db(DbName, DocCount) -> - {ok, Db} = couch_db:open_int(DbName, []), - Docs = lists:foldl( - fun(DocIdCounter, Acc) -> - Id = ?io2b(["doc", ?i2l(DocIdCounter)]), - Value = ?io2b(["val", ?i2l(DocIdCounter)]), - Doc = #doc{ - id = Id, - body = {[ {<<"value">>, Value} ]} - }, - [Doc | Acc] - end, - [], lists:seq(1, DocCount)), - {ok, _} = couch_db:update_docs(Db, Docs, []), - ok = couch_db:close(Db). - -compare_dbs(Source, Target) -> - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - Fun = fun(FullDocInfo, Acc) -> - {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo), - {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]), - DocId = couch_util:get_value(<<"_id">>, Props), - DocTarget = case couch_db:open_doc(TargetDb, DocId) of - {ok, DocT} -> - DocT; - Error -> - erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, lists:concat(["Error opening document '", - ?b2l(DocId), "' from target: ", - couch_util:to_list(Error)])}]}) - end, - DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]), - ?assertEqual(DocJson, DocTargetJson), - {ok, Acc} - end, - {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb). - -replicate(Source, Target, UseCheckpoints) -> - replicate({[ - {<<"source">>, Source}, - {<<"target">>, Target}, - {<<"use_checkpoints">>, UseCheckpoints} - ]}). +teardown({Source, Target}) -> + couch_replicator_test_helper:delete_db(Source), + couch_replicator_test_helper:delete_db(Target). + +t_replicate_with_checkpoints({Source, Target}) -> + populate_db(Source, ?DOCS_COUNT), + Res = couch_replicator_test_helper:replicate(#{ + <<"source">> => Source, + <<"target">> => Target, + <<"use_checkpoints">> => true + }), + ?assertMatch({ok, _}, Res), + + {ok, History} = Res, + ?assertMatch(#{<<"history">> := _, <<"session_id">> := _}, History), + + Checkpoints = maps:get(<<"history">>, History), + SessionId = maps:get(<<"session_id">>, History), + ?assert(is_binary(SessionId)), + ?assert(is_list(Checkpoints)), + ?assert(length(Checkpoints) >= 1), + + couch_replicator_test_helper:compare_dbs(Source, Target). + + +t_replicate_without_checkpoints({Source, Target}) -> + populate_db(Source, ?DOCS_COUNT), + Res = couch_replicator_test_helper:replicate(#{ + <<"source">> => Source, + <<"target">> => Target, + <<"use_checkpoints">> => false + }), + ?assertEqual({ok, #{<<"use_checkpoints">> => false}}, Res), + couch_replicator_test_helper:compare_dbs(Source, Target). + + +populate_db(DbName, DocCount) -> + Docs = lists:foldl(fun(DocIdCounter, Acc) -> + Id = ?io2b(["doc", ?i2l(DocIdCounter)]), + Value = ?io2b(["val", ?i2l(DocIdCounter)]), + Doc = #doc{ + id = Id, + body = {[{<<"value">>, Value}]} + }, + [Doc | Acc] + end, [], lists:seq(1, DocCount)), + couch_replicator_test_helper:create_docs(DbName, Docs). diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs index 8b657d916..9af5ef81a 100644 --- a/test/elixir/test/replication_test.exs +++ b/test/elixir/test/replication_test.exs @@ -14,7 +14,10 @@ defmodule ReplicationTest do # This should probably go into `make elixir` like what # happens for JavaScript tests. - @moduletag config: [{"replicator", "startup_jitter", "0"}] + @moduletag config: [ + {"replicator", "startup_jitter", "0"}, + {"replicator", "stats_update_interval_sec", "0"} + ] test "source database not found with host" do name = random_db_name() |