diff options
author | David Ansari <david.ansari@gmx.de> | 2023-03-16 08:58:46 +0000 |
---|---|---|
committer | David Ansari <david.ansari@gmx.de> | 2023-05-15 15:19:55 +0000 |
commit | d1fa6e83c3a127f56c7bb10c64e597757629b060 (patch) | |
tree | 47bfd766ac4aaf1861e0b49df1f3c962e5f20466 | |
parent | 7fa723d2129148e9520be0e8669fe65574d9f12f (diff) | |
download | rabbitmq-server-git-d1fa6e83c3a127f56c7bb10c64e597757629b060.tar.gz |
Remove code duplication
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 23 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/v5_SUITE.erl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl | 10 |
3 files changed, 18 insertions, 17 deletions
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 6941ae6de6..0b25c4e804 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -305,18 +305,15 @@ process_request(?PUBACK, {ok, State} end; -%% MQTT 5 spec 3.3.1.2 QoS -%% If the Server included a Maximum QoS in its CONNACK response -%% to a Client and it receives a PUBLISH packet with a QoS greater than this -%% then it uses DISCONNECT with Reason Code 0x9B (QoS not supported). process_request(?PUBLISH, #mqtt_packet{fixed = #mqtt_packet_fixed{qos = ?QOS_2}}, - State = #state{cfg = #cfg{ - proto_ver = ?MQTT_PROTO_V5, - client_id = ClientID}}) -> - ?LOG_INFO("Terminating MQTT connection. QoS not supported, client ID: ~s, " - "protocol version: ~p, QoS: ~p", - [ClientID, ?MQTT_PROTO_V5, ?QOS_2]), + State = #state{cfg = #cfg{proto_ver = ?MQTT_PROTO_V5, + client_id = ClientId}}) -> + %% MQTT 5 spec 3.3.1.2 QoS + %% "If the Server included a Maximum QoS in its CONNACK response + %% to a Client and it receives a PUBLISH packet with a QoS greater than this + %% then it uses DISCONNECT with Reason Code 0x9B (QoS not supported)." + ?LOG_WARNING("Received PUBLISH with QoS2. Disconnecting MQTT client ~ts", [ClientId]), send_disconnect(?RC_QOS_NOT_SUPPORTED, State), {stop, {disconnect, server_initiated}, State}; process_request(?PUBLISH, @@ -722,9 +719,9 @@ maybe_send_retained_message(RPid, #mqtt_topic{filter = Topic0, qos = SubscribeQo make_will_msg(#mqtt_packet_connect{will_flag = false}) -> {ok, undefined}; -make_will_msg(#mqtt_packet_connect{proto_ver = 5, - will_flag = true, - will_qos = ?QOS_2}) -> +make_will_msg(#mqtt_packet_connect{will_flag = true, + will_qos = ?QOS_2, + proto_ver = 5}) -> {error, ?RC_QOS_NOT_SUPPORTED}; make_will_msg(#mqtt_packet_connect{will_flag = true, will_retain = Retain, diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index edaaa78420..78e8223815 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -269,7 +269,7 @@ client_publish_qos2(Config) -> {C, Connect} = start_client(ClientId, Config, 0, []), ?assertMatch({ok, #{'Maximum-QoS' := 1}}, Connect(C)), ?assertEqual({error, {disconnected, _RcQosNotSupported = 155, #{}}}, - emqtt:publish(C, Topic, <<"msg">>, [{qos, 2}])). + emqtt:publish(C, Topic, <<"msg">>, qos2)). client_rejects_publish(Config) -> NumRejectedBefore = dead_letter_metric(messages_dead_lettered_rejected_total, Config), diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 01426af027..3649d4a0c9 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -286,7 +286,7 @@ handle_data1(Data, State = #state{socket = Socket, {error, Reason, _} -> stop_mqtt_protocol_error(State, Reason, ConnName); {stop, {disconnect, server_initiated}, _} -> - self() ! {stop, ?CLOSE_PROTOCOL_ERROR, server_initiated_disconnect}, + defer_close(), {[], State}; {stop, {disconnect, client_initiated}, ProcState1} -> stop({_SendWill = false, State#state{proc_state = ProcState1}}) @@ -294,8 +294,7 @@ handle_data1(Data, State = #state{socket = Socket, end; {error, {disconnect_reason_code, ReasonCode}} -> rabbit_mqtt_processor:send_disconnect(ReasonCode, ProcState), - %% Instead of closing immediately, first send the DISCONNECT packet to the client. - self() ! {stop, ?CLOSE_PROTOCOL_ERROR, server_initiated_disconnect}, + defer_close(), {[], State}; {error, Reason} -> stop_mqtt_protocol_error(State, Reason, ConnName) @@ -306,6 +305,11 @@ handle_data1(Data, State = #state{socket = Socket, stop_mqtt_protocol_error(State, cannot_parse, ConnName) end. +%% Allow DISCONNECT packet to be sent to client before closing the connection. +defer_close() -> + self() ! {stop, ?CLOSE_PROTOCOL_ERROR, server_initiated_disconnect}, + ok. + stop_mqtt_protocol_error(State, Reason, ConnName) -> ?LOG_WARNING("Web MQTT protocol error ~tp for connection ~tp", [Reason, ConnName]), stop(State, ?CLOSE_PROTOCOL_ERROR, Reason). |