summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2022-01-11 22:05:16 +0300
committerMichael Klishin <michael@clojurewerkz.org>2022-01-11 22:05:16 +0300
commita0855a89f28b314f816caa63b17f648d25ff83c8 (patch)
tree16e638ac859e30da5ec5982c0f440e4fdcb9e671
parent8e2edc76c288e04ae89fbd6996fd429be733b082 (diff)
downloadrabbitmq-server-git-a0855a89f28b314f816caa63b17f648d25ff83c8.tar.gz
Import queue and binding definitions when a certain number of nodes join
by default the number is 1. This is to make sure that quorum queues have enough nodes to provision their initial replicas on instead of settling on a smaller number, which very likely can be just 1 (the first node to boot). References #3850.
-rw-r--r--deps/rabbit/src/rabbit_definitions.erl47
-rw-r--r--deps/rabbit/src/rabbit_nodes.erl15
2 files changed, 52 insertions, 10 deletions
diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl
index 152a6be5f2..825a2e56c0 100644
--- a/deps/rabbit/src/rabbit_definitions.erl
+++ b/deps/rabbit/src/rabbit_definitions.erl
@@ -257,9 +257,7 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2),
concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
- concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2),
- concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2),
sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
%% importing policies concurrently can be unsafe as queues will be getting
@@ -267,6 +265,19 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
sequential_for_all(policies, ActingUser, Map, fun add_policy/2),
sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2),
+ rabbit_nodes:if_reached_target_cluster_size(
+ fun() ->
+ concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
+ concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2)
+ end,
+
+ fun() ->
+ rabbit_log:info("There are fewer than target cluster size (~d) nodes online,"
+ " skipping queue and binding import from definitions",
+ [rabbit_nodes:target_cluster_size_hint()])
+ end
+ ),
+
SuccessFun(),
ok
catch {error, E} -> {error, E};
@@ -284,15 +295,25 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
try
validate_limits(Map, VHost),
- concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
- concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
-
sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
%% importing policies concurrently can be unsafe as queues will be getting
%% potentially out of order notifications of applicable policy changes
sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ rabbit_nodes:if_reached_target_cluster_size(
+ fun() ->
+ concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3)
+ end,
+
+ fun() ->
+ rabbit_log:info("There are fewer than target cluster size (~d) nodes online,"
+ " skipping queue and binding import from definitions",
+ [rabbit_nodes:target_cluster_size_hint()])
+ end
+ ),
+
SuccessFun()
catch {error, E} -> {error, format(E)};
exit:E -> {error, format(E)}
@@ -310,15 +331,25 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) ->
try
validate_limits(Map, VHost),
- concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
- concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
-
sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
%% importing policies concurrently can be unsafe as queues will be getting
%% potentially out of order notifications of applicable policy changes
sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ rabbit_nodes:if_reached_target_cluster_size(
+ fun() ->
+ concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3)
+ end,
+
+ fun() ->
+ rabbit_log:info("There are fewer than target cluster size (~d) nodes online,"
+ " skipping queue and binding import from definitions",
+ [rabbit_nodes:target_cluster_size_hint()])
+ end
+ ),
+
SuccessFun()
catch {error, E} -> ErrorFun(format(E));
exit:E -> ErrorFun(format(E))
diff --git a/deps/rabbit/src/rabbit_nodes.erl b/deps/rabbit/src/rabbit_nodes.erl
index 092e8dc912..1668df78e3 100644
--- a/deps/rabbit/src/rabbit_nodes.erl
+++ b/deps/rabbit/src/rabbit_nodes.erl
@@ -14,7 +14,8 @@
await_running_count/2, is_single_node_cluster/0,
boot/0]).
-export([persistent_cluster_id/0, seed_internal_cluster_id/0, seed_user_provided_cluster_name/0]).
--export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0]).
+-export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0,
+ if_reached_target_cluster_size/2]).
-export([lock_id/1, lock_retries/0]).
-include_lib("kernel/include/inet.hrl").
@@ -82,7 +83,7 @@ is_process_running(Node, Process) ->
-spec cluster_name() -> binary().
cluster_name() ->
- case rabbit_runtime_parameters:value_global(cluster_name) of
+ case rabbit_runtime_parameters:value_global(cluster_name) of
not_found -> cluster_name_default();
Name -> Name
end.
@@ -183,6 +184,16 @@ target_cluster_size_hint() ->
reached_target_cluster_size() ->
running_count() >= target_cluster_size_hint().
+-spec if_reached_target_cluster_size(ConditionSatisfiedFun :: fun(), ConditionNotSatisfiedFun :: fun()) -> boolean().
+if_reached_target_cluster_size(ConditionSatisfiedFun, ConditionNotSatisfiedFun) ->
+ case reached_target_cluster_size() of
+ true ->
+ ConditionSatisfiedFun(),
+ true;
+ false ->
+ ConditionNotSatisfiedFun(),
+ false
+ end.
-spec lock_id(Node :: node()) -> {ResourceId :: string(), LockRequesterId :: node()}.
lock_id(Node) ->