summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_mqtt/test/auth_SUITE.erl
blob: 7368139d9529c62883c495d062d0fdb8b563c983 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%
-module(auth_SUITE).
-compile([export_all]).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(CONNECT_TIMEOUT, 10000).

all() ->
    [{group, anonymous_no_ssl_user},
     {group, anonymous_ssl_user},
     {group, no_ssl_user},
     {group, ssl_user},
     {group, client_id_propagation}].

groups() ->
    [{anonymous_ssl_user, [],
      [anonymous_auth_success,
       user_credentials_auth,
       ssl_user_auth_success,
       ssl_user_vhost_not_allowed,
       ssl_user_vhost_parameter_mapping_success,
       ssl_user_vhost_parameter_mapping_not_allowed,
       ssl_user_vhost_parameter_mapping_vhost_does_not_exist,
       ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping
      ]},
     {anonymous_no_ssl_user, [],
      [anonymous_auth_success,
       user_credentials_auth,
       port_vhost_mapping_success,
       port_vhost_mapping_success_no_mapping,
       port_vhost_mapping_not_allowed,
       port_vhost_mapping_vhost_does_not_exist
       %% SSL auth will succeed, because we cannot ignore anonymous
       ]},
     {ssl_user, [],
      [anonymous_auth_failure,
       user_credentials_auth,
       ssl_user_auth_success,
       ssl_user_vhost_not_allowed,
       ssl_user_vhost_parameter_mapping_success,
       ssl_user_vhost_parameter_mapping_not_allowed,
       ssl_user_vhost_parameter_mapping_vhost_does_not_exist,
       ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping
      ]},
     {no_ssl_user, [],
      [anonymous_auth_failure,
       user_credentials_auth,
       ssl_user_auth_failure,
       port_vhost_mapping_success,
       port_vhost_mapping_success_no_mapping,
       port_vhost_mapping_not_allowed,
       port_vhost_mapping_vhost_does_not_exist
     ]},
     {client_id_propagation, [],
      [client_id_propagation]
     }
    ].

init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    Config.

end_per_suite(Config) ->
    Config.

init_per_group(Group, Config) ->
    Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
    Config1 = rabbit_ct_helpers:set_config(Config, [
        {rmq_nodename_suffix, Suffix},
        {rmq_certspwd, "bunnychow"}
    ]),
    MqttConfig = mqtt_config(Group),
    AuthConfig = auth_config(Group),
    rabbit_ct_helpers:run_setup_steps(Config1,
        [ fun(Conf) -> merge_app_env(MqttConfig, Conf) end ] ++
        [ fun(Conf) -> case AuthConfig of
                            undefined -> Conf;
                            _         -> merge_app_env(AuthConfig, Conf)
                       end
          end ] ++
        rabbit_ct_broker_helpers:setup_steps() ++
        rabbit_ct_client_helpers:setup_steps()).

end_per_group(_, Config) ->
    rabbit_ct_helpers:run_teardown_steps(Config,
      rabbit_ct_client_helpers:teardown_steps() ++
      rabbit_ct_broker_helpers:teardown_steps()).

merge_app_env(MqttConfig, Config) ->
    rabbit_ct_helpers:merge_app_env(Config, MqttConfig).

mqtt_config(anonymous_ssl_user) ->
    {rabbitmq_mqtt, [{ssl_cert_login,  true},
                     {allow_anonymous, true}]};
mqtt_config(anonymous_no_ssl_user) ->
    {rabbitmq_mqtt, [{ssl_cert_login,  false},
                     {allow_anonymous, true}]};
mqtt_config(ssl_user) ->
    {rabbitmq_mqtt, [{ssl_cert_login,  true},
                     {allow_anonymous, false}]};
mqtt_config(no_ssl_user) ->
    {rabbitmq_mqtt, [{ssl_cert_login,  false},
                     {allow_anonymous, false}]};
mqtt_config(client_id_propagation) ->
    {rabbitmq_mqtt, [{ssl_cert_login,  true},
                     {allow_anonymous, true}]}.

auth_config(client_id_propagation) ->
    {rabbit, [
            {auth_backends, [rabbit_auth_backend_mqtt_mock]}
          ]
    };
auth_config(_) ->
    undefined.

init_per_testcase(Testcase, Config) when Testcase == ssl_user_auth_success;
                                         Testcase == ssl_user_auth_failure ->
    Config1 = set_cert_user_on_default_vhost(Config),
    rabbit_ct_helpers:testcase_started(Config1, Testcase);
