From 044f6e3bac20af1a2f6391ba0c670a8b704f0e31 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 12 May 2023 14:12:50 +0000 Subject: Move plugin rabbitmq-message-timestamp to the core As reported in https://groups.google.com/g/rabbitmq-users/c/x8ACs4dBlkI/ plugins that implement rabbit_channel_interceptor break with Native MQTT in 3.12 because Native MQTT does not use rabbit_channel anymore. Specifically, these plugins don't work anymore in 3.12 when sending a message from an MQTT publisher to an AMQP 0.9.1 consumer. Two of these plugins are https://github.com/rabbitmq/rabbitmq-message-timestamp and https://github.com/rabbitmq/rabbitmq-routing-node-stamp This commit moves both plugins into rabbitmq-server. Therefore, these plugins are deprecated starting in 3.12. Instead of using these plugins, the user gets the same behaviour by configuring rabbitmq.conf as follows: ``` incoming_message_interceptors.set_header_timestamp.overwrite = false incoming_message_interceptors.set_header_routing_node.overwrite = false ``` While both plugins were incompatible to be used together, this commit allows setting both headers. We name the top level configuration key `incoming_message_interceptors` because only incoming messages are intercepted. Currently, only `set_header_timestamp` and `set_header_routing_node` are supported. (We might support more in the future.) Both can set `overwrite` to `false` or `true`. The meaning of `overwrite` is the same as documented in https://github.com/rabbitmq/rabbitmq-message-timestamp#always-overwrite-timestamps i.e. whether headers should be overwritten if they are already present in the message. Both `set_header_timestamp` and `set_header_routing_node` behave exactly to plugins `rabbitmq-message-timestamp` and `rabbitmq-routing-node-stamp`, respectively. Upon node boot, the configuration is put into persistent_term to not cause any performance penalty in the default case where these settings are disabled. The channel and MQTT connection process will intercept incoming messages and - if configured - add the desired AMQP 0.9.1 headers. For now, this allows using Native MQTT in 3.12 with the old plugins behaviour. In the future, once "message containers" are implemented, we can think about more generic message interceptors where plugins can be written to modify arbitrary headers or message contents for various protocols. Likewise, in the future, once MQTT 5.0 is implemented, we can think about an MQTT connection interceptor which could function similar to a `rabbit_channel_interceptor` allowing to modify any MQTT packet. --- deps/rabbit/BUILD.bazel | 5 + deps/rabbit/app.bzl | 12 +++ deps/rabbit/priv/schema/rabbit.schema | 26 +++++ deps/rabbit/src/rabbit.erl | 16 +-- deps/rabbit/src/rabbit_channel.erl | 3 +- deps/rabbit/src/rabbit_message_interceptor.erl | 65 ++++++++++++ .../test/config_schema_SUITE_data/rabbit.snippets | 22 ++++ .../test/rabbit_message_interceptor_SUITE.erl | 112 +++++++++++++++++++++ deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 15 +-- deps/rabbitmq_mqtt/test/shared_SUITE.erl | 31 ++++++ 10 files changed, 292 insertions(+), 15 deletions(-) create mode 100644 deps/rabbit/src/rabbit_message_interceptor.erl create mode 100644 deps/rabbit/test/rabbit_message_interceptor_SUITE.erl (limited to 'deps') diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 000e00d7d7..f7476b222d 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -498,6 +498,11 @@ rabbitmq_integration_suite( ], ) +rabbitmq_integration_suite( + name = "rabbit_message_interceptor_SUITE", + size = "medium", +) + rabbitmq_integration_suite( name = "message_size_limit_SUITE", size = "medium", diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 950cdd31e8..5ab5a71ca7 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -139,6 +139,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_looking_glass.erl", "src/rabbit_maintenance.erl", "src/rabbit_memory_monitor.erl", + "src/rabbit_message_interceptor.erl", "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_coordinator.erl", "src/rabbit_mirror_queue_master.erl", @@ -379,6 +380,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_looking_glass.erl", "src/rabbit_maintenance.erl", "src/rabbit_memory_monitor.erl", + "src/rabbit_message_interceptor.erl", "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_coordinator.erl", "src/rabbit_mirror_queue_master.erl", @@ -635,6 +637,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_looking_glass.erl", "src/rabbit_maintenance.erl", "src/rabbit_memory_monitor.erl", + "src/rabbit_message_interceptor.erl", "src/rabbit_metrics.erl", "src/rabbit_mirror_queue_coordinator.erl", "src/rabbit_mirror_queue_master.erl", @@ -1936,3 +1939,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"): app_name = "rabbit", erlc_opts = "//:test_erlc_opts", ) + erlang_bytecode( + name = "rabbit_message_interceptor_SUITE_beam_files", + testonly = True, + srcs = ["test/rabbit_message_interceptor_SUITE.erl"], + outs = ["test/rabbit_message_interceptor_SUITE.beam"], + app_name = "rabbit", + erlc_opts = "//:test_erlc_opts", + deps = ["//deps/amqp_client:erlang_app"], + ) diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 5024376213..e8226ab4e8 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2504,6 +2504,32 @@ end}. end }. +%% +%% Message interceptors +%% +{mapping, "incoming_message_interceptors.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [ + {datatype, {enum, [true, false]}}]}. + +{translation, "rabbit.incoming_message_interceptors", + fun(Conf) -> + case cuttlefish_variable:filter_by_prefix("incoming_message_interceptors", Conf) of + [] -> + cuttlefish:unset(); + L -> + [begin + Interceptor = list_to_atom(Interceptor0), + case lists:member(Interceptor, [set_header_timestamp, + set_header_routing_node]) of + true -> + {Interceptor, Overwrite}; + false -> + cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor])) + end + end || {["incoming_message_interceptors", Interceptor0, "overwrite"], Overwrite} <- L] + end + end +}. + % =============================== % Validators % =============================== diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 3e6e705a55..1269c77a97 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1646,18 +1646,20 @@ ensure_working_fhc() -> %% should be placed into persistent_term for efficiency. persist_static_configuration() -> persist_static_configuration( - [{rabbit, classic_queue_index_v2_segment_entry_count}, - {rabbit, classic_queue_store_v2_max_cache_size}, - {rabbit, classic_queue_store_v2_check_crc32} + [classic_queue_index_v2_segment_entry_count, + classic_queue_store_v2_max_cache_size, + classic_queue_store_v2_check_crc32, + incoming_message_interceptors ]). -persist_static_configuration(AppParams) -> +persist_static_configuration(Params) -> + App = ?MODULE, lists:foreach( - fun(Key = {App, Param}) -> + fun(Param) -> case application:get_env(App, Param) of {ok, Value} -> - ok = persistent_term:put(Key, Value); + ok = persistent_term:put({App, Param}, Value); undefined -> ok end - end, AppParams). + end, Params). diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 666d6e88d6..a78b8e1c08 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1288,9 +1288,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = #content {properties = Props} = + DecodedContent0 = #content {properties = Props} = maybe_set_fast_reply_to( rabbit_binary_parser:ensure_content_decoded(Content), State), + DecodedContent = rabbit_message_interceptor:intercept(DecodedContent0), check_user_id_header(Props, State), check_expiration_header(Props), DoConfirm = Tx =/= none orelse ConfirmEnabled, diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl new file mode 100644 index 0000000000..05407349bf --- /dev/null +++ b/deps/rabbit/src/rabbit_message_interceptor.erl @@ -0,0 +1,65 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. + +%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp +%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can +%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols (that +%% do not use rabbit_channel) to also add AMQP 0.9.1 headers to incoming messages. +-module(rabbit_message_interceptor). + +-export([intercept/1]). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). + +-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>). +-define(HEADER_ROUTING_NODE, <<"x-routed-by">>). + +-type content() :: rabbit_types:decoded_content(). + +-spec intercept(content()) -> content(). +intercept(Content) -> + Interceptors = persistent_term:get({rabbit, incoming_message_interceptors}, []), + lists:foldl(fun(I, C) -> + intercept(C, I) + end, Content, Interceptors). + +intercept(Content, {set_header_routing_node, Overwrite}) -> + Node = atom_to_binary(node()), + set_header(Content, {?HEADER_ROUTING_NODE, longstr, Node}, Overwrite); +intercept(Content0, {set_header_timestamp, Overwrite}) -> + NowMs = os:system_time(millisecond), + NowSecs = NowMs div 1_000, + Content = set_header(Content0, {?HEADER_TIMESTAMP, long, NowMs}, Overwrite), + set_property_timestamp(Content, NowSecs, Overwrite). + +-spec set_header(content(), + {binary(), rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value()}, + boolean()) -> + content(). +set_header(Content = #content{properties = Props = #'P_basic'{headers = Headers0}}, + Header = {Key, Type, Value}, Overwrite) -> + case {rabbit_basic:header(Key, Headers0), Overwrite} of + {Val, false} when Val =/= undefined -> + Content; + _ -> + Headers = if Headers0 =:= undefined -> [Header]; + true -> rabbit_misc:set_table_value(Headers0, Key, Type, Value) + end, + Content#content{properties = Props#'P_basic'{headers = Headers}, + properties_bin = none} + end. + +-spec set_property_timestamp(content(), pos_integer(), boolean()) -> content(). +set_property_timestamp(Content = #content{properties = Props = #'P_basic'{timestamp = Ts}}, + Timestamp, Overwrite) -> + case {Ts, Overwrite} of + {Secs, false} when is_integer(Secs) -> + Content; + _ -> + Content#content{properties = Props#'P_basic'{timestamp = Timestamp}, + properties_bin = none} + end. diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index b9444eed47..2e6fae9aa5 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -925,6 +925,28 @@ credential_validator.regexp = ^abc\\d+", [{rabbit, [ {permitted_deprecated_features, #{classic_mirrored_queues => false}} ]}], + []}, + + %% + %% Message interceptors + %% + + {message_interceptors, + "incoming_message_interceptors.set_header_timestamp.overwrite = true", + [{rabbit, [ + {incoming_message_interceptors, [{set_header_timestamp, true}]} + ]}], + []}, + + {message_interceptors, + " + incoming_message_interceptors.set_header_routing_node.overwrite = false + incoming_message_interceptors.set_header_timestamp.overwrite = false + ", + [{rabbit, [ + {incoming_message_interceptors, [{set_header_routing_node, false}, + {set_header_timestamp, false}]} + ]}], []} ]. diff --git a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl new file mode 100644 index 0000000000..db70c8e45f --- /dev/null +++ b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl @@ -0,0 +1,112 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2018-2023 VMware, Inc. or its affiliates. All rights reserved. + +-module(rabbit_message_interceptor_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile([nowarn_export_all, export_all]). + +-import(rabbit_ct_helpers, [eventually/1]). + +all() -> + [ + {group, tests} + ]. + +groups() -> + [ + {tests, [shuffle], [headers_overwrite, + headers_no_overwrite + ]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_testcase(Testcase, Config0) -> + Config1 = rabbit_ct_helpers:set_config( + Config0, [{rmq_nodename_suffix, Testcase}]), + Overwrite = case Testcase of + headers_overwrite -> true; + headers_no_overwrite -> false + end, + Val = maps:to_list( + maps:from_keys([set_header_timestamp, + set_header_routing_node], + Overwrite)), + Config = rabbit_ct_helpers:merge_app_env( + Config1, {rabbit, [{incoming_message_interceptors, Val}]}), + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config0) -> + Config = rabbit_ct_helpers:testcase_finished(Config0, Testcase), + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +headers_overwrite(Config) -> + headers(true, Config). + +headers_no_overwrite(Config) -> + headers(false, Config). + +headers(Overwrite, Config) -> + Server = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), + Payload = QName = atom_to_binary(?FUNCTION_NAME), + NowSecs = os:system_time(second), + NowMs = os:system_time(millisecond), + Ch = rabbit_ct_client_helpers:open_channel(Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, + #amqp_msg{payload = Payload}), + AssertHeaders = + fun() -> + eventually( + ?_assertMatch( + {#'basic.get_ok'{}, + #amqp_msg{payload = Payload, + props = #'P_basic'{ + timestamp = Secs, + headers = [{<<"timestamp_in_ms">>, long, Ms}, + {<<"x-routed-by">>, longstr, Server}] + }}} + when Ms < NowMs + 4000 andalso + Ms > NowMs - 4000 andalso + Secs < NowSecs + 4 andalso + Secs > NowSecs - 4, + amqp_channel:call(Ch, #'basic.get'{queue = QName}))) + end, + AssertHeaders(), + + Msg = #amqp_msg{payload = Payload, + props = #'P_basic'{ + timestamp = 1, + headers = [{<<"timestamp_in_ms">>, long, 1000}, + {<<"x-routed-by">>, longstr, <<"rabbit@my-node">>}] + }}, + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, Msg), + case Overwrite of + true -> + AssertHeaders(); + false -> + eventually( + ?_assertMatch( + {#'basic.get_ok'{}, Msg}, + amqp_channel:call(Ch, #'basic.get'{queue = QName}))) + end, + + #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + ok. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 48bc67d45b..7810dc1fc7 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1082,13 +1082,14 @@ publish_to_queues( headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}], delivery_mode = delivery_mode(Qos)}, {ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'), - Content = #content{ - class_id = ClassId, - properties = Props, - properties_bin = none, - protocol = none, - payload_fragments_rev = [Payload] - }, + Content0 = #content{ + class_id = ClassId, + properties = Props, + properties_bin = none, + protocol = none, + payload_fragments_rev = [Payload] + }, + Content = rabbit_message_interceptor:intercept(Content0), BasicMessage = #basic_message{ exchange_name = ExchangeName, routing_keys = [RoutingKey], diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index de94bdee7a..6227aa0b57 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -96,6 +96,7 @@ subgroups() -> ,trace ,max_packet_size_unauthenticated ,default_queue_type + ,incoming_message_interceptors ]} ]}, {cluster_size_3, [], @@ -1424,6 +1425,36 @@ default_queue_type(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost). +incoming_message_interceptors(Config) -> + Key = {rabbit, ?FUNCTION_NAME}, + ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]), + Ch = rabbit_ct_client_helpers:open_channel(Config), + Payload = ClientId = QName = Topic = atom_to_binary(?FUNCTION_NAME), + declare_queue(Ch, QName, []), + bind(Ch, QName, Topic), + C = connect(ClientId, Config), + ok = emqtt:publish(C, Topic, Payload), + NowSecs = os:system_time(second), + NowMs = os:system_time(millisecond), + eventually( + ?_assertMatch( + {#'basic.get_ok'{}, + #amqp_msg{payload = Payload, + props = #'P_basic'{ + timestamp = Secs, + headers = [{<<"timestamp_in_ms">>, long, Ms}, + {<<"x-mqtt-publish-qos">>, byte, 0}] + }}} + when Ms < NowMs + 4000 andalso + Ms > NowMs - 4000 andalso + Secs < NowSecs + 4 andalso + Secs > NowSecs - 4, + amqp_channel:call(Ch, #'basic.get'{queue = QName}))), + + delete_queue(Ch, QName), + true = rpc(Config, persistent_term, erase, [Key]), + ok = emqtt:disconnect(C). + %% ------------------------------------------------------------------- %% Internal helpers %% ------------------------------------------------------------------- -- cgit v1.2.1 From ddabc3519117c1556ef1a5467931242d07e5f425 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 15 May 2023 10:06:01 +0000 Subject: Change rabbitmq.conf key to message_interceptors.incoming.* as it nicer categorises if there will be a future "message_interceptors.outgoing.*" key. We leave the advanced config file key because simple single value settings should not require using the advanced config file. --- deps/rabbit/priv/schema/rabbit.schema | 6 +++--- deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'deps') diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index e8226ab4e8..7c8b627e9c 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2507,12 +2507,12 @@ end}. %% %% Message interceptors %% -{mapping, "incoming_message_interceptors.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [ +{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [ {datatype, {enum, [true, false]}}]}. {translation, "rabbit.incoming_message_interceptors", fun(Conf) -> - case cuttlefish_variable:filter_by_prefix("incoming_message_interceptors", Conf) of + case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of [] -> cuttlefish:unset(); L -> @@ -2525,7 +2525,7 @@ end}. false -> cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor])) end - end || {["incoming_message_interceptors", Interceptor0, "overwrite"], Overwrite} <- L] + end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L] end end }. diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 2e6fae9aa5..b387befeee 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -932,7 +932,7 @@ credential_validator.regexp = ^abc\\d+", %% {message_interceptors, - "incoming_message_interceptors.set_header_timestamp.overwrite = true", + "message_interceptors.incoming.set_header_timestamp.overwrite = true", [{rabbit, [ {incoming_message_interceptors, [{set_header_timestamp, true}]} ]}], @@ -940,8 +940,8 @@ credential_validator.regexp = ^abc\\d+", {message_interceptors, " - incoming_message_interceptors.set_header_routing_node.overwrite = false - incoming_message_interceptors.set_header_timestamp.overwrite = false + message_interceptors.incoming.set_header_routing_node.overwrite = false + message_interceptors.incoming.set_header_timestamp.overwrite = false ", [{rabbit, [ {incoming_message_interceptors, [{set_header_routing_node, false}, -- cgit v1.2.1