summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-23 10:11:59 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-23 10:11:59 +0200
commit4f58002b519b08830352c036a4481694e01bd047 (patch)
treeca0a16035caf3587189152590d11ac97c1724413
parentdd5e879949aeccad3aca29af56161bbfeee300fc (diff)
downloadrabbitmq-server-git-4f58002b519b08830352c036a4481694e01bd047.tar.gz
Add rabbit_external_connections process group
This commit allows to propagate the force_event_refresh event to non-AMQP connections. This way other types of connections can be notified of the management plugin activation and start emitting statistics. This will be used for now by connections from the stream plugin. The existing rabbit_connections process group cannot be re-used in this case as it is used by the list_connections CLI command, and we do not want non-AMQP connections to show up in the output.
-rw-r--r--src/rabbit.erl1
-rw-r--r--src/rabbit_networking.erl30
2 files changed, 29 insertions, 2 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7d5c502eed..e7e485dc15 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -1069,6 +1069,7 @@ config_locations() ->
force_event_refresh(Ref) ->
ok = rabbit_direct:force_event_refresh(Ref),
ok = rabbit_networking:force_connection_event_refresh(Ref),
+ ok = rabbit_networking:force_external_connection_event_refresh(Ref),
ok = rabbit_channel:force_event_refresh(Ref),
ok = rabbit_amqqueue:force_event_refresh(Ref).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index cf7b92e65e..cd0be8bd91 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -23,12 +23,14 @@
stop_tcp_listener/1, on_node_down/1, active_listeners/0,
node_listeners/1, node_client_listeners/1,
register_connection/1, unregister_connection/1,
- connections/0, connection_info_keys/0,
+ register_external_connection/1, unregister_external_connection/1,
+ connections/0, external_connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
emit_connection_info_all/4, emit_connection_info_local/3,
close_connection/2, close_connections/2, close_all_connections/1,
- force_connection_event_refresh/1, handshake/2, tcp_host/1,
+ force_connection_event_refresh/1, force_external_connection_event_refresh/1,
+ handshake/2, tcp_host/1,
ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
listener_of_protocol/1, stop_ranch_listener_of_protocol/1]).
@@ -42,6 +44,7 @@
-export([
local_connections/0,
+ local_external_connections/0,
%% prefer local_connections/0
connections_local/0
]).
@@ -397,6 +400,24 @@ local_connections() ->
%% @deprecated Prefer {@link local_connections}
connections_local() -> pg_local:get_members(rabbit_connections).
+-spec register_external_connection(pid()) -> ok.
+
+register_external_connection(Pid) -> pg_local:join(rabbit_external_connections, Pid).
+
+-spec unregister_external_connection(pid()) -> ok.
+
+unregister_external_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
+
+-spec external_connections() -> [rabbit_types:connection()].
+
+external_connections() ->
+ Nodes = rabbit_nodes:all_running(),
+ rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, local_external_connections, [], ?RPC_TIMEOUT).
+
+-spec local_external_connections() -> [rabbit_types:connection()].
+local_external_connections() ->
+ pg_local:get_members(rabbit_external_connections).
+
-spec connection_info_keys() -> rabbit_types:info_keys().
connection_info_keys() -> rabbit_reader:info_keys().
@@ -461,6 +482,11 @@ force_connection_event_refresh(Ref) ->
[rabbit_reader:force_event_refresh(C, Ref) || C <- connections()],
ok.
+-spec force_external_connection_event_refresh(reference()) -> 'ok'.
+force_external_connection_event_refresh(Ref) ->
+ [gen_server:cast(Pid, {force_event_refresh, Ref}) || Pid <- external_connections()],
+ ok.
+
-spec failed_to_recv_proxy_header(_, _) -> no_return().
failed_to_recv_proxy_header(Ref, Error) ->
Msg = case Error of