diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-16 07:14:32 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-16 07:14:32 +0100 |
commit | 8ccf027aee330024c6c626d485798fc796a6dd6e (patch) | |
tree | 579d282d4002d011c771c2e8d476c4d2fb08d221 | |
parent | 85a9de64b873e6a9d7df2047fbc0ce3143309485 (diff) | |
parent | 1d8e8a4d11ef2b7b397e366de389b6f514e46914 (diff) | |
download | rabbitmq-server-8ccf027aee330024c6c626d485798fc796a6dd6e.tar.gz |
bug23129
-rw-r--r-- | src/file_handle_cache.erl | 3 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 234 |
2 files changed, 86 insertions, 151 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 5b348580..8db5a794 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -382,7 +382,8 @@ copy(Src, Dest, Count) -> {ok, Count1} = Result1 -> {Result1, [SHandle #handle { offset = SOffset + Count1 }, - DHandle #handle { offset = DOffset + Count1 }]}; + DHandle #handle { offset = DOffset + Count1, + is_dirty = true }]}; Error -> {Error, [SHandle, DHandle]} end; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index c7ebfd97..5bc1f9d5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -495,7 +495,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> +init([Server, BaseDir, ClientRefs, StartupFunState]) -> process_flag(trap_exit, true), ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, @@ -506,11 +506,32 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> {ok, IndexModule} = application:get_env(msg_store_index_module), rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]), - {AllCleanShutdown, IndexState, ClientRefs1} = - recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server), + AttemptFileSummaryRecovery = + case ClientRefs of + undefined -> ok = rabbit_misc:recursive_delete([Dir]), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + false; + _ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + recover_crashed_compactions(Dir) + end, + %% if we found crashed compactions we trust neither the + %% file_summary nor the location index. Note the file_summary is + %% left empty here if it can't be recovered. {FileSummaryRecovered, FileSummaryEts} = - recover_file_summary(AllCleanShutdown, Dir, Server), + recover_file_summary(AttemptFileSummaryRecovery, Dir), + + {CleanShutdown, IndexState, ClientRefs1} = + recover_index_and_client_refs(IndexModule, FileSummaryRecovered, + ClientRefs, Dir, Server), + %% CleanShutdown => msg location index and file_summary both + %% recovered correctly. + true = case {FileSummaryRecovered, CleanShutdown} of + {true, false} -> ets:delete_all_objects(FileSummaryEts); + _ -> true + end, + %% CleanShutdown <=> msg location index and file_summary both + %% recovered correctly. DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, @@ -537,26 +558,14 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, - successfully_recovered = AllCleanShutdown, + successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit }, - ok = case AllCleanShutdown of - true -> ok; - false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State) - end, - - FileNames = - sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), - TmpFileNames = - sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), - ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames), - - %% There should be no more tmp files now, so go ahead and load the - %% whole lot - Files = [filename_to_num(FileName) || FileName <- FileNames], + %% If we didn't recover the msg location index then we need to + %% rebuild it now. {Offset, State1 = #msstate { current_file = CurFile }} = - build_index(FileSummaryRecovered, Files, State), + build_index(CleanShutdown, StartupFunState, State), %% read is only needed so that we can seek {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile), @@ -1038,9 +1047,9 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). -sort_file_names(FileNames) -> +list_sorted_file_names(Dir, Ext) -> lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, - FileNames). + filelib:wildcard("*" ++ Ext, Dir)). %%---------------------------------------------------------------------------- %% message cache helper functions @@ -1120,15 +1129,15 @@ index_delete_by_file(File, #msstate { index_module = Index, %% shutdown and recovery %%---------------------------------------------------------------------------- -recover_index_and_client_refs(IndexModule, undefined, Dir, _Server) -> - ok = rabbit_misc:recursive_delete([Dir]), - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), +recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) -> + {false, IndexModule:new(Dir), sets:new()}; +recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) -> + rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]), {false, IndexModule:new(Dir), sets:new()}; -recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) -> - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), +recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> Fresh = fun (ErrorMsg, ErrorArgs) -> - rabbit_log:warning("~w: " ++ ErrorMsg ++ - "~nrebuilding indices from scratch~n", + rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n" + "rebuilding indices from scratch~n", [Server | ErrorArgs]), {false, IndexModule:new(Dir), sets:new()} end, @@ -1142,12 +1151,12 @@ recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) -> andalso IndexModule =:= RecIndexModule) of true -> case IndexModule:recover(Dir) of {ok, IndexState1} -> - ClientRefs1 = sets:from_list(ClientRefs), - {true, IndexState1, ClientRefs1}; + {true, IndexState1, + sets:from_list(ClientRefs)}; {error, Error} -> Fresh("failed to recover index: ~p", [Error]) end; - false -> Fresh("recovery terms differ from present", []) + false -> Fresh("recovery terms differ from present", []) end end. @@ -1168,7 +1177,7 @@ store_file_summary(Tid, Dir) -> ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME), [{extended_info, [object_count]}]). -recover_file_summary(false, _Dir, _Server) -> +recover_file_summary(false, _Dir) -> %% TODO: the only reason for this to be an *ordered*_set is so %% that a) maybe_compact can start a traversal from the eldest %% file, and b) build_index in fast recovery mode can easily @@ -1178,15 +1187,12 @@ recover_file_summary(false, _Dir, _Server) -> %% ditching the latter would be neater. {false, ets:new(rabbit_msg_store_file_summary, [ordered_set, public, {keypos, #file_summary.file}])}; -recover_file_summary(true, Dir, Server) -> +recover_file_summary(true, Dir) -> Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME), case ets:file2tab(Path) of - {ok, Tid} -> file:delete(Path), + {ok, Tid} -> file:delete(Path), {true, Tid}; - {error, Error} -> rabbit_log:warning( - "~w: failed to recover file summary: ~p~n" - "rebuilding~n", [Server, Error]), - recover_file_summary(false, Dir, Server) + {error, _Error} -> recover_file_summary(false, Dir) end. count_msg_refs(Gen, Seed, State) -> @@ -1199,6 +1205,7 @@ count_msg_refs(Gen, Seed, State) -> ok = case index_lookup(Guid, State) of not_found -> index_insert(#msg_location { guid = Guid, + file = undefined, ref_count = Delta }, State); #msg_location { ref_count = RefCount } = StoreEntry -> @@ -1213,7 +1220,9 @@ count_msg_refs(Gen, Seed, State) -> count_msg_refs(Gen, Next, State) end. -recover_crashed_compactions(Dir, FileNames, TmpFileNames) -> +recover_crashed_compactions(Dir) -> + FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION), + TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP), lists:foreach( fun (TmpFileName) -> NonTmpRelatedFileName = @@ -1222,103 +1231,26 @@ recover_crashed_compactions(Dir, FileNames, TmpFileNames) -> ok = recover_crashed_compaction( Dir, TmpFileName, NonTmpRelatedFileName) end, TmpFileNames), - ok. + TmpFileNames == []. recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> - {ok, UncorruptedMessagesTmp, GuidsTmp} = - scan_file_for_valid_messages_and_guids(Dir, TmpFileName), - {ok, UncorruptedMessages, Guids} = - scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName), - %% 1) It's possible that everything in the tmp file is also in the - %% main file such that the main file is (prefix ++ - %% tmpfile). This means that compaction failed immediately - %% prior to the final step of deleting the tmp file. Plan: just - %% delete the tmp file - %% 2) It's possible that everything in the tmp file is also in the - %% main file but with holes throughout (or just somthing like - %% main = (prefix ++ hole ++ tmpfile)). This means that - %% compaction wrote out the tmp file successfully and then - %% failed. Plan: just delete the tmp file and allow the - %% compaction to eventually be triggered later - %% 3) It's possible that everything in the tmp file is also in the - %% main file but such that the main file does not end with tmp - %% file (and there are valid messages in the suffix; main = - %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This - %% means that compaction failed as we were writing out the tmp - %% file. Plan: just delete the tmp file and allow the - %% compaction to eventually be triggered later - %% 4) It's possible that there are messages in the tmp file which - %% are not in the main file. This means that writing out the - %% tmp file succeeded, but then we failed as we were copying - %% them back over to the main file, after truncating the main - %% file. As the main file has already been truncated, it should - %% consist only of valid messages. Plan: Truncate the main file - %% back to before any of the files in the tmp file and copy - %% them over again - TmpPath = form_filename(Dir, TmpFileName), - case is_sublist(GuidsTmp, Guids) of - true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file - %% note this also catches the case when the tmp file - %% is empty - ok = file:delete(TmpPath); - false -> - %% We're in case 4 above. We only care about the inital - %% msgs in main file that are not in the tmp file. If - %% there are no msgs in the tmp file then we would be in - %% the 'true' branch of this case, so we know the - %% lists:last call is safe. - EldestTmpGuid = lists:last(GuidsTmp), - {Guids1, UncorruptedMessages1} - = case lists:splitwith( - fun (Guid) -> Guid =/= EldestTmpGuid end, Guids) of - {_Guids, []} -> %% no msgs from tmp in main - {Guids, UncorruptedMessages}; - {Dropped, [EldestTmpGuid | Rest]} -> - %% Msgs in Dropped are in tmp, so forget them. - %% *cry*. Lists indexed from 1. - {Rest, lists:sublist(UncorruptedMessages, - 2 + length(Dropped), - length(Rest))} - end, - %% The main file prefix should be contiguous - {Top, Guids1} = find_contiguous_block_prefix( - lists:reverse(UncorruptedMessages1)), - %% we should have that none of the messages in the prefix - %% are in the tmp file - true = is_disjoint(Guids1, GuidsTmp), - %% must open with read flag, otherwise will stomp over contents - {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, - [read | ?WRITE_MODE]), - %% Wipe out any rubbish at the end of the file. Remember - %% the head of the list will be the highest entry in the - %% file. - [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, - TmpSize = TmpTopOffset + TmpTopTotalSize, - %% Extend the main file as big as necessary in a single - %% move. If we run out of disk space, this truncate could - %% fail, but we still aren't risking losing data - ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize), - {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE), - {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize), - ok = file_handle_cache:close(MainHdl), - ok = file_handle_cache:delete(TmpHdl), - - {ok, _MainMessages, GuidsMain} = - scan_file_for_valid_messages_and_guids( - Dir, NonTmpRelatedFileName), - %% check that everything in Guids1 is in GuidsMain - true = is_sublist(Guids1, GuidsMain), - %% check that everything in GuidsTmp is in GuidsMain - true = is_sublist(GuidsTmp, GuidsMain) - end, + %% Because a msg can legitimately appear multiple times in the + %% same file, identifying the contents of the tmp file and where + %% they came from is non-trivial. If we are recovering a crashed + %% compaction then we will be rebuilding the index, which can cope + %% with duplicates appearing. Thus the simplest and safest thing + %% to do is to append the contents of the tmp file to its main + %% file. + {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE), + {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, + ?READ_MODE ++ ?WRITE_MODE), + {ok, _End} = file_handle_cache:position(MainHdl, eof), + Size = filelib:file_size(form_filename(Dir, TmpFileName)), + {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size), + ok = file_handle_cache:close(MainHdl), + ok = file_handle_cache:delete(TmpHdl), ok. -is_sublist(SmallerL, BiggerL) -> - lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL). - -is_disjoint(SmallerL, BiggerL) -> - lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). - scan_file_for_valid_messages(Dir, FileName) -> case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan( @@ -1332,10 +1264,6 @@ scan_file_for_valid_messages(Dir, FileName) -> {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} end. -scan_file_for_valid_messages_and_guids(Dir, FileName) -> - {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName), - {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}. - %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. @@ -1350,8 +1278,8 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> {ExpectedOffset, Guids}. -build_index(true, _Files, State = #msstate { - file_summary_ets = FileSummaryEts }) -> +build_index(true, _StartupFunState, + State = #msstate { file_summary_ets = FileSummaryEts }) -> ets:foldl( fun (#file_summary { valid_total_size = ValidTotalSize, file_size = FileSize, @@ -1363,12 +1291,17 @@ build_index(true, _Files, State = #msstate { sum_file_size = SumFileSize + FileSize, current_file = File }} end, {0, State}, FileSummaryEts); -build_index(false, Files, State) -> +build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit}, + State = #msstate { dir = Dir }) -> + ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), {ok, Pid} = gatherer:start_link(), - case Files of - [] -> build_index(Pid, undefined, [State #msstate.current_file], State); - _ -> {Offset, State1} = build_index(Pid, undefined, Files, State), - {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)} + case [filename_to_num(FileName) || + FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of + [] -> build_index(Pid, undefined, [State #msstate.current_file], + State); + Files -> {Offset, State1} = build_index(Pid, undefined, Files, State), + {Offset, lists:foldl(fun delete_file_if_empty/2, + State1, Files)} end. build_index(Gatherer, Left, [], @@ -1409,14 +1342,14 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, lists:foldl( fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case index_lookup(Guid, State) of - not_found -> - {VMAcc, VTSAcc}; - StoreEntry -> + #msg_location { file = undefined } = StoreEntry -> ok = index_update(StoreEntry #msg_location { file = File, offset = Offset, total_size = TotalSize }, State), - {[Obj | VMAcc], VTSAcc + TotalSize} + {[Obj | VMAcc], VTSAcc + TotalSize}; + _ -> + {VMAcc, VTSAcc} end end, {[], 0}, Messages), %% foldl reverses lists, find_contiguous_block_prefix needs @@ -1671,9 +1604,10 @@ find_unremoved_messages_in_file(File, {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order - lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) -> + lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> case Index:lookup(Guid, IndexState) of - #msg_location { file = File } = Entry -> + #msg_location { file = File, total_size = TotalSize, + offset = Offset } = Entry -> {[ Entry | List ], TotalSize + Size}; _ -> Acc |