summaryrefslogtreecommitdiff
path: root/deps/rabbit/test/per_vhost_queue_limit_SUITE.erl
blob: 28a9f98537b28f7f76f5d505d32d1dc5549b2779 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(per_vhost_queue_limit_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").

-compile(export_all).

-import(rabbit_ct_client_helpers, [open_unmanaged_connection/3,
                                   close_connection_and_channel/2]).

%% Common Test callback: lists the test groups that make up this suite,
%% the single-node group first and the two-node cluster group second.
all() ->
    [{group, cluster_size_1},
     {group, cluster_size_2}].

%% Common Test callback: maps each group name to its test cases.
%% Neither group sets any group properties (the empty list in the middle).
groups() ->
    SingleNodeCases =
        [most_basic_single_node_queue_count,
         single_node_single_vhost_queue_count,
         single_node_multiple_vhosts_queue_count,
         single_node_single_vhost_limit,
         single_node_single_vhost_zero_limit,
         single_node_single_vhost_limit_with_durable_named_queue,
         single_node_single_vhost_zero_limit_with_durable_named_queue,
         single_node_single_vhost_limit_with_queue_ttl,
         single_node_single_vhost_limit_with_redeclaration],
    ClusterCases =
        [most_basic_cluster_queue_count,
         cluster_multiple_vhosts_queue_count,
         cluster_multiple_vhosts_limit,
         cluster_multiple_vhosts_zero_limit,
         cluster_multiple_vhosts_limit_with_durable_named_queue,
         cluster_multiple_vhosts_zero_limit_with_durable_named_queue,
         cluster_node_restart_queue_count],
    [{cluster_size_1, [], SingleNodeCases},
     {cluster_size_2, [], ClusterCases}].

%% Common Test callback: suite-wide defaults.
suite() ->
    %% If a test hangs, no need to wait for 30 minutes.
    Timetrap = {timetrap, {minutes, 8}},
    [Timetrap].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

%% Suite-level setup: logs the test environment, then runs the shared
%% rabbit_ct_helpers setup steps and returns the configuration they produce.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    SuiteConfig = rabbit_ct_helpers:run_setup_steps(Config),
    SuiteConfig.

%% Suite-level teardown: runs the shared rabbit_ct_helpers teardown steps
%% and returns their result.
end_per_suite(Config) ->
    TornDown = rabbit_ct_helpers:run_teardown_steps(Config),
    TornDown.

%% Group-level setup: picks the number of broker nodes for the group and
%% delegates to init_per_multinode_group/3. Only the three group names below
%% are accepted; any other name fails with function_clause.
init_per_group(cluster_size_1 = Group, Config) ->
    init_per_multinode_group(Group, Config, 1);
init_per_group(Group, Config)
  when Group =:= cluster_size_2; Group =:= cluster_rename ->
    %% Both of these groups use a two-node cluster.
    init_per_multinode_group(Group, Config, 2).

%% Records the node count and a node name suffix in the configuration and,
%% for every group except cluster_rename, runs the broker and client setup
%% steps against the updated configuration.
init_per_multinode_group(Group, Config, NodeCount) ->
    NodenameSuffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
    GroupConfig = rabbit_ct_helpers:set_config(
                    Config,
                    [{rmq_nodes_count, NodeCount},
                     {rmq_nodename_suffix, NodenameSuffix}]),
    if
        Group =:= cluster_rename ->
            % The broker is managed by {init,end}_per_testcase().
            GroupConfig;
        true ->
            SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
                         rabbit_ct_client_helpers:setup_steps(),
            rabbit_ct_helpers:run_steps(GroupConfig, SetupSteps)
    end.

%% Group-level teardown: runs the client teardown steps followed by the
%% broker teardown steps, except for cluster_rename which returns the
%% configuration untouched.
end_per_group(Group, Config) when Group =/= cluster_rename ->
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_steps(Config, TeardownSteps);
end_per_group(cluster_rename, Config) ->
    % The broker is managed by {init,end}_per_testcase().
    Config.

%% Per-testcase setup: always reports the test case as started. Only
%% vhost_limit_after_node_renamed additionally runs the broker and client
%% setup steps here; every other test case gets the configuration back as is.
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase),
    case Testcase of
        vhost_limit_after_node_renamed ->
            SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
                         rabbit_ct_client_helpers:setup_steps(),
            rabbit_ct_helpers:run_steps(Config, SetupSteps);
        _Other ->
            Config
    end.