init_per_testcase(ssl_user_vhost_parameter_mapping_success, Config) ->
    Config1 = set_cert_user_on_default_vhost(Config),
    User = ?config(temp_ssl_user, Config1),
    ok = rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>),
    Config2 = set_vhost_for_cert_user(Config1, User),
    rabbit_ct_helpers:testcase_started(Config2, ssl_user_vhost_parameter_mapping_success);
init_per_testcase(ssl_user_vhost_parameter_mapping_not_allowed, Config) ->
    Config1 = set_cert_user_on_default_vhost(Config),
    User = ?config(temp_ssl_user, Config1),
    Config2 = set_vhost_for_cert_user(Config1, User),
    VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config2),
    ok = rabbit_ct_broker_helpers:clear_permissions(Config2, User, VhostForCertUser),
    rabbit_ct_helpers:testcase_started(Config2, ssl_user_vhost_parameter_mapping_not_allowed);
init_per_testcase(user_credentials_auth, Config) ->
    User = <<"new-user">>,
    Pass = <<"new-user-pass">>,
    ok = rabbit_ct_broker_helpers:add_user(Config, 0, User, Pass),
    ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
    Config1 = rabbit_ct_helpers:set_config(Config, [{new_user, User},
                                                    {new_user_pass, Pass}]),
    rabbit_ct_helpers:testcase_started(Config1, user_credentials_auth);
init_per_testcase(ssl_user_vhost_not_allowed, Config) ->
    Config1 = set_cert_user_on_default_vhost(Config),
    User = ?config(temp_ssl_user, Config1),
    ok = rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>),
    rabbit_ct_helpers:testcase_started(Config1, ssl_user_vhost_not_allowed);
init_per_testcase(ssl_user_vhost_parameter_mapping_vhost_does_not_exist, Config) ->
    Config1 = set_cert_user_on_default_vhost(Config),
    User = ?config(temp_ssl_user, Config1),
    Config2 = set_vhost_for_cert_user(Config1, User),
    VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config2),
    ok = rabbit_ct_broker_helpers:delete_vhost(Config, VhostForCertUser),
    rabbit_ct_helpers:testcase_started(Config1, ssl_user_vhost_parameter_mapping_vhost_does_not_exist);
init_per_testcase(port_vhost_mapping_success, Config) ->
    User = <<"guest">>,
    Config1 = set_vhost_for_port_vhost_mapping_user(Config, User),
    rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>),
    rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_success);
init_per_testcase(port_vhost_mapping_success_no_mapping, Config) ->
    User = <<"guest">>,
    Config1 = set_vhost_for_port_vhost_mapping_user(Config, User),
    PortToVHostMappingParameter = [
        {<<"1">>,   <<"unlikely to exist">>},
        {<<"2">>,   <<"unlikely to exist">>}],
    ok = rabbit_ct_broker_helpers:set_global_parameter(Config, mqtt_port_to_vhost_mapping, PortToVHostMappingParameter),
    VHost = ?config(temp_vhost_for_port_mapping, Config1),
    rabbit_ct_broker_helpers:clear_permissions(Config1, User, VHost),
    rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_success_no_mapping);
init_per_testcase(port_vhost_mapping_not_allowed, Config) ->
    User = <<"guest">>,
    Config1 = set_vhost_for_port_vhost_mapping_user(Config, User),
    rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>),
    VHost = ?config(temp_vhost_for_port_mapping, Config1),
    rabbit_ct_broker_helpers:clear_permissions(Config1, User, VHost),
    rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_not_allowed);
init_per_testcase(port_vhost_mapping_vhost_does_not_exist, Config) ->
    User = <<"guest">>,
    Config1 = set_vhost_for_port_vhost_mapping_user(Config, User),
    rabbit_ct_broker_helpers:clear_permissions(Config1, User, <<"/">>),
    VHost = ?config(temp_vhost_for_port_mapping, Config1),
    rabbit_ct_broker_helpers:delete_vhost(Config1, VHost),
    rabbit_ct_helpers:testcase_started(Config1, port_vhost_mapping_vhost_does_not_exist);
