summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_classic_queue_index_v2.erl20
-rw-r--r--deps/rabbit/src/rabbit_file.erl16
-rw-r--r--deps/rabbit/src/rabbit_queue_index.erl42
-rw-r--r--deps/rabbit/src/rabbit_recovery_terms.erl17
-rw-r--r--deps/rabbit/src/rabbit_vhost.erl2
-rw-r--r--deps/rabbit/test/backing_queue_SUITE.erl2
6 files changed, 59 insertions, 40 deletions
diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl
index 51937558d7..196c08f164 100644
--- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl
+++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl
@@ -205,9 +205,10 @@ init1(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
ensure_queue_name_stub_file(#resource{virtual_host = VHost, name = QName}, Dir) ->
QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
- ok = write_file_and_ensure_dir(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
- "QUEUE: ", QName/binary, "\n",
- "INDEX: v2\n">>).
+ ok = rabbit_file:write_file_and_ensure_dir(QueueNameFile,
+ <<"VHOST: ", VHost/binary, "\n",
+ "QUEUE: ", QName/binary, "\n",
+ "INDEX: v2\n">>).
-spec reset_state(State) -> State when State::state().
@@ -542,7 +543,7 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir,
file_handle_cache:release_reservation(),
%% Write recovery terms for faster recovery.
_ = rabbit_recovery_terms:store(VHost,
- filename:basename(rabbit_file:binary_to_filename(Dir)),
+ rabbit_file:binary_to_filename(Dir),
[{v2_index_state, {?VERSION, Segments}} | Terms]),
State#qi{ segments = #{},
fds = #{} }.
@@ -1291,14 +1292,3 @@ highest_continuous_seq_id([SeqId1, SeqId2|Tail], EndSeqId)
highest_continuous_seq_id([SeqId2|Tail], EndSeqId);
highest_continuous_seq_id([SeqId|Tail], _) ->
{SeqId, Tail}.
-
-write_file_and_ensure_dir(Name, IOData) ->
- case file:write_file(Name, IOData, [raw]) of
- ok -> ok;
- {error, enoent} ->
- case filelib:ensure_dir(Name) of
- ok -> file:write_file(Name, IOData, [raw]);
- Err -> Err
- end;
- Err -> Err
- end.
diff --git a/deps/rabbit/src/rabbit_file.erl b/deps/rabbit/src/rabbit_file.erl
index 8115be6923..f9c963c511 100644
--- a/deps/rabbit/src/rabbit_file.erl
+++ b/deps/rabbit/src/rabbit_file.erl
@@ -17,6 +17,7 @@
-export([read_file_info/1]).
-export([filename_as_a_directory/1]).
-export([filename_to_binary/1, binary_to_filename/1]).
+-export([write_file_and_ensure_dir/2]).
-import(file_handle_cache, [with_handle/1, with_handle/2]).
@@ -349,3 +350,18 @@ binary_to_filename(Bin) when is_binary(Bin) ->
Other ->
erlang:error(Other)
end.
+
+%% Try to write a file and if it fails, ensure_dir and try again.
+%% This is an optimisation since ensuring dir takes time and often
+%% we can assume the folder exists already.
+-spec write_file_and_ensure_dir(file:filename(), iodata()) -> ok_or_error().
+write_file_and_ensure_dir(Name, IOData) ->
+ case file:write_file(Name, IOData, [raw]) of
+ ok -> ok;
+ {error, enoent} ->
+ case filelib:ensure_dir(Name) of
+ ok -> file:write_file(Name, IOData, [raw]);
+ Err -> Err
+ end;
+ Err -> Err
+ end.
diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl
index 37a05263fd..2dd4173a32 100644
--- a/deps/rabbit/src/rabbit_queue_index.erl
+++ b/deps/rabbit/src/rabbit_queue_index.erl
@@ -333,8 +333,7 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
terminate(VHost, Terms, State = #qistate { dir = Dir }) ->
{SegmentCounts, State1} = terminate(State),
- _ = rabbit_recovery_terms:store(VHost, filename:basename(Dir),
- [{segments, SegmentCounts} | Terms]),
+ _ = rabbit_recovery_terms:store(VHost, Dir, [{segments, SegmentCounts} | Terms]),
State1.
-spec delete_and_terminate(qistate()) -> qistate().
@@ -537,25 +536,28 @@ bounds(State = #qistate { segments = Segments }) ->
start(VHost, DurableQueueNames) ->
ok = rabbit_recovery_terms:start(VHost),
+ QueuesFolder = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues"]),
{DurableTerms, DurableDirectories} =
- lists:foldl(
- fun(QName, {RecoveryTerms, ValidDirectories}) ->
- DirName = queue_name_to_dir_name(QName),
- RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of
- {error, _} -> non_clean_shutdown;
- {ok, Terms} -> Terms
- end,
- {[RecoveryInfo | RecoveryTerms],
- sets:add_element(DirName, ValidDirectories)}
- end, {[], sets:new()}, DurableQueueNames),
+ lists:foldl(
+ fun(QName, {RecoveryTerms, ValidDirectories}) ->
+ DirName = queue_name_to_dir_name(QName),
+ QueueDir = filename:join([QueuesFolder, DirName]),
+ RecoveryInfo = case rabbit_recovery_terms:read(VHost, QueueDir) of
+ {ok, Terms} -> Terms;
+ {error, _} -> non_clean_shutdown
+ end,
+ {[RecoveryInfo | RecoveryTerms],
+ sets:add_element(DirName, ValidDirectories)}
+ end, {[], sets:new()}, DurableQueueNames),
%% Any queue directory we've not been asked to recover is considered garbage
- ToDelete = [filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", Dir])
- || Dir <- lists:subtract(all_queue_directory_names(VHost),
- sets:to_list(DurableDirectories))],
- rabbit_log:debug("Deleting unknown files/folders: ~p", [ToDelete]),
- _ = rabbit_file:recursive_delete(ToDelete),
-
- rabbit_recovery_terms:clear(VHost),
+ _ = case [filename:join([QueuesFolder, Dir])
+ || Dir <- lists:subtract(all_queue_directory_names(VHost),
+ sets:to_list(DurableDirectories))] of
+ [] -> ok;
+ ToDelete ->
+ rabbit_log:debug("Deleting unknown files/folders: ~p", [ToDelete]),
+ _ = rabbit_file:recursive_delete(ToDelete)
+ end,
%% The backing queue interface requires that the queue recovery terms
%% which come back from start/1 are in the same order as DurableQueueNames
@@ -796,7 +798,7 @@ recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount, _MaxJournal
SegmentAndDirtyCount;
recover_message( true, false, no_del, RelSeq, {Segment, _DirtyCount}, MaxJournal) ->
%% force to flush the segment
- {add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
+ {add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
recover_message(false, _, del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
{add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
diff --git a/deps/rabbit/src/rabbit_recovery_terms.erl b/deps/rabbit/src/rabbit_recovery_terms.erl
index cae1a161e1..66283785d1 100644
--- a/deps/rabbit/src/rabbit_recovery_terms.erl
+++ b/deps/rabbit/src/rabbit_recovery_terms.erl
@@ -61,12 +61,23 @@ stop(VHost) ->
-spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()).
-store(VHost, DirBaseName, Terms) ->
- dets:insert(VHost, {DirBaseName, Terms}).
+store(_VHost, QueueDir, Terms) ->
+ RecoveryFile = filename:join([QueueDir, ".recovery"]),
+ _ = rabbit_file:write_file_and_ensure_dir(RecoveryFile, term_to_iovec(Terms)).
-spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found).
-read(VHost, DirBaseName) ->
+read(VHost, QueueDir) ->
+ RecoveryFile = filename:join([rabbit_vhost:msg_store_dir_path(VHost), QueueDir, ".recovery"]),
+ case file:read_file(RecoveryFile) of
+ {ok, TermsBin} ->
+ _ = prim_file:delete(RecoveryFile),
+ {ok, binary_to_term(TermsBin)};
+ {error, _} ->
+ read_legacy(VHost, QueueDir)
+ end.
+
+read_legacy(VHost, DirBaseName) ->
case dets:lookup(VHost, DirBaseName) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl
index 24bca6f808..ebb500824a 100644
--- a/deps/rabbit/src/rabbit_vhost.erl
+++ b/deps/rabbit/src/rabbit_vhost.erl
@@ -56,7 +56,7 @@ recover(VHost) ->
rabbit_log:info("Making sure data directory '~ts' for vhost '~ts' exists",
[VHostDir, VHost]),
VHostStubFile = filename:join(VHostDir, ".vhost"),
- ok = rabbit_file:ensure_dir(VHostStubFile),
+ ok = filelib:ensure_dir(VHostStubFile),
ok = file:write_file(VHostStubFile, VHost),
ok = ensure_config_file(VHost),
{Recovered, Failed} = rabbit_amqqueue:recover(VHost),
diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl
index 4901f27fd7..791a0495ef 100644
--- a/deps/rabbit/test/backing_queue_SUITE.erl
+++ b/deps/rabbit/test/backing_queue_SUITE.erl
@@ -1589,7 +1589,7 @@ variable_queue_read_terms(QName) ->
virtual_host = VHost,
name = Name } = QName,
<<Num:128>> = erlang:md5(<<"queue", VHost/binary, Name/binary>>),
- DirName = rabbit_misc:format("~.36B", [Num]),
+ DirName = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", rabbit_misc:format("~.36B", [Num])]),
{ok, Terms} = rabbit_recovery_terms:read(VHost, DirName),
Terms.