path: root/deps/rabbitmq_management_agent/src
diff options
Diffstat (limited to 'deps/rabbitmq_management_agent/src')
16 files changed, 3732 insertions, 0 deletions
diff --git a/deps/rabbitmq_management_agent/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ResetStatsDbCommand.erl b/deps/rabbitmq_management_agent/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ResetStatsDbCommand.erl
new file mode 100644
index 0000000000..bc6bdbdc25
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ResetStatsDbCommand.erl
@@ -0,0 +1,54 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+ usage/0,
+ validate/2,
+ merge_defaults/2,
+ banner/2,
+ run/2,
+ output/2,
+ switches/0,
+ description/0
+ ]).
+%% Callbacks
+usage() ->
+ <<"reset_stats_db [--all]">>.
+validate(_, _) ->
+ ok.
+merge_defaults(A, Opts) ->
+ {A, maps:merge(#{all => false}, Opts)}.
+switches() ->
+ [{all, boolean}].
+run(_Args, #{node := Node, all := true}) ->
+ rabbit_misc:rpc_call(Node, rabbit_mgmt_storage, reset_all, []);
+run(_Args, #{node := Node, all := false}) ->
+ rabbit_misc:rpc_call(Node, rabbit_mgmt_storage, reset, []).
+output(Output, _Opts) ->
+ 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Output).
+banner(_, #{all := true}) ->
+ <<"Resetting statistics database in all nodes">>;
+banner(_, #{node := Node}) ->
+ erlang:iolist_to_binary([<<"Resetting statistics database on node ">>,
+ atom_to_binary(Node, utf8)]).
+description() ->
+ <<"Resets statistics database. This will remove all metrics data!">>.
diff --git a/deps/rabbitmq_management_agent/src/exometer_slide.erl b/deps/rabbitmq_management_agent/src/exometer_slide.erl
new file mode 100644
index 0000000000..2c4e4c6d35
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/exometer_slide.erl
@@ -0,0 +1,551 @@
+%% This file is a copy of exometer_slide.erl from,
+%% with the following modifications:
+%% 1) The elements are tuples of numbers
+%% 2) Only one element for each expected interval point is added, intermediate values
+%% are discarded. Thus, if we have a window of 60s and interval of 5s, at max 12 elements
+%% are stored.
+%% 3) Additions can be provided as increments to the last value stored
+%% 4) sum/1 implements the sum of several slides, generating a new timestamp sequence based
+%% on the given intervals. Elements on each window are added to the closest interval point.
+%% Original commit:
+%% -------------------------------------------------------------------
+%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% -------------------------------------------------------------------
+%% @author Tony Rogvall <>
+%% @author Ulf Wiger <>
+%% @author Magnus Feuer <>
+%% @doc Efficient sliding-window buffer
+%% Initial implementation: 29 Sep 2009 by Tony Rogvall
+%% This module implements an efficient sliding window, maintaining
+%% two lists - a primary and a secondary. Values are paired with a
+%% timestamp (millisecond resolution, see `timestamp/0')
+%% and prepended to the primary list. When the time span between the oldest
+%% and the newest entry in the primary list exceeds the given window size,
+%% the primary list is shifted into the secondary list position, and the
+%% new entry is added to a new (empty) primary list.
+%% The window can be converted to a list using `to_list/1'.
+%% @end
+%% All modifications are (C) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%% The Initial Developer of the Original Code is Basho Technologies, Inc.
+-export([new/2, new/3,
+ reset/1,
+ add_element/3,
+ to_list/2,
+ to_list/3,
+ foldl/5,
+ map/2,
+ to_normalized_list/5]).
+ last_two/1,
+ last/1]).
+ sum/2,
+ sum/5,
+ optimize/1]).
+%% For testing
+-type value() :: tuple().
+-type internal_value() :: tuple() | drop.
+-type timestamp() :: non_neg_integer().
+-type fold_acc() :: any().
+-type fold_fun() :: fun(({timestamp(), internal_value()}, fold_acc()) -> fold_acc()).
+%% Fixed size event buffer
+-record(slide, {size = 0 :: integer(), % ms window
+ n = 0 :: integer(), % number of elements in buf1
+ max_n :: infinity | integer(), % max no of elements
+ incremental = false :: boolean(),
+ interval :: integer(),
+ last = 0 :: integer(), % millisecond timestamp
+ first = undefined :: undefined | integer(), % millisecond timestamp
+ buf1 = [] :: [internal_value()],
+ buf2 = [] :: [internal_value()],
+ total :: undefined | value()}).
+-opaque slide() :: #slide{}.
+-export_type([slide/0, timestamp/0]).
+-spec timestamp() -> timestamp().
+%% @doc Generate a millisecond-resolution timestamp.
+%% This timestamp format is used e.g. by the `exometer_slide' and
+%% `exometer_histogram' implementations.
+%% @end
+timestamp() ->
+ os:system_time(milli_seconds).
+-spec new(_Size::integer(), _Options::list()) -> slide().
+%% @doc Create a new sliding-window buffer.
+%% `Size' determines the size in milliseconds of the sliding window.
+%% The implementation prepends values into a primary list until the oldest
+%% element in the list is `Size' ms older than the current value. It then
+%% swaps the primary list into a secondary list, and starts prepending to
+%% a new primary list. This means that more data than fits inside the window
+%% will be kept - upwards of twice as much. On the other hand, updating the
+%% buffer is very cheap.
+%% @end
+new(Size, Opts) -> new(timestamp(), Size, Opts).
+-spec new(Timestamp :: timestamp(), Size::integer(), Options::list()) -> slide().
+new(TS, Size, Opts) ->
+ #slide{size = Size,
+ max_n = proplists:get_value(max_n, Opts, infinity),
+ interval = proplists:get_value(interval, Opts, infinity),
+ last = TS,
+ first = undefined,
+ incremental = proplists:get_value(incremental, Opts, false),
+ buf1 = [],
+ buf2 = []}.
+-spec reset(slide()) -> slide().
+%% @doc Empty the buffer
+reset(Slide) ->
+ Slide#slide{n = 0, buf1 = [], buf2 = [], last = 0, first = undefined}.
+%% @doc Add an element to the buffer, tagged with the given timestamp.
+%% Apart from the specified timestamp, this function works just like
+%% {@link add_element/2}.
+%% @end
+-spec add_element(timestamp(), value(), slide()) -> slide().
+add_element(_TS, _Evt, Slide) when Slide#slide.size == 0 ->
+ Slide;
+add_element(TS, Evt, #slide{last = Last, interval = Interval, total = Total0,
+ incremental = true} = Slide)
+ when (TS - Last) < Interval ->
+ Total = add_to_total(Evt, Total0),
+ Slide#slide{total = Total};
+add_element(TS, Evt, #slide{last = Last, interval = Interval} = Slide)
+ when (TS - Last) < Interval ->
+ Slide#slide{total = Evt};
+add_element(TS, Evt, #slide{last = Last, size = Sz, incremental = true,
+ n = N, max_n = MaxN, total = Total0,
+ buf1 = Buf1} = Slide) ->
+ N1 = N+1,
+ Total = add_to_total(Evt, Total0),
+ %% Total could be the same as the last sample, by adding and substracting
+ %% the same amout to the totals. That is not strictly a drop, but should
+ %% generate new samples.
+ %% I.e. 0, 0, -14, 14 (total = 0, samples = 14, -14, 0, drop)
+ case {is_zeros(Evt), Buf1} of
+ {_, []} ->
+ Slide#slide{n = N1, first = TS, buf1 = [{TS, Total} | Buf1],
+ last = TS, total = Total};
+ _ when TS - Last > Sz; N1 > MaxN ->
+ %% swap
+ Slide#slide{last = TS, n = 1, buf1 = [{TS, Total}],
+ buf2 = Buf1, total = Total};
+ {true, [{_, Total}, {_, drop} = Drop | Tail]} ->
+ %% Memory optimisation
+ Slide#slide{buf1 = [{TS, Total}, Drop | Tail],
+ n = N1, last = TS};
+ {true, [{DropTS, Total} | Tail]} ->
+ %% Memory optimisation
+ Slide#slide{buf1 = [{TS, Total}, {DropTS, drop} | Tail],
+ n = N1, last = TS};
+ _ ->
+ Slide#slide{n = N1, buf1 = [{TS, Total} | Buf1],
+ last = TS, total = Total}
+ end;
+add_element(TS, Evt, #slide{last = Last, size = Sz, n = N, max_n = MaxN,
+ buf1 = Buf1} = Slide)
+ when TS - Last > Sz; N + 1 > MaxN ->
+ Slide#slide{last = TS, n = 1, buf1 = [{TS, Evt}],
+ buf2 = Buf1, total = Evt};
+add_element(TS, Evt, #slide{buf1 = [{_, Evt}, {_, drop} = Drop | Tail],
+ n = N} = Slide) ->
+ %% Memory optimisation
+ Slide#slide{buf1 = [{TS, Evt}, Drop | Tail], n = N + 1, last = TS};
+add_element(TS, Evt, #slide{buf1 = [{DropTS, Evt} | Tail], n = N} = Slide) ->
+ %% Memory optimisation
+ Slide#slide{buf1 = [{TS, Evt}, {DropTS, drop} | Tail],
+ n = N + 1, last = TS};
+add_element(TS, Evt, #slide{n = N, buf1 = Buf1} = Slide) ->
+ N1 = N+1,
+ case Buf1 of
+ [] ->
+ Slide#slide{n = N1, buf1 = [{TS, Evt} | Buf1],
+ last = TS, first = TS, total = Evt};
+ _ ->
+ Slide#slide{n = N1, buf1 = [{TS, Evt} | Buf1],
+ last = TS, total = Evt}
+ end.
+add_to_total(Evt, undefined) ->
+ Evt;
+add_to_total({A0}, {B0}) ->
+ {B0 + A0};
+add_to_total({A0, A1}, {B0, B1}) ->
+ {B0 + A0, B1 + A1};
+add_to_total({A0, A1, A2}, {B0, B1, B2}) ->
+ {B0 + A0, B1 + A1, B2 + A2};
+add_to_total({A0, A1, A2, A3}, {B0, B1, B2, B3}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3};
+add_to_total({A0, A1, A2, A3, A4}, {B0, B1, B2, B3, B4}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4};
+add_to_total({A0, A1, A2, A3, A4, A5}, {B0, B1, B2, B3, B4, B5}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5};
+add_to_total({A0, A1, A2, A3, A4, A5, A6}, {B0, B1, B2, B3, B4, B5, B6}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6};
+add_to_total({A0, A1, A2, A3, A4, A5, A6, A7}, {B0, B1, B2, B3, B4, B5, B6, B7}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6, B7 + A7};
+add_to_total({A0, A1, A2, A3, A4, A5, A6, A7, A8}, {B0, B1, B2, B3, B4, B5, B6, B7, B8}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6, B7 + A7, B8 + A8};
+add_to_total({A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14,
+ A15, A16, A17, A18, A19},
+ {B0, B1, B2, B3, B4, B5, B6, B7, B8, B9, B10, B11, B12, B13, B14,
+ B15, B16, B17, B18, B19}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6, B7 + A7, B8 + A8,
+ B9 + A9, B10 + A10, B11 + A11, B12 + A12, B13 + A13, B14 + A14, B15 + A15, B16 + A16,
+ B17 + A17, B18 + A18, B19 + A19}.
+is_zeros({0}) ->
+ true;
+is_zeros({0, 0}) ->
+ true;
+is_zeros({0, 0, 0}) ->
+ true;
+is_zeros({0, 0, 0, 0}) ->
+ true;
+is_zeros({0, 0, 0, 0, 0}) ->
+ true;
+is_zeros({0, 0, 0, 0, 0, 0}) ->
+ true;
+is_zeros({0, 0, 0, 0, 0, 0, 0}) ->
+ true;
+is_zeros({0, 0, 0, 0, 0, 0, 0, 0, 0}) ->
+ true;
+is_zeros({0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) ->
+ true;
+is_zeros(_) ->
+ false.
+-spec optimize(slide()) -> slide().
+optimize(#slide{buf2 = []} = Slide) ->
+ Slide;
+optimize(#slide{buf1 = Buf1, buf2 = Buf2, max_n = MaxN, n = N} = Slide)
+ when is_integer(MaxN) andalso length(Buf1) < MaxN ->
+ Slide#slide{buf1 = Buf1,
+ buf2 = lists:sublist(Buf2, n_diff(MaxN, N) + 1)};
+optimize(Slide) -> Slide.
+snd(T) when is_tuple(T) ->
+ element(2, T).
+-spec to_list(timestamp(), slide()) -> [{timestamp(), value()}].
+%% @doc Convert the sliding window into a list of timestamped values.
+%% @end
+to_list(_Now, #slide{size = Sz}) when Sz == 0 ->
+ [];
+to_list(Now, #slide{size = Sz} = Slide) ->
+ snd(to_list_from(Now, Now - Sz, Slide)).
+to_list(Now, Start, Slide) ->
+ snd(to_list_from(Now, Start, Slide)).
+to_list_from(Now, Start0, #slide{max_n = MaxN, buf2 = Buf2, first = FirstTS,
+ interval = Interval} = Slide) ->
+ {NewN, Buf1} = maybe_add_last_sample(Now, Slide),
+ Start = first_max(FirstTS, Start0),
+ {Prev0, Buf1_1} = take_since(Buf1, Now, Start, first_max(MaxN, NewN), [], Interval),
+ case take_since(Buf2, Now, Start, first_max(MaxN, NewN), Buf1_1, Interval) of
+ {undefined, Buf1_1} ->
+ {Prev0, Buf1_1};
+ {_Prev, Buf1_1} = Res ->
+ case Prev0 of
+ undefined ->
+ Res;
+ _ ->
+ %% If take_since returns the same buffer, that means we don't
+ %% need Buf2 at all. We might be returning a too old sample
+ %% in previous, so we must use the one from Buf1
+ {Prev0, Buf1_1}
+ end;
+ Res ->
+ Res
+ end.
+first_max(F, X) when is_integer(F) -> max(F, X);
+first_max(_, X) -> X.
+-spec last_two(slide()) -> [{timestamp(), value()}].
+%% @doc Returns the newest 2 elements on the sample
+last_two(#slide{buf1 = [{TS, Evt} = H1, {_, drop} | _], interval = Interval}) ->
+ [H1, {TS - Interval, Evt}];
+last_two(#slide{buf1 = [H1, H2_0 | _], interval = Interval}) ->
+ H2 = adjust_timestamp(H1, H2_0, Interval),
+ [H1, H2];
+last_two(#slide{buf1 = [H1], buf2 = [H2_0 | _],
+ interval = Interval}) ->
+ H2 = adjust_timestamp(H1, H2_0, Interval),
+ [H1, H2];
+last_two(#slide{buf1 = [H1], buf2 = []}) ->
+ [H1];
+last_two(_) ->
+ [].
+adjust_timestamp({TS1, _}, {TS2, V2}, Interval) ->
+ case TS1 - TS2 > Interval of
+ true -> {TS1 - Interval, V2};
+ false -> {TS2, V2}
+ end.
+-spec last(slide()) -> value() | undefined.
+last(#slide{total = T}) when T =/= undefined ->
+ T;
+last(#slide{buf1 = [{_TS, T} | _]}) ->
+ T;
+last(#slide{buf2 = [{_TS, T} | _]}) ->
+ T;
+last(_) ->
+ undefined.
+-spec foldl(timestamp(), timestamp(), fold_fun(), fold_acc(), slide()) -> fold_acc().
+%% @doc Fold over the sliding window, starting from `Timestamp'.
+%% Now provides a reference point to evaluate whether to include
+%% partial, unrealised sample values in the sequence. Unrealised values will be
+%% appended to the sequence when Now >= LastTS + Interval
+%% The fun should as `fun({Timestamp, Value}, Acc) -> NewAcc'.
+%% The values are processed in order from oldest to newest.
+%% @end
+foldl(_Now, _Timestamp, _Fun, _Acc, #slide{size = Sz}) when Sz == 0 ->
+ [];
+foldl(Now, Start0, Fun, Acc, #slide{max_n = _MaxN, buf2 = _Buf2,
+ interval = _Interval} = Slide) ->
+ lists:foldl(Fun, Acc, element(2, to_list_from(Now, Start0, Slide)) ++ [last]).
+map(Fun, #slide{buf1 = Buf1, buf2 = Buf2, total = Total} = Slide) ->
+ BufFun = fun({Timestamp, Value}) ->
+ {Timestamp, Fun(Value)}
+ end,
+ MappedBuf1 = lists:map(BufFun, Buf1),
+ MappedBuf2 = lists:map(BufFun, Buf2),
+ MappedTotal = Fun(Total),
+ Slide#slide{buf1 = MappedBuf1, buf2 = MappedBuf2, total = MappedTotal}.
+maybe_add_last_sample(_Now, #slide{total = T, n = N,
+ buf1 = [{_, T} | _] = Buf1}) ->
+ {N, Buf1};
+maybe_add_last_sample(Now, #slide{total = T,
+ n = N,
+ last = Last,
+ interval = I,
+ buf1 = Buf1})
+ when T =/= undefined andalso Now >= Last + I ->
+ {N + 1, [{Last + I, T} | Buf1]};
+maybe_add_last_sample(_Now, #slide{buf1 = Buf1, n = N}) ->
+ {N, Buf1}.
+create_normalized_lookup(Start, Interval, RoundFun, Samples) ->
+ lists:foldl(fun({TS, Value}, Acc) when TS - Start >= 0 ->
+ NewTS = map_timestamp(TS, Start, Interval, RoundFun),
+ maps:update_with(NewTS,
+ fun({T, V}) when T > TS ->
+ {T, V};
+ (_) ->
+ {TS, Value}
+ end, {TS, Value}, Acc);
+ (_, Acc) ->
+ Acc
+ end, #{}, Samples).
+-spec to_normalized_list(timestamp(), timestamp(), integer(), slide(),
+ no_pad | tuple()) -> [tuple()].
+to_normalized_list(Now, Start, Interval, Slide, Empty) ->
+ to_normalized_list(Now, Start, Interval, Slide, Empty, fun ceil/1).
+to_normalized_list(Now, Start, Interval, #slide{first = FirstTS0,
+ total = Total} = Slide,
+ Empty, RoundFun) ->
+ RoundTSFun = fun (TS) -> map_timestamp(TS, Start, Interval, RoundFun) end,
+ % add interval as we don't want to miss a sample due to rounding
+ {Prev, Samples} = to_list_from(Now + Interval, Start, Slide),
+ Lookup = create_normalized_lookup(Start, Interval, RoundFun, Samples),
+ NowRound = RoundTSFun(Now),
+ Pad = case Samples of
+ _ when Empty =:= no_pad ->
+ [];
+ [{TS, _} | _] when Prev =/= undefined, Start =< TS ->
+ [{T, snd(Prev)}
+ || T <- lists:seq(RoundTSFun(TS) - Interval, Start,
+ -Interval)];
+ [{TS, _} | _] when is_number(FirstTS0) andalso Start < FirstTS0 ->
+ % only if we know there is nothing in the past can we
+ % generate a 0 pad
+ [{T, Empty} || T <- lists:seq(RoundTSFun(TS) - Interval, Start,
+ -Interval)];
+ _ when FirstTS0 =:= undefined andalso Total =:= undefined ->
+ [{T, Empty} || T <- lists:seq(NowRound, Start, -Interval)];
+ [] -> % samples have been seen, use the total to pad
+ [{T, Total} || T <- lists:seq(NowRound, Start, -Interval)];
+ _ -> []
+ end,
+ {_, Res1} = lists:foldl(
+ fun(T, {Last, Acc}) ->
+ case maps:find(T, Lookup) of
+ {ok, {_, V}} ->
+ {V, [{T, V} | Acc]};
+ error when Last =:= undefined ->
+ {Last, Acc};
+ error -> % this pads the last value into the future
+ {Last, [{T, Last} | Acc]}
+ end
+ end, {undefined, []},
+ lists:seq(Start, NowRound, Interval)),
+ Res1 ++ Pad.
+%% @doc Sums a list of slides
+%% Takes the last known timestamp and creates an template version of the
+%% sliding window. Timestamps are then truncated and summed with the value
+%% in the template slide.
+%% @end
+-spec sum([slide()]) -> slide().
+sum(Slides) -> sum(Slides, no_pad).
+sum([#slide{size = Size, interval = Interval} | _] = Slides, Pad) ->
+ % take the freshest timestamp as reference point for summing operation
+ Now = lists:max([Last || #slide{last = Last} <- Slides]),
+ Start = Now - Size,
+ sum(Now, Start, Interval, Slides, Pad).
+sum(Now, Start, Interval, [Slide | _ ] = All, Pad) ->
+ Fun = fun({TS, Value}, Acc) ->
+ maps:update_with(TS, fun(V) -> add_to_total(V, Value) end,
+ Value, Acc)
+ end,
+ {Total, Dict} =
+ lists:foldl(fun(#slide{total = T} = S, {Tot, Acc}) ->
+ Samples = to_normalized_list(Now, Start, Interval, S,
+ Pad, fun ceil/1),
+ Total = add_to_total(T, Tot),
+ Folded = lists:foldl(Fun, Acc, Samples),
+ {Total, Folded}
+ end, {undefined, #{}}, All),
+ {First, Buffer} = case lists:sort(maps:to_list(Dict)) of
+ [] ->
+ F = case [TS || #slide{first = TS} <- All,
+ is_integer(TS)] of
+ [] -> undefined;
+ FS -> lists:min(FS)
+ end,
+ {F, []};
+ [{F, _} | _ ] = B ->
+ {F, lists:reverse(B)}
+ end,
+ Slide#slide{buf1 = Buffer, buf2 = [], total = Total, n = length(Buffer),
+ first = First, last = Now}.
+truncated_seq(_First, _Last, _Incr, 0) ->
+ [];
+truncated_seq(TS, TS, _Incr, MaxN) when MaxN > 0 ->
+ [TS];
+truncated_seq(First, Last, Incr, MaxN) when First =< Last andalso MaxN > 0 ->
+ End = min(Last, First + (MaxN * Incr) - Incr),
+ lists:seq(First, End, Incr);
+truncated_seq(First, Last, Incr, MaxN) ->
+ End = max(Last, First + (MaxN * Incr) - Incr),
+ lists:seq(First, End, Incr).
+take_since([{DropTS, drop} | T], Now, Start, N, [{TS, Evt} | _] = Acc,
+ Interval) ->
+ case T of
+ [] ->
+ Fill = [{TS0, Evt} || TS0 <- truncated_seq(TS - Interval,
+ max(DropTS, Start),
+ -Interval, N)],
+ {undefined, lists:reverse(Fill) ++ Acc};
+ [{TS0, _} = E | Rest] when TS0 >= Start, N > 0 ->
+ Fill = [{TS1, Evt} || TS1 <- truncated_seq(TS0 + Interval,
+ max(TS0 + Interval, TS - Interval),
+ Interval, N)],
+ take_since(Rest, Now, Start, decr(N), [E | Fill ++ Acc], Interval);
+ [Prev | _] -> % next sample is out of range so needs to be filled from Start
+ Fill = [{TS1, Evt} || TS1 <- truncated_seq(Start, max(Start, TS - Interval),
+ Interval, N)],
+ {Prev, Fill ++ Acc}
+ end;
+take_since([{TS, V} = H | T], Now, Start, N, Acc, Interval) when TS >= Start,
+ N > 0,
+ TS =< Now,
+ is_tuple(V) ->
+ take_since(T, Now, Start, decr(N), [H|Acc], Interval);
+take_since([{TS,_} | T], Now, Start, N, Acc, Interval) when TS >= Start, N > 0 ->
+ take_since(T, Now, Start, decr(N), Acc, Interval);
+take_since([Prev | _], _, _, _, Acc, _) ->
+ {Prev, Acc};
+take_since(_, _, _, _, Acc, _) ->
+ %% Don't reverse; already the wanted order.
+ {undefined, Acc}.
+decr(N) when is_integer(N) ->
+ N-1;
+decr(N) -> N.
+n_diff(A, B) when is_integer(A) ->
+ A - B.
+ceil(X) when X < 0 ->
+ trunc(X);
+ceil(X) ->
+ T = trunc(X),
+ case X - T == 0 of
+ true -> T;
+ false -> T + 1
+ end.
+map_timestamp(TS, Start, Interval, Round) ->
+ Factor = Round((TS - Start) / Interval),
+ Start + Interval * Factor.
+buffer(#slide{buf1 = Buf1, buf2 = Buf2}) ->
+ Buf1 ++ Buf2.
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_app.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_app.erl
new file mode 100644
index 0000000000..e889815c2f
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_app.erl
@@ -0,0 +1,17 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+-export([start/2, stop/1]).
+start(_Type, _StartArgs) ->
+ rabbit_mgmt_agent_sup_sup:start_link().
+stop(_State) ->
+ ok.
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_config.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_config.erl
new file mode 100644
index 0000000000..e8d074e891
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_config.erl
@@ -0,0 +1,22 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+-export([get_env/1, get_env/2]).
+%% some people have reasons to only run with the agent enabled:
+%% make it possible for them to configure key management app
+%% settings such as rates_mode.
+get_env(Key) ->
+ rabbit_misc:get_env(rabbitmq_management, Key,
+ rabbit_misc:get_env(rabbitmq_management_agent, Key,
+ undefined)).
+get_env(Key, Default) ->
+ rabbit_misc:get_env(rabbitmq_management, Key,
+ rabbit_misc:get_env(rabbitmq_management_agent, Key,
+ Default)).
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup.erl
new file mode 100644
index 0000000000..0c4a5465e9
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup.erl
@@ -0,0 +1,55 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%% pg2 is deprecated in OTP 23.
+init([]) ->
+ MCs = maybe_enable_metrics_collector(),
+ ExternalStats = {rabbit_mgmt_external_stats,
+ {rabbit_mgmt_external_stats, start_link, []},
+ permanent, 5000, worker, [rabbit_mgmt_external_stats]},
+ {ok, {{one_for_one, 100, 10}, [ExternalStats] ++ MCs}}.
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+maybe_enable_metrics_collector() ->
+ case application:get_env(rabbitmq_management_agent, disable_metrics_collector, false) of
+ false ->
+ pg2:create(management_db),
+ ok = pg2:join(management_db, self()),
+ ST = {rabbit_mgmt_storage, {rabbit_mgmt_storage, start_link, []},
+ permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_storage]},
+ MD = {delegate_management_sup, {delegate_sup, start_link, [5, ?DELEGATE_PREFIX]},
+ permanent, ?SUPERVISOR_WAIT, supervisor, [delegate_sup]},
+ MC = [{rabbit_mgmt_metrics_collector:name(Table),
+ {rabbit_mgmt_metrics_collector, start_link, [Table]},
+ permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_metrics_collector]}
+ || {Table, _} <- ?CORE_TABLES],
+ MGC = [{rabbit_mgmt_metrics_gc:name(Event),
+ {rabbit_mgmt_metrics_gc, start_link, [Event]},
+ permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_metrics_gc]}
+ || Event <- ?GC_EVENTS],
+ GC = {rabbit_mgmt_gc, {rabbit_mgmt_gc, start_link, []},
+ permanent, ?WORKER_WAIT, worker, [rabbit_mgmt_gc]},
+ [ST, MD, GC | MC ++ MGC];
+ true ->
+ []
+ end.
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup_sup.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup_sup.erl
new file mode 100644
index 0000000000..17ffa35307
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_agent_sup_sup.erl
@@ -0,0 +1,28 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+-export([start_link/0, start_child/0]).
+start_child() ->
+ supervisor2:start_child(?MODULE, sup()).
+sup() ->
+ {rabbit_mgmt_agent_sup, {rabbit_mgmt_agent_sup, start_link, []},
+ temporary, ?SUPERVISOR_WAIT, supervisor, [rabbit_mgmt_agent_sup]}.
+init([]) ->
+ {ok, {{one_for_one, 0, 1}, [sup()]}}.
+start_link() ->
+ supervisor2:start_link({local, ?MODULE}, ?MODULE, []).
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl
new file mode 100644
index 0000000000..d73c8a3819
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data.erl
@@ -0,0 +1,572 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2016-2020 VMware, Inc. or its affiliates. All rights reserved.
+-export([empty/2, pick_range/2]).
+% delegate api
+ consumer_data/2,
+ all_list_queue_data/3,
+ all_detail_queue_data/3,
+ all_exchange_data/3,
+ all_connection_data/3,
+ all_list_channel_data/3,
+ all_detail_channel_data/3,
+ all_vhost_data/3,
+ all_node_data/3,
+ augmented_created_stats/2,
+ augmented_created_stats/3,
+ augment_channel_pids/2,
+ augment_details/2,
+ lookup_element/2,
+ lookup_element/3
+ ]).
+-import(rabbit_misc, [pget/2]).
+-type maybe_slide() :: exometer_slide:slide() | not_found.
+-type ranges() :: {maybe_range(), maybe_range(), maybe_range(), maybe_range()}.
+-type maybe_range() :: no_range | #range{}.
+%% Internal, query-time - node-local operations
+created_stats(Name, Type) ->
+ case ets:select(Type, [{{'_', '$2', '$3'}, [{'==', Name, '$2'}], ['$3']}]) of
+ [] -> not_found;
+ [Elem] -> Elem
+ end.
+created_stats(Type) ->
+ %% TODO better tab2list?
+ ets:select(Type, [{{'_', '_', '$3'}, [], ['$3']}]).
+-spec all_detail_queue_data(pid(), [any()], ranges()) -> #{atom() => any()}.
+all_detail_queue_data(_Pid, Ids, Ranges) ->
+ lists:foldl(fun (Id, Acc) ->
+ Data = detail_queue_data(Ranges, Id),
+ maps:put(Id, Data, Acc)
+ end, #{}, Ids).
+all_list_queue_data(_Pid, Ids, Ranges) ->
+ lists:foldl(fun (Id, Acc) ->
+ Data = list_queue_data(Ranges, Id),
+ maps:put(Id, Data, Acc)
+ end, #{}, Ids).
+all_detail_channel_data(_Pid, Ids, Ranges) ->
+ lists:foldl(fun (Id, Acc) ->
+ Data = detail_channel_data(Ranges, Id),
+ maps:put(Id, Data, Acc)
+ end, #{}, Ids).
+all_list_channel_data(_Pid, Ids, Ranges) ->
+ lists:foldl(fun (Id, Acc) ->
+ Data = list_channel_data(Ranges, Id),
+ maps:put(Id, Data, Acc)
+ end, #{}, Ids).
+connection_data(Ranges, Id) ->
+ maps:from_list([raw_message_data(connection_stats_coarse_conn_stats,
+ pick_range(coarse_conn_stats, Ranges), Id),
+ {connection_stats, lookup_element(connection_stats, Id)}]).
+exchange_data(Ranges, Id) ->
+ maps:from_list(
+ exchange_raw_detail_stats_data(Ranges, Id) ++
+ [raw_message_data(exchange_stats_publish_out,
+ pick_range(fine_stats, Ranges), Id),
+ raw_message_data(exchange_stats_publish_in,
+ pick_range(fine_stats, Ranges), Id)]).
+vhost_data(Ranges, Id) ->
+ maps:from_list([raw_message_data(vhost_stats_coarse_conn_stats,
+ pick_range(coarse_conn_stats, Ranges), Id),
+ raw_message_data(vhost_msg_stats,
+ pick_range(queue_msg_rates, Ranges), Id),
+ raw_message_data(vhost_stats_fine_stats,
+ pick_range(fine_stats, Ranges), Id),
+ raw_message_data(vhost_stats_deliver_stats,
+ pick_range(deliver_get, Ranges), Id)]).
+node_data(Ranges, Id) ->
+ maps:from_list(
+ [{mgmt_stats, mgmt_queue_length_stats(Id)}] ++
+ [{node_node_metrics, node_node_metrics()}] ++
+ node_raw_detail_stats_data(Ranges, Id) ++
+ [raw_message_data(node_coarse_stats,
+ pick_range(coarse_node_stats, Ranges), Id),
+ raw_message_data(node_persister_stats,
+ pick_range(coarse_node_stats, Ranges), Id),
+ {node_stats, lookup_element(node_stats, Id)}] ++
+ node_connection_churn_rates_data(Ranges, Id)).
+overview_data(_Pid, User, Ranges, VHosts) ->
+ Raw = [raw_all_message_data(vhost_msg_stats, pick_range(queue_msg_counts, Ranges), VHosts),
+ raw_all_message_data(vhost_stats_fine_stats, pick_range(fine_stats, Ranges), VHosts),
+ raw_all_message_data(vhost_msg_rates, pick_range(queue_msg_rates, Ranges), VHosts),
+ raw_all_message_data(vhost_stats_deliver_stats, pick_range(deliver_get, Ranges), VHosts),
+ raw_message_data(connection_churn_rates, pick_range(queue_msg_rates, Ranges), node())],
+ maps:from_list(Raw ++
+ [{connections_count, count_created_stats(connection_created_stats, User)},
+ {channels_count, count_created_stats(channel_created_stats, User)},
+ {consumers_count, ets:info(consumer_stats, size)}]).
+consumer_data(_Pid, VHost) ->
+ maps:from_list(
+ [{C, augment_msg_stats(augment_consumer(C))}
+ || C <- consumers_by_vhost(VHost)]).
+all_connection_data(_Pid, Ids, Ranges) ->
+ maps:from_list([{Id, connection_data(Ranges, Id)} || Id <- Ids]).
+all_exchange_data(_Pid, Ids, Ranges) ->
+ maps:from_list([{Id, exchange_data(Ranges, Id)} || Id <- Ids]).
+all_vhost_data(_Pid, Ids, Ranges) ->
+ maps:from_list([{Id, vhost_data(Ranges, Id)} || Id <- Ids]).
+all_node_data(_Pid, Ids, Ranges) ->
+ maps:from_list([{Id, node_data(Ranges, Id)} || Id <- Ids]).
+channel_raw_message_data(Ranges, Id) ->
+ [raw_message_data(channel_stats_fine_stats, pick_range(fine_stats, Ranges), Id),
+ raw_message_data(channel_stats_deliver_stats, pick_range(deliver_get, Ranges), Id),
+ raw_message_data(channel_process_stats, pick_range(process_stats, Ranges), Id)].
+queue_raw_message_data(Ranges, Id) ->
+ [raw_message_data(queue_stats_publish, pick_range(fine_stats, Ranges), Id),
+ raw_message_data(queue_stats_deliver_stats, pick_range(deliver_get, Ranges), Id),
+ raw_message_data(queue_process_stats, pick_range(process_stats, Ranges), Id),
+ raw_message_data(queue_msg_stats, pick_range(queue_msg_counts, Ranges), Id)].
+queue_raw_deliver_stats_data(Ranges, Id) ->
+ [raw_message_data2(channel_queue_stats_deliver_stats,
+ pick_range(deliver_get, Ranges), Key)
+ || Key <- get_table_keys(channel_queue_stats_deliver_stats, second(Id))] ++
+ [raw_message_data2(queue_exchange_stats_publish,
+ pick_range(fine_stats, Ranges), Key)
+ || Key <- get_table_keys(queue_exchange_stats_publish, first(Id))].
+node_raw_detail_stats_data(Ranges, Id) ->
+ [raw_message_data2(node_node_coarse_stats,
+ pick_range(coarse_node_node_stats, Ranges), Key)
+ || Key <- get_table_keys(node_node_coarse_stats, first(Id))].
+node_connection_churn_rates_data(Ranges, Id) ->
+ [raw_message_data(connection_churn_rates,
+ pick_range(churn_rates, Ranges), Id)].
+exchange_raw_detail_stats_data(Ranges, Id) ->
+ [raw_message_data2(channel_exchange_stats_fine_stats,
+ pick_range(fine_stats, Ranges), Key)
+ || Key <- get_table_keys(channel_exchange_stats_fine_stats, second(Id))] ++
+ [raw_message_data2(queue_exchange_stats_publish,
+ pick_range(fine_stats, Ranges), Key)
+ || Key <- get_table_keys(queue_exchange_stats_publish, second(Id))].
+channel_raw_detail_stats_data(Ranges, Id) ->
+ [raw_message_data2(channel_exchange_stats_fine_stats,
+ pick_range(fine_stats, Ranges), Key)
+ || Key <- get_table_keys(channel_exchange_stats_fine_stats, first(Id))] ++
+ [raw_message_data2(channel_queue_stats_deliver_stats,
+ pick_range(fine_stats, Ranges), Key)
+ || Key <- get_table_keys(channel_queue_stats_deliver_stats, first(Id))].
+raw_message_data2(Table, no_range, Id) ->
+ SmallSample = lookup_smaller_sample(Table, Id),
+ {{Table, Id}, {SmallSample, not_found}};
+raw_message_data2(Table, Range, Id) ->
+ SmallSample = lookup_smaller_sample(Table, Id),
+ Samples = lookup_samples(Table, Id, Range),
+ {{Table, Id}, {SmallSample, Samples}}.
+detail_queue_data(Ranges, Id) ->
+ maps:from_list(queue_raw_message_data(Ranges, Id) ++
+ queue_raw_deliver_stats_data(Ranges, Id) ++
+ [{queue_stats, lookup_element(queue_stats, Id)},
+ {consumer_stats, get_queue_consumer_stats(Id)}]).
+list_queue_data(Ranges, Id) ->
+ maps:from_list(queue_raw_message_data(Ranges, Id) ++
+ queue_raw_deliver_stats_data(Ranges, Id) ++
+ [{queue_stats, lookup_element(queue_stats, Id)}]).
+detail_channel_data(Ranges, Id) ->
+ maps:from_list(channel_raw_message_data(Ranges, Id) ++
+ channel_raw_detail_stats_data(Ranges, Id) ++
+ [{channel_stats, lookup_element(channel_stats, Id)},
+ {consumer_stats, get_consumer_stats(Id)}]).
+list_channel_data(Ranges, Id) ->
+ maps:from_list(channel_raw_message_data(Ranges, Id) ++
+ channel_raw_detail_stats_data(Ranges, Id) ++
+ [{channel_stats, lookup_element(channel_stats, Id)}]).
+-spec raw_message_data(atom(), maybe_range(), any()) ->
+ {atom(), {maybe_slide(), maybe_slide()}}.
+raw_message_data(Table, no_range, Id) ->
+ SmallSample = lookup_smaller_sample(Table, Id),
+ {Table, {SmallSample, not_found}};
+raw_message_data(Table, Range, Id) ->
+ SmallSample = lookup_smaller_sample(Table, Id),
+ Samples = lookup_samples(Table, Id, Range),
+ {Table, {SmallSample, Samples}}.
+raw_all_message_data(Table, Range, VHosts) ->
+ SmallSample = lookup_all(Table, VHosts, select_smaller_sample(Table)),
+ RangeSample = case Range of
+ no_range -> not_found;
+ _ ->
+ lookup_all(Table, VHosts, select_range_sample(Table,
+ Range))
+ end,
+ {Table, {SmallSample, RangeSample}}.
+get_queue_consumer_stats(Id) ->
+ Consumers = ets:select(consumer_stats, match_queue_consumer_spec(Id)),
+ [augment_consumer(C) || C <- Consumers].
+get_consumer_stats(Id) ->
+ Consumers = ets:select(consumer_stats, match_consumer_spec(Id)),
+ [augment_consumer(C) || C <- Consumers].
+count_created_stats(Type, all) ->
+ ets:info(Type, size);
+count_created_stats(Type, User) ->
+ length(filter_user(created_stats(Type), User)).
+augment_consumer({{Q, Ch, CTag}, Props}) ->
+ [{queue, format_resource(Q)},
+ {channel_details, augment_channel_pid(Ch)},
+ {channel_pid, Ch},
+ {consumer_tag, CTag} | Props].
+consumers_by_vhost(VHost) ->
+ ets:select(consumer_stats,
+ [{{{#resource{virtual_host = '$1', _ = '_'}, '_', '_'}, '_'},
+ [{'orelse', {'==', 'all', VHost}, {'==', VHost, '$1'}}],
+ ['$_']}]).
+augment_msg_stats(Props) ->
+ augment_details(Props, []) ++ Props.
+augment_details([{_, none} | T], Acc) ->
+ augment_details(T, Acc);
+augment_details([{_, unknown} | T], Acc) ->
+ augment_details(T, Acc);
+augment_details([{connection, Value} | T], Acc) ->
+ augment_details(T, [{connection_details, augment_connection_pid(Value)} | Acc]);
+augment_details([{channel, Value} | T], Acc) ->
+ augment_details(T, [{channel_details, augment_channel_pid(Value)} | Acc]);
+augment_details([{owner_pid, Value} | T], Acc) ->
+ augment_details(T, [{owner_pid_details, augment_connection_pid(Value)} | Acc]);
+augment_details([_ | T], Acc) ->
+ augment_details(T, Acc);
+augment_details([], Acc) ->
+ Acc.
+augment_channel_pids(_Pid, ChPids) ->
+ lists:map(fun (ChPid) -> augment_channel_pid(ChPid) end, ChPids).
+augment_channel_pid(Pid) ->
+ Ch = lookup_element(channel_created_stats, Pid, 3),
+ Conn = lookup_element(connection_created_stats, pget(connection, Ch), 3),
+ case Conn of
+ [] -> %% If the connection has just been opened, we might not yet have the data
+ [];
+ _ ->
+ [{name, pget(name, Ch)},
+ {pid, pget(pid, Ch)},
+ {number, pget(number, Ch)},
+ {user, pget(user, Ch)},
+ {connection_name, pget(name, Conn)},
+ {peer_port, pget(peer_port, Conn)},
+ {peer_host, pget(peer_host, Conn)}]
+ end.
+augment_connection_pid(Pid) ->
+ Conn = lookup_element(connection_created_stats, Pid, 3),
+ case Conn of
+ [] -> %% If the connection has just been opened, we might not yet have the data
+ [];
+ _ ->
+ [{name, pget(name, Conn)},
+ {peer_port, pget(peer_port, Conn)},
+ {peer_host, pget(peer_host, Conn)}]
+ end.
+augmented_created_stats(_Pid, Key, Type) ->
+ case created_stats(Key, Type) of
+ not_found -> not_found;
+ S -> augment_msg_stats(S)
+ end.
+augmented_created_stats(_Pid, Type) ->
+ [ augment_msg_stats(S) || S <- created_stats(Type) ].
+match_consumer_spec(Id) ->
+ [{{{'_', '$1', '_'}, '_'}, [{'==', Id, '$1'}], ['$_']}].
+match_queue_consumer_spec(Id) ->
+ [{{{'$1', '_', '_'}, '_'}, [{'==', {Id}, '$1'}], ['$_']}].
+lookup_element(Table, Key) -> lookup_element(Table, Key, 2).
+lookup_element(Table, Key, Pos) ->
+ try ets:lookup_element(Table, Key, Pos)
+ catch error:badarg -> []
+ end.
+-spec lookup_smaller_sample(atom(), any()) -> maybe_slide().
+lookup_smaller_sample(Table, Id) ->
+ case ets:lookup(Table, {Id, select_smaller_sample(Table)}) of
+ [] ->
+ not_found;
+ [{_, Slide}] ->
+ Slide1 = exometer_slide:optimize(Slide),
+ maybe_convert_for_compatibility(Table, Slide1)
+ end.
+-spec lookup_samples(atom(), any(), #range{}) -> maybe_slide().
+lookup_samples(Table, Id, Range) ->
+ case ets:lookup(Table, {Id, select_range_sample(Table, Range)}) of
+ [] ->
+ not_found;
+ [{_, Slide}] ->
+ Slide1 = exometer_slide:optimize(Slide),
+ maybe_convert_for_compatibility(Table, Slide1)
+ end.
+lookup_all(Table, Ids, SecondKey) ->
+ Slides = lists:foldl(fun(Id, Acc) ->
+ case ets:lookup(Table, {Id, SecondKey}) of
+ [] ->
+ Acc;
+ [{_, Slide}] ->
+ [Slide | Acc]
+ end
+ end, [], Ids),
+ case Slides of
+ [] ->
+ not_found;
+ _ ->
+ Slide = exometer_slide:sum(Slides, empty(Table, 0)),
+ maybe_convert_for_compatibility(Table, Slide)
+ end.
+maybe_convert_for_compatibility(Table, Slide)
+ when Table =:= channel_stats_fine_stats orelse
+ Table =:= channel_exchange_stats_fine_stats orelse
+ Table =:= vhost_stats_fine_stats ->
+ ConversionNeeded = rabbit_feature_flags:is_disabled(
+ drop_unroutable_metric),
+ case ConversionNeeded of
+ false ->
+ Slide;
+ true ->
+ %% drop_drop because the metric is named "drop_unroutable"
+ rabbit_mgmt_data_compat:drop_drop_unroutable_metric(Slide)
+ end;
+maybe_convert_for_compatibility(Table, Slide)
+ when Table =:= channel_queue_stats_deliver_stats orelse
+ Table =:= channel_stats_deliver_stats orelse
+ Table =:= queue_stats_deliver_stats orelse
+ Table =:= vhost_stats_deliver_stats ->
+ ConversionNeeded = rabbit_feature_flags:is_disabled(
+ empty_basic_get_metric),
+ case ConversionNeeded of
+ false ->
+ Slide;
+ true ->
+ rabbit_mgmt_data_compat:drop_get_empty_queue_metric(Slide)
+ end;
+maybe_convert_for_compatibility(_, Slide) ->
+ Slide.
+get_table_keys(Table, Id0) ->
+ ets:select(Table, match_spec_keys(Id0)).
+match_spec_keys(Id) ->
+ MatchCondition = to_match_condition(Id),
+ MatchHead = {{{'$1', '$2'}, '_'}, '_'},
+ [{MatchHead, [MatchCondition], [{{'$1', '$2'}}]}].
+to_match_condition({'_', Id1}) when is_tuple(Id1) ->
+ {'==', {Id1}, '$2'};
+to_match_condition({'_', Id1}) ->
+ {'==', Id1, '$2'};
+to_match_condition({Id0, '_'}) when is_tuple(Id0) ->
+ {'==', {Id0}, '$1'};
+to_match_condition({Id0, '_'}) ->
+ {'==', Id0, '$1'}.
+mgmt_queue_length_stats(Id) when Id =:= node() ->
+ GCsQueueLengths = lists:map(fun (T) ->
+ case whereis(rabbit_mgmt_metrics_gc:name(T)) of
+ P when is_pid(P) ->
+ {message_queue_len, Len} =
+ erlang:process_info(P, message_queue_len),
+ {T, Len};
+ _ -> {T, 0}
+ end
+ end,
+ [{metrics_gc_queue_length, GCsQueueLengths}];
+mgmt_queue_length_stats(_Id) ->
+ % if it isn't for the current node just return an empty list
+ [].
+node_node_metrics() ->
+ maps:from_list(ets:tab2list(node_node_metrics)).
+select_range_sample(Table, #range{first = First, last = Last}) ->
+ Range = Last - First,
+ Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies),
+ Policy = retention_policy(Table),
+ [T | _] = TablePolicies = lists:sort(proplists:get_value(Policy, Policies)),
+ {_, Sample} = select_smallest_above(T, TablePolicies, Range),
+ Sample.
+select_smaller_sample(Table) ->
+ Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies),
+ Policy = retention_policy(Table),
+ TablePolicies = proplists:get_value(Policy, Policies),
+ [V | _] = lists:sort([I || {_, I} <- TablePolicies]),
+ V.
+select_smallest_above(V, [], _) ->
+ V;
+select_smallest_above(_, [{H, _} = S | _T], Interval) when (H * 1000) > Interval ->
+ S;
+select_smallest_above(_, [H | T], Interval) ->
+ select_smallest_above(H, T, Interval).
+pick_range(queue_msg_counts, {RangeL, _RangeM, _RangeD, _RangeN}) ->
+ RangeL;
+pick_range(K, {_RangeL, RangeM, _RangeD, _RangeN}) when K == fine_stats;
+ K == deliver_get;
+ K == queue_msg_rates ->
+ RangeM;
+pick_range(K, {_RangeL, _RangeM, RangeD, _RangeN}) when K == coarse_conn_stats;
+ K == process_stats ->
+ RangeD;
+pick_range(K, {_RangeL, _RangeM, _RangeD, RangeN})
+ when K == coarse_node_stats;
+ K == coarse_node_node_stats;
+ K == churn_rates ->
+ RangeN.
+first(Id) ->
+ {Id, '_'}.
+second(Id) ->
+ {'_', Id}.
+empty(Type, V) when Type =:= connection_stats_coarse_conn_stats;
+ Type =:= queue_msg_stats;
+ Type =:= vhost_msg_stats ->
+ {V, V, V};
+empty(Type, V) when Type =:= channel_stats_fine_stats;
+ Type =:= channel_exchange_stats_fine_stats;
+ Type =:= vhost_stats_fine_stats ->
+ {V, V, V, V};
+empty(Type, V) when Type =:= channel_queue_stats_deliver_stats;
+ Type =:= queue_stats_deliver_stats;
+ Type =:= vhost_stats_deliver_stats;
+ Type =:= channel_stats_deliver_stats ->
+ {V, V, V, V, V, V, V, V};
+empty(Type, V) when Type =:= channel_process_stats;
+ Type =:= queue_process_stats;
+ Type =:= queue_stats_publish;
+ Type =:= queue_exchange_stats_publish;
+ Type =:= exchange_stats_publish_out;
+ Type =:= exchange_stats_publish_in ->
+ {V};
+empty(node_coarse_stats, V) ->
+ {V, V, V, V, V, V, V, V};
+empty(node_persister_stats, V) ->
+ {V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V, V};
+empty(Type, V) when Type =:= node_node_coarse_stats;
+ Type =:= vhost_stats_coarse_conn_stats;
+ Type =:= queue_msg_rates;
+ Type =:= vhost_msg_rates ->
+ {V, V};
+empty(connection_churn_rates, V) ->
+ {V, V, V, V, V, V, V}.
+retention_policy(connection_stats_coarse_conn_stats) ->
+ basic;
+retention_policy(channel_stats_fine_stats) ->
+ basic;
+retention_policy(channel_queue_stats_deliver_stats) ->
+ detailed;
+retention_policy(channel_exchange_stats_fine_stats) ->
+ detailed;
+retention_policy(channel_process_stats) ->
+ basic;
+retention_policy(vhost_stats_fine_stats) ->
+ global;
+retention_policy(vhost_stats_deliver_stats) ->
+ global;
+retention_policy(vhost_stats_coarse_conn_stats) ->
+ global;
+retention_policy(vhost_msg_rates) ->
+ global;
+retention_policy(channel_stats_deliver_stats) ->
+ basic;
+retention_policy(queue_stats_deliver_stats) ->
+ basic;
+retention_policy(queue_stats_publish) ->
+ basic;
+retention_policy(queue_exchange_stats_publish) ->
+ basic;
+retention_policy(exchange_stats_publish_out) ->
+ basic;
+retention_policy(exchange_stats_publish_in) ->
+ basic;
+retention_policy(queue_process_stats) ->
+ basic;
+retention_policy(queue_msg_stats) ->
+ basic;
+retention_policy(queue_msg_rates) ->
+ basic;
+retention_policy(vhost_msg_stats) ->
+ global;
+retention_policy(node_coarse_stats) ->
+ global;
+retention_policy(node_persister_stats) ->
+ global;
+retention_policy(node_node_coarse_stats) ->
+ global;
+retention_policy(connection_churn_rates) ->
+ global.
+format_resource(unknown) -> unknown;
+format_resource(Res) -> format_resource(name, Res).
+format_resource(_, unknown) ->
+ unknown;
+format_resource(NameAs, #resource{name = Name, virtual_host = VHost}) ->
+ [{NameAs, Name}, {vhost, VHost}].
+filter_user(List, #user{username = Username, tags = Tags}) ->
+ case is_monitor(Tags) of
+ true -> List;
+ false -> [I || I <- List, pget(user, I) == Username]
+ end.
+is_monitor(T) -> intersects(T, [administrator, monitoring]).
+intersects(A, B) -> lists:any(fun(I) -> lists:member(I, B) end, A).
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_data_compat.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data_compat.erl
new file mode 100644
index 0000000000..9fd127aff5
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_data_compat.erl
@@ -0,0 +1,80 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved.
+ drop_get_empty_queue_metric/1,
+ fill_consumer_active_fields/1,
+ fill_drop_unroutable_metric/1,
+ drop_drop_unroutable_metric/1]).
+fill_get_empty_queue_metric(Slide) ->
+ exometer_slide:map(
+ fun
+ (Value) when is_tuple(Value) andalso size(Value) =:= 8 ->
+ Value;
+ (Value) when is_tuple(Value) andalso size(Value) =:= 7 ->
+ %% Inject a 0 for the new metric
+ list_to_tuple(
+ tuple_to_list(Value) ++ [0]);
+ (Value) ->
+ Value
+ end, Slide).
+drop_get_empty_queue_metric(Slide) ->
+ exometer_slide:map(
+ fun
+ (Value) when is_tuple(Value) andalso size(Value) =:= 8 ->
+ %% We want to remove the last element, which is
+ %% the count of basic.get on empty queues.
+ list_to_tuple(
+ lists:sublist(
+ tuple_to_list(Value), size(Value) - 1));
+ (Value) when is_tuple(Value) andalso size(Value) =:= 7 ->
+ Value;
+ (Value) ->
+ Value
+ end, Slide).
+fill_drop_unroutable_metric(Slide) ->
+ exometer_slide:map(
+ fun
+ (Value) when is_tuple(Value) andalso size(Value) =:= 4 ->
+ Value;
+ (Value) when is_tuple(Value) andalso size(Value) =:= 3 ->
+ %% Inject a 0
+ list_to_tuple(
+ tuple_to_list(Value) ++ [0]);
+ (Value) ->
+ Value
+ end, Slide).
+drop_drop_unroutable_metric(Slide) ->
+ exometer_slide:map(
+ fun
+ (Value) when is_tuple(Value) andalso size(Value) =:= 4 ->
+ %% Remove the last element.
+ list_to_tuple(
+ lists:sublist(
+ tuple_to_list(Value), size(Value) - 1));
+ (Value) when is_tuple(Value) andalso size(Value) =:= 3 ->
+ Value;
+ (Value) ->
+ Value
+ end, Slide).
+fill_consumer_active_fields(ConsumersStats) ->
+ [case proplists:get_value(active, ConsumerStats) of
+ undefined ->
+ [{active, true},
+ {activity_status, up}
+ | ConsumerStats];
+ _ ->
+ ConsumerStats
+ end
+ || ConsumerStats <- ConsumersStats].
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_db_handler.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_db_handler.erl
new file mode 100644
index 0000000000..c1e43223d7
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_db_handler.erl
@@ -0,0 +1,99 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%% Make sure our database is hooked in *before* listening on the network or
+%% recovering queues (i.e. so there can't be any events fired before it starts).
+ [{description, "management agent"},
+ {mfa, {?MODULE, add_handler, []}},
+ {cleanup, {gen_event, delete_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {requires, rabbit_event},
+ {enables, recovery}]}).
+-export([add_handler/0, gc/0, rates_mode/0]).
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+add_handler() ->
+ ok = ensure_statistics_enabled(),
+ gen_event:add_handler(rabbit_event, ?MODULE, []).
+gc() ->
+ erlang:garbage_collect(whereis(rabbit_event)).
+rates_mode() ->
+ case rabbit_mgmt_agent_config:get_env(rates_mode) of
+ undefined -> basic;
+ Mode -> Mode
+ end.
+handle_force_fine_statistics() ->
+ case rabbit_mgmt_agent_config:get_env(force_fine_statistics) of
+ undefined ->
+ ok;
+ X ->
+ rabbit_log:warning(
+ "force_fine_statistics set to ~p; ignored.~n"
+ "Replaced by {rates_mode, none} in the rabbitmq_management "
+ "application.~n", [X])
+ end.
+ensure_statistics_enabled() ->
+ ForceStats = rates_mode() =/= none,
+ handle_force_fine_statistics(),
+ {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
+ rabbit_log:info("Management plugin: using rates mode '~p'~n", [rates_mode()]),
+ case {ForceStats, StatsLevel} of
+ {true, fine} ->
+ ok;
+ {true, _} ->
+ application:set_env(rabbit, collect_statistics, fine);
+ {false, none} ->
+ application:set_env(rabbit, collect_statistics, coarse);
+ {_, _} ->
+ ok
+ end,
+ ok = rabbit:force_event_refresh(erlang:make_ref()).
+init([]) ->
+ {ok, []}.
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+handle_event(#event{type = Type} = Event, State)
+ when Type == connection_closed; Type == channel_closed; Type == queue_deleted;
+ Type == exchange_deleted; Type == vhost_deleted;
+ Type == consumer_deleted; Type == node_node_deleted;
+ Type == channel_consumer_deleted ->
+ gen_server:cast(rabbit_mgmt_metrics_gc:name(Type), {event, Event}),
+ {ok, State};
+handle_event(_, State) ->
+ {ok, State}.
+handle_info(_Info, State) ->
+ {ok, State}.
+terminate(_Arg, _State) ->
+ ok.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl
new file mode 100644
index 0000000000..5e92d8394c
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_external_stats.erl
@@ -0,0 +1,501 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%% Transitional step until we can require Erlang/OTP 21 and
+%% use the now recommended try/catch syntax for obtaining the stack trace.
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+-import(rabbit_misc, [pget/2]).
+-define(METRICS_KEYS, [fd_used, sockets_used, mem_used, disk_free, proc_used, gc_num,
+ gc_bytes_reclaimed, context_switches]).
+-define(PERSISTER_KEYS, [persister_stats]).
+-define(OTHER_KEYS, [name, partitions, os_pid, fd_total, sockets_total, mem_limit,
+ mem_alarm, disk_free_limit, disk_free_alarm, proc_total,
+ rates_mode, uptime, run_queue, processors, exchange_types,
+ auth_mechanisms, applications, contexts, log_files,
+ db_dir, config_files, net_ticktime, enabled_plugins,
+ mem_calculation_strategy, ra_open_file_metrics]).
+-define(TEN_MINUTES_AS_SECONDS, 600).
+-record(state, {
+ fd_total,
+ fhc_stats,
+ node_owners,
+ last_ts,
+ interval,
+ error_logged_time
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+get_used_fd(State0) ->
+ try
+ case get_used_fd(os:type(), State0) of
+ {State1, UsedFd} when is_number(UsedFd) ->
+ {State1, UsedFd};
+ {State1, _Other} ->
+ %% Defaults to 0 if data is not available
+ {State1, 0}
+ end
+ catch
+ _:Error ->
+ State2 = log_fd_error("Could not infer the number of file handles used: ~p~n", [Error], State0),
+ {State2, 0}
+ end.
+get_used_fd({unix, linux}, State0) ->
+ case file:list_dir("/proc/" ++ os:getpid() ++ "/fd") of
+ {ok, Files} ->
+ {State0, length(Files)};
+ {error, _} ->
+ get_used_fd({unix, generic}, State0)
+ end;
+get_used_fd({unix, BSD}, State0)
+ when BSD == openbsd; BSD == freebsd; BSD == netbsd ->
+ IsDigit = fun (D) -> lists:member(D, "0123456789*") end,
+ Output = os:cmd("fstat -p " ++ os:getpid()),
+ try
+ F = fun (Line) ->
+ lists:all(IsDigit, lists:nth(4, string:tokens(Line, " ")))
+ end,
+ UsedFd = length(lists:filter(F, string:tokens(Output, "\n"))),
+ {State0, UsedFd}
+ catch _:Error:Stacktrace ->
+ State1 = log_fd_error("Could not parse fstat output:~n~s~n~p~n",
+ [Output, {Error, Stacktrace}], State0),
+ {State1, 0}
+ end;
+get_used_fd({unix, _}, State0) ->
+ Cmd = rabbit_misc:format(
+ "lsof -d \"0-9999999\" -lna -p ~s || echo failed", [os:getpid()]),
+ Res = os:cmd(Cmd),
+ case string:right(Res, 7) of
+ "failed\n" ->
+ State1 = log_fd_error("Could not obtain lsof output~n", [], State0),
+ {State1, 0};
+ _ ->
+ UsedFd = string:words(Res, $\n) - 1,
+ {State0, UsedFd}
+ end;
+%% handle.exe can be obtained from
+%% Output looks like:
+%% Handle v3.42
+%% Copyright (C) 1997-2008 Mark Russinovich
+%% Sysinternals -
+%% Handle type summary:
+%% ALPC Port : 2
+%% Desktop : 1
+%% Directory : 1
+%% Event : 108
+%% File : 25
+%% IoCompletion : 3
+%% Key : 7
+%% KeyedEvent : 1
+%% Mutant : 1
+%% Process : 3
+%% Process : 38
+%% Thread : 41
+%% Timer : 3
+%% TpWorkerFactory : 2
+%% WindowStation : 2
+%% Total handles: 238
+%% Nthandle v4.22 - Handle viewer
+%% Copyright (C) 1997-2019 Mark Russinovich
+%% Sysinternals -
+%% Handle type summary:
+%% <Unknown type> : 1
+%% <Unknown type> : 166
+%% ALPC Port : 11
+%% Desktop : 1
+%% Directory : 2
+%% Event : 226
+%% File : 122
+%% IoCompletion : 8
+%% IRTimer : 6
+%% Key : 42
+%% Mutant : 7
+%% Process : 3
+%% Section : 2
+%% Semaphore : 43
+%% Thread : 36
+%% TpWorkerFactory : 3
+%% WaitCompletionPacket: 25
+%% WindowStation : 2
+%% Total handles: 706
+%% Note that the "File" number appears to include network sockets too; I assume
+%% that's the number we care about. Note also that if you omit "-s" you will
+%% see a list of file handles *without* network sockets. If you then add "-a"
+%% you will see a list of handles of various types, including network sockets
+%% shown as file handles to \Device\Afd.
+get_used_fd({win32, _}, State0) ->
+ Handle = rabbit_misc:os_cmd(
+ "handle.exe /accepteula -s -p " ++ os:getpid() ++ " 2> nul"),
+ case Handle of
+ [] ->
+ State1 = log_fd_error("Could not find handle.exe, please install from sysinternals~n", [], State0),
+ {State1, 0};
+ _ ->
+ case find_files_line(string:tokens(Handle, "\r\n")) of
+ unknown ->
+ State1 = log_fd_error("handle.exe output did not contain "
+ "a line beginning with ' File ', unable "
+ "to determine used file descriptor "
+ "count: ~p~n", [Handle], State0),
+ {State1, 0};
+ UsedFd ->
+ {State0, UsedFd}
+ end
+ end.
+find_files_line([]) ->
+ unknown;
+find_files_line([" File " ++ Rest | _T]) ->
+ [Files] = string:tokens(Rest, ": "),
+ list_to_integer(Files);
+find_files_line([_H | T]) ->
+ find_files_line(T).
+-define(SAFE_CALL(Fun, NoProcFailResult),
+ try
+ Fun
+ catch exit:{noproc, _} -> NoProcFailResult
+ end).
+get_disk_free_limit() -> ?SAFE_CALL(rabbit_disk_monitor:get_disk_free_limit(),
+ disk_free_monitoring_disabled).
+get_disk_free() -> ?SAFE_CALL(rabbit_disk_monitor:get_disk_free(),
+ disk_free_monitoring_disabled).
+log_fd_error(Fmt, Args, #state{error_logged_time = undefined}=State) ->
+ % rabbitmq/rabbitmq-management#90
+ % no errors have been logged, so log it and make a note of when
+ Now = erlang:monotonic_time(second),
+ ok = rabbit_log:error(Fmt, Args),
+ State#state{error_logged_time = Now};
+log_fd_error(Fmt, Args, #state{error_logged_time = Time}=State) ->
+ Now = erlang:monotonic_time(second),
+ case Now >= Time + ?TEN_MINUTES_AS_SECONDS of
+ true ->
+ % rabbitmq/rabbitmq-management#90
+ % it has been longer than 10 minutes,
+ % re-log the error
+ ok = rabbit_log:error(Fmt, Args),
+ State#state{error_logged_time = Now};
+ _ ->
+ % 10 minutes have not yet passed
+ State
+ end.
+infos([], Acc, State) ->
+ {State, lists:reverse(Acc)};
+infos([Item|T], Acc0, State0) ->
+ {State1, Infos} = i(Item, State0),
+ Acc1 = [{Item, Infos}|Acc0],
+ infos(T, Acc1, State1).
+i(name, State) ->
+ {State, node()};
+i(partitions, State) ->
+ {State, rabbit_node_monitor:partitions()};
+i(fd_used, State) ->
+ get_used_fd(State);
+i(fd_total, #state{fd_total = FdTotal}=State) ->
+ {State, FdTotal};
+i(sockets_used, State) ->
+ {State, proplists:get_value(sockets_used, file_handle_cache:info([sockets_used]))};
+i(sockets_total, State) ->
+ {State, proplists:get_value(sockets_limit, file_handle_cache:info([sockets_limit]))};
+i(os_pid, State) ->
+ {State, list_to_binary(os:getpid())};
+i(mem_used, State) ->
+ {State, vm_memory_monitor:get_process_memory()};
+i(mem_calculation_strategy, State) ->
+ {State, vm_memory_monitor:get_memory_calculation_strategy()};
+i(mem_limit, State) ->
+ {State, vm_memory_monitor:get_memory_limit()};
+i(mem_alarm, State) ->
+ {State, resource_alarm_set(memory)};
+i(proc_used, State) ->
+ {State, erlang:system_info(process_count)};
+i(proc_total, State) ->
+ {State, erlang:system_info(process_limit)};
+i(run_queue, State) ->
+ {State, erlang:statistics(run_queue)};
+i(processors, State) ->
+ {State, erlang:system_info(logical_processors)};
+i(disk_free_limit, State) ->
+ {State, get_disk_free_limit()};
+i(disk_free, State) ->
+ {State, get_disk_free()};
+i(disk_free_alarm, State) ->
+ {State, resource_alarm_set(disk)};
+i(contexts, State) ->
+ {State, rabbit_web_dispatch_contexts()};
+i(uptime, State) ->
+ {Total, _} = erlang:statistics(wall_clock),
+ {State, Total};
+i(rates_mode, State) ->
+ {State, rabbit_mgmt_db_handler:rates_mode()};
+i(exchange_types, State) ->
+ {State, list_registry_plugins(exchange)};
+i(log_files, State) ->
+ {State, [list_to_binary(F) || F <- rabbit:log_locations()]};
+i(db_dir, State) ->
+ {State, list_to_binary(rabbit_mnesia:dir())};
+i(config_files, State) ->
+ {State, [list_to_binary(F) || F <- rabbit:config_files()]};
+i(net_ticktime, State) ->
+ {State, net_kernel:get_net_ticktime()};
+i(persister_stats, State) ->
+ {State, persister_stats(State)};
+i(enabled_plugins, State) ->
+ {ok, Dir} = application:get_env(rabbit, enabled_plugins_file),
+ {State, rabbit_plugins:read_enabled(Dir)};
+i(auth_mechanisms, State) ->
+ {ok, Mechanisms} = application:get_env(rabbit, auth_mechanisms),
+ F = fun (N) ->
+ lists:member(list_to_atom(binary_to_list(N)), Mechanisms)
+ end,
+ {State, list_registry_plugins(auth_mechanism, F)};
+i(applications, State) ->
+ {State, [format_application(A) || A <- lists:keysort(1, rabbit_misc:which_applications())]};
+i(gc_num, State) ->
+ {GCs, _, _} = erlang:statistics(garbage_collection),
+ {State, GCs};
+i(gc_bytes_reclaimed, State) ->
+ {_, Words, _} = erlang:statistics(garbage_collection),
+ {State, Words * erlang:system_info(wordsize)};
+i(context_switches, State) ->
+ {Sw, 0} = erlang:statistics(context_switches),
+ {State, Sw};
+i(ra_open_file_metrics, State) ->
+ {State, [{ra_log_wal, ra_metrics(ra_log_wal)},
+ {ra_log_segment_writer, ra_metrics(ra_log_segment_writer)}]}.
+ra_metrics(K) ->
+ try
+ case ets:lookup(ra_open_file_metrics, whereis(K)) of
+ [] -> 0;
+ [{_, C}] -> C
+ end
+ catch
+ error:badarg ->
+ %% On startup the mgmt might start before ra does
+ 0
+ end.
+resource_alarm_set(Source) ->
+ lists:member({{resource_limit, Source, node()},[]},
+ rabbit_alarm:get_alarms()).
+list_registry_plugins(Type) ->
+ list_registry_plugins(Type, fun(_) -> true end).
+list_registry_plugins(Type, Fun) ->
+ [registry_plugin_enabled(set_plugin_name(Name, Module), Fun) ||
+ {Name, Module} <- rabbit_registry:lookup_all(Type)].
+registry_plugin_enabled(Desc, Fun) ->
+ Desc ++ [{enabled, Fun(proplists:get_value(name, Desc))}].
+format_application({Application, Description, Version}) ->
+ [{name, Application},
+ {description, list_to_binary(Description)},
+ {version, list_to_binary(Version)}].
+set_plugin_name(Name, Module) ->
+ [{name, list_to_binary(atom_to_list(Name))} |
+ proplists:delete(name, Module:description())].
+persister_stats(#state{fhc_stats = FHC}) ->
+ [{flatten_key(K), V} || {{_Op, _Type} = K, V} <- FHC].
+flatten_key({A, B}) ->
+ list_to_atom(atom_to_list(A) ++ "_" ++ atom_to_list(B)).
+cluster_links() ->
+ {ok, Items} = net_kernel:nodes_info(),
+ [Link || Item <- Items,
+ Link <- [format_nodes_info(Item)], Link =/= undefined].
+format_nodes_info({Node, Info}) ->
+ Owner = proplists:get_value(owner, Info),
+ case catch process_info(Owner, links) of
+ {links, Links} ->
+ case [Link || Link <- Links, is_port(Link)] of
+ [Port] ->
+ {Node, Owner, format_nodes_info1(Port)};
+ _ ->
+ undefined
+ end;
+ _ ->
+ undefined
+ end.
+format_nodes_info1(Port) ->
+ case {rabbit_net:socket_ends(Port, inbound),
+ rabbit_net:getstat(Port, [recv_oct, send_oct])} of
+ {{ok, {PeerAddr, PeerPort, SockAddr, SockPort}}, {ok, Stats}} ->
+ [{peer_addr, maybe_ntoab(PeerAddr)},
+ {peer_port, PeerPort},
+ {sock_addr, maybe_ntoab(SockAddr)},
+ {sock_port, SockPort},
+ {recv_bytes, pget(recv_oct, Stats)},
+ {send_bytes, pget(send_oct, Stats)}];
+ _ ->
+ []
+ end.
+maybe_ntoab(A) when is_tuple(A) -> list_to_binary(rabbit_misc:ntoab(A));
+maybe_ntoab(H) -> H.
+%% This is slightly icky in that we introduce knowledge of
+%% rabbit_web_dispatch, which is not a dependency. But the last thing I
+%% want to do is create a rabbitmq_mochiweb_management_agent plugin.
+rabbit_web_dispatch_contexts() ->
+ [format_context(C) || C <- rabbit_web_dispatch_registry_list_all()].
+%% For similar reasons we don't declare a dependency on
+%% rabbitmq_mochiweb - so at startup there's no guarantee it will be
+%% running. So we have to catch this noproc.
+rabbit_web_dispatch_registry_list_all() ->
+ case code:is_loaded(rabbit_web_dispatch_registry) of
+ false -> [];
+ _ -> try
+ M = rabbit_web_dispatch_registry, %% Fool xref
+ M:list_all()
+ catch exit:{noproc, _} ->
+ []
+ end
+ end.
+format_context({Path, Description, Rest}) ->
+ [{description, list_to_binary(Description)},
+ {path, list_to_binary("/" ++ Path)} |
+ format_mochiweb_option_list(Rest)].
+format_mochiweb_option_list(C) ->
+ [{K, format_mochiweb_option(K, V)} || {K, V} <- C].
+format_mochiweb_option(ssl_opts, V) ->
+ format_mochiweb_option_list(V);
+format_mochiweb_option(_K, V) ->
+ case io_lib:printable_list(V) of
+ true -> list_to_binary(V);
+ false -> list_to_binary(rabbit_misc:format("~w", [V]))
+ end.
+init([]) ->
+ {ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
+ State = #state{fd_total = file_handle_cache:ulimit(),
+ fhc_stats = get_fhc_stats(),
+ node_owners = sets:new(),
+ interval = Interval},
+ %% We can update stats straight away as they need to be available
+ %% when the mgmt plugin starts a collector
+ {ok, emit_update(State)}.
+handle_call(_Req, _From, State) ->
+ {reply, unknown_request, State}.
+handle_cast(_C, State) ->
+ {noreply, State}.
+handle_info(emit_update, State) ->
+ {noreply, emit_update(State)};
+handle_info(_I, State) ->
+ {noreply, State}.
+terminate(_, _) -> ok.
+code_change(_, State, _) -> {ok, State}.
+emit_update(State0) ->
+ State1 = update_state(State0),
+ {State2, MStats} = infos(?METRICS_KEYS, [], State1),
+ {State3, PStats} = infos(?PERSISTER_KEYS, [], State2),
+ {State4, OStats} = infos(?OTHER_KEYS, [], State3),
+ [{persister_stats, PStats0}] = PStats,
+ [{name, _Name} | OStats0] = OStats,
+ rabbit_core_metrics:node_stats(persister_metrics, PStats0),
+ rabbit_core_metrics:node_stats(coarse_metrics, MStats),
+ rabbit_core_metrics:node_stats(node_metrics, OStats0),
+ rabbit_event:notify(node_stats, PStats ++ MStats ++ OStats),
+ erlang:send_after(State4#state.interval, self(), emit_update),
+ emit_node_node_stats(State4).
+emit_node_node_stats(State = #state{node_owners = Owners}) ->
+ Links = cluster_links(),
+ NewOwners = sets:from_list([{Node, Owner} || {Node, Owner, _} <- Links]),
+ Dead = sets:to_list(sets:subtract(Owners, NewOwners)),
+ [rabbit_event:notify(
+ node_node_deleted, [{route, Route}]) || {Node, _Owner} <- Dead,
+ Route <- [{node(), Node},
+ {Node, node()}]],
+ [begin
+ rabbit_core_metrics:node_node_stats({node(), Node}, Stats),
+ rabbit_event:notify(
+ node_node_stats, [{route, {node(), Node}} | Stats])
+ end || {Node, _Owner, Stats} <- Links],
+ State#state{node_owners = NewOwners}.
+update_state(State0) ->
+ %% Store raw data, the average operation time is calculated during querying
+ %% from the accumulated total
+ FHC = get_fhc_stats(),
+ State0#state{fhc_stats = FHC}.
+get_fhc_stats() ->
+ dict:to_list(dict:merge(fun(_, V1, V2) -> V1 + V2 end,
+ dict:from_list(file_handle_cache_stats:get()),
+ dict:from_list(get_ra_io_metrics()))).
+get_ra_io_metrics() ->
+ lists:sort(ets:tab2list(ra_io_metrics)).
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_ff.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_ff.erl
new file mode 100644
index 0000000000..c8173c1244
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_ff.erl
@@ -0,0 +1,20 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved.
+ {empty_basic_get_metric,
+ #{desc => "Count AMQP `basic.get` on empty queues in stats",
+ stability => stable
+ }}).
+ {drop_unroutable_metric,
+ #{desc => "Count unroutable publishes to be dropped in stats",
+ stability => stable
+ }}).
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl
new file mode 100644
index 0000000000..4c9e8c189f
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl
@@ -0,0 +1,559 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+-export([format/2, ip/1, ipb/1, amqp_table/1, tuple/1]).
+-export([parameter/1, now_to_str/1, now_to_str_ms/1, strip_pids/1]).
+-export([protocol/1, resource/1, queue/1, queue_state/1, queue_info/1]).
+-export([exchange/1, user/1, internal_user/1, binding/1, url/2]).
+-export([pack_binding_props/2, tokenise/1]).
+-export([to_amqp_table/1, listener/1, web_context/1, properties/1, basic_properties/1]).
+-export([record/2, to_basic_properties/1]).
+-export([addr/1, port/1]).
+-export([format_nulls/1, escape_html_tags/1]).
+-export([print/2, print/1]).
+-export([format_queue_stats/1, format_channel_stats/1,
+ format_consumer_arguments/1,
+ format_connection_created/1,
+ format_accept_content/1, format_args/1]).
+-export([clean_consumer_details/1, clean_channel_details/1]).
+-import(rabbit_misc, [pget/2, pget/3, pset/3]).
+format(Stats, {[], _}) ->
+ [Stat || {_Name, Value} = Stat <- Stats, Value =/= unknown];
+format(Stats, {Fs, true}) ->
+ [Fs(Stat) || {_Name, Value} = Stat <- Stats, Value =/= unknown];
+format(Stats, {Fs, false}) ->
+ lists:concat([Fs(Stat) || {_Name, Value} = Stat <- Stats,
+ Value =/= unknown]).
+format_queue_stats({reductions, _}) ->
+ [];
+format_queue_stats({exclusive_consumer_pid, _}) ->
+ [];
+format_queue_stats({single_active_consumer_pid, _}) ->
+ [];
+format_queue_stats({slave_pids, ''}) ->
+ [];
+format_queue_stats({slave_pids, Pids}) ->
+ [{slave_nodes, [node(Pid) || Pid <- Pids]}];
+format_queue_stats({leader, Leader}) ->
+ [{node, Leader}];
+format_queue_stats({synchronised_slave_pids, ''}) ->
+ [];
+format_queue_stats({effective_policy_definition, []}) ->
+ [{effective_policy_definition, #{}}];
+format_queue_stats({synchronised_slave_pids, Pids}) ->
+ [{synchronised_slave_nodes, [node(Pid) || Pid <- Pids]}];
+format_queue_stats({backing_queue_status, Value}) ->
+ [{backing_queue_status, properties(Value)}];
+format_queue_stats({idle_since, Value}) ->
+ [{idle_since, now_to_str(Value)}];
+format_queue_stats({state, Value}) ->
+ queue_state(Value);
+format_queue_stats({disk_reads, _}) ->
+ [];
+format_queue_stats({disk_writes, _}) ->
+ [];
+format_queue_stats(Stat) ->
+ [Stat].
+format_channel_stats([{idle_since, Value} | Rest]) ->
+ [{idle_since, now_to_str(Value)} | Rest];
+format_channel_stats(Stats) ->
+ Stats.
+%% Conerts an HTTP API request payload value
+%% to AMQP 0-9-1 arguments table
+format_args({arguments, []}) ->
+ {arguments, []};
+format_args({arguments, Value}) ->
+ {arguments, to_amqp_table(Value)};
+format_args(Stat) ->
+ Stat.
+format_connection_created({host, Value}) ->
+ {host, addr(Value)};
+format_connection_created({peer_host, Value}) ->
+ {peer_host, addr(Value)};
+format_connection_created({port, Value}) ->
+ {port, port(Value)};
+format_connection_created({peer_port, Value}) ->
+ {peer_port, port(Value)};
+format_connection_created({protocol, Value}) ->
+ {protocol, protocol(Value)};
+format_connection_created({client_properties, Value}) ->
+ {client_properties, amqp_table(Value)};
+format_connection_created(Stat) ->
+ Stat.
+format_exchange_and_queue({policy, Value}) ->
+ policy(Value);
+format_exchange_and_queue({arguments, Value}) ->
+ [{arguments, amqp_table(Value)}];
+format_exchange_and_queue({name, Value}) ->
+ resource(Value);
+format_exchange_and_queue(Stat) ->
+ [Stat].
+format_binding({source, Value}) ->
+ resource(source, Value);
+format_binding({arguments, Value}) ->
+ [{arguments, amqp_table(Value)}];
+format_binding(Stat) ->
+ [Stat].
+format_basic_properties({headers, Value}) ->
+ {headers, amqp_table(Value)};
+format_basic_properties(Stat) ->
+ Stat.
+format_accept_content({durable, Value}) ->
+ {durable, parse_bool(Value)};
+format_accept_content({auto_delete, Value}) ->
+ {auto_delete, parse_bool(Value)};
+format_accept_content({internal, Value}) ->
+ {internal, parse_bool(Value)};
+format_accept_content(Stat) ->
+ Stat.
+print(Fmt, Val) when is_list(Val) ->
+ list_to_binary(lists:flatten(io_lib:format(Fmt, Val)));
+print(Fmt, Val) ->
+ print(Fmt, [Val]).
+print(Val) when is_list(Val) ->
+ list_to_binary(lists:flatten(Val));
+print(Val) ->
+ Val.
+%% TODO - can we remove all these "unknown" cases? Coverage never hits them.
+ip(unknown) -> unknown;
+ip(IP) -> list_to_binary(rabbit_misc:ntoa(IP)).
+ipb(unknown) -> unknown;
+ipb(IP) -> list_to_binary(rabbit_misc:ntoab(IP)).
+addr(S) when is_list(S); is_atom(S); is_binary(S) -> print("~s", S);
+addr(Addr) when is_tuple(Addr) -> ip(Addr).
+port(Port) when is_number(Port) -> Port;
+port(Port) -> print("~w", Port).
+properties(unknown) -> unknown;
+properties(Table) -> maps:from_list([{Name, tuple(Value)} ||
+ {Name, Value} <- Table]).
+amqp_table(Value) -> rabbit_misc:amqp_table(Value).
+parameter(P) -> pset(value, pget(value, P), P).
+tuple(unknown) -> unknown;
+tuple(Tuple) when is_tuple(Tuple) -> [tuple(E) || E <- tuple_to_list(Tuple)];
+tuple(Term) -> Term.
+protocol(unknown) ->
+ unknown;
+protocol(Version = {_Major, _Minor, _Revision}) ->
+ protocol({'AMQP', Version});
+protocol({Family, Version}) ->
+ print("~s ~s", [Family, protocol_version(Version)]).
+ when is_list(Arbitrary) -> Arbitrary;
+protocol_version({Major, Minor}) -> io_lib:format("~B-~B", [Major, Minor]);
+protocol_version({Major, Minor, 0}) -> protocol_version({Major, Minor});
+protocol_version({Major, Minor, Revision}) -> io_lib:format("~B-~B-~B",
+ [Major, Minor, Revision]).
+now_to_str(unknown) ->
+ unknown;
+now_to_str(MilliSeconds) ->
+ BaseDate = calendar:datetime_to_gregorian_seconds({{1970, 1, 1},
+ {0, 0, 0}}),
+ Seconds = BaseDate + (MilliSeconds div 1000),
+ {{Y, M, D}, {H, Min, S}} = calendar:gregorian_seconds_to_datetime(Seconds),
+ print("~w-~2.2.0w-~2.2.0w ~w:~2.2.0w:~2.2.0w", [Y, M, D, H, Min, S]).
+now_to_str_ms(unknown) ->
+ unknown;
+now_to_str_ms(MilliSeconds) ->
+ print("~s:~3.3.0w", [now_to_str(MilliSeconds), MilliSeconds rem 1000]).
+resource(unknown) -> unknown;
+resource(Res) -> resource(name, Res).
+resource(_, unknown) ->
+ unknown;
+resource(NameAs, #resource{name = Name, virtual_host = VHost}) ->
+ [{NameAs, Name}, {vhost, VHost}].
+policy('') -> [];
+policy(Policy) -> [{policy, Policy}].
+internal_user(User) ->
+ [{name, internal_user:get_username(User)},
+ {password_hash, base64:encode(internal_user:get_password_hash(User))},
+ {hashing_algorithm, rabbit_auth_backend_internal:hashing_module_for_user(
+ User)},
+ {tags, tags(internal_user:get_tags(User))},
+ {limits, internal_user:get_limits(User)}].
+user(User) ->
+ [{name, User#user.username},
+ {tags, tags(User#user.tags)}].
+tags(Tags) ->
+ list_to_binary(string:join([atom_to_list(T) || T <- Tags], ",")).
+listener(#listener{node = Node, protocol = Protocol,
+ ip_address = IPAddress, port = Port, opts=Opts}) ->
+ [{node, Node},
+ {protocol, Protocol},
+ {ip_address, ip(IPAddress)},
+ {port, Port},
+ {socket_opts, format_socket_opts(Opts)}].
+web_context(Props0) ->
+ SslOpts = pget(ssl_opts, Props0, []),
+ Props = proplists:delete(ssl_opts, Props0),
+ [{ssl_opts, format_socket_opts(SslOpts)} | Props].
+format_socket_opts(Opts) ->
+ format_socket_opts(Opts, []).
+format_socket_opts([], Acc) ->
+ lists:reverse(Acc);
+%% for HTTP API listeners this will be included into
+%% socket_opts
+format_socket_opts([{ssl_opts, Value} | Tail], Acc) ->
+ format_socket_opts(Tail, [{ssl_opts, format_socket_opts(Value)} | Acc]);
+%% exclude options that have values that are nested
+%% data structures or may include functions. They are fairly
+%% obscure and not worth reporting via HTTP API.
+format_socket_opts([{verify_fun, _Value} | Tail], Acc) ->
+ format_socket_opts(Tail, Acc);
+format_socket_opts([{crl_cache, _Value} | Tail], Acc) ->
+ format_socket_opts(Tail, Acc);
+format_socket_opts([{partial_chain, _Value} | Tail], Acc) ->
+ format_socket_opts(Tail, Acc);
+format_socket_opts([{user_lookup_fun, _Value} | Tail], Acc) ->
+ format_socket_opts(Tail, Acc);
+format_socket_opts([{sni_fun, _Value} | Tail], Acc) ->
+ format_socket_opts(Tail, Acc);
+%% we do not report SNI host details in the UI,
+%% so skip this option and avoid some recursive formatting
+%% complexity
+format_socket_opts([{sni_hosts, _Value} | Tail], Acc) ->
+ format_socket_opts(Tail, Acc);
+format_socket_opts([{reuse_session, _Value} | Tail], Acc) ->
+ format_socket_opts(Tail, Acc);
+%% we do not want to report configured cipher suites, even
+%% though formatting them is straightforward
+format_socket_opts([{ciphers, _Value} | Tail], Acc) ->
+ format_socket_opts(Tail, Acc);
+%% single atom options, e.g. `binary`
+format_socket_opts([Head | Tail], Acc) when is_atom(Head) ->
+ format_socket_opts(Tail, [{Head, true} | Acc]);
+%% verify_fun value is a tuple that includes a function
+format_socket_opts([_Head = {verify_fun, _Value} | Tail], Acc) ->
+ format_socket_opts(Tail, Acc);
+format_socket_opts([Head = {Name, Value} | Tail], Acc) when is_list(Value) ->
+ case io_lib:printable_unicode_list(Value) of
+ true -> format_socket_opts(Tail, [{Name, unicode:characters_to_binary(Value)} | Acc]);
+ false -> format_socket_opts(Tail, [Head | Acc])
+ end;
+format_socket_opts([{Name, Value} | Tail], Acc) when is_tuple(Value) ->
+ format_socket_opts(Tail, [{Name, tuple_to_list(Value)} | Acc]);
+%% exclude functions from JSON encoding
+format_socket_opts([_Head = {_Name, Value} | Tail], Acc) when is_function(Value) ->
+ format_socket_opts(Tail, Acc);
+format_socket_opts([Head | Tail], Acc) ->
+ format_socket_opts(Tail, [Head | Acc]).
+pack_binding_props(<<"">>, []) ->
+ <<"~">>;
+pack_binding_props(Key, []) ->
+ list_to_binary(quote_binding(Key));
+pack_binding_props(Key, Args) ->
+ ArgsEnc = args_hash(Args),
+ list_to_binary(quote_binding(Key) ++ "~" ++ quote_binding(ArgsEnc)).
+quote_binding(Name) ->
+ re:replace(rabbit_http_util:quote_plus(Name), "~", "%7E", [global]).
+%% Unfortunately string:tokens("foo~~bar", "~"). -> ["foo","bar"], we lose
+%% the fact that there's a double ~.
+tokenise("") ->
+ [];
+tokenise(Str) ->
+ Count = string:cspan(Str, "~"),
+ case length(Str) of
+ Count -> [Str];
+ _ -> [string:sub_string(Str, 1, Count) |
+ tokenise(string:sub_string(Str, Count + 2))]
+ end.
+to_amqp_table(V) -> rabbit_misc:to_amqp_table(V).
+url(Fmt, Vals) ->
+ print(Fmt, [rabbit_http_util:quote_plus(V) || V <- Vals]).
+exchange(X) ->
+ format(X, {fun format_exchange_and_queue/1, false}).
+%% We get queues using rabbit_amqqueue:list/1 rather than :info_all/1 since
+%% the latter wakes up each queue. Therefore we have a record rather than a
+%% proplist to deal with.
+queue(Q) when ?is_amqqueue(Q) ->
+ Name = amqqueue:get_name(Q),
+ Durable = amqqueue:is_durable(Q),
+ AutoDelete = amqqueue:is_auto_delete(Q),
+ ExclusiveOwner = amqqueue:get_exclusive_owner(Q),
+ Arguments = amqqueue:get_arguments(Q),
+ Pid = amqqueue:get_pid(Q),
+ State = amqqueue:get_state(Q),
+ %% TODO: in the future queue types should be registered with their
+ %% full and short names and this hard-coded translation should not be
+ %% necessary
+ Type = case amqqueue:get_type(Q) of
+ rabbit_classic_queue -> classic;
+ rabbit_quorum_queue -> quorum;
+ T -> T
+ end,
+ format(
+ [{name, Name},
+ {durable, Durable},
+ {auto_delete, AutoDelete},
+ {exclusive, is_pid(ExclusiveOwner)},
+ {owner_pid, ExclusiveOwner},
+ {arguments, Arguments},
+ {pid, Pid},
+ {type, Type},
+ {state, State}] ++ rabbit_amqqueue:format(Q),
+ {fun format_exchange_and_queue/1, false}).
+queue_info(List) ->
+ format(List, {fun format_exchange_and_queue/1, false}).
+queue_state({syncing, Msgs}) -> [{state, syncing},
+ {sync_messages, Msgs}];
+queue_state({terminated_by, Name}) ->
+ [{state, terminated},
+ {terminated_by, Name}];
+queue_state(Status) -> [{state, Status}].
+%% We get bindings using rabbit_binding:list_*/1 rather than :info_all/1 since
+%% there are no per-exchange / queue / etc variants for the latter. Therefore
+%% we have a record rather than a proplist to deal with.
+binding(#binding{source = S,
+ key = Key,
+ destination = D,
+ args = Args}) ->
+ format(
+ [{source, S},
+ {destination,},
+ {destination_type, D#resource.kind},
+ {routing_key, Key},
+ {arguments, Args},
+ {properties_key, pack_binding_props(Key, Args)}],
+ {fun format_binding/1, false}).
+basic_properties(Props = #'P_basic'{}) ->
+ Res = record(Props, record_info(fields, 'P_basic')),
+ format(Res, {fun format_basic_properties/1, true}).
+record(Record, Fields) ->
+ {Res, _Ix} = lists:foldl(fun (K, {L, Ix}) ->
+ {case element(Ix, Record) of
+ undefined -> L;
+ V -> [{K, V}|L]
+ end, Ix + 1}
+ end, {[], 2}, Fields),
+ Res.
+to_basic_properties(Props) when is_map(Props) ->
+ E = fun err/2,
+ Fmt = fun (headers, H) -> to_amqp_table(H);
+ (delivery_mode, V) when is_integer(V) -> V;
+ (delivery_mode, _V) -> E(not_int,delivery_mode);
+ (priority, V) when is_integer(V) -> V;
+ (priority, _V) -> E(not_int, priority);
+ (timestamp, V) when is_integer(V) -> V;
+ (timestamp, _V) -> E(not_int, timestamp);
+ (_, V) when is_binary(V) -> V;
+ (K, _V) -> E(not_string, K)
+ end,
+ {Res, _Ix} = lists:foldl(
+ fun (K, {P, Ix}) ->
+ {case maps:get(a2b(K), Props, undefined) of
+ undefined -> P;
+ V -> setelement(Ix, P, Fmt(K, V))
+ end, Ix + 1}
+ end, {#'P_basic'{}, 2},
+ record_info(fields, 'P_basic')),
+ Res.
+-spec err(term(), term()) -> no_return().
+err(A, B) ->
+ throw({error, {A, B}}).
+a2b(A) ->
+ list_to_binary(atom_to_list(A)).
+strip_queue_pids(Item) ->
+ strip_queue_pids(Item, []).
+strip_queue_pids([{_, unknown} | T], Acc) ->
+ strip_queue_pids(T, Acc);
+strip_queue_pids([{pid, Pid} | T], Acc0) when is_pid(Pid) ->
+ Acc = case proplists:is_defined(node, Acc0) of
+ false -> [{node, node(Pid)} | Acc0];
+ true -> Acc0
+ end,
+ strip_queue_pids(T, Acc);
+strip_queue_pids([{pid, _} | T], Acc) ->
+ strip_queue_pids(T, Acc);
+strip_queue_pids([{owner_pid, _} | T], Acc) ->
+ strip_queue_pids(T, Acc);
+strip_queue_pids([Any | T], Acc) ->
+ strip_queue_pids(T, [Any | Acc]);
+strip_queue_pids([], Acc) ->
+ Acc.
+%% Items can be connections, channels, consumers or queues, hence remove takes
+%% various items.
+strip_pids(Item = [T | _]) when is_tuple(T) ->
+ lists:usort(strip_pids(Item, []));
+strip_pids(Items) -> [lists:usort(strip_pids(I)) || I <- Items].
+strip_pids([{_, unknown} | T], Acc) ->
+ strip_pids(T, Acc);
+strip_pids([{pid, Pid} | T], Acc) when is_pid(Pid) ->
+ strip_pids(T, [{node, node(Pid)} | Acc]);
+strip_pids([{pid, _} | T], Acc) ->
+ strip_pids(T, Acc);
+strip_pids([{connection, _} | T], Acc) ->
+ strip_pids(T, Acc);
+strip_pids([{owner_pid, _} | T], Acc) ->
+ strip_pids(T, Acc);
+strip_pids([{channel, _} | T], Acc) ->
+ strip_pids(T, Acc);
+strip_pids([{channel_pid, _} | T], Acc) ->
+ strip_pids(T, Acc);
+strip_pids([{exclusive_consumer_pid, _} | T], Acc) ->
+ strip_pids(T, Acc);
+strip_pids([{slave_pids, ''} | T], Acc) ->
+ strip_pids(T, Acc);
+strip_pids([{slave_pids, Pids} | T], Acc) ->
+ strip_pids(T, [{slave_nodes, [node(Pid) || Pid <- Pids]} | Acc]);
+strip_pids([{synchronised_slave_pids, ''} | T], Acc) ->
+ strip_pids(T, Acc);
+strip_pids([{synchronised_slave_pids, Pids} | T], Acc) ->
+ strip_pids(T, [{synchronised_slave_nodes, [node(Pid) || Pid <- Pids]} | Acc]);
+strip_pids([{K, [P|_] = Nested} | T], Acc) when is_tuple(P) -> % recurse
+ strip_pids(T, [{K, strip_pids(Nested)} | Acc]);
+strip_pids([{K, [L|_] = Nested} | T], Acc) when is_list(L) -> % recurse
+ strip_pids(T, [{K, strip_pids(Nested)} | Acc]);
+strip_pids([Any | T], Acc) ->
+ strip_pids(T, [Any | Acc]);
+strip_pids([], Acc) ->
+ Acc.
+%% Format for JSON replies. Transforms '' into null
+format_nulls(Items) when is_list(Items) ->
+ [format_null_item(Pair) || Pair <- Items];
+format_nulls(Item) ->
+ format_null_item(Item).
+format_null_item({Key, ''}) ->
+ {Key, null};
+format_null_item({Key, Value}) when is_list(Value) ->
+ {Key, format_nulls(Value)};
+format_null_item({Key, Value}) ->
+ {Key, Value};
+format_null_item([{_K, _V} | _T] = L) ->
+ format_nulls(L);
+format_null_item(Value) ->
+ Value.
+-spec escape_html_tags(string()) -> binary().
+escape_html_tags(S) ->
+ escape_html_tags(rabbit_data_coercion:to_list(S), []).
+-spec escape_html_tags(string(), string()) -> binary().
+escape_html_tags([], Acc) ->
+ rabbit_data_coercion:to_binary(lists:reverse(Acc));
+escape_html_tags("<" ++ Rest, Acc) ->
+ escape_html_tags(Rest, lists:reverse("&lt;", Acc));
+escape_html_tags(">" ++ Rest, Acc) ->
+ escape_html_tags(Rest, lists:reverse("&gt;", Acc));
+escape_html_tags("&" ++ Rest, Acc) ->
+ escape_html_tags(Rest, lists:reverse("&amp;", Acc));
+escape_html_tags([C | Rest], Acc) ->
+ escape_html_tags(Rest, [C | Acc]).
+-spec clean_consumer_details(proplists:proplist()) -> proplists:proplist().
+clean_consumer_details(Obj) ->
+ case pget(consumer_details, Obj) of
+ undefined -> Obj;
+ Cds ->
+ Cons = [format_consumer_arguments(clean_channel_details(Con)) || Con <- Cds],
+ pset(consumer_details, Cons, Obj)
+ end.
+-spec clean_channel_details(proplists:proplist()) -> proplists:proplist().
+clean_channel_details(Obj) ->
+ Obj0 = lists:keydelete(channel_pid, 1, Obj),
+ case pget(channel_details, Obj0) of
+ undefined -> Obj0;
+ Chd ->
+ pset(channel_details,
+ lists:keydelete(pid, 1, Chd),
+ Obj0)
+ end.
+-spec format_consumer_arguments(proplists:proplist()) -> proplists:proplist().
+format_consumer_arguments(Obj) ->
+ case pget(arguments, Obj) of
+ undefined -> Obj;
+ #{} -> Obj;
+ [] -> pset(arguments, #{}, Obj);
+ Args -> pset(arguments, amqp_table(Args), Obj)
+ end.
+parse_bool(<<"true">>) -> true;
+parse_bool(<<"false">>) -> false;
+parse_bool(true) -> true;
+parse_bool(false) -> false;
+parse_bool(undefined) -> undefined;
+parse_bool(V) -> throw({error, {not_boolean, V}}).
+args_hash(Args) ->
+ list_to_binary(rabbit_misc:base64url(<<(erlang:phash2(Args, 1 bsl 32)):32>>)).
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl
new file mode 100644
index 0000000000..99ddc89a8e
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl
@@ -0,0 +1,230 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+-record(state, {timer,
+ interval
+ }).
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+init(_) ->
+ Interval = rabbit_misc:get_env(rabbitmq_management_agent, metrics_gc_interval, 120000),
+ {ok, start_timer(#state{interval = Interval})}.
+handle_call(test, _From, State) ->
+ {reply, ok, State}.
+handle_cast(_Request, State) ->
+ {noreply, State}.
+handle_info(start_gc, State) ->
+ gc_connections(),
+ gc_vhosts(),
+ gc_channels(),
+ gc_queues(),
+ gc_exchanges(),
+ gc_nodes(),
+ {noreply, start_timer(State)}.
+terminate(_Reason, #state{timer = TRef}) ->
+ _ = erlang:cancel_timer(TRef),
+ ok.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+start_timer(#state{interval = Interval} = St) ->
+ TRef = erlang:send_after(Interval, self(), start_gc),
+ St#state{timer = TRef}.
+gc_connections() ->
+ gc_process(connection_stats_coarse_conn_stats),
+ gc_process(connection_created_stats),
+ gc_process(connection_stats).
+gc_vhosts() ->
+ VHosts = rabbit_vhost:list(),
+ GbSet = gb_sets:from_list(VHosts),
+ gc_entity(vhost_stats_coarse_conn_stats, GbSet),
+ gc_entity(vhost_stats_fine_stats, GbSet),
+ gc_entity(vhost_msg_stats, GbSet),
+ gc_entity(vhost_msg_rates, GbSet),
+ gc_entity(vhost_stats_deliver_stats, GbSet).
+gc_channels() ->
+ gc_process(channel_created_stats),
+ gc_process(channel_stats),
+ gc_process(channel_stats_fine_stats),
+ gc_process(channel_process_stats),
+ gc_process(channel_stats_deliver_stats),
+ ok.
+gc_queues() ->
+ Queues = rabbit_amqqueue:list_names(),
+ GbSet = gb_sets:from_list(Queues),
+ LocalQueues = rabbit_amqqueue:list_local_names(),
+ LocalGbSet = gb_sets:from_list(LocalQueues),
+ gc_entity(queue_stats_publish, GbSet),
+ gc_entity(queue_stats, LocalGbSet),
+ gc_entity(queue_msg_stats, LocalGbSet),
+ gc_entity(queue_process_stats, LocalGbSet),
+ gc_entity(queue_msg_rates, LocalGbSet),
+ gc_entity(queue_stats_deliver_stats, GbSet),
+ gc_process_and_entity(channel_queue_stats_deliver_stats_queue_index, GbSet),
+ gc_process_and_entity(consumer_stats_queue_index, GbSet),
+ gc_process_and_entity(consumer_stats_channel_index, GbSet),
+ gc_process_and_entity(consumer_stats, GbSet),
+ gc_process_and_entity(channel_exchange_stats_fine_stats_channel_index, GbSet),
+ gc_process_and_entity(channel_queue_stats_deliver_stats, GbSet),
+ gc_process_and_entity(channel_queue_stats_deliver_stats_channel_index, GbSet),
+ ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()),
+ gc_entities(queue_exchange_stats_publish, GbSet, ExchangeGbSet),
+ gc_entities(queue_exchange_stats_publish_queue_index, GbSet, ExchangeGbSet),
+ gc_entities(queue_exchange_stats_publish_exchange_index, GbSet, ExchangeGbSet).
+gc_exchanges() ->
+ Exchanges = rabbit_exchange:list_names(),
+ GbSet = gb_sets:from_list(Exchanges),
+ gc_entity(exchange_stats_publish_in, GbSet),
+ gc_entity(exchange_stats_publish_out, GbSet),
+ gc_entity(channel_exchange_stats_fine_stats_exchange_index, GbSet),
+ gc_process_and_entity(channel_exchange_stats_fine_stats, GbSet).
+gc_nodes() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ GbSet = gb_sets:from_list(Nodes),
+ gc_entity(node_stats, GbSet),
+ gc_entity(node_coarse_stats, GbSet),
+ gc_entity(node_persister_stats, GbSet),
+ gc_entity(node_node_coarse_stats_node_index, GbSet),
+ gc_entity(node_node_stats, GbSet),
+ gc_entity(node_node_coarse_stats, GbSet).
+gc_process(Table) ->
+ ets:foldl(fun({{Pid, _} = Key, _}, none) ->
+ gc_process(Pid, Table, Key);
+ ({Pid = Key, _}, none) ->
+ gc_process(Pid, Table, Key);
+ ({Pid = Key, _, _}, none) ->
+ gc_process(Pid, Table, Key);
+ ({{Pid, _} = Key, _, _, _, _}, none) ->
+ gc_process(Pid, Table, Key)
+ end, none, Table).
+gc_process(Pid, Table, Key) ->
+ case rabbit_misc:is_process_alive(Pid) of
+ true ->
+ none;
+ false ->
+ ets:delete(Table, Key),
+ none
+ end.
+gc_entity(Table, GbSet) ->
+ ets:foldl(fun({{_, Id} = Key, _}, none) when Table == node_node_stats ->
+ gc_entity(Id, Table, Key, GbSet);
+ ({{{_, Id}, _} = Key, _}, none) when Table == node_node_coarse_stats ->
+ gc_entity(Id, Table, Key, GbSet);
+ ({{Id, _} = Key, _}, none) ->
+ gc_entity(Id, Table, Key, GbSet);
+ ({Id = Key, _}, none) ->
+ gc_entity(Id, Table, Key, GbSet);
+ ({{Id, _} = Key, _}, none) ->
+ gc_entity(Id, Table, Key, GbSet)
+ end, none, Table).
+gc_entity(Id, Table, Key, GbSet) ->
+ case gb_sets:is_member(Id, GbSet) of
+ true ->
+ none;
+ false ->
+ ets:delete(Table, Key),
+ none
+ end.
+gc_process_and_entity(Table, GbSet) ->
+ ets:foldl(fun({{Id, Pid, _} = Key, _}, none) when Table == consumer_stats ->
+ gc_process_and_entity(Id, Pid, Table, Key, GbSet);
+ ({Id = Key, {_, Pid, _}} = Object, none)
+ when Table == consumer_stats_queue_index ->
+ gc_object(Pid, Table, Object),
+ gc_entity(Id, Table, Key, GbSet);
+ ({Pid = Key, {Id, _, _}} = Object, none)
+ when Table == consumer_stats_channel_index ->
+ gc_object(Id, Table, Object, GbSet),
+ gc_process(Pid, Table, Key);
+ ({Id = Key, {{Pid, _}, _}} = Object, none)
+ when Table == channel_exchange_stats_fine_stats_exchange_index;
+ Table == channel_queue_stats_deliver_stats_queue_index ->
+ gc_object(Pid, Table, Object),
+ gc_entity(Id, Table, Key, GbSet);
+ ({Pid = Key, {{_, Id}, _}} = Object, none)
+ when Table == channel_exchange_stats_fine_stats_channel_index;
+ Table == channel_queue_stats_deliver_stats_channel_index ->
+ gc_object(Id, Table, Object, GbSet),
+ gc_process(Pid, Table, Key);
+ ({{{Pid, Id}, _} = Key, _}, none)
+ when Table == channel_queue_stats_deliver_stats;
+ Table == channel_exchange_stats_fine_stats ->
+ gc_process_and_entity(Id, Pid, Table, Key, GbSet);
+ ({{{Pid, Id}, _} = Key, _, _, _, _, _, _, _, _}, none) ->
+ gc_process_and_entity(Id, Pid, Table, Key, GbSet);
+ ({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
+ gc_process_and_entity(Id, Pid, Table, Key, GbSet)
+ end, none, Table).
+gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
+ case rabbit_misc:is_process_alive(Pid) andalso gb_sets:is_member(Id, GbSet) of
+ true ->
+ none;
+ false ->
+ ets:delete(Table, Key),
+ none
+ end.
+gc_object(Pid, Table, Object) ->
+ case rabbit_misc:is_process_alive(Pid) of
+ true ->
+ none;
+ false ->
+ ets:delete_object(Table, Object),
+ none
+ end.
+gc_object(Id, Table, Object, GbSet) ->
+ case gb_sets:is_member(Id, GbSet) of
+ true ->
+ none;
+ false ->
+ ets:delete_object(Table, Object),
+ none
+ end.
+gc_entities(Table, QueueGbSet, ExchangeGbSet) ->
+ ets:foldl(fun({{{Q, X}, _} = Key, _}, none)
+ when Table == queue_exchange_stats_publish ->
+ gc_entity(Q, Table, Key, QueueGbSet),
+ gc_entity(X, Table, Key, ExchangeGbSet);
+ ({Q, {{_, X}, _}} = Object, none)
+ when Table == queue_exchange_stats_publish_queue_index ->
+ gc_object(X, Table, Object, ExchangeGbSet),
+ gc_entity(Q, Table, Q, QueueGbSet);
+ ({X, {{Q, _}, _}} = Object, none)
+ when Table == queue_exchange_stats_publish_exchange_index ->
+ gc_object(Q, Table, Object, QueueGbSet),
+ gc_entity(X, Table, X, ExchangeGbSet)
+ end, none, Table).
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl
new file mode 100644
index 0000000000..298f17a18d
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl
@@ -0,0 +1,712 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+-compile({no_auto_import, [ceil/1]}).
+-spec start_link(atom()) -> rabbit_types:ok_pid_or_error().
+-export([override_lookups/2, reset_lookups/1]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+-import(rabbit_mgmt_data, [lookup_element/3]).
+-record(state, {table, interval, policies, rates_mode, lookup_queue,
+ lookup_exchange, old_aggr_stats}).
+%% Data is stored in ETS tables:
+%% * One ETS table per metric (queue_stats, channel_stats_deliver_stats...)
+%% (see ?TABLES in rabbit_mgmt_metrics.hrl)
+%% * Stats are stored as key value pairs where the key is a tuple of
+%% some value (such as a channel pid) and the retention interval.
+%% The value is an instance of an exometer_slide providing a sliding window
+%% of samples for some {Object, Interval}.
+%% * Each slide can store multiple stats. See stats_per_table in
+%% rabbit_mgmt_metrics.hrl for a map of which stats are recorded in which
+%% table.
+reset_all() ->
+ [reset(Table) || {Table, _} <- ?CORE_TABLES].
+reset(Table) ->
+ gen_server:cast(name(Table), reset).
+name(Table) ->
+ list_to_atom((atom_to_list(Table) ++ "_metrics_collector")).
+start_link(Table) ->
+ gen_server:start_link({local, name(Table)}, ?MODULE, [Table], []).
+override_lookups(Table, Lookups) ->
+ gen_server:call(name(Table), {override_lookups, Lookups}, infinity).
+reset_lookups(Table) ->
+ gen_server:call(name(Table), reset_lookups, infinity).
+init([Table]) ->
+ {RatesMode, Policies} = load_config(),
+ Policy = retention_policy(Table),
+ Interval = take_smaller(proplists:get_value(Policy, Policies, [])) * 1000,
+ erlang:send_after(Interval, self(), collect_metrics),
+ {ok, #state{table = Table, interval = Interval,
+ policies = {proplists:get_value(basic, Policies),
+ proplists:get_value(detailed, Policies),
+ proplists:get_value(global, Policies)},
+ rates_mode = RatesMode,
+ old_aggr_stats = #{},
+ lookup_queue = fun queue_exists/1,
+ lookup_exchange = fun exchange_exists/1}}.
+handle_call(reset_lookups, _From, State) ->
+ {reply, ok, State#state{lookup_queue = fun queue_exists/1,
+ lookup_exchange = fun exchange_exists/1}};
+handle_call({override_lookups, Lookups}, _From, State) ->
+ {reply, ok, State#state{lookup_queue = pget(queue, Lookups),
+ lookup_exchange = pget(exchange, Lookups)}};
+handle_call({submit, Fun}, _From, State) ->
+ {reply, Fun(), State};
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+handle_cast(reset, State) ->
+ {noreply, State#state{old_aggr_stats = #{}}};
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+handle_info(collect_metrics, #state{interval = Interval} = State0) ->
+ Timestamp = exometer_slide:timestamp(),
+ State = aggregate_metrics(Timestamp, State0),
+ erlang:send_after(Interval, self(), collect_metrics),
+ {noreply, State};
+handle_info(purge_old_stats, State) ->
+ {noreply, State#state{old_aggr_stats = #{}}};
+handle_info(_Msg, State) ->
+ {noreply, State}.
+terminate(_Reason, _State) ->
+ ok.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+retention_policy(connection_created) -> basic; %% really nothing
+retention_policy(connection_metrics) -> basic;
+retention_policy(connection_coarse_metrics) -> basic;
+retention_policy(channel_created) -> basic;
+retention_policy(channel_metrics) -> basic;
+retention_policy(channel_queue_exchange_metrics) -> detailed;
+retention_policy(channel_exchange_metrics) -> detailed;
+retention_policy(channel_queue_metrics) -> detailed;
+retention_policy(channel_process_metrics) -> basic;
+retention_policy(consumer_created) -> basic;
+retention_policy(queue_metrics) -> basic;
+retention_policy(queue_coarse_metrics) -> basic;
+retention_policy(node_persister_metrics) -> global;
+retention_policy(node_coarse_metrics) -> global;
+retention_policy(node_metrics) -> basic;
+retention_policy(node_node_metrics) -> global;
+retention_policy(connection_churn_metrics) -> basic.
+take_smaller(Policies) ->
+ Intervals = [I || {_, I} <- Policies],
+ case Intervals of
+ [] -> throw(missing_sample_retention_policies);
+ _ -> lists:min(Intervals)
+ end.
+insert_old_aggr_stats(NextStats, Id, Stat) ->
+ NextStats#{Id => Stat}.
+handle_deleted_queues(queue_coarse_metrics, Remainders,
+ #state{policies = {BPolicies, _, GPolicies}}) ->
+ TS = exometer_slide:timestamp(),
+ lists:foreach(fun ({Queue, {R, U, M}}) ->
+ NegStats = ?vhost_msg_stats(-R, -U, -M),
+ [insert_entry(vhost_msg_stats, vhost(Queue), TS,
+ NegStats, Size, Interval, true)
+ || {Size, Interval} <- GPolicies],
+ % zero out msg stats to avoid duplicating msg
+ % stats when a master queue is migrated
+ QNegStats = ?queue_msg_stats(0, 0, 0),
+ [insert_entry(queue_msg_stats, Queue, TS,
+ QNegStats, Size, Interval, false)
+ || {Size, Interval} <- BPolicies],
+ ets:delete(queue_stats, Queue),
+ ets:delete(queue_process_stats, Queue)
+ end, maps:to_list(Remainders));
+handle_deleted_queues(_T, _R, _P) -> ok.
+aggregate_metrics(Timestamp, #state{table = Table,
+ policies = {_, _, _GPolicies}} = State0) ->
+ Table = State0#state.table,
+ {Next, Ops, #state{old_aggr_stats = Remainders}} =
+ ets:foldl(fun(R, {NextStats, O, State}) ->
+ aggregate_entry(R, NextStats, O, State)
+ end, {#{}, #{}, State0}, Table),
+ maps:fold(fun(Tbl, TblOps, Acc) ->
+ _ = exec_table_ops(Tbl, Timestamp, TblOps),
+ Acc
+ end, no_acc, Ops),
+ handle_deleted_queues(Table, Remainders, State0),
+ State0#state{old_aggr_stats = Next}.
+exec_table_ops(Table, Timestamp, TableOps) ->
+ maps:fold(fun(_Id, {insert, Entry}, A) ->
+ ets:insert(Table, Entry),
+ A;
+ (Id, {insert_with_index, Entry}, A) ->
+ insert_with_index(Table, Id, Entry),
+ A;
+ ({Id, Size, Interval, Incremental},
+ {insert_entry, Entry}, A) ->
+ insert_entry(Table, Id, Timestamp, Entry, Size,
+ Interval, Incremental),
+ A
+ end, no_acc, TableOps).
+aggregate_entry({Id, Metrics}, NextStats, Ops0,
+ #state{table = connection_created} = State) ->
+ case ets:lookup(connection_created_stats, Id) of
+ [] ->
+ Ftd = rabbit_mgmt_format:format(
+ Metrics,
+ {fun rabbit_mgmt_format:format_connection_created/1, true}),
+ Entry = ?connection_created_stats(Id, pget(name, Ftd, unknown), Ftd),
+ Ops = insert_op(connection_created_stats, Id, Entry, Ops0),
+ {NextStats, Ops, State};
+ _ ->
+ {NextStats, Ops0, State}
+ end;
+aggregate_entry({Id, Metrics}, NextStats, Ops0,
+ #state{table = connection_metrics} = State) ->
+ Entry = ?connection_stats(Id, Metrics),
+ Ops = insert_op(connection_stats, Id, Entry, Ops0),
+ {NextStats, Ops, State};
+aggregate_entry({Id, RecvOct, SendOct, Reductions, 0}, NextStats, Ops0,
+ #state{table = connection_coarse_metrics,
+ policies = {BPolicies, _, GPolicies}} = State) ->
+ Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct),
+ Diff = get_difference(Id, Stats, State),
+ Ops1 = insert_entry_ops(vhost_stats_coarse_conn_stats,
+ vhost({connection_created, Id}), true, Diff, Ops0,
+ GPolicies),
+ Entry = ?connection_stats_coarse_conn_stats(RecvOct, SendOct, Reductions),
+ Ops2 = insert_entry_ops(connection_stats_coarse_conn_stats, Id, false, Entry,
+ Ops1, BPolicies),
+ {insert_old_aggr_stats(NextStats, Id, Stats), Ops2, State};
+aggregate_entry({Id, RecvOct, SendOct, _Reductions, 1}, NextStats, Ops0,
+ #state{table = connection_coarse_metrics,
+ policies = {_BPolicies, _, GPolicies}} = State) ->
+ Stats = ?vhost_stats_coarse_conn_stats(RecvOct, SendOct),
+ Diff = get_difference(Id, Stats, State),
+ Ops1 = insert_entry_ops(vhost_stats_coarse_conn_stats,
+ vhost({connection_created, Id}), true, Diff, Ops0,
+ GPolicies),
+ rabbit_core_metrics:delete(connection_coarse_metrics, Id),
+ {NextStats, Ops1, State};
+aggregate_entry({Id, Metrics}, NextStats, Ops0,
+ #state{table = channel_created} = State) ->
+ case ets:lookup(channel_created_stats, Id) of
+ [] ->
+ Ftd = rabbit_mgmt_format:format(Metrics, {[], false}),
+ Entry = ?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd),
+ Ops = insert_op(channel_created_stats, Id, Entry, Ops0),
+ {NextStats, Ops, State};
+ _ ->
+ {NextStats, Ops0, State}
+ end;
+aggregate_entry({Id, Metrics}, NextStats, Ops0,
+ #state{table = channel_metrics} = State) ->
+ %% First metric must be `idle_since` (if available), as expected by
+ %% `rabbit_mgmt_format:format_channel_stats`. This is a performance
+ %% optimisation that avoids traversing the whole list when only
+ %% one element has to be formatted.
+ Ftd = rabbit_mgmt_format:format_channel_stats(Metrics),
+ Entry = ?channel_stats(Id, Ftd),
+ Ops = insert_op(channel_stats, Id, Entry, Ops0),
+ {NextStats, Ops, State};
+aggregate_entry({{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, DropUnroutable, 0},
+ NextStats, Ops0,
+ #state{table = channel_exchange_metrics,
+ policies = {BPolicies, DPolicies, GPolicies},
+ rates_mode = RatesMode,
+ lookup_exchange = ExchangeFun} = State) ->
+ Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable, DropUnroutable),
+ {Publish, _, _, _} = Diff = get_difference(Id, Stats, State),
+ Ops1 = insert_entry_ops(channel_stats_fine_stats, Ch, true, Diff, Ops0,
+ BPolicies),
+ Ops2 = insert_entry_ops(vhost_stats_fine_stats, vhost(X), true, Diff, Ops1,
+ GPolicies),
+ Ops3 = case {ExchangeFun(X), RatesMode} of
+ {true, basic} ->
+ Entry = ?exchange_stats_publish_in(Publish),
+ insert_entry_ops(exchange_stats_publish_in, X, true, Entry,
+ Ops2, DPolicies);
+ {true, _} ->
+ Entry = ?exchange_stats_publish_in(Publish),
+ O = insert_entry_ops(exchange_stats_publish_in, X, true,
+ Entry, Ops2, DPolicies),
+ insert_entry_ops(channel_exchange_stats_fine_stats, Id,
+ false, Stats, O, DPolicies);
+ _ ->
+ Ops2
+ end,
+ {insert_old_aggr_stats(NextStats, Id, Stats), Ops3, State};
+aggregate_entry({{_Ch, X} = Id, Publish0, Confirm, ReturnUnroutable, DropUnroutable, 1},
+ NextStats, Ops0,
+ #state{table = channel_exchange_metrics,
+ policies = {_BPolicies, DPolicies, GPolicies},
+ lookup_exchange = ExchangeFun} = State) ->
+ Stats = ?channel_stats_fine_stats(Publish0, Confirm, ReturnUnroutable, DropUnroutable),
+ {Publish, _, _, _} = Diff = get_difference(Id, Stats, State),
+ Ops1 = insert_entry_ops(vhost_stats_fine_stats, vhost(X), true, Diff, Ops0,
+ GPolicies),
+ Ops2 = case ExchangeFun(X) of
+ true ->
+ Entry = ?exchange_stats_publish_in(Publish),
+ insert_entry_ops(exchange_stats_publish_in, X, true, Entry,
+ Ops1, DPolicies);
+ _ ->
+ Ops1
+ end,
+ rabbit_core_metrics:delete(channel_exchange_metrics, Id),
+ {NextStats, Ops2, State};
+aggregate_entry({{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck,
+ Redeliver, Ack, GetEmpty, 0}, NextStats, Ops0,
+ #state{table = channel_queue_metrics,
+ policies = {BPolicies, DPolicies, GPolicies},
+ rates_mode = RatesMode,
+ lookup_queue = QueueFun} = State) ->
+ Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck,
+ Redeliver, Ack,
+ Deliver + DeliverNoAck + Get + GetNoAck,
+ GetEmpty),
+ Diff = get_difference(Id, Stats, State),
+ Ops1 = insert_entry_ops(vhost_stats_deliver_stats, vhost(Q), true, Diff,
+ Ops0, GPolicies),
+ Ops2 = insert_entry_ops(channel_stats_deliver_stats, Ch, true, Diff, Ops1,
+ BPolicies),
+ Ops3 = case {QueueFun(Q), RatesMode} of
+ {true, basic} ->
+ insert_entry_ops(queue_stats_deliver_stats, Q, true, Diff,
+ Ops2, BPolicies);
+ {true, _} ->
+ O = insert_entry_ops(queue_stats_deliver_stats, Q, true,
+ Diff, Ops2, BPolicies),
+ insert_entry_ops(channel_queue_stats_deliver_stats, Id,
+ false, Stats, O, DPolicies);
+ _ ->
+ Ops2
+ end,
+ {insert_old_aggr_stats(NextStats, Id, Stats), Ops3, State};
+aggregate_entry({{_, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck,
+ Redeliver, Ack, GetEmpty, 1}, NextStats, Ops0,
+ #state{table = channel_queue_metrics,
+ policies = {BPolicies, _, GPolicies},
+ lookup_queue = QueueFun} = State) ->
+ Stats = ?vhost_stats_deliver_stats(Get, GetNoAck, Deliver, DeliverNoAck,
+ Redeliver, Ack,
+ Deliver + DeliverNoAck + Get + GetNoAck,
+ GetEmpty),
+ Diff = get_difference(Id, Stats, State),
+ Ops1 = insert_entry_ops(vhost_stats_deliver_stats, vhost(Q), true, Diff,
+ Ops0, GPolicies),
+ Ops2 = case QueueFun(Q) of
+ true ->
+ insert_entry_ops(queue_stats_deliver_stats, Q, true, Diff,
+ Ops1, BPolicies);
+ _ ->
+ Ops1
+ end,
+ rabbit_core_metrics:delete(channel_queue_metrics, Id),
+ {NextStats, Ops2, State};
+aggregate_entry({{_Ch, {Q, X}} = Id, Publish, ToDelete}, NextStats, Ops0,
+ #state{table = channel_queue_exchange_metrics,
+ policies = {BPolicies, _, _},
+ rates_mode = RatesMode,
+ lookup_queue = QueueFun,
+ lookup_exchange = ExchangeFun} = State) ->
+ Stats = ?queue_stats_publish(Publish),
+ Diff = get_difference(Id, Stats, State),
+ Ops1 = case {QueueFun(Q), ExchangeFun(X), RatesMode, ToDelete} of
+ {true, false, _, _} ->
+ insert_entry_ops(queue_stats_publish, Q, true, Diff,
+ Ops0, BPolicies);
+ {false, true, _, _} ->
+ insert_entry_ops(exchange_stats_publish_out, X, true, Diff,
+ Ops0, BPolicies);
+ {true, true, basic, _} ->
+ O = insert_entry_ops(queue_stats_publish, Q, true, Diff,
+ Ops0, BPolicies),
+ insert_entry_ops(exchange_stats_publish_out, X, true, Diff,
+ O, BPolicies);
+ {true, true, _, 0} ->
+ O1 = insert_entry_ops(queue_stats_publish, Q, true, Diff,
+ Ops0, BPolicies),
+ O2 = insert_entry_ops(exchange_stats_publish_out, X, true,
+ Diff, O1, BPolicies),
+ insert_entry_ops(queue_exchange_stats_publish, {Q, X},
+ true, Diff, O2, BPolicies);
+ {true, true, _, 1} ->
+ O = insert_entry_ops(queue_stats_publish, Q, true, Diff,
+ Ops0, BPolicies),
+ insert_entry_ops(exchange_stats_publish_out, X, true,
+ Diff, O, BPolicies);
+ _ ->
+ Ops0
+ end,
+ case ToDelete of
+ 0 ->
+ {insert_old_aggr_stats(NextStats, Id, Stats), Ops1, State};
+ 1 ->
+ rabbit_core_metrics:delete(channel_queue_exchange_metrics, Id),
+ {NextStats, Ops1, State}
+ end;
+aggregate_entry({Id, Reductions}, NextStats, Ops0,
+ #state{table = channel_process_metrics,
+ policies = {BPolicies, _, _}} = State) ->
+ Entry = ?channel_process_stats(Reductions),
+ Ops = insert_entry_ops(channel_process_stats, Id, false,
+ Entry, Ops0, BPolicies),
+ {NextStats, Ops, State};
+aggregate_entry({Id, Exclusive, AckRequired, PrefetchCount,
+ Active, ActivityStatus, Args},
+ NextStats, Ops0,
+ #state{table = consumer_created} = State) ->
+ case ets:lookup(consumer_stats, Id) of
+ [] ->
+ Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {prefetch_count, PrefetchCount},
+ {active, Active},
+ {activity_status, ActivityStatus},
+ {arguments, Args}], {[], false}),
+ Entry = ?consumer_stats(Id, Fmt),
+ Ops = insert_with_index_op(consumer_stats, Id, Entry, Ops0),
+ {NextStats, Ops , State};
+ [{_K, V}] ->
+ CurrentActive = proplists:get_value(active, V, undefined),
+ case Active =:= CurrentActive of
+ false ->
+ Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {prefetch_count, PrefetchCount},
+ {active, Active},
+ {activity_status, ActivityStatus},
+ {arguments, Args}], {[], false}),
+ Entry = ?consumer_stats(Id, Fmt),
+ Ops = insert_with_index_op(consumer_stats, Id, Entry, Ops0),
+ {NextStats, Ops , State};
+ _ ->
+ {NextStats, Ops0, State}
+ end;
+ _ ->
+ {NextStats, Ops0, State}
+ end;
+aggregate_entry({Id, Metrics, 0}, NextStats, Ops0,
+ #state{table = queue_metrics,
+ policies = {BPolicies, _, GPolicies},
+ lookup_queue = QueueFun} = State) ->
+ Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0),
+ pget(disk_writes, Metrics, 0)),
+ Diff = get_difference(Id, Stats, State),
+ Ops1 = insert_entry_ops(vhost_msg_rates, vhost(Id), true, Diff, Ops0,
+ GPolicies),
+ Ops2 = case QueueFun(Id) of
+ true ->
+ O = insert_entry_ops(queue_msg_rates, Id, false, Stats, Ops1,
+ BPolicies),
+ Fmt = rabbit_mgmt_format:format(
+ Metrics,
+ {fun rabbit_mgmt_format:format_queue_stats/1, false}),
+ insert_op(queue_stats, Id, ?queue_stats(Id, Fmt), O);
+ false ->
+ Ops1
+ end,
+ {insert_old_aggr_stats(NextStats, Id, Stats), Ops2, State};
+aggregate_entry({Id, Metrics, 1}, NextStats, Ops0,
+ #state{table = queue_metrics,
+ policies = {_, _, GPolicies}} = State) ->
+ Stats = ?queue_msg_rates(pget(disk_reads, Metrics, 0),
+ pget(disk_writes, Metrics, 0)),
+ Diff = get_difference(Id, Stats, State),
+ Ops = insert_entry_ops(vhost_msg_rates, vhost(Id), true, Diff, Ops0,
+ GPolicies),
+ rabbit_core_metrics:delete(queue_metrics, Id),
+ {NextStats, Ops, State};
+aggregate_entry({Name, Ready, Unack, Msgs, Red}, NextStats, Ops0,
+ #state{table = queue_coarse_metrics,
+ old_aggr_stats = Old,
+ policies = {BPolicies, _, GPolicies},
+ lookup_queue = QueueFun} = State) ->
+ Stats = ?vhost_msg_stats(Ready, Unack, Msgs),
+ Diff = get_difference(Name, Stats, State),
+ Ops1 = insert_entry_ops(vhost_msg_stats, vhost(Name), true, Diff, Ops0,
+ GPolicies),
+ Ops2 = case QueueFun(Name) of
+ true ->
+ QPS =?queue_process_stats(Red),
+ O1 = insert_entry_ops(queue_process_stats, Name, false, QPS,
+ Ops1, BPolicies),
+ QMS = ?queue_msg_stats(Ready, Unack, Msgs),
+ insert_entry_ops(queue_msg_stats, Name, false, QMS,
+ O1, BPolicies);
+ _ ->
+ Ops1
+ end,
+ State1 = State#state{old_aggr_stats = maps:remove(Name, Old)},
+ {insert_old_aggr_stats(NextStats, Name, Stats), Ops2, State1};
+aggregate_entry({Id, Metrics}, NextStats, Ops0,
+ #state{table = node_metrics} = State) ->
+ Ops = insert_op(node_stats, Id, {Id, Metrics}, Ops0),
+ {NextStats, Ops, State};
+aggregate_entry({Id, Metrics}, NextStats, Ops0,
+ #state{table = node_coarse_metrics,
+ policies = {_, _, GPolicies}} = State) ->
+ Stats = ?node_coarse_stats(
+ pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0),
+ pget(mem_used, Metrics, 0), pget(disk_free, Metrics, 0),
+ pget(proc_used, Metrics, 0), pget(gc_num, Metrics, 0),
+ pget(gc_bytes_reclaimed, Metrics, 0),
+ pget(context_switches, Metrics, 0)),
+ Ops = insert_entry_ops(node_coarse_stats, Id, false, Stats, Ops0,
+ GPolicies),
+ {NextStats, Ops, State};
+aggregate_entry({Id, Metrics}, NextStats, Ops0,
+ #state{table = node_persister_metrics,
+ policies = {_, _, GPolicies}} = State) ->
+ Stats = ?node_persister_stats(
+ pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0),
+ pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0),
+ pget(io_write_bytes, Metrics, 0), pget(io_write_time, Metrics, 0),
+ pget(io_sync_count, Metrics, 0), pget(io_sync_time, Metrics, 0),
+ pget(io_seek_count, Metrics, 0), pget(io_seek_time, Metrics, 0),
+ pget(io_reopen_count, Metrics, 0), pget(mnesia_ram_tx_count, Metrics, 0),
+ pget(mnesia_disk_tx_count, Metrics, 0), pget(msg_store_read_count, Metrics, 0),
+ pget(msg_store_write_count, Metrics, 0),
+ pget(queue_index_journal_write_count, Metrics, 0),
+ pget(queue_index_write_count, Metrics, 0), pget(queue_index_read_count, Metrics, 0),
+ pget(io_file_handle_open_attempt_count, Metrics, 0),
+ pget(io_file_handle_open_attempt_time, Metrics, 0)),
+ Ops = insert_entry_ops(node_persister_stats, Id, false, Stats, Ops0,
+ GPolicies),
+ {NextStats, Ops, State};
+aggregate_entry({Id, Metrics}, NextStats, Ops0,
+ #state{table = node_node_metrics,
+ policies = {_, _, GPolicies}} = State) ->
+ Stats = ?node_node_coarse_stats(pget(send_bytes, Metrics, 0),
+ pget(recv_bytes, Metrics, 0)),
+ CleanMetrics = lists:keydelete(recv_bytes, 1,
+ lists:keydelete(send_bytes, 1, Metrics)),
+ Ops1 = insert_op(node_node_stats, Id, ?node_node_stats(Id, CleanMetrics),
+ Ops0),
+ Ops = insert_entry_ops(node_node_coarse_stats, Id, false, Stats, Ops1,
+ GPolicies),
+ {NextStats, Ops, State};
+aggregate_entry({Id, ConnCreated, ConnClosed, ChCreated, ChClosed,
+ QueueDeclared, QueueCreated, QueueDeleted}, NextStats, Ops0,
+ #state{table = connection_churn_metrics,
+ policies = {_, _, GPolicies}} = State) ->
+ %% Id is the local node. There is only one entry on every ETS table.
+ Stats = ?connection_churn_rates(ConnCreated, ConnClosed, ChCreated, ChClosed,
+ QueueDeclared, QueueCreated, QueueDeleted),
+ Diff = get_difference(Id, Stats, State),
+ Ops = insert_entry_ops(connection_churn_rates, Id, true, Diff, Ops0,
+ GPolicies),
+ {insert_old_aggr_stats(NextStats, Id, Stats), Ops, State}.
+insert_entry(Table, Id, TS, Entry, Size, Interval0, Incremental) ->
+ Key = {Id, Interval0},
+ Slide =
+ case ets:lookup(Table, Key) of
+ [{Key, S}] ->
+ S;
+ [] ->
+ IntervalMs = Interval0 * 1000,
+ % add some margin to Size and max_n to reduce chances of off-by-one errors
+ exometer_slide:new(TS - IntervalMs, (Size + Interval0) * 1000,
+ [{interval, IntervalMs},
+ {max_n, ceil(Size / Interval0) + 1},
+ {incremental, Incremental}])
+ end,
+ insert_with_index(Table, Key, {Key, exometer_slide:add_element(TS, Entry,
+ Slide)}).
+update_op(Table, Key, Op, Ops) ->
+ TableOps = case maps:find(Table, Ops) of
+ {ok, Inner} ->
+ maps:put(Key, Op, Inner);
+ error ->
+ Inner = #{},
+ maps:put(Key, Op, Inner)
+ end,
+ maps:put(Table, TableOps, Ops).
+insert_with_index_op(Table, Key, Entry, Ops) ->
+ update_op(Table, Key, {insert_with_index, Entry}, Ops).
+insert_op(Table, Key, Entry, Ops) ->
+ update_op(Table, Key, {insert, Entry}, Ops).
+insert_entry_op(Table, Key, Entry, Ops) ->
+ TableOps0 = case maps:find(Table, Ops) of
+ {ok, Inner} -> Inner;
+ error -> #{}
+ end,
+ TableOps = maps:update_with(Key, fun({insert_entry, Entry0}) ->
+ {insert_entry, sum_entry(Entry0, Entry)}
+ end, {insert_entry, Entry}, TableOps0),
+ maps:put(Table, TableOps, Ops).
+insert_entry_ops(Table, Id, Incr, Entry, Ops, Policies) ->
+ lists:foldl(fun({Size, Interval}, Acc) ->
+ Key = {Id, Size, Interval, Incr},
+ insert_entry_op(Table, Key, Entry, Acc)
+ end, Ops, Policies).
+get_difference(Id, Stats, #state{old_aggr_stats = OldStats}) ->
+ case maps:find(Id, OldStats) of
+ error ->
+ Stats;
+ {ok, OldStat} ->
+ difference(OldStat, Stats)
+ end.
+sum_entry({A0}, {B0}) ->
+ {B0 + A0};
+sum_entry({A0, A1}, {B0, B1}) ->
+ {B0 + A0, B1 + A1};
+sum_entry({A0, A1, A2}, {B0, B1, B2}) ->
+ {B0 + A0, B1 + A1, B2 + A2};
+sum_entry({A0, A1, A2, A3}, {B0, B1, B2, B3}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3};
+sum_entry({A0, A1, A2, A3, A4}, {B0, B1, B2, B3, B4}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4};
+sum_entry({A0, A1, A2, A3, A4, A5}, {B0, B1, B2, B3, B4, B5}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5};
+sum_entry({A0, A1, A2, A3, A4, A5, A6}, {B0, B1, B2, B3, B4, B5, B6}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6};
+sum_entry({A0, A1, A2, A3, A4, A5, A6, A7}, {B0, B1, B2, B3, B4, B5, B6, B7}) ->
+ {B0 + A0, B1 + A1, B2 + A2, B3 + A3, B4 + A4, B5 + A5, B6 + A6, B7 + A7}.
+difference({A0}, {B0}) ->
+ {B0 - A0};
+difference({A0, A1}, {B0, B1}) ->
+ {B0 - A0, B1 - A1};
+difference({A0, A1, A2}, {B0, B1, B2}) ->
+ {B0 - A0, B1 - A1, B2 - A2};
+difference({A0, A1, A2, A3}, {B0, B1, B2, B3}) ->
+ {B0 - A0, B1 - A1, B2 - A2, B3 - A3};
+difference({A0, A1, A2, A3, A4}, {B0, B1, B2, B3, B4}) ->
+ {B0 - A0, B1 - A1, B2 - A2, B3 - A3, B4 - A4};
+difference({A0, A1, A2, A3, A4, A5}, {B0, B1, B2, B3, B4, B5}) ->
+ {B0 - A0, B1 - A1, B2 - A2, B3 - A3, B4 - A4, B5 - A5};
+difference({A0, A1, A2, A3, A4, A5, A6}, {B0, B1, B2, B3, B4, B5, B6}) ->
+ {B0 - A0, B1 - A1, B2 - A2, B3 - A3, B4 - A4, B5 - A5, B6 - A6};
+difference({A0, A1, A2, A3, A4, A5, A6, A7}, {B0, B1, B2, B3, B4, B5, B6, B7}) ->
+ {B0 - A0, B1 - A1, B2 - A2, B3 - A3, B4 - A4, B5 - A5, B6 - A6, B7 - A7}.
+vhost(#resource{virtual_host = VHost}) ->
+ VHost;
+vhost({queue_stats, #resource{virtual_host = VHost}}) ->
+ VHost;
+vhost({TName, Pid}) ->
+ pget(vhost, lookup_element(TName, Pid, 2)).
+exchange_exists(Name) ->
+ case rabbit_exchange:lookup(Name) of
+ {ok, _} ->
+ true;
+ _ ->
+ false
+ end.
+queue_exists(Name) ->
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, _} ->
+ true;
+ _ ->
+ false
+ end.
+insert_with_index(Table, Key, Tuple) ->
+ Insert = ets:insert(Table, Tuple),
+ insert_index(Table, Key),
+ Insert.
+insert_index(consumer_stats, {Q, Ch, _} = Key) ->
+ ets:insert(index_table(consumer_stats, queue), {Q, Key}),
+ ets:insert(index_table(consumer_stats, channel), {Ch, Key});
+insert_index(channel_exchange_stats_fine_stats, {{Ch, Ex}, _} = Key) ->
+ ets:insert(index_table(channel_exchange_stats_fine_stats, exchange), {Ex, Key}),
+ ets:insert(index_table(channel_exchange_stats_fine_stats, channel), {Ch, Key});
+insert_index(channel_queue_stats_deliver_stats, {{Ch, Q}, _} = Key) ->
+ ets:insert(index_table(channel_queue_stats_deliver_stats, queue), {Q, Key}),
+ ets:insert(index_table(channel_queue_stats_deliver_stats, channel), {Ch, Key});
+insert_index(queue_exchange_stats_publish, {{Q, Ex}, _} = Key) ->
+ ets:insert(index_table(queue_exchange_stats_publish, queue), {Q, Key}),
+ ets:insert(index_table(queue_exchange_stats_publish, exchange), {Ex, Key});
+insert_index(node_node_coarse_stats, {{_, Node}, _} = Key) ->
+ ets:insert(index_table(node_node_coarse_stats, node), {Node, Key});
+insert_index(_, _) -> ok.
+index_table(consumer_stats, queue) -> consumer_stats_queue_index;
+index_table(consumer_stats, channel) -> consumer_stats_channel_index;
+index_table(channel_exchange_stats_fine_stats, exchange) -> channel_exchange_stats_fine_stats_exchange_index;
+index_table(channel_exchange_stats_fine_stats, channel) -> channel_exchange_stats_fine_stats_channel_index;
+index_table(channel_queue_stats_deliver_stats, queue) -> channel_queue_stats_deliver_stats_queue_index;
+index_table(channel_queue_stats_deliver_stats, channel) -> channel_queue_stats_deliver_stats_channel_index;
+index_table(queue_exchange_stats_publish, queue) -> queue_exchange_stats_publish_queue_index;
+index_table(queue_exchange_stats_publish, exchange) -> queue_exchange_stats_publish_exchange_index;
+index_table(node_node_coarse_stats, node) -> node_node_coarse_stats_node_index.
+load_config() ->
+ RatesMode = rabbit_mgmt_agent_config:get_env(rates_mode),
+ Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies, []),
+ {RatesMode, Policies}.
+ceil(X) when X < 0 ->
+ trunc(X);
+ceil(X) ->
+ T = trunc(X),
+ case X - T == 0 of
+ true -> T;
+ false -> T + 1
+ end.
+pget(Key, List) -> pget(Key, List, unknown).
+pget(Key, List, Default) when is_number(Default) ->
+ case rabbit_misc:pget(Key, List) of
+ Number when is_number(Number) ->
+ Number;
+ _Other ->
+ Default
+ end;
+pget(Key, List, Default) ->
+ rabbit_misc:pget(Key, List, Default).
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl
new file mode 100644
index 0000000000..f1ae48e0e4
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl
@@ -0,0 +1,175 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+-record(state, {basic_i,
+ detailed_i,
+ global_i}).
+-spec start_link(atom()) -> rabbit_types:ok_pid_or_error().
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+name(EventType) ->
+ list_to_atom((atom_to_list(EventType) ++ "_metrics_gc")).
+start_link(EventType) ->
+ gen_server:start_link({local, name(EventType)}, ?MODULE, [], []).
+init(_) ->
+ Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies),
+ {ok, #state{basic_i = intervals(basic, Policies),
+ global_i = intervals(global, Policies),
+ detailed_i = intervals(detailed, Policies)}}.
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+handle_cast({event, #event{type = connection_closed, props = Props}},
+ State = #state{basic_i = BIntervals}) ->
+ Pid = pget(pid, Props),
+ remove_connection(Pid, BIntervals),
+ {noreply, State};
+handle_cast({event, #event{type = channel_closed, props = Props}},
+ State = #state{basic_i = BIntervals}) ->
+ Pid = pget(pid, Props),
+ remove_channel(Pid, BIntervals),
+ {noreply, State};
+handle_cast({event, #event{type = consumer_deleted, props = Props}}, State) ->
+ remove_consumer(Props),
+ {noreply, State};
+handle_cast({event, #event{type = exchange_deleted, props = Props}},
+ State = #state{basic_i = BIntervals}) ->
+ Name = pget(name, Props),
+ remove_exchange(Name, BIntervals),
+ {noreply, State};
+handle_cast({event, #event{type = queue_deleted, props = Props}},
+ State = #state{basic_i = BIntervals}) ->
+ Name = pget(name, Props),
+ remove_queue(Name, BIntervals),
+ {noreply, State};
+handle_cast({event, #event{type = vhost_deleted, props = Props}},
+ State = #state{global_i = GIntervals}) ->
+ Name = pget(name, Props),
+ remove_vhost(Name, GIntervals),
+ {noreply, State};
+handle_cast({event, #event{type = node_node_deleted, props = Props}}, State) ->
+ Name = pget(route, Props),
+ remove_node_node(Name),
+ {noreply, State}.
+handle_info(_Msg, State) ->
+ {noreply, State}.
+terminate(_Reason, _State) ->
+ ok.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+remove_connection(Id, BIntervals) ->
+ ets:delete(connection_created_stats, Id),
+ ets:delete(connection_stats, Id),
+ delete_samples(connection_stats_coarse_conn_stats, Id, BIntervals),
+ ok.
+remove_channel(Id, BIntervals) ->
+ ets:delete(channel_created_stats, Id),
+ ets:delete(channel_stats, Id),
+ delete_samples(channel_process_stats, Id, BIntervals),
+ delete_samples(channel_stats_fine_stats, Id, BIntervals),
+ delete_samples(channel_stats_deliver_stats, Id, BIntervals),
+ index_delete(consumer_stats, channel, Id),
+ index_delete(channel_exchange_stats_fine_stats, channel, Id),
+ index_delete(channel_queue_stats_deliver_stats, channel, Id),
+ ok.
+remove_consumer(Props) ->
+ Id = {pget(queue, Props), pget(channel, Props), pget(consumer_tag, Props)},
+ ets:delete(consumer_stats, Id),
+ cleanup_index(consumer_stats, Id),
+ ok.
+remove_exchange(Name, BIntervals) ->
+ delete_samples(exchange_stats_publish_out, Name, BIntervals),
+ delete_samples(exchange_stats_publish_in, Name, BIntervals),
+ index_delete(queue_exchange_stats_publish, exchange, Name),
+ index_delete(channel_exchange_stats_fine_stats, exchange, Name),
+ ok.
+remove_queue(Name, BIntervals) ->
+ ets:delete(queue_stats, Name),
+ delete_samples(queue_stats_publish, Name, BIntervals),
+ delete_samples(queue_stats_deliver_stats, Name, BIntervals),
+ delete_samples(queue_process_stats, Name, BIntervals),
+ delete_samples(queue_msg_stats, Name, BIntervals),
+ delete_samples(queue_msg_rates, Name, BIntervals),
+ index_delete(channel_queue_stats_deliver_stats, queue, Name),
+ index_delete(queue_exchange_stats_publish, queue, Name),
+ index_delete(consumer_stats, queue, Name),
+ ok.
+remove_vhost(Name, GIntervals) ->
+ delete_samples(vhost_stats_coarse_conn_stats, Name, GIntervals),
+ delete_samples(vhost_stats_fine_stats, Name, GIntervals),
+ delete_samples(vhost_stats_deliver_stats, Name, GIntervals),
+ ok.
+remove_node_node(Name) ->
+ index_delete(node_node_coarse_stats, node, Name),
+ ok.
+intervals(Type, Policies) ->
+ [I || {_, I} <- proplists:get_value(Type, Policies)].
+delete_samples(Table, Id, Intervals) ->
+ [ets:delete(Table, {Id, I}) || I <- Intervals],
+ ok.
+index_delete(Table, Type, Id) ->
+ IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type),
+ Keys = ets:lookup(IndexTable, Id),
+ [ begin
+ ets:delete(Table, Key),
+ cleanup_index(Table, Key)
+ end
+ || {_Index, Key} <- Keys ],
+ ets:delete(IndexTable, Id),
+ ok.
+cleanup_index(consumer_stats, {Q, Ch, _} = Key) ->
+ delete_index(consumer_stats, queue, {Q, Key}),
+ delete_index(consumer_stats, channel, {Ch, Key}),
+ ok;
+cleanup_index(channel_exchange_stats_fine_stats, {{Ch, Ex}, _} = Key) ->
+ delete_index(channel_exchange_stats_fine_stats, exchange, {Ex, Key}),
+ delete_index(channel_exchange_stats_fine_stats, channel, {Ch, Key}),
+ ok;
+cleanup_index(channel_queue_stats_deliver_stats, {{Ch, Q}, _} = Key) ->
+ delete_index(channel_queue_stats_deliver_stats, queue, {Q, Key}),
+ delete_index(channel_queue_stats_deliver_stats, channel, {Ch, Key}),
+ ok;
+cleanup_index(queue_exchange_stats_publish, {{Q, Ex}, _} = Key) ->
+ delete_index(queue_exchange_stats_publish, queue, {Q, Key}),
+ delete_index(queue_exchange_stats_publish, exchange, {Ex, Key}),
+ ok;
+cleanup_index(node_node_coarse_stats, {{_, Node}, _} = Key) ->
+ delete_index(node_node_coarse_stats, node, {Node, Key}),
+ ok;
+cleanup_index(_, _) -> ok.
+delete_index(Table, Index, Obj) ->
+ ets:delete_object(rabbit_mgmt_metrics_collector:index_table(Table, Index),
+ Obj).
+pget(Key, List) -> rabbit_misc:pget(Key, List, unknown).
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_storage.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_storage.erl
new file mode 100644
index 0000000000..4c5c8c18ef
--- /dev/null
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_storage.erl
@@ -0,0 +1,57 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+-record(state, {}).
+-spec start_link() -> rabbit_types:ok_pid_or_error().
+-export([reset/0, reset_all/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+%% ETS owner
+start_link() ->
+ gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
+reset() ->
+ rabbit_log:warning("Resetting RabbitMQ management storage"),
+ [ets:delete_all_objects(IndexTable) || IndexTable <- ?INDEX_TABLES],
+ [ets:delete_all_objects(Table) || {Table, _} <- ?TABLES],
+ _ = rabbit_mgmt_metrics_collector:reset_all(),
+ ok.
+reset_all() ->
+ _ = [rpc:call(Node, rabbit_mgmt_storage, reset, [])
+ || Node <- rabbit_nodes:all_running()],
+ ok.
+init(_) ->
+ _ = [ets:new(IndexTable, [public, bag, named_table])
+ || IndexTable <- ?INDEX_TABLES],
+ _ = [ets:new(Table, [public, Type, named_table])
+ || {Table, Type} <- ?TABLES],
+ _ = ets:new(rabbit_mgmt_db_cache, [public, set, named_table]),
+ {ok, #state{}}.
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+handle_info(_Msg, State) ->
+ {noreply, State}.
+terminate(_Reason, _State) ->
+ ok.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.