diff options
author | Michael Davis <mcarsondavis@gmail.com> | 2023-05-08 14:42:34 -0500 |
---|---|---|
committer | Diana Parra Corbacho <dparracorbac@vmware.com> | 2023-05-12 11:31:16 +0200 |
commit | c5c1f4a57b424d9d28b8f1d6da73f9668ecd093f (patch) | |
tree | 4c136ca94bdd4de816a509f48c3a13e56ae59460 | |
parent | da3d00957c8eedc0afb2a7d33e8fa60c58c606b9 (diff) | |
download | rabbitmq-server-git-c5c1f4a57b424d9d28b8f1d6da73f9668ecd093f.tar.gz |
Restore 'rabbit_khepri:wait_for_leader/0'
This is almost the same as the prior 'wait_for_leader/0' function
except that we use 'khepri:exists/3' with the 'compromise' favor rather
than 'ra_leaderboard:lookup_leader/1' to check if the cluster has a
leader. The compromise favor either performs a leader or consistent
query (which also goes through the leader) depending on whether a
leader has been cached recently, so the query will block until a leader
can be elected and works through any pending consistent queries.
Elections run asynchronously after cluster start and may be slower
than the call to 'ra_leaderboard', so for large values of
'khepri_leader_wait_retry_timeout' the server have take a noticeable
pause on boot while waiting to poll 'ra_leaderboard'. Queries return
soon after the election which minimizes this pause.
-rw-r--r-- | deps/rabbit/src/rabbit_khepri.erl | 33 | ||||
-rw-r--r-- | deps/rabbit/test/clustering_management_SUITE.erl | 6 |
2 files changed, 38 insertions, 1 deletions
diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 1f6cdda346..ae5d9c920d 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -126,6 +126,7 @@ setup(_) -> friendly_name => ?RA_FRIENDLY_NAME}, case khepri:start(?RA_SYSTEM, RaServerConfig) of {ok, ?STORE_ID} -> + wait_for_leader(), register_projections(), ?LOG_DEBUG( "Khepri-based " ?RA_FRIENDLY_NAME " ready", @@ -135,6 +136,38 @@ setup(_) -> exit(Error) end. +wait_for_leader() -> + wait_for_leader(retry_timeout(), retry_limit()). + +retry_timeout() -> + case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of + {ok, T} -> T; + undefined -> 30000 + end. + +retry_limit() -> + case application:get_env(rabbit, khepri_leader_wait_retry_limit) of + {ok, T} -> T; + undefined -> 10 + end. + +wait_for_leader(_Timeout, 0) -> + exit(timeout_waiting_for_leader); +wait_for_leader(Timeout, Retries) -> + rabbit_log:info("Waiting for Khepri leader for ~tp ms, ~tp retries left", + [Timeout, Retries - 1]), + Options = #{timeout => Timeout, + favor => compromise}, + case khepri:exists(?STORE_ID, [], Options) of + Exists when is_boolean(Exists) -> + rabbit_log:info("Khepri leader elected"), + ok; + {error, {timeout, _ServerId}} -> + wait_for_leader(Timeout, Retries -1); + {error, Reason} -> + throw(Reason) + end. + add_member(JoiningNode, JoinedNode) when JoiningNode =:= node() andalso is_atom(JoinedNode) -> Ret = do_join(JoinedNode), diff --git a/deps/rabbit/test/clustering_management_SUITE.erl b/deps/rabbit/test/clustering_management_SUITE.erl index 1b402be180..1f5e678887 100644 --- a/deps/rabbit/test/clustering_management_SUITE.erl +++ b/deps/rabbit/test/clustering_management_SUITE.erl @@ -617,8 +617,12 @@ reset_in_minority(Config) -> rabbit_ct_broker_helpers:stop_node(Config, Hare), + ok = rpc:call(Rabbit, application, set_env, + [rabbit, khepri_leader_wait_retry_timeout, 1000]), + ok = rpc:call(Rabbit, application, set_env, + [rabbit, khepri_leader_wait_retry_limit, 3]), stop_app(Rabbit), - ?assertMatch({error, 75, _}, reset(Rabbit)), + ?assertMatch({error, 69, _}, reset(Rabbit)), ok. |