summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-17 13:54:03 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-17 13:54:03 +0100
commit73f9b32cce3c81409fa41a810b2dc316a426a042 (patch)
treee346322457eb7deae44bf82b669d30e979667f8a
parent64d7eebf985987a10eaeaef3f8e8111b1f9cd5bf (diff)
downloadrabbitmq-server-73f9b32cce3c81409fa41a810b2dc316a426a042.tar.gz
Combine obtains and release_on_death
-rw-r--r--src/file_handle_cache.erl57
-rw-r--r--src/tcp_acceptor.erl6
2 files changed, 33 insertions, 30 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 19b2654f..ddf8fe38 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -130,7 +130,7 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([release_on_death/1, obtain/0]).
+-export([obtain_and_release_on_death/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -222,8 +222,7 @@
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
--spec(release_on_death/1 :: (pid()) -> 'ok').
--spec(obtain/0 :: () -> 'ok').
+-spec(obtain_and_release_on_death/1 :: (pid()) -> 'ok').
-endif.
@@ -445,11 +444,8 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.
-release_on_death(Pid) when is_pid(Pid) ->
- gen_server:cast(?SERVER, {release_on_death, Pid}).
-
-obtain() ->
- gen_server:call(?SERVER, {obtain, self()}, infinity).
+obtain_and_release_on_death(Pid) ->
+ gen_server:call(?SERVER, {obtain_and_release_on_death, Pid}, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -745,22 +741,27 @@ init([]) ->
client_mrefs = dict:new(),
timer_ref = undefined }}.
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders })
+handle_call({obtain_and_release_on_death, Pid}, From,
+ State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders })
when Limit =/= infinity andalso Count >= Limit ->
- {noreply, State #fhc_state { obtain_pending = [From | Pending],
- elders = dict:erase(Pid, Elders) }};
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- elders = Elders }) ->
+ {noreply,
+ State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders) }};
+handle_call({obtain_and_release_on_death, Pid}, From,
+ State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders }) ->
case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of
{true, State1} ->
- {noreply, State1 #fhc_state { obtain_count = Count,
- obtain_pending = [From | Pending],
- elders = dict:erase(Pid, Elders) }};
+ {noreply, State1 #fhc_state {
+ obtain_count = Count,
+ obtain_pending = [{obtain, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders) }};
{false, State1} ->
+ _MRef = erlang:monitor(process, Pid),
{reply, ok, State1}
end;
@@ -777,7 +778,7 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
case CanClose of
true -> {reply, close, State2};
false -> {noreply, State2 #fhc_state {
- open_pending = [From | Pending],
+ open_pending = [{open, From} | Pending],
elders = dict:erase(Pid, Elders1) }}
end;
{false, State1} ->
@@ -809,11 +810,7 @@ handle_cast({close, Pid, EldestUnusedSince}, State =
handle_cast(check_counts, State) ->
{_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
- {noreply, State1};
-
-handle_cast({release_on_death, Pid}, State) ->
- _MRef = erlang:monitor(process, Pid),
- {noreply, State}.
+ {noreply, State1}.
handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
#fhc_state { obtain_count = Count, callbacks = Callbacks,
@@ -871,9 +868,15 @@ process_pending(Pending, Quota) ->
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- [gen_server:reply(From, ok) || From <- SatisfiableRev],
+ [run_pending_item(Item) || Item <- SatisfiableRev],
{PendingNew, SatisfiableLen}.
+run_pending_item({open, From}) ->
+ gen_server:reply(From, ok);
+run_pending_item({obtain, Pid, From}) ->
+ _MRef = erlang:monitor(process, Pid),
+ gen_server:reply(From, ok).
+
maybe_reduce(State = #fhc_state { limit = Limit,
open_count = OpenCount,
open_pending = OpenPending,
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index cc4982c9..88fd2fd4 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -43,11 +43,12 @@
%%--------------------------------------------------------------------
start_link(Callback, LSock) ->
- gen_server:start_link(?MODULE, {Callback, LSock}, []).
+ gen_server:start_link(?MODULE, {Callback, LSock}, [{timeout, infinity}]).
%%--------------------------------------------------------------------
init({Callback, LSock}) ->
+ ok = file_handle_cache:obtain_and_release_on_death(self()),
gen_server:cast(self(), accept),
{ok, #state{callback=Callback, sock=LSock}}.
@@ -83,7 +84,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% is drained.
gen_event:which_handlers(error_logger),
%% handle
- file_handle_cache:release_on_death(apply(M, F, A ++ [Sock]))
+ file_handle_cache:obtain_and_release_on_death(apply(M, F, A ++ [Sock]))
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
@@ -111,7 +112,6 @@ code_change(_OldVsn, State, _Extra) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
accept(State = #state{sock=LSock}) ->
- ok = file_handle_cache:obtain(),
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}