summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Sun <tony.sun427@gmail.com>2020-07-24 09:54:41 -0700
committerTony Sun <tony.sun427@gmail.com>2020-07-24 09:54:41 -0700
commita447f074dbba1417eb902f181f8f15e0da2da856 (patch)
treee7c087e3e7b914fdb52c285bcaced00e5570d5da
parentfd9557a9afd6831bd4271176937bab3b932f88d9 (diff)
downloadcouchdb-a447f074dbba1417eb902f181f8f15e0da2da856.tar.gz
add active_tasks for view builds using version stamps
Active Tasks requires TotalChanges and ChangesDone to show the progress of long running tasks. This requires count_changes_since to be implemented. Unfortunately, that is not easily done via with foundationdb. This commit replaces TotalChanges with the versionstamp + the number of docs as a progress indicator. This can possibly break existing api that relys on TotalChanges. ChangesDone will still exist, but instead of relying on the current changes seq it is simply a reflection of how many documents were written by the updater process.
-rw-r--r--src/couch_views/src/couch_views_indexer.erl33
-rw-r--r--src/couch_views/src/couch_views_util.erl27
-rw-r--r--src/couch_views/test/couch_views_active_tasks_test.erl155
3 files changed, 208 insertions, 7 deletions
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 31868d9c0..9183d982e 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -90,6 +90,7 @@ init() ->
job => Job,
job_data => Data,
count => 0,
+ changes_done => 0,
limiter => Limiter,
doc_acc => [],
design_opts => Mrst#mrst.design_opts
@@ -132,7 +133,9 @@ upgrade_data(Data) ->
true -> Acc;
false -> maps:put(Key, Default, Acc)
end
- end, Data, Defaults).
+ end, Data, Defaults),
+ % initialize active task
+ fabric2_active_tasks:update_active_task_info(Data, #{}).
% Transaction limit exceeded don't retry
@@ -191,7 +194,8 @@ do_update(Db, Mrst0, State0) ->
last_seq := LastSeq,
limit := Limit,
limiter := Limiter,
- view_vs := ViewVS
+ view_vs := ViewVS,
+ changes_done := ChangesDone0
} = State2,
DocAcc1 = fetch_docs(TxDb, DocAcc),
couch_rate:in(Limiter, Count),
@@ -199,13 +203,16 @@ do_update(Db, Mrst0, State0) ->
{Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1),
WrittenDocs = write_docs(TxDb, Mrst1, MappedDocs, State2),
+ ChangesDone = ChangesDone0 + WrittenDocs,
+
couch_rate:success(Limiter, WrittenDocs),
case Count < Limit of
true ->
maybe_set_build_status(TxDb, Mrst1, ViewVS,
?INDEX_READY),
- report_progress(State2, finished),
+ report_progress(State2#{changes_done := ChangesDone},
+ finished),
{Mrst1, finished};
false ->
State3 = report_progress(State2, update),
@@ -213,6 +220,7 @@ do_update(Db, Mrst0, State0) ->
tx_db := undefined,
count := 0,
doc_acc := [],
+ changes_done := ChangesDone,
view_seq := LastSeq
}}
end
@@ -483,7 +491,9 @@ report_progress(State, UpdateType) ->
tx_db := TxDb,
job := Job1,
job_data := JobData,
- last_seq := LastSeq
+ last_seq := LastSeq,
+ db_seq := DBSeq,
+ changes_done := ChangesDone
} = State,
#{
@@ -494,9 +504,18 @@ report_progress(State, UpdateType) ->
<<"retries">> := Retries
} = JobData,
+ ActiveTasks = fabric2_active_tasks:get_active_task_info(JobData),
+ TotalDone = case maps:get(<<"changes_done">>, ActiveTasks, 0) of
+ 0 -> ChangesDone;
+ N -> N + ChangesDone
+ end,
+
+ NewActiveTasks = couch_views_util:active_tasks_info(TotalDone,
+ DbName, DDocId, LastSeq, DBSeq),
+
% Reconstruct from scratch to remove any
% possible existing error state.
- NewData = #{
+ NewData0 = #{
<<"db_name">> => DbName,
<<"db_uuid">> => DbUUID,
<<"ddoc_id">> => DDocId,
@@ -504,6 +523,8 @@ report_progress(State, UpdateType) ->
<<"view_seq">> => LastSeq,
<<"retries">> => Retries
},
+ NewData = fabric2_active_tasks:update_active_task_info(NewData0,
+ NewActiveTasks),
case UpdateType of
update ->
@@ -540,4 +561,4 @@ key_size_limit() ->
value_size_limit() ->
- config:get_integer("couch_views", "value_size_limit", ?VALUE_SIZE_LIMIT).
+ config:get_integer("couch_views", "value_size_limit", ?VALUE_SIZE_LIMIT). \ No newline at end of file
diff --git a/src/couch_views/src/couch_views_util.erl b/src/couch_views/src/couch_views_util.erl
index 154e9e270..11bba75bd 100644
--- a/src/couch_views/src/couch_views_util.erl
+++ b/src/couch_views/src/couch_views_util.erl
@@ -17,7 +17,8 @@
ddoc_to_mrst/2,
validate_args/1,
validate_args/2,
- is_paginated/1
+ is_paginated/1,
+ active_tasks_info/5
]).
@@ -276,3 +277,27 @@ is_paginated(#mrargs{page_size = PageSize}) when is_integer(PageSize) ->
is_paginated(_) ->
false.
+
+
+active_tasks_info(ChangesDone, DbName, DDocId, LastSeq, DBSeq) ->
+ #{
+ <<"type">> => <<"indexer">>,
+ <<"database">> => DbName,
+ <<"changes_done">> => ChangesDone,
+ <<"design_document">> => DDocId,
+ <<"current_version_stamp">> => convert_seq_to_stamp(LastSeq),
+ <<"db_version_stamp">> => convert_seq_to_stamp(DBSeq)
+ }.
+
+
+convert_seq_to_stamp(<<"0">>) ->
+ <<"0-0-0">>;
+
+convert_seq_to_stamp(undefined) ->
+ <<"0-0-0">>;
+
+convert_seq_to_stamp(Seq) ->
+ {_, Stamp, Batch, DocNumber} = fabric2_fdb:seq_to_vs(Seq),
+ VS = integer_to_list(Stamp) ++ "-" ++ integer_to_list(Batch) ++ "-"
+ ++ integer_to_list(DocNumber),
+ list_to_binary(VS).
diff --git a/src/couch_views/test/couch_views_active_tasks_test.erl b/src/couch_views/test/couch_views_active_tasks_test.erl
new file mode 100644
index 000000000..f87e01055
--- /dev/null
+++ b/src/couch_views/test/couch_views_active_tasks_test.erl
@@ -0,0 +1,155 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_active_tasks_test).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_views/include/couch_views.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+-define(MAP_FUN1, <<"map_fun1">>).
+-define(MAP_FUN2, <<"map_fun2">>).
+-define(INDEX_FOO, <<"_design/foo">>).
+-define(INDEX_BAR, <<"_design/bar">>).
+-define(TOTAL_DOCS, 1000).
+
+
+setup() ->
+ Ctx = test_util:start_couch([
+ fabric,
+ couch_jobs,
+ couch_js,
+ couch_views
+ ]),
+ Ctx.
+
+
+cleanup(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+foreach_setup() ->
+ {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]),
+
+ DDoc = create_ddoc(?INDEX_FOO, ?MAP_FUN1),
+ Docs = make_docs(?TOTAL_DOCS),
+ fabric2_db:update_docs(Db, [DDoc | Docs]),
+
+ {Db, DDoc}.
+
+
+foreach_teardown({Db, _}) ->
+ meck:unload(),
+ ok = fabric2_db:delete(fabric2_db:name(Db), []).
+
+
+active_tasks_test_() ->
+ {
+ "Active Tasks test",
+ {
+ setup,
+ fun setup/0,
+ fun cleanup/1,
+ {
+ foreach,
+ fun foreach_setup/0,
+ fun foreach_teardown/1,
+ [
+ ?TDEF_FE(verify_basic_active_tasks),
+ ?TDEF_FE(verify_muliple_active_tasks)
+ ]
+ }
+ }
+ }.
+
+
+verify_basic_active_tasks({Db, DDoc}) ->
+ pause_indexer_for_changes(self()),
+ couch_views:build_indices(Db, [DDoc]),
+ {IndexerPid, {changes_done, ChangesDone}} = wait_to_reach_changes(10000),
+ [ActiveTask] = fabric2_active_tasks:get_active_tasks(),
+ ChangesDone1 = maps:get(<<"changes_done">>, ActiveTask),
+ IndexerPid ! continue,
+ % we assume the indexer has run for a bit so it has to > 0
+ ?assert(ChangesDone1 > 0),
+ ?assert(ChangesDone1 =< ChangesDone),
+ ?assertEqual(ChangesDone, ?TOTAL_DOCS).
+
+
+verify_muliple_active_tasks({Db, DDoc}) ->
+ DDoc2 = create_ddoc(?INDEX_BAR, ?MAP_FUN2),
+ fabric2_db:update_doc(Db, DDoc2, []),
+ pause_indexer_for_changes(self()),
+ couch_views:build_indices(Db, [DDoc, DDoc2]),
+
+ {IndexerPid, {changes_done, ChangesDone}} = wait_to_reach_changes(10000),
+ {IndexerPid2, {changes_done, ChangesDone2}} = wait_to_reach_changes(10000),
+
+ ActiveTasks = fabric2_active_tasks:get_active_tasks(),
+
+ ?assertEqual(length(ActiveTasks), 2),
+
+ IndexerPid ! continue,
+ IndexerPid2 ! continue,
+
+ ?assertEqual(ChangesDone, ?TOTAL_DOCS),
+ ?assertEqual(ChangesDone2, ?TOTAL_DOCS).
+
+
+create_ddoc(DDocId, IndexName) ->
+ couch_doc:from_json_obj({[
+ {<<"_id">>, DDocId},
+ {<<"views">>, {[
+ {IndexName, {[
+ {<<"map">>, <<"function(doc) {emit(doc.val, doc.val);}">>}
+ ]}}
+ ]}}
+ ]}).
+
+
+doc(Id, Val) ->
+ couch_doc:from_json_obj({[
+ {<<"_id">>, list_to_binary(integer_to_list(Id))},
+ {<<"val">>, Val}
+ ]}).
+
+
+make_docs(Count) ->
+ [doc(I, Count) || I <- lists:seq(1, Count)].
+
+
+pause_indexer_for_changes(ParentPid) ->
+ meck:new(couch_views_util, [passthrough]),
+ meck:expect(couch_views_util, active_tasks_info, fun(ChangesDone,
+ DbName, DDocId, LastSeq, DBSeq) ->
+ case ChangesDone of
+ ?TOTAL_DOCS ->
+ ParentPid ! {self(), {changes_done, ChangesDone}},
+ receive continue -> ok end;
+ _ ->
+ ok
+ end,
+ meck:passthrough([ChangesDone, DbName, DDocId, LastSeq,
+ DBSeq])
+ end).
+
+
+wait_to_reach_changes(Timeout) ->
+ receive
+ {Pid, {changes_done, ChangesDone}} when is_pid(Pid) ->
+ {Pid, {changes_done, ChangesDone}}
+ after Timeout ->
+ error(timeout_in_pause_indexer_for_changes)
+ end.