summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-20 13:02:21 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-20 13:02:21 +0100
commitf08a83d1fc8c463d536aab9e542a98d695db5d87 (patch)
tree7dbb0ade9caa181efac278f8f241439f4e083671
parent7e21cda0d6d58465e6567e80bf905894df74bc3f (diff)
downloadrabbitmq-server-f08a83d1fc8c463d536aab9e542a98d695db5d87.tar.gz
If we sent an age of 0 to clients, make sure we do not send more ages of 0 to the same clients until they've actually closed all their handles. This ensures that as more requests come in once we're low on fds, we don't send hundreds of 0 ages to the same clients erroneously. It also means that we always target the correct number of *unique* clients to ask to close their fds, which avoids thrashing the same clients and improves performance markedly.
Also, if on open, we send "close" back to the client, that client *is* blocked (actually, due to have 0 opens) as we know it'll close, send us some closed msgs and then re do the open call. Thus we shouldn't be sending it any set maximum age messages.
-rw-r--r--src/file_handle_cache.erl97
1 files changed, 58 insertions, 39 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 312cd6e9..0ecb2e6d 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -185,6 +185,7 @@
callbacks,
counts,
blocked,
+ due_no_open,
timer_ref
}).
@@ -745,13 +746,15 @@ init([]) ->
callbacks = dict:new(),
counts = dict:new(),
blocked = sets:new(),
+ due_no_open = sets:new(),
timer_ref = undefined }}.
handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
elders = Elders,
- blocked = Blocked })
+ blocked = Blocked,
+ due_no_open = DueNoOpen })
when EldestUnusedSince =/= undefined ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
Item = {open, Pid, From},
@@ -759,7 +762,9 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of
true -> case CanClose of
true ->
- {reply, close, State1};
+ {reply, close,
+ State1 #fhc_state {
+ due_no_open = sets:add_element(Pid, DueNoOpen) }};
false ->
{noreply,
reduce(State1 #fhc_state {
@@ -835,7 +840,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
callbacks = Callbacks,
counts = Counts,
elders = Elders,
- blocked = Blocked }) ->
+ blocked = Blocked,
+ due_no_open = DueNoOpen }) ->
FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end,
OpenPending1 = lists:filter(FilterFun, OpenPending),
ObtainPending1 = lists:filter(FilterFun, ObtainPending),
@@ -849,7 +855,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
callbacks = dict:erase(Pid, Callbacks),
counts = dict:erase(Pid, Counts),
elders = dict:erase(Pid, Elders),
- blocked = sets:del_element(Pid, Blocked) })}.
+ blocked = sets:del_element(Pid, Blocked),
+ due_no_open = sets:del_element(Pid, DueNoOpen) })}.
terminate(_Reason, State) ->
State.
@@ -904,13 +911,18 @@ run_pending_item({Kind, Pid, From}, State = #fhc_state { blocked = Blocked }) ->
update_counts(Kind, Pid, Delta,
State = #fhc_state { counts = Counts,
open_count = OpenCount,
- obtain_count = ObtainCount }) ->
+ obtain_count = ObtainCount,
+ due_no_open = DueNoOpen }) ->
{Counts1, OpenDelta, ObtainDelta} =
update_counts1(Kind, Pid, Delta, Counts),
- State #fhc_state { counts = Counts1,
- open_count = OpenCount + OpenDelta,
- obtain_count = ObtainCount + ObtainDelta }.
-
+ State #fhc_state {
+ counts = Counts1,
+ open_count = OpenCount + OpenDelta,
+ obtain_count = ObtainCount + ObtainDelta,
+ due_no_open = case dict:fetch(Pid, Counts1) of
+ {0, _} -> sets:del_element(Pid, DueNoOpen);
+ _ -> DueNoOpen
+ end }.
update_counts1(open, Pid, Delta, Counts) ->
{dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end,
@@ -937,41 +949,48 @@ reduce(State = #fhc_state { open_pending = OpenPending,
callbacks = Callbacks,
blocked = Blocked,
counts = Counts,
+ due_no_open = DueNoOpen,
timer_ref = TRef }) ->
Now = now(),
{Pids, Sum, ClientCount} =
dict:fold(fun (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) ->
- case sets:is_element(Pid, Blocked) of
+ case sets:is_element(Pid, Blocked) orelse
+ sets:is_element(Pid, DueNoOpen) of
true -> Accs;
false -> {[Pid|PidsAcc],
SumAcc + timer:now_diff(Now, Eldest),
CountAcc + 1}
end
end, {[], 0, 0}, Elders),
- case Pids of
- [] -> ok;
- _ -> case (Sum / ClientCount) - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
- AverageAge when AverageAge > 0 ->
- lists:foreach(
- fun (Pid) ->
- case dict:find(Pid, Callbacks) of
- error ->
- ok;
- {ok, {M, F, A}} ->
- apply(M, F, A ++ [AverageAge])
- end
- end, Pids);
- _ ->
- notify(Pids, Callbacks, Counts,
- length(OpenPending) + length(ObtainPending))
- end
- end,
+ State1 =
+ case Pids of
+ [] -> State;
+ _ -> case (Sum / ClientCount)
+ - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ AverageAge when AverageAge > 0 ->
+ lists:foreach(
+ fun (Pid) ->
+ case dict:find(Pid, Callbacks) of
+ error ->
+ ok;
+ {ok, {M, F, A}} ->
+ apply(M, F, A ++ [AverageAge])
+ end
+ end, Pids),
+ State;
+ _ ->
+ DueNoOpen1 = notify(Pids, Callbacks, Counts,
+ DueNoOpen, length(OpenPending) +
+ length(ObtainPending)),
+ State #fhc_state { due_no_open = DueNoOpen1 }
+ end
+ end,
case TRef of
undefined -> {ok, TRef1} = timer:apply_after(
?FILE_HANDLES_CHECK_INTERVAL,
gen_server, cast, [?SERVER, check_counts]),
- State #fhc_state { timer_ref = TRef1 };
- _ -> State
+ State1 #fhc_state { timer_ref = TRef1 };
+ _ -> State1
end.
%% For all unices, assume ulimit exists. Further googling suggests
@@ -1004,8 +1023,8 @@ ulimit() ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
-notify(Pids, Callbacks, Counts, Required) ->
- Notifications = [{Callback, OpenCount} ||
+notify(Pids, Callbacks, Counts, DueNoOpen, Required) ->
+ Notifications = [{Callback, Pid, OpenCount} ||
Pid <- Pids,
case dict:find(Pid, Callbacks) of
error -> Callback = undefined,
@@ -1019,15 +1038,15 @@ notify(Pids, Callbacks, Counts, Required) ->
end],
{L1, L2} = lists:split(random:uniform(length(Notifications)),
Notifications),
- notify(Required, L2 ++ L1).
+ notify(Required, DueNoOpen, L2 ++ L1).
-notify(_Required, []) ->
- ok;
-notify(Required, _Notifications) when Required =< 0 ->
- ok;
-notify(Required, [{{M, F, A}, Open} | Notifications]) ->
+notify(_Required, Acc, []) ->
+ Acc;
+notify(Required, Acc, _Notifications) when Required =< 0 ->
+ Acc;
+notify(Required, Acc, [{{M, F, A}, Pid, Open} | Notifications]) ->
apply(M, F, A ++ [0]),
- notify(Required - Open, Notifications).
+ notify(Required - Open, sets:add_element(Pid, Acc), Notifications).
ensure_mref(Pid, State = #fhc_state { counts = Counts }) ->
case dict:find(Pid, Counts) of