diff options
-rw-r--r-- | src/file_handle_cache.erl | 192 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 5 |
2 files changed, 94 insertions, 103 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 0ee3a709..e51ce921 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -116,13 +116,13 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain and release_on_death. obtain/0 -%% blocks until a file descriptor is available. release_on_death/1 -%% takes a pid and monitors the pid, reducing the count by 1 when the -%% pid dies. Thus the assumption is that obtain/0 is called first, and -%% when that returns, release_on_death/1 is called with the pid who -%% "owns" the file descriptor. This is, for example, used to track the -%% use of file descriptors through network sockets. +%% The server also supports obtain and transfer. obtain/0 blocks until +%% a file descriptor is available. transfer/1 is transfers ownership +%% of a file descriptor between processes. It is non-blocking. +%% +%% The callers of register_callback/3, obtain/0, and the argument of +%% transfer/1 are monitored, reducing the count of handles in use +%% appropriately when the processes terminate. -behaviour(gen_server). @@ -130,7 +130,7 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/1]). +-export([obtain/0, transfer/1]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -222,7 +222,8 @@ -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). --spec(obtain/1 :: (pid()) -> 'ok'). +-spec(obtain/0 :: () -> 'ok'). +-spec(transfer/1 :: (pid()) -> 'ok'). -endif. @@ -444,8 +445,11 @@ set_maximum_since_use(MaximumAge) -> true -> ok end. -obtain(Pid) -> - gen_server:call(?SERVER, {obtain, self(), Pid}, infinity). +obtain() -> + gen_server:call(?SERVER, {obtain, self()}, infinity). + +transfer(Pid) -> + gen_server:cast(?SERVER, {transfer, self(), Pid}). %%---------------------------------------------------------------------------- %% Internal functions @@ -741,54 +745,48 @@ init([]) -> counts = dict:new(), timer_ref = undefined }}. -handle_call({obtain, FromPid, ForPid}, From, - State = #fhc_state { obtain_limit = Limit, - obtain_count = Count, - obtain_pending = Pending, - elders = Elders }) - when Limit =/= infinity andalso Count >= Limit -> - MRef = erlang:monitor(process, FromPid), - Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending], - {noreply, ensure_mref(ForPid, State #fhc_state { - obtain_pending = Pending1, - elders = dict:erase(FromPid, Elders) })}; -handle_call({obtain, FromPid, ForPid}, From, - State = #fhc_state { obtain_count = Count, - obtain_pending = Pending, - elders = Elders }) -> - MRef = erlang:monitor(process, FromPid), - case maybe_reduce(ensure_mref(ForPid, State #fhc_state { - obtain_count = Count + 1 })) of - {true, State1} -> - Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending], - {noreply, State1 #fhc_state { - obtain_count = Count, - obtain_pending = Pending1, - elders = dict:erase(FromPid, Elders) }}; - {false, State1} -> - {noreply, - run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)} - end; - handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - case maybe_reduce( - ensure_mref(Pid, State #fhc_state { open_count = Count + 1, - elders = Elders1 })) of + Item = {open, Pid, From}, + case maybe_reduce(State #fhc_state { open_count = Count + 1, + elders = Elders1 }) of {true, State1} -> State2 = State1 #fhc_state { open_count = Count }, case CanClose of true -> {reply, close, State2}; - false -> {noreply, - State2 #fhc_state { - open_pending = [{open, Pid, From} | Pending], - elders = dict:erase(Pid, Elders1) }} + false -> {noreply, State2 #fhc_state { + open_pending = [Item | Pending], + elders = dict:erase(Pid, Elders1) }} end; {false, State1} -> - {noreply, run_pending_item({open, Pid, From}, State1)} + {noreply, run_pending_item(Item, State1)} + end; + +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, + obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) + when Limit =/= infinity andalso Count >= Limit -> + Item = {obtain, Pid, From}, + {noreply, ensure_mref(Pid, State #fhc_state { + obtain_pending = [Item | Pending], + elders = dict:erase(Pid, Elders) })}; +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) -> + Item = {obtain, Pid, From}, + case maybe_reduce(ensure_mref(Pid, State #fhc_state { + obtain_count = Count + 1 })) of + {true, State1} -> + {noreply, State1 #fhc_state { + obtain_count = Count, + obtain_pending = [Item | Pending], + elders = dict:erase(Pid, Elders) }}; + {false, State1} -> + {noreply, run_pending_item(Item, State1)} end. handle_cast({register_callback, Pid, MFA}, @@ -804,57 +802,49 @@ handle_cast({update, Pid, EldestUnusedSince}, State = %% storm of messages {noreply, State #fhc_state { elders = Elders1 }}; -handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, counts = Counts, - open_count = Count }) -> +handle_cast({close, Pid, EldestUnusedSince}, + State = #fhc_state { open_count = Count, + counts = Counts, + elders = Elders }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {Obtained, Opened} = dict:fetch(Pid, Counts), - {noreply, - process_pending(State #fhc_state { - open_count = Count - 1, - counts = dict:store(Pid, {Obtained, Opened - 1}, Counts), - elders = Elders1 })}; + Counts1 = update_counts(open, Pid, -1, Counts), + {noreply, process_pending(State #fhc_state { open_count = Count - 1, + counts = Counts1, + elders = Elders1 })}; + +handle_cast({transfer, FromPid, ToPid}, State) -> + State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State), + Counts1 = update_counts(obtain, FromPid, -1, Counts), + Counts2 = update_counts(obtain, ToPid, +1, Counts1), + {noreply, process_pending(State1 #fhc_state { counts = Counts2 })}; handle_cast(check_counts, State) -> {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), {noreply, State1}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = - #fhc_state { obtain_count = ObtainCount, - obtain_pending = ObtainPending, - open_count = OpenCount, + #fhc_state { open_count = OpenCount, open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_pending = ObtainPending, callbacks = Callbacks, counts = Counts, elders = Elders }) -> - ObtainPending1 = - lists:filter( - fun ({obtain, FromPid, FromMRef, From, ForPid}) -> - case Pid =:= ForPid of - true -> gen_server:reply(From, ok), - true = erlang:demonitor(FromMRef, [flush]), - false; - false -> Pid =/= FromPid - end - end, ObtainPending), - OpenPending1 = lists:filter(fun ({open, Pid1, _From}) -> - Pid =/= Pid1 - end, OpenPending), - {Obtained, Opened} = case dict:find(Pid, Counts) of - {ok, Val} -> Val; - error -> {0, 0} - end, + FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, + OpenPending1 = lists:filter(FilterFun, OpenPending), + ObtainPending1 = lists:filter(FilterFun, ObtainPending), + {Opened, Obtained} = dict:fetch(Pid, Counts), {noreply, process_pending(State #fhc_state { - elders = dict:erase(Pid, Elders), - counts = dict:erase(Pid, Counts), - callbacks = dict:erase(Pid, Callbacks), + open_count = OpenCount - Opened, + open_pending = OpenPending1, obtain_count = ObtainCount - Obtained, obtain_pending = ObtainPending1, - open_count = OpenCount - Opened, - open_pending = OpenPending1 })}. + callbacks = dict:erase(Pid, Callbacks), + counts = dict:erase(Pid, Counts), + elders = dict:erase(Pid, Elders) })}. terminate(_Reason, State) -> State. @@ -895,26 +885,27 @@ process_pending([], _Quota, State) -> {[], 0, State}; process_pending(Pending, Quota, State) when Quota =< 0 -> {Pending, 0, State}; -process_pending(Pending, Quota, State) -> +process_pending(Pending, Quota, State = #fhc_state { counts = Counts }) -> PendingLen = length(Pending), SatisfiableLen = lists:min([PendingLen, Quota]), Take = PendingLen - SatisfiableLen, {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - {PendingNew, SatisfiableLen, - lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}. + Counts1 = lists:foldl(fun run_pending_item1/2, Counts, SatisfiableRev), + {PendingNew, SatisfiableLen, State #fhc_state { counts = Counts1 }}. -run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts }) -> - gen_server:reply(From, ok), - {Obtained, Opened} = dict:fetch(Pid, Counts), - State #fhc_state { - counts = dict:store(Pid, {Obtained, Opened + 1}, Counts) }; -run_pending_item({obtain, _FromPid, FromMRef, From, ForPid}, - State = #fhc_state { counts = Counts }) -> +run_pending_item(Item, State = #fhc_state { counts = Counts }) -> + State #fhc_state { counts = run_pending_item1(Item, Counts) }. + +run_pending_item1({Kind, Pid, From}, Counts) -> gen_server:reply(From, ok), - true = erlang:demonitor(FromMRef, [flush]), - {Obtained, Opened} = dict:fetch(ForPid, Counts), - State #fhc_state { - counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts) }. + update_counts(Kind, Pid, +1, Counts). + +update_counts(open, Pid, Delta, Counts) -> + dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, + Counts); +update_counts(obtain, Pid, Delta, Counts) -> + dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end, + Counts). maybe_reduce(State = #fhc_state { limit = Limit, open_count = OpenCount, @@ -942,10 +933,8 @@ maybe_reduce(State = #fhc_state { limit = Limit, _ -> AverageAge = Sum / ClientCount, lists:foreach( fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> ok; - {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) - end + {M, F, A} = dict:fetch(Pid, Callbacks), + apply(M, F, A ++ [AverageAge]) end, Pids) end, AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit, @@ -993,5 +982,6 @@ ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> case dict:find(Pid, Counts) of {ok, _} -> State; error -> _MRef = erlang:monitor(process, Pid), - State #fhc_state { counts = dict:store(Pid, {0, 0}, Counts) } + State #fhc_state { + counts = dict:store(Pid, {0, 0}, Counts) } end. diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 11ce6fc5..0025a048 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -55,7 +55,6 @@ handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(accept, State) -> - ok = file_handle_cache:obtain(self()), accept(State); handle_cast(_Msg, State) -> @@ -84,9 +83,10 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% is drained. gen_event:which_handlers(error_logger), %% handle - file_handle_cache:obtain(apply(M, F, A ++ [Sock])) + file_handle_cache:transfer(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> gen_tcp:close(Sock), + file_handle_cache:transfer(spawn(fun () -> ok end)), error_logger:error_msg("unable to accept TCP connection: ~p~n", [Reason]) end, @@ -112,6 +112,7 @@ code_change(_OldVsn, State, _Extra) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). accept(State = #state{sock=LSock}) -> + ok = file_handle_cache:obtain(), case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} |