summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-10-07 10:57:45 +0100
committerkjnilsson <knilsson@pivotal.io>2020-10-07 10:57:45 +0100
commit91d6a5d84e377d288d114999542f58bf82b5c1b7 (patch)
tree3f69d74bd86a5918f4bdfa6354d5fa54b607c885
parent43293a8b1762016f724e9ee9062459a59f7208b3 (diff)
downloadrabbitmq-server-git-91d6a5d84e377d288d114999542f58bf82b5c1b7.tar.gz
Backwards compatibilty fixes
As some classic queue messages changed during the queue type refactoring we need to check the stream_queue feature flag to ensure we return the appropriate format to the interacting channel.
-rw-r--r--src/rabbit_amqqueue_process.erl32
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
3 files changed, 44 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 888bd04416..0aca86c4af 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -599,7 +599,7 @@ confirm_messages(MsgIds, MTC, QName) ->
end, {#{}, MTC}, MsgIds),
maps:fold(
fun(Pid, MsgSeqNos, _) ->
- rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos)
+ confirm_to_sender(Pid, QName, MsgSeqNos)
end,
ok,
CMs),
@@ -622,7 +622,7 @@ send_or_record_confirm(#delivery{confirm = true,
sender = SenderPid,
msg_seq_no = MsgSeqNo},
#q{q = Q} = State) ->
- rabbit_misc:confirm_to_sender(SenderPid, amqqueue:get_name(Q), [MsgSeqNo]),
+ confirm_to_sender(SenderPid, amqqueue:get_name(Q), [MsgSeqNo]),
{immediately, State}.
%% This feature was used by `rabbit_amqqueue_process` and
@@ -806,7 +806,7 @@ send_reject_publish(#delivery{confirm = true,
backing_queue = BQ,
backing_queue_state = BQS,
msg_id_to_channel = MTC}) ->
- gen_server2:cast(SenderPid, {queue_event, Q, {reject_publish, MsgSeqNo, self()}}),
+ ok = send_rejection(SenderPid, amqqueue:get_name(Q), MsgSeqNo),
MTC1 = maps:remove(MsgId, MTC),
BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
@@ -1521,9 +1521,9 @@ handle_cast({run_backing_queue, Mod, Fun},
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
handle_cast({deliver,
- Delivery = #delivery{sender = Sender,
- flow = Flow},
- SlaveWhenPublished},
+ Delivery = #delivery{sender = Sender,
+ flow = Flow},
+ SlaveWhenPublished},
State = #q{senders = Senders}) ->
Senders1 = case Flow of
%% In both credit_flow:ack/1 we are acking messages to the channel
@@ -1840,3 +1840,23 @@ update_ha_mode(State) ->
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
ok = rabbit_mirror_queue_misc:update_mirrors(Q),
State.
+
+confirm_to_sender(Pid, QName, MsgSeqNos) ->
+ %% the stream queue included the queue type refactoring and thus requires
+ %% a different message format
+ case rabbit_ff_registry:is_enabled(steam_queue) of
+ true ->
+ rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos);
+ false ->
+ rabbit_misc:confirm_to_sender_compat(Pid, QName, MsgSeqNos)
+ end.
+
+send_rejection(Pid, QName, MsgSeqNo) ->
+ case rabbit_ff_registry:is_enabled(steam_queue) of
+ true ->
+ gen_server2:cast(Pid, {queue_event, QName,
+ {reject_publish, MsgSeqNo, self()}});
+ false ->
+ gen_server2:cast(Pid, {reject_publish, MsgSeqNo, self()})
+ end.
+
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fd4ecf6974..089df6e24a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -294,7 +294,7 @@ send_command(Pid, Msg) ->
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'.
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
- gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, [Msg]}).
+ gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
-spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'.
@@ -2568,10 +2568,13 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
-handle_deliver(CTag, Ack, Msgs, State) ->
+handle_deliver(CTag, Ack, Msgs, State) when is_list(Msgs) ->
lists:foldl(fun(Msg, S) ->
handle_deliver0(CTag, Ack, Msg, S)
- end, State, Msgs).
+ end, State, Msgs);
+handle_deliver(CTag, Ack, Msg, State) ->
+ %% backwards compatibility clause
+ handle_deliver0(CTag, Ack, Msg, State).
handle_deliver0(ConsumerTag, AckRequired,
Msg = {QName, QPid, _MsgId, Redelivered,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index bd7a5b457d..994c2906b5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -579,7 +579,7 @@ send_or_record_confirm(_Status, #delivery { sender = ChPid,
confirm = true,
msg_seq_no = MsgSeqNo },
MS, #state{q = Q} = _State) ->
- ok = rabbit_misc:confirm_to_sender(ChPid, amqqueue:get_name(Q), [MsgSeqNo]),
+ ok = confirm_to_sender(ChPid, amqqueue:get_name(Q), [MsgSeqNo]),
MS.
confirm_messages(MsgIds, State = #state{q = Q, msg_id_status = MS}) ->
@@ -612,7 +612,7 @@ confirm_messages(MsgIds, State = #state{q = Q, msg_id_status = MS}) ->
end
end, {gb_trees:empty(), MS}, MsgIds),
Fun = fun (Pid, MsgSeqNos) ->
- rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos)
+ confirm_to_sender(Pid, QName, MsgSeqNos)
end,
rabbit_misc:gb_trees_foreach(Fun, CMs),
State #state { msg_id_status = MS1 }.
@@ -1090,3 +1090,13 @@ record_synchronised(Q0) when ?is_amqqueue(Q0) ->
ok -> ok;
{ok, Q2} -> rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Q2)
end.
+
+confirm_to_sender(Pid, QName, MsgSeqNos) ->
+ %% the stream queue included the queue type refactoring and thus requires
+ %% a different message format
+ case rabbit_ff_registry:is_enabled(steam_queue) of
+ true ->
+ rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos);
+ false ->
+ rabbit_misc:confirm_to_sender_compat(Pid, QName, MsgSeqNos)
+ end.