summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-23 15:39:27 +0200
committerGitHub <noreply@github.com>2020-10-23 15:39:27 +0200
commitf0f5134f1b0c2b114683fc66a277d3f3cd3679fe (patch)
tree045c88ac9b72cb4fca9155dc758448bf0fb947b2
parentdd5e879949aeccad3aca29af56161bbfeee300fc (diff)
parent016a714b607eb7b7620ab9142d3195dbe45bc071 (diff)
downloadrabbitmq-server-git-f0f5134f1b0c2b114683fc66a277d3f3cd3679fe.tar.gz
Merge pull request #2481 from rabbitmq/add-external-connections-for-refresh-event
Add rabbit_non_amqp_connections process group
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_networking.erl30
2 files changed, 33 insertions, 2 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7d5c502eed..9248c945dc 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -1067,8 +1067,13 @@ config_locations() ->
% This event is necessary for the stats timer to be initialized with
% the correct values once the management agent has started
force_event_refresh(Ref) ->
+ % direct connections, e.g. MQTT, STOMP
ok = rabbit_direct:force_event_refresh(Ref),
+ % AMQP connections
ok = rabbit_networking:force_connection_event_refresh(Ref),
+ % "external" connections, which are not handled by the "AMQP core",
+ % e.g. connections to the stream plugin
+ ok = rabbit_networking:force_non_amqp_connection_event_refresh(Ref),
ok = rabbit_channel:force_event_refresh(Ref),
ok = rabbit_amqqueue:force_event_refresh(Ref).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index cf7b92e65e..433b1d7540 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -23,12 +23,14 @@
stop_tcp_listener/1, on_node_down/1, active_listeners/0,
node_listeners/1, node_client_listeners/1,
register_connection/1, unregister_connection/1,
- connections/0, connection_info_keys/0,
+ register_non_amqp_connection/1, unregister_non_amqp_connection/1,
+ connections/0, non_amqp_connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
emit_connection_info_all/4, emit_connection_info_local/3,
close_connection/2, close_connections/2, close_all_connections/1,
- force_connection_event_refresh/1, handshake/2, tcp_host/1,
+ force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1,
+ handshake/2, tcp_host/1,
ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1,
listener_of_protocol/1, stop_ranch_listener_of_protocol/1]).
@@ -42,6 +44,7 @@
-export([
local_connections/0,
+ local_non_amqp_connections/0,
%% prefer local_connections/0
connections_local/0
]).
@@ -397,6 +400,24 @@ local_connections() ->
%% @deprecated Prefer {@link local_connections}
connections_local() -> pg_local:get_members(rabbit_connections).
+-spec register_non_amqp_connection(pid()) -> ok.
+
+register_non_amqp_connection(Pid) -> pg_local:join(rabbit_non_amqp_connections, Pid).
+
+-spec unregister_non_amqp_connection(pid()) -> ok.
+
+unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connections, Pid).
+
+-spec non_amqp_connections() -> [rabbit_types:connection()].
+
+non_amqp_connections() ->
+ Nodes = rabbit_nodes:all_running(),
+ rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, local_non_amqp_connections, [], ?RPC_TIMEOUT).
+
+-spec local_non_amqp_connections() -> [rabbit_types:connection()].
+local_non_amqp_connections() ->
+ pg_local:get_members(rabbit_non_amqp_connections).
+
-spec connection_info_keys() -> rabbit_types:info_keys().
connection_info_keys() -> rabbit_reader:info_keys().
@@ -461,6 +482,11 @@ force_connection_event_refresh(Ref) ->
[rabbit_reader:force_event_refresh(C, Ref) || C <- connections()],
ok.
+-spec force_non_amqp_connection_event_refresh(reference()) -> 'ok'.
+force_non_amqp_connection_event_refresh(Ref) ->
+ [gen_server:cast(Pid, {force_event_refresh, Ref}) || Pid <- non_amqp_connections()],
+ ok.
+
-spec failed_to_recv_proxy_header(_, _) -> no_return().
failed_to_recv_proxy_header(Ref, Error) ->
Msg = case Error of