summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-07-02 16:11:54 +0100
committerkjnilsson <knilsson@pivotal.io>2019-08-07 16:09:29 +0100
commit344492576f6ff3bbd947b1d3b60f7cf01c367cd2 (patch)
tree11eddddf0b730e333e700f46966cace5588436e8
parente59dcbe3f4945ca19ae580acae6422ae7adc39a4 (diff)
downloadrabbitmq-server-git-344492576f6ff3bbd947b1d3b60f7cf01c367cd2.tar.gz
Add marker rabbit_queue_type behaviour
And use the implementing module as the value of the amqqueue record `type` field. This will allow for easy dispatch to the queue type implementation. Make amqqueue compatible with the classic queue tag
-rw-r--r--include/amqqueue.hrl6
-rw-r--r--include/amqqueue_v2.hrl2
-rw-r--r--src/amqqueue.erl59
-rw-r--r--src/amqqueue_v1.erl36
-rw-r--r--src/rabbit_amqqueue.erl40
-rw-r--r--src/rabbit_quorum_queue.erl65
-rw-r--r--test/amqqueue_backward_compatibility_SUITE.erl6
-rw-r--r--test/quorum_queue_SUITE.erl6
8 files changed, 130 insertions, 90 deletions
diff --git a/include/amqqueue.hrl b/include/amqqueue.hrl
index 3a9ac45cce..5a8a9bad03 100644
--- a/include/amqqueue.hrl
+++ b/include/amqqueue.hrl
@@ -51,16 +51,16 @@
(?is_amqqueue_v1(Q) andalso
?amqqueue_v1_field_state(Q) =:= State))).
--define(amqqueue_v1_type, classic).
+-define(amqqueue_v1_type, rabbit_classic_queue).
-define(amqqueue_is_classic(Q),
((?is_amqqueue_v2(Q) andalso
- ?amqqueue_v2_field_type(Q) =:= classic) orelse
+ ?amqqueue_v2_field_type(Q) =:= rabbit_classic_queue) orelse
?is_amqqueue_v1(Q))).
-define(amqqueue_is_quorum(Q),
(?is_amqqueue_v2(Q) andalso
- ?amqqueue_v2_field_type(Q) =:= quorum) orelse
+ ?amqqueue_v2_field_type(Q) =:= rabbit_quorum_queue) orelse
false).
-define(amqqueue_has_valid_pid(Q),
diff --git a/include/amqqueue_v2.hrl b/include/amqqueue_v2.hrl
index 37cd7ba2a8..c79a3b7366 100644
--- a/include/amqqueue_v2.hrl
+++ b/include/amqqueue_v2.hrl
@@ -19,4 +19,4 @@
-define(amqqueue_v2_field_vhost(Q), element(18, Q)).
-define(amqqueue_v2_field_options(Q), element(19, Q)).
-define(amqqueue_v2_field_type(Q), element(20, Q)).
--define(amqqueue_v2_field_quorum_nodes(Q), element(21, Q)).
+-define(amqqueue_v2_field_type_state(Q), element(21, Q)).
diff --git a/src/amqqueue.erl b/src/amqqueue.erl
index 35e7f0c4c4..89969d018e 100644
--- a/src/amqqueue.erl
+++ b/src/amqqueue.erl
@@ -57,9 +57,9 @@
% policy_version
get_policy_version/1,
set_policy_version/2,
- % quorum_nodes
- get_quorum_nodes/1,
- set_quorum_nodes/2,
+ % type_state
+ get_type_state/1,
+ set_type_state/2,
% recoverable_slaves
get_recoverable_slaves/1,
set_recoverable_slaves/2,
@@ -91,6 +91,8 @@
macros/0]).
-define(record_version, amqqueue_v2).
+-define(is_backwards_compat_classic(T),
+ (T =:= classic orelse T =:= ?amqqueue_v1_type)).
-record(amqqueue, {
name :: rabbit_amqqueue:name() | '_', %% immutable
@@ -118,8 +120,8 @@
slave_pids_pending_shutdown = [] :: [pid()] | '_',
vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index
options = #{} :: map() | '_',
- type = ?amqqueue_v1_type :: atom() | '_',
- quorum_nodes = [] :: [node()] | '_'
+ type = ?amqqueue_v1_type :: module() | '_',
+ type_state = #{} :: map() | '_'
}).
-type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2().
@@ -143,7 +145,7 @@
vhost :: rabbit_types:vhost() | undefined,
options :: map(),
type :: atom(),
- quorum_nodes :: [node()]
+ type_state :: #{}
}.
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
@@ -170,7 +172,7 @@
vhost :: '_',
options :: '_',
type :: atom() | '_',
- quorum_nodes :: '_'
+ type_state :: '_'
}.
-export_type([amqqueue/0,
@@ -341,7 +343,7 @@ new_with_version(?record_version,
pid = Pid,
vhost = VHost,
options = Options,
- type = Type};
+ type = ensure_type_compat(Type)};
new_with_version(Version,
Name,
Pid,
@@ -351,7 +353,8 @@ new_with_version(Version,
Args,
VHost,
Options,
- ?amqqueue_v1_type) ->
+ Type)
+ when ?is_backwards_compat_classic(Type) ->
amqqueue_v1:new_with_version(
Version,
Name,
@@ -451,7 +454,7 @@ set_gm_pids(Queue, GMPids) ->
-spec get_leader(amqqueue_v2()) -> node().
-get_leader(#amqqueue{type = quorum, pid = {_, Leader}}) -> Leader.
+get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.
% operator_policy
@@ -551,18 +554,16 @@ set_recoverable_slaves(#amqqueue{} = Queue, Slaves) ->
set_recoverable_slaves(Queue, Slaves) ->
amqqueue_v1:set_recoverable_slaves(Queue, Slaves).
-% quorum_nodes (new in v2)
-
--spec get_quorum_nodes(amqqueue()) -> [node()].
-
-get_quorum_nodes(#amqqueue{quorum_nodes = Nodes}) -> Nodes;
-get_quorum_nodes(_) -> [].
+% type_state (new in v2)
--spec set_quorum_nodes(amqqueue(), [node()]) -> amqqueue().
+-spec get_type_state(amqqueue()) -> map().
+get_type_state(#amqqueue{type_state = TState}) -> TState;
+get_type_state(_) -> [].
-set_quorum_nodes(#amqqueue{} = Queue, Nodes) ->
- Queue#amqqueue{quorum_nodes = Nodes};
-set_quorum_nodes(Queue, _Nodes) ->
+-spec set_type_state(amqqueue(), map()) -> amqqueue().
+set_type_state(#amqqueue{} = Queue, TState) ->
+ Queue#amqqueue{type_state = TState};
+set_type_state(Queue, _TState) ->
Queue.
% slave_pids
@@ -660,7 +661,7 @@ is_classic(Queue) ->
-spec is_quorum(amqqueue()) -> boolean().
is_quorum(Queue) ->
- get_type(Queue) =:= quorum.
+ get_type(Queue) =:= rabbit_quorum_queue.
fields() ->
case record_version_to_use() of
@@ -697,13 +698,16 @@ pattern_match_on_name(Name) ->
pattern_match_on_type(Type) ->
case record_version_to_use() of
- ?record_version -> #amqqueue{type = Type, _ = '_'};
- _ when Type =:= classic -> amqqueue_v1:pattern_match_all();
+ ?record_version ->
+ #amqqueue{type = Type, _ = '_'};
+ _ when ?is_backwards_compat_classic(Type) ->
+ amqqueue_v1:pattern_match_all();
%% FIXME: We try a pattern which should never match when the
%% `quorum_queue` feature flag is not enabled yet. Is there
%% a better solution?
- _ -> amqqueue_v1:pattern_match_on_name(
- rabbit_misc:r(<<0>>, queue, <<0>>))
+ _ ->
+ amqqueue_v1:pattern_match_on_name(
+ rabbit_misc:r(<<0>>, queue, <<0>>))
end.
-spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue().
@@ -757,3 +761,8 @@ macros([Field | Rest], I) ->
macros(Rest, I + 1);
macros([], _) ->
ok.
+
+ensure_type_compat(classic) ->
+ ?amqqueue_v1_type;
+ensure_type_compat(Type) ->
+ Type.
diff --git a/src/amqqueue_v1.erl b/src/amqqueue_v1.erl
index 9760d752f7..43b7ba9e43 100644
--- a/src/amqqueue_v1.erl
+++ b/src/amqqueue_v1.erl
@@ -57,9 +57,9 @@
% policy_version
get_policy_version/1,
set_policy_version/2,
- % quorum_nodes
- get_quorum_nodes/1,
- set_quorum_nodes/2,
+ % type_state
+ get_type_state/1,
+ set_type_state/2,
% recoverable_slaves
get_recoverable_slaves/1,
set_recoverable_slaves/2,
@@ -93,6 +93,8 @@
-dialyzer({nowarn_function, is_quorum/1}).
-define(record_version, ?MODULE).
+-define(is_backwards_compat_classic(T),
+ (T =:= classic orelse T =:= ?amqqueue_v1_type)).
-record(amqqueue, {
name :: rabbit_amqqueue:name() | '_', %% immutable
@@ -214,7 +216,7 @@ new(#resource{kind = queue} = Name,
rabbit_framing:amqp_table(),
rabbit_types:vhost() | undefined,
map(),
- ?amqqueue_v1_type) -> amqqueue().
+ ?amqqueue_v1_type | classic) -> amqqueue().
new(#resource{kind = queue} = Name,
Pid,
@@ -224,14 +226,15 @@ new(#resource{kind = queue} = Name,
Args,
VHost,
Options,
- ?amqqueue_v1_type)
+ Type)
when (is_pid(Pid) orelse Pid =:= none) andalso
is_boolean(Durable) andalso
is_boolean(AutoDelete) andalso
(is_pid(Owner) orelse Owner =:= none) andalso
is_list(Args) andalso
(is_binary(VHost) orelse VHost =:= undefined) andalso
- is_map(Options) ->
+ is_map(Options) andalso
+ ?is_backwards_compat_classic(Type) ->
new(
Name,
Pid,
@@ -297,14 +300,15 @@ new_with_version(?record_version,
Args,
VHost,
Options,
- ?amqqueue_v1_type)
+ Type)
when (is_pid(Pid) orelse Pid =:= none) andalso
is_boolean(Durable) andalso
is_boolean(AutoDelete) andalso
(is_pid(Owner) orelse Owner =:= none) andalso
is_list(Args) andalso
(is_binary(VHost) orelse VHost =:= undefined) andalso
- is_map(Options) ->
+ is_map(Options) andalso
+ ?is_backwards_compat_classic(Type) ->
new_with_version(
?record_version,
Name,
@@ -451,16 +455,16 @@ get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) ->
set_recoverable_slaves(#amqqueue{} = Queue, Slaves) ->
Queue#amqqueue{recoverable_slaves = Slaves}.
-% quorum_nodes (new in v2)
+% type_state (new in v2)
--spec get_quorum_nodes(amqqueue()) -> no_return().
+-spec get_type_state(amqqueue()) -> no_return().
-get_quorum_nodes(_) -> throw({unsupported, ?record_version, get_quorum_nodes}).
+get_type_state(_) -> throw({unsupported, ?record_version, get_type_state}).
--spec set_quorum_nodes(amqqueue(), [node()]) -> no_return().
+-spec set_type_state(amqqueue(), [node()]) -> no_return().
-set_quorum_nodes(_, _) ->
- throw({unsupported, ?record_version, set_quorum_nodes}).
+set_type_state(_, _) ->
+ throw({unsupported, ?record_version, set_type_state}).
% slave_pids
@@ -527,8 +531,8 @@ is_classic(Queue) ->
-spec is_quorum(amqqueue()) -> boolean().
-is_quorum(Queue) ->
- get_type(Queue) =:= quorum.
+is_quorum(Queue) when ?is_amqqueue(Queue) ->
+ false.
fields() -> fields(?record_version).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index cbe8738c5a..65691cedd0 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -44,7 +44,8 @@
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression.
--export([list_local_followers/0]).
+-export([list_local_followers/0,
+ get_quorum_nodes/1]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
-export([delete_immediately_by_resource/1]).
@@ -192,7 +193,7 @@ find_local_quorum_queues(VHost) ->
qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
amqqueue:get_vhost(Q) =:= VHost,
amqqueue:is_quorum(Q) andalso
- (lists:member(Node, amqqueue:get_quorum_nodes(Q)))]))
+ (lists:member(Node, get_quorum_nodes(Q)))]))
end).
find_local_durable_classic_queues(VHost) ->
@@ -225,7 +226,7 @@ find_recoverable_queues() ->
%% - if the record is present - in order to restart.
(mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []
orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)))))
- orelse (amqqueue:is_quorum(Q) andalso lists:member(Node, amqqueue:get_quorum_nodes(Q)))
+ orelse (amqqueue:is_quorum(Q) andalso lists:member(Node, get_quorum_nodes(Q)))
]))
end).
@@ -274,7 +275,7 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
ok = check_declare_arguments(QueueName, Args),
Type = get_queue_type(Args),
TypeIsAllowed =
- Type =:= classic orelse
+ Type =:= rabbit_classic_queue orelse
rabbit_feature_flags:is_enabled(quorum_queue),
case TypeIsAllowed of
true ->
@@ -325,9 +326,16 @@ declare_classic_queue(Q, Node) ->
get_queue_type(Args) ->
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
undefined ->
- classic;
+ rabbit_classic_queue;
{_, V} ->
- erlang:binary_to_existing_atom(V, utf8)
+ %% TODO: this mapping of "friendly" queue type name to the
+ %% implementing module should be part of some kind of registry
+ case V of
+ <<"quorum">> ->
+ rabbit_quorum_queue;
+ <<"classic">> ->
+ rabbit_classic_queue
+ end
end.
-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
@@ -824,7 +832,7 @@ list_local_followers() ->
[ amqqueue:get_name(Q)
|| Q <- list(),
amqqueue:is_quorum(Q),
- amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), amqqueue:get_quorum_nodes(Q))].
+ amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))].
is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
Node =:= node(QPid);
@@ -1534,7 +1542,7 @@ forget_all_durable(Node) ->
%% recovery.
forget_node_for_queue(DeadNode, Q)
when ?amqqueue_is_quorum(Q) ->
- QN = amqqueue:get_quorum_nodes(Q),
+ QN = get_quorum_nodes(Q),
forget_node_for_queue(DeadNode, QN, Q);
forget_node_for_queue(DeadNode, Q) ->
RS = amqqueue:get_recoverable_slaves(Q),
@@ -1555,9 +1563,11 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
Type = amqqueue:get_type(Q),
case {node_permits_offline_promotion(H), Type} of
{false, _} -> forget_node_for_queue(DeadNode, T, Q);
- {true, classic} -> Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)),
+ {true, rabbit_classic_queue} ->
+ Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)),
ok = mnesia:write(rabbit_durable_queue, Q1, write);
- {true, quorum} -> ok
+ {true, rabbit_quorum_queue} ->
+ ok
end.
node_permits_offline_promotion(Node) ->
@@ -1755,7 +1765,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
[],
undefined, % VHost,
#{user => undefined}, % ActingUser
- classic % Type
+ rabbit_classic_queue % Type
).
-spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue().
@@ -1864,3 +1874,11 @@ get_quorum_state({Name, _} = Id, QName, Map) ->
get_quorum_state({Name, _}, Map) ->
maps:get(Name, Map).
+
+get_quorum_nodes(Q) when ?is_amqqueue(Q) ->
+ case amqqueue:get_type_state(Q) of
+ #{nodes := Nodes} ->
+ Nodes;
+ _ ->
+ []
+ end.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index cf54656e34..b4118b96ac 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -81,7 +81,7 @@ init_state({Name, _}, QName = #resource{}) ->
%% know what to do if the queue has `disappeared`. Let it crash.
{ok, Q} = rabbit_amqqueue:lookup(QName),
Leader = amqqueue:get_pid(Q),
- Nodes = amqqueue:get_quorum_nodes(Q),
+ Nodes = rabbit_amqqueue:get_quorum_nodes(Q),
%% Ensure the leader is listed first
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
@@ -117,7 +117,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
Id = {RaName, node()},
Nodes = select_quorum_nodes(QuorumSize, rabbit_mnesia:cluster_nodes(all)),
NewQ0 = amqqueue:set_pid(Q, Id),
- NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes),
+ NewQ1 = amqqueue:set_type_state(NewQ0, #{nodes => Nodes}),
case rabbit_amqqueue:internal_declare(NewQ1, false) of
{created, NewQ} ->
TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
@@ -222,7 +222,7 @@ become_leader(QName, Name) ->
end),
case rabbit_amqqueue:lookup(QName) of
{ok, Q0} when ?is_amqqueue(Q0) ->
- Nodes = amqqueue:get_quorum_nodes(Q0),
+ Nodes = get_nodes(Q0),
[rpc:call(Node, ?MODULE, rpc_delete_metrics,
[QName], ?RPC_TIMEOUT)
|| Node <- Nodes, Node =/= node()];
@@ -369,7 +369,7 @@ delete(Q,
_IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
{Name, _} = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
- QNodes = amqqueue:get_quorum_nodes(Q),
+ QNodes = get_nodes(Q),
%% TODO Quorum queue needs to support consumer tracking for IfUnused
Timeout = ?DELETE_TIMEOUT,
{ok, ReadyMsgs, _} = stat(Q),
@@ -600,8 +600,8 @@ cleanup_data_dir() ->
{Name, _} = amqqueue:get_pid(Q),
Name
end
- || Q <- rabbit_amqqueue:list_by_type(quorum),
- lists:member(node(), amqqueue:get_quorum_nodes(Q))],
+ || Q <- rabbit_amqqueue:list_by_type(?MODULE),
+ lists:member(node(), get_nodes(Q))],
Registered = ra_directory:list_registered(),
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
not lists:member(Name, Names)],
@@ -644,7 +644,7 @@ status(Vhost, QueueName) ->
{ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
- Nodes = amqqueue:get_quorum_nodes(Q),
+ Nodes = get_nodes(Q),
[begin
case get_sys_status({RName, N}) of
{ok, Sys} ->
@@ -696,7 +696,7 @@ add_member(VHost, Name, Node, Timeout) ->
{ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
- QNodes = amqqueue:get_quorum_nodes(Q),
+ QNodes = get_nodes(Q),
case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
false ->
{error, node_not_running};
@@ -727,9 +727,10 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
case ra:add_member(Members, ServerId, Timeout) of
{ok, _, Leader} ->
Fun = fun(Q1) ->
- Q2 = amqqueue:set_quorum_nodes(
- Q1,
- [Node | amqqueue:get_quorum_nodes(Q1)]),
+ Q2 = update_type_state(
+ Q1, fun(#{nodes := Nodes} = Ts) ->
+ Ts#{nodes => [Node | Nodes]}
+ end),
amqqueue:set_pid(Q2, Leader)
end,
rabbit_misc:execute_mnesia_transaction(
@@ -753,7 +754,7 @@ delete_member(VHost, Name, Node) ->
{ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
- QNodes = amqqueue:get_quorum_nodes(Q),
+ QNodes = get_nodes(Q),
case lists:member(Node, QNodes) of
false ->
%% idempotent by design
@@ -779,10 +780,11 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
case ra:leave_and_delete_server(Members, ServerId) of
ok ->
Fun = fun(Q1) ->
- amqqueue:set_quorum_nodes(
+ update_type_state(
Q1,
- lists:delete(Node,
- amqqueue:get_quorum_nodes(Q1)))
+ fun(#{nodes := Nodes} = Ts) ->
+ Ts#{nodes => lists:delete(Node, Nodes)}
+ end)
end,
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
@@ -802,7 +804,7 @@ shrink_all(Node) ->
QName = amqqueue:get_name(Q),
rabbit_log:info("~s: removing member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
- Size = length(amqqueue:get_quorum_nodes(Q)),
+ Size = length(get_nodes(Q)),
case delete_member(Q, Node) of
ok ->
{QName, {ok, Size-1}};
@@ -812,8 +814,8 @@ shrink_all(Node) ->
{QName, {error, Size, Err}}
end
end || Q <- rabbit_amqqueue:list(),
- amqqueue:get_type(Q) == quorum,
- lists:member(Node, amqqueue:get_quorum_nodes(Q))].
+ amqqueue:get_type(Q) == ?MODULE,
+ lists:member(Node, get_nodes(Q))].
-spec grow(node(), binary(), binary(), all | even) ->
[{rabbit_amqqueue:name(),
@@ -821,7 +823,7 @@ shrink_all(Node) ->
grow(Node, VhostSpec, QueueSpec, Strategy) ->
Running = rabbit_mnesia:cluster_nodes(running),
[begin
- Size = length(amqqueue:get_quorum_nodes(Q)),
+ Size = length(get_nodes(Q)),
QName = amqqueue:get_name(Q),
rabbit_log:info("~s: adding a new member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
@@ -836,12 +838,12 @@ grow(Node, VhostSpec, QueueSpec, Strategy) ->
end
end
|| Q <- rabbit_amqqueue:list(),
- amqqueue:get_type(Q) == quorum,
+ amqqueue:get_type(Q) == ?MODULE,
%% don't add a member if there is already one on the node
- not lists:member(Node, amqqueue:get_quorum_nodes(Q)),
+ not lists:member(Node, get_nodes(Q)),
%% node needs to be running
lists:member(Node, Running),
- matches_strategy(Strategy, amqqueue:get_quorum_nodes(Q)),
+ matches_strategy(Strategy, get_nodes(Q)),
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
@@ -989,13 +991,13 @@ i(garbage_collection, Q) when ?is_amqqueue(Q) ->
[]
end;
i(members, Q) when ?is_amqqueue(Q) ->
- amqqueue:get_quorum_nodes(Q);
+ get_nodes(Q);
i(online, Q) -> online(Q);
i(leader, Q) -> leader(Q);
i(open_files, Q) when ?is_amqqueue(Q) ->
{Name, _} = amqqueue:get_pid(Q),
- Nodes = amqqueue:get_quorum_nodes(Q),
- {Data, _} = rpc:multicall(Nodes, rabbit_quorum_queue, open_files, [Name]),
+ Nodes = get_nodes(Q),
+ {Data, _} = rpc:multicall(Nodes, ?MODULE, open_files, [Name]),
lists:flatten(Data);
i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
@@ -1047,12 +1049,12 @@ leader(Q) when ?is_amqqueue(Q) ->
end.
online(Q) when ?is_amqqueue(Q) ->
- Nodes = amqqueue:get_quorum_nodes(Q),
+ Nodes = get_nodes(Q),
{Name, _} = amqqueue:get_pid(Q),
[Node || Node <- Nodes, is_process_alive(Name, Node)].
format(Q) when ?is_amqqueue(Q) ->
- Nodes = amqqueue:get_quorum_nodes(Q),
+ Nodes = get_nodes(Q),
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
is_process_alive(Name, Node) ->
@@ -1144,7 +1146,7 @@ select_quorum_nodes(Size, Rest, Selected) ->
%% member with the current leader first
members(Q) when ?amqqueue_is_quorum(Q) ->
{RaName, LeaderNode} = amqqueue:get_pid(Q),
- Nodes = lists:delete(LeaderNode, amqqueue:get_quorum_nodes(Q)),
+ Nodes = lists:delete(LeaderNode, get_nodes(Q)),
[{RaName, N} || N <- [LeaderNode | Nodes]].
make_ra_conf(Q, ServerId, TickTimeout) ->
@@ -1163,3 +1165,10 @@ make_ra_conf(Q, ServerId, TickTimeout) ->
tick_timeout => TickTimeout,
machine => RaMachine}.
+get_nodes(Q) when ?is_amqqueue(Q) ->
+ #{nodes := Nodes} = amqqueue:get_type_state(Q),
+ Nodes.
+
+update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
+ Ts = amqqueue:get_type_state(Q),
+ amqqueue:set_type_state(Q, Fun(Ts)).
diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl
index 05a049c9bb..a02c4721bc 100644
--- a/test/amqqueue_backward_compatibility_SUITE.erl
+++ b/test/amqqueue_backward_compatibility_SUITE.erl
@@ -104,7 +104,7 @@ new_amqqueue_v2_is_amqqueue(_) ->
[],
VHost,
#{},
- classic),
+ rabbit_classic_queue),
?assert(?is_amqqueue(Queue)),
?assert(?is_amqqueue_v2(Queue)),
?assert(not ?is_amqqueue_v1(Queue)),
@@ -253,7 +253,7 @@ amqqueue_v2_type_matching(_) ->
[],
VHost,
#{},
- classic),
+ rabbit_classic_queue),
?assert(?amqqueue_is_classic(ClassicQueue)),
?assert(amqqueue:is_classic(ClassicQueue)),
?assert(not ?amqqueue_is_quorum(ClassicQueue)),
@@ -267,7 +267,7 @@ amqqueue_v2_type_matching(_) ->
[],
VHost,
#{},
- quorum),
+ rabbit_quorum_queue),
?assert(not ?amqqueue_is_classic(QuorumQueue)),
?assert(not amqqueue:is_classic(QuorumQueue)),
?assert(?amqqueue_is_quorum(QuorumQueue)),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index cc0e7aa75a..7ef38895eb 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -264,15 +264,15 @@ declare_args(Config) ->
declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-max-length">>, long, 2000},
{<<"x-max-length-bytes">>, long, 2000}]),
- assert_queue_type(Server, LQ, quorum),
+ assert_queue_type(Server, LQ, rabbit_quorum_queue),
DQ = <<"classic-declare-args-q">>,
declare(Ch, DQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
- assert_queue_type(Server, DQ, classic),
+ assert_queue_type(Server, DQ, rabbit_classic_queue),
DQ2 = <<"classic-q2">>,
declare(Ch, DQ2),
- assert_queue_type(Server, DQ2, classic).
+ assert_queue_type(Server, DQ2, rabbit_classic_queue).
declare_invalid_properties(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),