diff options
Diffstat (limited to 'deps/rabbitmq_federation')
41 files changed, 14662 insertions, 0 deletions
diff --git a/deps/rabbitmq_federation/.gitignore b/deps/rabbitmq_federation/.gitignore
new file mode 100644
index 0000000000..f05d110de3
--- /dev/null
+++ b/deps/rabbitmq_federation/.gitignore Project
maintainers who do not follow or enforce the Code of Conduct may be permanently removed
from the project team.

This Code of Conduct applies both within project spaces and in public spaces when an
individual is representing the project or its community. This starts up a bunch of link processes +(one for each upstream) which: + + * Connect to the upstream broker + * Create a queue and bind it to the upstream exchange + * Keep bindings in sync with the downstream exchange + * Consume messages from the upstream queue and republish them to the + downstream exchange (matching confirms with acks) + +Each link process monitors the connections / channels it opens, and +dies if they do. We use a supervisor2 to ensure that we get some +backoff when restarting. + +We use process groups to identify all link processes for a certain +exchange, as well as all link processes together. + +However, there are a bunch of wrinkles: + + +Wrinkle: The exchange will be recovered when the Erlang client is not available +=============================================================================== + +Exchange recovery happens within the rabbit application - therefore at +the time that the exchange is recovered, we can't make any connections +since the amqp_client application has not yet started. Each link +therefore initially has a state 'not_started'. When it is created it +checks to see if the rabbitmq_federation application is running. If +so, it starts fully. If not, it goes into the 'not_started' +state. When rabbitmq_federation starts, it sends a 'go' message to all +links, prodding them to bring up the link. + + +Wrinkle: On reconnect we want to assert bindings atomically +=========================================================== + +If the link goes down for whatever reason, then by the time it comes +up again the bindings downstream may no longer be in sync with those +upstream. Therefore on link establishment we want to ensure that a +certain set of bindings exists. (Of course bringing up a link for the +first time is a simple case of this.) And we want to do this with AMQP +methods. But if we were to tear down all bindings and recreate them, +we would have a time period when messages would not be forwarded for +bindings that *do* still exist before and after. + +We use exchange to exchange bindings to work around this: + +We bind the upstream exchange (X) to the upstream queue (Q) via an +internal fanout exchange (IXA) like so: (routing keys R1 and R2): + + X----R1,R2--->IXA---->Q + +This has the same effect as binding the queue to the exchange directly. + +Now imagine the link has gone down, and is about to be +reestablished. In the meanwhile, routing has changed downstream so +that we now want routing keys R1 and R3. On link reconnection we can +create and bind another internal fanout exchange IXB: + + X----R1,R2--->IXA---->Q + | ^ + | | + \----R1,R3--->IXB-----/ + +and then delete the original exchange IXA: + + X Q + | ^ + | | + \----R1,R3--->IXB-----/ + +This means that messages matching R1 are always routed during the +switchover. Messages for R3 will start being routed as soon as we bind +the second exchange, and messages for R2 will be stopped in a timely +way. Of course this could lag the downstream situation somewhat, in +which case some R2 messages will get thrown away downstream since they +are unroutable. However this lag is inevitable when the link goes +down. + +This means that the downstream only needs to keep track of whether the +upstream is currently going via internal exchange A or B. This is +held in the exchange scratch space in Mnesia. + + +Wrinkle: We need to amalgamate bindings +======================================= + +Since we only bind to one exchange upstream, but the downstream +exchange can be bound to many queues, we can have duplicated bindings +downstream (same source, routing key and args but different +destination) that cannot be duplicated upstream (since the destination +is the same). The link therefore maintains a mapping of (Key, Args) to +set(Dest). Duplicated bindings do not get repeated upstream, and are +only unbound upstream when the last one goes away downstream. + +Furthermore, this works as an optimisation since this will tend to +reduce upstream binding count and churn. + + +Wrinkle: We may receive binding events out of order +=================================================== + +The rabbit_federation_exchange callbacks are invoked by channel +processes within rabbit. Therefore they can be executed concurrently, +and can arrive at the link processes in an order that does not +correspond to the wall clock. + +We need to keep the state of the link in sync with Mnesia. Therefore +not only do we need to impose an ordering on these events, we need to +impose Mnesia's ordering on them. We therefore added a function to the +callback interface, serialise_events. When this returns true, the +callback mechanism inside rabbit increments a per-exchange counter +within an Mnesia transaction, and returns the value as part of the +add_binding and remove_binding callbacks. The link process then queues +up these events, and replays them in order. The link process's state +thus always follows Mnesia (it may be delayed, but the effects happen +in the same order). + + +Other issues +============ + +Since links are implemented in terms of AMQP, link failure may cause +messages to be redelivered. If you're unlucky this could lead to +duplication. + +Message duplication can also happen with some topologies. In some +cases it may not be possible to set max_hops such that messages arrive +once at every node. + +While we correctly order bind / unbind events, we don't do the same +thing for exchange creation / deletion. (This is harder - if you +delete and recreate an exchange with the same name, is it the same +exchange? What about if its type changes?) This would only be an issue +if exchanges churn rapidly; however we could get into a state where +Mnesia sees CDCD but we see CDDC and leave a process running when we +shouldn't. diff --git a/deps/rabbitmq_federation/README.md b/deps/rabbitmq_federation/README.md new file mode 100644 index 0000000000..efebf43d3a --- /dev/null +++ b/deps/rabbitmq_federation/README.md @@ -0,0 +1,25 @@ +## RabbitMQ Federation + +[![Build Status](https://travis-ci.org/rabbitmq/rabbitmq-federation.svg?branch=master)](https://travis-ci.org/rabbitmq/rabbitmq-federation) + +RabbitMQ federation offers a group of features for loosely +coupled and WAN-friendly distributed RabbitMQ setups. Note that +this is not an alternative to queue mirroring. + + +## Supported RabbitMQ Versions + +This plugin ships with RabbitMQ, there is no need to +install it separately. + + +## Documentation + +See [RabbitMQ federation plugin](https://www.rabbitmq.com/federation.html) on rabbitmq.com. + + +## License and Copyright + +Released under [the same license as RabbitMQ](https://www.rabbitmq.com/mpl.html). + +2007-2015 (c) 2007-2020 VMware, Inc. or its affiliates. diff --git a/deps/rabbitmq_federation/erlang.mk b/deps/rabbitmq_federation/erlang.mk new file mode 100644 index 0000000000..fce4be0b0a --- /dev/null +++ b/deps/rabbitmq_federation/erlang.mk @@ -0,0 +1,7808 @@ +# Copyright (c) 2013-2016, Loïc Hoguin <essen@ninenines.eu> +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. grep -qs ^$$dep$$ $(ERLANG_MK_RECURSIVE_TMP_LIST); then \
		if grep -qs -E "^[[:blank:]]*include[[:blank:]]+(erlang\.mk|.*/erlang\.mk|.*ERLANG_MK_FILENAME.*)$$" \
			$$dep/GNUmakefile $$dep/makefile $$dep/Makefile; then \
			$(MAKE) -C $$dep fetch-deps \
				IS_DEP=1 \
				ERLANG_MK_RECURSIVE_TMP_LIST=$(ERLANG_MK_RECURSIVE_TMP_LIST); \
		fi \
	fi \
