summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Diana Parra Corbacho <dparracorbac@vmware.com> 2023-05-09 16:30:03 +0200
committer: Diana Parra Corbacho <dparracorbac@vmware.com> 2023-05-12 12:49:05 +0200
commit: 9ab28025457fe8e6a5429f5482288ddfc01034ce (patch)
tree: cfb66ee0a31862e6c7a38b10e34da9f99eccab45
parent: c5c1f4a57b424d9d28b8f1d6da73f9668ecd093f (diff)
download: rabbitmq-server-git-khepri.tar.gz
Khepri: clustering [khepri]
-rw-r--r-- deps/rabbit/BUILD.bazel 2
-rw-r--r-- deps/rabbit/src/rabbit_core_ff.erl 15
-rw-r--r-- deps/rabbit/src/rabbit_khepri.erl 15
-rw-r--r-- deps/rabbit/test/clustering_management_SUITE.erl 33
-rw-r--r-- deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_standalone_khepri_boot.ex 45
-rw-r--r-- deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex 10
-rw-r--r-- deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex 5
-rw-r--r-- deps/rabbitmq_cli/lib/rabbitmq/cli/default_output.ex 14
8 files changed, 125 insertions, 14 deletions
diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel
index 14e8986503..7dea0851f8 100644
--- a/deps/rabbit/BUILD.bazel
+++ b/deps/rabbit/BUILD.bazel
@@ -332,7 +332,7 @@ rabbitmq_integration_suite(
name = "clustering_management_SUITE",
size = "large",
flaky = True,
- shard_count = 46,
+ shard_count = 47,
sharding_method = "case",
)
diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl
index fd750b7d0f..3d00e4a398 100644
--- a/deps/rabbit/src/rabbit_core_ff.erl
+++ b/deps/rabbit/src/rabbit_core_ff.erl
@@ -205,8 +205,12 @@ mds_phase1_migration_post_enable(#{feature_name := FeatureName}) ->
mds_migration_post_enable(FeatureName, Tables).
mds_migration_enable(FeatureName, TablesAndOwners) ->
- ok = ensure_khepri_cluster_matches_mnesia(FeatureName),
- mds_migrate_tables_to_khepri(FeatureName, TablesAndOwners).
+ case ensure_khepri_cluster_matches_mnesia(FeatureName) of
+ ok ->
+ mds_migrate_tables_to_khepri(FeatureName, TablesAndOwners);
+ Error ->
+ Error
+ end.
mds_migration_post_enable(FeatureName, TablesAndOwners) ->
?assert(rabbit_khepri:is_enabled(non_blocking)),
@@ -221,7 +225,12 @@ ensure_khepri_cluster_matches_mnesia(FeatureName) ->
"Feature flag `~s`: updating the Khepri cluster to match "
"the Mnesia cluster",
[FeatureName]),
- rabbit_khepri:init_cluster().
+ try
+ rabbit_khepri:init_cluster()
+ catch
+ error:{khepri_mnesia_migration_ex, _, _} = Reason ->
+ {error, Reason}
+ end.
mds_plugin_migration_enable(FeatureName, TablesAndOwners) ->
global:set_lock({FeatureName, self()}),
diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl
index ae5d9c920d..91f3d5b441 100644
--- a/deps/rabbit/src/rabbit_khepri.erl
+++ b/deps/rabbit/src/rabbit_khepri.erl
@@ -86,6 +86,8 @@
-export([if_has_data/1,
if_has_data_wildcard/0]).
+-export([force_shrink_member_to_current_member/0]).
+
-ifdef(TEST).
-export([force_metadata_store/1,
clear_forced_metadata_store/0]).
@@ -372,6 +374,10 @@ force_reset() ->
DataDir = maps:get(data_dir, ra_system:fetch(coordination)),
ok = rabbit_file:recursive_delete(filelib:wildcard(DataDir ++ "/*")).
+force_shrink_member_to_current_member() ->
+ ok = ra_server_proc:force_shrink_members_to_current_member(
+ {?RA_CLUSTER_NAME, node()}).
+
ensure_ra_system_started() ->
{ok, _} = application:ensure_all_started(khepri),
ok = rabbit_ra_systems:ensure_ra_system_started(?RA_SYSTEM).
@@ -466,9 +472,9 @@ get_sys_status(Proc) ->
cli_cluster_status() ->
case rabbit:is_running() of
true ->
- Nodes = nodes(),
- [{nodes, [{disc, [N || N <- Nodes, rabbit_nodes:is_running(N)]}]},
- {running_nodes, Nodes},
+ Nodes = locally_known_nodes(),
+ [{nodes, [{disc, Nodes}]},
+ {running_nodes, [N || N <- Nodes, rabbit_nodes:is_running(N)]},
{cluster_name, rabbit_nodes:cluster_name()}];
false ->
[]
@@ -492,6 +498,9 @@ init_cluster() ->
_ = application:ensure_all_started(khepri_mnesia_migration),
rabbit_log:debug("Khepri clustering: syncing cluster membership"),
mnesia_to_khepri:sync_cluster_membership(?STORE_ID)
+ catch
+ error:{khepri_mnesia_migration_ex, _, _} = Reason ->
+ {error, Reason}
after
case IsRunning of
true -> ok;
diff --git a/deps/rabbit/test/clustering_management_SUITE.erl b/deps/rabbit/test/clustering_management_SUITE.erl
index 1f5e678887..5ee58a8d18 100644
--- a/deps/rabbit/test/clustering_management_SUITE.erl
+++ b/deps/rabbit/test/clustering_management_SUITE.erl
@@ -100,6 +100,11 @@ groups() ->
start_nodes_in_stop_order,
start_nodes_in_stop_order_with_force_boot
]}
+ ]},
+ {clustered_3_nodes, [],
+ [{cluster_size_3, [], [
+ force_standalone_boot
+ ]}
]}
]}
].
@@ -622,10 +627,16 @@ reset_in_minority(Config) ->
ok = rpc:call(Rabbit, application, set_env,
[rabbit, khepri_leader_wait_retry_limit, 3]),
stop_app(Rabbit),
- ?assertMatch({error, 69, _}, reset(Rabbit)),
+
+ is_in_minority(reset(Rabbit)),
ok.
+is_in_minority(Ret) ->
+ ?assertMatch({error, 75, _}, Ret),
+ {error, _, Msg} = Ret,
+ ?assertMatch(match, re:run(Msg, ".*timed out.*minority.*", [{capture, none}])).
+
reset_last_disc_node(Config) ->
Servers = [Rabbit, Hare | _] = cluster_members(Config),
@@ -1216,6 +1227,23 @@ start_nodes_in_stop_order_with_force_boot(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, Bunny),
assert_clustered([Rabbit, Hare, Bunny]).
+%% TODO test force_standalone after restarting this last node
+force_standalone_boot(Config) ->
+ [Rabbit, Hare, Bunny] = cluster_members(Config),
+
+ assert_cluster_status({[Rabbit, Hare, Bunny], [Rabbit, Hare, Bunny], [Rabbit, Hare, Bunny]},
+ [Rabbit, Hare, Bunny]),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Hare),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Bunny),
+ ok = force_standalone_khepri_boot(Rabbit),
+
+ assert_cluster_status({[Rabbit], [Rabbit], [Rabbit, Hare, Bunny],
+ [Rabbit, Hare, Bunny], [Rabbit]},
+ [Rabbit]),
+
+ ok.
+
%% ----------------------------------------------------------------------------
%% Internal utils
%% ----------------------------------------------------------------------------
@@ -1362,6 +1390,9 @@ change_cluster_node_type(Node, Type) ->
update_cluster_nodes(Node, DiscoveryNode) ->
rabbit_control_helper:command(update_cluster_nodes, Node, [atom_to_list(DiscoveryNode)]).
+force_standalone_khepri_boot(Node) ->
+ rabbit_control_helper:command(force_standalone_khepri_boot, Node, []).
+
stop_join_start(Node, ClusterTo, Ram) ->
ok = stop_app(Node),
ok = join_cluster(Node, ClusterTo, Ram),
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_standalone_khepri_boot.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_standalone_khepri_boot.ex
new file mode 100644
index 0000000000..aaea2c51d7
--- /dev/null
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/force_standalone_khepri_boot.ex
@@ -0,0 +1,45 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
+
+defmodule RabbitMQ.CLI.Ctl.Commands.ForceStandaloneKhepriBootCommand do
+ alias RabbitMQ.CLI.Core.{Config, DocGuide}
+
+ @behaviour RabbitMQ.CLI.CommandBehaviour
+
+ use RabbitMQ.CLI.Core.MergesNoDefaults
+ use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
+
+ def run([], %{node: node_name} = opts) do
+ ret =
+ :rabbit_misc.rpc_call(node_name, :rabbit_khepri, :force_shrink_member_to_current_member, [])
+
+ case ret do
+ {:badrpc, {:EXIT, {:undef, _}}} ->
+ {:error, RabbitMQ.CLI.Core.ExitCodes.exit_usage(),
+ "This command is not supported by node #{node_name}"}
+
+ _ ->
+ ret
+ end
+ end
+
+ use RabbitMQ.CLI.DefaultOutput
+
+ def usage, do: "force_standalone_khepri_boot"
+
+ def usage_doc_guides() do
+ [
+ DocGuide.clustering()
+ ]
+ end
+
+ def help_section(), do: :cluster_management
+
+ def description(),
+ do: "Forces node to start as a standalone node"
+
+ def banner(_, _), do: nil
+end
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex
index 11b9695ac5..809c0ee095 100644
--- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/join_cluster_command.ex
@@ -78,6 +78,16 @@ defmodule RabbitMQ.CLI.Ctl.Commands.JoinClusterCommand do
"Error: cannot cluster node with itself: #{node_name}"}
end
+ def output(
+ {:error,
+ {:khepri_mnesia_migration_ex, :all_mnesia_nodes_must_run,
+ %{all_nodes: nodes, running_nodes: running}}},
+ _opts
+ ) do
+ {:error, RabbitMQ.CLI.Core.ExitCodes.exit_software(),
+ "Error: all mnesia nodes must run to join the cluster, mnesia nodes: #{inspect(nodes)}, running nodes: #{inspect(running)}"}
+ end
+
use RabbitMQ.CLI.DefaultOutput
def banner([target_node], %{node: node_name}) do
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex
index a219d285e1..c2734399a0 100644
--- a/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/ctl/commands/reset_command.ex
@@ -28,11 +28,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ResetCommand do
RabbitMQ.CLI.DefaultOutput.mnesia_running_error(node_name)}
end
- def output({:error, {:timeout, {:metadata_store, _}}}, %{node: node_name}) do
- {:error, RabbitMQ.CLI.Core.ExitCodes.exit_tempfail(),
- RabbitMQ.CLI.DefaultOutput.khepri_timeout_error(node_name)}
- end
-
use RabbitMQ.CLI.DefaultOutput
def usage, do: "reset"
diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/default_output.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/default_output.ex
index 788b92b065..8e77a62255 100644
--- a/deps/rabbitmq_cli/lib/rabbitmq/cli/default_output.ex
+++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/default_output.ex
@@ -17,7 +17,7 @@ defmodule RabbitMQ.CLI.DefaultOutput do
end
def output(result, opts \\ %{}) do
- format_output(normalize_output(result, opts))
+ format_output(format_khepri_output(normalize_output(result, opts), opts))
end
def mnesia_running_error(node_name) do
@@ -68,6 +68,18 @@ defmodule RabbitMQ.CLI.DefaultOutput do
defp normalize_output({unknown, _} = input, _opts) when is_atom(unknown), do: {:error, input}
defp normalize_output(result, _opts) when not is_atom(result), do: {:ok, result}
+ defp format_khepri_output({:error, {:timeout, {:metadata_store, _}}}, %{node: node_name}) do
+ {:error, RabbitMQ.CLI.Core.ExitCodes.exit_tempfail(), khepri_timeout_error(node_name)}
+ end
+
+ defp format_khepri_output({:error, :timeout_waiting_for_leader}, %{node: node_name}) do
+ {:error, RabbitMQ.CLI.Core.ExitCodes.exit_tempfail(), khepri_timeout_error(node_name)}
+ end
+
+ defp format_khepri_output(result, _opts) do
+ result
+ end
+
defp format_output({:error, _} = result) do
result
end