author    Nick Vatamaniuc <vatamane@apache.org>               2019-11-01 13:46:44 -0400
committer Nick Vatamaniuc <nickva@users.noreply.github.com>   2019-11-01 23:07:05 -0400
commit    b9aa4e81815c38859edbb3b19e2a3639c30e9b1f (patch)
tree      5dc6dcbb927ec5da290e7183d4f9235a39c37514
parent    18db801b751d9aeb22cb99783eee252bc3cf82d1 (diff)
Do not mark replication jobs as failed if doc processor crashes
Previously, if couch_replicator_doc_processor crashed, the job was marked as "failed". We now ignore that case. It is safe to do so since the supervisor will restart the doc processor anyway, and it will rescan all the docs again. Above all, we want to prevent the job from becoming permanently failed and requiring manual intervention to restart it.
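
To illustrate the pattern the patch applies, here is a minimal, self-contained Erlang sketch (the module and helper names below are hypothetical, and OTP's logger stands in for couch_log): an exit raised by a gen_server call is caught and only logged, while any other error still records the document as failed.

-module(change_handler_sketch).
-export([db_change/3]).

%% Sketch only: process_change/2 and mark_doc_failed/3 stand in for the real
%% couch_replicator_doc_processor logic.
db_change(DbName, Change, Server) ->
    try
        ok = process_change(DbName, Change)
    catch
        %% The doc processor gen_server died mid-call. Its supervisor will
        %% restart it and rescan the replication docs, so just log and move on.
        exit:{Error, {gen_server, call, _Args}} ->
            logger:error("~p exited ~p while processing change from db ~p",
                [?MODULE, Error, DbName]);
        %% Any other failure is treated as a problem with the doc itself.
        _Tag:Error ->
            mark_doc_failed(DbName, Change, Error)
    end,
    Server.

process_change(_DbName, _Change) ->
    ok.

mark_doc_failed(_DbName, _Change, _Error) ->
    ok.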
-rw-r--r--  src/couch_replicator/src/couch_replicator_doc_processor.erl  29
1 file changed, 25 insertions(+), 4 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 772037d8d..23cdeeaac 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -101,6 +101,9 @@ db_change(DbName, {ChangeProps} = Change, Server) ->
     try
         ok = process_change(DbName, Change)
     catch
+    exit:{Error, {gen_server, call, [?MODULE, _, _]}} ->
+        ErrMsg = "~p exited ~p while processing change from db ~p",
+        couch_log:error(ErrMsg, [?MODULE, Error, DbName]);
     _Tag:Error ->
         {RepProps} = get_json_value(doc, ChangeProps),
         DocId = get_json_value(<<"_id">>, RepProps),
@@ -611,6 +614,7 @@ cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
 -include_lib("eunit/include/eunit.hrl").

 -define(DB, <<"db">>).
+-define(EXIT_DB, <<"exit_db">>).
 -define(DOC1, <<"doc1">>).
 -define(DOC2, <<"doc2">>).
 -define(R1, {"1", ""}).
@@ -625,6 +629,7 @@ doc_processor_test_() ->
         [
             t_bad_change(),
             t_regular_change(),
+            t_change_with_doc_processor_crash(),
             t_change_with_existing_job(),
             t_deleted_change(),
             t_triggered_change(),
@@ -658,6 +663,15 @@ t_regular_change() ->
     end).


+% Handle cases where doc processor exits or crashes while processing a change
+t_change_with_doc_processor_crash() ->
+    ?_test(begin
+        mock_existing_jobs_lookup([]),
+        ?assertEqual(acc, db_change(?EXIT_DB, change(), acc)),
+        ?assert(failed_state_not_updated())
+    end).
+
+
 % Regular change, parse to a #rep{} and then add job but there is already
 % a running job with same Id found.
 t_change_with_existing_job() ->
@@ -834,16 +848,19 @@ setup() ->
     meck:expect(couch_replicator_clustering, owner, 2, node()),
     meck:expect(couch_replicator_clustering, link_cluster_event_listener, 3,
         ok),
-    meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 4, pid),
+    meck:expect(couch_replicator_doc_processor_worker, spawn_worker, fun
+        ({?EXIT_DB, _}, _, _, _) -> exit(kapow);
+        (_, _, _, _) -> pid
+    end),
     meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
     meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
     meck:expect(couch_replicator_docs, update_failed, 3, ok),
     {ok, Pid} = start_link(),
+    unlink(Pid),
     Pid.


 teardown(Pid) ->
-    unlink(Pid),
     exit(Pid, kill),
     meck:unload().
@@ -871,10 +888,14 @@ did_not_spawn_worker() ->
 updated_doc_with_failed_state() ->
     1 == meck:num_calls(couch_replicator_docs, update_failed, '_').

+failed_state_not_updated() ->
+    0 == meck:num_calls(couch_replicator_docs, update_failed, '_').

 mock_existing_jobs_lookup(ExistingJobs) ->
-    meck:expect(couch_replicator_scheduler, find_jobs_by_doc,
-        fun(?DB, ?DOC1) -> ExistingJobs end).
+    meck:expect(couch_replicator_scheduler, find_jobs_by_doc, fun
+        (?EXIT_DB, ?DOC1) -> [];
+        (?DB, ?DOC1) -> ExistingJobs
+    end).


 test_rep(Id) ->