diff options
authorMatthew Sackman <>2010-08-17 17:24:27 +0100
committerMatthew Sackman <>2010-08-17 17:24:27 +0100
commit1d89aa2cb8176510f51947daf55a126505a98763 (patch)
parentb40e8ecf8a3f04cff949dcdb0c273e1e84cb70cb (diff)
Correct monitoring and actions upon DOWN messages. Note this is especially subtle for obtains, which effectively implicitly allocates temporarily to the blocked caller (FromPid) whilst monitoring it, and then transfers this to the ForPid when possible. Note the ForPid can die before the obtains is processed, which which point the FromPid must be replied to immediately.
1 files changed, 105 insertions, 65 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index fe4bdc03..0ee3a709 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -183,7 +183,7 @@
- client_mrefs,
+ counts,
@@ -445,7 +445,7 @@ set_maximum_since_use(MaximumAge) ->
obtain(Pid) ->
- gen_server:call(?SERVER, {obtain, Pid}, infinity).
+ gen_server:call(?SERVER, {obtain, self(), Pid}, infinity).
%% Internal functions
@@ -738,29 +738,36 @@ init([]) ->
obtain_count = 0,
obtain_pending = [],
callbacks = dict:new(),
- client_mrefs = dict:new(),
+ counts = dict:new(),
timer_ref = undefined }}.
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders })
+handle_call({obtain, FromPid, ForPid}, From,
+ State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders })
when Limit =/= infinity andalso Count >= Limit ->
- {noreply,
- State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders) }};
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders }) ->
- case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of
+ MRef = erlang:monitor(process, FromPid),
+ Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
+ {noreply, ensure_mref(ForPid, State #fhc_state {
+ obtain_pending = Pending1,
+ elders = dict:erase(FromPid, Elders) })};
+handle_call({obtain, FromPid, ForPid}, From,
+ State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders }) ->
+ MRef = erlang:monitor(process, FromPid),
+ case maybe_reduce(ensure_mref(ForPid, State #fhc_state {
+ obtain_count = Count + 1 })) of
{true, State1} ->
+ Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
{noreply, State1 #fhc_state {
obtain_count = Count,
- obtain_pending = [{obtain, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders) }};
+ obtain_pending = Pending1,
+ elders = dict:erase(FromPid, Elders) }};
{false, State1} ->
- _MRef = erlang:monitor(process, Pid),
- {reply, ok, State1}
+ {noreply,
+ run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)}
handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
@@ -775,12 +782,13 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
State2 = State1 #fhc_state { open_count = Count },
case CanClose of
true -> {reply, close, State2};
- false -> {noreply, State2 #fhc_state {
- open_pending = [{open, From} | Pending],
- elders = dict:erase(Pid, Elders1) }}
+ false -> {noreply,
+ State2 #fhc_state {
+ open_pending = [{open, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders1) }}
{false, State1} ->
- {reply, ok, State1}
+ {noreply, run_pending_item({open, Pid, From}, State1)}
handle_cast({register_callback, Pid, MFA},
@@ -794,33 +802,59 @@ handle_cast({update, Pid, EldestUnusedSince}, State =
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
+ {noreply, State #fhc_state { elders = Elders1 }};
handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, open_count = Count }) ->
+ #fhc_state { elders = Elders, counts = Counts,
+ open_count = Count }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
- {noreply, process_pending(
- ensure_mref(Pid, State #fhc_state { open_count = Count - 1,
- elders = Elders1 }))};
+ {Obtained, Opened} = dict:fetch(Pid, Counts),
+ {noreply,
+ process_pending(State #fhc_state {
+ open_count = Count - 1,
+ counts = dict:store(Pid, {Obtained, Opened - 1}, Counts),
+ elders = Elders1 })};
handle_cast(check_counts, State) ->
{_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
{noreply, State1}.
-handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
- #fhc_state { obtain_count = Count, callbacks = Callbacks,
- client_mrefs = ClientMRefs, elders = Elders }) ->
- {noreply, process_pending(
- case dict:find(Pid, ClientMRefs) of
- {ok, MRef} -> State #fhc_state {
- elders = dict:erase(Pid, Elders),
- client_mrefs = dict:erase(Pid, ClientMRefs),
- callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { obtain_count = Count - 1 }
- end)}.
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
+ #fhc_state { obtain_count = ObtainCount,
+ obtain_pending = ObtainPending,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ callbacks = Callbacks,
+ counts = Counts,
+ elders = Elders }) ->
+ ObtainPending1 =
+ lists:filter(
+ fun ({obtain, FromPid, FromMRef, From, ForPid}) ->
+ case Pid =:= ForPid of
+ true -> gen_server:reply(From, ok),
+ true = erlang:demonitor(FromMRef, [flush]),
+ false;
+ false -> Pid =/= FromPid
+ end
+ end, ObtainPending),
+ OpenPending1 = lists:filter(fun ({open, Pid1, _From}) ->
+ Pid =/= Pid1
+ end, OpenPending),
+ {Obtained, Opened} = case dict:find(Pid, Counts) of
+ {ok, Val} -> Val;
+ error -> {0, 0}
+ end,
+ {noreply, process_pending(State #fhc_state {
+ elders = dict:erase(Pid, Elders),
+ counts = dict:erase(Pid, Counts),
+ callbacks = dict:erase(Pid, Callbacks),
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = ObtainPending1,
+ open_count = OpenCount - Opened,
+ open_pending = OpenPending1 })}.
terminate(_Reason, State) ->
@@ -841,10 +875,10 @@ process_open(State = #fhc_state { limit = Limit,
open_pending = Pending,
open_count = OpenCount,
obtain_count = ObtainCount }) ->
- {Pending1, Inc} =
- process_pending(Pending, Limit - (ObtainCount + OpenCount)),
- State #fhc_state { open_pending = Pending1,
- open_count = OpenCount + Inc }.
+ {Pending1, Inc, State1} =
+ process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
+ State1 #fhc_state { open_pending = Pending1,
+ open_count = OpenCount + Inc }.
process_obtain(State = #fhc_state { limit = Limit,
obtain_pending = Pending,
@@ -853,27 +887,34 @@ process_obtain(State = #fhc_state { limit = Limit,
open_count = OpenCount }) ->
Quota = lists:min([ObtainLimit - ObtainCount,
Limit - (ObtainCount + OpenCount)]),
- {Pending1, Inc} = process_pending(Pending, Quota),
- State #fhc_state { obtain_pending = Pending1,
- obtain_count = ObtainCount + Inc }.
-process_pending([], _Quota) ->
- {[], 0};
-process_pending(Pending, Quota) when Quota =< 0 ->
- {Pending, 0};
-process_pending(Pending, Quota) ->
+ {Pending1, Inc, State1} = process_pending(Pending, Quota, State),
+ State1 #fhc_state { obtain_pending = Pending1,
+ obtain_count = ObtainCount + Inc }.
+process_pending([], _Quota, State) ->
+ {[], 0, State};
+process_pending(Pending, Quota, State) when Quota =< 0 ->
+ {Pending, 0, State};
+process_pending(Pending, Quota, State) ->
PendingLen = length(Pending),
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- [run_pending_item(Item) || Item <- SatisfiableRev],
- {PendingNew, SatisfiableLen}.
-run_pending_item({open, From}) ->
- gen_server:reply(From, ok);
-run_pending_item({obtain, Pid, From}) ->
- _MRef = erlang:monitor(process, Pid),
- gen_server:reply(From, ok).
+ {PendingNew, SatisfiableLen,
+ lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}.
+run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts }) ->
+ gen_server:reply(From, ok),
+ {Obtained, Opened} = dict:fetch(Pid, Counts),
+ State #fhc_state {
+ counts = dict:store(Pid, {Obtained, Opened + 1}, Counts) };
+run_pending_item({obtain, _FromPid, FromMRef, From, ForPid},
+ State = #fhc_state { counts = Counts }) ->
+ gen_server:reply(From, ok),
+ true = erlang:demonitor(FromMRef, [flush]),
+ {Obtained, Opened} = dict:fetch(ForPid, Counts),
+ State #fhc_state {
+ counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts) }.
maybe_reduce(State = #fhc_state { limit = Limit,
open_count = OpenCount,
@@ -948,10 +989,9 @@ ulimit() ->
-ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
- case dict:find(Pid, ClientMRefs) of
- {ok, _MRef} -> State;
- error -> MRef = erlang:monitor(process, Pid),
- State #fhc_state {
- client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
+ensure_mref(Pid, State = #fhc_state { counts = Counts }) ->
+ case dict:find(Pid, Counts) of
+ {ok, _} -> State;
+ error -> _MRef = erlang:monitor(process, Pid),
+ State #fhc_state { counts = dict:store(Pid, {0, 0}, Counts) }