summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-12-12 18:46:14 +0300
committerGitHub <noreply@github.com>2019-12-12 18:46:14 +0300
commit3605ce32b38ef7bce038580ed9408a224e023c8c (patch)
tree9227a37c823501db3d636cd284ba0fccf2ce69bd
parentec7cae29f182b0bede9d431d04084f1d4f258827 (diff)
parentbdede1ae94a28a2855bb1701872d2c7a56dcc6d3 (diff)
downloadrabbitmq-server-git-3605ce32b38ef7bce038580ed9408a224e023c8c.tar.gz
Merge pull request #2183 from rabbitmq/rabbitmq-cli-389-part1
rabbit_{amqqueue,quorum_queue}: add several listing and sampling functions for new CLI commands
-rw-r--r--src/rabbit_amqqueue.erl75
-rw-r--r--src/rabbit_core_metrics_gc.erl2
-rw-r--r--src/rabbit_feature_flags.erl2
-rw-r--r--src/rabbit_quorum_queue.erl69
-rw-r--r--test/quorum_queue_SUITE.erl41
5 files changed, 176 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b77e0118e1..6227a5bbfa 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,8 +32,9 @@
emit_info_local/4, emit_info_down/4]).
-export([count/0]).
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
+ list_local_mirrored_classic_names/0,
list_local_names_down/0, list_with_possible_retry/1]).
--export([list_by_type/1]).
+-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
-export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]).
@@ -44,8 +45,8 @@
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression.
--export([list_local_followers/0,
- get_quorum_nodes/1]).
+-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
+ list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
-export([delete_immediately_by_resource/1]).
@@ -955,8 +956,41 @@ is_down(Q) ->
true
end.
+
+%% Returns a random sample of the queues named by list_local_names/0,
+%% asking sample_n_by_name/2 for at most 300 of them.
+-spec sample_local_queues() -> [amqqueue:amqqueue()].
+sample_local_queues() ->
+    LocalNames = list_local_names(),
+    sample_n_by_name(LocalNames, 300).
+
+%% Looks up a random sample of the queues named in Names.
+%%
+%% Draws min(N, length(Names), 100) names at random, with replacement;
+%% duplicate draws are collapsed, so fewer queues may be returned.
+%% Crashes with badmatch if a drawn queue can no longer be looked up.
+-spec sample_n_by_name([rabbit_amqqueue:name()], pos_integer()) -> [amqqueue:amqqueue()].
+sample_n_by_name([], _N) ->
+    [];
+sample_n_by_name(Names, N) when is_list(Names) andalso is_integer(N) andalso N > 0 ->
+    Len = length(Names),
+    %% never draw more than 100 names, regardless of N
+    Draws = lists:min([N, Len, 100]),
+    %% draw positions over the whole list (1..Len), not just over the first
+    %% `Draws` entries, so that every name has a chance of being sampled;
+    %% rand:uniform(Len) =< Len, so lists:nth/2 cannot go out of range
+    Ids = [lists:nth(rand:uniform(Len), Names) || _ <- lists:seq(1, Draws)],
+    lists:map(fun (Id) ->
+                {ok, Q} = rabbit_amqqueue:lookup(Id),
+                Q
+              end,
+              lists:usort(Ids)).
+
+%% Same as sample_n_by_name/2, but takes queue records instead of names.
+%% The names are extracted here and the sampled queues are looked up
+%% again by sample_n_by_name/2.
+-spec sample_n([amqqueue:amqqueue()], pos_integer()) -> [amqqueue:amqqueue()].
+sample_n([], _N) ->
+    [];
+sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 ->
+    sample_n_by_name(lists:map(fun amqqueue:get_name/1, Queues), N).
+
+
-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
+list_by_type(classic) -> list_by_type(rabbit_classic_queue);
+list_by_type(quorum) -> list_by_type(rabbit_quorum_queue);
list_by_type(Type) ->
{atomic, Qs} =
mnesia:sync_transaction(
@@ -967,11 +1001,40 @@ list_by_type(Type) ->
end),
Qs.
+%% Names of the quorum queues that are not in the `crashed` state and
+%% list this node among their quorum nodes, i.e. the names of the
+%% records returned by list_local_quorum_queues/0.
+-spec list_local_quorum_queue_names() -> [rabbit_amqqueue:name()].
+list_local_quorum_queue_names() ->
+    [amqqueue:get_name(Q) || Q <- list_local_quorum_queues()].
+
+%% Quorum queues that are not in the `crashed` state and list this node
+%% among their quorum nodes (see get_quorum_nodes/1).
+-spec list_local_quorum_queues() -> [amqqueue:amqqueue()].
+list_local_quorum_queues() ->
+    ThisNode = node(),
+    IsLocal = fun (Q) ->
+                      amqqueue:get_state(Q) =/= crashed
+                          andalso lists:member(ThisNode, get_quorum_nodes(Q))
+              end,
+    lists:filter(IsLocal, list_by_type(quorum)).
+
+%% Quorum queues, not in the `crashed` state, whose recorded leader
+%% (amqqueue:get_leader/1) is this node.
+-spec list_local_leaders() -> [amqqueue:amqqueue()].
+list_local_leaders() ->
+    ThisNode = node(),
+    lists:filter(fun (Q) ->
+                         amqqueue:is_quorum(Q)
+                             andalso amqqueue:get_state(Q) =/= crashed
+                             andalso amqqueue:get_leader(Q) =:= ThisNode
+                 end, list()).
+
+-spec list_local_followers() -> [amqqueue:amqqueue()].
list_local_followers() ->
- [ amqqueue:get_name(Q)
- || Q <- list(),
+ [ Q || Q <- list(),
amqqueue:is_quorum(Q),
- amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))].
+ amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(),
+ lists:member(node(), get_quorum_nodes(Q))].
+
+%% Names of the classic queues that are not in the `crashed` state, whose
+%% pid is local to this node (is_local_to_node/2) and which are
+%% replicated (is_replicated/1).
+-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()].
+list_local_mirrored_classic_names() ->
+    ThisNode = node(),
+    IsLocalMirrored =
+        fun (Q) ->
+                amqqueue:get_state(Q) =/= crashed
+                    andalso amqqueue:is_classic(Q)
+                    andalso is_local_to_node(amqqueue:get_pid(Q), ThisNode)
+                    andalso is_replicated(Q)
+        end,
+    [amqqueue:get_name(Q) || Q <- list(), IsLocalMirrored(Q)].
+
is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
Node =:= node(QPid);
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index 326804e342..06cc401d32 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -80,7 +80,7 @@ gc_local_queues() ->
GbSetDown = gb_sets:from_list(QueuesDown),
gc_queue_metrics(GbSet, GbSetDown),
gc_entity(queue_coarse_metrics, GbSet),
- Followers = gb_sets:from_list(rabbit_amqqueue:list_local_followers()),
+ Followers = gb_sets:from_list([amqqueue:get_name(Q) || Q <- rabbit_amqqueue:list_local_followers() ]),
gc_leader_data(Followers).
gc_leader_data(Followers) ->
diff --git a/src/rabbit_feature_flags.erl b/src/rabbit_feature_flags.erl
index 9440c6cd0b..f26978338a 100644
--- a/src/rabbit_feature_flags.erl
+++ b/src/rabbit_feature_flags.erl
@@ -2293,7 +2293,7 @@ on_load() ->
true ->
%% RabbitMQ is running.
%%
- %% Now we want to differenciate a pre-feature-flags node
+ %% Now we want to differentiate a pre-feature-flags node
%% from one having the subsystem.
%%
%% To do that, we verify if the `feature_flags_file`
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 867af6cb75..e27bbb67dc 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -42,10 +42,12 @@
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
-export([file_handle_leader_reservation/1, file_handle_other_reservation/0]).
-export([file_handle_release_reservation/0]).
+-export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0,
+ filter_quorum_critical/1, filter_quorum_critical/2,
+ all_replica_states/0]).
-%%-include_lib("rabbit_common/include/rabbit.hrl").
--include_lib("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
+-include("rabbit.hrl").
-include("amqqueue.hrl").
-type msg_id() :: non_neg_integer().
@@ -234,6 +236,69 @@ become_leader(QName, Name) ->
end
end).
+%% Returns this node's name paired with the contents of the `ra_state`
+%% ETS table converted to a map.
+-spec all_replica_states() -> {node(), #{atom() => atom()}}.
+all_replica_states() ->
+    States = maps:from_list(ets:tab2list(ra_state)),
+    {node(), States}.
+
+%% Local quorum queues (rabbit_amqqueue:list_local_quorum_queues/0) that
+%% are kept by filter_quorum_critical/1.
+-spec list_with_minimum_quorum() -> [amqqueue:amqqueue()].
+list_with_minimum_quorum() ->
+    LocalQQs = rabbit_amqqueue:list_local_quorum_queues(),
+    filter_quorum_critical(LocalQQs).
+
+%% Same queues as list_with_minimum_quorum/0, but each one is rendered as
+%% a map with binary keys (readable_name, name, virtual_host, type)
+%% instead of a queue record.
+-spec list_with_minimum_quorum_for_cli() -> [#{binary() => term()}].
+list_with_minimum_quorum_for_cli() ->
+    [begin
+         %% fetch the queue name once; it is used both as a whole
+         %% (readable form) and for its `name` field
+         #resource{name = Name} = QName = amqqueue:get_name(Q),
+         #{<<"readable_name">> => rabbit_misc:rs(QName),
+           <<"name">>          => Name,
+           <<"virtual_host">>  => amqqueue:get_vhost(Q),
+           <<"type">>          => <<"quorum">>}
+     end || Q <- list_with_minimum_quorum()].
+
+%% Collects the replica states reported by every running node (each node
+%% runs all_replica_states/0) and delegates to filter_quorum_critical/2.
+-spec filter_quorum_critical([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
+filter_quorum_critical(Queues) ->
+    %% all_replica_states/0 returns a {Node, States} pair, so the map built
+    %% from the replies is expected to be keyed by node name, for example:
+    %% #{rabbit@warp10 =>
+    %%     #{'%2F_qq.636' => leader, '%2F_qq.243' => leader,
+    %%       '%2F_qq.1939' => leader, '%2F_qq.1150' => leader, ...}}
+    RunningNodes = rabbit_nodes:all_running(),
+    PerNode = rabbit_misc:append_rpc_all_nodes(RunningNodes, ?MODULE,
+                                               all_replica_states, []),
+    filter_quorum_critical(Queues, maps:from_list(PerNode)).
+
+%% Keeps only the queues whose number of online replicas is at or below
+%% the minimum quorum (length(MemberNodes) div 2 + 1), i.e. queues for
+%% which losing one more online replica leaves them without a quorum.
+%%
+%% ReplicaStates maps a node to a map of per-replica states on that node.
+%% A replica counts as online when its node has an entry for the queue
+%% whose state is `leader` or `follower`.
+-spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}) -> [amqqueue:amqqueue()].
+filter_quorum_critical(Queues, ReplicaStates) ->
+    lists:filter(fun (Q) ->
+                    MemberNodes = rabbit_amqqueue:get_quorum_nodes(Q),
+                    %% the queue pid is matched as a {Name, Node} tuple;
+                    %% Name is the key looked up in each node's state map
+                    {Name, _Node} = amqqueue:get_pid(Q),
+                    AllUp = lists:filter(fun (N) ->
+                                            case maps:get(N, ReplicaStates, undefined) of
+                                                #{Name := State} when State =:= follower orelse State =:= leader ->
+                                                    true;
+                                                _ -> false
+                                            end
+                                         end, MemberNodes),
+                    MinQuorum = length(MemberNodes) div 2 + 1,
+                    length(AllUp) =< MinQuorum
+                 end, Queues).
+
+
rpc_delete_metrics(QName) ->
ets:delete(queue_coarse_metrics, QName),
ets:delete(queue_metrics, QName),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 64058ce013..808d4470aa 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -43,6 +43,7 @@ groups() ->
[
{single_node, [], all_tests()},
{single_node, [], memory_tests()},
+ {single_node, [], [node_removal_is_quorum_critical]},
{unclustered, [], [
{cluster_size_2, [], [add_member]}
]},
@@ -56,7 +57,8 @@ groups() ->
delete_member_classic,
delete_member_queue_not_found,
delete_member,
- delete_member_not_a_member]
+ delete_member_not_a_member,
+ node_removal_is_quorum_critical]
++ all_tests()},
{cluster_size_2, [], memory_tests()},
{cluster_size_3, [], [
@@ -74,12 +76,14 @@ groups() ->
shrink_all,
rebalance,
file_handle_reservations,
- file_handle_reservations_above_limit
+ file_handle_reservations_above_limit,
+ node_removal_is_not_quorum_critical
]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
- quorum_cluster_size_7
+ quorum_cluster_size_7,
+ node_removal_is_not_quorum_critical
]},
{clustered_with_partitions, [], [
reconnect_consumer_and_publish,
@@ -1387,6 +1391,31 @@ delete_member_not_a_member(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
+%% These tests check if node removal would cause any queues to lose (or not lose)
+%% their quorum. See rabbitmq/rabbitmq-cli#389 for background.
+
+%% Declares a quorum queue and asserts that every node in the test
+%% cluster reports it via rabbit_quorum_queue:list_with_minimum_quorum/0.
+node_removal_is_quorum_critical(Config) ->
+    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    [FirstNode | _] = Nodes,
+    Ch = rabbit_ct_client_helpers:open_channel(Config, FirstNode),
+    QName = ?config(queue_name, Config),
+    QueueArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
+    ?assertEqual({'queue.declare_ok', QName, 0, 0},
+                 declare(Ch, QName, QueueArgs)),
+    %% presumably gives the newly declared queue time to come up
+    timer:sleep(100),
+    [?assertEqual([QName],
+                  queue_names(rpc:call(Node, rabbit_quorum_queue,
+                                       list_with_minimum_quorum, [])))
+     || Node <- Nodes].
+
+%% Declares a quorum queue and asserts that the first node reports no
+%% queues via rabbit_quorum_queue:list_with_minimum_quorum/0.
+node_removal_is_not_quorum_critical(Config) ->
+    [Node | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
+    QName = ?config(queue_name, Config),
+    QueueArgs = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
+    ?assertEqual({'queue.declare_ok', QName, 0, 0},
+                 declare(Ch, QName, QueueArgs)),
+    %% presumably gives the newly declared queue time to come up
+    timer:sleep(100),
+    ?assertEqual([], rpc:call(Node, rabbit_quorum_queue,
+                              list_with_minimum_quorum, [])).
+
file_handle_reservations(Config) ->
Servers = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
@@ -2407,3 +2436,9 @@ wait_for_consensus(Name, Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
RaName = ra_name(Name),
{ok, _, _} = ra:members({RaName, Server}).
+
+%% Maps each queue record to the `name` field of its #resource{} name.
+queue_names(Records) ->
+    lists:map(fun (Q) ->
+                      #resource{name = Name} = amqqueue:get_name(Q),
+                      Name
+              end, Records).