summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2022-01-13 10:54:51 +0300
committerGitHub <noreply@github.com>2022-01-13 10:54:51 +0300
commitdd24555b0f443b751a5e0936465265c4dad96fbd (patch)
treea52b16e16a918827746b00081e787d4ff3de4748
parentaed64900f8d5d90c01d62962c91d6c0eeed269c8 (diff)
parent10f9e062e3b2fd8ecfbfd38ac00caa6b1b2cb090 (diff)
downloadrabbitmq-server-git-dd24555b0f443b751a5e0936465265c4dad96fbd.tar.gz
Merge pull request #3981 from rabbitmq/mk-two-staged-definition-import
Import queue and binding definitions when a certain number of nodes join
-rw-r--r--deps/rabbit/src/rabbit_definitions.erl47
-rw-r--r--deps/rabbit/src/rabbit_nodes.erl15
2 files changed, 52 insertions, 10 deletions
diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl
index 152a6be5f2..4b799d7d2d 100644
--- a/deps/rabbit/src/rabbit_definitions.erl
+++ b/deps/rabbit/src/rabbit_definitions.erl
@@ -257,9 +257,7 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2),
concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
- concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2),
- concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2),
sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
%% importing policies concurrently can be unsafe as queues will be getting
@@ -267,6 +265,19 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
sequential_for_all(policies, ActingUser, Map, fun add_policy/2),
sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2),
+ rabbit_nodes:if_reached_target_cluster_size(
+ fun() ->
+ concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
+ concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2)
+ end,
+
+ fun() ->
+ rabbit_log:info("There are fewer than target cluster size (~b) nodes online,"
+ " skipping queue and binding import from definitions",
+ [rabbit_nodes:target_cluster_size_hint()])
+ end
+ ),
+
SuccessFun(),
ok
catch {error, E} -> {error, E};
@@ -284,15 +295,25 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
try
validate_limits(Map, VHost),
- concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
- concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
-
sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
%% importing policies concurrently can be unsafe as queues will be getting
%% potentially out of order notifications of applicable policy changes
sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ rabbit_nodes:if_reached_target_cluster_size(
+ fun() ->
+ concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3)
+ end,
+
+ fun() ->
+ rabbit_log:info("There are fewer than target cluster size (~b) nodes online,"
+ " skipping queue and binding import from definitions",
+ [rabbit_nodes:target_cluster_size_hint()])
+ end
+ ),
+
SuccessFun()
catch {error, E} -> {error, format(E)};
exit:E -> {error, format(E)}
@@ -310,15 +331,25 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) ->
try
validate_limits(Map, VHost),
- concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
- concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
-
sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
%% importing policies concurrently can be unsafe as queues will be getting
%% potentially out of order notifications of applicable policy changes
sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ rabbit_nodes:if_reached_target_cluster_size(
+ fun() ->
+ concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3)
+ end,
+
+ fun() ->
+ rabbit_log:info("There are fewer than target cluster size (~b) nodes online,"
+ " skipping queue and binding import from definitions",
+ [rabbit_nodes:target_cluster_size_hint()])
+ end
+ ),
+
SuccessFun()
catch {error, E} -> ErrorFun(format(E));
exit:E -> ErrorFun(format(E))
diff --git a/deps/rabbit/src/rabbit_nodes.erl b/deps/rabbit/src/rabbit_nodes.erl
index 092e8dc912..1668df78e3 100644
--- a/deps/rabbit/src/rabbit_nodes.erl
+++ b/deps/rabbit/src/rabbit_nodes.erl
@@ -14,7 +14,8 @@
await_running_count/2, is_single_node_cluster/0,
boot/0]).
-export([persistent_cluster_id/0, seed_internal_cluster_id/0, seed_user_provided_cluster_name/0]).
--export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0]).
+-export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0,
+ if_reached_target_cluster_size/2]).
-export([lock_id/1, lock_retries/0]).
-include_lib("kernel/include/inet.hrl").
@@ -82,7 +83,7 @@ is_process_running(Node, Process) ->
-spec cluster_name() -> binary().
cluster_name() ->
- case rabbit_runtime_parameters:value_global(cluster_name) of
+ case rabbit_runtime_parameters:value_global(cluster_name) of
not_found -> cluster_name_default();
Name -> Name
end.
@@ -183,6 +184,16 @@ target_cluster_size_hint() ->
reached_target_cluster_size() ->
running_count() >= target_cluster_size_hint().
+-spec if_reached_target_cluster_size(ConditionSatisfiedFun :: fun(), ConditionNotSatisfiedFun :: fun()) -> boolean().
+if_reached_target_cluster_size(ConditionSatisfiedFun, ConditionNotSatisfiedFun) ->
+ case reached_target_cluster_size() of
+ true ->
+ ConditionSatisfiedFun(),
+ true;
+ false ->
+ ConditionNotSatisfiedFun(),
+ false
+ end.
-spec lock_id(Node :: node()) -> {ResourceId :: string(), LockRequesterId :: node()}.
lock_id(Node) ->