summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Nick Vatamaniuc <vatamane@apache.org> 2020-03-23 16:19:16 -0400
committer: Nick Vatamaniuc <vatamane@apache.org> 2020-03-24 14:13:11 -0400
commit: f1f1c3717bda86854b7e6ff01dc8b2b7ac905172 (patch)
tree: df718d923be2d83cfde38dd9de17b1870b8c2766
parent: ed83bf95de6c3eb4bf82eec5243112244f1d8277 (diff)
download: couchdb-check-db-instance-in-indices.tar.gz
Handle db re-creation in view indexing (branch: check-db-instance-in-indices)
Add the db instance id to indexing job data. During indexing, ensure the database is opened with the `{uuid, DbUUID}` option. After that, any stale db reads in `update/3` will throw the `database_does_not_exist` error. In addition, when the indexing job is re-submitted in `build_view_async/2`, check if it contains a reference to an old db instance id and replace the job. That has to happen since couch_jobs doesn't overwrite job data for running jobs.
-rw-r--r-- src/couch_views/src/couch_views_indexer.erl      | 53
-rw-r--r-- src/couch_views/src/couch_views_jobs.erl         | 15
-rw-r--r-- src/couch_views/test/couch_views_indexer_test.erl | 100
3 files changed, 141 insertions(+), 27 deletions(-)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 04dbcf815..8ca312ee6 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -46,45 +46,34 @@ init() ->
Data = upgrade_data(Data0),
#{
<<"db_name">> := DbName,
+ <<"db_uuid">> := DbUUID,
<<"ddoc_id">> := DDocId,
<<"sig">> := JobSig,
<<"retries">> := Retries
} = Data,
-
{ok, Db} = try
- fabric2_db:open(DbName, [?ADMIN_CTX])
+ fabric2_db:open(DbName, [?ADMIN_CTX, {uuid, DbUUID}])
catch error:database_does_not_exist ->
- couch_jobs:finish(undefined, Job, Data#{
- error => db_deleted,
- reason => "Database was deleted"
- }),
- exit(normal)
+ fail_job(Job, Data, db_deleted, "Database was deleted")
end,
{ok, DDoc} = case fabric2_db:open_doc(Db, DDocId) of
{ok, DDoc0} ->
{ok, DDoc0};
{not_found, _} ->
- couch_jobs:finish(undefined, Job, Data#{
- error => ddoc_deleted,
- reason => "Design document was deleted"
- }),
- exit(normal)
+ fail_job(Job, Data, ddoc_deleted, "Design document was deleted")
end,
{ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
- if HexSig == JobSig -> ok; true ->
- couch_jobs:finish(undefined, Job, Data#{
- error => sig_changed,
- reason => <<"Design document was modified">>
- }),
- exit(normal)
+ if HexSig == JobSig -> ok; true ->
+ fail_job(Job, Data, sig_changed, "Design document was modified")
end,
State = #{
tx_db => undefined,
+ db_uuid => DbUUID,
db_seq => undefined,
view_seq => undefined,
last_seq => undefined,
@@ -101,6 +90,8 @@ init() ->
catch
exit:normal ->
ok;
+ error:database_does_not_exist ->
+ fail_job(Job, Data, db_deleted, "Database was deleted");
Error:Reason ->
NewRetry = Retries + 1,
RetryLimit = retry_limit(),
@@ -115,18 +106,22 @@ init() ->
StateErr = State#{job_data := DataErr, last_seq := <<"0">>},
report_progress(StateErr, update);
false ->
- NewData = add_error(Error, Reason, Data),
- couch_jobs:finish(undefined, Job, NewData),
- exit(normal)
+ fail_job(Job, Data, Error, Reason)
end
end.
upgrade_data(Data) ->
- case maps:is_key(<<"retries">>, Data) of
- true -> Data;
- false -> Data#{<<"retries">> =>0}
- end.
+ Defaults = [
+ {<<"retries">>, 0},
+ {<<"db_uuid">>, undefined}
+ ],
+ lists:foldl(fun({Key, Default}, Acc) ->
+ case maps:is_key(Key, Acc) of
+ true -> Acc;
+ false -> maps:put(Key, Default, Acc)
+ end
+ end, Data, Defaults).
% Transaction limit exceeded don't retry
@@ -433,6 +428,7 @@ report_progress(State, UpdateType) ->
#{
<<"db_name">> := DbName,
+ <<"db_uuid">> := DbUUID,
<<"ddoc_id">> := DDocId,
<<"sig">> := Sig,
<<"retries">> := Retries
@@ -442,6 +438,7 @@ report_progress(State, UpdateType) ->
% possible existing error state.
NewData = #{
<<"db_name">> => DbName,
+ <<"db_uuid">> => DbUUID,
<<"ddoc_id">> => DDocId,
<<"sig">> => Sig,
<<"view_seq">> => LastSeq,
@@ -468,6 +465,12 @@ report_progress(State, UpdateType) ->
end.
+fail_job(Job, Data, Error, Reason) ->
+ NewData = add_error(Error, Reason, Data),
+ couch_jobs:finish(undefined, Job, NewData),
+ exit(normal).
+
+
num_changes() ->
config:get_integer("couch_views", "change_limit", 100).
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
index 937146ce8..1604841f1 100644
--- a/src/couch_views/src/couch_views_jobs.erl
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -43,7 +43,19 @@ build_view(TxDb, Mrst, UpdateSeq) ->
build_view_async(TxDb, Mrst) ->
JobId = job_id(TxDb, Mrst),
JobData = job_data(TxDb, Mrst),
- ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData),
+ DbUUID = fabric2_db:get_uuid(TxDb),
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+ case couch_jobs:get_job_data(JTx, ?INDEX_JOB_TYPE, JobId) of
+ {error, not_found} ->
+ ok;
+ {ok, #{} = OldJobData} ->
+ case maps:get(<<"db_uuid">>, OldJobData, undefined) of
+ DbUUID -> ok;
+ _ -> couch_jobs:remove(JTx, ?INDEX_JOB_TYPE, JobId)
+ end
+ end,
+ ok = couch_jobs:add(JTx, ?INDEX_JOB_TYPE, JobId, JobData)
+ end),
{ok, JobId}.
@@ -95,6 +107,7 @@ job_data(Db, Mrst) ->
#{
db_name => fabric2_db:name(Db),
+ db_uuid => fabric2_db:get_uuid(Db),
ddoc_id => DDocId,
sig => fabric2_util:to_hex(Sig),
retries => 0
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index 5475cf68e..3becb9545 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -16,6 +16,7 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("couch_views/include/couch_views.hrl").
-include_lib("fabric/test/fabric2_test.hrl").
@@ -47,7 +48,9 @@ indexer_test_() ->
?TDEF_FE(fewer_multipe_identical_keys_from_same_doc),
?TDEF_FE(handle_size_key_limits),
?TDEF_FE(handle_size_value_limits),
- ?TDEF_FE(index_autoupdater_callback)
+ ?TDEF_FE(index_autoupdater_callback),
+ ?TDEF_FE(handle_db_recreated_when_running),
+ ?TDEF_FE(handle_db_recreated_when_finished)
]
}
}
@@ -75,6 +78,7 @@ foreach_setup() ->
foreach_teardown(Db) ->
meck:unload(),
+ config:delete("couch_views", "change_limit"),
ok = fabric2_db:delete(fabric2_db:name(Db), []).
@@ -372,6 +376,76 @@ index_autoupdater_callback(Db) ->
?assertEqual(ok, couch_views_jobs:wait_for_job(JobId, DbSeq)).
+handle_db_recreated_when_running(Db) ->
+ DbName = fabric2_db:name(Db),
+
+ DDoc = create_ddoc(),
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db, doc(0), []),
+ {ok, _} = fabric2_db:update_doc(Db, doc(1), []),
+
+ % To intercept job building while it is running ensure updates happen one
+ % row at a time.
+ config:set("couch_views", "change_limit", "1", false),
+
+ meck_intercept_job_update(self()),
+
+ [{ok, JobId}] = couch_views:build_indices(Db, [DDoc]),
+
+ {Indexer, _Job, _Data} = wait_indexer_update(10000),
+
+ {ok, State} = couch_jobs:get_job_state(undefined, ?INDEX_JOB_TYPE, JobId),
+ ?assertEqual(running, State),
+
+ ok = fabric2_db:delete(DbName, []),
+ {ok, Db1} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+
+ {ok, _} = fabric2_db:update_doc(Db1, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db1, doc(2), []),
+ {ok, _} = fabric2_db:update_doc(Db1, doc(3), []),
+
+ reset_intercept_job_update(Indexer),
+
+ {ok, Out2} = run_query(Db1, DDoc, ?MAP_FUN1),
+ ?assertEqual([
+ row(<<"2">>, 2, 2),
+ row(<<"3">>, 3, 3)
+ ], Out2).
+
+
+handle_db_recreated_when_finished(Db) ->
+ DbName = fabric2_db:name(Db),
+
+ DDoc = create_ddoc(),
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db, doc(0), []),
+ {ok, _} = fabric2_db:update_doc(Db, doc(1), []),
+
+ {ok, Out1} = run_query(Db, DDoc, ?MAP_FUN1),
+ ?assertEqual([
+ row(<<"0">>, 0, 0),
+ row(<<"1">>, 1, 1)
+ ], Out1),
+
+ ok = fabric2_db:delete(DbName, []),
+
+ ?assertError(database_does_not_exist, run_query(Db, DDoc, ?MAP_FUN1)),
+
+ {ok, Db1} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+
+ {ok, _} = fabric2_db:update_doc(Db1, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db1, doc(2), []),
+ {ok, _} = fabric2_db:update_doc(Db1, doc(3), []),
+
+ ?assertError(database_does_not_exist, run_query(Db, DDoc, ?MAP_FUN1)),
+
+ {ok, Out2} = run_query(Db1, DDoc, ?MAP_FUN1),
+ ?assertEqual([
+ row(<<"2">>, 2, 2),
+ row(<<"3">>, 3, 3)
+ ], Out2).
+
+
row(Id, Key, Value) ->
{row, [
{id, Id},
@@ -480,3 +554,27 @@ doc(Id, Val) ->
run_query(#{} = Db, DDoc, <<_/binary>> = View) ->
couch_views:query(Db, DDoc, View, fun fold_fun/2, [], #mrargs{}).
+
+
+meck_intercept_job_update(ParentPid) ->
+ meck:new(couch_jobs, [passthrough]),
+ meck:expect(couch_jobs, update, fun(Db, Job, Data) ->
+ ParentPid ! {self(), Job, Data},
+ receive continue -> ok end,
+ meck:passthrough([Db, Job, Data])
+ end).
+
+
+reset_intercept_job_update(IndexerPid) ->
+ meck:expect(couch_jobs, update, fun(Db, Job, Data) ->
+ meck:passthrough([Db, Job, Data])
+ end),
+ IndexerPid ! continue.
+
+
+wait_indexer_update(Timeout) ->
+ receive
+ {Pid, Job, Data} when is_pid(Pid) -> {Pid, Job, Data}
+ after Timeout ->
+ error(timeout_in_wait_indexer_update)
+ end.