summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-11-13 04:28:57 +0300
committerGitHub <noreply@github.com>2021-11-13 04:28:57 +0300
commit38982e2c672aeb203eca508cc9691586e1fb8b8b (patch)
tree905c074b82d0ded7d79254e898e0618827b08ae1
parent98f9b126af8dbb7a625e02c0f0adfe3fc6b0ff92 (diff)
parente0723d5e6620182380566ee36d381af068ecf43d (diff)
downloadrabbitmq-server-git-38982e2c672aeb203eca508cc9691586e1fb8b8b.tar.gz
Merge pull request #3714 from rabbitmq/mqtt-crash-log
Prevent crash logs when mqtt user is missing permissions
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl195
-rw-r--r--deps/rabbitmq_mqtt/test/auth_SUITE.erl327
2 files changed, 437 insertions, 85 deletions
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index ab9a8cb75b..d904450cb9 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -66,9 +66,18 @@ process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
{error, connect_expected, PState};
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
PState) ->
- case process_request(Type, Frame, PState) of
+ try process_request(Type, Frame, PState) of
{ok, PState1} -> {ok, PState1, PState1#proc_state.connection};
Ret -> Ret
+ catch
+ _:{{shutdown, {server_initiated_close, 403, _}}, _} ->
+ %% NB: MQTT spec says we should ack normally, ie pretend
+ %% there was no auth error, but here we are closing the
+ %% connection with an error. This is what happens anyway
+ %% if there is an authorization failure at the AMQP 0-9-1
+ %% client level. And error was already logged by AMQP
+ %% channel, so no need for custom logging.
+ {error, access_refused, PState}
end.
add_client_id_to_adapter_info(ClientId, #amqp_adapter_info{additional_info = AdditionalInfo0} = AdapterInfo) ->
@@ -133,8 +142,7 @@ process_request(?CONNECT,
{UserBin, PassBin} ->
case process_login(UserBin, PassBin, ProtoVersion, PState1) of
connack_dup_auth ->
- {SessionPresent0, PState2} = maybe_clean_sess(PState1),
- {{?CONNACK_ACCEPT, SessionPresent0}, PState2};
+ maybe_clean_sess(PState1);
{?CONNACK_ACCEPT, Conn, VHost, AState} ->
case rabbit_mqtt_collector:register(ClientId, self()) of
{ok, Corr} ->
@@ -156,8 +164,7 @@ process_request(?CONNECT,
retainer_pid = RetainerPid,
auth_state = AState,
register_state = {pending, Corr}},
- {SessionPresent1, PState4} = maybe_clean_sess(PState3),
- {{?CONNACK_ACCEPT, SessionPresent1}, PState4};
+ maybe_clean_sess(PState3);
%% e.g. this node was removed from the MQTT cluster members
{error, _} = Err ->
rabbit_log_connection:error("MQTT cannot accept a connection: "
@@ -256,44 +263,43 @@ process_request(?SUBSCRIBE,
message_id = StateMsgId,
mqtt2amqp_fun = Mqtt2AmqpFun} = PState0) ->
rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]),
- check_subscribe(Topics, fun() ->
- {QosResponse, PState1} =
- lists:foldl(fun (#mqtt_topic{name = TopicName,
- qos = Qos}, {QosList, PState}) ->
- SupportedQos = supported_subs_qos(Qos),
- {Queue, #proc_state{subscriptions = Subs} = PState1} =
- ensure_queue(SupportedQos, PState),
- RoutingKey = Mqtt2AmqpFun(TopicName),
- Binding = #'queue.bind'{
- queue = Queue,
- exchange = Exchange,
- routing_key = RoutingKey},
- #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
- SupportedQosList = case maps:find(TopicName, Subs) of
- {ok, L} -> [SupportedQos|L];
- error -> [SupportedQos]
- end,
- {[SupportedQos | QosList],
- PState1 #proc_state{
- subscriptions =
- maps:put(TopicName, SupportedQosList, Subs)}}
- end, {[], PState0}, Topics),
- SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
- variable = #mqtt_frame_suback{
- message_id = SubscribeMsgId,
- qos_table = QosResponse}}, PState1),
- %% we may need to send up to length(Topics) messages.
- %% if QoS is > 0 then we need to generate a message id,
- %% and increment the counter.
- StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId),
- N = lists:foldl(fun (Topic, Acc) ->
- case maybe_send_retained_message(RPid, Topic, Acc, PState1) of
- {true, X} -> Acc + X;
- false -> Acc
- end
- end, StartMsgId, Topics),
- {ok, PState1#proc_state{message_id = N}}
- end, PState0);
+
+ {QosResponse, PState1} =
+ lists:foldl(fun (#mqtt_topic{name = TopicName,
+ qos = Qos}, {QosList, PState}) ->
+ SupportedQos = supported_subs_qos(Qos),
+ {Queue, #proc_state{subscriptions = Subs} = PState1} =
+ ensure_queue(SupportedQos, PState),
+ RoutingKey = Mqtt2AmqpFun(TopicName),
+ Binding = #'queue.bind'{
+ queue = Queue,
+ exchange = Exchange,
+ routing_key = RoutingKey},
+ #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
+ SupportedQosList = case maps:find(TopicName, Subs) of
+ {ok, L} -> [SupportedQos|L];
+ error -> [SupportedQos]
+ end,
+ {[SupportedQos | QosList],
+ PState1 #proc_state{
+ subscriptions =
+ maps:put(TopicName, SupportedQosList, Subs)}}
+ end, {[], PState0}, Topics),
+ SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
+ variable = #mqtt_frame_suback{
+ message_id = SubscribeMsgId,
+ qos_table = QosResponse}}, PState1),
+ %% we may need to send up to length(Topics) messages.
+ %% if QoS is > 0 then we need to generate a message id,
+ %% and increment the counter.
+ StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId),
+ N = lists:foldl(fun (Topic, Acc) ->
+ case maybe_send_retained_message(RPid, Topic, Acc, PState1) of
+ {true, X} -> Acc + X;
+ false -> Acc
+ end
+ end, StartMsgId, Topics),
+ {ok, PState1#proc_state{message_id = N}};
process_request(?UNSUBSCRIBE,
#mqtt_frame{
@@ -489,35 +495,65 @@ delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) ->
maybe_clean_sess(PState = #proc_state { clean_sess = false,
connection = Conn,
+ auth_state = #auth_state{vhost = VHost},
client_id = ClientId }) ->
- SessionPresent = session_present(Conn, ClientId),
- {_Queue, PState1} = ensure_queue(?QOS_1, PState),
- {SessionPresent, PState1};
+ SessionPresent = session_present(VHost, ClientId),
+ case SessionPresent of
+ false ->
+ %% ensure_queue/2 not only ensures that queue is created, but also starts consuming from it.
+ %% Let's avoid creating that queue until explicitly asked by a client.
+ %% Then publish-only clients, that connect with clean_sess=true due to some misconfiguration,
+ %% will consume less resources.
+ {{?CONNACK_ACCEPT, SessionPresent}, PState};
+ true ->
+ try ensure_queue(?QOS_1, PState) of
+ {_Queue, PState1} -> {{?CONNACK_ACCEPT, SessionPresent}, PState1}
+ catch
+ exit:({{shutdown, {server_initiated_close, 403, _}}, _}) ->
+ %% Connection is not yet propagated to #proc_state{}, let's close it here
+ catch amqp_connection:close(Conn),
+ rabbit_log_connection:error("MQTT cannot recover a session, user is missing permissions"),
+ {?CONNACK_SERVER, PState};
+ C:E:S ->
+ %% Connection is not yet propagated to
+ %% #proc_state{}, let's close it here.
+ %% This is an exceptional situation anyway, but
+ %% doing this will prevent second crash from
+ %% amqp client being logged.
+ catch amqp_connection:close(Conn),
+ erlang:raise(C, E, S)
+ end
+ end;
maybe_clean_sess(PState = #proc_state { clean_sess = true,
connection = Conn,
+ auth_state = #auth_state{vhost = VHost},
client_id = ClientId }) ->
{_, Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId),
{ok, Channel} = amqp_connection:open_channel(Conn),
- ok = try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of
- #'queue.delete_ok'{} -> ok
- catch
- exit:_Error -> ok
- after
- amqp_channel:close(Channel)
- end,
- {false, PState}.
-
-session_present(Conn, ClientId) ->
+ case session_present(VHost, ClientId) of
+ false ->
+ {{?CONNACK_ACCEPT, false}, PState};
+ true ->
+ try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of
+ #'queue.delete_ok'{} -> {{?CONNACK_ACCEPT, false}, PState}
+ catch
+ exit:({{shutdown, {server_initiated_close, 403, _}}, _}) ->
+ %% Connection is not yet propagated to #proc_state{}, let's close it here
+ catch amqp_connection:close(Conn),
+ rabbit_log_connection:error("MQTT cannot start a clean session: "
+ "`configure` permission missing for queue `~p`", [Queue]),
+ {?CONNACK_SERVER, PState}
+ after
+ catch amqp_channel:close(Channel)
+ end
+ end.
+
+session_present(VHost, ClientId) ->
{_, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
- Declare = #'queue.declare'{queue = QueueQ1,
- passive = true},
- {ok, Channel} = amqp_connection:open_channel(Conn),
- try
- amqp_channel:call(Channel, Declare),
- amqp_channel:close(Channel),
- true
- catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
- false
+ QueueName = rabbit_misc:r(VHost, queue, QueueQ1),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, _} -> true;
+ {error, not_found} -> false
end.
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
@@ -832,6 +868,22 @@ send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain,
end,
PState #proc_state{ channels = {undefined, undefined} }.
+%% TODO amqp_pub/2 is publishing messages asynchronously, using
+%% amqp_channel:cast_flow/3
+%%
+%% It does access check using check_publish/3 before submitting, but
+%% this is superfluous, as actual publishing will do the same
+%% check. While check results cached, it's still some unnecessary
+%% work.
+%%
+%% And the only reason to keep it that way is that it prevents useless
+%% crash messages flooding logs, as there is no code to handle async
+%% channel crash gracefully.
+%%
+%% It'd be better to rework the whole thing, removing performance
+%% penalty and some 50 lines of duplicate code. Maybe unlinking from
+%% channel, and adding it as a child of connection supervisor instead.
+%% But exact details are not yet clear.
amqp_pub(undefined, PState) ->
PState;
@@ -944,25 +996,12 @@ handle_ra_event(Evt, PState) ->
rabbit_log:debug("unhandled ra_event: ~w ", [Evt]),
PState.
-%% NB: check_*: MQTT spec says we should ack normally, ie pretend there
-%% was no auth error, but here we are closing the connection with an error. This
-%% is what happens anyway if there is an authorization failure at the AMQP 0-9-1 client level.
-
check_publish(TopicName, Fn, PState) ->
case check_topic_access(TopicName, write, PState) of
ok -> Fn();
_ -> {error, unauthorized, PState}
end.
-check_subscribe([], Fn, _) ->
- Fn();
-
-check_subscribe([#mqtt_topic{name = TopicName} | Topics], Fn, PState) ->
- case check_topic_access(TopicName, read, PState) of
- ok -> check_subscribe(Topics, Fn, PState);
- _ -> {error, unauthorized, PState}
- end.
-
check_topic_access(TopicName, Access,
#proc_state{
auth_state = #auth_state{user = User = #user{username = Username},
diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl
index dd7dc9ee9e..b494939914 100644
--- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl
@@ -10,13 +10,15 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(CONNECT_TIMEOUT, 10000).
+-define(WAIT_LOG_NO_CRASHES, {["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}).
all() ->
[{group, anonymous_no_ssl_user},
{group, anonymous_ssl_user},
{group, no_ssl_user},
{group, ssl_user},
- {group, client_id_propagation}].
+ {group, client_id_propagation},
+ {group, authz_handling}].
groups() ->
[{anonymous_ssl_user, [],
@@ -59,6 +61,15 @@ groups() ->
]},
{client_id_propagation, [],
[client_id_propagation]
+ },
+ {authz_handling, [],
+ [no_queue_bind_permission,
+ no_queue_consume_permission,
+ no_queue_consume_permission_on_connect,
+ no_queue_delete_permission,
+ no_queue_declare_permission,
+ no_topic_read_permission,
+ no_topic_write_permission]
}
].
@@ -69,6 +80,23 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
Config.
+init_per_group(authz_handling, Config0) ->
+ User = <<"mqtt-user">>,
+ Password = <<"mqtt-password">>,
+ VHost = <<"mqtt-vhost">>,
+ MqttConfig = {rabbitmq_mqtt, [{default_user, User}
+ ,{default_pass, Password}
+ ,{allow_anonymous, true}
+ ,{vhost, VHost}
+ ,{exchange, <<"amq.topic">>}
+ ]},
+ Config1 = rabbit_ct_helpers:run_setup_steps(rabbit_ct_helpers:merge_app_env(Config0, MqttConfig),
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()),
+ rabbit_ct_broker_helpers:add_user(Config1, User, Password),
+ rabbit_ct_broker_helpers:add_vhost(Config1, VHost),
+ [Log|_] = rabbit_ct_broker_helpers:rpc(Config1, 0, rabbit, log_locations, []),
+ [{mqtt_user, User}, {mqtt_vhost, VHost}, {mqtt_password, Password}, {log_location, Log}|Config1];
init_per_group(Group, Config) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
@@ -284,6 +312,26 @@ end_per_testcase(ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_ma
ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHostForPortVHostMapping),
ok = rabbit_ct_broker_helpers:clear_global_parameter(Config, mqtt_port_to_vhost_mapping),
rabbit_ct_helpers:testcase_finished(Config, ssl_user_port_vhost_mapping_takes_precedence_over_cert_vhost_mapping);
+end_per_testcase(Testcase, Config) when Testcase == no_queue_bind_permission;
+ Testcase == no_queue_consume_permission;
+ Testcase == no_queue_consume_permission_on_connect;
+ Testcase == no_queue_delete_permission;
+ Testcase == no_queue_declare_permission;
+ Testcase == no_topic_read_permission;
+ Testcase == no_topic_write_permission ->
+ %% So let's wait before logs are surely flushed
+ Marker = "MQTT_AUTH_SUITE_MARKER",
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_log, error, [Marker]),
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [{[Marker], fun () -> stop end}]),
+
+ %% Preserve file contents in case some investigation is needed, before truncating.
+ file:copy(?config(log_location, Config), iolist_to_binary([?config(log_location, Config), ".", atom_to_binary(Testcase)])),
+
+ %% And provide an empy log file for the next test in this group
+ file:write_file(?config(log_location, Config), <<>>),
+
+ rabbit_ct_helpers:testcase_finished(Config, Testcase);
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
@@ -448,9 +496,237 @@ client_id_propagation(Config) ->
emqttc:disconnect(C).
+%% These tests try to cover all operations that are listed in the
+%% table in https://www.rabbitmq.com/access-control.html#authorisation
+%% and which MQTT plugin tries to perform.
+%%
+%% Silly MQTT doesn't allow us to see any error codes in the protocol,
+%% so the only non-intrusive way to check for `access_refused`
+%% codepath is by checking logs. Every testcase from this group
+%% truncates log file beforehand, so it'd be easier to analyze. There
+%% is additional wait in the corresponding end_per_testcase that
+%% ensures that logs were for the current testcase were completely
+%% flushed, and won't contaminate following tests from this group.
+%%
+%% Then each test-case asserts that logs contain following things:
+%% 1) Handling of access_refused error handler in MQTT reader:
+%% https://github.com/rabbitmq/rabbitmq-server/blob/69dc53fb8938c7f135bf0002b0904cf28c25c571/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl#L332
+%% 2) Mention of which AMQP operation caused that error (that one is
+%% kinda superflous, it just makes sure that every AMQP operation
+%% in MQTT plugin was tested)
+no_queue_bind_permission(Config) ->
+ test_subscribe_permissions_combination(<<".*">>, <<"">>, <<".*">>, Config,
+ ["operation queue.bind caused a channel exception access_refused"]).
+
+no_queue_consume_permission(Config) ->
+ test_subscribe_permissions_combination(<<".*">>, <<".*">>, <<"^amq\\.topic">>, Config,
+ ["operation basic.consume caused a channel exception access_refused"]).
+
+no_queue_delete_permission(Config) ->
+ set_permissions(".*", ".*", ".*", Config),
+ C1 = open_mqtt_connection(Config, [{client_id, <<"no_queue_delete_permission">>}, {clean_sess, false}]),
+ emqttc:sync_subscribe(C1, {<<"test/topic">>, qos1}),
+ emqttc:disconnect(C1),
+ set_permissions(<<>>, ".*", ".*", Config),
+
+ %% And now we have a durable queue that user don't have permissions to delete
+ %% Attempt to establish clean session should fail
+
+ expect_server_error(
+ fun () ->
+ connect_user(
+ ?config(mqtt_user, Config), ?config(mqtt_password, Config),
+ Config, ?config(mqtt_user, Config),
+ [{client_id, <<"no_queue_delete_permission">>}, {clean_sess, true}])
+ end),
+
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}
+ ,{["operation queue.delete caused a channel exception access_refused",
+ "MQTT cannot start a clean session: `configure` permission missing for queue"],
+ fun () -> stop end}
+ ]),
+ ok.
+
+no_queue_consume_permission_on_connect(Config) ->
+ set_permissions(".*", ".*", ".*", Config),
+ C1 = open_mqtt_connection(Config, [{client_id, <<"no_queue_consume_permission_on_connect">>}, {clean_sess, false}]),
+ emqttc:sync_subscribe(C1, {<<"test/topic">>, qos1}),
+ emqttc:disconnect(C1),
+ set_permissions(".*", ".*", "^amq\\.topic", Config),
+
+ expect_server_error(
+ fun () ->
+ connect_user(
+ ?config(mqtt_user, Config), ?config(mqtt_password, Config),
+ Config, ?config(mqtt_user, Config),
+ [{client_id, <<"no_queue_consume_permission_on_connect">>}, {clean_sess, false}])
+ end),
+
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}
+ ,{["operation basic.consume caused a channel exception access_refused",
+ "MQTT cannot recover a session, user is missing permissions"],
+ fun () -> stop end}
+ ]),
+ ok.
+
+
+no_queue_declare_permission(Config) ->
+ rabbit_ct_broker_helpers:set_permissions(Config, ?config(mqtt_user, Config), ?config(mqtt_vhost, Config), <<"">>, <<".*">>, <<".*">>),
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ {ok, C} = emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"no_queue_declare_permission">>},
+ {proto_ver, 3},
+ {logger, info},
+ {username, ?config(mqtt_user, Config)},
+ {password, ?config(mqtt_password, Config)},
+ {clean_sess, false}
+ ]),
+
+ receive
+ {mqttc, _, connected} -> ok
+ after
+ ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout)
+ end,
+
+ process_flag(trap_exit, true),
+ try emqttc:sync_subscribe(C, <<"test/topic">>) of
+ _ -> exit(this_should_not_succeed)
+ catch
+ exit:{{shutdown, tcp_closed} , _} -> ok
+ end,
+ process_flag(trap_exit, false),
+
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}
+ ,{["MQTT protocol error on connection.*access_refused",
+ "operation queue.declare caused a channel exception access_refused"],
+ fun () -> stop end}
+ ]),
+ ok.
+
+no_topic_read_permission(Config) ->
+ set_permissions(".*", ".*", ".*", Config),
+ set_topic_permissions("^allow-write\\..*", "^allow-read\\..*", Config),
+
+ C = open_mqtt_connection(Config),
+
+ emqttc:sync_subscribe(C, <<"allow-read/some/topic">>), %% Just to be sure that our permission setup is indeed working
+
+ expect_sync_error(fun () ->
+ emqttc:sync_subscribe(C, <<"test/topic">>)
+ end),
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [?WAIT_LOG_NO_CRASHES
+ ,{["MQTT protocol error on connection.*access_refused",
+ "operation queue.bind caused a channel exception access_refused: access to topic 'test.topic' in exchange 'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'"],
+ fun () -> stop end}
+ ]),
+ ok.
+
+no_topic_write_permission(Config) ->
+ set_permissions(".*", ".*", ".*", Config),
+ set_topic_permissions("^allow-write\\..*", "^allow-read\\..*", Config),
+ C = open_mqtt_connection(Config),
+
+ emqttc:sync_publish(C, <<"allow-write/some/topic">>, <<"payload">>, qos1), %% Just to be sure that our permission setup is indeed working
+
+ expect_sync_error(fun () ->
+ emqttc:sync_publish(C, <<"some/other/topic">>, <<"payload">>, qos1)
+ end),
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [?WAIT_LOG_NO_CRASHES
+ ,{["access to topic 'some.other.topic' in exchange 'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
+ "MQTT connection.*is closing due to an authorization failure"],
+ fun () -> stop end}
+ ]),
+ ok.
+
+
+expect_server_error(Fun) ->
+ process_flag(trap_exit, true),
+ {ok, C} = Fun(),
+ Result = receive
+ {mqttc, C, connected} -> {error, unexpected_successful_connection};
+ {'EXIT', C, {shutdown,{connack_error,'CONNACK_SERVER'}}} -> ok;
+ {'EXIT', C, {shutdown, Err}} -> {error, unexpected_error, Err}
+ after
+ ?CONNECT_TIMEOUT -> {error, emqttc_connection_timeout}
+ end,
+ process_flag(trap_exit, false),
+
+ case Result of
+ ok -> ok;
+ {error, E} -> exit(E)
+ end.
+
+expect_sync_error(Fun) ->
+ process_flag(trap_exit, true),
+ try Fun() of
+ _ -> exit(this_should_not_succeed)
+ catch
+ exit:{{shutdown, tcp_closed} , _} -> ok
+ after
+ process_flag(trap_exit, false)
+ end.
+
+set_topic_permissions(WritePat, ReadPat, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_auth_backend_internal, set_topic_permissions,
+ [?config(mqtt_user, Config), ?config(mqtt_vhost, Config),
+ <<"amq.topic">>, WritePat, ReadPat, <<"acting-user">>]).
+
+
+set_permissions(PermConf, PermWrite, PermRead, Config) ->
+ rabbit_ct_broker_helpers:set_permissions(Config, ?config(mqtt_user, Config), ?config(mqtt_vhost, Config),
+ iolist_to_binary(PermConf),
+ iolist_to_binary(PermWrite),
+ iolist_to_binary(PermRead)).
+
+open_mqtt_connection(Config) ->
+ open_mqtt_connection(Config, []).
+open_mqtt_connection(Config, Opts) ->
+ {ok, C} = connect_user(?config(mqtt_user, Config), ?config(mqtt_password, Config), Config, ?config(mqtt_user, Config), Opts),
+ receive
+ {mqttc, _, connected} -> ok
+ after
+ ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout)
+ end,
+ C.
+
+test_subscribe_permissions_combination(PermConf, PermWrite, PermRead, Config, ExtraLogChecks) ->
+ rabbit_ct_broker_helpers:set_permissions(Config, ?config(mqtt_user, Config), ?config(mqtt_vhost, Config), PermConf, PermWrite, PermRead),
+
+ {ok, C} = connect_user(?config(mqtt_user, Config), ?config(mqtt_password, Config), Config),
+ receive
+ {mqttc, _, connected} -> ok
+ after
+ ?CONNECT_TIMEOUT -> exit(emqttc_connection_timeout)
+ end,
+
+ process_flag(trap_exit, true),
+ try emqttc:sync_subscribe(C, <<"test/topic">>) of
+ _ -> exit(this_should_not_succeed)
+ catch
+ exit:{{shutdown, tcp_closed} , _} -> ok
+ end,
+
+ process_flag(trap_exit, false),
+
+ wait_log(Config, erlang:system_time(microsecond) + 1000000,
+ [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}
+ ,{["MQTT protocol error on connection.*access_refused"|ExtraLogChecks],
+ fun () -> stop end}
+ ]),
+ ok.
+
connect_user(User, Pass, Config) ->
- connect_user(User, Pass, Config, User).
+ connect_user(User, Pass, Config, User, []).
connect_user(User, Pass, Config, ClientID) ->
+ connect_user(User, Pass, Config, ClientID, []).
+connect_user(User, Pass, Config, ClientID, Opts) ->
Creds = case User of
undefined -> [];
_ -> [{username, User}]
@@ -459,11 +735,13 @@ connect_user(User, Pass, Config, ClientID) ->
_ -> [{password, Pass}]
end,
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
- emqttc:start_link([{host, "localhost"},
- {port, P},
- {client_id, ClientID},
- {proto_ver, 3},
- {logger, info}] ++ Creds).
+ emqttc:start_link(Opts
+ ++ [{host, "localhost"},
+ {port, P},
+ {client_id, ClientID},
+ {proto_ver, 3},
+ {logger, info}]
+ ++ Creds).
expect_successful_connection(ConnectFun, Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_core_metrics, reset_auth_attempt_metrics, []),
@@ -504,3 +782,38 @@ expect_authentication_failure(ConnectFun, Config) ->
ok -> ok;
{error, Err} -> exit(Err)
end.
+
+wait_log(Config, Deadline, Clauses) ->
+ {ok, Content} = file:read_file(?config(log_location, Config)),
+ case erlang:system_time(microsecond) of
+ T when T > Deadline ->
+ lists:foreach(fun
+ ({REs, _}) ->
+ Matches = [ io_lib:format("~p - ~s~n", [RE, re:run(Content, RE, [{capture, none}])]) || RE <- REs ],
+ ct:pal("Wait log clause status: ~s", [Matches])
+ end, Clauses),
+ exit(no_log_lines_detected);
+ _ -> ok
+ end,
+ case wait_log_check_clauses(Content, Clauses) of
+ stop -> ok;
+ continue ->
+ timer:sleep(50),
+ wait_log(Config, Deadline, Clauses)
+ end,
+ ok.
+
+wait_log_check_clauses(_, []) ->
+ continue;
+wait_log_check_clauses(Content, [{REs, Fun}|Rest]) ->
+ case multiple_re_match(Content, REs) of
+ true -> Fun();
+ _ ->
+ wait_log_check_clauses(Content, Rest)
+ end.
+
+multiple_re_match(Content, REs) ->
+ lists:all(fun (RE) ->
+ match == re:run(Content, RE, [{capture, none}])
+ end,
+ REs).