summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-23 10:17:39 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-23 10:17:39 +0200
commitbd191cf40d1b6b6643c570a820bcc997d03e24b5 (patch)
tree6c9641146ef6bbced33bae15da380a496edd04c9
parentbb510b0e140b507b1920535d4514a3056856c25e (diff)
downloadrabbitmq-server-git-bd191cf40d1b6b6643c570a820bcc997d03e24b5.tar.gz
Register connections and handle force_event_refresh
For existing connections that must emit stats when the management plugin is enabled. References rabbitmq/rabbitmq-server#2481
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl22
1 files changed, 16 insertions, 6 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index dbc78ae2ae..b9807e1b08 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -194,18 +194,14 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta
% should be possible once the connections are available in ctl list_connections
pg_local:join(rabbit_stream_connections, self()),
Connection2 = rabbit_event:init_stats_timer(Connection1, #stream_connection.stats_timer),
- Connection3 = rabbit_event:ensure_stats_timer(Connection2, #stream_connection.stats_timer, emit_stats),
+ Connection3 = ensure_stats_timer(Connection2),
Infos = augment_infos_with_user_provided_connection_name(
infos(?CREATION_EVENT_KEYS, Connection3, State1),
Connection3
),
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
-
- % FIXME emit stats?
-
- % FIXME handle {'$gen_cast', {force_event_refresh, Ref}} event below?
-
+ rabbit_networking:register_external_connection(self()),
listen_loop_post_auth(Transport, Connection3, State1, Configuration);
failure ->
close(Transport, S);
@@ -249,6 +245,7 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
case Step of
closing ->
close(Transport, S),
+ rabbit_networking:unregister_external_connection(self()),
notify_connection_closed(Connection1, State1);
close_sent ->
rabbit_log:debug("Transitioned to close_sent ~n"),
@@ -391,13 +388,23 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
emit_stats ->
Connection1 = emit_stats(Connection, State),
listen_loop_post_auth(Transport, Connection1, State, Configuration);
+ {'$gen_cast', {force_event_refresh, Ref}} ->
+ Infos = augment_infos_with_user_provided_connection_name(
+ infos(?CREATION_EVENT_KEYS, Connection, State),
+ Connection
+ ),
+ rabbit_event:notify(connection_created, Infos, Ref),
+ Connection1 = rabbit_event:init_stats_timer(Connection, #stream_connection.stats_timer),
+ listen_loop_post_auth(Transport, Connection1, State, Configuration);
{Closed, S} ->
demonitor_all_streams(Connection),
+ rabbit_networking:unregister_external_connection(self()),
notify_connection_closed(Connection, State),
rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
ok;
{Error, S, Reason} ->
demonitor_all_streams(Connection),
+ rabbit_networking:unregister_external_connection(self()),
notify_connection_closed(Connection, State),
rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]);
M ->
@@ -419,18 +426,21 @@ listen_loop_post_close(Transport, #stream_connection{socket = S} = Connection, S
closing_done ->
rabbit_log:debug("Received close confirmation from client"),
close(Transport, S),
+ rabbit_networking:unregister_external_connection(self()),
notify_connection_closed(Connection1, State1);
_ ->
Transport:setopts(S, [{active, once}]),
listen_loop_post_close(Transport, Connection1, State1, Configuration)
end;
{Closed, S} ->
+ rabbit_networking:unregister_external_connection(self()),
notify_connection_closed(Connection, State),
rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
ok;
{Error, S, Reason} ->
rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]),
close(Transport, S),
+ rabbit_networking:unregister_external_connection(self()),
notify_connection_closed(Connection, State);
M ->
rabbit_log:warning("Ignored message on closing ~p~n", [M])