diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 75 |
1 files changed, 69 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b77e0118e1..6227a5bbfa 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,8 +32,9 @@ emit_info_local/4, emit_info_down/4]). -export([count/0]). -export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0, + list_local_mirrored_classic_names/0, list_local_names_down/0, list_with_possible_retry/1]). --export([list_by_type/1]). +-export([list_by_type/1, sample_local_queues/0, sample_n_by_name/2, sample_n/2]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]). @@ -44,8 +45,8 @@ -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). -export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). -export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression. --export([list_local_followers/0, - get_quorum_nodes/1]). +-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, + list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1]). -export([ensure_rabbit_queue_record_is_initialized/1]). -export([format/1]). -export([delete_immediately_by_resource/1]). @@ -955,8 +956,41 @@ is_down(Q) -> true end. + +-spec sample_local_queues() -> [amqqueue:amqqueue()]. +sample_local_queues() -> sample_n_by_name(list_local_names(), 300). + +-spec sample_n_by_name([rabbit_amqqueue:name()], pos_integer()) -> [amqqueue:amqqueue()]. +sample_n_by_name([], _N) -> + []; +sample_n_by_name(Names, N) when is_list(Names) andalso is_integer(N) andalso N > 0 -> + %% lists:nth/2 throws when position is > list length + M = erlang:min(N, length(Names)), + Ids = lists:foldl(fun( _, Acc) when length(Acc) >= 100 -> + Acc; + (_, Acc) -> + Pick = lists:nth(rand:uniform(M), Names), + [Pick | Acc] + end, + [], lists:seq(1, M)), + lists:map(fun (Id) -> + {ok, Q} = rabbit_amqqueue:lookup(Id), + Q + end, + lists:usort(Ids)). + +-spec sample_n([amqqueue:amqqueue()], pos_integer()) -> [amqqueue:amqqueue()]. +sample_n([], _N) -> + []; +sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 -> + Names = [amqqueue:get_name(Q) || Q <- Queues], + sample_n_by_name(Names, N). + + -spec list_by_type(atom()) -> [amqqueue:amqqueue()]. +list_by_type(classic) -> list_by_type(rabbit_classic_queue); +list_by_type(quorum) -> list_by_type(rabbit_quorum_queue); list_by_type(Type) -> {atomic, Qs} = mnesia:sync_transaction( @@ -967,11 +1001,40 @@ list_by_type(Type) -> end), Qs. +-spec list_local_quorum_queue_names() -> [rabbit_amqqueue:name()]. + +list_local_quorum_queue_names() -> + [ amqqueue:get_name(Q) || Q <- list_by_type(quorum), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q))]. + +-spec list_local_quorum_queues() -> [amqqueue:amqqueue()]. +list_local_quorum_queues() -> + [ Q || Q <- list_by_type(quorum), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q))]. + +-spec list_local_leaders() -> [amqqueue:amqqueue()]. +list_local_leaders() -> + [ Q || Q <- list(), + amqqueue:is_quorum(Q), + amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =:= node()]. + +-spec list_local_followers() -> [amqqueue:amqqueue()]. list_local_followers() -> - [ amqqueue:get_name(Q) - || Q <- list(), + [ Q || Q <- list(), amqqueue:is_quorum(Q), - amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))]. + amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), + lists:member(node(), get_quorum_nodes(Q))]. + +-spec list_local_mirrored_classic_names() -> [rabbit_amqqueue:name()]. +list_local_mirrored_classic_names() -> + [ amqqueue:get_name(Q) || Q <- list(), + amqqueue:get_state(Q) =/= crashed, + amqqueue:is_classic(Q), + is_local_to_node(amqqueue:get_pid(Q), node()), + is_replicated(Q)]. + is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) -> Node =:= node(QPid); |