diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-01-13 12:29:49 -0500 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-01-13 16:36:29 -0500 |
commit | efb374a6493c4220333a201b649649736ddcc528 (patch) | |
tree | 02cee81d7f4e6ccd02b23b81fa92c4ce3e857547 | |
parent | 73d1e3ca10ae555f8b2bee69e98e5f8720b3e2ce (diff) | |
download | couchdb-efb374a6493c4220333a201b649649736ddcc528.tar.gz |
Improve replicator error reporting
Previously many HTTP requests failed noisily with `function_clause` errors.
Expect some of those failures and handle them better. There are mainly 3 types
of improvements:
1) Error messages are shorter. Instead of `function_clause` with cryptic
internal fun names, return a simple marker like `bulk_docs_failed`
2) Include the error body if it was returned. HTTP failures besides the error
code may contain useful information in the body to help debug the failure.
3) Do not log or include the stack trace in the message. The error names are
enough to identify the place where they are generated so avoid spamming the
user and the logs with them. This is done by using `{shutdown, Error}` tuples
to bubble up the error to the replication scheduler.
There is a small but related cleanup of removing source and target monitors
since we'd want to handle those errors better; however, those errors are never
triggered since we removed local replication endpoints recently.
Fixes: https://github.com/apache/couchdb/issues/2413
5 files changed, 329 insertions, 31 deletions
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index ab1de7df9..a21de4242 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -186,7 +186,9 @@ get_missing_revs(#httpdb{} = Db, IdRevs) -> ), {Id, MissingRevs, PossibleAncestors} end, - {ok, lists:map(ConvertToNativeFun, Props)} + {ok, lists:map(ConvertToNativeFun, Props)}; + (ErrCode, _, ErrMsg) when is_integer(ErrCode) -> + {error, {revs_diff_failed, ErrCode, ErrMsg}} end). @@ -408,7 +410,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) -> (413, _, _) -> {error, request_body_too_large}; (417, _, Results) when is_list(Results) -> - {ok, bulk_results_to_errors(DocList, Results, remote)} + {ok, bulk_results_to_errors(DocList, Results, remote)}; + (ErrCode, _, ErrMsg) when is_integer(ErrCode) -> + {error, {bulk_docs_failed, ErrCode, ErrMsg}} end). @@ -466,7 +470,9 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb, end, parse_changes_feed(Options, UserFun2, DataStreamFun2) - end) + end); + (ErrCode, _, ErrMsg) when is_integer(ErrCode) -> + throw({retry_limit, {changes_req_failed, ErrCode, ErrMsg}}) end) catch exit:{http_request_failed, _, _, max_backoff} -> diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index f84860c6e..53c040e8c 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) -> update_running_jobs_stats(State#state.stats_pid), {noreply, State}; -handle_info({'DOWN', _Ref, process, Pid, Reason}, State) -> +handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) -> {ok, Job} = job_by_pid(Pid), + Reason = case Reason0 of + {shutdown, ShutdownReason} -> ShutdownReason; + Other -> 
Other + end, ok = handle_crashed_job(Job, Reason, State), {noreply, State}; @@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) -> % optimize some options to help the job make progress. -spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}. maybe_optimize_job_for_rate_limiting(Job = #job{history = - [{{crashed, {shutdown, max_backoff}}, _} | _]}) -> + [{{crashed, max_backoff}, _} | _]}) -> Opts = [ {checkpoint_interval, 5000}, {worker_processes, 2}, diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index d69febb81..12d3e5530 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -73,8 +73,6 @@ workers, stats = couch_replicator_stats:new(), session_id, - source_monitor = nil, - target_monitor = nil, source_seq = nil, use_checkpoints = true, checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL, @@ -242,14 +240,6 @@ handle_cast({report_seq, Seq}, handle_info(shutdown, St) -> {stop, shutdown, St}; -handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) -> - couch_log:error("Source database is down. Reason: ~p", [Why]), - {stop, source_db_down, St}; - -handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) -> - couch_log:error("Target database is down. 
Reason: ~p", [Why]), - {stop, target_db_down, St}; - handle_info({'EXIT', Pid, max_backoff}, State) -> couch_log:error("Max backoff reached child process ~p", [Pid]), {stop, {shutdown, max_backoff}, State}; @@ -261,10 +251,20 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) -> handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) -> {noreply, State}; -handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) -> +handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_reader_deaths]), + Reason = case Reason0 of + {changes_req_failed, _, _} = HttpFail -> + HttpFail; + {http_request_failed, _, _, {error, {code, Code}}} -> + {changes_req_failed, Code}; + {http_request_failed, _, _, {error, Err}} -> + {changes_req_failed, Err}; + Other -> + {changes_reader_died, Other} + end, couch_log:error("ChangesReader process died with reason: ~p", [Reason]), - {stop, changes_reader_died, cancel_timer(State)}; + {stop, {shutdown, Reason}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> {noreply, State}; @@ -272,7 +272,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) -> handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_manager_deaths]), couch_log:error("ChangesManager process died with reason: ~p", [Reason]), - {stop, changes_manager_died, cancel_timer(State)}; + {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> {noreply, State}; @@ -280,7 +280,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) -> handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) -> couch_stats:increment_counter([couch_replicator, changes_queue_deaths]), 
couch_log:error("ChangesQueue process died with reason: ~p", [Reason]), - {stop, changes_queue_died, cancel_timer(State)}; + {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)}; handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) -> case Workers -- [Pid] of @@ -304,8 +304,14 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) -> {stop, {unknown_process_died, Pid, Reason}, State2}; true -> couch_stats:increment_counter([couch_replicator, worker_deaths]), - couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]), - {stop, {worker_died, Pid, Reason}, State2} + StopReason = case Reason of + {shutdown, _} = Err -> + Err; + Other -> + couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]), + {worker_died, Pid, Other} + end, + {stop, StopReason, State2} end; handle_info(timeout, InitArgs) -> @@ -380,6 +386,11 @@ terminate({shutdown, max_backoff}, State) -> terminate_cleanup(State), couch_replicator_notifier:notify({error, RepId, max_backoff}); +terminate({shutdown, Reason}, State) -> + % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple + % wrapped around the message + terminate(Reason, State); + terminate(Reason, State) -> #rep_state{ source_name = Source, @@ -592,8 +603,6 @@ init_state(Rep) -> src_starttime = get_value(<<"instance_start_time">>, SourceInfo), tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo), session_id = couch_uuids:random(), - source_monitor = db_monitor(Source), - target_monitor = db_monitor(Target), source_seq = SourceSeq, use_checkpoints = get_value(use_checkpoints, Options, true), checkpoint_interval = get_value(checkpoint_interval, Options, @@ -905,12 +914,6 @@ has_session_id(SessionId, [{Props} | Rest]) -> end. -db_monitor(#httpdb{}) -> - nil; -db_monitor(Db) -> - couch_db:monitor(Db). 
- - get_pending_count(St) -> Rep = St#rep_state.rep_details, Timeout = get_value(connection_timeout, Rep#rep.options), diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 3d80f5883..885e171a0 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -169,6 +169,15 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) -> handle_info({'EXIT', _Pid, max_backoff}, State) -> {stop, {shutdown, max_backoff}, State}; +handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; + +handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; + +handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; + handle_info({'EXIT', Pid, Reason}, State) -> {stop, {process_died, Pid, Reason}, State}. @@ -386,7 +395,9 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) -> couch_replicator_stats:new([ {docs_written, length(DocList) - length(Errors)}, {doc_write_failures, length(Errors)} - ]). + ]); +handle_flush_docs_result({error, {bulk_docs_failed, _, _} = Err}, _, _) -> + exit(Err). 
flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) -> @@ -425,7 +436,10 @@ find_missing(DocInfos, Target) -> end, {[], 0}, DocInfos), - {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs), + Missing = case couch_replicator_api_wrap:get_missing_revs(Target, IdRevs) of + {ok, Result} -> Result; + {error, Error} -> exit(Error) + end, MissingRevsCount = lists:foldl( fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end, 0, Missing), diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl new file mode 100644 index 000000000..6b4f95c25 --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl @@ -0,0 +1,271 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_error_reporting_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_replicator/src/couch_replicator.hrl"). + + +setup_all() -> + test_util:start_couch([couch_replicator, chttpd, mem3, fabric]). + + +teardown_all(Ctx) -> + ok = test_util:stop_couch(Ctx). + + +setup() -> + meck:unload(), + Source = setup_db(), + Target = setup_db(), + {Source, Target}. + + +teardown({Source, Target}) -> + meck:unload(), + teardown_db(Source), + teardown_db(Target), + ok. 
+ + +error_reporting_test_() -> + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun t_fail_bulk_docs/1, + fun t_fail_changes_reader/1, + fun t_fail_revs_diff/1, + fun t_fail_changes_queue/1, + fun t_fail_changes_manager/1, + fun t_fail_changes_reader_proc/1 + ] + } + }. + + +t_fail_bulk_docs({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_reader({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_revs_diff({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}), + populate_db(Source, 6, 6), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result), + + couch_replicator_notifier:stop(Listener) + end). 
+ + +t_fail_changes_queue({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesQueue = element(20, State), + ?assert(is_process_alive(ChangesQueue)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesQueue, boom), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_queue_died, boom}, Result), + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_manager({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesManager = element(21, State), + ?assert(is_process_alive(ChangesManager)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesManager, bam), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_manager_died, bam}, Result), + couch_replicator_notifier:stop(Listener) + end). + + +t_fail_changes_reader_proc({Source, Target}) -> + ?_test(begin + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + RepPid = couch_replicator_test_helper:get_pid(RepId), + State = sys:get_state(RepPid), + ChangesReader = element(22, State), + ?assert(is_process_alive(ChangesReader)), + + {ok, Listener} = rep_result_listener(RepId), + exit(ChangesReader, kapow), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({changes_reader_died, kapow}, Result), + couch_replicator_notifier:stop(Listener) + end). 
+ + +mock_fail_req(Path, Return) -> + meck:expect(ibrowse, send_req_direct, + fun(W, Url, Headers, Meth, Body, Opts, TOut) -> + Args = [W, Url, Headers, Meth, Body, Opts, TOut], + {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url), + case lists:suffix(Path, UPath) of + true -> Return; + false -> meck:passthrough(Args) + end + end). + + +rep_result_listener(RepId) -> + ReplyTo = self(), + {ok, _Listener} = couch_replicator_notifier:start_link( + fun({_, RepId2, _} = Ev) when RepId2 =:= RepId -> + ReplyTo ! Ev; + (_) -> + ok + end). + + +wait_rep_result(RepId) -> + receive + {finished, RepId, RepResult} -> {ok, RepResult}; + {error, RepId, Reason} -> {error, Reason} + end. + + + +setup_db() -> + DbName = ?tempdb(), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + ok = couch_db:close(Db), + DbName. + + +teardown_db(DbName) -> + ok = couch_server:delete(DbName, [?ADMIN_CTX]). + + +populate_db(DbName, Start, End) -> + {ok, Db} = couch_db:open_int(DbName, []), + Docs = lists:foldl( + fun(DocIdCounter, Acc) -> + Id = integer_to_binary(DocIdCounter), + Doc = #doc{id = Id, body = {[]}}, + [Doc | Acc] + end, + [], lists:seq(Start, End)), + {ok, _} = couch_db:update_docs(Db, Docs, []), + ok = couch_db:close(Db). + + +wait_target_in_sync(Source, Target) -> + {ok, SourceDb} = couch_db:open_int(Source, []), + {ok, SourceInfo} = couch_db:get_db_info(SourceDb), + ok = couch_db:close(SourceDb), + SourceDocCount = couch_util:get_value(doc_count, SourceInfo), + wait_target_in_sync_loop(SourceDocCount, Target, 300). 
+ + +wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> + erlang:error({assertion_failed, [ + {module, ?MODULE}, {line, ?LINE}, + {reason, "Could not get source and target databases in sync"} + ]}); + +wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> + {ok, Target} = couch_db:open_int(TargetName, []), + {ok, TargetInfo} = couch_db:get_db_info(Target), + ok = couch_db:close(Target), + TargetDocCount = couch_util:get_value(doc_count, TargetInfo), + case TargetDocCount == DocCount of + true -> + true; + false -> + ok = timer:sleep(500), + wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1) + end. + + +replicate(Source, Target) -> + SrcUrl = couch_replicator_test_helper:db_url(Source), + TgtUrl = couch_replicator_test_helper:db_url(Target), + RepObject = {[ + {<<"source">>, SrcUrl}, + {<<"target">>, TgtUrl}, + {<<"continuous">>, true}, + {<<"worker_processes">>, 1}, + {<<"retries_per_request">>, 1}, + % Low connection timeout so _changes feed gets restarted quicker + {<<"connection_timeout">>, 3000} + ]}, + {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), + ok = couch_replicator_scheduler:add_job(Rep), + couch_replicator_scheduler:reschedule(), + {ok, Rep#rep.id}. |