summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-02-25 19:10:15 +0300
committerGitHub <noreply@github.com>2021-02-25 19:10:15 +0300
commita2f98f25e99a33824510a4bbf7809a47764d44bb (patch)
treeee5d52eacdddbb5de081d5fe07530923b02e82f6
parent17b082abeb19340eea98a9700b32a964db75dadc (diff)
parent6778c1fea3227c3fba9bc495d53648fea739eb20 (diff)
downloadrabbitmq-server-git-a2f98f25e99a33824510a4bbf7809a47764d44bb.tar.gz
Merge pull request #2804 from rabbitmq/rabbitmq-server-2756
Add federation support for quorum queues
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl8
-rw-r--r--deps/rabbit/src/rabbit_classic_queue.erl18
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl74
-rw-r--r--deps/rabbit/src/rabbit_queue_type.erl14
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl46
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl18
-rw-r--r--deps/rabbit/test/rabbit_fifo_SUITE.erl41
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_queue.erl3
-rw-r--r--deps/rabbitmq_federation/test/queue_SUITE.erl142
-rw-r--r--deps/rabbitmq_federation/test/rabbit_federation_test_util.erl10
10 files changed, 270 insertions, 104 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index 56c75e104b..5a3a60377c 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -1634,12 +1634,8 @@ basic_cancel(Q, ConsumerTag, OkMsg, ActingUser, QStates) ->
-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.
-notify_decorators(Q) when ?amqqueue_is_classic(Q) ->
- QPid = amqqueue:get_pid(Q),
- delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]});
-notify_decorators(_Q) ->
- %% Not supported by any other queue type
- ok.
+notify_decorators(Q) ->
+ rabbit_queue_type:notify_decorators(Q).
notify_sent(QPid, ChPid) ->
rabbit_amqqueue_common:notify_sent(QPid, ChPid).
diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl
index e53c0aecc2..91cfde038f 100644
--- a/deps/rabbit/src/rabbit_classic_queue.erl
+++ b/deps/rabbit/src/rabbit_classic_queue.erl
@@ -38,7 +38,8 @@
dequeue/4,
info/2,
state_info/1,
- capabilities/0
+ capabilities/0,
+ notify_decorators/1
]).
-export([delete_crashed/1,
@@ -441,14 +442,9 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
[Q || {_, {new, Q}} <- Results].
capabilities() ->
- #{policies => [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>,
- <<"dead-letter-routing-key">>, <<"max-length">>,
- <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
- <<"max-priority">>, <<"overflow">>, <<"queue-mode">>,
- <<"single-active-consumer">>, <<"delivery-limit">>,
- <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
- <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
- <<"queue-master-locator">>],
+ #{policies => [ %% Stream policies
+ <<"max-age">>, <<"max-segment-size">>,
+ <<"queue-leader-locator">>, <<"initial-cluster-size">>],
queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
<<"x-dead-letter-routing-key">>, <<"x-max-length">>,
<<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
@@ -460,6 +456,10 @@ capabilities() ->
],
server_named => true}.
+notify_decorators(Q) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}).
+
reject_seq_no(SeqNo, U0) ->
reject_seq_no(SeqNo, U0, []).
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index e2f69d047b..974ab7b7a0 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -41,6 +41,7 @@
query_single_active_consumer/1,
query_in_memory_usage/1,
query_peek/2,
+ query_notify_decorators_info/1,
usage/1,
zero/1,
@@ -241,7 +242,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State1, ok, Effects} =
checkout(Meta, State0,
State0#?MODULE{service_queue = ServiceQueue,
- consumers = Cons}, []),
+ consumers = Cons}, [], false),
Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
@@ -299,7 +300,8 @@ apply(#{index := Index,
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
0 ->
- update_smallest_raft_index(Index, {dequeue, empty}, State0, []);
+ update_smallest_raft_index(Index, {dequeue, empty}, State0,
+ [notify_decorators_effect(State0)]);
_ when Exists ->
%% a dequeue using the same consumer_id isn't possible at this point
{State0, {dequeue, empty}};
@@ -330,8 +332,8 @@ apply(#{index := Index,
{{dequeue, {MsgId, Msg}, Ready-1}, Effects1}
end,
-
- case evaluate_limit(Index, false, State0, State4, Effects2) of
+ NotifyEffect = notify_decorators_effect(State4),
+ case evaluate_limit(Index, false, State0, State4, [NotifyEffect | Effects2]) of
{State, true, Effects} ->
update_smallest_raft_index(Index, Reply, State, Effects);
{State, false, Effects} ->
@@ -456,6 +458,7 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
% Monitor the node so that we can "unsuspect" these processes when the node
% comes back, then re-issue all monitors and discover the final fate of
% these processes
+
Effects = case maps:size(State#?MODULE.consumers) of
0 ->
[{aux, inactive}, {monitor, node, Node}];
@@ -959,6 +962,21 @@ query_peek(Pos, State0) when Pos > 0 ->
query_peek(Pos-1, State)
end.
+query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) ->
+ MaxActivePriority = maps:fold(fun(_, #consumer{credit = C,
+ status = up,
+ priority = P0}, MaxP) when C > 0 ->
+ P = -P0,
+ case MaxP of
+ empty -> P;
+ MaxP when MaxP > P -> MaxP;
+ _ -> P
+ end;
+ (_, _, MaxP) ->
+ MaxP
+ end, empty, Consumers),
+ IsEmpty = (messages_ready(State) == 0),
+ {MaxActivePriority, IsEmpty}.
-spec usage(atom()) -> float().
usage(Name) when is_atom(Name) ->
@@ -1064,11 +1082,13 @@ cancel_consumer0(Meta, ConsumerId,
#{ConsumerId := Consumer} ->
{S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer,
S0, Effects0, Reason),
+
%% The effects are emitted before the consumer is actually removed
%% if the consumer has unacked messages. This is a bit weird but
%% in line with what classic queues do (from an external point of
%% view)
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
+
case maps:size(S#?MODULE.consumers) of
0 ->
{S, [{aux, inactive} | Effects]};
@@ -1131,7 +1151,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
{ok, State1, Effects1} ->
State2 = append_to_master_index(RaftIdx, State1),
- {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
{duplicate, State, Effects} ->
{State, ok, Effects}
@@ -1289,7 +1309,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
_ ->
State1
end,
- {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
% used to processes messages that are finished
@@ -1333,7 +1353,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
Discarded = maps:with(MsgIds, Checked0),
{State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
Effects0, State0),
- {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
@@ -1365,9 +1385,10 @@ dead_letter_effects(Reason, Discarded,
end} | Effects].
cancel_consumer_effects(ConsumerId,
- #?MODULE{cfg = #cfg{resource = QName}}, Effects) ->
+ #?MODULE{cfg = #cfg{resource = QName}} = State, Effects) ->
[{mod_call, rabbit_quorum_queue,
- cancel_consumer_handler, [QName, ConsumerId]} | Effects].
+ cancel_consumer_handler, [QName, ConsumerId]},
+ notify_decorators_effect(State) | Effects].
update_smallest_raft_index(Idx, State, Effects) ->
update_smallest_raft_index(Idx, ok, State, Effects).
@@ -1502,14 +1523,30 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
end, {State, Effects0}, Checked).
%% checkout new messages to consumers
-checkout(#{index := Index} = Meta, OldState, State0, Effects0) ->
+checkout(Meta, OldState, State, Effects) ->
+ checkout(Meta, OldState, State, Effects, true).
+
+checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0,
+ Effects0, HandleConsumerChanges) ->
{State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
Effects0, {#{}, #{}}),
case evaluate_limit(Index, false, OldState, State1, Effects1) of
{State, true, Effects} ->
- update_smallest_raft_index(Index, State, Effects);
+ case maybe_notify_decorators(State, HandleConsumerChanges) of
+ {true, {MaxActivePriority, IsEmpty}} ->
+ NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty),
+ update_smallest_raft_index(Index, State, [NotifyEffect | Effects]);
+ false ->
+ update_smallest_raft_index(Index, State, Effects)
+ end;
{State, false, Effects} ->
- {State, ok, Effects}
+ case maybe_notify_decorators(State, HandleConsumerChanges) of
+ {true, {MaxActivePriority, IsEmpty}} ->
+ NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty),
+ {State, ok, [NotifyEffect | Effects]};
+ false ->
+ {State, ok, Effects}
+ end
end.
checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
@@ -2128,3 +2165,16 @@ get_priority_from_args(#{args := Args}) ->
end;
get_priority_from_args(_) ->
0.
+
+maybe_notify_decorators(_, false) ->
+ false;
+maybe_notify_decorators(State, _) ->
+ {true, query_notify_decorators_info(State)}.
+
+notify_decorators_effect(#?MODULE{cfg = #cfg{resource = QName}} = State) ->
+ {MaxActivePriority, IsEmpty} = query_notify_decorators_info(State),
+ notify_decorators_effect(QName, MaxActivePriority, IsEmpty).
+
+notify_decorators_effect(QName, MaxActivePriority, IsEmpty) ->
+ {mod_call, rabbit_quorum_queue, spawn_notify_decorators,
+ [QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}.
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
index 6ecaf40c7d..47bf95a982 100644
--- a/deps/rabbit/src/rabbit_queue_type.erl
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -33,7 +33,8 @@
dequeue/5,
fold_state/3,
is_policy_applicable/2,
- is_server_named_allowed/1
+ is_server_named_allowed/1,
+ notify_decorators/1
]).
-type queue_name() :: rabbit_types:r(queue).
@@ -198,6 +199,9 @@
-callback capabilities() ->
#{atom() := term()}.
+-callback notify_decorators(amqqueue:amqqueue()) ->
+ ok.
+
%% TODO: this should be controlled by a registry that is populated on boot
discover(<<"quorum">>) ->
rabbit_quorum_queue;
@@ -298,15 +302,19 @@ i_down(_K, _Q, _DownReason) -> ''.
is_policy_applicable(Q, Policy) ->
Mod = amqqueue:get_type(Q),
Capabilities = Mod:capabilities(),
- Applicable = maps:get(policies, Capabilities, []),
+ NotApplicable = maps:get(policies, Capabilities, []),
lists:all(fun({P, _}) ->
- lists:member(P, Applicable)
+ not lists:member(P, NotApplicable)
end, Policy).
is_server_named_allowed(Type) ->
Capabilities = Type:capabilities(),
maps:get(server_named, Capabilities, false).
+notify_decorators(Q) ->
+ Mod = amqqueue:get_type(Q),
+ Mod:notify_decorators(Q).
+
-spec init() -> state().
init() ->
#?STATE{}.
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 52b9930ab6..af5373012c 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -48,6 +48,9 @@
repair_amqqueue_nodes/2
]).
-export([reclaim_memory/2]).
+-export([notify_decorators/1,
+ notify_decorators/3,
+ spawn_notify_decorators/3]).
-export([is_enabled/0,
declare/2]).
@@ -172,6 +175,7 @@ start_cluster(Q) ->
ra_machine_config(NewQ)),
%% force a policy change to ensure the latest config is
%% updated even when running the machine version from 0
+ notify_decorators(QName, startup),
rabbit_event:notify(queue_created,
[{name, QName},
{durable, Durable},
@@ -346,9 +350,14 @@ filter_quorum_critical(Queues, ReplicaStates) ->
end, Queues).
capabilities() ->
- #{policies => [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>,
- <<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
- <<"delivery-limit">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>],
+ #{policies => [ %% Classic policies
+ <<"message-ttl">>, <<"max-priority">>, <<"queue-mode">>,
+ <<"single-active-consumer">>, <<"ha-mode">>, <<"ha-params">>,
+ <<"ha-sync-mode">>, <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
+ <<"queue-master-locator">>,
+ %% Stream policies
+ <<"max-age">>, <<"max-segment-size">>,
+ <<"queue-leader-locator">>, <<"initial-cluster-size">>],
queue_arguments => [<<"x-expires">>, <<"x-dead-letter-exchange">>,
<<"x-dead-letter-routing-key">>, <<"x-max-length">>,
<<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
@@ -369,6 +378,11 @@ spawn_deleter(QName) ->
delete(Q, false, false, <<"expired">>)
end).
+spawn_notify_decorators(QName, Fun, Args) ->
+ spawn(fun () ->
+ notify_decorators(QName, Fun, Args)
+ end).
+
handle_tick(QName,
{Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
Nodes) ->
@@ -569,6 +583,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
after Timeout ->
ok = force_delete_queue(Servers)
end,
+ notify_decorators(QName, shutdown),
ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
@@ -590,6 +605,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
" Attempting force delete.",
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
+ notify_decorators(QName, shutdown),
delete_queue_data(QName, ActingUser),
{ok, ReadyMsgs}
end
@@ -1526,3 +1542,27 @@ parse_credit_args(Default, Args) ->
undefined ->
{simple_prefetch, Default, false}
end.
+
+-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.
+notify_decorators(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
+ case ra:local_query(QPid, fun rabbit_fifo:query_notify_decorators_info/1) of
+ {ok, {_, {MaxActivePriority, IsEmpty}}, _} ->
+ notify_decorators(QName, consumer_state_changed, [MaxActivePriority, IsEmpty]);
+ _ -> ok
+ end.
+
+notify_decorators(QName, Event) ->
+ notify_decorators(QName, Event, []).
+
+notify_decorators(QName, F, A) ->
+ %% Look up again in case policy and hence decorators have changed
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q} ->
+ Ds = amqqueue:get_decorators(Q),
+ [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)],
+ ok;
+ {error, not_found} ->
+ ok
+ end.
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 206584fe61..0fe32e52a7 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -37,7 +37,8 @@
update/2,
state_info/1,
stat/1,
- capabilities/0]).
+ capabilities/0,
+ notify_decorators/1]).
-export([set_retention_policy/3]).
-export([add_replica/3,
@@ -744,8 +745,15 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange},
rabbit_msg_record:to_iodata(R).
capabilities() ->
- #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>,
- <<"queue-leader-locator">>, <<"initial-cluster-size">>],
+ #{policies => [ %% Classic policies
+ <<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>,
+ <<"dead-letter-routing-key">>, <<"max-length">>,
+ <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
+ <<"max-priority">>, <<"overflow">>, <<"queue-mode">>,
+ <<"single-active-consumer">>, <<"delivery-limit">>,
+ <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
+ <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
+ <<"queue-master-locator">>],
queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
<<"x-max-length">>, <<"x-max-length-bytes">>,
<<"x-single-active-consumer">>, <<"x-queue-type">>,
@@ -754,6 +762,10 @@ capabilities() ->
consumer_arguments => [<<"x-stream-offset">>],
server_named => false}.
+notify_decorators(Q) when ?is_amqqueue(Q) ->
+ %% Not supported
+ ok.
+
resend_all(#stream_client{leader = LeaderPid,
writer_id = WriterId,
correlation = Corrs} = State) ->
diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl
index 1b8761f90e..c92437fcee 100644
--- a/deps/rabbit/test/rabbit_fifo_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl
@@ -177,7 +177,8 @@ enq_enq_deq_test(_) ->
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
NumReady = 1,
- {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} =
+ {_State3, {dequeue, {0, {_, first}}, NumReady},
+ [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} =
apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
State2),
ok.
@@ -187,7 +188,8 @@ enq_enq_deq_deq_settle_test(_) ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
- {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} =
+ {State3, {dequeue, {0, {_, first}}, 1},
+ [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} =
apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
State2),
{_State4, {dequeue, empty}} =
@@ -235,7 +237,8 @@ release_cursor_test(_) ->
checkout_enq_settle_test(_) ->
Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
+ {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
+ {monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
{State2, Effects0} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _,
{delivery, ?FUNCTION_NAME,
@@ -250,7 +253,8 @@ checkout_enq_settle_test(_) ->
out_of_order_enqueue_test(_) ->
Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
+ {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
+ {monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
% assert monitor was set up
@@ -280,7 +284,8 @@ out_of_order_first_enqueue_test(_) ->
duplicate_enqueue_test(_) ->
Cid = {<<"duplicate_enqueue_test">>, self()},
- {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
+ {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
+ {monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
{_State3, Effects3} = enq(3, 1, first, State2),
@@ -331,7 +336,8 @@ return_non_existent_test(_) ->
return_checked_out_test(_) ->
Cid = {<<"cid">>, self()},
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, [_Monitor,
+ {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
+ _Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, _},
{aux, active} | _ ]} = check_auto(Cid, 2, State0),
% returning immediately checks out the same message again
@@ -348,7 +354,8 @@ return_checked_out_limit_test(_) ->
release_cursor_interval => 0,
delivery_limit => 1}),
{State0, [_, _]} = enq(1, 1, first, Init),
- {State1, [_Monitor,
+ {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
+ _Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, _},
{aux, active} | _ ]} = check_auto(Cid, 2, State0),
% returning immediately checks out the same message again
@@ -366,7 +373,8 @@ return_auto_checked_out_test(_) ->
{State0, [_]} = enq(2, 2, second, State00),
% it first active then inactive as the consumer took on but cannot take
% any more
- {State1, [_Monitor,
+ {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
+ _Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, _},
{aux, active},
{aux, inactive}
@@ -401,7 +409,7 @@ cancelled_checkout_out_test(_) ->
down_with_noproc_consumer_returns_unsettled_test(_) ->
Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
{State0, [_, _]} = enq(1, 1, second, test_init(test)),
- {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
+ {State1, [_, {monitor, process, Pid} | _]} = check(Cid, 2, State0),
{State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1),
{_State, Effects} = check(Cid, 4, State2),
?ASSERT_EFF({monitor, process, _}, Effects),
@@ -600,7 +608,8 @@ purge_test(_) ->
{State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo:make_purge(), State1),
{State3, _} = enq(3, 2, second, State2),
% get returns a reply value
- {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} =
+ {_State4, {dequeue, {0, {_, second}}, _},
+ [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} =
apply(meta(4), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
ok.
@@ -1137,12 +1146,12 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
{State2, _, Effects2} = apply(#{index => 3,
system_time => 1500}, {down, Pid1, noconnection}, State1),
- % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
- ?assertEqual(4 + 1, length(Effects2)),
+ % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node, 1 more decorators effect
+ ?assertEqual(4 + 1 + 1, length(Effects2)),
{_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2),
- % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
- ?assertEqual(4 + 4, length(Effects3)).
+ % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID, 1 more decorators effect
+ ?assertEqual(4 + 4 + 1, length(Effects3)).
active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
@@ -1171,11 +1180,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
{State2, _, Effects2} = apply(meta(2), {down, Pid1, noconnection}, State1),
% one monitor and one consumer status update (deactivated)
- ?assertEqual(3, length(Effects2)),
+ ?assertEqual(4, length(Effects2)),
{_, _, Effects3} = apply(meta(3), {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
- ?assertEqual(5, length(Effects3)).
+ ?assertEqual(6, length(Effects3)).
single_active_cancelled_with_unacked_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue.erl
index e3ad0f9cb6..ea996aa8de 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_queue.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_queue.erl
@@ -48,8 +48,7 @@ policy_changed(Q1, Q2) when ?is_amqqueue(Q1) ->
QName = amqqueue:get_name(Q1),
case rabbit_amqqueue:lookup(QName) of
{ok, Q0} when ?is_amqqueue(Q0) ->
- QPid = amqqueue:get_pid(Q0),
- rpc:call(node(QPid), rabbit_federation_queue,
+ rpc:call(amqqueue:qnode(Q0), rabbit_federation_queue,
policy_changed_local, [Q1, Q2]);
{error, not_found} ->
ok
diff --git a/deps/rabbitmq_federation/test/queue_SUITE.erl b/deps/rabbitmq_federation/test/queue_SUITE.erl
index 82b7cd6499..62f9abcc4c 100644
--- a/deps/rabbitmq_federation/test/queue_SUITE.erl
+++ b/deps/rabbitmq_federation/test/queue_SUITE.erl
@@ -15,7 +15,7 @@
-import(rabbit_federation_test_util,
[wait_for_federation/2, expect/3, expect/4,
set_upstream/4, set_upstream/5, clear_upstream/3, set_policy/5, clear_policy/3,
- set_policy_pattern/5, set_policy_upstream/5, q/1, with_ch/3,
+ set_policy_pattern/5, set_policy_upstream/5, q/2, with_ch/3,
declare_queue/2, delete_queue/2,
federation_links_in_vhost/3]).
@@ -24,30 +24,37 @@
all() ->
[
- {group, without_disambiguate},
- {group, with_disambiguate}
+ {group, classic_queue},
+ {group, quorum_queue}
].
groups() ->
- [
- {without_disambiguate, [], [
- {cluster_size_1, [], [
- simple,
- multiple_upstreams,
- multiple_upstreams_pattern,
- multiple_downstreams,
- bidirectional,
- dynamic_reconfiguration,
- federate_unfederate,
- dynamic_plugin_stop_start
- ]}
- ]},
- {with_disambiguate, [], [
- {cluster_size_2, [], [
- restart_upstream
- ]}
- ]}
- ].
+ ClusterSize1 = [simple,
+ multiple_upstreams,
+ multiple_upstreams_pattern,
+ multiple_downstreams,
+ bidirectional,
+ dynamic_reconfiguration,
+ federate_unfederate,
+ dynamic_plugin_stop_start
+ ],
+ ClusterSize2 = [restart_upstream],
+ [{classic_queue, [], [
+ {without_disambiguate, [], [
+ {cluster_size_1, [], ClusterSize1}
+ ]},
+ {with_disambiguate, [], [
+ {cluster_size_2, [], ClusterSize2}
+ ]}
+ ]},
+ {quorum_queue, [], [
+ {without_disambiguate, [], [
+ {cluster_size_1, [], ClusterSize1}
+ ]},
+ {with_disambiguate, [], [
+ {cluster_size_2, [], ClusterSize2}
+ ]}
+ ]}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
@@ -60,6 +67,16 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+init_per_group(classic_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_type, classic},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}]);
+init_per_group(quorum_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_type, quorum},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}]);
init_per_group(without_disambiguate, Config) ->
rabbit_ct_helpers:set_config(Config,
{disambiguate_step, []});
@@ -88,15 +105,30 @@ init_per_group1(Group, Config) ->
{rmq_nodename_suffix, Suffix},
{rmq_nodes_clustered, false}
]),
- rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps() ++
- SetupFederation ++ Disambiguate).
+ Config2 = rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ SetupFederation ++ Disambiguate),
+ case ?config(queue_type, Config2) of
+ quorum ->
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of
+ ok ->
+ Config2;
+ Skip ->
+ Skip
+ end;
+ _ ->
+ Config2
+ end.
end_per_group(without_disambiguate, Config) ->
Config;
end_per_group(with_disambiguate, Config) ->
Config;
+end_per_group(classic_queue, Config) ->
+ Config;
+end_per_group(quorum_queue, Config) ->
+ Config;
end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
@@ -113,19 +145,21 @@ end_per_testcase(Testcase, Config) ->
%% -------------------------------------------------------------------
simple(Config) ->
+ Args = ?config(queue_args, Config),
with_ch(Config,
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
- end, upstream_downstream()).
+ end, upstream_downstream(Args)).
multiple_upstreams(Config) ->
+ Args = ?config(queue_args, Config),
with_ch(Config,
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed12.downstream">>),
expect_federation(Ch, <<"upstream2">>, <<"fed12.downstream">>)
- end, [q(<<"upstream">>),
- q(<<"upstream2">>),
- q(<<"fed12.downstream">>)]).
+ end, [q(<<"upstream">>, Args),
+ q(<<"upstream2">>, Args),
+ q(<<"fed12.downstream">>, Args)]).
multiple_upstreams_pattern(Config) ->
set_upstream(Config, 0, <<"local453x">>,
@@ -145,43 +179,51 @@ multiple_upstreams_pattern(Config) ->
set_policy_pattern(Config, 0, <<"pattern">>, <<"^pattern\.">>, <<"local\\d+x">>),
+ Args = ?config(queue_args, Config),
with_ch(Config,
fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream2">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT)
- end, [q(<<"upstream">>),
- q(<<"upstream2">>),
- q(<<"pattern.downstream">>)]),
+ end, [q(<<"upstream">>, Args),
+ q(<<"upstream2">>, Args),
+ q(<<"pattern.downstream">>, Args)]),
clear_upstream(Config, 0, <<"local453x">>),
clear_upstream(Config, 0, <<"local3214x">>),
clear_policy(Config, 0, <<"pattern">>).
multiple_downstreams(Config) ->
+ Args = ?config(queue_args, Config),
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT)
- end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]).
+ end, upstream_downstream(Args) ++ [q(<<"fed.downstream2">>, Args)]).
bidirectional(Config) ->
+ Args = ?config(queue_args, Config),
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
publish_expect(Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>, ?EXPECT_FEDERATION_TIMEOUT),
publish_expect(Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>, ?EXPECT_FEDERATION_TIMEOUT),
- Seq = lists:seq(1, 100),
+ Seq = lists:seq(1, 50),
[publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq],
[publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq],
- expect(Ch, <<"one">>, repeat(150, <<"bulk">>)),
- expect(Ch, <<"two">>, repeat(50, <<"bulk">>)),
+ expect(Ch, <<"one">>, repeat(100, <<"bulk">>)),
+ expect_empty(Ch, <<"one">>),
+ expect_empty(Ch, <<"two">>),
+ [publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq],
+ [publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq],
+ expect(Ch, <<"two">>, repeat(100, <<"bulk">>)),
expect_empty(Ch, <<"one">>),
expect_empty(Ch, <<"two">>)
- end, [q(<<"one">>),
- q(<<"two">>)]).
+ end, [q(<<"one">>, Args),
+ q(<<"two">>, Args)]).
dynamic_reconfiguration(Config) ->
+ Args = ?config(queue_args, Config),
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
@@ -199,9 +241,10 @@ dynamic_reconfiguration(Config) ->
set_upstream(Config, 0, <<"localhost">>, URI),
set_upstream(Config, 0, <<"localhost">>, URI),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
- end, upstream_downstream()).
+ end, upstream_downstream(Args)).
federate_unfederate(Config) ->
+ Args = ?config(queue_args, Config),
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
@@ -217,10 +260,11 @@ federate_unfederate(Config) ->
rabbit_ct_broker_helpers:set_policy(Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [
{<<"federation-upstream-set">>, <<"upstream">>}])
- end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]).
+ end, upstream_downstream(Args) ++ [q(<<"fed.downstream2">>, Args)]).
dynamic_plugin_stop_start(Config) ->
DownQ2 = <<"fed.downstream2">>,
+ Args = ?config(queue_args, Config),
with_ch(Config,
fun (Ch) ->
timer:sleep(?INITIAL_WAIT),
@@ -235,8 +279,8 @@ dynamic_plugin_stop_start(Config) ->
expect_no_federation(Ch, UpQ, DownQ1),
expect_no_federation(Ch, UpQ, DownQ2),
- declare_queue(Ch, q(DownQ1)),
- declare_queue(Ch, q(DownQ2)),
+ declare_queue(Ch, q(DownQ1, Args)),
+ declare_queue(Ch, q(DownQ2, Args)),
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, "rabbitmq_federation"),
%% Declare a queue then re-enable the plugin, the links appear
@@ -255,7 +299,7 @@ dynamic_plugin_stop_start(Config) ->
length(L) =:= 2
end),
expect_federation(Ch, UpQ, DownQ1, 120000)
- end, upstream_downstream() ++ [q(DownQ2)]).
+ end, upstream_downstream(Args) ++ [q(DownQ2, Args)]).
restart_upstream(Config) ->
[Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config,
@@ -266,8 +310,9 @@ restart_upstream(Config) ->
Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare),
- declare_queue(Upstream, q(<<"test">>)),
- declare_queue(Downstream, q(<<"test">>)),
+ Args = ?config(queue_args, Config),
+ declare_queue(Upstream, q(<<"test">>, Args)),
+ declare_queue(Downstream, q(<<"test">>, Args)),
Seq = lists:seq(1, 100),
[publish(Upstream, <<>>, <<"test">>, <<"bulk">>) || _ <- Seq],
expect(Upstream, <<"test">>, repeat(25, <<"bulk">>)),
@@ -325,4 +370,7 @@ expect_no_federation(Ch, UpstreamQ, DownstreamQ) ->
expect(Ch, UpstreamQ, [<<"HELLO">>]).
upstream_downstream() ->
- [q(<<"upstream">>), q(<<"fed.downstream">>)].
+ upstream_downstream([]).
+
+upstream_downstream(Args) ->
+ [q(<<"upstream">>, Args), q(<<"fed.downstream">>, Args)].
diff --git a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl
index a9e08dcb09..05e7b9e30a 100644
--- a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl
+++ b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl
@@ -181,10 +181,10 @@ expect([], _Timeout) ->
ok;
expect(Payloads, Timeout) ->
receive
- {#'basic.deliver'{}, #amqp_msg{payload = Payload}} ->
+ {#'basic.deliver'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} ->
case lists:member(Payload, Payloads) of
true ->
- ct:pal("Consumed a message: ~p", [Payload]),
+ ct:pal("Consumed a message: ~p ~p left: ~p", [Payload, DTag, length(Payloads) - 1]),
expect(Payloads -- [Payload], Timeout);
false -> ?assert(false, rabbit_misc:format("received an unexpected payload ~p", [Payload]))
end
@@ -355,5 +355,9 @@ delete_queue(Ch, Q) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
q(Name) ->
+ q(Name, []).
+
+q(Name, Args) ->
#'queue.declare'{queue = Name,
- durable = true}.
+ durable = true,
+ arguments = Args}.