diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-11-18 14:27:41 +0000 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-11-18 14:27:41 +0000 |
commit | f23a51261d9502ec39df0f8db47ba6b22aa7659f (patch) | |
tree | 53dcdf46e7dc2c14e81ee960bce8793879b488d3 /deps/rabbitmq_federation/src | |
parent | afa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff) | |
parent | 9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff) | |
download | rabbitmq-server-git-stream-timestamp-offset.tar.gz |
Merge remote-tracking branch 'origin/master' into stream-timestamp-offsetstream-timestamp-offset
Diffstat (limited to 'deps/rabbitmq_federation/src')
19 files changed, 2935 insertions, 0 deletions
diff --git a/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand.erl b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand.erl new file mode 100644 index 0000000000..bab4dddeec --- /dev/null +++ b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand.erl @@ -0,0 +1,117 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand'). + +-include("rabbit_federation.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([ + usage/0, + usage_additional/0, + usage_doc_guides/0, + flags/0, + validate/2, + merge_defaults/2, + banner/2, + run/2, + switches/0, + aliases/0, + output/2, + scopes/0, + formatter/0, + help_section/0, + description/0 + ]). + + +%%---------------------------------------------------------------------------- +%% Callbacks +%%---------------------------------------------------------------------------- +usage() -> + <<"federation_status [--only-down]">>. + +usage_additional() -> + [ + {<<"--only-down">>, <<"only display links that failed or are not currently connected">>} + ]. + +usage_doc_guides() -> + [?FEDERATION_GUIDE_URL]. + +help_section() -> + {plugin, federation}. + +description() -> + <<"Displays federation link status">>. + +flags() -> + []. + +validate(_,_) -> + ok. + +formatter() -> + 'Elixir.RabbitMQ.CLI.Formatters.Erlang'. + +merge_defaults(A, Opts) -> + {A, maps:merge(#{only_down => false}, Opts)}. + +banner(_, #{node := Node, only_down := true}) -> + erlang:iolist_to_binary([<<"Listing federation links which are down on node ">>, + atom_to_binary(Node, utf8), <<"...">>]); +banner(_, #{node := Node, only_down := false}) -> + erlang:iolist_to_binary([<<"Listing federation links on node ">>, + atom_to_binary(Node, utf8), <<"...">>]). + +run(_Args, #{node := Node, only_down := OnlyDown}) -> + case rabbit_misc:rpc_call(Node, rabbit_federation_status, status, []) of + {badrpc, _} = Error -> + Error; + Status -> + {stream, filter(Status, OnlyDown)} + end. + +switches() -> + [{only_down, boolean}]. + +aliases() -> + []. + +output({stream, FederationStatus}, _) -> + Formatted = [begin + Timestamp = proplists:get_value(timestamp, St), + Map0 = maps:remove(timestamp, maps:from_list(St)), + Map1 = maps:merge(#{queue => <<>>, + exchange => <<>>, + upstream_queue => <<>>, + upstream_exchange => <<>>, + local_connection => <<>>, + error => <<>>}, Map0), + Map1#{last_changed => fmt_ts(Timestamp)} + end || St <- FederationStatus], + {stream, Formatted}; +output(E, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(E). + +scopes() -> + ['ctl', 'diagnostics']. + +%%---------------------------------------------------------------------------- +%% Formatting +%%---------------------------------------------------------------------------- +fmt_ts({{YY, MM, DD}, {Hour, Min, Sec}}) -> + erlang:list_to_binary( + io_lib:format("~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", + [YY, MM, DD, Hour, Min, Sec])). + +filter(Status, _OnlyDown = false) -> + Status; +filter(Status, _OnlyDown = true) -> + [St || St <- Status, + not lists:member(proplists:get_value(status, St), [running, starting])]. diff --git a/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand.erl b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand.erl new file mode 100644 index 0000000000..8d062c692c --- /dev/null +++ b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand.erl @@ -0,0 +1,84 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand'). + +-include("rabbit_federation.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([ + usage/0, + usage_additional/0, + usage_doc_guides/0, + flags/0, + validate/2, + merge_defaults/2, + banner/2, + run/2, + aliases/0, + output/2, + help_section/0, + description/0 + ]). + + +%%---------------------------------------------------------------------------- +%% Callbacks +%%---------------------------------------------------------------------------- +usage() -> + <<"restart_federation_link <link_id>">>. + +usage_additional() -> + [ + {<<"<link_id>">>, <<"ID of the link to restart">>} + ]. + +usage_doc_guides() -> + [?FEDERATION_GUIDE_URL]. + +help_section() -> + {plugin, federation}. + +description() -> + <<"Restarts a running federation link">>. + +flags() -> + []. + +validate([], _Opts) -> + {validation_failure, not_enough_args}; +validate([_, _ | _], _Opts) -> + {validation_failure, too_many_args}; +validate([_], _) -> + ok. + +merge_defaults(A, O) -> + {A, O}. + +banner([Link], #{node := Node}) -> + erlang:iolist_to_binary([<<"Restarting federation link ">>, Link, << " on node ">>, + atom_to_binary(Node, utf8)]). + +run([Id], #{node := Node}) -> + case rabbit_misc:rpc_call(Node, rabbit_federation_status, lookup, [Id]) of + {badrpc, _} = Error -> + Error; + not_found -> + {error, <<"Link with the given ID was not found">>}; + Obj -> + Upstream = proplists:get_value(upstream, Obj), + Supervisor = proplists:get_value(supervisor, Obj), + rabbit_misc:rpc_call(Node, rabbit_federation_link_sup, restart, + [Supervisor, Upstream]) + end. + +aliases() -> + []. + +output(Output, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Output). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_app.erl b/deps/rabbitmq_federation/src/rabbit_federation_app.erl new file mode 100644 index 0000000000..ee7ba91e5f --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_app.erl @@ -0,0 +1,38 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_app). + +-behaviour(application). +-export([start/2, stop/1]). + +%% Dummy supervisor - see Ulf Wiger's comment at +%% http://erlang.2086793.n4.nabble.com/initializing-library-applications-without-processes-td2094473.html + +%% All of our actual server processes are supervised by +%% rabbit_federation_sup, which is started by a rabbit_boot_step +%% (since it needs to start up before queue / exchange recovery, so it +%% can't be part of our application). +%% +%% However, we still need an application behaviour since we need to +%% know when our application has started since then the Erlang client +%% will have started and we can therefore start our links going. Since +%% the application behaviour needs a tree of processes to supervise, +%% this is it... +-behaviour(supervisor). +-export([init/1]). + +start(_Type, _StartArgs) -> + rabbit_federation_exchange_link:go(), + rabbit_federation_queue_link:go(), + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +stop(_State) -> + ok. +%%---------------------------------------------------------------------------- + +init([]) -> {ok, {{one_for_one, 3, 10}, []}}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_db.erl b/deps/rabbitmq_federation/src/rabbit_federation_db.erl new file mode 100644 index 0000000000..e35e3646a8 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_db.erl @@ -0,0 +1,47 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_db). + +-include("rabbit_federation.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-define(DICT, orddict). + +-export([get_active_suffix/3, set_active_suffix/3, prune_scratch/2]). + +%%---------------------------------------------------------------------------- + +get_active_suffix(XName, Upstream, Default) -> + case rabbit_exchange:lookup_scratch(XName, federation) of + {ok, Dict} -> + case ?DICT:find(key(Upstream), Dict) of + {ok, Suffix} -> Suffix; + error -> Default + end; + {error, not_found} -> + Default + end. + +set_active_suffix(XName, Upstream, Suffix) -> + ok = rabbit_exchange:update_scratch( + XName, federation, + fun(D) -> ?DICT:store(key(Upstream), Suffix, ensure(D)) end). + +prune_scratch(XName, Upstreams) -> + ok = rabbit_exchange:update_scratch( + XName, federation, + fun(D) -> Keys = [key(U) || U <- Upstreams], + ?DICT:filter( + fun(K, _V) -> lists:member(K, Keys) end, ensure(D)) + end). + +key(#upstream{name = UpstreamName, exchange_name = XNameBin}) -> + {UpstreamName, XNameBin}. + +ensure(undefined) -> ?DICT:new(); +ensure(D) -> D. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_event.erl b/deps/rabbitmq_federation/src/rabbit_federation_event.erl new file mode 100644 index 0000000000..417b8ecba3 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_event.erl @@ -0,0 +1,54 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_event). +-behaviour(gen_event). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([add_handler/0, remove_handler/0]). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-import(rabbit_misc, [pget/2]). + +%%---------------------------------------------------------------------------- + +add_handler() -> + gen_event:add_handler(rabbit_event, ?MODULE, []). + +remove_handler() -> + gen_event:delete_handler(rabbit_event, ?MODULE, []). + +init([]) -> + {ok, []}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_event(#event{type = parameter_set, + props = Props0}, State) -> + Props = rabbit_data_coercion:to_list(Props0), + case {pget(component, Props), pget(name, Props)} of + {global, cluster_name} -> + rabbit_federation_parameters:adjust(everything); + _ -> + ok + end, + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl new file mode 100644 index 0000000000..6b85b6756b --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl @@ -0,0 +1,105 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +%% TODO rename this +-module(rabbit_federation_exchange). + +-rabbit_boot_step({?MODULE, + [{description, "federation exchange decorator"}, + {mfa, {rabbit_registry, register, + [exchange_decorator, <<"federation">>, ?MODULE]}}, + {requires, rabbit_registry}, + {cleanup, {rabbit_registry, unregister, + [exchange_decorator, <<"federation">>]}}, + {enables, recovery}]}). + +-include_lib("amqp_client/include/amqp_client.hrl"). + +-behaviour(rabbit_exchange_decorator). + +-export([description/0, serialise_events/1]). +-export([create/2, delete/3, policy_changed/2, + add_binding/3, remove_bindings/3, route/2, active_for/1]). + +%%---------------------------------------------------------------------------- + +description() -> + [{description, <<"Federation exchange decorator">>}]. + +serialise_events(X) -> federate(X). + +create(transaction, _X) -> + ok; +create(none, X) -> + maybe_start(X). + +delete(transaction, _X, _Bs) -> + ok; +delete(none, X, _Bs) -> + maybe_stop(X). + +policy_changed(OldX, NewX) -> + maybe_stop(OldX), + maybe_start(NewX). + +add_binding(transaction, _X, _B) -> + ok; +add_binding(Serial, X = #exchange{name = XName}, B) -> + case federate(X) of + true -> rabbit_federation_exchange_link:add_binding(Serial, XName, B), + ok; + false -> ok + end. + +remove_bindings(transaction, _X, _Bs) -> + ok; +remove_bindings(Serial, X = #exchange{name = XName}, Bs) -> + case federate(X) of + true -> rabbit_federation_exchange_link:remove_bindings(Serial, XName, Bs), + ok; + false -> ok + end. + +route(_, _) -> []. + +active_for(X) -> + case federate(X) of + true -> noroute; + false -> none + end. + +%%---------------------------------------------------------------------------- + +%% Don't federate default exchange, we can't bind to it +federate(#exchange{name = #resource{name = <<"">>}}) -> + false; + +%% Don't federate any of our intermediate exchanges. Note that we use +%% internal=true since older brokers may not declare +%% x-federation-upstream on us. Also other internal exchanges should +%% probably not be federated. +federate(#exchange{internal = true}) -> + false; + +federate(X) -> + rabbit_federation_upstream:federate(X). + +maybe_start(X = #exchange{name = XName})-> + case federate(X) of + true -> ok = rabbit_federation_db:prune_scratch( + XName, rabbit_federation_upstream:for(X)), + ok = rabbit_federation_exchange_link_sup_sup:start_child(X), + ok; + false -> ok + end. + +maybe_stop(X = #exchange{name = XName}) -> + case federate(X) of + true -> ok = rabbit_federation_exchange_link_sup_sup:stop_child(X), + rabbit_federation_status:remove_exchange_or_queue(XName); + false -> ok + end. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl new file mode 100644 index 0000000000..869ab047ae --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl @@ -0,0 +1,696 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_exchange_link). + +%% pg2 is deprecated in OTP 23. +-compile(nowarn_deprecated_function). + +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-behaviour(gen_server2). + +-export([go/0, add_binding/3, remove_bindings/3]). +-export([list_routing_keys/1]). %% For testing + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-import(rabbit_misc, [pget/2]). +-import(rabbit_federation_util, [name/1, vhost/1, pgname/1]). + +-record(state, {upstream, + upstream_params, + upstream_name, + connection, + channel, + cmd_channel, + consumer_tag, + queue, + internal_exchange, + waiting_cmds = gb_trees:empty(), + next_serial, + bindings = #{}, + downstream_connection, + downstream_channel, + downstream_exchange, + unacked, + internal_exchange_timer, + internal_exchange_interval}). + +%%---------------------------------------------------------------------------- + +%% We start off in a state where we do not connect, since we can first +%% start during exchange recovery, when rabbit is not fully started +%% and the Erlang client is not running. This then gets invoked when +%% the federation app is started. +go() -> cast(go). + +add_binding(S, XN, B) -> cast(XN, {enqueue, S, {add_binding, B}}). +remove_bindings(S, XN, Bs) -> cast(XN, {enqueue, S, {remove_bindings, Bs}}). + +list_routing_keys(XN) -> call(XN, list_routing_keys). + +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]). + +init({Upstream, XName}) -> + %% If we are starting up due to a policy change then it's possible + %% for the exchange to have been deleted before we got here, in which + %% case it's possible that delete callback would also have been called + %% before we got here. So check if we still exist. + case rabbit_exchange:lookup(XName) of + {ok, X} -> + DeobfuscatedUpstream = rabbit_federation_util:deobfuscate_upstream(Upstream), + DeobfuscatedUParams = rabbit_federation_upstream:to_params(DeobfuscatedUpstream, X), + UParams = rabbit_federation_util:obfuscate_upstream_params(DeobfuscatedUParams), + rabbit_federation_status:report(Upstream, UParams, XName, starting), + join(rabbit_federation_exchanges), + join({rabbit_federation_exchange, XName}), + gen_server2:cast(self(), maybe_go), + {ok, {not_started, {Upstream, UParams, XName}}}; + {error, not_found} -> + rabbit_federation_link_util:log_warning(XName, "not found, stopping link~n", []), + {stop, gone} + end. + +handle_call(list_routing_keys, _From, State = #state{bindings = Bindings}) -> + {reply, lists:sort([K || {K, _} <- maps:keys(Bindings)]), State}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(maybe_go, S0 = {not_started, _Args}) -> + case federation_up() of + true -> go(S0); + false -> {noreply, S0} + end; + +handle_cast(go, S0 = {not_started, _Args}) -> + go(S0); + +%% There's a small race - I think we can realise federation is up +%% before 'go' gets invoked. Ignore. +handle_cast(go, State) -> + {noreply, State}; + +handle_cast({enqueue, _, _}, State = {not_started, _}) -> + {noreply, State}; + +handle_cast({enqueue, Serial, Cmd}, + State = #state{waiting_cmds = Waiting, + downstream_exchange = XName}) -> + Waiting1 = gb_trees:insert(Serial, Cmd, Waiting), + try + {noreply, play_back_commands(State#state{waiting_cmds = Waiting1})} + catch exit:{{shutdown, {server_initiated_close, 404, Text}}, _} -> + rabbit_federation_link_util:log_warning( + XName, "detected upstream changes, restarting link: ~p~n", [Text]), + {stop, {shutdown, restart}, State} + end; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(#'basic.consume_ok'{}, State) -> + {noreply, State}; + +handle_info(#'basic.ack'{} = Ack, State = #state{channel = Ch, + unacked = Unacked}) -> + Unacked1 = rabbit_federation_link_util:ack(Ack, Ch, Unacked), + {noreply, State#state{unacked = Unacked1}}; + +handle_info(#'basic.nack'{} = Nack, State = #state{channel = Ch, + unacked = Unacked}) -> + Unacked1 = rabbit_federation_link_util:nack(Nack, Ch, Unacked), + {noreply, State#state{unacked = Unacked1}}; + +handle_info({#'basic.deliver'{routing_key = Key, + redelivered = Redelivered} = DeliverMethod, Msg}, + State = #state{ + upstream = Upstream = #upstream{max_hops = MaxH}, + upstream_params = UParams = #upstream_params{x_or_q = UpstreamX}, + upstream_name = UName, + downstream_exchange = #resource{name = XNameBin, virtual_host = DVhost}, + downstream_channel = DCh, + channel = Ch, + unacked = Unacked}) -> + UVhost = vhost(UpstreamX), + PublishMethod = #'basic.publish'{exchange = XNameBin, + routing_key = Key}, + HeadersFun = fun (H) -> update_routing_headers(UParams, UName, UVhost, Redelivered, H) end, + %% We need to check should_forward/2 here in case the upstream + %% does not have federation and thus is using a fanout exchange. + ForwardFun = fun (H) -> + DName = rabbit_nodes:cluster_name(), + rabbit_federation_util:should_forward(H, MaxH, DName, DVhost) + end, + Unacked1 = rabbit_federation_link_util:forward( + Upstream, DeliverMethod, Ch, DCh, PublishMethod, + HeadersFun, ForwardFun, Msg, Unacked), + {noreply, State#state{unacked = Unacked1}}; + +handle_info(#'basic.cancel'{}, State = #state{upstream = Upstream, + upstream_params = UParams, + downstream_exchange = XName}) -> + rabbit_federation_link_util:connection_error( + local, basic_cancel, Upstream, UParams, XName, State); + +handle_info({'DOWN', _Ref, process, Pid, Reason}, + State = #state{downstream_channel = DCh, + channel = Ch, + cmd_channel = CmdCh, + upstream = Upstream, + upstream_params = UParams, + downstream_exchange = XName}) -> + handle_down(Pid, Reason, Ch, CmdCh, DCh, + {Upstream, UParams, XName}, State); + +handle_info(check_internal_exchange, State = #state{internal_exchange = IntXNameBin, + internal_exchange_interval = Interval}) -> + case check_internal_exchange(IntXNameBin, State) of + upstream_not_found -> + rabbit_log_federation:warning("Federation link could not find upstream exchange '~s' and will restart", + [IntXNameBin]), + {stop, {shutdown, restart}, State}; + _ -> + TRef = erlang:send_after(Interval, self(), check_internal_exchange), + {noreply, State#state{internal_exchange_timer = TRef}} + end; + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(_Reason, {not_started, _}) -> + ok; +terminate(Reason, #state{downstream_connection = DConn, + connection = Conn, + upstream = Upstream, + upstream_params = UParams, + downstream_exchange = XName, + internal_exchange_timer = TRef, + internal_exchange = IntExchange, + queue = Queue}) when Reason =:= shutdown; + Reason =:= {shutdown, restart}; + Reason =:= gone -> + timer:cancel(TRef), + rabbit_federation_link_util:ensure_connection_closed(DConn), + + rabbit_log:debug("Exchange federation: link is shutting down, resource cleanup mode: ~p", [Upstream#upstream.resource_cleanup_mode]), + case Upstream#upstream.resource_cleanup_mode of + never -> ok; + _ -> + %% This is a normal shutdown and we are allowed to clean up the internally used queue and exchange + rabbit_log:debug("Federated exchange '~s' link will delete its internal queue '~s'", [Upstream#upstream.exchange_name, Queue]), + delete_upstream_queue(Conn, Queue), + rabbit_log:debug("Federated exchange '~s' link will delete its upstream exchange", [Upstream#upstream.exchange_name]), + delete_upstream_exchange(Conn, IntExchange) + end, + + rabbit_federation_link_util:ensure_connection_closed(Conn), + rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, XName), + ok; +%% unexpected shutdown +terminate(Reason, #state{downstream_connection = DConn, + connection = Conn, + upstream = Upstream, + upstream_params = UParams, + downstream_exchange = XName, + internal_exchange_timer = TRef}) -> + timer:cancel(TRef), + + rabbit_federation_link_util:ensure_connection_closed(DConn), + + %% unlike in the clean shutdown case above, we keep the queue + %% and exchange around + + rabbit_federation_link_util:ensure_connection_closed(Conn), + rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, XName), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +call(XName, Msg) -> [gen_server2:call(Pid, Msg, infinity) || Pid <- x(XName)]. +cast(Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- all()]. +cast(XName, Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- x(XName)]. + +join(Name) -> + pg2:create(pgname(Name)), + ok = pg2:join(pgname(Name), self()). + +all() -> + pg2:create(pgname(rabbit_federation_exchanges)), + pg2:get_members(pgname(rabbit_federation_exchanges)). + +x(XName) -> + pg2:create(pgname({rabbit_federation_exchange, XName})), + pg2:get_members(pgname({rabbit_federation_exchange, XName})). + +%%---------------------------------------------------------------------------- + +federation_up() -> is_pid(whereis(rabbit_federation_app)). + +handle_command({add_binding, Binding}, State) -> + add_binding(Binding, State); + +handle_command({remove_bindings, Bindings}, State) -> + lists:foldl(fun remove_binding/2, State, Bindings). + +play_back_commands(State = #state{waiting_cmds = Waiting, + next_serial = Next}) -> + case gb_trees:is_empty(Waiting) of + false -> case gb_trees:take_smallest(Waiting) of + {Next, Cmd, Waiting1} -> + %% The next one. Just execute it. + play_back_commands( + handle_command(Cmd, State#state{ + waiting_cmds = Waiting1, + next_serial = Next + 1})); + {Serial, _Cmd, Waiting1} when Serial < Next -> + %% This command came from before we executed + %% binding:list_for_source. Ignore it. + play_back_commands(State#state{ + waiting_cmds = Waiting1}); + _ -> + %% Some future command. Don't do anything. + State + end; + true -> State + end. + +add_binding(B, State) -> + binding_op(fun record_binding/2, bind_cmd(bind, B, State), B, State). + +remove_binding(B, State) -> + binding_op(fun forget_binding/2, bind_cmd(unbind, B, State), B, State). + +record_binding(B = #binding{destination = Dest}, + State = #state{bindings = Bs}) -> + {DoIt, Set} = case maps:find(key(B), Bs) of + error -> {true, sets:from_list([Dest])}; + {ok, Dests} -> {false, sets:add_element( + Dest, Dests)} + end, + {DoIt, State#state{bindings = maps:put(key(B), Set, Bs)}}. + +forget_binding(B = #binding{destination = Dest}, + State = #state{bindings = Bs}) -> + Dests = sets:del_element(Dest, maps:get(key(B), Bs)), + {DoIt, Bs1} = case sets:size(Dests) of + 0 -> {true, maps:remove(key(B), Bs)}; + _ -> {false, maps:put(key(B), Dests, Bs)} + end, + {DoIt, State#state{bindings = Bs1}}. + +binding_op(UpdateFun, Cmd, B = #binding{args = Args}, + State = #state{cmd_channel = Ch}) -> + {DoIt, State1} = + case rabbit_misc:table_lookup(Args, ?BINDING_HEADER) of + undefined -> UpdateFun(B, State); + {array, _} -> {Cmd =/= ignore, State} + end, + case DoIt of + true -> amqp_channel:call(Ch, Cmd); + false -> ok + end, + State1. + +bind_cmd(Type, #binding{key = Key, args = Args}, + State = #state{internal_exchange = IntXNameBin, + upstream_params = UpstreamParams, + upstream = Upstream}) -> + #upstream_params{x_or_q = X} = UpstreamParams, + #upstream{bind_nowait = Nowait} = Upstream, + case update_binding(Args, State) of + ignore -> ignore; + NewArgs -> bind_cmd0(Type, name(X), IntXNameBin, Key, NewArgs, Nowait) + end. + +bind_cmd0(bind, Source, Destination, RoutingKey, Arguments, Nowait) -> + #'exchange.bind'{source = Source, + destination = Destination, + routing_key = RoutingKey, + arguments = Arguments, + nowait = Nowait}; + +bind_cmd0(unbind, Source, Destination, RoutingKey, Arguments, Nowait) -> + #'exchange.unbind'{source = Source, + destination = Destination, + routing_key = RoutingKey, + arguments = Arguments, + nowait = Nowait}. + +%% This function adds information about the current node to the +%% binding arguments, or returns 'ignore' if it determines the binding +%% should propagate no further. The interesting part is the latter. +%% +%% We want bindings to propagate in the same way as messages +%% w.r.t. max_hops - if we determine that a message can get from node +%% A to B (assuming bindings are in place) then it follows that a +%% binding at B should propagate back to A, and no further. There is +%% no point in propagating bindings past the point where messages +%% would propagate, and we will lose messages if bindings don't +%% propagate as far. +%% +%% Note that we still want to have limits on how far messages can +%% propagate: limiting our bindings is not enough, since other +%% bindings from other nodes can overlap. +%% +%% So in short we want bindings to obey max_hops. However, they can't +%% just obey the max_hops of the current link, since they are +%% travelling in the opposite direction to messages! Consider the +%% following federation: +%% +%% A -----------> B -----------> C +%% max_hops=1 max_hops=2 +%% +%% where the arrows indicate message flow. A binding created at C +%% should propagate to B, then to A, and no further. Therefore every +%% time we traverse a link, we keep a count of the number of hops that +%% a message could have made so far to reach this point, and still be +%% able to propagate. When this number ("hops" below) reaches 0 we +%% propagate no further. +%% +%% hops(link(N)) is given by: +%% +%% min(hops(link(N-1))-1, max_hops(link(N))) +%% +%% where link(N) is the link that bindings propagate over after N +%% steps (e.g. link(1) is CB above, link(2) is BA). +%% +%% In other words, we count down to 0 from the link with the most +%% restrictive max_hops we have yet passed through. + +update_binding(Args, #state{downstream_exchange = X, + upstream = Upstream, + upstream_params = #upstream_params{x_or_q = UpstreamX}, + upstream_name = UName}) -> + #upstream{max_hops = MaxHops} = Upstream, + UVhost = vhost(UpstreamX), + Hops = case rabbit_misc:table_lookup(Args, ?BINDING_HEADER) of + undefined -> MaxHops; + {array, All} -> [{table, Prev} | _] = All, + PrevHops = get_hops(Prev), + case rabbit_federation_util:already_seen( + UName, UVhost, All) of + true -> 0; + false -> lists:min([PrevHops - 1, MaxHops]) + end + end, + case Hops of + 0 -> ignore; + _ -> Cluster = rabbit_nodes:cluster_name(), + ABSuffix = rabbit_federation_db:get_active_suffix( + X, Upstream, <<"A">>), + DVhost = vhost(X), + DName = name(X), + Down = <<DVhost/binary,":", DName/binary, " ", ABSuffix/binary>>, + Info = [{<<"cluster-name">>, longstr, Cluster}, + {<<"vhost">>, longstr, DVhost}, + {<<"exchange">>, longstr, Down}, + {<<"hops">>, short, Hops}], + rabbit_basic:prepend_table_header(?BINDING_HEADER, Info, Args) + end. + + + +key(#binding{key = Key, args = Args}) -> {Key, Args}. + +go(S0 = {not_started, {Upstream, UParams, DownXName}}) -> + Unacked = rabbit_federation_link_util:unacked_new(), + + log_link_startup_attempt(Upstream, DownXName), + rabbit_federation_link_util:start_conn_ch( + fun (Conn, Ch, DConn, DCh) -> + {ok, CmdCh} = open_cmd_channel(Conn, Upstream, UParams, DownXName, S0), + erlang:monitor(process, CmdCh), + Props = pget(server_properties, + amqp_connection:info(Conn, [server_properties])), + UName = case rabbit_misc:table_lookup( + Props, <<"cluster_name">>) of + {longstr, N} -> N; + _ -> unknown + end, + {Serial, Bindings} = + rabbit_misc:execute_mnesia_transaction( + fun () -> + {rabbit_exchange:peek_serial(DownXName), + rabbit_binding:list_for_source(DownXName)} + end), + true = is_integer(Serial), + %% If we are very short lived, Serial can be undefined at + %% this point (since the deletion of the X could have + %% overtaken the creation of this process). However, this + %% is not a big deal - 'undefined' just becomes the next + %% serial we will process. Since it compares larger than + %% any number we never process any commands. And we will + %% soon get told to stop anyway. + {ok, Interval} = application:get_env(rabbitmq_federation, + internal_exchange_check_interval), + State = ensure_upstream_bindings( + consume_from_upstream_queue( + #state{upstream = Upstream, + upstream_params = UParams, + upstream_name = UName, + connection = Conn, + channel = Ch, + cmd_channel = CmdCh, + next_serial = Serial, + downstream_connection = DConn, + downstream_channel = DCh, + downstream_exchange = DownXName, + unacked = Unacked, + internal_exchange_interval = Interval}), + Bindings), + rabbit_log_federation:info("Federation link for ~s (upstream: ~s) will perform internal exchange checks " + "every ~b seconds", [rabbit_misc:rs(DownXName), UName, round(Interval / 1000)]), + TRef = erlang:send_after(Interval, self(), check_internal_exchange), + {noreply, State#state{internal_exchange_timer = TRef}} + end, Upstream, UParams, DownXName, S0). + +log_link_startup_attempt(OUpstream, DownXName) -> + rabbit_log_federation:debug("Will try to start a federation link for ~s, upstream: '~s'", + [rabbit_misc:rs(DownXName), OUpstream#upstream.name]). + +open_cmd_channel(Conn, Upstream = #upstream{name = UName}, UParams, DownXName, S0) -> + rabbit_log_federation:debug("Will open a command channel to upstream '~s' for downstream federated ~s", + [UName, rabbit_misc:rs(DownXName)]), + case amqp_connection:open_channel(Conn) of + {ok, CCh} -> + erlang:monitor(process, CCh), + {ok, CCh}; + E -> + rabbit_federation_link_util:ensure_connection_closed(Conn), + rabbit_federation_link_util:connection_error(command_channel, E, + Upstream, UParams, DownXName, S0), + E + end. + +consume_from_upstream_queue( + State = #state{upstream = Upstream, + upstream_params = UParams, + channel = Ch, + downstream_exchange = DownXName}) -> + #upstream{prefetch_count = Prefetch, + expires = Expiry, + message_ttl = TTL, + ha_policy = HA} = Upstream, + #upstream_params{x_or_q = X, + params = Params} = UParams, + Q = upstream_queue_name(name(X), vhost(Params), DownXName), + Args = [A || {_K, _T, V} = A + <- [{<<"x-expires">>, long, Expiry}, + {<<"x-message-ttl">>, long, TTL}, + {<<"x-ha-policy">>, longstr, HA}, + {<<"x-internal-purpose">>, longstr, <<"federation">>}], + V =/= none], + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true, + arguments = Args}), + NoAck = Upstream#upstream.ack_mode =:= 'no-ack', + case NoAck of + false -> amqp_channel:call(Ch, #'basic.qos'{prefetch_count = Prefetch}); + true -> ok + end, + #'basic.consume_ok'{consumer_tag = CTag} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, + no_ack = NoAck}, self()), + State#state{consumer_tag = CTag, + queue = Q}. + +ensure_upstream_bindings(State = #state{upstream = Upstream, + connection = Conn, + channel = Ch, + downstream_exchange = DownXName, + queue = Q}, Bindings) -> + OldSuffix = rabbit_federation_db:get_active_suffix( + DownXName, Upstream, <<"A">>), + Suffix = case OldSuffix of + <<"A">> -> <<"B">>; + <<"B">> -> <<"A">> + end, + IntXNameBin = upstream_exchange_name(Q, Suffix), + ensure_upstream_exchange(State), + ensure_internal_exchange(IntXNameBin, State), + amqp_channel:call(Ch, #'queue.bind'{exchange = IntXNameBin, queue = Q}), + State1 = State#state{internal_exchange = IntXNameBin}, + rabbit_federation_db:set_active_suffix(DownXName, Upstream, Suffix), + State2 = lists:foldl(fun add_binding/2, State1, Bindings), + OldIntXNameBin = upstream_exchange_name(Q, OldSuffix), + delete_upstream_exchange(Conn, OldIntXNameBin), + State2. + +ensure_upstream_exchange(#state{upstream_params = UParams, + connection = Conn, + channel = Ch}) -> + #upstream_params{x_or_q = X} = UParams, + #exchange{type = Type, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Arguments} = X, + Decl = #'exchange.declare'{exchange = name(X), + type = list_to_binary(atom_to_list(Type)), + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Arguments}, + rabbit_federation_link_util:disposable_channel_call( + Conn, Decl#'exchange.declare'{passive = true}, + fun(?NOT_FOUND, _Text) -> + amqp_channel:call(Ch, Decl) + end). + +ensure_internal_exchange(IntXNameBin, + #state{upstream = #upstream{max_hops = MaxHops, name = UName}, + upstream_params = UParams, + connection = Conn, + channel = Ch, + downstream_exchange = #resource{virtual_host = DVhost}}) -> + rabbit_log_federation:debug("Exchange federation will set up exchange '~s' in upstream '~s'", + [IntXNameBin, UName]), + #upstream_params{params = Params} = rabbit_federation_util:deobfuscate_upstream_params(UParams), + rabbit_log_federation:debug("Will delete upstream exchange '~s'", [IntXNameBin]), + delete_upstream_exchange(Conn, IntXNameBin), + rabbit_log_federation:debug("Will declare an internal upstream exchange '~s'", [IntXNameBin]), + Base = #'exchange.declare'{exchange = IntXNameBin, + durable = true, + internal = true, + auto_delete = true}, + Purpose = [{<<"x-internal-purpose">>, longstr, <<"federation">>}], + XFUArgs = [{?MAX_HOPS_ARG, long, MaxHops}, + {?DOWNSTREAM_NAME_ARG, longstr, cycle_detection_node_identifier()}, + {?DOWNSTREAM_VHOST_ARG, longstr, DVhost} + | Purpose], + XFU = Base#'exchange.declare'{type = <<"x-federation-upstream">>, + arguments = XFUArgs}, + Fan = Base#'exchange.declare'{type = <<"fanout">>, + arguments = Purpose}, + rabbit_federation_link_util:disposable_connection_call( + Params, XFU, fun(?COMMAND_INVALID, _Text) -> + amqp_channel:call(Ch, Fan) + end). + +check_internal_exchange(IntXNameBin, + #state{upstream = #upstream{max_hops = MaxHops, name = UName}, + upstream_params = UParams, + downstream_exchange = XName = #resource{virtual_host = DVhost}}) -> + #upstream_params{params = Params} = + rabbit_federation_util:deobfuscate_upstream_params(UParams), + rabbit_log_federation:debug("Exchange federation will check on exchange '~s' in upstream '~s'", + [IntXNameBin, UName]), + Base = #'exchange.declare'{exchange = IntXNameBin, + passive = true, + durable = true, + internal = true, + auto_delete = true}, + Purpose = [{<<"x-internal-purpose">>, longstr, <<"federation">>}], + XFUArgs = [{?MAX_HOPS_ARG, long, MaxHops}, + {?DOWNSTREAM_NAME_ARG, longstr, cycle_detection_node_identifier()}, + {?DOWNSTREAM_VHOST_ARG, longstr, DVhost} + | Purpose], + XFU = Base#'exchange.declare'{type = <<"x-federation-upstream">>, + arguments = XFUArgs}, + rabbit_federation_link_util:disposable_connection_call( + Params, XFU, fun(404, Text) -> + rabbit_federation_link_util:log_warning( + XName, "detected internal upstream exchange changes," + " restarting link: ~p~n", [Text]), + upstream_not_found; + (Code, Text) -> + rabbit_federation_link_util:log_warning( + XName, "internal upstream exchange check failed: ~p ~p~n", + [Code, Text]), + error + end). + +upstream_queue_name(XNameBin, VHost, #resource{name = DownXNameBin, + virtual_host = DownVHost}) -> + Node = rabbit_nodes:cluster_name(), + DownPart = case DownVHost of + VHost -> case DownXNameBin of + XNameBin -> <<"">>; + _ -> <<":", DownXNameBin/binary>> + end; + _ -> <<":", DownVHost/binary, + ":", DownXNameBin/binary>> + end, + <<"federation: ", XNameBin/binary, " -> ", Node/binary, DownPart/binary>>. + +cycle_detection_node_identifier() -> + rabbit_nodes:cluster_name(). + +upstream_exchange_name(UpstreamQName, Suffix) -> + <<UpstreamQName/binary, " ", Suffix/binary>>. + +delete_upstream_exchange(Conn, XNameBin) -> + rabbit_federation_link_util:disposable_channel_call( + Conn, #'exchange.delete'{exchange = XNameBin}). + +delete_upstream_queue(Conn, Queue) -> + rabbit_federation_link_util:disposable_channel_call( + Conn, #'queue.delete'{queue = Queue}). + +update_routing_headers(#upstream_params{table = Table}, UpstreamName, UVhost, Redelivered, Headers) -> + NewValue = Table ++ + [{<<"redelivered">>, bool, Redelivered}] ++ + header_for_upstream_name(UpstreamName) ++ + header_for_upstream_vhost(UVhost), + rabbit_basic:prepend_table_header(?ROUTING_HEADER, NewValue, Headers). + +header_for_upstream_name(unknown) -> []; +header_for_upstream_name(Name) -> [{<<"cluster-name">>, longstr, Name}]. + +header_for_upstream_vhost(unknown) -> []; +header_for_upstream_vhost(Name) -> [{<<"vhost">>, longstr, Name}]. + +get_hops(Table) -> + case rabbit_misc:table_lookup(Table, <<"hops">>) of + %% see rabbit_binary_generator + {short, N} -> N; + {long, N} -> N; + {byte, N} -> N; + {signedint, N} -> N; + {unsignedbyte, N} -> N; + {unsignedshort, N} -> N; + {unsignedint, N} -> N; + {_, N} when is_integer(N) andalso N >= 0 -> N + end. + +handle_down(DCh, Reason, _Ch, _CmdCh, DCh, Args, State) -> + rabbit_federation_link_util:handle_downstream_down(Reason, Args, State); +handle_down(ChPid, Reason, Ch, CmdCh, _DCh, Args, State) + when ChPid =:= Ch; ChPid =:= CmdCh -> + rabbit_federation_link_util:handle_upstream_down(Reason, Args, State). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl new file mode 100644 index 0000000000..fda76a5070 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl @@ -0,0 +1,75 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_exchange_link_sup_sup). + +-behaviour(mirrored_supervisor). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-define(SUPERVISOR, ?MODULE). + +%% Supervises the upstream links for all exchanges (but not queues). We need +%% different handling here since exchanges want a mirrored sup. + +-export([start_link/0, start_child/1, adjust/1, stop_child/1]). +-export([init/1]). + +%%---------------------------------------------------------------------------- + +start_link() -> + mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR, + fun rabbit_misc:execute_mnesia_transaction/1, + ?MODULE, []). + +%% Note that the next supervisor down, rabbit_federation_link_sup, is common +%% between exchanges and queues. +start_child(X) -> + case mirrored_supervisor:start_child( + ?SUPERVISOR, + {id(X), {rabbit_federation_link_sup, start_link, [X]}, + transient, ?SUPERVISOR_WAIT, supervisor, + [rabbit_federation_link_sup]}) of + {ok, _Pid} -> ok; + {error, {already_started, _Pid}} -> + #exchange{name = ExchangeName} = X, + rabbit_log_federation:debug("Federation link for exchange ~p was already started", + [rabbit_misc:rs(ExchangeName)]), + ok; + %% A link returned {stop, gone}, the link_sup shut down, that's OK. + {error, {shutdown, _}} -> ok + end. + +adjust({clear_upstream, VHost, UpstreamName}) -> + [rabbit_federation_link_sup:adjust(Pid, X, {clear_upstream, UpstreamName}) || + {#exchange{name = Name} = X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), + Name#resource.virtual_host == VHost], + ok; +adjust(Reason) -> + [rabbit_federation_link_sup:adjust(Pid, X, Reason) || + {X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], + ok. + +stop_child(X) -> + case mirrored_supervisor:terminate_child(?SUPERVISOR, id(X)) of + ok -> ok; + {error, Err} -> + #exchange{name = ExchangeName} = X, + rabbit_log_federation:warning( + "Attempt to stop a federation link for exchange ~p failed: ~p", + [rabbit_misc:rs(ExchangeName), Err]), + ok + end, + ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(X)). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_one, 1200, 60}, []}}. + +%% See comment in rabbit_federation_queue_link_sup_sup:id/1 +id(X = #exchange{policy = Policy}) -> X1 = rabbit_exchange:immutable(X), + X1#exchange{policy = Policy}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl new file mode 100644 index 0000000000..27d1b50277 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl @@ -0,0 +1,109 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_link_sup). + +-behaviour(supervisor2). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit/include/amqqueue.hrl"). +-include("rabbit_federation.hrl"). + +%% Supervises the upstream links for an exchange or queue. + +-export([start_link/1, adjust/3, restart/2]). +-export([init/1]). + +start_link(XorQ) -> + supervisor2:start_link(?MODULE, XorQ). + +adjust(Sup, XorQ, everything) -> + [stop(Sup, Upstream, XorQ) || + {Upstream, _, _, _} <- supervisor2:which_children(Sup)], + [{ok, _Pid} = supervisor2:start_child(Sup, Spec) || Spec <- specs(XorQ)]; + +adjust(Sup, XorQ, {upstream, UpstreamName}) -> + OldUpstreams0 = children(Sup, UpstreamName), + NewUpstreams0 = rabbit_federation_upstream:for(XorQ, UpstreamName), + %% If any haven't changed, don't restart them. The broker will + %% avoid telling us about connections that have not changed + %% syntactically, but even if one has, this XorQ may not have that + %% connection in an upstream, so we still need to check here. + {OldUpstreams, NewUpstreams} = + lists:foldl( + fun (OldU, {OldUs, NewUs}) -> + case lists:member(OldU, NewUs) of + true -> {OldUs -- [OldU], NewUs -- [OldU]}; + false -> {OldUs, NewUs} + end + end, {OldUpstreams0, NewUpstreams0}, OldUpstreams0), + [stop(Sup, OldUpstream, XorQ) || OldUpstream <- OldUpstreams], + [start(Sup, NewUpstream, XorQ) || NewUpstream <- NewUpstreams]; + +adjust(Sup, XorQ, {clear_upstream, UpstreamName}) -> + ok = rabbit_federation_db:prune_scratch( + name(XorQ), rabbit_federation_upstream:for(XorQ)), + [stop(Sup, Upstream, XorQ) || Upstream <- children(Sup, UpstreamName)]; + +adjust(Sup, X = #exchange{name = XName}, {upstream_set, _Set}) -> + adjust(Sup, X, everything), + case rabbit_federation_upstream:federate(X) of + false -> ok; + true -> ok = rabbit_federation_db:prune_scratch( + XName, rabbit_federation_upstream:for(X)) + end; +adjust(Sup, Q, {upstream_set, _}) when ?is_amqqueue(Q) -> + adjust(Sup, Q, everything); +adjust(Sup, XorQ, {clear_upstream_set, _}) -> + adjust(Sup, XorQ, everything). + +restart(Sup, Upstream) -> + ok = supervisor2:terminate_child(Sup, Upstream), + {ok, _Pid} = supervisor2:restart_child(Sup, Upstream), + ok. + +start(Sup, Upstream, XorQ) -> + {ok, _Pid} = supervisor2:start_child(Sup, spec(rabbit_federation_util:obfuscate_upstream(Upstream), XorQ)), + ok. + +stop(Sup, Upstream, XorQ) -> + ok = supervisor2:terminate_child(Sup, Upstream), + ok = supervisor2:delete_child(Sup, Upstream), + %% While the link will report its own removal, that only works if + %% the link was actually up. If the link was broken and failing to + %% come up, the possibility exists that there *is* no link + %% process, but we still have a report in the status table. So + %% remove it here too. + rabbit_federation_status:remove(Upstream, name(XorQ)). + +children(Sup, UpstreamName) -> + rabbit_federation_util:find_upstreams( + UpstreamName, [U || {U, _, _, _} <- supervisor2:which_children(Sup)]). + +%%---------------------------------------------------------------------------- + +init(XorQ) -> + %% 1, ?MAX_WAIT so that we always give up after one fast retry and get + %% into the reconnect delay. + {ok, {{one_for_one, 1, ?MAX_WAIT}, specs(XorQ)}}. + +specs(XorQ) -> + [spec(rabbit_federation_util:obfuscate_upstream(Upstream), XorQ) + || Upstream <- rabbit_federation_upstream:for(XorQ)]. + +spec(U = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) -> + {U, {rabbit_federation_exchange_link, start_link, [{U, XName}]}, + {permanent, Delay}, ?WORKER_WAIT, worker, + [rabbit_federation_exchange_link]}; + +spec(Upstream = #upstream{reconnect_delay = Delay}, Q) when ?is_amqqueue(Q) -> + {Upstream, {rabbit_federation_queue_link, start_link, [{Upstream, Q}]}, + {permanent, Delay}, ?WORKER_WAIT, worker, + [rabbit_federation_queue_link]}. + +name(#exchange{name = XName}) -> XName; +name(Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl new file mode 100644 index 0000000000..a5fd560f0b --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl @@ -0,0 +1,364 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_link_util). + +-include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +%% real +-export([start_conn_ch/5, disposable_channel_call/2, disposable_channel_call/3, + disposable_connection_call/3, ensure_connection_closed/1, + log_terminate/4, unacked_new/0, ack/3, nack/3, forward/9, + handle_downstream_down/3, handle_upstream_down/3, + get_connection_name/2, log_debug/3, log_info/3, log_warning/3, + log_error/3]). + +%% temp +-export([connection_error/6]). + +-import(rabbit_misc, [pget/2]). + +-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000). + +%%---------------------------------------------------------------------------- + +start_conn_ch(Fun, OUpstream, OUParams, + XorQName = #resource{virtual_host = DownVHost}, State) -> + + Upstream = rabbit_federation_util:deobfuscate_upstream(OUpstream), + UParams = rabbit_federation_util:deobfuscate_upstream_params(OUParams), + + ConnName = get_connection_name(Upstream, UParams), + case open_monitor(#amqp_params_direct{virtual_host = DownVHost}, ConnName) of + {ok, DConn, DCh} -> + case Upstream#upstream.ack_mode of + 'on-confirm' -> + #'confirm.select_ok'{} = + amqp_channel:call(DCh, #'confirm.select'{}), + amqp_channel:register_confirm_handler(DCh, self()); + _ -> + ok + end, + case open_monitor(UParams#upstream_params.params, ConnName) of + {ok, Conn, Ch} -> + %% Don't trap exits until we have established + %% connections so that if we try to delete + %% federation upstreams while waiting for a + %% connection to be established then we don't + %% block + process_flag(trap_exit, true), + try + R = Fun(Conn, Ch, DConn, DCh), + log_info( + XorQName, "connected to ~s~n", + [rabbit_federation_upstream:params_to_string( + UParams)]), + Name = pget(name, amqp_connection:info(DConn, [name])), + rabbit_federation_status:report( + OUpstream, OUParams, XorQName, {running, Name}), + R + catch exit:E -> + %% terminate/2 will not get this, as we + %% have not put them in our state yet + ensure_connection_closed(DConn), + ensure_connection_closed(Conn), + connection_error(remote_start, E, + OUpstream, OUParams, XorQName, State) + end; + E -> + ensure_connection_closed(DConn), + connection_error(remote_start, E, + OUpstream, OUParams, XorQName, State) + end; + E -> + connection_error(local_start, E, + OUpstream, OUParams, XorQName, State) + end. + +get_connection_name(#upstream{name = UpstreamName}, + #upstream_params{x_or_q = Resource}) when is_record(Resource, exchange)-> + Policy = Resource#exchange.policy, + PolicyName = proplists:get_value(name, Policy), + connection_name(UpstreamName, PolicyName); + +get_connection_name(#upstream{name = UpstreamName}, + #upstream_params{x_or_q = Resource}) when ?is_amqqueue(Resource) -> + Policy = amqqueue:get_policy(Resource), + PolicyName = proplists:get_value(name, Policy), + connection_name(UpstreamName, PolicyName); + +get_connection_name(_, _) -> + connection_name(undefined, undefined). + +connection_name(Upstream, Policy) when is_binary(Upstream), is_binary(Policy) -> + <<<<"Federation link (upstream: ">>/binary, Upstream/binary, <<", policy: ">>/binary, Policy/binary, <<")">>/binary>>; +connection_name(_, _) -> + <<"Federation link">>. + +open_monitor(Params, Name) -> + case open(Params, Name) of + {ok, Conn, Ch} -> erlang:monitor(process, Ch), + {ok, Conn, Ch}; + E -> E + end. + +open(Params, Name) -> + try + amqp_connection:start(Params, Name) + of + {ok, Conn} -> + try + amqp_connection:open_channel(Conn) + of + {ok, Ch} -> {ok, Conn, Ch}; + E -> ensure_connection_closed(Conn), + E + catch + _:E -> + ensure_connection_closed(Conn), + E + end; + E -> E + catch + _:E -> E + end. + +ensure_channel_closed(Ch) -> catch amqp_channel:close(Ch). + +ensure_connection_closed(Conn) -> + catch amqp_connection:close(Conn, ?MAX_CONNECTION_CLOSE_TIMEOUT). + +connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Message}}, _} = E, + Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(E)), + log_warning(XorQName, + "did not connect to ~s. Server has closed the connection due to an error, code: ~p, " + "message: ~s", + [rabbit_federation_upstream:params_to_string(UParams), + Code, Message]), + {stop, {shutdown, restart}, State}; + +connection_error(remote_start, E, Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(E)), + log_warning(XorQName, "did not connect to ~s. Reason: ~p", + [rabbit_federation_upstream:params_to_string(UParams), + E]), + {stop, {shutdown, restart}, State}; + +connection_error(remote, E, Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(E)), + log_info(XorQName, "disconnected from ~s~n~p", + [rabbit_federation_upstream:params_to_string(UParams), E]), + {stop, {shutdown, restart}, State}; + +connection_error(command_channel, E, Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(E)), + log_info(XorQName, "failed to open a command channel for upstream ~s~n~p", + [rabbit_federation_upstream:params_to_string(UParams), E]), + {stop, {shutdown, restart}, State}; + +connection_error(local, basic_cancel, Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, {error, basic_cancel}), + log_info(XorQName, "received a 'basic.cancel'", []), + {stop, {shutdown, restart}, State}; + +connection_error(local_start, E, Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(E)), + log_warning(XorQName, "did not connect locally~n~p", [E]), + {stop, {shutdown, restart}, State}. + +%% If we terminate due to a gen_server call exploding (almost +%% certainly due to an amqp_channel:call() exploding) then we do not +%% want to report the gen_server call in our status. +clean_reason({E = {shutdown, _}, _}) -> E; +clean_reason(E) -> E. + +%% local / disconnected never gets invoked, see handle_info({'DOWN', ... + +%%---------------------------------------------------------------------------- + +unacked_new() -> gb_trees:empty(). + +ack(#'basic.ack'{delivery_tag = Seq, + multiple = Multiple}, Ch, Unack) -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = gb_trees:get(Seq, Unack), + multiple = Multiple}), + remove_delivery_tags(Seq, Multiple, Unack). + + +%% Note: at time of writing the broker will never send requeue=false. And it's +%% hard to imagine why it would. But we may as well handle it. +nack(#'basic.nack'{delivery_tag = Seq, + multiple = Multiple, + requeue = Requeue}, Ch, Unack) -> + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = gb_trees:get(Seq, Unack), + multiple = Multiple, + requeue = Requeue}), + remove_delivery_tags(Seq, Multiple, Unack). + +remove_delivery_tags(Seq, false, Unacked) -> + gb_trees:delete(Seq, Unacked); +remove_delivery_tags(Seq, true, Unacked) -> + case gb_trees:is_empty(Unacked) of + true -> Unacked; + false -> {Smallest, _Val, Unacked1} = gb_trees:take_smallest(Unacked), + case Smallest > Seq of + true -> Unacked; + false -> remove_delivery_tags(Seq, true, Unacked1) + end + end. + +forward(#upstream{ack_mode = AckMode, + trust_user_id = Trust}, + #'basic.deliver'{delivery_tag = DT}, + Ch, DCh, PublishMethod, HeadersFun, ForwardFun, Msg, Unacked) -> + Headers = extract_headers(Msg), + case ForwardFun(Headers) of + true -> Msg1 = maybe_clear_user_id( + Trust, update_headers(HeadersFun(Headers), Msg)), + Seq = case AckMode of + 'on-confirm' -> amqp_channel:next_publish_seqno(DCh); + _ -> ignore + end, + amqp_channel:cast(DCh, PublishMethod, Msg1), + case AckMode of + 'on-confirm' -> + gb_trees:insert(Seq, DT, Unacked); + 'on-publish' -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT}), + Unacked; + 'no-ack' -> + Unacked + end; + false -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT}), + %% Drop it, but acknowledge it! + Unacked + end. + +maybe_clear_user_id(false, Msg = #amqp_msg{props = Props}) -> + Msg#amqp_msg{props = Props#'P_basic'{user_id = undefined}}; +maybe_clear_user_id(true, Msg) -> + Msg. + +extract_headers(#amqp_msg{props = #'P_basic'{headers = Headers}}) -> + Headers. + +update_headers(Headers, Msg = #amqp_msg{props = Props}) -> + Msg#amqp_msg{props = Props#'P_basic'{headers = Headers}}. + +%%---------------------------------------------------------------------------- + +%% If the downstream channel shuts down cleanly, we can just ignore it +%% - we're the same node, we're presumably about to go down too. +handle_downstream_down(shutdown, _Args, State) -> + {noreply, State}; + +handle_downstream_down(Reason, _Args, State) -> + {stop, {downstream_channel_down, Reason}, State}. + +%% If the upstream channel goes down for an intelligible reason, just +%% log it and die quietly. +handle_upstream_down({shutdown, Reason}, {Upstream, UParams, XName}, State) -> + rabbit_federation_link_util:connection_error( + remote, {upstream_channel_down, Reason}, Upstream, UParams, XName, State); + +handle_upstream_down(Reason, _Args, State) -> + {stop, {upstream_channel_down, Reason}, State}. + +%%---------------------------------------------------------------------------- + +log_terminate(gone, _Upstream, _UParams, _XorQName) -> + %% the link cannot start, this has been logged already + ok; +log_terminate({shutdown, restart}, _Upstream, _UParams, _XorQName) -> + %% We've already logged this before munging the reason + ok; +log_terminate(shutdown, Upstream, UParams, XorQName) -> + %% The supervisor is shutting us down; we are probably restarting + %% the link because configuration has changed. So try to shut down + %% nicely so that we do not cause unacked messages to be + %% redelivered. + log_info(XorQName, "disconnecting from ~s~n", + [rabbit_federation_upstream:params_to_string(UParams)]), + rabbit_federation_status:remove(Upstream, XorQName); + +log_terminate(Reason, Upstream, UParams, XorQName) -> + %% Unexpected death. sasl will log it, but we should update + %% rabbit_federation_status. + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(Reason)). + +log_debug(XorQName, Fmt, Args) -> log(debug, XorQName, Fmt, Args). +log_info(XorQName, Fmt, Args) -> log(info, XorQName, Fmt, Args). +log_warning(XorQName, Fmt, Args) -> log(warning, XorQName, Fmt, Args). +log_error(XorQName, Fmt, Args) -> log(error, XorQName, Fmt, Args). + +log(Level, XorQName, Fmt0, Args0) -> + Fmt = "Federation ~s " ++ Fmt0, + Args = [rabbit_misc:rs(XorQName) | Args0], + case Level of + debug -> rabbit_log_federation:debug(Fmt, Args); + info -> rabbit_log_federation:info(Fmt, Args); + warning -> rabbit_log_federation:warning(Fmt, Args); + error -> rabbit_log_federation:error(Fmt, Args) + end. + +%%---------------------------------------------------------------------------- + +disposable_channel_call(Conn, Method) -> + disposable_channel_call(Conn, Method, fun(_, _) -> ok end). + +disposable_channel_call(Conn, Method, ErrFun) -> + try + {ok, Ch} = amqp_connection:open_channel(Conn), + try + amqp_channel:call(Ch, Method) + catch exit:{{shutdown, {server_initiated_close, Code, Message}}, _} -> + ErrFun(Code, Message) + after + ensure_channel_closed(Ch) + end + catch + Exception:Reason -> + rabbit_log_federation:error("Federation link could not create a disposable (one-off) channel due to an error ~p: ~p~n", [Exception, Reason]) + end. + +disposable_connection_call(Params, Method, ErrFun) -> + try + rabbit_log_federation:debug("Disposable connection parameters: ~p", [Params]), + case open(Params, <<"Disposable exchange federation link connection">>) of + {ok, Conn, Ch} -> + try + amqp_channel:call(Ch, Method) + catch exit:{{shutdown, {connection_closing, {server_initiated_close, Code, Message}}}, _} -> + ErrFun(Code, Message); + exit:{{shutdown, {server_initiated_close, Code, Message}}, _} -> + ErrFun(Code, Message) + after + ensure_connection_closed(Conn) + end; + {error, {auth_failure, Message}} -> + rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection " + "due to an authentication failure: ~s~n", [Message]); + Error -> + rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection, " + "reason: ~p~n", [Error]), + Error + end + catch + Exception:Reason -> + rabbit_log_federation:error("Federation link could not create a disposable (one-off) connection " + "due to an error ~p: ~p~n", [Exception, Reason]) + end. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl new file mode 100644 index 0000000000..928e41dc0f --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl @@ -0,0 +1,139 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_parameters). +-behaviour(rabbit_runtime_parameter). +-behaviour(rabbit_policy_validator). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([validate/5, notify/5, notify_clear/4]). +-export([register/0, unregister/0, validate_policy/1, adjust/1]). + +-define(RUNTIME_PARAMETERS, + [{runtime_parameter, <<"federation">>}, + {runtime_parameter, <<"federation-upstream">>}, + {runtime_parameter, <<"federation-upstream-set">>}, + {policy_validator, <<"federation-upstream">>}, + {policy_validator, <<"federation-upstream-pattern">>}, + {policy_validator, <<"federation-upstream-set">>}]). + +-rabbit_boot_step({?MODULE, + [{description, "federation parameters"}, + {mfa, {rabbit_federation_parameters, register, []}}, + {requires, rabbit_registry}, + {cleanup, {rabbit_federation_parameters, unregister, []}}, + {enables, recovery}]}). + +register() -> + [rabbit_registry:register(Class, Name, ?MODULE) || + {Class, Name} <- ?RUNTIME_PARAMETERS], + ok. + +unregister() -> + [rabbit_registry:unregister(Class, Name) || + {Class, Name} <- ?RUNTIME_PARAMETERS], + ok. + +validate(_VHost, <<"federation-upstream-set">>, Name, Term0, _User) -> + Term = [rabbit_data_coercion:to_proplist(Upstream) || Upstream <- Term0], + [rabbit_parameter_validation:proplist( + Name, + [{<<"upstream">>, fun rabbit_parameter_validation:binary/2, mandatory} | + shared_validation()], Upstream) + || Upstream <- Term]; + +validate(_VHost, <<"federation-upstream">>, Name, Term0, _User) -> + Term = rabbit_data_coercion:to_proplist(Term0), + rabbit_parameter_validation:proplist( + Name, [{<<"uri">>, fun validate_uri/2, mandatory} | + shared_validation()], Term); + +validate(_VHost, _Component, Name, _Term, _User) -> + {error, "name not recognised: ~p", [Name]}. + +notify(_VHost, <<"federation-upstream-set">>, Name, _Term, _Username) -> + adjust({upstream_set, Name}); + +notify(_VHost, <<"federation-upstream">>, Name, _Term, _Username) -> + adjust({upstream, Name}). + +notify_clear(_VHost, <<"federation-upstream-set">>, Name, _Username) -> + adjust({clear_upstream_set, Name}); + +notify_clear(VHost, <<"federation-upstream">>, Name, _Username) -> + rabbit_federation_exchange_link_sup_sup:adjust({clear_upstream, VHost, Name}), + rabbit_federation_queue_link_sup_sup:adjust({clear_upstream, VHost, Name}). + +adjust(Thing) -> + rabbit_federation_exchange_link_sup_sup:adjust(Thing), + rabbit_federation_queue_link_sup_sup:adjust(Thing). + +%%---------------------------------------------------------------------------- + +shared_validation() -> + [{<<"exchange">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"queue">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"consumer-tag">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"reconnect-delay">>,fun rabbit_parameter_validation:number/2, optional}, + {<<"max-hops">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"expires">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"message-ttl">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"trust-user-id">>, fun rabbit_parameter_validation:boolean/2, optional}, + {<<"ack-mode">>, rabbit_parameter_validation:enum( + ['no-ack', 'on-publish', 'on-confirm']), optional}, + {<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum(['default', 'never']), optional}, + {<<"ha-policy">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional}]. + +validate_uri(Name, Term) when is_binary(Term) -> + case rabbit_parameter_validation:binary(Name, Term) of + ok -> case amqp_uri:parse(binary_to_list(Term)) of + {ok, _} -> ok; + {error, E} -> {error, "\"~s\" not a valid URI: ~p", [Term, E]} + end; + E -> E + end; +validate_uri(Name, Term) -> + case rabbit_parameter_validation:list(Name, Term) of + ok -> case [V || U <- Term, + V <- [validate_uri(Name, U)], + element(1, V) =:= error] of + [] -> ok; + [E | _] -> E + end; + E -> E + end. + +%%---------------------------------------------------------------------------- + +validate_policy([{<<"federation-upstream-set">>, Value}]) + when is_binary(Value) -> + ok; +validate_policy([{<<"federation-upstream-set">>, Value}]) -> + {error, "~p is not a valid federation upstream set name", [Value]}; + +validate_policy([{<<"federation-upstream-pattern">>, Value}]) + when is_binary(Value) -> + case re:compile(Value) of + {ok, _} -> ok; + {error, Reason} -> {error, "could not compile pattern ~s to a regular expression. " + "Error: ~p", [Value, Reason]} + end; +validate_policy([{<<"federation-upstream-pattern">>, Value}]) -> + {error, "~p is not a valid federation upstream pattern name", [Value]}; + +validate_policy([{<<"federation-upstream">>, Value}]) + when is_binary(Value) -> + ok; +validate_policy([{<<"federation-upstream">>, Value}]) -> + {error, "~p is not a valid federation upstream name", [Value]}; + +validate_policy(L) when length(L) >= 2 -> + {error, "cannot specify federation-upstream, federation-upstream-set " + "or federation-upstream-pattern together", []}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue.erl new file mode 100644 index 0000000000..3117792589 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue.erl @@ -0,0 +1,111 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_queue). + +-rabbit_boot_step({?MODULE, + [{description, "federation queue decorator"}, + {mfa, {rabbit_queue_decorator, register, + [<<"federation">>, ?MODULE]}}, + {requires, rabbit_registry}, + {cleanup, {rabbit_queue_decorator, unregister, + [<<"federation">>]}}, + {enables, recovery}]}). + +-include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-behaviour(rabbit_queue_decorator). + +-export([startup/1, shutdown/1, policy_changed/2, active_for/1, + consumer_state_changed/3]). +-export([policy_changed_local/2]). + +%%---------------------------------------------------------------------------- + +startup(Q) -> + case active_for(Q) of + true -> rabbit_federation_queue_link_sup_sup:start_child(Q); + false -> ok + end, + ok. + +shutdown(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + case active_for(Q) of + true -> rabbit_federation_queue_link_sup_sup:stop_child(Q), + rabbit_federation_status:remove_exchange_or_queue(QName); + false -> ok + end, + ok. + +policy_changed(Q1, Q2) when ?is_amqqueue(Q1) -> + QName = amqqueue:get_name(Q1), + case rabbit_amqqueue:lookup(QName) of + {ok, Q0} when ?is_amqqueue(Q0) -> + QPid = amqqueue:get_pid(Q0), + rpc:call(node(QPid), rabbit_federation_queue, + policy_changed_local, [Q1, Q2]); + {error, not_found} -> + ok + end. + +policy_changed_local(Q1, Q2) -> + shutdown(Q1), + startup(Q2). + +active_for(Q) -> + Args = amqqueue:get_arguments(Q), + case rabbit_misc:table_lookup(Args, <<"x-internal-purpose">>) of + {longstr, _} -> false; %% [0] + _ -> rabbit_federation_upstream:federate(Q) + end. +%% [0] Currently the only "internal purpose" is federation, but I +%% suspect if we introduce another one it will also be for something +%% that doesn't want to be federated. + +%% We need to reconsider whether we need to run or pause every time +%% the consumer state changes in the queue. But why can the state +%% change? +%% +%% consumer blocked | We may have no more active consumers, and thus need to +%% | pause +%% | +%% consumer unblocked | We don't care +%% | +%% queue empty | The queue has become empty therefore we need to run to +%% | get more messages +%% | +%% basic consume | We don't care +%% | +%% basic cancel | We may have no more active consumers, and thus need to +%% | pause +%% | +%% refresh | We asked for it (we have started a new link after +%% | failover and need something to prod us into action +%% | (or not)). +%% +%% In the cases where we don't care it's not prohibitively expensive +%% for us to be here anyway, so never mind. +%% +%% Note that there is no "queue became non-empty" state change - that's +%% because of the queue invariant. If the queue transitions from empty to +%% non-empty then it must have no active consumers - in which case it stays +%% the same from our POV. + +consumer_state_changed(Q, MaxActivePriority, IsEmpty) -> + QName = amqqueue:get_name(Q), + case IsEmpty andalso active_unfederated(MaxActivePriority) of + true -> rabbit_federation_queue_link:run(QName); + false -> rabbit_federation_queue_link:pause(QName) + end, + ok. + +active_unfederated(empty) -> false; +active_unfederated(P) when P >= 0 -> true; +active_unfederated(_P) -> false. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl new file mode 100644 index 0000000000..97389cb8f6 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl @@ -0,0 +1,330 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_queue_link). + +%% pg2 is deprecated in OTP 23. +-compile(nowarn_deprecated_function). + +-include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-behaviour(gen_server2). + +-export([start_link/1, go/0, run/1, pause/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-import(rabbit_misc, [pget/2]). +-import(rabbit_federation_util, [name/1, pgname/1]). + +-record(not_started, {queue, run, upstream, upstream_params}). +-record(state, {queue, run, conn, ch, dconn, dch, upstream, upstream_params, + unacked}). + +start_link(Args) -> + gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]). + +run(QName) -> cast(QName, run). +pause(QName) -> cast(QName, pause). +go() -> cast(go). + +%%---------------------------------------------------------------------------- +%%call(QName, Msg) -> [gen_server2:call(Pid, Msg, infinity) || Pid <- q(QName)]. +cast(Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- all()]. +cast(QName, Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- q(QName)]. + +join(Name) -> + pg2:create(pgname(Name)), + ok = pg2:join(pgname(Name), self()). + +all() -> + pg2:create(pgname(rabbit_federation_queues)), + pg2:get_members(pgname(rabbit_federation_queues)). + +q(QName) -> + pg2:create(pgname({rabbit_federation_queue, QName})), + pg2:get_members(pgname({rabbit_federation_queue, QName})). + +federation_up() -> + proplists:is_defined(rabbitmq_federation, + application:which_applications(infinity)). + +%%---------------------------------------------------------------------------- + +init({Upstream, Queue}) when ?is_amqqueue(Queue) -> + QName = amqqueue:get_name(Queue), + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + DeobfuscatedUpstream = rabbit_federation_util:deobfuscate_upstream(Upstream), + DeobfuscatedUParams = rabbit_federation_upstream:to_params(DeobfuscatedUpstream, Queue), + UParams = rabbit_federation_util:obfuscate_upstream_params(DeobfuscatedUParams), + rabbit_federation_status:report(Upstream, UParams, QName, starting), + join(rabbit_federation_queues), + join({rabbit_federation_queue, QName}), + gen_server2:cast(self(), maybe_go), + rabbit_amqqueue:notify_decorators(Q), + {ok, #not_started{queue = Queue, + run = false, + upstream = Upstream, + upstream_params = UParams}}; + {error, not_found} -> + rabbit_federation_link_util:log_warning(QName, "not found, stopping link~n", []), + {stop, gone} + end. + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(maybe_go, State) -> + case federation_up() of + true -> go(State); + false -> {noreply, State} + end; + +handle_cast(go, State = #not_started{}) -> + go(State); + +handle_cast(go, State) -> + {noreply, State}; + +handle_cast(run, State = #state{upstream = Upstream, + upstream_params = UParams, + ch = Ch, + run = false}) -> + consume(Ch, Upstream, UParams#upstream_params.x_or_q), + {noreply, State#state{run = true}}; + +handle_cast(run, State = #not_started{}) -> + {noreply, State#not_started{run = true}}; + +handle_cast(run, State) -> + %% Already started + {noreply, State}; + +handle_cast(pause, State = #state{run = false}) -> + %% Already paused + {noreply, State}; + +handle_cast(pause, State = #not_started{}) -> + {noreply, State#not_started{run = false}}; + +handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) -> + cancel(Ch, Upstream), + {noreply, State#state{run = false}}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(#'basic.consume_ok'{}, State) -> + {noreply, State}; + +handle_info(#'basic.ack'{} = Ack, State = #state{ch = Ch, + unacked = Unacked}) -> + Unacked1 = rabbit_federation_link_util:ack(Ack, Ch, Unacked), + {noreply, State#state{unacked = Unacked1}}; + +handle_info(#'basic.nack'{} = Nack, State = #state{ch = Ch, + unacked = Unacked}) -> + Unacked1 = rabbit_federation_link_util:nack(Nack, Ch, Unacked), + {noreply, State#state{unacked = Unacked1}}; + +handle_info({#'basic.deliver'{redelivered = Redelivered, + exchange = X, + routing_key = K} = DeliverMethod, Msg}, + State = #state{queue = Q, + upstream = Upstream, + upstream_params = UParams, + ch = Ch, + dch = DCh, + unacked = Unacked}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + PublishMethod = #'basic.publish'{exchange = <<"">>, + routing_key = QName#resource.name}, + HeadersFun = fun (H) -> update_headers(UParams, Redelivered, X, K, H) end, + ForwardFun = fun (_H) -> true end, + Unacked1 = rabbit_federation_link_util:forward( + Upstream, DeliverMethod, Ch, DCh, PublishMethod, + HeadersFun, ForwardFun, Msg, Unacked), + %% TODO actually we could reject when 'stopped' + {noreply, State#state{unacked = Unacked1}}; + +handle_info(#'basic.cancel'{}, + State = #state{queue = Q, + upstream = Upstream, + upstream_params = UParams}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + rabbit_federation_link_util:connection_error( + local, basic_cancel, Upstream, UParams, QName, State); + +handle_info({'DOWN', _Ref, process, Pid, Reason}, + State = #state{dch = DCh, + ch = Ch, + upstream = Upstream, + upstream_params = UParams, + queue = Q}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + handle_down(Pid, Reason, Ch, DCh, {Upstream, UParams, QName}, State); + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(Reason, #not_started{upstream = Upstream, + upstream_params = UParams, + queue = Q}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, QName), + ok; + +terminate(Reason, #state{dconn = DConn, + conn = Conn, + upstream = Upstream, + upstream_params = UParams, + queue = Q}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + rabbit_federation_link_util:ensure_connection_closed(DConn), + rabbit_federation_link_util:ensure_connection_closed(Conn), + rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, QName), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +go(S0 = #not_started{run = Run, + upstream = Upstream = #upstream{ + prefetch_count = Prefetch}, + upstream_params = UParams, + queue = Queue}) when ?is_amqqueue(Queue) -> + QName = amqqueue:get_name(Queue), + #upstream_params{x_or_q = UQueue} = UParams, + Durable = amqqueue:is_durable(UQueue), + AutoDelete = amqqueue:is_auto_delete(UQueue), + Args = amqqueue:get_arguments(UQueue), + Unacked = rabbit_federation_link_util:unacked_new(), + rabbit_federation_link_util:start_conn_ch( + fun (Conn, Ch, DConn, DCh) -> + check_upstream_suitable(Conn), + amqp_channel:call(Ch, #'queue.declare'{queue = name(UQueue), + durable = Durable, + auto_delete = AutoDelete, + arguments = Args}), + case Upstream#upstream.ack_mode of + 'no-ack' -> ok; + _ -> amqp_channel:call( + Ch, #'basic.qos'{prefetch_count = Prefetch}) + end, + amqp_selective_consumer:register_default_consumer(Ch, self()), + case Run of + true -> consume(Ch, Upstream, UQueue); + false -> ok + end, + {noreply, #state{queue = Queue, + run = Run, + conn = Conn, + ch = Ch, + dconn = DConn, + dch = DCh, + upstream = Upstream, + upstream_params = UParams, + unacked = Unacked}} + end, Upstream, UParams, QName, S0). + +check_upstream_suitable(Conn) -> + Props = pget(server_properties, + amqp_connection:info(Conn, [server_properties])), + {table, Caps} = rabbit_misc:table_lookup(Props, <<"capabilities">>), + case rabbit_misc:table_lookup(Caps, <<"consumer_priorities">>) of + {bool, true} -> ok; + _ -> exit({error, upstream_lacks_consumer_priorities}) + end. + +update_headers(UParams, Redelivered, X, K, undefined) -> + update_headers(UParams, Redelivered, X, K, []); + +update_headers(#upstream_params{table = Table}, Redelivered, X, K, Headers) -> + {Headers1, Count} = + case rabbit_misc:table_lookup(Headers, ?ROUTING_HEADER) of + undefined -> + %% We only want to record the original exchange and + %% routing key the first time a message gets + %% forwarded; after that it's known that they were + %% <<>> and QueueName respectively. + {init_x_original_source_headers(Headers, X, K), 0}; + {array, Been} -> + update_visit_count(Table, Been, Headers); + %% this means the header comes from the client + %% which re-published the message, most likely unintentionally. + %% We can't assume much about the value, so we simply ignore it. + _Other -> + {init_x_original_source_headers(Headers, X, K), 0} + end, + rabbit_basic:prepend_table_header( + ?ROUTING_HEADER, Table ++ [{<<"redelivered">>, bool, Redelivered}, + {<<"visit-count">>, long, Count + 1}], + swap_cc_header(Headers1)). + +init_x_original_source_headers(Headers, X, K) -> + rabbit_misc:set_table_value( + rabbit_misc:set_table_value( + Headers, <<"x-original-exchange">>, longstr, X), + <<"x-original-routing-key">>, longstr, K). + +update_visit_count(Table, Been, Headers) -> + {Found, Been1} = lists:partition( + fun(I) -> visit_match(I, Table) end, + Been), + C = case Found of + [] -> 0; + [{table, T}] -> case rabbit_misc:table_lookup( + T, <<"visit-count">>) of + {_, I} when is_number(I) -> I; + _ -> 0 + end + end, + {rabbit_misc:set_table_value( + Headers, ?ROUTING_HEADER, array, Been1), C}. + +swap_cc_header(Table) -> + [{case K of + <<"CC">> -> <<"x-original-cc">>; + _ -> K + end, T, V} || {K, T, V} <- Table]. + +visit_match({table, T}, Info) -> + lists:all(fun (K) -> + rabbit_misc:table_lookup(T, K) =:= + rabbit_misc:table_lookup(Info, K) + end, [<<"uri">>, <<"virtual_host">>, <<"queue">>]); +visit_match(_ ,_) -> + false. + +consumer_tag(#upstream{consumer_tag = ConsumerTag}) -> + ConsumerTag. + +consume(Ch, Upstream, UQueue) -> + ConsumerTag = consumer_tag(Upstream), + NoAck = Upstream#upstream.ack_mode =:= 'no-ack', + amqp_channel:cast( + Ch, #'basic.consume'{queue = name(UQueue), + no_ack = NoAck, + nowait = true, + consumer_tag = ConsumerTag, + arguments = [{<<"x-priority">>, long, -1}]}). + +cancel(Ch, Upstream) -> + ConsumerTag = consumer_tag(Upstream), + amqp_channel:cast(Ch, #'basic.cancel'{nowait = true, + consumer_tag = ConsumerTag}). + +handle_down(DCh, Reason, _Ch, DCh, Args, State) -> + rabbit_federation_link_util:handle_downstream_down(Reason, Args, State); +handle_down(Ch, Reason, Ch, _DCh, Args, State) -> + rabbit_federation_link_util:handle_upstream_down(Reason, Args, State). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl new file mode 100644 index 0000000000..1f6ec2b88f --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl @@ -0,0 +1,87 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_queue_link_sup_sup). + +-behaviour(mirrored_supervisor). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit/include/amqqueue.hrl"). +-define(SUPERVISOR, ?MODULE). + +%% Supervises the upstream links for all queues (but not exchanges). We need +%% different handling here since queues do not want a mirrored sup. + +-export([start_link/0, start_child/1, adjust/1, stop_child/1]). +-export([init/1]). + +%%---------------------------------------------------------------------------- + +start_link() -> + mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR, + fun rabbit_misc:execute_mnesia_transaction/1, + ?MODULE, []). + +%% Note that the next supervisor down, rabbit_federation_link_sup, is common +%% between exchanges and queues. +start_child(Q) -> + case mirrored_supervisor:start_child( + ?SUPERVISOR, + {id(Q), {rabbit_federation_link_sup, start_link, [Q]}, + transient, ?SUPERVISOR_WAIT, supervisor, + [rabbit_federation_link_sup]}) of + {ok, _Pid} -> ok; + {error, {already_started, _Pid}} -> + QueueName = amqqueue:get_name(Q), + rabbit_log_federation:warning("Federation link for queue ~p was already started", + [rabbit_misc:rs(QueueName)]), + ok; + %% A link returned {stop, gone}, the link_sup shut down, that's OK. + {error, {shutdown, _}} -> ok + end. + + +adjust({clear_upstream, VHost, UpstreamName}) -> + [rabbit_federation_link_sup:adjust(Pid, Q, {clear_upstream, UpstreamName}) || + {Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), + ?amqqueue_vhost_equals(Q, VHost)], + ok; +adjust(Reason) -> + [rabbit_federation_link_sup:adjust(Pid, Q, Reason) || + {Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], + ok. + +stop_child(Q) -> + case mirrored_supervisor:terminate_child(?SUPERVISOR, id(Q)) of + ok -> ok; + {error, Err} -> + QueueName = amqqueue:get_name(Q), + rabbit_log_federation:warning( + "Attempt to stop a federation link for queue ~p failed: ~p", + [rabbit_misc:rs(QueueName), Err]), + ok + end, + ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(Q)). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_one, 1200, 60}, []}}. + +%% Clean out all mutable aspects of the queue except policy. We need +%% to keep the entire queue around rather than just take its name +%% since we will want to know its policy to determine how to federate +%% it, and its immutable properties in case we want to redeclare it +%% upstream. We don't just take its name and look it up again since +%% that would introduce race conditions when policies change +%% frequently. Note that since we take down all the links and start +%% again when policies change, the policy will always be correct, so +%% we don't clear it out here and can trust it. +id(Q) when ?is_amqqueue(Q) -> + Policy = amqqueue:get_policy(Q), + Q1 = rabbit_amqqueue:immutable(Q), + amqqueue:set_policy(Q1, Policy). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_status.erl b/deps/rabbitmq_federation/src/rabbit_federation_status.erl new file mode 100644 index 0000000000..04afec990d --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_status.erl @@ -0,0 +1,175 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_status). +-behaviour(gen_server). + +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-export([start_link/0]). + +-export([report/4, remove_exchange_or_queue/1, remove/2, status/0, lookup/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-import(rabbit_federation_util, [name/1]). + +-define(SERVER, ?MODULE). +-define(ETS_NAME, ?MODULE). + +-record(state, {}). +-record(entry, {key, uri, status, timestamp, id, supervisor, upstream}). + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +report(Upstream, UParams, XorQName, Status) -> + [Supervisor | _] = get('$ancestors'), + gen_server:cast(?SERVER, {report, Supervisor, Upstream, UParams, XorQName, + Status, calendar:local_time()}). + +remove_exchange_or_queue(XorQName) -> + gen_server:call(?SERVER, {remove_exchange_or_queue, XorQName}, infinity). + +remove(Upstream, XorQName) -> + gen_server:call(?SERVER, {remove, Upstream, XorQName}, infinity). + +status() -> + gen_server:call(?SERVER, status, infinity). + +lookup(Id) -> + gen_server:call(?SERVER, {lookup, Id}, infinity). + +init([]) -> + ?ETS_NAME = ets:new(?ETS_NAME, + [named_table, {keypos, #entry.key}, private]), + {ok, #state{}}. + +handle_call({remove_exchange_or_queue, XorQName}, _From, State) -> + [link_gone(Entry) + || Entry <- ets:match_object(?ETS_NAME, match_entry(xorqkey(XorQName)))], + {reply, ok, State}; + +handle_call({remove, Upstream, XorQName}, _From, State) -> + case ets:match_object(?ETS_NAME, match_entry(key(XorQName, Upstream))) of + [Entry] -> link_gone(Entry); + [] -> ok + end, + {reply, ok, State}; + +handle_call({lookup, Id}, _From, State) -> + Link = case ets:match_object(?ETS_NAME, match_id(Id)) of + [Entry] -> + [{key, Entry#entry.key}, + {uri, Entry#entry.uri}, + {status, Entry#entry.status}, + {timestamp, Entry#entry.timestamp}, + {id, Entry#entry.id}, + {supervisor, Entry#entry.supervisor}, + {upstream, Entry#entry.upstream}]; + [] -> not_found + end, + {reply, Link, State}; + +handle_call(status, _From, State) -> + Entries = ets:tab2list(?ETS_NAME), + {reply, [format(Entry) || Entry <- Entries], State}. + +handle_cast({report, Supervisor, Upstream, #upstream_params{safe_uri = URI}, + XorQName, Status, Timestamp}, State) -> + Key = key(XorQName, Upstream), + Entry = #entry{key = Key, + status = Status, + uri = URI, + timestamp = Timestamp, + supervisor = Supervisor, + upstream = Upstream, + id = unique_id(Key)}, + true = ets:insert(?ETS_NAME, Entry), + rabbit_event:notify(federation_link_status, format(Entry)), + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +format(#entry{status = Status, + uri = URI, + timestamp = Timestamp} = Entry) -> + identity(Entry) ++ split_status(Status) ++ [{uri, URI}, + {timestamp, Timestamp}]. + +identity(#entry{key = {#resource{virtual_host = VHost, + kind = Type, + name = XorQNameBin}, + UpstreamName, UXorQNameBin}, + id = Id, + upstream = #upstream{consumer_tag = ConsumerTag}}) -> + case Type of + exchange -> [{exchange, XorQNameBin}, + {upstream_exchange, UXorQNameBin}]; + queue -> [{queue, XorQNameBin}, + {upstream_queue, UXorQNameBin}, + {consumer_tag, ConsumerTag}] + end ++ [{type, Type}, + {vhost, VHost}, + {upstream, UpstreamName}, + {id, Id}]. + +unique_id(Key = {#resource{}, UpName, ResName}) when is_binary(UpName), is_binary(ResName) -> + PHash = erlang:phash2(Key, 1 bsl 32), + << << case N >= 10 of + true -> N - 10 + $a; + false -> N + $0 end >> + || <<N:4>> <= <<PHash:32>> >>. + +split_status({running, ConnName}) -> [{status, running}, + {local_connection, ConnName}]; +split_status({Status, Error}) -> [{status, Status}, + {error, Error}]; +split_status(Status) when is_atom(Status) -> [{status, Status}]. + +link_gone(Entry) -> + rabbit_event:notify(federation_link_removed, identity(Entry)), + true = ets:delete_object(?ETS_NAME, Entry). + +%% We don't want to key off the entire upstream, bits of it may change +key(XName = #resource{kind = exchange}, #upstream{name = UpstreamName, + exchange_name = UXNameBin}) -> + {XName, UpstreamName, UXNameBin}; + +key(QName = #resource{kind = queue}, #upstream{name = UpstreamName, + queue_name = UQNameBin}) -> + {QName, UpstreamName, UQNameBin}. + +xorqkey(XorQName) -> + {XorQName, '_', '_'}. + +match_entry(Key) -> + #entry{key = Key, + uri = '_', + status = '_', + timestamp = '_', + id = '_', + supervisor = '_', + upstream = '_'}. + +match_id(Id) -> + #entry{key = '_', + uri = '_', + status = '_', + timestamp = '_', + id = Id, + supervisor = '_', + upstream = '_'}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_sup.erl new file mode 100644 index 0000000000..d3642b52c2 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_sup.erl @@ -0,0 +1,63 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_sup). + +-behaviour(supervisor). + +%% Supervises everything. There is just one of these. + +-include_lib("rabbit_common/include/rabbit.hrl"). +-define(SUPERVISOR, rabbit_federation_sup). + +-export([start_link/0, stop/0]). + +-export([init/1]). + +%% This supervisor needs to be part of the rabbit application since +%% a) it needs to be in place when exchange recovery takes place +%% b) it needs to go up and down with rabbit + +-rabbit_boot_step({rabbit_federation_supervisor, + [{description, "federation"}, + {mfa, {rabbit_sup, start_child, [?MODULE]}}, + {requires, kernel_ready}, + {cleanup, {?MODULE, stop, []}}, + {enables, rabbit_federation_exchange}, + {enables, rabbit_federation_queue}]}). + +%%---------------------------------------------------------------------------- + +start_link() -> + R = supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []), + rabbit_federation_event:add_handler(), + R. + +stop() -> + rabbit_federation_event:remove_handler(), + ok = supervisor:terminate_child(rabbit_sup, ?MODULE), + ok = supervisor:delete_child(rabbit_sup, ?MODULE). + +%%---------------------------------------------------------------------------- + +init([]) -> + Status = {status, {rabbit_federation_status, start_link, []}, + transient, ?WORKER_WAIT, worker, + [rabbit_federation_status]}, + XLinkSupSup = {x_links, + {rabbit_federation_exchange_link_sup_sup, start_link, []}, + transient, ?SUPERVISOR_WAIT, supervisor, + [rabbit_federation_exchange_link_sup_sup]}, + QLinkSupSup = {q_links, + {rabbit_federation_queue_link_sup_sup, start_link, []}, + transient, ?SUPERVISOR_WAIT, supervisor, + [rabbit_federation_queue_link_sup_sup]}, + %% with default reconnect-delay of 5 second, this supports up to + %% 100 links constantly failing and being restarted a minute + %% (or 200 links if reconnect-delay is 10 seconds, 600 with 30 seconds, + %% etc: N * (60/reconnect-delay) <= 1200) + {ok, {{one_for_one, 1200, 60}, [Status, XLinkSupSup, QLinkSupSup]}}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl new file mode 100644 index 0000000000..e079b850b5 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -0,0 +1,164 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_upstream). + +-include("rabbit_federation.hrl"). +-include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-export([federate/1, for/1, for/2, params_to_string/1, to_params/2]). +%% For testing +-export([from_set/2, from_pattern/2, remove_credentials/1]). + +-import(rabbit_misc, [pget/2, pget/3]). +-import(rabbit_federation_util, [name/1, vhost/1, r/1]). +-import(rabbit_data_coercion, [to_atom/1]). + +%%---------------------------------------------------------------------------- + +federate(XorQ) -> + rabbit_policy:get(<<"federation-upstream">>, XorQ) =/= undefined orelse + rabbit_policy:get(<<"federation-upstream-set">>, XorQ) =/= undefined orelse + rabbit_policy:get(<<"federation-upstream-pattern">>, XorQ) =/= undefined. + +for(XorQ) -> + case federate(XorQ) of + false -> []; + true -> from_set_contents(upstreams(XorQ), XorQ) + end. + +for(XorQ, UpstreamName) -> + case federate(XorQ) of + false -> []; + true -> rabbit_federation_util:find_upstreams( + UpstreamName, from_set_contents(upstreams(XorQ), XorQ)) + end. + +upstreams(XorQ) -> + UName = rabbit_policy:get(<<"federation-upstream">>, XorQ), + USetName = rabbit_policy:get(<<"federation-upstream-set">>, XorQ), + UPatternValue = rabbit_policy:get(<<"federation-upstream-pattern">>, XorQ), + %% Cannot define 2 at a time, see rabbit_federation_parameters:validate_policy/1 + case {UName, USetName, UPatternValue} of + {undefined, undefined, undefined} -> []; + {undefined, undefined, _} -> find_contents(UPatternValue, vhost(XorQ)); + {undefined, _, undefined} -> set_contents(USetName, vhost(XorQ)); + {_, undefined, undefined} -> [[{<<"upstream">>, UName}]] + end. + +params_table(SafeURI, XorQ) -> + Key = case XorQ of + #exchange{} -> <<"exchange">>; + Q when ?is_amqqueue(Q) -> <<"queue">> + end, + [{<<"uri">>, longstr, SafeURI}, + {Key, longstr, name(XorQ)}]. + +params_to_string(#upstream_params{safe_uri = SafeURI, + x_or_q = XorQ}) -> + print("~s on ~s", [rabbit_misc:rs(r(XorQ)), SafeURI]). + +remove_credentials(URI) -> + list_to_binary(amqp_uri:remove_credentials(binary_to_list(URI))). + +to_params(Upstream = #upstream{uris = URIs}, XorQ) -> + URI = lists:nth(rand:uniform(length(URIs)), URIs), + {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(XorQ)), + XorQ1 = with_name(Upstream, vhost(Params), XorQ), + SafeURI = remove_credentials(URI), + #upstream_params{params = Params, + uri = URI, + x_or_q = XorQ1, + safe_uri = SafeURI, + table = params_table(SafeURI, XorQ)}. + +print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)). + +from_set(SetName, XorQ) -> + from_set_contents(set_contents(SetName, vhost(XorQ)), XorQ). + +from_pattern(SetName, XorQ) -> + from_set_contents(find_contents(SetName, vhost(XorQ)), XorQ). + +set_contents(<<"all">>, VHost) -> + Upstreams0 = rabbit_runtime_parameters:list( + VHost, <<"federation-upstream">>), + Upstreams = [rabbit_data_coercion:to_list(U) || U <- Upstreams0], + [[{<<"upstream">>, pget(name, U)}] || U <- Upstreams]; + +set_contents(SetName, VHost) -> + case rabbit_runtime_parameters:value( + VHost, <<"federation-upstream-set">>, SetName) of + not_found -> []; + Set -> Set + end. + +find_contents(RegExp, VHost) -> + Upstreams0 = rabbit_runtime_parameters:list( + VHost, <<"federation-upstream">>), + Upstreams = [rabbit_data_coercion:to_list(U) || U <- Upstreams0, + re:run(pget(name, U), RegExp) =/= nomatch], + [[{<<"upstream">>, pget(name, U)}] || U <- Upstreams]. + +from_set_contents(Set, XorQ) -> + Results = [from_set_element(P, XorQ) || P <- Set], + [R || R <- Results, R =/= not_found]. + +from_set_element(UpstreamSetElem0, XorQ) -> + UpstreamSetElem = rabbit_data_coercion:to_proplist(UpstreamSetElem0), + Name = bget(upstream, UpstreamSetElem, []), + case rabbit_runtime_parameters:value( + vhost(XorQ), <<"federation-upstream">>, Name) of + not_found -> not_found; + Upstream -> from_upstream_or_set( + UpstreamSetElem, Name, Upstream, XorQ) + end. + +from_upstream_or_set(US, Name, U, XorQ) -> + URIParam = bget(uri, US, U), + URIs = case URIParam of + B when is_binary(B) -> [B]; + L when is_list(L) -> L + end, + #upstream{uris = URIs, + exchange_name = bget(exchange, US, U, name(XorQ)), + queue_name = bget(queue, US, U, name(XorQ)), + consumer_tag = bget('consumer-tag', US, U, <<"federation-link-", Name/binary>>), + prefetch_count = bget('prefetch-count', US, U, ?DEF_PREFETCH), + reconnect_delay = bget('reconnect-delay', US, U, 5), + max_hops = bget('max-hops', US, U, 1), + expires = bget(expires, US, U, none), + message_ttl = bget('message-ttl', US, U, none), + trust_user_id = bget('trust-user-id', US, U, false), + ack_mode = to_atom(bget('ack-mode', US, U, <<"on-confirm">>)), + ha_policy = bget('ha-policy', US, U, none), + name = Name, + bind_nowait = bget('bind-nowait', US, U, false), + resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>))}. + +%%---------------------------------------------------------------------------- + +bget(K, L1, L2) -> bget(K, L1, L2, undefined). + +bget(K0, L1, L2, D) -> + K = a2b(K0), + %% coerce maps to proplists + PL1 = rabbit_data_coercion:to_list(L1), + PL2 = rabbit_data_coercion:to_list(L2), + case pget(K, PL1, undefined) of + undefined -> pget(K, PL2, D); + Result -> Result + end. + +a2b(A) -> list_to_binary(atom_to_list(A)). + +with_name(#upstream{exchange_name = XNameBin}, VHostBin, X = #exchange{}) -> + X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)}; + +with_name(#upstream{queue_name = QNameBin}, VHostBin, Q) when ?is_amqqueue(Q) -> + amqqueue:set_name(Q, rabbit_misc:r(VHostBin, queue, QNameBin)). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl new file mode 100644 index 0000000000..6018dd90a5 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl @@ -0,0 +1,75 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_upstream_exchange). + +-rabbit_boot_step({?MODULE, + [{description, "federation upstream exchange type"}, + {mfa, {rabbit_registry, register, + [exchange, <<"x-federation-upstream">>, ?MODULE]}}, + {requires, rabbit_registry}, + {cleanup, {rabbit_registry, unregister, + [exchange, <<"x-federation-upstream">>]}}, + {enables, recovery}]}). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("rabbit_federation.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, serialise_events/0, route/2]). +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, + add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([info/1, info/2]). + +%%---------------------------------------------------------------------------- + +info(_X) -> []. +info(_X, _) -> []. + +description() -> + [{description, <<"Federation upstream helper exchange">>}, + {internal_purpose, federation}]. + +serialise_events() -> false. + +route(X = #exchange{arguments = Args}, + D = #delivery{message = #basic_message{content = Content}}) -> + %% This arg was introduced in the same release as this exchange type; + %% it must be set + {long, MaxHops} = rabbit_misc:table_lookup(Args, ?MAX_HOPS_ARG), + %% Will be missing for pre-3.3.0 versions + DName = case rabbit_misc:table_lookup(Args, ?DOWNSTREAM_NAME_ARG) of + {longstr, Val0} -> Val0; + _ -> unknown + end, + %% Will be missing for pre-3.8.9 versions + DVhost = case rabbit_misc:table_lookup(Args, ?DOWNSTREAM_VHOST_ARG) of + {longstr, Val1} -> Val1; + _ -> unknown + end, + Headers = rabbit_basic:extract_headers(Content), + case rabbit_federation_util:should_forward(Headers, MaxHops, DName, DVhost) of + true -> rabbit_exchange_type_fanout:route(X, D); + false -> [] + end. + +validate(#exchange{arguments = Args}) -> + rabbit_federation_util:validate_arg(?MAX_HOPS_ARG, long, Args). + +validate_binding(_X, _B) -> ok. +create(_Tx, _X) -> ok. +delete(_Tx, _X, _Bs) -> ok. +policy_changed(_X1, _X2) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. + +assert_args_equivalence(X = #exchange{name = Name, + arguments = Args}, ReqArgs) -> + rabbit_misc:assert_args_equivalence(Args, ReqArgs, Name, [?MAX_HOPS_ARG]), + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_util.erl b/deps/rabbitmq_federation/src/rabbit_federation_util.erl new file mode 100644 index 0000000000..160bac996e --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_util.erl @@ -0,0 +1,102 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_util). + +-include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-export([should_forward/4, find_upstreams/2, already_seen/3]). +-export([validate_arg/3, fail/2, name/1, vhost/1, r/1, pgname/1]). +-export([obfuscate_upstream/1, deobfuscate_upstream/1, obfuscate_upstream_params/1, deobfuscate_upstream_params/1]). + +-import(rabbit_misc, [pget_or_die/2, pget/3]). + +%%---------------------------------------------------------------------------- + +should_forward(undefined, _MaxHops, _DName, _DVhost) -> + true; +should_forward(Headers, MaxHops, DName, DVhost) -> + case rabbit_misc:table_lookup(Headers, ?ROUTING_HEADER) of + {array, A} -> length(A) < MaxHops andalso not already_seen(DName, DVhost, A); + _ -> true + end. + +%% Used to detect message and binding forwarding cycles. +already_seen(UpstreamID, UpstreamVhost, Array) -> + lists:any(fun ({table, T}) -> + {longstr, UpstreamID} =:= rabbit_misc:table_lookup(T, <<"cluster-name">>) andalso + {longstr, UpstreamVhost} =:= rabbit_misc:table_lookup(T, <<"vhost">>); + (_) -> + false + end, Array). + +find_upstreams(Name, Upstreams) -> + [U || U = #upstream{name = Name2} <- Upstreams, + Name =:= Name2]. + +validate_arg(Name, Type, Args) -> + case rabbit_misc:table_lookup(Args, Name) of + {Type, _} -> ok; + undefined -> fail("Argument ~s missing", [Name]); + _ -> fail("Argument ~s must be of type ~s", [Name, Type]) + end. + +-spec fail(io:format(), [term()]) -> no_return(). + +fail(Fmt, Args) -> rabbit_misc:protocol_error(precondition_failed, Fmt, Args). + +name( #resource{name = XorQName}) -> XorQName; +name(#exchange{name = #resource{name = XName}}) -> XName; +name(Q) when ?is_amqqueue(Q) -> #resource{name = QName} = amqqueue:get_name(Q), QName. + +vhost( #resource{virtual_host = VHost}) -> VHost; +vhost(#exchange{name = #resource{virtual_host = VHost}}) -> VHost; +vhost(Q) when ?is_amqqueue(Q) -> #resource{virtual_host = VHost} = amqqueue:get_name(Q), VHost; +vhost(#amqp_params_direct{virtual_host = VHost}) -> VHost; +vhost(#amqp_params_network{virtual_host = VHost}) -> VHost. + +r(#exchange{name = XName}) -> XName; +r(Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q). + +pgname(Name) -> + case application:get_env(rabbitmq_federation, pgroup_name_cluster_id) of + {ok, false} -> Name; + {ok, true} -> {rabbit_nodes:cluster_name(), Name}; + %% default value is 'false', so do the same thing + {ok, undefined} -> Name; + _ -> Name + end. + +obfuscate_upstream(#upstream{uris = Uris} = Upstream) -> + Upstream#upstream{uris = [credentials_obfuscation:encrypt(Uri) || Uri <- Uris]}. + +obfuscate_upstream_params(#upstream_params{uri = Uri, params = #amqp_params_network{password = Password} = Params} = UParams) -> + UParams#upstream_params{ + uri = credentials_obfuscation:encrypt(Uri), + params = Params#amqp_params_network{password = credentials_obfuscation:encrypt(rabbit_data_coercion:to_binary(Password))} + }; +obfuscate_upstream_params(#upstream_params{uri = Uri, params = #amqp_params_direct{password = Password} = Params} = UParams) -> + UParams#upstream_params{ + uri = credentials_obfuscation:encrypt(Uri), + params = Params#amqp_params_direct{password = credentials_obfuscation:encrypt(rabbit_data_coercion:to_binary(Password))} + }. + +deobfuscate_upstream(#upstream{uris = EncryptedUris} = Upstream) -> + Upstream#upstream{uris = [credentials_obfuscation:decrypt(EncryptedUri) || EncryptedUri <- EncryptedUris]}. + +deobfuscate_upstream_params(#upstream_params{uri = EncryptedUri, params = #amqp_params_network{password = EncryptedPassword} = Params} = UParams) -> + UParams#upstream_params{ + uri = credentials_obfuscation:decrypt(EncryptedUri), + params = Params#amqp_params_network{password = credentials_obfuscation:decrypt(EncryptedPassword)} + }; +deobfuscate_upstream_params(#upstream_params{uri = EncryptedUri, params = #amqp_params_direct{password = EncryptedPassword} = Params} = UParams) -> + UParams#upstream_params{ + uri = credentials_obfuscation:decrypt(EncryptedUri), + params = Params#amqp_params_direct{password = credentials_obfuscation:decrypt(EncryptedPassword)} + }. |