%% Per-testcase teardown: reports the test case as finished. For
%% vhost_limit_after_node_renamed the configuration stored under save_config
%% is used instead, and the client and broker teardown steps are run on it
%% first.
end_per_testcase(Testcase, Config) ->
    FinishedConfig =
        case Testcase of
            vhost_limit_after_node_renamed ->
                SavedConfig = ?config(save_config, Config),
                TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                                rabbit_ct_broker_helpers:teardown_steps(),
                rabbit_ct_helpers:run_steps(SavedConfig, TeardownSteps),
                SavedConfig;
            _Other ->
                Config
        end,
    rabbit_ct_helpers:testcase_finished(FinishedConfig, Testcase).

%% -------------------------------------------------------------------
%% Test cases.
%% -------------------------------------------------------------------

%% Declares ten exclusive queues over one connection and asserts that the
%% vhost's queue count goes 0 -> 10, then back to 0 after the connection is
%% closed. The vhost is deleted at the end.
most_basic_single_node_queue_count(Config) ->
    VHost = <<"queue-limits">>,
    set_up_vhost(Config, VHost),
    ?assertEqual(0, count_queues_in(Config, VHost)),

    Connection = rabbit_ct_client_helpers:open_unmanaged_connection(
                   Config, 0, VHost),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    declare_exclusive_queues(Channel, 10),
    ?assertEqual(10, count_queues_in(Config, VHost)),

    rabbit_ct_client_helpers:close_connection_and_channel(Connection, Channel),
    ?assertEqual(0, count_queues_in(Config, VHost)),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost).

%% Tracks the queue count of one vhost while exclusive and durable queues
%% are declared and removed over a single connection:
%% 10 exclusive -> +10 durable -> durable deleted -> connection closed.
single_node_single_vhost_queue_count(Config) ->
    VHost = <<"queue-limits">>,
    set_up_vhost(Config, VHost),
    ?assertEqual(0, count_queues_in(Config, VHost)),

    Connection = rabbit_ct_client_helpers:open_unmanaged_connection(
                   Config, 0, VHost),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    declare_exclusive_queues(Channel, 10),
    ?assertEqual(10, count_queues_in(Config, VHost)),

    declare_durable_queues(Channel, 10),
    ?assertEqual(20, count_queues_in(Config, VHost)),

    delete_durable_queues(Channel, 10),
    ?assertEqual(10, count_queues_in(Config, VHost)),

    %% Only the exclusive queues are left at this point; the test expects
    %% them to be gone once their connection is closed.
    rabbit_ct_client_helpers:close_connection_and_channel(Connection, Channel),
    ?assertEqual(0, count_queues_in(Config, VHost)),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost).

%% Same bookkeeping check as the single-vhost case, but with two vhosts on
%% one node: queues declared in one vhost are asserted against that vhost's
%% own count only.
single_node_multiple_vhosts_queue_count(Config) ->
    VHost1 = <<"queue-limits1">>,
    VHost2 = <<"queue-limits2">>,
    set_up_vhost(Config, VHost1),
    set_up_vhost(Config, VHost2),

    ?assertEqual(0, count_queues_in(Config, VHost1)),
    ?assertEqual(0, count_queues_in(Config, VHost2)),

    %% Opens a connection to node 0 in the given vhost plus one channel on it.
    Connect =
        fun (TargetVHost) ->
                NewConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                            Config, 0, TargetVHost),
                {ok, NewCh} = amqp_connection:open_channel(NewConn),
                {NewConn, NewCh}
        end,
    {FirstConn, FirstCh}   = Connect(VHost1),
    {SecondConn, SecondCh} = Connect(VHost2),

    declare_exclusive_queues(FirstCh, 10),
    ?assertEqual(10, count_queues_in(Config, VHost1)),
    declare_durable_queues(FirstCh, 10),
    ?assertEqual(20, count_queues_in(Config, VHost1)),
    delete_durable_queues(FirstCh, 10),
    ?assertEqual(10, count_queues_in(Config, VHost1)),

    declare_exclusive_queues(SecondCh, 30),
    ?assertEqual(30, count_queues_in(Config, VHost2)),

    rabbit_ct_client_helpers:close_connection_and_channel(FirstConn, FirstCh),
    ?assertEqual(0, count_queues_in(Config, VHost1)),
    rabbit_ct_client_helpers:close_connection_and_channel(SecondConn, SecondCh),
    ?assertEqual(0, count_queues_in(Config, VHost2)),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
    rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).

