summaryrefslogtreecommitdiff
path: root/deps/rabbit/test/lazy_queue_SUITE.erl
blob: 8748b07aca375c5bf71bd215b6484714928d10ee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(lazy_queue_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-compile(export_all).

-define(QNAME, <<"queue.mode.test">>).
-define(MESSAGE_COUNT, 2000).

%% Common Test entry point: the suite consists of one testcase group.
all() ->
    [{group, non_parallel_tests}].

%% Group definitions. The single group carries an empty property list
%% and lists the three testcases of this suite.
groups() ->
    Testcases = [declare_args, queue_mode_policy, publish_consume],
    [{non_parallel_tests, [], Testcases}].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

%% Suite-level setup: logs the environment, then runs the generic
%% rabbit_ct_helpers setup steps and returns their resulting config.
init_per_suite(Config0) ->
    rabbit_ct_helpers:log_environment(),
    Config1 = rabbit_ct_helpers:run_setup_steps(Config0),
    Config1.

%% Suite-level teardown: runs the generic rabbit_ct_helpers teardown
%% steps and returns their result.
end_per_suite(Config0) ->
    Config1 = rabbit_ct_helpers:run_teardown_steps(Config0),
    Config1.

%% No per-group setup: the configuration is returned unchanged for any
%% group name.
init_per_group(_Group, Config) ->
    Config.

%% No per-group teardown: the configuration is returned unchanged for
%% any group name.
end_per_group(_Group, Config) ->
    Config.

%% Per-testcase setup.
%%
%% Marks the testcase as started, requests a clustered two-node broker
%% setup whose node names are suffixed with the testcase name, and then
%% runs the broker setup steps, the client setup steps and finally
%% rabbit_ct_broker_helpers:set_ha_policy_all/1.
%%
%% The TCP port base is offset by (testcase number * node count);
%% presumably this keeps testcases from colliding on ports -- confirm
%% against rabbit_ct_broker_helpers.
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase),
    NodeCount = 2,
    Index = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
    Settings = [
        {rmq_nodes_count, NodeCount},
        {rmq_nodes_clustered, true},
        {rmq_nodename_suffix, Testcase},
        {tcp_ports_base, {skip_n_nodes, Index * NodeCount}}
      ],
    Config1 = rabbit_ct_helpers:set_config(Config, Settings),
    Steps =
        rabbit_ct_broker_helpers:setup_steps() ++
        rabbit_ct_client_helpers:setup_steps() ++
        [fun rabbit_ct_broker_helpers:set_ha_policy_all/1],
    rabbit_ct_helpers:run_steps(Config1, Steps).

%% Per-testcase teardown: runs the client teardown steps followed by the
%% broker teardown steps, then marks the testcase as finished using the
%% config those steps returned.
end_per_testcase(Testcase, Config) ->
    TeardownSteps =
        rabbit_ct_client_helpers:teardown_steps() ++
        rabbit_ct_broker_helpers:teardown_steps(),
    Config1 = rabbit_ct_helpers:run_steps(Config, TeardownSteps),
    rabbit_ct_helpers:testcase_finished(Config1, Testcase).

%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

