diff options
author | Michael Klishin <klishinm@vmware.com> | 2021-02-25 19:10:15 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-02-25 19:10:15 +0300 |
commit | a2f98f25e99a33824510a4bbf7809a47764d44bb (patch) | |
tree | ee5d52eacdddbb5de081d5fe07530923b02e82f6 | |
parent | 17b082abeb19340eea98a9700b32a964db75dadc (diff) | |
parent | 6778c1fea3227c3fba9bc495d53648fea739eb20 (diff) | |
download | rabbitmq-server-git-a2f98f25e99a33824510a4bbf7809a47764d44bb.tar.gz |
Merge pull request #2804 from rabbitmq/rabbitmq-server-2756
Add federation support for quorum queues
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_classic_queue.erl | 18 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 74 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_type.erl | 14 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 46 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 18 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_SUITE.erl | 41 | ||||
-rw-r--r-- | deps/rabbitmq_federation/src/rabbit_federation_queue.erl | 3 | ||||
-rw-r--r-- | deps/rabbitmq_federation/test/queue_SUITE.erl | 142 | ||||
-rw-r--r-- | deps/rabbitmq_federation/test/rabbit_federation_test_util.erl | 10 |
10 files changed, 270 insertions, 104 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 56c75e104b..5a3a60377c 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -1634,12 +1634,8 @@ basic_cancel(Q, ConsumerTag, OkMsg, ActingUser, QStates) -> -spec notify_decorators(amqqueue:amqqueue()) -> 'ok'. -notify_decorators(Q) when ?amqqueue_is_classic(Q) -> - QPid = amqqueue:get_pid(Q), - delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}); -notify_decorators(_Q) -> - %% Not supported by any other queue type - ok. +notify_decorators(Q) -> + rabbit_queue_type:notify_decorators(Q). notify_sent(QPid, ChPid) -> rabbit_amqqueue_common:notify_sent(QPid, ChPid). diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index e53c0aecc2..91cfde038f 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -38,7 +38,8 @@ dequeue/4, info/2, state_info/1, - capabilities/0 + capabilities/0, + notify_decorators/1 ]). -export([delete_crashed/1, @@ -441,14 +442,9 @@ recover_durable_queues(QueuesAndRecoveryTerms) -> [Q || {_, {new, Q}} <- Results]. capabilities() -> - #{policies => [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>, - <<"dead-letter-routing-key">>, <<"max-length">>, - <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, - <<"max-priority">>, <<"overflow">>, <<"queue-mode">>, - <<"single-active-consumer">>, <<"delivery-limit">>, - <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>, - <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>, - <<"queue-master-locator">>], + #{policies => [ %% Stream policies + <<"max-age">>, <<"max-segment-size">>, + <<"queue-leader-locator">>, <<"initial-cluster-size">>], queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, <<"x-max-length">>, <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>, @@ -460,6 +456,10 @@ capabilities() -> ], server_named => true}. +notify_decorators(Q) when ?is_amqqueue(Q) -> + QPid = amqqueue:get_pid(Q), + delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}). + reject_seq_no(SeqNo, U0) -> reject_seq_no(SeqNo, U0, []). diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index e2f69d047b..974ab7b7a0 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -41,6 +41,7 @@ query_single_active_consumer/1, query_in_memory_usage/1, query_peek/2, + query_notify_decorators_info/1, usage/1, zero/1, @@ -241,7 +242,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State1, ok, Effects} = checkout(Meta, State0, State0#?MODULE{service_queue = ServiceQueue, - consumers = Cons}, []), + consumers = Cons}, [], false), Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain @@ -299,7 +300,8 @@ apply(#{index := Index, Exists = maps:is_key(ConsumerId, Consumers), case messages_ready(State0) of 0 -> - update_smallest_raft_index(Index, {dequeue, empty}, State0, []); + update_smallest_raft_index(Index, {dequeue, empty}, State0, + [notify_decorators_effect(State0)]); _ when Exists -> %% a dequeue using the same consumer_id isn't possible at this point {State0, {dequeue, empty}}; @@ -330,8 +332,8 @@ apply(#{index := Index, {{dequeue, {MsgId, Msg}, Ready-1}, Effects1} end, - - case evaluate_limit(Index, false, State0, State4, Effects2) of + NotifyEffect = notify_decorators_effect(State4), + case evaluate_limit(Index, false, State0, State4, [NotifyEffect | Effects2]) of {State, true, Effects} -> update_smallest_raft_index(Index, Reply, State, Effects); {State, false, Effects} -> @@ -456,6 +458,7 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, % Monitor the node so that we can "unsuspect" these processes when the node % comes back, then re-issue all monitors and discover the final fate of % these processes + Effects = case maps:size(State#?MODULE.consumers) of 0 -> [{aux, inactive}, {monitor, node, Node}]; @@ -959,6 +962,21 @@ query_peek(Pos, State0) when Pos > 0 -> query_peek(Pos-1, State) end. +query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) -> + MaxActivePriority = maps:fold(fun(_, #consumer{credit = C, + status = up, + priority = P0}, MaxP) when C > 0 -> + P = -P0, + case MaxP of + empty -> P; + MaxP when MaxP > P -> MaxP; + _ -> P + end; + (_, _, MaxP) -> + MaxP + end, empty, Consumers), + IsEmpty = (messages_ready(State) == 0), + {MaxActivePriority, IsEmpty}. -spec usage(atom()) -> float(). usage(Name) when is_atom(Name) -> @@ -1064,11 +1082,13 @@ cancel_consumer0(Meta, ConsumerId, #{ConsumerId := Consumer} -> {S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer, S0, Effects0, Reason), + %% The effects are emitted before the consumer is actually removed %% if the consumer has unacked messages. This is a bit weird but %% in line with what classic queues do (from an external point of %% view) Effects = cancel_consumer_effects(ConsumerId, S, Effects2), + case maps:size(S#?MODULE.consumers) of 0 -> {S, [{aux, inactive} | Effects]}; @@ -1131,7 +1151,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of {ok, State1, Effects1} -> State2 = append_to_master_index(RaftIdx, State1), - {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false), {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; {duplicate, State, Effects} -> {State, ok, Effects} @@ -1289,7 +1309,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, _ -> State1 end, - {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false), update_smallest_raft_index(IncomingRaftIdx, State, Effects). % used to processes messages that are finished @@ -1333,7 +1353,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, Discarded = maps:with(MsgIds, Checked0), {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0, Effects0, State0), - {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false), update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, @@ -1365,9 +1385,10 @@ dead_letter_effects(Reason, Discarded, end} | Effects]. cancel_consumer_effects(ConsumerId, - #?MODULE{cfg = #cfg{resource = QName}}, Effects) -> + #?MODULE{cfg = #cfg{resource = QName}} = State, Effects) -> [{mod_call, rabbit_quorum_queue, - cancel_consumer_handler, [QName, ConsumerId]} | Effects]. + cancel_consumer_handler, [QName, ConsumerId]}, + notify_decorators_effect(State) | Effects]. update_smallest_raft_index(Idx, State, Effects) -> update_smallest_raft_index(Idx, ok, State, Effects). @@ -1502,14 +1523,30 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, end, {State, Effects0}, Checked). %% checkout new messages to consumers -checkout(#{index := Index} = Meta, OldState, State0, Effects0) -> +checkout(Meta, OldState, State, Effects) -> + checkout(Meta, OldState, State, Effects, true). + +checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0, + Effects0, HandleConsumerChanges) -> {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0), Effects0, {#{}, #{}}), case evaluate_limit(Index, false, OldState, State1, Effects1) of {State, true, Effects} -> - update_smallest_raft_index(Index, State, Effects); + case maybe_notify_decorators(State, HandleConsumerChanges) of + {true, {MaxActivePriority, IsEmpty}} -> + NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty), + update_smallest_raft_index(Index, State, [NotifyEffect | Effects]); + false -> + update_smallest_raft_index(Index, State, Effects) + end; {State, false, Effects} -> - {State, ok, Effects} + case maybe_notify_decorators(State, HandleConsumerChanges) of + {true, {MaxActivePriority, IsEmpty}} -> + NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty), + {State, ok, [NotifyEffect | Effects]}; + false -> + {State, ok, Effects} + end end. checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, @@ -2128,3 +2165,16 @@ get_priority_from_args(#{args := Args}) -> end; get_priority_from_args(_) -> 0. + +maybe_notify_decorators(_, false) -> + false; +maybe_notify_decorators(State, _) -> + {true, query_notify_decorators_info(State)}. + +notify_decorators_effect(#?MODULE{cfg = #cfg{resource = QName}} = State) -> + {MaxActivePriority, IsEmpty} = query_notify_decorators_info(State), + notify_decorators_effect(QName, MaxActivePriority, IsEmpty). + +notify_decorators_effect(QName, MaxActivePriority, IsEmpty) -> + {mod_call, rabbit_quorum_queue, spawn_notify_decorators, + [QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 6ecaf40c7d..47bf95a982 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -33,7 +33,8 @@ dequeue/5, fold_state/3, is_policy_applicable/2, - is_server_named_allowed/1 + is_server_named_allowed/1, + notify_decorators/1 ]). -type queue_name() :: rabbit_types:r(queue). @@ -198,6 +199,9 @@ -callback capabilities() -> #{atom() := term()}. +-callback notify_decorators(amqqueue:amqqueue()) -> + ok. + %% TODO: this should be controlled by a registry that is populated on boot discover(<<"quorum">>) -> rabbit_quorum_queue; @@ -298,15 +302,19 @@ i_down(_K, _Q, _DownReason) -> ''. is_policy_applicable(Q, Policy) -> Mod = amqqueue:get_type(Q), Capabilities = Mod:capabilities(), - Applicable = maps:get(policies, Capabilities, []), + NotApplicable = maps:get(policies, Capabilities, []), lists:all(fun({P, _}) -> - lists:member(P, Applicable) + not lists:member(P, NotApplicable) end, Policy). is_server_named_allowed(Type) -> Capabilities = Type:capabilities(), maps:get(server_named, Capabilities, false). +notify_decorators(Q) -> + Mod = amqqueue:get_type(Q), + Mod:notify_decorators(Q). + -spec init() -> state(). init() -> #?STATE{}. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 52b9930ab6..af5373012c 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -48,6 +48,9 @@ repair_amqqueue_nodes/2 ]). -export([reclaim_memory/2]). +-export([notify_decorators/1, + notify_decorators/3, + spawn_notify_decorators/3]). -export([is_enabled/0, declare/2]). @@ -172,6 +175,7 @@ start_cluster(Q) -> ra_machine_config(NewQ)), %% force a policy change to ensure the latest config is %% updated even when running the machine version from 0 + notify_decorators(QName, startup), rabbit_event:notify(queue_created, [{name, QName}, {durable, Durable}, @@ -346,9 +350,14 @@ filter_quorum_critical(Queues, ReplicaStates) -> end, Queues). capabilities() -> - #{policies => [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>, - <<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, - <<"delivery-limit">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>], + #{policies => [ %% Classic policies + <<"message-ttl">>, <<"max-priority">>, <<"queue-mode">>, + <<"single-active-consumer">>, <<"ha-mode">>, <<"ha-params">>, + <<"ha-sync-mode">>, <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>, + <<"queue-master-locator">>, + %% Stream policies + <<"max-age">>, <<"max-segment-size">>, + <<"queue-leader-locator">>, <<"initial-cluster-size">>], queue_arguments => [<<"x-expires">>, <<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, <<"x-max-length">>, <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>, @@ -369,6 +378,11 @@ spawn_deleter(QName) -> delete(Q, false, false, <<"expired">>) end). +spawn_notify_decorators(QName, Fun, Args) -> + spawn(fun () -> + notify_decorators(QName, Fun, Args) + end). + handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}, Nodes) -> @@ -569,6 +583,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> after Timeout -> ok = force_delete_queue(Servers) end, + notify_decorators(QName, shutdown), ok = delete_queue_data(QName, ActingUser), rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], ?RPC_TIMEOUT), @@ -590,6 +605,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> " Attempting force delete.", [rabbit_misc:rs(QName), Errs]), ok = force_delete_queue(Servers), + notify_decorators(QName, shutdown), delete_queue_data(QName, ActingUser), {ok, ReadyMsgs} end @@ -1526,3 +1542,27 @@ parse_credit_args(Default, Args) -> undefined -> {simple_prefetch, Default, false} end. + +-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'. +notify_decorators(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + case ra:local_query(QPid, fun rabbit_fifo:query_notify_decorators_info/1) of + {ok, {_, {MaxActivePriority, IsEmpty}}, _} -> + notify_decorators(QName, consumer_state_changed, [MaxActivePriority, IsEmpty]); + _ -> ok + end. + +notify_decorators(QName, Event) -> + notify_decorators(QName, Event, []). + +notify_decorators(QName, F, A) -> + %% Look up again in case policy and hence decorators have changed + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + Ds = amqqueue:get_decorators(Q), + [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)], + ok; + {error, not_found} -> + ok + end. diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 206584fe61..0fe32e52a7 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -37,7 +37,8 @@ update/2, state_info/1, stat/1, - capabilities/0]). + capabilities/0, + notify_decorators/1]). -export([set_retention_policy/3]). -export([add_replica/3, @@ -744,8 +745,15 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange}, rabbit_msg_record:to_iodata(R). capabilities() -> - #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>, - <<"queue-leader-locator">>, <<"initial-cluster-size">>], + #{policies => [ %% Classic policies + <<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>, + <<"dead-letter-routing-key">>, <<"max-length">>, + <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, + <<"max-priority">>, <<"overflow">>, <<"queue-mode">>, + <<"single-active-consumer">>, <<"delivery-limit">>, + <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>, + <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>, + <<"queue-master-locator">>], queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, <<"x-max-length">>, <<"x-max-length-bytes">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, @@ -754,6 +762,10 @@ capabilities() -> consumer_arguments => [<<"x-stream-offset">>], server_named => false}. +notify_decorators(Q) when ?is_amqqueue(Q) -> + %% Not supported + ok. + resend_all(#stream_client{leader = LeaderPid, writer_id = WriterId, correlation = Corrs} = State) -> diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 1b8761f90e..c92437fcee 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -177,7 +177,8 @@ enq_enq_deq_test(_) -> {State2, _} = enq(2, 2, second, State1), % get returns a reply value NumReady = 1, - {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} = + {_State3, {dequeue, {0, {_, first}}, NumReady}, + [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} = apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State2), ok. @@ -187,7 +188,8 @@ enq_enq_deq_deq_settle_test(_) -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), % get returns a reply value - {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} = + {State3, {dequeue, {0, {_, first}}, 1}, + [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} = apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State2), {_State4, {dequeue, empty}} = @@ -235,7 +237,8 @@ release_cursor_test(_) -> checkout_enq_settle_test(_) -> Cid = {?FUNCTION_NAME, self()}, - {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)), + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + {monitor, _, _} | _]} = check(Cid, 1, test_init(test)), {State2, Effects0} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, ?FUNCTION_NAME, @@ -250,7 +253,8 @@ checkout_enq_settle_test(_) -> out_of_order_enqueue_test(_) -> Cid = {?FUNCTION_NAME, self()}, - {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + {monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), {State2, Effects2} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), % assert monitor was set up @@ -280,7 +284,8 @@ out_of_order_first_enqueue_test(_) -> duplicate_enqueue_test(_) -> Cid = {<<"duplicate_enqueue_test">>, self()}, - {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + {monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), {State2, Effects2} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), {_State3, Effects3} = enq(3, 1, first, State2), @@ -331,7 +336,8 @@ return_non_existent_test(_) -> return_checked_out_test(_) -> Cid = {<<"cid">>, self()}, {State0, [_, _]} = enq(1, 1, first, test_init(test)), - {State1, [_Monitor, + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + _Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {aux, active} | _ ]} = check_auto(Cid, 2, State0), % returning immediately checks out the same message again @@ -348,7 +354,8 @@ return_checked_out_limit_test(_) -> release_cursor_interval => 0, delivery_limit => 1}), {State0, [_, _]} = enq(1, 1, first, Init), - {State1, [_Monitor, + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + _Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {aux, active} | _ ]} = check_auto(Cid, 2, State0), % returning immediately checks out the same message again @@ -366,7 +373,8 @@ return_auto_checked_out_test(_) -> {State0, [_]} = enq(2, 2, second, State00), % it first active then inactive as the consumer took on but cannot take % any more - {State1, [_Monitor, + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + _Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {aux, active}, {aux, inactive} @@ -401,7 +409,7 @@ cancelled_checkout_out_test(_) -> down_with_noproc_consumer_returns_unsettled_test(_) -> Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, {State0, [_, _]} = enq(1, 1, second, test_init(test)), - {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), + {State1, [_, {monitor, process, Pid} | _]} = check(Cid, 2, State0), {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1), {_State, Effects} = check(Cid, 4, State2), ?ASSERT_EFF({monitor, process, _}, Effects), @@ -600,7 +608,8 @@ purge_test(_) -> {State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo:make_purge(), State1), {State3, _} = enq(3, 2, second, State2), % get returns a reply value - {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} = + {_State4, {dequeue, {0, {_, second}}, _}, + [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} = apply(meta(4), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State3), ok. @@ -1137,12 +1146,12 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> {State2, _, Effects2} = apply(#{index => 3, system_time => 1500}, {down, Pid1, noconnection}, State1), - % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node - ?assertEqual(4 + 1, length(Effects2)), + % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node, 1 more decorators effect + ?assertEqual(4 + 1 + 1, length(Effects2)), {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2), - % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID - ?assertEqual(4 + 4, length(Effects3)). + % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID, 1 more decorators effect + ?assertEqual(4 + 4 + 1, length(Effects3)). active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, @@ -1171,11 +1180,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co {State2, _, Effects2} = apply(meta(2), {down, Pid1, noconnection}, State1), % one monitor and one consumer status update (deactivated) - ?assertEqual(3, length(Effects2)), + ?assertEqual(4, length(Effects2)), {_, _, Effects3} = apply(meta(3), {nodeup, node(self())}, State2), % for each consumer: 1 effect to monitor the consumer PID - ?assertEqual(5, length(Effects3)). + ?assertEqual(6, length(Effects3)). single_active_cancelled_with_unacked_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue.erl index e3ad0f9cb6..ea996aa8de 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_queue.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue.erl @@ -48,8 +48,7 @@ policy_changed(Q1, Q2) when ?is_amqqueue(Q1) -> QName = amqqueue:get_name(Q1), case rabbit_amqqueue:lookup(QName) of {ok, Q0} when ?is_amqqueue(Q0) -> - QPid = amqqueue:get_pid(Q0), - rpc:call(node(QPid), rabbit_federation_queue, + rpc:call(amqqueue:qnode(Q0), rabbit_federation_queue, policy_changed_local, [Q1, Q2]); {error, not_found} -> ok diff --git a/deps/rabbitmq_federation/test/queue_SUITE.erl b/deps/rabbitmq_federation/test/queue_SUITE.erl index 82b7cd6499..62f9abcc4c 100644 --- a/deps/rabbitmq_federation/test/queue_SUITE.erl +++ b/deps/rabbitmq_federation/test/queue_SUITE.erl @@ -15,7 +15,7 @@ -import(rabbit_federation_test_util, [wait_for_federation/2, expect/3, expect/4, set_upstream/4, set_upstream/5, clear_upstream/3, set_policy/5, clear_policy/3, - set_policy_pattern/5, set_policy_upstream/5, q/1, with_ch/3, + set_policy_pattern/5, set_policy_upstream/5, q/2, with_ch/3, declare_queue/2, delete_queue/2, federation_links_in_vhost/3]). @@ -24,30 +24,37 @@ all() -> [ - {group, without_disambiguate}, - {group, with_disambiguate} + {group, classic_queue}, + {group, quorum_queue} ]. groups() -> - [ - {without_disambiguate, [], [ - {cluster_size_1, [], [ - simple, - multiple_upstreams, - multiple_upstreams_pattern, - multiple_downstreams, - bidirectional, - dynamic_reconfiguration, - federate_unfederate, - dynamic_plugin_stop_start - ]} - ]}, - {with_disambiguate, [], [ - {cluster_size_2, [], [ - restart_upstream - ]} - ]} - ]. + ClusterSize1 = [simple, + multiple_upstreams, + multiple_upstreams_pattern, + multiple_downstreams, + bidirectional, + dynamic_reconfiguration, + federate_unfederate, + dynamic_plugin_stop_start + ], + ClusterSize2 = [restart_upstream], + [{classic_queue, [], [ + {without_disambiguate, [], [ + {cluster_size_1, [], ClusterSize1} + ]}, + {with_disambiguate, [], [ + {cluster_size_2, [], ClusterSize2} + ]} + ]}, + {quorum_queue, [], [ + {without_disambiguate, [], [ + {cluster_size_1, [], ClusterSize1} + ]}, + {with_disambiguate, [], [ + {cluster_size_2, [], ClusterSize2} + ]} + ]}]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -60,6 +67,16 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_type, classic}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}]); +init_per_group(quorum_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_type, quorum}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}]); init_per_group(without_disambiguate, Config) -> rabbit_ct_helpers:set_config(Config, {disambiguate_step, []}); @@ -88,15 +105,30 @@ init_per_group1(Group, Config) -> {rmq_nodename_suffix, Suffix}, {rmq_nodes_clustered, false} ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps() ++ - SetupFederation ++ Disambiguate). + Config2 = rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + SetupFederation ++ Disambiguate), + case ?config(queue_type, Config2) of + quorum -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of + ok -> + Config2; + Skip -> + Skip + end; + _ -> + Config2 + end. end_per_group(without_disambiguate, Config) -> Config; end_per_group(with_disambiguate, Config) -> Config; +end_per_group(classic_queue, Config) -> + Config; +end_per_group(quorum_queue, Config) -> + Config; end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ @@ -113,19 +145,21 @@ end_per_testcase(Testcase, Config) -> %% ------------------------------------------------------------------- simple(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>) - end, upstream_downstream()). + end, upstream_downstream(Args)). multiple_upstreams(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> expect_federation(Ch, <<"upstream">>, <<"fed12.downstream">>), expect_federation(Ch, <<"upstream2">>, <<"fed12.downstream">>) - end, [q(<<"upstream">>), - q(<<"upstream2">>), - q(<<"fed12.downstream">>)]). + end, [q(<<"upstream">>, Args), + q(<<"upstream2">>, Args), + q(<<"fed12.downstream">>, Args)]). multiple_upstreams_pattern(Config) -> set_upstream(Config, 0, <<"local453x">>, @@ -145,43 +179,51 @@ multiple_upstreams_pattern(Config) -> set_policy_pattern(Config, 0, <<"pattern">>, <<"^pattern\.">>, <<"local\\d+x">>), + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> expect_federation(Ch, <<"upstream">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), expect_federation(Ch, <<"upstream2">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT) - end, [q(<<"upstream">>), - q(<<"upstream2">>), - q(<<"pattern.downstream">>)]), + end, [q(<<"upstream">>, Args), + q(<<"upstream2">>, Args), + q(<<"pattern.downstream">>, Args)]), clear_upstream(Config, 0, <<"local453x">>), clear_upstream(Config, 0, <<"local3214x">>), clear_policy(Config, 0, <<"pattern">>). multiple_downstreams(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT) - end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]). + end, upstream_downstream(Args) ++ [q(<<"fed.downstream2">>, Args)]). bidirectional(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), publish_expect(Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>, ?EXPECT_FEDERATION_TIMEOUT), publish_expect(Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>, ?EXPECT_FEDERATION_TIMEOUT), - Seq = lists:seq(1, 100), + Seq = lists:seq(1, 50), [publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq], [publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq], - expect(Ch, <<"one">>, repeat(150, <<"bulk">>)), - expect(Ch, <<"two">>, repeat(50, <<"bulk">>)), + expect(Ch, <<"one">>, repeat(100, <<"bulk">>)), + expect_empty(Ch, <<"one">>), + expect_empty(Ch, <<"two">>), + [publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq], + [publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq], + expect(Ch, <<"two">>, repeat(100, <<"bulk">>)), expect_empty(Ch, <<"one">>), expect_empty(Ch, <<"two">>) - end, [q(<<"one">>), - q(<<"two">>)]). + end, [q(<<"one">>, Args), + q(<<"two">>, Args)]). dynamic_reconfiguration(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), @@ -199,9 +241,10 @@ dynamic_reconfiguration(Config) -> set_upstream(Config, 0, <<"localhost">>, URI), set_upstream(Config, 0, <<"localhost">>, URI), expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>) - end, upstream_downstream()). + end, upstream_downstream(Args)). federate_unfederate(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), @@ -217,10 +260,11 @@ federate_unfederate(Config) -> rabbit_ct_broker_helpers:set_policy(Config, 0, <<"fed">>, <<"^fed\.">>, <<"all">>, [ {<<"federation-upstream-set">>, <<"upstream">>}]) - end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]). + end, upstream_downstream(Args) ++ [q(<<"fed.downstream2">>, Args)]). dynamic_plugin_stop_start(Config) -> DownQ2 = <<"fed.downstream2">>, + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), @@ -235,8 +279,8 @@ dynamic_plugin_stop_start(Config) -> expect_no_federation(Ch, UpQ, DownQ1), expect_no_federation(Ch, UpQ, DownQ2), - declare_queue(Ch, q(DownQ1)), - declare_queue(Ch, q(DownQ2)), + declare_queue(Ch, q(DownQ1, Args)), + declare_queue(Ch, q(DownQ2, Args)), ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, "rabbitmq_federation"), %% Declare a queue then re-enable the plugin, the links appear @@ -255,7 +299,7 @@ dynamic_plugin_stop_start(Config) -> length(L) =:= 2 end), expect_federation(Ch, UpQ, DownQ1, 120000) - end, upstream_downstream() ++ [q(DownQ2)]). + end, upstream_downstream(Args) ++ [q(DownQ2, Args)]). restart_upstream(Config) -> [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, @@ -266,8 +310,9 @@ restart_upstream(Config) -> Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit), Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare), - declare_queue(Upstream, q(<<"test">>)), - declare_queue(Downstream, q(<<"test">>)), + Args = ?config(queue_args, Config), + declare_queue(Upstream, q(<<"test">>, Args)), + declare_queue(Downstream, q(<<"test">>, Args)), Seq = lists:seq(1, 100), [publish(Upstream, <<>>, <<"test">>, <<"bulk">>) || _ <- Seq], expect(Upstream, <<"test">>, repeat(25, <<"bulk">>)), @@ -325,4 +370,7 @@ expect_no_federation(Ch, UpstreamQ, DownstreamQ) -> expect(Ch, UpstreamQ, [<<"HELLO">>]). upstream_downstream() -> - [q(<<"upstream">>), q(<<"fed.downstream">>)]. + upstream_downstream([]). + +upstream_downstream(Args) -> + [q(<<"upstream">>, Args), q(<<"fed.downstream">>, Args)]. diff --git a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl index a9e08dcb09..05e7b9e30a 100644 --- a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl +++ b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl @@ -181,10 +181,10 @@ expect([], _Timeout) -> ok; expect(Payloads, Timeout) -> receive - {#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> + {#'basic.deliver'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} -> case lists:member(Payload, Payloads) of true -> - ct:pal("Consumed a message: ~p", [Payload]), + ct:pal("Consumed a message: ~p ~p left: ~p", [Payload, DTag, length(Payloads) - 1]), expect(Payloads -- [Payload], Timeout); false -> ?assert(false, rabbit_misc:format("received an unexpected payload ~p", [Payload])) end @@ -355,5 +355,9 @@ delete_queue(Ch, Q) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q}). q(Name) -> + q(Name, []). + +q(Name, Args) -> #'queue.declare'{queue = Name, - durable = true}. + durable = true, + arguments = Args}. |