diff options
author | Michael Klishin <klishinm@vmware.com> | 2021-11-08 16:05:11 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-08 16:05:11 +0300 |
commit | 339322d339300d26179a2ca4a2ca6e9a7669aff2 (patch) | |
tree | a60deaec3a4dd57c6c5e67be8dd03e548a1839b1 | |
parent | d3d1caff1ecb9d6adfbe9e275b82ff79a16c7e92 (diff) | |
parent | 3d7edcea921a9fc15349475a61f0c47b6cc668ae (diff) | |
download | rabbitmq-server-git-339322d339300d26179a2ca4a2ca6e9a7669aff2.tar.gz |
Merge pull request #3628 from rabbitmq/qq-register-enqueuer-reliability
QQ: stability and channel side improvements
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_channel.erl | 6 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_client.erl | 59 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_type.erl | 4 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 76 | ||||
-rw-r--r-- | deps/rabbit/test/quorum_queue_SUITE.erl | 10 |
6 files changed, 102 insertions, 55 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 96db3bac07..c7c774dee0 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -677,7 +677,7 @@ priv_absent(QueueName, QPid, true, nodedown) -> rabbit_misc:protocol_error( not_found, "home node '~s' of durable ~s is down or inaccessible", - [node(QPid), rabbit_misc:rs(QueueName)]); + [amqqueue:qnode(QPid), rabbit_misc:rs(QueueName)]); priv_absent(QueueName, _QPid, _IsDurable, stopped) -> rabbit_misc:protocol_error( diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index e50fba8596..2d5437421d 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -1475,6 +1475,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, rabbit_misc:protocol_error( not_implemented, "~s does not support global qos", [rabbit_misc:rs(QueueName)]); + {error, timeout} -> + rabbit_misc:protocol_error( + internal_error, "~s timeout occurred during consume operation", + [rabbit_misc:rs(QueueName)]); {error, no_local_stream_replica_available} -> rabbit_misc:protocol_error( resource_error, "~s does not not have a running local replica", @@ -1803,6 +1807,8 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, E; {{error, no_local_stream_replica_available} = E, _Q} -> E; + {{error, timeout} = E, _Q} -> + E; {{protocol_error, Type, Reason, ReasonArgs}, _Q} -> rabbit_misc:protocol_error(Type, Reason, ReasonArgs) end. diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 7ac1d7ba6e..f9beb7928b 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -31,13 +31,15 @@ update_machine_state/2, pending_size/1, stat/1, - stat/2 + stat/2, + query_single_active_consumer/1 ]). -include_lib("rabbit_common/include/rabbit.hrl"). -define(SOFT_LIMIT, 32). -define(TIMER_TIME, 10000). +-define(COMMAND_TIMEOUT, 30000). -type seq() :: non_neg_integer(). %% last_applied is initialised to -1 @@ -142,27 +144,33 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> enqueue(Correlation, Msg, #state{queue_status = undefined, next_enqueue_seq = 1, - cfg = #cfg{timeout = Timeout}} = State0) -> + cfg = #cfg{servers = Servers, + timeout = Timeout}} = State0) -> %% it is the first enqueue, check the version - {_, Node} = Server = pick_server(State0), + {_, Node} = pick_server(State0), case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of 0 -> %% the leader is running the old version - %% so we can't initialize the enqueuer session safely - %% fall back on old behavour enqueue(Correlation, Msg, State0#state{queue_status = go}); 1 -> %% were running the new version on the leader do sync initialisation %% of enqueuer session Reg = rabbit_fifo:make_register_enqueuer(self()), - case ra:process_command(Server, Reg, Timeout) of - {ok, reject_publish, _} -> - {reject_publish, State0#state{queue_status = reject_publish}}; - {ok, ok, _} -> - enqueue(Correlation, Msg, State0#state{queue_status = go}); + case ra:process_command(Servers, Reg, Timeout) of + {ok, reject_publish, Leader} -> + {reject_publish, State0#state{leader = Leader, + queue_status = reject_publish}}; + {ok, ok, Leader} -> + enqueue(Correlation, Msg, State0#state{leader = Leader, + queue_status = go}); + {error, {no_more_servers_to_try, _Errs}} -> + %% if we are not able to process the register command + %% it is safe to reject the message as we never attempted + %% to send it + {reject_publish, State0}; + %% TODO: not convinced this can ever happen when using + %% a list of servers {timeout, _} -> - %% if we timeout it is probably better to reject - %% the message than being uncertain {reject_publish, State0}; Err -> exit(Err) @@ -387,6 +395,20 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, ack = Ack}, CDels0), try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}). + +-spec query_single_active_consumer(state()) -> + {ok, term()} | {error, term()} | {timeout, term()}. +query_single_active_consumer(#state{leader = undefined}) -> + {error, leader_not_known}; +query_single_active_consumer(#state{leader = Leader}) -> + case ra:local_query(Leader, fun rabbit_fifo:query_single_active_consumer/1, + ?COMMAND_TIMEOUT) of + {ok, {_, Reply}, _} -> + {ok, Reply}; + Err -> + Err + end. + %% @doc Provide credit to the queue %% %% This only has an effect if the consumer uses credit mode: credited @@ -436,8 +458,8 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) -> %% @doc Purges all the messages from a rabbit_fifo queue and returns the number %% of messages purged. -spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}. -purge(Node) -> - case ra:process_command(Node, rabbit_fifo:make_purge()) of +purge(Server) -> + case ra:process_command(Server, rabbit_fifo:make_purge(), ?COMMAND_TIMEOUT) of {ok, {purge, Reply}, _} -> {ok, Reply}; Err -> @@ -474,7 +496,7 @@ cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) -> ClusterName. update_machine_state(Server, Conf) -> - case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of + case ra:process_command(Server, rabbit_fifo:make_update_config(Conf), ?COMMAND_TIMEOUT) of {ok, ok, _} -> ok; Err -> @@ -632,8 +654,9 @@ untracked_enqueue([Node | _], Msg) -> %% Internal -try_process_command([Server | Rem], Cmd, State) -> - case ra:process_command(Server, Cmd, 30000) of +try_process_command([Server | Rem], Cmd, + #state{cfg = #cfg{timeout = Timeout}} = State) -> + case ra:process_command(Server, Cmd, Timeout) of {ok, _, Leader} -> {ok, State#state{leader = Leader}}; Err when length(Rem) =:= 0 -> @@ -793,7 +816,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) -> Query = fun (State) -> rabbit_fifo:get_checked_out(ConsumerId, From, To, State) end, - case ra:local_query(Leader, Query) of + case ra:local_query(Leader, Query, ?COMMAND_TIMEOUT) of {ok, {_, Missing}, _} -> Missing; {error, Error} -> diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 886e0ba7e8..128d0a2bd0 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -384,7 +384,9 @@ recover(VHost, Qs) -> rabbit_quorum_queue => [], rabbit_stream_queue => []}, Qs), maps:fold(fun (Mod, Queues, {R0, F0}) -> - {R, F} = Mod:recover(VHost, Queues), + {Taken, {R, F}} = timer:tc(Mod, recover, [VHost, Queues]), + rabbit_log:info("Recovering ~b queues of type ~s took ~bms", + [length(Queues), Mod, Taken div 1000]), {R0 ++ R, F0 ++ F} end, {[], []}, ByType). diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 0131d1538b..9c46adb77b 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -415,7 +415,9 @@ handle_tick(QName, %% this makes calls to remote processes so cannot be run inside the %% ra server Self = self(), - _ = spawn(fun() -> + _ = spawn( + fun() -> + try R = reductions(Name), rabbit_core_metrics:queue_stats(QName, MR, MU, M, R), Util = case C of @@ -454,7 +456,11 @@ handle_tick(QName, ok end - end), + catch + _:_ -> + ok + end + end), ok. repair_leader_record(QName, Self) -> @@ -704,7 +710,7 @@ dequeue(NoAck, _LimiterPid, CTag0, QState0) -> rabbit_queue_type:consume_spec(), rabbit_fifo_client:state()) -> {ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} | - {error, global_qos_not_supported_for_queue_type}. + {error, global_qos_not_supported_for_queue_type | timeout}. consume(Q, #{limiter_active := true}, _State) when ?amqqueue_is_quorum(Q) -> {error, global_qos_not_supported_for_queue_type}; @@ -720,7 +726,6 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> %% TODO: validate consumer arguments %% currently quorum queues do not support any arguments QName = amqqueue:get_name(Q), - QPid = amqqueue:get_pid(Q), maybe_send_reply(ChPid, OkMsg), ConsumerTag = quorum_ctag(ConsumerTag0), %% A prefetch count of 0 means no limitation, @@ -752,36 +757,43 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) -> QState1); _ -> QState1 end, - case ra:local_query(QPid, - fun rabbit_fifo:query_single_active_consumer/1) of - {ok, {_, SacResult}, _} -> - SingleActiveConsumerOn = single_active_consumer_on(Q), - {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of - {false, _} -> - {true, up}; - {true, {value, {ConsumerTag, ChPid}}} -> - {true, single_active}; - _ -> - {false, waiting} - end, + case single_active_consumer_on(Q) of + true -> + %% get the leader from state + case rabbit_fifo_client:query_single_active_consumer(QState) of + {ok, SacResult} -> + ActivityStatus = case SacResult of + {value, {ConsumerTag, ChPid}} -> + single_active; + _ -> + waiting + end, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, + ConsumerPrefetchCount, true, %% IsSingleSctiveconsumer + ActivityStatus, Args), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, Prefetch, + Args, none, ActingUser), + {ok, QState, []}; + {error, Error} -> + Error; + {timeout, _} -> + {error, timeout} + end; + false -> rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, - ConsumerPrefetchCount, IsSingleActiveConsumer, - ActivityStatus, Args), + ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, + ConsumerPrefetchCount, false, %% issingleactiveconsumer + up, Args), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - AckRequired, QName, Prefetch, - Args, none, ActingUser), - {ok, QState, []}; - {error, Error} -> - Error; - {timeout, _} -> - {error, timeout} + AckRequired, QName, Prefetch, + Args, none, ActingUser), + {ok, QState, []} end. -% -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> -% {'ok', rabbit_fifo_client:state()}. - cancel(_Q, ConsumerTag, OkMsg, _ActingUser, State) -> maybe_send_reply(self(), OkMsg), rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), State). @@ -898,8 +910,8 @@ stat(Q, Timeout) when ?is_amqqueue(Q) -> -spec purge(amqqueue:amqqueue()) -> {ok, non_neg_integer()}. purge(Q) when ?is_amqqueue(Q) -> - Node = amqqueue:get_pid(Q), - rabbit_fifo_client:purge(Node). + Server = amqqueue:get_pid(Q), + rabbit_fifo_client:purge(Server). requeue(ConsumerTag, MsgIds, QState) -> rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState). diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 1de9d1f5db..cdbb59d12b 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1450,7 +1450,7 @@ confirm_availability_on_leader_change(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), ConfirmLoop = fun Loop() -> - ok = publish_confirm(Ch, QQ, 5000), + ok = publish_confirm(Ch, QQ, 15000), receive {done, P} -> P ! publisher_done, @@ -1470,9 +1470,13 @@ confirm_availability_on_leader_change(Config) -> timer:sleep(500), Publisher ! {done, self()}, receive - publisher_done -> ok; - {'EXIT', Publisher, Err} -> exit(Err) + publisher_done -> + ok; + {'EXIT', Publisher, Err} -> + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + exit(Err) after 30000 -> + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), flush(100), exit(nothing_received_from_publisher_process) end, |