diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-02-13 12:36:29 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-02-13 12:36:29 +0100 |
commit | bc8de1581791b6c8d5e5a7c08426c985fa08ae43 (patch) | |
tree | 7d5d36a19ea91766101cac4815a3982ae4a3b55e | |
parent | 646c446ee5211adc1f310b6ac941fee214c67d51 (diff) | |
download | rabbitmq-server-git-bc8de1581791b6c8d5e5a7c08426c985fa08ae43.tar.gz |
Fix confirms for quorum and stream2 queues
-rw-r--r-- | src/rabbit_channel.erl | 17 | ||||
-rw-r--r-- | src/rabbit_stream2_queue.erl | 4 | ||||
-rw-r--r-- | test/rabbit_stream2_queue_SUITE.erl | 7 |
3 files changed, 18 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 193aba1e06..c6e7eb4d96 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -865,7 +865,7 @@ handle_info({osiris_written, QName, Corrs}, {MsgSeqNos, QState} = rabbit_stream2_queue:handle_written( QState0, Corrs), State = State0#ch{queue_states = QueueStates0#{QName => QState}}, - noreply_coalesce(confirm(MsgSeqNos, rabbit_stream2_queue:leader(QState), State)); + noreply_coalesce(confirm(MsgSeqNos, QName, State)); _ -> noreply(State0) end; @@ -2304,8 +2304,12 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ Message, State1), AllDeliveredQNames = [ QName || QRef <- AllDeliveredQRefs, {ok, QName} <- [maps:find(QRef, QNames1)]], + %% classic queue confirms return a Pid, any other returns the queue name + AllDeliveredQConfirmRefs = DeliveredQPids ++ + [ QName || QRef <- DeliveredQQPids, + {ok, QName} <- [maps:find(QRef, QNames1)]], State2 = process_routing_confirm(Confirm, - AllDeliveredQRefs, + AllDeliveredQConfirmRefs, AllDeliveredQNames, MsgSeqNo, XName, State1), @@ -2346,7 +2350,14 @@ confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) -> %% NOTE: if queue name does not exist here it's likely that the ref also %% does not exist in unconfirmed messages. %% Neither does the 'ignore' atom, so it's a reasonable fallback. - QName = maps:get(QRef, QNames, ignore), + QName = case is_pid(QRef) of + true -> + %% for classic queues + maps:get(QRef, QNames, ignore); + _ -> + %% for any other type of queue + QRef + end, {ConfirmMXs, RejectMXs, UC1} = unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC), %% NB: don't call noreply/1 since we don't want to send confirms. diff --git a/src/rabbit_stream2_queue.erl b/src/rabbit_stream2_queue.erl index adfa0dfcfb..f361fdd1c5 100644 --- a/src/rabbit_stream2_queue.erl +++ b/src/rabbit_stream2_queue.erl @@ -15,7 +15,6 @@ init_client/1, queue_name/1, pending_size/1, - leader/1, %% mgmt declare/1, @@ -140,9 +139,6 @@ init_client(Q) when ?is_amqqueue(Q) -> queue_name(#stream2_client{name = Name}) -> Name. -leader(#stream2_client{leader = Leader}) -> - Leader. - pending_size(#stream2_client{correlation = Correlation}) -> maps:size(Correlation). diff --git a/test/rabbit_stream2_queue_SUITE.erl b/test/rabbit_stream2_queue_SUITE.erl index 7509c18cb7..feb89babb6 100644 --- a/test/rabbit_stream2_queue_SUITE.erl +++ b/test/rabbit_stream2_queue_SUITE.erl @@ -458,9 +458,10 @@ publish_confirm_with_replica_down(Config) -> amqp_channel:register_confirm_handler(Ch, self()), ok = receive - #'basic.nack'{} -> ok - after 2500 -> - exit(confirm_timeout) + #'basic.nack'{} -> exit(unconfirmed); + #'basic.ack'{} -> exit(confirmed) + after 1000 -> + ok end, rabbit_ct_broker_helpers:start_node(Config, Server2), |