diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-16 13:40:39 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-16 13:40:39 +0100 |
commit | 5f26be1c52eaeed3579b152571f7dea631c791c9 (patch) | |
tree | 87b6fb20744c972b8f184be259e1d3077f912a0d | |
parent | 7c559387acdfcdc51eced753696b6c6a2d678662 (diff) | |
download | rabbitmq-server-5f26be1c52eaeed3579b152571f7dea631c791c9.tar.gz |
Split obtains in two : one for open requests (higher priority) and one for obtains requests (lower priority, and a lower limit)
-rw-r--r-- | src/file_handle_cache.erl | 106 |
1 files changed, 67 insertions, 39 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index e79fbb96..130cdbde 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -148,6 +148,11 @@ -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). +-define(OBTAINS_LIMIT(LIMIT), case LIMIT of + infinity -> infinity; + _ -> trunc((LIMIT * 0.9) - 1) + end). + %%---------------------------------------------------------------------------- -record(file, @@ -174,8 +179,10 @@ -record(fhc_state, { elders, - limit, count, + limit, + opens, + obtains_limit, obtains, callbacks, client_mrefs, @@ -723,19 +730,35 @@ init([]) -> _ -> ulimit() end, - error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), - {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, - obtains = [], callbacks = dict:new(), - client_mrefs = dict:new(), timer_ref = undefined }}. - -handle_call(obtain, From, State) -> - add_obtains(From, false, State); + ObtainsLimit = ?OBTAINS_LIMIT(Limit) + error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", + [Limit, ObtainsLimit]), + {ok, #fhc_state { elders = dict:new(), count = 0, limit = Limit, opens = [], + obtains_limit = ObtainsLimit, obtains = [], + callbacks = dict:new(), client_mrefs = dict:new(), + timer_ref = undefined }}. + +handle_call(obtain, From, State = #fhc_state { obtains_limit = Limit }) -> + case over_limit(Limit, State) of + {true, State1 = #fhc_state { obtains = Obtains }} -> + {noreply, State1 #fhc_state { obtains = [From | Obtains] }}; + {false, State1} -> + {reply, ok, State1} + end; handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = - #fhc_state { elders = Elders }) -> + #fhc_state { limit = Limit, elders = Elders }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - add_obtains(From, CanClose, - ensure_mref(Pid, State #fhc_state { elders = Elders1 })). + case over_limit(Limit, + ensure_mref(Pid, State #fhc_state { elders = Elders1 })) of + {true, State1 = #fhc_state { opens = Opens }} -> + case CanClose of + true -> {reply, close, State1}; + false -> {noreply, State1 #fhc_state { opens = [From | Opens] }} + end; + {false, State1} -> + {reply, ok, State1} + end. handle_cast({register_callback, Pid, MFA}, State = #fhc_state { callbacks = Callbacks }) -> @@ -756,7 +779,7 @@ handle_cast({close, Pid, EldestUnusedSince}, State = undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {noreply, process_obtains( + {noreply, process_pending( ensure_mref(Pid, State #fhc_state { elders = Elders1, count = Count - 1 }))}; @@ -770,7 +793,7 @@ handle_cast({release_on_death, Pid}, State) -> handle_info({'DOWN', MRef, process, Pid, _Reason}, State = #fhc_state { count = Count, callbacks = Callbacks, client_mrefs = ClientMRefs, elders = Elders }) -> - {noreply, process_obtains( + {noreply, process_pending( case dict:find(Pid, ClientMRefs) of {ok, MRef} -> State #fhc_state { elders = dict:erase(Pid, Elders), @@ -789,35 +812,40 @@ code_change(_OldVsn, State, _Extra) -> %% server helpers %%---------------------------------------------------------------------------- -add_obtains(From, CanClose, State = #fhc_state { count = Count }) -> - State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = +over_limit(Limit, State = #fhc_state { count = Count }) -> + State1 = #fhc_state { count = Count1 } = maybe_reduce(State #fhc_state { count = Count + 1 }), - case Limit /= infinity andalso Count1 >= Limit of - true -> - case CanClose of - true -> - {reply, close, State1 #fhc_state { count = Count1 - 1 }}; - false -> - {noreply, State1 #fhc_state { obtains = [From | Obtains], - count = Count1 - 1 }} - end; - false -> - {reply, ok, State1} + case Limit =/= infinity andalso Count1 >= Limit of + true -> {true, State1 #fhc_state { count = Count1 - 1 }}; + false -> {false, State1} end. -process_obtains(State = #fhc_state { obtains = [] }) -> - State; -process_obtains(State = #fhc_state { limit = Limit, count = Count }) - when Limit /= infinity andalso Count >= Limit -> - State; -process_obtains(State = #fhc_state { limit = Limit, count = Count, - obtains = Obtains }) -> - ObtainsLen = length(Obtains), - ObtainableLen = lists:min([ObtainsLen, Limit - Count]), - Take = ObtainsLen - ObtainableLen, - {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains), - [gen_server:reply(From, ok) || From <- ObtainableRev], - State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. +process_pending(State) -> + process_obtains(process_opens(State)). + +process_opens(State = + #fhc_state { opens = Opens, limit = Limit, count = Count }) -> + {Opens1, Count1} = process_pending(Opens, Limit, Count), + State #fhc_state { opens = Opens1, count = Count1}. + +process_obtains(State = #fhc_state { obtains = Obtains, obtains_limit = Limit, + count = Count }) -> + {Obtains1, Count1} = process_pending(Obtains, Limit, Count), + State #fhc_state { obtains = Obtains1, count = Count1 }. + +%% if limit is infinity then we'll never have any pendings, so will +%% always hit the first head. +process_pending([], _Limit, Count) -> + {[], Count}; +process_pending(Pending, Limit, Count) when Count >= Limit -> + {Pending, Count}; +process_pending(Pending, Limit, Count) -> + PendingLen = length(Pending), + SatisfiableLen = lists:min([PendingLen, Limit - Count]), + Take = PendingLen - SatisfiableLen, + {PendingNew, SatisfiableRev} = lists:split(Take, Pending), + [gen_server:reply(From, ok) || From <- SatisfiableRev], + {PendingNew, Count + SatisfiableLen}. maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, callbacks = Callbacks, timer_ref = TRef }) |