summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-02-18 21:04:14 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-02-18 21:04:14 +0300
commit12571627a6089aa7d603e92b3e35745c8d398b3e (patch)
tree7a73b216e96741afa49427d9989aa0462696f689 /src
parenta63606045dbf34c082ba3a12cb1f5113db8d6a6f (diff)
downloadrabbitmq-server-git-12571627a6089aa7d603e92b3e35745c8d398b3e.tar.gz
Import definitions concurrently
…or rather, import some definition categories concurrently, namely users, virtual hosts, queues, exchanges and bindings. For some workloads this leads to a 20% to 70% reduction in definition import time. Note that for virtual hosts, most of the creation process steps cannot be made concurrent or significantly optimised without compromising the observed atomicity of HTTP API and CLI operations, so concurrent import both makes less of a difference and is the only realistic way of speeding up the process for virtual hosts. This introduces a dedicated work pool for import operations to avoid overloading the default pool, in particular on node boot when definitions can be imported concurrently with on disk data recovery steps which use the default pool heavily.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_definitions.erl114
2 files changed, 95 insertions, 26 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0f98e50504..b1aee4248c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -88,7 +88,7 @@
{enables, worker_pool}]}).
-rabbit_boot_step({worker_pool,
- [{description, "worker pool"},
+ [{description, "default worker pool"},
{mfa, {rabbit_sup, start_supervisor_child,
[worker_pool_sup]}},
{requires, pre_boot},
@@ -229,6 +229,11 @@
[{description, "ready to communicate with peers and clients"},
{requires, [core_initialized, recovery, routing_ready]}]}).
+-rabbit_boot_step({definition_import_worker_pool,
+ [{description, "dedicated worker pool for definition import"},
+ {mfa, {rabbit_definitions, boot, []}},
+ {requires, pre_flight}]}).
+
-rabbit_boot_step({cluster_name,
[{description, "sets cluster name if configured"},
{mfa, {rabbit_nodes, boot, []}},
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl
index 1d0a09f682..14e991cac5 100644
--- a/src/rabbit_definitions.erl
+++ b/src/rabbit_definitions.erl
@@ -17,6 +17,7 @@
-module(rabbit_definitions).
-include_lib("rabbit_common/include/rabbit.hrl").
+-export([boot/0]).
%% automatic import on boot
-export([maybe_load_definitions/0, maybe_load_definitions_from/2]).
%% import
@@ -58,6 +59,12 @@
-export_type([definition_object/0, definition_list/0, definition_category/0, definitions/0]).
+-define(IMPORT_WORK_POOL, definition_import_pool).
+
+boot() ->
+ PoolSize = application:get_env(rabbit, definition_import_work_pool_size, rabbit_runtime:guess_number_of_cpu_cores()),
+ rabbit_sup:start_supervisor_child(definition_import_pool_sup, worker_pool_sup, [PoolSize, ?IMPORT_WORK_POOL]).
+
maybe_load_definitions() ->
%% this feature was a part of rabbitmq-management for a long time,
%% so we check rabbit_management.load_definitions for backward compatibility.
@@ -224,20 +231,22 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)),
try
- for_all(users, ActingUser, Map,
+ concurrent_for_all(users, ActingUser, Map,
fun(User, _Username) ->
rabbit_auth_backend_internal:put_user(User, Version, ActingUser)
end),
- for_all(vhosts, ActingUser, Map, fun add_vhost/2),
+ concurrent_for_all(vhosts, ActingUser, Map, fun add_vhost/2),
validate_limits(Map),
- for_all(permissions, ActingUser, Map, fun add_permission/2),
- for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
- for_all(parameters, ActingUser, Map, fun add_parameter/2),
- for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
- for_all(policies, ActingUser, Map, fun add_policy/2),
- for_all(queues, ActingUser, Map, fun add_queue/2),
- for_all(exchanges, ActingUser, Map, fun add_exchange/2),
- for_all(bindings, ActingUser, Map, fun add_binding/2),
+ concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2),
+ concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
+ sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2),
+ sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
+ %% importing policies concurrently can be unsafe as queues will be getting
+ %% potentially out of order notifications of applicable policy changes
+ sequential_for_all(policies, ActingUser, Map, fun add_policy/2),
+ concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
+ concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2),
+ concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2),
SuccessFun(),
ok
catch {error, E} -> {error, E};
@@ -254,11 +263,13 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
[VHost, ActingUser]),
try
validate_limits(Map, VHost),
- for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
- for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
- for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
- for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
- for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
+ sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
+ %% importing policies concurrently can be unsafe as queues will be getting
+ %% potentially out of order notifications of applicable policy changes
+ sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
+ concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
SuccessFun()
catch {error, E} -> {error, format(E)};
exit:E -> {error, format(E)}
@@ -275,17 +286,19 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) ->
[VHost, ActingUser]),
try
validate_limits(Map, VHost),
- for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
- for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
- for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
- for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
- for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
+ sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
+ %% importing policies concurrently can be unsafe as queues will be getting
+ %% potentially out of order notifications of applicable policy changes
+ sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
+ concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
SuccessFun()
catch {error, E} -> ErrorFun(format(E));
exit:E -> ErrorFun(format(E))
end.
-for_all(Category, ActingUser, Definitions, Fun) ->
+sequential_for_all(Category, ActingUser, Definitions, Fun) ->
case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of
undefined -> ok;
List ->
@@ -295,14 +308,12 @@ for_all(Category, ActingUser, Definitions, Fun) ->
end,
[begin
%% keys are expected to be atoms
- Atomized = maps:fold(fun (K, V, Acc) ->
- maps:put(rabbit_data_coercion:to_atom(K), V, Acc)
- end, #{}, M),
+ Atomized = atomize_keys(M),
Fun(Atomized, ActingUser)
end || M <- List, is_map(M)]
end.
-for_all(Name, ActingUser, Definitions, VHost, Fun) ->
+sequential_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
undefined -> ok;
@@ -311,6 +322,57 @@ for_all(Name, ActingUser, Definitions, VHost, Fun) ->
M <- List, is_map(M)]
end.
+concurrent_for_all(Category, ActingUser, Definitions, Fun) ->
+ case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of
+ undefined -> ok;
+ List ->
+ case length(List) of
+ 0 -> ok;
+ N -> rabbit_log:info("Importing ~p ~s...", [N, human_readable_category_name(Category)])
+ end,
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ %% keys are expected to be atoms
+ Atomized = atomize_keys(M),
+ ok = gatherer:fork(Gatherer),
+ worker_pool:submit_async(
+ ?IMPORT_WORK_POOL,
+ fun() ->
+ Fun(Atomized, ActingUser),
+ gatherer:finish(Gatherer)
+ end)
+ end || M <- List, is_map(M)],
+ gatherer:out(Gatherer),
+ gatherer:stop(Gatherer)
+ end.
+
+concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
+ case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
+ undefined -> ok;
+ List ->
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ %% keys are expected to be atoms
+ Atomized = M = atomize_keys(M),
+ ok = gatherer:fork(Gatherer),
+ worker_pool:submit_async(
+ ?IMPORT_WORK_POOL,
+ fun() ->
+ Fun(VHost, Atomized, ActingUser),
+ gatherer:finish(Gatherer)
+ end)
+ end || M <- List, is_map(M)],
+ gatherer:out(Gatherer),
+ gatherer:stop(Gatherer)
+ end.
+
+-spec atomize_keys(#{any() => any()}) -> #{atom() => any()}.
+
+atomize_keys(M) ->
+ maps:fold(fun(K, V, Acc) ->
+ maps:put(rabbit_data_coercion:to_atom(K), V, Acc)
+ end, #{}, M).
+
-spec human_readable_category_name(definition_category()) -> string().
human_readable_category_name(topic_permissions) -> "topic permissions";
@@ -390,6 +452,8 @@ add_policy(VHost, Param, Username) ->
exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S)))
end.
+-spec add_vhost(map(), rabbit_types:username()) -> ok.
+
add_vhost(VHost, ActingUser) ->
VHostName = maps:get(name, VHost, undefined),
VHostTrace = maps:get(tracing, VHost, undefined),