summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-19 13:40:34 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-19 13:40:34 +0100
commita640121b2c55055c879056806a35ad953b0e1e3e (patch)
tree6196399d8f36528b21241330bf6e48de2e5a5f8a
parent2d72c4d556f63f11eaa91a3d1b068e59636eb5fc (diff)
parent6705d9fdab22b03ae998eea8bfce8134de504298 (diff)
downloadrabbitmq-server-a640121b2c55055c879056806a35ad953b0e1e3e.tar.gz
Merging default into bug 23139 (substantial debitrot)
-rw-r--r--src/file_handle_cache.erl74
1 files changed, 41 insertions, 33 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 74567d09..6c6ed172 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -184,6 +184,7 @@
obtain_pending,
callbacks,
counts,
+ blocked,
timer_ref
}).
@@ -743,12 +744,14 @@ init([]) ->
obtain_pending = [],
callbacks = dict:new(),
counts = dict:new(),
+ blocked = sets:new(),
timer_ref = undefined }}.
handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
- elders = Elders }) ->
+ elders = Elders,
+ blocked = Blocked }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
Item = {open, Pid, From},
case maybe_reduce(ensure_mref(Pid, State #fhc_state {
@@ -760,7 +763,7 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
true -> {reply, close, State2};
false -> {noreply, State2 #fhc_state {
open_pending = [Item | Pending],
- elders = dict:erase(Pid, Elders1) }}
+ blocked = sets:add_element(Pid, Blocked) }}
end;
{false, State1} ->
{noreply, run_pending_item(Item, State1)}
@@ -769,15 +772,15 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
obtain_count = Count,
obtain_pending = Pending,
- elders = Elders })
+ blocked = Blocked })
when Limit =/= infinity andalso Count >= Limit ->
Item = {obtain, Pid, From},
{noreply, ensure_mref(Pid, State #fhc_state {
obtain_pending = [Item | Pending],
- elders = dict:erase(Pid, Elders) })};
+ blocked = sets:add_element(Pid, Blocked) })};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
- elders = Elders }) ->
+ blocked = Blocked }) ->
Item = {obtain, Pid, From},
case maybe_reduce(ensure_mref(Pid, State #fhc_state {
obtain_count = Count + 1 })) of
@@ -785,7 +788,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
{noreply, State1 #fhc_state {
obtain_count = Count,
obtain_pending = [Item | Pending],
- elders = dict:erase(Pid, Elders) }};
+ blocked = sets:add_element(Pid, Blocked) }};
{false, State1} ->
{noreply, run_pending_item(Item, State1)}
end.
@@ -833,19 +836,22 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
obtain_pending = ObtainPending,
callbacks = Callbacks,
counts = Counts,
- elders = Elders }) ->
+ elders = Elders,
+ blocked = Blocked }) ->
FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end,
OpenPending1 = lists:filter(FilterFun, OpenPending),
ObtainPending1 = lists:filter(FilterFun, ObtainPending),
{Opened, Obtained} = dict:fetch(Pid, Counts),
- {noreply, process_pending(State #fhc_state {
- open_count = OpenCount - Opened,
- open_pending = OpenPending1,
- obtain_count = ObtainCount - Obtained,
- obtain_pending = ObtainPending1,
- callbacks = dict:erase(Pid, Callbacks),
- counts = dict:erase(Pid, Counts),
- elders = dict:erase(Pid, Elders) })}.
+ {noreply, process_pending(
+ State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = OpenPending1,
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = ObtainPending1,
+ callbacks = dict:erase(Pid, Callbacks),
+ counts = dict:erase(Pid, Counts),
+ elders = dict:erase(Pid, Elders),
+ blocked = sets:del_element(Pid, Blocked) })}.
terminate(_Reason, State) ->
State.
@@ -886,30 +892,25 @@ process_pending([], _Quota, State) ->
{[], 0, State};
process_pending(Pending, Quota, State) when Quota =< 0 ->
{Pending, 0, State};
-process_pending(Pending, Quota, State = #fhc_state { counts = Counts }) ->
+process_pending(Pending, Quota, State) ->
PendingLen = length(Pending),
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- Counts1 = lists:foldl(fun run_pending_item1/2, Counts, SatisfiableRev),
- {PendingNew, SatisfiableLen, State #fhc_state { counts = Counts1 }}.
+ State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev),
+ {PendingNew, SatisfiableLen, State1}.
-run_pending_item(Item, State = #fhc_state { counts = Counts }) ->
- State #fhc_state { counts = run_pending_item1(Item, Counts) }.
-
-run_pending_item1({Kind, Pid, From}, Counts) ->
+run_pending_item({Kind, Pid, From}, State = #fhc_state { counts = Counts,
+ blocked = Blocked }) ->
gen_server:reply(From, ok),
- update_counts(Kind, Pid, +1, Counts).
+ State #fhc_state { counts = update_counts(Kind, Pid, +1, Counts),
+ blocked = sets:del_element(Pid, Blocked) }.
update_counts(open, Pid, Delta, Counts) ->
- dict:update(Pid, fun ({Opened, Obtained})
- when Opened >= 0 andalso Obtained >= 0 ->
- {Opened + Delta, Obtained} end,
+ dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end,
Counts);
update_counts(obtain, Pid, Delta, Counts) ->
- dict:update(Pid, fun ({Opened, Obtained})
- when Opened >= 0 andalso Obtained >= 0 ->
- {Opened, Obtained + Delta} end,
+ dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end,
Counts).
maybe_reduce(State = #fhc_state { limit = Limit,
@@ -920,6 +921,7 @@ maybe_reduce(State = #fhc_state { limit = Limit,
obtain_pending = ObtainPending,
elders = Elders,
callbacks = Callbacks,
+ blocked = Blocked,
timer_ref = TRef })
when Limit =/= infinity andalso
(((OpenCount + ObtainCount) > Limit) orelse
@@ -929,13 +931,19 @@ maybe_reduce(State = #fhc_state { limit = Limit,
{Pids, Sum, ClientCount} =
dict:fold(fun (_Pid, undefined, Accs) ->
Accs;
- (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) ->
- {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest),
- CountAcc + 1}
+ (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) ->
+ case sets:is_element(Pid, Blocked) of
+ true -> Accs;
+ false -> {[Pid|PidsAcc],
+ SumAcc + timer:now_diff(Now, Eldest),
+ CountAcc + 1}
+ end
end, {[], 0, 0}, Elders),
case Pids of
[] -> ok;
- _ -> AverageAge = Sum / ClientCount,
+ _ -> AverageAge =
+ lists:max([0, ((Sum - (?FILE_HANDLES_CHECK_INTERVAL * 1000))
+ / ClientCount)]),
lists:foreach(
fun (Pid) ->
case dict:find(Pid, Callbacks) of