diff options
-rw-r--r-- | deps/rabbit/src/rabbit_classic_queue_index_v2.erl | 20 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_file.erl | 16 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_index.erl | 42 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_recovery_terms.erl | 17 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_vhost.erl | 2 | ||||
-rw-r--r-- | deps/rabbit/test/backing_queue_SUITE.erl | 2 |
6 files changed, 59 insertions, 40 deletions
diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 51937558d7..196c08f164 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -205,9 +205,10 @@ init1(Name, Dir, OnSyncFun, OnSyncMsgFun) -> ensure_queue_name_stub_file(#resource{virtual_host = VHost, name = QName}, Dir) -> QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE), - ok = write_file_and_ensure_dir(QueueNameFile, <<"VHOST: ", VHost/binary, "\n", - "QUEUE: ", QName/binary, "\n", - "INDEX: v2\n">>). + ok = rabbit_file:write_file_and_ensure_dir(QueueNameFile, + <<"VHOST: ", VHost/binary, "\n", + "QUEUE: ", QName/binary, "\n", + "INDEX: v2\n">>). -spec reset_state(State) -> State when State::state(). @@ -542,7 +543,7 @@ terminate(VHost, Terms, State0 = #qi { dir = Dir, file_handle_cache:release_reservation(), %% Write recovery terms for faster recovery. _ = rabbit_recovery_terms:store(VHost, - filename:basename(rabbit_file:binary_to_filename(Dir)), + rabbit_file:binary_to_filename(Dir), [{v2_index_state, {?VERSION, Segments}} | Terms]), State#qi{ segments = #{}, fds = #{} }. @@ -1291,14 +1292,3 @@ highest_continuous_seq_id([SeqId1, SeqId2|Tail], EndSeqId) highest_continuous_seq_id([SeqId2|Tail], EndSeqId); highest_continuous_seq_id([SeqId|Tail], _) -> {SeqId, Tail}. - -write_file_and_ensure_dir(Name, IOData) -> - case file:write_file(Name, IOData, [raw]) of - ok -> ok; - {error, enoent} -> - case filelib:ensure_dir(Name) of - ok -> file:write_file(Name, IOData, [raw]); - Err -> Err - end; - Err -> Err - end. diff --git a/deps/rabbit/src/rabbit_file.erl b/deps/rabbit/src/rabbit_file.erl index 8115be6923..f9c963c511 100644 --- a/deps/rabbit/src/rabbit_file.erl +++ b/deps/rabbit/src/rabbit_file.erl @@ -17,6 +17,7 @@ -export([read_file_info/1]). -export([filename_as_a_directory/1]). -export([filename_to_binary/1, binary_to_filename/1]). +-export([write_file_and_ensure_dir/2]). -import(file_handle_cache, [with_handle/1, with_handle/2]). @@ -349,3 +350,18 @@ binary_to_filename(Bin) when is_binary(Bin) -> Other -> erlang:error(Other) end. + +%% Try to write a file and if it fails, ensure_dir and try again. +%% This is an optimisation since ensuring dir takes time and often +%% we can assume the folder exists already. +-spec write_file_and_ensure_dir(file:filename(), iodata()) -> ok_or_error(). +write_file_and_ensure_dir(Name, IOData) -> + case file:write_file(Name, IOData, [raw]) of + ok -> ok; + {error, enoent} -> + case filelib:ensure_dir(Name) of + ok -> file:write_file(Name, IOData, [raw]); + Err -> Err + end; + Err -> Err + end. diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl index 37a05263fd..2dd4173a32 100644 --- a/deps/rabbit/src/rabbit_queue_index.erl +++ b/deps/rabbit/src/rabbit_queue_index.erl @@ -333,8 +333,7 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered, terminate(VHost, Terms, State = #qistate { dir = Dir }) -> {SegmentCounts, State1} = terminate(State), - _ = rabbit_recovery_terms:store(VHost, filename:basename(Dir), - [{segments, SegmentCounts} | Terms]), + _ = rabbit_recovery_terms:store(VHost, Dir, [{segments, SegmentCounts} | Terms]), State1. -spec delete_and_terminate(qistate()) -> qistate(). @@ -537,25 +536,28 @@ bounds(State = #qistate { segments = Segments }) -> start(VHost, DurableQueueNames) -> ok = rabbit_recovery_terms:start(VHost), + QueuesFolder = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues"]), {DurableTerms, DurableDirectories} = - lists:foldl( - fun(QName, {RecoveryTerms, ValidDirectories}) -> - DirName = queue_name_to_dir_name(QName), - RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of - {error, _} -> non_clean_shutdown; - {ok, Terms} -> Terms - end, - {[RecoveryInfo | RecoveryTerms], - sets:add_element(DirName, ValidDirectories)} - end, {[], sets:new()}, DurableQueueNames), + lists:foldl( + fun(QName, {RecoveryTerms, ValidDirectories}) -> + DirName = queue_name_to_dir_name(QName), + QueueDir = filename:join([QueuesFolder, DirName]), + RecoveryInfo = case rabbit_recovery_terms:read(VHost, QueueDir) of + {ok, Terms} -> Terms; + {error, _} -> non_clean_shutdown + end, + {[RecoveryInfo | RecoveryTerms], + sets:add_element(DirName, ValidDirectories)} + end, {[], sets:new()}, DurableQueueNames), %% Any queue directory we've not been asked to recover is considered garbage - ToDelete = [filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", Dir]) - || Dir <- lists:subtract(all_queue_directory_names(VHost), - sets:to_list(DurableDirectories))], - rabbit_log:debug("Deleting unknown files/folders: ~p", [ToDelete]), - _ = rabbit_file:recursive_delete(ToDelete), - - rabbit_recovery_terms:clear(VHost), + _ = case [filename:join([QueuesFolder, Dir]) + || Dir <- lists:subtract(all_queue_directory_names(VHost), + sets:to_list(DurableDirectories))] of + [] -> ok; + ToDelete -> + rabbit_log:debug("Deleting unknown files/folders: ~p", [ToDelete]), + _ = rabbit_file:recursive_delete(ToDelete) + end, %% The backing queue interface requires that the queue recovery terms %% which come back from start/1 are in the same order as DurableQueueNames @@ -796,7 +798,7 @@ recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount, _MaxJournal SegmentAndDirtyCount; recover_message( true, false, no_del, RelSeq, {Segment, _DirtyCount}, MaxJournal) -> %% force to flush the segment - {add_to_journal(RelSeq, del, Segment), MaxJournal + 1}; + {add_to_journal(RelSeq, del, Segment), MaxJournal + 1}; recover_message(false, _, del, RelSeq, {Segment, DirtyCount}, _MaxJournal) -> {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1}; recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}, _MaxJournal) -> diff --git a/deps/rabbit/src/rabbit_recovery_terms.erl b/deps/rabbit/src/rabbit_recovery_terms.erl index cae1a161e1..66283785d1 100644 --- a/deps/rabbit/src/rabbit_recovery_terms.erl +++ b/deps/rabbit/src/rabbit_recovery_terms.erl @@ -61,12 +61,23 @@ stop(VHost) -> -spec store(rabbit_types:vhost(), file:filename(), term()) -> rabbit_types:ok_or_error(term()). -store(VHost, DirBaseName, Terms) -> - dets:insert(VHost, {DirBaseName, Terms}). +store(_VHost, QueueDir, Terms) -> + RecoveryFile = filename:join([QueueDir, ".recovery"]), + _ = rabbit_file:write_file_and_ensure_dir(RecoveryFile, term_to_iovec(Terms)). -spec read(rabbit_types:vhost(), file:filename()) -> rabbit_types:ok_or_error2(term(), not_found). -read(VHost, DirBaseName) -> +read(VHost, QueueDir) -> + RecoveryFile = filename:join([rabbit_vhost:msg_store_dir_path(VHost), QueueDir, ".recovery"]), + case file:read_file(RecoveryFile) of + {ok, TermsBin} -> + _ = prim_file:delete(RecoveryFile), + {ok, binary_to_term(TermsBin)}; + {error, _} -> + read_legacy(VHost, QueueDir) + end. + +read_legacy(VHost, DirBaseName) -> case dets:lookup(VHost, DirBaseName) of [{_, Terms}] -> {ok, Terms}; _ -> {error, not_found} diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl index 24bca6f808..ebb500824a 100644 --- a/deps/rabbit/src/rabbit_vhost.erl +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -56,7 +56,7 @@ recover(VHost) -> rabbit_log:info("Making sure data directory '~ts' for vhost '~ts' exists", [VHostDir, VHost]), VHostStubFile = filename:join(VHostDir, ".vhost"), - ok = rabbit_file:ensure_dir(VHostStubFile), + ok = filelib:ensure_dir(VHostStubFile), ok = file:write_file(VHostStubFile, VHost), ok = ensure_config_file(VHost), {Recovered, Failed} = rabbit_amqqueue:recover(VHost), diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 4901f27fd7..791a0495ef 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -1589,7 +1589,7 @@ variable_queue_read_terms(QName) -> virtual_host = VHost, name = Name } = QName, <<Num:128>> = erlang:md5(<<"queue", VHost/binary, Name/binary>>), - DirName = rabbit_misc:format("~.36B", [Num]), + DirName = filename:join([rabbit_vhost:msg_store_dir_path(VHost), "queues", rabbit_misc:format("~.36B", [Num])]), {ok, Terms} = rabbit_recovery_terms:read(VHost, DirName), Terms. |