summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-17 17:42:23 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-17 17:42:23 +0100
commit1df32045348e5c17a88d36d63553539f11ba5b5b (patch)
tree9f2672de102386cad49ca755d5657a14adba6cc3
parent1d89aa2cb8176510f51947daf55a126505a98763 (diff)
downloadrabbitmq-server-1df32045348e5c17a88d36d63553539f11ba5b5b.tar.gz
Track blocked pids explicitly
-rw-r--r--src/file_handle_cache.erl69
1 files changed, 42 insertions, 27 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 0ee3a709..e61e9e25 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -184,6 +184,7 @@
obtain_pending,
callbacks,
counts,
+ blocked,
timer_ref
}).
@@ -739,32 +740,34 @@ init([]) ->
obtain_pending = [],
callbacks = dict:new(),
counts = dict:new(),
+ blocked = sets:new(),
timer_ref = undefined }}.
handle_call({obtain, FromPid, ForPid}, From,
State = #fhc_state { obtain_limit = Limit,
obtain_count = Count,
obtain_pending = Pending,
- elders = Elders })
+ blocked = Blocked })
when Limit =/= infinity andalso Count >= Limit ->
MRef = erlang:monitor(process, FromPid),
Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
- {noreply, ensure_mref(ForPid, State #fhc_state {
- obtain_pending = Pending1,
- elders = dict:erase(FromPid, Elders) })};
+ {noreply, ensure_mref(ForPid,
+ State #fhc_state {
+ blocked = sets:add_element(FromPid, Blocked),
+ obtain_pending = Pending1 })};
handle_call({obtain, FromPid, ForPid}, From,
State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
- elders = Elders }) ->
+ blocked = Blocked }) ->
MRef = erlang:monitor(process, FromPid),
case maybe_reduce(ensure_mref(ForPid, State #fhc_state {
obtain_count = Count + 1 })) of
{true, State1} ->
Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
{noreply, State1 #fhc_state {
+ blocked = sets:add_element(FromPid, Blocked),
obtain_count = Count,
- obtain_pending = Pending1,
- elders = dict:erase(FromPid, Elders) }};
+ obtain_pending = Pending1 }};
{false, State1} ->
{noreply,
run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)}
@@ -773,7 +776,8 @@ handle_call({obtain, FromPid, ForPid}, From,
handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
- elders = Elders }) ->
+ elders = Elders,
+ blocked = Blocked }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
case maybe_reduce(
ensure_mref(Pid, State #fhc_state { open_count = Count + 1,
@@ -784,8 +788,8 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
true -> {reply, close, State2};
false -> {noreply,
State2 #fhc_state {
- open_pending = [{open, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders1) }}
+ blocked = sets:add_element(Pid, Blocked),
+ open_pending = [{open, Pid, From} | Pending] }}
end;
{false, State1} ->
{noreply, run_pending_item({open, Pid, From}, State1)}
@@ -829,7 +833,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
open_pending = OpenPending,
callbacks = Callbacks,
counts = Counts,
- elders = Elders }) ->
+ elders = Elders,
+ blocked = Blocked }) ->
ObtainPending1 =
lists:filter(
fun ({obtain, FromPid, FromMRef, From, ForPid}) ->
@@ -847,14 +852,16 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
{ok, Val} -> Val;
error -> {0, 0}
end,
- {noreply, process_pending(State #fhc_state {
- elders = dict:erase(Pid, Elders),
- counts = dict:erase(Pid, Counts),
- callbacks = dict:erase(Pid, Callbacks),
- obtain_count = ObtainCount - Obtained,
- obtain_pending = ObtainPending1,
- open_count = OpenCount - Opened,
- open_pending = OpenPending1 })}.
+ {noreply, process_pending(
+ State #fhc_state {
+ elders = dict:erase(Pid, Elders),
+ counts = dict:erase(Pid, Counts),
+ callbacks = dict:erase(Pid, Callbacks),
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = ObtainPending1,
+ open_count = OpenCount - Opened,
+ open_pending = OpenPending1,
+ blocked = sets:del_element(Pid, Blocked) })}.
terminate(_Reason, State) ->
State.
@@ -903,18 +910,21 @@ process_pending(Pending, Quota, State) ->
{PendingNew, SatisfiableLen,
lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}.
-run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts }) ->
+run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts,
+ blocked = Blocked }) ->
gen_server:reply(From, ok),
{Obtained, Opened} = dict:fetch(Pid, Counts),
State #fhc_state {
- counts = dict:store(Pid, {Obtained, Opened + 1}, Counts) };
-run_pending_item({obtain, _FromPid, FromMRef, From, ForPid},
- State = #fhc_state { counts = Counts }) ->
+ counts = dict:store(Pid, {Obtained, Opened + 1}, Counts),
+ blocked = sets:del_element(Pid, Blocked) };
+run_pending_item({obtain, FromPid, FromMRef, From, ForPid},
+ State = #fhc_state { counts = Counts, blocked = Blocked }) ->
gen_server:reply(From, ok),
true = erlang:demonitor(FromMRef, [flush]),
{Obtained, Opened} = dict:fetch(ForPid, Counts),
State #fhc_state {
- counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts) }.
+ counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts),
+ blocked = sets:del_element(FromPid, Blocked) }.
maybe_reduce(State = #fhc_state { limit = Limit,
open_count = OpenCount,
@@ -924,6 +934,7 @@ maybe_reduce(State = #fhc_state { limit = Limit,
obtain_pending = ObtainPending,
elders = Elders,
callbacks = Callbacks,
+ blocked = Blocked,
timer_ref = TRef })
when Limit =/= infinity andalso
(((OpenCount + ObtainCount) > Limit) orelse
@@ -933,9 +944,13 @@ maybe_reduce(State = #fhc_state { limit = Limit,
{Pids, Sum, ClientCount} =
dict:fold(fun (_Pid, undefined, Accs) ->
Accs;
- (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) ->
- {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest),
- CountAcc + 1}
+ (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) ->
+ case sets:is_element(Pid, Blocked) of
+ true -> Accs;
+ false -> {[Pid|PidsAcc],
+ SumAcc + timer:now_diff(Now, Eldest),
+ CountAcc + 1}
+ end
end, {[], 0, 0}, Elders),
case Pids of
[] -> ok;