%% Runs the single-node limit scenario twice, with limits of 5 and then 10.
single_node_single_vhost_limit(Config) ->
    RunWithLimit =
        fun (Limit) -> single_node_single_vhost_limit_with(Config, Limit) end,
    RunWithLimit(5),
    RunWithLimit(10).
%% Runs the single-node zero-limit scenario with a server-named exclusive
%% queue declaration.
single_node_single_vhost_zero_limit(Config) ->
    ExclusiveDeclare = #'queue.declare'{queue     = <<"">>,
                                        exclusive = true},
    single_node_single_vhost_zero_limit_with(Config, ExclusiveDeclare).

%% With a limit of three queues on the vhost, declares the durable queues
%% Q1..Q3 (each must succeed) and then attempts a fourth, Q4, through
%% expect_shutdown_due_to_precondition_failed/1.
single_node_single_vhost_limit_with_durable_named_queue(Config) ->
    VHost = <<"queue-limits">>,
    set_up_vhost(Config, VHost),
    ?assertEqual(0, count_queues_in(Config, VHost)),

    set_vhost_queue_limit(Config, VHost, 3),
    Connection = rabbit_ct_client_helpers:open_unmanaged_connection(
                   Config, 0, VHost),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    DeclareDurable =
        fun (QName) ->
                amqp_channel:call(Channel,
                                  #'queue.declare'{queue     = QName,
                                                   exclusive = false,
                                                   durable   = true})
        end,

    %% These three fit within the limit.
    lists:foreach(fun (QName) ->
                          #'queue.declare_ok'{} = DeclareDurable(QName)
                  end,
                  [<<"Q1">>, <<"Q2">>, <<"Q3">>]),

    %% The fourth one would exceed it.
    expect_shutdown_due_to_precondition_failed(
      fun () -> DeclareDurable(<<"Q4">>) end),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost).

%% Runs the single-node zero-limit scenario with a durable, non-exclusive
%% queue named Q4.
single_node_single_vhost_zero_limit_with_durable_named_queue(Config) ->
    DurableDeclare = #'queue.declare'{queue     = <<"Q4">>,
                                      exclusive = false,
                                      durable   = true},
    single_node_single_vhost_zero_limit_with(Config, DurableDeclare).

%% Limit scenario for one vhost on one node. The limit starts at 3 and is
%% changed to WatermarkLimit after the connection is open; WatermarkLimit
%% exclusive queues must then be declarable, and one more declaration is
%% attempted through expect_shutdown_due_to_precondition_failed/1.
single_node_single_vhost_limit_with(Config, WatermarkLimit) ->
    VHost = <<"queue-limits">>,
    set_up_vhost(Config, VHost),
    ?assertEqual(0, count_queues_in(Config, VHost)),

    set_vhost_queue_limit(Config, VHost, 3),
    Connection = rabbit_ct_client_helpers:open_unmanaged_connection(
                   Config, 0, VHost),
    {ok, Channel} = amqp_connection:open_channel(Connection),

    DeclareExclusive =
        fun () ->
                amqp_channel:call(Channel,
                                  #'queue.declare'{queue     = <<"">>,
                                                   exclusive = true})
        end,

    set_vhost_queue_limit(Config, VHost, WatermarkLimit),
    lists:foreach(fun (_Seq) ->
                          #'queue.declare_ok'{} = DeclareExclusive()
                  end,
                  lists:seq(1, WatermarkLimit)),

    %% One declaration past the limit.
    expect_shutdown_due_to_precondition_failed(DeclareExclusive),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost).

