summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-17 17:24:27 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-17 17:24:27 +0100
commit1d89aa2cb8176510f51947daf55a126505a98763 (patch)
tree3f708414362a5e93ed6ea54d05a5da375e3daed5
parentb40e8ecf8a3f04cff949dcdb0c273e1e84cb70cb (diff)
downloadrabbitmq-server-1d89aa2cb8176510f51947daf55a126505a98763.tar.gz
Correct monitoring and actions upon DOWN messages. Note this is especially subtle for obtains, which effectively implicitly allocates temporarily to the blocked caller (FromPid) whilst monitoring it, and then transfers this to the ForPid when possible. Note the ForPid can die before the obtains is processed, which which point the FromPid must be replied to immediately.
-rw-r--r--src/file_handle_cache.erl170
1 files changed, 105 insertions, 65 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index fe4bdc03..0ee3a709 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -183,7 +183,7 @@
obtain_count,
obtain_pending,
callbacks,
- client_mrefs,
+ counts,
timer_ref
}).
@@ -445,7 +445,7 @@ set_maximum_since_use(MaximumAge) ->
end.
obtain(Pid) ->
- gen_server:call(?SERVER, {obtain, Pid}, infinity).
+ gen_server:call(?SERVER, {obtain, self(), Pid}, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -738,29 +738,36 @@ init([]) ->
obtain_count = 0,
obtain_pending = [],
callbacks = dict:new(),
- client_mrefs = dict:new(),
+ counts = dict:new(),
timer_ref = undefined }}.
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders })
+handle_call({obtain, FromPid, ForPid}, From,
+ State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders })
when Limit =/= infinity andalso Count >= Limit ->
- {noreply,
- State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders) }};
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders }) ->
- case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of
+ MRef = erlang:monitor(process, FromPid),
+ Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
+ {noreply, ensure_mref(ForPid, State #fhc_state {
+ obtain_pending = Pending1,
+ elders = dict:erase(FromPid, Elders) })};
+handle_call({obtain, FromPid, ForPid}, From,
+ State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders }) ->
+ MRef = erlang:monitor(process, FromPid),
+ case maybe_reduce(ensure_mref(ForPid, State #fhc_state {
+ obtain_count = Count + 1 })) of
{true, State1} ->
+ Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
{noreply, State1 #fhc_state {
obtain_count = Count,
- obtain_pending = [{obtain, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders) }};
+ obtain_pending = Pending1,
+ elders = dict:erase(FromPid, Elders) }};
{false, State1} ->
- _MRef = erlang:monitor(process, Pid),
- {reply, ok, State1}
+ {noreply,
+ run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)}
end;
handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
@@ -775,12 +782,13 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
State2 = State1 #fhc_state { open_count = Count },
case CanClose of
true -> {reply, close, State2};
- false -> {noreply, State2 #fhc_state {
- open_pending = [{open, From} | Pending],
- elders = dict:erase(Pid, Elders1) }}
+ false -> {noreply,
+ State2 #fhc_state {
+ open_pending = [{open, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders1) }}
end;
{false, State1} ->
- {reply, ok, State1}
+ {noreply, run_pending_item({open, Pid, From}, State1)}
end.
handle_cast({register_callback, Pid, MFA},
@@ -794,33 +802,59 @@ handle_cast({update, Pid, EldestUnusedSince}, State =
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
+ {noreply, State #fhc_state { elders = Elders1 }};
handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, open_count = Count }) ->
+ #fhc_state { elders = Elders, counts = Counts,
+ open_count = Count }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {noreply, process_pending(
- ensure_mref(Pid, State #fhc_state { open_count = Count - 1,
- elders = Elders1 }))};
+ {Obtained, Opened} = dict:fetch(Pid, Counts),
+ {noreply,
+ process_pending(State #fhc_state {
+ open_count = Count - 1,
+ counts = dict:store(Pid, {Obtained, Opened - 1}, Counts),
+ elders = Elders1 })};
handle_cast(check_counts, State) ->
{_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
{noreply, State1}.
-handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
- #fhc_state { obtain_count = Count, callbacks = Callbacks,
- client_mrefs = ClientMRefs, elders = Elders }) ->
- {noreply, process_pending(
- case dict:find(Pid, ClientMRefs) of
- {ok, MRef} -> State #fhc_state {
- elders = dict:erase(Pid, Elders),
- client_mrefs = dict:erase(Pid, ClientMRefs),
- callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { obtain_count = Count - 1 }
- end)}.
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
+ #fhc_state { obtain_count = ObtainCount,
+ obtain_pending = ObtainPending,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ callbacks = Callbacks,
+ counts = Counts,
+ elders = Elders }) ->
+ ObtainPending1 =
+ lists:filter(
+ fun ({obtain, FromPid, FromMRef, From, ForPid}) ->
+ case Pid =:= ForPid of
+ true -> gen_server:reply(From, ok),
+ true = erlang:demonitor(FromMRef, [flush]),
+ false;
+ false -> Pid =/= FromPid
+ end
+ end, ObtainPending),
+ OpenPending1 = lists:filter(fun ({open, Pid1, _From}) ->
+ Pid =/= Pid1
+ end, OpenPending),
+ {Obtained, Opened} = case dict:find(Pid, Counts) of
+ {ok, Val} -> Val;
+ error -> {0, 0}
+ end,
+ {noreply, process_pending(State #fhc_state {
+ elders = dict:erase(Pid, Elders),
+ counts = dict:erase(Pid, Counts),
+ callbacks = dict:erase(Pid, Callbacks),
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = ObtainPending1,
+ open_count = OpenCount - Opened,
+ open_pending = OpenPending1 })}.
terminate(_Reason, State) ->
State.
@@ -841,10 +875,10 @@ process_open(State = #fhc_state { limit = Limit,
open_pending = Pending,
open_count = OpenCount,
obtain_count = ObtainCount }) ->
- {Pending1, Inc} =
- process_pending(Pending, Limit - (ObtainCount + OpenCount)),
- State #fhc_state { open_pending = Pending1,
- open_count = OpenCount + Inc }.
+ {Pending1, Inc, State1} =
+ process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
+ State1 #fhc_state { open_pending = Pending1,
+ open_count = OpenCount + Inc }.
process_obtain(State = #fhc_state { limit = Limit,
obtain_pending = Pending,
@@ -853,27 +887,34 @@ process_obtain(State = #fhc_state { limit = Limit,
open_count = OpenCount }) ->
Quota = lists:min([ObtainLimit - ObtainCount,
Limit - (ObtainCount + OpenCount)]),
- {Pending1, Inc} = process_pending(Pending, Quota),
- State #fhc_state { obtain_pending = Pending1,
- obtain_count = ObtainCount + Inc }.
-
-process_pending([], _Quota) ->
- {[], 0};
-process_pending(Pending, Quota) when Quota =< 0 ->
- {Pending, 0};
-process_pending(Pending, Quota) ->
+ {Pending1, Inc, State1} = process_pending(Pending, Quota, State),
+ State1 #fhc_state { obtain_pending = Pending1,
+ obtain_count = ObtainCount + Inc }.
+
+process_pending([], _Quota, State) ->
+ {[], 0, State};
+process_pending(Pending, Quota, State) when Quota =< 0 ->
+ {Pending, 0, State};
+process_pending(Pending, Quota, State) ->
PendingLen = length(Pending),
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- [run_pending_item(Item) || Item <- SatisfiableRev],
- {PendingNew, SatisfiableLen}.
-
-run_pending_item({open, From}) ->
- gen_server:reply(From, ok);
-run_pending_item({obtain, Pid, From}) ->
- _MRef = erlang:monitor(process, Pid),
- gen_server:reply(From, ok).
+ {PendingNew, SatisfiableLen,
+ lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}.
+
+run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts }) ->
+ gen_server:reply(From, ok),
+ {Obtained, Opened} = dict:fetch(Pid, Counts),
+ State #fhc_state {
+ counts = dict:store(Pid, {Obtained, Opened + 1}, Counts) };
+run_pending_item({obtain, _FromPid, FromMRef, From, ForPid},
+ State = #fhc_state { counts = Counts }) ->
+ gen_server:reply(From, ok),
+ true = erlang:demonitor(FromMRef, [flush]),
+ {Obtained, Opened} = dict:fetch(ForPid, Counts),
+ State #fhc_state {
+ counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts) }.
maybe_reduce(State = #fhc_state { limit = Limit,
open_count = OpenCount,
@@ -948,10 +989,9 @@ ulimit() ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
-ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
- case dict:find(Pid, ClientMRefs) of
- {ok, _MRef} -> State;
- error -> MRef = erlang:monitor(process, Pid),
- State #fhc_state {
- client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
+ensure_mref(Pid, State = #fhc_state { counts = Counts }) ->
+ case dict:find(Pid, Counts) of
+ {ok, _} -> State;
+ error -> _MRef = erlang:monitor(process, Pid),
+ State #fhc_state { counts = dict:store(Pid, {0, 0}, Counts) }
end.