diff options
author | Michael Klishin <klishinm@vmware.com> | 2022-01-13 10:54:51 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-13 10:54:51 +0300 |
commit | dd24555b0f443b751a5e0936465265c4dad96fbd (patch) | |
tree | a52b16e16a918827746b00081e787d4ff3de4748 | |
parent | aed64900f8d5d90c01d62962c91d6c0eeed269c8 (diff) | |
parent | 10f9e062e3b2fd8ecfbfd38ac00caa6b1b2cb090 (diff) | |
download | rabbitmq-server-git-dd24555b0f443b751a5e0936465265c4dad96fbd.tar.gz |
Merge pull request #3981 from rabbitmq/mk-two-staged-definition-import
Import queue and binding definitions when a certain number of nodes join
-rw-r--r-- | deps/rabbit/src/rabbit_definitions.erl | 47 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_nodes.erl | 15 |
2 files changed, 52 insertions, 10 deletions
diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl index 152a6be5f2..4b799d7d2d 100644 --- a/deps/rabbit/src/rabbit_definitions.erl +++ b/deps/rabbit/src/rabbit_definitions.erl @@ -257,9 +257,7 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2), concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), - concurrent_for_all(queues, ActingUser, Map, fun add_queue/2), concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2), - concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2), sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), %% importing policies concurrently can be unsafe as queues will be getting @@ -267,6 +265,19 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> sequential_for_all(policies, ActingUser, Map, fun add_policy/2), sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2), + rabbit_nodes:if_reached_target_cluster_size( + fun() -> + concurrent_for_all(queues, ActingUser, Map, fun add_queue/2), + concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2) + end, + + fun() -> + rabbit_log:info("There are fewer than target cluster size (~b) nodes online," + " skipping queue and binding import from definitions", + [rabbit_nodes:target_cluster_size_hint()]) + end + ), + SuccessFun(), ok catch {error, E} -> {error, E}; @@ -284,15 +295,25 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) -> try validate_limits(Map, VHost), - concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), - concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), - sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), %% importing policies concurrently can be unsafe as queues will be getting %% potentially out of order notifications of applicable policy changes sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + rabbit_nodes:if_reached_target_cluster_size( + fun() -> + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3) + end, + + fun() -> + rabbit_log:info("There are fewer than target cluster size (~b) nodes online," + " skipping queue and binding import from definitions", + [rabbit_nodes:target_cluster_size_hint()]) + end + ), + SuccessFun() catch {error, E} -> {error, format(E)}; exit:E -> {error, format(E)} @@ -310,15 +331,25 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) -> try validate_limits(Map, VHost), - concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), - concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), - sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), %% importing policies concurrently can be unsafe as queues will be getting %% potentially out of order notifications of applicable policy changes sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + rabbit_nodes:if_reached_target_cluster_size( + fun() -> + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3) + end, + + fun() -> + rabbit_log:info("There are fewer than target cluster size (~b) nodes online," + " skipping queue and binding import from definitions", + [rabbit_nodes:target_cluster_size_hint()]) + end + ), + SuccessFun() catch {error, E} -> ErrorFun(format(E)); exit:E -> ErrorFun(format(E)) diff --git a/deps/rabbit/src/rabbit_nodes.erl b/deps/rabbit/src/rabbit_nodes.erl index 092e8dc912..1668df78e3 100644 --- a/deps/rabbit/src/rabbit_nodes.erl +++ b/deps/rabbit/src/rabbit_nodes.erl @@ -14,7 +14,8 @@ await_running_count/2, is_single_node_cluster/0, boot/0]). -export([persistent_cluster_id/0, seed_internal_cluster_id/0, seed_user_provided_cluster_name/0]). --export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0]). +-export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0, + if_reached_target_cluster_size/2]). -export([lock_id/1, lock_retries/0]). -include_lib("kernel/include/inet.hrl"). @@ -82,7 +83,7 @@ is_process_running(Node, Process) -> -spec cluster_name() -> binary(). cluster_name() -> - case rabbit_runtime_parameters:value_global(cluster_name) of + case rabbit_runtime_parameters:value_global(cluster_name) of not_found -> cluster_name_default(); Name -> Name end. @@ -183,6 +184,16 @@ target_cluster_size_hint() -> reached_target_cluster_size() -> running_count() >= target_cluster_size_hint(). +-spec if_reached_target_cluster_size(ConditionSatisfiedFun :: fun(), ConditionNotSatisfiedFun :: fun()) -> boolean(). +if_reached_target_cluster_size(ConditionSatisfiedFun, ConditionNotSatisfiedFun) -> + case reached_target_cluster_size() of + true -> + ConditionSatisfiedFun(), + true; + false -> + ConditionNotSatisfiedFun(), + false + end. -spec lock_id(Node :: node()) -> {ResourceId :: string(), LockRequesterId :: node()}. lock_id(Node) -> |