summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRussell Branca <chewbranca@apache.org>2021-08-17 15:09:28 -0700
committerRussell Branca <chewbranca@apache.org>2021-08-25 12:49:56 -0700
commit05f6b0672985c469a75cb54a887b2dab94d3a82d (patch)
treeb94b2e1ffc7b4a6dbd9fcf20a9d0f127e974f16c
parentb5bc42915faea0f6424d5b7d9a99bd9c6691ffb4 (diff)
downloadcouchdb-05f6b0672985c469a75cb54a887b2dab94d3a82d.tar.gz
Rework cache and IOQ2 pid per couch_file
-rw-r--r--src/couch/src/couch_bt_engine.erl11
-rw-r--r--src/couch/src/couch_db.erl2
-rw-r--r--src/couch/src/couch_file.erl105
3 files changed, 65 insertions, 53 deletions
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 60f79c331..dc5bbfcbb 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -117,6 +117,7 @@
-include_lib("kernel/include/file.hrl").
-include_lib("couch/include/couch_db.hrl").
-include("couch_bt_engine.hrl").
+-include_lib("ioq/include/ioq.hrl").
exists(FilePath) ->
@@ -192,8 +193,8 @@ handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor=Ref} = St) ->
{stop, normal, St#st{fd=undefined, fd_monitor=closed}}.
-incref(St) ->
- {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}.
+incref(#st{fd=#ioq_file{fd=Fd}}=St) ->
+ {ok, St#st{fd_monitor = erlang:monitor(process, Fd)}}.
decref(St) ->
@@ -201,8 +202,8 @@ decref(St) ->
ok.
-monitored_by(St) ->
- case erlang:process_info(St#st.fd, monitored_by) of
+monitored_by(#st{fd=#ioq_file{fd=Fd}}=St) ->
+ case erlang:process_info(Fd, monitored_by) of
{monitored_by, Pids} ->
lists:filter(fun is_pid/1, Pids);
_ ->
@@ -920,7 +921,7 @@ init_state(FilePath, Fd, Header0, Options) ->
St = #st{
filepath = FilePath,
fd = Fd,
- fd_monitor = erlang:monitor(process, Fd),
+ fd_monitor = erlang:monitor(process, Fd#ioq_file.fd),
header = Header,
needs_commit = false,
id_tree = IdTree,
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 8837101ec..aec0f5408 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -757,6 +757,8 @@ set_security(_, _) ->
throw(bad_request).
set_user_ctx(#db{} = Db, UserCtx) ->
+ %% TODO:
+ %% couch_db_engine:set_user_ctx(Db, UserCtx),
{ok, Db#db{user_ctx = UserCtx}}.
validate_security_object(SecProps) ->
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index b29c54dd2..675e6d60e 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -15,6 +15,7 @@
-vsn(2).
-include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
-define(INITIAL_WAIT, 60000).
@@ -33,12 +34,13 @@
is_sys,
eof = 0,
db_monitor,
- pread_limit = 0
+ pread_limit = 0,
+ tab
}).
% public API
-export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
--export([pread_term/2, pread_term/3, pread_iolist/2, pread_binary/2]).
+-export([pread_term/2, pread_iolist/2, pread_binary/2]).
-export([append_binary/2, append_binary_md5/2]).
-export([append_raw_chunk/2, assemble_file_chunk/1, assemble_file_chunk/2]).
-export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]).
@@ -69,7 +71,9 @@ open(Filepath, Options) ->
case gen_server:start_link(couch_file,
{Filepath, Options, self(), Ref = make_ref()}, []) of
{ok, Fd} ->
- {ok, Fd};
+ {ok, IOQPid} = ioq_server2:start_link({by_shard, Filepath, Fd}),
+ Tab = gen_server:call(Fd, get_cache_ref),
+ {ok, #ioq_file{fd=Fd, ioq=IOQPid, tab=Tab}};
ignore ->
% get the error
receive
@@ -94,7 +98,7 @@ open(Filepath, Options) ->
end.
-set_db_pid(Fd, Pid) ->
+set_db_pid(#ioq_file{fd=Fd}, Pid) ->
gen_server:call(Fd, {set_db_pid, Pid}).
@@ -156,36 +160,20 @@ assemble_file_chunk(Bin, Md5) ->
%%----------------------------------------------------------------------
pread_term(Fd, Pos) ->
- UseCache = config:get_boolean("couchdb", "use_couch_file_cache", true),
- pread_term(Fd, Pos, UseCache).
-
-
-pread_term(Fd, Pos, true) ->
- case erlang:get(couch_file_hash) of
- undefined ->
- pread_term(Fd, Pos, false);
- _ ->
- load_from_cache(Fd, Pos)
- end;
-pread_term(Fd, Pos, false) ->
{ok, Bin} = pread_binary(Fd, Pos),
{ok, couch_compress:decompress(Bin)}.
%% TODO: add purpose docs
-load_from_cache(Fd, Pos) ->
- Hash = erlang:get(couch_file_hash),
- case ets:lookup(Hash, Pos) of
- [{Pos, {ok, Res}}] ->
- {ok, Res};
+load_from_cache(#ioq_file{tab=undefined}, _Pos) ->
+ missing;
+load_from_cache(#ioq_file{tab=Tab}, Pos) ->
+ case ets:lookup(Tab, Pos) of
+ [{Pos, Res}] ->
+ %% io:format("CACHE HIT: ~p{~p}~n", [Tab, Pos]),
+ Res;
[] ->
- %% TODO: don't repeat this, but avoid circular recursion
- %% pread_term(Fd, Pos, false),
- {ok, Bin} = pread_binary(Fd, Pos),
- Val = {ok, couch_compress:decompress(Bin)},
- %% TODO: should probably be inserted directly by the gen_server
- gen_server:cast(Fd, {cache, Pos, Val}),
- Val
+ missing
end.
@@ -202,11 +190,16 @@ pread_binary(Fd, Pos) ->
pread_iolist(Fd, Pos) ->
- case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+ case load_from_cache(Fd, Pos) of
{ok, IoList, Md5} ->
{ok, verify_md5(Fd, Pos, IoList, Md5)};
- Error ->
- Error
+ missing ->
+ case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+ {ok, IoList, Md5} ->
+ {ok, verify_md5(Fd, Pos, IoList, Md5)};
+ Error ->
+ Error
+ end
end.
@@ -259,7 +252,7 @@ append_binaries(Fd, Bins) ->
%%----------------------------------------------------------------------
% length in bytes
-bytes(Fd) ->
+bytes(#ioq_file{fd=Fd}) ->
gen_server:call(Fd, bytes, infinity).
%%----------------------------------------------------------------------
@@ -268,7 +261,7 @@ bytes(Fd) ->
%% or {error, Reason}.
%%----------------------------------------------------------------------
-truncate(Fd, Pos) ->
+truncate(#ioq_file{fd=Fd}, Pos) ->
gen_server:call(Fd, {truncate, Pos}, infinity).
%%----------------------------------------------------------------------
@@ -293,7 +286,7 @@ sync(Filepath) when is_list(Filepath) ->
{error, Error} ->
erlang:error(Error)
end;
-sync(Fd) ->
+sync(#ioq_file{fd=Fd}) ->
case gen_server:call(Fd, sync, infinity) of
ok ->
ok;
@@ -305,8 +298,10 @@ sync(Fd) ->
%% Purpose: Close the file.
%% Returns: ok
%%----------------------------------------------------------------------
-close(Fd) ->
- gen_server:call(Fd, close, infinity).
+close(#ioq_file{fd=Fd, ioq=IOP}) ->
+ Res = gen_server:call(Fd, close, infinity),
+ gen_server:call(IOP, close, infinity),
+ Res.
delete(RootDir, Filepath) ->
@@ -423,7 +418,7 @@ init_status_error(ReturnPid, Ref, Error) ->
ignore.
-last_read(Fd) when is_pid(Fd) ->
+last_read(#ioq_file{fd=Fd}) when is_pid(Fd) ->
Now = os:timestamp(),
couch_util:process_dict_get(Fd, read_timestamp, Now).
@@ -435,9 +430,13 @@ init({Filepath, Options, ReturnPid, Ref}) ->
Limit = get_pread_limit(),
IsSys = lists:member(sys_db, Options),
update_read_timestamp(),
- Tab = list_to_atom(integer_to_list(mem3_hash:crc32(Filepath))),
- erlang:put(couch_file_cache, Tab),
- ets:new(Tab, [set, protected, named_table, {read_concurrency, true}]),
+ ShouldCache = config:get_boolean("couchdb", "couch_file_cache", true),
+ Tab = case ShouldCache of
+ true ->
+ ets:new(?MODULE, [set, protected, {read_concurrency, true}]);
+ false ->
+ undefined
+ end,
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
@@ -458,7 +457,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
ok = file:sync(Fd),
maybe_track_open_os_files(Options),
erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
- {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}};
+ {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
false ->
ok = file:close(Fd),
init_status_error(ReturnPid, Ref, {error, eexist})
@@ -466,7 +465,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
false ->
maybe_track_open_os_files(Options),
erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
- {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit}}
+ {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}}
end;
Error ->
init_status_error(ReturnPid, Ref, Error)
@@ -483,7 +482,7 @@ init({Filepath, Options, ReturnPid, Ref}) ->
maybe_track_open_os_files(Options),
{ok, Eof} = file:position(Fd, eof),
erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
- {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit}};
+ {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
Error ->
init_status_error(ReturnPid, Ref, Error)
end;
@@ -522,15 +521,17 @@ handle_call(close, _From, #file{fd=Fd}=File) ->
handle_call({pread_iolist, Pos}, _From, File) ->
update_read_timestamp(),
{LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
- case iolist_to_binary(LenIolist) of
+ Resp = case iolist_to_binary(LenIolist) of
<<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term
{Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16),
{Md5, IoList} = extract_md5(Md5AndIoList),
- {reply, {ok, IoList, Md5}, File};
+ {ok, IoList, Md5};
<<0:1/integer,Len:31/integer>> ->
{Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
- {reply, {ok, Iolist, <<>>}, File}
- end;
+ {ok, Iolist, <<>>}
+ end,
+ maybe_cache(File#file.tab, {Pos, Resp}),
+ {reply, Resp, File};
handle_call({pread_iolists, PosL}, _From, File) ->
update_read_timestamp(),
@@ -629,7 +630,10 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
end;
handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
- {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+ {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File};
+
+handle_call(get_cache_ref, _From, #file{tab=Tab} = File) ->
+ {reply, Tab, File}.
handle_cast({cache, Key, Val}, Fd) ->
@@ -909,6 +913,11 @@ reset_eof(#file{} = File) ->
{ok, Eof} = file:position(File#file.fd, eof),
File#file{eof = Eof}.
+maybe_cache(undefined, _Obj) ->
+ ok;
+maybe_cache(Tab, Obj) ->
+ ets:insert(Tab, Obj).
+
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").