summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Bakken <luke@bakken.io>2021-11-16 11:29:22 -0800
committerLuke Bakken <luke@bakken.io>2021-11-23 08:59:47 -0800
commit6d545447b98501577bfceaaf8186bd3091f88257 (patch)
tree7b2589264986d76501619c59a6c83564f6e8ca86
parente22e667a103fcb1460896906fe97c7ea6c0ce460 (diff)
downloadrabbitmq-server-git-6d545447b98501577bfceaaf8186bd3091f88257.tar.gz
Fix quorum queue crash during consumer cancel with return
Fixes #3729
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl28
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl56
2 files changed, 73 insertions, 11 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index 9b8920f89e..7f2cd55acc 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -42,7 +42,6 @@
-define(COMMAND_TIMEOUT, 30000).
-type seq() :: non_neg_integer().
-%% last_applied is initialised to -1
-type maybe_seq() :: integer().
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
{send_drained, CTagCredit ::
@@ -300,7 +299,7 @@ settle(ConsumerTag, [_|_] = MsgIds,
%% from {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
-%% `{ok | slow, State}' if the command was successfully sent. If the return
+%% `{State, list()}' if the command was successfully sent. If the return
%% tag is `slow' it means the limit is approaching and it is time to slow down
%% the sending rate.
%%
@@ -310,10 +309,8 @@ return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_server(State0),
% TODO: make rabbit_fifo return support lists of message ids
Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
- case send_command(Node, undefined, Cmd, normal, State0) of
- {_, S} ->
- {S, []}
- end;
+ {_Tag, State1} = send_command(Node, undefined, Cmd, normal, State0),
+ {State1, []};
return(ConsumerTag, [_|_] = MsgIds,
#state{unsent_commands = Unsent0} = State0) ->
ConsumerId = consumer_id(ConsumerTag),
@@ -323,7 +320,8 @@ return(ConsumerTag, [_|_] = MsgIds,
fun ({Settles, Returns, Discards}) ->
{Settles, Returns ++ MsgIds, Discards}
end, {[], MsgIds, []}, Unsent0),
- {State0#state{unsent_commands = Unsent}, []}.
+ State1 = State0#state{unsent_commands = Unsent},
+ {State1, []}.
%% @doc Discards a checked out message.
%% If the queue has a dead_letter_handler configured this will be called.
@@ -732,10 +730,10 @@ maybe_auto_ack(false, {deliver, Tag, _Ack, Msgs} = Deliver, State0) ->
{State, Actions} = settle(Tag, MsgIds, State0),
{ok, State, [Deliver] ++ Actions}.
-
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
#state{cfg = #cfg{cluster_name = QName},
- consumer_deliveries = CDels0} = State0) ->
+ consumer_deliveries = CDels0} = State0)
+ when is_map_key(Tag, CDels0) ->
QRef = qref(Leader),
{LastId, _} = lists:last(IdMsgs),
Consumer = #consumer{ack = Ack} = maps:get(Tag, CDels0),
@@ -787,7 +785,17 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
length(IdMsgs),
C#consumer{last_msg_id = LastId},
CDels0)})
- end.
+ end;
+handle_delivery(_Leader, {delivery, Tag, [_ | _] = IdMsgs},
+ #state{consumer_deliveries = CDels0} = State0)
+ when not is_map_key(Tag, CDels0) ->
+ %% Note:
+ %% https://github.com/rabbitmq/rabbitmq-server/issues/3729
+ %% If the consumer is no longer in the deliveries map,
+ %% we should return all messages.
+ MsgIntIds = [Id || {Id, _} <- IdMsgs],
+ {State1, Deliveries} = return(Tag, MsgIntIds, State0),
+ {ok, State1, Deliveries}.
transform_msgs(QName, QRef, Msgs) ->
lists:map(
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index cdbb59d12b..c133307bf7 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -137,7 +137,8 @@ all_tests() ->
delete_if_unused,
queue_ttl,
peek,
- consumer_priorities
+ consumer_priorities,
+ cancel_consumer_gh_3729
].
memory_tests() ->
@@ -2766,6 +2767,55 @@ consumer_priorities(Config) ->
ok.
+cancel_consumer_gh_3729(Config) ->
+ %% Test the scenario where a message is published to a quorum queue
+ %% but the consumer has been cancelled
+ %% https://github.com/rabbitmq/rabbitmq-server/pull/3746
+ QQ = ?config(queue_name, Config),
+
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ ExpectedDeclareRslt0 = #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 0},
+ DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
+ ?assertMatch(ExpectedDeclareRslt0, DeclareRslt0),
+
+ ok = publish(Ch, QQ),
+
+ ok = subscribe(Ch, QQ, false),
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = true},
+ ok = amqp_channel:cast(Ch, R)
+ after 1000 ->
+ flush(100),
+ ct:fail("basic.deliver timeout")
+ end,
+
+ ok = cancel(Ch),
+
+ D = #'queue.declare'{queue = QQ, passive = true, arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 1} = amqp_channel:call(Ch, D),
+
+ receive
+ #'basic.cancel_ok'{consumer_tag = <<"ctag">>} -> ok
+ after 1000 ->
+ flush(100),
+ ct:fail("basic.cancel_ok timeout")
+ end,
+
+ F = fun() ->
+ #'queue.declare_ok'{queue = QQ,
+ message_count = MC,
+ consumer_count = CC} = amqp_channel:call(Ch, D),
+ ct:pal("MC ~w CC ~w", [MC, CC]),
+ MC =:= 1 andalso CC =:= 0
+ end,
+ wait_until(F),
+
+ ok = rabbit_ct_client_helpers:close_channel(Ch).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
@@ -2824,6 +2874,10 @@ qos(Ch, Prefetch, Global) ->
amqp_channel:call(Ch, #'basic.qos'{global = Global,
prefetch_count = Prefetch})).
+cancel(Ch) ->
+ ?assertMatch(#'basic.cancel_ok'{consumer_tag = <<"ctag">>},
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>})).
+
receive_basic_deliver(Redelivered) ->
receive
{#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->