summaryrefslogtreecommitdiff
path: root/deps/rabbit
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2023-04-19 21:17:51 +0400
committerGitHub <noreply@github.com>2023-04-19 21:17:51 +0400
commit51cd6486bf3ca3e536a5e4986f7fb4c8671b53f8 (patch)
tree76f93ee3a10dd58b67469d21dda9c489edbf66ec /deps/rabbit
parentb71d19d0143632727aefcca6ed52533b0bb83c13 (diff)
parentd34ac54bf19e31a7270be6e687d903e0178e343b (diff)
downloadrabbitmq-server-git-51cd6486bf3ca3e536a5e4986f7fb4c8671b53f8.tar.gz
Merge pull request #7745 from rabbitmq/recovery-terms-ram-file-on-start
Recovery terms: use ram_file on start, but not on shutdown
Diffstat (limited to 'deps/rabbit')
-rw-r--r--deps/rabbit/src/rabbit_queue_index.erl3
-rw-r--r--deps/rabbit/src/rabbit_recovery_terms.erl32
-rw-r--r--deps/rabbit/src/rabbit_vhost_sup_sup.erl29
3 files changed, 45 insertions, 19 deletions
diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl
index 37a05263fd..3333ecab50 100644
--- a/deps/rabbit/src/rabbit_queue_index.erl
+++ b/deps/rabbit/src/rabbit_queue_index.erl
@@ -536,7 +536,8 @@ bounds(State = #qistate { segments = Segments }) ->
-spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}.
start(VHost, DurableQueueNames) ->
- ok = rabbit_recovery_terms:start(VHost),
+ {ok, RecoveryTermsPid} = rabbit_recovery_terms:start(VHost),
+ rabbit_vhost_sup_sup:save_vhost_recovery_terms(VHost, RecoveryTermsPid),
{DurableTerms, DurableDirectories} =
lists:foldl(
fun(QName, {RecoveryTerms, ValidDirectories}) ->
diff --git a/deps/rabbit/src/rabbit_recovery_terms.erl b/deps/rabbit/src/rabbit_recovery_terms.erl
index cae1a161e1..869e176c2c 100644
--- a/deps/rabbit/src/rabbit_recovery_terms.erl
+++ b/deps/rabbit/src/rabbit_recovery_terms.erl
@@ -22,25 +22,23 @@
%%----------------------------------------------------------------------------
--spec start(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()).
+-spec start(rabbit_types:vhost()) -> rabbit_types:ok_or_error2(term()).
start(VHost) ->
case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
- {ok, _} = supervisor:start_child(
+ supervisor:start_child(
VHostSup,
{?MODULE,
{?MODULE, start_link, [VHost]},
transient, ?WORKER_WAIT, worker,
- [?MODULE]}),
- ok;
+ [?MODULE]});
%% we can get here if a vhost is added and removed concurrently
%% e.g. some integration tests do it
{error, {no_such_vhost, VHost}} ->
rabbit_log:error("Failed to start a recovery terms manager for vhost ~ts: vhost no longer exists!",
[VHost])
- end,
- ok.
+ end.
-spec stop(rabbit_types:vhost()) -> rabbit_types:ok_or_error(term()).
@@ -77,6 +75,8 @@ read(VHost, DirBaseName) ->
clear(VHost) ->
try
_ = dets:delete_all_objects(VHost),
+ VHostRecoveryTermsOwner = rabbit_vhost_sup_sup:lookup_vhost_recovery_terms(VHost),
+ ok = gen_server:call(VHostRecoveryTermsOwner, {reopen_for_writes, VHost}),
ok
%% see start/1
catch _:badarg ->
@@ -93,9 +93,13 @@ start_link(VHost) ->
init([VHost]) ->
process_flag(trap_exit, true),
- _ = open_table(VHost),
+ ok = open_table(VHost, true),
{ok, VHost}.
+handle_call({reopen_for_writes, VHost}, _, State) ->
+ close_table(VHost),
+ ok = open_table(VHost, false),
+ {reply, ok, State};
handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}.
handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}.
@@ -110,18 +114,18 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
--spec open_table(vhost:name()) -> rabbit_types:ok_or_error(any()).
+-spec open_table(vhost:name(), boolean()) -> rabbit_types:ok_or_error(any()).
-open_table(VHost) ->
- open_table(VHost, 10).
+open_table(VHost, RamFile) ->
+ open_table(VHost, RamFile, 10).
--spec open_table(vhost:name(), non_neg_integer()) -> rabbit_types:ok_or_error(any()).
+-spec open_table(vhost:name(), boolean(), non_neg_integer()) -> rabbit_types:ok_or_error(any()).
-open_table(VHost, RetriesLeft) ->
+open_table(VHost, RamFile, RetriesLeft) ->
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
File = filename:join(VHostDir, "recovery.dets"),
Opts = [{file, File},
- {ram_file, true},
+ {ram_file, RamFile},
{auto_save, infinity}],
case dets:open_file(VHost, Opts) of
{ok, _} -> ok;
@@ -136,7 +140,7 @@ open_table(VHost, RetriesLeft) ->
rabbit_log:warning("Failed to open a recovery terms DETS file at ~tp. Will delete it and retry in ~tp ms (~tp retries left)",
[File, DelayInMs, RetriesLeft]),
timer:sleep(DelayInMs),
- open_table(VHost, RetriesLeft - 1)
+ open_table(VHost, RamFile, RetriesLeft - 1)
end
end.
diff --git a/deps/rabbit/src/rabbit_vhost_sup_sup.erl b/deps/rabbit/src/rabbit_vhost_sup_sup.erl
index e8e524c6f6..706004efc8 100644
--- a/deps/rabbit/src/rabbit_vhost_sup_sup.erl
+++ b/deps/rabbit/src/rabbit_vhost_sup_sup.erl
@@ -18,7 +18,10 @@
start_vhost/1, start_vhost/2,
get_vhost_sup/1, get_vhost_sup/2,
save_vhost_sup/3,
- save_vhost_process/2]).
+ save_vhost_process/2,
+ save_vhost_recovery_terms/2,
+ lookup_vhost_sup_record/1,
+ lookup_vhost_recovery_terms/1]).
-export([delete_on_all_nodes/1, start_on_all_nodes/1]).
-export([is_vhost_alive/1]).
-export([check/0]).
@@ -26,7 +29,7 @@
%% Internal
-export([stop_and_delete_vhost/1]).
--record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}).
+-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid, recovery_terms_pid}).
start() ->
case supervisor:start_child(rabbit_sup, {?MODULE,
@@ -209,13 +212,20 @@ is_vhost_alive(VHost) ->
save_vhost_sup(VHost, WrapperPid, VHostPid) ->
true = ets:insert(?MODULE, #vhost_sup{vhost = VHost,
vhost_sup_pid = VHostPid,
- wrapper_pid = WrapperPid}),
+ wrapper_pid = WrapperPid,
+ recovery_terms_pid = no_pid}),
+ ok.
+
+-spec save_vhost_recovery_terms(rabbit_types:vhost(), pid()) -> ok.
+save_vhost_recovery_terms(VHost, RecoveryTermsPid) ->
+ true = ets:update_element(?MODULE, VHost,
+ [{#vhost_sup.recovery_terms_pid, RecoveryTermsPid}]),
ok.
-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok.
save_vhost_process(VHost, VHostProcessPid) ->
true = ets:update_element(?MODULE, VHost,
- {#vhost_sup.vhost_process_pid, VHostProcessPid}),
+ [{#vhost_sup.vhost_process_pid, VHostProcessPid}]),
ok.
-spec lookup_vhost_sup_record(rabbit_types:vhost()) -> #vhost_sup{} | not_found.
@@ -229,6 +239,17 @@ lookup_vhost_sup_record(VHost) ->
undefined -> not_found
end.
+-spec lookup_vhost_recovery_terms(rabbit_types:vhost()) -> pid() | not_found.
+lookup_vhost_recovery_terms(VHost) ->
+ case ets:info(?MODULE, name) of
+ ?MODULE ->
+ case ets:lookup(?MODULE, VHost) of
+ [] -> not_found;
+ [#vhost_sup{} = VHostSup] -> VHostSup#vhost_sup.recovery_terms_pid
+ end;
+ undefined -> not_found
+ end.
+
-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}.
vhost_sup_pid(VHost) ->
case lookup_vhost_sup_record(VHost) of