summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_tracking.erl
blob: fba283097ecf534868dd9799ba36206eaef4bb4f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(rabbit_tracking).

%% Common behaviour and processing functions for tracking components
%%
%% See in use:
%%  * rabbit_connection_tracking
%%  * rabbit_channel_tracking

%% Behaviour contract for tracking components. A module that declares
%% this behaviour must export every callback below. Only the signatures
%% are fixed here; what each callback actually does is decided by the
%% implementing module.

%% Takes no arguments and returns ok.
-callback boot() -> ok.
%% Receives a single term and returns ok.
-callback update_tracked(term()) -> ok.
%% Receives a single term and returns ok.
-callback handle_cast(term()) -> ok.
%% Receives a tracked connection or a tracked channel and returns ok.
-callback register_tracked(
              rabbit_types:tracked_connection() |
                  rabbit_types:tracked_channel()) -> 'ok'.
%% Receives the id of a tracked connection or tracked channel and
%% returns ok.
-callback unregister_tracked(
              rabbit_types:tracked_connection_id() |
                  rabbit_types:tracked_channel_id()) -> 'ok'.
%% Receives a single term and returns a non-negative count.
-callback count_tracked_items_in(term()) -> non_neg_integer().
%% Takes no arguments and returns ok.
-callback clear_tracking_tables() -> 'ok'.
%% Receives a list and a second term, and returns ok.
-callback shutdown_tracked_items(list(), term()) -> ok.

-export([id/2, count_tracked_items/4, match_tracked_items/2,
         clear_tracking_table/1, delete_tracking_table/3,
         delete_tracked_entry/3]).

%%----------------------------------------------------------------------------

%% Builds the identifier of a tracked item: a two-element tuple pairing
%% the given node with the given name, in that order.
-spec id(atom(), term()) ->
    rabbit_types:tracked_connection_id() | rabbit_types:tracked_channel_id().

id(OwnerNode, ItemName) ->
    {OwnerNode, ItemName}.

%% Sums a per-node counter over every node returned by
%% rabbit_nodes:all_running().
%%
%% For each node, TableNameFun(Node) yields the table to read and Key is
%% looked up in it with a dirty read. A missing row counts as 0; a single
%% row contributes the element found at CountRecPosition. Any exception
%% raised while reading or adding a node's value is logged (ContextMsg is
%% included in the message) and that node is skipped, so the running
%% total is left unchanged for it.
-spec count_tracked_items(function(), integer(), term(), string()) ->
    non_neg_integer().

count_tracked_items(TableNameFun, CountRecPosition, Key, ContextMsg) ->
    AddNodeCount =
        fun (Node, Total) ->
                %% Resolved outside the try: a failing TableNameFun is
                %% not swallowed.
                Table = TableNameFun(Node),
                try
                    Rows = mnesia:dirty_read(Table, Key),
                    Count = case Rows of
                                [Row] -> element(CountRecPosition, Row);
                                []    -> 0
                            end,
                    Total + Count
                catch
                    _:Reason ->
                        rabbit_log:error(
                          "Failed to fetch number of ~p ~p on node ~p:~n~p~n",
                          [ContextMsg, Key, Node, Reason]),
                        Total
                end
        end,
    lists:foldl(AddNodeCount, 0, rabbit_nodes:all_running()).

%% Collects, from every node returned by rabbit_nodes:all_running(), the
%% objects in that node's table (as named by TableNameFun(Node)) which
%% match MatchSpec, using a dirty match.
%%
%% The result is one flat list: the matches of the first node, followed
%% by those of the second, and so on.
-spec match_tracked_items(function(), tuple()) -> term().

match_tracked_items(TableNameFun, MatchSpec) ->
    %% flatmap concatenates the per-node results without re-copying the
    %% already collected items for every additional node, which is what
    %% an `Acc ++ Matches' fold does.
    lists:flatmap(
      fun (Node) ->
              mnesia:dirty_match_object(TableNameFun(Node), MatchSpec)
      end, rabbit_nodes:all_running()).

%% Removes all entries from the given Mnesia table.
%%
%% This is best effort and always returns ok. A table that does not
%% exist is treated as already clear. Any other abort reason is logged
%% rather than silently discarded, so that a failed clear can be
%% diagnosed.
-spec clear_tracking_table(atom()) -> ok.

clear_tracking_table(TableName) ->
    case mnesia:clear_table(TableName) of
        {atomic, ok}              -> ok;
        {aborted, {no_exists, _}} -> ok;
        {aborted, Reason} ->
            rabbit_log:error("Failed to clear tracking table ~p: ~p",
                [TableName, Reason]),
            ok
    end.

%% Deletes the given Mnesia table and always returns ok.
%%
%% A table that does not exist is not considered an error. Any other
%% abort reason is logged together with ContextMsg and Node; both are
%% used for the log message only.
-spec delete_tracking_table(atom(), node(), string()) -> ok.

delete_tracking_table(TableName, Node, ContextMsg) ->
    Outcome = mnesia:delete_table(TableName),
    case Outcome of
        {aborted, {no_exists, _}} ->
            ok;
        {aborted, Reason} ->
            rabbit_log:error("Failed to delete a ~p table for node ~p: ~p",
                [ContextMsg, Node, Reason]),
            ok;
        {atomic, ok} ->
            ok
    end.

%% Deletes the entry stored under Key from the table of every running
%% node, but only if the existence check reports `false' on all of them.
%%
%% The existence check is the {M, F, A} triple; it is invoked on each
%% node returned by rabbit_nodes:all_running() via rpc:call/4 and must
%% return a boolean. As soon as one node reports `true' nothing is
%% deleted. TableNameFun(Node) names the table to delete from.
%%
%% Always returns ok, as promised by the spec (a list comprehension in
%% the delete branch would leak a list of results to the caller).
-spec delete_tracked_entry({atom(), atom(), list()}, function(), term()) -> ok.

delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableNameFun, Key) ->
    ClusterNodes = rabbit_nodes:all_running(),
    ExistsInCluster =
        lists:any(fun (Node) -> entry_exists_on_node(Node, M, F, A) end,
                  ClusterNodes),
    case ExistsInCluster of
        false ->
            lists:foreach(
              fun (Node) ->
                      mnesia:dirty_delete(TableNameFun(Node), Key)
              end, ClusterNodes);
        true ->
            ok
    end.

%% Runs the existence check on a single node.
%%
%% rpc:call/4 returns {badrpc, Reason} when the call fails. That value is
%% not a boolean and would make lists:any/2 raise, so it is handled here:
%% the failure is logged and the answer is `true', meaning the entry is
%% kept whenever its absence cannot be confirmed.
-spec entry_exists_on_node(node(), atom(), atom(), list()) -> boolean().

entry_exists_on_node(Node, M, F, A) ->
    case rpc:call(Node, M, F, A) of
        {badrpc, Reason} ->
            rabbit_log:error(
              "Failed to run existence check ~p:~p on node ~p: ~p",
              [M, F, Node, Reason]),
            true;
        Exists when is_boolean(Exists) ->
            Exists
    end.