summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-22 13:52:05 +0200
committerGitHub <noreply@github.com>2020-10-22 13:52:05 +0200
commitdd5e879949aeccad3aca29af56161bbfeee300fc (patch)
tree340634edc6bef1d1e42519855924cb656b19ec40
parent3147ee9a71ee37a63551fa93f91e279b650c834b (diff)
parent440f05ab5e3d488d21ceb52b7eb15e5ce5bf033a (diff)
downloadrabbitmq-server-git-dd5e879949aeccad3aca29af56161bbfeee300fc.tar.gz
Merge pull request #2475 from rabbitmq/queue-type-info-keys
Refactor info queues on queue types
-rw-r--r--src/rabbit_amqqueue.erl21
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_queue_type.erl18
-rw-r--r--src/rabbit_quorum_queue.erl6
-rw-r--r--src/rabbit_stream_queue.erl10
-rw-r--r--src/rabbit_variable_queue.erl4
6 files changed, 35 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index deead84971..6104228c54 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1176,6 +1176,9 @@ list_for_count(VHost) ->
-spec info_keys() -> rabbit_types:info_keys().
+%% It should no default to classic queue keys, but a subset of those that must be shared
+%% by all queue types. Not sure this is even being used, so will leave it here for backwards
+%% compatibility. Each queue type handles now info(Q, all_keys) with the keys it supports.
info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
@@ -1220,24 +1223,10 @@ info(Q, Items) when ?is_amqqueue(Q) ->
rabbit_queue_type:info(Q, Items).
info_down(Q, DownReason) ->
- info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason).
+ rabbit_queue_type:info_down(Q, DownReason).
info_down(Q, Items, DownReason) ->
- [{Item, i_down(Item, Q, DownReason)} || Item <- Items, Item =/= totals, Item =/= type_specific].
-
-i_down(name, Q, _) -> amqqueue:get_name(Q);
-i_down(durable, Q, _) -> amqqueue:is_durable(Q);
-i_down(auto_delete, Q, _) -> amqqueue:is_auto_delete(Q);
-i_down(arguments, Q, _) -> amqqueue:get_arguments(Q);
-i_down(pid, Q, _) -> amqqueue:get_pid(Q);
-i_down(recoverable_slaves, Q, _) -> amqqueue:get_recoverable_slaves(Q);
-i_down(type, Q, _) -> amqqueue:get_type(Q);
-i_down(state, _Q, DownReason) -> DownReason;
-i_down(K, _Q, _DownReason) ->
- case lists:member(K, rabbit_amqqueue_process:info_keys()) of
- true -> '';
- false -> throw({bad_argument, K})
- end.
+ rabbit_queue_type:info_down(Q, Items, DownReason).
-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 49e0a13a0e..0480db9cfe 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -535,8 +535,8 @@ i(master_pid, #state{q = Q}) when ?is_amqqueue(Q) ->
amqqueue:get_pid(Q);
i(is_synchronised, #state{depth_delta = DD}) ->
DD =:= 0;
-i(Item, _State) ->
- throw({bad_argument, Item}).
+i(_, _) ->
+ ''.
bq_init(BQ, Q, Recover) ->
Self = self(),
diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl
index a357c5e334..2d45c043ba 100644
--- a/src/rabbit_queue_type.erl
+++ b/src/rabbit_queue_type.erl
@@ -18,6 +18,7 @@
remove/2,
info/2,
state_info/1,
+ info_down/2,
info_down/3,
%% stateful client API
new/2,
@@ -46,6 +47,10 @@
-define(STATE, ?MODULE).
+%% Recoverable slaves shouldn't really be a generic one, but let's keep it here until
+%% mirrored queues are deprecated.
+-define(DOWN_KEYS, [name, durable, auto_delete, arguments, pid, recoverable_slaves, type, state]).
+
-define(QREF(QueueReference),
(is_tuple(QueueReference) andalso element(1, QueueReference) == resource)
orelse is_atom(QueueReference)).
@@ -271,8 +276,13 @@ state_info(#ctx{state = S,
state_info(_) ->
#{}.
+down_keys() -> ?DOWN_KEYS.
+
+info_down(Q, DownReason) ->
+ info_down(Q, down_keys(), DownReason).
+
info_down(Q, all_keys, DownReason) ->
- info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason);
+ info_down(Q, down_keys(), DownReason);
info_down(Q, Items, DownReason) ->
[{Item, i_down(Item, Q, DownReason)} || Item <- Items].
@@ -284,11 +294,7 @@ i_down(pid, Q, _) -> amqqueue:get_pid(Q);
i_down(recoverable_slaves, Q, _) -> amqqueue:get_recoverable_slaves(Q);
i_down(type, Q, _) -> amqqueue:get_type(Q);
i_down(state, _Q, DownReason) -> DownReason;
-i_down(K, _Q, _DownReason) ->
- case lists:member(K, rabbit_amqqueue_process:info_keys()) of
- true -> '';
- false -> throw({bad_argument, K})
- end.
+i_down(_K, _Q, _DownReason) -> ''.
is_policy_applicable(Q, Policy) ->
Mod = amqqueue:get_type(Q),
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 9d42db2169..563b206590 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -80,6 +80,9 @@
message_bytes_ram
]).
+-define(INFO_KEYS, [name, durable, auto_delete, arguments, pid, messages, messages_ready,
+ messages_unacknowledged, local_state, type] ++ ?STATISTICS_KEYS).
+
-define(RPC_TIMEOUT, 1000).
-define(TICK_TIMEOUT, 5000). %% the ra server tick time
-define(DELETE_TIMEOUT, 5000).
@@ -779,7 +782,8 @@ infos(QName, Keys) ->
[]
end.
-
+info(Q, all_keys) ->
+ info(Q, ?INFO_KEYS);
info(Q, Items) ->
lists:foldr(fun(totals, Acc) ->
i_totals(Q) ++ Acc;
diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl
index 2a9575c117..0ae58fc6b8 100644
--- a/src/rabbit_stream_queue.erl
+++ b/src/rabbit_stream_queue.erl
@@ -48,6 +48,10 @@
-include("rabbit.hrl").
-include("amqqueue.hrl").
+-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state,
+ messages, messages_ready, messages_unacknowledged, committed_offset,
+ policy, operator_policy, effective_policy_definition, type]).
+
-type appender_seq() :: non_neg_integer().
-record(stream, {name :: rabbit_types:r('queue'),
@@ -342,6 +346,8 @@ settle(_, _, _, #stream_client{name = Name}) ->
"basic.nack and basic.reject not supported by stream queues ~s",
[rabbit_misc:rs(Name)]).
+info(Q, all_items) ->
+ info(Q, ?INFO_KEYS);
info(Q, Items) ->
lists:foldr(fun(Item, Acc) ->
[{Item, i(Item, Q)} | Acc]
@@ -409,7 +415,9 @@ i(effective_policy_definition, Q) ->
Def -> Def
end;
i(type, _) ->
- stream.
+ stream;
+i(_, _) ->
+ ''.
init(Q) when ?is_amqqueue(Q) ->
Leader = amqqueue:get_pid(Q),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 617ccb432e..e3837c726e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -975,8 +975,8 @@ info(backing_queue_status, #vqstate {
{avg_egress_rate , AvgEgressRate},
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ];
-info(Item, _) ->
- throw({bad_argument, Item}).
+info(_, _) ->
+ ''.
invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
invoke( _, _, State) -> State.