summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2022-09-20 17:54:50 +0400
committerGitHub <noreply@github.com>2022-09-20 17:54:50 +0400
commite1d4846025725037228d912859d8a45241789b3e (patch)
tree40d6e56cfd32b2df9485bb5887df02ac2a827efe
parentea2010fa95f9dcc525276b9432d5f9226b4f845d (diff)
parent86b0329932b3410b43a1cf9f153a7acbda5ecf3c (diff)
downloadrabbitmq-server-git-e1d4846025725037228d912859d8a45241789b3e.tar.gz
Merge pull request #5828 from rabbitmq/mergify/bp/v3.11.x/pr-4967
Implement header conversion between AMQP 1.0 and 0.9.1 (backport #4967)
-rw-r--r--deps/rabbit/src/rabbit_msg_record.erl4
-rw-r--r--deps/rabbitmq_amqp1_0/BUILD.bazel4
-rw-r--r--deps/rabbitmq_amqp1_0/README.md11
-rw-r--r--deps/rabbitmq_amqp1_0/priv/schema/rabbitmq_amqp1_0.schema8
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl75
-rw-r--r--deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl142
-rw-r--r--deps/rabbitmq_amqp1_0/test/config_schema_SUITE.erl54
-rw-r--r--deps/rabbitmq_amqp1_0/test/config_schema_SUITE_data/rabbitmq_amqp1_0.snippets17
8 files changed, 301 insertions, 14 deletions
diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl
index 8c45d080ac..cd02519004 100644
--- a/deps/rabbit/src/rabbit_msg_record.erl
+++ b/deps/rabbit/src/rabbit_msg_record.erl
@@ -7,7 +7,9 @@
to_amqp091/1,
add_message_annotations/2,
message_annotation/2,
- message_annotation/3
+ message_annotation/3,
+ from_091/2,
+ to_091/2
]).
-include_lib("rabbit_common/include/rabbit_framing.hrl").
diff --git a/deps/rabbitmq_amqp1_0/BUILD.bazel b/deps/rabbitmq_amqp1_0/BUILD.bazel
index 5d0b2b2f58..fd303a0414 100644
--- a/deps/rabbitmq_amqp1_0/BUILD.bazel
+++ b/deps/rabbitmq_amqp1_0/BUILD.bazel
@@ -101,6 +101,10 @@ suites = [
"TMPDIR": "$TEST_TMPDIR",
},
),
+ rabbitmq_integration_suite(
+ PACKAGE,
+ name = "config_schema_SUITE",
+ ),
rabbitmq_suite(
name = "unit_SUITE",
size = "small",
diff --git a/deps/rabbitmq_amqp1_0/README.md b/deps/rabbitmq_amqp1_0/README.md
index 10dd5a7df0..1b87dac33d 100644
--- a/deps/rabbitmq_amqp1_0/README.md
+++ b/deps/rabbitmq_amqp1_0/README.md
@@ -51,6 +51,17 @@ Configuration example using [sysctl config format](https://next.rabbitmq.com/con
amqp1_0.protocol_strict_mode = false
+Configuration for interoperability between AMQP 0.9.1 and AMQP 1.0.
+
+```
+# Conversion only handles simple types, such as strings, ints and booleans.
+# Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
+amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
+# Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
+amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
+
+```
+
## Clients we have tested
The current field of AMQP 1.0 clients is somewhat limited. Therefore
diff --git a/deps/rabbitmq_amqp1_0/priv/schema/rabbitmq_amqp1_0.schema b/deps/rabbitmq_amqp1_0/priv/schema/rabbitmq_amqp1_0.schema
index e6cfb68262..09d2cd06b2 100644
--- a/deps/rabbitmq_amqp1_0/priv/schema/rabbitmq_amqp1_0.schema
+++ b/deps/rabbitmq_amqp1_0/priv/schema/rabbitmq_amqp1_0.schema
@@ -28,4 +28,10 @@
{translation , "rabbitmq_amqp1_0.default_vhost",
fun(Conf) ->
list_to_binary(cuttlefish:conf_get("amqp1_0.default_vhost", Conf))
-end}. \ No newline at end of file
+end}.
+
+{mapping, "amqp1_0.convert_amqp091_headers_to_app_props", "rabbitmq_amqp1_0.convert_amqp091_headers_to_app_props",
+ [{datatype, {enum, [true, false]}}]}.
+
+{mapping, "amqp1_0.convert_app_props_to_amqp091_headers", "rabbitmq_amqp1_0.convert_app_props_to_amqp091_headers",
+ [{datatype, {enum, [true, false]}}]}.
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl
index d3f3ee8ae9..2ab8806558 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl
@@ -14,6 +14,9 @@
-define(MESSAGE_ANNOTATIONS_HEADER, <<"x-amqp-1.0-message-annotations">>).
-define(STREAM_OFFSET_HEADER, <<"x-stream-offset">>).
-define(FOOTER, <<"x-amqp-1.0-footer">>).
+-define(CONVERT_AMQP091_HEADERS_TO_APP_PROPS, application:get_env(rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, false)).
+-define(CONVERT_APP_PROPS_TO_AMQP091_HEADERS, application:get_env(rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, false)).
+
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_amqp1_0.hrl").
@@ -58,9 +61,17 @@ assemble(properties, {R, P, C}, Else, Uneaten) ->
assemble(app_properties, {R, P = #'P_basic'{headers = Headers}, C},
{#'v1_0.application_properties'{}, Rest}, Uneaten) ->
AppPropsBin = chunk(Rest, Uneaten),
+ Amqp091Headers = case ?CONVERT_APP_PROPS_TO_AMQP091_HEADERS of
+ true ->
+ amqp10_app_props_to_amqp091_headers(Headers, AppPropsBin);
+ _ ->
+ Headers
+ end,
+ AppPropsAdded = set_header(
+ ?APP_PROPERTIES_HEADER,
+ AppPropsBin, Amqp091Headers),
assemble(body, {R, P#'P_basic'{
- headers = set_header(?APP_PROPERTIES_HEADER,
- AppPropsBin, Headers)}, C},
+ headers = AppPropsAdded}, C},
decode_section(Rest), Rest);
assemble(app_properties, {R, P, C}, Else, Uneaten) ->
assemble(body, {R, P, C}, Else, Uneaten);
@@ -254,7 +265,16 @@ annotated_message(RKey, #'basic.deliver'{redelivered = Redelivered},
{_, AppProps10Bin} ->
AppProps10Bin;
undefined ->
- []
+ case ?CONVERT_AMQP091_HEADERS_TO_APP_PROPS of
+ true ->
+ case amqp091_headers_to_amqp10_app_props(Headers) of
+ undefined -> [];
+ Other ->
+ amqp10_framing:encode_bin(Other)
+ end;
+ _ ->
+ []
+ end
end,
DataBin = case Props#'P_basic'.type of
<<"amqp-1.0">> ->
@@ -282,3 +302,52 @@ map_add(_T, _Key, _Type, undefined, Acc) ->
Acc;
map_add(KeyType, Key, Type, Value, Acc) ->
[{wrap(KeyType, Key), wrap(Type, Value)} | Acc].
+
+amqp10_app_props_to_amqp091_headers(CurrentHeaders, AppPropsBin) ->
+ case amqp10_framing:decode_bin(AppPropsBin) of
+ [#'v1_0.application_properties'{ content = AppProps}] when is_list(AppProps) ->
+ Hs = case CurrentHeaders of
+ undefined -> [];
+ Headers -> Headers
+ end,
+ lists:foldl(fun(Prop, Acc) ->
+ case Prop of
+ {{utf8, Key}, {ValueType, Value}} ->
+ case type10_to_type091(Key, ValueType, Value) of
+ undefined -> Acc;
+ Typed -> [Typed |Acc]
+ end;
+ _ -> Acc
+ end
+ end, Hs, AppProps);
+ _ -> CurrentHeaders
+ end.
+type10_to_type091(Key, Type, Value) ->
+ try
+ rabbit_msg_record:to_091(Key, {Type, Value})
+ catch
+ _:function_clause -> undefined
+ end.
+
+amqp091_headers_to_amqp10_app_props(undefined) -> undefined;
+amqp091_headers_to_amqp10_app_props(Headers) when is_list(Headers) ->
+ AppPropsOut = lists:foldl(fun(H, Acc) ->
+ case H of
+ {Key, Type, Value} ->
+ case type091_to_type10(Type, Value) of
+ undefined -> Acc;
+ Typed ->
+ [{{utf8, Key}, Typed}|Acc]
+ end;
+ _ -> Acc
+ end
+ end, [], Headers),
+ #'v1_0.application_properties'{content = AppPropsOut}.
+
+type091_to_type10(Type, Value) ->
+ try
+ rabbit_msg_record:from_091(Type, Value)
+ catch
+ _:function_clause -> undefined
+ end.
+
diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
index cb1e90c1d2..aedd4873eb 100644
--- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
+++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl
@@ -9,7 +9,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
--include_lib("rabbit_common/include/rabbit_framing.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
-compile(export_all).
@@ -22,7 +22,8 @@ all() ->
groups() ->
[
{tests, [], [
- roundtrip_quorum_queue_with_drain
+ roundtrip_quorum_queue_with_drain,
+ message_headers_conversion
]},
{metrics, [], [
auth_attempt_metrics
@@ -82,7 +83,7 @@ roundtrip_quorum_queue_with_drain(Config) ->
port => Port,
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
sasl => {plain, <<"guest">>, <<"guest">>}},
- % ct:pal("opening connectoin with ~p", [OpnConf]),
+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session(Connection),
SenderLinkName = <<"test-sender">>,
@@ -90,12 +91,7 @@ roundtrip_quorum_queue_with_drain(Config) ->
SenderLinkName,
Address),
- % wait for credit to be received
- receive
- {amqp10_event, {link, Sender, credited}} -> ok
- after 2000 ->
- exit(credited_timeout)
- end,
+ wait_for_credit(Sender),
% create a new message using a delivery-tag, body and indicate
% it's settlement status (true meaning no disposition confirmation
@@ -135,6 +131,75 @@ roundtrip_quorum_queue_with_drain(Config) ->
ok = amqp10_client:close_connection(Connection),
ok.
+message_headers_conversion(Config) ->
+ Host = ?config(rmq_hostname, Config),
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ QName = atom_to_binary(?FUNCTION_NAME, utf8),
+ Address = <<"/amq/queue/", QName/binary>>,
+ %% declare a quorum queue
+ Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
+
+ rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_amqp091_headers_to_app_props, true]),
+ rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,[rabbitmq_amqp1_0, convert_app_props_to_amqp091_headers, true]),
+
+ OpnConf = #{address => Host,
+ port => Port,
+ container_id => atom_to_binary(?FUNCTION_NAME, utf8),
+ sasl => {plain, <<"guest">>, <<"guest">>}},
+
+ {ok, Connection} = amqp10_client:open_connection(OpnConf),
+ {ok, Session} = amqp10_client:begin_session(Connection),
+
+ amqp10_to_amqp091_header_conversion(Session, Ch, QName, Address),
+
+ amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address),
+ delete_queue(Config, QName),
+ ok = amqp10_client:close_connection(Connection),
+ ok.
+
+amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
+ {ok, Sender} = create_amqp10_sender(Session, Address),
+
+ OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
+ OutMsg2 = amqp10_msg:set_application_properties(#{
+ "x-string" => "string-value",
+ "x-int" => 3,
+ "x-bool" => true
+ }, OutMsg),
+ ok = amqp10_client:send_msg(Sender, OutMsg2),
+ wait_for_accepts(1),
+
+ {ok, Headers} = amqp091_get_msg_headers(Ch, QName),
+
+ ?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)),
+ ?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)),
+ ?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)).
+
+
+amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) ->
+ Amqp091Headers = [{<<"x-forwarding">>, array,
+ [{table, [{<<"uri">>, longstr,
+ <<"amqp://localhost/%2F/upstream">>}]}]},
+ {<<"x-string">>, longstr, "my-string"},
+ {<<"x-int">>, long, 92},
+ {<<"x-bool">>, bool, true}],
+
+ amqp_channel:cast(Ch,
+ #'basic.publish'{exchange = <<"">>, routing_key = QName},
+ #amqp_msg{props = #'P_basic'{
+ headers = Amqp091Headers},
+ payload = <<"foobar">> }
+ ),
+
+ {ok, [Msg]} = drain_queue(Session, Address, 1),
+ Amqp10Props = amqp10_msg:application_properties(Msg),
+ ?assertEqual(true, maps:get(<<"x-bool">>, Amqp10Props, undefined)),
+ ?assertEqual(92, maps:get(<<"x-int">>, Amqp10Props, undefined)),
+ ?assertEqual(<<"my-string">>, maps:get(<<"x-string">>, Amqp10Props, undefined)).
+
auth_attempt_metrics(Config) ->
Host = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
@@ -185,3 +250,62 @@ open_and_close_connection(OpnConf) ->
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, _} = amqp10_client:begin_session(Connection),
ok = amqp10_client:close_connection(Connection).
+
+% before we can send messages we have to wait for credit from the server
+wait_for_credit(Sender) ->
+ receive
+ {amqp10_event, {link, Sender, credited}} ->
+ ok
+ after 5000 ->
+ flush("Credit timed out"),
+ exit(credited_timeout)
+ end.
+
+wait_for_accepts(0) -> ok;
+wait_for_accepts(N) ->
+ receive
+ {amqp10_disposition,{accepted,_}} -> wait_for_accepts(N -1)
+ after 250 ->
+ ok
+ end.
+delete_queue(Config, QName) ->
+ Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ _ = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
+ rabbit_ct_client_helpers:close_channel(Ch).
+
+
+amqp091_get_msg_headers(Channel, QName) ->
+ {#'basic.get_ok'{}, #amqp_msg{props = #'P_basic'{ headers= Headers}}}
+ = amqp_channel:call(Channel, #'basic.get'{queue = QName, no_ack = true}),
+ {ok, Headers}.
+
+create_amqp10_sender(Session, Address) ->
+ SenderLinkName = <<"test-sender">>,
+ {ok, Sender} = amqp10_client:attach_sender_link(Session,
+ SenderLinkName,
+ Address),
+ wait_for_credit(Sender),
+ {ok, Sender}.
+
+ drain_queue(Session, Address, N) ->
+ flush("Before drain_queue"),
+ {ok, Receiver} = amqp10_client:attach_receiver_link(Session,
+ <<"test-receiver">>,
+ Address,
+ settled,
+ configuration),
+
+ ok = amqp10_client:flow_link_credit(Receiver, 1000, never, true),
+ Msgs = receive_message(Receiver, N, []),
+ flush("after drain"),
+ ok = amqp10_client:detach_link(Receiver),
+ {ok, Msgs}.
+
+receive_message(_Receiver, 0, Acc) -> lists:reverse(Acc);
+receive_message(Receiver, N, Acc) ->
+ receive
+ {amqp10_msg, Receiver, Msg} ->
+ receive_message(Receiver, N-1, [Msg | Acc])
+ after 5000 ->
+ exit(receive_timed_out)
+ end.
diff --git a/deps/rabbitmq_amqp1_0/test/config_schema_SUITE.erl b/deps/rabbitmq_amqp1_0/test/config_schema_SUITE.erl
new file mode 100644
index 0000000000..37133b1cfc
--- /dev/null
+++ b/deps/rabbitmq_amqp1_0/test/config_schema_SUITE.erl
@@ -0,0 +1,54 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(config_schema_SUITE).
+
+-compile(export_all).
+
+all() ->
+ [
+ run_snippets
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:run_setup_steps(Config),
+ rabbit_ct_config_schema:init_schemas(rabbitmq_amqp1_0, Config1).
+
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Testcase}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+run_snippets(Config) ->
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, run_snippets1, [Config]).
+
+run_snippets1(Config) ->
+ rabbit_ct_config_schema:run_snippets(Config).
diff --git a/deps/rabbitmq_amqp1_0/test/config_schema_SUITE_data/rabbitmq_amqp1_0.snippets b/deps/rabbitmq_amqp1_0/test/config_schema_SUITE_data/rabbitmq_amqp1_0.snippets
new file mode 100644
index 0000000000..41a19193ca
--- /dev/null
+++ b/deps/rabbitmq_amqp1_0/test/config_schema_SUITE_data/rabbitmq_amqp1_0.snippets
@@ -0,0 +1,17 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+[
+ {rabbitmq_amqp1_0_config,
+ "amqp1_0.convert_amqp091_headers_to_app_props = true
+ amqp1_0.convert_app_props_to_amqp091_headers = true",
+ [{rabbitmq_amqp1_0,[
+ {convert_amqp091_headers_to_app_props, true},
+ {convert_app_props_to_amqp091_headers, true}
+ ]}],
+ [rabbitmq_amqp1_0]}
+].