diff options
-rw-r--r-- | src/file_handle_cache.erl | 862 | ||||
-rw-r--r-- | src/rabbit.erl | 6 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 3 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 15 |
4 files changed, 885 insertions, 1 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl new file mode 100644 index 00000000..0f648dcd --- /dev/null +++ b/src/file_handle_cache.erl @@ -0,0 +1,862 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(file_handle_cache). + +%% A File Handle Cache +%% +%% This extends a subset of the functionality of the Erlang file +%% module. +%% +%% Some constraints +%% 1) This supports one writer, multiple readers per file. Nothing +%% else. +%% 2) Do not open the same file from different processes. Bad things +%% may happen. +%% 3) Writes are all appends. You cannot write to the middle of a +%% file, although you can truncate and then append if you want. +%% 4) Although there is a write buffer, there is no read buffer. Feel +%% free to use the read_ahead mode, but beware of the interaction +%% between that buffer and the write buffer. +%% +%% Some benefits +%% 1) You do not have to remember to call sync before close +%% 2) Buffering is much more flexible than with plain file module, and +%% you can control when the buffer gets flushed out. This means that +%% you can rely on reads-after-writes working, without having to call +%% the expensive sync. +%% 3) Unnecessary calls to position and sync get optimised out. +%% 4) You can find out what your 'real' offset is, and what your +%% 'virtual' offset is (i.e. where the hdl really is, and where it +%% would be after the write buffer is written out). +%% 5) You can find out what the offset was when you last sync'd. +%% +%% There is also a server component which serves to limit the number +%% of open file handles in a "soft" way - the server will never +%% prevent a client from opening a handle, but may immediately tell it +%% to close the handle. Thus you can set the limit to zero and it will +%% still all work correctly, it is just that effectively no caching +%% will take place. The operation of limiting is as follows: +%% +%% On open and close, the client sends messages to the server +%% informing it of opens and closes. This allows the server to keep +%% track of the number of open handles. The client also keeps a +%% gb_tree which is updated on every use of a file handle, mapping the +%% time at which the file handle was last used (timestamp) to the +%% handle. Thus the smallest key in this tree maps to the file handle +%% that has not been used for the longest amount of time. This +%% smallest key is included in the messages to the server. As such, +%% the server keeps track of when the least recently used file handle +%% was used *at the point of the most recent open or close* by each +%% client. +%% +%% Note that this data can go very out of date, by the client using +%% the least recently used handle. +%% +%% When the limit is reached, the server calculates the average age of +%% the last reported least recently used file handle of all the +%% clients. It then tells all the clients to close any handles not +%% used for longer than this average, by invoking the callback the +%% client registered. The client should receive this message and pass +%% it into set_maximum_since_use/1. However, it is highly possible +%% this age will be greater than the ages of all the handles the +%% client knows of because the client has used its file handles in the +%% mean time. Thus at this point the client reports to the server the +%% current timestamp at which its least recently used file handle was +%% last used. The server will check two seconds later that either it +%% is back under the limit, in which case all is well again, or if +%% not, it will calculate a new average age. Its data will be much +%% more recent now, and so it is very likely that when this is +%% communicated to the clients, the clients will close file handles. +%% +%% The advantage of this scheme is that there is only communication +%% from the client to the server on open, close, and when in the +%% process of trying to reduce file handle usage. There is no +%% communication from the client to the server on normal file handle +%% operations. This scheme forms a feed-back loop - the server does +%% not care which file handles are closed, just that some are, and it +%% checks this repeatedly when over the limit. Given the guarantees of +%% now(), even if there is just one file handle open, a limit of 1, +%% and one client, it is certain that when the client calculates the +%% age of the handle, it will be greater than when the server +%% calculated it, hence it should be closed. +%% +%% Handles which are closed as a result of the server are put into a +%% "soft-closed" state in which the handle is closed (data flushed out +%% and sync'd first) but the state is maintained. The handle will be +%% fully reopened again as soon as needed, thus users of this library +%% do not need to worry about their handles being closed by the server +%% - reopening them when necessary is handled transparently. +%% +%% The server also supports obtain and release_on_death. obtain/0 +%% blocks until a file descriptor is available. release_on_death/1 +%% takes a pid and monitors the pid, reducing the count by 1 when the +%% pid dies. Thus the assumption is that obtain/0 is called first, and +%% when that returns, release_on_death/1 is called with the pid who +%% "owns" the file descriptor. This is, for example, used to track the +%% use of file descriptors through network sockets. + +-behaviour(gen_server). + +-export([register_callback/3]). +-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, + last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, + flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). +-export([release_on_death/1, obtain/0]). + +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(RESERVED_FOR_OTHERS, 100). +-define(FILE_HANDLES_LIMIT_WINDOWS, 10000000). +-define(FILE_HANDLES_LIMIT_OTHER, 1024). +-define(FILE_HANDLES_CHECK_INTERVAL, 2000). + +%%---------------------------------------------------------------------------- + +-record(file, + { reader_count, + has_writer + }). + +-record(handle, + { hdl, + offset, + trusted_offset, + is_dirty, + write_buffer_size, + write_buffer_size_limit, + write_buffer, + at_eof, + path, + mode, + options, + is_write, + is_read, + last_used_at + }). + +-record(fhc_state, + { elders, + limit, + count, + obtains, + callbacks, + client_mrefs, + timer_ref + }). + +%%---------------------------------------------------------------------------- +%% Specs +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(ref() :: any()). +-type(error() :: {'error', any()}). +-type(ok_or_error() :: ('ok' | error())). +-type(val_or_error(T) :: ({'ok', T} | error())). +-type(position() :: ('bof' | 'eof' | non_neg_integer() | + {('bof' |'eof'), non_neg_integer()} | {'cur', integer()})). +-type(offset() :: non_neg_integer()). + +-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok'). +-spec(open/3 :: + (string(), [any()], + [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) -> + val_or_error(ref())). +-spec(close/1 :: (ref()) -> ok_or_error()). +-spec(read/2 :: (ref(), non_neg_integer()) -> + val_or_error([char()] | binary()) | 'eof'). +-spec(append/2 :: (ref(), iodata()) -> ok_or_error()). +-spec(sync/1 :: (ref()) -> ok_or_error()). +-spec(position/2 :: (ref(), position()) -> val_or_error(offset())). +-spec(truncate/1 :: (ref()) -> ok_or_error()). +-spec(last_sync_offset/1 :: (ref()) -> val_or_error(offset())). +-spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())). +-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())). +-spec(flush/1 :: (ref()) -> ok_or_error()). +-spec(copy/3 :: (ref(), ref(), non_neg_integer()) -> + val_or_error(non_neg_integer())). +-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). +-spec(delete/1 :: (ref()) -> ok_or_error()). +-spec(clear/1 :: (ref()) -> ok_or_error()). +-spec(release_on_death/1 :: (pid()) -> 'ok'). +-spec(obtain/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). + +register_callback(M, F, A) + when is_atom(M) andalso is_atom(F) andalso is_list(A) -> + gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}). + +open(Path, Mode, Options) -> + Path1 = filename:absname(Path), + File1 = #file { reader_count = RCount, has_writer = HasWriter } = + case get({Path1, fhc_file}) of + File = #file {} -> File; + undefined -> #file { reader_count = 0, + has_writer = false } + end, + Mode1 = append_to_write(Mode), + IsWriter = is_writer(Mode1), + case IsWriter andalso HasWriter of + true -> {error, writer_exists}; + false -> Ref = make_ref(), + case open1(Path1, Mode1, Options, Ref, bof, new) of + {ok, _Handle} -> + RCount1 = case is_reader(Mode1) of + true -> RCount + 1; + false -> RCount + end, + HasWriter1 = HasWriter orelse IsWriter, + put({Path1, fhc_file}, + File1 #file { reader_count = RCount1, + has_writer = HasWriter1 }), + {ok, Ref}; + Error -> + Error + end + end. + +close(Ref) -> + case erase({Ref, fhc_handle}) of + undefined -> ok; + Handle -> case hard_close(Handle) of + ok -> ok; + {Error, Handle1} -> put_handle(Ref, Handle1), + Error + end + end. + +read(Ref, Count) -> + with_flushed_handles( + [Ref], + fun ([#handle { is_read = false }]) -> + {error, not_open_for_reading}; + ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> + case file:read(Hdl, Count) of + {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), + {Obj, + [Handle #handle { offset = Offset1 }]}; + eof -> {eof, [Handle #handle { at_eof = true }]}; + Error -> {Error, [Handle]} + end + end). + +append(Ref, Data) -> + with_handles( + [Ref], + fun ([#handle { is_write = false }]) -> + {error, not_open_for_writing}; + ([Handle]) -> + case maybe_seek(eof, Handle) of + {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset, + write_buffer_size_limit = 0, + at_eof = true } = Handle1} -> + Offset1 = Offset + iolist_size(Data), + {file:write(Hdl, Data), + [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; + {{ok, _Offset}, #handle { write_buffer = WriteBuffer, + write_buffer_size = Size, + write_buffer_size_limit = Limit, + at_eof = true } = Handle1} -> + WriteBuffer1 = [Data | WriteBuffer], + Size1 = Size + iolist_size(Data), + Handle2 = Handle1 #handle { write_buffer = WriteBuffer1, + write_buffer_size = Size1 }, + case Limit /= infinity andalso Size1 > Limit of + true -> {Result, Handle3} = write_buffer(Handle2), + {Result, [Handle3]}; + false -> {ok, [Handle2]} + end; + {{error, _} = Error, Handle1} -> + {Error, [Handle1]} + end + end). + +sync(Ref) -> + with_flushed_handles( + [Ref], + fun ([#handle { is_dirty = false, write_buffer = [] }]) -> + ok; + ([Handle = #handle { hdl = Hdl, offset = Offset, + is_dirty = true, write_buffer = [] }]) -> + case file:sync(Hdl) of + ok -> {ok, [Handle #handle { trusted_offset = Offset, + is_dirty = false }]}; + Error -> {Error, [Handle]} + end + end). + +position(Ref, NewOffset) -> + with_flushed_handles( + [Ref], + fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle), + {Result, [Handle1]} + end). + +truncate(Ref) -> + with_flushed_handles( + [Ref], + fun ([Handle1 = #handle { hdl = Hdl, offset = Offset, + trusted_offset = TOffset }]) -> + case file:truncate(Hdl) of + ok -> TOffset1 = lists:min([Offset, TOffset]), + {ok, [Handle1 #handle { trusted_offset = TOffset1, + at_eof = true }]}; + Error -> {Error, [Handle1]} + end + end). + +last_sync_offset(Ref) -> + with_handles([Ref], fun ([#handle { trusted_offset = TOffset }]) -> + {ok, TOffset} + end). + +current_virtual_offset(Ref) -> + with_handles([Ref], fun ([#handle { at_eof = true, is_write = true, + offset = Offset, + write_buffer_size = Size }]) -> + {ok, Offset + Size}; + ([#handle { offset = Offset }]) -> + {ok, Offset} + end). + +current_raw_offset(Ref) -> + with_handles([Ref], fun ([Handle]) -> {ok, Handle #handle.offset} end). + +flush(Ref) -> + with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end). + +copy(Src, Dest, Count) -> + with_flushed_handles( + [Src, Dest], + fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset }, + DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }] + ) -> + case file:copy(SHdl, DHdl, Count) of + {ok, Count1} = Result1 -> + {Result1, + [SHandle #handle { offset = SOffset + Count1 }, + DHandle #handle { offset = DOffset + Count1 }]}; + Error -> + {Error, [SHandle, DHandle]} + end; + (_Handles) -> + {error, incorrect_handle_modes} + end). + +delete(Ref) -> + case erase({Ref, fhc_handle}) of + undefined -> + ok; + Handle = #handle { path = Path } -> + case hard_close(Handle #handle { is_dirty = false, + write_buffer = [] }) of + ok -> file:delete(Path); + {Error, Handle1} -> put_handle(Ref, Handle1), + Error + end + end. + +clear(Ref) -> + with_handles( + [Ref], + fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) -> + ok; + ([Handle]) -> + case maybe_seek(bof, Handle #handle { write_buffer = [], + write_buffer_size = 0 }) of + {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> + case file:truncate(Hdl) of + ok -> {ok, [Handle1 #handle {trusted_offset = 0, + at_eof = true }]}; + Error -> {Error, [Handle1]} + end; + {{error, _} = Error, Handle1} -> + {Error, [Handle1]} + end + end). + +set_maximum_since_use(MaximumAge) -> + Now = now(), + case lists:foldl( + fun ({{Ref, fhc_handle}, + Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> + Age = timer:now_diff(Now, Then), + case Hdl /= closed andalso Age >= MaximumAge of + true -> {Res, Handle1} = soft_close(Handle), + case Res of + ok -> put({Ref, fhc_handle}, Handle1), + false; + _ -> put_handle(Ref, Handle1), + Rep + end; + false -> Rep + end; + (_KeyValuePair, Rep) -> + Rep + end, true, get()) of + true -> age_tree_change(), ok; + false -> ok + end. + +release_on_death(Pid) when is_pid(Pid) -> + gen_server:cast(?SERVER, {release_on_death, Pid}). + +obtain() -> + gen_server:call(?SERVER, obtain, infinity). + +%%---------------------------------------------------------------------------- +%% Internal functions +%%---------------------------------------------------------------------------- + +is_reader(Mode) -> lists:member(read, Mode). + +is_writer(Mode) -> lists:member(write, Mode). + +append_to_write(Mode) -> + case lists:member(append, Mode) of + true -> [write | Mode -- [append, write]]; + false -> Mode + end. + +with_handles(Refs, Fun) -> + ResHandles = lists:foldl( + fun (Ref, {ok, HandlesAcc}) -> + case get_or_reopen(Ref) of + {ok, Handle} -> {ok, [Handle | HandlesAcc]}; + Error -> Error + end; + (_Ref, Error) -> + Error + end, {ok, []}, Refs), + case ResHandles of + {ok, Handles} -> + case Fun(lists:reverse(Handles)) of + {Result, Handles1} when is_list(Handles1) -> + lists:zipwith(fun put_handle/2, Refs, Handles1), + Result; + Result -> + Result + end; + Error -> + Error + end. + +with_flushed_handles(Refs, Fun) -> + with_handles( + Refs, + fun (Handles) -> + case lists:foldl( + fun (Handle, {ok, HandlesAcc}) -> + {Res, Handle1} = write_buffer(Handle), + {Res, [Handle1 | HandlesAcc]}; + (Handle, {Error, HandlesAcc}) -> + {Error, [Handle | HandlesAcc]} + end, {ok, []}, Handles) of + {ok, Handles1} -> + Fun(lists:reverse(Handles1)); + {Error, Handles1} -> + {Error, lists:reverse(Handles1)} + end + end). + +get_or_reopen(Ref) -> + case get({Ref, fhc_handle}) of + undefined -> + {error, not_open, Ref}; + #handle { hdl = closed, offset = Offset, + path = Path, mode = Mode, options = Options } -> + open1(Path, Mode, Options, Ref, Offset, reopen); + Handle -> + {ok, Handle} + end. + +put_handle(Ref, Handle = #handle { last_used_at = Then }) -> + Now = now(), + age_tree_update(Then, Now, Ref), + put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). + +with_age_tree(Fun) -> + put(fhc_age_tree, Fun(case get(fhc_age_tree) of + undefined -> gb_trees:empty(); + AgeTree -> AgeTree + end)). + +age_tree_insert(Now, Ref) -> + with_age_tree( + fun (Tree) -> + Tree1 = gb_trees:insert(Now, Ref, Tree), + {Oldest, _Ref} = gb_trees:smallest(Tree1), + gen_server:cast(?SERVER, {open, self(), Oldest}), + Tree1 + end). + +age_tree_update(Then, Now, Ref) -> + with_age_tree( + fun (Tree) -> + gb_trees:insert(Now, Ref, gb_trees:delete_any(Then, Tree)) + end). + +age_tree_delete(Then) -> + with_age_tree( + fun (Tree) -> + Tree1 = gb_trees:delete_any(Then, Tree), + Oldest = case gb_trees:is_empty(Tree1) of + true -> + undefined; + false -> + {Oldest1, _Ref} = gb_trees:smallest(Tree1), + Oldest1 + end, + gen_server:cast(?SERVER, {close, self(), Oldest}), + Tree1 + end). + +age_tree_change() -> + with_age_tree( + fun (Tree) -> + case gb_trees:is_empty(Tree) of + true -> Tree; + false -> {Oldest, _Ref} = gb_trees:smallest(Tree), + gen_server:cast(?SERVER, {update, self(), Oldest}) + end, + Tree + end). + +open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> + Mode1 = case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end, + case file:open(Path, Mode1) of + {ok, Hdl} -> + WriteBufferSize = + case proplists:get_value(write_buffer, Options, unbuffered) of + unbuffered -> 0; + infinity -> infinity; + N when is_integer(N) -> N + end, + Now = now(), + Handle = #handle { hdl = Hdl, + offset = 0, + trusted_offset = 0, + is_dirty = false, + write_buffer_size = 0, + write_buffer_size_limit = WriteBufferSize, + write_buffer = [], + at_eof = false, + path = Path, + mode = Mode, + options = Options, + is_write = is_writer(Mode), + is_read = is_reader(Mode), + last_used_at = Now }, + {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle), + Handle2 = Handle1 #handle { trusted_offset = Offset1 }, + put({Ref, fhc_handle}, Handle2), + age_tree_insert(Now, Ref), + {ok, Handle2}; + {error, Reason} -> + {error, Reason} + end. + +soft_close(Handle = #handle { hdl = closed }) -> + {ok, Handle}; +soft_close(Handle) -> + case write_buffer(Handle) of + {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty, + last_used_at = Then } = Handle1 } -> + ok = case IsDirty of + true -> file:sync(Hdl); + false -> ok + end, + ok = file:close(Hdl), + age_tree_delete(Then), + {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset, + is_dirty = false }}; + {_Error, _Handle} = Result -> + Result + end. + +hard_close(Handle) -> + case soft_close(Handle) of + {ok, #handle { path = Path, + is_read = IsReader, is_write = IsWriter }} -> + #file { reader_count = RCount, has_writer = HasWriter } = File = + get({Path, fhc_file}), + RCount1 = case IsReader of + true -> RCount - 1; + false -> RCount + end, + HasWriter1 = HasWriter andalso not IsWriter, + case RCount1 =:= 0 andalso not HasWriter1 of + true -> erase({Path, fhc_file}); + false -> put({Path, fhc_file}, + File #file { reader_count = RCount1, + has_writer = HasWriter1 }) + end, + ok; + {_Error, _Handle} = Result -> + Result + end. + +maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, + at_eof = AtEoF }) -> + {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), + case (case NeedsSeek of + true -> file:position(Hdl, NewOffset); + false -> {ok, Offset} + end) of + {ok, Offset1} = Result -> + {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }}; + {error, _} = Error -> + {Error, Handle} + end. + +needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false}; +needs_seek( AtEoF, _CurOffset, {cur, 0}) -> {AtEoF, false}; +needs_seek( true, _CurOffset, eof ) -> {true , false}; +needs_seek( true, _CurOffset, {eof, 0}) -> {true , false}; +needs_seek( false, _CurOffset, eof ) -> {true , true }; +needs_seek( false, _CurOffset, {eof, 0}) -> {true , true }; +needs_seek( AtEoF, 0, bof ) -> {AtEoF, false}; +needs_seek( AtEoF, 0, {bof, 0}) -> {AtEoF, false}; +needs_seek( AtEoF, CurOffset, CurOffset) -> {AtEoF, false}; +needs_seek( true, CurOffset, {bof, DesiredOffset}) + when DesiredOffset >= CurOffset -> + {true, true}; +needs_seek( true, _CurOffset, {cur, DesiredOffset}) + when DesiredOffset > 0 -> + {true, true}; +needs_seek( true, CurOffset, DesiredOffset) %% same as {bof, DO} + when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset -> + {true, true}; +%% because we can't really track size, we could well end up at EoF and not know +needs_seek(_AtEoF, _CurOffset, _DesiredOffset) -> + {false, true}. + +write_buffer(Handle = #handle { write_buffer = [] }) -> + {ok, Handle}; +write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, + write_buffer = WriteBuffer, + write_buffer_size = DataSize, + at_eof = true }) -> + case file:write(Hdl, lists:reverse(WriteBuffer)) of + ok -> + Offset1 = Offset + DataSize, + {ok, Handle #handle { offset = Offset1, is_dirty = true, + write_buffer = [], write_buffer_size = 0 }}; + {error, _} = Error -> + {Error, Handle} + end. + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +init([]) -> + Limit = case application:get_env(file_handles_high_watermark) of + {ok, Watermark} when (is_integer(Watermark) andalso + Watermark > 0) -> + Watermark; + _ -> + ulimit() + end, + error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), + {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, + obtains = [], callbacks = dict:new(), + client_mrefs = dict:new(), timer_ref = undefined }}. + +handle_call(obtain, From, State = #fhc_state { count = Count }) -> + State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = + maybe_reduce(State #fhc_state { count = Count + 1 }), + case Limit /= infinity andalso Count1 >= Limit of + true -> {noreply, State1 #fhc_state { obtains = [From | Obtains], + count = Count1 - 1 }}; + false -> {reply, ok, State1} + end. + +handle_cast({register_callback, Pid, MFA}, + State = #fhc_state { callbacks = Callbacks }) -> + {noreply, ensure_mref( + Pid, State #fhc_state { + callbacks = dict:store(Pid, MFA, Callbacks) })}; + +handle_cast({open, Pid, EldestUnusedSince}, State = + #fhc_state { elders = Elders, count = Count }) -> + Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + {noreply, maybe_reduce( + ensure_mref(Pid, State #fhc_state { elders = Elders1, + count = Count + 1 }))}; + +handle_cast({update, Pid, EldestUnusedSince}, State = + #fhc_state { elders = Elders }) -> + Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + %% don't call maybe_reduce from here otherwise we can create a + %% storm of messages + {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; + +handle_cast({close, Pid, EldestUnusedSince}, State = + #fhc_state { elders = Elders, count = Count }) -> + Elders1 = case EldestUnusedSince of + undefined -> dict:erase(Pid, Elders); + _ -> dict:store(Pid, EldestUnusedSince, Elders) + end, + {noreply, process_obtains( + ensure_mref(Pid, State #fhc_state { elders = Elders1, + count = Count - 1 }))}; + +handle_cast(check_counts, State) -> + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; + +handle_cast({release_on_death, Pid}, State) -> + _MRef = erlang:monitor(process, Pid), + {noreply, State}. + +handle_info({'DOWN', MRef, process, Pid, _Reason}, State = + #fhc_state { count = Count, callbacks = Callbacks, + client_mrefs = ClientMRefs, elders = Elders }) -> + {noreply, process_obtains( + case dict:find(Pid, ClientMRefs) of + {ok, MRef} -> State #fhc_state { + elders = dict:erase(Pid, Elders), + client_mrefs = dict:erase(Pid, ClientMRefs), + callbacks = dict:erase(Pid, Callbacks) }; + _ -> State #fhc_state { count = Count - 1 } + end)}. + +terminate(_Reason, State) -> + State. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- +%% server helpers +%%---------------------------------------------------------------------------- + +process_obtains(State = #fhc_state { obtains = [] }) -> + State; +process_obtains(State = #fhc_state { limit = Limit, count = Count }) + when Limit /= infinity andalso Count >= Limit -> + State; +process_obtains(State = #fhc_state { limit = Limit, count = Count, + obtains = Obtains }) -> + ObtainsLen = length(Obtains), + ObtainableLen = lists:min([ObtainsLen, Limit - Count]), + Take = ObtainsLen - ObtainableLen, + {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains), + [gen_server:reply(From, ok) || From <- ObtainableRev], + State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. + +maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, + callbacks = Callbacks, timer_ref = TRef }) + when Limit /= infinity andalso Count >= Limit -> + Now = now(), + {Pids, Sum, ClientCount} = + dict:fold(fun (_Pid, undefined, Accs) -> + Accs; + (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) -> + {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), + CountAcc + 1} + end, {[], 0, 0}, Elders), + case Pids of + [] -> ok; + _ -> AverageAge = Sum / ClientCount, + lists:foreach( + fun (Pid) -> + case dict:find(Pid, Callbacks) of + error -> ok; + {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) + end + end, Pids) + end, + case TRef of + undefined -> {ok, TRef1} = timer:apply_after( + ?FILE_HANDLES_CHECK_INTERVAL, + gen_server, cast, [?SERVER, check_counts]), + State #fhc_state { timer_ref = TRef1 }; + _ -> State + end; +maybe_reduce(State) -> + State. + +%% Googling around suggests that Windows has a limit somewhere around +%% 16M, eg +%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx +%% For everything else, assume ulimit exists. Further googling +%% suggests that BSDs (incl OS X), solaris and linux all agree that +%% ulimit -n is file handles +ulimit() -> + case os:type() of + {win32, _OsName} -> + ?FILE_HANDLES_LIMIT_WINDOWS; + {unix, _OsName} -> + %% Under Linux, Solaris and FreeBSD, ulimit is a shell + %% builtin, not a command. In OS X, it's a command. + %% Fortunately, os:cmd invokes the cmd in a shell env, so + %% we're safe in all cases. + case os:cmd("ulimit -n") of + "unlimited" -> + infinity; + String = [C|_] when $0 =< C andalso C =< $9 -> + Num = list_to_integer( + lists:takewhile( + fun (D) -> $0 =< D andalso D =< $9 end, String)) - + ?RESERVED_FOR_OTHERS, + lists:max([1, Num]); + _ -> + %% probably a variant of + %% "/bin/sh: line 1: ulimit: command not found\n" + ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS + end; + _ -> + ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS + end. + +ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) -> + case dict:find(Pid, ClientMRefs) of + {ok, _MRef} -> State; + error -> MRef = erlang:monitor(process, Pid), + State #fhc_state { + client_mrefs = dict:store(Pid, MRef, ClientMRefs) } + end. diff --git a/src/rabbit.erl b/src/rabbit.erl index 225ff450..7ca5b07b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -54,6 +54,12 @@ [{mfa, {rabbit_mnesia, init, []}}, {enables, external_infrastructure}]}). +-rabbit_boot_step({file_handle_cache, + [{description, "file handle cache server"}, + {mfa, {rabbit_sup, start_restartable_child, + [file_handle_cache]}}, + {enables, worker_pool}]}). + -rabbit_boot_step({worker_pool, [{description, "worker pool"}, {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 68efc27f..3b23daa5 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -76,7 +76,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, [inet_parse:ntoa(Address), Port, inet_parse:ntoa(PeerAddress), PeerPort]), %% handle - apply(M, F, A ++ [Sock]) + file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -104,6 +104,7 @@ code_change(_OldVsn, State, _Extra) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). accept(State = #state{sock=LSock}) -> + ok = file_handle_cache:obtain(), case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index d3a48119..d3942a1b 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -35,6 +35,8 @@ -export([start_link/1, submit/2, submit_async/2, run/1]). +-export([set_maximum_since_use/2]). + -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -47,6 +49,8 @@ -spec(submit_async/2 :: (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). +-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -65,7 +69,14 @@ submit(Pid, Fun) -> submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). +set_maximum_since_use(Pid, Age) -> + gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + +%%---------------------------------------------------------------------------- + init([WId]) -> + ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, + [self()]), ok = worker_pool:idle(WId), put(worker_pool_worker, true), {ok, WId, hibernate, @@ -84,6 +95,10 @@ handle_cast({submit_async, Fun}, WId) -> ok = worker_pool:idle(WId), {noreply, WId, hibernate}; +handle_cast({set_maximum_since_use, Age}, WId) -> + ok = file_handle_cache:set_maximum_since_use(Age), + {noreply, WId, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. |