diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-17 13:54:03 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-17 13:54:03 +0100 |
commit | 73f9b32cce3c81409fa41a810b2dc316a426a042 (patch) | |
tree | e346322457eb7deae44bf82b669d30e979667f8a | |
parent | 64d7eebf985987a10eaeaef3f8e8111b1f9cd5bf (diff) | |
download | rabbitmq-server-73f9b32cce3c81409fa41a810b2dc316a426a042.tar.gz |
Combine obtains and release_on_death
-rw-r--r-- | src/file_handle_cache.erl | 57 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 6 |
2 files changed, 33 insertions, 30 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 19b2654f..ddf8fe38 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -130,7 +130,7 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([release_on_death/1, obtain/0]). +-export([obtain_and_release_on_death/1]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -222,8 +222,7 @@ -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). --spec(release_on_death/1 :: (pid()) -> 'ok'). --spec(obtain/0 :: () -> 'ok'). +-spec(obtain_and_release_on_death/1 :: (pid()) -> 'ok'). -endif. @@ -445,11 +444,8 @@ set_maximum_since_use(MaximumAge) -> true -> ok end. -release_on_death(Pid) when is_pid(Pid) -> - gen_server:cast(?SERVER, {release_on_death, Pid}). - -obtain() -> - gen_server:call(?SERVER, {obtain, self()}, infinity). +obtain_and_release_on_death(Pid) -> + gen_server:call(?SERVER, {obtain_and_release_on_death, Pid}, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -745,22 +741,27 @@ init([]) -> client_mrefs = dict:new(), timer_ref = undefined }}. -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, - obtain_count = Count, - obtain_pending = Pending, - elders = Elders }) +handle_call({obtain_and_release_on_death, Pid}, From, + State = #fhc_state { obtain_limit = Limit, + obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) when Limit =/= infinity andalso Count >= Limit -> - {noreply, State #fhc_state { obtain_pending = [From | Pending], - elders = dict:erase(Pid, Elders) }}; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, - obtain_pending = Pending, - elders = Elders }) -> + {noreply, + State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending], + elders = dict:erase(Pid, Elders) }}; +handle_call({obtain_and_release_on_death, Pid}, From, + State = #fhc_state { obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) -> case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of {true, State1} -> - {noreply, State1 #fhc_state { obtain_count = Count, - obtain_pending = [From | Pending], - elders = dict:erase(Pid, Elders) }}; + {noreply, State1 #fhc_state { + obtain_count = Count, + obtain_pending = [{obtain, Pid, From} | Pending], + elders = dict:erase(Pid, Elders) }}; {false, State1} -> + _MRef = erlang:monitor(process, Pid), {reply, ok, State1} end; @@ -777,7 +778,7 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, case CanClose of true -> {reply, close, State2}; false -> {noreply, State2 #fhc_state { - open_pending = [From | Pending], + open_pending = [{open, From} | Pending], elders = dict:erase(Pid, Elders1) }} end; {false, State1} -> @@ -809,11 +810,7 @@ handle_cast({close, Pid, EldestUnusedSince}, State = handle_cast(check_counts, State) -> {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), - {noreply, State1}; - -handle_cast({release_on_death, Pid}, State) -> - _MRef = erlang:monitor(process, Pid), - {noreply, State}. + {noreply, State1}. handle_info({'DOWN', MRef, process, Pid, _Reason}, State = #fhc_state { obtain_count = Count, callbacks = Callbacks, @@ -871,9 +868,15 @@ process_pending(Pending, Quota) -> SatisfiableLen = lists:min([PendingLen, Quota]), Take = PendingLen - SatisfiableLen, {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - [gen_server:reply(From, ok) || From <- SatisfiableRev], + [run_pending_item(Item) || Item <- SatisfiableRev], {PendingNew, SatisfiableLen}. +run_pending_item({open, From}) -> + gen_server:reply(From, ok); +run_pending_item({obtain, Pid, From}) -> + _MRef = erlang:monitor(process, Pid), + gen_server:reply(From, ok). + maybe_reduce(State = #fhc_state { limit = Limit, open_count = OpenCount, open_pending = OpenPending, diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index cc4982c9..88fd2fd4 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -43,11 +43,12 @@ %%-------------------------------------------------------------------- start_link(Callback, LSock) -> - gen_server:start_link(?MODULE, {Callback, LSock}, []). + gen_server:start_link(?MODULE, {Callback, LSock}, [{timeout, infinity}]). %%-------------------------------------------------------------------- init({Callback, LSock}) -> + ok = file_handle_cache:obtain_and_release_on_death(self()), gen_server:cast(self(), accept), {ok, #state{callback=Callback, sock=LSock}}. @@ -83,7 +84,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% is drained. gen_event:which_handlers(error_logger), %% handle - file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) + file_handle_cache:obtain_and_release_on_death(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -111,7 +112,6 @@ code_change(_OldVsn, State, _Extra) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). accept(State = #state{sock=LSock}) -> - ok = file_handle_cache:obtain(), case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} |