diff options
Diffstat (limited to 'src/file_handle_cache.erl')
-rw-r--r-- | src/file_handle_cache.erl | 114 |
1 files changed, 67 insertions, 47 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index e14dfe22..6c3f1b5f 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -120,37 +120,39 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain and transfer. obtain/0 blocks until -%% a file descriptor is available, at which point the requesting -%% process is considered to 'own' one more descriptor. transfer/1 -%% transfers ownership of a file descriptor between processes. It is -%% non-blocking. Obtain is used to obtain permission to accept file -%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1 -%% macro. File handles can use the entire limit, but will be evicted -%% by obtain calls up to the point at which no more obtain calls can -%% be satisfied by the obtains limit. Thus there will always be some -%% capacity available for file handles. Processes that use obtain are -%% never asked to return them, and they are not managed in any way by -%% the server. It is simply a mechanism to ensure that processes that -%% need file descriptors such as sockets can do so in such a way that -%% the overall number of open file descriptors is managed. +%% The server also supports obtain, release and transfer. obtain/0 +%% blocks until a file descriptor is available, at which point the +%% requesting process is considered to 'own' one more +%% descriptor. release/0 is the inverse operation and releases a +%% previously obtained descriptor. transfer/1 transfers ownership of a +%% file descriptor between processes. It is non-blocking. Obtain is +%% used to obtain permission to accept file descriptors. Obtain has a +%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use +%% the entire limit, but will be evicted by obtain calls up to the +%% point at which no more obtain calls can be satisfied by the obtains +%% limit. Thus there will always be some capacity available for file +%% handles. Processes that use obtain are never asked to return them, +%% and they are not managed in any way by the server. It is simply a +%% mechanism to ensure that processes that need file descriptors such +%% as sockets can do so in such a way that the overall number of open +%% file descriptors is managed. %% %% The callers of register_callback/3, obtain/0, and the argument of %% transfer/1 are monitored, reducing the count of handles in use %% appropriately when the processes terminate. --behaviour(gen_server). +-behaviour(gen_server2). -export([register_callback/3]). -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, - info/1]). +-export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, + info/0, info/1]). -export([ulimit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/2]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). @@ -248,6 +250,7 @@ -spec(clear/1 :: (ref()) -> ok_or_error()). -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(obtain/0 :: () -> 'ok'). +-spec(release/0 :: () -> 'ok'). -spec(transfer/1 :: (pid()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). @@ -266,11 +269,11 @@ %%---------------------------------------------------------------------------- start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). register_callback(M, F, A) when is_atom(M) andalso is_atom(F) andalso is_list(A) -> - gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}). + gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}). open(Path, Mode, Options) -> Path1 = filename:absname(Path), @@ -318,7 +321,7 @@ read(Ref, Count) -> fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> - case file:read(Hdl, Count) of + case prim_file:read(Hdl, Count) of {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), {Obj, [Handle #handle { offset = Offset1 }]}; @@ -338,7 +341,7 @@ append(Ref, Data) -> write_buffer_size_limit = 0, at_eof = true } = Handle1} -> Offset1 = Offset + iolist_size(Data), - {file:write(Hdl, Data), + {prim_file:write(Hdl, Data), [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; {{ok, _Offset}, #handle { write_buffer = WriteBuffer, write_buffer_size = Size, @@ -365,7 +368,7 @@ sync(Ref) -> ok; ([Handle = #handle { hdl = Hdl, is_dirty = true, write_buffer = [] }]) -> - case file:sync(Hdl) of + case prim_file:sync(Hdl) of ok -> {ok, [Handle #handle { is_dirty = false }]}; Error -> {Error, [Handle]} end @@ -382,7 +385,7 @@ truncate(Ref) -> with_flushed_handles( [Ref], fun ([Handle1 = #handle { hdl = Hdl }]) -> - case file:truncate(Hdl) of + case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end @@ -409,7 +412,7 @@ copy(Src, Dest, Count) -> fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset }, DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }] ) -> - case file:copy(SHdl, DHdl, Count) of + case prim_file:copy(SHdl, DHdl, Count) of {ok, Count1} = Result1 -> {Result1, [SHandle #handle { offset = SOffset + Count1 }, @@ -429,7 +432,7 @@ delete(Ref) -> Handle = #handle { path = Path } -> case hard_close(Handle #handle { is_dirty = false, write_buffer = [] }) of - ok -> file:delete(Path); + ok -> prim_file:delete(Path); {Error, Handle1} -> put_handle(Ref, Handle1), Error end @@ -444,7 +447,7 @@ clear(Ref) -> case maybe_seek(bof, Handle #handle { write_buffer = [], write_buffer_size = 0 }) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> - case file:truncate(Hdl) of + case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end; @@ -471,21 +474,28 @@ set_maximum_since_use(MaximumAge) -> end. obtain() -> - gen_server:call(?SERVER, {obtain, self()}, infinity). + %% If the FHC isn't running, obtains succeed immediately. + case whereis(?SERVER) of + undefined -> ok; + _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity) + end. + +release() -> + gen_server2:cast(?SERVER, {release, self()}). transfer(Pid) -> - gen_server:cast(?SERVER, {transfer, self(), Pid}). + gen_server2:cast(?SERVER, {transfer, self(), Pid}). set_limit(Limit) -> - gen_server:call(?SERVER, {set_limit, Limit}, infinity). + gen_server2:call(?SERVER, {set_limit, Limit}, infinity). get_limit() -> - gen_server:call(?SERVER, get_limit, infinity). + gen_server2:call(?SERVER, get_limit, infinity). info_keys() -> ?INFO_KEYS. info() -> info(?INFO_KEYS). -info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity). +info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -539,8 +549,8 @@ get_or_reopen(RefNewOrReopens) -> {ok, [Handle || {_Ref, Handle} <- OpenHdls]}; {OpenHdls, ClosedHdls} -> Oldest = oldest(get_age_tree(), fun () -> now() end), - case gen_server:call(?SERVER, {open, self(), length(ClosedHdls), - Oldest}, infinity) of + case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls), + Oldest}, infinity) of ok -> case reopen(ClosedHdls) of {ok, RefHdls} -> sort_handles(RefNewOrReopens, @@ -567,10 +577,10 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, offset = Offset, last_used_at = undefined }} | RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> - case file:open(Path, case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end) of + case prim_file:open(Path, case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end) of {ok, Hdl} -> Now = now(), {{ok, _Offset}, Handle1} = @@ -583,7 +593,7 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, Error -> %% NB: none of the handles in ToOpen are in the age tree Oldest = oldest(Tree, fun () -> undefined end), - [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen], + [gen_server2:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen], put_age_tree(Tree), Error end. @@ -632,7 +642,7 @@ age_tree_delete(Then) -> fun (Tree) -> Tree1 = gb_trees:delete_any(Then, Tree), Oldest = oldest(Tree1, fun () -> undefined end), - gen_server:cast(?SERVER, {close, self(), Oldest}), + gen_server2:cast(?SERVER, {close, self(), Oldest}), Tree1 end). @@ -642,7 +652,7 @@ age_tree_change() -> case gb_trees:is_empty(Tree) of true -> Tree; false -> {Oldest, _Ref} = gb_trees:smallest(Tree), - gen_server:cast(?SERVER, {update, self(), Oldest}) + gen_server2:cast(?SERVER, {update, self(), Oldest}) end, Tree end). @@ -694,10 +704,10 @@ soft_close(Handle) -> is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of - true -> file:sync(Hdl); + true -> prim_file:sync(Hdl); false -> ok end, - ok = file:close(Hdl), + ok = prim_file:close(Hdl), age_tree_delete(Then), {ok, Handle1 #handle { hdl = closed, is_dirty = false, @@ -732,7 +742,7 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, at_eof = AtEoF }) -> {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), case (case NeedsSeek of - true -> file:position(Hdl, NewOffset); + true -> prim_file:position(Hdl, NewOffset); false -> {ok, Offset} end) of {ok, Offset1} = Result -> @@ -769,7 +779,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, write_buffer = WriteBuffer, write_buffer_size = DataSize, at_eof = true }) -> - case file:write(Hdl, lists:reverse(WriteBuffer)) of + case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of ok -> Offset1 = Offset + DataSize, {ok, Handle #handle { offset = Offset1, is_dirty = true, @@ -785,7 +795,7 @@ i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit; i(Item, _) -> throw({bad_argument, Item}). %%---------------------------------------------------------------------------- -%% gen_server callbacks +%% gen_server2 callbacks %%---------------------------------------------------------------------------- init([]) -> @@ -814,6 +824,12 @@ init([]) -> clients = Clients, timer_ref = undefined }}. +prioritise_cast(Msg, _State) -> + case Msg of + {release, _} -> 5; + _ -> 0 + end. + handle_call({open, Pid, Requested, EldestUnusedSince}, From, State = #fhc_state { open_count = Count, open_pending = Pending, @@ -893,6 +909,10 @@ handle_cast({update, Pid, EldestUnusedSince}, %% storm of messages {noreply, State}; +handle_cast({release, Pid}, State) -> + {noreply, adjust_alarm(State, process_pending( + update_counts(obtain, Pid, -1, State)))}; + handle_cast({close, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, clients = Clients }) -> true = case EldestUnusedSince of @@ -1048,7 +1068,7 @@ run_pending_item(#pending { kind = Kind, requested = Requested, from = From }, State = #fhc_state { clients = Clients }) -> - gen_server:reply(From, ok), + gen_server2:reply(From, ok), true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), update_counts(Kind, Pid, Requested, State). |