diff options
author | Russell Branca <chewbranca@apache.org> | 2019-03-06 19:31:04 +0000 |
---|---|---|
committer | Russell Branca <chewbranca@apache.org> | 2019-03-06 19:31:04 +0000 |
commit | b85c17d569ac6e631a02acc0eecf2bdc6090c67c (patch) | |
tree | f89d432b9f789cf011466562976d06a175badc6d | |
parent | 3b4cf711a13ac8afd41271118fca69f5c4310edf (diff) | |
download | couchdb-b85c17d569ac6e631a02acc0eecf2bdc6090c67c.tar.gz |
Ensure io_priority is explicitly set throughout CouchDB
22 files changed, 94 insertions, 5 deletions
diff --git a/src/chttpd/src/chttpd_auth_cache.erl b/src/chttpd/src/chttpd_auth_cache.erl index 638d8c748..17e448f51 100644 --- a/src/chttpd/src/chttpd_auth_cache.erl +++ b/src/chttpd/src/chttpd_auth_cache.erl @@ -139,7 +139,9 @@ spawn_changes(Since) -> Pid. listen_for_changes(Since) -> - ensure_auth_ddoc_exists(dbname(), <<"_design/_auth">>), + DbName = dbname(), + erlang:put(io_priority, {system, DbName}), + ensure_auth_ddoc_exists(DbName, <<"_design/_auth">>), CBFun = fun ?MODULE:changes_callback/2, Args = #changes_args{ feed = "continuous", @@ -147,7 +149,7 @@ listen_for_changes(Since) -> heartbeat = true, filter = {default, main_only} }, - fabric:changes(dbname(), CBFun, Since, Args). + fabric:changes(DbName, CBFun, Since, Args). changes_callback(waiting_for_updates, Acc) -> {ok, Acc}; diff --git a/src/couch/src/couch_auth_cache.erl b/src/couch/src/couch_auth_cache.erl index 157b0902e..3f40bab11 100644 --- a/src/couch/src/couch_auth_cache.erl +++ b/src/couch/src/couch_auth_cache.erl @@ -150,6 +150,7 @@ init(_) -> ?BY_USER = ets:new(?BY_USER, [set, protected, named_table]), ?BY_ATIME = ets:new(?BY_ATIME, [ordered_set, private, named_table]), AuthDbName = config:get("couch_httpd_auth", "authentication_db"), + erlang:put(io_priority, {system, AuthDbName}), process_flag(trap_exit, true), ok = config:listen_for_changes(?MODULE, nil), {ok, Listener} = couch_event:link_listener( diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl index 4a49372c7..17d559708 100644 --- a/src/couch/src/couch_doc.erl +++ b/src/couch/src/couch_doc.erl @@ -433,6 +433,13 @@ len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, SendEncodedAtts) -> doc_to_multi_part_stream(Boundary, JsonBytes, Atts, WriteFun, SendEncodedAtts) -> + case erlang:get(io_priority) of + undefined -> + %% TODO: use proper dbname here + erlang:put(interactive, <<"asdf">>); + _ -> + ok + end, AttsToInclude = lists:filter(fun(Att)-> couch_att:fetch(data, Att) /= stub end, Atts), AttsDecoded = decode_attributes(AttsToInclude, SendEncodedAtts), AttFun = case SendEncodedAtts of diff --git a/src/couch/src/couch_httpd.erl b/src/couch/src/couch_httpd.erl index 3cdfc0ca3..eea98d533 100644 --- a/src/couch/src/couch_httpd.erl +++ b/src/couch/src/couch_httpd.erl @@ -229,6 +229,8 @@ handle_request(MochiReq, DefaultFun, UrlHandlers, DbUrlHandlers, handle_request_int(MochiReq, DefaultFun, UrlHandlers, DbUrlHandlers, DesignUrlHandlers) -> + %% TODO: find appropriate value for internal API + erlang:put(io_priority, {interactive, <<"asdf">>}), Begin = os:timestamp(), % for the path, use the raw path with the query string and fragment % removed, but URL quoting left intact diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl index 6cfae9610..2dd2b107d 100644 --- a/src/couch/src/couch_httpd_db.erl +++ b/src/couch/src/couch_httpd_db.erl @@ -39,6 +39,8 @@ % Database request handlers handle_request(#httpd{path_parts=[DbName|RestParts],method=Method, db_url_handlers=DbUrlHandlers}=Req)-> + %% TODO: set proper io_priority value here + erlang:put(io_priority, {interactive, DbName}), case {Method, RestParts} of {'PUT', []} -> create_db_req(Req, DbName); diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl index 33795a3a1..b2e1851ca 100644 --- a/src/couch/src/couch_httpd_multipart.erl +++ b/src/couch/src/couch_httpd_multipart.erl @@ -26,7 +26,9 @@ decode_multipart_stream(ContentType, DataFun, Ref) -> Parent = self(), NumMpWriters = num_mp_writers(), + Priority = erlang:get(io_priority), {Parser, ParserRef} = spawn_monitor(fun() -> + erlang:put(io_priority, Priority), ParentRef = erlang:monitor(process, Parent), put(mp_parent_ref, ParentRef), num_mp_writers(NumMpWriters), @@ -244,6 +246,14 @@ atts_to_mp([], _Boundary, WriteFun, _AttFun) -> WriteFun(<<"--">>); atts_to_mp([{Att, Name, Len, Type, Encoding} | RestAtts], Boundary, WriteFun, AttFun) -> + case erlang:get(io_priority) of + undefined -> + %% TODO: set proper io_priority + %% TODO: this case shouldn't happen + erlang:put(io_priority, {interactive, <<"asdf">>}); + _ -> + ok + end, LengthBin = list_to_binary(integer_to_list(Len)), % write headers WriteFun(<<"\r\nContent-Disposition: attachment; filename=\"", Name/binary, "\"">>), diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl index b6a7873fb..95e0776bb 100644 --- a/src/couch/src/couch_multidb_changes.erl +++ b/src/couch/src/couch_multidb_changes.erl @@ -231,6 +231,7 @@ start_changes_reader(DbName, Since) -> changes_reader(Server, DbName, Since) -> + erlang:put(io_priority, {system, DbName}), {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]), ChangesArgs = #changes_args{ include_docs = true, @@ -253,9 +254,10 @@ changes_reader_cb(_, _, Acc) -> scan_all_dbs(Server, DbSuffix) when is_pid(Server) -> + DbName = config:get("mem3", "shards_db", "_dbs"), + erlang:put(io_priority, {system, DbName}), ok = scan_local_db(Server, DbSuffix), - {ok, Db} = mem3_util:ensure_exists( - config:get("mem3", "shards_db", "_dbs")), + {ok, Db} = mem3_util:ensure_exists(DbName), ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil), ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}), couch_db:close(Db). diff --git a/src/couch_index/src/couch_index.erl b/src/couch_index/src/couch_index.erl index cae95779e..b593e4971 100644 --- a/src/couch_index/src/couch_index.erl +++ b/src/couch_index/src/couch_index.erl @@ -81,6 +81,7 @@ get_compactor_pid(Pid) -> init({Mod, IdxState}) -> DbName = Mod:get(db_name, IdxState), + erlang:put(io_priority, {system, DbName}), erlang:send_after(?CHECK_INTERVAL, self(), maybe_close), Resp = couch_util:with_db(DbName, fun(Db) -> case Mod:open(Db, IdxState) of diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl index 61f406c1a..8849cf67d 100644 --- a/src/couch_index/src/couch_index_compactor.erl +++ b/src/couch_index/src/couch_index_compactor.erl @@ -117,6 +117,8 @@ compact(Parent, Mod, IdxState) -> compact(Idx, Mod, IdxState, Opts) -> DbName = Mod:get(db_name, IdxState), + IndexName = Mod:get(idx_name, IdxState), + erlang:put(io_priority, {view_compact, DbName, IndexName}), Args = [DbName, Mod:get(idx_name, IdxState)], couch_log:info("Compaction started for db: ~s idx: ~s", Args), {ok, NewIdxState} = couch_util:with_db(DbName, fun(Db) -> diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl index 7864bde4d..fb15db052 100644 --- a/src/couch_index/src/couch_index_updater.erl +++ b/src/couch_index/src/couch_index_updater.erl @@ -128,6 +128,8 @@ code_change(_OldVsn, State, _Extra) -> update(Idx, Mod, IdxState) -> DbName = Mod:get(db_name, IdxState), + IndexName = Mod:get(idx_name, IdxState), + erlang:put(io_priority, {view_update, DbName, IndexName}), CurrSeq = Mod:get(update_seq, IdxState), UpdateOpts = Mod:get(update_options, IdxState), CommittedOnly = lists:member(committed_only, UpdateOpts), diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl index dcc70e4c2..82d1f6599 100644 --- a/src/couch_mrview/src/couch_mrview_updater.erl +++ b/src/couch_mrview/src/couch_mrview_updater.erl @@ -225,6 +225,7 @@ map_docs(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State0) -> write_results(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State) -> + erlang:put(io_priority, {view_update, DbName, IdxName}), case accumulate_writes(State, State#mrst.write_queue, nil) of stop -> Parent ! {new_state, State}; diff --git a/src/couch_peruser/src/couch_peruser.erl b/src/couch_peruser/src/couch_peruser.erl index 886fb4f6e..08fc3d7a0 100644 --- a/src/couch_peruser/src/couch_peruser.erl +++ b/src/couch_peruser/src/couch_peruser.erl @@ -73,6 +73,7 @@ init_state() -> couch_log:debug("peruser: enabled on node ~p", [node()]), DbName = ?l2b(config:get( "couch_httpd_auth", "authentication_db", "_users")), + erlang:put(io_priority, {system, DbName}), DeleteDbs = config:get_boolean("couch_peruser", "delete_dbs", false), Q = config:get_integer("couch_peruser", "q", 1), Prefix = config:get("couch_peruser", "database_prefix", ?DEFAULT_USERDB_PREFIX), @@ -137,6 +138,7 @@ start_listening(#state{db_name=DbName, delete_dbs=DeleteDbs, -spec init_changes_handler(ChangesState :: #changes_state{}) -> ok. init_changes_handler(#changes_state{db_name=DbName} = ChangesState) -> + erlang:put(io_priority, {system, DbName}), % couch_log:debug("peruser: init_changes_handler() on DbName ~p", [DbName]), try {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX, sys_db]), diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index 44c290d33..825b70678 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -216,6 +216,10 @@ ensure_full_commit(Db) -> get_missing_revs(#httpdb{} = Db, IdRevs) -> + case erlang:get(io_priority) of + undefined -> erlang:put(io_priority, {system, db_uri(Db)}); + _ -> ok + end, JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]}, send_req( Db, @@ -234,6 +238,10 @@ get_missing_revs(#httpdb{} = Db, IdRevs) -> {ok, lists:map(ConvertToNativeFun, Props)} end); get_missing_revs(Db, IdRevs) -> + case erlang:get(io_priority) of + undefined -> erlang:put(io_priority, {system, couch_db:name(Db)}); + _ -> ok + end, couch_db:get_missing_revs(Db, IdRevs). @@ -250,6 +258,7 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) -> Path = encode_doc_id(Id), QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]), {Pid, Ref} = spawn_monitor(fun() -> + erlang:put(io_priority, {system, db_uri(HttpDb)}), Self = self(), Callback = fun (200, Headers, StreamDataFun) -> @@ -753,6 +762,8 @@ receive_docs(Streamer, UserFun, Ref, UserAcc) -> run_user_fun(UserFun, Arg, UserAcc, OldRef) -> {Pid, Ref} = spawn_monitor(fun() -> + %% TODO: find proper db name value here + erlang:put(io_priority, {system, <<"asdf">>}), try UserFun(Arg, UserAcc) of Resp -> exit({exit_ok, Resp}) diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl index 2e4df5365..c0f2d7171 100644 --- a/src/couch_replicator/src/couch_replicator_changes_reader.erl +++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl @@ -37,6 +37,7 @@ start_link(StartSeq, #httpdb{} = Db, ChangesQueue, Options) -> start_link(StartSeq, Db, ChangesQueue, Options) -> Parent = self(), {ok, spawn_link(fun() -> + erlang:put(io_priority, {interactive, couch_db:name(Db)}), ?MODULE:read_changes(Parent, StartSeq, Db, ChangesQueue, Options) end)}. diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index bbf9694d7..178586198 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -123,6 +123,10 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) -> -spec ensure_rep_db_exists() -> {ok, Db::any()}. ensure_rep_db_exists() -> + case erlang:get(io_priority) of + undefined -> erlang:put(io_priority, {system, ?REP_DB_NAME}); + _ -> ok + end, Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db, nologifmissing]) of {ok, Db0} -> @@ -137,6 +141,10 @@ ensure_rep_db_exists() -> -spec ensure_rep_ddoc_exists(binary()) -> ok. ensure_rep_ddoc_exists(RepDb) -> + case erlang:get(io_priority) of + undefined -> erlang:put(io_priority, {system, ?REP_DB_NAME}); + _ -> ok + end, case mem3:belongs(RepDb, ?REP_DESIGN_DOC) of true -> ensure_rep_ddoc_exists(RepDb, ?REP_DESIGN_DOC); @@ -147,6 +155,10 @@ ensure_rep_ddoc_exists(RepDb) -> -spec ensure_rep_ddoc_exists(binary(), binary()) -> ok. ensure_rep_ddoc_exists(RepDb, DDocId) -> + case erlang:get(io_priority) of + undefined -> erlang:put(io_priority, {system, ?REP_DB_NAME}); + _ -> ok + end, case open_rep_doc(RepDb, DDocId) of {not_found, no_db_file} -> %% database was deleted. diff --git a/src/couch_replicator/src/couch_replicator_filters.erl b/src/couch_replicator/src/couch_replicator_filters.erl index 5668820d1..b8bec56cb 100644 --- a/src/couch_replicator/src/couch_replicator_filters.erl +++ b/src/couch_replicator/src/couch_replicator_filters.erl @@ -66,7 +66,9 @@ parse(Options) -> -spec fetch(binary(), binary(), binary(), #user_ctx{}) -> {ok, {[_]}} | {error, binary()}. fetch(DDocName, FilterName, Source, UserCtx) -> + Priority = erlang:get(io_priority), {Pid, Ref} = spawn_monitor(fun() -> + erlang:put(io_priority, Priority), try fetch_internal(DDocName, FilterName, Source, UserCtx) of Resp -> exit({exit_ok, Resp}) diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 412ff7d05..a05f80064 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -109,6 +109,9 @@ init(InitArgs) -> do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> process_flag(trap_exit, true), + %% TODO: what value to put here + %% what is using this? + erlang:put(io_priority, {db_update, <<"asdf">>}), timer:sleep(startup_jitter()), diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index ec98fa0f3..8425ae9b9 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -76,6 +76,7 @@ start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) -> start_link(Cp, Source, Target, ChangesManager, _MaxConns) -> Pid = spawn_link(fun() -> erlang:put(last_stats_report, os:timestamp()), + erlang:put(io_priority, {interactive, couch_db:name(Source)}), queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager) end), {ok, Pid}. @@ -84,7 +85,11 @@ start_link(Cp, Source, Target, ChangesManager, _MaxConns) -> init({Cp, Source, Target, ChangesManager, MaxConns}) -> process_flag(trap_exit, true), Parent = self(), + %% TODO: set proper dbname value here, extract from #httpdb{}/#db{} + Priority = {interactive, <<"asdf">>}, + erlang:put(io_priority, Priority), LoopPid = spawn_link(fun() -> + erlang:put(io_priority, Priority), queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) end), erlang:put(last_stats_report, os:timestamp()), @@ -408,8 +413,10 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) -> ok end, Parent = self(), + Priority = erlang:get(io_priority), spawn_link( fun() -> + erlang:put(io_priority, Priority), Target2 = open_db(Target), Stats = flush_docs(Target2, DocList), close_db(Target2), diff --git a/src/global_changes/src/global_changes_server.erl b/src/global_changes/src/global_changes_server.erl index 7e3062586..5f66237da 100644 --- a/src/global_changes/src/global_changes_server.erl +++ b/src/global_changes/src/global_changes_server.erl @@ -60,6 +60,9 @@ init([]) -> UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end, MaxWriteDelay = list_to_integer(MaxWriteDelay0), + GlobalChangesDbName = global_changes_util:get_dbname(), + erlang:put(io_priority, {db_update, GlobalChangesDbName}), + % Start our write triggers erlang:send_after(MaxWriteDelay, self(), flush_updates), @@ -68,7 +71,7 @@ init([]) -> pending_update_count=0, pending_updates=sets:new(), max_write_delay=MaxWriteDelay, - dbname=global_changes_util:get_dbname(), + dbname=GlobalChangesDbName, handler_ref=erlang:monitor(process, Handler) }, {ok, State}. diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl index dd5be1a72..8fbe7c65e 100644 --- a/src/mem3/src/mem3_nodes.erl +++ b/src/mem3/src/mem3_nodes.erl @@ -41,6 +41,8 @@ get_node_info(Node, Key) -> end. init([]) -> + DbName = list_to_binary(config:get("mem3", "nodes_db", "_nodes")), + erlang:put(io_priority, {system, DbName}), ets:new(?MODULE, [named_table, {read_concurrency, true}]), UpdateSeq = initialize_nodelist(), {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end), @@ -109,6 +111,7 @@ first_fold(#full_doc_info{id=Id}=DocInfo, Db) -> listen_for_changes(Since) -> DbName = config:get("mem3", "nodes_db", "_nodes"), + erlang:put(io_priority, {system, DbName}), {ok, Db} = mem3_util:ensure_exists(DbName), Args = #changes_args{ feed = "continuous", diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index 6afc22f57..5e34921f0 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -198,6 +198,8 @@ init([]) -> ok = config:listen_for_changes(?MODULE, nil), SizeList = config:get("mem3", "shard_cache_size", "25000"), WriteTimeout = config:get_integer("mem3", "shard_write_timeout", 1000), + DbName = config:get("mem3", "shards_db", "_dbs"), + erlang:put(io_priority, {system, DbName}), UpdateSeq = get_update_seq(), {ok, #st{ max_size = list_to_integer(SizeList), @@ -319,6 +321,7 @@ get_update_seq() -> listen_for_changes(Since) -> DbName = config:get("mem3", "shards_db", "_dbs"), + erlang:put(io_priority, {system, DbName}), {ok, Db} = mem3_util:ensure_exists(DbName), Args = #changes_args{ feed = "continuous", @@ -365,6 +368,10 @@ changes_callback(timeout, _) -> load_shards_from_disk(DbName) when is_binary(DbName) -> couch_stats:increment_counter([mem3, shard_cache, miss]), X = ?l2b(config:get("mem3", "shards_db", "_dbs")), + case erlang:get(io_priority) of + undefined -> erlang:put(io_priority, {system, X}); + _ -> ok + end, {ok, Db} = mem3_util:ensure_exists(X), try load_shards_from_db(Db, DbName) @@ -373,6 +380,10 @@ load_shards_from_disk(DbName) when is_binary(DbName) -> end. load_shards_from_db(ShardDb, DbName) -> + case erlang:get(io_priority) of + undefined -> erlang:put(io_priority, {system, couch_db:name(ShardDb)}); + _ -> ok + end, case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of {ok, #doc{body = {Props}}} -> Seq = couch_db:get_update_seq(ShardDb), diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl index b44ca2332..44647838d 100644 --- a/src/mem3/src/mem3_util.erl +++ b/src/mem3/src/mem3_util.erl @@ -85,6 +85,7 @@ write_db_doc(Doc) -> write_db_doc(DbName, Doc, true). write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) -> + erlang:put(io_priority, {system, DbName}), {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]), try couch_db:open_doc(Db, Id, [ejson_body]) of {ok, #doc{body = Body}} -> @@ -111,6 +112,7 @@ delete_db_doc(DocId) -> delete_db_doc(DbName, DocId, true). delete_db_doc(DbName, DocId, ShouldMutate) -> + erlang:put(io_priority, {system, DbName}), {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]), {ok, Revs} = couch_db:open_doc_revs(Db, DocId, all, []), try [Doc#doc{deleted=true} || {ok, #doc{deleted=false}=Doc} <- Revs] of |