diff options
Diffstat (limited to 'src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl')
-rw-r--r-- | src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl new file mode 100644 index 000000000..6b4f95c25 --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl @@ -0,0 +1,271 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_error_reporting_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_replicator/src/couch_replicator.hrl"). + + +setup_all() -> + test_util:start_couch([couch_replicator, chttpd, mem3, fabric]). + + +teardown_all(Ctx) -> + ok = test_util:stop_couch(Ctx). + + +setup() -> + meck:unload(), + Source = setup_db(), + Target = setup_db(), + {Source, Target}. + + +teardown({Source, Target}) -> + meck:unload(), + teardown_db(Source), + teardown_db(Target), + ok. + + +error_reporting_test_() -> + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun t_fail_bulk_docs/1, + fun t_fail_changes_reader/1, + fun t_fail_revs_diff/1, + fun t_fail_changes_queue/1, + fun t_fail_changes_manager/1, + fun t_fail_changes_reader_proc/1 + ] + } + }. + + +t_fail_bulk_docs({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_reader({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_revs_diff({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_queue({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesQueue = element(20, State), + ?assert(is_process_alive(ChangesQueue)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesQueue, boom), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_queue_died, boom}, Result), + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_manager({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesManager = element(21, State), + ?assert(is_process_alive(ChangesManager)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesManager, bam), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_manager_died, bam}, Result), + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_reader_proc({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesReader = element(22, State), + ?assert(is_process_alive(ChangesReader)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesReader, kapow), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_reader_died, kapow}, Result), + couch_replicator_notifier:stop(Listener) + end). + + +mock_fail_req(Path, Return) -> + meck:expect(ibrowse, send_req_direct, + fun(W, Url, Headers, Meth, Body, Opts, TOut) -> + Args = [W, Url, Headers, Meth, Body, Opts, TOut], + {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url), + case lists:suffix(Path, UPath) of + true -> Return; + false -> meck:passthrough(Args) + end + end). + + +rep_result_listener(RepId) -> + ReplyTo = self(), + {ok, _Listener} = couch_replicator_notifier:start_link( + fun({_, RepId2, _} = Ev) when RepId2 =:= RepId -> + ReplyTo ! Ev; + (_) -> + ok + end). + + +wait_rep_result(RepId) -> + receive + {finished, RepId, RepResult} -> {ok, RepResult}; + {error, RepId, Reason} -> {error, Reason} + end. + + + +setup_db() -> + DbName = ?tempdb(), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + ok = couch_db:close(Db), + DbName. + + +teardown_db(DbName) -> + ok = couch_server:delete(DbName, [?ADMIN_CTX]). + + +populate_db(DbName, Start, End) -> + {ok, Db} = couch_db:open_int(DbName, []), + Docs = lists:foldl( + fun(DocIdCounter, Acc) -> + Id = integer_to_binary(DocIdCounter), + Doc = #doc{id = Id, body = {[]}}, + [Doc | Acc] + end, + [], lists:seq(Start, End)), + {ok, _} = couch_db:update_docs(Db, Docs, []), + ok = couch_db:close(Db). + + +wait_target_in_sync(Source, Target) -> + {ok, SourceDb} = couch_db:open_int(Source, []), + {ok, SourceInfo} = couch_db:get_db_info(SourceDb), + ok = couch_db:close(SourceDb), + SourceDocCount = couch_util:get_value(doc_count, SourceInfo), + wait_target_in_sync_loop(SourceDocCount, Target, 300). + + +wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> + erlang:error({assertion_failed, [ + {module, ?MODULE}, {line, ?LINE}, + {reason, "Could not get source and target databases in sync"} + ]}); + +wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> + {ok, Target} = couch_db:open_int(TargetName, []), + {ok, TargetInfo} = couch_db:get_db_info(Target), + ok = couch_db:close(Target), + TargetDocCount = couch_util:get_value(doc_count, TargetInfo), + case TargetDocCount == DocCount of + true -> + true; + false -> + ok = timer:sleep(500), + wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1) + end. + + +replicate(Source, Target) -> + SrcUrl = couch_replicator_test_helper:db_url(Source), + TgtUrl = couch_replicator_test_helper:db_url(Target), + RepObject = {[ + {<<"source">>, SrcUrl}, + {<<"target">>, TgtUrl}, + {<<"continuous">>, true}, + {<<"worker_processes">>, 1}, + {<<"retries_per_request">>, 1}, + % Low connection timeout so _changes feed gets restarted quicker + {<<"connection_timeout">>, 3000} + ]}, + {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), + ok = couch_replicator_scheduler:add_job(Rep), + couch_replicator_scheduler:reschedule(), + {ok, Rep#rep.id}. |