summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-19 16:27:08 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-19 16:27:08 +0100
commit0d30291716324b6a506bde609bf1d679b4d282be (patch)
treeece65901ab0beb1fec56d5a3dfa11729299d28f7
parenta640121b2c55055c879056806a35ad953b0e1e3e (diff)
downloadrabbitmq-server-0d30291716324b6a506bde609bf1d679b4d282be.tar.gz
Reworked substantially
-rw-r--r--src/file_handle_cache.erl205
1 files changed, 117 insertions, 88 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 6c6ed172..c4ce76ee 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -754,19 +754,18 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
blocked = Blocked }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
Item = {open, Pid, From},
- case maybe_reduce(ensure_mref(Pid, State #fhc_state {
- open_count = Count + 1,
- elders = Elders1 })) of
- {true, State1} ->
- State2 = State1 #fhc_state { open_count = Count },
- case CanClose of
- true -> {reply, close, State2};
- false -> {noreply, State2 #fhc_state {
- open_pending = [Item | Pending],
- blocked = sets:add_element(Pid, Blocked) }}
- end;
- {false, State1} ->
- {noreply, run_pending_item(Item, State1)}
+ State1 = ensure_mref(Pid, State #fhc_state { elders = Elders1 }),
+ case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of
+ true -> case CanClose of
+ true ->
+ {reply, close, State1};
+ false ->
+ {noreply,
+ reduce(State1 #fhc_state {
+ open_pending = [Item | Pending],
+ blocked = sets:add_element(Pid, Blocked) })}
+ end;
+ false -> {noreply, run_pending_item(Item, State1)}
end;
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
@@ -778,19 +777,17 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
{noreply, ensure_mref(Pid, State #fhc_state {
obtain_pending = [Item | Pending],
blocked = sets:add_element(Pid, Blocked) })};
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
blocked = Blocked }) ->
Item = {obtain, Pid, From},
- case maybe_reduce(ensure_mref(Pid, State #fhc_state {
- obtain_count = Count + 1 })) of
- {true, State1} ->
- {noreply, State1 #fhc_state {
- obtain_count = Count,
- obtain_pending = [Item | Pending],
- blocked = sets:add_element(Pid, Blocked) }};
- {false, State1} ->
- {noreply, run_pending_item(Item, State1)}
+ State1 = ensure_mref(Pid, State),
+ case needs_reduce(State1 #fhc_state { obtain_count = Count + 1 }) of
+ true -> {noreply,
+ reduce(State1 #fhc_state {
+ obtain_pending = [Item | Pending],
+ blocked = sets:add_element(Pid, Blocked) })};
+ false -> {noreply, run_pending_item(Item, State1)}
end.
handle_cast({register_callback, Pid, MFA},
@@ -807,27 +804,26 @@ handle_cast({update, Pid, EldestUnusedSince}, State =
{noreply, State #fhc_state { elders = Elders1 }};
handle_cast({close, Pid, EldestUnusedSince},
- State = #fhc_state { open_count = Count,
- counts = Counts,
- elders = Elders }) ->
+ State = #fhc_state { elders = Elders }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- Counts1 = update_counts(open, Pid, -1, Counts),
- {noreply, process_pending(State #fhc_state { open_count = Count - 1,
- counts = Counts1,
- elders = Elders1 })};
+ {noreply, process_pending(
+ update_counts(open, Pid, -1,
+ State #fhc_state { elders = Elders1 }))};
handle_cast({transfer, FromPid, ToPid}, State) ->
- State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State),
- Counts1 = update_counts(obtain, FromPid, -1, Counts),
- Counts2 = update_counts(obtain, ToPid, +1, Counts1),
- {noreply, process_pending(State1 #fhc_state { counts = Counts2 })};
+ {noreply, process_pending(
+ update_counts(obtain, ToPid, +1,
+ update_counts(obtain, FromPid, -1,
+ ensure_mref(ToPid, State))))};
handle_cast(check_counts, State) ->
- {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
- {noreply, State1}.
+ {noreply, case needs_reduce(State) of
+ true -> reduce(State #fhc_state { timer_ref = undefined });
+ false -> State
+ end}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
#fhc_state { open_count = OpenCount,
@@ -866,16 +862,15 @@ code_change(_OldVsn, State, _Extra) ->
process_pending(State = #fhc_state { limit = infinity }) ->
State;
process_pending(State) ->
- process_obtain(process_open(State)).
+ process_open(process_obtain(State)).
process_open(State = #fhc_state { limit = Limit,
open_pending = Pending,
open_count = OpenCount,
obtain_count = ObtainCount }) ->
- {Pending1, Inc, State1} =
+ {Pending1, State1} =
process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
- State1 #fhc_state { open_pending = Pending1,
- open_count = OpenCount + Inc }.
+ State1 #fhc_state { open_pending = Pending1 }.
process_obtain(State = #fhc_state { limit = Limit,
obtain_pending = Pending,
@@ -884,49 +879,63 @@ process_obtain(State = #fhc_state { limit = Limit,
open_count = OpenCount }) ->
Quota = lists:min([ObtainLimit - ObtainCount,
Limit - (ObtainCount + OpenCount)]),
- {Pending1, Inc, State1} = process_pending(Pending, Quota, State),
- State1 #fhc_state { obtain_pending = Pending1,
- obtain_count = ObtainCount + Inc }.
+ {Pending1, State1} = process_pending(Pending, Quota, State),
+ State1 #fhc_state { obtain_pending = Pending1 }.
process_pending([], _Quota, State) ->
- {[], 0, State};
+ {[], State};
process_pending(Pending, Quota, State) when Quota =< 0 ->
- {Pending, 0, State};
+ {Pending, State};
process_pending(Pending, Quota, State) ->
PendingLen = length(Pending),
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev),
- {PendingNew, SatisfiableLen, State1}.
+ {PendingNew, State1}.
-run_pending_item({Kind, Pid, From}, State = #fhc_state { counts = Counts,
- blocked = Blocked }) ->
+run_pending_item({Kind, Pid, From}, State = #fhc_state { blocked = Blocked }) ->
gen_server:reply(From, ok),
- State #fhc_state { counts = update_counts(Kind, Pid, +1, Counts),
- blocked = sets:del_element(Pid, Blocked) }.
-
-update_counts(open, Pid, Delta, Counts) ->
- dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end,
- Counts);
-update_counts(obtain, Pid, Delta, Counts) ->
- dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end,
- Counts).
-
-maybe_reduce(State = #fhc_state { limit = Limit,
- open_count = OpenCount,
- open_pending = OpenPending,
- obtain_count = ObtainCount,
- obtain_limit = ObtainLimit,
- obtain_pending = ObtainPending,
- elders = Elders,
- callbacks = Callbacks,
- blocked = Blocked,
- timer_ref = TRef })
- when Limit =/= infinity andalso
- (((OpenCount + ObtainCount) > Limit) orelse
- (OpenPending =/= []) orelse
- (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) ->
+ update_counts(Kind, Pid, +1, State #fhc_state {
+ blocked = sets:del_element(Pid, Blocked) }).
+
+update_counts(Kind, Pid, Delta,
+ State = #fhc_state { counts = Counts,
+ open_count = OpenCount,
+ obtain_count = ObtainCount }) ->
+ {Counts1, OpenDelta, ObtainDelta} =
+ update_counts1(Kind, Pid, Delta, Counts),
+ State #fhc_state { counts = Counts1,
+ open_count = OpenCount + OpenDelta,
+ obtain_count = ObtainCount + ObtainDelta }.
+
+
+update_counts1(open, Pid, Delta, Counts) ->
+ {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end,
+ Counts), Delta, 0};
+update_counts1(obtain, Pid, Delta, Counts) ->
+ {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end,
+ Counts), 0, Delta}.
+
+needs_reduce(#fhc_state { limit = Limit,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_limit = ObtainLimit,
+ obtain_pending = ObtainPending }) ->
+ Limit =/= infinity
+ andalso ((OpenCount + ObtainCount > Limit)
+ orelse (OpenPending =/= [])
+ orelse (ObtainCount < ObtainLimit
+ andalso ObtainPending =/= [])).
+
+reduce(State = #fhc_state { open_pending = OpenPending,
+ obtain_pending = ObtainPending,
+ elders = Elders,
+ callbacks = Callbacks,
+ blocked = Blocked,
+ counts = Counts,
+ timer_ref = TRef }) ->
Now = now(),
{Pids, Sum, ClientCount} =
dict:fold(fun (_Pid, undefined, Accs) ->
@@ -941,27 +950,38 @@ maybe_reduce(State = #fhc_state { limit = Limit,
end, {[], 0, 0}, Elders),
case Pids of
[] -> ok;
- _ -> AverageAge =
- lists:max([0, ((Sum - (?FILE_HANDLES_CHECK_INTERVAL * 1000))
- / ClientCount)]),
- lists:foreach(
- fun (Pid) ->
- case dict:find(Pid, Callbacks) of
- error -> ok;
- {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge])
- end
- end, Pids)
+ _ -> case (Sum / ClientCount) - (2000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ AverageAge when AverageAge > 0 ->
+ lists:foreach(
+ fun (Pid) ->
+ case dict:find(Pid, Callbacks) of
+ error ->
+ ok;
+ {ok, {M, F, A}} ->
+ apply(M, F, A ++ [AverageAge])
+ end
+ end, Pids);
+ _ ->
+ Required = length(OpenPending) + length(ObtainPending),
+ PidsCounts = [{Pid, OpCount}
+ || Pid <- Pids,
+ dict:is_key(Pid, Callbacks),
+ begin
+ {OpCount, _} = dict:fetch(Pid, Counts),
+ OpCount > 0
+ end],
+ {L1, L2} = lists:split(random:uniform(length(PidsCounts)),
+ PidsCounts),
+ close(Callbacks, Required, L2 ++ L1)
+ end
end,
- AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit,
case TRef of
undefined -> {ok, TRef1} = timer:apply_after(
?FILE_HANDLES_CHECK_INTERVAL,
gen_server, cast, [?SERVER, check_counts]),
- {AboveLimit, State #fhc_state { timer_ref = TRef1 }};
- _ -> {AboveLimit, State}
- end;
-maybe_reduce(State) ->
- {false, State}.
+ State #fhc_state { timer_ref = TRef1 };
+ _ -> State
+ end.
%% For all unices, assume ulimit exists. Further googling suggests
%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n
@@ -993,6 +1013,15 @@ ulimit() ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
+close(_Callbacks, _Required, []) ->
+ ok;
+close(_Callbacks, Required, _List) when Required =< 0 ->
+ ok;
+close(Callbacks, Required, [{Pid, Sum} | List]) ->
+ {M, F, A} = dict:fetch(Pid, Callbacks),
+ apply(M, F, A ++ [0]),
+ close(Callbacks, Required - Sum, List).
+
ensure_mref(Pid, State = #fhc_state { counts = Counts }) ->
case dict:find(Pid, Counts) of
{ok, _} -> State;