diff options
author | kjnilsson <knilsson@pivotal.io> | 2019-07-02 16:11:54 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2019-08-07 16:09:29 +0100 |
commit | 344492576f6ff3bbd947b1d3b60f7cf01c367cd2 (patch) | |
tree | 11eddddf0b730e333e700f46966cace5588436e8 | |
parent | e59dcbe3f4945ca19ae580acae6422ae7adc39a4 (diff) | |
download | rabbitmq-server-git-344492576f6ff3bbd947b1d3b60f7cf01c367cd2.tar.gz |
Add marker rabbit_queue_type behaviour
And use the implementing module as the value of the amqqueue record
`type` field. This will allow for easy dispatch to the queue type
implementation.
Make amqqueue compatible with the classic queue tag
-rw-r--r-- | include/amqqueue.hrl | 6 | ||||
-rw-r--r-- | include/amqqueue_v2.hrl | 2 | ||||
-rw-r--r-- | src/amqqueue.erl | 59 | ||||
-rw-r--r-- | src/amqqueue_v1.erl | 36 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 40 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 65 | ||||
-rw-r--r-- | test/amqqueue_backward_compatibility_SUITE.erl | 6 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 6 |
8 files changed, 130 insertions, 90 deletions
diff --git a/include/amqqueue.hrl b/include/amqqueue.hrl index 3a9ac45cce..5a8a9bad03 100644 --- a/include/amqqueue.hrl +++ b/include/amqqueue.hrl @@ -51,16 +51,16 @@ (?is_amqqueue_v1(Q) andalso ?amqqueue_v1_field_state(Q) =:= State))). --define(amqqueue_v1_type, classic). +-define(amqqueue_v1_type, rabbit_classic_queue). -define(amqqueue_is_classic(Q), ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_type(Q) =:= classic) orelse + ?amqqueue_v2_field_type(Q) =:= rabbit_classic_queue) orelse ?is_amqqueue_v1(Q))). -define(amqqueue_is_quorum(Q), (?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_type(Q) =:= quorum) orelse + ?amqqueue_v2_field_type(Q) =:= rabbit_quorum_queue) orelse false). -define(amqqueue_has_valid_pid(Q), diff --git a/include/amqqueue_v2.hrl b/include/amqqueue_v2.hrl index 37cd7ba2a8..c79a3b7366 100644 --- a/include/amqqueue_v2.hrl +++ b/include/amqqueue_v2.hrl @@ -19,4 +19,4 @@ -define(amqqueue_v2_field_vhost(Q), element(18, Q)). -define(amqqueue_v2_field_options(Q), element(19, Q)). -define(amqqueue_v2_field_type(Q), element(20, Q)). --define(amqqueue_v2_field_quorum_nodes(Q), element(21, Q)). +-define(amqqueue_v2_field_type_state(Q), element(21, Q)). diff --git a/src/amqqueue.erl b/src/amqqueue.erl index 35e7f0c4c4..89969d018e 100644 --- a/src/amqqueue.erl +++ b/src/amqqueue.erl @@ -57,9 +57,9 @@ % policy_version get_policy_version/1, set_policy_version/2, - % quorum_nodes - get_quorum_nodes/1, - set_quorum_nodes/2, + % type_state + get_type_state/1, + set_type_state/2, % recoverable_slaves get_recoverable_slaves/1, set_recoverable_slaves/2, @@ -91,6 +91,8 @@ macros/0]). -define(record_version, amqqueue_v2). +-define(is_backwards_compat_classic(T), + (T =:= classic orelse T =:= ?amqqueue_v1_type)). -record(amqqueue, { name :: rabbit_amqqueue:name() | '_', %% immutable @@ -118,8 +120,8 @@ slave_pids_pending_shutdown = [] :: [pid()] | '_', vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index options = #{} :: map() | '_', - type = ?amqqueue_v1_type :: atom() | '_', - quorum_nodes = [] :: [node()] | '_' + type = ?amqqueue_v1_type :: module() | '_', + type_state = #{} :: map() | '_' }). -type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2(). @@ -143,7 +145,7 @@ vhost :: rabbit_types:vhost() | undefined, options :: map(), type :: atom(), - quorum_nodes :: [node()] + type_state :: #{} }. -type ra_server_id() :: {Name :: atom(), Node :: node()}. @@ -170,7 +172,7 @@ vhost :: '_', options :: '_', type :: atom() | '_', - quorum_nodes :: '_' + type_state :: '_' }. -export_type([amqqueue/0, @@ -341,7 +343,7 @@ new_with_version(?record_version, pid = Pid, vhost = VHost, options = Options, - type = Type}; + type = ensure_type_compat(Type)}; new_with_version(Version, Name, Pid, @@ -351,7 +353,8 @@ new_with_version(Version, Args, VHost, Options, - ?amqqueue_v1_type) -> + Type) + when ?is_backwards_compat_classic(Type) -> amqqueue_v1:new_with_version( Version, Name, @@ -451,7 +454,7 @@ set_gm_pids(Queue, GMPids) -> -spec get_leader(amqqueue_v2()) -> node(). -get_leader(#amqqueue{type = quorum, pid = {_, Leader}}) -> Leader. +get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader. % operator_policy @@ -551,18 +554,16 @@ set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> set_recoverable_slaves(Queue, Slaves) -> amqqueue_v1:set_recoverable_slaves(Queue, Slaves). -% quorum_nodes (new in v2) - --spec get_quorum_nodes(amqqueue()) -> [node()]. - -get_quorum_nodes(#amqqueue{quorum_nodes = Nodes}) -> Nodes; -get_quorum_nodes(_) -> []. +% type_state (new in v2) --spec set_quorum_nodes(amqqueue(), [node()]) -> amqqueue(). +-spec get_type_state(amqqueue()) -> map(). +get_type_state(#amqqueue{type_state = TState}) -> TState; +get_type_state(_) -> []. -set_quorum_nodes(#amqqueue{} = Queue, Nodes) -> - Queue#amqqueue{quorum_nodes = Nodes}; -set_quorum_nodes(Queue, _Nodes) -> +-spec set_type_state(amqqueue(), map()) -> amqqueue(). +set_type_state(#amqqueue{} = Queue, TState) -> + Queue#amqqueue{type_state = TState}; +set_type_state(Queue, _TState) -> Queue. % slave_pids @@ -660,7 +661,7 @@ is_classic(Queue) -> -spec is_quorum(amqqueue()) -> boolean(). is_quorum(Queue) -> - get_type(Queue) =:= quorum. + get_type(Queue) =:= rabbit_quorum_queue. fields() -> case record_version_to_use() of @@ -697,13 +698,16 @@ pattern_match_on_name(Name) -> pattern_match_on_type(Type) -> case record_version_to_use() of - ?record_version -> #amqqueue{type = Type, _ = '_'}; - _ when Type =:= classic -> amqqueue_v1:pattern_match_all(); + ?record_version -> + #amqqueue{type = Type, _ = '_'}; + _ when ?is_backwards_compat_classic(Type) -> + amqqueue_v1:pattern_match_all(); %% FIXME: We try a pattern which should never match when the %% `quorum_queue` feature flag is not enabled yet. Is there %% a better solution? - _ -> amqqueue_v1:pattern_match_on_name( - rabbit_misc:r(<<0>>, queue, <<0>>)) + _ -> + amqqueue_v1:pattern_match_on_name( + rabbit_misc:r(<<0>>, queue, <<0>>)) end. -spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue(). @@ -757,3 +761,8 @@ macros([Field | Rest], I) -> macros(Rest, I + 1); macros([], _) -> ok. + +ensure_type_compat(classic) -> + ?amqqueue_v1_type; +ensure_type_compat(Type) -> + Type. diff --git a/src/amqqueue_v1.erl b/src/amqqueue_v1.erl index 9760d752f7..43b7ba9e43 100644 --- a/src/amqqueue_v1.erl +++ b/src/amqqueue_v1.erl @@ -57,9 +57,9 @@ % policy_version get_policy_version/1, set_policy_version/2, - % quorum_nodes - get_quorum_nodes/1, - set_quorum_nodes/2, + % type_state + get_type_state/1, + set_type_state/2, % recoverable_slaves get_recoverable_slaves/1, set_recoverable_slaves/2, @@ -93,6 +93,8 @@ -dialyzer({nowarn_function, is_quorum/1}). -define(record_version, ?MODULE). +-define(is_backwards_compat_classic(T), + (T =:= classic orelse T =:= ?amqqueue_v1_type)). -record(amqqueue, { name :: rabbit_amqqueue:name() | '_', %% immutable @@ -214,7 +216,7 @@ new(#resource{kind = queue} = Name, rabbit_framing:amqp_table(), rabbit_types:vhost() | undefined, map(), - ?amqqueue_v1_type) -> amqqueue(). + ?amqqueue_v1_type | classic) -> amqqueue(). new(#resource{kind = queue} = Name, Pid, @@ -224,14 +226,15 @@ new(#resource{kind = queue} = Name, Args, VHost, Options, - ?amqqueue_v1_type) + Type) when (is_pid(Pid) orelse Pid =:= none) andalso is_boolean(Durable) andalso is_boolean(AutoDelete) andalso (is_pid(Owner) orelse Owner =:= none) andalso is_list(Args) andalso (is_binary(VHost) orelse VHost =:= undefined) andalso - is_map(Options) -> + is_map(Options) andalso + ?is_backwards_compat_classic(Type) -> new( Name, Pid, @@ -297,14 +300,15 @@ new_with_version(?record_version, Args, VHost, Options, - ?amqqueue_v1_type) + Type) when (is_pid(Pid) orelse Pid =:= none) andalso is_boolean(Durable) andalso is_boolean(AutoDelete) andalso (is_pid(Owner) orelse Owner =:= none) andalso is_list(Args) andalso (is_binary(VHost) orelse VHost =:= undefined) andalso - is_map(Options) -> + is_map(Options) andalso + ?is_backwards_compat_classic(Type) -> new_with_version( ?record_version, Name, @@ -451,16 +455,16 @@ get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> Queue#amqqueue{recoverable_slaves = Slaves}. -% quorum_nodes (new in v2) +% type_state (new in v2) --spec get_quorum_nodes(amqqueue()) -> no_return(). +-spec get_type_state(amqqueue()) -> no_return(). -get_quorum_nodes(_) -> throw({unsupported, ?record_version, get_quorum_nodes}). +get_type_state(_) -> throw({unsupported, ?record_version, get_type_state}). --spec set_quorum_nodes(amqqueue(), [node()]) -> no_return(). +-spec set_type_state(amqqueue(), [node()]) -> no_return(). -set_quorum_nodes(_, _) -> - throw({unsupported, ?record_version, set_quorum_nodes}). +set_type_state(_, _) -> + throw({unsupported, ?record_version, set_type_state}). % slave_pids @@ -527,8 +531,8 @@ is_classic(Queue) -> -spec is_quorum(amqqueue()) -> boolean(). -is_quorum(Queue) -> - get_type(Queue) =:= quorum. +is_quorum(Queue) when ?is_amqqueue(Queue) -> + false. fields() -> fields(?record_version). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index cbe8738c5a..65691cedd0 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -44,7 +44,8 @@ -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). -export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). -export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression. --export([list_local_followers/0]). +-export([list_local_followers/0, + get_quorum_nodes/1]). -export([ensure_rabbit_queue_record_is_initialized/1]). -export([format/1]). -export([delete_immediately_by_resource/1]). @@ -192,7 +193,7 @@ find_local_quorum_queues(VHost) -> qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue), amqqueue:get_vhost(Q) =:= VHost, amqqueue:is_quorum(Q) andalso - (lists:member(Node, amqqueue:get_quorum_nodes(Q)))])) + (lists:member(Node, get_quorum_nodes(Q)))])) end). find_local_durable_classic_queues(VHost) -> @@ -225,7 +226,7 @@ find_recoverable_queues() -> %% - if the record is present - in order to restart. (mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= [] orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))))) - orelse (amqqueue:is_quorum(Q) andalso lists:member(Node, amqqueue:get_quorum_nodes(Q))) + orelse (amqqueue:is_quorum(Q) andalso lists:member(Node, get_quorum_nodes(Q))) ])) end). @@ -274,7 +275,7 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, ok = check_declare_arguments(QueueName, Args), Type = get_queue_type(Args), TypeIsAllowed = - Type =:= classic orelse + Type =:= rabbit_classic_queue orelse rabbit_feature_flags:is_enabled(quorum_queue), case TypeIsAllowed of true -> @@ -325,9 +326,16 @@ declare_classic_queue(Q, Node) -> get_queue_type(Args) -> case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of undefined -> - classic; + rabbit_classic_queue; {_, V} -> - erlang:binary_to_existing_atom(V, utf8) + %% TODO: this mapping of "friendly" queue type name to the + %% implementing module should be part of some kind of registry + case V of + <<"quorum">> -> + rabbit_quorum_queue; + <<"classic">> -> + rabbit_classic_queue + end end. -spec internal_declare(amqqueue:amqqueue(), boolean()) -> @@ -824,7 +832,7 @@ list_local_followers() -> [ amqqueue:get_name(Q) || Q <- list(), amqqueue:is_quorum(Q), - amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), amqqueue:get_quorum_nodes(Q))]. + amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))]. is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) -> Node =:= node(QPid); @@ -1534,7 +1542,7 @@ forget_all_durable(Node) -> %% recovery. forget_node_for_queue(DeadNode, Q) when ?amqqueue_is_quorum(Q) -> - QN = amqqueue:get_quorum_nodes(Q), + QN = get_quorum_nodes(Q), forget_node_for_queue(DeadNode, QN, Q); forget_node_for_queue(DeadNode, Q) -> RS = amqqueue:get_recoverable_slaves(Q), @@ -1555,9 +1563,11 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) -> Type = amqqueue:get_type(Q), case {node_permits_offline_promotion(H), Type} of {false, _} -> forget_node_for_queue(DeadNode, T, Q); - {true, classic} -> Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)), + {true, rabbit_classic_queue} -> + Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)), ok = mnesia:write(rabbit_durable_queue, Q1, write); - {true, quorum} -> ok + {true, rabbit_quorum_queue} -> + ok end. node_permits_offline_promotion(Node) -> @@ -1755,7 +1765,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable) [], undefined, % VHost, #{user => undefined}, % ActingUser - classic % Type + rabbit_classic_queue % Type ). -spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue(). @@ -1864,3 +1874,11 @@ get_quorum_state({Name, _} = Id, QName, Map) -> get_quorum_state({Name, _}, Map) -> maps:get(Name, Map). + +get_quorum_nodes(Q) when ?is_amqqueue(Q) -> + case amqqueue:get_type_state(Q) of + #{nodes := Nodes} -> + Nodes; + _ -> + [] + end. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index cf54656e34..b4118b96ac 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -81,7 +81,7 @@ init_state({Name, _}, QName = #resource{}) -> %% know what to do if the queue has `disappeared`. Let it crash. {ok, Q} = rabbit_amqqueue:lookup(QName), Leader = amqqueue:get_pid(Q), - Nodes = amqqueue:get_quorum_nodes(Q), + Nodes = rabbit_amqqueue:get_quorum_nodes(Q), %% Ensure the leader is listed first Servers0 = [{Name, N} || N <- Nodes], Servers = [Leader | lists:delete(Leader, Servers0)], @@ -117,7 +117,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> Id = {RaName, node()}, Nodes = select_quorum_nodes(QuorumSize, rabbit_mnesia:cluster_nodes(all)), NewQ0 = amqqueue:set_pid(Q, Id), - NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes), + NewQ1 = amqqueue:set_type_state(NewQ0, #{nodes => Nodes}), case rabbit_amqqueue:internal_declare(NewQ1, false) of {created, NewQ} -> TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT), @@ -222,7 +222,7 @@ become_leader(QName, Name) -> end), case rabbit_amqqueue:lookup(QName) of {ok, Q0} when ?is_amqqueue(Q0) -> - Nodes = amqqueue:get_quorum_nodes(Q0), + Nodes = get_nodes(Q0), [rpc:call(Node, ?MODULE, rpc_delete_metrics, [QName], ?RPC_TIMEOUT) || Node <- Nodes, Node =/= node()]; @@ -369,7 +369,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> {Name, _} = amqqueue:get_pid(Q), QName = amqqueue:get_name(Q), - QNodes = amqqueue:get_quorum_nodes(Q), + QNodes = get_nodes(Q), %% TODO Quorum queue needs to support consumer tracking for IfUnused Timeout = ?DELETE_TIMEOUT, {ok, ReadyMsgs, _} = stat(Q), @@ -600,8 +600,8 @@ cleanup_data_dir() -> {Name, _} = amqqueue:get_pid(Q), Name end - || Q <- rabbit_amqqueue:list_by_type(quorum), - lists:member(node(), amqqueue:get_quorum_nodes(Q))], + || Q <- rabbit_amqqueue:list_by_type(?MODULE), + lists:member(node(), get_nodes(Q))], Registered = ra_directory:list_registered(), _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, not lists:member(Name, Names)], @@ -644,7 +644,7 @@ status(Vhost, QueueName) -> {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; {ok, Q} when ?amqqueue_is_quorum(Q) -> - Nodes = amqqueue:get_quorum_nodes(Q), + Nodes = get_nodes(Q), [begin case get_sys_status({RName, N}) of {ok, Sys} -> @@ -696,7 +696,7 @@ add_member(VHost, Name, Node, Timeout) -> {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; {ok, Q} when ?amqqueue_is_quorum(Q) -> - QNodes = amqqueue:get_quorum_nodes(Q), + QNodes = get_nodes(Q), case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of false -> {error, node_not_running}; @@ -727,9 +727,10 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> case ra:add_member(Members, ServerId, Timeout) of {ok, _, Leader} -> Fun = fun(Q1) -> - Q2 = amqqueue:set_quorum_nodes( - Q1, - [Node | amqqueue:get_quorum_nodes(Q1)]), + Q2 = update_type_state( + Q1, fun(#{nodes := Nodes} = Ts) -> + Ts#{nodes => [Node | Nodes]} + end), amqqueue:set_pid(Q2, Leader) end, rabbit_misc:execute_mnesia_transaction( @@ -753,7 +754,7 @@ delete_member(VHost, Name, Node) -> {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; {ok, Q} when ?amqqueue_is_quorum(Q) -> - QNodes = amqqueue:get_quorum_nodes(Q), + QNodes = get_nodes(Q), case lists:member(Node, QNodes) of false -> %% idempotent by design @@ -779,10 +780,11 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> case ra:leave_and_delete_server(Members, ServerId) of ok -> Fun = fun(Q1) -> - amqqueue:set_quorum_nodes( + update_type_state( Q1, - lists:delete(Node, - amqqueue:get_quorum_nodes(Q1))) + fun(#{nodes := Nodes} = Ts) -> + Ts#{nodes => lists:delete(Node, Nodes)} + end) end, rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), @@ -802,7 +804,7 @@ shrink_all(Node) -> QName = amqqueue:get_name(Q), rabbit_log:info("~s: removing member (replica) on node ~w", [rabbit_misc:rs(QName), Node]), - Size = length(amqqueue:get_quorum_nodes(Q)), + Size = length(get_nodes(Q)), case delete_member(Q, Node) of ok -> {QName, {ok, Size-1}}; @@ -812,8 +814,8 @@ shrink_all(Node) -> {QName, {error, Size, Err}} end end || Q <- rabbit_amqqueue:list(), - amqqueue:get_type(Q) == quorum, - lists:member(Node, amqqueue:get_quorum_nodes(Q))]. + amqqueue:get_type(Q) == ?MODULE, + lists:member(Node, get_nodes(Q))]. -spec grow(node(), binary(), binary(), all | even) -> [{rabbit_amqqueue:name(), @@ -821,7 +823,7 @@ shrink_all(Node) -> grow(Node, VhostSpec, QueueSpec, Strategy) -> Running = rabbit_mnesia:cluster_nodes(running), [begin - Size = length(amqqueue:get_quorum_nodes(Q)), + Size = length(get_nodes(Q)), QName = amqqueue:get_name(Q), rabbit_log:info("~s: adding a new member (replica) on node ~w", [rabbit_misc:rs(QName), Node]), @@ -836,12 +838,12 @@ grow(Node, VhostSpec, QueueSpec, Strategy) -> end end || Q <- rabbit_amqqueue:list(), - amqqueue:get_type(Q) == quorum, + amqqueue:get_type(Q) == ?MODULE, %% don't add a member if there is already one on the node - not lists:member(Node, amqqueue:get_quorum_nodes(Q)), + not lists:member(Node, get_nodes(Q)), %% node needs to be running lists:member(Node, Running), - matches_strategy(Strategy, amqqueue:get_quorum_nodes(Q)), + matches_strategy(Strategy, get_nodes(Q)), is_match(amqqueue:get_vhost(Q), VhostSpec) andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. @@ -989,13 +991,13 @@ i(garbage_collection, Q) when ?is_amqqueue(Q) -> [] end; i(members, Q) when ?is_amqqueue(Q) -> - amqqueue:get_quorum_nodes(Q); + get_nodes(Q); i(online, Q) -> online(Q); i(leader, Q) -> leader(Q); i(open_files, Q) when ?is_amqqueue(Q) -> {Name, _} = amqqueue:get_pid(Q), - Nodes = amqqueue:get_quorum_nodes(Q), - {Data, _} = rpc:multicall(Nodes, rabbit_quorum_queue, open_files, [Name]), + Nodes = get_nodes(Q), + {Data, _} = rpc:multicall(Nodes, ?MODULE, open_files, [Name]), lists:flatten(Data); i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) -> QPid = amqqueue:get_pid(Q), @@ -1047,12 +1049,12 @@ leader(Q) when ?is_amqqueue(Q) -> end. online(Q) when ?is_amqqueue(Q) -> - Nodes = amqqueue:get_quorum_nodes(Q), + Nodes = get_nodes(Q), {Name, _} = amqqueue:get_pid(Q), [Node || Node <- Nodes, is_process_alive(Name, Node)]. format(Q) when ?is_amqqueue(Q) -> - Nodes = amqqueue:get_quorum_nodes(Q), + Nodes = get_nodes(Q), [{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}]. is_process_alive(Name, Node) -> @@ -1144,7 +1146,7 @@ select_quorum_nodes(Size, Rest, Selected) -> %% member with the current leader first members(Q) when ?amqqueue_is_quorum(Q) -> {RaName, LeaderNode} = amqqueue:get_pid(Q), - Nodes = lists:delete(LeaderNode, amqqueue:get_quorum_nodes(Q)), + Nodes = lists:delete(LeaderNode, get_nodes(Q)), [{RaName, N} || N <- [LeaderNode | Nodes]]. make_ra_conf(Q, ServerId, TickTimeout) -> @@ -1163,3 +1165,10 @@ make_ra_conf(Q, ServerId, TickTimeout) -> tick_timeout => TickTimeout, machine => RaMachine}. +get_nodes(Q) when ?is_amqqueue(Q) -> + #{nodes := Nodes} = amqqueue:get_type_state(Q), + Nodes. + +update_type_state(Q, Fun) when ?is_amqqueue(Q) -> + Ts = amqqueue:get_type_state(Q), + amqqueue:set_type_state(Q, Fun(Ts)). diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl index 05a049c9bb..a02c4721bc 100644 --- a/test/amqqueue_backward_compatibility_SUITE.erl +++ b/test/amqqueue_backward_compatibility_SUITE.erl @@ -104,7 +104,7 @@ new_amqqueue_v2_is_amqqueue(_) -> [], VHost, #{}, - classic), + rabbit_classic_queue), ?assert(?is_amqqueue(Queue)), ?assert(?is_amqqueue_v2(Queue)), ?assert(not ?is_amqqueue_v1(Queue)), @@ -253,7 +253,7 @@ amqqueue_v2_type_matching(_) -> [], VHost, #{}, - classic), + rabbit_classic_queue), ?assert(?amqqueue_is_classic(ClassicQueue)), ?assert(amqqueue:is_classic(ClassicQueue)), ?assert(not ?amqqueue_is_quorum(ClassicQueue)), @@ -267,7 +267,7 @@ amqqueue_v2_type_matching(_) -> [], VHost, #{}, - quorum), + rabbit_quorum_queue), ?assert(not ?amqqueue_is_classic(QuorumQueue)), ?assert(not amqqueue:is_classic(QuorumQueue)), ?assert(?amqqueue_is_quorum(QuorumQueue)), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index cc0e7aa75a..7ef38895eb 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -264,15 +264,15 @@ declare_args(Config) -> declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-max-length">>, long, 2000}, {<<"x-max-length-bytes">>, long, 2000}]), - assert_queue_type(Server, LQ, quorum), + assert_queue_type(Server, LQ, rabbit_quorum_queue), DQ = <<"classic-declare-args-q">>, declare(Ch, DQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]), - assert_queue_type(Server, DQ, classic), + assert_queue_type(Server, DQ, rabbit_classic_queue), DQ2 = <<"classic-q2">>, declare(Ch, DQ2), - assert_queue_type(Server, DQ2, classic). + assert_queue_type(Server, DQ2, rabbit_classic_queue). declare_invalid_properties(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), |