summaryrefslogtreecommitdiff
path: root/deps/rabbit/src
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src')
-rw-r--r--deps/rabbit/src/rabbit.erl16
-rw-r--r--deps/rabbit/src/rabbit_channel.erl3
-rw-r--r--deps/rabbit/src/rabbit_message_interceptor.erl65
3 files changed, 76 insertions, 8 deletions
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl
index 3e6e705a55..1269c77a97 100644
--- a/deps/rabbit/src/rabbit.erl
+++ b/deps/rabbit/src/rabbit.erl
@@ -1646,18 +1646,20 @@ ensure_working_fhc() ->
%% should be placed into persistent_term for efficiency.
persist_static_configuration() ->
persist_static_configuration(
- [{rabbit, classic_queue_index_v2_segment_entry_count},
- {rabbit, classic_queue_store_v2_max_cache_size},
- {rabbit, classic_queue_store_v2_check_crc32}
+ [classic_queue_index_v2_segment_entry_count,
+ classic_queue_store_v2_max_cache_size,
+ classic_queue_store_v2_check_crc32,
+ incoming_message_interceptors
]).
-persist_static_configuration(AppParams) ->
+persist_static_configuration(Params) ->
+ App = ?MODULE,
lists:foreach(
- fun(Key = {App, Param}) ->
+ fun(Param) ->
case application:get_env(App, Param) of
{ok, Value} ->
- ok = persistent_term:put(Key, Value);
+ ok = persistent_term:put({App, Param}, Value);
undefined ->
ok
end
- end, AppParams).
+ end, Params).
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 666d6e88d6..a78b8e1c08 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1288,9 +1288,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_write_permitted_on_topic(Exchange, User, RoutingKey, AuthzContext),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = #content {properties = Props} =
+ DecodedContent0 = #content {properties = Props} =
maybe_set_fast_reply_to(
rabbit_binary_parser:ensure_content_decoded(Content), State),
+ DecodedContent = rabbit_message_interceptor:intercept(DecodedContent0),
check_user_id_header(Props, State),
check_expiration_header(Props),
DoConfirm = Tx =/= none orelse ConfirmEnabled,
diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl
new file mode 100644
index 0000000000..05407349bf
--- /dev/null
+++ b/deps/rabbit/src/rabbit_message_interceptor.erl
@@ -0,0 +1,65 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+
+%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp
+%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can
+%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols (that
+%% do not use rabbit_channel) to also add AMQP 0.9.1 headers to incoming messages.
+-module(rabbit_message_interceptor).
+
+-export([intercept/1]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+
+-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>).
+-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).
+
+-type content() :: rabbit_types:decoded_content().
+
+-spec intercept(content()) -> content().
+intercept(Content) ->
+ Interceptors = persistent_term:get({rabbit, incoming_message_interceptors}, []),
+ lists:foldl(fun(I, C) ->
+ intercept(C, I)
+ end, Content, Interceptors).
+
+intercept(Content, {set_header_routing_node, Overwrite}) ->
+ Node = atom_to_binary(node()),
+ set_header(Content, {?HEADER_ROUTING_NODE, longstr, Node}, Overwrite);
+intercept(Content0, {set_header_timestamp, Overwrite}) ->
+ NowMs = os:system_time(millisecond),
+ NowSecs = NowMs div 1_000,
+ Content = set_header(Content0, {?HEADER_TIMESTAMP, long, NowMs}, Overwrite),
+ set_property_timestamp(Content, NowSecs, Overwrite).
+
+-spec set_header(content(),
+ {binary(), rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value()},
+ boolean()) ->
+ content().
+set_header(Content = #content{properties = Props = #'P_basic'{headers = Headers0}},
+ Header = {Key, Type, Value}, Overwrite) ->
+ case {rabbit_basic:header(Key, Headers0), Overwrite} of
+ {Val, false} when Val =/= undefined ->
+ Content;
+ _ ->
+ Headers = if Headers0 =:= undefined -> [Header];
+ true -> rabbit_misc:set_table_value(Headers0, Key, Type, Value)
+ end,
+ Content#content{properties = Props#'P_basic'{headers = Headers},
+ properties_bin = none}
+ end.
+
+-spec set_property_timestamp(content(), pos_integer(), boolean()) -> content().
+set_property_timestamp(Content = #content{properties = Props = #'P_basic'{timestamp = Ts}},
+ Timestamp, Overwrite) ->
+ case {Ts, Overwrite} of
+ {Secs, false} when is_integer(Secs) ->
+ Content;
+ _ ->
+ Content#content{properties = Props#'P_basic'{timestamp = Timestamp},
+ properties_bin = none}
+ end.