summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2023-05-05 00:20:36 +0400
committerGitHub <noreply@github.com>2023-05-05 00:20:36 +0400
commite8ffc45cc9f5160836ce86ecc546f79c7f9bb8e5 (patch)
tree866ff29bbbe40f828cf6113f1ebe5a02e047f955
parentbbb98226e28185aa9deb60e856496b523d624f95 (diff)
parent967e2622725258b885bea3364be1963e0e19d585 (diff)
downloadrabbitmq-server-git-e8ffc45cc9f5160836ce86ecc546f79c7f9bb8e5.tar.gz
Merge pull request #8098 from rabbitmq/mqtt-connection-closed-event
Add MQTT client id to connection closed event
-rw-r--r--deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl3
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl16
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl10
-rw-r--r--deps/rabbitmq_mqtt/test/shared_SUITE.erl18
-rw-r--r--deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl10
5 files changed, 26 insertions, 31 deletions
diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl
index c845240e55..e1fed692b9 100644
--- a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl
+++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl
@@ -50,7 +50,8 @@
messages_unacknowledged
]).
--define(CREATION_EVENT_KEYS,
+%% Connection opened or closed.
+-define(EVENT_KEYS,
?ITEMS ++
[name,
client_properties,
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index 2e57eee9b8..48bc67d45b 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -149,7 +149,6 @@ process_connect(
{ok, RaRegisterState} ?= register_client_id(VHost, ClientId),
{TraceState, ConnName} = init_trace(VHost, ConnName0),
ok = rabbit_mqtt_keepalive:start(KeepaliveSecs, Socket),
- self() ! connection_created,
{ok,
#state{
cfg = #cfg{socket = Socket,
@@ -207,6 +206,7 @@ process_connect(State0) ->
{ok, SessPresent, State2} ?= handle_clean_sess_qos1(QoS0SessPresent, State1),
State = cache_subscriptions(SessPresent, State2),
rabbit_networking:register_non_amqp_connection(self()),
+ self() ! connection_created,
{ok, SessPresent, State}
else
{error, _} = Error ->
@@ -1190,19 +1190,9 @@ send(Packet, ProtoVer, SendFun) ->
Data = rabbit_mqtt_packet:serialise(Packet, ProtoVer),
ok = SendFun(Data).
--spec terminate(boolean(), binary(), atom(), state()) -> ok.
-terminate(SendWill, ConnName, ProtoFamily,
- State = #state{cfg = #cfg{vhost = VHost},
- auth_state = #auth_state{user = #user{username = Username}}
- }) ->
+-spec terminate(boolean(), binary(), rabbit_event:event_props(), state()) -> ok.
+terminate(SendWill, ConnName, Infos, State) ->
maybe_send_will(SendWill, ConnName, State),
- Infos = [{name, ConnName},
- {node, node()},
- {pid, self()},
- {disconnected_at, os:system_time(milli_seconds)},
- {protocol, {ProtoFamily, proto_version_tuple(State)}},
- {vhost, VHost},
- {user, Username}],
rabbit_core_metrics:connection_closed(self()),
rabbit_event:notify(connection_closed, Infos),
rabbit_networking:unregister_non_amqp_connection(self()),
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
index 4be368404e..f40ef2862a 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
@@ -27,7 +27,6 @@
-type option(T) :: undefined | T.
-define(HIBERNATE_AFTER, 1000).
--define(PROTO_FAMILY, 'MQTT').
-record(state,
{socket :: rabbit_net:socket(),
@@ -140,7 +139,7 @@ handle_cast(QueueEvent = {queue_event, _, _},
end;
handle_cast({force_event_refresh, Ref}, State0) ->
- Infos = infos(?CREATION_EVENT_KEYS, State0),
+ Infos = infos(?EVENT_KEYS, State0),
rabbit_event:notify(connection_created, Infos, Ref),
State = rabbit_event:init_stats_timer(State0, #state.stats_timer),
{noreply, State, ?HIBERNATE_AFTER};
@@ -154,7 +153,7 @@ handle_cast(Msg, State) ->
{stop, {mqtt_unexpected_cast, Msg}, State}.
handle_info(connection_created, State) ->
- Infos = infos(?CREATION_EVENT_KEYS, State),
+ Infos = infos(?EVENT_KEYS, State),
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
{noreply, State, ?HIBERNATE_AFTER};
@@ -265,7 +264,8 @@ terminate(Reason, {SendWill, State = #state{conn_name = ConnName,
connect_packet_unprocessed ->
ok;
_ ->
- rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState)
+ Infos = infos(?EVENT_KEYS, State),
+ rabbit_mqtt_processor:terminate(SendWill, ConnName, Infos, PState)
end,
log_terminate(Reason, State).
@@ -514,7 +514,7 @@ i(Cert, #state{socket = Sock})
i(timeout, #state{keepalive = KState}) ->
rabbit_mqtt_keepalive:interval_secs(KState);
i(protocol, #state{proc_state = ProcState}) ->
- {?PROTO_FAMILY, rabbit_mqtt_processor:proto_version_tuple(ProcState)};
+ {'MQTT', rabbit_mqtt_processor:proto_version_tuple(ProcState)};
i(Key, #state{proc_state = ProcState}) ->
rabbit_mqtt_processor:info(Key, ProcState).
diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
index c629d72efc..de94bdee7a 100644
--- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl
@@ -438,12 +438,13 @@ events(Config) ->
true -> 'Web MQTT';
false -> 'MQTT'
end,
- ExpectedConnectionProps = [{protocol, {Proto, {3,1,1}}},
- {node, Server},
- {vhost, <<"/">>},
- {user, <<"guest">>},
- {pid, ConnectionPid}],
- assert_event_prop(ExpectedConnectionProps, E1),
+ assert_event_prop([{protocol, {Proto, {3,1,1}}},
+ {node, Server},
+ {vhost, <<"/">>},
+ {user, <<"guest">>},
+ {client_properties, [{client_id, longstr, ClientId}]},
+ {pid, ConnectionPid}],
+ E1),
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0),
@@ -490,7 +491,10 @@ events(Config) ->
[E6, E7 | E8] = get_events(Server),
assert_event_type(connection_closed, E6),
- assert_event_prop(ExpectedConnectionProps, E6),
+ ?assertEqual(E1#event.props, E6#event.props,
+ "connection_closed event props should match connection_created event props. "
+ "See https://github.com/rabbitmq/rabbitmq-server/discussions/6331"),
+
case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of
true ->
assert_event_type(queue_deleted, E7),
diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
index 2d837a7ea7..1131921beb 100644
--- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
+++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
@@ -48,7 +48,6 @@
-define(CLOSE_NORMAL, 1000).
-define(CLOSE_PROTOCOL_ERROR, 1002).
-define(CLOSE_UNACCEPTABLE_DATA_TYPE, 1003).
--define(PROTO_FAMILY, 'Web MQTT').
%% cowboy_sub_protcol
upgrade(Req, Env, Handler, HandlerState) ->
@@ -164,7 +163,7 @@ websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{ proc_s
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]),
stop(State);
websocket_info({'$gen_cast', {force_event_refresh, Ref}}, State0) ->
- Infos = infos(?CREATION_EVENT_KEYS, State0),
+ Infos = infos(?EVENT_KEYS, State0),
rabbit_event:notify(connection_created, Infos, Ref),
State = rabbit_event:init_stats_timer(State0, #state.stats_timer),
{[], State, hibernate};
@@ -209,7 +208,7 @@ websocket_info({shutdown, Reason}, #state{conn_name = ConnName} = State) ->
?LOG_INFO("Web MQTT closing connection ~tp: ~tp", [ConnName, Reason]),
stop(State, ?CLOSE_NORMAL, Reason);
websocket_info(connection_created, State) ->
- Infos = infos(?CREATION_EVENT_KEYS, State),
+ Infos = infos(?EVENT_KEYS, State),
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
{[], State, hibernate};
@@ -231,7 +230,8 @@ terminate(_Reason, _Request,
connect_packet_unprocessed ->
ok;
_ ->
- rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState)
+ Infos = infos(?EVENT_KEYS, State),
+ rabbit_mqtt_processor:terminate(SendWill, ConnName, Infos, PState)
end.
%% Internal.
@@ -395,7 +395,7 @@ i(reductions, _) ->
i(garbage_collection, _) ->
rabbit_misc:get_gc_info(self());
i(protocol, #state{proc_state = PState}) ->
- {?PROTO_FAMILY, rabbit_mqtt_processor:proto_version_tuple(PState)};
+ {'Web MQTT', rabbit_mqtt_processor:proto_version_tuple(PState)};
i(SSL, #state{socket = Sock})
when SSL =:= ssl;
SSL =:= ssl_protocol;