summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2022-06-28 13:33:59 +0200
committerGitHub <noreply@github.com>2022-06-28 13:33:59 +0200
commita48a4a09c4ec255753df738910f4e23a5d1bb921 (patch)
tree26b3c14eae488fa1ada416b404aede77eb8269b7
parent0fd051234ec167432f512d605178ada56aea9109 (diff)
parentda807547f58b2e785c11e436715f63b8e3383bf7 (diff)
downloadrabbitmq-server-git-a48a4a09c4ec255753df738910f4e23a5d1bb921.tar.gz
Merge pull request #3940 from rabbitmq/add-feature-flags-controller
Add a feature flags controller
-rw-r--r--deps/rabbit/BUILD.bazel7
-rw-r--r--deps/rabbit/include/feature_flags.hrl13
-rw-r--r--deps/rabbit/src/rabbit.erl6
-rw-r--r--deps/rabbit/src/rabbit_core_ff.erl33
-rw-r--r--deps/rabbit/src/rabbit_feature_flags.erl1075
-rw-r--r--deps/rabbit/src/rabbit_ff_controller.erl1152
-rw-r--r--deps/rabbit/src/rabbit_ff_registry.erl20
-rw-r--r--deps/rabbit/src/rabbit_ff_registry_factory.erl631
-rw-r--r--deps/rabbit/src/rabbit_prelaunch_feature_flags.erl2
-rw-r--r--deps/rabbit/test/feature_flags_SUITE.erl191
-rw-r--r--deps/rabbit/test/feature_flags_v2_SUITE.erl1540
-rw-r--r--deps/rabbit/test/feature_flags_with_unpriveleged_user_SUITE.erl1
-rw-r--r--deps/rabbit_common/src/rabbit_misc.erl1
-rw-r--r--deps/rabbitmq_cli/test/ctl/enable_feature_flag_test.exs3
-rw-r--r--deps/rabbitmq_cli/test/ctl/list_feature_flags_command_test.exs3
-rw-r--r--deps/rabbitmq_ct_helpers/src/cth_log_redirect_any_domains.erl23
-rw-r--r--deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl30
17 files changed, 3938 insertions, 793 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel
index 465b15b36c..4e5b6ee4d2 100644
--- a/deps/rabbit/BUILD.bazel
+++ b/deps/rabbit/BUILD.bazel
@@ -432,13 +432,18 @@ suites = [
name = "feature_flags_SUITE",
size = "large",
flaky = True,
- shard_count = 5,
+ shard_count = 9,
runtime_deps = [
"//deps/rabbit/test/feature_flags_SUITE_data/my_plugin:erlang_app",
],
),
rabbitmq_integration_suite(
PACKAGE,
+ name = "feature_flags_v2_SUITE",
+ size = "large",
+ ),
+ rabbitmq_integration_suite(
+ PACKAGE,
name = "feature_flags_with_unpriveleged_user_SUITE",
size = "large",
additional_beam = [
diff --git a/deps/rabbit/include/feature_flags.hrl b/deps/rabbit/include/feature_flags.hrl
new file mode 100644
index 0000000000..b90bdd1063
--- /dev/null
+++ b/deps/rabbit/include/feature_flags.hrl
@@ -0,0 +1,13 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-record(ffcommand, {
+ name :: rabbit_feature_flags:feature_name(),
+ props :: rabbit_feature_flags:feature_props_extended(),
+ command :: enable | post_enable,
+ extra :: #{nodes := [node()]}
+ }).
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl
index ff6a1da1b0..5f387412de 100644
--- a/deps/rabbit/src/rabbit.erl
+++ b/deps/rabbit/src/rabbit.erl
@@ -851,6 +851,12 @@ start(normal, []) ->
log_motd(),
{ok, SupPid} = rabbit_sup:start_link(),
+ %% When we load plugins later in this function, we refresh feature
+ %% flags. If `feature_flags_v2' is enabled, `rabbit_ff_controller'
+ %% will be used. We start it now because we can't wait for boot steps
+ %% to do this (feature flags are refreshed before boot steps run).
+ ok = rabbit_sup:start_child(rabbit_ff_controller),
+
%% Compatibility with older RabbitMQ versions + required by
%% rabbit_node_monitor:notify_node_up/0:
%%
diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl
index 661717c7aa..60075c99ba 100644
--- a/deps/rabbit/src/rabbit_core_ff.erl
+++ b/deps/rabbit/src/rabbit_core_ff.erl
@@ -7,21 +7,17 @@
-module(rabbit_core_ff).
--export([classic_mirrored_queue_version_migration/3,
- quorum_queue_migration/3,
- stream_queue_migration/3,
+-export([quorum_queue_migration/3,
implicit_default_bindings_migration/3,
virtual_host_metadata_migration/3,
maintenance_mode_status_migration/3,
user_limits_migration/3,
- stream_single_active_consumer_migration/3,
direct_exchange_routing_v2_migration/3]).
-rabbit_feature_flag(
{classic_mirrored_queue_version,
#{desc => "Support setting version for classic mirrored queues",
- stability => stable,
- migration_fun => {?MODULE, classic_mirrored_queue_version_migration}
+ stability => stable
}}).
-rabbit_feature_flag(
@@ -37,8 +33,7 @@
#{desc => "Support queues of type `stream`",
doc_url => "https://www.rabbitmq.com/stream.html",
stability => stable,
- depends_on => [quorum_queue],
- migration_fun => {?MODULE, stream_queue_migration}
+ depends_on => [quorum_queue]
}}).
-rabbit_feature_flag(
@@ -75,8 +70,7 @@
#{desc => "Single active consumer for streams",
doc_url => "https://www.rabbitmq.com/stream.html",
stability => stable,
- depends_on => [stream_queue],
- migration_fun => {?MODULE, stream_single_active_consumer_migration}
+ depends_on => [stream_queue]
}}).
-rabbit_feature_flag(
@@ -87,8 +81,11 @@
migration_fun => {?MODULE, direct_exchange_routing_v2_migration}
}}).
-classic_mirrored_queue_version_migration(_FeatureName, _FeatureProps, _Enable) ->
- ok.
+-rabbit_feature_flag(
+ {feature_flags_v2,
+ #{desc => "Feature flags subsystem V2",
+ stability => stable
+ }}).
%% -------------------------------------------------------------------
%% Quorum queues.
@@ -109,9 +106,6 @@ quorum_queue_migration(_FeatureName, _FeatureProps, is_enabled) ->
mnesia:table_info(rabbit_queue, attributes) =:= Fields andalso
mnesia:table_info(rabbit_durable_queue, attributes) =:= Fields.
-stream_queue_migration(_FeatureName, _FeatureProps, _Enable) ->
- ok.
-
migrate_to_amqqueue_with_type(FeatureName, [Table | Rest], Fields) ->
rabbit_log_feature_flags:info(
"Feature flag `~s`: migrating Mnesia table ~s...",
@@ -209,15 +203,6 @@ user_limits_migration(_FeatureName, _FeatureProps, is_enabled) ->
mnesia:table_info(rabbit_user, attributes) =:= internal_user:fields(internal_user_v2).
%% -------------------------------------------------------------------
-%% Stream single active consumer.
-%% -------------------------------------------------------------------
-
-stream_single_active_consumer_migration(_FeatureName, _FeatureProps, enable) ->
- ok;
-stream_single_active_consumer_migration(_FeatureName, _FeatureProps, is_enabled) ->
- undefined.
-
-%% -------------------------------------------------------------------
%% Direct exchange routing v2.
%% -------------------------------------------------------------------
diff --git a/deps/rabbit/src/rabbit_feature_flags.erl b/deps/rabbit/src/rabbit_feature_flags.erl
index 3cffec6a88..9961449407 100644
--- a/deps/rabbit/src/rabbit_feature_flags.erl
+++ b/deps/rabbit/src/rabbit_feature_flags.erl
@@ -45,7 +45,7 @@
%% == How to declare a feature flag ==
%%
%% To define a new feature flag, you need to use the
-%% `rabbit_feature_flag()' module attribute:
+%% `-rabbit_feature_flag()' module attribute:
%%
%% ```
%% -rabbit_feature_flag(FeatureFlag).
@@ -77,6 +77,12 @@
-module(rabbit_feature_flags).
+-include_lib("kernel/include/logger.hrl").
+
+-include_lib("rabbit_common/include/logging.hrl").
+
+-include("feature_flags.hrl").
+
-export([list/0,
list/1,
list/2,
@@ -111,30 +117,32 @@
]).
%% RabbitMQ internal use only.
--export([initialize_registry/0,
- initialize_registry/1,
- mark_as_enabled_locally/2,
+-export([mark_as_enabled_locally/2,
remote_nodes/0,
running_remote_nodes/0,
does_node_support/3,
merge_feature_flags_from_unknown_apps/1,
- do_sync_feature_flags_with_node/1]).
+ do_sync_feature_flags_with_node/1,
+ enabled_feature_flags_to_feature_states/1,
+ inject_test_feature_flags/1,
+ query_supported_feature_flags/0,
+ read_enabled_feature_flags_list/0,
+ run_migration_fun/3,
+ uses_migration_fun_v2/1,
+ uses_migration_fun_v2/2]).
-ifdef(TEST).
--export([inject_test_feature_flags/1,
- initialize_registry/3,
- query_supported_feature_flags/0,
- mark_as_enabled_remotely/2,
- mark_as_enabled_remotely/4,
- registry_loading_lock/0]).
+-export([mark_as_enabled_remotely/4,
+ override_nodes/1,
+ override_running_nodes/1,
+ get_overriden_nodes/0,
+ get_overriden_running_nodes/0,
+ share_new_feature_flags_after_app_load/2]).
-endif.
%% Default timeout for operations on remote nodes.
-define(TIMEOUT, 60000).
--define(FF_REGISTRY_LOADING_LOCK, {feature_flags_registry_loading, self()}).
--define(FF_STATE_CHANGE_LOCK, {feature_flags_state_change, self()}).
-
-type feature_flag_modattr() :: {feature_name(),
feature_props()}.
%% The value of a `-rabbitmq_feature_flag()' module attribute used to
@@ -200,18 +208,26 @@
-type feature_states() :: #{feature_name() => feature_state()}.
-type stability() :: stable | experimental.
-%% The level of stability of a feature flag. Currently, only informational.
+%% The level of stability of a feature flag.
+%%
+%% Experimental feature flags are not enabled by default on a fresh RabbitMQ
+%% node. They must be enabled by the user.
--type migration_fun_name() :: {Module :: atom(), Function :: atom()}.
+-type migration_fun_name() :: {Module :: module(), Function :: atom()}.
%% The name of the module and function to call when changing the state of
%% the feature flag.
--type migration_fun() :: fun((feature_name(),
- feature_props_extended(),
- migration_fun_context())
- -> ok | {error, any()} | % context = enable
- boolean() | undefined). % context = is_enabled
+-type migration_fun() :: migration_fun_v1() | migration_fun_v2().
%% The migration function signature.
+
+-type migration_fun_context() :: enable | is_enabled.
+
+-type migration_fun_v1() :: fun((feature_name(),
+ feature_props_extended(),
+ migration_fun_context())
+ -> ok | {error, any()} |
+ boolean() | undefined).
+%% The migration function signature (v1).
%%
%% It is called with context `enable' when a feature flag is being enabled.
%% The function is responsible for this feature-flag-specific verification
@@ -226,9 +242,53 @@
%% is actually enabled. It is useful on RabbitMQ startup, just in case
%% the previous instance failed to write the feature flags list file.
--type migration_fun_context() :: enable | is_enabled.
+-type ffcommand() :: #ffcommand{}.
--type registry_vsn() :: term().
+-type migration_fun_v2() :: fun((ffcommand()) -> ok | no_return()).
+%% The migration function signature (v2).
+%%
+%% The migration function is called on all nodes which fulfill the following
+%% conditions:
+%% <ol>
+%% <li>The node knows the feature flag.</li>
+%% <li>The feature flag is disabled on that node before calling the migration
+%% function.</li>
+%% </ol>
+%%
+%% All executions of the migration function on these nodes will run in
+%% parallel (concurrently). The migration function is responsible for its own
+%% locking and synchronization.
+%%
+%% It is first called with the command `enable' when a feature flag is being
+%% enabled. The function is responsible for this feature-flag-specific
+%% verification and data conversion. It returns `ok' if RabbitMQ can mark the
+%% feature flag as enabled an continue with the next one, if any. Other return
+%% values or exceptions are an error and the feature flag should remain
+%% disabled.
+%%
+%% It is then called with the command `post_enable' after a feature flag has
+%% been marked as enabled. The return value or enay exceptions are ignored and
+%% the feature flag will remain enabled even if there is a failure.
+%%
+%% When a node is joining a cluster where one side has a feature flag enabled,
+%% that feature flag will be enabled on the other side. It means the migration
+%% function will run on the nodes where it is disabled. Therefore the
+%% migration function can run in clusters where some nodes previously executed
+%% it and the feature flag was already enabled.
+%%
+%% The migration function should also be idempotent. For instance, if the
+%% feature flag couldn't be marked as enabled everywhere after the migration
+%% function was called with the `enable' command, it may be called again.
+
+-type inventory() :: #{applications := [atom()],
+ feature_flags := feature_flags(),
+ states := feature_states()}.
+
+-type cluster_inventory() :: #{feature_flags := feature_flags(),
+ applications_per_node :=
+ #{node() => [atom()]},
+ states_per_node :=
+ #{node() => feature_states()}}.
-export_type([feature_flag_modattr/0,
feature_props/0,
@@ -240,7 +300,12 @@
stability/0,
migration_fun_name/0,
migration_fun/0,
- migration_fun_context/0]).
+ migration_fun_v1/0,
+ migration_fun_v2/0,
+ migration_fun_context/0,
+ ffcommand/0,
+ inventory/0,
+ cluster_inventory/0]).
-on_load(on_load/0).
@@ -296,36 +361,85 @@ list(Which, Stability)
%% dependency tree are left unchanged).
enable(FeatureName) when is_atom(FeatureName) ->
+ case is_enabled(feature_flags_v2) of
+ true ->
+ rabbit_ff_controller:enable(FeatureName);
+ false ->
+ case requires_feature_flags_v2(FeatureName) of
+ true ->
+ ?LOG_DEBUG(
+ "Feature flags: `~s` uses the migration function "
+ "API v2 and thus requires `feature_flags_v2; "
+ "enabling the latter first",
+ [FeatureName],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ enable([feature_flags_v2, FeatureName]);
+ false ->
+ enable_v1(FeatureName)
+ end
+ end;
+enable(FeatureNames) when is_list(FeatureNames) ->
+ FeatureNames1 = sort_feature_flags_v2_first(FeatureNames),
+ with_feature_flags(FeatureNames1, fun enable/1).
+
+sort_feature_flags_v2_first(FeatureNames) ->
+ lists:sort(
+ fun
+ (feature_flags_v2, _) -> true;
+ (_, feature_flags_v2) -> false;
+ (A, B) -> A =< B
+ end, FeatureNames).
+
+requires_feature_flags_v2(FeatureName) ->
+ uses_migration_fun_v2(FeatureName).
+
+uses_migration_fun_v2(FeatureName) ->
+ case rabbit_ff_registry:get(FeatureName) of
+ undefined ->
+ false;
+ FeatureProps ->
+ case maps:get(migration_fun, FeatureProps, none) of
+ {MigrationMod, MigrationFun}
+ when is_atom(MigrationMod) andalso is_atom(MigrationFun) ->
+ uses_migration_fun_v2(MigrationMod, MigrationFun);
+ _ ->
+ false
+ end
+ end.
+
+uses_migration_fun_v2(MigrationMod, MigrationFun) ->
+ erlang:function_exported(MigrationMod, MigrationFun, 1).
+
+enable_v1(FeatureName) ->
rabbit_log_feature_flags:debug(
- "Feature flag `~s`: REQUEST TO ENABLE",
+ "Feature flags: `~s`: REQUEST TO ENABLE",
[FeatureName]),
case is_enabled(FeatureName) of
true ->
rabbit_log_feature_flags:debug(
- "Feature flag `~s`: already enabled",
+ "Feature flags: `~s`: already enabled",
[FeatureName]),
ok;
false ->
rabbit_log_feature_flags:debug(
- "Feature flag `~s`: not enabled, check if supported by cluster",
+ "Feature flags: `~s`: not enabled, check if supported by "
+ "cluster",
[FeatureName]),
%% The feature flag must be supported locally and remotely
%% (i.e. by all members of the cluster).
case is_supported(FeatureName) of
true ->
rabbit_log_feature_flags:info(
- "Feature flag `~s`: supported, attempt to enable...",
+ "Feature flags: `~s`: supported, attempt to enable...",
[FeatureName]),
do_enable(FeatureName);
false ->
rabbit_log_feature_flags:error(
- "Feature flag `~s`: not supported",
+ "Feature flags: `~s`: not supported",
[FeatureName]),
{error, unsupported}
end
- end;
-enable(FeatureNames) when is_list(FeatureNames) ->
- with_feature_flags(FeatureNames, fun enable/1).
+ end.
-spec enable_all() -> ok | {error, any()}.
%% @doc
@@ -355,7 +469,7 @@ enable_all() ->
enable_all(Stability)
when Stability =:= stable orelse Stability =:= experimental ->
- with_feature_flags(maps:keys(list(all, Stability)), fun enable/1).
+ enable(maps:keys(list(all, Stability))).
-spec disable(feature_name() | [feature_name()]) -> ok | {error, any()}.
%% @doc
@@ -412,8 +526,13 @@ with_feature_flags([], _) ->
%% `false' if one of them is not or the RPC timed out.
is_supported(FeatureNames) ->
- is_supported_locally(FeatureNames) andalso
- is_supported_remotely(FeatureNames).
+ case is_enabled(feature_flags_v2) of
+ true ->
+ rabbit_ff_controller:is_supported(FeatureNames);
+ false ->
+ is_supported_locally(FeatureNames) andalso
+ is_supported_remotely(FeatureNames)
+ end.
-spec is_supported(feature_name() | [feature_name()], timeout()) ->
boolean().
@@ -431,8 +550,13 @@ is_supported(FeatureNames) ->
%% `false' if one of them is not or the RPC timed out.
is_supported(FeatureNames, Timeout) ->
- is_supported_locally(FeatureNames) andalso
- is_supported_remotely(FeatureNames, Timeout).
+ case is_enabled(feature_flags_v2) of
+ true ->
+ rabbit_ff_controller:is_supported(FeatureNames, Timeout);
+ false ->
+ is_supported_locally(FeatureNames) andalso
+ is_supported_remotely(FeatureNames, Timeout)
+ end.
-spec is_supported_locally(feature_name() | [feature_name()]) -> boolean().
%% @doc
@@ -578,8 +702,8 @@ is_enabled(FeatureNames, non_blocking) ->
is_enabled(FeatureNames, blocking) ->
case is_enabled_nb(FeatureNames) of
state_changing ->
- global:set_lock(?FF_STATE_CHANGE_LOCK),
- global:del_lock(?FF_STATE_CHANGE_LOCK),
+ rabbit_ff_registry_factory:acquire_state_change_lock(),
+ rabbit_ff_registry_factory:release_state_change_lock(),
is_enabled(FeatureNames, blocking);
IsEnabled ->
IsEnabled
@@ -751,327 +875,68 @@ init() ->
_ = list(all),
ok.
--spec initialize_registry() -> ok | {error, any()} | no_return().
-%% @private
-%% @doc
-%% Initializes or reinitializes the registry.
-%%
-%% The registry is an Erlang module recompiled at runtime to hold the
-%% state of all supported feature flags.
-%%
-%% That Erlang module is called {@link rabbit_ff_registry}. The initial
-%% source code of this module simply calls this function so it is
-%% replaced by a proper registry.
-%%
-%% Once replaced, the registry contains the map of all supported feature
-%% flags and their state. This makes it very efficient to query a
-%% feature flag state or property.
-%%
-%% The registry is local to all RabbitMQ nodes.
-
-initialize_registry() ->
- initialize_registry(#{}).
-
--spec initialize_registry(feature_flags()) ->
- ok | {error, any()} | no_return().
-%% @private
-%% @doc
-%% Initializes or reinitializes the registry.
-%%
-%% See {@link initialize_registry/0} for a description of the registry.
-%%
-%% This function takes a map of new supported feature flags (so their
-%% name and extended properties) to add to the existing known feature
-%% flags.
-
-initialize_registry(NewSupportedFeatureFlags) ->
- %% The first step is to get the feature flag states: if this is the
- %% first time we initialize it, we read the list from disk (the
- %% `feature_flags` file). Otherwise we query the existing registry
- %% before it is replaced.
- RegistryInitialized = rabbit_ff_registry:is_registry_initialized(),
- FeatureStates = case RegistryInitialized of
- true ->
- rabbit_ff_registry:states();
- false ->
- EnabledFeatureNames =
- read_enabled_feature_flags_list(),
- list_of_enabled_feature_flags_to_feature_states(
- EnabledFeatureNames)
- end,
-
- %% We also record if the feature flags state was correctly written
- %% to disk. Currently we don't use this information, but in the
- %% future, we might want to retry the write if it failed so far.
- %%
- %% TODO: Retry to write the feature flags state if the first try
- %% failed.
- WrittenToDisk = case RegistryInitialized of
- true ->
- rabbit_ff_registry:is_registry_written_to_disk();
- false ->
- true
- end,
- initialize_registry(NewSupportedFeatureFlags,
- FeatureStates,
- WrittenToDisk).
-
--spec list_of_enabled_feature_flags_to_feature_states([feature_name()]) ->
- feature_states().
-
-list_of_enabled_feature_flags_to_feature_states(FeatureNames) ->
- maps:from_list([{FeatureName, true} || FeatureName <- FeatureNames]).
-
--spec initialize_registry(feature_flags(),
- feature_states(),
- boolean()) ->
- ok | {error, any()} | no_return().
-%% @private
-%% @doc
-%% Initializes or reinitializes the registry.
-%%
-%% See {@link initialize_registry/0} for a description of the registry.
-%%
-%% This function takes a map of new supported feature flags (so their
-%% name and extended properties) to add to the existing known feature
-%% flags, a map of the new feature flag states (whether they are
-%% enabled, disabled or `state_changing'), and a flag to indicate if the
-%% feature flag states was recorded to disk.
-%%
-%% The latter is used to block callers asking if a feature flag is
-%% enabled or disabled while its state is changing.
-
-initialize_registry(NewSupportedFeatureFlags,
- NewFeatureStates,
- WrittenToDisk) ->
- Ret = maybe_initialize_registry(NewSupportedFeatureFlags,
- NewFeatureStates,
- WrittenToDisk),
- case Ret of
- ok -> ok;
- restart -> initialize_registry(NewSupportedFeatureFlags,
- NewFeatureStates,
- WrittenToDisk);
- Error -> Error
- end.
-
--spec maybe_initialize_registry(feature_flags(),
- feature_states(),
- boolean()) ->
- ok | restart | {error, any()} | no_return().
-
-maybe_initialize_registry(NewSupportedFeatureFlags,
- NewFeatureStates,
- WrittenToDisk) ->
- %% We save the version of the current registry before computing
- %% the new one. This is used when we do the actual reload: if the
- %% current registry was reloaded in the meantime, we need to restart
- %% the computation to make sure we don't loose data.
- RegistryVsn = registry_vsn(),
-
- %% We take the feature flags already registered.
- RegistryInitialized = rabbit_ff_registry:is_registry_initialized(),
- KnownFeatureFlags1 = case RegistryInitialized of
- true -> rabbit_ff_registry:list(all);
- false -> #{}
- end,
-
- %% Query the list (it's a map to be exact) of known
- %% supported feature flags. That list comes from the
- %% `-rabbitmq_feature_flag().` module attributes exposed by all
- %% currently loaded Erlang modules.
- KnownFeatureFlags2 = query_supported_feature_flags(),
-
- %% We merge the feature flags we already knew about
- %% (KnownFeatureFlags1), those found in the loaded applications
- %% (KnownFeatureFlags2) and those specified in arguments
- %% (NewSupportedFeatureFlags). The latter come from remote nodes
- %% usually: for example, they can come from plugins loaded on remote
- %% node but the plugins are missing locally. In this case, we
- %% consider those feature flags supported because there is no code
- %% locally which would cause issues.
- %%
- %% It means that the list of feature flags only grows. we don't try
- %% to clean it at some point because we want to remember about the
- %% feature flags we saw (and their state). It should be fine because
- %% that list should remain small.
- KnownFeatureFlags = maps:merge(KnownFeatureFlags1,
- KnownFeatureFlags2),
- AllFeatureFlags = maps:merge(KnownFeatureFlags,
- NewSupportedFeatureFlags),
-
- %% Next we want to update the feature states, based on the new
- %% states passed as arguments.
- FeatureStates0 = case RegistryInitialized of
- true ->
- maps:merge(rabbit_ff_registry:states(),
- NewFeatureStates);
- false ->
- NewFeatureStates
- end,
- FeatureStates = maps:filter(
- fun(_, true) -> true;
- (_, state_changing) -> true;
- (_, false) -> false
- end, FeatureStates0),
-
- Proceed = does_registry_need_refresh(AllFeatureFlags,
- FeatureStates,
- WrittenToDisk),
-
- case Proceed of
- true ->
- rabbit_log_feature_flags:debug(
- "Feature flags: (re)initialize registry (~p)",
- [self()]),
- T0 = erlang:timestamp(),
- Ret = do_initialize_registry(RegistryVsn,
- AllFeatureFlags,
- FeatureStates,
- WrittenToDisk),
- T1 = erlang:timestamp(),
- rabbit_log_feature_flags:debug(
- "Feature flags: time to regen registry: ~p µs",
- [timer:now_diff(T1, T0)]),
- Ret;
- false ->
- rabbit_log_feature_flags:debug(
- "Feature flags: registry already up-to-date, skipping init"),
- ok
- end.
-
--spec does_registry_need_refresh(feature_flags(),
- feature_states(),
- boolean()) ->
- boolean().
-
-does_registry_need_refresh(AllFeatureFlags,
- FeatureStates,
- WrittenToDisk) ->
- case rabbit_ff_registry:is_registry_initialized() of
- true ->
- %% Before proceeding with the actual
- %% (re)initialization, let's see if there are any
- %% changes.
- CurrentAllFeatureFlags = rabbit_ff_registry:list(all),
- CurrentFeatureStates = rabbit_ff_registry:states(),
- CurrentWrittenToDisk =
- rabbit_ff_registry:is_registry_written_to_disk(),
-
- if
- AllFeatureFlags =/= CurrentAllFeatureFlags ->
- rabbit_log_feature_flags:debug(
- "Feature flags: registry refresh needed: "
- "yes, list of feature flags differs"),
- true;
- FeatureStates =/= CurrentFeatureStates ->
- rabbit_log_feature_flags:debug(
- "Feature flags: registry refresh needed: "
- "yes, feature flag states differ"),
- true;
- WrittenToDisk =/= CurrentWrittenToDisk ->
- rabbit_log_feature_flags:debug(
- "Feature flags: registry refresh needed: "
- "yes, \"written to disk\" state changed"),
- true;
- true ->
- rabbit_log_feature_flags:debug(
- "Feature flags: registry refresh needed: no"),
- false
- end;
- false ->
- rabbit_log_feature_flags:debug(
- "Feature flags: registry refresh needed: "
- "yes, first-time initialization"),
- true
- end.
-
--spec do_initialize_registry(registry_vsn(),
- feature_flags(),
- feature_states(),
- boolean()) ->
- ok | restart | {error, any()} | no_return().
-%% @private
-
-do_initialize_registry(RegistryVsn,
- AllFeatureFlags,
- FeatureStates,
- WrittenToDisk) ->
- %% We log the state of those feature flags.
- rabbit_log_feature_flags:info(
- "Feature flags: list of feature flags found:"),
- lists:foreach(
- fun(FeatureName) ->
- rabbit_log_feature_flags:info(
- "Feature flags: [~s] ~s",
- [case maps:is_key(FeatureName, FeatureStates) of
- true ->
- case maps:get(FeatureName, FeatureStates) of
- true -> "x";
- state_changing -> "~"
- end;
- false ->
- " "
- end,
- FeatureName])
- end, lists:sort(maps:keys(AllFeatureFlags))),
- rabbit_log_feature_flags:info(
- "Feature flags: feature flag states written to disk: ~s",
- [case WrittenToDisk of
- true -> "yes";
- false -> "no"
- end]),
-
- %% We request the registry to be regenerated and reloaded with the
- %% new state.
- regen_registry_mod(RegistryVsn,
- AllFeatureFlags,
- FeatureStates,
- WrittenToDisk).
-
--spec query_supported_feature_flags() -> feature_flags().
-%% @private
-
--ifdef(TEST).
-define(PT_TESTSUITE_ATTRS, {?MODULE, testsuite_feature_flags_attrs}).
-inject_test_feature_flags(AttributesFromTestsuite) ->
+inject_test_feature_flags(FeatureFlags) ->
+ ExistingAppAttrs = module_attributes_from_testsuite(),
+ FeatureFlagsPerApp0 = lists:foldl(
+ fun({Origin, Origin, FFlags}, Acc) ->
+ Acc#{Origin => maps:from_list(FFlags)}
+ end, #{}, ExistingAppAttrs),
+ FeatureFlagsPerApp1 = maps:fold(
+ fun(FeatureName, FeatureProps, Acc) ->
+ Origin = case FeatureProps of
+ #{provided_by := App} ->
+ App;
+ _ ->
+ '$injected'
+ end,
+ FFlags0 = maps:get(Origin, Acc, #{}),
+ FFlags1 = FFlags0#{
+ FeatureName => FeatureProps},
+ Acc#{Origin => FFlags1}
+ end, FeatureFlagsPerApp0, FeatureFlags),
+ AttributesFromTestsuite = maps:fold(
+ fun(Origin, FFlags, Acc) ->
+ [{Origin, % Application
+ Origin, % Module
+ maps:to_list(FFlags)} | Acc]
+ end, [], FeatureFlagsPerApp1),
rabbit_log_feature_flags:debug(
- "Feature flags: injecting feature flags from testsuite: ~p",
- [AttributesFromTestsuite]),
+ "Feature flags: injecting feature flags from testsuite: ~p~n"
+ "Feature flags: all injected feature flags: ~p",
+ [FeatureFlags, AttributesFromTestsuite]),
ok = persistent_term:put(?PT_TESTSUITE_ATTRS, AttributesFromTestsuite),
- initialize_registry().
+ rabbit_ff_registry_factory:initialize_registry().
module_attributes_from_testsuite() ->
persistent_term:get(?PT_TESTSUITE_ATTRS, []).
+-spec query_supported_feature_flags() -> {ScannedApps, FeatureFlags} when
+ ScannedApps :: [atom()],
+ FeatureFlags :: feature_flags().
+%% @private
+
query_supported_feature_flags() ->
rabbit_log_feature_flags:debug(
- "Feature flags: query feature flags in loaded applications "
- "+ testsuite"),
+ "Feature flags: query feature flags in loaded applications"),
T0 = erlang:timestamp(),
- AttributesPerApp = rabbit_misc:rabbitmq_related_module_attributes(
- rabbit_feature_flag),
+ %% We need to know the list of applications we scanned for feature flags.
+ %% We can't derive that list of the returned feature flags because an
+ %% application might be loaded/present and not have a specific feature
+ %% flag. In this case, the feature flag should be considered unsupported.
+ ScannedApps = rabbit_misc:rabbitmq_related_apps(),
+ AttributesPerApp = rabbit_misc:module_attributes_from_apps(
+ rabbit_feature_flag, ScannedApps),
AttributesFromTestsuite = module_attributes_from_testsuite(),
+ TestsuiteProviders = [App || {App, _, _} <- AttributesFromTestsuite],
T1 = erlang:timestamp(),
rabbit_log_feature_flags:debug(
"Feature flags: time to find supported feature flags: ~p µs",
[timer:now_diff(T1, T0)]),
AllAttributes = AttributesPerApp ++ AttributesFromTestsuite,
- prepare_queried_feature_flags(AllAttributes, #{}).
--else.
-query_supported_feature_flags() ->
- rabbit_log_feature_flags:debug(
- "Feature flags: query feature flags in loaded applications"),
- T0 = erlang:timestamp(),
- AttributesPerApp = rabbit_misc:rabbitmq_related_module_attributes(
- rabbit_feature_flag),
- T1 = erlang:timestamp(),
- rabbit_log_feature_flags:debug(
- "Feature flags: time to find supported feature flags: ~p µs",
- [timer:now_diff(T1, T0)]),
- prepare_queried_feature_flags(AttributesPerApp, #{}).
--endif.
+ AllApps = lists:usort(ScannedApps ++ TestsuiteProviders),
+ {AllApps, prepare_queried_feature_flags(AllAttributes, #{})}.
prepare_queried_feature_flags([{App, _Module, Attributes} | Rest],
AllFeatureFlags) ->
@@ -1105,293 +970,6 @@ merge_new_feature_flags(AllFeatureFlags, App, FeatureName, FeatureProps)
maps:merge(AllFeatureFlags,
#{FeatureName => FeatureProps1}).
--spec regen_registry_mod(registry_vsn(),
- feature_flags(),
- feature_states(),
- boolean()) ->
- ok | restart | {error, any()} | no_return().
-%% @private
-
-regen_registry_mod(RegistryVsn,
- AllFeatureFlags,
- FeatureStates,
- WrittenToDisk) ->
- %% Here, we recreate the source code of the `rabbit_ff_registry`
- %% module from scratch.
- %%
- %% IMPORTANT: We want both modules to have the exact same public
- %% API in order to simplify the life of developers and their tools
- %% (Dialyzer, completion, and so on).
-
- %% -module(rabbit_ff_registry).
- ModuleAttr = erl_syntax:attribute(
- erl_syntax:atom(module),
- [erl_syntax:atom(rabbit_ff_registry)]),
- ModuleForm = erl_syntax:revert(ModuleAttr),
- %% -export([...]).
- ExportAttr = erl_syntax:attribute(
- erl_syntax:atom(export),
- [erl_syntax:list(
- [erl_syntax:arity_qualifier(
- erl_syntax:atom(F),
- erl_syntax:integer(A))
- || {F, A} <- [{get, 1},
- {list, 1},
- {states, 0},
- {is_supported, 1},
- {is_enabled, 1},
- {is_registry_initialized, 0},
- {is_registry_written_to_disk, 0}]]
- )
- ]
- ),
- ExportForm = erl_syntax:revert(ExportAttr),
- %% get(_) -> ...
- GetClauses = [erl_syntax:clause(
- [erl_syntax:atom(FeatureName)],
- [],
- [erl_syntax:abstract(maps:get(FeatureName,
- AllFeatureFlags))])
- || FeatureName <- maps:keys(AllFeatureFlags)
- ],
- GetUnknownClause = erl_syntax:clause(
- [erl_syntax:variable("_")],
- [],
- [erl_syntax:atom(undefined)]),
- GetFun = erl_syntax:function(
- erl_syntax:atom(get),
- GetClauses ++ [GetUnknownClause]),
- GetFunForm = erl_syntax:revert(GetFun),
- %% list(_) -> ...
- ListAllBody = erl_syntax:abstract(AllFeatureFlags),
- ListAllClause = erl_syntax:clause([erl_syntax:atom(all)],
- [],
- [ListAllBody]),
- EnabledFeatureFlags = maps:filter(
- fun(FeatureName, _) ->
- maps:is_key(FeatureName,
- FeatureStates)
- andalso
- maps:get(FeatureName, FeatureStates)
- =:=
- true
- end, AllFeatureFlags),
- ListEnabledBody = erl_syntax:abstract(EnabledFeatureFlags),
- ListEnabledClause = erl_syntax:clause(
- [erl_syntax:atom(enabled)],
- [],
- [ListEnabledBody]),
- DisabledFeatureFlags = maps:filter(
- fun(FeatureName, _) ->
- not maps:is_key(FeatureName,
- FeatureStates)
- end, AllFeatureFlags),
- ListDisabledBody = erl_syntax:abstract(DisabledFeatureFlags),
- ListDisabledClause = erl_syntax:clause(
- [erl_syntax:atom(disabled)],
- [],
- [ListDisabledBody]),
- StateChangingFeatureFlags = maps:filter(
- fun(FeatureName, _) ->
- maps:is_key(FeatureName,
- FeatureStates)
- andalso
- maps:get(FeatureName, FeatureStates)
- =:=
- state_changing
- end, AllFeatureFlags),
- ListStateChangingBody = erl_syntax:abstract(StateChangingFeatureFlags),
- ListStateChangingClause = erl_syntax:clause(
- [erl_syntax:atom(state_changing)],
- [],
- [ListStateChangingBody]),
- ListFun = erl_syntax:function(
- erl_syntax:atom(list),
- [ListAllClause,
- ListEnabledClause,
- ListDisabledClause,
- ListStateChangingClause]),
- ListFunForm = erl_syntax:revert(ListFun),
- %% states() -> ...
- StatesBody = erl_syntax:abstract(FeatureStates),
- StatesClause = erl_syntax:clause([], [], [StatesBody]),
- StatesFun = erl_syntax:function(
- erl_syntax:atom(states),
- [StatesClause]),
- StatesFunForm = erl_syntax:revert(StatesFun),
- %% is_supported(_) -> ...
- IsSupportedClauses = [erl_syntax:clause(
- [erl_syntax:atom(FeatureName)],
- [],
- [erl_syntax:atom(true)])
- || FeatureName <- maps:keys(AllFeatureFlags)
- ],
- NotSupportedClause = erl_syntax:clause(
- [erl_syntax:variable("_")],
- [],
- [erl_syntax:atom(false)]),
- IsSupportedFun = erl_syntax:function(
- erl_syntax:atom(is_supported),
- IsSupportedClauses ++ [NotSupportedClause]),
- IsSupportedFunForm = erl_syntax:revert(IsSupportedFun),
- %% is_enabled(_) -> ...
- IsEnabledClauses = [erl_syntax:clause(
- [erl_syntax:atom(FeatureName)],
- [],
- [case maps:is_key(FeatureName, FeatureStates) of
- true ->
- erl_syntax:atom(
- maps:get(FeatureName, FeatureStates));
- false ->
- erl_syntax:atom(false)
- end])
- || FeatureName <- maps:keys(AllFeatureFlags)
- ],
- NotEnabledClause = erl_syntax:clause(
- [erl_syntax:variable("_")],
- [],
- [erl_syntax:atom(false)]),
- IsEnabledFun = erl_syntax:function(
- erl_syntax:atom(is_enabled),
- IsEnabledClauses ++ [NotEnabledClause]),
- IsEnabledFunForm = erl_syntax:revert(IsEnabledFun),
- %% is_registry_initialized() -> ...
- IsInitializedClauses = [erl_syntax:clause(
- [],
- [],
- [erl_syntax:atom(true)])
- ],
- IsInitializedFun = erl_syntax:function(
- erl_syntax:atom(is_registry_initialized),
- IsInitializedClauses),
- IsInitializedFunForm = erl_syntax:revert(IsInitializedFun),
- %% is_registry_written_to_disk() -> ...
- IsWrittenToDiskClauses = [erl_syntax:clause(
- [],
- [],
- [erl_syntax:atom(WrittenToDisk)])
- ],
- IsWrittenToDiskFun = erl_syntax:function(
- erl_syntax:atom(is_registry_written_to_disk),
- IsWrittenToDiskClauses),
- IsWrittenToDiskFunForm = erl_syntax:revert(IsWrittenToDiskFun),
- %% Compilation!
- Forms = [ModuleForm,
- ExportForm,
- GetFunForm,
- ListFunForm,
- StatesFunForm,
- IsSupportedFunForm,
- IsEnabledFunForm,
- IsInitializedFunForm,
- IsWrittenToDiskFunForm],
- maybe_log_registry_source_code(Forms),
- CompileOpts = [return_errors,
- return_warnings],
- case compile:forms(Forms, CompileOpts) of
- {ok, Mod, Bin, _} ->
- load_registry_mod(RegistryVsn, Mod, Bin);
- {error, Errors, Warnings} ->
- rabbit_log_feature_flags:error(
- "Feature flags: registry compilation:~n"
- "Errors: ~p~n"
- "Warnings: ~p",
- [Errors, Warnings]),
- {error, {compilation_failure, Errors, Warnings}}
- end.
-
-maybe_log_registry_source_code(Forms) ->
- case rabbit_prelaunch:get_context() of
- #{log_feature_flags_registry := true} ->
- rabbit_log_feature_flags:debug(
- "== FEATURE FLAGS REGISTRY ==~n"
- "~s~n"
- "== END ==~n",
- [erl_prettypr:format(erl_syntax:form_list(Forms))]);
- _ ->
- ok
- end.
-
--ifdef(TEST).
-registry_loading_lock() -> ?FF_REGISTRY_LOADING_LOCK.
--endif.
-
--spec load_registry_mod(registry_vsn(), atom(), binary()) ->
- ok | restart | no_return().
-%% @private
-
-load_registry_mod(RegistryVsn, Mod, Bin) ->
- rabbit_log_feature_flags:debug(
- "Feature flags: registry module ready, loading it (~p)...",
- [self()]),
- FakeFilename = "Compiled and loaded by " ?MODULE_STRING,
- %% Time to load the new registry, replacing the old one. We use a
- %% lock here to synchronize concurrent reloads.
- global:set_lock(?FF_REGISTRY_LOADING_LOCK, [node()]),
- rabbit_log_feature_flags:debug(
- "Feature flags: acquired lock before reloading registry module (~p)",
- [self()]),
- %% We want to make sure that the old registry (not the one being
- %% currently in use) is purged by the code server. It means no
- %% process lingers on that old code.
- %%
- %% We use code:soft_purge() for that (meaning no process is killed)
- %% and we wait in an infinite loop for that to succeed.
- ok = purge_old_registry(Mod),
- %% Now we can replace the currently loaded registry by the new one.
- %% The code server takes care of marking the current registry as old
- %% and load the new module in an atomic operation.
- %%
- %% Therefore there is no chance of a window where there is no
- %% registry module available, causing the one on disk to be
- %% reloaded.
- Ret = case registry_vsn() of
- RegistryVsn -> code:load_binary(Mod, FakeFilename, Bin);
- OtherVsn -> {error, {restart, RegistryVsn, OtherVsn}}
- end,
- rabbit_log_feature_flags:debug(
- "Feature flags: releasing lock after reloading registry module (~p)",
- [self()]),
- global:del_lock(?FF_REGISTRY_LOADING_LOCK, [node()]),
- case Ret of
- {module, _} ->
- rabbit_log_feature_flags:debug(
- "Feature flags: registry module loaded (vsn: ~p -> ~p)",
- [RegistryVsn, registry_vsn()]),
- ok;
- {error, {restart, Expected, Current}} ->
- rabbit_log_feature_flags:error(
- "Feature flags: another registry module was loaded in the "
- "meantime (expected old vsn: ~p, current vsn: ~p); "
- "restarting the regen",
- [Expected, Current]),
- restart;
- {error, Reason} ->
- rabbit_log_feature_flags:error(
- "Feature flags: failed to load registry module: ~p",
- [Reason]),
- throw({feature_flag_registry_reload_failure, Reason})
- end.
-
--spec registry_vsn() -> registry_vsn().
-%% @private
-
-registry_vsn() ->
- Attrs = rabbit_ff_registry:module_info(attributes),
- proplists:get_value(vsn, Attrs, undefined).
-
-purge_old_registry(Mod) ->
- case code:is_loaded(Mod) of
- {file, _} -> do_purge_old_registry(Mod);
- false -> ok
- end.
-
-do_purge_old_registry(Mod) ->
- case code:soft_purge(Mod) of
- true -> ok;
- false -> do_purge_old_registry(Mod)
- end.
-
%% -------------------------------------------------------------------
%% Feature flags state storage.
%% -------------------------------------------------------------------
@@ -1515,7 +1093,7 @@ do_enable(FeatureName) ->
%% We mark this feature flag as "state changing" before doing the
%% actual state change. We also take a global lock: this permits
%% to block callers asking about a feature flag changing state.
- global:set_lock(?FF_STATE_CHANGE_LOCK),
+ rabbit_ff_registry_factory:acquire_state_change_lock(),
Ret = case mark_as_enabled(FeatureName, state_changing) of
ok ->
case enable_dependencies(FeatureName, true) of
@@ -1538,7 +1116,7 @@ do_enable(FeatureName) ->
ok -> ok;
_ -> mark_as_enabled(FeatureName, false)
end,
- global:del_lock(?FF_STATE_CHANGE_LOCK),
+ rabbit_ff_registry_factory:release_state_change_lock(),
Ret.
-spec enable_locally(feature_name()) -> ok | {error, any()} | no_return().
@@ -1550,7 +1128,7 @@ enable_locally(FeatureName) when is_atom(FeatureName) ->
ok;
false ->
rabbit_log_feature_flags:debug(
- "Feature flag `~s`: enable locally (as part of feature "
+ "Feature flags: `~s`: enable locally (as part of feature "
"flag states synchronization)",
[FeatureName]),
do_enable_locally(FeatureName)
@@ -1582,7 +1160,7 @@ enable_dependencies(FeatureName, Everywhere) ->
FeatureProps = rabbit_ff_registry:get(FeatureName),
DependsOn = maps:get(depends_on, FeatureProps, []),
rabbit_log_feature_flags:debug(
- "Feature flag `~s`: enable dependencies: ~p",
+ "Feature flags: `~s`: enable dependencies: ~p",
[FeatureName, DependsOn]),
enable_dependencies(FeatureName, DependsOn, Everywhere).
@@ -1615,8 +1193,9 @@ run_migration_fun(FeatureName, FeatureProps, Arg) ->
{MigrationMod, MigrationFun}
when is_atom(MigrationMod) andalso is_atom(MigrationFun) ->
rabbit_log_feature_flags:debug(
- "Feature flag `~s`: run migration function ~p with arg: ~p",
- [FeatureName, MigrationFun, Arg]),
+ "Feature flags: `~s`: run migration function (v1) ~s:~s "
+ "with arg=~p on node ~p",
+ [FeatureName, MigrationMod, MigrationFun, Arg, node()]),
try
erlang:apply(MigrationMod,
MigrationFun,
@@ -1624,7 +1203,8 @@ run_migration_fun(FeatureName, FeatureProps, Arg) ->
catch
_:Reason:Stacktrace ->
rabbit_log_feature_flags:error(
- "Feature flag `~s`: migration function crashed: ~p~n~p",
+ "Feature flags: `~s`: migration function crashed: "
+ "~p~n~p",
[FeatureName, Reason, Stacktrace]),
{error, {migration_fun_crash, Reason, Stacktrace}}
end;
@@ -1632,7 +1212,7 @@ run_migration_fun(FeatureName, FeatureProps, Arg) ->
{error, no_migration_fun};
Invalid ->
rabbit_log_feature_flags:error(
- "Feature flag `~s`: invalid migration function: ~p",
+ "Feature flags: `~s`: invalid migration function: ~p",
[FeatureName, Invalid]),
{error, {invalid_migration_fun, Invalid}}
end.
@@ -1654,8 +1234,8 @@ mark_as_enabled(FeatureName, IsEnabled) ->
%% @private
mark_as_enabled_locally(FeatureName, IsEnabled) ->
- rabbit_log_feature_flags:info(
- "Feature flag `~s`: mark as enabled=~p",
+ rabbit_log_feature_flags:debug(
+ "Feature flags: `~s`: mark as enabled=~p",
[FeatureName, IsEnabled]),
EnabledFeatureNames = maps:keys(list(enabled)),
NewEnabledFeatureNames = case IsEnabled of
@@ -1673,9 +1253,10 @@ mark_as_enabled_locally(FeatureName, IsEnabled) ->
ok =:= try_to_write_enabled_feature_flags_list(
NewEnabledFeatureNames)
end,
- initialize_registry(#{},
- #{FeatureName => IsEnabled},
- WrittenToDisk).
+ rabbit_ff_registry_factory:initialize_registry(
+ #{},
+ #{FeatureName => IsEnabled},
+ WrittenToDisk).
-spec mark_as_enabled_remotely(feature_name(), feature_state()) ->
any() | {error, any()} | no_return().
@@ -1748,6 +1329,7 @@ mark_as_enabled_remotely(Nodes, FeatureName, IsEnabled, Timeout) ->
%% Coordination with remote nodes.
%% -------------------------------------------------------------------
+-ifndef(TEST).
-spec remote_nodes() -> [node()].
%% @private
@@ -1761,6 +1343,44 @@ running_remote_nodes() ->
mnesia:system_info(running_db_nodes) -- [node()].
query_running_remote_nodes(Node, Timeout) ->
+ query_running_remote_nodes1(Node, Timeout).
+-else.
+-define(PT_OVERRIDDEN_NODES, {?MODULE, overridden_nodes}).
+-define(PT_OVERRIDDEN_RUNNING_NODES, {?MODULE, overridden_running_nodes}).
+
+remote_nodes() ->
+ case get_overriden_nodes() of
+ undefined -> mnesia:system_info(db_nodes) -- [node()];
+ Nodes -> Nodes -- [node()]
+ end.
+
+running_remote_nodes() ->
+ case get_overriden_running_nodes() of
+ undefined -> mnesia:system_info(running_db_nodes) -- [node()];
+ Nodes -> Nodes -- [node()]
+ end.
+
+query_running_remote_nodes(Node, Timeout) ->
+ case rpc:call(Node, ?MODULE, get_overriden_running_nodes, [], Timeout) of
+ {badrpc, _} = Error -> Error;
+ undefined -> query_running_remote_nodes1(Node, Timeout);
+ Nodes -> Nodes -- [node()]
+ end.
+
+override_nodes(Nodes) ->
+ persistent_term:put(?PT_OVERRIDDEN_NODES, Nodes).
+
+get_overriden_nodes() ->
+ persistent_term:get(?PT_OVERRIDDEN_NODES, undefined).
+
+override_running_nodes(Nodes) ->
+ persistent_term:put(?PT_OVERRIDDEN_RUNNING_NODES, Nodes).
+
+get_overriden_running_nodes() ->
+ persistent_term:get(?PT_OVERRIDDEN_RUNNING_NODES, undefined).
+-endif.
+
+query_running_remote_nodes1(Node, Timeout) ->
case rpc:call(Node, mnesia, system_info, [running_db_nodes], Timeout) of
{badrpc, _} = Error -> Error;
Nodes -> Nodes -- [node()]
@@ -1841,6 +1461,12 @@ check_node_compatibility(Node) ->
%% @see check_node_compatibility/1
check_node_compatibility(Node, Timeout) ->
+ case is_enabled(feature_flags_v2) of
+ true -> rabbit_ff_controller:check_node_compatibility(Node);
+ false -> check_node_compatibility_v1(Node, Timeout)
+ end.
+
+check_node_compatibility_v1(Node, Timeout) ->
%% Before checking compatibility, we exchange feature flags from
%% unknown Erlang applications. So we fetch remote feature flags
%% from applications which are not loaded locally, and the opposite.
@@ -1941,42 +1567,8 @@ remote_enabled_feature_flags_is_supported_locally(Node, Timeout) ->
is_supported_locally(RemoteEnabledFeatureNames)
end.
--spec run_feature_flags_mod_on_remote_node(node(),
- atom(),
- [term()],
- timeout()) ->
- term() | {error, term()}.
-%% @private
-
run_feature_flags_mod_on_remote_node(Node, Function, Args, Timeout) ->
- case rpc:call(Node, ?MODULE, Function, Args, Timeout) of
- {badrpc, {'EXIT',
- {undef,
- [{?MODULE, Function, Args, []}
- | _]}}} ->
- %% If rabbit_feature_flags:Function() is undefined
- %% on the remote node, we consider it to be a 3.7.x
- %% pre-feature-flags node.
- %%
- %% Theoretically, it could be an older version (3.6.x and
- %% older). But the RabbitMQ version consistency check
- %% (rabbit_misc:version_minor_equivalent/2) called from
- %% rabbit_mnesia:check_rabbit_consistency/2 already blocked
- %% this situation from happening before we reach this point.
- rabbit_log_feature_flags:debug(
- "Feature flags: ~s:~s~p unavailable on node `~s`: "
- "assuming it is a RabbitMQ 3.7.x pre-feature-flags node",
- [?MODULE, Function, Args, Node]),
- {error, pre_feature_flags_rabbitmq};
- {badrpc, Reason} = Error ->
- rabbit_log_feature_flags:error(
- "Feature flags: error while running ~s:~s~p "
- "on node `~s`: ~p",
- [?MODULE, Function, Args, Node, Reason]),
- {error, Error};
- Ret ->
- Ret
- end.
+ rabbit_ff_controller:rpc_call(Node, ?MODULE, Function, Args, Timeout).
-spec query_remote_feature_flags(node(),
Which :: all | enabled | disabled,
@@ -2044,7 +1636,8 @@ merge_feature_flags_from_unknown_apps(FeatureFlags)
"Feature flags: register feature flags provided by applications "
"unknown locally: ~p",
[maps:keys(FeatureFlagsFromUnknownApps)]),
- initialize_registry(FeatureFlagsFromUnknownApps)
+ rabbit_ff_registry_factory:initialize_registry(
+ FeatureFlagsFromUnknownApps)
end.
exchange_feature_flags_from_unknown_apps(Node, Timeout) ->
@@ -2097,7 +1690,13 @@ sync_feature_flags_with_cluster(Nodes, NodeIsVirgin) ->
ok | {error, any()} | no_return().
%% @private
-sync_feature_flags_with_cluster([], NodeIsVirgin, _) ->
+sync_feature_flags_with_cluster(Nodes, NodeIsVirgin, Timeout) ->
+ case is_enabled(feature_flags_v2) of
+ true -> rabbit_ff_controller:sync_cluster();
+ false -> sync_cluster_v1(Nodes, NodeIsVirgin, Timeout)
+ end.
+
+sync_cluster_v1([], NodeIsVirgin, _) ->
verify_which_feature_flags_are_actually_enabled(),
case NodeIsVirgin of
true ->
@@ -2139,10 +1738,96 @@ sync_feature_flags_with_cluster([], NodeIsVirgin, _) ->
"current state"),
ok
end;
-sync_feature_flags_with_cluster(Nodes, _, Timeout) ->
- verify_which_feature_flags_are_actually_enabled(),
+sync_cluster_v1(Nodes, _, Timeout) ->
+ case sync_feature_flags_v2_first(Nodes, Timeout) of
+ true ->
+ ?LOG_DEBUG(
+ "Feature flags: both sides have `feature_flags_v2` enabled; "
+ "switching to controller's sync",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ rabbit_ff_controller:sync_cluster();
+ false ->
+ verify_which_feature_flags_are_actually_enabled(),
+ RemoteNodes = Nodes -- [node()],
+ sync_feature_flags_with_cluster1(RemoteNodes, Timeout)
+ end.
+
+sync_feature_flags_v2_first(Nodes, Timeout) ->
+ ?LOG_DEBUG(
+ "Feature flags: checking if one side of the sync has "
+ "`feature_flags_v2` enabled",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
RemoteNodes = Nodes -- [node()],
- sync_feature_flags_with_cluster1(RemoteNodes, Timeout).
+ RandomRemoteNode = pick_one_node(RemoteNodes),
+ Ret1 = run_feature_flags_mod_on_remote_node(
+ RandomRemoteNode,
+ is_enabled,
+ [feature_flags_v2],
+ Timeout),
+ case Ret1 of
+ {error, Reason} ->
+ ?LOG_DEBUG(
+ "Feature flags: failed to check `feature_flags_v2` on remote "
+ "node: ~p",
+ [Reason],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ false;
+ EnabledRemotely ->
+ EnabledLocally = is_enabled(feature_flags_v2),
+ ?LOG_DEBUG(
+ "Feature flags: `feature_flags_v2` state: local=~s remote=~s",
+ [EnabledLocally, EnabledRemotely],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ case {EnabledLocally, EnabledRemotely} of
+ {true, true} ->
+ true;
+ {true, false} ->
+ ?LOG_DEBUG(
+ "Feature flags: enable `feature_flags_v2` remotely "
+ "and restart sync",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ Ret2 = run_feature_flags_mod_on_remote_node(
+ RandomRemoteNode,
+ do_sync_feature_flags_with_node,
+ [[feature_flags_v2]],
+ Timeout),
+ case Ret2 of
+ ok ->
+ true;
+ {error, Reason} ->
+ ?LOG_DEBUG(
+ "Feature flags: failed to enable "
+ "`feature_flags_v2` remotely: ~p",
+ [Reason],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ false
+ end;
+ {false, true} ->
+ ?LOG_DEBUG(
+ "Feature flags: enable `feature_flags_v2` locally "
+ "and restart sync",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ Ret3 = do_sync_feature_flags_with_node(
+ [feature_flags_v2]),
+ case Ret3 of
+ ok ->
+ true;
+ {error, Reason} ->
+ ?LOG_DEBUG(
+ "Feature flags: failed to enable "
+ "`feature_flags_v2` locally: ~p",
+ [Reason],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ false
+ end;
+ {false, false} ->
+ false
+ end
+ end.
sync_feature_flags_with_cluster1([], _) ->
ok;
@@ -2337,25 +2022,37 @@ verify_which_feature_flags_are_actually_enabled() ->
"flags"),
WrittenToDisk = ok =:= try_to_write_enabled_feature_flags_list(
RepairedEnabledFeatureNames),
- initialize_registry(
+ rabbit_ff_registry_factory:initialize_registry(
#{},
- list_of_enabled_feature_flags_to_feature_states(
+ enabled_feature_flags_to_feature_states(
RepairedEnabledFeatureNames),
WrittenToDisk)
end.
+-spec enabled_feature_flags_to_feature_states([feature_name()]) ->
+ feature_states().
+
+enabled_feature_flags_to_feature_states(FeatureNames) ->
+ maps:from_list([{FeatureName, true} || FeatureName <- FeatureNames]).
+
-spec refresh_feature_flags_after_app_load([atom()]) ->
ok | {error, any()} | no_return().
-refresh_feature_flags_after_app_load([]) ->
- ok;
refresh_feature_flags_after_app_load(Apps) ->
+ case is_enabled(feature_flags_v2) of
+ true -> rabbit_ff_controller:refresh_after_app_load();
+ false -> refresh_feature_flags_after_app_load_v1(Apps)
+ end.
+
+refresh_feature_flags_after_app_load_v1([]) ->
+ ok;
+refresh_feature_flags_after_app_load_v1(Apps) ->
rabbit_log_feature_flags:debug(
"Feature flags: new apps loaded: ~p -> refreshing feature flags",
[Apps]),
FeatureFlags0 = list(all),
- FeatureFlags1 = query_supported_feature_flags(),
+ {_ScannedApps, FeatureFlags1} = query_supported_feature_flags(),
%% The following list contains all the feature flags this node
%% learned about only because remote nodes have them. Now, the
@@ -2396,7 +2093,7 @@ refresh_feature_flags_after_app_load(Apps) ->
[lists:sort(NewSupportedFeatureNames)])
end,
- case initialize_registry() of
+ case rabbit_ff_registry_factory:initialize_registry() of
ok ->
Ret = maybe_enable_locally_after_app_load(
AlreadySupportedFeatureNames),
diff --git a/deps/rabbit/src/rabbit_ff_controller.erl b/deps/rabbit/src/rabbit_ff_controller.erl
new file mode 100644
index 0000000000..709e2e05fe
--- /dev/null
+++ b/deps/rabbit/src/rabbit_ff_controller.erl
@@ -0,0 +1,1152 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @author The RabbitMQ team
+%% @copyright 2022 VMware, Inc. or its affiliates.
+%%
+%% @doc
+%% The feature flag controller is responsible for synchronization and managing
+%% concurrency when feature flag states are changed.
+%%
+%% It makes sure only one node can enable feature flags or synchronize its
+%% feature flag states with other nodes at a time. If another node wants to
+%% enable a feature flag or if it joins a cluster and needs to synchronize its
+%% feature flag states, it will wait for the current task to finish.
+%%
+%% The feature flag controller is used as soon as the `feature_flags_v2'
+%% feature flag is enabled.
+%%
+%% Compared to the initial subsystem, it also introduces a new migration
+%% function signature; See {@link rabbit_feature_flags:migration_fun_v2()}.
+
+-module(rabbit_ff_controller).
+-behaviour(gen_statem).
+
+-include_lib("kernel/include/logger.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+-include_lib("rabbit_common/include/logging.hrl").
+
+-include("feature_flags.hrl").
+
+-export([is_supported/1, is_supported/2,
+ enable/1,
+ check_node_compatibility/1,
+ sync_cluster/0,
+ refresh_after_app_load/0]).
+
+%% Internal use only.
+-export([start/0,
+ start_link/0,
+ rpc_call/5,
+ all_nodes/0,
+ running_nodes/0]).
+
+%% gen_statem callbacks.
+-export([callback_mode/0,
+ init/1,
+ terminate/3,
+ code_change/4,
+
+ standing_by/3,
+ waiting_for_end_of_controller_task/3,
+ updating_feature_flag_states/3]).
+
+-record(?MODULE, {from,
+ notify = #{}}).
+
+-define(LOCAL_NAME, ?MODULE).
+-define(GLOBAL_NAME, {?MODULE, global}).
+
+%% Default timeout for operations on remote nodes.
+-define(TIMEOUT, 60000).
+
+start() ->
+ gen_statem:start({local, ?LOCAL_NAME}, ?MODULE, none, []).
+
+start_link() ->
+ gen_statem:start_link({local, ?LOCAL_NAME}, ?MODULE, none, []).
+
+is_supported(FeatureNames) ->
+ is_supported(FeatureNames, ?TIMEOUT).
+
+is_supported(FeatureName, Timeout) when is_atom(FeatureName) ->
+ is_supported([FeatureName], Timeout);
+is_supported(FeatureNames, Timeout) when is_list(FeatureNames) ->
+ Nodes = running_nodes(),
+ case collect_inventory_on_nodes(Nodes, Timeout) of
+ {ok, Inventory} ->
+ lists:all(
+ fun(FeatureName) ->
+ is_known_and_supported(Inventory, FeatureName)
+ end,
+ FeatureNames);
+ _Error ->
+ false
+ end.
+
+enable(FeatureName) when is_atom(FeatureName) ->
+ enable([FeatureName]);
+enable(FeatureNames) when is_list(FeatureNames) ->
+ ?LOG_DEBUG(
+ "Feature flags: REQUEST TO ENABLE: ~p",
+ [FeatureNames],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ gen_statem:call(?LOCAL_NAME, {enable, FeatureNames}).
+
+check_node_compatibility(RemoteNode) ->
+ ThisNode = node(),
+ ?LOG_DEBUG(
+ "Feature flags: CHECKING COMPATIBILITY between nodes `~s` and `~s`",
+ [ThisNode, RemoteNode],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ %% We don't go through the controller process to check nodes compatibility
+ %% because this function is used while `rabbit' is stopped usually.
+ %%
+ %% There is no benefit in starting a controller just for this check
+ %% because it would not guaranty that the compatibility remains true after
+ %% this function finishes and before the node starts and synchronizes
+ %% feature flags.
+ check_node_compatibility_task(ThisNode, RemoteNode).
+
+sync_cluster() ->
+ ?LOG_DEBUG(
+ "Feature flags: SYNCING FEATURE FLAGS in cluster...",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ case erlang:whereis(?LOCAL_NAME) of
+ Pid when is_pid(Pid) ->
+ %% The function is called while `rabbit' is running.
+ gen_statem:call(?LOCAL_NAME, sync_cluster);
+ undefined ->
+ %% The function is called while `rabbit' is stopped. We need to
+ %% start a one-off controller, again to make sure concurrent
+ %% changes are blocked.
+ {ok, Pid} = start_link(),
+ Ret = gen_statem:call(Pid, sync_cluster),
+ gen_statem:stop(Pid),
+ Ret
+ end.
+
+refresh_after_app_load() ->
+ ?LOG_DEBUG(
+ "Feature flags: REFRESHING after applications load...",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ gen_statem:call(?LOCAL_NAME, refresh_after_app_load).
+
+%% --------------------------------------------------------------------
+%% gen_statem callbacks.
+%% --------------------------------------------------------------------
+
+callback_mode() ->
+ state_functions.
+
+init(_Args) ->
+ {ok, standing_by, none}.
+
+standing_by(
+ {call, From} = EventType, EventContent, none)
+ when EventContent =/= notify_when_done ->
+ ?LOG_DEBUG(
+ "Feature flags: registering controller globally before "
+ "proceeding with task: ~p",
+ [EventContent],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+
+ %% The first step is to register this process globally (it is already
+ %% registered locally). The purpose is to make sure this one takes full
+ %% control on feature flag changes among other controllers.
+ %%
+ %% This is useful for situations where a new node joins the cluster while
+ %% a feature flag is being enabled. In this case, when that new node joins
+ %% and its controller wants to synchronize feature flags, it will block
+ %% and wait for this one to finish.
+ case register_globally() of
+ yes ->
+ %% We would register the process globally. Therefore we can
+ %% proceed with enabling/syncing feature flags.
+ ?LOG_DEBUG(
+ "Feature flags: controller globally registered; can proceed "
+ "with task",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+
+ Data = #?MODULE{from = From},
+ {next_state, updating_feature_flag_states, Data,
+ [{next_event, internal, EventContent}]};
+
+ no ->
+ %% Another controller is globally registered. We ask that global
+ %% controller to notify us when it is done, and we wait for its
+ %% response.
+ ?LOG_DEBUG(
+ "Feature flags: controller NOT globally registered; need to "
+ "wait for the current global controller's task to finish",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+
+ RequestId = notify_me_when_done(),
+ {next_state, waiting_for_end_of_controller_task, RequestId,
+ [{next_event, EventType, EventContent}]}
+ end;
+standing_by(
+ {call, From}, notify_when_done, none) ->
+ %% This state is entered when a globally-registered controller finished
+ %% its task but had unhandled `notify_when_done` requests in its inbox. We
+ %% just need to notify the caller that it can proceed.
+ notify_waiting_controller(From),
+ {keep_state_and_data, []}.
+
+waiting_for_end_of_controller_task(
+ {call, _From}, _EventContent, _RequestId) ->
+ {keep_state_and_data, [postpone]};
+waiting_for_end_of_controller_task(
+ info, Msg, RequestId) ->
+ case gen_statem:check_response(Msg, RequestId) of
+ {reply, done} ->
+ ?LOG_DEBUG(
+ "Feature flags: current global controller's task finished; "
+ "trying to take next turn",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ {next_state, standing_by, none, []};
+ {error, Reason} ->
+ ?LOG_DEBUG(
+ "Feature flags: error while waiting for current global "
+ "controller's task: ~0p; trying to take next turn",
+ [Reason],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ {next_state, standing_by, none, []};
+ no_reply ->
+ ?LOG_DEBUG(
+ "Feature flags: unknown message while waiting for current "
+ "global controller's task: ~0p; still waiting",
+ [Msg],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ keep_state_and_data
+ end.
+
+updating_feature_flag_states(
+ internal, Task, #?MODULE{from = From} = Data) ->
+ Reply = proceed_with_task(Task),
+
+ ?LOG_DEBUG(
+ "Feature flags: unregistering controller globally",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ unregister_globally(),
+ notify_waiting_controllers(Data),
+ {next_state, standing_by, none, [{reply, From, Reply}]};
+updating_feature_flag_states(
+ {call, From}, notify_when_done, #?MODULE{notify = Notify} = Data) ->
+ Notify1 = Notify#{From => true},
+ Data1 = Data#?MODULE{notify = Notify1},
+ {keep_state, Data1}.
+
+proceed_with_task({enable, FeatureNames}) ->
+ enable_task(FeatureNames);
+proceed_with_task(sync_cluster) ->
+ sync_cluster_task();
+proceed_with_task(refresh_after_app_load) ->
+ refresh_after_app_load_task().
+
+terminate(_Reason, _State, _Data) ->
+ ok.
+
+code_change(_OldVsn, OldState, OldData, _Extra) ->
+ {ok, OldState, OldData}.
+
+%% --------------------------------------------------------------------
+%% Global name registration.
+%% --------------------------------------------------------------------
+
+register_globally() ->
+ ?LOG_DEBUG(
+ "Feature flags: [global sync] @ ~s",
+ [node()],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ ok = global:sync(),
+ ?LOG_DEBUG(
+ "Feature flags: [global register] @ ~s",
+ [node()],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ global:register_name(?GLOBAL_NAME, self()).
+
+unregister_globally() ->
+ ?LOG_DEBUG(
+ "Feature flags: [global unregister] @ ~s",
+ [node()],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ _ = global:unregister_name(?GLOBAL_NAME),
+ ok.
+
+notify_me_when_done() ->
+ gen_statem:send_request({global, ?GLOBAL_NAME}, notify_when_done).
+
+notify_waiting_controllers(#?MODULE{notify = Notify}) ->
+ maps:fold(
+ fun(From, true, Acc) ->
+ notify_waiting_controller(From),
+ Acc
+ end, ok, Notify).
+
+notify_waiting_controller({ControlerPid, _} = From) ->
+ ControlerNode = node(ControlerPid),
+ ?LOG_DEBUG(
+ "Feature flags: controller's task finished; notify waiting controller "
+ "on node ~p",
+ [ControlerNode],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ gen_statem:reply(From, done).
+
+%% --------------------------------------------------------------------
+%% Code to check compatibility between nodes.
+%% --------------------------------------------------------------------
+
+-spec check_node_compatibility_task(Node, Node) -> Ret when
+ Node :: node(),
+ Ret :: ok | {error, Reason},
+ Reason :: incompatible_feature_flags.
+
+check_node_compatibility_task(NodeA, NodeB) ->
+ ?LOG_NOTICE(
+ "Feature flags: checking nodes `~s` and `~s` compatibility...",
+ [NodeA, NodeB],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ NodesA = list_nodes_clustered_with(NodeA),
+ NodesB = list_nodes_clustered_with(NodeB),
+ AreCompatible = case collect_inventory_on_nodes(NodesA) of
+ {ok, InventoryA} ->
+ ?LOG_DEBUG(
+ "Feature flags: inventory of node `~s`:~n~p",
+ [NodeA, InventoryA],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ case collect_inventory_on_nodes(NodesB) of
+ {ok, InventoryB} ->
+ ?LOG_DEBUG(
+ "Feature flags: inventory of node "
+ "`~s`:~n~p",
+ [NodeB, InventoryB],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ are_compatible(InventoryA, InventoryB);
+ _ ->
+ false
+ end;
+ _ ->
+ false
+ end,
+ case AreCompatible of
+ true ->
+ ?LOG_NOTICE(
+ "Feature flags: nodes `~s` and `~s` are compatible",
+ [NodeA, NodeB],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ ok;
+ false ->
+ ?LOG_WARNING(
+ "Feature flags: nodes `~s` and `~s` are incompatible",
+ [NodeA, NodeB],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ {error, incompatible_feature_flags}
+ end.
+
+-spec list_nodes_clustered_with(Node) -> [Node] when
+ Node :: node().
+
+list_nodes_clustered_with(Node) ->
+ %% If Mnesia is stopped on the given node, it will return an empty list.
+ %% In this case, only consider that stopped node.
+ case rpc_call(Node, ?MODULE, running_nodes, [], ?TIMEOUT) of
+ [] -> [Node];
+ List -> List
+ end.
+
+-spec are_compatible(Inventory, Inventory) -> AreCompatible when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ AreCompatible :: boolean().
+
+are_compatible(InventoryA, InventoryB) ->
+ check_one_way_compatibility(InventoryA, InventoryB)
+ andalso
+ check_one_way_compatibility(InventoryB, InventoryA).
+
+-spec check_one_way_compatibility(Inventory, Inventory) -> AreCompatible when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ AreCompatible :: boolean().
+
+check_one_way_compatibility(InventoryA, InventoryB) ->
+ %% This function checks the compatibility in one way, "inventory A" is
+ %% compatible with "inventory B". This is true if all feature flags
+ %% enabled in "inventory B" are supported by "inventory A".
+ %%
+ %% They don't need to be enabled on both side: the feature flags states
+ %% will be synchronized by `sync_cluster()'.
+ FeatureNames = list_feature_flags_enabled_somewhere(InventoryB, true),
+ lists:all(
+ fun(FeatureName) ->
+ #{feature_flags := #{FeatureName := FeatureFlag}} = InventoryB,
+ not is_known(InventoryA, FeatureFlag)
+ orelse
+ is_known_and_supported(InventoryA, FeatureName)
+ end, FeatureNames).
+
+%% --------------------------------------------------------------------
+%% Code to enable and sync feature flags.
+%% --------------------------------------------------------------------
+
+-spec enable_task(FeatureNames) -> Ret when
+ FeatureNames :: [rabbit_feature_flags:feature_name()],
+ Ret :: ok | {error, Reason},
+ Reason :: term().
+
+enable_task(FeatureNames) ->
+ %% We take a snapshot of clustered nodes, including stopped nodes, at the
+ %% beginning of the process and use that list at all time.
+ %%
+ %% If there are some missing, unreachable or stopped nodes, we abort the
+ %% task and refuse to enable the feature flag(s). This is to make sure
+ %% that if there is a network partition, the other side of the partition
+ %% doesn't "rejoin" the cluster with some feature flags enabled behind the
+ %% scene.
+ %%
+ %% TODO: Should we remove the requirement above and call `sync_cluster()'
+ %% when a network partition is repaired?
+ %%
+ %% If a node tries to join the cluster during the task, it will block
+ %% because it will want to synchronize its feature flags state with the
+ %% cluster. For that to happen, this controller `enable' task needs to
+ %% finish before the synchronization task starts.
+ %%
+ %% If a node stops during the task, this will trigger an RPC error at some
+ %% point and the task will abort. That's ok because migration functions
+ %% are supposed to be idempotent.
+ AllNodes = all_nodes(),
+ RunningNodes = running_nodes(),
+ case RunningNodes of
+ AllNodes ->
+ ?LOG_DEBUG(
+ "Feature flags: nodes where the feature flags will be "
+ "enabled: ~p~n"
+ "Feature flags: new nodes joining the cluster in between "
+ "will wait and synchronize their feature flag states after.",
+ [AllNodes],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+
+ %% Likewise, we take a snapshot of the feature flags states on all
+ %% running nodes right from the beginning. This is what we use
+ %% during the entire task to determine if feature flags are
+ %% supported, enabled somewhere, etc.
+ case collect_inventory_on_nodes(AllNodes) of
+ {ok, Inventory} -> enable_many(Inventory, FeatureNames);
+ Error -> Error
+ end;
+ _ ->
+ ?LOG_ERROR(
+ "Feature flags: refuse to enable feature flags while "
+ "clustered nodes are missing, stopped or unreachable: ~p",
+ [AllNodes -- RunningNodes]),
+ {error, missing_clustered_nodes}
+ end.
+
+-spec sync_cluster_task() -> Ret when
+ Ret :: ok | {error, Reason},
+ Reason :: term().
+
+sync_cluster_task() ->
+ %% We assume that a feature flag can only be enabled, not disabled.
+ %% Therefore this synchronization searches for feature flags enabled on
+ %% some nodes but not all, and make sure they are enabled everywhere.
+ %%
+ %% This happens when a node joins a cluster and that node has a different
+ %% set of enabled feature flags.
+ %%
+ %% FIXME: `enable_task()' requires that all nodes in the cluster run to
+ %% enable anything. Should we require the same here? On one hand, this
+ %% would make sure a feature flag isn't enabled while there is a network
+ %% partition. On the other hand, this would require that all nodes are
+ %% running before we can expand the cluster...
+ Nodes = running_nodes(),
+ ?LOG_DEBUG(
+ "Feature flags: synchronizing feature flags on nodes: ~p",
+ [Nodes],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+
+ case collect_inventory_on_nodes(Nodes) of
+ {ok, Inventory} ->
+ FeatureNames = list_feature_flags_enabled_somewhere(
+ Inventory, false),
+ enable_many(Inventory, FeatureNames);
+ Error ->
+ Error
+ end.
+
+-spec refresh_after_app_load_task() -> Ret when
+ Ret :: ok | {error, Reason},
+ Reason :: term().
+
+refresh_after_app_load_task() ->
+ case rabbit_ff_registry_factory:initialize_registry() of
+ ok -> sync_cluster_task();
+ Error -> Error
+ end.
+
+-spec enable_many(Inventory, FeatureNames) -> Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureNames :: [rabbit_feature_flags:feature_name()],
+ Ret :: ok | {error, Reason},
+ Reason :: term().
+
+enable_many(#{states_per_node := _} = Inventory, [FeatureName | Rest]) ->
+ case enable_if_supported(Inventory, FeatureName) of
+ ok -> enable_many(Inventory, Rest);
+ Error -> Error
+ end;
+enable_many(_Inventory, []) ->
+ ok.
+
+-spec enable_if_supported(Inventory, FeatureName) -> Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ Ret :: ok | {error, Reason},
+ Reason :: term().
+
+enable_if_supported(#{states_per_node := _} = Inventory, FeatureName) ->
+ case is_known_and_supported(Inventory, FeatureName) of
+ true ->
+ ?LOG_DEBUG(
+ "Feature flags: `~s`: supported; continuing",
+ [FeatureName],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ case lock_registry_and_enable(Inventory, FeatureName) of
+ {ok, _Inventory} -> ok;
+ Error -> Error
+ end;
+ false ->
+ ?LOG_DEBUG(
+ "Feature flags: `~s`: unsupported; aborting",
+ [FeatureName],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ {error, unsupported}
+ end.
+
+-spec lock_registry_and_enable(Inventory, FeatureName) -> Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ Ret :: {ok, Inventory} | {error, Reason},
+ Reason :: term().
+
+lock_registry_and_enable(#{states_per_node := _} = Inventory, FeatureName) ->
+ %% We acquire a lock before making any change to the registry. This is not
+ %% used by the controller (because it is already using a globally
+ %% registered name to prevent concurrent runs). But this is used in
+ %% `rabbit_feature_flags:is_enabled()' to block while the state is
+ %% `state_changing'.
+ ?LOG_DEBUG(
+ "Feature flags: acquiring registry state change lock",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ rabbit_ff_registry_factory:acquire_state_change_lock(),
+
+ Ret = enable_with_registry_locked(Inventory, FeatureName),
+
+ ?LOG_DEBUG(
+ "Feature flags: releasing registry state change lock",
+ [],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ rabbit_ff_registry_factory:release_state_change_lock(),
+ Ret.
+
+-spec enable_with_registry_locked(Inventory, FeatureName) -> Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ Ret :: {ok, Inventory} | {error, Reason},
+ Reason :: term().
+
+enable_with_registry_locked(
+ #{states_per_node := _} = Inventory, FeatureName) ->
+ %% We verify if the feature flag needs to be enabled somewhere. This may
+ %% have changed since the beginning, not because another controller
+ %% enabled it (this is not possible because only one can run at a given
+ %% time), but because this feature flag was already enabled as a
+ %% consequence (for instance, it's a dependency of another feature flag we
+ %% processed).
+ Nodes = list_nodes_where_feature_flag_is_disabled(Inventory, FeatureName),
+ case Nodes of
+ [] ->
+ ?LOG_DEBUG(
+ "Feature flags: `~s`: already enabled; skipping",
+ [FeatureName],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ {ok, Inventory};
+ _ ->
+ ?LOG_NOTICE(
+ "Feature flags: attempt to enable `~s`...",
+ [FeatureName],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+
+ case update_feature_state_and_enable(Inventory, FeatureName) of
+ {ok, _Inventory} = Ok ->
+ ?LOG_NOTICE(
+ "Feature flags: `~s` enabled",
+ [FeatureName],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ Ok;
+ Error ->
+ ?LOG_ERROR(
+ "Feature flags: failed to enable `~s`: ~p",
+ [FeatureName, Error],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ Error
+ end
+ end.
+
+-spec update_feature_state_and_enable(Inventory, FeatureName) -> Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ Ret :: {ok, Inventory} | {error, Reason},
+ Reason :: term().
+
+update_feature_state_and_enable(
+ #{states_per_node := _} = Inventory, FeatureName) ->
+ %% The feature flag is marked as `state_changing' on all running nodes who
+ %% know it, including those where it's already enabled. The idea is that
+ %% the feature flag is between two states on some nodes and the code
+ %% running on the nodes where the feature flag enabled shouldn't assume
+ %% all nodes in the cluster are in the same situation.
+ Nodes = list_nodes_who_know_the_feature_flag(Inventory, FeatureName),
+
+ %% We only consider nodes where the feature flag is disabled. The
+ %% migration function is only executed on those nodes. I.e. we don't run
+ %% it again where it has already been executed.
+ NodesWhereDisabled = list_nodes_where_feature_flag_is_disabled(
+ Inventory, FeatureName),
+
+ Ret1 = mark_as_enabled_on_nodes(
+ Nodes, Inventory, FeatureName, state_changing),
+ case Ret1 of
+ %% We ignore the returned updated inventory because we don't need or
+ %% even want to remember the `state_changing' state. This is only used
+ %% for external queries of the registry.
+ {ok, _Inventory0} ->
+ case do_enable(Inventory, FeatureName, NodesWhereDisabled) of
+ {ok, Inventory1} ->
+ Ret2 = mark_as_enabled_on_nodes(
+ Nodes, Inventory1, FeatureName, true),
+ case Ret2 of
+ {ok, Inventory2} ->
+ post_enable(
+ Inventory2, FeatureName, NodesWhereDisabled),
+ Ret2;
+ Error ->
+ _ = mark_as_enabled_on_nodes(
+ Nodes, Inventory, FeatureName, false),
+ Error
+ end;
+ Error ->
+ _ = mark_as_enabled_on_nodes(
+ Nodes, Inventory, FeatureName, false),
+ Error
+ end;
+ Error ->
+ _ = mark_as_enabled_on_nodes(
+ Nodes, Inventory, FeatureName, false),
+ Error
+ end.
+
+-spec do_enable(Inventory, FeatureName, Nodes) -> Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ Nodes :: [node()],
+ Ret :: {ok, Inventory} | {error, Reason},
+ Reason :: term().
+
+do_enable(#{states_per_node := _} = Inventory, FeatureName, Nodes) ->
+ %% After dependencies are enabled, we need to remember the updated
+ %% inventory. This is useful later to skip feature flags enabled earlier
+ %% in the process. For instance because a feature flag is dependency of
+ %% several other feature flags.
+ case enable_dependencies(Inventory, FeatureName) of
+ {ok, Inventory1} ->
+ Extra = #{nodes => Nodes},
+ Rets = run_migration_fun(
+ Nodes, FeatureName, enable, Extra, infinity),
+ maps:fold(
+ fun
+ (_Node, ok, {ok, _} = Ret) -> Ret;
+ (_Node, Error, {ok, _}) -> Error;
+ (_Node, _, Error) -> Error
+ end, {ok, Inventory1}, Rets);
+ Error ->
+ Error
+ end.
+
+-spec post_enable(Inventory, FeatureName, Nodes) -> Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ Nodes :: [node()],
+ Ret :: ok.
+
+post_enable(#{states_per_node := _}, FeatureName, Nodes) ->
+ case rabbit_feature_flags:uses_migration_fun_v2(FeatureName) of
+ true ->
+ Extra = #{nodes => Nodes},
+ _ = run_migration_fun(
+ Nodes, FeatureName, post_enable, Extra, infinity),
+ ok;
+ false ->
+ ok
+ end.
+
+%% --------------------------------------------------------------------
+%% Cluster relationship and inventory.
+%% --------------------------------------------------------------------
+
+-ifndef(TEST).
+all_nodes() ->
+ lists:usort([node() | mnesia:system_info(db_nodes)]).
+
+running_nodes() ->
+ lists:usort([node() | mnesia:system_info(running_db_nodes)]).
+-else.
+all_nodes() ->
+ RemoteNodes = case rabbit_feature_flags:get_overriden_nodes() of
+ undefined -> mnesia:system_info(db_nodes);
+ Nodes -> Nodes
+ end,
+ lists:usort([node() | RemoteNodes]).
+
+running_nodes() ->
+ RemoteNodes = case rabbit_feature_flags:get_overriden_running_nodes() of
+ undefined -> mnesia:system_info(running_db_nodes);
+ Nodes -> Nodes
+ end,
+ lists:usort([node() | RemoteNodes]).
+-endif.
+
+collect_inventory_on_nodes(Nodes) ->
+ collect_inventory_on_nodes(Nodes, ?TIMEOUT).
+
+-spec collect_inventory_on_nodes(Nodes, Timeout) -> Ret when
+ Nodes :: [node()],
+ Timeout :: timeout(),
+ Ret :: {ok, Inventory} | {error, Reason},
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ Reason :: term().
+
+collect_inventory_on_nodes(Nodes, Timeout) ->
+ ?LOG_DEBUG(
+ "Feature flags: collecting inventory on nodes: ~p",
+ [Nodes],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ Inventory0 = #{feature_flags => #{},
+ applications_per_node => #{},
+ states_per_node => #{}},
+ Rets = rpc_calls(Nodes, rabbit_ff_registry, inventory, [], Timeout),
+ maps:fold(
+ fun
+ (Node,
+ #{feature_flags := FeatureFlags1,
+ applications := ScannedApps,
+ states := FeatureStates},
+ {ok, #{feature_flags := FeatureFlags,
+ applications_per_node := ScannedAppsPerNode,
+ states_per_node := StatesPerNode} = Inventory}) ->
+ FeatureFlags2 = maps:merge(FeatureFlags, FeatureFlags1),
+ ScannedAppsPerNode1 = ScannedAppsPerNode#{Node => ScannedApps},
+ StatesPerNode1 = StatesPerNode#{Node => FeatureStates},
+ Inventory1 = Inventory#{
+ feature_flags => FeatureFlags2,
+ applications_per_node => ScannedAppsPerNode1,
+ states_per_node => StatesPerNode1},
+ {ok, Inventory1};
+ (_Node, #{}, Error) ->
+ Error;
+ (_Node, Error, {ok, #{}}) ->
+ Error;
+ (_Node, _Error, Error) ->
+ Error
+ end, {ok, Inventory0}, Rets).
+
+-spec list_feature_flags_enabled_somewhere(Inventory, HandleStateChanging) ->
+ Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ HandleStateChanging :: boolean(),
+ Ret :: [FeatureName],
+ FeatureName :: rabbit_feature_flags:feature_name().
+
+list_feature_flags_enabled_somewhere(
+ #{states_per_node := StatesPerNode},
+ HandleStateChanging) ->
+ %% We want to collect feature flags which are enabled on at least one
+ %% node.
+ MergedStates = maps:fold(
+ fun(_Node, FeatureStates, Acc1) ->
+ maps:fold(
+ fun
+ (FeatureName, true, Acc2) ->
+ Acc2#{FeatureName => true};
+ (_FeatureName, false, Acc2) ->
+ Acc2;
+ (FeatureName, state_changing, Acc2)
+ when HandleStateChanging ->
+ Acc2#{FeatureName => true}
+ end, Acc1, FeatureStates)
+ end, #{}, StatesPerNode),
+ lists:sort(maps:keys(MergedStates)).
+
+-spec list_nodes_who_know_the_feature_flag(Inventory, FeatureName) ->
+ Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ Ret :: [node()].
+
+list_nodes_who_know_the_feature_flag(
+ #{states_per_node := StatesPerNode},
+ FeatureName) ->
+ Nodes = lists:sort(
+ maps:keys(
+ maps:filter(
+ fun(_Node, FeatureStates) ->
+ maps:is_key(FeatureName, FeatureStates)
+ end, StatesPerNode))),
+ this_node_first(Nodes).
+
+-spec list_nodes_where_feature_flag_is_disabled(Inventory, FeatureName) ->
+ Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ Ret :: [node()].
+
+list_nodes_where_feature_flag_is_disabled(
+ #{states_per_node := StatesPerNode},
+ FeatureName) ->
+ Nodes = lists:sort(
+ maps:keys(
+ maps:filter(
+ fun(_Node, FeatureStates) ->
+ case FeatureStates of
+ %% The feature flag is known on this node, run
+ %% the migration function only if it is
+ %% disabled.
+ #{FeatureName := Enabled} -> not Enabled;
+
+
+ %% The feature flags is unknown on this node,
+ %% don't run the migration function.
+ _ -> false
+ end
+ end, StatesPerNode))),
+ this_node_first(Nodes).
+
+this_node_first(Nodes) ->
+ ThisNode = node(),
+ case lists:member(ThisNode, Nodes) of
+ true -> [ThisNode | Nodes -- [ThisNode]];
+ false -> Nodes
+ end.
+
+-spec rpc_call(Node, Module, Function, Args, Timeout) -> Ret when
+ Node :: node(),
+ Module :: module(),
+ Function :: atom(),
+ Args :: [term()],
+ Timeout :: timeout(),
+ Ret :: term() | {error, term()}.
+
+rpc_call(Node, Module, Function, Args, Timeout) ->
+ case rpc:call(Node, Module, Function, Args, Timeout) of
+ {badrpc, {'EXIT',
+ {undef,
+ [{rabbit_feature_flags, Function, Args, []}
+ | _]}}} ->
+ %% If rabbit_feature_flags:Function() is undefined
+ %% on the remote node, we consider it to be a 3.7.x
+ %% pre-feature-flags node.
+ %%
+ %% Theoretically, it could be an older version (3.6.x and
+ %% older). But the RabbitMQ version consistency check
+ %% (rabbit_misc:version_minor_equivalent/2) called from
+ %% rabbit_mnesia:check_rabbit_consistency/2 already blocked
+ %% this situation from happening before we reach this point.
+ ?LOG_DEBUG(
+ "Feature flags: ~s:~s~p unavailable on node `~s`: "
+ "assuming it is a RabbitMQ 3.7.x pre-feature-flags node",
+ [?MODULE, Function, Args, Node],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ {error, pre_feature_flags_rabbitmq};
+ {badrpc, Reason} = Error ->
+ ?LOG_ERROR(
+ "Feature flags: error while running:~n"
+ "Feature flags: ~s:~s~p~n"
+ "Feature flags: on node `~s`:~n"
+ "Feature flags: ~p",
+ [?MODULE, Function, Args, Node, Reason],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ {error, Error};
+ Ret ->
+ Ret
+ end.
+
+-spec rpc_calls(Nodes, Module, Function, Args, Timeout) -> Rets when
+ Nodes :: [node()],
+ Module :: module(),
+ Function :: atom(),
+ Args :: [term()],
+ Timeout :: timeout(),
+ Rets :: #{node() => term()}.
+
+rpc_calls(Nodes, Module, Function, Args, Timeout) when is_list(Nodes) ->
+ Parent = self(),
+ CallRef = {Module, Function, Args},
+ Runners = [{Node,
+ spawn_monitor(
+ fun() ->
+ Ret = rpc_call(
+ Node, Module, Function, Args, Timeout),
+ Parent ! {internal_rpc_call, Node, CallRef, Ret}
+ end)}
+ || Node <- Nodes],
+ Rets = [receive
+ {internal_rpc_call, Node, CallRef, Ret} ->
+ %% After we got the return value for that node, we still
+ %% need to consume the `DOWN' message emitted once the
+ %% spawn process exited.
+ receive
+ {'DOWN', MRef, process, Pid, Reason} ->
+ ?assertEqual(normal, Reason)
+ end,
+ {Node, Ret};
+ {'DOWN', MRef, process, Pid, Reason} ->
+ {Node, {error, {'DOWN', Reason}}}
+ end
+ || {Node, {Pid, MRef}} <- Runners],
+ maps:from_list(Rets).
+
+%% --------------------------------------------------------------------
+%% Feature flag support queries.
+%% --------------------------------------------------------------------
+
+-spec is_known(Inventory, FeatureFlag) -> IsKnown when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureFlag :: rabbit_feature_flags:feature_props_extended(),
+ IsKnown :: boolean().
+
+is_known(
+ #{applications_per_node := ScannedAppsPerNode},
+ #{provided_by := App} = _FeatureFlag) ->
+ maps:fold(
+ fun
+ (_Node, ScannedApps, false) -> lists:member(App, ScannedApps);
+ (_Node, _ScannedApps, true) -> true
+ end, false, ScannedAppsPerNode).
+
+-spec is_known_and_supported(Inventory, FeatureName) ->
+ IsKnownAndSupported when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ IsKnownAndSupported :: boolean().
+
+is_known_and_supported(
+ #{feature_flags := FeatureFlags,
+ applications_per_node := ScannedAppsPerNode,
+ states_per_node := StatesPerNode},
+ FeatureName)
+ when is_map_key(FeatureName, FeatureFlags) ->
+ %% A feature flag is considered supported by a node if:
+ %% - the node knows the feature flag, or
+ %% - the node does not have the application providing it.
+ %%
+ %% Therefore, we first need to look up the application providing this
+ %% feature flag.
+ #{FeatureName := #{provided_by := App}} = FeatureFlags,
+
+ maps:fold(
+ fun
+ (Node, FeatureStates, true) ->
+ case FeatureStates of
+ #{FeatureName := _} ->
+ %% The node knows about the feature flag.
+ true;
+ _ ->
+ %% The node doesn't know about the feature flag, so does
+ %% it have the application providing it loaded?
+ #{Node := ScannedApps} = ScannedAppsPerNode,
+ not lists:member(App, ScannedApps)
+ end;
+ (_Node, _FeatureStates, false) ->
+ false
+ end, true, StatesPerNode);
+is_known_and_supported(_Inventory, _FeatureName) ->
+ %% None of the nodes know about this feature flag at all.
+ false.
+
+%% --------------------------------------------------------------------
+%% Feature flag state changes.
+%% --------------------------------------------------------------------
+
+-spec mark_as_enabled_on_nodes(Nodes, Inventory, FeatureName, IsEnabled) ->
+ Ret when
+ Nodes :: [node()],
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ IsEnabled :: rabbit_feature_flags:feature_state(),
+ Ret :: {ok, Inventory} | {error, Reason},
+ Reason :: term().
+
+mark_as_enabled_on_nodes(
+ Nodes,
+ #{states_per_node := StatesPerNode} = Inventory,
+ FeatureName, IsEnabled) ->
+ ?LOG_DEBUG(
+ "Feature flags: `~s`: mark as enabled=~p on nodes ~p",
+ [FeatureName, IsEnabled, Nodes],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ Rets = rpc_calls(
+ Nodes, rabbit_feature_flags, mark_as_enabled_locally,
+ [FeatureName, IsEnabled], ?TIMEOUT),
+ Ret = maps:fold(
+ fun
+ (Node, ok, {ok, StatesPerNode1}) ->
+ FeatureStates1 = maps:get(Node, StatesPerNode1),
+ FeatureStates2 = FeatureStates1#{
+ FeatureName => IsEnabled},
+ StatesPerNode2 = StatesPerNode1#{
+ Node => FeatureStates2},
+ {ok, StatesPerNode2};
+ (_Node, ok, Error) ->
+ Error;
+ (_Node, Error, {ok, _StatesPerNode1}) ->
+ Error;
+ (_Node, _Error, Error) ->
+ Error
+ end, {ok, StatesPerNode}, Rets),
+ case Ret of
+ {ok, StatesPerNode3} ->
+ Inventory1 = Inventory#{states_per_node => StatesPerNode3},
+ {ok, Inventory1};
+ Error ->
+ Error
+ end.
+
+%% --------------------------------------------------------------------
+%% Feature flag dependencies handling.
+%% --------------------------------------------------------------------
+
+-spec enable_dependencies(Inventory, FeatureName) -> Ret when
+ Inventory :: rabbit_feature_flags:cluster_inventory(),
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ Ret :: {ok, Inventory} | {error, Reason},
+ Reason :: term().
+
+enable_dependencies(
+ #{feature_flags := FeatureFlags} = Inventory, FeatureName) ->
+ #{FeatureName := FeatureProps} = FeatureFlags,
+ DependsOn = maps:get(depends_on, FeatureProps, []),
+ case DependsOn of
+ [] ->
+ {ok, Inventory};
+ _ ->
+ ?LOG_DEBUG(
+ "Feature flags: `~s`: enable dependencies: ~p",
+ [FeatureName, DependsOn],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ enable_dependencies1(Inventory, DependsOn)
+ end.
+
+enable_dependencies1(
+ #{states_per_node := _} = Inventory, [FeatureName | Rest]) ->
+ case enable_with_registry_locked(Inventory, FeatureName) of
+ {ok, Inventory1} -> enable_dependencies1(Inventory1, Rest);
+ Error -> Error
+ end;
+enable_dependencies1(
+ #{states_per_node := _} = Inventory, []) ->
+ {ok, Inventory}.
+
+%% --------------------------------------------------------------------
+%% Migration function.
+%% --------------------------------------------------------------------
+
+-spec run_migration_fun(Nodes, FeatureName, Command, Extra, Timeout) ->
+ Rets when
+ Nodes :: [node()],
+ FeatureName :: rabbit_feature_flags:feature_name(),
+ Command :: atom(),
+ Extra :: map(),
+ Timeout :: timeout(),
+ Rets :: #{node() => term()}.
+
+run_migration_fun(Nodes, FeatureName, Command, Extra, Timeout) ->
+ FeatureProps = rabbit_ff_registry:get(FeatureName),
+ case maps:get(migration_fun, FeatureProps, none) of
+ {MigrationMod, MigrationFun}
+ when is_atom(MigrationMod) andalso is_atom(MigrationFun) ->
+ UsesMFv2 = rabbit_feature_flags:uses_migration_fun_v2(
+ MigrationMod, MigrationFun),
+ case UsesMFv2 of
+ true ->
+ %% The migration fun API v2 is of the form:
+ %% MigrationMod:MigrationFun(#ffcommand{...}).
+ %%
+ %% Also, the function is executed on all nodes in
+ %% parallel.
+ FFCommand = #ffcommand{
+ name = FeatureName,
+ props = FeatureProps,
+ command = Command,
+ extra = Extra},
+ run_migration_fun_v2(
+ Nodes, MigrationMod, MigrationFun, FFCommand, Timeout);
+ false ->
+ %% The migration fun API v1 is of the form:
+ %% MigrationMod:MigrationFun(
+ %% FeatureName, FeatureProps, Command).
+ %%
+ %% Also, the function is executed once on the calling node
+ %% only.
+ Ret = rabbit_feature_flags:run_migration_fun(
+ FeatureName, FeatureProps, Command),
+ #{node() => Ret}
+ end;
+ none ->
+ #{};
+ Invalid ->
+ ?LOG_ERROR(
+ "Feature flags: `~s`: invalid migration function: ~p",
+ [FeatureName, Invalid],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ #{node() => {error, {invalid_migration_fun, Invalid}}}
+ end.
+
+-spec run_migration_fun_v2(Nodes, MigrationMod, MigrationFun, FFCommand,
+ Timeout) -> Rets when
+ Nodes :: [node()],
+ MigrationMod :: module(),
+ MigrationFun :: atom(),
+ FFCommand :: rabbit_feature_flags:ffcommand(),
+ Timeout :: timeout(),
+ Rets :: #{node() => term}.
+
+run_migration_fun_v2(
+ Nodes, MigrationMod, MigrationFun,
+ #ffcommand{name = FeatureName} = FFCommand,
+ Timeout) ->
+ ?LOG_DEBUG(
+ "Feature flags: `~s`: run migration function (v2) ~s:~s~n"
+ "Feature flags: with command=~p~n"
+ "Feature flags: on nodes ~p",
+ [FeatureName, MigrationMod, MigrationFun, FFCommand, Nodes],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ Rets = rpc_calls(
+ Nodes, MigrationMod, MigrationFun, [FFCommand], Timeout),
+ ?LOG_DEBUG(
+ "Feature flags: `~s`: migration function (v2) ~s:~s returned:~n"
+ "Feature flags: ~p",
+ [FeatureName, MigrationMod, MigrationFun, Rets],
+ #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
+ Rets.
diff --git a/deps/rabbit/src/rabbit_ff_registry.erl b/deps/rabbit/src/rabbit_ff_registry.erl
index b160de060d..78eef2a34c 100644
--- a/deps/rabbit/src/rabbit_ff_registry.erl
+++ b/deps/rabbit/src/rabbit_ff_registry.erl
@@ -26,7 +26,8 @@
is_supported/1,
is_enabled/1,
is_registry_initialized/0,
- is_registry_written_to_disk/0]).
+ is_registry_written_to_disk/0,
+ inventory/0]).
-ifdef(TEST).
-on_load(on_load/0).
@@ -44,7 +45,7 @@
%% @returns the properties of the specified feature flag.
get(FeatureName) ->
- rabbit_feature_flags:initialize_registry(),
+ rabbit_ff_registry_factory:initialize_registry(),
%% Initially, is_registry_initialized/0 always returns `false`
%% and this ?MODULE:get(FeatureName) is always called. The case
%% statement is here to please Dialyzer.
@@ -65,7 +66,7 @@ get(FeatureName) ->
%% @returns A map of selected feature flags.
list(Which) ->
- rabbit_feature_flags:initialize_registry(),
+ rabbit_ff_registry_factory:initialize_registry(),
%% See get/1 for an explanation of the case statement below.
case is_registry_initialized() of
false -> ?MODULE:list(Which);
@@ -82,7 +83,7 @@ list(Which) ->
%% @returns A map of feature flag states.
states() ->
- rabbit_feature_flags:initialize_registry(),
+ rabbit_ff_registry_factory:initialize_registry(),
%% See get/1 for an explanation of the case statement below.
case is_registry_initialized() of
false -> ?MODULE:states();
@@ -101,7 +102,7 @@ states() ->
%% otherwise.
is_supported(FeatureName) ->
- rabbit_feature_flags:initialize_registry(),
+ rabbit_ff_registry_factory:initialize_registry(),
%% See get/1 for an explanation of the case statement below.
case is_registry_initialized() of
false -> ?MODULE:is_supported(FeatureName);
@@ -120,7 +121,7 @@ is_supported(FeatureName) ->
%% its state is transient, or `false' otherwise.
is_enabled(FeatureName) ->
- rabbit_feature_flags:initialize_registry(),
+ rabbit_ff_registry_factory:initialize_registry(),
%% See get/1 for an explanation of the case statement below.
case is_registry_initialized() of
false -> ?MODULE:is_enabled(FeatureName);
@@ -158,6 +159,13 @@ is_registry_initialized() ->
is_registry_written_to_disk() ->
always_return_true().
+-spec inventory() -> rabbit_feature_flags:inventory().
+
+inventory() ->
+ #{applications => [],
+ feature_flags => #{},
+ states => #{}}.
+
always_return_true() ->
%% This function is here to trick Dialyzer. We want some functions
%% in this initial on-disk registry to always return `true` or
diff --git a/deps/rabbit/src/rabbit_ff_registry_factory.erl b/deps/rabbit/src/rabbit_ff_registry_factory.erl
new file mode 100644
index 0000000000..a9b4b0c7b4
--- /dev/null
+++ b/deps/rabbit/src/rabbit_ff_registry_factory.erl
@@ -0,0 +1,631 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_ff_registry_factory).
+
+-include_lib("kernel/include/logger.hrl").
+
+-export([initialize_registry/0,
+ initialize_registry/1,
+ initialize_registry/3,
+ acquire_state_change_lock/0,
+ release_state_change_lock/0]).
+
+-ifdef(TEST).
+-export([registry_loading_lock/0]).
+-endif.
+
+-define(FF_STATE_CHANGE_LOCK, {feature_flags_state_change, self()}).
+-define(FF_REGISTRY_LOADING_LOCK, {feature_flags_registry_loading, self()}).
+
+-type registry_vsn() :: term().
+
+acquire_state_change_lock() ->
+ global:set_lock(?FF_STATE_CHANGE_LOCK).
+
+release_state_change_lock() ->
+ global:del_lock(?FF_STATE_CHANGE_LOCK).
+
+-spec initialize_registry() -> ok | {error, any()} | no_return().
+%% @private
+%% @doc
+%% Initializes or reinitializes the registry.
+%%
+%% The registry is an Erlang module recompiled at runtime to hold the
+%% state of all supported feature flags.
+%%
+%% That Erlang module is called {@link rabbit_ff_registry}. The initial
+%% source code of this module simply calls this function so it is
+%% replaced by a proper registry.
+%%
+%% Once replaced, the registry contains the map of all supported feature
+%% flags and their state. This is makes it very efficient to query a
+%% feature flag state or property.
+%%
+%% The registry is local to all RabbitMQ nodes.
+
+initialize_registry() ->
+ initialize_registry(#{}).
+
+-spec initialize_registry(rabbit_feature_flags:feature_flags()) ->
+ ok | {error, any()} | no_return().
+%% @private
+%% @doc
+%% Initializes or reinitializes the registry.
+%%
+%% See {@link initialize_registry/0} for a description of the registry.
+%%
+%% This function takes a map of new supported feature flags (so their
+%% name and extended properties) to add to the existing known feature
+%% flags.
+
+initialize_registry(NewSupportedFeatureFlags) ->
+ %% The first step is to get the feature flag states: if this is the
+ %% first time we initialize it, we read the list from disk (the
+ %% `feature_flags` file). Otherwise we query the existing registry
+ %% before it is replaced.
+ RegistryInitialized = rabbit_ff_registry:is_registry_initialized(),
+ FeatureStates =
+ case RegistryInitialized of
+ true ->
+ rabbit_ff_registry:states();
+ false ->
+ EnabledFeatureNames =
+ rabbit_feature_flags:read_enabled_feature_flags_list(),
+ rabbit_feature_flags:enabled_feature_flags_to_feature_states(
+ EnabledFeatureNames)
+ end,
+
+ %% We also record if the feature flags state was correctly written
+ %% to disk. Currently we don't use this information, but in the
+ %% future, we might want to retry the write if it failed so far.
+ %%
+ %% TODO: Retry to write the feature flags state if the first try
+ %% failed.
+ WrittenToDisk = case RegistryInitialized of
+ true ->
+ rabbit_ff_registry:is_registry_written_to_disk();
+ false ->
+ true
+ end,
+ initialize_registry(NewSupportedFeatureFlags,
+ FeatureStates,
+ WrittenToDisk).
+
+-spec initialize_registry(rabbit_feature_flags:feature_flags(),
+ rabbit_feature_flags:feature_states(),
+ boolean()) ->
+ ok | {error, any()} | no_return().
+%% @private
+%% @doc
+%% Initializes or reinitializes the registry.
+%%
+%% See {@link initialize_registry/0} for a description of the registry.
+%%
+%% This function takes a map of new supported feature flags (so their
+%% name and extended properties) to add to the existing known feature
+%% flags, a map of the new feature flag states (whether they are
+%% enabled, disabled or `state_changing'), and a flag to indicate if the
+%% feature flag states was recorded to disk.
+%%
+%% The latter is used to block callers asking if a feature flag is
+%% enabled or disabled while its state is changing.
+
+initialize_registry(NewSupportedFeatureFlags,
+ NewFeatureStates,
+ WrittenToDisk) ->
+ Ret = maybe_initialize_registry(NewSupportedFeatureFlags,
+ NewFeatureStates,
+ WrittenToDisk),
+ case Ret of
+ ok -> ok;
+ restart -> initialize_registry(NewSupportedFeatureFlags,
+ NewFeatureStates,
+ WrittenToDisk);
+ Error -> Error
+ end.
+
+-spec maybe_initialize_registry(rabbit_feature_flags:feature_flags(),
+ rabbit_feature_flags:feature_states(),
+ boolean()) ->
+ ok | restart | {error, any()} | no_return().
+
+maybe_initialize_registry(NewSupportedFeatureFlags,
+ NewFeatureStates,
+ WrittenToDisk) ->
+ %% We save the version of the current registry before computing
+ %% the new one. This is used when we do the actual reload: if the
+ %% current registry was reloaded in the meantime, we need to restart
+ %% the computation to make sure we don't loose data.
+ RegistryVsn = registry_vsn(),
+
+ %% We take the feature flags already registered.
+ RegistryInitialized = rabbit_ff_registry:is_registry_initialized(),
+ KnownFeatureFlags1 = case RegistryInitialized of
+ true -> rabbit_ff_registry:list(all);
+ false -> #{}
+ end,
+
+ %% Query the list (it's a map to be exact) of known
+ %% supported feature flags. That list comes from the
+ %% `-rabbitmq_feature_flag().` module attributes exposed by all
+ %% currently loaded Erlang modules.
+ {ScannedApps, KnownFeatureFlags2} =
+ rabbit_feature_flags:query_supported_feature_flags(),
+
+ %% We merge the feature flags we already knew about
+ %% (KnownFeatureFlags1), those found in the loaded applications
+ %% (KnownFeatureFlags2) and those specified in arguments
+ %% (NewSupportedFeatureFlags). The latter come from remote nodes
+ %% usually: for example, they can come from plugins loaded on remote
+ %% node but the plugins are missing locally. In this case, we
+ %% consider those feature flags supported because there is no code
+ %% locally which would cause issues.
+ %%
+ %% It means that the list of feature flags only grows. we don't try
+ %% to clean it at some point because we want to remember about the
+ %% feature flags we saw (and their state). It should be fine because
+ %% that list should remain small.
+ KnownFeatureFlags = maps:merge(KnownFeatureFlags1,
+ KnownFeatureFlags2),
+ AllFeatureFlags = maps:merge(KnownFeatureFlags,
+ NewSupportedFeatureFlags),
+
+ %% Next we want to update the feature states, based on the new
+ %% states passed as arguments.
+ FeatureStates0 = case RegistryInitialized of
+ true ->
+ maps:merge(rabbit_ff_registry:states(),
+ NewFeatureStates);
+ false ->
+ NewFeatureStates
+ end,
+ FeatureStates = maps:map(
+ fun(FeatureName, _FeatureProps) ->
+ case FeatureStates0 of
+ #{FeatureName := FeatureState} ->
+ FeatureState;
+ _ ->
+ false
+ end
+ end, AllFeatureFlags),
+
+ %% The feature flags inventory is used by rabbit_ff_controller to query
+ %% feature flags atomically. The inventory also contains the list of
+ %% scanned applications: this is used to determine if an application is
+ %% known by this node or not, and decide if a missing feature flag is
+ %% unknown or unsupported.
+ Inventory = #{applications => ScannedApps,
+ feature_flags => KnownFeatureFlags2,
+ states => FeatureStates},
+
+ Proceed = does_registry_need_refresh(AllFeatureFlags,
+ FeatureStates,
+ WrittenToDisk),
+
+ case Proceed of
+ true ->
+ rabbit_log_feature_flags:debug(
+ "Feature flags: (re)initialize registry (~p)",
+ [self()]),
+ T0 = erlang:timestamp(),
+ Ret = do_initialize_registry(RegistryVsn,
+ AllFeatureFlags,
+ FeatureStates,
+ Inventory,
+ WrittenToDisk),
+ T1 = erlang:timestamp(),
+ rabbit_log_feature_flags:debug(
+ "Feature flags: time to regen registry: ~p µs",
+ [timer:now_diff(T1, T0)]),
+ Ret;
+ false ->
+ rabbit_log_feature_flags:debug(
+ "Feature flags: registry already up-to-date, skipping init"),
+ ok
+ end.
+
+-spec does_registry_need_refresh(rabbit_feature_flags:feature_flags(),
+ rabbit_feature_flags:feature_states(),
+ boolean()) ->
+ boolean().
+
+does_registry_need_refresh(AllFeatureFlags,
+ FeatureStates,
+ WrittenToDisk) ->
+ case rabbit_ff_registry:is_registry_initialized() of
+ true ->
+ %% Before proceeding with the actual
+ %% (re)initialization, let's see if there are any
+ %% changes.
+ CurrentAllFeatureFlags = rabbit_ff_registry:list(all),
+ CurrentFeatureStates = rabbit_ff_registry:states(),
+ CurrentWrittenToDisk =
+ rabbit_ff_registry:is_registry_written_to_disk(),
+
+ if
+ AllFeatureFlags =/= CurrentAllFeatureFlags ->
+ rabbit_log_feature_flags:debug(
+ "Feature flags: registry refresh needed: "
+ "yes, list of feature flags differs"),
+ true;
+ FeatureStates =/= CurrentFeatureStates ->
+ rabbit_log_feature_flags:debug(
+ "Feature flags: registry refresh needed: "
+ "yes, feature flag states differ"),
+ true;
+ WrittenToDisk =/= CurrentWrittenToDisk ->
+ rabbit_log_feature_flags:debug(
+ "Feature flags: registry refresh needed: "
+ "yes, \"written to disk\" state changed"),
+ true;
+ true ->
+ rabbit_log_feature_flags:debug(
+ "Feature flags: registry refresh needed: no"),
+ false
+ end;
+ false ->
+ rabbit_log_feature_flags:debug(
+ "Feature flags: registry refresh needed: "
+ "yes, first-time initialization"),
+ true
+ end.
+
+-spec do_initialize_registry(registry_vsn(),
+ rabbit_feature_flags:feature_flags(),
+ rabbit_feature_flags:feature_states(),
+ rabbit_feature_flags:inventory(),
+ boolean()) ->
+ ok | restart | {error, any()} | no_return().
+%% @private
+
+do_initialize_registry(RegistryVsn,
+ AllFeatureFlags,
+ FeatureStates,
+ #{applications := ScannedApps} = Inventory,
+ WrittenToDisk) ->
+ %% We log the state of those feature flags.
+ rabbit_log_feature_flags:debug(
+ "Feature flags: list of feature flags found:~n" ++
+ lists:flatten(
+ [rabbit_misc:format(
+ "Feature flags: [~s] ~s~n",
+ [case maps:get(FeatureName, FeatureStates, false) of
+ true -> "x";
+ state_changing -> "~~";
+ false -> " "
+ end,
+ FeatureName])
+ || FeatureName <- lists:sort(maps:keys(AllFeatureFlags))] ++
+ [rabbit_misc:format(
+ "Feature flags: scanned applications: ~p~n"
+ "Feature flags: feature flag states written to disk: ~s",
+ [ScannedApps,
+ case WrittenToDisk of
+ true -> "yes";
+ false -> "no"
+ end])])
+ ),
+
+ %% We request the registry to be regenerated and reloaded with the
+ %% new state.
+ regen_registry_mod(RegistryVsn,
+ AllFeatureFlags,
+ FeatureStates,
+ Inventory,
+ WrittenToDisk).
+
+-spec regen_registry_mod(
+ RegistryVsn, AllFeatureFlags, FeatureStates, Inventory,
+ WrittenToDisk) -> Ret when
+ RegistryVsn :: registry_vsn(),
+ AllFeatureFlags :: rabbit_feature_flags:feature_flags(),
+ FeatureStates :: rabbit_feature_flags:feature_states(),
+ Inventory :: rabbit_feature_flags:inventory(),
+ WrittenToDisk :: boolean(),
+ Ret :: ok | restart | {error, any()} | no_return().
+%% @private
+
+regen_registry_mod(RegistryVsn,
+ AllFeatureFlags,
+ FeatureStates,
+ Inventory,
+ WrittenToDisk) ->
+ %% Here, we recreate the source code of the `rabbit_ff_registry`
+ %% module from scratch.
+ %%
+ %% IMPORTANT: We want both modules to have the exact same public
+ %% API in order to simplify the life of developers and their tools
+ %% (Dialyzer, completion, and so on).
+
+ %% -module(rabbit_ff_registry).
+ ModuleAttr = erl_syntax:attribute(
+ erl_syntax:atom(module),
+ [erl_syntax:atom(rabbit_ff_registry)]),
+ ModuleForm = erl_syntax:revert(ModuleAttr),
+ %% -export([...]).
+ ExportAttr = erl_syntax:attribute(
+ erl_syntax:atom(export),
+ [erl_syntax:list(
+ [erl_syntax:arity_qualifier(
+ erl_syntax:atom(F),
+ erl_syntax:integer(A))
+ || {F, A} <- [{get, 1},
+ {list, 1},
+ {states, 0},
+ {is_supported, 1},
+ {is_enabled, 1},
+ {is_registry_initialized, 0},
+ {is_registry_written_to_disk, 0},
+ {inventory, 0}]]
+ )
+ ]
+ ),
+ ExportForm = erl_syntax:revert(ExportAttr),
+ %% get(_) -> ...
+ GetClauses = [erl_syntax:clause(
+ [erl_syntax:atom(FeatureName)],
+ [],
+ [erl_syntax:abstract(maps:get(FeatureName,
+ AllFeatureFlags))])
+ || FeatureName <- maps:keys(AllFeatureFlags)
+ ],
+ GetUnknownClause = erl_syntax:clause(
+ [erl_syntax:variable("_")],
+ [],
+ [erl_syntax:atom(undefined)]),
+ GetFun = erl_syntax:function(
+ erl_syntax:atom(get),
+ GetClauses ++ [GetUnknownClause]),
+ GetFunForm = erl_syntax:revert(GetFun),
+ %% list(_) -> ...
+ ListAllBody = erl_syntax:abstract(AllFeatureFlags),
+ ListAllClause = erl_syntax:clause([erl_syntax:atom(all)],
+ [],
+ [ListAllBody]),
+ EnabledFeatureFlags = maps:filter(
+ fun(FeatureName, _) ->
+ maps:is_key(FeatureName,
+ FeatureStates)
+ andalso
+ maps:get(FeatureName, FeatureStates)
+ =:=
+ true
+ end, AllFeatureFlags),
+ ListEnabledBody = erl_syntax:abstract(EnabledFeatureFlags),
+ ListEnabledClause = erl_syntax:clause(
+ [erl_syntax:atom(enabled)],
+ [],
+ [ListEnabledBody]),
+ DisabledFeatureFlags = maps:filter(
+ fun(FeatureName, _) ->
+ not maps:is_key(FeatureName,
+ FeatureStates)
+ orelse
+ maps:get(FeatureName, FeatureStates)
+ =:=
+ false
+ end, AllFeatureFlags),
+ ListDisabledBody = erl_syntax:abstract(DisabledFeatureFlags),
+ ListDisabledClause = erl_syntax:clause(
+ [erl_syntax:atom(disabled)],
+ [],
+ [ListDisabledBody]),
+ StateChangingFeatureFlags = maps:filter(
+ fun(FeatureName, _) ->
+ maps:is_key(FeatureName,
+ FeatureStates)
+ andalso
+ maps:get(FeatureName, FeatureStates)
+ =:=
+ state_changing
+ end, AllFeatureFlags),
+ ListStateChangingBody = erl_syntax:abstract(StateChangingFeatureFlags),
+ ListStateChangingClause = erl_syntax:clause(
+ [erl_syntax:atom(state_changing)],
+ [],
+ [ListStateChangingBody]),
+ ListFun = erl_syntax:function(
+ erl_syntax:atom(list),
+ [ListAllClause,
+ ListEnabledClause,
+ ListDisabledClause,
+ ListStateChangingClause]),
+ ListFunForm = erl_syntax:revert(ListFun),
+ %% states() -> ...
+ StatesBody = erl_syntax:abstract(FeatureStates),
+ StatesClause = erl_syntax:clause([], [], [StatesBody]),
+ StatesFun = erl_syntax:function(
+ erl_syntax:atom(states),
+ [StatesClause]),
+ StatesFunForm = erl_syntax:revert(StatesFun),
+ %% is_supported(_) -> ...
+ IsSupportedClauses = [erl_syntax:clause(
+ [erl_syntax:atom(FeatureName)],
+ [],
+ [erl_syntax:atom(true)])
+ || FeatureName <- maps:keys(AllFeatureFlags)
+ ],
+ NotSupportedClause = erl_syntax:clause(
+ [erl_syntax:variable("_")],
+ [],
+ [erl_syntax:atom(false)]),
+ IsSupportedFun = erl_syntax:function(
+ erl_syntax:atom(is_supported),
+ IsSupportedClauses ++ [NotSupportedClause]),
+ IsSupportedFunForm = erl_syntax:revert(IsSupportedFun),
+ %% is_enabled(_) -> ...
+ IsEnabledClauses = [erl_syntax:clause(
+ [erl_syntax:atom(FeatureName)],
+ [],
+ [case maps:is_key(FeatureName, FeatureStates) of
+ true ->
+ erl_syntax:atom(
+ maps:get(FeatureName, FeatureStates));
+ false ->
+ erl_syntax:atom(false)
+ end])
+ || FeatureName <- maps:keys(AllFeatureFlags)
+ ],
+ NotEnabledClause = erl_syntax:clause(
+ [erl_syntax:variable("_")],
+ [],
+ [erl_syntax:atom(false)]),
+ IsEnabledFun = erl_syntax:function(
+ erl_syntax:atom(is_enabled),
+ IsEnabledClauses ++ [NotEnabledClause]),
+ IsEnabledFunForm = erl_syntax:revert(IsEnabledFun),
+ %% is_registry_initialized() -> ...
+ IsInitializedClauses = [erl_syntax:clause(
+ [],
+ [],
+ [erl_syntax:atom(true)])
+ ],
+ IsInitializedFun = erl_syntax:function(
+ erl_syntax:atom(is_registry_initialized),
+ IsInitializedClauses),
+ IsInitializedFunForm = erl_syntax:revert(IsInitializedFun),
+ %% is_registry_written_to_disk() -> ...
+ IsWrittenToDiskClauses = [erl_syntax:clause(
+ [],
+ [],
+ [erl_syntax:atom(WrittenToDisk)])
+ ],
+ IsWrittenToDiskFun = erl_syntax:function(
+ erl_syntax:atom(is_registry_written_to_disk),
+ IsWrittenToDiskClauses),
+ IsWrittenToDiskFunForm = erl_syntax:revert(IsWrittenToDiskFun),
+ %% inventory() -> ...
+ InventoryBody = erl_syntax:abstract(Inventory),
+ InventoryClause = erl_syntax:clause([], [], [InventoryBody]),
+ InventoryFun = erl_syntax:function(
+ erl_syntax:atom(inventory),
+ [InventoryClause]),
+ InventoryFunForm = erl_syntax:revert(InventoryFun),
+ %% Compilation!
+ Forms = [ModuleForm,
+ ExportForm,
+ GetFunForm,
+ ListFunForm,
+ StatesFunForm,
+ IsSupportedFunForm,
+ IsEnabledFunForm,
+ IsInitializedFunForm,
+ IsWrittenToDiskFunForm,
+ InventoryFunForm],
+ maybe_log_registry_source_code(Forms),
+ CompileOpts = [return_errors,
+ return_warnings],
+ case compile:forms(Forms, CompileOpts) of
+ {ok, Mod, Bin, _} ->
+ load_registry_mod(RegistryVsn, Mod, Bin);
+ {error, Errors, Warnings} ->
+ rabbit_log_feature_flags:error(
+ "Feature flags: registry compilation failure:~n"
+ "Errors: ~p~n"
+ "Warnings: ~p",
+ [Errors, Warnings]),
+ {error, {compilation_failure, Errors, Warnings}};
+ error ->
+ rabbit_log_feature_flags:error(
+ "Feature flags: registry compilation failure",
+ []),
+ {error, {compilation_failure, [], []}}
+ end.
+
+maybe_log_registry_source_code(Forms) ->
+ case rabbit_prelaunch:get_context() of
+ #{log_feature_flags_registry := true} ->
+ rabbit_log_feature_flags:debug(
+ "== FEATURE FLAGS REGISTRY ==~n"
+ "~s~n"
+ "== END ==~n",
+ [erl_prettypr:format(erl_syntax:form_list(Forms))]),
+ ok;
+ _ ->
+ ok
+ end.
+
+-ifdef(TEST).
+registry_loading_lock() -> ?FF_REGISTRY_LOADING_LOCK.
+-endif.
+
+-spec load_registry_mod(registry_vsn(), module(), binary()) ->
+ ok | restart | no_return().
+%% @private
+
+load_registry_mod(RegistryVsn, Mod, Bin) ->
+ rabbit_log_feature_flags:debug(
+ "Feature flags: registry module ready, loading it (~p)...",
+ [self()]),
+ FakeFilename = "Compiled and loaded by " ?MODULE_STRING,
+ %% Time to load the new registry, replacing the old one. We use a
+ %% lock here to synchronize concurrent reloads.
+ global:set_lock(?FF_REGISTRY_LOADING_LOCK, [node()]),
+ rabbit_log_feature_flags:debug(
+ "Feature flags: acquired lock before reloading registry module (~p)",
+ [self()]),
+ %% We want to make sure that the old registry (not the one being
+ %% currently in use) is purged by the code server. It means no
+ %% process lingers on that old code.
+ %%
+ %% We use code:soft_purge() for that (meaning no process is killed)
+ %% and we wait in an infinite loop for that to succeed.
+ ok = purge_old_registry(Mod),
+ %% Now we can replace the currently loaded registry by the new one.
+ %% The code server takes care of marking the current registry as old
+ %% and load the new module in an atomic operation.
+ %%
+ %% Therefore there is no chance of a window where there is no
+ %% registry module available, causing the one on disk to be
+ %% reloaded.
+ Ret = case registry_vsn() of
+ RegistryVsn -> code:load_binary(Mod, FakeFilename, Bin);
+ OtherVsn -> {error, {restart, RegistryVsn, OtherVsn}}
+ end,
+ rabbit_log_feature_flags:debug(
+ "Feature flags: releasing lock after reloading registry module (~p)",
+ [self()]),
+ global:del_lock(?FF_REGISTRY_LOADING_LOCK, [node()]),
+ case Ret of
+ {module, _} ->
+ rabbit_log_feature_flags:debug(
+ "Feature flags: registry module loaded (vsn: ~p -> ~p)",
+ [RegistryVsn, registry_vsn()]),
+ ok;
+ {error, {restart, Expected, Current}} ->
+ rabbit_log_feature_flags:error(
+ "Feature flags: another registry module was loaded in the "
+ "meantime (expected old vsn: ~p, current vsn: ~p); "
+ "restarting the regen",
+ [Expected, Current]),
+ restart;
+ {error, Reason} ->
+ rabbit_log_feature_flags:error(
+ "Feature flags: failed to load registry module: ~p",
+ [Reason]),
+ throw({feature_flag_registry_reload_failure, Reason})
+ end.
+
+-spec registry_vsn() -> registry_vsn().
+%% @private
+
+registry_vsn() ->
+ Attrs = rabbit_ff_registry:module_info(attributes),
+ proplists:get_value(vsn, Attrs, undefined).
+
+purge_old_registry(Mod) ->
+ case code:is_loaded(Mod) of
+ {file, _} -> do_purge_old_registry(Mod);
+ false -> ok
+ end.
+
+do_purge_old_registry(Mod) ->
+ case code:soft_purge(Mod) of
+ true -> ok;
+ false -> do_purge_old_registry(Mod)
+ end.
diff --git a/deps/rabbit/src/rabbit_prelaunch_feature_flags.erl b/deps/rabbit/src/rabbit_prelaunch_feature_flags.erl
index 51040ad75c..3d9a1b470a 100644
--- a/deps/rabbit/src/rabbit_prelaunch_feature_flags.erl
+++ b/deps/rabbit/src/rabbit_prelaunch_feature_flags.erl
@@ -22,7 +22,7 @@ setup(#{feature_flags_file := FFFile}) ->
?LOG_DEBUG(
"Initializing feature flags registry", [],
#{domain => ?RMQLOG_DOMAIN_PRELAUNCH}),
- case rabbit_feature_flags:initialize_registry() of
+ case rabbit_ff_registry_factory:initialize_registry() of
ok ->
ok;
{error, Reason} ->
diff --git a/deps/rabbit/test/feature_flags_SUITE.erl b/deps/rabbit/test/feature_flags_SUITE.erl
index da0283e78d..f8402f32d3 100644
--- a/deps/rabbit/test/feature_flags_SUITE.erl
+++ b/deps/rabbit/test/feature_flags_SUITE.erl
@@ -45,19 +45,13 @@ suite() ->
all() ->
[
{group, registry},
- {group, enabling_on_single_node},
- {group, enabling_in_cluster},
- {group, clustering},
- {group, activating_plugin}
+ {group, feature_flags_v1},
+ {group, feature_flags_v2}
].
groups() ->
+ Groups =
[
- {registry, [],
- [
- registry_general_usage,
- registry_concurrent_reloads
- ]},
{enabling_on_single_node, [],
[
enable_feature_flag_in_a_healthy_situation,
@@ -85,6 +79,16 @@ groups() ->
activating_plugin_with_new_ff_disabled,
activating_plugin_with_new_ff_enabled
]}
+ ],
+
+ [
+ {registry, [],
+ [
+ registry_general_usage,
+ registry_concurrent_reloads
+ ]},
+ {feature_flags_v1, [], Groups},
+ {feature_flags_v2, [], Groups}
].
%% -------------------------------------------------------------------
@@ -100,6 +104,10 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+init_per_group(feature_flags_v1, Config) ->
+ rabbit_ct_helpers:set_config(Config, {enable_feature_flags_v2, false});
+init_per_group(feature_flags_v2, Config) ->
+ rabbit_ct_helpers:set_config(Config, {enable_feature_flags_v2, true});
init_per_group(enabling_on_single_node, Config) ->
rabbit_ct_helpers:set_config(
Config,
@@ -161,6 +169,16 @@ end_per_group(_, Config) ->
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+ UsingFFv2 = rabbit_ct_helpers:get_config(
+ Config, enable_feature_flags_v2, false),
+ ForcedFFs = case UsingFFv2 of
+ true -> [feature_flags_v2];
+ false -> []
+ end,
+ Suffix = case UsingFFv2 of
+ false -> rabbit_misc:format("~s-v1", [Testcase]);
+ true -> rabbit_misc:format("~s-v2", [Testcase])
+ end,
case ?config(tc_group_properties, Config) of
[{name, registry} | _] ->
logger:set_primary_config(level, debug),
@@ -178,14 +196,14 @@ init_per_testcase(Testcase, Config) ->
ClusterSize = ?config(rmq_nodes_count, Config),
Config1 = rabbit_ct_helpers:set_config(
Config,
- [{rmq_nodename_suffix, Testcase},
+ [{rmq_nodename_suffix, Suffix},
{tcp_ports_base, {skip_n_nodes,
TestNumber * ClusterSize}}
]),
Config2 = rabbit_ct_helpers:merge_app_env(
Config1,
{rabbit,
- [{forced_feature_flags_on_init, []},
+ [{forced_feature_flags_on_init, ForcedFFs},
{log, [{file, [{level, debug}]}]}]}),
Config3 = rabbit_ct_helpers:run_steps(
Config2,
@@ -211,7 +229,7 @@ init_per_testcase(Testcase, Config) ->
ClusterSize = ?config(rmq_nodes_count, Config),
Config1 = rabbit_ct_helpers:set_config(
Config,
- [{rmq_nodename_suffix, Testcase},
+ [{rmq_nodename_suffix, Suffix},
{tcp_ports_base, {skip_n_nodes,
TestNumber * ClusterSize}},
{net_ticktime, 5}
@@ -219,7 +237,7 @@ init_per_testcase(Testcase, Config) ->
Config2 = rabbit_ct_helpers:merge_app_env(
Config1,
{rabbit,
- [{forced_feature_flags_on_init, []},
+ [{forced_feature_flags_on_init, ForcedFFs},
{log, [{file, [{level, debug}]}]}]}),
Config3 = rabbit_ct_helpers:run_steps(
Config2,
@@ -267,17 +285,18 @@ registry_general_usage(_Config) ->
?assertNot(rabbit_ff_registry:is_registry_initialized()),
FeatureFlags = #{ff_a =>
- #{desc => "Feature flag A",
- stability => stable},
+ #{desc => "Feature flag A",
+ provided_by => ?MODULE,
+ stability => stable},
ff_b =>
- #{desc => "Feature flag B",
- stability => stable}},
- rabbit_feature_flags:inject_test_feature_flags(
- feature_flags_to_app_attrs(FeatureFlags)),
+ #{desc => "Feature flag B",
+ provided_by => ?MODULE,
+ stability => stable}},
+ rabbit_feature_flags:inject_test_feature_flags(FeatureFlags),
%% After initialization, it must know about the feature flags
%% declared in this testsuite. They must be disabled however.
- rabbit_feature_flags:initialize_registry(),
+ rabbit_ff_registry_factory:initialize_registry(),
?assert(rabbit_ff_registry:is_registry_initialized()),
?assertMatch([ff_a, ff_b], ?list_ff(all)),
@@ -286,7 +305,8 @@ registry_general_usage(_Config) ->
?assertNot(rabbit_ff_registry:is_supported(ff_c)),
?assertNot(rabbit_ff_registry:is_supported(ff_d)),
- ?assertEqual(erlang:map_size(rabbit_ff_registry:states()), 0),
+ ?assertEqual(#{ff_a => false,
+ ff_b => false}, rabbit_ff_registry:states()),
?assertMatch([], ?list_ff(enabled)),
?assertMatch([], ?list_ff(state_changing)),
?assertMatch([ff_a, ff_b], ?list_ff(disabled)),
@@ -301,7 +321,8 @@ registry_general_usage(_Config) ->
#{desc => "Feature flag C",
provided_by => ?MODULE,
stability => stable}},
- rabbit_feature_flags:initialize_registry(NewFeatureFlags),
+ rabbit_feature_flags:inject_test_feature_flags(NewFeatureFlags),
+ rabbit_ff_registry_factory:initialize_registry(),
?assertMatch([ff_a, ff_b, ff_c],
lists:sort(maps:keys(rabbit_ff_registry:list(all)))),
@@ -310,7 +331,9 @@ registry_general_usage(_Config) ->
?assert(rabbit_ff_registry:is_supported(ff_c)),
?assertNot(rabbit_ff_registry:is_supported(ff_d)),
- ?assertEqual(erlang:map_size(rabbit_ff_registry:states()), 0),
+ ?assertEqual(#{ff_a => false,
+ ff_b => false,
+ ff_c => false}, rabbit_ff_registry:states()),
?assertMatch([], ?list_ff(enabled)),
?assertMatch([], ?list_ff(state_changing)),
?assertMatch([ff_a, ff_b, ff_c], ?list_ff(disabled)),
@@ -321,9 +344,9 @@ registry_general_usage(_Config) ->
%% After enabling `ff_a`, it is actually the case. Others are
%% supported but remain disabled.
- rabbit_feature_flags:initialize_registry(#{},
- #{ff_a => true},
- true),
+ rabbit_ff_registry_factory:initialize_registry(#{},
+ #{ff_a => true},
+ true),
?assertMatch([ff_a, ff_b, ff_c],
lists:sort(maps:keys(rabbit_ff_registry:list(all)))),
@@ -332,7 +355,9 @@ registry_general_usage(_Config) ->
?assert(rabbit_ff_registry:is_supported(ff_c)),
?assertNot(rabbit_ff_registry:is_supported(ff_d)),
- ?assertMatch(#{ff_a := true}, rabbit_ff_registry:states()),
+ ?assertEqual(#{ff_a => true,
+ ff_b => false,
+ ff_c => false}, rabbit_ff_registry:states()),
?assertMatch([ff_a], ?list_ff(enabled)),
?assertMatch([], ?list_ff(state_changing)),
?assertMatch([ff_b, ff_c], ?list_ff(disabled)),
@@ -343,10 +368,10 @@ registry_general_usage(_Config) ->
%% This time, we mark the state of `ff_c` as `state_changing`. We
%% expect all other feature flag states to remain unchanged.
- rabbit_feature_flags:initialize_registry(#{},
- #{ff_a => false,
- ff_c => state_changing},
- true),
+ rabbit_ff_registry_factory:initialize_registry(#{},
+ #{ff_a => false,
+ ff_c => state_changing},
+ true),
?assertMatch([ff_a, ff_b, ff_c],
lists:sort(maps:keys(rabbit_ff_registry:list(all)))),
@@ -355,7 +380,9 @@ registry_general_usage(_Config) ->
?assert(rabbit_ff_registry:is_supported(ff_c)),
?assertNot(rabbit_ff_registry:is_supported(ff_d)),
- ?assertMatch(#{ff_c := state_changing}, rabbit_ff_registry:states()),
+ ?assertEqual(#{ff_a => false,
+ ff_b => false,
+ ff_c => state_changing}, rabbit_ff_registry:states()),
?assertMatch([], ?list_ff(enabled)),
?assertMatch([ff_c], ?list_ff(state_changing)),
?assertMatch([ff_a, ff_b], ?list_ff(disabled)),
@@ -366,10 +393,10 @@ registry_general_usage(_Config) ->
%% Finally, we disable `ff_c`. All of them are supported but
%% disabled.
- rabbit_feature_flags:initialize_registry(#{},
- #{ff_b => false,
- ff_c => false},
- true),
+ rabbit_ff_registry_factory:initialize_registry(#{},
+ #{ff_b => false,
+ ff_c => false},
+ true),
?assertMatch([ff_a, ff_b, ff_c],
lists:sort(maps:keys(rabbit_ff_registry:list(all)))),
@@ -378,7 +405,9 @@ registry_general_usage(_Config) ->
?assert(rabbit_ff_registry:is_supported(ff_c)),
?assertNot(rabbit_ff_registry:is_supported(ff_d)),
- ?assertEqual(erlang:map_size(rabbit_ff_registry:states()), 0),
+ ?assertEqual(#{ff_a => false,
+ ff_b => false,
+ ff_c => false}, rabbit_ff_registry:states()),
?assertMatch([], ?list_ff(enabled)),
?assertMatch([], ?list_ff(state_changing)),
?assertMatch([ff_a, ff_b, ff_c], ?list_ff(disabled)),
@@ -390,7 +419,7 @@ registry_general_usage(_Config) ->
registry_concurrent_reloads(_Config) ->
case rabbit_ff_registry:is_registry_initialized() of
true -> ok;
- false -> rabbit_feature_flags:initialize_registry()
+ false -> rabbit_ff_registry_factory:initialize_registry()
end,
?assert(rabbit_ff_registry:is_registry_initialized()),
@@ -409,22 +438,24 @@ registry_concurrent_reloads(_Config) ->
Name = MakeName(I),
Desc = rabbit_misc:format("Feature flag ~b", [I]),
NewFF = #{Name =>
- #{desc => Desc,
- stability => stable}},
- rabbit_feature_flags:initialize_registry(NewFF),
+ #{desc => Desc,
+ provided_by => ?MODULE,
+ stability => stable}},
+ rabbit_feature_flags:inject_test_feature_flags(NewFF),
unlink(Parent)
end,
%% Prepare feature flags which the spammer process should get at
%% some point.
FeatureFlags = #{ff_a =>
- #{desc => "Feature flag A",
- stability => stable},
+ #{desc => "Feature flag A",
+ provided_by => ?MODULE,
+ stability => stable},
ff_b =>
- #{desc => "Feature flag B",
- stability => stable}},
- rabbit_feature_flags:inject_test_feature_flags(
- feature_flags_to_app_attrs(FeatureFlags)),
+ #{desc => "Feature flag B",
+ provided_by => ?MODULE,
+ stability => stable}},
+ rabbit_feature_flags:inject_test_feature_flags(FeatureFlags),
%% Spawn a process which heavily uses the registry.
FinalFFList = lists:sort(
@@ -437,7 +468,7 @@ registry_concurrent_reloads(_Config) ->
%% We acquire the lock from the main process to synchronize the test
%% processes we are about to spawn.
- Lock = rabbit_feature_flags:registry_loading_lock(),
+ Lock = rabbit_ff_registry_factory:registry_loading_lock(),
ThisNode = [node()],
rabbit_log_feature_flags:info(
?MODULE_STRING ": Acquiring registry load lock"),
@@ -632,9 +663,17 @@ enable_feature_flag_with_a_network_partition(Config) ->
%% Enabling the feature flag should fail in the specific case of
%% `ff_from_testsuite`, if the network is broken.
- ?assertEqual(
- {error, unsupported},
- enable_feature_flag_on(Config, B, FeatureName)),
+ UsingFFv1 = not ?config(enable_feature_flags_v2, Config),
+ case UsingFFv1 of
+ true ->
+ ?assertEqual(
+ {error, unsupported},
+ enable_feature_flag_on(Config, B, FeatureName));
+ false ->
+ ?assertEqual(
+ {error, missing_clustered_nodes},
+ enable_feature_flag_on(Config, B, FeatureName))
+ end,
?assertEqual(
False,
is_feature_flag_enabled(Config, FeatureName)),
@@ -818,7 +857,7 @@ clustering_ok_with_new_ff_disabled(Config) ->
stability => stable}},
rabbit_ct_broker_helpers:rpc(
Config, 0,
- rabbit_feature_flags, initialize_registry, [NewFeatureFlags]),
+ rabbit_feature_flags, inject_test_feature_flags, [NewFeatureFlags]),
FFSubsysOk = is_feature_flag_subsystem_available(Config),
@@ -854,7 +893,7 @@ clustering_denied_with_new_ff_enabled(Config) ->
stability => stable}},
rabbit_ct_broker_helpers:rpc(
Config, 0,
- rabbit_feature_flags, initialize_registry, [NewFeatureFlags]),
+ rabbit_feature_flags, inject_test_feature_flags, [NewFeatureFlags]),
enable_feature_flag_on(Config, 0, time_travel),
FFSubsysOk = is_feature_flag_subsystem_available(Config),
@@ -933,12 +972,21 @@ clustering_ok_with_new_ff_enabled_from_plugin_on_some_nodes(Config) ->
?assertEqual(Config, rabbit_ct_broker_helpers:cluster_nodes(Config)),
log_feature_flags_of_all_nodes(Config),
+ UsingFFv2 = ?config(enable_feature_flags_v2, Config),
+ UsingFFv1 = not UsingFFv2,
case FFSubsysOk of
- true -> ?assertEqual([true, true],
- is_feature_flag_supported(Config, plugin_ff)),
- ?assertEqual([true, true],
- is_feature_flag_enabled(Config, plugin_ff));
- false -> ok
+ true when UsingFFv1 ->
+ ?assertEqual([true, true],
+ is_feature_flag_supported(Config, plugin_ff)),
+ ?assertEqual([true, true],
+ is_feature_flag_enabled(Config, plugin_ff));
+ true when UsingFFv2 ->
+ ?assertEqual([true, true],
+ is_feature_flag_supported(Config, plugin_ff)),
+ ?assertEqual([true, false],
+ is_feature_flag_enabled(Config, plugin_ff));
+ false ->
+ ok
end,
ok.
@@ -994,12 +1042,21 @@ activating_plugin_with_new_ff_enabled(Config) ->
enable_feature_flag_on(Config, 0, plugin_ff),
log_feature_flags_of_all_nodes(Config),
+ UsingFFv2 = ?config(enable_feature_flags_v2, Config),
+ UsingFFv1 = not UsingFFv2,
case FFSubsysOk of
- true -> ?assertEqual([true, true],
- is_feature_flag_supported(Config, plugin_ff)),
- ?assertEqual([true, true],
- is_feature_flag_enabled(Config, plugin_ff));
- false -> ok
+ true when UsingFFv1 ->
+ ?assertEqual([true, true],
+ is_feature_flag_supported(Config, plugin_ff)),
+ ?assertEqual([true, true],
+ is_feature_flag_enabled(Config, plugin_ff));
+ true when UsingFFv2 ->
+ ?assertEqual([true, true],
+ is_feature_flag_supported(Config, plugin_ff)),
+ ?assertEqual([true, false],
+ is_feature_flag_enabled(Config, plugin_ff));
+ false ->
+ ok
end,
ok.
@@ -1150,20 +1207,16 @@ log_feature_flags_of_all_nodes(Config) ->
Config, rabbit_feature_flags, info, [#{color => false,
lines => false}]).
-feature_flags_to_app_attrs(FeatureFlags) when is_map(FeatureFlags) ->
- [{?MODULE, % Application
- ?MODULE, % Module
- maps:to_list(FeatureFlags)}].
-
declare_arbitrary_feature_flag(Config) ->
FeatureFlags = #{ff_from_testsuite =>
#{desc => "My feature flag",
+ provided_by => ?MODULE,
stability => stable}},
rabbit_ct_broker_helpers:rpc_all(
Config,
rabbit_feature_flags,
inject_test_feature_flags,
- [feature_flags_to_app_attrs(FeatureFlags)]),
+ [FeatureFlags]),
ok.
block(Pairs) -> [block(X, Y) || {X, Y} <- Pairs].
diff --git a/deps/rabbit/test/feature_flags_v2_SUITE.erl b/deps/rabbit/test/feature_flags_v2_SUITE.erl
new file mode 100644
index 0000000000..de4daab394
--- /dev/null
+++ b/deps/rabbit/test/feature_flags_v2_SUITE.erl
@@ -0,0 +1,1540 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(feature_flags_v2_SUITE).
+
+-include_lib("kernel/include/logger.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-include_lib("rabbit_common/include/logging.hrl").
+
+-include("feature_flags.hrl").
+
+-export([suite/0,
+ all/0,
+ groups/0,
+ init_per_suite/1,
+ end_per_suite/1,
+ init_per_group/2,
+ end_per_group/2,
+ init_per_testcase/2,
+ end_per_testcase/2,
+
+ mf_count_runs/3,
+ mf_wait_and_count_runs/3,
+ mf_wait_and_count_runs_v2/1,
+ mf_wait_and_count_runs_in_post_enable/1,
+
+ enable_unknown_feature_flag_on_a_single_node/1,
+ enable_supported_feature_flag_on_a_single_node/1,
+ enable_unknown_feature_flag_in_a_3node_cluster/1,
+ enable_supported_feature_flag_in_a_3node_cluster/1,
+ enable_partially_supported_feature_flag_in_a_3node_cluster/1,
+ enable_unsupported_feature_flag_in_a_3node_cluster/1,
+ enable_feature_flag_in_cluster_and_add_member_after/1,
+ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1/1,
+ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2/1,
+ enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1/1,
+ enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2/1,
+ enable_feature_flag_with_post_enable/1
+ ]).
+
+suite() ->
+ [{timetrap, {minutes, 1}}].
+
+all() ->
+ [
+ {group, feature_flags_v1},
+ {group, feature_flags_v2}
+ ].
+
+groups() ->
+ Groups =
+ [
+ {cluster_size_1, [parallel],
+ [
+ enable_unknown_feature_flag_on_a_single_node,
+ enable_supported_feature_flag_on_a_single_node
+ ]},
+ {cluster_size_3, [parallel],
+ [
+ enable_unknown_feature_flag_in_a_3node_cluster,
+ enable_supported_feature_flag_in_a_3node_cluster,
+ enable_partially_supported_feature_flag_in_a_3node_cluster,
+ enable_unsupported_feature_flag_in_a_3node_cluster,
+ enable_feature_flag_in_cluster_and_add_member_after,
+ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1,
+ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2,
+ enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1,
+ enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2,
+ enable_feature_flag_with_post_enable
+ ]}
+ ],
+ [
+ {feature_flags_v1, [], Groups},
+ {feature_flags_v2, [], Groups}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ logger:set_primary_config(level, debug),
+ rabbit_ct_helpers:run_steps(
+ Config,
+ [fun rabbit_ct_helpers:redirect_logger_to_ct_logs/1]).
+
+end_per_suite(Config) ->
+ Config.
+
+init_per_group(feature_flags_v1, Config) ->
+ rabbit_ct_helpers:set_config(Config, {enable_feature_flags_v2, false});
+init_per_group(feature_flags_v2, Config) ->
+ rabbit_ct_helpers:set_config(Config, {enable_feature_flags_v2, true});
+init_per_group(cluster_size_1, Config) ->
+ rabbit_ct_helpers:set_config(Config, {nodes_count, 1});
+init_per_group(cluster_size_3, Config) ->
+ rabbit_ct_helpers:set_config(Config, {nodes_count, 3});
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:run_steps(
+ Config,
+ [fun(Cfg) -> start_slave_nodes(Cfg, Testcase) end]).
+
+end_per_testcase(_Testcase, Config) ->
+ rabbit_ct_helpers:run_steps(
+ Config,
+ [fun stop_slave_nodes/1]).
+
+start_slave_nodes(Config, Testcase) ->
+ NodesCount = ?config(nodes_count, Config),
+ ct:pal("Starting ~b slave node(s):", [NodesCount]),
+ Parent = self(),
+ Starters = [spawn_link(
+ fun() ->
+ start_slave_node(Parent, Config, Testcase, N)
+ end)
+ || N <- lists:seq(1, NodesCount)],
+ Nodes = lists:sort(
+ [receive
+ {node, Starter, Node} -> Node
+ end || Starter <- Starters]),
+ ct:pal("Started ~b slave node(s): ~p", [NodesCount, Nodes]),
+ rabbit_ct_helpers:set_config(Config, {nodes, Nodes}).
+
+start_slave_node(Parent, Config, Testcase, N) ->
+ Prefix = case ?config(enable_feature_flags_v2, Config) of
+ false -> "ffv1";
+ true -> "ffv2"
+ end,
+ Name = list_to_atom(
+ rabbit_misc:format("~s-~s-~b", [Prefix, Testcase, N])),
+ ct:pal("- Starting slave node `~s@...`", [Name]),
+ {ok, Node} = slave:start(net_adm:localhost(), Name),
+ ct:pal("- Slave node `~s` started", [Node]),
+
+ TestCodePath = filename:dirname(code:which(?MODULE)),
+ true = rpc:call(Node, code, add_path, [TestCodePath]),
+ ok = run_on_node(Node, fun setup_slave_node/1, [Config]),
+ ct:pal("- Slave node `~s` configured", [Node]),
+ Parent ! {node, self(), Node}.
+
+stop_slave_nodes(Config) ->
+ Nodes = ?config(nodes, Config),
+ ct:pal("Stopping ~b slave nodes:", [length(Nodes)]),
+ lists:foreach(fun stop_slave_node/1, Nodes),
+ rabbit_ct_helpers:delete_config(Config, nodes).
+
+stop_slave_node(Node) ->
+ ct:pal("- Stopping slave node `~s`...", [Node]),
+ ok = slave:stop(Node).
+
+connect_nodes([FirstNode | OtherNodes] = Nodes) ->
+ lists:foreach(
+ fun(Node) -> pong = rpc:call(Node, net_adm, ping, [FirstNode]) end,
+ OtherNodes),
+ Cluster = lists:sort(
+ [FirstNode | rpc:call(FirstNode, erlang, nodes, [])]),
+ ?assert(lists:all(fun(Node) -> lists:member(Node, Cluster) end, Nodes)).
+
+run_on_node(Node, Fun) ->
+ run_on_node(Node, Fun, []).
+
+run_on_node(Node, Fun, Args) ->
+ rpc:call(Node, erlang, apply, [Fun, Args]).
+
+%% -------------------------------------------------------------------
+%% Slave node configuration.
+%% -------------------------------------------------------------------
+
+setup_slave_node(Config) ->
+ ok = setup_logger(),
+ ok = setup_feature_flags_file(Config),
+ ok = start_controller(),
+ ok = maybe_enable_feature_flags_v2(Config),
+ ok.
+
+setup_logger() ->
+ logger:set_primary_config(level, debug),
+ ok.
+
+setup_feature_flags_file(Config) ->
+ %% The `feature_flags_file' is set to a specific location in the test log
+ %% directory.
+ FeatureFlagsFile = filename:join(
+ ?config(priv_dir, Config),
+ rabbit_misc:format("feature_flags-~s", [node()])),
+ ?LOG_INFO("Setting `feature_flags_file to \"~ts\"", [FeatureFlagsFile]),
+ case application:load(rabbit) of
+ ok -> ok;
+ {error, {already_loaded, _}} -> ok
+ end,
+ ok = application:set_env(rabbit, feature_flags_file, FeatureFlagsFile),
+ ok.
+
+start_controller() ->
+ ?LOG_INFO("Starting feature flags controller"),
+ {ok, Pid} = rabbit_ff_controller:start(),
+ ?LOG_INFO("Feature flags controller: ~p", [Pid]),
+ ok.
+
+maybe_enable_feature_flags_v2(Config) ->
+ EnableFFv2 = ?config(enable_feature_flags_v2, Config),
+ case EnableFFv2 of
+ true -> ok = rabbit_feature_flags:enable(feature_flags_v2);
+ false -> ok
+ end,
+ IsEnabled = rabbit_feature_flags:is_enabled(feature_flags_v2),
+ ?LOG_INFO("`feature_flags_v2` enabled: ~s", [IsEnabled]),
+ ?assertEqual(EnableFFv2, IsEnabled),
+ ok.
+
+override_running_nodes(Nodes) when is_list(Nodes) ->
+ ct:pal("Overring (running) remote nodes for ~p", [Nodes]),
+ _ = [begin
+ ok = rpc:call(
+ Node, rabbit_feature_flags, override_nodes,
+ [Nodes]),
+ ?assertEqual(
+ Nodes,
+ lists:sort(
+ [Node |
+ rpc:call(Node, rabbit_feature_flags, remote_nodes, [])])),
+ ?assertEqual(
+ Nodes,
+ rpc:call(Node, rabbit_ff_controller, all_nodes, [])),
+
+ ok = rpc:call(
+ Node, rabbit_feature_flags, override_running_nodes,
+ [Nodes]),
+ ?assertEqual(
+ Nodes,
+ lists:sort(
+ [Node |
+ rpc:call(
+ Node, rabbit_feature_flags, running_remote_nodes, [])])),
+ ?assertEqual(
+ Nodes,
+ rpc:call(Node, rabbit_ff_controller, running_nodes, []))
+ end
+ || Node <- Nodes],
+ ok.
+
+inject_on_nodes(Nodes, FeatureFlags) ->
+ ct:pal(
+ "Injecting feature flags on nodes~n"
+ " Nodes: ~p~n"
+ " Feature flags: ~p~n",
+ [FeatureFlags, Nodes]),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:inject_test_feature_flags(
+ FeatureFlags)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ok.
+
+%% -------------------------------------------------------------------
+%% Migration functions.
+%% -------------------------------------------------------------------
+
+-define(PT_MIGRATION_FUN_RUNS, {?MODULE, migration_fun_runs}).
+
+bump_runs() ->
+ Count = persistent_term:get(?PT_MIGRATION_FUN_RUNS, 0),
+ persistent_term:put(?PT_MIGRATION_FUN_RUNS, Count + 1),
+ ok.
+
+mf_count_runs(_FeatureName, _FeatureProps, enable) ->
+ bump_runs(),
+ ok.
+
+mf_wait_and_count_runs(_FeatureName, _FeatureProps, enable) ->
+ Peer = get_peer_proc(),
+ Peer ! {node(), self(), waiting},
+ ?LOG_NOTICE("Migration function: waiting for signal from ~p...", [Peer]),
+ receive proceed -> ok end,
+ ?LOG_NOTICE("Migration function: unblocked!", []),
+ bump_runs(),
+ ok.
+
+mf_wait_and_count_runs_v2(#ffcommand{command = enable}) ->
+ Peer = get_peer_proc(),
+ Peer ! {node(), self(), waiting},
+ ?LOG_NOTICE("Migration function: waiting for signal from ~p...", [Peer]),
+ receive proceed -> ok end,
+ ?LOG_NOTICE("Migration function: unblocked!", []),
+ bump_runs(),
+ ok;
+mf_wait_and_count_runs_v2(_) ->
+ ok.
+
+mf_wait_and_count_runs_in_post_enable(#ffcommand{command = post_enable}) ->
+ Peer = get_peer_proc(),
+ Peer ! {node(), self(), waiting},
+ ?LOG_NOTICE("Migration function: waiting for signal from ~p...", [Peer]),
+ receive proceed -> ok end,
+ ?LOG_NOTICE("Migration function: unblocked!", []),
+ bump_runs(),
+ ok;
+mf_wait_and_count_runs_in_post_enable(_) ->
+ ok.
+
+-define(PT_PEER_PROC, {?MODULE, peer_proc}).
+
+record_peer_proc(Peer) ->
+ ?LOG_ALERT("Recording peer=~p", [Peer]),
+ persistent_term:put(?PT_PEER_PROC, Peer).
+
+get_peer_proc() ->
+ persistent_term:get(?PT_PEER_PROC).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+enable_unknown_feature_flag_on_a_single_node(Config) ->
+ [Node] = ?config(nodes, Config),
+ ok = run_on_node(
+ Node, fun enable_unknown_feature_flag_on_a_single_node/0).
+
+enable_unknown_feature_flag_on_a_single_node() ->
+ FeatureName = ?FUNCTION_NAME,
+ ?assertNot(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+
+ %% The node doesn't know about the feature flag and thus rejects the
+ %% request.
+ ?assertEqual(
+ {error, unsupported}, rabbit_feature_flags:enable(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok.
+
+enable_supported_feature_flag_on_a_single_node(Config) ->
+ [Node] = ?config(nodes, Config),
+ ok = run_on_node(
+ Node, fun enable_supported_feature_flag_on_a_single_node/0).
+
+enable_supported_feature_flag_on_a_single_node() ->
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName => #{provided_by => ?MODULE,
+ stability => stable}},
+ ?assertEqual(
+ ok, rabbit_feature_flags:inject_test_feature_flags(FeatureFlags)),
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+
+ ?assertEqual(ok, rabbit_feature_flags:enable(FeatureName)),
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok.
+
+enable_unknown_feature_flag_in_a_3node_cluster(Config) ->
+ Nodes = ?config(nodes, Config),
+ connect_nodes(Nodes),
+ override_running_nodes(Nodes),
+
+ FeatureName = ?FUNCTION_NAME,
+
+ ct:pal(
+ "Checking the feature flag is unsupported and disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertNot(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+
+ %% No nodes know about the feature flag and thus all reject the request.
+ ct:pal("Trying to enable the feature flag on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertEqual(
+ {error, unsupported},
+ rabbit_feature_flags:enable(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ct:pal(
+ "Checking the feature flag is still unsupported and disabled on all "
+ "nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertNot(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ok.
+
+enable_supported_feature_flag_in_a_3node_cluster(Config) ->
+ Nodes = ?config(nodes, Config),
+ connect_nodes(Nodes),
+ override_running_nodes(Nodes),
+
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName => #{provided_by => ?MODULE,
+ stability => stable}},
+ inject_on_nodes(Nodes, FeatureFlags),
+
+ ct:pal(
+ "Checking the feature flag is supported but disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+
+ %% The first call enables the feature flag, the following calls are
+ %% idempotent and do nothing.
+ ct:pal("Enabling the feature flag on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertEqual(ok, rabbit_feature_flags:enable(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ct:pal("Checking the feature flag is supported and enabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ok.
+
+enable_partially_supported_feature_flag_in_a_3node_cluster(Config) ->
+ [FirstNode | OtherNodes] = Nodes = ?config(nodes, Config),
+ connect_nodes(Nodes),
+ override_running_nodes(Nodes),
+ UsingFFv1 = not ?config(enable_feature_flags_v2, Config),
+
+ %% This time, we inject the feature flag on a single node only. The other
+ %% nodes don't know about it.
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName => #{provided_by => ?MODULE,
+ stability => stable}},
+ inject_on_nodes([FirstNode], FeatureFlags),
+
+ case UsingFFv1 of
+ true ->
+ %% With `feature_flags_v1', the code would have shared the new
+ %% feature flags with remote nodes, so let's run that here. In the
+ %% end, the testcase is similar to
+ %% `enable_supported_feature_flag_in_a_3node_cluster'.
+ ct:pal("Refreshing feature flags after app load"),
+ ok = run_on_node(
+ FirstNode,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:
+ share_new_feature_flags_after_app_load(
+ FeatureFlags, infinity)),
+ ok
+ end,
+ []);
+ false ->
+ ok
+ end,
+
+ ct:pal(
+ "Checking the feature flag is supported but disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+
+ %% The first call enables the feature flag, the following calls are
+ %% idempotent and do nothing.
+ ct:pal("Enabling the feature flag on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:enable(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ct:pal(
+ "Checking the feature flag is supported on all nodes and enabled on "
+ "all nodes (v1) or the node knowing it only (v2)"),
+ ok = run_on_node(
+ FirstNode,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ []),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ case UsingFFv1 of
+ true ->
+ ?assert(
+ rabbit_feature_flags:is_supported(FeatureName)),
+ ?assert(
+ rabbit_feature_flags:is_enabled(FeatureName));
+ false ->
+ ?assert(
+ rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(
+ rabbit_feature_flags:is_enabled(FeatureName))
+ end,
+ ok
+ end,
+ [])
+ || Node <- OtherNodes],
+ ok.
+
+enable_unsupported_feature_flag_in_a_3node_cluster(Config) ->
+ [FirstNode | _OtherNodes] = Nodes = ?config(nodes, Config),
+ connect_nodes(Nodes),
+ override_running_nodes(Nodes),
+
+ %% We inject the feature flag on a single node only. We tell it is
+ %% provided by `rabbit' which was already loaded and scanned while
+ %% configuring the node. This way, we ensure the feature flag is
+ %% considered supported by the node where is was injected, but
+ %% unsupported by other nodes.
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName => #{provided_by => rabbit,
+ stability => stable}},
+ inject_on_nodes([FirstNode], FeatureFlags),
+
+ ct:pal(
+ "Checking the feature flag is unsupported and disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertNot(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+
+ %% The feature flag is unsupported, thus all reject the request.
+ ct:pal("Enabling the feature flag on all nodes (denied)"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertEqual(
+ {error, unsupported},
+ rabbit_feature_flags:enable(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ct:pal("Checking the feature flag is still disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ok.
+
+enable_feature_flag_in_cluster_and_add_member_after(Config) ->
+ AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
+ connect_nodes(Nodes),
+ override_running_nodes([NewNode]),
+ override_running_nodes(Nodes),
+
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName =>
+ #{provided_by => ?MODULE,
+ stability => stable,
+ migration_fun => {?MODULE, mf_count_runs}}},
+ inject_on_nodes(AllNodes, FeatureFlags),
+
+ ct:pal(
+ "Checking the feature flag is supported but disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+
+ ct:pal("Enabling the feature flag in the cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertEqual(ok, rabbit_feature_flags:enable(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ct:pal("Checking the feature flag is enabled in the initial cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ct:pal("Checking the feature flag is still disabled on the new node"),
+ ok = run_on_node(
+ NewNode,
+ fun() ->
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ []),
+
+ %% Check compatibility between NewNodes and Nodes.
+ ok = run_on_node(
+ NewNode,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:check_node_compatibility(
+ FirstNode)),
+ ok
+ end, []),
+
+ %% Add node to cluster and synchronize feature flags.
+ connect_nodes(AllNodes),
+ override_running_nodes(AllNodes),
+ ct:pal(
+ "Synchronizing feature flags in the expanded cluster~n"
+ "~n"
+ "NOTE: Error messages about crashed migration functions can be "
+ "ignored for feature~n"
+ " flags other than `~s`~n"
+ " because they assume they run inside RabbitMQ.",
+ [FeatureName]),
+ ok = run_on_node(
+ NewNode,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:sync_feature_flags_with_cluster(
+ Nodes, true)),
+ ok
+ end, []),
+
+ ct:pal("Checking the feature flag is enabled in the expanded cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
+ %% With both feature flags v1 and v2, the migration
+ %% function is executed on the node where `enable()' was
+ %% called, and then on the node joining the cluster.
+ Count = case Node of
+ FirstNode -> 1;
+ NewNode -> 1;
+ _ -> 0
+ end,
+ ?assertEqual(
+ Count,
+ persistent_term:get(?PT_MIGRATION_FUN_RUNS, 0)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+ ok.
+
+enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1(Config) ->
+ AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
+ connect_nodes(Nodes),
+ override_running_nodes([NewNode]),
+ override_running_nodes(Nodes),
+
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName =>
+ #{provided_by => ?MODULE,
+ stability => stable,
+ migration_fun => {?MODULE, mf_wait_and_count_runs}}},
+ inject_on_nodes(AllNodes, FeatureFlags),
+
+ ct:pal(
+ "Checking the feature flag is supported but disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+
+ ct:pal(
+ "Enabling the feature flag in the cluster (in a separate process)"),
+ Peer = self(),
+ Enabler = spawn_link(
+ fun() ->
+ ok =
+ run_on_node(
+ FirstNode,
+ fun() ->
+ %% The migration function uses the `Peer'
+ %% PID (the process executing the testcase)
+ %% to notify its own PID and wait for a
+ %% signal from `Peer' to proceed and finish
+ %% the migration.
+ record_peer_proc(Peer),
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:enable(
+ FeatureName)),
+ ok
+ end,
+ [])
+ end),
+
+ %% By waiting for the message from one of the migration function
+ %% instances, we make sure the feature flags controller on `FirstNode' is
+ %% blocked and waits for a message from this process. Therefore, we are
+ %% sure the feature flag is in the `state_changing' state and we can try
+ %% to add a new node and sync its feature flags.
+ FirstNodeMigFunPid = receive
+ {FirstNode, MigFunPid1, waiting} -> MigFunPid1
+ end,
+
+ %% Check compatibility between NewNodes and Nodes. This doesn't block.
+ ok = run_on_node(
+ NewNode,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:check_node_compatibility(
+ FirstNode)),
+ ok
+ end, []),
+
+ %% Add node to cluster and synchronize feature flags. The synchronization
+ %% blocks.
+ connect_nodes(AllNodes),
+ override_running_nodes(AllNodes),
+ ct:pal(
+ "Synchronizing feature flags in the expanded cluster (in a separate "
+ "process)~n"
+ "~n"
+ "NOTE: Error messages about crashed migration functions can be "
+ "ignored for feature~n"
+ " flags other than `~s`~n"
+ " because they assume they run inside RabbitMQ.",
+ [FeatureName]),
+ Syncer = spawn_link(
+ fun() ->
+ ok =
+ run_on_node(
+ NewNode,
+ fun() ->
+ record_peer_proc(Peer),
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:
+ sync_feature_flags_with_cluster(
+ Nodes, true)),
+ ok
+ end, [])
+ end),
+
+ ct:pal(
+ "Checking the feature flag state is changing in the initial cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertEqual(
+ state_changing,
+ rabbit_feature_flags:is_enabled(
+ FeatureName, non_blocking)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+
+ ct:pal("Checking the feature flag is still disabled on the new node"),
+ ok = run_on_node(
+ NewNode,
+ fun() ->
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ []),
+
+ %% Unblock the migration functions on `Nodes'.
+ UsingFFv1 = not ?config(enable_feature_flags_v2, Config),
+ EnablerMRef = erlang:monitor(process, Enabler),
+ SyncerMRef = erlang:monitor(process, Syncer),
+ unlink(Enabler),
+ unlink(Syncer),
+ ExpectedNodes = case UsingFFv1 of
+ true ->
+ %% With v1, the migration function runs on a
+ %% single node in the cluster only in this
+ %% scenario.
+ %%
+ %% The reason is that the new node joined during
+ %% the migration and the feature flag was marked
+ %% as enabled there as well, even though the
+ %% migration function possibly didn't know about
+ %% it. This is one of the problems
+ %% `feature_flags_v2' fixes.
+ [FirstNode];
+ false ->
+ %% With v2 but still using the old migration
+ %% function API (taking 3 arguments), the
+ %% migration function is executed on the node
+ %% where `enable()' was called, and then on the
+ %% node joining the cluster, thanks to the
+ %% synchronization.
+ [FirstNode, NewNode]
+ end,
+
+ %% Unblock the migration function for which we already consumed the
+ %% `waiting' notification.
+ FirstMigratedNode = node(FirstNodeMigFunPid),
+ ?assertEqual(FirstNode, FirstMigratedNode),
+ ct:pal(
+ "Unblocking first node (~p @ ~s)",
+ [FirstNodeMigFunPid, FirstMigratedNode]),
+ FirstNodeMigFunPid ! proceed,
+
+ %% Unblock the rest and collect the node names of all migration functions
+ %% which ran.
+ ct:pal("Unblocking other nodes, including the joining one"),
+ OtherMigratedNodes = [receive
+ {Node, MigFunPid2, waiting} ->
+ MigFunPid2 ! proceed,
+ Node
+ end || Node <- ExpectedNodes -- [FirstMigratedNode]],
+ MigratedNodes = [FirstMigratedNode | OtherMigratedNodes],
+ ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)),
+
+ ct:pal("Waiting for spawned processes to terminate"),
+ receive
+ {'DOWN', EnablerMRef, process, Enabler, EnablerReason} ->
+ ?assertEqual(normal, EnablerReason)
+ end,
+ receive
+ {'DOWN', SyncerMRef, process, Syncer, SyncerReason} ->
+ ?assertEqual(normal, SyncerReason)
+ end,
+
+ ct:pal("Checking the feature flag is enabled in the expanded cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
+ ?assert(
+ not lists:member(Node, MigratedNodes) orelse
+ 1 =:= persistent_term:get(?PT_MIGRATION_FUN_RUNS, 0)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+ ok.
+
+enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2(Config) ->
+ AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
+ connect_nodes(Nodes),
+ override_running_nodes([NewNode]),
+ override_running_nodes(Nodes),
+
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName =>
+ #{provided_by => ?MODULE,
+ stability => stable,
+ migration_fun => {?MODULE,
+ mf_wait_and_count_runs_v2}}},
+ inject_on_nodes(AllNodes, FeatureFlags),
+
+ ct:pal(
+ "Checking the feature flag is supported but disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+
+ ct:pal(
+ "Enabling the feature flag in the cluster (in a separate process)"),
+ Peer = self(),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ %% The migration function uses the `Peer' PID (the process
+ %% executing the testcase) to notify its own PID and wait
+ %% for a signal from `Peer' to proceed and finish the
+ %% migration.
+ record_peer_proc(Peer),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+ Enabler = spawn_link(
+ fun() ->
+ ok =
+ run_on_node(
+ FirstNode,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:enable(
+ FeatureName)),
+ ok
+ end,
+ [])
+ end),
+
+ %% By waiting for the message from one of the migration function
+ %% instances, we make sure the feature flags controller on `FirstNode' is
+ %% blocked and waits for a message from this process. Therefore, we are
+ %% sure the feature flag is in the `state_changing' state and we can try
+ %% to add a new node and sync its feature flags.
+ FirstNodeMigFunPid = receive
+ {_Node, MigFunPid1, waiting} -> MigFunPid1
+ end,
+
+ %% Check compatibility between NewNodes and Nodes. This doesn't block.
+ ok = run_on_node(
+ NewNode,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:check_node_compatibility(
+ FirstNode)),
+ ok
+ end, []),
+
+ %% Add node to cluster and synchronize feature flags. The synchronization
+ %% blocks.
+ connect_nodes(AllNodes),
+ override_running_nodes(AllNodes),
+ ct:pal(
+ "Synchronizing feature flags in the expanded cluster (in a separate "
+ "process)~n"
+ "~n"
+ "NOTE: Error messages about crashed migration functions can be "
+ "ignored for feature~n"
+ " flags other than `~s`~n"
+ " because they assume they run inside RabbitMQ.",
+ [FeatureName]),
+ Syncer = spawn_link(
+ fun() ->
+ ok =
+ run_on_node(
+ NewNode,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:
+ sync_feature_flags_with_cluster(
+ Nodes, true)),
+ ok
+ end, [])
+ end),
+
+ ct:pal(
+ "Checking the feature flag state is changing in the initial cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assertEqual(
+ state_changing,
+ rabbit_feature_flags:is_enabled(
+ FeatureName, non_blocking)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+
+ ct:pal("Checking the feature flag is still disabled on the new node"),
+ ok = run_on_node(
+ NewNode,
+ fun() ->
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ []),
+
+ %% Unblock the migration functions on `Nodes'.
+ EnablerMRef = erlang:monitor(process, Enabler),
+ SyncerMRef = erlang:monitor(process, Syncer),
+ unlink(Enabler),
+ unlink(Syncer),
+
+ %% The migration function runs on all clustered nodes with v2, including
+ %% the one joining the cluster, thanks to the synchronization.
+ %%
+ %% When this testcase runs with feature flags v1, the feature flag we want
+ %% to enable uses the migration function API v2: this implicitly enables
+ %% `feature_flags_v2'. As part of the synchronization, the node still on
+ %% feature flags v1 will try to sync `feature_flags_v2' specificaly first.
+ %% After that, the controller-based sync proceeds.
+ ExpectedNodes = Nodes ++ [NewNode],
+
+ %% Unblock the migration function for which we already consumed the
+ %% `waiting' notification.
+ FirstMigratedNode = node(FirstNodeMigFunPid),
+ ct:pal(
+ "Unblocking first node (~p @ ~s)",
+ [FirstNodeMigFunPid, FirstMigratedNode]),
+ FirstNodeMigFunPid ! proceed,
+
+ %% Unblock the rest and collect the node names of all migration functions
+ %% which ran.
+ ct:pal("Unblocking other nodes, including the joining one"),
+ OtherMigratedNodes = [receive
+ {Node, MigFunPid2, waiting} ->
+ MigFunPid2 ! proceed,
+ Node
+ end || Node <- ExpectedNodes -- [FirstMigratedNode]],
+ MigratedNodes = [FirstMigratedNode | OtherMigratedNodes],
+ ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)),
+
+ ct:pal("Waiting for spawned processes to terminate"),
+ receive
+ {'DOWN', EnablerMRef, process, Enabler, EnablerReason} ->
+ ?assertEqual(normal, EnablerReason)
+ end,
+ receive
+ {'DOWN', SyncerMRef, process, Syncer, SyncerReason} ->
+ ?assertEqual(normal, SyncerReason)
+ end,
+
+ ct:pal("Checking the feature flag is enabled in the expanded cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
+ ?assertEqual(
+ 1,
+ persistent_term:get(?PT_MIGRATION_FUN_RUNS, 0)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+ ok.
+
+enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1(Config) ->
+ AllNodes = [LeavingNode | [FirstNode | _] = Nodes] = ?config(
+ nodes, Config),
+ connect_nodes(AllNodes),
+ override_running_nodes(AllNodes),
+
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName =>
+ #{provided_by => ?MODULE,
+ stability => stable,
+ migration_fun => {?MODULE, mf_wait_and_count_runs}}},
+ inject_on_nodes(AllNodes, FeatureFlags),
+
+ ct:pal(
+ "Checking the feature flag is supported but disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+
+ UsingFFv1 = not ?config(enable_feature_flags_v2, Config),
+ ExpectedRet = case UsingFFv1 of
+ true -> ok;
+ false -> {error, {badrpc, nodedown}}
+ end,
+ ct:pal(
+ "Enabling the feature flag in the cluster (in a separate process)"),
+ Peer = self(),
+ Enabler = spawn_link(
+ fun() ->
+ ok =
+ run_on_node(
+ FirstNode,
+ fun() ->
+ %% The migration function uses the `Peer'
+ %% PID (the process executing the testcase)
+ %% to notify its own PID and wait for a
+ %% signal from `Peer' to proceed and finish
+ %% the migration.
+ record_peer_proc(Peer),
+ ?assertEqual(
+ ExpectedRet,
+ rabbit_feature_flags:enable(
+ FeatureName)),
+ ok
+ end,
+ [])
+ end),
+
+ %% By waiting for the message from one of the migration function
+ %% instances, we make sure the feature flags controller on `FirstNode' is
+ %% blocked and waits for a message from this process. Therefore, we are
+ %% sure the feature flag is in the `state_changing' state and we can try
+ %% to add a new node and sync its feature flags.
+ FirstNodeMigFunPid = receive
+ {_Node, MigFunPid1, waiting} -> MigFunPid1
+ end,
+
+ %% Remove node from cluster.
+ stop_slave_node(LeavingNode),
+ override_running_nodes(Nodes),
+
+ %% Unblock the migration functions on `Nodes'.
+ EnablerMRef = erlang:monitor(process, Enabler),
+ unlink(Enabler),
+
+ %% Unblock the migration function for which we already consumed the
+ %% `waiting' notification.
+ FirstMigratedNode = node(FirstNodeMigFunPid),
+ ct:pal(
+ "Unblocking first node (~p @ ~s)",
+ [FirstNodeMigFunPid, FirstMigratedNode]),
+ FirstNodeMigFunPid ! proceed,
+
+ ct:pal("Waiting for spawned processes to terminate"),
+ receive
+ {'DOWN', EnablerMRef, process, Enabler, EnablerReason} ->
+ ?assertEqual(normal, EnablerReason)
+ end,
+
+ ct:pal(
+ "Checking the feature flag is enabled (v1) or disabled (v2) in the "
+ "cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ case UsingFFv1 of
+ true ->
+ ?assert(
+ rabbit_feature_flags:is_enabled(FeatureName));
+ false ->
+ ?assertNot(
+ rabbit_feature_flags:is_enabled(FeatureName))
+ end,
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ok.
+
+enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2(Config) ->
+ AllNodes = [LeavingNode | [FirstNode | _] = Nodes] = ?config(
+ nodes, Config),
+ connect_nodes(AllNodes),
+ override_running_nodes(AllNodes),
+
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName =>
+ #{provided_by => ?MODULE,
+ stability => stable,
+ migration_fun => {?MODULE,
+ mf_wait_and_count_runs_v2}}},
+ inject_on_nodes(AllNodes, FeatureFlags),
+
+ UsingFFv1 = not ?config(enable_feature_flags_v2, Config),
+
+ ct:pal(
+ "Checking the feature flag is supported but disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+
+ ct:pal(
+ "Enabling the feature flag in the cluster (in a separate process)"),
+ Peer = self(),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ %% The migration function uses the `Peer' PID (the process
+ %% executing the testcase) to notify its own PID and wait
+ %% for a signal from `Peer' to proceed and finish the
+ %% migration.
+ record_peer_proc(Peer),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+ Enabler = spawn_link(
+ fun() ->
+ ok =
+ run_on_node(
+ FirstNode,
+ fun() ->
+ ?assertEqual(
+ {error, {badrpc, nodedown}},
+ rabbit_feature_flags:enable(
+ FeatureName)),
+ ok
+ end,
+ [])
+ end),
+
+ %% By waiting for the message from one of the migration function
+ %% instances, we make sure the feature flags controller on `FirstNode' is
+ %% blocked and waits for a message from this process. Therefore, we are
+ %% sure the feature flag is in the `state_changing' state and we can try
+ %% to add a new node and sync its feature flags.
+ FirstNodeMigFunPid = receive
+ {FirstNode, MigFunPid1, waiting} -> MigFunPid1
+ end,
+
+ %% Remove node from cluster.
+ stop_slave_node(LeavingNode),
+ override_running_nodes(Nodes),
+
+ %% Unblock the migration functions on `Nodes'.
+ EnablerMRef = erlang:monitor(process, Enabler),
+ unlink(Enabler),
+
+ %% The migration function runs on all clustered nodes with v2.
+ %%
+ %% When this testcase runs with feature flags v1, the feature flag we want
+ %% to enable uses the migration function API v2: this implicitly enables
+ %% `feature_flags_v2'. As part of the synchronization, the node still on
+ %% feature flags v1 will try to sync `feature_flags_v2' specificaly first.
+ %% After that, the controller-based sync proceeds.
+ ExpectedNodes = Nodes,
+
+ %% Unblock the migration function for which we already consumed the
+ %% `waiting' notification.
+ FirstMigratedNode = node(FirstNodeMigFunPid),
+ ?assertEqual(FirstNode, FirstMigratedNode),
+ ct:pal(
+ "Unblocking first node (~p @ ~s)",
+ [FirstNodeMigFunPid, FirstMigratedNode]),
+ FirstNodeMigFunPid ! proceed,
+
+ %% Unblock the rest and collect the node names of all migration functions
+ %% which ran.
+ ct:pal("Unblocking other nodes"),
+ OtherMigratedNodes = [receive
+ {Node, MigFunPid2, waiting} ->
+ MigFunPid2 ! proceed,
+ Node
+ end || Node <- ExpectedNodes -- [FirstMigratedNode]],
+ MigratedNodes = [FirstMigratedNode | OtherMigratedNodes],
+ ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)),
+
+ ct:pal("Waiting for spawned processes to terminate"),
+ receive
+ {'DOWN', EnablerMRef, process, Enabler, EnablerReason} ->
+ ?assertEqual(normal, EnablerReason)
+ end,
+
+ ct:pal(
+ "Checking the feature flag is enabled (v1) or disabled (v2) in the "
+ "cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ case UsingFFv1 of
+ true ->
+ ?assertNot(
+ rabbit_feature_flags:is_enabled(FeatureName));
+ false ->
+ ?assertNot(
+ rabbit_feature_flags:is_enabled(FeatureName))
+ end,
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+ ok.
+
+enable_feature_flag_with_post_enable(Config) ->
+ AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
+ connect_nodes(Nodes),
+ override_running_nodes([NewNode]),
+ override_running_nodes(Nodes),
+
+ FeatureName = ?FUNCTION_NAME,
+ FeatureFlags = #{FeatureName =>
+ #{provided_by => ?MODULE,
+ stability => stable,
+ migration_fun =>
+ {?MODULE,
+ mf_wait_and_count_runs_in_post_enable}}},
+ inject_on_nodes(AllNodes, FeatureFlags),
+
+ ct:pal(
+ "Checking the feature flag is supported but disabled on all nodes"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_supported(FeatureName)),
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+
+ ct:pal(
+ "Enabling the feature flag in the cluster (in a separate process)"),
+ Peer = self(),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ %% The migration function uses the `Peer' PID (the process
+ %% executing the testcase) to notify its own PID and wait
+ %% for a signal from `Peer' to proceed and finish the
+ %% migration.
+ record_peer_proc(Peer),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+ Enabler = spawn_link(
+ fun() ->
+ ok =
+ run_on_node(
+ FirstNode,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:enable(
+ FeatureName)),
+ ok
+ end,
+ [])
+ end),
+
+ %% By waiting for the message from one of the migration function
+ %% instances, we make sure the feature flags controller on `FirstNode' is
+ %% blocked and waits for a message from this process. Therefore, we are
+ %% sure the feature flag is in the `state_changing' state and we can try
+ %% to add a new node and sync its feature flags.
+ FirstNodeMigFunPid = receive
+ {_Node, MigFunPid1, waiting} -> MigFunPid1
+ end,
+
+ %% Check compatibility between NewNodes and Nodes. This doesn't block.
+ ok = run_on_node(
+ NewNode,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:check_node_compatibility(
+ FirstNode)),
+ ok
+ end, []),
+
+ %% Add node to cluster and synchronize feature flags. The synchronization
+ %% blocks.
+ connect_nodes(AllNodes),
+ override_running_nodes(AllNodes),
+ ct:pal(
+ "Synchronizing feature flags in the expanded cluster (in a separate "
+ "process)~n"
+ "~n"
+ "NOTE: Error messages about crashed migration functions can be "
+ "ignored for feature~n"
+ " flags other than `~s`~n"
+ " because they assume they run inside RabbitMQ.",
+ [FeatureName]),
+ Syncer = spawn_link(
+ fun() ->
+ ok =
+ run_on_node(
+ NewNode,
+ fun() ->
+ ?assertEqual(
+ ok,
+ rabbit_feature_flags:
+ sync_feature_flags_with_cluster(
+ Nodes, true)),
+ ok
+ end, [])
+ end),
+
+ ct:pal(
+ "Checking the feature flag is enabled in the initial cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(
+ rabbit_feature_flags:is_enabled(
+ FeatureName, non_blocking)),
+ ok
+ end,
+ [])
+ || Node <- Nodes],
+
+ ct:pal("Checking the feature flag is still disabled on the new node"),
+ ok = run_on_node(
+ NewNode,
+ fun() ->
+ ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
+ ok
+ end,
+ []),
+
+ %% Unblock the migration functions on `Nodes'.
+ EnablerMRef = erlang:monitor(process, Enabler),
+ SyncerMRef = erlang:monitor(process, Syncer),
+ unlink(Enabler),
+ unlink(Syncer),
+
+ %% The migration function runs on all clustered nodes with v2, including
+ %% the one joining the cluster, thanks to the synchronization.
+ %%
+ %% When this testcase runs with feature flags v1, the feature flag we want
+ %% to enable uses the migration function API v2: this implicitly enables
+ %% `feature_flags_v2'. As part of the synchronization, the node still on
+ %% feature flags v1 will try to sync `feature_flags_v2' specificaly first.
+ %% After that, the controller-based sync proceeds.
+ ExpectedNodes = Nodes ++ [NewNode],
+
+ %% Unblock the migration function for which we already consumed the
+ %% `waiting' notification.
+ FirstMigratedNode = node(FirstNodeMigFunPid),
+ ct:pal(
+ "Unblocking first node (~p @ ~s)",
+ [FirstNodeMigFunPid, FirstMigratedNode]),
+ FirstNodeMigFunPid ! proceed,
+
+ %% Unblock the rest and collect the node names of all migration functions
+ %% which ran.
+ ct:pal("Unblocking other nodes, including the joining one"),
+ OtherMigratedNodes = [receive
+ {Node, MigFunPid2, waiting} ->
+ MigFunPid2 ! proceed,
+ Node
+ end || Node <- ExpectedNodes -- [FirstMigratedNode]],
+ MigratedNodes = [FirstMigratedNode | OtherMigratedNodes],
+ ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)),
+
+ ct:pal("Waiting for spawned processes to terminate"),
+ receive
+ {'DOWN', EnablerMRef, process, Enabler, EnablerReason} ->
+ ?assertEqual(normal, EnablerReason)
+ end,
+ receive
+ {'DOWN', SyncerMRef, process, Syncer, SyncerReason} ->
+ ?assertEqual(normal, SyncerReason)
+ end,
+
+ ct:pal("Checking the feature flag is enabled in the expanded cluster"),
+ _ = [ok =
+ run_on_node(
+ Node,
+ fun() ->
+ ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
+ ?assertEqual(
+ 1,
+ persistent_term:get(?PT_MIGRATION_FUN_RUNS, 0)),
+ ok
+ end,
+ [])
+ || Node <- AllNodes],
+ ok.
diff --git a/deps/rabbit/test/feature_flags_with_unpriveleged_user_SUITE.erl b/deps/rabbit/test/feature_flags_with_unpriveleged_user_SUITE.erl
index 96cb0b542a..0ae07f009d 100644
--- a/deps/rabbit/test/feature_flags_with_unpriveleged_user_SUITE.erl
+++ b/deps/rabbit/test/feature_flags_with_unpriveleged_user_SUITE.erl
@@ -53,7 +53,6 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
feature_flags_SUITE:end_per_suite(Config).
-
init_per_group(enabling_in_cluster, Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl
index 95fdd9d627..811dcfbe73 100644
--- a/deps/rabbit_common/src/rabbit_misc.erl
+++ b/deps/rabbit_common/src/rabbit_misc.erl
@@ -52,6 +52,7 @@
-export([dict_cons/3, orddict_cons/3, maps_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([all_module_attributes/1,
+ rabbitmq_related_apps/0,
rabbitmq_related_module_attributes/1,
module_attributes_from_apps/2,
build_acyclic_graph/3]).
diff --git a/deps/rabbitmq_cli/test/ctl/enable_feature_flag_test.exs b/deps/rabbitmq_cli/test/ctl/enable_feature_flag_test.exs
index f8a3e62920..75815151ee 100644
--- a/deps/rabbitmq_cli/test/ctl/enable_feature_flag_test.exs
+++ b/deps/rabbitmq_cli/test/ctl/enable_feature_flag_test.exs
@@ -23,7 +23,8 @@ defmodule EnableFeatureFlagCommandTest do
provided_by: :EnableFeatureFlagCommandTest,
stability: :stable}}
:ok = :rabbit_misc.rpc_call(
- node, :rabbit_feature_flags, :initialize_registry, [new_feature_flags])
+ node, :rabbit_feature_flags, :inject_test_feature_flags,
+ [new_feature_flags])
{
:ok,
diff --git a/deps/rabbitmq_cli/test/ctl/list_feature_flags_command_test.exs b/deps/rabbitmq_cli/test/ctl/list_feature_flags_command_test.exs
index b2cf1ad52a..1d9ace0fe4 100644
--- a/deps/rabbitmq_cli/test/ctl/list_feature_flags_command_test.exs
+++ b/deps/rabbitmq_cli/test/ctl/list_feature_flags_command_test.exs
@@ -28,7 +28,8 @@ defmodule ListFeatureFlagsCommandTest do
provided_by: :ListFeatureFlagsCommandTest,
stability: :stable}}
:ok = :rabbit_misc.rpc_call(
- node, :rabbit_feature_flags, :initialize_registry, [new_feature_flags])
+ node, :rabbit_feature_flags, :inject_test_feature_flags,
+ [new_feature_flags])
:ok = :rabbit_misc.rpc_call(
node, :rabbit_feature_flags, :enable_all, [])
diff --git a/deps/rabbitmq_ct_helpers/src/cth_log_redirect_any_domains.erl b/deps/rabbitmq_ct_helpers/src/cth_log_redirect_any_domains.erl
new file mode 100644
index 0000000000..4caa1cf6fd
--- /dev/null
+++ b/deps/rabbitmq_ct_helpers/src/cth_log_redirect_any_domains.erl
@@ -0,0 +1,23 @@
+-module(cth_log_redirect_any_domains).
+
+-export([log/2]).
+
+-define(BACKEND_MODULE, cth_log_redirect).
+
+%% Reversed behavior compared to `cth_log_redirect': log events with an
+%% unknown domain are sent to the `cth_log_redirect' server, others are
+%% dropped (as they are already handled by `cth_log_redirect').
+log(#{msg:={report,_Msg},meta:=#{domain:=[otp,sasl]}},_Config) ->
+ ok;
+log(#{meta:=#{domain:=[otp]}},_Config) ->
+ ok;
+log(#{meta:=#{domain:=_}}=Log,Config) ->
+ do_log(add_log_category(Log,error_logger),Config);
+log(_Log,_Config) ->
+ ok.
+
+add_log_category(#{meta:=Meta}=Log,Category) ->
+ Log#{meta=>Meta#{?BACKEND_MODULE=>#{category=>Category}}}.
+
+do_log(Log,Config) ->
+ gen_server:call(?BACKEND_MODULE,{log,Log,Config}).
diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl
index b72f4f244c..6b62f59b98 100644
--- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl
+++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl
@@ -7,6 +7,7 @@
-module(rabbit_ct_helpers).
+-include_lib("kernel/include/logger.hrl").
-include_lib("common_test/include/ct.hrl").
-deprecated({is_mixed_versions,1,"Use is_mixed_versions/0 instead"}).
@@ -26,6 +27,7 @@
load_rabbitmqctl_app/1,
ensure_rabbitmq_plugins_cmd/1,
ensure_rabbitmq_queues_cmd/1,
+ redirect_logger_to_ct_logs/1,
init_skip_as_error_flag/1,
start_long_running_testsuite_monitor/1,
stop_long_running_testsuite_monitor/1,
@@ -155,6 +157,34 @@ run_steps(Config, [Step | Rest]) ->
run_steps(Config, []) ->
Config.
+redirect_logger_to_ct_logs(Config) ->
+ ct:pal(
+ ?LOW_IMPORTANCE,
+ "Configuring logger to send logs to common_test logs"),
+ logger:set_handler_config(cth_log_redirect, level, debug),
+
+ %% Let's use the same format as RabbitMQ itself.
+ logger:set_handler_config(
+ cth_log_redirect, formatter,
+ rabbit_prelaunch_early_logging:default_file_formatter(#{})),
+
+ %% We use an addition logger handler for messages tagged with a non-OTP
+ %% domain because by default, `cth_log_redirect' drop them.
+ {ok, LogCfg0} = logger:get_handler_config(cth_log_redirect),
+ LogCfg = maps:remove(id, maps:remove(module, LogCfg0)),
+ ok = logger:add_handler(
+ cth_log_redirect_any_domains, cth_log_redirect_any_domains,
+ LogCfg),
+
+ logger:remove_handler(default),
+
+ ct:pal(
+ ?LOW_IMPORTANCE,
+ "Logger configured to send logs to common_test logs; you should see "
+ "a message below saying so"),
+ ?LOG_INFO("Logger message logged to common_test logs"),
+ Config.
+
init_skip_as_error_flag(Config) ->
SkipAsError = case os:getenv("RABBITMQ_CT_SKIP_AS_ERROR") of
false -> false;