summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-11-13 10:56:33 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-11-13 10:56:33 +0000
commitb573d47f639fdc8f8b5ff869252543b00b1a8fda (patch)
tree142c61bc982f3d54879fff0b831dd4fa43350fa0
parent16b013458097e1a9a7d20f0138c24f705a8251f3 (diff)
downloadrabbitmq-server-b573d47f639fdc8f8b5ff869252543b00b1a8fda.tar.gz
Be a bit more systematic about reseting the read buffer.
-rw-r--r--src/file_handle_cache.erl51
1 files changed, 30 insertions, 21 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index a7d5ce15..fa896c67 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -334,7 +334,7 @@ close(Ref) ->
read(Ref, Count) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
([Handle = #handle{read_buffer = Buf,
@@ -357,9 +357,8 @@ read(Ref, Count) ->
true ->
OffSet1 = Offset + BufSz + ReadCount,
{{ok, <<Buf/binary, Data/binary>>},
- [Handle#handle{offset = OffSet1,
- read_buffer = <<>>,
- read_buffer_size = 0}]};
+ [reset_read_buffer(
+ Handle#handle{offset = OffSet1})]};
false ->
<<Hd:WantedCount/binary, Tl/binary>> = Data,
OffSet1 = Offset + BufSz + WantedCount,
@@ -409,7 +408,7 @@ append(Ref, Data) ->
sync(Ref) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
ok;
([Handle = #handle { hdl = Hdl,
@@ -429,7 +428,7 @@ needs_sync(Ref) ->
position(Ref, NewOffset) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle),
{Result, [Handle1]}
end).
@@ -497,10 +496,8 @@ clear(Ref) ->
fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
ok;
([Handle]) ->
- case maybe_seek(bof, Handle #handle { write_buffer = [],
- write_buffer_size = 0,
- read_buffer = <<>>,
- read_buffer_size = 0}) of
+ case maybe_seek(bof, Handle#handle{write_buffer = [],
+ write_buffer_size = 0}) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
@@ -599,8 +596,15 @@ append_to_write(Mode) ->
end.
with_handles(Refs, Fun) ->
+ with_handles(Refs, reset, Fun).
+
+with_handles(Refs, ReadBuffer, Fun) ->
case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of
- {ok, Handles} ->
+ {ok, Handles0} ->
+ Handles = case ReadBuffer of
+ reset -> [reset_read_buffer(H) || H <- Handles0];
+ keep -> Handles0
+ end,
case Fun(Handles) of
{Result, Handles1} when is_list(Handles1) ->
lists:zipwith(fun put_handle/2, Refs, Handles1),
@@ -613,8 +617,11 @@ with_handles(Refs, Fun) ->
end.
with_flushed_handles(Refs, Fun) ->
+ with_flushed_handles(Refs, reset, Fun).
+
+with_flushed_handles(Refs, ReadBuffer, Fun) ->
with_handles(
- Refs,
+ Refs, ReadBuffer,
fun (Handles) ->
case lists:foldl(
fun (Handle, {ok, HandlesAcc}) ->
@@ -673,11 +680,10 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
{ok, Hdl} ->
Now = now(),
{{ok, _Offset}, Handle1} =
- maybe_seek(Offset, Handle #handle { hdl = Hdl,
- offset = 0,
- read_buffer = <<>>,
- read_buffer_size = 0,
- last_used_at = Now }),
+ maybe_seek(Offset, reset_read_buffer(
+ Handle#handle{hdl = Hdl,
+ offset = 0,
+ last_used_at = Now})),
put({Ref, fhc_handle}, Handle1),
reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
[{Ref, Handle1} | RefHdls]);
@@ -846,10 +852,9 @@ maybe_seek(NewOffset, Handle = #handle{hdl = Hdl,
true ->
case prim_file_position(Hdl, NewOffset) of
{ok, Offset1} = Result ->
- {Result, Handle#handle{offset = Offset1,
- at_eof = AtEoF1,
- read_buffer = <<>>,
- read_buffer_size = 0}};
+ {Result, reset_read_buffer(
+ Handle#handle{offset = Offset1,
+ at_eof = AtEoF1})};
{error, _} = Error ->
{Error, Handle}
end;
@@ -903,6 +908,10 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
{Error, Handle}
end.
+reset_read_buffer(Handle) ->
+ Handle#handle{read_buffer = <<>>,
+ read_buffer_size = 0}.
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(total_limit, #fhc_state{limit = Limit}) -> Limit;