diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-17 17:24:27 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-17 17:24:27 +0100 |
commit | 1d89aa2cb8176510f51947daf55a126505a98763 (patch) | |
tree | 3f708414362a5e93ed6ea54d05a5da375e3daed5 | |
parent | b40e8ecf8a3f04cff949dcdb0c273e1e84cb70cb (diff) | |
download | rabbitmq-server-1d89aa2cb8176510f51947daf55a126505a98763.tar.gz |
Correct monitoring and actions upon DOWN messages. Note this is especially subtle for obtains, which effectively implicitly allocates temporarily to the blocked caller (FromPid) whilst monitoring it, and then transfers this to the ForPid when possible. Note the ForPid can die before the obtains is processed, which which point the FromPid must be replied to immediately.
-rw-r--r-- | src/file_handle_cache.erl | 170 |
1 files changed, 105 insertions, 65 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index fe4bdc03..0ee3a709 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -183,7 +183,7 @@ obtain_count, obtain_pending, callbacks, - client_mrefs, + counts, timer_ref }). @@ -445,7 +445,7 @@ set_maximum_since_use(MaximumAge) -> end. obtain(Pid) -> - gen_server:call(?SERVER, {obtain, Pid}, infinity). + gen_server:call(?SERVER, {obtain, self(), Pid}, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -738,29 +738,36 @@ init([]) -> obtain_count = 0, obtain_pending = [], callbacks = dict:new(), - client_mrefs = dict:new(), + counts = dict:new(), timer_ref = undefined }}. -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, - obtain_count = Count, - obtain_pending = Pending, - elders = Elders }) +handle_call({obtain, FromPid, ForPid}, From, + State = #fhc_state { obtain_limit = Limit, + obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) when Limit =/= infinity andalso Count >= Limit -> - {noreply, - State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending], - elders = dict:erase(Pid, Elders) }}; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, - obtain_pending = Pending, - elders = Elders }) -> - case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of + MRef = erlang:monitor(process, FromPid), + Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending], + {noreply, ensure_mref(ForPid, State #fhc_state { + obtain_pending = Pending1, + elders = dict:erase(FromPid, Elders) })}; +handle_call({obtain, FromPid, ForPid}, From, + State = #fhc_state { obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) -> + MRef = erlang:monitor(process, FromPid), + case maybe_reduce(ensure_mref(ForPid, State #fhc_state { + obtain_count = Count + 1 })) of {true, State1} -> + Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending], {noreply, State1 #fhc_state { obtain_count = Count, - obtain_pending = [{obtain, Pid, From} | Pending], - elders = dict:erase(Pid, Elders) }}; + obtain_pending = Pending1, + elders = dict:erase(FromPid, Elders) }}; {false, State1} -> - _MRef = erlang:monitor(process, Pid), - {reply, ok, State1} + {noreply, + run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)} end; handle_call({open, Pid, EldestUnusedSince, CanClose}, From, @@ -775,12 +782,13 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State2 = State1 #fhc_state { open_count = Count }, case CanClose of true -> {reply, close, State2}; - false -> {noreply, State2 #fhc_state { - open_pending = [{open, From} | Pending], - elders = dict:erase(Pid, Elders1) }} + false -> {noreply, + State2 #fhc_state { + open_pending = [{open, Pid, From} | Pending], + elders = dict:erase(Pid, Elders1) }} end; {false, State1} -> - {reply, ok, State1} + {noreply, run_pending_item({open, Pid, From}, State1)} end. handle_cast({register_callback, Pid, MFA}, @@ -794,33 +802,59 @@ handle_cast({update, Pid, EldestUnusedSince}, State = Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages - {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; + {noreply, State #fhc_state { elders = Elders1 }}; handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, open_count = Count }) -> + #fhc_state { elders = Elders, counts = Counts, + open_count = Count }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {noreply, process_pending( - ensure_mref(Pid, State #fhc_state { open_count = Count - 1, - elders = Elders1 }))}; + {Obtained, Opened} = dict:fetch(Pid, Counts), + {noreply, + process_pending(State #fhc_state { + open_count = Count - 1, + counts = dict:store(Pid, {Obtained, Opened - 1}, Counts), + elders = Elders1 })}; handle_cast(check_counts, State) -> {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), {noreply, State1}. -handle_info({'DOWN', MRef, process, Pid, _Reason}, State = - #fhc_state { obtain_count = Count, callbacks = Callbacks, - client_mrefs = ClientMRefs, elders = Elders }) -> - {noreply, process_pending( - case dict:find(Pid, ClientMRefs) of - {ok, MRef} -> State #fhc_state { - elders = dict:erase(Pid, Elders), - client_mrefs = dict:erase(Pid, ClientMRefs), - callbacks = dict:erase(Pid, Callbacks) }; - _ -> State #fhc_state { obtain_count = Count - 1 } - end)}. +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = + #fhc_state { obtain_count = ObtainCount, + obtain_pending = ObtainPending, + open_count = OpenCount, + open_pending = OpenPending, + callbacks = Callbacks, + counts = Counts, + elders = Elders }) -> + ObtainPending1 = + lists:filter( + fun ({obtain, FromPid, FromMRef, From, ForPid}) -> + case Pid =:= ForPid of + true -> gen_server:reply(From, ok), + true = erlang:demonitor(FromMRef, [flush]), + false; + false -> Pid =/= FromPid + end + end, ObtainPending), + OpenPending1 = lists:filter(fun ({open, Pid1, _From}) -> + Pid =/= Pid1 + end, OpenPending), + {Obtained, Opened} = case dict:find(Pid, Counts) of + {ok, Val} -> Val; + error -> {0, 0} + end, + {noreply, process_pending(State #fhc_state { + elders = dict:erase(Pid, Elders), + counts = dict:erase(Pid, Counts), + callbacks = dict:erase(Pid, Callbacks), + obtain_count = ObtainCount - Obtained, + obtain_pending = ObtainPending1, + open_count = OpenCount - Opened, + open_pending = OpenPending1 })}. terminate(_Reason, State) -> State. @@ -841,10 +875,10 @@ process_open(State = #fhc_state { limit = Limit, open_pending = Pending, open_count = OpenCount, obtain_count = ObtainCount }) -> - {Pending1, Inc} = - process_pending(Pending, Limit - (ObtainCount + OpenCount)), - State #fhc_state { open_pending = Pending1, - open_count = OpenCount + Inc }. + {Pending1, Inc, State1} = + process_pending(Pending, Limit - (ObtainCount + OpenCount), State), + State1 #fhc_state { open_pending = Pending1, + open_count = OpenCount + Inc }. process_obtain(State = #fhc_state { limit = Limit, obtain_pending = Pending, @@ -853,27 +887,34 @@ process_obtain(State = #fhc_state { limit = Limit, open_count = OpenCount }) -> Quota = lists:min([ObtainLimit - ObtainCount, Limit - (ObtainCount + OpenCount)]), - {Pending1, Inc} = process_pending(Pending, Quota), - State #fhc_state { obtain_pending = Pending1, - obtain_count = ObtainCount + Inc }. - -process_pending([], _Quota) -> - {[], 0}; -process_pending(Pending, Quota) when Quota =< 0 -> - {Pending, 0}; -process_pending(Pending, Quota) -> + {Pending1, Inc, State1} = process_pending(Pending, Quota, State), + State1 #fhc_state { obtain_pending = Pending1, + obtain_count = ObtainCount + Inc }. + +process_pending([], _Quota, State) -> + {[], 0, State}; +process_pending(Pending, Quota, State) when Quota =< 0 -> + {Pending, 0, State}; +process_pending(Pending, Quota, State) -> PendingLen = length(Pending), SatisfiableLen = lists:min([PendingLen, Quota]), Take = PendingLen - SatisfiableLen, {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - [run_pending_item(Item) || Item <- SatisfiableRev], - {PendingNew, SatisfiableLen}. - -run_pending_item({open, From}) -> - gen_server:reply(From, ok); -run_pending_item({obtain, Pid, From}) -> - _MRef = erlang:monitor(process, Pid), - gen_server:reply(From, ok). + {PendingNew, SatisfiableLen, + lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}. + +run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts }) -> + gen_server:reply(From, ok), + {Obtained, Opened} = dict:fetch(Pid, Counts), + State #fhc_state { + counts = dict:store(Pid, {Obtained, Opened + 1}, Counts) }; +run_pending_item({obtain, _FromPid, FromMRef, From, ForPid}, + State = #fhc_state { counts = Counts }) -> + gen_server:reply(From, ok), + true = erlang:demonitor(FromMRef, [flush]), + {Obtained, Opened} = dict:fetch(ForPid, Counts), + State #fhc_state { + counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts) }. maybe_reduce(State = #fhc_state { limit = Limit, open_count = OpenCount, @@ -948,10 +989,9 @@ ulimit() -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. -ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) -> - case dict:find(Pid, ClientMRefs) of - {ok, _MRef} -> State; - error -> MRef = erlang:monitor(process, Pid), - State #fhc_state { - client_mrefs = dict:store(Pid, MRef, ClientMRefs) } +ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> + case dict:find(Pid, Counts) of + {ok, _} -> State; + error -> _MRef = erlang:monitor(process, Pid), + State #fhc_state { counts = dict:store(Pid, {0, 0}, Counts) } end. |