summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIliia Khaprov <i.khaprov@gmail.com>2023-04-10 00:06:44 +0200
committerIliia Khaprov <i.khaprov@gmail.com>2023-04-25 18:10:46 +0200
commit4e8f05e0ca143e6d940ea3c9112fae147a1e944b (patch)
tree58c2e18eefdf87729a78a52fc246e7234f912252
parentfd08728ff17cfa812f040acc567cf9192e90e74a (diff)
downloadrabbitmq-server-git-4e8f05e0ca143e6d940ea3c9112fae147a1e944b.tar.gz
Allow setting consumer timeout via queue policy/arg and as consumer arg. Close #5437
-rw-r--r--deps/rabbit/src/rabbit_channel.erl76
-rw-r--r--deps/rabbit/src/rabbit_networking.erl2
-rw-r--r--deps/rabbit/src/rabbit_policies.erl7
-rw-r--r--deps/rabbit/test/consumer_timeout_SUITE.erl101
4 files changed, 137 insertions, 49 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 869610f9ae..3f8116f5f5 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -2804,34 +2804,68 @@ get_operation_timeout_and_deadline() ->
Deadline = now_millis() + Timeout,
{Timeout, Deadline}.
-evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
- consumer_timeout = Timeout},
- unacked_message_q = UAMQ}) ->
- Now = os:system_time(millisecond),
+get_queue_consumer_timeout(_PA = #pending_ack{queue = QName},
+ _State = #ch{cfg = #conf{consumer_timeout = GCT}}) ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q} -> %% should we account for different queue states here?
+ case rabbit_queue_type_util:args_policy_lookup(<<"consumer-timeout">>,
+ fun (X, Y) -> erlang:min(X, Y) end, Q) of
+ undefined -> GCT;
+ Val -> Val
+ end;
+ _ ->
+ GCT
+ end.
+
+get_consumer_timeout(PA = #pending_ack{tag = CTag},
+ State = #ch{consumer_mapping = CMap,
+ cfg = #conf{consumer_timeout = GCT}}) ->
+ case maps:find(CTag, CMap) of
+ {ok, {_, {_, _, _, Args}}} ->
+ case rabbit_misc:table_lookup(Args, <<"x-consumer-timeout">>) of
+ {long, Timeout} -> Timeout;
+ _ -> get_queue_consumer_timeout(PA, State)
+ end;
+ _ ->
+ get_queue_consumer_timeout(PA, State)
+ end.
+
+evaluate_consumer_timeout(State = #ch{unacked_message_q = UAMQ}) ->
case ?QUEUE:get(UAMQ, empty) of
- #pending_ack{delivery_tag = ConsumerTag,
- delivered_at = Time}
- when is_integer(Timeout)
- andalso Time < Now - Timeout ->
- rabbit_log_channel:warning("Consumer ~ts on channel ~w has timed out "
- "waiting for delivery acknowledgement. Timeout used: ~tp ms. "
- "This timeout value can be configured, see consumers doc guide to learn more",
- [rabbit_data_coercion:to_binary(ConsumerTag),
- Channel, Timeout]),
- Ex = rabbit_misc:amqp_error(precondition_failed,
- "delivery acknowledgement on channel ~w timed out. "
- "Timeout value used: ~tp ms. "
- "This timeout value can be configured, see consumers doc guide to learn more",
- [Channel, Timeout], none),
- handle_exception(Ex, State0);
+ empty ->
+ {noreply, State};
+ PA -> evaluate_consumer_timeout1(PA, State)
+ end.
+
+evaluate_consumer_timeout1(PA = #pending_ack{delivered_at = Time},
+ State) ->
+ Now = os:system_time(millisecond),
+ case get_consumer_timeout(PA, State) of
+ Timeout when is_integer(Timeout)
+ andalso Time < Now - Timeout ->
+ handle_consumer_timed_out(Timeout, PA, State);
_ ->
- {noreply, State0}
+ {noreply, State}
end.
+handle_consumer_timed_out(Timeout,#pending_ack{delivery_tag = DeliveryTag},
+ State = #ch{cfg = #conf{channel = Channel}}) ->
+ rabbit_log_channel:warning("Consumer ~ts on channel ~w has timed out "
+ "waiting for delivery acknowledgement. Timeout used: ~tp ms. "
+ "This timeout value can be configured, see consumers doc guide to learn more",
+ [rabbit_data_coercion:to_binary(DeliveryTag),
+ Channel, Timeout]),
+ Ex = rabbit_misc:amqp_error(precondition_failed,
+ "delivery acknowledgement on channel ~w timed out. "
+ "Timeout value used: ~tp ms. "
+ "This timeout value can be configured, see consumers doc guide to learn more",
+ [Channel, Timeout], none),
+ handle_exception(Ex, State).
+
handle_queue_actions(Actions, #ch{} = State0) ->
WriterPid = State0#ch.cfg#conf.writer_pid,
lists:foldl(
- fun
+ fun
({settled, QRef, MsgSeqNos}, S0) ->
confirm(MsgSeqNos, QRef, S0);
({rejected, _QRef, MsgSeqNos}, S0) ->
diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl
index 1ef40cb7d3..35711faf75 100644
--- a/deps/rabbit/src/rabbit_networking.erl
+++ b/deps/rabbit/src/rabbit_networking.erl
@@ -523,7 +523,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
emit_connection_info_local(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
- connections_local()).
+ connections_local() ++ rabbit_networking:local_non_amqp_connections()).
-spec close_connection(pid(), string()) -> 'ok'.
diff --git a/deps/rabbit/src/rabbit_policies.erl b/deps/rabbit/src/rabbit_policies.erl
index 3d6ab3dffa..8422c86093 100644
--- a/deps/rabbit/src/rabbit_policies.erl
+++ b/deps/rabbit/src/rabbit_policies.erl
@@ -28,6 +28,7 @@ register() ->
%% such as rabbit_mirror_queue_misc
[rabbit_registry:register(Class, Name, ?MODULE) ||
{Class, Name} <- [{policy_validator, <<"alternate-exchange">>},
+ {policy_validator, <<"consumer-timeout">>},
{policy_validator, <<"dead-letter-exchange">>},
{policy_validator, <<"dead-letter-routing-key">>},
{policy_validator, <<"dead-letter-strategy">>},
@@ -74,6 +75,12 @@ validate_policy0(<<"alternate-exchange">>, Value)
validate_policy0(<<"alternate-exchange">>, Value) ->
{error, "~tp is not a valid alternate exchange name", [Value]};
+validate_policy0(<<"consumer-timeout">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"consumer-timeout">>, Value) ->
+ {error, "~tp is not a valid consumer timeout", [Value]};
+
validate_policy0(<<"dead-letter-exchange">>, Value)
when is_binary(Value) ->
ok;
diff --git a/deps/rabbit/test/consumer_timeout_SUITE.erl b/deps/rabbit/test/consumer_timeout_SUITE.erl
index 9a0721b1ff..57d678a556 100644
--- a/deps/rabbit/test/consumer_timeout_SUITE.erl
+++ b/deps/rabbit/test/consumer_timeout_SUITE.erl
@@ -13,27 +13,58 @@
-compile(export_all).
--define(TIMEOUT, 30000).
+-define(CONSUMER_TIMEOUT, 3000).
+-define(RECEIVE_TIMEOUT, 5000).
+
+-define(GROUP_CONFIG,
+ #{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]},
+ {queue_policy, []},
+ {queue_arguments, []},
+ {consumer_arguments, []}],
+ queue_policy_consumer_timeout => [{rabbit, []},
+ {queue_policy, [{<<"consumer-timeout">>, ?CONSUMER_TIMEOUT}]},
+ {queue_arguments, []},
+ {consumer_arguments, []}],
+ queue_argument_consumer_timeout => [{rabbit, []},
+ {queue_policy, []},
+ {queue_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]},
+ {consumer_arguments, []}],
+ consumer_argument_consumer_timeout => [{rabbit, []},
+ {queue_policy, []},
+ {queue_arguments, []},
+ {consumer_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}]}).
-import(quorum_queue_utils, [wait_for_messages/2]).
all() ->
[
- {group, parallel_tests}
+ {group, global_consumer_timeout},
+ {group, queue_policy_consumer_timeout},
+ {group, queue_argument_consumer_timeout},
+ {group, consumer_argument_consumer_timeout}
].
groups() ->
- AllTests = [consumer_timeout,
- consumer_timeout_basic_get,
- consumer_timeout_no_basic_cancel_capability
- ],
- [
- {parallel_tests, [],
- [
+ ConsumerTests = [consumer_timeout,
+ consumer_timeout_no_basic_cancel_capability],
+ AllTests = ConsumerTests ++ [consumer_timeout_basic_get],
+
+ ConsumerTestsParallel = [
+ {classic_queue, [parallel], ConsumerTests},
+ {mirrored_queue, [parallel], ConsumerTests},
+ {quorum_queue, [parallel], ConsumerTests}
+ ],
+
+ AllTestsParallel = [
{classic_queue, [parallel], AllTests},
{mirrored_queue, [parallel], AllTests},
{quorum_queue, [parallel], AllTests}
- ]}
+ ],
+ [
+ {global_consumer_timeout, [], AllTestsParallel},
+ {queue_policy_consumer_timeout, [], AllTestsParallel},
+ {queue_argument_consumer_timeout, [], AllTestsParallel},
+ {consumer_argument_consumer_timeout, [], ConsumerTestsParallel}
].
suite() ->
@@ -55,33 +86,36 @@ end_per_suite(Config) ->
init_per_group(classic_queue, Config) ->
rabbit_ct_helpers:set_config(
Config,
- [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ [{policy_type, <<"classic_queues">>},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]);
init_per_group(quorum_queue, Config) ->
rabbit_ct_helpers:set_config(
Config,
- [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ [{policy_type, <<"quorum_queues">>},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
{queue_durable, true}]);
init_per_group(mirrored_queue, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
Config1 = rabbit_ct_helpers:set_config(
- Config, [{is_mirrored, true},
+ Config, [{policy_type, <<"classic_queues">>},
+ {is_mirrored, true},
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]),
rabbit_ct_helpers:run_steps(Config1, []);
init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
+ GroupConfig = maps:get(Group, ?GROUP_CONFIG),
ClusterSize = 3,
Config = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{channel_tick_interval, 1000},
- {quorum_tick_interval, 1000},
- {consumer_timeout, 5000}]}),
+ {quorum_tick_interval, 1000}] ++ ?config(rabbit, GroupConfig)}),
Config1 = rabbit_ct_helpers:set_config(
Config, [ {rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
- ]),
+ ] ++ GroupConfig),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps());
@@ -92,6 +126,11 @@ init_per_group(Group, Config0) ->
end_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
+ case ?config(queue_policy, Config) of
+ [] -> ok;
+ _Policy ->
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"consumer_timeout_queue_test_policy">>)
+ end,
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps());
@@ -119,12 +158,12 @@ consumer_timeout(Config) ->
declare_queue(Ch, Config, QName),
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- subscribe(Ch, QName, false),
+ subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
erlang:monitor(process, Conn),
erlang:monitor(process, Ch),
receive
{'DOWN', _, process, Ch, _} -> ok
- after 30000 ->
+ after ?RECEIVE_TIMEOUT ->
flush(1),
exit(channel_exit_expected)
end,
@@ -149,7 +188,7 @@ consumer_timeout_basic_get(Config) ->
erlang:monitor(process, Ch),
receive
{'DOWN', _, process, Ch, _} -> ok
- after 30000 ->
+ after ?RECEIVE_TIMEOUT ->
flush(1),
exit(channel_exit_expected)
end,
@@ -187,18 +226,18 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
erlang:monitor(process, Conn),
erlang:monitor(process, Ch),
- subscribe(Ch, QName, false),
+ subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
receive
{#'basic.deliver'{delivery_tag = _,
redelivered = false}, _} ->
%% do nothing with the delivery should trigger timeout
ok
- after 5000 ->
+ after ?RECEIVE_TIMEOUT ->
exit(deliver_timeout)
end,
receive
{'DOWN', _, process, Ch, _} -> ok
- after 30000 ->
+ after ?RECEIVE_TIMEOUT ->
flush(1),
exit(channel_exit_expected)
end,
@@ -217,8 +256,14 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
declare_queue(Ch, Config, QName) ->
Args = ?config(queue_args, Config),
Durable = ?config(queue_durable, Config),
+ case ?config(queue_policy, Config) of
+ [] -> ok;
+ Policy ->
+ rabbit_ct_broker_helpers:set_policy(Config, 0, <<"consumer_timeout_queue_test_policy">>,
+ <<".*">>, ?config(policy_type, Config), Policy)
+ end,
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
- arguments = Args,
+ arguments = Args ++ ?config(queue_arguments, Config),
durable = Durable}).
publish(Ch, QName, Payloads) ->
[amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
@@ -235,13 +280,15 @@ consume(Ch, QName, NoAck, Payloads) ->
DTag
end || Payload <- Payloads].
-subscribe(Ch, Queue, NoAck) ->
- subscribe(Ch, Queue, NoAck, <<"ctag">>).
+subscribe(Ch, Queue, NoAck, Args) ->
+ subscribe(Ch, Queue, NoAck, <<"ctag">>, Args).
-subscribe(Ch, Queue, NoAck, Ctag) ->
+subscribe(Ch, Queue, NoAck, Ctag, Args) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
no_ack = NoAck,
- consumer_tag = Ctag},
+ consumer_tag = Ctag,
+ arguments = Args
+ },
self()),
receive
#'basic.consume_ok'{consumer_tag = Ctag} ->