diff options
authorAlexey Lebedeff <>2021-10-26 16:29:41 +0200
committerAlexey Lebedeff <>2021-11-12 18:03:05 +0100
commite0723d5e6620182380566ee36d381af068ecf43d (patch)
parent115b951b9cc718fd40aa560e49319f57c762f05d (diff)
Prevent crash logs when mqtt user is missing permissionsmqtt-crash-log
Fixes #2941 This adds proper exception handlers in the right places. And tests ensure that it indeed provides nice neat logs without large stacktraces for every amqp operation. Unnecessary checking for subscribe permissions on topic was dropped, as `queue.bind` does exactly the same check. Topic permissions tests were also added, and they indeed confirm that there was no change in behaviour. Ideally the same explicit topic permission check should be dropped for publishing, but it's more complicated - so for now there only a detailed comment in the source code explaining it. A few other things were also optimized away: - Using amqp client to test for queue existence - Creating queues/starting consumptions too eagerly, even if not yet requested by client
2 files changed, 437 insertions, 85 deletions
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index ab9a8cb75b..d904450cb9 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -66,9 +66,18 @@ process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
{error, connect_expected, PState};
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
PState) ->
- case process_request(Type, Frame, PState) of
+ try process_request(Type, Frame, PState) of
{ok, PState1} -> {ok, PState1, PState1#proc_state.connection};
Ret -> Ret
+ catch
+ _:{{shutdown, {server_initiated_close, 403, _}}, _} ->
+ %% NB: MQTT spec says we should ack normally, ie pretend
+ %% there was no auth error, but here we are closing the
+ %% connection with an error. This is what happens anyway
+ %% if there is an authorization failure at the AMQP 0-9-1
+ %% client level. And error was already logged by AMQP
+ %% channel, so no need for custom logging.
+ {error, access_refused, PState}
add_client_id_to_adapter_info(ClientId, #amqp_adapter_info{additional_info = AdditionalInfo0} = AdapterInfo) ->
@@ -133,8 +142,7 @@ process_request(?CONNECT,
{UserBin, PassBin} ->
case process_login(UserBin, PassBin, ProtoVersion, PState1) of
connack_dup_auth ->
- {SessionPresent0, PState2} = maybe_clean_sess(PState1),
- {{?CONNACK_ACCEPT, SessionPresent0}, PState2};
+ maybe_clean_sess(PState1);
{?CONNACK_ACCEPT, Conn, VHost, AState} ->
case rabbit_mqtt_collector:register(ClientId, self()) of
{ok, Corr} ->
@@ -156,8 +164,7 @@ process_request(?CONNECT,
retainer_pid = RetainerPid,
auth_state = AState,
register_state = {pending, Corr}},
- {SessionPresent1, PState4} = maybe_clean_sess(PState3),
- {{?CONNACK_ACCEPT, SessionPresent1}, PState4};
+ maybe_clean_sess(PState3);
%% e.g. this node was removed from the MQTT cluster members
{error, _} = Err ->
rabbit_log_connection:error("MQTT cannot accept a connection: "
@@ -256,44 +263,43 @@ process_request(?SUBSCRIBE,
message_id = StateMsgId,
mqtt2amqp_fun = Mqtt2AmqpFun} = PState0) ->
rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]),
- check_subscribe(Topics, fun() ->
- {QosResponse, PState1} =
- lists:foldl(fun (#mqtt_topic{name = TopicName,
- qos = Qos}, {QosList, PState}) ->
- SupportedQos = supported_subs_qos(Qos),
- {Queue, #proc_state{subscriptions = Subs} = PState1} =
- ensure_queue(SupportedQos, PState),
- RoutingKey = Mqtt2AmqpFun(TopicName),
- Binding = #'queue.bind'{
- queue = Queue,
- exchange = Exchange,
- routing_key = RoutingKey},
- #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
- SupportedQosList = case maps:find(TopicName, Subs) of
- {ok, L} -> [SupportedQos|L];
- error -> [SupportedQos]
- end,
- {[SupportedQos | QosList],
- PState1 #proc_state{
- subscriptions =
- maps:put(TopicName, SupportedQosList, Subs)}}
- end, {[], PState0}, Topics),
- SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
- variable = #mqtt_frame_suback{
- message_id = SubscribeMsgId,
- qos_table = QosResponse}}, PState1),
- %% we may need to send up to length(Topics) messages.
- %% if QoS is > 0 then we need to generate a message id,
- %% and increment the counter.
- StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId),
- N = lists:foldl(fun (Topic, Acc) ->
- case maybe_send_retained_message(RPid, Topic, Acc, PState1) of
- {true, X} -> Acc + X;
- false -> Acc
- end
- end, StartMsgId, Topics),
- {ok, PState1#proc_state{message_id = N}}
- end, PState0);
+ {QosResponse, PState1} =
+ lists:foldl(fun (#mqtt_topic{name = TopicName,
+ qos = Qos}, {QosList, PState}) ->
+ SupportedQos = supported_subs_qos(Qos),
+ {Queue, #proc_state{subscriptions = Subs} = PState1} =
+ ensure_queue(SupportedQos, PState),
+ RoutingKey = Mqtt2AmqpFun(TopicName),
+ Binding = #'queue.bind'{
+ queue = Queue,
+ exchange = Exchange,
+ routing_key = RoutingKey},
+ #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
+ SupportedQosList = case maps:find(TopicName, Subs) of
+ {ok, L} -> [SupportedQos|L];
+ error -> [SupportedQos]
+ end,
+ {[SupportedQos | QosList],
+ PState1 #proc_state{
+ subscriptions =
+ maps:put(TopicName, SupportedQosList, Subs)}}
+ end, {[], PState0}, Topics),
+ SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
+ variable = #mqtt_frame_suback{
+ message_id = SubscribeMsgId,
+ qos_table = QosResponse}}, PState1),
+ %% we may need to send up to length(Topics) messages.
+ %% if QoS is > 0 then we need to generate a message id,
+ %% and increment the counter.
+ StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId),
+ N = lists:foldl(fun (Topic, Acc) ->
+ case maybe_send_retained_message(RPid, Topic, Acc, PState1) of
+ {true, X} -> Acc + X;
+ false -> Acc
+ end
+ end, StartMsgId, Topics),
+ {ok, PState1#proc_state{message_id = N}};
@@ -489,35 +495,65 @@ delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) ->
maybe_clean_sess(PState = #proc_state { clean_sess = false,
connection = Conn,
+ auth_state = #auth_state{vhost = VHost},
client_id = ClientId }) ->
- SessionPresent = session_present(Conn, ClientId),
- {_Queue, PState1} = ensure_queue(?QOS_1, PState),
- {SessionPresent, PState1};
+ SessionPresent = session_present(VHost, ClientId),
+ case SessionPresent of
+ false ->
+ %% ensure_queue/2 not only ensures that queue is created, but also starts consuming from it.
+ %% Let's avoid creating that queue until explicitly asked by a client.
+ %% Then publish-only clients, that connect with clean_sess=true due to some misconfiguration,
+ %% will consume less resources.
+ {{?CONNACK_ACCEPT, SessionPresent}, PState};
+ true ->
+ try ensure_queue(?QOS_1, PState) of
+ {_Queue, PState1} -> {{?CONNACK_ACCEPT, SessionPresent}, PState1}
+ catch
+ exit:({{shutdown, {server_initiated_close, 403, _}}, _}) ->
+ %% Connection is not yet propagated to #proc_state{}, let's close it here
+ catch amqp_connection:close(Conn),
+ rabbit_log_connection:error("MQTT cannot recover a session, user is missing permissions"),
+ C:E:S ->
+ %% Connection is not yet propagated to
+ %% #proc_state{}, let's close it here.
+ %% This is an exceptional situation anyway, but
+ %% doing this will prevent second crash from
+ %% amqp client being logged.
+ catch amqp_connection:close(Conn),
+ erlang:raise(C, E, S)
+ end
+ end;
maybe_clean_sess(PState = #proc_state { clean_sess = true,
connection = Conn,
+ auth_state = #auth_state{vhost = VHost},
client_id = ClientId }) ->
{_, Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId),
{ok, Channel} = amqp_connection:open_channel(Conn),
- ok = try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of
- #'queue.delete_ok'{} -> ok
- catch
- exit:_Error -> ok
- after
- amqp_channel:close(Channel)
- end,
- {false, PState}.
-session_present(Conn, ClientId) ->
+ case session_present(VHost, ClientId) of
+ false ->
+ {{?CONNACK_ACCEPT, false}, PState};
+ true ->
+ try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of
+ #'queue.delete_ok'{} -> {{?CONNACK_ACCEPT, false}, PState}
+ catch
+ exit:({{shutdown, {server_initiated_close, 403, _}}, _}) ->
+ %% Connection is not yet propagated to #proc_state{}, let's close it here
+ catch amqp_connection:close(Conn),
+ rabbit_log_connection:error("MQTT cannot start a clean session: "
+ "`configure` permission missing for queue `~p`", [Queue]),
+ after
+ catch amqp_channel:close(Channel)
+ end
+ end.
+session_present(VHost, ClientId) ->
{_, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
- Declare = #'queue.declare'{queue = QueueQ1,
- passive = true},
- {ok, Channel} = amqp_connection:open_channel(Conn),
- try
- amqp_channel:call(Channel, Declare),
- amqp_channel:close(Channel),
- true
- catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
- false
+ QueueName = rabbit_misc:r(VHost, queue, QueueQ1),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, _} -> true;
+ {error, not_found} -> false
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
@@ -832,6 +868,22 @@ send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain,
PState #proc_state{ channels = {undefined, undefined} }.
+%% TODO amqp_pub/2 is publishing messages asynchronously, using
+%% amqp_channel:cast_flow/3
+%% It does access check using check_publish/3 before submitting, but
+%% this is superfluous, as actual publishing will do the same
+%% check. While check results cached, it's still some unnecessary
+%% work.
+%% And the only reason to keep it that way is that it prevents useless
+%% crash messages flooding logs, as there is no code to handle async
+%% channel crash gracefully.
+%% It'd be better to rework the whole thing, removing performance
+%% penalty and some 50 lines of duplicate code. Maybe unlinking from
+%% channel, and adding it as a child of connection supervisor instead.
+%% But exact details are not yet clear.
amqp_pub(undefined, PState) ->
@@ -944,25 +996,12 @@ handle_ra_event(Evt, PState) ->
rabbit_log:debug("unhandled ra_event: ~w ", [Evt]),
-%% NB: check_*: MQTT spec says we should ack normally, ie pretend there
-%% was no auth error, but here we are closing the connection with an error. This
-%% is what happens anyway if there is an authorization failure at the AMQP 0-9-1 client level.
check_publish(TopicName, Fn, PState) ->
case check_topic_access(TopicName, write, PState) of
ok -> Fn();
_ -> {error, unauthorized, PState}
-check_subscribe([], Fn, _) ->
- Fn();
-check_subscribe([#mqtt_topic{name = TopicName} | Topics], Fn, PState) ->
- case check_topic_access(TopicName, read, PState) of
- ok -> check_subscribe(Topics, Fn, PState);
- _ -> {error, unauthorized, PState}
- end.
check_topic_access(TopicName, Access,
auth_state = #auth_state{user = User = #user{username = Username},
diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl
index dd7dc9ee9e..b494939914 100644
--- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl
@@ -10,13 +10,15 @@
-define(CONNECT_TIMEOUT, 10000).
+-define(WAIT_LOG_NO_CRASHES, {["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}).
all() ->
[{group, anonymous_no_ssl_user},
{group, anonymous_ssl_user},
{group, no_ssl_user},
{group, ssl_user},
- {group, client_id_propagation}].
+ {group, client_id_propagation},
+ {group, authz_handling}].
groups() ->
[{anonymous_ssl_user, [],
@@ -59,6 +61,15 @@ groups() ->
{client_id_propagation, [],
+ },
+ {authz_handling, [],
+ [no_queue_bind_permission,
+ no_queue_consume_permission,
+ no_queue_consume_permission_on_connect,
+ no_queue_delete_permission,
+ no_queue_declare_permission,
+ no_topic_read_permission,
+ no_topic_write_permission]
@@ -69,6 +80,23 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
+init_per_group(authz_handling, Config0) ->
+ User = <<"mqtt-user">>,
+ Password = <<"mqtt-password">>,
+ VHost = <<"mqtt-vhost">>,
+ MqttConfig = {rabbitmq_mqtt, [{default_user, User}
+ ,{default_pass, Password}
+ ,{allow_anonymous, true}
+ ,{vhost, VHost}
+ ,{exchange, <<"amq.topic">>}
+ ]},
+ Config1 = rabbit_ct_helpers:run_setup_steps(rabbit_ct_helpers:merge_app_env(Config0, MqttConfig),
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()),
+ rabbit_ct_broker_helpers:add_user(Config1, User, Password),
+ rabbit_ct_broker_helpers:add_vhost(Config1, VHost),
+ [Log|_] = rabbit_ct_broker_helpers:rpc(Config1, 0, rabbit, log_locations, []),
+ [{mqtt_user, User}, {mqtt_vhost, VHost}, {mqtt_password, Password}, {log_location, Log}|Config1];
init_per_group(Group, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
@@ -284,6 +312,26 @@ end_per_testcase(ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_ma
ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHostForPortVHostMapping),
ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping),
rabbit_ct_helpers:testcase_finished(Config, ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping);
+end_per_testcase(Testcase, Config) when Testcase == no_queue_bind_permission;
+ Testcase == no_queue_consume_permission;
+ Testcase == no_queue_consume_permission_on_connect;
+ Testcase == no_queue_delete_permission;
+ Testcase == no_queue_declare_permission;
+ Testcase == no_topic_read_permission;
+ Testcase == no_topic_write_permission ->
+ %% So let's wait before logs are surely flushed
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_log, error, [Marker]),
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [{[Marker], fun () -> stop end}]),
+ %% Preserve file contents in case some investigation is needed, before truncating.
+ file:copy(?config(log_location, Config), iolist_to_binary([?config(log_location, Config), ".", atom_to_binary(Testcase)])),
+ %% And provide an empy log file for the next test in this group
+ file:write_file(?config(log_location, Config), <<>>),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
@@ -448,9 +496,237 @@ client_id_propagation(Config) ->
+%% These tests try to cover all operations that are listed in the
+%% table in
+%% and which MQTT plugin tries to perform.
+%% Silly MQTT doesn't allow us to see any error codes in the protocol,
+%% so the only non-intrusive way to check for `access_refused`
+%% codepath is by checking logs. Every testcase from this group
+%% truncates log file beforehand, so it'd be easier to analyze. There
+%% is additional wait in the corresponding end_per_testcase that
+%% ensures that logs were for the current testcase were completely
+%% flushed, and won't contaminate following tests from this group.
+%% Then each test-case asserts that logs contain following things:
+%% 1) Handling of access_refused error handler in MQTT reader:
+%% 2) Mention of which AMQP operation caused that error (that one is
+%% kinda superflous, it just makes sure that every AMQP operation
+%% in MQTT plugin was tested)
+no_queue_bind_permission(Config) ->
+ test_subscribe_permissions_combination(<<".*">>, <<"">>, <<".*">>, Config,
+ ["operation queue.bind caused a channel exception access_refused"]).
+no_queue_consume_permission(Config) ->
+ test_subscribe_permissions_combination(<<".*">>, <<".*">>, <<"^amq\\.topic">>, Config,
+ ["operation basic.consume caused a channel exception access_refused"]).
+no_queue_delete_permission(Config) ->
+ set_permissions(".*", ".*", ".*", Config),
+ C1 = open_mqtt_connection(Config, [{client_id, <<"no_queue_delete_permission">>}, {clean_sess, false}]),
+ emqttc:sync_subscribe(C1, {<<"test/topic">>, qos1}),
+ emqttc:disconnect(C1),
+ set_permissions(<<>>, ".*", ".*", Config),
+ %% And now we have a durable queue that user don't have permissions to delete
+ %% Attempt to establish clean session should fail
+ expect_server_error(
+ fun () ->
+ connect_user(
+ ?config(mqtt_user, Config), ?config(mqtt_password, Config),
+ Config, ?config(mqtt_user, Config),
+ [{client_id, <<"no_queue_delete_permission">>}, {clean_sess, true}])
+ end),
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}
+ ,{["operation queue.delete caused a channel exception access_refused",
+ "MQTT cannot start a clean session: `configure` permission missing for queue"],
+ fun () -> stop end}
+ ]),
+ ok.
+no_queue_consume_permission_on_connect(Config) ->
+ set_permissions(".*", ".*", ".*", Config),
+ C1 = open_mqtt_connection(Config, [{client_id, <<"no_queue_consume_permission_on_connect">>}, {clean_sess, false}]),
+ emqttc:sync_subscribe(C1, {<<"test/topic">>, qos1}),
+ emqttc:disconnect(C1),
+ set_permissions(".*", ".*", "^amq\\.topic", Config),
+ expect_server_error(
+ fun () ->
+ connect_user(
+ ?config(mqtt_user, Config), ?config(mqtt_password, Config),
+ Config, ?config(mqtt_user, Config),
+ [{client_id, <<"no_queue_consume_permission_on_connect">>}, {clean_sess, false}])
+ end),
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}
+ ,{["operation basic.consume caused a channel exception access_refused",
+ "MQTT cannot recover a session, user is missing permissions"],
+ fun () -> stop end}
+ ]),
+ ok.
+no_queue_declare_permission(Config) ->
+ rabbit_ct_broker_helpers:set_permissions(Config, ?config(mqtt_user, Config), ?config(mqtt_vhost, Config), <<"">>, <<".*">>, <<".*">>),
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ {ok, C} = emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"no_queue_declare_permission">>},
+ {proto_ver, 3},
+ {logger, info},
+ {username, ?config(mqtt_user, Config)},
+ {password, ?config(mqtt_password, Config)},
+ {clean_sess, false}
+ ]),
+ receive
+ {mqttc, _, connected} -> ok
+ after
+ ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout)
+ end,
+ process_flag(trap_exit, true),
+ try emqttc:sync_subscribe(C, <<"test/topic">>) of
+ _ -> exit(this_should_not_succeed)
+ catch
+ exit:{{shutdown, tcp_closed} , _} -> ok
+ end,
+ process_flag(trap_exit, false),
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}
+ ,{["MQTT protocol error on connection.*access_refused",
+ "operation queue.declare caused a channel exception access_refused"],
+ fun () -> stop end}
+ ]),
+ ok.
+no_topic_read_permission(Config) ->
+ set_permissions(".*", ".*", ".*", Config),
+ set_topic_permissions("^allow-write\\..*", "^allow-read\\..*", Config),
+ C = open_mqtt_connection(Config),
+ emqttc:sync_subscribe(C, <<"allow-read/some/topic">>), %% Just to be sure that our permission setup is indeed working
+ expect_sync_error(fun () ->
+ emqttc:sync_subscribe(C, <<"test/topic">>)
+ end),
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ ,{["MQTT protocol error on connection.*access_refused",
+ "operation queue.bind caused a channel exception access_refused: access to topic 'test.topic' in exchange 'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'"],
+ fun () -> stop end}
+ ]),
+ ok.
+no_topic_write_permission(Config) ->
+ set_permissions(".*", ".*", ".*", Config),
+ set_topic_permissions("^allow-write\\..*", "^allow-read\\..*", Config),
+ C = open_mqtt_connection(Config),
+ emqttc:sync_publish(C, <<"allow-write/some/topic">>, <<"payload">>, qos1), %% Just to be sure that our permission setup is indeed working
+ expect_sync_error(fun () ->
+ emqttc:sync_publish(C, <<"some/other/topic">>, <<"payload">>, qos1)
+ end),
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ ,{["access to topic 'some.other.topic' in exchange 'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
+ "MQTT connection.*is closing due to an authorization failure"],
+ fun () -> stop end}
+ ]),
+ ok.
+expect_server_error(Fun) ->
+ process_flag(trap_exit, true),
+ {ok, C} = Fun(),
+ Result = receive
+ {mqttc, C, connected} -> {error, unexpected_successful_connection};
+ {'EXIT', C, {shutdown,{connack_error,'CONNACK_SERVER'}}} -> ok;
+ {'EXIT', C, {shutdown, Err}} -> {error, unexpected_error, Err}
+ after
+ ?CONNECT_TIMEOUT -> {error, emqttc_connection_timeout}
+ end,
+ process_flag(trap_exit, false),
+ case Result of
+ ok -> ok;
+ {error, E} -> exit(E)
+ end.
+expect_sync_error(Fun) ->
+ process_flag(trap_exit, true),
+ try Fun() of
+ _ -> exit(this_should_not_succeed)
+ catch
+ exit:{{shutdown, tcp_closed} , _} -> ok
+ after
+ process_flag(trap_exit, false)
+ end.
+set_topic_permissions(WritePat, ReadPat, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_auth_backend_internal, set_topic_permissions,
+ [?config(mqtt_user, Config), ?config(mqtt_vhost, Config),
+ <<"amq.topic">>, WritePat, ReadPat, <<"acting-user">>]).
+set_permissions(PermConf, PermWrite, PermRead, Config) ->
+ rabbit_ct_broker_helpers:set_permissions(Config, ?config(mqtt_user, Config), ?config(mqtt_vhost, Config),
+ iolist_to_binary(PermConf),
+ iolist_to_binary(PermWrite),
+ iolist_to_binary(PermRead)).
+open_mqtt_connection(Config) ->
+ open_mqtt_connection(Config, []).
+open_mqtt_connection(Config, Opts) ->
+ {ok, C} = connect_user(?config(mqtt_user, Config), ?config(mqtt_password, Config), Config, ?config(mqtt_user, Config), Opts),
+ receive
+ {mqttc, _, connected} -> ok
+ after
+ ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout)
+ end,
+ C.
+test_subscribe_permissions_combination(PermConf, PermWrite, PermRead, Config, ExtraLogChecks) ->
+ rabbit_ct_broker_helpers:set_permissions(Config, ?config(mqtt_user, Config), ?config(mqtt_vhost, Config), PermConf, PermWrite, PermRead),
+ {ok, C} = connect_user(?config(mqtt_user, Config), ?config(mqtt_password, Config), Config),
+ receive
+ {mqttc, _, connected} -> ok
+ after
+ ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout)
+ end,
+ process_flag(trap_exit, true),
+ try emqttc:sync_subscribe(C, <<"test/topic">>) of
+ _ -> exit(this_should_not_succeed)
+ catch
+ exit:{{shutdown, tcp_closed} , _} -> ok
+ end,
+ process_flag(trap_exit, false),
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}
+ ,{["MQTT protocol error on connection.*access_refused"|ExtraLogChecks],
+ fun () -> stop end}
+ ]),
+ ok.
connect_user(User, Pass, Config) ->
- connect_user(User, Pass, Config, User).
+ connect_user(User, Pass, Config, User, []).
connect_user(User, Pass, Config, ClientID) ->
+ connect_user(User, Pass, Config, ClientID, []).
+connect_user(User, Pass, Config, ClientID, Opts) ->
Creds = case User of
undefined -> [];
_ -> [{username, User}]
@@ -459,11 +735,13 @@ connect_user(User, Pass, Config, ClientID) ->
_ -> [{password, Pass}]
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
- emqttc:start_link([{host, "localhost"},
- {port, P},
- {client_id, ClientID},
- {proto_ver, 3},
- {logger, info}] ++ Creds).
+ emqttc:start_link(Opts
+ ++ [{host, "localhost"},
+ {port, P},
+ {client_id, ClientID},
+ {proto_ver, 3},
+ {logger, info}]
+ ++ Creds).
expect_successful_connection(ConnectFun, Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, reset_auth_attempt_metrics, []),
@@ -504,3 +782,38 @@ expect_authentication_failure(ConnectFun, Config) ->
ok -> ok;
{error, Err} -> exit(Err)
+wait_log(Config, Deadline, Clauses) ->
+ {ok, Content} = file:read_file(?config(log_location, Config)),
+ case erlang:system_time(microsecond) of
+ T when T > Deadline ->
+ lists:foreach(fun
+ ({REs, _}) ->
+ Matches = [ io_lib:format("~p - ~s~n", [RE, re:run(Content, RE, [{capture, none}])]) || RE <- REs ],
+ ct:pal("Wait log clause status: ~s", [Matches])
+ end, Clauses),
+ exit(no_log_lines_detected);
+ _ -> ok
+ end,
+ case wait_log_check_clauses(Content, Clauses) of
+ stop -> ok;
+ continue ->
+ timer:sleep(50),
+ wait_log(Config, Deadline, Clauses)
+ end,
+ ok.
+wait_log_check_clauses(_, []) ->
+ continue;
+wait_log_check_clauses(Content, [{REs, Fun}|Rest]) ->
+ case multiple_re_match(Content, REs) of
+ true -> Fun();
+ _ ->
+ wait_log_check_clauses(Content, Rest)
+ end.
+multiple_re_match(Content, REs) ->
+ lists:all(fun (RE) ->
+ match == re:run(Content, RE, [{capture, none}])
+ end,
+ REs).