summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_connection_tracking.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_connection_tracking.erl')
-rw-r--r--deps/rabbit/src/rabbit_connection_tracking.erl515
1 files changed, 515 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_connection_tracking.erl b/deps/rabbit/src/rabbit_connection_tracking.erl
new file mode 100644
index 0000000000..c0704e6a7c
--- /dev/null
+++ b/deps/rabbit/src/rabbit_connection_tracking.erl
@@ -0,0 +1,515 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_connection_tracking).
+
+%% Abstracts away how tracked connection records are stored
+%% and queried.
+%%
+%% See also:
+%%
+%% * rabbit_connection_tracking_handler
+%% * rabbit_reader
+%% * rabbit_event
+-behaviour(rabbit_tracking).
+
+-export([boot/0,
+ update_tracked/1,
+ handle_cast/1,
+ register_tracked/1,
+ unregister_tracked/1,
+ count_tracked_items_in/1,
+ clear_tracking_tables/0,
+ shutdown_tracked_items/2]).
+
+-export([ensure_tracked_connections_table_for_node/1,
+ ensure_per_vhost_tracked_connections_table_for_node/1,
+ ensure_per_user_tracked_connections_table_for_node/1,
+
+ ensure_tracked_connections_table_for_this_node/0,
+ ensure_per_vhost_tracked_connections_table_for_this_node/0,
+ ensure_per_user_tracked_connections_table_for_this_node/0,
+
+ tracked_connection_table_name_for/1,
+ tracked_connection_per_vhost_table_name_for/1,
+ tracked_connection_per_user_table_name_for/1,
+ get_all_tracked_connection_table_names_for_node/1,
+
+ delete_tracked_connections_table_for_node/1,
+ delete_per_vhost_tracked_connections_table_for_node/1,
+ delete_per_user_tracked_connections_table_for_node/1,
+ delete_tracked_connection_user_entry/1,
+ delete_tracked_connection_vhost_entry/1,
+
+ clear_tracked_connection_tables_for_this_node/0,
+
+ list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
+ tracked_connection_from_connection_created/1,
+ tracked_connection_from_connection_state/1,
+ lookup/1,
+ count/0]).
+
+-include_lib("rabbit.hrl").
+
+-import(rabbit_misc, [pget/2]).
+
+-export([close_connections/3]).
+
+%%
+%% API
+%%
+
+%% Behaviour callbacks
+
+-spec boot() -> ok.
+
+%% Sets up and resets connection tracking tables for this
+%% node.
+boot() ->
+ ensure_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for connection tracking on this node: ~p",
+ [tracked_connection_table_name_for(node())]),
+ ensure_per_vhost_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p",
+ [tracked_connection_per_vhost_table_name_for(node())]),
+ ensure_per_user_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for per-user connection counting on this node: ~p",
+ [tracked_connection_per_user_table_name_for(node())]),
+ clear_tracking_tables(),
+ ok.
+
+-spec update_tracked(term()) -> ok.
+
+update_tracked(Event) ->
+ spawn(?MODULE, handle_cast, [Event]),
+ ok.
+
+%% Asynchronously handle update events
+-spec handle_cast(term()) -> ok.
+
+handle_cast({connection_created, Details}) ->
+ ThisNode = node(),
+ case pget(node, Details) of
+ ThisNode ->
+ TConn = tracked_connection_from_connection_created(Details),
+ ConnId = TConn#tracked_connection.id,
+ try
+ register_tracked(TConn)
+ catch
+ error:{no_exists, _} ->
+ Msg = "Could not register connection ~p for tracking, "
+ "its table is not ready yet or the connection terminated prematurely",
+ rabbit_log_connection:warning(Msg, [ConnId]),
+ ok;
+ error:Err ->
+ Msg = "Could not register connection ~p for tracking: ~p",
+ rabbit_log_connection:warning(Msg, [ConnId, Err]),
+ ok
+ end;
+ _OtherNode ->
+ %% ignore
+ ok
+ end;
+handle_cast({connection_closed, Details}) ->
+ ThisNode = node(),
+ case pget(node, Details) of
+ ThisNode ->
+ %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
+ %% {pid,<0.1774.0>},
+ %% {node, rabbit@hostname}]
+ unregister_tracked(
+ rabbit_tracking:id(ThisNode, pget(name, Details)));
+ _OtherNode ->
+ %% ignore
+ ok
+ end;
+handle_cast({vhost_deleted, Details}) ->
+ VHost = pget(name, Details),
+ %% Schedule vhost entry deletion, allowing time for connections to close
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ delete_tracked_connection_vhost_entry, [VHost]),
+ rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
+ shutdown_tracked_items(
+ rabbit_connection_tracking:list(VHost),
+ rabbit_misc:format("vhost '~s' is deleted", [VHost]));
+%% Note: under normal circumstances this will be called immediately
+%% after the vhost_deleted above. Therefore we should be careful about
+%% what we log and be more defensive.
+handle_cast({vhost_down, Details}) ->
+ VHost = pget(name, Details),
+ Node = pget(node, Details),
+ rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
+ " because the vhost is stopping",
+ [VHost, Node]),
+ shutdown_tracked_items(
+ rabbit_connection_tracking:list_on_node(Node, VHost),
+ rabbit_misc:format("vhost '~s' is down", [VHost]));
+handle_cast({user_deleted, Details}) ->
+ Username = pget(name, Details),
+ %% Schedule user entry deletion, allowing time for connections to close
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ delete_tracked_connection_user_entry, [Username]),
+ rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
+ shutdown_tracked_items(
+ rabbit_connection_tracking:list_of_user(Username),
+ rabbit_misc:format("user '~s' is deleted", [Username]));
+%% A node had been deleted from the cluster.
+handle_cast({node_deleted, Details}) ->
+ Node = pget(node, Details),
+ rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]),
+ delete_tracked_connections_table_for_node(Node),
+ delete_per_vhost_tracked_connections_table_for_node(Node),
+ delete_per_user_tracked_connections_table_for_node(Node).
+
+-spec register_tracked(rabbit_types:tracked_connection()) -> ok.
+-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]).
+
+register_tracked(#tracked_connection{username = Username, vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
+ PerUserConnTableName = tracked_connection_per_user_table_name_for(Node),
+ %% upsert
+ case mnesia:dirty_read(TableName, ConnId) of
+ [] ->
+ mnesia:dirty_write(TableName, Conn),
+ mnesia:dirty_update_counter(PerVhostTableName, VHost, 1),
+ mnesia:dirty_update_counter(PerUserConnTableName, Username, 1);
+ [#tracked_connection{}] ->
+ ok
+ end,
+ ok.
+
+-spec unregister_tracked(rabbit_types:tracked_connection_id()) -> ok.
+
+unregister_tracked(ConnId = {Node, _Name}) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
+ PerUserConnTableName = tracked_connection_per_user_table_name_for(Node),
+ case mnesia:dirty_read(TableName, ConnId) of
+ [] -> ok;
+ [#tracked_connection{vhost = VHost, username = Username}] ->
+ mnesia:dirty_update_counter(PerUserConnTableName, Username, -1),
+ mnesia:dirty_update_counter(PerVhostTableName, VHost, -1),
+ mnesia:dirty_delete(TableName, ConnId)
+ end.
+
+-spec count_tracked_items_in({atom(), rabbit_types:vhost()}) -> non_neg_integer().
+
+count_tracked_items_in({vhost, VirtualHost}) ->
+ rabbit_tracking:count_tracked_items(
+ fun tracked_connection_per_vhost_table_name_for/1,
+ #tracked_connection_per_vhost.connection_count, VirtualHost,
+ "connections in vhost");
+count_tracked_items_in({user, Username}) ->
+ rabbit_tracking:count_tracked_items(
+ fun tracked_connection_per_user_table_name_for/1,
+ #tracked_connection_per_user.connection_count, Username,
+ "connections for user").
+
+-spec clear_tracking_tables() -> ok.
+
+clear_tracking_tables() ->
+ clear_tracked_connection_tables_for_this_node().
+
+-spec shutdown_tracked_items(list(), term()) -> ok.
+
+shutdown_tracked_items(TrackedItems, Message) ->
+ close_connections(TrackedItems, Message).
+
+%% Extended API
+
+-spec ensure_tracked_connections_table_for_this_node() -> ok.
+
+ensure_tracked_connections_table_for_this_node() ->
+ ensure_tracked_connections_table_for_node(node()).
+
+
+-spec ensure_per_vhost_tracked_connections_table_for_this_node() -> ok.
+
+ensure_per_vhost_tracked_connections_table_for_this_node() ->
+ ensure_per_vhost_tracked_connections_table_for_node(node()).
+
+
+-spec ensure_per_user_tracked_connections_table_for_this_node() -> ok.
+
+ensure_per_user_tracked_connections_table_for_this_node() ->
+ ensure_per_user_tracked_connections_table_for_node(node()).
+
+
+%% Create tables
+-spec ensure_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection},
+ {attributes, record_info(fields, tracked_connection)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a tracked connection table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+-spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_per_vhost_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_vhost_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection_per_vhost},
+ {attributes, record_info(fields, tracked_connection_per_vhost)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a per-vhost tracked connection table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+-spec ensure_per_user_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_per_user_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_user_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection_per_user},
+ {attributes, record_info(fields, tracked_connection_per_user)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a per-user tracked connection table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+-spec clear_tracked_connection_tables_for_this_node() -> ok.
+
+clear_tracked_connection_tables_for_this_node() ->
+ [rabbit_tracking:clear_tracking_table(T)
+ || T <- get_all_tracked_connection_table_names_for_node(node())],
+ ok.
+
+-spec delete_tracked_connections_table_for_node(node()) -> ok.
+
+delete_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_table_name_for(Node),
+ rabbit_tracking:delete_tracking_table(TableName, Node, "tracked connection").
+
+-spec delete_per_vhost_tracked_connections_table_for_node(node()) -> ok.
+
+delete_per_vhost_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_vhost_table_name_for(Node),
+ rabbit_tracking:delete_tracking_table(TableName, Node,
+ "per-vhost tracked connection").
+
+-spec delete_per_user_tracked_connections_table_for_node(node()) -> ok.
+
+delete_per_user_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_user_table_name_for(Node),
+ rabbit_tracking:delete_tracking_table(TableName, Node,
+ "per-user tracked connection").
+
+-spec tracked_connection_table_name_for(node()) -> atom().
+
+tracked_connection_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_connection_on_node_~s", [Node])).
+
+-spec tracked_connection_per_vhost_table_name_for(node()) -> atom().
+
+tracked_connection_per_vhost_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])).
+
+-spec tracked_connection_per_user_table_name_for(node()) -> atom().
+
+tracked_connection_per_user_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format(
+ "tracked_connection_table_per_user_on_node_~s", [Node])).
+
+-spec get_all_tracked_connection_table_names_for_node(node()) -> [atom()].
+
+get_all_tracked_connection_table_names_for_node(Node) ->
+ [tracked_connection_table_name_for(Node),
+ tracked_connection_per_vhost_table_name_for(Node),
+ tracked_connection_per_user_table_name_for(Node)].
+
+-spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'.
+
+lookup(Name) ->
+ Nodes = rabbit_nodes:all_running(),
+ lookup(Name, Nodes).
+
+lookup(_, []) ->
+ not_found;
+lookup(Name, [Node | Nodes]) ->
+ TableName = tracked_connection_table_name_for(Node),
+ case mnesia:dirty_read(TableName, {Node, Name}) of
+ [] -> lookup(Name, Nodes);
+ [Row] -> Row
+ end.
+
+-spec list() -> [rabbit_types:tracked_connection()].
+
+list() ->
+ lists:foldl(
+ fun (Node, Acc) ->
+ Tab = tracked_connection_table_name_for(Node),
+ Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'})
+ end, [], rabbit_nodes:all_running()).
+
+-spec count() -> non_neg_integer().
+
+count() ->
+ lists:foldl(
+ fun (Node, Acc) ->
+ Tab = tracked_connection_table_name_for(Node),
+ Acc + mnesia:table_info(Tab, size)
+ end, 0, rabbit_nodes:all_running()).
+
+-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
+
+list(VHost) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_connection_table_name_for/1,
+ #tracked_connection{vhost = VHost, _ = '_'}).
+
+-spec list_on_node(node()) -> [rabbit_types:tracked_connection()].
+
+list_on_node(Node) ->
+ try mnesia:dirty_match_object(
+ tracked_connection_table_name_for(Node),
+ #tracked_connection{_ = '_'})
+ catch exit:{aborted, {no_exists, _}} -> []
+ end.
+
+-spec list_on_node(node(), rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
+
+list_on_node(Node, VHost) ->
+ try mnesia:dirty_match_object(
+ tracked_connection_table_name_for(Node),
+ #tracked_connection{vhost = VHost, _ = '_'})
+ catch exit:{aborted, {no_exists, _}} -> []
+ end.
+
+
+-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].
+
+list_of_user(Username) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_connection_table_name_for/1,
+ #tracked_connection{username = Username, _ = '_'}).
+
+%% Internal, delete tracked entries
+
+delete_tracked_connection_vhost_entry(Vhost) ->
+ rabbit_tracking:delete_tracked_entry(
+ {rabbit_vhost, exists, [Vhost]},
+ fun tracked_connection_per_vhost_table_name_for/1,
+ Vhost).
+
+delete_tracked_connection_user_entry(Username) ->
+ rabbit_tracking:delete_tracked_entry(
+ {rabbit_auth_backend_internal, exists, [Username]},
+ fun tracked_connection_per_user_table_name_for/1,
+ Username).
+
+%% Returns a #tracked_connection from connection_created
+%% event details.
+%%
+%% @see rabbit_connection_tracking_handler.
+tracked_connection_from_connection_created(EventDetails) ->
+ %% Example event:
+ %%
+ %% [{type,network},
+ %% {pid,<0.329.0>},
+ %% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>},
+ %% {port,5672},
+ %% {peer_port,60998},
+ %% {host,{0,0,0,0,0,65535,32512,1}},
+ %% {peer_host,{0,0,0,0,0,65535,32512,1}},
+ %% {ssl,false},
+ %% {peer_cert_subject,''},
+ %% {peer_cert_issuer,''},
+ %% {peer_cert_validity,''},
+ %% {auth_mechanism,<<"PLAIN">>},
+ %% {ssl_protocol,''},
+ %% {ssl_key_exchange,''},
+ %% {ssl_cipher,''},
+ %% {ssl_hash,''},
+ %% {protocol,{0,9,1}},
+ %% {user,<<"guest">>},
+ %% {vhost,<<"/">>},
+ %% {timeout,14},
+ %% {frame_max,131072},
+ %% {channel_max,65535},
+ %% {client_properties,
+ %% [{<<"capabilities">>,table,
+ %% [{<<"publisher_confirms">>,bool,true},
+ %% {<<"consumer_cancel_notify">>,bool,true},
+ %% {<<"exchange_exchange_bindings">>,bool,true},
+ %% {<<"basic.nack">>,bool,true},
+ %% {<<"connection.blocked">>,bool,true},
+ %% {<<"authentication_failure_close">>,bool,true}]},
+ %% {<<"product">>,longstr,<<"Bunny">>},
+ %% {<<"platform">>,longstr,
+ %% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>},
+ %% {<<"version">>,longstr,<<"2.3.0.pre">>},
+ %% {<<"information">>,longstr,
+ %% <<"http://rubybunny.info">>}]},
+ %% {connected_at,1453214290847}]
+ Name = pget(name, EventDetails),
+ Node = pget(node, EventDetails),
+ #tracked_connection{id = rabbit_tracking:id(Node, Name),
+ name = Name,
+ node = Node,
+ vhost = pget(vhost, EventDetails),
+ username = pget(user, EventDetails),
+ connected_at = pget(connected_at, EventDetails),
+ pid = pget(pid, EventDetails),
+ type = pget(type, EventDetails),
+ peer_host = pget(peer_host, EventDetails),
+ peer_port = pget(peer_port, EventDetails)}.
+
+tracked_connection_from_connection_state(#connection{
+ vhost = VHost,
+ connected_at = Ts,
+ peer_host = PeerHost,
+ peer_port = PeerPort,
+ user = Username,
+ name = Name
+ }) ->
+ tracked_connection_from_connection_created(
+ [{name, Name},
+ {node, node()},
+ {vhost, VHost},
+ {user, Username},
+ {user_who_performed_action, Username},
+ {connected_at, Ts},
+ {pid, self()},
+ {type, network},
+ {peer_port, PeerPort},
+ {peer_host, PeerHost}]).
+
+close_connections(Tracked, Message) ->
+ close_connections(Tracked, Message, 0).
+
+close_connections(Tracked, Message, Delay) ->
+ [begin
+ close_connection(Conn, Message),
+ timer:sleep(Delay)
+ end || Conn <- Tracked],
+ ok.
+
+close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
+ try
+ rabbit_networking:close_connection(Pid, Message)
+ catch error:{not_a_connection, _} ->
+ %% could has been closed concurrently, or the input
+ %% is bogus. In any case, we should not terminate
+ ok;
+ _:Err ->
+ %% ignore, don't terminate
+ rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]),
+ ok
+ end;
+close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
+ %% Do an RPC call to the node running the direct client.
+ Node = node(Pid),
+ rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]).