diff options
author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2022-08-04 14:29:08 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-04 14:29:08 +0200 |
commit | 38b9ada725fd163956dbc013c939c3d87400b244 (patch) | |
tree | 2072f675eec9974b7b6199e23973eb57d30737d8 | |
parent | 659e2c1b7d1a412303c2565d18107fa73c1e81cb (diff) | |
parent | f47f88e04a67ad97eb82406e77bd4f988b6e5ec3 (diff) | |
download | rabbitmq-server-git-38b9ada725fd163956dbc013c939c3d87400b244.tar.gz |
Merge pull request #5436 from rabbitmq/mergify/bp/v3.11.x/pr-5235
Remove pre-quorum-queue compatibility code (backport #5235)
33 files changed, 250 insertions, 1481 deletions
diff --git a/deps/rabbit/docs/rabbitmqctl.8 b/deps/rabbit/docs/rabbitmqctl.8 index 8cda319aa9..3d93c09bc1 100644 --- a/deps/rabbit/docs/rabbitmqctl.8 +++ b/deps/rabbit/docs/rabbitmqctl.8 @@ -2045,7 +2045,7 @@ Enables a feature flag on the target node. .Pp Example: .Sp -.Dl rabbitmqctl enable_feature_flag quorum_queue +.Dl rabbitmqctl enable_feature_flag stream_queue .Pp You can also enable all feature flags by specifying "all": .Sp diff --git a/deps/rabbit/include/amqqueue.hrl b/deps/rabbit/include/amqqueue.hrl index 097f1dfa0c..2c5de677a1 100644 --- a/deps/rabbit/include/amqqueue.hrl +++ b/deps/rabbit/include/amqqueue.hrl @@ -2,131 +2,72 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved. +%% Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved. %% --include("amqqueue_v1.hrl"). -include("amqqueue_v2.hrl"). -define(is_amqqueue(Q), - (?is_amqqueue_v2(Q) orelse - ?is_amqqueue_v1(Q))). + (?is_amqqueue_v2(Q))). -define(amqqueue_is_auto_delete(Q), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_auto_delete(Q) =:= true) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_field_auto_delete(Q) =:= true))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_auto_delete(Q) =:= true)). -define(amqqueue_is_durable(Q), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_durable(Q) =:= true) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_field_durable(Q) =:= true))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_durable(Q) =:= true)). -define(amqqueue_exclusive_owner_is(Q, Owner), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_exclusive_owner(Q) =:= Owner) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_field_exclusive_owner(Q) =:= Owner))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_exclusive_owner(Q) =:= Owner)). -define(amqqueue_exclusive_owner_is_pid(Q), - ((?is_amqqueue_v2(Q) andalso - is_pid(?amqqueue_v2_field_exclusive_owner(Q))) orelse - (?is_amqqueue_v1(Q) andalso - is_pid(?amqqueue_v1_field_exclusive_owner(Q))))). + (?is_amqqueue_v2(Q) andalso + is_pid(?amqqueue_v2_field_exclusive_owner(Q)))). -define(amqqueue_state_is(Q, State), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_state(Q) =:= State) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_field_state(Q) =:= State))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_state(Q) =:= State)). -define(amqqueue_v1_type, rabbit_classic_queue). -define(amqqueue_is_classic(Q), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_type(Q) =:= rabbit_classic_queue) orelse - ?is_amqqueue_v1(Q))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_type(Q) =:= rabbit_classic_queue)). -define(amqqueue_is_quorum(Q), (?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_type(Q) =:= rabbit_quorum_queue) orelse - false). + ?amqqueue_v2_field_type(Q) =:= rabbit_quorum_queue)). -define(amqqueue_is_stream(Q), (?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_type(Q) =:= rabbit_stream_queue) orelse - false). + ?amqqueue_v2_field_type(Q) =:= rabbit_stream_queue)). -define(amqqueue_has_valid_pid(Q), - ((?is_amqqueue_v2(Q) andalso - is_pid(?amqqueue_v2_field_pid(Q))) orelse - (?is_amqqueue_v1(Q) andalso - is_pid(?amqqueue_v1_field_pid(Q))))). + (?is_amqqueue_v2(Q) andalso + is_pid(?amqqueue_v2_field_pid(Q)))). -define(amqqueue_pid_runs_on_local_node(Q), - ((?is_amqqueue_v2(Q) andalso - node(?amqqueue_v2_field_pid(Q)) =:= node()) orelse - (?is_amqqueue_v1(Q) andalso - node(?amqqueue_v1_field_pid(Q)) =:= node()))). + (?is_amqqueue_v2(Q) andalso + node(?amqqueue_v2_field_pid(Q)) =:= node())). -define(amqqueue_pid_equals(Q, Pid), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_pid(Q) =:= Pid) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_field_pid(Q) =:= Pid))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_pid(Q) =:= Pid)). -define(amqqueue_pids_are_equal(Q0, Q1), - ((?is_amqqueue_v2(Q0) andalso ?is_amqqueue_v2(Q1) andalso - ?amqqueue_v2_field_pid(Q0) =:= ?amqqueue_v2_field_pid(Q1)) orelse - (?is_amqqueue_v1(Q0) andalso ?is_amqqueue_v1(Q1) andalso - ?amqqueue_v1_field_pid(Q0) =:= ?amqqueue_v1_field_pid(Q1)))). + (?is_amqqueue_v2(Q0) andalso ?is_amqqueue_v2(Q1) andalso + ?amqqueue_v2_field_pid(Q0) =:= ?amqqueue_v2_field_pid(Q1))). -define(amqqueue_field_name(Q), - case ?is_amqqueue_v2(Q) of - true -> ?amqqueue_v2_field_name(Q); - false -> case ?is_amqqueue_v1(Q) of - true -> ?amqqueue_v1_field_name(Q) - end - end). + ?amqqueue_v2_field_name(Q)). -define(amqqueue_field_pid(Q), - case ?is_amqqueue_v2(Q) of - true -> ?amqqueue_v2_field_pid(Q); - false -> case ?is_amqqueue_v1(Q) of - true -> ?amqqueue_v1_field_pid(Q) - end - end). - --define(amqqueue_v1_vhost(Q), element(2, ?amqqueue_v1_field_name(Q))). + ?amqqueue_v2_field_pid(Q)). + -define(amqqueue_v2_vhost(Q), element(2, ?amqqueue_v2_field_name(Q))). -define(amqqueue_vhost_equals(Q, VHost), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_vhost(Q) =:= VHost) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_vhost(Q) =:= VHost))). - --ifdef(DEBUG_QUORUM_QUEUE_FF). --define(enable_quorum_queue_if_debug, - begin - rabbit_log:info( - "---- ENABLING quorum_queue as part of " - "?try_mnesia_tx_or_upgrade_amqqueue_and_retry() ----"), - ok = rabbit_feature_flags:enable(quorum_queue) - end). --else. --define(enable_quorum_queue_if_debug, noop). --endif. - --define(try_mnesia_tx_or_upgrade_amqqueue_and_retry(Expr1, Expr2), - try - ?enable_quorum_queue_if_debug, - Expr1 - catch - throw:{error, {bad_type, T}} when ?is_amqqueue(T) -> - Expr2; - throw:{aborted, {bad_type, T}} when ?is_amqqueue(T) -> - Expr2 - end). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_vhost(Q) =:= VHost)). diff --git a/deps/rabbit/include/amqqueue_v1.hrl b/deps/rabbit/include/amqqueue_v1.hrl deleted file mode 100644 index 04b2d72850..0000000000 --- a/deps/rabbit/include/amqqueue_v1.hrl +++ /dev/null @@ -1,20 +0,0 @@ --define(is_amqqueue_v1(Q), is_record(Q, amqqueue, 19)). - --define(amqqueue_v1_field_name(Q), element(2, Q)). --define(amqqueue_v1_field_durable(Q), element(3, Q)). --define(amqqueue_v1_field_auto_delete(Q), element(4, Q)). --define(amqqueue_v1_field_exclusive_owner(Q), element(5, Q)). --define(amqqueue_v1_field_arguments(Q), element(6, Q)). --define(amqqueue_v1_field_pid(Q), element(7, Q)). --define(amqqueue_v1_field_slave_pids(Q), element(8, Q)). --define(amqqueue_v1_field_sync_slave_pids(Q), element(9, Q)). --define(amqqueue_v1_field_recoverable_slaves(Q), element(10, Q)). --define(amqqueue_v1_field_policy(Q), element(11, Q)). --define(amqqueue_v1_field_operator_policy(Q), element(12, Q)). --define(amqqueue_v1_field_gm_pids(Q), element(13, Q)). --define(amqqueue_v1_field_decorators(Q), element(14, Q)). --define(amqqueue_v1_field_state(Q), element(15, Q)). --define(amqqueue_v1_field_policy_version(Q), element(16, Q)). --define(amqqueue_v1_field_slave_pids_pending_shutdown(Q), element(17, Q)). --define(amqqueue_v1_field_vhost(Q), element(18, Q)). --define(amqqueue_v1_field_options(Q), element(19, Q)). diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl index a828a22a8d..f69eaa3147 100644 --- a/deps/rabbit/src/amqqueue.erl +++ b/deps/rabbit/src/amqqueue.erl @@ -117,7 +117,7 @@ type_state = #{} :: map() | '_' }). --type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2(). +-type amqqueue() :: amqqueue_v2(). -type amqqueue_v2() :: #amqqueue{ name :: rabbit_amqqueue:name(), durable :: boolean(), @@ -143,8 +143,7 @@ -type ra_server_id() :: {Name :: atom(), Node :: node()}. --type amqqueue_pattern() :: amqqueue_v1:amqqueue_v1_pattern() | - amqqueue_v2_pattern(). +-type amqqueue_pattern() :: amqqueue_v2_pattern(). -type amqqueue_v2_pattern() :: #amqqueue{ name :: rabbit_amqqueue:name() | '_', durable :: '_', @@ -235,34 +234,20 @@ new(#resource{kind = queue} = Name, (is_binary(VHost) orelse VHost =:= undefined) andalso is_map(Options) andalso is_atom(Type) -> - case record_version_to_use() of - ?record_version -> - new_with_version( - ?record_version, - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options, - Type); - _ -> - amqqueue_v1:new( - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options, - Type) - end. + new_with_version( + ?record_version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + Type). -spec new_with_version -(amqqueue_v1 | amqqueue_v2, +(amqqueue_v2, rabbit_amqqueue:name(), pid() | ra_server_id() | none, boolean(), @@ -300,7 +285,7 @@ new_with_version(RecordVersion, ?amqqueue_v1_type). -spec new_with_version -(amqqueue_v1 | amqqueue_v2, +(amqqueue_v2, rabbit_amqqueue:name(), pid() | ra_server_id() | none, boolean(), @@ -337,114 +322,66 @@ new_with_version(?record_version, pid = Pid, vhost = VHost, options = Options, - type = ensure_type_compat(Type)}; -new_with_version(Version, - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options, - Type) - when ?is_backwards_compat_classic(Type) -> - amqqueue_v1:new_with_version( - Version, - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options). + type = ensure_type_compat(Type)}. -spec is_amqqueue(any()) -> boolean(). -is_amqqueue(#amqqueue{}) -> true; -is_amqqueue(Queue) -> amqqueue_v1:is_amqqueue(Queue). +is_amqqueue(#amqqueue{}) -> true. --spec record_version_to_use() -> amqqueue_v1 | amqqueue_v2. +-spec record_version_to_use() -> amqqueue_v2. record_version_to_use() -> - case rabbit_feature_flags:is_enabled(quorum_queue) of - true -> ?record_version; - false -> amqqueue_v1:record_version_to_use() - end. + ?record_version. -spec upgrade(amqqueue()) -> amqqueue(). -upgrade(#amqqueue{} = Queue) -> Queue; -upgrade(OldQueue) -> upgrade_to(record_version_to_use(), OldQueue). +upgrade(#amqqueue{} = Queue) -> Queue. --spec upgrade_to -(amqqueue_v2, amqqueue()) -> amqqueue_v2(); -(amqqueue_v1, amqqueue_v1:amqqueue_v1()) -> amqqueue_v1:amqqueue_v1(). +-spec upgrade_to(amqqueue_v2, amqqueue()) -> amqqueue_v2(). upgrade_to(?record_version, #amqqueue{} = Queue) -> - Queue; -upgrade_to(?record_version, OldQueue) -> - Fields = erlang:tuple_to_list(OldQueue) ++ [?amqqueue_v1_type, - undefined], - #amqqueue{} = erlang:list_to_tuple(Fields); -upgrade_to(Version, OldQueue) -> - amqqueue_v1:upgrade_to(Version, OldQueue). + Queue. % arguments -spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table(). get_arguments(#amqqueue{arguments = Args}) -> - Args; -get_arguments(Queue) -> - amqqueue_v1:get_arguments(Queue). + Args. -spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue(). set_arguments(#amqqueue{} = Queue, Args) -> - Queue#amqqueue{arguments = Args}; -set_arguments(Queue, Args) -> - amqqueue_v1:set_arguments(Queue, Args). + Queue#amqqueue{arguments = Args}. % decorators -spec get_decorators(amqqueue()) -> [atom()] | none | undefined. get_decorators(#amqqueue{decorators = Decorators}) -> - Decorators; -get_decorators(Queue) -> - amqqueue_v1:get_decorators(Queue). + Decorators. -spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue(). set_decorators(#amqqueue{} = Queue, Decorators) -> - Queue#amqqueue{decorators = Decorators}; -set_decorators(Queue, Decorators) -> - amqqueue_v1:set_decorators(Queue, Decorators). + Queue#amqqueue{decorators = Decorators}. -spec get_exclusive_owner(amqqueue()) -> pid() | none. get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> - Owner; -get_exclusive_owner(Queue) -> - amqqueue_v1:get_exclusive_owner(Queue). + Owner. % gm_pids -spec get_gm_pids(amqqueue()) -> [{pid(), pid()}] | none. get_gm_pids(#amqqueue{gm_pids = GMPids}) -> - GMPids; -get_gm_pids(Queue) -> - amqqueue_v1:get_gm_pids(Queue). + GMPids. -spec set_gm_pids(amqqueue(), [{pid(), pid()}] | none) -> amqqueue(). set_gm_pids(#amqqueue{} = Queue, GMPids) -> - Queue#amqqueue{gm_pids = GMPids}; -set_gm_pids(Queue, GMPids) -> - amqqueue_v1:set_gm_pids(Queue, GMPids). + Queue#amqqueue{gm_pids = GMPids}. -spec get_leader(amqqueue_v2()) -> node(). @@ -454,106 +391,79 @@ get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader. -spec get_operator_policy(amqqueue()) -> binary() | none | undefined. -get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy; -get_operator_policy(Queue) -> amqqueue_v1:get_operator_policy(Queue). +get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy. -spec set_operator_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). set_operator_policy(#amqqueue{} = Queue, Policy) -> - Queue#amqqueue{operator_policy = Policy}; -set_operator_policy(Queue, Policy) -> - amqqueue_v1:set_operator_policy(Queue, Policy). + Queue#amqqueue{operator_policy = Policy}. % name -spec get_name(amqqueue()) -> rabbit_amqqueue:name(). -get_name(#amqqueue{name = Name}) -> Name; -get_name(Queue) -> amqqueue_v1:get_name(Queue). +get_name(#amqqueue{name = Name}) -> Name. -spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). set_name(#amqqueue{} = Queue, Name) -> - Queue#amqqueue{name = Name}; -set_name(Queue, Name) -> - amqqueue_v1:set_name(Queue, Name). + Queue#amqqueue{name = Name}. -spec get_options(amqqueue()) -> map(). -get_options(#amqqueue{options = Options}) -> Options; -get_options(Queue) -> amqqueue_v1:get_options(Queue). +get_options(#amqqueue{options = Options}) -> Options. -spec set_options(amqqueue(), map()) -> amqqueue(). set_options(#amqqueue{} = Queue, Options) -> - Queue#amqqueue{options = Options}; -set_options(Queue, Options) -> - amqqueue_v1:set_options(Queue, Options). + Queue#amqqueue{options = Options}. % pid --spec get_pid -(amqqueue_v2()) -> pid() | ra_server_id() | none; -(amqqueue_v1:amqqueue_v1()) -> pid() | none. +-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none. -get_pid(#amqqueue{pid = Pid}) -> Pid; -get_pid(Queue) -> amqqueue_v1:get_pid(Queue). +get_pid(#amqqueue{pid = Pid}) -> Pid. --spec set_pid -(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(); -(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). +-spec set_pid(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(). set_pid(#amqqueue{} = Queue, Pid) -> - Queue#amqqueue{pid = Pid}; -set_pid(Queue, Pid) -> - amqqueue_v1:set_pid(Queue, Pid). + Queue#amqqueue{pid = Pid}. % policy -spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined. -get_policy(#amqqueue{policy = Policy}) -> Policy; -get_policy(Queue) -> amqqueue_v1:get_policy(Queue). +get_policy(#amqqueue{policy = Policy}) -> Policy. -spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). set_policy(#amqqueue{} = Queue, Policy) -> - Queue#amqqueue{policy = Policy}; -set_policy(Queue, Policy) -> - amqqueue_v1:set_policy(Queue, Policy). + Queue#amqqueue{policy = Policy}. % policy_version -spec get_policy_version(amqqueue()) -> non_neg_integer(). get_policy_version(#amqqueue{policy_version = PV}) -> - PV; -get_policy_version(Queue) -> - amqqueue_v1:get_policy_version(Queue). + PV. -spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue(). set_policy_version(#amqqueue{} = Queue, PV) -> - Queue#amqqueue{policy_version = PV}; -set_policy_version(Queue, PV) -> - amqqueue_v1:set_policy_version(Queue, PV). + Queue#amqqueue{policy_version = PV}. % recoverable_slaves -spec get_recoverable_slaves(amqqueue()) -> [atom()] | none. get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> - Slaves; -get_recoverable_slaves(Queue) -> - amqqueue_v1:get_recoverable_slaves(Queue). + Slaves. -spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue(). set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> - Queue#amqqueue{recoverable_slaves = Slaves}; -set_recoverable_slaves(Queue, Slaves) -> - amqqueue_v1:set_recoverable_slaves(Queue, Slaves). + Queue#amqqueue{recoverable_slaves = Slaves}. % type_state (new in v2) @@ -574,16 +484,12 @@ set_type_state(Queue, _TState) -> -spec get_slave_pids(amqqueue()) -> [pid()] | none. get_slave_pids(#amqqueue{slave_pids = Slaves}) -> - Slaves; -get_slave_pids(Queue) -> - amqqueue_v1:get_slave_pids(Queue). + Slaves. -spec set_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). set_slave_pids(#amqqueue{} = Queue, SlavePids) -> - Queue#amqqueue{slave_pids = SlavePids}; -set_slave_pids(Queue, SlavePids) -> - amqqueue_v1:set_slave_pids(Queue, SlavePids). + Queue#amqqueue{slave_pids = SlavePids}. % slave_pids_pending_shutdown @@ -591,70 +497,54 @@ set_slave_pids(Queue, SlavePids) -> get_slave_pids_pending_shutdown( #amqqueue{slave_pids_pending_shutdown = Slaves}) -> - Slaves; -get_slave_pids_pending_shutdown(Queue) -> - amqqueue_v1:get_slave_pids_pending_shutdown(Queue). + Slaves. -spec set_slave_pids_pending_shutdown(amqqueue(), [pid()]) -> amqqueue(). set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> - Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}; -set_slave_pids_pending_shutdown(Queue, SlavePids) -> - amqqueue_v1:set_slave_pids_pending_shutdown(Queue, SlavePids). + Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}. % state -spec get_state(amqqueue()) -> atom() | none. -get_state(#amqqueue{state = State}) -> State; -get_state(Queue) -> amqqueue_v1:get_state(Queue). +get_state(#amqqueue{state = State}) -> State. -spec set_state(amqqueue(), atom() | none) -> amqqueue(). set_state(#amqqueue{} = Queue, State) -> - Queue#amqqueue{state = State}; -set_state(Queue, State) -> - amqqueue_v1:set_state(Queue, State). + Queue#amqqueue{state = State}. % sync_slave_pids -spec get_sync_slave_pids(amqqueue()) -> [pid()] | none. get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> - Pids; -get_sync_slave_pids(Queue) -> - amqqueue_v1:get_sync_slave_pids(Queue). + Pids. -spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). set_sync_slave_pids(#amqqueue{} = Queue, Pids) -> - Queue#amqqueue{sync_slave_pids = Pids}; -set_sync_slave_pids(Queue, Pids) -> - amqqueue_v1:set_sync_slave_pids(Queue, Pids). + Queue#amqqueue{sync_slave_pids = Pids}. %% New in v2. -spec get_type(amqqueue()) -> atom(). -get_type(#amqqueue{type = Type}) -> Type; -get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type. +get_type(#amqqueue{type = Type}) -> Type. -spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. -get_vhost(#amqqueue{vhost = VHost}) -> VHost; -get_vhost(Queue) -> amqqueue_v1:get_vhost(Queue). +get_vhost(#amqqueue{vhost = VHost}) -> VHost. -spec is_auto_delete(amqqueue()) -> boolean(). is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> - AutoDelete; -is_auto_delete(Queue) -> - amqqueue_v1:is_auto_delete(Queue). + AutoDelete. -spec is_durable(amqqueue()) -> boolean(). -is_durable(#amqqueue{durable = Durable}) -> Durable; -is_durable(Queue) -> amqqueue_v1:is_durable(Queue). +is_durable(#amqqueue{durable = Durable}) -> Durable. -spec is_classic(amqqueue()) -> boolean(). @@ -667,51 +557,27 @@ is_quorum(Queue) -> get_type(Queue) =:= rabbit_quorum_queue. fields() -> - case record_version_to_use() of - ?record_version -> fields(?record_version); - _ -> amqqueue_v1:fields() - end. + fields(?record_version). -fields(?record_version) -> record_info(fields, amqqueue); -fields(Version) -> amqqueue_v1:fields(Version). +fields(?record_version) -> record_info(fields, amqqueue). field_vhost() -> - case record_version_to_use() of - ?record_version -> #amqqueue.vhost; - _ -> amqqueue_v1:field_vhost() - end. + #amqqueue.vhost. -spec pattern_match_all() -> amqqueue_pattern(). pattern_match_all() -> - case record_version_to_use() of - ?record_version -> #amqqueue{_ = '_'}; - _ -> amqqueue_v1:pattern_match_all() - end. + #amqqueue{_ = '_'}. -spec pattern_match_on_name(rabbit_amqqueue:name()) -> amqqueue_pattern(). pattern_match_on_name(Name) -> - case record_version_to_use() of - ?record_version -> #amqqueue{name = Name, _ = '_'}; - _ -> amqqueue_v1:pattern_match_on_name(Name) - end. + #amqqueue{name = Name, _ = '_'}. -spec pattern_match_on_type(atom()) -> amqqueue_pattern(). pattern_match_on_type(Type) -> - case record_version_to_use() of - ?record_version -> - #amqqueue{type = Type, _ = '_'}; - _ when ?is_backwards_compat_classic(Type) -> - amqqueue_v1:pattern_match_all(); - %% FIXME: We try a pattern which should never match when the - %% `quorum_queue` feature flag is not enabled yet. Is there - %% a better solution? - _ -> - amqqueue_v1:pattern_match_on_name( - rabbit_misc:r(<<0>>, queue, <<0>>)) - end. + #amqqueue{type = Type, _ = '_'}. -spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue(). @@ -719,9 +585,7 @@ reset_mirroring_and_decorators(#amqqueue{} = Queue) -> Queue#amqqueue{slave_pids = [], sync_slave_pids = [], gm_pids = [], - decorators = undefined}; -reset_mirroring_and_decorators(Queue) -> - amqqueue_v1:reset_mirroring_and_decorators(Queue). + decorators = undefined}. -spec set_immutable(amqqueue()) -> amqqueue(). @@ -733,9 +597,7 @@ set_immutable(#amqqueue{} = Queue) -> gm_pids = none, policy = none, decorators = none, - state = none}; -set_immutable(Queue) -> - amqqueue_v1:set_immutable(Queue). + state = none}. -spec qnode(amqqueue() | pid() | ra_server_id()) -> node(). diff --git a/deps/rabbit/src/amqqueue_v1.erl b/deps/rabbit/src/amqqueue_v1.erl deleted file mode 100644 index e6daef0421..0000000000 --- a/deps/rabbit/src/amqqueue_v1.erl +++ /dev/null @@ -1,593 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved. -%% - --module(amqqueue_v1). - --include_lib("rabbit_common/include/resource.hrl"). --include("amqqueue.hrl"). - --export([new/8, - new/9, - new_with_version/9, - new_with_version/10, - fields/0, - fields/1, - field_vhost/0, - record_version_to_use/0, - upgrade/1, - upgrade_to/2, - % arguments - get_arguments/1, - set_arguments/2, - % decorators - get_decorators/1, - set_decorators/2, - % exclusive_owner - get_exclusive_owner/1, - % gm_pids - get_gm_pids/1, - set_gm_pids/2, - get_leader/1, - % name (#resource) - get_name/1, - set_name/2, - % operator_policy - get_operator_policy/1, - set_operator_policy/2, - % options - get_options/1, - set_options/2, - % pid - get_pid/1, - set_pid/2, - % policy - get_policy/1, - set_policy/2, - % policy_version - get_policy_version/1, - set_policy_version/2, - % type_state - get_type_state/1, - set_type_state/2, - % recoverable_slaves - get_recoverable_slaves/1, - set_recoverable_slaves/2, - % slave_pids - get_slave_pids/1, - set_slave_pids/2, - % slave_pids_pending_shutdown - get_slave_pids_pending_shutdown/1, - set_slave_pids_pending_shutdown/2, - % state - get_state/1, - set_state/2, - % sync_slave_pids - get_sync_slave_pids/1, - set_sync_slave_pids/2, - get_type/1, - get_vhost/1, - is_amqqueue/1, - is_auto_delete/1, - is_durable/1, - is_classic/1, - is_quorum/1, - pattern_match_all/0, - pattern_match_on_name/1, - pattern_match_on_type/1, - reset_mirroring_and_decorators/1, - set_immutable/1, - qnode/1, - macros/0]). - --define(record_version, ?MODULE). --define(is_backwards_compat_classic(T), - (T =:= classic orelse T =:= ?amqqueue_v1_type)). - --record(amqqueue, { - name :: rabbit_amqqueue:name() | '_', %% immutable - durable :: boolean() | '_', %% immutable - auto_delete :: boolean() | '_', %% immutable - exclusive_owner = none :: pid() | none | '_', %% immutable - arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable - pid :: pid() | none | '_', %% durable (just so we - %% know home node) - slave_pids = [] :: [pid()] | none | '_', %% transient - sync_slave_pids = [] :: [pid()] | none| '_',%% transient - recoverable_slaves = [] :: [atom()] | none | '_', %% durable - policy :: binary() | none | undefined | '_', %% durable, implicit - %% update as above - operator_policy :: binary() | none | undefined | '_', %% durable, - %% implicit - %% update - %% as above - gm_pids = [] :: [{pid(), pid()}] | none | '_', %% transient - decorators :: [atom()] | none | undefined | '_', %% transient, - %% recalculated - %% as above - state = live :: atom() | none | '_', %% durable (have we crashed?) - policy_version = 0 :: non_neg_integer() | '_', - slave_pids_pending_shutdown = [] :: [pid()] | '_', - vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index - options = #{} :: map() | '_' - }). - --type amqqueue() :: amqqueue_v1(). --type amqqueue_v1() :: #amqqueue{ - name :: rabbit_amqqueue:name(), - durable :: boolean(), - auto_delete :: boolean(), - exclusive_owner :: pid() | none, - arguments :: rabbit_framing:amqp_table(), - pid :: pid() | none, - slave_pids :: [pid()] | none, - sync_slave_pids :: [pid()] | none, - recoverable_slaves :: [atom()] | none, - policy :: binary() | none | undefined, - operator_policy :: binary() | none | undefined, - gm_pids :: [{pid(), pid()}] | none, - decorators :: [atom()] | none | undefined, - state :: atom() | none, - policy_version :: non_neg_integer(), - slave_pids_pending_shutdown :: [pid()], - vhost :: rabbit_types:vhost() | undefined, - options :: map() - }. - --type amqqueue_pattern() :: amqqueue_v1_pattern(). --type amqqueue_v1_pattern() :: #amqqueue{ - name :: rabbit_amqqueue:name() | '_', - durable :: '_', - auto_delete :: '_', - exclusive_owner :: '_', - arguments :: '_', - pid :: '_', - slave_pids :: '_', - sync_slave_pids :: '_', - recoverable_slaves :: '_', - policy :: '_', - operator_policy :: '_', - gm_pids :: '_', - decorators :: '_', - state :: '_', - policy_version :: '_', - slave_pids_pending_shutdown :: '_', - vhost :: '_', - options :: '_' - }. - --export_type([amqqueue/0, - amqqueue_v1/0, - amqqueue_pattern/0, - amqqueue_v1_pattern/0]). - --spec new(rabbit_amqqueue:name(), - pid() | none, - boolean(), - boolean(), - pid() | none, - rabbit_framing:amqp_table(), - rabbit_types:vhost() | undefined, - map()) -> amqqueue(). - -new(#resource{kind = queue} = Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options) - when (is_pid(Pid) orelse Pid =:= none) andalso - is_boolean(Durable) andalso - is_boolean(AutoDelete) andalso - (is_pid(Owner) orelse Owner =:= none) andalso - is_list(Args) andalso - (is_binary(VHost) orelse VHost =:= undefined) andalso - is_map(Options) -> - new_with_version( - ?record_version, - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options). - --spec new(rabbit_amqqueue:name(), - pid() | none, - boolean(), - boolean(), - pid() | none, - rabbit_framing:amqp_table(), - rabbit_types:vhost() | undefined, - map(), - ?amqqueue_v1_type | classic) -> amqqueue(). - -new(#resource{kind = queue} = Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options, - Type) - when (is_pid(Pid) orelse Pid =:= none) andalso - is_boolean(Durable) andalso - is_boolean(AutoDelete) andalso - (is_pid(Owner) orelse Owner =:= none) andalso - is_list(Args) andalso - (is_binary(VHost) orelse VHost =:= undefined) andalso - is_map(Options) andalso - ?is_backwards_compat_classic(Type) -> - new( - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options). - --spec new_with_version(amqqueue_v1, - rabbit_amqqueue:name(), - pid() | none, - boolean(), - boolean(), - pid() | none, - rabbit_framing:amqp_table(), - rabbit_types:vhost() | undefined, - map()) -> amqqueue(). - -new_with_version(?record_version, - #resource{kind = queue} = Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options) - when (is_pid(Pid) orelse Pid =:= none) andalso - is_boolean(Durable) andalso - is_boolean(AutoDelete) andalso - (is_pid(Owner) orelse Owner =:= none) andalso - is_list(Args) andalso - (is_binary(VHost) orelse VHost =:= undefined) andalso - is_map(Options) -> - #amqqueue{name = Name, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = Pid, - vhost = VHost, - options = Options}. - --spec new_with_version(amqqueue_v1, - rabbit_amqqueue:name(), - pid() | none, - boolean(), - boolean(), - pid() | none, - rabbit_framing:amqp_table(), - rabbit_types:vhost() | undefined, - map(), - ?amqqueue_v1_type | classic) -> amqqueue(). - -new_with_version(?record_version, - #resource{kind = queue} = Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options, - Type) - when (is_pid(Pid) orelse Pid =:= none) andalso - is_boolean(Durable) andalso - is_boolean(AutoDelete) andalso - (is_pid(Owner) orelse Owner =:= none) andalso - is_list(Args) andalso - (is_binary(VHost) orelse VHost =:= undefined) andalso - is_map(Options) andalso - ?is_backwards_compat_classic(Type) -> - new_with_version( - ?record_version, - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options). - --spec is_amqqueue(any()) -> boolean(). - -is_amqqueue(#amqqueue{}) -> true; -is_amqqueue(_) -> false. - --spec record_version_to_use() -> amqqueue_v1. - -record_version_to_use() -> - ?record_version. - --spec upgrade(amqqueue()) -> amqqueue(). - -upgrade(#amqqueue{} = Queue) -> Queue. - --spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue(). - -upgrade_to(?record_version, #amqqueue{} = Queue) -> - Queue. - -% arguments - --spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table(). - -get_arguments(#amqqueue{arguments = Args}) -> Args. - --spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue(). - -set_arguments(#amqqueue{} = Queue, Args) -> - Queue#amqqueue{arguments = Args}. - -% decorators - --spec get_decorators(amqqueue()) -> [atom()] | none | undefined. - -get_decorators(#amqqueue{decorators = Decorators}) -> Decorators. - --spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue(). - -set_decorators(#amqqueue{} = Queue, Decorators) -> - Queue#amqqueue{decorators = Decorators}. - --spec get_exclusive_owner(amqqueue()) -> pid() | none. - -get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> Owner. - -% gm_pids - --spec get_gm_pids(amqqueue()) -> [{pid(), pid()}] | none. - -get_gm_pids(#amqqueue{gm_pids = GMPids}) -> GMPids. - --spec set_gm_pids(amqqueue(), [{pid(), pid()}] | none) -> amqqueue(). - -set_gm_pids(#amqqueue{} = Queue, GMPids) -> - Queue#amqqueue{gm_pids = GMPids}. - --spec get_leader(amqqueue_v1()) -> no_return(). - -get_leader(_) -> throw({unsupported, ?record_version, get_leader}). - -% operator_policy - --spec get_operator_policy(amqqueue()) -> binary() | none | undefined. - -get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy. - --spec set_operator_policy(amqqueue(), binary() | none | undefined) -> - amqqueue(). - -set_operator_policy(#amqqueue{} = Queue, OpPolicy) -> - Queue#amqqueue{operator_policy = OpPolicy}. - -% name - --spec get_name(amqqueue()) -> rabbit_amqqueue:name(). - -get_name(#amqqueue{name = Name}) -> Name. - --spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). - -set_name(#amqqueue{} = Queue, Name) -> - Queue#amqqueue{name = Name}. - -%% options - --spec get_options(amqqueue()) -> map(). - -get_options(#amqqueue{options = Options}) -> Options. - --spec set_options(amqqueue(), map()) -> amqqueue(). - -set_options(#amqqueue{} = Queue, Options) -> - Queue#amqqueue{options = Options}. - -% pid - --spec get_pid -(amqqueue_v1:amqqueue_v1()) -> pid() | none. - -get_pid(#amqqueue{pid = Pid}) -> Pid. - --spec set_pid -(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). - -set_pid(#amqqueue{} = Queue, Pid) -> - Queue#amqqueue{pid = Pid}. - -% policy - --spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined. - -get_policy(#amqqueue{policy = Policy}) -> Policy. - --spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). - -set_policy(#amqqueue{} = Queue, Policy) -> - Queue#amqqueue{policy = Policy}. - -% policy_version - --spec get_policy_version(amqqueue()) -> non_neg_integer(). - -get_policy_version(#amqqueue{policy_version = PV}) -> - PV. - --spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue(). - -set_policy_version(#amqqueue{} = Queue, PV) -> - Queue#amqqueue{policy_version = PV}. - -% recoverable_slaves - --spec get_recoverable_slaves(amqqueue()) -> [atom()] | none. - -get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> - Slaves. - --spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue(). - -set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> - Queue#amqqueue{recoverable_slaves = Slaves}. - -% type_state (new in v2) - --spec get_type_state(amqqueue()) -> no_return(). - -get_type_state(_) -> throw({unsupported, ?record_version, get_type_state}). - --spec set_type_state(amqqueue(), [node()]) -> no_return(). - -set_type_state(_, _) -> - throw({unsupported, ?record_version, set_type_state}). - -% slave_pids - -get_slave_pids(#amqqueue{slave_pids = Slaves}) -> - Slaves. - -set_slave_pids(#amqqueue{} = Queue, SlavePids) -> - Queue#amqqueue{slave_pids = SlavePids}. - -% slave_pids_pending_shutdown - -get_slave_pids_pending_shutdown( - #amqqueue{slave_pids_pending_shutdown = Slaves}) -> - Slaves. - -set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> - Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}. - -% state - --spec get_state(amqqueue()) -> atom() | none. - -get_state(#amqqueue{state = State}) -> State. - --spec set_state(amqqueue(), atom() | none) -> amqqueue(). - -set_state(#amqqueue{} = Queue, State) -> - Queue#amqqueue{state = State}. - -% sync_slave_pids - --spec get_sync_slave_pids(amqqueue()) -> [pid()] | none. - -get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> - Pids. - --spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). - -set_sync_slave_pids(#amqqueue{} = Queue, Pids) -> - Queue#amqqueue{sync_slave_pids = Pids}. - -%% New in v2. - --spec get_type(amqqueue()) -> atom(). - -get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type. - --spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. - -get_vhost(#amqqueue{vhost = VHost}) -> VHost. - --spec is_auto_delete(amqqueue()) -> boolean(). - -is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> AutoDelete. - --spec is_durable(amqqueue()) -> boolean(). - -is_durable(#amqqueue{durable = Durable}) -> Durable. - --spec is_classic(amqqueue()) -> boolean(). - -is_classic(Queue) -> - get_type(Queue) =:= ?amqqueue_v1_type. - --spec is_quorum(amqqueue()) -> boolean(). - -is_quorum(Queue) when ?is_amqqueue(Queue) -> - false. - -fields() -> fields(?record_version). - -fields(?record_version) -> record_info(fields, amqqueue). - -field_vhost() -> #amqqueue.vhost. - --spec pattern_match_all() -> amqqueue_pattern(). - -pattern_match_all() -> #amqqueue{_ = '_'}. - --spec pattern_match_on_name(rabbit_amqqueue:name()) -> - amqqueue_pattern(). - -pattern_match_on_name(Name) -> #amqqueue{name = Name, _ = '_'}. - --spec pattern_match_on_type(atom()) -> no_return(). - -pattern_match_on_type(_) -> - throw({unsupported, ?record_version, pattern_match_on_type}). - -reset_mirroring_and_decorators(#amqqueue{} = Queue) -> - Queue#amqqueue{slave_pids = [], - sync_slave_pids = [], - gm_pids = [], - decorators = undefined}. - -set_immutable(#amqqueue{} = Queue) -> - Queue#amqqueue{pid = none, - slave_pids = none, - sync_slave_pids = none, - recoverable_slaves = none, - gm_pids = none, - policy = none, - decorators = none, - state = none}. - --spec qnode(amqqueue() | pid()) -> node(). - -qnode(Queue) when ?is_amqqueue(Queue) -> - QPid = get_pid(Queue), - qnode(QPid); -qnode(QPid) when is_pid(QPid) -> - node(QPid). - -macros() -> - io:format( - "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n", - [?record_version, record_info(size, amqqueue)]), - %% The field number starts at 2 because the first element is the - %% record name. - macros(record_info(fields, amqqueue), 2). - -macros([Field | Rest], I) -> - io:format( - "-define(~s_field_~s(Q), element(~b, Q)).~n", - [?record_version, Field, I]), - macros(Rest, I + 1); -macros([], _) -> - ok. diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 97c38c8780..4d60a4d514 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -164,11 +164,6 @@ start(Qs) -> ok. mark_local_durable_queues_stopped(VHost) -> - ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( - do_mark_local_durable_queues_stopped(VHost), - do_mark_local_durable_queues_stopped(VHost)). - -do_mark_local_durable_queues_stopped(VHost) -> Qs = find_local_durable_queues(VHost), rabbit_misc:execute_mnesia_transaction( fun() -> @@ -258,12 +253,7 @@ get_queue_type(Args) -> {created | existing, amqqueue:amqqueue()} | queue_absent(). internal_declare(Q, Recover) -> - ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( - do_internal_declare(Q, Recover), - begin - Q1 = amqqueue:upgrade(Q), - do_internal_declare(Q1, Recover) - end). + do_internal_declare(Q, Recover). do_internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -311,14 +301,6 @@ update(Name, Fun) -> %% only really used for quorum queues to ensure the rabbit_queue record %% is initialised ensure_rabbit_queue_record_is_initialized(Q) -> - ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( - do_ensure_rabbit_queue_record_is_initialized(Q), - begin - Q1 = amqqueue:upgrade(Q), - do_ensure_rabbit_queue_record_is_initialized(Q1) - end). - -do_ensure_rabbit_queue_record_is_initialized(Q) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> ok = store_queue(Q), diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 0c4077a4aa..ba8e7d4bae 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -316,12 +316,7 @@ terminate(_Reason, State = #q{q = Q}) -> Q2 = amqqueue:set_state(Q, crashed), rabbit_misc:execute_mnesia_transaction( fun() -> - ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( - rabbit_amqqueue:store_queue(Q2), - begin - Q3 = amqqueue:upgrade(Q2), - rabbit_amqqueue:store_queue(Q3) - end) + rabbit_amqqueue:store_queue(Q2) end), BQS end, State). diff --git a/deps/rabbit/src/rabbit_policy.erl b/deps/rabbit/src/rabbit_policy.erl index 8a2c0dc7fb..5a372c40c4 100644 --- a/deps/rabbit/src/rabbit_policy.erl +++ b/deps/rabbit/src/rabbit_policy.erl @@ -285,18 +285,10 @@ recover0() -> OpPolicy1 = match(QName, OpPolicies), Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1), Q3 = rabbit_queue_decorator:set(Q2), - ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:write(rabbit_durable_queue, Q3, write) - end), - begin - Q4 = amqqueue:upgrade(Q3), - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:write(rabbit_durable_queue, Q4, write) - end) - end) + rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:write(rabbit_durable_queue, Q3, write) + end) end || Q0 <- Qs], ok. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 004d496758..d6f5bbac28 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -115,7 +115,7 @@ -spec is_enabled() -> boolean(). is_enabled() -> - rabbit_feature_flags:is_enabled(quorum_queue). + true. -spec is_compatible(boolean(), boolean(), boolean()) -> boolean(). is_compatible(_Durable = true, diff --git a/deps/rabbit/test/amqqueue_backward_compatibility_SUITE.erl b/deps/rabbit/test/amqqueue_backward_compatibility_SUITE.erl index a02c4721bc..db2a4c8e48 100644 --- a/deps/rabbit/test/amqqueue_backward_compatibility_SUITE.erl +++ b/deps/rabbit/test/amqqueue_backward_compatibility_SUITE.erl @@ -14,23 +14,17 @@ init_per_testcase/2, end_per_testcase/2, - new_amqqueue_v1_is_amqqueue/1, new_amqqueue_v2_is_amqqueue/1, random_term_is_not_amqqueue/1, - amqqueue_v1_is_durable/1, amqqueue_v2_is_durable/1, random_term_is_not_durable/1, - amqqueue_v1_state_matching/1, amqqueue_v2_state_matching/1, random_term_state_matching/1, - amqqueue_v1_type_matching/1, amqqueue_v2_type_matching/1, - random_term_type_matching/1, - - upgrade_v1_to_v2/1 + random_term_type_matching/1 ]). -define(long_tuple, {random_tuple, a, b, c, d, e, f, g, h, i, j, k, l, m, @@ -43,16 +37,12 @@ all() -> groups() -> [ - {parallel_tests, [parallel], [new_amqqueue_v1_is_amqqueue, - new_amqqueue_v2_is_amqqueue, + {parallel_tests, [parallel], [new_amqqueue_v2_is_amqqueue, random_term_is_not_amqqueue, - amqqueue_v1_is_durable, amqqueue_v2_is_durable, random_term_is_not_durable, - amqqueue_v1_state_matching, amqqueue_v2_state_matching, random_term_state_matching, - amqqueue_v1_type_matching, amqqueue_v2_type_matching, random_term_type_matching]} ]. @@ -66,32 +56,6 @@ end_per_group(_, Config) -> Config. init_per_testcase(_, Config) -> Config. end_per_testcase(_, Config) -> Config. -new_amqqueue_v1_is_amqqueue(_) -> - VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), - Queue = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - false, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - ?assert(?is_amqqueue(Queue)), - ?assert(?is_amqqueue_v1(Queue)), - ?assert(not ?is_amqqueue_v2(Queue)), - ?assert(?amqqueue_is_classic(Queue)), - ?assert(amqqueue:is_classic(Queue)), - ?assert(not ?amqqueue_is_quorum(Queue)), - ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)), - ?assert(?amqqueue_has_valid_pid(Queue)), - ?assert(?amqqueue_pid_equals(Queue, self())), - ?assert(?amqqueue_pids_are_equal(Queue, Queue)), - ?assert(?amqqueue_pid_runs_on_local_node(Queue)), - ?assert(amqqueue:qnode(Queue) == node()). - new_amqqueue_v2_is_amqqueue(_) -> VHost = <<"/">>, Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), @@ -107,7 +71,6 @@ new_amqqueue_v2_is_amqqueue(_) -> rabbit_classic_queue), ?assert(?is_amqqueue(Queue)), ?assert(?is_amqqueue_v2(Queue)), - ?assert(not ?is_amqqueue_v1(Queue)), ?assert(?amqqueue_is_classic(Queue)), ?assert(amqqueue:is_classic(Queue)), ?assert(not ?amqqueue_is_quorum(Queue)), @@ -121,40 +84,13 @@ new_amqqueue_v2_is_amqqueue(_) -> random_term_is_not_amqqueue(_) -> Term = ?long_tuple, ?assert(not ?is_amqqueue(Term)), - ?assert(not ?is_amqqueue_v2(Term)), - ?assert(not ?is_amqqueue_v1(Term)). + ?assert(not ?is_amqqueue_v2(Term)). %% ------------------------------------------------------------------- -amqqueue_v1_is_durable(_) -> - VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), - TransientQueue = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - false, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - DurableQueue = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - true, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - ?assert(not ?amqqueue_is_durable(TransientQueue)), - ?assert(?amqqueue_is_durable(DurableQueue)). - amqqueue_v2_is_durable(_) -> VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), TransientQueue = amqqueue:new_with_version(amqqueue_v2, Name, self(), @@ -184,26 +120,9 @@ random_term_is_not_durable(_) -> %% ------------------------------------------------------------------- -amqqueue_v1_state_matching(_) -> - VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), - Queue1 = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - true, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - ?assert(?amqqueue_state_is(Queue1, live)), - Queue2 = amqqueue:set_state(Queue1, stopped), - ?assert(?amqqueue_state_is(Queue2, stopped)). - amqqueue_v2_state_matching(_) -> VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), Queue1 = amqqueue:new_with_version(amqqueue_v2, Name, self(), @@ -224,26 +143,9 @@ random_term_state_matching(_) -> %% ------------------------------------------------------------------- -amqqueue_v1_type_matching(_) -> - VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), - Queue = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - true, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - ?assert(?amqqueue_is_classic(Queue)), - ?assert(amqqueue:is_classic(Queue)), - ?assert(not ?amqqueue_is_quorum(Queue)). - amqqueue_v2_type_matching(_) -> VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), ClassicQueue = amqqueue:new_with_version(amqqueue_v2, Name, self(), @@ -279,24 +181,3 @@ random_term_type_matching(_) -> ?assert(not ?amqqueue_is_quorum(Term)), ?assertException(error, function_clause, amqqueue:is_classic(Term)), ?assertException(error, function_clause, amqqueue:is_quorum(Term)). - -%% ------------------------------------------------------------------- - -upgrade_v1_to_v2(_) -> - VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), - OldQueue = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - true, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - ?assert(?is_amqqueue_v1(OldQueue)), - ?assert(not ?is_amqqueue_v2(OldQueue)), - NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue), - ?assert(not ?is_amqqueue_v1(NewQueue)), - ?assert(?is_amqqueue_v2(NewQueue)). diff --git a/deps/rabbit/test/consumer_timeout_SUITE.erl b/deps/rabbit/test/consumer_timeout_SUITE.erl index 8ba7ac059b..e5b6d8e9a5 100644 --- a/deps/rabbit/test/consumer_timeout_SUITE.erl +++ b/deps/rabbit/test/consumer_timeout_SUITE.erl @@ -58,15 +58,10 @@ init_per_group(classic_queue, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]); init_per_group(quorum_queue, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {queue_durable, true}]); - Skip -> - Skip - end; + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); init_per_group(mirrored_queue, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 3c6ae8ebf5..18d3cc361d 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -113,15 +113,10 @@ init_per_group(mirrored_queue, Config) -> {queue_durable, false}]), rabbit_ct_helpers:run_steps(Config1, []); init_per_group(quorum_queue, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {queue_durable, true}]); - Skip -> - Skip - end; + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); init_per_group(at_most_once, Config) -> case outer_group_name(Config) of quorum_queue -> diff --git a/deps/rabbit/test/definition_import_SUITE.erl b/deps/rabbit/test/definition_import_SUITE.erl index 3e526a30fc..c3a84ccee6 100644 --- a/deps/rabbit/test/definition_import_SUITE.erl +++ b/deps/rabbit/test/definition_import_SUITE.erl @@ -207,54 +207,44 @@ import_case11(Config) -> import_file_case(Config, "case11"). import_case12(Config) -> import_invalid_file_case(Config, "failing_case12"). import_case13(Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - import_file_case(Config, "case13"), - VHost = <<"/">>, - QueueName = <<"definitions.import.case13.qq.1">>, - QueueIsImported = - fun () -> - case queue_lookup(Config, VHost, QueueName) of - {ok, _} -> true; - _ -> false - end - end, - rabbit_ct_helpers:await_condition(QueueIsImported, 20000), - {ok, Q} = queue_lookup(Config, VHost, QueueName), - - %% see rabbitmq/rabbitmq-server#2400, rabbitmq/rabbitmq-server#2426 - ?assert(amqqueue:is_quorum(Q)), - ?assertEqual([{<<"x-max-length">>, long, 991}, - {<<"x-queue-type">>, longstr, <<"quorum">>}], - amqqueue:get_arguments(Q)); - Skip -> - Skip - end. + import_file_case(Config, "case13"), + VHost = <<"/">>, + QueueName = <<"definitions.import.case13.qq.1">>, + QueueIsImported = + fun () -> + case queue_lookup(Config, VHost, QueueName) of + {ok, _} -> true; + _ -> false + end + end, + rabbit_ct_helpers:await_condition(QueueIsImported, 20000), + {ok, Q} = queue_lookup(Config, VHost, QueueName), + + %% see rabbitmq/rabbitmq-server#2400, rabbitmq/rabbitmq-server#2426 + ?assert(amqqueue:is_quorum(Q)), + ?assertEqual([{<<"x-max-length">>, long, 991}, + {<<"x-queue-type">>, longstr, <<"quorum">>}], + amqqueue:get_arguments(Q)). import_case13a(Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - import_file_case(Config, "case13"), - VHost = <<"/">>, - QueueName = <<"definitions.import.case13.qq.1">>, - QueueIsImported = - fun () -> - case queue_lookup(Config, VHost, QueueName) of - {ok, _} -> true; - _ -> false - end - end, - rabbit_ct_helpers:await_condition(QueueIsImported, 20000), - {ok, Q} = queue_lookup(Config, VHost, QueueName), - - %% We expect that importing an existing queue (i.e. same vhost and name) - %% but with different arguments and different properties is a no-op. - import_file_case(Config, "case13a"), - timer:sleep(1000), - ?assertMatch({ok, Q}, queue_lookup(Config, VHost, QueueName)); - Skip -> - Skip - end. + import_file_case(Config, "case13"), + VHost = <<"/">>, + QueueName = <<"definitions.import.case13.qq.1">>, + QueueIsImported = + fun () -> + case queue_lookup(Config, VHost, QueueName) of + {ok, _} -> true; + _ -> false + end + end, + rabbit_ct_helpers:await_condition(QueueIsImported, 20000), + {ok, Q} = queue_lookup(Config, VHost, QueueName), + + %% We expect that importing an existing queue (i.e. same vhost and name) + %% but with different arguments and different properties is a no-op. + import_file_case(Config, "case13a"), + timer:sleep(1000), + ?assertMatch({ok, Q}, queue_lookup(Config, VHost, QueueName)). import_case14(Config) -> import_file_case(Config, "case14"). %% contains a user with tags as a list diff --git a/deps/rabbit/test/dynamic_qq_SUITE.erl b/deps/rabbit/test/dynamic_qq_SUITE.erl index 89650c7173..3c1d03d288 100644 --- a/deps/rabbit/test/dynamic_qq_SUITE.erl +++ b/deps/rabbit/test/dynamic_qq_SUITE.erl @@ -73,17 +73,10 @@ init_per_testcase(Testcase, Config) -> {queue_name, Q}, {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]} ]), - Config2 = rabbit_ct_helpers:run_steps( - Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of - ok -> - Config2; - Skip -> - end_per_testcase(Testcase, Config2), - Skip - end + rabbit_ct_helpers:run_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()) end. end_per_testcase(Testcase, Config) -> diff --git a/deps/rabbit/test/maintenance_mode_SUITE.erl b/deps/rabbit/test/maintenance_mode_SUITE.erl index b57307a594..9400a0ae5c 100644 --- a/deps/rabbit/test/maintenance_mode_SUITE.erl +++ b/deps/rabbit/test/maintenance_mode_SUITE.erl @@ -16,8 +16,7 @@ all() -> [ - {group, cluster_size_3}, - {group, quorum_queues} + {group, cluster_size_3} ]. groups() -> @@ -25,9 +24,7 @@ groups() -> {cluster_size_3, [], [ maintenance_mode_status, listener_suspension_status, - client_connection_closure - ]}, - {quorum_queues, [], [ + client_connection_closure, quorum_queue_leadership_transfer ]} ]. @@ -43,17 +40,6 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_group(quorum_queues, Config) -> - case rabbit_ct_helpers:is_mixed_versions() of - true -> - %% In a mixed 3.8/3.9 cluster, unless the 3.8 node is the - %% one in maintenance mode, a quorum won't be available - %% due to mixed ra major versions - {skip, "test not supported in mixed version mode"}; - _ -> - rabbit_ct_helpers:set_config(Config, - [{rmq_nodes_count, 3}]) - end; init_per_group(_Group, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]). @@ -70,19 +56,10 @@ init_per_testcase(quorum_queue_leadership_transfer = Testcase, Config) -> {rmq_nodename_suffix, Testcase}, {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}} ]), - Config2 = rabbit_ct_helpers:run_steps( - Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - QuorumQueueFFEnabled = rabbit_ct_broker_helpers:enable_feature_flag( - Config2, quorum_queue), - case QuorumQueueFFEnabled of - ok -> - Config2; - Skip -> - end_per_testcase(Testcase, Config2), - Skip - end; + rabbit_ct_helpers:run_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase), ClusterSize = ?config(rmq_nodes_count, Config), diff --git a/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl b/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl index 00e0e3016d..79812e7099 100644 --- a/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl +++ b/deps/rabbit/test/publisher_confirms_parallel_SUITE.erl @@ -67,15 +67,10 @@ init_per_group(classic_queue, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]); init_per_group(quorum_queue, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {queue_durable, true}]); - Skip -> - Skip - end; + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); init_per_group(mirrored_queue, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), diff --git a/deps/rabbit/test/queue_length_limits_SUITE.erl b/deps/rabbit/test/queue_length_limits_SUITE.erl index cd7f0e79b6..eebbcfa67e 100644 --- a/deps/rabbit/test/queue_length_limits_SUITE.erl +++ b/deps/rabbit/test/queue_length_limits_SUITE.erl @@ -67,15 +67,10 @@ init_per_group(max_length_classic, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, false}]); init_per_group(max_length_quorum, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {queue_durable, true}]); - Skip -> - Skip - end; + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); init_per_group(max_length_mirrored, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), diff --git a/deps/rabbit/test/queue_parallel_SUITE.erl b/deps/rabbit/test/queue_parallel_SUITE.erl index a47b5ba527..efb6e04284 100644 --- a/deps/rabbit/test/queue_parallel_SUITE.erl +++ b/deps/rabbit/test/queue_parallel_SUITE.erl @@ -90,40 +90,25 @@ init_per_group(classic_queue, Config) -> {consumer_args, []}, {queue_durable, true}]); init_per_group(quorum_queue, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {consumer_args, []}, - {queue_durable, true}]); - Skip -> - Skip - end; + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {consumer_args, []}, + {queue_durable, true}]); init_per_group(quorum_queue_in_memory_limit, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-in-memory-length">>, long, 1}]}, - {consumer_args, []}, - {queue_durable, true}]); - Skip -> - Skip - end; + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 1}]}, + {consumer_args, []}, + {queue_durable, true}]); init_per_group(quorum_queue_in_memory_bytes, Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-max-in-memory-bytes">>, long, 1}]}, - {consumer_args, []}, - {queue_durable, true}]); - Skip -> - Skip - end; + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-bytes">>, long, 1}]}, + {consumer_args, []}, + {queue_durable, true}]); init_per_group(mirrored_queue, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index 8e4c2a39fa..b757563f46 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -54,30 +54,24 @@ init_per_group(Group, Config) -> Config2 = rabbit_ct_helpers:run_steps(Config1b, [fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()), - case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of - ok -> - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, application, set_env, - [rabbit, channel_tick_interval, 100]), - %% HACK: the larger cluster sizes benefit for a bit more time - %% after clustering before running the tests. - Config3 = case Group of - cluster_size_5 -> - timer:sleep(5000), - Config2; - _ -> - Config2 - end, - - rabbit_ct_broker_helpers:set_policy( - Config3, 0, - <<"ha-policy">>, <<".*">>, <<"queues">>, - [{<<"ha-mode">>, <<"all">>}]), - Config3; - Skip -> - end_per_group(Group, Config2), - Skip - end. + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, application, set_env, + [rabbit, channel_tick_interval, 100]), + %% HACK: the larger cluster sizes benefit for a bit more time + %% after clustering before running the tests. + Config3 = case Group of + cluster_size_5 -> + timer:sleep(5000), + Config2; + _ -> + Config2 + end, + + rabbit_ct_broker_helpers:set_policy( + Config3, 0, + <<"ha-policy">>, <<".*">>, <<"queues">>, + [{<<"ha-mode">>, <<"all">>}]), + Config3. merge_app_env(Config) -> rabbit_ct_helpers:merge_app_env( diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index d22aa735c7..535c88ad38 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -211,22 +211,14 @@ init_per_group(Group, Config) -> {skip, _} -> Ret; Config2 -> - EnableFF = rabbit_ct_broker_helpers:enable_feature_flag( - Config2, quorum_queue), - case EnableFF of - ok -> - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, application, set_env, - [rabbit, channel_tick_interval, 100]), - %% HACK: the larger cluster sizes benefit for a bit - %% more time after clustering before running the - %% tests. - timer:sleep(ClusterSize * 1000), - Config2; - Skip -> - end_per_group(Group, Config2), - Skip - end + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, application, set_env, + [rabbit, channel_tick_interval, 100]), + %% HACK: the larger cluster sizes benefit for a bit + %% more time after clustering before running the + %% tests. + timer:sleep(ClusterSize * 1000), + Config2 end end. @@ -252,24 +244,10 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ {queue_name, Q}, {alt_queue_name, <<Q/binary, "_alt">>} ]), - Ret = rabbit_ct_helpers:run_steps( - Config2, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - case Ret of - {skip, _} -> - Ret; - Config3 -> - EnableFF = rabbit_ct_broker_helpers:enable_feature_flag( - Config3, quorum_queue), - case EnableFF of - ok -> - Config3; - Skip -> - end_per_testcase(Testcase, Config3), - Skip - end - end; + rabbit_ct_helpers:run_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); init_per_testcase(Testcase, Config) -> ClusterSize = ?config(rmq_nodes_count, Config), IsMixed = rabbit_ct_helpers:is_mixed_versions(), @@ -309,15 +287,6 @@ init_per_testcase(Testcase, Config) -> {alt_queue_name, <<Q/binary, "_alt">>}, {alt_2_queue_name, <<Q/binary, "_alt_2">>} ]), - EnableFF = rabbit_ct_broker_helpers:enable_feature_flag( - Config2, quorum_queue), - case EnableFF of - ok -> - Config2; - Skip -> - end_per_testcase(Testcase, Config2), - Skip - end, rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()) end. diff --git a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl index 913e3a7e81..d97fbcdec8 100644 --- a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl @@ -95,11 +95,8 @@ init_per_group(Group, Config, NodesCount) -> ok = rabbit_ct_broker_helpers:rpc( Config2, 0, application, set_env, [rabbit, channel_tick_interval, 100]), - case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of - ok -> case rabbit_ct_broker_helpers:enable_feature_flag(Config2, stream_queue) of - ok -> Config2; - Skip -> Skip - end; + case rabbit_ct_broker_helpers:enable_feature_flag(Config2, stream_queue) of + ok -> Config2; Skip -> Skip end. diff --git a/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl b/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl index 5b956ec809..c9479631a5 100644 --- a/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl +++ b/deps/rabbit/test/rabbitmq_queues_cli_integration_SUITE.erl @@ -43,16 +43,9 @@ init_per_group(tests, Config0) -> Config1 = rabbit_ct_helpers:set_config( Config0, [{rmq_nodes_count, NumNodes}, {rmq_nodes_clustered, true}]), - Config2 = rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of - ok -> - Config2; - Skip -> - end_per_group(tests, Config2), - Skip - end. + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). end_per_group(tests, Config) -> rabbit_ct_helpers:run_steps(Config, diff --git a/deps/rabbit/test/single_active_consumer_SUITE.erl b/deps/rabbit/test/single_active_consumer_SUITE.erl index 2e06e9585e..30a92bf81a 100644 --- a/deps/rabbit/test/single_active_consumer_SUITE.erl +++ b/deps/rabbit/test/single_active_consumer_SUITE.erl @@ -64,21 +64,14 @@ init_per_group(classic_queue, Config) -> auto_delete = true} } | Config]; init_per_group(quorum_queue, Config) -> - Ret = rabbit_ct_broker_helpers:rpc( - Config, 0, rabbit_feature_flags, enable, [quorum_queue]), - case Ret of - ok -> - [{single_active_consumer_queue_declare, - #'queue.declare'{ - arguments = [ - {<<"x-single-active-consumer">>, bool, true}, - {<<"x-queue-type">>, longstr, <<"quorum">>} - ], - durable = true, exclusive = false, auto_delete = false} - } | Config]; - Error -> - {skip, {"Quorum queues are unsupported", Error}} - end. + [{single_active_consumer_queue_declare, + #'queue.declare'{ + arguments = [ + {<<"x-single-active-consumer">>, bool, true}, + {<<"x-queue-type">>, longstr, <<"quorum">>} + ], + durable = true, exclusive = false, auto_delete = false} + } | Config]. end_per_group(_, Config) -> Config. diff --git a/deps/rabbit/test/upgrade_preparation_SUITE.erl b/deps/rabbit/test/upgrade_preparation_SUITE.erl index 6898feca99..9e4174ace0 100644 --- a/deps/rabbit/test/upgrade_preparation_SUITE.erl +++ b/deps/rabbit/test/upgrade_preparation_SUITE.erl @@ -62,10 +62,7 @@ end_per_group(_Group, Config) -> init_per_testcase(TestCase, Config) -> rabbit_ct_helpers:testcase_started(Config, TestCase), - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> Config; - Skip -> Skip - end. + Config. end_per_testcase(TestCase, Config) -> rabbit_ct_helpers:testcase_finished(Config, TestCase). diff --git a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl index 3946183bb2..cb1e90c1d2 100644 --- a/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl @@ -48,26 +48,10 @@ init_per_group(Group, Config) -> {rmq_nodename_suffix, Suffix}, {amqp10_client_library, Group} ]), - Config2 = rabbit_ct_helpers:run_setup_steps( - Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config2, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config2, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [quorum_queue], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, rabbit_feature_flags, enable, [quorum_queue]), - Config2; - false -> - end_per_group(Group, Config2), - {skip, "Quorum queues are unsupported"} - end. + rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). end_per_group(_, Config) -> rabbit_ct_helpers:run_teardown_steps(Config, diff --git a/deps/rabbitmq_federation/test/queue_SUITE.erl b/deps/rabbitmq_federation/test/queue_SUITE.erl index 62e51febd9..719769a7d6 100644 --- a/deps/rabbitmq_federation/test/queue_SUITE.erl +++ b/deps/rabbitmq_federation/test/queue_SUITE.erl @@ -138,23 +138,10 @@ init_per_group1(Group, Config) -> {rmq_nodename_suffix, Suffix}, {rmq_nodes_clustered, false} ]), - Config2 = rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps() ++ - SetupFederation ++ Disambiguate), - case ?config(target_queue_type, Config2) of - quorum -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of - ok -> - Config2; - {skip, Skip} -> - Skip; - Other -> - {skip, Other} - end; - _ -> - Config2 - end. + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + SetupFederation ++ Disambiguate). end_per_group(without_disambiguate, Config) -> Config; diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl index 9b975988a1..3bacecefbb 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl @@ -177,49 +177,14 @@ finish_init(Group, Config) -> Config1 = rabbit_ct_helpers:set_config(Config, NodeConf), merge_app_env(Config1). -enable_feature_flag_or_skip(FFName, Group, Config0) -> - Config1 = finish_init(Group, Config0), - Config2 = start_broker(Config1), - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config2, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config2, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [FFName], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, rabbit_feature_flags, enable, [FFName]), - Config2; - false -> - end_per_group(Group, Config2), - {skip, rabbit_misc:format("Feature flag '~s' is not supported", [FFName])} - end. - init_per_group(all_tests_with_prefix=Group, Config0) -> PathConfig = {rabbitmq_management, [{path_prefix, ?PATH_PREFIX}]}, Config1 = rabbit_ct_helpers:merge_app_env(Config0, PathConfig), Config2 = finish_init(Group, Config1), - Config3 = start_broker(Config2), - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config3, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config3, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [quorum_queue], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config3, 0, rabbit_feature_flags, enable, [quorum_queue]), - Config3; - false -> - end_per_group(Group, Config3), - {skip, "Quorum queues are unsupported"} - end; + start_broker(Config2); init_per_group(Group, Config0) -> - enable_feature_flag_or_skip(quorum_queue, Group, Config0). + Config1 = finish_init(Group, Config0), + start_broker(Config1). end_per_group(_, Config) -> inets:stop(), diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_http_health_checks_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_http_health_checks_SUITE.erl index 85d9491730..3d8aed3f8c 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_http_health_checks_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_http_health_checks_SUITE.erl @@ -65,24 +65,10 @@ init_per_group(Group, Config0) -> {rmq_nodes_count, ClusterSize}, {tcp_ports_base}], Config2 = rabbit_ct_helpers:set_config(Config1, NodeConf), - Ret = rabbit_ct_helpers:run_setup_steps( - Config2, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - case Ret of - {skip, _} -> - Ret; - Config3 -> - EnableFF = rabbit_ct_broker_helpers:enable_feature_flag( - Config3, quorum_queue), - case EnableFF of - ok -> - Config3; - Skip -> - end_per_group(Group, Config3), - Skip - end - end. + rabbit_ct_helpers:run_setup_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). end_per_group(_, Config) -> inets:stop(), diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl index e0cadbaef0..d0c9de605f 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl @@ -122,42 +122,10 @@ init_per_group(all_tests_with_prefix=Group, Config0) -> PathConfig = {rabbitmq_management, [{path_prefix, ?PATH_PREFIX}]}, Config1 = rabbit_ct_helpers:merge_app_env(Config0, PathConfig), Config2 = finish_init(Group, Config1), - Config3 = start_broker(Config2), - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config3, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config3, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [quorum_queue], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config3, 0, rabbit_feature_flags, enable, [quorum_queue]), - Config3; - false -> - end_per_group(Group, Config3), - {skip, "Quorum queues are unsupported"} - end; + Config3 = start_broker(Config2); init_per_group(Group, Config0) -> Config1 = finish_init(Group, Config0), - Config2 = start_broker(Config1), - Nodes = rabbit_ct_broker_helpers:get_node_configs( - Config2, nodename), - Ret = rabbit_ct_broker_helpers:rpc( - Config2, 0, - rabbit_feature_flags, - is_supported_remotely, - [Nodes, [quorum_queue], 60000]), - case Ret of - true -> - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, rabbit_feature_flags, enable, [quorum_queue]), - Config2; - false -> - end_per_group(Group, Config2), - {skip, "Quorum queues are unsupported"} - end. + Config2 = start_broker(Config1). end_per_group(_, Config) -> inets:stop(), diff --git a/deps/rabbitmq_management_agent/test/rabbit_mgmt_gc_SUITE.erl b/deps/rabbitmq_management_agent/test/rabbit_mgmt_gc_SUITE.erl index 278df90d3c..12f7a63a29 100644 --- a/deps/rabbitmq_management_agent/test/rabbit_mgmt_gc_SUITE.erl +++ b/deps/rabbitmq_management_agent/test/rabbit_mgmt_gc_SUITE.erl @@ -77,22 +77,6 @@ init_per_group(_, Config) -> end_per_group(_, Config) -> Config. -init_per_testcase(quorum_queue_stats = Testcase, Config) -> - case rabbit_ct_helpers:is_mixed_versions() of - true -> - {skip, "not mixed versions compatible"}; - _ -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> - rabbit_ct_helpers:testcase_started(Config, Testcase), - rabbit_ct_helpers:run_steps( - Config, rabbit_ct_client_helpers:setup_steps()); - {skip, _} = Skip -> - Skip; - Other -> - {skip, Other} - end - end; init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_helpers:run_steps(Config, diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl index df7dce5125..f029399ae6 100644 --- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl @@ -12,8 +12,7 @@ all() -> [ - {group, non_parallel_tests}, - {group, non_parallel_tests_quorum} + {group, non_parallel_tests} ]. groups() -> @@ -21,9 +20,7 @@ groups() -> {non_parallel_tests, [], [ block, handle_invalid_frames, - stats - ]}, - {non_parallel_tests_quorum, [], [ + stats, quorum_session_false, quorum_session_true, classic_session_true, @@ -62,14 +59,6 @@ end_per_suite(Config) -> rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). -init_per_group(non_parallel_tests_quorum, Config) -> -%% Added for quorum queue test else the mixing test would fail -%% with "feature flag is disabled" - case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of - ok -> Config; - Skip -> Skip - end, - Config; init_per_group(_, Config) -> Config. diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 0ff94a91f6..d51f4abfa0 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -169,7 +169,6 @@ init_per_group(aggregated_metrics, Config0) -> [{rabbit, [{collect_statistics, coarse}, {collect_statistics_interval, 100}]}] ), Config2 = init_per_group(aggregated_metrics, Config1, []), - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue), A = rabbit_ct_broker_helpers:get_node_config(Config2, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config2, A), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 7749c1bca1..cd4635f6cc 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -83,7 +83,6 @@ init_per_group(Group, Config) [{forced_feature_flags_on_init, [classic_mirrored_queue_version, implicit_default_bindings, - quorum_queue, stream_queue]}]}) end]; _ -> |