diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-20 15:24:34 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-20 15:24:34 +0100 |
commit | 75ff6f18b82ca754c74f1f8420a81638b8ad624b (patch) | |
tree | e0007bdede7d0252e55eba5a2e904c3662c6f1d6 | |
parent | f9b196d2fe12d3f88d318338170f9eb73067648e (diff) | |
download | rabbitmq-server-75ff6f18b82ca754c74f1f8420a81638b8ad624b.tar.gz |
Convert fhc to use an ets table with record per client which amalgamates several of the previous state entries
-rw-r--r-- | src/file_handle_cache.erl | 280 |
1 files changed, 135 insertions, 145 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index c49eae7c..5a83bf50 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -149,6 +149,7 @@ -define(FILE_HANDLES_CHECK_INTERVAL, 2000). -define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)). +-define(CLIENT_ETS_TABLE, ?MODULE). %%---------------------------------------------------------------------------- @@ -182,13 +183,19 @@ obtain_limit, obtain_count, obtain_pending, - callbacks, - counts, - blocked, - due_no_open, + clients, timer_ref }). +-record(cstate, + { pid, + callback, + opened, + obtained, + blocked, + pending_closes + }). + %%---------------------------------------------------------------------------- %% Specs %%---------------------------------------------------------------------------- @@ -735,6 +742,7 @@ init([]) -> end, error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", [Limit, ObtainLimit]), + Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), {ok, #fhc_state { elders = dict:new(), limit = Limit, open_count = 0, @@ -742,33 +750,31 @@ init([]) -> obtain_limit = ObtainLimit, obtain_count = 0, obtain_pending = [], - callbacks = dict:new(), - counts = dict:new(), - blocked = sets:new(), - due_no_open = sets:new(), + clients = Clients, timer_ref = undefined }}. handle_call({open, Pid, EldestUnusedSince}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders, - blocked = Blocked, - due_no_open = DueNoOpen }) + clients = Clients }) when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), Item = {open, Pid, From}, - State1 = ensure_mref(Pid, State #fhc_state { elders = Elders1 }), + ok = ensure_mref(Pid, Clients), + State1 = State #fhc_state { elders = Elders1 }, case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of - true -> case dict:fetch(Pid, State1#fhc_state.counts) of - {0, _} -> + true -> case ets:lookup(Clients, Pid) of + [#cstate { opened = 0 }] -> + true = ets:update_element( + Clients, Pid, {#cstate.blocked, true}), {noreply, reduce(State1 #fhc_state { - open_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) })}; - _ -> - {reply, close, - State1 #fhc_state { - due_no_open = sets:add_element(Pid, DueNoOpen) }} + open_pending = [Item | Pending] })}; + [#cstate { opened = N }] -> + true = ets:update_element( + Clients, Pid, {#cstate.pending_closes, N}), + {reply, close, State1} end; false -> {noreply, run_pending_item(Item, State1)} end; @@ -776,30 +782,31 @@ handle_call({open, Pid, EldestUnusedSince}, From, handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, obtain_count = Count, obtain_pending = Pending, - blocked = Blocked }) + clients = Clients }) when Limit =/= infinity andalso Count >= Limit -> + ok = ensure_mref(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), Item = {obtain, Pid, From}, - {noreply, ensure_mref(Pid, State #fhc_state { - obtain_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) })}; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, + {noreply, State #fhc_state { obtain_pending = [Item | Pending] }}; +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, - blocked = Blocked }) -> + clients = Clients }) -> Item = {obtain, Pid, From}, - State1 = ensure_mref(Pid, State), - case needs_reduce(State1 #fhc_state { obtain_count = Count + 1 }) of - true -> {noreply, - reduce(State1 #fhc_state { - obtain_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) })}; - false -> {noreply, run_pending_item(Item, State1)} + ok = ensure_mref(Pid, Clients), + case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of + true -> + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), + {noreply, + reduce(State #fhc_state {obtain_pending = [Item | Pending] })}; + false -> + {noreply, run_pending_item(Item, State)} end. handle_cast({register_callback, Pid, MFA}, - State = #fhc_state { callbacks = Callbacks }) -> - {noreply, ensure_mref( - Pid, State #fhc_state { - callbacks = dict:store(Pid, MFA, Callbacks) })}; + State = #fhc_state { clients = Clients }) -> + ok = ensure_mref(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}), + {noreply, State}; handle_cast({update, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders }) @@ -810,54 +817,52 @@ handle_cast({update, Pid, EldestUnusedSince}, {noreply, State #fhc_state { elders = Elders1 }}; handle_cast({close, Pid, EldestUnusedSince}, - State = #fhc_state { elders = Elders }) -> + State = #fhc_state { elders = Elders, clients = Clients }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, + ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), {noreply, process_pending( update_counts(open, Pid, -1, State #fhc_state { elders = Elders1 }))}; handle_cast({transfer, FromPid, ToPid}, State) -> + ok = ensure_mref(ToPid, State#fhc_state.clients), {noreply, process_pending( update_counts(obtain, ToPid, +1, - update_counts(obtain, FromPid, -1, - ensure_mref(ToPid, State))))}; + update_counts(obtain, FromPid, -1, State)))}; handle_cast(check_counts, State) -> - {noreply, case needs_reduce(State) of - true -> reduce(State #fhc_state { timer_ref = undefined }); - false -> State + State1 = State #fhc_state { timer_ref = undefined }, + {noreply, case needs_reduce(State1) of + true -> reduce(State1); + false -> State1 end}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #fhc_state { open_count = OpenCount, + State = #fhc_state { elders = Elders, + open_count = OpenCount, open_pending = OpenPending, obtain_count = ObtainCount, obtain_pending = ObtainPending, - callbacks = Callbacks, - counts = Counts, - elders = Elders, - blocked = Blocked, - due_no_open = DueNoOpen }) -> + clients = Clients }) -> FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, OpenPending1 = lists:filter(FilterFun, OpenPending), ObtainPending1 = lists:filter(FilterFun, ObtainPending), - {Opened, Obtained} = dict:fetch(Pid, Counts), + [#cstate { opened = Opened, obtained = Obtained }] = + ets:lookup(Clients, Pid), + true = ets:delete(Clients, Pid), {noreply, process_pending( State #fhc_state { open_count = OpenCount - Opened, open_pending = OpenPending1, obtain_count = ObtainCount - Obtained, obtain_pending = ObtainPending1, - callbacks = dict:erase(Pid, Callbacks), - counts = dict:erase(Pid, Counts), - elders = dict:erase(Pid, Elders), - blocked = sets:del_element(Pid, Blocked), - due_no_open = sets:del_element(Pid, DueNoOpen) })}. + elders = dict:erase(Pid, Elders) })}. -terminate(_Reason, State) -> +terminate(_Reason, State = #fhc_state { clients = Clients }) -> + ets:delete(Clients), State. code_change(_OldVsn, State, _Extra) -> @@ -902,33 +907,25 @@ process_pending(Pending, Quota, State) -> State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev), {PendingNew, State1}. -run_pending_item({Kind, Pid, From}, State = #fhc_state { blocked = Blocked }) -> +run_pending_item({Kind, Pid, From}, State = #fhc_state { clients = Clients }) -> gen_server:reply(From, ok), - update_counts(Kind, Pid, +1, State #fhc_state { - blocked = sets:del_element(Pid, Blocked) }). + true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), + update_counts(Kind, Pid, +1, State). update_counts(Kind, Pid, Delta, - State = #fhc_state { counts = Counts, - open_count = OpenCount, + State = #fhc_state { open_count = OpenCount, obtain_count = ObtainCount, - due_no_open = DueNoOpen }) -> - {Counts1, OpenDelta, ObtainDelta} = - update_counts1(Kind, Pid, Delta, Counts), - State #fhc_state { - counts = Counts1, - open_count = OpenCount + OpenDelta, - obtain_count = ObtainCount + ObtainDelta, - due_no_open = case dict:fetch(Pid, Counts1) of - {0, _} -> sets:del_element(Pid, DueNoOpen); - _ -> DueNoOpen - end }. - -update_counts1(open, Pid, Delta, Counts) -> - {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, - Counts), Delta, 0}; -update_counts1(obtain, Pid, Delta, Counts) -> - {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end, - Counts), 0, Delta}. + clients = Clients }) -> + {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients), + State #fhc_state { open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta }. + +update_counts1(open, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.opened, Delta}), + {Delta, 0}; +update_counts1(obtain, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}), + {0, Delta}. needs_reduce(#fhc_state { limit = Limit, open_count = OpenCount, @@ -945,51 +942,77 @@ needs_reduce(#fhc_state { limit = Limit, reduce(State = #fhc_state { open_pending = OpenPending, obtain_pending = ObtainPending, elders = Elders, - callbacks = Callbacks, - blocked = Blocked, - counts = Counts, - due_no_open = DueNoOpen, + clients = Clients, timer_ref = TRef }) -> Now = now(), - {Pids, Sum, ClientCount} = - dict:fold(fun (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) -> - case sets:is_element(Pid, Blocked) orelse - sets:is_element(Pid, DueNoOpen) of + {CStates, Sum, ClientCount} = + dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> + [#cstate { pending_closes = PendingCloses, + opened = Opened, blocked = Blocked } = + CState] = ets:lookup(Clients, Pid), + case Blocked orelse PendingCloses =:= Opened of true -> Accs; - false -> {[Pid|PidsAcc], + false -> {[CState | CStatesAcc], SumAcc + timer:now_diff(Now, Eldest), CountAcc + 1} end end, {[], 0, 0}, Elders), - State1 = - case Pids of - [] -> State; - _ -> case (Sum / ClientCount) - - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of - AverageAge when AverageAge > 0 -> - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> - ok; - {ok, {M, F, A}} -> - apply(M, F, A ++ [AverageAge]) - end - end, Pids), - State; - _ -> - DueNoOpen1 = notify(Pids, Callbacks, Counts, - DueNoOpen, length(OpenPending) + - length(ObtainPending)), - State #fhc_state { due_no_open = DueNoOpen1 } - end - end, + case CStates of + [] -> ok; + _ -> case (Sum / ClientCount) - + (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + notify_age(CStates, AverageAge); + _ -> + notify_age0(Clients, CStates, + length(OpenPending) + length(ObtainPending)) + end + end, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - State1 #fhc_state { timer_ref = TRef1 }; - _ -> State1 + State #fhc_state { timer_ref = TRef1 }; + _ -> State + end. + +notify_age(CStates, AverageAge) -> + lists:foreach(fun (#cstate { callback = undefined }) -> + ok; + (#cstate { callback = {M, F, A} }) -> + apply(M, F, A ++ [AverageAge]) + end, CStates). + +notify_age0(Clients, CStates, Required) -> + Notifications = + [CState || CState <- CStates, CState#cstate.callback =/= undefined], + {L1, L2} = lists:split(random:uniform(length(Notifications)), + Notifications), + notify(Clients, Required, L2 ++ L1). + +notify(_Clients, _Required, []) -> + ok; +notify(_Clients, Required, _Notifications) when Required =< 0 -> + ok; +notify(Clients, Required, [#cstate{ callback = {M, F, A}, opened = Opened, + pid = Pid, pending_closes = PendingCloses } + | Notifications]) -> + Closable = Opened - PendingCloses, + apply(M, F, A ++ [0]), + ets:update_element(Clients, Pid, + {#cstate.pending_closes, PendingCloses + Closable}), + notify(Clients, Required - Closable, Notifications). + +ensure_mref(Pid, Clients) -> + case ets:insert_new(Clients, #cstate { pid = Pid, + callback = undefined, + opened = 0, + obtained = 0, + blocked = false, + pending_closes = 0 }) of + true -> _MRef = erlang:monitor(process, Pid), + ok; + false -> ok end. %% For all unices, assume ulimit exists. Further googling suggests @@ -1021,36 +1044,3 @@ ulimit() -> _ -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. - -notify(Pids, Callbacks, Counts, DueNoOpen, Required) -> - Notifications = [{Callback, Pid, OpenCount} || - Pid <- Pids, - case dict:find(Pid, Callbacks) of - error -> Callback = undefined, - false; - {ok, CB} -> Callback = CB, - true - end, - begin - {OpenCount, _} = dict:fetch(Pid, Counts), - OpenCount > 0 - end], - {L1, L2} = lists:split(random:uniform(length(Notifications)), - Notifications), - notify(Required, DueNoOpen, L2 ++ L1). - -notify(_Required, Acc, []) -> - Acc; -notify(Required, Acc, _Notifications) when Required =< 0 -> - Acc; -notify(Required, Acc, [{{M, F, A}, Pid, Open} | Notifications]) -> - apply(M, F, A ++ [0]), - notify(Required - Open, sets:add_element(Pid, Acc), Notifications). - -ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> - case dict:find(Pid, Counts) of - {ok, _} -> State; - error -> _MRef = erlang:monitor(process, Pid), - State #fhc_state { - counts = dict:store(Pid, {0, 0}, Counts) } - end. |