summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-21 20:01:04 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-21 20:01:04 +0100
commitb8b7b09d009e97583ee23c567dad633cbdc4e243 (patch)
tree073a36ebec1a1e68ebdcc9ebb0b322d2eeb2f311
parent21f5d5745c7266ae85b83c4880b729605e6be6f4 (diff)
downloadrabbitmq-server-b8b7b09d009e97583ee23c567dad633cbdc4e243.tar.gz
Modify server logic to deal with requests for multiple fds. Including nice pending_queues abstraction. Client logic remains to be done.
-rw-r--r--src/file_handle_cache.erl107
1 files changed, 76 insertions, 31 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 6f8ab547..a7ef2d66 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -545,8 +545,7 @@ age_tree_insert(Now, Ref) ->
Tree = get_age_tree(),
Tree1 = gb_trees:insert(Now, Ref, Tree),
{Oldest, _Ref} = gb_trees:smallest(Tree1),
- case gen_server:call(?SERVER, {open, self(), 1,
- gb_trees:size(Tree), Oldest}, infinity) of
+ case gen_server:call(?SERVER, {open, self(), 1, Oldest}, infinity) of
ok ->
put_age_tree(Tree1);
close ->
@@ -747,14 +746,14 @@ init([]) ->
{ok, #fhc_state { elders = dict:new(),
limit = Limit,
open_count = 0,
- open_pending = [],
+ open_pending = pending_new(),
obtain_limit = ObtainLimit,
obtain_count = 0,
- obtain_pending = [],
+ obtain_pending = pending_new(),
clients = Clients,
timer_ref = undefined }}.
-handle_call({open, Pid, Requested, Closable, EldestUnusedSince}, From,
+handle_call({open, Pid, Requested, EldestUnusedSince}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
elders = Elders,
@@ -765,17 +764,17 @@ handle_call({open, Pid, Requested, Closable, EldestUnusedSince}, From,
ok = track_client(Pid, Clients),
State1 = State #fhc_state { elders = Elders1 },
case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of
- true -> case Closable of
- 0 ->
+ true -> case ets:lookup(Clients, Pid) of
+ [#cstate { opened = 0 }] ->
true = ets:update_element(
Clients, Pid, {#cstate.blocked, true}),
{noreply,
reduce(State1 #fhc_state {
- open_pending = [Item | Pending] })};
- _ ->
- true = ets:update_element(
- Clients, Pid,
- {#cstate.pending_closes, Closable}),
+ open_pending = pending_in(Item, Pending) })};
+ [#cstate { opened = Opened }] ->
+ true =
+ ets:update_element(
+ Clients, Pid, {#cstate.pending_closes, Opened}),
{reply, close, State1}
end;
false -> {noreply, run_pending_item(Item, State1)}
@@ -789,7 +788,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
ok = track_client(Pid, Clients),
true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
Item = {obtain, Pid, 1, From},
- {noreply, State #fhc_state { obtain_pending = [Item | Pending] }};
+ {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
clients = Clients }) ->
@@ -798,8 +797,8 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
true ->
true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- {noreply,
- reduce(State #fhc_state {obtain_pending = [Item | Pending] })};
+ {noreply, reduce(State #fhc_state {
+ obtain_pending = pending_in(Item, Pending) })};
false ->
{noreply, run_pending_item(Item, State)}
end.
@@ -849,18 +848,16 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
obtain_count = ObtainCount,
obtain_pending = ObtainPending,
clients = Clients }) ->
- FilterFun = fun ({_Kind, Pid1, _Requested, _From}) -> Pid1 =/= Pid end,
- OpenPending1 = lists:filter(FilterFun, OpenPending),
- ObtainPending1 = lists:filter(FilterFun, ObtainPending),
[#cstate { opened = Opened, obtained = Obtained }] =
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
+ FilterFun = fun ({_Kind, Pid1, _Requested, _From}) -> Pid1 =/= Pid end,
{noreply, process_pending(
State #fhc_state {
open_count = OpenCount - Opened,
- open_pending = OpenPending1,
+ open_pending = filter_pending(FilterFun, OpenPending),
obtain_count = ObtainCount - Obtained,
- obtain_pending = ObtainPending1,
+ obtain_pending = filter_pending(FilterFun, ObtainPending),
elders = dict:erase(Pid, Elders) })}.
terminate(_Reason, State = #fhc_state { clients = Clients }) ->
@@ -871,9 +868,52 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
+%% pending queue abstraction helpers
+%%----------------------------------------------------------------------------
+
+queue_fold(Fun, Init, Q) ->
+ case queue:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.
+
+filter_pending(Fun, {Count, Queue}) ->
+ {Delta, Queue1} =
+ queue_fold(fun (Item, {DeltaN, QueueN}) ->
+ case Fun(Item) of
+ true -> {DeltaN, queue:in(Item, QueueN)};
+ false -> {DeltaN - requested(Item), QueueN}
+ end
+ end, {0, queue:new()}, Queue),
+ {Count + Delta, Queue1}.
+
+pending_new() ->
+ {0, queue:new()}.
+
+pending_in(Item, {Count, Queue}) ->
+ {Count + requested(Item), queue:in(Item, Queue)}.
+
+pending_out({0, _Queue} = Pending) ->
+ {empty, Pending};
+pending_out({N, Queue}) ->
+ {{value, Item} = Result, Queue1} = queue:out(Queue),
+ {Result, {N - requested(Item), Queue1}}.
+
+pending_count({Count, _Queue}) ->
+ Count.
+
+pending_is_empty({0, _Queue}) ->
+ true;
+pending_is_empty({_N, _Queue}) ->
+ false.
+
+%%----------------------------------------------------------------------------
%% server helpers
%%----------------------------------------------------------------------------
+requested({_Kind, _Pid, Requested, _From}) ->
+ Requested.
+
process_pending(State = #fhc_state { limit = infinity }) ->
State;
process_pending(State) ->
@@ -897,17 +937,21 @@ process_obtain(State = #fhc_state { limit = Limit,
{Pending1, State1} = process_pending(Pending, Quota, State),
State1 #fhc_state { obtain_pending = Pending1 }.
-process_pending([], _Quota, State) ->
- {[], State};
process_pending(Pending, Quota, State) when Quota =< 0 ->
{Pending, State};
process_pending(Pending, Quota, State) ->
- PendingLen = length(Pending),
- SatisfiableLen = lists:min([PendingLen, Quota]),
- Take = PendingLen - SatisfiableLen,
- {PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev),
- {PendingNew, State1}.
+ case pending_out(Pending) of
+ {empty, _Pending} ->
+ {Pending, State};
+ {{value, Item}, Pending1} ->
+ case requested(Item) of
+ Requested when Requested > Quota ->
+ {Pending, State};
+ Requested ->
+ process_pending(Pending1, Quota - Requested,
+ run_pending_item(Item, State))
+ end
+ end.
run_pending_item({Kind, Pid, Requested, From},
State = #fhc_state { clients = Clients }) ->
@@ -938,9 +982,9 @@ needs_reduce(#fhc_state { limit = Limit,
obtain_pending = ObtainPending }) ->
Limit =/= infinity
andalso ((OpenCount + ObtainCount > Limit)
- orelse (OpenPending =/= [])
+ orelse (not pending_is_empty(OpenPending))
orelse (ObtainCount < ObtainLimit
- andalso ObtainPending =/= [])).
+ andalso not pending_is_empty(ObtainPending))).
reduce(State = #fhc_state { open_pending = OpenPending,
obtain_pending = ObtainPending,
@@ -969,7 +1013,8 @@ reduce(State = #fhc_state { open_pending = OpenPending,
notify_age(CStates, AverageAge);
_ ->
notify_age0(Clients, CStates,
- length(OpenPending) + length(ObtainPending))
+ pending_count(OpenPending) +
+ pending_count(ObtainPending))
end
end,
case TRef of