summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-16 13:40:39 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-16 13:40:39 +0100
commit5f26be1c52eaeed3579b152571f7dea631c791c9 (patch)
tree87b6fb20744c972b8f184be259e1d3077f912a0d
parent7c559387acdfcdc51eced753696b6c6a2d678662 (diff)
downloadrabbitmq-server-5f26be1c52eaeed3579b152571f7dea631c791c9.tar.gz
Split obtains in two : one for open requests (higher priority) and one for obtains requests (lower priority, and a lower limit)
-rw-r--r--src/file_handle_cache.erl106
1 files changed, 67 insertions, 39 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index e79fbb96..130cdbde 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -148,6 +148,11 @@
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
+-define(OBTAINS_LIMIT(LIMIT), case LIMIT of
+ infinity -> infinity;
+ _ -> trunc((LIMIT * 0.9) - 1)
+ end).
+
%%----------------------------------------------------------------------------
-record(file,
@@ -174,8 +179,10 @@
-record(fhc_state,
{ elders,
- limit,
count,
+ limit,
+ opens,
+ obtains_limit,
obtains,
callbacks,
client_mrefs,
@@ -723,19 +730,35 @@ init([]) ->
_ ->
ulimit()
end,
- error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
- {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
- obtains = [], callbacks = dict:new(),
- client_mrefs = dict:new(), timer_ref = undefined }}.
-
-handle_call(obtain, From, State) ->
- add_obtains(From, false, State);
+ ObtainsLimit = ?OBTAINS_LIMIT(Limit)
+ error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
+ [Limit, ObtainsLimit]),
+ {ok, #fhc_state { elders = dict:new(), count = 0, limit = Limit, opens = [],
+ obtains_limit = ObtainsLimit, obtains = [],
+ callbacks = dict:new(), client_mrefs = dict:new(),
+ timer_ref = undefined }}.
+
+handle_call(obtain, From, State = #fhc_state { obtains_limit = Limit }) ->
+ case over_limit(Limit, State) of
+ {true, State1 = #fhc_state { obtains = Obtains }} ->
+ {noreply, State1 #fhc_state { obtains = [From | Obtains] }};
+ {false, State1} ->
+ {reply, ok, State1}
+ end;
handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State =
- #fhc_state { elders = Elders }) ->
+ #fhc_state { limit = Limit, elders = Elders }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- add_obtains(From, CanClose,
- ensure_mref(Pid, State #fhc_state { elders = Elders1 })).
+ case over_limit(Limit,
+ ensure_mref(Pid, State #fhc_state { elders = Elders1 })) of
+ {true, State1 = #fhc_state { opens = Opens }} ->
+ case CanClose of
+ true -> {reply, close, State1};
+ false -> {noreply, State1 #fhc_state { opens = [From | Opens] }}
+ end;
+ {false, State1} ->
+ {reply, ok, State1}
+ end.
handle_cast({register_callback, Pid, MFA},
State = #fhc_state { callbacks = Callbacks }) ->
@@ -756,7 +779,7 @@ handle_cast({close, Pid, EldestUnusedSince}, State =
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {noreply, process_obtains(
+ {noreply, process_pending(
ensure_mref(Pid, State #fhc_state { elders = Elders1,
count = Count - 1 }))};
@@ -770,7 +793,7 @@ handle_cast({release_on_death, Pid}, State) ->
handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
#fhc_state { count = Count, callbacks = Callbacks,
client_mrefs = ClientMRefs, elders = Elders }) ->
- {noreply, process_obtains(
+ {noreply, process_pending(
case dict:find(Pid, ClientMRefs) of
{ok, MRef} -> State #fhc_state {
elders = dict:erase(Pid, Elders),
@@ -789,35 +812,40 @@ code_change(_OldVsn, State, _Extra) ->
%% server helpers
%%----------------------------------------------------------------------------
-add_obtains(From, CanClose, State = #fhc_state { count = Count }) ->
- State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
+over_limit(Limit, State = #fhc_state { count = Count }) ->
+ State1 = #fhc_state { count = Count1 } =
maybe_reduce(State #fhc_state { count = Count + 1 }),
- case Limit /= infinity andalso Count1 >= Limit of
- true ->
- case CanClose of
- true ->
- {reply, close, State1 #fhc_state { count = Count1 - 1 }};
- false ->
- {noreply, State1 #fhc_state { obtains = [From | Obtains],
- count = Count1 - 1 }}
- end;
- false ->
- {reply, ok, State1}
+ case Limit =/= infinity andalso Count1 >= Limit of
+ true -> {true, State1 #fhc_state { count = Count1 - 1 }};
+ false -> {false, State1}
end.
-process_obtains(State = #fhc_state { obtains = [] }) ->
- State;
-process_obtains(State = #fhc_state { limit = Limit, count = Count })
- when Limit /= infinity andalso Count >= Limit ->
- State;
-process_obtains(State = #fhc_state { limit = Limit, count = Count,
- obtains = Obtains }) ->
- ObtainsLen = length(Obtains),
- ObtainableLen = lists:min([ObtainsLen, Limit - Count]),
- Take = ObtainsLen - ObtainableLen,
- {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains),
- [gen_server:reply(From, ok) || From <- ObtainableRev],
- State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
+process_pending(State) ->
+ process_obtains(process_opens(State)).
+
+process_opens(State =
+ #fhc_state { opens = Opens, limit = Limit, count = Count }) ->
+ {Opens1, Count1} = process_pending(Opens, Limit, Count),
+ State #fhc_state { opens = Opens1, count = Count1}.
+
+process_obtains(State = #fhc_state { obtains = Obtains, obtains_limit = Limit,
+ count = Count }) ->
+ {Obtains1, Count1} = process_pending(Obtains, Limit, Count),
+ State #fhc_state { obtains = Obtains1, count = Count1 }.
+
+%% if limit is infinity then we'll never have any pendings, so will
+%% always hit the first head.
+process_pending([], _Limit, Count) ->
+ {[], Count};
+process_pending(Pending, Limit, Count) when Count >= Limit ->
+ {Pending, Count};
+process_pending(Pending, Limit, Count) ->
+ PendingLen = length(Pending),
+ SatisfiableLen = lists:min([PendingLen, Limit - Count]),
+ Take = PendingLen - SatisfiableLen,
+ {PendingNew, SatisfiableRev} = lists:split(Take, Pending),
+ [gen_server:reply(From, ok) || From <- SatisfiableRev],
+ {PendingNew, Count + SatisfiableLen}.
maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
callbacks = Callbacks, timer_ref = TRef })