diff options
Diffstat (limited to 'deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl')
-rw-r--r-- | deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl new file mode 100644 index 0000000000..6018dd90a5 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl @@ -0,0 +1,75 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_upstream_exchange). + +-rabbit_boot_step({?MODULE, + [{description, "federation upstream exchange type"}, + {mfa, {rabbit_registry, register, + [exchange, <<"x-federation-upstream">>, ?MODULE]}}, + {requires, rabbit_registry}, + {cleanup, {rabbit_registry, unregister, + [exchange, <<"x-federation-upstream">>]}}, + {enables, recovery}]}). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("rabbit_federation.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, serialise_events/0, route/2]). +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, + add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([info/1, info/2]). + +%%---------------------------------------------------------------------------- + +info(_X) -> []. +info(_X, _) -> []. + +description() -> + [{description, <<"Federation upstream helper exchange">>}, + {internal_purpose, federation}]. + +serialise_events() -> false. + +route(X = #exchange{arguments = Args}, + D = #delivery{message = #basic_message{content = Content}}) -> + %% This arg was introduced in the same release as this exchange type; + %% it must be set + {long, MaxHops} = rabbit_misc:table_lookup(Args, ?MAX_HOPS_ARG), + %% Will be missing for pre-3.3.0 versions + DName = case rabbit_misc:table_lookup(Args, ?DOWNSTREAM_NAME_ARG) of + {longstr, Val0} -> Val0; + _ -> unknown + end, + %% Will be missing for pre-3.8.9 versions + DVhost = case rabbit_misc:table_lookup(Args, ?DOWNSTREAM_VHOST_ARG) of + {longstr, Val1} -> Val1; + _ -> unknown + end, + Headers = rabbit_basic:extract_headers(Content), + case rabbit_federation_util:should_forward(Headers, MaxHops, DName, DVhost) of + true -> rabbit_exchange_type_fanout:route(X, D); + false -> [] + end. + +validate(#exchange{arguments = Args}) -> + rabbit_federation_util:validate_arg(?MAX_HOPS_ARG, long, Args). + +validate_binding(_X, _B) -> ok. +create(_Tx, _X) -> ok. +delete(_Tx, _X, _Bs) -> ok. +policy_changed(_X1, _X2) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. + +assert_args_equivalence(X = #exchange{name = Name, + arguments = Args}, ReqArgs) -> + rabbit_misc:assert_args_equivalence(Args, ReqArgs, Name, [?MAX_HOPS_ARG]), + rabbit_exchange:assert_args_equivalence(X, Args). |