1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2011-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(priority_queue_recovery_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-compile(export_all).
%% Common Test callback: the suite consists of one test group,
%% `non_parallel_tests'.
all() ->
    [{group, non_parallel_tests}].
%% Common Test callback: defines the `non_parallel_tests' group with an
%% empty property list and a single testcase, `recovery'.
groups() ->
    %% `recovery' restarts RabbitMQ.
    Testcases = [recovery],
    [{non_parallel_tests, [], Testcases}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
%% Suite-level setup: calls rabbit_ct_helpers:log_environment/0, then passes
%% Config through rabbit_ct_helpers:run_setup_steps/1 and returns its result.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    SuiteConfig = rabbit_ct_helpers:run_setup_steps(Config),
    SuiteConfig.
%% Suite-level teardown: returns the result of
%% rabbit_ct_helpers:run_teardown_steps/1 applied to Config.
end_per_suite(Config) ->
    TornDown = rabbit_ct_helpers:run_teardown_steps(Config),
    TornDown.
%% Group-level setup (same for every group): stores `rmq_nodes_count = 2'
%% in the config (presumably the number of broker nodes to start), then runs
%% the broker helper setup steps followed by the client helper setup steps.
init_per_group(_Group, Config) ->
    NodeSettings = [{rmq_nodes_count, 2}],
    GroupConfig = rabbit_ct_helpers:set_config(Config, NodeSettings),
    BrokerSteps = rabbit_ct_broker_helpers:setup_steps(),
    ClientSteps = rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_steps(GroupConfig, BrokerSteps ++ ClientSteps).
%% Group-level teardown (same for every group): runs the client helper
%% teardown steps first, then the broker helper teardown steps.
end_per_group(_Group, Config) ->
    ClientSteps = rabbit_ct_client_helpers:teardown_steps(),
    BrokerSteps = rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_steps(Config, ClientSteps ++ BrokerSteps).
%% Per-testcase setup: delegates to rabbit_ct_helpers:testcase_started/2
%% (note the swapped argument order) and returns its result.
init_per_testcase(Testcase, Config) ->
    Started = rabbit_ct_helpers:testcase_started(Config, Testcase),
    Started.
%% Per-testcase teardown: delegates to rabbit_ct_helpers:testcase_finished/2
%% (note the swapped argument order) and returns its result.
end_per_testcase(Testcase, Config) ->
    Finished = rabbit_ct_helpers:testcase_finished(Config, Testcase),
    Finished.
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------
%% Declares a durable queue with max priority 3 through node index 0 and
%% publishes nine messages to it (priorities 1, 2, 3, three of each), then
%% closes the client. After restart_broker(Config, 0) it connects through
%% node index 1 and expects to fetch the messages in the order
%% 3,3,3,2,2,2,1,1,1, followed by an empty queue, before deleting the queue.
%% Returns `passed'; any mismatch crashes in the helpers.
recovery(Config) ->
    QName = <<"recovery-queue">>,

    %% Phase 1: fill the queue, then disconnect.
    {PubConn, PubCh} = open(Config),
    declare(PubCh, QName, 3),
    publish(PubCh, QName, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
    rabbit_ct_client_helpers:close_channel(PubCh),
    rabbit_ct_client_helpers:close_connection(PubConn),

    %% Phase 2: restart the broker.
    rabbit_ct_broker_helpers:restart_broker(Config, 0),

    %% Phase 3: drain the queue and check the delivery order.
    {GetConn, GetCh} = open(Config, 1),
    get_all(GetCh, QName, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
    delete(GetCh, QName),
    rabbit_ct_client_helpers:close_channel(GetCh),
    rabbit_ct_client_helpers:close_connection(GetConn),
    passed.
%%----------------------------------------------------------------------------
%% Opens a connection and channel using node index 0.
open(Config) ->
    DefaultNodeIndex = 0,
    open(Config, DefaultNodeIndex).
%% Opens a connection and channel via
%% rabbit_ct_client_helpers:open_connection_and_channel/2 for the given node
%% index and returns its result (callers in this suite match it as
%% `{Conn, Ch}').
open(Config, NodeIndex) ->
    ConnAndCh = rabbit_ct_client_helpers:open_connection_and_channel(
                  Config, NodeIndex),
    ConnAndCh.
%% Declares the durable queue Q on channel Ch.
%% The third argument is either a ready-made list of queue arguments, or a
%% max-priority value (including `none') that is first converted to such a
%% list with arguments/1.
declare(Ch, Q, Max) when not is_list(Max) ->
    declare(Ch, Q, arguments(Max));
declare(Ch, Q, Args) ->
    Declare = #'queue.declare'{queue = Q,
                               durable = true,
                               arguments = Args},
    amqp_channel:call(Ch, Declare).
%% Issues a `queue.delete' for Q on channel Ch and returns the reply.
delete(Ch, Q) ->
    Delete = #'queue.delete'{queue = Q},
    amqp_channel:call(Ch, Delete).
%% Sends `confirm.select' on the channel, publishes one message per entry of
%% Ps (each entry is a priority, or `undefined') to Q in list order, then
%% waits for the publisher confirms.
%% Returns `true'. Crashes with a badmatch if
%% amqp_channel:wait_for_confirms/1 returns `false' (at least one nack) or
%% `timeout', so an unconfirmed publish fails the test here instead of
%% being silently ignored by callers that discard the result.
publish(Ch, Q, Ps) ->
    amqp_channel:call(Ch, #'confirm.select'{}),
    %% Only the side effect matters; lists:foreach/2 visits elements in order.
    lists:foreach(fun(P) -> publish1(Ch, Q, P) end, Ps),
    true = amqp_channel:wait_for_confirms(Ch).
%% Casts a single `basic.publish' with routing key Q. The message properties
%% come from props(P) and the payload is the textual form of the priority
%% (priority2bin(P)), which lets consumers check ordering by payload.
publish1(Ch, Q, P) ->
    Method = #'basic.publish'{routing_key = Q},
    Msg = #amqp_msg{props = props(P),
                    payload = priority2bin(P)},
    amqp_channel:cast(Ch, Method, Msg).
%% Like publish1/3, but the caller supplies the payload Pd explicitly instead
%% of having it derived from the priority P.
publish1(Ch, Q, P, Pd) ->
    Method = #'basic.publish'{routing_key = Q},
    Msg = #amqp_msg{props = props(P),
                    payload = Pd},
    amqp_channel:cast(Ch, Method, Msg).
%% Fetches one message per entry of Ps (checking each payload, see
%% get_partial/4), then asserts that the queue has nothing left.
%% Returns the list of delivery tags of the fetched messages.
get_all(Ch, Q, Ack, Ps) ->
    Tags = get_partial(Ch, Q, Ack, Ps),
    get_empty(Ch, Q),
    Tags.
%% Fetches length(Ps) messages from Q, in order, expecting the N-th message's
%% payload to equal priority2bin of the N-th entry of Ps.
%% Returns the delivery tags in fetch order.
get_partial(Ch, Q, Ack, Ps) ->
    FetchOne = fun(P) -> get_ok(Ch, Q, Ack, priority2bin(P)) end,
    %% A comprehension is used because it evaluates elements in list order,
    %% which matters for these sequential gets.
    [FetchOne(P) || P <- Ps].
%% Performs a `basic.get' on Q and asserts that the reply is
%% `basic.get_empty'; any other reply crashes with a badmatch.
get_empty(Ch, Q) ->
    Reply = amqp_channel:call(Ch, #'basic.get'{queue = Q}),
    #'basic.get_empty'{} = Reply.
%% Performs a `basic.get' on Q and asserts that a message was returned and
%% that its payload equals PBin. `no_ack' is set on the get only when Ack is
%% the atom `no_ack'. Acknowledges the message when Ack is `do_ack' (see
%% maybe_ack/3) and returns its delivery tag.
get_ok(Ch, Q, Ack, PBin) ->
    NoAck = (Ack =:= no_ack),
    Get = #'basic.get'{queue = Q, no_ack = NoAck},
    {#'basic.get_ok'{delivery_tag = Tag}, #amqp_msg{payload = Payload}} =
        amqp_channel:call(Ch, Get),
    %% Assertion: the fetched payload must be the expected one.
    PBin = Payload,
    maybe_ack(Ch, Ack, Tag).
%% Casts a `basic.ack' for DTag when the ack mode is `do_ack'; does nothing
%% for any other mode. Always returns DTag.
maybe_ack(Ch, do_ack, DTag) ->
    AckMethod = #'basic.ack'{delivery_tag = DTag},
    amqp_channel:cast(Ch, AckMethod),
    DTag;
maybe_ack(_Ch, _OtherMode, DTag) ->
    DTag.
%% Builds the queue-declare argument list: empty for `none', otherwise a
%% single `x-max-priority' entry of type `byte' carrying Max.
arguments(none) ->
    [];
arguments(Max) ->
    MaxPriority = {<<"x-max-priority">>, byte, Max},
    [MaxPriority].
%% Renders a priority as the binary used for message payloads:
%% `undefined' becomes <<"undefined">>, an integer becomes its decimal text.
priority2bin(undefined) ->
    <<"undefined">>;
priority2bin(Int) ->
    integer_to_binary(Int).
%% Builds the message properties: delivery_mode is always 2 (the persistent
%% delivery mode in AMQP 0-9-1), and the priority field is set only when P is
%% not `undefined'.
props(undefined) ->
    #'P_basic'{delivery_mode = 2};
props(P) ->
    Base = #'P_basic'{delivery_mode = 2},
    Base#'P_basic'{priority = P}.
|