summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_federation/src
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
committerdcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
commitf23a51261d9502ec39df0f8db47ba6b22aa7659f (patch)
tree53dcdf46e7dc2c14e81ee960bce8793879b488d3 /deps/rabbitmq_federation/src
parentafa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff)
parent9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff)
downloadrabbitmq-server-git-stream-timestamp-offset.tar.gz
Merge remote-tracking branch 'origin/master' into stream-timestamp-offsetstream-timestamp-offset
Diffstat (limited to 'deps/rabbitmq_federation/src')
-rw-r--r--deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand.erl117
-rw-r--r--deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand.erl84
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_app.erl38
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_db.erl47
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_event.erl54
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_exchange.erl105
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl696
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl75
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl109
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_link_util.erl364
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_parameters.erl139
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_queue.erl111
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl330
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl87
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_status.erl175
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_sup.erl63
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_upstream.erl164
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl75
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_util.erl102
19 files changed, 2935 insertions, 0 deletions
diff --git a/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand.erl b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand.erl
new file mode 100644
index 0000000000..bab4dddeec
--- /dev/null
+++ b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand.erl
@@ -0,0 +1,117 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module('Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand').
+
+-include("rabbit_federation.hrl").
+
+-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
+
+-export([
+ usage/0,
+ usage_additional/0,
+ usage_doc_guides/0,
+ flags/0,
+ validate/2,
+ merge_defaults/2,
+ banner/2,
+ run/2,
+ switches/0,
+ aliases/0,
+ output/2,
+ scopes/0,
+ formatter/0,
+ help_section/0,
+ description/0
+ ]).
+
+
+%%----------------------------------------------------------------------------
+%% Callbacks
+%%----------------------------------------------------------------------------
+usage() ->
+ <<"federation_status [--only-down]">>.
+
+usage_additional() ->
+ [
+ {<<"--only-down">>, <<"only display links that failed or are not currently connected">>}
+ ].
+
+usage_doc_guides() ->
+ [?FEDERATION_GUIDE_URL].
+
+help_section() ->
+ {plugin, federation}.
+
+description() ->
+ <<"Displays federation link status">>.
+
+flags() ->
+ [].
+
+validate(_,_) ->
+ ok.
+
+formatter() ->
+ 'Elixir.RabbitMQ.CLI.Formatters.Erlang'.
+
+merge_defaults(A, Opts) ->
+ {A, maps:merge(#{only_down => false}, Opts)}.
+
+banner(_, #{node := Node, only_down := true}) ->
+ erlang:iolist_to_binary([<<"Listing federation links which are down on node ">>,
+ atom_to_binary(Node, utf8), <<"...">>]);
+banner(_, #{node := Node, only_down := false}) ->
+ erlang:iolist_to_binary([<<"Listing federation links on node ">>,
+ atom_to_binary(Node, utf8), <<"...">>]).
+
+run(_Args, #{node := Node, only_down := OnlyDown}) ->
+ case rabbit_misc:rpc_call(Node, rabbit_federation_status, status, []) of
+ {badrpc, _} = Error ->
+ Error;
+ Status ->
+ {stream, filter(Status, OnlyDown)}
+ end.
+
+switches() ->
+ [{only_down, boolean}].
+
+aliases() ->
+ [].
+
+output({stream, FederationStatus}, _) ->
+ Formatted = [begin
+ Timestamp = proplists:get_value(timestamp, St),
+ Map0 = maps:remove(timestamp, maps:from_list(St)),
+ Map1 = maps:merge(#{queue => <<>>,
+ exchange => <<>>,
+ upstream_queue => <<>>,
+ upstream_exchange => <<>>,
+ local_connection => <<>>,
+ error => <<>>}, Map0),
+ Map1#{last_changed => fmt_ts(Timestamp)}
+ end || St <- FederationStatus],
+ {stream, Formatted};
+output(E, _Opts) ->
+ 'Elixir.RabbitMQ.CLI.DefaultOutput':output(E).
+
+scopes() ->
+ ['ctl', 'diagnostics'].
+
+%%----------------------------------------------------------------------------
+%% Formatting
+%%----------------------------------------------------------------------------
+fmt_ts({{YY, MM, DD}, {Hour, Min, Sec}}) ->
+ erlang:list_to_binary(
+ io_lib:format("~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w",
+ [YY, MM, DD, Hour, Min, Sec])).
+
+filter(Status, _OnlyDown = false) ->
+ Status;
+filter(Status, _OnlyDown = true) ->
+ [St || St <- Status,
+ not lists:member(proplists:get_value(status, St), [running, starting])].
diff --git a/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand.erl b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand.erl
new file mode 100644
index 0000000000..8d062c692c
--- /dev/null
+++ b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand.erl
@@ -0,0 +1,84 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module('Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand').
+
+-include("rabbit_federation.hrl").
+
+-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
+
+-export([
+ usage/0,
+ usage_additional/0,
+ usage_doc_guides/0,
+ flags/0,
+ validate/2,
+ merge_defaults/2,
+ banner/2,
+ run/2,
+ aliases/0,
+ output/2,
+ help_section/0,
+ description/0
+ ]).
+
+
+%%----------------------------------------------------------------------------
+%% Callbacks
+%%----------------------------------------------------------------------------
+usage() ->
+ <<"restart_federation_link <link_id>">>.
+
+usage_additional() ->
+ [
+ {<<"<link_id>">>, <<"ID of the link to restart">>}
+ ].
+
+usage_doc_guides() ->
+ [?FEDERATION_GUIDE_URL].
+
+help_section() ->
+ {plugin, federation}.
+
+description() ->
+ <<"Restarts a running federation link">>.
+
+flags() ->
+ [].
+
+validate([], _Opts) ->
+ {validation_failure, not_enough_args};
+validate([_, _ | _], _Opts) ->
+ {validation_failure, too_many_args};
+validate([_], _) ->
+ ok.
+
+merge_defaults(A, O) ->
+ {A, O}.
+
+banner([Link], #{node := Node}) ->
+ erlang:iolist_to_binary([<<"Restarting federation link ">>, Link, << " on node ">>,
+ atom_to_binary(Node, utf8)]).
+
+run([Id], #{node := Node}) ->
+ case rabbit_misc:rpc_call(Node, rabbit_federation_status, lookup, [Id]) of
+ {badrpc, _} = Error ->
+ Error;
+ not_found ->
+ {error, <<"Link with the given ID was not found">>};
+ Obj ->
+ Upstream = proplists:get_value(upstream, Obj),
+ Supervisor = proplists:get_value(supervisor, Obj),
+ rabbit_misc:rpc_call(Node, rabbit_federation_link_sup, restart,
+ [Supervisor, Upstream])
+ end.
+
+aliases() ->
+ [].
+
+output(Output, _Opts) ->
+ 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Output).
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_app.erl b/deps/rabbitmq_federation/src/rabbit_federation_app.erl
new file mode 100644
index 0000000000..ee7ba91e5f
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_app.erl
@@ -0,0 +1,38 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_app).
+
+-behaviour(application).
+-export([start/2, stop/1]).
+
+%% Dummy supervisor - see Ulf Wiger's comment at
+%% http://erlang.2086793.n4.nabble.com/initializing-library-applications-without-processes-td2094473.html
+
+%% All of our actual server processes are supervised by
+%% rabbit_federation_sup, which is started by a rabbit_boot_step
+%% (since it needs to start up before queue / exchange recovery, so it
+%% can't be part of our application).
+%%
+%% However, we still need an application behaviour since we need to
+%% know when our application has started since then the Erlang client
+%% will have started and we can therefore start our links going. Since
+%% the application behaviour needs a tree of processes to supervise,
+%% this is it...
+-behaviour(supervisor).
+-export([init/1]).
+
+start(_Type, _StartArgs) ->
+ rabbit_federation_exchange_link:go(),
+ rabbit_federation_queue_link:go(),
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+stop(_State) ->
+ ok.
+%%----------------------------------------------------------------------------
+
+init([]) -> {ok, {{one_for_one, 3, 10}, []}}.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_db.erl b/deps/rabbitmq_federation/src/rabbit_federation_db.erl
new file mode 100644
index 0000000000..e35e3646a8
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_db.erl
@@ -0,0 +1,47 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_db).
+
+-include("rabbit_federation.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-define(DICT, orddict).
+
+-export([get_active_suffix/3, set_active_suffix/3, prune_scratch/2]).
+
+%%----------------------------------------------------------------------------
+
+get_active_suffix(XName, Upstream, Default) ->
+ case rabbit_exchange:lookup_scratch(XName, federation) of
+ {ok, Dict} ->
+ case ?DICT:find(key(Upstream), Dict) of
+ {ok, Suffix} -> Suffix;
+ error -> Default
+ end;
+ {error, not_found} ->
+ Default
+ end.
+
+set_active_suffix(XName, Upstream, Suffix) ->
+ ok = rabbit_exchange:update_scratch(
+ XName, federation,
+ fun(D) -> ?DICT:store(key(Upstream), Suffix, ensure(D)) end).
+
+prune_scratch(XName, Upstreams) ->
+ ok = rabbit_exchange:update_scratch(
+ XName, federation,
+ fun(D) -> Keys = [key(U) || U <- Upstreams],
+ ?DICT:filter(
+ fun(K, _V) -> lists:member(K, Keys) end, ensure(D))
+ end).
+
+key(#upstream{name = UpstreamName, exchange_name = XNameBin}) ->
+ {UpstreamName, XNameBin}.
+
+ensure(undefined) -> ?DICT:new();
+ensure(D) -> D.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_event.erl b/deps/rabbitmq_federation/src/rabbit_federation_event.erl
new file mode 100644
index 0000000000..417b8ecba3
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_event.erl
@@ -0,0 +1,54 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_event).
+-behaviour(gen_event).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([add_handler/0, remove_handler/0]).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-import(rabbit_misc, [pget/2]).
+
+%%----------------------------------------------------------------------------
+
+add_handler() ->
+ gen_event:add_handler(rabbit_event, ?MODULE, []).
+
+remove_handler() ->
+ gen_event:delete_handler(rabbit_event, ?MODULE, []).
+
+init([]) ->
+ {ok, []}.
+
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+
+handle_event(#event{type = parameter_set,
+ props = Props0}, State) ->
+ Props = rabbit_data_coercion:to_list(Props0),
+ case {pget(component, Props), pget(name, Props)} of
+ {global, cluster_name} ->
+ rabbit_federation_parameters:adjust(everything);
+ _ ->
+ ok
+ end,
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl
new file mode 100644
index 0000000000..6b85b6756b
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl
@@ -0,0 +1,105 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% TODO rename this
+-module(rabbit_federation_exchange).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "federation exchange decorator"},
+ {mfa, {rabbit_registry, register,
+ [exchange_decorator, <<"federation">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {cleanup, {rabbit_registry, unregister,
+ [exchange_decorator, <<"federation">>]}},
+ {enables, recovery}]}).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-behaviour(rabbit_exchange_decorator).
+
+-export([description/0, serialise_events/1]).
+-export([create/2, delete/3, policy_changed/2,
+ add_binding/3, remove_bindings/3, route/2, active_for/1]).
+
+%%----------------------------------------------------------------------------
+
+description() ->
+ [{description, <<"Federation exchange decorator">>}].
+
+serialise_events(X) -> federate(X).
+
+create(transaction, _X) ->
+ ok;
+create(none, X) ->
+ maybe_start(X).
+
+delete(transaction, _X, _Bs) ->
+ ok;
+delete(none, X, _Bs) ->
+ maybe_stop(X).
+
+policy_changed(OldX, NewX) ->
+ maybe_stop(OldX),
+ maybe_start(NewX).
+
+add_binding(transaction, _X, _B) ->
+ ok;
+add_binding(Serial, X = #exchange{name = XName}, B) ->
+ case federate(X) of
+ true -> rabbit_federation_exchange_link:add_binding(Serial, XName, B),
+ ok;
+ false -> ok
+ end.
+
+remove_bindings(transaction, _X, _Bs) ->
+ ok;
+remove_bindings(Serial, X = #exchange{name = XName}, Bs) ->
+ case federate(X) of
+ true -> rabbit_federation_exchange_link:remove_bindings(Serial, XName, Bs),
+ ok;
+ false -> ok
+ end.
+
+route(_, _) -> [].
+
+active_for(X) ->
+ case federate(X) of
+ true -> noroute;
+ false -> none
+ end.
+
+%%----------------------------------------------------------------------------
+
+%% Don't federate default exchange, we can't bind to it
+federate(#exchange{name = #resource{name = <<"">>}}) ->
+ false;
+
+%% Don't federate any of our intermediate exchanges. Note that we use
+%% internal=true since older brokers may not declare
+%% x-federation-upstream on us. Also other internal exchanges should
+%% probably not be federated.
+federate(#exchange{internal = true}) ->
+ false;
+
+federate(X) ->
+ rabbit_federation_upstream:federate(X).
+
+maybe_start(X = #exchange{name = XName})->
+ case federate(X) of
+ true -> ok = rabbit_federation_db:prune_scratch(
+ XName, rabbit_federation_upstream:for(X)),
+ ok = rabbit_federation_exchange_link_sup_sup:start_child(X),
+ ok;
+ false -> ok
+ end.
+
+maybe_stop(X = #exchange{name = XName}) ->
+ case federate(X) of
+ true -> ok = rabbit_federation_exchange_link_sup_sup:stop_child(X),
+ rabbit_federation_status:remove_exchange_or_queue(XName);
+ false -> ok
+ end.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl
new file mode 100644
index 0000000000..869ab047ae
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl
@@ -0,0 +1,696 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_exchange_link).
+
+%% pg2 is deprecated in OTP 23.
+-compile(nowarn_deprecated_function).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_federation.hrl").
+
+-behaviour(gen_server2).
+
+-export([go/0, add_binding/3, remove_bindings/3]).
+-export([list_routing_keys/1]). %% For testing
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-import(rabbit_misc, [pget/2]).
+-import(rabbit_federation_util, [name/1, vhost/1, pgname/1]).
+
+-record(state, {upstream,
+ upstream_params,
+ upstream_name,
+ connection,
+ channel,
+ cmd_channel,
+ consumer_tag,
+ queue,
+ internal_exchange,
+ waiting_cmds = gb_trees:empty(),
+ next_serial,
+ bindings = #{},
+ downstream_connection,
+ downstream_channel,
+ downstream_exchange,
+ unacked,
+ internal_exchange_timer,
+ internal_exchange_interval}).
+
+%%----------------------------------------------------------------------------
+
+%% We start off in a state where we do not connect, since we can first
+%% start during exchange recovery, when rabbit is not fully started
+%% and the Erlang client is not running. This then gets invoked when
+%% the federation app is started.
+go() -> cast(go).
+
+add_binding(S, XN, B) -> cast(XN, {enqueue, S, {add_binding, B}}).
+remove_bindings(S, XN, Bs) -> cast(XN, {enqueue, S, {remove_bindings, Bs}}).
+
+list_routing_keys(XN) -> call(XN, list_routing_keys).
+
+%%----------------------------------------------------------------------------
+
+start_link(Args) ->
+ gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]).
+
+init({Upstream, XName}) ->
+ %% If we are starting up due to a policy change then it's possible
+ %% for the exchange to have been deleted before we got here, in which
+ %% case it's possible that delete callback would also have been called
+ %% before we got here. So check if we still exist.
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} ->
+ DeobfuscatedUpstream = rabbit_federation_util:deobfuscate_upstream(Upstream),
+ DeobfuscatedUParams = rabbit_federation_upstream:to_params(DeobfuscatedUpstream, X),
+ UParams = rabbit_federation_util:obfuscate_upstream_params(DeobfuscatedUParams),
+ rabbit_federation_status:report(Upstream, UParams, XName, starting),
+ join(rabbit_federation_exchanges),
+ join({rabbit_federation_exchange, XName}),
+ gen_server2:cast(self(), maybe_go),
+ {ok, {not_started, {Upstream, UParams, XName}}};
+ {error, not_found} ->
+ rabbit_federation_link_util:log_warning(XName, "not found, stopping link~n", []),
+ {stop, gone}
+ end.
+
+handle_call(list_routing_keys, _From, State = #state{bindings = Bindings}) ->
+ {reply, lists:sort([K || {K, _} <- maps:keys(Bindings)]), State};
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast(maybe_go, S0 = {not_started, _Args}) ->
+ case federation_up() of
+ true -> go(S0);
+ false -> {noreply, S0}
+ end;
+
+handle_cast(go, S0 = {not_started, _Args}) ->
+ go(S0);
+
+%% There's a small race - I think we can realise federation is up
+%% before 'go' gets invoked. Ignore.
+handle_cast(go, State) ->
+ {noreply, State};
+
+handle_cast({enqueue, _, _}, State = {not_started, _}) ->
+ {noreply, State};
+
+handle_cast({enqueue, Serial, Cmd},
+ State = #state{waiting_cmds = Waiting,
+ downstream_exchange = XName}) ->
+ Waiting1 = gb_trees:insert(Serial, Cmd, Waiting),
+ try
+ {noreply, play_back_commands(State#state{waiting_cmds = Waiting1})}
+ catch exit:{{shutdown, {server_initiated_close, 404, Text}}, _} ->
+ rabbit_federation_link_util:log_warning(
+ XName, "detected upstream changes, restarting link: ~p~n", [Text]),
+ {stop, {shutdown, restart}, State}
+ end;
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(#'basic.consume_ok'{}, State) ->
+ {noreply, State};
+
+handle_info(#'basic.ack'{} = Ack, State = #state{channel = Ch,
+ unacked = Unacked}) ->
+ Unacked1 = rabbit_federation_link_util:ack(Ack, Ch, Unacked),
+ {noreply, State#state{unacked = Unacked1}};
+
+handle_info(#'basic.nack'{} = Nack, State = #state{channel = Ch,
+ unacked = Unacked}) ->
+ Unacked1 = rabbit_federation_link_util:nack(Nack, Ch, Unacked),
+ {noreply, State#state{unacked = Unacked1}};
+
+handle_info({#'basic.deliver'{routing_key = Key,
+ redelivered = Redelivered} = DeliverMethod, Msg},
+ State = #state{
+ upstream = Upstream = #upstream{max_hops = MaxH},
+ upstream_params = UParams = #upstream_params{x_or_q = UpstreamX},
+ upstream_name = UName,
+ downstream_exchange = #resource{name = XNameBin, virtual_host = DVhost},
+ downstream_channel = DCh,
+ channel = Ch,
+ unacked = Unacked}) ->
+ UVhost = vhost(UpstreamX),
+ PublishMethod = #'basic.publish'{exchange = XNameBin,
+ routing_key = Key},
+ HeadersFun = fun (H) -> update_routing_headers(UParams, UName, UVhost, Redelivered, H) end,
+ %% We need to check should_forward/2 here in case the upstream
+ %% does not have federation and thus is using a fanout exchange.
+ ForwardFun = fun (H) ->
+ DName = rabbit_nodes:cluster_name(),
+ rabbit_federation_util:should_forward(H, MaxH, DName, DVhost)
+ end,
+ Unacked1 = rabbit_federation_link_util:forward(
+ Upstream, DeliverMethod, Ch, DCh, PublishMethod,
+ HeadersFun, ForwardFun, Msg, Unacked),
+ {noreply, State#state{unacked = Unacked1}};
+
+handle_info(#'basic.cancel'{}, State = #state{upstream = Upstream,
+ upstream_params = UParams,
+ downstream_exchange = XName}) ->
+ rabbit_federation_link_util:connection_error(
+ local, basic_cancel, Upstream, UParams, XName, State);
+
+handle_info({'DOWN', _Ref, process, Pid, Reason},
+ State = #state{downstream_channel = DCh,
+ channel = Ch,
+ cmd_channel = CmdCh,
+ upstream = Upstream,
+ upstream_params = UParams,
+ downstream_exchange = XName}) ->
+ handle_down(Pid, Reason, Ch, CmdCh, DCh,
+ {Upstream, UParams, XName}, State);
+
+handle_info(check_internal_exchange, State = #state{internal_exchange = IntXNameBin,
+ internal_exchange_interval = Interval}) ->
+ case check_internal_exchange(IntXNameBin, State) of
+ upstream_not_found ->
+ rabbit_log_federation:warning("Federation link could not find upstream exchange '~s' and will restart",
+ [IntXNameBin]),
+ {stop, {shutdown, restart}, State};
+ _ ->
+ TRef = erlang:send_after(Interval, self(), check_internal_exchange),
+ {noreply, State#state{internal_exchange_timer = TRef}}
+ end;
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+terminate(_Reason, {not_started, _}) ->
+ ok;
+terminate(Reason, #state{downstream_connection = DConn,
+ connection = Conn,
+ upstream = Upstream,
+ upstream_params = UParams,
+ downstream_exchange = XName,
+ internal_exchange_timer = TRef,
+ internal_exchange = IntExchange,
+ queue = Queue}) when Reason =:= shutdown;
+ Reason =:= {shutdown, restart};
+ Reason =:= gone ->
+ timer:cancel(TRef),
+ rabbit_federation_link_util:ensure_connection_closed(DConn),
+
+ rabbit_log:debug("Exchange federation: link is shutting down, resource cleanup mode: ~p", [Upstream#upstream.resource_cleanup_mode]),
+ case Upstream#upstream.resource_cleanup_mode of
+ never -> ok;
+ _ ->
+ %% This is a normal shutdown and we are allowed to clean up the internally used queue and exchange
+ rabbit_log:debug("Federated exchange '~s' link will delete its internal queue '~s'", [Upstream#upstream.exchange_name, Queue]),
+ delete_upstream_queue(Conn, Queue),
+ rabbit_log:debug("Federated exchange '~s' link will delete its upstream exchange", [Upstream#upstream.exchange_name]),
+ delete_upstream_exchange(Conn, IntExchange)
+ end,
+
+ rabbit_federation_link_util:ensure_connection_closed(Conn),
+ rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, XName),
+ ok;
+%% unexpected shutdown
+terminate(Reason, #state{downstream_connection = DConn,
+ connection = Conn,
+ upstream = Upstream,
+ upstream_params = UParams,
+ downstream_exchange = XName,
+ internal_exchange_timer = TRef}) ->
+ timer:cancel(TRef),
+
+ rabbit_federation_link_util:ensure_connection_closed(DConn),
+
+ %% unlike in the clean shutdown case above, we keep the queue
+ %% and exchange around
+
+ rabbit_federation_link_util:ensure_connection_closed(Conn),
+ rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, XName),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+call(XName, Msg) -> [gen_server2:call(Pid, Msg, infinity) || Pid <- x(XName)].
+cast(Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- all()].
+cast(XName, Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- x(XName)].
+
+join(Name) ->
+ pg2:create(pgname(Name)),
+ ok = pg2:join(pgname(Name), self()).
+
+all() ->
+ pg2:create(pgname(rabbit_federation_exchanges)),
+ pg2:get_members(pgname(rabbit_federation_exchanges)).
+
+x(XName) ->
+ pg2:create(pgname({rabbit_federation_exchange, XName})),
+ pg2:get_members(pgname({rabbit_federation_exchange, XName})).
+
+%%----------------------------------------------------------------------------
+
+federation_up() -> is_pid(whereis(rabbit_federation_app)).
+
+handle_command({add_binding, Binding}, State) ->
+ add_binding(Binding, State);
+
+handle_command({remove_bindings, Bindings}, State) ->
+ lists:foldl(fun remove_binding/2, State, Bindings).
+
+play_back_commands(State = #state{waiting_cmds = Waiting,
+ next_serial = Next}) ->
+ case gb_trees:is_empty(Waiting) of
+ false -> case gb_trees:take_smallest(Waiting) of
+ {Next, Cmd, Waiting1} ->
+ %% The next one. Just execute it.
+ play_back_commands(
+ handle_command(Cmd, State#state{
+ waiting_cmds = Waiting1,
+ next_serial = Next + 1}));
+ {Serial, _Cmd, Waiting1} when Serial < Next ->
+ %% This command came from before we executed
+ %% binding:list_for_source. Ignore it.
+ play_back_commands(State#state{
+ waiting_cmds = Waiting1});
+ _ ->
+ %% Some future command. Don't do anything.
+ State
+ end;
+ true -> State
+ end.
+
+add_binding(B, State) ->
+ binding_op(fun record_binding/2, bind_cmd(bind, B, State), B, State).
+
+remove_binding(B, State) ->
+ binding_op(fun forget_binding/2, bind_cmd(unbind, B, State), B, State).
+
+record_binding(B = #binding{destination = Dest},
+ State = #state{bindings = Bs}) ->
+ {DoIt, Set} = case maps:find(key(B), Bs) of
+ error -> {true, sets:from_list([Dest])};
+ {ok, Dests} -> {false, sets:add_element(
+ Dest, Dests)}
+ end,
+ {DoIt, State#state{bindings = maps:put(key(B), Set, Bs)}}.
+
+forget_binding(B = #binding{destination = Dest},
+ State = #state{bindings = Bs}) ->
+ Dests = sets:del_element(Dest, maps:get(key(B), Bs)),
+ {DoIt, Bs1} = case sets:size(Dests) of
+ 0 -> {true, maps:remove(key(B), Bs)};
+ _ -> {false, maps:put(key(B), Dests, Bs)}
+ end,
+ {DoIt, State#state{bindings = Bs1}}.
+
+binding_op(UpdateFun, Cmd, B = #binding{args = Args},
+ State = #state{cmd_channel = Ch}) ->
+ {DoIt, State1} =
+ case rabbit_misc:table_lookup(Args, ?BINDING_HEADER) of
+ undefined -> UpdateFun(B, State);
+ {array, _} -> {Cmd =/= ignore, State}
+ end,
+ case DoIt of
+ true -> amqp_channel:call(Ch, Cmd);
+ false -> ok
+ end,
+ State1.
+
+bind_cmd(Type, #binding{key = Key, args = Args},
+ State = #state{internal_exchange = IntXNameBin,
+ upstream_params = UpstreamParams,
+ upstream = Upstream}) ->
+ #upstream_params{x_or_q = X} = UpstreamParams,
+ #upstream{bind_nowait = Nowait} = Upstream,
+ case update_binding(Args, State) of
+ ignore -> ignore;
+ NewArgs -> bind_cmd0(Type, name(X), IntXNameBin, Key, NewArgs, Nowait)
+ end.
+
+bind_cmd0(bind, Source, Destination, RoutingKey, Arguments, Nowait) ->
+ #'exchange.bind'{source = Source,
+ destination = Destination,
+ routing_key = RoutingKey,
+ arguments = Arguments,
+ nowait = Nowait};
+
+bind_cmd0(unbind, Source, Destination, RoutingKey, Arguments, Nowait) ->
+ #'exchange.unbind'{source = Source,
+ destination = Destination,
+ routing_key = RoutingKey,
+ arguments = Arguments,
+ nowait = Nowait}.
+
+%% This function adds information about the current node to the
+%% binding arguments, or returns 'ignore' if it determines the binding
+%% should propagate no further. The interesting part is the latter.
+%%
+%% We want bindings to propagate in the same way as messages
+%% w.r.t. max_hops - if we determine that a message can get from node
+%% A to B (assuming bindings are in place) then it follows that a
+%% binding at B should propagate back to A, and no further. There is
+%% no point in propagating bindings past the point where messages
+%% would propagate, and we will lose messages if bindings don't
+%% propagate as far.
+%%
+%% Note that we still want to have limits on how far messages can
+%% propagate: limiting our bindings is not enough, since other
+%% bindings from other nodes can overlap.
+%%
+%% So in short we want bindings to obey max_hops. However, they can't
+%% just obey the max_hops of the current link, since they are
+%% travelling in the opposite direction to messages! Consider the
+%% following federation:
+%%
+%% A -----------> B -----------> C
+%% max_hops=1 max_hops=2
+%%
+%% where the arrows indicate message flow. A binding created at C
+%% should propagate to B, then to A, and no further. Therefore every
+%% time we traverse a link, we keep a count of the number of hops that
+%% a message could have made so far to reach this point, and still be
+%% able to propagate. When this number ("hops" below) reaches 0 we
+%% propagate no further.
+%%
+%% hops(link(N)) is given by:
+%%
+%% min(hops(link(N-1))-1, max_hops(link(N)))
+%%
+%% where link(N) is the link that bindings propagate over after N
+%% steps (e.g. link(1) is CB above, link(2) is BA).
+%%
+%% In other words, we count down to 0 from the link with the most
+%% restrictive max_hops we have yet passed through.
+
+update_binding(Args, #state{downstream_exchange = X,
+ upstream = Upstream,
+ upstream_params = #upstream_params{x_or_q = UpstreamX},
+ upstream_name = UName}) ->
+ #upstream{max_hops = MaxHops} = Upstream,
+ UVhost = vhost(UpstreamX),
+ Hops = case rabbit_misc:table_lookup(Args, ?BINDING_HEADER) of
+ undefined -> MaxHops;
+ {array, All} -> [{table, Prev} | _] = All,
+ PrevHops = get_hops(Prev),
+ case rabbit_federation_util:already_seen(
+ UName, UVhost, All) of
+ true -> 0;
+ false -> lists:min([PrevHops - 1, MaxHops])
+ end
+ end,
+ case Hops of
+ 0 -> ignore;
+ _ -> Cluster = rabbit_nodes:cluster_name(),
+ ABSuffix = rabbit_federation_db:get_active_suffix(
+ X, Upstream, <<"A">>),
+ DVhost = vhost(X),
+ DName = name(X),
+ Down = <<DVhost/binary,":", DName/binary, " ", ABSuffix/binary>>,
+ Info = [{<<"cluster-name">>, longstr, Cluster},
+ {<<"vhost">>, longstr, DVhost},
+ {<<"exchange">>, longstr, Down},
+ {<<"hops">>, short, Hops}],
+ rabbit_basic:prepend_table_header(?BINDING_HEADER, Info, Args)
+ end.
+
+
+
+key(#binding{key = Key, args = Args}) -> {Key, Args}.
+
+go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
+ Unacked = rabbit_federation_link_util:unacked_new(),
+
+ log_link_startup_attempt(Upstream, DownXName),
+ rabbit_federation_link_util:start_conn_ch(
+ fun (Conn, Ch, DConn, DCh) ->
+ {ok, CmdCh} = open_cmd_channel(Conn, Upstream, UParams, DownXName, S0),
+ erlang:monitor(process, CmdCh),
+ Props = pget(server_properties,
+ amqp_connection:info(Conn, [server_properties])),
+ UName = case rabbit_misc:table_lookup(
+ Props, <<"cluster_name">>) of
+ {longstr, N} -> N;
+ _ -> unknown
+ end,
+ {Serial, Bindings} =
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ {rabbit_exchange:peek_serial(DownXName),
+ rabbit_binding:list_for_source(DownXName)}
+ end),
+ true = is_integer(Serial),
+ %% If we are very short lived, Serial can be undefined at
+ %% this point (since the deletion of the X could have
+ %% overtaken the creation of this process). However, this
+ %% is not a big deal - 'undefined' just becomes the next
+ %% serial we will process. Since it compares larger than
+ %% any number we never process any commands. And we will
+ %% soon get told to stop anyway.
+ {ok, Interval} = application:get_env(rabbitmq_federation,
+ internal_exchange_check_interval),
+ State = ensure_upstream_bindings(
+ consume_from_upstream_queue(
+ #state{upstream = Upstream,
+ upstream_params = UParams,
+ upstream_name = UName,
+ connection = Conn,
+ channel = Ch,
+ cmd_channel = CmdCh,
+ next_serial = Serial,
+ downstream_connection = DConn,
+ downstream_channel = DCh,
+ downstream_exchange = DownXName,
+ unacked = Unacked,
+ internal_exchange_interval = Interval}),
+ Bindings),
+ rabbit_log_federation:info("Federation link for ~s (upstream: ~s) will perform internal exchange checks "
+ "every ~b seconds", [rabbit_misc:rs(DownXName), UName, round(Interval / 1000)]),
+ TRef = erlang:send_after(Interval, self(), check_internal_exchange),
+ {noreply, State#state{internal_exchange_timer = TRef}}
+ end, Upstream, UParams, DownXName, S0).
+
+log_link_startup_attempt(OUpstream, DownXName) ->
+ rabbit_log_federation:debug("Will try to start a federation link for ~s, upstream: '~s'",
+ [rabbit_misc:rs(DownXName), OUpstream#upstream.name]).
+
+open_cmd_channel(Conn, Upstream = #upstream{name = UName}, UParams, DownXName, S0) ->
+ rabbit_log_federation:debug("Will open a command channel to upstream '~s' for downstream federated ~s",
+ [UName, rabbit_misc:rs(DownXName)]),
+ case amqp_connection:open_channel(Conn) of
+ {ok, CCh} ->
+ erlang:monitor(process, CCh),
+ {ok, CCh};
+ E ->
+ rabbit_federation_link_util:ensure_connection_closed(Conn),
+ rabbit_federation_link_util:connection_error(command_channel, E,
+ Upstream, UParams, DownXName, S0),
+ E
+ end.
+
+consume_from_upstream_queue(
+ State = #state{upstream = Upstream,
+ upstream_params = UParams,
+ channel = Ch,
+ downstream_exchange = DownXName}) ->
+ #upstream{prefetch_count = Prefetch,
+ expires = Expiry,
+ message_ttl = TTL,
+ ha_policy = HA} = Upstream,
+ #upstream_params{x_or_q = X,
+ params = Params} = UParams,
+ Q = upstream_queue_name(name(X), vhost(Params), DownXName),
+ Args = [A || {_K, _T, V} = A
+ <- [{<<"x-expires">>, long, Expiry},
+ {<<"x-message-ttl">>, long, TTL},
+ {<<"x-ha-policy">>, longstr, HA},
+ {<<"x-internal-purpose">>, longstr, <<"federation">>}],
+ V =/= none],
+ amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+ durable = true,
+ arguments = Args}),
+ NoAck = Upstream#upstream.ack_mode =:= 'no-ack',
+ case NoAck of
+ false -> amqp_channel:call(Ch, #'basic.qos'{prefetch_count = Prefetch});
+ true -> ok
+ end,
+ #'basic.consume_ok'{consumer_tag = CTag} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
+ no_ack = NoAck}, self()),
+ State#state{consumer_tag = CTag,
+ queue = Q}.
+
+ensure_upstream_bindings(State = #state{upstream = Upstream,
+ connection = Conn,
+ channel = Ch,
+ downstream_exchange = DownXName,
+ queue = Q}, Bindings) ->
+ OldSuffix = rabbit_federation_db:get_active_suffix(
+ DownXName, Upstream, <<"A">>),
+ Suffix = case OldSuffix of
+ <<"A">> -> <<"B">>;
+ <<"B">> -> <<"A">>
+ end,
+ IntXNameBin = upstream_exchange_name(Q, Suffix),
+ ensure_upstream_exchange(State),
+ ensure_internal_exchange(IntXNameBin, State),
+ amqp_channel:call(Ch, #'queue.bind'{exchange = IntXNameBin, queue = Q}),
+ State1 = State#state{internal_exchange = IntXNameBin},
+ rabbit_federation_db:set_active_suffix(DownXName, Upstream, Suffix),
+ State2 = lists:foldl(fun add_binding/2, State1, Bindings),
+ OldIntXNameBin = upstream_exchange_name(Q, OldSuffix),
+ delete_upstream_exchange(Conn, OldIntXNameBin),
+ State2.
+
+ensure_upstream_exchange(#state{upstream_params = UParams,
+ connection = Conn,
+ channel = Ch}) ->
+ #upstream_params{x_or_q = X} = UParams,
+ #exchange{type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Arguments} = X,
+ Decl = #'exchange.declare'{exchange = name(X),
+ type = list_to_binary(atom_to_list(Type)),
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Arguments},
+ rabbit_federation_link_util:disposable_channel_call(
+ Conn, Decl#'exchange.declare'{passive = true},
+ fun(?NOT_FOUND, _Text) ->
+ amqp_channel:call(Ch, Decl)
+ end).
+
+ensure_internal_exchange(IntXNameBin,
+ #state{upstream = #upstream{max_hops = MaxHops, name = UName},
+ upstream_params = UParams,
+ connection = Conn,
+ channel = Ch,
+ downstream_exchange = #resource{virtual_host = DVhost}}) ->
+ rabbit_log_federation:debug("Exchange federation will set up exchange '~s' in upstream '~s'",
+ [IntXNameBin, UName]),
+ #upstream_params{params = Params} = rabbit_federation_util:deobfuscate_upstream_params(UParams),
+ rabbit_log_federation:debug("Will delete upstream exchange '~s'", [IntXNameBin]),
+ delete_upstream_exchange(Conn, IntXNameBin),
+ rabbit_log_federation:debug("Will declare an internal upstream exchange '~s'", [IntXNameBin]),
+ Base = #'exchange.declare'{exchange = IntXNameBin,
+ durable = true,
+ internal = true,
+ auto_delete = true},
+ Purpose = [{<<"x-internal-purpose">>, longstr, <<"federation">>}],
+ XFUArgs = [{?MAX_HOPS_ARG, long, MaxHops},
+ {?DOWNSTREAM_NAME_ARG, longstr, cycle_detection_node_identifier()},
+ {?DOWNSTREAM_VHOST_ARG, longstr, DVhost}
+ | Purpose],
+ XFU = Base#'exchange.declare'{type = <<"x-federation-upstream">>,
+ arguments = XFUArgs},
+ Fan = Base#'exchange.declare'{type = <<"fanout">>,
+ arguments = Purpose},
+ rabbit_federation_link_util:disposable_connection_call(
+ Params, XFU, fun(?COMMAND_INVALID, _Text) ->
+ amqp_channel:call(Ch, Fan)
+ end).
+
+check_internal_exchange(IntXNameBin,
+ #state{upstream = #upstream{max_hops = MaxHops, name = UName},
+ upstream_params = UParams,
+ downstream_exchange = XName = #resource{virtual_host = DVhost}}) ->
+ #upstream_params{params = Params} =
+ rabbit_federation_util:deobfuscate_upstream_params(UParams),
+ rabbit_log_federation:debug("Exchange federation will check on exchange '~s' in upstream '~s'",
+ [IntXNameBin, UName]),
+ Base = #'exchange.declare'{exchange = IntXNameBin,
+ passive = true,
+ durable = true,
+ internal = true,
+ auto_delete = true},
+ Purpose = [{<<"x-internal-purpose">>, longstr, <<"federation">>}],
+ XFUArgs = [{?MAX_HOPS_ARG, long, MaxHops},
+ {?DOWNSTREAM_NAME_ARG, longstr, cycle_detection_node_identifier()},
+ {?DOWNSTREAM_VHOST_ARG, longstr, DVhost}
+ | Purpose],
+ XFU = Base#'exchange.declare'{type = <<"x-federation-upstream">>,
+ arguments = XFUArgs},
+ rabbit_federation_link_util:disposable_connection_call(
+ Params, XFU, fun(404, Text) ->
+ rabbit_federation_link_util:log_warning(
+ XName, "detected internal upstream exchange changes,"
+ " restarting link: ~p~n", [Text]),
+ upstream_not_found;
+ (Code, Text) ->
+ rabbit_federation_link_util:log_warning(
+ XName, "internal upstream exchange check failed: ~p ~p~n",
+ [Code, Text]),
+ error
+ end).
+
+upstream_queue_name(XNameBin, VHost, #resource{name = DownXNameBin,
+ virtual_host = DownVHost}) ->
+ Node = rabbit_nodes:cluster_name(),
+ DownPart = case DownVHost of
+ VHost -> case DownXNameBin of
+ XNameBin -> <<"">>;
+ _ -> <<":", DownXNameBin/binary>>
+ end;
+ _ -> <<":", DownVHost/binary,
+ ":", DownXNameBin/binary>>
+ end,
+ <<"federation: ", XNameBin/binary, " -> ", Node/binary, DownPart/binary>>.
+
+cycle_detection_node_identifier() ->
+ rabbit_nodes:cluster_name().
+
+upstream_exchange_name(UpstreamQName, Suffix) ->
+ <<UpstreamQName/binary, " ", Suffix/binary>>.
+
+delete_upstream_exchange(Conn, XNameBin) ->
+ rabbit_federation_link_util:disposable_channel_call(
+ Conn, #'exchange.delete'{exchange = XNameBin}).
+
+delete_upstream_queue(Conn, Queue) ->
+ rabbit_federation_link_util:disposable_channel_call(
+ Conn, #'queue.delete'{queue = Queue}).
+
+update_routing_headers(#upstream_params{table = Table}, UpstreamName, UVhost, Redelivered, Headers) ->
+ NewValue = Table ++
+ [{<<"redelivered">>, bool, Redelivered}] ++
+ header_for_upstream_name(UpstreamName) ++
+ header_for_upstream_vhost(UVhost),
+ rabbit_basic:prepend_table_header(?ROUTING_HEADER, NewValue, Headers).
+
+header_for_upstream_name(unknown) -> [];
+header_for_upstream_name(Name) -> [{<<"cluster-name">>, longstr, Name}].
+
+header_for_upstream_vhost(unknown) -> [];
+header_for_upstream_vhost(Name) -> [{<<"vhost">>, longstr, Name}].
+
+get_hops(Table) ->
+ case rabbit_misc:table_lookup(Table, <<"hops">>) of
+ %% see rabbit_binary_generator
+ {short, N} -> N;
+ {long, N} -> N;
+ {byte, N} -> N;
+ {signedint, N} -> N;
+ {unsignedbyte, N} -> N;
+ {unsignedshort, N} -> N;
+ {unsignedint, N} -> N;
+ {_, N} when is_integer(N) andalso N >= 0 -> N
+ end.
+
+handle_down(DCh, Reason, _Ch, _CmdCh, DCh, Args, State) ->
+ rabbit_federation_link_util:handle_downstream_down(Reason, Args, State);
+handle_down(ChPid, Reason, Ch, CmdCh, _DCh, Args, State)
+ when ChPid =:= Ch; ChPid =:= CmdCh ->
+ rabbit_federation_link_util:handle_upstream_down(Reason, Args, State).
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl
new file mode 100644
index 0000000000..fda76a5070
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl
@@ -0,0 +1,75 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_exchange_link_sup_sup).
+
+-behaviour(mirrored_supervisor).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-define(SUPERVISOR, ?MODULE).
+
+%% Supervises the upstream links for all exchanges (but not queues). We need
+%% different handling here since exchanges want a mirrored sup.
+
+-export([start_link/0, start_child/1, adjust/1, stop_child/1]).
+-export([init/1]).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR,
+ fun rabbit_misc:execute_mnesia_transaction/1,
+ ?MODULE, []).
+
+%% Note that the next supervisor down, rabbit_federation_link_sup, is common
+%% between exchanges and queues.
+start_child(X) ->
+ case mirrored_supervisor:start_child(
+ ?SUPERVISOR,
+ {id(X), {rabbit_federation_link_sup, start_link, [X]},
+ transient, ?SUPERVISOR_WAIT, supervisor,
+ [rabbit_federation_link_sup]}) of
+ {ok, _Pid} -> ok;
+ {error, {already_started, _Pid}} ->
+ #exchange{name = ExchangeName} = X,
+ rabbit_log_federation:debug("Federation link for exchange ~p was already started",
+ [rabbit_misc:rs(ExchangeName)]),
+ ok;
+ %% A link returned {stop, gone}, the link_sup shut down, that's OK.
+ {error, {shutdown, _}} -> ok
+ end.
+
+adjust({clear_upstream, VHost, UpstreamName}) ->
+ [rabbit_federation_link_sup:adjust(Pid, X, {clear_upstream, UpstreamName}) ||
+ {#exchange{name = Name} = X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR),
+ Name#resource.virtual_host == VHost],
+ ok;
+adjust(Reason) ->
+ [rabbit_federation_link_sup:adjust(Pid, X, Reason) ||
+ {X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)],
+ ok.
+
+stop_child(X) ->
+ case mirrored_supervisor:terminate_child(?SUPERVISOR, id(X)) of
+ ok -> ok;
+ {error, Err} ->
+ #exchange{name = ExchangeName} = X,
+ rabbit_log_federation:warning(
+ "Attempt to stop a federation link for exchange ~p failed: ~p",
+ [rabbit_misc:rs(ExchangeName), Err]),
+ ok
+ end,
+ ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(X)).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_one, 1200, 60}, []}}.
+
+%% See comment in rabbit_federation_queue_link_sup_sup:id/1
+id(X = #exchange{policy = Policy}) -> X1 = rabbit_exchange:immutable(X),
+ X1#exchange{policy = Policy}.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl
new file mode 100644
index 0000000000..27d1b50277
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl
@@ -0,0 +1,109 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_link_sup).
+
+-behaviour(supervisor2).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit/include/amqqueue.hrl").
+-include("rabbit_federation.hrl").
+
+%% Supervises the upstream links for an exchange or queue.
+
+-export([start_link/1, adjust/3, restart/2]).
+-export([init/1]).
+
+start_link(XorQ) ->
+ supervisor2:start_link(?MODULE, XorQ).
+
+adjust(Sup, XorQ, everything) ->
+ [stop(Sup, Upstream, XorQ) ||
+ {Upstream, _, _, _} <- supervisor2:which_children(Sup)],
+ [{ok, _Pid} = supervisor2:start_child(Sup, Spec) || Spec <- specs(XorQ)];
+
+adjust(Sup, XorQ, {upstream, UpstreamName}) ->
+ OldUpstreams0 = children(Sup, UpstreamName),
+ NewUpstreams0 = rabbit_federation_upstream:for(XorQ, UpstreamName),
+ %% If any haven't changed, don't restart them. The broker will
+ %% avoid telling us about connections that have not changed
+ %% syntactically, but even if one has, this XorQ may not have that
+ %% connection in an upstream, so we still need to check here.
+ {OldUpstreams, NewUpstreams} =
+ lists:foldl(
+ fun (OldU, {OldUs, NewUs}) ->
+ case lists:member(OldU, NewUs) of
+ true -> {OldUs -- [OldU], NewUs -- [OldU]};
+ false -> {OldUs, NewUs}
+ end
+ end, {OldUpstreams0, NewUpstreams0}, OldUpstreams0),
+ [stop(Sup, OldUpstream, XorQ) || OldUpstream <- OldUpstreams],
+ [start(Sup, NewUpstream, XorQ) || NewUpstream <- NewUpstreams];
+
+adjust(Sup, XorQ, {clear_upstream, UpstreamName}) ->
+ ok = rabbit_federation_db:prune_scratch(
+ name(XorQ), rabbit_federation_upstream:for(XorQ)),
+ [stop(Sup, Upstream, XorQ) || Upstream <- children(Sup, UpstreamName)];
+
+adjust(Sup, X = #exchange{name = XName}, {upstream_set, _Set}) ->
+ adjust(Sup, X, everything),
+ case rabbit_federation_upstream:federate(X) of
+ false -> ok;
+ true -> ok = rabbit_federation_db:prune_scratch(
+ XName, rabbit_federation_upstream:for(X))
+ end;
+adjust(Sup, Q, {upstream_set, _}) when ?is_amqqueue(Q) ->
+ adjust(Sup, Q, everything);
+adjust(Sup, XorQ, {clear_upstream_set, _}) ->
+ adjust(Sup, XorQ, everything).
+
+restart(Sup, Upstream) ->
+ ok = supervisor2:terminate_child(Sup, Upstream),
+ {ok, _Pid} = supervisor2:restart_child(Sup, Upstream),
+ ok.
+
+start(Sup, Upstream, XorQ) ->
+ {ok, _Pid} = supervisor2:start_child(Sup, spec(rabbit_federation_util:obfuscate_upstream(Upstream), XorQ)),
+ ok.
+
+stop(Sup, Upstream, XorQ) ->
+ ok = supervisor2:terminate_child(Sup, Upstream),
+ ok = supervisor2:delete_child(Sup, Upstream),
+ %% While the link will report its own removal, that only works if
+ %% the link was actually up. If the link was broken and failing to
+ %% come up, the possibility exists that there *is* no link
+ %% process, but we still have a report in the status table. So
+ %% remove it here too.
+ rabbit_federation_status:remove(Upstream, name(XorQ)).
+
+children(Sup, UpstreamName) ->
+ rabbit_federation_util:find_upstreams(
+ UpstreamName, [U || {U, _, _, _} <- supervisor2:which_children(Sup)]).
+
+%%----------------------------------------------------------------------------
+
+init(XorQ) ->
+ %% 1, ?MAX_WAIT so that we always give up after one fast retry and get
+ %% into the reconnect delay.
+ {ok, {{one_for_one, 1, ?MAX_WAIT}, specs(XorQ)}}.
+
+specs(XorQ) ->
+ [spec(rabbit_federation_util:obfuscate_upstream(Upstream), XorQ)
+ || Upstream <- rabbit_federation_upstream:for(XorQ)].
+
+spec(U = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) ->
+ {U, {rabbit_federation_exchange_link, start_link, [{U, XName}]},
+ {permanent, Delay}, ?WORKER_WAIT, worker,
+ [rabbit_federation_exchange_link]};
+
+spec(Upstream = #upstream{reconnect_delay = Delay}, Q) when ?is_amqqueue(Q) ->
+ {Upstream, {rabbit_federation_queue_link, start_link, [{Upstream, Q}]},
+ {permanent, Delay}, ?WORKER_WAIT, worker,
+ [rabbit_federation_queue_link]}.
+
+name(#exchange{name = XName}) -> XName;
+name(Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q).
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl
new file mode 100644
index 0000000000..a5fd560f0b
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl
@@ -0,0 +1,364 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_link_util).
+
+-include_lib("rabbit/include/amqqueue.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_federation.hrl").
+
+%% real
+-export([start_conn_ch/5, disposable_channel_call/2, disposable_channel_call/3,
+ disposable_connection_call/3, ensure_connection_closed/1,
+ log_terminate/4, unacked_new/0, ack/3, nack/3, forward/9,
+ handle_downstream_down/3, handle_upstream_down/3,
+ get_connection_name/2, log_debug/3, log_info/3, log_warning/3,
+ log_error/3]).
+
+%% temp
+-export([connection_error/6]).
+
+-import(rabbit_misc, [pget/2]).
+
+-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000).
+
+%%----------------------------------------------------------------------------
+
+start_conn_ch(Fun, OUpstream, OUParams,
+ XorQName = #resource{virtual_host = DownVHost}, State) ->
+
+ Upstream = rabbit_federation_util:deobfuscate_upstream(OUpstream),
+ UParams = rabbit_federation_util:deobfuscate_upstream_params(OUParams),
+
+ ConnName = get_connection_name(Upstream, UParams),
+ case open_monitor(#amqp_params_direct{virtual_host = DownVHost}, ConnName) of
+ {ok, DConn, DCh} ->
+ case Upstream#upstream.ack_mode of
+ 'on-confirm' ->
+ #'confirm.select_ok'{} =
+ amqp_channel:call(DCh, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(DCh, self());
+ _ ->
+ ok
+ end,
+ case open_monitor(UParams#upstream_params.params, ConnName) of
+ {ok, Conn, Ch} ->
+ %% Don't trap exits until we have established
+ %% connections so that if we try to delete
+ %% federation upstreams while waiting for a
+ %% connection to be established then we don't
+ %% block
+ process_flag(trap_exit, true),
+ try
+ R = Fun(Conn, Ch, DConn, DCh),
+ log_info(
+ XorQName, "connected to ~s~n",
+ [rabbit_federation_upstream:params_to_string(
+ UParams)]),
+ Name = pget(name, amqp_connection:info(DConn, [name])),
+ rabbit_federation_status:report(
+ OUpstream, OUParams, XorQName, {running, Name}),
+ R
+ catch exit:E ->
+ %% terminate/2 will not get this, as we
+ %% have not put them in our state yet
+ ensure_connection_closed(DConn),
+ ensure_connection_closed(Conn),
+ connection_error(remote_start, E,
+ OUpstream, OUParams, XorQName, State)
+ end;
+ E ->
+ ensure_connection_closed(DConn),
+ connection_error(remote_start, E,
+ OUpstream, OUParams, XorQName, State)
+ end;
+ E ->
+ connection_error(local_start, E,
+ OUpstream, OUParams, XorQName, State)
+ end.
+
+get_connection_name(#upstream{name = UpstreamName},
+ #upstream_params{x_or_q = Resource}) when is_record(Resource, exchange)->
+ Policy = Resource#exchange.policy,
+ PolicyName = proplists:get_value(name, Policy),
+ connection_name(UpstreamName, PolicyName);
+
+get_connection_name(#upstream{name = UpstreamName},
+ #upstream_params{x_or_q = Resource}) when ?is_amqqueue(Resource) ->
+ Policy = amqqueue:get_policy(Resource),
+ PolicyName = proplists:get_value(name, Policy),
+ connection_name(UpstreamName, PolicyName);
+
+get_connection_name(_, _) ->
+ connection_name(undefined, undefined).
+
+connection_name(Upstream, Policy) when is_binary(Upstream), is_binary(Policy) ->
+ <<<<"Federation link (upstream: ">>/binary, Upstream/binary, <<", policy: ">>/binary, Policy/binary, <<")">>/binary>>;
+connection_name(_, _) ->
+ <<"Federation link">>.
+
+open_monitor(Params, Name) ->
+ case open(Params, Name) of
+ {ok, Conn, Ch} -> erlang:monitor(process, Ch),
+ {ok, Conn, Ch};
+ E -> E
+ end.
+
+open(Params, Name) ->
+ try
+ amqp_connection:start(Params, Name)
+ of
+ {ok, Conn} ->
+ try
+ amqp_connection:open_channel(Conn)
+ of
+ {ok, Ch} -> {ok, Conn, Ch};
+ E -> ensure_connection_closed(Conn),
+ E
+ catch
+ _:E ->
+ ensure_connection_closed(Conn),
+ E
+ end;
+ E -> E
+ catch
+ _:E -> E
+ end.
+
+ensure_channel_closed(Ch) -> catch amqp_channel:close(Ch).
+
+ensure_connection_closed(Conn) ->
+ catch amqp_connection:close(Conn, ?MAX_CONNECTION_CLOSE_TIMEOUT).
+
+connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Message}}, _} = E,
+ Upstream, UParams, XorQName, State) ->
+ rabbit_federation_status:report(
+ Upstream, UParams, XorQName, clean_reason(E)),
+ log_warning(XorQName,
+ "did not connect to ~s. Server has closed the connection due to an error, code: ~p, "
+ "message: ~s",
+ [rabbit_federation_upstream:params_to_string(UParams),
+ Code, Message]),
+ {stop, {shutdown, restart}, State};
+
+connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
+ rabbit_federation_status:report(
+ Upstream, UParams, XorQName, clean_reason(E)),
+ log_warning(XorQName, "did not connect to ~s. Reason: ~p",
+ [rabbit_federation_upstream:params_to_string(UParams),
+ E]),
+ {stop, {shutdown, restart}, State};
+
+connection_error(remote, E, Upstream, UParams, XorQName, State) ->
+ rabbit_federation_status:report(
+ Upstream, UParams, XorQName, clean_reason(E)),
+ log_info(XorQName, "disconnected from ~s~n~p",
+ [rabbit_federation_upstream:params_to_string(UParams), E]),
+ {stop, {shutdown, restart}, State};
+
+connection_error(command_channel, E, Upstream, UParams, XorQName, State) ->
+ rabbit_federation_status:report(
+ Upstream, UParams, XorQName, clean_reason(E)),
+ log_info(XorQName, "failed to open a command channel for upstream ~s~n~p",
+ [rabbit_federation_upstream:params_to_string(UParams), E]),
+ {stop, {shutdown, restart}, State};
+
+connection_error(local, basic_cancel, Upstream, UParams, XorQName, State) ->
+ rabbit_federation_status:report(
+ Upstream, UParams, XorQName, {error, basic_cancel}),
+ log_info(XorQName, "received a 'basic.cancel'", []),
+ {stop, {shutdown, restart}, State};
+
+connection_error(local_start, E, Upstream, UParams, XorQName, State) ->
+ rabbit_federation_status:report(
+ Upstream, UParams, XorQName, clean_reason(E)),
+ log_warning(XorQName, "did not connect locally~n~p", [E]),
+ {stop, {shutdown, restart}, State}.
+
+%% If we terminate due to a gen_server call exploding (almost
+%% certainly due to an amqp_channel:call() exploding) then we do not
+%% want to report the gen_server call in our status.
+clean_reason({E = {shutdown, _}, _}) -> E;
+clean_reason(E) -> E.
+
+%% local / disconnected never gets invoked, see handle_info({'DOWN', ...
+
+%%----------------------------------------------------------------------------
+
+unacked_new() -> gb_trees:empty().
+
+ack(#'basic.ack'{delivery_tag = Seq,
+ multiple = Multiple}, Ch, Unack) ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = gb_trees:get(Seq, Unack),
+ multiple = Multiple}),
+ remove_delivery_tags(Seq, Multiple, Unack).
+
+
+%% Note: at time of writing the broker will never send requeue=false. And it's
+%% hard to imagine why it would. But we may as well handle it.
+nack(#'basic.nack'{delivery_tag = Seq,
+ multiple = Multiple,
+ requeue = Requeue}, Ch, Unack) ->
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = gb_trees:get(Seq, Unack),
+ multiple = Multiple,
+ requeue = Requeue}),
+ remove_delivery_tags(Seq, Multiple, Unack).
+
+remove_delivery_tags(Seq, false, Unacked) ->
+ gb_trees:delete(Seq, Unacked);
+remove_delivery_tags(Seq, true, Unacked) ->
+ case gb_trees:is_empty(Unacked) of
+ true -> Unacked;
+ false -> {Smallest, _Val, Unacked1} = gb_trees:take_smallest(Unacked),
+ case Smallest > Seq of
+ true -> Unacked;
+ false -> remove_delivery_tags(Seq, true, Unacked1)
+ end
+ end.
+
+forward(#upstream{ack_mode = AckMode,
+ trust_user_id = Trust},
+ #'basic.deliver'{delivery_tag = DT},
+ Ch, DCh, PublishMethod, HeadersFun, ForwardFun, Msg, Unacked) ->
+ Headers = extract_headers(Msg),
+ case ForwardFun(Headers) of
+ true -> Msg1 = maybe_clear_user_id(
+ Trust, update_headers(HeadersFun(Headers), Msg)),
+ Seq = case AckMode of
+ 'on-confirm' -> amqp_channel:next_publish_seqno(DCh);
+ _ -> ignore
+ end,
+ amqp_channel:cast(DCh, PublishMethod, Msg1),
+ case AckMode of
+ 'on-confirm' ->
+ gb_trees:insert(Seq, DT, Unacked);
+ 'on-publish' ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT}),
+ Unacked;
+ 'no-ack' ->
+ Unacked
+ end;
+ false -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT}),
+ %% Drop it, but acknowledge it!
+ Unacked
+ end.
+
+maybe_clear_user_id(false, Msg = #amqp_msg{props = Props}) ->
+ Msg#amqp_msg{props = Props#'P_basic'{user_id = undefined}};
+maybe_clear_user_id(true, Msg) ->
+ Msg.
+
+extract_headers(#amqp_msg{props = #'P_basic'{headers = Headers}}) ->
+ Headers.
+
+update_headers(Headers, Msg = #amqp_msg{props = Props}) ->
+ Msg#amqp_msg{props = Props#'P_basic'{headers = Headers}}.
+
+%%----------------------------------------------------------------------------
+
+%% If the downstream channel shuts down cleanly, we can just ignore it
+%% - we're the same node, we're presumably about to go down too.
+handle_downstream_down(shutdown, _Args, State) ->
+ {noreply, State};
+
+handle_downstream_down(Reason, _Args, State) ->
+ {stop, {downstream_channel_down, Reason}, State}.
+
+%% If the upstream channel goes down for an intelligible reason, just
+%% log it and die quietly.
+handle_upstream_down({shutdown, Reason}, {Upstream, UParams, XName}, State) ->
+ rabbit_federation_link_util:connection_error(
+ remote, {upstream_channel_down, Reason}, Upstream, UParams, XName, State);
+
+handle_upstream_down(Reason, _Args, State) ->
+ {stop, {upstream_channel_down, Reason}, State}.
+
+%%----------------------------------------------------------------------------
+
+log_terminate(gone, _Upstream, _UParams, _XorQName) ->
+ %% the link cannot start, this has been logged already
+ ok;
+log_terminate({shutdown, restart}, _Upstream, _UParams, _XorQName) ->
+ %% We've already logged this before munging the reason
+ ok;
+log_terminate(shutdown, Upstream, UParams, XorQName) ->
+ %% The supervisor is shutting us down; we are probably restarting
+ %% the link because configuration has changed. So try to shut down
+ %% nicely so that we do not cause unacked messages to be
+ %% redelivered.
+ log_info(XorQName, "disconnecting from ~s~n",
+ [rabbit_federation_upstream:params_to_string(UParams)]),
+ rabbit_federation_status:remove(Upstream, XorQName);
+
+log_terminate(Reason, Upstream, UParams, XorQName) ->
+ %% Unexpected death. sasl will log it, but we should update
+ %% rabbit_federation_status.
+ rabbit_federation_status:report(
+ Upstream, UParams, XorQName, clean_reason(Reason)).
+
+log_debug(XorQName, Fmt, Args) -> log(debug, XorQName, Fmt, Args).
+log_info(XorQName, Fmt, Args) -> log(info, XorQName, Fmt, Args).
+log_warning(XorQName, Fmt, Args) -> log(warning, XorQName, Fmt, Args).
+log_error(XorQName, Fmt, Args) -> log(error, XorQName, Fmt, Args).
+
+log(Level, XorQName, Fmt0, Args0) ->
+ Fmt = "Federation ~s " ++ Fmt0,
+ Args = [rabbit_misc:rs(XorQName) | Args0],
+ case Level of
+ debug -> rabbit_log_federation:debug(Fmt, Args);
+ info -> rabbit_log_federation:info(Fmt, Args);
+ warning -> rabbit_log_federation:warning(Fmt, Args);
+ error -> rabbit_log_federation:error(Fmt, Args)
+ end.
+
+%%----------------------------------------------------------------------------
+
+disposable_channel_call(Conn, Method) ->
+ disposable_channel_call(Conn, Method, fun(_, _) -> ok end).
+
+disposable_channel_call(Conn, Method, ErrFun) ->
+ try
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ try
+ amqp_channel:call(Ch, Method)
+ catch exit:{{shutdown, {server_initiated_close, Code, Message}}, _} ->
+ ErrFun(Code, Message)
+ after
+ ensure_channel_closed(Ch)
+ end
+ catch
+ Exception:Reason ->
+ rabbit_log_federation:error("Federation link could not create a disposable (one-off) channel due to an error ~p: ~p~n", [Exception, Reason])
+ end.
+
+disposable_connection_call(Params, Method, ErrFun) ->
+ try
+ rabbit_log_federation:debug("Disposable connection parameters: ~p", [Params]),
+ case open(Params, <<"Disposable exchange federation link connection">>) of
+ {ok, Conn, Ch} ->
+ try
+ amqp_channel:call(Ch, Method)
+ catch exit:{{shutdown, {connection_closing, {server_initiated_close, Code, Message}}}, _} ->
+ ErrFun(Code, Message);
+ exit:{{shutdown, {server_initiated_close, Code, Message}}, _} ->
+ ErrFun(Code, Message)
+ after
+ ensure_connection_closed(Conn)
+ end;
+ {error, {auth_failure, Message}} ->
+ rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection "
+ "due to an authentication failure: ~s~n", [Message]);
+ Error ->
+ rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection, "
+ "reason: ~p~n", [Error]),
+ Error
+ end
+ catch
+ Exception:Reason ->
+ rabbit_log_federation:error("Federation link could not create a disposable (one-off) connection "
+ "due to an error ~p: ~p~n", [Exception, Reason])
+ end.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl
new file mode 100644
index 0000000000..928e41dc0f
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl
@@ -0,0 +1,139 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_parameters).
+-behaviour(rabbit_runtime_parameter).
+-behaviour(rabbit_policy_validator).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([validate/5, notify/5, notify_clear/4]).
+-export([register/0, unregister/0, validate_policy/1, adjust/1]).
+
+-define(RUNTIME_PARAMETERS,
+ [{runtime_parameter, <<"federation">>},
+ {runtime_parameter, <<"federation-upstream">>},
+ {runtime_parameter, <<"federation-upstream-set">>},
+ {policy_validator, <<"federation-upstream">>},
+ {policy_validator, <<"federation-upstream-pattern">>},
+ {policy_validator, <<"federation-upstream-set">>}]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "federation parameters"},
+ {mfa, {rabbit_federation_parameters, register, []}},
+ {requires, rabbit_registry},
+ {cleanup, {rabbit_federation_parameters, unregister, []}},
+ {enables, recovery}]}).
+
+register() ->
+ [rabbit_registry:register(Class, Name, ?MODULE) ||
+ {Class, Name} <- ?RUNTIME_PARAMETERS],
+ ok.
+
+unregister() ->
+ [rabbit_registry:unregister(Class, Name) ||
+ {Class, Name} <- ?RUNTIME_PARAMETERS],
+ ok.
+
+validate(_VHost, <<"federation-upstream-set">>, Name, Term0, _User) ->
+ Term = [rabbit_data_coercion:to_proplist(Upstream) || Upstream <- Term0],
+ [rabbit_parameter_validation:proplist(
+ Name,
+ [{<<"upstream">>, fun rabbit_parameter_validation:binary/2, mandatory} |
+ shared_validation()], Upstream)
+ || Upstream <- Term];
+
+validate(_VHost, <<"federation-upstream">>, Name, Term0, _User) ->
+ Term = rabbit_data_coercion:to_proplist(Term0),
+ rabbit_parameter_validation:proplist(
+ Name, [{<<"uri">>, fun validate_uri/2, mandatory} |
+ shared_validation()], Term);
+
+validate(_VHost, _Component, Name, _Term, _User) ->
+ {error, "name not recognised: ~p", [Name]}.
+
+notify(_VHost, <<"federation-upstream-set">>, Name, _Term, _Username) ->
+ adjust({upstream_set, Name});
+
+notify(_VHost, <<"federation-upstream">>, Name, _Term, _Username) ->
+ adjust({upstream, Name}).
+
+notify_clear(_VHost, <<"federation-upstream-set">>, Name, _Username) ->
+ adjust({clear_upstream_set, Name});
+
+notify_clear(VHost, <<"federation-upstream">>, Name, _Username) ->
+ rabbit_federation_exchange_link_sup_sup:adjust({clear_upstream, VHost, Name}),
+ rabbit_federation_queue_link_sup_sup:adjust({clear_upstream, VHost, Name}).
+
+adjust(Thing) ->
+ rabbit_federation_exchange_link_sup_sup:adjust(Thing),
+ rabbit_federation_queue_link_sup_sup:adjust(Thing).
+
+%%----------------------------------------------------------------------------
+
+shared_validation() ->
+ [{<<"exchange">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"queue">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"consumer-tag">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
+ {<<"reconnect-delay">>,fun rabbit_parameter_validation:number/2, optional},
+ {<<"max-hops">>, fun rabbit_parameter_validation:number/2, optional},
+ {<<"expires">>, fun rabbit_parameter_validation:number/2, optional},
+ {<<"message-ttl">>, fun rabbit_parameter_validation:number/2, optional},
+ {<<"trust-user-id">>, fun rabbit_parameter_validation:boolean/2, optional},
+ {<<"ack-mode">>, rabbit_parameter_validation:enum(
+ ['no-ack', 'on-publish', 'on-confirm']), optional},
+ {<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum(['default', 'never']), optional},
+ {<<"ha-policy">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional}].
+
+validate_uri(Name, Term) when is_binary(Term) ->
+ case rabbit_parameter_validation:binary(Name, Term) of
+ ok -> case amqp_uri:parse(binary_to_list(Term)) of
+ {ok, _} -> ok;
+ {error, E} -> {error, "\"~s\" not a valid URI: ~p", [Term, E]}
+ end;
+ E -> E
+ end;
+validate_uri(Name, Term) ->
+ case rabbit_parameter_validation:list(Name, Term) of
+ ok -> case [V || U <- Term,
+ V <- [validate_uri(Name, U)],
+ element(1, V) =:= error] of
+ [] -> ok;
+ [E | _] -> E
+ end;
+ E -> E
+ end.
+
+%%----------------------------------------------------------------------------
+
+validate_policy([{<<"federation-upstream-set">>, Value}])
+ when is_binary(Value) ->
+ ok;
+validate_policy([{<<"federation-upstream-set">>, Value}]) ->
+ {error, "~p is not a valid federation upstream set name", [Value]};
+
+validate_policy([{<<"federation-upstream-pattern">>, Value}])
+ when is_binary(Value) ->
+ case re:compile(Value) of
+ {ok, _} -> ok;
+ {error, Reason} -> {error, "could not compile pattern ~s to a regular expression. "
+ "Error: ~p", [Value, Reason]}
+ end;
+validate_policy([{<<"federation-upstream-pattern">>, Value}]) ->
+ {error, "~p is not a valid federation upstream pattern name", [Value]};
+
+validate_policy([{<<"federation-upstream">>, Value}])
+ when is_binary(Value) ->
+ ok;
+validate_policy([{<<"federation-upstream">>, Value}]) ->
+ {error, "~p is not a valid federation upstream name", [Value]};
+
+validate_policy(L) when length(L) >= 2 ->
+ {error, "cannot specify federation-upstream, federation-upstream-set "
+ "or federation-upstream-pattern together", []}.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue.erl
new file mode 100644
index 0000000000..3117792589
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_queue.erl
@@ -0,0 +1,111 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_queue).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "federation queue decorator"},
+ {mfa, {rabbit_queue_decorator, register,
+ [<<"federation">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {cleanup, {rabbit_queue_decorator, unregister,
+ [<<"federation">>]}},
+ {enables, recovery}]}).
+
+-include_lib("rabbit/include/amqqueue.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_federation.hrl").
+
+-behaviour(rabbit_queue_decorator).
+
+-export([startup/1, shutdown/1, policy_changed/2, active_for/1,
+ consumer_state_changed/3]).
+-export([policy_changed_local/2]).
+
+%%----------------------------------------------------------------------------
+
+startup(Q) ->
+ case active_for(Q) of
+ true -> rabbit_federation_queue_link_sup_sup:start_child(Q);
+ false -> ok
+ end,
+ ok.
+
+shutdown(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ case active_for(Q) of
+ true -> rabbit_federation_queue_link_sup_sup:stop_child(Q),
+ rabbit_federation_status:remove_exchange_or_queue(QName);
+ false -> ok
+ end,
+ ok.
+
+policy_changed(Q1, Q2) when ?is_amqqueue(Q1) ->
+ QName = amqqueue:get_name(Q1),
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q0} when ?is_amqqueue(Q0) ->
+ QPid = amqqueue:get_pid(Q0),
+ rpc:call(node(QPid), rabbit_federation_queue,
+ policy_changed_local, [Q1, Q2]);
+ {error, not_found} ->
+ ok
+ end.
+
+policy_changed_local(Q1, Q2) ->
+ shutdown(Q1),
+ startup(Q2).
+
+active_for(Q) ->
+ Args = amqqueue:get_arguments(Q),
+ case rabbit_misc:table_lookup(Args, <<"x-internal-purpose">>) of
+ {longstr, _} -> false; %% [0]
+ _ -> rabbit_federation_upstream:federate(Q)
+ end.
+%% [0] Currently the only "internal purpose" is federation, but I
+%% suspect if we introduce another one it will also be for something
+%% that doesn't want to be federated.
+
+%% We need to reconsider whether we need to run or pause every time
+%% the consumer state changes in the queue. But why can the state
+%% change?
+%%
+%% consumer blocked | We may have no more active consumers, and thus need to
+%% | pause
+%% |
+%% consumer unblocked | We don't care
+%% |
+%% queue empty | The queue has become empty therefore we need to run to
+%% | get more messages
+%% |
+%% basic consume | We don't care
+%% |
+%% basic cancel | We may have no more active consumers, and thus need to
+%% | pause
+%% |
+%% refresh | We asked for it (we have started a new link after
+%% | failover and need something to prod us into action
+%% | (or not)).
+%%
+%% In the cases where we don't care it's not prohibitively expensive
+%% for us to be here anyway, so never mind.
+%%
+%% Note that there is no "queue became non-empty" state change - that's
+%% because of the queue invariant. If the queue transitions from empty to
+%% non-empty then it must have no active consumers - in which case it stays
+%% the same from our POV.
+
+consumer_state_changed(Q, MaxActivePriority, IsEmpty) ->
+ QName = amqqueue:get_name(Q),
+ case IsEmpty andalso active_unfederated(MaxActivePriority) of
+ true -> rabbit_federation_queue_link:run(QName);
+ false -> rabbit_federation_queue_link:pause(QName)
+ end,
+ ok.
+
+active_unfederated(empty) -> false;
+active_unfederated(P) when P >= 0 -> true;
+active_unfederated(_P) -> false.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl
new file mode 100644
index 0000000000..97389cb8f6
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl
@@ -0,0 +1,330 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_queue_link).
+
+%% pg2 is deprecated in OTP 23.
+-compile(nowarn_deprecated_function).
+
+-include_lib("rabbit/include/amqqueue.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_federation.hrl").
+
+-behaviour(gen_server2).
+
+-export([start_link/1, go/0, run/1, pause/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-import(rabbit_misc, [pget/2]).
+-import(rabbit_federation_util, [name/1, pgname/1]).
+
+-record(not_started, {queue, run, upstream, upstream_params}).
+-record(state, {queue, run, conn, ch, dconn, dch, upstream, upstream_params,
+ unacked}).
+
+start_link(Args) ->
+ gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]).
+
+run(QName) -> cast(QName, run).
+pause(QName) -> cast(QName, pause).
+go() -> cast(go).
+
+%%----------------------------------------------------------------------------
+%%call(QName, Msg) -> [gen_server2:call(Pid, Msg, infinity) || Pid <- q(QName)].
+cast(Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- all()].
+cast(QName, Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- q(QName)].
+
+join(Name) ->
+ pg2:create(pgname(Name)),
+ ok = pg2:join(pgname(Name), self()).
+
+all() ->
+ pg2:create(pgname(rabbit_federation_queues)),
+ pg2:get_members(pgname(rabbit_federation_queues)).
+
+q(QName) ->
+ pg2:create(pgname({rabbit_federation_queue, QName})),
+ pg2:get_members(pgname({rabbit_federation_queue, QName})).
+
+federation_up() ->
+ proplists:is_defined(rabbitmq_federation,
+ application:which_applications(infinity)).
+
+%%----------------------------------------------------------------------------
+
+init({Upstream, Queue}) when ?is_amqqueue(Queue) ->
+ QName = amqqueue:get_name(Queue),
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q} ->
+ DeobfuscatedUpstream = rabbit_federation_util:deobfuscate_upstream(Upstream),
+ DeobfuscatedUParams = rabbit_federation_upstream:to_params(DeobfuscatedUpstream, Queue),
+ UParams = rabbit_federation_util:obfuscate_upstream_params(DeobfuscatedUParams),
+ rabbit_federation_status:report(Upstream, UParams, QName, starting),
+ join(rabbit_federation_queues),
+ join({rabbit_federation_queue, QName}),
+ gen_server2:cast(self(), maybe_go),
+ rabbit_amqqueue:notify_decorators(Q),
+ {ok, #not_started{queue = Queue,
+ run = false,
+ upstream = Upstream,
+ upstream_params = UParams}};
+ {error, not_found} ->
+ rabbit_federation_link_util:log_warning(QName, "not found, stopping link~n", []),
+ {stop, gone}
+ end.
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast(maybe_go, State) ->
+ case federation_up() of
+ true -> go(State);
+ false -> {noreply, State}
+ end;
+
+handle_cast(go, State = #not_started{}) ->
+ go(State);
+
+handle_cast(go, State) ->
+ {noreply, State};
+
+handle_cast(run, State = #state{upstream = Upstream,
+ upstream_params = UParams,
+ ch = Ch,
+ run = false}) ->
+ consume(Ch, Upstream, UParams#upstream_params.x_or_q),
+ {noreply, State#state{run = true}};
+
+handle_cast(run, State = #not_started{}) ->
+ {noreply, State#not_started{run = true}};
+
+handle_cast(run, State) ->
+ %% Already started
+ {noreply, State};
+
+handle_cast(pause, State = #state{run = false}) ->
+ %% Already paused
+ {noreply, State};
+
+handle_cast(pause, State = #not_started{}) ->
+ {noreply, State#not_started{run = false}};
+
+handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) ->
+ cancel(Ch, Upstream),
+ {noreply, State#state{run = false}};
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(#'basic.consume_ok'{}, State) ->
+ {noreply, State};
+
+handle_info(#'basic.ack'{} = Ack, State = #state{ch = Ch,
+ unacked = Unacked}) ->
+ Unacked1 = rabbit_federation_link_util:ack(Ack, Ch, Unacked),
+ {noreply, State#state{unacked = Unacked1}};
+
+handle_info(#'basic.nack'{} = Nack, State = #state{ch = Ch,
+ unacked = Unacked}) ->
+ Unacked1 = rabbit_federation_link_util:nack(Nack, Ch, Unacked),
+ {noreply, State#state{unacked = Unacked1}};
+
+handle_info({#'basic.deliver'{redelivered = Redelivered,
+ exchange = X,
+ routing_key = K} = DeliverMethod, Msg},
+ State = #state{queue = Q,
+ upstream = Upstream,
+ upstream_params = UParams,
+ ch = Ch,
+ dch = DCh,
+ unacked = Unacked}) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ PublishMethod = #'basic.publish'{exchange = <<"">>,
+ routing_key = QName#resource.name},
+ HeadersFun = fun (H) -> update_headers(UParams, Redelivered, X, K, H) end,
+ ForwardFun = fun (_H) -> true end,
+ Unacked1 = rabbit_federation_link_util:forward(
+ Upstream, DeliverMethod, Ch, DCh, PublishMethod,
+ HeadersFun, ForwardFun, Msg, Unacked),
+ %% TODO actually we could reject when 'stopped'
+ {noreply, State#state{unacked = Unacked1}};
+
+handle_info(#'basic.cancel'{},
+ State = #state{queue = Q,
+ upstream = Upstream,
+ upstream_params = UParams}) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ rabbit_federation_link_util:connection_error(
+ local, basic_cancel, Upstream, UParams, QName, State);
+
+handle_info({'DOWN', _Ref, process, Pid, Reason},
+ State = #state{dch = DCh,
+ ch = Ch,
+ upstream = Upstream,
+ upstream_params = UParams,
+ queue = Q}) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ handle_down(Pid, Reason, Ch, DCh, {Upstream, UParams, QName}, State);
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+terminate(Reason, #not_started{upstream = Upstream,
+ upstream_params = UParams,
+ queue = Q}) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, QName),
+ ok;
+
+terminate(Reason, #state{dconn = DConn,
+ conn = Conn,
+ upstream = Upstream,
+ upstream_params = UParams,
+ queue = Q}) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ rabbit_federation_link_util:ensure_connection_closed(DConn),
+ rabbit_federation_link_util:ensure_connection_closed(Conn),
+ rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, QName),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+go(S0 = #not_started{run = Run,
+ upstream = Upstream = #upstream{
+ prefetch_count = Prefetch},
+ upstream_params = UParams,
+ queue = Queue}) when ?is_amqqueue(Queue) ->
+ QName = amqqueue:get_name(Queue),
+ #upstream_params{x_or_q = UQueue} = UParams,
+ Durable = amqqueue:is_durable(UQueue),
+ AutoDelete = amqqueue:is_auto_delete(UQueue),
+ Args = amqqueue:get_arguments(UQueue),
+ Unacked = rabbit_federation_link_util:unacked_new(),
+ rabbit_federation_link_util:start_conn_ch(
+ fun (Conn, Ch, DConn, DCh) ->
+ check_upstream_suitable(Conn),
+ amqp_channel:call(Ch, #'queue.declare'{queue = name(UQueue),
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args}),
+ case Upstream#upstream.ack_mode of
+ 'no-ack' -> ok;
+ _ -> amqp_channel:call(
+ Ch, #'basic.qos'{prefetch_count = Prefetch})
+ end,
+ amqp_selective_consumer:register_default_consumer(Ch, self()),
+ case Run of
+ true -> consume(Ch, Upstream, UQueue);
+ false -> ok
+ end,
+ {noreply, #state{queue = Queue,
+ run = Run,
+ conn = Conn,
+ ch = Ch,
+ dconn = DConn,
+ dch = DCh,
+ upstream = Upstream,
+ upstream_params = UParams,
+ unacked = Unacked}}
+ end, Upstream, UParams, QName, S0).
+
+check_upstream_suitable(Conn) ->
+ Props = pget(server_properties,
+ amqp_connection:info(Conn, [server_properties])),
+ {table, Caps} = rabbit_misc:table_lookup(Props, <<"capabilities">>),
+ case rabbit_misc:table_lookup(Caps, <<"consumer_priorities">>) of
+ {bool, true} -> ok;
+ _ -> exit({error, upstream_lacks_consumer_priorities})
+ end.
+
+update_headers(UParams, Redelivered, X, K, undefined) ->
+ update_headers(UParams, Redelivered, X, K, []);
+
+update_headers(#upstream_params{table = Table}, Redelivered, X, K, Headers) ->
+ {Headers1, Count} =
+ case rabbit_misc:table_lookup(Headers, ?ROUTING_HEADER) of
+ undefined ->
+ %% We only want to record the original exchange and
+ %% routing key the first time a message gets
+ %% forwarded; after that it's known that they were
+ %% <<>> and QueueName respectively.
+ {init_x_original_source_headers(Headers, X, K), 0};
+ {array, Been} ->
+ update_visit_count(Table, Been, Headers);
+ %% this means the header comes from the client
+ %% which re-published the message, most likely unintentionally.
+ %% We can't assume much about the value, so we simply ignore it.
+ _Other ->
+ {init_x_original_source_headers(Headers, X, K), 0}
+ end,
+ rabbit_basic:prepend_table_header(
+ ?ROUTING_HEADER, Table ++ [{<<"redelivered">>, bool, Redelivered},
+ {<<"visit-count">>, long, Count + 1}],
+ swap_cc_header(Headers1)).
+
+init_x_original_source_headers(Headers, X, K) ->
+ rabbit_misc:set_table_value(
+ rabbit_misc:set_table_value(
+ Headers, <<"x-original-exchange">>, longstr, X),
+ <<"x-original-routing-key">>, longstr, K).
+
+update_visit_count(Table, Been, Headers) ->
+ {Found, Been1} = lists:partition(
+ fun(I) -> visit_match(I, Table) end,
+ Been),
+ C = case Found of
+ [] -> 0;
+ [{table, T}] -> case rabbit_misc:table_lookup(
+ T, <<"visit-count">>) of
+ {_, I} when is_number(I) -> I;
+ _ -> 0
+ end
+ end,
+ {rabbit_misc:set_table_value(
+ Headers, ?ROUTING_HEADER, array, Been1), C}.
+
+swap_cc_header(Table) ->
+ [{case K of
+ <<"CC">> -> <<"x-original-cc">>;
+ _ -> K
+ end, T, V} || {K, T, V} <- Table].
+
+visit_match({table, T}, Info) ->
+ lists:all(fun (K) ->
+ rabbit_misc:table_lookup(T, K) =:=
+ rabbit_misc:table_lookup(Info, K)
+ end, [<<"uri">>, <<"virtual_host">>, <<"queue">>]);
+visit_match(_ ,_) ->
+ false.
+
+consumer_tag(#upstream{consumer_tag = ConsumerTag}) ->
+ ConsumerTag.
+
+consume(Ch, Upstream, UQueue) ->
+ ConsumerTag = consumer_tag(Upstream),
+ NoAck = Upstream#upstream.ack_mode =:= 'no-ack',
+ amqp_channel:cast(
+ Ch, #'basic.consume'{queue = name(UQueue),
+ no_ack = NoAck,
+ nowait = true,
+ consumer_tag = ConsumerTag,
+ arguments = [{<<"x-priority">>, long, -1}]}).
+
+cancel(Ch, Upstream) ->
+ ConsumerTag = consumer_tag(Upstream),
+ amqp_channel:cast(Ch, #'basic.cancel'{nowait = true,
+ consumer_tag = ConsumerTag}).
+
+handle_down(DCh, Reason, _Ch, DCh, Args, State) ->
+ rabbit_federation_link_util:handle_downstream_down(Reason, Args, State);
+handle_down(Ch, Reason, Ch, _DCh, Args, State) ->
+ rabbit_federation_link_util:handle_upstream_down(Reason, Args, State).
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl
new file mode 100644
index 0000000000..1f6ec2b88f
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl
@@ -0,0 +1,87 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_queue_link_sup_sup).
+
+-behaviour(mirrored_supervisor).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit/include/amqqueue.hrl").
+-define(SUPERVISOR, ?MODULE).
+
+%% Supervises the upstream links for all queues (but not exchanges). We need
+%% different handling here since queues do not want a mirrored sup.
+
+-export([start_link/0, start_child/1, adjust/1, stop_child/1]).
+-export([init/1]).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR,
+ fun rabbit_misc:execute_mnesia_transaction/1,
+ ?MODULE, []).
+
+%% Note that the next supervisor down, rabbit_federation_link_sup, is common
+%% between exchanges and queues.
+start_child(Q) ->
+ case mirrored_supervisor:start_child(
+ ?SUPERVISOR,
+ {id(Q), {rabbit_federation_link_sup, start_link, [Q]},
+ transient, ?SUPERVISOR_WAIT, supervisor,
+ [rabbit_federation_link_sup]}) of
+ {ok, _Pid} -> ok;
+ {error, {already_started, _Pid}} ->
+ QueueName = amqqueue:get_name(Q),
+ rabbit_log_federation:warning("Federation link for queue ~p was already started",
+ [rabbit_misc:rs(QueueName)]),
+ ok;
+ %% A link returned {stop, gone}, the link_sup shut down, that's OK.
+ {error, {shutdown, _}} -> ok
+ end.
+
+
+adjust({clear_upstream, VHost, UpstreamName}) ->
+ [rabbit_federation_link_sup:adjust(Pid, Q, {clear_upstream, UpstreamName}) ||
+ {Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR),
+ ?amqqueue_vhost_equals(Q, VHost)],
+ ok;
+adjust(Reason) ->
+ [rabbit_federation_link_sup:adjust(Pid, Q, Reason) ||
+ {Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)],
+ ok.
+
+stop_child(Q) ->
+ case mirrored_supervisor:terminate_child(?SUPERVISOR, id(Q)) of
+ ok -> ok;
+ {error, Err} ->
+ QueueName = amqqueue:get_name(Q),
+ rabbit_log_federation:warning(
+ "Attempt to stop a federation link for queue ~p failed: ~p",
+ [rabbit_misc:rs(QueueName), Err]),
+ ok
+ end,
+ ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(Q)).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_one, 1200, 60}, []}}.
+
+%% Clean out all mutable aspects of the queue except policy. We need
+%% to keep the entire queue around rather than just take its name
+%% since we will want to know its policy to determine how to federate
+%% it, and its immutable properties in case we want to redeclare it
+%% upstream. We don't just take its name and look it up again since
+%% that would introduce race conditions when policies change
+%% frequently. Note that since we take down all the links and start
+%% again when policies change, the policy will always be correct, so
+%% we don't clear it out here and can trust it.
+id(Q) when ?is_amqqueue(Q) ->
+ Policy = amqqueue:get_policy(Q),
+ Q1 = rabbit_amqqueue:immutable(Q),
+ amqqueue:set_policy(Q1, Policy).
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_status.erl b/deps/rabbitmq_federation/src/rabbit_federation_status.erl
new file mode 100644
index 0000000000..04afec990d
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_status.erl
@@ -0,0 +1,175 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_status).
+-behaviour(gen_server).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_federation.hrl").
+
+-export([start_link/0]).
+
+-export([report/4, remove_exchange_or_queue/1, remove/2, status/0, lookup/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-import(rabbit_federation_util, [name/1]).
+
+-define(SERVER, ?MODULE).
+-define(ETS_NAME, ?MODULE).
+
+-record(state, {}).
+-record(entry, {key, uri, status, timestamp, id, supervisor, upstream}).
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+report(Upstream, UParams, XorQName, Status) ->
+ [Supervisor | _] = get('$ancestors'),
+ gen_server:cast(?SERVER, {report, Supervisor, Upstream, UParams, XorQName,
+ Status, calendar:local_time()}).
+
+remove_exchange_or_queue(XorQName) ->
+ gen_server:call(?SERVER, {remove_exchange_or_queue, XorQName}, infinity).
+
+remove(Upstream, XorQName) ->
+ gen_server:call(?SERVER, {remove, Upstream, XorQName}, infinity).
+
+status() ->
+ gen_server:call(?SERVER, status, infinity).
+
+lookup(Id) ->
+ gen_server:call(?SERVER, {lookup, Id}, infinity).
+
+init([]) ->
+ ?ETS_NAME = ets:new(?ETS_NAME,
+ [named_table, {keypos, #entry.key}, private]),
+ {ok, #state{}}.
+
+handle_call({remove_exchange_or_queue, XorQName}, _From, State) ->
+ [link_gone(Entry)
+ || Entry <- ets:match_object(?ETS_NAME, match_entry(xorqkey(XorQName)))],
+ {reply, ok, State};
+
+handle_call({remove, Upstream, XorQName}, _From, State) ->
+ case ets:match_object(?ETS_NAME, match_entry(key(XorQName, Upstream))) of
+ [Entry] -> link_gone(Entry);
+ [] -> ok
+ end,
+ {reply, ok, State};
+
+handle_call({lookup, Id}, _From, State) ->
+ Link = case ets:match_object(?ETS_NAME, match_id(Id)) of
+ [Entry] ->
+ [{key, Entry#entry.key},
+ {uri, Entry#entry.uri},
+ {status, Entry#entry.status},
+ {timestamp, Entry#entry.timestamp},
+ {id, Entry#entry.id},
+ {supervisor, Entry#entry.supervisor},
+ {upstream, Entry#entry.upstream}];
+ [] -> not_found
+ end,
+ {reply, Link, State};
+
+handle_call(status, _From, State) ->
+ Entries = ets:tab2list(?ETS_NAME),
+ {reply, [format(Entry) || Entry <- Entries], State}.
+
+handle_cast({report, Supervisor, Upstream, #upstream_params{safe_uri = URI},
+ XorQName, Status, Timestamp}, State) ->
+ Key = key(XorQName, Upstream),
+ Entry = #entry{key = Key,
+ status = Status,
+ uri = URI,
+ timestamp = Timestamp,
+ supervisor = Supervisor,
+ upstream = Upstream,
+ id = unique_id(Key)},
+ true = ets:insert(?ETS_NAME, Entry),
+ rabbit_event:notify(federation_link_status, format(Entry)),
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+format(#entry{status = Status,
+ uri = URI,
+ timestamp = Timestamp} = Entry) ->
+ identity(Entry) ++ split_status(Status) ++ [{uri, URI},
+ {timestamp, Timestamp}].
+
+identity(#entry{key = {#resource{virtual_host = VHost,
+ kind = Type,
+ name = XorQNameBin},
+ UpstreamName, UXorQNameBin},
+ id = Id,
+ upstream = #upstream{consumer_tag = ConsumerTag}}) ->
+ case Type of
+ exchange -> [{exchange, XorQNameBin},
+ {upstream_exchange, UXorQNameBin}];
+ queue -> [{queue, XorQNameBin},
+ {upstream_queue, UXorQNameBin},
+ {consumer_tag, ConsumerTag}]
+ end ++ [{type, Type},
+ {vhost, VHost},
+ {upstream, UpstreamName},
+ {id, Id}].
+
+unique_id(Key = {#resource{}, UpName, ResName}) when is_binary(UpName), is_binary(ResName) ->
+ PHash = erlang:phash2(Key, 1 bsl 32),
+ << << case N >= 10 of
+ true -> N - 10 + $a;
+ false -> N + $0 end >>
+ || <<N:4>> <= <<PHash:32>> >>.
+
+split_status({running, ConnName}) -> [{status, running},
+ {local_connection, ConnName}];
+split_status({Status, Error}) -> [{status, Status},
+ {error, Error}];
+split_status(Status) when is_atom(Status) -> [{status, Status}].
+
+link_gone(Entry) ->
+ rabbit_event:notify(federation_link_removed, identity(Entry)),
+ true = ets:delete_object(?ETS_NAME, Entry).
+
+%% We don't want to key off the entire upstream, bits of it may change
+key(XName = #resource{kind = exchange}, #upstream{name = UpstreamName,
+ exchange_name = UXNameBin}) ->
+ {XName, UpstreamName, UXNameBin};
+
+key(QName = #resource{kind = queue}, #upstream{name = UpstreamName,
+ queue_name = UQNameBin}) ->
+ {QName, UpstreamName, UQNameBin}.
+
+xorqkey(XorQName) ->
+ {XorQName, '_', '_'}.
+
+match_entry(Key) ->
+ #entry{key = Key,
+ uri = '_',
+ status = '_',
+ timestamp = '_',
+ id = '_',
+ supervisor = '_',
+ upstream = '_'}.
+
+match_id(Id) ->
+ #entry{key = '_',
+ uri = '_',
+ status = '_',
+ timestamp = '_',
+ id = Id,
+ supervisor = '_',
+ upstream = '_'}.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_sup.erl
new file mode 100644
index 0000000000..d3642b52c2
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_sup.erl
@@ -0,0 +1,63 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_sup).
+
+-behaviour(supervisor).
+
+%% Supervises everything. There is just one of these.
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-define(SUPERVISOR, rabbit_federation_sup).
+
+-export([start_link/0, stop/0]).
+
+-export([init/1]).
+
+%% This supervisor needs to be part of the rabbit application since
+%% a) it needs to be in place when exchange recovery takes place
+%% b) it needs to go up and down with rabbit
+
+-rabbit_boot_step({rabbit_federation_supervisor,
+ [{description, "federation"},
+ {mfa, {rabbit_sup, start_child, [?MODULE]}},
+ {requires, kernel_ready},
+ {cleanup, {?MODULE, stop, []}},
+ {enables, rabbit_federation_exchange},
+ {enables, rabbit_federation_queue}]}).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ R = supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []),
+ rabbit_federation_event:add_handler(),
+ R.
+
+stop() ->
+ rabbit_federation_event:remove_handler(),
+ ok = supervisor:terminate_child(rabbit_sup, ?MODULE),
+ ok = supervisor:delete_child(rabbit_sup, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ Status = {status, {rabbit_federation_status, start_link, []},
+ transient, ?WORKER_WAIT, worker,
+ [rabbit_federation_status]},
+ XLinkSupSup = {x_links,
+ {rabbit_federation_exchange_link_sup_sup, start_link, []},
+ transient, ?SUPERVISOR_WAIT, supervisor,
+ [rabbit_federation_exchange_link_sup_sup]},
+ QLinkSupSup = {q_links,
+ {rabbit_federation_queue_link_sup_sup, start_link, []},
+ transient, ?SUPERVISOR_WAIT, supervisor,
+ [rabbit_federation_queue_link_sup_sup]},
+ %% with default reconnect-delay of 5 second, this supports up to
+ %% 100 links constantly failing and being restarted a minute
+ %% (or 200 links if reconnect-delay is 10 seconds, 600 with 30 seconds,
+ %% etc: N * (60/reconnect-delay) <= 1200)
+ {ok, {{one_for_one, 1200, 60}, [Status, XLinkSupSup, QLinkSupSup]}}.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
new file mode 100644
index 0000000000..e079b850b5
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
@@ -0,0 +1,164 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_upstream).
+
+-include("rabbit_federation.hrl").
+-include_lib("rabbit/include/amqqueue.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-export([federate/1, for/1, for/2, params_to_string/1, to_params/2]).
+%% For testing
+-export([from_set/2, from_pattern/2, remove_credentials/1]).
+
+-import(rabbit_misc, [pget/2, pget/3]).
+-import(rabbit_federation_util, [name/1, vhost/1, r/1]).
+-import(rabbit_data_coercion, [to_atom/1]).
+
+%%----------------------------------------------------------------------------
+
+federate(XorQ) ->
+ rabbit_policy:get(<<"federation-upstream">>, XorQ) =/= undefined orelse
+ rabbit_policy:get(<<"federation-upstream-set">>, XorQ) =/= undefined orelse
+ rabbit_policy:get(<<"federation-upstream-pattern">>, XorQ) =/= undefined.
+
+for(XorQ) ->
+ case federate(XorQ) of
+ false -> [];
+ true -> from_set_contents(upstreams(XorQ), XorQ)
+ end.
+
+for(XorQ, UpstreamName) ->
+ case federate(XorQ) of
+ false -> [];
+ true -> rabbit_federation_util:find_upstreams(
+ UpstreamName, from_set_contents(upstreams(XorQ), XorQ))
+ end.
+
+upstreams(XorQ) ->
+ UName = rabbit_policy:get(<<"federation-upstream">>, XorQ),
+ USetName = rabbit_policy:get(<<"federation-upstream-set">>, XorQ),
+ UPatternValue = rabbit_policy:get(<<"federation-upstream-pattern">>, XorQ),
+ %% Cannot define 2 at a time, see rabbit_federation_parameters:validate_policy/1
+ case {UName, USetName, UPatternValue} of
+ {undefined, undefined, undefined} -> [];
+ {undefined, undefined, _} -> find_contents(UPatternValue, vhost(XorQ));
+ {undefined, _, undefined} -> set_contents(USetName, vhost(XorQ));
+ {_, undefined, undefined} -> [[{<<"upstream">>, UName}]]
+ end.
+
+params_table(SafeURI, XorQ) ->
+ Key = case XorQ of
+ #exchange{} -> <<"exchange">>;
+ Q when ?is_amqqueue(Q) -> <<"queue">>
+ end,
+ [{<<"uri">>, longstr, SafeURI},
+ {Key, longstr, name(XorQ)}].
+
+params_to_string(#upstream_params{safe_uri = SafeURI,
+ x_or_q = XorQ}) ->
+ print("~s on ~s", [rabbit_misc:rs(r(XorQ)), SafeURI]).
+
+remove_credentials(URI) ->
+ list_to_binary(amqp_uri:remove_credentials(binary_to_list(URI))).
+
+to_params(Upstream = #upstream{uris = URIs}, XorQ) ->
+ URI = lists:nth(rand:uniform(length(URIs)), URIs),
+ {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(XorQ)),
+ XorQ1 = with_name(Upstream, vhost(Params), XorQ),
+ SafeURI = remove_credentials(URI),
+ #upstream_params{params = Params,
+ uri = URI,
+ x_or_q = XorQ1,
+ safe_uri = SafeURI,
+ table = params_table(SafeURI, XorQ)}.
+
+print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).
+
+from_set(SetName, XorQ) ->
+ from_set_contents(set_contents(SetName, vhost(XorQ)), XorQ).
+
+from_pattern(SetName, XorQ) ->
+ from_set_contents(find_contents(SetName, vhost(XorQ)), XorQ).
+
+set_contents(<<"all">>, VHost) ->
+ Upstreams0 = rabbit_runtime_parameters:list(
+ VHost, <<"federation-upstream">>),
+ Upstreams = [rabbit_data_coercion:to_list(U) || U <- Upstreams0],
+ [[{<<"upstream">>, pget(name, U)}] || U <- Upstreams];
+
+set_contents(SetName, VHost) ->
+ case rabbit_runtime_parameters:value(
+ VHost, <<"federation-upstream-set">>, SetName) of
+ not_found -> [];
+ Set -> Set
+ end.
+
+find_contents(RegExp, VHost) ->
+ Upstreams0 = rabbit_runtime_parameters:list(
+ VHost, <<"federation-upstream">>),
+ Upstreams = [rabbit_data_coercion:to_list(U) || U <- Upstreams0,
+ re:run(pget(name, U), RegExp) =/= nomatch],
+ [[{<<"upstream">>, pget(name, U)}] || U <- Upstreams].
+
+from_set_contents(Set, XorQ) ->
+ Results = [from_set_element(P, XorQ) || P <- Set],
+ [R || R <- Results, R =/= not_found].
+
+from_set_element(UpstreamSetElem0, XorQ) ->
+ UpstreamSetElem = rabbit_data_coercion:to_proplist(UpstreamSetElem0),
+ Name = bget(upstream, UpstreamSetElem, []),
+ case rabbit_runtime_parameters:value(
+ vhost(XorQ), <<"federation-upstream">>, Name) of
+ not_found -> not_found;
+ Upstream -> from_upstream_or_set(
+ UpstreamSetElem, Name, Upstream, XorQ)
+ end.
+
+from_upstream_or_set(US, Name, U, XorQ) ->
+ URIParam = bget(uri, US, U),
+ URIs = case URIParam of
+ B when is_binary(B) -> [B];
+ L when is_list(L) -> L
+ end,
+ #upstream{uris = URIs,
+ exchange_name = bget(exchange, US, U, name(XorQ)),
+ queue_name = bget(queue, US, U, name(XorQ)),
+ consumer_tag = bget('consumer-tag', US, U, <<"federation-link-", Name/binary>>),
+ prefetch_count = bget('prefetch-count', US, U, ?DEF_PREFETCH),
+ reconnect_delay = bget('reconnect-delay', US, U, 5),
+ max_hops = bget('max-hops', US, U, 1),
+ expires = bget(expires, US, U, none),
+ message_ttl = bget('message-ttl', US, U, none),
+ trust_user_id = bget('trust-user-id', US, U, false),
+ ack_mode = to_atom(bget('ack-mode', US, U, <<"on-confirm">>)),
+ ha_policy = bget('ha-policy', US, U, none),
+ name = Name,
+ bind_nowait = bget('bind-nowait', US, U, false),
+ resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>))}.
+
+%%----------------------------------------------------------------------------
+
+bget(K, L1, L2) -> bget(K, L1, L2, undefined).
+
+bget(K0, L1, L2, D) ->
+ K = a2b(K0),
+ %% coerce maps to proplists
+ PL1 = rabbit_data_coercion:to_list(L1),
+ PL2 = rabbit_data_coercion:to_list(L2),
+ case pget(K, PL1, undefined) of
+ undefined -> pget(K, PL2, D);
+ Result -> Result
+ end.
+
+a2b(A) -> list_to_binary(atom_to_list(A)).
+
+with_name(#upstream{exchange_name = XNameBin}, VHostBin, X = #exchange{}) ->
+ X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)};
+
+with_name(#upstream{queue_name = QNameBin}, VHostBin, Q) when ?is_amqqueue(Q) ->
+ amqqueue:set_name(Q, rabbit_misc:r(VHostBin, queue, QNameBin)).
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl
new file mode 100644
index 0000000000..6018dd90a5
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl
@@ -0,0 +1,75 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_upstream_exchange).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "federation upstream exchange type"},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"x-federation-upstream">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {cleanup, {rabbit_registry, unregister,
+ [exchange, <<"x-federation-upstream">>]}},
+ {enables, recovery}]}).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("rabbit_federation.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, serialise_events/0, route/2]).
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2,
+ add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-export([info/1, info/2]).
+
+%%----------------------------------------------------------------------------
+
+info(_X) -> [].
+info(_X, _) -> [].
+
+description() ->
+ [{description, <<"Federation upstream helper exchange">>},
+ {internal_purpose, federation}].
+
+serialise_events() -> false.
+
+route(X = #exchange{arguments = Args},
+ D = #delivery{message = #basic_message{content = Content}}) ->
+ %% This arg was introduced in the same release as this exchange type;
+ %% it must be set
+ {long, MaxHops} = rabbit_misc:table_lookup(Args, ?MAX_HOPS_ARG),
+ %% Will be missing for pre-3.3.0 versions
+ DName = case rabbit_misc:table_lookup(Args, ?DOWNSTREAM_NAME_ARG) of
+ {longstr, Val0} -> Val0;
+ _ -> unknown
+ end,
+ %% Will be missing for pre-3.8.9 versions
+ DVhost = case rabbit_misc:table_lookup(Args, ?DOWNSTREAM_VHOST_ARG) of
+ {longstr, Val1} -> Val1;
+ _ -> unknown
+ end,
+ Headers = rabbit_basic:extract_headers(Content),
+ case rabbit_federation_util:should_forward(Headers, MaxHops, DName, DVhost) of
+ true -> rabbit_exchange_type_fanout:route(X, D);
+ false -> []
+ end.
+
+validate(#exchange{arguments = Args}) ->
+ rabbit_federation_util:validate_arg(?MAX_HOPS_ARG, long, Args).
+
+validate_binding(_X, _B) -> ok.
+create(_Tx, _X) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_X1, _X2) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
+
+assert_args_equivalence(X = #exchange{name = Name,
+ arguments = Args}, ReqArgs) ->
+ rabbit_misc:assert_args_equivalence(Args, ReqArgs, Name, [?MAX_HOPS_ARG]),
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_util.erl b/deps/rabbitmq_federation/src/rabbit_federation_util.erl
new file mode 100644
index 0000000000..160bac996e
--- /dev/null
+++ b/deps/rabbitmq_federation/src/rabbit_federation_util.erl
@@ -0,0 +1,102 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_util).
+
+-include_lib("rabbit/include/amqqueue.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_federation.hrl").
+
+-export([should_forward/4, find_upstreams/2, already_seen/3]).
+-export([validate_arg/3, fail/2, name/1, vhost/1, r/1, pgname/1]).
+-export([obfuscate_upstream/1, deobfuscate_upstream/1, obfuscate_upstream_params/1, deobfuscate_upstream_params/1]).
+
+-import(rabbit_misc, [pget_or_die/2, pget/3]).
+
+%%----------------------------------------------------------------------------
+
+should_forward(undefined, _MaxHops, _DName, _DVhost) ->
+ true;
+should_forward(Headers, MaxHops, DName, DVhost) ->
+ case rabbit_misc:table_lookup(Headers, ?ROUTING_HEADER) of
+ {array, A} -> length(A) < MaxHops andalso not already_seen(DName, DVhost, A);
+ _ -> true
+ end.
+
+%% Used to detect message and binding forwarding cycles.
+already_seen(UpstreamID, UpstreamVhost, Array) ->
+ lists:any(fun ({table, T}) ->
+ {longstr, UpstreamID} =:= rabbit_misc:table_lookup(T, <<"cluster-name">>) andalso
+ {longstr, UpstreamVhost} =:= rabbit_misc:table_lookup(T, <<"vhost">>);
+ (_) ->
+ false
+ end, Array).
+
+find_upstreams(Name, Upstreams) ->
+ [U || U = #upstream{name = Name2} <- Upstreams,
+ Name =:= Name2].
+
+validate_arg(Name, Type, Args) ->
+ case rabbit_misc:table_lookup(Args, Name) of
+ {Type, _} -> ok;
+ undefined -> fail("Argument ~s missing", [Name]);
+ _ -> fail("Argument ~s must be of type ~s", [Name, Type])
+ end.
+
+-spec fail(io:format(), [term()]) -> no_return().
+
+fail(Fmt, Args) -> rabbit_misc:protocol_error(precondition_failed, Fmt, Args).
+
+name( #resource{name = XorQName}) -> XorQName;
+name(#exchange{name = #resource{name = XName}}) -> XName;
+name(Q) when ?is_amqqueue(Q) -> #resource{name = QName} = amqqueue:get_name(Q), QName.
+
+vhost( #resource{virtual_host = VHost}) -> VHost;
+vhost(#exchange{name = #resource{virtual_host = VHost}}) -> VHost;
+vhost(Q) when ?is_amqqueue(Q) -> #resource{virtual_host = VHost} = amqqueue:get_name(Q), VHost;
+vhost(#amqp_params_direct{virtual_host = VHost}) -> VHost;
+vhost(#amqp_params_network{virtual_host = VHost}) -> VHost.
+
+r(#exchange{name = XName}) -> XName;
+r(Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q).
+
+pgname(Name) ->
+ case application:get_env(rabbitmq_federation, pgroup_name_cluster_id) of
+ {ok, false} -> Name;
+ {ok, true} -> {rabbit_nodes:cluster_name(), Name};
+ %% default value is 'false', so do the same thing
+ {ok, undefined} -> Name;
+ _ -> Name
+ end.
+
+obfuscate_upstream(#upstream{uris = Uris} = Upstream) ->
+ Upstream#upstream{uris = [credentials_obfuscation:encrypt(Uri) || Uri <- Uris]}.
+
+obfuscate_upstream_params(#upstream_params{uri = Uri, params = #amqp_params_network{password = Password} = Params} = UParams) ->
+ UParams#upstream_params{
+ uri = credentials_obfuscation:encrypt(Uri),
+ params = Params#amqp_params_network{password = credentials_obfuscation:encrypt(rabbit_data_coercion:to_binary(Password))}
+ };
+obfuscate_upstream_params(#upstream_params{uri = Uri, params = #amqp_params_direct{password = Password} = Params} = UParams) ->
+ UParams#upstream_params{
+ uri = credentials_obfuscation:encrypt(Uri),
+ params = Params#amqp_params_direct{password = credentials_obfuscation:encrypt(rabbit_data_coercion:to_binary(Password))}
+ }.
+
+deobfuscate_upstream(#upstream{uris = EncryptedUris} = Upstream) ->
+ Upstream#upstream{uris = [credentials_obfuscation:decrypt(EncryptedUri) || EncryptedUri <- EncryptedUris]}.
+
+deobfuscate_upstream_params(#upstream_params{uri = EncryptedUri, params = #amqp_params_network{password = EncryptedPassword} = Params} = UParams) ->
+ UParams#upstream_params{
+ uri = credentials_obfuscation:decrypt(EncryptedUri),
+ params = Params#amqp_params_network{password = credentials_obfuscation:decrypt(EncryptedPassword)}
+ };
+deobfuscate_upstream_params(#upstream_params{uri = EncryptedUri, params = #amqp_params_direct{password = EncryptedPassword} = Params} = UParams) ->
+ UParams#upstream_params{
+ uri = credentials_obfuscation:decrypt(EncryptedUri),
+ params = Params#amqp_params_direct{password = credentials_obfuscation:decrypt(EncryptedPassword)}
+ }.