summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Simon MacMullen <simon@rabbitmq.com> 2014-11-14 16:38:07 +0000
committer Simon MacMullen <simon@rabbitmq.com> 2014-11-14 16:38:07 +0000
commit 54f8c42b87e4131ec02e9103629b59147b14bbd2 (patch)
tree 506e21908d8e88dc53da9a31b72c0e23df434931
parent 9ae81860dc014327fd49ef6270f722d5ec0a38c4 (diff)
parent bbb6650d6a529201424a10b2d7c5c002d632e374 (diff)
download rabbitmq-server-54f8c42b87e4131ec02e9103629b59147b14bbd2.tar.gz
stable to default
-rw-r--r-- src/file_handle_cache.erl 161
-rw-r--r-- src/file_handle_cache_stats.erl 60
-rw-r--r-- src/rabbit_binary_parser.erl 102
-rw-r--r-- src/rabbit_msg_store.erl 3
4 files changed, 258 insertions, 68 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 3a7a692c..2922e146 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -178,6 +178,9 @@
write_buffer_size,
write_buffer_size_limit,
write_buffer,
+ read_buffer,
+ read_buffer_size,
+ read_buffer_size_limit,
at_eof,
path,
mode,
@@ -237,7 +240,8 @@
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
(file:filename(), [any()],
- [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
+ [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} |
+ {'read_buffer', (non_neg_integer() | 'unbuffered')}])
-> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
@@ -331,16 +335,44 @@ close(Ref) ->
+%% Read up to Count bytes from the handle Ref, starting at its current
+%% logical offset.
+%%
+%% The handle's read buffer is kept on entry (the `keep' mode). A request
+%% is served straight from that buffer when it already holds at least
+%% Count bytes. Otherwise the file is read for
+%% max(read_buffer_size_limit, bytes still missing); the bytes needed to
+%% complete the request are appended to what was buffered and any surplus
+%% becomes the new buffer. `offset' always tracks the logical position,
+%% i.e. it advances only by the number of bytes handed to the caller.
+%%
+%% The fun yields {ok, Data}, eof, {error, not_open_for_reading}, or
+%% whatever error term the underlying read produced.
read(Ref, Count) ->
with_flushed_handles(
- [Ref],
+      [Ref], keep,
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
- ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
- case prim_file:read(Hdl, Count) of
- {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
- {Obj,
- [Handle #handle { offset = Offset1 }]};
- eof -> {eof, [Handle #handle { at_eof = true }]};
- Error -> {Error, [Handle]}
+          %% Buffer hit: the whole request is already in memory.
+          ([Handle = #handle{read_buffer      = Buf,
+                             read_buffer_size = BufSz,
+                             offset           = Offset}]) when BufSz >= Count ->
+              <<Hd:Count/binary, Tl/binary>> = Buf,
+              {{ok, Hd}, [Handle#handle{offset           = Offset + Count,
+                                        read_buffer      = Tl,
+                                        read_buffer_size = BufSz - Count}]};
+          %% Buffer miss: top the request up from the file.
+          ([Handle = #handle{read_buffer            = Buf,
+                             read_buffer_size       = BufSz,
+                             read_buffer_size_limit = BufSzLimit,
+                             hdl                    = Hdl,
+                             offset                 = Offset}]) ->
+              WantedCount = Count - BufSz,
+              case prim_file_read(Hdl, lists:max([BufSzLimit, WantedCount])) of
+                  {ok, Data} ->
+                      ReadCount = byte_size(Data),
+                      case ReadCount < WantedCount of
+                          true ->
+                              %% Short read: return everything we have and
+                              %% leave the buffer empty.
+                              OffSet1 = Offset + BufSz + ReadCount,
+                              {{ok, <<Buf/binary, Data/binary>>},
+                               [reset_read_buffer(
+                                  Handle#handle{offset = OffSet1})]};
+                          false ->
+                              %% Enough data: split off what the caller
+                              %% asked for and buffer the remainder.
+                              <<Hd:WantedCount/binary, Tl/binary>> = Data,
+                              OffSet1 = Offset + BufSz + WantedCount,
+                              BufSz1 = ReadCount - WantedCount,
+                              {{ok, <<Buf/binary, Hd/binary>>},
+                               [Handle#handle{offset           = OffSet1,
+                                              read_buffer      = Tl,
+                                              read_buffer_size = BufSz1}]}
+                      end;
+                  eof when BufSz =:= 0 ->
+                      {eof, [Handle#handle{at_eof = true}]};
+                  eof ->
+                      %% The file is exhausted but BufSz (< Count) bytes
+                      %% are still buffered. Hand them back as a short
+                      %% read, as the unbuffered code did, rather than
+                      %% reporting eof and marking the handle at_eof while
+                      %% its logical offset is still before end of file.
+                      {{ok, Buf},
+                       [reset_read_buffer(
+                          Handle#handle{offset = Offset + BufSz,
+                                        at_eof = true})]};
+                  Error ->
+                      %% NOTE(review): the buffer is dropped but `offset'
+                      %% is left untouched; if BufSz > 0 the logical offset
+                      %% may no longer match the descriptor position.
+                      %% Confirm callers do not retry after an error.
+                      {Error, [reset_read_buffer(Handle)]}
end
end).
@@ -355,7 +387,7 @@ append(Ref, Data) ->
write_buffer_size_limit = 0,
at_eof = true } = Handle1} ->
Offset1 = Offset + iolist_size(Data),
- {prim_file:write(Hdl, Data),
+ {prim_file_write(Hdl, Data),
[Handle1 #handle { is_dirty = true, offset = Offset1 }]};
{{ok, _Offset}, #handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
@@ -377,12 +409,12 @@ append(Ref, Data) ->
sync(Ref) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
ok;
([Handle = #handle { hdl = Hdl,
is_dirty = true, write_buffer = [] }]) ->
- case prim_file:sync(Hdl) of
+ case prim_file_sync(Hdl) of
ok -> {ok, [Handle #handle { is_dirty = false }]};
Error -> {Error, [Handle]}
end
@@ -397,7 +429,7 @@ needs_sync(Ref) ->
position(Ref, NewOffset) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle),
{Result, [Handle1]}
end).
@@ -465,8 +497,8 @@ clear(Ref) ->
fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
ok;
([Handle]) ->
- case maybe_seek(bof, Handle #handle { write_buffer = [],
- write_buffer_size = 0 }) of
+ case maybe_seek(bof, Handle#handle{write_buffer = [],
+ write_buffer_size = 0}) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
@@ -539,6 +571,21 @@ info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity).
%% Internal functions
%%----------------------------------------------------------------------------
+%% Read Size bytes from Hdl with prim_file:read/2, routing the call through
+%% file_handle_cache_stats:update/3 under the `read' operation. The byte
+%% figure passed to the stats module is the requested Size.
+prim_file_read(Hdl, Size) ->
+    DoRead = fun() -> prim_file:read(Hdl, Size) end,
+    file_handle_cache_stats:update(read, Size, DoRead).
+
+%% Write the iolist Bytes to Hdl with prim_file:write/2, routing the call
+%% through file_handle_cache_stats:update/3 under the `write' operation
+%% with iolist_size(Bytes) as the byte figure.
+prim_file_write(Hdl, Bytes) ->
+    Size = iolist_size(Bytes),
+    DoWrite = fun() -> prim_file:write(Hdl, Bytes) end,
+    file_handle_cache_stats:update(write, Size, DoWrite).
+
+%% Sync Hdl with prim_file:sync/1, routing the call through
+%% file_handle_cache_stats:update/2 under the `sync' operation.
+prim_file_sync(Hdl) ->
+    DoSync = fun() -> prim_file:sync(Hdl) end,
+    file_handle_cache_stats:update(sync, DoSync).
+
+%% Reposition Hdl to NewOffset with prim_file:position/2, routing the call
+%% through file_handle_cache_stats:update/2 under the `seek' operation.
+prim_file_position(Hdl, NewOffset) ->
+    DoSeek = fun() -> prim_file:position(Hdl, NewOffset) end,
+    file_handle_cache_stats:update(seek, DoSeek).
+
is_reader(Mode) -> lists:member(read, Mode).
is_writer(Mode) -> lists:member(write, Mode).
@@ -550,8 +597,15 @@ append_to_write(Mode) ->
end.
with_handles(Refs, Fun) ->
+ with_handles(Refs, reset, Fun).
+
+with_handles(Refs, ReadBuffer, Fun) ->
case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of
- {ok, Handles} ->
+ {ok, Handles0} ->
+ Handles = case ReadBuffer of
+ reset -> [reset_read_buffer(H) || H <- Handles0];
+ keep -> Handles0
+ end,
case Fun(Handles) of
{Result, Handles1} when is_list(Handles1) ->
lists:zipwith(fun put_handle/2, Refs, Handles1),
@@ -564,8 +618,11 @@ with_handles(Refs, Fun) ->
end.
with_flushed_handles(Refs, Fun) ->
+ with_flushed_handles(Refs, reset, Fun).
+
+with_flushed_handles(Refs, ReadBuffer, Fun) ->
with_handles(
- Refs,
+ Refs, ReadBuffer,
fun (Handles) ->
case lists:foldl(
fun (Handle, {ok, HandlesAcc}) ->
@@ -611,20 +668,23 @@ reopen([], Tree, RefHdls) ->
{ok, lists:reverse(RefHdls)};
reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
path = Path,
- mode = Mode,
+ mode = Mode0,
offset = Offset,
last_used_at = undefined }} |
RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
- case prim_file:open(Path, case NewOrReopen of
- new -> Mode;
- reopen -> [read | Mode]
- end) of
+ Mode = case NewOrReopen of
+ new -> Mode0;
+ reopen -> file_handle_cache_stats:update(reopen),
+ [read | Mode0]
+ end,
+ case prim_file:open(Path, Mode) of
{ok, Hdl} ->
Now = now(),
{{ok, _Offset}, Handle1} =
- maybe_seek(Offset, Handle #handle { hdl = Hdl,
- offset = 0,
- last_used_at = Now }),
+ maybe_seek(Offset, reset_read_buffer(
+ Handle#handle{hdl = Hdl,
+ offset = 0,
+ last_used_at = Now})),
put({Ref, fhc_handle}, Handle1),
reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
[{Ref, Handle1} | RefHdls]);
@@ -709,6 +769,11 @@ new_closed_handle(Path, Mode, Options) ->
infinity -> infinity;
N when is_integer(N) -> N
end,
+ ReadBufferSize =
+ case proplists:get_value(read_buffer, Options, unbuffered) of
+ unbuffered -> 0;
+ N2 when is_integer(N2) -> N2
+ end,
Ref = make_ref(),
put({Ref, fhc_handle}, #handle { hdl = closed,
offset = 0,
@@ -716,6 +781,9 @@ new_closed_handle(Path, Mode, Options) ->
write_buffer_size = 0,
write_buffer_size_limit = WriteBufferSize,
write_buffer = [],
+ read_buffer_size = 0,
+ read_buffer_size_limit = ReadBufferSize,
+ read_buffer = <<>>,
at_eof = false,
path = Path,
mode = Mode,
@@ -742,7 +810,7 @@ soft_close(Handle) ->
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
- true -> prim_file:sync(Hdl);
+ true -> prim_file_sync(Hdl);
false -> ok
end,
ok = prim_file:close(Hdl),
@@ -776,17 +844,31 @@ hard_close(Handle) ->
Result
end.
-maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
- at_eof = AtEoF }) ->
+maybe_seek(NewOffset, Handle = #handle{hdl = Hdl,
+ offset = Offset,
+ read_buffer = Buf,
+ read_buffer_size = BufSz,
+ at_eof = AtEoF}) ->
{AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
- case (case NeedsSeek of
- true -> prim_file:position(Hdl, NewOffset);
- false -> {ok, Offset}
- end) of
- {ok, Offset1} = Result ->
- {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }};
- {error, _} = Error ->
- {Error, Handle}
+ case NeedsSeek of
+ true when is_number(NewOffset) andalso
+ NewOffset >= Offset andalso NewOffset =< BufSz + Offset ->
+ Diff = NewOffset - Offset,
+ <<_:Diff/binary, Rest/binary>> = Buf,
+ {{ok, NewOffset}, Handle#handle{offset = NewOffset,
+ at_eof = AtEoF1,
+ read_buffer = Rest,
+ read_buffer_size = BufSz - Diff}};
+ true ->
+ case prim_file_position(Hdl, NewOffset) of
+ {ok, Offset1} = Result ->
+ {Result, reset_read_buffer(Handle#handle{offset = Offset1,
+ at_eof = AtEoF1})};
+ {error, _} = Error ->
+ {Error, Handle}
+ end;
+ false ->
+ {{ok, Offset}, Handle}
end.
needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false};
@@ -817,7 +899,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer = WriteBuffer,
write_buffer_size = DataSize,
at_eof = true }) ->
- case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of
+ case prim_file_write(Hdl, lists:reverse(WriteBuffer)) of
ok ->
Offset1 = Offset + DataSize,
{ok, Handle #handle { offset = Offset1, is_dirty = true,
@@ -826,6 +908,10 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
{Error, Handle}
end.
+%% Return Handle with its read buffer emptied (<<>>) and its recorded
+%% buffer size set to 0. Only the record is updated; nothing here touches
+%% the underlying file descriptor.
+reset_read_buffer(Handle) ->
+    Handle#handle{read_buffer_size = 0,
+                  read_buffer      = <<>>}.
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(total_limit, #fhc_state{limit = Limit}) -> Limit;
@@ -843,6 +929,7 @@ used(#fhc_state{open_count = C1,
%%----------------------------------------------------------------------------
init([AlarmSet, AlarmClear]) ->
+ file_handle_cache_stats:init(),
Limit = case application:get_env(file_handles_high_watermark) of
{ok, Watermark} when (is_integer(Watermark) andalso
Watermark > 0) ->
diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl
new file mode 100644
index 00000000..832f0b3d
--- /dev/null
+++ b/src/file_handle_cache_stats.erl
@@ -0,0 +1,60 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(file_handle_cache_stats).
+
+%% Statistics about the read, write, sync, seek and reopen operations that
+%% go through the file handle cache (fhc).
+
+-export([init/0, update/3, update/2, update/1, get/0]).
+
+-define(TABLE, ?MODULE).
+
+%% Create the public, named stats table and seed every counter with 0:
+%% count/bytes/time for read and write, count/time for sync and seek, and
+%% count alone for reopen.
+init() ->
+    ets:new(?TABLE, [public, named_table]),
+    IoKeys    = [{Op, C} || Op <- [read, write], C <- [count, bytes, time]],
+    TimedKeys = [{Op, C} || Op <- [sync, seek],  C <- [count, time]],
+    lists:foreach(fun (Key) -> ets:insert(?TABLE, {Key, 0}) end,
+                  IoKeys ++ TimedKeys),
+    %% Kept as a one-element comprehension so the return value is unchanged.
+    [ets:insert(?TABLE, {Key, 0}) || Key <- [{reopen, count}]].
+
+%% Run Thunk, then add 1 to {Op, count}, Bytes to {Op, bytes} and the
+%% elapsed time reported by timer_tc/1 to {Op, time}. Returns Thunk's
+%% result. The counters are bumped whatever Thunk returns; if Thunk raises,
+%% nothing is recorded.
+update(Op, Bytes, Thunk) ->
+    {Micros, Result} = timer_tc(Thunk),
+    lists:foreach(
+      fun ({Counter, Incr}) ->
+              ets:update_counter(?TABLE, {Op, Counter}, Incr)
+      end,
+      [{count, 1}, {bytes, Bytes}, {time, Micros}]),
+    Result.
+
+%% Run Thunk, then add 1 to {Op, count} and the elapsed time reported by
+%% timer_tc/1 to {Op, time}. Returns Thunk's result.
+update(Op, Thunk) ->
+    {Micros, Result} = timer_tc(Thunk),
+    lists:foreach(
+      fun ({Counter, Incr}) ->
+              ets:update_counter(?TABLE, {Op, Counter}, Incr)
+      end,
+      [{count, 1}, {time, Micros}]),
+    Result.
+
+%% Add 1 to {Op, count} without timing anything. Always returns ok.
+update(Op) ->
+    _NewCount = ets:update_counter(?TABLE, {Op, count}, 1),
+    ok.
+
+%% Return every {{Op, Counter}, Value} row of the stats table, sorted.
+get() ->
+    Rows = ets:tab2list(?TABLE),
+    lists:sort(Rows).
+
+%% Run Thunk() and return {Elapsed, Result}, where Elapsed is the
+%% timer:now_diff/2 difference between os:timestamp() readings taken just
+%% after and just before the call.
+%% TODO: timer:tc/1 was introduced in R14B03; switch to it once that
+%% release is the minimum we require.
+timer_tc(Thunk) ->
+    Started = os:timestamp(),
+    Result = Thunk(),
+    Elapsed = timer:now_diff(os:timestamp(), Started),
+    {Elapsed, Result}.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 3ab82cad..ee8147f4 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -41,48 +41,90 @@
%% parse_table supports the AMQP 0-8/0-9 standard types, S, I, D, T
%% and F, as well as the QPid extensions b, d, f, l, s, t, x, and V.
+-define(SIMPLE_PARSE_TABLE(BType, Pattern, RType),
+ parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ BType, Pattern, Rest/binary>>) ->
+ [{NameString, RType, Value} | parse_table(Rest)]).
+
+%% Note that we try to put these clauses in approximately the order we
+%% expect to hit them; that is why the empty-binary clause is halfway through.
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, longstr, Value} | parse_table(Rest)];
+
+?SIMPLE_PARSE_TABLE($I, Value:32/signed, signedint);
+?SIMPLE_PARSE_TABLE($T, Value:64/unsigned, timestamp);
+
parse_table(<<>>) ->
[];
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, ValueAndRest/binary>>) ->
- {Type, Value, Rest} = parse_field_value(ValueAndRest),
- [{NameString, Type, Value} | parse_table(Rest)].
-parse_array(<<>>) ->
- [];
-parse_array(<<ValueAndRest/binary>>) ->
- {Type, Value, Rest} = parse_field_value(ValueAndRest),
- [{Type, Value} | parse_array(Rest)].
+?SIMPLE_PARSE_TABLE($b, Value:8/signed, byte);
+?SIMPLE_PARSE_TABLE($d, Value:64/float, double);
+?SIMPLE_PARSE_TABLE($f, Value:32/float, float);
+?SIMPLE_PARSE_TABLE($l, Value:64/signed, long);
+?SIMPLE_PARSE_TABLE($s, Value:16/signed, short);
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $t, Value:8/unsigned, Rest/binary>>) ->
+ [{NameString, bool, (Value /= 0)} | parse_table(Rest)];
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) ->
+ [{NameString, decimal, {Before, After}} | parse_table(Rest)];
-parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
- {longstr, V, R};
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, table, parse_table(Value)} | parse_table(Rest)];
-parse_field_value(<<$I, V:32/signed, R/binary>>) ->
- {signedint, V, R};
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, array, parse_array(Value)} | parse_table(Rest)];
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, binary, Value} | parse_table(Rest)];
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $V, Rest/binary>>) ->
+ [{NameString, void, undefined} | parse_table(Rest)].
+
+-define(SIMPLE_PARSE_ARRAY(BType, Pattern, RType),
+ parse_array(<<BType, Pattern, Rest/binary>>) ->
+ [{RType, Value} | parse_array(Rest)]).
+
+parse_array(<<$S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{longstr, Value} | parse_array(Rest)];
+
+?SIMPLE_PARSE_ARRAY($I, Value:32/signed, signedint);
+?SIMPLE_PARSE_ARRAY($T, Value:64/unsigned, timestamp);
+
+parse_array(<<>>) ->
+ [];
-parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) ->
- {decimal, {Before, After}, R};
+?SIMPLE_PARSE_ARRAY($b, Value:8/signed, byte);
+?SIMPLE_PARSE_ARRAY($d, Value:64/float, double);
+?SIMPLE_PARSE_ARRAY($f, Value:32/float, float);
+?SIMPLE_PARSE_ARRAY($l, Value:64/signed, long);
+?SIMPLE_PARSE_ARRAY($s, Value:16/signed, short);
-parse_field_value(<<$T, V:64/unsigned, R/binary>>) ->
- {timestamp, V, R};
+parse_array(<<$t, Value:8/unsigned, Rest/binary>>) ->
+ [{bool, (Value /= 0)} | parse_array(Rest)];
-parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
- {table, parse_table(Table), R};
+parse_array(<<$D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) ->
+ [{decimal, {Before, After}} | parse_array(Rest)];
-parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
- {array, parse_array(Array), R};
+parse_array(<<$F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{table, parse_table(Value)} | parse_array(Rest)];
-parse_field_value(<<$b, V:8/signed, R/binary>>) -> {byte, V, R};
-parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R};
-parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R};
-parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R};
-parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R};
-parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
+parse_array(<<$A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{array, parse_array(Value)} | parse_array(Rest)];
-parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
- {binary, V, R};
+parse_array(<<$x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{binary, Value} | parse_array(Rest)];
-parse_field_value(<<$V, R/binary>>) ->
- {void, undefined, R}.
+parse_array(<<$V, Rest/binary>>) ->
+ [{void, undefined} | parse_array(Rest)].
ensure_content_decoded(Content = #content{properties = Props})
when Props =/= none ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b829ae94..6c80ddcd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1299,7 +1299,8 @@ should_mask_action(CRef, MsgId,
open_file(Dir, FileName, Mode) ->
file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
- [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
+ [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
+ {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };