summaryrefslogtreecommitdiff
path: root/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl
blob: 79190dc2ff1087e88b64a97ac8d1d78ff0a28c9e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2020-2022 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(per_user_connection_channel_tracking_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile(export_all).

all() ->
    [
     {group, cluster_size_1_network},
     {group, cluster_size_2_network},
     {group, cluster_size_1_direct},
     {group, cluster_size_2_direct}
    ].

groups() ->
    ClusterSize1Tests = [
        single_node_user_connection_channel_tracking,
        single_node_user_deletion,
        single_node_vhost_down_mimic,
        single_node_vhost_deletion
    ],
    ClusterSize2Tests = [
        cluster_user_deletion,
        cluster_vhost_down_mimic,
        cluster_vhost_deletion,
        cluster_node_removed
    ],
    [
      {cluster_size_1_network, [], ClusterSize1Tests},
      {cluster_size_2_network, [], ClusterSize2Tests},
      {cluster_size_1_direct, [], ClusterSize1Tests},
      {cluster_size_2_direct, [], ClusterSize2Tests}
    ].

suite() ->
    [
      %% If a test hangs, no need to wait for 30 minutes.
      {timetrap, {minutes, 8}}
    ].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    rabbit_ct_helpers:run_setup_steps(Config).

end_per_suite(Config) ->
    rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(cluster_size_1_network, Config) ->
    Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
    init_per_multinode_group(cluster_size_1_network, Config1, 1);
init_per_group(cluster_size_2_network, Config) ->
    case rabbit_ct_helpers:is_mixed_versions() of
        false ->
            Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
            init_per_multinode_group(cluster_size_2_network, Config1, 2);
        _ ->
            %% In a mixed 3.8/3.9 cluster, changes to rabbit_core_ff.erl imply that some
            %% feature flag related migrations cannot occur, and therefore user_limits
            %% cannot be enabled in a 3.8/3.9 mixed cluster
            {skip, "cluster_size_2_network is not mixed version compatible"}
    end;
init_per_group(cluster_size_1_direct, Config) ->
    Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
    init_per_multinode_group(cluster_size_1_direct, Config1, 1);
init_per_group(cluster_size_2_direct, Config) ->
    Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
    init_per_multinode_group(cluster_size_2_direct, Config1, 2).

init_per_multinode_group(Group, Config, NodeCount) ->
    Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
    Config1 = rabbit_ct_helpers:set_config(Config, [
                                                    {rmq_nodes_count, NodeCount},
                                                    {rmq_nodename_suffix, Suffix}
      ]),
    Config2 = rabbit_ct_helpers:run_steps(
                Config1, rabbit_ct_broker_helpers:setup_steps() ++
                rabbit_ct_client_helpers:setup_steps()),
    EnableFF = rabbit_ct_broker_helpers:enable_feature_flag(
                 Config2, user_limits),
    case EnableFF of
        ok ->
            Config2;
        {skip, _} = Skip ->
            end_per_group(Group, Config2),
            Skip;
        Other ->
            end_per_group(Group, Config2),
            {skip, Other}
    end.

end_per_group(_Group, Config) ->
    rabbit_ct_helpers:run_steps(Config,
      rabbit_ct_client_helpers:teardown_steps() ++
      rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase),
    clear_all_connection_tracking_tables(Config),
    clear_all_channel_tracking_tables(Config),
    Config.

end_per_testcase(Testcase, Config) ->
    clear_all_connection_tracking_tables(Config),
    clear_all_channel_tracking_tables(Config),
    rabbit_ct_helpers:testcase_finished(Config, Testcase).

clear_all_connection_tracking_tables(Config) ->
    [rabbit_ct_broker_helpers:rpc(Config,
        N,
        rabbit_connection_tracking,
        clear_tracking_tables,
        []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].

clear_all_channel_tracking_tables(Config) ->
    [rabbit_ct_broker_helpers:rpc(Config,
        N,
        rabbit_channel_tracking,
        clear_tracking_tables,
        []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].

%% -------------------------------------------------------------------
%% Test cases.
%% -------------------------------------------------------------------
single_node_user_connection_channel_tracking(Config) ->
    Username = proplists:get_value(rmq_username, Config),
    Username2 = <<"guest2">>,

    Vhost = proplists:get_value(rmq_vhost, Config),

    rabbit_ct_broker_helpers:add_user(Config, Username2),
    rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),

    [Conn1] = open_connections(Config, [0]),
    [Chan1] = open_channels(Conn1, 1),
    [#tracked_connection{username = Username}] = connections_in(Config, Username),
    [#tracked_channel{username = Username}] = channels_in(Config, Username),
    ?assertEqual(true, is_process_alive(Conn1)),
    ?assertEqual(true, is_process_alive(Chan1)),
    close_channels([Chan1]),
    ?awaitMatch(0, count_channels_in(Config, Username), 20000),
    ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
    ?awaitMatch(false, is_process_alive(Chan1), 20000),
    close_connections([Conn1]),
    ?awaitMatch(0, length(connections_in(Config, Username)), 20000),
    ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000),
    ?awaitMatch(false, is_process_alive(Conn1), 20000),

    [Conn2] = open_connections(Config, [{0, Username2}]),
    Chans2  = [_|_] = open_channels(Conn2, 5),
    timer:sleep(100),
    [#tracked_connection{username = Username2}] = connections_in(Config, Username2),
    ?assertEqual(5, count_channels_in(Config, Username2)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(true, is_process_alive(Conn2)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],

    [Conn3] = open_connections(Config, [0]),
    Chans3 = [_|_] = open_channels(Conn3, 5),
    [#tracked_connection{username = Username}] = connections_in(Config, Username),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn3)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans3],

    [Conn4] = open_connections(Config, [0]),
    Chans4 = [_|_] = open_channels(Conn4, 5),
    ?assertEqual(2, tracked_user_connection_count(Config, Username)),
    ?assertEqual(10, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn4)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans4],
    kill_connections([Conn4]),
    [#tracked_connection{username = Username}] = connections_in(Config, Username),
    ?awaitMatch(5, count_channels_in(Config, Username), 20000),
    ?awaitMatch(1, tracked_user_connection_count(Config, Username), 20000),
    ?awaitMatch(5, tracked_user_channel_count(Config, Username), 20000),
    ?assertEqual(false, is_process_alive(Conn4)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans4],

    [Conn5] = open_connections(Config, [0]),
    Chans5  = [_|_] = open_channels(Conn5, 7),
    [Username, Username] =
        lists:map(fun (#tracked_connection{username = U}) -> U end,
                  connections_in(Config, Username)),
    ?assertEqual(12, count_channels_in(Config, Username)),
    ?assertEqual(12, tracked_user_channel_count(Config, Username)),
    ?assertEqual(2, tracked_user_connection_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn5)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans5],

    close_channels(Chans2 ++ Chans3 ++ Chans5),
    ?awaitMatch(0, length(all_channels(Config)), 20000),
    ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),
    ?awaitMatch(0, tracked_user_channel_count(Config, Username2), 20000),

    close_connections([Conn2, Conn3, Conn5]),
    rabbit_ct_broker_helpers:delete_user(Config, Username2),
    ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000),
    ?awaitMatch(0, tracked_user_connection_count(Config, Username2), 20000),
    ?awaitMatch(0, length(all_connections(Config)), 20000).

single_node_user_deletion(Config) ->
    set_tracking_execution_timeout(Config, 100),

    Username = proplists:get_value(rmq_username, Config),
    Username2 = <<"guest2">>,

    Vhost = proplists:get_value(rmq_vhost, Config),

    rabbit_ct_broker_helpers:add_user(Config, Username2),
    rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),

    ?assertEqual(100, get_tracking_execution_timeout(Config)),

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),

    [Conn1] = open_connections(Config, [0]),
    Chans1 = [_|_] = open_channels(Conn1, 5),
    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    [Conn2] = open_connections(Config, [{0, Username2}]),
    Chans2 = [_|_] = open_channels(Conn2, 5),
    ?assertEqual(1, count_connections_in(Config, Username2)),
    ?assertEqual(5, count_channels_in(Config, Username2)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(true, is_process_alive(Conn2)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],

    ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, Username2)),
    ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, Username2)),

    rabbit_ct_broker_helpers:delete_user(Config, Username2),
    timer:sleep(100),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(false, is_process_alive(Conn2)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],

    %% ensure vhost entry is cleared after 'tracking_execution_timeout'
    ?awaitMatch(false, exists_in_tracked_connection_per_user_table(Config, Username2), 20000),
    ?awaitMatch(false, exists_in_tracked_channel_per_user_table(Config, Username2), 20000),

    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    close_channels(Chans1),
    ?awaitMatch(0, count_channels_in(Config, Username), 20000),
    ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),

    close_connections([Conn1]),
    ?awaitMatch(0, count_connections_in(Config, Username), 20000),
    ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000).

single_node_vhost_deletion(Config) ->
    set_tracking_execution_timeout(Config, 100),

    Username = proplists:get_value(rmq_username, Config),
    Username2 = <<"guest2">>,

    Vhost = proplists:get_value(rmq_vhost, Config),

    rabbit_ct_broker_helpers:add_user(Config, Username2),
    rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),

    ?assertEqual(100, get_tracking_execution_timeout(Config)),

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),

    [Conn1] = open_connections(Config, [0]),
    Chans1 = [_|_] = open_channels(Conn1, 5),
    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    [Conn2] = open_connections(Config, [{0, Username2}]),
    Chans2 = [_|_] = open_channels(Conn2, 5),
    ?assertEqual(1, count_connections_in(Config, Username2)),
    ?assertEqual(5, count_channels_in(Config, Username2)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(true, is_process_alive(Conn2)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],

    ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, Vhost)),

    rabbit_ct_broker_helpers:delete_vhost(Config, Vhost),
    timer:sleep(200),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(false, is_process_alive(Conn2)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(false, is_process_alive(Conn1)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1],

    %% ensure vhost entry is cleared after 'tracking_execution_timeout'
    ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, Vhost)),

    rabbit_ct_broker_helpers:add_vhost(Config, Vhost).

single_node_vhost_down_mimic(Config) ->
    Username = proplists:get_value(rmq_username, Config),
    Username2 = <<"guest2">>,

    Vhost = proplists:get_value(rmq_vhost, Config),

    rabbit_ct_broker_helpers:add_user(Config, Username2),
    rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),

    [Conn1] = open_connections(Config, [0]),
    Chans1 = [_|_] = open_channels(Conn1, 5),
    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    [Conn2] = open_connections(Config, [{0, Username2}]),
    Chans2 = [_|_] = open_channels(Conn2, 5),
    ?assertEqual(1, count_connections_in(Config, Username2)),
    ?assertEqual(5, count_channels_in(Config, Username2)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(true, is_process_alive(Conn2)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],

    %% mimic vhost down event, while connections exist
    mimic_vhost_down(Config, 0, Vhost),
    timer:sleep(200),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(false, is_process_alive(Conn2)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(false, is_process_alive(Conn1)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1].

cluster_user_deletion(Config) ->
    set_tracking_execution_timeout(Config, 0, 100),
    set_tracking_execution_timeout(Config, 1, 100),
    Username = proplists:get_value(rmq_username, Config),
    Username2 = <<"guest2">>,

    Vhost = proplists:get_value(rmq_vhost, Config),

    rabbit_ct_broker_helpers:add_user(Config, Username2),
    rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),

    ?assertEqual(100, get_tracking_execution_timeout(Config, 0)),
    ?assertEqual(100, get_tracking_execution_timeout(Config, 1)),

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),

    [Conn1] = open_connections(Config, [0]),
    Chans1 = [_|_] = open_channels(Conn1, 5),
    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    [Conn2] = open_connections(Config, [{1, Username2}]),
    Chans2 = [_|_] = open_channels(Conn2, 5),
    ?assertEqual(1, count_connections_in(Config, Username2)),
    ?assertEqual(5, count_channels_in(Config, Username2)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(true, is_process_alive(Conn2)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],

    ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, 1, Username2)),
    ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),

    rabbit_ct_broker_helpers:delete_user(Config, Username2),
    timer:sleep(200),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(false, is_process_alive(Conn2)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],

    %% ensure user entry is cleared after 'tracking_execution_timeout'
    ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, 1, Username2)),
    ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),

    close_channels(Chans1),
    ?awaitMatch(0, count_channels_in(Config, Username), 20000),
    ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),

    close_connections([Conn1]),
    ?awaitMatch(0, count_connections_in(Config, Username), 20000),
    ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000).

