summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_shovel/test/parameters_SUITE.erl
blob: 516b1bd190d299cc5b28c37c5588f9df1efcf61f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(parameters_SUITE).

-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").

%% Name, payload and timeout constants. None of these macros is referenced
%% by the code visible in this suite.
%% NOTE(review): ?TIMEOUT is presumably in milliseconds -- confirm before use.
-define(EXCHANGE,    <<"test_exchange">>).
-define(TO_SHOVEL,   <<"to_the_shovel">>).
-define(FROM_SHOVEL, <<"from_the_shovel">>).
-define(UNSHOVELLED, <<"unshovelled">>).
-define(SHOVELLED,   <<"shovelled">>).
-define(TIMEOUT,     1000).

%% Common Test entry point: the suite consists of the single group `tests'.
all() -> [{group, tests}].

%% Common Test group definitions: one group, `tests', carrying the `parallel'
%% property and listing every test case of the suite.
groups() ->
    Cases = [parse_amqp091_maps,
             parse_amqp091_proplists,
             parse_amqp091_empty_maps,
             parse_amqp091_empty_proplists,
             parse_amqp10,
             parse_amqp10_minimal,
             validate_amqp10,
             validate_amqp10_with_a_map],
    [{tests, [parallel], Cases}].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

%% No suite-wide setup is performed; the configuration is returned unchanged.
init_per_suite(Config) -> Config.

%% No suite-wide teardown is performed; the configuration is returned
%% unchanged.
end_per_suite(Config) -> Config.

%% No per-group setup is performed, whatever the group; the configuration is
%% returned unchanged.
init_per_group(_Group, Config) -> Config.

%% No per-group teardown is performed, whatever the group; the configuration
%% is returned unchanged.
end_per_group(_Group, Config) -> Config.

%% No per-test-case setup is performed; the configuration is returned
%% unchanged.
init_per_testcase(_Testcase, Config) ->
    Config.

%% No per-test-case teardown is performed; the configuration is returned
%% unchanged.
end_per_testcase(_Testcase, Config) ->
    Config.


%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

%% AMQP 0-9-1 definition whose publish properties are supplied as a map.
%% This case uses the un-prefixed option keys (add-forward-headers,
%% add-timestamp-header, publish-properties, prefetch-count) and delegates
%% all checking to test_parse_amqp091/1.
parse_amqp091_maps(_Config) ->
    PublishProps = #{<<"cluster_id">> => <<"x">>,
                     <<"delivery_mode">> => 2},
    test_parse_amqp091(
      [{<<"src-uri">>, <<"amqp://localhost:5672">>},
       {<<"src-protocol">>, <<"amqp091">>},
       {<<"dest-protocol">>, <<"amqp091">>},
       {<<"dest-uri">>, <<"amqp://remotehost:5672">>},
       {<<"add-forward-headers">>, true},
       {<<"add-timestamp-header">>, true},
       {<<"publish-properties">>, PublishProps},
       {<<"ack-mode">>, <<"on-publish">>},
       {<<"src-delete-after">>, <<"queue-length">>},
       {<<"prefetch-count">>, 30},
       {<<"reconnect-delay">>, 1001},
       {<<"src-queue">>, <<"a-src-queue">>},
       {<<"dest-queue">>, <<"a-dest-queue">>}]).

%% AMQP 0-9-1 definition whose publish properties are supplied as a proplist,
%% using the src-/dest- prefixed option keys. All checking is delegated to
%% test_parse_amqp091/1.
parse_amqp091_proplists(_Config) ->
    PublishProps = [{<<"cluster_id">>, <<"x">>},
                    {<<"delivery_mode">>, 2}],
    test_parse_amqp091(
      [{<<"src-uri">>, <<"amqp://localhost:5672">>},
       {<<"src-protocol">>, <<"amqp091">>},
       {<<"dest-protocol">>, <<"amqp091">>},
       {<<"dest-uri">>, <<"amqp://remotehost:5672">>},
       {<<"dest-add-forward-headers">>, true},
       {<<"dest-add-timestamp-header">>, true},
       {<<"dest-publish-properties">>, PublishProps},
       {<<"ack-mode">>, <<"on-publish">>},
       {<<"src-delete-after">>, <<"queue-length">>},
       {<<"src-prefetch-count">>, 30},
       {<<"reconnect-delay">>, 1001},
       {<<"src-queue">>, <<"a-src-queue">>},
       {<<"dest-queue">>, <<"a-dest-queue">>}]).

