diff options
author | David Ansari <david.ansari@gmx.de> | 2022-05-16 08:46:33 +0000 |
---|---|---|
committer | David Ansari <david.ansari@gmx.de> | 2022-05-16 09:07:46 +0000 |
commit | 4472ddf71c7c9b63ddabb8b997c00e5f188527f9 (patch) | |
tree | 2c4a8825627ce325008f22496b51399748ee9ea4 /deps/amqp10_common | |
parent | 5cf3a43523cc88f16e577a35adb498624363e2b4 (diff) | |
download | rabbitmq-server-git-4472ddf71c7c9b63ddabb8b997c00e5f188527f9.tar.gz |
Increase receiving throughput from a stream via AMQP
This commit increases consumption throughput from a stream via AMQP 0.9.1
for 1 consumer by 83k msg/s or 55%,
for 4 consumers by 140k msg/s or 44%.
This commit tries to follow https://www.erlang.org/doc/efficiency_guide/binaryhandling.html
by reusing match contexts instead of creating new sub-binaries.
The CPU and mmap() memory flame graphs show that
when producing and consuming from a stream via AMQP 0.9.1
module amqp10_binary_parser requires
before this commit: 10.1% CPU time and 8.0% of mmap system calls
after this commit: 2.6% CPU time 2.5% of mmap system calls
Performance tests
Start rabbitmq-server without any plugins enabled and with 4 schedulers:
```
make run-broker PLUGINS="" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+JPperf true +S 4"
```
Test 1
Perf test client:
```
-x 1 -y 2 -qa x-queue-type=stream -ad false -f persistent -u s1 --qos 10000 --multi-ack-every 1000 -z 30
```
master branch:
sending rate avg msg/s 143k - 146k
receiving rate avg msg/s 188k - 194k
PR:
sending rate avg 133k - 138k
receiving rate avg 266k - 276k
This shows that with AMQP 0.9.1 and a stream, prior to this commit the broker could not
deliver messages to consumers as fast as they were published.
After this commit, it can.
Test 2
First, produce a few millions messages:
```
-x 1 -y 0 -qa x-queue-type=stream -ad false -f persistent -u s2
```
Then, consume them:
```
-x 0 -y 1 -qa x-queue-type=stream -ad false -f persistent -u s2 --qos 10000 --multi-ack-every 1000 -ca x-stream-offset=first -z 30
```
receving rate avg msg/s
master branch:
147k - 156k
PR:
230k - 237k
Improvement: 83k / 55%
Test 3
-x 0 -y 4 -qa x-queue-type=stream -ad false -f persistent -u s2 --qos 10000 --multi-ack-every 1000 -ca x-stream-offset=first -z 30
receving rate avg msg/s
master branch:
313k - 319k
PR:
450k - 461k
Improvement: 140k / 44%
Diffstat (limited to 'deps/amqp10_common')
-rw-r--r-- | deps/amqp10_common/Makefile | 2 | ||||
-rw-r--r-- | deps/amqp10_common/src/amqp10_binary_parser.erl | 118 | ||||
-rw-r--r-- | deps/amqp10_common/src/amqp10_framing.erl | 7 | ||||
-rw-r--r-- | deps/amqp10_common/test/binary_generator_SUITE.erl | 26 | ||||
-rw-r--r-- | deps/amqp10_common/test/binary_parser_SUITE.erl | 58 |
5 files changed, 188 insertions, 23 deletions
diff --git a/deps/amqp10_common/Makefile b/deps/amqp10_common/Makefile index 9967b2d1e0..dceae87a92 100644 --- a/deps/amqp10_common/Makefile +++ b/deps/amqp10_common/Makefile @@ -24,7 +24,7 @@ define HEX_TARBALL_EXTRA_METADATA } endef -DIALYZER_OPTS += --src -r test +DIALYZER_OPTS += --src -r test -DTEST BUILD_DEPS = rabbit_common # Variables and recipes in development.*.mk are meant to be used from diff --git a/deps/amqp10_common/src/amqp10_binary_parser.erl b/deps/amqp10_common/src/amqp10_binary_parser.erl index f4f33e15e9..78784d7502 100644 --- a/deps/amqp10_common/src/amqp10_binary_parser.erl +++ b/deps/amqp10_common/src/amqp10_binary_parser.erl @@ -11,16 +11,20 @@ -include("amqp10_framing.hrl"). -% -spec parse(binary()) -> tuple(). +-ifdef(TEST). -parse_all(ValueBin) when is_binary(ValueBin) -> - lists:reverse(parse_all([], parse(ValueBin))). +-export([parse_all_int/1]). -parse_all(Acc, {Value, <<>>}) -> [Value | Acc]; -parse_all(Acc, {Value, Rest}) -> parse_all([Value | Acc], parse(Rest)). +parse_all_int(ValueBin) when is_binary(ValueBin) -> + lists:reverse(parse_all_int([], parse(ValueBin))). + +parse_all_int(Acc, {Value, <<>>}) -> [Value | Acc]; +parse_all_int(Acc, {Value, Rest}) -> parse_all_int([Value | Acc], parse(Rest)). + +-endif. -spec parse(binary()) -> - {amqp10_binary_generator:amqp10_type(), binary()}. + {amqp10_binary_generator:amqp10_type(), Rest :: binary()}. parse(<<?DESCRIBED,Rest/binary>>) -> parse_described(Rest); parse(Rest) -> @@ -180,3 +184,105 @@ mapify([]) -> []; mapify([Key, Value | Rest]) -> [{Key, Value} | mapify(Rest)]. + +%% parse_all/1 is much faster and much more memory efficient than parse/1. +%% +%% When compiling this module with environment variable ERL_COMPILER_OPTIONS=bin_opt_info, +%% for parse/1 the compiler prints many times: +%% "BINARY CREATED: binary is used in a term that is returned from the function" +%% because sub binaries are created. +%% +%% For parse_all/1 the compiler prints many times: +%% "OPTIMIZED: match context reused" +%% because sub binaries are not created. +%% +%% See also https://www.erlang.org/doc/efficiency_guide/binaryhandling.html +-spec parse_all(binary()) -> + [amqp10_binary_generator:amqp10_type()]. + +parse_all(<<>>) -> + []; + +%% Described Types +parse_all(<<?DESCRIBED, Rest0/binary>>) -> + [Descriptor, Value | Rest] = parse_all(Rest0), + [{described, Descriptor, Value} | Rest]; + +%% Primitives Types +%% +%% Constants +parse_all(<<16#40, R/binary>>) -> [null | parse_all(R)]; +parse_all(<<16#41, R/binary>>) -> [true | parse_all(R)]; +parse_all(<<16#42, R/binary>>) -> [false | parse_all(R)]; +parse_all(<<16#43, R/binary>>) -> [{uint, 0} | parse_all(R)]; +parse_all(<<16#44, R/binary>>) -> [{ulong, 0} | parse_all(R)]; + +%% Fixed-widths. Most integral types have a compact encoding as a byte. +parse_all(<<16#50, V:8/unsigned, R/binary>>) -> [{ubyte, V} | parse_all(R)]; +parse_all(<<16#51, V:8/signed, R/binary>>) -> [{byte, V} | parse_all(R)]; +parse_all(<<16#52, V:8/unsigned, R/binary>>) -> [{uint, V} | parse_all(R)]; +parse_all(<<16#53, V:8/unsigned, R/binary>>) -> [{ulong, V} | parse_all(R)]; +parse_all(<<16#54, V:8/signed, R/binary>>) -> [{int, V} | parse_all(R)]; +parse_all(<<16#55, V:8/signed, R/binary>>) -> [{long, V} | parse_all(R)]; +parse_all(<<16#56, 0:8/unsigned, R/binary>>) -> [{boolean, false} | parse_all(R)]; +parse_all(<<16#56, 1:8/unsigned, R/binary>>) -> [{boolean, true} | parse_all(R)]; +parse_all(<<16#60, V:16/unsigned, R/binary>>) -> [{ushort, V} | parse_all(R)]; +parse_all(<<16#61, V:16/signed, R/binary>>) -> [{short, V} | parse_all(R)]; +parse_all(<<16#70, V:32/unsigned, R/binary>>) -> [{uint, V} | parse_all(R)]; +parse_all(<<16#71, V:32/signed, R/binary>>) -> [{int, V} | parse_all(R)]; +parse_all(<<16#72, V:32/float, R/binary>>) -> [{float, V} | parse_all(R)]; +parse_all(<<16#73, Utf32:4/binary,R/binary>>) -> [{char, Utf32} | parse_all(R)]; +parse_all(<<16#80, V:64/unsigned, R/binary>>) -> [{ulong, V} | parse_all(R)]; +parse_all(<<16#81, V:64/signed, R/binary>>) -> [{long, V} | parse_all(R)]; +parse_all(<<16#82, V:64/float, R/binary>>) -> [{double, V} | parse_all(R)]; +parse_all(<<16#83, TS:64/signed, R/binary>>) -> [{timestamp, TS} | parse_all(R)]; +parse_all(<<16#98, Uuid:16/binary,R/binary>>) -> [{uuid, Uuid} | parse_all(R)]; + +%% Variable-widths +parse_all(<<16#a0, S:8/unsigned, V:S/binary,R/binary>>) -> [{binary, V} | parse_all(R)]; +parse_all(<<16#a1, S:8/unsigned, V:S/binary,R/binary>>) -> [{utf8, V} | parse_all(R)]; +parse_all(<<16#a3, S:8/unsigned, V:S/binary,R/binary>>) -> [{symbol, V} | parse_all(R)]; +parse_all(<<16#b3, S:32/unsigned,V:S/binary,R/binary>>) -> [{symbol, V} | parse_all(R)]; +parse_all(<<16#b0, S:32/unsigned,V:S/binary,R/binary>>) -> [{binary, V} | parse_all(R)]; +parse_all(<<16#b1, S:32/unsigned,V:S/binary,R/binary>>) -> [{utf8, V} | parse_all(R)]; + +%% Compounds +parse_all(<<16#45, R/binary>>) -> + [{list, []} | parse_all(R)]; +parse_all(<<16#c0, S:8/unsigned,CountAndValue:S/binary,R/binary>>) -> + [{list, parse_compound_all(8, CountAndValue)} | parse_all(R)]; +parse_all(<<16#c1, S:8/unsigned,CountAndValue:S/binary,R/binary>>) -> + List = parse_compound_all(8, CountAndValue), + [{map, mapify(List)} | parse_all(R)]; +parse_all(<<16#d0, S:32/unsigned,CountAndValue:S/binary,R/binary>>) -> + [{list, parse_compound_all(32, CountAndValue)} | parse_all(R)]; +parse_all(<<16#d1, S:32/unsigned,CountAndValue:S/binary,R/binary>>) -> + List = parse_compound_all(32, CountAndValue), + [{map, mapify(List)} | parse_all(R)]; + +%% Arrays +parse_all(<<16#e0, S:8/unsigned,CountAndV:S/binary,R/binary>>) -> + [parse_array(8, CountAndV) | parse_all(R)]; +parse_all(<<16#f0, S:32/unsigned,CountAndV:S/binary,R/binary>>) -> + [parse_array(32, CountAndV) | parse_all(R)]; + +%% NaN or +-inf +parse_all(<<16#72, V:32, R/binary>>) -> + [{as_is, 16#72, <<V:32>>} | parse_all(R)]; +parse_all(<<16#82, V:64, R/binary>>) -> + [{as_is, 16#82, <<V:64>>} | parse_all(R)]; + +%% decimals +parse_all(<<16#74, V:32, R/binary>>) -> + [{as_is, 16#74, <<V:32>>} | parse_all(R)]; +parse_all(<<16#84, V:64, R/binary>>) -> + [{as_is, 16#84, <<V:64>>} | parse_all(R)]; +parse_all(<<16#94, V:128, R/binary>>) -> + [{as_is, 16#94, <<V:128>>} | parse_all(R)]; + +parse_all(<<Type, _Bin/binary>>) -> + throw({primitive_type_unsupported, Type, _Bin}). + +parse_compound_all(UnitSize, Bin) -> + <<_Count:UnitSize, Bin1/binary>> = Bin, + parse_all(Bin1). diff --git a/deps/amqp10_common/src/amqp10_framing.erl b/deps/amqp10_common/src/amqp10_framing.erl index fa991b297e..f1117959bf 100644 --- a/deps/amqp10_common/src/amqp10_framing.erl +++ b/deps/amqp10_common/src/amqp10_framing.erl @@ -172,13 +172,8 @@ encode(X) -> encode_bin(X) -> amqp10_binary_generator:generate(encode(X)). - decode_bin(X) -> - [decode(PerfDesc) || PerfDesc <- decode_bin0(X)]. - -decode_bin0(<<>>) -> []; -decode_bin0(X) -> {PerfDesc, Rest} = amqp10_binary_parser:parse(X), - [PerfDesc | decode_bin0(Rest)]. + [decode(PerfDesc) || PerfDesc <- amqp10_binary_parser:parse_all(X)]. symbol_for(X) -> amqp10_framing0:symbol_for(X). diff --git a/deps/amqp10_common/test/binary_generator_SUITE.erl b/deps/amqp10_common/test/binary_generator_SUITE.erl index 773e66aa07..bdb454c14d 100644 --- a/deps/amqp10_common/test/binary_generator_SUITE.erl +++ b/deps/amqp10_common/test/binary_generator_SUITE.erl @@ -20,11 +20,13 @@ all() -> all_tests() -> [ + null, booleans, symbol, timestamp, numerals, utf8, + char, list, map, described, @@ -33,7 +35,7 @@ all_tests() -> groups() -> [ - {tests, [], all_tests()} + {tests, [parallel], all_tests()} ]. init_per_suite(Config) -> @@ -58,6 +60,10 @@ end_per_testcase(_TestCase, _Config) -> %%% Test cases %%%=================================================================== +null(_Config) -> + roundtrip(null), + ok. + booleans(_Config) -> roundtrip(true), roundtrip(false), @@ -77,7 +83,8 @@ numerals(_Config) -> roundtrip({ubyte, 16#FF}), roundtrip({ushort, 0}), roundtrip({ushort, 16#FFFF}), - roundtrip({uint, 0}), + roundtrip({uint, 0}), %% uint:uint0 + roundtrip({uint, 1}), %% uint:smalluint roundtrip({uint, 16#FFFFFFFF}), roundtrip({ulong, 0}), roundtrip({ulong, 16#FFFFFFFFFFFFFFFF}), @@ -106,7 +113,14 @@ utf8(_Config) -> roundtrip({utf8, binary:copy(<<"asdfghjk">>, 64)}), ok. +char(_Config) -> + roundtrip({char, <<$A/utf32>>}), + ok. + list(_Config) -> + %% list:list0 + roundtrip({list, []}), + %% list:list8 roundtrip({list, [{utf8, <<"hi">>}, {int, 123}, {binary, <<"data">>}, @@ -115,6 +129,8 @@ list(_Config) -> {utf8, <<"URL">>}, {utf8, <<"http://example.org/hello-world">>}} ]}), + %% list:list32 + roundtrip({list, [true || _ <- lists:seq(1, 256)]}), ok. map(_Config) -> @@ -122,6 +138,7 @@ map(_Config) -> {{utf8, <<"key1">>}, {utf8, <<"value1">>}}, {{utf8, <<"key2">>}, {int, 33}} ]}), + roundtrip({map, [{{int, N}, {utf8, <<"value">>}} || N <- lists:seq(1, 256)]}), ok. @@ -157,6 +174,8 @@ array(_Config) -> roundtrip({array, {described, Desc, utf8}, [{described, Desc, {utf8, <<"http://example.org/hello">>}}]}), roundtrip({array, {described, Desc, utf8}, []}), + %% array:array32 + roundtrip({array, boolean, [{boolean, true} || _ <- lists:seq(1, 256)]}), ok. %% Utility @@ -164,4 +183,5 @@ array(_Config) -> roundtrip(Term) -> Bin = iolist_to_binary(amqp10_binary_generator:generate(Term)), % generate returns an iolist but parse expects a binary - ?assertMatch({Term, _}, amqp10_binary_parser:parse(Bin)). + ?assertMatch({Term, _}, amqp10_binary_parser:parse(Bin)), + ?assertMatch([Term | _], amqp10_binary_parser:parse_all(Bin)). diff --git a/deps/amqp10_common/test/binary_parser_SUITE.erl b/deps/amqp10_common/test/binary_parser_SUITE.erl index 7c1b2322e3..551d59fe0c 100644 --- a/deps/amqp10_common/test/binary_parser_SUITE.erl +++ b/deps/amqp10_common/test/binary_parser_SUITE.erl @@ -20,12 +20,14 @@ all() -> all_tests() -> [ - array_with_extra_input + roundtrip, + array_with_extra_input, + unsupported_type ]. groups() -> [ - {tests, [], all_tests()} + {tests, [parallel], all_tests()} ]. init_per_suite(Config) -> @@ -50,10 +52,52 @@ end_per_testcase(_TestCase, _Config) -> %%% Test cases %%%=================================================================== +roundtrip(_Config) -> + Terms = [null, + {described, + {utf8, <<"URL">>}, + {utf8, <<"http://example.org/hello-world">>}}, + {described, + {utf8, <<"URL">>}, + {utf8, <<"https://rabbitmq.com">>}}, + {array, ubyte, [{ubyte, 1}, {ubyte, 255}]}, + {boolean, false}, + {list, [{utf8, <<"hi">>}, + {described, + {utf8, <<"URL">>}, + {utf8, <<"http://example.org/hello-world">>}} + ]}, + {list, [{int, 123}, + {array, int, [{int, 1}, {int, 2}, {int, 3}]} + ]}, + {map, [ + {{utf8, <<"key1">>}, {utf8, <<"value1">>}}, + {{utf8, <<"key2">>}, {int, 33}} + ]}, + {array, {described, {utf8, <<"URL">>}, utf8}, []}, + false], + + Bin = lists:foldl( + fun(T, Acc) -> + B = iolist_to_binary(amqp10_binary_generator:generate(T)), + <<Acc/binary, B/binary>> + end, <<>>, Terms), + + ?assertEqual(Terms, amqp10_binary_parser:parse_all_int(Bin)), + ?assertEqual(Terms, amqp10_binary_parser:parse_all(Bin)). + array_with_extra_input(_Config) -> Bin = <<83,16,192,85,10,177,0,0,0,1,48,161,12,114,97,98,98,105,116, 109,113,45,98,111,120,112,255,255,0,0,96,0,50,112,0,0,19,136,163,5,101,110,45,85,83,224,14,2,65,5,102,105,45,70,73,5,101,110,45,85,83,64,64,193,24,2,163,20,68,69,70,69,78,83,73,67,83,46,84,69,83,84,46,83,85,73,84,69,65>>, - ?assertExit({failed_to_parse_array_extra_input_remaining, - %% element type, input, accumulated result - 65, <<105,45,70,73,5,101,110,45,85,83>>, [true,true]}, - amqp10_binary_parser:parse_all(Bin)), - ok. + + Expected = {failed_to_parse_array_extra_input_remaining, + %% element type, input, accumulated result + 65, <<105,45,70,73,5,101,110,45,85,83>>, [true,true]}, + + ?assertExit(Expected, amqp10_binary_parser:parse_all_int(Bin)), + ?assertExit(Expected, amqp10_binary_parser:parse_all(Bin)). + +unsupported_type(_Config) -> + Bin = <<2/integer, "hey">>, + Expected = {primitive_type_unsupported, 16#02, <<"hey">>}, + ?assertThrow(Expected, amqp10_binary_parser:parse_all_int(Bin)), + ?assertThrow(Expected, amqp10_binary_parser:parse_all(Bin)). |