init_per_testcase(ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping, Config) ->
    Config1 = set_cert_user_on_default_vhost(Config),
    User = ?config(temp_ssl_user, Config1),
    Config2 = set_vhost_for_cert_user(Config1, User),

    Config3 = set_vhost_for_port_vhost_mapping_user(Config2, User),
    VhostForPortMapping = ?config(mqtt_port_to_vhost_mapping, Config2),
    rabbit_ct_broker_helpers:clear_permissions(Config3, User, VhostForPortMapping),

    rabbit_ct_broker_helpers:clear_permissions(Config3, User, <<"/">>),
    rabbit_ct_helpers:testcase_started(Config3, ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping);
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase).

set_cert_user_on_default_vhost(Config) ->
    CertsDir = ?config(rmq_certsdir, Config),
    CertFile = filename:join([CertsDir, "client", "cert.pem"]),
    {ok, CertBin} = file:read_file(CertFile),
    [{'Certificate', Cert, not_encrypted}] = public_key:pem_decode(CertBin),
    UserBin = rabbit_ct_broker_helpers:rpc(Config, 0,
                                           rabbit_ssl,
                                           peer_cert_auth_name,
                                           [Cert]),
    User = binary_to_list(UserBin),
    ok = rabbit_ct_broker_helpers:add_user(Config, 0, User, ""),
    ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
    rabbit_ct_helpers:set_config(Config, [{temp_ssl_user, User}]).

set_vhost_for_cert_user(Config, User) ->
    VhostForCertUser = <<"vhost_for_cert_user">>,
    UserToVHostMappingParameter = [
        {rabbit_data_coercion:to_binary(User), VhostForCertUser},
        {<<"O=client,CN=unlikelytoexistuser">>, <<"vhost2">>}
    ],
    ok = rabbit_ct_broker_helpers:add_vhost(Config, VhostForCertUser),
    ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VhostForCertUser),
    ok = rabbit_ct_broker_helpers:set_global_parameter(Config, mqtt_default_vhosts, UserToVHostMappingParameter),
    rabbit_ct_helpers:set_config(Config, [{temp_vhost_for_ssl_user, VhostForCertUser}]).

set_vhost_for_port_vhost_mapping_user(Config, User) ->
    VhostForPortMapping = <<"vhost_for_port_vhost_mapping">>,
    Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
    TlsPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls),
    PortToVHostMappingParameter = [
        {integer_to_binary(Port),    VhostForPortMapping},
        {<<"1884">>,                 <<"vhost2">>},
        {integer_to_binary(TlsPort), VhostForPortMapping},
        {<<"8884">>,                 <<"vhost2">>}

    ],
    ok = rabbit_ct_broker_helpers:add_vhost(Config, VhostForPortMapping),
    ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VhostForPortMapping),
    ok = rabbit_ct_broker_helpers:set_global_parameter(Config, mqtt_port_to_vhost_mapping, PortToVHostMappingParameter),
    rabbit_ct_helpers:set_config(Config, [{temp_vhost_for_port_mapping, VhostForPortMapping}]).

end_per_testcase(Testcase, Config) when Testcase == ssl_user_auth_success;
                                        Testcase == ssl_user_auth_failure;
                                        Testcase == ssl_user_vhost_not_allowed ->
    delete_cert_user(Config),
    rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(TestCase, Config) when TestCase == ssl_user_vhost_parameter_mapping_success;
                                        TestCase == ssl_user_vhost_parameter_mapping_not_allowed ->
    delete_cert_user(Config),
    VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config),
    ok = rabbit_ct_broker_helpers:delete_vhost(Config, VhostForCertUser),
    ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts),
    rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(user_credentials_auth, Config) ->
    User = ?config(new_user, Config),
    {ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["delete_user", User]),
    rabbit_ct_helpers:testcase_finished(Config, user_credentials_auth);
end_per_testcase(ssl_user_vhost_parameter_mapping_vhost_does_not_exist, Config) ->
    delete_cert_user(Config),
    ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts),
    rabbit_ct_helpers:testcase_finished(Config, ssl_user_vhost_parameter_mapping_vhost_does_not_exist);
end_per_testcase(Testcase, Config) when Testcase == port_vhost_mapping_success;
                                        Testcase == port_vhost_mapping_not_allowed;
                                        Testcase == port_vhost_mapping_success_no_mapping ->
    User = <<"guest">>,
    rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
    VHost = ?config(temp_vhost_for_port_mapping, Config),
    ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHost),
    ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping),
    rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(port_vhost_mapping_vhost_does_not_exist, Config) ->
    User = <<"guest">>,
    ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, <<"/">>),
    ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping),
    rabbit_ct_helpers:testcase_finished(Config, port_vhost_mapping_vhost_does_not_exist);
