diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-03 16:26:40 +0000 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-08 11:07:41 +0000 |
commit | 1c6e45257d824ef72b11fad171bb11a88bb9a4b5 (patch) | |
tree | df124d30bb9405bbb541fa6c4d8cba265f8bde8f | |
parent | 2fb1291c735028f359f88108a47232cf124179c0 (diff) | |
download | rabbitmq-server-git-1c6e45257d824ef72b11fad171bb11a88bb9a4b5.tar.gz |
QQ: set better timeouts for commands
Refactor how the single active consumer check is performed when consuming.
Improve timeouts in rabbit_fifo_client.
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 6 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_client.erl | 33 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 66 | ||||
-rw-r--r-- | deps/rabbit/test/quorum_queue_SUITE.erl | 10 |
4 files changed, 73 insertions, 42 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index e50fba8596..2d5437421d 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1475,6 +1475,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, rabbit_misc:protocol_error( not_implemented, "~s does not support global qos", [rabbit_misc:rs(QueueName)]); + {error, timeout} -> + rabbit_misc:protocol_error( + internal_error, "~s timeout occurred during consume operation", + [rabbit_misc:rs(QueueName)]); {error, no_local_stream_replica_available} -> rabbit_misc:protocol_error( resource_error, "~s does not not have a running local replica", @@ -1803,6 +1807,8 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, E; {{error, no_local_stream_replica_available} = E, _Q} -> E; + {{error, timeout} = E, _Q} -> + E; {{protocol_error, Type, Reason, ReasonArgs}, _Q} -> rabbit_misc:protocol_error(Type, Reason, ReasonArgs) end. diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 1544e2be06..f9beb7928b 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -31,13 +31,15 @@ update_machine_state/2, pending_size/1, stat/1, - stat/2 + stat/2, + query_single_active_consumer/1 ]). -include_lib("rabbit_common/include/rabbit.hrl"). -define(SOFT_LIMIT, 32). -define(TIMER_TIME, 10000). +-define(COMMAND_TIMEOUT, 30000). -type seq() :: non_neg_integer(). %% last_applied is initialised to -1 @@ -149,8 +151,6 @@ enqueue(Correlation, Msg, case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of 0 -> %% the leader is running the old version - %% so we can't initialize the enqueuer session safely - %% fall back on old behavour enqueue(Correlation, Msg, State0#state{queue_status = go}); 1 -> %% were running the new version on the leader do sync initialisation @@ -395,6 +395,20 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, ack = Ack}, CDels0), try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}). + +-spec query_single_active_consumer(state()) -> + {ok, term()} | {error, term()} | {timeout, term()}. +query_single_active_consumer(#state{leader = undefined}) -> + {error, leader_not_known}; +query_single_active_consumer(#state{leader = Leader}) -> + case ra:local_query(Leader, fun rabbit_fifo:query_single_active_consumer/1, + ?COMMAND_TIMEOUT) of + {ok, {_, Reply}, _} -> + {ok, Reply}; + Err -> + Err + end. + %% @doc Provide credit to the queue %% %% This only has an effect if the consumer uses credit mode: credited @@ -444,8 +458,8 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) -> %% @doc Purges all the messages from a rabbit_fifo queue and returns the number %% of messages purged. -spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}. -purge(Node) -> - case ra:process_command(Node, rabbit_fifo:make_purge()) of +purge(Server) -> + case ra:process_command(Server, rabbit_fifo:make_purge(), ?COMMAND_TIMEOUT) of {ok, {purge, Reply}, _} -> {ok, Reply}; Err -> @@ -482,7 +496,7 @@ cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) -> ClusterName. update_machine_state(Server, Conf) -> - case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of + case ra:process_command(Server, rabbit_fifo:make_update_config(Conf), ?COMMAND_TIMEOUT) of {ok, ok, _} -> ok; Err -> @@ -640,8 +654,9 @@ untracked_enqueue([Node | _], Msg) -> %% Internal -try_process_command([Server | Rem], Cmd, State) -> - case ra:process_command(Server, Cmd, 30000) of +try_process_command([Server | Rem], Cmd, + #state{cfg = #cfg{timeout = Timeout}} = State) -> + case ra:process_command(Server, Cmd, Timeout) of {ok, _, Leader} -> {ok, State#state{leader = Leader}}; Err when length(Rem) =:= 0 -> @@ -801,7 +816,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) -> Query = fun (State) -> rabbit_fifo:get_checked_out(ConsumerId, From, To, State) end, - case ra:local_query(Leader, Query) of + case ra:local_query(Leader, Query, ?COMMAND_TIMEOUT) of {ok, {_, Missing}, _} -> Missing; {error, Error} -> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index d63757a59f..9c46adb77b 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -710,7 +710,7 @@ dequeue(NoAck, _LimiterPid, CTag0, QState0) -> rabbit_queue_type:consume_spec(), rabbit_fifo_client:state()) -> {ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} | - {error, global_qos_not_supported_for_queue_type}. + {error, global_qos_not_supported_for_queue_type | timeout}. consume(Q, #{limiter_active := true}, _State) when ?amqqueue_is_quorum(Q) -> {error, global_qos_not_supported_for_queue_type}; @@ -726,7 +726,6 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> %% TODO: validate consumer arguments %% currently quorum queues do not support any arguments QName = amqqueue:get_name(Q), - QPid = amqqueue:get_pid(Q), maybe_send_reply(ChPid, OkMsg), ConsumerTag = quorum_ctag(ConsumerTag0), %% A prefetch count of 0 means no limitation, @@ -758,36 +757,43 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> QState1); _ -> QState1 end, - case ra:local_query(QPid, - fun rabbit_fifo:query_single_active_consumer/1) of - {ok, {_, SacResult}, _} -> - SingleActiveConsumerOn = single_active_consumer_on(Q), - {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of - {false, _} -> - {true, up}; - {true, {value, {ConsumerTag, ChPid}}} -> - {true, single_active}; - _ -> - {false, waiting} - end, + case single_active_consumer_on(Q) of + true -> + %% get the leader from state + case rabbit_fifo_client:query_single_active_consumer(QState) of + {ok, SacResult} -> + ActivityStatus = case SacResult of + {value, {ConsumerTag, ChPid}} -> + single_active; + _ -> + waiting + end, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, + ConsumerPrefetchCount, true, %% IsSingleSctiveconsumer + ActivityStatus, Args), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, Prefetch, + Args, none, ActingUser), + {ok, QState, []}; + {error, Error} -> + Error; + {timeout, _} -> + {error, timeout} + end; + false -> rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, - ConsumerPrefetchCount, IsSingleActiveConsumer, - ActivityStatus, Args), + ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, + ConsumerPrefetchCount, false, %% issingleactiveconsumer + up, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, Prefetch, - Args, none, ActingUser), - {ok, QState, []}; - {error, Error} -> - Error; - {timeout, _} -> - {error, timeout} + AckRequired, QName, Prefetch, + Args, none, ActingUser), + {ok, QState, []} end. -% -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> -% {'ok', rabbit_fifo_client:state()}. - cancel(_Q, ConsumerTag, OkMsg, _ActingUser, State) -> maybe_send_reply(self(), OkMsg), rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), State). @@ -904,8 +910,8 @@ stat(Q, Timeout) when ?is_amqqueue(Q) -> -spec purge(amqqueue:amqqueue()) -> {ok, non_neg_integer()}. purge(Q) when ?is_amqqueue(Q) -> - Node = amqqueue:get_pid(Q), - rabbit_fifo_client:purge(Node). + Server = amqqueue:get_pid(Q), + rabbit_fifo_client:purge(Server). requeue(ConsumerTag, MsgIds, QState) -> rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState). diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 1de9d1f5db..cdbb59d12b 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1450,7 +1450,7 @@ confirm_availability_on_leader_change(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), ConfirmLoop = fun Loop() -> - ok = publish_confirm(Ch, QQ, 5000), + ok = publish_confirm(Ch, QQ, 15000), receive {done, P} -> P ! publisher_done, @@ -1470,9 +1470,13 @@ confirm_availability_on_leader_change(Config) -> timer:sleep(500), Publisher ! {done, self()}, receive - publisher_done -> ok; - {'EXIT', Publisher, Err} -> exit(Err) + publisher_done -> + ok; + {'EXIT', Publisher, Err} -> + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + exit(Err) after 30000 -> + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), flush(100), exit(nothing_received_from_publisher_process) end, |