diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-15 13:32:32 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-15 13:32:32 +0100 |
commit | e4ffea0262065f0745f25ba5d2c38f6722b1eb09 (patch) | |
tree | 47dfd484302815cfb474f7416677a02fa9ce5dfb | |
parent | 321878f3a523a79512c6c5e35ddbf796cd198481 (diff) | |
download | rabbitmq-server-e4ffea0262065f0745f25ba5d2c38f6722b1eb09.tar.gz |
Improve, correct and document msg store recovery
-rw-r--r-- | src/rabbit_msg_store.erl | 60 |
1 files changed, 41 insertions, 19 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 7fa0979c..52c4e696 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -506,11 +506,31 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> {ok, IndexModule} = application:get_env(msg_store_index_module), rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]), - {AllCleanShutdown, IndexState, ClientRefs1} = - recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server), + FileNames = + sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), + TmpFileNames = + sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), + FoundCrashedCompactions = + recover_crashed_compactions(Dir, FileNames, TmpFileNames), + %% if we found crashed compactions we trust neither the + %% file_summary nor the location index. Note the file_summary is + %% left empty here if it can't be recovered. {FileSummaryRecovered, FileSummaryEts} = - recover_file_summary(AllCleanShutdown, Dir, Server), + recover_file_summary(not FoundCrashedCompactions, Dir, Server), + %% FileSummaryRecovered => not FoundCrashedCompactions + + {AllCleanShutdown, IndexState, ClientRefs1} = + recover_index_and_client_refs(IndexModule, FileSummaryRecovered, + ClientRefs, Dir, Server), + %% AllCleanShutdown => msg location index and file_summary both + %% recovered correctly. + true = case {FileSummaryRecovered, AllCleanShutdown} of + {true, false} -> ets:delete_all_objects(FileSummaryEts); + _ -> true + end, + %% AllCleanShutdown <=> msg location index and file_summary both + %% recovered correctly. DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, @@ -541,24 +561,22 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> file_size_limit = FileSizeLimit }, + %% If we didn't recover the msg location index then we need to + %% rebuild it now. Does not touch file_summary (will stay empty). 2-stage + %% process. This part just seeds the index with msg guids and + %% ref_counts that the queues report to us. ok = case AllCleanShutdown of true -> ok; false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State) end, - FileNames = + %% recover_index_and_client_refs could have wiped out all files, + %% so need to look again at what files we have to scan + FileNames1 = sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), - TmpFileNames = - sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), - FoundCrashedCompactions = - recover_crashed_compactions(Dir, FileNames, TmpFileNames), - - %% There should be no more tmp files now, so go ahead and load the - %% whole lot - Files = [filename_to_num(FileName) || FileName <- FileNames], - NeedsIndexBuild = FoundCrashedCompactions orelse not FileSummaryRecovered, + Files = [filename_to_num(FileName) || FileName <- FileNames1], {Offset, State1 = #msstate { current_file = CurFile }} = - build_index(NeedsIndexBuild, Files, State), + build_index(AllCleanShutdown, Files, State), %% read is only needed so that we can seek {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile), @@ -1122,11 +1140,15 @@ index_delete_by_file(File, #msstate { index_module = Index, %% shutdown and recovery %%---------------------------------------------------------------------------- -recover_index_and_client_refs(IndexModule, undefined, Dir, _Server) -> +recover_index_and_client_refs(IndexModule, _AttemptRecovery, undefined, Dir, + _Server) -> ok = rabbit_misc:recursive_delete([Dir]), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), {false, IndexModule:new(Dir), sets:new()}; -recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) -> +recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, _Server) -> + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + {false, IndexModule:new(Dir), sets:new()}; +recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")), Fresh = fun (ErrorMsg, ErrorArgs) -> rabbit_log:warning("~w: " ++ ErrorMsg ++ @@ -1237,7 +1259,7 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> %% file. {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE), {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, - ?READ_AHEAD_MODE ++ ?WRITE_MODE), + ?READ_MODE ++ ?WRITE_MODE), {ok, _End} = file_handle_cache:position(MainHdl, eof), Size = filelib:file_size(form_filename(Dir, TmpFileName)), {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size), @@ -1272,7 +1294,7 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> {ExpectedOffset, Guids}. -build_index(false, _Files, State = #msstate { +build_index(true, _Files, State = #msstate { file_summary_ets = FileSummaryEts }) -> ets:foldl( fun (#file_summary { valid_total_size = ValidTotalSize, @@ -1285,7 +1307,7 @@ build_index(false, _Files, State = #msstate { sum_file_size = SumFileSize + FileSize, current_file = File }} end, {0, State}, FileSummaryEts); -build_index(true, Files, State) -> +build_index(false, Files, State) -> {ok, Pid} = gatherer:start_link(), case Files of [] -> build_index(Pid, undefined, [State #msstate.current_file], State); |