diff options
author | Joan Touzet <wohali@users.noreply.github.com> | 2020-01-13 21:36:54 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-13 21:36:54 +0000 |
commit | f366a2df199d5338cd1a71a1eeedea50caa6da12 (patch) | |
tree | 4783c3ebba9fa82ea849660e076bd60eb03eaaaa | |
parent | d396a7c0eca4429c50a552d8f83a65527c8e2376 (diff) | |
parent | efb374a6493c4220333a201b649649736ddcc528 (diff) | |
download | couchdb-compiler-warnings-1.tar.gz |
Merge branch 'master' into compiler-warnings-1compiler-warnings-1
-rw-r--r-- | LICENSE | 2 | ||||
-rw-r--r-- | NOTICE | 2 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_api_wrap.erl | 12 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_scheduler.erl | 8 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_scheduler_job.erl | 51 | ||||
-rw-r--r-- | src/couch_replicator/src/couch_replicator_worker.erl | 18 | ||||
-rw-r--r-- | src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl | 271 |
7 files changed, 331 insertions, 33 deletions
@@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright 2019 The Apache Foundation + Copyright 2020 The Apache Foundation Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -1,5 +1,5 @@ Apache CouchDB -Copyright 2009-2019 The Apache Software Foundation +Copyright 2009-2020 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index ab1de7df9..a21de4242 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -186,7 +186,9 @@ get_missing_revs(#httpdb{} = Db, IdRevs) -> ), {Id, MissingRevs, PossibleAncestors} end, - {ok, lists:map(ConvertToNativeFun, Props)} + {ok, lists:map(ConvertToNativeFun, Props)}; + (ErrCode, _, ErrMsg) when is_integer(ErrCode) -> + {error, {revs_diff_failed, ErrCode, ErrMsg}} end). @@ -408,7 +410,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) -> (413, _, _) -> {error, request_body_too_large}; (417, _, Results) when is_list(Results) -> - {ok, bulk_results_to_errors(DocList, Results, remote)} + {ok, bulk_results_to_errors(DocList, Results, remote)}; + (ErrCode, _, ErrMsg) when is_integer(ErrCode) -> + {error, {bulk_docs_failed, ErrCode, ErrMsg}} end). @@ -466,7 +470,9 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb, end, parse_changes_feed(Options, UserFun2, DataStreamFun2) - end) + end); + (ErrCode, _, ErrMsg) when is_integer(ErrCode) -> + throw({retry_limit, {changes_req_failed, ErrCode, ErrMsg}}) end) catch exit:{http_request_failed, _, _, max_backoff} -> diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index f84860c6e..53c040e8c 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) -> update_running_jobs_stats(State#state.stats_pid), {noreply, State}; -handle_info({'DOWN', _Ref, process, Pid, Reason}, State) -> +handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) -> {ok, Job} = job_by_pid(Pid), + Reason = case Reason0 of + {shutdown, ShutdownReason} -> ShutdownReason; + Other -> Other + end, ok = handle_crashed_job(Job, Reason, State), {noreply, State}; @@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) -> % optimize some options to help the job make progress. -spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}. maybe_optimize_job_for_rate_limiting(Job = #job{history = - [{{crashed, {shutdown, max_backoff}}, _} | _]}) -> + [{{crashed, max_backoff}, _} | _]}) -> Opts = [ {checkpoint_interval, 5000}, {worker_processes, 2}, diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index d69febb81..12d3e5530 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -73,8 +73,6 @@ workers, stats = couch_replicator_stats:new(), session_id, - source_monitor = nil, - target_monitor = nil, source_seq = nil, use_checkpoints = true, checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL, @@ -242,14 +240,6 @@ handle_cast({report_seq, Seq}, handle_info(shutdown, St) -> {stop, shutdown, St}; -handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) -> - couch_log:error("Source database is down. Reason: ~p", [Why]), - {stop, source_db_down, St}; - -handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) -> - couch_log:error("Target database is down. Reason: ~p", [Why]), - {stop, target_db_down, St}; - handle_info({'EXIT', Pid, max_backoff}, State) -> couch_log:error("Max backoff reached child process ~p", [Pid]), {stop, {shutdown, max_backoff}, State}; @@ -261,10 +251,20 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) -> handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) -> {noreply, State}; -handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) -> +handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_reader_deaths]), + Reason = case Reason0 of + {changes_req_failed, _, _} = HttpFail -> + HttpFail; + {http_request_failed, _, _, {error, {code, Code}}} -> + {changes_req_failed, Code}; + {http_request_failed, _, _, {error, Err}} -> + {changes_req_failed, Err}; + Other -> + {changes_reader_died, Other} + end, couch_log:error("ChangesReader process died with reason: ~p", [Reason]), - {stop, changes_reader_died, cancel_timer(State)}; + {stop, {shutdown, Reason}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> {noreply, State}; @@ -272,7 +272,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_manager_deaths]), couch_log:error("ChangesManager process died with reason: ~p", [Reason]), - {stop, changes_manager_died, cancel_timer(State)}; + {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> {noreply, State}; @@ -280,7 +280,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_queue_deaths]), couch_log:error("ChangesQueue process died with reason: ~p", [Reason]), - {stop, changes_queue_died, cancel_timer(State)}; + {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) -> case Workers -- [Pid] of @@ -304,8 +304,14 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> {stop, {unknown_process_died, Pid, Reason}, State2}; true -> couch_stats:increment_counter([couch_replicator, worker_deaths]), - couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]), - {stop, {worker_died, Pid, Reason}, State2} + StopReason = case Reason of + {shutdown, _} = Err -> + Err; + Other -> + couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]), + {worker_died, Pid, Other} + end, + {stop, StopReason, State2} end; handle_info(timeout, InitArgs) -> @@ -380,6 +386,11 @@ terminate({shutdown, max_backoff}, State) -> terminate_cleanup(State), couch_replicator_notifier:notify({error, RepId, max_backoff}); +terminate({shutdown, Reason}, State) -> + % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple + % wrapped around the message + terminate(Reason, State); + terminate(Reason, State) -> #rep_state{ source_name = Source, @@ -592,8 +603,6 @@ init_state(Rep) -> src_starttime = get_value(<<"instance_start_time">>, SourceInfo), tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo), session_id = couch_uuids:random(), - source_monitor = db_monitor(Source), - target_monitor = db_monitor(Target), source_seq = SourceSeq, use_checkpoints = get_value(use_checkpoints, Options, true), checkpoint_interval = get_value(checkpoint_interval, Options, @@ -905,12 +914,6 @@ has_session_id(SessionId, [{Props} | Rest]) -> end. -db_monitor(#httpdb{}) -> - nil; -db_monitor(Db) -> - couch_db:monitor(Db). - - get_pending_count(St) -> Rep = St#rep_state.rep_details, Timeout = get_value(connection_timeout, Rep#rep.options), diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 3d80f5883..885e171a0 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -169,6 +169,15 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) -> handle_info({'EXIT', _Pid, max_backoff}, State) -> {stop, {shutdown, max_backoff}, State}; +handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; + +handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; + +handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; + handle_info({'EXIT', Pid, Reason}, State) -> {stop, {process_died, Pid, Reason}, State}. @@ -386,7 +395,9 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) -> couch_replicator_stats:new([ {docs_written, length(DocList) - length(Errors)}, {doc_write_failures, length(Errors)} - ]). + ]); +handle_flush_docs_result({error, {bulk_docs_failed, _, _} = Err}, _, _) -> + exit(Err). flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) -> @@ -425,7 +436,10 @@ find_missing(DocInfos, Target) -> end, {[], 0}, DocInfos), - {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs), + Missing = case couch_replicator_api_wrap:get_missing_revs(Target, IdRevs) of + {ok, Result} -> Result; + {error, Error} -> exit(Error) + end, MissingRevsCount = lists:foldl( fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end, 0, Missing), diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl new file mode 100644 index 000000000..6b4f95c25 --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl @@ -0,0 +1,271 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_error_reporting_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_replicator/src/couch_replicator.hrl"). + + +setup_all() -> + test_util:start_couch([couch_replicator, chttpd, mem3, fabric]). + + +teardown_all(Ctx) -> + ok = test_util:stop_couch(Ctx). + + +setup() -> + meck:unload(), + Source = setup_db(), + Target = setup_db(), + {Source, Target}. + + +teardown({Source, Target}) -> + meck:unload(), + teardown_db(Source), + teardown_db(Target), + ok. + + +error_reporting_test_() -> + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun t_fail_bulk_docs/1, + fun t_fail_changes_reader/1, + fun t_fail_revs_diff/1, + fun t_fail_changes_queue/1, + fun t_fail_changes_manager/1, + fun t_fail_changes_reader_proc/1 + ] + } + }. + + +t_fail_bulk_docs({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_reader({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_revs_diff({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_queue({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesQueue = element(20, State), + ?assert(is_process_alive(ChangesQueue)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesQueue, boom), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_queue_died, boom}, Result), + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_manager({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesManager = element(21, State), + ?assert(is_process_alive(ChangesManager)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesManager, bam), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_manager_died, bam}, Result), + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_reader_proc({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesReader = element(22, State), + ?assert(is_process_alive(ChangesReader)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesReader, kapow), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_reader_died, kapow}, Result), + couch_replicator_notifier:stop(Listener) + end). + + +mock_fail_req(Path, Return) -> + meck:expect(ibrowse, send_req_direct, + fun(W, Url, Headers, Meth, Body, Opts, TOut) -> + Args = [W, Url, Headers, Meth, Body, Opts, TOut], + {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url), + case lists:suffix(Path, UPath) of + true -> Return; + false -> meck:passthrough(Args) + end + end). + + +rep_result_listener(RepId) -> + ReplyTo = self(), + {ok, _Listener} = couch_replicator_notifier:start_link( + fun({_, RepId2, _} = Ev) when RepId2 =:= RepId -> + ReplyTo ! Ev; + (_) -> + ok + end). + + +wait_rep_result(RepId) -> + receive + {finished, RepId, RepResult} -> {ok, RepResult}; + {error, RepId, Reason} -> {error, Reason} + end. + + + +setup_db() -> + DbName = ?tempdb(), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + ok = couch_db:close(Db), + DbName. + + +teardown_db(DbName) -> + ok = couch_server:delete(DbName, [?ADMIN_CTX]). + + +populate_db(DbName, Start, End) -> + {ok, Db} = couch_db:open_int(DbName, []), + Docs = lists:foldl( + fun(DocIdCounter, Acc) -> + Id = integer_to_binary(DocIdCounter), + Doc = #doc{id = Id, body = {[]}}, + [Doc | Acc] + end, + [], lists:seq(Start, End)), + {ok, _} = couch_db:update_docs(Db, Docs, []), + ok = couch_db:close(Db). + + +wait_target_in_sync(Source, Target) -> + {ok, SourceDb} = couch_db:open_int(Source, []), + {ok, SourceInfo} = couch_db:get_db_info(SourceDb), + ok = couch_db:close(SourceDb), + SourceDocCount = couch_util:get_value(doc_count, SourceInfo), + wait_target_in_sync_loop(SourceDocCount, Target, 300). + + +wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> + erlang:error({assertion_failed, [ + {module, ?MODULE}, {line, ?LINE}, + {reason, "Could not get source and target databases in sync"} + ]}); + +wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> + {ok, Target} = couch_db:open_int(TargetName, []), + {ok, TargetInfo} = couch_db:get_db_info(Target), + ok = couch_db:close(Target), + TargetDocCount = couch_util:get_value(doc_count, TargetInfo), + case TargetDocCount == DocCount of + true -> + true; + false -> + ok = timer:sleep(500), + wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1) + end. + + +replicate(Source, Target) -> + SrcUrl = couch_replicator_test_helper:db_url(Source), + TgtUrl = couch_replicator_test_helper:db_url(Target), + RepObject = {[ + {<<"source">>, SrcUrl}, + {<<"target">>, TgtUrl}, + {<<"continuous">>, true}, + {<<"worker_processes">>, 1}, + {<<"retries_per_request">>, 1}, + % Low connection timeout so _changes feed gets restarted quicker + {<<"connection_timeout">>, 3000} + ]}, + {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), + ok = couch_replicator_scheduler:add_job(Rep), + couch_replicator_scheduler:reschedule(), + {ok, Rep#rep.id}. |