%% AMQP 0-9-1 definition whose publish properties are an empty map. Checking
%% is delegated to test_parse_amqp091_with_blank_proprties/1.
parse_amqp091_empty_maps(_Config) ->
    NoPublishProps = #{},
    test_parse_amqp091_with_blank_proprties(
      [{<<"src-uri">>, <<"amqp://localhost:5672">>},
       {<<"src-protocol">>, <<"amqp091">>},
       {<<"dest-protocol">>, <<"amqp091">>},
       {<<"dest-uri">>, <<"amqp://remotehost:5672">>},
       {<<"dest-add-forward-headers">>, true},
       {<<"dest-add-timestamp-header">>, true},
       {<<"dest-publish-properties">>, NoPublishProps},
       {<<"ack-mode">>, <<"on-publish">>},
       {<<"src-delete-after">>, <<"queue-length">>},
       {<<"src-prefetch-count">>, 30},
       {<<"reconnect-delay">>, 1001},
       {<<"src-queue">>, <<"a-src-queue">>},
       {<<"dest-queue">>, <<"a-dest-queue">>}]).

%% AMQP 0-9-1 definition whose publish properties are an empty proplist.
%% Checking is delegated to test_parse_amqp091_with_blank_proprties/1.
parse_amqp091_empty_proplists(_Config) ->
    NoPublishProps = [],
    test_parse_amqp091_with_blank_proprties(
      [{<<"src-uri">>, <<"amqp://localhost:5672">>},
       {<<"src-protocol">>, <<"amqp091">>},
       {<<"dest-protocol">>, <<"amqp091">>},
       {<<"dest-uri">>, <<"amqp://remotehost:5672">>},
       {<<"dest-add-forward-headers">>, true},
       {<<"dest-add-timestamp-header">>, true},
       {<<"dest-publish-properties">>, NoPublishProps},
       {<<"ack-mode">>, <<"on-publish">>},
       {<<"src-delete-after">>, <<"queue-length">>},
       {<<"src-prefetch-count">>, 30},
       {<<"reconnect-delay">>, 1001},
       {<<"src-queue">>, <<"a-src-queue">>},
       {<<"dest-queue">>, <<"a-dest-queue">>}]).


%% Shared checker for the AMQP 0-9-1 cases that carry publish properties.
%%
%% Parses Params for the shovel {"vhost", "name"} on cluster "my-cluster" and
%% matches the parsed map against the expected top-level, source and
%% destination settings. It then applies the destination's props_fun to a
%% #'P_basic'{} with undefined headers and requires delivery_mode 2,
%% cluster_id <<"x">> and the headers accepted by assert_amqp901_headers/1.
%% Any mismatch crashes with badmatch.
test_parse_amqp091(Params) ->
    ShovelName = {"vhost", "name"},
    {ok, Parsed} = rabbit_shovel_parameters:parse(ShovelName, "my-cluster",
                                                  Params),
    %% One single match, so a failure reports the whole parsed map.
    #{name := "name",
      ack_mode := on_publish,
      reconnect_delay := 1001,
      source := #{module := rabbit_amqp091_shovel,
                  uris := ["amqp://localhost:5672"],
                  queue := <<"a-src-queue">>,
                  prefetch_count := 30,
                  delete_after := 'queue-length'},
      dest := #{module := rabbit_amqp091_shovel,
                uris := ["amqp://remotehost:5672"],
                props_fun := MakeProps}
     } = Parsed,

    Props = MakeProps("amqp://localhost:5672",
                      "amqp://remotehost:5672",
                      #'P_basic'{headers = undefined}),
    #'P_basic'{headers = Headers,
               delivery_mode = 2,
               cluster_id = <<"x">>} = Props,
    assert_amqp901_headers(Headers),
    ok.

