author     Paul J. Davis <paul.joseph.davis@gmail.com>  2020-04-10 13:52:53 -0500
committer  Paul J. Davis <paul.joseph.davis@gmail.com>  2020-04-10 13:52:53 -0500
commit     184096864aa78b85a0a6c7a061a9629e7c8abc15 (patch)
tree       e2f20864bb5b1fefeafd6420c4febd722ba415e3
parent     10796df983d2b9ea15cd7e8b38c67cbf55480995 (diff)
download   couchdb-prototype/fdb-layer-view-cleanup.tar.gz

-rw-r--r--  src/couch_jobs/src/couch_jobs.erl                   1
-rw-r--r--  src/couch_views/src/couch_views_indexer.erl        17
-rw-r--r--  src/couch_views/src/couch_views_jobs.erl            3
-rw-r--r--  src/couch_views/test/couch_views_cleanup_test.erl 100
4 files changed, 89 insertions(+), 32 deletions(-)
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl
index d9ea0fbfa..dd1b860d8 100644
--- a/src/couch_jobs/src/couch_jobs.erl
+++ b/src/couch_jobs/src/couch_jobs.erl
@@ -73,6 +73,7 @@ add(Tx, Type, JobId, JobData, ScheduledTime) when is_binary(JobId),
-spec remove(jtx(), job_type(), job_id()) -> ok | {error, any()}.
remove(Tx, Type, JobId) when is_binary(JobId) ->
+ io:format(standard_error, "~p ~p ~p~n", [Tx, Type, JobId]),
couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
couch_jobs_fdb:remove(JTx, job(Type, JobId))
end).
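
The io:format/3 call added above writes the trace straight to standard_error. If a trace like this were to outlive the debugging session, it could go through CouchDB's logger instead; a minimal sketch, assuming the couch_log:debug/2 API used elsewhere in this tree:

    remove(Tx, Type, JobId) when is_binary(JobId) ->
        %% Trace every job removal with its transaction, type and id.
        couch_log:debug("couch_jobs:remove tx=~p type=~p id=~p", [Tx, Type, JobId]),
        couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) ->
            couch_jobs_fdb:remove(JTx, job(Type, JobId))
        end).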
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index ab5aaade2..b3cd5e339 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -54,6 +54,16 @@ init() ->
<<"retries">> := Retries
} = Data,
+ io:format(standard_error, "BUILDING: ~p ~s ~s~n", [self(), DbName, JobSig]),
+
+ Self = self(),
+ spawn(fun() ->
+ Ref = erlang:monitor(process, Self),
+ receive {'DOWN', Ref, _, _, Thing} ->
+ io:format(standard_error, "WHUT? ~p~n", [Thing])
+ end
+ end),
+
{ok, Db} = try
fabric2_db:open(DbName, [?ADMIN_CTX, {uuid, DbUUID}])
catch error:database_does_not_exist ->
@@ -92,13 +102,18 @@ init() ->
},
try
- update(Db, Mrst, State)
+ io:format(standard_error, "START: ~p ~s ~s~n", [self(), DbName, JobSig]),
+ update(Db, Mrst, State),
+ io:format(standard_error, "DONE: ~p ~s ~s~n", [self(), DbName, JobSig])
catch
- exit:normal ->
+ exit:normal:Stack ->
+ io:format(standard_error, "STACK: ~p~n", [Stack]),
ok;
error:database_does_not_exist ->
fail_job(Job, Data, db_deleted, "Database was deleted");
Error:Reason ->
+ io:format(standard_error, "Whut? ~p ~p~n", [Error, Reason]),
couch_rate:failure(Limiter),
NewRetry = Retries + 1,
RetryLimit = retry_limit(),
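
The first hunk spawns a watcher that monitors the indexer process and prints whatever 'DOWN' reason it exits with. Extracted as a standalone helper (watch_exit/1 is an illustrative name, not part of this commit), the idiom is:

    %% Spawn a process that monitors Pid and reports its exit reason.
    %% Monitors never miss an exit: if Pid is already dead, the 'DOWN'
    %% message arrives immediately with reason noproc.
    watch_exit(Pid) ->
        spawn(fun() ->
            Ref = erlang:monitor(process, Pid),
            receive
                {'DOWN', Ref, process, Pid, Reason} ->
                    io:format(standard_error, "~p exited: ~p~n", [Pid, Reason])
            end
        end).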
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
index 7d1ac3c33..7ebed6930 100644
--- a/src/couch_views/src/couch_views_jobs.erl
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -36,7 +36,7 @@ set_timeout() ->
build_view(TxDb, Mrst, UpdateSeq) ->
{ok, JobId} = build_view_async(TxDb, Mrst),
case wait_for_job(JobId, Mrst#mrst.idx_name, UpdateSeq) of
- ok -> ok;
+ ok -> io:format(standard_error, "Done waiting~n", []), ok;
retry -> build_view(TxDb, Mrst, UpdateSeq)
end.
@@ -78,6 +78,7 @@ ensure_correct_tx(#{tx := Tx} = TxDb) ->
wait_for_job(JobId, DDocId, UpdateSeq) ->
+ io:format(standard_error, "WAITING FOR JOB~n", []),
case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of
{ok, Subscription, _State, _Data} ->
wait_for_job(JobId, Subscription, DDocId, UpdateSeq);
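
wait_for_job/4 drains the subscription until the job reaches a terminal state. A sketch of the underlying subscribe/wait flow, assuming couch_jobs:subscribe/2 and couch_jobs:wait/3 behave as they do on this branch (wait_for_finished/3 is a hypothetical helper, not part of this commit):

    wait_for_finished(Type, JobId, Timeout) ->
        case couch_jobs:subscribe(Type, JobId) of
            {ok, finished, Data} ->
                %% Job completed before we subscribed.
                {ok, Data};
            {ok, Sub, _State, _Data} ->
                case couch_jobs:wait(Sub, finished, Timeout) of
                    {Type, JobId, finished, Data} -> {ok, Data};
                    timeout -> {error, timeout}
                end;
            {error, not_found} ->
                {error, not_found}
        end.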
diff --git a/src/couch_views/test/couch_views_cleanup_test.erl b/src/couch_views/test/couch_views_cleanup_test.erl
index cdd416557..f65e9987a 100644
--- a/src/couch_views/test/couch_views_cleanup_test.erl
+++ b/src/couch_views/test/couch_views_cleanup_test.erl
@@ -34,17 +34,19 @@ clean_old_indices_test_() ->
fun setup/0,
fun cleanup/1,
[
- ?TDEF_FE(empty_db),
- ?TDEF_FE(db_with_no_ddocs),
- ?TDEF_FE(db_with_ddoc),
- ?TDEF_FE(db_with_many_ddocs),
- ?TDEF_FE(after_ddoc_deletion),
- ?TDEF_FE(all_ddocs_deleted),
- ?TDEF_FE(after_ddoc_recreated),
- ?TDEF_FE(refcounted_sigs),
- ?TDEF_FE(removes_old_jobs),
- ?TDEF_FE(after_job_accepted),
- ?TDEF_FE(during_index_build)
+ %% ?TDEF_FE(empty_db),
+ %% ?TDEF_FE(db_with_no_ddocs),
+ %% ?TDEF_FE(db_with_ddoc),
+ %% ?TDEF_FE(db_with_many_ddocs),
+ %% ?TDEF_FE(after_ddoc_deletion),
+ %% ?TDEF_FE(all_ddocs_deleted),
+ %% ?TDEF_FE(after_ddoc_recreated),
+ %% ?TDEF_FE(refcounted_sigs),
+ %% ?TDEF_FE(removes_old_jobs),
+ %% ?TDEF_FE(after_job_accepted_initial_build),
+ %% ?TDEF_FE(after_job_accepted_rebuild),
+ %% ?TDEF_FE(during_index_initial_build),
+ ?TDEF_FE(during_index_rebuild)
]
}
}
@@ -107,18 +109,13 @@ after_ddoc_deletion(Db) ->
lists:foreach(fun(DDoc) ->
?assertEqual(10, length(run_query(Db, DDoc)))
end, DDocs),
- ?debugFmt("Here?", []),
[ToDel | RestDDocs] = DDocs,
delete_doc(Db, ToDel),
% Not yet cleaned up
- ?debugFmt("Here?", []),
?assertEqual(true, view_has_data(Db, ToDel)),
?assertEqual(ok, fabric2_index:cleanup(Db)),
- ?debugFmt("Here?", []),
?assertError({ddoc_deleted, _}, run_query(Db, ToDel)),
- ?debugFmt("Here?", []),
lists:foreach(fun(DDoc) ->
- ?debugFmt("Here: ~p", [DDoc]),
?assertEqual(10, length(run_query(Db, DDoc)))
end, RestDDocs).
@@ -205,36 +202,75 @@ removes_old_jobs(Db) ->
?assertEqual(false, job_exists(Db, DDoc)).
-after_job_accepted(Db) ->
- delete_and_cleanup_during_job(Db, fun meck_intercept_job_accept/1).
+after_job_accepted_initial_build(Db) ->
+ cleanup_during_initial_build(Db, fun meck_intercept_job_accept/1).
-during_index_build(Db) ->
- delete_and_cleanup_during_job(Db, fun meck_intercept_job_update/1).
+after_job_accepted_rebuild(Db) ->
+ cleanup_during_rebuild(Db, fun meck_intercept_job_accept/1).
-delete_and_cleanup_during_job(Db, InterruptFun) ->
+during_index_initial_build(Db) ->
+ cleanup_during_initial_build(Db, fun meck_intercept_job_update/1).
+
+
+during_index_rebuild(Db) ->
+ cleanup_during_rebuild(Db, fun meck_intercept_job_update/1).
+
+
+cleanup_during_initial_build(Db, InterruptFun) ->
InterruptFun(self()),
create_docs(Db, 10),
DDoc = create_ddoc(Db, <<"foo">>),
- {_, Ref} = spawn_monitor(fun() -> run_query(Db, DDoc) end),
+ {_, Ref1} = spawn_monitor(fun() -> run_query(Db, DDoc) end),
+
+ receive {JobPid, triggered} -> ok end,
+ delete_doc(Db, DDoc),
+ ok = fabric2_index:cleanup(Db),
+ JobPid ! continue,
+
+ receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+ ok = fabric2_index:cleanup(Db),
+ ?assertError({ddoc_deleted, _}, run_query(Db, DDoc)),
+
+ ?assertEqual(false, view_has_data(Db, DDoc)),
+ ?assertEqual(false, job_exists(Db, DDoc)).
+
+
+cleanup_during_rebuild(Db, InterruptFun) ->
+ create_docs(Db, 10),
+ DDoc = create_ddoc(Db, <<"foo">>),
+ ?assertEqual(10, length(run_query(Db, DDoc))),
+
+ InterruptFun(self()),
+
+ create_docs(Db, 10, 10),
+ ?debugFmt("HERE", []),
+ {_, Ref1} = spawn_monitor(fun() -> run_query(Db, DDoc) end),
+
+ ?debugFmt("HERE", []),
receive {JobPid, triggered} -> ok end,
delete_doc(Db, DDoc),
ok = fabric2_index:cleanup(Db),
JobPid ! continue,
- receive
- {'DOWN', Ref, _, _, Reason} ->
- ?assertMatch({{ddoc_deleted, _}, _}, Reason)
- end,
+ ?debugFmt("HERE", []),
+ receive {'DOWN', Ref1, _, _, _} -> ok end,
+
+ ?debugFmt("Again", []),
+ ok = fabric2_index:cleanup(Db),
+ ?assertError({ddoc_deleted, _}, run_query(Db, DDoc)),
+ ?debugFmt("Whut", []),
?assertEqual(false, view_has_data(Db, DDoc)),
?assertEqual(false, job_exists(Db, DDoc)).
+
run_query(Db, DDocId) when is_binary(DDocId) ->
{ok, DDoc} = fabric2_db:open_doc(Db, <<"_design/", DDocId/binary>>),
run_query(Db, DDoc);
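
Both cleanup_during_initial_build/2 and cleanup_during_rebuild/2 drive the same rendezvous: the meck hook parks the indexer job after signalling {self(), triggered}, the test deletes the ddoc and runs cleanup while the build is suspended, then releases the job with continue. The skeleton of that handshake, with illustrative names:

    %% Park the worker at the mocked call, run Action, then release it.
    with_parked_job(WorkerFun, ActionFun) ->
        {_, Ref} = spawn_monitor(WorkerFun),
        receive {JobPid, triggered} -> ok end,  % job is paused inside the mock
        ActionFun(),                            % e.g. delete ddoc + run cleanup
        JobPid ! continue,                      % let the job finish
        receive {'DOWN', Ref, _, _, Reason} -> Reason end.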
@@ -292,10 +328,10 @@ meck_intercept_job_accept(ParentPid) ->
meck_intercept_job_update(ParentPid) ->
meck:new(couch_jobs, [passthrough]),
- meck:expect(couch_jobs, update, fun(Db, Job, Data) ->
- Result = meck:passthrough([Db, Job, Data]),
+ meck:expect(couch_jobs, finish, fun(Tx, Job, Data) ->
ParentPid ! {self(), triggered},
receive continue -> ok end,
+ Result = meck:passthrough([Tx, Job, Data]),
meck:unload(),
Result
end).
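
Note the reordering as well as the renamed target: the old update/3 hook ran meck:passthrough/1 first and signalled afterwards, while the finish/3 hook signals and blocks before the passthrough, so the test's cleanup now runs before the job's final transaction commits. A hypothetical generalization of the two meck_intercept_* helpers, parameterized by which arity-3 couch_jobs function to wrap:

    %% Park the job inside FunName until the test process sends continue.
    intercept(FunName, ParentPid) ->
        meck:new(couch_jobs, [passthrough]),
        meck:expect(couch_jobs, FunName, fun(A, B, C) ->
            ParentPid ! {self(), triggered},
            receive continue -> ok end,
            Result = meck:passthrough([A, B, C]),
            meck:unload(),
            Result
        end).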
@@ -340,11 +376,15 @@ create_doc(Db, Id, Body) ->
Doc#doc{revs = {Pos, [Rev]}}.
-create_docs(Db, Count) when is_integer(Count), Count > 1 ->
+create_docs(Db, Count) ->
+ create_docs(Db, Count, 0).
+
+
+create_docs(Db, Count, Offset) ->
lists:map(fun(Seq) ->
Id = io_lib:format("~6..0b", [Seq]),
create_doc(Db, iolist_to_binary(Id))
- end, lists:seq(1, Count)).
+ end, lists:seq(Offset + 1, Offset + Count)).
delete_doc(Db, DDoc) ->
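
With the new Offset argument, a second batch of documents gets ids that do not collide with the first, which is what lets cleanup_during_rebuild/2 force a second index build over fresh docs. For example:

    create_docs(Db, 10),      %% creates docs 000001 .. 000010
    create_docs(Db, 10, 10),  %% creates docs 000011 .. 000020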