diff options
author | Michael Klishin <mklishin@pivotal.io> | 2019-12-12 18:46:14 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-12 18:46:14 +0300 |
commit | 3605ce32b38ef7bce038580ed9408a224e023c8c (patch) | |
tree | 9227a37c823501db3d636cd284ba0fccf2ce69bd | |
parent | ec7cae29f182b0bede9d431d04084f1d4f258827 (diff) | |
parent | bdede1ae94a28a2855bb1701872d2c7a56dcc6d3 (diff) | |
download | rabbitmq-server-git-3605ce32b38ef7bce038580ed9408a224e023c8c.tar.gz |
Merge pull request #2183 from rabbitmq/rabbitmq-cli-389-part1
rabbit_{amqueue,quorum_queue}: add several listing and sampling functions for new CLI commands
-rw-r--r-- | src/rabbit_amqqueue.erl | 75 | ||||
-rw-r--r-- | src/rabbit_core_metrics_gc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_feature_flags.erl | 2 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 69 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 41 |
5 files changed, 176 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b77e0118e1..6227a5bbfa 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,8 +32,9 @@ emit_info_local/4, emit_info_down/4]). -export([count/0]). -export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0, + list_local_mirrored_classic_names/0, list_local_names_down/0, list_with_possible_retry/1]). --export([list_by_type/1]). +-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]). @@ -44,8 +45,8 @@ -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). -export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). -export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression. --export([list_local_followers/0, - get_quorum_nodes/1]). +-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, + list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1]). -export([ensure_rabbit_queue_record_is_initialized/1]). -export([format/1]). -export([delete_immediately_by_resource/1]). @@ -955,8 +956,41 @@ is_down(Q) -> true end. + +-spec sample_local_queues() -> [amqqueue:amqqueue()]. +sample_local_queues() -> sample_n_by_name(list_local_names(), 300). + +-spec sample_n_by_name([rabbit_amqqueue:name()], pos_integer()) -> [amqqueue:amqqueue()]. +sample_n_by_name([], _N) -> + []; +sample_n_by_name(Names, N) when is_list(Names) andalso is_integer(N) andalso N > 0 -> + %% lists:nth/2 throws when position is > list length + M = erlang:min(N, length(Names)), + Ids = lists:foldl(fun( _, Acc) when length(Acc) >= 100 -> + Acc; + (_, Acc) -> + Pick = lists:nth(rand:uniform(M), Names), + [Pick | Acc] + end, + [], lists:seq(1, M)), + lists:map(fun (Id) -> + {ok, Q} = rabbit_amqqueue:lookup(Id), + Q + end, + lists:usort(Ids)). + +-spec sample_n([amqqueue:amqqueue()], pos_integer()) -> [amqqueue:amqqueue()]. +sample_n([], _N) -> + []; +sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 -> + Names = [amqqueue:get_name(Q) || Q <- Queues], + sample_n_by_name(Names, N). + + -spec list_by_type(atom()) -> [amqqueue:amqqueue()]. +list_by_type(classic) -> list_by_type(rabbit_classic_queue); +list_by_type(quorum) -> list_by_type(rabbit_quorum_queue); list_by_type(Type) -> {atomic, Qs} = mnesia:sync_transaction( @@ -967,11 +1001,40 @@ list_by_type(Type) -> end), Qs. +-spec list_local_quorum_queue_names() -> [rabbit_amqqueue:name()]. + +list_local_quorum_queue_names() -> + [ amqqueue:get_name(Q) || Q <- list_by_type(quorum), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q))]. + +-spec list_local_quorum_queues() -> [amqqueue:amqqueue()]. +list_local_quorum_queues() -> + [ Q || Q <- list_by_type(quorum), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q))]. + +-spec list_local_leaders() -> [amqqueue:amqqueue()]. +list_local_leaders() -> + [ Q || Q <- list(), + amqqueue:is_quorum(Q), + amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()]. + +-spec list_local_followers() -> [amqqueue:amqqueue()]. list_local_followers() -> - [ amqqueue:get_name(Q) - || Q <- list(), + [ Q || Q <- list(), amqqueue:is_quorum(Q), - amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))]. + amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), + lists:member(node(), get_quorum_nodes(Q))]. + +-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()]. +list_local_mirrored_classic_names() -> + [ amqqueue:get_name(Q) || Q <- list(), + amqqueue:get_state(Q) =/= crashed, + amqqueue:is_classic(Q), + is_local_to_node(amqqueue:get_pid(Q), node()), + is_replicated(Q)]. + is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) -> Node =:= node(QPid); diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index 326804e342..06cc401d32 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -80,7 +80,7 @@ gc_local_queues() -> GbSetDown = gb_sets:from_list(QueuesDown), gc_queue_metrics(GbSet, GbSetDown), gc_entity(queue_coarse_metrics, GbSet), - Followers = gb_sets:from_list(rabbit_amqqueue:list_local_followers()), + Followers = gb_sets:from_list([amqqueue:get_name(Q) || Q <- rabbit_amqqueue:list_local_followers() ]), gc_leader_data(Followers). gc_leader_data(Followers) -> diff --git a/src/rabbit_feature_flags.erl b/src/rabbit_feature_flags.erl index 9440c6cd0b..f26978338a 100644 --- a/src/rabbit_feature_flags.erl +++ b/src/rabbit_feature_flags.erl @@ -2293,7 +2293,7 @@ on_load() -> true -> %% RabbitMQ is running. %% - %% Now we want to differenciate a pre-feature-flags node + %% Now we want to differentiate a pre-feature-flags node %% from one having the subsystem. %% %% To do that, we verify if the `feature_flags_file` diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 867af6cb75..e27bbb67dc 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -42,10 +42,12 @@ -export([transfer_leadership/2, get_replicas/1, queue_length/1]). -export([file_handle_leader_reservation/1, file_handle_other_reservation/0]). -export([file_handle_release_reservation/0]). +-export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0, + filter_quorum_critical/1, filter_quorum_critical/2, + all_replica_states/0]). -%%-include_lib("rabbit_common/include/rabbit.hrl"). --include_lib("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-include("rabbit.hrl"). -include("amqqueue.hrl"). -type msg_id() :: non_neg_integer(). @@ -234,6 +236,69 @@ become_leader(QName, Name) -> end end). +-spec all_replica_states() -> {node(), #{atom() => atom()}}. +all_replica_states() -> + Rows = ets:tab2list(ra_state), + {node(), maps:from_list(Rows)}. + +-spec list_with_minimum_quorum() -> [amqqueue:amqqueue()]. +list_with_minimum_quorum() -> + filter_quorum_critical(rabbit_amqqueue:list_local_quorum_queues()). + +-spec list_with_minimum_quorum_for_cli() -> [amqqueue:amqqueue()]. +list_with_minimum_quorum_for_cli() -> + QQs = list_with_minimum_quorum(), + [begin + #resource{name = Name} = amqqueue:get_name(Q), + #{ + <<"readable_name">> => rabbit_misc:rs(amqqueue:get_name(Q)), + <<"name">> => Name, + <<"virtual_host">> => amqqueue:get_vhost(Q), + <<"type">> => <<"quorum">> + } + end || Q <- QQs]. + +-spec filter_quorum_critical([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()]. +filter_quorum_critical(Queues) -> + %% Example map of QQ replica states: + %% #{rabbit@warp10 => + %% #{'%2F_qq.636' => leader,'%2F_qq.243' => leader, + %% '%2F_qq.1939' => leader,'%2F_qq.1150' => leader, + %% '%2F_qq.1109' => leader,'%2F_qq.1654' => leader, + %% '%2F_qq.1679' => leader,'%2F_qq.1003' => leader, + %% '%2F_qq.1593' => leader,'%2F_qq.1765' => leader, + %% '%2F_qq.933' => leader,'%2F_qq.38' => leader, + %% '%2F_qq.1357' => leader,'%2F_qq.1345' => leader, + %% '%2F_qq.1694' => leader,'%2F_qq.994' => leader, + %% '%2F_qq.490' => leader,'%2F_qq.1704' => leader, + %% '%2F_qq.58' => leader,'%2F_qq.564' => leader, + %% '%2F_qq.683' => leader,'%2F_qq.386' => leader, + %% '%2F_qq.753' => leader,'%2F_qq.6' => leader, + %% '%2F_qq.1590' => leader,'%2F_qq.1363' => leader, + %% '%2F_qq.882' => leader,'%2F_qq.1161' => leader,...}} + ReplicaStates = maps:from_list( + rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(), + ?MODULE, all_replica_states, [])), + filter_quorum_critical(Queues, ReplicaStates). + +-spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}) -> [amqqueue:amqqueue()]. + +filter_quorum_critical(Queues, ReplicaStates) -> + lists:filter(fun (Q) -> + MemberNodes = rabbit_amqqueue:get_quorum_nodes(Q), + {Name, _Node} = amqqueue:get_pid(Q), + AllUp = lists:filter(fun (N) -> + {Name, _} = amqqueue:get_pid(Q), + case maps:get(N, ReplicaStates, undefined) of + #{Name := State} when State =:= follower orelse State =:= leader -> + true; + _ -> false + end + end, MemberNodes), + MinQuorum = length(MemberNodes) div 2 + 1, + length(AllUp) =< MinQuorum + end, Queues). + rpc_delete_metrics(QName) -> ets:delete(queue_coarse_metrics, QName), ets:delete(queue_metrics, QName), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 64058ce013..808d4470aa 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -43,6 +43,7 @@ groups() -> [ {single_node, [], all_tests()}, {single_node, [], memory_tests()}, + {single_node, [], [node_removal_is_quorum_critical]}, {unclustered, [], [ {cluster_size_2, [], [add_member]} ]}, @@ -56,7 +57,8 @@ groups() -> delete_member_classic, delete_member_queue_not_found, delete_member, - delete_member_not_a_member] + delete_member_not_a_member, + node_removal_is_quorum_critical] ++ all_tests()}, {cluster_size_2, [], memory_tests()}, {cluster_size_3, [], [ @@ -74,12 +76,14 @@ groups() -> shrink_all, rebalance, file_handle_reservations, - file_handle_reservations_above_limit + file_handle_reservations_above_limit, + node_removal_is_not_quorum_critical ]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, quorum_cluster_size_3, - quorum_cluster_size_7 + quorum_cluster_size_7, + node_removal_is_not_quorum_critical ]}, {clustered_with_partitions, [], [ reconnect_consumer_and_publish, @@ -1387,6 +1391,31 @@ delete_member_not_a_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). +%% These tests check if node removal would cause any queues to lose (or not lose) +%% their quorum. See rabbitmq/rabbitmq-cli#389 for background. + +node_removal_is_quorum_critical(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QName = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(100), + [begin + Qs = rpc:call(S, rabbit_quorum_queue, list_with_minimum_quorum, []), + ?assertEqual([QName], queue_names(Qs)) + end || S <- Servers]. + +node_removal_is_not_quorum_critical(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QName = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(100), + Qs = rpc:call(Server, rabbit_quorum_queue, list_with_minimum_quorum, []), + ?assertEqual([], Qs). + file_handle_reservations(Config) -> Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), @@ -2407,3 +2436,9 @@ wait_for_consensus(Name, Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), RaName = ra_name(Name), {ok, _, _} = ra:members({RaName, Server}). + +queue_names(Records) -> + [begin + #resource{name = Name} = amqqueue:get_name(Q), + Name + end || Q <- Records]. |