diff options
author | kjnilsson <knilsson@pivotal.io> | 2021-03-18 09:50:32 +0000 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2021-03-22 21:44:15 +0300 |
commit | f6f02a5d2dd8e2ff5e00f8623744635fc0b1e922 (patch) | |
tree | 8f43b2375da12ee806dde17a40b3e23f0833bb13 | |
parent | 246f50598bed8b4e4d27b690b2a62752e997a0df (diff) | |
download | rabbitmq-server-git-f6f02a5d2dd8e2ff5e00f8623744635fc0b1e922.tar.gz |
ra systems wip
-rw-r--r-- | deps/rabbit/src/rabbit.erl | 16 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 8 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_maintenance.erl | 6 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_memory_manager.erl | 2 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 65 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 46 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.hrl | 1 | ||||
-rw-r--r-- | deps/rabbit/test/quorum_queue_SUITE.erl | 49 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_int_SUITE.erl | 42 | ||||
-rw-r--r-- | rabbitmq-components.mk | 2 |
10 files changed, 142 insertions, 95 deletions
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 877891236e..5a1e807882 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -367,6 +367,22 @@ run_prelaunch_second_phase() -> ?LOG_DEBUG(""), ?LOG_DEBUG("== Prelaunch DONE =="), + ?LOG_DEBUG("Starting Ra Systems"), + Default = ra_system:default_config(), + Quorum = Default#{name => quorum}, + % names => ra_system:derive_names(quorum)}, + CoordDataDir = filename:join([rabbit_mnesia:dir(), "coordination", node()]), + Coord = Default#{name => coordination, + data_dir => CoordDataDir, + wal_data_dir => CoordDataDir, + names => ra_system:derive_names(coordination)}, + + {ok, _} = ra_system:start(Quorum), + {ok, _} = ra_system:start(Coord), + + ?LOG_DEBUG(""), + ?LOG_DEBUG("== Ra System Start done DONE =="), + case IsInitialPass of true -> rabbit_prelaunch:initial_pass_finished(); false -> ok diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 7f4a9d5122..9c6b3f9cbe 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -775,11 +775,11 @@ init_aux(Name) when is_atom(Name) -> #aux{name = Name, capacity = {inactive, Now, 1, 1.0}}. -handle_aux(leader, _, garbage_collection, State, Log, _MacState) -> - ra_log_wal:force_roll_over(ra_log_wal), - {no_reply, State, Log}; +handle_aux(leader, _, garbage_collection, State, Log, MacState) -> + % ra_log_wal:force_roll_over(ra_log_wal), + {no_reply, force_eval_gc(Log, MacState, State), Log}; handle_aux(follower, _, garbage_collection, State, Log, MacState) -> - ra_log_wal:force_roll_over(ra_log_wal), + % ra_log_wal:force_roll_over(ra_log_wal), {no_reply, force_eval_gc(Log, MacState, State), Log}; handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) -> {no_reply, Aux0, Log}; diff --git a/deps/rabbit/src/rabbit_maintenance.erl b/deps/rabbit/src/rabbit_maintenance.erl index a88d3de0c5..97de68bc6f 100644 --- a/deps/rabbit/src/rabbit_maintenance.erl +++ b/deps/rabbit/src/rabbit_maintenance.erl @@ -236,7 +236,7 @@ transfer_leadership_of_quorum_queues(_TransferCandidates) -> %% by simply shutting its local QQ replica (Ra server) RaLeader = amqqueue:get_pid(Q), rabbit_log:debug("Will stop Ra server ~p", [RaLeader]), - case ra:stop_server(RaLeader) of + case rabbit_quorum_queue:stop_server(RaLeader) of ok -> rabbit_log:debug("Successfully stopped Ra server ~p", [RaLeader]); {error, nodedown} -> @@ -296,7 +296,7 @@ stop_local_quorum_queue_followers() -> {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q), RaNode = {RegisteredName, node()}, rabbit_log:debug("Will stop Ra server ~p", [RaNode]), - case ra:stop_server(RaNode) of + case rabbit_quorum_queue:stop_server(RaNode) of ok -> rabbit_log:debug("Successfully stopped Ra server ~p", [RaNode]); {error, nodedown} -> @@ -339,7 +339,7 @@ revive_local_quorum_queue_replicas() -> {Prefix, _Node} = amqqueue:get_pid(Q), RaServer = {Prefix, node()}, rabbit_log:debug("Will start Ra server ~p", [RaServer]), - case ra:restart_server(RaServer) of + case rabbit_quorum_queue:restart_server(RaServer) of ok -> rabbit_log:debug("Successfully restarted Ra server ~p", [RaServer]); {error, {already_started, _Pid}} -> diff --git a/deps/rabbit/src/rabbit_quorum_memory_manager.erl b/deps/rabbit/src/rabbit_quorum_memory_manager.erl index 5e5e6084bb..6ff14090f7 100644 --- a/deps/rabbit/src/rabbit_quorum_memory_manager.erl +++ b/deps/rabbit/src/rabbit_quorum_memory_manager.erl @@ -60,7 +60,7 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. force_roll_over(State) -> - ra_log_wal:force_roll_over(ra_log_wal), + rabbit_quorum_queue:wal_force_roll_over(node()), State#state{last_roll_over = erlang:system_time(millisecond)}. interval() -> diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index d1e2d73603..6d35aa9a7c 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -13,7 +13,14 @@ close/1, update/2, handle_event/2]). --export([is_recoverable/1, recover/2, stop/1, delete/4, delete_immediately/2]). +-export([is_recoverable/1, + recover/2, + stop/1, + start_server/1, + restart_server/1, + stop_server/1, + delete/4, + delete_immediately/2]). -export([state_info/1, info/2, stat/1, infos/1]). -export([settle/4, dequeue/4, consume/3, cancel/5]). -export([credit/4]). @@ -38,16 +45,20 @@ -export([shrink_all/1, grow/4]). -export([transfer_leadership/2, get_replicas/1, queue_length/1]). --export([file_handle_leader_reservation/1, file_handle_other_reservation/0]). +-export([file_handle_leader_reservation/1, + file_handle_other_reservation/0]). -export([file_handle_release_reservation/0]). --export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0, - filter_quorum_critical/1, filter_quorum_critical/2, +-export([list_with_minimum_quorum/0, + list_with_minimum_quorum_for_cli/0, + filter_quorum_critical/1, + filter_quorum_critical/2, all_replica_states/0]). -export([capabilities/0]). -export([repair_amqqueue_nodes/1, repair_amqqueue_nodes/2 ]). --export([reclaim_memory/2]). +-export([reclaim_memory/2, + wal_force_roll_over/1]). -export([notify_decorators/1, notify_decorators/3, spawn_notify_decorators/3]). @@ -65,6 +76,9 @@ -type msg_id() :: non_neg_integer(). -type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), rabbit_types:message()}. +-define(RA_SYSTEM, quorum). +-define(RA_WAL_NAME, ra_log_wal). + -define(STATISTICS_KEYS, [policy, operator_policy, @@ -168,7 +182,7 @@ start_cluster(Q) -> TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT), RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout) || ServerId <- members(NewQ)], - case ra:start_cluster(RaConfs) of + case ra:start_cluster(?RA_SYSTEM, RaConfs) of {ok, _, _} -> %% ensure the latest config is evaluated properly %% even when running the machine version from 0 @@ -506,7 +520,7 @@ recover(_Vhost, Queues) -> QName = amqqueue:get_name(Q0), Nodes = get_nodes(Q0), Formatter = {?MODULE, format_ra_event, [QName]}, - Res = case ra:restart_server({Name, node()}, + Res = case ra:restart_server(?RA_SYSTEM, {Name, node()}, #{ra_event_formatter => Formatter}) of ok -> % queue was restarted, good @@ -518,7 +532,8 @@ recover(_Vhost, Queues) -> % so needs to be started from scratch. Machine = ra_machine(Q0), RaNodes = [{Name, Node} || Node <- Nodes], - case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of + case ra:start_server(?RA_SYSTEM, Name, {Name, node()}, + Machine, RaNodes) of ok -> ok; Err2 -> rabbit_log:warning("recover: quorum queue ~w could not" @@ -553,10 +568,22 @@ recover(_Vhost, Queues) -> stop(VHost) -> _ = [begin Pid = amqqueue:get_pid(Q), - ra:stop_server(Pid) + ra:stop_server(?RA_SYSTEM, Pid) end || Q <- find_quorum_queues(VHost)], ok. +-spec stop_server({atom(), node()}) -> ok | {error, term()}. +stop_server({_, _} = Ref) -> + ra:stop_server(?RA_SYSTEM, Ref). + +-spec start_server(map()) -> ok | {error, term()}. +start_server(Conf) when is_map(Conf) -> + ra:start_server(?RA_SYSTEM, Conf). + +-spec restart_server({atom(), node()}) -> ok | {error, term()}. +restart_server({_, _} = Ref) -> + ra:restart_server(?RA_SYSTEM, Ref). + -spec delete(amqqueue:amqqueue(), boolean(), boolean(), rabbit_types:username()) -> @@ -617,7 +644,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) -> force_delete_queue(Servers) -> [begin - case catch(ra:force_delete_server(S)) of + case catch(ra:force_delete_server(?RA_SYSTEM, S)) of ok -> ok; Err -> rabbit_log:warning( @@ -877,19 +904,19 @@ cleanup_data_dir() -> || Q <- rabbit_amqqueue:list_by_type(?MODULE), lists:member(node(), get_nodes(Q))], NoQQClusters = rabbit_ra_registry:list_not_quorum_clusters(), - Registered = ra_directory:list_registered(), + Registered = ra_directory:list_registered(?RA_SYSTEM), Running = Names ++ NoQQClusters, _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, not lists:member(Name, Running)], ok. maybe_delete_data_dir(UId) -> - Dir = ra_env:server_data_dir(UId), + Dir = ra_env:server_data_dir(?RA_SYSTEM, UId), {ok, Config} = ra_log:read_config(Dir), case maps:get(machine, Config) of {module, rabbit_fifo, _} -> ra_lib:recursive_delete(Dir), - ra_directory:unregister_name(UId); + ra_directory:unregister_name(?RA_SYSTEM, UId); _ -> ok end. @@ -999,7 +1026,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT), Conf = make_ra_conf(Q, ServerId, TickTimeout), - case ra:start_server(Conf) of + case ra:start_server(?RA_SYSTEM, Conf) of ok -> case ra:add_member(Members, ServerId, Timeout) of {ok, _, Leader} -> @@ -1014,11 +1041,11 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> fun() -> rabbit_amqqueue:update(QName, Fun) end), ok; {timeout, _} -> - _ = ra:force_delete_server(ServerId), + _ = ra:force_delete_server(?RA_SYSTEM, ServerId), _ = ra:remove_member(Members, ServerId), {error, timeout}; E -> - _ = ra:force_delete_server(ServerId), + _ = ra:force_delete_server(?RA_SYSTEM, ServerId), E end; E -> @@ -1065,7 +1092,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> end, rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), - case ra:force_delete_server(ServerId) of + case ra:force_delete_server(?RA_SYSTEM, ServerId) of ok -> ok; {error, {badrpc, nodedown}} -> @@ -1199,6 +1226,10 @@ reclaim_memory(Vhost, QueueName) -> E end. +-spec wal_force_roll_over(node()) -> ok. + wal_force_roll_over(Node) -> + ra_log_wal:force_roll_over({?RA_WAL_NAME, Node}). + %%---------------------------------------------------------------------------- dlx_mfa(Q) -> DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index cd3439ddff..329119f3dd 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -89,10 +89,10 @@ start() -> Nodes = rabbit_mnesia:cluster_nodes(all), ServerId = {?MODULE, node()}, - case ra:restart_server(ServerId) of + case ra:restart_server(?RA_SYSTEM, ServerId) of {error, Reason} when Reason == not_started orelse Reason == name_not_registered -> - case ra:start_server(make_ra_conf(node(), Nodes)) of + case ra:start_server(?RA_SYSTEM, make_ra_conf(node(), Nodes)) of ok -> global:set_lock(?STREAM_COORDINATOR_STARTUP), case find_members(Nodes) of @@ -259,26 +259,24 @@ ensure_coordinator_started() -> case whereis(?MODULE) of undefined -> global:set_lock(?STREAM_COORDINATOR_STARTUP), - Nodes = - case ra:restart_server(Local) of - {error, Reason} when Reason == not_started orelse - Reason == name_not_registered -> - OtherNodes = all_coord_members() -- [Local], - %% We can't use find_members/0 here as a process that timeouts means the cluster is up - case lists:filter(fun(N) -> global:whereis_name(N) =/= undefined end, - OtherNodes) of - [] -> - start_coordinator_cluster(); - _ -> - OtherNodes - end; - ok -> - AllNodes; - {error, {already_started, _}} -> - AllNodes; - _ -> - AllNodes - end, + Nodes = case ra:restart_server(?RA_SYSTEM, Local) of + {error, Reason} when Reason == not_started orelse + Reason == name_not_registered -> + OtherNodes = all_coord_members() -- [Local], + %% We can't use find_members/0 here as a process that timeouts means the cluster is up + case lists:filter(fun(N) -> global:whereis_name(N) =/= undefined end, OtherNodes) of + [] -> + start_coordinator_cluster(); + _ -> + OtherNodes + end; + ok -> + AllNodes; + {error, {already_started, _}} -> + AllNodes; + _ -> + AllNodes + end, global:del_lock(?STREAM_COORDINATOR_STARTUP), Nodes; _ -> @@ -288,7 +286,7 @@ ensure_coordinator_started() -> start_coordinator_cluster() -> Nodes = rabbit_mnesia:cluster_nodes(running), rabbit_log:debug("Starting stream coordinator on nodes: ~w", [Nodes]), - case ra:start_cluster([make_ra_conf(Node, Nodes) || Node <- Nodes]) of + case ra:start_cluster(?RA_SYSTEM, [make_ra_conf(Node, Nodes) || Node <- Nodes]) of {ok, Started, _} -> rabbit_log:debug("Started stream coordinator on ~w", [Started]), Started; @@ -488,7 +486,7 @@ add_members(_, []) -> ok; add_members(Members, [Node | Nodes]) -> Conf = make_ra_conf(Node, [N || {_, N} <- Members]), - case ra:start_server(Conf) of + case ra:start_server(?RA_SYSTEM, Conf) of ok -> case ra:add_member(Members, {?MODULE, Node}) of {ok, NewMembers, _} -> diff --git a/deps/rabbit/src/rabbit_stream_coordinator.hrl b/deps/rabbit/src/rabbit_stream_coordinator.hrl index 3df16ce39c..e6bba92070 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.hrl +++ b/deps/rabbit/src/rabbit_stream_coordinator.hrl @@ -4,6 +4,7 @@ -define(RESTART_TIMEOUT, 1000). -define(PHASE_RETRY_TIMEOUT, 10000). -define(CMD_TIMEOUT, 30000). +-define(RA_SYSTEM, coordination). -type stream_id() :: string(). -type stream() :: #{conf := osiris:config(), diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 098c24b8c5..98d477278c 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -141,6 +141,7 @@ memory_tests() -> memory_alarm_rolls_wal ]. +-define(SUPNAME, ra_server_sup_sup). %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- @@ -281,7 +282,7 @@ end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publi rabbit_ct_broker_helpers:teardown_steps()), rabbit_ct_helpers:testcase_finished(Config1, Testcase); end_per_testcase(Testcase, Config) -> - catch delete_queues(), + % catch delete_queues(), Config1 = rabbit_ct_helpers:run_steps( Config, rabbit_ct_client_helpers:teardown_steps()), @@ -352,7 +353,7 @@ start_queue(Config) -> LQ = ?config(queue_name, Config), %% The stream coordinator is also a ra process, we need to ensure the quorum tests %% are not affected by any other ra cluster that could be added in the future - Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])), ?assertEqual({'queue.declare_ok', LQ, 0, 0}, declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), @@ -362,7 +363,7 @@ start_queue(Config) -> rpc:call(Server, application, which_applications, []))), Expected = Children + 1, ?assertMatch(Expected, - length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))), + length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))), %% Test declare an existing queue ?assertEqual({'queue.declare_ok', LQ, 0, 0}, @@ -379,7 +380,7 @@ start_queue(Config) -> ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), ?assertMatch(Expected, - length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))). + length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))). start_queue_concurrent(Config) -> Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -439,7 +440,7 @@ stop_queue(Config) -> %% The stream coordinator is also a ra process, we need to ensure the quorum tests %% are not affected by any other ra cluster that could be added in the future - Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), LQ = ?config(queue_name, Config), @@ -451,13 +452,13 @@ stop_queue(Config) -> rpc:call(Server, application, which_applications, []))), Expected = Children + 1, ?assertMatch(Expected, - length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))), + length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))), %% Delete the quorum queue ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})), %% Check that the application and process are down wait_until(fun() -> - Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])) + Children == length(rpc:call(Server, supervisor, which_children, [?SUPNAME])) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -467,7 +468,7 @@ restart_queue(Config) -> %% The stream coordinator is also a ra process, we need to ensure the quorum tests %% are not affected by any other ra cluster that could be added in the future - Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), LQ = ?config(queue_name, Config), @@ -482,7 +483,7 @@ restart_queue(Config) -> rpc:call(Server, application, which_applications, []))), Expected = Children + 1, ?assertMatch(Expected, - length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))), + length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))), Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), delete_queues(Ch2, [LQ]). @@ -530,11 +531,11 @@ vhost_with_quorum_queue_is_deleted(Config) -> ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - UId = rpc:call(Node, ra_directory, where_is, [RaName]), + UId = rpc:call(Node, ra_directory, where_is, [quorum, RaName]), ?assert(UId =/= undefined), ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHost), %% validate quorum queues got deleted - undefined = rpc:call(Node, ra_directory, where_is, [RaName]), + undefined = rpc:call(Node, ra_directory, where_is, [quorum, RaName]), ok. restart_all_types(Config) -> @@ -544,7 +545,7 @@ restart_all_types(Config) -> %% The stream coordinator is also a ra process, we need to ensure the quorum tests %% are not affected by any other ra cluster that could be added in the future - Children = rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]), + Children = rpc:call(Server, supervisor, which_children, [?SUPNAME]), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ1 = <<"restart_all_types-qq1">>, @@ -577,7 +578,7 @@ restart_all_types(Config) -> Server, supervisor, which_children, - [ra_server_sup_sup])) + [?SUPNAME])) end, 60000), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), @@ -598,7 +599,7 @@ stop_start_rabbit_app(Config) -> %% The stream coordinator is also a ra process, we need to ensure the quorum tests %% are not affected by any other ra cluster that could be added in the future - Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ1 = <<"stop_start_rabbit_app-qq">>, @@ -627,7 +628,7 @@ stop_start_rabbit_app(Config) -> rpc:call(Server, application, which_applications, []))), Expected = Children + 2, ?assertMatch(Expected, - length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))), + length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -947,7 +948,7 @@ cleanup_queue_state_on_channel_after_publish(Config) -> %% The stream coordinator is also a ra process, we need to ensure the quorum tests %% are not affected by any other ra cluster that could be added in the future - Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), @@ -970,7 +971,7 @@ cleanup_queue_state_on_channel_after_publish(Config) -> amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> Children == length(rpc:call(Server, supervisor, which_children, - [ra_server_sup_sup])) + [?SUPNAME])) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh2, 0), @@ -983,7 +984,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) -> %% The stream coordinator is also a ra process, we need to ensure the quorum tests %% are not affected by any other ra cluster that could be added in the future - Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), @@ -1011,7 +1012,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) -> wait_for_cleanup(Server, NCh2, 1), ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> - Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])) + Children == length(rpc:call(Server, supervisor, which_children, [?SUPNAME])) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh1, 0), @@ -1614,8 +1615,8 @@ cleanup_data_dir(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), timer:sleep(100), - UId1 = proplists:get_value(ra_name(QQ), rpc:call(Server1, ra_directory, list_registered, [])), - UId2 = proplists:get_value(ra_name(QQ), rpc:call(Server2, ra_directory, list_registered, [])), + UId1 = proplists:get_value(ra_name(QQ), rpc:call(Server1, ra_directory, list_registered, [quorum])), + UId2 = proplists:get_value(ra_name(QQ), rpc:call(Server2, ra_directory, list_registered, [quorum])), DataDir1 = rpc:call(Server1, ra_env, server_data_dir, [UId1]), DataDir2 = rpc:call(Server2, ra_env, server_data_dir, [UId2]), ?assert(filelib:is_dir(DataDir1)), @@ -1769,7 +1770,7 @@ delete_immediately_by_resource(Config) -> %% The stream coordinator is also a ra process, we need to ensure the quorum tests %% are not affected by any other ra cluster that could be added in the future - Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), + Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, @@ -1779,7 +1780,7 @@ delete_immediately_by_resource(Config) -> %% Check that the application and process are down wait_until(fun() -> - Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])) + Children == length(rpc:call(Server, supervisor, which_children, [?SUPNAME])) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -2082,7 +2083,7 @@ message_bytes_metrics(Config) -> memory_alarm_rolls_wal(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []), + #{wal_data_dir := WalDataDir} = ra_system:fetch(quorum, Server), [Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"), rabbit_ct_broker_helpers:set_alarm(Config, Server, memory), rabbit_ct_helpers:await_condition( diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index 37f5436dbf..1474c6b88d 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -89,7 +89,7 @@ basics(Config) -> {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, simple_prefetch, #{}, FState0), - ra_log_wal:force_roll_over(ra_log_wal), + rabbit_quorum_queue:wal_force_roll_over(node()), % create segment the segment will trigger a snapshot timer:sleep(1000), @@ -112,8 +112,8 @@ basics(Config) -> % process settle applied notification FState5b = process_ra_event(FState5, ?RA_EVENT_TIMEOUT), - _ = ra:stop_server(ServerId), - _ = ra:restart_server(ServerId), + _ = rabbit_quorum_queue:stop_server(ServerId), + _ = rabbit_quorum_queue:restart_server(ServerId), %% wait for leader change to notice server is up again receive @@ -137,7 +137,7 @@ basics(Config) -> after 2000 -> exit(await_msg_timeout) end, - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. return(Config) -> @@ -152,7 +152,7 @@ return(Config) -> {ok, _, {_, _, MsgId, _, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2), _F2 = rabbit_fifo_client:return(<<"tag">>, [MsgId], F), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. rabbit_fifo_returns_correlation(Config) -> @@ -172,7 +172,7 @@ rabbit_fifo_returns_correlation(Config) -> after 2000 -> exit(await_msg_timeout) end, - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. duplicate_delivery(Config) -> @@ -207,7 +207,7 @@ duplicate_delivery(Config) -> end end, Fun(F2), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. usage(Config) -> @@ -223,7 +223,7 @@ usage(Config) -> ServerId ! tick_timeout, timer:sleep(50), Use = rabbit_fifo:usage(element(1, ServerId)), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ?assert(Use > 0.0), ok. @@ -245,7 +245,7 @@ resends_lost_command(Config) -> {ok, _, {_, _, _, _, msg1}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), {ok, _, {_, _, _, _, msg2}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), {ok, _, {_, _, _, _, msg3}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. two_quick_enqueues(Config) -> @@ -257,7 +257,7 @@ two_quick_enqueues(Config) -> F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)), {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), _ = process_ra_events(receive_ra_events(2, 0), F2), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. detects_lost_delivery(Config) -> @@ -281,7 +281,7 @@ detects_lost_delivery(Config) -> % assert three deliveries were received {[_, _, _], _, _} = process_ra_events(receive_ra_events(2, 2), F3), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. returns_after_down(Config) -> @@ -306,7 +306,7 @@ returns_after_down(Config) -> timer:sleep(1000), % message should be available for dequeue {ok, _, {_, _, _, _, msg1}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. resends_after_lost_applied(Config) -> @@ -331,7 +331,7 @@ resends_after_lost_applied(Config) -> {ok, _, {_, _, _, _, msg1}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), {ok, _, {_, _, _, _, msg2}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), {ok, _, {_, _, _, _, msg3}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. handles_reject_notification(Config) -> @@ -355,8 +355,8 @@ handles_reject_notification(Config) -> % the applied notification _F2 = process_ra_events(receive_ra_events(1, 0), F1), - ra:stop_server(ServerId1), - ra:stop_server(ServerId2), + rabbit_quorum_queue:stop_server(ServerId1), + rabbit_quorum_queue:stop_server(ServerId2), ok. discard(Config) -> @@ -373,7 +373,7 @@ discard(Config) -> #{queue_resource => discard, dead_letter_handler => {?MODULE, dead_letter_handler, [self()]}}}}, - _ = ra:start_server(Conf), + _ = rabbit_quorum_queue:start_server(Conf), ok = ra:trigger_election(ServerId), _ = ra:members(ServerId), @@ -391,7 +391,7 @@ discard(Config) -> flush(), exit(dead_letter_timeout) end, - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. cancel_checkout(Config) -> @@ -455,7 +455,7 @@ untracked_enqueue(Config) -> timer:sleep(100), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, _, {_, _, _, _, msg1}, _F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. @@ -470,7 +470,7 @@ flow(Config) -> {slow, F4} = rabbit_fifo_client:enqueue(m4, F3), {_, _, F5} = process_ra_events(receive_ra_events(4, 0), F4), {ok, _} = rabbit_fifo_client:enqueue(m5, F5), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. test_queries(Config) -> @@ -504,7 +504,7 @@ test_queries(Config) -> fun rabbit_fifo:query_processes/1), ?assertEqual(2, length(Processes)), P ! stop, - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. dead_letter_handler(Pid, Msgs) -> @@ -527,7 +527,7 @@ dequeue(Config) -> {_, _, F4} = process_ra_events(receive_ra_events(1, 0), F4_), {ok, _, {_, _, MsgId, _, msg2}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), {_F6, _A} = rabbit_fifo_client:settle(Tag, [MsgId], F5), - ra:stop_server(ServerId), + rabbit_quorum_queue:stop_server(ServerId), ok. conf(ClusterName, UId, ServerId, _, Peers) -> diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index f3b5e9674d..6124b3be85 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -116,7 +116,7 @@ dep_cowlib = hex 2.9.1 dep_jsx = hex 2.11.0 dep_looking_glass = git https://github.com/rabbitmq/looking_glass master dep_prometheus = git https://github.com/deadtrickster/prometheus.erl.git master -dep_ra = git https://github.com/rabbitmq/ra.git v1.x +dep_ra = git https://github.com/rabbitmq/ra.git master dep_ranch = hex 2.0.0 dep_recon = hex 2.5.1 dep_observer_cli = hex 1.6.1 |