%% Shared checker for the AMQP 0-9-1 cases with empty publish properties.
%%
%% Performs the same parse and structural match as test_parse_amqp091/1, but
%% only inspects the headers produced by the destination's props_fun; no
%% other #'P_basic'{} field is constrained. Any mismatch crashes with
%% badmatch.
test_parse_amqp091_with_blank_proprties(Params) ->
    ShovelName = {"vhost", "name"},
    {ok, Parsed} = rabbit_shovel_parameters:parse(ShovelName, "my-cluster",
                                                  Params),
    %% One single match, so a failure reports the whole parsed map.
    #{name := "name",
      ack_mode := on_publish,
      reconnect_delay := 1001,
      source := #{module := rabbit_amqp091_shovel,
                  uris := ["amqp://localhost:5672"],
                  queue := <<"a-src-queue">>,
                  prefetch_count := 30,
                  delete_after := 'queue-length'},
      dest := #{module := rabbit_amqp091_shovel,
                uris := ["amqp://remotehost:5672"],
                props_fun := MakeProps}
     } = Parsed,

    Props = MakeProps("amqp://localhost:5672",
                      "amqp://remotehost:5672",
                      #'P_basic'{headers = undefined}),
    #'P_basic'{headers = Headers} = Props,
    assert_amqp901_headers(Headers),
    ok.

%% Checks the headers produced by a shovel props_fun.
%%
%% ActualHeaders must contain an <<"x-shovelled">> entry of type array holding
%% exactly one table, and an <<"x-shovelled-timestamp">> entry of type long.
%% Every expected key/value pair must then be present in that table (the type
%% tag of each table entry is not checked). Returns ok; a missing or
%% mismatching entry crashes.
assert_amqp901_headers(ActualHeaders) ->
    ShovelledEntry = lists:keyfind(<<"x-shovelled">>, 1, ActualHeaders),
    {_, array, [{table, Shovelled}]} = ShovelledEntry,
    TimestampEntry = lists:keyfind(<<"x-shovelled-timestamp">>, 1,
                                   ActualHeaders),
    {_, long, _} = TimestampEntry,

    Expected = [{<<"shovelled-by">>, "my-cluster"},
                {<<"shovel-type">>, <<"dynamic">>},
                {<<"shovel-name">>, "name"},
                {<<"shovel-vhost">>, "vhost"},
                {<<"src-uri">>, "amqp://localhost:5672"},
                {<<"dest-uri">>, "amqp://remotehost:5672"},
                {<<"src-queue">>, <<"a-src-queue">>},
                {<<"dest-queue">>, <<"a-dest-queue">>}],
    %% K and V are already bound by the generator, so the assertion pattern
    %% compares against their values instead of binding fresh variables.
    _ = [?assertMatch({K, _, V}, lists:keyfind(K, 1, Shovelled))
         || {K, V} <- Expected],
    ok.

%% Parses a fully specified AMQP 1.0 to AMQP 1.0 shovel definition and checks
%% that every supplied option shows up in the parsed source and destination
%% maps, including the application properties, message annotations and
%% message properties given for the destination.
parse_amqp10(_Config) ->
    AppProps = [{<<"some-app-prop">>, <<"app-prop-value">>}],
    MsgAnnotations = [{<<"some-message-ann">>, <<"message-ann-value">>}],
    MsgProps = [{<<"user_id">>, <<"some-user">>}],

    Common = [{<<"ack-mode">>, <<"on-publish">>},
              {<<"reconnect-delay">>, 1001}],
    Source = [{<<"src-protocol">>, <<"amqp10">>},
              {<<"src-uri">>, <<"amqp://localhost:5672">>},
              {<<"src-address">>, <<"a-src-queue">>},
              {<<"src-delete-after">>, <<"never">>},
              {<<"src-prefetch-count">>, 30}],
    Dest = [{<<"dest-protocol">>, <<"amqp10">>},
            {<<"dest-uri">>, <<"amqp://remotehost:5672">>},
            {<<"dest-address">>, <<"a-dest-queue">>},
            {<<"dest-add-forward-headers">>, true},
            {<<"dest-add-timestamp-header">>, true},
            {<<"dest-application-properties">>, AppProps},
            {<<"dest-message-annotations">>, MsgAnnotations},
            {<<"dest-properties">>, MsgProps}],
    %% Concatenation keeps the options in common, source, destination order.
    Params = Common ++ Source ++ Dest,

    ?assertMatch(
       {ok, #{name := "my_shovel",
              ack_mode := on_publish,
              source := #{module := rabbit_amqp10_shovel,
                          uris := ["amqp://localhost:5672"],
                          delete_after := never,
                          prefetch_count := 30,
                          source_address := <<"a-src-queue">>
                          },
              dest := #{module := rabbit_amqp10_shovel,
                        uris := ["amqp://remotehost:5672"],
                        target_address := <<"a-dest-queue">>,
                        message_annotations := #{<<"some-message-ann">> :=
                                                 <<"message-ann-value">>},
                        application_properties := #{<<"some-app-prop">> :=
                                                    <<"app-prop-value">>},
                        properties := #{user_id := <<"some-user">>},
                        add_timestamp_header := true,
                        add_forward_headers := true
                       }
             }},
        rabbit_shovel_parameters:parse({"vhost", "my_shovel"}, "my-cluster",
                                       Params)),
    ok.

%% Parses an AMQP 1.0 definition that supplies only protocol, URI and address
%% for each side. The parsed result is expected to carry ack_mode on_confirm,
%% source delete_after never and an empty destination unacked map, although
%% none of these is supplied in the parameters.
parse_amqp10_minimal(_Config) ->
    Source = [{<<"src-protocol">>, <<"amqp10">>},
              {<<"src-uri">>, <<"amqp://localhost:5672">>},
              {<<"src-address">>, <<"a-src-queue">>}],
    Dest = [{<<"dest-protocol">>, <<"amqp10">>},
            {<<"dest-uri">>, <<"amqp://remotehost:5672">>},
            {<<"dest-address">>, <<"a-dest-queue">>}],
    %% Source options first, destination options second.
    Params = Source ++ Dest,

    ?assertMatch(
       {ok, #{name := "my_shovel",
              ack_mode := on_confirm,
              source := #{module := rabbit_amqp10_shovel,
                          uris := ["amqp://localhost:5672"],
                          delete_after := never,
                          source_address := <<"a-src-queue">>
                          },
              dest := #{module := rabbit_amqp10_shovel,
                        uris := ["amqp://remotehost:5672"],
                        unacked := #{},
                        target_address := <<"a-dest-queue">>
                       }
             }},
        rabbit_shovel_parameters:parse({"vhost", "my_shovel"}, "my-cluster",
                                       Params)),
    ok.