%% Zero-limit scenario for one vhost on one node. With the limit set to 0
%% the given QueueDeclare is attempted through
%% expect_shutdown_due_to_precondition_failed/1; after the limit is set back
%% to -1, one hundred exclusive queues must be declarable on a fresh
%% connection.
single_node_single_vhost_zero_limit_with(Config, QueueDeclare) ->
    VHost = <<"queue-limits">>,
    set_up_vhost(Config, VHost),
    ?assertEqual(0, count_queues_in(Config, VHost)),

    LimitedConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                    Config, 0, VHost),
    {ok, LimitedCh} = amqp_connection:open_channel(LimitedConn),

    set_vhost_queue_limit(Config, VHost, 0),

    expect_shutdown_due_to_precondition_failed(
      fun () -> amqp_channel:call(LimitedCh, QueueDeclare) end),

    %% A second connection and channel are used for the remaining
    %% declarations.
    UnlimitedConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                      Config, 0, VHost),
    {ok, UnlimitedCh} = amqp_connection:open_channel(UnlimitedConn),

    %% lift the limit
    set_vhost_queue_limit(Config, VHost, -1),
    ExclusiveDeclare = #'queue.declare'{queue     = <<"">>,
                                        exclusive = true},
    lists:foreach(fun (_Seq) ->
                          #'queue.declare_ok'{} =
                              amqp_channel:call(UnlimitedCh, ExclusiveDeclare)
                  end,
                  lists:seq(1, 100)),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost).


%% Fills a vhost limited to three queues with exclusive queues carrying an
%% x-expires of 2000, attempts a fourth declaration through
%% expect_shutdown_due_to_precondition_failed/1, then sleeps for three
%% seconds and requires that a new declaration on a second connection
%% succeeds.
single_node_single_vhost_limit_with_queue_ttl(Config) ->
    VHost = <<"queue-limits">>,
    set_up_vhost(Config, VHost),
    ?assertEqual(0, count_queues_in(Config, VHost)),

    FirstConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                  Config, 0, VHost),
    {ok, FirstCh} = amqp_connection:open_channel(FirstConn),

    set_vhost_queue_limit(Config, VHost, 3),

    ExpiringDeclare =
        #'queue.declare'{queue     = <<"">>,
                         exclusive = true,
                         arguments = [{<<"x-expires">>, long, 2000}]},
    PlainDeclare = #'queue.declare'{queue     = <<"">>,
                                    exclusive = true},

    lists:foreach(fun (_Seq) ->
                          #'queue.declare_ok'{} =
                              amqp_channel:call(FirstCh, ExpiringDeclare)
                  end,
                  lists:seq(1, 3)),

    expect_shutdown_due_to_precondition_failed(
      fun () -> amqp_channel:call(FirstCh, PlainDeclare) end),

    SecondConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                   Config, 0, VHost),
    {ok, SecondCh} = amqp_connection:open_channel(SecondConn),

    %% wait for the queues to expire
    timer:sleep(3000),

    #'queue.declare_ok'{} = amqp_channel:call(SecondCh, PlainDeclare),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost).


%% With a limit of three queues, declares Q1..Q3, then checks on a second
%% connection that re-declaring those same queues still succeeds while a new
%% queue (Q4) is attempted through
%% expect_shutdown_due_to_precondition_failed/1 on both channels.
single_node_single_vhost_limit_with_redeclaration(Config) ->
    VHost = <<"queue-limits">>,
    set_up_vhost(Config, VHost),
    ?assertEqual(0, count_queues_in(Config, VHost)),

    set_vhost_queue_limit(Config, VHost, 3),

    DeclareDurable =
        fun (OnChannel, QName) ->
                amqp_channel:call(OnChannel,
                                  #'queue.declare'{queue     = QName,
                                                   exclusive = false,
                                                   durable   = true})
        end,
    %% Declares Q1..Q3 in order and requires a declare_ok for each.
    DeclareFirstThree =
        fun (OnChannel) ->
                lists:foreach(
                  fun (QName) ->
                          #'queue.declare_ok'{} =
                              DeclareDurable(OnChannel, QName)
                  end,
                  [<<"Q1">>, <<"Q2">>, <<"Q3">>])
        end,

    FirstConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                  Config, 0, VHost),
    {ok, FirstCh} = amqp_connection:open_channel(FirstConn),
    DeclareFirstThree(FirstCh),

    %% can't declare a new queue...
    expect_shutdown_due_to_precondition_failed(
      fun () -> DeclareDurable(FirstCh, <<"Q4">>) end),

    SecondConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                   Config, 0, VHost),
    {ok, SecondCh} = amqp_connection:open_channel(SecondConn),

    %% ...but re-declarations succeed
    DeclareFirstThree(SecondCh),

    expect_shutdown_due_to_precondition_failed(
      fun () -> DeclareDurable(SecondCh, <<"Q4">>) end),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost).


