diff options
authorMatthias Radestock <>2010-08-22 20:56:10 +0100
committerMatthias Radestock <>2010-08-22 20:56:10 +0100
commitc7e3e3a4aa9f3ba958ec3384a39af6106a13f534 (patch)
parente47333131fc5c3d196fdad432cd1573ff486feb5 (diff)
parent1d2dde0107fd11dad163d28797f03d36991a2580 (diff)
merge bug23153 into default
2 files changed, 241 insertions, 119 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 62a8af5d..f83fa0bc 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -148,7 +148,7 @@
--define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)).
+-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)).
@@ -196,6 +196,13 @@
+ { kind,
+ pid,
+ requested,
+ from
+ }).
%% Specs
@@ -261,9 +268,9 @@ open(Path, Mode, Options) ->
IsWriter = is_writer(Mode1),
case IsWriter andalso HasWriter of
true -> {error, writer_exists};
- false -> Ref = make_ref(),
- case open1(Path1, Mode1, Options, Ref, bof, new) of
- {ok, _Handle} ->
+ false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options),
+ case get_or_reopen([{Ref, new}]) of
+ {ok, [_Handle1]} ->
RCount1 = case is_reader(Mode1) of
true -> RCount + 1;
false -> RCount
@@ -274,6 +281,7 @@ open(Path, Mode, Options) ->
has_writer = HasWriter1 }),
{ok, Ref};
Error ->
+ erase({Ref, fhc_handle}),
@@ -444,8 +452,8 @@ set_maximum_since_use(MaximumAge) ->
case lists:foldl(
fun ({{Ref, fhc_handle},
Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
- Age = timer:now_diff(Now, Then),
- case Hdl =/= closed andalso Age >= MaximumAge of
+ case Hdl =/= closed andalso
+ timer:now_diff(Now, Then) >= MaximumAge of
true -> soft_close(Ref, Handle) orelse Rep;
false -> Rep
@@ -483,18 +491,9 @@ append_to_write(Mode) ->
with_handles(Refs, Fun) ->
- ResHandles = lists:foldl(
- fun (Ref, {ok, HandlesAcc}) ->
- case get_or_reopen(Ref) of
- {ok, Handle} -> {ok, [Handle | HandlesAcc]};
- Error -> Error
- end;
- (_Ref, Error) ->
- Error
- end, {ok, []}, Refs),
- case ResHandles of
+ case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of
{ok, Handles} ->
- case Fun(lists:reverse(Handles)) of
+ case Fun(Handles) of
{Result, Handles1} when is_list(Handles1) ->
lists:zipwith(fun put_handle/2, Refs, Handles1),
@@ -523,17 +522,80 @@ with_flushed_handles(Refs, Fun) ->
-get_or_reopen(Ref) ->
- case get({Ref, fhc_handle}) of
- undefined ->
- {error, not_open, Ref};
- #handle { hdl = closed, offset = Offset,
- path = Path, mode = Mode, options = Options } ->
- open1(Path, Mode, Options, Ref, Offset, reopen);
- Handle ->
- {ok, Handle}
+get_or_reopen(RefNewOrReopens) ->
+ case partition_handles(RefNewOrReopens) of
+ {OpenHdls, []} ->
+ {ok, [Handle || {_Ref, Handle} <- OpenHdls]};
+ {OpenHdls, ClosedHdls} ->
+ Oldest = oldest(get_age_tree(), fun () -> now() end),
+ case gen_server:call(?SERVER, {open, self(), length(ClosedHdls),
+ Oldest}, infinity) of
+ ok ->
+ case reopen(ClosedHdls) of
+ {ok, RefHdls} -> sort_handles(RefNewOrReopens,
+ OpenHdls, RefHdls, []);
+ Error -> Error
+ end;
+ close ->
+ [soft_close(Ref, Handle) ||
+ {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <-
+ get(),
+ Hdl =/= closed],
+ get_or_reopen(RefNewOrReopens)
+ end
+reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []).
+reopen([], Tree, RefHdls) ->
+ put_age_tree(Tree),
+ {ok, lists:reverse(RefHdls)};
+reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
+ path = Path,
+ mode = Mode,
+ offset = Offset,
+ last_used_at = undefined }} |
+ RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
+ case file:open(Path, case NewOrReopen of
+ new -> Mode;
+ reopen -> [read | Mode]
+ end) of
+ {ok, Hdl} ->
+ Now = now(),
+ {{ok, Offset1}, Handle1} =
+ maybe_seek(Offset, Handle #handle { hdl = Hdl,
+ offset = 0,
+ last_used_at = Now }),
+ Handle2 = Handle1 #handle { trusted_offset = Offset1 },
+ put({Ref, fhc_handle}, Handle2),
+ reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
+ [{Ref, Handle2} | RefHdls]);
+ Error ->
+ %% NB: none of the handles in ToOpen are in the age tree
+ Oldest = oldest(Tree, fun () -> undefined end),
+ [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
+ put_age_tree(Tree),
+ Error
+ end.
+partition_handles(RefNewOrReopens) ->
+ lists:foldr(
+ fun ({Ref, NewOrReopen}, {Open, Closed}) ->
+ case get({Ref, fhc_handle}) of
+ #handle { hdl = closed } = Handle ->
+ {Open, [{Ref, NewOrReopen, Handle} | Closed]};
+ #handle {} = Handle ->
+ {[{Ref, Handle} | Open], Closed}
+ end
+ end, {[], []}, RefNewOrReopens).
+sort_handles([], [], [], Acc) ->
+ {ok, lists:reverse(Acc)};
+sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) ->
+ sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]);
+sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) ->
+ sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]).
put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
Now = now(),
age_tree_update(Then, Now, Ref),
@@ -549,20 +611,6 @@ get_age_tree() ->
put_age_tree(Tree) -> put(fhc_age_tree, Tree).
-age_tree_insert(Now, Ref) ->
- Tree = get_age_tree(),
- Tree1 = gb_trees:insert(Now, Ref, Tree),
- {Oldest, _Ref} = gb_trees:smallest(Tree1),
- case gen_server:call(?SERVER, {open, self(), Oldest}, infinity) of
- ok ->
- put_age_tree(Tree1);
- close ->
- [soft_close(Ref1, Handle1) ||
- {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(),
- Hdl1 =/= closed],
- age_tree_insert(Now, Ref)
- end.
age_tree_update(Then, Now, Ref) ->
fun (Tree) ->
@@ -573,13 +621,7 @@ age_tree_delete(Then) ->
fun (Tree) ->
Tree1 = gb_trees:delete_any(Then, Tree),
- Oldest = case gb_trees:is_empty(Tree1) of
- true ->
- undefined;
- false ->
- {Oldest1, _Ref} = gb_trees:smallest(Tree1),
- Oldest1
- end,
+ Oldest = oldest(Tree1, fun () -> undefined end),
gen_server:cast(?SERVER, {close, self(), Oldest}),
@@ -595,44 +637,37 @@ age_tree_change() ->
-open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
- Mode1 = case NewOrReopen of
- new -> Mode;
- reopen -> [read | Mode]
- end,
- Now = now(),
- age_tree_insert(Now, Ref),
- case file:open(Path, Mode1) of
- {ok, Hdl} ->
- WriteBufferSize =
- case proplists:get_value(write_buffer, Options, unbuffered) of
- unbuffered -> 0;
- infinity -> infinity;
- N when is_integer(N) -> N
- end,
- Handle = #handle { hdl = Hdl,
- offset = 0,
- trusted_offset = 0,
- is_dirty = false,
- write_buffer_size = 0,
- write_buffer_size_limit = WriteBufferSize,
- write_buffer = [],
- at_eof = false,
- path = Path,
- mode = Mode,
- options = Options,
- is_write = is_writer(Mode),
- is_read = is_reader(Mode),
- last_used_at = Now },
- {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle),
- Handle2 = Handle1 #handle { trusted_offset = Offset1 },
- put({Ref, fhc_handle}, Handle2),
- {ok, Handle2};
- {error, Reason} ->
- age_tree_delete(Now),
- {error, Reason}
+oldest(Tree, DefaultFun) ->
+ case gb_trees:is_empty(Tree) of
+ true -> DefaultFun();
+ false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
+ Oldest
+new_closed_handle(Path, Mode, Options) ->
+ WriteBufferSize =
+ case proplists:get_value(write_buffer, Options, unbuffered) of
+ unbuffered -> 0;
+ infinity -> infinity;
+ N when is_integer(N) -> N
+ end,
+ Ref = make_ref(),
+ put({Ref, fhc_handle}, #handle { hdl = closed,
+ offset = 0,
+ trusted_offset = 0,
+ is_dirty = false,
+ write_buffer_size = 0,
+ write_buffer_size_limit = WriteBufferSize,
+ write_buffer = [],
+ at_eof = false,
+ path = Path,
+ mode = Mode,
+ options = Options,
+ is_write = is_writer(Mode),
+ is_read = is_reader(Mode),
+ last_used_at = undefined }),
+ {ok, Ref}.
soft_close(Ref, Handle) ->
{Res, Handle1} = soft_close(Handle),
case Res of
@@ -646,7 +681,9 @@ soft_close(Handle = #handle { hdl = closed }) ->
{ok, Handle};
soft_close(Handle) ->
case write_buffer(Handle) of
- {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty,
+ {ok, #handle { hdl = Hdl,
+ offset = Offset,
+ is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
true -> file:sync(Hdl);
@@ -654,8 +691,10 @@ soft_close(Handle) ->
ok = file:close(Hdl),
- {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset,
- is_dirty = false }};
+ {ok, Handle1 #handle { hdl = closed,
+ trusted_offset = Offset,
+ is_dirty = false,
+ last_used_at = undefined }};
{_Error, _Handle} = Result ->
@@ -751,34 +790,38 @@ init([]) ->
{ok, #fhc_state { elders = dict:new(),
limit = Limit,
open_count = 0,
- open_pending = [],
+ open_pending = pending_new(),
obtain_limit = ObtainLimit,
obtain_count = 0,
- obtain_pending = [],
+ obtain_pending = pending_new(),
clients = Clients,
timer_ref = undefined }}.
-handle_call({open, Pid, EldestUnusedSince}, From,
+handle_call({open, Pid, Requested, EldestUnusedSince}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
elders = Elders,
clients = Clients })
when EldestUnusedSince =/= undefined ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- Item = {open, Pid, From},
+ Item = #pending { kind = open,
+ pid = Pid,
+ requested = Requested,
+ from = From },
ok = track_client(Pid, Clients),
State1 = State #fhc_state { elders = Elders1 },
- case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of
+ case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of
true -> case ets:lookup(Clients, Pid) of
[#cstate { opened = 0 }] ->
true = ets:update_element(
Clients, Pid, {#cstate.blocked, true}),
reduce(State1 #fhc_state {
- open_pending = [Item | Pending] })};
- [#cstate { opened = N }] ->
+ open_pending = pending_in(Item, Pending) })};
+ [#cstate { opened = Opened }] ->
true = ets:update_element(
- Clients, Pid, {#cstate.pending_closes, N}),
+ Clients, Pid,
+ {#cstate.pending_closes, Opened}),
{reply, close, State1}
false -> {noreply, run_pending_item(Item, State1)}
@@ -791,18 +834,18 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
when Limit =/= infinity andalso Count >= Limit ->
ok = track_client(Pid, Clients),
true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- Item = {obtain, Pid, From},
- {noreply, State #fhc_state { obtain_pending = [Item | Pending] }};
+ Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
clients = Clients }) ->
- Item = {obtain, Pid, From},
+ Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
ok = track_client(Pid, Clients),
case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
true ->
true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- {noreply,
- reduce(State #fhc_state {obtain_pending = [Item | Pending] })};
+ {noreply, reduce(State #fhc_state {
+ obtain_pending = pending_in(Item, Pending) })};
false ->
{noreply, run_pending_item(Item, State)}
@@ -855,18 +898,16 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
obtain_count = ObtainCount,
obtain_pending = ObtainPending,
clients = Clients }) ->
- FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end,
- OpenPending1 = lists:filter(FilterFun, OpenPending),
- ObtainPending1 = lists:filter(FilterFun, ObtainPending),
[#cstate { opened = Opened, obtained = Obtained }] =
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
+ FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
{noreply, process_pending(
State #fhc_state {
open_count = OpenCount - Opened,
- open_pending = OpenPending1,
+ open_pending = filter_pending(FilterFun, OpenPending),
obtain_count = ObtainCount - Obtained,
- obtain_pending = ObtainPending1,
+ obtain_pending = filter_pending(FilterFun, ObtainPending),
elders = dict:erase(Pid, Elders) })}.
terminate(_Reason, State = #fhc_state { clients = Clients }) ->
@@ -877,11 +918,58 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+%% pending queue abstraction helpers
+queue_fold(Fun, Init, Q) ->
+ case queue:out(Q) of
+ {empty, _Q} -> Init;
+ {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
+ end.
+filter_pending(Fun, {Count, Queue}) ->
+ {Delta, Queue1} =
+ queue_fold(fun (Item, {DeltaN, QueueN}) ->
+ case Fun(Item) of
+ true -> {DeltaN, queue:in(Item, QueueN)};
+ false -> {DeltaN - requested(Item), QueueN}
+ end
+ end, {0, queue:new()}, Queue),
+ {Count + Delta, Queue1}.
+pending_new() ->
+ {0, queue:new()}.
+pending_in(Item = #pending { requested = Requested }, {Count, Queue}) ->
+ {Count + Requested, queue:in(Item, Queue)}.
+pending_out({0, _Queue} = Pending) ->
+ {empty, Pending};
+pending_out({N, Queue}) ->
+ {{value, #pending { requested = Requested }} = Result, Queue1} =
+ queue:out(Queue),
+ {Result, {N - Requested, Queue1}}.
+pending_count({Count, _Queue}) ->
+ Count.
+pending_is_empty({0, _Queue}) ->
+ true;
+pending_is_empty({_N, _Queue}) ->
+ false.
%% server helpers
obtain_limit(infinity) -> infinity;
-obtain_limit(Limit) -> ?OBTAIN_LIMIT(Limit).
+obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
+ OLimit when OLimit < 0 -> 0;
+ OLimit -> OLimit
+ end.
+requested({_Kind, _Pid, Requested, _From}) ->
+ Requested.
process_pending(State = #fhc_state { limit = infinity }) ->
@@ -906,22 +994,28 @@ process_obtain(State = #fhc_state { limit = Limit,
{Pending1, State1} = process_pending(Pending, Quota, State),
State1 #fhc_state { obtain_pending = Pending1 }.
-process_pending([], _Quota, State) ->
- {[], State};
process_pending(Pending, Quota, State) when Quota =< 0 ->
{Pending, State};
process_pending(Pending, Quota, State) ->
- PendingLen = length(Pending),
- SatisfiableLen = lists:min([PendingLen, Quota]),
- Take = PendingLen - SatisfiableLen,
- {PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev),
- {PendingNew, State1}.
-run_pending_item({Kind, Pid, From}, State = #fhc_state { clients = Clients }) ->
+ case pending_out(Pending) of
+ {empty, _Pending} ->
+ {Pending, State};
+ {{value, #pending { requested = Requested }}, _Pending1}
+ when Requested > Quota ->
+ {Pending, State};
+ {{value, #pending { requested = Requested } = Item}, Pending1} ->
+ process_pending(Pending1, Quota - Requested,
+ run_pending_item(Item, State))
+ end.
+run_pending_item(#pending { kind = Kind,
+ pid = Pid,
+ requested = Requested,
+ from = From },
+ State = #fhc_state { clients = Clients }) ->
gen_server:reply(From, ok),
true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
- update_counts(Kind, Pid, +1, State).
+ update_counts(Kind, Pid, Requested, State).
update_counts(Kind, Pid, Delta,
State = #fhc_state { open_count = OpenCount,
@@ -952,9 +1046,9 @@ needs_reduce(#fhc_state { limit = Limit,
obtain_pending = ObtainPending }) ->
Limit =/= infinity
andalso ((OpenCount + ObtainCount > Limit)
- orelse (OpenPending =/= [])
+ orelse (not pending_is_empty(OpenPending))
orelse (ObtainCount < ObtainLimit
- andalso ObtainPending =/= [])).
+ andalso not pending_is_empty(ObtainPending))).
reduce(State = #fhc_state { open_pending = OpenPending,
obtain_pending = ObtainPending,
@@ -983,7 +1077,8 @@ reduce(State = #fhc_state { open_pending = OpenPending,
notify_age(CStates, AverageAge);
_ ->
notify_age0(Clients, CStates,
- length(OpenPending) + length(ObtainPending))
+ pending_count(OpenPending) +
+ pending_count(ObtainPending))
case TRef of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3c90fefa..082e7877 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -56,6 +56,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
ok = file_handle_cache:set_limit(10),
+ passed = test_file_handle_cache(),
passed = test_backing_queue(),
passed = test_priority_queue(),
passed = test_bpqueue(),
@@ -1416,6 +1417,32 @@ extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
test_supervisor_delayed_restart() ->
+test_file_handle_cache() ->
+ %% test copying when there is just one spare handle
+ Limit = file_handle_cache:get_limit(),
+ ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores
+ TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"),
+ ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")),
+ Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open(
+ filename:join(TmpDir, "file3"),
+ [write], []),
+ receive close -> ok end,
+ file_handle_cache:delete(Hdl)
+ end),
+ Src = filename:join(TmpDir, "file1"),
+ Dst = filename:join(TmpDir, "file2"),
+ Content = <<"foo">>,
+ ok = file:write_file(Src, Content),
+ {ok, SrcHdl} = file_handle_cache:open(Src, [read], []),
+ {ok, DstHdl} = file_handle_cache:open(Dst, [write], []),
+ Size = size(Content),
+ {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size),
+ ok = file_handle_cache:delete(SrcHdl),
+ file_handle_cache:delete(DstHdl),
+ Pid ! close,
+ ok = file_handle_cache:set_limit(Limit),
+ passed.
test_backing_queue() ->
case application:get_env(rabbit, backing_queue_module) of
{ok, rabbit_variable_queue} ->