diff options
author | Russell Branca <chewbranca@apache.org> | 2021-08-12 14:32:56 -0700 |
---|---|---|
committer | Russell Branca <chewbranca@apache.org> | 2022-07-07 13:48:36 -0700 |
commit | 897fbd6076a89fc902163596945331aadc156a01 (patch) | |
tree | e4b3b7d31d525734746f09d9dba7543d8c787953 | |
parent | 29ac7853f203afec40adde85fe4fc2e0e230f565 (diff) | |
download | couchdb-897fbd6076a89fc902163596945331aadc156a01.tar.gz |
Add couch_file cache
Initial cherry-pick on top of main branch
-rw-r--r-- | src/couch/priv/stats_descriptions.cfg | 12 | ||||
-rw-r--r-- | src/couch/src/couch_bt_engine.erl | 68 | ||||
-rw-r--r-- | src/couch/src/couch_bt_engine_compactor.erl | 114 | ||||
-rw-r--r-- | src/couch/src/couch_bt_engine_stream.erl | 9 | ||||
-rw-r--r-- | src/couch/src/couch_db.erl | 6 | ||||
-rw-r--r-- | src/couch/src/couch_db_engine.erl | 16 | ||||
-rw-r--r-- | src/couch/src/couch_file.erl | 236 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_btree_tests.erl | 11 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_file_tests.erl | 67 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_compactor.erl | 27 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_index.erl | 5 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_util.erl | 13 |
12 files changed, 377 insertions, 207 deletions
diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg index 7c8fd94cb..57ee39f85 100644 --- a/src/couch/priv/stats_descriptions.cfg +++ b/src/couch/priv/stats_descriptions.cfg @@ -290,6 +290,18 @@ {type, histogram}, {desc, <<"duration of validate_doc_update function calls">>} ]}. +{[couchdb, couch_file, cache_misses], [ + {type, counter}, + {desc, <<"number of couch_file cache misses">>} +]}. +{[couchdb, couch_file, cache_hits], [ + {type, counter}, + {desc, <<"number of couch_file cache hits">>} +]}. +{[couchdb, couch_file, cache_opens], [ + {type, counter}, + {desc, <<"number of new couch_file caches opened">>} +]}. {[pread, exceed_eof], [ {type, counter}, {desc, <<"number of the attempts to read beyond end of db file">>} diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 486ed7cb0..822ecc785 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -22,6 +22,7 @@ is_compacting/1, init/2, + init/3, terminate/2, handle_db_updater_call/2, handle_db_updater_info/2, @@ -32,6 +33,7 @@ last_activity/1, + get_fd_handle/1, get_compacted_seq/1, get_del_doc_count/1, get_disk_version/1, @@ -115,6 +117,7 @@ -include_lib("kernel/include/file.hrl"). -include_lib("couch/include/couch_db.hrl"). -include("couch_bt_engine.hrl"). +-include_lib("ioq/include/ioq.hrl"). exists(FilePath) -> case is_file(FilePath) of @@ -150,26 +153,29 @@ is_compacting(DbName) -> ). init(FilePath, Options) -> - {ok, Fd} = open_db_file(FilePath, Options), - Header = - case lists:member(create, Options) of - true -> - delete_compaction_files(FilePath), - Header0 = couch_bt_engine_header:new(), - Header1 = init_set_props(Fd, Header0, Options), - ok = couch_file:write_header(Fd, Header1), - Header1; - false -> - case couch_file:read_header(Fd) of - {ok, Header0} -> - Header0; - no_valid_header -> - delete_compaction_files(FilePath), - Header0 = couch_bt_engine_header:new(), - ok = couch_file:write_header(Fd, Header0), - Header0 - end - end, + init(FilePath, Options, undefined). + + +init(FilePath, Options, IOQPid) -> + {ok, Fd} = open_db_file(FilePath, Options, IOQPid), + Header = case lists:member(create, Options) of + true -> + delete_compaction_files(FilePath), + Header0 = couch_bt_engine_header:new(), + Header1 = init_set_props(Fd, Header0, Options), + ok = couch_file:write_header(Fd, Header1), + Header1; + false -> + case couch_file:read_header(Fd) of + {ok, Header0} -> + Header0; + no_valid_header -> + delete_compaction_files(FilePath), + Header0 = couch_bt_engine_header:new(), + ok = couch_file:write_header(Fd, Header0), + Header0 + end + end, {ok, init_state(FilePath, Fd, Header, Options)}. terminate(_Reason, St) -> @@ -197,15 +203,15 @@ handle_db_updater_call(Msg, St) -> handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor = Ref} = St) -> {stop, normal, St#st{fd = undefined, fd_monitor = closed}}. -incref(St) -> - {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}. +incref(#st{fd=#ioq_file{fd=Fd}}=St) -> + {ok, St#st{fd_monitor = erlang:monitor(process, Fd)}}. decref(St) -> true = erlang:demonitor(St#st.fd_monitor, [flush]), ok. -monitored_by(St) -> - case erlang:process_info(St#st.fd, monitored_by) of +monitored_by(#st{fd=#ioq_file{fd=Fd}}=St) -> + case erlang:process_info(Fd, monitored_by) of {monitored_by, Pids} -> lists:filter(fun is_pid/1, Pids); _ -> @@ -215,6 +221,9 @@ monitored_by(St) -> last_activity(#st{fd = Fd}) -> couch_file:last_read(Fd). +get_fd_handle(#st{fd = Fd}) -> + Fd. + get_compacted_seq(#st{header = Header}) -> couch_bt_engine_header:get(Header, compacted_seq). @@ -637,7 +646,8 @@ start_compaction(St, DbName, Options, Parent) -> {ok, St, Pid}. finish_compaction(OldState, DbName, Options, CompactFilePath) -> - {ok, NewState1} = ?MODULE:init(CompactFilePath, Options), + IOQPid = ioq:ioq_pid(OldState#st.fd), + {ok, NewState1} = ?MODULE:init(CompactFilePath, Options, IOQPid), OldSeq = get_update_seq(OldState), NewSeq = get_update_seq(NewState1), case OldSeq == NewSeq of @@ -827,8 +837,10 @@ copy_props(#st{header = Header} = St, Props) -> needs_commit = true }}. -open_db_file(FilePath, Options) -> - case couch_file:open(FilePath, Options) of +open_db_file(FilePath, Options, IOQPid) -> + Hash = list_to_atom(integer_to_list(mem3_hash:crc32(FilePath))), + erlang:put(couch_file_hash, Hash), + case couch_file:open(FilePath, Options, IOQPid) of {ok, Fd} -> {ok, Fd}; {error, enoent} -> @@ -899,7 +911,7 @@ init_state(FilePath, Fd, Header0, Options) -> St = #st{ filepath = FilePath, fd = Fd, - fd_monitor = erlang:monitor(process, Fd), + fd_monitor = erlang:monitor(process, Fd#ioq_file.fd), header = Header, needs_commit = false, id_tree = IdTree, diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl index 8ed55b5c3..2e7917325 100644 --- a/src/couch/src/couch_bt_engine_compactor.erl +++ b/src/couch/src/couch_bt_engine_compactor.erl @@ -90,64 +90,64 @@ start(#st{} = St, DbName, Options, Parent) -> open_compaction_files(DbName, OldSt, Options) -> #st{ filepath = DbFilePath, - header = SrcHdr + header = SrcHdr, + fd = OldFd } = OldSt, + IOQPid = ioq:ioq_pid(OldFd), DataFile = DbFilePath ++ ".compact.data", MetaFile = DbFilePath ++ ".compact.meta", - {ok, DataFd, DataHdr} = open_compaction_file(DataFile), - {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile), + {ok, DataFd, DataHdr} = open_compaction_file(DataFile, IOQPid), + {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile, IOQPid), DataHdrIsDbHdr = couch_bt_engine_header:is_header(DataHdr), - CompSt = - case {DataHdr, MetaHdr} of - {#comp_header{} = A, #comp_header{} = A} -> - % We're restarting a compaction that did not finish - % before trying to swap out with the original db - DbHeader = A#comp_header.db_header, - St0 = couch_bt_engine:init_state( - DataFile, DataFd, DbHeader, Options - ), - St1 = bind_emsort(St0, MetaFd, A#comp_header.meta_st), - #comp_st{ - db_name = DbName, - old_st = OldSt, - new_st = St1, - meta_fd = MetaFd, - retry = St0#st.id_tree - }; - _ when DataHdrIsDbHdr -> - % We tried to swap out the compaction but there were - % writes to the database during compaction. Start - % a compaction retry. - Header = couch_bt_engine_header:from(SrcHdr), - ok = reset_compaction_file(MetaFd, Header), - St0 = couch_bt_engine:init_state( - DataFile, DataFd, DataHdr, Options - ), - St1 = bind_emsort(St0, MetaFd, nil), - #comp_st{ - db_name = DbName, - old_st = OldSt, - new_st = St1, - meta_fd = MetaFd, - retry = St0#st.id_tree - }; - _ -> - % We're starting a compaction from scratch - Header = couch_bt_engine_header:from(SrcHdr), - ok = reset_compaction_file(DataFd, Header), - ok = reset_compaction_file(MetaFd, Header), - St0 = couch_bt_engine:init_state(DataFile, DataFd, Header, Options), - St1 = bind_emsort(St0, MetaFd, nil), - #comp_st{ - db_name = DbName, - old_st = OldSt, - new_st = St1, - meta_fd = MetaFd, - retry = nil - } - end, - unlink(DataFd), - erlang:monitor(process, MetaFd), + CompSt = case {DataHdr, MetaHdr} of + {#comp_header{}=A, #comp_header{}=A} -> + % We're restarting a compaction that did not finish + % before trying to swap out with the original db + DbHeader = A#comp_header.db_header, + St0 = couch_bt_engine:init_state( + DataFile, DataFd, DbHeader, Options), + St1 = bind_emsort(St0, MetaFd, A#comp_header.meta_st), + #comp_st{ + db_name = DbName, + old_st = OldSt, + new_st = St1, + meta_fd = MetaFd, + retry = St0#st.id_tree + }; + _ when DataHdrIsDbHdr -> + % We tried to swap out the compaction but there were + % writes to the database during compaction. Start + % a compaction retry. + Header = couch_bt_engine_header:from(SrcHdr), + ok = reset_compaction_file(MetaFd, Header), + St0 = couch_bt_engine:init_state( + DataFile, DataFd, DataHdr, Options), + St1 = bind_emsort(St0, MetaFd, nil), + #comp_st{ + db_name = DbName, + old_st = OldSt, + new_st = St1, + meta_fd = MetaFd, + retry = St0#st.id_tree + }; + _ -> + % We're starting a compaction from scratch + Header = couch_bt_engine_header:from(SrcHdr), + ok = reset_compaction_file(DataFd, Header), + ok = reset_compaction_file(MetaFd, Header), + St0 = couch_bt_engine:init_state(DataFile, DataFd, Header, Options), + St1 = bind_emsort(St0, MetaFd, nil), + #comp_st{ + db_name = DbName, + old_st = OldSt, + new_st = St1, + meta_fd = MetaFd, + retry = nil + } + end, + unlink(ioq:fd_pid(DataFd)), + unlink(ioq:ioq_pid(DataFd)), + erlang:monitor(process, ioq:fd_pid(MetaFd)), {ok, CompSt}. copy_purge_info(#comp_st{} = CompSt) -> @@ -623,15 +623,15 @@ compact_final_sync(#comp_st{new_st = St0} = CompSt) -> new_st = St1 }. -open_compaction_file(FilePath) -> - case couch_file:open(FilePath, [nologifmissing]) of +open_compaction_file(FilePath, IOQPid) -> + case couch_file:open(FilePath, [nologifmissing], IOQPid) of {ok, Fd} -> case couch_file:read_header(Fd) of {ok, Header} -> {ok, Fd, Header}; no_valid_header -> {ok, Fd, nil} end; {error, enoent} -> - {ok, Fd} = couch_file:open(FilePath, [create]), + {ok, Fd} = couch_file:open(FilePath, [create], IOQPid), {ok, Fd, nil} end. diff --git a/src/couch/src/couch_bt_engine_stream.erl b/src/couch/src/couch_bt_engine_stream.erl index 253877e77..5f593cb25 100644 --- a/src/couch/src/couch_bt_engine_stream.erl +++ b/src/couch/src/couch_bt_engine_stream.erl @@ -20,6 +20,9 @@ to_disk_term/1 ]). +-include_lib("ioq/include/ioq.hrl"). + + foldl({_Fd, []}, _Fun, Acc) -> Acc; foldl({Fd, [{Pos, _} | Rest]}, Fun, Acc) -> @@ -49,9 +52,9 @@ seek({Fd, [Pos | Rest]}, Offset) when is_integer(Pos) -> {ok, {Fd, [Tail | Rest]}} end. -write({Fd, Written}, Data) when is_pid(Fd) -> - {ok, Pos, _} = couch_file:append_binary(Fd, Data), - {ok, {Fd, [{Pos, iolist_size(Data)} | Written]}}. +write({#ioq_file{fd=Fd}=IOF, Written}, Data) when is_pid(Fd) -> + {ok, Pos, _} = couch_file:append_binary(IOF, Data), + {ok, {IOF, [{Pos, iolist_size(Data)} | Written]}}. finalize({Fd, Written}) -> {ok, {Fd, lists:reverse(Written)}}. diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index c7f1c8b5f..08d9d7426 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -46,6 +46,7 @@ get_epochs/1, get_filepath/1, get_instance_start_time/1, + get_fd_handle/1, get_pid/1, get_revs_limit/1, get_security/1, @@ -208,6 +209,9 @@ clustered_db(DbName, #user_ctx{} = UserCtx) -> clustered_db(DbName, UserCtx, SecProps) -> clustered_db(DbName, [{user_ctx, UserCtx}, {security, SecProps}]). +get_fd_handle(#db{} = Db) -> + couch_db_engine:get_fd_handle(Db). + is_db(#db{}) -> true; is_db(_) -> @@ -794,6 +798,8 @@ set_security(_, _) -> throw(bad_request). set_user_ctx(#db{} = Db, UserCtx) -> + %% TODO: + %% couch_db_engine:set_user_ctx(Db, UserCtx), {ok, Db#db{user_ctx = UserCtx}}. validate_security_object(SecProps) -> diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl index 9e46b816b..aca62fea0 100644 --- a/src/couch/src/couch_db_engine.erl +++ b/src/couch/src/couch_db_engine.erl @@ -191,6 +191,10 @@ % All of the get_* functions may be called from many % processes concurrently. +% Get the FD handle, usually for #ioq_file{} +-callback get_fd_handle(DbHandle::db_handle()) -> any(). + + % The database should make a note of the update sequence when it % was last compacted. If the database doesn't need compacting it % can just hard code a return value of 0. @@ -676,6 +680,7 @@ last_activity/1, get_engine/1, + get_fd_handle/1, get_compacted_seq/1, get_del_doc_count/1, get_disk_version/1, @@ -799,6 +804,10 @@ get_engine(#db{} = Db) -> #db{engine = {Engine, _}} = Db, Engine. +get_fd_handle(#db{} = Db) -> + #db{engine = {Engine, EngineState}} = Db, + Engine:get_fd_handle(EngineState). + get_compacted_seq(#db{} = Db) -> #db{engine = {Engine, EngineState}} = Db, Engine:get_compacted_seq(EngineState). @@ -885,6 +894,13 @@ set_update_seq(#db{} = Db, UpdateSeq) -> {ok, Db#db{engine = {Engine, NewSt}}}. open_docs(#db{} = Db, DocIds) -> + case erlang:get(couch_file_hash) of + undefined -> + Hash = list_to_atom(integer_to_list(mem3_hash:crc32(Db#db.filepath))), + erlang:put(couch_file_hash, Hash); + _ -> + ok + end, #db{engine = {Engine, EngineState}} = Db, Engine:open_docs(EngineState, DocIds). diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl index ba8d9c42f..9d556da58 100644 --- a/src/couch/src/couch_file.erl +++ b/src/couch/src/couch_file.erl @@ -15,6 +15,7 @@ -vsn(2). -include_lib("couch/include/couch_db.hrl"). +-include_lib("ioq/include/ioq.hrl"). -define(INITIAL_WAIT, 60000). -define(MONITOR_CHECK, 10000). @@ -33,11 +34,12 @@ is_sys, eof = 0, db_monitor, - pread_limit = 0 + pread_limit = 0, + tab }). % public API --export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]). +-export([open/1, open/2, open/3, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]). -export([pread_term/2, pread_iolist/2, pread_binary/2]). -export([append_binary/2]). -export([append_raw_chunk/2, assemble_file_chunk/2]). @@ -66,46 +68,39 @@ open(Filepath) -> open(Filepath, []). open(Filepath, Options) -> - case - gen_server:start_link( - couch_file, - {Filepath, Options, self(), Ref = make_ref()}, - [] - ) - of - {ok, Fd} -> - {ok, Fd}; - ignore -> - % get the error - receive - {Ref, Pid, {error, Reason} = Error} -> - case process_info(self(), trap_exit) of - {trap_exit, true} -> - receive - {'EXIT', Pid, _} -> ok - end; - {trap_exit, false} -> - ok - end, - case {lists:member(nologifmissing, Options), Reason} of - {true, enoent} -> - ok; - _ -> - couch_log:error( - "Could not open file ~s: ~s", - [Filepath, file:format_error(Reason)] - ) - end, - Error - end; - Error -> - % We can't say much here, because it could be any kind of error. - % Just let it bubble and an encapsulating subcomponent can perhaps - % be more informative. It will likely appear in the SASL log, anyway. + open(Filepath, Options, undefined). + +open(Filepath, Options, IOQPid0) -> + case gen_server:start_link(couch_file, + {Filepath, Options, self(), Ref = make_ref()}, []) of + {ok, Fd} -> + IOQPid = case IOQPid0 of + undefined -> + {ok, IOQPid1} = ioq_server2:start_link({by_shard, Filepath, Fd}), + IOQPid1; + IOQPid0 when is_pid(IOQPid0) -> + IOQPid0 + end, + Tab = gen_server:call(Fd, get_cache_ref), + {ok, #ioq_file{fd=Fd, ioq=IOQPid, tab=Tab}}; + ignore -> + % get the error + receive + {Ref, Pid, {error, Reason} = Error} -> + case process_info(self(), trap_exit) of + {trap_exit, true} -> receive {'EXIT', Pid, _} -> ok end; + {trap_exit, false} -> ok + end, + case {lists:member(nologifmissing, Options), Reason} of + {true, enoent} -> ok; + _ -> + couch_log:error("Could not open file ~s: ~s", + [Filepath, file:format_error(Reason)]) + end, Error end. -set_db_pid(Fd, Pid) -> +set_db_pid(#ioq_file{fd=Fd}, Pid) -> gen_server:call(Fd, {set_db_pid, Pid}). %%---------------------------------------------------------------------- @@ -155,6 +150,18 @@ pread_term(Fd, Pos) -> {ok, Bin} = pread_binary(Fd, Pos), {ok, couch_compress:decompress(Bin)}. +%% TODO: add purpose docs +load_from_cache(#ioq_file{tab=undefined}, _Pos) -> + missing; +load_from_cache(#ioq_file{tab=Tab}, Pos) -> + case ets:lookup(Tab, Pos) of + [{Pos, Res}] -> + %% io:format("CACHE HIT: ~p{~p}~n", [Tab, Pos]), + Res; + [] -> + missing + end. + %%---------------------------------------------------------------------- %% Purpose: Reads a binrary from a file that was written with append_binary %% Args: Pos, the offset into the file where the term is serialized. @@ -167,11 +174,18 @@ pread_binary(Fd, Pos) -> {ok, iolist_to_binary(L)}. pread_iolist(Fd, Pos) -> - case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of + case load_from_cache(Fd, Pos) of {ok, IoList, Md5} -> + couch_stats:increment_counter([couchdb, couch_file, cache_hits]), {ok, verify_md5(Fd, Pos, IoList, Md5)}; - Error -> - Error + missing -> + couch_stats:increment_counter([couchdb, couch_file, cache_misses]), + case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of + {ok, IoList, Md5} -> + {ok, verify_md5(Fd, Pos, IoList, Md5)}; + Error -> + Error + end end. pread_terms(Fd, PosList) -> @@ -227,7 +241,7 @@ append_binaries(Fd, Bins) -> %%---------------------------------------------------------------------- % length in bytes -bytes(Fd) -> +bytes(#ioq_file{fd=Fd}) -> gen_server:call(Fd, bytes, infinity). %%---------------------------------------------------------------------- @@ -236,7 +250,7 @@ bytes(Fd) -> %% or {error, Reason}. %%---------------------------------------------------------------------- -truncate(Fd, Pos) -> +truncate(#ioq_file{fd=Fd}, Pos) -> gen_server:call(Fd, {truncate, Pos}, infinity). %%---------------------------------------------------------------------- @@ -261,7 +275,7 @@ sync(Filepath) when is_list(Filepath) -> {error, Error} -> erlang:error(Error) end; -sync(Fd) -> +sync(#ioq_file{fd=Fd}) -> case gen_server:call(Fd, sync, infinity) of ok -> ok; @@ -273,8 +287,10 @@ sync(Fd) -> %% Purpose: Close the file. %% Returns: ok %%---------------------------------------------------------------------- -close(Fd) -> - gen_server:call(Fd, close, infinity). +close(#ioq_file{fd=Fd, ioq=IOP}) -> + Res = gen_server:call(Fd, close, infinity), + gen_server:call(IOP, close, infinity), + Res. delete(RootDir, Filepath) -> delete(RootDir, Filepath, []). @@ -408,7 +424,7 @@ init_status_error(ReturnPid, Ref, Error) -> ReturnPid ! {Ref, self(), Error}, ignore. -last_read(Fd) when is_pid(Fd) -> +last_read(#ioq_file{fd=Fd}) when is_pid(Fd) -> Now = os:timestamp(), couch_util:process_dict_get(Fd, read_timestamp, Now). @@ -419,38 +435,62 @@ init({Filepath, Options, ReturnPid, Ref}) -> Limit = get_pread_limit(), IsSys = lists:member(sys_db, Options), update_read_timestamp(), - case lists:member(create, Options) of + ShouldCache = config:get_boolean("couchdb", "couch_file_cache", true), + Tab = case ShouldCache of true -> - filelib:ensure_dir(Filepath), + couch_stats:increment_counter([couchdb, couch_file, cache_opens]), + ets:new(?MODULE, [set, protected, {read_concurrency, true}]); + false -> + undefined + end, + case lists:member(create, Options) of + true -> + filelib:ensure_dir(Filepath), + case file:open(Filepath, OpenOptions) of + {ok, Fd} -> + %% Save Fd in process dictionary for debugging purposes + put(couch_file_fd, {Fd, Filepath}), + {ok, Length} = file:position(Fd, eof), + case Length > 0 of + true -> + % this means the file already exists and has data. + % FYI: We don't differentiate between empty files and non-existant + % files here. + case lists:member(overwrite, Options) of + true -> + {ok, 0} = file:position(Fd, 0), + ok = file:truncate(Fd), + ok = file:sync(Fd), + maybe_track_open_os_files(Options), + erlang:send_after(?INITIAL_WAIT, self(), maybe_close), + {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}}; + false -> + ok = file:close(Fd), + init_status_error(ReturnPid, Ref, {error, eexist}) + end; + false -> + maybe_track_open_os_files(Options), + erlang:send_after(?INITIAL_WAIT, self(), maybe_close), + {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}} + end; + Error -> + init_status_error(ReturnPid, Ref, Error) + end; + false -> + % open in read mode first, so we don't create the file if it doesn't exist. + case file:open(Filepath, [read, raw]) of + {ok, Fd_Read} -> case file:open(Filepath, OpenOptions) of {ok, Fd} -> - %% Save Fd in process dictionary for debugging purposes - put(couch_file_fd, {Fd, Filepath}), - {ok, Length} = file:position(Fd, eof), - case Length > 0 of - true -> - % this means the file already exists and has data. - % FYI: We don't differentiate between empty files and non-existant - % files here. - case lists:member(overwrite, Options) of - true -> - {ok, 0} = file:position(Fd, 0), - ok = file:truncate(Fd), - ok = file:sync(Fd), - maybe_track_open_os_files(Options), - erlang:send_after(?INITIAL_WAIT, self(), maybe_close), - {ok, #file{fd = Fd, is_sys = IsSys, pread_limit = Limit}}; - false -> - ok = file:close(Fd), - init_status_error(ReturnPid, Ref, {error, eexist}) - end; - false -> - maybe_track_open_os_files(Options), - erlang:send_after(?INITIAL_WAIT, self(), maybe_close), - {ok, #file{fd = Fd, is_sys = IsSys, pread_limit = Limit}} - end; - Error -> - init_status_error(ReturnPid, Ref, Error) + %% Save Fd in process dictionary for debugging purposes + put(couch_file_fd, {Fd, Filepath}), + ok = file:close(Fd_Read), + maybe_track_open_os_files(Options), + {ok, Eof} = file:position(Fd, eof), + erlang:send_after(?INITIAL_WAIT, self(), maybe_close), + {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit, tab=Tab}}; + Error -> + init_status_error(ReturnPid, Ref, Error) end; false -> % open in read mode first, so we don't create the file if it doesn't exist. @@ -502,16 +542,18 @@ handle_call(close, _From, #file{fd = Fd} = File) -> handle_call({pread_iolist, Pos}, _From, File) -> update_read_timestamp(), {LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4), - case iolist_to_binary(LenIolist) of - % an MD5-prefixed term - <<1:1/integer, Len:31/integer>> -> - {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len + 16), - {Md5, IoList} = extract_md5(Md5AndIoList), - {reply, {ok, IoList, Md5}, File}; - <<0:1/integer, Len:31/integer>> -> - {Iolist, _} = read_raw_iolist_int(File, NextPos, Len), - {reply, {ok, Iolist, <<>>}, File} - end; + Resp = case iolist_to_binary(LenIolist) of + <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term + {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16), + {Md5, IoList} = extract_md5(Md5AndIoList), + {ok, IoList, Md5}; + <<0:1/integer,Len:31/integer>> -> + {Iolist, _} = read_raw_iolist_int(File, NextPos, Len), + {ok, Iolist, <<>>} + end, + maybe_cache(File#file.tab, {Pos, Resp}), + {reply, Resp, File}; + handle_call({pread_iolists, PosL}, _From, File) -> update_read_timestamp(), LocNums1 = [{Pos, 4} || Pos <- PosL], @@ -613,8 +655,17 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) -> {reply, Error, reset_eof(File)} end; handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) -> - {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}. + {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}; +handle_call(get_cache_ref, _From, #file{tab=Tab} = File) -> + {reply, Tab, File}. + + +handle_cast({cache, Key, Val}, Fd) -> + %% TODO: should we skip if value exists? + Tab = erlang:get(couch_file_cache), + ets:insert(Tab, {Key, Val}), + {noreply, Fd}; handle_cast(close, Fd) -> {stop, normal, Fd}. @@ -881,7 +932,9 @@ is_idle(#file{is_sys = false}) -> -spec process_info(CouchFilePid :: pid()) -> {Fd :: pid() | tuple(), FilePath :: string()} | undefined. -process_info(Pid) -> +process_info(Pid) when is_pid(Pid) -> + couch_util:process_dict_get(Pid, couch_file_fd); +process_info(#ioq_file{fd=Pid}) -> couch_util:process_dict_get(Pid, couch_file_fd). update_read_timestamp() -> @@ -905,6 +958,11 @@ reset_eof(#file{} = File) -> {ok, Eof} = file:position(File#file.fd, eof), File#file{eof = Eof}. +maybe_cache(undefined, _Obj) -> + ok; +maybe_cache(Tab, Obj) -> + ets:insert(Tab, Obj). + -ifdef(TEST). -include_lib("couch/include/couch_eunit.hrl"). diff --git a/src/couch/test/eunit/couch_btree_tests.erl b/src/couch/test/eunit/couch_btree_tests.erl index 1c9ba7771..a66f19aab 100644 --- a/src/couch/test/eunit/couch_btree_tests.erl +++ b/src/couch/test/eunit/couch_btree_tests.erl @@ -14,6 +14,7 @@ -include_lib("couch/include/couch_eunit.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("ioq/include/ioq.hrl"). -define(ROWS, 1000). % seconds @@ -47,6 +48,8 @@ setup_red() -> setup_red(_) -> setup_red(). +teardown(#ioq_file{}=Fd) -> + ok = couch_file:close(Fd); teardown(Fd) when is_pid(Fd) -> ok = couch_file:close(Fd); teardown({Fd, _}) -> @@ -77,6 +80,14 @@ red_test_funs() -> ]. btree_open_test_() -> + { + setup, + fun() -> test_util:start(?MODULE, [ioq]) end, + fun test_util:stop/1, + fun btree_should_open/0 + }. + +btree_should_open() -> {ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]), {ok, Btree} = couch_btree:open(nil, Fd, [{compression, none}]), { diff --git a/src/couch/test/eunit/couch_file_tests.erl b/src/couch/test/eunit/couch_file_tests.erl index 1b54cd70e..37af7e1fc 100644 --- a/src/couch/test/eunit/couch_file_tests.erl +++ b/src/couch/test/eunit/couch_file_tests.erl @@ -13,6 +13,7 @@ -module(couch_file_tests). -include_lib("couch/include/couch_eunit.hrl"). +-include_lib("ioq/include/ioq.hrl"). -define(BLOCK_SIZE, 4096). -define(setup(F), {setup, fun setup/0, fun teardown/1, F}). @@ -23,11 +24,40 @@ setup() -> Fd. teardown(Fd) -> - case is_process_alive(Fd) of + case is_process_alive(ioq:fd_pid(Fd)) of true -> ok = couch_file:close(Fd); false -> ok end. +mock_server() -> + %%meck:new(config), + meck:expect(config, get, fun(Group) -> + [] + end), + meck:expect(config, get, fun(_,_) -> + undefined + end), + meck:expect(config, get, fun("ioq2", _, Default) -> + Default + end), + meck:expect(config, get, fun(_, _, Default) -> + Default + end), + %% meck:expect(config, get, fun(_, _, _) -> + %% undefined + %% end), + meck:expect(config, get_integer, fun("ioq2", _, Default) -> + Default + end), + meck:expect(config, get_boolean, fun("ioq2", _, Default) -> + Default + end). + + +unmock_server(_) -> + true = meck:validate(config), + ok = meck:unload(config). + open_close_test_() -> { "Test for proper file open and close", @@ -39,7 +69,7 @@ open_close_test_() -> should_return_enoent_if_missed(), should_ignore_invalid_flags_with_open(), ?setup(fun should_return_pid_on_file_open/1), - should_close_file_properly(), + ?setup(fun should_close_file_properly/0), ?setup(fun should_create_empty_new_files/1) ] } @@ -55,7 +85,7 @@ should_ignore_invalid_flags_with_open() -> ). should_return_pid_on_file_open(Fd) -> - ?_assert(is_pid(Fd)). + ?_assert(is_pid(ioq:fd_pid(Fd))). should_close_file_properly() -> {ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]), @@ -139,9 +169,15 @@ should_not_read_beyond_eof(Fd) -> {ok, Io} = file:open(Filepath, [read, write, binary]), ok = file:pwrite(Io, Pos, <<0:1/integer, DoubleBin:31/integer>>), file:close(Io), - unlink(Fd), - ExpectedError = {badmatch, {'EXIT', {bad_return_value, {read_beyond_eof, Filepath}}}}, - ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)). + unlink(ioq:fd_pid(Fd)), + unlink(ioq:ioq_pid(Fd)), + %% ExpectedError = {badmatch, {'EXIT', {bad_return_value, + %% {read_beyond_eof, Filepath}}}}, + %%ExpectedError = {exit, {{bad_return_value, {read_beyond_eof Filepath,}}, _}, _}, + + ?_assertExit( + {{bad_return_value, {read_beyond_eof, Filepath}}, _}, + couch_file:pread_binary(Fd, Pos)). should_truncate(Fd) -> {ok, 0, _} = couch_file:append_term(Fd, foo), @@ -179,9 +215,19 @@ should_not_read_more_than_pread_limit(Fd) -> {_, Filepath} = couch_file:process_info(Fd), BigBin = list_to_binary(lists:duplicate(100000, 0)), {ok, Pos, _Size} = couch_file:append_binary(Fd, BigBin), - unlink(Fd), - ExpectedError = {badmatch, {'EXIT', {bad_return_value, {exceed_pread_limit, Filepath, 50000}}}}, - ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)). + unlink(ioq:fd_pid(Fd)), + unlink(ioq:ioq_pid(Fd)), + ExpectedError = {badmatch, {'EXIT', {bad_return_value, + {exceed_pread_limit, Filepath, 50000}}}}, + ?debugFmt("EXPECTED ERROR IS: ~p~n", [ExpectedError]), + %%?_assert(couch_file:pread_binary(Fd, Pos)). + %%try couch_file:pread_binary(Fd, Pos) + %%catch E:R:S -> ?debugFmt("GOT ERROR: ~p || ~p~n~p~n", [E,R,S]) + %%end, + %%?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)). + ?_assertExit( + {{bad_return_value, {exceed_pread_limit, Filepath, 50000}}, _}, + couch_file:pread_binary(Fd, Pos)). header_test_() -> { @@ -540,7 +586,8 @@ fsync_error_test_() -> }. fsync_raises_errors() -> - Fd = spawn(fun() -> fake_fsync_fd() end), + FdPid = spawn(fun() -> fake_fsync_fd() end), + Fd = #ioq_file{fd=FdPid}, ?assertError({fsync_error, eio}, couch_file:sync(Fd)). fake_fsync_fd() -> diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl index 28e5a9b3d..c42c27f67 100644 --- a/src/couch_mrview/src/couch_mrview_compactor.erl +++ b/src/couch_mrview/src/couch_mrview_compactor.erl @@ -14,6 +14,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). +-include_lib("ioq/include/ioq.hrl"). -export([compact/3, swap_compacted/2, remove_compacted/1]). @@ -46,8 +47,9 @@ compact(State) -> erlang:put(io_priority, {view_compact, DbName, IdxName}), {EmptyState, NumDocIds} = couch_util:with_db(DbName, fun(Db) -> + IOQPid = ioq:ioq_pid(couch_db:get_fd_handle(Db)), CompactFName = couch_mrview_util:compaction_file(DbName, Sig), - {ok, Fd} = couch_mrview_util:open_file(CompactFName), + {ok, Fd} = couch_mrview_util:open_file(CompactFName, IOQPid), ESt = couch_mrview_util:reset_index(Db, Fd, State), {ok, Count} = couch_db:get_doc_count(Db), @@ -125,7 +127,7 @@ compact(State) -> lists:zip(Views, EmptyViews) ), - unlink(EmptyState#mrst.fd), + unlink(ioq:fd_pid(EmptyState#mrst.fd)), {ok, EmptyState#mrst{ id_btree = NewIdBtree, views = NewViews, @@ -139,7 +141,7 @@ recompact(#mrst{db_name = DbName, idx_name = IdxName}, 0) -> erlang:error({exceeded_recompact_retry_count, [{db_name, DbName}, {idx_name, IdxName}]}); recompact(State, RetryCount) -> Self = self(), - link(State#mrst.fd), + link(ioq:fd_pid(State#mrst.fd)), {Pid, Ref} = erlang:spawn_monitor(fun() -> couch_index_updater:update(Self, couch_mrview_index, State) end), @@ -151,10 +153,10 @@ recompact_loop(Pid, Ref, State, RetryCount) -> % We've made progress so reset RetryCount recompact_loop(Pid, Ref, State2, recompact_retry_count()); {'DOWN', Ref, _, _, {updated, Pid, State2}} -> - unlink(State#mrst.fd), + unlink(ioq:fd_pid(State#mrst.fd)), {ok, State2}; {'DOWN', Ref, _, _, Reason} -> - unlink(State#mrst.fd), + unlink(ioq:fd_pid(State#mrst.fd)), couch_log:warning("Error during recompaction: ~r", [Reason]), recompact(State, RetryCount - 1) end. @@ -239,8 +241,8 @@ swap_compacted(OldState, NewState) -> fd = NewFd } = NewState, - link(NewState#mrst.fd), - Ref = erlang:monitor(process, NewState#mrst.fd), + link(ioq:fd_pid(NewState#mrst.fd)), + Ref = erlang:monitor(process, ioq:fd_pid(NewState#mrst.fd)), RootDir = couch_index_util:root_dir(), IndexFName = couch_mrview_util:index_file(DbName, Sig), @@ -256,7 +258,7 @@ swap_compacted(OldState, NewState) -> ok = couch_file:delete(RootDir, IndexFName), ok = file:rename(CompactFName, IndexFName), - unlink(OldState#mrst.fd), + unlink(ioq:fd_pid(OldState#mrst.fd)), erlang:demonitor(OldState#mrst.fd_monitor, [flush]), {ok, NewState#mrst{fd_monitor = Ref}}. @@ -295,7 +297,7 @@ recompact_success_after_progress() -> timer:sleep(100), exit({updated, self(), State#mrst{update_seq = 2}}) end), - State = #mrst{fd = self(), update_seq = 0}, + State = #mrst{fd=#ioq_file{fd=self()}, update_seq=0}, ?assertEqual({ok, State#mrst{update_seq = 2}}, recompact(State)) end). @@ -309,9 +311,10 @@ recompact_exceeded_retry_count() -> end ), ok = meck:expect(couch_log, warning, fun(_, _) -> ok end), - State = #mrst{fd = self(), db_name = foo, idx_name = bar}, - ExpectedError = {exceeded_recompact_retry_count, [{db_name, foo}, {idx_name, bar}]}, - ?assertError(ExpectedError, recompact(State)) + State = #mrst{fd=#ioq_file{fd=self()}, db_name=foo, idx_name=bar}, + ExpectedError = {exceeded_recompact_retry_count, + [{db_name, foo}, {idx_name, bar}]}, + ?assertError(ExpectedError, recompact(State)) end). -endif. diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl index 1bfdb2818..0f3428091 100644 --- a/src/couch_mrview/src/couch_mrview_index.erl +++ b/src/couch_mrview/src/couch_mrview_index.erl @@ -101,6 +101,7 @@ open(Db, State0) -> db_name = DbName, sig = Sig } = State = set_partitioned(Db, State0), + IOQPid = ioq:ioq_pid(couch_db:get_fd_handle(Db)), IndexFName = couch_mrview_util:index_file(DbName, Sig), % If we are upgrading from <= 2.x, we upgrade the view @@ -120,7 +121,7 @@ open(Db, State0) -> OldSig = couch_mrview_util:maybe_update_index_file(State), - case couch_mrview_util:open_file(IndexFName) of + case couch_mrview_util:open_file(IndexFName, IOQPid) of {ok, Fd} -> case couch_file:read_header(Fd) of % upgrade code for <= 2.x @@ -178,7 +179,7 @@ close(State) -> % outstanding queries are done. shutdown(State) -> erlang:demonitor(State#mrst.fd_monitor, [flush]), - unlink(State#mrst.fd). + unlink(ioq:fd_pid(State#mrst.fd)). delete(#mrst{db_name = DbName, sig = Sig} = State) -> couch_file:close(State#mrst.fd), diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index 9e3d292ed..f6ccd062f 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -17,7 +17,7 @@ -export([verify_view_filename/1, get_signature_from_filename/1]). -export([ddoc_to_mrst/2, init_state/4, reset_index/3]). -export([make_header/1]). --export([index_file/2, compaction_file/2, open_file/1]). +-export([index_file/2, compaction_file/2, open_file/2]). -export([delete_files/2, delete_index_file/2, delete_compaction_file/2]). -export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]). -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]). @@ -99,8 +99,8 @@ get_signature_from_filename(FileName) -> get_view(Db, DDoc, ViewName, Args0) -> case get_view_index_state(Db, DDoc, ViewName, Args0) of {ok, State, Args2} -> - Ref = erlang:monitor(process, State#mrst.fd), - #mrst{language = Lang, views = Views} = State, + Ref = erlang:monitor(process, ioq:fd_pid(State#mrst.fd)), + #mrst{language=Lang, views=Views} = State, {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views), check_range(Args3, view_cmp(View)), Sig = view_sig(Db, State, View, Args3), @@ -317,6 +317,7 @@ init_state(Db, Fd, State, Header) -> {ShouldCommit, State#mrst{ fd = Fd, fd_monitor = erlang:monitor(process, Fd), + fd_monitor=erlang:monitor(process, ioq:fd_pid(Fd)), update_seq = Seq, purge_seq = PurgeSeq, id_btree = IdBtree, @@ -791,10 +792,10 @@ compaction_file(DbName, Sig) -> FileName = couch_index_util:hexsig(Sig) ++ ".compact.view", couch_index_util:index_file(mrview, DbName, FileName). -open_file(FName) -> - case couch_file:open(FName, [nologifmissing]) of +open_file(FName, IOQPid) -> + case couch_file:open(FName, [nologifmissing], IOQPid) of {ok, Fd} -> {ok, Fd}; - {error, enoent} -> couch_file:open(FName, [create]); + {error, enoent} -> couch_file:open(FName, [create], IOQPid); Error -> Error end. |