%% Declares exclusive queues over two connections to node 0 and asserts that
%% both cluster nodes report the same queue count for the vhost at every
%% step: 0, 10, 25, and 0 again after both connections are closed.
most_basic_cluster_queue_count(Config) ->
    VHost = <<"queue-limits">>,
    set_up_vhost(Config, VHost),

    %% Asserts the vhost's queue count as seen from node 0, then node 1.
    AssertOnBothNodes =
        fun (Expected) ->
                ?assertEqual(Expected, count_queues_in(Config, VHost, 0)),
                ?assertEqual(Expected, count_queues_in(Config, VHost, 1))
        end,
    AssertOnBothNodes(0),

    FirstConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                  Config, 0, VHost),
    {ok, FirstCh} = amqp_connection:open_channel(FirstConn),
    declare_exclusive_queues(FirstCh, 10),
    AssertOnBothNodes(10),

    SecondConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                   Config, 0, VHost),
    {ok, SecondCh} = amqp_connection:open_channel(SecondConn),
    declare_exclusive_queues(SecondCh, 15),
    AssertOnBothNodes(25),

    rabbit_ct_client_helpers:close_connection_and_channel(FirstConn, FirstCh),
    rabbit_ct_client_helpers:close_connection_and_channel(SecondConn, SecondCh),
    AssertOnBothNodes(0),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost).

%% Checks the vhost's queue count across broker restarts in a two-node
%% cluster: exclusive queues declared via node 0 are expected to be gone
%% after node 0 restarts, and after node 1 restarts only the ten durable
%% queues declared through it are expected to remain.
cluster_node_restart_queue_count(Config) ->
    VHost = <<"queue-limits">>,
    set_up_vhost(Config, VHost),

    %% Asserts the vhost's queue count as seen from node 0, then node 1.
    AssertOnBothNodes =
        fun (Expected) ->
                ?assertEqual(Expected, count_queues_in(Config, VHost, 0)),
                ?assertEqual(Expected, count_queues_in(Config, VHost, 1))
        end,
    AssertOnBothNodes(0),

    Node0Conn = rabbit_ct_client_helpers:open_unmanaged_connection(
                  Config, 0, VHost),
    {ok, Node0Ch} = amqp_connection:open_channel(Node0Conn),
    declare_exclusive_queues(Node0Ch, 10),
    AssertOnBothNodes(10),

    rabbit_ct_broker_helpers:restart_broker(Config, 0),
    %% Only node 0 is queried right after its restart.
    ?assertEqual(0, count_queues_in(Config, VHost, 0)),

    Node1Conn = rabbit_ct_client_helpers:open_unmanaged_connection(
                  Config, 1, VHost),
    {ok, Node1Ch} = amqp_connection:open_channel(Node1Conn),
    declare_exclusive_queues(Node1Ch, 15),
    AssertOnBothNodes(15),

    declare_durable_queues(Node1Ch, 10),
    AssertOnBothNodes(25),

    rabbit_ct_broker_helpers:restart_broker(Config, 1),

    AssertOnBothNodes(10),
    rabbit_ct_broker_helpers:delete_vhost(Config, VHost).


%% Two vhosts in a two-node cluster: exclusive queues declared in one vhost
%% are asserted on both nodes for that vhost, and the counts are expected to
%% return to zero on both nodes once the connections are closed.
cluster_multiple_vhosts_queue_count(Config) ->
    VHost1 = <<"queue-limits1">>,
    VHost2 = <<"queue-limits2">>,
    set_up_vhost(Config, VHost1),
    set_up_vhost(Config, VHost2),

    ?assertEqual(0, count_queues_in(Config, VHost1)),
    ?assertEqual(0, count_queues_in(Config, VHost2)),

    %% Opens a connection to node 0 in the given vhost plus one channel on it.
    Connect =
        fun (TargetVHost) ->
                NewConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                            Config, 0, TargetVHost),
                {ok, NewCh} = amqp_connection:open_channel(NewConn),
                {NewConn, NewCh}
        end,

    {FirstConn, FirstCh} = Connect(VHost1),
    declare_exclusive_queues(FirstCh, 10),
    ?assertEqual(10, count_queues_in(Config, VHost1, 0)),
    ?assertEqual(10, count_queues_in(Config, VHost1, 1)),
    ?assertEqual(0,  count_queues_in(Config, VHost2, 0)),
    ?assertEqual(0,  count_queues_in(Config, VHost2, 1)),

    {SecondConn, SecondCh} = Connect(VHost2),
    declare_exclusive_queues(SecondCh, 15),
    ?assertEqual(15, count_queues_in(Config, VHost2, 0)),
    ?assertEqual(15, count_queues_in(Config, VHost2, 1)),

    rabbit_ct_client_helpers:close_connection_and_channel(FirstConn, FirstCh),
    rabbit_ct_client_helpers:close_connection_and_channel(SecondConn, SecondCh),
    ?assertEqual(0, count_queues_in(Config, VHost1, 0)),
    ?assertEqual(0, count_queues_in(Config, VHost1, 1)),
    ?assertEqual(0, count_queues_in(Config, VHost2, 0)),
    ?assertEqual(0, count_queues_in(Config, VHost2, 1)),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
    rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).

