diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-17 12:54:57 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-17 12:54:57 +0100 |
commit | 64d7eebf985987a10eaeaef3f8e8111b1f9cd5bf (patch) | |
tree | 257d1e863c0fb0cfcf671bf2998b1950e888287e | |
parent | 8ccf027aee330024c6c626d485798fc796a6dd6e (diff) | |
parent | c9ecb104b823054dfda5333d2aa8f6f019882d1d (diff) | |
download | rabbitmq-server-64d7eebf985987a10eaeaef3f8e8111b1f9cd5bf.tar.gz |
merge bug23132 into default
-rw-r--r-- | src/file_handle_cache.erl | 241 |
1 files changed, 165 insertions, 76 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 8db5a794..19b2654f 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -148,6 +148,8 @@ -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). +-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)). + %%---------------------------------------------------------------------------- -record(file, @@ -175,8 +177,11 @@ -record(fhc_state, { elders, limit, - count, - obtains, + open_count, + open_pending, + obtain_limit, + obtain_count, + obtain_pending, callbacks, client_mrefs, timer_ref @@ -308,7 +313,7 @@ append(Ref, Data) -> Size1 = Size + iolist_size(Data), Handle2 = Handle1 #handle { write_buffer = WriteBuffer1, write_buffer_size = Size1 }, - case Limit /= infinity andalso Size1 > Limit of + case Limit =/= infinity andalso Size1 > Limit of true -> {Result, Handle3} = write_buffer(Handle2), {Result, [Handle3]}; false -> {ok, [Handle2]} @@ -429,28 +434,22 @@ set_maximum_since_use(MaximumAge) -> fun ({{Ref, fhc_handle}, Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> Age = timer:now_diff(Now, Then), - case Hdl /= closed andalso Age >= MaximumAge of - true -> {Res, Handle1} = soft_close(Handle), - case Res of - ok -> put({Ref, fhc_handle}, Handle1), - false; - _ -> put_handle(Ref, Handle1), - Rep - end; + case Hdl =/= closed andalso Age >= MaximumAge of + true -> soft_close(Ref, Handle) orelse Rep; false -> Rep end; (_KeyValuePair, Rep) -> Rep - end, true, get()) of - true -> age_tree_change(), ok; - false -> ok + end, false, get()) of + false -> age_tree_change(), ok; + true -> ok end. release_on_death(Pid) when is_pid(Pid) -> gen_server:cast(?SERVER, {release_on_death, Pid}). obtain() -> - gen_server:call(?SERVER, obtain, infinity). + gen_server:call(?SERVER, {obtain, self()}, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -523,20 +522,30 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) -> age_tree_update(Then, Now, Ref), put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). -with_age_tree(Fun) -> - put(fhc_age_tree, Fun(case get(fhc_age_tree) of - undefined -> gb_trees:empty(); - AgeTree -> AgeTree - end)). +with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())). + +get_age_tree() -> + case get(fhc_age_tree) of + undefined -> gb_trees:empty(); + AgeTree -> AgeTree + end. + +put_age_tree(Tree) -> put(fhc_age_tree, Tree). age_tree_insert(Now, Ref) -> - with_age_tree( - fun (Tree) -> - Tree1 = gb_trees:insert(Now, Ref, Tree), - {Oldest, _Ref} = gb_trees:smallest(Tree1), - gen_server:cast(?SERVER, {open, self(), Oldest}), - Tree1 - end). + Tree = get_age_tree(), + Tree1 = gb_trees:insert(Now, Ref, Tree), + {Oldest, _Ref} = gb_trees:smallest(Tree1), + case gen_server:call(?SERVER, {open, self(), Oldest, + not gb_trees:is_empty(Tree)}, infinity) of + ok -> + put_age_tree(Tree1); + close -> + [soft_close(Ref1, Handle1) || + {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(), + Hdl1 =/= closed], + age_tree_insert(Now, Ref) + end. age_tree_update(Then, Now, Ref) -> with_age_tree( @@ -575,6 +584,8 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> new -> Mode; reopen -> [read | Mode] end, + Now = now(), + age_tree_insert(Now, Ref), case file:open(Path, Mode1) of {ok, Hdl} -> WriteBufferSize = @@ -583,7 +594,6 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> infinity -> infinity; N when is_integer(N) -> N end, - Now = now(), Handle = #handle { hdl = Hdl, offset = 0, trusted_offset = 0, @@ -601,12 +611,21 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle), Handle2 = Handle1 #handle { trusted_offset = Offset1 }, put({Ref, fhc_handle}, Handle2), - age_tree_insert(Now, Ref), {ok, Handle2}; {error, Reason} -> + age_tree_delete(Now), {error, Reason} end. +soft_close(Ref, Handle) -> + {Res, Handle1} = soft_close(Handle), + case Res of + ok -> put({Ref, fhc_handle}, Handle1), + true; + _ -> put_handle(Ref, Handle1), + false + end. + soft_close(Handle = #handle { hdl = closed }) -> {ok, Handle}; soft_close(Handle) -> @@ -709,18 +728,60 @@ init([]) -> _ -> ulimit() end, - error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), - {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, - obtains = [], callbacks = dict:new(), - client_mrefs = dict:new(), timer_ref = undefined }}. - -handle_call(obtain, From, State = #fhc_state { count = Count }) -> - State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = - maybe_reduce(State #fhc_state { count = Count + 1 }), - case Limit /= infinity andalso Count1 >= Limit of - true -> {noreply, State1 #fhc_state { obtains = [From | Obtains], - count = Count1 - 1 }}; - false -> {reply, ok, State1} + ObtainLimit = case Limit of + infinity -> infinity; + _ -> ?OBTAIN_LIMIT(Limit) + end, + error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", + [Limit, ObtainLimit]), + {ok, #fhc_state { elders = dict:new(), + limit = Limit, + open_count = 0, + open_pending = [], + obtain_limit = ObtainLimit, + obtain_count = 0, + obtain_pending = [], + callbacks = dict:new(), + client_mrefs = dict:new(), + timer_ref = undefined }}. + +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, + obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) + when Limit =/= infinity andalso Count >= Limit -> + {noreply, State #fhc_state { obtain_pending = [From | Pending], + elders = dict:erase(Pid, Elders) }}; +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) -> + case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of + {true, State1} -> + {noreply, State1 #fhc_state { obtain_count = Count, + obtain_pending = [From | Pending], + elders = dict:erase(Pid, Elders) }}; + {false, State1} -> + {reply, ok, State1} + end; + +handle_call({open, Pid, EldestUnusedSince, CanClose}, From, + State = #fhc_state { open_count = Count, + open_pending = Pending, + elders = Elders }) -> + Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + case maybe_reduce( + ensure_mref(Pid, State #fhc_state { open_count = Count + 1, + elders = Elders1 })) of + {true, State1} -> + State2 = State1 #fhc_state { open_count = Count }, + case CanClose of + true -> {reply, close, State2}; + false -> {noreply, State2 #fhc_state { + open_pending = [From | Pending], + elders = dict:erase(Pid, Elders1) }} + end; + {false, State1} -> + {reply, ok, State1} end. handle_cast({register_callback, Pid, MFA}, @@ -729,47 +790,41 @@ handle_cast({register_callback, Pid, MFA}, Pid, State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) })}; -handle_cast({open, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, count = Count }) -> - Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - {noreply, maybe_reduce( - ensure_mref(Pid, State #fhc_state { elders = Elders1, - count = Count + 1 }))}; - handle_cast({update, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders }) -> + #fhc_state { elders = Elders }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, count = Count }) -> + #fhc_state { elders = Elders, open_count = Count }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {noreply, process_obtains( - ensure_mref(Pid, State #fhc_state { elders = Elders1, - count = Count - 1 }))}; + {noreply, process_pending( + ensure_mref(Pid, State #fhc_state { open_count = Count - 1, + elders = Elders1 }))}; handle_cast(check_counts, State) -> - {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; + {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), + {noreply, State1}; handle_cast({release_on_death, Pid}, State) -> _MRef = erlang:monitor(process, Pid), {noreply, State}. handle_info({'DOWN', MRef, process, Pid, _Reason}, State = - #fhc_state { count = Count, callbacks = Callbacks, + #fhc_state { obtain_count = Count, callbacks = Callbacks, client_mrefs = ClientMRefs, elders = Elders }) -> - {noreply, process_obtains( + {noreply, process_pending( case dict:find(Pid, ClientMRefs) of {ok, MRef} -> State #fhc_state { elders = dict:erase(Pid, Elders), client_mrefs = dict:erase(Pid, ClientMRefs), callbacks = dict:erase(Pid, Callbacks) }; - _ -> State #fhc_state { count = Count - 1 } + _ -> State #fhc_state { obtain_count = Count - 1 } end)}. terminate(_Reason, State) -> @@ -782,23 +837,56 @@ code_change(_OldVsn, State, _Extra) -> %% server helpers %%---------------------------------------------------------------------------- -process_obtains(State = #fhc_state { obtains = [] }) -> +process_pending(State = #fhc_state { limit = infinity }) -> State; -process_obtains(State = #fhc_state { limit = Limit, count = Count }) - when Limit /= infinity andalso Count >= Limit -> - State; -process_obtains(State = #fhc_state { limit = Limit, count = Count, - obtains = Obtains }) -> - ObtainsLen = length(Obtains), - ObtainableLen = lists:min([ObtainsLen, Limit - Count]), - Take = ObtainsLen - ObtainableLen, - {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains), - [gen_server:reply(From, ok) || From <- ObtainableRev], - State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. - -maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, - callbacks = Callbacks, timer_ref = TRef }) - when Limit /= infinity andalso Count >= Limit -> +process_pending(State) -> + process_obtain(process_open(State)). + +process_open(State = #fhc_state { limit = Limit, + open_pending = Pending, + open_count = OpenCount, + obtain_count = ObtainCount }) -> + {Pending1, Inc} = + process_pending(Pending, Limit - (ObtainCount + OpenCount)), + State #fhc_state { open_pending = Pending1, + open_count = OpenCount + Inc }. + +process_obtain(State = #fhc_state { limit = Limit, + obtain_pending = Pending, + obtain_limit = ObtainLimit, + obtain_count = ObtainCount, + open_count = OpenCount }) -> + Quota = lists:min([ObtainLimit - ObtainCount, + Limit - (ObtainCount + OpenCount)]), + {Pending1, Inc} = process_pending(Pending, Quota), + State #fhc_state { obtain_pending = Pending1, + obtain_count = ObtainCount + Inc }. + +process_pending([], _Quota) -> + {[], 0}; +process_pending(Pending, Quota) when Quota =< 0 -> + {Pending, 0}; +process_pending(Pending, Quota) -> + PendingLen = length(Pending), + SatisfiableLen = lists:min([PendingLen, Quota]), + Take = PendingLen - SatisfiableLen, + {PendingNew, SatisfiableRev} = lists:split(Take, Pending), + [gen_server:reply(From, ok) || From <- SatisfiableRev], + {PendingNew, SatisfiableLen}. + +maybe_reduce(State = #fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_limit = ObtainLimit, + obtain_pending = ObtainPending, + elders = Elders, + callbacks = Callbacks, + timer_ref = TRef }) + when Limit =/= infinity andalso + (((OpenCount + ObtainCount) > Limit) orelse + (OpenPending =/= []) orelse + (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) -> Now = now(), {Pids, Sum, ClientCount} = dict:fold(fun (_Pid, undefined, Accs) -> @@ -818,15 +906,16 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, end end, Pids) end, + AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - State #fhc_state { timer_ref = TRef1 }; - _ -> State + {AboveLimit, State #fhc_state { timer_ref = TRef1 }}; + _ -> {AboveLimit, State} end; maybe_reduce(State) -> - State. + {false, State}. %% For all unices, assume ulimit exists. Further googling suggests %% that BSDs (incl OS X), solaris and linux all agree that ulimit -n |