summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Ansari <david.ansari@gmx.de>2023-05-12 14:12:50 +0000
committerDavid Ansari <david.ansari@gmx.de>2023-05-15 08:37:52 +0000
commit044f6e3bac20af1a2f6391ba0c670a8b704f0e31 (patch)
tree288aae5412602f624381327640be53bb7ea6e1aa
parent92017ac61274b39ed338f6a7d7a0310dab3fa86a (diff)
downloadrabbitmq-server-git-044f6e3bac20af1a2f6391ba0c670a8b704f0e31.tar.gz
Move plugin rabbitmq-message-timestamp to the core
As reported in https://groups.google.com/g/rabbitmq-users/c/x8ACs4dBlkI/ plugins that implement rabbit_channel_interceptor break with Native MQTT in 3.12 because Native MQTT does not use rabbit_channel anymore. Specifically, these plugins don't work anymore in 3.12 when sending a message from an MQTT publisher to an AMQP 0.9.1 consumer. Two of these plugins are https://github.com/rabbitmq/rabbitmq-message-timestamp and https://github.com/rabbitmq/rabbitmq-routing-node-stamp This commit moves both plugins into rabbitmq-server. Therefore, these plugins are deprecated starting in 3.12. Instead of using these plugins, the user gets the same behaviour by configuring rabbitmq.conf as follows: ``` incoming_message_interceptors.set_header_timestamp.overwrite = false incoming_message_interceptors.set_header_routing_node.overwrite = false ``` While both plugins were incompatible to be used together, this commit allows setting both headers. We name the top level configuration key `incoming_message_interceptors` because only incoming messages are intercepted. Currently, only `set_header_timestamp` and `set_header_routing_node` are supported. (We might support more in the future.) Both can set `overwrite` to `false` or `true`. The meaning of `overwrite` is the same as documented in https://github.com/rabbitmq/rabbitmq-message-timestamp#always-overwrite-timestamps i.e. whether headers should be overwritten if they are already present in the message. Both `set_header_timestamp` and `set_header_routing_node` behave exactly to plugins `rabbitmq-message-timestamp` and `rabbitmq-routing-node-stamp`, respectively. Upon node boot, the configuration is put into persistent_term to not cause any performance penalty in the default case where these settings are disabled. The channel and MQTT connection process will intercept incoming messages and - if configured - add the desired AMQP 0.9.1 headers. For now, this allows using Native MQTT in 3.12 with the old plugins behaviour. In the future, once "message containers" are implemented, we can think about more generic message interceptors where plugins can be written to modify arbitrary headers or message contents for various protocols. Likewise, in the future, once MQTT 5.0 is implemented, we can think about an MQTT connection interceptor which could function similar to a `rabbit_channel_interceptor` allowing to modify any MQTT packet.
-rw-r--r--deps/rabbit/BUILD.bazel5
-rw-r--r--deps/rabbit/app.bzl12
-rw-r--r--deps/rabbit/priv/schema/rabbit.schema26
-rw-r--r--deps/rabbit/src/rabbit.erl16
-rw-r--r--deps/rabbit/src/rabbit_channel.erl3
-rw-r--r--deps/rabbit/src/rabbit_message_interceptor.erl65
-rw-r--r--deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets22
-rw-r--r--deps/rabbit/test/rabbit_message_interceptor_SUITE.erl112
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl15
-rw-r--r--deps/rabbitmq_mqtt/test/shared_SUITE.erl31
-rwxr-xr-xmoduleindex.yaml1
11 files changed, 293 insertions, 15 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel
index 000e00d7d7..f7476b222d 100644
--- a/deps/rabbit/BUILD.bazel
+++ b/deps/rabbit/BUILD.bazel
@@ -499,6 +499,11 @@ rabbitmq_integration_suite(
)
rabbitmq_integration_suite(
+ name = "rabbit_message_interceptor_SUITE",
+ size = "medium",
+)
+
+rabbitmq_integration_suite(
name = "message_size_limit_SUITE",
size = "medium",
)
diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl
index 950cdd31e8..5ab5a71ca7 100644
--- a/deps/rabbit/app.bzl
+++ b/deps/rabbit/app.bzl
@@ -139,6 +139,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_looking_glass.erl",
"src/rabbit_maintenance.erl",
"src/rabbit_memory_monitor.erl",
+ "src/rabbit_message_interceptor.erl",
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_coordinator.erl",
"src/rabbit_mirror_queue_master.erl",
@@ -379,6 +380,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_looking_glass.erl",
"src/rabbit_maintenance.erl",
"src/rabbit_memory_monitor.erl",
+ "src/rabbit_message_interceptor.erl",
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_coordinator.erl",
"src/rabbit_mirror_queue_master.erl",
@@ -635,6 +637,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_looking_glass.erl",
"src/rabbit_maintenance.erl",
"src/rabbit_memory_monitor.erl",
+ "src/rabbit_message_interceptor.erl",
"src/rabbit_metrics.erl",
"src/rabbit_mirror_queue_coordinator.erl",
"src/rabbit_mirror_queue_master.erl",
@@ -1936,3 +1939,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
)
+ erlang_bytecode(
+ name = "rabbit_message_interceptor_SUITE_beam_files",
+ testonly = True,
+ srcs = ["test/rabbit_message_interceptor_SUITE.erl"],
+ outs = ["test/rabbit_message_interceptor_SUITE.beam"],
+ app_name = "rabbit",
+ erlc_opts = "//:test_erlc_opts",
+ deps = ["//deps/amqp_client:erlang_app"],
+ )
diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema
index 5024376213..e8226ab4e8 100644
--- a/deps/rabbit/priv/schema/rabbit.schema
+++ b/deps/rabbit/priv/schema/rabbit.schema
@@ -2504,6 +2504,32 @@ end}.
end
}.
+%%
+%% Message interceptors
+%%
+{mapping, "incoming_message_interceptors.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [
+ {datatype, {enum, [true, false]}}]}.
+
+{translation, "rabbit.incoming_message_interceptors",
+ fun(Conf) ->
+ case cuttlefish_variable:filter_by_prefix("incoming_message_interceptors", Conf) of
+ [] ->
+ cuttlefish:unset();
+ L ->
+ [begin
+ Interceptor = list_to_atom(Interceptor0),
+ case lists:member(Interceptor, [set_header_timestamp,
+ set_header_routing_node]) of
+ true ->
+ {Interceptor, Overwrite};
+ false ->
+ cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor]))
+ end
+ end || {["incoming_message_interceptors", Interceptor0, "overwrite"], Overwrite} <- L]
+ end
+ end
+}.
+
% ===============================
% Validators
% ===============================
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl
index 3e6e705a55..1269c77a97 100644
--- a/deps/rabbit/src/rabbit.erl
+++ b/deps/rabbit/src/rabbit.erl
@@ -1646,18 +1646,20 @@ ensure_working_fhc() ->
%% should be placed into persistent_term for efficiency.
persist_static_configuration() ->
persist_static_configuration(
- [{rabbit, classic_queue_index_v2_segment_entry_count},
- {rabbit, classic_queue_store_v2_max_cache_size},
- {rabbit, classic_queue_store_v2_check_crc32}
+ [classic_queue_index_v2_segment_entry_count,
+ classic_queue_store_v2_max_cache_size,
+ classic_queue_store_v2_check_crc32,
+ incoming_message_interceptors
]).
-persist_static_configuration(AppParams) ->
+persist_static_configuration(Params) ->
+ App = ?MODULE,
lists:foreach(
- fun(Key = {App, Param}) ->
+ fun(Param) ->
case application:get_env(App, Param) of
{ok, Value} ->
- ok = persistent_term:put(Key, Value);
+ ok = persistent_term:put({App, Param}, Value);
undefined ->
ok
end
- end, AppParams).
+ end, Params).
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 666d6e88d6..a78b8e1c08 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1288,9 +1288,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = #content {properties = Props} =
+ DecodedContent0 = #content {properties = Props} =
maybe_set_fast_reply_to(
rabbit_binary_parser:ensure_content_decoded(Content), State),
+ DecodedContent = rabbit_message_interceptor:intercept(DecodedContent0),
check_user_id_header(Props, State),
check_expiration_header(Props),
DoConfirm = Tx =/= none orelse ConfirmEnabled,
diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl
new file mode 100644
index 0000000000..05407349bf
--- /dev/null
+++ b/deps/rabbit/src/rabbit_message_interceptor.erl
@@ -0,0 +1,65 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+
+%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp
+%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can
+%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols (that
+%% do not use rabbit_channel) to also add AMQP 0.9.1 headers to incoming messages.
+-module(rabbit_message_interceptor).
+
+-export([intercept/1]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+
+-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>).
+-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).
+
+-type content() :: rabbit_types:decoded_content().
+
+-spec intercept(content()) -> content().
+intercept(Content) ->
+ Interceptors = persistent_term:get({rabbit, incoming_message_interceptors}, []),
+ lists:foldl(fun(I, C) ->
+ intercept(C, I)
+ end, Content, Interceptors).
+
+intercept(Content, {set_header_routing_node, Overwrite}) ->
+ Node = atom_to_binary(node()),
+ set_header(Content, {?HEADER_ROUTING_NODE, longstr, Node}, Overwrite);
+intercept(Content0, {set_header_timestamp, Overwrite}) ->
+ NowMs = os:system_time(millisecond),
+ NowSecs = NowMs div 1_000,
+ Content = set_header(Content0, {?HEADER_TIMESTAMP, long, NowMs}, Overwrite),
+ set_property_timestamp(Content, NowSecs, Overwrite).
+
+-spec set_header(content(),
+ {binary(), rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value()},
+ boolean()) ->
+ content().
+set_header(Content = #content{properties = Props = #'P_basic'{headers = Headers0}},
+ Header = {Key, Type, Value}, Overwrite) ->
+ case {rabbit_basic:header(Key, Headers0), Overwrite} of
+ {Val, false} when Val =/= undefined ->
+ Content;
+ _ ->
+ Headers = if Headers0 =:= undefined -> [Header];
+ true -> rabbit_misc:set_table_value(Headers0, Key, Type, Value)
+ end,
+ Content#content{properties = Props#'P_basic'{headers = Headers},
+ properties_bin = none}
+ end.
+
+-spec set_property_timestamp(content(), pos_integer(), boolean()) -> content().
+set_property_timestamp(Content = #content{properties = Props = #'P_basic'{timestamp = Ts}},
+ Timestamp, Overwrite) ->
+ case {Ts, Overwrite} of
+ {Secs, false} when is_integer(Secs) ->
+ Content;
+ _ ->
+ Content#content{properties = Props#'P_basic'{timestamp = Timestamp},
+ properties_bin = none}
+ end.
diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
index b9444eed47..2e6fae9aa5 100644
--- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
+++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
@@ -925,6 +925,28 @@ credential_validator.regexp = ^abc\\d+",
[{rabbit, [
{permitted_deprecated_features, #{classic_mirrored_queues => false}}
]}],
+ []},
+
+ %%
+ %% Message interceptors
+ %%
+
+ {message_interceptors,
+ "incoming_message_interceptors.set_header_timestamp.overwrite = true",
+ [{rabbit, [
+ {incoming_message_interceptors, [{set_header_timestamp, true}]}
+ ]}],
+ []},
+
+ {message_interceptors,
+ "
+ incoming_message_interceptors.set_header_routing_node.overwrite = false
+ incoming_message_interceptors.set_header_timestamp.overwrite = false
+ ",
+ [{rabbit, [
+ {incoming_message_interceptors, [{set_header_routing_node, false},
+ {set_header_timestamp, false}]}
+ ]}],
[]}
].
diff --git a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl
new file mode 100644
index 0000000000..db70c8e45f
--- /dev/null
+++ b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl
@@ -0,0 +1,112 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2018-2023 VMware, Inc. or its affiliates. All rights reserved.
+
+-module(rabbit_message_interceptor_SUITE).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile([nowarn_export_all, export_all]).
+
+-import(rabbit_ct_helpers, [eventually/1]).
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+groups() ->
+ [
+ {tests, [shuffle], [headers_overwrite,
+ headers_no_overwrite
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_testcase(Testcase, Config0) ->
+ Config1 = rabbit_ct_helpers:set_config(
+ Config0, [{rmq_nodename_suffix, Testcase}]),
+ Overwrite = case Testcase of
+ headers_overwrite -> true;
+ headers_no_overwrite -> false
+ end,
+ Val = maps:to_list(
+ maps:from_keys([set_header_timestamp,
+ set_header_routing_node],
+ Overwrite)),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config1, {rabbit, [{incoming_message_interceptors, Val}]}),
+ rabbit_ct_helpers:run_steps(
+ Config,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config0) ->
+ Config = rabbit_ct_helpers:testcase_finished(Config0, Testcase),
+ rabbit_ct_helpers:run_teardown_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+headers_overwrite(Config) ->
+ headers(true, Config).
+
+headers_no_overwrite(Config) ->
+ headers(false, Config).
+
+headers(Overwrite, Config) ->
+ Server = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
+ Payload = QName = atom_to_binary(?FUNCTION_NAME),
+ NowSecs = os:system_time(second),
+ NowMs = os:system_time(millisecond),
+ Ch = rabbit_ct_client_helpers:open_channel(Config),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName},
+ #amqp_msg{payload = Payload}),
+ AssertHeaders =
+ fun() ->
+ eventually(
+ ?_assertMatch(
+ {#'basic.get_ok'{},
+ #amqp_msg{payload = Payload,
+ props = #'P_basic'{
+ timestamp = Secs,
+ headers = [{<<"timestamp_in_ms">>, long, Ms},
+ {<<"x-routed-by">>, longstr, Server}]
+ }}}
+ when Ms < NowMs + 4000 andalso
+ Ms > NowMs - 4000 andalso
+ Secs < NowSecs + 4 andalso
+ Secs > NowSecs - 4,
+ amqp_channel:call(Ch, #'basic.get'{queue = QName})))
+ end,
+ AssertHeaders(),
+
+ Msg = #amqp_msg{payload = Payload,
+ props = #'P_basic'{
+ timestamp = 1,
+ headers = [{<<"timestamp_in_ms">>, long, 1000},
+ {<<"x-routed-by">>, longstr, <<"rabbit@my-node">>}]
+ }},
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, Msg),
+ case Overwrite of
+ true ->
+ AssertHeaders();
+ false ->
+ eventually(
+ ?_assertMatch(
+ {#'basic.get_ok'{}, Msg},
+ amqp_channel:call(Ch, #'basic.get'{queue = QName})))
+ end,
+
+ #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
+ ok.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index 48bc67d45b..7810dc1fc7 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -1082,13 +1082,14 @@ publish_to_queues(
headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}],
delivery_mode = delivery_mode(Qos)},
{ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
- Content = #content{
- class_id = ClassId,
- properties = Props,
- properties_bin = none,
- protocol = none,
- payload_fragments_rev = [Payload]
- },
+ Content0 = #content{
+ class_id = ClassId,
+ properties = Props,
+ properties_bin = none,
+ protocol = none,
+ payload_fragments_rev = [Payload]
+ },
+ Content = rabbit_message_interceptor:intercept(Content0),
BasicMessage = #basic_message{
exchange_name = ExchangeName,
routing_keys = [RoutingKey],
diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
index de94bdee7a..6227aa0b57 100644
--- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
@@ -96,6 +96,7 @@ subgroups() ->
,trace
,max_packet_size_unauthenticated
,default_queue_type
+ ,incoming_message_interceptors
]}
]},
{cluster_size_3, [],
@@ -1424,6 +1425,36 @@ default_queue_type(Config) ->
ok = emqtt:disconnect(C2),
ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost).
+incoming_message_interceptors(Config) ->
+ Key = {rabbit, ?FUNCTION_NAME},
+ ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]),
+ Ch = rabbit_ct_client_helpers:open_channel(Config),
+ Payload = ClientId = QName = Topic = atom_to_binary(?FUNCTION_NAME),
+ declare_queue(Ch, QName, []),
+ bind(Ch, QName, Topic),
+ C = connect(ClientId, Config),
+ ok = emqtt:publish(C, Topic, Payload),
+ NowSecs = os:system_time(second),
+ NowMs = os:system_time(millisecond),
+ eventually(
+ ?_assertMatch(
+ {#'basic.get_ok'{},
+ #amqp_msg{payload = Payload,
+ props = #'P_basic'{
+ timestamp = Secs,
+ headers = [{<<"timestamp_in_ms">>, long, Ms},
+ {<<"x-mqtt-publish-qos">>, byte, 0}]
+ }}}
+ when Ms < NowMs + 4000 andalso
+ Ms > NowMs - 4000 andalso
+ Secs < NowSecs + 4 andalso
+ Secs > NowSecs - 4,
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}))),
+
+ delete_queue(Ch, QName),
+ true = rpc(Config, persistent_term, erase, [Key]),
+ ok = emqtt:disconnect(C).
+
%% -------------------------------------------------------------------
%% Internal helpers
%% -------------------------------------------------------------------
diff --git a/moduleindex.yaml b/moduleindex.yaml
index 8d9b7c854d..a4880c091d 100755
--- a/moduleindex.yaml
+++ b/moduleindex.yaml
@@ -575,6 +575,7 @@ rabbit:
- rabbit_looking_glass
- rabbit_maintenance
- rabbit_memory_monitor
+- rabbit_message_interceptor
- rabbit_metrics
- rabbit_mirror_queue_coordinator
- rabbit_mirror_queue_master