%% Runs the clustered limit scenario twice, with limits of 10 and then 20.
cluster_multiple_vhosts_limit(Config) ->
    RunWithLimit =
        fun (Limit) -> cluster_multiple_vhosts_limit_with(Config, Limit) end,
    RunWithLimit(10),
    RunWithLimit(20).

%% Limit scenario for two vhosts in a two-node cluster. Both vhosts start
%% with a limit of 3 that is changed to WatermarkLimit once each client is
%% connected (vhost 1 via node 0, vhost 2 via node 1). Each vhost must then
%% accept WatermarkLimit exclusive queues, and one further declaration per
%% vhost is attempted through expect_shutdown_due_to_precondition_failed/1.
cluster_multiple_vhosts_limit_with(Config, WatermarkLimit) ->
    VHost1 = <<"queue-limits1">>,
    VHost2 = <<"queue-limits2">>,
    set_up_vhost(Config, VHost1),
    set_up_vhost(Config, VHost2),
    ?assertEqual(0, count_queues_in(Config, VHost1)),
    ?assertEqual(0, count_queues_in(Config, VHost2)),

    set_vhost_queue_limit(Config, VHost1, 3),
    set_vhost_queue_limit(Config, VHost2, 3),

    ExclusiveDeclare = #'queue.declare'{queue     = <<"">>,
                                        exclusive = true},
    %% Declares WatermarkLimit exclusive queues, requiring success each time.
    FillUpToLimit =
        fun (OnChannel) ->
                lists:foreach(
                  fun (_Seq) ->
                          #'queue.declare_ok'{} =
                              amqp_channel:call(OnChannel, ExclusiveDeclare)
                  end,
                  lists:seq(1, WatermarkLimit))
        end,

    Node0Conn = rabbit_ct_client_helpers:open_unmanaged_connection(
                  Config, 0, VHost1),
    {ok, Node0Ch} = amqp_connection:open_channel(Node0Conn),
    set_vhost_queue_limit(Config, VHost1, WatermarkLimit),
    FillUpToLimit(Node0Ch),

    Node1Conn = rabbit_ct_client_helpers:open_unmanaged_connection(
                  Config, 1, VHost2),
    {ok, Node1Ch} = amqp_connection:open_channel(Node1Conn),
    set_vhost_queue_limit(Config, VHost2, WatermarkLimit),
    FillUpToLimit(Node1Ch),

    %% One declaration past the limit in each vhost.
    expect_shutdown_due_to_precondition_failed(
      fun () -> amqp_channel:call(Node0Ch, ExclusiveDeclare) end),
    expect_shutdown_due_to_precondition_failed(
      fun () -> amqp_channel:call(Node1Ch, ExclusiveDeclare) end),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
    rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).


%% Runs the clustered zero-limit scenario with a server-named exclusive
%% queue declaration.
cluster_multiple_vhosts_zero_limit(Config) ->
    ExclusiveDeclare = #'queue.declare'{queue     = <<"">>,
                                        exclusive = true},
    cluster_multiple_vhosts_zero_limit_with(Config, ExclusiveDeclare).

