diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2019-11-01 13:46:44 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2019-11-01 23:07:05 -0400 |
commit | b9aa4e81815c38859edbb3b19e2a3639c30e9b1f (patch) | |
tree | 5dc6dcbb927ec5da290e7183d4f9235a39c37514 | |
parent | 18db801b751d9aeb22cb99783eee252bc3cf82d1 (diff) | |
download | couchdb-b9aa4e81815c38859edbb3b19e2a3639c30e9b1f.tar.gz |
Do not mark replication jobs as failed if doc processor crashes
Previously, if couch_replicator_doc_processor crashed, the job was marked as
"failed". We now ignore that case. It is safe to do so since the supervisor will
restart the process anyway, and it will rescan all the docs again. Most of all, we
want to prevent the job from becoming permanently failed and needing manual
intervention to restart it.
-rw-r--r-- | src/couch_replicator/src/couch_replicator_doc_processor.erl | 29 |
1 file changed, 25 insertions, 4 deletions
diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl index 772037d8d..23cdeeaac 100644 --- a/src/couch_replicator/src/couch_replicator_doc_processor.erl +++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl @@ -101,6 +101,9 @@ db_change(DbName, {ChangeProps} = Change, Server) -> try ok = process_change(DbName, Change) catch + exit:{Error, {gen_server, call, [?MODULE, _, _]}} -> + ErrMsg = "~p exited ~p while processing change from db ~p", + couch_log:error(ErrMsg, [?MODULE, Error, DbName]); _Tag:Error -> {RepProps} = get_json_value(doc, ChangeProps), DocId = get_json_value(<<"_id">>, RepProps), @@ -611,6 +614,7 @@ cluster_membership_foldl(#rdoc{id = {DbName, DocId} = Id, rid = RepId}, nil) -> -include_lib("eunit/include/eunit.hrl"). -define(DB, <<"db">>). +-define(EXIT_DB, <<"exit_db">>). -define(DOC1, <<"doc1">>). -define(DOC2, <<"doc2">>). -define(R1, {"1", ""}). @@ -625,6 +629,7 @@ doc_processor_test_() -> [ t_bad_change(), t_regular_change(), + t_change_with_doc_processor_crash(), t_change_with_existing_job(), t_deleted_change(), t_triggered_change(), @@ -658,6 +663,15 @@ t_regular_change() -> end). +% Handle cases where doc processor exits or crashes while processing a change +t_change_with_doc_processor_crash() -> + ?_test(begin + mock_existing_jobs_lookup([]), + ?assertEqual(acc, db_change(?EXIT_DB, change(), acc)), + ?assert(failed_state_not_updated()) + end). + + % Regular change, parse to a #rep{} and then add job but there is already % a running job with same Id found. 
t_change_with_existing_job() -> @@ -834,16 +848,19 @@ setup() -> meck:expect(couch_replicator_clustering, owner, 2, node()), meck:expect(couch_replicator_clustering, link_cluster_event_listener, 3, ok), - meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 4, pid), + meck:expect(couch_replicator_doc_processor_worker, spawn_worker, fun + ({?EXIT_DB, _}, _, _, _) -> exit(kapow); + (_, _, _, _) -> pid + end), meck:expect(couch_replicator_scheduler, remove_job, 1, ok), meck:expect(couch_replicator_docs, remove_state_fields, 2, ok), meck:expect(couch_replicator_docs, update_failed, 3, ok), {ok, Pid} = start_link(), + unlink(Pid), Pid. teardown(Pid) -> - unlink(Pid), exit(Pid, kill), meck:unload(). @@ -871,10 +888,14 @@ did_not_spawn_worker() -> updated_doc_with_failed_state() -> 1 == meck:num_calls(couch_replicator_docs, update_failed, '_'). +failed_state_not_updated() -> + 0 == meck:num_calls(couch_replicator_docs, update_failed, '_'). mock_existing_jobs_lookup(ExistingJobs) -> - meck:expect(couch_replicator_scheduler, find_jobs_by_doc, - fun(?DB, ?DOC1) -> ExistingJobs end). + meck:expect(couch_replicator_scheduler, find_jobs_by_doc, fun + (?EXIT_DB, ?DOC1) -> []; + (?DB, ?DOC1) -> ExistingJobs + end). test_rep(Id) -> |