summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbac@vmware.com>2022-07-20 16:24:48 +0200
committerMergify <37929162+mergify[bot]@users.noreply.github.com>2022-07-21 11:17:49 +0000
commit1720358e26102bbdbd88824aedf077191a744245 (patch)
treefa7f157d936bae6ebc7ab9cc6d8edffc877f203f
parent730787059b5df467157b730dd17a242cdea69a9e (diff)
downloadrabbitmq-server-git-tracking-to-ets-backup.tar.gz
Move mnesia tracking tables to ets tablestracking-to-ets-backup
The original cluster-wide tracking tables were transformed into local tables, which are ram only. They can be moved into ets tables making its use much simpler
-rw-r--r--deps/rabbit/src/rabbit_channel_tracking.erl145
-rw-r--r--deps/rabbit/src/rabbit_connection_tracking.erl258
-rw-r--r--deps/rabbit/src/rabbit_tracking.erl83
-rw-r--r--deps/rabbit/src/rabbit_tracking_store.erl47
-rw-r--r--deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl38
5 files changed, 219 insertions, 352 deletions
diff --git a/deps/rabbit/src/rabbit_channel_tracking.erl b/deps/rabbit/src/rabbit_channel_tracking.erl
index bb798645b4..b608bfe01b 100644
--- a/deps/rabbit/src/rabbit_channel_tracking.erl
+++ b/deps/rabbit/src/rabbit_channel_tracking.erl
@@ -27,15 +27,15 @@
shutdown_tracked_items/2]).
-export([list/0, list_of_user/1, list_on_node/1,
- tracked_channel_table_name_for/1,
- tracked_channel_per_user_table_name_for/1,
- get_all_tracked_channel_table_names_for_node/1,
delete_tracked_channel_user_entry/1]).
-include_lib("rabbit_common/include/rabbit.hrl").
-import(rabbit_misc, [pget/2]).
+-define(TRACKED_CHANNEL_TABLE, tracked_channel).
+-define(TRACKED_CHANNEL_TABLE_PER_USER, tracked_channel_per_user).
+
%%
%% API
%%
@@ -44,13 +44,11 @@
-spec boot() -> ok.
boot() ->
- ensure_tracked_channels_table_for_this_node(),
+ ensure_tracked_tables_for_this_node(),
rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
- [tracked_channel_table_name_for(node())]),
- ensure_per_user_tracked_channels_table_for_node(),
+ [?TRACKED_CHANNEL_TABLE]),
rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
- [tracked_channel_per_user_table_name_for(node())]),
- clear_tracking_tables(),
+ [?TRACKED_CHANNEL_TABLE_PER_USER]),
ok.
-spec update_tracked(term()) -> ok.
@@ -115,51 +113,41 @@ handle_cast({user_deleted, Details}) ->
_ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
delete_tracked_channel_user_entry, [Username]),
ok;
-handle_cast({node_deleted, Details}) ->
- Node = pget(node, Details),
- rabbit_log_channel:info(
- "Node '~s' was removed from the cluster, deleting"
- " its channel tracking tables...", [Node]),
- delete_tracked_channels_table_for_node(Node),
- delete_per_user_tracked_channels_table_for_node(Node).
+handle_cast({node_deleted, _Details}) ->
+ ok.
-spec register_tracked(rabbit_types:tracked_channel()) -> ok.
-dialyzer([{nowarn_function, [register_tracked/1]}]).
register_tracked(TrackedCh =
- #tracked_channel{node = Node, name = Name, username = Username}) ->
+ #tracked_channel{node = Node, name = Name, username = Username}) when Node == node() ->
ChId = rabbit_tracking:id(Node, Name),
- TableName = tracked_channel_table_name_for(Node),
- PerUserChTableName = tracked_channel_per_user_table_name_for(Node),
%% upsert
- case mnesia:dirty_read(TableName, ChId) of
- [] ->
- mnesia:dirty_write(TableName, TrackedCh),
- mnesia:dirty_update_counter(PerUserChTableName, Username, 1);
- [#tracked_channel{}] ->
- ok
+ case ets:lookup(?TRACKED_CHANNEL_TABLE, ChId) of
+ [] ->
+ ets:insert(?TRACKED_CHANNEL_TABLE, TrackedCh),
+ ets:update_counter(?TRACKED_CHANNEL_TABLE_PER_USER, Username, 1, {Username, 0});
+ [#tracked_channel{}] ->
+ ok
end,
ok.
-spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok.
unregister_tracked(ChId = {Node, _Name}) when Node =:= node() ->
- TableName = tracked_channel_table_name_for(Node),
- PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node),
- case mnesia:dirty_read(TableName, ChId) of
+ case ets:lookup(?TRACKED_CHANNEL_TABLE, ChId) of
[] -> ok;
[#tracked_channel{username = Username}] ->
- mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1),
- mnesia:dirty_delete(TableName, ChId)
+ ets:update_counter(?TRACKED_CHANNEL_TABLE_PER_USER, Username, -1),
+ ets:delete(?TRACKED_CHANNEL_TABLE, ChId)
end.
-spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer().
count_tracked_items_in({user, Username}) ->
rabbit_tracking:count_tracked_items(
- fun tracked_channel_per_user_table_name_for/1,
- #tracked_channel_per_user.channel_count, Username,
- "channels in vhost").
+ ?TRACKED_CHANNEL_TABLE_PER_USER, Username,
+ "channels in vhost").
-spec clear_tracking_tables() -> ok.
@@ -178,109 +166,52 @@ shutdown_tracked_items(TrackedItems, _Args) ->
list() ->
lists:foldl(
fun (Node, Acc) ->
- Tab = tracked_channel_table_name_for(Node),
- try
- Acc ++
- mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'})
- catch
- exit:{aborted, {no_exists, [Tab, _]}} ->
- %% The table might not exist yet (or is already gone)
- %% between the time rabbit_nodes:all_running() runs and
- %% returns a specific node, and
- %% mnesia:dirty_match_object() is called for that node's
- %% table.
- Acc
- end
+ list_on_node(Node) ++ Acc
end, [], rabbit_nodes:all_running()).
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
list_of_user(Username) ->
rabbit_tracking:match_tracked_items(
- fun tracked_channel_table_name_for/1,
- #tracked_channel{username = Username, _ = '_'}).
+ ?TRACKED_CHANNEL_TABLE,
+ #tracked_channel{username = Username, _ = '_'}).
-spec list_on_node(node()) -> [rabbit_types:tracked_channel()].
+list_on_node(Node) when Node == node() ->
+ ets:tab2list(?TRACKED_CHANNEL_TABLE);
list_on_node(Node) ->
- try mnesia:dirty_match_object(
- tracked_channel_table_name_for(Node),
- #tracked_channel{_ = '_'})
- catch exit:{aborted, {no_exists, _}} -> []
+ case rabbit_misc:rpc_call(Node, ets, tab2list, [?TRACKED_CHANNEL_TABLE]) of
+ List when is_list(List) ->
+ List;
+ _ ->
+ []
end.
--spec tracked_channel_table_name_for(node()) -> atom().
-
-tracked_channel_table_name_for(Node) ->
- list_to_atom(rabbit_misc:format("tracked_channel_on_node_~s", [Node])).
-
--spec tracked_channel_per_user_table_name_for(node()) -> atom().
-
-tracked_channel_per_user_table_name_for(Node) ->
- list_to_atom(rabbit_misc:format(
- "tracked_channel_table_per_user_on_node_~s", [Node])).
-
%% internal
-ensure_tracked_channels_table_for_this_node() ->
- ensure_tracked_channels_table_for_node(node()).
-
-ensure_per_user_tracked_channels_table_for_node() ->
- ensure_per_user_tracked_channels_table_for_node(node()).
-
-%% Create tables
-ensure_tracked_channels_table_for_node(Node) ->
- TableName = tracked_channel_table_name_for(Node),
- case mnesia:create_table(TableName, [{record_name, tracked_channel},
- {attributes, record_info(fields, tracked_channel)}]) of
- {atomic, ok} -> ok;
- {aborted, {already_exists, _}} -> ok;
- {aborted, Error} ->
- rabbit_log:error("Failed to create a tracked channel table for node ~p: ~p", [Node, Error]),
- ok
- end.
-
-ensure_per_user_tracked_channels_table_for_node(Node) ->
- TableName = tracked_channel_per_user_table_name_for(Node),
- case mnesia:create_table(TableName, [{record_name, tracked_channel_per_user},
- {attributes, record_info(fields, tracked_channel_per_user)}]) of
- {atomic, ok} -> ok;
- {aborted, {already_exists, _}} -> ok;
- {aborted, Error} ->
- rabbit_log:error("Failed to create a per-user tracked channel table for node ~p: ~p", [Node, Error]),
- ok
- end.
+ensure_tracked_tables_for_this_node() ->
+ ets:new(?TRACKED_CHANNEL_TABLE, [named_table, public, {write_concurrency, true},
+ {keypos, #tracked_connection.id}]),
+ ets:new(?TRACKED_CHANNEL_TABLE_PER_USER, [named_table, public, {write_concurrency, true}]).
clear_tracked_channel_tables_for_this_node() ->
[rabbit_tracking:clear_tracking_table(T)
- || T <- get_all_tracked_channel_table_names_for_node(node())].
-
-delete_tracked_channels_table_for_node(Node) ->
- TableName = tracked_channel_table_name_for(Node),
- rabbit_tracking:delete_tracking_table(TableName, Node, "tracked channel").
-
-delete_per_user_tracked_channels_table_for_node(Node) ->
- TableName = tracked_channel_per_user_table_name_for(Node),
- rabbit_tracking:delete_tracking_table(TableName, Node,
- "per-user tracked channels").
-
-get_all_tracked_channel_table_names_for_node(Node) ->
- [tracked_channel_table_name_for(Node),
- tracked_channel_per_user_table_name_for(Node)].
+ || T <- [?TRACKED_CHANNEL_TABLE, ?TRACKED_CHANNEL_TABLE_PER_USER]].
get_tracked_channels_by_connection_pid(ConnPid) ->
rabbit_tracking:match_tracked_items(
- fun tracked_channel_table_name_for/1,
+ ?TRACKED_CHANNEL_TABLE,
#tracked_channel{connection = ConnPid, _ = '_'}).
get_tracked_channel_by_pid(ChPid) ->
rabbit_tracking:match_tracked_items(
- fun tracked_channel_table_name_for/1,
+ ?TRACKED_CHANNEL_TABLE,
#tracked_channel{pid = ChPid, _ = '_'}).
delete_tracked_channel_user_entry(Username) ->
rabbit_tracking:delete_tracked_entry(
{rabbit_auth_backend_internal, exists, [Username]},
- fun tracked_channel_per_user_table_name_for/1,
+ ?TRACKED_CHANNEL_TABLE_PER_USER,
Username).
tracked_channel_from_channel_created_event(ChannelDetails) ->
diff --git a/deps/rabbit/src/rabbit_connection_tracking.erl b/deps/rabbit/src/rabbit_connection_tracking.erl
index b33d5da18c..098fa9cb87 100644
--- a/deps/rabbit/src/rabbit_connection_tracking.erl
+++ b/deps/rabbit/src/rabbit_connection_tracking.erl
@@ -26,23 +26,7 @@
clear_tracking_tables/0,
shutdown_tracked_items/2]).
--export([ensure_tracked_connections_table_for_node/1,
- ensure_per_vhost_tracked_connections_table_for_node/1,
- ensure_per_user_tracked_connections_table_for_node/1,
-
- ensure_tracked_connections_table_for_this_node/0,
- ensure_per_vhost_tracked_connections_table_for_this_node/0,
- ensure_per_user_tracked_connections_table_for_this_node/0,
-
- tracked_connection_table_name_for/1,
- tracked_connection_per_vhost_table_name_for/1,
- tracked_connection_per_user_table_name_for/1,
- get_all_tracked_connection_table_names_for_node/1,
-
- delete_tracked_connections_table_for_node/1,
- delete_per_vhost_tracked_connections_table_for_node/1,
- delete_per_user_tracked_connections_table_for_node/1,
- delete_tracked_connection_user_entry/1,
+-export([delete_tracked_connection_user_entry/1,
delete_tracked_connection_vhost_entry/1,
clear_tracked_connection_tables_for_this_node/0,
@@ -53,12 +37,18 @@
lookup/1,
count/0]).
+-export([ensure_tracked_tables_for_this_node/0]).
+
-include_lib("rabbit_common/include/rabbit.hrl").
-import(rabbit_misc, [pget/2]).
-export([close_connections/3]).
+-define(TRACKED_CONNECTION_TABLE, tracked_connection).
+-define(TRACKED_CONNECTION_TABLE_PER_USER, tracked_connection_per_user).
+-define(TRACKED_CONNECTION_TABLE_PER_VHOST, tracked_connection_per_vhost).
+
%%
%% API
%%
@@ -70,16 +60,13 @@
%% Sets up and resets connection tracking tables for this
%% node.
boot() ->
- ensure_tracked_connections_table_for_this_node(),
+ ensure_tracked_tables_for_this_node(),
rabbit_log:info("Setting up a table for connection tracking on this node: ~p",
- [tracked_connection_table_name_for(node())]),
- ensure_per_vhost_tracked_connections_table_for_this_node(),
+ [?TRACKED_CONNECTION_TABLE]),
rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p",
- [tracked_connection_per_vhost_table_name_for(node())]),
- ensure_per_user_tracked_connections_table_for_this_node(),
+ [?TRACKED_CONNECTION_TABLE_PER_VHOST]),
rabbit_log:info("Setting up a table for per-user connection counting on this node: ~p",
- [tracked_connection_per_user_table_name_for(node())]),
- clear_tracking_tables(),
+ [?TRACKED_CONNECTION_TABLE_PER_USER]),
ok.
-spec update_tracked(term()) -> ok.
@@ -158,26 +145,19 @@ handle_cast({user_deleted, Details}) ->
rabbit_connection_tracking:list_of_user(Username),
rabbit_misc:format("user '~s' is deleted", [Username]));
%% A node had been deleted from the cluster.
-handle_cast({node_deleted, Details}) ->
- Node = pget(node, Details),
- rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]),
- delete_tracked_connections_table_for_node(Node),
- delete_per_vhost_tracked_connections_table_for_node(Node),
- delete_per_user_tracked_connections_table_for_node(Node).
+handle_cast({node_deleted, _}) ->
+ ok.
-spec register_tracked(rabbit_types:tracked_connection()) -> ok.
-dialyzer([{nowarn_function, [register_tracked/1]}]).
register_tracked(#tracked_connection{username = Username, vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() ->
- TableName = tracked_connection_table_name_for(Node),
- PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
- PerUserConnTableName = tracked_connection_per_user_table_name_for(Node),
%% upsert
- case mnesia:dirty_read(TableName, ConnId) of
+ case ets:lookup(?TRACKED_CONNECTION_TABLE, ConnId) of
[] ->
- mnesia:dirty_write(TableName, Conn),
- mnesia:dirty_update_counter(PerVhostTableName, VHost, 1),
- mnesia:dirty_update_counter(PerUserConnTableName, Username, 1);
+ ets:insert(?TRACKED_CONNECTION_TABLE, Conn),
+ ets:update_counter(?TRACKED_CONNECTION_TABLE_PER_VHOST, VHost, 1, {VHost, 0}),
+ ets:update_counter(?TRACKED_CONNECTION_TABLE_PER_USER, Username, 1, {Username, 0});
[#tracked_connection{}] ->
ok
end,
@@ -186,28 +166,25 @@ register_tracked(#tracked_connection{username = Username, vhost = VHost, id = Co
-spec unregister_tracked(rabbit_types:tracked_connection_id()) -> ok.
unregister_tracked(ConnId = {Node, _Name}) when Node =:= node() ->
- TableName = tracked_connection_table_name_for(Node),
- PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
- PerUserConnTableName = tracked_connection_per_user_table_name_for(Node),
- case mnesia:dirty_read(TableName, ConnId) of
+ case ets:lookup(?TRACKED_CONNECTION_TABLE, ConnId) of
[] -> ok;
[#tracked_connection{vhost = VHost, username = Username}] ->
- mnesia:dirty_update_counter(PerUserConnTableName, Username, -1),
- mnesia:dirty_update_counter(PerVhostTableName, VHost, -1),
- mnesia:dirty_delete(TableName, ConnId)
+ ets:update_counter(?TRACKED_CONNECTION_TABLE_PER_USER, Username, -1),
+ ets:update_counter(?TRACKED_CONNECTION_TABLE_PER_VHOST, VHost, -1),
+ ets:delete(?TRACKED_CONNECTION_TABLE, ConnId)
end.
-spec count_tracked_items_in({atom(), rabbit_types:vhost()}) -> non_neg_integer().
count_tracked_items_in({vhost, VirtualHost}) ->
rabbit_tracking:count_tracked_items(
- fun tracked_connection_per_vhost_table_name_for/1,
- #tracked_connection_per_vhost.connection_count, VirtualHost,
+ ?TRACKED_CONNECTION_TABLE_PER_VHOST,
+ VirtualHost,
"connections in vhost");
count_tracked_items_in({user, Username}) ->
rabbit_tracking:count_tracked_items(
- fun tracked_connection_per_user_table_name_for/1,
- #tracked_connection_per_user.connection_count, Username,
+ ?TRACKED_CONNECTION_TABLE_PER_USER,
+ Username,
"connections for user").
-spec clear_tracking_tables() -> ok.
@@ -222,114 +199,22 @@ shutdown_tracked_items(TrackedItems, Message) ->
%% Extended API
--spec ensure_tracked_connections_table_for_this_node() -> ok.
-
-ensure_tracked_connections_table_for_this_node() ->
- ensure_tracked_connections_table_for_node(node()).
-
-
--spec ensure_per_vhost_tracked_connections_table_for_this_node() -> ok.
-
-ensure_per_vhost_tracked_connections_table_for_this_node() ->
- ensure_per_vhost_tracked_connections_table_for_node(node()).
-
-
--spec ensure_per_user_tracked_connections_table_for_this_node() -> ok.
-
-ensure_per_user_tracked_connections_table_for_this_node() ->
- ensure_per_user_tracked_connections_table_for_node(node()).
-
-
%% Create tables
--spec ensure_tracked_connections_table_for_node(node()) -> ok.
-
-ensure_tracked_connections_table_for_node(Node) ->
- TableName = tracked_connection_table_name_for(Node),
- case mnesia:create_table(TableName, [{record_name, tracked_connection},
- {attributes, record_info(fields, tracked_connection)}]) of
- {atomic, ok} -> ok;
- {aborted, {already_exists, _}} -> ok;
- {aborted, Error} ->
- rabbit_log:error("Failed to create a tracked connection table for node ~p: ~p", [Node, Error]),
- ok
- end.
-
--spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok.
-
-ensure_per_vhost_tracked_connections_table_for_node(Node) ->
- TableName = tracked_connection_per_vhost_table_name_for(Node),
- case mnesia:create_table(TableName, [{record_name, tracked_connection_per_vhost},
- {attributes, record_info(fields, tracked_connection_per_vhost)}]) of
- {atomic, ok} -> ok;
- {aborted, {already_exists, _}} -> ok;
- {aborted, Error} ->
- rabbit_log:error("Failed to create a per-vhost tracked connection table for node ~p: ~p", [Node, Error]),
- ok
- end.
-
--spec ensure_per_user_tracked_connections_table_for_node(node()) -> ok.
-
-ensure_per_user_tracked_connections_table_for_node(Node) ->
- TableName = tracked_connection_per_user_table_name_for(Node),
- case mnesia:create_table(TableName, [{record_name, tracked_connection_per_user},
- {attributes, record_info(fields, tracked_connection_per_user)}]) of
- {atomic, ok} -> ok;
- {aborted, {already_exists, _}} -> ok;
- {aborted, Error} ->
- rabbit_log:error("Failed to create a per-user tracked connection table for node ~p: ~p", [Node, Error]),
- ok
- end.
+ensure_tracked_tables_for_this_node() ->
+ ets:new(?TRACKED_CONNECTION_TABLE, [named_table, public, {write_concurrency, true},
+ {keypos, #tracked_connection.id}]),
+ ets:new(?TRACKED_CONNECTION_TABLE_PER_USER, [named_table, public, {write_concurrency, true}]),
+ ets:new(?TRACKED_CONNECTION_TABLE_PER_VHOST, [named_table, public, {write_concurrency, true}]).
-spec clear_tracked_connection_tables_for_this_node() -> ok.
clear_tracked_connection_tables_for_this_node() ->
[rabbit_tracking:clear_tracking_table(T)
- || T <- get_all_tracked_connection_table_names_for_node(node())],
+ || T <- [?TRACKED_CONNECTION_TABLE,
+ ?TRACKED_CONNECTION_TABLE_PER_USER,
+ ?TRACKED_CONNECTION_TABLE_PER_VHOST]],
ok.
--spec delete_tracked_connections_table_for_node(node()) -> ok.
-
-delete_tracked_connections_table_for_node(Node) ->
- TableName = tracked_connection_table_name_for(Node),
- rabbit_tracking:delete_tracking_table(TableName, Node, "tracked connection").
-
--spec delete_per_vhost_tracked_connections_table_for_node(node()) -> ok.
-
-delete_per_vhost_tracked_connections_table_for_node(Node) ->
- TableName = tracked_connection_per_vhost_table_name_for(Node),
- rabbit_tracking:delete_tracking_table(TableName, Node,
- "per-vhost tracked connection").
-
--spec delete_per_user_tracked_connections_table_for_node(node()) -> ok.
-
-delete_per_user_tracked_connections_table_for_node(Node) ->
- TableName = tracked_connection_per_user_table_name_for(Node),
- rabbit_tracking:delete_tracking_table(TableName, Node,
- "per-user tracked connection").
-
--spec tracked_connection_table_name_for(node()) -> atom().
-
-tracked_connection_table_name_for(Node) ->
- list_to_atom(rabbit_misc:format("tracked_connection_on_node_~s", [Node])).
-
--spec tracked_connection_per_vhost_table_name_for(node()) -> atom().
-
-tracked_connection_per_vhost_table_name_for(Node) ->
- list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])).
-
--spec tracked_connection_per_user_table_name_for(node()) -> atom().
-
-tracked_connection_per_user_table_name_for(Node) ->
- list_to_atom(rabbit_misc:format(
- "tracked_connection_table_per_user_on_node_~s", [Node])).
-
--spec get_all_tracked_connection_table_names_for_node(node()) -> [atom()].
-
-get_all_tracked_connection_table_names_for_node(Node) ->
- [tracked_connection_table_name_for(Node),
- tracked_connection_per_vhost_table_name_for(Node),
- tracked_connection_per_user_table_name_for(Node)].
-
-spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'.
lookup(Name) ->
@@ -338,11 +223,15 @@ lookup(Name) ->
lookup(_, []) ->
not_found;
-lookup(Name, [Node | Nodes]) ->
- TableName = tracked_connection_table_name_for(Node),
- case mnesia:dirty_read(TableName, {Node, Name}) of
+lookup(Name, [Node | Nodes]) when Node == node() ->
+ case ets:lookup(?TRACKED_CONNECTION_TABLE, {Node, Name}) of
[] -> lookup(Name, Nodes);
[Row] -> Row
+ end;
+lookup(Name, [Node | Nodes]) ->
+ case rabbit_misc:rpc_call(Node, ets, lookup, [?TRACKED_CONNECTION_TABLE, {Node, Name}]) of
+ [Row] -> Row;
+ _ -> lookup(Name, Nodes)
end.
-spec list() -> [rabbit_types:tracked_connection()].
@@ -350,78 +239,77 @@ lookup(Name, [Node | Nodes]) ->
list() ->
lists:foldl(
fun (Node, Acc) ->
- Tab = tracked_connection_table_name_for(Node),
- try
- Acc ++
- mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'})
- catch
- exit:{aborted, {no_exists, [Tab, _]}} ->
- %% The table might not exist yet (or is already gone)
- %% between the time rabbit_nodes:all_running() runs and
- %% returns a specific node, and
- %% mnesia:dirty_match_object() is called for that node's
- %% table.
- Acc
- end
+ list_on_node(Node) ++ Acc
end, [], rabbit_nodes:all_running()).
-spec count() -> non_neg_integer().
count() ->
lists:foldl(
- fun (Node, Acc) ->
- Tab = tracked_connection_table_name_for(Node),
- %% mnesia:table_info() returns 0 if the table doesn't exist. We
- %% don't need the same kind of protection as the list() function
- %% above.
- Acc + mnesia:table_info(Tab, size)
+ fun (Node, Acc) when Node == node() ->
+ Acc + ets:info(?TRACKED_CONNECTION_TABLE, size);
+ (Node, Acc) ->
+ case rabbit_misc:rpc_call(Node, ets, info, [?TRACKED_CONNECTION_TABLE, size]) of
+ Int when is_integer(Int) ->
+ Acc + Int;
+ _ ->
+ Acc
+ end
end, 0, rabbit_nodes:all_running()).
-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
list(VHost) ->
rabbit_tracking:match_tracked_items(
- fun tracked_connection_table_name_for/1,
+ ?TRACKED_CONNECTION_TABLE,
#tracked_connection{vhost = VHost, _ = '_'}).
-spec list_on_node(node()) -> [rabbit_types:tracked_connection()].
+list_on_node(Node) when Node == node() ->
+ ets:tab2list(?TRACKED_CONNECTION_TABLE);
list_on_node(Node) ->
- try mnesia:dirty_match_object(
- tracked_connection_table_name_for(Node),
- #tracked_connection{_ = '_'})
- catch exit:{aborted, {no_exists, _}} -> []
+ case rabbit_misc:rpc_call(Node, ets, tab2list, [?TRACKED_CONNECTION_TABLE]) of
+ List when is_list(List) ->
+ List;
+ _ ->
+ []
end.
-spec list_on_node(node(), rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
+list_on_node(Node, VHost) when Node == node() ->
+ ets:match_object(?TRACKED_CONNECTION_TABLE,
+ #tracked_connection{vhost = VHost, _ = '_'});
list_on_node(Node, VHost) ->
- try mnesia:dirty_match_object(
- tracked_connection_table_name_for(Node),
- #tracked_connection{vhost = VHost, _ = '_'})
- catch exit:{aborted, {no_exists, _}} -> []
+ case rabbit_misc:rpc_call(Node, ets, match_object,
+ [?TRACKED_CONNECTION_TABLE,
+ #tracked_connection{vhost = VHost, _ = '_'}]) of
+ List when is_list(List) ->
+ List;
+ _ ->
+ []
end.
-
-
+
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].
list_of_user(Username) ->
rabbit_tracking:match_tracked_items(
- fun tracked_connection_table_name_for/1,
- #tracked_connection{username = Username, _ = '_'}).
+ ?TRACKED_CONNECTION_TABLE,
+ #tracked_connection{username = Username, _ = '_'}).
%% Internal, delete tracked entries
delete_tracked_connection_vhost_entry(Vhost) ->
rabbit_tracking:delete_tracked_entry(
{rabbit_vhost, exists, [Vhost]},
- fun tracked_connection_per_vhost_table_name_for/1,
+ ?TRACKED_CONNECTION_TABLE_PER_VHOST,
Vhost).
delete_tracked_connection_user_entry(Username) ->
rabbit_tracking:delete_tracked_entry(
{rabbit_auth_backend_internal, exists, [Username]},
- fun tracked_connection_per_user_table_name_for/1,
+ ?TRACKED_CONNECTION_TABLE_PER_USER,
Username).
%% Returns a #tracked_connection from connection_created
diff --git a/deps/rabbit/src/rabbit_tracking.erl b/deps/rabbit/src/rabbit_tracking.erl
index 87b0e7bb4d..cea1e89a70 100644
--- a/deps/rabbit/src/rabbit_tracking.erl
+++ b/deps/rabbit/src/rabbit_tracking.erl
@@ -26,7 +26,7 @@
-callback clear_tracking_tables() -> 'ok'.
-callback shutdown_tracked_items(list(), term()) -> ok.
--export([id/2, count_tracked_items/4, match_tracked_items/2,
+-export([id/2, count_tracked_items/3, match_tracked_items/2,
clear_tracking_table/1, delete_tracking_table/3,
delete_tracked_entry/3]).
@@ -37,53 +37,66 @@
id(Node, Name) -> {Node, Name}.
--spec count_tracked_items(function(), integer(), term(), string()) ->
+-spec count_tracked_items(function(), term(), string()) ->
non_neg_integer().
-count_tracked_items(TableNameFun, CountRecPosition, Key, ContextMsg) ->
- lists:foldl(fun (Node, Acc) ->
- Tab = TableNameFun(Node),
- try
- N = case mnesia:dirty_read(Tab, Key) of
- [] -> 0;
- [Val] ->
- element(CountRecPosition, Val)
- end,
- Acc + N
- catch _:Err ->
- rabbit_log:error(
- "Failed to fetch number of ~p ~p on node ~p:~n~p",
- [ContextMsg, Key, Node, Err]),
- Acc
- end
+count_tracked_items(Tab, Key, ContextMsg) ->
+ lists:foldl(fun (Node, Acc) when Node == node() ->
+ N = case ets:lookup(Tab, Key) of
+ [] -> 0;
+ [{_, Val}] -> Val
+ end,
+ Acc + N;
+ (Node, Acc) ->
+ N = case rabbit_misc:rpc_call(Node, ets, lookup, [Tab, Key]) of
+ [] -> 0;
+ [{_, Val}] -> Val;
+ {badrpc, Err} ->
+ rabbit_log:error(
+ "Failed to fetch number of ~p ~p on node ~p:~n~p",
+ [ContextMsg, Key, Node, Err]),
+ 0
+ end,
+ Acc + N
end, 0, rabbit_nodes:all_running()).
-spec match_tracked_items(function(), tuple()) -> term().
-match_tracked_items(TableNameFun, MatchSpec) ->
+match_tracked_items(Tab, MatchSpec) ->
lists:foldl(
- fun (Node, Acc) ->
- Tab = TableNameFun(Node),
- Acc ++ mnesia:dirty_match_object(
+ fun (Node, Acc) when Node == node() ->
+ Acc ++ ets:match_object(
Tab,
- MatchSpec)
+ MatchSpec);
+ (Node, Acc) ->
+ case rabbit_misc:rpc_call(Node, ets, match_object, [Tab, MatchSpec]) of
+ List when is_list(List) ->
+ Acc ++ List;
+ _ ->
+ Acc
+ end
end, [], rabbit_nodes:all_running()).
-spec clear_tracking_table(atom()) -> ok.
clear_tracking_table(TableName) ->
- case mnesia:clear_table(TableName) of
- {atomic, ok} -> ok;
- {aborted, _} -> ok
+ try
+ true = ets:delete_all_objects(TableName),
+ ok
+ catch
+ error:badarg ->
+ ok
end.
-spec delete_tracking_table(atom(), node(), string()) -> ok.
+delete_tracking_table(TableName, Node, _ContextMsg) when Node == node() ->
+ true = ets:delete(TableName),
+ ok;
delete_tracking_table(TableName, Node, ContextMsg) ->
- case mnesia:delete_table(TableName) of
- {atomic, ok} -> ok;
- {aborted, {no_exists, _}} -> ok;
- {aborted, Error} ->
+ case rabbit_misc:rpc_call(Node, ets, delete, [TableName]) of
+ true -> ok;
+ {badrpc, Error} ->
rabbit_log:error("Failed to delete a ~p table for node ~p: ~p",
[ContextMsg, Node, Error]),
ok
@@ -91,13 +104,19 @@ delete_tracking_table(TableName, Node, ContextMsg) ->
-spec delete_tracked_entry({atom(), atom(), list()}, function(), term()) -> ok.
-delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableNameFun, Key) ->
+delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableName, Key) ->
ClusterNodes = rabbit_nodes:all_running(),
ExistsInCluster =
lists:any(fun(Node) -> rpc:call(Node, M, F, A) end, ClusterNodes),
case ExistsInCluster of
false ->
- [mnesia:dirty_delete(TableNameFun(Node), Key) || Node <- ClusterNodes];
+ [delete_tracked_entry0(Node, TableName, Key) || Node <- ClusterNodes];
true ->
ok
end.
+
+delete_tracked_entry0(Node, Tab, Key) when Node == node() ->
+ true = ets:delete(Tab, Key);
+delete_tracked_entry0(Node, Tab, Key) ->
+ _ = rabbit_misc:rpc_call(Node, ets, delete, [Tab, Key]),
+ ok.
diff --git a/deps/rabbit/src/rabbit_tracking_store.erl b/deps/rabbit/src/rabbit_tracking_store.erl
new file mode 100644
index 0000000000..49da0bfb0a
--- /dev/null
+++ b/deps/rabbit/src/rabbit_tracking_store.erl
@@ -0,0 +1,47 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(rabbit_tracking_store).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3, format_status/2]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {}).
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+init([]) ->
+ rabbit_connection_tracking:ensure_tracked_tables_for_this_node(),
+ rabbit_channel_tracking:ensure_tracked_tables_for_this_node(),
+ {ok, #state{}}.
+
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+format_status(_Opt, Status) ->
+ Status.
diff --git a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl
index 79190dc2ff..9fde0465e1 100644
--- a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl
+++ b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl
@@ -652,19 +652,6 @@ cluster_node_removed(Config) ->
rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1),
timer:sleep(200),
- NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
-
- DroppedConnTrackingTables =
- rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(NodeName),
- [?assertEqual(
- {'EXIT', {aborted, {no_exists, Tab, all}}},
- catch mnesia:table_info(Tab, all)) || Tab <- DroppedConnTrackingTables],
-
- DroppedChTrackingTables =
- rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(NodeName),
- [?assertEqual(
- {'EXIT', {aborted, {no_exists, Tab, all}}},
- catch mnesia:table_info(Tab, all)) || Tab <- DroppedChTrackingTables],
?assertEqual(false, is_process_alive(Conn2)),
[?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
@@ -769,31 +756,26 @@ exists_in_tracked_connection_per_vhost_table(Config, VHost) ->
exists_in_tracked_connection_per_vhost_table(Config, 0, VHost).
exists_in_tracked_connection_per_vhost_table(Config, NodeIndex, VHost) ->
exists_in_tracking_table(Config, NodeIndex,
- fun rabbit_connection_tracking:tracked_connection_per_vhost_table_name_for/1,
- VHost).
+ tracked_connection_per_vhost,
+ VHost).
exists_in_tracked_connection_per_user_table(Config, Username) ->
exists_in_tracked_connection_per_user_table(Config, 0, Username).
exists_in_tracked_connection_per_user_table(Config, NodeIndex, Username) ->
exists_in_tracking_table(Config, NodeIndex,
- fun rabbit_connection_tracking:tracked_connection_per_user_table_name_for/1,
- Username).
+ tracked_connection_per_user,
+ Username).
exists_in_tracked_channel_per_user_table(Config, Username) ->
exists_in_tracked_channel_per_user_table(Config, 0, Username).
exists_in_tracked_channel_per_user_table(Config, NodeIndex, Username) ->
exists_in_tracking_table(Config, NodeIndex,
- fun rabbit_channel_tracking:tracked_channel_per_user_table_name_for/1,
- Username).
-
-exists_in_tracking_table(Config, NodeIndex, TableNameFun, Key) ->
- Node = rabbit_ct_broker_helpers:get_node_config(
- Config, NodeIndex, nodename),
- Tab = TableNameFun(Node),
- AllKeys = rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
- mnesia,
- dirty_all_keys, [Tab]),
- lists:member(Key, AllKeys).
+ tracked_connection_per_user,
+ Username).
+
+exists_in_tracking_table(Config, NodeIndex, Tab, Key) ->
+ All = rabbit_ct_broker_helpers:rpc(Config, NodeIndex, ets, tab2list, [Tab]),
+ lists:keymember(Key, 1, All).
mimic_vhost_down(Config, NodeIndex, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,