summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-15 13:32:32 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-15 13:32:32 +0100
commite4ffea0262065f0745f25ba5d2c38f6722b1eb09 (patch)
tree47dfd484302815cfb474f7416677a02fa9ce5dfb
parent321878f3a523a79512c6c5e35ddbf796cd198481 (diff)
downloadrabbitmq-server-e4ffea0262065f0745f25ba5d2c38f6722b1eb09.tar.gz
Improve, correct and document msg store recovery
-rw-r--r--src/rabbit_msg_store.erl60
1 files changed, 41 insertions, 19 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 7fa0979c..52c4e696 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -506,11 +506,31 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
{ok, IndexModule} = application:get_env(msg_store_index_module),
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
- {AllCleanShutdown, IndexState, ClientRefs1} =
- recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server),
+ FileNames =
+ sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
+ TmpFileNames =
+ sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
+ FoundCrashedCompactions =
+ recover_crashed_compactions(Dir, FileNames, TmpFileNames),
+ %% if we found crashed compactions we trust neither the
+ %% file_summary nor the location index. Note the file_summary is
+ %% left empty here if it can't be recovered.
{FileSummaryRecovered, FileSummaryEts} =
- recover_file_summary(AllCleanShutdown, Dir, Server),
+ recover_file_summary(not FoundCrashedCompactions, Dir, Server),
+ %% FileSummaryRecovered => not FoundCrashedCompactions
+
+ {AllCleanShutdown, IndexState, ClientRefs1} =
+ recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
+ ClientRefs, Dir, Server),
+ %% AllCleanShutdown => msg location index and file_summary both
+ %% recovered correctly.
+ true = case {FileSummaryRecovered, AllCleanShutdown} of
+ {true, false} -> ets:delete_all_objects(FileSummaryEts);
+ _ -> true
+ end,
+ %% AllCleanShutdown <=> msg location index and file_summary both
+ %% recovered correctly.
DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
@@ -541,24 +561,22 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
file_size_limit = FileSizeLimit
},
+ %% If we didn't recover the msg location index then we need to
+ %% rebuild it now. Does not touch file_summary (will stay empty). 2-stage
+ %% process. This part just seeds the index with msg guids and
+ %% ref_counts that the queues report to us.
ok = case AllCleanShutdown of
true -> ok;
false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State)
end,
- FileNames =
+ %% recover_index_and_client_refs could have wiped out all files,
+ %% so need to look again at what files we have to scan
+ FileNames1 =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
- TmpFileNames =
- sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
- FoundCrashedCompactions =
- recover_crashed_compactions(Dir, FileNames, TmpFileNames),
-
- %% There should be no more tmp files now, so go ahead and load the
- %% whole lot
- Files = [filename_to_num(FileName) || FileName <- FileNames],
- NeedsIndexBuild = FoundCrashedCompactions orelse not FileSummaryRecovered,
+ Files = [filename_to_num(FileName) || FileName <- FileNames1],
{Offset, State1 = #msstate { current_file = CurFile }} =
- build_index(NeedsIndexBuild, Files, State),
+ build_index(AllCleanShutdown, Files, State),
%% read is only needed so that we can seek
{ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
@@ -1122,11 +1140,15 @@ index_delete_by_file(File, #msstate { index_module = Index,
%% shutdown and recovery
%%----------------------------------------------------------------------------
-recover_index_and_client_refs(IndexModule, undefined, Dir, _Server) ->
+recover_index_and_client_refs(IndexModule, _AttemptRecovery, undefined, Dir,
+ _Server) ->
ok = rabbit_misc:recursive_delete([Dir]),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
{false, IndexModule:new(Dir), sets:new()};
-recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) ->
+recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, _Server) ->
+ ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+ {false, IndexModule:new(Dir), sets:new()};
+recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
Fresh = fun (ErrorMsg, ErrorArgs) ->
rabbit_log:warning("~w: " ++ ErrorMsg ++
@@ -1237,7 +1259,7 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
%% file.
{ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
{ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
- ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ ?READ_MODE ++ ?WRITE_MODE),
{ok, _End} = file_handle_cache:position(MainHdl, eof),
Size = filelib:file_size(form_filename(Dir, TmpFileName)),
{ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size),
@@ -1272,7 +1294,7 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
{ExpectedOffset, Guids}.
-build_index(false, _Files, State = #msstate {
+build_index(true, _Files, State = #msstate {
file_summary_ets = FileSummaryEts }) ->
ets:foldl(
fun (#file_summary { valid_total_size = ValidTotalSize,
@@ -1285,7 +1307,7 @@ build_index(false, _Files, State = #msstate {
sum_file_size = SumFileSize + FileSize,
current_file = File }}
end, {0, State}, FileSummaryEts);
-build_index(true, Files, State) ->
+build_index(false, Files, State) ->
{ok, Pid} = gatherer:start_link(),
case Files of
[] -> build_index(Pid, undefined, [State #msstate.current_file], State);