diff options
author | Tony Sun <tony.sun427@gmail.com> | 2020-07-24 10:01:43 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-24 10:01:43 -0700 |
commit | 0c7c77eb59d626505cf6ec4162e1053837bc3593 (patch) | |
tree | f4ca1e1110a19d4a526efe91f5e51fac2eba5508 | |
parent | 81a8db719418f7b6a8af3d4d767e1d99dc139ace (diff) | |
parent | a447f074dbba1417eb902f181f8f15e0da2da856 (diff) | |
download | couchdb-0c7c77eb59d626505cf6ec4162e1053837bc3593.tar.gz |
Merge pull request #3003 from apache/add_active_tasks_fdb
add active_tasks for view builds using version stamps
-rw-r--r-- | src/chttpd/src/chttpd_misc.erl | 7 | ||||
-rw-r--r-- | src/couch_jobs/src/couch_jobs.erl | 19 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_indexer.erl | 33 | ||||
-rw-r--r-- | src/couch_views/src/couch_views_util.erl | 27 | ||||
-rw-r--r-- | src/couch_views/test/couch_views_active_tasks_test.erl | 155 | ||||
-rw-r--r-- | src/fabric/src/fabric2_active_tasks.erl | 51 |
6 files changed, 280 insertions, 12 deletions
diff --git a/src/chttpd/src/chttpd_misc.erl b/src/chttpd/src/chttpd_misc.erl index 07d53714a..ec2435c41 100644 --- a/src/chttpd/src/chttpd_misc.erl +++ b/src/chttpd/src/chttpd_misc.erl @@ -294,11 +294,8 @@ dbs_info_callback({error, Reason}, #vacc{resp = Resp0} = Acc) -> handle_task_status_req(#httpd{method='GET'}=Req) -> ok = chttpd:verify_is_server_admin(Req), - {Replies, _BadNodes} = gen_server:multi_call(couch_task_status, all), - Response = lists:flatmap(fun({Node, Tasks}) -> - [{[{node,Node} | Task]} || Task <- Tasks] - end, Replies), - send_json(Req, lists:sort(Response)); + ActiveTasks = fabric2_active_tasks:get_active_tasks(), + send_json(Req, ActiveTasks); handle_task_status_req(Req) -> send_method_not_allowed(Req, "GET,HEAD"). diff --git a/src/couch_jobs/src/couch_jobs.erl b/src/couch_jobs/src/couch_jobs.erl index 88b4bf470..f6fb62664 100644 --- a/src/couch_jobs/src/couch_jobs.erl +++ b/src/couch_jobs/src/couch_jobs.erl @@ -19,6 +19,8 @@ remove/3, get_job_data/3, get_job_state/3, + get_active_jobs_ids/2, + get_types/1, % Job processing accept/1, @@ -104,6 +106,23 @@ get_job_state(Tx, Type, JobId) when is_binary(JobId) -> end). +-spec get_active_jobs_ids(jtx(), job_type()) -> [job_id()] | {error, + any()}. +get_active_jobs_ids(Tx, Type) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + Since = couch_jobs_fdb:get_active_since(JTx, Type, + {versionstamp, 0, 0}), + maps:keys(Since) + end). + + +-spec get_types(jtx()) -> [job_type()] | {error, any()}. +get_types(Tx) -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Tx), fun(JTx) -> + couch_jobs_fdb:get_types(JTx) + end). + + %% Job processor API -spec accept(job_type()) -> {ok, job(), job_data()} | {error, any()}. diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index 31868d9c0..9183d982e 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -90,6 +90,7 @@ init() -> job => Job, job_data => Data, count => 0, + changes_done => 0, limiter => Limiter, doc_acc => [], design_opts => Mrst#mrst.design_opts @@ -132,7 +133,9 @@ upgrade_data(Data) -> true -> Acc; false -> maps:put(Key, Default, Acc) end - end, Data, Defaults). + end, Data, Defaults), + % initialize active task + fabric2_active_tasks:update_active_task_info(Data, #{}). % Transaction limit exceeded don't retry @@ -191,7 +194,8 @@ do_update(Db, Mrst0, State0) -> last_seq := LastSeq, limit := Limit, limiter := Limiter, - view_vs := ViewVS + view_vs := ViewVS, + changes_done := ChangesDone0 } = State2, DocAcc1 = fetch_docs(TxDb, DocAcc), couch_rate:in(Limiter, Count), @@ -199,13 +203,16 @@ do_update(Db, Mrst0, State0) -> {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc1), WrittenDocs = write_docs(TxDb, Mrst1, MappedDocs, State2), + ChangesDone = ChangesDone0 + WrittenDocs, + couch_rate:success(Limiter, WrittenDocs), case Count < Limit of true -> maybe_set_build_status(TxDb, Mrst1, ViewVS, ?INDEX_READY), - report_progress(State2, finished), + report_progress(State2#{changes_done := ChangesDone}, + finished), {Mrst1, finished}; false -> State3 = report_progress(State2, update), @@ -213,6 +220,7 @@ do_update(Db, Mrst0, State0) -> tx_db := undefined, count := 0, doc_acc := [], + changes_done := ChangesDone, view_seq := LastSeq }} end @@ -483,7 +491,9 @@ report_progress(State, UpdateType) -> tx_db := TxDb, job := Job1, job_data := JobData, - last_seq := LastSeq + last_seq := LastSeq, + db_seq := DBSeq, + changes_done := ChangesDone } = State, #{ @@ -494,9 +504,18 @@ report_progress(State, UpdateType) -> <<"retries">> := Retries } = JobData, + ActiveTasks = fabric2_active_tasks:get_active_task_info(JobData), + TotalDone = case maps:get(<<"changes_done">>, ActiveTasks, 0) of + 0 -> ChangesDone; + N -> N + ChangesDone + end, + + NewActiveTasks = couch_views_util:active_tasks_info(TotalDone, + DbName, DDocId, LastSeq, DBSeq), + % Reconstruct from scratch to remove any % possible existing error state. - NewData = #{ + NewData0 = #{ <<"db_name">> => DbName, <<"db_uuid">> => DbUUID, <<"ddoc_id">> => DDocId, @@ -504,6 +523,8 @@ report_progress(State, UpdateType) -> <<"view_seq">> => LastSeq, <<"retries">> => Retries }, + NewData = fabric2_active_tasks:update_active_task_info(NewData0, + NewActiveTasks), case UpdateType of update -> @@ -540,4 +561,4 @@ key_size_limit() -> value_size_limit() -> - config:get_integer("couch_views", "value_size_limit", ?VALUE_SIZE_LIMIT). + config:get_integer("couch_views", "value_size_limit", ?VALUE_SIZE_LIMIT).
\ No newline at end of file diff --git a/src/couch_views/src/couch_views_util.erl b/src/couch_views/src/couch_views_util.erl index 154e9e270..11bba75bd 100644 --- a/src/couch_views/src/couch_views_util.erl +++ b/src/couch_views/src/couch_views_util.erl @@ -17,7 +17,8 @@ ddoc_to_mrst/2, validate_args/1, validate_args/2, - is_paginated/1 + is_paginated/1, + active_tasks_info/5 ]). @@ -276,3 +277,27 @@ is_paginated(#mrargs{page_size = PageSize}) when is_integer(PageSize) -> is_paginated(_) -> false. + + +active_tasks_info(ChangesDone, DbName, DDocId, LastSeq, DBSeq) -> + #{ + <<"type">> => <<"indexer">>, + <<"database">> => DbName, + <<"changes_done">> => ChangesDone, + <<"design_document">> => DDocId, + <<"current_version_stamp">> => convert_seq_to_stamp(LastSeq), + <<"db_version_stamp">> => convert_seq_to_stamp(DBSeq) + }. + + +convert_seq_to_stamp(<<"0">>) -> + <<"0-0-0">>; + +convert_seq_to_stamp(undefined) -> + <<"0-0-0">>; + +convert_seq_to_stamp(Seq) -> + {_, Stamp, Batch, DocNumber} = fabric2_fdb:seq_to_vs(Seq), + VS = integer_to_list(Stamp) ++ "-" ++ integer_to_list(Batch) ++ "-" + ++ integer_to_list(DocNumber), + list_to_binary(VS). diff --git a/src/couch_views/test/couch_views_active_tasks_test.erl b/src/couch_views/test/couch_views_active_tasks_test.erl new file mode 100644 index 000000000..f87e01055 --- /dev/null +++ b/src/couch_views/test/couch_views_active_tasks_test.erl @@ -0,0 +1,155 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_views_active_tasks_test). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_views/include/couch_views.hrl"). +-include_lib("fabric/test/fabric2_test.hrl"). + + +-define(MAP_FUN1, <<"map_fun1">>). +-define(MAP_FUN2, <<"map_fun2">>). +-define(INDEX_FOO, <<"_design/foo">>). +-define(INDEX_BAR, <<"_design/bar">>). +-define(TOTAL_DOCS, 1000). + + +setup() -> + Ctx = test_util:start_couch([ + fabric, + couch_jobs, + couch_js, + couch_views + ]), + Ctx. + + +cleanup(Ctx) -> + test_util:stop_couch(Ctx). + + +foreach_setup() -> + {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]), + + DDoc = create_ddoc(?INDEX_FOO, ?MAP_FUN1), + Docs = make_docs(?TOTAL_DOCS), + fabric2_db:update_docs(Db, [DDoc | Docs]), + + {Db, DDoc}. + + +foreach_teardown({Db, _}) -> + meck:unload(), + ok = fabric2_db:delete(fabric2_db:name(Db), []). + + +active_tasks_test_() -> + { + "Active Tasks test", + { + setup, + fun setup/0, + fun cleanup/1, + { + foreach, + fun foreach_setup/0, + fun foreach_teardown/1, + [ + ?TDEF_FE(verify_basic_active_tasks), + ?TDEF_FE(verify_muliple_active_tasks) + ] + } + } + }. + + +verify_basic_active_tasks({Db, DDoc}) -> + pause_indexer_for_changes(self()), + couch_views:build_indices(Db, [DDoc]), + {IndexerPid, {changes_done, ChangesDone}} = wait_to_reach_changes(10000), + [ActiveTask] = fabric2_active_tasks:get_active_tasks(), + ChangesDone1 = maps:get(<<"changes_done">>, ActiveTask), + IndexerPid ! continue, + % we assume the indexer has run for a bit so it has to > 0 + ?assert(ChangesDone1 > 0), + ?assert(ChangesDone1 =< ChangesDone), + ?assertEqual(ChangesDone, ?TOTAL_DOCS). + + +verify_muliple_active_tasks({Db, DDoc}) -> + DDoc2 = create_ddoc(?INDEX_BAR, ?MAP_FUN2), + fabric2_db:update_doc(Db, DDoc2, []), + pause_indexer_for_changes(self()), + couch_views:build_indices(Db, [DDoc, DDoc2]), + + {IndexerPid, {changes_done, ChangesDone}} = wait_to_reach_changes(10000), + {IndexerPid2, {changes_done, ChangesDone2}} = wait_to_reach_changes(10000), + + ActiveTasks = fabric2_active_tasks:get_active_tasks(), + + ?assertEqual(length(ActiveTasks), 2), + + IndexerPid ! continue, + IndexerPid2 ! continue, + + ?assertEqual(ChangesDone, ?TOTAL_DOCS), + ?assertEqual(ChangesDone2, ?TOTAL_DOCS). + + +create_ddoc(DDocId, IndexName) -> + couch_doc:from_json_obj({[ + {<<"_id">>, DDocId}, + {<<"views">>, {[ + {IndexName, {[ + {<<"map">>, <<"function(doc) {emit(doc.val, doc.val);}">>} + ]}} + ]}} + ]}). + + +doc(Id, Val) -> + couch_doc:from_json_obj({[ + {<<"_id">>, list_to_binary(integer_to_list(Id))}, + {<<"val">>, Val} + ]}). + + +make_docs(Count) -> + [doc(I, Count) || I <- lists:seq(1, Count)]. + + +pause_indexer_for_changes(ParentPid) -> + meck:new(couch_views_util, [passthrough]), + meck:expect(couch_views_util, active_tasks_info, fun(ChangesDone, + DbName, DDocId, LastSeq, DBSeq) -> + case ChangesDone of + ?TOTAL_DOCS -> + ParentPid ! {self(), {changes_done, ChangesDone}}, + receive continue -> ok end; + _ -> + ok + end, + meck:passthrough([ChangesDone, DbName, DDocId, LastSeq, + DBSeq]) + end). + + +wait_to_reach_changes(Timeout) -> + receive + {Pid, {changes_done, ChangesDone}} when is_pid(Pid) -> + {Pid, {changes_done, ChangesDone}} + after Timeout -> + error(timeout_in_pause_indexer_for_changes) + end. diff --git a/src/fabric/src/fabric2_active_tasks.erl b/src/fabric/src/fabric2_active_tasks.erl new file mode 100644 index 000000000..2c03ec3a9 --- /dev/null +++ b/src/fabric/src/fabric2_active_tasks.erl @@ -0,0 +1,51 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-module(fabric2_active_tasks). + + +-export([ + get_active_tasks/0, + get_active_task_info/1, + + update_active_task_info/2 +]). + + +-define(ACTIVE_TASK_INFO, <<"active_task_info">>). + + +get_active_tasks() -> + couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(undefined), fun(JTx) -> + Types = couch_jobs:get_types(JTx), + lists:foldl(fun(Type, TaskAcc) -> + JobIds = couch_jobs:get_active_jobs_ids(JTx, Type), + Tasks = lists:filtermap(fun(JobId) -> + {ok, Data} = couch_jobs:get_job_data(JTx, Type, JobId), + case maps:get(?ACTIVE_TASK_INFO, Data, not_found) of + not_found -> false; + Info -> {true, Info} + end + end, JobIds), + TaskAcc ++ Tasks + end, [], Types) + end). + + +get_active_task_info(JobData) -> + #{?ACTIVE_TASK_INFO:= ActiveTaskInfo} = JobData, + ActiveTaskInfo. + + +update_active_task_info(JobData, ActiveTaskInfo) -> + JobData#{?ACTIVE_TASK_INFO => ActiveTaskInfo}. |