diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-10-19 13:03:02 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-10-19 16:05:27 +0100 |
commit | dc54e7f943497f3a385e42bbfc8c88b56ef2cca8 (patch) | |
tree | fb06fc1c2c5feec4d3d945420aa350f5b181e228 | |
parent | bfe9eebc635d7a1b13981bf66995279d7d4c48c0 (diff) | |
download | rabbitmq-server-git-dc54e7f943497f3a385e42bbfc8c88b56ef2cca8.tar.gz |
Refactor info queues on queue types
Allow each queue type to have its own keys or return '' for unknown keys
-rw-r--r-- | src/rabbit_amqqueue.erl | 21 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
-rw-r--r-- | src/rabbit_queue_type.erl | 18 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_stream_queue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
6 files changed, 35 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 82b261045d..fdf5d4feea 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1165,6 +1165,9 @@ list_for_count(VHost) -> -spec info_keys() -> rabbit_types:info_keys(). +%% It should no default to classic queue keys, but a subset of those that must be shared +%% by all queue types. Not sure this is even being used, so will leave it here for backwards +%% compatibility. Each queue type handles now info(Q, all_keys) with the keys it supports. info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). @@ -1209,24 +1212,10 @@ info(Q, Items) when ?is_amqqueue(Q) -> rabbit_queue_type:info(Q, Items). info_down(Q, DownReason) -> - info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason). + rabbit_queue_type:info_down(Q, DownReason). info_down(Q, Items, DownReason) -> - [{Item, i_down(Item, Q, DownReason)} || Item <- Items, Item =/= totals, Item =/= type_specific]. - -i_down(name, Q, _) -> amqqueue:get_name(Q); -i_down(durable, Q, _) -> amqqueue:is_durable(Q); -i_down(auto_delete, Q, _) -> amqqueue:is_auto_delete(Q); -i_down(arguments, Q, _) -> amqqueue:get_arguments(Q); -i_down(pid, Q, _) -> amqqueue:get_pid(Q); -i_down(recoverable_slaves, Q, _) -> amqqueue:get_recoverable_slaves(Q); -i_down(type, Q, _) -> amqqueue:get_type(Q); -i_down(state, _Q, DownReason) -> DownReason; -i_down(K, _Q, _DownReason) -> - case lists:member(K, rabbit_amqqueue_process:info_keys()) of - true -> ''; - false -> throw({bad_argument, K}) - end. + rabbit_queue_type:info_down(Q, Items, DownReason). -spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 49e0a13a0e..0480db9cfe 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -535,8 +535,8 @@ i(master_pid, #state{q = Q}) when ?is_amqqueue(Q) -> amqqueue:get_pid(Q); i(is_synchronised, #state{depth_delta = DD}) -> DD =:= 0; -i(Item, _State) -> - throw({bad_argument, Item}). +i(_, _) -> + ''. bq_init(BQ, Q, Recover) -> Self = self(), diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index 8424ee5d15..3d34c969e0 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -18,6 +18,7 @@ remove/2, info/2, state_info/1, + info_down/2, info_down/3, %% stateful client API new/2, @@ -46,6 +47,10 @@ -define(STATE, ?MODULE). +%% Recoverable slaves shouldn't really be a generic one, but let's keep it here until +%% mirrored queues are deprecated. +-define(DOWN_KEYS, [name, durable, auto_delete, arguments, pid, recoverable_slaves, type, state]). + -define(QREF(QueueReference), (is_tuple(QueueReference) andalso element(1, QueueReference) == resource) orelse is_atom(QueueReference)). @@ -274,8 +279,13 @@ state_info(#ctx{state = S, state_info(_) -> #{}. +down_keys() -> ?DOWN_KEYS. + +info_down(Q, DownReason) -> + info_down(Q, down_keys(), DownReason). + info_down(Q, all_keys, DownReason) -> - info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason); + info_down(Q, down_keys(), DownReason); info_down(Q, Items, DownReason) -> [{Item, i_down(Item, Q, DownReason)} || Item <- Items]. @@ -287,11 +297,7 @@ i_down(pid, Q, _) -> amqqueue:get_pid(Q); i_down(recoverable_slaves, Q, _) -> amqqueue:get_recoverable_slaves(Q); i_down(type, Q, _) -> amqqueue:get_type(Q); i_down(state, _Q, DownReason) -> DownReason; -i_down(K, _Q, _DownReason) -> - case lists:member(K, rabbit_amqqueue_process:info_keys()) of - true -> ''; - false -> throw({bad_argument, K}) - end. +i_down(_K, _Q, _DownReason) -> ''. is_policy_applicable(Q, Policy) -> Mod = amqqueue:get_type(Q), diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index c0b1aa0965..32ac9c4036 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -80,6 +80,9 @@ message_bytes_ram ]). +-define(INFO_KEYS, [name, durable, auto_delete, arguments, pid, messages, messages_ready, + messages_unacknowledged, local_state, type] ++ ?STATISTICS_KEYS). + -define(RPC_TIMEOUT, 1000). -define(TICK_TIMEOUT, 5000). %% the ra server tick time -define(DELETE_TIMEOUT, 5000). @@ -779,7 +782,8 @@ infos(QName, Keys) -> [] end. - +info(Q, all_keys) -> + info(Q, ?INFO_KEYS); info(Q, Items) -> lists:foldr(fun(totals, Acc) -> i_totals(Q) ++ Acc; diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index 7ad5076cd3..8e45290da4 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -48,6 +48,10 @@ -include("rabbit.hrl"). -include("amqqueue.hrl"). +-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state, + messages, messages_ready, messages_unacknowledged, committed_offset, + policy, operator_policy, effective_policy_definition, type]). + -type appender_seq() :: non_neg_integer(). -record(stream, {name :: rabbit_types:r('queue'), @@ -342,6 +346,8 @@ settle(_, _, _, #stream_client{name = Name}) -> "basic.nack and basic.reject not supported by stream queues ~s", [rabbit_misc:rs(Name)]). +info(Q, all_items) -> + info(Q, ?INFO_KEYS); info(Q, Items) -> lists:foldr(fun(Item, Acc) -> [{Item, i(Item, Q)} | Acc] @@ -409,7 +415,9 @@ i(effective_policy_definition, Q) -> Def -> Def end; i(type, _) -> - stream. + stream; +i(_, _) -> + ''. init(Q) when ?is_amqqueue(Q) -> Leader = amqqueue:get_pid(Q), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 617ccb432e..e3837c726e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -975,8 +975,8 @@ info(backing_queue_status, #vqstate { {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]; -info(Item, _) -> - throw({bad_argument, Item}). +info(_, _) -> + ''. invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); invoke( _, _, State) -> State. |