diff options
author | Garren Smith <garren.smith@gmail.com> | 2020-03-23 14:28:37 +0200 |
---|---|---|
committer | garren smith <garren.smith@gmail.com> | 2020-04-06 17:55:49 +0200 |
commit | 9d27c6e817692ee98acdd1369b771ccb1b8bd79d (patch) | |
tree | d4f9e21f4d711658a08dcb73b132cecebeb3d627 | |
parent | 551bd91d1f0c195d44c5a5f660c1bdcfcc117b1a (diff) | |
download | couchdb-9d27c6e817692ee98acdd1369b771ccb1b8bd79d.tar.gz |
Add couch_views_indexer build to creation versionstamp
This creates a versionstamp for when an indexed was created
and build status for indexes. if the index has a creation_vs, then
couch_views_indexer will built the index to this creation versionstamp.
-rw-r--r-- | src/couch_views/include/couch_views.hrl | 6 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_fdb.erl | 76 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_indexer.erl | 77 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_jobs.erl | 21 | ||||
-rw-r--r-- | src/couch_views/test/couch_views_indexer_test.erl | 42 | ||||
-rw-r--r-- | src/couch_views/test/couch_views_trace_index_test.erl | 5 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 12 |
7 files changed, 206 insertions, 33 deletions
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl index c40bb0212..3d0110f65 100644 --- a/src/couch_views/include/couch_views.hrl +++ b/src/couch_views/include/couch_views.hrl @@ -18,6 +18,8 @@ -define(VIEW_UPDATE_SEQ, 0). -define(VIEW_ROW_COUNT, 1). -define(VIEW_KV_SIZE, 2). +-define(VIEW_BUILD_STATUS, 3). +-define(VIEW_CREATION_VS, 4). % Data keys -define(VIEW_ID_RANGE, 0). @@ -25,3 +27,7 @@ % jobs api -define(INDEX_JOB_TYPE, <<"views">>). + +% indexing progress +-define(INDEX_BUILDING, <<"building">>). +-define(INDEX_READY, <<"ready">>). diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl index a0224b2b8..3b008d44b 100644 --- a/src/couch_views/src/couch_views_fdb.erl +++ b/src/couch_views/src/couch_views_fdb.erl @@ -13,6 +13,12 @@ -module(couch_views_fdb). -export([ + new_interactive_index/3, + new_creation_vs/3, + get_creation_vs/2, + get_build_status/2, + set_build_status/3, + get_update_seq/2, set_update_seq/3, @@ -39,6 +45,60 @@ -include_lib("fabric/include/fabric2.hrl"). +new_interactive_index(Db, Mrst, VS) -> + couch_views_fdb:new_creation_vs(Db, Mrst, VS), + couch_views_fdb:set_build_status(Db, Mrst, ?INDEX_BUILDING). + + +%Interactive View Creation Versionstamp +%(<db>, ?DB_VIEWS, ?VIEW_INFO, ?VIEW_CREATION_VS, Sig) = VS + +new_creation_vs(TxDb, #mrst{} = Mrst, VS) -> + #{ + tx := Tx + } = TxDb, + Key = creation_vs_key(TxDb, Mrst#mrst.sig), + Value = erlfdb_tuple:pack_vs({VS}), + ok = erlfdb:set_versionstamped_value(Tx, Key, Value). + + +get_creation_vs(TxDb, #mrst{} = Mrst) -> + get_creation_vs(TxDb, Mrst#mrst.sig); + +get_creation_vs(TxDb, Sig) -> + #{ + tx := Tx + } = TxDb, + Key = creation_vs_key(TxDb, Sig), + case erlfdb:wait(erlfdb:get(Tx, Key)) of + not_found -> + not_found; + EK -> + {VS} = erlfdb_tuple:unpack(EK), + VS + end. + + +%Interactive View Build Status +%(<db>, ?DB_VIEWS, ?VIEW_INFO, ?VIEW_BUILD_STATUS, Sig) = INDEX_BUILDING | INDEX_READY + +get_build_status(TxDb, #mrst{sig = Sig}) -> + #{ + tx := Tx + } = TxDb, + Key = build_status_key(TxDb, Sig), + erlfdb:wait(erlfdb:get(Tx, Key)). + + +set_build_status(TxDb, #mrst{sig = Sig}, State) -> + #{ + tx := Tx + } = TxDb, + + Key = build_status_key(TxDb, Sig), + ok = erlfdb:set(Tx, Key, State). + + % View Build Sequence Access % (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence @@ -340,6 +400,22 @@ map_idx_range(DbPrefix, Sig, ViewId, MapKey, DocId) -> erlfdb_tuple:range(Key, DbPrefix). +creation_vs_key(Db, Sig) -> + #{ + db_prefix := DbPrefix + } = Db, + Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_CREATION_VS, Sig}, + erlfdb_tuple:pack(Key, DbPrefix). + + +build_status_key(Db, Sig) -> + #{ + db_prefix := DbPrefix + } = Db, + Key = {?DB_VIEWS, ?VIEW_INFO, ?VIEW_BUILD_STATUS, Sig}, + erlfdb_tuple:pack(Key, DbPrefix). + + process_rows(Rows) -> Encoded = lists:map(fun({K, V}) -> EK1 = couch_views_encoding:encode(K, key), diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index 0127bacec..ab5aaade2 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -18,7 +18,9 @@ -export([ - init/0 + init/0, + map_docs/2, + write_docs/4 ]). -ifdef(TEST). @@ -80,6 +82,7 @@ init() -> db_seq => undefined, view_seq => undefined, last_seq => undefined, + view_vs => undefined, job => Job, job_data => Data, count => 0, @@ -174,22 +177,7 @@ update(#{} = Db, Mrst0, State0) -> do_update(Db, Mrst0, State0) -> fabric2_fdb:transactional(Db, fun(TxDb) -> - % In the first iteration of update we need - % to populate our db and view sequences - State1 = case State0 of - #{db_seq := undefined} -> - ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst0), - State0#{ - tx_db := TxDb, - db_seq := fabric2_db:get_update_seq(TxDb), - view_seq := ViewSeq, - last_seq := ViewSeq - }; - _ -> - State0#{ - tx_db := TxDb - } - end, + State1 = get_update_start_state(TxDb, Mrst0, State0), {ok, State2} = fold_changes(State1), @@ -198,7 +186,8 @@ do_update(Db, Mrst0, State0) -> doc_acc := DocAcc, last_seq := LastSeq, limit := Limit, - limiter := Limiter + limiter := Limiter, + view_vs := ViewVS } = State2, DocAcc1 = fetch_docs(TxDb, DocAcc), couch_rate:in(Limiter, Count), @@ -210,6 +199,8 @@ do_update(Db, Mrst0, State0) -> case Count < Limit of true -> + maybe_set_build_status(TxDb, Mrst1, ViewVS, + ?INDEX_READY), report_progress(State2, finished), {Mrst1, finished}; false -> @@ -224,6 +215,33 @@ do_update(Db, Mrst0, State0) -> end). +maybe_set_build_status(_TxDb, _Mrst1, not_found, _State) -> + ok; + +maybe_set_build_status(TxDb, Mrst1, _ViewVS, State) -> + couch_views_fdb:set_build_status(TxDb, Mrst1, State). + + +% In the first iteration of update we need +% to populate our db and view sequences +get_update_start_state(TxDb, Mrst, #{db_seq := undefined} = State) -> + ViewVS = couch_views_fdb:get_creation_vs(TxDb, Mrst), + ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst), + + State#{ + tx_db := TxDb, + db_seq := fabric2_db:get_update_seq(TxDb), + view_vs := ViewVS, + view_seq := ViewSeq, + last_seq := ViewSeq + }; + +get_update_start_state(TxDb, _Idx, State) -> + State#{ + tx_db := TxDb + }. + + fold_changes(State) -> #{ view_seq := SinceSeq, @@ -240,7 +258,8 @@ process_changes(Change, Acc) -> #{ doc_acc := DocAcc, count := Count, - design_opts := DesignOpts + design_opts := DesignOpts, + view_vs := ViewVS } = Acc, #{ @@ -263,8 +282,22 @@ process_changes(Change, Acc) -> last_seq := LastSeq } end, - {ok, Acc1}. + DocVS = fabric2_fdb:seq_to_vs(LastSeq), + + Go = maybe_stop_at_vs(ViewVS, DocVS), + {Go, Acc1}. + + +maybe_stop_at_vs({versionstamp, _} = ViewVS, DocVS) when DocVS >= ViewVS -> + stop; + +maybe_stop_at_vs(_, _) -> + ok. + + +map_docs(Mrst, []) -> + {Mrst, []}; map_docs(Mrst, Docs) -> % Run all the non deleted docs through the view engine and @@ -328,7 +361,9 @@ write_docs(TxDb, Mrst, Docs, State) -> N + 1 end, 0, Docs), - couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq), + if LastSeq == false -> ok; true -> + couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq) + end, DocsNumber. diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl index 1604841f1..b97e7ce0f 100644 --- a/src/couch_views/src/couch_views_jobs.erl +++ b/src/couch_views/src/couch_views_jobs.erl @@ -40,11 +40,12 @@ build_view(TxDb, Mrst, UpdateSeq) -> end. -build_view_async(TxDb, Mrst) -> - JobId = job_id(TxDb, Mrst), - JobData = job_data(TxDb, Mrst), - DbUUID = fabric2_db:get_uuid(TxDb), - couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) -> +build_view_async(TxDb0, Mrst) -> + JobId = job_id(TxDb0, Mrst), + JobData = job_data(TxDb0, Mrst), + DbUUID = fabric2_db:get_uuid(TxDb0), + TxDb1 = ensure_correct_tx(TxDb0), + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(TxDb1), fun(JTx) -> case couch_jobs:get_job_data(JTx, ?INDEX_JOB_TYPE, JobId) of {error, not_found} -> ok; @@ -59,6 +60,16 @@ build_view_async(TxDb, Mrst) -> {ok, JobId}. +ensure_correct_tx(#{tx := undefined} = TxDb) -> + TxDb; + +ensure_correct_tx(#{tx := Tx} = TxDb) -> + case erlfdb:is_read_only(Tx) of + true -> TxDb#{tx := undefined}; + false -> TxDb + end. + + wait_for_job(JobId, UpdateSeq) -> case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of {ok, Subscription, _State, _Data} -> diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl index 43b58284d..8ddb64b9c 100644 --- a/src/couch_views/test/couch_views_indexer_test.erl +++ b/src/couch_views/test/couch_views_indexer_test.erl @@ -51,7 +51,8 @@ indexer_test_() -> ?TDEF_FE(index_autoupdater_callback), ?TDEF_FE(handle_db_recreated_when_running), ?TDEF_FE(handle_db_recreated_after_finished), - ?TDEF_FE(index_budget_is_changing) + ?TDEF_FE(index_budget_is_changing), + ?TDEF_FE(index_can_recover_from_crash, 60) ] } } @@ -508,6 +509,41 @@ handle_db_recreated_after_finished(Db) -> ], Out2). +index_can_recover_from_crash(Db) -> + ok = meck:new(config, [passthrough]), + ok = meck:expect(config, get_integer, fun(Section, Key, Default) -> + case Section == "couch_views" andalso Key == "change_limit" of + true -> 1; + _ -> Default + end + end), + meck:new(couch_eval, [passthrough]), + meck:expect(couch_eval, map_docs, fun(State, Docs) -> + Doc = hd(Docs), + case Doc#doc.id == <<"2">> of + true -> + % remove the mock so that next time the doc is processed + % it will work + meck:unload(couch_eval), + throw({fake_crash, test_jobs_restart}); + false -> + meck:passthrough([State, Docs]) + end + end), + + DDoc = create_ddoc(), + Docs = make_docs(3), + {ok, _} = fabric2_db:update_doc(Db, DDoc, []), + {ok, _} = fabric2_db:update_docs(Db, Docs, []), + + {ok, Out} = run_query(Db, DDoc, ?MAP_FUN1), + ?assertEqual([ + row(<<"1">>, 1, 1), + row(<<"2">>, 2, 2), + row(<<"3">>, 3, 3) + ], Out). + + row(Id, Key, Value) -> {row, [ {id, Id}, @@ -603,6 +639,10 @@ create_ddoc(multi_emit_key_limit) -> ]}). +make_docs(Count) -> + [doc(I) || I <- lists:seq(1, Count)]. + + doc(Id) -> doc(Id, Id). diff --git a/src/couch_views/test/couch_views_trace_index_test.erl b/src/couch_views/test/couch_views_trace_index_test.erl index f8a5ce535..5b15a4ce2 100644 --- a/src/couch_views/test/couch_views_trace_index_test.erl +++ b/src/couch_views/test/couch_views_trace_index_test.erl @@ -77,10 +77,13 @@ trace_single_doc(Db) -> {ok, _} = fabric2_db:update_doc(Db, Doc, []), {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc), + HexSig = fabric2_util:to_hex(Mrst#mrst.sig), JobData = #{ <<"db_name">> => DbName, + <<"db_uuid">> => fabric2_db:get_uuid(Db), <<"ddoc_id">> => <<"_design/bar">>, - <<"sig">> => fabric2_util:to_hex(Mrst#mrst.sig) + <<"sig">> => HexSig, + <<"retries">> => 0 }, meck:expect(couch_jobs, accept, 2, {ok, job, JobData}), meck:expect(couch_jobs, update, 3, {ok, job}), diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index 912d4dfa8..2295a5648 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -64,6 +64,8 @@ seq_to_vs/1, next_vs/1, + new_versionstamp/1, + debug_cluster/0, debug_cluster/2 ]). @@ -1021,6 +1023,11 @@ next_vs({versionstamp, VS, Batch, TxId}) -> {versionstamp, V, B, T}. +new_versionstamp(Tx) -> + TxId = erlfdb:get_next_tx_id(Tx), + {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}. + + debug_cluster() -> debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>). @@ -1763,11 +1770,6 @@ get_transaction_id(Tx, LayerPrefix) -> end. -new_versionstamp(Tx) -> - TxId = erlfdb:get_next_tx_id(Tx), - {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}. - - on_commit(Tx, Fun) when is_function(Fun, 0) -> % Here we rely on Tx objects matching. However they contain a nif resource % object. Before Erlang 20.0 those would have been represented as empty |