author    Russell Branca <chewbranca@apache.org>    2021-08-12 14:32:56 -0700
committer Russell Branca <chewbranca@apache.org>    2022-07-07 13:48:36 -0700
commit    897fbd6076a89fc902163596945331aadc156a01 (patch)
tree      e4b3b7d31d525734746f09d9dba7543d8c787953
parent    29ac7853f203afec40adde85fe4fc2e0e230f565 (diff)
download  couchdb-897fbd6076a89fc902163596945331aadc156a01.tar.gz
Add couch_file cache
Initial cherry-pick on top of main branch
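
For orientation, the change below adds a per-couch_file ETS cache that read calls consult before falling back to the IOQ-mediated pread. The following is a minimal, simplified sketch of that read path, paraphrasing couch_file:pread_iolist/2 and load_from_cache/2 from the diff; in the real code the #ioq_file{} record comes from ioq/include/ioq.hrl and the result is additionally MD5-verified.

    %% Sketch only: simplified from the diff below, not the actual module.
    -module(couch_file_cache_sketch).
    -export([cached_read/2]).

    %% In the real code this record is defined in ioq/include/ioq.hrl.
    -record(ioq_file, {fd, ioq, tab}).

    %% No cache table configured: always go through the IOQ pread path.
    cached_read(#ioq_file{tab = undefined} = Fd, Pos) ->
        ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority));
    %% Otherwise look the position up in the per-file ETS table first,
    %% counting hits and misses, and fall back to IOQ on a miss.
    cached_read(#ioq_file{tab = Tab} = Fd, Pos) ->
        case ets:lookup(Tab, Pos) of
            [{Pos, Res}] ->
                couch_stats:increment_counter([couchdb, couch_file, cache_hits]),
                Res;
            [] ->
                couch_stats:increment_counter([couchdb, couch_file, cache_misses]),
                ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority))
        end.

The table itself is created in couch_file:init/1 when the "couch_file_cache" config flag is enabled, and writers populate it via maybe_cache/2 after serving a pread, as shown in the couch_file.erl hunks below.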
-rw-r--r--  src/couch/priv/stats_descriptions.cfg            |  12
-rw-r--r--  src/couch/src/couch_bt_engine.erl                |  68
-rw-r--r--  src/couch/src/couch_bt_engine_compactor.erl      | 114
-rw-r--r--  src/couch/src/couch_bt_engine_stream.erl         |   9
-rw-r--r--  src/couch/src/couch_db.erl                       |   6
-rw-r--r--  src/couch/src/couch_db_engine.erl                |  16
-rw-r--r--  src/couch/src/couch_file.erl                     | 236
-rw-r--r--  src/couch/test/eunit/couch_btree_tests.erl       |  11
-rw-r--r--  src/couch/test/eunit/couch_file_tests.erl        |  67
-rw-r--r--  src/couch_mrview/src/couch_mrview_compactor.erl  |  27
-rw-r--r--  src/couch_mrview/src/couch_mrview_index.erl      |   5
-rw-r--r--  src/couch_mrview/src/couch_mrview_util.erl       |  13
12 files changed, 377 insertions(+), 207 deletions(-)
diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index 7c8fd94cb..57ee39f85 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -290,6 +290,18 @@
{type, histogram},
{desc, <<"duration of validate_doc_update function calls">>}
]}.
+{[couchdb, couch_file, cache_misses], [
+ {type, counter},
+ {desc, <<"number of couch_file cache misses">>}
+]}.
+{[couchdb, couch_file, cache_hits], [
+ {type, counter},
+ {desc, <<"number of couch_file cache hits">>}
+]}.
+{[couchdb, couch_file, cache_opens], [
+ {type, counter},
+ {desc, <<"number of new couch_file caches opened">>}
+]}.
{[pread, exceed_eof], [
{type, counter},
{desc, <<"number of the attempts to read beyond end of db file">>}
diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index 486ed7cb0..822ecc785 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -22,6 +22,7 @@
is_compacting/1,
init/2,
+ init/3,
terminate/2,
handle_db_updater_call/2,
handle_db_updater_info/2,
@@ -32,6 +33,7 @@
last_activity/1,
+ get_fd_handle/1,
get_compacted_seq/1,
get_del_doc_count/1,
get_disk_version/1,
@@ -115,6 +117,7 @@
-include_lib("kernel/include/file.hrl").
-include_lib("couch/include/couch_db.hrl").
-include("couch_bt_engine.hrl").
+-include_lib("ioq/include/ioq.hrl").
exists(FilePath) ->
case is_file(FilePath) of
@@ -150,26 +153,29 @@ is_compacting(DbName) ->
).
init(FilePath, Options) ->
- {ok, Fd} = open_db_file(FilePath, Options),
- Header =
- case lists:member(create, Options) of
- true ->
- delete_compaction_files(FilePath),
- Header0 = couch_bt_engine_header:new(),
- Header1 = init_set_props(Fd, Header0, Options),
- ok = couch_file:write_header(Fd, Header1),
- Header1;
- false ->
- case couch_file:read_header(Fd) of
- {ok, Header0} ->
- Header0;
- no_valid_header ->
- delete_compaction_files(FilePath),
- Header0 = couch_bt_engine_header:new(),
- ok = couch_file:write_header(Fd, Header0),
- Header0
- end
- end,
+ init(FilePath, Options, undefined).
+
+
+init(FilePath, Options, IOQPid) ->
+ {ok, Fd} = open_db_file(FilePath, Options, IOQPid),
+ Header = case lists:member(create, Options) of
+ true ->
+ delete_compaction_files(FilePath),
+ Header0 = couch_bt_engine_header:new(),
+ Header1 = init_set_props(Fd, Header0, Options),
+ ok = couch_file:write_header(Fd, Header1),
+ Header1;
+ false ->
+ case couch_file:read_header(Fd) of
+ {ok, Header0} ->
+ Header0;
+ no_valid_header ->
+ delete_compaction_files(FilePath),
+ Header0 = couch_bt_engine_header:new(),
+ ok = couch_file:write_header(Fd, Header0),
+ Header0
+ end
+ end,
{ok, init_state(FilePath, Fd, Header, Options)}.
terminate(_Reason, St) ->
@@ -197,15 +203,15 @@ handle_db_updater_call(Msg, St) ->
handle_db_updater_info({'DOWN', Ref, _, _, _}, #st{fd_monitor = Ref} = St) ->
{stop, normal, St#st{fd = undefined, fd_monitor = closed}}.
-incref(St) ->
- {ok, St#st{fd_monitor = erlang:monitor(process, St#st.fd)}}.
+incref(#st{fd=#ioq_file{fd=Fd}}=St) ->
+ {ok, St#st{fd_monitor = erlang:monitor(process, Fd)}}.
decref(St) ->
true = erlang:demonitor(St#st.fd_monitor, [flush]),
ok.
-monitored_by(St) ->
- case erlang:process_info(St#st.fd, monitored_by) of
+monitored_by(#st{fd=#ioq_file{fd=Fd}}=St) ->
+ case erlang:process_info(Fd, monitored_by) of
{monitored_by, Pids} ->
lists:filter(fun is_pid/1, Pids);
_ ->
@@ -215,6 +221,9 @@ monitored_by(St) ->
last_activity(#st{fd = Fd}) ->
couch_file:last_read(Fd).
+get_fd_handle(#st{fd = Fd}) ->
+ Fd.
+
get_compacted_seq(#st{header = Header}) ->
couch_bt_engine_header:get(Header, compacted_seq).
@@ -637,7 +646,8 @@ start_compaction(St, DbName, Options, Parent) ->
{ok, St, Pid}.
finish_compaction(OldState, DbName, Options, CompactFilePath) ->
- {ok, NewState1} = ?MODULE:init(CompactFilePath, Options),
+ IOQPid = ioq:ioq_pid(OldState#st.fd),
+ {ok, NewState1} = ?MODULE:init(CompactFilePath, Options, IOQPid),
OldSeq = get_update_seq(OldState),
NewSeq = get_update_seq(NewState1),
case OldSeq == NewSeq of
@@ -827,8 +837,10 @@ copy_props(#st{header = Header} = St, Props) ->
needs_commit = true
}}.
-open_db_file(FilePath, Options) ->
- case couch_file:open(FilePath, Options) of
+open_db_file(FilePath, Options, IOQPid) ->
+ Hash = list_to_atom(integer_to_list(mem3_hash:crc32(FilePath))),
+ erlang:put(couch_file_hash, Hash),
+ case couch_file:open(FilePath, Options, IOQPid) of
{ok, Fd} ->
{ok, Fd};
{error, enoent} ->
@@ -899,7 +911,7 @@ init_state(FilePath, Fd, Header0, Options) ->
St = #st{
filepath = FilePath,
fd = Fd,
- fd_monitor = erlang:monitor(process, Fd),
+ fd_monitor = erlang:monitor(process, Fd#ioq_file.fd),
header = Header,
needs_commit = false,
id_tree = IdTree,
diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl
index 8ed55b5c3..2e7917325 100644
--- a/src/couch/src/couch_bt_engine_compactor.erl
+++ b/src/couch/src/couch_bt_engine_compactor.erl
@@ -90,64 +90,64 @@ start(#st{} = St, DbName, Options, Parent) ->
open_compaction_files(DbName, OldSt, Options) ->
#st{
filepath = DbFilePath,
- header = SrcHdr
+ header = SrcHdr,
+ fd = OldFd
} = OldSt,
+ IOQPid = ioq:ioq_pid(OldFd),
DataFile = DbFilePath ++ ".compact.data",
MetaFile = DbFilePath ++ ".compact.meta",
- {ok, DataFd, DataHdr} = open_compaction_file(DataFile),
- {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
+ {ok, DataFd, DataHdr} = open_compaction_file(DataFile, IOQPid),
+ {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile, IOQPid),
DataHdrIsDbHdr = couch_bt_engine_header:is_header(DataHdr),
- CompSt =
- case {DataHdr, MetaHdr} of
- {#comp_header{} = A, #comp_header{} = A} ->
- % We're restarting a compaction that did not finish
- % before trying to swap out with the original db
- DbHeader = A#comp_header.db_header,
- St0 = couch_bt_engine:init_state(
- DataFile, DataFd, DbHeader, Options
- ),
- St1 = bind_emsort(St0, MetaFd, A#comp_header.meta_st),
- #comp_st{
- db_name = DbName,
- old_st = OldSt,
- new_st = St1,
- meta_fd = MetaFd,
- retry = St0#st.id_tree
- };
- _ when DataHdrIsDbHdr ->
- % We tried to swap out the compaction but there were
- % writes to the database during compaction. Start
- % a compaction retry.
- Header = couch_bt_engine_header:from(SrcHdr),
- ok = reset_compaction_file(MetaFd, Header),
- St0 = couch_bt_engine:init_state(
- DataFile, DataFd, DataHdr, Options
- ),
- St1 = bind_emsort(St0, MetaFd, nil),
- #comp_st{
- db_name = DbName,
- old_st = OldSt,
- new_st = St1,
- meta_fd = MetaFd,
- retry = St0#st.id_tree
- };
- _ ->
- % We're starting a compaction from scratch
- Header = couch_bt_engine_header:from(SrcHdr),
- ok = reset_compaction_file(DataFd, Header),
- ok = reset_compaction_file(MetaFd, Header),
- St0 = couch_bt_engine:init_state(DataFile, DataFd, Header, Options),
- St1 = bind_emsort(St0, MetaFd, nil),
- #comp_st{
- db_name = DbName,
- old_st = OldSt,
- new_st = St1,
- meta_fd = MetaFd,
- retry = nil
- }
- end,
- unlink(DataFd),
- erlang:monitor(process, MetaFd),
+ CompSt = case {DataHdr, MetaHdr} of
+ {#comp_header{}=A, #comp_header{}=A} ->
+ % We're restarting a compaction that did not finish
+ % before trying to swap out with the original db
+ DbHeader = A#comp_header.db_header,
+ St0 = couch_bt_engine:init_state(
+ DataFile, DataFd, DbHeader, Options),
+ St1 = bind_emsort(St0, MetaFd, A#comp_header.meta_st),
+ #comp_st{
+ db_name = DbName,
+ old_st = OldSt,
+ new_st = St1,
+ meta_fd = MetaFd,
+ retry = St0#st.id_tree
+ };
+ _ when DataHdrIsDbHdr ->
+ % We tried to swap out the compaction but there were
+ % writes to the database during compaction. Start
+ % a compaction retry.
+ Header = couch_bt_engine_header:from(SrcHdr),
+ ok = reset_compaction_file(MetaFd, Header),
+ St0 = couch_bt_engine:init_state(
+ DataFile, DataFd, DataHdr, Options),
+ St1 = bind_emsort(St0, MetaFd, nil),
+ #comp_st{
+ db_name = DbName,
+ old_st = OldSt,
+ new_st = St1,
+ meta_fd = MetaFd,
+ retry = St0#st.id_tree
+ };
+ _ ->
+ % We're starting a compaction from scratch
+ Header = couch_bt_engine_header:from(SrcHdr),
+ ok = reset_compaction_file(DataFd, Header),
+ ok = reset_compaction_file(MetaFd, Header),
+ St0 = couch_bt_engine:init_state(DataFile, DataFd, Header, Options),
+ St1 = bind_emsort(St0, MetaFd, nil),
+ #comp_st{
+ db_name = DbName,
+ old_st = OldSt,
+ new_st = St1,
+ meta_fd = MetaFd,
+ retry = nil
+ }
+ end,
+ unlink(ioq:fd_pid(DataFd)),
+ unlink(ioq:ioq_pid(DataFd)),
+ erlang:monitor(process, ioq:fd_pid(MetaFd)),
{ok, CompSt}.
copy_purge_info(#comp_st{} = CompSt) ->
@@ -623,15 +623,15 @@ compact_final_sync(#comp_st{new_st = St0} = CompSt) ->
new_st = St1
}.
-open_compaction_file(FilePath) ->
- case couch_file:open(FilePath, [nologifmissing]) of
+open_compaction_file(FilePath, IOQPid) ->
+ case couch_file:open(FilePath, [nologifmissing], IOQPid) of
{ok, Fd} ->
case couch_file:read_header(Fd) of
{ok, Header} -> {ok, Fd, Header};
no_valid_header -> {ok, Fd, nil}
end;
{error, enoent} ->
- {ok, Fd} = couch_file:open(FilePath, [create]),
+ {ok, Fd} = couch_file:open(FilePath, [create], IOQPid),
{ok, Fd, nil}
end.
diff --git a/src/couch/src/couch_bt_engine_stream.erl b/src/couch/src/couch_bt_engine_stream.erl
index 253877e77..5f593cb25 100644
--- a/src/couch/src/couch_bt_engine_stream.erl
+++ b/src/couch/src/couch_bt_engine_stream.erl
@@ -20,6 +20,9 @@
to_disk_term/1
]).
+-include_lib("ioq/include/ioq.hrl").
+
+
foldl({_Fd, []}, _Fun, Acc) ->
Acc;
foldl({Fd, [{Pos, _} | Rest]}, Fun, Acc) ->
@@ -49,9 +52,9 @@ seek({Fd, [Pos | Rest]}, Offset) when is_integer(Pos) ->
{ok, {Fd, [Tail | Rest]}}
end.
-write({Fd, Written}, Data) when is_pid(Fd) ->
- {ok, Pos, _} = couch_file:append_binary(Fd, Data),
- {ok, {Fd, [{Pos, iolist_size(Data)} | Written]}}.
+write({#ioq_file{fd=Fd}=IOF, Written}, Data) when is_pid(Fd) ->
+ {ok, Pos, _} = couch_file:append_binary(IOF, Data),
+ {ok, {IOF, [{Pos, iolist_size(Data)} | Written]}}.
finalize({Fd, Written}) ->
{ok, {Fd, lists:reverse(Written)}}.
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index c7f1c8b5f..08d9d7426 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -46,6 +46,7 @@
get_epochs/1,
get_filepath/1,
get_instance_start_time/1,
+ get_fd_handle/1,
get_pid/1,
get_revs_limit/1,
get_security/1,
@@ -208,6 +209,9 @@ clustered_db(DbName, #user_ctx{} = UserCtx) ->
clustered_db(DbName, UserCtx, SecProps) ->
clustered_db(DbName, [{user_ctx, UserCtx}, {security, SecProps}]).
+get_fd_handle(#db{} = Db) ->
+ couch_db_engine:get_fd_handle(Db).
+
is_db(#db{}) ->
true;
is_db(_) ->
@@ -794,6 +798,8 @@ set_security(_, _) ->
throw(bad_request).
set_user_ctx(#db{} = Db, UserCtx) ->
+ %% TODO:
+ %% couch_db_engine:set_user_ctx(Db, UserCtx),
{ok, Db#db{user_ctx = UserCtx}}.
validate_security_object(SecProps) ->
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index 9e46b816b..aca62fea0 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -191,6 +191,10 @@
% All of the get_* functions may be called from many
% processes concurrently.
+% Get the FD handle, usually for #ioq_file{}
+-callback get_fd_handle(DbHandle::db_handle()) -> any().
+
+
% The database should make a note of the update sequence when it
% was last compacted. If the database doesn't need compacting it
% can just hard code a return value of 0.
@@ -676,6 +680,7 @@
last_activity/1,
get_engine/1,
+ get_fd_handle/1,
get_compacted_seq/1,
get_del_doc_count/1,
get_disk_version/1,
@@ -799,6 +804,10 @@ get_engine(#db{} = Db) ->
#db{engine = {Engine, _}} = Db,
Engine.
+get_fd_handle(#db{} = Db) ->
+ #db{engine = {Engine, EngineState}} = Db,
+ Engine:get_fd_handle(EngineState).
+
get_compacted_seq(#db{} = Db) ->
#db{engine = {Engine, EngineState}} = Db,
Engine:get_compacted_seq(EngineState).
@@ -885,6 +894,13 @@ set_update_seq(#db{} = Db, UpdateSeq) ->
{ok, Db#db{engine = {Engine, NewSt}}}.
open_docs(#db{} = Db, DocIds) ->
+ case erlang:get(couch_file_hash) of
+ undefined ->
+ Hash = list_to_atom(integer_to_list(mem3_hash:crc32(Db#db.filepath))),
+ erlang:put(couch_file_hash, Hash);
+ _ ->
+ ok
+ end,
#db{engine = {Engine, EngineState}} = Db,
Engine:open_docs(EngineState, DocIds).
diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl
index ba8d9c42f..9d556da58 100644
--- a/src/couch/src/couch_file.erl
+++ b/src/couch/src/couch_file.erl
@@ -15,6 +15,7 @@
-vsn(2).
-include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
-define(INITIAL_WAIT, 60000).
-define(MONITOR_CHECK, 10000).
@@ -33,11 +34,12 @@
is_sys,
eof = 0,
db_monitor,
- pread_limit = 0
+ pread_limit = 0,
+ tab
}).
% public API
--export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
+-export([open/1, open/2, open/3, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
-export([pread_term/2, pread_iolist/2, pread_binary/2]).
-export([append_binary/2]).
-export([append_raw_chunk/2, assemble_file_chunk/2]).
@@ -66,46 +68,39 @@ open(Filepath) ->
open(Filepath, []).
open(Filepath, Options) ->
- case
- gen_server:start_link(
- couch_file,
- {Filepath, Options, self(), Ref = make_ref()},
- []
- )
- of
- {ok, Fd} ->
- {ok, Fd};
- ignore ->
- % get the error
- receive
- {Ref, Pid, {error, Reason} = Error} ->
- case process_info(self(), trap_exit) of
- {trap_exit, true} ->
- receive
- {'EXIT', Pid, _} -> ok
- end;
- {trap_exit, false} ->
- ok
- end,
- case {lists:member(nologifmissing, Options), Reason} of
- {true, enoent} ->
- ok;
- _ ->
- couch_log:error(
- "Could not open file ~s: ~s",
- [Filepath, file:format_error(Reason)]
- )
- end,
- Error
- end;
- Error ->
- % We can't say much here, because it could be any kind of error.
- % Just let it bubble and an encapsulating subcomponent can perhaps
- % be more informative. It will likely appear in the SASL log, anyway.
+ open(Filepath, Options, undefined).
+
+open(Filepath, Options, IOQPid0) ->
+ case gen_server:start_link(couch_file,
+ {Filepath, Options, self(), Ref = make_ref()}, []) of
+ {ok, Fd} ->
+ IOQPid = case IOQPid0 of
+ undefined ->
+ {ok, IOQPid1} = ioq_server2:start_link({by_shard, Filepath, Fd}),
+ IOQPid1;
+ IOQPid0 when is_pid(IOQPid0) ->
+ IOQPid0
+ end,
+ Tab = gen_server:call(Fd, get_cache_ref),
+ {ok, #ioq_file{fd=Fd, ioq=IOQPid, tab=Tab}};
+ ignore ->
+ % get the error
+ receive
+ {Ref, Pid, {error, Reason} = Error} ->
+ case process_info(self(), trap_exit) of
+ {trap_exit, true} -> receive {'EXIT', Pid, _} -> ok end;
+ {trap_exit, false} -> ok
+ end,
+ case {lists:member(nologifmissing, Options), Reason} of
+ {true, enoent} -> ok;
+ _ ->
+ couch_log:error("Could not open file ~s: ~s",
+ [Filepath, file:format_error(Reason)])
+ end,
Error
end.
-set_db_pid(Fd, Pid) ->
+set_db_pid(#ioq_file{fd=Fd}, Pid) ->
gen_server:call(Fd, {set_db_pid, Pid}).
%%----------------------------------------------------------------------
@@ -155,6 +150,18 @@ pread_term(Fd, Pos) ->
{ok, Bin} = pread_binary(Fd, Pos),
{ok, couch_compress:decompress(Bin)}.
+%% TODO: add purpose docs
+load_from_cache(#ioq_file{tab=undefined}, _Pos) ->
+ missing;
+load_from_cache(#ioq_file{tab=Tab}, Pos) ->
+ case ets:lookup(Tab, Pos) of
+ [{Pos, Res}] ->
+ %% io:format("CACHE HIT: ~p{~p}~n", [Tab, Pos]),
+ Res;
+ [] ->
+ missing
+ end.
+
%%----------------------------------------------------------------------
%% Purpose: Reads a binrary from a file that was written with append_binary
%% Args: Pos, the offset into the file where the term is serialized.
@@ -167,11 +174,18 @@ pread_binary(Fd, Pos) ->
{ok, iolist_to_binary(L)}.
pread_iolist(Fd, Pos) ->
- case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+ case load_from_cache(Fd, Pos) of
{ok, IoList, Md5} ->
+ couch_stats:increment_counter([couchdb, couch_file, cache_hits]),
{ok, verify_md5(Fd, Pos, IoList, Md5)};
- Error ->
- Error
+ missing ->
+ couch_stats:increment_counter([couchdb, couch_file, cache_misses]),
+ case ioq:call(Fd, {pread_iolist, Pos}, erlang:get(io_priority)) of
+ {ok, IoList, Md5} ->
+ {ok, verify_md5(Fd, Pos, IoList, Md5)};
+ Error ->
+ Error
+ end
end.
pread_terms(Fd, PosList) ->
@@ -227,7 +241,7 @@ append_binaries(Fd, Bins) ->
%%----------------------------------------------------------------------
% length in bytes
-bytes(Fd) ->
+bytes(#ioq_file{fd=Fd}) ->
gen_server:call(Fd, bytes, infinity).
%%----------------------------------------------------------------------
@@ -236,7 +250,7 @@ bytes(Fd) ->
%% or {error, Reason}.
%%----------------------------------------------------------------------
-truncate(Fd, Pos) ->
+truncate(#ioq_file{fd=Fd}, Pos) ->
gen_server:call(Fd, {truncate, Pos}, infinity).
%%----------------------------------------------------------------------
@@ -261,7 +275,7 @@ sync(Filepath) when is_list(Filepath) ->
{error, Error} ->
erlang:error(Error)
end;
-sync(Fd) ->
+sync(#ioq_file{fd=Fd}) ->
case gen_server:call(Fd, sync, infinity) of
ok ->
ok;
@@ -273,8 +287,10 @@ sync(Fd) ->
%% Purpose: Close the file.
%% Returns: ok
%%----------------------------------------------------------------------
-close(Fd) ->
- gen_server:call(Fd, close, infinity).
+close(#ioq_file{fd=Fd, ioq=IOP}) ->
+ Res = gen_server:call(Fd, close, infinity),
+ gen_server:call(IOP, close, infinity),
+ Res.
delete(RootDir, Filepath) ->
delete(RootDir, Filepath, []).
@@ -408,7 +424,7 @@ init_status_error(ReturnPid, Ref, Error) ->
ReturnPid ! {Ref, self(), Error},
ignore.
-last_read(Fd) when is_pid(Fd) ->
+last_read(#ioq_file{fd=Fd}) when is_pid(Fd) ->
Now = os:timestamp(),
couch_util:process_dict_get(Fd, read_timestamp, Now).
@@ -419,38 +435,62 @@ init({Filepath, Options, ReturnPid, Ref}) ->
Limit = get_pread_limit(),
IsSys = lists:member(sys_db, Options),
update_read_timestamp(),
- case lists:member(create, Options) of
+ ShouldCache = config:get_boolean("couchdb", "couch_file_cache", true),
+ Tab = case ShouldCache of
true ->
- filelib:ensure_dir(Filepath),
+ couch_stats:increment_counter([couchdb, couch_file, cache_opens]),
+ ets:new(?MODULE, [set, protected, {read_concurrency, true}]);
+ false ->
+ undefined
+ end,
+ case lists:member(create, Options) of
+ true ->
+ filelib:ensure_dir(Filepath),
+ case file:open(Filepath, OpenOptions) of
+ {ok, Fd} ->
+ %% Save Fd in process dictionary for debugging purposes
+ put(couch_file_fd, {Fd, Filepath}),
+ {ok, Length} = file:position(Fd, eof),
+ case Length > 0 of
+ true ->
+ % this means the file already exists and has data.
+ % FYI: We don't differentiate between empty files and non-existant
+ % files here.
+ case lists:member(overwrite, Options) of
+ true ->
+ {ok, 0} = file:position(Fd, 0),
+ ok = file:truncate(Fd),
+ ok = file:sync(Fd),
+ maybe_track_open_os_files(Options),
+ erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
+ {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
+ false ->
+ ok = file:close(Fd),
+ init_status_error(ReturnPid, Ref, {error, eexist})
+ end;
+ false ->
+ maybe_track_open_os_files(Options),
+ erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
+ {ok, #file{fd=Fd, is_sys=IsSys, pread_limit=Limit, tab=Tab}}
+ end;
+ Error ->
+ init_status_error(ReturnPid, Ref, Error)
+ end;
+ false ->
+ % open in read mode first, so we don't create the file if it doesn't exist.
+ case file:open(Filepath, [read, raw]) of
+ {ok, Fd_Read} ->
case file:open(Filepath, OpenOptions) of
{ok, Fd} ->
- %% Save Fd in process dictionary for debugging purposes
- put(couch_file_fd, {Fd, Filepath}),
- {ok, Length} = file:position(Fd, eof),
- case Length > 0 of
- true ->
- % this means the file already exists and has data.
- % FYI: We don't differentiate between empty files and non-existant
- % files here.
- case lists:member(overwrite, Options) of
- true ->
- {ok, 0} = file:position(Fd, 0),
- ok = file:truncate(Fd),
- ok = file:sync(Fd),
- maybe_track_open_os_files(Options),
- erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
- {ok, #file{fd = Fd, is_sys = IsSys, pread_limit = Limit}};
- false ->
- ok = file:close(Fd),
- init_status_error(ReturnPid, Ref, {error, eexist})
- end;
- false ->
- maybe_track_open_os_files(Options),
- erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
- {ok, #file{fd = Fd, is_sys = IsSys, pread_limit = Limit}}
- end;
- Error ->
- init_status_error(ReturnPid, Ref, Error)
+ %% Save Fd in process dictionary for debugging purposes
+ put(couch_file_fd, {Fd, Filepath}),
+ ok = file:close(Fd_Read),
+ maybe_track_open_os_files(Options),
+ {ok, Eof} = file:position(Fd, eof),
+ erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
+ {ok, #file{fd=Fd, eof=Eof, is_sys=IsSys, pread_limit=Limit, tab=Tab}};
+ Error ->
+ init_status_error(ReturnPid, Ref, Error)
end;
false ->
% open in read mode first, so we don't create the file if it doesn't exist.
@@ -502,16 +542,18 @@ handle_call(close, _From, #file{fd = Fd} = File) ->
handle_call({pread_iolist, Pos}, _From, File) ->
update_read_timestamp(),
{LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
- case iolist_to_binary(LenIolist) of
- % an MD5-prefixed term
- <<1:1/integer, Len:31/integer>> ->
- {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len + 16),
- {Md5, IoList} = extract_md5(Md5AndIoList),
- {reply, {ok, IoList, Md5}, File};
- <<0:1/integer, Len:31/integer>> ->
- {Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
- {reply, {ok, Iolist, <<>>}, File}
- end;
+ Resp = case iolist_to_binary(LenIolist) of
+ <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term
+ {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16),
+ {Md5, IoList} = extract_md5(Md5AndIoList),
+ {ok, IoList, Md5};
+ <<0:1/integer,Len:31/integer>> ->
+ {Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
+ {ok, Iolist, <<>>}
+ end,
+ maybe_cache(File#file.tab, {Pos, Resp}),
+ {reply, Resp, File};
+
handle_call({pread_iolists, PosL}, _From, File) ->
update_read_timestamp(),
LocNums1 = [{Pos, 4} || Pos <- PosL],
@@ -613,8 +655,17 @@ handle_call({write_header, Bin}, _From, #file{fd = Fd, eof = Pos} = File) ->
{reply, Error, reset_eof(File)}
end;
handle_call(find_header, _From, #file{fd = Fd, eof = Pos} = File) ->
- {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+ {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File};
+handle_call(get_cache_ref, _From, #file{tab=Tab} = File) ->
+ {reply, Tab, File}.
+
+
+handle_cast({cache, Key, Val}, Fd) ->
+ %% TODO: should we skip if value exists?
+ Tab = erlang:get(couch_file_cache),
+ ets:insert(Tab, {Key, Val}),
+ {noreply, Fd};
handle_cast(close, Fd) ->
{stop, normal, Fd}.
@@ -881,7 +932,9 @@ is_idle(#file{is_sys = false}) ->
-spec process_info(CouchFilePid :: pid()) ->
{Fd :: pid() | tuple(), FilePath :: string()} | undefined.
-process_info(Pid) ->
+process_info(Pid) when is_pid(Pid) ->
+ couch_util:process_dict_get(Pid, couch_file_fd);
+process_info(#ioq_file{fd=Pid}) ->
couch_util:process_dict_get(Pid, couch_file_fd).
update_read_timestamp() ->
@@ -905,6 +958,11 @@ reset_eof(#file{} = File) ->
{ok, Eof} = file:position(File#file.fd, eof),
File#file{eof = Eof}.
+maybe_cache(undefined, _Obj) ->
+ ok;
+maybe_cache(Tab, Obj) ->
+ ets:insert(Tab, Obj).
+
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
diff --git a/src/couch/test/eunit/couch_btree_tests.erl b/src/couch/test/eunit/couch_btree_tests.erl
index 1c9ba7771..a66f19aab 100644
--- a/src/couch/test/eunit/couch_btree_tests.erl
+++ b/src/couch/test/eunit/couch_btree_tests.erl
@@ -14,6 +14,7 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
+-include_lib("ioq/include/ioq.hrl").
-define(ROWS, 1000).
% seconds
@@ -47,6 +48,8 @@ setup_red() ->
setup_red(_) ->
setup_red().
+teardown(#ioq_file{}=Fd) ->
+ ok = couch_file:close(Fd);
teardown(Fd) when is_pid(Fd) ->
ok = couch_file:close(Fd);
teardown({Fd, _}) ->
@@ -77,6 +80,14 @@ red_test_funs() ->
].
btree_open_test_() ->
+ {
+ setup,
+ fun() -> test_util:start(?MODULE, [ioq]) end,
+ fun test_util:stop/1,
+ fun btree_should_open/0
+ }.
+
+btree_should_open() ->
{ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
{ok, Btree} = couch_btree:open(nil, Fd, [{compression, none}]),
{
diff --git a/src/couch/test/eunit/couch_file_tests.erl b/src/couch/test/eunit/couch_file_tests.erl
index 1b54cd70e..37af7e1fc 100644
--- a/src/couch/test/eunit/couch_file_tests.erl
+++ b/src/couch/test/eunit/couch_file_tests.erl
@@ -13,6 +13,7 @@
-module(couch_file_tests).
-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("ioq/include/ioq.hrl").
-define(BLOCK_SIZE, 4096).
-define(setup(F), {setup, fun setup/0, fun teardown/1, F}).
@@ -23,11 +24,40 @@ setup() ->
Fd.
teardown(Fd) ->
- case is_process_alive(Fd) of
+ case is_process_alive(ioq:fd_pid(Fd)) of
true -> ok = couch_file:close(Fd);
false -> ok
end.
+mock_server() ->
+ %%meck:new(config),
+ meck:expect(config, get, fun(Group) ->
+ []
+ end),
+ meck:expect(config, get, fun(_,_) ->
+ undefined
+ end),
+ meck:expect(config, get, fun("ioq2", _, Default) ->
+ Default
+ end),
+ meck:expect(config, get, fun(_, _, Default) ->
+ Default
+ end),
+ %% meck:expect(config, get, fun(_, _, _) ->
+ %% undefined
+ %% end),
+ meck:expect(config, get_integer, fun("ioq2", _, Default) ->
+ Default
+ end),
+ meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
+ Default
+ end).
+
+
+unmock_server(_) ->
+ true = meck:validate(config),
+ ok = meck:unload(config).
+
open_close_test_() ->
{
"Test for proper file open and close",
@@ -39,7 +69,7 @@ open_close_test_() ->
should_return_enoent_if_missed(),
should_ignore_invalid_flags_with_open(),
?setup(fun should_return_pid_on_file_open/1),
- should_close_file_properly(),
+ ?setup(fun should_close_file_properly/0),
?setup(fun should_create_empty_new_files/1)
]
}
@@ -55,7 +85,7 @@ should_ignore_invalid_flags_with_open() ->
).
should_return_pid_on_file_open(Fd) ->
- ?_assert(is_pid(Fd)).
+ ?_assert(is_pid(ioq:fd_pid(Fd))).
should_close_file_properly() ->
{ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]),
@@ -139,9 +169,15 @@ should_not_read_beyond_eof(Fd) ->
{ok, Io} = file:open(Filepath, [read, write, binary]),
ok = file:pwrite(Io, Pos, <<0:1/integer, DoubleBin:31/integer>>),
file:close(Io),
- unlink(Fd),
- ExpectedError = {badmatch, {'EXIT', {bad_return_value, {read_beyond_eof, Filepath}}}},
- ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+ unlink(ioq:fd_pid(Fd)),
+ unlink(ioq:ioq_pid(Fd)),
+ %% ExpectedError = {badmatch, {'EXIT', {bad_return_value,
+ %% {read_beyond_eof, Filepath}}}},
+ %%ExpectedError = {exit, {{bad_return_value, {read_beyond_eof Filepath,}}, _}, _},
+
+ ?_assertExit(
+ {{bad_return_value, {read_beyond_eof, Filepath}}, _},
+ couch_file:pread_binary(Fd, Pos)).
should_truncate(Fd) ->
{ok, 0, _} = couch_file:append_term(Fd, foo),
@@ -179,9 +215,19 @@ should_not_read_more_than_pread_limit(Fd) ->
{_, Filepath} = couch_file:process_info(Fd),
BigBin = list_to_binary(lists:duplicate(100000, 0)),
{ok, Pos, _Size} = couch_file:append_binary(Fd, BigBin),
- unlink(Fd),
- ExpectedError = {badmatch, {'EXIT', {bad_return_value, {exceed_pread_limit, Filepath, 50000}}}},
- ?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+ unlink(ioq:fd_pid(Fd)),
+ unlink(ioq:ioq_pid(Fd)),
+ ExpectedError = {badmatch, {'EXIT', {bad_return_value,
+ {exceed_pread_limit, Filepath, 50000}}}},
+ ?debugFmt("EXPECTED ERROR IS: ~p~n", [ExpectedError]),
+ %%?_assert(couch_file:pread_binary(Fd, Pos)).
+ %%try couch_file:pread_binary(Fd, Pos)
+ %%catch E:R:S -> ?debugFmt("GOT ERROR: ~p || ~p~n~p~n", [E,R,S])
+ %%end,
+ %%?_assertError(ExpectedError, couch_file:pread_binary(Fd, Pos)).
+ ?_assertExit(
+ {{bad_return_value, {exceed_pread_limit, Filepath, 50000}}, _},
+ couch_file:pread_binary(Fd, Pos)).
header_test_() ->
{
@@ -540,7 +586,8 @@ fsync_error_test_() ->
}.
fsync_raises_errors() ->
- Fd = spawn(fun() -> fake_fsync_fd() end),
+ FdPid = spawn(fun() -> fake_fsync_fd() end),
+ Fd = #ioq_file{fd=FdPid},
?assertError({fsync_error, eio}, couch_file:sync(Fd)).
fake_fsync_fd() ->
diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl
index 28e5a9b3d..c42c27f67 100644
--- a/src/couch_mrview/src/couch_mrview_compactor.erl
+++ b/src/couch_mrview/src/couch_mrview_compactor.erl
@@ -14,6 +14,7 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("ioq/include/ioq.hrl").
-export([compact/3, swap_compacted/2, remove_compacted/1]).
@@ -46,8 +47,9 @@ compact(State) ->
erlang:put(io_priority, {view_compact, DbName, IdxName}),
{EmptyState, NumDocIds} = couch_util:with_db(DbName, fun(Db) ->
+ IOQPid = ioq:ioq_pid(couch_db:get_fd_handle(Db)),
CompactFName = couch_mrview_util:compaction_file(DbName, Sig),
- {ok, Fd} = couch_mrview_util:open_file(CompactFName),
+ {ok, Fd} = couch_mrview_util:open_file(CompactFName, IOQPid),
ESt = couch_mrview_util:reset_index(Db, Fd, State),
{ok, Count} = couch_db:get_doc_count(Db),
@@ -125,7 +127,7 @@ compact(State) ->
lists:zip(Views, EmptyViews)
),
- unlink(EmptyState#mrst.fd),
+ unlink(ioq:fd_pid(EmptyState#mrst.fd)),
{ok, EmptyState#mrst{
id_btree = NewIdBtree,
views = NewViews,
@@ -139,7 +141,7 @@ recompact(#mrst{db_name = DbName, idx_name = IdxName}, 0) ->
erlang:error({exceeded_recompact_retry_count, [{db_name, DbName}, {idx_name, IdxName}]});
recompact(State, RetryCount) ->
Self = self(),
- link(State#mrst.fd),
+ link(ioq:fd_pid(State#mrst.fd)),
{Pid, Ref} = erlang:spawn_monitor(fun() ->
couch_index_updater:update(Self, couch_mrview_index, State)
end),
@@ -151,10 +153,10 @@ recompact_loop(Pid, Ref, State, RetryCount) ->
% We've made progress so reset RetryCount
recompact_loop(Pid, Ref, State2, recompact_retry_count());
{'DOWN', Ref, _, _, {updated, Pid, State2}} ->
- unlink(State#mrst.fd),
+ unlink(ioq:fd_pid(State#mrst.fd)),
{ok, State2};
{'DOWN', Ref, _, _, Reason} ->
- unlink(State#mrst.fd),
+ unlink(ioq:fd_pid(State#mrst.fd)),
couch_log:warning("Error during recompaction: ~r", [Reason]),
recompact(State, RetryCount - 1)
end.
@@ -239,8 +241,8 @@ swap_compacted(OldState, NewState) ->
fd = NewFd
} = NewState,
- link(NewState#mrst.fd),
- Ref = erlang:monitor(process, NewState#mrst.fd),
+ link(ioq:fd_pid(NewState#mrst.fd)),
+ Ref = erlang:monitor(process, ioq:fd_pid(NewState#mrst.fd)),
RootDir = couch_index_util:root_dir(),
IndexFName = couch_mrview_util:index_file(DbName, Sig),
@@ -256,7 +258,7 @@ swap_compacted(OldState, NewState) ->
ok = couch_file:delete(RootDir, IndexFName),
ok = file:rename(CompactFName, IndexFName),
- unlink(OldState#mrst.fd),
+ unlink(ioq:fd_pid(OldState#mrst.fd)),
erlang:demonitor(OldState#mrst.fd_monitor, [flush]),
{ok, NewState#mrst{fd_monitor = Ref}}.
@@ -295,7 +297,7 @@ recompact_success_after_progress() ->
timer:sleep(100),
exit({updated, self(), State#mrst{update_seq = 2}})
end),
- State = #mrst{fd = self(), update_seq = 0},
+ State = #mrst{fd=#ioq_file{fd=self()}, update_seq=0},
?assertEqual({ok, State#mrst{update_seq = 2}}, recompact(State))
end).
@@ -309,9 +311,10 @@ recompact_exceeded_retry_count() ->
end
),
ok = meck:expect(couch_log, warning, fun(_, _) -> ok end),
- State = #mrst{fd = self(), db_name = foo, idx_name = bar},
- ExpectedError = {exceeded_recompact_retry_count, [{db_name, foo}, {idx_name, bar}]},
- ?assertError(ExpectedError, recompact(State))
+ State = #mrst{fd=#ioq_file{fd=self()}, db_name=foo, idx_name=bar},
+ ExpectedError = {exceeded_recompact_retry_count,
+ [{db_name, foo}, {idx_name, bar}]},
+ ?assertError(ExpectedError, recompact(State))
end).
-endif.
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 1bfdb2818..0f3428091 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -101,6 +101,7 @@ open(Db, State0) ->
db_name = DbName,
sig = Sig
} = State = set_partitioned(Db, State0),
+ IOQPid = ioq:ioq_pid(couch_db:get_fd_handle(Db)),
IndexFName = couch_mrview_util:index_file(DbName, Sig),
% If we are upgrading from <= 2.x, we upgrade the view
@@ -120,7 +121,7 @@ open(Db, State0) ->
OldSig = couch_mrview_util:maybe_update_index_file(State),
- case couch_mrview_util:open_file(IndexFName) of
+ case couch_mrview_util:open_file(IndexFName, IOQPid) of
{ok, Fd} ->
case couch_file:read_header(Fd) of
% upgrade code for <= 2.x
@@ -178,7 +179,7 @@ close(State) ->
% outstanding queries are done.
shutdown(State) ->
erlang:demonitor(State#mrst.fd_monitor, [flush]),
- unlink(State#mrst.fd).
+ unlink(ioq:fd_pid(State#mrst.fd)).
delete(#mrst{db_name = DbName, sig = Sig} = State) ->
couch_file:close(State#mrst.fd),
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 9e3d292ed..f6ccd062f 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -17,7 +17,7 @@
-export([verify_view_filename/1, get_signature_from_filename/1]).
-export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
-export([make_header/1]).
--export([index_file/2, compaction_file/2, open_file/1]).
+-export([index_file/2, compaction_file/2, open_file/2]).
-export([delete_files/2, delete_index_file/2, delete_compaction_file/2]).
-export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]).
-export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
@@ -99,8 +99,8 @@ get_signature_from_filename(FileName) ->
get_view(Db, DDoc, ViewName, Args0) ->
case get_view_index_state(Db, DDoc, ViewName, Args0) of
{ok, State, Args2} ->
- Ref = erlang:monitor(process, State#mrst.fd),
- #mrst{language = Lang, views = Views} = State,
+ Ref = erlang:monitor(process, ioq:fd_pid(State#mrst.fd)),
+ #mrst{language=Lang, views=Views} = State,
{Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
check_range(Args3, view_cmp(View)),
Sig = view_sig(Db, State, View, Args3),
@@ -317,6 +317,7 @@ init_state(Db, Fd, State, Header) ->
{ShouldCommit, State#mrst{
fd = Fd,
fd_monitor = erlang:monitor(process, Fd),
+ fd_monitor=erlang:monitor(process, ioq:fd_pid(Fd)),
update_seq = Seq,
purge_seq = PurgeSeq,
id_btree = IdBtree,
@@ -791,10 +792,10 @@ compaction_file(DbName, Sig) ->
FileName = couch_index_util:hexsig(Sig) ++ ".compact.view",
couch_index_util:index_file(mrview, DbName, FileName).
-open_file(FName) ->
- case couch_file:open(FName, [nologifmissing]) of
+open_file(FName, IOQPid) ->
+ case couch_file:open(FName, [nologifmissing], IOQPid) of
{ok, Fd} -> {ok, Fd};
- {error, enoent} -> couch_file:open(FName, [create]);
+ {error, enoent} -> couch_file:open(FName, [create], IOQPid);
Error -> Error
end.