summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-11-08 16:05:11 +0300
committerGitHub <noreply@github.com>2021-11-08 16:05:11 +0300
commit339322d339300d26179a2ca4a2ca6e9a7669aff2 (patch)
treea60deaec3a4dd57c6c5e67be8dd03e548a1839b1
parentd3d1caff1ecb9d6adfbe9e275b82ff79a16c7e92 (diff)
parent3d7edcea921a9fc15349475a61f0c47b6cc668ae (diff)
downloadrabbitmq-server-git-339322d339300d26179a2ca4a2ca6e9a7669aff2.tar.gz
Merge pull request #3628 from rabbitmq/qq-register-enqueuer-reliability
QQ: stability and channel side improvements
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl2
-rw-r--r--deps/rabbit/src/rabbit_channel.erl6
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl59
-rw-r--r--deps/rabbit/src/rabbit_queue_type.erl4
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl76
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl10
6 files changed, 102 insertions, 55 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index 96db3bac07..c7c774dee0 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -677,7 +677,7 @@ priv_absent(QueueName, QPid, true, nodedown) ->
rabbit_misc:protocol_error(
not_found,
"home node '~s' of durable ~s is down or inaccessible",
- [node(QPid), rabbit_misc:rs(QueueName)]);
+ [amqqueue:qnode(QPid), rabbit_misc:rs(QueueName)]);
priv_absent(QueueName, _QPid, _IsDurable, stopped) ->
rabbit_misc:protocol_error(
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index e50fba8596..2d5437421d 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -1475,6 +1475,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
rabbit_misc:protocol_error(
not_implemented, "~s does not support global qos",
[rabbit_misc:rs(QueueName)]);
+ {error, timeout} ->
+ rabbit_misc:protocol_error(
+ internal_error, "~s timeout occurred during consume operation",
+ [rabbit_misc:rs(QueueName)]);
{error, no_local_stream_replica_available} ->
rabbit_misc:protocol_error(
resource_error, "~s does not not have a running local replica",
@@ -1803,6 +1807,8 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
E;
{{error, no_local_stream_replica_available} = E, _Q} ->
E;
+ {{error, timeout} = E, _Q} ->
+ E;
{{protocol_error, Type, Reason, ReasonArgs}, _Q} ->
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
end.
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index 7ac1d7ba6e..f9beb7928b 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -31,13 +31,15 @@
update_machine_state/2,
pending_size/1,
stat/1,
- stat/2
+ stat/2,
+ query_single_active_consumer/1
]).
-include_lib("rabbit_common/include/rabbit.hrl").
-define(SOFT_LIMIT, 32).
-define(TIMER_TIME, 10000).
+-define(COMMAND_TIMEOUT, 30000).
-type seq() :: non_neg_integer().
%% last_applied is initialised to -1
@@ -142,27 +144,33 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
enqueue(Correlation, Msg,
#state{queue_status = undefined,
next_enqueue_seq = 1,
- cfg = #cfg{timeout = Timeout}} = State0) ->
+ cfg = #cfg{servers = Servers,
+ timeout = Timeout}} = State0) ->
%% it is the first enqueue, check the version
- {_, Node} = Server = pick_server(State0),
+ {_, Node} = pick_server(State0),
case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of
0 ->
%% the leader is running the old version
- %% so we can't initialize the enqueuer session safely
- %% fall back on old behavour
enqueue(Correlation, Msg, State0#state{queue_status = go});
1 ->
%% were running the new version on the leader do sync initialisation
%% of enqueuer session
Reg = rabbit_fifo:make_register_enqueuer(self()),
- case ra:process_command(Server, Reg, Timeout) of
- {ok, reject_publish, _} ->
- {reject_publish, State0#state{queue_status = reject_publish}};
- {ok, ok, _} ->
- enqueue(Correlation, Msg, State0#state{queue_status = go});
+ case ra:process_command(Servers, Reg, Timeout) of
+ {ok, reject_publish, Leader} ->
+ {reject_publish, State0#state{leader = Leader,
+ queue_status = reject_publish}};
+ {ok, ok, Leader} ->
+ enqueue(Correlation, Msg, State0#state{leader = Leader,
+ queue_status = go});
+ {error, {no_more_servers_to_try, _Errs}} ->
+ %% if we are not able to process the register command
+ %% it is safe to reject the message as we never attempted
+ %% to send it
+ {reject_publish, State0};
+ %% TODO: not convinced this can ever happen when using
+ %% a list of servers
{timeout, _} ->
- %% if we timeout it is probably better to reject
- %% the message than being uncertain
{reject_publish, State0};
Err ->
exit(Err)
@@ -387,6 +395,20 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
ack = Ack}, CDels0),
try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}).
+
+-spec query_single_active_consumer(state()) ->
+ {ok, term()} | {error, term()} | {timeout, term()}.
+query_single_active_consumer(#state{leader = undefined}) ->
+ {error, leader_not_known};
+query_single_active_consumer(#state{leader = Leader}) ->
+ case ra:local_query(Leader, fun rabbit_fifo:query_single_active_consumer/1,
+ ?COMMAND_TIMEOUT) of
+ {ok, {_, Reply}, _} ->
+ {ok, Reply};
+ Err ->
+ Err
+ end.
+
%% @doc Provide credit to the queue
%%
%% This only has an effect if the consumer uses credit mode: credited
@@ -436,8 +458,8 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
%% of messages purged.
-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
-purge(Node) ->
- case ra:process_command(Node, rabbit_fifo:make_purge()) of
+purge(Server) ->
+ case ra:process_command(Server, rabbit_fifo:make_purge(), ?COMMAND_TIMEOUT) of
{ok, {purge, Reply}, _} ->
{ok, Reply};
Err ->
@@ -474,7 +496,7 @@ cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
ClusterName.
update_machine_state(Server, Conf) ->
- case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of
+ case ra:process_command(Server, rabbit_fifo:make_update_config(Conf), ?COMMAND_TIMEOUT) of
{ok, ok, _} ->
ok;
Err ->
@@ -632,8 +654,9 @@ untracked_enqueue([Node | _], Msg) ->
%% Internal
-try_process_command([Server | Rem], Cmd, State) ->
- case ra:process_command(Server, Cmd, 30000) of
+try_process_command([Server | Rem], Cmd,
+ #state{cfg = #cfg{timeout = Timeout}} = State) ->
+ case ra:process_command(Server, Cmd, Timeout) of
{ok, _, Leader} ->
{ok, State#state{leader = Leader}};
Err when length(Rem) =:= 0 ->
@@ -793,7 +816,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
Query = fun (State) ->
rabbit_fifo:get_checked_out(ConsumerId, From, To, State)
end,
- case ra:local_query(Leader, Query) of
+ case ra:local_query(Leader, Query, ?COMMAND_TIMEOUT) of
{ok, {_, Missing}, _} ->
Missing;
{error, Error} ->
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
index 886e0ba7e8..128d0a2bd0 100644
--- a/deps/rabbit/src/rabbit_queue_type.erl
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -384,7 +384,9 @@ recover(VHost, Qs) ->
rabbit_quorum_queue => [],
rabbit_stream_queue => []}, Qs),
maps:fold(fun (Mod, Queues, {R0, F0}) ->
- {R, F} = Mod:recover(VHost, Queues),
+ {Taken, {R, F}} = timer:tc(Mod, recover, [VHost, Queues]),
+ rabbit_log:info("Recovering ~b queues of type ~s took ~bms",
+ [length(Queues), Mod, Taken div 1000]),
{R0 ++ R, F0 ++ F}
end, {[], []}, ByType).
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 0131d1538b..9c46adb77b 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -415,7 +415,9 @@ handle_tick(QName,
%% this makes calls to remote processes so cannot be run inside the
%% ra server
Self = self(),
- _ = spawn(fun() ->
+ _ = spawn(
+ fun() ->
+ try
R = reductions(Name),
rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
Util = case C of
@@ -454,7 +456,11 @@ handle_tick(QName,
ok
end
- end),
+ catch
+ _:_ ->
+ ok
+ end
+ end),
ok.
repair_leader_record(QName, Self) ->
@@ -704,7 +710,7 @@ dequeue(NoAck, _LimiterPid, CTag0, QState0) ->
rabbit_queue_type:consume_spec(),
rabbit_fifo_client:state()) ->
{ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} |
- {error, global_qos_not_supported_for_queue_type}.
+ {error, global_qos_not_supported_for_queue_type | timeout}.
consume(Q, #{limiter_active := true}, _State)
when ?amqqueue_is_quorum(Q) ->
{error, global_qos_not_supported_for_queue_type};
@@ -720,7 +726,6 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
%% TODO: validate consumer arguments
%% currently quorum queues do not support any arguments
QName = amqqueue:get_name(Q),
- QPid = amqqueue:get_pid(Q),
maybe_send_reply(ChPid, OkMsg),
ConsumerTag = quorum_ctag(ConsumerTag0),
%% A prefetch count of 0 means no limitation,
@@ -752,36 +757,43 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
QState1);
_ -> QState1
end,
- case ra:local_query(QPid,
- fun rabbit_fifo:query_single_active_consumer/1) of
- {ok, {_, SacResult}, _} ->
- SingleActiveConsumerOn = single_active_consumer_on(Q),
- {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
- {false, _} ->
- {true, up};
- {true, {value, {ConsumerTag, ChPid}}} ->
- {true, single_active};
- _ ->
- {false, waiting}
- end,
+ case single_active_consumer_on(Q) of
+ true ->
+ %% get the leader from state
+ case rabbit_fifo_client:query_single_active_consumer(QState) of
+ {ok, SacResult} ->
+ ActivityStatus = case SacResult of
+ {value, {ConsumerTag, ChPid}} ->
+ single_active;
+ _ ->
+ waiting
+ end,
+ rabbit_core_metrics:consumer_created(
+ ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName,
+ ConsumerPrefetchCount, true, %% IsSingleSctiveconsumer
+ ActivityStatus, Args),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName, Prefetch,
+ Args, none, ActingUser),
+ {ok, QState, []};
+ {error, Error} ->
+ Error;
+ {timeout, _} ->
+ {error, timeout}
+ end;
+ false ->
rabbit_core_metrics:consumer_created(
- ChPid, ConsumerTag, ExclusiveConsume,
- AckRequired, QName,
- ConsumerPrefetchCount, IsSingleActiveConsumer,
- ActivityStatus, Args),
+ ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName,
+ ConsumerPrefetchCount, false, %% issingleactiveconsumer
+ up, Args),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- AckRequired, QName, Prefetch,
- Args, none, ActingUser),
- {ok, QState, []};
- {error, Error} ->
- Error;
- {timeout, _} ->
- {error, timeout}
+ AckRequired, QName, Prefetch,
+ Args, none, ActingUser),
+ {ok, QState, []}
end.
-% -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
-% {'ok', rabbit_fifo_client:state()}.
-
cancel(_Q, ConsumerTag, OkMsg, _ActingUser, State) ->
maybe_send_reply(self(), OkMsg),
rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), State).
@@ -898,8 +910,8 @@ stat(Q, Timeout) when ?is_amqqueue(Q) ->
-spec purge(amqqueue:amqqueue()) ->
{ok, non_neg_integer()}.
purge(Q) when ?is_amqqueue(Q) ->
- Node = amqqueue:get_pid(Q),
- rabbit_fifo_client:purge(Node).
+ Server = amqqueue:get_pid(Q),
+ rabbit_fifo_client:purge(Server).
requeue(ConsumerTag, MsgIds, QState) ->
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState).
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 1de9d1f5db..cdbb59d12b 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -1450,7 +1450,7 @@ confirm_availability_on_leader_change(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
ConfirmLoop = fun Loop() ->
- ok = publish_confirm(Ch, QQ, 5000),
+ ok = publish_confirm(Ch, QQ, 15000),
receive
{done, P} ->
P ! publisher_done,
@@ -1470,9 +1470,13 @@ confirm_availability_on_leader_change(Config) ->
timer:sleep(500),
Publisher ! {done, self()},
receive
- publisher_done -> ok;
- {'EXIT', Publisher, Err} -> exit(Err)
+ publisher_done ->
+ ok;
+ {'EXIT', Publisher, Err} ->
+ ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
+ exit(Err)
after 30000 ->
+ ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
flush(100),
exit(nothing_received_from_publisher_process)
end,