diff options
author | Michael Klishin <michael@clojurewerkz.org> | 2022-01-11 22:05:16 +0300 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2022-01-11 22:05:16 +0300 |
commit | a0855a89f28b314f816caa63b17f648d25ff83c8 (patch) | |
tree | 16e638ac859e30da5ec5982c0f440e4fdcb9e671 | |
parent | 8e2edc76c288e04ae89fbd6996fd429be733b082 (diff) | |
download | rabbitmq-server-git-a0855a89f28b314f816caa63b17f648d25ff83c8.tar.gz |
Import queue and binding definitions when a certain number of nodes join
by default the number is 1.
This is to make sure that quorum queues have enough nodes to provision
their initial replicas on instead of settling on a smaller number,
which very likely can be just 1 (the first node to boot).
References #3850.
-rw-r--r-- | deps/rabbit/src/rabbit_definitions.erl | 47 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_nodes.erl | 15 |
2 files changed, 52 insertions, 10 deletions
diff --git a/deps/rabbit/src/rabbit_definitions.erl b/deps/rabbit/src/rabbit_definitions.erl index 152a6be5f2..825a2e56c0 100644 --- a/deps/rabbit/src/rabbit_definitions.erl +++ b/deps/rabbit/src/rabbit_definitions.erl @@ -257,9 +257,7 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2), concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), - concurrent_for_all(queues, ActingUser, Map, fun add_queue/2), concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2), - concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2), sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), %% importing policies concurrently can be unsafe as queues will be getting @@ -267,6 +265,19 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> sequential_for_all(policies, ActingUser, Map, fun add_policy/2), sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2), + rabbit_nodes:if_reached_target_cluster_size( + fun() -> + concurrent_for_all(queues, ActingUser, Map, fun add_queue/2), + concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2) + end, + + fun() -> + rabbit_log:info("There are fewer than target cluster size (~d) nodes online," + " skipping queue and binding import from definitions", + [rabbit_nodes:target_cluster_size_hint()]) + end + ), + SuccessFun(), ok catch {error, E} -> {error, E}; @@ -284,15 +295,25 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) -> try validate_limits(Map, VHost), - concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), - concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), - sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), %% importing policies concurrently can be unsafe as queues will be getting %% potentially out of order notifications of applicable policy changes sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + rabbit_nodes:if_reached_target_cluster_size( + fun() -> + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3) + end, + + fun() -> + rabbit_log:info("There are fewer than target cluster size (~d) nodes online," + " skipping queue and binding import from definitions", + [rabbit_nodes:target_cluster_size_hint()]) + end + ), + SuccessFun() catch {error, E} -> {error, format(E)}; exit:E -> {error, format(E)} @@ -310,15 +331,25 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) -> try validate_limits(Map, VHost), - concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), - concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), - sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), %% importing policies concurrently can be unsafe as queues will be getting %% potentially out of order notifications of applicable policy changes sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + rabbit_nodes:if_reached_target_cluster_size( + fun() -> + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3) + end, + + fun() -> + rabbit_log:info("There are fewer than target cluster size (~d) nodes online," + " skipping queue and binding import from definitions", + [rabbit_nodes:target_cluster_size_hint()]) + end + ), + SuccessFun() catch {error, E} -> ErrorFun(format(E)); exit:E -> ErrorFun(format(E)) diff --git a/deps/rabbit/src/rabbit_nodes.erl b/deps/rabbit/src/rabbit_nodes.erl index 092e8dc912..1668df78e3 100644 --- a/deps/rabbit/src/rabbit_nodes.erl +++ b/deps/rabbit/src/rabbit_nodes.erl @@ -14,7 +14,8 @@ await_running_count/2, is_single_node_cluster/0, boot/0]). -export([persistent_cluster_id/0, seed_internal_cluster_id/0, seed_user_provided_cluster_name/0]). --export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0]). +-export([all/0, all_running_with_hashes/0, target_cluster_size_hint/0, reached_target_cluster_size/0, + if_reached_target_cluster_size/2]). -export([lock_id/1, lock_retries/0]). -include_lib("kernel/include/inet.hrl"). @@ -82,7 +83,7 @@ is_process_running(Node, Process) -> -spec cluster_name() -> binary(). cluster_name() -> - case rabbit_runtime_parameters:value_global(cluster_name) of + case rabbit_runtime_parameters:value_global(cluster_name) of not_found -> cluster_name_default(); Name -> Name end. @@ -183,6 +184,16 @@ target_cluster_size_hint() -> reached_target_cluster_size() -> running_count() >= target_cluster_size_hint(). +-spec if_reached_target_cluster_size(ConditionSatisfiedFun :: fun(), ConditionNotSatisfiedFun :: fun()) -> boolean(). +if_reached_target_cluster_size(ConditionSatisfiedFun, ConditionNotSatisfiedFun) -> + case reached_target_cluster_size() of + true -> + ConditionSatisfiedFun(), + true; + false -> + ConditionNotSatisfiedFun(), + false + end. -spec lock_id(Node :: node()) -> {ResourceId :: string(), LockRequesterId :: node()}. lock_id(Node) -> |