summaryrefslogtreecommitdiff
path: root/src/file_handle_cache.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/file_handle_cache.erl')
-rw-r--r--src/file_handle_cache.erl114
1 files changed, 67 insertions, 47 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index e14dfe22..6c3f1b5f 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -120,37 +120,39 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain and transfer. obtain/0 blocks until
-%% a file descriptor is available, at which point the requesting
-%% process is considered to 'own' one more descriptor. transfer/1
-%% transfers ownership of a file descriptor between processes. It is
-%% non-blocking. Obtain is used to obtain permission to accept file
-%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1
-%% macro. File handles can use the entire limit, but will be evicted
-%% by obtain calls up to the point at which no more obtain calls can
-%% be satisfied by the obtains limit. Thus there will always be some
-%% capacity available for file handles. Processes that use obtain are
-%% never asked to return them, and they are not managed in any way by
-%% the server. It is simply a mechanism to ensure that processes that
-%% need file descriptors such as sockets can do so in such a way that
-%% the overall number of open file descriptors is managed.
+%% The server also supports obtain, release and transfer. obtain/0
+%% blocks until a file descriptor is available, at which point the
+%% requesting process is considered to 'own' one more
+%% descriptor. release/0 is the inverse operation and releases a
+%% previously obtained descriptor. transfer/1 transfers ownership of a
+%% file descriptor between processes. It is non-blocking. Obtain is
+%% used to obtain permission to accept file descriptors. Obtain has a
+%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
+%% the entire limit, but will be evicted by obtain calls up to the
+%% point at which no more obtain calls can be satisfied by the obtains
+%% limit. Thus there will always be some capacity available for file
+%% handles. Processes that use obtain are never asked to return them,
+%% and they are not managed in any way by the server. It is simply a
+%% mechanism to ensure that processes that need file descriptors such
+%% as sockets can do so in such a way that the overall number of open
+%% file descriptors is managed.
%%
%% The callers of register_callback/3, obtain/0, and the argument of
%% transfer/1 are monitored, reducing the count of handles in use
%% appropriately when the processes terminate.
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([register_callback/3]).
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3,
set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0,
- info/1]).
+-export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
+ info/0, info/1]).
-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_cast/2]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
@@ -248,6 +250,7 @@
-spec(clear/1 :: (ref()) -> ok_or_error()).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').
+-spec(release/0 :: () -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
@@ -266,11 +269,11 @@
%%----------------------------------------------------------------------------
start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
register_callback(M, F, A)
when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
- gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}).
+ gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}).
open(Path, Mode, Options) ->
Path1 = filename:absname(Path),
@@ -318,7 +321,7 @@ read(Ref, Count) ->
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
- case file:read(Hdl, Count) of
+ case prim_file:read(Hdl, Count) of
{ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
{Obj,
[Handle #handle { offset = Offset1 }]};
@@ -338,7 +341,7 @@ append(Ref, Data) ->
write_buffer_size_limit = 0,
at_eof = true } = Handle1} ->
Offset1 = Offset + iolist_size(Data),
- {file:write(Hdl, Data),
+ {prim_file:write(Hdl, Data),
[Handle1 #handle { is_dirty = true, offset = Offset1 }]};
{{ok, _Offset}, #handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
@@ -365,7 +368,7 @@ sync(Ref) ->
ok;
([Handle = #handle { hdl = Hdl,
is_dirty = true, write_buffer = [] }]) ->
- case file:sync(Hdl) of
+ case prim_file:sync(Hdl) of
ok -> {ok, [Handle #handle { is_dirty = false }]};
Error -> {Error, [Handle]}
end
@@ -382,7 +385,7 @@ truncate(Ref) ->
with_flushed_handles(
[Ref],
fun ([Handle1 = #handle { hdl = Hdl }]) ->
- case file:truncate(Hdl) of
+ case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end
@@ -409,7 +412,7 @@ copy(Src, Dest, Count) ->
fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset },
DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
) ->
- case file:copy(SHdl, DHdl, Count) of
+ case prim_file:copy(SHdl, DHdl, Count) of
{ok, Count1} = Result1 ->
{Result1,
[SHandle #handle { offset = SOffset + Count1 },
@@ -429,7 +432,7 @@ delete(Ref) ->
Handle = #handle { path = Path } ->
case hard_close(Handle #handle { is_dirty = false,
write_buffer = [] }) of
- ok -> file:delete(Path);
+ ok -> prim_file:delete(Path);
{Error, Handle1} -> put_handle(Ref, Handle1),
Error
end
@@ -444,7 +447,7 @@ clear(Ref) ->
case maybe_seek(bof, Handle #handle { write_buffer = [],
write_buffer_size = 0 }) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
- case file:truncate(Hdl) of
+ case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end;
@@ -471,21 +474,28 @@ set_maximum_since_use(MaximumAge) ->
end.
obtain() ->
- gen_server:call(?SERVER, {obtain, self()}, infinity).
+ %% If the FHC isn't running, obtains succeed immediately.
+ case whereis(?SERVER) of
+ undefined -> ok;
+ _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity)
+ end.
+
+release() ->
+ gen_server2:cast(?SERVER, {release, self()}).
transfer(Pid) ->
- gen_server:cast(?SERVER, {transfer, self(), Pid}).
+ gen_server2:cast(?SERVER, {transfer, self(), Pid}).
set_limit(Limit) ->
- gen_server:call(?SERVER, {set_limit, Limit}, infinity).
+ gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
get_limit() ->
- gen_server:call(?SERVER, get_limit, infinity).
+ gen_server2:call(?SERVER, get_limit, infinity).
info_keys() -> ?INFO_KEYS.
info() -> info(?INFO_KEYS).
-info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity).
+info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -539,8 +549,8 @@ get_or_reopen(RefNewOrReopens) ->
{ok, [Handle || {_Ref, Handle} <- OpenHdls]};
{OpenHdls, ClosedHdls} ->
Oldest = oldest(get_age_tree(), fun () -> now() end),
- case gen_server:call(?SERVER, {open, self(), length(ClosedHdls),
- Oldest}, infinity) of
+ case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls),
+ Oldest}, infinity) of
ok ->
case reopen(ClosedHdls) of
{ok, RefHdls} -> sort_handles(RefNewOrReopens,
@@ -567,10 +577,10 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
offset = Offset,
last_used_at = undefined }} |
RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
- case file:open(Path, case NewOrReopen of
- new -> Mode;
- reopen -> [read | Mode]
- end) of
+ case prim_file:open(Path, case NewOrReopen of
+ new -> Mode;
+ reopen -> [read | Mode]
+ end) of
{ok, Hdl} ->
Now = now(),
{{ok, _Offset}, Handle1} =
@@ -583,7 +593,7 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
Error ->
%% NB: none of the handles in ToOpen are in the age tree
Oldest = oldest(Tree, fun () -> undefined end),
- [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
+ [gen_server2:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
put_age_tree(Tree),
Error
end.
@@ -632,7 +642,7 @@ age_tree_delete(Then) ->
fun (Tree) ->
Tree1 = gb_trees:delete_any(Then, Tree),
Oldest = oldest(Tree1, fun () -> undefined end),
- gen_server:cast(?SERVER, {close, self(), Oldest}),
+ gen_server2:cast(?SERVER, {close, self(), Oldest}),
Tree1
end).
@@ -642,7 +652,7 @@ age_tree_change() ->
case gb_trees:is_empty(Tree) of
true -> Tree;
false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
- gen_server:cast(?SERVER, {update, self(), Oldest})
+ gen_server2:cast(?SERVER, {update, self(), Oldest})
end,
Tree
end).
@@ -694,10 +704,10 @@ soft_close(Handle) ->
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
- true -> file:sync(Hdl);
+ true -> prim_file:sync(Hdl);
false -> ok
end,
- ok = file:close(Hdl),
+ ok = prim_file:close(Hdl),
age_tree_delete(Then),
{ok, Handle1 #handle { hdl = closed,
is_dirty = false,
@@ -732,7 +742,7 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
at_eof = AtEoF }) ->
{AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
case (case NeedsSeek of
- true -> file:position(Hdl, NewOffset);
+ true -> prim_file:position(Hdl, NewOffset);
false -> {ok, Offset}
end) of
{ok, Offset1} = Result ->
@@ -769,7 +779,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer = WriteBuffer,
write_buffer_size = DataSize,
at_eof = true }) ->
- case file:write(Hdl, lists:reverse(WriteBuffer)) of
+ case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of
ok ->
Offset1 = Offset + DataSize,
{ok, Handle #handle { offset = Offset1, is_dirty = true,
@@ -785,7 +795,7 @@ i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
i(Item, _) -> throw({bad_argument, Item}).
%%----------------------------------------------------------------------------
-%% gen_server callbacks
+%% gen_server2 callbacks
%%----------------------------------------------------------------------------
init([]) ->
@@ -814,6 +824,12 @@ init([]) ->
clients = Clients,
timer_ref = undefined }}.
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ {release, _} -> 5;
+ _ -> 0
+ end.
+
handle_call({open, Pid, Requested, EldestUnusedSince}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
@@ -893,6 +909,10 @@ handle_cast({update, Pid, EldestUnusedSince},
%% storm of messages
{noreply, State};
+handle_cast({release, Pid}, State) ->
+ {noreply, adjust_alarm(State, process_pending(
+ update_counts(obtain, Pid, -1, State)))};
+
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
true = case EldestUnusedSince of
@@ -1048,7 +1068,7 @@ run_pending_item(#pending { kind = Kind,
requested = Requested,
from = From },
State = #fhc_state { clients = Clients }) ->
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
update_counts(Kind, Pid, Requested, State).