diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-21 20:01:04 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-21 20:01:04 +0100 |
commit | b8b7b09d009e97583ee23c567dad633cbdc4e243 (patch) | |
tree | 073a36ebec1a1e68ebdcc9ebb0b322d2eeb2f311 | |
parent | 21f5d5745c7266ae85b83c4880b729605e6be6f4 (diff) | |
download | rabbitmq-server-b8b7b09d009e97583ee23c567dad633cbdc4e243.tar.gz |
Modify server logic to deal with requests for multiple fds. Including nice pending_queues abstraction. Client logic remains to be done.
-rw-r--r-- | src/file_handle_cache.erl | 107 |
1 files changed, 76 insertions, 31 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 6f8ab547..a7ef2d66 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -545,8 +545,7 @@ age_tree_insert(Now, Ref) -> Tree = get_age_tree(), Tree1 = gb_trees:insert(Now, Ref, Tree), {Oldest, _Ref} = gb_trees:smallest(Tree1), - case gen_server:call(?SERVER, {open, self(), 1, - gb_trees:size(Tree), Oldest}, infinity) of + case gen_server:call(?SERVER, {open, self(), 1, Oldest}, infinity) of ok -> put_age_tree(Tree1); close -> @@ -747,14 +746,14 @@ init([]) -> {ok, #fhc_state { elders = dict:new(), limit = Limit, open_count = 0, - open_pending = [], + open_pending = pending_new(), obtain_limit = ObtainLimit, obtain_count = 0, - obtain_pending = [], + obtain_pending = pending_new(), clients = Clients, timer_ref = undefined }}. -handle_call({open, Pid, Requested, Closable, EldestUnusedSince}, From, +handle_call({open, Pid, Requested, EldestUnusedSince}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders, @@ -765,17 +764,17 @@ handle_call({open, Pid, Requested, Closable, EldestUnusedSince}, From, ok = track_client(Pid, Clients), State1 = State #fhc_state { elders = Elders1 }, case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of - true -> case Closable of - 0 -> + true -> case ets:lookup(Clients, Pid) of + [#cstate { opened = 0 }] -> true = ets:update_element( Clients, Pid, {#cstate.blocked, true}), {noreply, reduce(State1 #fhc_state { - open_pending = [Item | Pending] })}; - _ -> - true = ets:update_element( - Clients, Pid, - {#cstate.pending_closes, Closable}), + open_pending = pending_in(Item, Pending) })}; + [#cstate { opened = Opened }] -> + true = + ets:update_element( + Clients, Pid, {#cstate.pending_closes, Opened}), {reply, close, State1} end; false -> {noreply, run_pending_item(Item, State1)} @@ -789,7 +788,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, ok = track_client(Pid, Clients), true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), Item = {obtain, Pid, 1, From}, - {noreply, State #fhc_state { obtain_pending = [Item | Pending] }}; + {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, clients = Clients }) -> @@ -798,8 +797,8 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of true -> true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - {noreply, - reduce(State #fhc_state {obtain_pending = [Item | Pending] })}; + {noreply, reduce(State #fhc_state { + obtain_pending = pending_in(Item, Pending) })}; false -> {noreply, run_pending_item(Item, State)} end. @@ -849,18 +848,16 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, obtain_count = ObtainCount, obtain_pending = ObtainPending, clients = Clients }) -> - FilterFun = fun ({_Kind, Pid1, _Requested, _From}) -> Pid1 =/= Pid end, - OpenPending1 = lists:filter(FilterFun, OpenPending), - ObtainPending1 = lists:filter(FilterFun, ObtainPending), [#cstate { opened = Opened, obtained = Obtained }] = ets:lookup(Clients, Pid), true = ets:delete(Clients, Pid), + FilterFun = fun ({_Kind, Pid1, _Requested, _From}) -> Pid1 =/= Pid end, {noreply, process_pending( State #fhc_state { open_count = OpenCount - Opened, - open_pending = OpenPending1, + open_pending = filter_pending(FilterFun, OpenPending), obtain_count = ObtainCount - Obtained, - obtain_pending = ObtainPending1, + obtain_pending = filter_pending(FilterFun, ObtainPending), elders = dict:erase(Pid, Elders) })}. terminate(_Reason, State = #fhc_state { clients = Clients }) -> @@ -871,9 +868,52 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- +%% pending queue abstraction helpers +%%---------------------------------------------------------------------------- + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. + +filter_pending(Fun, {Count, Queue}) -> + {Delta, Queue1} = + queue_fold(fun (Item, {DeltaN, QueueN}) -> + case Fun(Item) of + true -> {DeltaN, queue:in(Item, QueueN)}; + false -> {DeltaN - requested(Item), QueueN} + end + end, {0, queue:new()}, Queue), + {Count + Delta, Queue1}. + +pending_new() -> + {0, queue:new()}. + +pending_in(Item, {Count, Queue}) -> + {Count + requested(Item), queue:in(Item, Queue)}. + +pending_out({0, _Queue} = Pending) -> + {empty, Pending}; +pending_out({N, Queue}) -> + {{value, Item} = Result, Queue1} = queue:out(Queue), + {Result, {N - requested(Item), Queue1}}. + +pending_count({Count, _Queue}) -> + Count. + +pending_is_empty({0, _Queue}) -> + true; +pending_is_empty({_N, _Queue}) -> + false. + +%%---------------------------------------------------------------------------- %% server helpers %%---------------------------------------------------------------------------- +requested({_Kind, _Pid, Requested, _From}) -> + Requested. + process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> @@ -897,17 +937,21 @@ process_obtain(State = #fhc_state { limit = Limit, {Pending1, State1} = process_pending(Pending, Quota, State), State1 #fhc_state { obtain_pending = Pending1 }. -process_pending([], _Quota, State) -> - {[], State}; process_pending(Pending, Quota, State) when Quota =< 0 -> {Pending, State}; process_pending(Pending, Quota, State) -> - PendingLen = length(Pending), - SatisfiableLen = lists:min([PendingLen, Quota]), - Take = PendingLen - SatisfiableLen, - {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev), - {PendingNew, State1}. + case pending_out(Pending) of + {empty, _Pending} -> + {Pending, State}; + {{value, Item}, Pending1} -> + case requested(Item) of + Requested when Requested > Quota -> + {Pending, State}; + Requested -> + process_pending(Pending1, Quota - Requested, + run_pending_item(Item, State)) + end + end. run_pending_item({Kind, Pid, Requested, From}, State = #fhc_state { clients = Clients }) -> @@ -938,9 +982,9 @@ needs_reduce(#fhc_state { limit = Limit, obtain_pending = ObtainPending }) -> Limit =/= infinity andalso ((OpenCount + ObtainCount > Limit) - orelse (OpenPending =/= []) + orelse (not pending_is_empty(OpenPending)) orelse (ObtainCount < ObtainLimit - andalso ObtainPending =/= [])). + andalso not pending_is_empty(ObtainPending))). reduce(State = #fhc_state { open_pending = OpenPending, obtain_pending = ObtainPending, @@ -969,7 +1013,8 @@ reduce(State = #fhc_state { open_pending = OpenPending, notify_age(CStates, AverageAge); _ -> notify_age0(Clients, CStates, - length(OpenPending) + length(ObtainPending)) + pending_count(OpenPending) + + pending_count(ObtainPending)) end end, case TRef of |