summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-20 15:24:34 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-20 15:24:34 +0100
commit75ff6f18b82ca754c74f1f8420a81638b8ad624b (patch)
treee0007bdede7d0252e55eba5a2e904c3662c6f1d6
parentf9b196d2fe12d3f88d318338170f9eb73067648e (diff)
downloadrabbitmq-server-75ff6f18b82ca754c74f1f8420a81638b8ad624b.tar.gz
Convert fhc to use an ets table with record per client which amalgamates several of the previous state entries
-rw-r--r--src/file_handle_cache.erl280
1 files changed, 135 insertions, 145 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index c49eae7c..5a83bf50 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -149,6 +149,7 @@
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)).
+-define(CLIENT_ETS_TABLE, ?MODULE).
%%----------------------------------------------------------------------------
@@ -182,13 +183,19 @@
obtain_limit,
obtain_count,
obtain_pending,
- callbacks,
- counts,
- blocked,
- due_no_open,
+ clients,
timer_ref
}).
+-record(cstate,
+ { pid,
+ callback,
+ opened,
+ obtained,
+ blocked,
+ pending_closes
+ }).
+
%%----------------------------------------------------------------------------
%% Specs
%%----------------------------------------------------------------------------
@@ -735,6 +742,7 @@ init([]) ->
end,
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
[Limit, ObtainLimit]),
+ Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
{ok, #fhc_state { elders = dict:new(),
limit = Limit,
open_count = 0,
@@ -742,33 +750,31 @@ init([]) ->
obtain_limit = ObtainLimit,
obtain_count = 0,
obtain_pending = [],
- callbacks = dict:new(),
- counts = dict:new(),
- blocked = sets:new(),
- due_no_open = sets:new(),
+ clients = Clients,
timer_ref = undefined }}.
handle_call({open, Pid, EldestUnusedSince}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
elders = Elders,
- blocked = Blocked,
- due_no_open = DueNoOpen })
+ clients = Clients })
when EldestUnusedSince =/= undefined ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
Item = {open, Pid, From},
- State1 = ensure_mref(Pid, State #fhc_state { elders = Elders1 }),
+ ok = ensure_mref(Pid, Clients),
+ State1 = State #fhc_state { elders = Elders1 },
case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of
- true -> case dict:fetch(Pid, State1#fhc_state.counts) of
- {0, _} ->
+ true -> case ets:lookup(Clients, Pid) of
+ [#cstate { opened = 0 }] ->
+ true = ets:update_element(
+ Clients, Pid, {#cstate.blocked, true}),
{noreply,
reduce(State1 #fhc_state {
- open_pending = [Item | Pending],
- blocked = sets:add_element(Pid, Blocked) })};
- _ ->
- {reply, close,
- State1 #fhc_state {
- due_no_open = sets:add_element(Pid, DueNoOpen) }}
+ open_pending = [Item | Pending] })};
+ [#cstate { opened = N }] ->
+ true = ets:update_element(
+ Clients, Pid, {#cstate.pending_closes, N}),
+ {reply, close, State1}
end;
false -> {noreply, run_pending_item(Item, State1)}
end;
@@ -776,30 +782,31 @@ handle_call({open, Pid, EldestUnusedSince}, From,
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
obtain_count = Count,
obtain_pending = Pending,
- blocked = Blocked })
+ clients = Clients })
when Limit =/= infinity andalso Count >= Limit ->
+ ok = ensure_mref(Pid, Clients),
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
Item = {obtain, Pid, From},
- {noreply, ensure_mref(Pid, State #fhc_state {
- obtain_pending = [Item | Pending],
- blocked = sets:add_element(Pid, Blocked) })};
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
+ {noreply, State #fhc_state { obtain_pending = [Item | Pending] }};
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
- blocked = Blocked }) ->
+ clients = Clients }) ->
Item = {obtain, Pid, From},
- State1 = ensure_mref(Pid, State),
- case needs_reduce(State1 #fhc_state { obtain_count = Count + 1 }) of
- true -> {noreply,
- reduce(State1 #fhc_state {
- obtain_pending = [Item | Pending],
- blocked = sets:add_element(Pid, Blocked) })};
- false -> {noreply, run_pending_item(Item, State1)}
+ ok = ensure_mref(Pid, Clients),
+ case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
+ true ->
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
+ {noreply,
+ reduce(State #fhc_state {obtain_pending = [Item | Pending] })};
+ false ->
+ {noreply, run_pending_item(Item, State)}
end.
handle_cast({register_callback, Pid, MFA},
- State = #fhc_state { callbacks = Callbacks }) ->
- {noreply, ensure_mref(
- Pid, State #fhc_state {
- callbacks = dict:store(Pid, MFA, Callbacks) })};
+ State = #fhc_state { clients = Clients }) ->
+ ok = ensure_mref(Pid, Clients),
+ true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}),
+ {noreply, State};
handle_cast({update, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders })
@@ -810,54 +817,52 @@ handle_cast({update, Pid, EldestUnusedSince},
{noreply, State #fhc_state { elders = Elders1 }};
handle_cast({close, Pid, EldestUnusedSince},
- State = #fhc_state { elders = Elders }) ->
+ State = #fhc_state { elders = Elders, clients = Clients }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
+ ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
{noreply, process_pending(
update_counts(open, Pid, -1,
State #fhc_state { elders = Elders1 }))};
handle_cast({transfer, FromPid, ToPid}, State) ->
+ ok = ensure_mref(ToPid, State#fhc_state.clients),
{noreply, process_pending(
update_counts(obtain, ToPid, +1,
- update_counts(obtain, FromPid, -1,
- ensure_mref(ToPid, State))))};
+ update_counts(obtain, FromPid, -1, State)))};
handle_cast(check_counts, State) ->
- {noreply, case needs_reduce(State) of
- true -> reduce(State #fhc_state { timer_ref = undefined });
- false -> State
+ State1 = State #fhc_state { timer_ref = undefined },
+ {noreply, case needs_reduce(State1) of
+ true -> reduce(State1);
+ false -> State1
end}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #fhc_state { open_count = OpenCount,
+ State = #fhc_state { elders = Elders,
+ open_count = OpenCount,
open_pending = OpenPending,
obtain_count = ObtainCount,
obtain_pending = ObtainPending,
- callbacks = Callbacks,
- counts = Counts,
- elders = Elders,
- blocked = Blocked,
- due_no_open = DueNoOpen }) ->
+ clients = Clients }) ->
FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end,
OpenPending1 = lists:filter(FilterFun, OpenPending),
ObtainPending1 = lists:filter(FilterFun, ObtainPending),
- {Opened, Obtained} = dict:fetch(Pid, Counts),
+ [#cstate { opened = Opened, obtained = Obtained }] =
+ ets:lookup(Clients, Pid),
+ true = ets:delete(Clients, Pid),
{noreply, process_pending(
State #fhc_state {
open_count = OpenCount - Opened,
open_pending = OpenPending1,
obtain_count = ObtainCount - Obtained,
obtain_pending = ObtainPending1,
- callbacks = dict:erase(Pid, Callbacks),
- counts = dict:erase(Pid, Counts),
- elders = dict:erase(Pid, Elders),
- blocked = sets:del_element(Pid, Blocked),
- due_no_open = sets:del_element(Pid, DueNoOpen) })}.
+ elders = dict:erase(Pid, Elders) })}.
-terminate(_Reason, State) ->
+terminate(_Reason, State = #fhc_state { clients = Clients }) ->
+ ets:delete(Clients),
State.
code_change(_OldVsn, State, _Extra) ->
@@ -902,33 +907,25 @@ process_pending(Pending, Quota, State) ->
State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev),
{PendingNew, State1}.
-run_pending_item({Kind, Pid, From}, State = #fhc_state { blocked = Blocked }) ->
+run_pending_item({Kind, Pid, From}, State = #fhc_state { clients = Clients }) ->
gen_server:reply(From, ok),
- update_counts(Kind, Pid, +1, State #fhc_state {
- blocked = sets:del_element(Pid, Blocked) }).
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
+ update_counts(Kind, Pid, +1, State).
update_counts(Kind, Pid, Delta,
- State = #fhc_state { counts = Counts,
- open_count = OpenCount,
+ State = #fhc_state { open_count = OpenCount,
obtain_count = ObtainCount,
- due_no_open = DueNoOpen }) ->
- {Counts1, OpenDelta, ObtainDelta} =
- update_counts1(Kind, Pid, Delta, Counts),
- State #fhc_state {
- counts = Counts1,
- open_count = OpenCount + OpenDelta,
- obtain_count = ObtainCount + ObtainDelta,
- due_no_open = case dict:fetch(Pid, Counts1) of
- {0, _} -> sets:del_element(Pid, DueNoOpen);
- _ -> DueNoOpen
- end }.
-
-update_counts1(open, Pid, Delta, Counts) ->
- {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end,
- Counts), Delta, 0};
-update_counts1(obtain, Pid, Delta, Counts) ->
- {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end,
- Counts), 0, Delta}.
+ clients = Clients }) ->
+ {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients),
+ State #fhc_state { open_count = OpenCount + OpenDelta,
+ obtain_count = ObtainCount + ObtainDelta }.
+
+update_counts1(open, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
+ {Delta, 0};
+update_counts1(obtain, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}),
+ {0, Delta}.
needs_reduce(#fhc_state { limit = Limit,
open_count = OpenCount,
@@ -945,51 +942,77 @@ needs_reduce(#fhc_state { limit = Limit,
reduce(State = #fhc_state { open_pending = OpenPending,
obtain_pending = ObtainPending,
elders = Elders,
- callbacks = Callbacks,
- blocked = Blocked,
- counts = Counts,
- due_no_open = DueNoOpen,
+ clients = Clients,
timer_ref = TRef }) ->
Now = now(),
- {Pids, Sum, ClientCount} =
- dict:fold(fun (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) ->
- case sets:is_element(Pid, Blocked) orelse
- sets:is_element(Pid, DueNoOpen) of
+ {CStates, Sum, ClientCount} =
+ dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
+ [#cstate { pending_closes = PendingCloses,
+ opened = Opened, blocked = Blocked } =
+ CState] = ets:lookup(Clients, Pid),
+ case Blocked orelse PendingCloses =:= Opened of
true -> Accs;
- false -> {[Pid|PidsAcc],
+ false -> {[CState | CStatesAcc],
SumAcc + timer:now_diff(Now, Eldest),
CountAcc + 1}
end
end, {[], 0, 0}, Elders),
- State1 =
- case Pids of
- [] -> State;
- _ -> case (Sum / ClientCount)
- - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
- AverageAge when AverageAge > 0 ->
- lists:foreach(
- fun (Pid) ->
- case dict:find(Pid, Callbacks) of
- error ->
- ok;
- {ok, {M, F, A}} ->
- apply(M, F, A ++ [AverageAge])
- end
- end, Pids),
- State;
- _ ->
- DueNoOpen1 = notify(Pids, Callbacks, Counts,
- DueNoOpen, length(OpenPending) +
- length(ObtainPending)),
- State #fhc_state { due_no_open = DueNoOpen1 }
- end
- end,
+ case CStates of
+ [] -> ok;
+ _ -> case (Sum / ClientCount) -
+ (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ AverageAge when AverageAge > 0 ->
+ notify_age(CStates, AverageAge);
+ _ ->
+ notify_age0(Clients, CStates,
+ length(OpenPending) + length(ObtainPending))
+ end
+ end,
case TRef of
undefined -> {ok, TRef1} = timer:apply_after(
?FILE_HANDLES_CHECK_INTERVAL,
gen_server, cast, [?SERVER, check_counts]),
- State1 #fhc_state { timer_ref = TRef1 };
- _ -> State1
+ State #fhc_state { timer_ref = TRef1 };
+ _ -> State
+ end.
+
+notify_age(CStates, AverageAge) ->
+ lists:foreach(fun (#cstate { callback = undefined }) ->
+ ok;
+ (#cstate { callback = {M, F, A} }) ->
+ apply(M, F, A ++ [AverageAge])
+ end, CStates).
+
+notify_age0(Clients, CStates, Required) ->
+ Notifications =
+ [CState || CState <- CStates, CState#cstate.callback =/= undefined],
+ {L1, L2} = lists:split(random:uniform(length(Notifications)),
+ Notifications),
+ notify(Clients, Required, L2 ++ L1).
+
+notify(_Clients, _Required, []) ->
+ ok;
+notify(_Clients, Required, _Notifications) when Required =< 0 ->
+ ok;
+notify(Clients, Required, [#cstate{ callback = {M, F, A}, opened = Opened,
+ pid = Pid, pending_closes = PendingCloses }
+ | Notifications]) ->
+ Closable = Opened - PendingCloses,
+ apply(M, F, A ++ [0]),
+ ets:update_element(Clients, Pid,
+ {#cstate.pending_closes, PendingCloses + Closable}),
+ notify(Clients, Required - Closable, Notifications).
+
+ensure_mref(Pid, Clients) ->
+ case ets:insert_new(Clients, #cstate { pid = Pid,
+ callback = undefined,
+ opened = 0,
+ obtained = 0,
+ blocked = false,
+ pending_closes = 0 }) of
+ true -> _MRef = erlang:monitor(process, Pid),
+ ok;
+ false -> ok
end.
%% For all unices, assume ulimit exists. Further googling suggests
@@ -1021,36 +1044,3 @@ ulimit() ->
_ ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
-
-notify(Pids, Callbacks, Counts, DueNoOpen, Required) ->
- Notifications = [{Callback, Pid, OpenCount} ||
- Pid <- Pids,
- case dict:find(Pid, Callbacks) of
- error -> Callback = undefined,
- false;
- {ok, CB} -> Callback = CB,
- true
- end,
- begin
- {OpenCount, _} = dict:fetch(Pid, Counts),
- OpenCount > 0
- end],
- {L1, L2} = lists:split(random:uniform(length(Notifications)),
- Notifications),
- notify(Required, DueNoOpen, L2 ++ L1).
-
-notify(_Required, Acc, []) ->
- Acc;
-notify(Required, Acc, _Notifications) when Required =< 0 ->
- Acc;
-notify(Required, Acc, [{{M, F, A}, Pid, Open} | Notifications]) ->
- apply(M, F, A ++ [0]),
- notify(Required - Open, sets:add_element(Pid, Acc), Notifications).
-
-ensure_mref(Pid, State = #fhc_state { counts = Counts }) ->
- case dict:find(Pid, Counts) of
- {ok, _} -> State;
- error -> _MRef = erlang:monitor(process, Pid),
- State #fhc_state {
- counts = dict:store(Pid, {0, 0}, Counts) }
- end.