summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2020-02-20 19:12:09 +0300
committerGitHub <noreply@github.com>2020-02-20 19:12:09 +0300
commit1081e8ed4f76f1dded34a838b9e48bc3016d181e (patch)
tree2931544e2de2b62d8068ea5d970a7f8d39e5f2f2 /src
parent3aa744ff56946e291e898bb6482248da626ea3a2 (diff)
parente12edfa06ca694d06642a14022f0b0abf1aeb8ee (diff)
downloadrabbitmq-server-git-1081e8ed4f76f1dded34a838b9e48bc3016d181e.tar.gz
Merge pull request #2248 from rabbitmq/mk-concurrent-definition-import
Concurrent definition import
Diffstat (limited to 'src')
-rw-r--r--src/gatherer.erl1
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_definitions.erl127
-rw-r--r--src/rabbit_exchange.erl14
-rw-r--r--src/rabbit_msg_store.erl1
-rw-r--r--src/rabbit_queue_index.erl2
6 files changed, 116 insertions, 36 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl
index 52efef2bbb..99a487ed12 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -56,6 +56,7 @@ start_link() ->
-spec stop(pid()) -> 'ok'.
stop(Pid) ->
+ unlink(Pid),
gen_server2:call(Pid, stop, infinity).
-spec fork(pid()) -> 'ok'.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0f98e50504..b1aee4248c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -88,7 +88,7 @@
{enables, worker_pool}]}).
-rabbit_boot_step({worker_pool,
- [{description, "worker pool"},
+ [{description, "default worker pool"},
{mfa, {rabbit_sup, start_supervisor_child,
[worker_pool_sup]}},
{requires, pre_boot},
@@ -229,6 +229,11 @@
[{description, "ready to communicate with peers and clients"},
{requires, [core_initialized, recovery, routing_ready]}]}).
+-rabbit_boot_step({definition_import_worker_pool,
+ [{description, "dedicated worker pool for definition import"},
+ {mfa, {rabbit_definitions, boot, []}},
+ {requires, pre_flight}]}).
+
-rabbit_boot_step({cluster_name,
[{description, "sets cluster name if configured"},
{mfa, {rabbit_nodes, boot, []}},
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl
index 4d666b88fc..6336fe4f31 100644
--- a/src/rabbit_definitions.erl
+++ b/src/rabbit_definitions.erl
@@ -17,6 +17,7 @@
-module(rabbit_definitions).
-include_lib("rabbit_common/include/rabbit.hrl").
+-export([boot/0]).
%% automatic import on boot
-export([maybe_load_definitions/0, maybe_load_definitions_from/2]).
%% import
@@ -58,6 +59,12 @@
-export_type([definition_object/0, definition_list/0, definition_category/0, definitions/0]).
+-define(IMPORT_WORK_POOL, definition_import_pool).
+
+boot() ->
+ PoolSize = application:get_env(rabbit, definition_import_work_pool_size, rabbit_runtime:guess_number_of_cpu_cores()),
+ rabbit_sup:start_supervisor_child(definition_import_pool_sup, worker_pool_sup, [PoolSize, ?IMPORT_WORK_POOL]).
+
maybe_load_definitions() ->
%% this feature was a part of rabbitmq-management for a long time,
%% so we check rabbit_management.load_definitions for backward compatibility.
@@ -224,20 +231,22 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)),
try
- for_all(users, ActingUser, Map,
+ concurrent_for_all(users, ActingUser, Map,
fun(User, _Username) ->
rabbit_auth_backend_internal:put_user(User, Version, ActingUser)
end),
- for_all(vhosts, ActingUser, Map, fun add_vhost/2),
+ concurrent_for_all(vhosts, ActingUser, Map, fun add_vhost/2),
validate_limits(Map),
- for_all(permissions, ActingUser, Map, fun add_permission/2),
- for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
- for_all(parameters, ActingUser, Map, fun add_parameter/2),
- for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
- for_all(policies, ActingUser, Map, fun add_policy/2),
- for_all(queues, ActingUser, Map, fun add_queue/2),
- for_all(exchanges, ActingUser, Map, fun add_exchange/2),
- for_all(bindings, ActingUser, Map, fun add_binding/2),
+ concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2),
+ concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
+ sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2),
+ sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
+ %% importing policies concurrently can be unsafe as queues will be getting
+ %% potentially out of order notifications of applicable policy changes
+ sequential_for_all(policies, ActingUser, Map, fun add_policy/2),
+ concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
+ concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2),
+ concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2),
SuccessFun(),
ok
catch {error, E} -> {error, E};
@@ -254,11 +263,13 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
[VHost, ActingUser]),
try
validate_limits(Map, VHost),
- for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
- for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
- for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
- for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
- for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
+ sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
+ %% importing policies concurrently can be unsafe as queues will be getting
+ %% potentially out of order notifications of applicable policy changes
+ sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
+ concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
SuccessFun()
catch {error, E} -> {error, format(E)};
exit:E -> {error, format(E)}
@@ -275,17 +286,19 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) ->
[VHost, ActingUser]),
try
validate_limits(Map, VHost),
- for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
- for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
- for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
- for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
- for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
+ sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
+ %% importing policies concurrently can be unsafe as queues will be getting
+ %% potentially out of order notifications of applicable policy changes
+ sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
+ concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
SuccessFun()
catch {error, E} -> ErrorFun(format(E));
exit:E -> ErrorFun(format(E))
end.
-for_all(Category, ActingUser, Definitions, Fun) ->
+sequential_for_all(Category, ActingUser, Definitions, Fun) ->
case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of
undefined -> ok;
List ->
@@ -295,22 +308,71 @@ for_all(Category, ActingUser, Definitions, Fun) ->
end,
[begin
%% keys are expected to be atoms
- Atomized = maps:fold(fun (K, V, Acc) ->
- maps:put(rabbit_data_coercion:to_atom(K), V, Acc)
- end, #{}, M),
- Fun(Atomized, ActingUser)
+ Fun(atomize_keys(M), ActingUser)
end || M <- List, is_map(M)]
end.
-for_all(Name, ActingUser, Definitions, VHost, Fun) ->
+sequential_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
+ case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
+ undefined -> ok;
+ List -> [Fun(VHost, atomize_keys(M), ActingUser) || M <- List, is_map(M)]
+ end.
+
+concurrent_for_all(Category, ActingUser, Definitions, Fun) ->
+ case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of
+ undefined -> ok;
+ List ->
+ case length(List) of
+ 0 -> ok;
+ N -> rabbit_log:info("Importing ~p ~s...", [N, human_readable_category_name(Category)])
+ end,
+ WorkPoolFun = fun(M) ->
+ Fun(atomize_keys(M), ActingUser)
+ end,
+ do_concurrent_for_all(List, WorkPoolFun)
+ end.
+concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
undefined -> ok;
- List -> [Fun(VHost, maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]),
- ActingUser) ||
- M <- List, is_map(M)]
+ List ->
+ WorkPoolFun = fun(M) ->
+ Fun(VHost, atomize_keys(M), ActingUser)
+ end,
+ do_concurrent_for_all(List, WorkPoolFun)
end.
+do_concurrent_for_all(List, WorkPoolFun) ->
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ %% keys are expected to be atoms
+ ok = gatherer:fork(Gatherer),
+ worker_pool:submit_async(
+ ?IMPORT_WORK_POOL,
+ fun() ->
+ try
+ WorkPoolFun(M)
+ catch _:E -> gatherer:in(Gatherer, {error, E});
+ {error, E} -> gatherer:in(Gatherer, {error, E})
+ end,
+ gatherer:finish(Gatherer)
+ end)
+ end || M <- List, is_map(M)],
+ case gatherer:out(Gatherer) of
+ empty ->
+ ok = gatherer:stop(Gatherer);
+ {value, {error, E}} ->
+ ok = gatherer:stop(Gatherer),
+ throw({error, E})
+ end.
+
+-spec atomize_keys(#{any() => any()}) -> #{atom() => any()}.
+
+atomize_keys(M) ->
+ maps:fold(fun(K, V, Acc) ->
+ maps:put(rabbit_data_coercion:to_atom(K), V, Acc)
+ end, #{}, M).
+
-spec human_readable_category_name(definition_category()) -> string().
human_readable_category_name(topic_permissions) -> "topic permissions";
@@ -390,6 +452,8 @@ add_policy(VHost, Param, Username) ->
exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S)))
end.
+-spec add_vhost(map(), rabbit_types:username()) -> ok.
+
add_vhost(VHost, ActingUser) ->
VHostName = maps:get(name, VHost, undefined),
VHostTrace = maps:get(tracing, VHost, undefined),
@@ -588,7 +652,8 @@ list_exchanges() ->
%% exclude internal exchanges, they are not meant to be declared or used by
%% applications
[exchange_definition(X) || X <- lists:filter(fun(#exchange{internal = true}) -> false;
- (#exchange{}) -> true
+ (#exchange{name = #resource{name = <<>>}}) -> false;
+ (X) -> not rabbit_exchange:is_amq_prefixed(X)
end,
rabbit_exchange:list())].
@@ -664,7 +729,7 @@ list_users() ->
end || U <- rabbit_auth_backend_internal:list_users()].
list_runtime_parameters() ->
- [runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list()].
+ [runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list(), is_list(P)].
runtime_parameter_definition(Param) ->
#{
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index df0138d165..fd9ff0c05b 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -24,7 +24,7 @@
update_scratch/3, update_decorators/1, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
route/2, delete/3, validate_binding/2, count/0]).
--export([list_names/0]).
+-export([list_names/0, is_amq_prefixed/1]).
%% these must be run inside a mnesia tx
-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]).
@@ -102,6 +102,18 @@ serial(#exchange{name = XName} = X) ->
(false) -> none
end.
+-spec is_amq_prefixed(rabbit_types:exchange() | binary()) -> boolean().
+
+is_amq_prefixed(Name) when is_binary(Name) ->
+ case re:run(Name, <<"^amq\.">>) of
+ nomatch -> false;
+ {match, _} -> true
+ end;
+is_amq_prefixed(#exchange{name = #resource{name = <<>>}}) ->
+ false;
+is_amq_prefixed(#exchange{name = #resource{name = Name}}) ->
+ is_amq_prefixed(Name).
+
-spec declare
(name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:username())
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index c845575f5c..fdef5c3606 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1751,7 +1751,6 @@ build_index(Gatherer, Left, [],
sum_file_size = SumFileSize }) ->
case gatherer:out(Gatherer) of
empty ->
- unlink(Gatherer),
ok = gatherer:stop(Gatherer),
ok = index_clean_up_temporary_reference_count_entries(State),
Offset = case ets:lookup(FileSummaryEts, Left) of
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 42995685f6..dd7e973905 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -717,7 +717,6 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
case gatherer:out(Gatherer) of
empty ->
- unlink(Gatherer),
ok = gatherer:stop(Gatherer),
finished;
{value, {MsgId, Count}} ->
@@ -1432,7 +1431,6 @@ foreach_queue_index(Funs) ->
end)
end || QueueDirName <- QueueDirNames],
empty = gatherer:out(Gatherer),
- unlink(Gatherer),
ok = gatherer:stop(Gatherer).
transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->