summaryrefslogtreecommitdiff
path: root/deps/rabbit/test/metrics_SUITE.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/test/metrics_SUITE.erl')
-rw-r--r--deps/rabbit/test/metrics_SUITE.erl404
1 files changed, 404 insertions, 0 deletions
diff --git a/deps/rabbit/test/metrics_SUITE.erl b/deps/rabbit/test/metrics_SUITE.erl
new file mode 100644
index 0000000000..e585ccd5a8
--- /dev/null
+++ b/deps/rabbit/test/metrics_SUITE.erl
@@ -0,0 +1,404 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2016-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(metrics_SUITE).
+-compile(export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("rabbit_common/include/rabbit_core_metrics.hrl").
+-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
+
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ connection,
+ channel,
+ channel_connection_close,
+ channel_queue_exchange_consumer_close_connection,
+ channel_queue_delete_queue,
+ connection_metric_count_test,
+ channel_metric_count_test,
+ queue_metric_count_test,
+ queue_metric_count_channel_per_queue_test,
+ connection_metric_idemp_test,
+ channel_metric_idemp_test,
+ queue_metric_idemp_test
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [
+ {collect_statistics, fine},
+ {collect_statistics_interval, 500}
+ ]}).
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ clean_core_metrics(Config),
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+% NB: node_stats tests are in the management_agent repo
+
+connection_metric_count_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_connection_metric_count/1, [Config], 25).
+
+channel_metric_count_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_channel_metric_count/1, [Config], 25).
+
+queue_metric_count_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_count/1, [Config], 5).
+
+queue_metric_count_channel_per_queue_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_count_channel_per_queue/1,
+ [Config], 5).
+
+connection_metric_idemp_test(Config) ->
+ connection_metric_idemp(Config, {1, 1}),
+ connection_metric_idemp(Config, {1, 2}),
+ connection_metric_idemp(Config, {2, 2}).
+
+channel_metric_idemp_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_channel_metric_idemp/1, [Config], 25).
+
+queue_metric_idemp_test(Config) ->
+ rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_idemp/1, [Config], 25).
+
+prop_connection_metric_idemp(Config) ->
+ ?FORALL(N, {integer(1, 25), integer(1, 25)},
+ connection_metric_idemp(Config, N)).
+
+prop_channel_metric_idemp(Config) ->
+ ?FORALL(N, {integer(1, 25), integer(1, 25)},
+ channel_metric_idemp(Config, N)).
+
+prop_queue_metric_idemp(Config) ->
+ ?FORALL(N, {integer(1, 25), integer(1, 25)},
+ queue_metric_idemp(Config, N)).
+
+prop_connection_metric_count(Config) ->
+ ?FORALL(N, {integer(1, 25), resize(100, list(oneof([add, remove])))},
+ connection_metric_count(Config, N)).
+
+prop_channel_metric_count(Config) ->
+ ?FORALL(N, {integer(1, 25), resize(100, list(oneof([add, remove])))},
+ channel_metric_count(Config, N)).
+
+prop_queue_metric_count(Config) ->
+ ?FORALL(N, {integer(1, 10), resize(10, list(oneof([add, remove])))},
+ queue_metric_count(Config, N)).
+
+prop_queue_metric_count_channel_per_queue(Config) ->
+ ?FORALL(N, {integer(1, 10), resize(10, list(oneof([add, remove])))},
+ queue_metric_count_channel_per_queue(Config, N)).
+
+connection_metric_idemp(Config, {N, R}) ->
+ Conns = [rabbit_ct_client_helpers:open_unmanaged_connection(Config)
+ || _ <- lists:seq(1, N)],
+ Table = ?awaitMatch(L when is_list(L) andalso length(L) == N,
+ [ Pid || {Pid, _} <- read_table_rpc(Config,
+ connection_metrics)],
+ 5000),
+ Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)],
+ % refresh stats 'R' times
+ [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)],
+ force_metric_gc(Config),
+ TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)],
+ TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)],
+ [rabbit_ct_client_helpers:close_connection(Conn) || Conn <- Conns],
+ ?assertEqual(Table, TableAfter),
+ ?assertEqual(Table2, TableAfter2),
+ ?assertEqual(N, length(Table)),
+ ?assertEqual(N, length(TableAfter)).
+
+channel_metric_idemp(Config, {N, R}) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ [amqp_connection:open_channel(Conn) || _ <- lists:seq(1, N)],
+ Table = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)],
+ Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)],
+ % refresh stats 'R' times
+ [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)],
+ force_metric_gc(Config),
+ TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)],
+ TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)],
+ rabbit_ct_client_helpers:close_connection(Conn),
+ (Table2 == TableAfter2) and (Table == TableAfter) and
+ (N == length(Table)) and (N == length(TableAfter)).
+
+queue_metric_idemp(Config, {N, R}) ->
+ clean_core_metrics(Config),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Queues =
+ [begin
+ Queue = declare_queue(Chan),
+ ensure_exchange_metrics_populated(Chan, Queue),
+ ensure_channel_queue_metrics_populated(Chan, Queue),
+ Queue
+ end || _ <- lists:seq(1, N)],
+
+ Table = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)],
+ Table2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)],
+ % refresh stats 'R' times
+ ChanTable = read_table_rpc(Config, channel_created),
+ [[Pid ! emit_stats || {Pid, _, _} <- ChanTable ] || _ <- lists:seq(1, R)],
+ force_metric_gc(Config),
+ TableAfter = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)],
+ TableAfter2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)],
+ [ delete_queue(Chan, Q) || Q <- Queues],
+ rabbit_ct_client_helpers:close_connection(Conn),
+ (Table2 == TableAfter2) and (Table == TableAfter) and
+ (N == length(Table)) and (N == length(TableAfter)).
+
+connection_metric_count(Config, Ops) ->
+ add_rem_counter(Config, Ops,
+ {fun rabbit_ct_client_helpers:open_unmanaged_connection/1,
+ fun(Cfg) ->
+ rabbit_ct_client_helpers:close_connection(Cfg)
+ end},
+ [ connection_created,
+ connection_metrics,
+ connection_coarse_metrics ]).
+
+channel_metric_count(Config, Ops) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ Result = add_rem_counter(Config, Ops,
+ {fun (_Config) ->
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Chan
+ end,
+ fun amqp_channel:close/1},
+ [ channel_created,
+ channel_metrics,
+ channel_process_metrics ]),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ Result.
+
+queue_metric_count(Config, Ops) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ AddFun = fun (_) ->
+ Queue = declare_queue(Chan),
+ ensure_exchange_metrics_populated(Chan, Queue),
+ ensure_channel_queue_metrics_populated(Chan, Queue),
+ force_channel_stats(Config),
+ Queue
+ end,
+ Result = add_rem_counter(Config, Ops,
+ {AddFun,
+ fun (Q) -> delete_queue(Chan, Q),
+ force_metric_gc(Config)
+ end}, [channel_queue_metrics,
+ channel_queue_exchange_metrics ]),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ Result.
+
+queue_metric_count_channel_per_queue(Config, Ops) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ AddFun = fun (_) ->
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Queue = declare_queue(Chan),
+ ensure_exchange_metrics_populated(Chan, Queue),
+ ensure_channel_queue_metrics_populated(Chan, Queue),
+ force_channel_stats(Config),
+ {Chan, Queue}
+ end,
+ Result = add_rem_counter(Config, Ops,
+ {AddFun,
+ fun ({Chan, Q}) ->
+ delete_queue(Chan, Q),
+ force_metric_gc(Config)
+ end},
+ [ channel_queue_metrics,
+ channel_queue_exchange_metrics ]),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ Result.
+
+add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) ->
+ Things = [ AddFun(Config) || _ <- lists:seq(1, Initial) ],
+ % either add or remove some things
+ {FinalLen, Things1} =
+ lists:foldl(fun(add, {L, Items}) ->
+ {L+1, [AddFun(Config) | Items]};
+ (remove, {L, [H|Tail]}) ->
+ RemFun(H),
+ {L-1, Tail};
+ (_, S) -> S end,
+ {Initial, Things},
+ Ops),
+ force_metric_gc(Config),
+ TabLens = lists:map(fun(T) ->
+ length(read_table_rpc(Config, T))
+ end, Tables),
+ [RemFun(Thing) || Thing <- Things1],
+ [FinalLen] == lists:usort(TabLens).
+
+
+connection(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ [_] = read_table_rpc(Config, connection_created),
+ [_] = read_table_rpc(Config, connection_metrics),
+ [_] = read_table_rpc(Config, connection_coarse_metrics),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ force_metric_gc(Config),
+ [] = read_table_rpc(Config, connection_created),
+ [] = read_table_rpc(Config, connection_metrics),
+ [] = read_table_rpc(Config, connection_coarse_metrics),
+ ok.
+
+channel(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ [_] = read_table_rpc(Config, channel_created),
+ [_] = read_table_rpc(Config, channel_metrics),
+ [_] = read_table_rpc(Config, channel_process_metrics),
+ ok = amqp_channel:close(Chan),
+ [] = read_table_rpc(Config, channel_created),
+ [] = read_table_rpc(Config, channel_metrics),
+ [] = read_table_rpc(Config, channel_process_metrics),
+ ok = rabbit_ct_client_helpers:close_connection(Conn).
+
+channel_connection_close(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, _} = amqp_connection:open_channel(Conn),
+ [_] = read_table_rpc(Config, channel_created),
+ [_] = read_table_rpc(Config, channel_metrics),
+ [_] = read_table_rpc(Config, channel_process_metrics),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ [] = read_table_rpc(Config, channel_created),
+ [] = read_table_rpc(Config, channel_metrics),
+ [] = read_table_rpc(Config, channel_process_metrics).
+
+channel_queue_delete_queue(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Queue = declare_queue(Chan),
+ ensure_exchange_metrics_populated(Chan, Queue),
+ ensure_channel_queue_metrics_populated(Chan, Queue),
+ force_channel_stats(Config),
+ [_] = read_table_rpc(Config, channel_queue_metrics),
+ [_] = read_table_rpc(Config, channel_queue_exchange_metrics),
+
+ delete_queue(Chan, Queue),
+ force_metric_gc(Config),
+ % ensure removal of queue cleans up channel_queue metrics
+ [] = read_table_rpc(Config, channel_queue_exchange_metrics),
+ [] = read_table_rpc(Config, channel_queue_metrics),
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ ok.
+
+channel_queue_exchange_consumer_close_connection(Config) ->
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
+ {ok, Chan} = amqp_connection:open_channel(Conn),
+ Queue = declare_queue(Chan),
+ ensure_exchange_metrics_populated(Chan, Queue),
+ force_channel_stats(Config),
+
+ [_] = read_table_rpc(Config, channel_exchange_metrics),
+ [_] = read_table_rpc(Config, channel_queue_exchange_metrics),
+
+ ensure_channel_queue_metrics_populated(Chan, Queue),
+ force_channel_stats(Config),
+ [_] = read_table_rpc(Config, channel_queue_metrics),
+
+ Sub = #'basic.consume'{queue = Queue},
+ #'basic.consume_ok'{consumer_tag = _} =
+ amqp_channel:call(Chan, Sub),
+
+ [_] = read_table_rpc(Config, consumer_created),
+
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ % ensure cleanup happened
+ force_metric_gc(Config),
+ [] = read_table_rpc(Config, channel_exchange_metrics),
+ [] = read_table_rpc(Config, channel_queue_exchange_metrics),
+ [] = read_table_rpc(Config, channel_queue_metrics),
+ [] = read_table_rpc(Config, consumer_created),
+ ok.
+
+
+
+%% -------------------------------------------------------------------
+%% Utilities
+%% -------------------------------------------------------------------
+
+declare_queue(Chan) ->
+ Declare = #'queue.declare'{durable = false, auto_delete = true},
+ #'queue.declare_ok'{queue = Name} = amqp_channel:call(Chan, Declare),
+ Name.
+
+delete_queue(Chan, Name) ->
+ Delete = #'queue.delete'{queue = Name},
+ #'queue.delete_ok'{} = amqp_channel:call(Chan, Delete).
+
+ensure_exchange_metrics_populated(Chan, RoutingKey) ->
+ % need to publish for exchange metrics to be populated
+ Publish = #'basic.publish'{routing_key = RoutingKey},
+ amqp_channel:call(Chan, Publish, #amqp_msg{payload = <<"hello">>}).
+
+ensure_channel_queue_metrics_populated(Chan, Queue) ->
+ % need to get and wait for timer for channel queue metrics to be populated
+ Get = #'basic.get'{queue = Queue, no_ack=true},
+ {#'basic.get_ok'{}, #amqp_msg{}} = amqp_channel:call(Chan, Get).
+
+force_channel_stats(Config) ->
+ [ Pid ! emit_stats || {Pid, _} <- read_table_rpc(Config, channel_created) ],
+ timer:sleep(100).
+
+read_table_rpc(Config, Table) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, read_table, [Table]).
+
+clean_core_metrics(Config) ->
+ [ rabbit_ct_broker_helpers:rpc(Config, 0, ets, delete_all_objects, [Table])
+ || {Table, _} <- ?CORE_TABLES].
+
+read_table(Table) ->
+ ets:tab2list(Table).
+
+force_metric_gc(Config) ->
+ timer:sleep(300),
+ rabbit_ct_broker_helpers:rpc(Config, 0, erlang, send,
+ [rabbit_core_metrics_gc, start_gc]),
+ rabbit_ct_broker_helpers:rpc(Config, 0, gen_server, call,
+ [rabbit_core_metrics_gc, test]).