From 967e2622725258b885bea3364be1963e0e19d585 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 4 May 2023 08:51:30 +0000 Subject: Add MQTT client id to connection closed event As requested in https://github.com/rabbitmq/rabbitmq-server/discussions/6331#discussioncomment-5796154 include all infos that were emitted in the MQTT connection created event also in the MQTT connection closed event. This ensures infos such as MQTT client ID are part of the connection closed event. Therefore, it's easy for the user to correlate between the two event types. Note that the MQTT plugin emits connection created and connection closed events only if the CONNECT packet was successfully processed, i.e.authentication was successful. Remove the disconnected_at property because it was never used. rabbit_event already adds a timestamp to any event. --- deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl | 3 ++- deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 16 +++------------- deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl | 10 +++++----- deps/rabbitmq_mqtt/test/shared_SUITE.erl | 18 +++++++++++------- deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl | 10 +++++----- 5 files changed, 26 insertions(+), 31 deletions(-) diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl index c845240e55..e1fed692b9 100644 --- a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl +++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl @@ -50,7 +50,8 @@ messages_unacknowledged ]). --define(CREATION_EVENT_KEYS, +%% Connection opened or closed. +-define(EVENT_KEYS, ?ITEMS ++ [name, client_properties, diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 2e57eee9b8..48bc67d45b 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -149,7 +149,6 @@ process_connect( {ok, RaRegisterState} ?= register_client_id(VHost, ClientId), {TraceState, ConnName} = init_trace(VHost, ConnName0), ok = rabbit_mqtt_keepalive:start(KeepaliveSecs, Socket), - self() ! connection_created, {ok, #state{ cfg = #cfg{socket = Socket, @@ -207,6 +206,7 @@ process_connect(State0) -> {ok, SessPresent, State2} ?= handle_clean_sess_qos1(QoS0SessPresent, State1), State = cache_subscriptions(SessPresent, State2), rabbit_networking:register_non_amqp_connection(self()), + self() ! connection_created, {ok, SessPresent, State} else {error, _} = Error -> @@ -1190,19 +1190,9 @@ send(Packet, ProtoVer, SendFun) -> Data = rabbit_mqtt_packet:serialise(Packet, ProtoVer), ok = SendFun(Data). --spec terminate(boolean(), binary(), atom(), state()) -> ok. -terminate(SendWill, ConnName, ProtoFamily, - State = #state{cfg = #cfg{vhost = VHost}, - auth_state = #auth_state{user = #user{username = Username}} - }) -> +-spec terminate(boolean(), binary(), rabbit_event:event_props(), state()) -> ok. +terminate(SendWill, ConnName, Infos, State) -> maybe_send_will(SendWill, ConnName, State), - Infos = [{name, ConnName}, - {node, node()}, - {pid, self()}, - {disconnected_at, os:system_time(milli_seconds)}, - {protocol, {ProtoFamily, proto_version_tuple(State)}}, - {vhost, VHost}, - {user, Username}], rabbit_core_metrics:connection_closed(self()), rabbit_event:notify(connection_closed, Infos), rabbit_networking:unregister_non_amqp_connection(self()), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 4be368404e..f40ef2862a 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -27,7 +27,6 @@ -type option(T) :: undefined | T. -define(HIBERNATE_AFTER, 1000). --define(PROTO_FAMILY, 'MQTT'). -record(state, {socket :: rabbit_net:socket(), @@ -140,7 +139,7 @@ handle_cast(QueueEvent = {queue_event, _, _}, end; handle_cast({force_event_refresh, Ref}, State0) -> - Infos = infos(?CREATION_EVENT_KEYS, State0), + Infos = infos(?EVENT_KEYS, State0), rabbit_event:notify(connection_created, Infos, Ref), State = rabbit_event:init_stats_timer(State0, #state.stats_timer), {noreply, State, ?HIBERNATE_AFTER}; @@ -154,7 +153,7 @@ handle_cast(Msg, State) -> {stop, {mqtt_unexpected_cast, Msg}, State}. handle_info(connection_created, State) -> - Infos = infos(?CREATION_EVENT_KEYS, State), + Infos = infos(?EVENT_KEYS, State), rabbit_core_metrics:connection_created(self(), Infos), rabbit_event:notify(connection_created, Infos), {noreply, State, ?HIBERNATE_AFTER}; @@ -265,7 +264,8 @@ terminate(Reason, {SendWill, State = #state{conn_name = ConnName, connect_packet_unprocessed -> ok; _ -> - rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState) + Infos = infos(?EVENT_KEYS, State), + rabbit_mqtt_processor:terminate(SendWill, ConnName, Infos, PState) end, log_terminate(Reason, State). @@ -514,7 +514,7 @@ i(Cert, #state{socket = Sock}) i(timeout, #state{keepalive = KState}) -> rabbit_mqtt_keepalive:interval_secs(KState); i(protocol, #state{proc_state = ProcState}) -> - {?PROTO_FAMILY, rabbit_mqtt_processor:proto_version_tuple(ProcState)}; + {'MQTT', rabbit_mqtt_processor:proto_version_tuple(ProcState)}; i(Key, #state{proc_state = ProcState}) -> rabbit_mqtt_processor:info(Key, ProcState). diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index c629d72efc..de94bdee7a 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -438,12 +438,13 @@ events(Config) -> true -> 'Web MQTT'; false -> 'MQTT' end, - ExpectedConnectionProps = [{protocol, {Proto, {3,1,1}}}, - {node, Server}, - {vhost, <<"/">>}, - {user, <<"guest">>}, - {pid, ConnectionPid}], - assert_event_prop(ExpectedConnectionProps, E1), + assert_event_prop([{protocol, {Proto, {3,1,1}}}, + {node, Server}, + {vhost, <<"/">>}, + {user, <<"guest">>}, + {client_properties, [{client_id, longstr, ClientId}]}, + {pid, ConnectionPid}], + E1), {ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0), @@ -490,7 +491,10 @@ events(Config) -> [E6, E7 | E8] = get_events(Server), assert_event_type(connection_closed, E6), - assert_event_prop(ExpectedConnectionProps, E6), + ?assertEqual(E1#event.props, E6#event.props, + "connection_closed event props should match connection_created event props. " + "See https://github.com/rabbitmq/rabbitmq-server/discussions/6331"), + case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of true -> assert_event_type(queue_deleted, E7), diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 2d837a7ea7..1131921beb 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -48,7 +48,6 @@ -define(CLOSE_NORMAL, 1000). -define(CLOSE_PROTOCOL_ERROR, 1002). -define(CLOSE_UNACCEPTABLE_DATA_TYPE, 1003). --define(PROTO_FAMILY, 'Web MQTT'). %% cowboy_sub_protcol upgrade(Req, Env, Handler, HandlerState) -> @@ -164,7 +163,7 @@ websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{ proc_s [rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]), stop(State); websocket_info({'$gen_cast', {force_event_refresh, Ref}}, State0) -> - Infos = infos(?CREATION_EVENT_KEYS, State0), + Infos = infos(?EVENT_KEYS, State0), rabbit_event:notify(connection_created, Infos, Ref), State = rabbit_event:init_stats_timer(State0, #state.stats_timer), {[], State, hibernate}; @@ -209,7 +208,7 @@ websocket_info({shutdown, Reason}, #state{conn_name = ConnName} = State) -> ?LOG_INFO("Web MQTT closing connection ~tp: ~tp", [ConnName, Reason]), stop(State, ?CLOSE_NORMAL, Reason); websocket_info(connection_created, State) -> - Infos = infos(?CREATION_EVENT_KEYS, State), + Infos = infos(?EVENT_KEYS, State), rabbit_core_metrics:connection_created(self(), Infos), rabbit_event:notify(connection_created, Infos), {[], State, hibernate}; @@ -231,7 +230,8 @@ terminate(_Reason, _Request, connect_packet_unprocessed -> ok; _ -> - rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState) + Infos = infos(?EVENT_KEYS, State), + rabbit_mqtt_processor:terminate(SendWill, ConnName, Infos, PState) end. %% Internal. @@ -395,7 +395,7 @@ i(reductions, _) -> i(garbage_collection, _) -> rabbit_misc:get_gc_info(self()); i(protocol, #state{proc_state = PState}) -> - {?PROTO_FAMILY, rabbit_mqtt_processor:proto_version_tuple(PState)}; + {'Web MQTT', rabbit_mqtt_processor:proto_version_tuple(PState)}; i(SSL, #state{socket = Sock}) when SSL =:= ssl; SSL =:= ssl_protocol; -- cgit v1.2.1