%% Checks that the `x-queue-mode` declare argument selects the queue
%% mode: "lazy" yields lazy, "default" yields default, and a queue
%% declared without the argument reports default.
declare_args(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Chan = rabbit_ct_client_helpers:open_channel(Config, Node),

    %% {QueueName, DeclareArguments, ExpectedMode}, checked in order.
    Cases = [
        {<<"lazy-q">>,
         [{<<"x-queue-mode">>, longstr, <<"lazy">>}], lazy},
        {<<"default-q">>,
         [{<<"x-queue-mode">>, longstr, <<"default">>}], default},
        {<<"default-q2">>, [], default}
      ],
    lists:foreach(
      fun({QName, Args, ExpectedMode}) ->
              declare(Chan, QName, Args),
              assert_queue_mode(Node, QName, ExpectedMode)
      end, Cases),

    passed.

%% Checks how the `queue-mode` policy key interacts with the
%% `x-queue-mode` declare argument.
%%
%% With a "lazy" policy in place, a queue declared without the argument
%% is expected to be lazy, while queues declared with an explicit
%% argument are expected to report the mode from that argument. After
%% switching the policy to "default", only the queue declared without
%% the argument is expected to change mode; each queue is given up to
%% 5000 ms to reach its expected mode.
queue_mode_policy(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),

    set_ha_mode_policy(Config, Node, <<"lazy">>),

    Chan = rabbit_ct_client_helpers:open_channel(Config, Node),

    ExplicitLazy    = <<"lazy-q">>,
    PolicyDriven    = <<"lazy-q-2">>,
    ExplicitDefault = <<"default-q">>,

    %% {QueueName, DeclareArguments, ModeExpectedUnderLazyPolicy}
    Declarations = [
        {ExplicitLazy,
         [{<<"x-queue-mode">>, longstr, <<"lazy">>}], lazy},
        {PolicyDriven, [], lazy},
        {ExplicitDefault,
         [{<<"x-queue-mode">>, longstr, <<"default">>}], default}
      ],
    lists:foreach(
      fun({QName, Args, ExpectedMode}) ->
              declare(Chan, QName, Args),
              assert_queue_mode(Node, QName, ExpectedMode)
      end, Declarations),

    set_ha_mode_policy(Config, Node, <<"default">>),

    %% {QueueName, ModeExpectedUnderDefaultPolicy}
    AfterSwitch = [
        {ExplicitLazy,    lazy},
        {PolicyDriven,    default},
        {ExplicitDefault, default}
      ],
    lists:foreach(
      fun({QName, ExpectedMode}) ->
              ok = wait_for_queue_mode(Node, QName, ExpectedMode, 5000)
      end, AfterSwitch),

    passed.

%% Publishes batches of ?MESSAGE_COUNT messages to ?QNAME and checks
%% that they are delivered to a consumer while the `queue-mode` policy
%% is flipped between "lazy" and "default" at various points.
%%
%% Each batch is expected to arrive as payloads 1..?MESSAGE_COUNT, in
%% order (see assert_delivered/3).
publish_consume(Config) ->
    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Chan = rabbit_ct_client_helpers:open_channel(Config, Node),
    declare(Chan, ?QNAME),

    PublishBatch =
        fun() ->
                rabbit_ct_client_helpers:publish(Chan, ?QNAME, ?MESSAGE_COUNT)
        end,
    ExpectBatch =
        fun() ->
                lists:foreach(
                  fun(Payload) -> assert_delivered(Chan, ack, Payload) end,
                  lists:seq(1, ?MESSAGE_COUNT))
        end,
    SetMode = fun(Mode) -> set_ha_mode_policy(Config, Node, Mode) end,

    %% One batch with no queue-mode policy change yet.
    PublishBatch(),
    consume(Chan, ?QNAME, ack),
    ExpectBatch(),

    %% Two batches published under "lazy"; the second one is received
    %% after switching back to "default".
    SetMode(<<"lazy">>),
    PublishBatch(),
    PublishBatch(),
    ExpectBatch(),

    SetMode(<<"default">>),
    ExpectBatch(),

    %% Policy changes interleaved with publishing, and again between
    %% the two rounds of receiving.
    PublishBatch(),
    SetMode(<<"lazy">>),
    PublishBatch(),
    SetMode(<<"default">>),
    ExpectBatch(),

    SetMode(<<"lazy">>),
    ExpectBatch(),

    cancel(Chan),

    passed.

%%----------------------------------------------------------------------------

%% Declares queue Q on channel Ch with no extra declare arguments.
declare(Ch, Q) ->
    NoArgs = [],
    declare(Ch, Q, NoArgs).

%% Declares a durable queue Q on channel Ch, passing Args as the
%% queue.declare arguments. Returns whatever amqp_channel:call/2 returns.
declare(Ch, Q, Args) ->
    Method = #'queue.declare'{queue     = Q,
                              durable   = true,
                              arguments = Args},
    amqp_channel:call(Ch, Method).

%% Subscribes the calling process to queue Q on channel Ch using the
%% fixed consumer tag <<"ctag">>, then waits for the matching
%% basic.consume_ok message.
%%
%% Ack selects the acknowledgement mode: `no_ack` is set on the consumer
%% only when Ack is the atom `no_ack`; any other value requests manual
%% acknowledgements.
%%
%% Returns `ok`. Fails the testcase if no basic.consume_ok arrives
%% within 30 seconds, instead of blocking until the Common Test
%% timetrap fires.
consume(Ch, Q, Ack) ->
    Consume = #'basic.consume'{queue        = Q,
                               no_ack       = Ack =:= no_ack,
                               consumer_tag = <<"ctag">>},
    amqp_channel:subscribe(Ch, Consume, self()),
    receive
        #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
            ok
    after 30000 ->
        ct:fail({consume_ok_timeout, Q})
    end.

