%% path: root/deps/rabbitmq_sharding/test/src/rabbit_hash_exchange_SUITE.erl
%% blob: 66ce3daa4c09f545b959cba156f4f9161f0fbf40 (plain)
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%
-module(rabbit_hash_exchange_SUITE).

-compile(export_all).

-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").

%% Common Test entry point: the suite consists of a single group.
all() ->
    [{group, non_parallel_tests}].

%% Defines the `non_parallel_tests' group: the three routing test cases,
%% declared with an empty list of group properties.
groups() ->
    TestCases = [routed_to_zero_queue_test,
                 routed_to_one_queue_test,
                 routed_to_many_queue_test],
    [{non_parallel_tests, [], TestCases}].

%% -------------------------------------------------------------------
%% Test suite setup/teardown
%% -------------------------------------------------------------------

%% Suite setup: logs the environment, stores this module's name as the
%% `rmq_nodename_suffix' config entry, then runs the broker setup steps
%% followed by the client setup steps.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    SuiteConfig = rabbit_ct_helpers:set_config(
                    Config, [{rmq_nodename_suffix, ?MODULE}]),
    SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
                 rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_setup_steps(SuiteConfig, SetupSteps).

%% Suite teardown: runs the client teardown steps followed by the broker
%% teardown steps (the reverse of the setup order).
end_per_suite(Config) ->
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(Config, TeardownSteps).

%% No per-group setup is needed; the config is passed through unchanged.
init_per_group(_Group, Config) ->
    Config.

%% No per-group teardown is needed; the config is passed through unchanged.
end_per_group(_Group, Config) ->
    Config.

%% Per-testcase setup: derives a resource name from the test case name
%% (with every "/" replaced by "-"), stores it in the config under
%% `test_resource_name', and marks the test case as started.
init_per_testcase(Testcase, Config) ->
    RawName = rabbit_ct_helpers:config_to_testcase_name(Config, Testcase),
    ResourceName = re:replace(RawName, "/", "-", [global, {return, list}]),
    NamedConfig = rabbit_ct_helpers:set_config(
                    Config, {test_resource_name, ResourceName}),
    rabbit_ct_helpers:testcase_started(NamedConfig, Testcase).

%% Per-testcase teardown: marks the test case as finished and returns
%% whatever the helper returns.
end_per_testcase(Testcase, Config) ->
    Result = rabbit_ct_helpers:testcase_finished(Config, Testcase),
    Result.

%% -------------------------------------------------------------------
%% Test cases
%% -------------------------------------------------------------------

%% With no queues declared or bound, five messages are published and the
%% expected total message count is zero.
routed_to_zero_queue_test(Config) ->
    PublishFun = fun() ->
                         #'basic.publish'{exchange = make_exchange_name(Config, "0"),
                                          routing_key = rnd()}
                 end,
    MessageFun = fun() ->
                         #amqp_msg{props = #'P_basic'{}, payload = <<>>}
                 end,
    test0(Config, PublishFun, MessageFun, [], 5, 0),
    passed.

%% With three queues bound, a single published message is expected to show
%% up exactly once across all queues (total message count of one).
routed_to_one_queue_test(Config) ->
    PublishFun = fun() ->
                         #'basic.publish'{exchange = make_exchange_name(Config, "0"),
                                          routing_key = rnd()}
                 end,
    MessageFun = fun() ->
                         #amqp_msg{props = #'P_basic'{}, payload = <<>>}
                 end,
    BoundQueues = [<<"q1">>, <<"q2">>, <<"q3">>],
    test0(Config, PublishFun, MessageFun, BoundQueues, 1, 1),
    passed.

%% With three queues bound, five published messages are expected to add up
%% to a total message count of five across all queues.
routed_to_many_queue_test(Config) ->
    PublishFun = fun() ->
                         #'basic.publish'{exchange = make_exchange_name(Config, "0"),
                                          routing_key = rnd()}
                 end,
    MessageFun = fun() ->
                         #amqp_msg{props = #'P_basic'{}, payload = <<>>}
                 end,
    BoundQueues = [<<"q1">>, <<"q2">>, <<"q3">>],
    test0(Config, PublishFun, MessageFun, BoundQueues, 5, 5),
    passed.

%% Shared driver for the routing test cases.
%%
%% Declares an auto-delete exchange of type `x-modulus-hash', declares every
%% queue in Queues as exclusive and binds it to the exchange with an empty
%% routing key, publishes MsgCount messages with publisher confirms enabled,
%% and asserts that the message counts of all queues sum to Count.
%%
%% Arguments:
%%   Config     - Common Test config; used to open the connection and to
%%                derive the exchange name.
%%   MakeMethod - zero-arity fun returning the publish method for one message.
%%   MakeMsg    - zero-arity fun returning the message content for one message.
%%   Queues     - list of queue names (binaries) to declare and bind.
%%   MsgCount   - number of messages to publish.
%%   Count      - expected sum of the message counts over Queues.
%%
%% Returns `ok'. Any failed match or assertion raises; the connection and
%% channel are still closed in that case (see the `after' clause).
test0(Config, MakeMethod, MakeMsg, Queues, MsgCount, Count) ->
    {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    try
        E = make_exchange_name(Config, "0"),

        #'exchange.declare_ok'{} =
            amqp_channel:call(Chan,
                              #'exchange.declare'{
                                 exchange = E,
                                 type = <<"x-modulus-hash">>,
                                 auto_delete = true
                                }),
        [#'queue.declare_ok'{} =
             amqp_channel:call(Chan, #'queue.declare'{
                                        queue = Q, exclusive = true})
         || Q <- Queues],
        [#'queue.bind_ok'{} =
             amqp_channel:call(Chan, #'queue.bind'{queue = Q,
                                                   exchange = E,
                                                   routing_key = <<"">>})
         || Q <- Queues],

        %% Fail right here if confirm mode could not be enabled, rather than
        %% later in wait_for_confirms_or_die/1.
        #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}),

        lists:foreach(
          fun(_) -> amqp_channel:call(Chan, MakeMethod(), MakeMsg()) end,
          lists:seq(1, MsgCount)),

        %% ensure that the messages have been delivered to the queues before
        %% asking for the message count
        amqp_channel:wait_for_confirms_or_die(Chan),

        %% Re-declaring an existing queue with the same arguments returns its
        %% current message count.
        Counts =
            [begin
                 #'queue.declare_ok'{message_count = M} =
                     amqp_channel:call(Chan, #'queue.declare'{queue     = Q,
                                                              exclusive = true}),
                 M
             end || Q <- Queues],

        ?assertEqual(Count, lists:sum(Counts)),

        amqp_channel:call(Chan, #'exchange.delete'{exchange = E}),
        [amqp_channel:call(Chan, #'queue.delete'{queue = Q}) || Q <- Queues],
        ok
    after
        %% Always release the connection: if a match or assertion above
        %% fails, a leaked connection would keep the exclusive queues
        %% locked and break later test cases that declare the same names.
        rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan)
    end.

%% Returns a random routing key: the decimal representation, as a binary,
%% of a uniformly chosen integer between 1 and 1000000 inclusive.
rnd() ->
    Key = rand:uniform(1000000),
    integer_to_binary(Key).

%% Builds the exchange name `x-<test_resource_name>-<Suffix>' as a binary,
%% where the resource name is read from the `test_resource_name' entry of
%% Config.
make_exchange_name(Config, Suffix) ->
    ResourceName = rabbit_ct_helpers:get_config(Config, test_resource_name),
    Name = "x-" ++ ResourceName ++ "-" ++ Suffix,
    list_to_binary(Name).