summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2021-03-18 09:50:32 +0000
committerMichael Klishin <michael@clojurewerkz.org>2021-03-22 21:44:15 +0300
commitf6f02a5d2dd8e2ff5e00f8623744635fc0b1e922 (patch)
tree8f43b2375da12ee806dde17a40b3e23f0833bb13
parent246f50598bed8b4e4d27b690b2a62752e997a0df (diff)
downloadrabbitmq-server-git-f6f02a5d2dd8e2ff5e00f8623744635fc0b1e922.tar.gz
ra systems wip
-rw-r--r--deps/rabbit/src/rabbit.erl16
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl8
-rw-r--r--deps/rabbit/src/rabbit_maintenance.erl6
-rw-r--r--deps/rabbit/src/rabbit_quorum_memory_manager.erl2
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl65
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl46
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.hrl1
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl49
-rw-r--r--deps/rabbit/test/rabbit_fifo_int_SUITE.erl42
-rw-r--r--rabbitmq-components.mk2
10 files changed, 142 insertions, 95 deletions
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl
index 877891236e..5a1e807882 100644
--- a/deps/rabbit/src/rabbit.erl
+++ b/deps/rabbit/src/rabbit.erl
@@ -367,6 +367,22 @@ run_prelaunch_second_phase() ->
?LOG_DEBUG(""),
?LOG_DEBUG("== Prelaunch DONE =="),
+ ?LOG_DEBUG("Starting Ra Systems"),
+ Default = ra_system:default_config(),
+ Quorum = Default#{name => quorum},
+ % names => ra_system:derive_names(quorum)},
+ CoordDataDir = filename:join([rabbit_mnesia:dir(), "coordination", node()]),
+ Coord = Default#{name => coordination,
+ data_dir => CoordDataDir,
+ wal_data_dir => CoordDataDir,
+ names => ra_system:derive_names(coordination)},
+
+ {ok, _} = ra_system:start(Quorum),
+ {ok, _} = ra_system:start(Coord),
+
+ ?LOG_DEBUG(""),
+ ?LOG_DEBUG("== Ra System Start done DONE =="),
+
case IsInitialPass of
true -> rabbit_prelaunch:initial_pass_finished();
false -> ok
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index 7f4a9d5122..9c6b3f9cbe 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -775,11 +775,11 @@ init_aux(Name) when is_atom(Name) ->
#aux{name = Name,
capacity = {inactive, Now, 1, 1.0}}.
-handle_aux(leader, _, garbage_collection, State, Log, _MacState) ->
- ra_log_wal:force_roll_over(ra_log_wal),
- {no_reply, State, Log};
+handle_aux(leader, _, garbage_collection, State, Log, MacState) ->
+ % ra_log_wal:force_roll_over(ra_log_wal),
+ {no_reply, force_eval_gc(Log, MacState, State), Log};
handle_aux(follower, _, garbage_collection, State, Log, MacState) ->
- ra_log_wal:force_roll_over(ra_log_wal),
+ % ra_log_wal:force_roll_over(ra_log_wal),
{no_reply, force_eval_gc(Log, MacState, State), Log};
handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) ->
{no_reply, Aux0, Log};
diff --git a/deps/rabbit/src/rabbit_maintenance.erl b/deps/rabbit/src/rabbit_maintenance.erl
index a88d3de0c5..97de68bc6f 100644
--- a/deps/rabbit/src/rabbit_maintenance.erl
+++ b/deps/rabbit/src/rabbit_maintenance.erl
@@ -236,7 +236,7 @@ transfer_leadership_of_quorum_queues(_TransferCandidates) ->
%% by simply shutting its local QQ replica (Ra server)
RaLeader = amqqueue:get_pid(Q),
rabbit_log:debug("Will stop Ra server ~p", [RaLeader]),
- case ra:stop_server(RaLeader) of
+ case rabbit_quorum_queue:stop_server(RaLeader) of
ok ->
rabbit_log:debug("Successfully stopped Ra server ~p", [RaLeader]);
{error, nodedown} ->
@@ -296,7 +296,7 @@ stop_local_quorum_queue_followers() ->
{RegisteredName, _LeaderNode} = amqqueue:get_pid(Q),
RaNode = {RegisteredName, node()},
rabbit_log:debug("Will stop Ra server ~p", [RaNode]),
- case ra:stop_server(RaNode) of
+ case rabbit_quorum_queue:stop_server(RaNode) of
ok ->
rabbit_log:debug("Successfully stopped Ra server ~p", [RaNode]);
{error, nodedown} ->
@@ -339,7 +339,7 @@ revive_local_quorum_queue_replicas() ->
{Prefix, _Node} = amqqueue:get_pid(Q),
RaServer = {Prefix, node()},
rabbit_log:debug("Will start Ra server ~p", [RaServer]),
- case ra:restart_server(RaServer) of
+ case rabbit_quorum_queue:restart_server(RaServer) of
ok ->
rabbit_log:debug("Successfully restarted Ra server ~p", [RaServer]);
{error, {already_started, _Pid}} ->
diff --git a/deps/rabbit/src/rabbit_quorum_memory_manager.erl b/deps/rabbit/src/rabbit_quorum_memory_manager.erl
index 5e5e6084bb..6ff14090f7 100644
--- a/deps/rabbit/src/rabbit_quorum_memory_manager.erl
+++ b/deps/rabbit/src/rabbit_quorum_memory_manager.erl
@@ -60,7 +60,7 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
force_roll_over(State) ->
- ra_log_wal:force_roll_over(ra_log_wal),
+ rabbit_quorum_queue:wal_force_roll_over(node()),
State#state{last_roll_over = erlang:system_time(millisecond)}.
interval() ->
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index d1e2d73603..6d35aa9a7c 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -13,7 +13,14 @@
close/1,
update/2,
handle_event/2]).
--export([is_recoverable/1, recover/2, stop/1, delete/4, delete_immediately/2]).
+-export([is_recoverable/1,
+ recover/2,
+ stop/1,
+ start_server/1,
+ restart_server/1,
+ stop_server/1,
+ delete/4,
+ delete_immediately/2]).
-export([state_info/1, info/2, stat/1, infos/1]).
-export([settle/4, dequeue/4, consume/3, cancel/5]).
-export([credit/4]).
@@ -38,16 +45,20 @@
-export([shrink_all/1,
grow/4]).
-export([transfer_leadership/2, get_replicas/1, queue_length/1]).
--export([file_handle_leader_reservation/1, file_handle_other_reservation/0]).
+-export([file_handle_leader_reservation/1,
+ file_handle_other_reservation/0]).
-export([file_handle_release_reservation/0]).
--export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0,
- filter_quorum_critical/1, filter_quorum_critical/2,
+-export([list_with_minimum_quorum/0,
+ list_with_minimum_quorum_for_cli/0,
+ filter_quorum_critical/1,
+ filter_quorum_critical/2,
all_replica_states/0]).
-export([capabilities/0]).
-export([repair_amqqueue_nodes/1,
repair_amqqueue_nodes/2
]).
--export([reclaim_memory/2]).
+-export([reclaim_memory/2,
+ wal_force_roll_over/1]).
-export([notify_decorators/1,
notify_decorators/3,
spawn_notify_decorators/3]).
@@ -65,6 +76,9 @@
-type msg_id() :: non_neg_integer().
-type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), rabbit_types:message()}.
+-define(RA_SYSTEM, quorum).
+-define(RA_WAL_NAME, ra_log_wal).
+
-define(STATISTICS_KEYS,
[policy,
operator_policy,
@@ -168,7 +182,7 @@ start_cluster(Q) ->
TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout)
|| ServerId <- members(NewQ)],
- case ra:start_cluster(RaConfs) of
+ case ra:start_cluster(?RA_SYSTEM, RaConfs) of
{ok, _, _} ->
%% ensure the latest config is evaluated properly
%% even when running the machine version from 0
@@ -506,7 +520,7 @@ recover(_Vhost, Queues) ->
QName = amqqueue:get_name(Q0),
Nodes = get_nodes(Q0),
Formatter = {?MODULE, format_ra_event, [QName]},
- Res = case ra:restart_server({Name, node()},
+ Res = case ra:restart_server(?RA_SYSTEM, {Name, node()},
#{ra_event_formatter => Formatter}) of
ok ->
% queue was restarted, good
@@ -518,7 +532,8 @@ recover(_Vhost, Queues) ->
% so needs to be started from scratch.
Machine = ra_machine(Q0),
RaNodes = [{Name, Node} || Node <- Nodes],
- case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
+ case ra:start_server(?RA_SYSTEM, Name, {Name, node()},
+ Machine, RaNodes) of
ok -> ok;
Err2 ->
rabbit_log:warning("recover: quorum queue ~w could not"
@@ -553,10 +568,22 @@ recover(_Vhost, Queues) ->
stop(VHost) ->
_ = [begin
Pid = amqqueue:get_pid(Q),
- ra:stop_server(Pid)
+ ra:stop_server(?RA_SYSTEM, Pid)
end || Q <- find_quorum_queues(VHost)],
ok.
+-spec stop_server({atom(), node()}) -> ok | {error, term()}.
+stop_server({_, _} = Ref) ->
+ ra:stop_server(?RA_SYSTEM, Ref).
+
+-spec start_server(map()) -> ok | {error, term()}.
+start_server(Conf) when is_map(Conf) ->
+ ra:start_server(?RA_SYSTEM, Conf).
+
+-spec restart_server({atom(), node()}) -> ok | {error, term()}.
+restart_server({_, _} = Ref) ->
+ ra:restart_server(?RA_SYSTEM, Ref).
+
-spec delete(amqqueue:amqqueue(),
boolean(), boolean(),
rabbit_types:username()) ->
@@ -617,7 +644,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
force_delete_queue(Servers) ->
[begin
- case catch(ra:force_delete_server(S)) of
+ case catch(ra:force_delete_server(?RA_SYSTEM, S)) of
ok -> ok;
Err ->
rabbit_log:warning(
@@ -877,19 +904,19 @@ cleanup_data_dir() ->
|| Q <- rabbit_amqqueue:list_by_type(?MODULE),
lists:member(node(), get_nodes(Q))],
NoQQClusters = rabbit_ra_registry:list_not_quorum_clusters(),
- Registered = ra_directory:list_registered(),
+ Registered = ra_directory:list_registered(?RA_SYSTEM),
Running = Names ++ NoQQClusters,
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
not lists:member(Name, Running)],
ok.
maybe_delete_data_dir(UId) ->
- Dir = ra_env:server_data_dir(UId),
+ Dir = ra_env:server_data_dir(?RA_SYSTEM, UId),
{ok, Config} = ra_log:read_config(Dir),
case maps:get(machine, Config) of
{module, rabbit_fifo, _} ->
ra_lib:recursive_delete(Dir),
- ra_directory:unregister_name(UId);
+ ra_directory:unregister_name(?RA_SYSTEM, UId);
_ ->
ok
end.
@@ -999,7 +1026,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
?TICK_TIMEOUT),
Conf = make_ra_conf(Q, ServerId, TickTimeout),
- case ra:start_server(Conf) of
+ case ra:start_server(?RA_SYSTEM, Conf) of
ok ->
case ra:add_member(Members, ServerId, Timeout) of
{ok, _, Leader} ->
@@ -1014,11 +1041,11 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
fun() -> rabbit_amqqueue:update(QName, Fun) end),
ok;
{timeout, _} ->
- _ = ra:force_delete_server(ServerId),
+ _ = ra:force_delete_server(?RA_SYSTEM, ServerId),
_ = ra:remove_member(Members, ServerId),
{error, timeout};
E ->
- _ = ra:force_delete_server(ServerId),
+ _ = ra:force_delete_server(?RA_SYSTEM, ServerId),
E
end;
E ->
@@ -1065,7 +1092,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
end,
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
- case ra:force_delete_server(ServerId) of
+ case ra:force_delete_server(?RA_SYSTEM, ServerId) of
ok ->
ok;
{error, {badrpc, nodedown}} ->
@@ -1199,6 +1226,10 @@ reclaim_memory(Vhost, QueueName) ->
E
end.
+-spec wal_force_roll_over(node()) -> ok.
+ wal_force_roll_over(Node) ->
+ ra_log_wal:force_roll_over({?RA_WAL_NAME, Node}).
+
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index cd3439ddff..329119f3dd 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -89,10 +89,10 @@
start() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
ServerId = {?MODULE, node()},
- case ra:restart_server(ServerId) of
+ case ra:restart_server(?RA_SYSTEM, ServerId) of
{error, Reason} when Reason == not_started orelse
Reason == name_not_registered ->
- case ra:start_server(make_ra_conf(node(), Nodes)) of
+ case ra:start_server(?RA_SYSTEM, make_ra_conf(node(), Nodes)) of
ok ->
global:set_lock(?STREAM_COORDINATOR_STARTUP),
case find_members(Nodes) of
@@ -259,26 +259,24 @@ ensure_coordinator_started() ->
case whereis(?MODULE) of
undefined ->
global:set_lock(?STREAM_COORDINATOR_STARTUP),
- Nodes =
- case ra:restart_server(Local) of
- {error, Reason} when Reason == not_started orelse
- Reason == name_not_registered ->
- OtherNodes = all_coord_members() -- [Local],
- %% We can't use find_members/0 here as a process that timeouts means the cluster is up
- case lists:filter(fun(N) -> global:whereis_name(N) =/= undefined end,
- OtherNodes) of
- [] ->
- start_coordinator_cluster();
- _ ->
- OtherNodes
- end;
- ok ->
- AllNodes;
- {error, {already_started, _}} ->
- AllNodes;
- _ ->
- AllNodes
- end,
+ Nodes = case ra:restart_server(?RA_SYSTEM, Local) of
+ {error, Reason} when Reason == not_started orelse
+ Reason == name_not_registered ->
+ OtherNodes = all_coord_members() -- [Local],
+ %% We can't use find_members/0 here as a process that timeouts means the cluster is up
+ case lists:filter(fun(N) -> global:whereis_name(N) =/= undefined end, OtherNodes) of
+ [] ->
+ start_coordinator_cluster();
+ _ ->
+ OtherNodes
+ end;
+ ok ->
+ AllNodes;
+ {error, {already_started, _}} ->
+ AllNodes;
+ _ ->
+ AllNodes
+ end,
global:del_lock(?STREAM_COORDINATOR_STARTUP),
Nodes;
_ ->
@@ -288,7 +286,7 @@ ensure_coordinator_started() ->
start_coordinator_cluster() ->
Nodes = rabbit_mnesia:cluster_nodes(running),
rabbit_log:debug("Starting stream coordinator on nodes: ~w", [Nodes]),
- case ra:start_cluster([make_ra_conf(Node, Nodes) || Node <- Nodes]) of
+ case ra:start_cluster(?RA_SYSTEM, [make_ra_conf(Node, Nodes) || Node <- Nodes]) of
{ok, Started, _} ->
rabbit_log:debug("Started stream coordinator on ~w", [Started]),
Started;
@@ -488,7 +486,7 @@ add_members(_, []) ->
ok;
add_members(Members, [Node | Nodes]) ->
Conf = make_ra_conf(Node, [N || {_, N} <- Members]),
- case ra:start_server(Conf) of
+ case ra:start_server(?RA_SYSTEM, Conf) of
ok ->
case ra:add_member(Members, {?MODULE, Node}) of
{ok, NewMembers, _} ->
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.hrl b/deps/rabbit/src/rabbit_stream_coordinator.hrl
index 3df16ce39c..e6bba92070 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.hrl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.hrl
@@ -4,6 +4,7 @@
-define(RESTART_TIMEOUT, 1000).
-define(PHASE_RETRY_TIMEOUT, 10000).
-define(CMD_TIMEOUT, 30000).
+-define(RA_SYSTEM, coordination).
-type stream_id() :: string().
-type stream() :: #{conf := osiris:config(),
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 098c24b8c5..98d477278c 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -141,6 +141,7 @@ memory_tests() ->
memory_alarm_rolls_wal
].
+-define(SUPNAME, ra_server_sup_sup).
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
@@ -281,7 +282,7 @@ end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publi
rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase);
end_per_testcase(Testcase, Config) ->
- catch delete_queues(),
+ % catch delete_queues(),
Config1 = rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps()),
@@ -352,7 +353,7 @@ start_queue(Config) ->
LQ = ?config(queue_name, Config),
%% The stream coordinator is also a ra process, we need to ensure the quorum tests
%% are not affected by any other ra cluster that could be added in the future
- Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])),
?assertEqual({'queue.declare_ok', LQ, 0, 0},
declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
@@ -362,7 +363,7 @@ start_queue(Config) ->
rpc:call(Server, application, which_applications, []))),
Expected = Children + 1,
?assertMatch(Expected,
- length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
+ length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))),
%% Test declare an existing queue
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -379,7 +380,7 @@ start_queue(Config) ->
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
?assertMatch(Expected,
- length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))).
+ length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))).
start_queue_concurrent(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -439,7 +440,7 @@ stop_queue(Config) ->
%% The stream coordinator is also a ra process, we need to ensure the quorum tests
%% are not affected by any other ra cluster that could be added in the future
- Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
@@ -451,13 +452,13 @@ stop_queue(Config) ->
rpc:call(Server, application, which_applications, []))),
Expected = Children + 1,
?assertMatch(Expected,
- length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
+ length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))),
%% Delete the quorum queue
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
%% Check that the application and process are down
wait_until(fun() ->
- Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
+ Children == length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -467,7 +468,7 @@ restart_queue(Config) ->
%% The stream coordinator is also a ra process, we need to ensure the quorum tests
%% are not affected by any other ra cluster that could be added in the future
- Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
@@ -482,7 +483,7 @@ restart_queue(Config) ->
rpc:call(Server, application, which_applications, []))),
Expected = Children + 1,
?assertMatch(Expected,
- length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
+ length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
delete_queues(Ch2, [LQ]).
@@ -530,11 +531,11 @@ vhost_with_quorum_queue_is_deleted(Config) ->
?assertEqual({'queue.declare_ok', QName, 0, 0},
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
- UId = rpc:call(Node, ra_directory, where_is, [RaName]),
+ UId = rpc:call(Node, ra_directory, where_is, [quorum, RaName]),
?assert(UId =/= undefined),
ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHost),
%% validate quorum queues got deleted
- undefined = rpc:call(Node, ra_directory, where_is, [RaName]),
+ undefined = rpc:call(Node, ra_directory, where_is, [quorum, RaName]),
ok.
restart_all_types(Config) ->
@@ -544,7 +545,7 @@ restart_all_types(Config) ->
%% The stream coordinator is also a ra process, we need to ensure the quorum tests
%% are not affected by any other ra cluster that could be added in the future
- Children = rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]),
+ Children = rpc:call(Server, supervisor, which_children, [?SUPNAME]),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ1 = <<"restart_all_types-qq1">>,
@@ -577,7 +578,7 @@ restart_all_types(Config) ->
Server,
supervisor,
which_children,
- [ra_server_sup_sup]))
+ [?SUPNAME]))
end, 60000),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -598,7 +599,7 @@ stop_start_rabbit_app(Config) ->
%% The stream coordinator is also a ra process, we need to ensure the quorum tests
%% are not affected by any other ra cluster that could be added in the future
- Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ1 = <<"stop_start_rabbit_app-qq">>,
@@ -627,7 +628,7 @@ stop_start_rabbit_app(Config) ->
rpc:call(Server, application, which_applications, []))),
Expected = Children + 2,
?assertMatch(Expected,
- length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))),
+ length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -947,7 +948,7 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
%% The stream coordinator is also a ra process, we need to ensure the quorum tests
%% are not affected by any other ra cluster that could be added in the future
- Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -970,7 +971,7 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
Children == length(rpc:call(Server, supervisor, which_children,
- [ra_server_sup_sup]))
+ [?SUPNAME]))
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh2, 0),
@@ -983,7 +984,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
%% The stream coordinator is also a ra process, we need to ensure the quorum tests
%% are not affected by any other ra cluster that could be added in the future
- Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -1011,7 +1012,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
wait_for_cleanup(Server, NCh2, 1),
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
- Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
+ Children == length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1614,8 +1615,8 @@ cleanup_data_dir(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(100),
- UId1 = proplists:get_value(ra_name(QQ), rpc:call(Server1, ra_directory, list_registered, [])),
- UId2 = proplists:get_value(ra_name(QQ), rpc:call(Server2, ra_directory, list_registered, [])),
+ UId1 = proplists:get_value(ra_name(QQ), rpc:call(Server1, ra_directory, list_registered, [quorum])),
+ UId2 = proplists:get_value(ra_name(QQ), rpc:call(Server2, ra_directory, list_registered, [quorum])),
DataDir1 = rpc:call(Server1, ra_env, server_data_dir, [UId1]),
DataDir2 = rpc:call(Server2, ra_env, server_data_dir, [UId2]),
?assert(filelib:is_dir(DataDir1)),
@@ -1769,7 +1770,7 @@ delete_immediately_by_resource(Config) ->
%% The stream coordinator is also a ra process, we need to ensure the quorum tests
%% are not affected by any other ra cluster that could be added in the future
- Children = length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
+ Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
@@ -1779,7 +1780,7 @@ delete_immediately_by_resource(Config) ->
%% Check that the application and process are down
wait_until(fun() ->
- Children == length(rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]))
+ Children == length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -2082,7 +2083,7 @@ message_bytes_metrics(Config) ->
memory_alarm_rolls_wal(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
- WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []),
+ #{wal_data_dir := WalDataDir} = ra_system:fetch(quorum, Server),
[Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"),
rabbit_ct_broker_helpers:set_alarm(Config, Server, memory),
rabbit_ct_helpers:await_condition(
diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl
index 37f5436dbf..1474c6b88d 100644
--- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl
@@ -89,7 +89,7 @@ basics(Config) ->
{ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, simple_prefetch,
#{}, FState0),
- ra_log_wal:force_roll_over(ra_log_wal),
+ rabbit_quorum_queue:wal_force_roll_over(node()),
% create segment the segment will trigger a snapshot
timer:sleep(1000),
@@ -112,8 +112,8 @@ basics(Config) ->
% process settle applied notification
FState5b = process_ra_event(FState5, ?RA_EVENT_TIMEOUT),
- _ = ra:stop_server(ServerId),
- _ = ra:restart_server(ServerId),
+ _ = rabbit_quorum_queue:stop_server(ServerId),
+ _ = rabbit_quorum_queue:restart_server(ServerId),
%% wait for leader change to notice server is up again
receive
@@ -137,7 +137,7 @@ basics(Config) ->
after 2000 ->
exit(await_msg_timeout)
end,
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
return(Config) ->
@@ -152,7 +152,7 @@ return(Config) ->
{ok, _, {_, _, MsgId, _, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
_F2 = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
rabbit_fifo_returns_correlation(Config) ->
@@ -172,7 +172,7 @@ rabbit_fifo_returns_correlation(Config) ->
after 2000 ->
exit(await_msg_timeout)
end,
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
duplicate_delivery(Config) ->
@@ -207,7 +207,7 @@ duplicate_delivery(Config) ->
end
end,
Fun(F2),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
usage(Config) ->
@@ -223,7 +223,7 @@ usage(Config) ->
ServerId ! tick_timeout,
timer:sleep(50),
Use = rabbit_fifo:usage(element(1, ServerId)),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
?assert(Use > 0.0),
ok.
@@ -245,7 +245,7 @@ resends_lost_command(Config) ->
{ok, _, {_, _, _, _, msg1}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
{ok, _, {_, _, _, _, msg2}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
{ok, _, {_, _, _, _, msg3}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
two_quick_enqueues(Config) ->
@@ -257,7 +257,7 @@ two_quick_enqueues(Config) ->
F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)),
{ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
_ = process_ra_events(receive_ra_events(2, 0), F2),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
detects_lost_delivery(Config) ->
@@ -281,7 +281,7 @@ detects_lost_delivery(Config) ->
% assert three deliveries were received
{[_, _, _], _, _} = process_ra_events(receive_ra_events(2, 2), F3),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
returns_after_down(Config) ->
@@ -306,7 +306,7 @@ returns_after_down(Config) ->
timer:sleep(1000),
% message should be available for dequeue
{ok, _, {_, _, _, _, msg1}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
resends_after_lost_applied(Config) ->
@@ -331,7 +331,7 @@ resends_after_lost_applied(Config) ->
{ok, _, {_, _, _, _, msg1}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
{ok, _, {_, _, _, _, msg2}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
{ok, _, {_, _, _, _, msg3}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
handles_reject_notification(Config) ->
@@ -355,8 +355,8 @@ handles_reject_notification(Config) ->
% the applied notification
_F2 = process_ra_events(receive_ra_events(1, 0), F1),
- ra:stop_server(ServerId1),
- ra:stop_server(ServerId2),
+ rabbit_quorum_queue:stop_server(ServerId1),
+ rabbit_quorum_queue:stop_server(ServerId2),
ok.
discard(Config) ->
@@ -373,7 +373,7 @@ discard(Config) ->
#{queue_resource => discard,
dead_letter_handler =>
{?MODULE, dead_letter_handler, [self()]}}}},
- _ = ra:start_server(Conf),
+ _ = rabbit_quorum_queue:start_server(Conf),
ok = ra:trigger_election(ServerId),
_ = ra:members(ServerId),
@@ -391,7 +391,7 @@ discard(Config) ->
flush(),
exit(dead_letter_timeout)
end,
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
cancel_checkout(Config) ->
@@ -455,7 +455,7 @@ untracked_enqueue(Config) ->
timer:sleep(100),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, _, {_, _, _, _, msg1}, _F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
@@ -470,7 +470,7 @@ flow(Config) ->
{slow, F4} = rabbit_fifo_client:enqueue(m4, F3),
{_, _, F5} = process_ra_events(receive_ra_events(4, 0), F4),
{ok, _} = rabbit_fifo_client:enqueue(m5, F5),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
test_queries(Config) ->
@@ -504,7 +504,7 @@ test_queries(Config) ->
fun rabbit_fifo:query_processes/1),
?assertEqual(2, length(Processes)),
P ! stop,
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
dead_letter_handler(Pid, Msgs) ->
@@ -527,7 +527,7 @@ dequeue(Config) ->
{_, _, F4} = process_ra_events(receive_ra_events(1, 0), F4_),
{ok, _, {_, _, MsgId, _, msg2}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
{_F6, _A} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
- ra:stop_server(ServerId),
+ rabbit_quorum_queue:stop_server(ServerId),
ok.
conf(ClusterName, UId, ServerId, _, Peers) ->
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index f3b5e9674d..6124b3be85 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -116,7 +116,7 @@ dep_cowlib = hex 2.9.1
dep_jsx = hex 2.11.0
dep_looking_glass = git https://github.com/rabbitmq/looking_glass master
dep_prometheus = git https://github.com/deadtrickster/prometheus.erl.git master
-dep_ra = git https://github.com/rabbitmq/ra.git v1.x
+dep_ra = git https://github.com/rabbitmq/ra.git master
dep_ranch = hex 2.0.0
dep_recon = hex 2.5.1
dep_observer_cli = hex 1.6.1