%% Validates a fully specified AMQP 1.0 shovel definition given as a proplist
%% and requires every validation result to be ok (see validate_ok/1).
validate_amqp10(_Config) ->
    AppProps = [{<<"some-app-prop">>, <<"app-prop-value">>}],
    MsgAnnotations = [{<<"some-message-ann">>, <<"message-ann-value">>}],
    MsgProps = [{<<"user_id">>, <<"some-user">>}],

    Common = [{<<"ack-mode">>, <<"on-publish">>},
              {<<"reconnect-delay">>, 1001}],
    Source = [{<<"src-protocol">>, <<"amqp10">>},
              {<<"src-uri">>, <<"amqp://localhost:5672">>},
              {<<"src-address">>, <<"a-src-queue">>},
              {<<"src-delete-after">>, <<"never">>},
              {<<"src-prefetch-count">>, 30}],
    Dest = [{<<"dest-protocol">>, <<"amqp10">>},
            {<<"dest-uri">>, <<"amqp://remotehost:5672">>},
            {<<"dest-address">>, <<"a-dest-queue">>},
            {<<"dest-add-forward-headers">>, true},
            {<<"dest-add-timestamp-header">>, true},
            {<<"dest-application-properties">>, AppProps},
            {<<"dest-message-annotations">>, MsgAnnotations},
            {<<"dest-properties">>, MsgProps}],

    %% Concatenation keeps the options in common, source, destination order.
    Res = rabbit_shovel_parameters:validate("my-vhost", <<"shovel">>,
                                            "my-shovel",
                                            Common ++ Source ++ Dest, none),
    [] = validate_ok(Res),
    ok.

%% Same definition as validate_amqp10/1, but passed to validate/5 as a map,
%% with the destination message properties themselves given as a map. Every
%% validation result must be ok (see validate_ok/1).
validate_amqp10_with_a_map(_Config) ->
    AppProps = [{<<"some-app-prop">>, <<"app-prop-value">>}],
    MsgAnnotations = [{<<"some-message-ann">>, <<"message-ann-value">>}],
    MsgProps = #{<<"user_id">> => <<"some-user">>},

    Params = #{<<"ack-mode">> => <<"on-publish">>,
               <<"reconnect-delay">> => 1001,

               <<"src-protocol">> => <<"amqp10">>,
               <<"src-uri">> => <<"amqp://localhost:5672">>,
               <<"src-address">> => <<"a-src-queue">>,
               <<"src-delete-after">> => <<"never">>,
               <<"src-prefetch-count">> => 30,

               <<"dest-protocol">> => <<"amqp10">>,
               <<"dest-uri">> => <<"amqp://remotehost:5672">>,
               <<"dest-address">> => <<"a-dest-queue">>,
               <<"dest-add-forward-headers">> => true,
               <<"dest-add-timestamp-header">> => true,
               <<"dest-application-properties">> => AppProps,
               <<"dest-message-annotations">> => MsgAnnotations,
               <<"dest-properties">> => MsgProps},

    Res = rabbit_shovel_parameters:validate("my-vhost", <<"shovel">>,
                                            "my-shovel", Params, none),
    [] = validate_ok(Res),
    ok.

%% Walks a (possibly nested) list of validation results and returns [] when
%% every leaf is the atom ok.
%%
%% Nested lists are descended into, left to right. An empty nested list has
%% no failing leaf and is therefore accepted; previously it fell through to
%% the catch-all clause and made the check fail spuriously.
%%
%% Anything else -- a non-ok element at the head of the remaining list, or a
%% non-list argument -- terminates the calling process with
%% exit({not_ok, Remaining}), where Remaining is the term that could not be
%% consumed.
validate_ok([ok | Rest]) ->
    validate_ok(Rest);
validate_ok([Nested | Rest]) when is_list(Nested) ->
    %% validate_ok/1 either returns [] or exits, so this match cannot fail;
    %% it only sequences the nested check before the tail.
    [] = validate_ok(Nested),
    validate_ok(Rest);
validate_ok([]) ->
    [];
validate_ok(Other) ->
    exit({not_ok, Other}).