diff options
author | Diana Parra Corbacho <dparracorbac@vmware.com> | 2023-05-09 16:30:03 +0200 |
---|---|---|
committer | Diana Parra Corbacho <dparracorbac@vmware.com> | 2023-05-12 12:49:05 +0200 |
commit | 9ab28025457fe8e6a5429f5482288ddfc01034ce (patch) | |
tree | cfb66ee0a31862e6c7a38b10e34da9f99eccab45 | |
parent | c5c1f4a57b424d9d28b8f1d6da73f9668ecd093f (diff) | |
download | rabbitmq-server-git-khepri.tar.gz |
Khepri: clusteringkhepri
-rw-r--r-- | deps/rabbit/BUILD.bazel | 2 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_core_ff.erl | 15 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_khepri.erl | 15 | ||||
-rw-r--r-- | deps/rabbit/test/clustering_management_SUITE.erl | 33 | ||||
-rw-r--r-- | deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_standalone_khepri_boot.ex | 45 | ||||
-rw-r--r-- | deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex | 10 | ||||
-rw-r--r-- | deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex | 5 | ||||
-rw-r--r-- | deps/rabbitmq_cli/lib/rabbitmq/cli/default_output.ex | 14 |
8 files changed, 125 insertions, 14 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 14e8986503..7dea0851f8 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -332,7 +332,7 @@ rabbitmq_integration_suite( name = "clustering_management_SUITE", size = "large", flaky = True, - shard_count = 46, + shard_count = 47, sharding_method = "case", ) diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index fd750b7d0f..3d00e4a398 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -205,8 +205,12 @@ mds_phase1_migration_post_enable(#{feature_name := FeatureName}) -> mds_migration_post_enable(FeatureName, Tables). mds_migration_enable(FeatureName, TablesAndOwners) -> - ok = ensure_khepri_cluster_matches_mnesia(FeatureName), - mds_migrate_tables_to_khepri(FeatureName, TablesAndOwners). + case ensure_khepri_cluster_matches_mnesia(FeatureName) of + ok -> + mds_migrate_tables_to_khepri(FeatureName, TablesAndOwners); + Error -> + Error + end. mds_migration_post_enable(FeatureName, TablesAndOwners) -> ?assert(rabbit_khepri:is_enabled(non_blocking)), @@ -221,7 +225,12 @@ ensure_khepri_cluster_matches_mnesia(FeatureName) -> "Feature flag `~s`: updating the Khepri cluster to match " "the Mnesia cluster", [FeatureName]), - rabbit_khepri:init_cluster(). + try + rabbit_khepri:init_cluster() + catch + error:{khepri_mnesia_migration_ex, _, _} = Reason -> + {error, Reason} + end. mds_plugin_migration_enable(FeatureName, TablesAndOwners) -> global:set_lock({FeatureName, self()}), diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index ae5d9c920d..91f3d5b441 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -86,6 +86,8 @@ -export([if_has_data/1, if_has_data_wildcard/0]). +-export([force_shrink_member_to_current_member/0]). + -ifdef(TEST). -export([force_metadata_store/1, clear_forced_metadata_store/0]). @@ -372,6 +374,10 @@ force_reset() -> DataDir = maps:get(data_dir, ra_system:fetch(coordination)), ok = rabbit_file:recursive_delete(filelib:wildcard(DataDir ++ "/*")). +force_shrink_member_to_current_member() -> + ok = ra_server_proc:force_shrink_members_to_current_member( + {?RA_CLUSTER_NAME, node()}). + ensure_ra_system_started() -> {ok, _} = application:ensure_all_started(khepri), ok = rabbit_ra_systems:ensure_ra_system_started(?RA_SYSTEM). @@ -466,9 +472,9 @@ get_sys_status(Proc) -> cli_cluster_status() -> case rabbit:is_running() of true -> - Nodes = nodes(), - [{nodes, [{disc, [N || N <- Nodes, rabbit_nodes:is_running(N)]}]}, - {running_nodes, Nodes}, + Nodes = locally_known_nodes(), + [{nodes, [{disc, Nodes}]}, + {running_nodes, [N || N <- Nodes, rabbit_nodes:is_running(N)]}, {cluster_name, rabbit_nodes:cluster_name()}]; false -> [] @@ -492,6 +498,9 @@ init_cluster() -> _ = application:ensure_all_started(khepri_mnesia_migration), rabbit_log:debug("Khepri clustering: syncing cluster membership"), mnesia_to_khepri:sync_cluster_membership(?STORE_ID) + catch + error:{khepri_mnesia_migration_ex, _, _} = Reason -> + {error, Reason} after case IsRunning of true -> ok; diff --git a/deps/rabbit/test/clustering_management_SUITE.erl b/deps/rabbit/test/clustering_management_SUITE.erl index 1f5e678887..5ee58a8d18 100644 --- a/deps/rabbit/test/clustering_management_SUITE.erl +++ b/deps/rabbit/test/clustering_management_SUITE.erl @@ -100,6 +100,11 @@ groups() -> start_nodes_in_stop_order, start_nodes_in_stop_order_with_force_boot ]} + ]}, + {clustered_3_nodes, [], + [{cluster_size_3, [], [ + force_standalone_boot + ]} ]} ]} ]. @@ -622,10 +627,16 @@ reset_in_minority(Config) -> ok = rpc:call(Rabbit, application, set_env, [rabbit, khepri_leader_wait_retry_limit, 3]), stop_app(Rabbit), - ?assertMatch({error, 69, _}, reset(Rabbit)), + + is_in_minority(reset(Rabbit)), ok. +is_in_minority(Ret) -> + ?assertMatch({error, 75, _}, Ret), + {error, _, Msg} = Ret, + ?assertMatch(match, re:run(Msg, ".*timed out.*minority.*", [{capture, none}])). + reset_last_disc_node(Config) -> Servers = [Rabbit, Hare | _] = cluster_members(Config), @@ -1216,6 +1227,23 @@ start_nodes_in_stop_order_with_force_boot(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, Bunny), assert_clustered([Rabbit, Hare, Bunny]). +%% TODO test force_standalone after restarting this last node +force_standalone_boot(Config) -> + [Rabbit, Hare, Bunny] = cluster_members(Config), + + assert_cluster_status({[Rabbit, Hare, Bunny], [Rabbit, Hare, Bunny], [Rabbit, Hare, Bunny]}, + [Rabbit, Hare, Bunny]), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Hare), + ok = rabbit_ct_broker_helpers:stop_node(Config, Bunny), + ok = force_standalone_khepri_boot(Rabbit), + + assert_cluster_status({[Rabbit], [Rabbit], [Rabbit, Hare, Bunny], + [Rabbit, Hare, Bunny], [Rabbit]}, + [Rabbit]), + + ok. + %% ---------------------------------------------------------------------------- %% Internal utils %% ---------------------------------------------------------------------------- @@ -1362,6 +1390,9 @@ change_cluster_node_type(Node, Type) -> update_cluster_nodes(Node, DiscoveryNode) -> rabbit_control_helper:command(update_cluster_nodes, Node, [atom_to_list(DiscoveryNode)]). +force_standalone_khepri_boot(Node) -> + rabbit_control_helper:command(force_standalone_khepri_boot, Node, []). + stop_join_start(Node, ClusterTo, Ram) -> ok = stop_app(Node), ok = join_cluster(Node, ClusterTo, Ram), diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_standalone_khepri_boot.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_standalone_khepri_boot.ex new file mode 100644 index 0000000000..aaea2c51d7 --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_standalone_khepri_boot.ex @@ -0,0 +1,45 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. + +defmodule RabbitMQ.CLI.Ctl.Commands.ForceStandaloneKhepriBootCommand do + alias RabbitMQ.CLI.Core.{Config, DocGuide} + + @behaviour RabbitMQ.CLI.CommandBehaviour + + use RabbitMQ.CLI.Core.MergesNoDefaults + use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments + + def run([], %{node: node_name} = opts) do + ret = + :rabbit_misc.rpc_call(node_name, :rabbit_khepri, :force_shrink_member_to_current_member, []) + + case ret do + {:badrpc, {:EXIT, {:undef, _}}} -> + {:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(), + "This command is not supported by node #{node_name}"} + + _ -> + ret + end + end + + use RabbitMQ.CLI.DefaultOutput + + def usage, do: "force_standalone_khepri_boot" + + def usage_doc_guides() do + [ + DocGuide.clustering() + ] + end + + def help_section(), do: :cluster_management + + def description(), + do: "Forces node to start as a standalone node" + + def banner(_, _), do: nil +end diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex index 11b9695ac5..809c0ee095 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex @@ -78,6 +78,16 @@ defmodule RabbitMQ.CLI.Ctl.Commands.JoinClusterCommand do "Error: cannot cluster node with itself: #{node_name}"} end + def output( + {:error, + {:khepri_mnesia_migration_ex, :all_mnesia_nodes_must_run, + %{all_nodes: nodes, running_nodes: running}}}, + _opts + ) do + {:error, RabbitMQ.CLI.Core.ExitCodes.exit_software(), + "Error: all mnesia nodes must run to join the cluster, mnesia nodes: #{inspect(nodes)}, running nodes: #{inspect(running)}"} + end + use RabbitMQ.CLI.DefaultOutput def banner([target_node], %{node: node_name}) do diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex index a219d285e1..c2734399a0 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex @@ -28,11 +28,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ResetCommand do RabbitMQ.CLI.DefaultOutput.mnesia_running_error(node_name)} end - def output({:error, {:timeout, {:metadata_store, _}}}, %{node: node_name}) do - {:error, RabbitMQ.CLI.Core.ExitCodes.exit_tempfail(), - RabbitMQ.CLI.DefaultOutput.khepri_timeout_error(node_name)} - end - use RabbitMQ.CLI.DefaultOutput def usage, do: "reset" diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/default_output.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/default_output.ex index 788b92b065..8e77a62255 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/default_output.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/default_output.ex @@ -17,7 +17,7 @@ defmodule RabbitMQ.CLI.DefaultOutput do end def output(result, opts \\ %{}) do - format_output(normalize_output(result, opts)) + format_output(format_khepri_output(normalize_output(result, opts), opts)) end def mnesia_running_error(node_name) do @@ -68,6 +68,18 @@ defmodule RabbitMQ.CLI.DefaultOutput do defp normalize_output({unknown, _} = input, _opts) when is_atom(unknown), do: {:error, input} defp normalize_output(result, _opts) when not is_atom(result), do: {:ok, result} + defp format_khepri_output({:error, {:timeout, {:metadata_store, _}}}, %{node: node_name}) do + {:error, RabbitMQ.CLI.Core.ExitCodes.exit_tempfail(), khepri_timeout_error(node_name)} + end + + defp format_khepri_output({:error, :timeout_waiting_for_leader}, %{node: node_name}) do + {:error, RabbitMQ.CLI.Core.ExitCodes.exit_tempfail(), khepri_timeout_error(node_name)} + end + + defp format_khepri_output(result, _opts) do + result + end + defp format_output({:error, _} = result) do result end |