diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-23 10:17:39 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-23 10:17:39 +0200 |
commit | bd191cf40d1b6b6643c570a820bcc997d03e24b5 (patch) | |
tree | 6c9641146ef6bbced33bae15da380a496edd04c9 | |
parent | bb510b0e140b507b1920535d4514a3056856c25e (diff) | |
download | rabbitmq-server-git-bd191cf40d1b6b6643c570a820bcc997d03e24b5.tar.gz |
Register connections and handle force_event_refresh
For existing connections that must emit stats when the management plugin
is enabled.
References rabbitmq/rabbitmq-server#2481
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 22 |
1 files changed, 16 insertions, 6 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index dbc78ae2ae..b9807e1b08 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -194,18 +194,14 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta % should be possible once the connections are available in ctl list_connections pg_local:join(rabbit_stream_connections, self()), Connection2 = rabbit_event:init_stats_timer(Connection1, #stream_connection.stats_timer), - Connection3 = rabbit_event:ensure_stats_timer(Connection2, #stream_connection.stats_timer, emit_stats), + Connection3 = ensure_stats_timer(Connection2), Infos = augment_infos_with_user_provided_connection_name( infos(?CREATION_EVENT_KEYS, Connection3, State1), Connection3 ), rabbit_core_metrics:connection_created(self(), Infos), rabbit_event:notify(connection_created, Infos), - - % FIXME emit stats? - - % FIXME handle {'$gen_cast', {force_event_refresh, Ref}} event below? - + rabbit_networking:register_external_connection(self()), listen_loop_post_auth(Transport, Connection3, State1, Configuration); failure -> close(Transport, S); @@ -249,6 +245,7 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, case Step of closing -> close(Transport, S), + rabbit_networking:unregister_external_connection(self()), notify_connection_closed(Connection1, State1); close_sent -> rabbit_log:debug("Transitioned to close_sent ~n"), @@ -391,13 +388,23 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, emit_stats -> Connection1 = emit_stats(Connection, State), listen_loop_post_auth(Transport, Connection1, State, Configuration); + {'$gen_cast', {force_event_refresh, Ref}} -> + Infos = augment_infos_with_user_provided_connection_name( + infos(?CREATION_EVENT_KEYS, Connection, State), + Connection + ), + rabbit_event:notify(connection_created, Infos, Ref), + Connection1 = rabbit_event:init_stats_timer(Connection, #stream_connection.stats_timer), + listen_loop_post_auth(Transport, Connection1, State, Configuration); {Closed, S} -> demonitor_all_streams(Connection), + rabbit_networking:unregister_external_connection(self()), notify_connection_closed(Connection, State), rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), ok; {Error, S, Reason} -> demonitor_all_streams(Connection), + rabbit_networking:unregister_external_connection(self()), notify_connection_closed(Connection, State), rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]); M -> @@ -419,18 +426,21 @@ listen_loop_post_close(Transport, #stream_connection{socket = S} = Connection, S closing_done -> rabbit_log:debug("Received close confirmation from client"), close(Transport, S), + rabbit_networking:unregister_external_connection(self()), notify_connection_closed(Connection1, State1); _ -> Transport:setopts(S, [{active, once}]), listen_loop_post_close(Transport, Connection1, State1, Configuration) end; {Closed, S} -> + rabbit_networking:unregister_external_connection(self()), notify_connection_closed(Connection, State), rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), ok; {Error, S, Reason} -> rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]), close(Transport, S), + rabbit_networking:unregister_external_connection(self()), notify_connection_closed(Connection, State); M -> rabbit_log:warning("Ignored message on closing ~p~n", [M]) |