diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-19 13:40:34 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-19 13:40:34 +0100 |
commit | a640121b2c55055c879056806a35ad953b0e1e3e (patch) | |
tree | 6196399d8f36528b21241330bf6e48de2e5a5f8a | |
parent | 2d72c4d556f63f11eaa91a3d1b068e59636eb5fc (diff) | |
parent | 6705d9fdab22b03ae998eea8bfce8134de504298 (diff) | |
download | rabbitmq-server-a640121b2c55055c879056806a35ad953b0e1e3e.tar.gz |
Merging default into bug 23139 (substantial debitrot)
-rw-r--r-- | src/file_handle_cache.erl | 74 |
1 files changed, 41 insertions, 33 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 74567d09..6c6ed172 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -184,6 +184,7 @@ obtain_pending, callbacks, counts, + blocked, timer_ref }). @@ -743,12 +744,14 @@ init([]) -> obtain_pending = [], callbacks = dict:new(), counts = dict:new(), + blocked = sets:new(), timer_ref = undefined }}. handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = #fhc_state { open_count = Count, open_pending = Pending, - elders = Elders }) -> + elders = Elders, + blocked = Blocked }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), Item = {open, Pid, From}, case maybe_reduce(ensure_mref(Pid, State #fhc_state { @@ -760,7 +763,7 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, true -> {reply, close, State2}; false -> {noreply, State2 #fhc_state { open_pending = [Item | Pending], - elders = dict:erase(Pid, Elders1) }} + blocked = sets:add_element(Pid, Blocked) }} end; {false, State1} -> {noreply, run_pending_item(Item, State1)} @@ -769,15 +772,15 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, obtain_count = Count, obtain_pending = Pending, - elders = Elders }) + blocked = Blocked }) when Limit =/= infinity andalso Count >= Limit -> Item = {obtain, Pid, From}, {noreply, ensure_mref(Pid, State #fhc_state { obtain_pending = [Item | Pending], - elders = dict:erase(Pid, Elders) })}; + blocked = sets:add_element(Pid, Blocked) })}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, - elders = Elders }) -> + blocked = Blocked }) -> Item = {obtain, Pid, From}, case maybe_reduce(ensure_mref(Pid, State #fhc_state { obtain_count = Count + 1 })) of @@ -785,7 +788,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, {noreply, State1 #fhc_state { obtain_count = Count, obtain_pending = [Item | Pending], - elders = dict:erase(Pid, Elders) }}; + blocked = sets:add_element(Pid, Blocked) }}; {false, State1} -> {noreply, run_pending_item(Item, State1)} end. @@ -833,19 +836,22 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = obtain_pending = ObtainPending, callbacks = Callbacks, counts = Counts, - elders = Elders }) -> + elders = Elders, + blocked = Blocked }) -> FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, OpenPending1 = lists:filter(FilterFun, OpenPending), ObtainPending1 = lists:filter(FilterFun, ObtainPending), {Opened, Obtained} = dict:fetch(Pid, Counts), - {noreply, process_pending(State #fhc_state { - open_count = OpenCount - Opened, - open_pending = OpenPending1, - obtain_count = ObtainCount - Obtained, - obtain_pending = ObtainPending1, - callbacks = dict:erase(Pid, Callbacks), - counts = dict:erase(Pid, Counts), - elders = dict:erase(Pid, Elders) })}. + {noreply, process_pending( + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = OpenPending1, + obtain_count = ObtainCount - Obtained, + obtain_pending = ObtainPending1, + callbacks = dict:erase(Pid, Callbacks), + counts = dict:erase(Pid, Counts), + elders = dict:erase(Pid, Elders), + blocked = sets:del_element(Pid, Blocked) })}. terminate(_Reason, State) -> State. @@ -886,30 +892,25 @@ process_pending([], _Quota, State) -> {[], 0, State}; process_pending(Pending, Quota, State) when Quota =< 0 -> {Pending, 0, State}; -process_pending(Pending, Quota, State = #fhc_state { counts = Counts }) -> +process_pending(Pending, Quota, State) -> PendingLen = length(Pending), SatisfiableLen = lists:min([PendingLen, Quota]), Take = PendingLen - SatisfiableLen, {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - Counts1 = lists:foldl(fun run_pending_item1/2, Counts, SatisfiableRev), - {PendingNew, SatisfiableLen, State #fhc_state { counts = Counts1 }}. + State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev), + {PendingNew, SatisfiableLen, State1}. -run_pending_item(Item, State = #fhc_state { counts = Counts }) -> - State #fhc_state { counts = run_pending_item1(Item, Counts) }. - -run_pending_item1({Kind, Pid, From}, Counts) -> +run_pending_item({Kind, Pid, From}, State = #fhc_state { counts = Counts, + blocked = Blocked }) -> gen_server:reply(From, ok), - update_counts(Kind, Pid, +1, Counts). + State #fhc_state { counts = update_counts(Kind, Pid, +1, Counts), + blocked = sets:del_element(Pid, Blocked) }. update_counts(open, Pid, Delta, Counts) -> - dict:update(Pid, fun ({Opened, Obtained}) - when Opened >= 0 andalso Obtained >= 0 -> - {Opened + Delta, Obtained} end, + dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, Counts); update_counts(obtain, Pid, Delta, Counts) -> - dict:update(Pid, fun ({Opened, Obtained}) - when Opened >= 0 andalso Obtained >= 0 -> - {Opened, Obtained + Delta} end, + dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end, Counts). maybe_reduce(State = #fhc_state { limit = Limit, @@ -920,6 +921,7 @@ maybe_reduce(State = #fhc_state { limit = Limit, obtain_pending = ObtainPending, elders = Elders, callbacks = Callbacks, + blocked = Blocked, timer_ref = TRef }) when Limit =/= infinity andalso (((OpenCount + ObtainCount) > Limit) orelse @@ -929,13 +931,19 @@ maybe_reduce(State = #fhc_state { limit = Limit, {Pids, Sum, ClientCount} = dict:fold(fun (_Pid, undefined, Accs) -> Accs; - (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) -> - {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), - CountAcc + 1} + (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) -> + case sets:is_element(Pid, Blocked) of + true -> Accs; + false -> {[Pid|PidsAcc], + SumAcc + timer:now_diff(Now, Eldest), + CountAcc + 1} + end end, {[], 0, 0}, Elders), case Pids of [] -> ok; - _ -> AverageAge = Sum / ClientCount, + _ -> AverageAge = + lists:max([0, ((Sum - (?FILE_HANDLES_CHECK_INTERVAL * 1000)) + / ClientCount)]), lists:foreach( fun (Pid) -> case dict:find(Pid, Callbacks) of |