summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarren Smith <garren.smith@gmail.com>2020-03-23 14:28:37 +0200
committergarren smith <garren.smith@gmail.com>2020-04-06 17:55:49 +0200
commit9d27c6e817692ee98acdd1369b771ccb1b8bd79d (patch)
treed4f9e21f4d711658a08dcb73b132cecebeb3d627
parent551bd91d1f0c195d44c5a5f660c1bdcfcc117b1a (diff)
downloadcouchdb-9d27c6e817692ee98acdd1369b771ccb1b8bd79d.tar.gz
Add couch_views_indexer build to creation versionstamp
This creates a versionstamp recording when an index was created, plus a build status for indexes. If the index has a creation_vs, then couch_views_indexer will build the index up to this creation versionstamp.
-rw-r--r--src/couch_views/include/couch_views.hrl6
-rw-r--r--src/couch_views/src/couch_views_fdb.erl76
-rw-r--r--src/couch_views/src/couch_views_indexer.erl77
-rw-r--r--src/couch_views/src/couch_views_jobs.erl21
-rw-r--r--src/couch_views/test/couch_views_indexer_test.erl42
-rw-r--r--src/couch_views/test/couch_views_trace_index_test.erl5
-rw-r--r--src/fabric/src/fabric2_fdb.erl12
7 files changed, 206 insertions, 33 deletions
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
index c40bb0212..3d0110f65 100644
--- a/src/couch_views/include/couch_views.hrl
+++ b/src/couch_views/include/couch_views.hrl
@@ -18,6 +18,8 @@
-define(VIEW_UPDATE_SEQ, 0).
-define(VIEW_ROW_COUNT, 1).
-define(VIEW_KV_SIZE, 2).
+-define(VIEW_BUILD_STATUS, 3).
+-define(VIEW_CREATION_VS, 4).
% Data keys
-define(VIEW_ID_RANGE, 0).
@@ -25,3 +27,7 @@
% jobs api
-define(INDEX_JOB_TYPE, <<"views">>).
+
+% indexing progress
+-define(INDEX_BUILDING, <<"building">>).
+-define(INDEX_READY, <<"ready">>).
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
index a0224b2b8..3b008d44b 100644
--- a/src/couch_views/src/couch_views_fdb.erl
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -13,6 +13,12 @@
-module(couch_views_fdb).
-export([
+ new_interactive_index/3,
+ new_creation_vs/3,
+ get_creation_vs/2,
+ get_build_status/2,
+ set_build_status/3,
+
get_update_seq/2,
set_update_seq/3,
@@ -39,6 +45,60 @@
-include_lib("fabric/include/fabric2.hrl").
+% Sets up the bookkeeping for a new interactive index: first records the
+% creation versionstamp VS, then marks the index as ?INDEX_BUILDING.
+% Both steps are invoked through the fully qualified module name.
+new_interactive_index(Db, Mrst, VS) ->
+    InitialStatus = ?INDEX_BUILDING,
+    couch_views_fdb:new_creation_vs(Db, Mrst, VS),
+    couch_views_fdb:set_build_status(Db, Mrst, InitialStatus).
+
+
+%Interactive View Creation Versionstamp
+%(<db>, ?DB_VIEWS, ?VIEW_INFO, ?VIEW_CREATION_VS, Sig) = VS
+
+% Stores the creation versionstamp for the view whose signature is held in
+% the #mrst{} record. VS is packed with erlfdb_tuple:pack_vs/1 and written
+% with erlfdb:set_versionstamped_value/3; any result other than `ok` from
+% the write crashes the caller.
+new_creation_vs(TxDb, #mrst{sig = Sig}, VS) ->
+    #{tx := Tx} = TxDb,
+    VsKey = creation_vs_key(TxDb, Sig),
+    Packed = erlfdb_tuple:pack_vs({VS}),
+    ok = erlfdb:set_versionstamped_value(Tx, VsKey, Packed).
+
+
+% Reads the creation versionstamp of a view. The second argument is either
+% an #mrst{} record (its sig field is used) or the signature itself.
+% Returns `not_found` when nothing is stored under the key, otherwise the
+% single element of the unpacked tuple.
+get_creation_vs(TxDb, #mrst{sig = Sig}) ->
+    get_creation_vs(TxDb, Sig);
+
+get_creation_vs(TxDb, Sig) ->
+    #{tx := Tx} = TxDb,
+    VsKey = creation_vs_key(TxDb, Sig),
+    Stored = erlfdb:wait(erlfdb:get(Tx, VsKey)),
+    case Stored of
+        not_found ->
+            not_found;
+        _ ->
+            {CreationVS} = erlfdb_tuple:unpack(Stored),
+            CreationVS
+    end.
+
+
+%Interactive View Build Status
+%(<db>, ?DB_VIEWS, ?VIEW_INFO, ?VIEW_BUILD_STATUS, Sig) = INDEX_BUILDING | INDEX_READY
+
+% Fetches the build status stored for the view with signature Sig.
+% The result of the erlfdb read is returned as is, so callers see the raw
+% stored value, or whatever erlfdb yields for a missing key.
+get_build_status(TxDb, #mrst{sig = Sig}) ->
+    #{tx := Tx} = TxDb,
+    StatusKey = build_status_key(TxDb, Sig),
+    Pending = erlfdb:get(Tx, StatusKey),
+    erlfdb:wait(Pending).
+
+
+% Writes State (expected to be ?INDEX_BUILDING or ?INDEX_READY, going by the
+% key layout comment) as the build status of the view with signature Sig.
+% Crashes unless the erlfdb write returns `ok`.
+set_build_status(TxDb, #mrst{sig = Sig}, State) ->
+    #{tx := Tx} = TxDb,
+    StatusKey = build_status_key(TxDb, Sig),
+    ok = erlfdb:set(Tx, StatusKey, State).
+
+
% View Build Sequence Access
% (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
@@ -340,6 +400,22 @@ map_idx_range(DbPrefix, Sig, ViewId, MapKey, DocId) ->
erlfdb_tuple:range(Key, DbPrefix).
+% Builds the packed key under which a view's creation versionstamp lives:
+% (?DB_VIEWS, ?VIEW_INFO, ?VIEW_CREATION_VS, Sig) prefixed by the db prefix.
+creation_vs_key(Db, Sig) ->
+    #{db_prefix := DbPrefix} = Db,
+    erlfdb_tuple:pack(
+        {?DB_VIEWS, ?VIEW_INFO, ?VIEW_CREATION_VS, Sig},
+        DbPrefix
+    ).
+
+
+% Builds the packed key under which a view's build status lives:
+% (?DB_VIEWS, ?VIEW_INFO, ?VIEW_BUILD_STATUS, Sig) prefixed by the db prefix.
+build_status_key(Db, Sig) ->
+    #{db_prefix := DbPrefix} = Db,
+    erlfdb_tuple:pack(
+        {?DB_VIEWS, ?VIEW_INFO, ?VIEW_BUILD_STATUS, Sig},
+        DbPrefix
+    ).
+
+
process_rows(Rows) ->
Encoded = lists:map(fun({K, V}) ->
EK1 = couch_views_encoding:encode(K, key),
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 0127bacec..ab5aaade2 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -18,7 +18,9 @@
-export([
- init/0
+ init/0,
+ map_docs/2,
+ write_docs/4
]).
-ifdef(TEST).
@@ -80,6 +82,7 @@ init() ->
db_seq => undefined,
view_seq => undefined,
last_seq => undefined,
+ view_vs => undefined,
job => Job,
job_data => Data,
count => 0,
@@ -174,22 +177,7 @@ update(#{} = Db, Mrst0, State0) ->
do_update(Db, Mrst0, State0) ->
fabric2_fdb:transactional(Db, fun(TxDb) ->
- % In the first iteration of update we need
- % to populate our db and view sequences
- State1 = case State0 of
- #{db_seq := undefined} ->
- ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst0),
- State0#{
- tx_db := TxDb,
- db_seq := fabric2_db:get_update_seq(TxDb),
- view_seq := ViewSeq,
- last_seq := ViewSeq
- };
- _ ->
- State0#{
- tx_db := TxDb
- }
- end,
+ State1 = get_update_start_state(TxDb, Mrst0, State0),
{ok, State2} = fold_changes(State1),
@@ -198,7 +186,8 @@ do_update(Db, Mrst0, State0) ->
doc_acc := DocAcc,
last_seq := LastSeq,
limit := Limit,
- limiter := Limiter
+ limiter := Limiter,
+ view_vs := ViewVS
} = State2,
DocAcc1 = fetch_docs(TxDb, DocAcc),
couch_rate:in(Limiter, Count),
@@ -210,6 +199,8 @@ do_update(Db, Mrst0, State0) ->
case Count < Limit of
true ->
+ maybe_set_build_status(TxDb, Mrst1, ViewVS,
+ ?INDEX_READY),
report_progress(State2, finished),
{Mrst1, finished};
false ->
@@ -224,6 +215,33 @@ do_update(Db, Mrst0, State0) ->
end).
+% Records the build status State for the index, but only when the index has
+% a creation versionstamp. A ViewVS of `not_found` means there is none, in
+% which case nothing is written and `ok` is returned.
+maybe_set_build_status(TxDb, Mrst1, ViewVS, State) when ViewVS =/= not_found ->
+    couch_views_fdb:set_build_status(TxDb, Mrst1, State);
+
+maybe_set_build_status(_TxDb, _Mrst1, not_found, _State) ->
+    ok.
+
+
+% Prepares the indexer state for one update transaction.
+%
+% On the first pass (db_seq is still `undefined`) the state is seeded with
+% the index creation versionstamp, the view's own update sequence (used as
+% both view_seq and last_seq) and the database update sequence, all read
+% through TxDb. On every later pass only tx_db is replaced.
+get_update_start_state(TxDb, Mrst, #{db_seq := undefined} = State) ->
+    CreationVS = couch_views_fdb:get_creation_vs(TxDb, Mrst),
+    IndexedSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
+    DbSeq = fabric2_db:get_update_seq(TxDb),
+
+    State#{
+        tx_db := TxDb,
+        db_seq := DbSeq,
+        view_vs := CreationVS,
+        view_seq := IndexedSeq,
+        last_seq := IndexedSeq
+    };
+
+get_update_start_state(TxDb, _Mrst, State) ->
+    State#{tx_db := TxDb}.
+
+
fold_changes(State) ->
#{
view_seq := SinceSeq,
@@ -240,7 +258,8 @@ process_changes(Change, Acc) ->
#{
doc_acc := DocAcc,
count := Count,
- design_opts := DesignOpts
+ design_opts := DesignOpts,
+ view_vs := ViewVS
} = Acc,
#{
@@ -263,8 +282,22 @@ process_changes(Change, Acc) ->
last_seq := LastSeq
}
end,
- {ok, Acc1}.
+ DocVS = fabric2_fdb:seq_to_vs(LastSeq),
+
+ Go = maybe_stop_at_vs(ViewVS, DocVS),
+ {Go, Acc1}.
+
+
+% Decides whether the changes fold should halt after the current change.
+%
+% ViewVS is the index creation versionstamp, or a placeholder such as
+% `not_found` / `undefined` when the index has none. DocVS is the
+% versionstamp derived from the sequence of the change just processed.
+% Returns `stop` once DocVS has reached ViewVS, and `ok` in every other case.
+%
+% The head must describe the 4-tuple {versionstamp, Id, Batch, TxId} shape
+% that fabric2_fdb produces; a 2-tuple pattern would never match a real
+% versionstamp and would leave the `stop` branch unreachable.
+% NOTE(review): assumes the stored creation versionstamp unpacks to the
+% 4-element form -- confirm against erlfdb_tuple.
+maybe_stop_at_vs({versionstamp, _, _, _} = ViewVS, DocVS)
+        when DocVS >= ViewVS ->
+    stop;
+
+maybe_stop_at_vs(_, _) ->
+    ok.
+
+
+map_docs(Mrst, []) ->
+ {Mrst, []};
map_docs(Mrst, Docs) ->
% Run all the non deleted docs through the view engine and
@@ -328,7 +361,9 @@ write_docs(TxDb, Mrst, Docs, State) ->
N + 1
end, 0, Docs),
- couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq),
+ if LastSeq == false -> ok; true ->
+ couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq)
+ end,
DocsNumber.
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
index 1604841f1..b97e7ce0f 100644
--- a/src/couch_views/src/couch_views_jobs.erl
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -40,11 +40,12 @@ build_view(TxDb, Mrst, UpdateSeq) ->
end.
-build_view_async(TxDb, Mrst) ->
- JobId = job_id(TxDb, Mrst),
- JobData = job_data(TxDb, Mrst),
- DbUUID = fabric2_db:get_uuid(TxDb),
- couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+build_view_async(TxDb0, Mrst) ->
+ JobId = job_id(TxDb0, Mrst),
+ JobData = job_data(TxDb0, Mrst),
+ DbUUID = fabric2_db:get_uuid(TxDb0),
+ TxDb1 = ensure_correct_tx(TxDb0),
+ couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(TxDb1), fun(JTx) ->
case couch_jobs:get_job_data(JTx, ?INDEX_JOB_TYPE, JobId) of
{error, not_found} ->
ok;
@@ -59,6 +60,16 @@ build_view_async(TxDb, Mrst) ->
{ok, JobId}.
+% Returns a db handle that is safe to hand on to couch_jobs.
+% A handle without a transaction is returned untouched. A handle holding a
+% read-only transaction gets its tx reset to `undefined`; a handle with a
+% writable transaction is returned as is.
+% NOTE(review): presumably the reset lets the jobs layer open its own
+% writable transaction -- confirm against couch_jobs_fdb:get_jtx/1.
+ensure_correct_tx(#{tx := undefined} = TxDb) ->
+    TxDb;
+
+ensure_correct_tx(#{tx := Tx} = TxDb) ->
+    case erlfdb:is_read_only(Tx) of
+        false -> TxDb;
+        true -> TxDb#{tx := undefined}
+    end.
+
+
wait_for_job(JobId, UpdateSeq) ->
case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of
{ok, Subscription, _State, _Data} ->
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index 43b58284d..8ddb64b9c 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -51,7 +51,8 @@ indexer_test_() ->
?TDEF_FE(index_autoupdater_callback),
?TDEF_FE(handle_db_recreated_when_running),
?TDEF_FE(handle_db_recreated_after_finished),
- ?TDEF_FE(index_budget_is_changing)
+ ?TDEF_FE(index_budget_is_changing),
+ ?TDEF_FE(index_can_recover_from_crash, 60)
]
}
}
@@ -508,6 +509,41 @@ handle_db_recreated_after_finished(Db) ->
], Out2).
+% Checks that a view build survives a crash in the middle of indexing.
+% The "couch_views"/"change_limit" setting is mocked to 1, and
+% couch_eval:map_docs/2 is mocked to throw when doc <<"2">> is first in the
+% batch. The mock unloads itself before throwing, so the next attempt at
+% that doc goes through the real implementation. The query is expected to
+% return all three rows.
+index_can_recover_from_crash(Db) ->
+    ok = meck:new(config, [passthrough]),
+    ok = meck:expect(config, get_integer, fun(Section, Key, Default) ->
+        IsChangeLimit = Section == "couch_views"
+            andalso Key == "change_limit",
+        case IsChangeLimit of
+            true -> 1;
+            _ -> Default
+        end
+    end),
+    meck:new(couch_eval, [passthrough]),
+    meck:expect(couch_eval, map_docs, fun(State, Docs) ->
+        FirstDoc = hd(Docs),
+        FirstId = FirstDoc#doc.id,
+        if
+            FirstId == <<"2">> ->
+                % drop the mock first so the retry of this doc succeeds
+                meck:unload(couch_eval),
+                throw({fake_crash, test_jobs_restart});
+            true ->
+                meck:passthrough([State, Docs])
+        end
+    end),
+
+    DDoc = create_ddoc(),
+    Docs = make_docs(3),
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, _} = fabric2_db:update_docs(Db, Docs, []),
+
+    {ok, Out} = run_query(Db, DDoc, ?MAP_FUN1),
+    Expected = [
+        row(<<"1">>, 1, 1),
+        row(<<"2">>, 2, 2),
+        row(<<"3">>, 3, 3)
+    ],
+    ?assertEqual(Expected, Out).
+
+
row(Id, Key, Value) ->
{row, [
{id, Id},
@@ -603,6 +639,10 @@ create_ddoc(multi_emit_key_limit) ->
]}).
+% Builds Count test documents by calling doc/1 on 1..Count, in ascending
+% order.
+make_docs(Count) ->
+    Ids = lists:seq(1, Count),
+    lists:map(fun doc/1, Ids).
+
+
doc(Id) ->
doc(Id, Id).
diff --git a/src/couch_views/test/couch_views_trace_index_test.erl b/src/couch_views/test/couch_views_trace_index_test.erl
index f8a5ce535..5b15a4ce2 100644
--- a/src/couch_views/test/couch_views_trace_index_test.erl
+++ b/src/couch_views/test/couch_views_trace_index_test.erl
@@ -77,10 +77,13 @@ trace_single_doc(Db) ->
{ok, _} = fabric2_db:update_doc(Db, Doc, []),
{ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+ HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
JobData = #{
<<"db_name">> => DbName,
+ <<"db_uuid">> => fabric2_db:get_uuid(Db),
<<"ddoc_id">> => <<"_design/bar">>,
- <<"sig">> => fabric2_util:to_hex(Mrst#mrst.sig)
+ <<"sig">> => HexSig,
+ <<"retries">> => 0
},
meck:expect(couch_jobs, accept, 2, {ok, job, JobData}),
meck:expect(couch_jobs, update, 3, {ok, job}),
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 912d4dfa8..2295a5648 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -64,6 +64,8 @@
seq_to_vs/1,
next_vs/1,
+ new_versionstamp/1,
+
debug_cluster/0,
debug_cluster/2
]).
@@ -1021,6 +1023,11 @@ next_vs({versionstamp, VS, Batch, TxId}) ->
{versionstamp, V, B, T}.
+% Returns a versionstamp tuple whose id and batch fields are all ones and
+% whose last field is the next transaction id handed out by erlfdb for Tx.
+% NOTE(review): the all-ones fields look like FoundationDB's "incomplete
+% versionstamp" marker that is filled in at commit -- confirm.
+new_versionstamp(Tx) ->
+    {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, erlfdb:get_next_tx_id(Tx)}.
+
+
debug_cluster() ->
debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
@@ -1763,11 +1770,6 @@ get_transaction_id(Tx, LayerPrefix) ->
end.
-new_versionstamp(Tx) ->
- TxId = erlfdb:get_next_tx_id(Tx),
- {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
-
-
on_commit(Tx, Fun) when is_function(Fun, 0) ->
% Here we rely on Tx objects matching. However they contain a nif resource
% object. Before Erlang 20.0 those would have been represented as empty