diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-22 13:52:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-22 13:52:05 +0200 |
commit | dd5e879949aeccad3aca29af56161bbfeee300fc (patch) | |
tree | 340634edc6bef1d1e42519855924cb656b19ec40 | |
parent | 3147ee9a71ee37a63551fa93f91e279b650c834b (diff) | |
parent | 440f05ab5e3d488d21ceb52b7eb15e5ce5bf033a (diff) | |
download | rabbitmq-server-git-dd5e879949aeccad3aca29af56161bbfeee300fc.tar.gz |
Merge pull request #2475 from rabbitmq/queue-type-info-keys
Refactor info queues on queue types
-rw-r--r-- | src/rabbit_amqqueue.erl | 21 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
-rw-r--r-- | src/rabbit_queue_type.erl | 18 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_stream_queue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
6 files changed, 35 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index deead84971..6104228c54 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1176,6 +1176,9 @@ list_for_count(VHost) -> -spec info_keys() -> rabbit_types:info_keys(). +%% It should no default to classic queue keys, but a subset of those that must be shared +%% by all queue types. Not sure this is even being used, so will leave it here for backwards +%% compatibility. Each queue type handles now info(Q, all_keys) with the keys it supports. info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). @@ -1220,24 +1223,10 @@ info(Q, Items) when ?is_amqqueue(Q) -> rabbit_queue_type:info(Q, Items). info_down(Q, DownReason) -> - info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason). + rabbit_queue_type:info_down(Q, DownReason). info_down(Q, Items, DownReason) -> - [{Item, i_down(Item, Q, DownReason)} || Item <- Items, Item =/= totals, Item =/= type_specific]. - -i_down(name, Q, _) -> amqqueue:get_name(Q); -i_down(durable, Q, _) -> amqqueue:is_durable(Q); -i_down(auto_delete, Q, _) -> amqqueue:is_auto_delete(Q); -i_down(arguments, Q, _) -> amqqueue:get_arguments(Q); -i_down(pid, Q, _) -> amqqueue:get_pid(Q); -i_down(recoverable_slaves, Q, _) -> amqqueue:get_recoverable_slaves(Q); -i_down(type, Q, _) -> amqqueue:get_type(Q); -i_down(state, _Q, DownReason) -> DownReason; -i_down(K, _Q, _DownReason) -> - case lists:member(K, rabbit_amqqueue_process:info_keys()) of - true -> ''; - false -> throw({bad_argument, K}) - end. + rabbit_queue_type:info_down(Q, Items, DownReason). -spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 49e0a13a0e..0480db9cfe 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -535,8 +535,8 @@ i(master_pid, #state{q = Q}) when ?is_amqqueue(Q) -> amqqueue:get_pid(Q); i(is_synchronised, #state{depth_delta = DD}) -> DD =:= 0; -i(Item, _State) -> - throw({bad_argument, Item}). +i(_, _) -> + ''. bq_init(BQ, Q, Recover) -> Self = self(), diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index a357c5e334..2d45c043ba 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -18,6 +18,7 @@ remove/2, info/2, state_info/1, + info_down/2, info_down/3, %% stateful client API new/2, @@ -46,6 +47,10 @@ -define(STATE, ?MODULE). +%% Recoverable slaves shouldn't really be a generic one, but let's keep it here until +%% mirrored queues are deprecated. +-define(DOWN_KEYS, [name, durable, auto_delete, arguments, pid, recoverable_slaves, type, state]). + -define(QREF(QueueReference), (is_tuple(QueueReference) andalso element(1, QueueReference) == resource) orelse is_atom(QueueReference)). @@ -271,8 +276,13 @@ state_info(#ctx{state = S, state_info(_) -> #{}. +down_keys() -> ?DOWN_KEYS. + +info_down(Q, DownReason) -> + info_down(Q, down_keys(), DownReason). + info_down(Q, all_keys, DownReason) -> - info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason); + info_down(Q, down_keys(), DownReason); info_down(Q, Items, DownReason) -> [{Item, i_down(Item, Q, DownReason)} || Item <- Items]. @@ -284,11 +294,7 @@ i_down(pid, Q, _) -> amqqueue:get_pid(Q); i_down(recoverable_slaves, Q, _) -> amqqueue:get_recoverable_slaves(Q); i_down(type, Q, _) -> amqqueue:get_type(Q); i_down(state, _Q, DownReason) -> DownReason; -i_down(K, _Q, _DownReason) -> - case lists:member(K, rabbit_amqqueue_process:info_keys()) of - true -> ''; - false -> throw({bad_argument, K}) - end. +i_down(_K, _Q, _DownReason) -> ''. is_policy_applicable(Q, Policy) -> Mod = amqqueue:get_type(Q), diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 9d42db2169..563b206590 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -80,6 +80,9 @@ message_bytes_ram ]). +-define(INFO_KEYS, [name, durable, auto_delete, arguments, pid, messages, messages_ready, + messages_unacknowledged, local_state, type] ++ ?STATISTICS_KEYS). + -define(RPC_TIMEOUT, 1000). -define(TICK_TIMEOUT, 5000). %% the ra server tick time -define(DELETE_TIMEOUT, 5000). @@ -779,7 +782,8 @@ infos(QName, Keys) -> [] end. - +info(Q, all_keys) -> + info(Q, ?INFO_KEYS); info(Q, Items) -> lists:foldr(fun(totals, Acc) -> i_totals(Q) ++ Acc; diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index 2a9575c117..0ae58fc6b8 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -48,6 +48,10 @@ -include("rabbit.hrl"). -include("amqqueue.hrl"). +-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state, + messages, messages_ready, messages_unacknowledged, committed_offset, + policy, operator_policy, effective_policy_definition, type]). + -type appender_seq() :: non_neg_integer(). -record(stream, {name :: rabbit_types:r('queue'), @@ -342,6 +346,8 @@ settle(_, _, _, #stream_client{name = Name}) -> "basic.nack and basic.reject not supported by stream queues ~s", [rabbit_misc:rs(Name)]). +info(Q, all_items) -> + info(Q, ?INFO_KEYS); info(Q, Items) -> lists:foldr(fun(Item, Acc) -> [{Item, i(Item, Q)} | Acc] @@ -409,7 +415,9 @@ i(effective_policy_definition, Q) -> Def -> Def end; i(type, _) -> - stream. + stream; +i(_, _) -> + ''. init(Q) when ?is_amqqueue(Q) -> Leader = amqqueue:get_pid(Q), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 617ccb432e..e3837c726e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -975,8 +975,8 @@ info(backing_queue_status, #vqstate { {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]; -info(Item, _) -> - throw({bad_argument, Item}). +info(_, _) -> + ''. invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); invoke( _, _, State) -> State. |