diff options
author | Russell Branca <chewbranca@apache.org> | 2021-08-17 15:09:28 -0700 |
---|---|---|
committer | Russell Branca <chewbranca@apache.org> | 2021-08-17 15:09:28 -0700 |
commit | 3dc261da0e0ab0c1428be01e0520aff777f5f5bd (patch) | |
tree | 8126a0a1df68fa4c443b11e503af2d9255c9b534 | |
parent | 2cdfb8f967c0860ea2f04554757925731b78106e (diff) | |
download | couchdb-3dc261da0e0ab0c1428be01e0520aff777f5f5bd.tar.gz |
Rework cache and IOQ2 pid per couch_file
-rw-r--r-- | src/couch/src/couch_bt_engine.erl | 11 | ||||
-rw-r--r-- | src/couch/src/couch_db.erl | 2 | ||||
-rw-r--r-- | src/couch/src/couch_file.erl | 105 |
3 files changed, 65 insertions, 53 deletions
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 60f79c331..dc5bbfcbb 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -117,6 +117,7 @@ -include_lib("kernel/include/file.hrl"). -include_lib("couch/include/couch_db.hrl"). -include("couch_bt_engine.hrl"). +-include_lib("ioq/include/ioq.hrl"). exists(FilePath) -> @@ -192,8 +193,8 @@ handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor=Ref} = St) -> {stop, normal, St#st{fd=undefined, fd_monitor=closed}}. -incref(St) -> - {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}. +incref(#st{fd=#ioq_file{fd=Fd}}=St) -> + {ok, St#st{fd_monitor = erlang:monitor(process, Fd)}}. decref(St) -> @@ -201,8 +202,8 @@ decref(St) -> ok. -monitored_by(St) -> - case erlang:process_info(St#st.fd, monitored_by) of +monitored_by(#st{fd=#ioq_file{fd=Fd}}=St) -> + case erlang:process_info(Fd, monitored_by) of {monitored_by, Pids} -> lists:filter(fun is_pid/1, Pids); _ -> @@ -920,7 +921,7 @@ init_state(FilePath, Fd, Header0, Options) -> St = #st{ filepath = FilePath, fd = Fd, - fd_monitor = erlang:monitor(process, Fd), + fd_monitor = erlang:monitor(process, Fd#ioq_file.fd), header = Header, needs_commit = false, id_tree = IdTree, diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 8837101ec..aec0f5408 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -757,6 +757,8 @@ set_security(_, _) -> throw(bad_request). set_user_ctx(#db{} = Db, UserCtx) -> + %% TODO: + %% couch_db_engine:set_user_ctx(Db, UserCtx), {ok, Db#db{user_ctx = UserCtx}}. validate_security_object(SecProps) -> diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl index b29c54dd2..675e6d60e 100644 --- a/src/couch/src/couch_file.erl +++ b/src/couch/src/couch_file.erl @@ -15,6 +15,7 @@ -vsn(2). -include_lib("couch/include/couch_db.hrl"). +-include_lib("ioq/include/ioq.hrl"). -define(INITIAL_WAIT, 60000). @@ -33,12 +34,13 @@ is_sys, eof = 0, db_monitor, - pread_limit = 0 + pread_limit = 0, + tab }). % public API -export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]). --export([pread_term/2, pread_term/3, pread_iolist/2, pread_binary/2]). +-export([pread_term/2, pread_iolist/2, pread_binary/2]). -export([append_binary/2, append_binary_md5/2]). -export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]). -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]). @@ -69,7 +71,9 @@ open(Filepath, Options) -> case gen_server:start_link(couch_file, {Filepath, Options, self(), Ref = make_ref()}, []) of {ok, Fd} -> - {ok, Fd}; + {ok, IOQPid} = ioq_server2:start_link({by_shard, Filepath, Fd}), + Tab = gen_server:call(Fd, get_cache_ref), + {ok, #ioq_file{fd=Fd, ioq=IOQPid, tab=Tab}}; ignore -> % get the error receive @@ -94,7 +98,7 @@ open(Filepath, Options) -> end. -set_db_pid(Fd, Pid) -> +set_db_pid(#ioq_file{fd=Fd}, Pid) -> gen_server:call(Fd, {set_db_pid, Pid}). @@ -156,36 +160,20 @@ assemble_file_chunk(Bin, Md5) -> %%---------------------------------------------------------------------- pread_term(Fd, Pos) -> - UseCache = config:get_boolean("couchdb", "use_couch_file_cache", true), - pread_term(Fd, Pos, UseCache). - - -pread_term(Fd, Pos, true) -> - case erlang:get(couch_file_hash) of - undefined -> - pread_term(Fd, Pos, false); - _ -> - load_from_cache(Fd, Pos) - end; -pread_term(Fd, Pos, false) -> {ok, Bin} = pread_binary(Fd, Pos), {ok, couch_compress:decompress(Bin)}. %% TODO: add purpose docs -load_from_cache(Fd, Pos) -> - Hash = erlang:get(couch_file_hash), - case ets:lookup(Hash, Pos) of - [{Pos, {ok, Res}}] -> - {ok, Res}; +load_from_cache(#ioq_file{tab=undefined}, _Pos) -> + missing; +load_from_cache(#ioq_file{tab=Tab}, Pos) -> + case ets:lookup(Tab, Pos) of + [{Pos, Res}] -> + %% io:format("CACHE HIT: ~p{~p}~n", [Tab, Pos]), + Res; [] -> - %% TODO: don't repeat this, but avoid circular recursion - %% pread_term(Fd, Pos, false), - {ok, Bin} = pread_binary(Fd, Pos), - Val = {ok, couch_compress:decompress(Bin)}, - %% TODO: should probably be inserted directly by the gen_server - gen_server:cast(Fd, {cache, Pos, Val}), - Val + missing end. @@ -202,11 +190,16 @@ pread_binary(Fd, Pos) -> pread_iolist(Fd, Pos) -> - case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of + case load_from_cache(Fd, Pos) of {ok, IoList, Md5} -> {ok, verify_md5(Fd, Pos, IoList, Md5)}; - Error -> - Error + missing -> + case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of + {ok, IoList, Md5} -> + {ok, verify_md5(Fd, Pos, IoList, Md5)}; + Error -> + Error + end end. @@ -259,7 +252,7 @@ append_binaries(Fd, Bins) -> %%---------------------------------------------------------------------- % length in bytes -bytes(Fd) -> +bytes(#ioq_file{fd=Fd}) -> gen_server:call(Fd, bytes, infinity). %%---------------------------------------------------------------------- @@ -268,7 +261,7 @@ bytes(Fd) -> %% or {error, Reason}. %%---------------------------------------------------------------------- -truncate(Fd, Pos) -> +truncate(#ioq_file{fd=Fd}, Pos) -> gen_server:call(Fd, {truncate, Pos}, infinity). %%---------------------------------------------------------------------- @@ -293,7 +286,7 @@ sync(Filepath) when is_list(Filepath) -> {error, Error} -> erlang:error(Error) end; -sync(Fd) -> +sync(#ioq_file{fd=Fd}) -> case gen_server:call(Fd, sync, infinity) of ok -> ok; @@ -305,8 +298,10 @@ sync(Fd) -> %% Purpose: Close the file. %% Returns: ok %%---------------------------------------------------------------------- -close(Fd) -> - gen_server:call(Fd, close, infinity). +close(#ioq_file{fd=Fd, ioq=IOP}) -> + Res = gen_server:call(Fd, close, infinity), + gen_server:call(IOP, close, infinity), + Res. delete(RootDir, Filepath) -> @@ -423,7 +418,7 @@ init_status_error(ReturnPid, Ref, Error) -> ignore. -last_read(Fd) when is_pid(Fd) -> +last_read(#ioq_file{fd=Fd}) when is_pid(Fd) -> Now = os:timestamp(), couch_util:process_dict_get(Fd, read_timestamp, Now). @@ -435,9 +430,13 @@ init({Filepath, Options, ReturnPid, Ref}) -> Limit = get_pread_limit(), IsSys = lists:member(sys_db, Options), update_read_timestamp(), - Tab = list_to_atom(integer_to_list(mem3_hash:crc32(Filepath))), - erlang:put(couch_file_cache, Tab), - ets:new(Tab, [set, protected, named_table, {read_concurrency, true}]), + ShouldCache = config:get_boolean("couchdb", "couch_file_cache", true), + Tab = case ShouldCache of + true -> + ets:new(?MODULE, [set, protected, {read_concurrency, true}]); + false -> + undefined + end, case lists:member(create, Options) of true -> filelib:ensure_dir(Filepath), @@ -458,7 +457,7 @@ init({Filepath, Options, ReturnPid, Ref}) -> ok = file:sync(Fd), maybe_track_open_os_files(Options), erlang:send_after(?INITIAL_WAIT, self(), maybe_close), - {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}}; + {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}}; false -> ok = file:close(Fd), init_status_error(ReturnPid, Ref, {error, eexist}) @@ -466,7 +465,7 @@ init({Filepath, Options, ReturnPid, Ref}) -> false -> maybe_track_open_os_files(Options), erlang:send_after(?INITIAL_WAIT, self(), maybe_close), - {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}} + {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}} end; Error -> init_status_error(ReturnPid, Ref, Error) @@ -483,7 +482,7 @@ init({Filepath, Options, ReturnPid, Ref}) -> maybe_track_open_os_files(Options), {ok, Eof} = file:position(Fd, eof), erlang:send_after(?INITIAL_WAIT, self(), maybe_close), - {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit}}; + {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit, tab=Tab}}; Error -> init_status_error(ReturnPid, Ref, Error) end; @@ -522,15 +521,17 @@ handle_call(close, _From, #file{fd=Fd}=File) -> handle_call({pread_iolist, Pos}, _From, File) -> update_read_timestamp(), {LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4), - case iolist_to_binary(LenIolist) of + Resp = case iolist_to_binary(LenIolist) of <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16), {Md5, IoList} = extract_md5(Md5AndIoList), - {reply, {ok, IoList, Md5}, File}; + {ok, IoList, Md5}; <<0:1/integer,Len:31/integer>> -> {Iolist, _} = read_raw_iolist_int(File, NextPos, Len), - {reply, {ok, Iolist, <<>>}, File} - end; + {ok, Iolist, <<>>} + end, + maybe_cache(File#file.tab, {Pos, Resp}), + {reply, Resp, File}; handle_call({pread_iolists, PosL}, _From, File) -> update_read_timestamp(), @@ -629,7 +630,10 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) -> end; handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) -> - {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}. + {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}; + +handle_call(get_cache_ref, _From, #file{tab=Tab} = File) -> + {reply, Tab, File}. handle_cast({cache, Key, Val}, Fd) -> @@ -909,6 +913,11 @@ reset_eof(#file{} = File) -> {ok, Eof} = file:position(File#file.fd, eof), File#file{eof = Eof}. +maybe_cache(undefined, _Obj) -> + ok; +maybe_cache(Tab, Obj) -> + ets:insert(Tab, Obj). + -ifdef(TEST). -include_lib("couch/include/couch_eunit.hrl"). |