diff options
author | Iliia Khaprov <i.khaprov@gmail.com> | 2023-04-10 00:06:44 +0200 |
---|---|---|
committer | Iliia Khaprov <i.khaprov@gmail.com> | 2023-04-25 18:10:46 +0200 |
commit | 4e8f05e0ca143e6d940ea3c9112fae147a1e944b (patch) | |
tree | 58c2e18eefdf87729a78a52fc246e7234f912252 | |
parent | fd08728ff17cfa812f040acc567cf9192e90e74a (diff) | |
download | rabbitmq-server-git-4e8f05e0ca143e6d940ea3c9112fae147a1e944b.tar.gz |
Allow setting consumer timeout via queue policy/arg and as consumer arg. Close #5437
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 76 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_networking.erl | 2 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_policies.erl | 7 | ||||
-rw-r--r-- | deps/rabbit/test/consumer_timeout_SUITE.erl | 101 |
4 files changed, 137 insertions, 49 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 869610f9ae..3f8116f5f5 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2804,34 +2804,68 @@ get_operation_timeout_and_deadline() -> Deadline = now_millis() + Timeout, {Timeout, Deadline}. -evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel, - consumer_timeout = Timeout}, - unacked_message_q = UAMQ}) -> - Now = os:system_time(millisecond), +get_queue_consumer_timeout(_PA = #pending_ack{queue = QName}, + _State = #ch{cfg = #conf{consumer_timeout = GCT}}) -> + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> %% should we account for different queue states here? + case rabbit_queue_type_util:args_policy_lookup(<<"consumer-timeout">>, + fun (X, Y) -> erlang:min(X, Y) end, Q) of + undefined -> GCT; + Val -> Val + end; + _ -> + GCT + end. + +get_consumer_timeout(PA = #pending_ack{tag = CTag}, + State = #ch{consumer_mapping = CMap, + cfg = #conf{consumer_timeout = GCT}}) -> + case maps:find(CTag, CMap) of + {ok, {_, {_, _, _, Args}}} -> + case rabbit_misc:table_lookup(Args, <<"x-consumer-timeout">>) of + {long, Timeout} -> Timeout; + _ -> get_queue_consumer_timeout(PA, State) + end; + _ -> + get_queue_consumer_timeout(PA, State) + end. + +evaluate_consumer_timeout(State = #ch{unacked_message_q = UAMQ}) -> case ?QUEUE:get(UAMQ, empty) of - #pending_ack{delivery_tag = ConsumerTag, - delivered_at = Time} - when is_integer(Timeout) - andalso Time < Now - Timeout -> - rabbit_log_channel:warning("Consumer ~ts on channel ~w has timed out " - "waiting for delivery acknowledgement. Timeout used: ~tp ms. " - "This timeout value can be configured, see consumers doc guide to learn more", - [rabbit_data_coercion:to_binary(ConsumerTag), - Channel, Timeout]), - Ex = rabbit_misc:amqp_error(precondition_failed, - "delivery acknowledgement on channel ~w timed out. " - "Timeout value used: ~tp ms. " - "This timeout value can be configured, see consumers doc guide to learn more", - [Channel, Timeout], none), - handle_exception(Ex, State0); + empty -> + {noreply, State}; + PA -> evaluate_consumer_timeout1(PA, State) + end. + +evaluate_consumer_timeout1(PA = #pending_ack{delivered_at = Time}, + State) -> + Now = os:system_time(millisecond), + case get_consumer_timeout(PA, State) of + Timeout when is_integer(Timeout) + andalso Time < Now - Timeout -> + handle_consumer_timed_out(Timeout, PA, State); _ -> - {noreply, State0} + {noreply, State} end. +handle_consumer_timed_out(Timeout,#pending_ack{delivery_tag = DeliveryTag}, + State = #ch{cfg = #conf{channel = Channel}}) -> + rabbit_log_channel:warning("Consumer ~ts on channel ~w has timed out " + "waiting for delivery acknowledgement. Timeout used: ~tp ms. " + "This timeout value can be configured, see consumers doc guide to learn more", + [rabbit_data_coercion:to_binary(DeliveryTag), + Channel, Timeout]), + Ex = rabbit_misc:amqp_error(precondition_failed, + "delivery acknowledgement on channel ~w timed out. " + "Timeout value used: ~tp ms. " + "This timeout value can be configured, see consumers doc guide to learn more", + [Channel, Timeout], none), + handle_exception(Ex, State). + handle_queue_actions(Actions, #ch{} = State0) -> WriterPid = State0#ch.cfg#conf.writer_pid, lists:foldl( - fun + fun ({settled, QRef, MsgSeqNos}, S0) -> confirm(MsgSeqNos, QRef, S0); ({rejected, _QRef, MsgSeqNos}, S0) -> diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index 1ef40cb7d3..35711faf75 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -523,7 +523,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> emit_connection_info_local(Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map_with_exit_handler( AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end, - connections_local()). + connections_local() ++ rabbit_networking:local_non_amqp_connections()). -spec close_connection(pid(), string()) -> 'ok'. diff --git a/deps/rabbit/src/rabbit_policies.erl b/deps/rabbit/src/rabbit_policies.erl index 3d6ab3dffa..8422c86093 100644 --- a/deps/rabbit/src/rabbit_policies.erl +++ b/deps/rabbit/src/rabbit_policies.erl @@ -28,6 +28,7 @@ register() -> %% such as rabbit_mirror_queue_misc [rabbit_registry:register(Class, Name, ?MODULE) || {Class, Name} <- [{policy_validator, <<"alternate-exchange">>}, + {policy_validator, <<"consumer-timeout">>}, {policy_validator, <<"dead-letter-exchange">>}, {policy_validator, <<"dead-letter-routing-key">>}, {policy_validator, <<"dead-letter-strategy">>}, @@ -74,6 +75,12 @@ validate_policy0(<<"alternate-exchange">>, Value) validate_policy0(<<"alternate-exchange">>, Value) -> {error, "~tp is not a valid alternate exchange name", [Value]}; +validate_policy0(<<"consumer-timeout">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"consumer-timeout">>, Value) -> + {error, "~tp is not a valid consumer timeout", [Value]}; + validate_policy0(<<"dead-letter-exchange">>, Value) when is_binary(Value) -> ok; diff --git a/deps/rabbit/test/consumer_timeout_SUITE.erl b/deps/rabbit/test/consumer_timeout_SUITE.erl index 9a0721b1ff..57d678a556 100644 --- a/deps/rabbit/test/consumer_timeout_SUITE.erl +++ b/deps/rabbit/test/consumer_timeout_SUITE.erl @@ -13,27 +13,58 @@ -compile(export_all). --define(TIMEOUT, 30000). +-define(CONSUMER_TIMEOUT, 3000). +-define(RECEIVE_TIMEOUT, 5000). + +-define(GROUP_CONFIG, + #{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]}, + {queue_policy, []}, + {queue_arguments, []}, + {consumer_arguments, []}], + queue_policy_consumer_timeout => [{rabbit, []}, + {queue_policy, [{<<"consumer-timeout">>, ?CONSUMER_TIMEOUT}]}, + {queue_arguments, []}, + {consumer_arguments, []}], + queue_argument_consumer_timeout => [{rabbit, []}, + {queue_policy, []}, + {queue_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}, + {consumer_arguments, []}], + consumer_argument_consumer_timeout => [{rabbit, []}, + {queue_policy, []}, + {queue_arguments, []}, + {consumer_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}]}). -import(quorum_queue_utils, [wait_for_messages/2]). all() -> [ - {group, parallel_tests} + {group, global_consumer_timeout}, + {group, queue_policy_consumer_timeout}, + {group, queue_argument_consumer_timeout}, + {group, consumer_argument_consumer_timeout} ]. groups() -> - AllTests = [consumer_timeout, - consumer_timeout_basic_get, - consumer_timeout_no_basic_cancel_capability - ], - [ - {parallel_tests, [], - [ + ConsumerTests = [consumer_timeout, + consumer_timeout_no_basic_cancel_capability], + AllTests = ConsumerTests ++ [consumer_timeout_basic_get], + + ConsumerTestsParallel = [ + {classic_queue, [parallel], ConsumerTests}, + {mirrored_queue, [parallel], ConsumerTests}, + {quorum_queue, [parallel], ConsumerTests} + ], + + AllTestsParallel = [ {classic_queue, [parallel], AllTests}, {mirrored_queue, [parallel], AllTests}, {quorum_queue, [parallel], AllTests} - ]} + ], + [ + {global_consumer_timeout, [], AllTestsParallel}, + {queue_policy_consumer_timeout, [], AllTestsParallel}, + {queue_argument_consumer_timeout, [], AllTestsParallel}, + {consumer_argument_consumer_timeout, [], ConsumerTestsParallel} ]. suite() -> @@ -55,33 +86,36 @@ end_per_suite(Config) -> init_per_group(classic_queue, Config) -> rabbit_ct_helpers:set_config( Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + [{policy_type, <<"classic_queues">>}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]); init_per_group(quorum_queue, Config) -> rabbit_ct_helpers:set_config( Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + [{policy_type, <<"quorum_queues">>}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, {queue_durable, true}]); init_per_group(mirrored_queue, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), Config1 = rabbit_ct_helpers:set_config( - Config, [{is_mirrored, true}, + Config, [{policy_type, <<"classic_queues">>}, + {is_mirrored, true}, {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]), rabbit_ct_helpers:run_steps(Config1, []); init_per_group(Group, Config0) -> case lists:member({group, Group}, all()) of true -> + GroupConfig = maps:get(Group, ?GROUP_CONFIG), ClusterSize = 3, Config = rabbit_ct_helpers:merge_app_env( Config0, {rabbit, [{channel_tick_interval, 1000}, - {quorum_tick_interval, 1000}, - {consumer_timeout, 5000}]}), + {quorum_tick_interval, 1000}] ++ ?config(rabbit, GroupConfig)}), Config1 = rabbit_ct_helpers:set_config( Config, [ {rmq_nodename_suffix, Group}, {rmq_nodes_count, ClusterSize} - ]), + ] ++ GroupConfig), rabbit_ct_helpers:run_steps(Config1, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()); @@ -92,6 +126,11 @@ init_per_group(Group, Config0) -> end_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> + case ?config(queue_policy, Config) of + [] -> ok; + _Policy -> + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"consumer_timeout_queue_test_policy">>) + end, rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()); @@ -119,12 +158,12 @@ consumer_timeout(Config) -> declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - subscribe(Ch, QName, false), + subscribe(Ch, QName, false, ?config(consumer_arguments, Config)), erlang:monitor(process, Conn), erlang:monitor(process, Ch), receive {'DOWN', _, process, Ch, _} -> ok - after 30000 -> + after ?RECEIVE_TIMEOUT -> flush(1), exit(channel_exit_expected) end, @@ -149,7 +188,7 @@ consumer_timeout_basic_get(Config) -> erlang:monitor(process, Ch), receive {'DOWN', _, process, Ch, _} -> ok - after 30000 -> + after ?RECEIVE_TIMEOUT -> flush(1), exit(channel_exit_expected) end, @@ -187,18 +226,18 @@ consumer_timeout_no_basic_cancel_capability(Config) -> wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), erlang:monitor(process, Conn), erlang:monitor(process, Ch), - subscribe(Ch, QName, false), + subscribe(Ch, QName, false, ?config(consumer_arguments, Config)), receive {#'basic.deliver'{delivery_tag = _, redelivered = false}, _} -> %% do nothing with the delivery should trigger timeout ok - after 5000 -> + after ?RECEIVE_TIMEOUT -> exit(deliver_timeout) end, receive {'DOWN', _, process, Ch, _} -> ok - after 30000 -> + after ?RECEIVE_TIMEOUT -> flush(1), exit(channel_exit_expected) end, @@ -217,8 +256,14 @@ consumer_timeout_no_basic_cancel_capability(Config) -> declare_queue(Ch, Config, QName) -> Args = ?config(queue_args, Config), Durable = ?config(queue_durable, Config), + case ?config(queue_policy, Config) of + [] -> ok; + Policy -> + rabbit_ct_broker_helpers:set_policy(Config, 0, <<"consumer_timeout_queue_test_policy">>, + <<".*">>, ?config(policy_type, Config), Policy) + end, #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, - arguments = Args, + arguments = Args ++ ?config(queue_arguments, Config), durable = Durable}). publish(Ch, QName, Payloads) -> [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) @@ -235,13 +280,15 @@ consume(Ch, QName, NoAck, Payloads) -> DTag end || Payload <- Payloads]. -subscribe(Ch, Queue, NoAck) -> - subscribe(Ch, Queue, NoAck, <<"ctag">>). +subscribe(Ch, Queue, NoAck, Args) -> + subscribe(Ch, Queue, NoAck, <<"ctag">>, Args). -subscribe(Ch, Queue, NoAck, Ctag) -> +subscribe(Ch, Queue, NoAck, Ctag, Args) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, no_ack = NoAck, - consumer_tag = Ctag}, + consumer_tag = Ctag, + arguments = Args + }, self()), receive #'basic.consume_ok'{consumer_tag = Ctag} -> |