diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-23 10:11:59 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-23 10:11:59 +0200 |
commit | 4f58002b519b08830352c036a4481694e01bd047 (patch) | |
tree | ca0a16035caf3587189152590d11ac97c1724413 | |
parent | dd5e879949aeccad3aca29af56161bbfeee300fc (diff) | |
download | rabbitmq-server-git-4f58002b519b08830352c036a4481694e01bd047.tar.gz |
Add rabbit_external_connections process group
This commit allows to propagate the force_event_refresh
event to non-AMQP connections. This way other types of connections can
be notified of the management plugin activation and start emitting
statistics.
This will be used for now by connections from the stream plugin.
The existing rabbit_connections process group cannot be re-used in this
case as it is used by the list_connections CLI command, and we do not
want non-AMQP connections to show up in the output.
-rw-r--r-- | src/rabbit.erl | 1 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 30 |
2 files changed, 29 insertions, 2 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 7d5c502eed..e7e485dc15 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -1069,6 +1069,7 @@ config_locations() -> force_event_refresh(Ref) -> ok = rabbit_direct:force_event_refresh(Ref), ok = rabbit_networking:force_connection_event_refresh(Ref), + ok = rabbit_networking:force_external_connection_event_refresh(Ref), ok = rabbit_channel:force_event_refresh(Ref), ok = rabbit_amqqueue:force_event_refresh(Ref). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index cf7b92e65e..cd0be8bd91 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -23,12 +23,14 @@ stop_tcp_listener/1, on_node_down/1, active_listeners/0, node_listeners/1, node_client_listeners/1, register_connection/1, unregister_connection/1, - connections/0, connection_info_keys/0, + register_external_connection/1, unregister_external_connection/1, + connections/0, external_connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, emit_connection_info_all/4, emit_connection_info_local/3, close_connection/2, close_connections/2, close_all_connections/1, - force_connection_event_refresh/1, handshake/2, tcp_host/1, + force_connection_event_refresh/1, force_external_connection_event_refresh/1, + handshake/2, tcp_host/1, ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1, listener_of_protocol/1, stop_ranch_listener_of_protocol/1]). @@ -42,6 +44,7 @@ -export([ local_connections/0, + local_external_connections/0, %% prefer local_connections/0 connections_local/0 ]). @@ -397,6 +400,24 @@ local_connections() -> %% @deprecated Prefer {@link local_connections} connections_local() -> pg_local:get_members(rabbit_connections). +-spec register_external_connection(pid()) -> ok. + +register_external_connection(Pid) -> pg_local:join(rabbit_external_connections, Pid). + +-spec unregister_external_connection(pid()) -> ok. + +unregister_external_connection(Pid) -> pg_local:leave(rabbit_connections, Pid). + +-spec external_connections() -> [rabbit_types:connection()]. + +external_connections() -> + Nodes = rabbit_nodes:all_running(), + rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, local_external_connections, [], ?RPC_TIMEOUT). + +-spec local_external_connections() -> [rabbit_types:connection()]. +local_external_connections() -> + pg_local:get_members(rabbit_external_connections). + -spec connection_info_keys() -> rabbit_types:info_keys(). connection_info_keys() -> rabbit_reader:info_keys(). @@ -461,6 +482,11 @@ force_connection_event_refresh(Ref) -> [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], ok. +-spec force_external_connection_event_refresh(reference()) -> 'ok'. +force_external_connection_event_refresh(Ref) -> + [gen_server:cast(Pid, {force_event_refresh, Ref}) || Pid <- external_connections()], + ok. + -spec failed_to_recv_proxy_header(_, _) -> no_return(). failed_to_recv_proxy_header(Ref, Error) -> Msg = case Error of |