summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-20 18:53:39 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-20 18:53:39 +0100
commit21f5d5745c7266ae85b83c4880b729605e6be6f4 (patch)
treed2c6a34c8063499cdc8dbb9875f14556e97a3413
parent1dde9bfdec985cf693f3162fa77d94480a68ec0c (diff)
downloadrabbitmq-server-21f5d5745c7266ae85b83c4880b729605e6be6f4.tar.gz
Add infrastructure to server to be able to handle requests for multiple fds
-rw-r--r--src/file_handle_cache.erl29
1 files changed, 16 insertions, 13 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 9d308a90..6f8ab547 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -545,7 +545,8 @@ age_tree_insert(Now, Ref) ->
Tree = get_age_tree(),
Tree1 = gb_trees:insert(Now, Ref, Tree),
{Oldest, _Ref} = gb_trees:smallest(Tree1),
- case gen_server:call(?SERVER, {open, self(), Oldest}, infinity) of
+ case gen_server:call(?SERVER, {open, self(), 1,
+ gb_trees:size(Tree), Oldest}, infinity) of
ok ->
put_age_tree(Tree1);
close ->
@@ -753,27 +754,28 @@ init([]) ->
clients = Clients,
timer_ref = undefined }}.
-handle_call({open, Pid, EldestUnusedSince}, From,
+handle_call({open, Pid, Requested, Closable, EldestUnusedSince}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
elders = Elders,
clients = Clients })
when EldestUnusedSince =/= undefined ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- Item = {open, Pid, From},
+ Item = {open, Pid, Requested, From},
ok = track_client(Pid, Clients),
State1 = State #fhc_state { elders = Elders1 },
- case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of
- true -> case ets:lookup(Clients, Pid) of
- [#cstate { opened = 0 }] ->
+ case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of
+ true -> case Closable of
+ 0 ->
true = ets:update_element(
Clients, Pid, {#cstate.blocked, true}),
{noreply,
reduce(State1 #fhc_state {
open_pending = [Item | Pending] })};
- [#cstate { opened = N }] ->
+ _ ->
true = ets:update_element(
- Clients, Pid, {#cstate.pending_closes, N}),
+ Clients, Pid,
+ {#cstate.pending_closes, Closable}),
{reply, close, State1}
end;
false -> {noreply, run_pending_item(Item, State1)}
@@ -786,12 +788,12 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
when Limit =/= infinity andalso Count >= Limit ->
ok = track_client(Pid, Clients),
true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- Item = {obtain, Pid, From},
+ Item = {obtain, Pid, 1, From},
{noreply, State #fhc_state { obtain_pending = [Item | Pending] }};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
clients = Clients }) ->
- Item = {obtain, Pid, From},
+ Item = {obtain, Pid, 1, From},
ok = track_client(Pid, Clients),
case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
true ->
@@ -847,7 +849,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
obtain_count = ObtainCount,
obtain_pending = ObtainPending,
clients = Clients }) ->
- FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end,
+ FilterFun = fun ({_Kind, Pid1, _Requested, _From}) -> Pid1 =/= Pid end,
OpenPending1 = lists:filter(FilterFun, OpenPending),
ObtainPending1 = lists:filter(FilterFun, ObtainPending),
[#cstate { opened = Opened, obtained = Obtained }] =
@@ -907,10 +909,11 @@ process_pending(Pending, Quota, State) ->
State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev),
{PendingNew, State1}.
-run_pending_item({Kind, Pid, From}, State = #fhc_state { clients = Clients }) ->
+run_pending_item({Kind, Pid, Requested, From},
+ State = #fhc_state { clients = Clients }) ->
gen_server:reply(From, ok),
true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
- update_counts(Kind, Pid, +1, State).
+ update_counts(Kind, Pid, Requested, State).
update_counts(Kind, Pid, Delta,
State = #fhc_state { open_count = OpenCount,