summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-11-03 16:26:40 +0000
committermergify-bot <noreply@mergify.io>2021-11-08 13:05:51 +0000
commit0efba041228e230430196f55ff01375ee7dae4c1 (patch)
treee2207ffc1f41d3b8bf09fb88712ef475d64e050c
parent1f4eadb829151cedd9e9e87e8755e9f6de562aad (diff)
downloadrabbitmq-server-git-0efba041228e230430196f55ff01375ee7dae4c1.tar.gz
QQ: set better timeouts for commands
Refactor how the single active consumer check is performed when consuming. Improve timeouts in rabbit_fifo_client. (cherry picked from commit 1c6e45257d824ef72b11fad171bb11a88bb9a4b5)
-rw-r--r--deps/rabbit/src/rabbit_channel.erl6
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl33
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl66
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl10
4 files changed, 73 insertions, 42 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index e50fba8596..2d5437421d 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1475,6 +1475,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
rabbit_misc:protocol_error(
not_implemented, "~s does not support global qos",
[rabbit_misc:rs(QueueName)]);
+ {error, timeout} ->
+ rabbit_misc:protocol_error(
+ internal_error, "~s timeout occurred during consume operation",
+ [rabbit_misc:rs(QueueName)]);
{error, no_local_stream_replica_available} ->
rabbit_misc:protocol_error(
resource_error, "~s does not not have a running local replica",
@@ -1803,6 +1807,8 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
E;
{{error, no_local_stream_replica_available} = E, _Q} ->
E;
+ {{error, timeout} = E, _Q} ->
+ E;
{{protocol_error, Type, Reason, ReasonArgs}, _Q} ->
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
end.
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index 1544e2be06..f9beb7928b 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -31,13 +31,15 @@
update_machine_state/2,
pending_size/1,
stat/1,
- stat/2
+ stat/2,
+ query_single_active_consumer/1
]).
-include_lib("rabbit_common/include/rabbit.hrl").
-define(SOFT_LIMIT, 32).
-define(TIMER_TIME, 10000).
+-define(COMMAND_TIMEOUT, 30000).
-type seq() :: non_neg_integer().
%% last_applied is initialised to -1
@@ -149,8 +151,6 @@ enqueue(Correlation, Msg,
case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of
0 ->
%% the leader is running the old version
- %% so we can't initialize the enqueuer session safely
- %% fall back on old behavour
enqueue(Correlation, Msg, State0#state{queue_status = go});
1 ->
%% were running the new version on the leader do sync initialisation
@@ -395,6 +395,20 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
ack = Ack}, CDels0),
try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}).
+
+-spec query_single_active_consumer(state()) ->
+ {ok, term()} | {error, term()} | {timeout, term()}.
+query_single_active_consumer(#state{leader = undefined}) ->
+ {error, leader_not_known};
+query_single_active_consumer(#state{leader = Leader}) ->
+ case ra:local_query(Leader, fun rabbit_fifo:query_single_active_consumer/1,
+ ?COMMAND_TIMEOUT) of
+ {ok, {_, Reply}, _} ->
+ {ok, Reply};
+ Err ->
+ Err
+ end.
+
%% @doc Provide credit to the queue
%%
%% This only has an effect if the consumer uses credit mode: credited
@@ -444,8 +458,8 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
%% of messages purged.
-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
-purge(Node) ->
- case ra:process_command(Node, rabbit_fifo:make_purge()) of
+purge(Server) ->
+ case ra:process_command(Server, rabbit_fifo:make_purge(), ?COMMAND_TIMEOUT) of
{ok, {purge, Reply}, _} ->
{ok, Reply};
Err ->
@@ -482,7 +496,7 @@ cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
ClusterName.
update_machine_state(Server, Conf) ->
- case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of
+ case ra:process_command(Server, rabbit_fifo:make_update_config(Conf), ?COMMAND_TIMEOUT) of
{ok, ok, _} ->
ok;
Err ->
@@ -640,8 +654,9 @@ untracked_enqueue([Node | _], Msg) ->
%% Internal
-try_process_command([Server | Rem], Cmd, State) ->
- case ra:process_command(Server, Cmd, 30000) of
+try_process_command([Server | Rem], Cmd,
+ #state{cfg = #cfg{timeout = Timeout}} = State) ->
+ case ra:process_command(Server, Cmd, Timeout) of
{ok, _, Leader} ->
{ok, State#state{leader = Leader}};
Err when length(Rem) =:= 0 ->
@@ -801,7 +816,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
Query = fun (State) ->
rabbit_fifo:get_checked_out(ConsumerId, From, To, State)
end,
- case ra:local_query(Leader, Query) of
+ case ra:local_query(Leader, Query, ?COMMAND_TIMEOUT) of
{ok, {_, Missing}, _} ->
Missing;
{error, Error} ->
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index d63757a59f..9c46adb77b 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -710,7 +710,7 @@ dequeue(NoAck, _LimiterPid, CTag0, QState0) ->
rabbit_queue_type:consume_spec(),
rabbit_fifo_client:state()) ->
{ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} |
- {error, global_qos_not_supported_for_queue_type}.
+ {error, global_qos_not_supported_for_queue_type | timeout}.
consume(Q, #{limiter_active := true}, _State)
when ?amqqueue_is_quorum(Q) ->
{error, global_qos_not_supported_for_queue_type};
@@ -726,7 +726,6 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
%% TODO: validate consumer arguments
%% currently quorum queues do not support any arguments
QName = amqqueue:get_name(Q),
- QPid = amqqueue:get_pid(Q),
maybe_send_reply(ChPid, OkMsg),
ConsumerTag = quorum_ctag(ConsumerTag0),
%% A prefetch count of 0 means no limitation,
@@ -758,36 +757,43 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
QState1);
_ -> QState1
end,
- case ra:local_query(QPid,
- fun rabbit_fifo:query_single_active_consumer/1) of
- {ok, {_, SacResult}, _} ->
- SingleActiveConsumerOn = single_active_consumer_on(Q),
- {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
- {false, _} ->
- {true, up};
- {true, {value, {ConsumerTag, ChPid}}} ->
- {true, single_active};
- _ ->
- {false, waiting}
- end,
+ case single_active_consumer_on(Q) of
+ true ->
+ %% get the leader from state
+ case rabbit_fifo_client:query_single_active_consumer(QState) of
+ {ok, SacResult} ->
+ ActivityStatus = case SacResult of
+ {value, {ConsumerTag, ChPid}} ->
+ single_active;
+ _ ->
+ waiting
+ end,
+ rabbit_core_metrics:consumer_created(
+ ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName,
+ ConsumerPrefetchCount, true, %% IsSingleSctiveconsumer
+ ActivityStatus, Args),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName, Prefetch,
+ Args, none, ActingUser),
+ {ok, QState, []};
+ {error, Error} ->
+ Error;
+ {timeout, _} ->
+ {error, timeout}
+ end;
+ false ->
rabbit_core_metrics:consumer_created(
- ChPid, ConsumerTag, ExclusiveConsume,
- AckRequired, QName,
- ConsumerPrefetchCount, IsSingleActiveConsumer,
- ActivityStatus, Args),
+ ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName,
+ ConsumerPrefetchCount, false, %% issingleactiveconsumer
+ up, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- AckRequired, QName, Prefetch,
- Args, none, ActingUser),
- {ok, QState, []};
- {error, Error} ->
- Error;
- {timeout, _} ->
- {error, timeout}
+ AckRequired, QName, Prefetch,
+ Args, none, ActingUser),
+ {ok, QState, []}
end.
-% -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
-% {'ok', rabbit_fifo_client:state()}.
-
cancel(_Q, ConsumerTag, OkMsg, _ActingUser, State) ->
maybe_send_reply(self(), OkMsg),
rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), State).
@@ -904,8 +910,8 @@ stat(Q, Timeout) when ?is_amqqueue(Q) ->
-spec purge(amqqueue:amqqueue()) ->
{ok, non_neg_integer()}.
purge(Q) when ?is_amqqueue(Q) ->
- Node = amqqueue:get_pid(Q),
- rabbit_fifo_client:purge(Node).
+ Server = amqqueue:get_pid(Q),
+ rabbit_fifo_client:purge(Server).
requeue(ConsumerTag, MsgIds, QState) ->
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState).
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 8911f1d94e..09dd70e729 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -1441,7 +1441,7 @@ confirm_availability_on_leader_change(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
ConfirmLoop = fun Loop() ->
- ok = publish_confirm(Ch, QQ, 5000),
+ ok = publish_confirm(Ch, QQ, 15000),
receive
{done, P} ->
P ! publisher_done,
@@ -1461,9 +1461,13 @@ confirm_availability_on_leader_change(Config) ->
timer:sleep(500),
Publisher ! {done, self()},
receive
- publisher_done -> ok;
- {'EXIT', Publisher, Err} -> exit(Err)
+ publisher_done ->
+ ok;
+ {'EXIT', Publisher, Err} ->
+ ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
+ exit(Err)
after 30000 ->
+ ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
flush(100),
exit(nothing_received_from_publisher_process)
end,