summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2020-03-16 15:06:09 +0100
committerGitHub <noreply@github.com>2020-03-16 15:06:09 +0100
commita0ba0ad9578e8a5617554f58844a63e00c20aefe (patch)
treed69e8ad446ff67352e487bccf43558518fef68d2
parent141f6bfc2ba058b5e57f93dc08a2d5a0595a3f1e (diff)
parent17890d5b7ad775485d7d5021a6dc430735321a4b (diff)
downloadrabbitmq-server-git-a0ba0ad9578e8a5617554f58844a63e00c20aefe.tar.gz
Merge pull request #2276 from rabbitmq/mk-peer-discovery-retries
Introduce peer discovery retries
-rw-r--r--priv/schema/rabbit.schema24
-rw-r--r--src/rabbit_mnesia.erl55
-rw-r--r--src/rabbit_nodes.erl9
-rw-r--r--src/rabbit_peer_discovery.erl21
-rw-r--r--src/rabbit_peer_discovery_classic_config.erl23
-rw-r--r--src/rabbit_table.erl1
-rw-r--r--test/peer_discovery_classic_config_SUITE.erl160
7 files changed, 268 insertions, 25 deletions
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index 1220b0dd05..7fed1372cf 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -952,6 +952,30 @@ fun(Conf) ->
end
end}.
+%% Cluster formation: discovery failure retries
+
+{mapping, "cluster_formation.lock_retry_limit", "rabbit.cluster_formation.lock_retry_limit",
+ [
+ {datatype, integer},
+ {validators, ["non_zero_positive_integer"]}
+ ]}.
+{mapping, "cluster_formation.lock_retry_timeout", "rabbit.cluster_formation.lock_retry_timeout",
+ [
+ {datatype, integer},
+ {validators, ["non_zero_positive_integer"]}
+ ]}.
+
+{mapping, "cluster_formation.discovery_retry_limit", "rabbit.cluster_formation.discovery_retry_limit",
+ [
+ {datatype, integer},
+ {validators, ["non_zero_positive_integer"]}
+ ]}.
+{mapping, "cluster_formation.discovery_retry_interval", "rabbit.cluster_formation.discovery_retry_interval",
+ [
+ {datatype, integer},
+ {validators, ["non_zero_positive_integer"]}
+ ]}.
+
%% Classic config-driven peer discovery backend.
%%
%% Make clustering happen *automatically* at startup - only applied
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index b627225785..d0ef4d5dc8 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -97,48 +97,61 @@ init() ->
ok.
init_with_lock() ->
- {Retries, Timeout} = rabbit_peer_discovery:retry_timeout(),
- init_with_lock(Retries, Timeout, fun init_from_config/0).
+ {Retries, Timeout} = rabbit_peer_discovery:locking_retry_timeout(),
+ init_with_lock(Retries, Timeout, fun run_peer_discovery/0).
-init_with_lock(0, _, InitFromConfig) ->
+init_with_lock(0, _, RunPeerDiscovery) ->
case rabbit_peer_discovery:lock_acquisition_failure_mode() of
ignore ->
rabbit_log:warning("Cannot acquire a lock during clustering", []),
- InitFromConfig(),
+ RunPeerDiscovery(),
rabbit_peer_discovery:maybe_register();
fail ->
exit(cannot_acquire_startup_lock)
end;
-init_with_lock(Retries, Timeout, InitFromConfig) ->
+init_with_lock(Retries, Timeout, RunPeerDiscovery) ->
case rabbit_peer_discovery:lock() of
not_supported ->
rabbit_log:info("Peer discovery backend does not support locking, falling back to randomized delay"),
%% See rabbitmq/rabbitmq-server#1202 for details.
rabbit_peer_discovery:maybe_inject_randomized_delay(),
- InitFromConfig(),
+ RunPeerDiscovery(),
rabbit_peer_discovery:maybe_register();
{error, _Reason} ->
timer:sleep(Timeout),
- init_with_lock(Retries - 1, Timeout, InitFromConfig);
+ init_with_lock(Retries - 1, Timeout, RunPeerDiscovery);
{ok, Data} ->
try
- InitFromConfig(),
+ RunPeerDiscovery(),
rabbit_peer_discovery:maybe_register()
after
rabbit_peer_discovery:unlock(Data)
end
end.
-init_from_config() ->
+-spec run_peer_discovery() -> ok | {[node()], node_type()}.
+run_peer_discovery() ->
+ {RetriesLeft, DelayInterval} = rabbit_peer_discovery:discovery_retries(),
+ run_peer_discovery_with_retries(RetriesLeft, DelayInterval).
+
+-spec run_peer_discovery_with_retries(non_neg_integer(), non_neg_integer()) -> ok | {[node()], node_type()}.
+run_peer_discovery_with_retries(0, _DelayInterval) ->
+ ok;
+run_peer_discovery_with_retries(RetriesLeft, DelayInterval) ->
FindBadNodeNames = fun
(Name, BadNames) when is_atom(Name) -> BadNames;
(Name, BadNames) -> [Name | BadNames]
end,
{DiscoveredNodes, NodeType} =
case rabbit_peer_discovery:discover_cluster_nodes() of
+ {error, Reason} ->
+ RetriesLeft1 = RetriesLeft - 1,
+ rabbit_log:error("Peer discovery returned an error: ~p. Will retry after a delay of ~b, ~b retries left...",
+ [Reason, DelayInterval, RetriesLeft1]),
+ timer:sleep(DelayInterval),
+ run_peer_discovery_with_retries(RetriesLeft1, DelayInterval);
{ok, {Nodes, Type} = Config}
- when is_list(Nodes) andalso
- (Type == disc orelse Type == disk orelse Type == ram) ->
+ when is_list(Nodes) andalso (Type == disc orelse Type == disk orelse Type == ram) ->
case lists:foldr(FindBadNodeNames, [], Nodes) of
[] -> Config;
BadNames -> e({invalid_cluster_node_names, BadNames})
@@ -167,6 +180,16 @@ init_from_config() ->
%% reachable and compatible (in terms of Mnesia internal protocol version and such)
%% cluster peers in order.
join_discovered_peers(TryNodes, NodeType) ->
+ {RetriesLeft, DelayInterval} = rabbit_peer_discovery:discovery_retries(),
+ join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterval).
+
+join_discovered_peers_with_retries(TryNodes, _NodeType, 0, _DelayInterval) ->
+ rabbit_log:warning(
+ "Could not successfully contact any node of: ~s (as in Erlang distribution). "
+ "Starting as a blank standalone node...~n",
+ [string:join(lists:map(fun atom_to_list/1, TryNodes), ",")]),
+ init_db_and_upgrade([node()], disc, false, _Retry = true);
+join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft, DelayInterval) ->
case find_reachable_peer_to_cluster_with(nodes_excl_me(TryNodes)) of
{ok, Node} ->
rabbit_log:info("Node '~s' selected for auto-clustering~n", [Node]),
@@ -175,11 +198,11 @@ join_discovered_peers(TryNodes, NodeType) ->
rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster();
none ->
- rabbit_log:warning(
- "Could not successfully contact any node of: ~s (as in Erlang distribution). "
- "Starting as a blank standalone node...~n",
- [string:join(lists:map(fun atom_to_list/1, TryNodes), ",")]),
- init_db_and_upgrade([node()], disc, false, _Retry = true)
+ RetriesLeft1 = RetriesLeft - 1,
+ rabbit_log:error("Trying to join discovered peers failed. Will retry after a delay of ~b, ~b retries left...",
+ [DelayInterval, RetriesLeft1]),
+ timer:sleep(DelayInterval),
+ join_discovered_peers_with_retries(TryNodes, NodeType, RetriesLeft1, DelayInterval)
end.
%% Make the node join a cluster. The node will be reset automatically
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index d38bd46018..1f066c6b91 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -17,7 +17,7 @@
-module(rabbit_nodes).
--export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
+-export([names/1, diagnostics/1, make/1, make/2, parts/1, cookie_hash/0,
is_running/2, is_process_running/2,
cluster_name/0, set_cluster_name/1, set_cluster_name/2, ensure_epmd/0,
all_running/0, name_type/0, running_count/0,
@@ -55,8 +55,11 @@ names(Hostname) ->
diagnostics(Nodes) ->
rabbit_nodes_common:diagnostics(Nodes).
-make(NodeStr) ->
- rabbit_nodes_common:make(NodeStr).
+make(NameOrParts) ->
+ rabbit_nodes_common:make(NameOrParts).
+
+make(ShortName, Hostname) ->
+ make({ShortName, Hostname}).
parts(NodeStr) ->
rabbit_nodes_common:parts(NodeStr).
diff --git a/src/rabbit_peer_discovery.erl b/src/rabbit_peer_discovery.erl
index 44c36e06d2..8b44e8b5a0 100644
--- a/src/rabbit_peer_discovery.erl
+++ b/src/rabbit_peer_discovery.erl
@@ -23,8 +23,9 @@
-export([maybe_init/0, discover_cluster_nodes/0, backend/0, node_type/0,
normalize/1, format_discovered_nodes/1, log_configured_backend/0,
register/0, unregister/0, maybe_register/0, maybe_unregister/0,
- maybe_inject_randomized_delay/0, lock/0, unlock/1]).
--export([append_node_prefix/1, node_prefix/0, retry_timeout/0,
+ maybe_inject_randomized_delay/0, lock/0, unlock/1,
+ discovery_retries/0]).
+-export([append_node_prefix/1, node_prefix/0, locking_retry_timeout/0,
lock_acquisition_failure_mode/0]).
-define(DEFAULT_BACKEND, rabbit_peer_discovery_classic_config).
@@ -61,9 +62,9 @@ node_type() ->
?DEFAULT_NODE_TYPE
end.
--spec retry_timeout() -> {Retries :: integer(), Timeout :: integer()}.
+-spec locking_retry_timeout() -> {Retries :: integer(), Timeout :: integer()}.
-retry_timeout() ->
+locking_retry_timeout() ->
case application:get_env(rabbit, cluster_formation) of
{ok, Proplist} ->
Retries = proplists:get_value(lock_retry_limit, Proplist, 10),
@@ -146,6 +147,18 @@ maybe_unregister() ->
ok
end.
+-spec discovery_retries() -> {Retries :: integer(), Interval :: integer()}.
+
+discovery_retries() ->
+ case application:get_env(rabbit, cluster_formation) of
+ {ok, Proplist} ->
+ Retries = proplists:get_value(discovery_retry_limit, Proplist, 10),
+ Interval = proplists:get_value(discovery_retry_interval, Proplist, 500),
+ {Retries, Interval};
+ undefined ->
+ {10, 500}
+ end.
+
-spec maybe_inject_randomized_delay() -> ok.
maybe_inject_randomized_delay() ->
diff --git a/src/rabbit_peer_discovery_classic_config.erl b/src/rabbit_peer_discovery_classic_config.erl
index e5f41c9594..42d9db1c61 100644
--- a/src/rabbit_peer_discovery_classic_config.erl
+++ b/src/rabbit_peer_discovery_classic_config.erl
@@ -26,7 +26,8 @@
%% API
%%
--spec list_nodes() -> {ok, {Nodes :: [node()], rabbit_types:node_type()}}.
+-spec list_nodes() -> {ok, {Nodes :: [node()], rabbit_types:node_type()}} |
+ {error, Reason :: string()}.
list_nodes() ->
case application:get_env(rabbit, cluster_nodes, {[], disc}) of
@@ -37,7 +38,9 @@ list_nodes() ->
-spec supports_registration() -> boolean().
supports_registration() ->
- false.
+ %% If we don't have any nodes configured, skip randomized delay and similar operations
+ %% as we don't want to delay startup for no reason. MK.
+ has_any_peer_nodes_configured().
-spec register() -> ok.
@@ -63,3 +66,19 @@ lock(_Node) ->
unlock(_Data) ->
ok.
+
+%%
+%% Helpers
+%%
+
+has_any_peer_nodes_configured() ->
+ case application:get_env(rabbit, cluster_nodes, []) of
+ {[], _NodeType} ->
+ false;
+ {Nodes, _NodeType} when is_list(Nodes) ->
+ true;
+ [] ->
+ false;
+ Nodes when is_list(Nodes) ->
+ true
+ end.
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index c504719e4d..b00f39ebec 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -114,6 +114,7 @@ wait(TableNames, Timeout, Retries) ->
end,
case {Retries, Result} of
{_, ok} ->
+ rabbit_log:info("Successfully synced tables from a peer"),
ok;
{1, {error, _} = Error} ->
throw(Error);
diff --git a/test/peer_discovery_classic_config_SUITE.erl b/test/peer_discovery_classic_config_SUITE.erl
new file mode 100644
index 0000000000..9e51139e69
--- /dev/null
+++ b/test/peer_discovery_classic_config_SUITE.erl
@@ -0,0 +1,160 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(peer_discovery_classic_config_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-import(rabbit_ct_broker_helpers, [
+ stop_node/2, reset_node/1, start_node/2,
+ rewrite_node_config_file/2, cluster_members_online/2
+]).
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, non_parallel}
+ ].
+
+groups() ->
+ [
+ {non_parallel, [], [
+ successful_discovery
+ , successful_discovery_with_a_subset_of_nodes_coming_online
+ , no_nodes_configured
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 5}}
+ ].
+
+
+%%
+%% Setup/teardown.
+%%
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) when Testcase =:= successful_discovery->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+
+ N = 3,
+ NodeNames = [
+ list_to_atom(rabbit_misc:format("~s-~b", [Testcase, I]))
+ || I <- lists:seq(1, N)
+ ],
+ Config2 = rabbit_ct_helpers:set_config(Config1, [
+ {rmq_nodename_suffix, Testcase},
+ %% note: this must not include the host part
+ {rmq_nodes_count, NodeNames},
+ {rmq_nodes_clustered, false}
+ ]),
+ NodeNamesWithHostname = [rabbit_nodes:make({Name, "localhost"}) || Name <- NodeNames],
+ Config3 = rabbit_ct_helpers:merge_app_env(Config2,
+ {rabbit, [
+ {cluster_nodes, {NodeNamesWithHostname, disc}}
+ ]}),
+ rabbit_ct_helpers:run_steps(Config3,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+init_per_testcase(Testcase, Config) when Testcase =:= successful_discovery_with_a_subset_of_nodes_coming_online->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+
+ N = 2,
+ NodeNames = [
+ list_to_atom(rabbit_misc:format("~s-~b", [Testcase, I]))
+ || I <- lists:seq(1, N)
+ ],
+ Config2 = rabbit_ct_helpers:set_config(Config1, [
+ {rmq_nodename_suffix, Testcase},
+ %% note: this must not include the host part
+ {rmq_nodes_count, NodeNames},
+ {rmq_nodes_clustered, false}
+ ]),
+ NodeNamesWithHostname = [rabbit_nodes:make({Name, "localhost"}) || Name <- [nonexistent | NodeNames]],
+ %% reduce retry time since we know one node on the list does
+ %% not exist and not just unreachable
+ Config3 = rabbit_ct_helpers:merge_app_env(Config2,
+ {rabbit, [
+ {cluster_formation, [
+ {discovery_retry_limit, 10},
+ {discovery_retry_interval, 200}
+ ]},
+ {cluster_nodes, {NodeNamesWithHostname, disc}}
+ ]}),
+ rabbit_ct_helpers:run_steps(Config3,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+init_per_testcase(no_nodes_configured = Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1, [
+ {rmq_nodename_suffix, Testcase},
+ {rmq_nodes_count, 2},
+ {rmq_nodes_clustered, false}
+ ]),
+ Config3 = rabbit_ct_helpers:merge_app_env(Config2,
+ {rabbit, [
+ {cluster_nodes, {[], disc}}
+ ]}),
+ rabbit_ct_helpers:run_steps(Config3,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+
+%%
+%% Test cases
+%%
+
+successful_discovery(Config) ->
+ ?assertEqual(3, length(cluster_members_online(Config, 0))),
+ ?assertEqual(3, length(cluster_members_online(Config, 1))).
+
+successful_discovery_with_a_subset_of_nodes_coming_online(Config) ->
+ ?assertEqual(2, length(cluster_members_online(Config, 0))),
+ ?assertEqual(2, length(cluster_members_online(Config, 1))).
+
+no_nodes_configured(Config) ->
+ ct:pal("Cluster members online: ~p", [cluster_members_online(Config, 0)]),
+ ?assert(length(cluster_members_online(Config, 0)) < 2).
+