diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-08-02 10:48:18 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-08-02 10:48:18 +0100 |
commit | 62b6ce3e78a42a69a336049e5872bf4686a8543a (patch) | |
tree | 7a55098c74ec108cdb5d094ed16ccd9eb661e06c /src/file_handle_cache.erl | |
parent | c6248e4437c04032e9231d265181e2e87d615ef5 (diff) | |
parent | e2c57c78fcc0281eeb78dd1914287e539265244c (diff) | |
download | rabbitmq-server-62b6ce3e78a42a69a336049e5872bf4686a8543a.tar.gz |
merge default into bug23504
Diffstat (limited to 'src/file_handle_cache.erl')
-rw-r--r-- | src/file_handle_cache.erl | 177 |
1 files changed, 99 insertions, 78 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index b5b07eca..235e14c0 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -146,7 +146,8 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, transfer/1, set_limit/1, get_limit/0]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, + info/1]). -export([ulimit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -155,13 +156,6 @@ -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). -%% Googling around suggests that Windows has a limit somewhere around -%% 16M, eg -%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx -%% however, it turns out that's only available through the win32 -%% API. Via the C Runtime, we have just 512: -%% http://msdn.microsoft.com/en-us/library/6e3b887c%28VS.80%29.aspx --define(FILE_HANDLES_LIMIT_WINDOWS, 512). -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). @@ -241,7 +235,7 @@ -> val_or_error(ref())). -spec(close/1 :: (ref()) -> ok_or_error()). -spec(read/2 :: (ref(), non_neg_integer()) -> - val_or_error([char()] | binary()) | 'eof'). + val_or_error([char()] | binary()) | 'eof'). -spec(append/2 :: (ref(), iodata()) -> ok_or_error()). -spec(sync/1 :: (ref()) -> ok_or_error()). -spec(position/2 :: (ref(), position()) -> val_or_error(offset())). @@ -251,7 +245,7 @@ -spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())). -spec(flush/1 :: (ref()) -> ok_or_error()). -spec(copy/3 :: (ref(), ref(), non_neg_integer()) -> - val_or_error(non_neg_integer())). + val_or_error(non_neg_integer())). -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). @@ -259,11 +253,17 @@ -spec(transfer/1 :: (pid()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). +-spec(info_keys/0 :: () -> [atom()]). +-spec(info/0 :: () -> [{atom(), any()}]). +-spec(info/1 :: ([atom()]) -> [{atom(), any()}]). -spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()). -endif. %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [obtain_count, obtain_limit]). + +%%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -494,6 +494,11 @@ set_limit(Limit) -> get_limit() -> gen_server:call(?SERVER, get_limit, infinity). +info_keys() -> ?INFO_KEYS. + +info() -> info(?INFO_KEYS). +info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity). + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -789,6 +794,12 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, {Error, Handle} end. +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(obtain_count, #fhc_state{obtain_count = Count}) -> Count; +i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(Item, _) -> throw({bad_argument, Item}). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -849,35 +860,41 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From, false -> {noreply, run_pending_item(Item, State1)} end; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, - obtain_count = Count, - obtain_pending = Pending, - clients = Clients }) - when Limit =/= infinity andalso Count >= Limit -> - ok = track_client(Pid, Clients), - true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, - {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, clients = Clients }) -> - Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, ok = track_client(Pid, Clients), - case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of - true -> - true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - {noreply, reduce(State #fhc_state { - obtain_pending = pending_in(Item, Pending) })}; - false -> - {noreply, run_pending_item(Item, State)} - end; + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + Enqueue = fun () -> + true = ets:update_element(Clients, Pid, + {#cstate.blocked, true}), + State #fhc_state { + obtain_pending = pending_in(Item, Pending) } + end, + {noreply, + case obtain_limit_reached(State) of + true -> Enqueue(); + false -> case needs_reduce(State #fhc_state { + obtain_count = Count + 1 }) of + true -> reduce(Enqueue()); + false -> adjust_alarm( + State, run_pending_item(Item, State)) + end + end}; + handle_call({set_limit, Limit}, _From, State) -> - {reply, ok, maybe_reduce( - process_pending(State #fhc_state { - limit = Limit, - obtain_limit = obtain_limit(Limit) }))}; + {reply, ok, adjust_alarm( + State, maybe_reduce( + process_pending( + State #fhc_state { + limit = Limit, + obtain_limit = obtain_limit(Limit) })))}; + handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> - {reply, Limit, State}. + {reply, Limit, State}; + +handle_call({info, Items}, _From, State) -> + {reply, infos(Items, State), State}. handle_cast({register_callback, Pid, MFA}, State = #fhc_state { clients = Clients }) -> @@ -900,9 +917,9 @@ handle_cast({close, Pid, EldestUnusedSince}, _ -> dict:store(Pid, EldestUnusedSince, Elders) end, ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), - {noreply, process_pending( + {noreply, adjust_alarm(State, process_pending( update_counts(open, Pid, -1, - State #fhc_state { elders = Elders1 }))}; + State #fhc_state { elders = Elders1 })))}; handle_cast({transfer, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), @@ -924,13 +941,15 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, ets:lookup(Clients, Pid), true = ets:delete(Clients, Pid), FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, - {noreply, process_pending( - State #fhc_state { - open_count = OpenCount - Opened, - open_pending = filter_pending(FilterFun, OpenPending), - obtain_count = ObtainCount - Obtained, - obtain_pending = filter_pending(FilterFun, ObtainPending), - elders = dict:erase(Pid, Elders) })}. + {noreply, adjust_alarm( + State, + process_pending( + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = filter_pending(FilterFun, OpenPending), + obtain_count = ObtainCount - Obtained, + obtain_pending = filter_pending(FilterFun, ObtainPending), + elders = dict:erase(Pid, Elders) }))}. terminate(_Reason, State = #fhc_state { clients = Clients }) -> ets:delete(Clients), @@ -951,12 +970,13 @@ queue_fold(Fun, Init, Q) -> filter_pending(Fun, {Count, Queue}) -> {Delta, Queue1} = - queue_fold(fun (Item, {DeltaN, QueueN}) -> - case Fun(Item) of - true -> {DeltaN, queue:in(Item, QueueN)}; - false -> {DeltaN - requested(Item), QueueN} - end - end, {0, queue:new()}, Queue), + queue_fold( + fun (Item = #pending { requested = Requested }, {DeltaN, QueueN}) -> + case Fun(Item) of + true -> {DeltaN, queue:in(Item, QueueN)}; + false -> {DeltaN - Requested, QueueN} + end + end, {0, queue:new()}, Queue), {Count + Delta, Queue1}. pending_new() -> @@ -990,8 +1010,17 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of OLimit -> OLimit end. -requested({_Kind, _Pid, Requested, _From}) -> - Requested. +obtain_limit_reached(#fhc_state { obtain_limit = Limit, + obtain_count = Count}) -> + Limit =/= infinity andalso Count >= Limit. + +adjust_alarm(OldState, NewState) -> + case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of + {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []}); + {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit); + _ -> ok + end, + NewState. process_pending(State = #fhc_state { limit = infinity }) -> State; @@ -1094,7 +1123,7 @@ reduce(State = #fhc_state { open_pending = OpenPending, case CStates of [] -> ok; _ -> case (Sum / ClientCount) - - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of AverageAge when AverageAge > 0 -> notify_age(CStates, AverageAge); _ -> @@ -1118,11 +1147,12 @@ notify_age(CStates, AverageAge) -> end, CStates). notify_age0(Clients, CStates, Required) -> - Notifications = - [CState || CState <- CStates, CState#cstate.callback =/= undefined], - {L1, L2} = lists:split(random:uniform(length(Notifications)), - Notifications), - notify(Clients, Required, L2 ++ L1). + case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of + [] -> ok; + Notifications -> S = random:uniform(length(Notifications)), + {L1, L2} = lists:split(S, Notifications), + notify(Clients, Required, L2 ++ L1) + end. notify(_Clients, _Required, []) -> ok; @@ -1147,29 +1177,20 @@ track_client(Pid, Clients) -> false -> ok end. -%% For all unices, assume ulimit exists. Further googling suggests -%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n -%% is file handles + +%% To increase the number of file descriptors: on Windows set ERL_MAX_PORTS +%% environment variable, on Linux set `ulimit -n`. ulimit() -> - case os:type() of - {win32, _OsName} -> - ?FILE_HANDLES_LIMIT_WINDOWS; - {unix, _OsName} -> - %% Under Linux, Solaris and FreeBSD, ulimit is a shell - %% builtin, not a command. In OS X and AIX it's a command. - %% Fortunately, os:cmd invokes the cmd in a shell env, so - %% we're safe in all cases. - case os:cmd("ulimit -n") of - "unlimited" -> - infinity; - String = [C|_] when $0 =< C andalso C =< $9 -> - list_to_integer( - lists:takewhile( - fun (D) -> $0 =< D andalso D =< $9 end, String)); - _ -> - %% probably a variant of - %% "/bin/sh: line 1: ulimit: command not found\n" - unknown + case proplists:get_value(max_fds, erlang:system_info(check_io)) of + MaxFds when is_integer(MaxFds) andalso MaxFds > 1 -> + case os:type() of + {win32, _OsName} -> + %% On Windows max_fds is twice the number of open files: + %% https://github.com/yrashk/erlang/blob/e1282325ed75e52a98d5/erts/emulator/sys/win32/sys.c#L2459-2466 + MaxFds div 2; + _Any -> + %% For other operating systems trust Erlang. + MaxFds end; _ -> unknown |