summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-08-28 04:36:18 -0400
committerNick Vatamaniuc <nickva@users.noreply.github.com>2020-09-15 16:13:46 -0400
commitae858196848cf9533dfa03a2006227481f47388d (patch)
tree1daa6d95727f31edf2299842c8501cf0df57c1b1
parent99262909129602bceac82e7907ebfcafc9eba629 (diff)
downloadcouchdb-ae858196848cf9533dfa03a2006227481f47388d.tar.gz
Update and clean up tests
Update tests to use the new replicator. Also clean up redundancy and re-use some of the newer macros from fabric2 (?TDEF_FE). Make sure replicator tests are included in `make check`
-rw-r--r--Makefile2
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl90
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_connection_tests.erl274
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_create_target_with_options_tests.erl129
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_db_tests.erl332
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_filtered_tests.erl348
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_httpc_pool_tests.erl125
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_id_too_long_tests.erl91
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_job_server_tests.erl437
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl123
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_many_leaves_tests.erl241
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_missing_stubs_tests.erl179
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl135
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_rate_limiter_tests.erl77
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl223
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_selector_tests.erl136
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_small_max_request_size_target.erl190
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_test_helper.erl323
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_transient_jobs_tests.erl106
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_use_checkpoints_tests.erl207
-rw-r--r--test/elixir/test/replication_test.exs5
21 files changed, 2210 insertions, 1563 deletions
diff --git a/Makefile b/Makefile
index e8d366296..35b62f949 100644
--- a/Makefile
+++ b/Makefile
@@ -163,7 +163,7 @@ endif
.PHONY: check
check: all
@$(MAKE) emilio
- make eunit apps=couch_eval,couch_expiring_cache,ctrace,couch_jobs,couch_views,fabric,mango,chttpd
+ make eunit apps=couch_eval,couch_expiring_cache,ctrace,couch_jobs,couch_views,fabric,mango,chttpd,couch_replicator
make elixir tests=test/elixir/test/basics_test.exs,test/elixir/test/replication_test.exs,test/elixir/test/map_test.exs,test/elixir/test/all_docs_test.exs,test/elixir/test/bulk_docs_test.exs
make exunit apps=chttpd
make mango-test
diff --git a/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl b/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl
index ac4bb84f3..0e7e0ea5a 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl
@@ -12,72 +12,60 @@
-module(couch_replicator_attachments_too_large).
+
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
-
-
-setup(_) ->
- Ctx = test_util:start_couch([couch_replicator]),
- Source = create_db(),
- create_doc_with_attachment(Source, <<"doc">>, 1000),
- Target = create_db(),
- {Ctx, {Source, Target}}.
-
-
-teardown(_, {Ctx, {Source, Target}}) ->
- delete_db(Source),
- delete_db(Target),
- config:delete("couchdb", "max_attachment_size"),
- ok = test_util:stop_couch(Ctx).
+-include_lib("fabric/test/fabric2_test.hrl").
attachment_too_large_replication_test_() ->
- Pairs = [{remote, remote}],
{
- "Attachment size too large replication tests",
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
{
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_succeed/2} || Pair <- Pairs] ++
- [{Pair, fun should_fail/2} || Pair <- Pairs]
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(t_should_succeed),
+ ?TDEF_FE(t_should_fail)
+ ]
}
}.
-should_succeed({From, To}, {_Ctx, {Source, Target}}) ->
- RepObject = {[
- {<<"source">>, db_url(From, Source)},
- {<<"target">>, db_url(To, Target)}
- ]},
- config:set("couchdb", "max_attachment_size", "1000", _Persist = false),
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
- ?_assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)).
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ create_doc_with_attachment(Source, <<"doc">>, 1000),
+ Target = couch_replicator_test_helper:create_db(),
+ {Source, Target}.
-should_fail({From, To}, {_Ctx, {Source, Target}}) ->
- RepObject = {[
- {<<"source">>, db_url(From, Source)},
- {<<"target">>, db_url(To, Target)}
- ]},
- config:set("couchdb", "max_attachment_size", "999", _Persist = false),
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
- ?_assertError({badmatch, {not_found, missing}},
- couch_replicator_test_helper:compare_dbs(Source, Target)).
+teardown({Source, Target}) ->
+ config:delete("couchdb", "max_attachment_size", false),
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
-create_db() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
+t_should_succeed({Source, Target}) ->
+ config:set("couchdb", "max_attachment_size", "1000", false),
+ {ok, _} = couch_replicator_test_helper:replicate(Source, Target),
+ ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)).
+
+
+t_should_fail({Source, Target}) ->
+ config:set("couchdb", "max_attachment_size", "999", false),
+ {ok, _} = couch_replicator_test_helper:replicate(Source, Target),
+ ExceptIds = [<<"doc">>],
+ ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source,
+ Target, ExceptIds)).
create_doc_with_attachment(DbName, DocId, AttSize) ->
- {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
Doc = #doc{id = DocId, atts = att(AttSize)},
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- couch_db:close(Db),
+ couch_replicator_test_helper:create_docs(DbName, [Doc]),
ok.
@@ -90,13 +78,3 @@ att(Size) when is_integer(Size), Size >= 1 ->
<< <<"x">> || _ <- lists:seq(1, Size) >>
end}
])].
-
-
-delete_db(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]).
-
-
-db_url(remote, DbName) ->
- Addr = config:get("httpd", "bind_address", "127.0.0.1"),
- Port = mochiweb_socket_server:get(couch_httpd, port),
- ?l2b(io_lib:format("http://~s:~b/~s", [Addr, Port, DbName])).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_connection_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_connection_tests.erl
index e75cc5a63..df30db25d 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_connection_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_connection_tests.erl
@@ -12,187 +12,176 @@
-module(couch_replicator_connection_tests).
+
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-
--define(TIMEOUT, 1000).
-
-
-setup() ->
- Host = config:get("httpd", "bind_address", "127.0.0.1"),
- Port = config:get("httpd", "port", "5984"),
- {Host, Port}.
-
-teardown(_) ->
- ok.
+-include_lib("fabric/test/fabric2_test.hrl").
httpc_pool_test_() ->
{
- "replicator connection sharing tests",
+ "Replicator connection sharing tests",
{
setup,
- fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
{
foreach,
- fun setup/0, fun teardown/1,
+ fun setup/0,
+ fun teardown/1,
[
- fun connections_shared_after_release/1,
- fun connections_not_shared_after_owner_death/1,
- fun idle_connections_closed/1,
- fun test_owner_monitors/1,
- fun worker_discards_creds_on_create/1,
- fun worker_discards_url_creds_after_request/1,
- fun worker_discards_creds_in_headers_after_request/1,
- fun worker_discards_proxy_creds_after_request/1
+ ?TDEF_FE(connections_shared_after_release),
+ ?TDEF_FE(connections_not_shared_after_owner_death),
+ ?TDEF_FE(idle_connections_closed),
+ ?TDEF_FE(test_owner_monitors),
+ ?TDEF_FE(worker_discards_creds_on_create),
+ ?TDEF_FE(worker_discards_url_creds_after_request),
+ ?TDEF_FE(worker_discards_creds_in_headers_after_request),
+ ?TDEF_FE(worker_discards_proxy_creds_after_request)
]
}
}
}.
+setup() ->
+ Host = config:get("chttpd", "bind_address", "127.0.0.1"),
+ Port = config:get("chttpd", "port", "5984"),
+ {Host, Port}.
+
+
+teardown(_) ->
+ ok.
+
+
connections_shared_after_release({Host, Port}) ->
- ?_test(begin
- URL = "http://" ++ Host ++ ":" ++ Port,
- Self = self(),
- {ok, Pid} = couch_replicator_connection:acquire(URL),
- couch_replicator_connection:release(Pid),
- spawn(fun() ->
- Self ! couch_replicator_connection:acquire(URL)
- end),
- receive
- {ok, Pid2} ->
- ?assertEqual(Pid, Pid2)
- end
- end).
+ URL = "http://" ++ Host ++ ":" ++ Port,
+ Self = self(),
+ {ok, Pid} = couch_replicator_connection:acquire(URL),
+ couch_replicator_connection:release(Pid),
+ spawn(fun() ->
+ Self ! couch_replicator_connection:acquire(URL)
+ end),
+ receive
+ {ok, Pid2} ->
+ ?assertEqual(Pid, Pid2)
+ end.
connections_not_shared_after_owner_death({Host, Port}) ->
- ?_test(begin
- URL = "http://" ++ Host ++ ":" ++ Port,
- Self = self(),
- spawn(fun() ->
- Self ! couch_replicator_connection:acquire(URL),
- error("simulate division by zero without compiler warning")
- end),
- receive
- {ok, Pid} ->
- {ok, Pid2} = couch_replicator_connection:acquire(URL),
- ?assertNotEqual(Pid, Pid2),
- MRef = monitor(process, Pid),
- receive {'DOWN', MRef, process, Pid, _Reason} ->
+ URL = "http://" ++ Host ++ ":" ++ Port,
+ Self = self(),
+ spawn(fun() ->
+ Self ! couch_replicator_connection:acquire(URL),
+ error("simulate division by zero without compiler warning")
+ end),
+ receive
+ {ok, Pid} ->
+ {ok, Pid2} = couch_replicator_connection:acquire(URL),
+ ?assertNotEqual(Pid, Pid2),
+ MRef = monitor(process, Pid),
+ receive
+ {'DOWN', MRef, process, Pid, _Reason} ->
?assert(not is_process_alive(Pid));
- Other -> throw(Other)
- end
- end
- end).
+ Other ->
+ throw(Other)
+ end
+ end.
idle_connections_closed({Host, Port}) ->
- ?_test(begin
- URL = "http://" ++ Host ++ ":" ++ Port,
- {ok, Pid} = couch_replicator_connection:acquire(URL),
- couch_replicator_connection ! close_idle_connections,
- ?assert(ets:member(couch_replicator_connection, Pid)),
- % block until idle connections have closed
- sys:get_status(couch_replicator_connection),
- couch_replicator_connection:release(Pid),
- couch_replicator_connection ! close_idle_connections,
- % block until idle connections have closed
- sys:get_status(couch_replicator_connection),
- ?assert(not ets:member(couch_replicator_connection, Pid))
- end).
+ URL = "http://" ++ Host ++ ":" ++ Port,
+ {ok, Pid} = couch_replicator_connection:acquire(URL),
+ couch_replicator_connection ! close_idle_connections,
+ ?assert(ets:member(couch_replicator_connection, Pid)),
+ % block until idle connections have closed
+ sys:get_status(couch_replicator_connection),
+ couch_replicator_connection:release(Pid),
+ couch_replicator_connection ! close_idle_connections,
+ % block until idle connections have closed
+ sys:get_status(couch_replicator_connection),
+ ?assert(not ets:member(couch_replicator_connection, Pid)).
test_owner_monitors({Host, Port}) ->
- ?_test(begin
- URL = "http://" ++ Host ++ ":" ++ Port,
- {ok, Worker0} = couch_replicator_connection:acquire(URL),
- assert_monitors_equal([{process, self()}]),
- couch_replicator_connection:release(Worker0),
- assert_monitors_equal([]),
- {Workers, Monitors} = lists:foldl(fun(_, {WAcc, MAcc}) ->
- {ok, Worker1} = couch_replicator_connection:acquire(URL),
- MAcc1 = [{process, self()} | MAcc],
- assert_monitors_equal(MAcc1),
- {[Worker1 | WAcc], MAcc1}
- end, {[], []}, lists:seq(1,5)),
- lists:foldl(fun(Worker2, Acc) ->
- [_ | NewAcc] = Acc,
- couch_replicator_connection:release(Worker2),
- assert_monitors_equal(NewAcc),
- NewAcc
- end, Monitors, Workers)
- end).
+ URL = "http://" ++ Host ++ ":" ++ Port,
+ {ok, Worker0} = couch_replicator_connection:acquire(URL),
+ assert_monitors_equal([{process, self()}]),
+ couch_replicator_connection:release(Worker0),
+ assert_monitors_equal([]),
+ {Workers, Monitors} = lists:foldl(fun(_, {WAcc, MAcc}) ->
+ {ok, Worker1} = couch_replicator_connection:acquire(URL),
+ MAcc1 = [{process, self()} | MAcc],
+ assert_monitors_equal(MAcc1),
+ {[Worker1 | WAcc], MAcc1}
+ end, {[], []}, lists:seq(1, 5)),
+ lists:foldl(fun(Worker2, Acc) ->
+ [_ | NewAcc] = Acc,
+ couch_replicator_connection:release(Worker2),
+ assert_monitors_equal(NewAcc),
+ NewAcc
+ end, Monitors, Workers).
worker_discards_creds_on_create({Host, Port}) ->
- ?_test(begin
- {User, Pass, B64Auth} = user_pass(),
- URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ Port,
- {ok, WPid} = couch_replicator_connection:acquire(URL),
- Internals = worker_internals(WPid),
- ?assert(string:str(Internals, B64Auth) =:= 0),
- ?assert(string:str(Internals, Pass) =:= 0)
- end).
+ {User, Pass, B64Auth} = user_pass(),
+ URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ Port,
+ {ok, WPid} = couch_replicator_connection:acquire(URL),
+ Internals = worker_internals(WPid),
+ ?assert(string:str(Internals, B64Auth) =:= 0),
+ ?assert(string:str(Internals, Pass) =:= 0).
worker_discards_url_creds_after_request({Host, _}) ->
- ?_test(begin
- {User, Pass, B64Auth} = user_pass(),
- {Port, ServerPid} = server(),
- PortStr = integer_to_list(Port),
- URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ PortStr,
- {ok, WPid} = couch_replicator_connection:acquire(URL),
- ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], [])),
- Internals = worker_internals(WPid),
- ?assert(string:str(Internals, B64Auth) =:= 0),
- ?assert(string:str(Internals, Pass) =:= 0),
- couch_replicator_connection:release(WPid),
- unlink(ServerPid),
- exit(ServerPid, kill)
- end).
+ {User, Pass, B64Auth} = user_pass(),
+ {Port, ServerPid} = server(),
+ PortStr = integer_to_list(Port),
+ URL = "http://" ++ User ++ ":" ++ Pass ++ "@" ++ Host ++ ":" ++ PortStr,
+ {ok, WPid} = couch_replicator_connection:acquire(URL),
+ ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], [])),
+ Internals = worker_internals(WPid),
+ ?assert(string:str(Internals, B64Auth) =:= 0),
+ ?assert(string:str(Internals, Pass) =:= 0),
+ couch_replicator_connection:release(WPid),
+ unlink(ServerPid),
+ exit(ServerPid, kill).
worker_discards_creds_in_headers_after_request({Host, _}) ->
- ?_test(begin
- {_User, Pass, B64Auth} = user_pass(),
- {Port, ServerPid} = server(),
- PortStr = integer_to_list(Port),
- URL = "http://" ++ Host ++ ":" ++ PortStr,
- {ok, WPid} = couch_replicator_connection:acquire(URL),
- Headers = [{"Authorization", "Basic " ++ B64Auth}],
- ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, Headers, [])),
- Internals = worker_internals(WPid),
- ?assert(string:str(Internals, B64Auth) =:= 0),
- ?assert(string:str(Internals, Pass) =:= 0),
- couch_replicator_connection:release(WPid),
- unlink(ServerPid),
- exit(ServerPid, kill)
- end).
+ {_User, Pass, B64Auth} = user_pass(),
+ {Port, ServerPid} = server(),
+ PortStr = integer_to_list(Port),
+ URL = "http://" ++ Host ++ ":" ++ PortStr,
+ {ok, WPid} = couch_replicator_connection:acquire(URL),
+ Headers = [{"Authorization", "Basic " ++ B64Auth}],
+ ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, Headers, [])),
+ Internals = worker_internals(WPid),
+ ?assert(string:str(Internals, B64Auth) =:= 0),
+ ?assert(string:str(Internals, Pass) =:= 0),
+ couch_replicator_connection:release(WPid),
+ unlink(ServerPid),
+ exit(ServerPid, kill).
worker_discards_proxy_creds_after_request({Host, _}) ->
- ?_test(begin
- {User, Pass, B64Auth} = user_pass(),
- {Port, ServerPid} = server(),
- PortStr = integer_to_list(Port),
- URL = "http://" ++ Host ++ ":" ++ PortStr,
- {ok, WPid} = couch_replicator_connection:acquire(URL),
- Opts = [
- {proxy_host, Host},
- {proxy_port, Port},
- {proxy_user, User},
- {proxy_pass, Pass}
- ],
- ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], Opts)),
- Internals = worker_internals(WPid),
- ?assert(string:str(Internals, B64Auth) =:= 0),
- ?assert(string:str(Internals, Pass) =:= 0),
- couch_replicator_connection:release(WPid),
- unlink(ServerPid),
- exit(ServerPid, kill)
- end).
+ {User, Pass, B64Auth} = user_pass(),
+ {Port, ServerPid} = server(),
+ PortStr = integer_to_list(Port),
+ URL = "http://" ++ Host ++ ":" ++ PortStr,
+ {ok, WPid} = couch_replicator_connection:acquire(URL),
+ Opts = [
+ {proxy_host, Host},
+ {proxy_port, Port},
+ {proxy_user, User},
+ {proxy_pass, Pass}
+ ],
+ ?assertMatch({ok, "200", _, _}, send_req(WPid, URL, [], Opts)),
+ Internals = worker_internals(WPid),
+ ?assert(string:str(Internals, B64Auth) =:= 0),
+ ?assert(string:str(Internals, Pass) =:= 0),
+ couch_replicator_connection:release(WPid),
+ unlink(ServerPid),
+ exit(ServerPid, kill).
send_req(WPid, URL, Headers, Opts) ->
@@ -237,5 +226,6 @@ server_responder(LSock) ->
assert_monitors_equal(ShouldBe) ->
sys:get_status(couch_replicator_connection),
- {monitors, Monitors} = process_info(whereis(couch_replicator_connection), monitors),
+ {monitors, Monitors} = process_info(whereis(couch_replicator_connection),
+ monitors),
?assertEqual(Monitors, ShouldBe).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_create_target_with_options_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_create_target_with_options_tests.erl
index 63310d39e..c957fc199 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_create_target_with_options_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_create_target_with_options_tests.erl
@@ -12,132 +12,137 @@
-module(couch_replicator_create_target_with_options_tests).
+
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
-
--define(USERNAME, "rep_admin").
--define(PASSWORD, "secret").
-
-setup() ->
- Ctx = test_util:start_couch([fabric, mem3, couch_replicator, chttpd]),
- Hashed = couch_passwords:hash_admin_password(?PASSWORD),
- ok = config:set("admins", ?USERNAME, ?b2l(Hashed), _Persist=false),
- Source = ?tempdb(),
- Target = ?tempdb(),
- {Ctx, {Source, Target}}.
-
-
-teardown({Ctx, {_Source, _Target}}) ->
- config:delete("admins", ?USERNAME),
- ok = test_util:stop_couch(Ctx).
+-include_lib("fabric/test/fabric2_test.hrl").
create_target_with_options_replication_test_() ->
{
"Create target with range partitions tests",
{
- foreach,
- fun setup/0, fun teardown/1,
- [
- fun should_create_target_with_q_4/1,
- fun should_create_target_with_q_2_n_1/1,
- fun should_create_target_with_default/1,
- fun should_not_create_target_with_q_any/1
- ]
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(should_create_target_with_q_4),
+ ?TDEF_FE(should_create_target_with_q_2_n_1),
+ ?TDEF_FE(should_create_target_with_default),
+ ?TDEF_FE(should_not_create_target_with_q_any)
+ ]
+ }
}
}.
-should_create_target_with_q_4({_Ctx, {Source, Target}}) ->
+setup() ->
+ Source = ?tempdb(),
+ Target = ?tempdb(),
+ {Source, Target}.
+
+
+teardown({Source, Target}) ->
+ delete_db(Source),
+ delete_db(Target).
+
+
+should_create_target_with_q_4({Source, Target}) ->
RepObject = {[
- {<<"source">>, db_url(Source)},
- {<<"target">>, db_url(Target)},
+ {<<"source">>, Source},
+ {<<"target">>, Target},
{<<"create_target">>, true},
{<<"create_target_params">>, {[{<<"q">>, <<"4">>}]}}
]},
create_db(Source),
create_doc(Source),
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+ {ok, _} = couch_replicator_test_helper:replicate(RepObject),
- {ok, TargetInfo} = fabric:get_db_info(Target),
+ TargetInfo = db_info(Target),
{ClusterInfo} = couch_util:get_value(cluster, TargetInfo),
delete_db(Source),
delete_db(Target),
- ?_assertEqual(4, couch_util:get_value(q, ClusterInfo)).
+ ?assertEqual(0, couch_util:get_value(q, ClusterInfo)).
-should_create_target_with_q_2_n_1({_Ctx, {Source, Target}}) ->
+should_create_target_with_q_2_n_1({Source, Target}) ->
RepObject = {[
- {<<"source">>, db_url(Source)},
- {<<"target">>, db_url(Target)},
+ {<<"source">>, Source},
+ {<<"target">>, Target},
{<<"create_target">>, true},
{<<"create_target_params">>,
{[{<<"q">>, <<"2">>}, {<<"n">>, <<"1">>}]}}
]},
create_db(Source),
create_doc(Source),
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+ {ok, _} = couch_replicator_test_helper:replicate(RepObject),
- {ok, TargetInfo} = fabric:get_db_info(Target),
+ TargetInfo = db_info(Target),
{ClusterInfo} = couch_util:get_value(cluster, TargetInfo),
delete_db(Source),
delete_db(Target),
- [
- ?_assertEqual(2, couch_util:get_value(q, ClusterInfo)),
- ?_assertEqual(1, couch_util:get_value(n, ClusterInfo))
- ].
+ ?assertEqual(0, couch_util:get_value(q, ClusterInfo)),
+ ?assertEqual(0, couch_util:get_value(n, ClusterInfo)).
-should_create_target_with_default({_Ctx, {Source, Target}}) ->
+should_create_target_with_default({Source, Target}) ->
RepObject = {[
- {<<"source">>, db_url(Source)},
- {<<"target">>, db_url(Target)},
+ {<<"source">>, Source},
+ {<<"target">>, Target},
{<<"create_target">>, true}
]},
create_db(Source),
create_doc(Source),
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+ {ok, _} = couch_replicator_test_helper:replicate(RepObject),
- {ok, TargetInfo} = fabric:get_db_info(Target),
+ TargetInfo = db_info(Target),
{ClusterInfo} = couch_util:get_value(cluster, TargetInfo),
- Q = config:get("cluster", "q", "8"),
delete_db(Source),
delete_db(Target),
- ?_assertEqual(list_to_integer(Q), couch_util:get_value(q, ClusterInfo)).
+ ?assertEqual(0, couch_util:get_value(q, ClusterInfo)).
-should_not_create_target_with_q_any({_Ctx, {Source, Target}}) ->
+should_not_create_target_with_q_any({Source, Target}) ->
RepObject = {[
- {<<"source">>, db_url(Source)},
- {<<"target">>, db_url(Target)},
+ {<<"source">>, Source},
+ {<<"target">>, Target},
{<<"create_target">>, false},
{<<"create_target_params">>, {[{<<"q">>, <<"1">>}]}}
]},
create_db(Source),
create_doc(Source),
- {error, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
- DbExist = is_list(catch mem3:shards(Target)),
+ {error, _} = couch_replicator_test_helper:replicate(RepObject),
+ Exists = try
+ fabric2_db:open(Target, [?ADMIN_CTX]),
+ ?assert(false)
+ catch
+ error:database_does_not_exist ->
+ database_does_not_exist
+ end,
delete_db(Source),
- ?_assertEqual(false, DbExist).
+ ?assertEqual(Exists, database_does_not_exist).
create_doc(DbName) ->
- Body = {[{<<"foo">>, <<"bar">>}]},
- NewDoc = #doc{body = Body},
- {ok, _} = fabric:update_doc(DbName, NewDoc, [?ADMIN_CTX]).
+ couch_replicator_test_helper:create_docs(DbName, [
+ #{<<"_id">> => fabric2_util:uuid(), <<"foo">> => <<"bar">>}
+ ]).
create_db(DbName) ->
- ok = fabric:create_db(DbName, [?ADMIN_CTX]).
+ couch_replicator_test_helper:create_db(DbName).
delete_db(DbName) ->
- ok = fabric:delete_db(DbName, [?ADMIN_CTX]).
+ couch_replicator_test_helper:delete_db(DbName).
-db_url(DbName) ->
- Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
- Port = mochiweb_socket_server:get(chttpd, port),
- ?l2b(io_lib:format("http://~s:~s@~s:~b/~s", [?USERNAME, ?PASSWORD, Addr,
- Port, DbName])).
+db_info(DbName) ->
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ {ok, Info} = fabric2_db:get_db_info(Db),
+ Info.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_db_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_db_tests.erl
new file mode 100644
index 000000000..053441007
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_db_tests.erl
@@ -0,0 +1,332 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_db_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+couch_replicator_db_test_() ->
+ {
+ "Replications are started from docs in _replicator dbs",
+ {
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(default_replicator_db_is_created),
+ ?TDEF_FE(continuous_replication_created_from_doc, 15),
+ ?TDEF_FE(normal_replication_created_from_doc, 15),
+ ?TDEF_FE(replicator_db_deleted, 15),
+ ?TDEF_FE(replicator_db_recreated, 15),
+ ?TDEF_FE(invalid_replication_docs),
+ ?TDEF_FE(duplicate_persistent_replication, 15),
+ ?TDEF_FE(duplicate_transient_replication, 30)
+ ]
+ }
+ }
+ }.
+
+
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ create_doc(Source, #{<<"_id">> => <<"doc1">>}),
+ Target = couch_replicator_test_helper:create_db(),
+ Name = ?tempdb(),
+ RepDb = couch_replicator_test_helper:create_db(<<Name/binary,
+ "/_replicator">>),
+ config:set("replicator", "stats_update_interval_sec", "0", false),
+ config:set("replicator", "create_replicator_db", "false", false),
+ config:set("couchdb", "enable_database_recovery", "false", false),
+ config:set("replicator", "min_backoff_penalty_sec", "1", false),
+ {Source, Target, RepDb}.
+
+
+teardown({Source, Target, RepDb}) ->
+ config:delete("replicator", "stats_update_interval_sec", false),
+ config:delete("replicator", "create_replicator_db", false),
+ config:delete("couchdb", "enable_database_recovery", false),
+ config:delete("replicator", "min_backoff_penalty_sec", false),
+
+ couch_replicator_test_helper:delete_db(RepDb),
+ couch_replicator_test_helper:delete_db(?REP_DB_NAME),
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
+
+
+default_replicator_db_is_created({_, _, _}) ->
+ config:set("replicator", "create_replicator_db", "true", false),
+ ?assertEqual(ignore, couch_replicator:ensure_rep_db_exists()),
+ ?assertMatch({ok, #{}}, fabric2_db:open(?REP_DB_NAME, [])).
+
+
+continuous_replication_created_from_doc({Source, Target, RepDb}) ->
+ DocId = <<"rdoc1">>,
+ RDoc = rep_doc(Source, Target, DocId, #{<<"continuous">> => true}),
+ create_doc(RepDb, RDoc),
+ wait_scheduler_docs_state(RepDb, DocId, <<"running">>),
+
+ {Code, DocInfo} = scheduler_docs(RepDb, DocId),
+ ?assertEqual(200, Code),
+ ?assertMatch(#{
+ <<"database">> := RepDb,
+ <<"doc_id">> := DocId
+ }, DocInfo),
+
+ RepId = maps:get(<<"id">>, DocInfo),
+
+ ?assertMatch([#{
+ <<"database">> := RepDb,
+ <<"doc_id">> := DocId,
+ <<"id">> := RepId,
+ <<"state">> := <<"running">>
+ }], couch_replicator_test_helper:scheduler_jobs()),
+
+ ?assertMatch({200, #{
+ <<"database">> := RepDb,
+ <<"doc_id">> := DocId,
+ <<"id">> := RepId,
+ <<"state">> := <<"running">>
+ }}, scheduler_jobs(RepId)),
+
+ delete_doc(RepDb, DocId),
+ wait_scheduler_docs_not_found(RepDb, DocId),
+ ?assertMatch({404, #{}}, scheduler_jobs(RepId)).
+
+
+normal_replication_created_from_doc({Source, Target, RepDb}) ->
+ DocId = <<"rdoc2">>,
+ RDoc = rep_doc(Source, Target, DocId),
+ create_doc(RepDb, RDoc),
+ wait_scheduler_docs_state(RepDb, DocId, <<"completed">>),
+
+ {Code, DocInfo} = scheduler_docs(RepDb, DocId),
+ ?assertEqual(200, Code),
+ ?assertMatch(#{
+ <<"database">> := RepDb,
+ <<"doc_id">> := DocId,
+ <<"state">> := <<"completed">>,
+ <<"info">> := #{
+ <<"docs_written">> := 1,
+ <<"docs_read">> := 1,
+ <<"missing_revisions_found">> := 1
+ }
+ }, DocInfo),
+
+ wait_doc_state(RepDb, DocId, <<"completed">>),
+ ?assertMatch(#{
+ <<"_replication_state">> := <<"completed">>,
+ <<"_replication_stats">> := #{
+ <<"docs_written">> := 1,
+ <<"docs_read">> := 1,
+ <<"missing_revisions_found">> := 1
+ }
+ }, read_doc(RepDb, DocId)),
+
+ delete_doc(RepDb, DocId),
+ wait_scheduler_docs_not_found(RepDb, DocId).
+
+
+replicator_db_deleted({Source, Target, RepDb}) ->
+ DocId = <<"rdoc3">>,
+ RDoc = rep_doc(Source, Target, DocId, #{<<"continuous">> => true}),
+ create_doc(RepDb, RDoc),
+ wait_scheduler_docs_state(RepDb, DocId, <<"running">>),
+ fabric2_db:delete(RepDb, [?ADMIN_CTX]),
+ wait_scheduler_docs_not_found(RepDb, DocId).
+
+
+replicator_db_recreated({Source, Target, RepDb}) ->
+ DocId = <<"rdoc4">>,
+ RDoc = rep_doc(Source, Target, DocId, #{<<"continuous">> => true}),
+ create_doc(RepDb, RDoc),
+ wait_scheduler_docs_state(RepDb, DocId, <<"running">>),
+
+ config:set("couchdb", "enable_database_recovery", "true", false),
+ fabric2_db:delete(RepDb, [?ADMIN_CTX]),
+ wait_scheduler_docs_not_found(RepDb, DocId),
+
+ Opts = [{start_key, RepDb}, {end_key, RepDb}],
+ {ok, [DbInfo]} = fabric2_db:list_deleted_dbs_info(Opts),
+ {_, Timestamp} = lists:keyfind(timestamp, 1, DbInfo),
+ ok = fabric2_db:undelete(RepDb, RepDb, Timestamp, [?ADMIN_CTX]),
+ wait_scheduler_docs_state(RepDb, DocId, <<"running">>),
+
+ config:set("couchdb", "enable_database_recovery", "false", false),
+ fabric2_db:delete(RepDb, [?ADMIN_CTX]),
+ wait_scheduler_docs_not_found(RepDb, DocId).
+
+
+invalid_replication_docs({_, _, RepDb}) ->
+ Docs = [
+ #{
+ <<"_id">> => <<"1">>,
+ <<"source">> => <<"http://127.0.0.1:1000">>
+ },
+ #{
+ <<"_id">> => <<"1">>,
+ <<"target">> => <<"http://127.0.0.1:1001">>
+ },
+ #{
+ <<"_id">> => <<"1">>
+ },
+ #{
+ <<"_id">> => <<"1">>,
+ <<"source">> => <<"http://127.0.0.1:1002">>,
+ <<"target">> => <<"http://127.0.0.1:1003">>,
+ <<"create_target">> => <<"bad">>
+ },
+ #{
+ <<"_id">> => <<"1">>,
+ <<"source">> => #{<<"junk">> => 42},
+ <<"target">> => <<"http://127.0.0.1:1004">>
+ },
+ #{
+ <<"_id">> => <<"1">>,
+ <<"source">> => <<"http://127.0.0.1:1005">>,
+ <<"target">> => <<"http://127.0.0.1:1006">>,
+ <<"selector">> => #{},
+ <<"filter">> => <<"a/b">>
+ },
+ #{
+ <<"_id">> => <<"1">>,
+ <<"source">> => <<"http://127.0.0.1:1007">>,
+ <<"target">> => <<"https://127.0.0.1:1008">>,
+ <<"doc_ids">> => 42
+ }
+ ],
+ lists:foreach(fun(Doc) ->
+ ?assertThrow({forbidden, _}, create_doc(RepDb, Doc))
+ end, Docs).
+
+
+duplicate_persistent_replication({Source, Target, RepDb}) ->
+ DocId1 = <<"rdoc5">>,
+ RDoc1 = rep_doc(Source, Target, DocId1, #{<<"continuous">> => true}),
+ create_doc(RepDb, RDoc1),
+ wait_scheduler_docs_state(RepDb, DocId1, <<"running">>),
+
+ DocId2 = <<"rdoc6">>,
+ RDoc2 = rep_doc(Source, Target, DocId2, #{<<"continuous">> => true}),
+ create_doc(RepDb, RDoc2),
+ wait_scheduler_docs_state(RepDb, DocId2, <<"failed">>),
+
+ delete_doc(RepDb, DocId1),
+ delete_doc(RepDb, DocId2),
+
+ wait_scheduler_docs_not_found(RepDb, DocId1),
+ wait_scheduler_docs_not_found(RepDb, DocId2).
+
+
+duplicate_transient_replication({Source, Target, RepDb}) ->
+ {ok, _Pid, RepId} = couch_replicator_test_helper:replicate_continuous(
+ Source, Target),
+
+ DocId = <<"rdoc7">>,
+ RDoc = rep_doc(Source, Target, DocId, #{<<"continuous">> => true}),
+ create_doc(RepDb, RDoc),
+ wait_scheduler_docs_state(RepDb, DocId, <<"crashing">>),
+
+ couch_replicator_test_helper:cancel(RepId),
+ wait_reschedule_docs_state(RepDb, DocId, <<"running">>),
+
+ delete_doc(RepDb, DocId),
+ wait_scheduler_docs_not_found(RepDb, DocId).
+
+
+scheduler_jobs(Id) ->
+ SUrl = couch_replicator_test_helper:server_url(),
+ Url = lists:flatten(io_lib:format("~s/_scheduler/jobs/~s", [SUrl, Id])),
+ {ok, Code, _, Body} = test_request:get(Url, []),
+ {Code, jiffy:decode(Body, [return_maps])}.
+
+
+scheduler_docs(DbName, DocId) ->
+ SUrl = couch_replicator_test_helper:server_url(),
+ Fmt = "~s/_scheduler/docs/~s/~s",
+ Url = lists:flatten(io_lib:format(Fmt, [SUrl, DbName, DocId])),
+ {ok, Code, _, Body} = test_request:get(Url, []),
+ {Code, jiffy:decode(Body, [return_maps])}.
+
+
+rep_doc(Source, Target, DocId) ->
+ rep_doc(Source, Target, DocId, #{}).
+
+
+rep_doc(Source, Target, DocId, #{} = Extra) ->
+ maps:merge(#{
+ <<"_id">> => DocId,
+ <<"source">> => couch_replicator_test_helper:db_url(Source),
+ <<"target">> => couch_replicator_test_helper:db_url(Target)
+ }, Extra).
+
+
+create_doc(DbName, Doc) ->
+ couch_replicator_test_helper:create_docs(DbName, [Doc]).
+
+
+delete_doc(DbName, DocId) ->
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ {ok, Doc} = fabric2_db:open_doc(Db, DocId),
+ Doc1 = Doc#doc{deleted = true},
+ {ok, _} = fabric2_db:update_doc(Db, Doc1, []).
+
+
+read_doc(DbName, DocId) ->
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ {ok, Doc} = fabric2_db:open_doc(Db, DocId, [ejson_body]),
+ Body = Doc#doc.body,
+ couch_util:json_decode(couch_util:json_encode(Body), [return_maps]).
+
+
+wait_scheduler_docs_state(DbName, DocId, State) ->
+ test_util:wait(fun() ->
+ case scheduler_docs(DbName, DocId) of
+ {200, #{<<"state">> := State} = Res} -> Res;
+ {_, _} -> wait
+ end
+ end, 10000, 250).
+
+
+wait_scheduler_docs_not_found(DbName, DocId) ->
+ test_util:wait(fun() ->
+ case scheduler_docs(DbName, DocId) of
+ {404, _} -> ok;
+ {_, _} -> wait
+ end
+ end, 10000, 250).
+
+
+wait_reschedule_docs_state(DbName, DocId, State) ->
+ test_util:wait(fun() ->
+ couch_replicator_job_server:reschedule(),
+ case scheduler_docs(DbName, DocId) of
+ {200, #{<<"state">> := State} = Res} -> Res;
+ {_, _} -> wait
+ end
+ end, 10000, 500).
+
+
+wait_doc_state(DbName, DocId, State) ->
+ test_util:wait(fun() ->
+ case read_doc(DbName, DocId) of
+ #{<<"_replication_state">> := State} -> ok;
+ #{} -> wait
+ end
+ end, 10000, 250).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_filtered_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_filtered_tests.erl
index 7ac9a4d71..4d72c84f2 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_filtered_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_filtered_tests.erl
@@ -12,17 +12,20 @@
-module(couch_replicator_filtered_tests).
+
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
--define(DDOC, {[
- {<<"_id">>, <<"_design/filter_ddoc">>},
- {<<"filters">>, {[
- {<<"testfilter">>, <<"
+-define(DDOC_ID, <<"_design/filter_ddoc">>).
+-define(DDOC, #{
+ <<"_id">> => ?DDOC_ID,
+ <<"filters">> => #{
+ <<"testfilter">> => <<"
function(doc, req){if (doc.class == 'mammal') return true;}
- ">>},
- {<<"queryfilter">>, <<"
+ ">>,
+ <<"queryfilter">> => <<"
function(doc, req) {
if (doc.class && req.query.starts) {
return doc.class.indexOf(req.query.starts) === 0;
@@ -31,99 +34,87 @@
return false;
}
}
- ">>}
- ]}},
- {<<"views">>, {[
- {<<"mammals">>, {[
- {<<"map">>, <<"
+ ">>
+ },
+ <<"views">> => #{
+ <<"mammals">> => #{
+ <<"map">> => <<"
function(doc) {
if (doc.class == 'mammal') {
emit(doc._id, null);
}
}
- ">>}
- ]}}
- ]}}
-]}).
-
-setup(_) ->
- Ctx = test_util:start_couch([couch_replicator]),
- Source = create_db(),
- create_docs(Source),
- Target = create_db(),
- {Ctx, {Source, Target}}.
+ ">>
+ }
+ }
+}).
-teardown(_, {Ctx, {Source, Target}}) ->
- delete_db(Source),
- delete_db(Target),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
filtered_replication_test_() ->
- Pairs = [{remote, remote}],
{
- "Filtered replication tests",
+ "Replications with filters tests",
{
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_succeed/2} || Pair <- Pairs]
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(filtered_replication_test),
+ ?TDEF_FE(query_filtered_replication_test),
+ ?TDEF_FE(view_filtered_replication_test),
+ ?TDEF_FE(replication_id_changes_if_filter_changes, 15)
+ ]
+ }
}
}.
-query_filtered_replication_test_() ->
- Pairs = [{remote, remote}],
- {
- "Filtered with query replication tests",
- {
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_succeed_with_query/2} || Pair <- Pairs]
- }
- }.
-view_filtered_replication_test_() ->
- Pairs = [{remote, remote}],
- {
- "Filtered with a view replication tests",
- {
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_succeed_with_view/2} || Pair <- Pairs]
- }
- }.
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ create_docs(Source),
+ Target = couch_replicator_test_helper:create_db(),
+ config:set("replicator", "stats_update_interval_sec", "0", false),
+ config:set("replicator", "interval_sec", "1", false),
+ {Source, Target}.
+
+
+teardown({Source, Target}) ->
+ config:delete("replicator", "stats_update_interval_sec", false),
+ config:delete("replicator", "checkpoint_interval", false),
+ config:delete("replicator", "interval_sec", false),
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
-should_succeed({From, To}, {_Ctx, {Source, Target}}) ->
+
+filtered_replication_test({Source, Target}) ->
RepObject = {[
- {<<"source">>, db_url(From, Source)},
- {<<"target">>, db_url(To, Target)},
+ {<<"source">>, Source},
+ {<<"target">>, Target},
{<<"filter">>, <<"filter_ddoc/testfilter">>}
]},
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
- %% FilteredFun is an Erlang version of following JS function
- %% function(doc, req){if (doc.class == 'mammal') return true;}
+ {ok, _} = couch_replicator_test_helper:replicate(RepObject),
FilterFun = fun(_DocId, {Props}) ->
couch_util:get_value(<<"class">>, Props) == <<"mammal">>
end,
{ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target, FilterFun),
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])), [
- {"Target DB has proper number of docs",
- ?_assertEqual(1, proplists:get_value(doc_count, TargetDbInfo))},
- {"Target DB doesn't have deleted docs",
- ?_assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo))},
- {"All the docs filtered as expected",
- ?_assert(lists:all(fun(Valid) -> Valid end, AllReplies))}
- ]}.
-
-should_succeed_with_query({From, To}, {_Ctx, {Source, Target}}) ->
+ ?assertEqual(1, proplists:get_value(doc_count, TargetDbInfo)),
+ ?assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo)),
+ ?assert(lists:all(fun(Valid) -> Valid end, AllReplies)).
+
+
+query_filtered_replication_test({Source, Target}) ->
RepObject = {[
- {<<"source">>, db_url(From, Source)},
- {<<"target">>, db_url(To, Target)},
+ {<<"source">>, Source},
+ {<<"target">>, Target},
{<<"filter">>, <<"filter_ddoc/queryfilter">>},
{<<"query_params">>, {[
{<<"starts">>, <<"a">>}
]}}
]},
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+ {ok, _} = couch_replicator_test_helper:replicate(RepObject),
FilterFun = fun(_DocId, {Props}) ->
case couch_util:get_value(<<"class">>, Props) of
<<"a", _/binary>> -> true;
@@ -131,109 +122,144 @@ should_succeed_with_query({From, To}, {_Ctx, {Source, Target}}) ->
end
end,
{ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target, FilterFun),
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])), [
- {"Target DB has proper number of docs",
- ?_assertEqual(2, proplists:get_value(doc_count, TargetDbInfo))},
- {"Target DB doesn't have deleted docs",
- ?_assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo))},
- {"All the docs filtered as expected",
- ?_assert(lists:all(fun(Valid) -> Valid end, AllReplies))}
- ]}.
-
-should_succeed_with_view({From, To}, {_Ctx, {Source, Target}}) ->
+ ?assertEqual(2, proplists:get_value(doc_count, TargetDbInfo)),
+ ?assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo)),
+ ?assert(lists:all(fun(Valid) -> Valid end, AllReplies)).
+
+
+view_filtered_replication_test({Source, Target}) ->
RepObject = {[
- {<<"source">>, db_url(From, Source)},
- {<<"target">>, db_url(To, Target)},
+ {<<"source">>, Source},
+ {<<"target">>, Target},
{<<"filter">>, <<"_view">>},
{<<"query_params">>, {[
{<<"view">>, <<"filter_ddoc/mammals">>}
]}}
]},
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+ {ok, _} = couch_replicator_test_helper:replicate(RepObject),
FilterFun = fun(_DocId, {Props}) ->
couch_util:get_value(<<"class">>, Props) == <<"mammal">>
end,
{ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target, FilterFun),
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])), [
- {"Target DB has proper number of docs",
- ?_assertEqual(1, proplists:get_value(doc_count, TargetDbInfo))},
- {"Target DB doesn't have deleted docs",
- ?_assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo))},
- {"All the docs filtered as expected",
- ?_assert(lists:all(fun(Valid) -> Valid end, AllReplies))}
- ]}.
+ ?assertEqual(1, proplists:get_value(doc_count, TargetDbInfo)),
+ ?assertEqual(0, proplists:get_value(doc_del_count, TargetDbInfo)),
+ ?assert(lists:all(fun(Valid) -> Valid end, AllReplies)).
+
+
+replication_id_changes_if_filter_changes({Source, Target}) ->
+ config:set("replicator", "checkpoint_interval", "500", false),
+ Rep = {[
+ {<<"source">>, Source},
+ {<<"target">>, Target},
+ {<<"filter">>, <<"filter_ddoc/testfilter">>},
+ {<<"continuous">>, true}
+ ]},
+ {ok, _, RepId1} = couch_replicator_test_helper:replicate_continuous(Rep),
+
+ wait_scheduler_docs_written(1),
+
+ ?assertMatch([#{<<"id">> := RepId1}],
+ couch_replicator_test_helper:scheduler_jobs()),
+
+ FilterFun1 = fun(_, {Props}) ->
+ couch_util:get_value(<<"class">>, Props) == <<"mammal">>
+ end,
+ {ok, TargetDbInfo1, AllReplies1} = compare_dbs(Source, Target, FilterFun1),
+ ?assertEqual(1, proplists:get_value(doc_count, TargetDbInfo1)),
+ ?assert(lists:all(fun(Valid) -> Valid end, AllReplies1)),
+
+ {ok, SourceDb} = fabric2_db:open(Source, [?ADMIN_CTX]),
+ {ok, DDoc1} = fabric2_db:open_doc(SourceDb, ?DDOC_ID),
+ Flt = <<"function(doc, req) {if (doc.class == 'reptiles') return true};">>,
+ DDoc2 = DDoc1#doc{body = {[
+ {<<"filters">>, {[
+ {<<"testfilter">>, Flt}
+ ]}}
+ ]}},
+ {ok, {_, _}} = fabric2_db:update_doc(SourceDb, DDoc2),
+ Info = wait_scheduler_repid_change(RepId1),
+
+ RepId2 = maps:get(<<"id">>, Info),
+ ?assert(RepId1 =/= RepId2),
+
+ wait_scheduler_docs_written(1),
+
+ FilterFun2 = fun(_, {Props}) ->
+ Class = couch_util:get_value(<<"class">>, Props),
+ Class == <<"mammal">> orelse Class == <<"reptiles">>
+ end,
+ {ok, TargetDbInfo2, AllReplies2} = compare_dbs(Source, Target, FilterFun2),
+ ?assertEqual(2, proplists:get_value(doc_count, TargetDbInfo2)),
+ ?assert(lists:all(fun(Valid) -> Valid end, AllReplies2)),
+
+ couch_replicator_test_helper:cancel(RepId2).
+
compare_dbs(Source, Target, FilterFun) ->
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, TargetDb} = couch_db:open_int(Target, []),
- {ok, TargetDbInfo} = couch_db:get_db_info(TargetDb),
- Fun = fun(FullDocInfo, Acc) ->
- {ok, DocId, SourceDoc} = read_doc(SourceDb, FullDocInfo),
- TargetReply = read_doc(TargetDb, DocId),
- case FilterFun(DocId, SourceDoc) of
- true ->
- ValidReply = {ok, DocId, SourceDoc} == TargetReply,
- {ok, [ValidReply|Acc]};
- false ->
- ValidReply = {not_found, missing} == TargetReply,
- {ok, [ValidReply|Acc]}
+ {ok, TargetDb} = fabric2_db:open(Target, [?ADMIN_CTX]),
+ {ok, TargetDbInfo} = fabric2_db:get_db_info(TargetDb),
+ Fun = fun(SrcDoc, TgtDoc, Acc) ->
+ case FilterFun(SrcDoc#doc.id, SrcDoc#doc.body) of
+ true -> [SrcDoc == TgtDoc | Acc];
+ false -> [not_found == TgtDoc | Acc]
end
end,
- {ok, AllReplies} = couch_db:fold_docs(SourceDb, Fun, [], []),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb),
- {ok, TargetDbInfo, AllReplies}.
-
-read_doc(Db, DocIdOrInfo) ->
- case couch_db:open_doc(Db, DocIdOrInfo) of
- {ok, Doc} ->
- {Props} = couch_doc:to_json_obj(Doc, [attachments]),
- DocId = couch_util:get_value(<<"_id">>, Props),
- {ok, DocId, {Props}};
- Error ->
- Error
- end.
-
-create_db() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
+ Res = couch_replicator_test_helper:compare_fold(Source, Target, Fun, []),
+ {ok, TargetDbInfo, Res}.
+
create_docs(DbName) ->
- {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
- DDoc = couch_doc:from_json_obj(?DDOC),
- Doc1 = couch_doc:from_json_obj({[
- {<<"_id">>, <<"doc1">>},
- {<<"class">>, <<"mammal">>},
- {<<"value">>, 1}
-
- ]}),
- Doc2 = couch_doc:from_json_obj({[
- {<<"_id">>, <<"doc2">>},
- {<<"class">>, <<"amphibians">>},
- {<<"value">>, 2}
-
- ]}),
- Doc3 = couch_doc:from_json_obj({[
- {<<"_id">>, <<"doc3">>},
- {<<"class">>, <<"reptiles">>},
- {<<"value">>, 3}
-
- ]}),
- Doc4 = couch_doc:from_json_obj({[
- {<<"_id">>, <<"doc4">>},
- {<<"class">>, <<"arthropods">>},
- {<<"value">>, 2}
-
- ]}),
- {ok, _} = couch_db:update_docs(Db, [DDoc, Doc1, Doc2, Doc3, Doc4]),
- couch_db:close(Db).
-
-delete_db(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]).
-
-db_url(remote, DbName) ->
- Addr = config:get("httpd", "bind_address", "127.0.0.1"),
- Port = mochiweb_socket_server:get(couch_httpd, port),
- ?l2b(io_lib:format("http://~s:~b/~s", [Addr, Port, DbName])).
+ couch_replicator_test_helper:create_docs(DbName, [
+ ?DDOC,
+ #{
+ <<"_id">> => <<"doc1">>,
+ <<"class">> => <<"mammal">>,
+ <<"value">> => 1
+ },
+ #{
+ <<"_id">> => <<"doc2">>,
+ <<"class">> => <<"amphibians">>,
+ <<"value">> => 2
+ },
+ #{
+ <<"_id">> => <<"doc3">>,
+ <<"class">> => <<"reptiles">>,
+ <<"value">> => 3
+ },
+ #{
+ <<"_id">> => <<"doc4">>,
+ <<"class">> => <<"arthropods">>,
+ <<"value">> => 2
+ }
+ ]).
+
+
+wait_scheduler_docs_written(DocsWritten) ->
+ test_util:wait(fun() ->
+ case couch_replicator_test_helper:scheduler_jobs() of
+ [] ->
+ wait;
+ [#{<<"info">> := null}] ->
+ wait;
+ [#{<<"info">> := Info}] ->
+ case Info of
+ #{<<"docs_written">> := DocsWritten} -> Info;
+ #{} -> wait
+ end
+ end
+ end, 10000, 250).
+
+
+wait_scheduler_repid_change(OldRepId) ->
+ test_util:wait(fun() ->
+ case couch_replicator_test_helper:scheduler_jobs() of
+ [] ->
+ wait;
+ [#{<<"id">> := OldRepId}] ->
+ wait;
+ [#{<<"id">> := null}] ->
+ wait;
+ [#{<<"id">> := NewId} = Info] when is_binary(NewId) ->
+ Info
+ end
+ end, 10000, 250).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_httpc_pool_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_httpc_pool_tests.erl
index c4ad4e9b6..6c61446cc 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_httpc_pool_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_httpc_pool_tests.erl
@@ -12,17 +12,13 @@
-module(couch_replicator_httpc_pool_tests).
+
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-
--define(TIMEOUT, 1000).
+-include_lib("fabric/test/fabric2_test.hrl").
-setup() ->
- spawn_pool().
-
-teardown(Pool) ->
- stop_pool(Pool).
+-define(TIMEOUT, 1000).
httpc_pool_test_() ->
@@ -30,75 +26,81 @@ httpc_pool_test_() ->
"httpc pool tests",
{
setup,
- fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
{
foreach,
- fun setup/0, fun teardown/1,
+ fun setup/0,
+ fun teardown/1,
[
- fun should_block_new_clients_when_full/1,
- fun should_replace_worker_on_death/1
+ ?TDEF_FE(should_block_new_clients_when_full),
+ ?TDEF_FE(should_replace_worker_on_death)
]
}
}
}.
+setup() ->
+ spawn_pool().
+
+
+teardown(Pool) ->
+ stop_pool(Pool).
+
+
should_block_new_clients_when_full(Pool) ->
- ?_test(begin
- Client1 = spawn_client(Pool),
- Client2 = spawn_client(Pool),
- Client3 = spawn_client(Pool),
+ Client1 = spawn_client(Pool),
+ Client2 = spawn_client(Pool),
+ Client3 = spawn_client(Pool),
+
+ ?assertEqual(ok, ping_client(Client1)),
+ ?assertEqual(ok, ping_client(Client2)),
+ ?assertEqual(ok, ping_client(Client3)),
- ?assertEqual(ok, ping_client(Client1)),
- ?assertEqual(ok, ping_client(Client2)),
- ?assertEqual(ok, ping_client(Client3)),
+ Worker1 = get_client_worker(Client1, "1"),
+ Worker2 = get_client_worker(Client2, "2"),
+ Worker3 = get_client_worker(Client3, "3"),
- Worker1 = get_client_worker(Client1, "1"),
- Worker2 = get_client_worker(Client2, "2"),
- Worker3 = get_client_worker(Client3, "3"),
+ ?assert(is_process_alive(Worker1)),
+ ?assert(is_process_alive(Worker2)),
+ ?assert(is_process_alive(Worker3)),
- ?assert(is_process_alive(Worker1)),
- ?assert(is_process_alive(Worker2)),
- ?assert(is_process_alive(Worker3)),
+ ?assertNotEqual(Worker1, Worker2),
+ ?assertNotEqual(Worker2, Worker3),
+ ?assertNotEqual(Worker3, Worker1),
- ?assertNotEqual(Worker1, Worker2),
- ?assertNotEqual(Worker2, Worker3),
- ?assertNotEqual(Worker3, Worker1),
+ Client4 = spawn_client(Pool),
+ ?assertEqual(timeout, ping_client(Client4)),
- Client4 = spawn_client(Pool),
- ?assertEqual(timeout, ping_client(Client4)),
+ ?assertEqual(ok, stop_client(Client1)),
+ ?assertEqual(ok, ping_client(Client4)),
- ?assertEqual(ok, stop_client(Client1)),
- ?assertEqual(ok, ping_client(Client4)),
+ Worker4 = get_client_worker(Client4, "4"),
+ ?assertEqual(Worker1, Worker4),
- Worker4 = get_client_worker(Client4, "4"),
- ?assertEqual(Worker1, Worker4),
+ lists:foreach(fun(C) ->
+ ?assertEqual(ok, stop_client(C))
+ end, [Client2, Client3, Client4]).
- lists:foreach(
- fun(C) ->
- ?assertEqual(ok, stop_client(C))
- end, [Client2, Client3, Client4])
- end).
should_replace_worker_on_death(Pool) ->
- ?_test(begin
- Client1 = spawn_client(Pool),
- ?assertEqual(ok, ping_client(Client1)),
- Worker1 = get_client_worker(Client1, "1"),
- ?assert(is_process_alive(Worker1)),
+ Client1 = spawn_client(Pool),
+ ?assertEqual(ok, ping_client(Client1)),
+ Worker1 = get_client_worker(Client1, "1"),
+ ?assert(is_process_alive(Worker1)),
- ?assertEqual(ok, kill_client_worker(Client1)),
- ?assertNot(is_process_alive(Worker1)),
- ?assertEqual(ok, stop_client(Client1)),
+ ?assertEqual(ok, kill_client_worker(Client1)),
+ ?assertNot(is_process_alive(Worker1)),
+ ?assertEqual(ok, stop_client(Client1)),
- Client2 = spawn_client(Pool),
- ?assertEqual(ok, ping_client(Client2)),
- Worker2 = get_client_worker(Client2, "2"),
- ?assert(is_process_alive(Worker2)),
+ Client2 = spawn_client(Pool),
+ ?assertEqual(ok, ping_client(Client2)),
+ Worker2 = get_client_worker(Client2, "2"),
+ ?assert(is_process_alive(Worker2)),
- ?assertNotEqual(Worker1, Worker2),
- ?assertEqual(ok, stop_client(Client2))
- end).
+ ?assertNotEqual(Worker1, Worker2),
+ ?assertEqual(ok, stop_client(Client2)).
spawn_client(Pool) ->
@@ -110,6 +112,7 @@ spawn_client(Pool) ->
end),
{Pid, Ref}.
+
ping_client({Pid, Ref}) ->
Pid ! ping,
receive
@@ -119,18 +122,18 @@ ping_client({Pid, Ref}) ->
timeout
end.
+
get_client_worker({Pid, Ref}, ClientName) ->
Pid ! get_worker,
receive
{worker, Ref, Worker} ->
Worker
after ?TIMEOUT ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, "Timeout getting client " ++ ClientName ++ " worker"}]})
+ erlang:error({assertion_failed, [{module, ?MODULE}, {line, ?LINE},
+ {reason, "Timeout getting client " ++ ClientName ++ " worker"}]})
end.
+
stop_client({Pid, Ref}) ->
Pid ! stop,
receive
@@ -140,6 +143,7 @@ stop_client({Pid, Ref}) ->
timeout
end.
+
kill_client_worker({Pid, Ref}) ->
Pid ! get_worker,
receive
@@ -150,6 +154,7 @@ kill_client_worker({Pid, Ref}) ->
timeout
end.
+
loop(Parent, Ref, Worker, Pool) ->
receive
ping ->
@@ -163,12 +168,14 @@ loop(Parent, Ref, Worker, Pool) ->
Parent ! {stop, Ref}
end.
+
spawn_pool() ->
- Host = config:get("httpd", "bind_address", "127.0.0.1"),
- Port = config:get("httpd", "port", "5984"),
+ Host = config:get("chttpd", "bind_address", "127.0.0.1"),
+ Port = config:get("chttpd", "port", "5984"),
{ok, Pool} = couch_replicator_httpc_pool:start_link(
"http://" ++ Host ++ ":" ++ Port, [{max_connections, 3}]),
Pool.
+
stop_pool(Pool) ->
ok = couch_replicator_httpc_pool:stop(Pool).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_id_too_long_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_id_too_long_tests.erl
index a4696c4b8..3a0e6f7bd 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_id_too_long_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_id_too_long_tests.erl
@@ -15,76 +15,57 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
-
-
-setup(_) ->
- Ctx = test_util:start_couch([couch_replicator]),
- Source = create_db(),
- create_doc(Source),
- Target = create_db(),
- {Ctx, {Source, Target}}.
-
-
-teardown(_, {Ctx, {Source, Target}}) ->
- delete_db(Source),
- delete_db(Target),
- config:set("replicator", "max_document_id_length", "infinity"),
- ok = test_util:stop_couch(Ctx).
+-include_lib("fabric/test/fabric2_test.hrl").
id_too_long_replication_test_() ->
- Pairs = [{remote, remote}],
{
"Doc id too long tests",
{
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_succeed/2} || Pair <- Pairs] ++
- [{Pair, fun should_fail/2} || Pair <- Pairs]
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(should_succeed),
+ ?TDEF_FE(should_fail)
+
+ ]
+ }
}
}.
-should_succeed({From, To}, {_Ctx, {Source, Target}}) ->
- RepObject = {[
- {<<"source">>, db_url(From, Source)},
- {<<"target">>, db_url(To, Target)}
- ]},
- config:set("replicator", "max_document_id_length", "5"),
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
- ?_assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)).
-
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ create_doc(Source),
+ Target = couch_replicator_test_helper:create_db(),
+ {Source, Target}.
-should_fail({From, To}, {_Ctx, {Source, Target}}) ->
- RepObject = {[
- {<<"source">>, db_url(From, Source)},
- {<<"target">>, db_url(To, Target)}
- ]},
- config:set("replicator", "max_document_id_length", "4"),
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
- ?_assertError({badmatch, {not_found, missing}},
- couch_replicator_test_helper:compare_dbs(Source, Target)).
+teardown({Source, Target}) ->
+ config:delete("replicator", "max_document_id_length", false),
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
-create_db() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
+should_succeed({Source, Target}) ->
+ config:set("replicator", "max_document_id_length", "5", false),
+ {ok, _} = couch_replicator_test_helper:replicate(Source, Target),
+ ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)).
-create_doc(DbName) ->
- {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
- Doc = couch_doc:from_json_obj({[{<<"_id">>, <<"12345">>}]}),
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- couch_db:close(Db).
+should_fail({Source, Target}) ->
+ config:set("replicator", "max_document_id_length", "4", false),
+ {ok, _} = couch_replicator_test_helper:replicate(Source, Target),
+ ExceptIds = [<<"12345">>],
+ ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target,
+ ExceptIds)).
-delete_db(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]).
-
-db_url(remote, DbName) ->
- Addr = config:get("httpd", "bind_address", "127.0.0.1"),
- Port = mochiweb_socket_server:get(couch_httpd, port),
- ?l2b(io_lib:format("http://~s:~b/~s", [Addr, Port, DbName])).
+create_doc(DbName) ->
+ Docs = [#{<<"_id">> => <<"12345">>}],
+ couch_replicator_test_helper:create_docs(DbName, Docs).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_job_server_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_job_server_tests.erl
new file mode 100644
index 000000000..698a84400
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_job_server_tests.erl
@@ -0,0 +1,437 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_job_server_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+-define(SHUTDOWN_TIMEOUT, 1000).
+-define(JOB_SERVER, couch_replicator_job_server).
+
+
+job_server_test_() ->
+ {
+ "Test job server",
+ {
+ setup,
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(should_start_up),
+ ?TDEF_FE(reschedule_resets_timer),
+ ?TDEF_FE(reschedule_reads_config),
+ ?TDEF_FE(acceptors_spawned_if_pending),
+ ?TDEF_FE(acceptors_not_spawned_if_no_pending),
+ ?TDEF_FE(acceptors_not_spawned_if_no_max_churn),
+ ?TDEF_FE(acceptors_not_spawned_if_no_churn_budget),
+ ?TDEF_FE(acceptors_spawned_on_acceptor_exit),
+ ?TDEF_FE(acceptor_turns_into_worker),
+ ?TDEF_FE(acceptors_spawned_on_worker_exit),
+ ?TDEF_FE(excess_acceptors_spawned),
+ ?TDEF_FE(excess_workers_trimmed_on_reschedule),
+ ?TDEF_FE(recent_workers_are_not_stopped)
+ ]
+ }
+ }
+ }.
+
+
+setup_all() ->
+ Ctx = test_util:start_couch(),
+ meck:new(couch_replicator_job_server, [passthrough]),
+ mock_pending(0),
+ meck:expect(couch_replicator_jobs, set_timeout, 0, ok),
+ meck:expect(couch_replicator_jobs, fold_jobs, 3, ok),
+ meck:expect(couch_replicator_job, start_link, fun() ->
+ {ok, spawn_link(fun() -> start_job() end)}
+ end),
+ Ctx.
+
+
+teardown_all(Ctx) ->
+ meck:unload(),
+ config_delete("interval_sec"),
+ config_delete("max_acceptors"),
+ config_delete("max_jobs"),
+ config_delete("max_churn"),
+ config_delete("min_run_time_sec"),
+ config_delete("transient_job_max_age_sec"),
+ test_util:stop_couch(Ctx).
+
+
+setup() ->
+ config_set("interval_sec", "99999"),
+ config_set("max_acceptors", "0"),
+ config_set("max_jobs", "0"),
+ config_set("max_churn", "1"),
+ config_set("min_run_time_sec", "0"),
+ config_set("transient_job_max_age_sec", "99999"),
+
+ mock_pending(0),
+
+ {ok, SPid} = ?JOB_SERVER:start_link(?SHUTDOWN_TIMEOUT),
+ SPid.
+
+
+teardown(SPid) when is_pid(SPid) ->
+ unlink(SPid),
+ Ref = monitor(process, SPid),
+ exit(SPid, kill),
+ receive {'DOWN', Ref, _, _, _} -> ok end,
+
+ meck:reset(couch_replicator_jobs),
+ meck:reset(couch_replicator_job),
+ meck:reset(couch_replicator_job_server),
+
+ config_delete("interval_sec"),
+ config_delete("max_acceptors"),
+ config_delete("max_jobs"),
+ config_delete("max_churn"),
+ config_delete("min_run_time_sec"),
+ config_delete("transient_job_max_age_sec").
+
+
+should_start_up(SPid) ->
+ ?assert(is_process_alive(SPid)),
+ ?assertEqual(SPid, whereis(?JOB_SERVER)),
+ State = sys:get_state(?JOB_SERVER),
+ #{
+ acceptors := #{},
+ workers := #{},
+ churn := 0,
+ config := Config,
+ timer := Timer,
+ timeout := ?SHUTDOWN_TIMEOUT
+ } = State,
+
+ % Make sure it read the config
+ ?assertMatch(#{
+ max_acceptors := 0,
+ interval_sec := 99999,
+ max_jobs := 0,
+ max_churn := 1,
+ min_run_time_sec := 0,
+ transient_job_max_age_sec := 99999
+ }, Config),
+
+ % Timer was set up
+ ?assert(is_reference(Timer)),
+ ?assert(is_integer(erlang:read_timer(Timer))).
+
+
+reschedule_resets_timer(_) ->
+ #{timer := OldTimer} = sys:get_state(?JOB_SERVER),
+
+ ?assertEqual(ok, ?JOB_SERVER:reschedule()),
+
+ #{timer := Timer} = sys:get_state(?JOB_SERVER),
+ ?assert(is_reference(Timer)),
+ ?assert(Timer =/= OldTimer).
+
+
+reschedule_reads_config(_) ->
+ config_set("interval_sec", "99998"),
+
+ ?JOB_SERVER:reschedule(),
+
+ #{config := Config} = sys:get_state(?JOB_SERVER),
+ ?assertMatch(#{interval_sec := 99998}, Config).
+
+
+acceptors_spawned_if_pending(_) ->
+ config_set("max_acceptors", "1"),
+ mock_pending(1),
+
+ ?JOB_SERVER:reschedule(),
+
+ ?assertMatch([Pid] when is_pid(Pid), acceptors()).
+
+
+acceptors_not_spawned_if_no_pending(_) ->
+ config_set("max_acceptors", "1"),
+ mock_pending(0),
+
+ ?JOB_SERVER:reschedule(),
+
+ ?assertEqual([], acceptors()).
+
+
+acceptors_not_spawned_if_no_max_churn(_) ->
+ config_set("max_churn", "0"),
+ config_set("max_acceptors", "1"),
+ mock_pending(1),
+
+ ?JOB_SERVER:reschedule(),
+
+ ?assertEqual([], acceptors()).
+
+
+acceptors_not_spawned_if_no_churn_budget(_) ->
+ config_set("max_churn", "1"),
+ config_set("max_acceptors", "1"),
+ mock_pending(0),
+
+ % To read the config
+ ?JOB_SERVER:reschedule(),
+
+ ?assertEqual([], acceptors()),
+
+ mock_pending(1),
+
+ % Exhaust churn budget
+ sys:replace_state(couch_replicator_job_server, fun(#{} = St) ->
+ St#{churn := 1}
+ end),
+
+ ?JOB_SERVER:reschedule(),
+
+ ?assertEqual([], acceptors()).
+
+
+acceptors_spawned_on_acceptor_exit(_) ->
+ config_set("max_acceptors", "3"),
+ config_set("max_jobs", "4"),
+ mock_pending(1),
+
+ ?JOB_SERVER:reschedule(),
+
+ [A1] = acceptors(),
+
+ exit(A1, kill),
+ meck:wait(?JOB_SERVER, handle_info, [{'EXIT', A1, killed}, '_'], 2000),
+
+ ?assertEqual(3, length(acceptors())).
+
+
+acceptor_turns_into_worker(_) ->
+ config_set("max_acceptors", "3"),
+ config_set("max_jobs", "4"),
+ mock_pending(1),
+
+ ?JOB_SERVER:reschedule(),
+
+ [A1] = acceptors(),
+ accept_job(A1, true),
+ ?assertEqual(3, length(acceptors())),
+ #{workers := Workers} = sys:get_state(?JOB_SERVER),
+ ?assertMatch([{A1, {true, _}}], maps:to_list(Workers)).
+
+
+acceptors_spawned_on_worker_exit(_) ->
+ config_set("max_acceptors", "1"),
+ config_set("max_jobs", "1"),
+ mock_pending(1),
+
+ ?JOB_SERVER:reschedule(),
+
+ [A1] = acceptors(),
+ accept_job(A1, true),
+
+ % Since max_jobs = 1 no more acceptors are spawned
+ ?assertEqual(0, length(acceptors())),
+
+ % Same acceptor process is now a worker
+ ?assertEqual([A1], workers()),
+
+ exit(A1, shutdown),
+ meck:wait(?JOB_SERVER, handle_info, [{'EXIT', A1, shutdown}, '_'], 2000),
+
+ % New acceptor process started
+ ?assertEqual(1, length(acceptors())),
+ ?assertEqual(0, length(workers())).
+
+
+excess_acceptors_spawned(_) ->
+ config_set("max_acceptors", "2"),
+ config_set("max_churn", "3"),
+ config_set("max_jobs", "4"),
+ mock_pending(100),
+
+ ?JOB_SERVER:reschedule(),
+
+ ?assertEqual(3, length(acceptors())),
+
+ accept_all(),
+
+ ?assertEqual(3, length(workers())),
+ ?assertEqual(1, length(acceptors())),
+ % Check that the churn budget was consumed
+ ?assertMatch(#{churn := 3}, sys:get_state(?JOB_SERVER)),
+
+ accept_all(),
+
+ % No more acceptors spawned after reaching max_jobs
+ ?assertEqual(0, length(acceptors())),
+ ?assertEqual(4, length(workers())),
+
+ ?JOB_SERVER:reschedule(),
+
+ % Since all churn budget was consumed, no new acceptors should have been
+ % spawned this cycle, but the churn budget should have been reset
+ ?assertEqual(0, length(acceptors())),
+ ?assertEqual(4, length(workers())),
+ ?assertMatch(#{churn := 0}, sys:get_state(?JOB_SERVER)),
+
+ ?JOB_SERVER:reschedule(),
+
+ % Should have spawned 3 excess acceptors
+ ?assertEqual(3, length(acceptors())),
+ ?assertEqual(4, length(workers())),
+
+ accept_all(),
+
+ % Running with an excess number of workers
+ ?assertEqual(0, length(acceptors())),
+ ?assertEqual(7, length(workers())).
+
+
+excess_workers_trimmed_on_reschedule(_) ->
+ config_set("max_acceptors", "2"),
+ config_set("max_churn", "3"),
+ config_set("max_jobs", "4"),
+ mock_pending(100),
+
+ ?JOB_SERVER:reschedule(),
+
+ [A1, A2, A3] = acceptors(),
+ accept_job(A1, true),
+ accept_job(A2, false),
+ accept_job(A3, false),
+ [A4] = acceptors(),
+ accept_job(A4, true),
+
+ ?JOB_SERVER:reschedule(),
+
+ % First reschedule was to reset the churn budget; this next one is to spawn
+ % an excess number of acceptors.
+ ?JOB_SERVER:reschedule(),
+
+ [A5, A6, A7] = acceptors(),
+ accept_job(A5, true),
+ accept_job(A6, false),
+ accept_job(A7, false),
+
+ ?assertEqual(7, length(workers())),
+
+ % Running with an excess number of workers. These should be trimmed
+ % during the next cycle
+ ?JOB_SERVER:reschedule(),
+
+ Workers = workers(),
+ ?assertEqual(4, length(Workers)),
+ ?assertEqual(0, length(acceptors())),
+
+ % Check that A1 and A4 were skipped since they are not continuous
+ ?assertEqual(Workers, Workers -- [A2, A3, A6]).
+
+
+recent_workers_are_not_stopped(_) ->
+ config_set("max_acceptors", "2"),
+ config_set("max_churn", "3"),
+ config_set("max_jobs", "4"),
+ mock_pending(100),
+
+ ?JOB_SERVER:reschedule(),
+
+ [A1, A2, A3] = acceptors(),
+ accept_job(A1, true),
+ accept_job(A2, false),
+ accept_job(A3, false),
+ [A4] = acceptors(),
+ accept_job(A4, true),
+
+ ?JOB_SERVER:reschedule(),
+
+ % First reschedule was to reset the churn budget; this next one is to spawn
+ % an excess number of acceptors.
+ ?JOB_SERVER:reschedule(),
+
+ [A5, A6, A7] = acceptors(),
+ accept_job(A5, true),
+ accept_job(A6, false),
+ accept_job(A7, false),
+
+ ?assertEqual(7, length(workers())),
+
+ % Running with an excess number of workers. But they won't be stopped on
+ % reschedule if they ran for a period less than min_run_time_sec during the
+ % next cycle
+ config_set("min_run_time_sec", "9999"),
+
+ % don't want to start new acceptors anymore
+ mock_pending(0),
+ config_set("max_acceptors", "0"),
+
+ ?JOB_SERVER:reschedule(),
+
+ ?assertEqual(7, length(workers())),
+ ?assertEqual(0, length(acceptors())),
+
+ config_set("min_run_time_sec", "0"),
+
+ ?JOB_SERVER:reschedule(),
+
+ ?assertEqual(4, length(workers())),
+ ?assertEqual(0, length(acceptors())).
+
+
+config_set(K, V) ->
+ config:set("replicator", K, V, _Persist = false).
+
+
+config_delete(K) ->
+ config:delete("replicator", K, _Persist = false).
+
+
+mock_pending(N) ->
+ meck:expect(couch_replicator_jobs, pending_count, 2, N).
+
+
+acceptors() ->
+ #{acceptors := Acceptors} = sys:get_state(?JOB_SERVER),
+ maps:keys(Acceptors).
+
+
+workers() ->
+ #{workers := Workers} = sys:get_state(?JOB_SERVER),
+ maps:keys(Workers).
+
+
+accept_job(APid, Normal) ->
+ APid ! {accept_job, Normal, self()},
+ receive
+ {job_accepted, APid} -> ok
+ after
+ 5000 ->
+ error(test_job_accept_timeout)
+ end.
+
+
+accept_all() ->
+ [accept_job(APid, true) || APid <- acceptors()].
+
+
+start_job() ->
+ receive
+ {accept_job, Normal, From} ->
+ ok = ?JOB_SERVER:accepted(self(), Normal),
+ From ! {job_accepted, self()},
+ start_job();
+ {exit_job, ExitSig} ->
+ exit(ExitSig)
+ end.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl
index 27c89a0cd..fcbdf229f 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl
@@ -14,12 +14,8 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
--import(couch_replicator_test_helper, [
- db_url/1,
- replicate/2,
- compare_dbs/2
-]).
-define(ATT_SIZE_1, 2 * 1024 * 1024).
-define(ATT_SIZE_2, round(6.6 * 1024 * 1024)).
@@ -27,90 +23,65 @@
-define(TIMEOUT_EUNIT, 120).
-setup() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
-
-setup(remote) ->
- {remote, setup()};
-setup({A, B}) ->
- Ctx = test_util:start_couch([couch_replicator]),
- config:set("attachments", "compressible_types", "text/*", false),
- Source = setup(A),
- Target = setup(B),
- {Ctx, {Source, Target}}.
-
-teardown({remote, DbName}) ->
- teardown(DbName);
-teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]),
- ok.
-
-teardown(_, {Ctx, {Source, Target}}) ->
- teardown(Source),
- teardown(Target),
-
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
-
large_atts_test_() ->
- Pairs = [{remote, remote}],
{
- "Replicate docs with large attachments",
+ "Large attachment replication test",
{
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_populate_replicate_compact/2}
- || Pair <- Pairs]
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(should_replicate_attachments, 120)
+ ]
+ }
}
}.
-should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
- {inorder, [should_populate_source(Source),
- should_replicate(Source, Target),
- should_compare_databases(Source, Target)]}}.
+setup() ->
+ AttCfg = config:get("attachments", "compressible_types"),
+ config:set("attachments", "compressible_types", "text/*", false),
+ Source = couch_replicator_test_helper:create_db(),
+ ok = populate_db(Source, ?DOCS_COUNT),
+ Target = couch_replicator_test_helper:create_db(),
+ {AttCfg, Source, Target}.
+
-should_populate_source({remote, Source}) ->
- should_populate_source(Source);
-should_populate_source(Source) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source, ?DOCS_COUNT))}.
+teardown({AttCfg, Source, Target}) ->
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target),
+ case AttCfg of
+ undefined ->
+ config:delete("attachments", "compressible_types", false);
+ _ ->
+ config:set("attachments", "compressible_types", AttCfg)
+ end.
-should_replicate({remote, Source}, Target) ->
- should_replicate(db_url(Source), Target);
-should_replicate(Source, {remote, Target}) ->
- should_replicate(Source, db_url(Target));
-should_replicate(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}.
-should_compare_databases({remote, Source}, Target) ->
- should_compare_databases(Source, Target);
-should_compare_databases(Source, {remote, Target}) ->
- should_compare_databases(Source, Target);
-should_compare_databases(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target))}.
+should_replicate_attachments({_AttCfg, Source, Target}) ->
+ ?assertMatch({ok, _},
+ couch_replicator_test_helper:replicate(Source, Target)),
+ ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)).
populate_db(DbName, DocCount) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- Docs = lists:foldl(
- fun(DocIdCounter, Acc) ->
- Doc = #doc{
- id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]),
- body = {[]},
- atts = [
- att(<<"att1">>, ?ATT_SIZE_1, <<"text/plain">>),
- att(<<"att2">>, ?ATT_SIZE_2, <<"app/binary">>)
- ]
- },
- [Doc | Acc]
- end,
- [], lists:seq(1, DocCount)),
- {ok, _} = couch_db:update_docs(Db, Docs, []),
- couch_db:close(Db).
+ Docs = lists:foldl(fun(DocIdCounter, Acc) ->
+ Doc = #doc{
+ id = iolist_to_binary(["doc", integer_to_list(DocIdCounter)]),
+ body = {[]},
+ atts = [
+ att(<<"att1">>, ?ATT_SIZE_1, <<"text/plain">>),
+ att(<<"att2">>, ?ATT_SIZE_2, <<"app/binary">>)
+ ]
+ },
+ [Doc | Acc]
+ end, [], lists:seq(1, DocCount)),
+ couch_replicator_test_helper:create_docs(DbName, Docs).
+
att(Name, Size, Type) ->
couch_att:new([
diff --git a/src/couch_replicator/test/eunit/couch_replicator_many_leaves_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_many_leaves_tests.erl
index c7933b472..3dbfa6aba 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_many_leaves_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_many_leaves_tests.erl
@@ -14,11 +14,8 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
--import(couch_replicator_test_helper, [
- db_url/1,
- replicate/2
-]).
-define(DOCS_CONFLICTS, [
{<<"doc1">>, 10},
@@ -28,178 +25,150 @@
{<<"doc3">>, 210}
]).
-define(NUM_ATTS, 2).
--define(TIMEOUT_EUNIT, 60).
-define(i2l(I), integer_to_list(I)).
-define(io2b(Io), iolist_to_binary(Io)).
-setup() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
-
-
-setup(remote) ->
- {remote, setup()};
-setup({A, B}) ->
- Ctx = test_util:start_couch([couch_replicator]),
- Source = setup(A),
- Target = setup(B),
- {Ctx, {Source, Target}}.
-
-teardown({remote, DbName}) ->
- teardown(DbName);
-teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]),
- ok.
-
-teardown(_, {Ctx, {Source, Target}}) ->
- teardown(Source),
- teardown(Target),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
docs_with_many_leaves_test_() ->
- Pairs = [{remote, remote}],
{
"Replicate documents with many leaves",
{
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_populate_replicate_compact/2}
- || Pair <- Pairs]
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(should_replicate_doc_with_many_leaves, 180)
+ ]
+ }
}
}.
-should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
- {inorder, [
- should_populate_source(Source),
- should_replicate(Source, Target),
- should_verify_target(Source, Target),
- should_add_attachments_to_source(Source),
- should_replicate(Source, Target),
- should_verify_target(Source, Target)
- ]}}.
-
-should_populate_source({remote, Source}) ->
- should_populate_source(Source);
-should_populate_source(Source) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source))}.
-
-should_replicate({remote, Source}, Target) ->
- should_replicate(db_url(Source), Target);
-should_replicate(Source, {remote, Target}) ->
- should_replicate(Source, db_url(Target));
-should_replicate(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}.
-
-should_verify_target({remote, Source}, Target) ->
- should_verify_target(Source, Target);
-should_verify_target(Source, {remote, Target}) ->
- should_verify_target(Source, Target);
-should_verify_target(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(begin
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, TargetDb} = couch_db:open_int(Target, []),
- verify_target(SourceDb, TargetDb, ?DOCS_CONFLICTS),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb)
- end)}.
-
-should_add_attachments_to_source({remote, Source}) ->
- should_add_attachments_to_source(Source);
-should_add_attachments_to_source(Source) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(begin
- {ok, SourceDb} = couch_db:open_int(Source, [?ADMIN_CTX]),
- add_attachments(SourceDb, ?NUM_ATTS, ?DOCS_CONFLICTS),
- ok = couch_db:close(SourceDb)
- end)}.
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ populate_db(Source),
+ Target = couch_replicator_test_helper:create_db(),
+ {Source, Target}.
+
+
+teardown({Source, Target}) ->
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
+
+
+should_replicate_doc_with_many_leaves({Source, Target}) ->
+ replicate(Source, Target),
+ {ok, SourceDb} = fabric2_db:open(Source, [?ADMIN_CTX]),
+ {ok, TargetDb} = fabric2_db:open(Target, [?ADMIN_CTX]),
+ verify_target(SourceDb, TargetDb, ?DOCS_CONFLICTS),
+ add_attachments(SourceDb, ?NUM_ATTS, ?DOCS_CONFLICTS),
+ replicate(Source, Target),
+ verify_target(SourceDb, TargetDb, ?DOCS_CONFLICTS).
+
populate_db(DbName) ->
- {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
- lists:foreach(
- fun({DocId, NumConflicts}) ->
- Value = <<"0">>,
- Doc = #doc{
- id = DocId,
- body = {[ {<<"value">>, Value} ]}
- },
- {ok, _} = couch_db:update_doc(Db, Doc, [?ADMIN_CTX]),
- {ok, _} = add_doc_siblings(Db, DocId, NumConflicts)
- end, ?DOCS_CONFLICTS),
- couch_db:close(Db).
-
-add_doc_siblings(Db, DocId, NumLeaves) when NumLeaves > 0 ->
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ lists:foreach(fun({DocId, NumConflicts}) ->
+ Doc = #doc{
+ id = DocId,
+ body = {[{<<"value">>, <<"0">>}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc),
+ {ok, _} = add_doc_siblings(Db, DocId, NumConflicts)
+ end, ?DOCS_CONFLICTS).
+
+
+add_doc_siblings(#{} = Db, DocId, NumLeaves) when NumLeaves > 0 ->
add_doc_siblings(Db, DocId, NumLeaves, [], []).
-add_doc_siblings(Db, _DocId, 0, AccDocs, AccRevs) ->
- {ok, []} = couch_db:update_docs(Db, AccDocs, [], replicated_changes),
+
+add_doc_siblings(#{} = Db, _DocId, 0, AccDocs, AccRevs) ->
+ {ok, []} = fabric2_db:update_docs(Db, AccDocs, [replicated_changes]),
{ok, AccRevs};
-add_doc_siblings(Db, DocId, NumLeaves, AccDocs, AccRevs) ->
+add_doc_siblings(#{} = Db, DocId, NumLeaves, AccDocs, AccRevs) ->
Value = ?l2b(?i2l(NumLeaves)),
Rev = couch_hash:md5_hash(Value),
Doc = #doc{
id = DocId,
revs = {1, [Rev]},
- body = {[ {<<"value">>, Value} ]}
+ body = {[{<<"value">>, Value}]}
},
add_doc_siblings(Db, DocId, NumLeaves - 1,
- [Doc | AccDocs], [{1, Rev} | AccRevs]).
+ [Doc | AccDocs], [{1, Rev} | AccRevs]).
+
verify_target(_SourceDb, _TargetDb, []) ->
ok;
-verify_target(SourceDb, TargetDb, [{DocId, NumConflicts} | Rest]) ->
- {ok, SourceLookups} = couch_db:open_doc_revs(
- SourceDb,
- DocId,
- all,
- [conflicts, deleted_conflicts]),
- {ok, TargetLookups} = couch_db:open_doc_revs(
- TargetDb,
- DocId,
- all,
- [conflicts, deleted_conflicts]),
+
+verify_target(#{} = SourceDb, #{} = TargetDb,
+ [{DocId, NumConflicts} | Rest]) ->
+ Opts = [conflicts, deleted_conflicts],
+ {ok, SourceLookups} = open_doc_revs(SourceDb, DocId, Opts),
+ {ok, TargetLookups} = open_doc_revs(TargetDb, DocId, Opts),
SourceDocs = [Doc || {ok, Doc} <- SourceLookups],
TargetDocs = [Doc || {ok, Doc} <- TargetLookups],
Total = NumConflicts + 1,
?assertEqual(Total, length(TargetDocs)),
- lists:foreach(
- fun({SourceDoc, TargetDoc}) ->
- SourceJson = couch_doc:to_json_obj(SourceDoc, [attachments]),
- TargetJson = couch_doc:to_json_obj(TargetDoc, [attachments]),
- ?assertEqual(SourceJson, TargetJson)
- end,
- lists:zip(SourceDocs, TargetDocs)),
+ lists:foreach(fun({SourceDoc, TargetDoc}) ->
+ ?assertEqual(json_doc(SourceDoc), json_doc(TargetDoc))
+ end, lists:zip(SourceDocs, TargetDocs)),
verify_target(SourceDb, TargetDb, Rest).
-add_attachments(_SourceDb, _NumAtts, []) ->
+
+add_attachments(_SourceDb, _NumAtts, []) ->
ok;
-add_attachments(SourceDb, NumAtts, [{DocId, NumConflicts} | Rest]) ->
- {ok, SourceLookups} = couch_db:open_doc_revs(SourceDb, DocId, all, []),
+
+add_attachments(#{} = SourceDb, NumAtts,
+ [{DocId, NumConflicts} | Rest]) ->
+ {ok, SourceLookups} = open_doc_revs(SourceDb, DocId, []),
SourceDocs = [Doc || {ok, Doc} <- SourceLookups],
Total = NumConflicts + 1,
?assertEqual(Total, length(SourceDocs)),
- NewDocs = lists:foldl(
- fun(#doc{atts = Atts, revs = {Pos, [Rev | _]}} = Doc, Acc) ->
+ NewDocs = lists:foldl(fun
+ (#doc{atts = Atts, revs = {Pos, [Rev | _]}} = Doc, Acc) ->
NewAtts = lists:foldl(fun(I, AttAcc) ->
- AttData = crypto:strong_rand_bytes(100),
- NewAtt = couch_att:new([
- {name, ?io2b(["att_", ?i2l(I), "_",
- couch_doc:rev_to_str({Pos, Rev})])},
- {type, <<"application/foobar">>},
- {att_len, byte_size(AttData)},
- {data, AttData}
- ]),
- [NewAtt | AttAcc]
+ [att(I, {Pos, Rev}, 100) | AttAcc]
end, [], lists:seq(1, NumAtts)),
[Doc#doc{atts = Atts ++ NewAtts} | Acc]
- end,
- [], SourceDocs),
- {ok, UpdateResults} = couch_db:update_docs(SourceDb, NewDocs, []),
- NewRevs = [R || {ok, R} <- UpdateResults],
- ?assertEqual(length(NewDocs), length(NewRevs)),
+ end, [], SourceDocs),
+ lists:foreach(fun(#doc{} = Doc) ->
+ ?assertMatch({ok, _}, fabric2_db:update_doc(SourceDb, Doc))
+ end, NewDocs),
add_attachments(SourceDb, NumAtts, Rest).
+
+att(I, PosRev, Size) ->
+ Name = ?io2b(["att_", ?i2l(I), "_", couch_doc:rev_to_str(PosRev)]),
+ AttData = crypto:strong_rand_bytes(Size),
+ couch_att:new([
+ {name, Name},
+ {type, <<"application/foobar">>},
+ {att_len, byte_size(AttData)},
+ {data, AttData}
+ ]).
+
+
+open_doc_revs(#{} = Db, DocId, Opts) ->
+ fabric2_db:open_doc_revs(Db, DocId, all, Opts).
+
+
+json_doc(#doc{} = Doc) ->
+ couch_doc:to_json_obj(Doc, [attachments]).
+
+
+replicate(Source, Target) ->
+ % Serialize the concurrent updates of the same document in order
+ % to prevent having to set higher timeouts due to FDB conflicts
+ RepObject = #{
+ <<"source">> => Source,
+ <<"target">> => Target,
+ <<"worker_processes">> => 1,
+ <<"http_connections">> => 1
+ },
+ ?assertMatch({ok, _},
+ couch_replicator_test_helper:replicate(RepObject)).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_missing_stubs_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_missing_stubs_tests.erl
index ff08b5ee5..e672c76b7 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_missing_stubs_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_missing_stubs_tests.erl
@@ -14,103 +14,59 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
--import(couch_replicator_test_helper, [
- db_url/1,
- replicate/2,
- compare_dbs/2
-]).
-define(REVS_LIMIT, 3).
--define(TIMEOUT_EUNIT, 30).
-setup() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
-
-setup(remote) ->
- {remote, setup()};
-setup({A, B}) ->
- Ctx = test_util:start_couch([couch_replicator]),
- Source = setup(A),
- Target = setup(B),
- {Ctx, {Source, Target}}.
-
-teardown({remote, DbName}) ->
- teardown(DbName);
-teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]),
- ok.
-
-teardown(_, {Ctx, {Source, Target}}) ->
- teardown(Source),
- teardown(Target),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
missing_stubs_test_() ->
- Pairs = [{remote, remote}],
{
"Replicate docs with missing stubs (COUCHDB-1365)",
{
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_replicate_docs_with_missed_att_stubs/2}
- || Pair <- Pairs]
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(should_replicate_docs_with_missed_att_stubs, 60)
+ ]
+ }
}
}.
-should_replicate_docs_with_missed_att_stubs({From, To}, {_Ctx, {Source, Target}}) ->
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
- {inorder, [
- should_populate_source(Source),
- should_set_target_revs_limit(Target, ?REVS_LIMIT),
- should_replicate(Source, Target),
- should_compare_databases(Source, Target),
- should_update_source_docs(Source, ?REVS_LIMIT * 2),
- should_replicate(Source, Target),
- should_compare_databases(Source, Target)
- ]}}.
-
-should_populate_source({remote, Source}) ->
- should_populate_source(Source);
-should_populate_source(Source) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source))}.
-
-should_replicate({remote, Source}, Target) ->
- should_replicate(db_url(Source), Target);
-should_replicate(Source, {remote, Target}) ->
- should_replicate(Source, db_url(Target));
-should_replicate(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}.
-
-should_set_target_revs_limit({remote, Target}, RevsLimit) ->
- should_set_target_revs_limit(Target, RevsLimit);
-should_set_target_revs_limit(Target, RevsLimit) ->
- ?_test(begin
- {ok, Db} = couch_db:open_int(Target, [?ADMIN_CTX]),
- ?assertEqual(ok, couch_db:set_revs_limit(Db, RevsLimit)),
- ok = couch_db:close(Db)
- end).
-
-should_compare_databases({remote, Source}, Target) ->
- should_compare_databases(Source, Target);
-should_compare_databases(Source, {remote, Target}) ->
- should_compare_databases(Source, Target);
-should_compare_databases(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target))}.
-
-should_update_source_docs({remote, Source}, Times) ->
- should_update_source_docs(Source, Times);
-should_update_source_docs(Source, Times) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(update_db_docs(Source, Times))}.
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ populate_db(Source),
+ Target = couch_replicator_test_helper:create_db(),
+ {Source, Target}.
+
+
+teardown({Source, Target}) ->
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
+
+
+should_replicate_docs_with_missed_att_stubs({Source, Target}) ->
+ {ok, TargetDb} = fabric2_db:open(Target, [?ADMIN_CTX]),
+ ?assertEqual(ok, fabric2_db:set_revs_limit(TargetDb, ?REVS_LIMIT)),
+
+ ?assertMatch({ok, _},
+ couch_replicator_test_helper:replicate(Source, Target)),
+ ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)),
+
+ ok = update_db_docs(Source, ?REVS_LIMIT * 2),
+
+ ?assertMatch({ok, _},
+ couch_replicator_test_helper:replicate(Source, Target)),
+ ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target)).
populate_db(DbName) ->
- {ok, Db} = couch_db:open_int(DbName, []),
AttData = crypto:strong_rand_bytes(6000),
Doc = #doc{
id = <<"doc1">>,
@@ -120,35 +76,40 @@ populate_db(DbName) ->
{type, <<"application/foobar">>},
{att_len, byte_size(AttData)},
{data, AttData}
- ])
+ ])
]
},
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- couch_db:close(Db).
+ couch_replicator_test_helper:create_docs(DbName, [Doc]).
+
update_db_docs(DbName, Times) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- {ok, _} = couch_db:fold_docs(
- Db,
- fun(FDI, Acc) -> db_fold_fun(FDI, Acc) end,
- {DbName, Times},
- []),
- ok = couch_db:close(Db).
-
-db_fold_fun(FullDocInfo, {DbName, Times}) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- {ok, Doc} = couch_db:open_doc(Db, FullDocInfo),
- lists:foldl(
- fun(_, {Pos, RevId}) ->
- {ok, Db2} = couch_db:reopen(Db),
- NewDocVersion = Doc#doc{
- revs = {Pos, [RevId]},
- body = {[{<<"value">>, base64:encode(crypto:strong_rand_bytes(100))}]}
- },
- {ok, NewRev} = couch_db:update_doc(Db2, NewDocVersion, []),
- NewRev
- end,
- {element(1, Doc#doc.revs), hd(element(2, Doc#doc.revs))},
- lists:seq(1, Times)),
- ok = couch_db:close(Db),
- {ok, {DbName, Times}}.
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ FoldFun = fun
+ ({meta, _Meta}, Acc) ->
+ {ok, Acc};
+ (complete, Acc) ->
+ {ok, Acc};
+ ({row, Row}, Acc) ->
+ {_, DocId} = lists:keyfind(id, 1, Row),
+ ok = update_doc(DbName, DocId, Times),
+ {ok, Acc}
+ end,
+ Opts = [{restart_tx, true}],
+ {ok, _} = fabric2_db:fold_docs(Db, FoldFun, ok, Opts),
+ ok.
+
+
+update_doc(_DbName, _DocId, 0) ->
+ ok;
+
+update_doc(DbName, DocId, Times) ->
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ {ok, Doc} = fabric2_db:open_doc(Db, DocId, []),
+ #doc{revs = {Pos, [Rev | _]}} = Doc,
+ Val = base64:encode(crypto:strong_rand_bytes(100)),
+ Doc1 = Doc#doc{
+ revs = {Pos, [Rev]},
+ body = {[{<<"value">>, Val}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc1),
+ update_doc(DbName, DocId, Times - 1).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
index da46b8a26..f5e745d90 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_proxy_tests.erl
@@ -14,15 +14,7 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
--include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-
-
-setup() ->
- ok.
-
-
-teardown(_) ->
- ok.
+-include_lib("fabric/test/fabric2_test.hrl").
replicator_proxy_test_() ->
@@ -30,87 +22,78 @@ replicator_proxy_test_() ->
"replicator proxy tests",
{
setup,
- fun() -> test_util:start_couch([couch_replicator]) end, fun test_util:stop_couch/1,
- {
- foreach,
- fun setup/0, fun teardown/1,
- [
- fun parse_rep_doc_without_proxy/1,
- fun parse_rep_doc_with_proxy/1,
- fun parse_rep_source_target_proxy/1,
- fun mutually_exclusive_proxy_and_source_proxy/1,
- fun mutually_exclusive_proxy_and_target_proxy/1
- ]
- }
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ with([
+ ?TDEF(parse_rep_doc_without_proxy),
+ ?TDEF(parse_rep_doc_with_proxy),
+ ?TDEF(parse_rep_source_target_proxy),
+ ?TDEF(mutually_exclusive_proxy_and_source_proxy),
+ ?TDEF(mutually_exclusive_proxy_and_target_proxy)
+ ])
}
}.
parse_rep_doc_without_proxy(_) ->
- ?_test(begin
- NoProxyDoc = {[
- {<<"source">>, <<"http://unproxied.com">>},
- {<<"target">>, <<"http://otherunproxied.com">>}
- ]},
- Rep = couch_replicator_docs:parse_rep_doc(NoProxyDoc),
- ?assertEqual((Rep#rep.source)#httpdb.proxy_url, undefined),
- ?assertEqual((Rep#rep.target)#httpdb.proxy_url, undefined)
- end).
+ NoProxyDoc = {[
+ {<<"source">>, <<"http://unproxied.com">>},
+ {<<"target">>, <<"http://otherunproxied.com">>}
+ ]},
+ Rep = couch_replicator_parse:parse_rep_doc(NoProxyDoc),
+ Src = maps:get(?SOURCE, Rep),
+ Tgt = maps:get(?TARGET, Rep),
+ ?assertEqual(null, maps:get(<<"proxy_url">>, Src)),
+ ?assertEqual(null, maps:get(<<"proxy_url">>, Tgt)).
parse_rep_doc_with_proxy(_) ->
- ?_test(begin
- ProxyURL = <<"http://myproxy.com">>,
- ProxyDoc = {[
- {<<"source">>, <<"http://unproxied.com">>},
- {<<"target">>, <<"http://otherunproxied.com">>},
- {<<"proxy">>, ProxyURL}
- ]},
- Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
- ?assertEqual((Rep#rep.source)#httpdb.proxy_url, binary_to_list(ProxyURL)),
- ?assertEqual((Rep#rep.target)#httpdb.proxy_url, binary_to_list(ProxyURL))
- end).
+ ProxyURL = <<"http://myproxy.com">>,
+ ProxyDoc = {[
+ {<<"source">>, <<"http://unproxied.com">>},
+ {<<"target">>, <<"http://otherunproxied.com">>},
+ {<<"proxy">>, ProxyURL}
+ ]},
+ Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
+ Src = maps:get(?SOURCE, Rep),
+ Tgt = maps:get(?TARGET, Rep),
+ ?assertEqual(ProxyURL, maps:get(<<"proxy_url">>, Src)),
+ ?assertEqual(ProxyURL, maps:get(<<"proxy_url">>, Tgt)).
parse_rep_source_target_proxy(_) ->
- ?_test(begin
- SrcProxyURL = <<"http://mysrcproxy.com">>,
- TgtProxyURL = <<"http://mytgtproxy.com:9999">>,
- ProxyDoc = {[
- {<<"source">>, <<"http://unproxied.com">>},
- {<<"target">>, <<"http://otherunproxied.com">>},
- {<<"source_proxy">>, SrcProxyURL},
- {<<"target_proxy">>, TgtProxyURL}
- ]},
- Rep = couch_replicator_docs:parse_rep_doc(ProxyDoc),
- ?assertEqual((Rep#rep.source)#httpdb.proxy_url,
- binary_to_list(SrcProxyURL)),
- ?assertEqual((Rep#rep.target)#httpdb.proxy_url,
- binary_to_list(TgtProxyURL))
- end).
+ SrcProxyURL = <<"http://mysrcproxy.com">>,
+ TgtProxyURL = <<"http://mytgtproxy.com:9999">>,
+ ProxyDoc = {[
+ {<<"source">>, <<"http://unproxied.com">>},
+ {<<"target">>, <<"http://otherunproxied.com">>},
+ {<<"source_proxy">>, SrcProxyURL},
+ {<<"target_proxy">>, TgtProxyURL}
+ ]},
+ Rep = couch_replicator_parse:parse_rep_doc(ProxyDoc),
+ Src = maps:get(?SOURCE, Rep),
+ Tgt = maps:get(?TARGET, Rep),
+ ?assertEqual(SrcProxyURL, maps:get(<<"proxy_url">>, Src)),
+ ?assertEqual(TgtProxyURL, maps:get(<<"proxy_url">>, Tgt)).
mutually_exclusive_proxy_and_source_proxy(_) ->
- ?_test(begin
- ProxyDoc = {[
- {<<"source">>, <<"http://unproxied.com">>},
- {<<"target">>, <<"http://otherunproxied.com">>},
- {<<"proxy">>, <<"oldstyleproxy.local">>},
- {<<"source_proxy">>, <<"sourceproxy.local">>}
- ]},
- ?assertThrow({bad_rep_doc, _},
- couch_replicator_docs:parse_rep_doc(ProxyDoc))
- end).
+ ProxyDoc = {[
+ {<<"source">>, <<"http://unproxied.com">>},
+ {<<"target">>, <<"http://otherunproxied.com">>},
+ {<<"proxy">>, <<"oldstyleproxy.local">>},
+ {<<"source_proxy">>, <<"sourceproxy.local">>}
+ ]},
+ ?assertThrow({bad_rep_doc, _},
+ couch_replicator_parse:parse_rep_doc(ProxyDoc)).
mutually_exclusive_proxy_and_target_proxy(_) ->
- ?_test(begin
- ProxyDoc = {[
- {<<"source">>, <<"http://unproxied.com">>},
- {<<"target">>, <<"http://otherunproxied.com">>},
- {<<"proxy">>, <<"oldstyleproxy.local">>},
- {<<"target_proxy">>, <<"targetproxy.local">>}
- ]},
- ?assertThrow({bad_rep_doc, _},
- couch_replicator_docs:parse_rep_doc(ProxyDoc))
- end).
+ ProxyDoc = {[
+ {<<"source">>, <<"http://unproxied.com">>},
+ {<<"target">>, <<"http://otherunproxied.com">>},
+ {<<"proxy">>, <<"oldstyleproxy.local">>},
+ {<<"target_proxy">>, <<"targetproxy.local">>}
+ ]},
+ ?assertThrow({bad_rep_doc, _},
+ couch_replicator_parse:parse_rep_doc(ProxyDoc)).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_rate_limiter_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_rate_limiter_tests.erl
index 034550aec..fb9892017 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_rate_limiter_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_rate_limiter_tests.erl
@@ -1,6 +1,7 @@
-module(couch_replicator_rate_limiter_tests).
-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
rate_limiter_test_() ->
@@ -9,64 +10,52 @@ rate_limiter_test_() ->
fun setup/0,
fun teardown/1,
[
- t_new_key(),
- t_1_failure(),
- t_2_failures_back_to_back(),
- t_2_failures(),
- t_success_threshold(),
- t_1_failure_2_successes()
+ ?TDEF_FE(t_new_key),
+ ?TDEF_FE(t_1_failure),
+ ?TDEF_FE(t_2_failures_back_to_back),
+ ?TDEF_FE(t_2_failures),
+ ?TDEF_FE(t_success_threshold),
+ ?TDEF_FE(t_1_failure_2_successes)
]
}.
-t_new_key() ->
- ?_test(begin
- ?assertEqual(0, couch_replicator_rate_limiter:interval({"foo", get}))
- end).
+t_new_key(_) ->
+ ?assertEqual(0, couch_replicator_rate_limiter:interval({"foo", get})).
-t_1_failure() ->
- ?_test(begin
- ?assertEqual(24, couch_replicator_rate_limiter:failure({"foo", get}))
- end).
+t_1_failure(_) ->
+ ?assertEqual(24, couch_replicator_rate_limiter:failure({"foo", get})).
-t_2_failures() ->
- ?_test(begin
- couch_replicator_rate_limiter:failure({"foo", get}),
- low_pass_filter_delay(),
- Interval = couch_replicator_rate_limiter:failure({"foo", get}),
- ?assertEqual(29, Interval)
- end).
+t_2_failures(_) ->
+ couch_replicator_rate_limiter:failure({"foo", get}),
+ low_pass_filter_delay(),
+ Interval = couch_replicator_rate_limiter:failure({"foo", get}),
+ ?assertEqual(29, Interval).
-t_2_failures_back_to_back() ->
- ?_test(begin
- couch_replicator_rate_limiter:failure({"foo", get}),
- Interval = couch_replicator_rate_limiter:failure({"foo", get}),
- ?assertEqual(24, Interval)
- end).
+t_2_failures_back_to_back(_) ->
+ couch_replicator_rate_limiter:failure({"foo", get}),
+ Interval = couch_replicator_rate_limiter:failure({"foo", get}),
+ ?assertEqual(24, Interval).
-t_success_threshold() ->
- ?_test(begin
- Interval = couch_replicator_rate_limiter:success({"foo", get}),
- ?assertEqual(0, Interval),
- Interval = couch_replicator_rate_limiter:success({"foo", get}),
- ?assertEqual(0, Interval)
- end).
+t_success_threshold(_) ->
+ Interval = couch_replicator_rate_limiter:success({"foo", get}),
+ ?assertEqual(0, Interval),
+ Interval = couch_replicator_rate_limiter:success({"foo", get}),
+ ?assertEqual(0, Interval).
-t_1_failure_2_successes() ->
- ?_test(begin
- couch_replicator_rate_limiter:failure({"foo", get}),
- low_pass_filter_delay(),
- Succ1 = couch_replicator_rate_limiter:success({"foo", get}),
- ?assertEqual(20, Succ1),
- low_pass_filter_delay(),
- Succ2 = couch_replicator_rate_limiter:success({"foo", get}),
- ?assertEqual(0, Succ2)
- end).
+t_1_failure_2_successes(_) ->
+ couch_replicator_rate_limiter:failure({"foo", get}),
+ low_pass_filter_delay(),
+ Succ1 = couch_replicator_rate_limiter:success({"foo", get}),
+ ?assertEqual(20, Succ1),
+ low_pass_filter_delay(),
+ Succ2 = couch_replicator_rate_limiter:success({"foo", get}),
+ ?assertEqual(0, Succ2).
low_pass_filter_delay() ->
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 037f37191..4b7c37d9e 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -15,139 +15,72 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
-
--define(DELAY, 500).
--define(TIMEOUT, 60000).
-
-
-setup_all() ->
- test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
-
-
-teardown_all(Ctx) ->
- ok = test_util:stop_couch(Ctx).
-
-
-setup() ->
- Source = setup_db(),
- Target = setup_db(),
- {Source, Target}.
+-include_lib("fabric/test/fabric2_test.hrl").
-teardown({Source, Target}) ->
- teardown_db(Source),
- teardown_db(Target),
- ok.
+-define(DELAY, 500).
stats_retained_test_() ->
{
setup,
- fun setup_all/0,
- fun teardown_all/1,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
{
foreach,
fun setup/0,
fun teardown/1,
[
- fun t_stats_retained_by_scheduler/1,
- fun t_stats_retained_on_job_removal/1
+ ?TDEF_FE(t_stats_retained_on_job_removal, 60)
]
}
}.
-t_stats_retained_by_scheduler({Source, Target}) ->
- ?_test(begin
- {ok, _} = add_vdu(Target),
- populate_db_reject_even_docs(Source, 1, 10),
- {ok, RepPid, RepId} = replicate(Source, Target),
- wait_target_in_sync(6, Target),
-
- check_active_tasks(10, 5, 5),
- check_scheduler_jobs(10, 5, 5),
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ Target = couch_replicator_test_helper:create_db(),
+ config:set("replicator", "stats_update_interval_sec", "0", false),
+ config:set("replicator", "checkpoint_interval", "1000", false),
+ {Source, Target}.
- stop_job(RepPid),
- check_scheduler_jobs(10, 5, 5),
- start_job(),
- check_active_tasks(10, 5, 5),
- check_scheduler_jobs(10, 5, 5),
- couch_replicator_scheduler:remove_job(RepId)
- end).
+teardown({Source, Target}) ->
+ config:delete("replicator", "stats_update_interval_sec", false),
+ config:delete("replicator", "checkpoint_interval", false),
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
t_stats_retained_on_job_removal({Source, Target}) ->
- ?_test(begin
- {ok, _} = add_vdu(Target),
- populate_db_reject_even_docs(Source, 1, 10),
- {ok, _, RepId} = replicate(Source, Target),
- wait_target_in_sync(6, Target), % 5 + 1 vdu
-
- check_active_tasks(10, 5, 5),
- check_scheduler_jobs(10, 5, 5),
+ {ok, _} = add_vdu(Target),
+ populate_db_reject_even_docs(Source, 1, 10),
+ {ok, Pid1, RepId} = replicate(Source, Target),
+ wait_target_in_sync(6, Target), % 5 + 1 vdu
- couch_replicator_scheduler:remove_job(RepId),
+ check_scheduler_jobs(10, 5, 5),
- populate_db_reject_even_docs(Source, 11, 20),
- {ok, _, RepId} = replicate(Source, Target),
- wait_target_in_sync(11, Target), % 6 + 5
+ cancel(RepId, Pid1),
- check_scheduler_jobs(20, 10, 10),
- check_active_tasks(20, 10, 10),
+ populate_db_reject_even_docs(Source, 11, 20),
+ {ok, Pid2, RepId} = replicate(Source, Target),
+ wait_target_in_sync(11, Target), % 6 + 5
- couch_replicator_scheduler:remove_job(RepId),
+ check_scheduler_jobs(20, 10, 10),
- populate_db_reject_even_docs(Source, 21, 30),
- {ok, _, RepId} = replicate(Source, Target),
- wait_target_in_sync(16, Target), % 11 + 5
+ cancel(RepId, Pid2),
- check_scheduler_jobs(30, 15, 15),
- check_active_tasks(30, 15, 15),
-
- couch_replicator_scheduler:remove_job(RepId)
- end).
+ populate_db_reject_even_docs(Source, 21, 30),
+ {ok, Pid3, RepId} = replicate(Source, Target),
+ wait_target_in_sync(16, Target), % 11 + 5
+ check_scheduler_jobs(30, 15, 15),
-setup_db() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
-
-
-teardown_db(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]),
- ok.
-
-
-stop_job(RepPid) ->
- Ref = erlang:monitor(process, RepPid),
- gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 0}),
- couch_replicator_scheduler:reschedule(),
- receive
- {'DOWN', Ref, _, _, _} -> ok
- after ?TIMEOUT ->
- erlang:error(timeout)
- end.
-
-
-start_job() ->
- gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 500}),
- couch_replicator_scheduler:reschedule().
-
-
-check_active_tasks(DocsRead, DocsWritten, DocsFailed) ->
- RepTask = wait_for_task_status(),
- ?assertNotEqual(timeout, RepTask),
- ?assertEqual(DocsRead, couch_util:get_value(docs_read, RepTask)),
- ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)),
- ?assertEqual(DocsFailed, couch_util:get_value(doc_write_failures,
- RepTask)).
+ cancel(RepId, Pid3).
check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) ->
- Info = wait_scheduler_info(),
+ Info = wait_scheduler_info(DocsRead),
?assert(maps:is_key(<<"changes_pending">>, Info)),
?assert(maps:is_key(<<"doc_write_failures">>, Info)),
?assert(maps:is_key(<<"docs_read">>, Info)),
@@ -161,27 +94,18 @@ check_scheduler_jobs(DocsRead, DocsWritten, DocFailed) ->
?assertMatch(#{<<"doc_write_failures">> := DocFailed}, Info).
-replication_tasks() ->
- lists:filter(fun(P) ->
- couch_util:get_value(type, P) =:= replication
- end, couch_task_status:all()).
-
-
-wait_for_task_status() ->
+wait_scheduler_info(DocsRead) ->
test_util:wait(fun() ->
- case replication_tasks() of
- [] -> wait;
- [RepTask] -> RepTask
- end
- end).
-
-
-wait_scheduler_info() ->
- test_util:wait(fun() ->
- case scheduler_jobs() of
- [] -> wait;
- [#{<<"info">> := null}] -> wait;
- [#{<<"info">> := Info}] -> Info
+ case couch_replicator_test_helper:scheduler_jobs() of
+ [] ->
+ wait;
+ [#{<<"info">> := null}] ->
+ wait;
+ [#{<<"info">> := Info}] ->
+ case Info of
+ #{<<"docs_read">> := DocsRead} -> Info;
+ #{} -> wait
+ end
end
end).
@@ -197,16 +121,12 @@ populate_db_reject_even_docs(DbName, Start, End) ->
populate_db(DbName, Start, End, BodyFun) when is_function(BodyFun, 1) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- Docs = lists:foldl(
- fun(DocIdCounter, Acc) ->
- Id = integer_to_binary(DocIdCounter),
- Doc = #doc{id = Id, body = BodyFun(DocIdCounter)},
- [Doc | Acc]
- end,
- [], lists:seq(Start, End)),
- {ok, _} = couch_db:update_docs(Db, Docs, []),
- ok = couch_db:close(Db).
+ Docs = lists:foldl(fun(DocIdCounter, Acc) ->
+ Id = integer_to_binary(DocIdCounter),
+ Doc = #doc{id = Id, body = BodyFun(DocIdCounter)},
+ [Doc | Acc]
+ end, [], lists:seq(Start, End)),
+ couch_replicator_test_helper:create_docs(DbName, Docs).
wait_target_in_sync(DocCount, Target) when is_integer(DocCount) ->
@@ -215,14 +135,13 @@ wait_target_in_sync(DocCount, Target) when is_integer(DocCount) ->
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
erlang:error({assertion_failed, [
- {module, ?MODULE}, {line, ?LINE},
- {reason, "Could not get source and target databases in sync"}
+ {module, ?MODULE}, {line, ?LINE},
+ {reason, "Could not get source and target databases in sync"}
]});
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
- {ok, Target} = couch_db:open_int(TargetName, []),
- {ok, TargetInfo} = couch_db:get_db_info(Target),
- ok = couch_db:close(Target),
+ {ok, Db} = fabric2_db:open(TargetName, [?ADMIN_CTX]),
+ {ok, TargetInfo} = fabric2_db:get_db_info(Db),
TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
case TargetDocCount == DocCount of
true ->
@@ -234,27 +153,11 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
replicate(Source, Target) ->
- SrcUrl = couch_replicator_test_helper:db_url(Source),
- TgtUrl = couch_replicator_test_helper:db_url(Target),
- RepObject = {[
- {<<"source">>, SrcUrl},
- {<<"target">>, TgtUrl},
- {<<"continuous">>, true}
- ]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
- ok = couch_replicator_scheduler:add_job(Rep),
- couch_replicator_scheduler:reschedule(),
- Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
- {ok, Pid, Rep#rep.id}.
-
-
-scheduler_jobs() ->
- Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
- Port = mochiweb_socket_server:get(chttpd, port),
- Url = lists:flatten(io_lib:format("http://~s:~b/_scheduler/jobs", [Addr, Port])),
- {ok, 200, _, Body} = test_request:get(Url, []),
- Json = jiffy:decode(Body, [return_maps]),
- maps:get(<<"jobs">>, Json).
+ couch_replicator_test_helper:replicate_continuous(Source, Target).
+
+
+cancel(RepId, Pid) ->
+ couch_replicator_test_helper:cancel(RepId, Pid).
vdu() ->
@@ -274,9 +177,5 @@ add_vdu(DbName) ->
{<<"validate_doc_update">>, vdu()}
],
Doc = couch_doc:from_json_obj({DocProps}, []),
- {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
- try
- {ok, _Rev} = couch_db:update_doc(Db, Doc, [])
- after
- couch_db:close(Db)
- end.
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ {ok, _} = fabric2_db:update_doc(Db, Doc, []).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_selector_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_selector_tests.erl
index 5026c1435..5dfe4ba91 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_selector_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_selector_tests.erl
@@ -15,103 +15,69 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
-setup(_) ->
- Ctx = test_util:start_couch([couch_replicator]),
- Source = create_db(),
- create_docs(Source),
- Target = create_db(),
- {Ctx, {Source, Target}}.
-
-teardown(_, {Ctx, {Source, Target}}) ->
- delete_db(Source),
- delete_db(Target),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
-
selector_replication_test_() ->
- Pairs = [{remote, remote}],
{
"Selector filtered replication tests",
{
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_succeed/2} || Pair <- Pairs]
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(should_replicate_with_selector)
+ ]
+ }
}
}.
-should_succeed({From, To}, {_Ctx, {Source, Target}}) ->
- RepObject = {[
- {<<"source">>, db_url(From, Source)},
- {<<"target">>, db_url(To, Target)},
- {<<"selector">>, {[{<<"_id">>, <<"doc2">>}]}}
- ]},
- {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
- %% FilteredFun is an Erlang version of following mango selector
- FilterFun = fun(_DocId, {Props}) ->
- couch_util:get_value(<<"_id">>, Props) == <<"doc2">>
- end,
- {ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target, FilterFun),
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])), [
- {"Target DB has proper number of docs",
- ?_assertEqual(1, proplists:get_value(doc_count, TargetDbInfo))},
- {"All the docs selected as expected",
- ?_assert(lists:all(fun(Valid) -> Valid end, AllReplies))}
- ]}.
-compare_dbs(Source, Target, FilterFun) ->
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, TargetDb} = couch_db:open_int(Target, []),
- {ok, TargetDbInfo} = couch_db:get_db_info(TargetDb),
- Fun = fun(FullDocInfo, Acc) ->
- {ok, DocId, SourceDoc} = read_doc(SourceDb, FullDocInfo),
- TargetReply = read_doc(TargetDb, DocId),
- case FilterFun(DocId, SourceDoc) of
- true ->
- ValidReply = {ok, DocId, SourceDoc} == TargetReply,
- {ok, [ValidReply|Acc]};
- false ->
- ValidReply = {not_found, missing} == TargetReply,
- {ok, [ValidReply|Acc]}
- end
- end,
- {ok, AllReplies} = couch_db:fold_docs(SourceDb, Fun, [], []),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb),
- {ok, TargetDbInfo, AllReplies}.
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ create_docs(Source),
+ Target = couch_replicator_test_helper:create_db(),
+ {Source, Target}.
-read_doc(Db, DocIdOrInfo) ->
- case couch_db:open_doc(Db, DocIdOrInfo) of
- {ok, Doc} ->
- {Props} = couch_doc:to_json_obj(Doc, [attachments]),
- DocId = couch_util:get_value(<<"_id">>, Props),
- {ok, DocId, {Props}};
- Error ->
- Error
- end.
-create_db() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
+teardown({Source, Target}) ->
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
-create_docs(DbName) ->
- {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
- Doc1 = couch_doc:from_json_obj({[
- {<<"_id">>, <<"doc1">>}
- ]}),
- Doc2 = couch_doc:from_json_obj({[
- {<<"_id">>, <<"doc2">>}
- ]}),
- {ok, _} = couch_db:update_docs(Db, [Doc1, Doc2]),
- couch_db:close(Db).
-delete_db(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]).
+should_replicate_with_selector({Source, Target}) ->
+ RepObject = #{
+ <<"source">> => Source,
+ <<"target">> => Target,
+ <<"selector">> => #{
+ <<"_id">> => <<"doc2">>
+ }
+ },
+ ?assertMatch({ok, _}, couch_replicator_test_helper:replicate(RepObject)),
+ {ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target),
+ ?assertEqual(1, proplists:get_value(doc_count, TargetDbInfo)),
+ ?assert(lists:all(fun(Valid) -> Valid end, AllReplies)).
+
-db_url(remote, DbName) ->
- Addr = config:get("httpd", "bind_address", "127.0.0.1"),
- Port = mochiweb_socket_server:get(couch_httpd, port),
- ?l2b(io_lib:format("http://~s:~b/~s", [Addr, Port, DbName])).
+compare_dbs(Source, Target) ->
+ {ok, TargetDb} = fabric2_db:open(Target, []),
+ {ok, TargetDbInfo} = fabric2_db:get_db_info(TargetDb),
+ Fun = fun(SrcDoc, TgtDoc, Acc) ->
+ case SrcDoc#doc.id == <<"doc2">> of
+ true -> [SrcDoc#doc.body == TgtDoc#doc.body | Acc];
+ false -> [not_found == TgtDoc | Acc]
+ end
+ end,
+ Res = couch_replicator_test_helper:compare_fold(Source, Target, Fun, []),
+ {ok, TargetDbInfo, Res}.
+
+
+create_docs(DbName) ->
+ couch_replicator_test_helper:create_docs(DbName, [
+ #{<<"_id">> => <<"doc1">>},
+ #{<<"_id">> => <<"doc2">>}
+ ]).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_small_max_request_size_target.erl b/src/couch_replicator/test/eunit/couch_replicator_small_max_request_size_target.erl
index 8aebbe151..b113c5392 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_small_max_request_size_target.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_small_max_request_size_target.erl
@@ -1,139 +1,70 @@
-module(couch_replicator_small_max_request_size_target).
+
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-
--import(couch_replicator_test_helper, [
- db_url/1,
- replicate/1,
- compare_dbs/3
-]).
-
--define(TIMEOUT_EUNIT, 360).
-
-
-setup() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
-
-
-setup(remote) ->
- {remote, setup()};
-
-setup({A, B}) ->
- Ctx = test_util:start_couch([couch_replicator]),
- config:set("httpd", "max_http_request_size", "10000", false),
- Source = setup(A),
- Target = setup(B),
- {Ctx, {Source, Target}}.
-
-
-teardown({remote, DbName}) ->
- teardown(DbName);
-teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]),
- ok.
-
-teardown(_, {Ctx, {Source, Target}}) ->
- teardown(Source),
- teardown(Target),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
+-include_lib("fabric/test/fabric2_test.hrl").
reduce_max_request_size_test_() ->
- Pairs = [{remote, remote}],
{
"Replicate docs when target has a small max_http_request_size",
{
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_replicate_all_docs/2}
- || Pair <- Pairs]
- ++ [{Pair, fun should_replicate_one/2}
- || Pair <- Pairs]
- % Disabled. See issue 574. Sometimes PUTs with a doc and
- % attachment which exceed maximum request size are simply
- % closed instead of returning a 413 request. That makes these
- % tests flaky.
- ++ [{Pair, fun should_replicate_one_with_attachment/2}
- || Pair <- Pairs]
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(should_replicate_all_docs, 120),
+ ?TDEF_FE(should_replicate_one, 120),
+ ?TDEF_FE(should_replicate_one_with_attachment, 120)
+ ]
+ }
}
}.
-% Test documents which are below max_http_request_size but when batched, batch size
-% will be greater than max_http_request_size. Replicator could automatically split
-% the batch into smaller batches and POST those separately.
-should_replicate_all_docs({From, To}, {_Ctx, {Source, Target}}) ->
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
- {inorder, [should_populate_source(Source),
- should_replicate(Source, Target),
- should_compare_databases(Source, Target, [])]}}.
-
-
-% If a document is too large to post as a single request, that document is
-% skipped but replication overall will make progress and not crash.
-should_replicate_one({From, To}, {_Ctx, {Source, Target}}) ->
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
- {inorder, [should_populate_source_one_large_one_small(Source),
- should_replicate(Source, Target),
- should_compare_databases(Source, Target, [<<"doc0">>])]}}.
-
-
-% If a document has an attachment > 64 * 1024 bytes, replicator will switch to
-% POST-ing individual documents directly and skip bulk_docs. Test that case
-% separately
-% See note in main test function why this was disabled.
-should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) ->
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
- {inorder, [should_populate_source_one_large_attachment(Source),
- should_populate_source(Source),
- should_replicate(Source, Target),
- should_compare_databases(Source, Target, [<<"doc0">>])]}}.
-
-
-should_populate_source({remote, Source}) ->
- should_populate_source(Source);
-
-should_populate_source(Source) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(add_docs(Source, 5, 3000, 0))}.
-
-
-should_populate_source_one_large_one_small({remote, Source}) ->
- should_populate_source_one_large_one_small(Source);
-
-should_populate_source_one_large_one_small(Source) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_one_small(Source, 12000, 3000))}.
-
-
-should_populate_source_one_large_attachment({remote, Source}) ->
- should_populate_source_one_large_attachment(Source);
-
-should_populate_source_one_large_attachment(Source) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}.
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ Target = couch_replicator_test_helper:create_db(),
+ config:set("httpd", "max_http_request_size", "10000", false),
+ {Source, Target}.
-should_replicate({remote, Source}, Target) ->
- should_replicate(db_url(Source), Target);
+teardown({Source, Target}) ->
+ config:delete("httpd", "max_http_request_size", false),
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
-should_replicate(Source, {remote, Target}) ->
- should_replicate(Source, db_url(Target));
-should_replicate(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}.
+% Test documents which are below max_http_request_size but when batched, batch
+% size will be greater than max_http_request_size. Replicator could
+% automatically split the batch into smaller batches and POST those separately.
+should_replicate_all_docs({Source, Target}) ->
+ ?assertEqual(ok, add_docs(Source, 5, 3000, 0)),
+ replicate(Source, Target),
+ compare_dbs(Source, Target, []).
-should_compare_databases({remote, Source}, Target, ExceptIds) ->
- should_compare_databases(Source, Target, ExceptIds);
+% If a document is too large to post as a single request, that document is
+% skipped but replication overall will make progress and not crash.
+should_replicate_one({Source, Target}) ->
+ ?assertEqual(ok, one_large_one_small(Source, 12000, 3000)),
+ replicate(Source, Target),
+ compare_dbs(Source, Target, [<<"doc0">>]).
-should_compare_databases(Source, {remote, Target}, ExceptIds) ->
- should_compare_databases(Source, Target, ExceptIds);
-should_compare_databases(Source, Target, ExceptIds) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target, ExceptIds))}.
+% If a document has an attachment > 64 * 1024 bytes, replicator will switch to
+% POST-ing individual documents directly and skip bulk_docs. Test that case
+% separately See note in main test function why this was disabled.
+should_replicate_one_with_attachment({Source, Target}) ->
+ ?assertEqual(ok, one_large_attachment(Source, 70000, 70000)),
+ ?assertEqual(ok, add_docs(Source, 5, 3000, 0)),
+ replicate(Source, Target),
+ compare_dbs(Source, Target, [<<"doc0">>]).
binary_chunk(Size) when is_integer(Size), Size > 0 ->
@@ -150,19 +81,21 @@ add_docs(DbName, DocCount, DocSize, AttSize) ->
one_large_one_small(DbName, Large, Small) ->
add_doc(DbName, <<"doc0">>, Large, 0),
- add_doc(DbName, <<"doc1">>, Small, 0).
+ add_doc(DbName, <<"doc1">>, Small, 0),
+ ok.
one_large_attachment(DbName, Size, AttSize) ->
- add_doc(DbName, <<"doc0">>, Size, AttSize).
+ add_doc(DbName, <<"doc0">>, Size, AttSize),
+ ok.
add_doc(DbName, DocId, Size, AttSize) when is_binary(DocId) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- Doc0 = #doc{id = DocId, body = {[{<<"x">>, binary_chunk(Size)}]}},
- Doc = Doc0#doc{atts = atts(AttSize)},
- {ok, _} = couch_db:update_doc(Db, Doc, []),
- couch_db:close(Db).
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ Doc0 = #doc{id = DocId, body = {[{<<"x">>, binary_chunk(Size)}]}},
+ Doc = Doc0#doc{atts = atts(AttSize)},
+ {ok, _} = fabric2_db:update_doc(Db, Doc, []),
+ ok.
atts(0) ->
@@ -178,8 +111,13 @@ atts(Size) ->
replicate(Source, Target) ->
- replicate({[
- {<<"source">>, Source},
- {<<"target">>, Target},
- {<<"worker_processes">>, "1"} % This make batch_size predictable
- ]}).
+ ?assertMatch({ok, _}, couch_replicator_test_helper:replicate(#{
+ <<"source">> => Source,
+ <<"target">> => Target,
+ <<"worker_processes">> => 1 % This make batch_size predictable
+ })).
+
+
+compare_dbs(Source, Target, ExceptIds) ->
+ ?assertEqual(ok, couch_replicator_test_helper:compare_dbs(Source, Target,
+ ExceptIds)).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
index fd0409164..2ac447eb3 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
@@ -1,51 +1,166 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
-module(couch_replicator_test_helper).
--include_lib("couch/include/couch_eunit.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_replicator/src/couch_replicator.hrl").
-export([
+ start_couch/0,
+ stop_couch/1,
+
+ create_db/0,
+ create_db/1,
+ delete_db/1,
+
+ server_url/0,
+ db_url/1,
+
+ create_docs/2,
+
compare_dbs/2,
compare_dbs/3,
- db_url/1,
- replicate/1,
+ compare_fold/4,
+
+ compare_docs/2,
+
get_pid/1,
- replicate/2
+
+ replicate/1,
+ replicate/2,
+ replicate_continuous/1,
+ replicate_continuous/2,
+
+ cancel/1,
+ cancel/2,
+
+ scheduler_jobs/0
]).
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+
+
+-define(USERNAME, "rep_eunit_admin").
+-define(PASSWORD, "rep_eunit_password").
+
+
+start_couch() ->
+ Ctx = test_util:start_couch([fabric, chttpd, couch_replicator]),
+ Hashed = couch_passwords:hash_admin_password(?PASSWORD),
+ ok = config:set("admins", ?USERNAME, ?b2l(Hashed), _Persist = false),
+ Ctx.
+
+
+stop_couch(Ctx) ->
+ config:delete("admins", ?USERNAME, _Persist = false),
+ test_util:stop_couch(Ctx).
+
+
+create_db() ->
+ {ok, Db} = fabric2_db:create(?tempdb(), [?ADMIN_CTX]),
+ fabric2_db:name(Db).
+
+
+create_db(DbName) when is_binary(DbName) ->
+ {ok, Db} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+ fabric2_db:name(Db).
+
+
+delete_db(DbName) ->
+ try
+ ok = fabric2_db:delete(DbName, [?ADMIN_CTX])
+ catch
+ error:database_does_not_exist ->
+ ok
+ end.
+
+
+server_url() ->
+ Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
+ Port = mochiweb_socket_server:get(chttpd, port),
+ Fmt = "http://~s:~s@~s:~b",
+ ?l2b(io_lib:format(Fmt, [?USERNAME, ?PASSWORD, Addr, Port])).
+
+
+db_url(DbName) ->
+ ?l2b(io_lib:format("~s/~s", [server_url(), DbName])).
+
+
+create_docs(DbName, Docs) when is_binary(DbName), is_list(Docs) ->
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ Docs1 = lists:map(fun(Doc) ->
+ case Doc of
+ #{} ->
+ Doc1 = couch_util:json_decode(couch_util:json_encode(Doc)),
+ couch_doc:from_json_obj(Doc1);
+ #doc{} ->
+ Doc
+ end
+ end, Docs),
+ {ok, ResList} = fabric2_db:update_docs(Db, Docs1),
+ lists:foreach(fun(Res) ->
+ ?assertMatch({ok, {_, Rev}} when is_binary(Rev), Res)
+ end, ResList).
+
+
compare_dbs(Source, Target) ->
- compare_dbs(Source, Target, []).
-
-
-compare_dbs(Source, Target, ExceptIds) ->
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, TargetDb} = couch_db:open_int(Target, []),
-
- Fun = fun(FullDocInfo, Acc) ->
- {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
- Id = DocSource#doc.id,
- case lists:member(Id, ExceptIds) of
- true ->
- ?assertEqual(not_found, couch_db:get_doc_info(TargetDb, Id));
- false ->
- {ok, TDoc} = couch_db:open_doc(TargetDb, Id),
- compare_docs(DocSource, TDoc)
+ Fun = fun(SrcDoc, TgtDoc, ok) -> compare_docs(SrcDoc, TgtDoc) end,
+ compare_fold(Source, Target, Fun, ok).
+
+
+compare_dbs(Source, Target, ExceptIds) when is_binary(Source),
+ is_binary(Target), is_list(ExceptIds) ->
+ Fun = fun(SrcDoc, TgtDoc, ok) ->
+ case lists:member(SrcDoc#doc.id, ExceptIds) of
+ true -> ?assertEqual(not_found, TgtDoc);
+ false -> compare_docs(SrcDoc, TgtDoc)
end,
- {ok, Acc}
+ ok
end,
+ compare_fold(Source, Target, Fun, ok).
+
- {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb).
+compare_fold(Source, Target, Fun, Acc0) when
+ is_binary(Source), is_binary(Target), is_function(Fun, 3) ->
+ {ok, SourceDb} = fabric2_db:open(Source, [?ADMIN_CTX]),
+ {ok, TargetDb} = fabric2_db:open(Target, [?ADMIN_CTX]),
+ fabric2_fdb:transactional(SourceDb, fun(TxSourceDb) ->
+ FoldFun = fun
+ ({meta, _Meta}, Acc) ->
+ {ok, Acc};
+ (complete, Acc) ->
+ {ok, Acc};
+ ({row, Row}, Acc) ->
+ {_, Id} = lists:keyfind(id, 1, Row),
+ SrcDoc = open_doc(TxSourceDb, Id),
+ TgtDoc = open_doc(TargetDb, Id),
+ {ok, Fun(SrcDoc, TgtDoc, Acc)}
+ end,
+ Opts = [{restart_tx, true}],
+ {ok, AccF} = fabric2_db:fold_docs(TxSourceDb, FoldFun, Acc0, Opts),
+ AccF
+ end).
-compare_docs(Doc1, Doc2) ->
+compare_docs(#doc{} = Doc1, Doc2) when
+ is_record(Doc2, doc) orelse Doc2 =:= not_found ->
+ ?assert(Doc2 =/= not_found),
?assertEqual(Doc1#doc.body, Doc2#doc.body),
#doc{atts = Atts1} = Doc1,
#doc{atts = Atts2} = Doc2,
?assertEqual(lists:sort([couch_att:fetch(name, Att) || Att <- Atts1]),
- lists:sort([couch_att:fetch(name, Att) || Att <- Atts2])),
+ lists:sort([couch_att:fetch(name, Att) || Att <- Atts2])),
FunCompareAtts = fun(Att) ->
AttName = couch_att:fetch(name, Att),
{ok, AttTarget} = find_att(Atts2, AttName),
@@ -68,19 +183,109 @@ compare_docs(Doc1, Doc2) ->
?assert(is_integer(couch_att:fetch(disk_len, AttTarget))),
?assert(is_integer(couch_att:fetch(att_len, AttTarget))),
?assertEqual(couch_att:fetch(disk_len, Att),
- couch_att:fetch(disk_len, AttTarget)),
+ couch_att:fetch(disk_len, AttTarget)),
?assertEqual(couch_att:fetch(att_len, Att),
- couch_att:fetch(att_len, AttTarget)),
+ couch_att:fetch(att_len, AttTarget)),
?assertEqual(couch_att:fetch(type, Att),
- couch_att:fetch(type, AttTarget)),
+ couch_att:fetch(type, AttTarget)),
?assertEqual(couch_att:fetch(md5, Att),
- couch_att:fetch(md5, AttTarget))
+ couch_att:fetch(md5, AttTarget))
end,
lists:foreach(FunCompareAtts, Atts1).
+get_pid(RepId) ->
+ JobId = case couch_replicator_jobs:get_job_id(undefined, RepId) of
+ {ok, JobId0} -> JobId0;
+ {error, not_found} -> RepId
+ end,
+ {ok, #{<<"state">> := <<"running">>, <<"pid">> := Pid0}} =
+ couch_replicator_jobs:get_job_data(undefined, JobId),
+ Pid = list_to_pid(binary_to_list(Pid0)),
+ ?assert(is_pid(Pid)),
+ ?assert(is_process_alive(Pid)),
+ Pid.
+
+
+replicate({[_ | _]} = EJson) ->
+ Str = couch_util:json_encode(EJson),
+ replicate(couch_util:json_decode(Str, [return_maps]));
+
+replicate(#{} = Rep0) ->
+ Rep = maybe_db_urls(Rep0),
+ {ok, Id, _} = couch_replicator_parse:parse_transient_rep(Rep, null),
+ ok = cancel(Id),
+ try
+ couch_replicator:replicate(Rep, ?ADMIN_USER)
+ after
+ ok = cancel(Id)
+ end.
+
+
+replicate(Source, Target) ->
+ replicate(#{
+ <<"source">> => Source,
+ <<"target">> => Target
+ }).
+
+
+replicate_continuous({[_ | _]} = EJson) ->
+ Str = couch_util:json_encode(EJson),
+ replicate_continuous(couch_util:json_decode(Str, [return_maps]));
+
+replicate_continuous(#{<<"continuous">> := true} = Rep0) ->
+ Rep = maybe_db_urls(Rep0),
+ {ok, {continuous, RepId}} = couch_replicator:replicate(Rep, ?ADMIN_USER),
+ {ok, get_pid(RepId), RepId}.
+
+
+replicate_continuous(Source, Target) ->
+ replicate_continuous(#{
+ <<"source">> => Source,
+ <<"target">> => Target,
+ <<"continuous">> => true
+ }).
+
+
+cancel(Id) when is_binary(Id) ->
+ CancelRep = #{<<"cancel">> => true, <<"id">> => Id},
+ case couch_replicator:replicate(CancelRep, ?ADMIN_USER) of
+ {ok, {cancelled, <<_/binary>>}} -> ok;
+ {error, not_found} -> ok
+ end.
+
+
+cancel(Id, Pid) when is_pid(Pid), is_binary(Id) ->
+ Ref = monitor(process, Pid),
+ try
+ cancel(Id)
+ after
+ receive
+ {'DOWN', Ref, _, _, _} -> ok
+ after 60000 ->
+ error(replicator_pid_death_timeout)
+ end
+ end.
+
+
+scheduler_jobs() ->
+ ServerUrl = couch_replicator_test_helper:server_url(),
+ Url = lists:flatten(io_lib:format("~s/_scheduler/jobs", [ServerUrl])),
+ {ok, 200, _, Body} = test_request:get(Url, []),
+ Json = jiffy:decode(Body, [return_maps]),
+ maps:get(<<"jobs">>, Json).
+
+
+open_doc(Db, DocId) ->
+ case fabric2_db:open_doc(Db, DocId, []) of
+ {ok, #doc{deleted = false} = Doc} -> Doc;
+ {not_found, missing} -> not_found
+ end.
+
+
find_att([], _Name) ->
nil;
+
find_att([Att | Rest], Name) ->
case couch_att:fetch(name, Att) of
Name ->
@@ -91,45 +296,29 @@ find_att([Att | Rest], Name) ->
att_md5(Att) ->
- Md50 = couch_att:foldl(
- Att,
- fun(Chunk, Acc) -> couch_hash:md5_hash_update(Acc, Chunk) end,
- couch_hash:md5_hash_init()),
+ Md50 = couch_att:foldl(Att, fun(Chunk, Acc) ->
+ couch_hash:md5_hash_update(Acc, Chunk)
+ end, couch_hash:md5_hash_init()),
couch_hash:md5_hash_final(Md50).
+
att_decoded_md5(Att) ->
- Md50 = couch_att:foldl_decode(
- Att,
- fun(Chunk, Acc) -> couch_hash:md5_hash_update(Acc, Chunk) end,
- couch_hash:md5_hash_init()),
+ Md50 = couch_att:foldl_decode(Att, fun(Chunk, Acc) ->
+ couch_hash:md5_hash_update(Acc, Chunk)
+ end, couch_hash:md5_hash_init()),
couch_hash:md5_hash_final(Md50).
-db_url(DbName) ->
- iolist_to_binary([
- "http://", config:get("httpd", "bind_address", "127.0.0.1"),
- ":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
- "/", DbName
- ]).
-
-get_pid(RepId) ->
- Pid = global:whereis_name({couch_replicator_scheduler_job,RepId}),
- ?assert(is_pid(Pid)),
- Pid.
-replicate(Source, Target) ->
- replicate({[
- {<<"source">>, Source},
- {<<"target">>, Target}
- ]}).
-
-replicate({[_ | _]} = RepObject) ->
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
- ok = couch_replicator_scheduler:add_job(Rep),
- couch_replicator_scheduler:reschedule(),
- Pid = get_pid(Rep#rep.id),
- MonRef = erlang:monitor(process, Pid),
- receive
- {'DOWN', MonRef, process, Pid, _} ->
- ok
+maybe_db_urls(#{} = Rep) ->
+ #{<<"source">> := Src, <<"target">> := Tgt} = Rep,
+ Src1 = case Src of
+ <<"http://", _/binary>> -> Src;
+ <<"https://", _/binary>> -> Src;
+ <<_/binary>> -> db_url(Src)
+ end,
+ Tgt1 = case Tgt of
+ <<"http://", _/binary>> -> Tgt;
+ <<"https://", _/binary>> -> Tgt;
+ <<_/binary>> -> db_url(Tgt)
end,
- ok = couch_replicator_scheduler:remove_job(Rep#rep.id).
+ Rep#{<<"source">> := Src1, <<"target">> := Tgt1}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_transient_jobs_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_transient_jobs_tests.erl
new file mode 100644
index 000000000..25fc6a3ff
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_transient_jobs_tests.erl
@@ -0,0 +1,106 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_transient_jobs_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+transient_jobs_test_() ->
+ {
+ "Transient jobs tests",
+ {
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(transient_job_is_removed, 10),
+ ?TDEF_FE(posting_same_job_is_a_noop, 10)
+ ]
+ }
+ }
+ }.
+
+
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ couch_replicator_test_helper:create_docs(Source, [
+ #{<<"_id">> => <<"doc1">>}
+ ]),
+ Target = couch_replicator_test_helper:create_db(),
+ config:set("replicator", "stats_update_interval_sec", "0", false),
+ config:set("replicator", "transient_job_max_age_sec", "9999", false),
+ {Source, Target}.
+
+
+teardown({Source, Target}) ->
+ config:delete("replicator", "stats_update_interval_sec", false),
+ config:delete("replicator", "transient_job_max_age_sec", false),
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
+
+
+transient_job_is_removed({Source, Target}) ->
+ {ok, #{}} = replicate(Source, Target),
+ JobId = get_rep_id(Source, Target),
+
+ couch_replicator_job_server:reschedule(),
+
+ % Still there after clean up attempt ran
+ ?assertMatch({200, #{}}, scheduler_jobs(JobId)),
+
+ config:set("replicator", "transient_job_max_age_sec", "0", false),
+ couch_replicator_job_server:reschedule(),
+
+ % Should be gone now
+ ?assertMatch({404, #{}}, scheduler_jobs(JobId)).
+
+
+posting_same_job_is_a_noop({Source, Target}) ->
+ {ok, Pid1, RepId1} = replicate_continuous(Source, Target),
+ {ok, Pid2, RepId2} = replicate_continuous(Source, Target),
+ ?assertEqual(RepId1, RepId2),
+ ?assertEqual(Pid1, Pid2),
+ couch_replicator_test_helper:cancel(RepId1).
+
+
+get_rep_id(Source, Target) ->
+ {ok, Id, _} = couch_replicator_parse:parse_transient_rep(#{
+ <<"source">> => couch_replicator_test_helper:db_url(Source),
+ <<"target">> => couch_replicator_test_helper:db_url(Target)
+ }, null),
+ Id.
+
+
+replicate(Source, Target) ->
+ couch_replicator:replicate(#{
+ <<"source">> => couch_replicator_test_helper:db_url(Source),
+ <<"target">> => couch_replicator_test_helper:db_url(Target)
+ }, ?ADMIN_USER).
+
+
+replicate_continuous(Source, Target) ->
+ couch_replicator_test_helper:replicate_continuous(Source, Target).
+
+
+scheduler_jobs(Id) ->
+ SUrl = couch_replicator_test_helper:server_url(),
+ Url = lists:flatten(io_lib:format("~s/_scheduler/jobs/~s", [SUrl, Id])),
+ {ok, Code, _, Body} = test_request:get(Url, []),
+ {Code, jiffy:decode(Body, [return_maps])}.
diff --git a/src/couch_replicator/test/eunit/couch_replicator_use_checkpoints_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_use_checkpoints_tests.erl
index 8e4a21dbb..4371eff1f 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_use_checkpoints_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_use_checkpoints_tests.erl
@@ -14,165 +14,82 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
--import(couch_replicator_test_helper, [
- db_url/1,
- replicate/1
-]).
-define(DOCS_COUNT, 100).
--define(TIMEOUT_EUNIT, 30).
-define(i2l(I), integer_to_list(I)).
-define(io2b(Io), iolist_to_binary(Io)).
-start(false) ->
- fun
- ({finished, _, {CheckpointHistory}}) ->
- ?assertEqual([{<<"use_checkpoints">>,false}], CheckpointHistory);
- (_) ->
- ok
- end;
-start(true) ->
- fun
- ({finished, _, {CheckpointHistory}}) ->
- ?assertNotEqual(false, lists:keyfind(<<"session_id">>,
- 1, CheckpointHistory));
- (_) ->
- ok
- end.
-
-stop(_, _) ->
- ok.
-
-setup() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
-
-setup(remote) ->
- {remote, setup()};
-setup({_, Fun, {A, B}}) ->
- Ctx = test_util:start_couch([couch_replicator]),
- {ok, Listener} = couch_replicator_notifier:start_link(Fun),
- Source = setup(A),
- Target = setup(B),
- {Ctx, {Source, Target, Listener}}.
-
-teardown({remote, DbName}) ->
- teardown(DbName);
-teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]),
- ok.
-
-teardown(_, {Ctx, {Source, Target, Listener}}) ->
- teardown(Source),
- teardown(Target),
-
- couch_replicator_notifier:stop(Listener),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
-
use_checkpoints_test_() ->
{
- "Replication use_checkpoints feature tests",
+ setup,
+ fun couch_replicator_test_helper:start_couch/0,
+ fun couch_replicator_test_helper:stop_couch/1,
{
- foreachx,
- fun start/1, fun stop/2,
- [{UseCheckpoints, fun use_checkpoints_tests/2}
- || UseCheckpoints <- [false, true]]
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ ?TDEF_FE(t_replicate_with_checkpoints, 15),
+ ?TDEF_FE(t_replicate_without_checkpoints, 15)
+ ]
}
}.
-use_checkpoints_tests(UseCheckpoints, Fun) ->
- Pairs = [{remote, remote}],
- {
- "use_checkpoints: " ++ atom_to_list(UseCheckpoints),
- {
- foreachx,
- fun setup/1, fun teardown/2,
- [{{UseCheckpoints, Fun, Pair}, fun should_test_checkpoints/2}
- || Pair <- Pairs]
- }
- }.
-should_test_checkpoints({UseCheckpoints, _, {From, To}}, {_Ctx, {Source, Target, _}}) ->
- should_test_checkpoints(UseCheckpoints, {From, To}, {Source, Target}).
-should_test_checkpoints(UseCheckpoints, {From, To}, {Source, Target}) ->
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
- {inorder, [
- should_populate_source(Source, ?DOCS_COUNT),
- should_replicate(Source, Target, UseCheckpoints),
- should_compare_databases(Source, Target)
- ]}}.
-
-should_populate_source({remote, Source}, DocCount) ->
- should_populate_source(Source, DocCount);
-should_populate_source(Source, DocCount) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(populate_db(Source, DocCount))}.
-
-should_replicate({remote, Source}, Target, UseCheckpoints) ->
- should_replicate(db_url(Source), Target, UseCheckpoints);
-should_replicate(Source, {remote, Target}, UseCheckpoints) ->
- should_replicate(Source, db_url(Target), UseCheckpoints);
-should_replicate(Source, Target, UseCheckpoints) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target, UseCheckpoints))}.
-
-should_compare_databases({remote, Source}, Target) ->
- should_compare_databases(Source, Target);
-should_compare_databases(Source, {remote, Target}) ->
- should_compare_databases(Source, Target);
-should_compare_databases(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target))}.
+setup() ->
+ Source = couch_replicator_test_helper:create_db(),
+ Target = couch_replicator_test_helper:create_db(),
+ {Source, Target}.
-populate_db(DbName, DocCount) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- Docs = lists:foldl(
- fun(DocIdCounter, Acc) ->
- Id = ?io2b(["doc", ?i2l(DocIdCounter)]),
- Value = ?io2b(["val", ?i2l(DocIdCounter)]),
- Doc = #doc{
- id = Id,
- body = {[ {<<"value">>, Value} ]}
- },
- [Doc | Acc]
- end,
- [], lists:seq(1, DocCount)),
- {ok, _} = couch_db:update_docs(Db, Docs, []),
- ok = couch_db:close(Db).
-
-compare_dbs(Source, Target) ->
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, TargetDb} = couch_db:open_int(Target, []),
- Fun = fun(FullDocInfo, Acc) ->
- {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
- {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
- DocId = couch_util:get_value(<<"_id">>, Props),
- DocTarget = case couch_db:open_doc(TargetDb, DocId) of
- {ok, DocT} ->
- DocT;
- Error ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Error opening document '",
- ?b2l(DocId), "' from target: ",
- couch_util:to_list(Error)])}]})
- end,
- DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
- ?assertEqual(DocJson, DocTargetJson),
- {ok, Acc}
- end,
- {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb).
-
-replicate(Source, Target, UseCheckpoints) ->
- replicate({[
- {<<"source">>, Source},
- {<<"target">>, Target},
- {<<"use_checkpoints">>, UseCheckpoints}
- ]}).
+teardown({Source, Target}) ->
+ couch_replicator_test_helper:delete_db(Source),
+ couch_replicator_test_helper:delete_db(Target).
+
+t_replicate_with_checkpoints({Source, Target}) ->
+ populate_db(Source, ?DOCS_COUNT),
+ Res = couch_replicator_test_helper:replicate(#{
+ <<"source">> => Source,
+ <<"target">> => Target,
+ <<"use_checkpoints">> => true
+ }),
+ ?assertMatch({ok, _}, Res),
+
+ {ok, History} = Res,
+ ?assertMatch(#{<<"history">> := _, <<"session_id">> := _}, History),
+
+ Checkpoints = maps:get(<<"history">>, History),
+ SessionId = maps:get(<<"session_id">>, History),
+ ?assert(is_binary(SessionId)),
+ ?assert(is_list(Checkpoints)),
+ ?assert(length(Checkpoints) >= 1),
+
+ couch_replicator_test_helper:compare_dbs(Source, Target).
+
+
+t_replicate_without_checkpoints({Source, Target}) ->
+ populate_db(Source, ?DOCS_COUNT),
+ Res = couch_replicator_test_helper:replicate(#{
+ <<"source">> => Source,
+ <<"target">> => Target,
+ <<"use_checkpoints">> => false
+ }),
+ ?assertEqual({ok, #{<<"use_checkpoints">> => false}}, Res),
+ couch_replicator_test_helper:compare_dbs(Source, Target).
+
+
+populate_db(DbName, DocCount) ->
+ Docs = lists:foldl(fun(DocIdCounter, Acc) ->
+ Id = ?io2b(["doc", ?i2l(DocIdCounter)]),
+ Value = ?io2b(["val", ?i2l(DocIdCounter)]),
+ Doc = #doc{
+ id = Id,
+ body = {[{<<"value">>, Value}]}
+ },
+ [Doc | Acc]
+ end, [], lists:seq(1, DocCount)),
+ couch_replicator_test_helper:create_docs(DbName, Docs).
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs
index 8b657d916..9af5ef81a 100644
--- a/test/elixir/test/replication_test.exs
+++ b/test/elixir/test/replication_test.exs
@@ -14,7 +14,10 @@ defmodule ReplicationTest do
# This should probably go into `make elixir` like what
# happens for JavaScript tests.
- @moduletag config: [{"replicator", "startup_jitter", "0"}]
+ @moduletag config: [
+ {"replicator", "startup_jitter", "0"},
+ {"replicator", "stats_update_interval_sec", "0"}
+ ]
test "source database not found with host" do
name = random_db_name()