diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-22 20:56:10 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-22 20:56:10 +0100 |
commit | c7e3e3a4aa9f3ba958ec3384a39af6106a13f534 (patch) | |
tree | 289f751681f1801300d36561a2042c6ef19873c6 | |
parent | e47333131fc5c3d196fdad432cd1573ff486feb5 (diff) | |
parent | 1d2dde0107fd11dad163d28797f03d36991a2580 (diff) | |
download | rabbitmq-server-c7e3e3a4aa9f3ba958ec3384a39af6106a13f534.tar.gz |
merge bug23153 into default
-rw-r--r-- | src/file_handle_cache.erl | 333 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 27 |
2 files changed, 241 insertions, 119 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 62a8af5d..f83fa0bc 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -148,7 +148,7 @@ -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). --define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)). +-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)). -define(CLIENT_ETS_TABLE, ?MODULE). %%---------------------------------------------------------------------------- @@ -196,6 +196,13 @@ pending_closes }). +-record(pending, + { kind, + pid, + requested, + from + }). + %%---------------------------------------------------------------------------- %% Specs %%---------------------------------------------------------------------------- @@ -261,9 +268,9 @@ open(Path, Mode, Options) -> IsWriter = is_writer(Mode1), case IsWriter andalso HasWriter of true -> {error, writer_exists}; - false -> Ref = make_ref(), - case open1(Path1, Mode1, Options, Ref, bof, new) of - {ok, _Handle} -> + false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options), + case get_or_reopen([{Ref, new}]) of + {ok, [_Handle1]} -> RCount1 = case is_reader(Mode1) of true -> RCount + 1; false -> RCount @@ -274,6 +281,7 @@ open(Path, Mode, Options) -> has_writer = HasWriter1 }), {ok, Ref}; Error -> + erase({Ref, fhc_handle}), Error end end. @@ -444,8 +452,8 @@ set_maximum_since_use(MaximumAge) -> case lists:foldl( fun ({{Ref, fhc_handle}, Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> - Age = timer:now_diff(Now, Then), - case Hdl =/= closed andalso Age >= MaximumAge of + case Hdl =/= closed andalso + timer:now_diff(Now, Then) >= MaximumAge of true -> soft_close(Ref, Handle) orelse Rep; false -> Rep end; @@ -483,18 +491,9 @@ append_to_write(Mode) -> end. with_handles(Refs, Fun) -> - ResHandles = lists:foldl( - fun (Ref, {ok, HandlesAcc}) -> - case get_or_reopen(Ref) of - {ok, Handle} -> {ok, [Handle | HandlesAcc]}; - Error -> Error - end; - (_Ref, Error) -> - Error - end, {ok, []}, Refs), - case ResHandles of + case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of {ok, Handles} -> - case Fun(lists:reverse(Handles)) of + case Fun(Handles) of {Result, Handles1} when is_list(Handles1) -> lists:zipwith(fun put_handle/2, Refs, Handles1), Result; @@ -523,17 +522,80 @@ with_flushed_handles(Refs, Fun) -> end end). -get_or_reopen(Ref) -> - case get({Ref, fhc_handle}) of - undefined -> - {error, not_open, Ref}; - #handle { hdl = closed, offset = Offset, - path = Path, mode = Mode, options = Options } -> - open1(Path, Mode, Options, Ref, Offset, reopen); - Handle -> - {ok, Handle} +get_or_reopen(RefNewOrReopens) -> + case partition_handles(RefNewOrReopens) of + {OpenHdls, []} -> + {ok, [Handle || {_Ref, Handle} <- OpenHdls]}; + {OpenHdls, ClosedHdls} -> + Oldest = oldest(get_age_tree(), fun () -> now() end), + case gen_server:call(?SERVER, {open, self(), length(ClosedHdls), + Oldest}, infinity) of + ok -> + case reopen(ClosedHdls) of + {ok, RefHdls} -> sort_handles(RefNewOrReopens, + OpenHdls, RefHdls, []); + Error -> Error + end; + close -> + [soft_close(Ref, Handle) || + {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <- + get(), + Hdl =/= closed], + get_or_reopen(RefNewOrReopens) + end end. +reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []). + +reopen([], Tree, RefHdls) -> + put_age_tree(Tree), + {ok, lists:reverse(RefHdls)}; +reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, + path = Path, + mode = Mode, + offset = Offset, + last_used_at = undefined }} | + RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> + case file:open(Path, case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end) of + {ok, Hdl} -> + Now = now(), + {{ok, Offset1}, Handle1} = + maybe_seek(Offset, Handle #handle { hdl = Hdl, + offset = 0, + last_used_at = Now }), + Handle2 = Handle1 #handle { trusted_offset = Offset1 }, + put({Ref, fhc_handle}, Handle2), + reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), + [{Ref, Handle2} | RefHdls]); + Error -> + %% NB: none of the handles in ToOpen are in the age tree + Oldest = oldest(Tree, fun () -> undefined end), + [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen], + put_age_tree(Tree), + Error + end. + +partition_handles(RefNewOrReopens) -> + lists:foldr( + fun ({Ref, NewOrReopen}, {Open, Closed}) -> + case get({Ref, fhc_handle}) of + #handle { hdl = closed } = Handle -> + {Open, [{Ref, NewOrReopen, Handle} | Closed]}; + #handle {} = Handle -> + {[{Ref, Handle} | Open], Closed} + end + end, {[], []}, RefNewOrReopens). + +sort_handles([], [], [], Acc) -> + {ok, lists:reverse(Acc)}; +sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) -> + sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]); +sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) -> + sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]). + put_handle(Ref, Handle = #handle { last_used_at = Then }) -> Now = now(), age_tree_update(Then, Now, Ref), @@ -549,20 +611,6 @@ get_age_tree() -> put_age_tree(Tree) -> put(fhc_age_tree, Tree). -age_tree_insert(Now, Ref) -> - Tree = get_age_tree(), - Tree1 = gb_trees:insert(Now, Ref, Tree), - {Oldest, _Ref} = gb_trees:smallest(Tree1), - case gen_server:call(?SERVER, {open, self(), Oldest}, infinity) of - ok -> - put_age_tree(Tree1); - close -> - [soft_close(Ref1, Handle1) || - {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(), - Hdl1 =/= closed], - age_tree_insert(Now, Ref) - end. - age_tree_update(Then, Now, Ref) -> with_age_tree( fun (Tree) -> @@ -573,13 +621,7 @@ age_tree_delete(Then) -> with_age_tree( fun (Tree) -> Tree1 = gb_trees:delete_any(Then, Tree), - Oldest = case gb_trees:is_empty(Tree1) of - true -> - undefined; - false -> - {Oldest1, _Ref} = gb_trees:smallest(Tree1), - Oldest1 - end, + Oldest = oldest(Tree1, fun () -> undefined end), gen_server:cast(?SERVER, {close, self(), Oldest}), Tree1 end). @@ -595,44 +637,37 @@ age_tree_change() -> Tree end). -open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> - Mode1 = case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end, - Now = now(), - age_tree_insert(Now, Ref), - case file:open(Path, Mode1) of - {ok, Hdl} -> - WriteBufferSize = - case proplists:get_value(write_buffer, Options, unbuffered) of - unbuffered -> 0; - infinity -> infinity; - N when is_integer(N) -> N - end, - Handle = #handle { hdl = Hdl, - offset = 0, - trusted_offset = 0, - is_dirty = false, - write_buffer_size = 0, - write_buffer_size_limit = WriteBufferSize, - write_buffer = [], - at_eof = false, - path = Path, - mode = Mode, - options = Options, - is_write = is_writer(Mode), - is_read = is_reader(Mode), - last_used_at = Now }, - {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle), - Handle2 = Handle1 #handle { trusted_offset = Offset1 }, - put({Ref, fhc_handle}, Handle2), - {ok, Handle2}; - {error, Reason} -> - age_tree_delete(Now), - {error, Reason} +oldest(Tree, DefaultFun) -> + case gb_trees:is_empty(Tree) of + true -> DefaultFun(); + false -> {Oldest, _Ref} = gb_trees:smallest(Tree), + Oldest end. +new_closed_handle(Path, Mode, Options) -> + WriteBufferSize = + case proplists:get_value(write_buffer, Options, unbuffered) of + unbuffered -> 0; + infinity -> infinity; + N when is_integer(N) -> N + end, + Ref = make_ref(), + put({Ref, fhc_handle}, #handle { hdl = closed, + offset = 0, + trusted_offset = 0, + is_dirty = false, + write_buffer_size = 0, + write_buffer_size_limit = WriteBufferSize, + write_buffer = [], + at_eof = false, + path = Path, + mode = Mode, + options = Options, + is_write = is_writer(Mode), + is_read = is_reader(Mode), + last_used_at = undefined }), + {ok, Ref}. + soft_close(Ref, Handle) -> {Res, Handle1} = soft_close(Handle), case Res of @@ -646,7 +681,9 @@ soft_close(Handle = #handle { hdl = closed }) -> {ok, Handle}; soft_close(Handle) -> case write_buffer(Handle) of - {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty, + {ok, #handle { hdl = Hdl, + offset = Offset, + is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of true -> file:sync(Hdl); @@ -654,8 +691,10 @@ soft_close(Handle) -> end, ok = file:close(Hdl), age_tree_delete(Then), - {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset, - is_dirty = false }}; + {ok, Handle1 #handle { hdl = closed, + trusted_offset = Offset, + is_dirty = false, + last_used_at = undefined }}; {_Error, _Handle} = Result -> Result end. @@ -751,34 +790,38 @@ init([]) -> {ok, #fhc_state { elders = dict:new(), limit = Limit, open_count = 0, - open_pending = [], + open_pending = pending_new(), obtain_limit = ObtainLimit, obtain_count = 0, - obtain_pending = [], + obtain_pending = pending_new(), clients = Clients, timer_ref = undefined }}. -handle_call({open, Pid, EldestUnusedSince}, From, +handle_call({open, Pid, Requested, EldestUnusedSince}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders, clients = Clients }) when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - Item = {open, Pid, From}, + Item = #pending { kind = open, + pid = Pid, + requested = Requested, + from = From }, ok = track_client(Pid, Clients), State1 = State #fhc_state { elders = Elders1 }, - case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of + case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of true -> case ets:lookup(Clients, Pid) of [#cstate { opened = 0 }] -> true = ets:update_element( Clients, Pid, {#cstate.blocked, true}), {noreply, reduce(State1 #fhc_state { - open_pending = [Item | Pending] })}; - [#cstate { opened = N }] -> + open_pending = pending_in(Item, Pending) })}; + [#cstate { opened = Opened }] -> true = ets:update_element( - Clients, Pid, {#cstate.pending_closes, N}), + Clients, Pid, + {#cstate.pending_closes, Opened}), {reply, close, State1} end; false -> {noreply, run_pending_item(Item, State1)} @@ -791,18 +834,18 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, when Limit =/= infinity andalso Count >= Limit -> ok = track_client(Pid, Clients), true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - Item = {obtain, Pid, From}, - {noreply, State #fhc_state { obtain_pending = [Item | Pending] }}; + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, clients = Clients }) -> - Item = {obtain, Pid, From}, + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, ok = track_client(Pid, Clients), case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of true -> true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - {noreply, - reduce(State #fhc_state {obtain_pending = [Item | Pending] })}; + {noreply, reduce(State #fhc_state { + obtain_pending = pending_in(Item, Pending) })}; false -> {noreply, run_pending_item(Item, State)} end; @@ -855,18 +898,16 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, obtain_count = ObtainCount, obtain_pending = ObtainPending, clients = Clients }) -> - FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, - OpenPending1 = lists:filter(FilterFun, OpenPending), - ObtainPending1 = lists:filter(FilterFun, ObtainPending), [#cstate { opened = Opened, obtained = Obtained }] = ets:lookup(Clients, Pid), true = ets:delete(Clients, Pid), + FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, {noreply, process_pending( State #fhc_state { open_count = OpenCount - Opened, - open_pending = OpenPending1, + open_pending = filter_pending(FilterFun, OpenPending), obtain_count = ObtainCount - Obtained, - obtain_pending = ObtainPending1, + obtain_pending = filter_pending(FilterFun, ObtainPending), elders = dict:erase(Pid, Elders) })}. terminate(_Reason, State = #fhc_state { clients = Clients }) -> @@ -877,11 +918,58 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- +%% pending queue abstraction helpers +%%---------------------------------------------------------------------------- + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. + +filter_pending(Fun, {Count, Queue}) -> + {Delta, Queue1} = + queue_fold(fun (Item, {DeltaN, QueueN}) -> + case Fun(Item) of + true -> {DeltaN, queue:in(Item, QueueN)}; + false -> {DeltaN - requested(Item), QueueN} + end + end, {0, queue:new()}, Queue), + {Count + Delta, Queue1}. + +pending_new() -> + {0, queue:new()}. + +pending_in(Item = #pending { requested = Requested }, {Count, Queue}) -> + {Count + Requested, queue:in(Item, Queue)}. + +pending_out({0, _Queue} = Pending) -> + {empty, Pending}; +pending_out({N, Queue}) -> + {{value, #pending { requested = Requested }} = Result, Queue1} = + queue:out(Queue), + {Result, {N - Requested, Queue1}}. + +pending_count({Count, _Queue}) -> + Count. + +pending_is_empty({0, _Queue}) -> + true; +pending_is_empty({_N, _Queue}) -> + false. + +%%---------------------------------------------------------------------------- %% server helpers %%---------------------------------------------------------------------------- obtain_limit(infinity) -> infinity; -obtain_limit(Limit) -> ?OBTAIN_LIMIT(Limit). +obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of + OLimit when OLimit < 0 -> 0; + OLimit -> OLimit + end. + +requested({_Kind, _Pid, Requested, _From}) -> + Requested. process_pending(State = #fhc_state { limit = infinity }) -> State; @@ -906,22 +994,28 @@ process_obtain(State = #fhc_state { limit = Limit, {Pending1, State1} = process_pending(Pending, Quota, State), State1 #fhc_state { obtain_pending = Pending1 }. -process_pending([], _Quota, State) -> - {[], State}; process_pending(Pending, Quota, State) when Quota =< 0 -> {Pending, State}; process_pending(Pending, Quota, State) -> - PendingLen = length(Pending), - SatisfiableLen = lists:min([PendingLen, Quota]), - Take = PendingLen - SatisfiableLen, - {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev), - {PendingNew, State1}. - -run_pending_item({Kind, Pid, From}, State = #fhc_state { clients = Clients }) -> + case pending_out(Pending) of + {empty, _Pending} -> + {Pending, State}; + {{value, #pending { requested = Requested }}, _Pending1} + when Requested > Quota -> + {Pending, State}; + {{value, #pending { requested = Requested } = Item}, Pending1} -> + process_pending(Pending1, Quota - Requested, + run_pending_item(Item, State)) + end. + +run_pending_item(#pending { kind = Kind, + pid = Pid, + requested = Requested, + from = From }, + State = #fhc_state { clients = Clients }) -> gen_server:reply(From, ok), true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), - update_counts(Kind, Pid, +1, State). + update_counts(Kind, Pid, Requested, State). update_counts(Kind, Pid, Delta, State = #fhc_state { open_count = OpenCount, @@ -952,9 +1046,9 @@ needs_reduce(#fhc_state { limit = Limit, obtain_pending = ObtainPending }) -> Limit =/= infinity andalso ((OpenCount + ObtainCount > Limit) - orelse (OpenPending =/= []) + orelse (not pending_is_empty(OpenPending)) orelse (ObtainCount < ObtainLimit - andalso ObtainPending =/= [])). + andalso not pending_is_empty(ObtainPending))). reduce(State = #fhc_state { open_pending = OpenPending, obtain_pending = ObtainPending, @@ -983,7 +1077,8 @@ reduce(State = #fhc_state { open_pending = OpenPending, notify_age(CStates, AverageAge); _ -> notify_age0(Clients, CStates, - length(OpenPending) + length(ObtainPending)) + pending_count(OpenPending) + + pending_count(ObtainPending)) end end, case TRef of diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3c90fefa..082e7877 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -56,6 +56,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> application:set_env(rabbit, file_handles_high_watermark, 10, infinity), ok = file_handle_cache:set_limit(10), + passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_priority_queue(), passed = test_bpqueue(), @@ -1416,6 +1417,32 @@ extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> test_supervisor_delayed_restart() -> test_sup:test_supervisor_delayed_restart(). +test_file_handle_cache() -> + %% test copying when there is just one spare handle + Limit = file_handle_cache:get_limit(), + ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores + TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"), + ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")), + Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open( + filename:join(TmpDir, "file3"), + [write], []), + receive close -> ok end, + file_handle_cache:delete(Hdl) + end), + Src = filename:join(TmpDir, "file1"), + Dst = filename:join(TmpDir, "file2"), + Content = <<"foo">>, + ok = file:write_file(Src, Content), + {ok, SrcHdl} = file_handle_cache:open(Src, [read], []), + {ok, DstHdl} = file_handle_cache:open(Dst, [write], []), + Size = size(Content), + {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size), + ok = file_handle_cache:delete(SrcHdl), + file_handle_cache:delete(DstHdl), + Pid ! close, + ok = file_handle_cache:set_limit(Limit), + passed. + test_backing_queue() -> case application:get_env(rabbit, backing_queue_module) of {ok, rabbit_variable_queue} -> |