done '{"federation-upstream-set": "upstream"}' +$CTL set_policy fed12 "^fed12\." '{"federation-upstream-set": "upstream12"}' +$CTL set_policy one "^two$" '{"federation-upstream-set": "one"}' +$CTL set_policy two "^one$" '{"federation-upstream-set": "two"}' +$CTL set_policy hare "^hare\." '{"federation-upstream-set": "upstream5673"}' +$CTL set_policy all "^all\." '{"federation-upstream-set": "all"}' +$CTL set_policy new "^new\." '{"federation-upstream-set": "new-set"}' diff --git a/deps/rabbitmq_federation/etc/setup-rabbit-test.sh b/deps/rabbitmq_federation/etc/setup-rabbit-test.sh new file mode 100755 index 0000000000..2e2282ee07 --- /dev/null +++ b/deps/rabbitmq_federation/etc/setup-rabbit-test.sh @@ -0,0 +1,2 @@ +#!/bin/sh -e +sh -e `dirname $0`/rabbit-test.sh "$DEPS_DIR/rabbit/scripts/rabbitmqctl -n $RABBITMQ_NODENAME" diff --git a/deps/rabbitmq_federation/include/rabbit_federation.hrl b/deps/rabbitmq_federation/include/rabbit_federation.hrl new file mode 100644 index 0000000000..af92e1aa25 --- /dev/null +++ b/deps/rabbitmq_federation/include/rabbit_federation.hrl @@ -0,0 +1,44 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-record(upstream, {uris, + exchange_name, + queue_name, + consumer_tag, + prefetch_count, + max_hops, + reconnect_delay, + expires, + message_ttl, + trust_user_id, + ack_mode, + ha_policy, + name, + bind_nowait, + resource_cleanup_mode}). + +-record(upstream_params, + {uri, + params, + x_or_q, + %% The next two can be derived from the above three, but we don't + %% want to do that every time we forward a message. + safe_uri, + table}). + +%% Name of the message header used to collect the hop (forwarding) path +%% metadata as the message is forwarded by exchange federation. +-define(ROUTING_HEADER, <<"x-received-from">>). +-define(BINDING_HEADER, <<"x-bound-from">>). +-define(MAX_HOPS_ARG, <<"x-max-hops">>). +%% Identifies a cluster, used by exchange federation cycle detection +-define(DOWNSTREAM_NAME_ARG, <<"x-downstream-name">>). +%% Identifies a virtual host, used by exchange federation cycle detection +-define(DOWNSTREAM_VHOST_ARG, <<"x-downstream-vhost">>). +-define(DEF_PREFETCH, 1000). + +-define(FEDERATION_GUIDE_URL, <<"https://rabbitmq.com/federation.html">>). diff --git a/deps/rabbitmq_federation/rabbitmq-components.mk b/deps/rabbitmq_federation/rabbitmq-components.mk new file mode 100644 index 0000000000..b2a3be8b35 --- /dev/null +++ b/deps/rabbitmq_federation/rabbitmq-components.mk @@ -0,0 +1,359 @@ +ifeq ($(.DEFAULT_GOAL),) +# Define default goal to `all` because this file defines some targets +# before the inclusion of erlang.mk leading to the wrong target becoming +# the default. +.DEFAULT_GOAL = all +endif + +# PROJECT_VERSION defaults to: +# 1. the version exported by rabbitmq-server-release; +# 2. the version stored in `git-revisions.txt`, if it exists; +# 3. a version based on git-describe(1), if it is a Git clone; +# 4. 0.0.0 + +PROJECT_VERSION := $(RABBITMQ_VERSION) + +ifeq ($(PROJECT_VERSION),) +PROJECT_VERSION := $(shell \ +if test -f git-revisions.txt; then \ + head -n1 git-revisions.txt | \ + awk '{print $$$(words $(PROJECT_DESCRIPTION) version);}'; \ +else \ + (git describe --dirty --abbrev=7 --tags --always --first-parent \ + 2>/dev/null || echo rabbitmq_v0_0_0) | \ + sed -e 's/^rabbitmq_v//' -e 's/^v//' -e 's/_/./g' -e 's/-/+/' \ + -e 's/-/./g'; \ +fi) +endif + +# -------------------------------------------------------------------- +# RabbitMQ components. +# -------------------------------------------------------------------- + +# For RabbitMQ repositories, we want to checkout branches which match +# the parent project. For instance, if the parent project is on a +# release tag, dependencies must be on the same release tag. If the +# parent project is on a topic branch, dependencies must be on the same +# topic branch or fallback to `stable` or `master` whichever was the +# base of the topic branch. + +dep_amqp_client = git_rmq rabbitmq-erlang-client $(current_rmq_ref) $(base_rmq_ref) master +dep_amqp10_client = git_rmq rabbitmq-amqp1.0-client $(current_rmq_ref) $(base_rmq_ref) master +dep_amqp10_common = git_rmq rabbitmq-amqp1.0-common $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbit = git_rmq rabbitmq-server $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbit_common = git_rmq rabbitmq-common $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_amqp1_0 = git_rmq rabbitmq-amqp1.0 $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_auth_backend_amqp = git_rmq rabbitmq-auth-backend-amqp $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_auth_backend_cache = git_rmq rabbitmq-auth-backend-cache $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_auth_backend_http = git_rmq rabbitmq-auth-backend-http $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_auth_backend_ldap = git_rmq rabbitmq-auth-backend-ldap $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_auth_backend_oauth2 = git_rmq rabbitmq-auth-backend-oauth2 $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_auth_mechanism_ssl = git_rmq rabbitmq-auth-mechanism-ssl $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_aws = git_rmq rabbitmq-aws $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_boot_steps_visualiser = git_rmq rabbitmq-boot-steps-visualiser $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_cli = git_rmq rabbitmq-cli $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_codegen = git_rmq rabbitmq-codegen $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_consistent_hash_exchange = git_rmq rabbitmq-consistent-hash-exchange $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_ct_client_helpers = git_rmq rabbitmq-ct-client-helpers $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_ct_helpers = git_rmq rabbitmq-ct-helpers $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_delayed_message_exchange = git_rmq rabbitmq-delayed-message-exchange $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_dotnet_client = git_rmq rabbitmq-dotnet-client $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_event_exchange = git_rmq rabbitmq-event-exchange $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_federation = git_rmq rabbitmq-federation $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_federation_management = git_rmq rabbitmq-federation-management $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_java_client = git_rmq rabbitmq-java-client $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_jms_client = git_rmq rabbitmq-jms-client $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_jms_cts = git_rmq rabbitmq-jms-cts $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_jms_topic_exchange = git_rmq rabbitmq-jms-topic-exchange $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_lvc_exchange = git_rmq rabbitmq-lvc-exchange $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_management = git_rmq rabbitmq-management $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_management_agent = git_rmq rabbitmq-management-agent $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_management_exchange = git_rmq rabbitmq-management-exchange $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_management_themes = git_rmq rabbitmq-management-themes $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_message_timestamp = git_rmq rabbitmq-message-timestamp $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_metronome = git_rmq rabbitmq-metronome $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_mqtt = git_rmq rabbitmq-mqtt $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_objc_client = git_rmq rabbitmq-objc-client $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_peer_discovery_aws = git_rmq rabbitmq-peer-discovery-aws $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_peer_discovery_common = git_rmq rabbitmq-peer-discovery-common $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_peer_discovery_consul = git_rmq rabbitmq-peer-discovery-consul $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_peer_discovery_etcd = git_rmq rabbitmq-peer-discovery-etcd $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_peer_discovery_k8s = git_rmq rabbitmq-peer-discovery-k8s $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_prometheus = git_rmq rabbitmq-prometheus $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_random_exchange = git_rmq rabbitmq-random-exchange $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_recent_history_exchange = git_rmq rabbitmq-recent-history-exchange $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_routing_node_stamp = git_rmq rabbitmq-routing-node-stamp $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_rtopic_exchange = git_rmq rabbitmq-rtopic-exchange $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_server_release = git_rmq rabbitmq-server-release $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_sharding = git_rmq rabbitmq-sharding $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_shovel = git_rmq rabbitmq-shovel $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_shovel_management = git_rmq rabbitmq-shovel-management $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_stomp = git_rmq rabbitmq-stomp $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_stream = git_rmq rabbitmq-stream $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_toke = git_rmq rabbitmq-toke $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_top = git_rmq rabbitmq-top $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_tracing = git_rmq rabbitmq-tracing $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_trust_store = git_rmq rabbitmq-trust-store $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_test = git_rmq rabbitmq-test $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_web_dispatch = git_rmq rabbitmq-web-dispatch $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_web_stomp = git_rmq rabbitmq-web-stomp $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_web_stomp_examples = git_rmq rabbitmq-web-stomp-examples $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_web_mqtt = git_rmq rabbitmq-web-mqtt $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_web_mqtt_examples = git_rmq rabbitmq-web-mqtt-examples $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_website = git_rmq rabbitmq-website $(current_rmq_ref) $(base_rmq_ref) live master +dep_toke = git_rmq toke $(current_rmq_ref) $(base_rmq_ref) master + +dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(current_rmq_ref) $(base_rmq_ref) master + +# Third-party dependencies version pinning. +# +# We do that in this file, which is copied in all projects, to ensure +# all projects use the same versions. It avoids conflicts and makes it +# possible to work with rabbitmq-public-umbrella. + +dep_accept = hex 0.3.5 +dep_cowboy = hex 2.8.0 +dep_cowlib = hex 2.9.1 +dep_jsx = hex 2.11.0 +dep_lager = hex 3.8.0 +dep_prometheus = git https://github.com/deadtrickster/prometheus.erl.git master +dep_ra = git https://github.com/rabbitmq/ra.git master +dep_ranch = hex 1.7.1 +dep_recon = hex 2.5.1 +dep_observer_cli = hex 1.5.4 +dep_stdout_formatter = hex 0.2.4 +dep_sysmon_handler = hex 1.3.0 + +RABBITMQ_COMPONENTS = amqp_client \ + amqp10_common \ + amqp10_client \ + rabbit \ + rabbit_common \ + rabbitmq_amqp1_0 \ + rabbitmq_auth_backend_amqp \ + rabbitmq_auth_backend_cache \ + rabbitmq_auth_backend_http \ + rabbitmq_auth_backend_ldap \ + rabbitmq_auth_backend_oauth2 \ + rabbitmq_auth_mechanism_ssl \ + rabbitmq_aws \ + rabbitmq_boot_steps_visualiser \ + rabbitmq_cli \ + rabbitmq_codegen \ + rabbitmq_consistent_hash_exchange \ + rabbitmq_ct_client_helpers \ + rabbitmq_ct_helpers \ + rabbitmq_delayed_message_exchange \ + rabbitmq_dotnet_client \ + rabbitmq_event_exchange \ + rabbitmq_federation \ + rabbitmq_federation_management \ + rabbitmq_java_client \ + rabbitmq_jms_client \ + rabbitmq_jms_cts \ + rabbitmq_jms_topic_exchange \ + rabbitmq_lvc_exchange \ + rabbitmq_management \ + rabbitmq_management_agent \ + rabbitmq_management_exchange \ + rabbitmq_management_themes \ + rabbitmq_message_timestamp \ + rabbitmq_metronome \ + rabbitmq_mqtt \ + rabbitmq_objc_client \ + rabbitmq_peer_discovery_aws \ + rabbitmq_peer_discovery_common \ + rabbitmq_peer_discovery_consul \ + rabbitmq_peer_discovery_etcd \ + rabbitmq_peer_discovery_k8s \ + rabbitmq_prometheus \ + rabbitmq_random_exchange \ + rabbitmq_recent_history_exchange \ + rabbitmq_routing_node_stamp \ + rabbitmq_rtopic_exchange \ + rabbitmq_server_release \ + rabbitmq_sharding \ + rabbitmq_shovel \ + rabbitmq_shovel_management \ + rabbitmq_stomp \ + rabbitmq_stream \ + rabbitmq_toke \ + rabbitmq_top \ + rabbitmq_tracing \ + rabbitmq_trust_store \ + rabbitmq_web_dispatch \ + rabbitmq_web_mqtt \ + rabbitmq_web_mqtt_examples \ + rabbitmq_web_stomp \ + rabbitmq_web_stomp_examples \ + rabbitmq_website + +# Erlang.mk does not rebuild dependencies by default, once they were +# compiled once, except for those listed in the `$(FORCE_REBUILD)` +# variable. +# +# We want all RabbitMQ components to always be rebuilt: this eases +# the work on several components at the same time. + +FORCE_REBUILD = $(RABBITMQ_COMPONENTS) + +# Several components have a custom erlang.mk/build.config, mainly +# to disable eunit. Therefore, we can't use the top-level project's +# erlang.mk copy. +NO_AUTOPATCH += $(RABBITMQ_COMPONENTS) + +ifeq ($(origin current_rmq_ref),undefined) +ifneq ($(wildcard .git),) +current_rmq_ref := $(shell (\ + ref=$$(LANG=C git branch --list | awk '/^\* \(.*detached / {ref=$$0; sub(/.*detached [^ ]+ /, "", ref); sub(/\)$$/, "", ref); print ref; exit;} /^\* / {ref=$$0; sub(/^\* /, "", ref); print ref; exit}');\ + if test "$$(git rev-parse --short HEAD)" != "$$ref"; then echo "$$ref"; fi)) +else +current_rmq_ref := master +endif +endif +export current_rmq_ref + +ifeq ($(origin base_rmq_ref),undefined) +ifneq ($(wildcard .git),) +possible_base_rmq_ref := master +ifeq ($(possible_base_rmq_ref),$(current_rmq_ref)) +base_rmq_ref := $(current_rmq_ref) +else +base_rmq_ref := $(shell \ + (git rev-parse --verify -q master >/dev/null && \ + git rev-parse --verify -q $(possible_base_rmq_ref) >/dev/null && \ + git merge-base --is-ancestor $$(git merge-base master HEAD) $(possible_base_rmq_ref) && \ + echo $(possible_base_rmq_ref)) || \ + echo master) +endif +else +base_rmq_ref := master +endif +endif +export base_rmq_ref + +# Repository URL selection. +# +# First, we infer other components' location from the current project +# repository URL, if it's a Git repository: +# - We take the "origin" remote URL as the base +# - The current project name and repository name is replaced by the +# target's properties: +# eg. rabbitmq-common is replaced by rabbitmq-codegen +# eg. rabbit_common is replaced by rabbitmq_codegen +# +# If cloning from this computed location fails, we fallback to RabbitMQ +# upstream which is GitHub. + +# Macro to transform eg. "rabbit_common" to "rabbitmq-common". +rmq_cmp_repo_name = $(word 2,$(dep_$(1))) + +# Upstream URL for the current project. +RABBITMQ_COMPONENT_REPO_NAME := $(call rmq_cmp_repo_name,$(PROJECT)) +RABBITMQ_UPSTREAM_FETCH_URL ?= https://github.com/rabbitmq/$(RABBITMQ_COMPONENT_REPO_NAME).git +RABBITMQ_UPSTREAM_PUSH_URL ?= git@github.com:rabbitmq/$(RABBITMQ_COMPONENT_REPO_NAME).git + +# Current URL for the current project. If this is not a Git clone, +# default to the upstream Git repository. +ifneq ($(wildcard .git),) +git_origin_fetch_url := $(shell git config remote.origin.url) +git_origin_push_url := $(shell git config remote.origin.pushurl || git config remote.origin.url) +RABBITMQ_CURRENT_FETCH_URL ?= $(git_origin_fetch_url) +RABBITMQ_CURRENT_PUSH_URL ?= $(git_origin_push_url) +else +RABBITMQ_CURRENT_FETCH_URL ?= $(RABBITMQ_UPSTREAM_FETCH_URL) +RABBITMQ_CURRENT_PUSH_URL ?= $(RABBITMQ_UPSTREAM_PUSH_URL) +endif + +# Macro to replace the following pattern: +# 1. /foo.git -> /bar.git +# 2. /foo -> /bar +# 3. /foo/ -> /bar/ +subst_repo_name = $(patsubst %/$(1)/%,%/$(2)/%,$(patsubst %/$(1),%/$(2),$(patsubst %/$(1).git,%/$(2).git,$(3)))) + +# Macro to replace both the project's name (eg. "rabbit_common") and +# repository name (eg. "rabbitmq-common") by the target's equivalent. +# +# This macro is kept on one line because we don't want whitespaces in +# the returned value, as it's used in $(dep_fetch_git_rmq) in a shell +# single-quoted string. +dep_rmq_repo = $(if $(dep_$(2)),$(call subst_repo_name,$(PROJECT),$(2),$(call subst_repo_name,$(RABBITMQ_COMPONENT_REPO_NAME),$(call rmq_cmp_repo_name,$(2)),$(1))),$(pkg_$(1)_repo)) + +dep_rmq_commits = $(if $(dep_$(1)), \ + $(wordlist 3,$(words $(dep_$(1))),$(dep_$(1))), \ + $(pkg_$(1)_commit)) + +define dep_fetch_git_rmq + fetch_url1='$(call dep_rmq_repo,$(RABBITMQ_CURRENT_FETCH_URL),$(1))'; \ + fetch_url2='$(call dep_rmq_repo,$(RABBITMQ_UPSTREAM_FETCH_URL),$(1))'; \ + if test "$$$$fetch_url1" != '$(RABBITMQ_CURRENT_FETCH_URL)' && \ + git clone -q -n -- "$$$$fetch_url1" $(DEPS_DIR)/$(call dep_name,$(1)); then \ + fetch_url="$$$$fetch_url1"; \ + push_url='$(call dep_rmq_repo,$(RABBITMQ_CURRENT_PUSH_URL),$(1))'; \ + elif git clone -q -n -- "$$$$fetch_url2" $(DEPS_DIR)/$(call dep_name,$(1)); then \ + fetch_url="$$$$fetch_url2"; \ + push_url='$(call dep_rmq_repo,$(RABBITMQ_UPSTREAM_PUSH_URL),$(1))'; \ + fi; \ + cd $(DEPS_DIR)/$(call dep_name,$(1)) && ( \ + $(foreach ref,$(call dep_rmq_commits,$(1)), \ + git checkout -q $(ref) >/dev/null 2>&1 || \ + ) \ + (echo "error: no valid pathspec among: $(call dep_rmq_commits,$(1))" \ + 1>&2 && false) ) && \ + (test "$$$$fetch_url" = "$$$$push_url" || \ + git remote set-url --push origin "$$$$push_url") +endef + +# -------------------------------------------------------------------- +# Component distribution. +# -------------------------------------------------------------------- + +list-dist-deps:: + @: + +prepare-dist:: + @: + +# -------------------------------------------------------------------- +# Umbrella-specific settings. +# -------------------------------------------------------------------- + +# If the top-level project is a RabbitMQ component, we override +# $(DEPS_DIR) for this project to point to the top-level's one. +# +# We also verify that the guessed DEPS_DIR is actually named `deps`, +# to rule out any situation where it is a coincidence that we found a +# `rabbitmq-components.mk` up upper directories. + +possible_deps_dir_1 = $(abspath ..) +possible_deps_dir_2 = $(abspath ../../..) + +ifeq ($(notdir $(possible_deps_dir_1)),deps) +ifneq ($(wildcard $(possible_deps_dir_1)/../rabbitmq-components.mk),) +deps_dir_overriden = 1 +DEPS_DIR ?= $(possible_deps_dir_1) +DISABLE_DISTCLEAN = 1 +endif +endif + +ifeq ($(deps_dir_overriden),) +ifeq ($(notdir $(possible_deps_dir_2)),deps) +ifneq ($(wildcard $(possible_deps_dir_2)/../rabbitmq-components.mk),) +deps_dir_overriden = 1 +DEPS_DIR ?= $(possible_deps_dir_2) +DISABLE_DISTCLEAN = 1 +endif +endif +endif + +ifneq ($(wildcard UMBRELLA.md),) +DISABLE_DISTCLEAN = 1 +endif + +# We disable `make distclean` so $(DEPS_DIR) is not accidentally removed. + +ifeq ($(DISABLE_DISTCLEAN),1) +ifneq ($(filter distclean distclean-deps,$(MAKECMDGOALS)),) +SKIP_DEPS = 1 +endif +endif diff --git a/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand.erl b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand.erl new file mode 100644 index 0000000000..bab4dddeec --- /dev/null +++ b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand.erl @@ -0,0 +1,117 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand'). + +-include("rabbit_federation.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([ + usage/0, + usage_additional/0, + usage_doc_guides/0, + flags/0, + validate/2, + merge_defaults/2, + banner/2, + run/2, + switches/0, + aliases/0, + output/2, + scopes/0, + formatter/0, + help_section/0, + description/0 + ]). + + +%%---------------------------------------------------------------------------- +%% Callbacks +%%---------------------------------------------------------------------------- +usage() -> + <<"federation_status [--only-down]">>. + +usage_additional() -> + [ + {<<"--only-down">>, <<"only display links that failed or are not currently connected">>} + ]. + +usage_doc_guides() -> + [?FEDERATION_GUIDE_URL]. + +help_section() -> + {plugin, federation}. + +description() -> + <<"Displays federation link status">>. + +flags() -> + []. + +validate(_,_) -> + ok. + +formatter() -> + 'Elixir.RabbitMQ.CLI.Formatters.Erlang'. + +merge_defaults(A, Opts) -> + {A, maps:merge(#{only_down => false}, Opts)}. + +banner(_, #{node := Node, only_down := true}) -> + erlang:iolist_to_binary([<<"Listing federation links which are down on node ">>, + atom_to_binary(Node, utf8), <<"...">>]); +banner(_, #{node := Node, only_down := false}) -> + erlang:iolist_to_binary([<<"Listing federation links on node ">>, + atom_to_binary(Node, utf8), <<"...">>]). + +run(_Args, #{node := Node, only_down := OnlyDown}) -> + case rabbit_misc:rpc_call(Node, rabbit_federation_status, status, []) of + {badrpc, _} = Error -> + Error; + Status -> + {stream, filter(Status, OnlyDown)} + end. + +switches() -> + [{only_down, boolean}]. + +aliases() -> + []. + +output({stream, FederationStatus}, _) -> + Formatted = [begin + Timestamp = proplists:get_value(timestamp, St), + Map0 = maps:remove(timestamp, maps:from_list(St)), + Map1 = maps:merge(#{queue => <<>>, + exchange => <<>>, + upstream_queue => <<>>, + upstream_exchange => <<>>, + local_connection => <<>>, + error => <<>>}, Map0), + Map1#{last_changed => fmt_ts(Timestamp)} + end || St <- FederationStatus], + {stream, Formatted}; +output(E, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(E). + +scopes() -> + ['ctl', 'diagnostics']. + +%%---------------------------------------------------------------------------- +%% Formatting +%%---------------------------------------------------------------------------- +fmt_ts({{YY, MM, DD}, {Hour, Min, Sec}}) -> + erlang:list_to_binary( + io_lib:format("~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", + [YY, MM, DD, Hour, Min, Sec])). + +filter(Status, _OnlyDown = false) -> + Status; +filter(Status, _OnlyDown = true) -> + [St || St <- Status, + not lists:member(proplists:get_value(status, St), [running, starting])]. diff --git a/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand.erl b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand.erl new file mode 100644 index 0000000000..8d062c692c --- /dev/null +++ b/deps/rabbitmq_federation/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand.erl @@ -0,0 +1,84 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand'). + +-include("rabbit_federation.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([ + usage/0, + usage_additional/0, + usage_doc_guides/0, + flags/0, + validate/2, + merge_defaults/2, + banner/2, + run/2, + aliases/0, + output/2, + help_section/0, + description/0 + ]). + + +%%---------------------------------------------------------------------------- +%% Callbacks +%%---------------------------------------------------------------------------- +usage() -> + <<"restart_federation_link <link_id>">>. + +usage_additional() -> + [ + {<<"<link_id>">>, <<"ID of the link to restart">>} + ]. + +usage_doc_guides() -> + [?FEDERATION_GUIDE_URL]. + +help_section() -> + {plugin, federation}. + +description() -> + <<"Restarts a running federation link">>. + +flags() -> + []. + +validate([], _Opts) -> + {validation_failure, not_enough_args}; +validate([_, _ | _], _Opts) -> + {validation_failure, too_many_args}; +validate([_], _) -> + ok. + +merge_defaults(A, O) -> + {A, O}. + +banner([Link], #{node := Node}) -> + erlang:iolist_to_binary([<<"Restarting federation link ">>, Link, << " on node ">>, + atom_to_binary(Node, utf8)]). + +run([Id], #{node := Node}) -> + case rabbit_misc:rpc_call(Node, rabbit_federation_status, lookup, [Id]) of + {badrpc, _} = Error -> + Error; + not_found -> + {error, <<"Link with the given ID was not found">>}; + Obj -> + Upstream = proplists:get_value(upstream, Obj), + Supervisor = proplists:get_value(supervisor, Obj), + rabbit_misc:rpc_call(Node, rabbit_federation_link_sup, restart, + [Supervisor, Upstream]) + end. + +aliases() -> + []. + +output(Output, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Output). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_app.erl b/deps/rabbitmq_federation/src/rabbit_federation_app.erl new file mode 100644 index 0000000000..ee7ba91e5f --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_app.erl @@ -0,0 +1,38 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_app). + +-behaviour(application). +-export([start/2, stop/1]). + +%% Dummy supervisor - see Ulf Wiger's comment at +%% http://erlang.2086793.n4.nabble.com/initializing-library-applications-without-processes-td2094473.html + +%% All of our actual server processes are supervised by +%% rabbit_federation_sup, which is started by a rabbit_boot_step +%% (since it needs to start up before queue / exchange recovery, so it +%% can't be part of our application). +%% +%% However, we still need an application behaviour since we need to +%% know when our application has started since then the Erlang client +%% will have started and we can therefore start our links going. Since +%% the application behaviour needs a tree of processes to supervise, +%% this is it... +-behaviour(supervisor). +-export([init/1]). + +start(_Type, _StartArgs) -> + rabbit_federation_exchange_link:go(), + rabbit_federation_queue_link:go(), + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +stop(_State) -> + ok. +%%---------------------------------------------------------------------------- + +init([]) -> {ok, {{one_for_one, 3, 10}, []}}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_db.erl b/deps/rabbitmq_federation/src/rabbit_federation_db.erl new file mode 100644 index 0000000000..e35e3646a8 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_db.erl @@ -0,0 +1,47 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_event). +-behaviour(gen_event). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([add_handler/0, remove_handler/0]). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-import(rabbit_misc, [pget/2]). + +%%---------------------------------------------------------------------------- + +add_handler() -> + gen_event:add_handler(rabbit_event, ?MODULE, []). + +remove_handler() -> + gen_event:delete_handler(rabbit_event, ?MODULE, []). + +init([]) -> + {ok, []}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_event(#event{type = parameter_set, + props = Props0}, State) -> + Props = rabbit_data_coercion:to_list(Props0), + case {pget(component, Props), pget(name, Props)} of + {global, cluster_name} -> + rabbit_federation_parameters:adjust(everything); + _ -> + ok + end, + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl new file mode 100644 index 0000000000..6b85b6756b --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange.erl @@ -0,0 +1,105 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +%% TODO rename this +-module(rabbit_federation_exchange). + +-rabbit_boot_step({?MODULE, + [{description, "federation exchange decorator"}, + {mfa, {rabbit_registry, register, + [exchange_decorator, <<"federation">>, ?MODULE]}}, + {requires, rabbit_registry}, + {cleanup, {rabbit_registry, unregister, + [exchange_decorator, <<"federation">>]}}, + {enables, recovery}]}). + +-include_lib("amqp_client/include/amqp_client.hrl"). + +-behaviour(rabbit_exchange_decorator). + +-export([description/0, serialise_events/1]). +-export([create/2, delete/3, policy_changed/2, + add_binding/3, remove_bindings/3, route/2, active_for/1]). + +%%---------------------------------------------------------------------------- + +description() -> + [{description, <<"Federation exchange decorator">>}]. + +serialise_events(X) -> federate(X). + +create(transaction, _X) -> + ok; +create(none, X) -> + maybe_start(X). + +delete(transaction, _X, _Bs) -> + ok; +delete(none, X, _Bs) -> + maybe_stop(X). + +policy_changed(OldX, NewX) -> + maybe_stop(OldX), + maybe_start(NewX). + +add_binding(transaction, _X, _B) -> + ok; +add_binding(Serial, X = #exchange{name = XName}, B) -> + case federate(X) of + true -> rabbit_federation_exchange_link:add_binding(Serial, XName, B), + ok; + false -> ok + end. + +remove_bindings(transaction, _X, _Bs) -> + ok; +remove_bindings(Serial, X = #exchange{name = XName}, Bs) -> + case federate(X) of + true -> rabbit_federation_exchange_link:remove_bindings(Serial, XName, Bs), + ok; + false -> ok + end. + +route(_, _) -> []. + +active_for(X) -> + case federate(X) of + true -> noroute; + false -> none + end. + +%%---------------------------------------------------------------------------- + +%% Don't federate default exchange, we can't bind to it +federate(#exchange{name = #resource{name = <<"">>}}) -> + false; + +%% Don't federate any of our intermediate exchanges. Note that we use +%% internal=true since older brokers may not declare +%% x-federation-upstream on us. Also other internal exchanges should +%% probably not be federated. +federate(#exchange{internal = true}) -> + false; + +federate(X) -> + rabbit_federation_upstream:federate(X). + +maybe_start(X = #exchange{name = XName})-> + case federate(X) of + true -> ok = rabbit_federation_db:prune_scratch( + XName, rabbit_federation_upstream:for(X)), + ok = rabbit_federation_exchange_link_sup_sup:start_child(X), + ok; + false -> ok + end. + +maybe_stop(X = #exchange{name = XName}) -> + case federate(X) of + true -> ok = rabbit_federation_exchange_link_sup_sup:stop_child(X), + rabbit_federation_status:remove_exchange_or_queue(XName); + false -> ok + end. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl new file mode 100644 index 0000000000..869ab047ae --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link.erl @@ -0,0 +1,696 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(rabbit_federation_exchange_link). + +%% pg2 is deprecated in OTP 23. +-compile(nowarn_deprecated_function). + +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-behaviour(gen_server2). + +-export([go/0, add_binding/3, remove_bindings/3]). +-export([list_routing_keys/1]). %% For testing + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-import(rabbit_misc, [pget/2]). +-import(rabbit_federation_util, [name/1, vhost/1, pgname/1]). + +-record(state, {upstream, + upstream_params, + upstream_name, + connection, + channel, + cmd_channel, + consumer_tag, + queue, + internal_exchange, + waiting_cmds = gb_trees:empty(), + next_serial, + bindings = #{}, + downstream_connection, + downstream_channel, + downstream_exchange, + unacked, + internal_exchange_timer, + internal_exchange_interval}). + +%%---------------------------------------------------------------------------- + +%% We start off in a state where we do not connect, since we can first +%% start during exchange recovery, when rabbit is not fully started +%% and the Erlang client is not running. This then gets invoked when +%% the federation app is started. +go() -> cast(go). + +add_binding(S, XN, B) -> cast(XN, {enqueue, S, {add_binding, B}}). +remove_bindings(S, XN, Bs) -> cast(XN, {enqueue, S, {remove_bindings, Bs}}). + +list_routing_keys(XN) -> call(XN, list_routing_keys). + +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]). + +init({Upstream, XName}) -> + %% If we are starting up due to a policy change then it's possible + %% for the exchange to have been deleted before we got here, in which + %% case it's possible that delete callback would also have been called + %% before we got here. So check if we still exist. + case rabbit_exchange:lookup(XName) of + {ok, X} -> + DeobfuscatedUpstream = rabbit_federation_util:deobfuscate_upstream(Upstream), + DeobfuscatedUParams = rabbit_federation_upstream:to_params(DeobfuscatedUpstream, X), + UParams = rabbit_federation_util:obfuscate_upstream_params(DeobfuscatedUParams), + rabbit_federation_status:report(Upstream, UParams, XName, starting), + join(rabbit_federation_exchanges), + join({rabbit_federation_exchange, XName}), + gen_server2:cast(self(), maybe_go), + {ok, {not_started, {Upstream, UParams, XName}}}; + {error, not_found} -> + rabbit_federation_link_util:log_warning(XName, "not found, stopping link~n", []), + {stop, gone} + end. + +handle_call(list_routing_keys, _From, State = #state{bindings = Bindings}) -> + {reply, lists:sort([K || {K, _} <- maps:keys(Bindings)]), State}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(maybe_go, S0 = {not_started, _Args}) -> + case federation_up() of + true -> go(S0); + false -> {noreply, S0} + end; + +handle_cast(go, S0 = {not_started, _Args}) -> + go(S0); + +%% There's a small race - I think we can realise federation is up +%% before 'go' gets invoked. Ignore. +handle_cast(go, State) -> + {noreply, State}; + +handle_cast({enqueue, _, _}, State = {not_started, _}) -> + {noreply, State}; + +handle_cast({enqueue, Serial, Cmd}, + State = #state{waiting_cmds = Waiting, + downstream_exchange = XName}) -> + Waiting1 = gb_trees:insert(Serial, Cmd, Waiting), + try + {noreply, play_back_commands(State#state{waiting_cmds = Waiting1})} + catch exit:{{shutdown, {server_initiated_close, 404, Text}}, _} -> + rabbit_federation_link_util:log_warning( + XName, "detected upstream changes, restarting link: ~p~n", [Text]), + {stop, {shutdown, restart}, State} + end; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(#'basic.consume_ok'{}, State) -> + {noreply, State}; + +handle_info(#'basic.ack'{} = Ack, State = #state{channel = Ch, + unacked = Unacked}) -> + Unacked1 = rabbit_federation_link_util:ack(Ack, Ch, Unacked), + {noreply, State#state{unacked = Unacked1}}; + +handle_info(#'basic.nack'{} = Nack, State = #state{channel = Ch, + unacked = Unacked}) -> + Unacked1 = rabbit_federation_link_util:nack(Nack, Ch, Unacked), + {noreply, State#state{unacked = Unacked1}}; + +handle_info({#'basic.deliver'{routing_key = Key, + redelivered = Redelivered} = DeliverMethod, Msg}, + State = #state{ + upstream = Upstream = #upstream{max_hops = MaxH}, + upstream_params = UParams = #upstream_params{x_or_q = UpstreamX}, + upstream_name = UName, + downstream_exchange = #resource{name = XNameBin, virtual_host = DVhost}, + downstream_channel = DCh, + channel = Ch, + unacked = Unacked}) -> + UVhost = vhost(UpstreamX), + PublishMethod = #'basic.publish'{exchange = XNameBin, + routing_key = Key}, + HeadersFun = fun (H) -> update_routing_headers(UParams, UName, UVhost, Redelivered, H) end, + %% We need to check should_forward/2 here in case the upstream + %% does not have federation and thus is using a fanout exchange. + ForwardFun = fun (H) -> + DName = rabbit_nodes:cluster_name(), + rabbit_federation_util:should_forward(H, MaxH, DName, DVhost) + end, + Unacked1 = rabbit_federation_link_util:forward( + Upstream, DeliverMethod, Ch, DCh, PublishMethod, + HeadersFun, ForwardFun, Msg, Unacked), + {noreply, State#state{unacked = Unacked1}}; + +handle_info(#'basic.cancel'{}, State = #state{upstream = Upstream, + upstream_params = UParams, + downstream_exchange = XName}) -> + rabbit_federation_link_util:connection_error( + local, basic_cancel, Upstream, UParams, XName, State); + +handle_info({'DOWN', _Ref, process, Pid, Reason}, + State = #state{downstream_channel = DCh, + channel = Ch, + cmd_channel = CmdCh, + upstream = Upstream, + upstream_params = UParams, + downstream_exchange = XName}) -> + handle_down(Pid, Reason, Ch, CmdCh, DCh, + {Upstream, UParams, XName}, State); + +handle_info(check_internal_exchange, State = #state{internal_exchange = IntXNameBin, + internal_exchange_interval = Interval}) -> + case check_internal_exchange(IntXNameBin, State) of + upstream_not_found -> + rabbit_log_federation:warning("Federation link could not find upstream exchange '~s' and will restart", + [IntXNameBin]), + {stop, {shutdown, restart}, State}; + _ -> + TRef = erlang:send_after(Interval, self(), check_internal_exchange), + {noreply, State#state{internal_exchange_timer = TRef}} + end; + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(_Reason, {not_started, _}) -> + ok; +terminate(Reason, #state{downstream_connection = DConn, + connection = Conn, + upstream = Upstream, + upstream_params = UParams, + downstream_exchange = XName, + internal_exchange_timer = TRef, + internal_exchange = IntExchange, + queue = Queue}) when Reason =:= shutdown; + Reason =:= {shutdown, restart}; + Reason =:= gone -> + timer:cancel(TRef), + rabbit_federation_link_util:ensure_connection_closed(DConn), + + rabbit_log:debug("Exchange federation: link is shutting down, resource cleanup mode: ~p", [Upstream#upstream.resource_cleanup_mode]), + case Upstream#upstream.resource_cleanup_mode of + never -> ok; + _ -> + %% This is a normal shutdown and we are allowed to clean up the internally used queue and exchange + rabbit_log:debug("Federated exchange '~s' link will delete its internal queue '~s'", [Upstream#upstream.exchange_name, Queue]), + delete_upstream_queue(Conn, Queue), + rabbit_log:debug("Federated exchange '~s' link will delete its upstream exchange", [Upstream#upstream.exchange_name]), + delete_upstream_exchange(Conn, IntExchange) + end, + + rabbit_federation_link_util:ensure_connection_closed(Conn), + rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, XName), + ok; +%% unexpected shutdown +terminate(Reason, #state{downstream_connection = DConn, + connection = Conn, + upstream = Upstream, + upstream_params = UParams, + downstream_exchange = XName, + internal_exchange_timer = TRef}) -> + timer:cancel(TRef), + + rabbit_federation_link_util:ensure_connection_closed(DConn), + + %% unlike in the clean shutdown case above, we keep the queue + %% and exchange around + + rabbit_federation_link_util:ensure_connection_closed(Conn), + rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, XName), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +call(XName, Msg) -> [gen_server2:call(Pid, Msg, infinity) || Pid <- x(XName)]. +cast(Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- all()]. +cast(XName, Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- x(XName)]. + +join(Name) -> + pg2:create(pgname(Name)), + ok = pg2:join(pgname(Name), self()). + +all() -> + pg2:create(pgname(rabbit_federation_exchanges)), + pg2:get_members(pgname(rabbit_federation_exchanges)). + +x(XName) -> + pg2:create(pgname({rabbit_federation_exchange, XName})), + pg2:get_members(pgname({rabbit_federation_exchange, XName})). + +%%---------------------------------------------------------------------------- + +federation_up() -> is_pid(whereis(rabbit_federation_app)). + +handle_command({add_binding, Binding}, State) -> + add_binding(Binding, State); + +handle_command({remove_bindings, Bindings}, State) -> + lists:foldl(fun remove_binding/2, State, Bindings). + +play_back_commands(State = #state{waiting_cmds = Waiting, + next_serial = Next}) -> + case gb_trees:is_empty(Waiting) of + false -> case gb_trees:take_smallest(Waiting) of + {Next, Cmd, Waiting1} -> + %% The next one. Just execute it. + play_back_commands( + handle_command(Cmd, State#state{ + waiting_cmds = Waiting1, + next_serial = Next + 1})); + {Serial, _Cmd, Waiting1} when Serial < Next -> + %% This command came from before we executed + %% binding:list_for_source. Ignore it. + play_back_commands(State#state{ + waiting_cmds = Waiting1}); + _ -> + %% Some future command. Don't do anything. + State + end; + true -> State + end. + +add_binding(B, State) -> + binding_op(fun record_binding/2, bind_cmd(bind, B, State), B, State). + +remove_binding(B, State) -> + binding_op(fun forget_binding/2, bind_cmd(unbind, B, State), B, State). + +record_binding(B = #binding{destination = Dest}, + State = #state{bindings = Bs}) -> + {DoIt, Set} = case maps:find(key(B), Bs) of + error -> {true, sets:from_list([Dest])}; + {ok, Dests} -> {false, sets:add_element( + Dest, Dests)} + end, + {DoIt, State#state{bindings = maps:put(key(B), Set, Bs)}}. + +forget_binding(B = #binding{destination = Dest}, + State = #state{bindings = Bs}) -> + Dests = sets:del_element(Dest, maps:get(key(B), Bs)), + {DoIt, Bs1} = case sets:size(Dests) of + 0 -> {true, maps:remove(key(B), Bs)}; + _ -> {false, maps:put(key(B), Dests, Bs)} + end, + {DoIt, State#state{bindings = Bs1}}. + +binding_op(UpdateFun, Cmd, B = #binding{args = Args}, + State = #state{cmd_channel = Ch}) -> + {DoIt, State1} = + case rabbit_misc:table_lookup(Args, ?BINDING_HEADER) of + undefined -> UpdateFun(B, State); + {array, _} -> {Cmd =/= ignore, State} + end, + case DoIt of + true -> amqp_channel:call(Ch, Cmd); + false -> ok + end, + State1. + +bind_cmd(Type, #binding{key = Key, args = Args}, + State = #state{internal_exchange = IntXNameBin, + upstream_params = UpstreamParams, + upstream = Upstream}) -> + #upstream_params{x_or_q = X} = UpstreamParams, + #upstream{bind_nowait = Nowait} = Upstream, + case update_binding(Args, State) of + ignore -> ignore; + NewArgs -> bind_cmd0(Type, name(X), IntXNameBin, Key, NewArgs, Nowait) + end. + +bind_cmd0(bind, Source, Destination, RoutingKey, Arguments, Nowait) -> + #'exchange.bind'{source = Source, + destination = Destination, + routing_key = RoutingKey, + arguments = Arguments, + nowait = Nowait}; + +bind_cmd0(unbind, Source, Destination, RoutingKey, Arguments, Nowait) -> + #'exchange.unbind'{source = Source, + destination = Destination, + routing_key = RoutingKey, + arguments = Arguments, + nowait = Nowait}. + +%% This function adds information about the current node to the +%% binding arguments, or returns 'ignore' if it determines the binding +%% should propagate no further. The interesting part is the latter. +%% +%% We want bindings to propagate in the same way as messages +%% w.r.t. max_hops - if we determine that a message can get from node +%% A to B (assuming bindings are in place) then it follows that a +%% binding at B should propagate back to A, and no further. There is +%% no point in propagating bindings past the point where messages +%% would propagate, and we will lose messages if bindings don't +%% propagate as far. +%% +%% Note that we still want to have limits on how far messages can +%% propagate: limiting our bindings is not enough, since other +%% bindings from other nodes can overlap. +%% +%% So in short we want bindings to obey max_hops. However, they can't +%% just obey the max_hops of the current link, since they are +%% travelling in the opposite direction to messages! Consider the +%% following federation: +%% +%% A -----------> B -----------> C +%% max_hops=1 max_hops=2 +%% +%% where the arrows indicate message flow. A binding created at C +%% should propagate to B, then to A, and no further. Therefore every +%% time we traverse a link, we keep a count of the number of hops that +%% a message could have made so far to reach this point, and still be +%% able to propagate. When this number ("hops" below) reaches 0 we +%% propagate no further. +%% +%% hops(link(N)) is given by: +%% +%% min(hops(link(N-1))-1, max_hops(link(N))) +%% +%% where link(N) is the link that bindings propagate over after N +%% steps (e.g. link(1) is CB above, link(2) is BA). +%% +%% In other words, we count down to 0 from the link with the most +%% restrictive max_hops we have yet passed through. + +update_binding(Args, #state{downstream_exchange = X, + upstream = Upstream, + upstream_params = #upstream_params{x_or_q = UpstreamX}, + upstream_name = UName}) -> + #upstream{max_hops = MaxHops} = Upstream, + UVhost = vhost(UpstreamX), + Hops = case rabbit_misc:table_lookup(Args, ?BINDING_HEADER) of + undefined -> MaxHops; + {array, All} -> [{table, Prev} | _] = All, + PrevHops = get_hops(Prev), + case rabbit_federation_util:already_seen( + UName, UVhost, All) of + true -> 0; + false -> lists:min([PrevHops - 1, MaxHops]) + end + end, + case Hops of + 0 -> ignore; + _ -> Cluster = rabbit_nodes:cluster_name(), + ABSuffix = rabbit_federation_db:get_active_suffix( + X, Upstream, <<"A">>), + DVhost = vhost(X), + DName = name(X), + Down = <<DVhost/binary,":", DName/binary, " ", ABSuffix/binary>>, + Info = [{<<"cluster-name">>, longstr, Cluster}, + {<<"vhost">>, longstr, DVhost}, + {<<"exchange">>, longstr, Down}, + {<<"hops">>, short, Hops}], + rabbit_basic:prepend_table_header(?BINDING_HEADER, Info, Args) + end. + + + +key(#binding{key = Key, args = Args}) -> {Key, Args}. + +go(S0 = {not_started, {Upstream, UParams, DownXName}}) -> + Unacked = rabbit_federation_link_util:unacked_new(), + + log_link_startup_attempt(Upstream, DownXName), + rabbit_federation_link_util:start_conn_ch( + fun (Conn, Ch, DConn, DCh) -> + {ok, CmdCh} = open_cmd_channel(Conn, Upstream, UParams, DownXName, S0), + erlang:monitor(process, CmdCh), + Props = pget(server_properties, + amqp_connection:info(Conn, [server_properties])), + UName = case rabbit_misc:table_lookup( + Props, <<"cluster_name">>) of + {longstr, N} -> N; + _ -> unknown + end, + {Serial, Bindings} = + rabbit_misc:execute_mnesia_transaction( + fun () -> + {rabbit_exchange:peek_serial(DownXName), + rabbit_binding:list_for_source(DownXName)} + end), + true = is_integer(Serial), + %% If we are very short lived, Serial can be undefined at + %% this point (since the deletion of the X could have + %% overtaken the creation of this process). However, this + %% is not a big deal - 'undefined' just becomes the next + %% serial we will process. Since it compares larger than + %% any number we never process any commands. And we will + %% soon get told to stop anyway. + {ok, Interval} = application:get_env(rabbitmq_federation, + internal_exchange_check_interval), + State = ensure_upstream_bindings( + consume_from_upstream_queue( + #state{upstream = Upstream, + upstream_params = UParams, + upstream_name = UName, + connection = Conn, + channel = Ch, + cmd_channel = CmdCh, + next_serial = Serial, + downstream_connection = DConn, + downstream_channel = DCh, + downstream_exchange = DownXName, + unacked = Unacked, + internal_exchange_interval = Interval}), + Bindings), + rabbit_log_federation:info("Federation link for ~s (upstream: ~s) will perform internal exchange checks " + "every ~b seconds", [rabbit_misc:rs(DownXName), UName, round(Interval / 1000)]), + TRef = erlang:send_after(Interval, self(), check_internal_exchange), + {noreply, State#state{internal_exchange_timer = TRef}} + end, Upstream, UParams, DownXName, S0). + +log_link_startup_attempt(OUpstream, DownXName) -> + rabbit_log_federation:debug("Will try to start a federation link for ~s, upstream: '~s'", + [rabbit_misc:rs(DownXName), OUpstream#upstream.name]). + +open_cmd_channel(Conn, Upstream = #upstream{name = UName}, UParams, DownXName, S0) -> + rabbit_log_federation:debug("Will open a command channel to upstream '~s' for downstream federated ~s", + [UName, rabbit_misc:rs(DownXName)]), + case amqp_connection:open_channel(Conn) of + {ok, CCh} -> + erlang:monitor(process, CCh), + {ok, CCh}; + E -> + rabbit_federation_link_util:ensure_connection_closed(Conn), + rabbit_federation_link_util:connection_error(command_channel, E, + Upstream, UParams, DownXName, S0), + E + end. + +consume_from_upstream_queue( + State = #state{upstream = Upstream, + upstream_params = UParams, + channel = Ch, + downstream_exchange = DownXName}) -> + #upstream{prefetch_count = Prefetch, + expires = Expiry, + message_ttl = TTL, + ha_policy = HA} = Upstream, + #upstream_params{x_or_q = X, + params = Params} = UParams, + Q = upstream_queue_name(name(X), vhost(Params), DownXName), + Args = [A || {_K, _T, V} = A + <- [{<<"x-expires">>, long, Expiry}, + {<<"x-message-ttl">>, long, TTL}, + {<<"x-ha-policy">>, longstr, HA}, + {<<"x-internal-purpose">>, longstr, <<"federation">>}], + V =/= none], + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true, + arguments = Args}), + NoAck = Upstream#upstream.ack_mode =:= 'no-ack', + case NoAck of + false -> amqp_channel:call(Ch, #'basic.qos'{prefetch_count = Prefetch}); + true -> ok + end, + #'basic.consume_ok'{consumer_tag = CTag} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, + no_ack = NoAck}, self()), + State#state{consumer_tag = CTag, + queue = Q}. + +ensure_upstream_bindings(State = #state{upstream = Upstream, + connection = Conn, + channel = Ch, + downstream_exchange = DownXName, + queue = Q}, Bindings) -> + OldSuffix = rabbit_federation_db:get_active_suffix( + DownXName, Upstream, <<"A">>), + Suffix = case OldSuffix of + <<"A">> -> <<"B">>; + <<"B">> -> <<"A">> + end, + IntXNameBin = upstream_exchange_name(Q, Suffix), + ensure_upstream_exchange(State), + ensure_internal_exchange(IntXNameBin, State), + amqp_channel:call(Ch, #'queue.bind'{exchange = IntXNameBin, queue = Q}), + State1 = State#state{internal_exchange = IntXNameBin}, + rabbit_federation_db:set_active_suffix(DownXName, Upstream, Suffix), + State2 = lists:foldl(fun add_binding/2, State1, Bindings), + OldIntXNameBin = upstream_exchange_name(Q, OldSuffix), + delete_upstream_exchange(Conn, OldIntXNameBin), + State2. + +ensure_upstream_exchange(#state{upstream_params = UParams, + connection = Conn, + channel = Ch}) -> + #upstream_params{x_or_q = X} = UParams, + #exchange{type = Type, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Arguments} = X, + Decl = #'exchange.declare'{exchange = name(X), + type = list_to_binary(atom_to_list(Type)), + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Arguments}, + rabbit_federation_link_util:disposable_channel_call( + Conn, Decl#'exchange.declare'{passive = true}, + fun(?NOT_FOUND, _Text) -> + amqp_channel:call(Ch, Decl) + end). + +ensure_internal_exchange(IntXNameBin, + #state{upstream = #upstream{max_hops = MaxHops, name = UName}, + upstream_params = UParams, + connection = Conn, + channel = Ch, + downstream_exchange = #resource{virtual_host = DVhost}}) -> + rabbit_log_federation:debug("Exchange federation will set up exchange '~s' in upstream '~s'", + [IntXNameBin, UName]), + #upstream_params{params = Params} = rabbit_federation_util:deobfuscate_upstream_params(UParams), + rabbit_log_federation:debug("Will delete upstream exchange '~s'", [IntXNameBin]), + delete_upstream_exchange(Conn, IntXNameBin), + rabbit_log_federation:debug("Will declare an internal upstream exchange '~s'", [IntXNameBin]), + Base = #'exchange.declare'{exchange = IntXNameBin, + durable = true, + internal = true, + auto_delete = true}, + Purpose = [{<<"x-internal-purpose">>, longstr, <<"federation">>}], + XFUArgs = [{?MAX_HOPS_ARG, long, MaxHops}, + {?DOWNSTREAM_NAME_ARG, longstr, cycle_detection_node_identifier()}, + {?DOWNSTREAM_VHOST_ARG, longstr, DVhost} + | Purpose], + XFU = Base#'exchange.declare'{type = <<"x-federation-upstream">>, + arguments = XFUArgs}, + Fan = Base#'exchange.declare'{type = <<"fanout">>, + arguments = Purpose}, + rabbit_federation_link_util:disposable_connection_call( + Params, XFU, fun(?COMMAND_INVALID, _Text) -> + amqp_channel:call(Ch, Fan) + end). + +check_internal_exchange(IntXNameBin, + #state{upstream = #upstream{max_hops = MaxHops, name = UName}, + upstream_params = UParams, + downstream_exchange = XName = #resource{virtual_host = DVhost}}) -> + #upstream_params{params = Params} = + rabbit_federation_util:deobfuscate_upstream_params(UParams), + rabbit_log_federation:debug("Exchange federation will check on exchange '~s' in upstream '~s'", + [IntXNameBin, UName]), + Base = #'exchange.declare'{exchange = IntXNameBin, + passive = true, + durable = true, + internal = true, + auto_delete = true}, + Purpose = [{<<"x-internal-purpose">>, longstr, <<"federation">>}], + XFUArgs = [{?MAX_HOPS_ARG, long, MaxHops}, + {?DOWNSTREAM_NAME_ARG, longstr, cycle_detection_node_identifier()}, + {?DOWNSTREAM_VHOST_ARG, longstr, DVhost} + | Purpose], + XFU = Base#'exchange.declare'{type = <<"x-federation-upstream">>, + arguments = XFUArgs}, + rabbit_federation_link_util:disposable_connection_call( + Params, XFU, fun(404, Text) -> + rabbit_federation_link_util:log_warning( + XName, "detected internal upstream exchange changes," + " restarting link: ~p~n", [Text]), + upstream_not_found; + (Code, Text) -> + rabbit_federation_link_util:log_warning( + XName, "internal upstream exchange check failed: ~p ~p~n", + [Code, Text]), + error + end). + +upstream_queue_name(XNameBin, VHost, #resource{name = DownXNameBin, + virtual_host = DownVHost}) -> + Node = rabbit_nodes:cluster_name(), + DownPart = case DownVHost of + VHost -> case DownXNameBin of + XNameBin -> <<"">>; + _ -> <<":", DownXNameBin/binary>> + end; + _ -> <<":", DownVHost/binary, + ":", DownXNameBin/binary>> + end, + <<"federation: ", XNameBin/binary, " -> ", Node/binary, DownPart/binary>>. + +cycle_detection_node_identifier() -> + rabbit_nodes:cluster_name(). + +upstream_exchange_name(UpstreamQName, Suffix) -> + <<UpstreamQName/binary, " ", Suffix/binary>>. + +delete_upstream_exchange(Conn, XNameBin) -> + rabbit_federation_link_util:disposable_channel_call( + Conn, #'exchange.delete'{exchange = XNameBin}). + +delete_upstream_queue(Conn, Queue) -> + rabbit_federation_link_util:disposable_channel_call( + Conn, #'queue.delete'{queue = Queue}). + +update_routing_headers(#upstream_params{table = Table}, UpstreamName, UVhost, Redelivered, Headers) -> + NewValue = Table ++ + [{<<"redelivered">>, bool, Redelivered}] ++ + header_for_upstream_name(UpstreamName) ++ + header_for_upstream_vhost(UVhost), + rabbit_basic:prepend_table_header(?ROUTING_HEADER, NewValue, Headers). + +header_for_upstream_name(unknown) -> []; +header_for_upstream_name(Name) -> [{<<"cluster-name">>, longstr, Name}]. + +header_for_upstream_vhost(unknown) -> []; +header_for_upstream_vhost(Name) -> [{<<"vhost">>, longstr, Name}]. + +get_hops(Table) -> + case rabbit_misc:table_lookup(Table, <<"hops">>) of + %% see rabbit_binary_generator + {short, N} -> N; + {long, N} -> N; + {byte, N} -> N; + {signedint, N} -> N; + {unsignedbyte, N} -> N; + {unsignedshort, N} -> N; + {unsignedint, N} -> N; + {_, N} when is_integer(N) andalso N >= 0 -> N + end. + +handle_down(DCh, Reason, _Ch, _CmdCh, DCh, Args, State) -> + rabbit_federation_link_util:handle_downstream_down(Reason, Args, State); +handle_down(ChPid, Reason, Ch, CmdCh, _DCh, Args, State) + when ChPid =:= Ch; ChPid =:= CmdCh -> + rabbit_federation_link_util:handle_upstream_down(Reason, Args, State). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl new file mode 100644 index 0000000000..fda76a5070 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_exchange_link_sup_sup.erl @@ -0,0 +1,75 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. We need +%% different handling here since exchanges want a mirrored sup. + +-export([start_link/0, start_child/1, adjust/1, stop_child/1]). +-export([init/1]). + +%%---------------------------------------------------------------------------- + +start_link() -> + mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR, + fun rabbit_misc:execute_mnesia_transaction/1, + ?MODULE, []). + +%% Note that the next supervisor down, rabbit_federation_link_sup, is common +%% between exchanges and queues. +start_child(X) -> + case mirrored_supervisor:start_child( + ?SUPERVISOR, + {id(X), {rabbit_federation_link_sup, start_link, [X]}, + transient, ?SUPERVISOR_WAIT, supervisor, + [rabbit_federation_link_sup]}) of + {ok, _Pid} -> ok; + {error, {already_started, _Pid}} -> + #exchange{name = ExchangeName} = X, + rabbit_log_federation:debug("Federation link for exchange ~p was already started", + [rabbit_misc:rs(ExchangeName)]), + ok; + %% A link returned {stop, gone}, the link_sup shut down, that's OK. + {error, {shutdown, _}} -> ok + end. + +adjust({clear_upstream, VHost, UpstreamName}) -> + [rabbit_federation_link_sup:adjust(Pid, X, {clear_upstream, UpstreamName}) || + {#exchange{name = Name} = X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), + Name#resource.virtual_host == VHost], + ok; +adjust(Reason) -> + [rabbit_federation_link_sup:adjust(Pid, X, Reason) || + {X, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], + ok. + +stop_child(X) -> + case mirrored_supervisor:terminate_child(?SUPERVISOR, id(X)) of + ok -> ok; + {error, Err} -> + #exchange{name = ExchangeName} = X, + rabbit_log_federation:warning( + "Attempt to stop a federation link for exchange ~p failed: ~p", + [rabbit_misc:rs(ExchangeName), Err]), + ok + end, + ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(X)). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_one, 1200, 60}, []}}. + +%% See comment in rabbit_federation_queue_link_sup_sup:id/1 +id(X = #exchange{policy = Policy}) -> X1 = rabbit_exchange:immutable(X), + X1#exchange{policy = Policy}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl new file mode 100644 index 0000000000..27d1b50277 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl @@ -0,0 +1,109 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(rabbit_federation_link_sup). + +-behaviour(supervisor2). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit/include/amqqueue.hrl"). +-include("rabbit_federation.hrl"). + +%% Supervises the upstream links for an exchange or queue. + +-export([start_link/1, adjust/3, restart/2]). +-export([init/1]). + +start_link(XorQ) -> + supervisor2:start_link(?MODULE, XorQ). + +adjust(Sup, XorQ, everything) -> + [stop(Sup, Upstream, XorQ) || + {Upstream, _, _, _} <- supervisor2:which_children(Sup)], + [{ok, _Pid} = supervisor2:start_child(Sup, Spec) || Spec <- specs(XorQ)]; + +adjust(Sup, XorQ, {upstream, UpstreamName}) -> + OldUpstreams0 = children(Sup, UpstreamName), + NewUpstreams0 = rabbit_federation_upstream:for(XorQ, UpstreamName), + %% If any haven't changed, don't restart them. The broker will + %% avoid telling us about connections that have not changed + %% syntactically, but even if one has, this XorQ may not have that + %% connection in an upstream, so we still need to check here. + {OldUpstreams, NewUpstreams} = + lists:foldl( + fun (OldU, {OldUs, NewUs}) -> + case lists:member(OldU, NewUs) of + true -> {OldUs -- [OldU], NewUs -- [OldU]}; + false -> {OldUs, NewUs} + end + end, {OldUpstreams0, NewUpstreams0}, OldUpstreams0), + [stop(Sup, OldUpstream, XorQ) || OldUpstream <- OldUpstreams], + [start(Sup, NewUpstream, XorQ) || NewUpstream <- NewUpstreams]; + +adjust(Sup, XorQ, {clear_upstream, UpstreamName}) -> + ok = rabbit_federation_db:prune_scratch( + name(XorQ), rabbit_federation_upstream:for(XorQ)), + [stop(Sup, Upstream, XorQ) || Upstream <- children(Sup, UpstreamName)]; + +adjust(Sup, X = #exchange{name = XName}, {upstream_set, _Set}) -> + adjust(Sup, X, everything), + case rabbit_federation_upstream:federate(X) of + false -> ok; + true -> ok = rabbit_federation_db:prune_scratch( + XName, rabbit_federation_upstream:for(X)) + end; +adjust(Sup, Q, {upstream_set, _}) when ?is_amqqueue(Q) -> + adjust(Sup, Q, everything); +adjust(Sup, XorQ, {clear_upstream_set, _}) -> + adjust(Sup, XorQ, everything). + +restart(Sup, Upstream) -> + ok = supervisor2:terminate_child(Sup, Upstream), + {ok, _Pid} = supervisor2:restart_child(Sup, Upstream), + ok. + +start(Sup, Upstream, XorQ) -> + {ok, _Pid} = supervisor2:start_child(Sup, spec(rabbit_federation_util:obfuscate_upstream(Upstream), XorQ)), + ok. + +stop(Sup, Upstream, XorQ) -> + ok = supervisor2:terminate_child(Sup, Upstream), + ok = supervisor2:delete_child(Sup, Upstream), + %% While the link will report its own removal, that only works if + %% the link was actually up. If the link was broken and failing to + %% come up, the possibility exists that there *is* no link + %% process, but we still have a report in the status table. So + %% remove it here too. + rabbit_federation_status:remove(Upstream, name(XorQ)). + +children(Sup, UpstreamName) -> + rabbit_federation_util:find_upstreams( + UpstreamName, [U || {U, _, _, _} <- supervisor2:which_children(Sup)]). + +%%---------------------------------------------------------------------------- + +init(XorQ) -> + %% 1, ?MAX_WAIT so that we always give up after one fast retry and get + %% into the reconnect delay. + {ok, {{one_for_one, 1, ?MAX_WAIT}, specs(XorQ)}}. + +specs(XorQ) -> + [spec(rabbit_federation_util:obfuscate_upstream(Upstream), XorQ) + || Upstream <- rabbit_federation_upstream:for(XorQ)]. + +spec(U = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) -> + {U, {rabbit_federation_exchange_link, start_link, [{U, XName}]}, + {permanent, Delay}, ?WORKER_WAIT, worker, + [rabbit_federation_exchange_link]}; + +spec(Upstream = #upstream{reconnect_delay = Delay}, Q) when ?is_amqqueue(Q) -> + {Upstream, {rabbit_federation_queue_link, start_link, [{Upstream, Q}]}, + {permanent, Delay}, ?WORKER_WAIT, worker, + [rabbit_federation_queue_link]}. + +name(#exchange{name = XName}) -> XName; +name(Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl new file mode 100644 index 0000000000..a5fd560f0b --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_link_util.erl @@ -0,0 +1,364 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(rabbit_federation_link_util). + +-include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +%% real +-export([start_conn_ch/5, disposable_channel_call/2, disposable_channel_call/3, + disposable_connection_call/3, ensure_connection_closed/1, + log_terminate/4, unacked_new/0, ack/3, nack/3, forward/9, + handle_downstream_down/3, handle_upstream_down/3, + get_connection_name/2, log_debug/3, log_info/3, log_warning/3, + log_error/3]). + +%% temp +-export([connection_error/6]). + +-import(rabbit_misc, [pget/2]). + +-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000). + +%%---------------------------------------------------------------------------- + +start_conn_ch(Fun, OUpstream, OUParams, + XorQName = #resource{virtual_host = DownVHost}, State) -> + + Upstream = rabbit_federation_util:deobfuscate_upstream(OUpstream), + UParams = rabbit_federation_util:deobfuscate_upstream_params(OUParams), + + ConnName = get_connection_name(Upstream, UParams), + case open_monitor(#amqp_params_direct{virtual_host = DownVHost}, ConnName) of + {ok, DConn, DCh} -> + case Upstream#upstream.ack_mode of + 'on-confirm' -> + #'confirm.select_ok'{} = + amqp_channel:call(DCh, #'confirm.select'{}), + amqp_channel:register_confirm_handler(DCh, self()); + _ -> + ok + end, + case open_monitor(UParams#upstream_params.params, ConnName) of + {ok, Conn, Ch} -> + %% Don't trap exits until we have established + %% connections so that if we try to delete + %% federation upstreams while waiting for a + %% connection to be established then we don't + %% block + process_flag(trap_exit, true), + try + R = Fun(Conn, Ch, DConn, DCh), + log_info( + XorQName, "connected to ~s~n", + [rabbit_federation_upstream:params_to_string( + UParams)]), + Name = pget(name, amqp_connection:info(DConn, [name])), + rabbit_federation_status:report( + OUpstream, OUParams, XorQName, {running, Name}), + R + catch exit:E -> + %% terminate/2 will not get this, as we + %% have not put them in our state yet + ensure_connection_closed(DConn), + ensure_connection_closed(Conn), + connection_error(remote_start, E, + OUpstream, OUParams, XorQName, State) + end; + E -> + ensure_connection_closed(DConn), + connection_error(remote_start, E, + OUpstream, OUParams, XorQName, State) + end; + E -> + connection_error(local_start, E, + OUpstream, OUParams, XorQName, State) + end. + +get_connection_name(#upstream{name = UpstreamName}, + #upstream_params{x_or_q = Resource}) when is_record(Resource, exchange)-> + Policy = Resource#exchange.policy, + PolicyName = proplists:get_value(name, Policy), + connection_name(UpstreamName, PolicyName); + +get_connection_name(#upstream{name = UpstreamName}, + #upstream_params{x_or_q = Resource}) when ?is_amqqueue(Resource) -> + Policy = amqqueue:get_policy(Resource), + PolicyName = proplists:get_value(name, Policy), + connection_name(UpstreamName, PolicyName); + +get_connection_name(_, _) -> + connection_name(undefined, undefined). + +connection_name(Upstream, Policy) when is_binary(Upstream), is_binary(Policy) -> + <<<<"Federation link (upstream: ">>/binary, Upstream/binary, <<", policy: ">>/binary, Policy/binary, <<")">>/binary>>; +connection_name(_, _) -> + <<"Federation link">>. + +open_monitor(Params, Name) -> + case open(Params, Name) of + {ok, Conn, Ch} -> erlang:monitor(process, Ch), + {ok, Conn, Ch}; + E -> E + end. + +open(Params, Name) -> + try + amqp_connection:start(Params, Name) + of + {ok, Conn} -> + try + amqp_connection:open_channel(Conn) + of + {ok, Ch} -> {ok, Conn, Ch}; + E -> ensure_connection_closed(Conn), + E + catch + _:E -> + ensure_connection_closed(Conn), + E + end; + E -> E + catch + _:E -> E + end. + +ensure_channel_closed(Ch) -> catch amqp_channel:close(Ch). + +ensure_connection_closed(Conn) -> + catch amqp_connection:close(Conn, ?MAX_CONNECTION_CLOSE_TIMEOUT). + +connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Message}}, _} = E, + Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(E)), + log_warning(XorQName, + "did not connect to ~s. Server has closed the connection due to an error, code: ~p, " + "message: ~s", + [rabbit_federation_upstream:params_to_string(UParams), + Code, Message]), + {stop, {shutdown, restart}, State}; + +connection_error(remote_start, E, Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(E)), + log_warning(XorQName, "did not connect to ~s. Reason: ~p", + [rabbit_federation_upstream:params_to_string(UParams), + E]), + {stop, {shutdown, restart}, State}; + +connection_error(remote, E, Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(E)), + log_info(XorQName, "disconnected from ~s~n~p", + [rabbit_federation_upstream:params_to_string(UParams), E]), + {stop, {shutdown, restart}, State}; + +connection_error(command_channel, E, Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(E)), + log_info(XorQName, "failed to open a command channel for upstream ~s~n~p", + [rabbit_federation_upstream:params_to_string(UParams), E]), + {stop, {shutdown, restart}, State}; + +connection_error(local, basic_cancel, Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, {error, basic_cancel}), + log_info(XorQName, "received a 'basic.cancel'", []), + {stop, {shutdown, restart}, State}; + +connection_error(local_start, E, Upstream, UParams, XorQName, State) -> + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(E)), + log_warning(XorQName, "did not connect locally~n~p", [E]), + {stop, {shutdown, restart}, State}. + +%% If we terminate due to a gen_server call exploding (almost +%% certainly due to an amqp_channel:call() exploding) then we do not +%% want to report the gen_server call in our status. +clean_reason({E = {shutdown, _}, _}) -> E; +clean_reason(E) -> E. + +%% local / disconnected never gets invoked, see handle_info({'DOWN', ... + +%%---------------------------------------------------------------------------- + +unacked_new() -> gb_trees:empty(). + +ack(#'basic.ack'{delivery_tag = Seq, + multiple = Multiple}, Ch, Unack) -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = gb_trees:get(Seq, Unack), + multiple = Multiple}), + remove_delivery_tags(Seq, Multiple, Unack). + + +%% Note: at time of writing the broker will never send requeue=false. And it's +%% hard to imagine why it would. But we may as well handle it. +nack(#'basic.nack'{delivery_tag = Seq, + multiple = Multiple, + requeue = Requeue}, Ch, Unack) -> + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = gb_trees:get(Seq, Unack), + multiple = Multiple, + requeue = Requeue}), + remove_delivery_tags(Seq, Multiple, Unack). + +remove_delivery_tags(Seq, false, Unacked) -> + gb_trees:delete(Seq, Unacked); +remove_delivery_tags(Seq, true, Unacked) -> + case gb_trees:is_empty(Unacked) of + true -> Unacked; + false -> {Smallest, _Val, Unacked1} = gb_trees:take_smallest(Unacked), + case Smallest > Seq of + true -> Unacked; + false -> remove_delivery_tags(Seq, true, Unacked1) + end + end. + +forward(#upstream{ack_mode = AckMode, + trust_user_id = Trust}, + #'basic.deliver'{delivery_tag = DT}, + Ch, DCh, PublishMethod, HeadersFun, ForwardFun, Msg, Unacked) -> + Headers = extract_headers(Msg), + case ForwardFun(Headers) of + true -> Msg1 = maybe_clear_user_id( + Trust, update_headers(HeadersFun(Headers), Msg)), + Seq = case AckMode of + 'on-confirm' -> amqp_channel:next_publish_seqno(DCh); + _ -> ignore + end, + amqp_channel:cast(DCh, PublishMethod, Msg1), + case AckMode of + 'on-confirm' -> + gb_trees:insert(Seq, DT, Unacked); + 'on-publish' -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT}), + Unacked; + 'no-ack' -> + Unacked + end; + false -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT}), + %% Drop it, but acknowledge it! + Unacked + end. + +maybe_clear_user_id(false, Msg = #amqp_msg{props = Props}) -> + Msg#amqp_msg{props = Props#'P_basic'{user_id = undefined}}; +maybe_clear_user_id(true, Msg) -> + Msg. + +extract_headers(#amqp_msg{props = #'P_basic'{headers = Headers}}) -> + Headers. + +update_headers(Headers, Msg = #amqp_msg{props = Props}) -> + Msg#amqp_msg{props = Props#'P_basic'{headers = Headers}}. + +%%---------------------------------------------------------------------------- + +%% If the downstream channel shuts down cleanly, we can just ignore it +%% - we're the same node, we're presumably about to go down too. +handle_downstream_down(shutdown, _Args, State) -> + {noreply, State}; + +handle_downstream_down(Reason, _Args, State) -> + {stop, {downstream_channel_down, Reason}, State}. + +%% If the upstream channel goes down for an intelligible reason, just +%% log it and die quietly. +handle_upstream_down({shutdown, Reason}, {Upstream, UParams, XName}, State) -> + rabbit_federation_link_util:connection_error( + remote, {upstream_channel_down, Reason}, Upstream, UParams, XName, State); + +handle_upstream_down(Reason, _Args, State) -> + {stop, {upstream_channel_down, Reason}, State}. + +%%---------------------------------------------------------------------------- + +log_terminate(gone, _Upstream, _UParams, _XorQName) -> + %% the link cannot start, this has been logged already + ok; +log_terminate({shutdown, restart}, _Upstream, _UParams, _XorQName) -> + %% We've already logged this before munging the reason + ok; +log_terminate(shutdown, Upstream, UParams, XorQName) -> + %% The supervisor is shutting us down; we are probably restarting + %% the link because configuration has changed. So try to shut down + %% nicely so that we do not cause unacked messages to be + %% redelivered. + log_info(XorQName, "disconnecting from ~s~n", + [rabbit_federation_upstream:params_to_string(UParams)]), + rabbit_federation_status:remove(Upstream, XorQName); + +log_terminate(Reason, Upstream, UParams, XorQName) -> + %% Unexpected death. sasl will log it, but we should update + %% rabbit_federation_status. + rabbit_federation_status:report( + Upstream, UParams, XorQName, clean_reason(Reason)). + +log_debug(XorQName, Fmt, Args) -> log(debug, XorQName, Fmt, Args). +log_info(XorQName, Fmt, Args) -> log(info, XorQName, Fmt, Args). +log_warning(XorQName, Fmt, Args) -> log(warning, XorQName, Fmt, Args). +log_error(XorQName, Fmt, Args) -> log(error, XorQName, Fmt, Args). + +log(Level, XorQName, Fmt0, Args0) -> + Fmt = "Federation ~s " ++ Fmt0, + Args = [rabbit_misc:rs(XorQName) | Args0], + case Level of + debug -> rabbit_log_federation:debug(Fmt, Args); + info -> rabbit_log_federation:info(Fmt, Args); + warning -> rabbit_log_federation:warning(Fmt, Args); + error -> rabbit_log_federation:error(Fmt, Args) + end. + +%%---------------------------------------------------------------------------- + +disposable_channel_call(Conn, Method) -> + disposable_channel_call(Conn, Method, fun(_, _) -> ok end). + +disposable_channel_call(Conn, Method, ErrFun) -> + try + {ok, Ch} = amqp_connection:open_channel(Conn), + try + amqp_channel:call(Ch, Method) + catch exit:{{shutdown, {server_initiated_close, Code, Message}}, _} -> + ErrFun(Code, Message) + after + ensure_channel_closed(Ch) + end + catch + Exception:Reason -> + rabbit_log_federation:error("Federation link could not create a disposable (one-off) channel due to an error ~p: ~p~n", [Exception, Reason]) + end. + +disposable_connection_call(Params, Method, ErrFun) -> + try + rabbit_log_federation:debug("Disposable connection parameters: ~p", [Params]), + case open(Params, <<"Disposable exchange federation link connection">>) of + {ok, Conn, Ch} -> + try + amqp_channel:call(Ch, Method) + catch exit:{{shutdown, {connection_closing, {server_initiated_close, Code, Message}}}, _} -> + ErrFun(Code, Message); + exit:{{shutdown, {server_initiated_close, Code, Message}}, _} -> + ErrFun(Code, Message) + after + ensure_connection_closed(Conn) + end; + {error, {auth_failure, Message}} -> + rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection " + "due to an authentication failure: ~s~n", [Message]); + Error -> + rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection, " + "reason: ~p~n", [Error]), + Error + end + catch + Exception:Reason -> + rabbit_log_federation:error("Federation link could not create a disposable (one-off) connection " + "due to an error ~p: ~p~n", [Exception, Reason]) + end. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl new file mode 100644 index 0000000000..928e41dc0f --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl @@ -0,0 +1,139 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(rabbit_federation_parameters). +-behaviour(rabbit_runtime_parameter). +-behaviour(rabbit_policy_validator). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([validate/5, notify/5, notify_clear/4]). +-export([register/0, unregister/0, validate_policy/1, adjust/1]). + +-define(RUNTIME_PARAMETERS, + [{runtime_parameter, <<"federation">>}, + {runtime_parameter, <<"federation-upstream">>}, + {runtime_parameter, <<"federation-upstream-set">>}, + {policy_validator, <<"federation-upstream">>}, + {policy_validator, <<"federation-upstream-pattern">>}, + {policy_validator, <<"federation-upstream-set">>}]). + +-rabbit_boot_step({?MODULE, + [{description, "federation parameters"}, + {mfa, {rabbit_federation_parameters, register, []}}, + {requires, rabbit_registry}, + {cleanup, {rabbit_federation_parameters, unregister, []}}, + {enables, recovery}]}). + +register() -> + [rabbit_registry:register(Class, Name, ?MODULE) || + {Class, Name} <- ?RUNTIME_PARAMETERS], + ok. + +unregister() -> + [rabbit_registry:unregister(Class, Name) || + {Class, Name} <- ?RUNTIME_PARAMETERS], + ok. + +validate(_VHost, <<"federation-upstream-set">>, Name, Term0, _User) -> + Term = [rabbit_data_coercion:to_proplist(Upstream) || Upstream <- Term0], + [rabbit_parameter_validation:proplist( + Name, + [{<<"upstream">>, fun rabbit_parameter_validation:binary/2, mandatory} | + shared_validation()], Upstream) + || Upstream <- Term]; + +validate(_VHost, <<"federation-upstream">>, Name, Term0, _User) -> + Term = rabbit_data_coercion:to_proplist(Term0), + rabbit_parameter_validation:proplist( + Name, [{<<"uri">>, fun validate_uri/2, mandatory} | + shared_validation()], Term); + +validate(_VHost, _Component, Name, _Term, _User) -> + {error, "name not recognised: ~p", [Name]}. + +notify(_VHost, <<"federation-upstream-set">>, Name, _Term, _Username) -> + adjust({upstream_set, Name}); + +notify(_VHost, <<"federation-upstream">>, Name, _Term, _Username) -> + adjust({upstream, Name}). + +notify_clear(_VHost, <<"federation-upstream-set">>, Name, _Username) -> + adjust({clear_upstream_set, Name}); + +notify_clear(VHost, <<"federation-upstream">>, Name, _Username) -> + rabbit_federation_exchange_link_sup_sup:adjust({clear_upstream, VHost, Name}), + rabbit_federation_queue_link_sup_sup:adjust({clear_upstream, VHost, Name}). + +adjust(Thing) -> + rabbit_federation_exchange_link_sup_sup:adjust(Thing), + rabbit_federation_queue_link_sup_sup:adjust(Thing). + +%%---------------------------------------------------------------------------- + +shared_validation() -> + [{<<"exchange">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"queue">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"consumer-tag">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"reconnect-delay">>,fun rabbit_parameter_validation:number/2, optional}, + {<<"max-hops">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"expires">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"message-ttl">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"trust-user-id">>, fun rabbit_parameter_validation:boolean/2, optional}, + {<<"ack-mode">>, rabbit_parameter_validation:enum( + ['no-ack', 'on-publish', 'on-confirm']), optional}, + {<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum(['default', 'never']), optional}, + {<<"ha-policy">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional}]. + +validate_uri(Name, Term) when is_binary(Term) -> + case rabbit_parameter_validation:binary(Name, Term) of + ok -> case amqp_uri:parse(binary_to_list(Term)) of + {ok, _} -> ok; + {error, E} -> {error, "\"~s\" not a valid URI: ~p", [Term, E]} + end; + E -> E + end; +validate_uri(Name, Term) -> + case rabbit_parameter_validation:list(Name, Term) of + ok -> case [V || U <- Term, + V <- [validate_uri(Name, U)], + element(1, V) =:= error] of + [] -> ok; + [E | _] -> E + end; + E -> E + end. + +%%---------------------------------------------------------------------------- + +validate_policy([{<<"federation-upstream-set">>, Value}]) + when is_binary(Value) -> + ok; +validate_policy([{<<"federation-upstream-set">>, Value}]) -> + {error, "~p is not a valid federation upstream set name", [Value]}; + +validate_policy([{<<"federation-upstream-pattern">>, Value}]) + when is_binary(Value) -> + case re:compile(Value) of + {ok, _} -> ok; + {error, Reason} -> {error, "could not compile pattern ~s to a regular expression. " + "Error: ~p", [Value, Reason]} + end; +validate_policy([{<<"federation-upstream-pattern">>, Value}]) -> + {error, "~p is not a valid federation upstream pattern name", [Value]}; + +validate_policy([{<<"federation-upstream">>, Value}]) + when is_binary(Value) -> + ok; +validate_policy([{<<"federation-upstream">>, Value}]) -> + {error, "~p is not a valid federation upstream name", [Value]}; + +validate_policy(L) when length(L) >= 2 -> + {error, "cannot specify federation-upstream, federation-upstream-set " + "or federation-upstream-pattern together", []}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue.erl new file mode 100644 index 0000000000..3117792589 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue.erl @@ -0,0 +1,111 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(rabbit_federation_queue). + +-rabbit_boot_step({?MODULE, + [{description, "federation queue decorator"}, + {mfa, {rabbit_queue_decorator, register, + [<<"federation">>, ?MODULE]}}, + {requires, rabbit_registry}, + {cleanup, {rabbit_queue_decorator, unregister, + [<<"federation">>]}}, + {enables, recovery}]}). + +-include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-behaviour(rabbit_queue_decorator). + +-export([startup/1, shutdown/1, policy_changed/2, active_for/1, + consumer_state_changed/3]). +-export([policy_changed_local/2]). + +%%---------------------------------------------------------------------------- + +startup(Q) -> + case active_for(Q) of + true -> rabbit_federation_queue_link_sup_sup:start_child(Q); + false -> ok + end, + ok. + +shutdown(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + case active_for(Q) of + true -> rabbit_federation_queue_link_sup_sup:stop_child(Q), + rabbit_federation_status:remove_exchange_or_queue(QName); + false -> ok + end, + ok. + +policy_changed(Q1, Q2) when ?is_amqqueue(Q1) -> + QName = amqqueue:get_name(Q1), + case rabbit_amqqueue:lookup(QName) of + {ok, Q0} when ?is_amqqueue(Q0) -> + QPid = amqqueue:get_pid(Q0), + rpc:call(node(QPid), rabbit_federation_queue, + policy_changed_local, [Q1, Q2]); + {error, not_found} -> + ok + end. + +policy_changed_local(Q1, Q2) -> + shutdown(Q1), + startup(Q2). + +active_for(Q) -> + Args = amqqueue:get_arguments(Q), + case rabbit_misc:table_lookup(Args, <<"x-internal-purpose">>) of + {longstr, _} -> false; %% [0] + _ -> rabbit_federation_upstream:federate(Q) + end. +%% [0] Currently the only "internal purpose" is federation, but I +%% suspect if we introduce another one it will also be for something +%% that doesn't want to be federated. + +%% We need to reconsider whether we need to run or pause every time +%% the consumer state changes in the queue. But why can the state +%% change? +%% +%% consumer blocked | We may have no more active consumers, and thus need to +%% | pause +%% | +%% consumer unblocked | We don't care +%% | +%% queue empty | The queue has become empty therefore we need to run to +%% | get more messages +%% | +%% basic consume | We don't care +%% | +%% basic cancel | We may have no more active consumers, and thus need to +%% | pause +%% | +%% refresh | We asked for it (we have started a new link after +%% | failover and need something to prod us into action +%% | (or not)). +%% +%% In the cases where we don't care it's not prohibitively expensive +%% for us to be here anyway, so never mind. +%% +%% Note that there is no "queue became non-empty" state change - that's +%% because of the queue invariant. If the queue transitions from empty to +%% non-empty then it must have no active consumers - in which case it stays +%% the same from our POV. + +consumer_state_changed(Q, MaxActivePriority, IsEmpty) -> + QName = amqqueue:get_name(Q), + case IsEmpty andalso active_unfederated(MaxActivePriority) of + true -> rabbit_federation_queue_link:run(QName); + false -> rabbit_federation_queue_link:pause(QName) + end, + ok. + +active_unfederated(empty) -> false; +active_unfederated(P) when P >= 0 -> true; +active_unfederated(_P) -> false. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl new file mode 100644 index 0000000000..97389cb8f6 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl @@ -0,0 +1,330 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_queue_link). + +%% pg2 is deprecated in OTP 23. +-compile(nowarn_deprecated_function). + +-include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-behaviour(gen_server2). + +-export([start_link/1, go/0, run/1, pause/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-import(rabbit_misc, [pget/2]). +-import(rabbit_federation_util, [name/1, pgname/1]). + +-record(not_started, {queue, run, upstream, upstream_params}). +-record(state, {queue, run, conn, ch, dconn, dch, upstream, upstream_params, + unacked}). + +start_link(Args) -> + gen_server2:start_link(?MODULE, Args, [{timeout, infinity}]). + +run(QName) -> cast(QName, run). +pause(QName) -> cast(QName, pause). +go() -> cast(go). + +%%---------------------------------------------------------------------------- +%%call(QName, Msg) -> [gen_server2:call(Pid, Msg, infinity) || Pid <- q(QName)]. +cast(Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- all()]. +cast(QName, Msg) -> [gen_server2:cast(Pid, Msg) || Pid <- q(QName)]. + +join(Name) -> + pg2:create(pgname(Name)), + ok = pg2:join(pgname(Name), self()). + +all() -> + pg2:create(pgname(rabbit_federation_queues)), + pg2:get_members(pgname(rabbit_federation_queues)). + +q(QName) -> + pg2:create(pgname({rabbit_federation_queue, QName})), + pg2:get_members(pgname({rabbit_federation_queue, QName})). + +federation_up() -> + proplists:is_defined(rabbitmq_federation, + application:which_applications(infinity)). + +%%---------------------------------------------------------------------------- + +init({Upstream, Queue}) when ?is_amqqueue(Queue) -> + QName = amqqueue:get_name(Queue), + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + DeobfuscatedUpstream = rabbit_federation_util:deobfuscate_upstream(Upstream), + DeobfuscatedUParams = rabbit_federation_upstream:to_params(DeobfuscatedUpstream, Queue), + UParams = rabbit_federation_util:obfuscate_upstream_params(DeobfuscatedUParams), + rabbit_federation_status:report(Upstream, UParams, QName, starting), + join(rabbit_federation_queues), + join({rabbit_federation_queue, QName}), + gen_server2:cast(self(), maybe_go), + rabbit_amqqueue:notify_decorators(Q), + {ok, #not_started{queue = Queue, + run = false, + upstream = Upstream, + upstream_params = UParams}}; + {error, not_found} -> + rabbit_federation_link_util:log_warning(QName, "not found, stopping link~n", []), + {stop, gone} + end. + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(maybe_go, State) -> + case federation_up() of + true -> go(State); + false -> {noreply, State} + end; + +handle_cast(go, State = #not_started{}) -> + go(State); + +handle_cast(go, State) -> + {noreply, State}; + +handle_cast(run, State = #state{upstream = Upstream, + upstream_params = UParams, + ch = Ch, + run = false}) -> + consume(Ch, Upstream, UParams#upstream_params.x_or_q), + {noreply, State#state{run = true}}; + +handle_cast(run, State = #not_started{}) -> + {noreply, State#not_started{run = true}}; + +handle_cast(run, State) -> + %% Already started + {noreply, State}; + +handle_cast(pause, State = #state{run = false}) -> + %% Already paused + {noreply, State}; + +handle_cast(pause, State = #not_started{}) -> + {noreply, State#not_started{run = false}}; + +handle_cast(pause, State = #state{ch = Ch, upstream = Upstream}) -> + cancel(Ch, Upstream), + {noreply, State#state{run = false}}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(#'basic.consume_ok'{}, State) -> + {noreply, State}; + +handle_info(#'basic.ack'{} = Ack, State = #state{ch = Ch, + unacked = Unacked}) -> + Unacked1 = rabbit_federation_link_util:ack(Ack, Ch, Unacked), + {noreply, State#state{unacked = Unacked1}}; + +handle_info(#'basic.nack'{} = Nack, State = #state{ch = Ch, + unacked = Unacked}) -> + Unacked1 = rabbit_federation_link_util:nack(Nack, Ch, Unacked), + {noreply, State#state{unacked = Unacked1}}; + +handle_info({#'basic.deliver'{redelivered = Redelivered, + exchange = X, + routing_key = K} = DeliverMethod, Msg}, + State = #state{queue = Q, + upstream = Upstream, + upstream_params = UParams, + ch = Ch, + dch = DCh, + unacked = Unacked}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + PublishMethod = #'basic.publish'{exchange = <<"">>, + routing_key = QName#resource.name}, + HeadersFun = fun (H) -> update_headers(UParams, Redelivered, X, K, H) end, + ForwardFun = fun (_H) -> true end, + Unacked1 = rabbit_federation_link_util:forward( + Upstream, DeliverMethod, Ch, DCh, PublishMethod, + HeadersFun, ForwardFun, Msg, Unacked), + %% TODO actually we could reject when 'stopped' + {noreply, State#state{unacked = Unacked1}}; + +handle_info(#'basic.cancel'{}, + State = #state{queue = Q, + upstream = Upstream, + upstream_params = UParams}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + rabbit_federation_link_util:connection_error( + local, basic_cancel, Upstream, UParams, QName, State); + +handle_info({'DOWN', _Ref, process, Pid, Reason}, + State = #state{dch = DCh, + ch = Ch, + upstream = Upstream, + upstream_params = UParams, + queue = Q}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + handle_down(Pid, Reason, Ch, DCh, {Upstream, UParams, QName}, State); + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(Reason, #not_started{upstream = Upstream, + upstream_params = UParams, + queue = Q}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, QName), + ok; + +terminate(Reason, #state{dconn = DConn, + conn = Conn, + upstream = Upstream, + upstream_params = UParams, + queue = Q}) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + rabbit_federation_link_util:ensure_connection_closed(DConn), + rabbit_federation_link_util:ensure_connection_closed(Conn), + rabbit_federation_link_util:log_terminate(Reason, Upstream, UParams, QName), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +go(S0 = #not_started{run = Run, + upstream = Upstream = #upstream{ + prefetch_count = Prefetch}, + upstream_params = UParams, + queue = Queue}) when ?is_amqqueue(Queue) -> + QName = amqqueue:get_name(Queue), + #upstream_params{x_or_q = UQueue} = UParams, + Durable = amqqueue:is_durable(UQueue), + AutoDelete = amqqueue:is_auto_delete(UQueue), + Args = amqqueue:get_arguments(UQueue), + Unacked = rabbit_federation_link_util:unacked_new(), + rabbit_federation_link_util:start_conn_ch( + fun (Conn, Ch, DConn, DCh) -> + check_upstream_suitable(Conn), + amqp_channel:call(Ch, #'queue.declare'{queue = name(UQueue), + durable = Durable, + auto_delete = AutoDelete, + arguments = Args}), + case Upstream#upstream.ack_mode of + 'no-ack' -> ok; + _ -> amqp_channel:call( + Ch, #'basic.qos'{prefetch_count = Prefetch}) + end, + amqp_selective_consumer:register_default_consumer(Ch, self()), + case Run of + true -> consume(Ch, Upstream, UQueue); + false -> ok + end, + {noreply, #state{queue = Queue, + run = Run, + conn = Conn, + ch = Ch, + dconn = DConn, + dch = DCh, + upstream = Upstream, + upstream_params = UParams, + unacked = Unacked}} + end, Upstream, UParams, QName, S0). + +check_upstream_suitable(Conn) -> + Props = pget(server_properties, + amqp_connection:info(Conn, [server_properties])), + {table, Caps} = rabbit_misc:table_lookup(Props, <<"capabilities">>), + case rabbit_misc:table_lookup(Caps, <<"consumer_priorities">>) of + {bool, true} -> ok; + _ -> exit({error, upstream_lacks_consumer_priorities}) + end. + +update_headers(UParams, Redelivered, X, K, undefined) -> + update_headers(UParams, Redelivered, X, K, []); + +update_headers(#upstream_params{table = Table}, Redelivered, X, K, Headers) -> + {Headers1, Count} = + case rabbit_misc:table_lookup(Headers, ?ROUTING_HEADER) of + undefined -> + %% We only want to record the original exchange and + %% routing key the first time a message gets + %% forwarded; after that it's known that they were + %% <<>> and QueueName respectively. + {init_x_original_source_headers(Headers, X, K), 0}; + {array, Been} -> + update_visit_count(Table, Been, Headers); + %% this means the header comes from the client + %% which re-published the message, most likely unintentionally. + %% We can't assume much about the value, so we simply ignore it. + _Other -> + {init_x_original_source_headers(Headers, X, K), 0} + end, + rabbit_basic:prepend_table_header( + ?ROUTING_HEADER, Table ++ [{<<"redelivered">>, bool, Redelivered}, + {<<"visit-count">>, long, Count + 1}], + swap_cc_header(Headers1)). + +init_x_original_source_headers(Headers, X, K) -> + rabbit_misc:set_table_value( + rabbit_misc:set_table_value( + Headers, <<"x-original-exchange">>, longstr, X), + <<"x-original-routing-key">>, longstr, K). + +update_visit_count(Table, Been, Headers) -> + {Found, Been1} = lists:partition( + fun(I) -> visit_match(I, Table) end, + Been), + C = case Found of + [] -> 0; + [{table, T}] -> case rabbit_misc:table_lookup( + T, <<"visit-count">>) of + {_, I} when is_number(I) -> I; + _ -> 0 + end + end, + {rabbit_misc:set_table_value( + Headers, ?ROUTING_HEADER, array, Been1), C}. + +swap_cc_header(Table) -> + [{case K of + <<"CC">> -> <<"x-original-cc">>; + _ -> K + end, T, V} || {K, T, V} <- Table]. + +visit_match({table, T}, Info) -> + lists:all(fun (K) -> + rabbit_misc:table_lookup(T, K) =:= + rabbit_misc:table_lookup(Info, K) + end, [<<"uri">>, <<"virtual_host">>, <<"queue">>]); +visit_match(_ ,_) -> + false. + +consumer_tag(#upstream{consumer_tag = ConsumerTag}) -> + ConsumerTag. + +consume(Ch, Upstream, UQueue) -> + ConsumerTag = consumer_tag(Upstream), + NoAck = Upstream#upstream.ack_mode =:= 'no-ack', + amqp_channel:cast( + Ch, #'basic.consume'{queue = name(UQueue), + no_ack = NoAck, + nowait = true, + consumer_tag = ConsumerTag, + arguments = [{<<"x-priority">>, long, -1}]}). + +cancel(Ch, Upstream) -> + ConsumerTag = consumer_tag(Upstream), + amqp_channel:cast(Ch, #'basic.cancel'{nowait = true, + consumer_tag = ConsumerTag}). + +handle_down(DCh, Reason, _Ch, DCh, Args, State) -> + rabbit_federation_link_util:handle_downstream_down(Reason, Args, State); +handle_down(Ch, Reason, Ch, _DCh, Args, State) -> + rabbit_federation_link_util:handle_upstream_down(Reason, Args, State). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl new file mode 100644 index 0000000000..1f6ec2b88f --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link_sup_sup.erl @@ -0,0 +1,87 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_queue_link_sup_sup). + +-behaviour(mirrored_supervisor). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit/include/amqqueue.hrl"). +-define(SUPERVISOR, ?MODULE). + +%% Supervises the upstream links for all queues (but not exchanges). We need +%% different handling here since queues do not want a mirrored sup. + +-export([start_link/0, start_child/1, adjust/1, stop_child/1]). +-export([init/1]). + +%%---------------------------------------------------------------------------- + +start_link() -> + mirrored_supervisor:start_link({local, ?SUPERVISOR}, ?SUPERVISOR, + fun rabbit_misc:execute_mnesia_transaction/1, + ?MODULE, []). + +%% Note that the next supervisor down, rabbit_federation_link_sup, is common +%% between exchanges and queues. +start_child(Q) -> + case mirrored_supervisor:start_child( + ?SUPERVISOR, + {id(Q), {rabbit_federation_link_sup, start_link, [Q]}, + transient, ?SUPERVISOR_WAIT, supervisor, + [rabbit_federation_link_sup]}) of + {ok, _Pid} -> ok; + {error, {already_started, _Pid}} -> + QueueName = amqqueue:get_name(Q), + rabbit_log_federation:warning("Federation link for queue ~p was already started", + [rabbit_misc:rs(QueueName)]), + ok; + %% A link returned {stop, gone}, the link_sup shut down, that's OK. + {error, {shutdown, _}} -> ok + end. + + +adjust({clear_upstream, VHost, UpstreamName}) -> + [rabbit_federation_link_sup:adjust(Pid, Q, {clear_upstream, UpstreamName}) || + {Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR), + ?amqqueue_vhost_equals(Q, VHost)], + ok; +adjust(Reason) -> + [rabbit_federation_link_sup:adjust(Pid, Q, Reason) || + {Q, Pid, _, _} <- mirrored_supervisor:which_children(?SUPERVISOR)], + ok. + +stop_child(Q) -> + case mirrored_supervisor:terminate_child(?SUPERVISOR, id(Q)) of + ok -> ok; + {error, Err} -> + QueueName = amqqueue:get_name(Q), + rabbit_log_federation:warning( + "Attempt to stop a federation link for queue ~p failed: ~p", + [rabbit_misc:rs(QueueName), Err]), + ok + end, + ok = mirrored_supervisor:delete_child(?SUPERVISOR, id(Q)). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_one, 1200, 60}, []}}. + +%% Clean out all mutable aspects of the queue except policy. We need +%% to keep the entire queue around rather than just take its name +%% since we will want to know its policy to determine how to federate +%% it, and its immutable properties in case we want to redeclare it +%% upstream. We don't just take its name and look it up again since +%% that would introduce race conditions when policies change +%% frequently. Note that since we take down all the links and start +%% again when policies change, the policy will always be correct, so +%% we don't clear it out here and can trust it. +id(Q) when ?is_amqqueue(Q) -> + Policy = amqqueue:get_policy(Q), + Q1 = rabbit_amqqueue:immutable(Q), + amqqueue:set_policy(Q1, Policy). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_status.erl b/deps/rabbitmq_federation/src/rabbit_federation_status.erl new file mode 100644 index 0000000000..04afec990d --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_status.erl @@ -0,0 +1,175 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_status). +-behaviour(gen_server). + +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-export([start_link/0]). + +-export([report/4, remove_exchange_or_queue/1, remove/2, status/0, lookup/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-import(rabbit_federation_util, [name/1]). + +-define(SERVER, ?MODULE). +-define(ETS_NAME, ?MODULE). + +-record(state, {}). +-record(entry, {key, uri, status, timestamp, id, supervisor, upstream}). + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +report(Upstream, UParams, XorQName, Status) -> + [Supervisor | _] = get('$ancestors'), + gen_server:cast(?SERVER, {report, Supervisor, Upstream, UParams, XorQName, + Status, calendar:local_time()}). + +remove_exchange_or_queue(XorQName) -> + gen_server:call(?SERVER, {remove_exchange_or_queue, XorQName}, infinity). + +remove(Upstream, XorQName) -> + gen_server:call(?SERVER, {remove, Upstream, XorQName}, infinity). + +status() -> + gen_server:call(?SERVER, status, infinity). + +lookup(Id) -> + gen_server:call(?SERVER, {lookup, Id}, infinity). + +init([]) -> + ?ETS_NAME = ets:new(?ETS_NAME, + [named_table, {keypos, #entry.key}, private]), + {ok, #state{}}. + +handle_call({remove_exchange_or_queue, XorQName}, _From, State) -> + [link_gone(Entry) + || Entry <- ets:match_object(?ETS_NAME, match_entry(xorqkey(XorQName)))], + {reply, ok, State}; + +handle_call({remove, Upstream, XorQName}, _From, State) -> + case ets:match_object(?ETS_NAME, match_entry(key(XorQName, Upstream))) of + [Entry] -> link_gone(Entry); + [] -> ok + end, + {reply, ok, State}; + +handle_call({lookup, Id}, _From, State) -> + Link = case ets:match_object(?ETS_NAME, match_id(Id)) of + [Entry] -> + [{key, Entry#entry.key}, + {uri, Entry#entry.uri}, + {status, Entry#entry.status}, + {timestamp, Entry#entry.timestamp}, + {id, Entry#entry.id}, + {supervisor, Entry#entry.supervisor}, + {upstream, Entry#entry.upstream}]; + [] -> not_found + end, + {reply, Link, State}; + +handle_call(status, _From, State) -> + Entries = ets:tab2list(?ETS_NAME), + {reply, [format(Entry) || Entry <- Entries], State}. + +handle_cast({report, Supervisor, Upstream, #upstream_params{safe_uri = URI}, + XorQName, Status, Timestamp}, State) -> + Key = key(XorQName, Upstream), + Entry = #entry{key = Key, + status = Status, + uri = URI, + timestamp = Timestamp, + supervisor = Supervisor, + upstream = Upstream, + id = unique_id(Key)}, + true = ets:insert(?ETS_NAME, Entry), + rabbit_event:notify(federation_link_status, format(Entry)), + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +format(#entry{status = Status, + uri = URI, + timestamp = Timestamp} = Entry) -> + identity(Entry) ++ split_status(Status) ++ [{uri, URI}, + {timestamp, Timestamp}]. + +identity(#entry{key = {#resource{virtual_host = VHost, + kind = Type, + name = XorQNameBin}, + UpstreamName, UXorQNameBin}, + id = Id, + upstream = #upstream{consumer_tag = ConsumerTag}}) -> + case Type of + exchange -> [{exchange, XorQNameBin}, + {upstream_exchange, UXorQNameBin}]; + queue -> [{queue, XorQNameBin}, + {upstream_queue, UXorQNameBin}, + {consumer_tag, ConsumerTag}] + end ++ [{type, Type}, + {vhost, VHost}, + {upstream, UpstreamName}, + {id, Id}]. + +unique_id(Key = {#resource{}, UpName, ResName}) when is_binary(UpName), is_binary(ResName) -> + PHash = erlang:phash2(Key, 1 bsl 32), + << << case N >= 10 of + true -> N - 10 + $a; + false -> N + $0 end >> + || <<N:4>> <= <<PHash:32>> >>. + +split_status({running, ConnName}) -> [{status, running}, + {local_connection, ConnName}]; +split_status({Status, Error}) -> [{status, Status}, + {error, Error}]; +split_status(Status) when is_atom(Status) -> [{status, Status}]. + +link_gone(Entry) -> + rabbit_event:notify(federation_link_removed, identity(Entry)), + true = ets:delete_object(?ETS_NAME, Entry). + +%% We don't want to key off the entire upstream, bits of it may change +key(XName = #resource{kind = exchange}, #upstream{name = UpstreamName, + exchange_name = UXNameBin}) -> + {XName, UpstreamName, UXNameBin}; + +key(QName = #resource{kind = queue}, #upstream{name = UpstreamName, + queue_name = UQNameBin}) -> + {QName, UpstreamName, UQNameBin}. + +xorqkey(XorQName) -> + {XorQName, '_', '_'}. + +match_entry(Key) -> + #entry{key = Key, + uri = '_', + status = '_', + timestamp = '_', + id = '_', + supervisor = '_', + upstream = '_'}. + +match_id(Id) -> + #entry{key = '_', + uri = '_', + status = '_', + timestamp = '_', + id = Id, + supervisor = '_', + upstream = '_'}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_sup.erl new file mode 100644 index 0000000000..d3642b52c2 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_sup.erl @@ -0,0 +1,63 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. There is just one of these. + +-include_lib("rabbit_common/include/rabbit.hrl"). +-define(SUPERVISOR, rabbit_federation_sup). + +-export([start_link/0, stop/0]). + +-export([init/1]). + +%% This supervisor needs to be part of the rabbit application since +%% a) it needs to be in place when exchange recovery takes place +%% b) it needs to go up and down with rabbit + +-rabbit_boot_step({rabbit_federation_supervisor, + [{description, "federation"}, + {mfa, {rabbit_sup, start_child, [?MODULE]}}, + {requires, kernel_ready}, + {cleanup, {?MODULE, stop, []}}, + {enables, rabbit_federation_exchange}, + {enables, rabbit_federation_queue}]}). + +%%---------------------------------------------------------------------------- + +start_link() -> + R = supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []), + rabbit_federation_event:add_handler(), + R. + +stop() -> + rabbit_federation_event:remove_handler(), + ok = supervisor:terminate_child(rabbit_sup, ?MODULE), + ok = supervisor:delete_child(rabbit_sup, ?MODULE). + +%%---------------------------------------------------------------------------- + +init([]) -> + Status = {status, {rabbit_federation_status, start_link, []}, + transient, ?WORKER_WAIT, worker, + [rabbit_federation_status]}, + XLinkSupSup = {x_links, + {rabbit_federation_exchange_link_sup_sup, start_link, []}, + transient, ?SUPERVISOR_WAIT, supervisor, + [rabbit_federation_exchange_link_sup_sup]}, + QLinkSupSup = {q_links, + {rabbit_federation_queue_link_sup_sup, start_link, []}, + transient, ?SUPERVISOR_WAIT, supervisor, + [rabbit_federation_queue_link_sup_sup]}, + %% with default reconnect-delay of 5 second, this supports up to + %% 100 links constantly failing and being restarted a minute + %% (or 200 links if reconnect-delay is 10 seconds, 600 with 30 seconds, + %% etc: N * (60/reconnect-delay) <= 1200) + {ok, {{one_for_one, 1200, 60}, [Status, XLinkSupSup, QLinkSupSup]}}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl new file mode 100644 index 0000000000..e079b850b5 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -0,0 +1,164 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(rabbit_federation_upstream). + +-include("rabbit_federation.hrl"). +-include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-export([federate/1, for/1, for/2, params_to_string/1, to_params/2]). +%% For testing +-export([from_set/2, from_pattern/2, remove_credentials/1]). + +-import(rabbit_misc, [pget/2, pget/3]). +-import(rabbit_federation_util, [name/1, vhost/1, r/1]). +-import(rabbit_data_coercion, [to_atom/1]). + +%%---------------------------------------------------------------------------- + +federate(XorQ) -> + rabbit_policy:get(<<"federation-upstream">>, XorQ) =/= undefined orelse + rabbit_policy:get(<<"federation-upstream-set">>, XorQ) =/= undefined orelse + rabbit_policy:get(<<"federation-upstream-pattern">>, XorQ) =/= undefined. + +for(XorQ) -> + case federate(XorQ) of + false -> []; + true -> from_set_contents(upstreams(XorQ), XorQ) + end. + +for(XorQ, UpstreamName) -> + case federate(XorQ) of + false -> []; + true -> rabbit_federation_util:find_upstreams( + UpstreamName, from_set_contents(upstreams(XorQ), XorQ)) + end. + +upstreams(XorQ) -> + UName = rabbit_policy:get(<<"federation-upstream">>, XorQ), + USetName = rabbit_policy:get(<<"federation-upstream-set">>, XorQ), + UPatternValue = rabbit_policy:get(<<"federation-upstream-pattern">>, XorQ), + %% Cannot define 2 at a time, see rabbit_federation_parameters:validate_policy/1 + case {UName, USetName, UPatternValue} of + {undefined, undefined, undefined} -> []; + {undefined, undefined, _} -> find_contents(UPatternValue, vhost(XorQ)); + {undefined, _, undefined} -> set_contents(USetName, vhost(XorQ)); + {_, undefined, undefined} -> [[{<<"upstream">>, UName}]] + end. + +params_table(SafeURI, XorQ) -> + Key = case XorQ of + #exchange{} -> <<"exchange">>; + Q when ?is_amqqueue(Q) -> <<"queue">> + end, + [{<<"uri">>, longstr, SafeURI}, + {Key, longstr, name(XorQ)}]. + +params_to_string(#upstream_params{safe_uri = SafeURI, + x_or_q = XorQ}) -> + print("~s on ~s", [rabbit_misc:rs(r(XorQ)), SafeURI]). + +remove_credentials(URI) -> + list_to_binary(amqp_uri:remove_credentials(binary_to_list(URI))). + +to_params(Upstream = #upstream{uris = URIs}, XorQ) -> + URI = lists:nth(rand:uniform(length(URIs)), URIs), + {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(XorQ)), + XorQ1 = with_name(Upstream, vhost(Params), XorQ), + SafeURI = remove_credentials(URI), + #upstream_params{params = Params, + uri = URI, + x_or_q = XorQ1, + safe_uri = SafeURI, + table = params_table(SafeURI, XorQ)}. + +print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)). + +from_set(SetName, XorQ) -> + from_set_contents(set_contents(SetName, vhost(XorQ)), XorQ). + +from_pattern(SetName, XorQ) -> + from_set_contents(find_contents(SetName, vhost(XorQ)), XorQ). + +set_contents(<<"all">>, VHost) -> + Upstreams0 = rabbit_runtime_parameters:list( + VHost, <<"federation-upstream">>), + Upstreams = [rabbit_data_coercion:to_list(U) || U <- Upstreams0], + [[{<<"upstream">>, pget(name, U)}] || U <- Upstreams]; + +set_contents(SetName, VHost) -> + case rabbit_runtime_parameters:value( + VHost, <<"federation-upstream-set">>, SetName) of + not_found -> []; + Set -> Set + end. + +find_contents(RegExp, VHost) -> + Upstreams0 = rabbit_runtime_parameters:list( + VHost, <<"federation-upstream">>), + Upstreams = [rabbit_data_coercion:to_list(U) || U <- Upstreams0, + re:run(pget(name, U), RegExp) =/= nomatch], + [[{<<"upstream">>, pget(name, U)}] || U <- Upstreams]. + +from_set_contents(Set, XorQ) -> + Results = [from_set_element(P, XorQ) || P <- Set], + [R || R <- Results, R =/= not_found]. + +from_set_element(UpstreamSetElem0, XorQ) -> + UpstreamSetElem = rabbit_data_coercion:to_proplist(UpstreamSetElem0), + Name = bget(upstream, UpstreamSetElem, []), + case rabbit_runtime_parameters:value( + vhost(XorQ), <<"federation-upstream">>, Name) of + not_found -> not_found; + Upstream -> from_upstream_or_set( + UpstreamSetElem, Name, Upstream, XorQ) + end. + +from_upstream_or_set(US, Name, U, XorQ) -> + URIParam = bget(uri, US, U), + URIs = case URIParam of + B when is_binary(B) -> [B]; + L when is_list(L) -> L + end, + #upstream{uris = URIs, + exchange_name = bget(exchange, US, U, name(XorQ)), + queue_name = bget(queue, US, U, name(XorQ)), + consumer_tag = bget('consumer-tag', US, U, <<"federation-link-", Name/binary>>), + prefetch_count = bget('prefetch-count', US, U, ?DEF_PREFETCH), + reconnect_delay = bget('reconnect-delay', US, U, 5), + max_hops = bget('max-hops', US, U, 1), + expires = bget(expires, US, U, none), + message_ttl = bget('message-ttl', US, U, none), + trust_user_id = bget('trust-user-id', US, U, false), + ack_mode = to_atom(bget('ack-mode', US, U, <<"on-confirm">>)), + ha_policy = bget('ha-policy', US, U, none), + name = Name, + bind_nowait = bget('bind-nowait', US, U, false), + resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>))}. + +%%---------------------------------------------------------------------------- + +bget(K, L1, L2) -> bget(K, L1, L2, undefined). + +bget(K0, L1, L2, D) -> + K = a2b(K0), + %% coerce maps to proplists + PL1 = rabbit_data_coercion:to_list(L1), + PL2 = rabbit_data_coercion:to_list(L2), + case pget(K, PL1, undefined) of + undefined -> pget(K, PL2, D); + Result -> Result + end. + +a2b(A) -> list_to_binary(atom_to_list(A)). + +with_name(#upstream{exchange_name = XNameBin}, VHostBin, X = #exchange{}) -> + X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)}; + +with_name(#upstream{queue_name = QNameBin}, VHostBin, Q) when ?is_amqqueue(Q) -> + amqqueue:set_name(Q, rabbit_misc:r(VHostBin, queue, QNameBin)). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl new file mode 100644 index 0000000000..6018dd90a5 --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream_exchange.erl @@ -0,0 +1,75 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(rabbit_federation_upstream_exchange). + +-rabbit_boot_step({?MODULE, + [{description, "federation upstream exchange type"}, + {mfa, {rabbit_registry, register, + [exchange, <<"x-federation-upstream">>, ?MODULE]}}, + {requires, rabbit_registry}, + {cleanup, {rabbit_registry, unregister, + [exchange, <<"x-federation-upstream">>]}}, + {enables, recovery}]}). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("rabbit_federation.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, serialise_events/0, route/2]). +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, + add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([info/1, info/2]). + +%%---------------------------------------------------------------------------- + +info(_X) -> []. +info(_X, _) -> []. + +description() -> + [{description, <<"Federation upstream helper exchange">>}, + {internal_purpose, federation}]. + +serialise_events() -> false. + +route(X = #exchange{arguments = Args}, + D = #delivery{message = #basic_message{content = Content}}) -> + %% This arg was introduced in the same release as this exchange type; + %% it must be set + {long, MaxHops} = rabbit_misc:table_lookup(Args, ?MAX_HOPS_ARG), + %% Will be missing for pre-3.3.0 versions + DName = case rabbit_misc:table_lookup(Args, ?DOWNSTREAM_NAME_ARG) of + {longstr, Val0} -> Val0; + _ -> unknown + end, + %% Will be missing for pre-3.8.9 versions + DVhost = case rabbit_misc:table_lookup(Args, ?DOWNSTREAM_VHOST_ARG) of + {longstr, Val1} -> Val1; + _ -> unknown + end, + Headers = rabbit_basic:extract_headers(Content), + case rabbit_federation_util:should_forward(Headers, MaxHops, DName, DVhost) of + true -> rabbit_exchange_type_fanout:route(X, D); + false -> [] + end. + +validate(#exchange{arguments = Args}) -> + rabbit_federation_util:validate_arg(?MAX_HOPS_ARG, long, Args). + +validate_binding(_X, _B) -> ok. +create(_Tx, _X) -> ok. +delete(_Tx, _X, _Bs) -> ok. +policy_changed(_X1, _X2) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. + +assert_args_equivalence(X = #exchange{name = Name, + arguments = Args}, ReqArgs) -> + rabbit_misc:assert_args_equivalence(Args, ReqArgs, Name, [?MAX_HOPS_ARG]), + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_util.erl b/deps/rabbitmq_federation/src/rabbit_federation_util.erl new file mode 100644 index 0000000000..160bac996e --- /dev/null +++ b/deps/rabbitmq_federation/src/rabbit_federation_util.erl @@ -0,0 +1,102 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_util). + +-include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-export([should_forward/4, find_upstreams/2, already_seen/3]). +-export([validate_arg/3, fail/2, name/1, vhost/1, r/1, pgname/1]). +-export([obfuscate_upstream/1, deobfuscate_upstream/1, obfuscate_upstream_params/1, deobfuscate_upstream_params/1]). + +-import(rabbit_misc, [pget_or_die/2, pget/3]). + +%%---------------------------------------------------------------------------- + +should_forward(undefined, _MaxHops, _DName, _DVhost) -> + true; +should_forward(Headers, MaxHops, DName, DVhost) -> + case rabbit_misc:table_lookup(Headers, ?ROUTING_HEADER) of + {array, A} -> length(A) < MaxHops andalso not already_seen(DName, DVhost, A); + _ -> true + end. + +%% Used to detect message and binding forwarding cycles. +already_seen(UpstreamID, UpstreamVhost, Array) -> + lists:any(fun ({table, T}) -> + {longstr, UpstreamID} =:= rabbit_misc:table_lookup(T, <<"cluster-name">>) andalso + {longstr, UpstreamVhost} =:= rabbit_misc:table_lookup(T, <<"vhost">>); + (_) -> + false + end, Array). + +find_upstreams(Name, Upstreams) -> + [U || U = #upstream{name = Name2} <- Upstreams, + Name =:= Name2]. + +validate_arg(Name, Type, Args) -> + case rabbit_misc:table_lookup(Args, Name) of + {Type, _} -> ok; + undefined -> fail("Argument ~s missing", [Name]); + _ -> fail("Argument ~s must be of type ~s", [Name, Type]) + end. + +-spec fail(io:format(), [term()]) -> no_return(). + +fail(Fmt, Args) -> rabbit_misc:protocol_error(precondition_failed, Fmt, Args). + +name( #resource{name = XorQName}) -> XorQName; +name(#exchange{name = #resource{name = XName}}) -> XName; +name(Q) when ?is_amqqueue(Q) -> #resource{name = QName} = amqqueue:get_name(Q), QName. + +vhost( #resource{virtual_host = VHost}) -> VHost; +vhost(#exchange{name = #resource{virtual_host = VHost}}) -> VHost; +vhost(Q) when ?is_amqqueue(Q) -> #resource{virtual_host = VHost} = amqqueue:get_name(Q), VHost; +vhost(#amqp_params_direct{virtual_host = VHost}) -> VHost; +vhost(#amqp_params_network{virtual_host = VHost}) -> VHost. + +r(#exchange{name = XName}) -> XName; +r(Q) when ?is_amqqueue(Q) -> amqqueue:get_name(Q). + +pgname(Name) -> + case application:get_env(rabbitmq_federation, pgroup_name_cluster_id) of + {ok, false} -> Name; + {ok, true} -> {rabbit_nodes:cluster_name(), Name}; + %% default value is 'false', so do the same thing + {ok, undefined} -> Name; + _ -> Name + end. + +obfuscate_upstream(#upstream{uris = Uris} = Upstream) -> + Upstream#upstream{uris = [credentials_obfuscation:encrypt(Uri) || Uri <- Uris]}. + +obfuscate_upstream_params(#upstream_params{uri = Uri, params = #amqp_params_network{password = Password} = Params} = UParams) -> + UParams#upstream_params{ + uri = credentials_obfuscation:encrypt(Uri), + params = Params#amqp_params_network{password = credentials_obfuscation:encrypt(rabbit_data_coercion:to_binary(Password))} + }; +obfuscate_upstream_params(#upstream_params{uri = Uri, params = #amqp_params_direct{password = Password} = Params} = UParams) -> + UParams#upstream_params{ + uri = credentials_obfuscation:encrypt(Uri), + params = Params#amqp_params_direct{password = credentials_obfuscation:encrypt(rabbit_data_coercion:to_binary(Password))} + }. + +deobfuscate_upstream(#upstream{uris = EncryptedUris} = Upstream) -> + Upstream#upstream{uris = [credentials_obfuscation:decrypt(EncryptedUri) || EncryptedUri <- EncryptedUris]}. + +deobfuscate_upstream_params(#upstream_params{uri = EncryptedUri, params = #amqp_params_network{password = EncryptedPassword} = Params} = UParams) -> + UParams#upstream_params{ + uri = credentials_obfuscation:decrypt(EncryptedUri), + params = Params#amqp_params_network{password = credentials_obfuscation:decrypt(EncryptedPassword)} + }; +deobfuscate_upstream_params(#upstream_params{uri = EncryptedUri, params = #amqp_params_direct{password = EncryptedPassword} = Params} = UParams) -> + UParams#upstream_params{ + uri = credentials_obfuscation:decrypt(EncryptedUri), + params = Params#amqp_params_direct{password = credentials_obfuscation:decrypt(EncryptedPassword)} + }. diff --git a/deps/rabbitmq_federation/test/exchange_SUITE.erl b/deps/rabbitmq_federation/test/exchange_SUITE.erl new file mode 100644 index 0000000000..a0cd51c7c9 --- /dev/null +++ b/deps/rabbitmq_federation/test/exchange_SUITE.erl @@ -0,0 +1,1319 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(exchange_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-include("rabbit_federation.hrl"). + +-compile(export_all). + +-import(rabbit_federation_test_util, + [wait_for_federation/2, expect/3, expect/4, expect_empty/2, + set_upstream/4, set_upstream/5, set_upstream_in_vhost/5, set_upstream_in_vhost/6, + clear_upstream/3, set_upstream_set/4, + set_policy/5, set_policy_pattern/5, clear_policy/3, + set_policy_upstream/5, set_policy_upstreams/4, + all_federation_links/2, federation_links_in_vhost/3, status_fields/2]). + +-import(rabbit_ct_broker_helpers, + [set_policy_in_vhost/7]). + +all() -> + [ + {group, without_automatic_setup}, + {group, without_disambiguate}, + {group, with_disambiguate} + ]. + +groups() -> + [ + {without_automatic_setup, [], [ + message_cycle_detection_case2 + ]}, + {without_disambiguate, [], [ + {cluster_size_1, [], [ + simple, + multiple_upstreams, + multiple_upstreams_pattern, + multiple_uris, + multiple_downstreams, + e2e, + unbind_on_delete, + unbind_on_unbind, + unbind_gets_transmitted, + no_loop, + dynamic_reconfiguration, + dynamic_reconfiguration_integrity, + federate_unfederate, + dynamic_plugin_stop_start, + dynamic_plugin_cleanup_stop_start, + dynamic_policy_cleanup, + delete_federated_exchange_upstream, + delete_federated_queue_upstream + ]} + ]}, + {with_disambiguate, [], [ + {cluster_size_2, [], [ + user_id, + message_cycle_detection_case1, + restart_upstream + ]}, + {cluster_size_3, [], [ + max_hops, + binding_propagation + ]}, + + {without_plugins, [], [ + {cluster_size_2, [], [ + upstream_has_no_federation + ]} + ]} + ]} + ]. + +suite() -> + [{timetrap, {minutes, 5}}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(without_automatic_setup, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Suffix}, + {rmq_nodes_clustered, false}, + {rmq_nodes_count, 1} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); +init_per_group(without_disambiguate, Config) -> + rabbit_ct_helpers:set_config(Config, + {disambiguate_step, []}); +init_per_group(with_disambiguate, Config) -> + rabbit_ct_helpers:set_config(Config, + {disambiguate_step, [fun rabbit_federation_test_util:disambiguate/1]}); +init_per_group(without_plugins, Config) -> + rabbit_ct_helpers:set_config(Config, + {broker_with_plugins, [true, false]}); +init_per_group(cluster_size_1 = Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 1} + ]), + init_per_group1(Group, Config1); +init_per_group(cluster_size_2 = Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 2} + ]), + init_per_group1(Group, Config1); +init_per_group(cluster_size_3 = Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 3} + ]), + init_per_group1(Group, Config1). + +init_per_group1(Group, Config) -> + SetupFederation = case Group of + cluster_size_1 -> [fun rabbit_federation_test_util:setup_federation/1]; + cluster_size_2 -> []; + cluster_size_3 -> [] + end, + Disambiguate = ?config(disambiguate_step, Config), + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Suffix}, + {rmq_nodes_clustered, false} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + SetupFederation ++ Disambiguate). + +end_per_group(without_disambiguate, Config) -> + Config; +end_per_group(with_disambiguate, Config) -> + Config; +end_per_group(without_plugins, Config) -> + Config; +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps() + ). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +simple(Config) -> + with_ch(Config, + fun (Ch) -> + Q = bind_queue(Ch, <<"fed.downstream">>, <<"key">>), + await_binding(Config, 0, <<"upstream">>, <<"key">>), + publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO">>) + end, upstream_downstream()). + +multiple_upstreams(Config) -> + with_ch(Config, + fun (Ch) -> + Q = bind_queue(Ch, <<"fed12.downstream">>, <<"key">>), + await_binding(Config, 0, <<"upstream">>, <<"key">>), + await_binding(Config, 0, <<"upstream2">>, <<"key">>), + publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO1">>), + publish_expect(Ch, <<"upstream2">>, <<"key">>, Q, <<"HELLO2">>) + end, [x(<<"upstream">>), + x(<<"upstream2">>), + x(<<"fed12.downstream">>)]). + +multiple_upstreams_pattern(Config) -> + set_upstream(Config, 0, <<"local453x">>, + rabbit_ct_broker_helpers:node_uri(Config, 0), [ + {<<"exchange">>, <<"upstream">>}, + {<<"queue">>, <<"upstream">>}]), + + set_upstream(Config, 0, <<"local3214x">>, + rabbit_ct_broker_helpers:node_uri(Config, 0), [ + {<<"exchange">>, <<"upstream2">>}, + {<<"queue">>, <<"upstream2">>}]), + + set_policy_pattern(Config, 0, <<"pattern">>, <<"^pattern\.">>, <<"local\\d+x">>), + + with_ch(Config, + fun (Ch) -> + Q = bind_queue(Ch, <<"pattern.downstream">>, <<"key">>), + await_binding(Config, 0, <<"upstream">>, <<"key">>), + await_binding(Config, 0, <<"upstream2">>, <<"key">>), + publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO1">>), + publish_expect(Ch, <<"upstream2">>, <<"key">>, Q, <<"HELLO2">>) + end, [x(<<"upstream">>), + x(<<"upstream2">>), + x(<<"pattern.downstream">>)]), + + clear_upstream(Config, 0, <<"local453x">>), + clear_upstream(Config, 0, <<"local3214x">>), + clear_policy(Config, 0, <<"pattern">>). + +multiple_uris(Config) -> + %% We can't use a direct connection for Kill() to work. + URIs = [ + rabbit_ct_broker_helpers:node_uri(Config, 0), + rabbit_ct_broker_helpers:node_uri(Config, 0, [use_ipaddr]) + ], + set_upstream(Config, 0, <<"localhost">>, URIs), + WithCh = fun(F) -> + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + F(Ch), + rabbit_ct_client_helpers:close_channels_and_connection( + Config, 0) + end, + WithCh(fun (Ch) -> declare_all(Ch, upstream_downstream()) end), + expect_uris(Config, 0, URIs), + WithCh(fun (Ch) -> delete_all(Ch, upstream_downstream()) end), + %% Put back how it was + rabbit_federation_test_util:setup_federation(Config), + ok. + +expect_uris(_, _, []) -> + ok; +expect_uris(Config, Node, URIs) -> + [Link] = rabbit_ct_broker_helpers:rpc(Config, Node, + rabbit_federation_status, status, []), + URI = rabbit_misc:pget(uri, Link), + kill_only_connection(Config, Node), + expect_uris(Config, Node, URIs -- [URI]). + +kill_only_connection(Config, Node) -> + case connection_pids(Config, Node) of + [Pid] -> catch rabbit_ct_broker_helpers:rpc(Config, Node, + rabbit_networking, close_connection, [Pid, "boom"]), %% [1] + wait_for_pid_to_die(Config, Node, Pid); + _ -> timer:sleep(100), + kill_only_connection(Config, Node) + end. + +%% [1] the catch is because we could still see a connection from a +%% previous time round. If so that's fine (we'll just loop around +%% again) but we don't want the test to fail because a connection +%% closed as we were trying to close it. + +wait_for_pid_to_die(Config, Node, Pid) -> + case connection_pids(Config, Node) of + [Pid] -> timer:sleep(100), + wait_for_pid_to_die(Config, Node, Pid); + _ -> ok + end. + + +multiple_downstreams(Config) -> + with_ch(Config, + fun (Ch) -> + Q1 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>), + Q12 = bind_queue(Ch, <<"fed12.downstream2">>, <<"key">>), + await_binding(Config, 0, <<"upstream">>, <<"key">>, 2), + await_binding(Config, 0, <<"upstream2">>, <<"key">>), + publish(Ch, <<"upstream">>, <<"key">>, <<"HELLO1">>), + publish(Ch, <<"upstream2">>, <<"key">>, <<"HELLO2">>), + expect(Ch, Q1, [<<"HELLO1">>]), + expect(Ch, Q12, [<<"HELLO1">>, <<"HELLO2">>]) + end, upstream_downstream() ++ + [x(<<"upstream2">>), + x(<<"fed12.downstream2">>)]). + +e2e(Config) -> + with_ch(Config, + fun (Ch) -> + bind_exchange(Ch, <<"downstream2">>, <<"fed.downstream">>, + <<"key">>), + await_binding(Config, 0, <<"upstream">>, <<"key">>), + Q = bind_queue(Ch, <<"downstream2">>, <<"key">>), + publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO1">>) + end, upstream_downstream() ++ [x(<<"downstream2">>)]). + +unbind_on_delete(Config) -> + with_ch(Config, + fun (Ch) -> + Q1 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>), + Q2 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>), + await_binding(Config, 0, <<"upstream">>, <<"key">>), + delete_queue(Ch, Q2), + publish_expect(Ch, <<"upstream">>, <<"key">>, Q1, <<"HELLO">>) + end, upstream_downstream()). + +unbind_on_unbind(Config) -> + with_ch(Config, + fun (Ch) -> + Q1 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>), + Q2 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>), + await_binding(Config, 0, <<"upstream">>, <<"key">>), + unbind_queue(Ch, Q2, <<"fed.downstream">>, <<"key">>), + publish_expect(Ch, <<"upstream">>, <<"key">>, Q1, <<"HELLO">>), + delete_queue(Ch, Q2) + end, upstream_downstream()). + +user_id(Config) -> + [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + set_policy_upstream(Config, Rabbit, <<"^test$">>, + rabbit_ct_broker_helpers:node_uri(Config, 1), []), + Perm = fun (F, A) -> + ok = rpc:call(Hare, + rabbit_auth_backend_internal, F, A) + end, + Perm(add_user, [<<"hare-user">>, <<"hare-user">>, <<"acting-user">>]), + Perm(set_permissions, [<<"hare-user">>, + <<"/">>, <<".*">>, <<".*">>, <<".*">>, + <<"acting-user">>]), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Rabbit), + {ok, Conn2} = amqp_connection:start( + #amqp_params_network{ + username = <<"hare-user">>, + password = <<"hare-user">>, + port = rabbit_ct_broker_helpers:get_node_config(Config, Hare, + tcp_port_amqp)}), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + declare_exchange(Ch2, x(<<"test">>)), + declare_exchange(Ch, x(<<"test">>)), + Q = bind_queue(Ch, <<"test">>, <<"key">>), + await_binding(Config, Hare, <<"test">>, <<"key">>), + + Msg = #amqp_msg{props = #'P_basic'{user_id = <<"hare-user">>}, + payload = <<"HELLO">>}, + + SafeUri = fun (H) -> + {array, [{table, Recv}]} = + rabbit_misc:table_lookup( + H, <<"x-received-from">>), + URI = rabbit_ct_broker_helpers:node_uri(Config, 1), + {longstr, URI} = + rabbit_misc:table_lookup(Recv, <<"uri">>) + end, + ExpectUser = + fun (ExpUser) -> + fun () -> + receive + {#'basic.deliver'{}, + #amqp_msg{props = Props, + payload = Payload}} -> + #'P_basic'{user_id = ActUser, + headers = Headers} = Props, + SafeUri(Headers), + <<"HELLO">> = Payload, + ExpUser = ActUser + end + end + end, + + wait_for_federation( + 90, + fun() -> + VHost = <<"/">>, + X1s = rabbit_ct_broker_helpers:rpc( + Config, Rabbit, rabbit_exchange, list, [VHost]), + L1 = + [X || X <- X1s, + X#exchange.name =:= #resource{virtual_host = VHost, + kind = exchange, + name = <<"test">>}, + X#exchange.scratches =:= [{federation, + [{{<<"upstream-2">>, + <<"test">>}, + <<"B">>}]}]], + X2s = rabbit_ct_broker_helpers:rpc( + Config, Hare, rabbit_exchange, list, [VHost]), + L2 = + [X || X <- X2s, + X#exchange.type =:= 'x-federation-upstream'], + [] =/= L1 andalso [] =/= L2 andalso + has_internal_federated_queue(Config, Hare, VHost) + end), + publish(Ch2, <<"test">>, <<"key">>, Msg), + expect(Ch, Q, ExpectUser(undefined)), + + set_policy_upstream(Config, Rabbit, <<"^test$">>, + rabbit_ct_broker_helpers:node_uri(Config, 1), + [{<<"trust-user-id">>, true}]), + wait_for_federation( + 90, + fun() -> + VHost = <<"/">>, + X1s = rabbit_ct_broker_helpers:rpc( + Config, Rabbit, rabbit_exchange, list, [VHost]), + L1 = + [X || X <- X1s, + X#exchange.name =:= #resource{virtual_host = VHost, + kind = exchange, + name = <<"test">>}, + X#exchange.scratches =:= [{federation, + [{{<<"upstream-2">>, + <<"test">>}, + <<"A">>}]}]], + X2s = rabbit_ct_broker_helpers:rpc( + Config, Hare, rabbit_exchange, list, [VHost]), + L2 = + [X || X <- X2s, + X#exchange.type =:= 'x-federation-upstream'], + [] =/= L1 andalso [] =/= L2 andalso + has_internal_federated_queue(Config, Hare, VHost) + end), + publish(Ch2, <<"test">>, <<"key">>, Msg), + expect(Ch, Q, ExpectUser(<<"hare-user">>)), + + amqp_channel:close(Ch2), + amqp_connection:close(Conn2), + + ok. + +%% In order to test that unbinds get sent we deliberately set up a +%% broken config - with topic upstream and fanout downstream. You +%% shouldn't really do this, but it lets us see "extra" messages that +%% get sent. +unbind_gets_transmitted(Config) -> + with_ch(Config, + fun (Ch) -> + Q11 = bind_queue(Ch, <<"fed.downstream">>, <<"key1">>), + Q12 = bind_queue(Ch, <<"fed.downstream">>, <<"key1">>), + Q21 = bind_queue(Ch, <<"fed.downstream">>, <<"key2">>), + Q22 = bind_queue(Ch, <<"fed.downstream">>, <<"key2">>), + await_binding(Config, 0, <<"upstream">>, <<"key1">>), + await_binding(Config, 0, <<"upstream">>, <<"key2">>), + [delete_queue(Ch, Q) || Q <- [Q12, Q21, Q22]], + publish(Ch, <<"upstream">>, <<"key1">>, <<"YES">>), + publish(Ch, <<"upstream">>, <<"key2">>, <<"NO">>), + expect(Ch, Q11, [<<"YES">>]), + expect_empty(Ch, Q11) + end, [x(<<"upstream">>), + x(<<"fed.downstream">>)]). + +no_loop(Config) -> + with_ch(Config, + fun (Ch) -> + Q1 = bind_queue(Ch, <<"one">>, <<"key">>), + Q2 = bind_queue(Ch, <<"two">>, <<"key">>), + await_binding(Config, 0, <<"one">>, <<"key">>, 2), + await_binding(Config, 0, <<"two">>, <<"key">>, 2), + publish(Ch, <<"one">>, <<"key">>, <<"Hello from one">>), + publish(Ch, <<"two">>, <<"key">>, <<"Hello from two">>), + expect(Ch, Q1, [<<"Hello from one">>, <<"Hello from two">>]), + expect(Ch, Q2, [<<"Hello from one">>, <<"Hello from two">>]), + expect_empty(Ch, Q1), + expect_empty(Ch, Q2) + end, [x(<<"one">>), + x(<<"two">>)]). + +suffix(Config, Node, Name, XName) -> + rabbit_ct_broker_helpers:rpc(Config, Node, + rabbit_federation_db, get_active_suffix, + [xr(<<"fed.downstream">>), + #upstream{name = Name, + exchange_name = list_to_binary(XName)}, none]). + +restart_upstream(Config) -> + [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit), + Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare), + + rabbit_federation_test_util:set_upstream(Config, + Rabbit, <<"hare">>, rabbit_ct_broker_helpers:node_uri(Config, 1)), + rabbit_federation_test_util:set_upstream_set(Config, + Rabbit, <<"upstream">>, + [{<<"hare">>, [{<<"exchange">>, <<"upstream">>}]}]), + rabbit_federation_test_util:set_policy(Config, + Rabbit, <<"hare">>, <<"^hare\\.">>, <<"upstream">>), + + declare_exchange(Upstream, x(<<"upstream">>)), + declare_exchange(Downstream, x(<<"hare.downstream">>)), + + Qstays = bind_queue(Downstream, <<"hare.downstream">>, <<"stays">>), + Qgoes = bind_queue(Downstream, <<"hare.downstream">>, <<"goes">>), + + rabbit_ct_client_helpers:close_channels_and_connection(Config, Hare), + rabbit_ct_broker_helpers:stop_node(Config, Hare), + + Qcomes = bind_queue(Downstream, <<"hare.downstream">>, <<"comes">>), + unbind_queue(Downstream, Qgoes, <<"hare.downstream">>, <<"goes">>), + + rabbit_ct_broker_helpers:start_node(Config, Hare), + Upstream1 = rabbit_ct_client_helpers:open_channel(Config, Hare), + + %% Wait for the link to come up and for these bindings + %% to be transferred + await_binding(Config, Hare, <<"upstream">>, <<"comes">>, 1), + await_binding_absent(Config, Hare, <<"upstream">>, <<"goes">>), + await_binding(Config, Hare, <<"upstream">>, <<"stays">>, 1), + + publish(Upstream1, <<"upstream">>, <<"goes">>, <<"GOES">>), + publish(Upstream1, <<"upstream">>, <<"stays">>, <<"STAYS">>), + publish(Upstream1, <<"upstream">>, <<"comes">>, <<"COMES">>), + + expect(Downstream, Qstays, [<<"STAYS">>]), + expect(Downstream, Qcomes, [<<"COMES">>]), + expect_empty(Downstream, Qgoes), + + delete_exchange(Downstream, <<"hare.downstream">>), + delete_exchange(Upstream1, <<"upstream">>), + + rabbit_federation_test_util:clear_policy(Config, + Rabbit, <<"hare">>), + rabbit_federation_test_util:clear_upstream_set(Config, + Rabbit, <<"upstream">>), + rabbit_federation_test_util:clear_upstream(Config, + Rabbit, <<"hare">>), + ok. + +%% flopsy, mopsy and cottontail, connected in a ring with max_hops = 2 +%% for each connection. We should not see any duplicates. + +max_hops(Config) -> + [Flopsy, Mopsy, Cottontail] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + [set_policy_upstream(Config, Downstream, + <<"^ring$">>, + rabbit_ct_broker_helpers:node_uri(Config, Upstream), + [{<<"max-hops">>, 2}]) + || {Downstream, Upstream} <- [{Flopsy, Cottontail}, + {Mopsy, Flopsy}, + {Cottontail, Mopsy}]], + + FlopsyCh = rabbit_ct_client_helpers:open_channel(Config, Flopsy), + MopsyCh = rabbit_ct_client_helpers:open_channel(Config, Mopsy), + CottontailCh = rabbit_ct_client_helpers:open_channel(Config, Cottontail), + + declare_exchange(FlopsyCh, x(<<"ring">>)), + declare_exchange(MopsyCh, x(<<"ring">>)), + declare_exchange(CottontailCh, x(<<"ring">>)), + + Q1 = bind_queue(FlopsyCh, <<"ring">>, <<"key">>), + Q2 = bind_queue(MopsyCh, <<"ring">>, <<"key">>), + Q3 = bind_queue(CottontailCh, <<"ring">>, <<"key">>), + + await_binding(Config, Flopsy, <<"ring">>, <<"key">>, 3), + await_binding(Config, Mopsy, <<"ring">>, <<"key">>, 3), + await_binding(Config, Cottontail, <<"ring">>, <<"key">>, 3), + + publish(FlopsyCh, <<"ring">>, <<"key">>, <<"HELLO flopsy">>), + publish(MopsyCh, <<"ring">>, <<"key">>, <<"HELLO mopsy">>), + publish(CottontailCh, <<"ring">>, <<"key">>, <<"HELLO cottontail">>), + + Msgs = [<<"HELLO flopsy">>, <<"HELLO mopsy">>, <<"HELLO cottontail">>], + expect(FlopsyCh, Q1, Msgs), + expect(MopsyCh, Q2, Msgs), + expect(CottontailCh, Q3, Msgs), + expect_empty(FlopsyCh, Q1), + expect_empty(MopsyCh, Q2), + expect_empty(CottontailCh, Q3), + ok. + +%% Two nodes, federated two way with the same virtual hosts, and max_hops set to a +%% high value. +message_cycle_detection_case1(Config) -> + [Cycle1, Cycle2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [set_policy_upstream(Config, Downstream, + <<"^cycle$">>, + rabbit_ct_broker_helpers:node_uri(Config, Upstream), + [{<<"max-hops">>, 10}]) + || {Downstream, Upstream} <- [{Cycle1, Cycle2}, {Cycle2, Cycle1}]], + + Cycle1Ch = rabbit_ct_client_helpers:open_channel(Config, Cycle1), + Cycle2Ch = rabbit_ct_client_helpers:open_channel(Config, Cycle2), + + declare_exchange(Cycle1Ch, x(<<"cycle">>)), + declare_exchange(Cycle2Ch, x(<<"cycle">>)), + + Q1 = bind_queue(Cycle1Ch, <<"cycle">>, <<"cycle_detection-key">>), + Q2 = bind_queue(Cycle2Ch, <<"cycle">>, <<"cycle_detection-key">>), + + %% "key" present twice because once for the local queue and once + %% for federation in each case + await_binding(Config, Cycle1, <<"cycle">>, <<"cycle_detection-key">>, 2), + await_binding(Config, Cycle2, <<"cycle">>, <<"cycle_detection-key">>, 2), + + publish(Cycle1Ch, <<"cycle">>, <<"cycle_detection-key">>, <<"HELLO1">>), + publish(Cycle2Ch, <<"cycle">>, <<"cycle_detection-key">>, <<"HELLO2">>), + + Msgs = [<<"HELLO1">>, <<"HELLO2">>], + expect(Cycle1Ch, Q1, Msgs), + expect(Cycle2Ch, Q2, Msgs), + expect_empty(Cycle1Ch, Q1), + expect_empty(Cycle2Ch, Q2), + + ok. + +node_uri_with_virtual_host(Config, Vhost) -> + node_uri_with_virtual_host(Config, 0, Vhost). + +node_uri_with_virtual_host(Config, Node, Vhost) -> + NodeURI = rabbit_ct_broker_helpers:node_uri(Config, Node), + <<NodeURI/binary, "/", Vhost/binary>>. + +upstream_policy_defs(Upstream) -> + maps:to_list(#{<<"federation-upstream">> => Upstream}). + +%% Exchange federation between three local virtual hosts, A -> B -> C, +%% propagates messages from A to C with a high enough max-hops value +message_cycle_detection_case2(Config) -> + VH1 = <<"cycles.a">>, + VH2 = <<"cycles.b">>, + VH3 = <<"cycles.c">>, + [begin + rabbit_ct_broker_helpers:add_vhost(Config, V), + rabbit_ct_broker_helpers:set_full_permissions(Config, V) + end || V <- [VH1, VH2, VH3]], + + %% make sure that cycle detection does not drop messages because of a limit on hops + UpstreamOpts = [{<<"max-hops">>, 5}], + %% VH1 is an upstream for VH2 + %% VH2 is an upstream for VH3 + UpstreamA = <<"upstream_a">>, + URI1 = node_uri_with_virtual_host(Config, VH1), + set_upstream_in_vhost(Config, 0, VH2, UpstreamA, URI1, UpstreamOpts), + UpstreamB = <<"upstream_b">>, + URI2 = node_uri_with_virtual_host(Config, VH2), + set_upstream_in_vhost(Config, 0, VH3, UpstreamB, URI2, UpstreamOpts), + + %% policies + set_policy_in_vhost(Config, 0, VH2, <<"federate.x">>, <<"^federated">>, <<"exchanges">>, upstream_policy_defs(UpstreamA)), + set_policy_in_vhost(Config, 0, VH3, <<"federate.x">>, <<"^federated">>, <<"exchanges">>, upstream_policy_defs(UpstreamB)), + + %% channels + VH1Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH1), + VH2Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH2), + VH3Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH3), + + {ok, VH1Ch} = amqp_connection:open_channel(VH1Conn), + {ok, VH2Ch} = amqp_connection:open_channel(VH2Conn), + {ok, VH3Ch} = amqp_connection:open_channel(VH3Conn), + + X = <<"federated.x">>, + declare_exchange(VH3Ch, x(X, <<"fanout">>)), + declare_exchange(VH2Ch, x(X, <<"fanout">>)), + declare_exchange(VH1Ch, x(X, <<"fanout">>)), + + rabbit_ct_helpers:await_condition( + fun () -> + LinksInB = federation_links_in_vhost(Config, 0, VH2), + LinksInC = federation_links_in_vhost(Config, 0, VH3), + length(LinksInB) =:= 1 andalso + length(LinksInC) =:= 1 andalso + [running] =:= status_fields(status, LinksInB ++ LinksInC) + end), + + Statuses = federation_links_in_vhost(Config, 0, VH2) ++ federation_links_in_vhost(Config, 0, VH3), + + ?assertEqual(lists:usort([URI1, URI2]), + status_fields(uri, Statuses)), + ?assertEqual(lists:usort([<<"federated.x">>]), + status_fields(upstream_exchange, Statuses)), + ?assertEqual(lists:usort([VH2, VH3]), + status_fields(vhost, Statuses)), + ?assertEqual(lists:usort([exchange]), + status_fields(type, Statuses)), + + %% give links some time to set up their topology + rabbit_ct_helpers:await_condition( + fun () -> + ExchangesInA = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, list, [VH1]), + ExchangesInB = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, list, [VH2]), + length(ExchangesInA) >= 1 andalso + length(ExchangesInB) >= 1 + end), + + RK = <<"doesn't matter">>, + Q = bind_queue(VH3Ch, X, RK), + ?assertEqual(ok, await_binding(Config, 0, VH2, X, RK, 1)), + ?assertEqual(ok, await_binding(Config, 0, VH3, X, RK, 1)), + timer:sleep(2000), + + rabbit_ct_helpers:await_condition( + fun () -> + ExchangesInA = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, list, [VH1]), + lists:any(fun(#exchange{name = XName}) -> + XName =:= rabbit_misc:r(VH1, exchange, X) + end, ExchangesInA) + end), + + Payload1 = <<"msg1">>, + Payload2 = <<"msg2">>, + publish(VH1Ch, X, RK, Payload1), + publish(VH1Ch, X, RK, Payload2), + + Msgs = [Payload1, Payload2], + %% payloads published to a federated exchange in A reach a queue in C + expect(VH3Ch, Q, Msgs, 10000), + + [amqp_connection:close(Conn) || Conn <- [VH1Conn, VH2Conn, VH3Conn]], + [rabbit_ct_broker_helpers:delete_vhost(Config, Vhost) || Vhost <- [VH1, VH2, VH3]], + ok. + +%% Arrows indicate message flow. Numbers indicate max_hops. +%% +%% Dylan ---1--> Bugs ---2--> Jessica +%% |^ |^ +%% |\--------------1---------------/| +%% \---------------1----------------/ +%% +%% +%% We want to demonstrate that if we bind a queue locally at each +%% broker, (exactly) the following bindings propagate: +%% +%% Bugs binds to Dylan +%% Jessica binds to Bugs, which then propagates on to Dylan +%% Jessica binds to Dylan directly +%% Dylan binds to Jessica. +%% +%% i.e. Dylan has two bindings from Jessica and one from Bugs +%% Bugs has one binding from Jessica +%% Jessica has one binding from Dylan +%% +%% So we tag each binding with its original broker and see how far it gets +%% +%% Also we check that when we tear down the original bindings +%% that we get rid of everything again. + +binding_propagation(Config) -> + [Dylan, Bugs, Jessica] = rabbit_ct_broker_helpers:get_node_configs(Config, + nodename), + set_policy_upstream(Config, Dylan, <<"^x$">>, + rabbit_ct_broker_helpers:node_uri(Config, Jessica), []), + set_policy_upstream(Config, Bugs, <<"^x$">>, + rabbit_ct_broker_helpers:node_uri(Config, Dylan), []), + set_policy_upstreams(Config, Jessica, <<"^x$">>, [ + {rabbit_ct_broker_helpers:node_uri(Config, Dylan), []}, + {rabbit_ct_broker_helpers:node_uri(Config, Bugs), + [{<<"max-hops">>, 2}]} + ]), + DylanCh = rabbit_ct_client_helpers:open_channel(Config, Dylan), + BugsCh = rabbit_ct_client_helpers:open_channel(Config, Bugs), + JessicaCh = rabbit_ct_client_helpers:open_channel(Config, Jessica), + + declare_exchange(DylanCh, x(<<"x">>)), + declare_exchange(BugsCh, x(<<"x">>)), + declare_exchange(JessicaCh, x(<<"x">>)), + + Q1 = bind_queue(DylanCh, <<"x">>, <<"dylan">>), + Q2 = bind_queue(BugsCh, <<"x">>, <<"bugs">>), + Q3 = bind_queue(JessicaCh, <<"x">>, <<"jessica">>), + + await_binding(Config, Dylan, <<"x">>, <<"jessica">>, 2), + await_bindings(Config, Dylan, <<"x">>, [<<"bugs">>, <<"dylan">>]), + await_bindings(Config, Bugs, <<"x">>, [<<"jessica">>, <<"bugs">>]), + await_bindings(Config, Jessica, <<"x">>, [<<"dylan">>, <<"jessica">>]), + + delete_queue(DylanCh, Q1), + delete_queue(BugsCh, Q2), + delete_queue(JessicaCh, Q3), + + await_bindings(Config, Dylan, <<"x">>, []), + await_bindings(Config, Bugs, <<"x">>, []), + await_bindings(Config, Jessica, <<"x">>, []), + + ok. + +upstream_has_no_federation(Config) -> + [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + set_policy_upstream(Config, Rabbit, <<"^test$">>, + rabbit_ct_broker_helpers:node_uri(Config, Hare), []), + Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit), + Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare), + declare_exchange(Upstream, x(<<"test">>)), + declare_exchange(Downstream, x(<<"test">>)), + Q = bind_queue(Downstream, <<"test">>, <<"routing">>), + await_binding(Config, Hare, <<"test">>, <<"routing">>), + publish(Upstream, <<"test">>, <<"routing">>, <<"HELLO">>), + expect(Downstream, Q, [<<"HELLO">>]), + ok. + +dynamic_reconfiguration(Config) -> + with_ch(Config, + fun (_Ch) -> + Xs = [<<"all.fed1">>, <<"all.fed2">>], + %% Left from the conf we set up for previous tests + assert_connections(Config, 0, Xs, [<<"localhost">>, <<"local5673">>]), + + %% Test that clearing connections works + clear_upstream(Config, 0, <<"localhost">>), + clear_upstream(Config, 0, <<"local5673">>), + assert_connections(Config, 0, Xs, []), + + %% Test that readding them and changing them works + set_upstream(Config, 0, + <<"localhost">>, rabbit_ct_broker_helpers:node_uri(Config, 0)), + %% Do it twice so we at least hit the no-restart optimisation + URI = rabbit_ct_broker_helpers:node_uri(Config, 0, [use_ipaddr]), + set_upstream(Config, 0, <<"localhost">>, URI), + set_upstream(Config, 0, <<"localhost">>, URI), + assert_connections(Config, 0, Xs, [<<"localhost">>]), + + %% And re-add the last - for next test + rabbit_federation_test_util:setup_federation(Config) + end, [x(<<"all.fed1">>), x(<<"all.fed2">>)]). + +dynamic_reconfiguration_integrity(Config) -> + with_ch(Config, + fun (_Ch) -> + Xs = [<<"new.fed1">>, <<"new.fed2">>], + + %% Declared exchanges with nonexistent set - no links + assert_connections(Config, 0, Xs, []), + + %% Create the set - links appear + set_upstream_set(Config, 0, <<"new-set">>, [{<<"localhost">>, []}]), + assert_connections(Config, 0, Xs, [<<"localhost">>]), + + %% Add nonexistent connections to set - nothing breaks + set_upstream_set(Config, 0, + <<"new-set">>, [{<<"localhost">>, []}, + {<<"does-not-exist">>, []}]), + assert_connections(Config, 0, Xs, [<<"localhost">>]), + + %% Change connection in set - links change + set_upstream_set(Config, 0, <<"new-set">>, [{<<"local5673">>, []}]), + assert_connections(Config, 0, Xs, [<<"local5673">>]) + end, [x(<<"new.fed1">>), x(<<"new.fed2">>)]). + +delete_federated_exchange_upstream(Config) -> + %% If two exchanges in different virtual hosts have the same name, only one should be deleted. + VH1 = <<"federation-downstream1">>, + rabbit_ct_broker_helpers:delete_vhost(Config, VH1), + rabbit_ct_broker_helpers:add_vhost(Config, VH1), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VH1), + VH2 = <<"federation-downstream2">>, + rabbit_ct_broker_helpers:delete_vhost(Config, VH2), + rabbit_ct_broker_helpers:add_vhost(Config, VH2), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VH2), + + Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH1), + Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH2), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + #'exchange.declare_ok'{} = declare_exchange(Ch1, #'exchange.declare'{exchange = <<"federated.topic">>, + type = <<"topic">>, + durable = true}), + #'exchange.declare_ok'{} = declare_exchange(Ch2, #'exchange.declare'{exchange = <<"federated.topic">>, + type = <<"topic">>, + durable = true}), + + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_policy, set, + [VH1, + <<"federation">>, <<"^federated\.">>, + [{<<"federation-upstream-set">>, <<"all">>}], + 0, <<"exchanges">>, <<"acting-user">>]), + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_policy, set, + [VH2, + <<"federation">>, <<"^federated\.">>, + [{<<"federation-upstream-set">>, <<"all">>}], + 0, <<"exchanges">>, <<"acting-user">>]), + + rabbit_ct_broker_helpers:set_parameter(Config, 0, VH1, + <<"federation-upstream">>, <<"upstream">>, + [{<<"uri">>, node_uri_with_virtual_host(Config, VH2)}]), + rabbit_ct_broker_helpers:set_parameter(Config, 0, VH2, + <<"federation-upstream">>, <<"upstream">>, + [{<<"uri">>, node_uri_with_virtual_host(Config, VH1)}]), + + ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH1))), + ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH2))), + + rabbit_ct_broker_helpers:clear_parameter(Config, 0, VH2, + <<"federation-upstream">>, <<"upstream">>), + + %% one link is still around + ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH1))), + ?assertEqual(0, length(federation_links_in_vhost(Config, 0, VH2))), + LinksInVH1 = federation_links_in_vhost(Config, 0, VH1), + ?assertEqual(VH1, proplists:get_value(vhost, hd(LinksInVH1))), + + [rabbit_ct_broker_helpers:delete_vhost(Config, Val) || Val <- [VH1, VH2]]. + +delete_federated_queue_upstream(Config) -> + %% If two queues in different virtual hosts have the same name, only one should be deleted. + VH1 = <<"federation-downstream1">>, + rabbit_ct_broker_helpers:delete_vhost(Config, VH1), + rabbit_ct_broker_helpers:add_vhost(Config, VH1), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VH1), + VH2 = <<"federation-downstream2">>, + rabbit_ct_broker_helpers:delete_vhost(Config, VH2), + rabbit_ct_broker_helpers:add_vhost(Config, VH2), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VH2), + + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_policy, set, + [VH1, + <<"federation">>, <<"^federated\.">>, + [{<<"federation-upstream-set">>, <<"all">>}], + 0, <<"queues">>, <<"acting-user">>]), + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_policy, set, + [VH2, + <<"federation">>, <<"^federated\.">>, + [{<<"federation-upstream-set">>, <<"all">>}], + 0, <<"queues">>, <<"acting-user">>]), + + rabbit_ct_broker_helpers:set_parameter(Config, 0, VH1, + <<"federation-upstream">>, <<"upstream">>, + [{<<"uri">>, node_uri_with_virtual_host(Config, VH2)}]), + rabbit_ct_broker_helpers:set_parameter(Config, 0, VH2, + <<"federation-upstream">>, <<"upstream">>, + [{<<"uri">>, node_uri_with_virtual_host(Config, VH1)}]), + + Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH1), + Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH2), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + #'queue.declare_ok'{} = declare_queue(Ch1, + #'queue.declare'{queue = <<"federated.queue">>, + durable = true}), + #'queue.declare_ok'{} = declare_queue(Ch2, + #'queue.declare'{queue = <<"federated.queue">>, + durable = true}), + + + rabbit_ct_helpers:await_condition( + fun () -> + length(federation_links_in_vhost(Config, 0, VH1)) > 0 andalso + length(federation_links_in_vhost(Config, 0, VH2)) > 0 + end), + + ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH1))), + ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH2))), + + rabbit_ct_broker_helpers:clear_parameter(Config, 0, VH2, + <<"federation-upstream">>, <<"upstream">>), + + ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH1))), + ?assertEqual(0, length(federation_links_in_vhost(Config, 0, VH2))), + LinksInVH1 = federation_links_in_vhost(Config, 0, VH1), + ?assertEqual(VH1, proplists:get_value(vhost, hd(LinksInVH1))), + + [rabbit_ct_broker_helpers:delete_vhost(Config, Val) || Val <- [VH1, VH2]]. + +federate_unfederate(Config) -> + with_ch(Config, + fun (_Ch) -> + Xs = [<<"dyn.exch1">>, <<"dyn.exch2">>], + + %% Declareda non-federated exchanges - no links + assert_connections(Config, 0, Xs, []), + + %% Federate them - links appear + set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"all">>), + assert_connections(Config, 0, Xs, [<<"localhost">>, <<"local5673">>]), + + %% Change policy - links change + set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"localhost">>), + assert_connections(Config, 0, Xs, [<<"localhost">>]), + + %% Unfederate them - links disappear + clear_policy(Config, 0, <<"dyn">>), + assert_connections(Config, 0, Xs, []) + end, [x(<<"dyn.exch1">>), x(<<"dyn.exch2">>)]). + +dynamic_plugin_stop_start(Config) -> + X1 = <<"dyn.exch1">>, + X2 = <<"dyn.exch2">>, + with_ch(Config, + fun (Ch) -> + set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"localhost">>), + + %% Declare federated exchange - get link + assert_connections(Config, 0, [X1], [<<"localhost">>]), + + %% Disable plugin, link goes + ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, + "rabbitmq_federation"), + %% We can't check with status for obvious reasons... + undefined = rabbit_ct_broker_helpers:rpc(Config, 0, + erlang, whereis, [rabbit_federation_sup]), + {error, not_found} = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_registry, lookup_module, + [exchange, 'x-federation-upstream']), + + %% Create exchange then re-enable plugin, links appear + declare_exchange(Ch, x(X2)), + ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, + "rabbitmq_federation"), + assert_connections(Config, 0, [X1, X2], [<<"localhost">>]), + {ok, _} = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_registry, lookup_module, + [exchange, 'x-federation-upstream']), + + wait_for_federation( + 90, + fun() -> + VHost = <<"/">>, + Xs = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_exchange, list, [VHost]), + L1 = + [X || X <- Xs, + X#exchange.type =:= 'x-federation-upstream'], + L2 = + [X || X <- Xs, + X#exchange.name =:= #resource{ + virtual_host = VHost, + kind = exchange, + name = X1}, + X#exchange.scratches =:= [{federation, + [{{<<"localhost">>, + X1}, + <<"A">>}]}]], + L3 = + [X || X <- Xs, + X#exchange.name =:= #resource{ + virtual_host = VHost, + kind = exchange, + name = X2}, + X#exchange.scratches =:= [{federation, + [{{<<"localhost">>, + X2}, + <<"B">>}]}]], + length(L1) =:= 2 andalso [] =/= L2 andalso [] =/= L3 andalso + has_internal_federated_queue(Config, 0, VHost) + end), + + %% Test both exchanges work. They are just federated to + %% themselves so should duplicate messages. + [begin + Q = bind_queue(Ch, X, <<"key">>), + await_binding(Config, 0, X, <<"key">>, 2), + publish(Ch, X, <<"key">>, <<"HELLO">>), + expect(Ch, Q, [<<"HELLO">>, <<"HELLO">>]), + delete_queue(Ch, Q) + end || X <- [X1, X2]], + + clear_policy(Config, 0, <<"dyn">>), + assert_connections(Config, 0, [X1, X2], []), + delete_exchange(Ch, X2) + end, [x(X1)]). + +dynamic_plugin_cleanup_stop_start(Config) -> + X1 = <<"dyn.exch1">>, + with_ch(Config, + fun (_Ch) -> + set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"localhost">>), + + %% Declare a federated exchange, a link starts + assert_connections(Config, 0, [X1], [<<"localhost">>]), + wait_for_federation( + 90, + fun() -> + VHost = <<"/">>, + Xs = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_exchange, list, [VHost]), + L1 = + [X || X <- Xs, + X#exchange.type =:= 'x-federation-upstream'], + L2 = + [X || X <- Xs, + X#exchange.name =:= #resource{ + virtual_host = VHost, + kind = exchange, + name = X1}, + X#exchange.scratches =:= [{federation, + [{{<<"localhost">>, + X1}, + <<"B">>}]}]], + [] =/= L1 andalso [] =/= L2 andalso + has_internal_federated_queue(Config, 0, VHost) + end), + + ?assert(has_internal_federated_exchange(Config, 0, <<"/">>)), + ?assert(has_internal_federated_queue(Config, 0, <<"/">>)), + + %% Disable plugin, link goes + ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, + "rabbitmq_federation"), + + %% Internal exchanges and queues need cleanup + ?assert(not has_internal_federated_exchange(Config, 0, <<"/">>)), + ?assert(not has_internal_federated_queue(Config, 0, <<"/">>)), + + ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, + "rabbitmq_federation"), + clear_policy(Config, 0, <<"dyn">>), + assert_connections(Config, 0, [X1], []) + end, [x(X1)]). + +dynamic_policy_cleanup(Config) -> + X1 = <<"dyn.exch1">>, + with_ch(Config, + fun (_Ch) -> + set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"localhost">>), + + %% Declare federated exchange - get link + assert_connections(Config, 0, [X1], [<<"localhost">>]), + wait_for_federation( + 90, + fun() -> + VHost = <<"/">>, + Xs = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_exchange, list, [VHost]), + L1 = + [X || X <- Xs, + X#exchange.type =:= 'x-federation-upstream'], + L2 = + [X || X <- Xs, + X#exchange.name =:= #resource{ + virtual_host = VHost, + kind = exchange, + name = X1}, + X#exchange.scratches =:= [{federation, + [{{<<"localhost">>, + X1}, + <<"B">>}]}]], + [] =/= L1 andalso [] =/= L2 andalso + has_internal_federated_queue(Config, 0, VHost) + end), + + ?assert(has_internal_federated_exchange(Config, 0, <<"/">>)), + ?assert(has_internal_federated_queue(Config, 0, <<"/">>)), + + clear_policy(Config, 0, <<"dyn">>), + timer:sleep(5000), + + %% Internal exchanges and queues need cleanup + ?assert(not has_internal_federated_exchange(Config, 0, <<"/">>)), + ?assert(not has_internal_federated_queue(Config, 0, <<"/">>)), + + clear_policy(Config, 0, <<"dyn">>), + assert_connections(Config, 0, [X1], []) + end, [x(X1)]). + +has_internal_federated_exchange(Config, Node, VHost) -> + lists:any(fun(X) -> + X#exchange.type == 'x-federation-upstream' + end, rabbit_ct_broker_helpers:rpc(Config, Node, + rabbit_exchange, list, [VHost])). + +has_internal_federated_queue(Config, Node, VHost) -> + lists:any( + fun(Q) -> + {'longstr', <<"federation">>} == + rabbit_misc:table_lookup(amqqueue:get_arguments(Q), <<"x-internal-purpose">>) + end, rabbit_ct_broker_helpers:rpc(Config, Node, + rabbit_amqqueue, list, [VHost])). + +%%---------------------------------------------------------------------------- + +with_ch(Config, Fun, Xs) -> + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + declare_all(Ch, Xs), + rabbit_federation_test_util:assert_status(Config, 0, + Xs, {exchange, upstream_exchange}), + Fun(Ch), + delete_all(Ch, Xs), + rabbit_ct_client_helpers:close_channel(Ch), + cleanup(Config, 0), + ok. + +cleanup(Config, Node) -> + [rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_amqqueue, delete, [Q, false, false, + <<"acting-user">>]) || + Q <- queues(Config, Node)]. + +queues(Config, Node) -> + Ret = rabbit_ct_broker_helpers:rpc(Config, Node, + rabbit_amqqueue, list, [<<"/">>]), + case Ret of + {badrpc, _} -> []; + Qs -> Qs + end. + +stop_other_node(Config, Node) -> + cleanup(Config, Node), + rabbit_federation_test_util:stop_other_node(Config, Node). + +declare_all(Ch, Xs) -> [declare_exchange(Ch, X) || X <- Xs]. +delete_all(Ch, Xs) -> + [delete_exchange(Ch, X) || #'exchange.declare'{exchange = X} <- Xs]. + +declare_exchange(Ch, X) -> + amqp_channel:call(Ch, X). + +x(Name) -> x(Name, <<"topic">>). + +x(Name, Type) -> + #'exchange.declare'{exchange = Name, + type = Type, + durable = true}. + +xr(Name) -> + rabbit_misc:r(<<"/">>, exchange, Name). + +xr(Vhost, Name) -> + rabbit_misc:r(Vhost, exchange, Name). + +declare_queue(Ch) -> + #'queue.declare_ok'{queue = Q} = + amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), + Q. + +declare_queue(Ch, Q) -> + amqp_channel:call(Ch, Q). + +bind_queue(Ch, Q, X, Key) -> + amqp_channel:call(Ch, #'queue.bind'{queue = Q, + exchange = X, + routing_key = Key}). + +unbind_queue(Ch, Q, X, Key) -> + amqp_channel:call(Ch, #'queue.unbind'{queue = Q, + exchange = X, + routing_key = Key}). + +bind_exchange(Ch, D, S, Key) -> + amqp_channel:call(Ch, #'exchange.bind'{destination = D, + source = S, + routing_key = Key}). + +bind_queue(Ch, X, Key) -> + Q = declare_queue(Ch), + bind_queue(Ch, Q, X, Key), + Q. + +delete_exchange(Ch, X) -> + amqp_channel:call(Ch, #'exchange.delete'{exchange = X}). + +delete_queue(Ch, Q) -> + amqp_channel:call(Ch, #'queue.delete'{queue = Q}). + +await_binding(Config, Node, X, Key) -> + await_binding(Config, Node, X, Key, 1). + +await_binding(Config, Node, X, Key, ExpectedBindingCount) when is_integer(ExpectedBindingCount) -> + await_binding(Config, Node, <<"/">>, X, Key, ExpectedBindingCount). + +await_binding(Config, Node, Vhost, X, Key, ExpectedBindingCount) when is_integer(ExpectedBindingCount) -> + Attempts = 100, + await_binding(Config, Node, Vhost, X, Key, ExpectedBindingCount, Attempts). + +await_binding(_Config, _Node, _Vhost, _X, _Key, ExpectedBindingCount, 0) -> + {error, rabbit_misc:format("expected ~s bindings but they did not materialize in time", [ExpectedBindingCount])}; +await_binding(Config, Node, Vhost, X, Key, ExpectedBindingCount, AttemptsLeft) when is_integer(ExpectedBindingCount) -> + case bound_keys_from(Config, Node, Vhost, X, Key) of + Bs when length(Bs) < ExpectedBindingCount -> + timer:sleep(100), + await_binding(Config, Node, Vhost, X, Key, ExpectedBindingCount, AttemptsLeft - 1); + Bs when length(Bs) =:= ExpectedBindingCount -> + ok; + Bs -> + {error, rabbit_misc:format("expected ~b bindings, got ~b", [ExpectedBindingCount, length(Bs)])} + end. + +await_bindings(Config, Node, X, Keys) -> + [await_binding(Config, Node, X, Key) || Key <- Keys]. + +await_binding_absent(Config, Node, X, Key) -> + case bound_keys_from(Config, Node, <<"/">>, X, Key) of + [] -> ok; + _ -> timer:sleep(100), + await_binding_absent(Config, Node, X, Key) + end. + +bound_keys_from(Config, Node, Vhost, X, Key) -> + List = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_binding, + list_for_source, [xr(Vhost, X)]), + [K || #binding{key = K} <- List, K =:= Key]. + +publish(Ch, X, Key, Payload) when is_binary(Payload) -> + publish(Ch, X, Key, #amqp_msg{payload = Payload}); + +publish(Ch, X, Key, Msg = #amqp_msg{}) -> + amqp_channel:call(Ch, #'basic.publish'{exchange = X, + routing_key = Key}, Msg). + +publish_expect(Ch, X, Key, Q, Payload) -> + publish(Ch, X, Key, Payload), + expect(Ch, Q, [Payload]). + +%%---------------------------------------------------------------------------- + +assert_connections(Config, Node, Xs, Conns) -> + rabbit_ct_broker_helpers:rpc(Config, Node, + ?MODULE, assert_connections1, [Xs, Conns]). + +assert_connections1(Xs, Conns) -> + Links = [{X, C, X} || + X <- Xs, + C <- Conns], + Remaining = lists:foldl( + fun (Link, Status) -> + rabbit_federation_test_util:assert_link_status( + Link, Status, {exchange, upstream_exchange}) + end, rabbit_federation_status:status(), Links), + [] = Remaining, + ok. + +connection_pids(Config, Node) -> + [P || [{pid, P}] <- + rabbit_ct_broker_helpers:rpc(Config, Node, + rabbit_networking, connection_info_all, [[pid]])]. + +upstream_downstream() -> + [x(<<"upstream">>), x(<<"fed.downstream">>)]. diff --git a/deps/rabbitmq_federation/test/federation_status_command_SUITE.erl b/deps/rabbitmq_federation/test/federation_status_command_SUITE.erl new file mode 100644 index 0000000000..b7702bcf02 --- /dev/null +++ b/deps/rabbitmq_federation/test/federation_status_command_SUITE.erl @@ -0,0 +1,168 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(federation_status_command_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +-define(CMD, 'Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand'). + +all() -> + [ + {group, not_federated}, + {group, federated}, + {group, federated_down} + ]. + +groups() -> + [ + {not_federated, [], [ + run_not_federated, + output_not_federated + ]}, + {federated, [], [ + run_federated, + output_federated + ]}, + {federated_down, [], [ + run_down_federated + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + Config2 = rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + Config2. + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(federated, Config) -> + rabbit_federation_test_util:setup_federation(Config), + Config; +init_per_group(federated_down, Config) -> + rabbit_federation_test_util:setup_down_federation(Config), + Config; +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- +run_not_federated(Config) -> + [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Opts = #{node => A}, + {stream, []} = ?CMD:run([], Opts#{only_down => false}). + +output_not_federated(Config) -> + [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Opts = #{node => A}, + {stream, []} = ?CMD:output({stream, []}, Opts). + +run_federated(Config) -> + [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Opts = #{node => A}, + %% All + rabbit_federation_test_util:with_ch( + Config, + fun(_) -> + timer:sleep(3000), + {stream, [Props]} = ?CMD:run([], Opts#{only_down => false}), + <<"upstream">> = proplists:get_value(upstream_queue, Props), + <<"fed.downstream">> = proplists:get_value(queue, Props), + <<"fed.tag">> = proplists:get_value(consumer_tag, Props), + running = proplists:get_value(status, Props) + end, + [rabbit_federation_test_util:q(<<"upstream">>), + rabbit_federation_test_util:q(<<"fed.downstream">>)]), + %% Down + rabbit_federation_test_util:with_ch( + Config, + fun(_) -> + {stream, []} = ?CMD:run([], Opts#{only_down => true}) + end, + [rabbit_federation_test_util:q(<<"upstream">>), + rabbit_federation_test_util:q(<<"fed.downstream">>)]). + +run_down_federated(Config) -> + [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Opts = #{node => A}, + %% All + rabbit_federation_test_util:with_ch( + Config, + fun(_) -> + timer:sleep(3000), + {stream, ManyProps} = ?CMD:run([], Opts#{only_down => false}), + Links = [{proplists:get_value(upstream, Props), + proplists:get_value(status, Props)} + || Props <- ManyProps], + [{<<"broken-bunny">>, error}, {<<"localhost">>, running}] + = lists:sort(Links) + end, + [rabbit_federation_test_util:q(<<"upstream">>), + rabbit_federation_test_util:q(<<"fed.downstream">>)]), + %% Down + rabbit_federation_test_util:with_ch( + Config, + fun(_) -> + timer:sleep(3000), + {stream, [Props]} = ?CMD:run([], Opts#{only_down => true}), + <<"broken-bunny">> = proplists:get_value(upstream, Props), + error = proplists:get_value(status, Props) + end, + [rabbit_federation_test_util:q(<<"upstream">>), + rabbit_federation_test_util:q(<<"fed.downstream">>)]). + +output_federated(Config) -> + [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Opts = #{node => A}, + Input = {stream,[[{queue, <<"fed.downstream">>}, + {consumer_tag, <<"fed.tag">>}, + {upstream_queue, <<"upstream">>}, + {type, queue}, + {vhost, <<"/">>}, + {upstream, <<"localhost">>}, + {status, running}, + {local_connection, <<"<rmq-ct-federation_status_command_SUITE-1-21000@localhost.1.563.0>">>}, + {uri, <<"amqp://localhost:21000">>}, + {timestamp, {{2016,11,21},{8,51,19}}}]]}, + {stream, [#{queue := <<"fed.downstream">>, + upstream_queue := <<"upstream">>, + type := queue, + vhost := <<"/">>, + upstream := <<"localhost">>, + status := running, + local_connection := <<"<rmq-ct-federation_status_command_SUITE-1-21000@localhost.1.563.0>">>, + uri := <<"amqp://localhost:21000">>, + last_changed := <<"2016-11-21 08:51:19">>, + exchange := <<>>, + upstream_exchange := <<>>, + error := <<>>}]} + = ?CMD:output(Input, Opts). diff --git a/deps/rabbitmq_federation/test/queue_SUITE.erl b/deps/rabbitmq_federation/test/queue_SUITE.erl new file mode 100644 index 0000000000..5c3660fb64 --- /dev/null +++ b/deps/rabbitmq_federation/test/queue_SUITE.erl @@ -0,0 +1,328 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(queue_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +-import(rabbit_federation_test_util, + [wait_for_federation/2, expect/3, expect/4, + set_upstream/4, set_upstream/5, clear_upstream/3, set_policy/5, clear_policy/3, + set_policy_pattern/5, set_policy_upstream/5, q/1, with_ch/3, + declare_queue/2, delete_queue/2, + federation_links_in_vhost/3]). + +-define(INITIAL_WAIT, 6000). +-define(EXPECT_FEDERATION_TIMEOUT, 30000). + +all() -> + [ + {group, without_disambiguate}, + {group, with_disambiguate} + ]. + +groups() -> + [ + {without_disambiguate, [], [ + {cluster_size_1, [], [ + simple, + multiple_upstreams, + multiple_upstreams_pattern, + multiple_downstreams, + bidirectional, + dynamic_reconfiguration, + federate_unfederate, + dynamic_plugin_stop_start + ]} + ]}, + {with_disambiguate, [], [ + {cluster_size_2, [], [ + restart_upstream + ]} + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(without_disambiguate, Config) -> + rabbit_ct_helpers:set_config(Config, + {disambiguate_step, []}); +init_per_group(with_disambiguate, Config) -> + rabbit_ct_helpers:set_config(Config, + {disambiguate_step, [fun rabbit_federation_test_util:disambiguate/1]}); +init_per_group(cluster_size_1 = Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 1} + ]), + init_per_group1(Group, Config1); +init_per_group(cluster_size_2 = Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 2} + ]), + init_per_group1(Group, Config1). + +init_per_group1(Group, Config) -> + SetupFederation = case Group of + cluster_size_1 -> [fun rabbit_federation_test_util:setup_federation/1]; + cluster_size_2 -> [] + end, + Disambiguate = ?config(disambiguate_step, Config), + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Suffix}, + {rmq_nodes_clustered, false} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + SetupFederation ++ Disambiguate). + +end_per_group(without_disambiguate, Config) -> + Config; +end_per_group(with_disambiguate, Config) -> + Config; +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +simple(Config) -> + with_ch(Config, + fun (Ch) -> + expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>) + end, upstream_downstream()). + +multiple_upstreams(Config) -> + with_ch(Config, + fun (Ch) -> + expect_federation(Ch, <<"upstream">>, <<"fed12.downstream">>), + expect_federation(Ch, <<"upstream2">>, <<"fed12.downstream">>) + end, [q(<<"upstream">>), + q(<<"upstream2">>), + q(<<"fed12.downstream">>)]). + +multiple_upstreams_pattern(Config) -> + set_upstream(Config, 0, <<"local453x">>, + rabbit_ct_broker_helpers:node_uri(Config, 0), [ + {<<"exchange">>, <<"upstream">>}, + {<<"queue">>, <<"upstream">>}]), + + set_upstream(Config, 0, <<"zzzzzZZzz">>, + rabbit_ct_broker_helpers:node_uri(Config, 0), [ + {<<"exchange">>, <<"upstream-zzz">>}, + {<<"queue">>, <<"upstream-zzz">>}]), + + set_upstream(Config, 0, <<"local3214x">>, + rabbit_ct_broker_helpers:node_uri(Config, 0), [ + {<<"exchange">>, <<"upstream2">>}, + {<<"queue">>, <<"upstream2">>}]), + + set_policy_pattern(Config, 0, <<"pattern">>, <<"^pattern\.">>, <<"local\\d+x">>), + + with_ch(Config, + fun (Ch) -> + expect_federation(Ch, <<"upstream">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), + expect_federation(Ch, <<"upstream2">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT) + end, [q(<<"upstream">>), + q(<<"upstream2">>), + q(<<"pattern.downstream">>)]), + + clear_upstream(Config, 0, <<"local453x">>), + clear_upstream(Config, 0, <<"local3214x">>), + clear_policy(Config, 0, <<"pattern">>). + +multiple_downstreams(Config) -> + with_ch(Config, + fun (Ch) -> + timer:sleep(?INITIAL_WAIT), + expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), + expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT) + end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]). + +bidirectional(Config) -> + with_ch(Config, + fun (Ch) -> + timer:sleep(?INITIAL_WAIT), + publish_expect(Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>, ?EXPECT_FEDERATION_TIMEOUT), + publish_expect(Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>, ?EXPECT_FEDERATION_TIMEOUT), + Seq = lists:seq(1, 100), + [publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq], + [publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq], + expect(Ch, <<"one">>, repeat(150, <<"bulk">>)), + expect(Ch, <<"two">>, repeat(50, <<"bulk">>)), + expect_empty(Ch, <<"one">>), + expect_empty(Ch, <<"two">>) + end, [q(<<"one">>), + q(<<"two">>)]). + +dynamic_reconfiguration(Config) -> + with_ch(Config, + fun (Ch) -> + timer:sleep(?INITIAL_WAIT), + expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), + + %% Test that clearing connections works + clear_upstream(Config, 0, <<"localhost">>), + expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>), + + %% Test that reading them and changing them works + set_upstream(Config, 0, + <<"localhost">>, rabbit_ct_broker_helpers:node_uri(Config, 0)), + %% Do it twice so we at least hit the no-restart optimisation + URI = rabbit_ct_broker_helpers:node_uri(Config, 0, [use_ipaddr]), + set_upstream(Config, 0, <<"localhost">>, URI), + set_upstream(Config, 0, <<"localhost">>, URI), + expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>) + end, upstream_downstream()). + +federate_unfederate(Config) -> + with_ch(Config, + fun (Ch) -> + timer:sleep(?INITIAL_WAIT), + expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), + expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT), + + %% clear the policy + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"fed">>), + + expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>), + expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream2">>), + + rabbit_ct_broker_helpers:set_policy(Config, 0, + <<"fed">>, <<"^fed\.">>, <<"all">>, [ + {<<"federation-upstream-set">>, <<"upstream">>}]) + end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]). + +dynamic_plugin_stop_start(Config) -> + DownQ2 = <<"fed.downstream2">>, + with_ch(Config, + fun (Ch) -> + timer:sleep(?INITIAL_WAIT), + UpQ = <<"upstream">>, + DownQ1 = <<"fed.downstream">>, + expect_federation(Ch, UpQ, DownQ1, ?EXPECT_FEDERATION_TIMEOUT), + expect_federation(Ch, UpQ, DownQ2, ?EXPECT_FEDERATION_TIMEOUT), + + %% Disable the plugin, the link disappears + ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, "rabbitmq_federation"), + + expect_no_federation(Ch, UpQ, DownQ1), + expect_no_federation(Ch, UpQ, DownQ2), + + declare_queue(Ch, q(DownQ1)), + declare_queue(Ch, q(DownQ2)), + ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, "rabbitmq_federation"), + + %% Declare a queue then re-enable the plugin, the links appear + wait_for_federation( + 90, + fun() -> + Status = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_federation_status, status, []), + L = [ + Entry || Entry <- Status, + proplists:get_value(queue, Entry) =:= DownQ1 orelse + proplists:get_value(queue, Entry) =:= DownQ2, + proplists:get_value(upstream_queue, Entry) =:= UpQ, + proplists:get_value(status, Entry) =:= running + ], + length(L) =:= 2 + end), + expect_federation(Ch, UpQ, DownQ1, 120000) + end, upstream_downstream() ++ [q(DownQ2)]). + +restart_upstream(Config) -> + [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, + nodename), + set_policy_upstream(Config, Rabbit, <<"^test$">>, + rabbit_ct_broker_helpers:node_uri(Config, Hare), []), + + Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit), + Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare), + + declare_queue(Upstream, q(<<"test">>)), + declare_queue(Downstream, q(<<"test">>)), + Seq = lists:seq(1, 100), + [publish(Upstream, <<>>, <<"test">>, <<"bulk">>) || _ <- Seq], + expect(Upstream, <<"test">>, repeat(25, <<"bulk">>)), + expect(Downstream, <<"test">>, repeat(25, <<"bulk">>)), + + rabbit_ct_client_helpers:close_channels_and_connection(Config, Hare), + ok = rabbit_ct_broker_helpers:restart_node(Config, Hare), + Upstream2 = rabbit_ct_client_helpers:open_channel(Config, Hare), + + expect(Upstream2, <<"test">>, repeat(25, <<"bulk">>)), + expect(Downstream, <<"test">>, repeat(25, <<"bulk">>)), + expect_empty(Upstream2, <<"test">>), + expect_empty(Downstream, <<"test">>), + + ok. + +%upstream_has_no_federation(Config) -> +% %% TODO +% ok. + +%%---------------------------------------------------------------------------- +repeat(Count, Item) -> [Item || _ <- lists:seq(1, Count)]. + +%%---------------------------------------------------------------------------- + +publish(Ch, X, Key, Payload) when is_binary(Payload) -> + publish(Ch, X, Key, #amqp_msg{payload = Payload}); + +publish(Ch, X, Key, Msg = #amqp_msg{}) -> + amqp_channel:call(Ch, #'basic.publish'{exchange = X, + routing_key = Key}, Msg). + +publish_expect(Ch, X, Key, Q, Payload) -> + publish(Ch, X, Key, Payload), + expect(Ch, Q, [Payload]). + +publish_expect(Ch, X, Key, Q, Payload, Timeout) -> + publish(Ch, X, Key, Payload), + expect(Ch, Q, [Payload], Timeout). + +%% Doubled due to our strange basic.get behaviour. +expect_empty(Ch, Q) -> + rabbit_federation_test_util:expect_empty(Ch, Q), + rabbit_federation_test_util:expect_empty(Ch, Q). + +expect_federation(Ch, UpstreamQ, DownstreamQ) -> + publish_expect(Ch, <<>>, UpstreamQ, DownstreamQ, <<"HELLO">>). + +expect_federation(Ch, UpstreamQ, DownstreamQ, Timeout) -> + publish_expect(Ch, <<>>, UpstreamQ, DownstreamQ, <<"HELLO">>, Timeout). + +expect_no_federation(Ch, UpstreamQ, DownstreamQ) -> + publish(Ch, <<>>, UpstreamQ, <<"HELLO">>), + expect_empty(Ch, DownstreamQ), + expect(Ch, UpstreamQ, [<<"HELLO">>]). + +upstream_downstream() -> + [q(<<"upstream">>), q(<<"fed.downstream">>)]. diff --git a/deps/rabbitmq_federation/test/rabbit_federation_status_SUITE.erl b/deps/rabbitmq_federation/test/rabbit_federation_status_SUITE.erl new file mode 100644 index 0000000000..6b802a3f15 --- /dev/null +++ b/deps/rabbitmq_federation/test/rabbit_federation_status_SUITE.erl @@ -0,0 +1,129 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(rabbit_federation_status_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-include("rabbit_federation.hrl"). + +-compile(export_all). + +-import(rabbit_federation_test_util, + [expect/3, expect_empty/2, + set_upstream/4, clear_upstream/3, set_upstream_set/4, + set_policy/5, clear_policy/3, + set_policy_upstream/5, set_policy_upstreams/4, + no_plugins/1, with_ch/3]). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + exchange_status, + queue_status, + lookup_exchange_status, + lookup_queue_status, + lookup_bad_status + ]} + ]. + +suite() -> + [{timetrap, {minutes, 5}}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun rabbit_federation_test_util:setup_federation/1]). +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- +exchange_status(Config) -> + exchange_SUITE:with_ch( + Config, + fun (_Ch) -> + [Link] = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_federation_status, status, []), + true = is_binary(proplists:get_value(id, Link)) + end, exchange_SUITE:upstream_downstream()). + +queue_status(Config) -> + with_ch( + Config, + fun (_Ch) -> + timer:sleep(3000), + [Link] = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_federation_status, status, []), + true = is_binary(proplists:get_value(id, Link)) + end, queue_SUITE:upstream_downstream()). + +lookup_exchange_status(Config) -> + exchange_SUITE:with_ch( + Config, + fun (_Ch) -> + [Link] = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_federation_status, status, []), + Id = proplists:get_value(id, Link), + Props = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_federation_status, lookup, [Id]), + lists:all(fun(K) -> lists:keymember(K, 1, Props) end, + [key, uri, status, timestamp, id, supervisor, upstream]) + end, exchange_SUITE:upstream_downstream()). + +lookup_queue_status(Config) -> + with_ch( + Config, + fun (_Ch) -> + timer:sleep(3000), + [Link] = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_federation_status, status, []), + Id = proplists:get_value(id, Link), + Props = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_federation_status, lookup, [Id]), + lists:all(fun(K) -> lists:keymember(K, 1, Props) end, + [key, uri, status, timestamp, id, supervisor, upstream]) + end, queue_SUITE:upstream_downstream()). + +lookup_bad_status(Config) -> + with_ch( + Config, + fun (_Ch) -> + timer:sleep(3000), + not_found = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_federation_status, lookup, [<<"justmadeitup">>]) + end, queue_SUITE:upstream_downstream()). diff --git a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl new file mode 100644 index 0000000000..534817a2a4 --- /dev/null +++ b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl @@ -0,0 +1,354 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_federation_test_util). + +-include("rabbit_federation.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +-import(rabbit_misc, [pget/2]). + +setup_federation(Config) -> + rabbit_ct_broker_helpers:set_parameter(Config, 0, + <<"federation-upstream">>, <<"localhost">>, [ + {<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, 0)}, + {<<"consumer-tag">>, <<"fed.tag">>}]), + + rabbit_ct_broker_helpers:set_parameter(Config, 0, + <<"federation-upstream">>, <<"local5673">>, [ + {<<"uri">>, <<"amqp://localhost:1">>}]), + + rabbit_ct_broker_helpers:set_parameter(Config, 0, + <<"federation-upstream-set">>, <<"upstream">>, [ + [ + {<<"upstream">>, <<"localhost">>}, + {<<"exchange">>, <<"upstream">>}, + {<<"queue">>, <<"upstream">>} + ] + ]), + + rabbit_ct_broker_helpers:set_parameter(Config, 0, + <<"federation-upstream-set">>, <<"upstream2">>, [ + [ + {<<"upstream">>, <<"localhost">>}, + {<<"exchange">>, <<"upstream2">>}, + {<<"queue">>, <<"upstream2">>} + ] + ]), + + rabbit_ct_broker_helpers:set_parameter(Config, 0, + <<"federation-upstream-set">>, <<"localhost">>, [ + [{<<"upstream">>, <<"localhost">>}] + ]), + + rabbit_ct_broker_helpers:set_parameter(Config, 0, + <<"federation-upstream-set">>, <<"upstream12">>, [ + [ + {<<"upstream">>, <<"localhost">>}, + {<<"exchange">>, <<"upstream">>}, + {<<"queue">>, <<"upstream">>} + ], [ + {<<"upstream">>, <<"localhost">>}, + {<<"exchange">>, <<"upstream2">>}, + {<<"queue">>, <<"upstream2">>} + ] + ]), + + rabbit_ct_broker_helpers:set_parameter(Config, 0, + <<"federation-upstream-set">>, <<"one">>, [ + [ + {<<"upstream">>, <<"localhost">>}, + {<<"exchange">>, <<"one">>}, + {<<"queue">>, <<"one">>} + ] + ]), + + rabbit_ct_broker_helpers:set_parameter(Config, 0, + <<"federation-upstream-set">>, <<"two">>, [ + [ + {<<"upstream">>, <<"localhost">>}, + {<<"exchange">>, <<"two">>}, + {<<"queue">>, <<"two">>} + ] + ]), + + rabbit_ct_broker_helpers:set_parameter(Config, 0, + <<"federation-upstream-set">>, <<"upstream5673">>, [ + [ + {<<"upstream">>, <<"local5673">>}, + {<<"exchange">>, <<"upstream">>} + ] + ]), + + rabbit_ct_broker_helpers:set_policy(Config, 0, + <<"fed">>, <<"^fed\.">>, <<"all">>, [ + {<<"federation-upstream-set">>, <<"upstream">>}]), + + rabbit_ct_broker_helpers:set_policy(Config, 0, + <<"fed12">>, <<"^fed12\.">>, <<"all">>, [ + {<<"federation-upstream-set">>, <<"upstream12">>}]), + + rabbit_ct_broker_helpers:set_policy(Config, 0, + <<"one">>, <<"^two$">>, <<"all">>, [ + {<<"federation-upstream-set">>, <<"one">>}]), + + rabbit_ct_broker_helpers:set_policy(Config, 0, + <<"two">>, <<"^one$">>, <<"all">>, [ + {<<"federation-upstream-set">>, <<"two">>}]), + + rabbit_ct_broker_helpers:set_policy(Config, 0, + <<"hare">>, <<"^hare\.">>, <<"all">>, [ + {<<"federation-upstream-set">>, <<"upstream5673">>}]), + + rabbit_ct_broker_helpers:set_policy(Config, 0, + <<"all">>, <<"^all\.">>, <<"all">>, [ + {<<"federation-upstream-set">>, <<"all">>}]), + + rabbit_ct_broker_helpers:set_policy(Config, 0, + <<"new">>, <<"^new\.">>, <<"all">>, [ + {<<"federation-upstream-set">>, <<"new-set">>}]), + Config. + +setup_down_federation(Config) -> + rabbit_ct_broker_helpers:set_parameter( + Config, 0, <<"federation-upstream">>, <<"broken-bunny">>, + [{<<"uri">>, <<"amqp://broken-bunny">>}, + {<<"reconnect-delay">>, 600000}]), + rabbit_ct_broker_helpers:set_parameter( + Config, 0, <<"federation-upstream">>, <<"localhost">>, + [{<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, 0)}]), + rabbit_ct_broker_helpers:set_parameter( + Config, 0, + <<"federation-upstream-set">>, <<"upstream">>, + [[{<<"upstream">>, <<"localhost">>}, + {<<"exchange">>, <<"upstream">>}, + {<<"queue">>, <<"upstream">>}], + [{<<"upstream">>, <<"broken-bunny">>}, + {<<"exchange">>, <<"upstream">>}, + {<<"queue">>, <<"upstream">>}]]), + rabbit_ct_broker_helpers:set_policy( + Config, 0, + <<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]), + rabbit_ct_broker_helpers:set_policy( + Config, 0, + <<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]), + Config. + +wait_for_federation(Retries, Fun) -> + case Fun() of + true -> + ok; + false when Retries > 0 -> + timer:sleep(1000), + wait_for_federation(Retries - 1, Fun); + false -> + throw({timeout_while_waiting_for_federation, Fun}) + end. + +expect(Ch, Q, Fun) when is_function(Fun) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, + no_ack = true}, self()), + CTag = receive + #'basic.consume_ok'{consumer_tag = CT} -> CT + end, + Fun(), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}); + +expect(Ch, Q, Payloads) -> + expect(Ch, Q, fun() -> expect(Payloads) end). + +expect(Ch, Q, Payloads, Timeout) -> + expect(Ch, Q, fun() -> expect(Payloads, Timeout) end). + +expect([]) -> + ok; +expect(Payloads) -> + expect(Payloads, 60000). + +expect([], _Timeout) -> + ok; +expect(Payloads, Timeout) -> + receive + {#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> + case lists:member(Payload, Payloads) of + true -> + ct:pal("Consumed a message: ~p", [Payload]), + expect(Payloads -- [Payload], Timeout); + false -> ?assert(false, rabbit_misc:format("received an unexpected payload ~p", [Payload])) + end + after Timeout -> + ?assert(false, rabbit_misc:format("Did not receive expected payloads ~p in time", [Payloads])) + end. + +expect_empty(Ch, Q) -> + ?assertMatch(#'basic.get_empty'{}, + amqp_channel:call(Ch, #'basic.get'{ queue = Q })). + +set_upstream(Config, Node, Name, URI) -> + set_upstream(Config, Node, Name, URI, []). + +set_upstream(Config, Node, Name, URI, Extra) -> + rabbit_ct_broker_helpers:set_parameter(Config, Node, + <<"federation-upstream">>, Name, [{<<"uri">>, URI} | Extra]). + +set_upstream_in_vhost(Config, Node, VirtualHost, Name, URI) -> + set_upstream_in_vhost(Config, Node, VirtualHost, Name, URI, []). + +set_upstream_in_vhost(Config, Node, VirtualHost, Name, URI, Extra) -> + rabbit_ct_broker_helpers:set_parameter(Config, Node, VirtualHost, + <<"federation-upstream">>, Name, [{<<"uri">>, URI} | Extra]). + +clear_upstream(Config, Node, Name) -> + rabbit_ct_broker_helpers:clear_parameter(Config, Node, + <<"federation-upstream">>, Name). + +set_upstream_set(Config, Node, Name, Set) -> + rabbit_ct_broker_helpers:set_parameter(Config, Node, + <<"federation-upstream-set">>, Name, + [[{<<"upstream">>, UStream} | Extra] || {UStream, Extra} <- Set]). + +clear_upstream_set(Config, Node, Name) -> + rabbit_ct_broker_helpers:clear_parameter(Config, Node, + <<"federation-upstream-set">>, Name). + +set_policy(Config, Node, Name, Pattern, UpstreamSet) -> + rabbit_ct_broker_helpers:set_policy(Config, Node, + Name, Pattern, <<"all">>, + [{<<"federation-upstream-set">>, UpstreamSet}]). + +set_policy_pattern(Config, Node, Name, Pattern, Regex) -> + rabbit_ct_broker_helpers:set_policy(Config, Node, + Name, Pattern, <<"all">>, + [{<<"federation-upstream-pattern">>, Regex}]). + +clear_policy(Config, Node, Name) -> + rabbit_ct_broker_helpers:clear_policy(Config, Node, Name). + +set_policy_upstream(Config, Node, Pattern, URI, Extra) -> + set_policy_upstreams(Config, Node, Pattern, [{URI, Extra}]). + +set_policy_upstreams(Config, Node, Pattern, URIExtras) -> + put(upstream_num, 1), + [set_upstream(Config, Node, gen_upstream_name(), URI, Extra) + || {URI, Extra} <- URIExtras], + set_policy(Config, Node, Pattern, Pattern, <<"all">>). + +gen_upstream_name() -> + list_to_binary("upstream-" ++ integer_to_list(next_upstream_num())). + +next_upstream_num() -> + R = get(upstream_num) + 1, + put(upstream_num, R), + R. + +%% Make sure that even though multiple nodes are in a single +%% distributed system, we still keep all our process groups separate. +disambiguate(Config) -> + rabbit_ct_broker_helpers:rpc_all(Config, + application, set_env, + [rabbitmq_federation, pgroup_name_cluster_id, true]), + Config. + +no_plugins(Cfg) -> + [{K, case K of + plugins -> none; + _ -> V + end} || {K, V} <- Cfg]. + +%%---------------------------------------------------------------------------- + +all_federation_links(Config, Node) -> + rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_federation_status, status, []). + +federation_links_in_vhost(Config, Node, VirtualHost) -> + Links = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_federation_status, status, []), + lists:filter( + fun(Link) -> + VirtualHost =:= proplists:get_value(vhost, Link) + end, Links). + +status_fields(Prop, Statuses) -> + lists:usort( + lists:map( + fun(Link) -> proplists:get_value(Prop, Link) end, + Statuses)). + +assert_status(Config, Node, XorQs, Names) -> + rabbit_ct_broker_helpers:rpc(Config, Node, + ?MODULE, assert_status1, [XorQs, Names]). + +assert_status1(XorQs, Names) -> + [begin + ct:pal("links(XorQ) for ~p: ~p", [XorQ, links(XorQ)]) + end || XorQ <- XorQs], + Links = lists:append([links(XorQ) || XorQ <- XorQs]), + Remaining = lists:foldl(fun (Link, Status) -> + assert_link_status(Link, Status, Names) + end, rabbit_federation_status:status(), Links), + ?assertEqual([], Remaining), + ok. + +assert_link_status({DXorQNameBin, UpstreamName, UXorQNameBin}, Status, + {TypeName, UpstreamTypeName}) -> + {This, Rest} = lists:partition( + fun(St) -> + pget(upstream, St) =:= UpstreamName andalso + pget(TypeName, St) =:= DXorQNameBin andalso + pget(UpstreamTypeName, St) =:= UXorQNameBin + end, Status), + ?assertMatch([_], This), + Rest. + +links(#'exchange.declare'{exchange = Name}) -> + case rabbit_exchange:lookup(xr(Name)) of + {ok, X} -> + case rabbit_policy:get(<<"federation-upstream-set">>, X) of + undefined -> + case rabbit_policy:get(<<"federation-upstream-pattern">>, X) of + undefined -> []; + Regex -> + [{Name, U#upstream.name, U#upstream.exchange_name} || + U <- rabbit_federation_upstream:from_pattern(Regex, X)] + end; + Set -> + [{Name, U#upstream.name, U#upstream.exchange_name} || + U <- rabbit_federation_upstream:from_set(Set, X)] + end; + {error, not_found} -> + [] + end. + +xr(Name) -> rabbit_misc:r(<<"/">>, exchange, Name). + +with_ch(Config, Fun, Qs) -> + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + declare_all(Ch, Qs), + %% Clean up queues even after test failure. + try + Fun(Ch) + after + delete_all(Ch, Qs), + rabbit_ct_client_helpers:close_channel(Ch) + end, + ok. + +declare_all(Ch, Qs) -> [declare_queue(Ch, Q) || Q <- Qs]. +delete_all(Ch, Qs) -> + [delete_queue(Ch, Q) || #'queue.declare'{queue = Q} <- Qs]. + +declare_queue(Ch, Q) -> + amqp_channel:call(Ch, Q). + +delete_queue(Ch, Q) -> + amqp_channel:call(Ch, #'queue.delete'{queue = Q}). + +q(Name) -> + #'queue.declare'{queue = Name, + durable = true}. diff --git a/deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl b/deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl new file mode 100644 index 0000000000..511bae6540 --- /dev/null +++ b/deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl @@ -0,0 +1,101 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(restart_federation_link_command_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +-define(CMD, 'Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand'). + +all() -> + [ + {group, federated_down} + ]. + +groups() -> + [ + {federated_down, [], [ + run, + run_not_found, + output + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + Config2 = rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + Config2. + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(federated_down, Config) -> + rabbit_federation_test_util:setup_down_federation(Config), + Config; +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- +run_not_federated(Config) -> + [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Opts = #{node => A}, + {stream, []} = ?CMD:run([], Opts#{'only-down' => false}). + +output_not_federated(Config) -> + [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Opts = #{node => A}, + {stream, []} = ?CMD:output({stream, []}, Opts). + +run(Config) -> + [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Opts = #{node => A}, + rabbit_federation_test_util:with_ch( + Config, + fun(_) -> + timer:sleep(3000), + [Link | _] = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_federation_status, status, []), + Id = proplists:get_value(id, Link), + ok = ?CMD:run([Id], Opts) + end, + [rabbit_federation_test_util:q(<<"upstream">>), + rabbit_federation_test_util:q(<<"fed.downstream">>)]). + +run_not_found(Config) -> + [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Opts = #{node => A}, + {error, _ErrorMsg} = ?CMD:run([<<"MakingItUp">>], Opts). + +output(Config) -> + [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Opts = #{node => A}, + ok = ?CMD:output(ok, Opts). diff --git a/deps/rabbitmq_federation/test/unit_SUITE.erl b/deps/rabbitmq_federation/test/unit_SUITE.erl new file mode 100644 index 0000000000..7d0707f96c --- /dev/null +++ b/deps/rabbitmq_federation/test/unit_SUITE.erl @@ -0,0 +1,65 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(unit_SUITE). +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_federation.hrl"). + +-compile(export_all). + +all() -> [ + obfuscate_upstream, + obfuscate_upstream_params_network, + obfuscate_upstream_params_network_with_char_list_password_value, + obfuscate_upstream_params_direct +]. + +init_per_suite(Config) -> + application:ensure_all_started(credentials_obfuscation), + Config. + +end_per_suite(Config) -> + Config. + +obfuscate_upstream(_Config) -> + Upstream = #upstream{uris = [<<"amqp://guest:password@localhost">>]}, + ObfuscatedUpstream = rabbit_federation_util:obfuscate_upstream(Upstream), + ?assertEqual(Upstream, rabbit_federation_util:deobfuscate_upstream(ObfuscatedUpstream)), + ok. + +obfuscate_upstream_params_network(_Config) -> + UpstreamParams = #upstream_params{ + uri = <<"amqp://guest:password@localhost">>, + params = #amqp_params_network{password = <<"password">>} + }, + ObfuscatedUpstreamParams = rabbit_federation_util:obfuscate_upstream_params(UpstreamParams), + ?assertEqual(UpstreamParams, rabbit_federation_util:deobfuscate_upstream_params(ObfuscatedUpstreamParams)), + ok. + +obfuscate_upstream_params_network_with_char_list_password_value(_Config) -> + Input = #upstream_params{ + uri = <<"amqp://guest:password@localhost">>, + params = #amqp_params_network{password = "password"} + }, + Output = #upstream_params{ + uri = <<"amqp://guest:password@localhost">>, + params = #amqp_params_network{password = <<"password">>} + }, + ObfuscatedUpstreamParams = rabbit_federation_util:obfuscate_upstream_params(Input), + ?assertEqual(Output, rabbit_federation_util:deobfuscate_upstream_params(ObfuscatedUpstreamParams)), + ok. + + obfuscate_upstream_params_direct(_Config) -> + UpstreamParams = #upstream_params{ + uri = <<"amqp://guest:password@localhost">>, + params = #amqp_params_direct{password = <<"password">>} + }, + ObfuscatedUpstreamParams = rabbit_federation_util:obfuscate_upstream_params(UpstreamParams), + ?assertEqual(UpstreamParams, rabbit_federation_util:deobfuscate_upstream_params(ObfuscatedUpstreamParams)), + ok. diff --git a/deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl b/deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl new file mode 100644 index 0000000000..f65dffbe8e --- /dev/null +++ b/deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl @@ -0,0 +1,230 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. All rights reserved. +%% + +-module(unit_inbroker_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-include("rabbit_federation.hrl"). + +-compile(export_all). + +-define(US_NAME, <<"upstream">>). +-define(DS_NAME, <<"fed.downstream">>). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + serialisation, + scratch_space, + remove_credentials, + get_connection_name, + upstream_validation, + upstream_set_validation + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +%% Test that we apply binding changes in the correct order even when +%% they arrive out of order. +serialisation(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, serialisation1, []). + +serialisation1() -> + with_exchanges( + fun(X) -> + [B1, B2, B3] = [b(K) || K <- [<<"1">>, <<"2">>, <<"3">>]], + remove_bindings(4, X, [B1, B3]), + add_binding(5, X, B1), + add_binding(1, X, B1), + add_binding(2, X, B2), + add_binding(3, X, B3), + %% List of lists because one for each link + Keys = rabbit_federation_exchange_link:list_routing_keys( + X#exchange.name), + [[<<"1">>, <<"2">>]] =:= Keys + end). + +scratch_space(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, scratch_space1, []). + +scratch_space1() -> + A = <<"A">>, + B = <<"B">>, + DB = rabbit_federation_db, + with_exchanges( + fun(#exchange{name = N}) -> + DB:set_active_suffix(N, upstream(x), A), + DB:set_active_suffix(N, upstream(y), A), + DB:prune_scratch(N, [upstream(y), upstream(z)]), + DB:set_active_suffix(N, upstream(y), B), + DB:set_active_suffix(N, upstream(z), A), + none = DB:get_active_suffix(N, upstream(x), none), + B = DB:get_active_suffix(N, upstream(y), none), + A = DB:get_active_suffix(N, upstream(z), none) + end). + +remove_credentials(Config) -> + Test0 = fun (In, Exp) -> + Act = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_federation_upstream, remove_credentials, [In]), + Exp = Act + end, + Cat = fun (Bs) -> + list_to_binary(lists:append([binary_to_list(B) || B <- Bs])) + end, + Test = fun (Scheme, Rest) -> + Exp = Cat([Scheme, Rest]), + Test0(Exp, Exp), + Test0(Cat([Scheme, <<"user@">>, Rest]), Exp), + Test0(Cat([Scheme, <<"user:pass@">>, Rest]), Exp) + end, + Test(<<"amqp://">>, <<"">>), + Test(<<"amqp://">>, <<"localhost">>), + Test(<<"amqp://">>, <<"localhost/">>), + Test(<<"amqp://">>, <<"localhost/foo">>), + Test(<<"amqp://">>, <<"localhost:5672">>), + Test(<<"amqp://">>, <<"localhost:5672/foo">>), + Test(<<"amqps://">>, <<"localhost:5672/%2f">>), + ok. + +get_connection_name(Config) -> + Amqqueue = rabbit_ct_broker_helpers:rpc( + Config, 0, + amqqueue, new, [rabbit_misc:r(<<"/">>, queue, <<"queue">>), + self(), + false, + false, + none, + [], + undefined, + #{}, + classic]), + AmqqueueWithPolicy = amqqueue:set_policy(Amqqueue, [{name, <<"my.federation.policy">>}]), + AmqqueueWithEmptyPolicy = amqqueue:set_policy(Amqqueue, []), + + + <<"Federation link (upstream: my.upstream, policy: my.federation.policy)">> = rabbit_federation_link_util:get_connection_name( + #upstream{name = <<"my.upstream">>}, + #upstream_params{x_or_q = AmqqueueWithPolicy} + ), + <<"Federation link (upstream: my.upstream, policy: my.federation.policy)">> = rabbit_federation_link_util:get_connection_name( + #upstream{name = <<"my.upstream">>}, + #upstream_params{x_or_q = #exchange{policy = [{name, <<"my.federation.policy">>}]}} + ), + <<"Federation link">> = rabbit_federation_link_util:get_connection_name( + #upstream{}, + #upstream_params{x_or_q = AmqqueueWithEmptyPolicy} + ), + <<"Federation link">> = rabbit_federation_link_util:get_connection_name( + #upstream{}, + #upstream_params{x_or_q = #exchange{policy = []}} + ), + <<"Federation link">> = rabbit_federation_link_util:get_connection_name( + whatever, + whatever + ), + ok. + +upstream_set_validation(_Config) -> + ?assertEqual(rabbit_federation_parameters:validate(<<"/">>, <<"federation-upstream-set">>, + <<"a-name">>, + [[{<<"upstream">>, <<"devtest1">>}], + [{<<"upstream">>, <<"devtest2">>}]], + <<"acting-user">>), + [[ok], [ok]]), + ?assertEqual(rabbit_federation_parameters:validate(<<"/">>, <<"federation-upstream-set">>, + <<"a-name">>, + [#{<<"upstream">> => <<"devtest3">>}, + #{<<"upstream">> => <<"devtest4">>}], + <<"acting-user">>), + [[ok], [ok]]), + ok. + +upstream_validation(_Config) -> + ?assertEqual(rabbit_federation_parameters:validate(<<"/">>, <<"federation-upstream">>, + <<"a-name">>, + [{<<"uri">>, <<"amqp://">>}], + <<"acting-user">>), + [ok]), + ?assertEqual(rabbit_federation_parameters:validate(<<"/">>, <<"federation-upstream">>, + <<"a-name">>, + #{<<"uri">> => <<"amqp://">>}, + <<"acting-user">>), + [ok]), + ok. + +with_exchanges(Fun) -> + rabbit_exchange:declare(r(?US_NAME), fanout, false, false, false, [], + <<"acting-user">>), + X = rabbit_exchange:declare(r(?DS_NAME), fanout, false, false, false, [], + <<"acting-user">>), + Fun(X), + %% Delete downstream first or it will recreate the upstream + rabbit_exchange:delete(r(?DS_NAME), false, <<"acting-user">>), + rabbit_exchange:delete(r(?US_NAME), false, <<"acting-user">>), + ok. + +add_binding(Ser, X, B) -> + rabbit_federation_exchange:add_binding(transaction, X, B), + rabbit_federation_exchange:add_binding(Ser, X, B). + +remove_bindings(Ser, X, Bs) -> + rabbit_federation_exchange:remove_bindings(transaction, X, Bs), + rabbit_federation_exchange:remove_bindings(Ser, X, Bs). + +r(Name) -> rabbit_misc:r(<<"/">>, exchange, Name). + +b(Key) -> + #binding{source = ?DS_NAME, destination = <<"whatever">>, + key = Key, args = []}. + +upstream(UpstreamName) -> + #upstream{name = atom_to_list(UpstreamName), + exchange_name = <<"upstream">>}. |