summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-11-10 13:59:05 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-11-10 13:59:05 +0000
commitc2088028ff1cbf4a3d4d680f87054e9b5aecb345 (patch)
treeeb3c6b241c825eb57fcf65e46c1936b83d5ec1a3
parent5957db9e3d058335f70d0c80acf8e28f14176bfc (diff)
downloadrabbitmq-server-c2088028ff1cbf4a3d4d680f87054e9b5aecb345.tar.gz
First pass at adding a read buffer to file_handle_cache.
-rw-r--r--src/file_handle_cache.erl104
1 files changed, 82 insertions, 22 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index cec4bccc..126b3f81 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -178,6 +178,9 @@
write_buffer_size,
write_buffer_size_limit,
write_buffer,
+ read_buffer,
+ read_buffer_size,
+ read_buffer_size_limit,
at_eof,
path,
mode,
@@ -334,13 +337,42 @@ read(Ref, Count) ->
[Ref],
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
- ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
- case prim_file_read(Hdl, Count) of
- {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
- {Obj,
- [Handle #handle { offset = Offset1 }]};
- eof -> {eof, [Handle #handle { at_eof = true }]};
- Error -> {Error, [Handle]}
+ ([Handle = #handle{read_buffer = Buf,
+ read_buffer_size = BufSz,
+ offset = Offset}]) when BufSz >= Count ->
+ <<Hd:Count/binary, Tl/binary>> = Buf,
+ {{ok, Hd}, [Handle#handle{offset = Offset + Count,
+ read_buffer = Tl,
+ read_buffer_size = BufSz - Count}]};
+ ([Handle = #handle{read_buffer = Buf,
+ read_buffer_size = BufSz,
+ read_buffer_size_limit = Limit,
+ hdl = Hdl,
+ offset = Offset}]) ->
+ WantedCount = Count - BufSz,
+ case prim_file_read(Hdl, Limit) of
+ {ok, Data} ->
+ ReadCount = size(Data),
+ case ReadCount < WantedCount of
+ true ->
+ OffSet1 = Offset + BufSz + ReadCount,
+ {{ok, <<Buf/binary, Data/binary>>},
+ [Handle#handle{offset = OffSet1,
+ read_buffer = <<>>,
+ read_buffer_size = 0}]};
+ false ->
+ <<Hd:WantedCount/binary, Tl/binary>> = Data,
+ OffSet1 = Offset + BufSz + WantedCount,
+ BufSz1 = ReadCount - WantedCount,
+ {{ok, <<Buf/binary, Hd/binary>>},
+ [Handle#handle{offset = OffSet1,
+ read_buffer = Tl,
+ read_buffer_size = BufSz1}]}
+ end;
+ eof ->
+ {eof, [Handle #handle { at_eof = true }]};
+ Error -> %% TODO correct or change handle?
+ {Error, [Handle]}
end
end).
@@ -465,8 +497,10 @@ clear(Ref) ->
fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
ok;
([Handle]) ->
- case maybe_seek(bof, Handle #handle { write_buffer = [],
- write_buffer_size = 0 }) of
+ case maybe_seek(bof, Handle #handle { write_buffer = [],
+ write_buffer_size = 0,
+ read_buffer = <<>>,
+ read_buffer_size = 0}) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
@@ -633,9 +667,11 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
{ok, Hdl} ->
Now = now(),
{{ok, _Offset}, Handle1} =
- maybe_seek(Offset, Handle #handle { hdl = Hdl,
- offset = 0,
- last_used_at = Now }),
+ maybe_seek(Offset, Handle #handle { hdl = Hdl,
+ offset = 0,
+ read_buffer = <<>>,
+ read_buffer_size = 0,
+ last_used_at = Now }),
put({Ref, fhc_handle}, Handle1),
reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
[{Ref, Handle1} | RefHdls]);
@@ -727,6 +763,9 @@ new_closed_handle(Path, Mode, Options) ->
write_buffer_size = 0,
write_buffer_size_limit = WriteBufferSize,
write_buffer = [],
+ read_buffer_size = 0,
+ read_buffer_size_limit = 1000000, %% TODO
+ read_buffer = <<>>,
at_eof = false,
path = Path,
mode = Mode,
@@ -787,17 +826,38 @@ hard_close(Handle) ->
Result
end.
-maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
- at_eof = AtEoF }) ->
+maybe_seek(NewOffset, Handle = #handle{hdl = Hdl,
+ offset = Offset,
+ read_buffer = Buf,
+ read_buffer_size = BufSz,
+ at_eof = AtEoF}) ->
{AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
- case (case NeedsSeek of
- true -> prim_file:position(Hdl, NewOffset);
- false -> {ok, Offset}
- end) of
- {ok, Offset1} = Result ->
- {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }};
- {error, _} = Error ->
- {Error, Handle}
+ case NeedsSeek of
+ true ->
+ case not is_number(NewOffset) orelse
+ NewOffset < Offset orelse
+ NewOffset > BufSz + Offset of
+ true ->
+ case prim_file:position(Hdl, NewOffset) of
+ {ok, Offset1} = Result ->
+ {Result, Handle#handle{offset = Offset1,
+ at_eof = AtEoF1,
+ read_buffer = <<>>,
+ read_buffer_size = 0}};
+ {error, _} = Error ->
+ {Error, Handle}
+ end;
+ false ->
+ Diff = NewOffset - Offset,
+ <<_:Diff/binary, Rest/binary>> = Buf,
+ {{ok, NewOffset},
+ Handle#handle{offset = NewOffset,
+ at_eof = AtEoF1,
+ read_buffer = Rest,
+ read_buffer_size = BufSz - Diff}}
+ end;
+ false ->
+ {{ok, Offset}, Handle}
end.
needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false};