diff options
author | Michael Klishin <klishinm@vmware.com> | 2022-09-20 17:54:50 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-20 17:54:50 +0400 |
commit | e1d4846025725037228d912859d8a45241789b3e (patch) | |
tree | 40d6e56cfd32b2df9485bb5887df02ac2a827efe | |
parent | ea2010fa95f9dcc525276b9432d5f9226b4f845d (diff) | |
parent | 86b0329932b3410b43a1cf9f153a7acbda5ecf3c (diff) | |
download | rabbitmq-server-git-e1d4846025725037228d912859d8a45241789b3e.tar.gz |
Merge pull request #5828 from rabbitmq/mergify/bp/v3.11.x/pr-4967
Implement header conversion between AMQP 1.0 and 0.9.1 (backport #4967)
-rw-r--r-- | deps/rabbit/src/rabbit_msg_record.erl | 4 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/BUILD.bazel | 4 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/README.md | 11 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/priv/schema/rabbitmq_amqp1_0.schema | 8 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl | 75 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl | 142 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/test/config_schema_SUITE.erl | 54 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/test/config_schema_SUITE_data/rabbitmq_amqp1_0.snippets | 17 |
8 files changed, 301 insertions, 14 deletions
diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl index 8c45d080ac..cd02519004 100644 --- a/deps/rabbit/src/rabbit_msg_record.erl +++ b/deps/rabbit/src/rabbit_msg_record.erl @@ -7,7 +7,9 @@ to_amqp091/1, add_message_annotations/2, message_annotation/2, - message_annotation/3 + message_annotation/3, + from_091/2, + to_091/2 ]). -include_lib("rabbit_common/include/rabbit_framing.hrl"). diff --git a/deps/rabbitmq_amqp1_0/BUILD.bazel b/deps/rabbitmq_amqp1_0/BUILD.bazel index 5d0b2b2f58..fd303a0414 100644 --- a/deps/rabbitmq_amqp1_0/BUILD.bazel +++ b/deps/rabbitmq_amqp1_0/BUILD.bazel @@ -101,6 +101,10 @@ suites = [ "TMPDIR": "$TEST_TMPDIR", }, ), + rabbitmq_integration_suite( + PACKAGE, + name = "config_schema_SUITE", + ), rabbitmq_suite( name = "unit_SUITE", size = "small", diff --git a/deps/rabbitmq_amqp1_0/README.md b/deps/rabbitmq_amqp1_0/README.md index 10dd5a7df0..1b87dac33d 100644 --- a/deps/rabbitmq_amqp1_0/README.md +++ b/deps/rabbitmq_amqp1_0/README.md @@ -51,6 +51,17 @@ Configuration example using [sysctl config format](https://next.rabbitmq.com/con amqp1_0.protocol_strict_mode = false +Configuration for interoperability between AMQP 0.9.1 and AMQP 1.0. + +``` +# Conversion only handles simple types, such as strings, ints and booleans. +# Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer +amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false) +# Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers +amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false) + +``` + ## Clients we have tested The current field of AMQP 1.0 clients is somewhat limited. Therefore diff --git a/deps/rabbitmq_amqp1_0/priv/schema/rabbitmq_amqp1_0.schema b/deps/rabbitmq_amqp1_0/priv/schema/rabbitmq_amqp1_0.schema index e6cfb68262..09d2cd06b2 100644 --- a/deps/rabbitmq_amqp1_0/priv/schema/rabbitmq_amqp1_0.schema +++ b/deps/rabbitmq_amqp1_0/priv/schema/rabbitmq_amqp1_0.schema @@ -28,4 +28,10 @@ {translation , "rabbitmq_amqp1_0.default_vhost", fun(Conf) -> list_to_binary(cuttlefish:conf_get("amqp1_0.default_vhost", Conf)) -end}.
\ No newline at end of file +end}. + +{mapping, "amqp1_0.convert_amqp091_headers_to_app_props", "rabbitmq_amqp1_0.convert_amqp091_headers_to_app_props", + [{datatype, {enum, [true, false]}}]}. + +{mapping, "amqp1_0.convert_app_props_to_amqp091_headers", "rabbitmq_amqp1_0.convert_app_props_to_amqp091_headers", + [{datatype, {enum, [true, false]}}]}. diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl index d3f3ee8ae9..2ab8806558 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl @@ -14,6 +14,9 @@ -define(MESSAGE_ANNOTATIONS_HEADER, <<"x-amqp-1.0-message-annotations">>). -define(STREAM_OFFSET_HEADER, <<"x-stream-offset">>). -define(FOOTER, <<"x-amqp-1.0-footer">>). +-define(CONVERT_AMQP091_HEADERS_TO_APP_PROPS, application:get_env(rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, false)). +-define(CONVERT_APP_PROPS_TO_AMQP091_HEADERS, application:get_env(rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, false)). + -include_lib("amqp_client/include/amqp_client.hrl"). -include("rabbit_amqp1_0.hrl"). @@ -58,9 +61,17 @@ assemble(properties, {R, P, C}, Else, Uneaten) -> assemble(app_properties, {R, P = #'P_basic'{headers = Headers}, C}, {#'v1_0.application_properties'{}, Rest}, Uneaten) -> AppPropsBin = chunk(Rest, Uneaten), + Amqp091Headers = case ?CONVERT_APP_PROPS_TO_AMQP091_HEADERS of + true -> + amqp10_app_props_to_amqp091_headers(Headers, AppPropsBin); + _ -> + Headers + end, + AppPropsAdded = set_header( + ?APP_PROPERTIES_HEADER, + AppPropsBin, Amqp091Headers), assemble(body, {R, P#'P_basic'{ - headers = set_header(?APP_PROPERTIES_HEADER, - AppPropsBin, Headers)}, C}, + headers = AppPropsAdded}, C}, decode_section(Rest), Rest); assemble(app_properties, {R, P, C}, Else, Uneaten) -> assemble(body, {R, P, C}, Else, Uneaten); @@ -254,7 +265,16 @@ annotated_message(RKey, #'basic.deliver'{redelivered = Redelivered}, {_, AppProps10Bin} -> AppProps10Bin; undefined -> - [] + case ?CONVERT_AMQP091_HEADERS_TO_APP_PROPS of + true -> + case amqp091_headers_to_amqp10_app_props(Headers) of + undefined -> []; + Other -> + amqp10_framing:encode_bin(Other) + end; + _ -> + [] + end end, DataBin = case Props#'P_basic'.type of <<"amqp-1.0">> -> @@ -282,3 +302,52 @@ map_add(_T, _Key, _Type, undefined, Acc) -> Acc; map_add(KeyType, Key, Type, Value, Acc) -> [{wrap(KeyType, Key), wrap(Type, Value)} | Acc]. + +amqp10_app_props_to_amqp091_headers(CurrentHeaders, AppPropsBin) -> + case amqp10_framing:decode_bin(AppPropsBin) of + [#'v1_0.application_properties'{ content = AppProps}] when is_list(AppProps) -> + Hs = case CurrentHeaders of + undefined -> []; + Headers -> Headers + end, + lists:foldl(fun(Prop, Acc) -> + case Prop of + {{utf8, Key}, {ValueType, Value}} -> + case type10_to_type091(Key, ValueType, Value) of + undefined -> Acc; + Typed -> [Typed |Acc] + end; + _ -> Acc + end + end, Hs, AppProps); + _ -> CurrentHeaders + end. +type10_to_type091(Key, Type, Value) -> + try + rabbit_msg_record:to_091(Key, {Type, Value}) + catch + _:function_clause -> undefined + end. + +amqp091_headers_to_amqp10_app_props(undefined) -> undefined; +amqp091_headers_to_amqp10_app_props(Headers) when is_list(Headers) -> + AppPropsOut = lists:foldl(fun(H, Acc) -> + case H of + {Key, Type, Value} -> + case type091_to_type10(Type, Value) of + undefined -> Acc; + Typed -> + [{{utf8, Key}, Typed}|Acc] + end; + _ -> Acc + end + end, [], Headers), + #'v1_0.application_properties'{content = AppPropsOut}. + +type091_to_type10(Type, Value) -> + try + rabbit_msg_record:from_091(Type, Value) + catch + _:function_clause -> undefined + end. + diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index cb1e90c1d2..aedd4873eb 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -9,7 +9,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). --include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). -compile(export_all). @@ -22,7 +22,8 @@ all() -> groups() -> [ {tests, [], [ - roundtrip_quorum_queue_with_drain + roundtrip_quorum_queue_with_drain, + message_headers_conversion ]}, {metrics, [], [ auth_attempt_metrics @@ -82,7 +83,7 @@ roundtrip_quorum_queue_with_drain(Config) -> port => Port, container_id => atom_to_binary(?FUNCTION_NAME, utf8), sasl => {plain, <<"guest">>, <<"guest">>}}, - % ct:pal("opening connectoin with ~p", [OpnConf]), + {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, Session} = amqp10_client:begin_session(Connection), SenderLinkName = <<"test-sender">>, @@ -90,12 +91,7 @@ roundtrip_quorum_queue_with_drain(Config) -> SenderLinkName, Address), - % wait for credit to be received - receive - {amqp10_event, {link, Sender, credited}} -> ok - after 2000 -> - exit(credited_timeout) - end, + wait_for_credit(Sender), % create a new message using a delivery-tag, body and indicate % it's settlement status (true meaning no disposition confirmation @@ -135,6 +131,75 @@ roundtrip_quorum_queue_with_drain(Config) -> ok = amqp10_client:close_connection(Connection), ok. +message_headers_conversion(Config) -> + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + QName = atom_to_binary(?FUNCTION_NAME, utf8), + Address = <<"/amq/queue/", QName/binary>>, + %% declare a quorum queue + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}), + + rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, true]), + rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, true]), + + OpnConf = #{address => Host, + port => Port, + container_id => atom_to_binary(?FUNCTION_NAME, utf8), + sasl => {plain, <<"guest">>, <<"guest">>}}, + + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session(Connection), + + amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address), + + amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address), + delete_queue(Config, QName), + ok = amqp10_client:close_connection(Connection), + ok. + +amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) -> + {ok, Sender} = create_amqp10_sender(Session, Address), + + OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true), + OutMsg2 = amqp10_msg:set_application_properties(#{ + "x-string" => "string-value", + "x-int" => 3, + "x-bool" => true + }, OutMsg), + ok = amqp10_client:send_msg(Sender, OutMsg2), + wait_for_accepts(1), + + {ok, Headers} = amqp091_get_msg_headers(Ch, QName), + + ?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)), + ?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)), + ?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)). + + +amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) -> + Amqp091Headers = [{<<"x-forwarding">>, array, + [{table, [{<<"uri">>, longstr, + <<"amqp://localhost/%2F/upstream">>}]}]}, + {<<"x-string">>, longstr, "my-string"}, + {<<"x-int">>, long, 92}, + {<<"x-bool">>, bool, true}], + + amqp_channel:cast(Ch, + #'basic.publish'{exchange = <<"">>, routing_key = QName}, + #amqp_msg{props = #'P_basic'{ + headers = Amqp091Headers}, + payload = <<"foobar">> } + ), + + {ok, [Msg]} = drain_queue(Session, Address, 1), + Amqp10Props = amqp10_msg:application_properties(Msg), + ?assertEqual(true, maps:get(<<"x-bool">>, Amqp10Props, undefined)), + ?assertEqual(92, maps:get(<<"x-int">>, Amqp10Props, undefined)), + ?assertEqual(<<"my-string">>, maps:get(<<"x-string">>, Amqp10Props, undefined)). + auth_attempt_metrics(Config) -> Host = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), @@ -185,3 +250,62 @@ open_and_close_connection(OpnConf) -> {ok, Connection} = amqp10_client:open_connection(OpnConf), {ok, _} = amqp10_client:begin_session(Connection), ok = amqp10_client:close_connection(Connection). + +% before we can send messages we have to wait for credit from the server +wait_for_credit(Sender) -> + receive + {amqp10_event, {link, Sender, credited}} -> + ok + after 5000 -> + flush("Credit timed out"), + exit(credited_timeout) + end. + +wait_for_accepts(0) -> ok; +wait_for_accepts(N) -> + receive + {amqp10_disposition,{accepted,_}} -> wait_for_accepts(N -1) + after 250 -> + ok + end. +delete_queue(Config, QName) -> + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + _ = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + rabbit_ct_client_helpers:close_channel(Ch). + + +amqp091_get_msg_headers(Channel, QName) -> + {#'basic.get_ok'{}, #amqp_msg{props = #'P_basic'{ headers= Headers}}} + = amqp_channel:call(Channel, #'basic.get'{queue = QName, no_ack = true}), + {ok, Headers}. + +create_amqp10_sender(Session, Address) -> + SenderLinkName = <<"test-sender">>, + {ok, Sender} = amqp10_client:attach_sender_link(Session, + SenderLinkName, + Address), + wait_for_credit(Sender), + {ok, Sender}. + + drain_queue(Session, Address, N) -> + flush("Before drain_queue"), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, + <<"test-receiver">>, + Address, + settled, + configuration), + + ok = amqp10_client:flow_link_credit(Receiver, 1000, never, true), + Msgs = receive_message(Receiver, N, []), + flush("after drain"), + ok = amqp10_client:detach_link(Receiver), + {ok, Msgs}. + +receive_message(_Receiver, 0, Acc) -> lists:reverse(Acc); +receive_message(Receiver, N, Acc) -> + receive + {amqp10_msg, Receiver, Msg} -> + receive_message(Receiver, N-1, [Msg | Acc]) + after 5000 -> + exit(receive_timed_out) + end. diff --git a/deps/rabbitmq_amqp1_0/test/config_schema_SUITE.erl b/deps/rabbitmq_amqp1_0/test/config_schema_SUITE.erl new file mode 100644 index 0000000000..37133b1cfc --- /dev/null +++ b/deps/rabbitmq_amqp1_0/test/config_schema_SUITE.erl @@ -0,0 +1,54 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(config_schema_SUITE). + +-compile(export_all). + +all() -> + [ + run_snippets + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:run_setup_steps(Config), + rabbit_ct_config_schema:init_schemas(rabbitmq_amqp1_0, Config1). + + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +run_snippets(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, run_snippets1, [Config]). + +run_snippets1(Config) -> + rabbit_ct_config_schema:run_snippets(Config). diff --git a/deps/rabbitmq_amqp1_0/test/config_schema_SUITE_data/rabbitmq_amqp1_0.snippets b/deps/rabbitmq_amqp1_0/test/config_schema_SUITE_data/rabbitmq_amqp1_0.snippets new file mode 100644 index 0000000000..41a19193ca --- /dev/null +++ b/deps/rabbitmq_amqp1_0/test/config_schema_SUITE_data/rabbitmq_amqp1_0.snippets @@ -0,0 +1,17 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +[ + {rabbitmq_amqp1_0_config, + "amqp1_0.convert_amqp091_headers_to_app_props = true + amqp1_0.convert_app_props_to_amqp091_headers = true", + [{rabbitmq_amqp1_0,[ + {convert_amqp091_headers_to_app_props, true}, + {convert_app_props_to_amqp091_headers, true} + ]}], + [rabbitmq_amqp1_0]} +]. |