%% Checks that a limit of three queues is applied to each of two vhosts in a
%% two-node cluster when durable, named queues are used. Vhost 1 is accessed
%% through node 0 and vhost 2 through node 1.
%%
%% Q1..Q3 are declared in each vhost and must succeed. A further, *new*
%% queue (Q4) is then attempted in each vhost through
%% expect_shutdown_due_to_precondition_failed/1. The over-limit attempt must
%% use a name that does not exist yet: re-declaring an existing queue such
%% as Q3 succeeds even at the limit (see
%% single_node_single_vhost_limit_with_redeclaration/1) and therefore would
%% not exercise the limit at all.
cluster_multiple_vhosts_limit_with_durable_named_queue(Config) ->
    VHost1 = <<"queue-limits1">>,
    VHost2 = <<"queue-limits2">>,
    set_up_vhost(Config, VHost1),
    set_up_vhost(Config, VHost2),
    ?assertEqual(0, count_queues_in(Config, VHost1)),
    ?assertEqual(0, count_queues_in(Config, VHost2)),

    set_vhost_queue_limit(Config, VHost1, 3),
    set_vhost_queue_limit(Config, VHost2, 3),

    Conn1     = open_unmanaged_connection(Config, 0, VHost1),
    {ok, Ch1} = amqp_connection:open_channel(Conn1),

    Conn2     = open_unmanaged_connection(Config, 1, VHost2),
    {ok, Ch2} = amqp_connection:open_channel(Conn2),

    DeclareDurable =
        fun (OnChannel, QName) ->
                amqp_channel:call(OnChannel,
                                  #'queue.declare'{queue     = QName,
                                                   exclusive = false,
                                                   durable   = true})
        end,
    WithinLimit = [<<"Q1">>, <<"Q2">>, <<"Q3">>],

    %% Fill vhost 1 first, then vhost 2; every declaration must succeed.
    lists:foreach(fun (QName) ->
                          #'queue.declare_ok'{} = DeclareDurable(Ch1, QName)
                  end, WithinLimit),
    lists:foreach(fun (QName) ->
                          #'queue.declare_ok'{} = DeclareDurable(Ch2, QName)
                  end, WithinLimit),

    %% A fourth, previously undeclared queue exceeds the limit in each vhost.
    expect_shutdown_due_to_precondition_failed(
      fun () -> DeclareDurable(Ch1, <<"Q4">>) end),
    expect_shutdown_due_to_precondition_failed(
      fun () -> DeclareDurable(Ch2, <<"Q4">>) end),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
    rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).

%% Runs the clustered zero-limit scenario with a durable, non-exclusive
%% queue named Q4.
cluster_multiple_vhosts_zero_limit_with_durable_named_queue(Config) ->
    DurableDeclare = #'queue.declare'{queue     = <<"Q4">>,
                                      exclusive = false,
                                      durable   = true},
    cluster_multiple_vhosts_zero_limit_with(Config, DurableDeclare).

%% Zero-limit scenario for two vhosts in a two-node cluster (vhost 1 via
%% node 0, vhost 2 via node 1). With both limits at 0 the given QueueDeclare
%% is attempted in each vhost through
%% expect_shutdown_due_to_precondition_failed/1. After both limits are set
%% back to -1, four hundred exclusive queues per vhost must be declarable on
%% fresh connections, alternating between the two vhosts.
cluster_multiple_vhosts_zero_limit_with(Config, QueueDeclare) ->
    VHost1 = <<"queue-limits1">>,
    VHost2 = <<"queue-limits2">>,
    set_up_vhost(Config, VHost1),
    set_up_vhost(Config, VHost2),
    ?assertEqual(0, count_queues_in(Config, VHost1)),
    ?assertEqual(0, count_queues_in(Config, VHost2)),

    %% Opens a connection to the given node and vhost and returns a channel
    %% on it.
    OpenChannel =
        fun (NodeIndex, TargetVHost) ->
                NewConn = rabbit_ct_client_helpers:open_unmanaged_connection(
                            Config, NodeIndex, TargetVHost),
                {ok, NewCh} = amqp_connection:open_channel(NewConn),
                NewCh
        end,

    LimitedCh1 = OpenChannel(0, VHost1),
    LimitedCh2 = OpenChannel(1, VHost2),

    set_vhost_queue_limit(Config, VHost1, 0),
    set_vhost_queue_limit(Config, VHost2, 0),

    expect_shutdown_due_to_precondition_failed(
      fun () -> amqp_channel:call(LimitedCh1, QueueDeclare) end),
    expect_shutdown_due_to_precondition_failed(
      fun () -> amqp_channel:call(LimitedCh2, QueueDeclare) end),

    UnlimitedCh1 = OpenChannel(0, VHost1),
    UnlimitedCh2 = OpenChannel(1, VHost2),

    %% lift the limits
    set_vhost_queue_limit(Config, VHost1, -1),
    set_vhost_queue_limit(Config, VHost2, -1),

    ExclusiveDeclare = #'queue.declare'{queue     = <<"">>,
                                        exclusive = true},
    lists:foreach(fun (_Seq) ->
                          #'queue.declare_ok'{} =
                              amqp_channel:call(UnlimitedCh1, ExclusiveDeclare),
                          #'queue.declare_ok'{} =
                              amqp_channel:call(UnlimitedCh2, ExclusiveDeclare)
                  end,
                  lists:seq(1, 400)),

    rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
    rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).



