diff options
author | Michael Klishin <klishinm@vmware.com> | 2021-11-25 15:14:42 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-25 15:14:42 +0300 |
commit | d8a4d4cc4f59e1d14497779521e47927c6d1f2b2 (patch) | |
tree | 2e601ae07c9829128b057718a1d66e25c8296894 | |
parent | ce497a5b525b5399ecc08548bf98acb29c41b377 (diff) | |
parent | 4f09fd109c0192318803db8c1de599c43b6589f4 (diff) | |
download | rabbitmq-server-git-d8a4d4cc4f59e1d14497779521e47927c6d1f2b2.tar.gz |
Merge pull request #3746 from rabbitmq/lukebakken/gh-3729
Fix case where quorum queue consumer is cancelled while message is about to be delivered
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_client.erl | 28 | ||||
-rw-r--r-- | deps/rabbit/test/quorum_queue_SUITE.erl | 61 |
2 files changed, 75 insertions, 14 deletions
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 9b8920f89e..7f2cd55acc 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -42,7 +42,6 @@ -define(COMMAND_TIMEOUT, 30000). -type seq() :: non_neg_integer(). -%% last_applied is initialised to -1 -type maybe_seq() :: integer(). -type action() :: {send_credit_reply, Available :: non_neg_integer()} | {send_drained, CTagCredit :: @@ -300,7 +299,7 @@ settle(ConsumerTag, [_|_] = MsgIds, %% from {@link rabbit_fifo:delivery/0.} %% @param State the {@module} state %% @returns -%% `{ok | slow, State}' if the command was successfully sent. If the return +%% `{State, list()}' if the command was successfully sent. If the return %% tag is `slow' it means the limit is approaching and it is time to slow down %% the sending rate. %% @@ -310,10 +309,8 @@ return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> Node = pick_server(State0), % TODO: make rabbit_fifo return support lists of message ids Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds), - case send_command(Node, undefined, Cmd, normal, State0) of - {_, S} -> - {S, []} - end; + {_Tag, State1} = send_command(Node, undefined, Cmd, normal, State0), + {State1, []}; return(ConsumerTag, [_|_] = MsgIds, #state{unsent_commands = Unsent0} = State0) -> ConsumerId = consumer_id(ConsumerTag), @@ -323,7 +320,8 @@ return(ConsumerTag, [_|_] = MsgIds, fun ({Settles, Returns, Discards}) -> {Settles, Returns ++ MsgIds, Discards} end, {[], MsgIds, []}, Unsent0), - {State0#state{unsent_commands = Unsent}, []}. + State1 = State0#state{unsent_commands = Unsent}, + {State1, []}. %% @doc Discards a checked out message. %% If the queue has a dead_letter_handler configured this will be called. @@ -732,10 +730,10 @@ maybe_auto_ack(false, {deliver, Tag, _Ack, Msgs} = Deliver, State0) -> {State, Actions} = settle(Tag, MsgIds, State0), {ok, State, [Deliver] ++ Actions}. - handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs}, #state{cfg = #cfg{cluster_name = QName}, - consumer_deliveries = CDels0} = State0) -> + consumer_deliveries = CDels0} = State0) + when is_map_key(Tag, CDels0) -> QRef = qref(Leader), {LastId, _} = lists:last(IdMsgs), Consumer = #consumer{ack = Ack} = maps:get(Tag, CDels0), @@ -787,7 +785,17 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs}, length(IdMsgs), C#consumer{last_msg_id = LastId}, CDels0)}) - end. + end; +handle_delivery(_Leader, {delivery, Tag, [_ | _] = IdMsgs}, + #state{consumer_deliveries = CDels0} = State0) + when not is_map_key(Tag, CDels0) -> + %% Note: + %% https://github.com/rabbitmq/rabbitmq-server/issues/3729 + %% If the consumer is no longer in the deliveries map, + %% we should return all messages. + MsgIntIds = [Id || {Id, _} <- IdMsgs], + {State1, Deliveries} = return(Tag, MsgIntIds, State0), + {ok, State1, Deliveries}. transform_msgs(QName, QRef, Msgs) -> lists:map( diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index cdbb59d12b..6b21c1e813 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -137,7 +137,8 @@ all_tests() -> delete_if_unused, queue_ttl, peek, - consumer_priorities + consumer_priorities, + cancel_consumer_gh_3729 ]. memory_tests() -> @@ -1981,7 +1982,7 @@ subscribe_redelivery_limit(Config) -> receive {#'basic.deliver'{redelivered = true}, #amqp_msg{}} -> throw(unexpected_redelivery) - after 2000 -> + after 5000 -> ok end. @@ -2027,7 +2028,7 @@ subscribe_redelivery_policy(Config) -> receive {#'basic.deliver'{redelivered = true}, #amqp_msg{}} -> throw(unexpected_redelivery) - after 2000 -> + after 5000 -> ok end, ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>). @@ -2766,6 +2767,54 @@ consumer_priorities(Config) -> ok. +cancel_consumer_gh_3729(Config) -> + %% Test the scenario where a message is published to a quorum queue + %% but the consumer has been cancelled + %% https://github.com/rabbitmq/rabbitmq-server/pull/3746 + QQ = ?config(queue_name, Config), + + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + ExpectedDeclareRslt0 = #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 0}, + DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + ?assertMatch(ExpectedDeclareRslt0, DeclareRslt0), + + ok = publish(Ch, QQ), + + ok = subscribe(Ch, QQ, false), + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = true}, + ok = amqp_channel:cast(Ch, R) + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + + ok = cancel(Ch), + + D = #'queue.declare'{queue = QQ, passive = true, arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 1} = amqp_channel:call(Ch, D), + + receive + #'basic.cancel_ok'{consumer_tag = <<"ctag">>} -> ok + after 5000 -> + flush(100), + ct:fail("basic.cancel_ok timeout") + end, + + F = fun() -> + #'queue.declare_ok'{queue = QQ, + message_count = MC, + consumer_count = CC} = amqp_channel:call(Ch, D), + MC =:= 1 andalso CC =:= 0 + end, + wait_until(F), + + ok = rabbit_ct_client_helpers:close_channel(Ch). + %%---------------------------------------------------------------------------- declare(Ch, Q) -> @@ -2824,6 +2873,10 @@ qos(Ch, Prefetch, Global) -> amqp_channel:call(Ch, #'basic.qos'{global = Global, prefetch_count = Prefetch})). +cancel(Ch) -> + ?assertMatch(#'basic.cancel_ok'{consumer_tag = <<"ctag">>}, + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>})). + receive_basic_deliver(Redelivered) -> receive {#'basic.deliver'{redelivered = R}, _} when R == Redelivered -> @@ -2913,7 +2966,7 @@ validate_queue(Ch, Queue, ExpectedMsgs) -> #amqp_msg{payload = M}} -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, multiple = false}) - after 2000 -> + after 5000 -> flush(10), exit({validate_queue_timeout, M}) end |