summaryrefslogtreecommitdiff
path: root/deps/rabbit_common/test
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit_common/test')
-rw-r--r--deps/rabbit_common/test/gen_server2_test_server.erl72
-rw-r--r--deps/rabbit_common/test/rabbit_env_SUITE.erl1098
-rw-r--r--deps/rabbit_common/test/supervisor2_SUITE.erl128
-rw-r--r--deps/rabbit_common/test/unit_SUITE.erl446
-rw-r--r--deps/rabbit_common/test/unit_priority_queue_SUITE.erl35
-rw-r--r--deps/rabbit_common/test/worker_pool_SUITE.erl220
6 files changed, 1999 insertions, 0 deletions
diff --git a/deps/rabbit_common/test/gen_server2_test_server.erl b/deps/rabbit_common/test/gen_server2_test_server.erl
new file mode 100644
index 0000000000..0d68df8f7e
--- /dev/null
+++ b/deps/rabbit_common/test/gen_server2_test_server.erl
@@ -0,0 +1,72 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2017-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(gen_server2_test_server).
+-behaviour(gen_server2).
+-record(gs2_state, {parent, name, state, mod, time,
+ timeout_state, queue, debug, prioritisers,
+ timer, emit_stats_fun, stop_stats_fun}).
+
+-export([start_link/0, start_link/1, start_link/2, stats_count/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3, handle_post_hibernate/1]).
+
+start_link(count_stats) ->
+ start_link(count_stats, infinity).
+
+start_link(count_stats, Time) ->
+ {ok, Server} = gen_server2:start_link(gen_server2_test_server, [Time], []),
+ Counter = gen_server2:call(Server, get_counter),
+ sys:replace_state(Server,
+ fun(GSState) ->
+ GSState#gs2_state{
+ emit_stats_fun = fun(State) -> count_stats(Counter), State end
+ }
+ end),
+ {ok, Server}.
+
+start_link() ->
+ gen_server2:start_link(gen_server2_test_server, [], []).
+
+stats_count(Server) ->
+ Counter = gen_server2:call(Server, get_counter),
+ [{count, Count}] = ets:lookup(Counter, count),
+ Count.
+
+init([]) ->
+ init([infinity]);
+init([Time]) ->
+ Counter = ets:new(stats_count, [public]),
+ ets:insert(Counter, {count, 0}),
+ case Time of
+ {backoff, _, _, _} ->
+ {ok, {counter, Counter}, hibernate, Time};
+ _ ->
+ {ok, {counter, Counter}, Time}
+ end.
+
+count_stats(Counter) ->
+ ets:update_counter(Counter, count, {2, 1}).
+
+handle_call(get_counter,_, {counter, Counter} = State) ->
+ {reply, Counter, State};
+handle_call(hibernate, _, State) ->
+ {reply, ok, State, hibernate};
+handle_call(_,_,State) ->
+ {reply, ok, State}.
+
+handle_cast({sleep, Time}, State) -> timer:sleep(Time), {noreply, State};
+handle_cast(_,State) -> {noreply, State}.
+
+handle_post_hibernate(State) -> {noreply, State}.
+
+handle_info(_,State) -> {noreply, State}.
+
+terminate(_,_State) -> ok.
+
+code_change(_,State,_) -> {ok, State}.
diff --git a/deps/rabbit_common/test/rabbit_env_SUITE.erl b/deps/rabbit_common/test/rabbit_env_SUITE.erl
new file mode 100644
index 0000000000..a881097e6b
--- /dev/null
+++ b/deps/rabbit_common/test/rabbit_env_SUITE.erl
@@ -0,0 +1,1098 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2019-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_env_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-export([all/0,
+ suite/0,
+ groups/0,
+ init_per_suite/1,
+ end_per_suite/1,
+ init_per_group/2,
+ end_per_group/2,
+ init_per_testcase/2,
+ end_per_testcase/2,
+ check_data_dir/1,
+ check_default_values/1,
+ check_values_from_reachable_remote_node/1,
+ check_values_from_offline_remote_node/1,
+ check_context_to_app_env_vars/1,
+ check_context_to_code_path/1,
+ check_RABBITMQ_ADVANCED_CONFIG_FILE/1,
+ check_RABBITMQ_CONFIG_FILE/1,
+ check_RABBITMQ_CONFIG_FILES/1,
+ check_RABBITMQ_DIST_PORT/1,
+ check_RABBITMQ_ENABLED_PLUGINS/1,
+ check_RABBITMQ_ENABLED_PLUGINS_FILE/1,
+ check_RABBITMQ_FEATURE_FLAGS_FILE/1,
+ check_RABBITMQ_KEEP_PID_FILE_ON_EXIT/1,
+ check_RABBITMQ_LOG/1,
+ check_RABBITMQ_LOG_BASE/1,
+ check_RABBITMQ_LOGS/1,
+ check_RABBITMQ_MNESIA_BASE/1,
+ check_RABBITMQ_MNESIA_DIR/1,
+ check_RABBITMQ_MOTD_FILE/1,
+ check_RABBITMQ_NODE_IP_ADDRESS/1,
+ check_RABBITMQ_NODE_PORT/1,
+ check_RABBITMQ_NODENAME/1,
+ check_RABBITMQ_PID_FILE/1,
+ check_RABBITMQ_PLUGINS_DIR/1,
+ check_RABBITMQ_PLUGINS_EXPAND_DIR/1,
+ check_RABBITMQ_PRODUCT_NAME/1,
+ check_RABBITMQ_PRODUCT_VERSION/1,
+ check_RABBITMQ_QUORUM_DIR/1,
+ check_RABBITMQ_STREAM_DIR/1,
+ check_RABBITMQ_UPGRADE_LOG/1,
+ check_RABBITMQ_USE_LOGNAME/1,
+ check_value_is_yes/1,
+ check_log_process_env/1,
+ check_log_context/1,
+ check_get_used_env_vars/1,
+ check_parse_conf_env_file_output/1
+ ]).
+
+all() ->
+ [
+ check_data_dir,
+ check_default_values,
+ check_values_from_reachable_remote_node,
+ check_values_from_offline_remote_node,
+ check_context_to_app_env_vars,
+ check_context_to_code_path,
+ check_RABBITMQ_ADVANCED_CONFIG_FILE,
+ check_RABBITMQ_CONFIG_FILE,
+ check_RABBITMQ_CONFIG_FILES,
+ check_RABBITMQ_DIST_PORT,
+ check_RABBITMQ_ENABLED_PLUGINS,
+ check_RABBITMQ_ENABLED_PLUGINS_FILE,
+ check_RABBITMQ_FEATURE_FLAGS_FILE,
+ check_RABBITMQ_KEEP_PID_FILE_ON_EXIT,
+ check_RABBITMQ_LOG,
+ check_RABBITMQ_LOG_BASE,
+ check_RABBITMQ_LOGS,
+ check_RABBITMQ_MNESIA_BASE,
+ check_RABBITMQ_MNESIA_DIR,
+ check_RABBITMQ_MOTD_FILE,
+ check_RABBITMQ_NODE_IP_ADDRESS,
+ check_RABBITMQ_NODE_PORT,
+ check_RABBITMQ_NODENAME,
+ check_RABBITMQ_PID_FILE,
+ check_RABBITMQ_PLUGINS_DIR,
+ check_RABBITMQ_PLUGINS_EXPAND_DIR,
+ check_RABBITMQ_PRODUCT_NAME,
+ check_RABBITMQ_PRODUCT_VERSION,
+ check_RABBITMQ_QUORUM_DIR,
+ check_RABBITMQ_UPGRADE_LOG,
+ check_RABBITMQ_USE_LOGNAME,
+ check_value_is_yes,
+ check_log_process_env,
+ check_log_context,
+ check_get_used_env_vars,
+ check_parse_conf_env_file_output
+ ].
+
+suite() ->
+ [{timetrap, {seconds, 10}}].
+
+groups() ->
+ [
+ {parallel_tests, [parallel], all()}
+ ].
+
+init_per_suite(Config) ->
+ persistent_term:put({rabbit_env, load_conf_env_file}, false),
+ Config.
+
+end_per_suite(Config) ->
+ persistent_term:erase({rabbit_env, load_conf_env_file}),
+ Config.
+
+init_per_group(_, Config) -> Config.
+end_per_group(_, Config) -> Config.
+
+init_per_testcase(_, Config) -> Config.
+end_per_testcase(_, Config) -> Config.
+
+check_data_dir(_) ->
+ {Variable, ExpValue} = case os:type() of
+ {win32, _} ->
+ {"RABBITMQ_BASE",
+ "value of RABBITMQ_BASE"};
+ {unix, _} ->
+ {"SYS_PREFIX",
+ "value of SYS_PREFIX/var/lib/rabbitmq"}
+ end,
+ Value = "value of " ++ Variable,
+ os:putenv(Variable, Value),
+ ?assertMatch(#{data_dir := ExpValue}, rabbit_env:get_context()),
+
+ os:unsetenv(Variable),
+ ?assertNotMatch(#{data_dir := ExpValue}, rabbit_env:get_context()),
+ ?assertMatch(#{data_dir := _}, rabbit_env:get_context()),
+
+ os:unsetenv(Variable).
+
+check_default_values(_) ->
+ %% When `rabbit_env` is built with `TEST` defined, we can override
+ %% the OS type.
+ persistent_term:put({rabbit_env, os_type}, {unix, undefined}),
+ UnixContext = rabbit_env:get_context(),
+
+ persistent_term:put({rabbit_env, os_type}, {win32, undefined}),
+ SavedAppData = os:getenv("APPDATA"),
+ os:putenv("APPDATA", "%APPDATA%"),
+ Win32Context = rabbit_env:get_context(),
+ case SavedAppData of
+ false -> os:unsetenv("APPDATA");
+ _ -> os:putenv("APPDATA", SavedAppData)
+ end,
+
+ persistent_term:erase({rabbit_env, os_type}),
+
+ {RFFValue, RFFOrigin} = forced_feature_flags_on_init_expect(),
+
+ Node = get_default_nodename(),
+ NodeS = atom_to_list(Node),
+
+ Origins = #{
+ additional_config_files => default,
+ advanced_config_file => default,
+ amqp_ipaddr => default,
+ amqp_tcp_port => default,
+ conf_env_file => default,
+ enabled_plugins => default,
+ enabled_plugins_file => default,
+ erlang_dist_tcp_port => default,
+ feature_flags_file => default,
+ forced_feature_flags_on_init => RFFOrigin,
+ interactive_shell => default,
+ keep_pid_file_on_exit => default,
+ log_base_dir => default,
+ log_feature_flags_registry => default,
+ log_levels => default,
+ main_config_file => default,
+ main_log_file => default,
+ mnesia_base_dir => default,
+ mnesia_dir => default,
+ motd_file => default,
+ nodename => default,
+ nodename_type => default,
+ os_type => environment,
+ output_supports_colors => default,
+ pid_file => default,
+ plugins_expand_dir => default,
+ plugins_path => default,
+ product_name => default,
+ product_version => default,
+ quorum_queue_dir => default,
+ rabbitmq_home => default,
+ stream_queue_dir => default,
+ upgrade_log_file => default
+ },
+
+ ?assertEqual(
+ #{additional_config_files => "/etc/rabbitmq/conf.d/*.conf",
+ advanced_config_file => "/etc/rabbitmq/advanced.config",
+ amqp_ipaddr => "auto",
+ amqp_tcp_port => 5672,
+ conf_env_file => "/etc/rabbitmq/rabbitmq-env.conf",
+ config_base_dir => "/etc/rabbitmq",
+ data_dir => "/var/lib/rabbitmq",
+ dbg_mods => [],
+ dbg_output => stdout,
+ enabled_plugins => undefined,
+ enabled_plugins_file => "/etc/rabbitmq/enabled_plugins",
+ erlang_dist_tcp_port => 25672,
+ feature_flags_file =>
+ "/var/lib/rabbitmq/mnesia/" ++ NodeS ++ "-feature_flags",
+ forced_feature_flags_on_init => RFFValue,
+ interactive_shell => false,
+ keep_pid_file_on_exit => false,
+ log_base_dir => "/var/log/rabbitmq",
+ log_feature_flags_registry => false,
+ log_levels => undefined,
+ main_config_file => "/etc/rabbitmq/rabbitmq",
+ main_log_file => "/var/log/rabbitmq/" ++ NodeS ++ ".log",
+ mnesia_base_dir => "/var/lib/rabbitmq/mnesia",
+ mnesia_dir => "/var/lib/rabbitmq/mnesia/" ++ NodeS,
+ motd_file => "/etc/rabbitmq/motd",
+ nodename => Node,
+ nodename_type => shortnames,
+ os_type => {unix, undefined},
+ output_supports_colors => true,
+ pid_file => "/var/lib/rabbitmq/mnesia/" ++ NodeS ++ ".pid",
+ plugins_expand_dir =>
+ "/var/lib/rabbitmq/mnesia/" ++ NodeS ++ "-plugins-expand",
+ plugins_path => maps:get(plugins_path, UnixContext),
+ product_name => undefined,
+ product_version => undefined,
+ quorum_queue_dir =>
+ "/var/lib/rabbitmq/mnesia/" ++ NodeS ++ "/quorum",
+ rabbitmq_home => maps:get(rabbitmq_home, UnixContext),
+ stream_queue_dir =>
+ "/var/lib/rabbitmq/mnesia/" ++ NodeS ++ "/stream",
+ split_nodename => rabbit_nodes_common:parts(Node),
+ sys_prefix => "",
+ upgrade_log_file =>
+ "/var/log/rabbitmq/" ++ NodeS ++ "_upgrade.log",
+
+ var_origins => Origins#{sys_prefix => default}},
+ UnixContext),
+
+ ?assertEqual(
+ #{additional_config_files => "%APPDATA%/RabbitMQ/conf.d/*.conf",
+ advanced_config_file => "%APPDATA%/RabbitMQ/advanced.config",
+ amqp_ipaddr => "auto",
+ amqp_tcp_port => 5672,
+ conf_env_file => "%APPDATA%/RabbitMQ/rabbitmq-env-conf.bat",
+ config_base_dir => "%APPDATA%/RabbitMQ",
+ data_dir => "%APPDATA%/RabbitMQ",
+ dbg_mods => [],
+ dbg_output => stdout,
+ enabled_plugins => undefined,
+ enabled_plugins_file => "%APPDATA%/RabbitMQ/enabled_plugins",
+ erlang_dist_tcp_port => 25672,
+ feature_flags_file =>
+ "%APPDATA%/RabbitMQ/db/" ++ NodeS ++ "-feature_flags",
+ forced_feature_flags_on_init => RFFValue,
+ interactive_shell => false,
+ keep_pid_file_on_exit => false,
+ log_base_dir => "%APPDATA%/RabbitMQ/log",
+ log_feature_flags_registry => false,
+ log_levels => undefined,
+ main_config_file => "%APPDATA%/RabbitMQ/rabbitmq",
+ main_log_file => "%APPDATA%/RabbitMQ/log/" ++ NodeS ++ ".log",
+ mnesia_base_dir => "%APPDATA%/RabbitMQ/db",
+ mnesia_dir => "%APPDATA%/RabbitMQ/db/" ++ NodeS ++ "-mnesia",
+ motd_file => "%APPDATA%/RabbitMQ/motd.txt",
+ nodename => Node,
+ nodename_type => shortnames,
+ os_type => {win32, undefined},
+ output_supports_colors => false,
+ pid_file => "%APPDATA%/RabbitMQ/db/" ++ NodeS ++ ".pid",
+ plugins_expand_dir =>
+ "%APPDATA%/RabbitMQ/db/" ++ NodeS ++ "-plugins-expand",
+ plugins_path => maps:get(plugins_path, Win32Context),
+ product_name => undefined,
+ product_version => undefined,
+ quorum_queue_dir =>
+ "%APPDATA%/RabbitMQ/db/" ++ NodeS ++ "-mnesia/quorum",
+ rabbitmq_base => "%APPDATA%/RabbitMQ",
+ rabbitmq_home => maps:get(rabbitmq_home, Win32Context),
+ stream_queue_dir =>
+ "%APPDATA%/RabbitMQ/db/" ++ NodeS ++ "-mnesia/stream",
+ split_nodename => rabbit_nodes_common:parts(Node),
+ upgrade_log_file =>
+ "%APPDATA%/RabbitMQ/log/" ++ NodeS ++ "_upgrade.log",
+
+ var_origins => Origins#{rabbitmq_base => default}},
+ Win32Context).
+
+forced_feature_flags_on_init_expect() ->
+ %% In the case of mixed-versions-cluster testing in CI, the test
+ %% sets $RABBITMQ_FEATURE_FLAGS to an empty string. This obviously
+ %% changes the context returned by rabbit_env.
+ case os:getenv("RABBITMQ_FEATURE_FLAGS") of
+ false -> {undefined, default};
+ "" -> {[], environment}
+ end.
+
+check_values_from_reachable_remote_node(Config) ->
+ PrivDir = ?config(priv_dir, Config),
+
+ MnesiaDir = filename:join(PrivDir, "mnesia"),
+ RabbitAppDir = filename:join(PrivDir, "rabbit"),
+ RabbitEbinDir = filename:join(RabbitAppDir, "ebin"),
+
+ FeatureFlagsFile = filename:join(PrivDir, "feature_flags"),
+ PluginsDir = filename:join(PrivDir, "plugins"),
+ EnabledPluginsFile = filename:join(PrivDir, "enabled_plugins"),
+
+ ok = file:make_dir(MnesiaDir),
+ ok = file:make_dir(RabbitAppDir),
+ ok = file:make_dir(RabbitEbinDir),
+
+ %% Create a fake `rabbit` application.
+ App = {application,
+ rabbit,
+ [{vsn, "fake-rabbit"}]},
+ AppFile = filename:join(RabbitEbinDir, "rabbit.app"),
+ AppContent = io_lib:format("~p.~n", [App]),
+ ok = file:write_file(AppFile, AppContent),
+
+ %% Start a fake RabbitMQ node.
+ Node = rabbit_nodes_common:make(
+ {atom_to_list(?FUNCTION_NAME), "localhost"}),
+ NodeS = atom_to_list(Node),
+ true = os:putenv("RABBITMQ_NODENAME", NodeS),
+ RabbitCommonEbinDir = filename:dirname(code:which(rabbit_env)),
+ Args = ["-noinput",
+ "-sname", atom_to_list(Node),
+ "-pa", RabbitCommonEbinDir,
+ "-pa", RabbitEbinDir,
+ "-mnesia", "dir",
+ rabbit_misc:format("~p", [MnesiaDir]),
+ "-rabbit", "feature_flags_file",
+ rabbit_misc:format("~p", [FeatureFlagsFile]),
+ "-rabbit", "plugins_dir",
+ rabbit_misc:format("~p", [PluginsDir]),
+ "-rabbit", "enabled_plugins_file",
+ rabbit_misc:format("~p", [EnabledPluginsFile]),
+ "-eval",
+ "ok = application:load(mnesia),"
+ "ok = application:load(rabbit)."],
+ PortName = {spawn_executable, os:find_executable("erl")},
+ PortSettings = [{cd, PrivDir},
+ {args, Args},
+ {env, [{"ERL_LIBS", false}]},
+ {line, 512},
+ exit_status,
+ stderr_to_stdout],
+ ct:pal(
+ "Starting fake RabbitMQ node with the following settings:~n~p",
+ [PortSettings]),
+ Pid = spawn_link(
+ fun() ->
+ Port = erlang:open_port(PortName, PortSettings),
+ consume_stdout(Port, Node)
+ end),
+ wait_for_remote_node(Node),
+
+ try
+ persistent_term:put({rabbit_env, os_type}, {unix, undefined}),
+ UnixContext = rabbit_env:get_context(Node),
+
+ persistent_term:erase({rabbit_env, os_type}),
+
+ {RFFValue, RFFOrigin} = forced_feature_flags_on_init_expect(),
+
+ Origins = #{
+ additional_config_files => default,
+ advanced_config_file => default,
+ amqp_ipaddr => default,
+ amqp_tcp_port => default,
+ conf_env_file => default,
+ enabled_plugins => default,
+ enabled_plugins_file => remote_node,
+ erlang_dist_tcp_port => default,
+ feature_flags_file => remote_node,
+ forced_feature_flags_on_init => RFFOrigin,
+ interactive_shell => default,
+ keep_pid_file_on_exit => default,
+ log_base_dir => default,
+ log_feature_flags_registry => default,
+ log_levels => default,
+ main_config_file => default,
+ main_log_file => default,
+ mnesia_base_dir => default,
+ mnesia_dir => remote_node,
+ motd_file => default,
+ nodename => environment,
+ nodename_type => default,
+ os_type => environment,
+ output_supports_colors => default,
+ pid_file => default,
+ plugins_expand_dir => default,
+ plugins_path => remote_node,
+ product_name => default,
+ product_version => default,
+ quorum_queue_dir => default,
+ rabbitmq_home => default,
+ stream_queue_dir => default,
+ upgrade_log_file => default
+ },
+
+ ?assertEqual(
+ #{additional_config_files => "/etc/rabbitmq/conf.d/*.conf",
+ advanced_config_file => "/etc/rabbitmq/advanced.config",
+ amqp_ipaddr => "auto",
+ amqp_tcp_port => 5672,
+ conf_env_file => "/etc/rabbitmq/rabbitmq-env.conf",
+ config_base_dir => "/etc/rabbitmq",
+ data_dir => "/var/lib/rabbitmq",
+ dbg_mods => [],
+ dbg_output => stdout,
+ enabled_plugins => undefined,
+ enabled_plugins_file => EnabledPluginsFile,
+ erlang_dist_tcp_port => 25672,
+ feature_flags_file => FeatureFlagsFile,
+ forced_feature_flags_on_init => RFFValue,
+ from_remote_node => {Node, 10000},
+ interactive_shell => false,
+ keep_pid_file_on_exit => false,
+ log_base_dir => "/var/log/rabbitmq",
+ log_feature_flags_registry => false,
+ log_levels => undefined,
+ main_config_file => "/etc/rabbitmq/rabbitmq",
+ main_log_file => "/var/log/rabbitmq/" ++ NodeS ++ ".log",
+ mnesia_base_dir => undefined,
+ mnesia_dir => MnesiaDir,
+ motd_file => undefined,
+ nodename => Node,
+ nodename_type => shortnames,
+ os_type => {unix, undefined},
+ output_supports_colors => true,
+ pid_file => undefined,
+ plugins_expand_dir => undefined,
+ plugins_path => PluginsDir,
+ product_name => undefined,
+ product_version => undefined,
+ quorum_queue_dir => MnesiaDir ++ "/quorum",
+ rabbitmq_home => maps:get(rabbitmq_home, UnixContext),
+ stream_queue_dir => MnesiaDir ++ "/stream",
+ split_nodename => rabbit_nodes_common:parts(Node),
+ sys_prefix => "",
+ upgrade_log_file =>
+ "/var/log/rabbitmq/" ++ NodeS ++ "_upgrade.log",
+
+ var_origins => Origins#{sys_prefix => default}},
+ UnixContext)
+ after
+ os:unsetenv("RABBITMQ_NODENAME"),
+ unlink(Pid),
+ rpc:call(Node, erlang, halt, [])
+ end.
+
+consume_stdout(Port, Nodename) ->
+ receive
+ {Port, {exit_status, X}} ->
+ ?assertEqual(0, X);
+ {Port, {data, Out}} ->
+ ct:pal("stdout: ~p", [Out]),
+ consume_stdout(Port, Nodename)
+ end.
+
+wait_for_remote_node(Nodename) ->
+ case net_adm:ping(Nodename) of
+ pong -> ok;
+ pang -> timer:sleep(200),
+ wait_for_remote_node(Nodename)
+ end.
+
+check_values_from_offline_remote_node(_) ->
+ Node = rabbit_nodes_common:make(
+ {atom_to_list(?FUNCTION_NAME), "localhost"}),
+ NodeS = atom_to_list(Node),
+ true = os:putenv("RABBITMQ_NODENAME", NodeS),
+
+ persistent_term:put({rabbit_env, os_type}, {unix, undefined}),
+ UnixContext = rabbit_env:get_context(offline),
+
+ persistent_term:erase({rabbit_env, os_type}),
+ os:unsetenv("RABBITMQ_NODENAME"),
+
+ {RFFValue, RFFOrigin} = forced_feature_flags_on_init_expect(),
+
+ Origins = #{
+ additional_config_files => default,
+ advanced_config_file => default,
+ amqp_ipaddr => default,
+ amqp_tcp_port => default,
+ conf_env_file => default,
+ enabled_plugins => default,
+ enabled_plugins_file => default,
+ erlang_dist_tcp_port => default,
+ feature_flags_file => default,
+ forced_feature_flags_on_init => RFFOrigin,
+ interactive_shell => default,
+ keep_pid_file_on_exit => default,
+ log_base_dir => default,
+ log_feature_flags_registry => default,
+ log_levels => default,
+ main_config_file => default,
+ main_log_file => default,
+ mnesia_base_dir => default,
+ mnesia_dir => default,
+ motd_file => default,
+ nodename => environment,
+ nodename_type => default,
+ os_type => environment,
+ output_supports_colors => default,
+ pid_file => default,
+ plugins_expand_dir => default,
+ plugins_path => default,
+ product_name => default,
+ product_version => default,
+ quorum_queue_dir => default,
+ rabbitmq_home => default,
+ stream_queue_dir => default,
+ upgrade_log_file => default
+ },
+
+ ?assertEqual(
+ #{additional_config_files => "/etc/rabbitmq/conf.d/*.conf",
+ advanced_config_file => "/etc/rabbitmq/advanced.config",
+ amqp_ipaddr => "auto",
+ amqp_tcp_port => 5672,
+ conf_env_file => "/etc/rabbitmq/rabbitmq-env.conf",
+ config_base_dir => "/etc/rabbitmq",
+ data_dir => "/var/lib/rabbitmq",
+ dbg_mods => [],
+ dbg_output => stdout,
+ enabled_plugins => undefined,
+ enabled_plugins_file => undefined,
+ erlang_dist_tcp_port => 25672,
+ feature_flags_file => undefined,
+ forced_feature_flags_on_init => RFFValue,
+ from_remote_node => offline,
+ interactive_shell => false,
+ keep_pid_file_on_exit => false,
+ log_base_dir => "/var/log/rabbitmq",
+ log_feature_flags_registry => false,
+ log_levels => undefined,
+ main_config_file => "/etc/rabbitmq/rabbitmq",
+ main_log_file => "/var/log/rabbitmq/" ++ NodeS ++ ".log",
+ mnesia_base_dir => undefined,
+ mnesia_dir => undefined,
+ motd_file => undefined,
+ nodename => Node,
+ nodename_type => shortnames,
+ os_type => {unix, undefined},
+ output_supports_colors => true,
+ pid_file => undefined,
+ plugins_expand_dir => undefined,
+ plugins_path => undefined,
+ product_name => undefined,
+ product_version => undefined,
+ quorum_queue_dir => undefined,
+ rabbitmq_home => maps:get(rabbitmq_home, UnixContext),
+ stream_queue_dir => undefined,
+ split_nodename => rabbit_nodes_common:parts(Node),
+ sys_prefix => "",
+ upgrade_log_file =>
+ "/var/log/rabbitmq/" ++ NodeS ++ "_upgrade.log",
+
+ var_origins => Origins#{sys_prefix => default}},
+ UnixContext).
+
+check_context_to_app_env_vars(_) ->
+ %% When `rabbit_env` is built with `TEST` defined, we can override
+ %% the OS type.
+ persistent_term:put({rabbit_env, os_type}, {unix, undefined}),
+ UnixContext = rabbit_env:get_context(),
+
+ persistent_term:erase({rabbit_env, os_type}),
+
+ Vars = [{mnesia, dir, maps:get(mnesia_dir, UnixContext)},
+ {ra, data_dir, maps:get(quorum_queue_dir, UnixContext)},
+ {osiris, data_dir, maps:get(stream_queue_dir, UnixContext)},
+ {rabbit, feature_flags_file,
+ maps:get(feature_flags_file, UnixContext)},
+ {rabbit, plugins_dir, maps:get(plugins_path, UnixContext)},
+ {rabbit, plugins_expand_dir,
+ maps:get(plugins_expand_dir, UnixContext)},
+ {rabbit, enabled_plugins_file,
+ maps:get(enabled_plugins_file, UnixContext)}],
+
+ lists:foreach(
+ fun({App, Param, _}) ->
+ ?assertEqual(undefined, application:get_env(App, Param))
+ end,
+ Vars),
+
+ rabbit_env:context_to_app_env_vars(UnixContext),
+ lists:foreach(
+ fun({App, Param, Value}) ->
+ ?assertEqual({ok, Value}, application:get_env(App, Param))
+ end,
+ Vars),
+
+ lists:foreach(
+ fun({App, Param, _}) ->
+ application:unset_env(App, Param),
+ ?assertEqual(undefined, application:get_env(App, Param))
+ end,
+ Vars),
+
+ rabbit_env:context_to_app_env_vars_no_logging(UnixContext),
+ lists:foreach(
+ fun({App, Param, Value}) ->
+ ?assertEqual({ok, Value}, application:get_env(App, Param))
+ end,
+ Vars).
+
+check_context_to_code_path(Config) ->
+ PrivDir = ?config(priv_dir, Config),
+ PluginsDir1 = filename:join(
+ PrivDir, rabbit_misc:format("~s-1", [?FUNCTION_NAME])),
+ MyPlugin1Dir = filename:join(PluginsDir1, "my_plugin1"),
+ MyPlugin1EbinDir = filename:join(MyPlugin1Dir, "ebin"),
+ PluginsDir2 = filename:join(
+ PrivDir, rabbit_misc:format("~s-2", [?FUNCTION_NAME])),
+ MyPlugin2Dir = filename:join(PluginsDir2, "my_plugin2"),
+ MyPlugin2EbinDir = filename:join(MyPlugin2Dir, "ebin"),
+
+ ok = file:make_dir(PluginsDir1),
+ ok = file:make_dir(MyPlugin1Dir),
+ ok = file:make_dir(MyPlugin1EbinDir),
+ ok = file:make_dir(PluginsDir2),
+ ok = file:make_dir(MyPlugin2Dir),
+ ok = file:make_dir(MyPlugin2EbinDir),
+
+ %% On Unix.
+ %%
+ %% We can't test the Unix codepath on Windows because the drive letter
+ %% separator conflicts with the path separator (they are both ':').
+ %% However, the Windows codepath can be tested on both Unix and Windows.
+ case os:type() of
+ {unix, _} ->
+ UnixPluginsPath = PluginsDir1 ++ ":" ++ PluginsDir2,
+ true = os:putenv("RABBITMQ_PLUGINS_DIR", UnixPluginsPath),
+ persistent_term:put({rabbit_env, os_type}, {unix, undefined}),
+ UnixContext = rabbit_env:get_context(),
+
+ persistent_term:erase({rabbit_env, os_type}),
+ os:unsetenv("RABBITMQ_PLUGINS_DIR"),
+
+ ?assertEqual(UnixPluginsPath, maps:get(plugins_path, UnixContext)),
+
+ OldCodePath1 = code:get_path(),
+ ?assertNot(lists:member(MyPlugin1EbinDir, OldCodePath1)),
+ ?assertNot(lists:member(MyPlugin2EbinDir, OldCodePath1)),
+
+ rabbit_env:context_to_code_path(UnixContext),
+
+ NewCodePath1 = code:get_path(),
+ ?assert(lists:member(MyPlugin1EbinDir, NewCodePath1)),
+ ?assert(lists:member(MyPlugin2EbinDir, NewCodePath1)),
+ ?assertEqual(
+ [MyPlugin1EbinDir, MyPlugin2EbinDir],
+ lists:filter(
+ fun(Dir) ->
+ Dir =:= MyPlugin1EbinDir orelse
+ Dir =:= MyPlugin2EbinDir
+ end, NewCodePath1)),
+
+ true = code:del_path(MyPlugin1EbinDir),
+ true = code:del_path(MyPlugin2EbinDir);
+ _ ->
+ ok
+ end,
+
+ %% On Windows.
+ Win32PluginsPath = PluginsDir1 ++ ";" ++ PluginsDir2,
+ true = os:putenv("RABBITMQ_PLUGINS_DIR", Win32PluginsPath),
+ persistent_term:put({rabbit_env, os_type}, {win32, undefined}),
+ Win32Context = rabbit_env:get_context(),
+
+ persistent_term:erase({rabbit_env, os_type}),
+ os:unsetenv("RABBITMQ_PLUGINS_DIR"),
+
+ ?assertEqual(Win32PluginsPath, maps:get(plugins_path, Win32Context)),
+
+ OldCodePath2 = code:get_path(),
+ ?assertNot(lists:member(MyPlugin1EbinDir, OldCodePath2)),
+ ?assertNot(lists:member(MyPlugin2EbinDir, OldCodePath2)),
+
+ rabbit_env:context_to_code_path(Win32Context),
+
+ NewCodePath2 = code:get_path(),
+ ?assert(lists:member(MyPlugin1EbinDir, NewCodePath2)),
+ ?assert(lists:member(MyPlugin2EbinDir, NewCodePath2)),
+ ?assertEqual(
+ [MyPlugin1EbinDir, MyPlugin2EbinDir],
+ lists:filter(
+ fun(Dir) ->
+ Dir =:= MyPlugin1EbinDir orelse
+ Dir =:= MyPlugin2EbinDir
+ end, NewCodePath2)),
+
+ true = code:del_path(MyPlugin1EbinDir),
+ true = code:del_path(MyPlugin2EbinDir).
+
+check_RABBITMQ_ADVANCED_CONFIG_FILE(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_ADVANCED_CONFIG_FILE",
+ advanced_config_file,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_CONFIG_FILE(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_CONFIG_FILE",
+ main_config_file,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_CONFIG_FILES(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_CONFIG_FILES",
+ additional_config_files,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_DIST_PORT(_) ->
+ Value1 = random_int(),
+ Value2 = random_int(),
+ check_prefixed_variable("RABBITMQ_DIST_PORT",
+ erlang_dist_tcp_port,
+ 25672,
+ integer_to_list(Value1), Value1,
+ integer_to_list(Value2), Value2).
+
+check_RABBITMQ_ENABLED_PLUGINS(_) ->
+ Value1 = [random_atom(), random_atom()],
+ Value2 = [random_atom(), random_atom()],
+ check_prefixed_variable("RABBITMQ_ENABLED_PLUGINS",
+ enabled_plugins,
+ '_',
+ "", [],
+ "", []),
+ check_prefixed_variable("RABBITMQ_ENABLED_PLUGINS",
+ enabled_plugins,
+ '_',
+ rabbit_misc:format("~s,~s", Value1), Value1,
+ rabbit_misc:format("~s,~s", Value2), Value2).
+
+check_RABBITMQ_ENABLED_PLUGINS_FILE(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_ENABLED_PLUGINS_FILE",
+ enabled_plugins_file,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_FEATURE_FLAGS_FILE(_) ->
+ Value1 = random_string(),
+ check_variable("RABBITMQ_FEATURE_FLAGS_FILE",
+ feature_flags_file,
+ Value1, Value1).
+
+check_RABBITMQ_KEEP_PID_FILE_ON_EXIT(_) ->
+ Value1 = true,
+ Value2 = false,
+ check_prefixed_variable("RABBITMQ_KEEP_PID_FILE_ON_EXIT",
+ keep_pid_file_on_exit,
+ false,
+ atom_to_list(Value1), Value1,
+ atom_to_list(Value2), Value2).
+
+check_RABBITMQ_LOG(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_LOG",
+ log_levels,
+ '_',
+ "critical", #{global => critical},
+ "emergency", #{global => emergency}),
+ check_prefixed_variable("RABBITMQ_LOG",
+ log_levels,
+ '_',
+ Value1, #{Value1 => info},
+ Value2, #{Value2 => info}),
+ check_prefixed_variable("RABBITMQ_LOG",
+ log_levels,
+ '_',
+ Value1 ++ ",none", #{global => none,
+ Value1 => info},
+ Value2 ++ ",none", #{global => none,
+ Value2 => info}),
+ check_prefixed_variable("RABBITMQ_LOG",
+ log_levels,
+ '_',
+ Value1 ++ "=debug", #{Value1 => debug},
+ Value2 ++ "=info", #{Value2 => info}),
+ check_prefixed_variable("RABBITMQ_LOG",
+ log_levels,
+ '_',
+ Value1 ++ ",-color", #{Value1 => info,
+ color => false},
+ Value2 ++ ",+color", #{Value2 => info,
+ color => true}),
+ check_prefixed_variable("RABBITMQ_LOG",
+ log_levels,
+ '_',
+ Value1 ++ "=notice,-color", #{Value1 => notice,
+ color => false},
+ Value2 ++ "=warning,+color", #{Value2 => warning,
+ color => true}),
+ check_prefixed_variable("RABBITMQ_LOG",
+ log_levels,
+ '_',
+ Value1 ++ "=error," ++ Value2, #{Value1 => error,
+ Value2 => info},
+ Value2 ++ "=alert," ++ Value1, #{Value1 => info,
+ Value2 => alert}).
+
+check_RABBITMQ_LOG_BASE(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_LOG_BASE",
+ log_base_dir,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_LOGS(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_LOGS",
+ main_log_file,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_UPGRADE_LOG(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_UPGRADE_LOG",
+ upgrade_log_file,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_MNESIA_BASE(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_MNESIA_BASE",
+ mnesia_base_dir,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_MNESIA_DIR(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_MNESIA_DIR",
+ mnesia_dir,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_NODE_IP_ADDRESS(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_NODE_IP_ADDRESS",
+ amqp_ipaddr,
+ "auto",
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_NODE_PORT(_) ->
+ Value1 = random_int(),
+ Value2 = random_int(),
+ check_prefixed_variable("RABBITMQ_NODE_PORT",
+ amqp_tcp_port,
+ 5672,
+ integer_to_list(Value1), Value1,
+ integer_to_list(Value2), Value2).
+
+check_RABBITMQ_NODENAME(_) ->
+ DefaultNodename = get_default_nodename(),
+ {_, DefaultHostname} = rabbit_nodes_common:parts(DefaultNodename),
+
+ Value1 = random_atom(),
+ Value2 = random_atom(),
+ check_prefixed_variable("RABBITMQ_NODENAME",
+ nodename,
+ DefaultNodename,
+ atom_to_list(Value1),
+ list_to_atom(
+ atom_to_list(Value1) ++ "@" ++ DefaultHostname),
+ atom_to_list(Value2),
+ list_to_atom(
+ atom_to_list(Value2) ++ "@" ++ DefaultHostname)),
+
+ Value3 = list_to_atom(random_string() ++ "@" ++ random_string()),
+ Value4 = list_to_atom(random_string() ++ "@" ++ random_string()),
+ check_prefixed_variable("RABBITMQ_NODENAME",
+ nodename,
+ DefaultNodename,
+ atom_to_list(Value3), Value3,
+ atom_to_list(Value4), Value4).
+
+check_RABBITMQ_PID_FILE(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_PID_FILE",
+ pid_file,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_PLUGINS_DIR(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_PLUGINS_DIR",
+ plugins_path,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_PLUGINS_EXPAND_DIR(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_PLUGINS_EXPAND_DIR",
+ plugins_expand_dir,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_PRODUCT_NAME(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_PRODUCT_NAME",
+ product_name,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_PRODUCT_VERSION(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_PRODUCT_VERSION",
+ product_version,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_MOTD_FILE(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_MOTD_FILE",
+ motd_file,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_QUORUM_DIR(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_QUORUM_DIR",
+ quorum_queue_dir,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+
+check_RABBITMQ_STREAM_DIR(_) ->
+ Value1 = random_string(),
+ Value2 = random_string(),
+ check_prefixed_variable("RABBITMQ_STREAM_DIR",
+ stream_queue_dir,
+ '_',
+ Value1, Value1,
+ Value2, Value2).
+check_RABBITMQ_USE_LOGNAME(_) ->
+ check_prefixed_variable("RABBITMQ_USE_LONGNAME",
+ nodename_type,
+ shortnames,
+ "true", longnames,
+ "false", shortnames).
+
+check_value_is_yes(_) ->
+ ?assert(rabbit_env:value_is_yes("1")),
+ ?assert(rabbit_env:value_is_yes("yes")),
+ ?assert(rabbit_env:value_is_yes("true")),
+ ?assertNot(rabbit_env:value_is_yes("0")),
+ ?assertNot(rabbit_env:value_is_yes("no")),
+ ?assertNot(rabbit_env:value_is_yes("false")),
+ ?assertNot(rabbit_env:value_is_yes(random_string() ++ ".")).
+
+check_log_process_env(_) ->
+ ok = rabbit_env:log_process_env().
+
+check_log_context(_) ->
+ Context = rabbit_env:get_context(),
+ ok = rabbit_env:log_context(Context).
+
+check_get_used_env_vars(_) ->
+ os:putenv("RABBITMQ_LOGS", "-"),
+ os:putenv("CONFIG_FILE", "filename"),
+ Vars = rabbit_env:get_used_env_vars(),
+ ?assert(lists:keymember("RABBITMQ_LOGS", 1, Vars)),
+ ?assert(lists:keymember("CONFIG_FILE", 1, Vars)),
+ ?assertNot(lists:keymember("HOME", 1, Vars)),
+ ?assertNot(lists:keymember("PATH", 1, Vars)),
+ os:unsetenv("RABBITMQ_LOGS"),
+ os:unsetenv("CONFIG_FILE").
+
+check_variable(Variable, Key, ValueToSet, Comparison) ->
+ os:putenv(Variable, ValueToSet),
+ ?assertMatch(#{Key := Comparison}, rabbit_env:get_context()),
+
+ os:unsetenv(Variable),
+ Context = rabbit_env:get_context(),
+ ?assertNotMatch(#{Key := Comparison}, Context),
+ ?assertMatch(#{Key := _}, Context).
+
+check_prefixed_variable("RABBITMQ_" ++ Variable = PrefixedVariable,
+ Key,
+ DefaultValue,
+ Value1ToSet, Comparison1,
+ Value2ToSet, Comparison2) ->
+ os:putenv(Variable, Value1ToSet),
+ os:unsetenv(PrefixedVariable),
+ ?assertMatch(#{Key := Comparison1}, rabbit_env:get_context()),
+
+ os:putenv(PrefixedVariable, Value2ToSet),
+ ?assertMatch(#{Key := Comparison2}, rabbit_env:get_context()),
+
+ os:unsetenv(Variable),
+ os:unsetenv(PrefixedVariable),
+ Context = rabbit_env:get_context(),
+ case DefaultValue of
+ '_' ->
+ ?assertNotMatch(#{Key := Comparison1}, Context),
+ ?assertNotMatch(#{Key := Comparison2}, Context),
+ ?assertMatch(#{Key := _}, Context);
+ _ ->
+ ?assertMatch(#{Key := DefaultValue}, Context)
+ end.
+
+random_int() -> rand:uniform(50000).
+random_string() -> integer_to_list(random_int()).
+random_atom() -> list_to_atom(random_string()).
+
+get_default_nodename() ->
+ CTNode = node(),
+ NodeS = re:replace(
+ atom_to_list(CTNode),
+ "^[^@]+@(.*)$",
+ "rabbit@\\1",
+ [{return, list}]),
+ list_to_atom(NodeS).
+
+check_parse_conf_env_file_output(_) ->
+ ?assertEqual(
+ #{},
+ rabbit_env:parse_conf_env_file_output2(
+ [],
+ #{}
+ )),
+ ?assertEqual(
+ #{"UNQUOTED" => "a",
+ "SINGLE_QUOTED" => "b",
+ "DOUBLE_QUOTED" => "c",
+ "SINGLE_DOLLAR" => "d"},
+ rabbit_env:parse_conf_env_file_output2(
+ ["UNQUOTED=a",
+ "SINGLE_QUOTED='b'",
+ "DOUBLE_QUOTED=\"c\"",
+ "SINGLE_DOLLAR=$'d'"],
+ #{}
+ )),
+ ?assertEqual(
+ #{"A" => "a",
+ "B" => "b",
+ "MULTI_LINE" => "\n'foobar'"},
+ rabbit_env:parse_conf_env_file_output2(
+ ["A=a",
+ "MULTI_LINE='",
+ "'\"'\"'foobar'\"'\"",
+ "B=b"],
+ #{}
+ )).
diff --git a/deps/rabbit_common/test/supervisor2_SUITE.erl b/deps/rabbit_common/test/supervisor2_SUITE.erl
new file mode 100644
index 0000000000..7b89363999
--- /dev/null
+++ b/deps/rabbit_common/test/supervisor2_SUITE.erl
@@ -0,0 +1,128 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(supervisor2_SUITE).
+
+-behaviour(supervisor2).
+
+-include_lib("common_test/include/ct.hrl").
+
+-compile(export_all).
+
+all() -> [intrinsic, delayed_restart].
+
+intrinsic(_Config) ->
+ false = process_flag(trap_exit, true),
+ Intensity = 5,
+ Args = {one_for_one, intrinsic, Intensity},
+ {passed, SupPid} = with_sup(Args, fun test_supervisor_intrinsic/1),
+ receive
+ {'EXIT', SupPid, shutdown} -> ok
+ end,
+ false = is_process_alive(SupPid).
+
+delayed_restart(_Config) ->
+ DelayInSeconds = 1,
+ Intensity = 1,
+ Args0 = {simple_one_for_one, {permanent, DelayInSeconds}, Intensity},
+ F = fun(SupPid) ->
+ {ok, _ChildPid} =
+ supervisor2:start_child(SupPid, []),
+ test_supervisor_delayed_restart(SupPid)
+ end,
+ {passed, _} = with_sup(Args0, F),
+
+ Args1 = {one_for_one, {permanent, DelayInSeconds}, Intensity},
+ {passed, _} = with_sup(Args1, fun test_supervisor_delayed_restart/1).
+
+test_supervisor_intrinsic(SupPid) ->
+ ok = ping_child(SupPid),
+
+ ok = exit_child(SupPid, abnormal),
+ ok = timer:sleep(100),
+ ok = ping_child(SupPid),
+
+ ok = exit_child(SupPid, {shutdown, restart}),
+ ok = timer:sleep(100),
+ ok = ping_child(SupPid),
+
+ ok = exit_child(SupPid, shutdown),
+ ok = timer:sleep(100),
+ passed.
+
+test_supervisor_delayed_restart(SupPid) ->
+ ok = ping_child(SupPid),
+
+ ok = exit_child(SupPid, abnormal),
+ ok = timer:sleep(100),
+ ok = ping_child(SupPid),
+
+ ok = exit_child(SupPid, abnormal),
+ ok = timer:sleep(100),
+ timeout = ping_child(SupPid),
+
+ ok = timer:sleep(1010),
+ ok = ping_child(SupPid),
+ passed.
+
+with_sup({RestartStrategy, Restart, Intensity}, Fun) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, [RestartStrategy, Restart, Intensity]),
+ Res = Fun(SupPid),
+ true = unlink(SupPid),
+ {Res, SupPid}.
+
+init([RestartStrategy, Restart, Intensity]) ->
+ SupFlags = #{
+ strategy => RestartStrategy,
+ intensity => Intensity,
+ period => 1
+ },
+ ChildSpec = #{
+ id => test,
+ start => {?MODULE, start_child, []},
+ restart => Restart,
+ shutdown => 16#ffffffff,
+ type => worker,
+ modules => [?MODULE]
+ },
+ {ok, {SupFlags, [ChildSpec]}}.
+
+start_child() ->
+ {ok, proc_lib:spawn_link(fun run_child/0)}.
+
+ping_child(SupPid) ->
+ Ref = make_ref(),
+ F = fun(ChildPid) ->
+ ChildPid ! {ping, Ref, self()}
+ end,
+ with_child_pid(SupPid, F),
+ receive
+ {pong, Ref} -> ok
+ after 1000 -> timeout
+ end.
+
+exit_child(SupPid, ExitType) ->
+ F = fun(ChildPid) ->
+ exit(ChildPid, ExitType)
+ end,
+ with_child_pid(SupPid, F),
+ ok.
+
+with_child_pid(SupPid, Fun) ->
+ case supervisor2:which_children(SupPid) of
+ [{_Id, undefined, worker, [?MODULE]}] -> ok;
+ [{_Id, restarting, worker, [?MODULE]}] -> ok;
+ [{_Id, ChildPid, worker, [?MODULE]}] -> Fun(ChildPid);
+ [] -> ok
+ end.
+
+run_child() ->
+ receive
+ {ping, Ref, Pid} ->
+ Pid ! {pong, Ref},
+ run_child()
+ end.
diff --git a/deps/rabbit_common/test/unit_SUITE.erl b/deps/rabbit_common/test/unit_SUITE.erl
new file mode 100644
index 0000000000..925155211f
--- /dev/null
+++ b/deps/rabbit_common/test/unit_SUITE.erl
@@ -0,0 +1,446 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2016-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(unit_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-include("rabbit_memory.hrl").
+-include("rabbit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, parallel_tests},
+ {group, parse_mem_limit},
+ {group, gen_server2}
+ ].
+
+groups() ->
+ [
+ {parallel_tests, [parallel], [
+ data_coercion_to_proplist,
+ data_coercion_to_list,
+ data_coercion_to_map,
+ pget,
+ encrypt_decrypt,
+ encrypt_decrypt_term,
+ version_equivalence,
+ pid_decompose_compose,
+ platform_and_version,
+ frame_encoding_does_not_fail_with_empty_binary_payload,
+ amqp_table_conversion,
+ name_type,
+ get_erl_path
+ ]},
+ {parse_mem_limit, [parallel], [
+ parse_mem_limit_relative_exactly_max,
+ parse_mem_relative_above_max,
+ parse_mem_relative_integer,
+ parse_mem_relative_invalid
+ ]},
+ {gen_server2, [parallel], [
+ stats_timer_is_working,
+ stats_timer_writes_gen_server2_metrics_if_core_metrics_ets_exists,
+ stop_stats_timer_on_hibernation,
+ stop_stats_timer_on_backoff,
+ stop_stats_timer_on_backoff_when_backoff_less_than_stats_timeout,
+ gen_server2_stop
+ ]}
+ ].
+
+init_per_group(_, Config) -> Config.
+end_per_group(_, Config) -> Config.
+
+init_per_testcase(_, Config) -> Config.
+
+end_per_testcase(stats_timer_is_working, Config) ->
+ reset_stats_interval(),
+ Config;
+end_per_testcase(stop_stats_timer_on_hibernation, Config) ->
+ reset_stats_interval(),
+ Config;
+end_per_testcase(stop_stats_timer_on_backoff, Config) ->
+ reset_stats_interval(),
+ Config;
+end_per_testcase(stop_stats_timer_on_backoff_when_backoff_less_than_stats_timeout, Config) ->
+ reset_stats_interval(),
+ Config;
+end_per_testcase(stats_timer_writes_gen_server2_metrics_if_core_metrics_ets_exists, Config) ->
+ rabbit_core_metrics:terminate(),
+ reset_stats_interval(),
+ Config;
+end_per_testcase(_, Config) -> Config.
+
+stats_timer_is_working(_) ->
+ StatsInterval = 300,
+ set_stats_interval(StatsInterval),
+
+ {ok, TestServer} = gen_server2_test_server:start_link(count_stats),
+ %% Start the emission
+ % TestServer ! emit_gen_server2_stats,
+
+ timer:sleep(StatsInterval * 4 + 100),
+ StatsCount = gen_server2_test_server:stats_count(TestServer),
+ ?assertEqual(4, StatsCount).
+
+stats_timer_writes_gen_server2_metrics_if_core_metrics_ets_exists(_) ->
+ rabbit_core_metrics:init(),
+
+ StatsInterval = 300,
+ set_stats_interval(StatsInterval),
+
+ {ok, TestServer} = gen_server2_test_server:start_link(),
+ timer:sleep(StatsInterval * 4),
+
+ %% No messages in the buffer
+ ?assertEqual(0, rabbit_core_metrics:get_gen_server2_stats(TestServer)),
+
+ %% Sleep to accumulate messages
+ gen_server2:cast(TestServer, {sleep, StatsInterval + 100}),
+
+ %% Sleep to get results
+ gen_server2:cast(TestServer, {sleep, 1000}),
+ gen_server2:cast(TestServer, ignore),
+ gen_server2:cast(TestServer, ignore),
+ gen_server2:cast(TestServer, ignore),
+
+ timer:sleep(StatsInterval + 150),
+ ?assertEqual(4, rabbit_core_metrics:get_gen_server2_stats(TestServer)).
+
+stop_stats_timer_on_hibernation(_) ->
+ StatsInterval = 300,
+ set_stats_interval(StatsInterval),
+
+ %% No backoff configured
+ {ok, TestServer} = gen_server2_test_server:start_link(count_stats),
+
+ ?assertEqual(ok, gen_server2:call(TestServer, hibernate)),
+
+ timer:sleep(50),
+
+ ?assertEqual({current_function,{erlang, hibernate, 3}},
+ erlang:process_info(TestServer, current_function)),
+
+ timer:sleep(StatsInterval * 6 + 100),
+ StatsCount1 = gen_server2_test_server:stats_count(TestServer),
+ %% The timer was stopped. No stats collected
+ %% The count is 1 because hibernation emits stats
+ ?assertEqual(1, StatsCount1),
+
+ %% A message will wake up the process
+ gen_server2:call(TestServer, wake_up),
+ gen_server2:call(TestServer, wake_up),
+ gen_server2:call(TestServer, wake_up),
+
+ timer:sleep(StatsInterval * 4 + 100),
+ StatsCount5 = gen_server2_test_server:stats_count(TestServer),
+ ?assertEqual(5, StatsCount5),
+ ?assertEqual(ok, gen_server2:call(TestServer, hibernate)),
+
+ timer:sleep(50),
+
+ {current_function,{erlang,hibernate,3}} =
+ erlang:process_info(TestServer, current_function),
+
+ timer:sleep(StatsInterval * 4 + 100),
+ StatsCount6 = gen_server2_test_server:stats_count(TestServer),
+ %% The timer was stopped. No stats collected
+ %% The count is 1 because hibernation emits stats
+ 6 = StatsCount6.
+
+stop_stats_timer_on_backoff(_) ->
+ StatsInterval = 300,
+ set_stats_interval(StatsInterval),
+
+ Backoff = 1000,
+ {ok, TestServer} =
+ gen_server2_test_server:start_link(
+ count_stats,
+ {backoff, Backoff, Backoff, 10000}),
+
+ ok = gen_server2:call(TestServer, hibernate),
+
+ {current_function,{gen_server2,process_next_msg,1}} =
+ erlang:process_info(TestServer, current_function),
+
+ %% Receiving messages during backoff period does not emit stats
+ timer:sleep(Backoff div 2),
+ ok = gen_server2:call(TestServer, hibernate),
+
+ timer:sleep(Backoff div 2 + 50),
+ ?assertEqual({current_function,{gen_server2,process_next_msg,1}},
+ erlang:process_info(TestServer, current_function)),
+
+ %% Hibernate after backoff time after last message
+ timer:sleep(Backoff div 2),
+ ?assertEqual({current_function,{erlang,hibernate,3}},
+ erlang:process_info(TestServer, current_function)),
+
+ timer:sleep(StatsInterval * 4 + 100),
+ StatsCount = gen_server2_test_server:stats_count(TestServer),
+ %% The timer was stopped. No stats collected
+ %% The count is 1 because hibernation emits stats
+ ?assertEqual(1, StatsCount),
+
+ %% A message will wake up the process
+ gen_server2:call(TestServer, wake_up),
+
+ timer:sleep(StatsInterval * 4 + 100),
+ StatsCount5 = gen_server2_test_server:stats_count(TestServer),
+ ?assertEqual(5, StatsCount5).
+
+stop_stats_timer_on_backoff_when_backoff_less_than_stats_timeout(_) ->
+ StatsInterval = 300,
+ set_stats_interval(StatsInterval),
+
+ Backoff = 200,
+ {ok, TestServer} =
+ gen_server2_test_server:start_link(
+ count_stats,
+ {backoff, Backoff, Backoff, 10000}),
+
+ ?assertEqual(ok, gen_server2:call(TestServer, hibernate)),
+
+ ?assertEqual({current_function, {gen_server2, process_next_msg, 1}},
+ erlang:process_info(TestServer, current_function)),
+
+ timer:sleep(Backoff + 50),
+
+ ?assertEqual({current_function, {erlang, hibernate, 3}},
+ erlang:process_info(TestServer, current_function)),
+
+ timer:sleep(StatsInterval * 4 + 100),
+ StatsCount = gen_server2_test_server:stats_count(TestServer),
+ %% The timer was stopped. No stats collected
+ %% The count is 1 because hibernation emits stats
+ ?assertEqual(1, StatsCount),
+
+ %% A message will wake up the process
+ gen_server2:call(TestServer, wake_up),
+
+ timer:sleep(StatsInterval * 4 + 100),
+ StatsCount5 = gen_server2_test_server:stats_count(TestServer),
+ ?assertEqual(5, StatsCount5).
+
+gen_server2_stop(_) ->
+ {ok, TestServer} = gen_server2_test_server:start_link(),
+ ?assertEqual(ok, gen_server2:stop(TestServer)),
+ ?assertEqual(false, erlang:is_process_alive(TestServer)),
+ ?assertEqual({'EXIT', noproc}, (catch gen_server:stop(TestServer))),
+ ok.
+
+parse_mem_limit_relative_exactly_max(_Config) ->
+ MemLimit = vm_memory_monitor:parse_mem_limit(1.0),
+ case MemLimit of
+ ?MAX_VM_MEMORY_HIGH_WATERMARK -> ok;
+ _ -> ct:fail(
+ "Expected memory limit to be ~p, but it was ~p",
+ [?MAX_VM_MEMORY_HIGH_WATERMARK, MemLimit]
+ )
+ end.
+
+parse_mem_relative_above_max(_Config) ->
+ MemLimit = vm_memory_monitor:parse_mem_limit(1.01),
+ case MemLimit of
+ ?MAX_VM_MEMORY_HIGH_WATERMARK -> ok;
+ _ -> ct:fail(
+ "Expected memory limit to be ~p, but it was ~p",
+ [?MAX_VM_MEMORY_HIGH_WATERMARK, MemLimit]
+ )
+ end.
+
+parse_mem_relative_integer(_Config) ->
+ MemLimit = vm_memory_monitor:parse_mem_limit(1),
+ case MemLimit of
+ ?MAX_VM_MEMORY_HIGH_WATERMARK -> ok;
+ _ -> ct:fail(
+ "Expected memory limit to be ~p, but it was ~p",
+ [?MAX_VM_MEMORY_HIGH_WATERMARK, MemLimit]
+ )
+ end.
+
+parse_mem_relative_invalid(_Config) ->
+ MemLimit = vm_memory_monitor:parse_mem_limit([255]),
+ case MemLimit of
+ ?DEFAULT_VM_MEMORY_HIGH_WATERMARK -> ok;
+ _ -> ct:fail(
+ "Expected memory limit to be ~p, but it was ~p",
+ [?DEFAULT_VM_MEMORY_HIGH_WATERMARK, MemLimit]
+ )
+ end.
+
+platform_and_version(_Config) ->
+ MajorVersion = erlang:system_info(otp_release),
+ Result = rabbit_misc:platform_and_version(),
+ RegExp = "^Erlang/OTP\s" ++ MajorVersion,
+ case re:run(Result, RegExp) of
+ nomatch -> ct:fail("~p does not match ~p", [Result, RegExp]);
+ {error, ErrType} -> ct:fail("~p", [ErrType]);
+ _ -> ok
+ end.
+
+data_coercion_to_map(_Config) ->
+ ?assertEqual(#{a => 1}, rabbit_data_coercion:to_map([{a, 1}])),
+ ?assertEqual(#{a => 1}, rabbit_data_coercion:to_map(#{a => 1})).
+
+data_coercion_to_proplist(_Config) ->
+ ?assertEqual([{a, 1}], rabbit_data_coercion:to_proplist([{a, 1}])),
+ ?assertEqual([{a, 1}], rabbit_data_coercion:to_proplist(#{a => 1})).
+
+data_coercion_to_list(_Config) ->
+ ?assertEqual([{a, 1}], rabbit_data_coercion:to_list([{a, 1}])),
+ ?assertEqual([{a, 1}], rabbit_data_coercion:to_list(#{a => 1})).
+
+pget(_Config) ->
+ ?assertEqual(1, rabbit_misc:pget(a, [{a, 1}])),
+ ?assertEqual(undefined, rabbit_misc:pget(b, [{a, 1}])),
+
+ ?assertEqual(1, rabbit_misc:pget(a, #{a => 1})),
+ ?assertEqual(undefined, rabbit_misc:pget(b, #{a => 1})).
+
+pid_decompose_compose(_Config) ->
+ Pid = self(),
+ {Node, Cre, Id, Ser} = rabbit_misc:decompose_pid(Pid),
+ Node = node(Pid),
+ Pid = rabbit_misc:compose_pid(Node, Cre, Id, Ser),
+ OtherNode = 'some_node@localhost',
+ PidOnOtherNode = rabbit_misc:pid_change_node(Pid, OtherNode),
+ {OtherNode, Cre, Id, Ser} = rabbit_misc:decompose_pid(PidOnOtherNode).
+
+encrypt_decrypt(_Config) ->
+ %% Take all available block ciphers.
+ Hashes = rabbit_pbe:supported_hashes(),
+ Ciphers = rabbit_pbe:supported_ciphers(),
+ %% For each cipher, try to encrypt and decrypt data sizes from 0 to 64 bytes
+ %% with a random passphrase.
+ _ = [begin
+ PassPhrase = crypto:strong_rand_bytes(16),
+ Iterations = rand:uniform(100),
+ Data = crypto:strong_rand_bytes(64),
+ [begin
+ Expected = binary:part(Data, 0, Len),
+ Enc = rabbit_pbe:encrypt(C, H, Iterations, PassPhrase, Expected),
+ Expected = iolist_to_binary(rabbit_pbe:decrypt(C, H, Iterations, PassPhrase, Enc))
+ end || Len <- lists:seq(0, byte_size(Data))]
+ end || H <- Hashes, C <- Ciphers],
+ ok.
+
+encrypt_decrypt_term(_Config) ->
+ %% Take all available block ciphers.
+ Hashes = rabbit_pbe:supported_hashes(),
+ Ciphers = rabbit_pbe:supported_ciphers(),
+ %% Different Erlang terms to try encrypting.
+ DataSet = [
+ 10000,
+ [5672],
+ [{"127.0.0.1", 5672},
+ {"::1", 5672}],
+ [{connection, info}, {channel, info}],
+ [{cacertfile, "/path/to/testca/cacert.pem"},
+ {certfile, "/path/to/server/cert.pem"},
+ {keyfile, "/path/to/server/key.pem"},
+ {verify, verify_peer},
+ {fail_if_no_peer_cert, false}],
+ [<<".*">>, <<".*">>, <<".*">>]
+ ],
+ _ = [begin
+ PassPhrase = crypto:strong_rand_bytes(16),
+ Iterations = rand:uniform(100),
+ Enc = rabbit_pbe:encrypt_term(C, H, Iterations, PassPhrase, Data),
+ Data = rabbit_pbe:decrypt_term(C, H, Iterations, PassPhrase, Enc)
+ end || H <- Hashes, C <- Ciphers, Data <- DataSet],
+ ok.
+
+version_equivalence(_Config) ->
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0"),
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.1"),
+ true = rabbit_misc:version_minor_equivalent("%%VSN%%", "%%VSN%%"),
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0"),
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0.1"),
+ true = rabbit_misc:version_minor_equivalent("3.0.0.1", "3.0.0.3"),
+ true = rabbit_misc:version_minor_equivalent("3.0.0.1", "3.0.1.3"),
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.1.0"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0.1", "3.1.0.1"),
+
+ false = rabbit_misc:version_minor_equivalent("3.5.7", "3.6.7"),
+ false = rabbit_misc:version_minor_equivalent("3.6.5", "3.6.6"),
+ false = rabbit_misc:version_minor_equivalent("3.6.6", "3.7.0"),
+ true = rabbit_misc:version_minor_equivalent("3.6.7", "3.6.6"),
+
+ %% Starting with RabbitMQ 3.7.x and feature flags introduced in
+ %% RabbitMQ 3.8.x, versions are considered equivalent and the actual
+ %% check is deferred to the feature flags module.
+ false = rabbit_misc:version_minor_equivalent("3.6.0", "3.8.0"),
+ true = rabbit_misc:version_minor_equivalent("3.7.0", "3.8.0"),
+ true = rabbit_misc:version_minor_equivalent("3.7.0", "3.10.0"),
+
+ true = rabbit_misc:version_minor_equivalent(<<"3.0.0">>, <<"3.0.0">>),
+ true = rabbit_misc:version_minor_equivalent(<<"3.0.0">>, <<"3.0.1">>),
+ true = rabbit_misc:version_minor_equivalent(<<"%%VSN%%">>, <<"%%VSN%%">>),
+ true = rabbit_misc:version_minor_equivalent(<<"3.0.0">>, <<"3.0">>),
+ true = rabbit_misc:version_minor_equivalent(<<"3.0.0">>, <<"3.0.0.1">>),
+ false = rabbit_misc:version_minor_equivalent(<<"3.0.0">>, <<"3.1.0">>),
+ false = rabbit_misc:version_minor_equivalent(<<"3.0.0.1">>, <<"3.1.0.1">>).
+
+frame_encoding_does_not_fail_with_empty_binary_payload(_Config) ->
+ [begin
+ Content = #content{
+ class_id = 60, properties = none, properties_bin = <<0,0>>, protocol = rabbit_framing_amqp_0_9_1,
+ payload_fragments_rev = P
+ },
+ ExpectedFrames = rabbit_binary_generator:build_simple_content_frames(1, Content, 0, rabbit_framing_amqp_0_9_1)
+ end || {P, ExpectedFrames} <- [
+ {[], [[<<2,0,1,0,0,0,14>>,[<<0,60,0,0,0,0,0,0,0,0,0,0>>,<<0,0>>],206]]},
+ {[<<>>], [[<<2,0,1,0,0,0,14>>,[<<0,60,0,0,0,0,0,0,0,0,0,0>>,<<0,0>>],206]]},
+ {[<<"payload">>], [[<<2,0,1,0,0,0,14>>,[<<0,60,0,0,0,0,0,0,0,0,0,7>>,<<0,0>>],206],
+ [<<3,0,1,0,0,0,7>>,[<<"payload">>],206]]}
+ ]],
+ ok.
+
+amqp_table_conversion(_Config) ->
+ assert_table(#{}, []),
+ assert_table(#{<<"x-expires">> => 1000},
+ [{<<"x-expires">>, long, 1000}]),
+ assert_table(#{<<"x-forwarding">> =>
+ [#{<<"uri">> => <<"amqp://localhost/%2F/upstream">>}]},
+ [{<<"x-forwarding">>, array,
+ [{table, [{<<"uri">>, longstr,
+ <<"amqp://localhost/%2F/upstream">>}]}]}]).
+
+assert_table(JSON, AMQP) ->
+ ?assertEqual(JSON, rabbit_misc:amqp_table(AMQP)),
+ ?assertEqual(AMQP, rabbit_misc:to_amqp_table(JSON)).
+
+
+set_stats_interval(Interval) ->
+ application:set_env(rabbit, collect_statistics, coarse),
+ application:set_env(rabbit, collect_statistics_interval, Interval).
+
+reset_stats_interval() ->
+ application:unset_env(rabbit, collect_statistics),
+ application:unset_env(rabbit, collect_statistics_interval).
+
+name_type(_) ->
+ ?assertEqual(shortnames, rabbit_nodes_common:name_type(rabbit)),
+ ?assertEqual(shortnames, rabbit_nodes_common:name_type(rabbit@localhost)),
+ ?assertEqual(longnames, rabbit_nodes_common:name_type('rabbit@localhost.example.com')),
+ ok.
+
+get_erl_path(_) ->
+ Exe = rabbit_runtime:get_erl_path(),
+ case os:type() of
+ {win32, _} ->
+ ?assertNotMatch(nomatch, string:find(Exe, "erl.exe"));
+ _ ->
+ ?assertNotMatch(nomatch, string:find(Exe, "erl"))
+ end,
+ ok.
diff --git a/deps/rabbit_common/test/unit_priority_queue_SUITE.erl b/deps/rabbit_common/test/unit_priority_queue_SUITE.erl
new file mode 100644
index 0000000000..8d58c72f10
--- /dev/null
+++ b/deps/rabbit_common/test/unit_priority_queue_SUITE.erl
@@ -0,0 +1,35 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(unit_priority_queue_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ member,
+ member_priority_queue
+ ].
+
+member(_Config) ->
+ Q = lists:foldl(fun(V, Acc) -> priority_queue:in(V, Acc) end, priority_queue:new(), lists:seq(1, 10)),
+ ?assert(priority_queue:member(1, Q)),
+ ?assert(priority_queue:member(2, Q)),
+ ?assertNot(priority_queue:member(100, Q)),
+ ?assertNot(priority_queue:member(1, priority_queue:new())),
+ ok.
+
+member_priority_queue(_Config) ->
+ Q = lists:foldl(fun(V, Acc) -> priority_queue:in(V, V rem 4, Acc) end, priority_queue:new(),
+ lists:seq(1, 100)),
+ ?assert(priority_queue:member(1, Q)),
+ ?assert(priority_queue:member(50, Q)),
+ ?assertNot(priority_queue:member(200, Q)),
+ ok.
diff --git a/deps/rabbit_common/test/worker_pool_SUITE.erl b/deps/rabbit_common/test/worker_pool_SUITE.erl
new file mode 100644
index 0000000000..a50104f6c7
--- /dev/null
+++ b/deps/rabbit_common/test/worker_pool_SUITE.erl
@@ -0,0 +1,220 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(worker_pool_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-define(POOL_SIZE, 1).
+-define(POOL_NAME, test_pool).
+
+all() ->
+ [
+ run_code_synchronously,
+ run_code_asynchronously,
+ set_timeout,
+ cancel_timeout,
+ cancel_timeout_by_setting,
+ dispatch_async_blocks_until_task_begins
+ ].
+
+init_per_testcase(_, Config) ->
+ {ok, Pool} = worker_pool_sup:start_link(?POOL_SIZE, ?POOL_NAME),
+ [{pool_sup, Pool} | Config].
+
+end_per_testcase(_, Config) ->
+ Pool = ?config(pool_sup, Config),
+ unlink(Pool),
+ exit(Pool, kill).
+
+run_code_synchronously(_) ->
+ Self = self(),
+ Test = make_ref(),
+ Sleep = 200,
+ {Time, Result} = timer:tc(fun() ->
+ worker_pool:submit(?POOL_NAME,
+ fun() ->
+ timer:sleep(Sleep),
+ Self ! {hi, Test},
+ self()
+ end,
+ reuse)
+ end),
+ % Worker run synchronously
+ true = Time > Sleep,
+ % Worker have sent message
+ receive {hi, Test} -> ok
+ after 0 -> error(no_message_from_worker)
+ end,
+ % Worker is a separate process
+ true = (Self /= Result).
+
+run_code_asynchronously(_) ->
+ Self = self(),
+ Test = make_ref(),
+ Sleep = 200,
+ {Time, Result} = timer:tc(fun() ->
+ worker_pool:submit_async(?POOL_NAME,
+ fun() ->
+ timer:sleep(Sleep),
+ Self ! {hi, Test},
+ self()
+ end)
+ end),
+ % Worker run synchronously
+ true = Time < Sleep,
+ % Worker have sent message
+ receive {hi, Test} -> ok
+ after Sleep + 100 -> error(no_message_from_worker)
+ end,
+ % Worker is a separate process
+ true = (Self /= Result).
+
+set_timeout(_) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ timer:sleep(1000),
+
+ receive {hello, Worker, Test} -> ok
+ after 1000 -> exit(timeout_is_late)
+ end.
+
+
+cancel_timeout(_) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ worker_pool_worker:next_job_from(Worker, Self),
+ Worker = worker_pool_worker:submit(Worker,
+ fun() ->
+ worker_pool_worker:clear_timeout(my_timeout),
+ Worker
+ end,
+ reuse),
+
+ timer:sleep(1000),
+ receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled)
+ after 0 -> ok
+ end.
+
+cancel_timeout_by_setting(_) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ worker_pool_worker:next_job_from(Worker, Self),
+ Worker = worker_pool_worker:submit(Worker,
+ fun() ->
+ worker_pool_worker:set_timeout(my_timeout, 1000,
+ fun() ->
+ Self ! {hello_reset, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ timer:sleep(1000),
+ receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled)
+ after 0 -> ok
+ end,
+
+ receive {hello_reset, Worker, Test} -> ok
+ after 1000 -> exit(timeout_is_late)
+ end.
+
+dispatch_async_blocks_until_task_begins(_) ->
+ Self = self(),
+
+ Waiter = fun() ->
+ Self ! {register, self()},
+ receive
+ go -> ok
+ end
+ end,
+
+ ok = worker_pool:dispatch_sync(?POOL_NAME, Waiter),
+ SomeWorker = receive
+ {register, WPid} -> WPid
+ after 250 ->
+ none
+ end,
+ ?assert(is_process_alive(SomeWorker), "Dispatched tasks should be running"),
+ spawn(fun() ->
+ ok = worker_pool:dispatch_sync(?POOL_NAME,
+ Waiter),
+ Self ! done_waiting,
+ exit(normal)
+ end),
+ DidWait = receive
+ done_waiting ->
+ false
+ after 250 ->
+ true
+ end,
+ ?assert(DidWait, "dispatch_sync should block until there is a free worker"),
+ SomeWorker ! go,
+ DidFinish = receive
+ done_waiting ->
+ true
+ after 250 ->
+ false
+ end,
+ ?assert(DidFinish, "appearance of a free worker should unblock the dispatcher").