diff options
author | Michael Klishin <mklishin@pivotal.io> | 2020-02-20 19:12:09 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-20 19:12:09 +0300 |
commit | 1081e8ed4f76f1dded34a838b9e48bc3016d181e (patch) | |
tree | 2931544e2de2b62d8068ea5d970a7f8d39e5f2f2 /src | |
parent | 3aa744ff56946e291e898bb6482248da626ea3a2 (diff) | |
parent | e12edfa06ca694d06642a14022f0b0abf1aeb8ee (diff) | |
download | rabbitmq-server-git-1081e8ed4f76f1dded34a838b9e48bc3016d181e.tar.gz |
Merge pull request #2248 from rabbitmq/mk-concurrent-definition-import
Concurrent definition import
Diffstat (limited to 'src')
-rw-r--r-- | src/gatherer.erl | 1 | ||||
-rw-r--r-- | src/rabbit.erl | 7 | ||||
-rw-r--r-- | src/rabbit_definitions.erl | 127 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 14 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 1 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 2 |
6 files changed, 116 insertions, 36 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl index 52efef2bbb..99a487ed12 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -56,6 +56,7 @@ start_link() -> -spec stop(pid()) -> 'ok'. stop(Pid) -> + unlink(Pid), gen_server2:call(Pid, stop, infinity). -spec fork(pid()) -> 'ok'. diff --git a/src/rabbit.erl b/src/rabbit.erl index 0f98e50504..b1aee4248c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -88,7 +88,7 @@ {enables, worker_pool}]}). -rabbit_boot_step({worker_pool, - [{description, "worker pool"}, + [{description, "default worker pool"}, {mfa, {rabbit_sup, start_supervisor_child, [worker_pool_sup]}}, {requires, pre_boot}, @@ -229,6 +229,11 @@ [{description, "ready to communicate with peers and clients"}, {requires, [core_initialized, recovery, routing_ready]}]}). +-rabbit_boot_step({definition_import_worker_pool, + [{description, "dedicated worker pool for definition import"}, + {mfa, {rabbit_definitions, boot, []}}, + {requires, pre_flight}]}). + -rabbit_boot_step({cluster_name, [{description, "sets cluster name if configured"}, {mfa, {rabbit_nodes, boot, []}}, diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl index 4d666b88fc..6336fe4f31 100644 --- a/src/rabbit_definitions.erl +++ b/src/rabbit_definitions.erl @@ -17,6 +17,7 @@ -module(rabbit_definitions). -include_lib("rabbit_common/include/rabbit.hrl"). +-export([boot/0]). %% automatic import on boot -export([maybe_load_definitions/0, maybe_load_definitions_from/2]). %% import @@ -58,6 +59,12 @@ -export_type([definition_object/0, definition_list/0, definition_category/0, definitions/0]). +-define(IMPORT_WORK_POOL, definition_import_pool). + +boot() -> + PoolSize = application:get_env(rabbit, definition_import_work_pool_size, rabbit_runtime:guess_number_of_cpu_cores()), + rabbit_sup:start_supervisor_child(definition_import_pool_sup, worker_pool_sup, [PoolSize, ?IMPORT_WORK_POOL]). + maybe_load_definitions() -> %% this feature was a part of rabbitmq-management for a long time, %% so we check rabbit_management.load_definitions for backward compatibility. @@ -224,20 +231,22 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) -> apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)), try - for_all(users, ActingUser, Map, + concurrent_for_all(users, ActingUser, Map, fun(User, _Username) -> rabbit_auth_backend_internal:put_user(User, Version, ActingUser) end), - for_all(vhosts, ActingUser, Map, fun add_vhost/2), + concurrent_for_all(vhosts, ActingUser, Map, fun add_vhost/2), validate_limits(Map), - for_all(permissions, ActingUser, Map, fun add_permission/2), - for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), - for_all(parameters, ActingUser, Map, fun add_parameter/2), - for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), - for_all(policies, ActingUser, Map, fun add_policy/2), - for_all(queues, ActingUser, Map, fun add_queue/2), - for_all(exchanges, ActingUser, Map, fun add_exchange/2), - for_all(bindings, ActingUser, Map, fun add_binding/2), + concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2), + concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), + sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2), + sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, fun add_policy/2), + concurrent_for_all(queues, ActingUser, Map, fun add_queue/2), + concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2), + concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2), SuccessFun(), ok catch {error, E} -> {error, E}; @@ -254,11 +263,13 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) -> [VHost, ActingUser]), try validate_limits(Map, VHost), - for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), - for_all(policies, ActingUser, Map, VHost, fun add_policy/3), - for_all(queues, ActingUser, Map, VHost, fun add_queue/3), - for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), - for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), SuccessFun() catch {error, E} -> {error, format(E)}; exit:E -> {error, format(E)} @@ -275,17 +286,19 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) -> [VHost, ActingUser]), try validate_limits(Map, VHost), - for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), - for_all(policies, ActingUser, Map, VHost, fun add_policy/3), - for_all(queues, ActingUser, Map, VHost, fun add_queue/3), - for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), - for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), SuccessFun() catch {error, E} -> ErrorFun(format(E)); exit:E -> ErrorFun(format(E)) end. -for_all(Category, ActingUser, Definitions, Fun) -> +sequential_for_all(Category, ActingUser, Definitions, Fun) -> case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of undefined -> ok; List -> @@ -295,22 +308,71 @@ for_all(Category, ActingUser, Definitions, Fun) -> end, [begin %% keys are expected to be atoms - Atomized = maps:fold(fun (K, V, Acc) -> - maps:put(rabbit_data_coercion:to_atom(K), V, Acc) - end, #{}, M), - Fun(Atomized, ActingUser) + Fun(atomize_keys(M), ActingUser) end || M <- List, is_map(M)] end. -for_all(Name, ActingUser, Definitions, VHost, Fun) -> +sequential_for_all(Name, ActingUser, Definitions, VHost, Fun) -> + case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of + undefined -> ok; + List -> [Fun(VHost, atomize_keys(M), ActingUser) || M <- List, is_map(M)] + end. + +concurrent_for_all(Category, ActingUser, Definitions, Fun) -> + case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of + undefined -> ok; + List -> + case length(List) of + 0 -> ok; + N -> rabbit_log:info("Importing ~p ~s...", [N, human_readable_category_name(Category)]) + end, + WorkPoolFun = fun(M) -> + Fun(atomize_keys(M), ActingUser) + end, + do_concurrent_for_all(List, WorkPoolFun) + end. +concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) -> case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of undefined -> ok; - List -> [Fun(VHost, maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]), - ActingUser) || - M <- List, is_map(M)] + List -> + WorkPoolFun = fun(M) -> + Fun(VHost, atomize_keys(M), ActingUser) + end, + do_concurrent_for_all(List, WorkPoolFun) end. +do_concurrent_for_all(List, WorkPoolFun) -> + {ok, Gatherer} = gatherer:start_link(), + [begin + %% keys are expected to be atoms + ok = gatherer:fork(Gatherer), + worker_pool:submit_async( + ?IMPORT_WORK_POOL, + fun() -> + try + WorkPoolFun(M) + catch _:E -> gatherer:in(Gatherer, {error, E}); + {error, E} -> gatherer:in(Gatherer, {error, E}) + end, + gatherer:finish(Gatherer) + end) + end || M <- List, is_map(M)], + case gatherer:out(Gatherer) of + empty -> + ok = gatherer:stop(Gatherer); + {value, {error, E}} -> + ok = gatherer:stop(Gatherer), + throw({error, E}) + end. + +-spec atomize_keys(#{any() => any()}) -> #{atom() => any()}. + +atomize_keys(M) -> + maps:fold(fun(K, V, Acc) -> + maps:put(rabbit_data_coercion:to_atom(K), V, Acc) + end, #{}, M). + -spec human_readable_category_name(definition_category()) -> string(). human_readable_category_name(topic_permissions) -> "topic permissions"; @@ -390,6 +452,8 @@ add_policy(VHost, Param, Username) -> exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S))) end. +-spec add_vhost(map(), rabbit_types:username()) -> ok. + add_vhost(VHost, ActingUser) -> VHostName = maps:get(name, VHost, undefined), VHostTrace = maps:get(tracing, VHost, undefined), @@ -588,7 +652,8 @@ list_exchanges() -> %% exclude internal exchanges, they are not meant to be declared or used by %% applications [exchange_definition(X) || X <- lists:filter(fun(#exchange{internal = true}) -> false; - (#exchange{}) -> true + (#exchange{name = #resource{name = <<>>}}) -> false; + (X) -> not rabbit_exchange:is_amq_prefixed(X) end, rabbit_exchange:list())]. @@ -664,7 +729,7 @@ list_users() -> end || U <- rabbit_auth_backend_internal:list_users()]. list_runtime_parameters() -> - [runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list()]. + [runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list(), is_list(P)]. runtime_parameter_definition(Param) -> #{ diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index df0138d165..fd9ff0c05b 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -24,7 +24,7 @@ update_scratch/3, update_decorators/1, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4, route/2, delete/3, validate_binding/2, count/0]). --export([list_names/0]). +-export([list_names/0, is_amq_prefixed/1]). %% these must be run inside a mnesia tx -export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]). @@ -102,6 +102,18 @@ serial(#exchange{name = XName} = X) -> (false) -> none end. +-spec is_amq_prefixed(rabbit_types:exchange() | binary()) -> boolean(). + +is_amq_prefixed(Name) when is_binary(Name) -> + case re:run(Name, <<"^amq\.">>) of + nomatch -> false; + {match, _} -> true + end; +is_amq_prefixed(#exchange{name = #resource{name = <<>>}}) -> + false; +is_amq_prefixed(#exchange{name = #resource{name = Name}}) -> + is_amq_prefixed(Name). + -spec declare (name(), type(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:username()) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index c845575f5c..fdef5c3606 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1751,7 +1751,6 @@ build_index(Gatherer, Left, [], sum_file_size = SumFileSize }) -> case gatherer:out(Gatherer) of empty -> - unlink(Gatherer), ok = gatherer:stop(Gatherer), ok = index_clean_up_temporary_reference_count_entries(State), Offset = case ets:lookup(FileSummaryEts, Left) of diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 42995685f6..dd7e973905 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -717,7 +717,6 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> case gatherer:out(Gatherer) of empty -> - unlink(Gatherer), ok = gatherer:stop(Gatherer), finished; {value, {MsgId, Count}} -> @@ -1432,7 +1431,6 @@ foreach_queue_index(Funs) -> end) end || QueueDirName <- QueueDirNames], empty = gatherer:out(Gatherer), - unlink(Gatherer), ok = gatherer:stop(Gatherer). transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> |