diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-03-23 16:19:16 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-03-25 15:10:47 -0400 |
commit | e06c5f360ad3b217591267eb2b05e305899b8887 (patch) | |
tree | 2c45b0d9b45c5a96d04655b35ad7bf8f1331fa03 | |
parent | 0f27bf5949b4489f4e47516c1018ee8fcac1f305 (diff) | |
download | couchdb-e06c5f360ad3b217591267eb2b05e305899b8887.tar.gz |
Handle db re-creation in view indexing
Add the db instance id to the indexing job data. During indexing, ensure the
database is opened with the `{uuid, DbUUID}` option. After that, any stale db
reads in `update/3` will throw the `database_does_not_exist` error.
In addition, when the indexing job is re-submitted in `build_view_async/2`,
check if it contains a reference to an old db instance id and, if so, replace
the job. This is necessary because couch_jobs doesn't overwrite job data for
running jobs.
-rw-r--r-- | src/couch_views/src/couch_views_indexer.erl | 52 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_jobs.erl | 15 | ||||
-rw-r--r-- | src/couch_views/test/couch_views_indexer_test.erl | 111 |
3 files changed, 152 insertions, 26 deletions
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index 04dbcf815..b41d0679b 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -46,45 +46,35 @@ init() -> Data = upgrade_data(Data0), #{ <<"db_name">> := DbName, + <<"db_uuid">> := DbUUID, <<"ddoc_id">> := DDocId, <<"sig">> := JobSig, <<"retries">> := Retries } = Data, {ok, Db} = try - fabric2_db:open(DbName, [?ADMIN_CTX]) + fabric2_db:open(DbName, [?ADMIN_CTX, {uuid, DbUUID}]) catch error:database_does_not_exist -> - couch_jobs:finish(undefined, Job, Data#{ - error => db_deleted, - reason => "Database was deleted" - }), - exit(normal) + fail_job(Job, Data, db_deleted, "Database was deleted") end, {ok, DDoc} = case fabric2_db:open_doc(Db, DDocId) of {ok, DDoc0} -> {ok, DDoc0}; {not_found, _} -> - couch_jobs:finish(undefined, Job, Data#{ - error => ddoc_deleted, - reason => "Design document was deleted" - }), - exit(normal) + fail_job(Job, Data, ddoc_deleted, "Design document was deleted") end, {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), HexSig = fabric2_util:to_hex(Mrst#mrst.sig), - if HexSig == JobSig -> ok; true -> - couch_jobs:finish(undefined, Job, Data#{ - error => sig_changed, - reason => <<"Design document was modified">> - }), - exit(normal) + if HexSig == JobSig -> ok; true -> + fail_job(Job, Data, sig_changed, "Design document was modified") end, State = #{ tx_db => undefined, + db_uuid => DbUUID, db_seq => undefined, view_seq => undefined, last_seq => undefined, @@ -101,6 +91,8 @@ init() -> catch exit:normal -> ok; + error:database_does_not_exist -> + fail_job(Job, Data, db_deleted, "Database was deleted"); Error:Reason -> NewRetry = Retries + 1, RetryLimit = retry_limit(), @@ -115,18 +107,22 @@ init() -> StateErr = State#{job_data := DataErr, last_seq := <<"0">>}, report_progress(StateErr, update); false -> - NewData = add_error(Error, Reason, Data), - couch_jobs:finish(undefined, 
Job, NewData), - exit(normal) + fail_job(Job, Data, Error, Reason) end end. upgrade_data(Data) -> - case maps:is_key(<<"retries">>, Data) of - true -> Data; - false -> Data#{<<"retries">> =>0} - end. + Defaults = [ + {<<"retries">>, 0}, + {<<"db_uuid">>, undefined} + ], + lists:foldl(fun({Key, Default}, Acc) -> + case maps:is_key(Key, Acc) of + true -> Acc; + false -> maps:put(Key, Default, Acc) + end + end, Data, Defaults). % Transaction limit exceeded don't retry @@ -433,6 +429,7 @@ report_progress(State, UpdateType) -> #{ <<"db_name">> := DbName, + <<"db_uuid">> := DbUUID, <<"ddoc_id">> := DDocId, <<"sig">> := Sig, <<"retries">> := Retries @@ -442,6 +439,7 @@ report_progress(State, UpdateType) -> % possible existing error state. NewData = #{ <<"db_name">> => DbName, + <<"db_uuid">> => DbUUID, <<"ddoc_id">> => DDocId, <<"sig">> => Sig, <<"view_seq">> => LastSeq, @@ -468,6 +466,12 @@ report_progress(State, UpdateType) -> end. +fail_job(Job, Data, Error, Reason) -> + NewData = add_error(Error, Reason, Data), + couch_jobs:finish(undefined, Job, NewData), + exit(normal). + + num_changes() -> config:get_integer("couch_views", "change_limit", 100). 
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl index 937146ce8..1604841f1 100644 --- a/src/couch_views/src/couch_views_jobs.erl +++ b/src/couch_views/src/couch_views_jobs.erl @@ -43,7 +43,19 @@ build_view(TxDb, Mrst, UpdateSeq) -> build_view_async(TxDb, Mrst) -> JobId = job_id(TxDb, Mrst), JobData = job_data(TxDb, Mrst), - ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData), + DbUUID = fabric2_db:get_uuid(TxDb), + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> + case couch_jobs:get_job_data(JTx, ?INDEX_JOB_TYPE, JobId) of + {error, not_found} -> + ok; + {ok, #{} = OldJobData} -> + case maps:get(<<"db_uuid">>, OldJobData, undefined) of + DbUUID -> ok; + _ -> couch_jobs:remove(JTx, ?INDEX_JOB_TYPE, JobId) + end + end, + ok = couch_jobs:add(JTx, ?INDEX_JOB_TYPE, JobId, JobData) + end), {ok, JobId}. @@ -95,6 +107,7 @@ job_data(Db, Mrst) -> #{ db_name => fabric2_db:name(Db), + db_uuid => fabric2_db:get_uuid(Db), ddoc_id => DDocId, sig => fabric2_util:to_hex(Sig), retries => 0 diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl index 5475cf68e..8f8f3c5cb 100644 --- a/src/couch_views/test/couch_views_indexer_test.erl +++ b/src/couch_views/test/couch_views_indexer_test.erl @@ -16,6 +16,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). +-include_lib("couch_views/include/couch_views.hrl"). -include_lib("fabric/test/fabric2_test.hrl"). 
@@ -47,7 +48,9 @@ indexer_test_() -> ?TDEF_FE(fewer_multipe_identical_keys_from_same_doc), ?TDEF_FE(handle_size_key_limits), ?TDEF_FE(handle_size_value_limits), - ?TDEF_FE(index_autoupdater_callback) + ?TDEF_FE(index_autoupdater_callback), + ?TDEF_FE(handle_db_recreated_when_running), + ?TDEF_FE(handle_db_recreated_after_finished) ] } } @@ -75,6 +78,7 @@ foreach_setup() -> foreach_teardown(Db) -> meck:unload(), + config:delete("couch_views", "change_limit"), ok = fabric2_db:delete(fabric2_db:name(Db), []). @@ -372,6 +376,87 @@ index_autoupdater_callback(Db) -> ?assertEqual(ok, couch_views_jobs:wait_for_job(JobId, DbSeq)). +handle_db_recreated_when_running(Db) -> + DbName = fabric2_db:name(Db), + + DDoc = create_ddoc(), + {ok, _} = fabric2_db:update_doc(Db, DDoc, []), + {ok, _} = fabric2_db:update_doc(Db, doc(0), []), + {ok, _} = fabric2_db:update_doc(Db, doc(1), []), + + % To intercept job building while it is running ensure updates happen one + % row at a time. + config:set("couch_views", "change_limit", "1", false), + + meck_intercept_job_update(self()), + + [{ok, JobId}] = couch_views:build_indices(Db, [DDoc]), + + {Indexer, _Job, _Data} = wait_indexer_update(10000), + + {ok, State} = couch_jobs:get_job_state(undefined, ?INDEX_JOB_TYPE, JobId), + ?assertEqual(running, State), + + {ok, SubId, running, _} = couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId), + + ok = fabric2_db:delete(DbName, []), + {ok, Db1} = fabric2_db:create(DbName, [?ADMIN_CTX]), + + Indexer ! continue, + + ?assertMatch({ + ?INDEX_JOB_TYPE, + JobId, + finished, + #{<<"error">> := <<"db_deleted">>} + }, couch_jobs:wait(SubId, infinity)), + + {ok, _} = fabric2_db:update_doc(Db1, DDoc, []), + {ok, _} = fabric2_db:update_doc(Db1, doc(2), []), + {ok, _} = fabric2_db:update_doc(Db1, doc(3), []), + + reset_intercept_job_update(Indexer), + + {ok, Out2} = run_query(Db1, DDoc, ?MAP_FUN1), + ?assertEqual([ + row(<<"2">>, 2, 2), + row(<<"3">>, 3, 3) + ], Out2). 
+ + +handle_db_recreated_after_finished(Db) -> + DbName = fabric2_db:name(Db), + + DDoc = create_ddoc(), + {ok, _} = fabric2_db:update_doc(Db, DDoc, []), + {ok, _} = fabric2_db:update_doc(Db, doc(0), []), + {ok, _} = fabric2_db:update_doc(Db, doc(1), []), + + {ok, Out1} = run_query(Db, DDoc, ?MAP_FUN1), + ?assertEqual([ + row(<<"0">>, 0, 0), + row(<<"1">>, 1, 1) + ], Out1), + + ok = fabric2_db:delete(DbName, []), + + ?assertError(database_does_not_exist, run_query(Db, DDoc, ?MAP_FUN1)), + + {ok, Db1} = fabric2_db:create(DbName, [?ADMIN_CTX]), + + {ok, _} = fabric2_db:update_doc(Db1, DDoc, []), + {ok, _} = fabric2_db:update_doc(Db1, doc(2), []), + {ok, _} = fabric2_db:update_doc(Db1, doc(3), []), + + ?assertError(database_does_not_exist, run_query(Db, DDoc, ?MAP_FUN1)), + + {ok, Out2} = run_query(Db1, DDoc, ?MAP_FUN1), + ?assertEqual([ + row(<<"2">>, 2, 2), + row(<<"3">>, 3, 3) + ], Out2). + + row(Id, Key, Value) -> {row, [ {id, Id}, @@ -480,3 +565,27 @@ doc(Id, Val) -> run_query(#{} = Db, DDoc, <<_/binary>> = View) -> couch_views:query(Db, DDoc, View, fun fold_fun/2, [], #mrargs{}). + + +meck_intercept_job_update(ParentPid) -> + meck:new(couch_jobs, [passthrough]), + meck:expect(couch_jobs, update, fun(Db, Job, Data) -> + ParentPid ! {self(), Job, Data}, + receive continue -> ok end, + meck:passthrough([Db, Job, Data]) + end). + + +reset_intercept_job_update(IndexerPid) -> + meck:expect(couch_jobs, update, fun(Db, Job, Data) -> + meck:passthrough([Db, Job, Data]) + end), + IndexerPid ! continue. + + +wait_indexer_update(Timeout) -> + receive + {Pid, Job, Data} when is_pid(Pid) -> {Pid, Job, Data} + after Timeout -> + error(timeout_in_wait_indexer_update) + end. |