diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-04-10 13:52:53 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2020-04-10 13:52:53 -0500 |
commit | 184096864aa78b85a0a6c7a061a9629e7c8abc15 (patch) | |
tree | e2f20864bb5b1fefeafd6420c4febd722ba415e3 | |
parent | 10796df983d2b9ea15cd7e8b38c67cbf55480995 (diff) | |
download | couchdb-prototype/fdb-layer-view-cleanup.tar.gz |
WIP: moar stuffprototype/fdb-layer-view-cleanup
-rw-r--r-- | src/couch_jobs/src/couch_jobs.erl | 1 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_indexer.erl | 17 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_jobs.erl | 3 | ||||
-rw-r--r-- | src/couch_views/test/couch_views_cleanup_test.erl | 100 |
4 files changed, 89 insertions, 32 deletions
diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl index d9ea0fbfa..dd1b860d8 100644 --- a/src/couch_jobs/src/couch_jobs.erl +++ b/src/couch_jobs/src/couch_jobs.erl @@ -73,6 +73,7 @@ add(Tx, Type, JobId, JobData, ScheduledTime) when is_binary(JobId), -spec remove(jtx(), job_type(), job_id()) -> ok | {error, any()}. remove(Tx, Type, JobId) when is_binary(JobId) -> + io:format(standard_error, "~p ~p ~p~n", [Tx, Type, JobId]), couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> couch_jobs_fdb:remove(JTx, job(Type, JobId)) end). diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index ab5aaade2..b3cd5e339 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -54,6 +54,16 @@ init() -> <<"retries">> := Retries } = Data, + io:format(standard_error, "BUILDING: ~p ~s ~s~n", [self(), DbName, JobSig]), + + Self = self(), + spawn(fun() -> + Ref = erlang:monitor(process, Self), + receive {'DOWN', Ref, _, _, Thing} -> + io:format(standard_error, "WHUT? ~p~n", [Thing]) + end + end), + {ok, Db} = try fabric2_db:open(DbName, [?ADMIN_CTX, {uuid, DbUUID}]) catch error:database_does_not_exist -> @@ -92,13 +102,18 @@ init() -> }, try - update(Db, Mrst, State) + io:format(standard_error, "START: ~p ~s ~s~n", [self(), DbName, JobSig]), + update(Db, Mrst, State), + io:format(standard_error, "DONE: ~p ~s ~s~n", [self(), DbName, JobSig]) catch exit:normal -> + Stack = erlang:get_stacktrace(), + io:format(standard_error, "STACK: ~p~n", [Stack]), ok; error:database_does_not_exist -> fail_job(Job, Data, db_deleted, "Database was deleted"); Error:Reason -> + io:format(standard_error, "Whut? ~p ~p~n", [Error, Reason]), couch_rate:failure(Limiter), NewRetry = Retries + 1, RetryLimit = retry_limit(), diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl index 7d1ac3c33..7ebed6930 100644 --- a/src/couch_views/src/couch_views_jobs.erl +++ b/src/couch_views/src/couch_views_jobs.erl @@ -36,7 +36,7 @@ set_timeout() -> build_view(TxDb, Mrst, UpdateSeq) -> {ok, JobId} = build_view_async(TxDb, Mrst), case wait_for_job(JobId, Mrst#mrst.idx_name, UpdateSeq) of - ok -> ok; + ok -> io:format(standard_error, "Done waiting~n", []), ok; retry -> build_view(TxDb, Mrst, UpdateSeq) end. @@ -78,6 +78,7 @@ ensure_correct_tx(#{tx := Tx} = TxDb) -> wait_for_job(JobId, DDocId, UpdateSeq) -> + io:format(standard_error, "WAITING FOR JOB~n", []), case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of {ok, Subscription, _State, _Data} -> wait_for_job(JobId, Subscription, DDocId, UpdateSeq); diff --git a/src/couch_views/test/couch_views_cleanup_test.erl b/src/couch_views/test/couch_views_cleanup_test.erl index cdd416557..f65e9987a 100644 --- a/src/couch_views/test/couch_views_cleanup_test.erl +++ b/src/couch_views/test/couch_views_cleanup_test.erl @@ -34,17 +34,19 @@ clean_old_indices_test_() -> fun setup/0, fun cleanup/1, [ - ?TDEF_FE(empty_db), - ?TDEF_FE(db_with_no_ddocs), - ?TDEF_FE(db_with_ddoc), - ?TDEF_FE(db_with_many_ddocs), - ?TDEF_FE(after_ddoc_deletion), - ?TDEF_FE(all_ddocs_deleted), - ?TDEF_FE(after_ddoc_recreated), - ?TDEF_FE(refcounted_sigs), - ?TDEF_FE(removes_old_jobs), - ?TDEF_FE(after_job_accepted), - ?TDEF_FE(during_index_build) + %% ?TDEF_FE(empty_db), + %% ?TDEF_FE(db_with_no_ddocs), + %% ?TDEF_FE(db_with_ddoc), + %% ?TDEF_FE(db_with_many_ddocs), + %% ?TDEF_FE(after_ddoc_deletion), + %% ?TDEF_FE(all_ddocs_deleted), + %% ?TDEF_FE(after_ddoc_recreated), + %% ?TDEF_FE(refcounted_sigs), + %% ?TDEF_FE(removes_old_jobs), + %% ?TDEF_FE(after_job_accepted_initial_build), + %% ?TDEF_FE(after_job_accepted_rebuild), + %% ?TDEF_FE(during_index_initial_build), + ?TDEF_FE(during_index_rebuild) ] } } @@ -107,18 +109,13 @@ after_ddoc_deletion(Db) -> lists:foreach(fun(DDoc) -> ?assertEqual(10, length(run_query(Db, DDoc))) end, DDocs), - ?debugFmt("Here?", []), [ToDel | RestDDocs] = DDocs, delete_doc(Db, ToDel), % Not yet cleaned up - ?debugFmt("Here?", []), ?assertEqual(true, view_has_data(Db, ToDel)), ?assertEqual(ok, fabric2_index:cleanup(Db)), - ?debugFmt("Here?", []), ?assertError({ddoc_deleted, _}, run_query(Db, ToDel)), - ?debugFmt("Here?", []), lists:foreach(fun(DDoc) -> - ?debugFmt("Here: ~p", [DDoc]), ?assertEqual(10, length(run_query(Db, DDoc))) end, RestDDocs). @@ -205,36 +202,75 @@ removes_old_jobs(Db) -> ?assertEqual(false, job_exists(Db, DDoc)). -after_job_accepted(Db) -> - delete_and_cleanup_during_job(Db, fun meck_intercept_job_accept/1). +after_job_accepted_initial_build(Db) -> + cleanup_during_initial_build(Db, fun meck_intercept_job_accept/1). -during_index_build(Db) -> - delete_and_cleanup_during_job(Db, fun meck_intercept_job_update/1). +after_job_accepted_rebuild(Db) -> + cleanup_during_rebuild(Db, fun meck_intercept_job_accept/1). -delete_and_cleanup_during_job(Db, InterruptFun) -> +during_index_initial_build(Db) -> + cleanup_during_initial_build(Db, fun meck_intercept_job_update/1). + + +during_index_rebuild(Db) -> + cleanup_during_rebuild(Db, fun meck_intercept_job_update/1). + + +cleanup_during_initial_build(Db, InterruptFun) -> InterruptFun(self()), create_docs(Db, 10), DDoc = create_ddoc(Db, <<"foo">>), - {_, Ref} = spawn_monitor(fun() -> run_query(Db, DDoc) end), + {_, Ref1} = spawn_monitor(fun() -> run_query(Db, DDoc) end), + + receive {JobPid, triggered} -> ok end, + delete_doc(Db, DDoc), + ok = fabric2_index:cleanup(Db), + JobPid ! continue, + + receive {'DOWN', Ref1, _, _, _} -> ok end, + + ok = fabric2_index:cleanup(Db), + ?assertError({ddoc_deleted, _}, run_query(Db, DDoc)), + + ?assertEqual(false, view_has_data(Db, DDoc)), + ?assertEqual(false, job_exists(Db, DDoc)). + + +cleanup_during_rebuild(Db, InterruptFun) -> + create_docs(Db, 10), + DDoc = create_ddoc(Db, <<"foo">>), + ?assertEqual(10, length(run_query(Db, DDoc))), + + InterruptFun(self()), + + create_docs(Db, 10, 10), + ?debugFmt("HERE", []), + {_, Ref1} = spawn_monitor(fun() -> run_query(Db, DDoc) end), + + ?debugFmt("HERE", []), receive {JobPid, triggered} -> ok end, delete_doc(Db, DDoc), ok = fabric2_index:cleanup(Db), JobPid ! continue, - receive - {'DOWN', Ref, _, _, Reason} -> - ?assertMatch({{ddoc_deleted, _}, _}, Reason) - end, + ?debugFmt("HERE", []), + receive {'DOWN', Ref1, _, _, _} -> ok end, + + ?debugFmt("Again", []), + ok = fabric2_index:cleanup(Db), + ?assertError({ddoc_deleted, _}, run_query(Db, DDoc)), + ?debugFmt("Whut", []), ?assertEqual(false, view_has_data(Db, DDoc)), ?assertEqual(false, job_exists(Db, DDoc)). + run_query(Db, DDocId) when is_binary(DDocId) -> {ok, DDoc} = fabric2_db:open_doc(Db, <<"_design/", DDocId/binary>>), run_query(Db, DDoc); @@ -292,10 +328,10 @@ meck_intercept_job_accept(ParentPid) -> meck_intercept_job_update(ParentPid) -> meck:new(couch_jobs, [passthrough]), - meck:expect(couch_jobs, update, fun(Db, Job, Data) -> - Result = meck:passthrough([Db, Job, Data]), + meck:expect(couch_jobs, finish, fun(Tx, Job, Data) -> ParentPid ! {self(), triggered}, receive continue -> ok end, + Result = meck:passthrough([Tx, Job, Data]), meck:unload(), Result end). @@ -340,11 +376,15 @@ create_doc(Db, Id, Body) -> Doc#doc{revs = {Pos, [Rev]}}. -create_docs(Db, Count) when is_integer(Count), Count > 1 -> +create_docs(Db, Count) -> + create_docs(Db, Count, 0). + + +create_docs(Db, Count, Offset) -> lists:map(fun(Seq) -> Id = io_lib:format("~6..0b", [Seq]), create_doc(Db, iolist_to_binary(Id)) - end, lists:seq(1, Count)). + end, lists:seq(Offset + 1, Offset + Count)). delete_doc(Db, DDoc) -> |