summaryrefslogtreecommitdiff
path: root/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl')
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl271
1 files changed, 271 insertions, 0 deletions
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
new file mode 100644
index 000000000..6b4f95c25
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_error_reporting_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+
+
+setup_all() ->
+ test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
+
+
+teardown_all(Ctx) ->
+ ok = test_util:stop_couch(Ctx).
+
+
+setup() ->
+ meck:unload(),
+ Source = setup_db(),
+ Target = setup_db(),
+ {Source, Target}.
+
+
+teardown({Source, Target}) ->
+ meck:unload(),
+ teardown_db(Source),
+ teardown_db(Target),
+ ok.
+
+
+error_reporting_test_() ->
+ {
+ setup,
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ fun t_fail_bulk_docs/1,
+ fun t_fail_changes_reader/1,
+ fun t_fail_revs_diff/1,
+ fun t_fail_changes_queue/1,
+ fun t_fail_changes_manager/1,
+ fun t_fail_changes_reader_proc/1
+ ]
+ }
+ }.
+
+
+t_fail_bulk_docs({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
+ populate_db(Source, 6, 6),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result),
+
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_reader({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+ populate_db(Source, 6, 6),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result),
+
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_revs_diff({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
+ populate_db(Source, 6, 6),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result),
+
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_queue({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ RepPid = couch_replicator_test_helper:get_pid(RepId),
+ State = sys:get_state(RepPid),
+ ChangesQueue = element(20, State),
+ ?assert(is_process_alive(ChangesQueue)),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ exit(ChangesQueue, boom),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_queue_died, boom}, Result),
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_manager({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ RepPid = couch_replicator_test_helper:get_pid(RepId),
+ State = sys:get_state(RepPid),
+ ChangesManager = element(21, State),
+ ?assert(is_process_alive(ChangesManager)),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ exit(ChangesManager, bam),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_manager_died, bam}, Result),
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_reader_proc({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ RepPid = couch_replicator_test_helper:get_pid(RepId),
+ State = sys:get_state(RepPid),
+ ChangesReader = element(22, State),
+ ?assert(is_process_alive(ChangesReader)),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ exit(ChangesReader, kapow),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_reader_died, kapow}, Result),
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+mock_fail_req(Path, Return) ->
+ meck:expect(ibrowse, send_req_direct,
+ fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
+ Args = [W, Url, Headers, Meth, Body, Opts, TOut],
+ {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url),
+ case lists:suffix(Path, UPath) of
+ true -> Return;
+ false -> meck:passthrough(Args)
+ end
+ end).
+
+
+rep_result_listener(RepId) ->
+ ReplyTo = self(),
+ {ok, _Listener} = couch_replicator_notifier:start_link(
+ fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
+ ReplyTo ! Ev;
+ (_) ->
+ ok
+ end).
+
+
+wait_rep_result(RepId) ->
+ receive
+ {finished, RepId, RepResult} -> {ok, RepResult};
+ {error, RepId, Reason} -> {error, Reason}
+ end.
+
+
+
+setup_db() ->
+ DbName = ?tempdb(),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+ ok = couch_db:close(Db),
+ DbName.
+
+
+teardown_db(DbName) ->
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]).
+
+
+populate_db(DbName, Start, End) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ Docs = lists:foldl(
+ fun(DocIdCounter, Acc) ->
+ Id = integer_to_binary(DocIdCounter),
+ Doc = #doc{id = Id, body = {[]}},
+ [Doc | Acc]
+ end,
+ [], lists:seq(Start, End)),
+ {ok, _} = couch_db:update_docs(Db, Docs, []),
+ ok = couch_db:close(Db).
+
+
+wait_target_in_sync(Source, Target) ->
+ {ok, SourceDb} = couch_db:open_int(Source, []),
+ {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
+ ok = couch_db:close(SourceDb),
+ SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
+ wait_target_in_sync_loop(SourceDocCount, Target, 300).
+
+
+wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
+ erlang:error({assertion_failed, [
+ {module, ?MODULE}, {line, ?LINE},
+ {reason, "Could not get source and target databases in sync"}
+ ]});
+
+wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
+ {ok, Target} = couch_db:open_int(TargetName, []),
+ {ok, TargetInfo} = couch_db:get_db_info(Target),
+ ok = couch_db:close(Target),
+ TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
+ case TargetDocCount == DocCount of
+ true ->
+ true;
+ false ->
+ ok = timer:sleep(500),
+ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
+ end.
+
+
+replicate(Source, Target) ->
+ SrcUrl = couch_replicator_test_helper:db_url(Source),
+ TgtUrl = couch_replicator_test_helper:db_url(Target),
+ RepObject = {[
+ {<<"source">>, SrcUrl},
+ {<<"target">>, TgtUrl},
+ {<<"continuous">>, true},
+ {<<"worker_processes">>, 1},
+ {<<"retries_per_request">>, 1},
+ % Low connection timeout so _changes feed gets restarted quicker
+ {<<"connection_timeout">>, 3000}
+ ]},
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ ok = couch_replicator_scheduler:add_job(Rep),
+ couch_replicator_scheduler:reschedule(),
+ {ok, Rep#rep.id}.