diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-17 17:42:23 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-17 17:42:23 +0100 |
commit | 1df32045348e5c17a88d36d63553539f11ba5b5b (patch) | |
tree | 9f2672de102386cad49ca755d5657a14adba6cc3 | |
parent | 1d89aa2cb8176510f51947daf55a126505a98763 (diff) | |
download | rabbitmq-server-1df32045348e5c17a88d36d63553539f11ba5b5b.tar.gz |
Track blocked pids explicitly
-rw-r--r-- | src/file_handle_cache.erl | 69 |
1 files changed, 42 insertions, 27 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 0ee3a709..e61e9e25 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -184,6 +184,7 @@ obtain_pending, callbacks, counts, + blocked, timer_ref }). @@ -739,32 +740,34 @@ init([]) -> obtain_pending = [], callbacks = dict:new(), counts = dict:new(), + blocked = sets:new(), timer_ref = undefined }}. handle_call({obtain, FromPid, ForPid}, From, State = #fhc_state { obtain_limit = Limit, obtain_count = Count, obtain_pending = Pending, - elders = Elders }) + blocked = Blocked }) when Limit =/= infinity andalso Count >= Limit -> MRef = erlang:monitor(process, FromPid), Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending], - {noreply, ensure_mref(ForPid, State #fhc_state { - obtain_pending = Pending1, - elders = dict:erase(FromPid, Elders) })}; + {noreply, ensure_mref(ForPid, + State #fhc_state { + blocked = sets:add_element(FromPid, Blocked), + obtain_pending = Pending1 })}; handle_call({obtain, FromPid, ForPid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, - elders = Elders }) -> + blocked = Blocked }) -> MRef = erlang:monitor(process, FromPid), case maybe_reduce(ensure_mref(ForPid, State #fhc_state { obtain_count = Count + 1 })) of {true, State1} -> Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending], {noreply, State1 #fhc_state { + blocked = sets:add_element(FromPid, Blocked), obtain_count = Count, - obtain_pending = Pending1, - elders = dict:erase(FromPid, Elders) }}; + obtain_pending = Pending1 }}; {false, State1} -> {noreply, run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)} @@ -773,7 +776,8 @@ handle_call({obtain, FromPid, ForPid}, From, handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = #fhc_state { open_count = Count, open_pending = Pending, - elders = Elders }) -> + elders = Elders, + blocked = Blocked }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), case maybe_reduce( ensure_mref(Pid, State #fhc_state { open_count = Count + 1, @@ -784,8 +788,8 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, true -> {reply, close, State2}; false -> {noreply, State2 #fhc_state { - open_pending = [{open, Pid, From} | Pending], - elders = dict:erase(Pid, Elders1) }} + blocked = sets:add_element(Pid, Blocked), + open_pending = [{open, Pid, From} | Pending] }} end; {false, State1} -> {noreply, run_pending_item({open, Pid, From}, State1)} @@ -829,7 +833,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = open_pending = OpenPending, callbacks = Callbacks, counts = Counts, - elders = Elders }) -> + elders = Elders, + blocked = Blocked }) -> ObtainPending1 = lists:filter( fun ({obtain, FromPid, FromMRef, From, ForPid}) -> @@ -847,14 +852,16 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = {ok, Val} -> Val; error -> {0, 0} end, - {noreply, process_pending(State #fhc_state { - elders = dict:erase(Pid, Elders), - counts = dict:erase(Pid, Counts), - callbacks = dict:erase(Pid, Callbacks), - obtain_count = ObtainCount - Obtained, - obtain_pending = ObtainPending1, - open_count = OpenCount - Opened, - open_pending = OpenPending1 })}. + {noreply, process_pending( + State #fhc_state { + elders = dict:erase(Pid, Elders), + counts = dict:erase(Pid, Counts), + callbacks = dict:erase(Pid, Callbacks), + obtain_count = ObtainCount - Obtained, + obtain_pending = ObtainPending1, + open_count = OpenCount - Opened, + open_pending = OpenPending1, + blocked = sets:del_element(Pid, Blocked) })}. terminate(_Reason, State) -> State. @@ -903,18 +910,21 @@ process_pending(Pending, Quota, State) -> {PendingNew, SatisfiableLen, lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}. -run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts }) -> +run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts, + blocked = Blocked }) -> gen_server:reply(From, ok), {Obtained, Opened} = dict:fetch(Pid, Counts), State #fhc_state { - counts = dict:store(Pid, {Obtained, Opened + 1}, Counts) }; -run_pending_item({obtain, _FromPid, FromMRef, From, ForPid}, - State = #fhc_state { counts = Counts }) -> + counts = dict:store(Pid, {Obtained, Opened + 1}, Counts), + blocked = sets:del_element(Pid, Blocked) }; +run_pending_item({obtain, FromPid, FromMRef, From, ForPid}, + State = #fhc_state { counts = Counts, blocked = Blocked }) -> gen_server:reply(From, ok), true = erlang:demonitor(FromMRef, [flush]), {Obtained, Opened} = dict:fetch(ForPid, Counts), State #fhc_state { - counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts) }. + counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts), + blocked = sets:del_element(FromPid, Blocked) }. maybe_reduce(State = #fhc_state { limit = Limit, open_count = OpenCount, @@ -924,6 +934,7 @@ maybe_reduce(State = #fhc_state { limit = Limit, obtain_pending = ObtainPending, elders = Elders, callbacks = Callbacks, + blocked = Blocked, timer_ref = TRef }) when Limit =/= infinity andalso (((OpenCount + ObtainCount) > Limit) orelse @@ -933,9 +944,13 @@ maybe_reduce(State = #fhc_state { limit = Limit, {Pids, Sum, ClientCount} = dict:fold(fun (_Pid, undefined, Accs) -> Accs; - (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) -> - {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), - CountAcc + 1} + (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) -> + case sets:is_element(Pid, Blocked) of + true -> Accs; + false -> {[Pid|PidsAcc], + SumAcc + timer:now_diff(Now, Eldest), + CountAcc + 1} + end end, {[], 0, 0}, Elders), case Pids of [] -> ok; |