cluster_vhost_deletion(Config) ->
    set_tracking_execution_timeout(Config, 0, 100),
    set_tracking_execution_timeout(Config, 1, 100),
    Username = proplists:get_value(rmq_username, Config),
    Username2 = <<"guest2">>,

    Vhost = proplists:get_value(rmq_vhost, Config),

    rabbit_ct_broker_helpers:add_user(Config, Username2),
    rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),

    ?assertEqual(100, get_tracking_execution_timeout(Config, 0)),
    ?assertEqual(100, get_tracking_execution_timeout(Config, 1)),

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),

    [Conn1] = open_connections(Config, [{0, Username}]),
    Chans1 = [_|_] = open_channels(Conn1, 5),
    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    [Conn2] = open_connections(Config, [{1, Username2}]),
    Chans2 = [_|_] = open_channels(Conn2, 5),
    ?assertEqual(1, count_connections_in(Config, Username2)),
    ?assertEqual(5, count_channels_in(Config, Username2)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(true, is_process_alive(Conn2)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],

    ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)),
    ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)),

    rabbit_ct_broker_helpers:delete_vhost(Config, Vhost),
    timer:sleep(200),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(false, is_process_alive(Conn2)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(false, is_process_alive(Conn1)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1],

    %% ensure vhost entry is cleared after 'tracking_execution_timeout'
    ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)),
    ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)),

    rabbit_ct_broker_helpers:add_vhost(Config, Vhost),
    rabbit_ct_broker_helpers:add_user(Config, Username),
    rabbit_ct_broker_helpers:set_full_permissions(Config, Username, Vhost).

