diff options
author | Michael Klishin <klishinm@vmware.com> | 2023-05-05 00:20:36 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-05 00:20:36 +0400 |
commit | e8ffc45cc9f5160836ce86ecc546f79c7f9bb8e5 (patch) | |
tree | 866ff29bbbe40f828cf6113f1ebe5a02e047f955 | |
parent | bbb98226e28185aa9deb60e856496b523d624f95 (diff) | |
parent | 967e2622725258b885bea3364be1963e0e19d585 (diff) | |
download | rabbitmq-server-git-e8ffc45cc9f5160836ce86ecc546f79c7f9bb8e5.tar.gz |
Merge pull request #8098 from rabbitmq/mqtt-connection-closed-event
Add MQTT client id to connection closed event
-rw-r--r-- | deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl | 3 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 16 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl | 10 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/shared_SUITE.erl | 18 | ||||
-rw-r--r-- | deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl | 10 |
5 files changed, 26 insertions, 31 deletions
diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl index c845240e55..e1fed692b9 100644 --- a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl +++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl @@ -50,7 +50,8 @@ messages_unacknowledged ]). --define(CREATION_EVENT_KEYS, +%% Connection opened or closed. +-define(EVENT_KEYS, ?ITEMS ++ [name, client_properties, diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 2e57eee9b8..48bc67d45b 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -149,7 +149,6 @@ process_connect( {ok, RaRegisterState} ?= register_client_id(VHost, ClientId), {TraceState, ConnName} = init_trace(VHost, ConnName0), ok = rabbit_mqtt_keepalive:start(KeepaliveSecs, Socket), - self() ! connection_created, {ok, #state{ cfg = #cfg{socket = Socket, @@ -207,6 +206,7 @@ process_connect(State0) -> {ok, SessPresent, State2} ?= handle_clean_sess_qos1(QoS0SessPresent, State1), State = cache_subscriptions(SessPresent, State2), rabbit_networking:register_non_amqp_connection(self()), + self() ! connection_created, {ok, SessPresent, State} else {error, _} = Error -> @@ -1190,19 +1190,9 @@ send(Packet, ProtoVer, SendFun) -> Data = rabbit_mqtt_packet:serialise(Packet, ProtoVer), ok = SendFun(Data). --spec terminate(boolean(), binary(), atom(), state()) -> ok. -terminate(SendWill, ConnName, ProtoFamily, - State = #state{cfg = #cfg{vhost = VHost}, - auth_state = #auth_state{user = #user{username = Username}} - }) -> +-spec terminate(boolean(), binary(), rabbit_event:event_props(), state()) -> ok. +terminate(SendWill, ConnName, Infos, State) -> maybe_send_will(SendWill, ConnName, State), - Infos = [{name, ConnName}, - {node, node()}, - {pid, self()}, - {disconnected_at, os:system_time(milli_seconds)}, - {protocol, {ProtoFamily, proto_version_tuple(State)}}, - {vhost, VHost}, - {user, Username}], rabbit_core_metrics:connection_closed(self()), rabbit_event:notify(connection_closed, Infos), rabbit_networking:unregister_non_amqp_connection(self()), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 4be368404e..f40ef2862a 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -27,7 +27,6 @@ -type option(T) :: undefined | T. -define(HIBERNATE_AFTER, 1000). --define(PROTO_FAMILY, 'MQTT'). -record(state, {socket :: rabbit_net:socket(), @@ -140,7 +139,7 @@ handle_cast(QueueEvent = {queue_event, _, _}, end; handle_cast({force_event_refresh, Ref}, State0) -> - Infos = infos(?CREATION_EVENT_KEYS, State0), + Infos = infos(?EVENT_KEYS, State0), rabbit_event:notify(connection_created, Infos, Ref), State = rabbit_event:init_stats_timer(State0, #state.stats_timer), {noreply, State, ?HIBERNATE_AFTER}; @@ -154,7 +153,7 @@ handle_cast(Msg, State) -> {stop, {mqtt_unexpected_cast, Msg}, State}. handle_info(connection_created, State) -> - Infos = infos(?CREATION_EVENT_KEYS, State), + Infos = infos(?EVENT_KEYS, State), rabbit_core_metrics:connection_created(self(), Infos), rabbit_event:notify(connection_created, Infos), {noreply, State, ?HIBERNATE_AFTER}; @@ -265,7 +264,8 @@ terminate(Reason, {SendWill, State = #state{conn_name = ConnName, connect_packet_unprocessed -> ok; _ -> - rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState) + Infos = infos(?EVENT_KEYS, State), + rabbit_mqtt_processor:terminate(SendWill, ConnName, Infos, PState) end, log_terminate(Reason, State). @@ -514,7 +514,7 @@ i(Cert, #state{socket = Sock}) i(timeout, #state{keepalive = KState}) -> rabbit_mqtt_keepalive:interval_secs(KState); i(protocol, #state{proc_state = ProcState}) -> - {?PROTO_FAMILY, rabbit_mqtt_processor:proto_version_tuple(ProcState)}; + {'MQTT', rabbit_mqtt_processor:proto_version_tuple(ProcState)}; i(Key, #state{proc_state = ProcState}) -> rabbit_mqtt_processor:info(Key, ProcState). diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index c629d72efc..de94bdee7a 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -438,12 +438,13 @@ events(Config) -> true -> 'Web MQTT'; false -> 'MQTT' end, - ExpectedConnectionProps = [{protocol, {Proto, {3,1,1}}}, - {node, Server}, - {vhost, <<"/">>}, - {user, <<"guest">>}, - {pid, ConnectionPid}], - assert_event_prop(ExpectedConnectionProps, E1), + assert_event_prop([{protocol, {Proto, {3,1,1}}}, + {node, Server}, + {vhost, <<"/">>}, + {user, <<"guest">>}, + {client_properties, [{client_id, longstr, ClientId}]}, + {pid, ConnectionPid}], + E1), {ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0), @@ -490,7 +491,10 @@ events(Config) -> [E6, E7 | E8] = get_events(Server), assert_event_type(connection_closed, E6), - assert_event_prop(ExpectedConnectionProps, E6), + ?assertEqual(E1#event.props, E6#event.props, + "connection_closed event props should match connection_created event props. " + "See https://github.com/rabbitmq/rabbitmq-server/discussions/6331"), + case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of true -> assert_event_type(queue_deleted, E7), diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 2d837a7ea7..1131921beb 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -48,7 +48,6 @@ -define(CLOSE_NORMAL, 1000). -define(CLOSE_PROTOCOL_ERROR, 1002). -define(CLOSE_UNACCEPTABLE_DATA_TYPE, 1003). --define(PROTO_FAMILY, 'Web MQTT'). %% cowboy_sub_protcol upgrade(Req, Env, Handler, HandlerState) -> @@ -164,7 +163,7 @@ websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{ proc_s [rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]), stop(State); websocket_info({'$gen_cast', {force_event_refresh, Ref}}, State0) -> - Infos = infos(?CREATION_EVENT_KEYS, State0), + Infos = infos(?EVENT_KEYS, State0), rabbit_event:notify(connection_created, Infos, Ref), State = rabbit_event:init_stats_timer(State0, #state.stats_timer), {[], State, hibernate}; @@ -209,7 +208,7 @@ websocket_info({shutdown, Reason}, #state{conn_name = ConnName} = State) -> ?LOG_INFO("Web MQTT closing connection ~tp: ~tp", [ConnName, Reason]), stop(State, ?CLOSE_NORMAL, Reason); websocket_info(connection_created, State) -> - Infos = infos(?CREATION_EVENT_KEYS, State), + Infos = infos(?EVENT_KEYS, State), rabbit_core_metrics:connection_created(self(), Infos), rabbit_event:notify(connection_created, Infos), {[], State, hibernate}; @@ -231,7 +230,8 @@ terminate(_Reason, _Request, connect_packet_unprocessed -> ok; _ -> - rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState) + Infos = infos(?EVENT_KEYS, State), + rabbit_mqtt_processor:terminate(SendWill, ConnName, Infos, PState) end. %% Internal. @@ -395,7 +395,7 @@ i(reductions, _) -> i(garbage_collection, _) -> rabbit_misc:get_gc_info(self()); i(protocol, #state{proc_state = PState}) -> - {?PROTO_FAMILY, rabbit_mqtt_processor:proto_version_tuple(PState)}; + {'Web MQTT', rabbit_mqtt_processor:proto_version_tuple(PState)}; i(SSL, #state{socket = Sock}) when SSL =:= ssl; SSL =:= ssl_protocol; |