%% Cancels the consumer registered under the fixed tag <<"ctag">> on
%% channel Ch. Returns whatever amqp_channel:call/2 returns.
cancel(Ch) ->
    Method = #'basic.cancel'{consumer_tag = <<"ctag">>},
    amqp_channel:call(Ch, Method).

%% Waits for the next delivery in the calling process' mailbox and
%% asserts that its payload is the binary form of the integer Payload.
%%
%% On a match the delivery tag is handed to maybe_ack/3 together with
%% Ch and Ack, and that function's result (the delivery tag) is
%% returned. A delivery with a different payload crashes with badmatch.
%%
%% Fails the testcase, naming the expected payload, if no delivery
%% arrives within 60 seconds; without this bound a lost message would
%% leave the testcase blocked until the Common Test timetrap fires.
assert_delivered(Ch, Ack, Payload) ->
    PBin = payload2bin(Payload),
    receive
        {#'basic.deliver'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} ->
            %% Deliberately matched here rather than in the receive
            %% pattern, so an out-of-order delivery crashes instead of
            %% being skipped.
            PBin = PBin2,
            maybe_ack(Ch, Ack, DTag)
    after 60000 ->
        ct:fail({delivery_timeout, PBin})
    end.

%% Acknowledges delivery DTag on channel Ch when the acknowledgement
%% mode asks for it, and returns DTag either way.
%%
%% The testcases in this suite (and consume/3) use the atoms
%% `ack | no_ack`; `do_ack` is still accepted so existing callers keep
%% working. Previously only `do_ack` was matched, so deliveries on the
%% manual-ack consumer were never acknowledged.
maybe_ack(Ch, Ack, DTag) when Ack =:= ack; Ack =:= do_ack ->
    amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag}),
    DTag;
maybe_ack(_Ch, _NoAck, DTag) ->
    DTag.

%% Renders an integer as its decimal text in a binary, e.g. 42 -> <<"42">>.
payload2bin(Int) ->
    integer_to_binary(Int).

%% Sets an HA policy through rabbit_ct_broker_helpers:set_ha_policy/5
%% with pattern <<".*">>, HA mode <<"all">> and an extra `queue-mode`
%% key set to Mode. Crashes with badmatch unless the helper returns `ok`.
set_ha_mode_policy(Config, Node, Mode) ->
    Pattern = <<".*">>,
    HaMode = <<"all">>,
    Extra = [{<<"queue-mode">>, Mode}],
    ok = rabbit_ct_broker_helpers:set_ha_policy(Config, Node, Pattern, HaMode,
                                                Extra).


%% Polls get_queue_mode/2 every 100 ms until queue Q on Node reports
%% Mode. Max is the remaining time budget in milliseconds.
%%
%% Returns `ok` as soon as the mode matches, or `fail` once the budget
%% has dropped below zero. A budget of exactly zero still gets one poll.
wait_for_queue_mode(Node, Q, Mode, Max) when Max >= 0 ->
    case get_queue_mode(Node, Q) =:= Mode of
        true ->
            ok;
        false ->
            timer:sleep(100),
            wait_for_queue_mode(Node, Q, Mode, Max - 100)
    end;
wait_for_queue_mode(_Node, _Q, _Mode, _Max) ->
    fail.

%% Asserts that queue Q on Node currently reports mode Expected.
%% Crashes with badmatch (carrying the actual mode) otherwise.
assert_queue_mode(Node, Q, Expected) ->
    Expected = get_queue_mode(Node, Q).

%% Returns the `mode` entry of the backing_queue_status of queue Q in
%% vhost <<"/">>, as reported by Node.
%%
%% The queue is looked up and its info fetched via rpc:call/4 on Node.
%% Crashes with badmatch if the lookup does not return {ok, _} or the
%% info result is not exactly the single backing_queue_status item.
%% Returns `undefined` if the status has no `mode` key.
get_queue_mode(Node, Q) ->
    Resource = rabbit_misc:r(<<"/">>, queue, Q),
    {ok, Queue} = rpc:call(Node, rabbit_amqqueue, lookup, [Resource]),
    Info = rpc:call(Node, rabbit_amqqueue, info,
                    [Queue, [backing_queue_status]]),
    [{backing_queue_status, BQStatus}] = Info,
    proplists:get_value(mode, BQStatus).