diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_vhost.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_vhost.erl | 422 |
1 files changed, 422 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl new file mode 100644 index 0000000000..c8c5fc961a --- /dev/null +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -0,0 +1,422 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_vhost). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("vhost.hrl"). + +-export([recover/0, recover/1]). +-export([add/2, add/4, delete/2, exists/1, with/2, with_user_and_vhost/3, assert/1, update/2, + set_limits/2, vhost_cluster_state/1, is_running_on_all_nodes/1, await_running_on_all_nodes/2, + list/0, count/0, list_names/0, all/0, parse_tags/1]). +-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). +-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). +-export([delete_storage/1]). +-export([vhost_down/1]). +-export([put_vhost/5]). + +%% +%% API +%% + +recover() -> + %% Clear out remnants of old incarnation, in case we restarted + %% faster than other nodes handled DOWN messages from us. + rabbit_amqqueue:on_node_down(node()), + + rabbit_amqqueue:warn_file_limit(), + + %% Prepare rabbit_semi_durable_route table + rabbit_binding:recover(), + + %% rabbit_vhost_sup_sup will start the actual recovery. + %% So recovery will be run every time a vhost supervisor is restarted. + ok = rabbit_vhost_sup_sup:start(), + + [ok = rabbit_vhost_sup_sup:init_vhost(VHost) || VHost <- list_names()], + ok. + +recover(VHost) -> + VHostDir = msg_store_dir_path(VHost), + rabbit_log:info("Making sure data directory '~ts' for vhost '~s' exists~n", + [VHostDir, VHost]), + VHostStubFile = filename:join(VHostDir, ".vhost"), + ok = rabbit_file:ensure_dir(VHostStubFile), + ok = file:write_file(VHostStubFile, VHost), + {Recovered, Failed} = rabbit_amqqueue:recover(VHost), + AllQs = Recovered ++ Failed, + QNames = [amqqueue:get_name(Q) || Q <- AllQs], + ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames), + ok = rabbit_amqqueue:start(Recovered), + %% Start queue mirrors. + ok = rabbit_mirror_queue_misc:on_vhost_up(VHost), + ok. + +-define(INFO_KEYS, vhost:info_keys()). + +-spec parse_tags(binary() | string() | atom()) -> [atom()]. +parse_tags(undefined) -> + []; +parse_tags("") -> + []; +parse_tags(<<"">>) -> + []; +parse_tags(Val) when is_binary(Val) -> + parse_tags(rabbit_data_coercion:to_list(Val)); +parse_tags(Val) when is_list(Val) -> + [trim_tag(Tag) || Tag <- re:split(Val, ",", [{return, list}])]. + +-spec add(vhost:name(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). + +add(VHost, ActingUser) -> + case exists(VHost) of + true -> ok; + false -> do_add(VHost, <<"">>, [], ActingUser) + end. + +-spec add(vhost:name(), binary(), [atom()], rabbit_types:username()) -> rabbit_types:ok_or_error(any()). + +add(Name, Description, Tags, ActingUser) -> + case exists(Name) of + true -> ok; + false -> do_add(Name, Description, Tags, ActingUser) + end. + +do_add(Name, Description, Tags, ActingUser) -> + case Description of + undefined -> + rabbit_log:info("Adding vhost '~s' without a description", [Name]); + Value -> + rabbit_log:info("Adding vhost '~s' (description: '~s')", [Name, Value]) + end, + VHost = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_vhost, Name}) of + [] -> + Row = vhost:new(Name, [], #{description => Description, tags => Tags}), + rabbit_log:debug("Inserting a virtual host record ~p", [Row]), + ok = mnesia:write(rabbit_vhost, Row, write), + Row; + %% the vhost already exists + [Row] -> + Row + end + end, + fun (VHost1, true) -> + VHost1; + (VHost1, false) -> + [begin + Resource = rabbit_misc:r(Name, exchange, ExchangeName), + rabbit_log:debug("Will declare an exchange ~p", [Resource]), + _ = rabbit_exchange:declare(Resource, Type, true, false, Internal, [], ActingUser) + end || {ExchangeName, Type, Internal} <- + [{<<"">>, direct, false}, + {<<"amq.direct">>, direct, false}, + {<<"amq.topic">>, topic, false}, + %% per 0-9-1 pdf + {<<"amq.match">>, headers, false}, + %% per 0-9-1 xml + {<<"amq.headers">>, headers, false}, + {<<"amq.fanout">>, fanout, false}, + {<<"amq.rabbitmq.trace">>, topic, true}]], + VHost1 + end), + case rabbit_vhost_sup_sup:start_on_all_nodes(Name) of + ok -> + rabbit_event:notify(vhost_created, info(VHost) + ++ [{user_who_performed_action, ActingUser}, + {description, Description}, + {tags, Tags}]), + ok; + {error, Reason} -> + Msg = rabbit_misc:format("failed to set up vhost '~s': ~p", + [Name, Reason]), + {error, Msg} + end. + +-spec delete(vhost:name(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). + +delete(VHost, ActingUser) -> + %% FIXME: We are forced to delete the queues and exchanges outside + %% the TX below. Queue deletion involves sending messages to the queue + %% process, which in turn results in further mnesia actions and + %% eventually the termination of that process. Exchange deletion causes + %% notifications which must be sent outside the TX + rabbit_log:info("Deleting vhost '~s'~n", [VHost]), + QDelFun = fun (Q) -> rabbit_amqqueue:delete(Q, false, false, ActingUser) end, + [begin + Name = amqqueue:get_name(Q), + assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser) + end || Q <- rabbit_amqqueue:list(VHost)], + [assert_benign(rabbit_exchange:delete(Name, false, ActingUser), ActingUser) || + #exchange{name = Name} <- rabbit_exchange:list(VHost)], + Funs = rabbit_misc:execute_mnesia_transaction( + with(VHost, fun () -> internal_delete(VHost, ActingUser) end)), + ok = rabbit_event:notify(vhost_deleted, [{name, VHost}, + {user_who_performed_action, ActingUser}]), + [case Fun() of + ok -> ok; + {error, {no_such_vhost, VHost}} -> ok + end || Fun <- Funs], + %% After vhost was deleted from mnesia DB, we try to stop vhost supervisors + %% on all the nodes. + rabbit_vhost_sup_sup:delete_on_all_nodes(VHost), + ok. + +put_vhost(Name, Description, Tags0, Trace, Username) -> + Tags = case Tags0 of + undefined -> <<"">>; + null -> <<"">>; + "undefined" -> <<"">>; + "null" -> <<"">>; + Other -> Other + end, + Result = case exists(Name) of + true -> ok; + false -> add(Name, Description, parse_tags(Tags), Username), + %% wait for up to 45 seconds for the vhost to initialise + %% on all nodes + case await_running_on_all_nodes(Name, 45000) of + ok -> + maybe_grant_full_permissions(Name, Username); + {error, timeout} -> + {error, timeout} + end + end, + case Trace of + true -> rabbit_trace:start(Name); + false -> rabbit_trace:stop(Name); + undefined -> ok + end, + Result. + +%% when definitions are loaded on boot, Username here will be ?INTERNAL_USER, +%% which does not actually exist +maybe_grant_full_permissions(_Name, ?INTERNAL_USER) -> + ok; +maybe_grant_full_permissions(Name, Username) -> + U = rabbit_auth_backend_internal:lookup_user(Username), + maybe_grant_full_permissions(U, Name, Username). + +maybe_grant_full_permissions({ok, _}, Name, Username) -> + rabbit_auth_backend_internal:set_permissions( + Username, Name, <<".*">>, <<".*">>, <<".*">>, Username); +maybe_grant_full_permissions(_, _Name, _Username) -> + ok. + + +%% 50 ms +-define(AWAIT_SAMPLE_INTERVAL, 50). + +-spec await_running_on_all_nodes(vhost:name(), integer()) -> ok | {error, timeout}. +await_running_on_all_nodes(VHost, Timeout) -> + Attempts = round(Timeout / ?AWAIT_SAMPLE_INTERVAL), + await_running_on_all_nodes0(VHost, Attempts). + +await_running_on_all_nodes0(_VHost, 0) -> + {error, timeout}; +await_running_on_all_nodes0(VHost, Attempts) -> + case is_running_on_all_nodes(VHost) of + true -> ok; + _ -> + timer:sleep(?AWAIT_SAMPLE_INTERVAL), + await_running_on_all_nodes0(VHost, Attempts - 1) + end. + +-spec is_running_on_all_nodes(vhost:name()) -> boolean(). +is_running_on_all_nodes(VHost) -> + States = vhost_cluster_state(VHost), + lists:all(fun ({_Node, State}) -> State =:= running end, + States). + +-spec vhost_cluster_state(vhost:name()) -> [{atom(), atom()}]. +vhost_cluster_state(VHost) -> + Nodes = rabbit_nodes:all_running(), + lists:map(fun(Node) -> + State = case rabbit_misc:rpc_call(Node, + rabbit_vhost_sup_sup, is_vhost_alive, + [VHost]) of + {badrpc, nodedown} -> nodedown; + true -> running; + false -> stopped + end, + {Node, State} + end, + Nodes). + +vhost_down(VHost) -> + ok = rabbit_event:notify(vhost_down, + [{name, VHost}, + {node, node()}, + {user_who_performed_action, ?INTERNAL_USER}]). + +delete_storage(VHost) -> + VhostDir = msg_store_dir_path(VHost), + rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), + %% Message store should be closed when vhost supervisor is closed. + case rabbit_file:recursive_delete([VhostDir]) of + ok -> ok; + {error, {_, enoent}} -> + %% a concurrent delete did the job for us + rabbit_log:warning("Tried to delete storage directories for vhost '~s', it failed with an ENOENT", [VHost]), + ok; + Other -> + rabbit_log:warning("Tried to delete storage directories for vhost '~s': ~p", [VHost, Other]), + Other + end. + +assert_benign(ok, _) -> ok; +assert_benign({ok, _}, _) -> ok; +assert_benign({ok, _, _}, _) -> ok; +assert_benign({error, not_found}, _) -> ok; +assert_benign({error, {absent, Q, _}}, ActingUser) -> + %% Removing the mnesia entries here is safe. If/when the down node + %% restarts, it will clear out the on-disk storage of the queue. + QName = amqqueue:get_name(Q), + rabbit_amqqueue:internal_delete(QName, ActingUser). + +internal_delete(VHost, ActingUser) -> + [ok = rabbit_auth_backend_internal:clear_permissions( + proplists:get_value(user, Info), VHost, ActingUser) + || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHost)], + TopicPermissions = rabbit_auth_backend_internal:list_vhost_topic_permissions(VHost), + [ok = rabbit_auth_backend_internal:clear_topic_permissions( + proplists:get_value(user, TopicPermission), VHost, ActingUser) + || TopicPermission <- TopicPermissions], + Fs1 = [rabbit_runtime_parameters:clear(VHost, + proplists:get_value(component, Info), + proplists:get_value(name, Info), + ActingUser) + || Info <- rabbit_runtime_parameters:list(VHost)], + Fs2 = [rabbit_policy:delete(VHost, proplists:get_value(name, Info), ActingUser) + || Info <- rabbit_policy:list(VHost)], + ok = mnesia:delete({rabbit_vhost, VHost}), + Fs1 ++ Fs2. + +-spec exists(vhost:name()) -> boolean(). + +exists(VHost) -> + mnesia:dirty_read({rabbit_vhost, VHost}) /= []. + +-spec list_names() -> [vhost:name()]. +list_names() -> mnesia:dirty_all_keys(rabbit_vhost). + +%% Exists for backwards compatibility, prefer list_names/0. +-spec list() -> [vhost:name()]. +list() -> list_names(). + +-spec all() -> [vhost:vhost()]. +all() -> mnesia:dirty_match_object(rabbit_vhost, vhost:pattern_match_all()). + +-spec count() -> non_neg_integer(). +count() -> + length(list()). + +-spec with(vhost:name(), rabbit_misc:thunk(A)) -> A. + +with(VHost, Thunk) -> + fun () -> + case mnesia:read({rabbit_vhost, VHost}) of + [] -> + mnesia:abort({no_such_vhost, VHost}); + [_V] -> + Thunk() + end + end. + +-spec with_user_and_vhost + (rabbit_types:username(), vhost:name(), rabbit_misc:thunk(A)) -> A. + +with_user_and_vhost(Username, VHost, Thunk) -> + rabbit_misc:with_user(Username, with(VHost, Thunk)). + +%% Like with/2 but outside an Mnesia tx + +-spec assert(vhost:name()) -> 'ok'. + +assert(VHost) -> case exists(VHost) of + true -> ok; + false -> throw({error, {no_such_vhost, VHost}}) + end. + +-spec update(vhost:name(), fun((vhost:vhost()) -> vhost:vhost())) -> vhost:vhost(). + +update(VHost, Fun) -> + case mnesia:read({rabbit_vhost, VHost}) of + [] -> + mnesia:abort({no_such_vhost, VHost}); + [V] -> + V1 = Fun(V), + ok = mnesia:write(rabbit_vhost, V1, write), + V1 + end. + +set_limits(VHost, undefined) -> + vhost:set_limits(VHost, []); +set_limits(VHost, Limits) -> + vhost:set_limits(VHost, Limits). + + +dir(Vhost) -> + <<Num:128>> = erlang:md5(Vhost), + rabbit_misc:format("~.36B", [Num]). + +msg_store_dir_path(VHost) -> + EncodedName = dir(VHost), + rabbit_data_coercion:to_list(filename:join([msg_store_dir_base(), EncodedName])). + +msg_store_dir_wildcard() -> + rabbit_data_coercion:to_list(filename:join([msg_store_dir_base(), "*"])). + +msg_store_dir_base() -> + Dir = rabbit_mnesia:dir(), + filename:join([Dir, "msg_stores", "vhosts"]). + +-spec trim_tag(list() | binary() | atom()) -> atom(). +trim_tag(Val) -> + rabbit_data_coercion:to_atom(string:trim(rabbit_data_coercion:to_list(Val))). + +%%---------------------------------------------------------------------------- + +infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. + +i(name, VHost) -> vhost:get_name(VHost); +i(tracing, VHost) -> rabbit_trace:enabled(vhost:get_name(VHost)); +i(cluster_state, VHost) -> vhost_cluster_state(vhost:get_name(VHost)); +i(description, VHost) -> vhost:get_description(VHost); +i(tags, VHost) -> vhost:get_tags(VHost); +i(metadata, VHost) -> vhost:get_metadata(VHost); +i(Item, VHost) -> + rabbit_log:error("Don't know how to compute a virtual host info item '~s' for virtual host '~p'", [Item, VHost]), + throw({bad_argument, Item}). + +-spec info(vhost:vhost() | vhost:name()) -> rabbit_types:infos(). + +info(VHost) when ?is_vhost(VHost) -> + infos(?INFO_KEYS, VHost); +info(Key) -> + case mnesia:dirty_read({rabbit_vhost, Key}) of + [] -> []; + [VHost] -> infos(?INFO_KEYS, VHost) + end. + +-spec info(vhost:vhost(), rabbit_types:info_keys()) -> rabbit_types:infos(). +info(VHost, Items) -> infos(Items, VHost). + +-spec info_all() -> [rabbit_types:infos()]. +info_all() -> info_all(?INFO_KEYS). + +-spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. +info_all(Items) -> [info(VHost, Items) || VHost <- all()]. + +info_all(Ref, AggregatorPid) -> info_all(?INFO_KEYS, Ref, AggregatorPid). + +-spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'. +info_all(Items, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map( + AggregatorPid, Ref, fun(VHost) -> info(VHost, Items) end, all()). |