diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-19 13:07:44 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-19 13:07:44 +0100 |
commit | 2d72c4d556f63f11eaa91a3d1b068e59636eb5fc (patch) | |
tree | 9ca91f38ea46e6ab32e90cc10a140f115443bc8e | |
parent | 0af3a826e9a4743256ae8e86d1dcce2a9df4ef40 (diff) | |
parent | 4efa0e92e8ab33daf9fc502e8cd036cee3019ce0 (diff) | |
download | rabbitmq-server-2d72c4d556f63f11eaa91a3d1b068e59636eb5fc.tar.gz |
Merging bug 23138 into default
-rw-r--r-- | src/file_handle_cache.erl | 205 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 7 |
2 files changed, 126 insertions, 86 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index fe4bdc03..74567d09 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -116,13 +116,13 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain and release_on_death. obtain/0 -%% blocks until a file descriptor is available. release_on_death/1 -%% takes a pid and monitors the pid, reducing the count by 1 when the -%% pid dies. Thus the assumption is that obtain/0 is called first, and -%% when that returns, release_on_death/1 is called with the pid who -%% "owns" the file descriptor. This is, for example, used to track the -%% use of file descriptors through network sockets. +%% The server also supports obtain and transfer. obtain/0 blocks until +%% a file descriptor is available. transfer/1 is transfers ownership +%% of a file descriptor between processes. It is non-blocking. +%% +%% The callers of register_callback/3, obtain/0, and the argument of +%% transfer/1 are monitored, reducing the count of handles in use +%% appropriately when the processes terminate. -behaviour(gen_server). @@ -130,7 +130,7 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/1]). +-export([obtain/0, transfer/1]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -183,7 +183,7 @@ obtain_count, obtain_pending, callbacks, - client_mrefs, + counts, timer_ref }). @@ -222,7 +222,8 @@ -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). --spec(obtain/1 :: (pid()) -> 'ok'). +-spec(obtain/0 :: () -> 'ok'). +-spec(transfer/1 :: (pid()) -> 'ok'). -endif. @@ -444,8 +445,11 @@ set_maximum_since_use(MaximumAge) -> true -> ok end. -obtain(Pid) -> - gen_server:call(?SERVER, {obtain, Pid}, infinity). +obtain() -> + gen_server:call(?SERVER, {obtain, self()}, infinity). + +transfer(Pid) -> + gen_server:cast(?SERVER, {transfer, self(), Pid}). %%---------------------------------------------------------------------------- %% Internal functions @@ -738,49 +742,52 @@ init([]) -> obtain_count = 0, obtain_pending = [], callbacks = dict:new(), - client_mrefs = dict:new(), + counts = dict:new(), timer_ref = undefined }}. -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, - obtain_count = Count, - obtain_pending = Pending, - elders = Elders }) - when Limit =/= infinity andalso Count >= Limit -> - {noreply, - State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending], - elders = dict:erase(Pid, Elders) }}; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, - obtain_pending = Pending, - elders = Elders }) -> - case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of - {true, State1} -> - {noreply, State1 #fhc_state { - obtain_count = Count, - obtain_pending = [{obtain, Pid, From} | Pending], - elders = dict:erase(Pid, Elders) }}; - {false, State1} -> - _MRef = erlang:monitor(process, Pid), - {reply, ok, State1} - end; - handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - case maybe_reduce( - ensure_mref(Pid, State #fhc_state { open_count = Count + 1, - elders = Elders1 })) of + Item = {open, Pid, From}, + case maybe_reduce(ensure_mref(Pid, State #fhc_state { + open_count = Count + 1, + elders = Elders1 })) of {true, State1} -> State2 = State1 #fhc_state { open_count = Count }, case CanClose of true -> {reply, close, State2}; false -> {noreply, State2 #fhc_state { - open_pending = [{open, From} | Pending], + open_pending = [Item | Pending], elders = dict:erase(Pid, Elders1) }} end; {false, State1} -> - {reply, ok, State1} + {noreply, run_pending_item(Item, State1)} + end; + +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, + obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) + when Limit =/= infinity andalso Count >= Limit -> + Item = {obtain, Pid, From}, + {noreply, ensure_mref(Pid, State #fhc_state { + obtain_pending = [Item | Pending], + elders = dict:erase(Pid, Elders) })}; +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) -> + Item = {obtain, Pid, From}, + case maybe_reduce(ensure_mref(Pid, State #fhc_state { + obtain_count = Count + 1 })) of + {true, State1} -> + {noreply, State1 #fhc_state { + obtain_count = Count, + obtain_pending = [Item | Pending], + elders = dict:erase(Pid, Elders) }}; + {false, State1} -> + {noreply, run_pending_item(Item, State1)} end. handle_cast({register_callback, Pid, MFA}, @@ -794,33 +801,51 @@ handle_cast({update, Pid, EldestUnusedSince}, State = Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages - {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; + {noreply, State #fhc_state { elders = Elders1 }}; -handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, open_count = Count }) -> +handle_cast({close, Pid, EldestUnusedSince}, + State = #fhc_state { open_count = Count, + counts = Counts, + elders = Elders }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {noreply, process_pending( - ensure_mref(Pid, State #fhc_state { open_count = Count - 1, - elders = Elders1 }))}; + Counts1 = update_counts(open, Pid, -1, Counts), + {noreply, process_pending(State #fhc_state { open_count = Count - 1, + counts = Counts1, + elders = Elders1 })}; + +handle_cast({transfer, FromPid, ToPid}, State) -> + State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State), + Counts1 = update_counts(obtain, FromPid, -1, Counts), + Counts2 = update_counts(obtain, ToPid, +1, Counts1), + {noreply, process_pending(State1 #fhc_state { counts = Counts2 })}; handle_cast(check_counts, State) -> {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), {noreply, State1}. -handle_info({'DOWN', MRef, process, Pid, _Reason}, State = - #fhc_state { obtain_count = Count, callbacks = Callbacks, - client_mrefs = ClientMRefs, elders = Elders }) -> - {noreply, process_pending( - case dict:find(Pid, ClientMRefs) of - {ok, MRef} -> State #fhc_state { - elders = dict:erase(Pid, Elders), - client_mrefs = dict:erase(Pid, ClientMRefs), - callbacks = dict:erase(Pid, Callbacks) }; - _ -> State #fhc_state { obtain_count = Count - 1 } - end)}. +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = + #fhc_state { open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_pending = ObtainPending, + callbacks = Callbacks, + counts = Counts, + elders = Elders }) -> + FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, + OpenPending1 = lists:filter(FilterFun, OpenPending), + ObtainPending1 = lists:filter(FilterFun, ObtainPending), + {Opened, Obtained} = dict:fetch(Pid, Counts), + {noreply, process_pending(State #fhc_state { + open_count = OpenCount - Opened, + open_pending = OpenPending1, + obtain_count = ObtainCount - Obtained, + obtain_pending = ObtainPending1, + callbacks = dict:erase(Pid, Callbacks), + counts = dict:erase(Pid, Counts), + elders = dict:erase(Pid, Elders) })}. terminate(_Reason, State) -> State. @@ -841,10 +866,10 @@ process_open(State = #fhc_state { limit = Limit, open_pending = Pending, open_count = OpenCount, obtain_count = ObtainCount }) -> - {Pending1, Inc} = - process_pending(Pending, Limit - (ObtainCount + OpenCount)), - State #fhc_state { open_pending = Pending1, - open_count = OpenCount + Inc }. + {Pending1, Inc, State1} = + process_pending(Pending, Limit - (ObtainCount + OpenCount), State), + State1 #fhc_state { open_pending = Pending1, + open_count = OpenCount + Inc }. process_obtain(State = #fhc_state { limit = Limit, obtain_pending = Pending, @@ -853,27 +878,39 @@ process_obtain(State = #fhc_state { limit = Limit, open_count = OpenCount }) -> Quota = lists:min([ObtainLimit - ObtainCount, Limit - (ObtainCount + OpenCount)]), - {Pending1, Inc} = process_pending(Pending, Quota), - State #fhc_state { obtain_pending = Pending1, - obtain_count = ObtainCount + Inc }. - -process_pending([], _Quota) -> - {[], 0}; -process_pending(Pending, Quota) when Quota =< 0 -> - {Pending, 0}; -process_pending(Pending, Quota) -> + {Pending1, Inc, State1} = process_pending(Pending, Quota, State), + State1 #fhc_state { obtain_pending = Pending1, + obtain_count = ObtainCount + Inc }. + +process_pending([], _Quota, State) -> + {[], 0, State}; +process_pending(Pending, Quota, State) when Quota =< 0 -> + {Pending, 0, State}; +process_pending(Pending, Quota, State = #fhc_state { counts = Counts }) -> PendingLen = length(Pending), SatisfiableLen = lists:min([PendingLen, Quota]), Take = PendingLen - SatisfiableLen, {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - [run_pending_item(Item) || Item <- SatisfiableRev], - {PendingNew, SatisfiableLen}. - -run_pending_item({open, From}) -> - gen_server:reply(From, ok); -run_pending_item({obtain, Pid, From}) -> - _MRef = erlang:monitor(process, Pid), - gen_server:reply(From, ok). + Counts1 = lists:foldl(fun run_pending_item1/2, Counts, SatisfiableRev), + {PendingNew, SatisfiableLen, State #fhc_state { counts = Counts1 }}. + +run_pending_item(Item, State = #fhc_state { counts = Counts }) -> + State #fhc_state { counts = run_pending_item1(Item, Counts) }. + +run_pending_item1({Kind, Pid, From}, Counts) -> + gen_server:reply(From, ok), + update_counts(Kind, Pid, +1, Counts). + +update_counts(open, Pid, Delta, Counts) -> + dict:update(Pid, fun ({Opened, Obtained}) + when Opened >= 0 andalso Obtained >= 0 -> + {Opened + Delta, Obtained} end, + Counts); +update_counts(obtain, Pid, Delta, Counts) -> + dict:update(Pid, fun ({Opened, Obtained}) + when Opened >= 0 andalso Obtained >= 0 -> + {Opened, Obtained + Delta} end, + Counts). maybe_reduce(State = #fhc_state { limit = Limit, open_count = OpenCount, @@ -948,10 +985,10 @@ ulimit() -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. -ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) -> - case dict:find(Pid, ClientMRefs) of - {ok, _MRef} -> State; - error -> MRef = erlang:monitor(process, Pid), - State #fhc_state { - client_mrefs = dict:store(Pid, MRef, ClientMRefs) } +ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> + case dict:find(Pid, Counts) of + {ok, _} -> State; + error -> _MRef = erlang:monitor(process, Pid), + State #fhc_state { + counts = dict:store(Pid, {0, 0}, Counts) } end. diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 11ce6fc5..c9809ace 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -55,7 +55,7 @@ handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(accept, State) -> - ok = file_handle_cache:obtain(self()), + ok = file_handle_cache:obtain(), accept(State); handle_cast(_Msg, State) -> @@ -84,7 +84,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% is drained. gen_event:which_handlers(error_logger), %% handle - file_handle_cache:obtain(apply(M, F, A ++ [Sock])) + file_handle_cache:transfer(apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain() catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -93,11 +94,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% accept more accept(State); + handle_info({inet_async, LSock, Ref, {error, closed}}, State=#state{sock=LSock, ref=Ref}) -> %% It would be wrong to attempt to restart the acceptor when we %% know this will fail. {stop, normal, State}; + handle_info(_Info, State) -> {noreply, State}. |