summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/file_handle_cache.erl192
-rw-r--r--src/tcp_acceptor.erl5
2 files changed, 94 insertions, 103 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 0ee3a709..e51ce921 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -116,13 +116,13 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain and release_on_death. obtain/0
-%% blocks until a file descriptor is available. release_on_death/1
-%% takes a pid and monitors the pid, reducing the count by 1 when the
-%% pid dies. Thus the assumption is that obtain/0 is called first, and
-%% when that returns, release_on_death/1 is called with the pid who
-%% "owns" the file descriptor. This is, for example, used to track the
-%% use of file descriptors through network sockets.
+%% The server also supports obtain and transfer. obtain/0 blocks until
+%% a file descriptor is available. transfer/1 is transfers ownership
+%% of a file descriptor between processes. It is non-blocking.
+%%
+%% The callers of register_callback/3, obtain/0, and the argument of
+%% transfer/1 are monitored, reducing the count of handles in use
+%% appropriately when the processes terminate.
-behaviour(gen_server).
@@ -130,7 +130,7 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/1]).
+-export([obtain/0, transfer/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -222,7 +222,8 @@
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
--spec(obtain/1 :: (pid()) -> 'ok').
+-spec(obtain/0 :: () -> 'ok').
+-spec(transfer/1 :: (pid()) -> 'ok').
-endif.
@@ -444,8 +445,11 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.
-obtain(Pid) ->
- gen_server:call(?SERVER, {obtain, self(), Pid}, infinity).
+obtain() ->
+ gen_server:call(?SERVER, {obtain, self()}, infinity).
+
+transfer(Pid) ->
+ gen_server:cast(?SERVER, {transfer, self(), Pid}).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -741,54 +745,48 @@ init([]) ->
counts = dict:new(),
timer_ref = undefined }}.
-handle_call({obtain, FromPid, ForPid}, From,
- State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders })
- when Limit =/= infinity andalso Count >= Limit ->
- MRef = erlang:monitor(process, FromPid),
- Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
- {noreply, ensure_mref(ForPid, State #fhc_state {
- obtain_pending = Pending1,
- elders = dict:erase(FromPid, Elders) })};
-handle_call({obtain, FromPid, ForPid}, From,
- State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders }) ->
- MRef = erlang:monitor(process, FromPid),
- case maybe_reduce(ensure_mref(ForPid, State #fhc_state {
- obtain_count = Count + 1 })) of
- {true, State1} ->
- Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending],
- {noreply, State1 #fhc_state {
- obtain_count = Count,
- obtain_pending = Pending1,
- elders = dict:erase(FromPid, Elders) }};
- {false, State1} ->
- {noreply,
- run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)}
- end;
-
handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
elders = Elders }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- case maybe_reduce(
- ensure_mref(Pid, State #fhc_state { open_count = Count + 1,
- elders = Elders1 })) of
+ Item = {open, Pid, From},
+ case maybe_reduce(State #fhc_state { open_count = Count + 1,
+ elders = Elders1 }) of
{true, State1} ->
State2 = State1 #fhc_state { open_count = Count },
case CanClose of
true -> {reply, close, State2};
- false -> {noreply,
- State2 #fhc_state {
- open_pending = [{open, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders1) }}
+ false -> {noreply, State2 #fhc_state {
+ open_pending = [Item | Pending],
+ elders = dict:erase(Pid, Elders1) }}
end;
{false, State1} ->
- {noreply, run_pending_item({open, Pid, From}, State1)}
+ {noreply, run_pending_item(Item, State1)}
+ end;
+
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders })
+ when Limit =/= infinity andalso Count >= Limit ->
+ Item = {obtain, Pid, From},
+ {noreply, ensure_mref(Pid, State #fhc_state {
+ obtain_pending = [Item | Pending],
+ elders = dict:erase(Pid, Elders) })};
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders }) ->
+ Item = {obtain, Pid, From},
+ case maybe_reduce(ensure_mref(Pid, State #fhc_state {
+ obtain_count = Count + 1 })) of
+ {true, State1} ->
+ {noreply, State1 #fhc_state {
+ obtain_count = Count,
+ obtain_pending = [Item | Pending],
+ elders = dict:erase(Pid, Elders) }};
+ {false, State1} ->
+ {noreply, run_pending_item(Item, State1)}
end.
handle_cast({register_callback, Pid, MFA},
@@ -804,57 +802,49 @@ handle_cast({update, Pid, EldestUnusedSince}, State =
%% storm of messages
{noreply, State #fhc_state { elders = Elders1 }};
-handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, counts = Counts,
- open_count = Count }) ->
+handle_cast({close, Pid, EldestUnusedSince},
+ State = #fhc_state { open_count = Count,
+ counts = Counts,
+ elders = Elders }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {Obtained, Opened} = dict:fetch(Pid, Counts),
- {noreply,
- process_pending(State #fhc_state {
- open_count = Count - 1,
- counts = dict:store(Pid, {Obtained, Opened - 1}, Counts),
- elders = Elders1 })};
+ Counts1 = update_counts(open, Pid, -1, Counts),
+ {noreply, process_pending(State #fhc_state { open_count = Count - 1,
+ counts = Counts1,
+ elders = Elders1 })};
+
+handle_cast({transfer, FromPid, ToPid}, State) ->
+ State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State),
+ Counts1 = update_counts(obtain, FromPid, -1, Counts),
+ Counts2 = update_counts(obtain, ToPid, +1, Counts1),
+ {noreply, process_pending(State1 #fhc_state { counts = Counts2 })};
handle_cast(check_counts, State) ->
{_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
{noreply, State1}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
- #fhc_state { obtain_count = ObtainCount,
- obtain_pending = ObtainPending,
- open_count = OpenCount,
+ #fhc_state { open_count = OpenCount,
open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_pending = ObtainPending,
callbacks = Callbacks,
counts = Counts,
elders = Elders }) ->
- ObtainPending1 =
- lists:filter(
- fun ({obtain, FromPid, FromMRef, From, ForPid}) ->
- case Pid =:= ForPid of
- true -> gen_server:reply(From, ok),
- true = erlang:demonitor(FromMRef, [flush]),
- false;
- false -> Pid =/= FromPid
- end
- end, ObtainPending),
- OpenPending1 = lists:filter(fun ({open, Pid1, _From}) ->
- Pid =/= Pid1
- end, OpenPending),
- {Obtained, Opened} = case dict:find(Pid, Counts) of
- {ok, Val} -> Val;
- error -> {0, 0}
- end,
+ FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end,
+ OpenPending1 = lists:filter(FilterFun, OpenPending),
+ ObtainPending1 = lists:filter(FilterFun, ObtainPending),
+ {Opened, Obtained} = dict:fetch(Pid, Counts),
{noreply, process_pending(State #fhc_state {
- elders = dict:erase(Pid, Elders),
- counts = dict:erase(Pid, Counts),
- callbacks = dict:erase(Pid, Callbacks),
+ open_count = OpenCount - Opened,
+ open_pending = OpenPending1,
obtain_count = ObtainCount - Obtained,
obtain_pending = ObtainPending1,
- open_count = OpenCount - Opened,
- open_pending = OpenPending1 })}.
+ callbacks = dict:erase(Pid, Callbacks),
+ counts = dict:erase(Pid, Counts),
+ elders = dict:erase(Pid, Elders) })}.
terminate(_Reason, State) ->
State.
@@ -895,26 +885,27 @@ process_pending([], _Quota, State) ->
{[], 0, State};
process_pending(Pending, Quota, State) when Quota =< 0 ->
{Pending, 0, State};
-process_pending(Pending, Quota, State) ->
+process_pending(Pending, Quota, State = #fhc_state { counts = Counts }) ->
PendingLen = length(Pending),
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- {PendingNew, SatisfiableLen,
- lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}.
+ Counts1 = lists:foldl(fun run_pending_item1/2, Counts, SatisfiableRev),
+ {PendingNew, SatisfiableLen, State #fhc_state { counts = Counts1 }}.
-run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts }) ->
- gen_server:reply(From, ok),
- {Obtained, Opened} = dict:fetch(Pid, Counts),
- State #fhc_state {
- counts = dict:store(Pid, {Obtained, Opened + 1}, Counts) };
-run_pending_item({obtain, _FromPid, FromMRef, From, ForPid},
- State = #fhc_state { counts = Counts }) ->
+run_pending_item(Item, State = #fhc_state { counts = Counts }) ->
+ State #fhc_state { counts = run_pending_item1(Item, Counts) }.
+
+run_pending_item1({Kind, Pid, From}, Counts) ->
gen_server:reply(From, ok),
- true = erlang:demonitor(FromMRef, [flush]),
- {Obtained, Opened} = dict:fetch(ForPid, Counts),
- State #fhc_state {
- counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts) }.
+ update_counts(Kind, Pid, +1, Counts).
+
+update_counts(open, Pid, Delta, Counts) ->
+ dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end,
+ Counts);
+update_counts(obtain, Pid, Delta, Counts) ->
+ dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end,
+ Counts).
maybe_reduce(State = #fhc_state { limit = Limit,
open_count = OpenCount,
@@ -942,10 +933,8 @@ maybe_reduce(State = #fhc_state { limit = Limit,
_ -> AverageAge = Sum / ClientCount,
lists:foreach(
fun (Pid) ->
- case dict:find(Pid, Callbacks) of
- error -> ok;
- {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge])
- end
+ {M, F, A} = dict:fetch(Pid, Callbacks),
+ apply(M, F, A ++ [AverageAge])
end, Pids)
end,
AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit,
@@ -993,5 +982,6 @@ ensure_mref(Pid, State = #fhc_state { counts = Counts }) ->
case dict:find(Pid, Counts) of
{ok, _} -> State;
error -> _MRef = erlang:monitor(process, Pid),
- State #fhc_state { counts = dict:store(Pid, {0, 0}, Counts) }
+ State #fhc_state {
+ counts = dict:store(Pid, {0, 0}, Counts) }
end.
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 11ce6fc5..0025a048 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -55,7 +55,6 @@ handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(accept, State) ->
- ok = file_handle_cache:obtain(self()),
accept(State);
handle_cast(_Msg, State) ->
@@ -84,9 +83,10 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% is drained.
gen_event:which_handlers(error_logger),
%% handle
- file_handle_cache:obtain(apply(M, F, A ++ [Sock]))
+ file_handle_cache:transfer(apply(M, F, A ++ [Sock]))
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
+ file_handle_cache:transfer(spawn(fun () -> ok end)),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
[Reason])
end,
@@ -112,6 +112,7 @@ code_change(_OldVsn, State, _Extra) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
accept(State = #state{sock=LSock}) ->
+ ok = file_handle_cache:obtain(),
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}