summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl75
1 files changed, 69 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b77e0118e1..6227a5bbfa 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,8 +32,9 @@
emit_info_local/4, emit_info_down/4]).
-export([count/0]).
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
+ list_local_mirrored_classic_names/0,
list_local_names_down/0, list_with_possible_retry/1]).
--export([list_by_type/1]).
+-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
-export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]).
@@ -44,8 +45,8 @@
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression.
--export([list_local_followers/0,
- get_quorum_nodes/1]).
+-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
+ list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
-export([delete_immediately_by_resource/1]).
@@ -955,8 +956,41 @@ is_down(Q) ->
true
end.
+
+-spec sample_local_queues() -> [amqqueue:amqqueue()].
+sample_local_queues() -> sample_n_by_name(list_local_names(), 300).
+
+-spec sample_n_by_name([rabbit_amqqueue:name()], pos_integer()) -> [amqqueue:amqqueue()].
+sample_n_by_name([], _N) ->
+ [];
+sample_n_by_name(Names, N) when is_list(Names) andalso is_integer(N) andalso N > 0 ->
+ %% lists:nth/2 throws when position is > list length
+ M = erlang:min(N, length(Names)),
+ Ids = lists:foldl(fun( _, Acc) when length(Acc) >= 100 ->
+ Acc;
+ (_, Acc) ->
+ Pick = lists:nth(rand:uniform(M), Names),
+ [Pick | Acc]
+ end,
+ [], lists:seq(1, M)),
+ lists:map(fun (Id) ->
+ {ok, Q} = rabbit_amqqueue:lookup(Id),
+ Q
+ end,
+ lists:usort(Ids)).
+
+-spec sample_n([amqqueue:amqqueue()], pos_integer()) -> [amqqueue:amqqueue()].
+sample_n([], _N) ->
+ [];
+sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 ->
+ Names = [amqqueue:get_name(Q) || Q <- Queues],
+ sample_n_by_name(Names, N).
+
+
-spec list_by_type(atom()) -> [amqqueue:amqqueue()].
+list_by_type(classic) -> list_by_type(rabbit_classic_queue);
+list_by_type(quorum) -> list_by_type(rabbit_quorum_queue);
list_by_type(Type) ->
{atomic, Qs} =
mnesia:sync_transaction(
@@ -967,11 +1001,40 @@ list_by_type(Type) ->
end),
Qs.
+-spec list_local_quorum_queue_names() -> [rabbit_amqqueue:name()].
+
+list_local_quorum_queue_names() ->
+ [ amqqueue:get_name(Q) || Q <- list_by_type(quorum),
+ amqqueue:get_state(Q) =/= crashed,
+ lists:member(node(), get_quorum_nodes(Q))].
+
+-spec list_local_quorum_queues() -> [amqqueue:amqqueue()].
+list_local_quorum_queues() ->
+ [ Q || Q <- list_by_type(quorum),
+ amqqueue:get_state(Q) =/= crashed,
+ lists:member(node(), get_quorum_nodes(Q))].
+
+-spec list_local_leaders() -> [amqqueue:amqqueue()].
+list_local_leaders() ->
+ [ Q || Q <- list(),
+ amqqueue:is_quorum(Q),
+ amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()].
+
+-spec list_local_followers() -> [amqqueue:amqqueue()].
list_local_followers() ->
- [ amqqueue:get_name(Q)
- || Q <- list(),
+ [ Q || Q <- list(),
amqqueue:is_quorum(Q),
- amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))].
+ amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(),
+ lists:member(node(), get_quorum_nodes(Q))].
+
+-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()].
+list_local_mirrored_classic_names() ->
+ [ amqqueue:get_name(Q) || Q <- list(),
+ amqqueue:get_state(Q) =/= crashed,
+ amqqueue:is_classic(Q),
+ is_local_to_node(amqqueue:get_pid(Q), node()),
+ is_replicated(Q)].
+
is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
Node =:= node(QPid);