summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_vhost.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_vhost.erl')
-rw-r--r--deps/rabbit/src/rabbit_vhost.erl422
1 files changed, 422 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl
new file mode 100644
index 0000000000..c8c5fc961a
--- /dev/null
+++ b/deps/rabbit/src/rabbit_vhost.erl
@@ -0,0 +1,422 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_vhost).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("vhost.hrl").
+
+-export([recover/0, recover/1]).
+-export([add/2, add/4, delete/2, exists/1, with/2, with_user_and_vhost/3, assert/1, update/2,
+ set_limits/2, vhost_cluster_state/1, is_running_on_all_nodes/1, await_running_on_all_nodes/2,
+ list/0, count/0, list_names/0, all/0, parse_tags/1]).
+-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
+-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
+-export([delete_storage/1]).
+-export([vhost_down/1]).
+-export([put_vhost/5]).
+
+%%
+%% API
+%%
+
+recover() ->
+ %% Clear out remnants of old incarnation, in case we restarted
+ %% faster than other nodes handled DOWN messages from us.
+ rabbit_amqqueue:on_node_down(node()),
+
+ rabbit_amqqueue:warn_file_limit(),
+
+ %% Prepare rabbit_semi_durable_route table
+ rabbit_binding:recover(),
+
+ %% rabbit_vhost_sup_sup will start the actual recovery.
+ %% So recovery will be run every time a vhost supervisor is restarted.
+ ok = rabbit_vhost_sup_sup:start(),
+
+ [ok = rabbit_vhost_sup_sup:init_vhost(VHost) || VHost <- list_names()],
+ ok.
+
+recover(VHost) ->
+ VHostDir = msg_store_dir_path(VHost),
+ rabbit_log:info("Making sure data directory '~ts' for vhost '~s' exists~n",
+ [VHostDir, VHost]),
+ VHostStubFile = filename:join(VHostDir, ".vhost"),
+ ok = rabbit_file:ensure_dir(VHostStubFile),
+ ok = file:write_file(VHostStubFile, VHost),
+ {Recovered, Failed} = rabbit_amqqueue:recover(VHost),
+ AllQs = Recovered ++ Failed,
+ QNames = [amqqueue:get_name(Q) || Q <- AllQs],
+ ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames),
+ ok = rabbit_amqqueue:start(Recovered),
+ %% Start queue mirrors.
+ ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
+ ok.
+
+-define(INFO_KEYS, vhost:info_keys()).
+
+-spec parse_tags(binary() | string() | atom()) -> [atom()].
+parse_tags(undefined) ->
+ [];
+parse_tags("") ->
+ [];
+parse_tags(<<"">>) ->
+ [];
+parse_tags(Val) when is_binary(Val) ->
+ parse_tags(rabbit_data_coercion:to_list(Val));
+parse_tags(Val) when is_list(Val) ->
+ [trim_tag(Tag) || Tag <- re:split(Val, ",", [{return, list}])].
+
+-spec add(vhost:name(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
+
+add(VHost, ActingUser) ->
+ case exists(VHost) of
+ true -> ok;
+ false -> do_add(VHost, <<"">>, [], ActingUser)
+ end.
+
+-spec add(vhost:name(), binary(), [atom()], rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
+
+add(Name, Description, Tags, ActingUser) ->
+ case exists(Name) of
+ true -> ok;
+ false -> do_add(Name, Description, Tags, ActingUser)
+ end.
+
+do_add(Name, Description, Tags, ActingUser) ->
+ case Description of
+ undefined ->
+ rabbit_log:info("Adding vhost '~s' without a description", [Name]);
+ Value ->
+ rabbit_log:info("Adding vhost '~s' (description: '~s')", [Name, Value])
+ end,
+ VHost = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_vhost, Name}) of
+ [] ->
+ Row = vhost:new(Name, [], #{description => Description, tags => Tags}),
+ rabbit_log:debug("Inserting a virtual host record ~p", [Row]),
+ ok = mnesia:write(rabbit_vhost, Row, write),
+ Row;
+ %% the vhost already exists
+ [Row] ->
+ Row
+ end
+ end,
+ fun (VHost1, true) ->
+ VHost1;
+ (VHost1, false) ->
+ [begin
+ Resource = rabbit_misc:r(Name, exchange, ExchangeName),
+ rabbit_log:debug("Will declare an exchange ~p", [Resource]),
+ _ = rabbit_exchange:declare(Resource, Type, true, false, Internal, [], ActingUser)
+ end || {ExchangeName, Type, Internal} <-
+ [{<<"">>, direct, false},
+ {<<"amq.direct">>, direct, false},
+ {<<"amq.topic">>, topic, false},
+ %% per 0-9-1 pdf
+ {<<"amq.match">>, headers, false},
+ %% per 0-9-1 xml
+ {<<"amq.headers">>, headers, false},
+ {<<"amq.fanout">>, fanout, false},
+ {<<"amq.rabbitmq.trace">>, topic, true}]],
+ VHost1
+ end),
+ case rabbit_vhost_sup_sup:start_on_all_nodes(Name) of
+ ok ->
+ rabbit_event:notify(vhost_created, info(VHost)
+ ++ [{user_who_performed_action, ActingUser},
+ {description, Description},
+ {tags, Tags}]),
+ ok;
+ {error, Reason} ->
+ Msg = rabbit_misc:format("failed to set up vhost '~s': ~p",
+ [Name, Reason]),
+ {error, Msg}
+ end.
+
+-spec delete(vhost:name(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
+
+delete(VHost, ActingUser) ->
+ %% FIXME: We are forced to delete the queues and exchanges outside
+ %% the TX below. Queue deletion involves sending messages to the queue
+ %% process, which in turn results in further mnesia actions and
+ %% eventually the termination of that process. Exchange deletion causes
+ %% notifications which must be sent outside the TX
+ rabbit_log:info("Deleting vhost '~s'~n", [VHost]),
+ QDelFun = fun (Q) -> rabbit_amqqueue:delete(Q, false, false, ActingUser) end,
+ [begin
+ Name = amqqueue:get_name(Q),
+ assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser)
+ end || Q <- rabbit_amqqueue:list(VHost)],
+ [assert_benign(rabbit_exchange:delete(Name, false, ActingUser), ActingUser) ||
+ #exchange{name = Name} <- rabbit_exchange:list(VHost)],
+ Funs = rabbit_misc:execute_mnesia_transaction(
+ with(VHost, fun () -> internal_delete(VHost, ActingUser) end)),
+ ok = rabbit_event:notify(vhost_deleted, [{name, VHost},
+ {user_who_performed_action, ActingUser}]),
+ [case Fun() of
+ ok -> ok;
+ {error, {no_such_vhost, VHost}} -> ok
+ end || Fun <- Funs],
+ %% After vhost was deleted from mnesia DB, we try to stop vhost supervisors
+ %% on all the nodes.
+ rabbit_vhost_sup_sup:delete_on_all_nodes(VHost),
+ ok.
+
+put_vhost(Name, Description, Tags0, Trace, Username) ->
+ Tags = case Tags0 of
+ undefined -> <<"">>;
+ null -> <<"">>;
+ "undefined" -> <<"">>;
+ "null" -> <<"">>;
+ Other -> Other
+ end,
+ Result = case exists(Name) of
+ true -> ok;
+ false -> add(Name, Description, parse_tags(Tags), Username),
+ %% wait for up to 45 seconds for the vhost to initialise
+ %% on all nodes
+ case await_running_on_all_nodes(Name, 45000) of
+ ok ->
+ maybe_grant_full_permissions(Name, Username);
+ {error, timeout} ->
+ {error, timeout}
+ end
+ end,
+ case Trace of
+ true -> rabbit_trace:start(Name);
+ false -> rabbit_trace:stop(Name);
+ undefined -> ok
+ end,
+ Result.
+
+%% when definitions are loaded on boot, Username here will be ?INTERNAL_USER,
+%% which does not actually exist
+maybe_grant_full_permissions(_Name, ?INTERNAL_USER) ->
+ ok;
+maybe_grant_full_permissions(Name, Username) ->
+ U = rabbit_auth_backend_internal:lookup_user(Username),
+ maybe_grant_full_permissions(U, Name, Username).
+
+maybe_grant_full_permissions({ok, _}, Name, Username) ->
+ rabbit_auth_backend_internal:set_permissions(
+ Username, Name, <<".*">>, <<".*">>, <<".*">>, Username);
+maybe_grant_full_permissions(_, _Name, _Username) ->
+ ok.
+
+
+%% 50 ms
+-define(AWAIT_SAMPLE_INTERVAL, 50).
+
+-spec await_running_on_all_nodes(vhost:name(), integer()) -> ok | {error, timeout}.
+await_running_on_all_nodes(VHost, Timeout) ->
+ Attempts = round(Timeout / ?AWAIT_SAMPLE_INTERVAL),
+ await_running_on_all_nodes0(VHost, Attempts).
+
+await_running_on_all_nodes0(_VHost, 0) ->
+ {error, timeout};
+await_running_on_all_nodes0(VHost, Attempts) ->
+ case is_running_on_all_nodes(VHost) of
+ true -> ok;
+ _ ->
+ timer:sleep(?AWAIT_SAMPLE_INTERVAL),
+ await_running_on_all_nodes0(VHost, Attempts - 1)
+ end.
+
+-spec is_running_on_all_nodes(vhost:name()) -> boolean().
+is_running_on_all_nodes(VHost) ->
+ States = vhost_cluster_state(VHost),
+ lists:all(fun ({_Node, State}) -> State =:= running end,
+ States).
+
+-spec vhost_cluster_state(vhost:name()) -> [{atom(), atom()}].
+vhost_cluster_state(VHost) ->
+ Nodes = rabbit_nodes:all_running(),
+ lists:map(fun(Node) ->
+ State = case rabbit_misc:rpc_call(Node,
+ rabbit_vhost_sup_sup, is_vhost_alive,
+ [VHost]) of
+ {badrpc, nodedown} -> nodedown;
+ true -> running;
+ false -> stopped
+ end,
+ {Node, State}
+ end,
+ Nodes).
+
+vhost_down(VHost) ->
+ ok = rabbit_event:notify(vhost_down,
+ [{name, VHost},
+ {node, node()},
+ {user_who_performed_action, ?INTERNAL_USER}]).
+
+delete_storage(VHost) ->
+ VhostDir = msg_store_dir_path(VHost),
+ rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
+ %% Message store should be closed when vhost supervisor is closed.
+ case rabbit_file:recursive_delete([VhostDir]) of
+ ok -> ok;
+ {error, {_, enoent}} ->
+ %% a concurrent delete did the job for us
+ rabbit_log:warning("Tried to delete storage directories for vhost '~s', it failed with an ENOENT", [VHost]),
+ ok;
+ Other ->
+ rabbit_log:warning("Tried to delete storage directories for vhost '~s': ~p", [VHost, Other]),
+ Other
+ end.
+
+assert_benign(ok, _) -> ok;
+assert_benign({ok, _}, _) -> ok;
+assert_benign({ok, _, _}, _) -> ok;
+assert_benign({error, not_found}, _) -> ok;
+assert_benign({error, {absent, Q, _}}, ActingUser) ->
+ %% Removing the mnesia entries here is safe. If/when the down node
+ %% restarts, it will clear out the on-disk storage of the queue.
+ QName = amqqueue:get_name(Q),
+ rabbit_amqqueue:internal_delete(QName, ActingUser).
+
+internal_delete(VHost, ActingUser) ->
+ [ok = rabbit_auth_backend_internal:clear_permissions(
+ proplists:get_value(user, Info), VHost, ActingUser)
+ || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHost)],
+ TopicPermissions = rabbit_auth_backend_internal:list_vhost_topic_permissions(VHost),
+ [ok = rabbit_auth_backend_internal:clear_topic_permissions(
+ proplists:get_value(user, TopicPermission), VHost, ActingUser)
+ || TopicPermission <- TopicPermissions],
+ Fs1 = [rabbit_runtime_parameters:clear(VHost,
+ proplists:get_value(component, Info),
+ proplists:get_value(name, Info),
+ ActingUser)
+ || Info <- rabbit_runtime_parameters:list(VHost)],
+ Fs2 = [rabbit_policy:delete(VHost, proplists:get_value(name, Info), ActingUser)
+ || Info <- rabbit_policy:list(VHost)],
+ ok = mnesia:delete({rabbit_vhost, VHost}),
+ Fs1 ++ Fs2.
+
+-spec exists(vhost:name()) -> boolean().
+
+exists(VHost) ->
+ mnesia:dirty_read({rabbit_vhost, VHost}) /= [].
+
+-spec list_names() -> [vhost:name()].
+list_names() -> mnesia:dirty_all_keys(rabbit_vhost).
+
+%% Exists for backwards compatibility, prefer list_names/0.
+-spec list() -> [vhost:name()].
+list() -> list_names().
+
+-spec all() -> [vhost:vhost()].
+all() -> mnesia:dirty_match_object(rabbit_vhost, vhost:pattern_match_all()).
+
+-spec count() -> non_neg_integer().
+count() ->
+ length(list()).
+
+-spec with(vhost:name(), rabbit_misc:thunk(A)) -> A.
+
+with(VHost, Thunk) ->
+ fun () ->
+ case mnesia:read({rabbit_vhost, VHost}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHost});
+ [_V] ->
+ Thunk()
+ end
+ end.
+
+-spec with_user_and_vhost
+ (rabbit_types:username(), vhost:name(), rabbit_misc:thunk(A)) -> A.
+
+with_user_and_vhost(Username, VHost, Thunk) ->
+ rabbit_misc:with_user(Username, with(VHost, Thunk)).
+
+%% Like with/2 but outside an Mnesia tx
+
+-spec assert(vhost:name()) -> 'ok'.
+
+assert(VHost) -> case exists(VHost) of
+ true -> ok;
+ false -> throw({error, {no_such_vhost, VHost}})
+ end.
+
+-spec update(vhost:name(), fun((vhost:vhost()) -> vhost:vhost())) -> vhost:vhost().
+
+update(VHost, Fun) ->
+ case mnesia:read({rabbit_vhost, VHost}) of
+ [] ->
+ mnesia:abort({no_such_vhost, VHost});
+ [V] ->
+ V1 = Fun(V),
+ ok = mnesia:write(rabbit_vhost, V1, write),
+ V1
+ end.
+
+set_limits(VHost, undefined) ->
+ vhost:set_limits(VHost, []);
+set_limits(VHost, Limits) ->
+ vhost:set_limits(VHost, Limits).
+
+
+dir(Vhost) ->
+ <<Num:128>> = erlang:md5(Vhost),
+ rabbit_misc:format("~.36B", [Num]).
+
+msg_store_dir_path(VHost) ->
+ EncodedName = dir(VHost),
+ rabbit_data_coercion:to_list(filename:join([msg_store_dir_base(), EncodedName])).
+
+msg_store_dir_wildcard() ->
+ rabbit_data_coercion:to_list(filename:join([msg_store_dir_base(), "*"])).
+
+msg_store_dir_base() ->
+ Dir = rabbit_mnesia:dir(),
+ filename:join([Dir, "msg_stores", "vhosts"]).
+
+-spec trim_tag(list() | binary() | atom()) -> atom().
+trim_tag(Val) ->
+ rabbit_data_coercion:to_atom(string:trim(rabbit_data_coercion:to_list(Val))).
+
+%%----------------------------------------------------------------------------
+
+infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
+
+i(name, VHost) -> vhost:get_name(VHost);
+i(tracing, VHost) -> rabbit_trace:enabled(vhost:get_name(VHost));
+i(cluster_state, VHost) -> vhost_cluster_state(vhost:get_name(VHost));
+i(description, VHost) -> vhost:get_description(VHost);
+i(tags, VHost) -> vhost:get_tags(VHost);
+i(metadata, VHost) -> vhost:get_metadata(VHost);
+i(Item, VHost) ->
+ rabbit_log:error("Don't know how to compute a virtual host info item '~s' for virtual host '~p'", [Item, VHost]),
+ throw({bad_argument, Item}).
+
+-spec info(vhost:vhost() | vhost:name()) -> rabbit_types:infos().
+
+info(VHost) when ?is_vhost(VHost) ->
+ infos(?INFO_KEYS, VHost);
+info(Key) ->
+ case mnesia:dirty_read({rabbit_vhost, Key}) of
+ [] -> [];
+ [VHost] -> infos(?INFO_KEYS, VHost)
+ end.
+
+-spec info(vhost:vhost(), rabbit_types:info_keys()) -> rabbit_types:infos().
+info(VHost, Items) -> infos(Items, VHost).
+
+-spec info_all() -> [rabbit_types:infos()].
+info_all() -> info_all(?INFO_KEYS).
+
+-spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
+info_all(Items) -> [info(VHost, Items) || VHost <- all()].
+
+info_all(Ref, AggregatorPid) -> info_all(?INFO_KEYS, Ref, AggregatorPid).
+
+-spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'.
+info_all(Items, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map(
+ AggregatorPid, Ref, fun(VHost) -> info(VHost, Items) end, all()).