summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-01-13 12:29:49 -0500
committerNick Vatamaniuc <vatamane@apache.org>2020-01-13 13:13:06 -0500
commit872bdb8970b605873685092e1ad443fdf2ae2d76 (patch)
tree02cee81d7f4e6ccd02b23b81fa92c4ce3e857547
parent73d1e3ca10ae555f8b2bee69e98e5f8720b3e2ce (diff)
downloadcouchdb-improve-error-reporting-in-replicator.tar.gz
Improve replicator error reportingimprove-error-reporting-in-replicator
Previously many HTTP requests failed noisily with `function_clause` errors. Expect some of those failures and handle them better. There are mainly 3 types of improvements: 1) Error messages are shorter. Instead of `function_clause` with cryptic internal fun names, return a simple marker like `bulk_docs_failed`. 2) Include the error body if it was returned. HTTP failures, besides the error code, may contain useful information in the body to help debug the failure. 3) Do not log or include the stack trace in the message. The error names are enough to identify the place where they are generated, so avoid spamming the user and the logs with them. This is done by using `{shutdown, Error}` tuples to bubble up the error to the replication scheduler. There is a small but related cleanup of removing source and target monitors: we'd want to handle those errors better; however, those errors are never triggered since we removed local replication endpoints recently. Fixes: https://github.com/apache/couchdb/issues/2413
-rw-r--r--src/couch_replicator/src/couch_replicator_api_wrap.erl12
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler.erl8
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl51
-rw-r--r--src/couch_replicator/src/couch_replicator_worker.erl18
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl271
5 files changed, 329 insertions, 31 deletions
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index ab1de7df9..a21de4242 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -186,7 +186,9 @@ get_missing_revs(#httpdb{} = Db, IdRevs) ->
),
{Id, MissingRevs, PossibleAncestors}
end,
- {ok, lists:map(ConvertToNativeFun, Props)}
+ {ok, lists:map(ConvertToNativeFun, Props)};
+ (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+ {error, {revs_diff_failed, ErrCode, ErrMsg}}
end).
@@ -408,7 +410,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
(413, _, _) ->
{error, request_body_too_large};
(417, _, Results) when is_list(Results) ->
- {ok, bulk_results_to_errors(DocList, Results, remote)}
+ {ok, bulk_results_to_errors(DocList, Results, remote)};
+ (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+ {error, {bulk_docs_failed, ErrCode, ErrMsg}}
end).
@@ -466,7 +470,9 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
end,
parse_changes_feed(Options, UserFun2,
DataStreamFun2)
- end)
+ end);
+ (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+ throw({retry_limit, {changes_req_failed, ErrCode, ErrMsg}})
end)
catch
exit:{http_request_failed, _, _, max_backoff} ->
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index f84860c6e..53c040e8c 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
update_running_jobs_stats(State#state.stats_pid),
{noreply, State};
-handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
{ok, Job} = job_by_pid(Pid),
+ Reason = case Reason0 of
+ {shutdown, ShutdownReason} -> ShutdownReason;
+ Other -> Other
+ end,
ok = handle_crashed_job(Job, Reason, State),
{noreply, State};
@@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) ->
% optimize some options to help the job make progress.
-spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
maybe_optimize_job_for_rate_limiting(Job = #job{history =
- [{{crashed, {shutdown, max_backoff}}, _} | _]}) ->
+ [{{crashed, max_backoff}, _} | _]}) ->
Opts = [
{checkpoint_interval, 5000},
{worker_processes, 2},
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index d69febb81..12d3e5530 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -73,8 +73,6 @@
workers,
stats = couch_replicator_stats:new(),
session_id,
- source_monitor = nil,
- target_monitor = nil,
source_seq = nil,
use_checkpoints = true,
checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
@@ -242,14 +240,6 @@ handle_cast({report_seq, Seq},
handle_info(shutdown, St) ->
{stop, shutdown, St};
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
- couch_log:error("Source database is down. Reason: ~p", [Why]),
- {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
- couch_log:error("Target database is down. Reason: ~p", [Why]),
- {stop, target_db_down, St};
-
handle_info({'EXIT', Pid, max_backoff}, State) ->
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
@@ -261,10 +251,20 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
{noreply, State};
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+ Reason = case Reason0 of
+ {changes_req_failed, _, _} = HttpFail ->
+ HttpFail;
+ {http_request_failed, _, _, {error, {code, Code}}} ->
+ {changes_req_failed, Code};
+ {http_request_failed, _, _, {error, Err}} ->
+ {changes_req_failed, Err};
+ Other ->
+ {changes_reader_died, Other}
+ end,
couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
- {stop, changes_reader_died, cancel_timer(State)};
+ {stop, {shutdown, Reason}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
{noreply, State};
@@ -272,7 +272,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
- {stop, changes_manager_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
{noreply, State};
@@ -280,7 +280,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
- {stop, changes_queue_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
case Workers -- [Pid] of
@@ -304,8 +304,14 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
{stop, {unknown_process_died, Pid, Reason}, State2};
true ->
couch_stats:increment_counter([couch_replicator, worker_deaths]),
- couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
- {stop, {worker_died, Pid, Reason}, State2}
+ StopReason = case Reason of
+ {shutdown, _} = Err ->
+ Err;
+ Other ->
+ couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
+ {worker_died, Pid, Other}
+ end,
+ {stop, StopReason, State2}
end;
handle_info(timeout, InitArgs) ->
@@ -380,6 +386,11 @@ terminate({shutdown, max_backoff}, State) ->
terminate_cleanup(State),
couch_replicator_notifier:notify({error, RepId, max_backoff});
+terminate({shutdown, Reason}, State) ->
+ % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
+ % wrapped around the message
+ terminate(Reason, State);
+
terminate(Reason, State) ->
#rep_state{
source_name = Source,
@@ -592,8 +603,6 @@ init_state(Rep) ->
src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
session_id = couch_uuids:random(),
- source_monitor = db_monitor(Source),
- target_monitor = db_monitor(Target),
source_seq = SourceSeq,
use_checkpoints = get_value(use_checkpoints, Options, true),
checkpoint_interval = get_value(checkpoint_interval, Options,
@@ -905,12 +914,6 @@ has_session_id(SessionId, [{Props} | Rest]) ->
end.
-db_monitor(#httpdb{}) ->
- nil;
-db_monitor(Db) ->
- couch_db:monitor(Db).
-
-
get_pending_count(St) ->
Rep = St#rep_state.rep_details,
Timeout = get_value(connection_timeout, Rep#rep.options),
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 3d80f5883..885e171a0 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -169,6 +169,15 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
handle_info({'EXIT', _Pid, max_backoff}, State) ->
{stop, {shutdown, max_backoff}, State};
+handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
+
+handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
+
+handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
+
handle_info({'EXIT', Pid, Reason}, State) ->
{stop, {process_died, Pid, Reason}, State}.
@@ -386,7 +395,9 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) ->
couch_replicator_stats:new([
{docs_written, length(DocList) - length(Errors)},
{doc_write_failures, length(Errors)}
- ]).
+ ]);
+handle_flush_docs_result({error, {bulk_docs_failed, _, _} = Err}, _, _) ->
+ exit(Err).
flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
@@ -425,7 +436,10 @@ find_missing(DocInfos, Target) ->
end, {[], 0}, DocInfos),
- {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs),
+ Missing = case couch_replicator_api_wrap:get_missing_revs(Target, IdRevs) of
+ {ok, Result} -> Result;
+ {error, Error} -> exit(Error)
+ end,
MissingRevsCount = lists:foldl(
fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
0, Missing),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
new file mode 100644
index 000000000..6b4f95c25
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_error_reporting_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+
+
+% Suite-level fixture setup: starts CouchDB through test_util, passing the
+% list below, and returns whatever test_util:start_couch/1 returns. The
+% enclosing fixture hands that value to teardown_all/1.
+setup_all() ->
+    ExtraApps = [couch_replicator, chttpd, mem3, fabric],
+    test_util:start_couch(ExtraApps).
+
+
+% Suite-level fixture cleanup. Ctx is the value produced by setup_all/0.
+% Fails with badmatch unless test_util:stop_couch/1 returns ok.
+teardown_all(Ctx) ->
+    StopResult = test_util:stop_couch(Ctx),
+    ok = StopResult.
+
+
+% Per-test setup: unloads any mocks left over from a previous test, then
+% creates two fresh databases. Returns {SourceDbName, TargetDbName}.
+setup() ->
+    meck:unload(),
+    SourceName = setup_db(),
+    TargetName = setup_db(),
+    {SourceName, TargetName}.
+
+
+% Per-test cleanup: unloads the mocks installed by the test and deletes the
+% source database, then the target database. Returns ok.
+teardown({Source, Target}) ->
+    meck:unload(),
+    lists:foreach(fun teardown_db/1, [Source, Target]).
+
+
+% EUnit generator for the whole suite. CouchDB is started once around all
+% cases (setup_all/0, teardown_all/1); each case gets its own pair of
+% databases (setup/0, teardown/1).
+error_reporting_test_() ->
+    Cases = [
+        fun t_fail_bulk_docs/1,
+        fun t_fail_changes_reader/1,
+        fun t_fail_revs_diff/1,
+        fun t_fail_changes_queue/1,
+        fun t_fail_changes_manager/1,
+        fun t_fail_changes_reader_proc/1
+    ],
+    PerCase = {foreach, fun setup/0, fun teardown/1, Cases},
+    {setup, fun setup_all/0, fun teardown_all/1, PerCase}.
+
+
+% Once the job has replicated an initial batch, requests whose path ends in
+% /_bulk_docs are mocked to return a 403 with a JSON body. Writing one more
+% source document must then make the job report
+% {bulk_docs_failed, 403, Body} with the body in decoded form.
+t_fail_bulk_docs({Source, Target}) ->
+    ?_test(begin
+        FakeResponse = {ok, "403", [], [<<"{\"x\":\"y\"}">>]},
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+        % Subscribe before breaking the endpoint so the error event is not
+        % missed.
+        {ok, Notifier} = rep_result_listener(RepId),
+        mock_fail_req("/_bulk_docs", FakeResponse),
+        populate_db(Source, 6, 6),
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result),
+        couch_replicator_notifier:stop(Notifier)
+    end).
+
+
+% Once the job has replicated an initial batch, requests whose path ends in
+% /_changes are mocked to return a 418 with a JSON body. Writing one more
+% source document must then make the job report
+% {changes_req_failed, 418, Body} with the body in decoded form.
+t_fail_changes_reader({Source, Target}) ->
+    ?_test(begin
+        FakeResponse = {ok, "418", [], [<<"{\"x\":\"y\"}">>]},
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+        % Subscribe before breaking the endpoint so the error event is not
+        % missed.
+        {ok, Notifier} = rep_result_listener(RepId),
+        mock_fail_req("/_changes", FakeResponse),
+        populate_db(Source, 6, 6),
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result),
+        couch_replicator_notifier:stop(Notifier)
+    end).
+
+
+% Once the job has replicated an initial batch, requests whose path ends in
+% /_revs_diff are mocked to return a 407 with a JSON body. Writing one more
+% source document must then make the job report
+% {revs_diff_failed, 407, Body} with the body in decoded form.
+t_fail_revs_diff({Source, Target}) ->
+    ?_test(begin
+        FakeResponse = {ok, "407", [], [<<"{\"x\":\"y\"}">>]},
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+        % Subscribe before breaking the endpoint so the error event is not
+        % missed.
+        {ok, Notifier} = rep_result_listener(RepId),
+        mock_fail_req("/_revs_diff", FakeResponse),
+        populate_db(Source, 6, 6),
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result),
+        couch_replicator_notifier:stop(Notifier)
+    end).
+
+
+% Kills the process stored at position 20 of the running job's state with
+% reason boom and expects the job to report {changes_queue_died, boom}.
+t_fail_changes_queue({Source, Target}) ->
+    ?_test(begin
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+        JobPid = couch_replicator_test_helper:get_pid(RepId),
+        JobState = sys:get_state(JobPid),
+        % NOTE(review): positional index into the job's state tuple,
+        % presumably the changes queue field of #rep_state{}; confirm
+        % against the record definition whenever its fields change.
+        ChangesQueue = element(20, JobState),
+        ?assert(is_process_alive(ChangesQueue)),
+        % Subscribe before killing the process so the error event is not
+        % missed.
+        {ok, Notifier} = rep_result_listener(RepId),
+        exit(ChangesQueue, boom),
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({changes_queue_died, boom}, Result),
+        couch_replicator_notifier:stop(Notifier)
+    end).
+
+
+% Kills the process stored at position 21 of the running job's state with
+% reason bam and expects the job to report {changes_manager_died, bam}.
+t_fail_changes_manager({Source, Target}) ->
+    ?_test(begin
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+        JobPid = couch_replicator_test_helper:get_pid(RepId),
+        JobState = sys:get_state(JobPid),
+        % NOTE(review): positional index into the job's state tuple,
+        % presumably the changes manager field of #rep_state{}; confirm
+        % against the record definition whenever its fields change.
+        ChangesManager = element(21, JobState),
+        ?assert(is_process_alive(ChangesManager)),
+        % Subscribe before killing the process so the error event is not
+        % missed.
+        {ok, Notifier} = rep_result_listener(RepId),
+        exit(ChangesManager, bam),
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({changes_manager_died, bam}, Result),
+        couch_replicator_notifier:stop(Notifier)
+    end).
+
+
+% Kills the process stored at position 22 of the running job's state with
+% reason kapow and expects the job to report {changes_reader_died, kapow}.
+t_fail_changes_reader_proc({Source, Target}) ->
+    ?_test(begin
+        populate_db(Source, 1, 5),
+        {ok, RepId} = replicate(Source, Target),
+        wait_target_in_sync(Source, Target),
+        JobPid = couch_replicator_test_helper:get_pid(RepId),
+        JobState = sys:get_state(JobPid),
+        % NOTE(review): positional index into the job's state tuple,
+        % presumably the changes reader field of #rep_state{}; confirm
+        % against the record definition whenever its fields change.
+        ChangesReader = element(22, JobState),
+        ?assert(is_process_alive(ChangesReader)),
+        % Subscribe before killing the process so the error event is not
+        % missed.
+        {ok, Notifier} = rep_result_listener(RepId),
+        exit(ChangesReader, kapow),
+        {error, Result} = wait_rep_result(RepId),
+        ?assertEqual({changes_reader_died, kapow}, Result),
+        couch_replicator_notifier:stop(Notifier)
+    end).
+
+
+% Replaces ibrowse:send_req_direct/7 with a stub. A request whose URL path
+% ends with Path (a string) gets Return as its result; every other request
+% is forwarded to the real implementation through meck:passthrough/1.
+mock_fail_req(Path, Return) ->
+    Stub = fun(Worker, Url, ReqHeaders, Method, ReqBody, Options, Timeout) ->
+        % The fifth element of the parsed URI is its path component.
+        {ok, {_, _, _, _, UrlPath, _}} = http_uri:parse(Url),
+        case lists:suffix(Path, UrlPath) of
+            false ->
+                meck:passthrough([Worker, Url, ReqHeaders, Method, ReqBody,
+                    Options, Timeout]);
+            true ->
+                Return
+        end
+    end,
+    meck:expect(ibrowse, send_req_direct, Stub).
+
+
+% Starts a couch_replicator_notifier linked to the calling process. Every
+% 3-tuple event whose second element is exactly RepId is sent unchanged to
+% the caller; all other events are dropped. Returns the {ok, _} result of
+% couch_replicator_notifier:start_link/1.
+rep_result_listener(RepId) ->
+    Caller = self(),
+    Forward = fun
+        ({_, EventRepId, _} = Event) when EventRepId =:= RepId ->
+            Caller ! Event;
+        (_) ->
+            ok
+    end,
+    {ok, _} = couch_replicator_notifier:start_link(Forward).
+
+
+% Blocks until the listener installed by rep_result_listener/1 has forwarded
+% a result event for RepId to the calling process.
+%
+% Returns {ok, RepResult} for a {finished, RepId, RepResult} message and
+% {error, Reason} for an {error, RepId, Reason} message. Messages that match
+% neither pattern are left in the mailbox.
+%
+% Raises an assertion_failed error when no result arrives within 30 seconds,
+% so a replication that never reports fails with a readable reason instead of
+% leaving the test blocked in receive.
+wait_rep_result(RepId) ->
+    receive
+        {finished, RepId, RepResult} -> {ok, RepResult};
+        {error, RepId, Reason} -> {error, Reason}
+    after 30000 ->
+        erlang:error({assertion_failed, [
+            {module, ?MODULE}, {line, ?LINE},
+            {reason, "Timed out waiting for replication result"}
+        ]})
+    end.
+
+
+
+% Creates a database under a fresh ?tempdb() name using the admin context,
+% closes the handle again and returns the database name.
+setup_db() ->
+    Name = ?tempdb(),
+    {ok, Handle} = couch_db:create(Name, [?ADMIN_CTX]),
+    ok = couch_db:close(Handle),
+    Name.
+
+
+% Deletes the database DbName using the admin context. Fails with badmatch
+% unless couch_server:delete/2 returns ok.
+teardown_db(DbName) ->
+    DeleteOpts = [?ADMIN_CTX],
+    ok = couch_server:delete(DbName, DeleteOpts).
+
+
+% Writes one empty-bodied document per integer in Start..End to DbName. Each
+% document id is the integer rendered as a binary. Returns ok.
+populate_db(DbName, Start, End) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Ascending = [
+        #doc{id = integer_to_binary(N), body = {[]}}
+        || N <- lists:seq(Start, End)
+    ],
+    % Highest id first, the same order the documents were submitted in
+    % before this rewrite.
+    Docs = lists:reverse(Ascending),
+    {ok, _} = couch_db:update_docs(Db, Docs, []),
+    ok = couch_db:close(Db).
+
+
+% Reads the current doc_count of the Source database and then polls the
+% Target database, for at most 300 attempts, until its doc_count is equal.
+wait_target_in_sync(Source, Target) ->
+    {ok, Db} = couch_db:open_int(Source, []),
+    {ok, Info} = couch_db:get_db_info(Db),
+    ok = couch_db:close(Db),
+    Expected = couch_util:get_value(doc_count, Info),
+    wait_target_in_sync_loop(Expected, Target, 300).
+
+
+% Polling loop behind wait_target_in_sync/2.
+%
+% Returns true as soon as the target database's doc_count equals DocCount.
+% Otherwise sleeps 500 ms and tries again with one retry fewer. With no
+% retries left it raises an assertion_failed error.
+wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
+    erlang:error({assertion_failed, [
+        {module, ?MODULE}, {line, ?LINE},
+        {reason, "Could not get source and target databases in sync"}
+    ]});
+
+wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
+    {ok, Db} = couch_db:open_int(TargetName, []),
+    {ok, Info} = couch_db:get_db_info(Db),
+    ok = couch_db:close(Db),
+    case couch_util:get_value(doc_count, Info) of
+        Current when Current == DocCount ->
+            true;
+        _ ->
+            ok = timer:sleep(500),
+            wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
+    end.
+
+
+% Builds a continuous replication document between the URLs that
+% couch_replicator_test_helper:db_url/1 returns for Source and Target, using
+% one worker process and one retry per request. The document is parsed as
+% ?ADMIN_USER, added to the scheduler as a job, and a reschedule is
+% requested. Returns {ok, RepId}.
+replicate(Source, Target) ->
+    SourceUrl = couch_replicator_test_helper:db_url(Source),
+    TargetUrl = couch_replicator_test_helper:db_url(Target),
+    Props = [
+        {<<"source">>, SourceUrl},
+        {<<"target">>, TargetUrl},
+        {<<"continuous">>, true},
+        {<<"worker_processes">>, 1},
+        {<<"retries_per_request">>, 1},
+        % Low connection timeout so _changes feed gets restarted quicker
+        {<<"connection_timeout">>, 3000}
+    ],
+    {ok, Rep} = couch_replicator_utils:parse_rep_doc({Props}, ?ADMIN_USER),
+    ok = couch_replicator_scheduler:add_job(Rep),
+    couch_replicator_scheduler:reschedule(),
+    {ok, Rep#rep.id}.