cluster_vhost_down_mimic(Config) ->
    Username = proplists:get_value(rmq_username, Config),
    Username2 = <<"guest2">>,

    Vhost = proplists:get_value(rmq_vhost, Config),

    rabbit_ct_broker_helpers:add_user(Config, Username2),
    rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),

    [Conn1] = open_connections(Config, [{0, Username}]),
    Chans1 = [_|_] = open_channels(Conn1, 5),
    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    [Conn2] = open_connections(Config, [{1, Username2}]),
    Chans2 = [_|_] = open_channels(Conn2, 5),
    ?assertEqual(1, count_connections_in(Config, Username2)),
    ?assertEqual(5, count_channels_in(Config, Username2)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(true, is_process_alive(Conn2)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],

    mimic_vhost_down(Config, 1, Vhost),
    timer:sleep(100),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(false, is_process_alive(Conn2)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],

    %% gen_event notifies local handlers. remote connections still active
    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    mimic_vhost_down(Config, 0, Vhost),
    timer:sleep(100),
    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(false, is_process_alive(Conn1)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1].

cluster_node_removed(Config) ->
    Username = proplists:get_value(rmq_username, Config),
    Username2 = <<"guest2">>,

    Vhost = proplists:get_value(rmq_vhost, Config),

    rabbit_ct_broker_helpers:add_user(Config, Username2),
    rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),

    ?assertEqual(0, count_connections_in(Config, Username)),
    ?assertEqual(0, count_connections_in(Config, Username2)),
    ?assertEqual(0, count_channels_in(Config, Username)),
    ?assertEqual(0, count_channels_in(Config, Username2)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username)),
    ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username)),
    ?assertEqual(0, tracked_user_channel_count(Config, Username2)),

    [Conn1] = open_connections(Config, [{0, Username}]),
    Chans1 = [_|_] = open_channels(Conn1, 5),
    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    [Conn2] = open_connections(Config, [{1, Username2}]),
    Chans2 = [_|_] = open_channels(Conn2, 5),
    ?assertEqual(1, count_connections_in(Config, Username2)),
    ?assertEqual(5, count_channels_in(Config, Username2)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
    ?assertEqual(true, is_process_alive(Conn2)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],

    rabbit_ct_broker_helpers:stop_broker(Config, 1),
    timer:sleep(200),
    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1),
    timer:sleep(200),
    NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),

    DroppedConnTrackingTables =
        rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(NodeName),
    [?assertEqual(
        {'EXIT', {aborted, {no_exists, Tab, all}}},
        catch mnesia:table_info(Tab, all)) || Tab <- DroppedConnTrackingTables],

    DroppedChTrackingTables =
        rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(NodeName),
    [?assertEqual(
        {'EXIT', {aborted, {no_exists, Tab, all}}},
        catch mnesia:table_info(Tab, all)) || Tab <- DroppedChTrackingTables],

    ?assertEqual(false, is_process_alive(Conn2)),
    [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],

    ?assertEqual(1, count_connections_in(Config, Username)),
    ?assertEqual(5, count_channels_in(Config, Username)),
    ?assertEqual(1, tracked_user_connection_count(Config, Username)),
    ?assertEqual(5, tracked_user_channel_count(Config, Username)),
    ?assertEqual(true, is_process_alive(Conn1)),
    [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],

    close_channels(Chans1),
    ?awaitMatch(0, count_channels_in(Config, Username), 20000),
    ?awaitMatch(0, tracked_user_channel_count(Config, Username), 20000),

    close_connections([Conn1]),
    ?awaitMatch(0, count_connections_in(Config, Username), 20000),
    ?awaitMatch(0, tracked_user_connection_count(Config, Username), 20000).

