diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2022-06-28 13:33:59 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-06-28 13:33:59 +0200 |
| commit | a48a4a09c4ec255753df738910f4e23a5d1bb921 (patch) | |
| tree | 26b3c14eae488fa1ada416b404aede77eb8269b7 | |
| parent | 0fd051234ec167432f512d605178ada56aea9109 (diff) | |
| parent | da807547f58b2e785c11e436715f63b8e3383bf7 (diff) | |
| download | rabbitmq-server-git-a48a4a09c4ec255753df738910f4e23a5d1bb921.tar.gz | |
Merge pull request #3940 from rabbitmq/add-feature-flags-controller
Add a feature flags controller
| -rw-r--r-- | deps/rabbit/BUILD.bazel | 7 | ||||
| -rw-r--r-- | deps/rabbit/include/feature_flags.hrl | 13 | ||||
| -rw-r--r-- | deps/rabbit/src/rabbit.erl | 6 | ||||
| -rw-r--r-- | deps/rabbit/src/rabbit_core_ff.erl | 33 | ||||
| -rw-r--r-- | deps/rabbit/src/rabbit_feature_flags.erl | 1075 | ||||
| -rw-r--r-- | deps/rabbit/src/rabbit_ff_controller.erl | 1152 | ||||
| -rw-r--r-- | deps/rabbit/src/rabbit_ff_registry.erl | 20 | ||||
| -rw-r--r-- | deps/rabbit/src/rabbit_ff_registry_factory.erl | 631 | ||||
| -rw-r--r-- | deps/rabbit/src/rabbit_prelaunch_feature_flags.erl | 2 | ||||
| -rw-r--r-- | deps/rabbit/test/feature_flags_SUITE.erl | 191 | ||||
| -rw-r--r-- | deps/rabbit/test/feature_flags_v2_SUITE.erl | 1540 | ||||
| -rw-r--r-- | deps/rabbit/test/feature_flags_with_unpriveleged_user_SUITE.erl | 1 | ||||
| -rw-r--r-- | deps/rabbit_common/src/rabbit_misc.erl | 1 | ||||
| -rw-r--r-- | deps/rabbitmq_cli/test/ctl/enable_feature_flag_test.exs | 3 | ||||
| -rw-r--r-- | deps/rabbitmq_cli/test/ctl/list_feature_flags_command_test.exs | 3 | ||||
| -rw-r--r-- | deps/rabbitmq_ct_helpers/src/cth_log_redirect_any_domains.erl | 23 | ||||
| -rw-r--r-- | deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl | 30 |
17 files changed, 3938 insertions, 793 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 465b15b36c..4e5b6ee4d2 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -432,13 +432,18 @@ suites = [ name = "feature_flags_SUITE", size = "large", flaky = True, - shard_count = 5, + shard_count = 9, runtime_deps = [ "//deps/rabbit/test/feature_flags_SUITE_data/my_plugin:erlang_app", ], ), rabbitmq_integration_suite( PACKAGE, + name = "feature_flags_v2_SUITE", + size = "large", + ), + rabbitmq_integration_suite( + PACKAGE, name = "feature_flags_with_unpriveleged_user_SUITE", size = "large", additional_beam = [ diff --git a/deps/rabbit/include/feature_flags.hrl b/deps/rabbit/include/feature_flags.hrl new file mode 100644 index 0000000000..b90bdd1063 --- /dev/null +++ b/deps/rabbit/include/feature_flags.hrl @@ -0,0 +1,13 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved. +%% + +-record(ffcommand, { + name :: rabbit_feature_flags:feature_name(), + props :: rabbit_feature_flags:feature_props_extended(), + command :: enable | post_enable, + extra :: #{nodes := [node()]} + }). diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index ff6a1da1b0..5f387412de 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -851,6 +851,12 @@ start(normal, []) -> log_motd(), {ok, SupPid} = rabbit_sup:start_link(), + %% When we load plugins later in this function, we refresh feature + %% flags. If `feature_flags_v2' is enabled, `rabbit_ff_controller' + %% will be used. We start it now because we can't wait for boot steps + %% to do this (feature flags are refreshed before boot steps run). 
+ ok = rabbit_sup:start_child(rabbit_ff_controller), + %% Compatibility with older RabbitMQ versions + required by %% rabbit_node_monitor:notify_node_up/0: %% diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 661717c7aa..60075c99ba 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -7,21 +7,17 @@ -module(rabbit_core_ff). --export([classic_mirrored_queue_version_migration/3, - quorum_queue_migration/3, - stream_queue_migration/3, +-export([quorum_queue_migration/3, implicit_default_bindings_migration/3, virtual_host_metadata_migration/3, maintenance_mode_status_migration/3, user_limits_migration/3, - stream_single_active_consumer_migration/3, direct_exchange_routing_v2_migration/3]). -rabbit_feature_flag( {classic_mirrored_queue_version, #{desc => "Support setting version for classic mirrored queues", - stability => stable, - migration_fun => {?MODULE, classic_mirrored_queue_version_migration} + stability => stable }}). -rabbit_feature_flag( @@ -37,8 +33,7 @@ #{desc => "Support queues of type `stream`", doc_url => "https://www.rabbitmq.com/stream.html", stability => stable, - depends_on => [quorum_queue], - migration_fun => {?MODULE, stream_queue_migration} + depends_on => [quorum_queue] }}). -rabbit_feature_flag( @@ -75,8 +70,7 @@ #{desc => "Single active consumer for streams", doc_url => "https://www.rabbitmq.com/stream.html", stability => stable, - depends_on => [stream_queue], - migration_fun => {?MODULE, stream_single_active_consumer_migration} + depends_on => [stream_queue] }}). -rabbit_feature_flag( @@ -87,8 +81,11 @@ migration_fun => {?MODULE, direct_exchange_routing_v2_migration} }}). -classic_mirrored_queue_version_migration(_FeatureName, _FeatureProps, _Enable) -> - ok. +-rabbit_feature_flag( + {feature_flags_v2, + #{desc => "Feature flags subsystem V2", + stability => stable + }}). %% ------------------------------------------------------------------- %% Quorum queues. 
@@ -109,9 +106,6 @@ quorum_queue_migration(_FeatureName, _FeatureProps, is_enabled) -> mnesia:table_info(rabbit_queue, attributes) =:= Fields andalso mnesia:table_info(rabbit_durable_queue, attributes) =:= Fields. -stream_queue_migration(_FeatureName, _FeatureProps, _Enable) -> - ok. - migrate_to_amqqueue_with_type(FeatureName, [Table | Rest], Fields) -> rabbit_log_feature_flags:info( "Feature flag `~s`: migrating Mnesia table ~s...", @@ -209,15 +203,6 @@ user_limits_migration(_FeatureName, _FeatureProps, is_enabled) -> mnesia:table_info(rabbit_user, attributes) =:= internal_user:fields(internal_user_v2). %% ------------------------------------------------------------------- -%% Stream single active consumer. -%% ------------------------------------------------------------------- - -stream_single_active_consumer_migration(_FeatureName, _FeatureProps, enable) -> - ok; -stream_single_active_consumer_migration(_FeatureName, _FeatureProps, is_enabled) -> - undefined. - -%% ------------------------------------------------------------------- %% Direct exchange routing v2. %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_feature_flags.erl b/deps/rabbit/src/rabbit_feature_flags.erl index 3cffec6a88..9961449407 100644 --- a/deps/rabbit/src/rabbit_feature_flags.erl +++ b/deps/rabbit/src/rabbit_feature_flags.erl @@ -45,7 +45,7 @@ %% == How to declare a feature flag == %% %% To define a new feature flag, you need to use the -%% `rabbit_feature_flag()' module attribute: +%% `-rabbit_feature_flag()' module attribute: %% %% ``` %% -rabbit_feature_flag(FeatureFlag). @@ -77,6 +77,12 @@ -module(rabbit_feature_flags). +-include_lib("kernel/include/logger.hrl"). + +-include_lib("rabbit_common/include/logging.hrl"). + +-include("feature_flags.hrl"). + -export([list/0, list/1, list/2, @@ -111,30 +117,32 @@ ]). %% RabbitMQ internal use only. 
--export([initialize_registry/0, - initialize_registry/1, - mark_as_enabled_locally/2, +-export([mark_as_enabled_locally/2, remote_nodes/0, running_remote_nodes/0, does_node_support/3, merge_feature_flags_from_unknown_apps/1, - do_sync_feature_flags_with_node/1]). + do_sync_feature_flags_with_node/1, + enabled_feature_flags_to_feature_states/1, + inject_test_feature_flags/1, + query_supported_feature_flags/0, + read_enabled_feature_flags_list/0, + run_migration_fun/3, + uses_migration_fun_v2/1, + uses_migration_fun_v2/2]). -ifdef(TEST). --export([inject_test_feature_flags/1, - initialize_registry/3, - query_supported_feature_flags/0, - mark_as_enabled_remotely/2, - mark_as_enabled_remotely/4, - registry_loading_lock/0]). +-export([mark_as_enabled_remotely/4, + override_nodes/1, + override_running_nodes/1, + get_overriden_nodes/0, + get_overriden_running_nodes/0, + share_new_feature_flags_after_app_load/2]). -endif. %% Default timeout for operations on remote nodes. -define(TIMEOUT, 60000). --define(FF_REGISTRY_LOADING_LOCK, {feature_flags_registry_loading, self()}). --define(FF_STATE_CHANGE_LOCK, {feature_flags_state_change, self()}). - -type feature_flag_modattr() :: {feature_name(), feature_props()}. %% The value of a `-rabbitmq_feature_flag()' module attribute used to @@ -200,18 +208,26 @@ -type feature_states() :: #{feature_name() => feature_state()}. -type stability() :: stable | experimental. -%% The level of stability of a feature flag. Currently, only informational. +%% The level of stability of a feature flag. +%% +%% Experimental feature flags are not enabled by default on a fresh RabbitMQ +%% node. They must be enabled by the user. --type migration_fun_name() :: {Module :: atom(), Function :: atom()}. +-type migration_fun_name() :: {Module :: module(), Function :: atom()}. %% The name of the module and function to call when changing the state of %% the feature flag. 
--type migration_fun() :: fun((feature_name(), - feature_props_extended(), - migration_fun_context()) - -> ok | {error, any()} | % context = enable - boolean() | undefined). % context = is_enabled +-type migration_fun() :: migration_fun_v1() | migration_fun_v2(). %% The migration function signature. + +-type migration_fun_context() :: enable | is_enabled. + +-type migration_fun_v1() :: fun((feature_name(), + feature_props_extended(), + migration_fun_context()) + -> ok | {error, any()} | + boolean() | undefined). +%% The migration function signature (v1). %% %% It is called with context `enable' when a feature flag is being enabled. %% The function is responsible for this feature-flag-specific verification @@ -226,9 +242,53 @@ %% is actually enabled. It is useful on RabbitMQ startup, just in case %% the previous instance failed to write the feature flags list file. --type migration_fun_context() :: enable | is_enabled. +-type ffcommand() :: #ffcommand{}. --type registry_vsn() :: term(). +-type migration_fun_v2() :: fun((ffcommand()) -> ok | no_return()). +%% The migration function signature (v2). +%% +%% The migration function is called on all nodes which fulfill the following +%% conditions: +%% <ol> +%% <li>The node knows the feature flag.</li> +%% <li>The feature flag is disabled on that node before calling the migration +%% function.</li> +%% </ol> +%% +%% All executions of the migration function on these nodes will run in +%% parallel (concurrently). The migration function is responsible for its own +%% locking and synchronization. +%% +%% It is first called with the command `enable' when a feature flag is being +%% enabled. The function is responsible for this feature-flag-specific +%% verification and data conversion. It returns `ok' if RabbitMQ can mark the +%% feature flag as enabled and continue with the next one, if any. Other return +%% values or exceptions are an error and the feature flag should remain +%% disabled. 
+%% +%% It is then called with the command `post_enable' after a feature flag has +%% been marked as enabled. The return value or any exceptions are ignored and +%% the feature flag will remain enabled even if there is a failure. +%% +%% When a node is joining a cluster where one side has a feature flag enabled, +%% that feature flag will be enabled on the other side. It means the migration +%% function will run on the nodes where it is disabled. Therefore the +%% migration function can run in clusters where some nodes previously executed +%% it and the feature flag was already enabled. +%% +%% The migration function should also be idempotent. For instance, if the +%% feature flag couldn't be marked as enabled everywhere after the migration +%% function was called with the `enable' command, it may be called again. + +-type inventory() :: #{applications := [atom()], + feature_flags := feature_flags(), + states := feature_states()}. + +-type cluster_inventory() :: #{feature_flags := feature_flags(), + applications_per_node := + #{node() => [atom()]}, + states_per_node := + #{node() => feature_states()}}. -export_type([feature_flag_modattr/0, feature_props/0, @@ -240,7 +300,12 @@ stability/0, migration_fun_name/0, migration_fun/0, - migration_fun_context/0]). + migration_fun_v1/0, + migration_fun_v2/0, + migration_fun_context/0, + ffcommand/0, + inventory/0, + cluster_inventory/0]). -on_load(on_load/0). @@ -296,36 +361,85 @@ list(Which, Stability) %% dependency tree are left unchanged). 
enable(FeatureName) when is_atom(FeatureName) -> + case is_enabled(feature_flags_v2) of + true -> + rabbit_ff_controller:enable(FeatureName); + false -> + case requires_feature_flags_v2(FeatureName) of + true -> + ?LOG_DEBUG( + "Feature flags: `~s` uses the migration function " + "API v2 and thus requires `feature_flags_v2; " + "enabling the latter first", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + enable([feature_flags_v2, FeatureName]); + false -> + enable_v1(FeatureName) + end + end; +enable(FeatureNames) when is_list(FeatureNames) -> + FeatureNames1 = sort_feature_flags_v2_first(FeatureNames), + with_feature_flags(FeatureNames1, fun enable/1). + +sort_feature_flags_v2_first(FeatureNames) -> + lists:sort( + fun + (feature_flags_v2, _) -> true; + (_, feature_flags_v2) -> false; + (A, B) -> A =< B + end, FeatureNames). + +requires_feature_flags_v2(FeatureName) -> + uses_migration_fun_v2(FeatureName). + +uses_migration_fun_v2(FeatureName) -> + case rabbit_ff_registry:get(FeatureName) of + undefined -> + false; + FeatureProps -> + case maps:get(migration_fun, FeatureProps, none) of + {MigrationMod, MigrationFun} + when is_atom(MigrationMod) andalso is_atom(MigrationFun) -> + uses_migration_fun_v2(MigrationMod, MigrationFun); + _ -> + false + end + end. + +uses_migration_fun_v2(MigrationMod, MigrationFun) -> + erlang:function_exported(MigrationMod, MigrationFun, 1). 
+ +enable_v1(FeatureName) -> rabbit_log_feature_flags:debug( - "Feature flag `~s`: REQUEST TO ENABLE", + "Feature flags: `~s`: REQUEST TO ENABLE", [FeatureName]), case is_enabled(FeatureName) of true -> rabbit_log_feature_flags:debug( - "Feature flag `~s`: already enabled", + "Feature flags: `~s`: already enabled", [FeatureName]), ok; false -> rabbit_log_feature_flags:debug( - "Feature flag `~s`: not enabled, check if supported by cluster", + "Feature flags: `~s`: not enabled, check if supported by " + "cluster", [FeatureName]), %% The feature flag must be supported locally and remotely %% (i.e. by all members of the cluster). case is_supported(FeatureName) of true -> rabbit_log_feature_flags:info( - "Feature flag `~s`: supported, attempt to enable...", + "Feature flags: `~s`: supported, attempt to enable...", [FeatureName]), do_enable(FeatureName); false -> rabbit_log_feature_flags:error( - "Feature flag `~s`: not supported", + "Feature flags: `~s`: not supported", [FeatureName]), {error, unsupported} end - end; -enable(FeatureNames) when is_list(FeatureNames) -> - with_feature_flags(FeatureNames, fun enable/1). + end. -spec enable_all() -> ok | {error, any()}. %% @doc @@ -355,7 +469,7 @@ enable_all() -> enable_all(Stability) when Stability =:= stable orelse Stability =:= experimental -> - with_feature_flags(maps:keys(list(all, Stability)), fun enable/1). + enable(maps:keys(list(all, Stability))). -spec disable(feature_name() | [feature_name()]) -> ok | {error, any()}. %% @doc @@ -412,8 +526,13 @@ with_feature_flags([], _) -> %% `false' if one of them is not or the RPC timed out. is_supported(FeatureNames) -> - is_supported_locally(FeatureNames) andalso - is_supported_remotely(FeatureNames). + case is_enabled(feature_flags_v2) of + true -> + rabbit_ff_controller:is_supported(FeatureNames); + false -> + is_supported_locally(FeatureNames) andalso + is_supported_remotely(FeatureNames) + end. 
-spec is_supported(feature_name() | [feature_name()], timeout()) -> boolean(). @@ -431,8 +550,13 @@ is_supported(FeatureNames) -> %% `false' if one of them is not or the RPC timed out. is_supported(FeatureNames, Timeout) -> - is_supported_locally(FeatureNames) andalso - is_supported_remotely(FeatureNames, Timeout). + case is_enabled(feature_flags_v2) of + true -> + rabbit_ff_controller:is_supported(FeatureNames, Timeout); + false -> + is_supported_locally(FeatureNames) andalso + is_supported_remotely(FeatureNames, Timeout) + end. -spec is_supported_locally(feature_name() | [feature_name()]) -> boolean(). %% @doc @@ -578,8 +702,8 @@ is_enabled(FeatureNames, non_blocking) -> is_enabled(FeatureNames, blocking) -> case is_enabled_nb(FeatureNames) of state_changing -> - global:set_lock(?FF_STATE_CHANGE_LOCK), - global:del_lock(?FF_STATE_CHANGE_LOCK), + rabbit_ff_registry_factory:acquire_state_change_lock(), + rabbit_ff_registry_factory:release_state_change_lock(), is_enabled(FeatureNames, blocking); IsEnabled -> IsEnabled @@ -751,327 +875,68 @@ init() -> _ = list(all), ok. --spec initialize_registry() -> ok | {error, any()} | no_return(). -%% @private -%% @doc -%% Initializes or reinitializes the registry. -%% -%% The registry is an Erlang module recompiled at runtime to hold the -%% state of all supported feature flags. -%% -%% That Erlang module is called {@link rabbit_ff_registry}. The initial -%% source code of this module simply calls this function so it is -%% replaced by a proper registry. -%% -%% Once replaced, the registry contains the map of all supported feature -%% flags and their state. This makes it very efficient to query a -%% feature flag state or property. -%% -%% The registry is local to all RabbitMQ nodes. - -initialize_registry() -> - initialize_registry(#{}). - --spec initialize_registry(feature_flags()) -> - ok | {error, any()} | no_return(). -%% @private -%% @doc -%% Initializes or reinitializes the registry. 
-%% -%% See {@link initialize_registry/0} for a description of the registry. -%% -%% This function takes a map of new supported feature flags (so their -%% name and extended properties) to add to the existing known feature -%% flags. - -initialize_registry(NewSupportedFeatureFlags) -> - %% The first step is to get the feature flag states: if this is the - %% first time we initialize it, we read the list from disk (the - %% `feature_flags` file). Otherwise we query the existing registry - %% before it is replaced. - RegistryInitialized = rabbit_ff_registry:is_registry_initialized(), - FeatureStates = case RegistryInitialized of - true -> - rabbit_ff_registry:states(); - false -> - EnabledFeatureNames = - read_enabled_feature_flags_list(), - list_of_enabled_feature_flags_to_feature_states( - EnabledFeatureNames) - end, - - %% We also record if the feature flags state was correctly written - %% to disk. Currently we don't use this information, but in the - %% future, we might want to retry the write if it failed so far. - %% - %% TODO: Retry to write the feature flags state if the first try - %% failed. - WrittenToDisk = case RegistryInitialized of - true -> - rabbit_ff_registry:is_registry_written_to_disk(); - false -> - true - end, - initialize_registry(NewSupportedFeatureFlags, - FeatureStates, - WrittenToDisk). - --spec list_of_enabled_feature_flags_to_feature_states([feature_name()]) -> - feature_states(). - -list_of_enabled_feature_flags_to_feature_states(FeatureNames) -> - maps:from_list([{FeatureName, true} || FeatureName <- FeatureNames]). - --spec initialize_registry(feature_flags(), - feature_states(), - boolean()) -> - ok | {error, any()} | no_return(). -%% @private -%% @doc -%% Initializes or reinitializes the registry. -%% -%% See {@link initialize_registry/0} for a description of the registry. 
-%% -%% This function takes a map of new supported feature flags (so their -%% name and extended properties) to add to the existing known feature -%% flags, a map of the new feature flag states (whether they are -%% enabled, disabled or `state_changing'), and a flag to indicate if the -%% feature flag states was recorded to disk. -%% -%% The latter is used to block callers asking if a feature flag is -%% enabled or disabled while its state is changing. - -initialize_registry(NewSupportedFeatureFlags, - NewFeatureStates, - WrittenToDisk) -> - Ret = maybe_initialize_registry(NewSupportedFeatureFlags, - NewFeatureStates, - WrittenToDisk), - case Ret of - ok -> ok; - restart -> initialize_registry(NewSupportedFeatureFlags, - NewFeatureStates, - WrittenToDisk); - Error -> Error - end. - --spec maybe_initialize_registry(feature_flags(), - feature_states(), - boolean()) -> - ok | restart | {error, any()} | no_return(). - -maybe_initialize_registry(NewSupportedFeatureFlags, - NewFeatureStates, - WrittenToDisk) -> - %% We save the version of the current registry before computing - %% the new one. This is used when we do the actual reload: if the - %% current registry was reloaded in the meantime, we need to restart - %% the computation to make sure we don't loose data. - RegistryVsn = registry_vsn(), - - %% We take the feature flags already registered. - RegistryInitialized = rabbit_ff_registry:is_registry_initialized(), - KnownFeatureFlags1 = case RegistryInitialized of - true -> rabbit_ff_registry:list(all); - false -> #{} - end, - - %% Query the list (it's a map to be exact) of known - %% supported feature flags. That list comes from the - %% `-rabbitmq_feature_flag().` module attributes exposed by all - %% currently loaded Erlang modules. 
- KnownFeatureFlags2 = query_supported_feature_flags(), - - %% We merge the feature flags we already knew about - %% (KnownFeatureFlags1), those found in the loaded applications - %% (KnownFeatureFlags2) and those specified in arguments - %% (NewSupportedFeatureFlags). The latter come from remote nodes - %% usually: for example, they can come from plugins loaded on remote - %% node but the plugins are missing locally. In this case, we - %% consider those feature flags supported because there is no code - %% locally which would cause issues. - %% - %% It means that the list of feature flags only grows. we don't try - %% to clean it at some point because we want to remember about the - %% feature flags we saw (and their state). It should be fine because - %% that list should remain small. - KnownFeatureFlags = maps:merge(KnownFeatureFlags1, - KnownFeatureFlags2), - AllFeatureFlags = maps:merge(KnownFeatureFlags, - NewSupportedFeatureFlags), - - %% Next we want to update the feature states, based on the new - %% states passed as arguments. - FeatureStates0 = case RegistryInitialized of - true -> - maps:merge(rabbit_ff_registry:states(), - NewFeatureStates); - false -> - NewFeatureStates - end, - FeatureStates = maps:filter( - fun(_, true) -> true; - (_, state_changing) -> true; - (_, false) -> false - end, FeatureStates0), - - Proceed = does_registry_need_refresh(AllFeatureFlags, - FeatureStates, - WrittenToDisk), - - case Proceed of - true -> - rabbit_log_feature_flags:debug( - "Feature flags: (re)initialize registry (~p)", - [self()]), - T0 = erlang:timestamp(), - Ret = do_initialize_registry(RegistryVsn, - AllFeatureFlags, - FeatureStates, - WrittenToDisk), - T1 = erlang:timestamp(), - rabbit_log_feature_flags:debug( - "Feature flags: time to regen registry: ~p µs", - [timer:now_diff(T1, T0)]), - Ret; - false -> - rabbit_log_feature_flags:debug( - "Feature flags: registry already up-to-date, skipping init"), - ok - end. 
- --spec does_registry_need_refresh(feature_flags(), - feature_states(), - boolean()) -> - boolean(). - -does_registry_need_refresh(AllFeatureFlags, - FeatureStates, - WrittenToDisk) -> - case rabbit_ff_registry:is_registry_initialized() of - true -> - %% Before proceeding with the actual - %% (re)initialization, let's see if there are any - %% changes. - CurrentAllFeatureFlags = rabbit_ff_registry:list(all), - CurrentFeatureStates = rabbit_ff_registry:states(), - CurrentWrittenToDisk = - rabbit_ff_registry:is_registry_written_to_disk(), - - if - AllFeatureFlags =/= CurrentAllFeatureFlags -> - rabbit_log_feature_flags:debug( - "Feature flags: registry refresh needed: " - "yes, list of feature flags differs"), - true; - FeatureStates =/= CurrentFeatureStates -> - rabbit_log_feature_flags:debug( - "Feature flags: registry refresh needed: " - "yes, feature flag states differ"), - true; - WrittenToDisk =/= CurrentWrittenToDisk -> - rabbit_log_feature_flags:debug( - "Feature flags: registry refresh needed: " - "yes, \"written to disk\" state changed"), - true; - true -> - rabbit_log_feature_flags:debug( - "Feature flags: registry refresh needed: no"), - false - end; - false -> - rabbit_log_feature_flags:debug( - "Feature flags: registry refresh needed: " - "yes, first-time initialization"), - true - end. - --spec do_initialize_registry(registry_vsn(), - feature_flags(), - feature_states(), - boolean()) -> - ok | restart | {error, any()} | no_return(). -%% @private - -do_initialize_registry(RegistryVsn, - AllFeatureFlags, - FeatureStates, - WrittenToDisk) -> - %% We log the state of those feature flags. 
- rabbit_log_feature_flags:info( - "Feature flags: list of feature flags found:"), - lists:foreach( - fun(FeatureName) -> - rabbit_log_feature_flags:info( - "Feature flags: [~s] ~s", - [case maps:is_key(FeatureName, FeatureStates) of - true -> - case maps:get(FeatureName, FeatureStates) of - true -> "x"; - state_changing -> "~" - end; - false -> - " " - end, - FeatureName]) - end, lists:sort(maps:keys(AllFeatureFlags))), - rabbit_log_feature_flags:info( - "Feature flags: feature flag states written to disk: ~s", - [case WrittenToDisk of - true -> "yes"; - false -> "no" - end]), - - %% We request the registry to be regenerated and reloaded with the - %% new state. - regen_registry_mod(RegistryVsn, - AllFeatureFlags, - FeatureStates, - WrittenToDisk). - --spec query_supported_feature_flags() -> feature_flags(). -%% @private - --ifdef(TEST). -define(PT_TESTSUITE_ATTRS, {?MODULE, testsuite_feature_flags_attrs}). -inject_test_feature_flags(AttributesFromTestsuite) -> +inject_test_feature_flags(FeatureFlags) -> + ExistingAppAttrs = module_attributes_from_testsuite(), + FeatureFlagsPerApp0 = lists:foldl( + fun({Origin, Origin, FFlags}, Acc) -> + Acc#{Origin => maps:from_list(FFlags)} + end, #{}, ExistingAppAttrs), + FeatureFlagsPerApp1 = maps:fold( + fun(FeatureName, FeatureProps, Acc) -> + Origin = case FeatureProps of + #{provided_by := App} -> + App; + _ -> + '$injected' + end, + FFlags0 = maps:get(Origin, Acc, #{}), + FFlags1 = FFlags0#{ + FeatureName => FeatureProps}, + Acc#{Origin => FFlags1} + end, FeatureFlagsPerApp0, FeatureFlags), + AttributesFromTestsuite = maps:fold( + fun(Origin, FFlags, Acc) -> + [{Origin, % Application + Origin, % Module + maps:to_list(FFlags)} | Acc] + end, [], FeatureFlagsPerApp1), rabbit_log_feature_flags:debug( - "Feature flags: injecting feature flags from testsuite: ~p", - [AttributesFromTestsuite]), + "Feature flags: injecting feature flags from testsuite: ~p~n" + "Feature flags: all injected feature flags: ~p", + [FeatureFlags, 
AttributesFromTestsuite]), ok = persistent_term:put(?PT_TESTSUITE_ATTRS, AttributesFromTestsuite), - initialize_registry(). + rabbit_ff_registry_factory:initialize_registry(). module_attributes_from_testsuite() -> persistent_term:get(?PT_TESTSUITE_ATTRS, []). +-spec query_supported_feature_flags() -> {ScannedApps, FeatureFlags} when + ScannedApps :: [atom()], + FeatureFlags :: feature_flags(). +%% @private + query_supported_feature_flags() -> rabbit_log_feature_flags:debug( - "Feature flags: query feature flags in loaded applications " - "+ testsuite"), + "Feature flags: query feature flags in loaded applications"), T0 = erlang:timestamp(), - AttributesPerApp = rabbit_misc:rabbitmq_related_module_attributes( - rabbit_feature_flag), + %% We need to know the list of applications we scanned for feature flags. + %% We can't derive that list of the returned feature flags because an + %% application might be loaded/present and not have a specific feature + %% flag. In this case, the feature flag should be considered unsupported. + ScannedApps = rabbit_misc:rabbitmq_related_apps(), + AttributesPerApp = rabbit_misc:module_attributes_from_apps( + rabbit_feature_flag, ScannedApps), AttributesFromTestsuite = module_attributes_from_testsuite(), + TestsuiteProviders = [App || {App, _, _} <- AttributesFromTestsuite], T1 = erlang:timestamp(), rabbit_log_feature_flags:debug( "Feature flags: time to find supported feature flags: ~p µs", [timer:now_diff(T1, T0)]), AllAttributes = AttributesPerApp ++ AttributesFromTestsuite, - prepare_queried_feature_flags(AllAttributes, #{}). --else. 
-query_supported_feature_flags() -> - rabbit_log_feature_flags:debug( - "Feature flags: query feature flags in loaded applications"), - T0 = erlang:timestamp(), - AttributesPerApp = rabbit_misc:rabbitmq_related_module_attributes( - rabbit_feature_flag), - T1 = erlang:timestamp(), - rabbit_log_feature_flags:debug( - "Feature flags: time to find supported feature flags: ~p µs", - [timer:now_diff(T1, T0)]), - prepare_queried_feature_flags(AttributesPerApp, #{}). --endif. + AllApps = lists:usort(ScannedApps ++ TestsuiteProviders), + {AllApps, prepare_queried_feature_flags(AllAttributes, #{})}. prepare_queried_feature_flags([{App, _Module, Attributes} | Rest], AllFeatureFlags) -> @@ -1105,293 +970,6 @@ merge_new_feature_flags(AllFeatureFlags, App, FeatureName, FeatureProps) maps:merge(AllFeatureFlags, #{FeatureName => FeatureProps1}). --spec regen_registry_mod(registry_vsn(), - feature_flags(), - feature_states(), - boolean()) -> - ok | restart | {error, any()} | no_return(). -%% @private - -regen_registry_mod(RegistryVsn, - AllFeatureFlags, - FeatureStates, - WrittenToDisk) -> - %% Here, we recreate the source code of the `rabbit_ff_registry` - %% module from scratch. - %% - %% IMPORTANT: We want both modules to have the exact same public - %% API in order to simplify the life of developers and their tools - %% (Dialyzer, completion, and so on). - - %% -module(rabbit_ff_registry). - ModuleAttr = erl_syntax:attribute( - erl_syntax:atom(module), - [erl_syntax:atom(rabbit_ff_registry)]), - ModuleForm = erl_syntax:revert(ModuleAttr), - %% -export([...]). - ExportAttr = erl_syntax:attribute( - erl_syntax:atom(export), - [erl_syntax:list( - [erl_syntax:arity_qualifier( - erl_syntax:atom(F), - erl_syntax:integer(A)) - || {F, A} <- [{get, 1}, - {list, 1}, - {states, 0}, - {is_supported, 1}, - {is_enabled, 1}, - {is_registry_initialized, 0}, - {is_registry_written_to_disk, 0}]] - ) - ] - ), - ExportForm = erl_syntax:revert(ExportAttr), - %% get(_) -> ... 
- GetClauses = [erl_syntax:clause( - [erl_syntax:atom(FeatureName)], - [], - [erl_syntax:abstract(maps:get(FeatureName, - AllFeatureFlags))]) - || FeatureName <- maps:keys(AllFeatureFlags) - ], - GetUnknownClause = erl_syntax:clause( - [erl_syntax:variable("_")], - [], - [erl_syntax:atom(undefined)]), - GetFun = erl_syntax:function( - erl_syntax:atom(get), - GetClauses ++ [GetUnknownClause]), - GetFunForm = erl_syntax:revert(GetFun), - %% list(_) -> ... - ListAllBody = erl_syntax:abstract(AllFeatureFlags), - ListAllClause = erl_syntax:clause([erl_syntax:atom(all)], - [], - [ListAllBody]), - EnabledFeatureFlags = maps:filter( - fun(FeatureName, _) -> - maps:is_key(FeatureName, - FeatureStates) - andalso - maps:get(FeatureName, FeatureStates) - =:= - true - end, AllFeatureFlags), - ListEnabledBody = erl_syntax:abstract(EnabledFeatureFlags), - ListEnabledClause = erl_syntax:clause( - [erl_syntax:atom(enabled)], - [], - [ListEnabledBody]), - DisabledFeatureFlags = maps:filter( - fun(FeatureName, _) -> - not maps:is_key(FeatureName, - FeatureStates) - end, AllFeatureFlags), - ListDisabledBody = erl_syntax:abstract(DisabledFeatureFlags), - ListDisabledClause = erl_syntax:clause( - [erl_syntax:atom(disabled)], - [], - [ListDisabledBody]), - StateChangingFeatureFlags = maps:filter( - fun(FeatureName, _) -> - maps:is_key(FeatureName, - FeatureStates) - andalso - maps:get(FeatureName, FeatureStates) - =:= - state_changing - end, AllFeatureFlags), - ListStateChangingBody = erl_syntax:abstract(StateChangingFeatureFlags), - ListStateChangingClause = erl_syntax:clause( - [erl_syntax:atom(state_changing)], - [], - [ListStateChangingBody]), - ListFun = erl_syntax:function( - erl_syntax:atom(list), - [ListAllClause, - ListEnabledClause, - ListDisabledClause, - ListStateChangingClause]), - ListFunForm = erl_syntax:revert(ListFun), - %% states() -> ... 
- StatesBody = erl_syntax:abstract(FeatureStates), - StatesClause = erl_syntax:clause([], [], [StatesBody]), - StatesFun = erl_syntax:function( - erl_syntax:atom(states), - [StatesClause]), - StatesFunForm = erl_syntax:revert(StatesFun), - %% is_supported(_) -> ... - IsSupportedClauses = [erl_syntax:clause( - [erl_syntax:atom(FeatureName)], - [], - [erl_syntax:atom(true)]) - || FeatureName <- maps:keys(AllFeatureFlags) - ], - NotSupportedClause = erl_syntax:clause( - [erl_syntax:variable("_")], - [], - [erl_syntax:atom(false)]), - IsSupportedFun = erl_syntax:function( - erl_syntax:atom(is_supported), - IsSupportedClauses ++ [NotSupportedClause]), - IsSupportedFunForm = erl_syntax:revert(IsSupportedFun), - %% is_enabled(_) -> ... - IsEnabledClauses = [erl_syntax:clause( - [erl_syntax:atom(FeatureName)], - [], - [case maps:is_key(FeatureName, FeatureStates) of - true -> - erl_syntax:atom( - maps:get(FeatureName, FeatureStates)); - false -> - erl_syntax:atom(false) - end]) - || FeatureName <- maps:keys(AllFeatureFlags) - ], - NotEnabledClause = erl_syntax:clause( - [erl_syntax:variable("_")], - [], - [erl_syntax:atom(false)]), - IsEnabledFun = erl_syntax:function( - erl_syntax:atom(is_enabled), - IsEnabledClauses ++ [NotEnabledClause]), - IsEnabledFunForm = erl_syntax:revert(IsEnabledFun), - %% is_registry_initialized() -> ... - IsInitializedClauses = [erl_syntax:clause( - [], - [], - [erl_syntax:atom(true)]) - ], - IsInitializedFun = erl_syntax:function( - erl_syntax:atom(is_registry_initialized), - IsInitializedClauses), - IsInitializedFunForm = erl_syntax:revert(IsInitializedFun), - %% is_registry_written_to_disk() -> ... - IsWrittenToDiskClauses = [erl_syntax:clause( - [], - [], - [erl_syntax:atom(WrittenToDisk)]) - ], - IsWrittenToDiskFun = erl_syntax:function( - erl_syntax:atom(is_registry_written_to_disk), - IsWrittenToDiskClauses), - IsWrittenToDiskFunForm = erl_syntax:revert(IsWrittenToDiskFun), - %% Compilation! 
- Forms = [ModuleForm, - ExportForm, - GetFunForm, - ListFunForm, - StatesFunForm, - IsSupportedFunForm, - IsEnabledFunForm, - IsInitializedFunForm, - IsWrittenToDiskFunForm], - maybe_log_registry_source_code(Forms), - CompileOpts = [return_errors, - return_warnings], - case compile:forms(Forms, CompileOpts) of - {ok, Mod, Bin, _} -> - load_registry_mod(RegistryVsn, Mod, Bin); - {error, Errors, Warnings} -> - rabbit_log_feature_flags:error( - "Feature flags: registry compilation:~n" - "Errors: ~p~n" - "Warnings: ~p", - [Errors, Warnings]), - {error, {compilation_failure, Errors, Warnings}} - end. - -maybe_log_registry_source_code(Forms) -> - case rabbit_prelaunch:get_context() of - #{log_feature_flags_registry := true} -> - rabbit_log_feature_flags:debug( - "== FEATURE FLAGS REGISTRY ==~n" - "~s~n" - "== END ==~n", - [erl_prettypr:format(erl_syntax:form_list(Forms))]); - _ -> - ok - end. - --ifdef(TEST). -registry_loading_lock() -> ?FF_REGISTRY_LOADING_LOCK. --endif. - --spec load_registry_mod(registry_vsn(), atom(), binary()) -> - ok | restart | no_return(). -%% @private - -load_registry_mod(RegistryVsn, Mod, Bin) -> - rabbit_log_feature_flags:debug( - "Feature flags: registry module ready, loading it (~p)...", - [self()]), - FakeFilename = "Compiled and loaded by " ?MODULE_STRING, - %% Time to load the new registry, replacing the old one. We use a - %% lock here to synchronize concurrent reloads. - global:set_lock(?FF_REGISTRY_LOADING_LOCK, [node()]), - rabbit_log_feature_flags:debug( - "Feature flags: acquired lock before reloading registry module (~p)", - [self()]), - %% We want to make sure that the old registry (not the one being - %% currently in use) is purged by the code server. It means no - %% process lingers on that old code. - %% - %% We use code:soft_purge() for that (meaning no process is killed) - %% and we wait in an infinite loop for that to succeed. 
- ok = purge_old_registry(Mod), - %% Now we can replace the currently loaded registry by the new one. - %% The code server takes care of marking the current registry as old - %% and load the new module in an atomic operation. - %% - %% Therefore there is no chance of a window where there is no - %% registry module available, causing the one on disk to be - %% reloaded. - Ret = case registry_vsn() of - RegistryVsn -> code:load_binary(Mod, FakeFilename, Bin); - OtherVsn -> {error, {restart, RegistryVsn, OtherVsn}} - end, - rabbit_log_feature_flags:debug( - "Feature flags: releasing lock after reloading registry module (~p)", - [self()]), - global:del_lock(?FF_REGISTRY_LOADING_LOCK, [node()]), - case Ret of - {module, _} -> - rabbit_log_feature_flags:debug( - "Feature flags: registry module loaded (vsn: ~p -> ~p)", - [RegistryVsn, registry_vsn()]), - ok; - {error, {restart, Expected, Current}} -> - rabbit_log_feature_flags:error( - "Feature flags: another registry module was loaded in the " - "meantime (expected old vsn: ~p, current vsn: ~p); " - "restarting the regen", - [Expected, Current]), - restart; - {error, Reason} -> - rabbit_log_feature_flags:error( - "Feature flags: failed to load registry module: ~p", - [Reason]), - throw({feature_flag_registry_reload_failure, Reason}) - end. - --spec registry_vsn() -> registry_vsn(). -%% @private - -registry_vsn() -> - Attrs = rabbit_ff_registry:module_info(attributes), - proplists:get_value(vsn, Attrs, undefined). - -purge_old_registry(Mod) -> - case code:is_loaded(Mod) of - {file, _} -> do_purge_old_registry(Mod); - false -> ok - end. - -do_purge_old_registry(Mod) -> - case code:soft_purge(Mod) of - true -> ok; - false -> do_purge_old_registry(Mod) - end. - %% ------------------------------------------------------------------- %% Feature flags state storage. 
%% ------------------------------------------------------------------- @@ -1515,7 +1093,7 @@ do_enable(FeatureName) -> %% We mark this feature flag as "state changing" before doing the %% actual state change. We also take a global lock: this permits %% to block callers asking about a feature flag changing state. - global:set_lock(?FF_STATE_CHANGE_LOCK), + rabbit_ff_registry_factory:acquire_state_change_lock(), Ret = case mark_as_enabled(FeatureName, state_changing) of ok -> case enable_dependencies(FeatureName, true) of @@ -1538,7 +1116,7 @@ do_enable(FeatureName) -> ok -> ok; _ -> mark_as_enabled(FeatureName, false) end, - global:del_lock(?FF_STATE_CHANGE_LOCK), + rabbit_ff_registry_factory:release_state_change_lock(), Ret. -spec enable_locally(feature_name()) -> ok | {error, any()} | no_return(). @@ -1550,7 +1128,7 @@ enable_locally(FeatureName) when is_atom(FeatureName) -> ok; false -> rabbit_log_feature_flags:debug( - "Feature flag `~s`: enable locally (as part of feature " + "Feature flags: `~s`: enable locally (as part of feature " "flag states synchronization)", [FeatureName]), do_enable_locally(FeatureName) @@ -1582,7 +1160,7 @@ enable_dependencies(FeatureName, Everywhere) -> FeatureProps = rabbit_ff_registry:get(FeatureName), DependsOn = maps:get(depends_on, FeatureProps, []), rabbit_log_feature_flags:debug( - "Feature flag `~s`: enable dependencies: ~p", + "Feature flags: `~s`: enable dependencies: ~p", [FeatureName, DependsOn]), enable_dependencies(FeatureName, DependsOn, Everywhere). 
@@ -1615,8 +1193,9 @@ run_migration_fun(FeatureName, FeatureProps, Arg) -> {MigrationMod, MigrationFun} when is_atom(MigrationMod) andalso is_atom(MigrationFun) -> rabbit_log_feature_flags:debug( - "Feature flag `~s`: run migration function ~p with arg: ~p", - [FeatureName, MigrationFun, Arg]), + "Feature flags: `~s`: run migration function (v1) ~s:~s " + "with arg=~p on node ~p", + [FeatureName, MigrationMod, MigrationFun, Arg, node()]), try erlang:apply(MigrationMod, MigrationFun, @@ -1624,7 +1203,8 @@ run_migration_fun(FeatureName, FeatureProps, Arg) -> catch _:Reason:Stacktrace -> rabbit_log_feature_flags:error( - "Feature flag `~s`: migration function crashed: ~p~n~p", + "Feature flags: `~s`: migration function crashed: " + "~p~n~p", [FeatureName, Reason, Stacktrace]), {error, {migration_fun_crash, Reason, Stacktrace}} end; @@ -1632,7 +1212,7 @@ run_migration_fun(FeatureName, FeatureProps, Arg) -> {error, no_migration_fun}; Invalid -> rabbit_log_feature_flags:error( - "Feature flag `~s`: invalid migration function: ~p", + "Feature flags: `~s`: invalid migration function: ~p", [FeatureName, Invalid]), {error, {invalid_migration_fun, Invalid}} end. @@ -1654,8 +1234,8 @@ mark_as_enabled(FeatureName, IsEnabled) -> %% @private mark_as_enabled_locally(FeatureName, IsEnabled) -> - rabbit_log_feature_flags:info( - "Feature flag `~s`: mark as enabled=~p", + rabbit_log_feature_flags:debug( + "Feature flags: `~s`: mark as enabled=~p", [FeatureName, IsEnabled]), EnabledFeatureNames = maps:keys(list(enabled)), NewEnabledFeatureNames = case IsEnabled of @@ -1673,9 +1253,10 @@ mark_as_enabled_locally(FeatureName, IsEnabled) -> ok =:= try_to_write_enabled_feature_flags_list( NewEnabledFeatureNames) end, - initialize_registry(#{}, - #{FeatureName => IsEnabled}, - WrittenToDisk). + rabbit_ff_registry_factory:initialize_registry( + #{}, + #{FeatureName => IsEnabled}, + WrittenToDisk). 
-spec mark_as_enabled_remotely(feature_name(), feature_state()) -> any() | {error, any()} | no_return(). @@ -1748,6 +1329,7 @@ mark_as_enabled_remotely(Nodes, FeatureName, IsEnabled, Timeout) -> %% Coordination with remote nodes. %% ------------------------------------------------------------------- +-ifndef(TEST). -spec remote_nodes() -> [node()]. %% @private @@ -1761,6 +1343,44 @@ running_remote_nodes() -> mnesia:system_info(running_db_nodes) -- [node()]. query_running_remote_nodes(Node, Timeout) -> + query_running_remote_nodes1(Node, Timeout). +-else. +-define(PT_OVERRIDDEN_NODES, {?MODULE, overridden_nodes}). +-define(PT_OVERRIDDEN_RUNNING_NODES, {?MODULE, overridden_running_nodes}). + +remote_nodes() -> + case get_overriden_nodes() of + undefined -> mnesia:system_info(db_nodes) -- [node()]; + Nodes -> Nodes -- [node()] + end. + +running_remote_nodes() -> + case get_overriden_running_nodes() of + undefined -> mnesia:system_info(running_db_nodes) -- [node()]; + Nodes -> Nodes -- [node()] + end. + +query_running_remote_nodes(Node, Timeout) -> + case rpc:call(Node, ?MODULE, get_overriden_running_nodes, [], Timeout) of + {badrpc, _} = Error -> Error; + undefined -> query_running_remote_nodes1(Node, Timeout); + Nodes -> Nodes -- [node()] + end. + +override_nodes(Nodes) -> + persistent_term:put(?PT_OVERRIDDEN_NODES, Nodes). + +get_overriden_nodes() -> + persistent_term:get(?PT_OVERRIDDEN_NODES, undefined). + +override_running_nodes(Nodes) -> + persistent_term:put(?PT_OVERRIDDEN_RUNNING_NODES, Nodes). + +get_overriden_running_nodes() -> + persistent_term:get(?PT_OVERRIDDEN_RUNNING_NODES, undefined). +-endif. 
+ +query_running_remote_nodes1(Node, Timeout) -> case rpc:call(Node, mnesia, system_info, [running_db_nodes], Timeout) of {badrpc, _} = Error -> Error; Nodes -> Nodes -- [node()] @@ -1841,6 +1461,12 @@ check_node_compatibility(Node) -> %% @see check_node_compatibility/1 check_node_compatibility(Node, Timeout) -> + case is_enabled(feature_flags_v2) of + true -> rabbit_ff_controller:check_node_compatibility(Node); + false -> check_node_compatibility_v1(Node, Timeout) + end. + +check_node_compatibility_v1(Node, Timeout) -> %% Before checking compatibility, we exchange feature flags from %% unknown Erlang applications. So we fetch remote feature flags %% from applications which are not loaded locally, and the opposite. @@ -1941,42 +1567,8 @@ remote_enabled_feature_flags_is_supported_locally(Node, Timeout) -> is_supported_locally(RemoteEnabledFeatureNames) end. --spec run_feature_flags_mod_on_remote_node(node(), - atom(), - [term()], - timeout()) -> - term() | {error, term()}. -%% @private - run_feature_flags_mod_on_remote_node(Node, Function, Args, Timeout) -> - case rpc:call(Node, ?MODULE, Function, Args, Timeout) of - {badrpc, {'EXIT', - {undef, - [{?MODULE, Function, Args, []} - | _]}}} -> - %% If rabbit_feature_flags:Function() is undefined - %% on the remote node, we consider it to be a 3.7.x - %% pre-feature-flags node. - %% - %% Theoretically, it could be an older version (3.6.x and - %% older). But the RabbitMQ version consistency check - %% (rabbit_misc:version_minor_equivalent/2) called from - %% rabbit_mnesia:check_rabbit_consistency/2 already blocked - %% this situation from happening before we reach this point. 
- rabbit_log_feature_flags:debug( - "Feature flags: ~s:~s~p unavailable on node `~s`: " - "assuming it is a RabbitMQ 3.7.x pre-feature-flags node", - [?MODULE, Function, Args, Node]), - {error, pre_feature_flags_rabbitmq}; - {badrpc, Reason} = Error -> - rabbit_log_feature_flags:error( - "Feature flags: error while running ~s:~s~p " - "on node `~s`: ~p", - [?MODULE, Function, Args, Node, Reason]), - {error, Error}; - Ret -> - Ret - end. + rabbit_ff_controller:rpc_call(Node, ?MODULE, Function, Args, Timeout). -spec query_remote_feature_flags(node(), Which :: all | enabled | disabled, @@ -2044,7 +1636,8 @@ merge_feature_flags_from_unknown_apps(FeatureFlags) "Feature flags: register feature flags provided by applications " "unknown locally: ~p", [maps:keys(FeatureFlagsFromUnknownApps)]), - initialize_registry(FeatureFlagsFromUnknownApps) + rabbit_ff_registry_factory:initialize_registry( + FeatureFlagsFromUnknownApps) end. exchange_feature_flags_from_unknown_apps(Node, Timeout) -> @@ -2097,7 +1690,13 @@ sync_feature_flags_with_cluster(Nodes, NodeIsVirgin) -> ok | {error, any()} | no_return(). %% @private -sync_feature_flags_with_cluster([], NodeIsVirgin, _) -> +sync_feature_flags_with_cluster(Nodes, NodeIsVirgin, Timeout) -> + case is_enabled(feature_flags_v2) of + true -> rabbit_ff_controller:sync_cluster(); + false -> sync_cluster_v1(Nodes, NodeIsVirgin, Timeout) + end. 
+ +sync_cluster_v1([], NodeIsVirgin, _) -> verify_which_feature_flags_are_actually_enabled(), case NodeIsVirgin of true -> @@ -2139,10 +1738,96 @@ sync_feature_flags_with_cluster([], NodeIsVirgin, _) -> "current state"), ok end; -sync_feature_flags_with_cluster(Nodes, _, Timeout) -> - verify_which_feature_flags_are_actually_enabled(), +sync_cluster_v1(Nodes, _, Timeout) -> + case sync_feature_flags_v2_first(Nodes, Timeout) of + true -> + ?LOG_DEBUG( + "Feature flags: both sides have `feature_flags_v2` enabled; " + "switching to controller's sync", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + rabbit_ff_controller:sync_cluster(); + false -> + verify_which_feature_flags_are_actually_enabled(), + RemoteNodes = Nodes -- [node()], + sync_feature_flags_with_cluster1(RemoteNodes, Timeout) + end. + +sync_feature_flags_v2_first(Nodes, Timeout) -> + ?LOG_DEBUG( + "Feature flags: checking if one side of the sync has " + "`feature_flags_v2` enabled", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), RemoteNodes = Nodes -- [node()], - sync_feature_flags_with_cluster1(RemoteNodes, Timeout). 
+ RandomRemoteNode = pick_one_node(RemoteNodes), + Ret1 = run_feature_flags_mod_on_remote_node( + RandomRemoteNode, + is_enabled, + [feature_flags_v2], + Timeout), + case Ret1 of + {error, Reason} -> + ?LOG_DEBUG( + "Feature flags: failed to check `feature_flags_v2` on remote " + "node: ~p", + [Reason], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + false; + EnabledRemotely -> + EnabledLocally = is_enabled(feature_flags_v2), + ?LOG_DEBUG( + "Feature flags: `feature_flags_v2` state: local=~s remote=~s", + [EnabledLocally, EnabledRemotely], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case {EnabledLocally, EnabledRemotely} of + {true, true} -> + true; + {true, false} -> + ?LOG_DEBUG( + "Feature flags: enable `feature_flags_v2` remotely " + "and restart sync", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + Ret2 = run_feature_flags_mod_on_remote_node( + RandomRemoteNode, + do_sync_feature_flags_with_node, + [[feature_flags_v2]], + Timeout), + case Ret2 of + ok -> + true; + {error, Reason} -> + ?LOG_DEBUG( + "Feature flags: failed to enable " + "`feature_flags_v2` remotely: ~p", + [Reason], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + false + end; + {false, true} -> + ?LOG_DEBUG( + "Feature flags: enable `feature_flags_v2` locally " + "and restart sync", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + Ret3 = do_sync_feature_flags_with_node( + [feature_flags_v2]), + case Ret3 of + ok -> + true; + {error, Reason} -> + ?LOG_DEBUG( + "Feature flags: failed to enable " + "`feature_flags_v2` locally: ~p", + [Reason], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + false + end; + {false, false} -> + false + end + end. 
sync_feature_flags_with_cluster1([], _) -> ok; @@ -2337,25 +2022,37 @@ verify_which_feature_flags_are_actually_enabled() -> "flags"), WrittenToDisk = ok =:= try_to_write_enabled_feature_flags_list( RepairedEnabledFeatureNames), - initialize_registry( + rabbit_ff_registry_factory:initialize_registry( #{}, - list_of_enabled_feature_flags_to_feature_states( + enabled_feature_flags_to_feature_states( RepairedEnabledFeatureNames), WrittenToDisk) end. +-spec enabled_feature_flags_to_feature_states([feature_name()]) -> + feature_states(). + +enabled_feature_flags_to_feature_states(FeatureNames) -> + maps:from_list([{FeatureName, true} || FeatureName <- FeatureNames]). + -spec refresh_feature_flags_after_app_load([atom()]) -> ok | {error, any()} | no_return(). -refresh_feature_flags_after_app_load([]) -> - ok; refresh_feature_flags_after_app_load(Apps) -> + case is_enabled(feature_flags_v2) of + true -> rabbit_ff_controller:refresh_after_app_load(); + false -> refresh_feature_flags_after_app_load_v1(Apps) + end. + +refresh_feature_flags_after_app_load_v1([]) -> + ok; +refresh_feature_flags_after_app_load_v1(Apps) -> rabbit_log_feature_flags:debug( "Feature flags: new apps loaded: ~p -> refreshing feature flags", [Apps]), FeatureFlags0 = list(all), - FeatureFlags1 = query_supported_feature_flags(), + {_ScannedApps, FeatureFlags1} = query_supported_feature_flags(), %% The following list contains all the feature flags this node %% learned about only because remote nodes have them. 
Now, the @@ -2396,7 +2093,7 @@ refresh_feature_flags_after_app_load(Apps) -> [lists:sort(NewSupportedFeatureNames)]) end, - case initialize_registry() of + case rabbit_ff_registry_factory:initialize_registry() of ok -> Ret = maybe_enable_locally_after_app_load( AlreadySupportedFeatureNames), diff --git a/deps/rabbit/src/rabbit_ff_controller.erl b/deps/rabbit/src/rabbit_ff_controller.erl new file mode 100644 index 0000000000..709e2e05fe --- /dev/null +++ b/deps/rabbit/src/rabbit_ff_controller.erl @@ -0,0 +1,1152 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved. +%% + +%% @author The RabbitMQ team +%% @copyright 2022 VMware, Inc. or its affiliates. +%% +%% @doc +%% The feature flag controller is responsible for synchronization and managing +%% concurrency when feature flag states are changed. +%% +%% It makes sure only one node can enable feature flags or synchronize its +%% feature flag states with other nodes at a time. If another node wants to +%% enable a feature flag or if it joins a cluster and needs to synchronize its +%% feature flag states, it will wait for the current task to finish. +%% +%% The feature flag controller is used as soon as the `feature_flags_v2' +%% feature flag is enabled. +%% +%% Compared to the initial subsystem, it also introduces a new migration +%% function signature; See {@link rabbit_feature_flags:migration_fun_v2()}. + +-module(rabbit_ff_controller). +-behaviour(gen_statem). + +-include_lib("kernel/include/logger.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-include_lib("rabbit_common/include/logging.hrl"). + +-include("feature_flags.hrl"). + +-export([is_supported/1, is_supported/2, + enable/1, + check_node_compatibility/1, + sync_cluster/0, + refresh_after_app_load/0]). 
+ +%% Internal use only. +-export([start/0, + start_link/0, + rpc_call/5, + all_nodes/0, + running_nodes/0]). + +%% gen_statem callbacks. +-export([callback_mode/0, + init/1, + terminate/3, + code_change/4, + + standing_by/3, + waiting_for_end_of_controller_task/3, + updating_feature_flag_states/3]). + +-record(?MODULE, {from, + notify = #{}}). + +-define(LOCAL_NAME, ?MODULE). +-define(GLOBAL_NAME, {?MODULE, global}). + +%% Default timeout for operations on remote nodes. +-define(TIMEOUT, 60000). + +start() -> + gen_statem:start({local, ?LOCAL_NAME}, ?MODULE, none, []). + +start_link() -> + gen_statem:start_link({local, ?LOCAL_NAME}, ?MODULE, none, []). + +is_supported(FeatureNames) -> + is_supported(FeatureNames, ?TIMEOUT). + +is_supported(FeatureName, Timeout) when is_atom(FeatureName) -> + is_supported([FeatureName], Timeout); +is_supported(FeatureNames, Timeout) when is_list(FeatureNames) -> + Nodes = running_nodes(), + case collect_inventory_on_nodes(Nodes, Timeout) of + {ok, Inventory} -> + lists:all( + fun(FeatureName) -> + is_known_and_supported(Inventory, FeatureName) + end, + FeatureNames); + _Error -> + false + end. + +enable(FeatureName) when is_atom(FeatureName) -> + enable([FeatureName]); +enable(FeatureNames) when is_list(FeatureNames) -> + ?LOG_DEBUG( + "Feature flags: REQUEST TO ENABLE: ~p", + [FeatureNames], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + gen_statem:call(?LOCAL_NAME, {enable, FeatureNames}). + +check_node_compatibility(RemoteNode) -> + ThisNode = node(), + ?LOG_DEBUG( + "Feature flags: CHECKING COMPATIBILITY between nodes `~s` and `~s`", + [ThisNode, RemoteNode], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + %% We don't go through the controller process to check nodes compatibility + %% because this function is used while `rabbit' is stopped usually. 
+ %% + %% There is no benefit in starting a controller just for this check + %% because it would not guarantee that the compatibility remains true after + %% this function finishes and before the node starts and synchronizes + %% feature flags. + check_node_compatibility_task(ThisNode, RemoteNode). + +sync_cluster() -> + ?LOG_DEBUG( + "Feature flags: SYNCING FEATURE FLAGS in cluster...", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case erlang:whereis(?LOCAL_NAME) of + Pid when is_pid(Pid) -> + %% The function is called while `rabbit' is running. + gen_statem:call(?LOCAL_NAME, sync_cluster); + undefined -> + %% The function is called while `rabbit' is stopped. We need to + %% start a one-off controller, again to make sure concurrent + %% changes are blocked. + {ok, Pid} = start_link(), + Ret = gen_statem:call(Pid, sync_cluster), + gen_statem:stop(Pid), + Ret + end. + +refresh_after_app_load() -> + ?LOG_DEBUG( + "Feature flags: REFRESHING after applications load...", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + gen_statem:call(?LOCAL_NAME, refresh_after_app_load). + +%% -------------------------------------------------------------------- +%% gen_statem callbacks. +%% -------------------------------------------------------------------- + +callback_mode() -> + state_functions. + +init(_Args) -> + {ok, standing_by, none}. + +standing_by( + {call, From} = EventType, EventContent, none) + when EventContent =/= notify_when_done -> + ?LOG_DEBUG( + "Feature flags: registering controller globally before " + "proceeding with task: ~p", + [EventContent], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + + %% The first step is to register this process globally (it is already + %% registered locally). The purpose is to make sure this one takes full + %% control over feature flag changes among other controllers. + %% + %% This is useful for situations where a new node joins the cluster while + %% a feature flag is being enabled.
In this case, when that new node joins + %% and its controller wants to synchronize feature flags, it will block + %% and wait for this one to finish. + case register_globally() of + yes -> + %% We could register the process globally. Therefore we can + %% proceed with enabling/syncing feature flags. + ?LOG_DEBUG( + "Feature flags: controller globally registered; can proceed " + "with task", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + + Data = #?MODULE{from = From}, + {next_state, updating_feature_flag_states, Data, + [{next_event, internal, EventContent}]}; + + no -> + %% Another controller is globally registered. We ask that global + %% controller to notify us when it is done, and we wait for its + %% response. + ?LOG_DEBUG( + "Feature flags: controller NOT globally registered; need to " + "wait for the current global controller's task to finish", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + + RequestId = notify_me_when_done(), + {next_state, waiting_for_end_of_controller_task, RequestId, + [{next_event, EventType, EventContent}]} + end; +standing_by( + {call, From}, notify_when_done, none) -> + %% This state is entered when a globally-registered controller finished + %% its task but had unhandled `notify_when_done` requests in its inbox. We + %% just need to notify the caller that it can proceed. + notify_waiting_controller(From), + {keep_state_and_data, []}.
+ +waiting_for_end_of_controller_task( + {call, _From}, _EventContent, _RequestId) -> + {keep_state_and_data, [postpone]}; +waiting_for_end_of_controller_task( + info, Msg, RequestId) -> + case gen_statem:check_response(Msg, RequestId) of + {reply, done} -> + ?LOG_DEBUG( + "Feature flags: current global controller's task finished; " + "trying to take next turn", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {next_state, standing_by, none, []}; + {error, Reason} -> + ?LOG_DEBUG( + "Feature flags: error while waiting for current global " + "controller's task: ~0p; trying to take next turn", + [Reason], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {next_state, standing_by, none, []}; + no_reply -> + ?LOG_DEBUG( + "Feature flags: unknown message while waiting for current " + "global controller's task: ~0p; still waiting", + [Msg], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + keep_state_and_data + end. + +updating_feature_flag_states( + internal, Task, #?MODULE{from = From} = Data) -> + Reply = proceed_with_task(Task), + + ?LOG_DEBUG( + "Feature flags: unregistering controller globally", + [], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + unregister_globally(), + notify_waiting_controllers(Data), + {next_state, standing_by, none, [{reply, From, Reply}]}; +updating_feature_flag_states( + {call, From}, notify_when_done, #?MODULE{notify = Notify} = Data) -> + Notify1 = Notify#{From => true}, + Data1 = Data#?MODULE{notify = Notify1}, + {keep_state, Data1}. + +proceed_with_task({enable, FeatureNames}) -> + enable_task(FeatureNames); +proceed_with_task(sync_cluster) -> + sync_cluster_task(); +proceed_with_task(refresh_after_app_load) -> + refresh_after_app_load_task(). + +terminate(_Reason, _State, _Data) -> + ok. + +code_change(_OldVsn, OldState, OldData, _Extra) -> + {ok, OldState, OldData}. + +%% -------------------------------------------------------------------- +%% Global name registration. 
+%% -------------------------------------------------------------------- + +register_globally() -> + ?LOG_DEBUG( + "Feature flags: [global sync] @ ~s", + [node()], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + ok = global:sync(), + ?LOG_DEBUG( + "Feature flags: [global register] @ ~s", + [node()], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + global:register_name(?GLOBAL_NAME, self()). + +unregister_globally() -> + ?LOG_DEBUG( + "Feature flags: [global unregister] @ ~s", + [node()], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + _ = global:unregister_name(?GLOBAL_NAME), + ok. + +notify_me_when_done() -> + gen_statem:send_request({global, ?GLOBAL_NAME}, notify_when_done). + +notify_waiting_controllers(#?MODULE{notify = Notify}) -> + maps:fold( + fun(From, true, Acc) -> + notify_waiting_controller(From), + Acc + end, ok, Notify). + +notify_waiting_controller({ControlerPid, _} = From) -> + ControlerNode = node(ControlerPid), + ?LOG_DEBUG( + "Feature flags: controller's task finished; notify waiting controller " + "on node ~p", + [ControlerNode], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + gen_statem:reply(From, done). + +%% -------------------------------------------------------------------- +%% Code to check compatibility between nodes. +%% -------------------------------------------------------------------- + +-spec check_node_compatibility_task(Node, Node) -> Ret when + Node :: node(), + Ret :: ok | {error, Reason}, + Reason :: incompatible_feature_flags. 
+ +check_node_compatibility_task(NodeA, NodeB) -> + ?LOG_NOTICE( + "Feature flags: checking nodes `~s` and `~s` compatibility...", + [NodeA, NodeB], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + NodesA = list_nodes_clustered_with(NodeA), + NodesB = list_nodes_clustered_with(NodeB), + AreCompatible = case collect_inventory_on_nodes(NodesA) of + {ok, InventoryA} -> + ?LOG_DEBUG( + "Feature flags: inventory of node `~s`:~n~p", + [NodeA, InventoryA], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case collect_inventory_on_nodes(NodesB) of + {ok, InventoryB} -> + ?LOG_DEBUG( + "Feature flags: inventory of node " + "`~s`:~n~p", + [NodeB, InventoryB], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + are_compatible(InventoryA, InventoryB); + _ -> + false + end; + _ -> + false + end, + case AreCompatible of + true -> + ?LOG_NOTICE( + "Feature flags: nodes `~s` and `~s` are compatible", + [NodeA, NodeB], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + ok; + false -> + ?LOG_WARNING( + "Feature flags: nodes `~s` and `~s` are incompatible", + [NodeA, NodeB], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {error, incompatible_feature_flags} + end. + +-spec list_nodes_clustered_with(Node) -> [Node] when + Node :: node(). + +list_nodes_clustered_with(Node) -> + %% If Mnesia is stopped on the given node, it will return an empty list. + %% In this case, only consider that stopped node. + case rpc_call(Node, ?MODULE, running_nodes, [], ?TIMEOUT) of + [] -> [Node]; + List -> List + end. + +-spec are_compatible(Inventory, Inventory) -> AreCompatible when + Inventory :: rabbit_feature_flags:cluster_inventory(), + AreCompatible :: boolean(). + +are_compatible(InventoryA, InventoryB) -> + check_one_way_compatibility(InventoryA, InventoryB) + andalso + check_one_way_compatibility(InventoryB, InventoryA). + +-spec check_one_way_compatibility(Inventory, Inventory) -> AreCompatible when + Inventory :: rabbit_feature_flags:cluster_inventory(), + AreCompatible :: boolean(). 
+ +check_one_way_compatibility(InventoryA, InventoryB) -> + %% This function checks the compatibility in one way, "inventory A" is + %% compatible with "inventory B". This is true if all feature flags + %% enabled in "inventory B" are supported by "inventory A". + %% + %% They don't need to be enabled on both sides: the feature flags states + %% will be synchronized by `sync_cluster()'. + FeatureNames = list_feature_flags_enabled_somewhere(InventoryB, true), + lists:all( + fun(FeatureName) -> + #{feature_flags := #{FeatureName := FeatureFlag}} = InventoryB, + not is_known(InventoryA, FeatureFlag) + orelse + is_known_and_supported(InventoryA, FeatureName) + end, FeatureNames). + +%% -------------------------------------------------------------------- +%% Code to enable and sync feature flags. +%% -------------------------------------------------------------------- + +-spec enable_task(FeatureNames) -> Ret when + FeatureNames :: [rabbit_feature_flags:feature_name()], + Ret :: ok | {error, Reason}, + Reason :: term(). + +enable_task(FeatureNames) -> + %% We take a snapshot of clustered nodes, including stopped nodes, at the + %% beginning of the process and use that list at all times. + %% + %% If there are some missing, unreachable or stopped nodes, we abort the + %% task and refuse to enable the feature flag(s). This is to make sure + %% that if there is a network partition, the other side of the partition + %% doesn't "rejoin" the cluster with some feature flags enabled behind the + %% scenes. + %% + %% TODO: Should we remove the requirement above and call `sync_cluster()' + %% when a network partition is repaired? + %% + %% If a node tries to join the cluster during the task, it will block + %% because it will want to synchronize its feature flags state with the + %% cluster. For that to happen, this controller `enable' task needs to + %% finish before the synchronization task starts.
+ %% + %% If a node stops during the task, this will trigger an RPC error at some + %% point and the task will abort. That's ok because migration functions + %% are supposed to be idempotent. + AllNodes = all_nodes(), + RunningNodes = running_nodes(), + case RunningNodes of + AllNodes -> + ?LOG_DEBUG( + "Feature flags: nodes where the feature flags will be " + "enabled: ~p~n" + "Feature flags: new nodes joining the cluster in between " + "will wait and synchronize their feature flag states after.", + [AllNodes], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + + %% Likewise, we take a snapshot of the feature flags states on all + %% running nodes right from the beginning. This is what we use + %% during the entire task to determine if feature flags are + %% supported, enabled somewhere, etc. + case collect_inventory_on_nodes(AllNodes) of + {ok, Inventory} -> enable_many(Inventory, FeatureNames); + Error -> Error + end; + _ -> + ?LOG_ERROR( + "Feature flags: refuse to enable feature flags while " + "clustered nodes are missing, stopped or unreachable: ~p", + [AllNodes -- RunningNodes]), + {error, missing_clustered_nodes} + end. + +-spec sync_cluster_task() -> Ret when + Ret :: ok | {error, Reason}, + Reason :: term(). + +sync_cluster_task() -> + %% We assume that a feature flag can only be enabled, not disabled. + %% Therefore this synchronization searches for feature flags enabled on + %% some nodes but not all, and make sure they are enabled everywhere. + %% + %% This happens when a node joins a cluster and that node has a different + %% set of enabled feature flags. + %% + %% FIXME: `enable_task()' requires that all nodes in the cluster run to + %% enable anything. Should we require the same here? On one hand, this + %% would make sure a feature flag isn't enabled while there is a network + %% partition. On the other hand, this would require that all nodes are + %% running before we can expand the cluster... 
+ Nodes = running_nodes(), + ?LOG_DEBUG( + "Feature flags: synchronizing feature flags on nodes: ~p", + [Nodes], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + + case collect_inventory_on_nodes(Nodes) of + {ok, Inventory} -> + FeatureNames = list_feature_flags_enabled_somewhere( + Inventory, false), + enable_many(Inventory, FeatureNames); + Error -> + Error + end. + +-spec refresh_after_app_load_task() -> Ret when + Ret :: ok | {error, Reason}, + Reason :: term(). + +refresh_after_app_load_task() -> + case rabbit_ff_registry_factory:initialize_registry() of + ok -> sync_cluster_task(); + Error -> Error + end. + +-spec enable_many(Inventory, FeatureNames) -> Ret when + Inventory :: rabbit_feature_flags:cluster_inventory(), + FeatureNames :: [rabbit_feature_flags:feature_name()], + Ret :: ok | {error, Reason}, + Reason :: term(). + +enable_many(#{states_per_node := _} = Inventory, [FeatureName | Rest]) -> + case enable_if_supported(Inventory, FeatureName) of + ok -> enable_many(Inventory, Rest); + Error -> Error + end; +enable_many(_Inventory, []) -> + ok. + +-spec enable_if_supported(Inventory, FeatureName) -> Ret when + Inventory :: rabbit_feature_flags:cluster_inventory(), + FeatureName :: rabbit_feature_flags:feature_name(), + Ret :: ok | {error, Reason}, + Reason :: term(). + +enable_if_supported(#{states_per_node := _} = Inventory, FeatureName) -> + case is_known_and_supported(Inventory, FeatureName) of + true -> + ?LOG_DEBUG( + "Feature flags: `~s`: supported; continuing", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + case lock_registry_and_enable(Inventory, FeatureName) of + {ok, _Inventory} -> ok; + Error -> Error + end; + false -> + ?LOG_DEBUG( + "Feature flags: `~s`: unsupported; aborting", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + {error, unsupported} + end. 

-spec lock_registry_and_enable(Inventory, FeatureName) -> Ret when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureName :: rabbit_feature_flags:feature_name(),
      Ret :: {ok, Inventory} | {error, Reason},
      Reason :: term().

%% Enables the feature flag while holding the registry state change lock.
lock_registry_and_enable(#{states_per_node := _} = Inventory, FeatureName) ->
    %% We acquire a lock before making any change to the registry. This is not
    %% used by the controller (because it is already using a globally
    %% registered name to prevent concurrent runs). But this is used in
    %% `rabbit_feature_flags:is_enabled()' to block while the state is
    %% `state_changing'.
    ?LOG_DEBUG(
       "Feature flags: acquiring registry state change lock",
       [],
       #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
    rabbit_ff_registry_factory:acquire_state_change_lock(),

    %% The lock is released in an `after' section so that it doesn't stay
    %% held if the code enabling the feature flag raises an exception.
    try
        enable_with_registry_locked(Inventory, FeatureName)
    after
        ?LOG_DEBUG(
           "Feature flags: releasing registry state change lock",
           [],
           #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
        rabbit_ff_registry_factory:release_state_change_lock()
    end.

-spec enable_with_registry_locked(Inventory, FeatureName) -> Ret when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureName :: rabbit_feature_flags:feature_name(),
      Ret :: {ok, Inventory} | {error, Reason},
      Reason :: term().

%% Enables the feature flag on the nodes where it is still disabled, or
%% returns the inventory unchanged if there are none.
enable_with_registry_locked(
  #{states_per_node := _} = Inventory, FeatureName) ->
    %% We verify if the feature flag needs to be enabled somewhere. This may
    %% have changed since the beginning, not because another controller
    %% enabled it (this is not possible because only one can run at a given
    %% time), but because this feature flag was already enabled as a
    %% consequence (for instance, it's a dependency of another feature flag we
    %% processed).
    Nodes = list_nodes_where_feature_flag_is_disabled(Inventory, FeatureName),
    case Nodes of
        [] ->
            ?LOG_DEBUG(
               "Feature flags: `~s`: already enabled; skipping",
               [FeatureName],
               #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
            {ok, Inventory};
        _ ->
            ?LOG_NOTICE(
               "Feature flags: attempt to enable `~s`...",
               [FeatureName],
               #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),

            case update_feature_state_and_enable(Inventory, FeatureName) of
                {ok, _Inventory} = Ok ->
                    ?LOG_NOTICE(
                       "Feature flags: `~s` enabled",
                       [FeatureName],
                       #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
                    Ok;
                Error ->
                    ?LOG_ERROR(
                       "Feature flags: failed to enable `~s`: ~p",
                       [FeatureName, Error],
                       #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
                    Error
            end
    end.

-spec update_feature_state_and_enable(Inventory, FeatureName) -> Ret when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureName :: rabbit_feature_flags:feature_name(),
      Ret :: {ok, Inventory} | {error, Reason},
      Reason :: term().

%% Marks the feature flag as `state_changing', runs the `enable' step, marks
%% the feature flag as enabled and runs the `post_enable' step. If any step
%% before `post_enable' fails, the state recorded in the inventory is
%% restored on each node and the error is returned.
update_feature_state_and_enable(
  #{states_per_node := _} = Inventory, FeatureName) ->
    %% The feature flag is marked as `state_changing' on all running nodes who
    %% know it, including those where it's already enabled. The idea is that
    %% the feature flag is between two states on some nodes and the code
    %% running on the nodes where the feature flag enabled shouldn't assume
    %% all nodes in the cluster are in the same situation.
    Nodes = list_nodes_who_know_the_feature_flag(Inventory, FeatureName),

    %% We only consider nodes where the feature flag is disabled. The
    %% migration function is only executed on those nodes. I.e. we don't run
    %% it again where it has already been executed.
    NodesWhereDisabled = list_nodes_where_feature_flag_is_disabled(
                           Inventory, FeatureName),

    Ret1 = mark_as_enabled_on_nodes(
             Nodes, Inventory, FeatureName, state_changing),
    Ret2 = case Ret1 of
               %% We ignore the returned updated inventory because we don't
               %% need or even want to remember the `state_changing' state.
               %% This is only used for external queries of the registry.
               {ok, _Inventory0} ->
                   case do_enable(Inventory, FeatureName, NodesWhereDisabled) of
                       {ok, Inventory1} ->
                           mark_as_enabled_on_nodes(
                             Nodes, Inventory1, FeatureName, true);
                       EnableError ->
                           EnableError
                   end;
               MarkError ->
                   MarkError
           end,
    case Ret2 of
        {ok, Inventory2} ->
            post_enable(Inventory2, FeatureName, NodesWhereDisabled),
            Ret2;
        Error ->
            restore_feature_flag_state(
              Nodes, NodesWhereDisabled, Inventory, FeatureName),
            Error
    end.

%% Restores the state the feature flag had before the attempt to enable it.
%%
%% `Nodes' are all the nodes which were marked as `state_changing'. Among
%% them, the feature flag goes back to disabled on `NodesWhereDisabled' only:
%% it must remain enabled on the nodes where it was already enabled.
restore_feature_flag_state(
  Nodes, NodesWhereDisabled, Inventory, FeatureName) ->
    NodesWhereEnabled = Nodes -- NodesWhereDisabled,
    %% This is a best-effort cleanup: the error which led here is the one
    %% reported to the caller, so the results are ignored on purpose.
    _ = mark_as_enabled_on_nodes(
          NodesWhereEnabled, Inventory, FeatureName, true),
    _ = mark_as_enabled_on_nodes(
          NodesWhereDisabled, Inventory, FeatureName, false),
    ok.

-spec do_enable(Inventory, FeatureName, Nodes) -> Ret when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureName :: rabbit_feature_flags:feature_name(),
      Nodes :: [node()],
      Ret :: {ok, Inventory} | {error, Reason},
      Reason :: term().

%% Enables the dependencies of the feature flag, then runs its migration
%% function with the `enable' command. Returns the inventory updated by the
%% dependencies, or the first error found in the migration function results.
do_enable(#{states_per_node := _} = Inventory, FeatureName, Nodes) ->
    %% After dependencies are enabled, we need to remember the updated
    %% inventory. This is useful later to skip feature flags enabled earlier
    %% in the process. For instance because a feature flag is dependency of
    %% several other feature flags.
    case enable_dependencies(Inventory, FeatureName) of
        {ok, Inventory1} ->
            Extra = #{nodes => Nodes},
            Rets = run_migration_fun(
                     Nodes, FeatureName, enable, Extra, infinity),
            maps:fold(
              fun
                  (_Node, ok, {ok, _} = Ret) -> Ret;
                  (_Node, Error, {ok, _})    -> Error;
                  (_Node, _, Error)          -> Error
              end, {ok, Inventory1}, Rets);
        Error ->
            Error
    end.

-spec post_enable(Inventory, FeatureName, Nodes) -> Ret when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureName :: rabbit_feature_flags:feature_name(),
      Nodes :: [node()],
      Ret :: ok.

%% Runs the migration function with the `post_enable' command on `Nodes' if
%% the feature flag uses the migration function API v2. The results of the
%% migration function are ignored.
post_enable(#{states_per_node := _}, FeatureName, Nodes) ->
    UsesMFv2 = rabbit_feature_flags:uses_migration_fun_v2(FeatureName),
    case UsesMFv2 of
        false ->
            ok;
        true ->
            _ = run_migration_fun(
                  Nodes, FeatureName, post_enable, #{nodes => Nodes},
                  infinity),
            ok
    end.

%% --------------------------------------------------------------------
%% Cluster relationship and inventory.
%% --------------------------------------------------------------------

-ifndef(TEST).
%% All cluster members known to Mnesia, plus this node.
all_nodes() ->
    DbNodes = mnesia:system_info(db_nodes),
    lists:usort([node() | DbNodes]).

%% Cluster members reported as running by Mnesia, plus this node.
running_nodes() ->
    RunningDbNodes = mnesia:system_info(running_db_nodes),
    lists:usort([node() | RunningDbNodes]).
-else.
%% Same as the non-test variant, except that the list of remote nodes can be
%% overridden through `rabbit_feature_flags:get_overriden_nodes/0'.
all_nodes() ->
    case rabbit_feature_flags:get_overriden_nodes() of
        undefined ->
            lists:usort([node() | mnesia:system_info(db_nodes)]);
        Overridden ->
            lists:usort([node() | Overridden])
    end.

%% Same as the non-test variant, except that the list of remote nodes can be
%% overridden through `rabbit_feature_flags:get_overriden_running_nodes/0'.
running_nodes() ->
    case rabbit_feature_flags:get_overriden_running_nodes() of
        undefined ->
            lists:usort([node() | mnesia:system_info(running_db_nodes)]);
        Overridden ->
            lists:usort([node() | Overridden])
    end.
-endif.

%% Collects the inventory on `Nodes' using the default timeout.
collect_inventory_on_nodes(Nodes) ->
    collect_inventory_on_nodes(Nodes, ?TIMEOUT).

-spec collect_inventory_on_nodes(Nodes, Timeout) -> Ret when
      Nodes :: [node()],
      Timeout :: timeout(),
      Ret :: {ok, Inventory} | {error, Reason},
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      Reason :: term().

%% Queries `rabbit_ff_registry:inventory/0' on all `Nodes' and merges the
%% results into a single cluster-wide inventory. Once a node returned
%% something which is not a map, that value is what this function returns.
collect_inventory_on_nodes(Nodes, Timeout) ->
    ?LOG_DEBUG(
       "Feature flags: collecting inventory on nodes: ~p",
       [Nodes],
       #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
    EmptyInventory = #{feature_flags => #{},
                       applications_per_node => #{},
                       states_per_node => #{}},
    PerNodeRets = rpc_calls(
                    Nodes, rabbit_ff_registry, inventory, [], Timeout),
    MergeNode =
        fun
            (Node,
             #{feature_flags := NodeFeatureFlags,
               applications := NodeApps,
               states := NodeStates},
             {ok, #{feature_flags := FeatureFlags,
                    applications_per_node := AppsPerNode,
                    states_per_node := StatesPerNode} = Acc}) ->
                {ok, Acc#{feature_flags =>
                              maps:merge(FeatureFlags, NodeFeatureFlags),
                          applications_per_node =>
                              AppsPerNode#{Node => NodeApps},
                          states_per_node =>
                              StatesPerNode#{Node => NodeStates}}};
            (_Node, NodeRet, Acc) when is_map(NodeRet) ->
                %% Any other map leaves the accumulator untouched.
                Acc;
            (_Node, NodeRet, {ok, #{}}) ->
                %% First non-map value: it replaces the inventory.
                NodeRet;
            (_Node, _NodeRet, Acc) ->
                Acc
        end,
    maps:fold(MergeNode, {ok, EmptyInventory}, PerNodeRets).

-spec list_feature_flags_enabled_somewhere(Inventory, HandleStateChanging) ->
    Ret when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      HandleStateChanging :: boolean(),
      Ret :: [FeatureName],
      FeatureName :: rabbit_feature_flags:feature_name().

%% Returns the sorted names of the feature flags enabled on at least one
%% node. When `HandleStateChanging' is true, a feature flag in the
%% `state_changing' state counts as enabled; when it is false, such a state
%% is not accepted by the function below.
list_feature_flags_enabled_somewhere(
  #{states_per_node := StatesPerNode},
  HandleStateChanging) ->
    CollectEnabled =
        fun
            (FeatureName, true, Acc) ->
                Acc#{FeatureName => true};
            (_FeatureName, false, Acc) ->
                Acc;
            (FeatureName, state_changing, Acc)
              when HandleStateChanging ->
                Acc#{FeatureName => true}
        end,
    EnabledSomewhere =
        maps:fold(
          fun(_Node, FeatureStates, Acc) ->
                  maps:fold(CollectEnabled, Acc, FeatureStates)
          end, #{}, StatesPerNode),
    lists:sort(maps:keys(EnabledSomewhere)).

-spec list_nodes_who_know_the_feature_flag(Inventory, FeatureName) ->
    Ret when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureName :: rabbit_feature_flags:feature_name(),
      Ret :: [node()].

%% Returns the nodes whose states map contains `FeatureName', whatever its
%% state, sorted with this node first if it is part of them.
list_nodes_who_know_the_feature_flag(
  #{states_per_node := StatesPerNode},
  FeatureName) ->
    Nodes = lists:sort(
              maps:keys(
                maps:filter(
                  fun(_Node, FeatureStates) ->
                          maps:is_key(FeatureName, FeatureStates)
                  end, StatesPerNode))),
    this_node_first(Nodes).

-spec list_nodes_where_feature_flag_is_disabled(Inventory, FeatureName) ->
    Ret when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureName :: rabbit_feature_flags:feature_name(),
      Ret :: [node()].

%% Returns the nodes which know `FeatureName' and where it is disabled,
%% sorted with this node first if it is part of them.
list_nodes_where_feature_flag_is_disabled(
  #{states_per_node := StatesPerNode},
  FeatureName) ->
    Nodes = lists:sort(
              maps:keys(
                maps:filter(
                  fun(_Node, FeatureStates) ->
                          case FeatureStates of
                              %% The feature flag is known on this node, run
                              %% the migration function only if it is
                              %% disabled.
                              #{FeatureName := Enabled} -> not Enabled;


                              %% The feature flags is unknown on this node,
                              %% don't run the migration function.
                              _ -> false
                          end
                  end, StatesPerNode))),
    this_node_first(Nodes).

%% Moves this node to the head of the list if it is a member of it.
this_node_first(Nodes) ->
    ThisNode = node(),
    case lists:member(ThisNode, Nodes) of
        true  -> [ThisNode | Nodes -- [ThisNode]];
        false -> Nodes
    end.

-spec rpc_call(Node, Module, Function, Args, Timeout) -> Ret when
      Node :: node(),
      Module :: module(),
      Function :: atom(),
      Args :: [term()],
      Timeout :: timeout(),
      Ret :: term() | {error, term()}.

%% Calls `Module:Function(Args...)' on `Node' and returns its result. A
%% `{badrpc, _}' result is logged and converted to an `{error, _}' tuple.
rpc_call(Node, Module, Function, Args, Timeout) ->
    case rpc:call(Node, Module, Function, Args, Timeout) of
        {badrpc, {'EXIT',
                  {undef,
                   [{rabbit_feature_flags, Function, Args, []}
                    | _]}}} ->
            %% If rabbit_feature_flags:Function() is undefined
            %% on the remote node, we consider it to be a 3.7.x
            %% pre-feature-flags node.
            %%
            %% Theoretically, it could be an older version (3.6.x and
            %% older). But the RabbitMQ version consistency check
            %% (rabbit_misc:version_minor_equivalent/2) called from
            %% rabbit_mnesia:check_rabbit_consistency/2 already blocked
            %% this situation from happening before we reach this point.
            %%
            %% The undefined function is the one matched above, so that is
            %% the module reported in the log message.
            ?LOG_DEBUG(
               "Feature flags: ~s:~s~p unavailable on node `~s`: "
               "assuming it is a RabbitMQ 3.7.x pre-feature-flags node",
               [rabbit_feature_flags, Function, Args, Node],
               #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
            {error, pre_feature_flags_rabbitmq};
        {badrpc, Reason} = Error ->
            %% Report the module which was actually called, not this one.
            ?LOG_ERROR(
               "Feature flags: error while running:~n"
               "Feature flags: ~s:~s~p~n"
               "Feature flags: on node `~s`:~n"
               "Feature flags: ~p",
               [Module, Function, Args, Node, Reason],
               #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
            {error, Error};
        Ret ->
            Ret
    end.

-spec rpc_calls(Nodes, Module, Function, Args, Timeout) -> Rets when
      Nodes :: [node()],
      Module :: module(),
      Function :: atom(),
      Args :: [term()],
      Timeout :: timeout(),
      Rets :: #{node() => term()}.

%% Runs `rpc_call/5' for each node, each in its own monitored process, and
%% returns a map of the results per node. If a process exits before it sent
%% its result, the value for that node is `{error, {'DOWN', Reason}}'.
rpc_calls(Nodes, Module, Function, Args, Timeout) when is_list(Nodes) ->
    Parent = self(),
    CallRef = {Module, Function, Args},
    Runners = [{Node,
                spawn_monitor(
                  fun() ->
                          Ret = rpc_call(
                                  Node, Module, Function, Args, Timeout),
                          Parent ! {internal_rpc_call, Node, CallRef, Ret}
                  end)}
               || Node <- Nodes],
    Rets = [receive
                {internal_rpc_call, Node, CallRef, Ret} ->
                    %% After we got the return value for that node, we still
                    %% need to consume the `DOWN' message emitted once the
                    %% spawn process exited.
                    receive
                        {'DOWN', MRef, process, Pid, Reason} ->
                            ?assertEqual(normal, Reason)
                    end,
                    {Node, Ret};
                {'DOWN', MRef, process, Pid, Reason} ->
                    {Node, {error, {'DOWN', Reason}}}
            end
            || {Node, {Pid, MRef}} <- Runners],
    maps:from_list(Rets).

%% --------------------------------------------------------------------
%% Feature flag support queries.
%% --------------------------------------------------------------------

-spec is_known(Inventory, FeatureFlag) -> IsKnown when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureFlag :: rabbit_feature_flags:feature_props_extended(),
      IsKnown :: boolean().

%% A feature flag is known if the application providing it is part of the
%% scanned applications of at least one node of the inventory.
is_known(
  #{applications_per_node := ScannedAppsPerNode},
  #{provided_by := App} = _FeatureFlag) ->
    lists:any(
      fun(ScannedApps) -> lists:member(App, ScannedApps) end,
      maps:values(ScannedAppsPerNode)).

-spec is_known_and_supported(Inventory, FeatureName) ->
    IsKnownAndSupported when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureName :: rabbit_feature_flags:feature_name(),
      IsKnownAndSupported :: boolean().

%% Tells if the feature flag is part of the inventory and supported by all
%% the nodes of that inventory.
is_known_and_supported(
  #{feature_flags := FeatureFlags,
    applications_per_node := ScannedAppsPerNode,
    states_per_node := StatesPerNode},
  FeatureName)
  when is_map_key(FeatureName, FeatureFlags) ->
    %% A feature flag is considered supported by a node if:
    %%   - the node knows the feature flag, or
    %%   - the node does not have the application providing it.
    %%
    %% Therefore, we first need to look up the application providing this
    %% feature flag.
    #{FeatureName := #{provided_by := App}} = FeatureFlags,

    IsSupportedByNode =
        fun({Node, FeatureStates}) ->
                case FeatureStates of
                    #{FeatureName := _} ->
                        %% The node knows about the feature flag.
                        true;
                    _ ->
                        %% The node doesn't know about the feature flag, so
                        %% does it have the application providing it loaded?
                        #{Node := ScannedApps} = ScannedAppsPerNode,
                        not lists:member(App, ScannedApps)
                end
        end,
    lists:all(IsSupportedByNode, maps:to_list(StatesPerNode));
is_known_and_supported(_Inventory, _FeatureName) ->
    %% None of the nodes know about this feature flag at all.
    false.

%% --------------------------------------------------------------------
%% Feature flag state changes.
%% --------------------------------------------------------------------

-spec mark_as_enabled_on_nodes(Nodes, Inventory, FeatureName, IsEnabled) ->
    Ret when
      Nodes :: [node()],
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureName :: rabbit_feature_flags:feature_name(),
      IsEnabled :: rabbit_feature_flags:feature_state(),
      Ret :: {ok, Inventory} | {error, Reason},
      Reason :: term().

%% Calls `rabbit_feature_flags:mark_as_enabled_locally/2' on `Nodes' and
%% returns the inventory updated with the new state of the feature flag on
%% those nodes. The first value other than `ok' returned by a node is
%% returned as is instead.
mark_as_enabled_on_nodes(
  Nodes,
  #{states_per_node := StatesPerNode} = Inventory,
  FeatureName, IsEnabled) ->
    ?LOG_DEBUG(
       "Feature flags: `~s`: mark as enabled=~p on nodes ~p",
       [FeatureName, IsEnabled, Nodes],
       #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
    PerNodeRets = rpc_calls(
                    Nodes, rabbit_feature_flags, mark_as_enabled_locally,
                    [FeatureName, IsEnabled], ?TIMEOUT),
    RecordNodeState =
        fun
            (Node, ok, {ok, AccStatesPerNode}) ->
                NodeStates = maps:get(Node, AccStatesPerNode),
                {ok, AccStatesPerNode#{
                       Node => NodeStates#{FeatureName => IsEnabled}}};
            (_Node, NodeError, {ok, _AccStatesPerNode}) ->
                %% First failure: it replaces the accumulated states.
                NodeError;
            (_Node, _NodeRet, FirstError) ->
                FirstError
        end,
    case maps:fold(RecordNodeState, {ok, StatesPerNode}, PerNodeRets) of
        {ok, NewStatesPerNode} ->
            {ok, Inventory#{states_per_node => NewStatesPerNode}};
        Error ->
            Error
    end.

%% --------------------------------------------------------------------
%% Feature flag dependencies handling.
%% --------------------------------------------------------------------

-spec enable_dependencies(Inventory, FeatureName) -> Ret when
      Inventory :: rabbit_feature_flags:cluster_inventory(),
      FeatureName :: rabbit_feature_flags:feature_name(),
      Ret :: {ok, Inventory} | {error, Reason},
      Reason :: term().

%% Enables the feature flags listed in the `depends_on' property of the
%% given feature flag, if any, and returns the updated inventory.
enable_dependencies(
  #{feature_flags := FeatureFlags} = Inventory, FeatureName) ->
    #{FeatureName := FeatureProps} = FeatureFlags,
    DependsOn = maps:get(depends_on, FeatureProps, []),
    case DependsOn of
        [] ->
            {ok, Inventory};
        _ ->
            ?LOG_DEBUG(
               "Feature flags: `~s`: enable dependencies: ~p",
               [FeatureName, DependsOn],
               #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
            enable_dependencies1(Inventory, DependsOn)
    end.

%% Enables the listed feature flags one after the other, threading the
%% updated inventory through, and stops at the first error.
enable_dependencies1(
  #{states_per_node := _} = Inventory, [FeatureName | Rest]) ->
    case enable_with_registry_locked(Inventory, FeatureName) of
        {ok, Inventory1} -> enable_dependencies1(Inventory1, Rest);
        Error            -> Error
    end;
enable_dependencies1(
  #{states_per_node := _} = Inventory, []) ->
    {ok, Inventory}.

%% --------------------------------------------------------------------
%% Migration function.
%% --------------------------------------------------------------------

-spec run_migration_fun(Nodes, FeatureName, Command, Extra, Timeout) ->
    Rets when
      Nodes :: [node()],
      FeatureName :: rabbit_feature_flags:feature_name(),
      Command :: atom(),
      Extra :: map(),
      Timeout :: timeout(),
      Rets :: #{node() => term()}.

%% Runs the migration function of the feature flag, if it has one, and
%% returns a map of the results per node. The map is empty if the feature
%% flag has no migration function.
run_migration_fun(Nodes, FeatureName, Command, Extra, Timeout) ->
    FeatureProps = rabbit_ff_registry:get(FeatureName),
    case maps:get(migration_fun, FeatureProps, none) of
        {MigrationMod, MigrationFun}
          when is_atom(MigrationMod) andalso is_atom(MigrationFun) ->
            UsesMFv2 = rabbit_feature_flags:uses_migration_fun_v2(
                         MigrationMod, MigrationFun),
            case UsesMFv2 of
                true ->
                    %% The migration fun API v2 is of the form:
                    %%   MigrationMod:MigrationFun(#ffcommand{...}).
                    %%
                    %% Also, the function is executed on all nodes in
                    %% parallel.
                    FFCommand = #ffcommand{
                                   name = FeatureName,
                                   props = FeatureProps,
                                   command = Command,
                                   extra = Extra},
                    run_migration_fun_v2(
                      Nodes, MigrationMod, MigrationFun, FFCommand, Timeout);
                false ->
                    %% The migration fun API v1 is of the form:
                    %%   MigrationMod:MigrationFun(
                    %%     FeatureName, FeatureProps, Command).
                    %%
                    %% Also, the function is executed once on the calling node
                    %% only.
                    Ret = rabbit_feature_flags:run_migration_fun(
                            FeatureName, FeatureProps, Command),
                    #{node() => Ret}
            end;
        none ->
            #{};
        Invalid ->
            ?LOG_ERROR(
               "Feature flags: `~s`: invalid migration function: ~p",
               [FeatureName, Invalid],
               #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
            #{node() => {error, {invalid_migration_fun, Invalid}}}
    end.

-spec run_migration_fun_v2(Nodes, MigrationMod, MigrationFun, FFCommand,
                           Timeout) -> Rets when
      Nodes :: [node()],
      MigrationMod :: module(),
      MigrationFun :: atom(),
      FFCommand :: rabbit_feature_flags:ffcommand(),
      Timeout :: timeout(),
      Rets :: #{node() => term()}.

%% Calls `MigrationMod:MigrationFun(FFCommand)' on all `Nodes' through
%% `rpc_calls/5' and returns the map of the results per node.
run_migration_fun_v2(
  Nodes, MigrationMod, MigrationFun,
  #ffcommand{name = FeatureName} = FFCommand,
  Timeout) ->
    ?LOG_DEBUG(
       "Feature flags: `~s`: run migration function (v2) ~s:~s~n"
       "Feature flags: with command=~p~n"
       "Feature flags: on nodes ~p",
       [FeatureName, MigrationMod, MigrationFun, FFCommand, Nodes],
       #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
    Rets = rpc_calls(
             Nodes, MigrationMod, MigrationFun, [FFCommand], Timeout),
    ?LOG_DEBUG(
       "Feature flags: `~s`: migration function (v2) ~s:~s returned:~n"
       "Feature flags: ~p",
       [FeatureName, MigrationMod, MigrationFun, Rets],
       #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
    Rets.
diff --git a/deps/rabbit/src/rabbit_ff_registry.erl b/deps/rabbit/src/rabbit_ff_registry.erl index b160de060d..78eef2a34c 100644 --- a/deps/rabbit/src/rabbit_ff_registry.erl +++ b/deps/rabbit/src/rabbit_ff_registry.erl @@ -26,7 +26,8 @@ is_supported/1, is_enabled/1, is_registry_initialized/0, - is_registry_written_to_disk/0]). + is_registry_written_to_disk/0, + inventory/0]). -ifdef(TEST). -on_load(on_load/0). @@ -44,7 +45,7 @@ %% @returns the properties of the specified feature flag. get(FeatureName) -> - rabbit_feature_flags:initialize_registry(), + rabbit_ff_registry_factory:initialize_registry(), %% Initially, is_registry_initialized/0 always returns `false` %% and this ?MODULE:get(FeatureName) is always called. The case %% statement is here to please Dialyzer. @@ -65,7 +66,7 @@ get(FeatureName) -> %% @returns A map of selected feature flags. list(Which) -> - rabbit_feature_flags:initialize_registry(), + rabbit_ff_registry_factory:initialize_registry(), %% See get/1 for an explanation of the case statement below. case is_registry_initialized() of false -> ?MODULE:list(Which); @@ -82,7 +83,7 @@ list(Which) -> %% @returns A map of feature flag states. states() -> - rabbit_feature_flags:initialize_registry(), + rabbit_ff_registry_factory:initialize_registry(), %% See get/1 for an explanation of the case statement below. case is_registry_initialized() of false -> ?MODULE:states(); @@ -101,7 +102,7 @@ states() -> %% otherwise. is_supported(FeatureName) -> - rabbit_feature_flags:initialize_registry(), + rabbit_ff_registry_factory:initialize_registry(), %% See get/1 for an explanation of the case statement below. case is_registry_initialized() of false -> ?MODULE:is_supported(FeatureName); @@ -120,7 +121,7 @@ is_supported(FeatureName) -> %% its state is transient, or `false' otherwise. is_enabled(FeatureName) -> - rabbit_feature_flags:initialize_registry(), + rabbit_ff_registry_factory:initialize_registry(), %% See get/1 for an explanation of the case statement below. 
case is_registry_initialized() of false -> ?MODULE:is_enabled(FeatureName); @@ -158,6 +159,13 @@ is_registry_initialized() -> is_registry_written_to_disk() -> always_return_true(). +-spec inventory() -> rabbit_feature_flags:inventory(). + +inventory() -> + #{applications => [], + feature_flags => #{}, + states => #{}}. + always_return_true() -> %% This function is here to trick Dialyzer. We want some functions %% in this initial on-disk registry to always return `true` or diff --git a/deps/rabbit/src/rabbit_ff_registry_factory.erl b/deps/rabbit/src/rabbit_ff_registry_factory.erl new file mode 100644 index 0000000000..a9b4b0c7b4 --- /dev/null +++ b/deps/rabbit/src/rabbit_ff_registry_factory.erl @@ -0,0 +1,631 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_ff_registry_factory). + +-include_lib("kernel/include/logger.hrl"). + +-export([initialize_registry/0, + initialize_registry/1, + initialize_registry/3, + acquire_state_change_lock/0, + release_state_change_lock/0]). + +-ifdef(TEST). +-export([registry_loading_lock/0]). +-endif. + +-define(FF_STATE_CHANGE_LOCK, {feature_flags_state_change, self()}). +-define(FF_REGISTRY_LOADING_LOCK, {feature_flags_registry_loading, self()}). + +-type registry_vsn() :: term(). + +acquire_state_change_lock() -> + global:set_lock(?FF_STATE_CHANGE_LOCK). + +release_state_change_lock() -> + global:del_lock(?FF_STATE_CHANGE_LOCK). + +-spec initialize_registry() -> ok | {error, any()} | no_return(). +%% @private +%% @doc +%% Initializes or reinitializes the registry. +%% +%% The registry is an Erlang module recompiled at runtime to hold the +%% state of all supported feature flags. +%% +%% That Erlang module is called {@link rabbit_ff_registry}. 
The initial +%% source code of this module simply calls this function so it is +%% replaced by a proper registry. +%% +%% Once replaced, the registry contains the map of all supported feature +%% flags and their state. This is makes it very efficient to query a +%% feature flag state or property. +%% +%% The registry is local to all RabbitMQ nodes. + +initialize_registry() -> + initialize_registry(#{}). + +-spec initialize_registry(rabbit_feature_flags:feature_flags()) -> + ok | {error, any()} | no_return(). +%% @private +%% @doc +%% Initializes or reinitializes the registry. +%% +%% See {@link initialize_registry/0} for a description of the registry. +%% +%% This function takes a map of new supported feature flags (so their +%% name and extended properties) to add to the existing known feature +%% flags. + +initialize_registry(NewSupportedFeatureFlags) -> + %% The first step is to get the feature flag states: if this is the + %% first time we initialize it, we read the list from disk (the + %% `feature_flags` file). Otherwise we query the existing registry + %% before it is replaced. + RegistryInitialized = rabbit_ff_registry:is_registry_initialized(), + FeatureStates = + case RegistryInitialized of + true -> + rabbit_ff_registry:states(); + false -> + EnabledFeatureNames = + rabbit_feature_flags:read_enabled_feature_flags_list(), + rabbit_feature_flags:enabled_feature_flags_to_feature_states( + EnabledFeatureNames) + end, + + %% We also record if the feature flags state was correctly written + %% to disk. Currently we don't use this information, but in the + %% future, we might want to retry the write if it failed so far. + %% + %% TODO: Retry to write the feature flags state if the first try + %% failed. + WrittenToDisk = case RegistryInitialized of + true -> + rabbit_ff_registry:is_registry_written_to_disk(); + false -> + true + end, + initialize_registry(NewSupportedFeatureFlags, + FeatureStates, + WrittenToDisk). 
+ +-spec initialize_registry(rabbit_feature_flags:feature_flags(), + rabbit_feature_flags:feature_states(), + boolean()) -> + ok | {error, any()} | no_return(). +%% @private +%% @doc +%% Initializes or reinitializes the registry. +%% +%% See {@link initialize_registry/0} for a description of the registry. +%% +%% This function takes a map of new supported feature flags (so their +%% name and extended properties) to add to the existing known feature +%% flags, a map of the new feature flag states (whether they are +%% enabled, disabled or `state_changing'), and a flag to indicate if the +%% feature flag states was recorded to disk. +%% +%% The latter is used to block callers asking if a feature flag is +%% enabled or disabled while its state is changing. + +initialize_registry(NewSupportedFeatureFlags, + NewFeatureStates, + WrittenToDisk) -> + Ret = maybe_initialize_registry(NewSupportedFeatureFlags, + NewFeatureStates, + WrittenToDisk), + case Ret of + ok -> ok; + restart -> initialize_registry(NewSupportedFeatureFlags, + NewFeatureStates, + WrittenToDisk); + Error -> Error + end. + +-spec maybe_initialize_registry(rabbit_feature_flags:feature_flags(), + rabbit_feature_flags:feature_states(), + boolean()) -> + ok | restart | {error, any()} | no_return(). + +maybe_initialize_registry(NewSupportedFeatureFlags, + NewFeatureStates, + WrittenToDisk) -> + %% We save the version of the current registry before computing + %% the new one. This is used when we do the actual reload: if the + %% current registry was reloaded in the meantime, we need to restart + %% the computation to make sure we don't loose data. + RegistryVsn = registry_vsn(), + + %% We take the feature flags already registered. + RegistryInitialized = rabbit_ff_registry:is_registry_initialized(), + KnownFeatureFlags1 = case RegistryInitialized of + true -> rabbit_ff_registry:list(all); + false -> #{} + end, + + %% Query the list (it's a map to be exact) of known + %% supported feature flags. 
That list comes from the + %% `-rabbitmq_feature_flag().` module attributes exposed by all + %% currently loaded Erlang modules. + {ScannedApps, KnownFeatureFlags2} = + rabbit_feature_flags:query_supported_feature_flags(), + + %% We merge the feature flags we already knew about + %% (KnownFeatureFlags1), those found in the loaded applications + %% (KnownFeatureFlags2) and those specified in arguments + %% (NewSupportedFeatureFlags). The latter come from remote nodes + %% usually: for example, they can come from plugins loaded on remote + %% node but the plugins are missing locally. In this case, we + %% consider those feature flags supported because there is no code + %% locally which would cause issues. + %% + %% It means that the list of feature flags only grows. we don't try + %% to clean it at some point because we want to remember about the + %% feature flags we saw (and their state). It should be fine because + %% that list should remain small. + KnownFeatureFlags = maps:merge(KnownFeatureFlags1, + KnownFeatureFlags2), + AllFeatureFlags = maps:merge(KnownFeatureFlags, + NewSupportedFeatureFlags), + + %% Next we want to update the feature states, based on the new + %% states passed as arguments. + FeatureStates0 = case RegistryInitialized of + true -> + maps:merge(rabbit_ff_registry:states(), + NewFeatureStates); + false -> + NewFeatureStates + end, + FeatureStates = maps:map( + fun(FeatureName, _FeatureProps) -> + case FeatureStates0 of + #{FeatureName := FeatureState} -> + FeatureState; + _ -> + false + end + end, AllFeatureFlags), + + %% The feature flags inventory is used by rabbit_ff_controller to query + %% feature flags atomically. The inventory also contains the list of + %% scanned applications: this is used to determine if an application is + %% known by this node or not, and decide if a missing feature flag is + %% unknown or unsupported. 
+ Inventory = #{applications => ScannedApps, + feature_flags => KnownFeatureFlags2, + states => FeatureStates}, + + Proceed = does_registry_need_refresh(AllFeatureFlags, + FeatureStates, + WrittenToDisk), + + case Proceed of + true -> + rabbit_log_feature_flags:debug( + "Feature flags: (re)initialize registry (~p)", + [self()]), + T0 = erlang:timestamp(), + Ret = do_initialize_registry(RegistryVsn, + AllFeatureFlags, + FeatureStates, + Inventory, + WrittenToDisk), + T1 = erlang:timestamp(), + rabbit_log_feature_flags:debug( + "Feature flags: time to regen registry: ~p µs", + [timer:now_diff(T1, T0)]), + Ret; + false -> + rabbit_log_feature_flags:debug( + "Feature flags: registry already up-to-date, skipping init"), + ok + end. + +-spec does_registry_need_refresh(rabbit_feature_flags:feature_flags(), + rabbit_feature_flags:feature_states(), + boolean()) -> + boolean(). + +does_registry_need_refresh(AllFeatureFlags, + FeatureStates, + WrittenToDisk) -> + case rabbit_ff_registry:is_registry_initialized() of + true -> + %% Before proceeding with the actual + %% (re)initialization, let's see if there are any + %% changes. 
+ CurrentAllFeatureFlags = rabbit_ff_registry:list(all), + CurrentFeatureStates = rabbit_ff_registry:states(), + CurrentWrittenToDisk = + rabbit_ff_registry:is_registry_written_to_disk(), + + if + AllFeatureFlags =/= CurrentAllFeatureFlags -> + rabbit_log_feature_flags:debug( + "Feature flags: registry refresh needed: " + "yes, list of feature flags differs"), + true; + FeatureStates =/= CurrentFeatureStates -> + rabbit_log_feature_flags:debug( + "Feature flags: registry refresh needed: " + "yes, feature flag states differ"), + true; + WrittenToDisk =/= CurrentWrittenToDisk -> + rabbit_log_feature_flags:debug( + "Feature flags: registry refresh needed: " + "yes, \"written to disk\" state changed"), + true; + true -> + rabbit_log_feature_flags:debug( + "Feature flags: registry refresh needed: no"), + false + end; + false -> + rabbit_log_feature_flags:debug( + "Feature flags: registry refresh needed: " + "yes, first-time initialization"), + true + end. + +-spec do_initialize_registry(registry_vsn(), + rabbit_feature_flags:feature_flags(), + rabbit_feature_flags:feature_states(), + rabbit_feature_flags:inventory(), + boolean()) -> + ok | restart | {error, any()} | no_return(). +%% @private + +do_initialize_registry(RegistryVsn, + AllFeatureFlags, + FeatureStates, + #{applications := ScannedApps} = Inventory, + WrittenToDisk) -> + %% We log the state of those feature flags. 
+ rabbit_log_feature_flags:debug( + "Feature flags: list of feature flags found:~n" ++ + lists:flatten( + [rabbit_misc:format( + "Feature flags: [~s] ~s~n", + [case maps:get(FeatureName, FeatureStates, false) of + true -> "x"; + state_changing -> "~~"; + false -> " " + end, + FeatureName]) + || FeatureName <- lists:sort(maps:keys(AllFeatureFlags))] ++ + [rabbit_misc:format( + "Feature flags: scanned applications: ~p~n" + "Feature flags: feature flag states written to disk: ~s", + [ScannedApps, + case WrittenToDisk of + true -> "yes"; + false -> "no" + end])]) + ), + + %% We request the registry to be regenerated and reloaded with the + %% new state. + regen_registry_mod(RegistryVsn, + AllFeatureFlags, + FeatureStates, + Inventory, + WrittenToDisk). + +-spec regen_registry_mod( + RegistryVsn, AllFeatureFlags, FeatureStates, Inventory, + WrittenToDisk) -> Ret when + RegistryVsn :: registry_vsn(), + AllFeatureFlags :: rabbit_feature_flags:feature_flags(), + FeatureStates :: rabbit_feature_flags:feature_states(), + Inventory :: rabbit_feature_flags:inventory(), + WrittenToDisk :: boolean(), + Ret :: ok | restart | {error, any()} | no_return(). +%% @private + +regen_registry_mod(RegistryVsn, + AllFeatureFlags, + FeatureStates, + Inventory, + WrittenToDisk) -> + %% Here, we recreate the source code of the `rabbit_ff_registry` + %% module from scratch. + %% + %% IMPORTANT: We want both modules to have the exact same public + %% API in order to simplify the life of developers and their tools + %% (Dialyzer, completion, and so on). + + %% -module(rabbit_ff_registry). + ModuleAttr = erl_syntax:attribute( + erl_syntax:atom(module), + [erl_syntax:atom(rabbit_ff_registry)]), + ModuleForm = erl_syntax:revert(ModuleAttr), + %% -export([...]). 
+ ExportAttr = erl_syntax:attribute( + erl_syntax:atom(export), + [erl_syntax:list( + [erl_syntax:arity_qualifier( + erl_syntax:atom(F), + erl_syntax:integer(A)) + || {F, A} <- [{get, 1}, + {list, 1}, + {states, 0}, + {is_supported, 1}, + {is_enabled, 1}, + {is_registry_initialized, 0}, + {is_registry_written_to_disk, 0}, + {inventory, 0}]] + ) + ] + ), + ExportForm = erl_syntax:revert(ExportAttr), + %% get(_) -> ... + GetClauses = [erl_syntax:clause( + [erl_syntax:atom(FeatureName)], + [], + [erl_syntax:abstract(maps:get(FeatureName, + AllFeatureFlags))]) + || FeatureName <- maps:keys(AllFeatureFlags) + ], + GetUnknownClause = erl_syntax:clause( + [erl_syntax:variable("_")], + [], + [erl_syntax:atom(undefined)]), + GetFun = erl_syntax:function( + erl_syntax:atom(get), + GetClauses ++ [GetUnknownClause]), + GetFunForm = erl_syntax:revert(GetFun), + %% list(_) -> ... + ListAllBody = erl_syntax:abstract(AllFeatureFlags), + ListAllClause = erl_syntax:clause([erl_syntax:atom(all)], + [], + [ListAllBody]), + EnabledFeatureFlags = maps:filter( + fun(FeatureName, _) -> + maps:is_key(FeatureName, + FeatureStates) + andalso + maps:get(FeatureName, FeatureStates) + =:= + true + end, AllFeatureFlags), + ListEnabledBody = erl_syntax:abstract(EnabledFeatureFlags), + ListEnabledClause = erl_syntax:clause( + [erl_syntax:atom(enabled)], + [], + [ListEnabledBody]), + DisabledFeatureFlags = maps:filter( + fun(FeatureName, _) -> + not maps:is_key(FeatureName, + FeatureStates) + orelse + maps:get(FeatureName, FeatureStates) + =:= + false + end, AllFeatureFlags), + ListDisabledBody = erl_syntax:abstract(DisabledFeatureFlags), + ListDisabledClause = erl_syntax:clause( + [erl_syntax:atom(disabled)], + [], + [ListDisabledBody]), + StateChangingFeatureFlags = maps:filter( + fun(FeatureName, _) -> + maps:is_key(FeatureName, + FeatureStates) + andalso + maps:get(FeatureName, FeatureStates) + =:= + state_changing + end, AllFeatureFlags), + ListStateChangingBody = 
erl_syntax:abstract(StateChangingFeatureFlags), + ListStateChangingClause = erl_syntax:clause( + [erl_syntax:atom(state_changing)], + [], + [ListStateChangingBody]), + ListFun = erl_syntax:function( + erl_syntax:atom(list), + [ListAllClause, + ListEnabledClause, + ListDisabledClause, + ListStateChangingClause]), + ListFunForm = erl_syntax:revert(ListFun), + %% states() -> ... + StatesBody = erl_syntax:abstract(FeatureStates), + StatesClause = erl_syntax:clause([], [], [StatesBody]), + StatesFun = erl_syntax:function( + erl_syntax:atom(states), + [StatesClause]), + StatesFunForm = erl_syntax:revert(StatesFun), + %% is_supported(_) -> ... + IsSupportedClauses = [erl_syntax:clause( + [erl_syntax:atom(FeatureName)], + [], + [erl_syntax:atom(true)]) + || FeatureName <- maps:keys(AllFeatureFlags) + ], + NotSupportedClause = erl_syntax:clause( + [erl_syntax:variable("_")], + [], + [erl_syntax:atom(false)]), + IsSupportedFun = erl_syntax:function( + erl_syntax:atom(is_supported), + IsSupportedClauses ++ [NotSupportedClause]), + IsSupportedFunForm = erl_syntax:revert(IsSupportedFun), + %% is_enabled(_) -> ... + IsEnabledClauses = [erl_syntax:clause( + [erl_syntax:atom(FeatureName)], + [], + [case maps:is_key(FeatureName, FeatureStates) of + true -> + erl_syntax:atom( + maps:get(FeatureName, FeatureStates)); + false -> + erl_syntax:atom(false) + end]) + || FeatureName <- maps:keys(AllFeatureFlags) + ], + NotEnabledClause = erl_syntax:clause( + [erl_syntax:variable("_")], + [], + [erl_syntax:atom(false)]), + IsEnabledFun = erl_syntax:function( + erl_syntax:atom(is_enabled), + IsEnabledClauses ++ [NotEnabledClause]), + IsEnabledFunForm = erl_syntax:revert(IsEnabledFun), + %% is_registry_initialized() -> ... 
+ IsInitializedClauses = [erl_syntax:clause( + [], + [], + [erl_syntax:atom(true)]) + ], + IsInitializedFun = erl_syntax:function( + erl_syntax:atom(is_registry_initialized), + IsInitializedClauses), + IsInitializedFunForm = erl_syntax:revert(IsInitializedFun), + %% is_registry_written_to_disk() -> ... + IsWrittenToDiskClauses = [erl_syntax:clause( + [], + [], + [erl_syntax:atom(WrittenToDisk)]) + ], + IsWrittenToDiskFun = erl_syntax:function( + erl_syntax:atom(is_registry_written_to_disk), + IsWrittenToDiskClauses), + IsWrittenToDiskFunForm = erl_syntax:revert(IsWrittenToDiskFun), + %% inventory() -> ... + InventoryBody = erl_syntax:abstract(Inventory), + InventoryClause = erl_syntax:clause([], [], [InventoryBody]), + InventoryFun = erl_syntax:function( + erl_syntax:atom(inventory), + [InventoryClause]), + InventoryFunForm = erl_syntax:revert(InventoryFun), + %% Compilation! + Forms = [ModuleForm, + ExportForm, + GetFunForm, + ListFunForm, + StatesFunForm, + IsSupportedFunForm, + IsEnabledFunForm, + IsInitializedFunForm, + IsWrittenToDiskFunForm, + InventoryFunForm], + maybe_log_registry_source_code(Forms), + CompileOpts = [return_errors, + return_warnings], + case compile:forms(Forms, CompileOpts) of + {ok, Mod, Bin, _} -> + load_registry_mod(RegistryVsn, Mod, Bin); + {error, Errors, Warnings} -> + rabbit_log_feature_flags:error( + "Feature flags: registry compilation failure:~n" + "Errors: ~p~n" + "Warnings: ~p", + [Errors, Warnings]), + {error, {compilation_failure, Errors, Warnings}}; + error -> + rabbit_log_feature_flags:error( + "Feature flags: registry compilation failure", + []), + {error, {compilation_failure, [], []}} + end. + +maybe_log_registry_source_code(Forms) -> + case rabbit_prelaunch:get_context() of + #{log_feature_flags_registry := true} -> + rabbit_log_feature_flags:debug( + "== FEATURE FLAGS REGISTRY ==~n" + "~s~n" + "== END ==~n", + [erl_prettypr:format(erl_syntax:form_list(Forms))]), + ok; + _ -> + ok + end. + +-ifdef(TEST). 
+registry_loading_lock() -> ?FF_REGISTRY_LOADING_LOCK. +-endif. + +-spec load_registry_mod(registry_vsn(), module(), binary()) -> + ok | restart | no_return(). +%% @private + +load_registry_mod(RegistryVsn, Mod, Bin) -> + rabbit_log_feature_flags:debug( + "Feature flags: registry module ready, loading it (~p)...", + [self()]), + FakeFilename = "Compiled and loaded by " ?MODULE_STRING, + %% Time to load the new registry, replacing the old one. We use a + %% lock here to synchronize concurrent reloads. + global:set_lock(?FF_REGISTRY_LOADING_LOCK, [node()]), + rabbit_log_feature_flags:debug( + "Feature flags: acquired lock before reloading registry module (~p)", + [self()]), + %% We want to make sure that the old registry (not the one being + %% currently in use) is purged by the code server. It means no + %% process lingers on that old code. + %% + %% We use code:soft_purge() for that (meaning no process is killed) + %% and we wait in an infinite loop for that to succeed. + ok = purge_old_registry(Mod), + %% Now we can replace the currently loaded registry by the new one. + %% The code server takes care of marking the current registry as old + %% and load the new module in an atomic operation. + %% + %% Therefore there is no chance of a window where there is no + %% registry module available, causing the one on disk to be + %% reloaded. 
+ Ret = case registry_vsn() of + RegistryVsn -> code:load_binary(Mod, FakeFilename, Bin); + OtherVsn -> {error, {restart, RegistryVsn, OtherVsn}} + end, + rabbit_log_feature_flags:debug( + "Feature flags: releasing lock after reloading registry module (~p)", + [self()]), + global:del_lock(?FF_REGISTRY_LOADING_LOCK, [node()]), + case Ret of + {module, _} -> + rabbit_log_feature_flags:debug( + "Feature flags: registry module loaded (vsn: ~p -> ~p)", + [RegistryVsn, registry_vsn()]), + ok; + {error, {restart, Expected, Current}} -> + rabbit_log_feature_flags:error( + "Feature flags: another registry module was loaded in the " + "meantime (expected old vsn: ~p, current vsn: ~p); " + "restarting the regen", + [Expected, Current]), + restart; + {error, Reason} -> + rabbit_log_feature_flags:error( + "Feature flags: failed to load registry module: ~p", + [Reason]), + throw({feature_flag_registry_reload_failure, Reason}) + end. + +-spec registry_vsn() -> registry_vsn(). +%% @private + +registry_vsn() -> + Attrs = rabbit_ff_registry:module_info(attributes), + proplists:get_value(vsn, Attrs, undefined). + +purge_old_registry(Mod) -> + case code:is_loaded(Mod) of + {file, _} -> do_purge_old_registry(Mod); + false -> ok + end. + +do_purge_old_registry(Mod) -> + case code:soft_purge(Mod) of + true -> ok; + false -> do_purge_old_registry(Mod) + end. 
diff --git a/deps/rabbit/src/rabbit_prelaunch_feature_flags.erl b/deps/rabbit/src/rabbit_prelaunch_feature_flags.erl index 51040ad75c..3d9a1b470a 100644 --- a/deps/rabbit/src/rabbit_prelaunch_feature_flags.erl +++ b/deps/rabbit/src/rabbit_prelaunch_feature_flags.erl @@ -22,7 +22,7 @@ setup(#{feature_flags_file := FFFile}) -> ?LOG_DEBUG( "Initializing feature flags registry", [], #{domain => ?RMQLOG_DOMAIN_PRELAUNCH}), - case rabbit_feature_flags:initialize_registry() of + case rabbit_ff_registry_factory:initialize_registry() of ok -> ok; {error, Reason} -> diff --git a/deps/rabbit/test/feature_flags_SUITE.erl b/deps/rabbit/test/feature_flags_SUITE.erl index da0283e78d..f8402f32d3 100644 --- a/deps/rabbit/test/feature_flags_SUITE.erl +++ b/deps/rabbit/test/feature_flags_SUITE.erl @@ -45,19 +45,13 @@ suite() -> all() -> [ {group, registry}, - {group, enabling_on_single_node}, - {group, enabling_in_cluster}, - {group, clustering}, - {group, activating_plugin} + {group, feature_flags_v1}, + {group, feature_flags_v2} ]. groups() -> + Groups = [ - {registry, [], - [ - registry_general_usage, - registry_concurrent_reloads - ]}, {enabling_on_single_node, [], [ enable_feature_flag_in_a_healthy_situation, @@ -85,6 +79,16 @@ groups() -> activating_plugin_with_new_ff_disabled, activating_plugin_with_new_ff_enabled ]} + ], + + [ + {registry, [], + [ + registry_general_usage, + registry_concurrent_reloads + ]}, + {feature_flags_v1, [], Groups}, + {feature_flags_v2, [], Groups} ]. %% ------------------------------------------------------------------- @@ -100,6 +104,10 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). 
+init_per_group(feature_flags_v1, Config) -> + rabbit_ct_helpers:set_config(Config, {enable_feature_flags_v2, false}); +init_per_group(feature_flags_v2, Config) -> + rabbit_ct_helpers:set_config(Config, {enable_feature_flags_v2, true}); init_per_group(enabling_on_single_node, Config) -> rabbit_ct_helpers:set_config( Config, @@ -161,6 +169,16 @@ end_per_group(_, Config) -> init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase), TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + UsingFFv2 = rabbit_ct_helpers:get_config( + Config, enable_feature_flags_v2, false), + ForcedFFs = case UsingFFv2 of + true -> [feature_flags_v2]; + false -> [] + end, + Suffix = case UsingFFv2 of + false -> rabbit_misc:format("~s-v1", [Testcase]); + true -> rabbit_misc:format("~s-v2", [Testcase]) + end, case ?config(tc_group_properties, Config) of [{name, registry} | _] -> logger:set_primary_config(level, debug), @@ -178,14 +196,14 @@ init_per_testcase(Testcase, Config) -> ClusterSize = ?config(rmq_nodes_count, Config), Config1 = rabbit_ct_helpers:set_config( Config, - [{rmq_nodename_suffix, Testcase}, + [{rmq_nodename_suffix, Suffix}, {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}} ]), Config2 = rabbit_ct_helpers:merge_app_env( Config1, {rabbit, - [{forced_feature_flags_on_init, []}, + [{forced_feature_flags_on_init, ForcedFFs}, {log, [{file, [{level, debug}]}]}]}), Config3 = rabbit_ct_helpers:run_steps( Config2, @@ -211,7 +229,7 @@ init_per_testcase(Testcase, Config) -> ClusterSize = ?config(rmq_nodes_count, Config), Config1 = rabbit_ct_helpers:set_config( Config, - [{rmq_nodename_suffix, Testcase}, + [{rmq_nodename_suffix, Suffix}, {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}, {net_ticktime, 5} @@ -219,7 +237,7 @@ init_per_testcase(Testcase, Config) -> Config2 = rabbit_ct_helpers:merge_app_env( Config1, {rabbit, - [{forced_feature_flags_on_init, []}, + [{forced_feature_flags_on_init, ForcedFFs}, {log, 
[{file, [{level, debug}]}]}]}), Config3 = rabbit_ct_helpers:run_steps( Config2, @@ -267,17 +285,18 @@ registry_general_usage(_Config) -> ?assertNot(rabbit_ff_registry:is_registry_initialized()), FeatureFlags = #{ff_a => - #{desc => "Feature flag A", - stability => stable}, + #{desc => "Feature flag A", + provided_by => ?MODULE, + stability => stable}, ff_b => - #{desc => "Feature flag B", - stability => stable}}, - rabbit_feature_flags:inject_test_feature_flags( - feature_flags_to_app_attrs(FeatureFlags)), + #{desc => "Feature flag B", + provided_by => ?MODULE, + stability => stable}}, + rabbit_feature_flags:inject_test_feature_flags(FeatureFlags), %% After initialization, it must know about the feature flags %% declared in this testsuite. They must be disabled however. - rabbit_feature_flags:initialize_registry(), + rabbit_ff_registry_factory:initialize_registry(), ?assert(rabbit_ff_registry:is_registry_initialized()), ?assertMatch([ff_a, ff_b], ?list_ff(all)), @@ -286,7 +305,8 @@ registry_general_usage(_Config) -> ?assertNot(rabbit_ff_registry:is_supported(ff_c)), ?assertNot(rabbit_ff_registry:is_supported(ff_d)), - ?assertEqual(erlang:map_size(rabbit_ff_registry:states()), 0), + ?assertEqual(#{ff_a => false, + ff_b => false}, rabbit_ff_registry:states()), ?assertMatch([], ?list_ff(enabled)), ?assertMatch([], ?list_ff(state_changing)), ?assertMatch([ff_a, ff_b], ?list_ff(disabled)), @@ -301,7 +321,8 @@ registry_general_usage(_Config) -> #{desc => "Feature flag C", provided_by => ?MODULE, stability => stable}}, - rabbit_feature_flags:initialize_registry(NewFeatureFlags), + rabbit_feature_flags:inject_test_feature_flags(NewFeatureFlags), + rabbit_ff_registry_factory:initialize_registry(), ?assertMatch([ff_a, ff_b, ff_c], lists:sort(maps:keys(rabbit_ff_registry:list(all)))), @@ -310,7 +331,9 @@ registry_general_usage(_Config) -> ?assert(rabbit_ff_registry:is_supported(ff_c)), ?assertNot(rabbit_ff_registry:is_supported(ff_d)), - 
?assertEqual(erlang:map_size(rabbit_ff_registry:states()), 0), + ?assertEqual(#{ff_a => false, + ff_b => false, + ff_c => false}, rabbit_ff_registry:states()), ?assertMatch([], ?list_ff(enabled)), ?assertMatch([], ?list_ff(state_changing)), ?assertMatch([ff_a, ff_b, ff_c], ?list_ff(disabled)), @@ -321,9 +344,9 @@ registry_general_usage(_Config) -> %% After enabling `ff_a`, it is actually the case. Others are %% supported but remain disabled. - rabbit_feature_flags:initialize_registry(#{}, - #{ff_a => true}, - true), + rabbit_ff_registry_factory:initialize_registry(#{}, + #{ff_a => true}, + true), ?assertMatch([ff_a, ff_b, ff_c], lists:sort(maps:keys(rabbit_ff_registry:list(all)))), @@ -332,7 +355,9 @@ registry_general_usage(_Config) -> ?assert(rabbit_ff_registry:is_supported(ff_c)), ?assertNot(rabbit_ff_registry:is_supported(ff_d)), - ?assertMatch(#{ff_a := true}, rabbit_ff_registry:states()), + ?assertEqual(#{ff_a => true, + ff_b => false, + ff_c => false}, rabbit_ff_registry:states()), ?assertMatch([ff_a], ?list_ff(enabled)), ?assertMatch([], ?list_ff(state_changing)), ?assertMatch([ff_b, ff_c], ?list_ff(disabled)), @@ -343,10 +368,10 @@ registry_general_usage(_Config) -> %% This time, we mark the state of `ff_c` as `state_changing`. We %% expect all other feature flag states to remain unchanged. 
- rabbit_feature_flags:initialize_registry(#{}, - #{ff_a => false, - ff_c => state_changing}, - true), + rabbit_ff_registry_factory:initialize_registry(#{}, + #{ff_a => false, + ff_c => state_changing}, + true), ?assertMatch([ff_a, ff_b, ff_c], lists:sort(maps:keys(rabbit_ff_registry:list(all)))), @@ -355,7 +380,9 @@ registry_general_usage(_Config) -> ?assert(rabbit_ff_registry:is_supported(ff_c)), ?assertNot(rabbit_ff_registry:is_supported(ff_d)), - ?assertMatch(#{ff_c := state_changing}, rabbit_ff_registry:states()), + ?assertEqual(#{ff_a => false, + ff_b => false, + ff_c => state_changing}, rabbit_ff_registry:states()), ?assertMatch([], ?list_ff(enabled)), ?assertMatch([ff_c], ?list_ff(state_changing)), ?assertMatch([ff_a, ff_b], ?list_ff(disabled)), @@ -366,10 +393,10 @@ registry_general_usage(_Config) -> %% Finally, we disable `ff_c`. All of them are supported but %% disabled. - rabbit_feature_flags:initialize_registry(#{}, - #{ff_b => false, - ff_c => false}, - true), + rabbit_ff_registry_factory:initialize_registry(#{}, + #{ff_b => false, + ff_c => false}, + true), ?assertMatch([ff_a, ff_b, ff_c], lists:sort(maps:keys(rabbit_ff_registry:list(all)))), @@ -378,7 +405,9 @@ registry_general_usage(_Config) -> ?assert(rabbit_ff_registry:is_supported(ff_c)), ?assertNot(rabbit_ff_registry:is_supported(ff_d)), - ?assertEqual(erlang:map_size(rabbit_ff_registry:states()), 0), + ?assertEqual(#{ff_a => false, + ff_b => false, + ff_c => false}, rabbit_ff_registry:states()), ?assertMatch([], ?list_ff(enabled)), ?assertMatch([], ?list_ff(state_changing)), ?assertMatch([ff_a, ff_b, ff_c], ?list_ff(disabled)), @@ -390,7 +419,7 @@ registry_general_usage(_Config) -> registry_concurrent_reloads(_Config) -> case rabbit_ff_registry:is_registry_initialized() of true -> ok; - false -> rabbit_feature_flags:initialize_registry() + false -> rabbit_ff_registry_factory:initialize_registry() end, ?assert(rabbit_ff_registry:is_registry_initialized()), @@ -409,22 +438,24 @@ 
registry_concurrent_reloads(_Config) -> Name = MakeName(I), Desc = rabbit_misc:format("Feature flag ~b", [I]), NewFF = #{Name => - #{desc => Desc, - stability => stable}}, - rabbit_feature_flags:initialize_registry(NewFF), + #{desc => Desc, + provided_by => ?MODULE, + stability => stable}}, + rabbit_feature_flags:inject_test_feature_flags(NewFF), unlink(Parent) end, %% Prepare feature flags which the spammer process should get at %% some point. FeatureFlags = #{ff_a => - #{desc => "Feature flag A", - stability => stable}, + #{desc => "Feature flag A", + provided_by => ?MODULE, + stability => stable}, ff_b => - #{desc => "Feature flag B", - stability => stable}}, - rabbit_feature_flags:inject_test_feature_flags( - feature_flags_to_app_attrs(FeatureFlags)), + #{desc => "Feature flag B", + provided_by => ?MODULE, + stability => stable}}, + rabbit_feature_flags:inject_test_feature_flags(FeatureFlags), %% Spawn a process which heavily uses the registry. FinalFFList = lists:sort( @@ -437,7 +468,7 @@ registry_concurrent_reloads(_Config) -> %% We acquire the lock from the main process to synchronize the test %% processes we are about to spawn. - Lock = rabbit_feature_flags:registry_loading_lock(), + Lock = rabbit_ff_registry_factory:registry_loading_lock(), ThisNode = [node()], rabbit_log_feature_flags:info( ?MODULE_STRING ": Acquiring registry load lock"), @@ -632,9 +663,17 @@ enable_feature_flag_with_a_network_partition(Config) -> %% Enabling the feature flag should fail in the specific case of %% `ff_from_testsuite`, if the network is broken. 
- ?assertEqual( - {error, unsupported}, - enable_feature_flag_on(Config, B, FeatureName)), + UsingFFv1 = not ?config(enable_feature_flags_v2, Config), + case UsingFFv1 of + true -> + ?assertEqual( + {error, unsupported}, + enable_feature_flag_on(Config, B, FeatureName)); + false -> + ?assertEqual( + {error, missing_clustered_nodes}, + enable_feature_flag_on(Config, B, FeatureName)) + end, ?assertEqual( False, is_feature_flag_enabled(Config, FeatureName)), @@ -818,7 +857,7 @@ clustering_ok_with_new_ff_disabled(Config) -> stability => stable}}, rabbit_ct_broker_helpers:rpc( Config, 0, - rabbit_feature_flags, initialize_registry, [NewFeatureFlags]), + rabbit_feature_flags, inject_test_feature_flags, [NewFeatureFlags]), FFSubsysOk = is_feature_flag_subsystem_available(Config), @@ -854,7 +893,7 @@ clustering_denied_with_new_ff_enabled(Config) -> stability => stable}}, rabbit_ct_broker_helpers:rpc( Config, 0, - rabbit_feature_flags, initialize_registry, [NewFeatureFlags]), + rabbit_feature_flags, inject_test_feature_flags, [NewFeatureFlags]), enable_feature_flag_on(Config, 0, time_travel), FFSubsysOk = is_feature_flag_subsystem_available(Config), @@ -933,12 +972,21 @@ clustering_ok_with_new_ff_enabled_from_plugin_on_some_nodes(Config) -> ?assertEqual(Config, rabbit_ct_broker_helpers:cluster_nodes(Config)), log_feature_flags_of_all_nodes(Config), + UsingFFv2 = ?config(enable_feature_flags_v2, Config), + UsingFFv1 = not UsingFFv2, case FFSubsysOk of - true -> ?assertEqual([true, true], - is_feature_flag_supported(Config, plugin_ff)), - ?assertEqual([true, true], - is_feature_flag_enabled(Config, plugin_ff)); - false -> ok + true when UsingFFv1 -> + ?assertEqual([true, true], + is_feature_flag_supported(Config, plugin_ff)), + ?assertEqual([true, true], + is_feature_flag_enabled(Config, plugin_ff)); + true when UsingFFv2 -> + ?assertEqual([true, true], + is_feature_flag_supported(Config, plugin_ff)), + ?assertEqual([true, false], + is_feature_flag_enabled(Config, 
plugin_ff)); + false -> + ok end, ok. @@ -994,12 +1042,21 @@ activating_plugin_with_new_ff_enabled(Config) -> enable_feature_flag_on(Config, 0, plugin_ff), log_feature_flags_of_all_nodes(Config), + UsingFFv2 = ?config(enable_feature_flags_v2, Config), + UsingFFv1 = not UsingFFv2, case FFSubsysOk of - true -> ?assertEqual([true, true], - is_feature_flag_supported(Config, plugin_ff)), - ?assertEqual([true, true], - is_feature_flag_enabled(Config, plugin_ff)); - false -> ok + true when UsingFFv1 -> + ?assertEqual([true, true], + is_feature_flag_supported(Config, plugin_ff)), + ?assertEqual([true, true], + is_feature_flag_enabled(Config, plugin_ff)); + true when UsingFFv2 -> + ?assertEqual([true, true], + is_feature_flag_supported(Config, plugin_ff)), + ?assertEqual([true, false], + is_feature_flag_enabled(Config, plugin_ff)); + false -> + ok end, ok. @@ -1150,20 +1207,16 @@ log_feature_flags_of_all_nodes(Config) -> Config, rabbit_feature_flags, info, [#{color => false, lines => false}]). -feature_flags_to_app_attrs(FeatureFlags) when is_map(FeatureFlags) -> - [{?MODULE, % Application - ?MODULE, % Module - maps:to_list(FeatureFlags)}]. - declare_arbitrary_feature_flag(Config) -> FeatureFlags = #{ff_from_testsuite => #{desc => "My feature flag", + provided_by => ?MODULE, stability => stable}}, rabbit_ct_broker_helpers:rpc_all( Config, rabbit_feature_flags, inject_test_feature_flags, - [feature_flags_to_app_attrs(FeatureFlags)]), + [FeatureFlags]), ok. block(Pairs) -> [block(X, Y) || {X, Y} <- Pairs]. diff --git a/deps/rabbit/test/feature_flags_v2_SUITE.erl b/deps/rabbit/test/feature_flags_v2_SUITE.erl new file mode 100644 index 0000000000..de4daab394 --- /dev/null +++ b/deps/rabbit/test/feature_flags_v2_SUITE.erl @@ -0,0 +1,1540 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+%% +%% Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(feature_flags_v2_SUITE). + +-include_lib("kernel/include/logger.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-include_lib("rabbit_common/include/logging.hrl"). + +-include("feature_flags.hrl"). + +-export([suite/0, + all/0, + groups/0, + init_per_suite/1, + end_per_suite/1, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + mf_count_runs/3, + mf_wait_and_count_runs/3, + mf_wait_and_count_runs_v2/1, + mf_wait_and_count_runs_in_post_enable/1, + + enable_unknown_feature_flag_on_a_single_node/1, + enable_supported_feature_flag_on_a_single_node/1, + enable_unknown_feature_flag_in_a_3node_cluster/1, + enable_supported_feature_flag_in_a_3node_cluster/1, + enable_partially_supported_feature_flag_in_a_3node_cluster/1, + enable_unsupported_feature_flag_in_a_3node_cluster/1, + enable_feature_flag_in_cluster_and_add_member_after/1, + enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1/1, + enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2/1, + enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1/1, + enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2/1, + enable_feature_flag_with_post_enable/1 + ]). + +suite() -> + [{timetrap, {minutes, 1}}]. + +all() -> + [ + {group, feature_flags_v1}, + {group, feature_flags_v2} + ]. 
+ +groups() -> + Groups = + [ + {cluster_size_1, [parallel], + [ + enable_unknown_feature_flag_on_a_single_node, + enable_supported_feature_flag_on_a_single_node + ]}, + {cluster_size_3, [parallel], + [ + enable_unknown_feature_flag_in_a_3node_cluster, + enable_supported_feature_flag_in_a_3node_cluster, + enable_partially_supported_feature_flag_in_a_3node_cluster, + enable_unsupported_feature_flag_in_a_3node_cluster, + enable_feature_flag_in_cluster_and_add_member_after, + enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1, + enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2, + enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1, + enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2, + enable_feature_flag_with_post_enable + ]} + ], + [ + {feature_flags_v1, [], Groups}, + {feature_flags_v2, [], Groups} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + logger:set_primary_config(level, debug), + rabbit_ct_helpers:run_steps( + Config, + [fun rabbit_ct_helpers:redirect_logger_to_ct_logs/1]). + +end_per_suite(Config) -> + Config. + +init_per_group(feature_flags_v1, Config) -> + rabbit_ct_helpers:set_config(Config, {enable_feature_flags_v2, false}); +init_per_group(feature_flags_v2, Config) -> + rabbit_ct_helpers:set_config(Config, {enable_feature_flags_v2, true}); +init_per_group(cluster_size_1, Config) -> + rabbit_ct_helpers:set_config(Config, {nodes_count, 1}); +init_per_group(cluster_size_3, Config) -> + rabbit_ct_helpers:set_config(Config, {nodes_count, 3}); +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:run_steps( + Config, + [fun(Cfg) -> start_slave_nodes(Cfg, Testcase) end]). 
+ +end_per_testcase(_Testcase, Config) -> + rabbit_ct_helpers:run_steps( + Config, + [fun stop_slave_nodes/1]). + +start_slave_nodes(Config, Testcase) -> + NodesCount = ?config(nodes_count, Config), + ct:pal("Starting ~b slave node(s):", [NodesCount]), + Parent = self(), + Starters = [spawn_link( + fun() -> + start_slave_node(Parent, Config, Testcase, N) + end) + || N <- lists:seq(1, NodesCount)], + Nodes = lists:sort( + [receive + {node, Starter, Node} -> Node + end || Starter <- Starters]), + ct:pal("Started ~b slave node(s): ~p", [NodesCount, Nodes]), + rabbit_ct_helpers:set_config(Config, {nodes, Nodes}). + +start_slave_node(Parent, Config, Testcase, N) -> + Prefix = case ?config(enable_feature_flags_v2, Config) of + false -> "ffv1"; + true -> "ffv2" + end, + Name = list_to_atom( + rabbit_misc:format("~s-~s-~b", [Prefix, Testcase, N])), + ct:pal("- Starting slave node `~s@...`", [Name]), + {ok, Node} = slave:start(net_adm:localhost(), Name), + ct:pal("- Slave node `~s` started", [Node]), + + TestCodePath = filename:dirname(code:which(?MODULE)), + true = rpc:call(Node, code, add_path, [TestCodePath]), + ok = run_on_node(Node, fun setup_slave_node/1, [Config]), + ct:pal("- Slave node `~s` configured", [Node]), + Parent ! {node, self(), Node}. + +stop_slave_nodes(Config) -> + Nodes = ?config(nodes, Config), + ct:pal("Stopping ~b slave nodes:", [length(Nodes)]), + lists:foreach(fun stop_slave_node/1, Nodes), + rabbit_ct_helpers:delete_config(Config, nodes). + +stop_slave_node(Node) -> + ct:pal("- Stopping slave node `~s`...", [Node]), + ok = slave:stop(Node). + +connect_nodes([FirstNode | OtherNodes] = Nodes) -> + lists:foreach( + fun(Node) -> pong = rpc:call(Node, net_adm, ping, [FirstNode]) end, + OtherNodes), + Cluster = lists:sort( + [FirstNode | rpc:call(FirstNode, erlang, nodes, [])]), + ?assert(lists:all(fun(Node) -> lists:member(Node, Cluster) end, Nodes)). + +run_on_node(Node, Fun) -> + run_on_node(Node, Fun, []). 
%% Applies `Fun' to `Args' on `Node' through `rpc:call/4' and returns whatever
%% that call returns.
run_on_node(Node, Fun, Args) ->
    rpc:call(Node, erlang, apply, [Fun, Args]).

%% -------------------------------------------------------------------
%% Slave node configuration.
%% -------------------------------------------------------------------

%% Prepares the calling node for a testcase. Every step must return `ok';
%% anything else crashes with a `badmatch'.
setup_slave_node(Config) ->
    ok = setup_logger(),
    ok = setup_feature_flags_file(Config),
    ok = start_controller(),
    ok = maybe_enable_feature_flags_v2(Config),
    ok.

%% Raises the primary logger level to `debug'.
setup_logger() ->
    logger:set_primary_config(level, debug),
    ok.

%% Points the `rabbit' application's `feature_flags_file' setting at a
%% per-node file inside the testcase's `priv_dir'.
setup_feature_flags_file(Config) ->
    %% The `feature_flags_file' is set to a specific location in the test log
    %% directory.
    FeatureFlagsFile = filename:join(
                         ?config(priv_dir, Config),
                         rabbit_misc:format("feature_flags-~s", [node()])),
    ?LOG_INFO("Setting `feature_flags_file` to \"~ts\"", [FeatureFlagsFile]),
    %% The application must be loaded before its environment can be set; it
    %% is fine if it was loaded already.
    case application:load(rabbit) of
        ok                           -> ok;
        {error, {already_loaded, _}} -> ok
    end,
    ok = application:set_env(rabbit, feature_flags_file, FeatureFlagsFile),
    ok.

%% Starts the feature flags controller on the calling node.
start_controller() ->
    ?LOG_INFO("Starting feature flags controller"),
    {ok, Pid} = rabbit_ff_controller:start(),
    ?LOG_INFO("Feature flags controller: ~p", [Pid]),
    ok.

%% Enables `feature_flags_v2' when the testcase config asks for it, then
%% asserts that the resulting state matches what was requested.
maybe_enable_feature_flags_v2(Config) ->
    EnableFFv2 = ?config(enable_feature_flags_v2, Config),
    case EnableFFv2 of
        true  -> ok = rabbit_feature_flags:enable(feature_flags_v2);
        false -> ok
    end,
    IsEnabled = rabbit_feature_flags:is_enabled(feature_flags_v2),
    ?LOG_INFO("`feature_flags_v2` enabled: ~s", [IsEnabled]),
    ?assertEqual(EnableFFv2, IsEnabled),
    ok.
%% Overrides, on each of `Nodes', both the list of cluster members and the
%% list of running cluster members seen by the feature flags code, and
%% asserts after each override that `rabbit_feature_flags' and
%% `rabbit_ff_controller' report exactly `Nodes'.
%%
%% The assertions compare against `Nodes' as given, so callers are expected
%% to pass a sorted list.
override_running_nodes(Nodes) when is_list(Nodes) ->
    ct:pal("Overriding (running) remote nodes for ~p", [Nodes]),
    lists:foreach(
      fun(Node) ->
              ok = rpc:call(
                     Node, rabbit_feature_flags, override_nodes,
                     [Nodes]),
              ?assertEqual(
                 Nodes,
                 lists:sort(
                   [Node |
                    rpc:call(Node, rabbit_feature_flags, remote_nodes, [])])),
              ?assertEqual(
                 Nodes,
                 rpc:call(Node, rabbit_ff_controller, all_nodes, [])),

              ok = rpc:call(
                     Node, rabbit_feature_flags, override_running_nodes,
                     [Nodes]),
              ?assertEqual(
                 Nodes,
                 lists:sort(
                   [Node |
                    rpc:call(
                      Node, rabbit_feature_flags, running_remote_nodes,
                      [])])),
              ?assertEqual(
                 Nodes,
                 rpc:call(Node, rabbit_ff_controller, running_nodes, []))
      end,
      Nodes),
    ok.

%% Injects the `FeatureFlags' test feature flags on each of `Nodes'.
inject_on_nodes(Nodes, FeatureFlags) ->
    %% The arguments must follow the order of the labels in the format
    %% string: nodes first, feature flags second.
    ct:pal(
      "Injecting feature flags on nodes~n"
      " Nodes: ~p~n"
      " Feature flags: ~p~n",
      [Nodes, FeatureFlags]),
    Inject =
        fun() ->
                ?assertEqual(
                   ok,
                   rabbit_feature_flags:inject_test_feature_flags(
                     FeatureFlags)),
                ok
        end,
    lists:foreach(
      fun(Node) -> ok = run_on_node(Node, Inject, []) end,
      Nodes),
    ok.

%% -------------------------------------------------------------------
%% Migration functions.
%% -------------------------------------------------------------------

-define(PT_MIGRATION_FUN_RUNS, {?MODULE, migration_fun_runs}).

%% Increments the node-local counter of migration function runs, stored as a
%% persistent term.
bump_runs() ->
    Count = persistent_term:get(?PT_MIGRATION_FUN_RUNS, 0),
    persistent_term:put(?PT_MIGRATION_FUN_RUNS, Count + 1),
    ok.

%% Migration function (3-argument API): counts the run and returns.
mf_count_runs(_FeatureName, _FeatureProps, enable) ->
    bump_runs(),
    ok.

%% Migration function (3-argument API): blocks until the recorded peer
%% process sends `proceed', then counts the run.
mf_wait_and_count_runs(_FeatureName, _FeatureProps, enable) ->
    wait_for_peer_then_bump_runs().

%% Migration function (`#ffcommand{}' API): blocks on the `enable' command
%% only; any other command is a no-op.
mf_wait_and_count_runs_v2(#ffcommand{command = enable}) ->
    wait_for_peer_then_bump_runs();
mf_wait_and_count_runs_v2(_) ->
    ok.

%% Migration function (`#ffcommand{}' API): blocks on the `post_enable'
%% command only; any other command is a no-op.
mf_wait_and_count_runs_in_post_enable(#ffcommand{command = post_enable}) ->
    wait_for_peer_then_bump_runs();
mf_wait_and_count_runs_in_post_enable(_) ->
    ok.

%% Shared body of the blocking migration functions: tells the recorded peer
%% process that this migration function is waiting, blocks until the peer
%% replies with `proceed', then counts the run.
wait_for_peer_then_bump_runs() ->
    Peer = get_peer_proc(),
    Peer ! {node(), self(), waiting},
    ?LOG_NOTICE("Migration function: waiting for signal from ~p...", [Peer]),
    receive proceed -> ok end,
    ?LOG_NOTICE("Migration function: unblocked!", []),
    bump_runs(),
    ok.

-define(PT_PEER_PROC, {?MODULE, peer_proc}).

%% Stores, as a node-local persistent term, the process the blocking
%% migration functions must notify and wait for.
record_peer_proc(Peer) ->
    ?LOG_ALERT("Recording peer=~p", [Peer]),
    persistent_term:put(?PT_PEER_PROC, Peer).

%% Returns the process stored by `record_peer_proc/1'. Raises `badarg' if
%% none was recorded on this node.
get_peer_proc() ->
    persistent_term:get(?PT_PEER_PROC).

%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

%% Testcase entry point: runs the actual checks on the single slave node.
enable_unknown_feature_flag_on_a_single_node(Config) ->
    [Node] = ?config(nodes, Config),
    ok = run_on_node(
           Node, fun enable_unknown_feature_flag_on_a_single_node/0).

%% Runs on the slave node: enabling a feature flag nobody declared must be
%% rejected and must leave the flag unsupported and disabled.
enable_unknown_feature_flag_on_a_single_node() ->
    FeatureName = ?FUNCTION_NAME,
    ?assertNot(rabbit_feature_flags:is_supported(FeatureName)),
    ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),

    %% The node doesn't know about the feature flag and thus rejects the
    %% request.
    ?assertEqual(
       {error, unsupported}, rabbit_feature_flags:enable(FeatureName)),
    ?assertNot(rabbit_feature_flags:is_supported(FeatureName)),
    ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
    ok.

%% Testcase entry point: runs the actual checks on the single slave node.
enable_supported_feature_flag_on_a_single_node(Config) ->
    [Node] = ?config(nodes, Config),
    ok = run_on_node(
           Node, fun enable_supported_feature_flag_on_a_single_node/0).
%% Runs on the slave node: an injected, stable feature flag starts as
%% supported but disabled, and is enabled by `enable/1'.
enable_supported_feature_flag_on_a_single_node() ->
    FeatureName = ?FUNCTION_NAME,
    FeatureProps = #{provided_by => ?MODULE,
                     stability => stable},
    InjectRet = rabbit_feature_flags:inject_test_feature_flags(
                  #{FeatureName => FeatureProps}),
    ?assertEqual(ok, InjectRet),
    ?assert(rabbit_feature_flags:is_supported(FeatureName)),
    ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),

    EnableRet = rabbit_feature_flags:enable(FeatureName),
    ?assertEqual(ok, EnableRet),
    ?assert(rabbit_feature_flags:is_supported(FeatureName)),
    ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
    ok.

%% In a 3-node cluster where no node declares the feature flag, every node
%% must reject the request to enable it and keep it unsupported and disabled.
enable_unknown_feature_flag_in_a_3node_cluster(Config) ->
    Nodes = ?config(nodes, Config),
    connect_nodes(Nodes),
    override_running_nodes(Nodes),

    FeatureName = ?FUNCTION_NAME,

    %% Runs `Check' on each target node in turn; it must return `ok'.
    OnEach = fun(Targets, Check) ->
                     lists:foreach(
                       fun(Target) ->
                               ok = run_on_node(Target, Check, [])
                       end,
                       Targets)
             end,
    AssertUnknown =
        fun() ->
                ?assertNot(rabbit_feature_flags:is_supported(FeatureName)),
                ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
                ok
        end,

    ct:pal(
      "Checking the feature flag is unsupported and disabled on all nodes"),
    OnEach(Nodes, AssertUnknown),

    %% No nodes know about the feature flag and thus all reject the request.
    ct:pal("Trying to enable the feature flag on all nodes"),
    OnEach(
      Nodes,
      fun() ->
              ?assertEqual(
                 {error, unsupported},
                 rabbit_feature_flags:enable(FeatureName)),
              ok
      end),

    ct:pal(
      "Checking the feature flag is still unsupported and disabled on all "
      "nodes"),
    OnEach(Nodes, AssertUnknown),
    ok.
%% In a 3-node cluster where every node declares the feature flag, enabling
%% it from each node in turn succeeds and leaves it enabled everywhere.
enable_supported_feature_flag_in_a_3node_cluster(Config) ->
    Nodes = ?config(nodes, Config),
    connect_nodes(Nodes),
    override_running_nodes(Nodes),

    FeatureName = ?FUNCTION_NAME,
    FeatureProps = #{provided_by => ?MODULE,
                     stability => stable},
    FeatureFlags = #{FeatureName => FeatureProps},
    inject_on_nodes(Nodes, FeatureFlags),

    %% Runs `Check' on each target node in turn; it must return `ok'.
    OnEach = fun(Targets, Check) ->
                     lists:foreach(
                       fun(Target) ->
                               ok = run_on_node(Target, Check, [])
                       end,
                       Targets)
             end,

    ct:pal(
      "Checking the feature flag is supported but disabled on all nodes"),
    OnEach(
      Nodes,
      fun() ->
              ?assert(rabbit_feature_flags:is_supported(FeatureName)),
              ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),

    %% The first call enables the feature flag, the following calls are
    %% idempotent and do nothing.
    ct:pal("Enabling the feature flag on all nodes"),
    OnEach(
      Nodes,
      fun() ->
              ?assertEqual(ok, rabbit_feature_flags:enable(FeatureName)),
              ok
      end),

    ct:pal("Checking the feature flag is supported and enabled on all nodes"),
    OnEach(
      Nodes,
      fun() ->
              ?assert(rabbit_feature_flags:is_supported(FeatureName)),
              ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),
    ok.

%% The feature flag is injected on the first node only. After enabling it
%% from every node, the testcase expects it to be enabled on all nodes with
%% feature flags v1, and on the first node only with feature flags v2.
enable_partially_supported_feature_flag_in_a_3node_cluster(Config) ->
    Nodes = ?config(nodes, Config),
    [FirstNode | OtherNodes] = Nodes,
    connect_nodes(Nodes),
    override_running_nodes(Nodes),
    UsingFFv1 = not ?config(enable_feature_flags_v2, Config),

    %% This time, we inject the feature flag on a single node only. The other
    %% nodes don't know about it.
    FeatureName = ?FUNCTION_NAME,
    FeatureProps = #{provided_by => ?MODULE,
                     stability => stable},
    FeatureFlags = #{FeatureName => FeatureProps},
    inject_on_nodes([FirstNode], FeatureFlags),

    %% Runs `Check' on each target node in turn; it must return `ok'.
    OnEach = fun(Targets, Check) ->
                     lists:foreach(
                       fun(Target) ->
                               ok = run_on_node(Target, Check, [])
                       end,
                       Targets)
             end,

    if
        UsingFFv1 ->
            %% With `feature_flags_v1', the code would have shared the new
            %% feature flags with remote nodes, so let's run that here. In
            %% the end, the testcase is similar to
            %% `enable_supported_feature_flag_in_a_3node_cluster'.
            ct:pal("Refreshing feature flags after app load"),
            ok = run_on_node(
                   FirstNode,
                   fun() ->
                           ?assertEqual(
                              ok,
                              rabbit_feature_flags:
                              share_new_feature_flags_after_app_load(
                                FeatureFlags, infinity)),
                           ok
                   end,
                   []);
        true ->
            ok
    end,

    ct:pal(
      "Checking the feature flag is supported but disabled on all nodes"),
    OnEach(
      Nodes,
      fun() ->
              ?assert(rabbit_feature_flags:is_supported(FeatureName)),
              ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),

    %% The first call enables the feature flag, the following calls are
    %% idempotent and do nothing.
    ct:pal("Enabling the feature flag on all nodes"),
    OnEach(
      Nodes,
      fun() ->
              ?assertEqual(
                 ok,
                 rabbit_feature_flags:enable(FeatureName)),
              ok
      end),

    ct:pal(
      "Checking the feature flag is supported on all nodes and enabled on "
      "all nodes (v1) or the node knowing it only (v2)"),
    OnEach(
      [FirstNode],
      fun() ->
              ?assert(rabbit_feature_flags:is_supported(FeatureName)),
              ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),
    OnEach(
      OtherNodes,
      fun() ->
              %% Supported in both cases; only the enabled state depends on
              %% the feature flags implementation.
              ?assert(rabbit_feature_flags:is_supported(FeatureName)),
              case UsingFFv1 of
                  true ->
                      ?assert(
                         rabbit_feature_flags:is_enabled(FeatureName));
                  false ->
                      ?assertNot(
                         rabbit_feature_flags:is_enabled(FeatureName))
              end,
              ok
      end),
    ok.

%% The feature flag is injected on the first node only, as if provided by
%% `rabbit'. The testcase expects it to be reported unsupported on all nodes
%% and every request to enable it to be denied.
enable_unsupported_feature_flag_in_a_3node_cluster(Config) ->
    Nodes = ?config(nodes, Config),
    [FirstNode | _] = Nodes,
    connect_nodes(Nodes),
    override_running_nodes(Nodes),

    %% We inject the feature flag on a single node only. We tell it is
    %% provided by `rabbit' which was already loaded and scanned while
    %% configuring the node. This way, we ensure the feature flag is
    %% considered supported by the node where it was injected, but
    %% unsupported by other nodes.
    FeatureName = ?FUNCTION_NAME,
    FeatureProps = #{provided_by => rabbit,
                     stability => stable},
    FeatureFlags = #{FeatureName => FeatureProps},
    inject_on_nodes([FirstNode], FeatureFlags),

    %% Runs `Check' on each target node in turn; it must return `ok'.
    OnEach = fun(Targets, Check) ->
                     lists:foreach(
                       fun(Target) ->
                               ok = run_on_node(Target, Check, [])
                       end,
                       Targets)
             end,

    ct:pal(
      "Checking the feature flag is unsupported and disabled on all nodes"),
    OnEach(
      Nodes,
      fun() ->
              ?assertNot(rabbit_feature_flags:is_supported(FeatureName)),
              ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),

    %% The feature flag is unsupported, thus all reject the request.
    ct:pal("Enabling the feature flag on all nodes (denied)"),
    OnEach(
      Nodes,
      fun() ->
              ?assertEqual(
                 {error, unsupported},
                 rabbit_feature_flags:enable(FeatureName)),
              ok
      end),

    ct:pal("Checking the feature flag is still disabled on all nodes"),
    OnEach(
      Nodes,
      fun() ->
              ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),
    ok.
%% Enables a feature flag in a 2-node cluster, then adds a third node and
%% synchronizes its feature flags. The testcase expects the migration
%% function to have run once on the node where `enable/1' was first called,
%% once on the joining node, and not at all on the remaining node.
enable_feature_flag_in_cluster_and_add_member_after(Config) ->
    AllNodes = ?config(nodes, Config),
    [NewNode | Nodes] = AllNodes,
    [FirstNode | _] = Nodes,
    connect_nodes(Nodes),
    override_running_nodes([NewNode]),
    override_running_nodes(Nodes),

    FeatureName = ?FUNCTION_NAME,
    FeatureProps = #{provided_by => ?MODULE,
                     stability => stable,
                     migration_fun => {?MODULE, mf_count_runs}},
    FeatureFlags = #{FeatureName => FeatureProps},
    inject_on_nodes(AllNodes, FeatureFlags),

    %% Runs `Check' on each target node in turn; it must return `ok'.
    OnEach = fun(Targets, Check) ->
                     lists:foreach(
                       fun(Target) ->
                               ok = run_on_node(Target, Check, [])
                       end,
                       Targets)
             end,

    ct:pal(
      "Checking the feature flag is supported but disabled on all nodes"),
    OnEach(
      AllNodes,
      fun() ->
              ?assert(rabbit_feature_flags:is_supported(FeatureName)),
              ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),

    ct:pal("Enabling the feature flag in the cluster"),
    OnEach(
      Nodes,
      fun() ->
              ?assertEqual(ok, rabbit_feature_flags:enable(FeatureName)),
              ok
      end),

    ct:pal("Checking the feature flag is enabled in the initial cluster"),
    OnEach(
      Nodes,
      fun() ->
              ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),

    ct:pal("Checking the feature flag is still disabled on the new node"),
    OnEach(
      [NewNode],
      fun() ->
              ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),

    %% Check compatibility between `NewNode' and `Nodes'.
    OnEach(
      [NewNode],
      fun() ->
              ?assertEqual(
                 ok,
                 rabbit_feature_flags:check_node_compatibility(
                   FirstNode)),
              ok
      end),

    %% Add node to cluster and synchronize feature flags.
    connect_nodes(AllNodes),
    override_running_nodes(AllNodes),
    ct:pal(
      "Synchronizing feature flags in the expanded cluster~n"
      "~n"
      "NOTE: Error messages about crashed migration functions can be "
      "ignored for feature~n"
      " flags other than `~s`~n"
      " because they assume they run inside RabbitMQ.",
      [FeatureName]),
    OnEach(
      [NewNode],
      fun() ->
              ?assertEqual(
                 ok,
                 rabbit_feature_flags:sync_feature_flags_with_cluster(
                   Nodes, true)),
              ok
      end),

    ct:pal("Checking the feature flag is enabled in the expanded cluster"),
    lists:foreach(
      fun(Target) ->
              %% With both feature flags v1 and v2, the migration function
              %% is executed on the node where `enable()' was called, and
              %% then on the node joining the cluster.
              ExpectedRuns =
                  case lists:member(Target, [FirstNode, NewNode]) of
                      true  -> 1;
                      false -> 0
                  end,
              ok = run_on_node(
                     Target,
                     fun() ->
                             ?assert(
                                rabbit_feature_flags:is_enabled(
                                  FeatureName)),
                             ?assertEqual(
                                ExpectedRuns,
                                persistent_term:get(
                                  ?PT_MIGRATION_FUN_RUNS, 0)),
                             ok
                     end,
                     [])
      end,
      AllNodes),
    ok.
%% Enables a feature flag whose migration function (3-argument API) blocks,
%% and adds a node to the cluster while the migration is still in progress.
%% The set of nodes expected to run the migration function depends on the
%% feature flags implementation in use.
enable_feature_flag_in_cluster_and_add_member_concurrently_mfv1(Config) ->
    AllNodes = ?config(nodes, Config),
    [NewNode | Nodes] = AllNodes,
    [FirstNode | _] = Nodes,
    connect_nodes(Nodes),
    override_running_nodes([NewNode]),
    override_running_nodes(Nodes),

    FeatureName = ?FUNCTION_NAME,
    FeatureProps = #{provided_by => ?MODULE,
                     stability => stable,
                     migration_fun => {?MODULE, mf_wait_and_count_runs}},
    FeatureFlags = #{FeatureName => FeatureProps},
    inject_on_nodes(AllNodes, FeatureFlags),

    %% Runs `Check' on each target node in turn; it must return `ok'.
    OnEach = fun(Targets, Check) ->
                     lists:foreach(
                       fun(Target) ->
                               ok = run_on_node(Target, Check, [])
                       end,
                       Targets)
             end,

    ct:pal(
      "Checking the feature flag is supported but disabled on all nodes"),
    OnEach(
      AllNodes,
      fun() ->
              ?assert(rabbit_feature_flags:is_supported(FeatureName)),
              ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),

    ct:pal(
      "Enabling the feature flag in the cluster (in a separate process)"),
    Peer = self(),
    EnableOnFirstNode =
        fun() ->
                %% The migration function uses the `Peer' PID (the process
                %% executing the testcase) to notify its own PID and wait
                %% for a signal from `Peer' to proceed and finish the
                %% migration.
                record_peer_proc(Peer),
                ?assertEqual(
                   ok,
                   rabbit_feature_flags:enable(FeatureName)),
                ok
        end,
    Enabler = spawn_link(
                fun() ->
                        ok = run_on_node(FirstNode, EnableOnFirstNode, [])
                end),

    %% By waiting for the message from one of the migration function
    %% instances, we make sure the feature flags controller on `FirstNode' is
    %% blocked and waits for a message from this process. Therefore, we are
    %% sure the feature flag is in the `state_changing' state and we can try
    %% to add a new node and sync its feature flags.
    FirstNodeMigFunPid = receive
                             {FirstNode, MigFunPid1, waiting} -> MigFunPid1
                         end,

    %% Check compatibility between `NewNode' and `Nodes'. This doesn't block.
    OnEach(
      [NewNode],
      fun() ->
              ?assertEqual(
                 ok,
                 rabbit_feature_flags:check_node_compatibility(
                   FirstNode)),
              ok
      end),

    %% Add node to cluster and synchronize feature flags. The synchronization
    %% blocks.
    connect_nodes(AllNodes),
    override_running_nodes(AllNodes),
    ct:pal(
      "Synchronizing feature flags in the expanded cluster (in a separate "
      "process)~n"
      "~n"
      "NOTE: Error messages about crashed migration functions can be "
      "ignored for feature~n"
      " flags other than `~s`~n"
      " because they assume they run inside RabbitMQ.",
      [FeatureName]),
    SyncOnNewNode =
        fun() ->
                record_peer_proc(Peer),
                ?assertEqual(
                   ok,
                   rabbit_feature_flags:sync_feature_flags_with_cluster(
                     Nodes, true)),
                ok
        end,
    Syncer = spawn_link(
               fun() ->
                       ok = run_on_node(NewNode, SyncOnNewNode, [])
               end),

    ct:pal(
      "Checking the feature flag state is changing in the initial cluster"),
    OnEach(
      Nodes,
      fun() ->
              ?assertEqual(
                 state_changing,
                 rabbit_feature_flags:is_enabled(
                   FeatureName, non_blocking)),
              ok
      end),

    ct:pal("Checking the feature flag is still disabled on the new node"),
    OnEach(
      [NewNode],
      fun() ->
              ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),

    %% Unblock the migration functions on `Nodes'. The spawned processes are
    %% monitored instead of linked from now on, so that their exit reason
    %% can be checked below.
    UsingFFv1 = not ?config(enable_feature_flags_v2, Config),
    EnablerMRef = erlang:monitor(process, Enabler),
    SyncerMRef = erlang:monitor(process, Syncer),
    unlink(Enabler),
    unlink(Syncer),
    ExpectedNodes =
        case UsingFFv1 of
            true ->
                %% With v1, the migration function runs on a single node in
                %% the cluster only in this scenario.
                %%
                %% The reason is that the new node joined during the
                %% migration and the feature flag was marked as enabled
                %% there as well, even though the migration function
                %% possibly didn't know about it. This is one of the
                %% problems `feature_flags_v2' fixes.
                [FirstNode];
            false ->
                %% With v2 but still using the old migration function API
                %% (taking 3 arguments), the migration function is executed
                %% on the node where `enable()' was called, and then on the
                %% node joining the cluster, thanks to the synchronization.
                [FirstNode, NewNode]
        end,

    %% Unblock the migration function for which we already consumed the
    %% `waiting' notification.
    FirstMigratedNode = node(FirstNodeMigFunPid),
    ?assertEqual(FirstNode, FirstMigratedNode),
    ct:pal(
      "Unblocking first node (~p @ ~s)",
      [FirstNodeMigFunPid, FirstMigratedNode]),
    FirstNodeMigFunPid ! proceed,

    %% Unblock the rest and collect the node names of all migration functions
    %% which ran.
    ct:pal("Unblocking other nodes, including the joining one"),
    StillBlocked = ExpectedNodes -- [FirstMigratedNode],
    OtherMigratedNodes =
        [receive
             {BlockedNode, MigFunPid2, waiting} ->
                 MigFunPid2 ! proceed,
                 BlockedNode
         end || BlockedNode <- StillBlocked],
    MigratedNodes = [FirstMigratedNode | OtherMigratedNodes],
    ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)),

    ct:pal("Waiting for spawned processes to terminate"),
    receive
        {'DOWN', EnablerMRef, process, Enabler, EnablerReason} ->
            ?assertEqual(normal, EnablerReason)
    end,
    receive
        {'DOWN', SyncerMRef, process, Syncer, SyncerReason} ->
            ?assertEqual(normal, SyncerReason)
    end,

    ct:pal("Checking the feature flag is enabled in the expanded cluster"),
    lists:foreach(
      fun(Target) ->
              %% The run count is only checked on nodes where the migration
              %% function was observed to run.
              HasMigrated = lists:member(Target, MigratedNodes),
              ok = run_on_node(
                     Target,
                     fun() ->
                             ?assert(
                                rabbit_feature_flags:is_enabled(
                                  FeatureName)),
                             ?assert(
                                not HasMigrated orelse
                                1 =:= persistent_term:get(
                                        ?PT_MIGRATION_FUN_RUNS, 0)),
                             ok
                     end,
                     [])
      end,
      AllNodes),
    ok.
%% Same scenario as the `_mfv1' variant, but the feature flag uses a
%% migration function taking an `#ffcommand{}'. The testcase expects the
%% migration function to run exactly once on every node, including the one
%% joining the cluster.
enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2(Config) ->
    AllNodes = ?config(nodes, Config),
    [NewNode | Nodes] = AllNodes,
    [FirstNode | _] = Nodes,
    connect_nodes(Nodes),
    override_running_nodes([NewNode]),
    override_running_nodes(Nodes),

    FeatureName = ?FUNCTION_NAME,
    FeatureProps = #{provided_by => ?MODULE,
                     stability => stable,
                     migration_fun => {?MODULE,
                                       mf_wait_and_count_runs_v2}},
    FeatureFlags = #{FeatureName => FeatureProps},
    inject_on_nodes(AllNodes, FeatureFlags),

    %% Runs `Check' on each target node in turn; it must return `ok'.
    OnEach = fun(Targets, Check) ->
                     lists:foreach(
                       fun(Target) ->
                               ok = run_on_node(Target, Check, [])
                       end,
                       Targets)
             end,

    ct:pal(
      "Checking the feature flag is supported but disabled on all nodes"),
    OnEach(
      AllNodes,
      fun() ->
              ?assert(rabbit_feature_flags:is_supported(FeatureName)),
              ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),

    ct:pal(
      "Enabling the feature flag in the cluster (in a separate process)"),
    Peer = self(),
    OnEach(
      AllNodes,
      fun() ->
              %% The migration function uses the `Peer' PID (the process
              %% executing the testcase) to notify its own PID and wait for
              %% a signal from `Peer' to proceed and finish the migration.
              record_peer_proc(Peer),
              ok
      end),
    EnableOnFirstNode =
        fun() ->
                ?assertEqual(
                   ok,
                   rabbit_feature_flags:enable(FeatureName)),
                ok
        end,
    Enabler = spawn_link(
                fun() ->
                        ok = run_on_node(FirstNode, EnableOnFirstNode, [])
                end),

    %% By waiting for the message from one of the migration function
    %% instances, we make sure the feature flags controller on `FirstNode' is
    %% blocked and waits for a message from this process. Therefore, we are
    %% sure the feature flag is in the `state_changing' state and we can try
    %% to add a new node and sync its feature flags.
    FirstNodeMigFunPid = receive
                             {_, MigFunPid1, waiting} -> MigFunPid1
                         end,

    %% Check compatibility between `NewNode' and `Nodes'. This doesn't block.
    OnEach(
      [NewNode],
      fun() ->
              ?assertEqual(
                 ok,
                 rabbit_feature_flags:check_node_compatibility(
                   FirstNode)),
              ok
      end),

    %% Add node to cluster and synchronize feature flags. The synchronization
    %% blocks.
    connect_nodes(AllNodes),
    override_running_nodes(AllNodes),
    ct:pal(
      "Synchronizing feature flags in the expanded cluster (in a separate "
      "process)~n"
      "~n"
      "NOTE: Error messages about crashed migration functions can be "
      "ignored for feature~n"
      " flags other than `~s`~n"
      " because they assume they run inside RabbitMQ.",
      [FeatureName]),
    SyncOnNewNode =
        fun() ->
                ?assertEqual(
                   ok,
                   rabbit_feature_flags:sync_feature_flags_with_cluster(
                     Nodes, true)),
                ok
        end,
    Syncer = spawn_link(
               fun() ->
                       ok = run_on_node(NewNode, SyncOnNewNode, [])
               end),

    ct:pal(
      "Checking the feature flag state is changing in the initial cluster"),
    OnEach(
      Nodes,
      fun() ->
              ?assertEqual(
                 state_changing,
                 rabbit_feature_flags:is_enabled(
                   FeatureName, non_blocking)),
              ok
      end),

    ct:pal("Checking the feature flag is still disabled on the new node"),
    OnEach(
      [NewNode],
      fun() ->
              ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
              ok
      end),

    %% Unblock the migration functions on `Nodes'. The spawned processes are
    %% monitored instead of linked from now on, so that their exit reason
    %% can be checked below.
    EnablerMRef = erlang:monitor(process, Enabler),
    SyncerMRef = erlang:monitor(process, Syncer),
    unlink(Enabler),
    unlink(Syncer),

    %% The migration function runs on all clustered nodes with v2, including
    %% the one joining the cluster, thanks to the synchronization.
    %%
    %% When this testcase runs with feature flags v1, the feature flag we want
    %% to enable uses the migration function API v2: this implicitly enables
    %% `feature_flags_v2'. As part of the synchronization, the node still on
    %% feature flags v1 will try to sync `feature_flags_v2' specifically
    %% first. After that, the controller-based sync proceeds.
    ExpectedNodes = Nodes ++ [NewNode],

    %% Unblock the migration function for which we already consumed the
    %% `waiting' notification.
    FirstMigratedNode = node(FirstNodeMigFunPid),
    ct:pal(
      "Unblocking first node (~p @ ~s)",
      [FirstNodeMigFunPid, FirstMigratedNode]),
    FirstNodeMigFunPid ! proceed,

    %% Unblock the rest and collect the node names of all migration functions
    %% which ran.
    ct:pal("Unblocking other nodes, including the joining one"),
    StillBlocked = ExpectedNodes -- [FirstMigratedNode],
    OtherMigratedNodes =
        [receive
             {BlockedNode, MigFunPid2, waiting} ->
                 MigFunPid2 ! proceed,
                 BlockedNode
         end || BlockedNode <- StillBlocked],
    MigratedNodes = [FirstMigratedNode | OtherMigratedNodes],
    ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)),

    ct:pal("Waiting for spawned processes to terminate"),
    receive
        {'DOWN', EnablerMRef, process, Enabler, EnablerReason} ->
            ?assertEqual(normal, EnablerReason)
    end,
    receive
        {'DOWN', SyncerMRef, process, Syncer, SyncerReason} ->
            ?assertEqual(normal, SyncerReason)
    end,

    ct:pal("Checking the feature flag is enabled in the expanded cluster"),
    OnEach(
      AllNodes,
      fun() ->
              ?assert(rabbit_feature_flags:is_enabled(FeatureName)),
              ?assertEqual(
                 1,
                 persistent_term:get(?PT_MIGRATION_FUN_RUNS, 0)),
              ok
      end),
    ok.
+ +enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv1(Config) -> + AllNodes = [LeavingNode | [FirstNode | _] = Nodes] = ?config( + nodes, Config), + connect_nodes(AllNodes), + override_running_nodes(AllNodes), + + FeatureName = ?FUNCTION_NAME, + FeatureFlags = #{FeatureName => + #{provided_by => ?MODULE, + stability => stable, + migration_fun => {?MODULE, mf_wait_and_count_runs}}}, + inject_on_nodes(AllNodes, FeatureFlags), + + ct:pal( + "Checking the feature flag is supported but disabled on all nodes"), + _ = [ok = + run_on_node( + Node, + fun() -> + ?assert(rabbit_feature_flags:is_supported(FeatureName)), + ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)), + ok + end, + []) + || Node <- AllNodes], + + UsingFFv1 = not ?config(enable_feature_flags_v2, Config), + ExpectedRet = case UsingFFv1 of + true -> ok; + false -> {error, {badrpc, nodedown}} + end, + ct:pal( + "Enabling the feature flag in the cluster (in a separate process)"), + Peer = self(), + Enabler = spawn_link( + fun() -> + ok = + run_on_node( + FirstNode, + fun() -> + %% The migration function uses the `Peer' + %% PID (the process executing the testcase) + %% to notify its own PID and wait for a + %% signal from `Peer' to proceed and finish + %% the migration. + record_peer_proc(Peer), + ?assertEqual( + ExpectedRet, + rabbit_feature_flags:enable( + FeatureName)), + ok + end, + []) + end), + + %% By waiting for the message from one of the migration function + %% instances, we make sure the feature flags controller on `FirstNode' is + %% blocked and waits for a message from this process. Therefore, we are + %% sure the feature flag is in the `state_changing' state and we can try + %% to add a new node and sync its feature flags. + FirstNodeMigFunPid = receive + {_Node, MigFunPid1, waiting} -> MigFunPid1 + end, + + %% Remove node from cluster. + stop_slave_node(LeavingNode), + override_running_nodes(Nodes), + + %% Unblock the migration functions on `Nodes'. 
+ EnablerMRef = erlang:monitor(process, Enabler), + unlink(Enabler), + + %% Unblock the migration function for which we already consumed the + %% `waiting' notification. + FirstMigratedNode = node(FirstNodeMigFunPid), + ct:pal( + "Unblocking first node (~p @ ~s)", + [FirstNodeMigFunPid, FirstMigratedNode]), + FirstNodeMigFunPid ! proceed, + + ct:pal("Waiting for spawned processes to terminate"), + receive + {'DOWN', EnablerMRef, process, Enabler, EnablerReason} -> + ?assertEqual(normal, EnablerReason) + end, + + ct:pal( + "Checking the feature flag is enabled (v1) or disabled (v2) in the " + "cluster"), + _ = [ok = + run_on_node( + Node, + fun() -> + case UsingFFv1 of + true -> + ?assert( + rabbit_feature_flags:is_enabled(FeatureName)); + false -> + ?assertNot( + rabbit_feature_flags:is_enabled(FeatureName)) + end, + ok + end, + []) + || Node <- Nodes], + ok. + +enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2(Config) -> + AllNodes = [LeavingNode | [FirstNode | _] = Nodes] = ?config( + nodes, Config), + connect_nodes(AllNodes), + override_running_nodes(AllNodes), + + FeatureName = ?FUNCTION_NAME, + FeatureFlags = #{FeatureName => + #{provided_by => ?MODULE, + stability => stable, + migration_fun => {?MODULE, + mf_wait_and_count_runs_v2}}}, + inject_on_nodes(AllNodes, FeatureFlags), + + UsingFFv1 = not ?config(enable_feature_flags_v2, Config), + + ct:pal( + "Checking the feature flag is supported but disabled on all nodes"), + _ = [ok = + run_on_node( + Node, + fun() -> + ?assert(rabbit_feature_flags:is_supported(FeatureName)), + ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)), + ok + end, + []) + || Node <- AllNodes], + + ct:pal( + "Enabling the feature flag in the cluster (in a separate process)"), + Peer = self(), + _ = [ok = + run_on_node( + Node, + fun() -> + %% The migration function uses the `Peer' PID (the process + %% executing the testcase) to notify its own PID and wait + %% for a signal from `Peer' to proceed and finish the + 
%% migration. + record_peer_proc(Peer), + ok + end, + []) + || Node <- AllNodes], + Enabler = spawn_link( + fun() -> + ok = + run_on_node( + FirstNode, + fun() -> + ?assertEqual( + {error, {badrpc, nodedown}}, + rabbit_feature_flags:enable( + FeatureName)), + ok + end, + []) + end), + + %% By waiting for the message from one of the migration function + %% instances, we make sure the feature flags controller on `FirstNode' is + %% blocked and waits for a message from this process. Therefore, we are + %% sure the feature flag is in the `state_changing' state and we can try + %% to add a new node and sync its feature flags. + FirstNodeMigFunPid = receive + {FirstNode, MigFunPid1, waiting} -> MigFunPid1 + end, + + %% Remove node from cluster. + stop_slave_node(LeavingNode), + override_running_nodes(Nodes), + + %% Unblock the migration functions on `Nodes'. + EnablerMRef = erlang:monitor(process, Enabler), + unlink(Enabler), + + %% The migration function runs on all clustered nodes with v2. + %% + %% When this testcase runs with feature flags v1, the feature flag we want + %% to enable uses the migration function API v2: this implicitly enables + %% `feature_flags_v2'. As part of the synchronization, the node still on + %% feature flags v1 will try to sync `feature_flags_v2' specificaly first. + %% After that, the controller-based sync proceeds. + ExpectedNodes = Nodes, + + %% Unblock the migration function for which we already consumed the + %% `waiting' notification. + FirstMigratedNode = node(FirstNodeMigFunPid), + ?assertEqual(FirstNode, FirstMigratedNode), + ct:pal( + "Unblocking first node (~p @ ~s)", + [FirstNodeMigFunPid, FirstMigratedNode]), + FirstNodeMigFunPid ! proceed, + + %% Unblock the rest and collect the node names of all migration functions + %% which ran. + ct:pal("Unblocking other nodes"), + OtherMigratedNodes = [receive + {Node, MigFunPid2, waiting} -> + MigFunPid2 ! 
proceed, + Node + end || Node <- ExpectedNodes -- [FirstMigratedNode]], + MigratedNodes = [FirstMigratedNode | OtherMigratedNodes], + ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)), + + ct:pal("Waiting for spawned processes to terminate"), + receive + {'DOWN', EnablerMRef, process, Enabler, EnablerReason} -> + ?assertEqual(normal, EnablerReason) + end, + + ct:pal( + "Checking the feature flag is enabled (v1) or disabled (v2) in the " + "cluster"), + _ = [ok = + run_on_node( + Node, + fun() -> + case UsingFFv1 of + true -> + ?assertNot( + rabbit_feature_flags:is_enabled(FeatureName)); + false -> + ?assertNot( + rabbit_feature_flags:is_enabled(FeatureName)) + end, + ok + end, + []) + || Node <- Nodes], + ok. + +enable_feature_flag_with_post_enable(Config) -> + AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config), + connect_nodes(Nodes), + override_running_nodes([NewNode]), + override_running_nodes(Nodes), + + FeatureName = ?FUNCTION_NAME, + FeatureFlags = #{FeatureName => + #{provided_by => ?MODULE, + stability => stable, + migration_fun => + {?MODULE, + mf_wait_and_count_runs_in_post_enable}}}, + inject_on_nodes(AllNodes, FeatureFlags), + + ct:pal( + "Checking the feature flag is supported but disabled on all nodes"), + _ = [ok = + run_on_node( + Node, + fun() -> + ?assert(rabbit_feature_flags:is_supported(FeatureName)), + ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)), + ok + end, + []) + || Node <- AllNodes], + + ct:pal( + "Enabling the feature flag in the cluster (in a separate process)"), + Peer = self(), + _ = [ok = + run_on_node( + Node, + fun() -> + %% The migration function uses the `Peer' PID (the process + %% executing the testcase) to notify its own PID and wait + %% for a signal from `Peer' to proceed and finish the + %% migration. 
+ record_peer_proc(Peer), + ok + end, + []) + || Node <- AllNodes], + Enabler = spawn_link( + fun() -> + ok = + run_on_node( + FirstNode, + fun() -> + ?assertEqual( + ok, + rabbit_feature_flags:enable( + FeatureName)), + ok + end, + []) + end), + + %% By waiting for the message from one of the migration function + %% instances, we make sure the feature flags controller on `FirstNode' is + %% blocked and waits for a message from this process. Therefore, we are + %% sure the feature flag is in the `state_changing' state and we can try + %% to add a new node and sync its feature flags. + FirstNodeMigFunPid = receive + {_Node, MigFunPid1, waiting} -> MigFunPid1 + end, + + %% Check compatibility between NewNodes and Nodes. This doesn't block. + ok = run_on_node( + NewNode, + fun() -> + ?assertEqual( + ok, + rabbit_feature_flags:check_node_compatibility( + FirstNode)), + ok + end, []), + + %% Add node to cluster and synchronize feature flags. The synchronization + %% blocks. + connect_nodes(AllNodes), + override_running_nodes(AllNodes), + ct:pal( + "Synchronizing feature flags in the expanded cluster (in a separate " + "process)~n" + "~n" + "NOTE: Error messages about crashed migration functions can be " + "ignored for feature~n" + " flags other than `~s`~n" + " because they assume they run inside RabbitMQ.", + [FeatureName]), + Syncer = spawn_link( + fun() -> + ok = + run_on_node( + NewNode, + fun() -> + ?assertEqual( + ok, + rabbit_feature_flags: + sync_feature_flags_with_cluster( + Nodes, true)), + ok + end, []) + end), + + ct:pal( + "Checking the feature flag is enabled in the initial cluster"), + _ = [ok = + run_on_node( + Node, + fun() -> + ?assert( + rabbit_feature_flags:is_enabled( + FeatureName, non_blocking)), + ok + end, + []) + || Node <- Nodes], + + ct:pal("Checking the feature flag is still disabled on the new node"), + ok = run_on_node( + NewNode, + fun() -> + ?assertNot(rabbit_feature_flags:is_enabled(FeatureName)), + ok + end, + []), + + %% Unblock the 
migration functions on `Nodes'. + EnablerMRef = erlang:monitor(process, Enabler), + SyncerMRef = erlang:monitor(process, Syncer), + unlink(Enabler), + unlink(Syncer), + + %% The migration function runs on all clustered nodes with v2, including + %% the one joining the cluster, thanks to the synchronization. + %% + %% When this testcase runs with feature flags v1, the feature flag we want + %% to enable uses the migration function API v2: this implicitly enables + %% `feature_flags_v2'. As part of the synchronization, the node still on + %% feature flags v1 will try to sync `feature_flags_v2' specificaly first. + %% After that, the controller-based sync proceeds. + ExpectedNodes = Nodes ++ [NewNode], + + %% Unblock the migration function for which we already consumed the + %% `waiting' notification. + FirstMigratedNode = node(FirstNodeMigFunPid), + ct:pal( + "Unblocking first node (~p @ ~s)", + [FirstNodeMigFunPid, FirstMigratedNode]), + FirstNodeMigFunPid ! proceed, + + %% Unblock the rest and collect the node names of all migration functions + %% which ran. + ct:pal("Unblocking other nodes, including the joining one"), + OtherMigratedNodes = [receive + {Node, MigFunPid2, waiting} -> + MigFunPid2 ! 
proceed, + Node + end || Node <- ExpectedNodes -- [FirstMigratedNode]], + MigratedNodes = [FirstMigratedNode | OtherMigratedNodes], + ?assertEqual(lists:sort(ExpectedNodes), lists:sort(MigratedNodes)), + + ct:pal("Waiting for spawned processes to terminate"), + receive + {'DOWN', EnablerMRef, process, Enabler, EnablerReason} -> + ?assertEqual(normal, EnablerReason) + end, + receive + {'DOWN', SyncerMRef, process, Syncer, SyncerReason} -> + ?assertEqual(normal, SyncerReason) + end, + + ct:pal("Checking the feature flag is enabled in the expanded cluster"), + _ = [ok = + run_on_node( + Node, + fun() -> + ?assert(rabbit_feature_flags:is_enabled(FeatureName)), + ?assertEqual( + 1, + persistent_term:get(?PT_MIGRATION_FUN_RUNS, 0)), + ok + end, + []) + || Node <- AllNodes], + ok. diff --git a/deps/rabbit/test/feature_flags_with_unpriveleged_user_SUITE.erl b/deps/rabbit/test/feature_flags_with_unpriveleged_user_SUITE.erl index 96cb0b542a..0ae07f009d 100644 --- a/deps/rabbit/test/feature_flags_with_unpriveleged_user_SUITE.erl +++ b/deps/rabbit/test/feature_flags_with_unpriveleged_user_SUITE.erl @@ -53,7 +53,6 @@ init_per_suite(Config) -> end_per_suite(Config) -> feature_flags_SUITE:end_per_suite(Config). - init_per_group(enabling_in_cluster, Config) -> case rabbit_ct_helpers:is_mixed_versions() of true -> diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index 95fdd9d627..811dcfbe73 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -52,6 +52,7 @@ -export([dict_cons/3, orddict_cons/3, maps_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([all_module_attributes/1, + rabbitmq_related_apps/0, rabbitmq_related_module_attributes/1, module_attributes_from_apps/2, build_acyclic_graph/3]). 
diff --git a/deps/rabbitmq_cli/test/ctl/enable_feature_flag_test.exs b/deps/rabbitmq_cli/test/ctl/enable_feature_flag_test.exs index f8a3e62920..75815151ee 100644 --- a/deps/rabbitmq_cli/test/ctl/enable_feature_flag_test.exs +++ b/deps/rabbitmq_cli/test/ctl/enable_feature_flag_test.exs @@ -23,7 +23,8 @@ defmodule EnableFeatureFlagCommandTest do provided_by: :EnableFeatureFlagCommandTest, stability: :stable}} :ok = :rabbit_misc.rpc_call( - node, :rabbit_feature_flags, :initialize_registry, [new_feature_flags]) + node, :rabbit_feature_flags, :inject_test_feature_flags, + [new_feature_flags]) { :ok, diff --git a/deps/rabbitmq_cli/test/ctl/list_feature_flags_command_test.exs b/deps/rabbitmq_cli/test/ctl/list_feature_flags_command_test.exs index b2cf1ad52a..1d9ace0fe4 100644 --- a/deps/rabbitmq_cli/test/ctl/list_feature_flags_command_test.exs +++ b/deps/rabbitmq_cli/test/ctl/list_feature_flags_command_test.exs @@ -28,7 +28,8 @@ defmodule ListFeatureFlagsCommandTest do provided_by: :ListFeatureFlagsCommandTest, stability: :stable}} :ok = :rabbit_misc.rpc_call( - node, :rabbit_feature_flags, :initialize_registry, [new_feature_flags]) + node, :rabbit_feature_flags, :inject_test_feature_flags, + [new_feature_flags]) :ok = :rabbit_misc.rpc_call( node, :rabbit_feature_flags, :enable_all, []) diff --git a/deps/rabbitmq_ct_helpers/src/cth_log_redirect_any_domains.erl b/deps/rabbitmq_ct_helpers/src/cth_log_redirect_any_domains.erl new file mode 100644 index 0000000000..4caa1cf6fd --- /dev/null +++ b/deps/rabbitmq_ct_helpers/src/cth_log_redirect_any_domains.erl @@ -0,0 +1,23 @@ +-module(cth_log_redirect_any_domains). + +-export([log/2]). + +-define(BACKEND_MODULE, cth_log_redirect). + +%% Reversed behavior compared to `cth_log_redirect': log events with an +%% unknown domain are sent to the `cth_log_redirect' server, others are +%% dropped (as they are already handled by `cth_log_redirect'). 
+log(#{msg:={report,_Msg},meta:=#{domain:=[otp,sasl]}},_Config) -> + ok; +log(#{meta:=#{domain:=[otp]}},_Config) -> + ok; +log(#{meta:=#{domain:=_}}=Log,Config) -> + do_log(add_log_category(Log,error_logger),Config); +log(_Log,_Config) -> + ok. + +add_log_category(#{meta:=Meta}=Log,Category) -> + Log#{meta=>Meta#{?BACKEND_MODULE=>#{category=>Category}}}. + +do_log(Log,Config) -> + gen_server:call(?BACKEND_MODULE,{log,Log,Config}). diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl index b72f4f244c..6b62f59b98 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl @@ -7,6 +7,7 @@ -module(rabbit_ct_helpers). +-include_lib("kernel/include/logger.hrl"). -include_lib("common_test/include/ct.hrl"). -deprecated({is_mixed_versions,1,"Use is_mixed_versions/0 instead"}). @@ -26,6 +27,7 @@ load_rabbitmqctl_app/1, ensure_rabbitmq_plugins_cmd/1, ensure_rabbitmq_queues_cmd/1, + redirect_logger_to_ct_logs/1, init_skip_as_error_flag/1, start_long_running_testsuite_monitor/1, stop_long_running_testsuite_monitor/1, @@ -155,6 +157,34 @@ run_steps(Config, [Step | Rest]) -> run_steps(Config, []) -> Config. +redirect_logger_to_ct_logs(Config) -> + ct:pal( + ?LOW_IMPORTANCE, + "Configuring logger to send logs to common_test logs"), + logger:set_handler_config(cth_log_redirect, level, debug), + + %% Let's use the same format as RabbitMQ itself. + logger:set_handler_config( + cth_log_redirect, formatter, + rabbit_prelaunch_early_logging:default_file_formatter(#{})), + + %% We use an addition logger handler for messages tagged with a non-OTP + %% domain because by default, `cth_log_redirect' drop them. 
+ {ok, LogCfg0} = logger:get_handler_config(cth_log_redirect), + LogCfg = maps:remove(id, maps:remove(module, LogCfg0)), + ok = logger:add_handler( + cth_log_redirect_any_domains, cth_log_redirect_any_domains, + LogCfg), + + logger:remove_handler(default), + + ct:pal( + ?LOW_IMPORTANCE, + "Logger configured to send logs to common_test logs; you should see " + "a message below saying so"), + ?LOG_INFO("Logger message logged to common_test logs"), + Config. + init_skip_as_error_flag(Config) -> SkipAsError = case os:getenv("RABBITMQ_CT_SKIP_AS_ERROR") of false -> false; |
