diff options
Diffstat (limited to 'deps/rabbit/src/rabbit_vhost_sup_sup.erl')
-rw-r--r-- | deps/rabbit/src/rabbit_vhost_sup_sup.erl | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_vhost_sup_sup.erl b/deps/rabbit/src/rabbit_vhost_sup_sup.erl new file mode 100644 index 0000000000..c201237daa --- /dev/null +++ b/deps/rabbit/src/rabbit_vhost_sup_sup.erl @@ -0,0 +1,271 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_vhost_sup_sup). + +-include("rabbit.hrl"). + +-behaviour(supervisor2). + +-export([init/1]). + +-export([start_link/0, start/0]). +-export([init_vhost/1, + start_vhost/1, start_vhost/2, + get_vhost_sup/1, get_vhost_sup/2, + save_vhost_sup/3, + save_vhost_process/2]). +-export([delete_on_all_nodes/1, start_on_all_nodes/1]). +-export([is_vhost_alive/1]). +-export([check/0]). + +%% Internal +-export([stop_and_delete_vhost/1]). + +-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}). + +start() -> + case supervisor:start_child(rabbit_sup, {?MODULE, + {?MODULE, start_link, []}, + permanent, infinity, supervisor, + [?MODULE]}) of + {ok, _} -> ok; + {error, Err} -> {error, Err} + end. + +start_link() -> + supervisor2:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + %% This assumes that a single vhost termination should not shut down nodes + %% unless the operator opts in. + RestartStrategy = vhost_restart_strategy(), + ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]), + {ok, {{simple_one_for_one, 0, 5}, + [{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []}, + RestartStrategy, ?SUPERVISOR_WAIT, supervisor, + [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}. + +start_on_all_nodes(VHost) -> + %% Do not try to start a vhost on booting peer nodes + AllBooted = [Node || Node <- rabbit_nodes:all_running(), rabbit:is_booted(Node)], + Nodes = [node() | AllBooted], + Results = [{Node, start_vhost(VHost, Node)} || Node <- Nodes], + Failures = lists:filter(fun + ({_, {ok, _}}) -> false; + ({_, {error, {already_started, _}}}) -> false; + (_) -> true + end, + Results), + case Failures of + [] -> ok; + Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}} + end. + +delete_on_all_nodes(VHost) -> + [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], + ok. + +stop_and_delete_vhost(VHost) -> + StopResult = case lookup_vhost_sup_record(VHost) of + not_found -> ok; + #vhost_sup{wrapper_pid = WrapperPid, + vhost_sup_pid = VHostSupPid} -> + case is_process_alive(WrapperPid) of + false -> ok; + true -> + rabbit_log:info("Stopping vhost supervisor ~p" + " for vhost '~s'~n", + [VHostSupPid, VHost]), + case supervisor2:terminate_child(?MODULE, WrapperPid) of + ok -> + true = ets:delete(?MODULE, VHost), + ok; + Other -> + Other + end + end + end, + ok = rabbit_vhost:delete_storage(VHost), + StopResult. + +%% We take an optimistic approach whan stopping a remote VHost supervisor. +stop_and_delete_vhost(VHost, Node) when Node == node(self()) -> + stop_and_delete_vhost(VHost); +stop_and_delete_vhost(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, stop_and_delete_vhost, [VHost]) of + ok -> ok; + {badrpc, RpcErr} -> + rabbit_log:error("Failed to stop and delete a vhost ~p" + " on node ~p." + " Reason: ~p", + [VHost, Node, RpcErr]), + {error, RpcErr} + end. + +-spec init_vhost(rabbit_types:vhost()) -> ok | {error, {no_such_vhost, rabbit_types:vhost()}}. +init_vhost(VHost) -> + case start_vhost(VHost) of + {ok, _} -> ok; + {error, {already_started, _}} -> + rabbit_log:warning( + "Attempting to start an already started vhost '~s'.", + [VHost]), + ok; + {error, {no_such_vhost, VHost}} -> + {error, {no_such_vhost, VHost}}; + {error, Reason} -> + case vhost_restart_strategy() of + permanent -> + rabbit_log:error( + "Unable to initialize vhost data store for vhost '~s'." + " Reason: ~p", + [VHost, Reason]), + throw({error, Reason}); + transient -> + rabbit_log:warning( + "Unable to initialize vhost data store for vhost '~s'." + " The vhost will be stopped for this node. " + " Reason: ~p", + [VHost, Reason]), + ok + end + end. + +-type vhost_error() :: {no_such_vhost, rabbit_types:vhost()} | + {vhost_supervisor_not_running, rabbit_types:vhost()}. + +-spec get_vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, vhost_error() | term()}. +get_vhost_sup(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, get_vhost_sup, [VHost]) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {error, Err} -> + {error, Err}; + {badrpc, RpcErr} -> + {error, RpcErr} + end. + +-spec get_vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, vhost_error()}. +get_vhost_sup(VHost) -> + case rabbit_vhost:exists(VHost) of + false -> + {error, {no_such_vhost, VHost}}; + true -> + case vhost_sup_pid(VHost) of + no_pid -> + {error, {vhost_supervisor_not_running, VHost}}; + {ok, Pid} when is_pid(Pid) -> + {ok, Pid} + end + end. + +-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}. +start_vhost(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of + {ok, Pid} -> {ok, Pid}; + {error, Err} -> {error, Err}; + {badrpc, RpcErr} -> {error, RpcErr} + end. + +-spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. +start_vhost(VHost) -> + case rabbit_vhost:exists(VHost) of + false -> {error, {no_such_vhost, VHost}}; + true -> + case whereis(?MODULE) of + Pid when is_pid(Pid) -> + supervisor2:start_child(?MODULE, [VHost]); + undefined -> + {error, rabbit_vhost_sup_sup_not_running} + end + end. + +-spec is_vhost_alive(rabbit_types:vhost()) -> boolean(). +is_vhost_alive(VHost) -> +%% A vhost is considered alive if it's supervision tree is alive and +%% saved in the ETS table + case lookup_vhost_sup_record(VHost) of + #vhost_sup{wrapper_pid = WrapperPid, + vhost_sup_pid = VHostSupPid, + vhost_process_pid = VHostProcessPid} + when is_pid(WrapperPid), + is_pid(VHostSupPid), + is_pid(VHostProcessPid) -> + is_process_alive(WrapperPid) + andalso + is_process_alive(VHostSupPid) + andalso + is_process_alive(VHostProcessPid); + _ -> false + end. + + +-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok. +save_vhost_sup(VHost, WrapperPid, VHostPid) -> + true = ets:insert(?MODULE, #vhost_sup{vhost = VHost, + vhost_sup_pid = VHostPid, + wrapper_pid = WrapperPid}), + ok. + +-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok. +save_vhost_process(VHost, VHostProcessPid) -> + true = ets:update_element(?MODULE, VHost, + {#vhost_sup.vhost_process_pid, VHostProcessPid}), + ok. + +-spec lookup_vhost_sup_record(rabbit_types:vhost()) -> #vhost_sup{} | not_found. +lookup_vhost_sup_record(VHost) -> + case ets:info(?MODULE, name) of + ?MODULE -> + case ets:lookup(?MODULE, VHost) of + [] -> not_found; + [#vhost_sup{} = VHostSup] -> VHostSup + end; + undefined -> not_found + end. + +-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}. +vhost_sup_pid(VHost) -> + case lookup_vhost_sup_record(VHost) of + not_found -> + no_pid; + #vhost_sup{vhost_sup_pid = Pid} = VHostSup -> + case erlang:is_process_alive(Pid) of + true -> {ok, Pid}; + false -> + ets:delete_object(?MODULE, VHostSup), + no_pid + end + end. + +vhost_restart_strategy() -> + %% This assumes that a single vhost termination should not shut down nodes + %% unless the operator opts in. + case application:get_env(rabbit, vhost_restart_strategy, continue) of + continue -> transient; + stop_node -> permanent; + transient -> transient; + permanent -> permanent + end. + +check() -> + VHosts = rabbit_vhost:list_names(), + lists:filter( + fun(V) -> + case rabbit_vhost_sup_sup:get_vhost_sup(V) of + {ok, Sup} -> + MsgStores = [Pid || {Name, Pid, _, _} <- supervisor:which_children(Sup), + lists:member(Name, [msg_store_persistent, + msg_store_transient])], + not is_vhost_alive(V) orelse (not lists:all(fun(P) -> + erlang:is_process_alive(P) + end, MsgStores)); + {error, _} -> + true + end + end, VHosts). |