%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------

open_connections(Config, NodesAndUsers) ->
    % Randomly select connection type
    OpenConnectionFun = case ?config(connection_type, Config) of
        network -> open_unmanaged_connection;
        direct  -> open_unmanaged_connection_direct
    end,
    Conns = lists:map(fun
      ({Node, User}) ->
          rabbit_ct_client_helpers:OpenConnectionFun(Config, Node,
                                                     User, User);
      (Node) ->
          rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
      end, NodesAndUsers),
    timer:sleep(500),
    Conns.

close_connections(Conns) ->
    lists:foreach(fun
      (Conn) ->
          rabbit_ct_client_helpers:close_connection(Conn)
      end, Conns),
    timer:sleep(500).

kill_connections(Conns) ->
    lists:foreach(fun
      (Conn) ->
          (catch exit(Conn, please_terminate))
      end, Conns),
    timer:sleep(500).

open_channels(Conn, N) ->
    [begin
        {ok, Ch} = amqp_connection:open_channel(Conn),
        Ch
     end || _ <- lists:seq(1, N)].

close_channels(Channels = [_|_]) ->
    [rabbit_ct_client_helpers:close_channel(Ch) || Ch <- Channels].

count_connections_in(Config, Username) ->
    length(connections_in(Config, Username)).

connections_in(Config, Username) ->
    connections_in(Config, 0, Username).
connections_in(Config, NodeIndex, Username) ->
    tracked_list_of_user(Config, NodeIndex, rabbit_connection_tracking, Username).

count_channels_in(Config, Username) ->
    Channels = channels_in(Config, Username),
    length([Ch || Ch = #tracked_channel{username = Username0} <- Channels,
                  Username =:= Username0]).

channels_in(Config, Username) ->
    channels_in(Config, 0, Username).
channels_in(Config, NodeIndex, Username) ->
    tracked_list_of_user(Config, NodeIndex, rabbit_channel_tracking, Username).

tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) ->
    rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
                                 TrackingMod,
                                 list_of_user, [Username]).

tracked_user_connection_count(Config, Username) ->
    tracked_user_connection_count(Config, 0, Username).
tracked_user_connection_count(Config, NodeIndex, Username) ->
    count_user_tracked_items(Config, NodeIndex, rabbit_connection_tracking, Username).

tracked_user_channel_count(Config, Username) ->
    tracked_user_channel_count(Config, 0, Username).
tracked_user_channel_count(Config, NodeIndex, Username) ->
    count_user_tracked_items(Config, NodeIndex, rabbit_channel_tracking, Username).