end_per_testcase(ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping, Config) ->
    delete_cert_user(Config),
    VhostForCertUser = ?config(temp_vhost_for_ssl_user, Config),
    ok = rabbit_ct_broker_helpers:delete_vhost(Config, VhostForCertUser),
    ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_default_vhosts),

    VHostForPortVHostMapping = ?config(temp_vhost_for_port_mapping, Config),
    ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHostForPortVHostMapping),
    ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping),
    rabbit_ct_helpers:testcase_finished(Config, ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping);
end_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_finished(Config, Testcase).

delete_cert_user(Config) ->
    User = ?config(temp_ssl_user, Config),
    {ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["delete_user", User]).

anonymous_auth_success(Config) ->
    expect_successful_connection(fun connect_anonymous/1, Config).

anonymous_auth_failure(Config) ->
    expect_authentication_failure(fun connect_anonymous/1, Config).


ssl_user_auth_success(Config) ->
    expect_successful_connection(fun connect_ssl/1, Config).

ssl_user_auth_failure(Config) ->
    expect_authentication_failure(fun connect_ssl/1, Config).

user_credentials_auth(Config) ->
    NewUser = ?config(new_user, Config),
    NewUserPass = ?config(new_user_pass, Config),

    expect_successful_connection(
        fun(Conf) -> connect_user(NewUser, NewUserPass, Conf) end,
        Config),

    expect_successful_connection(
        fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end,
        Config),

    expect_successful_connection(
        fun(Conf) -> connect_user(<<"/:guest">>, <<"guest">>, Conf) end,
        Config),

    expect_authentication_failure(
        fun(Conf) -> connect_user(NewUser, <<"invalid_pass">>, Conf) end,
        Config),

    expect_authentication_failure(
        fun(Conf) -> connect_user(undefined, <<"pass">>, Conf) end,
        Config),

    expect_authentication_failure(
        fun(Conf) -> connect_user(NewUser, undefined, Conf) end,
        Config),

    expect_authentication_failure(
        fun(Conf) -> connect_user(<<"non-existing-vhost:guest">>, <<"guest">>, Conf) end,
        Config).

ssl_user_vhost_parameter_mapping_success(Config) ->
    expect_successful_connection(fun connect_ssl/1, Config).

ssl_user_vhost_parameter_mapping_not_allowed(Config) ->
    expect_authentication_failure(fun connect_ssl/1, Config).

ssl_user_vhost_not_allowed(Config) ->
    expect_authentication_failure(fun connect_ssl/1, Config).

ssl_user_vhost_parameter_mapping_vhost_does_not_exist(Config) ->
    expect_authentication_failure(fun connect_ssl/1, Config).

port_vhost_mapping_success(Config) ->
    expect_successful_connection(
        fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end,
        Config).

port_vhost_mapping_success_no_mapping(Config) ->
    %% no vhost mapping for the port, falling back to default vhost
    %% where the user can connect
    expect_successful_connection(
        fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end,
        Config
    ).

port_vhost_mapping_not_allowed(Config) ->
    expect_authentication_failure(
        fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end,
        Config
    ).

port_vhost_mapping_vhost_does_not_exist(Config) ->
    expect_authentication_failure(
        fun(Conf) -> connect_user(<<"guest">>, <<"guest">>, Conf) end,
        Config
    ).

ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping(Config) ->
    expect_successful_connection(fun connect_ssl/1, Config).

connect_anonymous(Config) ->
    P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
    emqttc:start_link([{host, "localhost"},
                       {port, P},
                       {client_id, <<"simpleClient">>},
                       {proto_ver, 3},
                       {logger, info}]).

connect_ssl(Config) ->
    CertsDir = ?config(rmq_certsdir, Config),
    SSLConfig = [{cacertfile, filename:join([CertsDir, "testca", "cacert.pem"])},
                 {certfile, filename:join([CertsDir, "client", "cert.pem"])},
                 {keyfile, filename:join([CertsDir, "client", "key.pem"])}],
    P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls),
    emqttc:start_link([{host, "localhost"},
                       {port, P},
                       {client_id, <<"simpleClient">>},
                       {proto_ver, 3},
                       {logger, info},
                       {ssl, SSLConfig}]).

client_id_propagation(Config) ->
    ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config,
      rabbit_auth_backend_mqtt_mock),
    ClientId = <<"client-id-propagation">>,
    {ok, C} = connect_user(<<"client-id-propagation">>, <<"client-id-propagation">>,
                           Config, ClientId),
    receive {mqttc, C, connected} -> ok
    after ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout)
    end,
    emqttc:subscribe(C, <<"TopicA">>, qos0),
    [{authentication, AuthProps}] = rabbit_ct_broker_helpers:rpc(Config, 0,
                                                                 rabbit_auth_backend_mqtt_mock,
                                                                 get,
                                                                 [authentication]),
    ?assertEqual(ClientId, proplists:get_value(client_id, AuthProps)),

    [{vhost_access, AuthzData}] = rabbit_ct_broker_helpers:rpc(Config, 0,
                                                               rabbit_auth_backend_mqtt_mock,
                                                               get,
                                                               [vhost_access]),
    ?assertEqual(ClientId, maps:get(<<"client_id">>, AuthzData)),

    [{resource_access, AuthzContext}] = rabbit_ct_broker_helpers:rpc(Config, 0,
                                                                     rabbit_auth_backend_mqtt_mock,
                                                                     get,
                                                                     [resource_access]),
    ?assertEqual(true, maps:size(AuthzContext) > 0),
    ?assertEqual(ClientId, maps:get(<<"client_id">>, AuthzContext)),

    [{topic_access, TopicContext}] = rabbit_ct_broker_helpers:rpc(Config, 0,
                                                                  rabbit_auth_backend_mqtt_mock,
                                                                  get,
                                                                  [topic_access]),
    VariableMap = maps:get(variable_map, TopicContext),
    ?assertEqual(ClientId, maps:get(<<"client_id">>, VariableMap)),

    emqttc:disconnect(C).

connect_user(User, Pass, Config) ->
    connect_user(User, Pass, Config, User).
connect_user(User, Pass, Config, ClientID) ->
    Creds = case User of
        undefined -> [];
        _         -> [{username, User}]
    end ++ case Pass of
        undefined -> [];
        _         -> [{password, Pass}]
    end,
    P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
    emqttc:start_link([{host, "localhost"},
                       {port, P},
                       {client_id, ClientID},
                       {proto_ver, 3},
                       {logger, info}] ++ Creds).

expect_successful_connection(ConnectFun, Config) ->
    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, reset_auth_attempt_metrics, []),
    {ok, C} = ConnectFun(Config),
    receive {mqttc, C, connected} -> emqttc:disconnect(C)
    after ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout)
    end,
    [Attempt] =
        rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, get_auth_attempts, []),
    ?assertEqual(false, proplists:is_defined(remote_address, Attempt)),
    ?assertEqual(false, proplists:is_defined(username, Attempt)),
    ?assertEqual(proplists:get_value(protocol, Attempt), <<"mqtt">>),
    ?assertEqual(proplists:get_value(auth_attempts, Attempt), 1),
    ?assertEqual(proplists:get_value(auth_attempts_failed, Attempt), 0),
    ?assertEqual(proplists:get_value(auth_attempts_succeeded, Attempt), 1).

expect_authentication_failure(ConnectFun, Config) ->
    process_flag(trap_exit, true),
    rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, reset_auth_attempt_metrics, []),
    {ok, C} = ConnectFun(Config),
    Result = receive
        {mqttc, C, connected} -> {error, unexpected_anonymous_connection};
        {'EXIT', C, {shutdown,{connack_error,'CONNACK_AUTH'}}} -> ok;
        {'EXIT', C, {shutdown,{connack_error,'CONNACK_CREDENTIALS'}}} -> ok
    after
        ?CONNECT_TIMEOUT -> {error, emqttc_connection_timeout}
    end,
    [Attempt] =
        rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, get_auth_attempts, []),
    ?assertEqual(false, proplists:is_defined(remote_address, Attempt), <<>>),
    ?assertEqual(false, proplists:is_defined(username, Attempt)),
    ?assertEqual(proplists:get_value(protocol, Attempt), <<"mqtt">>),
    ?assertEqual(proplists:get_value(auth_attempts, Attempt), 1),
    ?assertEqual(proplists:get_value(auth_attempts_failed, Attempt), 1),
    ?assertEqual(proplists:get_value(auth_attempts_succeeded, Attempt), 0),
    process_flag(trap_exit, false),
    case Result of
        ok -> ok;
        {error, Err} -> exit(Err)
    end.