diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-23 15:39:27 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-23 15:39:27 +0200 |
commit | f0f5134f1b0c2b114683fc66a277d3f3cd3679fe (patch) | |
tree | 045c88ac9b72cb4fca9155dc758448bf0fb947b2 | |
parent | dd5e879949aeccad3aca29af56161bbfeee300fc (diff) | |
parent | 016a714b607eb7b7620ab9142d3195dbe45bc071 (diff) | |
download | rabbitmq-server-git-f0f5134f1b0c2b114683fc66a277d3f3cd3679fe.tar.gz |
Merge pull request #2481 from rabbitmq/add-external-connections-for-refresh-event
Add rabbit_non_amqp_connections process group
-rw-r--r-- | src/rabbit.erl | 5 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 30 |
2 files changed, 33 insertions, 2 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 7d5c502eed..9248c945dc 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -1067,8 +1067,13 @@ config_locations() -> % This event is necessary for the stats timer to be initialized with % the correct values once the management agent has started force_event_refresh(Ref) -> + % direct connections, e.g. MQTT, STOMP ok = rabbit_direct:force_event_refresh(Ref), + % AMQP connections ok = rabbit_networking:force_connection_event_refresh(Ref), + % "external" connections, which are not handled by the "AMQP core", + % e.g. connections to the stream plugin + ok = rabbit_networking:force_non_amqp_connection_event_refresh(Ref), ok = rabbit_channel:force_event_refresh(Ref), ok = rabbit_amqqueue:force_event_refresh(Ref). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index cf7b92e65e..433b1d7540 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -23,12 +23,14 @@ stop_tcp_listener/1, on_node_down/1, active_listeners/0, node_listeners/1, node_client_listeners/1, register_connection/1, unregister_connection/1, - connections/0, connection_info_keys/0, + register_non_amqp_connection/1, unregister_non_amqp_connection/1, + connections/0, non_amqp_connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, emit_connection_info_all/4, emit_connection_info_local/3, close_connection/2, close_connections/2, close_all_connections/1, - force_connection_event_refresh/1, handshake/2, tcp_host/1, + force_connection_event_refresh/1, force_non_amqp_connection_event_refresh/1, + handshake/2, tcp_host/1, ranch_ref/1, ranch_ref/2, ranch_ref_of_protocol/1, listener_of_protocol/1, stop_ranch_listener_of_protocol/1]). @@ -42,6 +44,7 @@ -export([ local_connections/0, + local_non_amqp_connections/0, %% prefer local_connections/0 connections_local/0 ]). @@ -397,6 +400,24 @@ local_connections() -> %% @deprecated Prefer {@link local_connections} connections_local() -> pg_local:get_members(rabbit_connections). +-spec register_non_amqp_connection(pid()) -> ok. + +register_non_amqp_connection(Pid) -> pg_local:join(rabbit_non_amqp_connections, Pid). + +-spec unregister_non_amqp_connection(pid()) -> ok. + +unregister_non_amqp_connection(Pid) -> pg_local:leave(rabbit_non_amqp_connections, Pid). + +-spec non_amqp_connections() -> [rabbit_types:connection()]. + +non_amqp_connections() -> + Nodes = rabbit_nodes:all_running(), + rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_networking, local_non_amqp_connections, [], ?RPC_TIMEOUT). + +-spec local_non_amqp_connections() -> [rabbit_types:connection()]. +local_non_amqp_connections() -> + pg_local:get_members(rabbit_non_amqp_connections). + -spec connection_info_keys() -> rabbit_types:info_keys(). connection_info_keys() -> rabbit_reader:info_keys(). @@ -461,6 +482,11 @@ force_connection_event_refresh(Ref) -> [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], ok. +-spec force_non_amqp_connection_event_refresh(reference()) -> 'ok'. +force_non_amqp_connection_event_refresh(Ref) -> + [gen_server:cast(Pid, {force_event_refresh, Ref}) || Pid <- non_amqp_connections()], + ok. + -spec failed_to_recv_proxy_header(_, _) -> no_return(). failed_to_recv_proxy_header(Ref, Error) -> Msg = case Error of |