summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoriilyak <iilyak@users.noreply.github.com>2019-02-14 05:42:17 -0800
committerGitHub <noreply@github.com>2019-02-14 05:42:17 -0800
commit9b85da8a7403eb8f9c7781281c9b760311c3f33c (patch)
treed6e8b0c5473d1fe08694e6d5c7df2dddc7634d55
parente699fe6859eb4fb32eafbd637cda1fc7d5e9043a (diff)
parent13bf0eb80813477bf3fbe79cffaf0faddffaaca9 (diff)
downloadcouchdb-9b85da8a7403eb8f9c7781281c9b760311c3f33c.tar.gz
Merge pull request #1642 from cloudant/91984-set-io_priority-for-couch-index-pids
Set io_priority for couch_index pids
-rw-r--r--src/couch/src/couch_debug.erl20
-rw-r--r--src/couch/src/test_util.erl17
-rw-r--r--src/couch/test/couch_index_tests.erl234
-rw-r--r--src/couch_index/src/couch_index_compactor.erl2
-rw-r--r--src/couch_index/src/couch_index_updater.erl2
-rw-r--r--src/couch_mrview/src/couch_mrview_updater.erl11
6 files changed, 271 insertions, 15 deletions
diff --git a/src/couch/src/couch_debug.erl b/src/couch/src/couch_debug.erl
index 9506a80ce..290d095bf 100644
--- a/src/couch/src/couch_debug.erl
+++ b/src/couch/src/couch_debug.erl
@@ -234,19 +234,17 @@ opened_files_contains(FileNameFragment) ->
string:str(Path, FileNameFragment) > 0
end, couch_debug:opened_files()).
+
process_name(Pid) when is_pid(Pid) ->
- case process_info(Pid, registered_name) of
- {registered_name, Name} ->
+ Info = process_info(Pid, [registered_name, dictionary, initial_call]),
+ case Info of
+ undefined ->
+ iolist_to_list(io_lib:format("[~p]", [Pid]));
+ [{registered_name, Name} | _] when Name =/= [] ->
iolist_to_list(io_lib:format("~s[~p]", [Name, Pid]));
- _ ->
- {dictionary, Dict} = process_info(Pid, dictionary),
- case proplists:get_value('$initial_call', Dict) of
- undefined ->
- {initial_call, {M, F, A}} = process_info(Pid, initial_call),
- iolist_to_list(io_lib:format("~p:~p/~p[~p]", [M, F, A, Pid]));
- {M, F, A} ->
- iolist_to_list(io_lib:format("~p:~p/~p[~p]", [M, F, A, Pid]))
- end
+ [_, {dictionary, Dict}, {initial_call, MFA}] ->
+ {M, F, A} = proplists:get_value('$initial_call', Dict, MFA),
+ iolist_to_list(io_lib:format("~p:~p/~p[~p]", [M, F, A, Pid]))
end;
process_name(Else) ->
iolist_to_list(io_lib:format("~p", [Else])).
diff --git a/src/couch/src/test_util.erl b/src/couch/src/test_util.erl
index efb506460..9566e8eec 100644
--- a/src/couch/src/test_util.erl
+++ b/src/couch/src/test_util.erl
@@ -32,6 +32,7 @@
-export([with_process_restart/1, with_process_restart/2, with_process_restart/3]).
-export([wait_process/1, wait_process/2]).
-export([wait/1, wait/2, wait/3]).
+-export([wait_value/2, wait_other_value/2]).
-export([start/1, start/2, start/3, stop/1]).
@@ -222,6 +223,22 @@ wait(Fun, Timeout, Delay, Started, _Prev) ->
Else
end.
+wait_value(Fun, Value) ->
+ wait(fun() ->
+ case Fun() of
+ Value -> Value;
+ _ -> wait
+ end
+ end).
+
+wait_other_value(Fun, Value) ->
+ wait(fun() ->
+ case Fun() of
+ Value -> wait;
+ Other -> Other
+ end
+ end).
+
start(Module) ->
start(Module, [], []).
diff --git a/src/couch/test/couch_index_tests.erl b/src/couch/test/couch_index_tests.erl
new file mode 100644
index 000000000..fab3806d0
--- /dev/null
+++ b/src/couch/test/couch_index_tests.erl
@@ -0,0 +1,234 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-define(TIMEOUT, 1000).
+
+setup() ->
+ DbName = ?tempdb(),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+ ok = couch_db:close(Db),
+ create_design_doc(DbName, <<"_design/foo">>, <<"bar">>),
+ tracer_new(),
+ DbName.
+
+teardown(DbName) ->
+ tracer_delete(),
+ couch_server:delete(DbName, [?ADMIN_CTX]).
+
+couch_index_ioq_priority_test_() ->
+ {
+ "Test ioq_priority for views",
+ {
+ setup,
+ fun test_util:start_couch/0, fun test_util:stop_couch/1,
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun check_io_priority_for_updater/1,
+ fun check_io_priority_for_compactor/1
+ ]
+ }
+ }
+ }.
+
+
+check_io_priority_for_updater(DbName) ->
+ ?_test(begin
+ {ok, IndexerPid} = couch_index_server:get_index(
+ couch_mrview_index, DbName, <<"_design/foo">>),
+ CouchIndexUpdaterPid = updater_pid(IndexerPid),
+ tracer_record(CouchIndexUpdaterPid),
+
+ create_docs(DbName),
+
+ CommittedSeq = couch_util:with_db(DbName, fun(Db) -> couch_db:get_update_seq(Db) end),
+ couch_index:get_state(IndexerPid, CommittedSeq),
+ [UpdaterPid] = wait_spawn_event_for_pid(CouchIndexUpdaterPid),
+
+ [UpdaterMapProcess] = wait_spawn_by_anonymous_fun(
+ UpdaterPid, '-start_update/4-fun-0-'),
+
+ ?assert(wait_set_io_priority(
+ UpdaterMapProcess, {view_update, DbName, <<"_design/foo">>})),
+
+ [UpdaterWriterProcess] = wait_spawn_by_anonymous_fun(
+ UpdaterPid, '-start_update/4-fun-1-'),
+ ?assert(wait_set_io_priority(
+ UpdaterWriterProcess, {view_update, DbName, <<"_design/foo">>})),
+
+ ok
+ end).
+
+check_io_priority_for_compactor(DbName) ->
+ ?_test(begin
+ {ok, IndexerPid} = couch_index_server:get_index(
+ couch_mrview_index, DbName, <<"_design/foo">>),
+ {ok, CompactorPid} = couch_index:get_compactor_pid(IndexerPid),
+ tracer_record(CompactorPid),
+
+ create_docs(DbName),
+
+ couch_index:compact(IndexerPid),
+ wait_spawn_event_for_pid(CompactorPid),
+
+ [CompactorProcess] = wait_spawn_by_anonymous_fun(
+ CompactorPid, '-handle_call/3-fun-0-'),
+ ?assert(wait_set_io_priority(
+ CompactorProcess, {view_compact, DbName, <<"_design/foo">>})),
+ ok
+ end).
+
+create_docs(DbName) ->
+ {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+ Doc1 = couch_doc:from_json_obj({[
+ {<<"_id">>, <<"doc1">>},
+ {<<"value">>, 1}
+
+ ]}),
+ Doc2 = couch_doc:from_json_obj({[
+ {<<"_id">>, <<"doc2">>},
+ {<<"value">>, 2}
+
+ ]}),
+ Doc3 = couch_doc:from_json_obj({[
+ {<<"_id">>, <<"doc3">>},
+ {<<"value">>, 3}
+
+ ]}),
+ {ok, _} = couch_db:update_docs(Db, [Doc1, Doc2, Doc3]),
+ couch_db:ensure_full_commit(Db),
+ couch_db:close(Db).
+
+create_design_doc(DbName, DDName, ViewName) ->
+ {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDName},
+ {<<"language">>, <<"javascript">>},
+ {<<"views">>, {[
+ {ViewName, {[
+ {<<"map">>, <<"function(doc) { emit(doc.value, null); }">>}
+ ]}}
+ ]}}
+ ]}),
+ {ok, Rev} = couch_db:update_doc(Db, DDoc, []),
+ couch_db:ensure_full_commit(Db),
+ couch_db:close(Db),
+ Rev.
+
+wait_set_io_priority(Pid, IOPriority) ->
+ test_util:wait_value(fun() ->
+ does_process_set_io_priority(Pid, IOPriority)
+ end, true).
+
+does_process_set_io_priority(Pid, IOPriority) ->
+ PutCallsArgs = find_calls_to_fun(Pid, {erlang, put, 2}),
+ lists:any(fun([_, Priority]) -> Priority =:= IOPriority end, PutCallsArgs).
+
+wait_events(MatchSpec) ->
+ test_util:wait_other_value(fun() -> select(MatchSpec) end, []).
+
+find_spawned_by_anonymous_fun(ParentPid, Name) ->
+ AnonymousFuns = select(ets:fun2ms(fun
+ ({spawned, Pid, _TS, _Name, _Dict, [PPid, {erlang, apply, [Fun, _]}]})
+ when is_function(Fun) andalso PPid =:= ParentPid -> {Pid, Fun}
+ end)),
+ lists:filtermap(fun({Pid, Fun}) ->
+ case erlang:fun_info(Fun, name) of
+ {name, Name} -> {true, Pid};
+ _ -> false
+ end
+ end, AnonymousFuns).
+
+find_calls_to_fun(Pid, {Module, Function, Arity}) ->
+ select(ets:fun2ms(fun
+ ({call, P, _TS, _Name, _Dict, [{M, F, Args}]})
+ when length(Args) =:= Arity
+ andalso M =:= Module
+ andalso F =:= Function
+ andalso P =:= Pid
+ -> Args
+ end)).
+
+wait_spawn_event_for_pid(ParentPid) ->
+ wait_events(ets:fun2ms(fun
+ ({spawned, Pid, _TS, _Name, _Dict, [P, _]}) when P =:= ParentPid -> Pid
+ end)).
+
+wait_spawn_by_anonymous_fun(ParentPid, Name) ->
+ test_util:wait_other_value(fun() ->
+ find_spawned_by_anonymous_fun(ParentPid, Name)
+ end, []).
+
+updater_pid(IndexerPid) ->
+ {links, Links} = process_info(IndexerPid, links),
+ [Pid] = select_process_by_name_prefix(Links, "couch_index_updater:init/1"),
+ Pid.
+
+select_process_by_name_prefix(Pids, Name) ->
+ lists:filter(fun(Pid) ->
+ Key = couch_debug:process_name(Pid),
+ string:str(Key, Name) =:= 1
+ end, Pids).
+
+select(MatchSpec) ->
+ lists:filtermap(fun(Event) ->
+ case ets:test_ms(Event, MatchSpec) of
+ {ok, false} -> false;
+ {ok, Result} -> {true, Result};
+ _ -> false
+ end
+ end, tracer_events()).
+
+
+%% ========================
+%% Tracer related functions
+%% ------------------------
+tracer_new() ->
+ ets:new(?MODULE, [public, named_table]),
+ {ok, _Tracer} = dbg:tracer(process, {fun tracer_collector/2, 0}),
+ ok.
+
+tracer_delete() ->
+ dbg:stop_clear(),
+ (catch ets:delete(?MODULE)),
+ ok.
+
+tracer_record(Pid) ->
+ {ok, _} = dbg:tp(erlang, put, x),
+ {ok, _} = dbg:p(Pid, [c, p, sos]),
+ ok.
+
+tracer_events() ->
+ Events = [{Idx, E} || [Idx, E] <- ets:match(?MODULE, {{trace, '$1'}, '$2'})],
+ {_, Sorted} = lists:unzip(lists:keysort(1, Events)),
+ Sorted.
+
+tracer_collector(Msg, Seq) ->
+ ets:insert(?MODULE, {{trace, Seq}, normalize_trace_msg(Msg)}),
+ Seq + 1.
+
+normalize_trace_msg(TraceMsg) ->
+ case tuple_to_list(TraceMsg) of
+ [trace_ts, Pid, Type | Info] ->
+ {TraceInfo, [Timestamp]} = lists:split(length(Info)-1, Info),
+ {Type, Pid, Timestamp, couch_debug:process_name(Pid), process_info(Pid), TraceInfo};
+ [trace, Pid, Type | TraceInfo] ->
+ {Type, Pid, os:timestamp(), couch_debug:process_name(Pid), process_info(Pid), TraceInfo}
+ end.
diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl
index 61f406c1a..8849cf67d 100644
--- a/src/couch_index/src/couch_index_compactor.erl
+++ b/src/couch_index/src/couch_index_compactor.erl
@@ -117,6 +117,8 @@ compact(Parent, Mod, IdxState) ->
compact(Idx, Mod, IdxState, Opts) ->
DbName = Mod:get(db_name, IdxState),
+ IndexName = Mod:get(idx_name, IdxState),
+ erlang:put(io_priority, {view_compact, DbName, IndexName}),
Args = [DbName, Mod:get(idx_name, IdxState)],
couch_log:info("Compaction started for db: ~s idx: ~s", Args),
{ok, NewIdxState} = couch_util:with_db(DbName, fun(Db) ->
diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl
index 7864bde4d..fb15db052 100644
--- a/src/couch_index/src/couch_index_updater.erl
+++ b/src/couch_index/src/couch_index_updater.erl
@@ -128,6 +128,8 @@ code_change(_OldVsn, State, _Extra) ->
update(Idx, Mod, IdxState) ->
DbName = Mod:get(db_name, IdxState),
+ IndexName = Mod:get(idx_name, IdxState),
+ erlang:put(io_priority, {view_update, DbName, IndexName}),
CurrSeq = Mod:get(update_seq, IdxState),
UpdateOpts = Mod:get(update_options, IdxState),
CommittedOnly = lists:member(committed_only, UpdateOpts),
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 9740e6a28..18657b468 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -25,7 +25,6 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
QueueOpts = [{max_size, MaxSize}, {max_items, MaxItems}],
{ok, DocQueue} = couch_work_queue:new(QueueOpts),
{ok, WriteQueue} = couch_work_queue:new(QueueOpts),
-
InitState = State#mrst{
first_build=State#mrst.update_seq==0,
partial_resp_pid=Partial,
@@ -37,6 +36,8 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
Self = self(),
MapFun = fun() ->
+ erlang:put(io_priority,
+ {view_update, State#mrst.db_name, State#mrst.idx_name}),
Progress = case NumChanges of
0 -> 0;
_ -> (NumChangesDone * 100) div NumChanges
@@ -53,8 +54,11 @@ start_update(Partial, State, NumChanges, NumChangesDone) ->
couch_task_status:set_update_frequency(500),
map_docs(Self, InitState)
end,
- WriteFun = fun() -> write_results(Self, InitState) end,
-
+ WriteFun = fun() ->
+ erlang:put(io_priority,
+ {view_update, State#mrst.db_name, State#mrst.idx_name}),
+ write_results(Self, InitState)
+ end,
spawn_link(MapFun),
spawn_link(WriteFun),
@@ -219,7 +223,6 @@ write_results(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State) ->
stop ->
Parent ! {new_state, State};
{Go, {Seq, ViewKVs, DocIdKeys, Seqs, Log}} ->
- erlang:put(io_priority, {view_update, DbName, IdxName}),
NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys, Seqs, Log),
if Go == stop ->
Parent ! {new_state, NewState};