diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-19 16:27:08 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-19 16:27:08 +0100 |
commit | 0d30291716324b6a506bde609bf1d679b4d282be (patch) | |
tree | ece65901ab0beb1fec56d5a3dfa11729299d28f7 | |
parent | a640121b2c55055c879056806a35ad953b0e1e3e (diff) | |
download | rabbitmq-server-0d30291716324b6a506bde609bf1d679b4d282be.tar.gz |
Reworked substantially
-rw-r--r-- | src/file_handle_cache.erl | 205 |
1 files changed, 117 insertions, 88 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 6c6ed172..c4ce76ee 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -754,19 +754,18 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, blocked = Blocked }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), Item = {open, Pid, From}, - case maybe_reduce(ensure_mref(Pid, State #fhc_state { - open_count = Count + 1, - elders = Elders1 })) of - {true, State1} -> - State2 = State1 #fhc_state { open_count = Count }, - case CanClose of - true -> {reply, close, State2}; - false -> {noreply, State2 #fhc_state { - open_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) }} - end; - {false, State1} -> - {noreply, run_pending_item(Item, State1)} + State1 = ensure_mref(Pid, State #fhc_state { elders = Elders1 }), + case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of + true -> case CanClose of + true -> + {reply, close, State1}; + false -> + {noreply, + reduce(State1 #fhc_state { + open_pending = [Item | Pending], + blocked = sets:add_element(Pid, Blocked) })} + end; + false -> {noreply, run_pending_item(Item, State1)} end; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, @@ -778,19 +777,17 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, {noreply, ensure_mref(Pid, State #fhc_state { obtain_pending = [Item | Pending], blocked = sets:add_element(Pid, Blocked) })}; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, blocked = Blocked }) -> Item = {obtain, Pid, From}, - case maybe_reduce(ensure_mref(Pid, State #fhc_state { - obtain_count = Count + 1 })) of - {true, State1} -> - {noreply, State1 #fhc_state { - obtain_count = Count, - obtain_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) }}; - {false, State1} -> - {noreply, run_pending_item(Item, State1)} + State1 = ensure_mref(Pid, State), + case needs_reduce(State1 #fhc_state { obtain_count = Count + 1 }) of + true -> {noreply, + reduce(State1 #fhc_state { + obtain_pending = [Item | Pending], + blocked = sets:add_element(Pid, Blocked) })}; + false -> {noreply, run_pending_item(Item, State1)} end. handle_cast({register_callback, Pid, MFA}, @@ -807,27 +804,26 @@ handle_cast({update, Pid, EldestUnusedSince}, State = {noreply, State #fhc_state { elders = Elders1 }}; handle_cast({close, Pid, EldestUnusedSince}, - State = #fhc_state { open_count = Count, - counts = Counts, - elders = Elders }) -> + State = #fhc_state { elders = Elders }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - Counts1 = update_counts(open, Pid, -1, Counts), - {noreply, process_pending(State #fhc_state { open_count = Count - 1, - counts = Counts1, - elders = Elders1 })}; + {noreply, process_pending( + update_counts(open, Pid, -1, + State #fhc_state { elders = Elders1 }))}; handle_cast({transfer, FromPid, ToPid}, State) -> - State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State), - Counts1 = update_counts(obtain, FromPid, -1, Counts), - Counts2 = update_counts(obtain, ToPid, +1, Counts1), - {noreply, process_pending(State1 #fhc_state { counts = Counts2 })}; + {noreply, process_pending( + update_counts(obtain, ToPid, +1, + update_counts(obtain, FromPid, -1, + ensure_mref(ToPid, State))))}; handle_cast(check_counts, State) -> - {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), - {noreply, State1}. + {noreply, case needs_reduce(State) of + true -> reduce(State #fhc_state { timer_ref = undefined }); + false -> State + end}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #fhc_state { open_count = OpenCount, @@ -866,16 +862,15 @@ code_change(_OldVsn, State, _Extra) -> process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> - process_obtain(process_open(State)). + process_open(process_obtain(State)). process_open(State = #fhc_state { limit = Limit, open_pending = Pending, open_count = OpenCount, obtain_count = ObtainCount }) -> - {Pending1, Inc, State1} = + {Pending1, State1} = process_pending(Pending, Limit - (ObtainCount + OpenCount), State), - State1 #fhc_state { open_pending = Pending1, - open_count = OpenCount + Inc }. + State1 #fhc_state { open_pending = Pending1 }. process_obtain(State = #fhc_state { limit = Limit, obtain_pending = Pending, @@ -884,49 +879,63 @@ process_obtain(State = #fhc_state { limit = Limit, open_count = OpenCount }) -> Quota = lists:min([ObtainLimit - ObtainCount, Limit - (ObtainCount + OpenCount)]), - {Pending1, Inc, State1} = process_pending(Pending, Quota, State), - State1 #fhc_state { obtain_pending = Pending1, - obtain_count = ObtainCount + Inc }. + {Pending1, State1} = process_pending(Pending, Quota, State), + State1 #fhc_state { obtain_pending = Pending1 }. process_pending([], _Quota, State) -> - {[], 0, State}; + {[], State}; process_pending(Pending, Quota, State) when Quota =< 0 -> - {Pending, 0, State}; + {Pending, State}; process_pending(Pending, Quota, State) -> PendingLen = length(Pending), SatisfiableLen = lists:min([PendingLen, Quota]), Take = PendingLen - SatisfiableLen, {PendingNew, SatisfiableRev} = lists:split(Take, Pending), State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev), - {PendingNew, SatisfiableLen, State1}. + {PendingNew, State1}. -run_pending_item({Kind, Pid, From}, State = #fhc_state { counts = Counts, - blocked = Blocked }) -> +run_pending_item({Kind, Pid, From}, State = #fhc_state { blocked = Blocked }) -> gen_server:reply(From, ok), - State #fhc_state { counts = update_counts(Kind, Pid, +1, Counts), - blocked = sets:del_element(Pid, Blocked) }. - -update_counts(open, Pid, Delta, Counts) -> - dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, - Counts); -update_counts(obtain, Pid, Delta, Counts) -> - dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end, - Counts). - -maybe_reduce(State = #fhc_state { limit = Limit, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_limit = ObtainLimit, - obtain_pending = ObtainPending, - elders = Elders, - callbacks = Callbacks, - blocked = Blocked, - timer_ref = TRef }) - when Limit =/= infinity andalso - (((OpenCount + ObtainCount) > Limit) orelse - (OpenPending =/= []) orelse - (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) -> + update_counts(Kind, Pid, +1, State #fhc_state { + blocked = sets:del_element(Pid, Blocked) }). + +update_counts(Kind, Pid, Delta, + State = #fhc_state { counts = Counts, + open_count = OpenCount, + obtain_count = ObtainCount }) -> + {Counts1, OpenDelta, ObtainDelta} = + update_counts1(Kind, Pid, Delta, Counts), + State #fhc_state { counts = Counts1, + open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta }. + + +update_counts1(open, Pid, Delta, Counts) -> + {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, + Counts), Delta, 0}; +update_counts1(obtain, Pid, Delta, Counts) -> + {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end, + Counts), 0, Delta}. + +needs_reduce(#fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_limit = ObtainLimit, + obtain_pending = ObtainPending }) -> + Limit =/= infinity + andalso ((OpenCount + ObtainCount > Limit) + orelse (OpenPending =/= []) + orelse (ObtainCount < ObtainLimit + andalso ObtainPending =/= [])). + +reduce(State = #fhc_state { open_pending = OpenPending, + obtain_pending = ObtainPending, + elders = Elders, + callbacks = Callbacks, + blocked = Blocked, + counts = Counts, + timer_ref = TRef }) -> Now = now(), {Pids, Sum, ClientCount} = dict:fold(fun (_Pid, undefined, Accs) -> @@ -941,27 +950,38 @@ maybe_reduce(State = #fhc_state { limit = Limit, end, {[], 0, 0}, Elders), case Pids of [] -> ok; - _ -> AverageAge = - lists:max([0, ((Sum - (?FILE_HANDLES_CHECK_INTERVAL * 1000)) - / ClientCount)]), - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> ok; - {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) - end - end, Pids) + _ -> case (Sum / ClientCount) - (2000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + lists:foreach( + fun (Pid) -> + case dict:find(Pid, Callbacks) of + error -> + ok; + {ok, {M, F, A}} -> + apply(M, F, A ++ [AverageAge]) + end + end, Pids); + _ -> + Required = length(OpenPending) + length(ObtainPending), + PidsCounts = [{Pid, OpCount} + || Pid <- Pids, + dict:is_key(Pid, Callbacks), + begin + {OpCount, _} = dict:fetch(Pid, Counts), + OpCount > 0 + end], + {L1, L2} = lists:split(random:uniform(length(PidsCounts)), + PidsCounts), + close(Callbacks, Required, L2 ++ L1) + end end, - AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - {AboveLimit, State #fhc_state { timer_ref = TRef1 }}; - _ -> {AboveLimit, State} - end; -maybe_reduce(State) -> - {false, State}. + State #fhc_state { timer_ref = TRef1 }; + _ -> State + end. %% For all unices, assume ulimit exists. Further googling suggests %% that BSDs (incl OS X), solaris and linux all agree that ulimit -n @@ -993,6 +1013,15 @@ ulimit() -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. +close(_Callbacks, _Required, []) -> + ok; +close(_Callbacks, Required, _List) when Required =< 0 -> + ok; +close(Callbacks, Required, [{Pid, Sum} | List]) -> + {M, F, A} = dict:fetch(Pid, Callbacks), + apply(M, F, A ++ [0]), + close(Callbacks, Required - Sum, List). + ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> case dict:find(Pid, Counts) of {ok, _} -> State; |