summaryrefslogtreecommitdiff
path: root/src/file_handle_cache.erl
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-02 10:48:18 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-08-02 10:48:18 +0100
commit62b6ce3e78a42a69a336049e5872bf4686a8543a (patch)
tree7a55098c74ec108cdb5d094ed16ccd9eb661e06c /src/file_handle_cache.erl
parentc6248e4437c04032e9231d265181e2e87d615ef5 (diff)
parente2c57c78fcc0281eeb78dd1914287e539265244c (diff)
downloadrabbitmq-server-62b6ce3e78a42a69a336049e5872bf4686a8543a.tar.gz
merge default into bug23504
Diffstat (limited to 'src/file_handle_cache.erl')
-rw-r--r--src/file_handle_cache.erl177
1 files changed, 99 insertions, 78 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index b5b07eca..235e14c0 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -146,7 +146,8 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
+-export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0,
+ info/1]).
-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -155,13 +156,6 @@
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
-%% Googling around suggests that Windows has a limit somewhere around
-%% 16M, eg
-%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
-%% however, it turns out that's only available through the win32
-%% API. Via the C Runtime, we have just 512:
-%% http://msdn.microsoft.com/en-us/library/6e3b887c%28VS.80%29.aspx
--define(FILE_HANDLES_LIMIT_WINDOWS, 512).
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
@@ -241,7 +235,7 @@
-> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
- val_or_error([char()] | binary()) | 'eof').
+ val_or_error([char()] | binary()) | 'eof').
-spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
-spec(sync/1 :: (ref()) -> ok_or_error()).
-spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
@@ -251,7 +245,7 @@
-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(flush/1 :: (ref()) -> ok_or_error()).
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
- val_or_error(non_neg_integer())).
+ val_or_error(non_neg_integer())).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
@@ -259,11 +253,17 @@
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
+-spec(info_keys/0 :: () -> [atom()]).
+-spec(info/0 :: () -> [{atom(), any()}]).
+-spec(info/1 :: ([atom()]) -> [{atom(), any()}]).
-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()).
-endif.
%%----------------------------------------------------------------------------
+-define(INFO_KEYS, [obtain_count, obtain_limit]).
+
+%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -494,6 +494,11 @@ set_limit(Limit) ->
get_limit() ->
gen_server:call(?SERVER, get_limit, infinity).
+info_keys() -> ?INFO_KEYS.
+
+info() -> info(?INFO_KEYS).
+info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity).
+
%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
@@ -789,6 +794,12 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
{Error, Handle}
end.
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(obtain_count, #fhc_state{obtain_count = Count}) -> Count;
+i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(Item, _) -> throw({bad_argument, Item}).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -849,35 +860,41 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
false -> {noreply, run_pending_item(Item, State1)}
end;
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- clients = Clients })
- when Limit =/= infinity andalso Count >= Limit ->
- ok = track_client(Pid, Clients),
- true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
- {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
clients = Clients }) ->
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
ok = track_client(Pid, Clients),
- case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
- true ->
- true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- {noreply, reduce(State #fhc_state {
- obtain_pending = pending_in(Item, Pending) })};
- false ->
- {noreply, run_pending_item(Item, State)}
- end;
+ Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ Enqueue = fun () ->
+ true = ets:update_element(Clients, Pid,
+ {#cstate.blocked, true}),
+ State #fhc_state {
+ obtain_pending = pending_in(Item, Pending) }
+ end,
+ {noreply,
+ case obtain_limit_reached(State) of
+ true -> Enqueue();
+ false -> case needs_reduce(State #fhc_state {
+ obtain_count = Count + 1 }) of
+ true -> reduce(Enqueue());
+ false -> adjust_alarm(
+ State, run_pending_item(Item, State))
+ end
+ end};
+
handle_call({set_limit, Limit}, _From, State) ->
- {reply, ok, maybe_reduce(
- process_pending(State #fhc_state {
- limit = Limit,
- obtain_limit = obtain_limit(Limit) }))};
+ {reply, ok, adjust_alarm(
+ State, maybe_reduce(
+ process_pending(
+ State #fhc_state {
+ limit = Limit,
+ obtain_limit = obtain_limit(Limit) })))};
+
handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
- {reply, Limit, State}.
+ {reply, Limit, State};
+
+handle_call({info, Items}, _From, State) ->
+ {reply, infos(Items, State), State}.
handle_cast({register_callback, Pid, MFA},
State = #fhc_state { clients = Clients }) ->
@@ -900,9 +917,9 @@ handle_cast({close, Pid, EldestUnusedSince},
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
- {noreply, process_pending(
+ {noreply, adjust_alarm(State, process_pending(
update_counts(open, Pid, -1,
- State #fhc_state { elders = Elders1 }))};
+ State #fhc_state { elders = Elders1 })))};
handle_cast({transfer, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
@@ -924,13 +941,15 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
- {noreply, process_pending(
- State #fhc_state {
- open_count = OpenCount - Opened,
- open_pending = filter_pending(FilterFun, OpenPending),
- obtain_count = ObtainCount - Obtained,
- obtain_pending = filter_pending(FilterFun, ObtainPending),
- elders = dict:erase(Pid, Elders) })}.
+ {noreply, adjust_alarm(
+ State,
+ process_pending(
+ State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = filter_pending(FilterFun, OpenPending),
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = filter_pending(FilterFun, ObtainPending),
+ elders = dict:erase(Pid, Elders) }))}.
terminate(_Reason, State = #fhc_state { clients = Clients }) ->
ets:delete(Clients),
@@ -951,12 +970,13 @@ queue_fold(Fun, Init, Q) ->
filter_pending(Fun, {Count, Queue}) ->
{Delta, Queue1} =
- queue_fold(fun (Item, {DeltaN, QueueN}) ->
- case Fun(Item) of
- true -> {DeltaN, queue:in(Item, QueueN)};
- false -> {DeltaN - requested(Item), QueueN}
- end
- end, {0, queue:new()}, Queue),
+ queue_fold(
+ fun (Item = #pending { requested = Requested }, {DeltaN, QueueN}) ->
+ case Fun(Item) of
+ true -> {DeltaN, queue:in(Item, QueueN)};
+ false -> {DeltaN - Requested, QueueN}
+ end
+ end, {0, queue:new()}, Queue),
{Count + Delta, Queue1}.
pending_new() ->
@@ -990,8 +1010,17 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
OLimit -> OLimit
end.
-requested({_Kind, _Pid, Requested, _From}) ->
- Requested.
+obtain_limit_reached(#fhc_state { obtain_limit = Limit,
+ obtain_count = Count}) ->
+ Limit =/= infinity andalso Count >= Limit.
+
+adjust_alarm(OldState, NewState) ->
+ case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
+ {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []});
+ {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit);
+ _ -> ok
+ end,
+ NewState.
process_pending(State = #fhc_state { limit = infinity }) ->
State;
@@ -1094,7 +1123,7 @@ reduce(State = #fhc_state { open_pending = OpenPending,
case CStates of
[] -> ok;
_ -> case (Sum / ClientCount) -
- (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
AverageAge when AverageAge > 0 ->
notify_age(CStates, AverageAge);
_ ->
@@ -1118,11 +1147,12 @@ notify_age(CStates, AverageAge) ->
end, CStates).
notify_age0(Clients, CStates, Required) ->
- Notifications =
- [CState || CState <- CStates, CState#cstate.callback =/= undefined],
- {L1, L2} = lists:split(random:uniform(length(Notifications)),
- Notifications),
- notify(Clients, Required, L2 ++ L1).
+ case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of
+ [] -> ok;
+ Notifications -> S = random:uniform(length(Notifications)),
+ {L1, L2} = lists:split(S, Notifications),
+ notify(Clients, Required, L2 ++ L1)
+ end.
notify(_Clients, _Required, []) ->
ok;
@@ -1147,29 +1177,20 @@ track_client(Pid, Clients) ->
false -> ok
end.
-%% For all unices, assume ulimit exists. Further googling suggests
-%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n
-%% is file handles
+
+%% To increase the number of file descriptors: on Windows set ERL_MAX_PORTS
+%% environment variable, on Linux set `ulimit -n`.
ulimit() ->
- case os:type() of
- {win32, _OsName} ->
- ?FILE_HANDLES_LIMIT_WINDOWS;
- {unix, _OsName} ->
- %% Under Linux, Solaris and FreeBSD, ulimit is a shell
- %% builtin, not a command. In OS X and AIX it's a command.
- %% Fortunately, os:cmd invokes the cmd in a shell env, so
- %% we're safe in all cases.
- case os:cmd("ulimit -n") of
- "unlimited" ->
- infinity;
- String = [C|_] when $0 =< C andalso C =< $9 ->
- list_to_integer(
- lists:takewhile(
- fun (D) -> $0 =< D andalso D =< $9 end, String));
- _ ->
- %% probably a variant of
- %% "/bin/sh: line 1: ulimit: command not found\n"
- unknown
+ case proplists:get_value(max_fds, erlang:system_info(check_io)) of
+ MaxFds when is_integer(MaxFds) andalso MaxFds > 1 ->
+ case os:type() of
+ {win32, _OsName} ->
+ %% On Windows max_fds is twice the number of open files:
+ %% https://github.com/yrashk/erlang/blob/e1282325ed75e52a98d5/erts/emulator/sys/win32/sys.c#L2459-2466
+ MaxFds div 2;
+ _Any ->
+ %% For other operating systems trust Erlang.
+ MaxFds
end;
_ ->
unknown