diff options
author | iilyak <iilyak@users.noreply.github.com> | 2019-02-14 05:42:17 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-14 05:42:17 -0800 |
commit | 9b85da8a7403eb8f9c7781281c9b760311c3f33c (patch) | |
tree | d6e8b0c5473d1fe08694e6d5c7df2dddc7634d55 | |
parent | e699fe6859eb4fb32eafbd637cda1fc7d5e9043a (diff) | |
parent | 13bf0eb80813477bf3fbe79cffaf0faddffaaca9 (diff) | |
download | couchdb-9b85da8a7403eb8f9c7781281c9b760311c3f33c.tar.gz |
Merge pull request #1642 from cloudant/91984-set-io_priority-for-couch-index-pids
Set io_priority for couch_index pids
-rw-r--r-- | src/couch/src/couch_debug.erl | 20 | ||||
-rw-r--r-- | src/couch/src/test_util.erl | 17 | ||||
-rw-r--r-- | src/couch/test/couch_index_tests.erl | 234 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_compactor.erl | 2 | ||||
-rw-r--r-- | src/couch_index/src/couch_index_updater.erl | 2 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_updater.erl | 11 |
6 files changed, 271 insertions, 15 deletions
diff --git a/src/couch/src/couch_debug.erl b/src/couch/src/couch_debug.erl index 9506a80ce..290d095bf 100644 --- a/src/couch/src/couch_debug.erl +++ b/src/couch/src/couch_debug.erl @@ -234,19 +234,17 @@ opened_files_contains(FileNameFragment) -> string:str(Path, FileNameFragment) > 0 end, couch_debug:opened_files()). + process_name(Pid) when is_pid(Pid) -> - case process_info(Pid, registered_name) of - {registered_name, Name} -> + Info = process_info(Pid, [registered_name, dictionary, initial_call]), + case Info of + undefined -> + iolist_to_list(io_lib:format("[~p]", [Pid])); + [{registered_name, Name} | _] when Name =/= [] -> iolist_to_list(io_lib:format("~s[~p]", [Name, Pid])); - _ -> - {dictionary, Dict} = process_info(Pid, dictionary), - case proplists:get_value('$initial_call', Dict) of - undefined -> - {initial_call, {M, F, A}} = process_info(Pid, initial_call), - iolist_to_list(io_lib:format("~p:~p/~p[~p]", [M, F, A, Pid])); - {M, F, A} -> - iolist_to_list(io_lib:format("~p:~p/~p[~p]", [M, F, A, Pid])) - end + [_, {dictionary, Dict}, {initial_call, MFA}] -> + {M, F, A} = proplists:get_value('$initial_call', Dict, MFA), + iolist_to_list(io_lib:format("~p:~p/~p[~p]", [M, F, A, Pid])) end; process_name(Else) -> iolist_to_list(io_lib:format("~p", [Else])). diff --git a/src/couch/src/test_util.erl b/src/couch/src/test_util.erl index efb506460..9566e8eec 100644 --- a/src/couch/src/test_util.erl +++ b/src/couch/src/test_util.erl @@ -32,6 +32,7 @@ -export([with_process_restart/1, with_process_restart/2, with_process_restart/3]). -export([wait_process/1, wait_process/2]). -export([wait/1, wait/2, wait/3]). +-export([wait_value/2, wait_other_value/2]). -export([start/1, start/2, start/3, stop/1]). @@ -222,6 +223,22 @@ wait(Fun, Timeout, Delay, Started, _Prev) -> Else end. +wait_value(Fun, Value) -> + wait(fun() -> + case Fun() of + Value -> Value; + _ -> wait + end + end). + +wait_other_value(Fun, Value) -> + wait(fun() -> + case Fun() of + Value -> wait; + Other -> Other + end + end). + start(Module) -> start(Module, [], []). diff --git a/src/couch/test/couch_index_tests.erl b/src/couch/test/couch_index_tests.erl new file mode 100644 index 000000000..fab3806d0 --- /dev/null +++ b/src/couch/test/couch_index_tests.erl @@ -0,0 +1,234 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_index_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +-define(TIMEOUT, 1000). + +setup() -> + DbName = ?tempdb(), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + ok = couch_db:close(Db), + create_design_doc(DbName, <<"_design/foo">>, <<"bar">>), + tracer_new(), + DbName. + +teardown(DbName) -> + tracer_delete(), + couch_server:delete(DbName, [?ADMIN_CTX]). + +couch_index_ioq_priority_test_() -> + { + "Test ioq_priority for views", + { + setup, + fun test_util:start_couch/0, fun test_util:stop_couch/1, + { + foreach, + fun setup/0, fun teardown/1, + [ + fun check_io_priority_for_updater/1, + fun check_io_priority_for_compactor/1 + ] + } + } + }. + + +check_io_priority_for_updater(DbName) -> + ?_test(begin + {ok, IndexerPid} = couch_index_server:get_index( + couch_mrview_index, DbName, <<"_design/foo">>), + CouchIndexUpdaterPid = updater_pid(IndexerPid), + tracer_record(CouchIndexUpdaterPid), + + create_docs(DbName), + + CommittedSeq = couch_util:with_db(DbName, fun(Db) -> couch_db:get_update_seq(Db) end), + couch_index:get_state(IndexerPid, CommittedSeq), + [UpdaterPid] = wait_spawn_event_for_pid(CouchIndexUpdaterPid), + + [UpdaterMapProcess] = wait_spawn_by_anonymous_fun( + UpdaterPid, '-start_update/4-fun-0-'), + + ?assert(wait_set_io_priority( + UpdaterMapProcess, {view_update, DbName, <<"_design/foo">>})), + + [UpdaterWriterProcess] = wait_spawn_by_anonymous_fun( + UpdaterPid, '-start_update/4-fun-1-'), + ?assert(wait_set_io_priority( + UpdaterWriterProcess, {view_update, DbName, <<"_design/foo">>})), + + ok + end). + +check_io_priority_for_compactor(DbName) -> + ?_test(begin + {ok, IndexerPid} = couch_index_server:get_index( + couch_mrview_index, DbName, <<"_design/foo">>), + {ok, CompactorPid} = couch_index:get_compactor_pid(IndexerPid), + tracer_record(CompactorPid), + + create_docs(DbName), + + couch_index:compact(IndexerPid), + wait_spawn_event_for_pid(CompactorPid), + + [CompactorProcess] = wait_spawn_by_anonymous_fun( + CompactorPid, '-handle_call/3-fun-0-'), + ?assert(wait_set_io_priority( + CompactorProcess, {view_compact, DbName, <<"_design/foo">>})), + ok + end). + +create_docs(DbName) -> + {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]), + Doc1 = couch_doc:from_json_obj({[ + {<<"_id">>, <<"doc1">>}, + {<<"value">>, 1} + + ]}), + Doc2 = couch_doc:from_json_obj({[ + {<<"_id">>, <<"doc2">>}, + {<<"value">>, 2} + + ]}), + Doc3 = couch_doc:from_json_obj({[ + {<<"_id">>, <<"doc3">>}, + {<<"value">>, 3} + + ]}), + {ok, _} = couch_db:update_docs(Db, [Doc1, Doc2, Doc3]), + couch_db:ensure_full_commit(Db), + couch_db:close(Db). + +create_design_doc(DbName, DDName, ViewName) -> + {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]), + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDName}, + {<<"language">>, <<"javascript">>}, + {<<"views">>, {[ + {ViewName, {[ + {<<"map">>, <<"function(doc) { emit(doc.value, null); }">>} + ]}} + ]}} + ]}), + {ok, Rev} = couch_db:update_doc(Db, DDoc, []), + couch_db:ensure_full_commit(Db), + couch_db:close(Db), + Rev. + +wait_set_io_priority(Pid, IOPriority) -> + test_util:wait_value(fun() -> + does_process_set_io_priority(Pid, IOPriority) + end, true). + +does_process_set_io_priority(Pid, IOPriority) -> + PutCallsArgs = find_calls_to_fun(Pid, {erlang, put, 2}), + lists:any(fun([_, Priority]) -> Priority =:= IOPriority end, PutCallsArgs). + +wait_events(MatchSpec) -> + test_util:wait_other_value(fun() -> select(MatchSpec) end, []). + +find_spawned_by_anonymous_fun(ParentPid, Name) -> + AnonymousFuns = select(ets:fun2ms(fun + ({spawned, Pid, _TS, _Name, _Dict, [PPid, {erlang, apply, [Fun, _]}]}) + when is_function(Fun) andalso PPid =:= ParentPid -> {Pid, Fun} + end)), + lists:filtermap(fun({Pid, Fun}) -> + case erlang:fun_info(Fun, name) of + {name, Name} -> {true, Pid}; + _ -> false + end + end, AnonymousFuns). + +find_calls_to_fun(Pid, {Module, Function, Arity}) -> + select(ets:fun2ms(fun + ({call, P, _TS, _Name, _Dict, [{M, F, Args}]}) + when length(Args) =:= Arity + andalso M =:= Module + andalso F =:= Function + andalso P =:= Pid + -> Args + end)). + +wait_spawn_event_for_pid(ParentPid) -> + wait_events(ets:fun2ms(fun + ({spawned, Pid, _TS, _Name, _Dict, [P, _]}) when P =:= ParentPid -> Pid + end)). + +wait_spawn_by_anonymous_fun(ParentPid, Name) -> + test_util:wait_other_value(fun() -> + find_spawned_by_anonymous_fun(ParentPid, Name) + end, []). + +updater_pid(IndexerPid) -> + {links, Links} = process_info(IndexerPid, links), + [Pid] = select_process_by_name_prefix(Links, "couch_index_updater:init/1"), + Pid. + +select_process_by_name_prefix(Pids, Name) -> + lists:filter(fun(Pid) -> + Key = couch_debug:process_name(Pid), + string:str(Key, Name) =:= 1 + end, Pids). + +select(MatchSpec) -> + lists:filtermap(fun(Event) -> + case ets:test_ms(Event, MatchSpec) of + {ok, false} -> false; + {ok, Result} -> {true, Result}; + _ -> false + end + end, tracer_events()). + + +%% ======================== +%% Tracer related functions +%% ------------------------ +tracer_new() -> + ets:new(?MODULE, [public, named_table]), + {ok, _Tracer} = dbg:tracer(process, {fun tracer_collector/2, 0}), + ok. + +tracer_delete() -> + dbg:stop_clear(), + (catch ets:delete(?MODULE)), + ok. + +tracer_record(Pid) -> + {ok, _} = dbg:tp(erlang, put, x), + {ok, _} = dbg:p(Pid, [c, p, sos]), + ok. + +tracer_events() -> + Events = [{Idx, E} || [Idx, E] <- ets:match(?MODULE, {{trace, '$1'}, '$2'})], + {_, Sorted} = lists:unzip(lists:keysort(1, Events)), + Sorted. + +tracer_collector(Msg, Seq) -> + ets:insert(?MODULE, {{trace, Seq}, normalize_trace_msg(Msg)}), + Seq + 1. + +normalize_trace_msg(TraceMsg) -> + case tuple_to_list(TraceMsg) of + [trace_ts, Pid, Type | Info] -> + {TraceInfo, [Timestamp]} = lists:split(length(Info)-1, Info), + {Type, Pid, Timestamp, couch_debug:process_name(Pid), process_info(Pid), TraceInfo}; + [trace, Pid, Type | TraceInfo] -> + {Type, Pid, os:timestamp(), couch_debug:process_name(Pid), process_info(Pid), TraceInfo} + end. diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl index 61f406c1a..8849cf67d 100644 --- a/src/couch_index/src/couch_index_compactor.erl +++ b/src/couch_index/src/couch_index_compactor.erl @@ -117,6 +117,8 @@ compact(Parent, Mod, IdxState) -> compact(Idx, Mod, IdxState, Opts) -> DbName = Mod:get(db_name, IdxState), + IndexName = Mod:get(idx_name, IdxState), + erlang:put(io_priority, {view_compact, DbName, IndexName}), Args = [DbName, Mod:get(idx_name, IdxState)], couch_log:info("Compaction started for db: ~s idx: ~s", Args), {ok, NewIdxState} = couch_util:with_db(DbName, fun(Db) -> diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl index 7864bde4d..fb15db052 100644 --- a/src/couch_index/src/couch_index_updater.erl +++ b/src/couch_index/src/couch_index_updater.erl @@ -128,6 +128,8 @@ code_change(_OldVsn, State, _Extra) -> update(Idx, Mod, IdxState) -> DbName = Mod:get(db_name, IdxState), + IndexName = Mod:get(idx_name, IdxState), + erlang:put(io_priority, {view_update, DbName, IndexName}), CurrSeq = Mod:get(update_seq, IdxState), UpdateOpts = Mod:get(update_options, IdxState), CommittedOnly = lists:member(committed_only, UpdateOpts), diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl index 9740e6a28..18657b468 100644 --- a/src/couch_mrview/src/couch_mrview_updater.erl +++ b/src/couch_mrview/src/couch_mrview_updater.erl @@ -25,7 +25,6 @@ start_update(Partial, State, NumChanges, NumChangesDone) -> QueueOpts = [{max_size, MaxSize}, {max_items, MaxItems}], {ok, DocQueue} = couch_work_queue:new(QueueOpts), {ok, WriteQueue} = couch_work_queue:new(QueueOpts), - InitState = State#mrst{ first_build=State#mrst.update_seq==0, partial_resp_pid=Partial, @@ -37,6 +36,8 @@ start_update(Partial, State, NumChanges, NumChangesDone) -> Self = self(), MapFun = fun() -> + erlang:put(io_priority, + {view_update, State#mrst.db_name, State#mrst.idx_name}), Progress = case NumChanges of 0 -> 0; _ -> (NumChangesDone * 100) div NumChanges @@ -53,8 +54,11 @@ start_update(Partial, State, NumChanges, NumChangesDone) -> couch_task_status:set_update_frequency(500), map_docs(Self, InitState) end, - WriteFun = fun() -> write_results(Self, InitState) end, - + WriteFun = fun() -> + erlang:put(io_priority, + {view_update, State#mrst.db_name, State#mrst.idx_name}), + write_results(Self, InitState) + end, spawn_link(MapFun), spawn_link(WriteFun), @@ -219,7 +223,6 @@ write_results(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State) -> stop -> Parent ! {new_state, State}; {Go, {Seq, ViewKVs, DocIdKeys, Seqs, Log}} -> - erlang:put(io_priority, {view_update, DbName, IdxName}), NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys, Seqs, Log), if Go == stop -> Parent ! {new_state, NewState}; |