summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-19 13:07:44 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-19 13:07:44 +0100
commit2d72c4d556f63f11eaa91a3d1b068e59636eb5fc (patch)
tree9ca91f38ea46e6ab32e90cc10a140f115443bc8e
parent0af3a826e9a4743256ae8e86d1dcce2a9df4ef40 (diff)
parent4efa0e92e8ab33daf9fc502e8cd036cee3019ce0 (diff)
downloadrabbitmq-server-2d72c4d556f63f11eaa91a3d1b068e59636eb5fc.tar.gz
Merging bug 23138 into default
-rw-r--r--src/file_handle_cache.erl205
-rw-r--r--src/tcp_acceptor.erl7
2 files changed, 126 insertions, 86 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index fe4bdc03..74567d09 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -116,13 +116,13 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain and release_on_death. obtain/0
-%% blocks until a file descriptor is available. release_on_death/1
-%% takes a pid and monitors the pid, reducing the count by 1 when the
-%% pid dies. Thus the assumption is that obtain/0 is called first, and
-%% when that returns, release_on_death/1 is called with the pid who
-%% "owns" the file descriptor. This is, for example, used to track the
-%% use of file descriptors through network sockets.
+%% The server also supports obtain and transfer. obtain/0 blocks until
+%% a file descriptor is available. transfer/1 is transfers ownership
+%% of a file descriptor between processes. It is non-blocking.
+%%
+%% The callers of register_callback/3, obtain/0, and the argument of
+%% transfer/1 are monitored, reducing the count of handles in use
+%% appropriately when the processes terminate.
-behaviour(gen_server).
@@ -130,7 +130,7 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/1]).
+-export([obtain/0, transfer/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -183,7 +183,7 @@
obtain_count,
obtain_pending,
callbacks,
- client_mrefs,
+ counts,
timer_ref
}).
@@ -222,7 +222,8 @@
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
--spec(obtain/1 :: (pid()) -> 'ok').
+-spec(obtain/0 :: () -> 'ok').
+-spec(transfer/1 :: (pid()) -> 'ok').
-endif.
@@ -444,8 +445,11 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.
-obtain(Pid) ->
- gen_server:call(?SERVER, {obtain, Pid}, infinity).
+obtain() ->
+ gen_server:call(?SERVER, {obtain, self()}, infinity).
+
+transfer(Pid) ->
+ gen_server:cast(?SERVER, {transfer, self(), Pid}).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -738,49 +742,52 @@ init([]) ->
obtain_count = 0,
obtain_pending = [],
callbacks = dict:new(),
- client_mrefs = dict:new(),
+ counts = dict:new(),
timer_ref = undefined }}.
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders })
- when Limit =/= infinity andalso Count >= Limit ->
- {noreply,
- State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders) }};
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders }) ->
- case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of
- {true, State1} ->
- {noreply, State1 #fhc_state {
- obtain_count = Count,
- obtain_pending = [{obtain, Pid, From} | Pending],
- elders = dict:erase(Pid, Elders) }};
- {false, State1} ->
- _MRef = erlang:monitor(process, Pid),
- {reply, ok, State1}
- end;
-
handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
elders = Elders }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- case maybe_reduce(
- ensure_mref(Pid, State #fhc_state { open_count = Count + 1,
- elders = Elders1 })) of
+ Item = {open, Pid, From},
+ case maybe_reduce(ensure_mref(Pid, State #fhc_state {
+ open_count = Count + 1,
+ elders = Elders1 })) of
{true, State1} ->
State2 = State1 #fhc_state { open_count = Count },
case CanClose of
true -> {reply, close, State2};
false -> {noreply, State2 #fhc_state {
- open_pending = [{open, From} | Pending],
+ open_pending = [Item | Pending],
elders = dict:erase(Pid, Elders1) }}
end;
{false, State1} ->
- {reply, ok, State1}
+ {noreply, run_pending_item(Item, State1)}
+ end;
+
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders })
+ when Limit =/= infinity andalso Count >= Limit ->
+ Item = {obtain, Pid, From},
+ {noreply, ensure_mref(Pid, State #fhc_state {
+ obtain_pending = [Item | Pending],
+ elders = dict:erase(Pid, Elders) })};
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders }) ->
+ Item = {obtain, Pid, From},
+ case maybe_reduce(ensure_mref(Pid, State #fhc_state {
+ obtain_count = Count + 1 })) of
+ {true, State1} ->
+ {noreply, State1 #fhc_state {
+ obtain_count = Count,
+ obtain_pending = [Item | Pending],
+ elders = dict:erase(Pid, Elders) }};
+ {false, State1} ->
+ {noreply, run_pending_item(Item, State1)}
end.
handle_cast({register_callback, Pid, MFA},
@@ -794,33 +801,51 @@ handle_cast({update, Pid, EldestUnusedSince}, State =
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
+ {noreply, State #fhc_state { elders = Elders1 }};
-handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, open_count = Count }) ->
+handle_cast({close, Pid, EldestUnusedSince},
+ State = #fhc_state { open_count = Count,
+ counts = Counts,
+ elders = Elders }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {noreply, process_pending(
- ensure_mref(Pid, State #fhc_state { open_count = Count - 1,
- elders = Elders1 }))};
+ Counts1 = update_counts(open, Pid, -1, Counts),
+ {noreply, process_pending(State #fhc_state { open_count = Count - 1,
+ counts = Counts1,
+ elders = Elders1 })};
+
+handle_cast({transfer, FromPid, ToPid}, State) ->
+ State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State),
+ Counts1 = update_counts(obtain, FromPid, -1, Counts),
+ Counts2 = update_counts(obtain, ToPid, +1, Counts1),
+ {noreply, process_pending(State1 #fhc_state { counts = Counts2 })};
handle_cast(check_counts, State) ->
{_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
{noreply, State1}.
-handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
- #fhc_state { obtain_count = Count, callbacks = Callbacks,
- client_mrefs = ClientMRefs, elders = Elders }) ->
- {noreply, process_pending(
- case dict:find(Pid, ClientMRefs) of
- {ok, MRef} -> State #fhc_state {
- elders = dict:erase(Pid, Elders),
- client_mrefs = dict:erase(Pid, ClientMRefs),
- callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { obtain_count = Count - 1 }
- end)}.
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
+ #fhc_state { open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_pending = ObtainPending,
+ callbacks = Callbacks,
+ counts = Counts,
+ elders = Elders }) ->
+ FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end,
+ OpenPending1 = lists:filter(FilterFun, OpenPending),
+ ObtainPending1 = lists:filter(FilterFun, ObtainPending),
+ {Opened, Obtained} = dict:fetch(Pid, Counts),
+ {noreply, process_pending(State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = OpenPending1,
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = ObtainPending1,
+ callbacks = dict:erase(Pid, Callbacks),
+ counts = dict:erase(Pid, Counts),
+ elders = dict:erase(Pid, Elders) })}.
terminate(_Reason, State) ->
State.
@@ -841,10 +866,10 @@ process_open(State = #fhc_state { limit = Limit,
open_pending = Pending,
open_count = OpenCount,
obtain_count = ObtainCount }) ->
- {Pending1, Inc} =
- process_pending(Pending, Limit - (ObtainCount + OpenCount)),
- State #fhc_state { open_pending = Pending1,
- open_count = OpenCount + Inc }.
+ {Pending1, Inc, State1} =
+ process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
+ State1 #fhc_state { open_pending = Pending1,
+ open_count = OpenCount + Inc }.
process_obtain(State = #fhc_state { limit = Limit,
obtain_pending = Pending,
@@ -853,27 +878,39 @@ process_obtain(State = #fhc_state { limit = Limit,
open_count = OpenCount }) ->
Quota = lists:min([ObtainLimit - ObtainCount,
Limit - (ObtainCount + OpenCount)]),
- {Pending1, Inc} = process_pending(Pending, Quota),
- State #fhc_state { obtain_pending = Pending1,
- obtain_count = ObtainCount + Inc }.
-
-process_pending([], _Quota) ->
- {[], 0};
-process_pending(Pending, Quota) when Quota =< 0 ->
- {Pending, 0};
-process_pending(Pending, Quota) ->
+ {Pending1, Inc, State1} = process_pending(Pending, Quota, State),
+ State1 #fhc_state { obtain_pending = Pending1,
+ obtain_count = ObtainCount + Inc }.
+
+process_pending([], _Quota, State) ->
+ {[], 0, State};
+process_pending(Pending, Quota, State) when Quota =< 0 ->
+ {Pending, 0, State};
+process_pending(Pending, Quota, State = #fhc_state { counts = Counts }) ->
PendingLen = length(Pending),
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- [run_pending_item(Item) || Item <- SatisfiableRev],
- {PendingNew, SatisfiableLen}.
-
-run_pending_item({open, From}) ->
- gen_server:reply(From, ok);
-run_pending_item({obtain, Pid, From}) ->
- _MRef = erlang:monitor(process, Pid),
- gen_server:reply(From, ok).
+ Counts1 = lists:foldl(fun run_pending_item1/2, Counts, SatisfiableRev),
+ {PendingNew, SatisfiableLen, State #fhc_state { counts = Counts1 }}.
+
+run_pending_item(Item, State = #fhc_state { counts = Counts }) ->
+ State #fhc_state { counts = run_pending_item1(Item, Counts) }.
+
+run_pending_item1({Kind, Pid, From}, Counts) ->
+ gen_server:reply(From, ok),
+ update_counts(Kind, Pid, +1, Counts).
+
+update_counts(open, Pid, Delta, Counts) ->
+ dict:update(Pid, fun ({Opened, Obtained})
+ when Opened >= 0 andalso Obtained >= 0 ->
+ {Opened + Delta, Obtained} end,
+ Counts);
+update_counts(obtain, Pid, Delta, Counts) ->
+ dict:update(Pid, fun ({Opened, Obtained})
+ when Opened >= 0 andalso Obtained >= 0 ->
+ {Opened, Obtained + Delta} end,
+ Counts).
maybe_reduce(State = #fhc_state { limit = Limit,
open_count = OpenCount,
@@ -948,10 +985,10 @@ ulimit() ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
-ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
- case dict:find(Pid, ClientMRefs) of
- {ok, _MRef} -> State;
- error -> MRef = erlang:monitor(process, Pid),
- State #fhc_state {
- client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
+ensure_mref(Pid, State = #fhc_state { counts = Counts }) ->
+ case dict:find(Pid, Counts) of
+ {ok, _} -> State;
+ error -> _MRef = erlang:monitor(process, Pid),
+ State #fhc_state {
+ counts = dict:store(Pid, {0, 0}, Counts) }
end.
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 11ce6fc5..c9809ace 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -55,7 +55,7 @@ handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(accept, State) ->
- ok = file_handle_cache:obtain(self()),
+ ok = file_handle_cache:obtain(),
accept(State);
handle_cast(_Msg, State) ->
@@ -84,7 +84,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% is drained.
gen_event:which_handlers(error_logger),
%% handle
- file_handle_cache:obtain(apply(M, F, A ++ [Sock]))
+ file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain()
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
@@ -93,11 +94,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% accept more
accept(State);
+
handle_info({inet_async, LSock, Ref, {error, closed}},
State=#state{sock=LSock, ref=Ref}) ->
%% It would be wrong to attempt to restart the acceptor when we
%% know this will fail.
{stop, normal, State};
+
handle_info(_Info, State) ->
{noreply, State}.