%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------

%% Creates VHost, gives the "guest" user full permissions on it and sets its
%% queue limit to -1 (the value the tests in this suite use to lift a limit).
set_up_vhost(Config, VHost) ->
    NoLimit = -1,
    rabbit_ct_broker_helpers:add_vhost(Config, VHost),
    rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
    set_vhost_queue_limit(Config, VHost, NoLimit).

%% Sets the queue limit of VHost to Count, issuing the command via node 0.
set_vhost_queue_limit(Config, VHost, Count) ->
    FirstNode = 0,
    set_vhost_queue_limit(Config, FirstNode, VHost, Count).

%% Sets the "max-queues" limit of VHost to the integer Count by running the
%% set_vhost_limits control action on the node at NodeIndex. The limit is
%% passed as a small JSON document and the vhost through the -p option.
set_vhost_queue_limit(Config, NodeIndex, VHost, Count) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex,
                                                    nodename),
    LimitsJson = "{\"max-queues\": " ++ integer_to_list(Count) ++ "}",
    Options = [{"-p", binary_to_list(VHost)}],
    rabbit_ct_broker_helpers:control_action(set_vhost_limits, Node,
                                            [LimitsJson], Options).

%% Returns the number of queues in VHost as reported by node 0.
count_queues_in(Config, VHost) ->
    count_queues_in(Config, VHost, 0).

%% Returns the number of queues in VHost as reported by the node at
%% NodeIndex, obtained by calling rabbit_amqqueue:count/1 on that node.
count_queues_in(Config, VHost, NodeIndex) ->
    %% Short pause before querying; presumably gives the broker time to
    %% register preceding declarations and deletions -- TODO confirm.
    timer:sleep(200),
    CountArgs = [VHost],
    rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
                                 rabbit_amqqueue, count, CountArgs).

%% Issues N declarations of a server-named exclusive queue on Ch. The
%% individual replies are not inspected.
declare_exclusive_queues(Ch, N) ->
    ExclusiveDeclare = #'queue.declare'{queue     = <<"">>,
                                        exclusive = true},
    DeclareOne = fun (_Seq) -> amqp_channel:call(Ch, ExclusiveDeclare) end,
    lists:foreach(DeclareOne, lists:seq(1, N)).

%% Declares N durable, non-exclusive queues on Ch, named by
%% durable_queue_name/1 for 1..N. The individual replies are not inspected.
declare_durable_queues(Ch, N) ->
    DeclareOne =
        fun (Index) ->
                Declare = #'queue.declare'{queue     = durable_queue_name(Index),
                                           exclusive = false,
                                           durable   = true},
                amqp_channel:call(Ch, Declare)
        end,
    lists:foreach(DeclareOne, lists:seq(1, N)).

%% Deletes the queues named by durable_queue_name/1 for 1..N on Ch. The
%% individual replies are not inspected.
delete_durable_queues(Ch, N) ->
    DeleteOne =
        fun (Index) ->
                Delete = #'queue.delete'{queue = durable_queue_name(Index)},
                amqp_channel:call(Ch, Delete)
        end,
    lists:foreach(DeleteOne, lists:seq(1, N)).

%% Builds the name of the N-th durable test queue as a binary:
%% "queue-limits-durable-" followed by N in decimal. N must be an integer.
durable_queue_name(N) when is_integer(N) ->
    Index = integer_to_binary(N),
    <<"queue-limits-durable-", Index/binary>>.

%% Runs Thunk and returns ok when it raises an exception whose reason is
%% {{shutdown, {server_initiated_close, 406, _}}, _}, i.e. a server-initiated
%% close with code 406. Any other exception propagates to the caller.
%%
%% Note that ok is also returned when Thunk completes without raising, so
%% this helper tolerates the shutdown rather than requiring it.
expect_shutdown_due_to_precondition_failed(Thunk) ->
    try Thunk() of
        _Completed ->
            ok
    catch
        _Class:{{shutdown, {server_initiated_close, 406, _Text}}, _Where} ->
            %% expected
            ok
    end.