summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Ansari <david.ansari@gmx.de>2023-03-16 08:58:46 +0000
committerDavid Ansari <david.ansari@gmx.de>2023-05-15 15:19:55 +0000
commitd1fa6e83c3a127f56c7bb10c64e597757629b060 (patch)
tree47bfd766ac4aaf1861e0b49df1f3c962e5f20466
parent7fa723d2129148e9520be0e8669fe65574d9f12f (diff)
downloadrabbitmq-server-git-d1fa6e83c3a127f56c7bb10c64e597757629b060.tar.gz
Remove code duplication
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl23
-rw-r--r--deps/rabbitmq_mqtt/test/v5_SUITE.erl2
-rw-r--r--deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl10
3 files changed, 18 insertions, 17 deletions
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index 6941ae6de6..0b25c4e804 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -305,18 +305,15 @@ process_request(?PUBACK,
{ok, State}
end;
-%% MQTT 5 spec 3.3.1.2 QoS
-%% If the Server included a Maximum QoS in its CONNACK response
-%% to a Client and it receives a PUBLISH packet with a QoS greater than this
-%% then it uses DISCONNECT with Reason Code 0x9B (QoS not supported).
process_request(?PUBLISH,
#mqtt_packet{fixed = #mqtt_packet_fixed{qos = ?QOS_2}},
- State = #state{cfg = #cfg{
- proto_ver = ?MQTT_PROTO_V5,
- client_id = ClientID}}) ->
- ?LOG_INFO("Terminating MQTT connection. QoS not supported, client ID: ~s, "
- "protocol version: ~p, QoS: ~p",
- [ClientID, ?MQTT_PROTO_V5, ?QOS_2]),
+ State = #state{cfg = #cfg{proto_ver = ?MQTT_PROTO_V5,
+ client_id = ClientId}}) ->
+ %% MQTT 5 spec 3.3.1.2 QoS
+ %% "If the Server included a Maximum QoS in its CONNACK response
+ %% to a Client and it receives a PUBLISH packet with a QoS greater than this
+ %% then it uses DISCONNECT with Reason Code 0x9B (QoS not supported)."
+ ?LOG_WARNING("Received PUBLISH with QoS2. Disconnecting MQTT client ~ts", [ClientId]),
send_disconnect(?RC_QOS_NOT_SUPPORTED, State),
{stop, {disconnect, server_initiated}, State};
process_request(?PUBLISH,
@@ -722,9 +719,9 @@ maybe_send_retained_message(RPid, #mqtt_topic{filter = Topic0, qos = SubscribeQo
make_will_msg(#mqtt_packet_connect{will_flag = false}) ->
{ok, undefined};
-make_will_msg(#mqtt_packet_connect{proto_ver = 5,
- will_flag = true,
- will_qos = ?QOS_2}) ->
+make_will_msg(#mqtt_packet_connect{will_flag = true,
+ will_qos = ?QOS_2,
+ proto_ver = 5}) ->
{error, ?RC_QOS_NOT_SUPPORTED};
make_will_msg(#mqtt_packet_connect{will_flag = true,
will_retain = Retain,
diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl
index edaaa78420..78e8223815 100644
--- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl
@@ -269,7 +269,7 @@ client_publish_qos2(Config) ->
{C, Connect} = start_client(ClientId, Config, 0, []),
?assertMatch({ok, #{'Maximum-QoS' := 1}}, Connect(C)),
?assertEqual({error, {disconnected, _RcQosNotSupported = 155, #{}}},
- emqtt:publish(C, Topic, <<"msg">>, [{qos, 2}])).
+ emqtt:publish(C, Topic, <<"msg">>, qos2)).
client_rejects_publish(Config) ->
NumRejectedBefore = dead_letter_metric(messages_dead_lettered_rejected_total, Config),
diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
index 01426af027..3649d4a0c9 100644
--- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
+++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
@@ -286,7 +286,7 @@ handle_data1(Data, State = #state{socket = Socket,
{error, Reason, _} ->
stop_mqtt_protocol_error(State, Reason, ConnName);
{stop, {disconnect, server_initiated}, _} ->
- self() ! {stop, ?CLOSE_PROTOCOL_ERROR, server_initiated_disconnect},
+ defer_close(),
{[], State};
{stop, {disconnect, client_initiated}, ProcState1} ->
stop({_SendWill = false, State#state{proc_state = ProcState1}})
@@ -294,8 +294,7 @@ handle_data1(Data, State = #state{socket = Socket,
end;
{error, {disconnect_reason_code, ReasonCode}} ->
rabbit_mqtt_processor:send_disconnect(ReasonCode, ProcState),
- %% Instead of closing immediately, first send the DISCONNECT packet to the client.
- self() ! {stop, ?CLOSE_PROTOCOL_ERROR, server_initiated_disconnect},
+ defer_close(),
{[], State};
{error, Reason} ->
stop_mqtt_protocol_error(State, Reason, ConnName)
@@ -306,6 +305,11 @@ handle_data1(Data, State = #state{socket = Socket,
stop_mqtt_protocol_error(State, cannot_parse, ConnName)
end.
+%% Allow DISCONNECT packet to be sent to client before closing the connection.
+defer_close() ->
+ self() ! {stop, ?CLOSE_PROTOCOL_ERROR, server_initiated_disconnect},
+ ok.
+
stop_mqtt_protocol_error(State, Reason, ConnName) ->
?LOG_WARNING("Web MQTT protocol error ~tp for connection ~tp", [Reason, ConnName]),
stop(State, ?CLOSE_PROTOCOL_ERROR, Reason).