summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoan Touzet <wohali@users.noreply.github.com>2020-01-13 21:36:54 +0000
committerGitHub <noreply@github.com>2020-01-13 21:36:54 +0000
commitf366a2df199d5338cd1a71a1eeedea50caa6da12 (patch)
tree4783c3ebba9fa82ea849660e076bd60eb03eaaaa
parentd396a7c0eca4429c50a552d8f83a65527c8e2376 (diff)
parentefb374a6493c4220333a201b649649736ddcc528 (diff)
downloadcouchdb-compiler-warnings-1.tar.gz
Merge branch 'master' into compiler-warnings-1compiler-warnings-1
-rw-r--r--LICENSE2
-rw-r--r--NOTICE2
-rw-r--r--src/couch_replicator/src/couch_replicator_api_wrap.erl12
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler.erl8
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl51
-rw-r--r--src/couch_replicator/src/couch_replicator_worker.erl18
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl271
7 files changed, 331 insertions, 33 deletions
diff --git a/LICENSE b/LICENSE
index 43602516a..8278d4a2d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -187,7 +187,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright 2019 The Apache Foundation
+ Copyright 2020 The Apache Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
diff --git a/NOTICE b/NOTICE
index 23cf02ff6..e8674668b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
Apache CouchDB
-Copyright 2009-2019 The Apache Software Foundation
+Copyright 2009-2020 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index ab1de7df9..a21de4242 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -186,7 +186,9 @@ get_missing_revs(#httpdb{} = Db, IdRevs) ->
),
{Id, MissingRevs, PossibleAncestors}
end,
- {ok, lists:map(ConvertToNativeFun, Props)}
+ {ok, lists:map(ConvertToNativeFun, Props)};
+ (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+ {error, {revs_diff_failed, ErrCode, ErrMsg}}
end).
@@ -408,7 +410,9 @@ update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
(413, _, _) ->
{error, request_body_too_large};
(417, _, Results) when is_list(Results) ->
- {ok, bulk_results_to_errors(DocList, Results, remote)}
+ {ok, bulk_results_to_errors(DocList, Results, remote)};
+ (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+ {error, {bulk_docs_failed, ErrCode, ErrMsg}}
end).
@@ -466,7 +470,9 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
end,
parse_changes_feed(Options, UserFun2,
DataStreamFun2)
- end)
+ end);
+ (ErrCode, _, ErrMsg) when is_integer(ErrCode) ->
+ throw({retry_limit, {changes_req_failed, ErrCode, ErrMsg}})
end)
catch
exit:{http_request_failed, _, _, max_backoff} ->
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index f84860c6e..53c040e8c 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -318,8 +318,12 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
update_running_jobs_stats(State#state.stats_pid),
{noreply, State};
-handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
+handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
{ok, Job} = job_by_pid(Pid),
+ Reason = case Reason0 of
+ {shutdown, ShutdownReason} -> ShutdownReason;
+ Other -> Other
+ end,
ok = handle_crashed_job(Job, Reason, State),
{noreply, State};
@@ -873,7 +877,7 @@ is_continuous(#job{rep = Rep}) ->
% optimize some options to help the job make progress.
-spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}.
maybe_optimize_job_for_rate_limiting(Job = #job{history =
- [{{crashed, {shutdown, max_backoff}}, _} | _]}) ->
+ [{{crashed, max_backoff}, _} | _]}) ->
Opts = [
{checkpoint_interval, 5000},
{worker_processes, 2},
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index d69febb81..12d3e5530 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -73,8 +73,6 @@
workers,
stats = couch_replicator_stats:new(),
session_id,
- source_monitor = nil,
- target_monitor = nil,
source_seq = nil,
use_checkpoints = true,
checkpoint_interval = ?DEFAULT_CHECKPOINT_INTERVAL,
@@ -242,14 +240,6 @@ handle_cast({report_seq, Seq},
handle_info(shutdown, St) ->
{stop, shutdown, St};
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
- couch_log:error("Source database is down. Reason: ~p", [Why]),
- {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
- couch_log:error("Target database is down. Reason: ~p", [Why]),
- {stop, target_db_down, St};
-
handle_info({'EXIT', Pid, max_backoff}, State) ->
couch_log:error("Max backoff reached child process ~p", [Pid]),
{stop, {shutdown, max_backoff}, State};
@@ -261,10 +251,20 @@ handle_info({'EXIT', Pid, {shutdown, max_backoff}}, State) ->
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
{noreply, State};
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+handle_info({'EXIT', Pid, Reason0}, #rep_state{changes_reader=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_reader_deaths]),
+ Reason = case Reason0 of
+ {changes_req_failed, _, _} = HttpFail ->
+ HttpFail;
+ {http_request_failed, _, _, {error, {code, Code}}} ->
+ {changes_req_failed, Code};
+ {http_request_failed, _, _, {error, Err}} ->
+ {changes_req_failed, Err};
+ Other ->
+ {changes_reader_died, Other}
+ end,
couch_log:error("ChangesReader process died with reason: ~p", [Reason]),
- {stop, changes_reader_died, cancel_timer(State)};
+ {stop, {shutdown, Reason}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
{noreply, State};
@@ -272,7 +272,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_manager_deaths]),
couch_log:error("ChangesManager process died with reason: ~p", [Reason]),
- {stop, changes_manager_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_manager_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
{noreply, State};
@@ -280,7 +280,7 @@ handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
couch_stats:increment_counter([couch_replicator, changes_queue_deaths]),
couch_log:error("ChangesQueue process died with reason: ~p", [Reason]),
- {stop, changes_queue_died, cancel_timer(State)};
+ {stop, {shutdown, {changes_queue_died, Reason}}, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
case Workers -- [Pid] of
@@ -304,8 +304,14 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
{stop, {unknown_process_died, Pid, Reason}, State2};
true ->
couch_stats:increment_counter([couch_replicator, worker_deaths]),
- couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
- {stop, {worker_died, Pid, Reason}, State2}
+ StopReason = case Reason of
+ {shutdown, _} = Err ->
+ Err;
+ Other ->
+ couch_log:error("Worker ~p died with reason: ~p", [Pid, Reason]),
+ {worker_died, Pid, Other}
+ end,
+ {stop, StopReason, State2}
end;
handle_info(timeout, InitArgs) ->
@@ -380,6 +386,11 @@ terminate({shutdown, max_backoff}, State) ->
terminate_cleanup(State),
couch_replicator_notifier:notify({error, RepId, max_backoff});
+terminate({shutdown, Reason}, State) ->
+ % Unwrap so when reporting we don't have an extra {shutdown, ...} tuple
+ % wrapped around the message
+ terminate(Reason, State);
+
terminate(Reason, State) ->
#rep_state{
source_name = Source,
@@ -592,8 +603,6 @@ init_state(Rep) ->
src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
session_id = couch_uuids:random(),
- source_monitor = db_monitor(Source),
- target_monitor = db_monitor(Target),
source_seq = SourceSeq,
use_checkpoints = get_value(use_checkpoints, Options, true),
checkpoint_interval = get_value(checkpoint_interval, Options,
@@ -905,12 +914,6 @@ has_session_id(SessionId, [{Props} | Rest]) ->
end.
-db_monitor(#httpdb{}) ->
- nil;
-db_monitor(Db) ->
- couch_db:monitor(Db).
-
-
get_pending_count(St) ->
Rep = St#rep_state.rep_details,
Timeout = get_value(connection_timeout, Rep#rep.options),
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index 3d80f5883..885e171a0 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -169,6 +169,15 @@ handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
handle_info({'EXIT', _Pid, max_backoff}, State) ->
{stop, {shutdown, max_backoff}, State};
+handle_info({'EXIT', _Pid, {bulk_docs_failed, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
+
+handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
+
+handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) ->
+ {stop, {shutdown, Err}, State};
+
handle_info({'EXIT', Pid, Reason}, State) ->
{stop, {process_died, Pid, Reason}, State}.
@@ -386,7 +395,9 @@ handle_flush_docs_result({ok, Errors}, Target, DocList) ->
couch_replicator_stats:new([
{docs_written, length(DocList) - length(Errors)},
{doc_write_failures, length(Errors)}
- ]).
+ ]);
+handle_flush_docs_result({error, {bulk_docs_failed, _, _} = Err}, _, _) ->
+ exit(Err).
flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
@@ -425,7 +436,10 @@ find_missing(DocInfos, Target) ->
end, {[], 0}, DocInfos),
- {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs),
+ Missing = case couch_replicator_api_wrap:get_missing_revs(Target, IdRevs) of
+ {ok, Result} -> Result;
+ {error, Error} -> exit(Error)
+ end,
MissingRevsCount = lists:foldl(
fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,
0, Missing),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
new file mode 100644
index 000000000..6b4f95c25
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -0,0 +1,271 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_error_reporting_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+
+
+setup_all() ->
+ test_util:start_couch([couch_replicator, chttpd, mem3, fabric]).
+
+
+teardown_all(Ctx) ->
+ ok = test_util:stop_couch(Ctx).
+
+
+setup() ->
+ meck:unload(),
+ Source = setup_db(),
+ Target = setup_db(),
+ {Source, Target}.
+
+
+teardown({Source, Target}) ->
+ meck:unload(),
+ teardown_db(Source),
+ teardown_db(Target),
+ ok.
+
+
+error_reporting_test_() ->
+ {
+ setup,
+ fun setup_all/0,
+ fun teardown_all/1,
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ fun t_fail_bulk_docs/1,
+ fun t_fail_changes_reader/1,
+ fun t_fail_revs_diff/1,
+ fun t_fail_changes_queue/1,
+ fun t_fail_changes_manager/1,
+ fun t_fail_changes_reader_proc/1
+ ]
+ }
+ }.
+
+
+t_fail_bulk_docs({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
+ populate_db(Source, 6, 6),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({bulk_docs_failed, 403, {[{<<"x">>, <<"y">>}]}}, Result),
+
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_reader({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+ populate_db(Source, 6, 6),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_req_failed, 418, {[{<<"x">>, <<"y">>}]}}, Result),
+
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_revs_diff({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
+ populate_db(Source, 6, 6),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({revs_diff_failed, 407, {[{<<"x">>, <<"y">>}]}}, Result),
+
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_queue({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ RepPid = couch_replicator_test_helper:get_pid(RepId),
+ State = sys:get_state(RepPid),
+ ChangesQueue = element(20, State),
+ ?assert(is_process_alive(ChangesQueue)),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ exit(ChangesQueue, boom),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_queue_died, boom}, Result),
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_manager({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ RepPid = couch_replicator_test_helper:get_pid(RepId),
+ State = sys:get_state(RepPid),
+ ChangesManager = element(21, State),
+ ?assert(is_process_alive(ChangesManager)),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ exit(ChangesManager, bam),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_manager_died, bam}, Result),
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+t_fail_changes_reader_proc({Source, Target}) ->
+ ?_test(begin
+ populate_db(Source, 1, 5),
+ {ok, RepId} = replicate(Source, Target),
+ wait_target_in_sync(Source, Target),
+
+ RepPid = couch_replicator_test_helper:get_pid(RepId),
+ State = sys:get_state(RepPid),
+ ChangesReader = element(22, State),
+ ?assert(is_process_alive(ChangesReader)),
+
+ {ok, Listener} = rep_result_listener(RepId),
+ exit(ChangesReader, kapow),
+
+ {error, Result} = wait_rep_result(RepId),
+ ?assertEqual({changes_reader_died, kapow}, Result),
+ couch_replicator_notifier:stop(Listener)
+ end).
+
+
+mock_fail_req(Path, Return) ->
+ meck:expect(ibrowse, send_req_direct,
+ fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
+ Args = [W, Url, Headers, Meth, Body, Opts, TOut],
+ {ok, {_, _, _, _, UPath, _}} = http_uri:parse(Url),
+ case lists:suffix(Path, UPath) of
+ true -> Return;
+ false -> meck:passthrough(Args)
+ end
+ end).
+
+
+rep_result_listener(RepId) ->
+ ReplyTo = self(),
+ {ok, _Listener} = couch_replicator_notifier:start_link(
+ fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
+ ReplyTo ! Ev;
+ (_) ->
+ ok
+ end).
+
+
+wait_rep_result(RepId) ->
+ receive
+ {finished, RepId, RepResult} -> {ok, RepResult};
+ {error, RepId, Reason} -> {error, Reason}
+ end.
+
+
+
+setup_db() ->
+ DbName = ?tempdb(),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+ ok = couch_db:close(Db),
+ DbName.
+
+
+teardown_db(DbName) ->
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]).
+
+
+populate_db(DbName, Start, End) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ Docs = lists:foldl(
+ fun(DocIdCounter, Acc) ->
+ Id = integer_to_binary(DocIdCounter),
+ Doc = #doc{id = Id, body = {[]}},
+ [Doc | Acc]
+ end,
+ [], lists:seq(Start, End)),
+ {ok, _} = couch_db:update_docs(Db, Docs, []),
+ ok = couch_db:close(Db).
+
+
+wait_target_in_sync(Source, Target) ->
+ {ok, SourceDb} = couch_db:open_int(Source, []),
+ {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
+ ok = couch_db:close(SourceDb),
+ SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
+ wait_target_in_sync_loop(SourceDocCount, Target, 300).
+
+
+wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
+ erlang:error({assertion_failed, [
+ {module, ?MODULE}, {line, ?LINE},
+ {reason, "Could not get source and target databases in sync"}
+ ]});
+
+wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
+ {ok, Target} = couch_db:open_int(TargetName, []),
+ {ok, TargetInfo} = couch_db:get_db_info(Target),
+ ok = couch_db:close(Target),
+ TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
+ case TargetDocCount == DocCount of
+ true ->
+ true;
+ false ->
+ ok = timer:sleep(500),
+ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
+ end.
+
+
+replicate(Source, Target) ->
+ SrcUrl = couch_replicator_test_helper:db_url(Source),
+ TgtUrl = couch_replicator_test_helper:db_url(Target),
+ RepObject = {[
+ {<<"source">>, SrcUrl},
+ {<<"target">>, TgtUrl},
+ {<<"continuous">>, true},
+ {<<"worker_processes">>, 1},
+ {<<"retries_per_request">>, 1},
+ % Low connection timeout so _changes feed gets restarted quicker
+ {<<"connection_timeout">>, 3000}
+ ]},
+ {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
+ ok = couch_replicator_scheduler:add_job(Rep),
+ couch_replicator_scheduler:reschedule(),
+ {ok, Rep#rep.id}.