diff options
author | Michael Klishin <michael@clojurewerkz.org> | 2020-02-18 21:04:14 +0300 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2020-02-18 21:04:14 +0300 |
commit | 12571627a6089aa7d603e92b3e35745c8d398b3e (patch) | |
tree | 7a73b216e96741afa49427d9989aa0462696f689 /src | |
parent | a63606045dbf34c082ba3a12cb1f5113db8d6a6f (diff) | |
download | rabbitmq-server-git-12571627a6089aa7d603e92b3e35745c8d398b3e.tar.gz |
Import definitions concurrently
…or rather, import some definition categories concurrently,
namely users, virtual hosts, queues, exchanges and bindings.
For some workloads this leads to a 20% to 70% reduction
in definition import time. Note that for virtual hosts,
most of the creation process steps cannot be made concurrent
or significantly optimised without compromising the observed
atomicity of HTTP API and CLI operations, so concurrent
import both makes less of a difference and is the only
realistic way of speeding up the process for virtual hosts.
This introduces a dedicated work pool for import operations
to avoid overloading the default pool, in particular on node
boot when definitions can be imported concurrently with on-disk
data recovery steps which use the default pool heavily.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit.erl | 7 | ||||
-rw-r--r-- | src/rabbit_definitions.erl | 114 |
2 files changed, 95 insertions, 26 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 0f98e50504..b1aee4248c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -88,7 +88,7 @@ {enables, worker_pool}]}). -rabbit_boot_step({worker_pool, - [{description, "worker pool"}, + [{description, "default worker pool"}, {mfa, {rabbit_sup, start_supervisor_child, [worker_pool_sup]}}, {requires, pre_boot}, @@ -229,6 +229,11 @@ [{description, "ready to communicate with peers and clients"}, {requires, [core_initialized, recovery, routing_ready]}]}). +-rabbit_boot_step({definition_import_worker_pool, + [{description, "dedicated worker pool for definition import"}, + {mfa, {rabbit_definitions, boot, []}}, + {requires, pre_flight}]}). + -rabbit_boot_step({cluster_name, [{description, "sets cluster name if configured"}, {mfa, {rabbit_nodes, boot, []}}, diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl index 1d0a09f682..14e991cac5 100644 --- a/src/rabbit_definitions.erl +++ b/src/rabbit_definitions.erl @@ -17,6 +17,7 @@ -module(rabbit_definitions). -include_lib("rabbit_common/include/rabbit.hrl"). +-export([boot/0]). %% automatic import on boot -export([maybe_load_definitions/0, maybe_load_definitions_from/2]). %% import @@ -58,6 +59,12 @@ -export_type([definition_object/0, definition_list/0, definition_category/0, definitions/0]). +-define(IMPORT_WORK_POOL, definition_import_pool). + +boot() -> + PoolSize = application:get_env(rabbit, definition_import_work_pool_size, rabbit_runtime:guess_number_of_cpu_cores()), + rabbit_sup:start_supervisor_child(definition_import_pool_sup, worker_pool_sup, [PoolSize, ?IMPORT_WORK_POOL]). + maybe_load_definitions() -> %% this feature was a part of rabbitmq-management for a long time, %% so we check rabbit_management.load_definitions for backward compatibility. 
@@ -224,20 +231,22 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) -> apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)), try - for_all(users, ActingUser, Map, + concurrent_for_all(users, ActingUser, Map, fun(User, _Username) -> rabbit_auth_backend_internal:put_user(User, Version, ActingUser) end), - for_all(vhosts, ActingUser, Map, fun add_vhost/2), + concurrent_for_all(vhosts, ActingUser, Map, fun add_vhost/2), validate_limits(Map), - for_all(permissions, ActingUser, Map, fun add_permission/2), - for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), - for_all(parameters, ActingUser, Map, fun add_parameter/2), - for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), - for_all(policies, ActingUser, Map, fun add_policy/2), - for_all(queues, ActingUser, Map, fun add_queue/2), - for_all(exchanges, ActingUser, Map, fun add_exchange/2), - for_all(bindings, ActingUser, Map, fun add_binding/2), + concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2), + concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), + sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2), + sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, fun add_policy/2), + concurrent_for_all(queues, ActingUser, Map, fun add_queue/2), + concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2), + concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2), SuccessFun(), ok catch {error, E} -> {error, E}; @@ -254,11 +263,13 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) -> [VHost, ActingUser]), try validate_limits(Map, VHost), - 
for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), - for_all(policies, ActingUser, Map, VHost, fun add_policy/3), - for_all(queues, ActingUser, Map, VHost, fun add_queue/3), - for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), - for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), SuccessFun() catch {error, E} -> {error, format(E)}; exit:E -> {error, format(E)} @@ -275,17 +286,19 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) -> [VHost, ActingUser]), try validate_limits(Map, VHost), - for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), - for_all(policies, ActingUser, Map, VHost, fun add_policy/3), - for_all(queues, ActingUser, Map, VHost, fun add_queue/3), - for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), - for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), SuccessFun() catch {error, E} -> ErrorFun(format(E)); exit:E -> ErrorFun(format(E)) end. 
-for_all(Category, ActingUser, Definitions, Fun) -> +sequential_for_all(Category, ActingUser, Definitions, Fun) -> case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of undefined -> ok; List -> @@ -295,14 +308,12 @@ for_all(Category, ActingUser, Definitions, Fun) -> end, [begin %% keys are expected to be atoms - Atomized = maps:fold(fun (K, V, Acc) -> - maps:put(rabbit_data_coercion:to_atom(K), V, Acc) - end, #{}, M), + Atomized = atomize_keys(M), Fun(Atomized, ActingUser) end || M <- List, is_map(M)] end. -for_all(Name, ActingUser, Definitions, VHost, Fun) -> +sequential_for_all(Name, ActingUser, Definitions, VHost, Fun) -> case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of undefined -> ok; @@ -311,6 +322,57 @@ for_all(Name, ActingUser, Definitions, VHost, Fun) -> M <- List, is_map(M)] end. +concurrent_for_all(Category, ActingUser, Definitions, Fun) -> + case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of + undefined -> ok; + List -> + case length(List) of + 0 -> ok; + N -> rabbit_log:info("Importing ~p ~s...", [N, human_readable_category_name(Category)]) + end, + {ok, Gatherer} = gatherer:start_link(), + [begin + %% keys are expected to be atoms + Atomized = atomize_keys(M), + ok = gatherer:fork(Gatherer), + worker_pool:submit_async( + ?IMPORT_WORK_POOL, + fun() -> + Fun(Atomized, ActingUser), + gatherer:finish(Gatherer) + end) + end || M <- List, is_map(M)], + gatherer:out(Gatherer), + gatherer:stop(Gatherer) + end. 
+ +concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) -> + case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of + undefined -> ok; + List -> + {ok, Gatherer} = gatherer:start_link(), + [begin + %% keys are expected to be atoms + Atomized = atomize_keys(M), + ok = gatherer:fork(Gatherer), + worker_pool:submit_async( + ?IMPORT_WORK_POOL, + fun() -> + Fun(VHost, Atomized, ActingUser), + gatherer:finish(Gatherer) + end) + end || M <- List, is_map(M)], + gatherer:out(Gatherer), + gatherer:stop(Gatherer) + end. + +-spec atomize_keys(#{any() => any()}) -> #{atom() => any()}. + +atomize_keys(M) -> + maps:fold(fun(K, V, Acc) -> + maps:put(rabbit_data_coercion:to_atom(K), V, Acc) + end, #{}, M). + -spec human_readable_category_name(definition_category()) -> string(). human_readable_category_name(topic_permissions) -> "topic permissions"; @@ -390,6 +452,8 @@ add_policy(VHost, Param, Username) -> exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S))) end. +-spec add_vhost(map(), rabbit_types:username()) -> ok. + add_vhost(VHost, ActingUser) -> VHostName = maps:get(name, VHost, undefined), VHostTrace = maps:get(tracing, VHost, undefined), |