diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-11-10 13:59:05 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-11-10 13:59:05 +0000 |
commit | c2088028ff1cbf4a3d4d680f87054e9b5aecb345 (patch) | |
tree | eb3c6b241c825eb57fcf65e46c1936b83d5ec1a3 /src | |
parent | 5957db9e3d058335f70d0c80acf8e28f14176bfc (diff) | |
download | rabbitmq-server-c2088028ff1cbf4a3d4d680f87054e9b5aecb345.tar.gz |
First pass at adding a read buffer to file_handle_cache.
Diffstat (limited to 'src')
-rw-r--r-- | src/file_handle_cache.erl | 104 |
1 files changed, 82 insertions, 22 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index cec4bccc..126b3f81 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -178,6 +178,9 @@ write_buffer_size, write_buffer_size_limit, write_buffer, + read_buffer, + read_buffer_size, + read_buffer_size_limit, at_eof, path, mode, @@ -334,13 +337,42 @@ read(Ref, Count) -> [Ref], fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; - ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> - case prim_file_read(Hdl, Count) of - {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), - {Obj, - [Handle #handle { offset = Offset1 }]}; - eof -> {eof, [Handle #handle { at_eof = true }]}; - Error -> {Error, [Handle]} + ([Handle = #handle{read_buffer = Buf, + read_buffer_size = BufSz, + offset = Offset}]) when BufSz >= Count -> + <<Hd:Count/binary, Tl/binary>> = Buf, + {{ok, Hd}, [Handle#handle{offset = Offset + Count, + read_buffer = Tl, + read_buffer_size = BufSz - Count}]}; + ([Handle = #handle{read_buffer = Buf, + read_buffer_size = BufSz, + read_buffer_size_limit = Limit, + hdl = Hdl, + offset = Offset}]) -> + WantedCount = Count - BufSz, + case prim_file_read(Hdl, Limit) of + {ok, Data} -> + ReadCount = size(Data), + case ReadCount < WantedCount of + true -> + OffSet1 = Offset + BufSz + ReadCount, + {{ok, <<Buf/binary, Data/binary>>}, + [Handle#handle{offset = OffSet1, + read_buffer = <<>>, + read_buffer_size = 0}]}; + false -> + <<Hd:WantedCount/binary, Tl/binary>> = Data, + OffSet1 = Offset + BufSz + WantedCount, + BufSz1 = ReadCount - WantedCount, + {{ok, <<Buf/binary, Hd/binary>>}, + [Handle#handle{offset = OffSet1, + read_buffer = Tl, + read_buffer_size = BufSz1}]} + end; + eof -> + {eof, [Handle #handle { at_eof = true }]}; + Error -> %% TODO correct or change handle? + {Error, [Handle]} end end). @@ -465,8 +497,10 @@ clear(Ref) -> fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) -> ok; ([Handle]) -> - case maybe_seek(bof, Handle #handle { write_buffer = [], - write_buffer_size = 0 }) of + case maybe_seek(bof, Handle #handle { write_buffer = [], + write_buffer_size = 0, + read_buffer = <<>>, + read_buffer_size = 0}) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; @@ -633,9 +667,11 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, {ok, Hdl} -> Now = now(), {{ok, _Offset}, Handle1} = - maybe_seek(Offset, Handle #handle { hdl = Hdl, - offset = 0, - last_used_at = Now }), + maybe_seek(Offset, Handle #handle { hdl = Hdl, + offset = 0, + read_buffer = <<>>, + read_buffer_size = 0, + last_used_at = Now }), put({Ref, fhc_handle}, Handle1), reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), [{Ref, Handle1} | RefHdls]); @@ -727,6 +763,9 @@ new_closed_handle(Path, Mode, Options) -> write_buffer_size = 0, write_buffer_size_limit = WriteBufferSize, write_buffer = [], + read_buffer_size = 0, + read_buffer_size_limit = 1000000, %% TODO + read_buffer = <<>>, at_eof = false, path = Path, mode = Mode, @@ -787,17 +826,38 @@ hard_close(Handle) -> Result end. -maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, - at_eof = AtEoF }) -> +maybe_seek(NewOffset, Handle = #handle{hdl = Hdl, + offset = Offset, + read_buffer = Buf, + read_buffer_size = BufSz, + at_eof = AtEoF}) -> {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), - case (case NeedsSeek of - true -> prim_file:position(Hdl, NewOffset); - false -> {ok, Offset} - end) of - {ok, Offset1} = Result -> - {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }}; - {error, _} = Error -> - {Error, Handle} + case NeedsSeek of + true -> + case not is_number(NewOffset) orelse + NewOffset < Offset orelse + NewOffset > BufSz + Offset of + true -> + case prim_file:position(Hdl, NewOffset) of + {ok, Offset1} = Result -> + {Result, Handle#handle{offset = Offset1, + at_eof = AtEoF1, + read_buffer = <<>>, + read_buffer_size = 0}}; + {error, _} = Error -> + {Error, Handle} + end; + false -> + Diff = NewOffset - Offset, + <<_:Diff/binary, Rest/binary>> = Buf, + {{ok, NewOffset}, + Handle#handle{offset = NewOffset, + at_eof = AtEoF1, + read_buffer = Rest, + read_buffer_size = BufSz - Diff}} + end; + false -> + {{ok, Offset}, Handle} end. needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false}; |