diff options
Diffstat (limited to 'deps/rabbit/test/metrics_SUITE.erl')
-rw-r--r-- | deps/rabbit/test/metrics_SUITE.erl | 404 |
1 files changed, 404 insertions, 0 deletions
diff --git a/deps/rabbit/test/metrics_SUITE.erl b/deps/rabbit/test/metrics_SUITE.erl new file mode 100644 index 0000000000..e585ccd5a8 --- /dev/null +++ b/deps/rabbit/test/metrics_SUITE.erl @@ -0,0 +1,404 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2016-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(metrics_SUITE). +-compile(export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbit_common/include/rabbit_core_metrics.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + connection, + channel, + channel_connection_close, + channel_queue_exchange_consumer_close_connection, + channel_queue_delete_queue, + connection_metric_count_test, + channel_metric_count_test, + queue_metric_count_test, + queue_metric_count_channel_per_queue_test, + connection_metric_idemp_test, + channel_metric_idemp_test, + queue_metric_idemp_test + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [ + {collect_statistics, fine}, + {collect_statistics_interval, 500} + ]}). +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, ?MODULE} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + clean_core_metrics(Config), + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +% NB: node_stats tests are in the management_agent repo + +connection_metric_count_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_connection_metric_count/1, [Config], 25). + +channel_metric_count_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_channel_metric_count/1, [Config], 25). + +queue_metric_count_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_count/1, [Config], 5). + +queue_metric_count_channel_per_queue_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_count_channel_per_queue/1, + [Config], 5). + +connection_metric_idemp_test(Config) -> + connection_metric_idemp(Config, {1, 1}), + connection_metric_idemp(Config, {1, 2}), + connection_metric_idemp(Config, {2, 2}). + +channel_metric_idemp_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_channel_metric_idemp/1, [Config], 25). + +queue_metric_idemp_test(Config) -> + rabbit_ct_proper_helpers:run_proper(fun prop_queue_metric_idemp/1, [Config], 25). + +prop_connection_metric_idemp(Config) -> + ?FORALL(N, {integer(1, 25), integer(1, 25)}, + connection_metric_idemp(Config, N)). + +prop_channel_metric_idemp(Config) -> + ?FORALL(N, {integer(1, 25), integer(1, 25)}, + channel_metric_idemp(Config, N)). + +prop_queue_metric_idemp(Config) -> + ?FORALL(N, {integer(1, 25), integer(1, 25)}, + queue_metric_idemp(Config, N)). + +prop_connection_metric_count(Config) -> + ?FORALL(N, {integer(1, 25), resize(100, list(oneof([add, remove])))}, + connection_metric_count(Config, N)). + +prop_channel_metric_count(Config) -> + ?FORALL(N, {integer(1, 25), resize(100, list(oneof([add, remove])))}, + channel_metric_count(Config, N)). + +prop_queue_metric_count(Config) -> + ?FORALL(N, {integer(1, 10), resize(10, list(oneof([add, remove])))}, + queue_metric_count(Config, N)). + +prop_queue_metric_count_channel_per_queue(Config) -> + ?FORALL(N, {integer(1, 10), resize(10, list(oneof([add, remove])))}, + queue_metric_count_channel_per_queue(Config, N)). + +connection_metric_idemp(Config, {N, R}) -> + Conns = [rabbit_ct_client_helpers:open_unmanaged_connection(Config) + || _ <- lists:seq(1, N)], + Table = ?awaitMatch(L when is_list(L) andalso length(L) == N, + [ Pid || {Pid, _} <- read_table_rpc(Config, + connection_metrics)], + 5000), + Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)], + % refresh stats 'R' times + [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)], + force_metric_gc(Config), + TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)], + TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)], + [rabbit_ct_client_helpers:close_connection(Conn) || Conn <- Conns], + ?assertEqual(Table, TableAfter), + ?assertEqual(Table2, TableAfter2), + ?assertEqual(N, length(Table)), + ?assertEqual(N, length(TableAfter)). + +channel_metric_idemp(Config, {N, R}) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + [amqp_connection:open_channel(Conn) || _ <- lists:seq(1, N)], + Table = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)], + Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)], + % refresh stats 'R' times + [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)], + force_metric_gc(Config), + TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)], + TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)], + rabbit_ct_client_helpers:close_connection(Conn), + (Table2 == TableAfter2) and (Table == TableAfter) and + (N == length(Table)) and (N == length(TableAfter)). + +queue_metric_idemp(Config, {N, R}) -> + clean_core_metrics(Config), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Chan} = amqp_connection:open_channel(Conn), + Queues = + [begin + Queue = declare_queue(Chan), + ensure_exchange_metrics_populated(Chan, Queue), + ensure_channel_queue_metrics_populated(Chan, Queue), + Queue + end || _ <- lists:seq(1, N)], + + Table = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)], + Table2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)], + % refresh stats 'R' times + ChanTable = read_table_rpc(Config, channel_created), + [[Pid ! emit_stats || {Pid, _, _} <- ChanTable ] || _ <- lists:seq(1, R)], + force_metric_gc(Config), + TableAfter = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)], + TableAfter2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)], + [ delete_queue(Chan, Q) || Q <- Queues], + rabbit_ct_client_helpers:close_connection(Conn), + (Table2 == TableAfter2) and (Table == TableAfter) and + (N == length(Table)) and (N == length(TableAfter)). + +connection_metric_count(Config, Ops) -> + add_rem_counter(Config, Ops, + {fun rabbit_ct_client_helpers:open_unmanaged_connection/1, + fun(Cfg) -> + rabbit_ct_client_helpers:close_connection(Cfg) + end}, + [ connection_created, + connection_metrics, + connection_coarse_metrics ]). + +channel_metric_count(Config, Ops) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + Result = add_rem_counter(Config, Ops, + {fun (_Config) -> + {ok, Chan} = amqp_connection:open_channel(Conn), + Chan + end, + fun amqp_channel:close/1}, + [ channel_created, + channel_metrics, + channel_process_metrics ]), + ok = rabbit_ct_client_helpers:close_connection(Conn), + Result. + +queue_metric_count(Config, Ops) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Chan} = amqp_connection:open_channel(Conn), + AddFun = fun (_) -> + Queue = declare_queue(Chan), + ensure_exchange_metrics_populated(Chan, Queue), + ensure_channel_queue_metrics_populated(Chan, Queue), + force_channel_stats(Config), + Queue + end, + Result = add_rem_counter(Config, Ops, + {AddFun, + fun (Q) -> delete_queue(Chan, Q), + force_metric_gc(Config) + end}, [channel_queue_metrics, + channel_queue_exchange_metrics ]), + ok = rabbit_ct_client_helpers:close_connection(Conn), + Result. + +queue_metric_count_channel_per_queue(Config, Ops) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + AddFun = fun (_) -> + {ok, Chan} = amqp_connection:open_channel(Conn), + Queue = declare_queue(Chan), + ensure_exchange_metrics_populated(Chan, Queue), + ensure_channel_queue_metrics_populated(Chan, Queue), + force_channel_stats(Config), + {Chan, Queue} + end, + Result = add_rem_counter(Config, Ops, + {AddFun, + fun ({Chan, Q}) -> + delete_queue(Chan, Q), + force_metric_gc(Config) + end}, + [ channel_queue_metrics, + channel_queue_exchange_metrics ]), + ok = rabbit_ct_client_helpers:close_connection(Conn), + Result. + +add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) -> + Things = [ AddFun(Config) || _ <- lists:seq(1, Initial) ], + % either add or remove some things + {FinalLen, Things1} = + lists:foldl(fun(add, {L, Items}) -> + {L+1, [AddFun(Config) | Items]}; + (remove, {L, [H|Tail]}) -> + RemFun(H), + {L-1, Tail}; + (_, S) -> S end, + {Initial, Things}, + Ops), + force_metric_gc(Config), + TabLens = lists:map(fun(T) -> + length(read_table_rpc(Config, T)) + end, Tables), + [RemFun(Thing) || Thing <- Things1], + [FinalLen] == lists:usort(TabLens). + + +connection(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + [_] = read_table_rpc(Config, connection_created), + [_] = read_table_rpc(Config, connection_metrics), + [_] = read_table_rpc(Config, connection_coarse_metrics), + ok = rabbit_ct_client_helpers:close_connection(Conn), + force_metric_gc(Config), + [] = read_table_rpc(Config, connection_created), + [] = read_table_rpc(Config, connection_metrics), + [] = read_table_rpc(Config, connection_coarse_metrics), + ok. + +channel(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Chan} = amqp_connection:open_channel(Conn), + [_] = read_table_rpc(Config, channel_created), + [_] = read_table_rpc(Config, channel_metrics), + [_] = read_table_rpc(Config, channel_process_metrics), + ok = amqp_channel:close(Chan), + [] = read_table_rpc(Config, channel_created), + [] = read_table_rpc(Config, channel_metrics), + [] = read_table_rpc(Config, channel_process_metrics), + ok = rabbit_ct_client_helpers:close_connection(Conn). + +channel_connection_close(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, _} = amqp_connection:open_channel(Conn), + [_] = read_table_rpc(Config, channel_created), + [_] = read_table_rpc(Config, channel_metrics), + [_] = read_table_rpc(Config, channel_process_metrics), + ok = rabbit_ct_client_helpers:close_connection(Conn), + [] = read_table_rpc(Config, channel_created), + [] = read_table_rpc(Config, channel_metrics), + [] = read_table_rpc(Config, channel_process_metrics). + +channel_queue_delete_queue(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Chan} = amqp_connection:open_channel(Conn), + Queue = declare_queue(Chan), + ensure_exchange_metrics_populated(Chan, Queue), + ensure_channel_queue_metrics_populated(Chan, Queue), + force_channel_stats(Config), + [_] = read_table_rpc(Config, channel_queue_metrics), + [_] = read_table_rpc(Config, channel_queue_exchange_metrics), + + delete_queue(Chan, Queue), + force_metric_gc(Config), + % ensure removal of queue cleans up channel_queue metrics + [] = read_table_rpc(Config, channel_queue_exchange_metrics), + [] = read_table_rpc(Config, channel_queue_metrics), + ok = rabbit_ct_client_helpers:close_connection(Conn), + ok. + +channel_queue_exchange_consumer_close_connection(Config) -> + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), + {ok, Chan} = amqp_connection:open_channel(Conn), + Queue = declare_queue(Chan), + ensure_exchange_metrics_populated(Chan, Queue), + force_channel_stats(Config), + + [_] = read_table_rpc(Config, channel_exchange_metrics), + [_] = read_table_rpc(Config, channel_queue_exchange_metrics), + + ensure_channel_queue_metrics_populated(Chan, Queue), + force_channel_stats(Config), + [_] = read_table_rpc(Config, channel_queue_metrics), + + Sub = #'basic.consume'{queue = Queue}, + #'basic.consume_ok'{consumer_tag = _} = + amqp_channel:call(Chan, Sub), + + [_] = read_table_rpc(Config, consumer_created), + + ok = rabbit_ct_client_helpers:close_connection(Conn), + % ensure cleanup happened + force_metric_gc(Config), + [] = read_table_rpc(Config, channel_exchange_metrics), + [] = read_table_rpc(Config, channel_queue_exchange_metrics), + [] = read_table_rpc(Config, channel_queue_metrics), + [] = read_table_rpc(Config, consumer_created), + ok. + + + +%% ------------------------------------------------------------------- +%% Utilities +%% ------------------------------------------------------------------- + +declare_queue(Chan) -> + Declare = #'queue.declare'{durable = false, auto_delete = true}, + #'queue.declare_ok'{queue = Name} = amqp_channel:call(Chan, Declare), + Name. + +delete_queue(Chan, Name) -> + Delete = #'queue.delete'{queue = Name}, + #'queue.delete_ok'{} = amqp_channel:call(Chan, Delete). + +ensure_exchange_metrics_populated(Chan, RoutingKey) -> + % need to publish for exchange metrics to be populated + Publish = #'basic.publish'{routing_key = RoutingKey}, + amqp_channel:call(Chan, Publish, #amqp_msg{payload = <<"hello">>}). + +ensure_channel_queue_metrics_populated(Chan, Queue) -> + % need to get and wait for timer for channel queue metrics to be populated + Get = #'basic.get'{queue = Queue, no_ack=true}, + {#'basic.get_ok'{}, #amqp_msg{}} = amqp_channel:call(Chan, Get). + +force_channel_stats(Config) -> + [ Pid ! emit_stats || {Pid, _} <- read_table_rpc(Config, channel_created) ], + timer:sleep(100). + +read_table_rpc(Config, Table) -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, read_table, [Table]). + +clean_core_metrics(Config) -> + [ rabbit_ct_broker_helpers:rpc(Config, 0, ets, delete_all_objects, [Table]) + || {Table, _} <- ?CORE_TABLES]. + +read_table(Table) -> + ets:tab2list(Table). + +force_metric_gc(Config) -> + timer:sleep(300), + rabbit_ct_broker_helpers:rpc(Config, 0, erlang, send, + [rabbit_core_metrics_gc, start_gc]), + rabbit_ct_broker_helpers:rpc(Config, 0, gen_server, call, + [rabbit_core_metrics_gc, test]). |