summaryrefslogtreecommitdiff
path: root/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl')
-rw-r--r--deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl850
1 files changed, 850 insertions, 0 deletions
diff --git a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl
new file mode 100644
index 0000000000..8b4bd91d09
--- /dev/null
+++ b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl
@@ -0,0 +1,850 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(per_user_connection_channel_tracking_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, cluster_size_1_network},
+ {group, cluster_size_2_network},
+ {group, cluster_size_1_direct},
+ {group, cluster_size_2_direct}
+ ].
+
+groups() ->
+ ClusterSize1Tests = [
+ single_node_user_connection_channel_tracking,
+ single_node_user_deletion,
+ single_node_vhost_down_mimic,
+ single_node_vhost_deletion
+ ],
+ ClusterSize2Tests = [
+ cluster_user_deletion,
+ cluster_vhost_down_mimic,
+ cluster_vhost_deletion,
+ cluster_node_removed
+ ],
+ [
+ {cluster_size_1_network, [], ClusterSize1Tests},
+ {cluster_size_2_network, [], ClusterSize2Tests},
+ {cluster_size_1_direct, [], ClusterSize1Tests},
+ {cluster_size_2_direct, [], ClusterSize2Tests}
+ ].
+
+suite() ->
+ [
+ %% If a test hangs, no need to wait for 30 minutes.
+ {timetrap, {minutes, 8}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(cluster_size_1_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_1_network, Config1, 1);
+init_per_group(cluster_size_2_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_2_network, Config1, 2);
+init_per_group(cluster_size_1_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_1_direct, Config1, 1);
+init_per_group(cluster_size_2_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_2_direct, Config1, 2).
+
+init_per_multinode_group(Group, Config, NodeCount) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, NodeCount},
+ {rmq_nodename_suffix, Suffix}
+ ]),
+ Config2 = rabbit_ct_helpers:run_steps(
+ Config1, rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()),
+ EnableFF = rabbit_ct_broker_helpers:enable_feature_flag(
+ Config2, user_limits),
+ case EnableFF of
+ ok ->
+ Config2;
+ Skip ->
+ end_per_group(Group, Config2),
+ Skip
+ end.
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ clear_all_connection_tracking_tables(Config),
+ clear_all_channel_tracking_tables(Config),
+ Config.
+
+end_per_testcase(Testcase, Config) ->
+ clear_all_connection_tracking_tables(Config),
+ clear_all_channel_tracking_tables(Config),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+clear_all_connection_tracking_tables(Config) ->
+ [rabbit_ct_broker_helpers:rpc(Config,
+ N,
+ rabbit_connection_tracking,
+ clear_tracking_tables,
+ []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
+
+clear_all_channel_tracking_tables(Config) ->
+ [rabbit_ct_broker_helpers:rpc(Config,
+ N,
+ rabbit_channel_tracking,
+ clear_tracking_tables,
+ []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
+
+%% -------------------------------------------------------------------
+%% Test cases.
+%% -------------------------------------------------------------------
+single_node_user_connection_channel_tracking(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ [Chan1] = open_channels(Conn1, 1),
+ [#tracked_connection{username = Username}] = connections_in(Config, Username),
+ [#tracked_channel{username = Username}] = channels_in(Config, Username),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ ?assertEqual(true, is_process_alive(Chan1)),
+ close_channels([Chan1]),
+ ?awaitMatch(0, count_channels_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
+ ?awaitMatch(false, is_process_alive(Chan1), 20000),
+ close_connections([Conn1]),
+ ?awaitMatch(0, length(connections_in(Config, Username)), 20000),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000),
+ ?awaitMatch(false, is_process_alive(Conn1), 20000),
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ timer:sleep(100),
+ [#tracked_connection{username = Username2}] = connections_in(Config, Username2),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ [Conn3] = open_connections(Config, [0]),
+ Chans3 = [_|_] = open_channels(Conn3, 5),
+ [#tracked_connection{username = Username}] = connections_in(Config, Username),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn3)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans3],
+
+ [Conn4] = open_connections(Config, [0]),
+ Chans4 = [_|_] = open_channels(Conn4, 5),
+ ?assertEqual(2, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(10, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn4)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans4],
+ kill_connections([Conn4]),
+ [#tracked_connection{username = Username}] = connections_in(Config, Username),
+ ?awaitMatch(5, count_channels_in(Config, Username), 20000),
+ ?awaitMatch(1, tracked_user_connection_count(Config, Username), 20000),
+ ?awaitMatch(5, tracked_user_channel_count(Config, Username), 20000),
+ ?assertEqual(false, is_process_alive(Conn4)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans4],
+
+ [Conn5] = open_connections(Config, [0]),
+ Chans5 = [_|_] = open_channels(Conn5, 7),
+ [Username, Username] =
+ lists:map(fun (#tracked_connection{username = U}) -> U end,
+ connections_in(Config, Username)),
+ ?assertEqual(12, count_channels_in(Config, Username)),
+ ?assertEqual(12, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(2, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn5)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans5],
+
+ close_channels(Chans2 ++ Chans3 ++ Chans5),
+ ?awaitMatch(0, length(all_channels(Config)), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username2), 20000),
+
+ close_connections([Conn2, Conn3, Conn5]),
+ rabbit_ct_broker_helpers:delete_user(Config, Username2),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username2), 20000),
+ ?awaitMatch(0, length(all_connections(Config)), 20000).
+
+single_node_user_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 100),
+
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, Username2)),
+ ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, Username2)),
+
+ rabbit_ct_broker_helpers:delete_user(Config, Username2),
+ timer:sleep(100),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% ensure vhost entry is cleared after 'tracking_execution_timeout'
+ ?awaitMatch(false, exists_in_tracked_connection_per_user_table(Config, Username2), 20000),
+ ?awaitMatch(false, exists_in_tracked_channel_per_user_table(Config, Username2), 20000),
+
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ close_channels(Chans1),
+ ?awaitMatch(0, count_channels_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
+
+ close_connections([Conn1]),
+ ?awaitMatch(0, count_connections_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000).
+
+single_node_vhost_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 100),
+
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, Vhost)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, Vhost),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1],
+
+ %% ensure vhost entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, Vhost)),
+
+ rabbit_ct_broker_helpers:add_vhost(Config, Vhost).
+
+single_node_vhost_down_mimic(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% mimic vhost down event, while connections exist
+ mimic_vhost_down(Config, 0, Vhost),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1].
+
+cluster_user_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 0, 100),
+ set_tracking_execution_timeout(Config, 1, 100),
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 0)),
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 1)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, 1, Username2)),
+ ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),
+
+ rabbit_ct_broker_helpers:delete_user(Config, Username2),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% ensure user entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, 1, Username2)),
+ ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),
+
+ close_channels(Chans1),
+ ?awaitMatch(0, count_channels_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
+
+ close_connections([Conn1]),
+ ?awaitMatch(0, count_connections_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000).
+
+cluster_vhost_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 0, 100),
+ set_tracking_execution_timeout(Config, 1, 100),
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 0)),
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 1)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [{0, Username}]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)),
+ ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, Vhost),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1],
+
+ %% ensure vhost entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)),
+ ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)),
+
+ rabbit_ct_broker_helpers:add_vhost(Config, Vhost),
+ rabbit_ct_broker_helpers:add_user(Config, Username),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username, Vhost).
+
+cluster_vhost_down_mimic(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [{0, Username}]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ mimic_vhost_down(Config, 1, Vhost),
+ timer:sleep(100),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% gen_event notifies local handlers. remote connections still active
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ mimic_vhost_down(Config, 0, Vhost),
+ timer:sleep(100),
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1].
+
+cluster_node_removed(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [{0, Username}]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ rabbit_ct_broker_helpers:stop_broker(Config, 1),
+ timer:sleep(200),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1),
+ timer:sleep(200),
+ NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+
+ DroppedConnTrackingTables =
+ rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(NodeName),
+ [?assertEqual(
+ {'EXIT', {aborted, {no_exists, Tab, all}}},
+ catch mnesia:table_info(Tab, all)) || Tab <- DroppedConnTrackingTables],
+
+ DroppedChTrackingTables =
+ rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(NodeName),
+ [?assertEqual(
+ {'EXIT', {aborted, {no_exists, Tab, all}}},
+ catch mnesia:table_info(Tab, all)) || Tab <- DroppedChTrackingTables],
+
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ close_channels(Chans1),
+ ?awaitMatch(0, count_channels_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
+
+ close_connections([Conn1]),
+ ?awaitMatch(0, count_connections_in(Config, Username), 20000),
+ ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000).
+
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+open_connections(Config, NodesAndUsers) ->
+ % Randomly select connection type
+ OpenConnectionFun = case ?config(connection_type, Config) of
+ network -> open_unmanaged_connection;
+ direct -> open_unmanaged_connection_direct
+ end,
+ Conns = lists:map(fun
+ ({Node, User}) ->
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node,
+ User, User);
+ (Node) ->
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
+ end, NodesAndUsers),
+ timer:sleep(500),
+ Conns.
+
+close_connections(Conns) ->
+ lists:foreach(fun
+ (Conn) ->
+ rabbit_ct_client_helpers:close_connection(Conn)
+ end, Conns),
+ timer:sleep(500).
+
+kill_connections(Conns) ->
+ lists:foreach(fun
+ (Conn) ->
+ (catch exit(Conn, please_terminate))
+ end, Conns),
+ timer:sleep(500).
+
+open_channels(Conn, N) ->
+ [begin
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ Ch
+ end || _ <- lists:seq(1, N)].
+
+close_channels(Channels = [_|_]) ->
+ [rabbit_ct_client_helpers:close_channel(Ch) || Ch <- Channels].
+
+count_connections_in(Config, Username) ->
+ length(connections_in(Config, Username)).
+
+connections_in(Config, Username) ->
+ connections_in(Config, 0, Username).
+connections_in(Config, NodeIndex, Username) ->
+ tracked_list_of_user(Config, NodeIndex, rabbit_connection_tracking, Username).
+
+count_channels_in(Config, Username) ->
+ Channels = channels_in(Config, Username),
+ length([Ch || Ch = #tracked_channel{username = Username0} <- Channels,
+ Username =:= Username0]).
+
+channels_in(Config, Username) ->
+ channels_in(Config, 0, Username).
+channels_in(Config, NodeIndex, Username) ->
+ tracked_list_of_user(Config, NodeIndex, rabbit_channel_tracking, Username).
+
+tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ list_of_user, [Username]).
+
+tracked_user_connection_count(Config, Username) ->
+ tracked_user_connection_count(Config, 0, Username).
+tracked_user_connection_count(Config, NodeIndex, Username) ->
+ count_user_tracked_items(Config, NodeIndex, rabbit_connection_tracking, Username).
+
+tracked_user_channel_count(Config, Username) ->
+ tracked_user_channel_count(Config, 0, Username).
+tracked_user_channel_count(Config, NodeIndex, Username) ->
+ count_user_tracked_items(Config, NodeIndex, rabbit_channel_tracking, Username).
+
+count_user_tracked_items(Config, NodeIndex, TrackingMod, Username) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ count_tracked_items_in, [{user, Username}]).
+
+exists_in_tracked_connection_per_vhost_table(Config, VHost) ->
+ exists_in_tracked_connection_per_vhost_table(Config, 0, VHost).
+exists_in_tracked_connection_per_vhost_table(Config, NodeIndex, VHost) ->
+ exists_in_tracking_table(Config, NodeIndex,
+ fun rabbit_connection_tracking:tracked_connection_per_vhost_table_name_for/1,
+ VHost).
+
+exists_in_tracked_connection_per_user_table(Config, Username) ->
+ exists_in_tracked_connection_per_user_table(Config, 0, Username).
+exists_in_tracked_connection_per_user_table(Config, NodeIndex, Username) ->
+ exists_in_tracking_table(Config, NodeIndex,
+ fun rabbit_connection_tracking:tracked_connection_per_user_table_name_for/1,
+ Username).
+
+exists_in_tracked_channel_per_user_table(Config, Username) ->
+ exists_in_tracked_channel_per_user_table(Config, 0, Username).
+exists_in_tracked_channel_per_user_table(Config, NodeIndex, Username) ->
+ exists_in_tracking_table(Config, NodeIndex,
+ fun rabbit_channel_tracking:tracked_channel_per_user_table_name_for/1,
+ Username).
+
+exists_in_tracking_table(Config, NodeIndex, TableNameFun, Key) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ Tab = TableNameFun(Node),
+ AllKeys = rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ mnesia,
+ dirty_all_keys, [Tab]),
+ lists:member(Key, AllKeys).
+
+mimic_vhost_down(Config, NodeIndex, VHost) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_vhost, vhost_down, [VHost]).
+
+all_connections(Config) ->
+ all_connections(Config, 0).
+all_connections(Config, NodeIndex) ->
+ all_tracked_items(Config, NodeIndex, rabbit_connection_tracking).
+
+all_channels(Config) ->
+ all_channels(Config, 0).
+all_channels(Config, NodeIndex) ->
+ all_tracked_items(Config, NodeIndex, rabbit_channel_tracking).
+
+all_tracked_items(Config, NodeIndex, TrackingMod) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ list, []).
+
+set_up_vhost(Config, VHost) ->
+ rabbit_ct_broker_helpers:add_vhost(Config, VHost),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
+ set_vhost_connection_limit(Config, VHost, -1).
+
+set_vhost_connection_limit(Config, VHost, Count) ->
+ set_vhost_connection_limit(Config, 0, VHost, Count).
+
+set_vhost_connection_limit(Config, NodeIndex, VHost, Count) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ ok = rabbit_ct_broker_helpers:control_action(
+ set_vhost_limits, Node,
+ ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"],
+ [{"-p", binary_to_list(VHost)}]).
+
+set_tracking_execution_timeout(Config, Timeout) ->
+ set_tracking_execution_timeout(Config, 0, Timeout).
+set_tracking_execution_timeout(Config, NodeIndex, Timeout) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ application, set_env,
+ [rabbit, tracking_execution_timeout, Timeout]).
+
+get_tracking_execution_timeout(Config) ->
+ get_tracking_execution_timeout(Config, 0).
+get_tracking_execution_timeout(Config, NodeIndex) ->
+ {ok, Timeout} = rabbit_ct_broker_helpers:rpc(
+ Config, NodeIndex,
+ application, get_env,
+ [rabbit, tracking_execution_timeout]),
+ Timeout.
+
+await_running_node_refresh(_Config, _NodeIndex) ->
+ timer:sleep(250).
+
+expect_that_client_connection_is_rejected(Config) ->
+ expect_that_client_connection_is_rejected(Config, 0).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex) ->
+ {error, not_allowed} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) ->
+ {error, not_allowed} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost).