diff options
author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2022-07-15 16:09:47 +0200 |
---|---|---|
committer | Mergify <37929162+mergify[bot]@users.noreply.github.com> | 2022-08-04 11:16:03 +0000 |
commit | f7e50fd4ccdd2dc9a90481dccd2c485b32e8adad (patch) | |
tree | 06eb14ec81bba0c8f67a476ee80df39c59351322 | |
parent | 659e2c1b7d1a412303c2565d18107fa73c1e81cb (diff) | |
download | rabbitmq-server-git-f7e50fd4ccdd2dc9a90481dccd2c485b32e8adad.tar.gz |
Remove pre-quorum-queue compatibility code
Quorum queues were introduced in RabbitMQ 3.8.0. This was first time we
added a breaking change protected behind a feature flag. This allowed a
RabbitMQ cluster to be upgraded one node at a time, without having to
stop the entire cluster.
The breaking change was a new field in the `#amqqueue{}` record. This
broke the API and the ABI because records are a compile-time thing in
Erlang.
The compatibility code is in the wild for long enough that we want to
support the new `#amqqueue{}` record only from now on. The
`quorum_queue` feature flag was marked as required in a previous commit
(see #5202). This allows us to remove code in this patch.
References #5215.
(cherry picked from commit 909f861e5586b75146e86618a41829cdd47f9bc2)
-rw-r--r-- | deps/rabbit/include/amqqueue.hrl | 117 | ||||
-rw-r--r-- | deps/rabbit/include/amqqueue_v1.hrl | 20 | ||||
-rw-r--r-- | deps/rabbit/src/amqqueue.erl | 268 | ||||
-rw-r--r-- | deps/rabbit/src/amqqueue_v1.erl | 593 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 20 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue_process.erl | 7 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_policy.erl | 16 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 2 | ||||
-rw-r--r-- | deps/rabbit/test/amqqueue_backward_compatibility_SUITE.erl | 131 |
9 files changed, 107 insertions, 1067 deletions
diff --git a/deps/rabbit/include/amqqueue.hrl b/deps/rabbit/include/amqqueue.hrl index 097f1dfa0c..2c5de677a1 100644 --- a/deps/rabbit/include/amqqueue.hrl +++ b/deps/rabbit/include/amqqueue.hrl @@ -2,131 +2,72 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2018-2020 VMware, Inc. or its affiliates. All rights reserved. +%% Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved. %% --include("amqqueue_v1.hrl"). -include("amqqueue_v2.hrl"). -define(is_amqqueue(Q), - (?is_amqqueue_v2(Q) orelse - ?is_amqqueue_v1(Q))). + (?is_amqqueue_v2(Q))). -define(amqqueue_is_auto_delete(Q), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_auto_delete(Q) =:= true) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_field_auto_delete(Q) =:= true))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_auto_delete(Q) =:= true)). -define(amqqueue_is_durable(Q), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_durable(Q) =:= true) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_field_durable(Q) =:= true))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_durable(Q) =:= true)). -define(amqqueue_exclusive_owner_is(Q, Owner), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_exclusive_owner(Q) =:= Owner) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_field_exclusive_owner(Q) =:= Owner))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_exclusive_owner(Q) =:= Owner)). -define(amqqueue_exclusive_owner_is_pid(Q), - ((?is_amqqueue_v2(Q) andalso - is_pid(?amqqueue_v2_field_exclusive_owner(Q))) orelse - (?is_amqqueue_v1(Q) andalso - is_pid(?amqqueue_v1_field_exclusive_owner(Q))))). + (?is_amqqueue_v2(Q) andalso + is_pid(?amqqueue_v2_field_exclusive_owner(Q)))). -define(amqqueue_state_is(Q, State), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_state(Q) =:= State) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_field_state(Q) =:= State))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_state(Q) =:= State)). -define(amqqueue_v1_type, rabbit_classic_queue). -define(amqqueue_is_classic(Q), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_type(Q) =:= rabbit_classic_queue) orelse - ?is_amqqueue_v1(Q))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_type(Q) =:= rabbit_classic_queue)). -define(amqqueue_is_quorum(Q), (?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_type(Q) =:= rabbit_quorum_queue) orelse - false). + ?amqqueue_v2_field_type(Q) =:= rabbit_quorum_queue)). -define(amqqueue_is_stream(Q), (?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_type(Q) =:= rabbit_stream_queue) orelse - false). + ?amqqueue_v2_field_type(Q) =:= rabbit_stream_queue)). -define(amqqueue_has_valid_pid(Q), - ((?is_amqqueue_v2(Q) andalso - is_pid(?amqqueue_v2_field_pid(Q))) orelse - (?is_amqqueue_v1(Q) andalso - is_pid(?amqqueue_v1_field_pid(Q))))). + (?is_amqqueue_v2(Q) andalso + is_pid(?amqqueue_v2_field_pid(Q)))). -define(amqqueue_pid_runs_on_local_node(Q), - ((?is_amqqueue_v2(Q) andalso - node(?amqqueue_v2_field_pid(Q)) =:= node()) orelse - (?is_amqqueue_v1(Q) andalso - node(?amqqueue_v1_field_pid(Q)) =:= node()))). + (?is_amqqueue_v2(Q) andalso + node(?amqqueue_v2_field_pid(Q)) =:= node())). -define(amqqueue_pid_equals(Q, Pid), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_field_pid(Q) =:= Pid) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_field_pid(Q) =:= Pid))). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_field_pid(Q) =:= Pid)). -define(amqqueue_pids_are_equal(Q0, Q1), - ((?is_amqqueue_v2(Q0) andalso ?is_amqqueue_v2(Q1) andalso - ?amqqueue_v2_field_pid(Q0) =:= ?amqqueue_v2_field_pid(Q1)) orelse - (?is_amqqueue_v1(Q0) andalso ?is_amqqueue_v1(Q1) andalso - ?amqqueue_v1_field_pid(Q0) =:= ?amqqueue_v1_field_pid(Q1)))). + (?is_amqqueue_v2(Q0) andalso ?is_amqqueue_v2(Q1) andalso + ?amqqueue_v2_field_pid(Q0) =:= ?amqqueue_v2_field_pid(Q1))). -define(amqqueue_field_name(Q), - case ?is_amqqueue_v2(Q) of - true -> ?amqqueue_v2_field_name(Q); - false -> case ?is_amqqueue_v1(Q) of - true -> ?amqqueue_v1_field_name(Q) - end - end). + ?amqqueue_v2_field_name(Q)). -define(amqqueue_field_pid(Q), - case ?is_amqqueue_v2(Q) of - true -> ?amqqueue_v2_field_pid(Q); - false -> case ?is_amqqueue_v1(Q) of - true -> ?amqqueue_v1_field_pid(Q) - end - end). - --define(amqqueue_v1_vhost(Q), element(2, ?amqqueue_v1_field_name(Q))). + ?amqqueue_v2_field_pid(Q)). + -define(amqqueue_v2_vhost(Q), element(2, ?amqqueue_v2_field_name(Q))). -define(amqqueue_vhost_equals(Q, VHost), - ((?is_amqqueue_v2(Q) andalso - ?amqqueue_v2_vhost(Q) =:= VHost) orelse - (?is_amqqueue_v1(Q) andalso - ?amqqueue_v1_vhost(Q) =:= VHost))). - --ifdef(DEBUG_QUORUM_QUEUE_FF). --define(enable_quorum_queue_if_debug, - begin - rabbit_log:info( - "---- ENABLING quorum_queue as part of " - "?try_mnesia_tx_or_upgrade_amqqueue_and_retry() ----"), - ok = rabbit_feature_flags:enable(quorum_queue) - end). --else. --define(enable_quorum_queue_if_debug, noop). --endif. - --define(try_mnesia_tx_or_upgrade_amqqueue_and_retry(Expr1, Expr2), - try - ?enable_quorum_queue_if_debug, - Expr1 - catch - throw:{error, {bad_type, T}} when ?is_amqqueue(T) -> - Expr2; - throw:{aborted, {bad_type, T}} when ?is_amqqueue(T) -> - Expr2 - end). + (?is_amqqueue_v2(Q) andalso + ?amqqueue_v2_vhost(Q) =:= VHost)). diff --git a/deps/rabbit/include/amqqueue_v1.hrl b/deps/rabbit/include/amqqueue_v1.hrl deleted file mode 100644 index 04b2d72850..0000000000 --- a/deps/rabbit/include/amqqueue_v1.hrl +++ /dev/null @@ -1,20 +0,0 @@ --define(is_amqqueue_v1(Q), is_record(Q, amqqueue, 19)). - --define(amqqueue_v1_field_name(Q), element(2, Q)). --define(amqqueue_v1_field_durable(Q), element(3, Q)). --define(amqqueue_v1_field_auto_delete(Q), element(4, Q)). --define(amqqueue_v1_field_exclusive_owner(Q), element(5, Q)). --define(amqqueue_v1_field_arguments(Q), element(6, Q)). --define(amqqueue_v1_field_pid(Q), element(7, Q)). --define(amqqueue_v1_field_slave_pids(Q), element(8, Q)). --define(amqqueue_v1_field_sync_slave_pids(Q), element(9, Q)). --define(amqqueue_v1_field_recoverable_slaves(Q), element(10, Q)). --define(amqqueue_v1_field_policy(Q), element(11, Q)). --define(amqqueue_v1_field_operator_policy(Q), element(12, Q)). --define(amqqueue_v1_field_gm_pids(Q), element(13, Q)). --define(amqqueue_v1_field_decorators(Q), element(14, Q)). --define(amqqueue_v1_field_state(Q), element(15, Q)). --define(amqqueue_v1_field_policy_version(Q), element(16, Q)). --define(amqqueue_v1_field_slave_pids_pending_shutdown(Q), element(17, Q)). --define(amqqueue_v1_field_vhost(Q), element(18, Q)). --define(amqqueue_v1_field_options(Q), element(19, Q)). diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl index a828a22a8d..f69eaa3147 100644 --- a/deps/rabbit/src/amqqueue.erl +++ b/deps/rabbit/src/amqqueue.erl @@ -117,7 +117,7 @@ type_state = #{} :: map() | '_' }). --type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2(). +-type amqqueue() :: amqqueue_v2(). -type amqqueue_v2() :: #amqqueue{ name :: rabbit_amqqueue:name(), durable :: boolean(), @@ -143,8 +143,7 @@ -type ra_server_id() :: {Name :: atom(), Node :: node()}. --type amqqueue_pattern() :: amqqueue_v1:amqqueue_v1_pattern() | - amqqueue_v2_pattern(). +-type amqqueue_pattern() :: amqqueue_v2_pattern(). -type amqqueue_v2_pattern() :: #amqqueue{ name :: rabbit_amqqueue:name() | '_', durable :: '_', @@ -235,34 +234,20 @@ new(#resource{kind = queue} = Name, (is_binary(VHost) orelse VHost =:= undefined) andalso is_map(Options) andalso is_atom(Type) -> - case record_version_to_use() of - ?record_version -> - new_with_version( - ?record_version, - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options, - Type); - _ -> - amqqueue_v1:new( - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options, - Type) - end. + new_with_version( + ?record_version, + Name, + Pid, + Durable, + AutoDelete, + Owner, + Args, + VHost, + Options, + Type). -spec new_with_version -(amqqueue_v1 | amqqueue_v2, +(amqqueue_v2, rabbit_amqqueue:name(), pid() | ra_server_id() | none, boolean(), @@ -300,7 +285,7 @@ new_with_version(RecordVersion, ?amqqueue_v1_type). -spec new_with_version -(amqqueue_v1 | amqqueue_v2, +(amqqueue_v2, rabbit_amqqueue:name(), pid() | ra_server_id() | none, boolean(), @@ -337,114 +322,66 @@ new_with_version(?record_version, pid = Pid, vhost = VHost, options = Options, - type = ensure_type_compat(Type)}; -new_with_version(Version, - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options, - Type) - when ?is_backwards_compat_classic(Type) -> - amqqueue_v1:new_with_version( - Version, - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options). + type = ensure_type_compat(Type)}. -spec is_amqqueue(any()) -> boolean(). -is_amqqueue(#amqqueue{}) -> true; -is_amqqueue(Queue) -> amqqueue_v1:is_amqqueue(Queue). +is_amqqueue(#amqqueue{}) -> true. --spec record_version_to_use() -> amqqueue_v1 | amqqueue_v2. +-spec record_version_to_use() -> amqqueue_v2. record_version_to_use() -> - case rabbit_feature_flags:is_enabled(quorum_queue) of - true -> ?record_version; - false -> amqqueue_v1:record_version_to_use() - end. + ?record_version. -spec upgrade(amqqueue()) -> amqqueue(). -upgrade(#amqqueue{} = Queue) -> Queue; -upgrade(OldQueue) -> upgrade_to(record_version_to_use(), OldQueue). +upgrade(#amqqueue{} = Queue) -> Queue. --spec upgrade_to -(amqqueue_v2, amqqueue()) -> amqqueue_v2(); -(amqqueue_v1, amqqueue_v1:amqqueue_v1()) -> amqqueue_v1:amqqueue_v1(). +-spec upgrade_to(amqqueue_v2, amqqueue()) -> amqqueue_v2(). upgrade_to(?record_version, #amqqueue{} = Queue) -> - Queue; -upgrade_to(?record_version, OldQueue) -> - Fields = erlang:tuple_to_list(OldQueue) ++ [?amqqueue_v1_type, - undefined], - #amqqueue{} = erlang:list_to_tuple(Fields); -upgrade_to(Version, OldQueue) -> - amqqueue_v1:upgrade_to(Version, OldQueue). + Queue. % arguments -spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table(). get_arguments(#amqqueue{arguments = Args}) -> - Args; -get_arguments(Queue) -> - amqqueue_v1:get_arguments(Queue). + Args. -spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue(). set_arguments(#amqqueue{} = Queue, Args) -> - Queue#amqqueue{arguments = Args}; -set_arguments(Queue, Args) -> - amqqueue_v1:set_arguments(Queue, Args). + Queue#amqqueue{arguments = Args}. % decorators -spec get_decorators(amqqueue()) -> [atom()] | none | undefined. get_decorators(#amqqueue{decorators = Decorators}) -> - Decorators; -get_decorators(Queue) -> - amqqueue_v1:get_decorators(Queue). + Decorators. -spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue(). set_decorators(#amqqueue{} = Queue, Decorators) -> - Queue#amqqueue{decorators = Decorators}; -set_decorators(Queue, Decorators) -> - amqqueue_v1:set_decorators(Queue, Decorators). + Queue#amqqueue{decorators = Decorators}. -spec get_exclusive_owner(amqqueue()) -> pid() | none. get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> - Owner; -get_exclusive_owner(Queue) -> - amqqueue_v1:get_exclusive_owner(Queue). + Owner. % gm_pids -spec get_gm_pids(amqqueue()) -> [{pid(), pid()}] | none. get_gm_pids(#amqqueue{gm_pids = GMPids}) -> - GMPids; -get_gm_pids(Queue) -> - amqqueue_v1:get_gm_pids(Queue). + GMPids. -spec set_gm_pids(amqqueue(), [{pid(), pid()}] | none) -> amqqueue(). set_gm_pids(#amqqueue{} = Queue, GMPids) -> - Queue#amqqueue{gm_pids = GMPids}; -set_gm_pids(Queue, GMPids) -> - amqqueue_v1:set_gm_pids(Queue, GMPids). + Queue#amqqueue{gm_pids = GMPids}. -spec get_leader(amqqueue_v2()) -> node(). @@ -454,106 +391,79 @@ get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader. -spec get_operator_policy(amqqueue()) -> binary() | none | undefined. -get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy; -get_operator_policy(Queue) -> amqqueue_v1:get_operator_policy(Queue). +get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy. -spec set_operator_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). set_operator_policy(#amqqueue{} = Queue, Policy) -> - Queue#amqqueue{operator_policy = Policy}; -set_operator_policy(Queue, Policy) -> - amqqueue_v1:set_operator_policy(Queue, Policy). + Queue#amqqueue{operator_policy = Policy}. % name -spec get_name(amqqueue()) -> rabbit_amqqueue:name(). -get_name(#amqqueue{name = Name}) -> Name; -get_name(Queue) -> amqqueue_v1:get_name(Queue). +get_name(#amqqueue{name = Name}) -> Name. -spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). set_name(#amqqueue{} = Queue, Name) -> - Queue#amqqueue{name = Name}; -set_name(Queue, Name) -> - amqqueue_v1:set_name(Queue, Name). + Queue#amqqueue{name = Name}. -spec get_options(amqqueue()) -> map(). -get_options(#amqqueue{options = Options}) -> Options; -get_options(Queue) -> amqqueue_v1:get_options(Queue). +get_options(#amqqueue{options = Options}) -> Options. -spec set_options(amqqueue(), map()) -> amqqueue(). set_options(#amqqueue{} = Queue, Options) -> - Queue#amqqueue{options = Options}; -set_options(Queue, Options) -> - amqqueue_v1:set_options(Queue, Options). + Queue#amqqueue{options = Options}. % pid --spec get_pid -(amqqueue_v2()) -> pid() | ra_server_id() | none; -(amqqueue_v1:amqqueue_v1()) -> pid() | none. +-spec get_pid(amqqueue_v2()) -> pid() | ra_server_id() | none. -get_pid(#amqqueue{pid = Pid}) -> Pid; -get_pid(Queue) -> amqqueue_v1:get_pid(Queue). +get_pid(#amqqueue{pid = Pid}) -> Pid. --spec set_pid -(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(); -(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). +-spec set_pid(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2(). set_pid(#amqqueue{} = Queue, Pid) -> - Queue#amqqueue{pid = Pid}; -set_pid(Queue, Pid) -> - amqqueue_v1:set_pid(Queue, Pid). + Queue#amqqueue{pid = Pid}. % policy -spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined. -get_policy(#amqqueue{policy = Policy}) -> Policy; -get_policy(Queue) -> amqqueue_v1:get_policy(Queue). +get_policy(#amqqueue{policy = Policy}) -> Policy. -spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). set_policy(#amqqueue{} = Queue, Policy) -> - Queue#amqqueue{policy = Policy}; -set_policy(Queue, Policy) -> - amqqueue_v1:set_policy(Queue, Policy). + Queue#amqqueue{policy = Policy}. % policy_version -spec get_policy_version(amqqueue()) -> non_neg_integer(). get_policy_version(#amqqueue{policy_version = PV}) -> - PV; -get_policy_version(Queue) -> - amqqueue_v1:get_policy_version(Queue). + PV. -spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue(). set_policy_version(#amqqueue{} = Queue, PV) -> - Queue#amqqueue{policy_version = PV}; -set_policy_version(Queue, PV) -> - amqqueue_v1:set_policy_version(Queue, PV). + Queue#amqqueue{policy_version = PV}. % recoverable_slaves -spec get_recoverable_slaves(amqqueue()) -> [atom()] | none. get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> - Slaves; -get_recoverable_slaves(Queue) -> - amqqueue_v1:get_recoverable_slaves(Queue). + Slaves. -spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue(). set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> - Queue#amqqueue{recoverable_slaves = Slaves}; -set_recoverable_slaves(Queue, Slaves) -> - amqqueue_v1:set_recoverable_slaves(Queue, Slaves). + Queue#amqqueue{recoverable_slaves = Slaves}. % type_state (new in v2) @@ -574,16 +484,12 @@ set_type_state(Queue, _TState) -> -spec get_slave_pids(amqqueue()) -> [pid()] | none. get_slave_pids(#amqqueue{slave_pids = Slaves}) -> - Slaves; -get_slave_pids(Queue) -> - amqqueue_v1:get_slave_pids(Queue). + Slaves. -spec set_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). set_slave_pids(#amqqueue{} = Queue, SlavePids) -> - Queue#amqqueue{slave_pids = SlavePids}; -set_slave_pids(Queue, SlavePids) -> - amqqueue_v1:set_slave_pids(Queue, SlavePids). + Queue#amqqueue{slave_pids = SlavePids}. % slave_pids_pending_shutdown @@ -591,70 +497,54 @@ set_slave_pids(Queue, SlavePids) -> get_slave_pids_pending_shutdown( #amqqueue{slave_pids_pending_shutdown = Slaves}) -> - Slaves; -get_slave_pids_pending_shutdown(Queue) -> - amqqueue_v1:get_slave_pids_pending_shutdown(Queue). + Slaves. -spec set_slave_pids_pending_shutdown(amqqueue(), [pid()]) -> amqqueue(). set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> - Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}; -set_slave_pids_pending_shutdown(Queue, SlavePids) -> - amqqueue_v1:set_slave_pids_pending_shutdown(Queue, SlavePids). + Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}. % state -spec get_state(amqqueue()) -> atom() | none. -get_state(#amqqueue{state = State}) -> State; -get_state(Queue) -> amqqueue_v1:get_state(Queue). +get_state(#amqqueue{state = State}) -> State. -spec set_state(amqqueue(), atom() | none) -> amqqueue(). set_state(#amqqueue{} = Queue, State) -> - Queue#amqqueue{state = State}; -set_state(Queue, State) -> - amqqueue_v1:set_state(Queue, State). + Queue#amqqueue{state = State}. % sync_slave_pids -spec get_sync_slave_pids(amqqueue()) -> [pid()] | none. get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> - Pids; -get_sync_slave_pids(Queue) -> - amqqueue_v1:get_sync_slave_pids(Queue). + Pids. -spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). set_sync_slave_pids(#amqqueue{} = Queue, Pids) -> - Queue#amqqueue{sync_slave_pids = Pids}; -set_sync_slave_pids(Queue, Pids) -> - amqqueue_v1:set_sync_slave_pids(Queue, Pids). + Queue#amqqueue{sync_slave_pids = Pids}. %% New in v2. -spec get_type(amqqueue()) -> atom(). -get_type(#amqqueue{type = Type}) -> Type; -get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type. +get_type(#amqqueue{type = Type}) -> Type. -spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. -get_vhost(#amqqueue{vhost = VHost}) -> VHost; -get_vhost(Queue) -> amqqueue_v1:get_vhost(Queue). +get_vhost(#amqqueue{vhost = VHost}) -> VHost. -spec is_auto_delete(amqqueue()) -> boolean(). is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> - AutoDelete; -is_auto_delete(Queue) -> - amqqueue_v1:is_auto_delete(Queue). + AutoDelete. -spec is_durable(amqqueue()) -> boolean(). -is_durable(#amqqueue{durable = Durable}) -> Durable; -is_durable(Queue) -> amqqueue_v1:is_durable(Queue). +is_durable(#amqqueue{durable = Durable}) -> Durable. -spec is_classic(amqqueue()) -> boolean(). @@ -667,51 +557,27 @@ is_quorum(Queue) -> get_type(Queue) =:= rabbit_quorum_queue. fields() -> - case record_version_to_use() of - ?record_version -> fields(?record_version); - _ -> amqqueue_v1:fields() - end. + fields(?record_version). -fields(?record_version) -> record_info(fields, amqqueue); -fields(Version) -> amqqueue_v1:fields(Version). +fields(?record_version) -> record_info(fields, amqqueue). field_vhost() -> - case record_version_to_use() of - ?record_version -> #amqqueue.vhost; - _ -> amqqueue_v1:field_vhost() - end. + #amqqueue.vhost. -spec pattern_match_all() -> amqqueue_pattern(). pattern_match_all() -> - case record_version_to_use() of - ?record_version -> #amqqueue{_ = '_'}; - _ -> amqqueue_v1:pattern_match_all() - end. + #amqqueue{_ = '_'}. -spec pattern_match_on_name(rabbit_amqqueue:name()) -> amqqueue_pattern(). pattern_match_on_name(Name) -> - case record_version_to_use() of - ?record_version -> #amqqueue{name = Name, _ = '_'}; - _ -> amqqueue_v1:pattern_match_on_name(Name) - end. + #amqqueue{name = Name, _ = '_'}. -spec pattern_match_on_type(atom()) -> amqqueue_pattern(). pattern_match_on_type(Type) -> - case record_version_to_use() of - ?record_version -> - #amqqueue{type = Type, _ = '_'}; - _ when ?is_backwards_compat_classic(Type) -> - amqqueue_v1:pattern_match_all(); - %% FIXME: We try a pattern which should never match when the - %% `quorum_queue` feature flag is not enabled yet. Is there - %% a better solution? - _ -> - amqqueue_v1:pattern_match_on_name( - rabbit_misc:r(<<0>>, queue, <<0>>)) - end. + #amqqueue{type = Type, _ = '_'}. -spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue(). @@ -719,9 +585,7 @@ reset_mirroring_and_decorators(#amqqueue{} = Queue) -> Queue#amqqueue{slave_pids = [], sync_slave_pids = [], gm_pids = [], - decorators = undefined}; -reset_mirroring_and_decorators(Queue) -> - amqqueue_v1:reset_mirroring_and_decorators(Queue). + decorators = undefined}. -spec set_immutable(amqqueue()) -> amqqueue(). @@ -733,9 +597,7 @@ set_immutable(#amqqueue{} = Queue) -> gm_pids = none, policy = none, decorators = none, - state = none}; -set_immutable(Queue) -> - amqqueue_v1:set_immutable(Queue). + state = none}. -spec qnode(amqqueue() | pid() | ra_server_id()) -> node(). diff --git a/deps/rabbit/src/amqqueue_v1.erl b/deps/rabbit/src/amqqueue_v1.erl deleted file mode 100644 index e6daef0421..0000000000 --- a/deps/rabbit/src/amqqueue_v1.erl +++ /dev/null @@ -1,593 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved. -%% - --module(amqqueue_v1). - --include_lib("rabbit_common/include/resource.hrl"). --include("amqqueue.hrl"). - --export([new/8, - new/9, - new_with_version/9, - new_with_version/10, - fields/0, - fields/1, - field_vhost/0, - record_version_to_use/0, - upgrade/1, - upgrade_to/2, - % arguments - get_arguments/1, - set_arguments/2, - % decorators - get_decorators/1, - set_decorators/2, - % exclusive_owner - get_exclusive_owner/1, - % gm_pids - get_gm_pids/1, - set_gm_pids/2, - get_leader/1, - % name (#resource) - get_name/1, - set_name/2, - % operator_policy - get_operator_policy/1, - set_operator_policy/2, - % options - get_options/1, - set_options/2, - % pid - get_pid/1, - set_pid/2, - % policy - get_policy/1, - set_policy/2, - % policy_version - get_policy_version/1, - set_policy_version/2, - % type_state - get_type_state/1, - set_type_state/2, - % recoverable_slaves - get_recoverable_slaves/1, - set_recoverable_slaves/2, - % slave_pids - get_slave_pids/1, - set_slave_pids/2, - % slave_pids_pending_shutdown - get_slave_pids_pending_shutdown/1, - set_slave_pids_pending_shutdown/2, - % state - get_state/1, - set_state/2, - % sync_slave_pids - get_sync_slave_pids/1, - set_sync_slave_pids/2, - get_type/1, - get_vhost/1, - is_amqqueue/1, - is_auto_delete/1, - is_durable/1, - is_classic/1, - is_quorum/1, - pattern_match_all/0, - pattern_match_on_name/1, - pattern_match_on_type/1, - reset_mirroring_and_decorators/1, - set_immutable/1, - qnode/1, - macros/0]). - --define(record_version, ?MODULE). --define(is_backwards_compat_classic(T), - (T =:= classic orelse T =:= ?amqqueue_v1_type)). - --record(amqqueue, { - name :: rabbit_amqqueue:name() | '_', %% immutable - durable :: boolean() | '_', %% immutable - auto_delete :: boolean() | '_', %% immutable - exclusive_owner = none :: pid() | none | '_', %% immutable - arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable - pid :: pid() | none | '_', %% durable (just so we - %% know home node) - slave_pids = [] :: [pid()] | none | '_', %% transient - sync_slave_pids = [] :: [pid()] | none| '_',%% transient - recoverable_slaves = [] :: [atom()] | none | '_', %% durable - policy :: binary() | none | undefined | '_', %% durable, implicit - %% update as above - operator_policy :: binary() | none | undefined | '_', %% durable, - %% implicit - %% update - %% as above - gm_pids = [] :: [{pid(), pid()}] | none | '_', %% transient - decorators :: [atom()] | none | undefined | '_', %% transient, - %% recalculated - %% as above - state = live :: atom() | none | '_', %% durable (have we crashed?) - policy_version = 0 :: non_neg_integer() | '_', - slave_pids_pending_shutdown = [] :: [pid()] | '_', - vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index - options = #{} :: map() | '_' - }). - --type amqqueue() :: amqqueue_v1(). --type amqqueue_v1() :: #amqqueue{ - name :: rabbit_amqqueue:name(), - durable :: boolean(), - auto_delete :: boolean(), - exclusive_owner :: pid() | none, - arguments :: rabbit_framing:amqp_table(), - pid :: pid() | none, - slave_pids :: [pid()] | none, - sync_slave_pids :: [pid()] | none, - recoverable_slaves :: [atom()] | none, - policy :: binary() | none | undefined, - operator_policy :: binary() | none | undefined, - gm_pids :: [{pid(), pid()}] | none, - decorators :: [atom()] | none | undefined, - state :: atom() | none, - policy_version :: non_neg_integer(), - slave_pids_pending_shutdown :: [pid()], - vhost :: rabbit_types:vhost() | undefined, - options :: map() - }. - --type amqqueue_pattern() :: amqqueue_v1_pattern(). --type amqqueue_v1_pattern() :: #amqqueue{ - name :: rabbit_amqqueue:name() | '_', - durable :: '_', - auto_delete :: '_', - exclusive_owner :: '_', - arguments :: '_', - pid :: '_', - slave_pids :: '_', - sync_slave_pids :: '_', - recoverable_slaves :: '_', - policy :: '_', - operator_policy :: '_', - gm_pids :: '_', - decorators :: '_', - state :: '_', - policy_version :: '_', - slave_pids_pending_shutdown :: '_', - vhost :: '_', - options :: '_' - }. - --export_type([amqqueue/0, - amqqueue_v1/0, - amqqueue_pattern/0, - amqqueue_v1_pattern/0]). - --spec new(rabbit_amqqueue:name(), - pid() | none, - boolean(), - boolean(), - pid() | none, - rabbit_framing:amqp_table(), - rabbit_types:vhost() | undefined, - map()) -> amqqueue(). - -new(#resource{kind = queue} = Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options) - when (is_pid(Pid) orelse Pid =:= none) andalso - is_boolean(Durable) andalso - is_boolean(AutoDelete) andalso - (is_pid(Owner) orelse Owner =:= none) andalso - is_list(Args) andalso - (is_binary(VHost) orelse VHost =:= undefined) andalso - is_map(Options) -> - new_with_version( - ?record_version, - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options). - --spec new(rabbit_amqqueue:name(), - pid() | none, - boolean(), - boolean(), - pid() | none, - rabbit_framing:amqp_table(), - rabbit_types:vhost() | undefined, - map(), - ?amqqueue_v1_type | classic) -> amqqueue(). - -new(#resource{kind = queue} = Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options, - Type) - when (is_pid(Pid) orelse Pid =:= none) andalso - is_boolean(Durable) andalso - is_boolean(AutoDelete) andalso - (is_pid(Owner) orelse Owner =:= none) andalso - is_list(Args) andalso - (is_binary(VHost) orelse VHost =:= undefined) andalso - is_map(Options) andalso - ?is_backwards_compat_classic(Type) -> - new( - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options). - --spec new_with_version(amqqueue_v1, - rabbit_amqqueue:name(), - pid() | none, - boolean(), - boolean(), - pid() | none, - rabbit_framing:amqp_table(), - rabbit_types:vhost() | undefined, - map()) -> amqqueue(). - -new_with_version(?record_version, - #resource{kind = queue} = Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options) - when (is_pid(Pid) orelse Pid =:= none) andalso - is_boolean(Durable) andalso - is_boolean(AutoDelete) andalso - (is_pid(Owner) orelse Owner =:= none) andalso - is_list(Args) andalso - (is_binary(VHost) orelse VHost =:= undefined) andalso - is_map(Options) -> - #amqqueue{name = Name, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = Pid, - vhost = VHost, - options = Options}. - --spec new_with_version(amqqueue_v1, - rabbit_amqqueue:name(), - pid() | none, - boolean(), - boolean(), - pid() | none, - rabbit_framing:amqp_table(), - rabbit_types:vhost() | undefined, - map(), - ?amqqueue_v1_type | classic) -> amqqueue(). - -new_with_version(?record_version, - #resource{kind = queue} = Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options, - Type) - when (is_pid(Pid) orelse Pid =:= none) andalso - is_boolean(Durable) andalso - is_boolean(AutoDelete) andalso - (is_pid(Owner) orelse Owner =:= none) andalso - is_list(Args) andalso - (is_binary(VHost) orelse VHost =:= undefined) andalso - is_map(Options) andalso - ?is_backwards_compat_classic(Type) -> - new_with_version( - ?record_version, - Name, - Pid, - Durable, - AutoDelete, - Owner, - Args, - VHost, - Options). - --spec is_amqqueue(any()) -> boolean(). - -is_amqqueue(#amqqueue{}) -> true; -is_amqqueue(_) -> false. - --spec record_version_to_use() -> amqqueue_v1. - -record_version_to_use() -> - ?record_version. - --spec upgrade(amqqueue()) -> amqqueue(). - -upgrade(#amqqueue{} = Queue) -> Queue. - --spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue(). - -upgrade_to(?record_version, #amqqueue{} = Queue) -> - Queue. - -% arguments - --spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table(). - -get_arguments(#amqqueue{arguments = Args}) -> Args. - --spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue(). - -set_arguments(#amqqueue{} = Queue, Args) -> - Queue#amqqueue{arguments = Args}. - -% decorators - --spec get_decorators(amqqueue()) -> [atom()] | none | undefined. - -get_decorators(#amqqueue{decorators = Decorators}) -> Decorators. - --spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue(). - -set_decorators(#amqqueue{} = Queue, Decorators) -> - Queue#amqqueue{decorators = Decorators}. - --spec get_exclusive_owner(amqqueue()) -> pid() | none. - -get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> Owner. - -% gm_pids - --spec get_gm_pids(amqqueue()) -> [{pid(), pid()}] | none. - -get_gm_pids(#amqqueue{gm_pids = GMPids}) -> GMPids. - --spec set_gm_pids(amqqueue(), [{pid(), pid()}] | none) -> amqqueue(). - -set_gm_pids(#amqqueue{} = Queue, GMPids) -> - Queue#amqqueue{gm_pids = GMPids}. - --spec get_leader(amqqueue_v1()) -> no_return(). - -get_leader(_) -> throw({unsupported, ?record_version, get_leader}). - -% operator_policy - --spec get_operator_policy(amqqueue()) -> binary() | none | undefined. - -get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy. - --spec set_operator_policy(amqqueue(), binary() | none | undefined) -> - amqqueue(). - -set_operator_policy(#amqqueue{} = Queue, OpPolicy) -> - Queue#amqqueue{operator_policy = OpPolicy}. - -% name - --spec get_name(amqqueue()) -> rabbit_amqqueue:name(). - -get_name(#amqqueue{name = Name}) -> Name. - --spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue(). - -set_name(#amqqueue{} = Queue, Name) -> - Queue#amqqueue{name = Name}. - -%% options - --spec get_options(amqqueue()) -> map(). - -get_options(#amqqueue{options = Options}) -> Options. - --spec set_options(amqqueue(), map()) -> amqqueue(). - -set_options(#amqqueue{} = Queue, Options) -> - Queue#amqqueue{options = Options}. - -% pid - --spec get_pid -(amqqueue_v1:amqqueue_v1()) -> pid() | none. - -get_pid(#amqqueue{pid = Pid}) -> Pid. - --spec set_pid -(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1(). - -set_pid(#amqqueue{} = Queue, Pid) -> - Queue#amqqueue{pid = Pid}. - -% policy - --spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined. - -get_policy(#amqqueue{policy = Policy}) -> Policy. - --spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue(). - -set_policy(#amqqueue{} = Queue, Policy) -> - Queue#amqqueue{policy = Policy}. - -% policy_version - --spec get_policy_version(amqqueue()) -> non_neg_integer(). - -get_policy_version(#amqqueue{policy_version = PV}) -> - PV. - --spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue(). - -set_policy_version(#amqqueue{} = Queue, PV) -> - Queue#amqqueue{policy_version = PV}. - -% recoverable_slaves - --spec get_recoverable_slaves(amqqueue()) -> [atom()] | none. - -get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) -> - Slaves. - --spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue(). - -set_recoverable_slaves(#amqqueue{} = Queue, Slaves) -> - Queue#amqqueue{recoverable_slaves = Slaves}. - -% type_state (new in v2) - --spec get_type_state(amqqueue()) -> no_return(). - -get_type_state(_) -> throw({unsupported, ?record_version, get_type_state}). - --spec set_type_state(amqqueue(), [node()]) -> no_return(). - -set_type_state(_, _) -> - throw({unsupported, ?record_version, set_type_state}). - -% slave_pids - -get_slave_pids(#amqqueue{slave_pids = Slaves}) -> - Slaves. - -set_slave_pids(#amqqueue{} = Queue, SlavePids) -> - Queue#amqqueue{slave_pids = SlavePids}. - -% slave_pids_pending_shutdown - -get_slave_pids_pending_shutdown( - #amqqueue{slave_pids_pending_shutdown = Slaves}) -> - Slaves. - -set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) -> - Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}. - -% state - --spec get_state(amqqueue()) -> atom() | none. - -get_state(#amqqueue{state = State}) -> State. - --spec set_state(amqqueue(), atom() | none) -> amqqueue(). - -set_state(#amqqueue{} = Queue, State) -> - Queue#amqqueue{state = State}. - -% sync_slave_pids - --spec get_sync_slave_pids(amqqueue()) -> [pid()] | none. - -get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) -> - Pids. - --spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue(). - -set_sync_slave_pids(#amqqueue{} = Queue, Pids) -> - Queue#amqqueue{sync_slave_pids = Pids}. - -%% New in v2. - --spec get_type(amqqueue()) -> atom(). - -get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type. - --spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined. - -get_vhost(#amqqueue{vhost = VHost}) -> VHost. - --spec is_auto_delete(amqqueue()) -> boolean(). - -is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> AutoDelete. - --spec is_durable(amqqueue()) -> boolean(). - -is_durable(#amqqueue{durable = Durable}) -> Durable. - --spec is_classic(amqqueue()) -> boolean(). - -is_classic(Queue) -> - get_type(Queue) =:= ?amqqueue_v1_type. - --spec is_quorum(amqqueue()) -> boolean(). - -is_quorum(Queue) when ?is_amqqueue(Queue) -> - false. - -fields() -> fields(?record_version). - -fields(?record_version) -> record_info(fields, amqqueue). - -field_vhost() -> #amqqueue.vhost. - --spec pattern_match_all() -> amqqueue_pattern(). - -pattern_match_all() -> #amqqueue{_ = '_'}. - --spec pattern_match_on_name(rabbit_amqqueue:name()) -> - amqqueue_pattern(). - -pattern_match_on_name(Name) -> #amqqueue{name = Name, _ = '_'}. - --spec pattern_match_on_type(atom()) -> no_return(). - -pattern_match_on_type(_) -> - throw({unsupported, ?record_version, pattern_match_on_type}). - -reset_mirroring_and_decorators(#amqqueue{} = Queue) -> - Queue#amqqueue{slave_pids = [], - sync_slave_pids = [], - gm_pids = [], - decorators = undefined}. - -set_immutable(#amqqueue{} = Queue) -> - Queue#amqqueue{pid = none, - slave_pids = none, - sync_slave_pids = none, - recoverable_slaves = none, - gm_pids = none, - policy = none, - decorators = none, - state = none}. - --spec qnode(amqqueue() | pid()) -> node(). - -qnode(Queue) when ?is_amqqueue(Queue) -> - QPid = get_pid(Queue), - qnode(QPid); -qnode(QPid) when is_pid(QPid) -> - node(QPid). - -macros() -> - io:format( - "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n", - [?record_version, record_info(size, amqqueue)]), - %% The field number starts at 2 because the first element is the - %% record name. - macros(record_info(fields, amqqueue), 2). - -macros([Field | Rest], I) -> - io:format( - "-define(~s_field_~s(Q), element(~b, Q)).~n", - [?record_version, Field, I]), - macros(Rest, I + 1); -macros([], _) -> - ok. diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 97c38c8780..4d60a4d514 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -164,11 +164,6 @@ start(Qs) -> ok. mark_local_durable_queues_stopped(VHost) -> - ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( - do_mark_local_durable_queues_stopped(VHost), - do_mark_local_durable_queues_stopped(VHost)). - -do_mark_local_durable_queues_stopped(VHost) -> Qs = find_local_durable_queues(VHost), rabbit_misc:execute_mnesia_transaction( fun() -> @@ -258,12 +253,7 @@ get_queue_type(Args) -> {created | existing, amqqueue:amqqueue()} | queue_absent(). internal_declare(Q, Recover) -> - ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( - do_internal_declare(Q, Recover), - begin - Q1 = amqqueue:upgrade(Q), - do_internal_declare(Q1, Recover) - end). + do_internal_declare(Q, Recover). do_internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -311,14 +301,6 @@ update(Name, Fun) -> %% only really used for quorum queues to ensure the rabbit_queue record %% is initialised ensure_rabbit_queue_record_is_initialized(Q) -> - ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( - do_ensure_rabbit_queue_record_is_initialized(Q), - begin - Q1 = amqqueue:upgrade(Q), - do_ensure_rabbit_queue_record_is_initialized(Q1) - end). - -do_ensure_rabbit_queue_record_is_initialized(Q) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> ok = store_queue(Q), diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 0c4077a4aa..ba8e7d4bae 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -316,12 +316,7 @@ terminate(_Reason, State = #q{q = Q}) -> Q2 = amqqueue:set_state(Q, crashed), rabbit_misc:execute_mnesia_transaction( fun() -> - ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( - rabbit_amqqueue:store_queue(Q2), - begin - Q3 = amqqueue:upgrade(Q2), - rabbit_amqqueue:store_queue(Q3) - end) + rabbit_amqqueue:store_queue(Q2) end), BQS end, State). diff --git a/deps/rabbit/src/rabbit_policy.erl b/deps/rabbit/src/rabbit_policy.erl index 8a2c0dc7fb..5a372c40c4 100644 --- a/deps/rabbit/src/rabbit_policy.erl +++ b/deps/rabbit/src/rabbit_policy.erl @@ -285,18 +285,10 @@ recover0() -> OpPolicy1 = match(QName, OpPolicies), Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1), Q3 = rabbit_queue_decorator:set(Q2), - ?try_mnesia_tx_or_upgrade_amqqueue_and_retry( - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:write(rabbit_durable_queue, Q3, write) - end), - begin - Q4 = amqqueue:upgrade(Q3), - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:write(rabbit_durable_queue, Q4, write) - end) - end) + rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:write(rabbit_durable_queue, Q3, write) + end) end || Q0 <- Qs], ok. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 004d496758..d6f5bbac28 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -115,7 +115,7 @@ -spec is_enabled() -> boolean(). is_enabled() -> - rabbit_feature_flags:is_enabled(quorum_queue). + true. -spec is_compatible(boolean(), boolean(), boolean()) -> boolean(). is_compatible(_Durable = true, diff --git a/deps/rabbit/test/amqqueue_backward_compatibility_SUITE.erl b/deps/rabbit/test/amqqueue_backward_compatibility_SUITE.erl index a02c4721bc..db2a4c8e48 100644 --- a/deps/rabbit/test/amqqueue_backward_compatibility_SUITE.erl +++ b/deps/rabbit/test/amqqueue_backward_compatibility_SUITE.erl @@ -14,23 +14,17 @@ init_per_testcase/2, end_per_testcase/2, - new_amqqueue_v1_is_amqqueue/1, new_amqqueue_v2_is_amqqueue/1, random_term_is_not_amqqueue/1, - amqqueue_v1_is_durable/1, amqqueue_v2_is_durable/1, random_term_is_not_durable/1, - amqqueue_v1_state_matching/1, amqqueue_v2_state_matching/1, random_term_state_matching/1, - amqqueue_v1_type_matching/1, amqqueue_v2_type_matching/1, - random_term_type_matching/1, - - upgrade_v1_to_v2/1 + random_term_type_matching/1 ]). -define(long_tuple, {random_tuple, a, b, c, d, e, f, g, h, i, j, k, l, m, @@ -43,16 +37,12 @@ all() -> groups() -> [ - {parallel_tests, [parallel], [new_amqqueue_v1_is_amqqueue, - new_amqqueue_v2_is_amqqueue, + {parallel_tests, [parallel], [new_amqqueue_v2_is_amqqueue, random_term_is_not_amqqueue, - amqqueue_v1_is_durable, amqqueue_v2_is_durable, random_term_is_not_durable, - amqqueue_v1_state_matching, amqqueue_v2_state_matching, random_term_state_matching, - amqqueue_v1_type_matching, amqqueue_v2_type_matching, random_term_type_matching]} ]. @@ -66,32 +56,6 @@ end_per_group(_, Config) -> Config. init_per_testcase(_, Config) -> Config. end_per_testcase(_, Config) -> Config. -new_amqqueue_v1_is_amqqueue(_) -> - VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), - Queue = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - false, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - ?assert(?is_amqqueue(Queue)), - ?assert(?is_amqqueue_v1(Queue)), - ?assert(not ?is_amqqueue_v2(Queue)), - ?assert(?amqqueue_is_classic(Queue)), - ?assert(amqqueue:is_classic(Queue)), - ?assert(not ?amqqueue_is_quorum(Queue)), - ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)), - ?assert(?amqqueue_has_valid_pid(Queue)), - ?assert(?amqqueue_pid_equals(Queue, self())), - ?assert(?amqqueue_pids_are_equal(Queue, Queue)), - ?assert(?amqqueue_pid_runs_on_local_node(Queue)), - ?assert(amqqueue:qnode(Queue) == node()). - new_amqqueue_v2_is_amqqueue(_) -> VHost = <<"/">>, Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), @@ -107,7 +71,6 @@ new_amqqueue_v2_is_amqqueue(_) -> rabbit_classic_queue), ?assert(?is_amqqueue(Queue)), ?assert(?is_amqqueue_v2(Queue)), - ?assert(not ?is_amqqueue_v1(Queue)), ?assert(?amqqueue_is_classic(Queue)), ?assert(amqqueue:is_classic(Queue)), ?assert(not ?amqqueue_is_quorum(Queue)), @@ -121,40 +84,13 @@ new_amqqueue_v2_is_amqqueue(_) -> random_term_is_not_amqqueue(_) -> Term = ?long_tuple, ?assert(not ?is_amqqueue(Term)), - ?assert(not ?is_amqqueue_v2(Term)), - ?assert(not ?is_amqqueue_v1(Term)). + ?assert(not ?is_amqqueue_v2(Term)). %% ------------------------------------------------------------------- -amqqueue_v1_is_durable(_) -> - VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), - TransientQueue = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - false, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - DurableQueue = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - true, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - ?assert(not ?amqqueue_is_durable(TransientQueue)), - ?assert(?amqqueue_is_durable(DurableQueue)). - amqqueue_v2_is_durable(_) -> VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), TransientQueue = amqqueue:new_with_version(amqqueue_v2, Name, self(), @@ -184,26 +120,9 @@ random_term_is_not_durable(_) -> %% ------------------------------------------------------------------- -amqqueue_v1_state_matching(_) -> - VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), - Queue1 = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - true, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - ?assert(?amqqueue_state_is(Queue1, live)), - Queue2 = amqqueue:set_state(Queue1, stopped), - ?assert(?amqqueue_state_is(Queue2, stopped)). - amqqueue_v2_state_matching(_) -> VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), Queue1 = amqqueue:new_with_version(amqqueue_v2, Name, self(), @@ -224,26 +143,9 @@ random_term_state_matching(_) -> %% ------------------------------------------------------------------- -amqqueue_v1_type_matching(_) -> - VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), - Queue = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - true, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - ?assert(?amqqueue_is_classic(Queue)), - ?assert(amqqueue:is_classic(Queue)), - ?assert(not ?amqqueue_is_quorum(Queue)). - amqqueue_v2_type_matching(_) -> VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), ClassicQueue = amqqueue:new_with_version(amqqueue_v2, Name, self(), @@ -279,24 +181,3 @@ random_term_type_matching(_) -> ?assert(not ?amqqueue_is_quorum(Term)), ?assertException(error, function_clause, amqqueue:is_classic(Term)), ?assertException(error, function_clause, amqqueue:is_quorum(Term)). - -%% ------------------------------------------------------------------- - -upgrade_v1_to_v2(_) -> - VHost = <<"/">>, - Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), - OldQueue = amqqueue:new_with_version(amqqueue_v1, - Name, - self(), - true, - false, - none, - [], - VHost, - #{}, - ?amqqueue_v1_type), - ?assert(?is_amqqueue_v1(OldQueue)), - ?assert(not ?is_amqqueue_v2(OldQueue)), - NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue), - ?assert(not ?is_amqqueue_v1(NewQueue)), - ?assert(?is_amqqueue_v2(NewQueue)). |