count_user_tracked_items(Config, NodeIndex, TrackingMod, Username) ->
    rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
                                 TrackingMod,
                                 count_tracked_items_in, [{user, Username}]).

exists_in_tracked_connection_per_vhost_table(Config, VHost) ->
    exists_in_tracked_connection_per_vhost_table(Config, 0, VHost).
exists_in_tracked_connection_per_vhost_table(Config, NodeIndex, VHost) ->
    exists_in_tracking_table(Config, NodeIndex,
        fun rabbit_connection_tracking:tracked_connection_per_vhost_table_name_for/1,
        VHost).

exists_in_tracked_connection_per_user_table(Config, Username) ->
    exists_in_tracked_connection_per_user_table(Config, 0, Username).
exists_in_tracked_connection_per_user_table(Config, NodeIndex, Username) ->
    exists_in_tracking_table(Config, NodeIndex,
        fun rabbit_connection_tracking:tracked_connection_per_user_table_name_for/1,
        Username).

exists_in_tracked_channel_per_user_table(Config, Username) ->
    exists_in_tracked_channel_per_user_table(Config, 0, Username).
exists_in_tracked_channel_per_user_table(Config, NodeIndex, Username) ->
    exists_in_tracking_table(Config, NodeIndex,
        fun rabbit_channel_tracking:tracked_channel_per_user_table_name_for/1,
        Username).

exists_in_tracking_table(Config, NodeIndex, TableNameFun, Key) ->
    Node = rabbit_ct_broker_helpers:get_node_config(
                Config, NodeIndex, nodename),
    Tab = TableNameFun(Node),
    AllKeys = rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
                                           mnesia,
                                           dirty_all_keys, [Tab]),
    lists:member(Key, AllKeys).

mimic_vhost_down(Config, NodeIndex, VHost) ->
    rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
                                 rabbit_vhost, vhost_down, [VHost]).

all_connections(Config) ->
    all_connections(Config, 0).
all_connections(Config, NodeIndex) ->
    all_tracked_items(Config, NodeIndex, rabbit_connection_tracking).

all_channels(Config) ->
    all_channels(Config, 0).
all_channels(Config, NodeIndex) ->
    all_tracked_items(Config, NodeIndex, rabbit_channel_tracking).

all_tracked_items(Config, NodeIndex, TrackingMod) ->
    rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
                                 TrackingMod,
                                 list, []).

set_up_vhost(Config, VHost) ->
    rabbit_ct_broker_helpers:add_vhost(Config, VHost),
    rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
    set_vhost_connection_limit(Config, VHost, -1).

set_vhost_connection_limit(Config, VHost, Count) ->
    set_vhost_connection_limit(Config, 0, VHost, Count).

set_vhost_connection_limit(Config, NodeIndex, VHost, Count) ->
    Node  = rabbit_ct_broker_helpers:get_node_config(
              Config, NodeIndex, nodename),
    ok = rabbit_ct_broker_helpers:control_action(
      set_vhost_limits, Node,
      ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"],
      [{"-p", binary_to_list(VHost)}]).

set_tracking_execution_timeout(Config, Timeout) ->
    set_tracking_execution_timeout(Config, 0, Timeout).
set_tracking_execution_timeout(Config, NodeIndex, Timeout) ->
    rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
                                 application, set_env,
                                 [rabbit, tracking_execution_timeout, Timeout]).

get_tracking_execution_timeout(Config) ->
    get_tracking_execution_timeout(Config, 0).
get_tracking_execution_timeout(Config, NodeIndex) ->
    {ok, Timeout} = rabbit_ct_broker_helpers:rpc(
                                    Config, NodeIndex,
                                    application, get_env,
                                    [rabbit, tracking_execution_timeout]),
    Timeout.

await_running_node_refresh(_Config, _NodeIndex) ->
    timer:sleep(250).

expect_that_client_connection_is_rejected(Config) ->
    expect_that_client_connection_is_rejected(Config, 0).

expect_that_client_connection_is_rejected(Config, NodeIndex) ->
    {error, not_allowed} =
      rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex).

expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) ->
    {error, not_allowed} =
      rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost).