diff options
author | Michael Klishin <klishinm@vmware.com> | 2021-11-13 04:28:57 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-13 04:28:57 +0300 |
commit | 38982e2c672aeb203eca508cc9691586e1fb8b8b (patch) | |
tree | 905c074b82d0ded7d79254e898e0618827b08ae1 | |
parent | 98f9b126af8dbb7a625e02c0f0adfe3fc6b0ff92 (diff) | |
parent | e0723d5e6620182380566ee36d381af068ecf43d (diff) | |
download | rabbitmq-server-git-38982e2c672aeb203eca508cc9691586e1fb8b8b.tar.gz |
Merge pull request #3714 from rabbitmq/mqtt-crash-log
Prevent crash logs when mqtt user is missing permissions
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 195 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/auth_SUITE.erl | 327 |
2 files changed, 437 insertions, 85 deletions
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index ab9a8cb75b..d904450cb9 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -66,9 +66,18 @@ process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, {error, connect_expected, PState}; process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, PState) -> - case process_request(Type, Frame, PState) of + try process_request(Type, Frame, PState) of {ok, PState1} -> {ok, PState1, PState1#proc_state.connection}; Ret -> Ret + catch + _:{{shutdown, {server_initiated_close, 403, _}}, _} -> + %% NB: MQTT spec says we should ack normally, ie pretend + %% there was no auth error, but here we are closing the + %% connection with an error. This is what happens anyway + %% if there is an authorization failure at the AMQP 0-9-1 + %% client level. And error was already logged by AMQP + %% channel, so no need for custom logging. + {error, access_refused, PState} end. add_client_id_to_adapter_info(ClientId, #amqp_adapter_info{additional_info = AdditionalInfo0} = AdapterInfo) -> @@ -133,8 +142,7 @@ process_request(?CONNECT, {UserBin, PassBin} -> case process_login(UserBin, PassBin, ProtoVersion, PState1) of connack_dup_auth -> - {SessionPresent0, PState2} = maybe_clean_sess(PState1), - {{?CONNACK_ACCEPT, SessionPresent0}, PState2}; + maybe_clean_sess(PState1); {?CONNACK_ACCEPT, Conn, VHost, AState} -> case rabbit_mqtt_collector:register(ClientId, self()) of {ok, Corr} -> @@ -156,8 +164,7 @@ process_request(?CONNECT, retainer_pid = RetainerPid, auth_state = AState, register_state = {pending, Corr}}, - {SessionPresent1, PState4} = maybe_clean_sess(PState3), - {{?CONNACK_ACCEPT, SessionPresent1}, PState4}; + maybe_clean_sess(PState3); %% e.g. this node was removed from the MQTT cluster members {error, _} = Err -> rabbit_log_connection:error("MQTT cannot accept a connection: " @@ -256,44 +263,43 @@ process_request(?SUBSCRIBE, message_id = StateMsgId, mqtt2amqp_fun = Mqtt2AmqpFun} = PState0) -> rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]), - check_subscribe(Topics, fun() -> - {QosResponse, PState1} = - lists:foldl(fun (#mqtt_topic{name = TopicName, - qos = Qos}, {QosList, PState}) -> - SupportedQos = supported_subs_qos(Qos), - {Queue, #proc_state{subscriptions = Subs} = PState1} = - ensure_queue(SupportedQos, PState), - RoutingKey = Mqtt2AmqpFun(TopicName), - Binding = #'queue.bind'{ - queue = Queue, - exchange = Exchange, - routing_key = RoutingKey}, - #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding), - SupportedQosList = case maps:find(TopicName, Subs) of - {ok, L} -> [SupportedQos|L]; - error -> [SupportedQos] - end, - {[SupportedQos | QosList], - PState1 #proc_state{ - subscriptions = - maps:put(TopicName, SupportedQosList, Subs)}} - end, {[], PState0}, Topics), - SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, - variable = #mqtt_frame_suback{ - message_id = SubscribeMsgId, - qos_table = QosResponse}}, PState1), - %% we may need to send up to length(Topics) messages. - %% if QoS is > 0 then we need to generate a message id, - %% and increment the counter. - StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId), - N = lists:foldl(fun (Topic, Acc) -> - case maybe_send_retained_message(RPid, Topic, Acc, PState1) of - {true, X} -> Acc + X; - false -> Acc - end - end, StartMsgId, Topics), - {ok, PState1#proc_state{message_id = N}} - end, PState0); + + {QosResponse, PState1} = + lists:foldl(fun (#mqtt_topic{name = TopicName, + qos = Qos}, {QosList, PState}) -> + SupportedQos = supported_subs_qos(Qos), + {Queue, #proc_state{subscriptions = Subs} = PState1} = + ensure_queue(SupportedQos, PState), + RoutingKey = Mqtt2AmqpFun(TopicName), + Binding = #'queue.bind'{ + queue = Queue, + exchange = Exchange, + routing_key = RoutingKey}, + #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding), + SupportedQosList = case maps:find(TopicName, Subs) of + {ok, L} -> [SupportedQos|L]; + error -> [SupportedQos] + end, + {[SupportedQos | QosList], + PState1 #proc_state{ + subscriptions = + maps:put(TopicName, SupportedQosList, Subs)}} + end, {[], PState0}, Topics), + SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, + variable = #mqtt_frame_suback{ + message_id = SubscribeMsgId, + qos_table = QosResponse}}, PState1), + %% we may need to send up to length(Topics) messages. + %% if QoS is > 0 then we need to generate a message id, + %% and increment the counter. + StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId), + N = lists:foldl(fun (Topic, Acc) -> + case maybe_send_retained_message(RPid, Topic, Acc, PState1) of + {true, X} -> Acc + X; + false -> Acc + end + end, StartMsgId, Topics), + {ok, PState1#proc_state{message_id = N}}; process_request(?UNSUBSCRIBE, #mqtt_frame{ @@ -489,35 +495,65 @@ delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) -> maybe_clean_sess(PState = #proc_state { clean_sess = false, connection = Conn, + auth_state = #auth_state{vhost = VHost}, client_id = ClientId }) -> - SessionPresent = session_present(Conn, ClientId), - {_Queue, PState1} = ensure_queue(?QOS_1, PState), - {SessionPresent, PState1}; + SessionPresent = session_present(VHost, ClientId), + case SessionPresent of + false -> + %% ensure_queue/2 not only ensures that queue is created, but also starts consuming from it. + %% Let's avoid creating that queue until explicitly asked by a client. + %% Then publish-only clients, that connect with clean_sess=true due to some misconfiguration, + %% will consume less resources. + {{?CONNACK_ACCEPT, SessionPresent}, PState}; + true -> + try ensure_queue(?QOS_1, PState) of + {_Queue, PState1} -> {{?CONNACK_ACCEPT, SessionPresent}, PState1} + catch + exit:({{shutdown, {server_initiated_close, 403, _}}, _}) -> + %% Connection is not yet propagated to #proc_state{}, let's close it here + catch amqp_connection:close(Conn), + rabbit_log_connection:error("MQTT cannot recover a session, user is missing permissions"), + {?CONNACK_SERVER, PState}; + C:E:S -> + %% Connection is not yet propagated to + %% #proc_state{}, let's close it here. + %% This is an exceptional situation anyway, but + %% doing this will prevent second crash from + %% amqp client being logged. + catch amqp_connection:close(Conn), + erlang:raise(C, E, S) + end + end; maybe_clean_sess(PState = #proc_state { clean_sess = true, connection = Conn, + auth_state = #auth_state{vhost = VHost}, client_id = ClientId }) -> {_, Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId), {ok, Channel} = amqp_connection:open_channel(Conn), - ok = try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of - #'queue.delete_ok'{} -> ok - catch - exit:_Error -> ok - after - amqp_channel:close(Channel) - end, - {false, PState}. - -session_present(Conn, ClientId) -> + case session_present(VHost, ClientId) of + false -> + {{?CONNACK_ACCEPT, false}, PState}; + true -> + try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of + #'queue.delete_ok'{} -> {{?CONNACK_ACCEPT, false}, PState} + catch + exit:({{shutdown, {server_initiated_close, 403, _}}, _}) -> + %% Connection is not yet propagated to #proc_state{}, let's close it here + catch amqp_connection:close(Conn), + rabbit_log_connection:error("MQTT cannot start a clean session: " + "`configure` permission missing for queue `~p`", [Queue]), + {?CONNACK_SERVER, PState} + after + catch amqp_channel:close(Channel) + end + end. + +session_present(VHost, ClientId) -> {_, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId), - Declare = #'queue.declare'{queue = QueueQ1, - passive = true}, - {ok, Channel} = amqp_connection:open_channel(Conn), - try - amqp_channel:call(Channel, Declare), - amqp_channel:close(Channel), - true - catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} -> - false + QueueName = rabbit_misc:r(VHost, queue, QueueQ1), + case rabbit_amqqueue:lookup(QueueName) of + {ok, _} -> true; + {error, not_found} -> false end. make_will_msg(#mqtt_frame_connect{ will_flag = false }) -> @@ -832,6 +868,22 @@ send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain, end, PState #proc_state{ channels = {undefined, undefined} }. +%% TODO amqp_pub/2 is publishing messages asynchronously, using +%% amqp_channel:cast_flow/3 +%% +%% It does access check using check_publish/3 before submitting, but +%% this is superfluous, as actual publishing will do the same +%% check. While check results cached, it's still some unnecessary +%% work. +%% +%% And the only reason to keep it that way is that it prevents useless +%% crash messages flooding logs, as there is no code to handle async +%% channel crash gracefully. +%% +%% It'd be better to rework the whole thing, removing performance +%% penalty and some 50 lines of duplicate code. Maybe unlinking from +%% channel, and adding it as a child of connection supervisor instead. +%% But exact details are not yet clear. amqp_pub(undefined, PState) -> PState; @@ -944,25 +996,12 @@ handle_ra_event(Evt, PState) -> rabbit_log:debug("unhandled ra_event: ~w ", [Evt]), PState. -%% NB: check_*: MQTT spec says we should ack normally, ie pretend there -%% was no auth error, but here we are closing the connection with an error. This -%% is what happens anyway if there is an authorization failure at the AMQP 0-9-1 client level. - check_publish(TopicName, Fn, PState) -> case check_topic_access(TopicName, write, PState) of ok -> Fn(); _ -> {error, unauthorized, PState} end. -check_subscribe([], Fn, _) -> - Fn(); - -check_subscribe([#mqtt_topic{name = TopicName} | Topics], Fn, PState) -> - case check_topic_access(TopicName, read, PState) of - ok -> check_subscribe(Topics, Fn, PState); - _ -> {error, unauthorized, PState} - end. - check_topic_access(TopicName, Access, #proc_state{ auth_state = #auth_state{user = User = #user{username = Username}, diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index dd7dc9ee9e..b494939914 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -10,13 +10,15 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(CONNECT_TIMEOUT, 10000). +-define(WAIT_LOG_NO_CRASHES, {["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}). all() -> [{group, anonymous_no_ssl_user}, {group, anonymous_ssl_user}, {group, no_ssl_user}, {group, ssl_user}, - {group, client_id_propagation}]. + {group, client_id_propagation}, + {group, authz_handling}]. groups() -> [{anonymous_ssl_user, [], @@ -59,6 +61,15 @@ groups() -> ]}, {client_id_propagation, [], [client_id_propagation] + }, + {authz_handling, [], + [no_queue_bind_permission, + no_queue_consume_permission, + no_queue_consume_permission_on_connect, + no_queue_delete_permission, + no_queue_declare_permission, + no_topic_read_permission, + no_topic_write_permission] } ]. @@ -69,6 +80,23 @@ init_per_suite(Config) -> end_per_suite(Config) -> Config. +init_per_group(authz_handling, Config0) -> + User = <<"mqtt-user">>, + Password = <<"mqtt-password">>, + VHost = <<"mqtt-vhost">>, + MqttConfig = {rabbitmq_mqtt, [{default_user, User} + ,{default_pass, Password} + ,{allow_anonymous, true} + ,{vhost, VHost} + ,{exchange, <<"amq.topic">>} + ]}, + Config1 = rabbit_ct_helpers:run_setup_steps(rabbit_ct_helpers:merge_app_env(Config0, MqttConfig), + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + rabbit_ct_broker_helpers:add_user(Config1, User, Password), + rabbit_ct_broker_helpers:add_vhost(Config1, VHost), + [Log|_] = rabbit_ct_broker_helpers:rpc(Config1, 0, rabbit, log_locations, []), + [{mqtt_user, User}, {mqtt_vhost, VHost}, {mqtt_password, Password}, {log_location, Log}|Config1]; init_per_group(Group, Config) -> Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), Config1 = rabbit_ct_helpers:set_config(Config, [ @@ -284,6 +312,26 @@ end_per_testcase(ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_ma ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHostForPortVHostMapping), ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping), rabbit_ct_helpers:testcase_finished(Config, ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping); +end_per_testcase(Testcase, Config) when Testcase == no_queue_bind_permission; + Testcase == no_queue_consume_permission; + Testcase == no_queue_consume_permission_on_connect; + Testcase == no_queue_delete_permission; + Testcase == no_queue_declare_permission; + Testcase == no_topic_read_permission; + Testcase == no_topic_write_permission -> + %% So let's wait before logs are surely flushed + Marker = "MQTT_AUTH_SUITE_MARKER", + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_log, error, [Marker]), + wait_log(Config, erlang:system_time(microsecond) + 1000000, + [{[Marker], fun () -> stop end}]), + + %% Preserve file contents in case some investigation is needed, before truncating. + file:copy(?config(log_location, Config), iolist_to_binary([?config(log_location, Config), ".", atom_to_binary(Testcase)])), + + %% And provide an empy log file for the next test in this group + file:write_file(?config(log_location, Config), <<>>), + + rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -448,9 +496,237 @@ client_id_propagation(Config) -> emqttc:disconnect(C). +%% These tests try to cover all operations that are listed in the +%% table in https://www.rabbitmq.com/access-control.html#authorisation +%% and which MQTT plugin tries to perform. +%% +%% Silly MQTT doesn't allow us to see any error codes in the protocol, +%% so the only non-intrusive way to check for `access_refused` +%% codepath is by checking logs. Every testcase from this group +%% truncates log file beforehand, so it'd be easier to analyze. There +%% is additional wait in the corresponding end_per_testcase that +%% ensures that logs were for the current testcase were completely +%% flushed, and won't contaminate following tests from this group. +%% +%% Then each test-case asserts that logs contain following things: +%% 1) Handling of access_refused error handler in MQTT reader: +%% https://github.com/rabbitmq/rabbitmq-server/blob/69dc53fb8938c7f135bf0002b0904cf28c25c571/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl#L332 +%% 2) Mention of which AMQP operation caused that error (that one is +%% kinda superflous, it just makes sure that every AMQP operation +%% in MQTT plugin was tested) +no_queue_bind_permission(Config) -> + test_subscribe_permissions_combination(<<".*">>, <<"">>, <<".*">>, Config, + ["operation queue.bind caused a channel exception access_refused"]). + +no_queue_consume_permission(Config) -> + test_subscribe_permissions_combination(<<".*">>, <<".*">>, <<"^amq\\.topic">>, Config, + ["operation basic.consume caused a channel exception access_refused"]). + +no_queue_delete_permission(Config) -> + set_permissions(".*", ".*", ".*", Config), + C1 = open_mqtt_connection(Config, [{client_id, <<"no_queue_delete_permission">>}, {clean_sess, false}]), + emqttc:sync_subscribe(C1, {<<"test/topic">>, qos1}), + emqttc:disconnect(C1), + set_permissions(<<>>, ".*", ".*", Config), + + %% And now we have a durable queue that user don't have permissions to delete + %% Attempt to establish clean session should fail + + expect_server_error( + fun () -> + connect_user( + ?config(mqtt_user, Config), ?config(mqtt_password, Config), + Config, ?config(mqtt_user, Config), + [{client_id, <<"no_queue_delete_permission">>}, {clean_sess, true}]) + end), + + wait_log(Config, erlang:system_time(microsecond) + 1000000, + [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end} + ,{["operation queue.delete caused a channel exception access_refused", + "MQTT cannot start a clean session: `configure` permission missing for queue"], + fun () -> stop end} + ]), + ok. + +no_queue_consume_permission_on_connect(Config) -> + set_permissions(".*", ".*", ".*", Config), + C1 = open_mqtt_connection(Config, [{client_id, <<"no_queue_consume_permission_on_connect">>}, {clean_sess, false}]), + emqttc:sync_subscribe(C1, {<<"test/topic">>, qos1}), + emqttc:disconnect(C1), + set_permissions(".*", ".*", "^amq\\.topic", Config), + + expect_server_error( + fun () -> + connect_user( + ?config(mqtt_user, Config), ?config(mqtt_password, Config), + Config, ?config(mqtt_user, Config), + [{client_id, <<"no_queue_consume_permission_on_connect">>}, {clean_sess, false}]) + end), + + wait_log(Config, erlang:system_time(microsecond) + 1000000, + [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end} + ,{["operation basic.consume caused a channel exception access_refused", + "MQTT cannot recover a session, user is missing permissions"], + fun () -> stop end} + ]), + ok. + + +no_queue_declare_permission(Config) -> + rabbit_ct_broker_helpers:set_permissions(Config, ?config(mqtt_user, Config), ?config(mqtt_vhost, Config), <<"">>, <<".*">>, <<".*">>), + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + {ok, C} = emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"no_queue_declare_permission">>}, + {proto_ver, 3}, + {logger, info}, + {username, ?config(mqtt_user, Config)}, + {password, ?config(mqtt_password, Config)}, + {clean_sess, false} + ]), + + receive + {mqttc, _, connected} -> ok + after + ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout) + end, + + process_flag(trap_exit, true), + try emqttc:sync_subscribe(C, <<"test/topic">>) of + _ -> exit(this_should_not_succeed) + catch + exit:{{shutdown, tcp_closed} , _} -> ok + end, + process_flag(trap_exit, false), + + wait_log(Config, erlang:system_time(microsecond) + 1000000, + [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end} + ,{["MQTT protocol error on connection.*access_refused", + "operation queue.declare caused a channel exception access_refused"], + fun () -> stop end} + ]), + ok. + +no_topic_read_permission(Config) -> + set_permissions(".*", ".*", ".*", Config), + set_topic_permissions("^allow-write\\..*", "^allow-read\\..*", Config), + + C = open_mqtt_connection(Config), + + emqttc:sync_subscribe(C, <<"allow-read/some/topic">>), %% Just to be sure that our permission setup is indeed working + + expect_sync_error(fun () -> + emqttc:sync_subscribe(C, <<"test/topic">>) + end), + wait_log(Config, erlang:system_time(microsecond) + 1000000, + [?WAIT_LOG_NO_CRASHES + ,{["MQTT protocol error on connection.*access_refused", + "operation queue.bind caused a channel exception access_refused: access to topic 'test.topic' in exchange 'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'"], + fun () -> stop end} + ]), + ok. + +no_topic_write_permission(Config) -> + set_permissions(".*", ".*", ".*", Config), + set_topic_permissions("^allow-write\\..*", "^allow-read\\..*", Config), + C = open_mqtt_connection(Config), + + emqttc:sync_publish(C, <<"allow-write/some/topic">>, <<"payload">>, qos1), %% Just to be sure that our permission setup is indeed working + + expect_sync_error(fun () -> + emqttc:sync_publish(C, <<"some/other/topic">>, <<"payload">>, qos1) + end), + wait_log(Config, erlang:system_time(microsecond) + 1000000, + [?WAIT_LOG_NO_CRASHES + ,{["access to topic 'some.other.topic' in exchange 'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'", + "MQTT connection.*is closing due to an authorization failure"], + fun () -> stop end} + ]), + ok. + + +expect_server_error(Fun) -> + process_flag(trap_exit, true), + {ok, C} = Fun(), + Result = receive + {mqttc, C, connected} -> {error, unexpected_successful_connection}; + {'EXIT', C, {shutdown,{connack_error,'CONNACK_SERVER'}}} -> ok; + {'EXIT', C, {shutdown, Err}} -> {error, unexpected_error, Err} + after + ?CONNECT_TIMEOUT -> {error, emqttc_connection_timeout} + end, + process_flag(trap_exit, false), + + case Result of + ok -> ok; + {error, E} -> exit(E) + end. + +expect_sync_error(Fun) -> + process_flag(trap_exit, true), + try Fun() of + _ -> exit(this_should_not_succeed) + catch + exit:{{shutdown, tcp_closed} , _} -> ok + after + process_flag(trap_exit, false) + end. + +set_topic_permissions(WritePat, ReadPat, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_auth_backend_internal, set_topic_permissions, + [?config(mqtt_user, Config), ?config(mqtt_vhost, Config), + <<"amq.topic">>, WritePat, ReadPat, <<"acting-user">>]). + + +set_permissions(PermConf, PermWrite, PermRead, Config) -> + rabbit_ct_broker_helpers:set_permissions(Config, ?config(mqtt_user, Config), ?config(mqtt_vhost, Config), + iolist_to_binary(PermConf), + iolist_to_binary(PermWrite), + iolist_to_binary(PermRead)). + +open_mqtt_connection(Config) -> + open_mqtt_connection(Config, []). +open_mqtt_connection(Config, Opts) -> + {ok, C} = connect_user(?config(mqtt_user, Config), ?config(mqtt_password, Config), Config, ?config(mqtt_user, Config), Opts), + receive + {mqttc, _, connected} -> ok + after + ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout) + end, + C. + +test_subscribe_permissions_combination(PermConf, PermWrite, PermRead, Config, ExtraLogChecks) -> + rabbit_ct_broker_helpers:set_permissions(Config, ?config(mqtt_user, Config), ?config(mqtt_vhost, Config), PermConf, PermWrite, PermRead), + + {ok, C} = connect_user(?config(mqtt_user, Config), ?config(mqtt_password, Config), Config), + receive + {mqttc, _, connected} -> ok + after + ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout) + end, + + process_flag(trap_exit, true), + try emqttc:sync_subscribe(C, <<"test/topic">>) of + _ -> exit(this_should_not_succeed) + catch + exit:{{shutdown, tcp_closed} , _} -> ok + end, + + process_flag(trap_exit, false), + + wait_log(Config, erlang:system_time(microsecond) + 1000000, + [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end} + ,{["MQTT protocol error on connection.*access_refused"|ExtraLogChecks], + fun () -> stop end} + ]), + ok. + connect_user(User, Pass, Config) -> - connect_user(User, Pass, Config, User). + connect_user(User, Pass, Config, User, []). connect_user(User, Pass, Config, ClientID) -> + connect_user(User, Pass, Config, ClientID, []). +connect_user(User, Pass, Config, ClientID, Opts) -> Creds = case User of undefined -> []; _ -> [{username, User}] @@ -459,11 +735,13 @@ connect_user(User, Pass, Config, ClientID) -> _ -> [{password, Pass}] end, P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), - emqttc:start_link([{host, "localhost"}, - {port, P}, - {client_id, ClientID}, - {proto_ver, 3}, - {logger, info}] ++ Creds). + emqttc:start_link(Opts + ++ [{host, "localhost"}, + {port, P}, + {client_id, ClientID}, + {proto_ver, 3}, + {logger, info}] + ++ Creds). expect_successful_connection(ConnectFun, Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, reset_auth_attempt_metrics, []), @@ -504,3 +782,38 @@ expect_authentication_failure(ConnectFun, Config) -> ok -> ok; {error, Err} -> exit(Err) end. + +wait_log(Config, Deadline, Clauses) -> + {ok, Content} = file:read_file(?config(log_location, Config)), + case erlang:system_time(microsecond) of + T when T > Deadline -> + lists:foreach(fun + ({REs, _}) -> + Matches = [ io_lib:format("~p - ~s~n", [RE, re:run(Content, RE, [{capture, none}])]) || RE <- REs ], + ct:pal("Wait log clause status: ~s", [Matches]) + end, Clauses), + exit(no_log_lines_detected); + _ -> ok + end, + case wait_log_check_clauses(Content, Clauses) of + stop -> ok; + continue -> + timer:sleep(50), + wait_log(Config, Deadline, Clauses) + end, + ok. + +wait_log_check_clauses(_, []) -> + continue; +wait_log_check_clauses(Content, [{REs, Fun}|Rest]) -> + case multiple_re_match(Content, REs) of + true -> Fun(); + _ -> + wait_log_check_clauses(Content, Rest) + end. + +multiple_re_match(Content, REs) -> + lists:all(fun (RE) -> + match == re:run(Content, RE, [{capture, none}]) + end, + REs). |