summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-02-13 12:36:29 +0100
committerdcorbacho <dparracorbacho@piotal.io>2020-02-13 12:36:29 +0100
commitbc8de1581791b6c8d5e5a7c08426c985fa08ae43 (patch)
tree7d5d36a19ea91766101cac4815a3982ae4a3b55e
parent646c446ee5211adc1f310b6ac941fee214c67d51 (diff)
downloadrabbitmq-server-git-bc8de1581791b6c8d5e5a7c08426c985fa08ae43.tar.gz
Fix confirms for quorum and stream2 queues
-rw-r--r--src/rabbit_channel.erl17
-rw-r--r--src/rabbit_stream2_queue.erl4
-rw-r--r--test/rabbit_stream2_queue_SUITE.erl7
3 files changed, 18 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 193aba1e06..c6e7eb4d96 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -865,7 +865,7 @@ handle_info({osiris_written, QName, Corrs},
{MsgSeqNos, QState} = rabbit_stream2_queue:handle_written(
QState0, Corrs),
State = State0#ch{queue_states = QueueStates0#{QName => QState}},
- noreply_coalesce(confirm(MsgSeqNos, rabbit_stream2_queue:leader(QState), State));
+ noreply_coalesce(confirm(MsgSeqNos, QName, State));
_ ->
noreply(State0)
end;
@@ -2304,8 +2304,12 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
Message, State1),
AllDeliveredQNames = [ QName || QRef <- AllDeliveredQRefs,
{ok, QName} <- [maps:find(QRef, QNames1)]],
+ %% classic queue confirms return a Pid, any other returns the queue name
+ AllDeliveredQConfirmRefs = DeliveredQPids ++
+ [ QName || QRef <- DeliveredQQPids,
+ {ok, QName} <- [maps:find(QRef, QNames1)]],
State2 = process_routing_confirm(Confirm,
- AllDeliveredQRefs,
+ AllDeliveredQConfirmRefs,
AllDeliveredQNames,
MsgSeqNo,
XName, State1),
@@ -2346,7 +2350,14 @@ confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) ->
%% NOTE: if queue name does not exist here it's likely that the ref also
%% does not exist in unconfirmed messages.
%% Neither does the 'ignore' atom, so it's a reasonable fallback.
- QName = maps:get(QRef, QNames, ignore),
+ QName = case is_pid(QRef) of
+ true ->
+ %% for classic queues
+ maps:get(QRef, QNames, ignore);
+ _ ->
+ %% for any other type of queue
+ QRef
+ end,
{ConfirmMXs, RejectMXs, UC1} =
unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
diff --git a/src/rabbit_stream2_queue.erl b/src/rabbit_stream2_queue.erl
index adfa0dfcfb..f361fdd1c5 100644
--- a/src/rabbit_stream2_queue.erl
+++ b/src/rabbit_stream2_queue.erl
@@ -15,7 +15,6 @@
init_client/1,
queue_name/1,
pending_size/1,
- leader/1,
%% mgmt
declare/1,
@@ -140,9 +139,6 @@ init_client(Q) when ?is_amqqueue(Q) ->
queue_name(#stream2_client{name = Name}) ->
Name.
-leader(#stream2_client{leader = Leader}) ->
- Leader.
-
pending_size(#stream2_client{correlation = Correlation}) ->
maps:size(Correlation).
diff --git a/test/rabbit_stream2_queue_SUITE.erl b/test/rabbit_stream2_queue_SUITE.erl
index 7509c18cb7..feb89babb6 100644
--- a/test/rabbit_stream2_queue_SUITE.erl
+++ b/test/rabbit_stream2_queue_SUITE.erl
@@ -458,9 +458,10 @@ publish_confirm_with_replica_down(Config) ->
amqp_channel:register_confirm_handler(Ch, self()),
ok = receive
- #'basic.nack'{} -> ok
- after 2500 ->
- exit(confirm_timeout)
+ #'basic.nack'{} -> exit(unconfirmed);
+ #'basic.ack'{} -> exit(confirmed)
+ after 1000 ->
+ ok
end,
rabbit_ct_broker_helpers:start_node(Config, Server2),