summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-22 04:06:18 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-22 04:06:18 +0100
commitbbea6348b8c91487e179c67b909e5a6e2393d1a3 (patch)
treebd4839641a263ccda4c77c8c41c9d9e59da85503
parentb8b7b09d009e97583ee23c567dad633cbdc4e243 (diff)
downloadrabbitmq-server-bbea6348b8c91487e179c67b909e5a6e2393d1a3.tar.gz
Rejigged client side so that it asks for the correct number of fds upfront.
-rw-r--r--src/file_handle_cache.erl154
1 files changed, 94 insertions, 60 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index a7ef2d66..1be04371 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -260,8 +260,9 @@ open(Path, Mode, Options) ->
case IsWriter andalso HasWriter of
true -> {error, writer_exists};
false -> Ref = make_ref(),
- case open1(Path1, Mode1, Options, Ref, bof, new) of
- {ok, _Handle} ->
+ {ok, _Handle} = new_closed_handle(Path1, Mode1, Options, Ref),
+ case get_opened_rev([{Ref, new}]) of
+ {ok, [_Handle1]} ->
RCount1 = case is_reader(Mode1) of
true -> RCount + 1;
false -> RCount
@@ -272,6 +273,7 @@ open(Path, Mode, Options) ->
has_writer = HasWriter1 }),
{ok, Ref};
Error ->
+ erase({Ref, fhc_handle}),
Error
end
end.
@@ -475,16 +477,7 @@ append_to_write(Mode) ->
end.
with_handles(Refs, Fun) ->
- ResHandles = lists:foldl(
- fun (Ref, {ok, HandlesAcc}) ->
- case get_or_reopen(Ref) of
- {ok, Handle} -> {ok, [Handle | HandlesAcc]};
- Error -> Error
- end;
- (_Ref, Error) ->
- Error
- end, {ok, []}, Refs),
- case ResHandles of
+ case get_opened_rev([{Ref, reopen} || Ref <- Refs]) of
{ok, Handles} ->
case Fun(lists:reverse(Handles)) of
{Result, Handles1} when is_list(Handles1) ->
@@ -515,17 +508,69 @@ with_flushed_handles(Refs, Fun) ->
end
end).
-get_or_reopen(Ref) ->
- case get({Ref, fhc_handle}) of
- undefined ->
- {error, not_open, Ref};
- #handle { hdl = closed, offset = Offset,
- path = Path, mode = Mode, options = Options } ->
- open1(Path, Mode, Options, Ref, Offset, reopen);
- Handle ->
- {ok, Handle}
+get_opened_rev(RefNewOrReopens) ->
+ case partition_handles_rev(RefNewOrReopens) of
+ {OpenHdlsRev, []} ->
+ {ok, [Handle || {_Ref, Handle} <- OpenHdlsRev]};
+ {OpenHdlsRev, ClosedHdlsRev} ->
+ Tree = get_age_tree(),
+ Oldest = case gb_trees:is_empty(Tree) of
+ true -> now();
+ false -> {Then, _Ref} = gb_trees:smallest(Tree),
+ Then
+ end,
+ case gen_server:call(?SERVER, {open, self(), length(ClosedHdlsRev),
+ Oldest}, infinity) of
+ ok ->
+ case reopen(ClosedHdlsRev, Tree, []) of
+ {ok, RefHdls} ->
+ sort_handles(RefNewOrReopens,
+ lists:reverse(OpenHdlsRev),
+ RefHdls, []);
+ {error, Error} ->
+ {error, Error}
+ end;
+ close ->
+ [soft_close(Ref1, Handle1) ||
+ {{Ref1, fhc_handle},
+ Handle1 = #handle { hdl = Hdl1 }} <- get(),
+ Hdl1 =/= closed],
+ get_opened_rev(RefNewOrReopens)
+ end
+ end.
+
+sort_handles([], [], [], Acc) ->
+ {ok, Acc};
+sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) ->
+ sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]);
+sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) ->
+ sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]).
+
+reopen([], Tree, RefHdls) ->
+ put_age_tree(Tree),
+ {ok, RefHdls};
+reopen([{Ref, NewOrReopen, Handle} | RefNewOrReopenHdlsRev], Tree, RefHdls) ->
+ Now = now(),
+ case open1(Ref, Handle #handle { last_used_at = Now }, NewOrReopen) of
+ {ok, #handle {} = Handle1} ->
+ reopen(RefNewOrReopenHdlsRev, gb_trees:insert(Now, Ref, Tree),
+ [{Ref, Handle1} | RefHdls]);
+ {error, Reason} ->
+ age_tree_delete(Handle #handle.last_used_at),
+ {error, Reason}
end.
+partition_handles_rev(RefNewOrReopens) ->
+ lists:foldl(
+ fun ({Ref, NewOrReopen}, {Open, Closed}) ->
+ case get({Ref, fhc_handle}) of
+ #handle { hdl = closed } = Handle ->
+ {Open, [{Ref, NewOrReopen, Handle} | Closed]};
+ #handle {} = Handle ->
+ {[{Ref, Handle} | Open], Closed}
+ end
+ end, {[], []}, RefNewOrReopens).
+
put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
Now = now(),
age_tree_update(Then, Now, Ref),
@@ -541,20 +586,6 @@ get_age_tree() ->
put_age_tree(Tree) -> put(fhc_age_tree, Tree).
-age_tree_insert(Now, Ref) ->
- Tree = get_age_tree(),
- Tree1 = gb_trees:insert(Now, Ref, Tree),
- {Oldest, _Ref} = gb_trees:smallest(Tree1),
- case gen_server:call(?SERVER, {open, self(), 1, Oldest}, infinity) of
- ok ->
- put_age_tree(Tree1);
- close ->
- [soft_close(Ref1, Handle1) ||
- {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(),
- Hdl1 =/= closed],
- age_tree_insert(Now, Ref)
- end.
-
age_tree_update(Then, Now, Ref) ->
with_age_tree(
fun (Tree) ->
@@ -587,41 +618,44 @@ age_tree_change() ->
Tree
end).
-open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
+new_closed_handle(Path, Mode, Options, Ref) ->
+ WriteBufferSize =
+ case proplists:get_value(write_buffer, Options, unbuffered) of
+ unbuffered -> 0;
+ infinity -> infinity;
+ N when is_integer(N) -> N
+ end,
+ Handle = #handle { hdl = closed,
+ offset = 0,
+ trusted_offset = 0,
+ is_dirty = false,
+ write_buffer_size = 0,
+ write_buffer_size_limit = WriteBufferSize,
+ write_buffer = [],
+ at_eof = false,
+ path = Path,
+ mode = Mode,
+ options = Options,
+ is_write = is_writer(Mode),
+ is_read = is_reader(Mode),
+ last_used_at = undefined },
+ put({Ref, fhc_handle}, Handle),
+ {ok, Handle}.
+
+open1(Ref, #handle { hdl = closed, path = Path, mode = Mode, offset = Offset } =
+ Handle, NewOrReopen) ->
Mode1 = case NewOrReopen of
new -> Mode;
reopen -> [read | Mode]
end,
- Now = now(),
- age_tree_insert(Now, Ref),
case file:open(Path, Mode1) of
{ok, Hdl} ->
- WriteBufferSize =
- case proplists:get_value(write_buffer, Options, unbuffered) of
- unbuffered -> 0;
- infinity -> infinity;
- N when is_integer(N) -> N
- end,
- Handle = #handle { hdl = Hdl,
- offset = 0,
- trusted_offset = 0,
- is_dirty = false,
- write_buffer_size = 0,
- write_buffer_size_limit = WriteBufferSize,
- write_buffer = [],
- at_eof = false,
- path = Path,
- mode = Mode,
- options = Options,
- is_write = is_writer(Mode),
- is_read = is_reader(Mode),
- last_used_at = Now },
- {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle),
+ {{ok, Offset1}, Handle1} =
+ maybe_seek(Offset, Handle #handle { hdl = Hdl, offset = 0 }),
Handle2 = Handle1 #handle { trusted_offset = Offset1 },
put({Ref, fhc_handle}, Handle2),
{ok, Handle2};
{error, Reason} ->
- age_tree_delete(Now),
{error, Reason}
end.