summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2019-11-01 13:46:44 -0400
committerNick Vatamaniuc <vatamane@apache.org>2019-11-01 15:29:56 -0400
commit89dc3852a1f1df4483dd1194dfa3561c16b0d4d3 (patch)
tree5dc6dcbb927ec5da290e7183d4f9235a39c37514
parent18db801b751d9aeb22cb99783eee252bc3cf82d1 (diff)
downloadcouchdb-handle-replication-process-restart-better.tar.gz
Do not mark replication jobs as failed if doc processor crashes (branch: handle-replication-process-restart-better)
Previously, if couch_replicator_doc_processor crashed, the job was marked as "failed". We now ignore that case. It's safe to do so since the supervisor will restart the process anyway, and it will rescan all the docs again. Most of all, we want to prevent the job from becoming permanently failed and needing manual intervention to restart it.
-rw-r--r--src/couch_replicator/src/couch_replicator_doc_processor.erl29
1 file changed, 25 insertions, 4 deletions
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 772037d8d..23cdeeaac 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -101,6 +101,9 @@ db_change(DbName, {ChangeProps} = Change, Server) ->
try
ok = process_change(DbName, Change)
catch
+ exit:{Error, {gen_server, call, [?MODULE, _, _]}} ->
+ ErrMsg = "~p exited ~p while processing change from db ~p",
+ couch_log:error(ErrMsg, [?MODULE, Error, DbName]);
_Tag:Error ->
{RepProps} = get_json_value(doc, ChangeProps),
DocId = get_json_value(<<"_id">>, RepProps),
@@ -611,6 +614,7 @@ cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) ->
-include_lib("eunit/include/eunit.hrl").
-define(DB, <<"db">>).
+-define(EXIT_DB, <<"exit_db">>).
-define(DOC1, <<"doc1">>).
-define(DOC2, <<"doc2">>).
-define(R1, {"1", ""}).
@@ -625,6 +629,7 @@ doc_processor_test_() ->
[
t_bad_change(),
t_regular_change(),
+ t_change_with_doc_processor_crash(),
t_change_with_existing_job(),
t_deleted_change(),
t_triggered_change(),
@@ -658,6 +663,15 @@ t_regular_change() ->
end).
+% Handle cases where doc processor exits or crashes while processing a change
+t_change_with_doc_processor_crash() ->
+ ?_test(begin
+ mock_existing_jobs_lookup([]),
+ ?assertEqual(acc, db_change(?EXIT_DB, change(), acc)),
+ ?assert(failed_state_not_updated())
+ end).
+
+
% Regular change, parse to a #rep{} and then add job but there is already
% a running job with same Id found.
t_change_with_existing_job() ->
@@ -834,16 +848,19 @@ setup() ->
meck:expect(couch_replicator_clustering, owner, 2, node()),
meck:expect(couch_replicator_clustering, link_cluster_event_listener, 3,
ok),
- meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 4, pid),
+ meck:expect(couch_replicator_doc_processor_worker, spawn_worker, fun
+ ({?EXIT_DB, _}, _, _, _) -> exit(kapow);
+ (_, _, _, _) -> pid
+ end),
meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
meck:expect(couch_replicator_docs, update_failed, 3, ok),
{ok, Pid} = start_link(),
+ unlink(Pid),
Pid.
teardown(Pid) ->
- unlink(Pid),
exit(Pid, kill),
meck:unload().
@@ -871,10 +888,14 @@ did_not_spawn_worker() ->
updated_doc_with_failed_state() ->
1 == meck:num_calls(couch_replicator_docs, update_failed, '_').
+failed_state_not_updated() ->
+ 0 == meck:num_calls(couch_replicator_docs, update_failed, '_').
mock_existing_jobs_lookup(ExistingJobs) ->
- meck:expect(couch_replicator_scheduler, find_jobs_by_doc,
- fun(?DB, ?DOC1) -> ExistingJobs end).
+ meck:expect(couch_replicator_scheduler, find_jobs_by_doc, fun
+ (?EXIT_DB, ?DOC1) -> [];
+ (?DB, ?DOC1) -> ExistingJobs
+ end).
test_rep(Id) ->