summaryrefslogtreecommitdiff
path: root/deps/amqp10_common
diff options
context:
space:
mode:
authorDavid Ansari <david.ansari@gmx.de>2022-05-16 08:46:33 +0000
committerDavid Ansari <david.ansari@gmx.de>2022-05-16 09:07:46 +0000
commit4472ddf71c7c9b63ddabb8b997c00e5f188527f9 (patch)
tree2c4a8825627ce325008f22496b51399748ee9ea4 /deps/amqp10_common
parent5cf3a43523cc88f16e577a35adb498624363e2b4 (diff)
downloadrabbitmq-server-git-4472ddf71c7c9b63ddabb8b997c00e5f188527f9.tar.gz
Increase receiving throughput from a stream via AMQP
This commit increases consumption throughput from a stream via AMQP 0.9.1 for 1 consumer by 83k msg/s or 55%, for 4 consumers by 140k msg/s or 44%. This commit tries to follow https://www.erlang.org/doc/efficiency_guide/binaryhandling.html by reusing match contexts instead of creating new sub-binaries. The CPU and mmap() memory flame graphs show that when producing and consuming from a stream via AMQP 0.9.1 module amqp10_binary_parser requires before this commit: 10.1% CPU time and 8.0% of mmap system calls after this commit: 2.6% CPU time 2.5% of mmap system calls Performance tests Start rabbitmq-server without any plugins enabled and with 4 schedulers: ``` make run-broker PLUGINS="" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+JPperf true +S 4" ``` Test 1 Perf test client: ``` -x 1 -y 2 -qa x-queue-type=stream -ad false -f persistent -u s1 --qos 10000 --multi-ack-every 1000 -z 30 ``` master branch: sending rate avg msg/s 143k - 146k receiving rate avg msg/s 188k - 194k PR: sending rate avg 133k - 138k receiving rate avg 266k - 276k This shows that with AMQP 0.9.1 and a stream, prior to this commit the broker could not deliver messages to consumers as fast as they were published. After this commit, it can. Test 2 First, produce a few millions messages: ``` -x 1 -y 0 -qa x-queue-type=stream -ad false -f persistent -u s2 ``` Then, consume them: ``` -x 0 -y 1 -qa x-queue-type=stream -ad false -f persistent -u s2 --qos 10000 --multi-ack-every 1000 -ca x-stream-offset=first -z 30 ``` receving rate avg msg/s master branch: 147k - 156k PR: 230k - 237k Improvement: 83k / 55% Test 3 -x 0 -y 4 -qa x-queue-type=stream -ad false -f persistent -u s2 --qos 10000 --multi-ack-every 1000 -ca x-stream-offset=first -z 30 receving rate avg msg/s master branch: 313k - 319k PR: 450k - 461k Improvement: 140k / 44%
Diffstat (limited to 'deps/amqp10_common')
-rw-r--r--deps/amqp10_common/Makefile2
-rw-r--r--deps/amqp10_common/src/amqp10_binary_parser.erl118
-rw-r--r--deps/amqp10_common/src/amqp10_framing.erl7
-rw-r--r--deps/amqp10_common/test/binary_generator_SUITE.erl26
-rw-r--r--deps/amqp10_common/test/binary_parser_SUITE.erl58
5 files changed, 188 insertions, 23 deletions
diff --git a/deps/amqp10_common/Makefile b/deps/amqp10_common/Makefile
index 9967b2d1e0..dceae87a92 100644
--- a/deps/amqp10_common/Makefile
+++ b/deps/amqp10_common/Makefile
@@ -24,7 +24,7 @@ define HEX_TARBALL_EXTRA_METADATA
}
endef
-DIALYZER_OPTS += --src -r test
+DIALYZER_OPTS += --src -r test -DTEST
BUILD_DEPS = rabbit_common
# Variables and recipes in development.*.mk are meant to be used from
diff --git a/deps/amqp10_common/src/amqp10_binary_parser.erl b/deps/amqp10_common/src/amqp10_binary_parser.erl
index f4f33e15e9..78784d7502 100644
--- a/deps/amqp10_common/src/amqp10_binary_parser.erl
+++ b/deps/amqp10_common/src/amqp10_binary_parser.erl
@@ -11,16 +11,20 @@
-include("amqp10_framing.hrl").
-% -spec parse(binary()) -> tuple().
+-ifdef(TEST).
-parse_all(ValueBin) when is_binary(ValueBin) ->
- lists:reverse(parse_all([], parse(ValueBin))).
+-export([parse_all_int/1]).
-parse_all(Acc, {Value, <<>>}) -> [Value | Acc];
-parse_all(Acc, {Value, Rest}) -> parse_all([Value | Acc], parse(Rest)).
+parse_all_int(ValueBin) when is_binary(ValueBin) ->
+ lists:reverse(parse_all_int([], parse(ValueBin))).
+
+parse_all_int(Acc, {Value, <<>>}) -> [Value | Acc];
+parse_all_int(Acc, {Value, Rest}) -> parse_all_int([Value | Acc], parse(Rest)).
+
+-endif.
-spec parse(binary()) ->
- {amqp10_binary_generator:amqp10_type(), binary()}.
+ {amqp10_binary_generator:amqp10_type(), Rest :: binary()}.
parse(<<?DESCRIBED,Rest/binary>>) ->
parse_described(Rest);
parse(Rest) ->
@@ -180,3 +184,105 @@ mapify([]) ->
[];
mapify([Key, Value | Rest]) ->
[{Key, Value} | mapify(Rest)].
+
+%% parse_all/1 is much faster and much more memory efficient than parse/1.
+%%
+%% When compiling this module with environment variable ERL_COMPILER_OPTIONS=bin_opt_info,
+%% for parse/1 the compiler prints many times:
+%% "BINARY CREATED: binary is used in a term that is returned from the function"
+%% because sub binaries are created.
+%%
+%% For parse_all/1 the compiler prints many times:
+%% "OPTIMIZED: match context reused"
+%% because sub binaries are not created.
+%%
+%% See also https://www.erlang.org/doc/efficiency_guide/binaryhandling.html
+-spec parse_all(binary()) ->
+ [amqp10_binary_generator:amqp10_type()].
+
+parse_all(<<>>) ->
+ [];
+
+%% Described Types
+parse_all(<<?DESCRIBED, Rest0/binary>>) ->
+ [Descriptor, Value | Rest] = parse_all(Rest0),
+ [{described, Descriptor, Value} | Rest];
+
+%% Primitives Types
+%%
+%% Constants
+parse_all(<<16#40, R/binary>>) -> [null | parse_all(R)];
+parse_all(<<16#41, R/binary>>) -> [true | parse_all(R)];
+parse_all(<<16#42, R/binary>>) -> [false | parse_all(R)];
+parse_all(<<16#43, R/binary>>) -> [{uint, 0} | parse_all(R)];
+parse_all(<<16#44, R/binary>>) -> [{ulong, 0} | parse_all(R)];
+
+%% Fixed-widths. Most integral types have a compact encoding as a byte.
+parse_all(<<16#50, V:8/unsigned, R/binary>>) -> [{ubyte, V} | parse_all(R)];
+parse_all(<<16#51, V:8/signed, R/binary>>) -> [{byte, V} | parse_all(R)];
+parse_all(<<16#52, V:8/unsigned, R/binary>>) -> [{uint, V} | parse_all(R)];
+parse_all(<<16#53, V:8/unsigned, R/binary>>) -> [{ulong, V} | parse_all(R)];
+parse_all(<<16#54, V:8/signed, R/binary>>) -> [{int, V} | parse_all(R)];
+parse_all(<<16#55, V:8/signed, R/binary>>) -> [{long, V} | parse_all(R)];
+parse_all(<<16#56, 0:8/unsigned, R/binary>>) -> [{boolean, false} | parse_all(R)];
+parse_all(<<16#56, 1:8/unsigned, R/binary>>) -> [{boolean, true} | parse_all(R)];
+parse_all(<<16#60, V:16/unsigned, R/binary>>) -> [{ushort, V} | parse_all(R)];
+parse_all(<<16#61, V:16/signed, R/binary>>) -> [{short, V} | parse_all(R)];
+parse_all(<<16#70, V:32/unsigned, R/binary>>) -> [{uint, V} | parse_all(R)];
+parse_all(<<16#71, V:32/signed, R/binary>>) -> [{int, V} | parse_all(R)];
+parse_all(<<16#72, V:32/float, R/binary>>) -> [{float, V} | parse_all(R)];
+parse_all(<<16#73, Utf32:4/binary,R/binary>>) -> [{char, Utf32} | parse_all(R)];
+parse_all(<<16#80, V:64/unsigned, R/binary>>) -> [{ulong, V} | parse_all(R)];
+parse_all(<<16#81, V:64/signed, R/binary>>) -> [{long, V} | parse_all(R)];
+parse_all(<<16#82, V:64/float, R/binary>>) -> [{double, V} | parse_all(R)];
+parse_all(<<16#83, TS:64/signed, R/binary>>) -> [{timestamp, TS} | parse_all(R)];
+parse_all(<<16#98, Uuid:16/binary,R/binary>>) -> [{uuid, Uuid} | parse_all(R)];
+
+%% Variable-widths
+parse_all(<<16#a0, S:8/unsigned, V:S/binary,R/binary>>) -> [{binary, V} | parse_all(R)];
+parse_all(<<16#a1, S:8/unsigned, V:S/binary,R/binary>>) -> [{utf8, V} | parse_all(R)];
+parse_all(<<16#a3, S:8/unsigned, V:S/binary,R/binary>>) -> [{symbol, V} | parse_all(R)];
+parse_all(<<16#b3, S:32/unsigned,V:S/binary,R/binary>>) -> [{symbol, V} | parse_all(R)];
+parse_all(<<16#b0, S:32/unsigned,V:S/binary,R/binary>>) -> [{binary, V} | parse_all(R)];
+parse_all(<<16#b1, S:32/unsigned,V:S/binary,R/binary>>) -> [{utf8, V} | parse_all(R)];
+
+%% Compounds
+parse_all(<<16#45, R/binary>>) ->
+ [{list, []} | parse_all(R)];
+parse_all(<<16#c0, S:8/unsigned,CountAndValue:S/binary,R/binary>>) ->
+ [{list, parse_compound_all(8, CountAndValue)} | parse_all(R)];
+parse_all(<<16#c1, S:8/unsigned,CountAndValue:S/binary,R/binary>>) ->
+ List = parse_compound_all(8, CountAndValue),
+ [{map, mapify(List)} | parse_all(R)];
+parse_all(<<16#d0, S:32/unsigned,CountAndValue:S/binary,R/binary>>) ->
+ [{list, parse_compound_all(32, CountAndValue)} | parse_all(R)];
+parse_all(<<16#d1, S:32/unsigned,CountAndValue:S/binary,R/binary>>) ->
+ List = parse_compound_all(32, CountAndValue),
+ [{map, mapify(List)} | parse_all(R)];
+
+%% Arrays
+parse_all(<<16#e0, S:8/unsigned,CountAndV:S/binary,R/binary>>) ->
+ [parse_array(8, CountAndV) | parse_all(R)];
+parse_all(<<16#f0, S:32/unsigned,CountAndV:S/binary,R/binary>>) ->
+ [parse_array(32, CountAndV) | parse_all(R)];
+
+%% NaN or +-inf
+parse_all(<<16#72, V:32, R/binary>>) ->
+ [{as_is, 16#72, <<V:32>>} | parse_all(R)];
+parse_all(<<16#82, V:64, R/binary>>) ->
+ [{as_is, 16#82, <<V:64>>} | parse_all(R)];
+
+%% decimals
+parse_all(<<16#74, V:32, R/binary>>) ->
+ [{as_is, 16#74, <<V:32>>} | parse_all(R)];
+parse_all(<<16#84, V:64, R/binary>>) ->
+ [{as_is, 16#84, <<V:64>>} | parse_all(R)];
+parse_all(<<16#94, V:128, R/binary>>) ->
+ [{as_is, 16#94, <<V:128>>} | parse_all(R)];
+
+parse_all(<<Type, _Bin/binary>>) ->
+ throw({primitive_type_unsupported, Type, _Bin}).
+
+parse_compound_all(UnitSize, Bin) ->
+ <<_Count:UnitSize, Bin1/binary>> = Bin,
+ parse_all(Bin1).
diff --git a/deps/amqp10_common/src/amqp10_framing.erl b/deps/amqp10_common/src/amqp10_framing.erl
index fa991b297e..f1117959bf 100644
--- a/deps/amqp10_common/src/amqp10_framing.erl
+++ b/deps/amqp10_common/src/amqp10_framing.erl
@@ -172,13 +172,8 @@ encode(X) ->
encode_bin(X) ->
amqp10_binary_generator:generate(encode(X)).
-
decode_bin(X) ->
- [decode(PerfDesc) || PerfDesc <- decode_bin0(X)].
-
-decode_bin0(<<>>) -> [];
-decode_bin0(X) -> {PerfDesc, Rest} = amqp10_binary_parser:parse(X),
- [PerfDesc | decode_bin0(Rest)].
+ [decode(PerfDesc) || PerfDesc <- amqp10_binary_parser:parse_all(X)].
symbol_for(X) ->
amqp10_framing0:symbol_for(X).
diff --git a/deps/amqp10_common/test/binary_generator_SUITE.erl b/deps/amqp10_common/test/binary_generator_SUITE.erl
index 773e66aa07..bdb454c14d 100644
--- a/deps/amqp10_common/test/binary_generator_SUITE.erl
+++ b/deps/amqp10_common/test/binary_generator_SUITE.erl
@@ -20,11 +20,13 @@ all() ->
all_tests() ->
[
+ null,
booleans,
symbol,
timestamp,
numerals,
utf8,
+ char,
list,
map,
described,
@@ -33,7 +35,7 @@ all_tests() ->
groups() ->
[
- {tests, [], all_tests()}
+ {tests, [parallel], all_tests()}
].
init_per_suite(Config) ->
@@ -58,6 +60,10 @@ end_per_testcase(_TestCase, _Config) ->
%%% Test cases
%%%===================================================================
+null(_Config) ->
+ roundtrip(null),
+ ok.
+
booleans(_Config) ->
roundtrip(true),
roundtrip(false),
@@ -77,7 +83,8 @@ numerals(_Config) ->
roundtrip({ubyte, 16#FF}),
roundtrip({ushort, 0}),
roundtrip({ushort, 16#FFFF}),
- roundtrip({uint, 0}),
+ roundtrip({uint, 0}), %% uint:uint0
+ roundtrip({uint, 1}), %% uint:smalluint
roundtrip({uint, 16#FFFFFFFF}),
roundtrip({ulong, 0}),
roundtrip({ulong, 16#FFFFFFFFFFFFFFFF}),
@@ -106,7 +113,14 @@ utf8(_Config) ->
roundtrip({utf8, binary:copy(<<"asdfghjk">>, 64)}),
ok.
+char(_Config) ->
+ roundtrip({char, <<$A/utf32>>}),
+ ok.
+
list(_Config) ->
+ %% list:list0
+ roundtrip({list, []}),
+ %% list:list8
roundtrip({list, [{utf8, <<"hi">>},
{int, 123},
{binary, <<"data">>},
@@ -115,6 +129,8 @@ list(_Config) ->
{utf8, <<"URL">>},
{utf8, <<"http://example.org/hello-world">>}}
]}),
+ %% list:list32
+ roundtrip({list, [true || _ <- lists:seq(1, 256)]}),
ok.
map(_Config) ->
@@ -122,6 +138,7 @@ map(_Config) ->
{{utf8, <<"key1">>}, {utf8, <<"value1">>}},
{{utf8, <<"key2">>}, {int, 33}}
]}),
+ roundtrip({map, [{{int, N}, {utf8, <<"value">>}} || N <- lists:seq(1, 256)]}),
ok.
@@ -157,6 +174,8 @@ array(_Config) ->
roundtrip({array, {described, Desc, utf8},
[{described, Desc, {utf8, <<"http://example.org/hello">>}}]}),
roundtrip({array, {described, Desc, utf8}, []}),
+ %% array:array32
+ roundtrip({array, boolean, [{boolean, true} || _ <- lists:seq(1, 256)]}),
ok.
%% Utility
@@ -164,4 +183,5 @@ array(_Config) ->
roundtrip(Term) ->
Bin = iolist_to_binary(amqp10_binary_generator:generate(Term)),
% generate returns an iolist but parse expects a binary
- ?assertMatch({Term, _}, amqp10_binary_parser:parse(Bin)).
+ ?assertMatch({Term, _}, amqp10_binary_parser:parse(Bin)),
+ ?assertMatch([Term | _], amqp10_binary_parser:parse_all(Bin)).
diff --git a/deps/amqp10_common/test/binary_parser_SUITE.erl b/deps/amqp10_common/test/binary_parser_SUITE.erl
index 7c1b2322e3..551d59fe0c 100644
--- a/deps/amqp10_common/test/binary_parser_SUITE.erl
+++ b/deps/amqp10_common/test/binary_parser_SUITE.erl
@@ -20,12 +20,14 @@ all() ->
all_tests() ->
[
- array_with_extra_input
+ roundtrip,
+ array_with_extra_input,
+ unsupported_type
].
groups() ->
[
- {tests, [], all_tests()}
+ {tests, [parallel], all_tests()}
].
init_per_suite(Config) ->
@@ -50,10 +52,52 @@ end_per_testcase(_TestCase, _Config) ->
%%% Test cases
%%%===================================================================
+roundtrip(_Config) ->
+ Terms = [null,
+ {described,
+ {utf8, <<"URL">>},
+ {utf8, <<"http://example.org/hello-world">>}},
+ {described,
+ {utf8, <<"URL">>},
+ {utf8, <<"https://rabbitmq.com">>}},
+ {array, ubyte, [{ubyte, 1}, {ubyte, 255}]},
+ {boolean, false},
+ {list, [{utf8, <<"hi">>},
+ {described,
+ {utf8, <<"URL">>},
+ {utf8, <<"http://example.org/hello-world">>}}
+ ]},
+ {list, [{int, 123},
+ {array, int, [{int, 1}, {int, 2}, {int, 3}]}
+ ]},
+ {map, [
+ {{utf8, <<"key1">>}, {utf8, <<"value1">>}},
+ {{utf8, <<"key2">>}, {int, 33}}
+ ]},
+ {array, {described, {utf8, <<"URL">>}, utf8}, []},
+ false],
+
+ Bin = lists:foldl(
+ fun(T, Acc) ->
+ B = iolist_to_binary(amqp10_binary_generator:generate(T)),
+ <<Acc/binary, B/binary>>
+ end, <<>>, Terms),
+
+ ?assertEqual(Terms, amqp10_binary_parser:parse_all_int(Bin)),
+ ?assertEqual(Terms, amqp10_binary_parser:parse_all(Bin)).
+
array_with_extra_input(_Config) ->
Bin = <<83,16,192,85,10,177,0,0,0,1,48,161,12,114,97,98,98,105,116, 109,113,45,98,111,120,112,255,255,0,0,96,0,50,112,0,0,19,136,163,5,101,110,45,85,83,224,14,2,65,5,102,105,45,70,73,5,101,110,45,85,83,64,64,193,24,2,163,20,68,69,70,69,78,83,73,67,83,46,84,69,83,84,46,83,85,73,84,69,65>>,
- ?assertExit({failed_to_parse_array_extra_input_remaining,
- %% element type, input, accumulated result
- 65, <<105,45,70,73,5,101,110,45,85,83>>, [true,true]},
- amqp10_binary_parser:parse_all(Bin)),
- ok.
+
+ Expected = {failed_to_parse_array_extra_input_remaining,
+ %% element type, input, accumulated result
+ 65, <<105,45,70,73,5,101,110,45,85,83>>, [true,true]},
+
+ ?assertExit(Expected, amqp10_binary_parser:parse_all_int(Bin)),
+ ?assertExit(Expected, amqp10_binary_parser:parse_all(Bin)).
+
+unsupported_type(_Config) ->
+ Bin = <<2/integer, "hey">>,
+ Expected = {primitive_type_unsupported, 16#02, <<"hey">>},
+ ?assertThrow(Expected, amqp10_binary_parser:parse_all_int(Bin)),
+ ?assertThrow(Expected, amqp10_binary_parser:parse_all(Bin)).