diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2021-02-10 15:06:19 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2021-02-18 17:15:47 +0100 |
commit | 699cd1ab29a86c70a930d589041a78f5b3bc338e (patch) | |
tree | 1f0833bea92b2028dc04ac761bb08145b2fb2625 | |
parent | a71771b8c12c8226ea7c6397ff6dc24e093cdda6 (diff) | |
download | rabbitmq-server-git-699cd1ab29a86c70a930d589041a78f5b3bc338e.tar.gz |
Add federation support for quorum queues
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 74 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 33 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_SUITE.erl | 41 | ||||
-rw-r--r-- | deps/rabbitmq_federation/test/queue_SUITE.erl | 142 | ||||
-rw-r--r-- | deps/rabbitmq_federation/test/rabbit_federation_test_util.erl | 10 |
6 files changed, 224 insertions, 78 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 56c75e104b..81cf64a0ad 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -1637,6 +1637,8 @@ basic_cancel(Q, ConsumerTag, OkMsg, ActingUser, QStates) -> notify_decorators(Q) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}); +notify_decorators(Q) when ?amqqueue_is_quorum(Q) -> + rabbit_quorum_queue:notify_decorators(Q); notify_decorators(_Q) -> %% Not supported by any other queue type ok. diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 3b5b9f5e40..ee2ceb0298 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -41,6 +41,7 @@ query_single_active_consumer/1, query_in_memory_usage/1, query_peek/2, + query_notify_decorators_info/1, usage/1, zero/1, @@ -241,7 +242,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State1, ok, Effects} = checkout(Meta, State0, State0#?MODULE{service_queue = ServiceQueue, - consumers = Cons}, []), + consumers = Cons}, [], false), Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain @@ -299,7 +300,8 @@ apply(#{index := Index, Exists = maps:is_key(ConsumerId, Consumers), case messages_ready(State0) of 0 -> - update_smallest_raft_index(Index, {dequeue, empty}, State0, []); + update_smallest_raft_index(Index, {dequeue, empty}, State0, + [notify_decorators_effect(State0)]); _ when Exists -> %% a dequeue using the same consumer_id isn't possible at this point {State0, {dequeue, empty}}; @@ -330,8 +332,8 @@ apply(#{index := Index, {{dequeue, {MsgId, Msg}, Ready-1}, Effects1} end, - - case evaluate_limit(Index, false, State0, State4, Effects2) of + NotifyEffect = notify_decorators_effect(State4), + case evaluate_limit(Index, false, State0, State4, [NotifyEffect | Effects2]) of {State, true, Effects} -> update_smallest_raft_index(Index, Reply, State, Effects); {State, false, Effects} -> @@ -456,6 +458,7 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, % Monitor the node so that we can "unsuspect" these processes when the node % comes back, then re-issue all monitors and discover the final fate of % these processes + Effects = case maps:size(State#?MODULE.consumers) of 0 -> [{aux, inactive}, {monitor, node, Node}]; @@ -959,6 +962,21 @@ query_peek(Pos, State0) when Pos > 0 -> query_peek(Pos-1, State) end. +query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) -> + MaxActivePriority = maps:fold(fun(_, #consumer{credit = C, + status = up, + priority = P0}, MaxP) when C > 0 -> + P = -P0, + case MaxP of + empty -> P; + MaxP when MaxP > P -> MaxP; + _ -> P + end; + (_, _, MaxP) -> + MaxP + end, empty, Consumers), + IsEmpty = (messages_ready(State) == 0), + {MaxActivePriority, IsEmpty}. -spec usage(atom()) -> float(). usage(Name) when is_atom(Name) -> @@ -1062,11 +1080,13 @@ cancel_consumer0(Meta, ConsumerId, #{ConsumerId := Consumer} -> {S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer, S0, Effects0, Reason), + %% The effects are emitted before the consumer is actually removed %% if the consumer has unacked messages. This is a bit weird but %% in line with what classic queues do (from an external point of %% view) Effects = cancel_consumer_effects(ConsumerId, S, Effects2), + case maps:size(S#?MODULE.consumers) of 0 -> {S, [{aux, inactive} | Effects]}; @@ -1129,7 +1149,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of {ok, State1, Effects1} -> State2 = append_to_master_index(RaftIdx, State1), - {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false), {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; {duplicate, State, Effects} -> {State, ok, Effects} @@ -1287,7 +1307,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, _ -> State1 end, - {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false), update_smallest_raft_index(IncomingRaftIdx, State, Effects). % used to processes messages that are finished @@ -1331,7 +1351,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, Discarded = maps:with(MsgIds, Checked0), {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0, Effects0, State0), - {State, ok, Effects} = checkout(Meta, State0, State2, Effects1), + {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false), update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, @@ -1363,9 +1383,10 @@ dead_letter_effects(Reason, Discarded, end} | Effects]. cancel_consumer_effects(ConsumerId, - #?MODULE{cfg = #cfg{resource = QName}}, Effects) -> + #?MODULE{cfg = #cfg{resource = QName}} = State, Effects) -> [{mod_call, rabbit_quorum_queue, - cancel_consumer_handler, [QName, ConsumerId]} | Effects]. + cancel_consumer_handler, [QName, ConsumerId]}, + notify_decorators_effect(State) | Effects]. update_smallest_raft_index(Idx, State, Effects) -> update_smallest_raft_index(Idx, ok, State, Effects). @@ -1500,14 +1521,30 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, end, {State, Effects0}, Checked). %% checkout new messages to consumers -checkout(#{index := Index} = Meta, OldState, State0, Effects0) -> +checkout(Meta, OldState, State, Effects) -> + checkout(Meta, OldState, State, Effects, true). + +checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0, + Effects0, HandleConsumerChanges) -> {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0), Effects0, {#{}, #{}}), case evaluate_limit(Index, false, OldState, State1, Effects1) of {State, true, Effects} -> - update_smallest_raft_index(Index, State, Effects); + case have_active_consumers_changed(State, HandleConsumerChanges) of + {true, {MaxActivePriority, IsEmpty}} -> + NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty), + update_smallest_raft_index(Index, State, [NotifyEffect | Effects]); + false -> + update_smallest_raft_index(Index, State, Effects) + end; {State, false, Effects} -> - {State, ok, Effects} + case have_active_consumers_changed(State, HandleConsumerChanges) of + {true, {MaxActivePriority, IsEmpty}} -> + NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty), + {State, ok, [NotifyEffect | Effects]}; + false -> + {State, ok, Effects} + end end. checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, @@ -2126,3 +2163,16 @@ get_priority_from_args(#{args := Args}) -> end; get_priority_from_args(_) -> 0. + +have_active_consumers_changed(_, false) -> + false; +have_active_consumers_changed(State, _) -> + {true, query_notify_decorators_info(State)}. + +notify_decorators_effect(#?MODULE{cfg = #cfg{resource = QName}} = State) -> + {MaxActivePriority, IsEmpty} = query_notify_decorators_info(State), + notify_decorators_effect(QName, MaxActivePriority, IsEmpty). + +notify_decorators_effect(QName, MaxActivePriority, IsEmpty) -> + {mod_call, rabbit_quorum_queue, spawn_notify_decorators, + [QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 3fb3e026d4..fa9c4f5d45 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -48,6 +48,9 @@ repair_amqqueue_nodes/2 ]). -export([reclaim_memory/2]). +-export([notify_decorators/1, + notify_decorators/3, + spawn_notify_decorators/3]). -export([is_enabled/0, declare/2]). @@ -172,6 +175,7 @@ start_cluster(Q) -> ra_machine_config(NewQ)), %% force a policy change to ensure the latest config is %% updated even when running the machine version from 0 + notify_decorators(QName, startup), rabbit_event:notify(queue_created, [{name, QName}, {durable, Durable}, @@ -369,6 +373,11 @@ spawn_deleter(QName) -> delete(Q, false, false, <<"expired">>) end). +spawn_notify_decorators(QName, Fun, Args) -> + spawn(fun () -> + notify_decorators(QName, Fun, Args) + end). + handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}, Nodes) -> @@ -568,6 +577,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> after Timeout -> ok = force_delete_queue(Servers) end, + notify_decorators(QName, shutdown), ok = delete_queue_data(QName, ActingUser), rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], ?RPC_TIMEOUT), @@ -589,6 +599,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> " Attempting force delete.", [rabbit_misc:rs(QName), Errs]), ok = force_delete_queue(Servers), + notify_decorators(QName, shutdown), delete_queue_data(QName, ActingUser), {ok, ReadyMsgs} end @@ -1525,3 +1536,25 @@ parse_credit_args(Default, Args) -> undefined -> {simple_prefetch, Default, false} end. + +-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'. +notify_decorators(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), + case ra:local_query(QPid, fun rabbit_fifo:query_notify_decorators_info/1) of + {ok, {_, {MaxActivePriority, IsEmpty}}, _} -> + notify_decorators(QName, consumer_state_changed, [MaxActivePriority, IsEmpty]); + _ -> ok + end. + +notify_decorators(QName, Event) -> notify_decorators(QName, Event, []). + +notify_decorators(QName, F, A) -> + %% Look up again in case policy and hence decorators have changed + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + Ds = amqqueue:get_decorators(Q), + [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)]; + {error, not_found} -> + ok + end. diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 1b8761f90e..c92437fcee 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -177,7 +177,8 @@ enq_enq_deq_test(_) -> {State2, _} = enq(2, 2, second, State1), % get returns a reply value NumReady = 1, - {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} = + {_State3, {dequeue, {0, {_, first}}, NumReady}, + [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} = apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State2), ok. @@ -187,7 +188,8 @@ enq_enq_deq_deq_settle_test(_) -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), % get returns a reply value - {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} = + {State3, {dequeue, {0, {_, first}}, 1}, + [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} = apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State2), {_State4, {dequeue, empty}} = @@ -235,7 +237,8 @@ release_cursor_test(_) -> checkout_enq_settle_test(_) -> Cid = {?FUNCTION_NAME, self()}, - {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)), + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + {monitor, _, _} | _]} = check(Cid, 1, test_init(test)), {State2, Effects0} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, ?FUNCTION_NAME, @@ -250,7 +253,8 @@ checkout_enq_settle_test(_) -> out_of_order_enqueue_test(_) -> Cid = {?FUNCTION_NAME, self()}, - {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + {monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), {State2, Effects2} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), % assert monitor was set up @@ -280,7 +284,8 @@ out_of_order_first_enqueue_test(_) -> duplicate_enqueue_test(_) -> Cid = {<<"duplicate_enqueue_test">>, self()}, - {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + {monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), {State2, Effects2} = enq(2, 1, first, State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), {_State3, Effects3} = enq(3, 1, first, State2), @@ -331,7 +336,8 @@ return_non_existent_test(_) -> return_checked_out_test(_) -> Cid = {<<"cid">>, self()}, {State0, [_, _]} = enq(1, 1, first, test_init(test)), - {State1, [_Monitor, + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + _Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {aux, active} | _ ]} = check_auto(Cid, 2, State0), % returning immediately checks out the same message again @@ -348,7 +354,8 @@ return_checked_out_limit_test(_) -> release_cursor_interval => 0, delivery_limit => 1}), {State0, [_, _]} = enq(1, 1, first, Init), - {State1, [_Monitor, + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + _Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {aux, active} | _ ]} = check_auto(Cid, 2, State0), % returning immediately checks out the same message again @@ -366,7 +373,8 @@ return_auto_checked_out_test(_) -> {State0, [_]} = enq(2, 2, second, State00), % it first active then inactive as the consumer took on but cannot take % any more - {State1, [_Monitor, + {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, + _Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {aux, active}, {aux, inactive} @@ -401,7 +409,7 @@ cancelled_checkout_out_test(_) -> down_with_noproc_consumer_returns_unsettled_test(_) -> Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, {State0, [_, _]} = enq(1, 1, second, test_init(test)), - {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), + {State1, [_, {monitor, process, Pid} | _]} = check(Cid, 2, State0), {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1), {_State, Effects} = check(Cid, 4, State2), ?ASSERT_EFF({monitor, process, _}, Effects), @@ -600,7 +608,8 @@ purge_test(_) -> {State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo:make_purge(), State1), {State3, _} = enq(3, 2, second, State2), % get returns a reply value - {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} = + {_State4, {dequeue, {0, {_, second}}, _}, + [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} = apply(meta(4), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State3), ok. @@ -1137,12 +1146,12 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> {State2, _, Effects2} = apply(#{index => 3, system_time => 1500}, {down, Pid1, noconnection}, State1), - % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node - ?assertEqual(4 + 1, length(Effects2)), + % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node, 1 more decorators effect + ?assertEqual(4 + 1 + 1, length(Effects2)), {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2), - % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID - ?assertEqual(4 + 4, length(Effects3)). + % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID, 1 more decorators effect + ?assertEqual(4 + 4 + 1, length(Effects3)). active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, @@ -1171,11 +1180,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co {State2, _, Effects2} = apply(meta(2), {down, Pid1, noconnection}, State1), % one monitor and one consumer status update (deactivated) - ?assertEqual(3, length(Effects2)), + ?assertEqual(4, length(Effects2)), {_, _, Effects3} = apply(meta(3), {nodeup, node(self())}, State2), % for each consumer: 1 effect to monitor the consumer PID - ?assertEqual(5, length(Effects3)). + ?assertEqual(6, length(Effects3)). single_active_cancelled_with_unacked_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, diff --git a/deps/rabbitmq_federation/test/queue_SUITE.erl b/deps/rabbitmq_federation/test/queue_SUITE.erl index 82b7cd6499..62f9abcc4c 100644 --- a/deps/rabbitmq_federation/test/queue_SUITE.erl +++ b/deps/rabbitmq_federation/test/queue_SUITE.erl @@ -15,7 +15,7 @@ -import(rabbit_federation_test_util, [wait_for_federation/2, expect/3, expect/4, set_upstream/4, set_upstream/5, clear_upstream/3, set_policy/5, clear_policy/3, - set_policy_pattern/5, set_policy_upstream/5, q/1, with_ch/3, + set_policy_pattern/5, set_policy_upstream/5, q/2, with_ch/3, declare_queue/2, delete_queue/2, federation_links_in_vhost/3]). @@ -24,30 +24,37 @@ all() -> [ - {group, without_disambiguate}, - {group, with_disambiguate} + {group, classic_queue}, + {group, quorum_queue} ]. groups() -> - [ - {without_disambiguate, [], [ - {cluster_size_1, [], [ - simple, - multiple_upstreams, - multiple_upstreams_pattern, - multiple_downstreams, - bidirectional, - dynamic_reconfiguration, - federate_unfederate, - dynamic_plugin_stop_start - ]} - ]}, - {with_disambiguate, [], [ - {cluster_size_2, [], [ - restart_upstream - ]} - ]} - ]. + ClusterSize1 = [simple, + multiple_upstreams, + multiple_upstreams_pattern, + multiple_downstreams, + bidirectional, + dynamic_reconfiguration, + federate_unfederate, + dynamic_plugin_stop_start + ], + ClusterSize2 = [restart_upstream], + [{classic_queue, [], [ + {without_disambiguate, [], [ + {cluster_size_1, [], ClusterSize1} + ]}, + {with_disambiguate, [], [ + {cluster_size_2, [], ClusterSize2} + ]} + ]}, + {quorum_queue, [], [ + {without_disambiguate, [], [ + {cluster_size_1, [], ClusterSize1} + ]}, + {with_disambiguate, [], [ + {cluster_size_2, [], ClusterSize2} + ]} + ]}]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -60,6 +67,16 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_type, classic}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}]); +init_per_group(quorum_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_type, quorum}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}]); init_per_group(without_disambiguate, Config) -> rabbit_ct_helpers:set_config(Config, {disambiguate_step, []}); @@ -88,15 +105,30 @@ init_per_group1(Group, Config) -> {rmq_nodename_suffix, Suffix}, {rmq_nodes_clustered, false} ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps() ++ - SetupFederation ++ Disambiguate). + Config2 = rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + SetupFederation ++ Disambiguate), + case ?config(queue_type, Config2) of + quorum -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of + ok -> + Config2; + Skip -> + Skip + end; + _ -> + Config2 + end. end_per_group(without_disambiguate, Config) -> Config; end_per_group(with_disambiguate, Config) -> Config; +end_per_group(classic_queue, Config) -> + Config; +end_per_group(quorum_queue, Config) -> + Config; end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ @@ -113,19 +145,21 @@ end_per_testcase(Testcase, Config) -> %% ------------------------------------------------------------------- simple(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>) - end, upstream_downstream()). + end, upstream_downstream(Args)). multiple_upstreams(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> expect_federation(Ch, <<"upstream">>, <<"fed12.downstream">>), expect_federation(Ch, <<"upstream2">>, <<"fed12.downstream">>) - end, [q(<<"upstream">>), - q(<<"upstream2">>), - q(<<"fed12.downstream">>)]). + end, [q(<<"upstream">>, Args), + q(<<"upstream2">>, Args), + q(<<"fed12.downstream">>, Args)]). multiple_upstreams_pattern(Config) -> set_upstream(Config, 0, <<"local453x">>, @@ -145,43 +179,51 @@ multiple_upstreams_pattern(Config) -> set_policy_pattern(Config, 0, <<"pattern">>, <<"^pattern\.">>, <<"local\\d+x">>), + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> expect_federation(Ch, <<"upstream">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), expect_federation(Ch, <<"upstream2">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT) - end, [q(<<"upstream">>), - q(<<"upstream2">>), - q(<<"pattern.downstream">>)]), + end, [q(<<"upstream">>, Args), + q(<<"upstream2">>, Args), + q(<<"pattern.downstream">>, Args)]), clear_upstream(Config, 0, <<"local453x">>), clear_upstream(Config, 0, <<"local3214x">>), clear_policy(Config, 0, <<"pattern">>). multiple_downstreams(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT) - end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]). + end, upstream_downstream(Args) ++ [q(<<"fed.downstream2">>, Args)]). bidirectional(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), publish_expect(Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>, ?EXPECT_FEDERATION_TIMEOUT), publish_expect(Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>, ?EXPECT_FEDERATION_TIMEOUT), - Seq = lists:seq(1, 100), + Seq = lists:seq(1, 50), [publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq], [publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq], - expect(Ch, <<"one">>, repeat(150, <<"bulk">>)), - expect(Ch, <<"two">>, repeat(50, <<"bulk">>)), + expect(Ch, <<"one">>, repeat(100, <<"bulk">>)), + expect_empty(Ch, <<"one">>), + expect_empty(Ch, <<"two">>), + [publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq], + [publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq], + expect(Ch, <<"two">>, repeat(100, <<"bulk">>)), expect_empty(Ch, <<"one">>), expect_empty(Ch, <<"two">>) - end, [q(<<"one">>), - q(<<"two">>)]). + end, [q(<<"one">>, Args), + q(<<"two">>, Args)]). dynamic_reconfiguration(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), @@ -199,9 +241,10 @@ dynamic_reconfiguration(Config) -> set_upstream(Config, 0, <<"localhost">>, URI), set_upstream(Config, 0, <<"localhost">>, URI), expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>) - end, upstream_downstream()). + end, upstream_downstream(Args)). federate_unfederate(Config) -> + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), @@ -217,10 +260,11 @@ federate_unfederate(Config) -> rabbit_ct_broker_helpers:set_policy(Config, 0, <<"fed">>, <<"^fed\.">>, <<"all">>, [ {<<"federation-upstream-set">>, <<"upstream">>}]) - end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]). + end, upstream_downstream(Args) ++ [q(<<"fed.downstream2">>, Args)]). dynamic_plugin_stop_start(Config) -> DownQ2 = <<"fed.downstream2">>, + Args = ?config(queue_args, Config), with_ch(Config, fun (Ch) -> timer:sleep(?INITIAL_WAIT), @@ -235,8 +279,8 @@ dynamic_plugin_stop_start(Config) -> expect_no_federation(Ch, UpQ, DownQ1), expect_no_federation(Ch, UpQ, DownQ2), - declare_queue(Ch, q(DownQ1)), - declare_queue(Ch, q(DownQ2)), + declare_queue(Ch, q(DownQ1, Args)), + declare_queue(Ch, q(DownQ2, Args)), ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, "rabbitmq_federation"), %% Declare a queue then re-enable the plugin, the links appear @@ -255,7 +299,7 @@ dynamic_plugin_stop_start(Config) -> length(L) =:= 2 end), expect_federation(Ch, UpQ, DownQ1, 120000) - end, upstream_downstream() ++ [q(DownQ2)]). + end, upstream_downstream(Args) ++ [q(DownQ2, Args)]). restart_upstream(Config) -> [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, @@ -266,8 +310,9 @@ restart_upstream(Config) -> Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit), Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare), - declare_queue(Upstream, q(<<"test">>)), - declare_queue(Downstream, q(<<"test">>)), + Args = ?config(queue_args, Config), + declare_queue(Upstream, q(<<"test">>, Args)), + declare_queue(Downstream, q(<<"test">>, Args)), Seq = lists:seq(1, 100), [publish(Upstream, <<>>, <<"test">>, <<"bulk">>) || _ <- Seq], expect(Upstream, <<"test">>, repeat(25, <<"bulk">>)), @@ -325,4 +370,7 @@ expect_no_federation(Ch, UpstreamQ, DownstreamQ) -> expect(Ch, UpstreamQ, [<<"HELLO">>]). upstream_downstream() -> - [q(<<"upstream">>), q(<<"fed.downstream">>)]. + upstream_downstream([]). + +upstream_downstream(Args) -> + [q(<<"upstream">>, Args), q(<<"fed.downstream">>, Args)]. diff --git a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl index 471ba0ecee..577934bf10 100644 --- a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl +++ b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl @@ -176,10 +176,10 @@ expect([], _Timeout) -> ok; expect(Payloads, Timeout) -> receive - {#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> + {#'basic.deliver'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} -> case lists:member(Payload, Payloads) of true -> - ct:pal("Consumed a message: ~p", [Payload]), + ct:pal("Consumed a message: ~p ~p left: ~p", [Payload, DTag, length(Payloads) - 1]), expect(Payloads -- [Payload], Timeout); false -> ?assert(false, rabbit_misc:format("received an unexpected payload ~p", [Payload])) end @@ -350,5 +350,9 @@ delete_queue(Ch, Q) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q}). q(Name) -> + q(Name, []). + +q(Name, Args) -> #'queue.declare'{queue = Name, - durable = true}. + durable = true, + arguments = Args}. |