summaryrefslogtreecommitdiff
path: root/deps/rabbit/apps
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
committerdcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
commitf23a51261d9502ec39df0f8db47ba6b22aa7659f (patch)
tree53dcdf46e7dc2c14e81ee960bce8793879b488d3 /deps/rabbit/apps
parentafa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff)
parent9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff)
downloadrabbitmq-server-git-stream-timestamp-offset.tar.gz
Merge remote-tracking branch 'origin/master' into stream-timestamp-offsetstream-timestamp-offset
Diffstat (limited to 'deps/rabbit/apps')
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/.gitignore12
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/Makefile11
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state.erl76
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state_sup.erl38
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state_systemd.erl174
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch.erl228
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_app.erl11
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_conf.erl520
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_dist.erl104
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_early_logging.erl115
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_erlang_compat.erl47
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl114
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_sighandler.erl93
-rw-r--r--deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_sup.erl22
14 files changed, 1565 insertions, 0 deletions
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/.gitignore b/deps/rabbit/apps/rabbitmq_prelaunch/.gitignore
new file mode 100644
index 0000000000..adca0d7655
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/.gitignore
@@ -0,0 +1,12 @@
+*~
+.sw?
+.*.sw?
+*.beam
+*.coverdata
+/ebin/
+/.erlang.mk/
+/rabbitmq_prelaunch.d
+/xrefr
+
+# Dialyzer
+*.plt
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/Makefile b/deps/rabbit/apps/rabbitmq_prelaunch/Makefile
new file mode 100644
index 0000000000..572f7703d4
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/Makefile
@@ -0,0 +1,11 @@
+PROJECT = rabbitmq_prelaunch
+PROJECT_DESCRIPTION = RabbitMQ prelaunch setup
+PROJECT_VERSION = 1.0.0
+PROJECT_MOD = rabbit_prelaunch_app
+
+DEPS = rabbit_common lager
+
+DEP_PLUGINS = rabbit_common/mk/rabbitmq-build.mk
+
+include ../../rabbitmq-components.mk
+include ../../erlang.mk
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state.erl
new file mode 100644
index 0000000000..c76824e7be
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state.erl
@@ -0,0 +1,76 @@
+%%%-------------------------------------------------------------------
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2019-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_boot_state).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([get/0,
+ set/1,
+ wait_for/2]).
+
+-define(PT_KEY_BOOT_STATE, {?MODULE, boot_state}).
+
+-type boot_state() :: 'stopped' | 'booting' | 'ready' | 'stopping'.
+
+-export_type([boot_state/0]).
+
+-spec get() -> boot_state().
+get() ->
+ persistent_term:get(?PT_KEY_BOOT_STATE, stopped).
+
+-spec set(boot_state()) -> ok.
+set(BootState) ->
+ rabbit_log_prelaunch:debug("Change boot state to `~s`", [BootState]),
+ ?assert(is_valid(BootState)),
+ case BootState of
+ stopped -> persistent_term:erase(?PT_KEY_BOOT_STATE);
+ _ -> persistent_term:put(?PT_KEY_BOOT_STATE, BootState)
+ end,
+ rabbit_boot_state_sup:notify_boot_state_listeners(BootState).
+
+-spec wait_for(boot_state(), timeout()) -> ok | {error, timeout}.
+wait_for(BootState, infinity) ->
+ case is_reached(BootState) of
+ true -> ok;
+ false -> Wait = 200,
+ timer:sleep(Wait),
+ wait_for(BootState, infinity)
+ end;
+wait_for(BootState, Timeout)
+ when is_integer(Timeout) andalso Timeout >= 0 ->
+ case is_reached(BootState) of
+ true -> ok;
+ false -> Wait = 200,
+ timer:sleep(Wait),
+ wait_for(BootState, Timeout - Wait)
+ end;
+wait_for(_, _) ->
+ {error, timeout}.
+
+boot_state_idx(stopped) -> 0;
+boot_state_idx(booting) -> 1;
+boot_state_idx(ready) -> 2;
+boot_state_idx(stopping) -> 3.
+
+is_valid(BootState) ->
+ is_integer(boot_state_idx(BootState)).
+
+is_reached(TargetBootState) ->
+ is_reached(?MODULE:get(), TargetBootState).
+
+is_reached(CurrentBootState, CurrentBootState) ->
+ true;
+is_reached(stopping, stopped) ->
+ false;
+is_reached(_CurrentBootState, stopped) ->
+ true;
+is_reached(stopped, _TargetBootState) ->
+ true;
+is_reached(CurrentBootState, TargetBootState) ->
+ boot_state_idx(TargetBootState) =< boot_state_idx(CurrentBootState).
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state_sup.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state_sup.erl
new file mode 100644
index 0000000000..fbdc5781fc
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state_sup.erl
@@ -0,0 +1,38 @@
+%%%-------------------------------------------------------------------
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_boot_state_sup).
+-behaviour(supervisor).
+
+-export([start_link/0,
+ init/1]).
+
+-export([notify_boot_state_listeners/1]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+ SystemdSpec = #{id => rabbit_boot_state_systemd,
+ start => {rabbit_boot_state_systemd, start_link, []},
+ restart => transient},
+ {ok, {#{strategy => one_for_one,
+ intensity => 1,
+ period => 5},
+ [SystemdSpec]}}.
+
+-spec notify_boot_state_listeners(rabbit_boot_state:boot_state()) -> ok.
+notify_boot_state_listeners(BootState) ->
+ lists:foreach(
+ fun
+ ({_, Child, _, _}) when is_pid(Child) ->
+ gen_server:cast(Child, {notify_boot_state, BootState});
+ (_) ->
+ ok
+ end,
+ supervisor:which_children(?MODULE)).
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state_systemd.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state_systemd.erl
new file mode 100644
index 0000000000..f838535b6a
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_boot_state_systemd.erl
@@ -0,0 +1,174 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2015-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_boot_state_systemd).
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ terminate/2,
+ code_change/3]).
+
+-record(state, {mechanism,
+ sd_notify_module,
+ socket}).
+
+-define(LOG_PREFIX, "Boot state/systemd: ").
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ case os:type() of
+ {unix, _} ->
+ case code:load_file(sd_notify) of
+ {module, sd_notify} ->
+ {ok, #state{mechanism = legacy,
+ sd_notify_module = sd_notify}};
+ {error, _} ->
+ case os:getenv("NOTIFY_SOCKET") of
+ false ->
+ ignore;
+ "" ->
+ ignore;
+ Socket ->
+ {ok, #state{mechanism = socat,
+ socket = Socket}}
+ end
+ end;
+ _ ->
+ ignore
+ end.
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast({notify_boot_state, BootState}, State) ->
+ notify_boot_state(BootState, State),
+ {noreply, State}.
+
+terminate(normal, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%% Private
+
+notify_boot_state(ready = BootState,
+ #state{mechanism = legacy, sd_notify_module = SDNotify}) ->
+ rabbit_log_prelaunch:debug(
+ ?LOG_PREFIX "notifying of state `~s` (via native module)",
+ [BootState]),
+ sd_notify_legacy(SDNotify);
+notify_boot_state(ready = BootState,
+ #state{mechanism = socat, socket = Socket}) ->
+ rabbit_log_prelaunch:debug(
+ ?LOG_PREFIX "notifying of state `~s` (via socat(1))",
+ [BootState]),
+ sd_notify_socat(Socket);
+notify_boot_state(BootState, _) ->
+ rabbit_log_prelaunch:debug(
+ ?LOG_PREFIX "ignoring state `~s`",
+ [BootState]),
+ ok.
+
+sd_notify_message() ->
+ "READY=1\nSTATUS=Initialized\nMAINPID=" ++ os:getpid() ++ "\n".
+
+sd_notify_legacy(SDNotify) ->
+ SDNotify:sd_notify(0, sd_notify_message()).
+
+%% socat(1) is the most portable way the sd_notify could be
+%% implemented in erlang, without introducing some NIF. Currently the
+%% following issues prevent us from implementing it in a more
+%% reasonable way:
+%% - systemd-notify(1) is unstable for non-root users
+%% - erlang doesn't support unix domain sockets.
+%%
+%% Some details on how we ended with such a solution:
+%% https://github.com/rabbitmq/rabbitmq-server/issues/664
+sd_notify_socat(Socket) ->
+ case sd_current_unit() of
+ {ok, Unit} ->
+ rabbit_log_prelaunch:debug(
+ ?LOG_PREFIX "systemd unit for activation check: \"~s\"~n",
+ [Unit]),
+ sd_notify_socat(Socket, Unit);
+ _ ->
+ ok
+ end.
+
+sd_notify_socat(Socket, Unit) ->
+ try sd_open_port(Socket) of
+ Port ->
+ Port ! {self(), {command, sd_notify_message()}},
+ Result = sd_wait_activation(Port, Unit),
+ port_close(Port),
+ Result
+ catch
+ Class:Reason ->
+ rabbit_log_prelaunch:debug(
+ ?LOG_PREFIX "Failed to start socat(1): ~p:~p~n",
+ [Class, Reason]),
+ false
+ end.
+
+sd_current_unit() ->
+ CmdOut = os:cmd("ps -o unit= -p " ++ os:getpid()),
+ Ret = (catch re:run(CmdOut,
+ "([-.@0-9a-zA-Z]+)",
+ [unicode, {capture, all_but_first, list}])),
+ case Ret of
+ {'EXIT', _} -> error;
+ {match, [Unit]} -> {ok, Unit};
+ _ -> error
+ end.
+
+socat_socket_arg("@" ++ AbstractUnixSocket) ->
+ "abstract-sendto:" ++ AbstractUnixSocket;
+socat_socket_arg(UnixSocket) ->
+ "unix-sendto:" ++ UnixSocket.
+
+sd_open_port(Socket) ->
+ open_port(
+ {spawn_executable, os:find_executable("socat")},
+ [{args, [socat_socket_arg(Socket), "STDIO"]},
+ use_stdio, out]).
+
+sd_wait_activation(Port, Unit) ->
+ case os:find_executable("systemctl") of
+ false ->
+ rabbit_log_prelaunch:debug(
+ ?LOG_PREFIX "systemctl(1) unavailable, falling back to sleep~n"),
+ timer:sleep(5000),
+ ok;
+ _ ->
+ sd_wait_activation(Port, Unit, 10)
+ end.
+
+sd_wait_activation(_, _, 0) ->
+ rabbit_log_prelaunch:debug(
+ ?LOG_PREFIX "service still in 'activating' state, bailing out~n"),
+ ok;
+sd_wait_activation(Port, Unit, AttemptsLeft) ->
+ Ret = os:cmd("systemctl show --property=ActiveState -- '" ++ Unit ++ "'"),
+ case Ret of
+ "ActiveState=activating\n" ->
+ timer:sleep(1000),
+ sd_wait_activation(Port, Unit, AttemptsLeft - 1);
+ "ActiveState=" ++ _ ->
+ ok;
+ _ = Err ->
+ rabbit_log_prelaunch:debug(
+ ?LOG_PREFIX "unexpected status from systemd: ~p~n", [Err]),
+ ok
+ end.
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch.erl
new file mode 100644
index 0000000000..b6b29481c7
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch.erl
@@ -0,0 +1,228 @@
+-module(rabbit_prelaunch).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([run_prelaunch_first_phase/0,
+ assert_mnesia_is_stopped/0,
+ get_context/0,
+ get_stop_reason/0,
+ set_stop_reason/1,
+ clear_stop_reason/0,
+ is_initial_pass/0,
+ initial_pass_finished/0,
+ shutdown_func/1]).
+
+-ifdef(TEST).
+-export([store_context/1,
+ clear_context_cache/0]).
+-endif.
+
+-define(PT_KEY_CONTEXT, {?MODULE, context}).
+-define(PT_KEY_INITIAL_PASS, {?MODULE, initial_pass_finished}).
+-define(PT_KEY_SHUTDOWN_FUNC, {?MODULE, chained_shutdown_func}).
+-define(PT_KEY_STOP_REASON, {?MODULE, stop_reason}).
+
+run_prelaunch_first_phase() ->
+ try
+ do_run()
+ catch
+ throw:{error, _} = Error ->
+ rabbit_prelaunch_errors:log_error(Error),
+ set_stop_reason(Error),
+ rabbit_boot_state:set(stopped),
+ Error;
+ Class:Exception:Stacktrace ->
+ rabbit_prelaunch_errors:log_exception(
+ Class, Exception, Stacktrace),
+ Error = {error, Exception},
+ set_stop_reason(Error),
+ rabbit_boot_state:set(stopped),
+ Error
+ end.
+
+do_run() ->
+ %% Indicate RabbitMQ is booting.
+ clear_stop_reason(),
+ rabbit_boot_state:set(booting),
+
+ %% Configure dbg if requested.
+ rabbit_prelaunch_early_logging:enable_quick_dbg(rabbit_env:dbg_config()),
+
+ %% Setup signal handler.
+ ok = rabbit_prelaunch_sighandler:setup(),
+
+ %% We assert Mnesia is stopped before we run the prelaunch
+ %% phases.
+ %%
+ %% We need this because our cluster consistency check (in the second
+ %% phase) depends on Mnesia not being started before it has a chance
+ %% to run.
+ %%
+ %% Also, in the initial pass, we don't want Mnesia to run before
+ %% Erlang distribution is configured.
+ assert_mnesia_is_stopped(),
+
+ %% Get informations to setup logging.
+ Context0 = rabbit_env:get_context_before_logging_init(),
+ ?assertMatch(#{}, Context0),
+
+ %% Setup logging for the prelaunch phase.
+ ok = rabbit_prelaunch_early_logging:setup_early_logging(Context0, true),
+
+ IsInitialPass = is_initial_pass(),
+ case IsInitialPass of
+ true ->
+ rabbit_log_prelaunch:debug(""),
+ rabbit_log_prelaunch:debug(
+ "== Prelaunch phase [1/2] (initial pass) =="),
+ rabbit_log_prelaunch:debug("");
+ false ->
+ rabbit_log_prelaunch:debug(""),
+ rabbit_log_prelaunch:debug("== Prelaunch phase [1/2] =="),
+ rabbit_log_prelaunch:debug("")
+ end,
+ rabbit_env:log_process_env(),
+
+ %% Load rabbitmq-env.conf, redo logging setup and continue.
+ Context1 = rabbit_env:get_context_after_logging_init(Context0),
+ ?assertMatch(#{}, Context1),
+ ok = rabbit_prelaunch_early_logging:setup_early_logging(Context1, true),
+ rabbit_env:log_process_env(),
+
+ %% Complete context now that we have the final environment loaded.
+ Context2 = rabbit_env:get_context_after_reloading_env(Context1),
+ ?assertMatch(#{}, Context2),
+ store_context(Context2),
+ rabbit_env:log_context(Context2),
+ ok = setup_shutdown_func(),
+
+ Context = Context2#{initial_pass => IsInitialPass},
+
+ rabbit_env:context_to_code_path(Context),
+ rabbit_env:context_to_app_env_vars(Context),
+
+ %% 1. Erlang/OTP compatibility check.
+ ok = rabbit_prelaunch_erlang_compat:check(Context),
+
+ %% 2. Configuration check + loading.
+ ok = rabbit_prelaunch_conf:setup(Context),
+
+ %% 3. Erlang distribution check + start.
+ ok = rabbit_prelaunch_dist:setup(Context),
+
+ %% 4. Write PID file.
+ rabbit_log_prelaunch:debug(""),
+ _ = write_pid_file(Context),
+ ignore.
+
+assert_mnesia_is_stopped() ->
+ ?assertNot(lists:keymember(mnesia, 1, application:which_applications())).
+
+store_context(Context) when is_map(Context) ->
+ persistent_term:put(?PT_KEY_CONTEXT, Context).
+
+get_context() ->
+ case persistent_term:get(?PT_KEY_CONTEXT, undefined) of
+ undefined -> undefined;
+ Context -> Context#{initial_pass => is_initial_pass()}
+ end.
+
+-ifdef(TEST).
+clear_context_cache() ->
+ persistent_term:erase(?PT_KEY_CONTEXT).
+-endif.
+
+get_stop_reason() ->
+ persistent_term:get(?PT_KEY_STOP_REASON, undefined).
+
+set_stop_reason(Reason) ->
+ case get_stop_reason() of
+ undefined ->
+ rabbit_log_prelaunch:debug("Set stop reason to: ~p", [Reason]),
+ persistent_term:put(?PT_KEY_STOP_REASON, Reason);
+ _ ->
+ ok
+ end.
+
+clear_stop_reason() ->
+ persistent_term:erase(?PT_KEY_STOP_REASON).
+
+is_initial_pass() ->
+ not persistent_term:get(?PT_KEY_INITIAL_PASS, false).
+
+initial_pass_finished() ->
+ persistent_term:put(?PT_KEY_INITIAL_PASS, true).
+
+setup_shutdown_func() ->
+ ThisMod = ?MODULE,
+ ThisFunc = shutdown_func,
+ ExistingShutdownFunc = application:get_env(kernel, shutdown_func),
+ case ExistingShutdownFunc of
+ {ok, {ThisMod, ThisFunc}} ->
+ ok;
+ {ok, {ExistingMod, ExistingFunc}} ->
+ rabbit_log_prelaunch:debug(
+ "Setting up kernel shutdown function: ~s:~s/1 "
+ "(chained with ~s:~s/1)",
+ [ThisMod, ThisFunc, ExistingMod, ExistingFunc]),
+ ok = persistent_term:put(
+ ?PT_KEY_SHUTDOWN_FUNC,
+ ExistingShutdownFunc),
+ ok = record_kernel_shutdown_func(ThisMod, ThisFunc);
+ _ ->
+ rabbit_log_prelaunch:debug(
+ "Setting up kernel shutdown function: ~s:~s/1",
+ [ThisMod, ThisFunc]),
+ ok = record_kernel_shutdown_func(ThisMod, ThisFunc)
+ end.
+
+record_kernel_shutdown_func(Mod, Func) ->
+ application:set_env(
+ kernel, shutdown_func, {Mod, Func},
+ [{persistent, true}]).
+
+shutdown_func(Reason) ->
+ rabbit_log_prelaunch:debug(
+ "Running ~s:shutdown_func() as part of `kernel` shutdown", [?MODULE]),
+ Context = get_context(),
+ remove_pid_file(Context),
+ ChainedShutdownFunc = persistent_term:get(
+ ?PT_KEY_SHUTDOWN_FUNC,
+ undefined),
+ case ChainedShutdownFunc of
+ {ChainedMod, ChainedFunc} -> ChainedMod:ChainedFunc(Reason);
+ _ -> ok
+ end.
+
+write_pid_file(#{pid_file := PidFile}) ->
+ rabbit_log_prelaunch:debug("Writing PID file: ~s", [PidFile]),
+ case filelib:ensure_dir(PidFile) of
+ ok ->
+ OSPid = os:getpid(),
+ case file:write_file(PidFile, OSPid) of
+ ok ->
+ ok;
+ {error, Reason} = Error ->
+ rabbit_log_prelaunch:warning(
+ "Failed to write PID file \"~s\": ~s",
+ [PidFile, file:format_error(Reason)]),
+ Error
+ end;
+ {error, Reason} = Error ->
+ rabbit_log_prelaunch:warning(
+ "Failed to create PID file \"~s\" directory: ~s",
+ [PidFile, file:format_error(Reason)]),
+ Error
+ end;
+write_pid_file(_) ->
+ ok.
+
+remove_pid_file(#{pid_file := PidFile, keep_pid_file_on_exit := true}) ->
+ rabbit_log_prelaunch:debug("Keeping PID file: ~s", [PidFile]),
+ ok;
+remove_pid_file(#{pid_file := PidFile}) ->
+ rabbit_log_prelaunch:debug("Deleting PID file: ~s", [PidFile]),
+ _ = file:delete(PidFile),
+ ok;
+remove_pid_file(_) ->
+ ok.
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_app.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_app.erl
new file mode 100644
index 0000000000..cef7f05e77
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_app.erl
@@ -0,0 +1,11 @@
+-module(rabbit_prelaunch_app).
+-behaviour(application).
+
+-export([start/2]).
+-export([stop/1]).
+
+start(_Type, _Args) ->
+ rabbit_prelaunch_sup:start_link().
+
+stop(_State) ->
+ ok.
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_conf.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_conf.erl
new file mode 100644
index 0000000000..fbbae7a185
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_conf.erl
@@ -0,0 +1,520 @@
+-module(rabbit_prelaunch_conf).
+
+-include_lib("kernel/include/file.hrl").
+-include_lib("stdlib/include/zip.hrl").
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([setup/1,
+ get_config_state/0,
+ generate_config_from_cuttlefish_files/3,
+ decrypt_config/1]).
+
+-ifdef(TEST).
+-export([decrypt_config/2]).
+-endif.
+
+setup(Context) ->
+ rabbit_log_prelaunch:debug(""),
+ rabbit_log_prelaunch:debug("== Configuration =="),
+
+ %% TODO: Check if directories/files are inside Mnesia dir.
+
+ ok = set_default_config(),
+
+ AdditionalConfigFiles = find_additional_config_files(Context),
+ AdvancedConfigFile = find_actual_advanced_config_file(Context),
+ State = case find_actual_main_config_file(Context) of
+ {MainConfigFile, erlang} ->
+ Config = load_cuttlefish_config_file(Context,
+ AdditionalConfigFiles,
+ MainConfigFile),
+ Apps = [App || {App, _} <- Config],
+ decrypt_config(Apps),
+ #{config_files => AdditionalConfigFiles,
+ config_advanced_file => MainConfigFile};
+ {MainConfigFile, cuttlefish} ->
+ ConfigFiles = [MainConfigFile | AdditionalConfigFiles],
+ Config = load_cuttlefish_config_file(Context,
+ ConfigFiles,
+ AdvancedConfigFile),
+ Apps = [App || {App, _} <- Config],
+ decrypt_config(Apps),
+ #{config_files => ConfigFiles,
+ config_advanced_file => AdvancedConfigFile};
+ undefined when AdditionalConfigFiles =/= [] ->
+ ConfigFiles = AdditionalConfigFiles,
+ Config = load_cuttlefish_config_file(Context,
+ ConfigFiles,
+ AdvancedConfigFile),
+ Apps = [App || {App, _} <- Config],
+ decrypt_config(Apps),
+ #{config_files => ConfigFiles,
+ config_advanced_file => AdvancedConfigFile};
+ undefined when AdvancedConfigFile =/= undefined ->
+ rabbit_log_prelaunch:warning(
+ "Using RABBITMQ_ADVANCED_CONFIG_FILE: ~s",
+ [AdvancedConfigFile]),
+ Config = load_cuttlefish_config_file(Context,
+ AdditionalConfigFiles,
+ AdvancedConfigFile),
+ Apps = [App || {App, _} <- Config],
+ decrypt_config(Apps),
+ #{config_files => AdditionalConfigFiles,
+ config_advanced_file => AdvancedConfigFile};
+ undefined ->
+ #{config_files => [],
+ config_advanced_file => undefined}
+ end,
+ ok = override_with_hard_coded_critical_config(),
+ ok = set_credentials_obfuscation_secret(),
+ rabbit_log_prelaunch:debug(
+ "Saving config state to application env: ~p", [State]),
+ store_config_state(State).
+
+store_config_state(ConfigState) ->
+ persistent_term:put({rabbitmq_prelaunch, config_state}, ConfigState).
+
+get_config_state() ->
+ persistent_term:get({rabbitmq_prelaunch, config_state}, undefined).
+
+%% -------------------------------------------------------------------
+%% Configuration loading.
+%% -------------------------------------------------------------------
+
+set_default_config() ->
+ rabbit_log_prelaunch:debug("Setting default config"),
+ Config = [
+ {ra,
+ [
+ {wal_max_size_bytes, 536870912}, %% 5 * 2 ^ 20
+ {wal_max_batch_size, 4096}
+ ]},
+ {aten,
+ [
+ %% a greater poll interval has shown to trigger fewer false
+ %% positive leader elections in quorum queues. The cost is slightly
+ %% longer detection time when a genuine network issue occurs.
+ %% Ra still uses erlang monitors of course so whenever a connection
+ %% goes down it is still immediately detected
+ {poll_interval, 5000}
+ ]},
+ {sysmon_handler,
+ [{process_limit, 100},
+ {port_limit, 100},
+ {gc_ms_limit, 0},
+ {schedule_ms_limit, 0},
+ {heap_word_limit, 0},
+ {busy_port, false},
+ {busy_dist_port, true}]}
+ ],
+ apply_erlang_term_based_config(Config).
+
+find_actual_main_config_file(#{main_config_file := File}) ->
+ case filelib:is_regular(File) of
+ true ->
+ Format = case filename:extension(File) of
+ ".conf" -> cuttlefish;
+ ".config" -> erlang;
+ _ -> determine_config_format(File)
+ end,
+ {File, Format};
+ false ->
+ OldFormatFile = File ++ ".config",
+ NewFormatFile = File ++ ".conf",
+ case filelib:is_regular(OldFormatFile) of
+ true ->
+ case filelib:is_regular(NewFormatFile) of
+ true ->
+ rabbit_log_prelaunch:warning(
+ "Both old (.config) and new (.conf) format "
+ "config files exist."),
+ rabbit_log_prelaunch:warning(
+ "Using the old format config file: ~s",
+ [OldFormatFile]),
+ rabbit_log_prelaunch:warning(
+ "Please update your config files to the new "
+ "format and remove the old file."),
+ ok;
+ false ->
+ ok
+ end,
+ {OldFormatFile, erlang};
+ false ->
+ case filelib:is_regular(NewFormatFile) of
+ true -> {NewFormatFile, cuttlefish};
+ false -> undefined
+ end
+ end
+ end.
+
+find_additional_config_files(#{additional_config_files := Pattern})
+ when Pattern =/= undefined ->
+ Pattern1 = case filelib:is_dir(Pattern) of
+ true -> filename:join(Pattern, "*");
+ false -> Pattern
+ end,
+ OnlyFiles = [File ||
+ File <- filelib:wildcard(Pattern1),
+ filelib:is_regular(File)],
+ lists:sort(OnlyFiles);
+find_additional_config_files(_) ->
+ [].
+
+find_actual_advanced_config_file(#{advanced_config_file := File}) ->
+ case filelib:is_regular(File) of
+ true -> File;
+ false -> undefined
+ end.
+
+determine_config_format(File) ->
+ case filelib:file_size(File) of
+ 0 ->
+ cuttlefish;
+ _ ->
+ case file:consult(File) of
+ {ok, _} -> erlang;
+ _ -> cuttlefish
+ end
+ end.
+
+load_cuttlefish_config_file(Context,
+ ConfigFiles,
+ AdvancedConfigFile) ->
+ Config = generate_config_from_cuttlefish_files(
+ Context, ConfigFiles, AdvancedConfigFile),
+ apply_erlang_term_based_config(Config),
+ Config.
+
+generate_config_from_cuttlefish_files(Context,
+ ConfigFiles,
+ AdvancedConfigFile) ->
+ %% Load schemas.
+ SchemaFiles = find_cuttlefish_schemas(Context),
+ case SchemaFiles of
+ [] ->
+ rabbit_log_prelaunch:error(
+ "No configuration schema found~n", []),
+ throw({error, no_configuration_schema_found});
+ _ ->
+ rabbit_log_prelaunch:debug(
+ "Configuration schemas found:~n", []),
+ lists:foreach(
+ fun(SchemaFile) ->
+ rabbit_log_prelaunch:debug(" - ~ts", [SchemaFile])
+ end,
+ SchemaFiles),
+ ok
+ end,
+ Schema = cuttlefish_schema:files(SchemaFiles),
+
+ %% Load configuration.
+ rabbit_log_prelaunch:debug(
+ "Loading configuration files (Cuttlefish based):"),
+ lists:foreach(
+ fun(ConfigFile) ->
+ rabbit_log_prelaunch:debug(" - ~ts", [ConfigFile])
+ end, ConfigFiles),
+ case cuttlefish_conf:files(ConfigFiles) of
+ {errorlist, Errors} ->
+ rabbit_log_prelaunch:error("Error parsing configuration:"),
+ lists:foreach(
+ fun(Error) ->
+ rabbit_log_prelaunch:error(
+ " - ~ts",
+ [cuttlefish_error:xlate(Error)])
+ end, Errors),
+ rabbit_log_prelaunch:error(
+ "Are these files using the Cuttlefish format?"),
+ throw({error, failed_to_parse_configuration_file});
+ Config0 ->
+ %% Finalize configuration, based on the schema.
+ Config = case cuttlefish_generator:map(Schema, Config0) of
+ {error, Phase, {errorlist, Errors}} ->
+ %% TODO
+ rabbit_log_prelaunch:error(
+ "Error preparing configuration in phase ~ts:",
+ [Phase]),
+ lists:foreach(
+ fun(Error) ->
+ rabbit_log_prelaunch:error(
+ " - ~ts",
+ [cuttlefish_error:xlate(Error)])
+ end, Errors),
+ throw(
+ {error, failed_to_prepare_configuration});
+ ValidConfig ->
+ proplists:delete(vm_args, ValidConfig)
+ end,
+
+ %% Apply advanced configuration overrides, if any.
+ override_with_advanced_config(Config, AdvancedConfigFile)
+ end.
+
+find_cuttlefish_schemas(Context) ->
+ Apps = list_apps(Context),
+ rabbit_log_prelaunch:debug(
+ "Looking up configuration schemas in the following applications:"),
+ find_cuttlefish_schemas(Apps, []).
+
+find_cuttlefish_schemas([App | Rest], AllSchemas) ->
+ Schemas = list_schemas_in_app(App),
+ find_cuttlefish_schemas(Rest, AllSchemas ++ Schemas);
+find_cuttlefish_schemas([], AllSchemas) ->
+ lists:sort(fun(A,B) -> A < B end, AllSchemas).
+
+list_apps(#{os_type := {win32, _}, plugins_path := PluginsPath}) ->
+ PluginsDirs = string:lexemes(PluginsPath, ";"),
+ list_apps1(PluginsDirs, []);
+list_apps(#{plugins_path := PluginsPath}) ->
+ PluginsDirs = string:lexemes(PluginsPath, ":"),
+ list_apps1(PluginsDirs, []).
+
+
+list_apps1([Dir | Rest], Apps) ->
+ case file:list_dir(Dir) of
+ {ok, Filenames} ->
+ NewApps = [list_to_atom(
+ hd(
+ string:split(filename:basename(F, ".ex"), "-")))
+ || F <- Filenames],
+ Apps1 = lists:umerge(Apps, lists:sort(NewApps)),
+ list_apps1(Rest, Apps1);
+ {error, Reason} ->
+ rabbit_log_prelaunch:debug(
+ "Failed to list directory \"~ts\" content: ~ts",
+ [Dir, file:format_error(Reason)]),
+ list_apps1(Rest, Apps)
+ end;
+list_apps1([], AppInfos) ->
+ AppInfos.
+
+list_schemas_in_app(App) ->
+ {Loaded, Unload} = case application:load(App) of
+ ok -> {true, true};
+ {error, {already_loaded, _}} -> {true, false};
+ {error, Reason} -> {Reason, false}
+ end,
+ List = case Loaded of
+ true ->
+ case code:priv_dir(App) of
+ {error, bad_name} ->
+ rabbit_log_prelaunch:debug(
+ " [ ] ~s (no readable priv dir)", [App]),
+ [];
+ PrivDir ->
+ SchemaDir = filename:join([PrivDir, "schema"]),
+ do_list_schemas_in_app(App, SchemaDir)
+ end;
+ Reason1 ->
+ rabbit_log_prelaunch:debug(
+ " [ ] ~s (failed to load application: ~p)",
+ [App, Reason1]),
+ []
+ end,
+ case Unload of
+ true -> _ = application:unload(App),
+ ok;
+ false -> ok
+ end,
+ List.
+
+do_list_schemas_in_app(App, SchemaDir) ->
+ case erl_prim_loader:list_dir(SchemaDir) of
+ {ok, Files} ->
+ rabbit_log_prelaunch:debug(" [x] ~s", [App]),
+ [filename:join(SchemaDir, File)
+ || [C | _] = File <- Files,
+ C =/= $.];
+ error ->
+ rabbit_log_prelaunch:debug(
+ " [ ] ~s (no readable schema dir)", [App]),
+ []
+ end.
+
+override_with_advanced_config(Config, undefined) ->
+ Config;
+override_with_advanced_config(Config, AdvancedConfigFile) ->
+ rabbit_log_prelaunch:debug(
+ "Override with advanced configuration file \"~ts\"",
+ [AdvancedConfigFile]),
+ case file:consult(AdvancedConfigFile) of
+ {ok, [AdvancedConfig]} ->
+ cuttlefish_advanced:overlay(Config, AdvancedConfig);
+ {ok, OtherTerms} ->
+ rabbit_log_prelaunch:error(
+ "Failed to load advanced configuration file \"~ts\", "
+ "incorrect format: ~p",
+ [AdvancedConfigFile, OtherTerms]),
+ throw({error, failed_to_parse_advanced_configuration_file});
+ {error, Reason} ->
+ rabbit_log_prelaunch:error(
+ "Failed to load advanced configuration file \"~ts\": ~ts",
+ [AdvancedConfigFile, file:format_error(Reason)]),
+ throw({error, failed_to_read_advanced_configuration_file})
+ end.
+
+override_with_hard_coded_critical_config() ->
+ rabbit_log_prelaunch:debug("Override with hard-coded critical config"),
+ Config = [
+ {ra,
+ %% Make Ra use a custom logger that dispatches to lager
+ %% instead of the default OTP logger
+ [{logger_module, rabbit_log_ra_shim}]},
+ {osiris,
+ [{logger_module, rabbit_log_osiris_shim}]}
+ ],
+ apply_erlang_term_based_config(Config).
+
+apply_erlang_term_based_config([{_, []} | Rest]) ->
+ apply_erlang_term_based_config(Rest);
+apply_erlang_term_based_config([{App, Vars} | Rest]) ->
+ rabbit_log_prelaunch:debug(" Applying configuration for '~s':", [App]),
+ ok = apply_app_env_vars(App, Vars),
+ apply_erlang_term_based_config(Rest);
+apply_erlang_term_based_config([]) ->
+ ok.
+
+apply_app_env_vars(App, [{Var, Value} | Rest]) ->
+ rabbit_log_prelaunch:debug(" - ~s = ~p", [Var, Value]),
+ ok = application:set_env(App, Var, Value, [{persistent, true}]),
+ apply_app_env_vars(App, Rest);
+apply_app_env_vars(_, []) ->
+ ok.
+
+set_credentials_obfuscation_secret() ->
+ rabbit_log_prelaunch:debug(
+ "Refreshing credentials obfuscation configuration from env: ~p",
+ [application:get_all_env(credentials_obfuscation)]),
+ ok = credentials_obfuscation:refresh_config(),
+ CookieBin = rabbit_data_coercion:to_binary(erlang:get_cookie()),
+ rabbit_log_prelaunch:debug(
+ "Setting credentials obfuscation secret to '~s'", [CookieBin]),
+ ok = credentials_obfuscation:set_secret(CookieBin).
+
+%% -------------------------------------------------------------------
+%% Config decryption.
+%% -------------------------------------------------------------------
+
+decrypt_config(Apps) ->
+ rabbit_log_prelaunch:debug("Decoding encrypted config values (if any)"),
+ ConfigEntryDecoder = application:get_env(rabbit, config_entry_decoder, []),
+ decrypt_config(Apps, ConfigEntryDecoder).
+
+decrypt_config([], _) ->
+ ok;
+decrypt_config([App | Apps], Algo) ->
+ Algo1 = decrypt_app(App, application:get_all_env(App), Algo),
+ decrypt_config(Apps, Algo1).
+
+decrypt_app(_, [], Algo) ->
+ Algo;
+decrypt_app(App, [{Key, Value} | Tail], Algo) ->
+ Algo2 = try
+ case decrypt(Value, Algo) of
+ {Value, Algo1} ->
+ Algo1;
+ {NewValue, Algo1} ->
+ rabbit_log_prelaunch:debug(
+ "Value of `~s` decrypted", [Key]),
+ ok = application:set_env(App, Key, NewValue,
+ [{persistent, true}]),
+ Algo1
+ end
+ catch
+ throw:{bad_config_entry_decoder, _} = Error ->
+ throw(Error);
+ _:Msg ->
+ throw({config_decryption_error, {key, Key}, Msg})
+ end,
+ decrypt_app(App, Tail, Algo2).
+
+decrypt({encrypted, _} = EncValue,
+ {Cipher, Hash, Iterations, PassPhrase} = Algo) ->
+ {rabbit_pbe:decrypt_term(Cipher, Hash, Iterations, PassPhrase, EncValue),
+ Algo};
+decrypt({encrypted, _} = EncValue,
+ ConfigEntryDecoder)
+ when is_list(ConfigEntryDecoder) ->
+ Algo = config_entry_decoder_to_algo(ConfigEntryDecoder),
+ decrypt(EncValue, Algo);
+decrypt(List, Algo) when is_list(List) ->
+ decrypt_list(List, Algo, []);
+decrypt(Value, Algo) ->
+ {Value, Algo}.
+
+%% We make no distinction between strings and other lists.
+%% When we receive a string, we loop through each element
+%% and ultimately return the string unmodified, as intended.
+decrypt_list([], Algo, Acc) ->
+ {lists:reverse(Acc), Algo};
+decrypt_list([{Key, Value} | Tail], Algo, Acc)
+ when Key =/= encrypted ->
+ {Value1, Algo1} = decrypt(Value, Algo),
+ decrypt_list(Tail, Algo1, [{Key, Value1} | Acc]);
+decrypt_list([Value | Tail], Algo, Acc) ->
+ {Value1, Algo1} = decrypt(Value, Algo),
+ decrypt_list(Tail, Algo1, [Value1 | Acc]).
+
+config_entry_decoder_to_algo(ConfigEntryDecoder) ->
+ case get_passphrase(ConfigEntryDecoder) of
+ undefined ->
+ throw({bad_config_entry_decoder, missing_passphrase});
+ PassPhrase ->
+ {
+ proplists:get_value(
+ cipher, ConfigEntryDecoder, rabbit_pbe:default_cipher()),
+ proplists:get_value(
+ hash, ConfigEntryDecoder, rabbit_pbe:default_hash()),
+ proplists:get_value(
+ iterations, ConfigEntryDecoder,
+ rabbit_pbe:default_iterations()),
+ PassPhrase
+ }
+ end.
+
+get_passphrase(ConfigEntryDecoder) ->
+ rabbit_log_prelaunch:debug("Getting encrypted config passphrase"),
+ case proplists:get_value(passphrase, ConfigEntryDecoder) of
+ prompt ->
+ IoDevice = get_input_iodevice(),
+ ok = io:setopts(IoDevice, [{echo, false}]),
+ PP = lists:droplast(io:get_line(IoDevice,
+ "\nPlease enter the passphrase to unlock encrypted "
+ "configuration entries.\n\nPassphrase: ")),
+ ok = io:setopts(IoDevice, [{echo, true}]),
+ io:format(IoDevice, "~n", []),
+ PP;
+ {file, Filename} ->
+ {ok, File} = file:read_file(Filename),
+ [PP|_] = binary:split(File, [<<"\r\n">>, <<"\n">>]),
+ PP;
+ PP ->
+ PP
+ end.
+
+%% This function retrieves the correct IoDevice for requesting
+%% input. The problem with using the default IoDevice is that
+%% the Erlang shell prevents us from getting the input.
+%%
+%% Instead we therefore look for the io process used by the
+%% shell and if it can't be found (because the shell is not
+%% started e.g with -noshell) we use the 'user' process.
+%%
+%% This function will not work when either -oldshell or -noinput
+%% options are passed to erl.
+get_input_iodevice() ->
+ case whereis(user) of
+ undefined ->
+ user;
+ User ->
+ case group:interfaces(User) of
+ [] ->
+ user;
+ [{user_drv, Drv}] ->
+ case user_drv:interfaces(Drv) of
+ [] -> user;
+ [{current_group, IoDevice}] -> IoDevice
+ end
+ end
+ end.
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_dist.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_dist.erl
new file mode 100644
index 0000000000..3d718438a7
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_dist.erl
@@ -0,0 +1,104 @@
+-module(rabbit_prelaunch_dist).
+
+-export([setup/1]).
+
+setup(#{nodename := Node, nodename_type := NameType} = Context) ->
+ rabbit_log_prelaunch:debug(""),
+ rabbit_log_prelaunch:debug("== Erlang distribution =="),
+ rabbit_log_prelaunch:debug("Rqeuested node name: ~s (type: ~s)",
+ [Node, NameType]),
+ case node() of
+ nonode@nohost ->
+ ok = rabbit_nodes_common:ensure_epmd(),
+ ok = dist_port_range_check(Context),
+ ok = dist_port_use_check(Context),
+ ok = duplicate_node_check(Context),
+
+ ok = do_setup(Context);
+ Node ->
+ rabbit_log_prelaunch:debug(
+ "Erlang distribution already running", []),
+ ok;
+ Unexpected ->
+ throw({error, {erlang_dist_running_with_unexpected_nodename,
+ Unexpected, Node}})
+ end,
+ ok.
+
+do_setup(#{nodename := Node, nodename_type := NameType}) ->
+ rabbit_log_prelaunch:debug("Starting Erlang distribution", []),
+ case application:get_env(kernel, net_ticktime) of
+ {ok, Ticktime} when is_integer(Ticktime) andalso Ticktime >= 1 ->
+ %% The value passed to net_kernel:start/1 is the
+ %% "minimum transition traffic interval" as defined in
+ %% net_kernel:set_net_ticktime/1.
+ MTTI = Ticktime * 1000 div 4,
+ {ok, _} = net_kernel:start([Node, NameType, MTTI]),
+ ok;
+ _ ->
+ {ok, _} = net_kernel:start([Node, NameType]),
+ ok
+ end,
+ ok.
+
+%% Check whether a node with the same name is already running
+duplicate_node_check(#{split_nodename := {NodeName, NodeHost}}) ->
+ rabbit_log_prelaunch:debug(
+ "Checking if node name ~s is already used", [NodeName]),
+ PrelaunchName = rabbit_nodes_common:make(
+ {NodeName ++ "_prelaunch_" ++ os:getpid(),
+ "localhost"}),
+ {ok, _} = net_kernel:start([PrelaunchName, shortnames]),
+ case rabbit_nodes_common:names(NodeHost) of
+ {ok, NamePorts} ->
+ case proplists:is_defined(NodeName, NamePorts) of
+ true ->
+ throw({error, {duplicate_node_name, NodeName, NodeHost}});
+ false ->
+ ok = net_kernel:stop(),
+ ok
+ end;
+ {error, EpmdReason} ->
+ throw({error, {epmd_error, NodeHost, EpmdReason}})
+ end.
+
+dist_port_range_check(#{erlang_dist_tcp_port := DistTcpPort}) ->
+ rabbit_log_prelaunch:debug(
+ "Checking if TCP port ~b is valid", [DistTcpPort]),
+ case DistTcpPort of
+ _ when DistTcpPort < 1 orelse DistTcpPort > 65535 ->
+ throw({error, {invalid_dist_port_range, DistTcpPort}});
+ _ ->
+ ok
+ end.
+
+dist_port_use_check(#{split_nodename := {_, NodeHost},
+ erlang_dist_tcp_port := DistTcpPort}) ->
+ rabbit_log_prelaunch:debug(
+ "Checking if TCP port ~b is available", [DistTcpPort]),
+ dist_port_use_check_ipv4(NodeHost, DistTcpPort).
+
+dist_port_use_check_ipv4(NodeHost, Port) ->
+ case gen_tcp:listen(Port, [inet, {reuseaddr, true}]) of
+ {ok, Sock} -> gen_tcp:close(Sock);
+ {error, einval} -> dist_port_use_check_ipv6(NodeHost, Port);
+ {error, _} -> dist_port_use_check_fail(Port, NodeHost)
+ end.
+
+dist_port_use_check_ipv6(NodeHost, Port) ->
+ case gen_tcp:listen(Port, [inet6, {reuseaddr, true}]) of
+ {ok, Sock} -> gen_tcp:close(Sock);
+ {error, _} -> dist_port_use_check_fail(Port, NodeHost)
+ end.
+
+-spec dist_port_use_check_fail(non_neg_integer(), string()) ->
+ no_return().
+
+dist_port_use_check_fail(Port, Host) ->
+ {ok, Names} = rabbit_nodes_common:names(Host),
+ case [N || {N, P} <- Names, P =:= Port] of
+ [] ->
+ throw({error, {dist_port_already_used, Port, not_erlang, Host}});
+ [Name] ->
+ throw({error, {dist_port_already_used, Port, Name, Host}})
+ end.
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_early_logging.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_early_logging.erl
new file mode 100644
index 0000000000..4e371c76ae
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_early_logging.erl
@@ -0,0 +1,115 @@
+-module(rabbit_prelaunch_early_logging).
+
+-include_lib("rabbit_common/include/rabbit_log.hrl").
+
+-export([setup_early_logging/2,
+ enable_quick_dbg/1,
+ use_colored_logging/0,
+ use_colored_logging/1,
+ list_expected_sinks/0]).
+
+setup_early_logging(#{log_levels := undefined} = Context,
+ LagerEventToStdout) ->
+ setup_early_logging(Context#{log_levels => get_default_log_level()},
+ LagerEventToStdout);
+setup_early_logging(Context, LagerEventToStdout) ->
+ Configured = lists:member(
+ lager_util:make_internal_sink_name(rabbit_log_prelaunch),
+ lager:list_all_sinks()),
+ case Configured of
+ true -> ok;
+ false -> do_setup_early_logging(Context, LagerEventToStdout)
+ end.
+
+get_default_log_level() ->
+ #{"prelaunch" => warning}.
+
+do_setup_early_logging(#{log_levels := LogLevels} = Context,
+ LagerEventToStdout) ->
+ redirect_logger_messages_to_lager(),
+ Colored = use_colored_logging(Context),
+ application:set_env(lager, colored, Colored),
+ ConsoleBackend = lager_console_backend,
+ case LagerEventToStdout of
+ true ->
+ GLogLevel = case LogLevels of
+ #{global := Level} -> Level;
+ _ -> warning
+ end,
+ _ = lager_app:start_handler(
+ lager_event, ConsoleBackend, [{level, GLogLevel}]),
+ ok;
+ false ->
+ ok
+ end,
+ lists:foreach(
+ fun(Sink) ->
+ CLogLevel = get_log_level(LogLevels, Sink),
+ lager_app:configure_sink(
+ Sink,
+ [{handlers, [{ConsoleBackend, [{level, CLogLevel}]}]}])
+ end, list_expected_sinks()),
+ ok.
+
+redirect_logger_messages_to_lager() ->
+ io:format(standard_error, "Configuring logger redirection~n", []),
+ ok = logger:add_handler(rabbit_log, rabbit_log, #{}),
+ ok = logger:set_primary_config(level, all).
+
+use_colored_logging() ->
+ use_colored_logging(rabbit_prelaunch:get_context()).
+
+use_colored_logging(#{log_levels := #{color := true},
+ output_supports_colors := true}) ->
+ true;
+use_colored_logging(_) ->
+ false.
+
+list_expected_sinks() ->
+ Key = {?MODULE, lager_extra_sinks},
+ case persistent_term:get(Key, undefined) of
+ undefined ->
+ CompileOptions = proplists:get_value(options,
+ module_info(compile),
+ []),
+ AutoList = [lager_util:make_internal_sink_name(M)
+ || M <- proplists:get_value(lager_extra_sinks,
+ CompileOptions, [])],
+ List = case lists:member(?LAGER_SINK, AutoList) of
+ true -> AutoList;
+ false -> [?LAGER_SINK | AutoList]
+ end,
+ %% Store the list in the application environment. If this
+ %% module is later cover-compiled, the compile option will
+ %% be lost, so we will be able to retrieve the list from the
+ %% application environment.
+ persistent_term:put(Key, List),
+ List;
+ List ->
+ List
+ end.
+
+sink_to_category(Sink) when is_atom(Sink) ->
+ re:replace(
+ atom_to_list(Sink),
+ "^rabbit_log_(.+)_lager_event$",
+ "\\1",
+ [{return, list}]).
+
+get_log_level(LogLevels, Sink) ->
+ Category = sink_to_category(Sink),
+ case LogLevels of
+ #{Category := Level} -> Level;
+ #{global := Level} -> Level;
+ _ -> warning
+ end.
+
+enable_quick_dbg(#{dbg_output := Output, dbg_mods := Mods}) ->
+ case Output of
+ stdout -> {ok, _} = dbg:tracer(),
+ ok;
+ _ -> {ok, _} = dbg:tracer(port, dbg:trace_port(file, Output)),
+ ok
+ end,
+ {ok, _} = dbg:p(all, c),
+ lists:foreach(fun(M) -> {ok, _} = dbg:tp(M, cx) end, Mods).
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_erlang_compat.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_erlang_compat.erl
new file mode 100644
index 0000000000..1e8fe2690d
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_erlang_compat.erl
@@ -0,0 +1,47 @@
+-module(rabbit_prelaunch_erlang_compat).
+
+-export([check/1]).
+
+-define(OTP_MINIMUM, "21.3").
+-define(ERTS_MINIMUM, "10.3").
+
+check(_Context) ->
+ rabbit_log_prelaunch:debug(""),
+ rabbit_log_prelaunch:debug("== Erlang/OTP compatibility check =="),
+
+ ERTSVer = erlang:system_info(version),
+ OTPRel = rabbit_misc:otp_release(),
+ rabbit_log_prelaunch:debug(
+ "Requiring: Erlang/OTP ~s (ERTS ~s)", [?OTP_MINIMUM, ?ERTS_MINIMUM]),
+ rabbit_log_prelaunch:debug(
+ "Running: Erlang/OTP ~s (ERTS ~s)", [OTPRel, ERTSVer]),
+
+ case rabbit_misc:version_compare(?ERTS_MINIMUM, ERTSVer, lte) of
+ true when ?ERTS_MINIMUM =/= ERTSVer ->
+ rabbit_log_prelaunch:debug(
+ "Erlang/OTP version requirement satisfied"),
+ ok;
+ true when ?ERTS_MINIMUM =:= ERTSVer andalso ?OTP_MINIMUM =< OTPRel ->
+ %% When a critical regression or bug is found, a new OTP
+ %% release can be published without changing the ERTS
+ %% version. For instance, this is the case with R16B03 and
+ %% R16B03-1.
+ %%
+ %% In this case, we compare the release versions
+ %% alphabetically.
+ ok;
+ _ ->
+ Msg =
+ "This RabbitMQ version cannot run on Erlang ~s (erts ~s): "
+ "minimum required version is ~s (erts ~s)",
+ Args = [OTPRel, ERTSVer, ?OTP_MINIMUM, ?ERTS_MINIMUM],
+ rabbit_log_prelaunch:error(Msg, Args),
+
+ %% Also print to stderr to make this more visible
+ io:format(standard_error, "Error: " ++ Msg ++ "~n", Args),
+
+ Msg2 = rabbit_misc:format(
+ "Erlang ~s or later is required, started on ~s",
+ [?OTP_MINIMUM, OTPRel]),
+ throw({error, {erlang_version_too_old, Msg2}})
+ end.
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl
new file mode 100644
index 0000000000..b2cc03d069
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_errors.erl
@@ -0,0 +1,114 @@
+-module(rabbit_prelaunch_errors).
+
+-export([format_error/1,
+ format_exception/3,
+ log_error/1,
+ log_exception/3]).
+
+-define(BOOT_FAILED_HEADER,
+ "\n"
+ "BOOT FAILED\n"
+ "===========\n").
+
+-define(BOOT_FAILED_FOOTER,
+ "\n").
+
+log_error(Error) ->
+ Message = format_error(Error),
+ log_message(Message).
+
+format_error({error, {duplicate_node_name, NodeName, NodeHost}}) ->
+ rabbit_misc:format(
+ "ERROR: node with name ~p is already running on host ~p",
+ [NodeName, NodeHost]);
+format_error({error, {epmd_error, NodeHost, EpmdReason}}) ->
+ rabbit_misc:format(
+ "ERROR: epmd error for host ~s: ~s",
+ [NodeHost, rabbit_misc:format_inet_error(EpmdReason)]);
+format_error({error, {invalid_dist_port_range, DistTcpPort}}) ->
+ rabbit_misc:format(
+ "Invalid Erlang distribution TCP port: ~b", [DistTcpPort]);
+format_error({error, {dist_port_already_used, Port, not_erlang, Host}}) ->
+ rabbit_misc:format(
+ "ERROR: could not bind to distribution port ~b on host ~s. It could "
+ "be in use by another process or cannot be bound to (e.g. due to a "
+ "security policy)", [Port, Host]);
+format_error({error, {dist_port_already_used, Port, Name, Host}}) ->
+ rabbit_misc:format(
+ "ERROR: could not bind to distribution port ~b, it is in use by "
+ "another node: ~s@~s", [Port, Name, Host]);
+format_error({error, {erlang_dist_running_with_unexpected_nodename,
+ Unexpected, Node}}) ->
+ rabbit_misc:format(
+ "Erlang distribution running with another node name (~s) "
+ "than the configured one (~s)",
+ [Unexpected, Node]);
+format_error({bad_config_entry_decoder, missing_passphrase}) ->
+ rabbit_misc:format(
+ "Missing passphrase or missing passphrase read method in "
+ "`config_entry_decoder`", []);
+format_error({config_decryption_error, {key, Key}, _Msg}) ->
+ rabbit_misc:format(
+ "Error while decrypting key '~p'. Please check encrypted value, "
+ "passphrase, and encryption configuration~n",
+ [Key]);
+format_error({error, {timeout_waiting_for_tables, AllNodes, _}}) ->
+ Suffix =
+ "~nBACKGROUND~n==========~n~n"
+ "This cluster node was shut down while other nodes were still running.~n"
+ "To avoid losing data, you should start the other nodes first, then~n"
+ "start this one. To force this node to start, first invoke~n"
+ "\"rabbitmqctl force_boot\". If you do so, any changes made on other~n"
+ "cluster nodes after this one was shut down may be lost.",
+ {Message, Nodes} =
+ case AllNodes -- [node()] of
+ [] -> {rabbit_misc:format(
+ "Timeout contacting cluster nodes. Since RabbitMQ was"
+ " shut down forcefully~nit cannot determine which nodes"
+ " are timing out.~n" ++ Suffix, []),
+ []};
+ Ns -> {rabbit_misc:format(
+ "Timeout contacting cluster nodes: ~p.~n" ++ Suffix,
+ [Ns]),
+ Ns}
+ end,
+ Message ++ "\n" ++ rabbit_nodes_common:diagnostics(Nodes);
+format_error({error, {cannot_log_to_file, unknown, Reason}}) ->
+ rabbit_misc:format(
+ "failed to initialised logger: ~p~n",
+ [Reason]);
+format_error({error, {cannot_log_to_file, LogFile,
+ {cannot_create_parent_dirs, _, Reason}}}) ->
+ rabbit_misc:format(
+ "failed to create parent directory for log file at '~s', reason: ~s~n",
+ [LogFile, file:format_error(Reason)]);
+format_error({error, {cannot_log_to_file, LogFile, Reason}}) ->
+ rabbit_misc:format(
+ "failed to open log file at '~s', reason: ~s",
+ [LogFile, file:format_error(Reason)]);
+format_error(Error) ->
+ rabbit_misc:format("Error during startup: ~p", [Error]).
+
+log_exception(Class, Exception, Stacktrace) ->
+ Message = format_exception(Class, Exception, Stacktrace),
+ log_message(Message).
+
+format_exception(Class, Exception, Stacktrace) ->
+ rabbit_misc:format(
+ "Exception during startup:~n~s",
+ [lager:pr_stacktrace(Stacktrace, {Class, Exception})]).
+
+log_message(Message) ->
+ Lines = string:split(
+ ?BOOT_FAILED_HEADER ++
+ Message ++
+ ?BOOT_FAILED_FOOTER,
+ [$\n],
+ all),
+ lists:foreach(
+ fun(Line) ->
+ rabbit_log_prelaunch:error("~s", [Line]),
+ io:format(standard_error, "~s~n", [Line])
+ end, Lines),
+ timer:sleep(1000),
+ ok.
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_sighandler.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_sighandler.erl
new file mode 100644
index 0000000000..f9a60effda
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_sighandler.erl
@@ -0,0 +1,93 @@
+-module(rabbit_prelaunch_sighandler).
+-behaviour(gen_event).
+
+-export([setup/0,
+ init/1,
+ handle_event/2,
+ handle_call/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+%% CAUTION: Signal handling in this module must be kept consistent
+%% with the same handling in rabbitmq-server(8).
+
+%% #{signal => default | ignore | stop}.
+-define(SIGNALS_HANDLED_BY_US,
+ #{
+ %% SIGHUP is often used to reload the configuration or reopen
+ %% log files after they were rotated. We don't support any
+ %% of those two cases, so ignore it for now, until we can do
+ %% something about it.
+ sighup => ignore,
+
+ %% SIGTSTP is triggered by Ctrl+Z to pause a program. However
+ %% we can't handle SIGCONT, the signal used to resume the
+ %% program. Unfortunately, it makes a SIGTSTP handler less
+ %% useful here.
+ sigtstp => ignore
+ }).
+
+-define(SIGNAL_HANDLED_BY_ERLANG(Signal),
+ Signal =:= sigusr1 orelse
+ Signal =:= sigquit orelse
+ Signal =:= sigterm).
+
+-define(SERVER, erl_signal_server).
+
+setup() ->
+ case os:type() of
+ {unix, _} ->
+ case whereis(?SERVER) of
+ undefined ->
+ ok;
+ _ ->
+ case lists:member(?MODULE, gen_event:which_handlers(?SERVER)) of
+ true -> ok;
+ false -> gen_event:add_handler(?SERVER, ?MODULE, [])
+ end
+ end;
+ _ ->
+ ok
+ end.
+
+init(_Args) ->
+ maps:fold(
+ fun
+ (Signal, _, Ret) when ?SIGNAL_HANDLED_BY_ERLANG(Signal) -> Ret;
+ (Signal, default, ok) -> os:set_signal(Signal, default);
+ (Signal, ignore, ok) -> os:set_signal(Signal, ignore);
+ (Signal, _, ok) -> os:set_signal(Signal, handle)
+ end, ok, ?SIGNALS_HANDLED_BY_US),
+ {ok, #{}}.
+
+handle_event(Signal, State) when ?SIGNAL_HANDLED_BY_ERLANG(Signal) ->
+ {ok, State};
+handle_event(Signal, State) ->
+ case ?SIGNALS_HANDLED_BY_US of
+ %% The code below can be uncommented if we introduce a signal
+ %% which should stop RabbitMQ.
+ %
+ %#{Signal := stop} ->
+ % error_logger:info_msg(
+ % "~s received - shutting down~n",
+ % [string:uppercase(atom_to_list(Signal))]),
+ % ok = init:stop();
+ _ ->
+ error_logger:info_msg(
+ "~s received - unhandled signal~n",
+ [string:uppercase(atom_to_list(Signal))])
+ end,
+ {ok, State}.
+
+handle_info(_, State) ->
+ {ok, State}.
+
+handle_call(_, State) ->
+ {ok, ok, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Args, _State) ->
+ ok.
diff --git a/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_sup.erl b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_sup.erl
new file mode 100644
index 0000000000..9fd117d9f3
--- /dev/null
+++ b/deps/rabbit/apps/rabbitmq_prelaunch/src/rabbit_prelaunch_sup.erl
@@ -0,0 +1,22 @@
+-module(rabbit_prelaunch_sup).
+-behaviour(supervisor).
+
+-export([start_link/0]).
+-export([init/1]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+ BootStateSup = #{id => bootstate,
+ start => {rabbit_boot_state_sup, start_link, []},
+ type => supervisor},
+ %% `rabbit_prelaunch` does not start a process, it only configures
+ %% the node.
+ Prelaunch = #{id => prelaunch,
+ start => {rabbit_prelaunch, run_prelaunch_first_phase, []},
+ restart => transient},
+ Procs = [BootStateSup, Prelaunch],
+ {ok, {#{strategy => one_for_one,
+ intensity => 1,
+ period => 5}, Procs}}.