diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-10-07 10:57:45 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-10-07 10:57:45 +0100 |
| commit | 91d6a5d84e377d288d114999542f58bf82b5c1b7 (patch) | |
| tree | 3f69d74bd86a5918f4bdfa6354d5fa54b607c885 | |
| parent | 43293a8b1762016f724e9ee9062459a59f7208b3 (diff) | |
| download | rabbitmq-server-git-91d6a5d84e377d288d114999542f58bf82b5c1b7.tar.gz | |
Backwards compatibilty fixes
As some classic queue messages changed during the queue type refactoring
we need to check the stream_queue feature flag to ensure we return the
appropriate format to the interacting channel.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 |
3 files changed, 44 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 888bd04416..0aca86c4af 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -599,7 +599,7 @@ confirm_messages(MsgIds, MTC, QName) -> end, {#{}, MTC}, MsgIds), maps:fold( fun(Pid, MsgSeqNos, _) -> - rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos) + confirm_to_sender(Pid, QName, MsgSeqNos) end, ok, CMs), @@ -622,7 +622,7 @@ send_or_record_confirm(#delivery{confirm = true, sender = SenderPid, msg_seq_no = MsgSeqNo}, #q{q = Q} = State) -> - rabbit_misc:confirm_to_sender(SenderPid, amqqueue:get_name(Q), [MsgSeqNo]), + confirm_to_sender(SenderPid, amqqueue:get_name(Q), [MsgSeqNo]), {immediately, State}. %% This feature was used by `rabbit_amqqueue_process` and @@ -806,7 +806,7 @@ send_reject_publish(#delivery{confirm = true, backing_queue = BQ, backing_queue_state = BQS, msg_id_to_channel = MTC}) -> - gen_server2:cast(SenderPid, {queue_event, Q, {reject_publish, MsgSeqNo, self()}}), + ok = send_rejection(SenderPid, amqqueue:get_name(Q), MsgSeqNo), MTC1 = maps:remove(MsgId, MTC), BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), @@ -1521,9 +1521,9 @@ handle_cast({run_backing_queue, Mod, Fun}, noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); handle_cast({deliver, - Delivery = #delivery{sender = Sender, - flow = Flow}, - SlaveWhenPublished}, + Delivery = #delivery{sender = Sender, + flow = Flow}, + SlaveWhenPublished}, State = #q{senders = Senders}) -> Senders1 = case Flow of %% In both credit_flow:ack/1 we are acking messages to the channel @@ -1840,3 +1840,23 @@ update_ha_mode(State) -> {ok, Q} = rabbit_amqqueue:lookup(qname(State)), ok = rabbit_mirror_queue_misc:update_mirrors(Q), State. + +confirm_to_sender(Pid, QName, MsgSeqNos) -> + %% the stream queue included the queue type refactoring and thus requires + %% a different message format + case rabbit_ff_registry:is_enabled(steam_queue) of + true -> + rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos); + false -> + rabbit_misc:confirm_to_sender_compat(Pid, QName, MsgSeqNos) + end. + +send_rejection(Pid, QName, MsgSeqNo) -> + case rabbit_ff_registry:is_enabled(steam_queue) of + true -> + gen_server2:cast(Pid, {queue_event, QName, + {reject_publish, MsgSeqNo, self()}}); + false -> + gen_server2:cast(Pid, {reject_publish, MsgSeqNo, self()}) + end. + diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fd4ecf6974..089df6e24a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -294,7 +294,7 @@ send_command(Pid, Msg) -> (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'. deliver(Pid, ConsumerTag, AckRequired, Msg) -> - gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, [Msg]}). + gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). -spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'. @@ -2568,10 +2568,13 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName). -handle_deliver(CTag, Ack, Msgs, State) -> +handle_deliver(CTag, Ack, Msgs, State) when is_list(Msgs) -> lists:foldl(fun(Msg, S) -> handle_deliver0(CTag, Ack, Msg, S) - end, State, Msgs). + end, State, Msgs); +handle_deliver(CTag, Ack, Msg, State) -> + %% backwards compatibility clause + handle_deliver0(CTag, Ack, Msg, State). handle_deliver0(ConsumerTag, AckRequired, Msg = {QName, QPid, _MsgId, Redelivered, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index bd7a5b457d..994c2906b5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -579,7 +579,7 @@ send_or_record_confirm(_Status, #delivery { sender = ChPid, confirm = true, msg_seq_no = MsgSeqNo }, MS, #state{q = Q} = _State) -> - ok = rabbit_misc:confirm_to_sender(ChPid, amqqueue:get_name(Q), [MsgSeqNo]), + ok = confirm_to_sender(ChPid, amqqueue:get_name(Q), [MsgSeqNo]), MS. confirm_messages(MsgIds, State = #state{q = Q, msg_id_status = MS}) -> @@ -612,7 +612,7 @@ confirm_messages(MsgIds, State = #state{q = Q, msg_id_status = MS}) -> end end, {gb_trees:empty(), MS}, MsgIds), Fun = fun (Pid, MsgSeqNos) -> - rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos) + confirm_to_sender(Pid, QName, MsgSeqNos) end, rabbit_misc:gb_trees_foreach(Fun, CMs), State #state { msg_id_status = MS1 }. @@ -1090,3 +1090,13 @@ record_synchronised(Q0) when ?is_amqqueue(Q0) -> ok -> ok; {ok, Q2} -> rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Q2) end. + +confirm_to_sender(Pid, QName, MsgSeqNos) -> + %% the stream queue included the queue type refactoring and thus requires + %% a different message format + case rabbit_ff_registry:is_enabled(steam_queue) of + true -> + rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos); + false -> + rabbit_misc:confirm_to_sender_compat(Pid, QName, MsgSeqNos) + end. |
