summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Davis <mcarsondavis@gmail.com>2023-05-08 14:42:34 -0500
committerDiana Parra Corbacho <dparracorbac@vmware.com>2023-05-12 11:31:16 +0200
commitc5c1f4a57b424d9d28b8f1d6da73f9668ecd093f (patch)
tree4c136ca94bdd4de816a509f48c3a13e56ae59460
parentda3d00957c8eedc0afb2a7d33e8fa60c58c606b9 (diff)
downloadrabbitmq-server-git-c5c1f4a57b424d9d28b8f1d6da73f9668ecd093f.tar.gz
Restore 'rabbit_khepri:wait_for_leader/0'
This is almost the same as the prior 'wait_for_leader/0' function except that we use 'khepri:exists/3' with the 'compromise' favor rather than 'ra_leaderboard:lookup_leader/1' to check if the cluster has a leader. The compromise favor either performs a leader or consistent query (which also goes through the leader) depending on whether a leader has been cached recently, so the query will block until a leader can be elected and works through any pending consistent queries. Elections run asynchronously after cluster start and may be slower than the call to 'ra_leaderboard', so for large values of 'khepri_leader_wait_retry_timeout' the server have take a noticeable pause on boot while waiting to poll 'ra_leaderboard'. Queries return soon after the election which minimizes this pause.
-rw-r--r--deps/rabbit/src/rabbit_khepri.erl33
-rw-r--r--deps/rabbit/test/clustering_management_SUITE.erl6
2 files changed, 38 insertions, 1 deletions
diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl
index 1f6cdda346..ae5d9c920d 100644
--- a/deps/rabbit/src/rabbit_khepri.erl
+++ b/deps/rabbit/src/rabbit_khepri.erl
@@ -126,6 +126,7 @@ setup(_) ->
friendly_name => ?RA_FRIENDLY_NAME},
case khepri:start(?RA_SYSTEM, RaServerConfig) of
{ok, ?STORE_ID} ->
+ wait_for_leader(),
register_projections(),
?LOG_DEBUG(
"Khepri-based " ?RA_FRIENDLY_NAME " ready",
@@ -135,6 +136,38 @@ setup(_) ->
exit(Error)
end.
+wait_for_leader() ->
+ wait_for_leader(retry_timeout(), retry_limit()).
+
+retry_timeout() ->
+ case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of
+ {ok, T} -> T;
+ undefined -> 30000
+ end.
+
+retry_limit() ->
+ case application:get_env(rabbit, khepri_leader_wait_retry_limit) of
+ {ok, T} -> T;
+ undefined -> 10
+ end.
+
+wait_for_leader(_Timeout, 0) ->
+ exit(timeout_waiting_for_leader);
+wait_for_leader(Timeout, Retries) ->
+ rabbit_log:info("Waiting for Khepri leader for ~tp ms, ~tp retries left",
+ [Timeout, Retries - 1]),
+ Options = #{timeout => Timeout,
+ favor => compromise},
+ case khepri:exists(?STORE_ID, [], Options) of
+ Exists when is_boolean(Exists) ->
+ rabbit_log:info("Khepri leader elected"),
+ ok;
+ {error, {timeout, _ServerId}} ->
+ wait_for_leader(Timeout, Retries -1);
+ {error, Reason} ->
+ throw(Reason)
+ end.
+
add_member(JoiningNode, JoinedNode)
when JoiningNode =:= node() andalso is_atom(JoinedNode) ->
Ret = do_join(JoinedNode),
diff --git a/deps/rabbit/test/clustering_management_SUITE.erl b/deps/rabbit/test/clustering_management_SUITE.erl
index 1b402be180..1f5e678887 100644
--- a/deps/rabbit/test/clustering_management_SUITE.erl
+++ b/deps/rabbit/test/clustering_management_SUITE.erl
@@ -617,8 +617,12 @@ reset_in_minority(Config) ->
rabbit_ct_broker_helpers:stop_node(Config, Hare),
+ ok = rpc:call(Rabbit, application, set_env,
+ [rabbit, khepri_leader_wait_retry_timeout, 1000]),
+ ok = rpc:call(Rabbit, application, set_env,
+ [rabbit, khepri_leader_wait_retry_limit, 3]),
stop_app(Rabbit),
- ?assertMatch({error, 75, _}, reset(Rabbit)),
+ ?assertMatch({error, 69, _}, reset(Rabbit)),
ok.