diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-20 13:02:21 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-20 13:02:21 +0100 |
commit | f08a83d1fc8c463d536aab9e542a98d695db5d87 (patch) | |
tree | 7dbb0ade9caa181efac278f8f241439f4e083671 | |
parent | 7e21cda0d6d58465e6567e80bf905894df74bc3f (diff) | |
download | rabbitmq-server-f08a83d1fc8c463d536aab9e542a98d695db5d87.tar.gz |
If we sent an age of 0 to clients, make sure we do not send more ages of 0 to the same clients until they've actually closed all their handles. This ensures that as more requests come in once we're low on fds, we don't send hundreds of 0 ages to the same clients erroneously. It also means that we always target the correct number of *unique* clients to ask to close their fds, which avoids thrashing the same clients and improves performance markedly.
Also, if on open, we send "close" back to the client, that client *is* blocked (actually, due to have 0 opens) as we know it'll close, send us some closed msgs and then re do the open call. Thus we shouldn't be sending it any set maximum age messages.
-rw-r--r-- | src/file_handle_cache.erl | 97 |
1 files changed, 58 insertions, 39 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 312cd6e9..0ecb2e6d 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -185,6 +185,7 @@ callbacks, counts, blocked, + due_no_open, timer_ref }). @@ -745,13 +746,15 @@ init([]) -> callbacks = dict:new(), counts = dict:new(), blocked = sets:new(), + due_no_open = sets:new(), timer_ref = undefined }}. handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders, - blocked = Blocked }) + blocked = Blocked, + due_no_open = DueNoOpen }) when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), Item = {open, Pid, From}, @@ -759,7 +762,9 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of true -> case CanClose of true -> - {reply, close, State1}; + {reply, close, + State1 #fhc_state { + due_no_open = sets:add_element(Pid, DueNoOpen) }}; false -> {noreply, reduce(State1 #fhc_state { @@ -835,7 +840,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, callbacks = Callbacks, counts = Counts, elders = Elders, - blocked = Blocked }) -> + blocked = Blocked, + due_no_open = DueNoOpen }) -> FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, OpenPending1 = lists:filter(FilterFun, OpenPending), ObtainPending1 = lists:filter(FilterFun, ObtainPending), @@ -849,7 +855,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, callbacks = dict:erase(Pid, Callbacks), counts = dict:erase(Pid, Counts), elders = dict:erase(Pid, Elders), - blocked = sets:del_element(Pid, Blocked) })}. + blocked = sets:del_element(Pid, Blocked), + due_no_open = sets:del_element(Pid, DueNoOpen) })}. terminate(_Reason, State) -> State. @@ -904,13 +911,18 @@ run_pending_item({Kind, Pid, From}, State = #fhc_state { blocked = Blocked }) -> update_counts(Kind, Pid, Delta, State = #fhc_state { counts = Counts, open_count = OpenCount, - obtain_count = ObtainCount }) -> + obtain_count = ObtainCount, + due_no_open = DueNoOpen }) -> {Counts1, OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Counts), - State #fhc_state { counts = Counts1, - open_count = OpenCount + OpenDelta, - obtain_count = ObtainCount + ObtainDelta }. - + State #fhc_state { + counts = Counts1, + open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta, + due_no_open = case dict:fetch(Pid, Counts1) of + {0, _} -> sets:del_element(Pid, DueNoOpen); + _ -> DueNoOpen + end }. update_counts1(open, Pid, Delta, Counts) -> {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, @@ -937,41 +949,48 @@ reduce(State = #fhc_state { open_pending = OpenPending, callbacks = Callbacks, blocked = Blocked, counts = Counts, + due_no_open = DueNoOpen, timer_ref = TRef }) -> Now = now(), {Pids, Sum, ClientCount} = dict:fold(fun (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) -> - case sets:is_element(Pid, Blocked) of + case sets:is_element(Pid, Blocked) orelse + sets:is_element(Pid, DueNoOpen) of true -> Accs; false -> {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), CountAcc + 1} end end, {[], 0, 0}, Elders), - case Pids of - [] -> ok; - _ -> case (Sum / ClientCount) - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of - AverageAge when AverageAge > 0 -> - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> - ok; - {ok, {M, F, A}} -> - apply(M, F, A ++ [AverageAge]) - end - end, Pids); - _ -> - notify(Pids, Callbacks, Counts, - length(OpenPending) + length(ObtainPending)) - end - end, + State1 = + case Pids of + [] -> State; + _ -> case (Sum / ClientCount) + - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + lists:foreach( + fun (Pid) -> + case dict:find(Pid, Callbacks) of + error -> + ok; + {ok, {M, F, A}} -> + apply(M, F, A ++ [AverageAge]) + end + end, Pids), + State; + _ -> + DueNoOpen1 = notify(Pids, Callbacks, Counts, + DueNoOpen, length(OpenPending) + + length(ObtainPending)), + State #fhc_state { due_no_open = DueNoOpen1 } + end + end, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - State #fhc_state { timer_ref = TRef1 }; - _ -> State + State1 #fhc_state { timer_ref = TRef1 }; + _ -> State1 end. %% For all unices, assume ulimit exists. Further googling suggests @@ -1004,8 +1023,8 @@ ulimit() -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. -notify(Pids, Callbacks, Counts, Required) -> - Notifications = [{Callback, OpenCount} || +notify(Pids, Callbacks, Counts, DueNoOpen, Required) -> + Notifications = [{Callback, Pid, OpenCount} || Pid <- Pids, case dict:find(Pid, Callbacks) of error -> Callback = undefined, @@ -1019,15 +1038,15 @@ notify(Pids, Callbacks, Counts, Required) -> end], {L1, L2} = lists:split(random:uniform(length(Notifications)), Notifications), - notify(Required, L2 ++ L1). + notify(Required, DueNoOpen, L2 ++ L1). -notify(_Required, []) -> - ok; -notify(Required, _Notifications) when Required =< 0 -> - ok; -notify(Required, [{{M, F, A}, Open} | Notifications]) -> +notify(_Required, Acc, []) -> + Acc; +notify(Required, Acc, _Notifications) when Required =< 0 -> + Acc; +notify(Required, Acc, [{{M, F, A}, Pid, Open} | Notifications]) -> apply(M, F, A ++ [0]), - notify(Required - Open, Notifications). + notify(Required - Open, sets:add_element(Pid, Acc), Notifications). ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> case dict:find(Pid, Counts) of |