From dd3256cdc7048de9e2f32569b875cd104e03a11a Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 31 Oct 2014 13:08:16 +0000 Subject: Inline parse_field_value/1 and thus prevent sub-binary construction in accordance with the principles documented at http://www.erlang.org/doc/efficiency_guide/binaryhandling.html --- Makefile | 2 +- src/rabbit_binary_parser.erl | 103 ++++++++++++++++++++++++++++++------------- 2 files changed, 74 insertions(+), 31 deletions(-) diff --git a/Makefile b/Makefile index c955a8fc..1b66a306 100644 --- a/Makefile +++ b/Makefile @@ -57,7 +57,7 @@ USE_PROPER_QC:=$(shell erl -noshell -eval 'io:format({module, proper} =:= code:e endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests -ERLC_OPTS=-I $(INCLUDE_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) +ERLC_OPTS=-I $(INCLUDE_DIR) -Wall -v +debug_info +bin_opt_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) ifdef INSTRUMENT_FOR_QC ERLC_OPTS += -DINSTR_MOD=gm_qc diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 3ab82cad..8850d69e 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -41,48 +41,91 @@ %% parse_table supports the AMQP 0-8/0-9 standard types, S, I, D, T %% and F, as well as the QPid extensions b, d, f, l, s, t, x, and V. +-define(SIMPLE_PARSE_TABLE(BType, Pattern, RType), + parse_table(<>) -> + [{NameString, RType, Value} | parse_table(Rest)]). + +%% Note that we try to put these in approximately the order we expect +%% to hit them, that's why the empty binary is half way through. + +parse_table(<>) -> + [{NameString, longstr, Value} | parse_table(Rest)]; + +?SIMPLE_PARSE_TABLE($I, Value:32/signed, signedint); +?SIMPLE_PARSE_TABLE($T, Value:64/unsigned, timestamp); + parse_table(<<>>) -> []; -parse_table(<>) -> - {Type, Value, Rest} = parse_field_value(ValueAndRest), - [{NameString, Type, Value} | parse_table(Rest)]. -parse_array(<<>>) -> - []; -parse_array(<>) -> - {Type, Value, Rest} = parse_field_value(ValueAndRest), - [{Type, Value} | parse_array(Rest)]. +?SIMPLE_PARSE_TABLE($b, Value:8/signed, byte); +?SIMPLE_PARSE_TABLE($d, Value:64/float, double); +?SIMPLE_PARSE_TABLE($f, Value:32/float, float); +?SIMPLE_PARSE_TABLE($l, Value:64/signed, long); +?SIMPLE_PARSE_TABLE($s, Value:16/signed, short); + +parse_table(<>) -> + [{NameString, bool, (Value /= 0)} | parse_table(Rest)]; + +parse_table(<>) -> + [{NameString, decimal, {Before, After}} | parse_table(Rest)]; -parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> - {longstr, V, R}; +parse_table(<>) -> + [{NameString, table, parse_table(Value)} | parse_table(Rest)]; -parse_field_value(<<$I, V:32/signed, R/binary>>) -> - {signedint, V, R}; +parse_table(<>) -> + [{NameString, array, parse_array(Value)} | parse_table(Rest)]; + +parse_table(<>) -> + [{NameString, binary, Value} | parse_table(Rest)]; + +parse_table(<>) -> + [{NameString, void, undefined} | parse_table(Rest)]. + +-define(SIMPLE_PARSE_ARRAY(BType, Pattern, RType), + parse_array(<>) -> + [{RType, Value} | parse_table(Rest)]). + +parse_array(<>) -> + [{NameString, longstr, Value} | parse_array(Rest)]; + +?SIMPLE_PARSE_ARRAY($I, Value:32/signed, signedint); +?SIMPLE_PARSE_ARRAY($T, Value:64/unsigned, timestamp); + +parse_array(<<>>) -> + []; -parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) -> - {decimal, {Before, After}, R}; +?SIMPLE_PARSE_ARRAY($b, Value:8/signed, byte); +?SIMPLE_PARSE_ARRAY($d, Value:64/float, double); +?SIMPLE_PARSE_ARRAY($f, Value:32/float, float); +?SIMPLE_PARSE_ARRAY($l, Value:64/signed, long); +?SIMPLE_PARSE_ARRAY($s, Value:16/signed, short); -parse_field_value(<<$T, V:64/unsigned, R/binary>>) -> - {timestamp, V, R}; +parse_array(<<$t, Value:8/unsigned, Rest/binary>>) -> + [{bool, (Value /= 0)} | parse_array(Rest)]; -parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> - {table, parse_table(Table), R}; +parse_array(<<$D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) -> + [{decimal, {Before, After}} | parse_array(Rest)]; -parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> - {array, parse_array(Array), R}; +parse_array(<<$F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{table, parse_table(Value)} | parse_array(Rest)]; -parse_field_value(<<$b, V:8/signed, R/binary>>) -> {byte, V, R}; -parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R}; -parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R}; -parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R}; -parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R}; -parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R}; +parse_array(<<$A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{array, parse_array(Value)} | parse_array(Rest)]; -parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> - {binary, V, R}; +parse_array(<<$x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{binary, Value} | parse_array(Rest)]; -parse_field_value(<<$V, R/binary>>) -> - {void, undefined, R}. +parse_array(<<$V, Rest/binary>>) -> + [{void, undefined} | parse_array(Rest)]. ensure_content_decoded(Content = #content{properties = Props}) when Props =/= none -> -- cgit v1.2.1 From e947a3923bfe20d99d43c9341370b137a33607b7 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 31 Oct 2014 13:09:18 +0000 Subject: Oops --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 1b66a306..c955a8fc 100644 --- a/Makefile +++ b/Makefile @@ -57,7 +57,7 @@ USE_PROPER_QC:=$(shell erl -noshell -eval 'io:format({module, proper} =:= code:e endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests -ERLC_OPTS=-I $(INCLUDE_DIR) -Wall -v +debug_info +bin_opt_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) +ERLC_OPTS=-I $(INCLUDE_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) ifdef INSTRUMENT_FOR_QC ERLC_OPTS += -DINSTR_MOD=gm_qc -- cgit v1.2.1 From 4d7c548ddb5e8109fe18984e4dadb4f0cbb86078 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 3 Nov 2014 11:28:31 +0000 Subject: That shouldn't be a complete copy-paste of the parse_table/1 version. --- src/rabbit_binary_parser.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 8850d69e..6e277d35 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -93,9 +93,8 @@ parse_table(<>) -> [{RType, Value} | parse_table(Rest)]). -parse_array(<>) -> - [{NameString, longstr, Value} | parse_array(Rest)]; +parse_array(<<$S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{longstr, Value} | parse_array(Rest)]; ?SIMPLE_PARSE_ARRAY($I, Value:32/signed, signedint); ?SIMPLE_PARSE_ARRAY($T, Value:64/unsigned, timestamp); -- cgit v1.2.1 From 694943dc70c2466ee8d3cdcaf190081645a3b160 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 3 Nov 2014 13:19:43 +0000 Subject: ...and another stupid bug --- src/rabbit_binary_parser.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 6e277d35..ee8147f4 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -91,7 +91,7 @@ parse_table(<>) -> - [{RType, Value} | parse_table(Rest)]). + [{RType, Value} | parse_array(Rest)]). parse_array(<<$S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> [{longstr, Value} | parse_array(Rest)]; -- cgit v1.2.1 From c8c5c1504d8b39ceb956adf07f1cbba1147e6990 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 6 Nov 2014 11:39:25 +0000 Subject: Simple statistics about reads, writes and syncs through the FHC. --- src/file_handle_cache.erl | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 3a7a692c..cec4bccc 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -335,7 +335,7 @@ read(Ref, Count) -> fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> - case prim_file:read(Hdl, Count) of + case prim_file_read(Hdl, Count) of {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), {Obj, [Handle #handle { offset = Offset1 }]}; @@ -355,7 +355,7 @@ append(Ref, Data) -> write_buffer_size_limit = 0, at_eof = true } = Handle1} -> Offset1 = Offset + iolist_size(Data), - {prim_file:write(Hdl, Data), + {prim_file_write(Hdl, Data), [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; {{ok, _Offset}, #handle { write_buffer = WriteBuffer, write_buffer_size = Size, @@ -382,7 +382,7 @@ sync(Ref) -> ok; ([Handle = #handle { hdl = Hdl, is_dirty = true, write_buffer = [] }]) -> - case prim_file:sync(Hdl) of + case prim_file_sync(Hdl) of ok -> {ok, [Handle #handle { is_dirty = false }]}; Error -> {Error, [Handle]} end @@ -539,6 +539,17 @@ info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity). %% Internal functions %%---------------------------------------------------------------------------- +prim_file_read(Hdl, Size) -> + file_handle_cache_stats:update( + read, Size, fun() -> prim_file:read(Hdl, Size) end). + +prim_file_write(Hdl, Bytes) -> + file_handle_cache_stats:update( + write, iolist_size(Bytes), fun() -> prim_file:write(Hdl, Bytes) end). + +prim_file_sync(Hdl) -> + file_handle_cache_stats:update(sync, fun() -> prim_file:sync(Hdl) end). + is_reader(Mode) -> lists:member(read, Mode). is_writer(Mode) -> lists:member(write, Mode). @@ -742,7 +753,7 @@ soft_close(Handle) -> is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of - true -> prim_file:sync(Hdl); + true -> prim_file_sync(Hdl); false -> ok end, ok = prim_file:close(Hdl), @@ -817,7 +828,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, write_buffer = WriteBuffer, write_buffer_size = DataSize, at_eof = true }) -> - case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of + case prim_file_write(Hdl, lists:reverse(WriteBuffer)) of ok -> Offset1 = Offset + DataSize, {ok, Handle #handle { offset = Offset1, is_dirty = true, @@ -843,6 +854,7 @@ used(#fhc_state{open_count = C1, %%---------------------------------------------------------------------------- init([AlarmSet, AlarmClear]) -> + file_handle_cache_stats:init(), Limit = case application:get_env(file_handles_high_watermark) of {ok, Watermark} when (is_integer(Watermark) andalso Watermark > 0) -> -- cgit v1.2.1 From 28274a6f2ed0a6b0a26e9ea0a80ef3a16e98274b Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 6 Nov 2014 12:49:44 +0000 Subject: Forgot to add that... --- src/file_handle_cache_stats.erl | 53 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 src/file_handle_cache_stats.erl diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl new file mode 100644 index 00000000..c8af20a2 --- /dev/null +++ b/src/file_handle_cache_stats.erl @@ -0,0 +1,53 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(file_handle_cache_stats). + +%% stats about read / write operations that go through the fhc. + +-export([init/0, update/3, update/2, get/0]). + +-define(TABLE, ?MODULE). +-define(MICRO_TO_MILLI, 1000). + +init() -> + ets:new(?TABLE, [public, named_table]), + [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- [read, write], + Counter <- [count, bytes, time]], + [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- [sync], + Counter <- [count, time]]. + +update(Op, Bytes, Thunk) -> + {Time, Res} = timer:tc(Thunk), + ets:update_counter(?TABLE, {Op, count}, 1), + ets:update_counter(?TABLE, {Op, bytes}, Bytes), + ets:update_counter(?TABLE, {Op, time}, Time), + Res. + +update(Op, Thunk) -> + {Time, Res} = timer:tc(Thunk), + ets:update_counter(?TABLE, {Op, count}, 1), + ets:update_counter(?TABLE, {Op, time}, Time), + Res. + +get() -> + lists:sort([output(K, V) || {K, V} <- ets:tab2list(?TABLE)]). + +output({Op, time}, Val) -> {flatten_key(Op, time), Val / ?MICRO_TO_MILLI}; +output({Op, Ctr}, Val) -> {flatten_key(Op, Ctr), Val}. + +flatten_key(A, B) -> + list_to_atom("fhc_" ++ atom_to_list(A) ++ "_" ++ atom_to_list(B)). -- cgit v1.2.1 From 5957db9e3d058335f70d0c80acf8e28f14176bfc Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 6 Nov 2014 13:51:48 +0000 Subject: Move this formatting up to the agent. --- src/file_handle_cache_stats.erl | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl index c8af20a2..d055d84a 100644 --- a/src/file_handle_cache_stats.erl +++ b/src/file_handle_cache_stats.erl @@ -21,7 +21,6 @@ -export([init/0, update/3, update/2, get/0]). -define(TABLE, ?MODULE). --define(MICRO_TO_MILLI, 1000). init() -> ets:new(?TABLE, [public, named_table]), @@ -44,10 +43,4 @@ update(Op, Thunk) -> Res. get() -> - lists:sort([output(K, V) || {K, V} <- ets:tab2list(?TABLE)]). - -output({Op, time}, Val) -> {flatten_key(Op, time), Val / ?MICRO_TO_MILLI}; -output({Op, Ctr}, Val) -> {flatten_key(Op, Ctr), Val}. - -flatten_key(A, B) -> - list_to_atom("fhc_" ++ atom_to_list(A) ++ "_" ++ atom_to_list(B)). + lists:sort(ets:tab2list(?TABLE)). -- cgit v1.2.1 From c2088028ff1cbf4a3d4d680f87054e9b5aecb345 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 10 Nov 2014 13:59:05 +0000 Subject: First pass at adding a read buffer to file_handle_cache. --- src/file_handle_cache.erl | 104 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 82 insertions(+), 22 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index cec4bccc..126b3f81 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -178,6 +178,9 @@ write_buffer_size, write_buffer_size_limit, write_buffer, + read_buffer, + read_buffer_size, + read_buffer_size_limit, at_eof, path, mode, @@ -334,13 +337,42 @@ read(Ref, Count) -> [Ref], fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; - ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> - case prim_file_read(Hdl, Count) of - {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), - {Obj, - [Handle #handle { offset = Offset1 }]}; - eof -> {eof, [Handle #handle { at_eof = true }]}; - Error -> {Error, [Handle]} + ([Handle = #handle{read_buffer = Buf, + read_buffer_size = BufSz, + offset = Offset}]) when BufSz >= Count -> + <> = Buf, + {{ok, Hd}, [Handle#handle{offset = Offset + Count, + read_buffer = Tl, + read_buffer_size = BufSz - Count}]}; + ([Handle = #handle{read_buffer = Buf, + read_buffer_size = BufSz, + read_buffer_size_limit = Limit, + hdl = Hdl, + offset = Offset}]) -> + WantedCount = Count - BufSz, + case prim_file_read(Hdl, Limit) of + {ok, Data} -> + ReadCount = size(Data), + case ReadCount < WantedCount of + true -> + OffSet1 = Offset + BufSz + ReadCount, + {{ok, <>}, + [Handle#handle{offset = OffSet1, + read_buffer = <<>>, + read_buffer_size = 0}]}; + false -> + <> = Data, + OffSet1 = Offset + BufSz + WantedCount, + BufSz1 = ReadCount - WantedCount, + {{ok, <>}, + [Handle#handle{offset = OffSet1, + read_buffer = Tl, + read_buffer_size = BufSz1}]} + end; + eof -> + {eof, [Handle #handle { at_eof = true }]}; + Error -> %% TODO correct or change handle? + {Error, [Handle]} end end). @@ -465,8 +497,10 @@ clear(Ref) -> fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) -> ok; ([Handle]) -> - case maybe_seek(bof, Handle #handle { write_buffer = [], - write_buffer_size = 0 }) of + case maybe_seek(bof, Handle #handle { write_buffer = [], + write_buffer_size = 0, + read_buffer = <<>>, + read_buffer_size = 0}) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; @@ -633,9 +667,11 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, {ok, Hdl} -> Now = now(), {{ok, _Offset}, Handle1} = - maybe_seek(Offset, Handle #handle { hdl = Hdl, - offset = 0, - last_used_at = Now }), + maybe_seek(Offset, Handle #handle { hdl = Hdl, + offset = 0, + read_buffer = <<>>, + read_buffer_size = 0, + last_used_at = Now }), put({Ref, fhc_handle}, Handle1), reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), [{Ref, Handle1} | RefHdls]); @@ -727,6 +763,9 @@ new_closed_handle(Path, Mode, Options) -> write_buffer_size = 0, write_buffer_size_limit = WriteBufferSize, write_buffer = [], + read_buffer_size = 0, + read_buffer_size_limit = 1000000, %% TODO + read_buffer = <<>>, at_eof = false, path = Path, mode = Mode, @@ -787,17 +826,38 @@ hard_close(Handle) -> Result end. -maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, - at_eof = AtEoF }) -> +maybe_seek(NewOffset, Handle = #handle{hdl = Hdl, + offset = Offset, + read_buffer = Buf, + read_buffer_size = BufSz, + at_eof = AtEoF}) -> {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), - case (case NeedsSeek of - true -> prim_file:position(Hdl, NewOffset); - false -> {ok, Offset} - end) of - {ok, Offset1} = Result -> - {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }}; - {error, _} = Error -> - {Error, Handle} + case NeedsSeek of + true -> + case not is_number(NewOffset) orelse + NewOffset < Offset orelse + NewOffset > BufSz + Offset of + true -> + case prim_file:position(Hdl, NewOffset) of + {ok, Offset1} = Result -> + {Result, Handle#handle{offset = Offset1, + at_eof = AtEoF1, + read_buffer = <<>>, + read_buffer_size = 0}}; + {error, _} = Error -> + {Error, Handle} + end; + false -> + Diff = NewOffset - Offset, + <<_:Diff/binary, Rest/binary>> = Buf, + {{ok, NewOffset}, + Handle#handle{offset = NewOffset, + at_eof = AtEoF1, + read_buffer = Rest, + read_buffer_size = BufSz - Diff}} + end; + false -> + {{ok, Offset}, Handle} end. needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false}; -- cgit v1.2.1 From 0e338d6fc8638f8eadd594ec72c1e766f3350958 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 11 Nov 2014 16:46:20 +0000 Subject: Also generate stats on seek. --- src/file_handle_cache.erl | 6 +++++- src/file_handle_cache_stats.erl | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index cec4bccc..3364c3ae 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -550,6 +550,10 @@ prim_file_write(Hdl, Bytes) -> prim_file_sync(Hdl) -> file_handle_cache_stats:update(sync, fun() -> prim_file:sync(Hdl) end). +prim_file_position(Hdl, NewOffset) -> + file_handle_cache_stats:update( + seek, fun() -> prim_file:position(Hdl, NewOffset) end). + is_reader(Mode) -> lists:member(read, Mode). is_writer(Mode) -> lists:member(write, Mode). @@ -791,7 +795,7 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, at_eof = AtEoF }) -> {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), case (case NeedsSeek of - true -> prim_file:position(Hdl, NewOffset); + true -> prim_file_position(Hdl, NewOffset); false -> {ok, Offset} end) of {ok, Offset1} = Result -> diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl index d055d84a..5a8d7b29 100644 --- a/src/file_handle_cache_stats.erl +++ b/src/file_handle_cache_stats.erl @@ -26,7 +26,7 @@ init() -> ets:new(?TABLE, [public, named_table]), [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- [read, write], Counter <- [count, bytes, time]], - [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- [sync], + [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- [sync, seek], Counter <- [count, time]]. update(Op, Bytes, Thunk) -> -- cgit v1.2.1 From 54483ee88d6516caa56ced399e5a8cea7a72a1e3 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 11 Nov 2014 17:06:42 +0000 Subject: Emit stats on FHC recycling. --- src/file_handle_cache.erl | 12 +++++++----- src/file_handle_cache_stats.erl | 10 ++++++++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 3364c3ae..65f3f45b 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -626,14 +626,16 @@ reopen([], Tree, RefHdls) -> {ok, lists:reverse(RefHdls)}; reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, path = Path, - mode = Mode, + mode = Mode0, offset = Offset, last_used_at = undefined }} | RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> - case prim_file:open(Path, case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end) of + Mode = case NewOrReopen of + new -> Mode0; + reopen -> file_handle_cache_stats:update(reopen), + [read | Mode0] + end, + case prim_file:open(Path, Mode) of {ok, Hdl} -> Now = now(), {{ok, _Offset}, Handle1} = diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl index 5a8d7b29..b1fbb3f4 100644 --- a/src/file_handle_cache_stats.erl +++ b/src/file_handle_cache_stats.erl @@ -18,7 +18,7 @@ %% stats about read / write operations that go through the fhc. --export([init/0, update/3, update/2, get/0]). +-export([init/0, update/3, update/2, update/1, get/0]). -define(TABLE, ?MODULE). @@ -27,7 +27,9 @@ init() -> [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- [read, write], Counter <- [count, bytes, time]], [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- [sync, seek], - Counter <- [count, time]]. + Counter <- [count, time]], + [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- [reopen], + Counter <- [count]]. update(Op, Bytes, Thunk) -> {Time, Res} = timer:tc(Thunk), @@ -42,5 +44,9 @@ update(Op, Thunk) -> ets:update_counter(?TABLE, {Op, time}, Time), Res. +update(Op) -> + ets:update_counter(?TABLE, {Op, count}, 1), + ok. + get() -> lists:sort(ets:tab2list(?TABLE)). -- cgit v1.2.1 From cab3f90898beac09f77d75219800f84b52f16c05 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 13 Nov 2014 09:22:02 +0000 Subject: R13B03 compatibility. --- src/file_handle_cache_stats.erl | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl index b1fbb3f4..832f0b3d 100644 --- a/src/file_handle_cache_stats.erl +++ b/src/file_handle_cache_stats.erl @@ -32,14 +32,14 @@ init() -> Counter <- [count]]. update(Op, Bytes, Thunk) -> - {Time, Res} = timer:tc(Thunk), + {Time, Res} = timer_tc(Thunk), ets:update_counter(?TABLE, {Op, count}, 1), ets:update_counter(?TABLE, {Op, bytes}, Bytes), ets:update_counter(?TABLE, {Op, time}, Time), Res. update(Op, Thunk) -> - {Time, Res} = timer:tc(Thunk), + {Time, Res} = timer_tc(Thunk), ets:update_counter(?TABLE, {Op, count}, 1), ets:update_counter(?TABLE, {Op, time}, Time), Res. @@ -50,3 +50,11 @@ update(Op) -> get() -> lists:sort(ets:tab2list(?TABLE)). + +%% TODO timer:tc/1 was introduced in R14B03; use that function once we +%% require that version. +timer_tc(Thunk) -> + T1 = os:timestamp(), + Res = Thunk(), + T2 = os:timestamp(), + {timer:now_diff(T2, T1), Res}. -- cgit v1.2.1 From 16b013458097e1a9a7d20f0138c24f705a8251f3 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 13 Nov 2014 10:20:27 +0000 Subject: If they ask to read more than the buffer size, do so. --- src/file_handle_cache.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 519c596d..a7d5ce15 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -346,11 +346,11 @@ read(Ref, Count) -> read_buffer_size = BufSz - Count}]}; ([Handle = #handle{read_buffer = Buf, read_buffer_size = BufSz, - read_buffer_size_limit = Limit, + read_buffer_size_limit = BufSzLimit, hdl = Hdl, offset = Offset}]) -> WantedCount = Count - BufSz, - case prim_file_read(Hdl, Limit) of + case prim_file_read(Hdl, lists:max([BufSzLimit, WantedCount])) of {ok, Data} -> ReadCount = size(Data), case ReadCount < WantedCount of -- cgit v1.2.1 From b573d47f639fdc8f8b5ff869252543b00b1a8fda Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 13 Nov 2014 10:56:33 +0000 Subject: Be a bit more systematic about reseting the read buffer. --- src/file_handle_cache.erl | 51 ++++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index a7d5ce15..fa896c67 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -334,7 +334,7 @@ close(Ref) -> read(Ref, Count) -> with_flushed_handles( - [Ref], + [Ref], keep, fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; ([Handle = #handle{read_buffer = Buf, @@ -357,9 +357,8 @@ read(Ref, Count) -> true -> OffSet1 = Offset + BufSz + ReadCount, {{ok, <>}, - [Handle#handle{offset = OffSet1, - read_buffer = <<>>, - read_buffer_size = 0}]}; + [reset_read_buffer( + Handle#handle{offset = OffSet1})]}; false -> <> = Data, OffSet1 = Offset + BufSz + WantedCount, @@ -409,7 +408,7 @@ append(Ref, Data) -> sync(Ref) -> with_flushed_handles( - [Ref], + [Ref], keep, fun ([#handle { is_dirty = false, write_buffer = [] }]) -> ok; ([Handle = #handle { hdl = Hdl, @@ -429,7 +428,7 @@ needs_sync(Ref) -> position(Ref, NewOffset) -> with_flushed_handles( - [Ref], + [Ref], keep, fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle), {Result, [Handle1]} end). @@ -497,10 +496,8 @@ clear(Ref) -> fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) -> ok; ([Handle]) -> - case maybe_seek(bof, Handle #handle { write_buffer = [], - write_buffer_size = 0, - read_buffer = <<>>, - read_buffer_size = 0}) of + case maybe_seek(bof, Handle#handle{write_buffer = [], + write_buffer_size = 0}) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; @@ -599,8 +596,15 @@ append_to_write(Mode) -> end. with_handles(Refs, Fun) -> + with_handles(Refs, reset, Fun). + +with_handles(Refs, ReadBuffer, Fun) -> case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of - {ok, Handles} -> + {ok, Handles0} -> + Handles = case ReadBuffer of + reset -> [reset_read_buffer(H) || H <- Handles0]; + keep -> Handles0 + end, case Fun(Handles) of {Result, Handles1} when is_list(Handles1) -> lists:zipwith(fun put_handle/2, Refs, Handles1), @@ -613,8 +617,11 @@ with_handles(Refs, Fun) -> end. with_flushed_handles(Refs, Fun) -> + with_flushed_handles(Refs, reset, Fun). + +with_flushed_handles(Refs, ReadBuffer, Fun) -> with_handles( - Refs, + Refs, ReadBuffer, fun (Handles) -> case lists:foldl( fun (Handle, {ok, HandlesAcc}) -> @@ -673,11 +680,10 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, {ok, Hdl} -> Now = now(), {{ok, _Offset}, Handle1} = - maybe_seek(Offset, Handle #handle { hdl = Hdl, - offset = 0, - read_buffer = <<>>, - read_buffer_size = 0, - last_used_at = Now }), + maybe_seek(Offset, reset_read_buffer( + Handle#handle{hdl = Hdl, + offset = 0, + last_used_at = Now})), put({Ref, fhc_handle}, Handle1), reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), [{Ref, Handle1} | RefHdls]); @@ -846,10 +852,9 @@ maybe_seek(NewOffset, Handle = #handle{hdl = Hdl, true -> case prim_file_position(Hdl, NewOffset) of {ok, Offset1} = Result -> - {Result, Handle#handle{offset = Offset1, - at_eof = AtEoF1, - read_buffer = <<>>, - read_buffer_size = 0}}; + {Result, reset_read_buffer( + Handle#handle{offset = Offset1, + at_eof = AtEoF1})}; {error, _} = Error -> {Error, Handle} end; @@ -903,6 +908,10 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, {Error, Handle} end. +reset_read_buffer(Handle) -> + Handle#handle{read_buffer = <<>>, + read_buffer_size = 0}. + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(total_limit, #fhc_state{limit = Limit}) -> Limit; -- cgit v1.2.1 From 246e224e0fd4d49de6cccb9f76f365a729da0c61 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 13 Nov 2014 11:10:14 +0000 Subject: Small refactor suggested by Matthias. --- src/file_handle_cache.erl | 34 ++++++++++++++-------------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index fa896c67..06a72aa4 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -845,27 +845,21 @@ maybe_seek(NewOffset, Handle = #handle{hdl = Hdl, at_eof = AtEoF}) -> {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), case NeedsSeek of + true when is_number(NewOffset) andalso + NewOffset >= Offset andalso NewOffset =< BufSz + Offset -> + Diff = NewOffset - Offset, + <<_:Diff/binary, Rest/binary>> = Buf, + {{ok, NewOffset}, Handle#handle{offset = NewOffset, + at_eof = AtEoF1, + read_buffer = Rest, + read_buffer_size = BufSz - Diff}}; true -> - case not is_number(NewOffset) orelse - NewOffset < Offset orelse - NewOffset > BufSz + Offset of - true -> - case prim_file_position(Hdl, NewOffset) of - {ok, Offset1} = Result -> - {Result, reset_read_buffer( - Handle#handle{offset = Offset1, - at_eof = AtEoF1})}; - {error, _} = Error -> - {Error, Handle} - end; - false -> - Diff = NewOffset - Offset, - <<_:Diff/binary, Rest/binary>> = Buf, - {{ok, NewOffset}, - Handle#handle{offset = NewOffset, - at_eof = AtEoF1, - read_buffer = Rest, - read_buffer_size = BufSz - Diff}} + case prim_file_position(Hdl, NewOffset) of + {ok, Offset1} = Result -> + {Result, reset_read_buffer(Handle#handle{offset = Offset1, + at_eof = AtEoF1})}; + {error, _} = Error -> + {Error, Handle} end; false -> {{ok, Offset}, Handle} -- cgit v1.2.1 From ae2b78b00a2c6844dfe0e0d68136ba104de359b2 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 13 Nov 2014 11:57:32 +0000 Subject: Remove a couple of TODOs, make the read buffer size configurable, and don't use the read buffer for the QI or msg store transform since they already read in decent sized chunks. --- src/file_handle_cache.erl | 14 ++++++++++---- src/rabbit_msg_store.erl | 3 ++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 06a72aa4..2922e146 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -240,7 +240,8 @@ -spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(open/3 :: (file:filename(), [any()], - [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) + [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} | + {'read_buffer', (non_neg_integer() | 'unbuffered')}]) -> val_or_error(ref())). -spec(close/1 :: (ref()) -> ok_or_error()). -spec(read/2 :: (ref(), non_neg_integer()) -> @@ -370,8 +371,8 @@ read(Ref, Count) -> end; eof -> {eof, [Handle #handle { at_eof = true }]}; - Error -> %% TODO correct or change handle? - {Error, [Handle]} + Error -> + {Error, [reset_read_buffer(Handle)]} end end). @@ -768,6 +769,11 @@ new_closed_handle(Path, Mode, Options) -> infinity -> infinity; N when is_integer(N) -> N end, + ReadBufferSize = + case proplists:get_value(read_buffer, Options, unbuffered) of + unbuffered -> 0; + N2 when is_integer(N2) -> N2 + end, Ref = make_ref(), put({Ref, fhc_handle}, #handle { hdl = closed, offset = 0, @@ -776,7 +782,7 @@ new_closed_handle(Path, Mode, Options) -> write_buffer_size_limit = WriteBufferSize, write_buffer = [], read_buffer_size = 0, - read_buffer_size_limit = 1000000, %% TODO + read_buffer_size_limit = ReadBufferSize, read_buffer = <<>>, at_eof = false, path = Path, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b829ae94..6c80ddcd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1299,7 +1299,8 @@ should_mask_action(CRef, MsgId, open_file(Dir, FileName, Mode) -> file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, - [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). + [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}, + {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) -> CState #client_msstate { file_handle_cache = close_handle(Key, FHC) }; -- cgit v1.2.1