summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-11-25 15:14:42 +0300
committerGitHub <noreply@github.com>2021-11-25 15:14:42 +0300
commitd8a4d4cc4f59e1d14497779521e47927c6d1f2b2 (patch)
tree2e601ae07c9829128b057718a1d66e25c8296894
parentce497a5b525b5399ecc08548bf98acb29c41b377 (diff)
parent4f09fd109c0192318803db8c1de599c43b6589f4 (diff)
downloadrabbitmq-server-git-d8a4d4cc4f59e1d14497779521e47927c6d1f2b2.tar.gz
Merge pull request #3746 from rabbitmq/lukebakken/gh-3729
Fix case where quorum queue consumer is cancelled while message is about to be delivered
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl28
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl61
2 files changed, 75 insertions, 14 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index 9b8920f89e..7f2cd55acc 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -42,7 +42,6 @@
-define(COMMAND_TIMEOUT, 30000).
-type seq() :: non_neg_integer().
-%% last_applied is initialised to -1
-type maybe_seq() :: integer().
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
{send_drained, CTagCredit ::
@@ -300,7 +299,7 @@ settle(ConsumerTag, [_|_] = MsgIds,
%% from {@link rabbit_fifo:delivery/0.}
%% @param State the {@module} state
%% @returns
-%% `{ok | slow, State}' if the command was successfully sent. If the return
+%% `{State, list()}' if the command was successfully sent. If the return
%% tag is `slow' it means the limit is approaching and it is time to slow down
%% the sending rate.
%%
@@ -310,10 +309,8 @@ return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_server(State0),
% TODO: make rabbit_fifo return support lists of message ids
Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
- case send_command(Node, undefined, Cmd, normal, State0) of
- {_, S} ->
- {S, []}
- end;
+ {_Tag, State1} = send_command(Node, undefined, Cmd, normal, State0),
+ {State1, []};
return(ConsumerTag, [_|_] = MsgIds,
#state{unsent_commands = Unsent0} = State0) ->
ConsumerId = consumer_id(ConsumerTag),
@@ -323,7 +320,8 @@ return(ConsumerTag, [_|_] = MsgIds,
fun ({Settles, Returns, Discards}) ->
{Settles, Returns ++ MsgIds, Discards}
end, {[], MsgIds, []}, Unsent0),
- {State0#state{unsent_commands = Unsent}, []}.
+ State1 = State0#state{unsent_commands = Unsent},
+ {State1, []}.
%% @doc Discards a checked out message.
%% If the queue has a dead_letter_handler configured this will be called.
@@ -732,10 +730,10 @@ maybe_auto_ack(false, {deliver, Tag, _Ack, Msgs} = Deliver, State0) ->
{State, Actions} = settle(Tag, MsgIds, State0),
{ok, State, [Deliver] ++ Actions}.
-
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
#state{cfg = #cfg{cluster_name = QName},
- consumer_deliveries = CDels0} = State0) ->
+ consumer_deliveries = CDels0} = State0)
+ when is_map_key(Tag, CDels0) ->
QRef = qref(Leader),
{LastId, _} = lists:last(IdMsgs),
Consumer = #consumer{ack = Ack} = maps:get(Tag, CDels0),
@@ -787,7 +785,17 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
length(IdMsgs),
C#consumer{last_msg_id = LastId},
CDels0)})
- end.
+ end;
+handle_delivery(_Leader, {delivery, Tag, [_ | _] = IdMsgs},
+ #state{consumer_deliveries = CDels0} = State0)
+ when not is_map_key(Tag, CDels0) ->
+ %% Note:
+ %% https://github.com/rabbitmq/rabbitmq-server/issues/3729
+ %% If the consumer is no longer in the deliveries map,
+ %% we should return all messages.
+ MsgIntIds = [Id || {Id, _} <- IdMsgs],
+ {State1, Deliveries} = return(Tag, MsgIntIds, State0),
+ {ok, State1, Deliveries}.
transform_msgs(QName, QRef, Msgs) ->
lists:map(
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index cdbb59d12b..6b21c1e813 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -137,7 +137,8 @@ all_tests() ->
delete_if_unused,
queue_ttl,
peek,
- consumer_priorities
+ consumer_priorities,
+ cancel_consumer_gh_3729
].
memory_tests() ->
@@ -1981,7 +1982,7 @@ subscribe_redelivery_limit(Config) ->
receive
{#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
throw(unexpected_redelivery)
- after 2000 ->
+ after 5000 ->
ok
end.
@@ -2027,7 +2028,7 @@ subscribe_redelivery_policy(Config) ->
receive
{#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
throw(unexpected_redelivery)
- after 2000 ->
+ after 5000 ->
ok
end,
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>).
@@ -2766,6 +2767,54 @@ consumer_priorities(Config) ->
ok.
+cancel_consumer_gh_3729(Config) ->
+ %% Test the scenario where a message is published to a quorum queue
+ %% but the consumer has been cancelled
+ %% https://github.com/rabbitmq/rabbitmq-server/pull/3746
+ QQ = ?config(queue_name, Config),
+
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ ExpectedDeclareRslt0 = #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 0},
+ DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
+ ?assertMatch(ExpectedDeclareRslt0, DeclareRslt0),
+
+ ok = publish(Ch, QQ),
+
+ ok = subscribe(Ch, QQ, false),
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = true},
+ ok = amqp_channel:cast(Ch, R)
+ after 5000 ->
+ flush(100),
+ ct:fail("basic.deliver timeout")
+ end,
+
+ ok = cancel(Ch),
+
+ D = #'queue.declare'{queue = QQ, passive = true, arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 1} = amqp_channel:call(Ch, D),
+
+ receive
+ #'basic.cancel_ok'{consumer_tag = <<"ctag">>} -> ok
+ after 5000 ->
+ flush(100),
+ ct:fail("basic.cancel_ok timeout")
+ end,
+
+ F = fun() ->
+ #'queue.declare_ok'{queue = QQ,
+ message_count = MC,
+ consumer_count = CC} = amqp_channel:call(Ch, D),
+ MC =:= 1 andalso CC =:= 0
+ end,
+ wait_until(F),
+
+ ok = rabbit_ct_client_helpers:close_channel(Ch).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
@@ -2824,6 +2873,10 @@ qos(Ch, Prefetch, Global) ->
amqp_channel:call(Ch, #'basic.qos'{global = Global,
prefetch_count = Prefetch})).
+cancel(Ch) ->
+ ?assertMatch(#'basic.cancel_ok'{consumer_tag = <<"ctag">>},
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>})).
+
receive_basic_deliver(Redelivered) ->
receive
{#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
@@ -2913,7 +2966,7 @@ validate_queue(Ch, Queue, ExpectedMsgs) ->
#amqp_msg{payload = M}} ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
multiple = false})
- after 2000 ->
+ after 5000 ->
flush(10),
exit({validate_queue_timeout, M})
end